-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
eventqueue.nim
513 lines (443 loc) · 14.7 KB
/
eventqueue.nim
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
import std/strutils
import std/macros
import std/os
import std/selectors
import std/monotimes
import std/nativesockets
import std/tables
import std/times
import std/deques
import cps
import eventqueue/semaphore
export Semaphore, semaphore.`==`, semaphore.`<`, semaphore.hash
export Event
const
  ## Build-time switches for the event queue.
  eqDebug {.booldefine, used.} = false
    ## emit extra debugging output
  eqPoolSize {.intdefine, used.} = 64
    ## expected pending continuations
  eqTraceSize {.intdefine, used.} = 1000
    ## limit the traceback
  cpsTrace = false
    ## gates the trace-recording code below; a plain constant here, so it
    ## cannot be toggled with a `--define`
type
  Readiness = enum
    ## Life-cycle of the dispatcher; the strings are what `$state` yields.
    Unready = "the default state, pre-initialized"
    Stopped = "we are outside an event loop but available for queuing events"
    Running = "we're in a loop polling for events and running continuations"
    Stopping = "we're tearing down the dispatcher and it will shortly stop"

  Clock = MonoTime    ## the time source used for polling measurements
  Id = distinct int   ## a registration identifier
  Fd = distinct int   ## a file-descriptor

  WaitingIds = seq[Id]              ## indexed by file-descriptor
  PendingIds = Table[Semaphore, Id] ## keyed by semaphore

  EventQueue = object
    ## All of the dispatcher's state.
    state: Readiness        ## dispatcher readiness
    pending: PendingIds     ## maps pending semaphores to Ids
    waiting: WaitingIds     ## maps waiting selector Fds to Ids
    goto: Table[Id, Cont]   ## where to go from here!
    lastId: Id              ## id of last-issued registration
    selector: Selector[Id]  ## watches selectable stuff
    yields: Deque[Cont]     ## continuations ready to run
    waiters: int            ## a count of selector listeners
    manager: Selector[Clock] ## monitor polling, wake-ups
    timer: Fd               ## file-descriptor of polling timer
    wake: SelectEvent       ## wake-up event for queue actions
    eager: bool             ## debounce wake-up triggers

  Cont* = ref object of Continuation
    ## The continuation type scheduled by this dispatcher.
    when eqDebug:
      clock: Clock          ## time of latest poll loop
      delay: Duration       ## polling overhead
      id: Id                ## our last registration
      fd: Fd                ## our last file-descriptor
    when cpsTrace:
      filename: string      ## source location supplied to the tracing `init`
      line: int
      column: int
      identity: string      ## label shown by `$` when tracing
when cpsTrace:
  type
    Frame = object
      ## One entry of the recorded continuation trace.
      c: Cont                 ## the continuation recorded for this step
      e: ref CatchableError   ## an error associated with the frame
    Stack = Deque[Frame]      ## the trace; trimmed from the front by addFrame
const
  ## Sentinel values used throughout the queue.
  wakeupId = Id(-1)     ## registration reserved for the wake-up event
  invalidId = Id(0)     ## "no registration"; also the zero value of a slot
  invalidFd = Fd(-1)    ## "no file-descriptor", eg. no polling timer
  oneMs = initDuration(milliseconds = 1)  ## the shortest supported sleep
# the dispatcher state; declared as a threadvar, so each thread has its own
var eq {.threadvar.}: EventQueue
# the current monotonic time, as a Clock
template now(): Clock = getMonoTime()
# debugging renditions: an Id prints as `{n}` and an Fd prints as `[n]`
proc `$`(id: Id): string {.used.} = "{" & system.`$`(id.int) & "}"
proc `$`(fd: Fd): string {.used.} = "[" & system.`$`(fd.int) & "]"
proc `$`(c: Cont): string {.used.} =
  ## Render a continuation for debugging output.
  when not cpsTrace:
    # no trace metadata is compiled in; show the reference's address instead
    "&" & $cast[uint](c)
  else:
    # quality poor!
    #"$1($2) $3" % [ c.filename, $c.line, c.identity ]
    c.identity
# the distinct Id and Fd types borrow comparison from their underlying int
proc `<`(a, b: Id): bool {.borrow, used.}
proc `<`(a, b: Fd): bool {.borrow, used.}
proc `==`(a, b: Id): bool {.borrow, used.}
proc `==`(a, b: Fd): bool {.borrow, used.}
proc put(w: var WaitingIds; fd: int | Fd; id: Id) =
  ## Record `id` as the registration waiting upon `fd`, growing `w` when
  ## `fd` lies beyond its current length.
  ##
  ## Storing anything other than `wakeupId` or `invalidId` increments the
  ## queue's count of selector listeners.
  let slot = fd.int
  if slot >= w.len:
    # double until the slot fits; start from at least one element because
    # doubling a length of zero never makes progress
    var size = max(w.len, 1)
    while slot >= size:
      size *= 2
    setLen(w, size)
  system.`[]=`(w, slot, id)
  case id
  of wakeupId, invalidId:  # don't count invalid ids
    discard
  else:
    inc eq.waiters
    assert eq.waiters > 0
proc get(w: var WaitingIds; fd: int | Fd): Id =
  ## Fetch the registration stored for `fd`.  An ordinary registration is
  ## consumed: its slot is reset to `invalidId` and the listener count is
  ## decremented.  The wake-up registration and empty slots are left as-is.
  let slot = fd.int
  result = w[slot]
  # don't zap our wakeup id, and don't count invalid ids
  if result == wakeupId or result == invalidId:
    return
  dec eq.waiters
  w[slot] = invalidId
method clone(c: Cont): Cont {.base.} =
  ## Produce a fresh continuation holding a copy of `c`'s fields; used,
  ## for example, by fork.
  result = Cont()
  result[] = c[]
proc init() {.inline.} =
  ## Prepare an `Unready` event queue for requests, leaving it `Stopped`.
  ## Does nothing when the queue is in any other state.
  if eq.state != Unready:
    return

  # build the manager, the wake-up event, and the main selector from scratch
  eq.timer = invalidFd
  eq.manager = newSelector[Clock]()
  eq.wake = newSelectEvent()
  eq.eager = false
  eq.selector = newSelector[Id]()
  eq.waiters = 0

  # make sure we have a decent amount of space for registrations
  if eq.waiting.len < eqPoolSize:
    eq.waiting = newSeq[Id](eqPoolSize).WaitingIds

  # both the manager and the main selector listen for the wake-up event
  registerEvent(eq.manager, eq.wake, now())
  registerEvent(eq.selector, eq.wake, wakeupId)

  # XXX: this seems to be the only reasonable way to get our wakeup fd;
  # fire the event once and note which fd the selector reports for it
  trigger eq.wake
  for ready in select(eq.selector, -1):
    assert User in ready.events
    eq.waiting.put(ready.fd, wakeupId)

  eq.lastId = invalidId
  eq.yields = initDeque[Cont]()
  eq.state = Stopped
proc nextId(): Id {.inline.} =
  ## Issue the next registration identifier, initializing the queue first
  ## if that has not happened yet.
  init()
  when sizeof(eq.lastId) >= 8:
    # rollover is pretty unlikely, right?
    inc eq.lastId
  else:
    # with a narrow id, wrap around to the first valid value
    if unlikely(eq.lastId == high(eq.lastId)):
      eq.lastId = succ(invalidId)
    else:
      inc eq.lastId
  eq.lastId
proc newSemaphore*(): Semaphore =
  ## Create a new Semaphore, initialized with a freshly-issued
  ## registration id.
  let registration = nextId()
  init(result, registration.int)
proc wakeUp() =
  ## Nudge the dispatcher.  Only a running dispatcher is actually woken,
  ## and at most once until the next poll resets the `eager` flag; an
  ## unready queue is initialized instead.
  case eq.state
  of Running:
    if not eq.eager:
      trigger eq.wake
      eq.eager = true
  of Unready:
    init()
  of Stopped:
    discard "ignored wake-up to stopped dispatcher"
  of Stopping:
    discard "ignored wake-up request; dispatcher is stopping"
template wakeAfter(body: untyped): untyped =
  ## Ensure the queue is initialized, run `body`, and then wake the
  ## dispatcher; the wake-up happens even if `body` raises.
  init()
  try:
    body
  finally:
    wakeUp()
proc len*(eq: EventQueue): int =
  ## The number of pending continuations, computed as the sum of the
  ## registered, ready-to-run, and semaphore-pending entries.
  eq.goto.len + eq.yields.len + eq.pending.len
proc `[]=`(eq: var EventQueue; s: var Semaphore; id: Id) =
  ## Associate the not-yet-ready semaphore `s` with registration `id`.
  # neither sentinel is a legal registration
  assert id != invalidId
  assert id != wakeupId
  # only an unsignalled semaphore with a real id may be queued
  assert not s.isReady
  assert s.id.Id != invalidId
  `[]=`(eq.pending, s, id)
proc `[]=`(eq: var EventQueue; id: Id; cont: Cont) =
  ## Store `cont` under registration `id`, which must be a real and
  ## currently unused id.
  assert id != invalidId
  assert id != wakeupId
  assert id notin eq.goto
  `[]=`(eq.goto, id, cont)
proc add(eq: var EventQueue; cont: Cont): Id =
  ## Register `cont` under a newly issued id and return that id.
  result = nextId()
  `[]=`(eq, result, cont)
  when eqDebug:
    echo "🤞queue ", $result, " now ", len(eq), " items"
proc stop*() =
  ## Tell the dispatcher to stop, discarding all pending continuations.
  ## Has no effect unless the dispatcher is currently running.
  if eq.state != Running:
    return
  eq.state = Stopping

  # tear down the manager, including the polling timer if one was set up
  assert not eq.manager.isNil
  unregister(eq.manager, eq.wake)
  if eq.timer != invalidFd:
    unregister(eq.manager, eq.timer.int)
    eq.timer = invalidFd
  close eq.manager

  # shutdown the wake-up trigger
  unregister(eq.selector, eq.wake)
  close eq.wake

  # discard the current selector to dismiss any pending events
  close eq.selector

  # drop the semaphore cache and the continuation cache
  eq.pending = initTable[Semaphore, Id](eqPoolSize)
  eq.goto = initTable[Id, Cont]()

  # re-initialize the queue
  eq.state = Unready
  init()
proc init*(c: Cont): Cont =
  ## Initialize a continuation; this variant returns `c` unchanged.
  c
when cpsTrace:
import std/strformat
proc init*(c: Cont; identity: static[string];
           file: static[string]; row, col: static[int]): Cont =
  ## Initialize `c` and stamp it with the label and source location that
  ## the trace output uses.
  result = init c
  result.filename = file
  result.line = row
  result.column = col
  result.identity = identity
proc addFrame(stack: var Stack; c: Cont) =
  ## Append a frame for `c` unless it is dismissed, first dropping the
  ## oldest frames so the trace stays under `eqTraceSize` entries.
  if c.state == Dismissed:
    return
  while len(stack) >= eqTraceSize:
    discard popFirst(stack)
  addLast(stack, Frame(c: c))
proc formatDuration(d: Duration): string =
  ## Format a duration as right-aligned seconds, milliseconds,
  ## microseconds and nanoseconds, eg. `"  1s   2ms   3μs   4ns"`.
  ## Each component is reduced modulo 1000.
  let
    n = d.inNanoseconds
    ss = (n div 1_000_000_000) mod 1_000
    ms = (n div 1_000_000) mod 1_000
    us = (n div 1_000) mod 1_000
    ns = n mod 1_000
  try:
    result = fmt"{ss:>3}s {ms:>3}ms {us:>3}μs {ns:>3}ns"
  except CatchableError:
    # formatting failed; fall back to the bare numbers.  Only catchable
    # errors are handled here so that defects are never swallowed.
    result = [$ss, $ms, $us, $ns].join(" ")
proc `$`(f: Frame): string =
  ## Render a trace frame: the continuation itself and, in debug builds,
  ## a second line showing its polling delay.
  result = $f.c
  when eqDebug:
    result.add "\n"
    result.add align(formatDuration(f.c.delay), 20) & " delay"
proc writeStackTrace(stack: Stack) =
  ## Write the recorded frames to stderr, oldest first.  Debug builds also
  ## print the clock difference between each frame and the one before it.
  if stack.len == 0:
    stderr.writeLine "no stack recorded"
    return

  stderr.writeLine "noroutine stack: (most recent call last)"
  when eqDebug:
    var prior = stack[0].c.clock
    for i, frame in stack.pairs:
      let elapsed = formatDuration(frame.c.clock - prior)
      prior = frame.c.clock
      stderr.writeLine $frame
      stderr.writeLine elapsed.align(20) & " total"
  else:
    for frame in stack.items:
      stderr.writeLine $frame
else:
proc writeStackTrace(c: Cont): Cont =
  ## Stand-in used when tracing is compiled out: emits a hint about
  ## enabling traces and hands `c` back.
  warning "--define:cpsTrace:on to output traces"
  result = c
proc trampoline*(c: Cont) =
  ## Run the supplied continuation until it is complete: each step calls
  ## the continuation's `fn` and carries on with whatever it returns for
  ## as long as that reports `running`.
  ##
  ## A `CatchableError` raised by a step is re-raised to the caller; when
  ## tracing is compiled in, the recorded frames are written out first.
  var current: Continuation = c
  when cpsTrace:
    var frames = initDeque[Frame](eqTraceSize)
  while current.running:
    when eqDebug:
      echo "🎪tramp ", current, " at ", current.clock
    try:
      current = current.fn(current)
      when cpsTrace:
        addFrame(frames, current)
    except CatchableError:
      when cpsTrace:
        writeStackTrace frames
      raise
proc poll*() =
  ## See what continuations need running and run them.
  ##
  ## One pass does three things, and only while the dispatcher is running:
  ## 1. if anything is registered with the selector, collect ready events
  ##    and run the continuation registered for each;
  ## 2. run the continuations that were already queued in `yields` when
  ##    this step began (ones queued meanwhile wait for the next pass);
  ## 3. if nothing is pending at all, either stop the dispatcher (no
  ##    polling timer) or wait on the manager for the timer or a wake-up.
  ##
  ## Raises `KeyError` when a ready event has no registered continuation,
  ## and `OSError` when the manager reports an error.
  if eq.state != Running: return

  eq.eager = false    # make sure we can trigger again

  if eq.waiters > 0:
    when eqDebug:
      let clock = now()

    # Continuations queued while the dispatcher was not running never
    # triggered the wake-up event, so only block when nothing is ready to
    # run; otherwise merely collect whatever events are already available.
    let timeout = if len(eq.yields) > 0: 0 else: -1

    # ready holds the ready file descriptors and their events.
    let ready = select(eq.selector, timeout)
    for event in ready.items:
      # a continuation may have called stop(), which replaces the selector
      # and resets the bookkeeping; the remaining events are then stale
      if eq.state != Running: break

      # get the registration of the pending continuation
      let id = eq.waiting.get(event.fd)
      # pity the fool that removed this assert due to ioselectors spam
      #assert getData(eq.selector, event.fd) == id
      # the id will be wakeupId if it's a wake-up event
      assert id != invalidId
      if id != wakeupId:
        # stop listening on this fd
        unregister(eq.selector, event.fd)
        var cont: Cont
        if take(eq.goto, id, cont):
          when eqDebug:
            cont.clock = clock
            cont.delay = now() - clock
            cont.id = id
            cont.fd = event.fd.Fd
            echo "💈delay ", id, " ", cont.delay
          trampoline cont
        else:
          raise newException(KeyError, "missing registration " & $id)

  if len(eq.yields) > 0:
    # run no more than the current number of ready continuations
    for index in 1 .. len(eq.yields):
      # stop() re-creates the queue of ready continuations, so quit as
      # soon as the dispatcher is halted or there is nothing left to pop
      if eq.state != Running or len(eq.yields) == 0: break
      let cont = popFirst eq.yields
      trampoline cont

  # if there are no pending continuations,
  if len(eq) == 0:
    # and there is no polling timer setup,
    if eq.timer == invalidFd:
      # then we'll stop the dispatcher now.
      stop()
    else:
      when eqDebug:
        echo "💈"
      # else wait until the next polling interval or signal
      for ready in eq.manager.select(-1):
        # if we get any kind of error, all we can reasonably do is stop
        if ready.errorCode.int != 0:
          stop()
          raiseOSError(ready.errorCode, "eventqueue error")
        # one event is enough to know it is time for another pass
        break
proc run*(interval: Duration = DurationZero) =
  ## The dispatcher runs with a maximal polling interval; an `interval` of
  ## `DurationZero` causes the dispatcher to return when the queue is empty.
  ##
  ## The interval is taken in whole milliseconds.
  # make sure the eventqueue is ready to run
  init()
  assert eq.state in {Running, Stopped}, $eq.state

  let period = interval.inMilliseconds
  if period != 0:
    # the manager wakes up repeatedly, according to the provided interval
    eq.timer = Fd registerTimer(eq.manager, timeout = period.int,
                                oneshot = false, data = now())
  else:
    discard "the dispatcher returns after emptying the queue"

  # the dispatcher is now running; poll until something stops it
  eq.state = Running
  while eq.state == Running:
    poll()
proc coop*(c: Cont): Cont {.cpsMagic.} =
  ## Pass control to other pending continuations in the dispatcher before
  ## continuing; effectively a cooperative yield.
  wakeAfter:
    # go to the back of the line of ready continuations
    eq.yields.addLast c
proc jield*(c: Cont): Cont {.cpsMagic, deprecated: "renamed to coop()".} =
  ## Deprecated alias of `coop`.
  coop(c)
proc sleep*(c: Cont; interval: Duration): Cont {.cpsMagic.} =
  ## Sleep for `interval` before continuing.
  ##
  ## Raises `ValueError` for intervals shorter than one millisecond.
  if interval < oneMs:
    raise newException(ValueError, "intervals < 1ms unsupported")

  wakeAfter:
    # register the continuation, then a one-shot timer that carries its id
    let id = eq.add(c)
    let timerFd = registerTimer(eq.selector,
                                timeout = interval.inMilliseconds.int,
                                oneshot = true, data = id)
    eq.waiting.put(timerFd, id)
    when eqDebug:
      echo "⏰timer ", timerFd.Fd
proc sleep*(c: Cont; ms: int): Cont {.cpsMagic.} =
  ## Sleep for `ms` milliseconds before continuing.
  sleep(c, initDuration(milliseconds = ms))
proc sleep*(c: Cont; secs: float): Cont {.cpsMagic.} =
  ## Sleep for `secs` seconds before continuing; the duration is converted
  ## to a whole number of milliseconds.
  let ms = int(1_000 * secs)
  sleep(c, ms)
proc dismiss*(c: Cont): Cont {.cpsMagic.} =
  ## Discard the current continuation.
  # nothing is queued and nothing is assigned to the result
  discard
proc noop*(c: Cont): Cont {.cpsMagic.} =
  ## A primitive that merely sheds scope; the continuation is handed
  ## straight back.
  c
template signalImpl(s: Semaphore; body: untyped): untyped =
  ## Move the continuation pending on `s` to the ready queue and wake the
  ## dispatcher; run `body` instead when the semaphore is NOT found in
  ## the queue.
  var queued = false
  var registration = invalidId
  try:
    if take(eq.pending, s, registration):
      var waiter: Cont
      if take(eq.goto, registration, waiter):
        addLast(eq.yields, waiter)
        queued = true
    else:
      body
  finally:
    # only bother the dispatcher if something was actually queued
    if queued:
      wakeUp()
proc signal*(s: var Semaphore) =
  ## Signal the given Semaphore `s`, causing the first waiting continuation
  ## to be queued for execution in the dispatcher; control remains in
  ## the calling procedure.
  semaphore.signal(s)
  withReady s:
    init()
    # nothing to do when no continuation is waiting on `s`
    signalImpl(s):
      discard
proc signalAll*(s: var Semaphore) =
  ## Signal the given Semaphore `s`, causing all waiting continuations
  ## to be queued for execution in the dispatcher; control remains in
  ## the calling procedure.
  semaphore.signal(s)
  if not s.isReady:
    return
  init()
  # keep queuing until the semaphore is no longer found in the queue
  while true:
    signalImpl(s):
      break
proc wait*(c: Cont; s: var Semaphore): Cont {.cpsMagic.} =
  ## Queue the current continuation pending readiness of the given
  ## Semaphore `s`.  If `s` is ready already, the continuation goes
  ## directly onto the ready queue.
  let id = nextId()
  if not s.isReady:
    # park the continuation until the semaphore is signalled
    `[]=`(eq, s, id)
    `[]=`(eq, id, c)
  else:
    eq.yields.addLast c
    wakeUp()
proc fork*(c: Cont): Cont {.cpsMagic.} =
  ## Duplicate the current continuation: a clone is queued to run while
  ## the original is returned to continue.
  result = c
  wakeAfter:
    eq.yields.addLast clone(c)
proc spawn*(c: Cont) =
  ## Queue the supplied continuation `c`; control remains in the calling
  ## procedure.
  wakeAfter:
    eq.yields.addLast c
proc iowait*(c: Cont; file: int | SocketHandle;
             events: set[Event]): Cont {.cpsMagic.} =
  ## Continue upon any of `events` on the given file-descriptor or
  ## SocketHandle.
  ##
  ## Raises `ValueError` when `events` is empty.
  if events == {}:
    raise newException(ValueError, "no events supplied")

  wakeAfter:
    # register the continuation, then watch the handle on its behalf
    let id = eq.add(c)
    registerHandle(eq.selector, file, events = events, data = id)
    eq.waiting.put(file.int, id)
    when eqDebug:
      echo "📂file ", $Fd(file)