Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AsyncEventQueue (AsyncEventBus replacement). #275

Merged
merged 23 commits into from
Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 243 additions & 13 deletions chronos/asyncsync.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

{.push raises: [Defect].}

import std/[sequtils, deques, tables, typetraits]
import std/[sequtils, math, deques, tables, typetraits]
import ./asyncloop
export asyncloop

Expand Down Expand Up @@ -55,11 +55,11 @@ type
queue: Deque[T]
maxsize: int

AsyncQueueEmptyError* = object of CatchableError
AsyncQueueEmptyError* = object of AsyncError
## ``AsyncQueue`` is empty.
AsyncQueueFullError* = object of CatchableError
AsyncQueueFullError* = object of AsyncError
## ``AsyncQueue`` is full.
AsyncLockError* = object of CatchableError
AsyncLockError* = object of AsyncError
## ``AsyncLock`` is either locked or unlocked.

EventBusSubscription*[T] = proc(bus: AsyncEventBus,
Expand Down Expand Up @@ -106,6 +106,23 @@ type
eventName: string
payload: EventPayloadBase

AsyncEventQueueFullError* = object of AsyncError

EventQueueKey* = distinct uint64

EventQueueReader* = object
key: EventQueueKey
offset: int
waiter: Future[void]
overflow: bool

AsyncEventQueue*[T] = ref object of RootObj
readers: seq[EventQueueReader]
queue: Deque[T]
counter: uint64
limit: int
offset: int

proc newAsyncLock*(): AsyncLock =
## Creates new asynchronous lock ``AsyncLock``.
##
Expand Down Expand Up @@ -448,7 +465,9 @@ proc `$`*[T](aq: AsyncQueue[T]): string =
template generateKey(typeName, eventName: string): string =
"type[" & typeName & "]-key[" & eventName & "]"

proc newAsyncEventBus*(): AsyncEventBus =
proc newAsyncEventBus*(): AsyncEventBus {.
deprecated: "Implementation has unfixable flaws, please use" &
"AsyncEventQueue[T] instead".} =
## Creates new ``AsyncEventBus``.
AsyncEventBus(counter: 0'u64, events: initTable[string, EventItem]())

Expand All @@ -460,7 +479,9 @@ template location*(payload: EventPayloadBase): SrcLoc =
## Returns source location address of event emitter.
payload.loc[]

proc get*(event: AwaitableEvent, T: typedesc): T =
proc get*(event: AwaitableEvent, T: typedesc): T {.
deprecated: "Implementation has unfixable flaws, please use " &
"AsyncEventQueue[T] instead".} =
## Returns event's payload of type ``T`` from event ``event``.
cast[EventPayload[T]](event.payload).value

Expand All @@ -472,7 +493,9 @@ template location*(event: AwaitableEvent): SrcLoc =
## Returns source location address of event emitter.
event.payload.loc[]

proc waitEvent*(bus: AsyncEventBus, T: typedesc, event: string): Future[T] =
proc waitEvent*(bus: AsyncEventBus, T: typedesc, event: string): Future[T] {.
deprecated: "Implementation has unfixable flaws, please use " &
"AsyncEventQueue[T] instead".} =
## Wait for the event from AsyncEventBus ``bus`` with name ``event``.
##
## Returned ``Future[T]`` will hold event's payload of type ``T``.
Expand All @@ -488,7 +511,9 @@ proc waitEvent*(bus: AsyncEventBus, T: typedesc, event: string): Future[T] =
bus.events.mgetOrPut(eventKey, default).waiters.add(baseFuture)
retFuture

proc waitAllEvents*(bus: AsyncEventBus): Future[AwaitableEvent] =
proc waitAllEvents*(bus: AsyncEventBus): Future[AwaitableEvent] {.
deprecated: "Implementation has unfixable flaws, please use " &
"AsyncEventQueue[T] instead".} =
## Wait for any event from AsyncEventBus ``bus``.
##
## Returns ``Future`` which holds helper object. Using this object you can
Expand All @@ -502,7 +527,9 @@ proc waitAllEvents*(bus: AsyncEventBus): Future[AwaitableEvent] =
retFuture

proc subscribe*[T](bus: AsyncEventBus, event: string,
callback: EventBusSubscription[T]): EventBusKey =
callback: EventBusSubscription[T]): EventBusKey {.
deprecated: "Implementation has unfixable flaws, please use " &
"AsyncEventQueue[T] instead".} =
## Subscribe to the event ``event`` passed through eventbus ``bus`` with
## callback ``callback``.
##
Expand All @@ -524,7 +551,9 @@ proc subscribe*[T](bus: AsyncEventBus, event: string,
subkey

proc subscribeAll*(bus: AsyncEventBus,
callback: EventBusAllSubscription): EventBusKey =
callback: EventBusAllSubscription): EventBusKey {.
deprecated: "Implementation has unfixable flaws, please use " &
"AsyncEventQueue instead".} =
## Subscribe to all events passed through eventbus ``bus`` with callback
## ``callback``.
##
Expand All @@ -542,7 +571,9 @@ proc subscribeAll*(bus: AsyncEventBus,
bus.subscribers.add(subkey)
subkey

proc unsubscribe*(bus: AsyncEventBus, key: EventBusKey) =
proc unsubscribe*(bus: AsyncEventBus, key: EventBusKey) {.
deprecated: "Implementation has unfixable flaws, please use " &
"AsyncEventQueue instead".} =
## Cancel subscription of subscriber with key ``key`` from eventbus ``bus``.
let eventKey = generateKey(key.typeName, key.eventName)

Expand Down Expand Up @@ -590,7 +621,9 @@ proc emit[T](bus: AsyncEventBus, event: string, data: T, loc: ptr SrcLoc) =
for subscriber in bus.subscribers:
triggerSubscriberCallback(subscriber)

template emit*[T](bus: AsyncEventBus, event: string, data: T) =
template emit*[T](bus: AsyncEventBus, event: string, data: T) {.
deprecated: "Implementation has unfixable flaws, please use " &
"AsyncEventQueue instead".} =
## Emit new event ``event`` to the eventbus ``bus`` with payload ``data``.
emit(bus, event, data, getSrcLocation())

Expand All @@ -605,8 +638,205 @@ proc emitWait[T](bus: AsyncEventBus, event: string, data: T,
return retFuture

template emitWait*[T](bus: AsyncEventBus, event: string,
data: T): Future[void] =
data: T): Future[void] {.
deprecated: "Implementation has unfixable flaws, please use " &
"AsyncEventQueue instead".} =
## Emit new event ``event`` to the eventbus ``bus`` with payload ``data`` and
## wait until all the subscribers/waiters will receive notification about
## event.
emitWait(bus, event, data, getSrcLocation())

proc `==`(a, b: EventQueueKey): bool {.borrow.}

proc compact(ab: AsyncEventQueue) {.raises: [Defect].} =
if len(ab.readers) > 0:
let minOffset =
block:
var res = -1
for reader in ab.readers.items():
if not(reader.overflow):
res = reader.offset
break
res

if minOffset == -1:
ab.offset += len(ab.queue)
ab.queue.clear()
else:
doAssert(minOffset >= ab.offset)
if minOffset > ab.offset:
let delta = minOffset - ab.offset
ab.queue.shrink(fromFirst = delta)
ab.offset += delta
else:
ab.queue.clear()

proc getReaderIndex(ab: AsyncEventQueue, key: EventQueueKey): int {.
raises: [Defect].} =
for index, value in ab.readers.pairs():
if value.key == key:
return index
-1

proc newAsyncEventQueue*[T](limitSize = 0): AsyncEventQueue[T] {.
raises: [Defect].} =
## Creates new ``AsyncEventBus`` maximum size of ``limitSize`` (default is
## ``0`` which means that there no limits).
##
## When number of events emitted exceeds ``limitSize`` - emit() procedure
## will discard new events, consumers which has number of pending events
## more than ``limitSize`` will get ``AsyncEventQueueFullError``
## error.
doAssert(limitSize >= 0, "Limit size should be non-negative integer")
let queue =
if limitSize == 0:
initDeque[T]()
elif isPowerOfTwo(limitSize + 1):
initDeque[T](limitSize + 1)
else:
initDeque[T](nextPowerOfTwo(limitSize + 1))
AsyncEventQueue[T](counter: 0'u64, queue: queue, limit: limitSize)

proc len*(ab: AsyncEventQueue): int {.raises: [Defect].} =
len(ab.queue)

proc register*(ab: AsyncEventQueue): EventQueueKey {.raises: [Defect].} =
inc(ab.counter)
let reader = EventQueueReader(key: EventQueueKey(ab.counter),
offset: ab.offset + len(ab.queue),
overflow: false)
ab.readers.add(reader)
EventQueueKey(ab.counter)

proc unregister*(ab: AsyncEventQueue, key: EventQueueKey) {.
raises: [Defect] .} =
let index = ab.getReaderIndex(key)
if index >= 0:
let reader = ab.readers[index]
# Completing pending Future to avoid deadlock.
if not(isNil(reader.waiter)) and not(reader.waiter.finished()):
reader.waiter.complete()
ab.readers.delete(index)
ab.compact()

proc close*(ab: AsyncEventQueue) {.raises: [Defect].} =
for reader in ab.readers.items():
if not(isNil(reader.waiter)) and not(reader.waiter.finished()):
reader.waiter.complete()
ab.readers.reset()
ab.queue.clear()

proc closeWait*(ab: AsyncEventQueue): Future[void] {.raises: [Defect].} =
var retFuture = newFuture[void]("AsyncEventQueue.closeWait()")
proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
retFuture.complete()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be a cancel?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? to raise CancelledError from closeWait() procedure?
The idea here is to ensure that all event consumers will receive notification about that AsyncEventQueue is get closed, and because all this callbacks get schedule inside ab.close() call, it means that continuation procedure will be scheduled after all the consumer notification callbacks will be already processed.

ab.close()
# Schedule `continuation` to be called only after all the `reader`
# notifications will be scheduled and processed.
callSoon(continuation)
retFuture

template readerOverflow*(ab: AsyncEventQueue,
reader: EventQueueReader): bool =
ab.limit + (reader.offset - ab.offset) <= len(ab.queue)

proc emit*[T](ab: AsyncEventQueue[T], data: T) {.raises: [Defect].} =
if len(ab.readers) > 0:
# We enqueue `data` only if there active reader present.
var changesPresent = false
let couldEmit =
if ab.limit == 0:
true
else:
# Because ab.readers is sequence sorted by `offset`, we will apply our
# limit to the most recent consumer.
if ab.readerOverflow(ab.readers[^1]):
cheatfate marked this conversation as resolved.
Show resolved Hide resolved
false
else:
true

if couldEmit:
if ab.limit != 0:
for reader in ab.readers.mitems():
if not(reader.overflow):
if ab.readerOverflow(reader):
reader.overflow = true
changesPresent = true
ab.queue.addLast(data)
cheatfate marked this conversation as resolved.
Show resolved Hide resolved
arnetheduck marked this conversation as resolved.
Show resolved Hide resolved
for reader in ab.readers.mitems():
if not(isNil(reader.waiter)) and not(reader.waiter.finished()):
reader.waiter.complete()
else:
for reader in ab.readers.mitems():
if not(reader.overflow):
reader.overflow = true
changesPresent = true

if changesPresent:
ab.compact()

proc waitEvents*[T](ab: AsyncEventQueue[T],
key: EventQueueKey,
eventsCount = -1): Future[seq[T]] {.async.} =
## Wait for events
var
events: seq[T]
resetFuture = false

while true:
# We need to obtain reader index at every iteration, because `ab.readers`
# sequence could be changed after `await waitFuture` call.
let index = ab.getReaderIndex(key)
if index < 0:
# We going to return everything we have in `events`.
break

if resetFuture:
resetFuture = false
ab.readers[index].waiter = nil

let reader = ab.readers[index]
doAssert(isNil(reader.waiter),
"Concurrent waits on same key are not allowed!")

if reader.overflow:
raise newException(AsyncEventQueueFullError,
"AsyncEventQueue size exceeds limits")

let length = len(ab.queue) + ab.offset
doAssert(length >= ab.readers[index].offset)
if length == ab.readers[index].offset:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all this offset stuff is to avoid copying the list of readers?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, offset indicates from which point in the queue you should start reading events.

# We are at the end of queue, it means that we should wait for new events.
let waitFuture = newFuture[void]("AsyncEventQueue.waitEvents")
ab.readers[index].waiter = waitFuture
resetFuture = true
await waitFuture
else:
let
itemsInQueue = length - ab.readers[index].offset
itemsOffset = ab.readers[index].offset - ab.offset
itemsCount =
if eventsCount <= 0:
itemsInQueue
else:
min(itemsInQueue, eventsCount - len(events))

for i in 0 ..< itemsCount:
events.add(ab.queue[itemsOffset + i])
ab.readers[index].offset += itemsCount

# Keep readers sequence sorted by `offset` field.
var slider = index
while (slider + 1 < len(ab.readers)) and
cheatfate marked this conversation as resolved.
Show resolved Hide resolved
(ab.readers[slider].offset > ab.readers[slider + 1].offset):
swap(ab.readers[slider], ab.readers[slider + 1])
inc(slider)

# Shrink data queue.
ab.compact()

if (eventsCount <= 0) or (len(events) == eventsCount):
break

return events
Loading