diff --git a/CHANGES.md b/CHANGES.md index 6e79f1d..92b1d33 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,13 @@ +Unreleased +-------------- + +- h2: surface (body) write failures through `flush` + ([#247](https://github.com/anmonteiro/ocaml-h2/pull/247)) + - `Body.Writer.flush` now takes a callback of the type + ``([ `Written | ` Closed] -> unit)``, informing the caller whether the + previous writes have been written or whether the output channel was + closed. + 0.12.0 2024-06-23 -------------- diff --git a/lib/body.ml b/lib/body.ml index af071c3..d786629 100644 --- a/lib/body.ml +++ b/lib/body.ml @@ -125,28 +125,41 @@ module Writer = struct let ready_to_write t = Serialize.Writer.wakeup t.writer let write_char t c = - Faraday.write_char t.faraday c; + if not (Faraday.is_closed t.faraday) then Faraday.write_char t.faraday c; ready_to_write t let write_string t ?off ?len s = - Faraday.write_string ?off ?len t.faraday s; + if not (Faraday.is_closed t.faraday) + then Faraday.write_string ?off ?len t.faraday s; ready_to_write t let write_bigstring t ?off ?len b = - Faraday.write_bigstring ?off ?len t.faraday b; + if not (Faraday.is_closed t.faraday) + then Faraday.write_bigstring ?off ?len t.faraday b; ready_to_write t let schedule_bigstring t ?off ?len (b : Bigstringaf.t) = - Faraday.schedule_bigstring ?off ?len t.faraday b; + if not (Faraday.is_closed t.faraday) + then Faraday.schedule_bigstring ?off ?len t.faraday b; ready_to_write t let flush t kontinue = - Faraday.flush t.faraday kontinue; - ready_to_write t + if Serialize.Writer.is_closed t.writer + then kontinue `Closed + else ( + Faraday.flush_with_reason t.faraday (function + | Drain -> kontinue `Closed + | Nothing_pending | Shift -> kontinue `Written); + ready_to_write t) let is_closed t = Faraday.is_closed t.faraday let has_pending_output t = Faraday.has_pending_output t.faraday + let close_and_drain t = + Faraday.close t.faraday; + (* Resolve all pending flushes *) + ignore (Faraday.drain t.faraday : int) + let close t = Serialize.Writer.unyield t.writer; Faraday.close t.faraday; @@ -156,18 +169,25 @@ module Writer = struct let transfer_to_writer t writer ~max_frame_size ~max_bytes stream_id = let faraday = t.faraday in - match Faraday.operation faraday with - | `Yield | `Close -> 0 - | `Writev iovecs -> - let buffered = t.buffered_bytes in - let iovecs = Httpun_types.IOVec.shiftv iovecs !buffered in - let lengthv = Httpun_types.IOVec.lengthv iovecs in - let writev_len = if max_bytes < lengthv then max_bytes else lengthv in - buffered := !buffered + writev_len; - let frame_info = Writer.make_frame_info ~max_frame_size stream_id in - Writer.schedule_iovecs writer frame_info ~len:writev_len iovecs; - Writer.flush writer (fun () -> - Faraday.shift faraday writev_len; - buffered := !buffered - writev_len); - writev_len + if Serialize.Writer.is_closed t.writer + then ( + close_and_drain t; + 0) + else + match Faraday.operation faraday with + | `Yield | `Close -> 0 + | `Writev iovecs -> + let buffered = t.buffered_bytes in + let iovecs = Httpun_types.IOVec.shiftv iovecs !buffered in + let lengthv = Httpun_types.IOVec.lengthv iovecs in + let writev_len = if max_bytes < lengthv then max_bytes else lengthv in + buffered := !buffered + writev_len; + let frame_info = Writer.make_frame_info ~max_frame_size stream_id in + Writer.schedule_iovecs writer frame_info ~len:writev_len iovecs; + Writer.flush t.writer (function + | `Closed -> close_and_drain t + | `Written -> + Faraday.shift faraday writev_len; + buffered := !buffered - writev_len); + writev_len end diff --git a/lib/client_connection.ml b/lib/client_connection.ml index 2fa9cf9..b6963a2 100644 --- a/lib/client_connection.ml +++ b/lib/client_connection.ml @@ -135,7 +135,7 @@ let report_error = t.did_send_go_away <- true; if error <> Error_code.NoError then t.error_handler (`Protocol_error (error, data)); - Writer.flush t.writer (fun () -> + Writer.flush t.writer (fun _reason -> (* XXX: We need to allow lower numbered streams to complete before * shutting down. *) shutdown_rw t); diff --git a/lib/h2.mli b/lib/h2.mli index d2c79e6..5674c6e 100644 --- a/lib/h2.mli +++ b/lib/h2.mli @@ -34,13 +34,13 @@ (** H2 is a high-performance, memory-efficient, and scalable HTTP/2 implementation for OCaml. It is based on the concepts introduced http/af, - and therefore uses the Angstrom and Faraday libraries to implement the - parsing and serialization layers of the HTTP/2 standard. It preserves - the same API as httpun wherever possible. + and therefore uses the Angstrom and Faraday libraries to implement the + parsing and serialization layers of the HTTP/2 standard. It preserves the + same API as httpun wherever possible. - Not unlike httpun, the user should be familiar with HTTP, and the basic - principles of memory management and vectorized IO in order to use this - library. *) + Not unlike httpun, the user should be familiar with HTTP, and the basic + principles of memory management and vectorized IO in order to use this + library. *) (** {2 Basic HTTP Types} *) @@ -65,10 +65,10 @@ module Method : module type of Httpun_types.Method See {{:https://tools.ietf.org/html/rfc7231#section-6} RFC7231§6} for more details. - This module is a strict superset of [Httpun_types.Status]. Even though the HTTP/2 - specification removes support for the [Switching_protocols] status code, h2 - keeps it for the sake of higher level interaction between OCaml libraries - that support both HTTP/1 and HTTP/2. + This module is a strict superset of [Httpun_types.Status]. Even though the + HTTP/2 specification removes support for the [Switching_protocols] status + code, h2 keeps it for the sake of higher level interaction between OCaml + libraries that support both HTTP/1 and HTTP/2. See {{:https://tools.ietf.org/html/rfc7540#section-8.1.1} RFC7540§8.1.1} for more details. *) @@ -373,7 +373,7 @@ module Body : sig the next opportunity without performing a copy. [bs] should not be modified until a subsequent call to {!flush} has successfully completed. *) - val flush : t -> (unit -> unit) -> unit + val flush : t -> ([ `Written | `Closed ] -> unit) -> unit (** [flush t f] makes all bytes in [t] available for writing to the awaiting output channel. Once those bytes have reached that output channel, [f] will be called. @@ -449,8 +449,8 @@ module Response : sig -> Status.t -> t (** [create ?headers status] creates an HTTP response with the given - parameters. Unlike the [Response] type in httpun, h2 does not define a - way for responses to carry reason phrases or protocol version. + parameters. Unlike the [Response] type in httpun, h2 does not define a way + for responses to carry reason phrases or protocol version. See {{:https://tools.ietf.org/html/rfc7540#section-8.1.2.4} RFC7540§8.1.2.4} diff --git a/lib/reqd.ml b/lib/reqd.ml index 21ab47e..b50c308 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -215,7 +215,8 @@ let unsafe_respond_with_data (t : t) response data = * reserved (local): [...] In this state, only the following transitions * are possible: The endpoint can send a HEADERS frame. This causes the * stream to open in a "half-closed (remote)" state. *) - Writer.flush t.writer (fun () -> + Writer.flush t.writer (fun _reason -> + (* TODO(anmonteiro): different if closed? *) t.state <- Active (HalfClosed request_info, stream)) | Closed _ -> assert false @@ -268,7 +269,8 @@ let unsafe_respond_with_streaming (t : t) ~flush_headers_immediately response = * reserved (local): [...] In this state, only the following transitions * are possible: The endpoint can send a HEADERS frame. This causes the * stream to open in a "half-closed (remote)" state. *) - Writer.flush t.writer (fun () -> + Writer.flush t.writer (fun _reason -> + (* TODO(anmonteiro): different if closed? *) t.state <- Active (HalfClosed request_info, stream)); response_body | Closed _ -> assert false @@ -444,7 +446,7 @@ let close_stream (t : t) = * flag). *) Stream.reset_stream t Error_code.NoError | Active (HalfClosed _, _) -> - Writer.flush t.writer (fun () -> Stream.finish_stream t Finished) + Writer.flush t.writer (fun _reason -> Stream.finish_stream t Finished) | _ -> assert false) | Exn _ -> Stream.reset_stream t InternalError | Other { code; _ } -> Stream.reset_stream t code diff --git a/lib/serialize.ml b/lib/serialize.ml index e015341..979014e 100644 --- a/lib/serialize.ml +++ b/lib/serialize.ml @@ -589,12 +589,19 @@ module Writer = struct t.wakeup <- Optional_thunk.none; Optional_thunk.call_if_some f - let flush t f = flush t.encoder f + let flush t f = + flush_with_reason t.encoder (fun reason -> + let result = + match reason with + | Nothing_pending | Shift -> `Written + | Drain -> `Closed + in + f result) let unyield t = (* Faraday doesn't have a function to take the serializer out of a yield state. In the meantime, `flush` does it. *) - flush t (fun () -> ()) + flush t (fun _reason -> ()) let yield t = Faraday.yield t.encoder let close t = Faraday.close t.encoder diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 8164cd9..2ec972f 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -129,7 +129,7 @@ let report_error t = function ~debug_data ~last_stream_id:t.max_client_stream_id error; - Writer.flush t.writer (fun () -> + Writer.flush t.writer (fun _reason -> (* XXX: We need to allow lower numbered streams to complete before * shutting down. *) shutdown t); diff --git a/lib_test/test_h2_client.ml b/lib_test/test_h2_client.ml index 3f9feb0..700e71a 100644 --- a/lib_test/test_h2_client.ml +++ b/lib_test/test_h2_client.ml @@ -1251,7 +1251,9 @@ module Client_connection_tests = struct flush_request t; Body.Writer.write_string request_body "hello"; flush_request t; - Body.Writer.flush request_body (fun () -> Body.Writer.close request_body); + Body.Writer.flush request_body (function + | `Closed -> assert false + | `Written -> Body.Writer.close request_body); let frames, _lenv = flush_pending_writes t in Alcotest.(check (list int)) "Writes empty DATA frame" diff --git a/lib_test/test_h2_server.ml b/lib_test/test_h2_server.ml index 7fe2f1c..f18cafa 100644 --- a/lib_test/test_h2_server.ml +++ b/lib_test/test_h2_server.ml @@ -886,7 +886,9 @@ module Server_connection_tests = struct | [] -> Body.Writer.close body | w :: ws -> Body.Writer.write_string body w; - Body.Writer.flush body (fun () -> write ws) + Body.Writer.flush body (function + | `Closed -> assert false + | `Written -> write ws) in write writes @@ -1159,8 +1161,9 @@ module Server_connection_tests = struct let response = Response.create `OK in let response_body = Reqd.respond_with_streaming reqd response in Body.Writer.write_string response_body "hello"; - Body.Writer.flush response_body (fun () -> - Body.Writer.close response_body) + Body.Writer.flush response_body (function + | `Closed -> assert false + | `Written -> Body.Writer.close response_body) in let t = create_and_handle_preface @@ -1211,9 +1214,11 @@ module Server_connection_tests = struct (* Send the response for / *) let response_body = Reqd.respond_with_streaming reqd response in Body.Writer.write_string response_body "somedata"; - Body.Writer.flush response_body (fun () -> - Reqd.schedule_trailers reqd Headers.(add empty "foo" "bar"); - Body.Writer.close response_body) + Body.Writer.flush response_body (function + | `Closed -> assert false + | `Written -> + Reqd.schedule_trailers reqd Headers.(add empty "foo" "bar"); + Body.Writer.close response_body) let test_trailers () = let t = create ~error_handler trailers_request_handler in diff --git a/spec/lwt_h2spec.ml b/spec/lwt_h2spec.ml index 413dd6d..733f2d8 100644 --- a/spec/lwt_h2spec.ml +++ b/spec/lwt_h2spec.ml @@ -53,7 +53,7 @@ let connection_handler : Unix.sockaddr -> Lwt_unix.file_descr -> unit Lwt.t = ignore @@ Reqd.try_with request_descriptor (fun () -> Body.Writer.write_string response_body " data"); - Body.Writer.flush response_body (fun () -> + Body.Writer.flush response_body (fun _reason -> Body.Writer.close response_body)) | "/bigstring" -> let res_body = "non-empty data." in