diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 8f11e5db2ab7d4..69845a3f900219 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -494,7 +494,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest !== process.stdout && dest !== process.stderr; - var endFn = doEnd ? onend : cleanup; + var endFn = doEnd ? onend : unpipe; if (state.endEmitted) process.nextTick(endFn); else @@ -530,7 +530,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest.removeListener('error', onerror); dest.removeListener('unpipe', onunpipe); src.removeListener('end', onend); - src.removeListener('end', cleanup); + src.removeListener('end', unpipe); src.removeListener('data', ondata); cleanedUp = true; diff --git a/test/parallel/test-stream-unpipe-event.js b/test/parallel/test-stream-unpipe-event.js new file mode 100644 index 00000000000000..a9e634a11a3092 --- /dev/null +++ b/test/parallel/test-stream-unpipe-event.js @@ -0,0 +1,87 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const {Writable, Readable} = require('stream'); +class NullWriteable extends Writable { + _write(chunk, encoding, callback) { + return callback(); + } +} +class QuickEndReadable extends Readable { + _read() { + this.push(null); + } +} +class NeverEndReadable extends Readable { + _read() {} +} + +function noop() {} + +{ + const dest = new NullWriteable(); + const src = new QuickEndReadable(); + dest.on('pipe', common.mustCall(noop)); + dest.on('unpipe', common.mustCall(noop)); + src.pipe(dest); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 0); + }); +} + +{ + const dest = new NullWriteable(); + const src = new NeverEndReadable(); + dest.on('pipe', common.mustCall(noop)); + dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted')); + src.pipe(dest); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 1); + }); +} + +{ + const dest = new NullWriteable(); + const src = new NeverEndReadable(); + dest.on('pipe', common.mustCall(noop)); + dest.on('unpipe', common.mustCall(noop)); + src.pipe(dest); + src.unpipe(dest); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 0); + }); +} + +{ + const dest = new NullWriteable(); + const src = new QuickEndReadable(); + dest.on('pipe', common.mustCall(noop)); + dest.on('unpipe', common.mustCall(noop)); + src.pipe(dest, {end: false}); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 0); + }); +} + +{ + const dest = new NullWriteable(); + const src = new NeverEndReadable(); + dest.on('pipe', common.mustCall(noop)); + dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted')); + src.pipe(dest, {end: false}); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 1); + }); +} + +{ + const dest = new NullWriteable(); + const src = new NeverEndReadable(); + dest.on('pipe', common.mustCall(noop)); + dest.on('unpipe', common.mustCall(noop)); + src.pipe(dest, {end: false}); + src.unpipe(dest); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 0); + }); +}