Use select() instead of epoll()/poll()

This commit is contained in:
Neale Pickett 2009-02-08 22:26:27 -07:00
parent 488fa6a04c
commit da4d22de5b
6 changed files with 81 additions and 539 deletions

View File

@ -1,13 +1,10 @@
OCAMLPACKS[] =
unix
str
OCAML_CLIBS = ocamlepoll
OCAMLCFLAGS += -g
.DEFAULT: bot
StaticCLibrary(ocamlepoll, epoll_wrapper)
OCamlProgram(bot, bot irc command iobuf dispatch)
section

View File

@ -1,6 +1,6 @@
type event = Input | Priority | Output | Error | Hangup
type event = Input | Output | Exception
type timer_handler = float -> unit
type fd_handler = Unix.file_descr -> event list -> unit
type fd_handler = Unix.file_descr -> event -> unit
module Timer =
Set.Make (struct
@ -15,9 +15,10 @@ module Fd_map =
end)
type t = {
e : Epoll.t;
fds : (fd_handler * event list) Fd_map.t ref;
numfds : int ref;
read_fds : Unix.file_descr list ref;
write_fds : Unix.file_descr list ref;
except_fds : Unix.file_descr list ref;
handlers : fd_handler Fd_map.t ref;
timers : Timer.t ref;
}
@ -29,53 +30,52 @@ type t = {
let timeout_fudge = 0.001
let to_epoll = function
| Input -> Epoll.In
| Priority -> Epoll.Priority
| Output -> Epoll.Out
| Error -> Epoll.Error
| Hangup -> Epoll.Hangup
let from_epoll = function
| Epoll.In -> Input
| Epoll.Priority -> Priority
| Epoll.Out -> Output
| Epoll.Error -> Error
| Epoll.Hangup -> Hangup
let rec epoll_events_of_events = List.map to_epoll
let rec events_of_epoll_events = List.map from_epoll
let create size =
{e = Epoll.create size;
fds = ref Fd_map.empty;
numfds = ref 0;
{read_fds = ref [];
write_fds = ref [];
except_fds = ref [];
handlers = ref Fd_map.empty;
timers = ref Timer.empty}
let destroy d =
Epoll.destroy d.e;
(* Explicitly unreference fds and timers, in case d sticks around *)
d.fds := Fd_map.empty;
d.numfds := 0;
d.handlers := Fd_map.empty;
d.timers := Timer.empty
let add d fd handler events =
Epoll.ctl d.e Epoll.Add (fd, (epoll_events_of_events events));
d.fds := Fd_map.add fd (handler, events) !(d.fds);
d.numfds := !(d.numfds) + 1
let get_fds d event =
match event with
| Input -> d.read_fds
| Output -> d.write_fds
| Exception -> d.except_fds
let modify d fd events =
Epoll.ctl d.e Epoll.Modify (fd, (epoll_events_of_events events))
let add_event event =
let l = get_fds d event in
let nl = (List.filter ((<>) fd) !l) in
if List.mem event events then
l := fd :: nl
else
l := nl
in
if Fd_map.mem fd !(d.handlers) then
List.iter add_event [Input; Output; Exception]
else
raise Not_found
let set_handler d fd handler =
let (_, events) = Fd_map.find fd !(d.fds) in
d.fds := Fd_map.add fd (handler, events) !(d.fds)
d.handlers := Fd_map.add fd handler !(d.handlers)
let add d fd handler events =
set_handler d fd handler;
modify d fd events
let delete d fd =
Epoll.ctl d.e Epoll.Delete (fd, []);
d.fds := Fd_map.remove fd !(d.fds);
d.numfds := !(d.numfds) - 1
let del_event event =
let l = get_fds d event in
l := (List.filter ((<>) fd) !l)
in
d.handlers := Fd_map.remove fd !(d.handlers);
List.iter del_event [Input; Output; Exception]
let add_timer d handler time =
d.timers := Timer.add (time, handler) !(d.timers)
@ -98,15 +98,19 @@ let rec dispatch_timers d now =
dispatch_timers d now
end
let rec dispatch_results d events_list =
match events_list with
| [] ->
()
| (fd, epoll_events) :: tl ->
let handler, _ = Fd_map.find fd !(d.fds) in
let events = events_of_epoll_events epoll_events in
handler fd events;
dispatch_results d tl
let rec dispatch_results d (read_ready, write_ready, except_ready) =
let rec dispatch event fd_list =
match fd_list with
| [] ->
()
| fd :: tl ->
let handler = Fd_map.find fd !(d.handlers) in
handler fd event;
dispatch event tl
in
dispatch Input read_ready;
dispatch Output write_ready;
dispatch Exception except_ready
let once d =
let now = Unix.gettimeofday () in
@ -118,19 +122,16 @@ let once d =
with Not_found ->
(-1.0)
in
(if !(d.numfds) = 0 then
(* epoll()--and probably poll()--barfs if it has no file descriptors *)
ignore (Unix.select [] [] [] timeout)
else
(* poll() and epoll() wait *at most* timeout ms. If you have fds but they're not
doing anything, multiple calls to once may be required. This is lame. *)
let timeout_ms = int_of_float (timeout *. 1000.0) in
let result = Epoll.wait d.e !(d.numfds) timeout_ms in
dispatch_results d result);
(* select () waits *at most* timeout ms. If you have fds but they're
not
doing anything, multiple calls to once may be required. This is
lame. *)
let result = Unix.select !(d.read_fds) !(d.write_fds) !(d.except_fds) timeout in
dispatch_results d result;
dispatch_timers d (Unix.gettimeofday ())
let rec run d =
if ((!(d.fds) == Fd_map.empty) &&
if ((!(d.handlers) == Fd_map.empty) &&
(!(d.timers) == Timer.empty)) then
()
else begin

View File

@ -1,18 +1,18 @@
type t
(** The type of event dispatchers *)
type event = Input | Priority | Output | Error | Hangup
type event = Input | Output | Exception
(** An event associated with a file descriptor *)
type fd_handler = Unix.file_descr -> event list -> unit
(** [fd_handler d fd evt] handles an [event] generated by dispatcher [d] *)
type fd_handler = Unix.file_descr -> event -> unit
(** [fd_handler fd evt] handles event [evt] from file descriptor [fd] *)
type timer_handler = float -> unit
(** [timer_handler d when] is called at or after [when] by dispatcher [d] *)
(** [timer_handler d when] is called at or after [when] *)
val create : int -> t
(** Create a new event dispatcher, preallocating [size] fd events. [size]
is just a hint, the fd list will grow on demand. *)
(** Create a new event dispatcher, preallocating [size] fd events.
[size] is just a hint, the fd list will grow on demand. *)
val destroy : t -> unit
(** Destroy an event dispatcher *)

View File

@ -1,33 +0,0 @@
(*
* OCaml epoll() interface
* Author: Neale Pickett <neale@woozle.org>
* Time-stamp: <2008-03-14 11:49:20 neale>
*)
(**
* This module provides an interface to epoll() on Linux, or poll() on
* everything else.
*)
type t
type event = In | Priority | Out | Error | Hangup
(** Event types, mirroring poll() and epoll() event constants. *)
type op = Add | Modify | Delete
(** Operations for ctl *)
external create : int -> t = "ocaml_epoll_create"
(** Create a new poll structure *)
external destroy : t -> unit = "ocaml_epoll_destroy"
(** Destroy a poll structure *)
external ctl : t -> op -> (Unix.file_descr * event list) -> unit = "ocaml_epoll_ctl"
(** Add, Modify, or Delete an event list *)
external wait : t -> int -> int -> (Unix.file_descr * event list) list = "ocaml_epoll_wait"
(** [wait e maxevents timeout] returns a list of at most [maxevents]
(file descriptor * event list)s that occurred before at least
[timeout] milliseconds elapsed.
*)

View File

@ -1,415 +0,0 @@
/** OCaml poll() interface
*
* Time-stamp: <2008-03-12 23:20:54 neale>
*
* Copyright (C) 2008 Neale Pickett
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <caml/mlvalues.h>
#include <caml/fail.h>
#include <caml/custom.h>
#include <caml/memory.h>
#include <caml/alloc.h>
#ifdef __linux
# include <sys/epoll.h>
# define EPOLL
#else
# include <poll.h>
# undef EPOLL
# define EPOLLIN POLLIN
# define EPOLLPRI POLLPRI
# define EPOLLOUT POLLOUT
# define EPOLLERR POLLERR
# define EPOLLHUP POLLHUP
#endif
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#define puke() \
{ \
char errstr[512]; \
snprintf(errstr, sizeof(errstr), "%s: %s", __FUNCTION__, strerror(errno)); \
caml_failwith(errstr); \
}
enum {
caml_POLLIN,
caml_POLLPRI,
caml_POLLOUT,
caml_POLLERR,
caml_POLLHUP
};
enum {
caml_POLL_ADD,
caml_POLL_MOD,
caml_POLL_DEL
};
static int
list_length(value list)
{
CAMLparam1(list);
CAMLlocal1(l);
int len = 0;
for (l = list; l != Val_int(0); l = Field(l, 1)) {
len += 1;
}
CAMLreturn(len);
}
static int
int_of_event_list(value list)
{
CAMLparam1(list);
CAMLlocal1(l);
int acc = 0;
for (l = list; l != Val_int(0); l = Field(l, 1)) {
switch (Int_val(Field(l, 0))) {
case caml_POLLIN:
acc |= EPOLLIN;
break;
case caml_POLLPRI:
acc |= EPOLLPRI;
break;
case caml_POLLOUT:
acc |= EPOLLOUT;
break;
case caml_POLLERR:
acc |= EPOLLERR;
break;
case caml_POLLHUP:
acc |= EPOLLHUP;
break;
}
}
CAMLreturn(acc);
}
static value
cons(value item, value list)
{
CAMLparam2(item, list);
CAMLlocal1(new);
new = alloc_small(2, 0);
Field(new, 0) = item;
Field(new, 1) = list;
CAMLreturn(new);
}
static value
event_list_of_int(int events)
{
CAMLparam0();
CAMLlocal1(result);
result = Val_int(0);
/* Do these in reverse order since we're prepending to the list */
if (events & EPOLLHUP) {
result = cons(Val_int(caml_POLLHUP), result);
}
if (events & EPOLLERR) {
result = cons(Val_int(caml_POLLERR), result);
}
if (events & EPOLLOUT) {
result = cons(Val_int(caml_POLLOUT), result);
}
if (events & EPOLLPRI) {
result = cons(Val_int(caml_POLLPRI), result);
}
if (events & EPOLLIN) {
result = cons(Val_int(caml_POLLIN), result);
}
CAMLreturn(result);
}
#ifdef EPOLL
/********************************************************************************
*
* epoll()
*
********************************************************************************/
CAMLprim value
ocaml_epoll_create(value size)
{
CAMLparam1(size);
CAMLlocal1(result);
int ret;
ret = epoll_create(Int_val(size));
if (-1 == ret) {
puke();
}
result = Val_int(ret);
CAMLreturn(result);
}
CAMLprim value
ocaml_epoll_destroy(value t)
{
CAMLparam1(t);
int ret;
ret = close(Int_val(t));
if (-1 == ret) {
puke();
}
CAMLreturn(Val_unit);
}
/* There are three reasons why I can't store a continuation or any other
* complex type in evt.data:
*
* 1. GC might blow them away
* 2. Heap compaction might move them
* 3. The kernel can remove events from its internal table without
* telling us (this is why there's no EPOLLNVAL)
*
* 1 and 2 can be solved by calling caml_register_global_root for each
* continuation, but this does not solve 3. So you get file
* descriptors. You can make a nice record type and wrap a Set around
* it. */
CAMLprim value
ocaml_epoll_ctl(value t, value op, value what)
{
CAMLparam3(t, op, what);
int op_;
int fd;
struct epoll_event evt;
int ret;
switch (Int_val(op)) {
case caml_POLL_ADD:
op_ = EPOLL_CTL_ADD;
break;
case caml_POLL_MOD:
op_ = EPOLL_CTL_MOD;
break;
case caml_POLL_DEL:
op_ = EPOLL_CTL_DEL;
break;
}
fd = Int_val(Field(what, 0));
evt.events = int_of_event_list(Field(what, 1));
evt.data.fd = fd;
ret = epoll_ctl(Int_val(t), op_, fd, &evt);
if (-1 == ret) {
puke();
}
CAMLreturn(Val_unit);
}
CAMLprim value
ocaml_epoll_wait(value t, value maxevents, value timeout)
{
CAMLparam2(t, timeout);
CAMLlocal2(result, item);
int maxevents_ = Int_val(maxevents);
struct epoll_event events[maxevents_];
int i;
int ret;
caml_enter_blocking_section();
ret = epoll_wait(Int_val(t), events, maxevents_, Int_val(timeout));
caml_leave_blocking_section();
if (-1 == ret) {
puke();
}
result = Val_int(0);
for (i = 0; i < ret; i += 1) {
item = alloc_small(2,0);
Field(item, 0) = Val_int(events[i].data.fd);
Field(item, 1) = event_list_of_int(events[i].events);
result = cons(item, result);
}
CAMLreturn(result);
}
#else
/********************************************************************************
*
* poll() compatibility
*
********************************************************************************/
struct t {
int nfds;
int size;
struct pollfd *fds;
};
CAMLprim value
ocaml_epoll_create(value size)
{
CAMLparam1(size);
CAMLlocal1(result);
struct t *t_;
t_ = (struct t *)malloc(sizeof(struct t));
t_->nfds = 0;
t_->size = size;
t_->fds = (struct pollfd *)calloc(size, sizeof(struct pollfd));
result = caml_alloc(1, Abstract_tag);
Field(result, 0) = (value)t_;
CAMLreturn(result);
}
CAMLprim value
ocaml_epoll_destroy(value t)
{
CAMLparam1(t);
struct t *t_ = (struct t *)Field(t, 0);
free(t_->fds);
free(t_);
CAMLreturn(Val_unit);
}
CAMLprim value
ocaml_epoll_ctl(value t, value op, value what)
{
CAMLparam3(t, op, what);
struct t *t_ = (struct t *)Field(t, 0);
int op_ = Int_val(op);
struct pollfd pfd;
int i;
pfd.fd = Int_val(Field(what, 0));
pfd.events = int_of_event_list(Field(what, 1));
/* Find this fd in our list */
for (i == 0; i < t_->nfds; i += 1) {
struct pollfd *p = &(t_->fds[i]);
if (p->fd == pfd.fd) {
break;
}
}
switch (op_) {
case caml_POLL_ADD:
if (i < t_->nfds) {
caml_failwith("file descriptor already present");
}
if (i >= t_->size) {
struct pollfd *newfds;
int newsize;
newsize = t_->size + 20;
newfds = (struct pollfd *)realloc(t_, (sizeof(struct pollfd)) * newsize);
if (! newfds) {
caml_failwith("out of memory");
}
t_->size = newsize;
t_->fds = newfds;
}
t_->nfds += 1;
t_->fds[i] = pfd;
break;
case caml_POLL_MOD:
t_->fds[i] = pfd;
break;
case caml_POLL_DEL:
if (i == t_->nfds) {
caml_failwith("file descriptor not present");
}
t_->nfds -= 1;
for(; i < t_->nfds; i += 1) {
t_->fds[i] = t_->fds[i+1];
}
break;
}
}
#include "obj.h"
CAMLprim value
ocaml_epoll_wait(value t, value maxevents, value timeout)
{
CAMLparam3(t, maxevents, timeout);
CAMLlocal2(result, v);
struct t *t_ = (struct t *)Field(t, 0);
int maxevents_ = Int_val(maxevents);
int i;
int j;
int ret;
/* Call poll */
caml_enter_blocking_section();
ret = poll(t_->fds, t_->nfds, Int_val(timeout));
caml_leave_blocking_section();
if (-1 == ret) {
puke();
}
result = Val_int(0);
if (0 < ret) {
j = 0;
for (i = 0; ((i < t_->nfds) && (i < maxevents_)); i += 1) {
struct pollfd *p = &(t_->fds[i]);
if (p->revents & POLLNVAL) {
/* Don't let j increment: remove this item */
continue;
} else if (p->revents) {
v = alloc_small(2, 0);
Field(v, 0) = Val_int(p->fd);
Field(v, 1) = event_list_of_int(p->revents);
result = cons(v, result);
}
if (i != j) {
t_->fds[i] = t_->fds[j];
}
j += 1;
}
t_->nfds = j;
}
CAMLreturn(result);
#error "I haven't yet figured out why this causes a segfault."
}
#endif

View File

@ -54,21 +54,20 @@ let write iobuf cmd =
if ((len = 0) && (!(iobuf.unsent) = "")) then
Dispatch.modify iobuf.d iobuf.fd [Dispatch.Input; Dispatch.Output]
let rec handle_events iobuf fd events =
match events with
| [] ->
()
| Dispatch.Input :: tl ->
let handle_event iobuf fd event =
match event with
| Dispatch.Input ->
let size = ibuf_max - !(iobuf.ibuf_len) in
let len = Unix.read fd iobuf.ibuf !(iobuf.ibuf_len) size in
iobuf.ibuf_len := !(iobuf.ibuf_len) + len;
handle_input iobuf;
if (!(iobuf.ibuf_len) = ibuf_max) then
(* No newline found, and the buffer is full *)
close iobuf "Input buffer overrun"
else
handle_events iobuf fd tl
| Dispatch.Output :: tl ->
(match Unix.read fd iobuf.ibuf !(iobuf.ibuf_len) size with
| 0 ->
close iobuf "Hangup"
| len ->
iobuf.ibuf_len := !(iobuf.ibuf_len) + len;
handle_input iobuf;
if (!(iobuf.ibuf_len) = ibuf_max) then
(* No newline found, and the buffer is full *)
close iobuf "Input buffer overrun")
| Dispatch.Output ->
let buf = Buffer.create obuf_max in
Buffer.add_string buf !(iobuf.unsent);
while (((Buffer.length buf) < obuf_max) &&
@ -82,25 +81,18 @@ let rec handle_events iobuf fd events =
let n = Unix.single_write fd bufstr 0 buflen in
if n < buflen then begin
iobuf.unsent := Str.string_after bufstr n;
handle_events iobuf fd tl
end else if Queue.is_empty iobuf.outq then
if !(iobuf.alive) then begin
(* We're out of data to send *)
Dispatch.modify iobuf.d fd [Dispatch.Input];
handle_events iobuf fd tl
end else begin
(* Close dead connection after all output has despooled *)
Dispatch.delete iobuf.d iobuf.fd;
Unix.close iobuf.fd
end
| Dispatch.Priority :: tl ->
| Dispatch.Exception ->
let s = String.create 4096 in
ignore (Unix.recv fd s 0 4096 [Unix.MSG_OOB]);
handle_events iobuf fd tl
| Dispatch.Error :: tl ->
close iobuf "Error"
| Dispatch.Hangup :: tl ->
close iobuf "Hangup"
ignore (Unix.recv fd s 0 4096 [Unix.MSG_OOB])
let bind iobuf handle_command handle_error =
iobuf.handle_command := handle_command;
@ -117,5 +109,5 @@ let create d fd name handle_command handle_error =
handle_command = ref handle_command;
handle_error = ref handle_error;
alive = ref true} in
Dispatch.add d fd (handle_events iobuf) [Dispatch.Input];
Dispatch.add d fd (handle_event iobuf) [Dispatch.Input];
iobuf