diff --git a/OMakefile b/OMakefile index 6b8f0c4..5b9e05d 100644 --- a/OMakefile +++ b/OMakefile @@ -3,7 +3,7 @@ OCAMLPACKS[] = equeue pcre -.DEFAULT: ircd +.DEFAULT: pgircd OCamlProgram(pgircd, pgircd ircd connection) @@ -19,4 +19,4 @@ section .PHONY: clean clean: - rm $(filter-out %.pem tls.c, $(filter-proper-targets $(ls R, .))) + rm $(filter-proper-targets $(ls R, .)) diff --git a/chat.ml b/chat.ml index 743cace..b530a13 100644 --- a/chat.ml +++ b/chat.ml @@ -39,22 +39,16 @@ let read_fd fd = class chat_handler chatscript (ues : unix_event_system) fd = object (self) - inherit Connection.connection ues fd + inherit Connection.bare_connection ~input_timeout:0.1 ~output_timeout:0.1 ues fd val mutable script = chatscript val inbuf = Buffer.create 4096 initializer self#run_script (); - self#pulse (Send "") () - - - method pulse hd () = - if (List.hd script = hd) then - raise (Chat_timeout hd) - else - ues#once g 2.0 (self#pulse (List.hd script)) + method handle_timeout op = + raise (Chat_timeout (List.hd script)) method run_script () = match script with @@ -110,11 +104,11 @@ let chat script proc = Unixqueue.run ues with | Chat_match (got, expected) -> - raise (Failure ("Chat_match; got " ^ + raise (Failure ("Not matched: got " ^ (string_of_chat_event got) ^ ", expected " ^ (string_of_chat_event expected))) | Chat_timeout evt -> - raise (Failure ("Chat_timeout waiting for " ^ + raise (Failure ("Timeout waiting for " ^ (string_of_chat_event evt))) diff --git a/connection.ml b/connection.ml index 8f2a3f5..3db2b3f 100644 --- a/connection.ml +++ b/connection.ml @@ -2,36 +2,15 @@ open Unixqueue exception Buffer_overrun -(** Generic equeue connection class. - - Input is line-buffered: handle_data is only called once a complete - line has been read. If the line is longer than the size of the - input queue, you get an Input_buffer_overrun exception. - - Output could be further memory-optimized by instead storing a list - of strings, which would have a nice memory utilization if you're - frequently sending the same string out to multiple connections (like - with a chat server). However, I don't care that much. - - You can inherit this and define appropriate [handle_*] methods. - A [write] method is provided for your convenience. - - *) -class connection +(** Generic equeue connection class. *) +class virtual connection (ues : unix_event_system) - ?(input_max = 1024) - ?(output_max = 1024) + ?(input_timeout = -.1.0) fd = object (self) val g = ues#new_group () val mutable debug = false - val obuf = String.create output_max - val mutable obuf_len = 0 - - val input_timeout = -.1.0 - val output_timeout = -.1.0 - initializer ues#add_handler g self#handle_event; ues#add_resource g (Wait_in fd, input_timeout) @@ -43,15 +22,6 @@ object (self) if debug then print_endline msg - method write data = - let data_len = String.length data in - if (data_len + obuf_len > output_max) then - raise Buffer_overrun; - String.blit data 0 obuf obuf_len data_len; - obuf_len <- obuf_len + data_len; - ues#add_resource g (Wait_out fd, output_timeout) - - method handle_event ues esys e = match e with | Input_arrived (g, fd) -> @@ -67,37 +37,9 @@ object (self) | Extra exn -> self#handle_extra exn - method output_ready fd = - let size = obuf_len in - let n = Unix.single_write fd obuf 0 size in - obuf_len <- obuf_len - n; - if (obuf_len = 0) then - (* Don't check for output readiness anymore *) - begin - ues#remove_resource g (Wait_out fd) - end - else - (* Put unwritten output back into the output queue *) - begin - String.blit obuf n obuf 0 (obuf_len) - end + method virtual output_ready : Unix.file_descr -> unit - method input_ready fd = - let data = String.create input_max in - let len = Unix.read fd data 0 input_max in - if (len > 0) then - self#handle_input (String.sub data 0 len) - else - begin - self#handle_close (); - Unix.close fd; - ues#clear g; - end - - - method handle_input data = - self#log ("<-- [" ^ (String.escaped data) ^ "]"); - raise Equeue.Reject + method virtual input_ready : Unix.file_descr -> unit method handle_oob fd = self#log "Unhandled OOB"; @@ -116,22 +58,124 @@ object (self) raise Equeue.Reject method handle_close () = - self#log "Closed"; - () + self#log "Closed" end -class line_connection + +(** Bare connection for reading and writing. + + You can inherit this and define appropriate [handle_*] methods. + A [write] method is provided for your convenience. + +*) +class bare_connection (ues : unix_event_system) + ?(input_timeout = -.1.0) + ?(output_timeout = -.1.0) ?(input_max = 1024) ?(output_max = 1024) fd = object (self) - inherit connection ues ~input_max ~output_max fd + inherit connection ues ~input_timeout fd - val ibuf = String.create input_max + + val obuf = String.create output_max + val mutable obuf_len = 0 + + method write data = + let data_len = String.length data in + if (data_len + obuf_len > output_max) then + raise Buffer_overrun; + String.blit data 0 obuf obuf_len data_len; + obuf_len <- obuf_len + data_len; + ues#add_resource g (Wait_out fd, output_timeout) + + method output_ready fd = + let size = obuf_len in + let n = Unix.single_write fd obuf 0 size in + obuf_len <- obuf_len - n; + if (obuf_len = 0) then + (* Don't check for output readiness anymore *) + begin + ues#remove_resource g (Wait_out fd) + end + else + (* Put unwritten output back into the output queue *) + begin + String.blit obuf n obuf 0 (obuf_len) + end + + method input_ready fd = + let data = String.create input_max in + let len = Unix.read fd data 0 input_max in + if (len > 0) then + self#handle_input (String.sub data 0 len) + else + begin + self#handle_close (); + Unix.close fd; + ues#clear g; + end + + method handle_input data = + self#log ("<-- [" ^ (String.escaped data) ^ "]") + + +end + + +(** Write s to fd, returning any unwritten data. *) +let write fd s = + let sl = String.length s in + let n = Unix.single_write fd s 0 sl in + (String.sub s n (sl - n)) + +(** Buffered connection class. + + Input is split by newlines and sent to [handle_line]. + + Output is done with [write]. Send a list of words to be joined by a + space. This is intended to make one-to-many communications more + memory-efficient: the common strings need not be copied to all + recipients. +*) +class virtual buffered_connection + (ues : unix_event_system) + ?(output_timeout = -.1.0) + ?(ibuf_max = 4096) + ?(max_outq = 50) + ?(max_unsent = 4096) + fd = +object (self) + inherit connection ues fd + + (* This allocates a string of length ibuf_max for each connection. + That could add up. *) + val mutable ibuf = String.create ibuf_max val mutable ibuf_len = 0 + val mutable unsent = "" + val mutable outq = Queue.create () + + method output_ready fd = + (* This could be better optimized, I'm sure. *) + match (unsent, Queue.is_empty outq) with + | ("", true) -> + ues#remove_resource g (Wait_out fd) + | ("", false) -> + let s = (String.concat " " (Queue.pop outq)) ^ "\n" in + unsent <- write fd s; + if (unsent = "") then + self#output_ready fd + | (s, _) -> + unsent <- write fd s; + if (unsent = "") then + self#output_ready fd + + + method virtual handle_line : string -> unit + (** Split ibuf on newline, feeding each split into self#handle_input. Does not send the trailing newline. You can add it back if you want. @@ -151,16 +195,15 @@ object (self) self#split_handle_input () method input_ready fd = - let size = input_max - ibuf_len in + let size = ibuf_max - ibuf_len in let len = Unix.read fd ibuf ibuf_len size in if (len > 0) then begin ibuf_len <- ibuf_len + len; - prerr_endline ("ibuf_len" ^ (string_of_int ibuf_len)); try self#split_handle_input () with Not_found -> - if (ibuf_len = output_max) then + if (ibuf_len = ibuf_max) then (* No newline found, and the buffer is full *) raise Buffer_overrun; end @@ -171,14 +214,17 @@ object (self) ues#clear g; end - method handle_input data = - raise (Failure "handle_input should never be called for line_connection objects") - - method handle_line line = - self#log ("<-- " ^ (String.escaped line)) - + method write line = + if (Queue.length outq) >= max_outq then + raise (Failure "Maximum output queue length exceeded") + else + begin + Queue.add line outq; + ues#add_resource g (Wait_out fd, output_timeout) + end end + (** Establish a server on the given address. [connection_handler] will be called with the file descriptor of diff --git a/ircd.ml b/ircd.ml index b75a2b8..83427fb 100644 --- a/ircd.ml +++ b/ircd.ml @@ -1,22 +1,35 @@ open Unixqueue -class ircd_connection (ues : unix_event_system) fd = +class ircd_connection + (ues : unix_event_system) + ?(output_timeout = -.1.0) + ?(ibuf_max = 4096) + ?(max_outq = 50) + ?(max_unsent = 4096) + fd = object (self) - inherit Connection.line_connection ues fd + inherit Connection.buffered_connection + ues + ~output_timeout + ~ibuf_max + ~max_outq + ~max_unsent + fd method handle_line line = let parts = Pcre.split ~pat:" " line in match parts with | ["NICK"; nick] -> self#log ("Set nickname to " ^ nick); - self#write ":testserver.test NOTICE nick :*** Hi there.\n"; - self#write "PING :12345\n"; + self#write [":testserver.test"; "NOTICE"; nick; ":*** Hi there."]; + self#write ["PING"; ":12345"]; | _ -> self#log ("Unknown: " ^ line) + method die reason = + self#log ("Dying: " ^ reason) end - let main () = let ues = new unix_event_system () in let handle_connection fd = diff --git a/tests.ml b/tests.ml index 701b600..47a4c6c 100644 --- a/tests.ml +++ b/tests.ml @@ -29,22 +29,22 @@ let normal_tests = (do_chat (login_script @ [ - Send ":testserver.test 001 nick :Welcome to the test script\n"; - Send ":testserver.test 002 nick :Your host is testserver.test\n"; - Send ":testserver.test 003 nick :This server is timeless\n"; - Send ":testserver.test 004 nick testserver.test testscript DGabcdfg bilmnopst bkloveI\n"; - Send ":testserver.test 005 nick CALLERID CASEMAPPING=rfc1459 KICKLEN=160 MODES=4 WHATEVER=4 WHO=1 CARES=3 :are supported by this server\n"; - Send ":testserver.test 043 00XAAAAL6 :your unique ID\n"; - Send ":testserver.test 251 nick :There are 14 users and 4 invisible on 1 servers\n"; - Send ":testserver.test 252 nick 1 :IRC Operators online\n"; - Send ":testserver.test 254 4 :channels formed\n"; - Send ":testserver.test 255 nick :I have 17 clients and 0 servers\n"; - Send ":testserver.test 265 nick :Current local users: 17 Max: 25\n"; - Send ":testserver.test 266 nick :Current global users: 17 Max: 25\n"; - Send ":testserver.test 250 nick :Highest connection count: 25 (25 clients) (430 connections received)\n"; - Send ":testserver.test 375 nick :- xirc.lanl.gov Message of the Day -\n"; - Send ":testserver.test 372 nick :- This is ircd-hybrid MOTD replace it with something better\n"; - Send ":testserver.test 376 nick :End of /MOTD command.\n"; + Recv ":testserver.test 001 nick :Welcome to the test script\n"; + Recv ":testserver.test 002 nick :Your host is testserver.test\n"; + Recv ":testserver.test 003 nick :This server is timeless\n"; + Recv ":testserver.test 004 nick testserver.test testscript DGabcdfg bilmnopst bkloveI\n"; + Recv ":testserver.test 005 nick CALLERID CASEMAPPING=rfc1459 KICKLEN=160 MODES=4 WHATEVER=4 WHO=1 CARES=3 :are supported by this server\n"; + Recv ":testserver.test 043 00XAAAAL6 :your unique ID\n"; + Recv ":testserver.test 251 nick :There are 14 users and 4 invisible on 1 servers\n"; + Recv ":testserver.test 252 nick 1 :IRC Operators online\n"; + Recv ":testserver.test 254 4 :channels formed\n"; + Recv ":testserver.test 255 nick :I have 17 clients and 0 servers\n"; + Recv ":testserver.test 265 nick :Current local users: 17 Max: 25\n"; + Recv ":testserver.test 266 nick :Current global users: 17 Max: 25\n"; + Recv ":testserver.test 250 nick :Highest connection count: 25 (25 clients) (430 connections received)\n"; + Recv ":testserver.test 375 nick :- xirc.lanl.gov Message of the Day -\n"; + Recv ":testserver.test 372 nick :- This is ircd-hybrid MOTD replace it with something better\n"; + Recv ":testserver.test 376 nick :End of /MOTD command.\n"; ] )); ]