eventcore.drivers.posix.epoll 10/34(29%) line coverage

      
10
20
30
40
50
60
70
80
90
100
110
120
130
140
150
160
170
180
190
200
210
220
230
240
250
260
270
280
290
300
310
321
330
342
351
360
370
380
390
400
410
420
430
440
450
460
470
480
490
500
510
520
530
540
550
560
570
580
590
600
610
620
630
640
650
660
670
680
690
700
712
724
734
742
752
762
774
780
790
800
810
820
830
840
850
860
870
880
890
900
910
920
930
940
950
960
970
980
990
1000
1010
1020
1030
1040
1050
/**
	Linux epoll based event driver implementation.

	Epoll is an efficient API for asynchronous I/O on Linux, suitable for large
	numbers of concurrently open sockets.
*/
module eventcore.drivers.posix.epoll;
@safe: /*@nogc:*/ nothrow:

version (linux):

public import eventcore.drivers.posix.driver;
import eventcore.internal.utils;

import core.time : Duration;
import core.sys.posix.sys.time : timeval;
import core.sys.linux.epoll;

/// Event driver that uses `EpollEventLoop` as its event loop implementation.
alias EpollEventDriver = PosixEventDriver!EpollEventLoop;

// Fallback definition, used only when no `SOCK_CLOEXEC` symbol is in scope.
// NOTE(review): this value is passed as the flags argument of `epoll_create1`;
// presumably it matches `EPOLL_CLOEXEC` - confirm against the platform headers.
static if (!is(typeof(SOCK_CLOEXEC)))
	enum SOCK_CLOEXEC = 0x80000;


/**
	Event loop implementation that waits for file descriptor events using
	`epoll_wait` and forwards them through `notify`.
*/
final class EpollEventLoop : PosixEventLoop {
@safe: nothrow:

	private {
		// Descriptor returned by `epoll_create1`; set to -1 once disposed.
		int m_epoll;
		// Buffer that `epoll_wait` fills with the events that are ready.
		epoll_event[] m_events;
	}

	/// Creates the epoll instance and allocates a buffer for 100 events.
	this()
	{
		m_epoll = () @trusted { return epoll_create1(SOCK_CLOEXEC); } ();
		m_events.length = 100;
	}

	/**
		Waits for events and dispatches a notification for each one.

		Params:
			timeout = Maximum time to wait. `Duration.max` (or any value that
				does not fit into an `int` number of milliseconds) waits
				without a timeout; zero or negative values poll without
				blocking.

		Returns:
			`true` if at least one event was reported by `epoll_wait`,
			`false` otherwise (including when `epoll_wait` fails).
	*/
	override bool doProcessEvents(Duration timeout)
	@trusted {
		//assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!");

		debug (EventCoreEpollDebug) print("Epoll wait %s, %s", m_events.length, timeout);

		immutable int tomsec = toEpollTimeout(timeout);
		auto ret = epoll_wait(m_epoll, m_events.ptr, cast(int)m_events.length, tomsec);
		debug (EventCoreEpollDebug) print("Epoll wait done: %s", ret);

		if (ret <= 0) return false;

		foreach (ref evt; m_events[0 .. ret]) {
			debug (EventCoreEpollDebug) print("Epoll event on %s: %s", evt.data.fd, evt.events);
			auto fd = cast(FD)evt.data.fd;
			// Status notifications are delivered before read/write ones.
			if (evt.events & (EPOLLERR|EPOLLHUP|EPOLLRDHUP)) notify!(EventType.status)(fd);
			if (evt.events & EPOLLIN) notify!(EventType.read)(fd);
			if (evt.events & EPOLLOUT) notify!(EventType.write)(fd);
		}
		return true;
	}

	/**
		Closes the epoll descriptor.

		Calling this more than once is harmless: the descriptor is only closed
		the first time, so a descriptor number that has since been reused for
		something else is never closed by accident.
	*/
	override void dispose()
	{
		import core.sys.posix.unistd : close;

		if (m_epoll < 0) return;
		close(m_epoll);
		m_epoll = -1;
	}

	/**
		Adds a file descriptor to the epoll set.

		Params:
			fd = Descriptor to watch.
			mask = Event kinds (read/write/status) to watch for.
			edge_triggered = Requests edge triggered notification (`EPOLLET`).
	*/
	override void registerFD(FD fd, EventMask mask, bool edge_triggered = true)
	{
		debug (EventCoreEpollDebug) print("Epoll register FD %s: %s", fd, mask);
		auto ev = makeEpollEvent(fd, mask, edge_triggered);
		// The result of epoll_ctl is not checked.
		() @trusted { epoll_ctl(m_epoll, EPOLL_CTL_ADD, cast(int)fd, &ev); } ();
	}

	/**
		Removes a file descriptor from the epoll set.

		Params:
			fd = Descriptor to remove.
			mask = Not used by this implementation.
	*/
	override void unregisterFD(FD fd, EventMask mask)
	{
		debug (EventCoreEpollDebug) print("Epoll unregister FD %s", fd);
		// The result of epoll_ctl is not checked.
		() @trusted { epoll_ctl(m_epoll, EPOLL_CTL_DEL, cast(int)fd, null); } ();
	}

	/**
		Replaces the set of events watched for an already registered descriptor.

		Params:
			fd = Descriptor to modify.
			old_mask = Not used by this implementation.
			mask = New event kinds (read/write/status) to watch for.
			edge_triggered = Requests edge triggered notification (`EPOLLET`).
	*/
	override void updateFD(FD fd, EventMask old_mask, EventMask mask, bool edge_triggered = true)
	{
		debug (EventCoreEpollDebug) print("Epoll update FD %s: %s", fd, mask);
		auto ev = makeEpollEvent(fd, mask, edge_triggered);
		// The result of epoll_ctl is not checked.
		() @trusted { epoll_ctl(m_epoll, EPOLL_CTL_MOD, cast(int)fd, &ev); } ();
	}

	/// Builds the `epoll_event` describing `mask` for `fd`, shared by register/update.
	private static epoll_event makeEpollEvent(FD fd, EventMask mask, bool edge_triggered)
	{
		epoll_event ev;
		if (edge_triggered) ev.events |= EPOLLET;
		//ev.events = EPOLLONESHOT;
		if (mask & EventMask.read) ev.events |= EPOLLIN;
		if (mask & EventMask.write) ev.events |= EPOLLOUT;
		if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLHUP|EPOLLRDHUP;
		ev.data.fd = cast(int)fd;
		return ev;
	}

	/// Converts `timeout` to the millisecond argument of `epoll_wait` (-1 = no timeout).
	private static int toEpollTimeout(Duration timeout)
	{
		if (timeout == Duration.max) return -1;

		immutable long hnsecs = timeout.total!"hnsecs";
		if (hnsecs <= 0) return 0;

		// Round up to whole milliseconds. Dividing first (instead of adding
		// 9999 before the division) cannot overflow for values close to
		// Duration.max, which would otherwise wrap around to a zero timeout.
		immutable long msecs = hnsecs / 10_000 + (hnsecs % 10_000 != 0 ? 1 : 0);
		return msecs > int.max ? -1 : cast(int)msecs;
	}
}

/// Converts a `Duration` to a `timeval`, split into whole seconds and microseconds.
private timeval toTimeVal(Duration dur)
{
	timeval tvdur;
	dur.split!("seconds", "usecs")(tvdur.tv_sec, tvdur.tv_usec);
	return tvdur;
}