diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index eaba8ebfa8320d..10a0cf57e7789e 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -1,223 +1,3 @@ 'use strict'; -const assert = require('assert'); -const util = require('util'); -const { Socket } = require('net'); -const { JSStream } = process.binding('js_stream'); -const uv = process.binding('uv'); -const debug = util.debuglog('stream_wrap'); -const errors = require('internal/errors'); - -function StreamWrap(stream) { - const handle = new JSStream(); - - this.stream = stream; - - this._list = null; - - const self = this; - handle.close = function(cb) { - debug('close'); - self.doClose(cb); - }; - handle.isAlive = function() { - return self.isAlive(); - }; - handle.isClosing = function() { - return self.isClosing(); - }; - handle.onreadstart = function() { - return self.readStart(); - }; - handle.onreadstop = function() { - return self.readStop(); - }; - handle.onshutdown = function(req) { - return self.doShutdown(req); - }; - handle.onwrite = function(req, bufs) { - return self.doWrite(req, bufs); - }; - - this.stream.pause(); - this.stream.on('error', function onerror(err) { - self.emit('error', err); - }); - this.stream.on('data', function ondata(chunk) { - if (typeof chunk === 'string' || this._readableState.objectMode === true) { - // Make sure that no further `data` events will happen - this.pause(); - this.removeListener('data', ondata); - - self.emit('error', new errors.Error('ERR_STREAM_WRAP')); - return; - } - - debug('data', chunk.length); - if (self._handle) - self._handle.readBuffer(chunk); - }); - this.stream.once('end', function onend() { - debug('end'); - if (self._handle) - self._handle.emitEOF(); - }); - - Socket.call(this, { - handle: handle - }); -} -util.inherits(StreamWrap, Socket); -module.exports = StreamWrap; - -// require('_stream_wrap').StreamWrap -StreamWrap.StreamWrap = StreamWrap; - -StreamWrap.prototype.isAlive = function isAlive() { - return true; -}; - -StreamWrap.prototype.isClosing = function isClosing() { - return !this.readable || !this.writable; -}; - -StreamWrap.prototype.readStart = function readStart() { - this.stream.resume(); - return 0; -}; - -StreamWrap.prototype.readStop = function readStop() { - this.stream.pause(); - return 0; -}; - -StreamWrap.prototype.doShutdown = function doShutdown(req) { - const self = this; - const handle = this._handle; - const item = this._enqueue('shutdown', req); - - this.stream.end(function() { - // Ensure that write was dispatched - setImmediate(function() { - if (!self._dequeue(item)) - return; - - handle.finishShutdown(req, 0); - }); - }); - return 0; -}; - -StreamWrap.prototype.doWrite = function doWrite(req, bufs) { - const self = this; - const handle = self._handle; - - var pending = bufs.length; - - // Queue the request to be able to cancel it - const item = self._enqueue('write', req); - - self.stream.cork(); - for (var n = 0; n < bufs.length; n++) - self.stream.write(bufs[n], done); - self.stream.uncork(); - - function done(err) { - if (!err && --pending !== 0) - return; - - // Ensure that this is called once in case of error - pending = 0; - - let errCode = 0; - if (err) { - const code = uv[`UV_${err.code}`]; - errCode = (err.code && code) ? code : uv.UV_EPIPE; - } - - // Ensure that write was dispatched - setImmediate(function() { - // Do not invoke callback twice - if (!self._dequeue(item)) - return; - - handle.doAfterWrite(req); - handle.finishWrite(req, errCode); - }); - } - - return 0; -}; - -function QueueItem(type, req) { - this.type = type; - this.req = req; - this.prev = this; - this.next = this; -} - -StreamWrap.prototype._enqueue = function _enqueue(type, req) { - const item = new QueueItem(type, req); - if (this._list === null) { - this._list = item; - return item; - } - - item.next = this._list.next; - item.prev = this._list; - item.next.prev = item; - item.prev.next = item; - - return item; -}; - -StreamWrap.prototype._dequeue = function _dequeue(item) { - assert(item instanceof QueueItem); - - var next = item.next; - var prev = item.prev; - - if (next === null && prev === null) - return false; - - item.next = null; - item.prev = null; - - if (next === item) { - prev = null; - next = null; - } else { - prev.next = next; - next.prev = prev; - } - - if (this._list === item) - this._list = next; - - return true; -}; - -StreamWrap.prototype.doClose = function doClose(cb) { - const self = this; - const handle = self._handle; - - setImmediate(function() { - while (self._list !== null) { - const item = self._list; - const req = item.req; - self._dequeue(item); - - const errCode = uv.UV_ECANCELED; - if (item.type === 'write') { - handle.doAfterWrite(req); - handle.finishWrite(req, errCode); - } else if (item.type === 'shutdown') { - handle.finishShutdown(req, errCode); - } - } - - // Should be already set by net.js - assert(self._handle === null); - cb(); - }); -}; +module.exports = require('internal/wrap_js_stream'); diff --git a/lib/internal/wrap_js_stream.js b/lib/internal/wrap_js_stream.js new file mode 100644 index 00000000000000..eaba8ebfa8320d --- /dev/null +++ b/lib/internal/wrap_js_stream.js @@ -0,0 +1,223 @@ +'use strict'; + +const assert = require('assert'); +const util = require('util'); +const { Socket } = require('net'); +const { JSStream } = process.binding('js_stream'); +const uv = process.binding('uv'); +const debug = util.debuglog('stream_wrap'); +const errors = require('internal/errors'); + +function StreamWrap(stream) { + const handle = new JSStream(); + + this.stream = stream; + + this._list = null; + + const self = this; + handle.close = function(cb) { + debug('close'); + self.doClose(cb); + }; + handle.isAlive = function() { + return self.isAlive(); + }; + handle.isClosing = function() { + return self.isClosing(); + }; + handle.onreadstart = function() { + return self.readStart(); + }; + handle.onreadstop = function() { + return self.readStop(); + }; + handle.onshutdown = function(req) { + return self.doShutdown(req); + }; + handle.onwrite = function(req, bufs) { + return self.doWrite(req, bufs); + }; + + this.stream.pause(); + this.stream.on('error', function onerror(err) { + self.emit('error', err); + }); + this.stream.on('data', function ondata(chunk) { + if (typeof chunk === 'string' || this._readableState.objectMode === true) { + // Make sure that no further `data` events will happen + this.pause(); + this.removeListener('data', ondata); + + self.emit('error', new errors.Error('ERR_STREAM_WRAP')); + return; + } + + debug('data', chunk.length); + if (self._handle) + self._handle.readBuffer(chunk); + }); + this.stream.once('end', function onend() { + debug('end'); + if (self._handle) + self._handle.emitEOF(); + }); + + Socket.call(this, { + handle: handle + }); +} +util.inherits(StreamWrap, Socket); +module.exports = StreamWrap; + +// require('_stream_wrap').StreamWrap +StreamWrap.StreamWrap = StreamWrap; + +StreamWrap.prototype.isAlive = function isAlive() { + return true; +}; + +StreamWrap.prototype.isClosing = function isClosing() { + return !this.readable || !this.writable; +}; + +StreamWrap.prototype.readStart = function readStart() { + this.stream.resume(); + return 0; +}; + +StreamWrap.prototype.readStop = function readStop() { + this.stream.pause(); + return 0; +}; + +StreamWrap.prototype.doShutdown = function doShutdown(req) { + const self = this; + const handle = this._handle; + const item = this._enqueue('shutdown', req); + + this.stream.end(function() { + // Ensure that write was dispatched + setImmediate(function() { + if (!self._dequeue(item)) + return; + + handle.finishShutdown(req, 0); + }); + }); + return 0; +}; + +StreamWrap.prototype.doWrite = function doWrite(req, bufs) { + const self = this; + const handle = self._handle; + + var pending = bufs.length; + + // Queue the request to be able to cancel it + const item = self._enqueue('write', req); + + self.stream.cork(); + for (var n = 0; n < bufs.length; n++) + self.stream.write(bufs[n], done); + self.stream.uncork(); + + function done(err) { + if (!err && --pending !== 0) + return; + + // Ensure that this is called once in case of error + pending = 0; + + let errCode = 0; + if (err) { + const code = uv[`UV_${err.code}`]; + errCode = (err.code && code) ? code : uv.UV_EPIPE; + } + + // Ensure that write was dispatched + setImmediate(function() { + // Do not invoke callback twice + if (!self._dequeue(item)) + return; + + handle.doAfterWrite(req); + handle.finishWrite(req, errCode); + }); + } + + return 0; +}; + +function QueueItem(type, req) { + this.type = type; + this.req = req; + this.prev = this; + this.next = this; +} + +StreamWrap.prototype._enqueue = function _enqueue(type, req) { + const item = new QueueItem(type, req); + if (this._list === null) { + this._list = item; + return item; + } + + item.next = this._list.next; + item.prev = this._list; + item.next.prev = item; + item.prev.next = item; + + return item; +}; + +StreamWrap.prototype._dequeue = function _dequeue(item) { + assert(item instanceof QueueItem); + + var next = item.next; + var prev = item.prev; + + if (next === null && prev === null) + return false; + + item.next = null; + item.prev = null; + + if (next === item) { + prev = null; + next = null; + } else { + prev.next = next; + next.prev = prev; + } + + if (this._list === item) + this._list = next; + + return true; +}; + +StreamWrap.prototype.doClose = function doClose(cb) { + const self = this; + const handle = self._handle; + + setImmediate(function() { + while (self._list !== null) { + const item = self._list; + const req = item.req; + self._dequeue(item); + + const errCode = uv.UV_ECANCELED; + if (item.type === 'write') { + handle.doAfterWrite(req); + handle.finishWrite(req, errCode); + } else if (item.type === 'shutdown') { + handle.finishShutdown(req, errCode); + } + } + + // Should be already set by net.js + assert(self._handle === null); + cb(); + }); +}; diff --git a/node.gyp b/node.gyp index 66949aa3b8bb5a..4b9e50e6dce307 100644 --- a/node.gyp +++ b/node.gyp @@ -133,6 +133,7 @@ 'lib/internal/streams/BufferList.js', 'lib/internal/streams/legacy.js', 'lib/internal/streams/destroy.js', + 'lib/internal/wrap_js_stream.js', 'deps/v8/tools/splaytree.js', 'deps/v8/tools/codemap.js', 'deps/v8/tools/consarray.js',