Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: pipe to a closed or destroyed stream should not be allowed in pipeline #53241

Merged
merged 2 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -2621,6 +2621,12 @@ or a pipeline ends non gracefully with no explicit error.
An attempt was made to call [`stream.push()`][] after a `null`(EOF) had been
pushed to the stream.

<a id="ERR_STREAM_UNABLE_TO_PIPE"></a>

### `ERR_STREAM_UNABLE_TO_PIPE`

An attempt was made to pipe to a closed or destroyed stream in a pipeline.

<a id="ERR_STREAM_UNSHIFT_AFTER_END_EVENT"></a>

### `ERR_STREAM_UNSHIFT_AFTER_END_EVENT`
Expand Down
1 change: 1 addition & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1716,6 +1716,7 @@ E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
E('ERR_STREAM_UNABLE_TO_PIPE', 'Connot pipe to a closed or destroyed stream', Error);
E('ERR_STREAM_UNSHIFT_AFTER_END_EVENT',
'stream.unshift() after end event', Error);
E('ERR_STREAM_WRAP', 'Stream has StringDecoder set or is in objectMode', Error);
Expand Down
6 changes: 6 additions & 0 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const {
ERR_MISSING_ARGS,
ERR_STREAM_DESTROYED,
ERR_STREAM_PREMATURE_CLOSE,
ERR_STREAM_UNABLE_TO_PIPE,
},
} = require('internal/errors');

Expand Down Expand Up @@ -253,10 +254,15 @@ function pipelineImpl(streams, callback, opts) {
const stream = streams[i];
const reading = i < streams.length - 1;
const writing = i > 0;
const next = i + 1 < streams.length ? streams[i + 1] : null;
const end = reading || opts?.end !== false;
const isLastStream = i === streams.length - 1;

if (isNodeStream(stream)) {
if (next !== null && (next?.closed || next?.destroyed)) {
throw new ERR_STREAM_UNABLE_TO_PIPE();
}

if (end) {
const { destroy, cleanup } = destroyer(stream, reading, writing);
destroys.push(destroy);
Expand Down
13 changes: 13 additions & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const http = require('http');
const { promisify } = require('util');
const net = require('net');
const tsp = require('timers/promises');
const tmpdir = require('../common/tmpdir');
const fs = require('fs');

{
let finished = false;
Expand Down Expand Up @@ -69,6 +71,17 @@ const tsp = require('timers/promises');
}, /ERR_INVALID_ARG_TYPE/);
}

tmpdir.refresh();
{
assert.rejects(async () => {
const read = fs.createReadStream(__filename);
const write = fs.createWriteStream(tmpdir.resolve('a'));
const close = promisify(write.close);
await close.call(write);
await pipelinep(read, write);
}, /ERR_STREAM_UNABLE_TO_PIPE/).then(common.mustCall());
}

{
const read = new Readable({
read() {}
Expand Down