Skip to content

Commit

Permalink
node: add AsyncListener support
Browse files Browse the repository at this point in the history
AsyncListener is a JS API that works in tandem with the AsyncWrap class
to allow the user to be alerted to key events in the life cycle of an
asynchronous event. The AsyncWrap class has its own MakeCallback
implementation that core will be migrated to use, and uses state sharing
techniques to allow quicker communication between JS and C++ whether the
async event callbacks need to be called.
  • Loading branch information
trevnorris committed Oct 31, 2013
1 parent 21fbbd5 commit efa62fd
Show file tree
Hide file tree
Showing 26 changed files with 1,594 additions and 200 deletions.
92 changes: 77 additions & 15 deletions lib/timers.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ var TIMEOUT_MAX = 2147483647; // 2^31-1

var debug = require('util').debuglog('timer');

var asyncFlags = process._asyncFlags;
var runAsyncQueue = process._runAsyncQueue;
var loadAsyncQueue = process._loadAsyncQueue;
var unloadAsyncQueue = process._unloadAsyncQueue;

// Do a little housekeeping.
delete process._asyncFlags;
delete process._runAsyncQueue;
delete process._loadAsyncQueue;
delete process._unloadAsyncQueue;


// IDLE TIMEOUTS
//
Expand All @@ -44,6 +55,9 @@ var debug = require('util').debuglog('timer');
// value = list
var lists = {};

// Make Timer as monomorphic as possible.
Timer.prototype._asyncQueue = undefined;

// the main function - creates lists on demand and the watchers associated
// with them.
function insert(item, msecs) {
Expand Down Expand Up @@ -80,9 +94,9 @@ function listOnTimeout() {
var now = Timer.now();
debug('now: %s', now);

var first;
var diff, first, hasQueue, threw;
while (first = L.peek(list)) {
var diff = now - first._idleStart;
diff = now - first._idleStart;
if (diff < msecs) {
list.start(msecs - diff, 0);
debug('%d list wait because diff is %d', msecs, diff);
Expand All @@ -99,12 +113,20 @@ function listOnTimeout() {
//
// https://github.com/joyent/node/issues/2631
var domain = first.domain;
if (domain && domain._disposed) continue;
if (domain && domain._disposed)
continue;

hasQueue = !!first._asyncQueue;

try {
if (hasQueue)
loadAsyncQueue(first);
if (domain)
domain.enter();
var threw = true;
threw = true;
first._onTimeout();
if (hasQueue)
unloadAsyncQueue(first);
if (domain)
domain.exit();
threw = false;
Expand Down Expand Up @@ -162,7 +184,6 @@ exports.enroll = function(item, msecs) {
exports.active = function(item) {
var msecs = item._idleTimeout;
if (msecs >= 0) {

var list = lists[msecs];
if (!list || L.isEmpty(list)) {
insert(item, msecs);
Expand All @@ -171,6 +192,11 @@ exports.active = function(item) {
L.append(list, item);
}
}
// Whether or not a new TimerWrap needed to be created, this should run
// for each item. This way each "item" (i.e. timer) can properly have
// their own domain assigned.
if (asyncFlags[0] > 0)
runAsyncQueue(item);
};


Expand Down Expand Up @@ -316,16 +342,43 @@ L.init(immediateQueue);

function processImmediate() {
var queue = immediateQueue;
var domain, hasQueue, immediate;

immediateQueue = {};
L.init(immediateQueue);

while (L.isEmpty(queue) === false) {
var immediate = L.shift(queue);
var domain = immediate.domain;
if (domain) domain.enter();
immediate._onImmediate();
if (domain) domain.exit();
immediate = L.shift(queue);
hasQueue = !!immediate._asyncQueue;
domain = immediate.domain;

if (hasQueue)
loadAsyncQueue(immediate);
if (domain)
domain.enter();

var threw = true;
try {
immediate._onImmediate();
threw = false;
} finally {
if (threw) {
if (!L.isEmpty(queue)) {
// Handle any remaining on next tick, assuming we're still
// alive to do so.
while (!L.isEmpty(immediateQueue)) {
L.append(queue, L.shift(immediateQueue));
}
immediateQueue = queue;
process.nextTick(processImmediate);
}
}
}

if (hasQueue)
unloadAsyncQueue(immediate);
if (domain)
domain.exit();
}

// Only round-trip to C++ land if we have to. Calling clearImmediate() on an
Expand Down Expand Up @@ -357,7 +410,11 @@ exports.setImmediate = function(callback) {
process._immediateCallback = processImmediate;
}

if (process.domain) immediate.domain = process.domain;
// setImmediates are handled more like nextTicks.
if (asyncFlags[0] > 0)
runAsyncQueue(immediate);
if (process.domain)
immediate.domain = process.domain;

L.append(immediateQueue, immediate);

Expand Down Expand Up @@ -389,9 +446,10 @@ function unrefTimeout() {

debug('unrefTimer fired');

var first;
var diff, domain, first, hasQueue, threw;
while (first = L.peek(unrefList)) {
var diff = now - first._idleStart;
diff = now - first._idleStart;
hasQueue = !!first._asyncQueue;

if (diff < first._idleTimeout) {
diff = first._idleTimeout - diff;
Expand All @@ -403,17 +461,21 @@ function unrefTimeout() {

L.remove(first);

var domain = first.domain;
domain = first.domain;

if (!first._onTimeout) continue;
if (domain && domain._disposed) continue;

try {
if (hasQueue)
loadAsyncQueue(first);
if (domain) domain.enter();
var threw = true;
threw = true;
debug('unreftimer firing timeout');
first._onTimeout();
threw = false;
if (hasQueue)
unloadAsyncQueue(first);
if (domain) domain.exit();
} finally {
if (threw) process.nextTick(unrefTimeout);
Expand Down
2 changes: 2 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@
'src/udp_wrap.cc',
'src/uv.cc',
# headers to make for a more pleasant IDE experience
'src/async-wrap.h',
'src/async-wrap-inl.h',
'src/env.h',
'src/env-inl.h',
'src/handle_wrap.h',
Expand Down
Loading

0 comments on commit efa62fd

Please sign in to comment.