Dispatch module working

This commit is contained in:
Neale Pickett 2008-03-15 19:46:06 -06:00
parent 3eb2571769
commit 27d0137167
4 changed files with 89 additions and 77 deletions

View File

@ -21,6 +21,11 @@ section
tests.cmo: tests.cmo:
tests$(EXT_OBJ): tests$(EXT_OBJ):
dispatch_tests.cmx:
dispatch_tests.cmi:
dispatch_tests.cmo:
dispatch_tests$(EXT_OBJ):
OCamlProgram(tests, tests dispatch_tests dispatch chat irc command iobuf client channel) OCamlProgram(tests, tests dispatch_tests dispatch chat irc command iobuf client channel)
.PHONY: test .PHONY: test

View File

@ -1,92 +1,94 @@
type fd_handler = t -> event -> Unix.file_descr -> unit type event = Input | Priority | Output | Error | Hangup
type timeout_handler = t -> float -> unit type timer_handler = float -> unit
type timeout = (float * timeout_handler) type fd_handler = Unix.file_descr -> event list -> unit
module Timer =
Set.Make (struct
type t = (float * timer_handler)
let compare = compare
end)
module Fd_map = module Fd_map =
Map.Make (struct Map.Make (struct
type t = Unix.file_descr type t = Unix.file_descr
let compare = compare let compare = compare
) end)
module Timeout_set =
Set.Make (struct
type t = timeout
let compare = compare
)
type event = Input | Priority | Output | Error | Hangup
type t = { type t = {
e : Epoll.t; e : Epoll.t;
fds : (fd_handler * event list) Fd_map.t ref; fds : (fd_handler * event list) Fd_map.t ref;
numfds : int ref; numfds : int ref;
timeouts : timeout_handler Timeout_set.t ref; timers : Timer.t ref;
} }
let rec epoll_events_of_events
| [] -> []
| Input :: tl -> Epoll.In @ (epoll_events_of_events tl)
| Priority :: tl -> Epoll.Priority @ (epoll_events_of_events tl)
| Output :: tl -> Epoll.Output @ (epoll_events_of_events tl)
| Error :: tl -> Epoll.Error @ (epoll_events_of_events tl)
| Hangup :: tl -> Epoll.Hangup @ (epoll_events_of_events tl)
let rec events_of_epoll_events let to_epoll = function
| [] -> [] | Input -> Epoll.In
| Epoll.In :: tl -> Input @ (events_of_epoll_events) | Priority -> Epoll.Priority
| Epoll.Priority :: tl -> Priority @ (events_of_epoll_events) | Output -> Epoll.Out
| Epoll.Out :: tl -> Out @ (events_of_epoll_events) | Error -> Epoll.Error
| Epoll.Error :: tl -> Error @ (events_of_epoll_events) | Hangup -> Epoll.Hangup
| Epoll.Hangup :: tl -> Hangup @ (events_of_epoll_events)
let from_epoll = function
| Epoll.In -> Input
| Epoll.Priority -> Priority
| Epoll.Out -> Output
| Epoll.Error -> Error
| Epoll.Hangup -> Hangup
let rec epoll_events_of_events = List.map to_epoll
let rec events_of_epoll_events = List.map from_epoll
let create size = let create size =
{e = Epoll.create size; {e = Epoll.create size;
fds = ref Fd_map.empty; fds = ref Fd_map.empty;
numfds = ref 0; numfds = ref 0;
timeouts = ref Timeout_set.empty} timers = ref Timer.empty}
let destroy d = let destroy d =
Epoll.destroy d.e; Epoll.destroy d.e;
(* Explicitly unreference fds and timeouts, in case d sticks around *) (* Explicitly unreference fds and timers, in case d sticks around *)
d.fds := Fd_map.empty; d.fds := Fd_map.empty;
d.numfds := 0; d.numfds := 0;
d.timeouts := Timeout_set.empty d.timers := Timer.empty
let add d fd handler events = let add d fd handler events =
Epoll.ctl d.e Epoll.Add (fd, (epoll_events_of_events events)); Epoll.ctl d.e Epoll.Add (fd, (epoll_events_of_events events));
d.fds := Fd_map.add fd (handler, events) !d.fds; d.fds := Fd_map.add fd (handler, events) !(d.fds);
d.numfds := !d.numfds + 1 d.numfds := !(d.numfds) + 1
let modify d fd events = let modify d fd events =
Epoll.ctl d.e Epoll.Modify (fd, (epoll_events_of_events events)) Epoll.ctl d.e Epoll.Modify (fd, (epoll_events_of_events events))
let set_handler d fd handler = let set_handler d fd handler =
let (_, events) = Fd_map.find fd in let (_, events) = Fd_map.find fd !(d.fds) in
d.fds := Fd_map.add fd (handler, events) !d.fds d.fds := Fd_map.add fd (handler, events) !(d.fds)
let delete d fd = let delete d fd =
Epoll.ctl d.e Epoll.Delete (fd, []); Epoll.ctl d.e Epoll.Delete (fd, []);
d.fds := Fd_map.remove fd !d.fds; d.fds := Fd_map.remove fd !(d.fds);
d.numfds := !d.numfds - 1 d.numfds := !(d.numfds) - 1
let add_timeout d time handler = let add_timer d time handler =
d.timeouts := Timeout_set.add (time, handler) !d.timeouts d.timers := Timer.add (time, handler) !(d.timers)
let delete d time = let delete_timer d time =
let may_remain (time', _) = let may_remain (time', _) =
time' <> time time' <> time
in in
d.timeouts := Timeout_set.filter may_remain !d.timeouts d.timers := Timer.filter may_remain !(d.timers)
let rec dispatch_timeouts d now = let rec dispatch_timers d now =
let (time, handler) = Timeout_set.min_elt !d.timeouts in if (!(d.timers) != Timer.empty) then
let (time, handler) = Timer.min_elt !(d.timers) in
if now > time then if now > time then
() ()
else begin else begin
handler d time; handler time;
d.timeouts := Timeout_set.remove time !d.timeouts; d.timers := Timer.remove (time, handler) !(d.timers);
dispatch_timeouts d now dispatch_timers d now
end end
let rec dispatch_results d events_list = let rec dispatch_results d events_list =
@ -94,28 +96,29 @@ let rec dispatch_results d events_list =
| [] -> | [] ->
() ()
| (fd, epoll_events) :: tl -> | (fd, epoll_events) :: tl ->
let handler = Fd_map.find fd !d.fds in let handler, _ = Fd_map.find fd !(d.fds) in
let events = events_of_epoll_events in let events = events_of_epoll_events epoll_events in
handler d fd events; handler fd events;
dispatch_results d tl dispatch_results d tl
let once d = let once d =
let now = Unix.time () in let now = Unix.time () in
let timeout = let timeout =
try try
let (time, _) = Timeout_set.min_elt !d.timeouts in let (time, _) = Timer.min_elt !(d.timers) in
let timeout_s = max (time - now) 0.0 in let timeout_s = max (time -. now) 0.0 in
int_of_float (timeout_s *. 1000.0) int_of_float (timeout_s *. 1000.0)
with Not_found -> with Not_found ->
-1 -1
in in
let result = Epoll.wait d.e !d.nfds timeout in let result = Epoll.wait d.e !(d.numfds) timeout in
dispatch_timeouts d (Unix.time ()); dispatch_timers d (Unix.time ());
dispatch_results d result dispatch_results d result
let rec run d = let rec run d =
if ((!d.fds == Fd_map.empty) && if ((!(d.fds) == Fd_map.empty) &&
(!d.timeouts == Timeout_set.empty)) then (!(d.timers) == Timer.empty)) then
() ()
else begin else begin
once d; once d;

View File

@ -4,13 +4,13 @@ type t
type event = Input | Priority | Output | Error | Hangup type event = Input | Priority | Output | Error | Hangup
(** An event associated with a file descriptor *) (** An event associated with a file descriptor *)
type fd_handler = t -> Unix.file_descr -> event list -> unit type fd_handler = Unix.file_descr -> event list -> unit
(** [fd_handler d fd evt] handles an [event] generated by dispatcher [d] *) (** [fd_handler d fd evt] handles an [event] generated by dispatcher [d] *)
type timeout_handler = t -> float -> unit type timer_handler = float -> unit
(** [timeout_handler d when] is called at or after [when] by dispatcher [d] *) (** [timer_handler d when] is called at or after [when] by dispatcher [d] *)
val create : [size] -> t val create : int -> t
(** Create a new event dispatcher, preallocating [size] fd events. [size] (** Create a new event dispatcher, preallocating [size] fd events. [size]
is just a hint, the fd list will grow on demand. *) is just a hint, the fd list will grow on demand. *)
@ -21,7 +21,7 @@ val add : t -> Unix.file_descr -> fd_handler -> event list -> unit
(** [add d fd handler events] begins listening for [events] on file (** [add d fd handler events] begins listening for [events] on file
descriptor [fd], calling [handler] when an event occurs. *) descriptor [fd], calling [handler] when an event occurs. *)
val modify : t -> Unix.file_descr -> event_list -> unit val modify : t -> Unix.file_descr -> event list -> unit
(** [modify d fd events] changes the events to listen for on fd *) (** [modify d fd events] changes the events to listen for on fd *)
val set_handler : t -> Unix.file_descr -> fd_handler -> unit val set_handler : t -> Unix.file_descr -> fd_handler -> unit
@ -32,12 +32,12 @@ val delete : t -> Unix.file_descr -> unit
(** [delete d fd] stops [d] paying attention to events on file (** [delete d fd] stops [d] paying attention to events on file
descriptor [fd] *) descriptor [fd] *)
val add_timeout : t -> float -> timeout_handler -> unit val add_timer : t -> float -> timer_handler -> unit
(** [add_timeout d time handler] will cause dispatcher [d] to invoke (** [add_timer d time handler] will cause dispatcher [d] to invoke
[handler d time] at or after [time] *) [handler d time] at or after [time] *)
val delete_timeout : t -> float -> unit val delete_timer : t -> float -> unit
(** [delete_timeout d time] prevents dispatcher from invoking any (** [delete_timer d time] prevents dispatcher from invoking any
handlers added for [time] *) handlers added for [time] *)
val once : t -> unit val once : t -> unit
@ -46,4 +46,4 @@ val once : t -> unit
val run : t -> unit val run : t -> unit
(** [run d] will dispatch events from [d] until all file descriptors (** [run d] will dispatch events from [d] until all file descriptors
have been removed and all timeouts have run or been removed *) have been removed and all timers have run or been removed *)

View File

@ -6,7 +6,7 @@ let unit =
(fun () -> (fun () ->
let d = Dispatch.create 3 in let d = Dispatch.create 3 in
let a,b = Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in let a,b = Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in
let rec handle d fd events = let rec handle fd events =
match events with match events with
| [Dispatch.Input; Dispatch.Output] -> | [Dispatch.Input; Dispatch.Output] ->
let s = String.create 4096 in let s = String.create 4096 in
@ -23,5 +23,9 @@ let unit =
let s = String.create 4096 in let s = String.create 4096 in
assert_equal 2 (Unix.read a s 0 4096); assert_equal 2 (Unix.read a s 0 4096);
assert_equal "hi" (Str.string_before s 2); assert_equal "hi" (Str.string_before s 2);
Dispatch.destroy d;
Unix.close a;
Unix.close b
); );
] ]