Skip to content

Commit

Permalink
cohttp-eio: Allow streaming responses from handler
Browse files Browse the repository at this point in the history
This changes the response type to `writer -> unit`. This allows
handlers to write the response inside the function, rather than
returning a request to cohttp to write it later. That's useful because
it allows e.g. streaming from an open file and then closing it
afterwards.

Partial application means that code using `respond_string` etc will
continue to work as before.

This also exposes a more polymorphic version of the `respond` function
that accepts sub-types of `Flow.source`, so that callers don't need to
cast the body.
  • Loading branch information
talex5 committed Mar 14, 2024
1 parent e48cbaf commit 34f881c
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -1,5 +1,6 @@
## Unreleased

- cohttp-eio: Make server response type abstract and allow streaming in cohttp-eio (talex5 #1024)
- cohttp-eio: Improve error handling in example server (talex5 #1023)
- cohttp-eio: Don't blow up `Server.callback` on client disconnections. (mefyl #1015)
- http: Fix assertion in `Source.to_string_trim` when `pos <> 0` (mefyl #1017)
Expand Down
2 changes: 1 addition & 1 deletion cohttp-eio/examples/dune
@@ -1,5 +1,5 @@
(executables
(names server1 client1 docker_client client_timeout client_tls)
(names server1 server2 client1 docker_client client_timeout client_tls)
(libraries
cohttp-eio
eio_main
Expand Down
37 changes: 37 additions & 0 deletions cohttp-eio/examples/server2.ml
@@ -0,0 +1,37 @@
let () = Logs.set_reporter (Logs_fmt.reporter ())
and () = Logs.Src.set_level Cohttp_eio.src (Some Debug)

let ( / ) = Eio.Path.( / )

(* To stream a file, we take the extra [writer] argument explicitly.
This means that we stream the response while the function is still
running and the file is still open. *)
let handler dir _socket request _body writer =
let path =
Http.Request.resource request
|> String.split_on_char '/'
|> List.filter (( <> ) "")
|> String.concat "/"
in
let path = if path = "" then "index.html" else path in
Eio.Path.with_open_in (dir / path) @@ fun flow ->
Cohttp_eio.Server.respond () ~status:`OK
~headers:(Http.Header.of_list [ ("content-type", "text/html") ])
~body:flow writer

let log_warning ex = Logs.warn (fun f -> f "%a" Eio.Exn.pp ex)

let () =
let port = ref 8080 in
Arg.parse
[ ("-p", Arg.Set_int port, " Listening port number(8080 by default)") ]
ignore "An HTTP/1.1 server";
Eio_main.run @@ fun env ->
Eio.Switch.run @@ fun sw ->
(* Restrict to current directory: *)
let htdocs = Eio.Stdenv.cwd env in
let socket =
Eio.Net.listen env#net ~sw ~backlog:128 ~reuse_addr:true
(`Tcp (Eio.Net.Ipaddr.V4.loopback, !port))
and server = Cohttp_eio.Server.make ~callback:(handler htdocs) () in
Cohttp_eio.Server.run socket server ~on_error:log_warning
47 changes: 22 additions & 25 deletions cohttp-eio/src/server.ml
Expand Up @@ -3,26 +3,29 @@ module IO = Io.IO

type body = Body.t
type conn = IO.conn * Cohttp.Connection.t [@@warning "-3"]
type response = Http.Response.t * Body.t
type writer = IO.oc
type response = writer -> unit

type response_action =
[ `Expert of Http.Response.t * (IO.ic -> IO.oc -> unit)
| `Response of Http.Response.t * body ]

(* type handler =
* sw:Eio.Switch.t ->
* Eio.Net.Sockaddr.stream ->
* Http.Request.t ->
* Eio.Flow.source ->
* Http.Response.t * Eio.Flow.source *)
| `Response of response ]

type t = {
conn_closed : conn -> unit;
handler : conn -> Http.Request.t -> body -> response_action;
handler : conn -> Http.Request.t -> body -> IO.ic -> IO.oc -> unit;
}

let make_response_action ?(conn_closed = fun _ -> ()) ~callback () =
{ conn_closed; handler = callback }
{
conn_closed;
handler =
(fun conn request body ic oc ->
match callback conn request body with
| `Expert (response, handler) ->
Io.Response.write_header response oc;
handler ic oc
| `Response fn -> fn oc);
}
let make_expert ?conn_closed ~callback () =
make_response_action ?conn_closed
Expand All @@ -31,12 +34,11 @@ let make_expert ?conn_closed ~callback () =
`Expert expert)
()
let make ?conn_closed ~callback () =
make_response_action ?conn_closed
~callback:(fun conn request body ->
let response = callback conn request body in
`Response response)
()
let make ?(conn_closed = fun _ -> ()) ~callback () =
{
conn_closed;
handler = (fun conn request body _ic oc -> callback conn request body oc);
}
let read input =
match Io.Request.read input with
Expand Down Expand Up @@ -88,9 +90,9 @@ let write output (response : Cohttp.Response.t) body =
in
Eio.Buf_write.flush output
let respond ?headers ?flush ~status ~body () =
let respond ?headers ?flush ~status ~body () oc =
let response = Cohttp.Response.make ?headers ?flush ~status () in
(response, body)
write oc response body
let respond_string ?headers ?flush ~status ~body () =
respond ?headers ?flush ~status ~body:(Body.of_string body) ()
Expand All @@ -117,12 +119,7 @@ let callback { conn_closed; handler } ((_, peer_address) as conn) input output =
(Body.of_string e)
| `Ok (request, body) ->
let () =
try
match handler (conn, id) request body with
| `Response (response, body) -> write output response body
| `Expert (response, handler) ->
let () = Io.Response.write_header response output in
handler input output
try handler (conn, id) request body input output
with Eio.Io (Eio.Net.E (Connection_reset _), _) ->
Logs.info (fun m ->
m "%a: connection reset" Eio.Net.Sockaddr.pp peer_address)
Expand Down
12 changes: 11 additions & 1 deletion cohttp-eio/src/server.mli
@@ -1,8 +1,18 @@
type writer

include
Cohttp.Generic.Server.S
with module IO = Io.IO
and type body = Body.t
and type response = Http.Response.t * Body.t
and type response = writer -> unit

val respond :
?headers:Http.Header.t ->
?flush:bool ->
status:Http.Status.t ->
body:_ Eio.Flow.source ->
unit ->
response IO.t

val run :
?max_connections:int ->
Expand Down
4 changes: 1 addition & 3 deletions cohttp-eio/tests/test.ml
@@ -1,5 +1,3 @@
open Eio.Std

let () =
Logs.set_level ~all:true @@ Some Logs.Debug;
Logs.set_reporter (Logs_fmt.reporter ())
Expand All @@ -13,7 +11,7 @@ let handler _conn request body =
Eio_mock.Flow.on_read body
[ `Return "Hello"; `Yield_then (`Return "World") ]
in
Cohttp_eio.Server.respond ~status:`OK ~body:(body :> Eio.Flow.source_ty r) ()
Cohttp_eio.Server.respond ~status:`OK ~body ()
| "/post" -> Cohttp_eio.Server.respond ~status:`OK ~body ()
| _ -> Cohttp_eio.Server.respond_string ~status:`Not_found ~body:"" ()

Expand Down

0 comments on commit 34f881c

Please sign in to comment.