diff --git a/doc/api/timers.md b/doc/api/timers.md index 09502dee1003c8..13f2dea37d9d23 100644 --- a/doc/api/timers.md +++ b/doc/api/timers.md @@ -18,6 +18,38 @@ This object is created internally and is returned from [`setImmediate()`][]. It can be passed to [`clearImmediate()`][] in order to cancel the scheduled actions. +By default, when an immediate is scheduled, the Node.js event loop will continue +running as long as the immediate is active. The `Immediate` object returned by +[`setImmediate()`][] exports both `immediate.ref()` and `immediate.unref()` +functions that can be used to control this default behavior. + +### immediate.ref() + + +When called, requests that the Node.js event loop *not* exit so long as the +`Immediate` is active. Calling `immediate.ref()` multiple times will have no +effect. + +*Note*: By default, all `Immediate` objects are "ref'd", making it normally +unnecessary to call `immediate.ref()` unless `immediate.unref()` had been called +previously. + +Returns a reference to the `Immediate`. + +### immediate.unref() + + +When called, the active `Immediate` object will not require the Node.js event +loop to remain active. If there is no other activity keeping the event loop +running, the process may exit before the `Immediate` object's callback is +invoked. Calling `immediate.unref()` multiple times will have no effect. + +Returns a reference to the `Immediate`. + ## Class: Timeout This object is created internally and is returned from [`setTimeout()`][] and diff --git a/lib/internal/bootstrap_node.js b/lib/internal/bootstrap_node.js index 85d72f876124fa..06bed3faaa4870 100644 --- a/lib/internal/bootstrap_node.js +++ b/lib/internal/bootstrap_node.js @@ -409,6 +409,8 @@ } } + function noop() {} + function setupProcessFatal() { const async_wrap = process.binding('async_wrap'); // Arrays containing hook flags and ids for async_hook calls. @@ -419,23 +421,15 @@ kDefaultTriggerAsyncId, kStackLength } = async_wrap.constants; process._fatalException = function(er) { - var caught; - // It's possible that kDefaultTriggerAsyncId was set for a constructor // call that threw and was never cleared. So clear it now. async_id_fields[kDefaultTriggerAsyncId] = -1; if (exceptionHandlerState.captureFn !== null) { exceptionHandlerState.captureFn(er); - caught = true; - } - - if (!caught) - caught = process.emit('uncaughtException', er); - - // If someone handled it, then great. otherwise, die in C++ land - // since that means that we'll exit the process, emit the 'exit' event - if (!caught) { + } else if (!process.emit('uncaughtException', er)) { + // If someone handled it, then great. otherwise, die in C++ land + // since that means that we'll exit the process, emit the 'exit' event try { if (!process._exiting) { process._exiting = true; @@ -444,24 +438,25 @@ } catch (er) { // nothing to be done about it at this point. } + return false; + } + // If we handled an error, then make sure any ticks get processed + // by ensuring that the next Immediate cycle isn't empty + NativeModule.require('timers').setImmediate(noop); + + // Emit the after() hooks now that the exception has been handled. + if (async_hook_fields[kAfter] > 0) { + const { emitAfter } = NativeModule.require('internal/async_hooks'); + do { + emitAfter(async_id_fields[kExecutionAsyncId]); + } while (async_hook_fields[kStackLength] > 0); + // Or completely empty the id stack. } else { - // If we handled an error, then make sure any ticks get processed - NativeModule.require('timers').setImmediate(process._tickCallback); - - // Emit the after() hooks now that the exception has been handled. - if (async_hook_fields[kAfter] > 0) { - do { - NativeModule.require('internal/async_hooks').emitAfter( - async_id_fields[kExecutionAsyncId]); - } while (async_hook_fields[kStackLength] > 0); - // Or completely empty the id stack. - } else { - clearAsyncIdStack(); - } + clearAsyncIdStack(); } - return caught; + return true; }; } @@ -634,7 +629,7 @@ }; NativeModule.wrapper = [ - '(function (exports, require, module, internalBinding) {', + '(function (exports, require, module, internalBinding, process) {', '\n});' ]; @@ -650,7 +645,7 @@ lineOffset: 0, displayErrors: true }); - fn(this.exports, NativeModule.require, this, internalBinding); + fn(this.exports, NativeModule.require, this, internalBinding, process); this.loaded = true; } finally { diff --git a/lib/internal/process/next_tick.js b/lib/internal/process/next_tick.js index bf7d0bc94dc4ce..9481bebd224be6 100644 --- a/lib/internal/process/next_tick.js +++ b/lib/internal/process/next_tick.js @@ -1,47 +1,9 @@ 'use strict'; -// This value is used to prevent the nextTickQueue from becoming too -// large and cause the process to run out of memory. When this value -// is reached the nextTimeQueue array will be shortened (see tickDone -// for details). -const kMaxCallbacksPerLoop = 1e4; - exports.setup = setupNextTick; // Will be overwritten when setupNextTick() is called. exports.nextTick = null; -class NextTickQueue { - constructor() { - this.head = null; - this.tail = null; - } - - push(v) { - const entry = { data: v, next: null }; - if (this.tail !== null) - this.tail.next = entry; - else - this.head = entry; - this.tail = entry; - } - - shift() { - if (this.head === null) - return; - const ret = this.head.data; - if (this.head === this.tail) - this.head = this.tail = null; - else - this.head = this.head.next; - return ret; - } - - clear() { - this.head = null; - this.tail = null; - } -} - function setupNextTick() { const async_wrap = process.binding('async_wrap'); const async_hooks = require('internal/async_hooks'); @@ -56,15 +18,47 @@ function setupNextTick() { // Grab the constants necessary for working with internal arrays. const { kInit, kDestroy, kAsyncIdCounter } = async_wrap.constants; const { async_id_symbol, trigger_async_id_symbol } = async_wrap; - const nextTickQueue = new NextTickQueue(); - var microtasksScheduled = false; - // Used to run V8's micro task queue. - var _runMicrotasks = {}; + // tickInfo is used so that the C++ code in src/node.cc can + // have easy access to our nextTick state, and avoid unnecessary + // calls into JS land. + // runMicrotasks is used to run V8's micro task queue. + const [ + tickInfo, + runMicrotasks + ] = process._setupNextTick(_tickCallback); // *Must* match Environment::TickInfo::Fields in src/env.h. - var kIndex = 0; - var kLength = 1; + const kHasScheduled = 0; + + const nextTickQueue = { + head: null, + tail: null, + push(data) { + const entry = { data, next: null }; + if (this.tail !== null) { + this.tail.next = entry; + } else { + this.head = entry; + tickInfo[kHasScheduled] = 1; + } + this.tail = entry; + }, + shift() { + if (this.head === null) + return; + const ret = this.head.data; + if (this.head === this.tail) { + this.head = this.tail = null; + tickInfo[kHasScheduled] = 0; + } else { + this.head = this.head.next; + } + return ret; + } + }; + + var microtasksScheduled = false; process.nextTick = nextTick; // Needs to be accessible from beyond this scope. @@ -73,25 +67,6 @@ function setupNextTick() { // Set the nextTick() function for internal usage. exports.nextTick = internalNextTick; - // This tickInfo thing is used so that the C++ code in src/node.cc - // can have easy access to our nextTick state, and avoid unnecessary - // calls into JS land. - const tickInfo = process._setupNextTick(_tickCallback, _runMicrotasks); - - _runMicrotasks = _runMicrotasks.runMicrotasks; - - function tickDone() { - if (tickInfo[kLength] !== 0) { - if (tickInfo[kLength] <= tickInfo[kIndex]) { - nextTickQueue.clear(); - tickInfo[kLength] = 0; - } else { - tickInfo[kLength] -= tickInfo[kIndex]; - } - } - tickInfo[kIndex] = 0; - } - const microTasksTickObject = { callback: runMicrotasksCallback, args: undefined, @@ -105,38 +80,27 @@ function setupNextTick() { // For the moment all microtasks come from the void until the PromiseHook // API is implemented. nextTickQueue.push(microTasksTickObject); - - tickInfo[kLength]++; microtasksScheduled = true; } function runMicrotasksCallback() { microtasksScheduled = false; - _runMicrotasks(); + runMicrotasks(); - if (tickInfo[kIndex] < tickInfo[kLength] || - emitPendingUnhandledRejections()) { + if (nextTickQueue.head !== null || emitPendingUnhandledRejections()) scheduleMicrotasks(); - } } function _tickCallback() { + let tock; do { - while (tickInfo[kIndex] < tickInfo[kLength]) { - ++tickInfo[kIndex]; - const tock = nextTickQueue.shift(); - - // CHECK(Number.isSafeInteger(tock[async_id_symbol])) - // CHECK(tock[async_id_symbol] > 0) - // CHECK(Number.isSafeInteger(tock[trigger_async_id_symbol])) - // CHECK(tock[trigger_async_id_symbol] > 0) - + while (tock = nextTickQueue.shift()) { const asyncId = tock[async_id_symbol]; emitBefore(asyncId, tock[trigger_async_id_symbol]); // emitDestroy() places the async_id_symbol into an asynchronous queue // that calls the destroy callback in the future. It's called before // calling tock.callback so destroy will be called even if the callback - // throws an exception that is handles by 'uncaughtException' or a + // throws an exception that is handled by 'uncaughtException' or a // domain. // TODO(trevnorris): This is a bit of a hack. It relies on the fact // that nextTick() doesn't allow the event loop to proceed, but if @@ -152,24 +116,21 @@ function setupNextTick() { Reflect.apply(callback, undefined, tock.args); emitAfter(asyncId); - - if (kMaxCallbacksPerLoop < tickInfo[kIndex]) - tickDone(); } - tickDone(); - _runMicrotasks(); + runMicrotasks(); emitPendingUnhandledRejections(); - } while (tickInfo[kLength] !== 0); + } while (nextTickQueue.head !== null); } class TickObject { - constructor(callback, args, asyncId, triggerAsyncId) { + constructor(callback, args, triggerAsyncId) { // this must be set to null first to avoid function tracking // on the hidden class, revisit in V8 versions after 6.2 this.callback = null; this.callback = callback; this.args = args; + const asyncId = ++async_id_fields[kAsyncIdCounter]; this[async_id_symbol] = asyncId; this[trigger_async_id_symbol] = triggerAsyncId; @@ -203,13 +164,7 @@ function setupNextTick() { args[i - 1] = arguments[i]; } - // In V8 6.2, moving tickInfo & async_id_fields[kAsyncIdCounter] into the - // TickObject incurs a significant performance penalty in the - // next-tick-breadth-args benchmark (revisit later) - ++tickInfo[kLength]; - nextTickQueue.push(new TickObject(callback, - args, - ++async_id_fields[kAsyncIdCounter], + nextTickQueue.push(new TickObject(callback, args, getDefaultTriggerAsyncId())); } @@ -238,13 +193,6 @@ function setupNextTick() { if (triggerAsyncId === null) triggerAsyncId = getDefaultTriggerAsyncId(); - // In V8 6.2, moving tickInfo & async_id_fields[kAsyncIdCounter] into the - // TickObject incurs a significant performance penalty in the - // next-tick-breadth-args benchmark (revisit later) - ++tickInfo[kLength]; - nextTickQueue.push(new TickObject(callback, - args, - ++async_id_fields[kAsyncIdCounter], - triggerAsyncId)); + nextTickQueue.push(new TickObject(callback, args, triggerAsyncId)); } } diff --git a/lib/timers.js b/lib/timers.js index 43d2cbbd07bdb3..46cd770fc643bd 100644 --- a/lib/timers.js +++ b/lib/timers.js @@ -22,7 +22,10 @@ 'use strict'; const async_wrap = process.binding('async_wrap'); -const TimerWrap = process.binding('timer_wrap').Timer; +const { + Timer: TimerWrap, + setImmediateCallback, +} = process.binding('timer_wrap'); const L = require('internal/linkedlist'); const internalUtil = require('internal/util'); const { createPromise, promiseResolve } = process.binding('util'); @@ -47,12 +50,15 @@ const { kInit, kDestroy, kAsyncIdCounter } = async_wrap.constants; const async_id_symbol = Symbol('asyncId'); const trigger_async_id_symbol = Symbol('triggerAsyncId'); -/* This is an Uint32Array for easier sharing with C++ land. */ -const scheduledImmediateCount = process._scheduledImmediateCount; -delete process._scheduledImmediateCount; -/* Kick off setImmediate processing */ -const activateImmediateCheck = process._activateImmediateCheck; -delete process._activateImmediateCheck; +// *Must* match Environment::ImmediateInfo::Fields in src/env.h. +const kCount = 0; +const kRefCount = 1; +const kHasOutstanding = 2; + +const [immediateInfo, toggleImmediateRef] = + setImmediateCallback(processImmediate); + +const kRefed = Symbol('refed'); // Timeout values > TIMEOUT_MAX are set to 1. const TIMEOUT_MAX = 2 ** 31 - 1; @@ -675,76 +681,76 @@ ImmediateList.prototype.remove = function(item) { }; // Create a single linked list instance only once at startup -var immediateQueue = new ImmediateList(); +const immediateQueue = new ImmediateList(); + +// If an uncaught exception was thrown during execution of immediateQueue, +// this queue will store all remaining Immediates that need to run upon +// resolution of all error handling (if process is still alive). +const outstandingQueue = new ImmediateList(); function processImmediate() { - var immediate = immediateQueue.head; - var tail = immediateQueue.tail; + const queue = outstandingQueue.head !== null ? + outstandingQueue : immediateQueue; + var immediate = queue.head; + const tail = queue.tail; // Clear the linked list early in case new `setImmediate()` calls occur while // immediate callbacks are executed - immediateQueue.head = immediateQueue.tail = null; + queue.head = queue.tail = null; + + let count = 0; + let refCount = 0; while (immediate !== null) { - if (!immediate._onImmediate) { - immediate = immediate._idleNext; - continue; - } + immediate._destroyed = true; + + const asyncId = immediate[async_id_symbol]; + emitBefore(asyncId, immediate[trigger_async_id_symbol]); - // Save next in case `clearImmediate(immediate)` is called from callback - var next = immediate._idleNext; + count++; + if (immediate[kRefed]) + refCount++; + immediate[kRefed] = undefined; - tryOnImmediate(immediate, tail); + tryOnImmediate(immediate, tail, count, refCount); - // If `clearImmediate(immediate)` wasn't called from the callback, use the - // `immediate`'s next item - if (immediate._idleNext !== null) - immediate = immediate._idleNext; - else - immediate = next; + emitAfter(asyncId); + + immediate = immediate._idleNext; } -} -process._immediateCallback = processImmediate; + immediateInfo[kCount] -= count; + immediateInfo[kRefCount] -= refCount; + immediateInfo[kHasOutstanding] = 0; +} // An optimization so that the try/finally only de-optimizes (since at least v8 // 4.7) what is in this smaller function. -function tryOnImmediate(immediate, oldTail) { +function tryOnImmediate(immediate, oldTail, count, refCount) { var threw = true; - emitBefore(immediate[async_id_symbol], immediate[trigger_async_id_symbol]); try { // make the actual call outside the try/finally to allow it to be optimized runCallback(immediate); threw = false; } finally { immediate._onImmediate = null; - if (!threw) - emitAfter(immediate[async_id_symbol]); - - if (!immediate._destroyed) { - immediate._destroyed = true; - scheduledImmediateCount[0]--; - if (async_hook_fields[kDestroy] > 0) { - emitDestroy(immediate[async_id_symbol]); - } + if (async_hook_fields[kDestroy] > 0) { + emitDestroy(immediate[async_id_symbol]); } - if (threw && immediate._idleNext !== null) { - // Handle any remaining on next tick, assuming we're still alive to do so. - const curHead = immediateQueue.head; - const next = immediate._idleNext; - if (curHead !== null) { - curHead._idlePrev = oldTail; - oldTail._idleNext = curHead; - next._idlePrev = null; - immediateQueue.head = next; - } else { - immediateQueue.head = next; - immediateQueue.tail = oldTail; + if (threw) { + immediateInfo[kCount] -= count; + immediateInfo[kRefCount] -= refCount; + + if (immediate._idleNext !== null) { + // Handle any remaining Immediates after error handling has resolved, + // assuming we're still alive to do so. + outstandingQueue.head = immediate._idleNext; + outstandingQueue.tail = oldTail; + immediateInfo[kHasOutstanding] = 1; } - process.nextTick(processImmediate); } } } @@ -759,31 +765,51 @@ function runCallback(timer) { } -function Immediate(callback, args) { - this._idleNext = null; - this._idlePrev = null; - // this must be set to null first to avoid function tracking - // on the hidden class, revisit in V8 versions after 6.2 - this._onImmediate = null; - this._onImmediate = callback; - this._argv = args; - this._destroyed = false; +const Immediate = class Immediate { + constructor(callback, args) { + this._idleNext = null; + this._idlePrev = null; + // this must be set to null first to avoid function tracking + // on the hidden class, revisit in V8 versions after 6.2 + this._onImmediate = null; + this._onImmediate = callback; + this._argv = args; + this._destroyed = false; + this[kRefed] = false; + + this[async_id_symbol] = ++async_id_fields[kAsyncIdCounter]; + this[trigger_async_id_symbol] = getDefaultTriggerAsyncId(); + if (async_hook_fields[kInit] > 0) { + emitInit(this[async_id_symbol], + 'Immediate', + this[trigger_async_id_symbol], + this); + } - this[async_id_symbol] = ++async_id_fields[kAsyncIdCounter]; - this[trigger_async_id_symbol] = getDefaultTriggerAsyncId(); - if (async_hook_fields[kInit] > 0) { - emitInit(this[async_id_symbol], - 'Immediate', - this[trigger_async_id_symbol], - this); + this.ref(); + immediateInfo[kCount]++; + + immediateQueue.append(this); } - if (scheduledImmediateCount[0] === 0) - activateImmediateCheck(); - scheduledImmediateCount[0]++; + ref() { + if (this[kRefed] === false) { + this[kRefed] = true; + if (immediateInfo[kRefCount]++ === 0) + toggleImmediateRef(true); + } + return this; + } - immediateQueue.append(this); -} + unref() { + if (this[kRefed] === true) { + this[kRefed] = false; + if (--immediateInfo[kRefCount] === 0) + toggleImmediateRef(false); + } + return this; + } +}; function setImmediate(callback, arg1, arg2, arg3) { if (typeof callback !== 'function') { @@ -823,15 +849,18 @@ exports.setImmediate = setImmediate; exports.clearImmediate = function(immediate) { - if (!immediate) return; + if (!immediate || immediate._destroyed) + return; - if (!immediate._destroyed) { - scheduledImmediateCount[0]--; - immediate._destroyed = true; + immediateInfo[kCount]--; + immediate._destroyed = true; - if (async_hook_fields[kDestroy] > 0) { - emitDestroy(immediate[async_id_symbol]); - } + if (immediate[kRefed] && --immediateInfo[kRefCount] === 0) + toggleImmediateRef(false); + immediate[kRefed] = undefined; + + if (async_hook_fields[kDestroy] > 0) { + emitDestroy(immediate[async_id_symbol]); } immediate._onImmediate = null; diff --git a/src/env-inl.h b/src/env-inl.h index bf919644dfbe49..37d1cf172ea14b 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -217,29 +217,51 @@ inline bool Environment::AsyncCallbackScope::in_makecallback() const { return env_->makecallback_cntr_ > 1; } -inline Environment::TickInfo::TickInfo() { - for (int i = 0; i < kFieldsCount; ++i) - fields_[i] = 0; -} +inline Environment::ImmediateInfo::ImmediateInfo(v8::Isolate* isolate) + : fields_(isolate, kFieldsCount) {} -inline uint32_t* Environment::TickInfo::fields() { +inline AliasedBuffer& + Environment::ImmediateInfo::fields() { return fields_; } -inline int Environment::TickInfo::fields_count() const { - return kFieldsCount; +inline uint32_t Environment::ImmediateInfo::count() const { + return fields_[kCount]; +} + +inline uint32_t Environment::ImmediateInfo::ref_count() const { + return fields_[kRefCount]; +} + +inline bool Environment::ImmediateInfo::has_outstanding() const { + return fields_[kHasOutstanding] == 1; +} + +inline void Environment::ImmediateInfo::count_inc(uint32_t increment) { + fields_[kCount] = fields_[kCount] + increment; } -inline uint32_t Environment::TickInfo::index() const { - return fields_[kIndex]; +inline void Environment::ImmediateInfo::count_dec(uint32_t decrement) { + fields_[kCount] = fields_[kCount] - decrement; } -inline uint32_t Environment::TickInfo::length() const { - return fields_[kLength]; +inline void Environment::ImmediateInfo::ref_count_inc(uint32_t increment) { + fields_[kRefCount] = fields_[kRefCount] + increment; } -inline void Environment::TickInfo::set_index(uint32_t value) { - fields_[kIndex] = value; +inline void Environment::ImmediateInfo::ref_count_dec(uint32_t decrement) { + fields_[kRefCount] = fields_[kRefCount] - decrement; +} + +inline Environment::TickInfo::TickInfo(v8::Isolate* isolate) + : fields_(isolate, kFieldsCount) {} + +inline AliasedBuffer& Environment::TickInfo::fields() { + return fields_; +} + +inline bool Environment::TickInfo::has_scheduled() const { + return fields_[kHasScheduled] == 1; } inline void Environment::AssignToContext(v8::Local context, @@ -277,13 +299,14 @@ inline Environment::Environment(IsolateData* isolate_data, v8::Local context) : isolate_(context->GetIsolate()), isolate_data_(isolate_data), + immediate_info_(context->GetIsolate()), + tick_info_(context->GetIsolate()), timer_base_(uv_now(isolate_data->event_loop())), printed_error_(false), trace_sync_io_(false), abort_on_uncaught_exception_(false), emit_napi_warning_(true), makecallback_cntr_(0), - scheduled_immediate_count_(isolate_, 1), should_abort_on_uncaught_toggle_(isolate_, 1), #if HAVE_INSPECTOR inspector_agent_(new inspector::Agent(this)), @@ -370,6 +393,10 @@ inline Environment::AsyncHooks* Environment::async_hooks() { return &async_hooks_; } +inline Environment::ImmediateInfo* Environment::immediate_info() { + return &immediate_info_; +} + inline Environment::TickInfo* Environment::tick_info() { return &tick_info_; } @@ -499,23 +526,34 @@ inline void Environment::set_fs_stats_field_array(double* fields) { fs_stats_field_array_ = fields; } -inline AliasedBuffer& -Environment::scheduled_immediate_count() { - return scheduled_immediate_count_; -} - -void Environment::SetImmediate(native_immediate_callback cb, +void Environment::CreateImmediate(native_immediate_callback cb, void* data, - v8::Local obj) { + v8::Local obj, + bool ref) { native_immediate_callbacks_.push_back({ cb, data, - std::unique_ptr>( - obj.IsEmpty() ? nullptr : new v8::Persistent(isolate_, obj)) + std::unique_ptr>(obj.IsEmpty() ? + nullptr : new v8::Persistent(isolate_, obj)), + ref }); - if (scheduled_immediate_count_[0] == 0) - ActivateImmediateCheck(); - scheduled_immediate_count_[0] = scheduled_immediate_count_[0] + 1; + immediate_info()->count_inc(1); +} + +void Environment::SetImmediate(native_immediate_callback cb, + void* data, + v8::Local obj) { + CreateImmediate(cb, data, obj, true); + + if (immediate_info()->ref_count() == 0) + ToggleImmediateRef(true); + immediate_info()->ref_count_inc(1); +} + +void Environment::SetUnrefImmediate(native_immediate_callback cb, + void* data, + v8::Local obj) { + CreateImmediate(cb, data, obj, false); } inline performance::performance_state* Environment::performance_state() { diff --git a/src/env.cc b/src/env.cc index 902429e18a7e74..17cdbbb79f9af1 100644 --- a/src/env.cc +++ b/src/env.cc @@ -80,6 +80,8 @@ void Environment::Start(int argc, uv_idle_init(event_loop(), immediate_idle_handle()); + uv_check_start(immediate_check_handle(), CheckImmediate); + // Inform V8's CPU profiler when we're idle. The profiler is sampling-based // but not all samples are created equal; mark the wall clock time spent in // epoll_wait() and friends so profiling tools can filter it out. The samples @@ -272,55 +274,57 @@ void Environment::EnvPromiseHook(v8::PromiseHookType type, void Environment::RunAndClearNativeImmediates() { size_t count = native_immediate_callbacks_.size(); if (count > 0) { + size_t ref_count = 0; std::vector list; native_immediate_callbacks_.swap(list); for (const auto& cb : list) { cb.cb_(this, cb.data_); if (cb.keep_alive_) cb.keep_alive_->Reset(); + if (cb.refed_) + ref_count++; } #ifdef DEBUG - CHECK_GE(scheduled_immediate_count_[0], count); + CHECK_GE(immediate_info()->count(), count); #endif - scheduled_immediate_count_[0] = scheduled_immediate_count_[0] - count; - } -} - -static bool MaybeStopImmediate(Environment* env) { - if (env->scheduled_immediate_count()[0] == 0) { - uv_check_stop(env->immediate_check_handle()); - uv_idle_stop(env->immediate_idle_handle()); - return true; + immediate_info()->count_dec(count); + immediate_info()->ref_count_dec(ref_count); } - return false; } void Environment::CheckImmediate(uv_check_t* handle) { Environment* env = Environment::from_immediate_check_handle(handle); - HandleScope scope(env->isolate()); - Context::Scope context_scope(env->context()); - if (MaybeStopImmediate(env)) + if (env->immediate_info()->count() == 0) return; - env->RunAndClearNativeImmediates(); + HandleScope scope(env->isolate()); + Context::Scope context_scope(env->context()); - MakeCallback(env->isolate(), - env->process_object(), - env->immediate_callback_string(), - 0, - nullptr, - {0, 0}).ToLocalChecked(); + env->RunAndClearNativeImmediates(); - MaybeStopImmediate(env); + do { + MakeCallback(env->isolate(), + env->process_object(), + env->immediate_callback_function(), + 0, + nullptr, + {0, 0}).ToLocalChecked(); + } while (env->immediate_info()->has_outstanding()); + + if (env->immediate_info()->ref_count() == 0) + env->ToggleImmediateRef(false); } -void Environment::ActivateImmediateCheck() { - uv_check_start(&immediate_check_handle_, CheckImmediate); - // Idle handle is needed only to stop the event loop from blocking in poll. - uv_idle_start(&immediate_idle_handle_, [](uv_idle_t*){ }); +void Environment::ToggleImmediateRef(bool ref) { + if (ref) { + // Idle handle is needed only to stop the event loop from blocking in poll. + uv_idle_start(immediate_idle_handle(), [](uv_idle_t*){ }); + } else { + uv_idle_stop(immediate_idle_handle()); + } } void Environment::AsyncHooks::grow_async_ids_stack() { diff --git a/src/env.h b/src/env.h index 5ebd56a3fff633..a5991cff0804e1 100644 --- a/src/env.h +++ b/src/env.h @@ -158,7 +158,6 @@ class ModuleWrap; V(homedir_string, "homedir") \ V(hostmaster_string, "hostmaster") \ V(ignore_string, "ignore") \ - V(immediate_callback_string, "_immediateCallback") \ V(infoaccess_string, "infoAccess") \ V(inherit_string, "inherit") \ V(input_string, "input") \ @@ -288,6 +287,7 @@ class ModuleWrap; V(http2ping_constructor_template, v8::ObjectTemplate) \ V(http2stream_constructor_template, v8::ObjectTemplate) \ V(http2settings_constructor_template, v8::ObjectTemplate) \ + V(immediate_callback_function, v8::Function) \ V(inspector_console_api_object, v8::Object) \ V(pbkdf2_constructor_template, v8::ObjectTemplate) \ V(pipe_constructor_template, v8::FunctionTemplate) \ @@ -450,25 +450,50 @@ class Environment { DISALLOW_COPY_AND_ASSIGN(AsyncCallbackScope); }; + class ImmediateInfo { + public: + inline AliasedBuffer& fields(); + inline uint32_t count() const; + inline uint32_t ref_count() const; + inline bool has_outstanding() const; + + inline void count_inc(uint32_t increment); + inline void count_dec(uint32_t decrement); + + inline void ref_count_inc(uint32_t increment); + inline void ref_count_dec(uint32_t decrement); + + private: + friend class Environment; // So we can call the constructor. + inline explicit ImmediateInfo(v8::Isolate* isolate); + + enum Fields { + kCount, + kRefCount, + kHasOutstanding, + kFieldsCount + }; + + AliasedBuffer fields_; + + DISALLOW_COPY_AND_ASSIGN(ImmediateInfo); + }; + class TickInfo { public: - inline uint32_t* fields(); - inline int fields_count() const; - inline uint32_t index() const; - inline uint32_t length() const; - inline void set_index(uint32_t value); + inline AliasedBuffer& fields(); + inline bool has_scheduled() const; private: friend class Environment; // So we can call the constructor. - inline TickInfo(); + inline explicit TickInfo(v8::Isolate* isolate); enum Fields { - kIndex, - kLength, + kHasScheduled, kFieldsCount }; - uint32_t fields_[kFieldsCount]; + AliasedBuffer fields_; DISALLOW_COPY_AND_ASSIGN(TickInfo); }; @@ -533,6 +558,7 @@ class Environment { inline void FinishHandleCleanup(uv_handle_t* handle); inline AsyncHooks* async_hooks(); + inline ImmediateInfo* immediate_info(); inline TickInfo* tick_info(); inline uint64_t timer_base() const; @@ -580,8 +606,6 @@ class Environment { inline double* fs_stats_field_array() const; inline void set_fs_stats_field_array(double* fields); - inline AliasedBuffer& scheduled_immediate_count(); - inline performance::performance_state* performance_state(); inline std::map* performance_marks(); @@ -660,8 +684,12 @@ class Environment { inline void SetImmediate(native_immediate_callback cb, void* data, v8::Local obj = v8::Local()); + inline void SetUnrefImmediate(native_immediate_callback cb, + void* data, + v8::Local obj = + v8::Local()); // This needs to be available for the JS-land setImmediate(). - void ActivateImmediateCheck(); + void ToggleImmediateRef(bool ref); class ShouldNotAbortOnUncaughtScope { public: @@ -678,6 +706,11 @@ class Environment { static inline Environment* ForAsyncHooks(AsyncHooks* hooks); private: + inline void CreateImmediate(native_immediate_callback cb, + void* data, + v8::Local obj, + bool ref); + inline void ThrowError(v8::Local (*fun)(v8::Local), const char* errmsg); @@ -689,6 +722,7 @@ class Environment { uv_check_t idle_check_handle_; AsyncHooks async_hooks_; + ImmediateInfo immediate_info_; TickInfo tick_info_; const uint64_t timer_base_; bool printed_error_; @@ -698,7 +732,6 @@ class Environment { size_t makecallback_cntr_; std::vector destroy_async_id_list_; - AliasedBuffer scheduled_immediate_count_; AliasedBuffer should_abort_on_uncaught_toggle_; int should_not_abort_scope_counter_ = 0; @@ -741,6 +774,7 @@ class Environment { native_immediate_callback cb_; void* data_; std::unique_ptr> keep_alive_; + bool refed_; }; std::vector native_immediate_callbacks_; void RunAndClearNativeImmediates(); diff --git a/src/node.cc b/src/node.cc index 0a466d20c90d88..f63865e5d78918 100644 --- a/src/node.cc +++ b/src/node.cc @@ -1144,25 +1144,25 @@ void SetupNextTick(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); CHECK(args[0]->IsFunction()); - CHECK(args[1]->IsObject()); env->set_tick_callback_function(args[0].As()); - env->SetMethod(args[1].As(), "runMicrotasks", RunMicrotasks); - - // Do a little housekeeping. env->process_object()->Delete( env->context(), - FIXED_ONE_BYTE_STRING(args.GetIsolate(), "_setupNextTick")).FromJust(); + FIXED_ONE_BYTE_STRING(env->isolate(), "_setupNextTick")).FromJust(); - // Values use to cross communicate with processNextTick. - uint32_t* const fields = env->tick_info()->fields(); - uint32_t const fields_count = env->tick_info()->fields_count(); + v8::Local run_microtasks_fn = + env->NewFunctionTemplate(RunMicrotasks)->GetFunction(env->context()) + .ToLocalChecked(); + run_microtasks_fn->SetName( + FIXED_ONE_BYTE_STRING(env->isolate(), "runMicrotasks")); - Local array_buffer = - ArrayBuffer::New(env->isolate(), fields, sizeof(*fields) * fields_count); + Local ret = Array::New(env->isolate(), 2); + ret->Set(env->context(), 0, + env->tick_info()->fields().GetJSArray()).FromJust(); + ret->Set(env->context(), 1, run_microtasks_fn).FromJust(); - args.GetReturnValue().Set(Uint32Array::New(array_buffer, 0, fields_count)); + args.GetReturnValue().Set(ret); } void PromiseRejectCallback(PromiseRejectMessage message) { @@ -1278,7 +1278,7 @@ void InternalCallbackScope::Close() { Environment::TickInfo* tick_info = env_->tick_info(); - if (tick_info->length() == 0) { + if (!tick_info->has_scheduled()) { env_->isolate()->RunMicrotasks(); } @@ -1289,10 +1289,7 @@ void InternalCallbackScope::Close() { CHECK_EQ(env_->trigger_async_id(), 0); } - Local process = env_->process_object(); - - if (tick_info->length() == 0) { - tick_info->set_index(0); + if (!tick_info->has_scheduled()) { return; } @@ -1301,6 +1298,8 @@ void InternalCallbackScope::Close() { CHECK_EQ(env_->trigger_async_id(), 0); } + Local process = env_->process_object(); + if (env_->tick_callback_function()->Call(process, 0, nullptr).IsEmpty()) { failed_ = true; } @@ -3162,12 +3161,6 @@ static void DebugEnd(const FunctionCallbackInfo& args); namespace { -void ActivateImmediateCheck(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - env->ActivateImmediateCheck(); -} - - void StartProfilerIdleNotifier(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); env->StartProfilerIdleNotifier(); @@ -3392,12 +3385,6 @@ void SetupProcessObject(Environment* env, FIXED_ONE_BYTE_STRING(env->isolate(), "ppid"), GetParentProcessId).FromJust()); - auto scheduled_immediate_count = - FIXED_ONE_BYTE_STRING(env->isolate(), "_scheduledImmediateCount"); - CHECK(process->Set(env->context(), - scheduled_immediate_count, - env->scheduled_immediate_count().GetJSArray()).FromJust()); - auto should_abort_on_uncaught_toggle = FIXED_ONE_BYTE_STRING(env->isolate(), "_shouldAbortOnUncaughtToggle"); CHECK(process->Set(env->context(), @@ -3529,9 +3516,6 @@ void SetupProcessObject(Environment* env, env->as_external()).FromJust()); // define various internal methods - env->SetMethod(process, - "_activateImmediateCheck", - ActivateImmediateCheck); env->SetMethod(process, "_startProfilerIdleNotifier", StartProfilerIdleNotifier); diff --git a/src/node_perf.cc b/src/node_perf.cc index 38dbdaca5adce1..48b8d02b79facc 100644 --- a/src/node_perf.cc +++ b/src/node_perf.cc @@ -183,9 +183,8 @@ void SetupPerformanceObservers(const FunctionCallbackInfo& args) { } // Creates a GC Performance Entry and passes it to observers -void PerformanceGCCallback(uv_async_t* handle) { - GCPerformanceEntry* entry = static_cast(handle->data); - Environment* env = entry->env(); +void PerformanceGCCallback(Environment* env, void* ptr) { + GCPerformanceEntry* entry = static_cast(ptr); HandleScope scope(env->isolate()); Local context = env->context(); @@ -203,10 +202,6 @@ void PerformanceGCCallback(uv_async_t* handle) { } delete entry; - auto closeCB = [](uv_handle_t* handle) { - delete reinterpret_cast(handle); - }; - uv_close(reinterpret_cast(handle), closeCB); } // Marks the start of a GC cycle @@ -223,16 +218,13 @@ void MarkGarbageCollectionEnd(Isolate* isolate, v8::GCCallbackFlags flags, void* data) { Environment* env = static_cast(data); - uv_async_t* async = new uv_async_t(); - if (uv_async_init(env->event_loop(), async, PerformanceGCCallback)) - return delete async; - uv_unref(reinterpret_cast(async)); - async->data = + GCPerformanceEntry* entry = new GCPerformanceEntry(env, static_cast(type), performance_last_gc_start_mark_, PERFORMANCE_NOW()); - CHECK_EQ(0, uv_async_send(async)); + env->SetUnrefImmediate(PerformanceGCCallback, + entry); } diff --git a/src/timer_wrap.cc b/src/timer_wrap.cc index 874c80d8d7095b..1725cf30e71d04 100644 --- a/src/timer_wrap.cc +++ b/src/timer_wrap.cc @@ -29,7 +29,9 @@ namespace node { namespace { +using v8::Array; using v8::Context; +using v8::Function; using v8::FunctionCallbackInfo; using v8::FunctionTemplate; using v8::HandleScope; @@ -67,11 +69,33 @@ class TimerWrap : public HandleWrap { env->SetProtoMethod(constructor, "stop", Stop); target->Set(timerString, constructor->GetFunction()); + + target->Set(env->context(), + FIXED_ONE_BYTE_STRING(env->isolate(), "setImmediateCallback"), + env->NewFunctionTemplate(SetImmediateCallback) + ->GetFunction(env->context()).ToLocalChecked()).FromJust(); } size_t self_size() const override { return sizeof(*this); } private: + static void SetImmediateCallback(const FunctionCallbackInfo& args) { + CHECK(args[0]->IsFunction()); + auto env = Environment::GetCurrent(args); + env->set_immediate_callback_function(args[0].As()); + auto toggle_ref_cb = [] (const FunctionCallbackInfo& args) { + Environment::GetCurrent(args)->ToggleImmediateRef(args[0]->IsTrue()); + }; + auto toggle_ref_function = + env->NewFunctionTemplate(toggle_ref_cb)->GetFunction(env->context()) + .ToLocalChecked(); + auto result = Array::New(env->isolate(), 2); + result->Set(env->context(), 0, + env->immediate_info()->fields().GetJSArray()).FromJust(); + result->Set(env->context(), 1, toggle_ref_function).FromJust(); + args.GetReturnValue().Set(result); + } + static void New(const FunctionCallbackInfo& args) { // This constructor should not be exposed to public javascript. // Therefore we assert that we are not trying to call this as a diff --git a/test/addons-napi/test_uv_loop/test_uv_loop.cc b/test/addons-napi/test_uv_loop/test_uv_loop.cc index 44819f72bb6b9d..048e25af9ddfb3 100644 --- a/test/addons-napi/test_uv_loop/test_uv_loop.cc +++ b/test/addons-napi/test_uv_loop/test_uv_loop.cc @@ -24,6 +24,15 @@ void* SetImmediate(napi_env env, T&& cb) { assert(cb() != nullptr); }); + // Idle handle is needed only to stop the event loop from blocking in poll. + uv_idle_t* idle = new uv_idle_t; + uv_idle_init(loop, idle); + uv_idle_start(idle, [](uv_idle_t* idle) { + uv_close(reinterpret_cast(idle), [](uv_handle_t* handle) { + delete reinterpret_cast(handle); + }); + }); + return nullptr; } diff --git a/test/common/index.js b/test/common/index.js index 30b6aca88e05e3..b5973b1821bb5f 100644 --- a/test/common/index.js +++ b/test/common/index.js @@ -21,6 +21,7 @@ /* eslint-disable required-modules, crypto-check */ 'use strict'; +const process = global.process; // Some tests tamper with the process global. const path = require('path'); const fs = require('fs'); const assert = require('assert'); diff --git a/test/parallel/test-process-fatal-exception-tick.js b/test/parallel/test-process-fatal-exception-tick.js new file mode 100644 index 00000000000000..19678d2922516e --- /dev/null +++ b/test/parallel/test-process-fatal-exception-tick.js @@ -0,0 +1,24 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); + +// If a process encounters an uncaughtException, it should schedule +// processing of nextTicks on the next Immediates cycle but not +// before all Immediates are handled + +let stage = 0; + +process.once('uncaughtException', common.expectsError({ + type: Error, + message: 'caughtException' +})); + +setImmediate(() => { + stage++; + process.nextTick(() => assert.strictEqual(stage, 2)); +}); +const now = Date.now(); +setTimeout(() => setImmediate(() => stage++), 1); +while (now + 10 >= Date.now()); +throw new Error('caughtException'); diff --git a/test/parallel/test-repl-let-process.js b/test/parallel/test-repl-let-process.js new file mode 100644 index 00000000000000..3e6c3e85665be1 --- /dev/null +++ b/test/parallel/test-repl-let-process.js @@ -0,0 +1,10 @@ +'use strict'; +const common = require('../common'); +const repl = require('repl'); + +common.globalCheck = false; + +// Regression test for https://github.com/nodejs/node/issues/6802 +const input = new common.ArrayStream(); +repl.start({ input, output: process.stdout, useGlobal: true }); +input.run(['let process']); diff --git a/test/parallel/test-timer-immediate.js b/test/parallel/test-timer-immediate.js new file mode 100644 index 00000000000000..385fa4baca4ee5 --- /dev/null +++ b/test/parallel/test-timer-immediate.js @@ -0,0 +1,5 @@ +'use strict'; +const common = require('../common'); +common.globalCheck = false; +global.process = {}; // Boom! +setImmediate(common.mustCall()); diff --git a/test/parallel/test-timers-immediate-queue-throw.js b/test/parallel/test-timers-immediate-queue-throw.js new file mode 100644 index 00000000000000..9929b27ab2fea3 --- /dev/null +++ b/test/parallel/test-timers-immediate-queue-throw.js @@ -0,0 +1,53 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const domain = require('domain'); + +// setImmediate should run clear its queued cbs once per event loop turn +// but immediates queued while processing the current queue should happen +// on the next turn of the event loop. + +// In addition, if any setImmediate throws, the rest of the queue should +// be processed after all error handling is resolved, but that queue +// should not include any setImmediate calls scheduled after the +// processing of the queue started. + +let threw = false; +let stage = -1; + +const QUEUE = 10; + +const errObj = { + type: Error, + message: 'setImmediate Err' +}; + +process.once('uncaughtException', common.expectsError(errObj)); +process.once('uncaughtException', () => assert.strictEqual(stage, 0)); + +const d1 = domain.create(); +d1.once('error', common.expectsError(errObj)); +d1.once('error', () => assert.strictEqual(stage, 0)); + +const run = common.mustCall((callStage) => { + assert(callStage >= stage); + stage = callStage; + if (threw) + return; + + setImmediate(run, 2); +}, QUEUE * 3); + +for (let i = 0; i < QUEUE; i++) + setImmediate(run, 0); +setImmediate(() => { + threw = true; + process.nextTick(() => assert.strictEqual(stage, 1)); + throw new Error('setImmediate Err'); +}); +d1.run(() => setImmediate(() => { + throw new Error('setImmediate Err'); +})); +for (let i = 0; i < QUEUE; i++) + setImmediate(run, 1); diff --git a/test/parallel/test-timers-immediate-unref-simple.js b/test/parallel/test-timers-immediate-unref-simple.js new file mode 100644 index 00000000000000..68497460328c32 --- /dev/null +++ b/test/parallel/test-timers-immediate-unref-simple.js @@ -0,0 +1,7 @@ +'use strict'; + +const common = require('../common'); + +// This immediate should not execute as it was unrefed +// and nothing else is keeping the event loop alive +setImmediate(common.mustNotCall()).unref(); diff --git a/test/parallel/test-timers-immediate-unref.js b/test/parallel/test-timers-immediate-unref.js new file mode 100644 index 00000000000000..5b56eb7e1d8e5b --- /dev/null +++ b/test/parallel/test-timers-immediate-unref.js @@ -0,0 +1,37 @@ +'use strict'; + +const common = require('../common'); +const Countdown = require('../common/countdown'); + +// This immediate should execute as it was unrefed and refed again. +// It also confirms that unref/ref are chainable. +setImmediate(common.mustCall(firstStep)).ref().unref().unref().ref(); + +function firstStep() { + const countdown = + new Countdown(2, common.mustCall(() => setImmediate(secondStep))); + // Unrefed setImmediate executes if it was unrefed but something else keeps + // the loop open + setImmediate(() => countdown.dec()).unref(); + setTimeout(() => countdown.dec(), 50); +} + +function secondStep() { + // clearImmediate works just fine with unref'd immediates + const immA = setImmediate(() => { + clearImmediate(immA); + clearImmediate(immB); + // this should not keep the event loop open indefinitely + // or do anything else weird + immA.ref(); + immB.ref(); + }).unref(); + const immB = setImmediate(common.mustNotCall()).unref(); + setImmediate(common.mustCall(finalStep)); +} + +function finalStep() { + // This immediate should not execute as it was unrefed + // and nothing else is keeping the event loop alive + setImmediate(common.mustNotCall()).unref(); +}