Skip to content

Commit

Permalink
stream: pipeline with end option
Browse files Browse the repository at this point in the history
Currently pipeline cannot fully replace pipe due
to the missing end option. This PR adds the end
option to the promisified pipeline method.

PR-URL: #40886
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
ronag authored and danielleadams committed Feb 1, 2022
1 parent e092fde commit e110c96
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 17 deletions.
40 changes: 25 additions & 15 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async function* fromReadable(val) {
yield* Readable.prototype[SymbolAsyncIterator].call(val);
}

async function pump(iterable, writable, finish) {
async function pump(iterable, writable, finish, opts) {
let error;
let onresolve = null;

Expand Down Expand Up @@ -153,7 +153,9 @@ async function pump(iterable, writable, finish) {
}
}

writable.end();
if (opts?.end !== false) {
writable.end();
}

await wait();

Expand Down Expand Up @@ -227,17 +229,22 @@ function pipelineImpl(streams, callback, opts) {
const stream = streams[i];
const reading = i < streams.length - 1;
const writing = i > 0;
const end = reading || opts?.end !== false;

if (isNodeStream(stream)) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, (err) => {
if (!err && !reading && isReadableFinished(stream, false)) {
stream.read(0);
destroyer(stream, true, writing, finish);
} else {
finish(err);
}
}));
if (end) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, (err) => {
if (!err && !reading && isReadableFinished(stream, false)) {
stream.read(0);
destroyer(stream, true, writing, finish);
} else {
finish(err);
}
}));
} else {
stream.on('error', finish);
}
}

if (i === 0) {
Expand Down Expand Up @@ -282,14 +289,17 @@ function pipelineImpl(streams, callback, opts) {
then.call(ret,
(val) => {
value = val;
pt.end(val);
pt.write(val);
if (end) {
pt.end();
}
}, (err) => {
pt.destroy(err);
},
);
} else if (isIterable(ret, true)) {
finishCount++;
pump(ret, pt, finish);
pump(ret, pt, finish, { end });
} else {
throw new ERR_INVALID_RETURN_VALUE(
'AsyncIterable or Promise', 'destination', ret);
Expand All @@ -302,7 +312,7 @@ function pipelineImpl(streams, callback, opts) {
}
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
ret.pipe(stream);
ret.pipe(stream, { end });

// Compat. Before node v10.12.0 stdio used to throw an error so
// pipe() did/does not end() stdio destinations.
Expand All @@ -314,7 +324,7 @@ function pipelineImpl(streams, callback, opts) {
ret = makeAsyncIterable(ret);

finishCount++;
pump(ret, stream, finish);
pump(ret, stream, finish, { end });
}
ret = stream;
} else {
Expand Down
4 changes: 3 additions & 1 deletion lib/stream/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ const eos = require('internal/streams/end-of-stream');
function pipeline(...streams) {
return new Promise((resolve, reject) => {
let signal;
let end;
const lastArg = streams[streams.length - 1];
if (lastArg && typeof lastArg === 'object' &&
!isNodeStream(lastArg) && !isIterable(lastArg)) {
const options = ArrayPrototypePop(streams);
signal = options.signal;
end = options.end;
}

pl(streams, (err, value) => {
Expand All @@ -29,7 +31,7 @@ function pipeline(...streams) {
} else {
resolve(value);
}
}, { signal });
}, { signal, end });
});
}

Expand Down
23 changes: 22 additions & 1 deletion test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -1465,5 +1465,26 @@ const tsp = require('timers/promises');
assert.strictEqual(duplex.destroyed, true);
}

run();
run().then(common.mustCall());
}

{
const pipelinePromise = promisify(pipeline);

async function run() {
const read = new Readable({
read() {}
});

const duplex = new PassThrough();

read.push(null);

await pipelinePromise(read, duplex, { end: false });

assert.strictEqual(duplex.destroyed, false);
assert.strictEqual(duplex.writableEnded, false);
}

run().then(common.mustCall());
}

0 comments on commit e110c96

Please sign in to comment.