From eb07c7970068fd95dc5323a6d75f26d22485b4a7 Mon Sep 17 00:00:00 2001 From: Neale Pickett Date: Fri, 8 Feb 2008 18:11:49 -0700 Subject: [PATCH] Test 1 working, but now there are some obuf overrun problems --- chat.ml | 125 +++++++++++++++++++++----------------------------- connection.ml | 110 +++++++++++++++++++++++++++++--------------- ircd.ml | 16 +++++-- tests.ml | 20 ++++---- 4 files changed, 146 insertions(+), 125 deletions(-) diff --git a/chat.ml b/chat.ml index 5cc16fc..743cace 100644 --- a/chat.ml +++ b/chat.ml @@ -5,7 +5,7 @@ type chat_event = | Recv of string exception Chat_match of (chat_event * chat_event) -exception Chat_failure of string +exception Chat_timeout of chat_event let string_of_chat_event e = match e with @@ -14,7 +14,7 @@ let string_of_chat_event e = | Recv str -> ("Recv(\"" ^ (String.escaped str) ^ "\")") - (** Return true if str starts with substr *) +(** Return true if str starts with substr *) let startswith str substr = let l = String.length substr in if l > String.length str then @@ -39,83 +39,58 @@ let read_fd fd = class chat_handler chatscript (ues : unix_event_system) fd = object (self) + inherit Connection.connection ues fd + val mutable script = chatscript - val g = ues#new_group () + val inbuf = Buffer.create 4096 initializer - ues#add_handler g self#handler; - self#setup () + self#run_script (); + self#pulse (Send "") () - method setup () = + + method pulse hd () = + if (List.hd script = hd) then + raise (Chat_timeout hd) + else + ues#once g 2.0 (self#pulse (List.hd script)) + + + method run_script () = match script with | [] -> Unix.close fd; ues#clear g - | Send _ :: _ -> - ues#add_resource g (Wait_out fd, -.1.0); - begin - try - ues#remove_resource g (Wait_in fd) - with Not_found -> - () - end - | Recv _ :: _ -> - ues#add_resource g (Wait_in fd, -.1.0); - begin - try - ues#remove_resource g (Wait_out fd) - with Not_found -> - () - end - - - method handler ues' (esys : event Equeue.t) e = - assert (ues = ues'); - match e with - | Input_arrived (g, fd) -> - self#handle_input fd - | Output_readiness (g, fd) -> - self#handle_output fd - | _ -> - raise Equeue.Reject - - method handle_input fd = - let buf = read_fd fd in - match script with - | Recv str :: tl -> - if (buf = str) then - begin - script <- tl; - self#setup() - end - else if startswith buf str then - begin - script <- [Recv (string_after buf (String.length str))] @ tl; - self#setup() - end + | Send buf :: tl -> + self#write buf; + script <- tl; + self#run_script () + | Recv buf :: tl -> + let buf_len = String.length buf in + let inbuf_str = Buffer.contents inbuf in + if (Buffer.length inbuf >= buf_len) then + if startswith inbuf_str buf then + begin + script <- tl; + Buffer.clear inbuf; + Buffer.add_substring + inbuf + inbuf_str + buf_len + ((String.length inbuf_str) - buf_len); + self#run_script () + end + else + raise (Chat_match (Recv inbuf_str, + Recv buf)) else - raise (Chat_match ((Recv str), (Recv buf))) - | x :: tl -> - raise (Chat_match (x, (Recv buf))) - | [] -> - raise (Chat_match ((Recv ""), (Recv buf))) + () - method handle_output fd = - match script with - | Send str :: tl -> - let slen = String.length str in - let n = Unix.single_write fd str 0 slen in - if (n <> slen) then - script <- [Send (string_after str n)] @ tl - else - script <- tl; - self#setup() - | x :: tl -> - raise (Chat_match (x, (Send ""))) - | [] -> - raise (Chat_match ((Recv ""), (Send ""))) - + method handle_input data = + Buffer.add_string inbuf data; + self#run_script () + end @@ -133,9 +108,13 @@ let chat script proc = let _ = new chat_handler script ues b in try Unixqueue.run ues - with Chat_match (got, expected) -> - raise (Chat_failure ("Chat_match; got " ^ - (string_of_chat_event got) ^ - ", expected " ^ - (string_of_chat_event expected))) + with + | Chat_match (got, expected) -> + raise (Failure ("Chat_match; got " ^ + (string_of_chat_event got) ^ + ", expected " ^ + (string_of_chat_event expected))) + | Chat_timeout evt -> + raise (Failure ("Chat_timeout waiting for " ^ + (string_of_chat_event evt))) diff --git a/connection.ml b/connection.ml index cca8463..8f2a3f5 100644 --- a/connection.ml +++ b/connection.ml @@ -8,8 +8,14 @@ exception Buffer_overrun line has been read. If the line is longer than the size of the input queue, you get an Input_buffer_overrun exception. + Output could be further memory-optimized by instead storing a list + of strings, which would have a nice memory utilization if you're + frequently sending the same string out to multiple connections (like + with a chat server). However, I don't care that much. + You can inherit this and define appropriate [handle_*] methods. A [write] method is provided for your convenience. + *) class connection (ues : unix_event_system) @@ -17,25 +23,25 @@ class connection ?(output_max = 1024) fd = object (self) - val g = ues#new_group () val mutable debug = false val obuf = String.create output_max val mutable obuf_len = 0 - val ibuf = String.create input_max - val mutable ibuf_len = 0 + + val input_timeout = -.1.0 + val output_timeout = -.1.0 initializer ues#add_handler g self#handle_event; - ues#add_resource g (Wait_in fd, -.1.0) + ues#add_resource g (Wait_in fd, input_timeout) method debug v = debug <- v method log msg = if debug then - prerr_endline msg + print_endline msg method write data = let data_len = String.length data in @@ -43,7 +49,7 @@ object (self) raise Buffer_overrun; String.blit data 0 obuf obuf_len data_len; obuf_len <- obuf_len + data_len; - ues#add_resource g (Wait_out fd, -.1.0) + ues#add_resource g (Wait_out fd, output_timeout) method handle_event ues esys e = @@ -76,49 +82,22 @@ object (self) String.blit obuf n obuf 0 (obuf_len) end - (** Split ibuf on newline, feeding each split into self#handle_input. - - Does not send the trailing newline. You can add it back if you want. - *) - method split_handle_input () = - match ibuf with - | "" -> - () - | ibuf -> - let p = String.index ibuf '\n' in - let s = String.sub ibuf 0 p in - if p >= ibuf_len then - raise Not_found; - ibuf_len <- ibuf_len - (p + 1); - String.blit ibuf (p + 1) ibuf 0 ibuf_len; - self#handle_input s; - self#split_handle_input () - method input_ready fd = - let size = input_max - ibuf_len in - let len = Unix.read fd ibuf ibuf_len size in + let data = String.create input_max in + let len = Unix.read fd data 0 input_max in if (len > 0) then - begin - ibuf_len <- ibuf_len + len; - try - self#split_handle_input () - with Not_found -> - if (ibuf_len = output_max) then - (* No newline found, and the buffer is full *) - raise Buffer_overrun; - end + self#handle_input (String.sub data 0 len) else begin self#handle_close (); Unix.close fd; ues#clear g; end - - method handle_input data = self#log ("<-- [" ^ (String.escaped data) ^ "]"); + raise Equeue.Reject method handle_oob fd = self#log "Unhandled OOB"; @@ -142,6 +121,63 @@ object (self) end +class line_connection + (ues : unix_event_system) + ?(input_max = 1024) + ?(output_max = 1024) + fd = +object (self) + inherit connection ues ~input_max ~output_max fd + + val ibuf = String.create input_max + val mutable ibuf_len = 0 + + (** Split ibuf on newline, feeding each split into self#handle_input. + + Does not send the trailing newline. You can add it back if you want. + *) + method split_handle_input () = + match ibuf with + | "" -> + () + | ibuf -> + let p = String.index ibuf '\n' in + let s = String.sub ibuf 0 p in + if p >= ibuf_len then + raise Not_found; + ibuf_len <- ibuf_len - (p + 1); + String.blit ibuf (p + 1) ibuf 0 ibuf_len; + self#handle_line s; + self#split_handle_input () + + method input_ready fd = + let size = input_max - ibuf_len in + let len = Unix.read fd ibuf ibuf_len size in + if (len > 0) then + begin + ibuf_len <- ibuf_len + len; + prerr_endline ("ibuf_len" ^ (string_of_int ibuf_len)); + try + self#split_handle_input () + with Not_found -> + if (ibuf_len = output_max) then + (* No newline found, and the buffer is full *) + raise Buffer_overrun; + end + else + begin + self#handle_close (); + Unix.close fd; + ues#clear g; + end + + method handle_input data = + raise (Failure "handle_input should never be called for line_connection objects") + + method handle_line line = + self#log ("<-- " ^ (String.escaped line)) + +end (** Establish a server on the given address. diff --git a/ircd.ml b/ircd.ml index 44973cc..b75a2b8 100644 --- a/ircd.ml +++ b/ircd.ml @@ -2,7 +2,18 @@ open Unixqueue class ircd_connection (ues : unix_event_system) fd = object (self) - inherit Connection.connection ues fd + inherit Connection.line_connection ues fd + + method handle_line line = + let parts = Pcre.split ~pat:" " line in + match parts with + | ["NICK"; nick] -> + self#log ("Set nickname to " ^ nick); + self#write ":testserver.test NOTICE nick :*** Hi there.\n"; + self#write "PING :12345\n"; + | _ -> + self#log ("Unknown: " ^ line) + end @@ -19,6 +30,3 @@ let main () = (Unix.ADDR_INET (Unix.inet_addr_any, 7777)); ues#run () -let _ = - main () - diff --git a/tests.ml b/tests.ml index 3b333e5..701b600 100644 --- a/tests.ml +++ b/tests.ml @@ -3,21 +3,20 @@ open OUnit open Chat let do_chat script () = - let irc_instance ues fd = - let irc = new Irc.irc ues in - irc#set_fd fd "nick" "gecos"; + let ircd_instance ues fd = + let irc = new Ircd.ircd_connection ues fd in irc#debug true in - chat script irc_instance + chat script ircd_instance let normal_tests = let login_script = [ - Recv "USER nick +iw nick :gecos\n"; - Recv "NICK nick\n"; - Send ":testserver.test NOTICE nick :*** Hi there.\n"; - Send "PING :12345\n"; - Recv "PONG :12345\n"; + Send "USER nick +iw nick :gecos\n"; + Send "NICK nick\n"; + Recv ":testserver.test NOTICE nick :*** Hi there.\n"; + Recv "PING :12345\n"; + Send "PONG :12345\n"; ] in "Normal tests" >::: @@ -28,8 +27,7 @@ let normal_tests = "Full connection" >:: (do_chat - ([Send ":testserver.test NOTICE AUTH :*** Doing some pointless ident junk...\n"] @ - login_script @ + (login_script @ [ Send ":testserver.test 001 nick :Welcome to the test script\n"; Send ":testserver.test 002 nick :Your host is testserver.test\n";