-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AsyncEventQueue (AsyncEventBus replacement). #275
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, as I see it, the principal difference between AsyncEventBus and AsyncEventQueue is that the former gets a single piece of data to users whereas the latter gets many.
Both are useful - ie it's often the case that one wants to notify some listeners about a change, but only the latest version of the change is interesting (ie a counter that changed and has to be displayed on a screen - if it changed many times before it's displayed, the latest value should be "read" in the event - is that the current semantic of AsyncEventBus
? If yes, I'd think we should keep both primitives rather than this being a replacement.
In terms of this queue, we'll need the option to set bounds on it so that we can deal with slow subscribers (by kicking them out, more or less).
var retFuture = newFuture[void]("AsyncEventQueue.closeWait()") | ||
proc continuation(udata: pointer) {.gcsafe.} = | ||
if not(retFuture.finished()): | ||
retFuture.complete() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this be a cancel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why? to raise CancelledError
from closeWait()
procedure?
The idea here is to ensure that all event consumers will receive notification about that AsyncEventQueue
is get closed, and because all this callbacks get schedule inside ab.close()
call, it means that continuation
procedure will be scheduled after all the consumer notification callbacks will be already processed.
|
||
let length = len(ab.queue) + ab.offset | ||
doAssert(length >= ab.readers[index].offset) | ||
if length == ab.readers[index].offset: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all this offset stuff is to avoid copying the list of readers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, offset
indicates from which point in the queue you should start reading events.
So as |
Because every consumer has its own |
Add tests for AsyncEventQueue[T] limits. Rebase AsyncSync errors to be children of AsyncError.
Add test from review comment.
Add some GC debugging for tests.
Because its impossible to achieve goals which was initially set for
AsyncEventBus
. So this new structure will become as replacement forAsyncEventBus
.