Complete transition to epoll, all unit tests passing

This commit is contained in:
Neale Pickett 2008-03-18 15:27:03 -06:00
parent 9f058248ba
commit 667062bf93
9 changed files with 121 additions and 157 deletions

View File

@ -1,6 +1,4 @@
OCAMLPACKS[] = OCAMLPACKS[] =
equeue
pcre
str str
OCAML_CLIBS = ocamlepoll OCAML_CLIBS = ocamlepoll
OCAMLCFLAGS += -g OCAMLCFLAGS += -g
@ -26,7 +24,7 @@ section
dispatch_tests.cmo: dispatch_tests.cmo:
dispatch_tests$(EXT_OBJ): dispatch_tests$(EXT_OBJ):
OCamlProgram(tests, tests dispatch chat irc command iobuf client channel) OCamlProgram(tests, tests dispatch irc command iobuf client channel)
.PHONY: test .PHONY: test
test: tests test: tests

View File

@ -40,7 +40,7 @@ let reply cli num ?(args=[]) text =
([!(cli.nick)] @ args) ([!(cli.nick)] @ args)
(Some text)) (Some text))
let handle_close cli () = let handle_close cli message =
Hashtbl.remove by_nick !(cli.nick) Hashtbl.remove by_nick !(cli.nick)
let handle_command cli iobuf cmd = let handle_command cli iobuf cmd =
@ -161,22 +161,22 @@ let set_nick cli nick =
Hashtbl.replace by_nick nick cli; Hashtbl.replace by_nick nick cli;
cli.nick := nick cli.nick := nick
let rec handle_command_prereg (nick', username', realname', password') iobuf cmd = let rec handle_command_prereg (nick, username, realname, password) iobuf cmd =
(* Handle a command during the login phase *) (* Handle a command during the login phase *)
let acc = let acc =
match (Command.as_tuple cmd) with match (Command.as_tuple cmd) with
| (None, "PASS", [password], None) -> | (None, "PASS", [password'], None) ->
(nick', username', realname', Some password) (nick, username, realname, Some password')
| (None, "USER", [username; _; _], Some realname) -> | (None, "USER", [username'; _; _], Some realname') ->
(nick', Some username, Some (Irc.truncate realname 40), password') (nick, Some username', Some (Irc.truncate realname' 40), password)
| (None, "NICK", [nick], None) -> | (None, "NICK", [nick'], None) ->
(Some nick, username', realname', password') (Some nick', username, realname, password)
| _ -> | _ ->
Iobuf.write iobuf (Command.create Iobuf.write iobuf (Command.create
(Some !(Irc.name)) (Some !(Irc.name))
"451" ["*"] "451" ["*"]
(Some "Register first.")); (Some "Register first."));
(nick', username', realname', password') (nick, username, realname, password)
in in
let welcome cli = let welcome cli =
try try
@ -211,8 +211,8 @@ let rec handle_command_prereg (nick', username', realname', password') iobuf cmd
| _ -> | _ ->
Iobuf.rebind iobuf (handle_command_prereg acc) ignore Iobuf.rebind iobuf (handle_command_prereg acc) ignore
let handle_connection ues grp fd = let handle_connection d fd addr =
let command_handler = handle_command_prereg (None, None, None, None) in let command_handler = handle_command_prereg (None, None, None, None) in
let close_handler = ignore in let close_handler = ignore in
Iobuf.bind ues grp fd command_handler close_handler Iobuf.bind d fd command_handler close_handler

View File

@ -1,5 +1,5 @@
type t type t
val write : t -> Command.t -> unit val write : t -> Command.t -> unit
val handle_connection : Unixqueue.event_system -> Unixqueue.group -> Unix.file_descr -> unit val handle_connection : Dispatch.t -> Unix.file_descr -> Unix.sockaddr -> unit

View File

@ -39,7 +39,7 @@ let extract_word s =
let rec from_string line = let rec from_string line =
(* Very simple. Pull out words until you get one starting with ":". (* Very simple. Pull out words until you get one starting with ":".
The very first word might start with ":", that doesn't count The very first word might start with ":", that doesn't count
because it's the sender.. *) because it's the sender. *)
let rec loop sender acc line = let rec loop sender acc line =
let c = (if (line = "") then None else (Some line.[0])) in let c = (if (line = "") then None else (Some line.[0])) in
match (c, acc) with match (c, acc) with

View File

@ -1,8 +1,7 @@
(* ========================================== (* **************************************
* I/O buf stuff * IRC Command I/O buffers
*) * **************************************)
type t = {ues: Unixqueue.event_system; type t = {d: Dispatch.t;
grp: Unixqueue.group;
fd: Unix.file_descr; fd: Unix.file_descr;
outq: Command.t Queue.t; outq: Command.t Queue.t;
unsent: string ref; unsent: string ref;
@ -10,39 +9,24 @@ type t = {ues: Unixqueue.event_system;
ibuf_len: int ref; ibuf_len: int ref;
addr: string; addr: string;
command_handler: (t -> Command.t -> unit) ref; command_handler: (t -> Command.t -> unit) ref;
close_handler: (unit -> unit) ref} close_handler: (string -> unit) ref}
let ibuf_max = 4096 let ibuf_max = 4096
let max_outq = 50 let max_outq = 50
let obuf_max = 4096 let obuf_max = 4096
let by_file_descr = Hashtbl.create 25
let addr iobuf = iobuf.addr let addr iobuf = iobuf.addr
let write iobuf cmd = let write iobuf cmd =
let was_empty = Queue.is_empty iobuf.outq in let was_empty = Queue.is_empty iobuf.outq in
Queue.add cmd iobuf.outq; Queue.add cmd iobuf.outq;
if (was_empty && (!(iobuf.unsent) = "")) then if (was_empty && (!(iobuf.unsent) = "")) then
Unixqueue.add_resource Dispatch.modify iobuf.d iobuf.fd [Dispatch.Input; Dispatch.Output]
iobuf.ues iobuf.grp (Unixqueue.Wait_out iobuf.fd, -.1.0)
let close iobuf = let close iobuf message =
!(iobuf.close_handler) (); !(iobuf.close_handler) message;
Hashtbl.remove by_file_descr iobuf.fd; Dispatch.delete iobuf.d iobuf.fd;
Unix.close iobuf.fd; Unix.close iobuf.fd
Unixqueue.remove_resource iobuf.ues iobuf.grp (Unixqueue.Wait_in iobuf.fd);
try
Unixqueue.remove_resource iobuf.ues iobuf.grp (Unixqueue.Wait_out iobuf.fd);
with Not_found ->
()
let handle_close fd =
try
let iobuf = Hashtbl.find by_file_descr fd in
close iobuf
with Not_found ->
()
let crlf = Str.regexp "\r?\n" let crlf = Str.regexp "\r?\n"
@ -63,27 +47,22 @@ let handle_input iobuf =
in in
loop lines loop lines
let handle_event ues esys e = let rec handle_events iobuf fd events =
match e with match events with
| Unixqueue.Input_arrived (g, fd) -> | [] ->
let iobuf = Hashtbl.find by_file_descr fd in ()
| Dispatch.Input :: tl ->
let size = ibuf_max - !(iobuf.ibuf_len) in let size = ibuf_max - !(iobuf.ibuf_len) in
let len = Unix.read fd iobuf.ibuf !(iobuf.ibuf_len) size in let len = Unix.read fd iobuf.ibuf !(iobuf.ibuf_len) size in
if (len > 0) then iobuf.ibuf_len := !(iobuf.ibuf_len) + len;
begin handle_input iobuf;
iobuf.ibuf_len := !(iobuf.ibuf_len) + len; if (!(iobuf.ibuf_len) = ibuf_max) then
try (* No newline found, and the buffer is full *)
handle_input iobuf close iobuf "Input buffer overrun"
with Not_found -> else
if (!(iobuf.ibuf_len) = ibuf_max) then handle_events iobuf fd tl
(* No newline found, and the buffer is full *) | Dispatch.Output :: tl ->
raise (Failure "Buffer overrun");
end
else
close iobuf
| Unixqueue.Output_readiness (g, fd) ->
(* XXX: Could be optimized to try and fill the output buffer *) (* XXX: Could be optimized to try and fill the output buffer *)
let iobuf = Hashtbl.find by_file_descr fd in
let buf = let buf =
if (!(iobuf.unsent) = "") then if (!(iobuf.unsent) = "") then
let cmd = Queue.pop iobuf.outq in let cmd = Queue.pop iobuf.outq in
@ -96,17 +75,19 @@ let handle_event ues esys e =
if n < buflen then if n < buflen then
iobuf.unsent := Str.string_after buf n iobuf.unsent := Str.string_after buf n
else if Queue.is_empty iobuf.outq then else if Queue.is_empty iobuf.outq then
Unixqueue.remove_resource ues g (Unixqueue.Wait_out fd) Dispatch.modify iobuf.d fd [Dispatch.Input];
| Unixqueue.Out_of_band (g, fd) -> handle_events iobuf fd tl
print_endline "oob" | Dispatch.Priority :: tl ->
| Unixqueue.Timeout (g, op) -> let s = String.create 4096 in
print_endline "timeout" ignore (Unix.recv fd s 0 4096 [Unix.MSG_OOB]);
| Unixqueue.Signal -> handle_events iobuf fd tl
print_endline "signal" | Dispatch.Error :: tl ->
| Unixqueue.Extra exn -> close iobuf "Error"
print_endline "extra" | Dispatch.Hangup :: tl ->
close iobuf "Hangup"
let bind ues grp fd command_handler close_handler =
let bind d fd command_handler close_handler =
let (outq, unsent, ibuf, ibuf_len) = let (outq, unsent, ibuf, ibuf_len) =
(Queue.create (), ref "", String.create ibuf_max, ref 0) (Queue.create (), ref "", String.create ibuf_max, ref 0)
in in
@ -117,8 +98,7 @@ let bind ues grp fd command_handler close_handler =
| Unix.ADDR_INET (addr, port) -> | Unix.ADDR_INET (addr, port) ->
Unix.string_of_inet_addr addr Unix.string_of_inet_addr addr
in in
let iobuf = {ues = ues; let iobuf = {d = d;
grp = grp;
fd = fd; fd = fd;
outq = outq; outq = outq;
unsent = unsent; unsent = unsent;
@ -128,9 +108,7 @@ let bind ues grp fd command_handler close_handler =
command_handler = ref command_handler; command_handler = ref command_handler;
close_handler = ref close_handler} close_handler = ref close_handler}
in in
Hashtbl.replace by_file_descr fd iobuf; Dispatch.add d fd (handle_events iobuf) [Dispatch.Input]
Unixqueue.add_resource ues grp (Unixqueue.Wait_in fd, -.1.0);
Unixqueue.add_close_action ues grp (fd, handle_close)
let rebind t command_handler close_handler = let rebind t command_handler close_handler =
t.command_handler := command_handler; t.command_handler := command_handler;

View File

@ -3,7 +3,6 @@ type t
val addr : t -> string val addr : t -> string
val write : t -> Command.t -> unit val write : t -> Command.t -> unit
val bind : Unixqueue.event_system -> Unixqueue.group -> Unix.file_descr -> (t -> Command.t -> unit) -> (unit -> unit) -> unit val bind : Dispatch.t -> Unix.file_descr -> (t -> Command.t -> unit) -> (string -> unit) -> unit
val rebind: t -> (t -> Command.t -> unit) -> (unit -> unit) -> unit val rebind: t -> (t -> Command.t -> unit) -> (string -> unit) -> unit
val close: t -> unit val close: t -> string -> unit
val handle_event : Unixqueue.event_system -> Unixqueue.event Equeue.t -> Unixqueue.event -> unit

4
irc.ml
View File

@ -1,10 +1,6 @@
let name = ref "irc.test" let name = ref "irc.test"
let version = "0.1" let version = "0.1"
let newline_re = Pcre.regexp "\n\r?"
let argsep_re = Pcre.regexp " :"
let space_re = Pcre.regexp " "
let dbg msg a = let dbg msg a =
prerr_endline ("[" ^ msg ^ "]"); prerr_endline ("[" ^ msg ^ "]");
a a

34
ircd.ml
View File

@ -7,32 +7,34 @@ let dbg msg a =
[connection_handler] will be called with the file descriptor of [connection_handler] will be called with the file descriptor of
any new connections. any new connections.
*) *)
let establish_server ues connection_handler addr = let establish_server d connection_handler addr =
let g = Unixqueue.new_group ues in let rec handle_event fd events =
let handle_event ues esys e = match events with
match e with | [] ->
| Unixqueue.Input_arrived (g, fd) -> ()
| Dispatch.Input :: tl ->
let cli_fd, cli_addr = Unix.accept fd in let cli_fd, cli_addr = Unix.accept fd in
connection_handler cli_fd connection_handler cli_fd cli_addr;
| _ -> handle_event fd tl
raise Equeue.Reject | Dispatch.Hangup :: tl ->
Dispatch.delete d fd;
handle_event fd tl
| _ :: tl ->
handle_event fd tl
in in
let srv = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in let srv = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
Unix.bind srv addr; Unix.bind srv addr;
Unix.listen srv 50; Unix.listen srv 50;
Unix.setsockopt srv Unix.SO_REUSEADDR true; Unix.setsockopt srv Unix.SO_REUSEADDR true;
Unixqueue.add_handler ues g handle_event; Dispatch.add d fd handle_event [Dispatch.Input];
Unixqueue.add_resource ues g (Unixqueue.Wait_in srv, -.1.0)
let main () = let main () =
let ues = Unixqueue.create_unix_event_system () in let d = Dispatch.create 50 in
let g = Unixqueue.new_group ues in
Unixqueue.add_handler ues g Iobuf.handle_event;
establish_server establish_server
ues ues
Client.handle_connection (Client.handle_connection d)
(Unix.ADDR_INET (Unix.inet_addr_any, 7777)); (Unix.ADDR_INET (Unix.inet_addr_any, 6667));
ues#run () Dispatch.run d
let _ = let _ =
main () main ()

107
tests.ml
View File

@ -1,6 +1,4 @@
open Unixqueue
open OUnit open OUnit
open Irc
let dump x = let dump x =
Printf.ksprintf (fun str -> prerr_string str; flush stderr) x Printf.ksprintf (fun str -> prerr_string str; flush stderr) x
@ -88,29 +86,29 @@ let chat d fd s =
let ibuf = Buffer.create 4096 in let ibuf = Buffer.create 4096 in
let handle_timer _ = let handle_timer _ =
failwith (Printf.sprintf "fd=%d timeout waiting for %s" failwith (Printf.sprintf "fd=%d timeout waiting for %s"
(int_of_file_descr fd) (int_of_file_descr fd)
(string_of_chat_event (List.hd !script))) (string_of_chat_event (List.hd !script)))
in in
let nomatch got = let nomatch got =
failwith (Printf.sprintf "fd=%d expecting %s\n got %s" failwith (Printf.sprintf "fd=%d expecting %s\n got %s"
(int_of_file_descr fd) (int_of_file_descr fd)
(string_of_chat_event (List.hd !script)) (string_of_chat_event (List.hd !script))
(String.escaped got)) (String.escaped got))
in in
let rec run_script fd = let rec run_script fd =
match !script with match !script with
| [] -> | [] ->
if ((Buffer.length obuf) = 0) then begin if ((Buffer.length obuf) = 0) then begin
Dispatch.delete_timer d timer; Dispatch.delete_timer d timer;
(try (try
Dispatch.delete d fd Dispatch.delete d fd
with (Failure _) -> with (Failure _) ->
()); ());
Unix.close fd Unix.close fd
end end
| Send buf :: tl -> | Send buf :: tl ->
Buffer.add_string obuf buf; Buffer.add_string obuf buf;
Dispatch.modify d fd [Dispatch.Input; Dispatch.Output]; Dispatch.modify d fd [Dispatch.Input; Dispatch.Output];
script := tl; script := tl;
run_script fd run_script fd
| Recv buf :: tl -> | Recv buf :: tl ->
@ -127,8 +125,8 @@ let chat d fd s =
((String.length ibuf_str) - buf_len); ((String.length ibuf_str) - buf_len);
run_script fd run_script fd
end else end else
nomatch ibuf_str nomatch ibuf_str
end else end else
() ()
| Regex buf :: tl -> | Regex buf :: tl ->
let ibuf_str = Buffer.contents ibuf in let ibuf_str = Buffer.contents ibuf in
@ -145,7 +143,7 @@ let chat d fd s =
((String.length ibuf_str) - match_len); ((String.length ibuf_str) - match_len);
run_script fd run_script fd
else else
nomatch ibuf_str nomatch ibuf_str
else else
() ()
@ -159,25 +157,25 @@ let chat d fd s =
let n = Unix.read fd s 0 4096 in let n = Unix.read fd s 0 4096 in
Buffer.add_substring ibuf s 0 n; Buffer.add_substring ibuf s 0 n;
run_script fd; run_script fd;
handler fd tl handler fd tl
| Dispatch.Output :: tl -> | Dispatch.Output :: tl ->
begin begin
if ((Buffer.length obuf) = 0) then if ((Buffer.length obuf) = 0) then
Dispatch.modify d fd [Dispatch.Input] Dispatch.modify d fd [Dispatch.Input]
else else
let ostr = Buffer.contents obuf in let ostr = Buffer.contents obuf in
let olen = Buffer.length obuf in let olen = Buffer.length obuf in
let n = Unix.write fd ostr 0 olen in let n = Unix.write fd ostr 0 olen in
Buffer.clear obuf; Buffer.clear obuf;
Buffer.add_substring obuf ostr n (olen - n) Buffer.add_substring obuf ostr n (olen - n)
end; end;
handler fd tl handler fd tl
| Dispatch.Hangup :: tl -> | Dispatch.Hangup :: tl ->
(* Stop listening to this fd, it will always return Hangup *) (* Stop listening to this fd, it will always return Hangup *)
(try (try
Dispatch.delete d fd Dispatch.delete d fd
with (Failure _) -> with (Failure _) ->
()) ())
| _ -> | _ ->
failwith "Unexpected event" failwith "Unexpected event"
in in
@ -258,7 +256,7 @@ let unit_tests =
last_timer := time last_timer := time
in in
let s = String.create 4096 in let s = String.create 4096 in
assert_equal 8 (Unix.write a "dispatch" 0 8); assert_equal 8 (Unix.write a "dispatch" 0 8);
Dispatch.add d b handle [Dispatch.Input; Dispatch.Output]; Dispatch.add d b handle [Dispatch.Input; Dispatch.Output];
@ -275,15 +273,15 @@ let unit_tests =
Dispatch.once d; Dispatch.once d;
assert_equal ~printer:string_of_float 0.0 !last_timer; assert_equal ~printer:string_of_float 0.0 !last_timer;
Dispatch.modify d b [Dispatch.Input]; Dispatch.modify d b [Dispatch.Input];
Dispatch.once d; Dispatch.once d;
if (!last_timer = 0.0) then if (!last_timer = 0.0) then
(* Give it one chance *) (* Give it one chance *)
Dispatch.once d; Dispatch.once d;
assert_equal ~printer:string_of_float time !last_timer; assert_equal ~printer:string_of_float time !last_timer;
Dispatch.modify d b [Dispatch.Input; Dispatch.Output]; Dispatch.modify d b [Dispatch.Input; Dispatch.Output];
assert_equal 6 (Unix.write a "gnarly" 0 6); assert_equal 6 (Unix.write a "gnarly" 0 6);
Dispatch.once d; Dispatch.once d;
assert_equal (b, [Dispatch.Input; Dispatch.Output]) !last_event; assert_equal (b, [Dispatch.Input; Dispatch.Output]) !last_event;
assert_equal 6 (Unix.read b s 0 4096); assert_equal 6 (Unix.read b s 0 4096);
@ -335,7 +333,6 @@ let unit_tests =
); );
] ]
(*
let do_login nick = let do_login nick =
[ [
Send ("USER " ^ nick ^ " +iw " ^ nick ^ " :gecos\r\n"); Send ("USER " ^ nick ^ " +iw " ^ nick ^ " :gecos\r\n");
@ -378,12 +375,11 @@ let regression_tests =
Recv ":testserver.test 401 nick otherguy :No such nick/channel\r\n"; Recv ":testserver.test 401 nick otherguy :No such nick/channel\r\n";
] ]
in in
let g = Unixqueue.new_group ues in let d = Dispatch.create 2 in
let a,b = Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in let a,b = Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in
Unixqueue.add_handler ues g Iobuf.handle_event; Client.handle_connection d a (Unix.getpeername a);
Client.handle_connection ues g a; chat d b script;
ignore (new chat_handler script ues b); Dispatch.run d);
chat_run ues);
"Second connection" >:: "Second connection" >::
(fun () -> (fun () ->
@ -394,12 +390,11 @@ let regression_tests =
Recv ":testserver.test 303 otherguy :otherguy\r\n"; Recv ":testserver.test 303 otherguy :otherguy\r\n";
] ]
in in
let g = Unixqueue.new_group ues in let d = Dispatch.create 2 in
let a,b = Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in let a,b = Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in
Unixqueue.add_handler ues g Iobuf.handle_event; Client.handle_connection d a (Unix.getpeername a);
Client.handle_connection ues g a; chat d b script;
ignore (new chat_handler script ues b); Dispatch.run d);
chat_run ues);
"Simultaneous connections" >:: "Simultaneous connections" >::
(fun () -> (fun () ->
@ -421,20 +416,16 @@ let regression_tests =
Recv ":alice!alice@UDS PRIVMSG bob :Hi Bob!\r\n"; Recv ":alice!alice@UDS PRIVMSG bob :Hi Bob!\r\n";
] ]
in in
let g = Unixqueue.new_group ues in let d = Dispatch.create 4 in
let aa,ab = Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in let aa,ab = Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in
let ba,bb = Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in let ba,bb = Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in
Unixqueue.add_handler ues g Iobuf.handle_event; Client.handle_connection d aa (Unix.getpeername aa);
Client.handle_connection ues g aa; Client.handle_connection d ba (Unix.getpeername ba);
Client.handle_connection ues g ba; chat d ab script1;
ignore (new chat_handler script1 ues ab); chat d bb script2;
ignore (new chat_handler script2 ues bb); Dispatch.run d);
chat_run ues);
] ]
*)
let _ = let _ =
Irc.name := "testserver.test"; Irc.name := "testserver.test";
run_test_tt_main (TestList [unit_tests (*; regression_tests *)]) run_test_tt_main (TestList [unit_tests; regression_tests])