Skip to content

Commit

Permalink
surface write failures through Body.Writer.flush (#247)
Browse files Browse the repository at this point in the history
* surface write failures through `Body.Writer.flush`

* fix h2spec

* add changelog entry
  • Loading branch information
anmonteiro authored Aug 27, 2024
1 parent 471c7ae commit 554575c
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 48 deletions.
10 changes: 10 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
Unreleased
--------------

- h2: surface (body) write failures through `flush`
([#247](https://github.com/anmonteiro/ocaml-h2/pull/247))
- `Body.Writer.flush` now takes a callback of the type
``([ `Written | ` Closed] -> unit)``, informing the caller whether the
previous writes have been written or whether the output channel was
closed.

0.12.0 2024-06-23
--------------

Expand Down
60 changes: 40 additions & 20 deletions lib/body.ml
Original file line number Diff line number Diff line change
Expand Up @@ -125,28 +125,41 @@ module Writer = struct
let ready_to_write t = Serialize.Writer.wakeup t.writer

let write_char t c =
Faraday.write_char t.faraday c;
if not (Faraday.is_closed t.faraday) then Faraday.write_char t.faraday c;
ready_to_write t

let write_string t ?off ?len s =
Faraday.write_string ?off ?len t.faraday s;
if not (Faraday.is_closed t.faraday)
then Faraday.write_string ?off ?len t.faraday s;
ready_to_write t

let write_bigstring t ?off ?len b =
Faraday.write_bigstring ?off ?len t.faraday b;
if not (Faraday.is_closed t.faraday)
then Faraday.write_bigstring ?off ?len t.faraday b;
ready_to_write t

let schedule_bigstring t ?off ?len (b : Bigstringaf.t) =
Faraday.schedule_bigstring ?off ?len t.faraday b;
if not (Faraday.is_closed t.faraday)
then Faraday.schedule_bigstring ?off ?len t.faraday b;
ready_to_write t

let flush t kontinue =
Faraday.flush t.faraday kontinue;
ready_to_write t
if Serialize.Writer.is_closed t.writer
then kontinue `Closed
else (
Faraday.flush_with_reason t.faraday (function
| Drain -> kontinue `Closed
| Nothing_pending | Shift -> kontinue `Written);
ready_to_write t)

let is_closed t = Faraday.is_closed t.faraday
let has_pending_output t = Faraday.has_pending_output t.faraday

let close_and_drain t =
Faraday.close t.faraday;
(* Resolve all pending flushes *)
ignore (Faraday.drain t.faraday : int)

let close t =
Serialize.Writer.unyield t.writer;
Faraday.close t.faraday;
Expand All @@ -156,18 +169,25 @@ module Writer = struct

let transfer_to_writer t writer ~max_frame_size ~max_bytes stream_id =
let faraday = t.faraday in
match Faraday.operation faraday with
| `Yield | `Close -> 0
| `Writev iovecs ->
let buffered = t.buffered_bytes in
let iovecs = Httpun_types.IOVec.shiftv iovecs !buffered in
let lengthv = Httpun_types.IOVec.lengthv iovecs in
let writev_len = if max_bytes < lengthv then max_bytes else lengthv in
buffered := !buffered + writev_len;
let frame_info = Writer.make_frame_info ~max_frame_size stream_id in
Writer.schedule_iovecs writer frame_info ~len:writev_len iovecs;
Writer.flush writer (fun () ->
Faraday.shift faraday writev_len;
buffered := !buffered - writev_len);
writev_len
if Serialize.Writer.is_closed t.writer
then (
close_and_drain t;
0)
else
match Faraday.operation faraday with
| `Yield | `Close -> 0
| `Writev iovecs ->
let buffered = t.buffered_bytes in
let iovecs = Httpun_types.IOVec.shiftv iovecs !buffered in
let lengthv = Httpun_types.IOVec.lengthv iovecs in
let writev_len = if max_bytes < lengthv then max_bytes else lengthv in
buffered := !buffered + writev_len;
let frame_info = Writer.make_frame_info ~max_frame_size stream_id in
Writer.schedule_iovecs writer frame_info ~len:writev_len iovecs;
Writer.flush t.writer (function
| `Closed -> close_and_drain t
| `Written ->
Faraday.shift faraday writev_len;
buffered := !buffered - writev_len);
writev_len
end
2 changes: 1 addition & 1 deletion lib/client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ let report_error =
t.did_send_go_away <- true;
if error <> Error_code.NoError
then t.error_handler (`Protocol_error (error, data));
Writer.flush t.writer (fun () ->
Writer.flush t.writer (fun _reason ->
(* XXX: We need to allow lower numbered streams to complete before
* shutting down. *)
shutdown_rw t);
Expand Down
26 changes: 13 additions & 13 deletions lib/h2.mli
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@

(** H2 is a high-performance, memory-efficient, and scalable HTTP/2
implementation for OCaml. It is based on the concepts introduced http/af,
and therefore uses the Angstrom and Faraday libraries to implement the
parsing and serialization layers of the HTTP/2 standard. It preserves
the same API as httpun wherever possible.
and therefore uses the Angstrom and Faraday libraries to implement the
parsing and serialization layers of the HTTP/2 standard. It preserves the
same API as httpun wherever possible.
Not unlike httpun, the user should be familiar with HTTP, and the basic
principles of memory management and vectorized IO in order to use this
library. *)
Not unlike httpun, the user should be familiar with HTTP, and the basic
principles of memory management and vectorized IO in order to use this
library. *)

(** {2 Basic HTTP Types} *)

Expand All @@ -65,10 +65,10 @@ module Method : module type of Httpun_types.Method
See {{:https://tools.ietf.org/html/rfc7231#section-6} RFC7231§6} for more
details.
This module is a strict superset of [Httpun_types.Status]. Even though the HTTP/2
specification removes support for the [Switching_protocols] status code, h2
keeps it for the sake of higher level interaction between OCaml libraries
that support both HTTP/1 and HTTP/2.
This module is a strict superset of [Httpun_types.Status]. Even though the
HTTP/2 specification removes support for the [Switching_protocols] status
code, h2 keeps it for the sake of higher level interaction between OCaml
libraries that support both HTTP/1 and HTTP/2.
See {{:https://tools.ietf.org/html/rfc7540#section-8.1.1} RFC7540§8.1.1} for
more details. *)
Expand Down Expand Up @@ -373,7 +373,7 @@ module Body : sig
the next opportunity without performing a copy. [bs] should not be
modified until a subsequent call to {!flush} has successfully completed. *)

val flush : t -> (unit -> unit) -> unit
val flush : t -> ([ `Written | `Closed ] -> unit) -> unit
(** [flush t f] makes all bytes in [t] available for writing to the awaiting
output channel. Once those bytes have reached that output channel, [f]
will be called.
Expand Down Expand Up @@ -449,8 +449,8 @@ module Response : sig
-> Status.t
-> t
(** [create ?headers status] creates an HTTP response with the given
parameters. Unlike the [Response] type in httpun, h2 does not define a
way for responses to carry reason phrases or protocol version.
parameters. Unlike the [Response] type in httpun, h2 does not define a way
for responses to carry reason phrases or protocol version.
See
{{:https://tools.ietf.org/html/rfc7540#section-8.1.2.4} RFC7540§8.1.2.4}
Expand Down
8 changes: 5 additions & 3 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ let unsafe_respond_with_data (t : t) response data =
* reserved (local): [...] In this state, only the following transitions
* are possible: The endpoint can send a HEADERS frame. This causes the
* stream to open in a "half-closed (remote)" state. *)
Writer.flush t.writer (fun () ->
Writer.flush t.writer (fun _reason ->
(* TODO(anmonteiro): different if closed? *)
t.state <- Active (HalfClosed request_info, stream))
| Closed _ -> assert false

Expand Down Expand Up @@ -268,7 +269,8 @@ let unsafe_respond_with_streaming (t : t) ~flush_headers_immediately response =
* reserved (local): [...] In this state, only the following transitions
* are possible: The endpoint can send a HEADERS frame. This causes the
* stream to open in a "half-closed (remote)" state. *)
Writer.flush t.writer (fun () ->
Writer.flush t.writer (fun _reason ->
(* TODO(anmonteiro): different if closed? *)
t.state <- Active (HalfClosed request_info, stream));
response_body
| Closed _ -> assert false
Expand Down Expand Up @@ -444,7 +446,7 @@ let close_stream (t : t) =
* flag). *)
Stream.reset_stream t Error_code.NoError
| Active (HalfClosed _, _) ->
Writer.flush t.writer (fun () -> Stream.finish_stream t Finished)
Writer.flush t.writer (fun _reason -> Stream.finish_stream t Finished)
| _ -> assert false)
| Exn _ -> Stream.reset_stream t InternalError
| Other { code; _ } -> Stream.reset_stream t code
Expand Down
11 changes: 9 additions & 2 deletions lib/serialize.ml
Original file line number Diff line number Diff line change
Expand Up @@ -589,12 +589,19 @@ module Writer = struct
t.wakeup <- Optional_thunk.none;
Optional_thunk.call_if_some f

let flush t f = flush t.encoder f
let flush t f =
flush_with_reason t.encoder (fun reason ->
let result =
match reason with
| Nothing_pending | Shift -> `Written
| Drain -> `Closed
in
f result)

let unyield t =
(* Faraday doesn't have a function to take the serializer out of a yield
state. In the meantime, `flush` does it. *)
flush t (fun () -> ())
flush t (fun _reason -> ())

let yield t = Faraday.yield t.encoder
let close t = Faraday.close t.encoder
Expand Down
2 changes: 1 addition & 1 deletion lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ let report_error t = function
~debug_data
~last_stream_id:t.max_client_stream_id
error;
Writer.flush t.writer (fun () ->
Writer.flush t.writer (fun _reason ->
(* XXX: We need to allow lower numbered streams to complete before
* shutting down. *)
shutdown t);
Expand Down
4 changes: 3 additions & 1 deletion lib_test/test_h2_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,9 @@ module Client_connection_tests = struct
flush_request t;
Body.Writer.write_string request_body "hello";
flush_request t;
Body.Writer.flush request_body (fun () -> Body.Writer.close request_body);
Body.Writer.flush request_body (function
| `Closed -> assert false
| `Written -> Body.Writer.close request_body);
let frames, _lenv = flush_pending_writes t in
Alcotest.(check (list int))
"Writes empty DATA frame"
Expand Down
17 changes: 11 additions & 6 deletions lib_test/test_h2_server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,9 @@ module Server_connection_tests = struct
| [] -> Body.Writer.close body
| w :: ws ->
Body.Writer.write_string body w;
Body.Writer.flush body (fun () -> write ws)
Body.Writer.flush body (function
| `Closed -> assert false
| `Written -> write ws)
in
write writes

Expand Down Expand Up @@ -1159,8 +1161,9 @@ module Server_connection_tests = struct
let response = Response.create `OK in
let response_body = Reqd.respond_with_streaming reqd response in
Body.Writer.write_string response_body "hello";
Body.Writer.flush response_body (fun () ->
Body.Writer.close response_body)
Body.Writer.flush response_body (function
| `Closed -> assert false
| `Written -> Body.Writer.close response_body)
in
let t =
create_and_handle_preface
Expand Down Expand Up @@ -1211,9 +1214,11 @@ module Server_connection_tests = struct
(* Send the response for / *)
let response_body = Reqd.respond_with_streaming reqd response in
Body.Writer.write_string response_body "somedata";
Body.Writer.flush response_body (fun () ->
Reqd.schedule_trailers reqd Headers.(add empty "foo" "bar");
Body.Writer.close response_body)
Body.Writer.flush response_body (function
| `Closed -> assert false
| `Written ->
Reqd.schedule_trailers reqd Headers.(add empty "foo" "bar");
Body.Writer.close response_body)

let test_trailers () =
let t = create ~error_handler trailers_request_handler in
Expand Down
2 changes: 1 addition & 1 deletion spec/lwt_h2spec.ml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ let connection_handler : Unix.sockaddr -> Lwt_unix.file_descr -> unit Lwt.t =
ignore
@@ Reqd.try_with request_descriptor (fun () ->
Body.Writer.write_string response_body " data");
Body.Writer.flush response_body (fun () ->
Body.Writer.flush response_body (fun _reason ->
Body.Writer.close response_body))
| "/bigstring" ->
let res_body = "non-empty data." in
Expand Down

0 comments on commit 554575c

Please sign in to comment.