-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AsyncEventQueue (AsyncEventBus replacement). #275
Merged
Merged
Changes from all commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
d675f6c
AsyncEventQueue (AsyncEventBus replacement) initial commit.
cheatfate bda5d38
Add closeWait() and remove assertion test.
cheatfate 978df76
Fix "possible" memory leak.
cheatfate 4e6ec9e
Add limits to AsyncEventQueue[T].
cheatfate f594b3d
Adopt AsyncEventQueue to garbage collect events on emit() too.
cheatfate c834a0e
Remove AsyncEventBus test suite.
cheatfate 5799173
Remove unneeded [T] usage.
cheatfate d50f1af
Optimize memory usage in 1m test.
cheatfate 80c4d11
Lower number of events in test from 1m to 100k.
cheatfate da2106a
Deprecate AsyncEventBus.
cheatfate f17ab03
Fix mistype.
cheatfate f5a7c47
One more attempt to fix crash.
cheatfate c543bcc
Add some echo debugging.
cheatfate ace652d
More echo debugging.
cheatfate 597a3b2
More closer debug echoes.
cheatfate fe85ba9
Attempt to workaround crash place.
cheatfate ddc41a9
More debugging echoes in exception handlers.
cheatfate abe488c
Convert suspected test into async procedure.
cheatfate 4e38a77
Make multiple consumers test async.
cheatfate 22d4c4d
Remove GC debugging.
cheatfate 4e2e7ac
Disable added tests.
cheatfate 70338e5
Disable AsyncEventQueue tests to confirm that this is an issue.
cheatfate 88d8477
Enable all the tests.
cheatfate File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,7 +12,7 @@ | |
|
||
{.push raises: [Defect].} | ||
|
||
import std/[sequtils, deques, tables, typetraits] | ||
import std/[sequtils, math, deques, tables, typetraits] | ||
import ./asyncloop | ||
export asyncloop | ||
|
||
|
@@ -55,11 +55,11 @@ type | |
queue: Deque[T] | ||
maxsize: int | ||
|
||
AsyncQueueEmptyError* = object of CatchableError | ||
AsyncQueueEmptyError* = object of AsyncError | ||
## ``AsyncQueue`` is empty. | ||
AsyncQueueFullError* = object of CatchableError | ||
AsyncQueueFullError* = object of AsyncError | ||
## ``AsyncQueue`` is full. | ||
AsyncLockError* = object of CatchableError | ||
AsyncLockError* = object of AsyncError | ||
## ``AsyncLock`` is either locked or unlocked. | ||
|
||
EventBusSubscription*[T] = proc(bus: AsyncEventBus, | ||
|
@@ -106,6 +106,23 @@ type | |
eventName: string | ||
payload: EventPayloadBase | ||
|
||
AsyncEventQueueFullError* = object of AsyncError | ||
|
||
EventQueueKey* = distinct uint64 | ||
|
||
EventQueueReader* = object | ||
key: EventQueueKey | ||
offset: int | ||
waiter: Future[void] | ||
overflow: bool | ||
|
||
AsyncEventQueue*[T] = ref object of RootObj | ||
readers: seq[EventQueueReader] | ||
queue: Deque[T] | ||
counter: uint64 | ||
limit: int | ||
offset: int | ||
|
||
proc newAsyncLock*(): AsyncLock = | ||
## Creates new asynchronous lock ``AsyncLock``. | ||
## | ||
|
@@ -448,7 +465,9 @@ proc `$`*[T](aq: AsyncQueue[T]): string = | |
template generateKey(typeName, eventName: string): string = | ||
"type[" & typeName & "]-key[" & eventName & "]" | ||
|
||
proc newAsyncEventBus*(): AsyncEventBus = | ||
proc newAsyncEventBus*(): AsyncEventBus {. | ||
deprecated: "Implementation has unfixable flaws, please use" & | ||
"AsyncEventQueue[T] instead".} = | ||
## Creates new ``AsyncEventBus``. | ||
AsyncEventBus(counter: 0'u64, events: initTable[string, EventItem]()) | ||
|
||
|
@@ -460,7 +479,9 @@ template location*(payload: EventPayloadBase): SrcLoc = | |
## Returns source location address of event emitter. | ||
payload.loc[] | ||
|
||
proc get*(event: AwaitableEvent, T: typedesc): T = | ||
proc get*(event: AwaitableEvent, T: typedesc): T {. | ||
deprecated: "Implementation has unfixable flaws, please use " & | ||
"AsyncEventQueue[T] instead".} = | ||
## Returns event's payload of type ``T`` from event ``event``. | ||
cast[EventPayload[T]](event.payload).value | ||
|
||
|
@@ -472,7 +493,9 @@ template location*(event: AwaitableEvent): SrcLoc = | |
## Returns source location address of event emitter. | ||
event.payload.loc[] | ||
|
||
proc waitEvent*(bus: AsyncEventBus, T: typedesc, event: string): Future[T] = | ||
proc waitEvent*(bus: AsyncEventBus, T: typedesc, event: string): Future[T] {. | ||
deprecated: "Implementation has unfixable flaws, please use " & | ||
"AsyncEventQueue[T] instead".} = | ||
## Wait for the event from AsyncEventBus ``bus`` with name ``event``. | ||
## | ||
## Returned ``Future[T]`` will hold event's payload of type ``T``. | ||
|
@@ -488,7 +511,9 @@ proc waitEvent*(bus: AsyncEventBus, T: typedesc, event: string): Future[T] = | |
bus.events.mgetOrPut(eventKey, default).waiters.add(baseFuture) | ||
retFuture | ||
|
||
proc waitAllEvents*(bus: AsyncEventBus): Future[AwaitableEvent] = | ||
proc waitAllEvents*(bus: AsyncEventBus): Future[AwaitableEvent] {. | ||
deprecated: "Implementation has unfixable flaws, please use " & | ||
"AsyncEventQueue[T] instead".} = | ||
## Wait for any event from AsyncEventBus ``bus``. | ||
## | ||
## Returns ``Future`` which holds helper object. Using this object you can | ||
|
@@ -502,7 +527,9 @@ proc waitAllEvents*(bus: AsyncEventBus): Future[AwaitableEvent] = | |
retFuture | ||
|
||
proc subscribe*[T](bus: AsyncEventBus, event: string, | ||
callback: EventBusSubscription[T]): EventBusKey = | ||
callback: EventBusSubscription[T]): EventBusKey {. | ||
deprecated: "Implementation has unfixable flaws, please use " & | ||
"AsyncEventQueue[T] instead".} = | ||
## Subscribe to the event ``event`` passed through eventbus ``bus`` with | ||
## callback ``callback``. | ||
## | ||
|
@@ -524,7 +551,9 @@ proc subscribe*[T](bus: AsyncEventBus, event: string, | |
subkey | ||
|
||
proc subscribeAll*(bus: AsyncEventBus, | ||
callback: EventBusAllSubscription): EventBusKey = | ||
callback: EventBusAllSubscription): EventBusKey {. | ||
deprecated: "Implementation has unfixable flaws, please use " & | ||
"AsyncEventQueue instead".} = | ||
## Subscribe to all events passed through eventbus ``bus`` with callback | ||
## ``callback``. | ||
## | ||
|
@@ -542,7 +571,9 @@ proc subscribeAll*(bus: AsyncEventBus, | |
bus.subscribers.add(subkey) | ||
subkey | ||
|
||
proc unsubscribe*(bus: AsyncEventBus, key: EventBusKey) = | ||
proc unsubscribe*(bus: AsyncEventBus, key: EventBusKey) {. | ||
deprecated: "Implementation has unfixable flaws, please use " & | ||
"AsyncEventQueue instead".} = | ||
## Cancel subscription of subscriber with key ``key`` from eventbus ``bus``. | ||
let eventKey = generateKey(key.typeName, key.eventName) | ||
|
||
|
@@ -590,7 +621,9 @@ proc emit[T](bus: AsyncEventBus, event: string, data: T, loc: ptr SrcLoc) = | |
for subscriber in bus.subscribers: | ||
triggerSubscriberCallback(subscriber) | ||
|
||
template emit*[T](bus: AsyncEventBus, event: string, data: T) = | ||
template emit*[T](bus: AsyncEventBus, event: string, data: T) {. | ||
deprecated: "Implementation has unfixable flaws, please use " & | ||
"AsyncEventQueue instead".} = | ||
## Emit new event ``event`` to the eventbus ``bus`` with payload ``data``. | ||
emit(bus, event, data, getSrcLocation()) | ||
|
||
|
@@ -605,8 +638,205 @@ proc emitWait[T](bus: AsyncEventBus, event: string, data: T, | |
return retFuture | ||
|
||
template emitWait*[T](bus: AsyncEventBus, event: string, | ||
data: T): Future[void] = | ||
data: T): Future[void] {. | ||
deprecated: "Implementation has unfixable flaws, please use " & | ||
"AsyncEventQueue instead".} = | ||
## Emit new event ``event`` to the eventbus ``bus`` with payload ``data`` and | ||
## wait until all the subscribers/waiters will receive notification about | ||
## event. | ||
emitWait(bus, event, data, getSrcLocation()) | ||
|
||
proc `==`(a, b: EventQueueKey): bool {.borrow.} | ||
|
||
proc compact(ab: AsyncEventQueue) {.raises: [Defect].} = | ||
if len(ab.readers) > 0: | ||
let minOffset = | ||
block: | ||
var res = -1 | ||
for reader in ab.readers.items(): | ||
if not(reader.overflow): | ||
res = reader.offset | ||
break | ||
res | ||
|
||
if minOffset == -1: | ||
ab.offset += len(ab.queue) | ||
ab.queue.clear() | ||
else: | ||
doAssert(minOffset >= ab.offset) | ||
if minOffset > ab.offset: | ||
let delta = minOffset - ab.offset | ||
ab.queue.shrink(fromFirst = delta) | ||
ab.offset += delta | ||
else: | ||
ab.queue.clear() | ||
|
||
proc getReaderIndex(ab: AsyncEventQueue, key: EventQueueKey): int {. | ||
raises: [Defect].} = | ||
for index, value in ab.readers.pairs(): | ||
if value.key == key: | ||
return index | ||
-1 | ||
|
||
proc newAsyncEventQueue*[T](limitSize = 0): AsyncEventQueue[T] {. | ||
raises: [Defect].} = | ||
## Creates new ``AsyncEventBus`` maximum size of ``limitSize`` (default is | ||
## ``0`` which means that there no limits). | ||
## | ||
## When number of events emitted exceeds ``limitSize`` - emit() procedure | ||
## will discard new events, consumers which has number of pending events | ||
## more than ``limitSize`` will get ``AsyncEventQueueFullError`` | ||
## error. | ||
doAssert(limitSize >= 0, "Limit size should be non-negative integer") | ||
let queue = | ||
if limitSize == 0: | ||
initDeque[T]() | ||
elif isPowerOfTwo(limitSize + 1): | ||
initDeque[T](limitSize + 1) | ||
else: | ||
initDeque[T](nextPowerOfTwo(limitSize + 1)) | ||
AsyncEventQueue[T](counter: 0'u64, queue: queue, limit: limitSize) | ||
|
||
proc len*(ab: AsyncEventQueue): int {.raises: [Defect].} = | ||
len(ab.queue) | ||
|
||
proc register*(ab: AsyncEventQueue): EventQueueKey {.raises: [Defect].} = | ||
inc(ab.counter) | ||
let reader = EventQueueReader(key: EventQueueKey(ab.counter), | ||
offset: ab.offset + len(ab.queue), | ||
overflow: false) | ||
ab.readers.add(reader) | ||
EventQueueKey(ab.counter) | ||
|
||
proc unregister*(ab: AsyncEventQueue, key: EventQueueKey) {. | ||
raises: [Defect] .} = | ||
let index = ab.getReaderIndex(key) | ||
if index >= 0: | ||
let reader = ab.readers[index] | ||
# Completing pending Future to avoid deadlock. | ||
if not(isNil(reader.waiter)) and not(reader.waiter.finished()): | ||
reader.waiter.complete() | ||
ab.readers.delete(index) | ||
ab.compact() | ||
|
||
proc close*(ab: AsyncEventQueue) {.raises: [Defect].} = | ||
for reader in ab.readers.items(): | ||
if not(isNil(reader.waiter)) and not(reader.waiter.finished()): | ||
reader.waiter.complete() | ||
ab.readers.reset() | ||
ab.queue.clear() | ||
|
||
proc closeWait*(ab: AsyncEventQueue): Future[void] {.raises: [Defect].} = | ||
var retFuture = newFuture[void]("AsyncEventQueue.closeWait()") | ||
proc continuation(udata: pointer) {.gcsafe.} = | ||
if not(retFuture.finished()): | ||
retFuture.complete() | ||
ab.close() | ||
# Schedule `continuation` to be called only after all the `reader` | ||
# notifications will be scheduled and processed. | ||
callSoon(continuation) | ||
retFuture | ||
|
||
template readerOverflow*(ab: AsyncEventQueue, | ||
reader: EventQueueReader): bool = | ||
ab.limit + (reader.offset - ab.offset) <= len(ab.queue) | ||
|
||
proc emit*[T](ab: AsyncEventQueue[T], data: T) {.raises: [Defect].} = | ||
if len(ab.readers) > 0: | ||
# We enqueue `data` only if there active reader present. | ||
var changesPresent = false | ||
let couldEmit = | ||
if ab.limit == 0: | ||
true | ||
else: | ||
# Because ab.readers is sequence sorted by `offset`, we will apply our | ||
# limit to the most recent consumer. | ||
if ab.readerOverflow(ab.readers[^1]): | ||
cheatfate marked this conversation as resolved.
Show resolved
Hide resolved
|
||
false | ||
else: | ||
true | ||
|
||
if couldEmit: | ||
if ab.limit != 0: | ||
for reader in ab.readers.mitems(): | ||
if not(reader.overflow): | ||
if ab.readerOverflow(reader): | ||
reader.overflow = true | ||
changesPresent = true | ||
ab.queue.addLast(data) | ||
cheatfate marked this conversation as resolved.
Show resolved
Hide resolved
arnetheduck marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for reader in ab.readers.mitems(): | ||
if not(isNil(reader.waiter)) and not(reader.waiter.finished()): | ||
reader.waiter.complete() | ||
else: | ||
for reader in ab.readers.mitems(): | ||
if not(reader.overflow): | ||
reader.overflow = true | ||
changesPresent = true | ||
|
||
if changesPresent: | ||
ab.compact() | ||
|
||
proc waitEvents*[T](ab: AsyncEventQueue[T], | ||
key: EventQueueKey, | ||
eventsCount = -1): Future[seq[T]] {.async.} = | ||
## Wait for events | ||
var | ||
events: seq[T] | ||
resetFuture = false | ||
|
||
while true: | ||
# We need to obtain reader index at every iteration, because `ab.readers` | ||
# sequence could be changed after `await waitFuture` call. | ||
let index = ab.getReaderIndex(key) | ||
if index < 0: | ||
# We going to return everything we have in `events`. | ||
break | ||
|
||
if resetFuture: | ||
resetFuture = false | ||
ab.readers[index].waiter = nil | ||
|
||
let reader = ab.readers[index] | ||
doAssert(isNil(reader.waiter), | ||
"Concurrent waits on same key are not allowed!") | ||
|
||
if reader.overflow: | ||
raise newException(AsyncEventQueueFullError, | ||
"AsyncEventQueue size exceeds limits") | ||
|
||
let length = len(ab.queue) + ab.offset | ||
doAssert(length >= ab.readers[index].offset) | ||
if length == ab.readers[index].offset: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. all this offset stuff is to avoid copying the list of readers? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope, |
||
# We are at the end of queue, it means that we should wait for new events. | ||
let waitFuture = newFuture[void]("AsyncEventQueue.waitEvents") | ||
ab.readers[index].waiter = waitFuture | ||
resetFuture = true | ||
await waitFuture | ||
else: | ||
let | ||
itemsInQueue = length - ab.readers[index].offset | ||
itemsOffset = ab.readers[index].offset - ab.offset | ||
itemsCount = | ||
if eventsCount <= 0: | ||
itemsInQueue | ||
else: | ||
min(itemsInQueue, eventsCount - len(events)) | ||
|
||
for i in 0 ..< itemsCount: | ||
events.add(ab.queue[itemsOffset + i]) | ||
ab.readers[index].offset += itemsCount | ||
|
||
# Keep readers sequence sorted by `offset` field. | ||
var slider = index | ||
while (slider + 1 < len(ab.readers)) and | ||
cheatfate marked this conversation as resolved.
Show resolved
Hide resolved
|
||
(ab.readers[slider].offset > ab.readers[slider + 1].offset): | ||
swap(ab.readers[slider], ab.readers[slider + 1]) | ||
inc(slider) | ||
|
||
# Shrink data queue. | ||
ab.compact() | ||
|
||
if (eventsCount <= 0) or (len(events) == eventsCount): | ||
break | ||
|
||
return events |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this be a cancel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why? to raise
CancelledError
fromcloseWait()
procedure?The idea here is to ensure that all event consumers will receive notification about that
AsyncEventQueue
is get closed, and because all this callbacks get schedule insideab.close()
call, it means thatcontinuation
procedure will be scheduled after all the consumer notification callbacks will be already processed.