Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make server response type abstract and allow streaming in cohttp-eio #1024

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
@@ -1,5 +1,6 @@
## Unreleased

- cohttp-eio: Make server response type abstract and allow streaming in cohttp-eio (talex5 #1024)
talex5 marked this conversation as resolved.
Show resolved Hide resolved
- cohttp-eio: Improve error handling in example server (talex5 #1023)
- cohttp-eio: Don't blow up `Server.callback` on client disconnections. (mefyl #1015)
- http: Fix assertion in `Source.to_string_trim` when `pos <> 0` (mefyl #1017)
Expand Down
2 changes: 1 addition & 1 deletion cohttp-eio/examples/dune
@@ -1,5 +1,5 @@
(executables
(names server1 client1 docker_client client_timeout client_tls)
(names server1 server2 client1 docker_client client_timeout client_tls)
(libraries
cohttp-eio
eio_main
Expand Down
14 changes: 7 additions & 7 deletions cohttp-eio/examples/server1.ml
Expand Up @@ -33,14 +33,14 @@ and () = Logs.Src.set_level Cohttp_eio.src (Some Debug)

let handler _socket request _body =
match Http.Request.resource request with
| "/" -> (Http.Response.make (), Cohttp_eio.Body.of_string text)
| "/" -> Cohttp_eio.Server.respond_string ~status:`OK ~body:text ()
| "/html" ->
( Http.Response.make
~headers:(Http.Header.of_list [ ("content-type", "text/html") ])
(),
(* Use a plain flow to test chunked encoding *)
Eio.Flow.string_source text )
| _ -> (Http.Response.make ~status:`Not_found (), Cohttp_eio.Body.of_string "")
(* Use a plain flow to test chunked encoding *)
let body = Eio.Flow.string_source text in
Cohttp_eio.Server.respond () ~status:`OK
~headers:(Http.Header.of_list [ ("content-type", "text/html") ])
~body
| _ -> Cohttp_eio.Server.respond_string ~status:`Not_found ~body:"" ()

let log_warning ex = Logs.warn (fun f -> f "%a" Eio.Exn.pp ex)

Expand Down
37 changes: 37 additions & 0 deletions cohttp-eio/examples/server2.ml
@@ -0,0 +1,37 @@
let () = Logs.set_reporter (Logs_fmt.reporter ())
and () = Logs.Src.set_level Cohttp_eio.src (Some Debug)

let ( / ) = Eio.Path.( / )

(* To stream a file, we take the extra [writer] argument explicitly.
This means that we stream the response while the function is still
running and the file is still open. *)
let handler dir _socket request _body writer =
let path =
Http.Request.resource request
|> String.split_on_char '/'
|> List.filter (( <> ) "")
|> String.concat "/"
in
let path = if path = "" then "index.html" else path in
Eio.Path.with_open_in (dir / path) @@ fun flow ->
Cohttp_eio.Server.respond () ~status:`OK
~headers:(Http.Header.of_list [ ("content-type", "text/html") ])
~body:flow writer

let log_warning ex = Logs.warn (fun f -> f "%a" Eio.Exn.pp ex)

let () =
let port = ref 8080 in
Arg.parse
[ ("-p", Arg.Set_int port, " Listening port number(8080 by default)") ]
ignore "An HTTP/1.1 server";
Eio_main.run @@ fun env ->
Eio.Switch.run @@ fun sw ->
(* Restrict to current directory: *)
let htdocs = Eio.Stdenv.cwd env in
let socket =
Eio.Net.listen env#net ~sw ~backlog:128 ~reuse_addr:true
(`Tcp (Eio.Net.Ipaddr.V4.loopback, !port))
and server = Cohttp_eio.Server.make ~callback:(handler htdocs) () in
Cohttp_eio.Server.run socket server ~on_error:log_warning
60 changes: 30 additions & 30 deletions cohttp-eio/src/server.ml
Expand Up @@ -3,44 +3,42 @@ module IO = Io.IO

type body = Body.t
type conn = IO.conn * Cohttp.Connection.t [@@warning "-3"]
type writer = IO.oc
type response = writer -> unit

type response_action =
[ `Expert of Http.Response.t * (IO.ic -> IO.oc -> unit IO.t)
| `Response of Http.Response.t * body ]

(* type handler =
* sw:Eio.Switch.t ->
* Eio.Net.Sockaddr.stream ->
* Http.Request.t ->
* Eio.Flow.source ->
* Http.Response.t * Eio.Flow.source *)
[ `Expert of Http.Response.t * (IO.ic -> IO.oc -> unit)
| `Response of response ]

type t = {
conn_closed : conn -> unit;
handler : conn -> Http.Request.t -> body -> response_action IO.t;
handler : conn -> Http.Request.t -> body -> IO.ic -> IO.oc -> unit;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My only concern is that the change to old-style response was done to align the APIs, with this they APIs are diverging again (at least in the expectations and signatures)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, although not when using the normal response functions. So it's certainly possible to write code that works with both.

}

let make_response_action ?(conn_closed = fun _ -> ()) ~callback () =
{ conn_closed; handler = callback }
{
conn_closed;
handler =
(fun conn request body ic oc ->
match callback conn request body with
| `Expert (response, handler) ->
Io.Response.write_header response oc;
handler ic oc
| `Response fn -> fn oc);
}

let make_expert ?conn_closed ~callback () =
make_response_action ?conn_closed
~callback:(fun conn request body ->
IO.(callback conn request body >>= fun expert -> `Expert expert))
let expert = callback conn request body in
`Expert expert)
()

let make ?conn_closed ~callback () =
make_response_action ?conn_closed
~callback:(fun conn request body ->
IO.(callback conn request body >>= fun response -> `Response response))
()

let respond ?headers ?flush ~status ~body () =
let response = Cohttp.Response.make ?headers ?flush ~status () in
(response, body)

let respond_string ?headers ?flush ~status ~body () =
respond ?headers ?flush ~status ~body:(Body.of_string body) ()
let make ?(conn_closed = fun _ -> ()) ~callback () =
{
conn_closed;
handler = (fun conn request body _ic oc -> callback conn request body oc);
}

let read input =
match Io.Request.read input with
Expand Down Expand Up @@ -92,6 +90,13 @@ let write output (response : Cohttp.Response.t) body =
in
Eio.Buf_write.flush output

let respond ?headers ?flush ~status ~body () oc =
let response = Cohttp.Response.make ?headers ?flush ~status () in
write oc response body

let respond_string ?headers ?flush ~status ~body () =
respond ?headers ?flush ~status ~body:(Body.of_string body) ()

let callback { conn_closed; handler } ((_, peer_address) as conn) input output =
let id = (Cohttp.Connection.create () [@ocaml.warning "-3"]) in
let rec handle () =
Expand All @@ -114,12 +119,7 @@ let callback { conn_closed; handler } ((_, peer_address) as conn) input output =
(Body.of_string e)
| `Ok (request, body) ->
let () =
try
match handler (conn, id) request body with
| `Response (response, body) -> write output response body
| `Expert (response, handler) ->
let () = Io.Response.write_header response output in
handler input output
try handler (conn, id) request body input output
with Eio.Io (Eio.Net.E (Connection_reset _), _) ->
Logs.info (fun m ->
m "%a: connection reset" Eio.Net.Sockaddr.pp peer_address)
Expand Down
16 changes: 15 additions & 1 deletion cohttp-eio/src/server.mli
@@ -1,4 +1,18 @@
include Cohttp.Generic.Server.S with module IO = Io.IO and type body = Body.t
type writer

include
Cohttp.Generic.Server.S
with module IO = Io.IO
and type body = Body.t
and type response = writer -> unit

val respond :
?headers:Http.Header.t ->
?flush:bool ->
status:Http.Status.t ->
body:_ Eio.Flow.source ->
unit ->
response IO.t

val run :
?max_connections:int ->
Expand Down
10 changes: 4 additions & 6 deletions cohttp-eio/tests/test.ml
@@ -1,21 +1,19 @@
open Eio.Std

let () =
Logs.set_level ~all:true @@ Some Logs.Debug;
Logs.set_reporter (Logs_fmt.reporter ())

let handler _conn request body =
match Http.Request.resource request with
| "/" -> (Http.Response.make (), Cohttp_eio.Body.of_string "root")
| "/" -> Cohttp_eio.Server.respond_string ~status:`OK ~body:"root" ()
| "/stream" ->
let body = Eio_mock.Flow.make "streaming body" in
let () =
Eio_mock.Flow.on_read body
[ `Return "Hello"; `Yield_then (`Return "World") ]
in
(Http.Response.make (), (body :> Eio.Flow.source_ty r))
| "/post" -> (Http.Response.make (), body)
| _ -> (Http.Response.make ~status:`Not_found (), Cohttp_eio.Body.of_string "")
Cohttp_eio.Server.respond ~status:`OK ~body ()
| "/post" -> Cohttp_eio.Server.respond ~status:`OK ~body ()
| _ -> Cohttp_eio.Server.respond_string ~status:`Not_found ~body:"" ()

let () =
Eio_main.run @@ fun env ->
Expand Down
7 changes: 6 additions & 1 deletion cohttp-lwt/src/s.ml
Expand Up @@ -224,7 +224,12 @@ end
(** The [Server] module implements a pipelined HTTP/1.1 server. *)
module type Server = sig
module IO : IO
include Cohttp.Generic.Server.S with type body = Body.t and module IO := IO

include
Cohttp.Generic.Server.S
with type body = Body.t
and module IO := IO
and type response = Http.Response.t * Body.t

val resolve_local_file : docroot:string -> uri:Uri.t -> string
[@@deprecated "Please use Cohttp.Path.resolve_local_file. "]
Expand Down
1 change: 1 addition & 0 deletions cohttp-lwt/src/server.ml
Expand Up @@ -7,6 +7,7 @@ module Make (IO : S.IO) = struct
module Request = Make.Request (IO)
module Response = Make.Response (IO)

type response = Http.Response.t * Body.t
type body = Body.t

let src = Logs.Src.create "cohttp.lwt.server" ~doc:"Cohttp Lwt server module"
Expand Down
9 changes: 5 additions & 4 deletions cohttp/src/server.ml
Expand Up @@ -3,10 +3,11 @@ module type S = sig

type body
type conn = IO.conn * Connection.t [@@warning "-3"]
type response

type response_action =
[ `Expert of Http.Response.t * (IO.ic -> IO.oc -> unit IO.t)
| `Response of Http.Response.t * body ]
| `Response of response ]
(** A request handler can respond in two ways:

- Using [`Response], with a {!Response.t} and a {!body}.
Expand Down Expand Up @@ -38,7 +39,7 @@ module type S = sig

val make :
?conn_closed:(conn -> unit) ->
callback:(conn -> Http.Request.t -> body -> (Http.Response.t * body) IO.t) ->
callback:(conn -> Http.Request.t -> body -> response IO.t) ->
unit ->
t

Expand All @@ -48,7 +49,7 @@ module type S = sig
status:Http.Status.t ->
body:body ->
unit ->
(Http.Response.t * body) IO.t
response IO.t
(** [respond ?headers ?flush ~status ~body] will respond to an HTTP request
with the given [status] code and response [body]. If [flush] is true, then
every response chunk will be flushed to the network rather than being
Expand All @@ -64,7 +65,7 @@ module type S = sig
status:Http.Status.t ->
body:string ->
unit ->
(Http.Response.t * body) IO.t
response IO.t

val callback : t -> IO.conn -> IO.ic -> IO.oc -> unit IO.t
end