From da4d22de5b0af78805492fc6907be88858484bba Mon Sep 17 00:00:00 2001 From: Neale Pickett Date: Sun, 8 Feb 2009 22:26:27 -0700 Subject: [PATCH] Use select() instead of epoll()/poll() --- OMakefile | 3 - dispatch.ml | 117 +++++++------- dispatch.mli | 12 +- epoll.mli | 33 ---- epoll_wrapper.c | 415 ------------------------------------------------ iobuf.ml | 40 ++--- 6 files changed, 81 insertions(+), 539 deletions(-) delete mode 100644 epoll.mli delete mode 100644 epoll_wrapper.c diff --git a/OMakefile b/OMakefile index a27780a..50d5312 100644 --- a/OMakefile +++ b/OMakefile @@ -1,13 +1,10 @@ OCAMLPACKS[] = unix str -OCAML_CLIBS = ocamlepoll OCAMLCFLAGS += -g .DEFAULT: bot -StaticCLibrary(ocamlepoll, epoll_wrapper) - OCamlProgram(bot, bot irc command iobuf dispatch) section diff --git a/dispatch.ml b/dispatch.ml index 24c38f2..6dacc69 100644 --- a/dispatch.ml +++ b/dispatch.ml @@ -1,6 +1,6 @@ -type event = Input | Priority | Output | Error | Hangup +type event = Input | Output | Exception type timer_handler = float -> unit -type fd_handler = Unix.file_descr -> event list -> unit +type fd_handler = Unix.file_descr -> event -> unit module Timer = Set.Make (struct @@ -15,9 +15,10 @@ module Fd_map = end) type t = { - e : Epoll.t; - fds : (fd_handler * event list) Fd_map.t ref; - numfds : int ref; + read_fds : Unix.file_descr list ref; + write_fds : Unix.file_descr list ref; + except_fds : Unix.file_descr list ref; + handlers : fd_handler Fd_map.t ref; timers : Timer.t ref; } @@ -29,53 +30,52 @@ type t = { let timeout_fudge = 0.001 -let to_epoll = function - | Input -> Epoll.In - | Priority -> Epoll.Priority - | Output -> Epoll.Out - | Error -> Epoll.Error - | Hangup -> Epoll.Hangup - -let from_epoll = function - | Epoll.In -> Input - | Epoll.Priority -> Priority - | Epoll.Out -> Output - | Epoll.Error -> Error - | Epoll.Hangup -> Hangup - -let rec epoll_events_of_events = List.map to_epoll - -let rec events_of_epoll_events = List.map from_epoll - let create size = - {e = Epoll.create size; - fds = ref Fd_map.empty; - numfds = ref 0; + {read_fds = ref []; + write_fds = ref []; + except_fds = ref []; + handlers = ref Fd_map.empty; timers = ref Timer.empty} let destroy d = - Epoll.destroy d.e; (* Explicitly unreference fds and timers, in case d sticks around *) - d.fds := Fd_map.empty; - d.numfds := 0; + d.handlers := Fd_map.empty; d.timers := Timer.empty -let add d fd handler events = - Epoll.ctl d.e Epoll.Add (fd, (epoll_events_of_events events)); - d.fds := Fd_map.add fd (handler, events) !(d.fds); - d.numfds := !(d.numfds) + 1 +let get_fds d event = + match event with + | Input -> d.read_fds + | Output -> d.write_fds + | Exception -> d.except_fds let modify d fd events = - Epoll.ctl d.e Epoll.Modify (fd, (epoll_events_of_events events)) + let add_event event = + let l = get_fds d event in + let nl = (List.filter ((<>) fd) !l) in + if List.mem event events then + l := fd :: nl + else + l := nl + in + if Fd_map.mem fd !(d.handlers) then + List.iter add_event [Input; Output; Exception] + else + raise Not_found let set_handler d fd handler = - let (_, events) = Fd_map.find fd !(d.fds) in - d.fds := Fd_map.add fd (handler, events) !(d.fds) + d.handlers := Fd_map.add fd handler !(d.handlers) + +let add d fd handler events = + set_handler d fd handler; + modify d fd events let delete d fd = - Epoll.ctl d.e Epoll.Delete (fd, []); - d.fds := Fd_map.remove fd !(d.fds); - d.numfds := !(d.numfds) - 1 + let del_event event = + let l = get_fds d event in + l := (List.filter ((<>) fd) !l) + in + d.handlers := Fd_map.remove fd !(d.handlers); + List.iter del_event [Input; Output; Exception] let add_timer d handler time = d.timers := Timer.add (time, handler) !(d.timers) @@ -98,15 +98,19 @@ let rec dispatch_timers d now = dispatch_timers d now end -let rec dispatch_results d events_list = - match events_list with - | [] -> - () - | (fd, epoll_events) :: tl -> - let handler, _ = Fd_map.find fd !(d.fds) in - let events = events_of_epoll_events epoll_events in - handler fd events; - dispatch_results d tl +let rec dispatch_results d (read_ready, write_ready, except_ready) = + let rec dispatch event fd_list = + match fd_list with + | [] -> + () + | fd :: tl -> + let handler = Fd_map.find fd !(d.handlers) in + handler fd event; + dispatch event tl + in + dispatch Input read_ready; + dispatch Output write_ready; + dispatch Exception except_ready let once d = let now = Unix.gettimeofday () in @@ -118,19 +122,16 @@ let once d = with Not_found -> (-1.0) in - (if !(d.numfds) = 0 then - (* epoll()--and probably poll()--barfs if it has no file descriptors *) - ignore (Unix.select [] [] [] timeout) - else - (* poll() and epoll() wait *at most* timeout ms. If you have fds but they're not - doing anything, multiple calls to once may be required. This is lame. *) - let timeout_ms = int_of_float (timeout *. 1000.0) in - let result = Epoll.wait d.e !(d.numfds) timeout_ms in - dispatch_results d result); + (* select () waits *at most* timeout ms. If you have fds but they're +not + doing anything, multiple calls to once may be required. This is + lame. *) + let result = Unix.select !(d.read_fds) !(d.write_fds) !(d.except_fds) timeout in + dispatch_results d result; dispatch_timers d (Unix.gettimeofday ()) let rec run d = - if ((!(d.fds) == Fd_map.empty) && + if ((!(d.handlers) == Fd_map.empty) && (!(d.timers) == Timer.empty)) then () else begin diff --git a/dispatch.mli b/dispatch.mli index d4fdfda..2b1a2f0 100644 --- a/dispatch.mli +++ b/dispatch.mli @@ -1,18 +1,18 @@ type t (** The type of event dispatchers *) -type event = Input | Priority | Output | Error | Hangup +type event = Input | Output | Exception (** An event associated with a file descriptor *) -type fd_handler = Unix.file_descr -> event list -> unit -(** [fd_handler d fd evt] handles an [event] generated by dispatcher [d] *) +type fd_handler = Unix.file_descr -> event -> unit +(** [fd_handler fd evt] handles event [evt] from file descriptor [fd] *) type timer_handler = float -> unit -(** [timer_handler d when] is called at or after [when] by dispatcher [d] *) +(** [timer_handler d when] is called at or after [when] *) val create : int -> t -(** Create a new event dispatcher, preallocating [size] fd events. [size] - is just a hint, the fd list will grow on demand. *) +(** Create a new event dispatcher, preallocating [size] fd events. + [size] is just a hint, the fd list will grow on demand. *) val destroy : t -> unit (** Destroy an event dispatcher *) diff --git a/epoll.mli b/epoll.mli deleted file mode 100644 index f8c3c6f..0000000 --- a/epoll.mli +++ /dev/null @@ -1,33 +0,0 @@ -(* - * OCaml epoll() interface - * Author: Neale Pickett - * Time-stamp: <2008-03-14 11:49:20 neale> - *) - -(** - * This module provides an interface to epoll() on Linux, or poll() on - * everything else. - *) - -type t - -type event = In | Priority | Out | Error | Hangup - (** Event types, mirroring poll() and epoll() event constants. *) - -type op = Add | Modify | Delete - (** Operations for ctl *) - -external create : int -> t = "ocaml_epoll_create" - (** Create a new poll structure *) - -external destroy : t -> unit = "ocaml_epoll_destroy" - (** Destroy a poll structure *) - -external ctl : t -> op -> (Unix.file_descr * event list) -> unit = "ocaml_epoll_ctl" - (** Add, Modify, or Delete an event list *) - -external wait : t -> int -> int -> (Unix.file_descr * event list) list = "ocaml_epoll_wait" -(** [wait e maxevents timeout] returns a list of at most [maxevents] - (file descriptor * event list)s that occurred before at least - [timeout] milliseconds elapsed. - *) diff --git a/epoll_wrapper.c b/epoll_wrapper.c deleted file mode 100644 index 137e9c1..0000000 --- a/epoll_wrapper.c +++ /dev/null @@ -1,415 +0,0 @@ -/** OCaml poll() interface - * - * Time-stamp: <2008-03-12 23:20:54 neale> - * - * Copyright (C) 2008 Neale Pickett - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - - -#include -#include -#include -#include -#include - -#ifdef __linux -# include -# define EPOLL -#else -# include -# undef EPOLL -# define EPOLLIN POLLIN -# define EPOLLPRI POLLPRI -# define EPOLLOUT POLLOUT -# define EPOLLERR POLLERR -# define EPOLLHUP POLLHUP -#endif - -#include -#include -#include - -#define puke() \ - { \ - char errstr[512]; \ - snprintf(errstr, sizeof(errstr), "%s: %s", __FUNCTION__, strerror(errno)); \ - caml_failwith(errstr); \ - } - -enum { - caml_POLLIN, - caml_POLLPRI, - caml_POLLOUT, - caml_POLLERR, - caml_POLLHUP -}; - -enum { - caml_POLL_ADD, - caml_POLL_MOD, - caml_POLL_DEL -}; - - -static int -list_length(value list) -{ - CAMLparam1(list); - CAMLlocal1(l); - - int len = 0; - - for (l = list; l != Val_int(0); l = Field(l, 1)) { - len += 1; - } - CAMLreturn(len); -} - -static int -int_of_event_list(value list) -{ - CAMLparam1(list); - CAMLlocal1(l); - - int acc = 0; - - for (l = list; l != Val_int(0); l = Field(l, 1)) { - switch (Int_val(Field(l, 0))) { - case caml_POLLIN: - acc |= EPOLLIN; - break; - case caml_POLLPRI: - acc |= EPOLLPRI; - break; - case caml_POLLOUT: - acc |= EPOLLOUT; - break; - case caml_POLLERR: - acc |= EPOLLERR; - break; - case caml_POLLHUP: - acc |= EPOLLHUP; - break; - } - } - CAMLreturn(acc); -} - -static value -cons(value item, value list) -{ - CAMLparam2(item, list); - CAMLlocal1(new); - - new = alloc_small(2, 0); - - Field(new, 0) = item; - Field(new, 1) = list; - CAMLreturn(new); -} - -static value -event_list_of_int(int events) -{ - CAMLparam0(); - CAMLlocal1(result); - - result = Val_int(0); - - /* Do these in reverse order since we're prepending to the list */ - if (events & EPOLLHUP) { - result = cons(Val_int(caml_POLLHUP), result); - } - if (events & EPOLLERR) { - result = cons(Val_int(caml_POLLERR), result); - } - if (events & EPOLLOUT) { - result = cons(Val_int(caml_POLLOUT), result); - } - if (events & EPOLLPRI) { - result = cons(Val_int(caml_POLLPRI), result); - } - if (events & EPOLLIN) { - result = cons(Val_int(caml_POLLIN), result); - } - CAMLreturn(result); -} - -#ifdef EPOLL -/******************************************************************************** - * - * epoll() - * - ********************************************************************************/ - -CAMLprim value -ocaml_epoll_create(value size) -{ - CAMLparam1(size); - CAMLlocal1(result); - - int ret; - - ret = epoll_create(Int_val(size)); - if (-1 == ret) { - puke(); - } - - result = Val_int(ret); - CAMLreturn(result); -} - -CAMLprim value -ocaml_epoll_destroy(value t) -{ - CAMLparam1(t); - - int ret; - - ret = close(Int_val(t)); - if (-1 == ret) { - puke(); - } - CAMLreturn(Val_unit); -} - -/* There are three reasons why I can't store a continuation or any other - * complex type in evt.data: - * - * 1. GC might blow them away - * 2. Heap compaction might move them - * 3. The kernel can remove events from its internal table without - * telling us (this is why there's no EPOLLNVAL) - * - * 1 and 2 can be solved by calling caml_register_global_root for each - * continuation, but this does not solve 3. So you get file - * descriptors. You can make a nice record type and wrap a Set around - * it. */ -CAMLprim value -ocaml_epoll_ctl(value t, value op, value what) -{ - CAMLparam3(t, op, what); - - int op_; - int fd; - struct epoll_event evt; - int ret; - - switch (Int_val(op)) { - case caml_POLL_ADD: - op_ = EPOLL_CTL_ADD; - break; - case caml_POLL_MOD: - op_ = EPOLL_CTL_MOD; - break; - case caml_POLL_DEL: - op_ = EPOLL_CTL_DEL; - break; - } - fd = Int_val(Field(what, 0)); - evt.events = int_of_event_list(Field(what, 1)); - evt.data.fd = fd; - - ret = epoll_ctl(Int_val(t), op_, fd, &evt); - if (-1 == ret) { - puke(); - } - - CAMLreturn(Val_unit); -} - -CAMLprim value -ocaml_epoll_wait(value t, value maxevents, value timeout) -{ - CAMLparam2(t, timeout); - CAMLlocal2(result, item); - - int maxevents_ = Int_val(maxevents); - struct epoll_event events[maxevents_]; - int i; - int ret; - - caml_enter_blocking_section(); - ret = epoll_wait(Int_val(t), events, maxevents_, Int_val(timeout)); - caml_leave_blocking_section(); - if (-1 == ret) { - puke(); - } - - result = Val_int(0); - for (i = 0; i < ret; i += 1) { - item = alloc_small(2,0); - Field(item, 0) = Val_int(events[i].data.fd); - Field(item, 1) = event_list_of_int(events[i].events); - result = cons(item, result); - } - - CAMLreturn(result); -} - -#else -/******************************************************************************** - * - * poll() compatibility - * - ********************************************************************************/ - -struct t { - int nfds; - int size; - struct pollfd *fds; -}; - -CAMLprim value -ocaml_epoll_create(value size) -{ - CAMLparam1(size); - CAMLlocal1(result); - - struct t *t_; - - t_ = (struct t *)malloc(sizeof(struct t)); - t_->nfds = 0; - t_->size = size; - t_->fds = (struct pollfd *)calloc(size, sizeof(struct pollfd)); - - result = caml_alloc(1, Abstract_tag); - Field(result, 0) = (value)t_; - - CAMLreturn(result); -} - -CAMLprim value -ocaml_epoll_destroy(value t) -{ - CAMLparam1(t); - - struct t *t_ = (struct t *)Field(t, 0); - - free(t_->fds); - free(t_); - CAMLreturn(Val_unit); -} - - -CAMLprim value -ocaml_epoll_ctl(value t, value op, value what) -{ - CAMLparam3(t, op, what); - - struct t *t_ = (struct t *)Field(t, 0); - int op_ = Int_val(op); - struct pollfd pfd; - int i; - - pfd.fd = Int_val(Field(what, 0)); - pfd.events = int_of_event_list(Field(what, 1)); - - /* Find this fd in our list */ - for (i == 0; i < t_->nfds; i += 1) { - struct pollfd *p = &(t_->fds[i]); - - if (p->fd == pfd.fd) { - break; - } - } - - switch (op_) { - case caml_POLL_ADD: - if (i < t_->nfds) { - caml_failwith("file descriptor already present"); - } - if (i >= t_->size) { - struct pollfd *newfds; - int newsize; - - newsize = t_->size + 20; - newfds = (struct pollfd *)realloc(t_, (sizeof(struct pollfd)) * newsize); - if (! newfds) { - caml_failwith("out of memory"); - } - t_->size = newsize; - t_->fds = newfds; - } - t_->nfds += 1; - t_->fds[i] = pfd; - break; - - case caml_POLL_MOD: - t_->fds[i] = pfd; - break; - - case caml_POLL_DEL: - if (i == t_->nfds) { - caml_failwith("file descriptor not present"); - } - t_->nfds -= 1; - for(; i < t_->nfds; i += 1) { - t_->fds[i] = t_->fds[i+1]; - } - break; - } -} - -#include "obj.h" - -CAMLprim value -ocaml_epoll_wait(value t, value maxevents, value timeout) -{ - CAMLparam3(t, maxevents, timeout); - CAMLlocal2(result, v); - - struct t *t_ = (struct t *)Field(t, 0); - int maxevents_ = Int_val(maxevents); - int i; - int j; - int ret; - - /* Call poll */ - caml_enter_blocking_section(); - ret = poll(t_->fds, t_->nfds, Int_val(timeout)); - caml_leave_blocking_section(); - if (-1 == ret) { - puke(); - } - - result = Val_int(0); - if (0 < ret) { - j = 0; - for (i = 0; ((i < t_->nfds) && (i < maxevents_)); i += 1) { - struct pollfd *p = &(t_->fds[i]); - - if (p->revents & POLLNVAL) { - /* Don't let j increment: remove this item */ - continue; - } else if (p->revents) { - v = alloc_small(2, 0); - Field(v, 0) = Val_int(p->fd); - Field(v, 1) = event_list_of_int(p->revents); - result = cons(v, result); - } - if (i != j) { - t_->fds[i] = t_->fds[j]; - } - j += 1; - } - t_->nfds = j; - } - CAMLreturn(result); -#error "I haven't yet figured out why this causes a segfault." -} - -#endif diff --git a/iobuf.ml b/iobuf.ml index ad5307b..b263071 100644 --- a/iobuf.ml +++ b/iobuf.ml @@ -54,21 +54,20 @@ let write iobuf cmd = if ((len = 0) && (!(iobuf.unsent) = "")) then Dispatch.modify iobuf.d iobuf.fd [Dispatch.Input; Dispatch.Output] -let rec handle_events iobuf fd events = - match events with - | [] -> - () - | Dispatch.Input :: tl -> +let handle_event iobuf fd event = + match event with + | Dispatch.Input -> let size = ibuf_max - !(iobuf.ibuf_len) in - let len = Unix.read fd iobuf.ibuf !(iobuf.ibuf_len) size in - iobuf.ibuf_len := !(iobuf.ibuf_len) + len; - handle_input iobuf; - if (!(iobuf.ibuf_len) = ibuf_max) then - (* No newline found, and the buffer is full *) - close iobuf "Input buffer overrun" - else - handle_events iobuf fd tl - | Dispatch.Output :: tl -> + (match Unix.read fd iobuf.ibuf !(iobuf.ibuf_len) size with + | 0 -> + close iobuf "Hangup" + | len -> + iobuf.ibuf_len := !(iobuf.ibuf_len) + len; + handle_input iobuf; + if (!(iobuf.ibuf_len) = ibuf_max) then + (* No newline found, and the buffer is full *) + close iobuf "Input buffer overrun") + | Dispatch.Output -> let buf = Buffer.create obuf_max in Buffer.add_string buf !(iobuf.unsent); while (((Buffer.length buf) < obuf_max) && @@ -82,25 +81,18 @@ let rec handle_events iobuf fd events = let n = Unix.single_write fd bufstr 0 buflen in if n < buflen then begin iobuf.unsent := Str.string_after bufstr n; - handle_events iobuf fd tl end else if Queue.is_empty iobuf.outq then if !(iobuf.alive) then begin (* We're out of data to send *) Dispatch.modify iobuf.d fd [Dispatch.Input]; - handle_events iobuf fd tl end else begin (* Close dead connection after all output has despooled *) Dispatch.delete iobuf.d iobuf.fd; Unix.close iobuf.fd end - | Dispatch.Priority :: tl -> + | Dispatch.Exception -> let s = String.create 4096 in - ignore (Unix.recv fd s 0 4096 [Unix.MSG_OOB]); - handle_events iobuf fd tl - | Dispatch.Error :: tl -> - close iobuf "Error" - | Dispatch.Hangup :: tl -> - close iobuf "Hangup" + ignore (Unix.recv fd s 0 4096 [Unix.MSG_OOB]) let bind iobuf handle_command handle_error = iobuf.handle_command := handle_command; @@ -117,5 +109,5 @@ let create d fd name handle_command handle_error = handle_command = ref handle_command; handle_error = ref handle_error; alive = ref true} in - Dispatch.add d fd (handle_events iobuf) [Dispatch.Input]; + Dispatch.add d fd (handle_event iobuf) [Dispatch.Input]; iobuf