Skip to content

Commit

Permalink
stream: pipeline with end option
Browse files Browse the repository at this point in the history
Currently pipeline cannot fully replace pipe due
to the missing end option. This PR adds the end
option to the promisified pipeline method.
  • Loading branch information
ronag committed Nov 19, 2021
1 parent 340b770 commit f3afed1
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
18 changes: 12 additions & 6 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async function* fromReadable(val) {
yield* Readable.prototype[SymbolAsyncIterator].call(val);
}

async function pump(iterable, writable, finish) {
async function pump(iterable, writable, finish, opts) {
let error;
let onresolve = null;

Expand Down Expand Up @@ -153,7 +153,9 @@ async function pump(iterable, writable, finish) {
}
}

writable.end();
if (opts !== false) {
writable.end();
}

await wait();

Expand Down Expand Up @@ -227,6 +229,7 @@ function pipelineImpl(streams, callback, opts) {
const stream = streams[i];
const reading = i < streams.length - 1;
const writing = i > 0;
const end = reading || opts?.end !== false;

if (isNodeStream(stream)) {
finishCount++;
Expand Down Expand Up @@ -282,14 +285,17 @@ function pipelineImpl(streams, callback, opts) {
then.call(ret,
(val) => {
value = val;
pt.end(val);
pt.write(val);
if (end) {
pt.end();
}
}, (err) => {
pt.destroy(err);
},
);
} else if (isIterable(ret, true)) {
finishCount++;
pump(ret, pt, finish);
pump(ret, pt, finish, { end });
} else {
throw new ERR_INVALID_RETURN_VALUE(
'AsyncIterable or Promise', 'destination', ret);
Expand All @@ -302,7 +308,7 @@ function pipelineImpl(streams, callback, opts) {
}
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
ret.pipe(stream);
ret.pipe(stream, { end });

// Compat. Before node v10.12.0 stdio used to throw an error so
// pipe() did/does not end() stdio destinations.
Expand All @@ -314,7 +320,7 @@ function pipelineImpl(streams, callback, opts) {
ret = makeAsyncIterable(ret);

finishCount++;
pump(ret, stream, finish);
pump(ret, stream, finish, { end });
}
ret = stream;
} else {
Expand Down
4 changes: 3 additions & 1 deletion lib/stream/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ const eos = require('internal/streams/end-of-stream');
function pipeline(...streams) {
return new Promise((resolve, reject) => {
let signal;
let end;
const lastArg = streams[streams.length - 1];
if (lastArg && typeof lastArg === 'object' &&
!isNodeStream(lastArg) && !isIterable(lastArg)) {
const options = ArrayPrototypePop(streams);
signal = options.signal;
end = options.end;
}

pl(streams, (err, value) => {
Expand All @@ -29,7 +31,7 @@ function pipeline(...streams) {
} else {
resolve(value);
}
}, { signal });
}, { signal, end });
});
}

Expand Down

0 comments on commit f3afed1

Please sign in to comment.