Skip to content

Commit

Permalink
stream: prevent 'end' to be emitted after 'error'
Browse files Browse the repository at this point in the history
This PR adds _readableState.errorEmitted and add the tracking of it.

Fixes: #6083

PR-URL: #20104
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Ruben Bridgewater <[email protected]>
Reviewed-By: Trivikram Kamat <[email protected]>
  • Loading branch information
mcollina committed Apr 20, 2018
1 parent 700344e commit 0857790
Show file tree
Hide file tree
Showing 17 changed files with 76 additions and 20 deletions.
16 changes: 11 additions & 5 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ function ReadableState(options, stream, isDuplex) {
this.endEmitted = false;
this.reading = false;

// Flipped if an 'error' is emitted.
this.errorEmitted = false;

// a flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
Expand Down Expand Up @@ -1069,20 +1072,23 @@ function fromList(n, state) {
function endReadable(stream) {
var state = stream._readableState;

debug('endReadable', state.endEmitted);
if (!state.endEmitted) {
debug('endReadable', state.endEmitted, state.errorEmitted);
if (!state.endEmitted && !state.errorEmitted) {
state.ended = true;
process.nextTick(endReadableNT, state, stream);
}
}

function endReadableNT(state, stream) {
debug('endReadableNT', state.endEmitted, state.length);
debug('endReadableNT', state.endEmitted, state.length, state.errorEmitted);

// Check that we didn't get one last unshift.
if (!state.endEmitted && state.length === 0) {
state.endEmitted = true;
stream.readable = false;
stream.emit('end');

if (!state.errorEmitted) {
state.endEmitted = true;
stream.emit('end');
}
}
}
10 changes: 10 additions & 0 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -424,12 +424,22 @@ function onwriteError(stream, state, sync, er, cb) {
// this can emit finish, and it will always happen
// after error
process.nextTick(finishMaybe, stream, state);

// needed for duplex, fixes https://github.com/nodejs/node/issues/6083
if (stream._readableState) {
stream._readableState.errorEmitted = true;
}
stream._writableState.errorEmitted = true;
stream.emit('error', er);
} else {
// the caller expect this to happen before if
// it is async
cb(er);

// needed for duplex, fixes https://github.com/nodejs/node/issues/6083
if (stream._readableState) {
stream._readableState.errorEmitted = true;
}
stream._writableState.errorEmitted = true;
stream.emit('error', er);
// this can emit finish, but finish must
Expand Down
14 changes: 12 additions & 2 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ function destroy(err, cb) {
this._writableState.destroyed;

if (readableDestroyed || writableDestroyed) {
const readableErrored = this._readableState &&
this._readableState.errorEmitted;
const writableErrored = this._writableState &&
this._writableState.errorEmitted;

if (cb) {
cb(err);
} else if (err &&
(!this._writableState || !this._writableState.errorEmitted)) {
} else if (err && !readableErrored && !writableErrored) {
process.nextTick(emitErrorNT, this, err);
}
return this;
Expand All @@ -32,6 +36,11 @@ function destroy(err, cb) {
this._destroy(err || null, (err) => {
if (!cb && err) {
process.nextTick(emitErrorAndCloseNT, this, err);

if (this._readableState) {
this._readableState.errorEmitted = true;
}

if (this._writableState) {
this._writableState.errorEmitted = true;
}
Expand Down Expand Up @@ -65,6 +74,7 @@ function undestroy() {
this._readableState.reading = false;
this._readableState.ended = false;
this._readableState.endEmitted = false;
this._readableState.errorEmitted = false;
}

if (this._writableState) {
Expand Down
1 change: 0 additions & 1 deletion test/parallel/test-http2-client-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ const Countdown = require('../common/countdown');
});

req.resume();
req.on('end', common.mustCall());
req.on('close', common.mustCall(() => server.close()));
}));
}
Expand Down
1 change: 0 additions & 1 deletion test/parallel/test-http2-client-onconnect-errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ function runTest(test) {
});
}

req.on('end', common.mustCall());
req.on('close', common.mustCall(() => {
client.destroy();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,4 @@ server.listen(0, common.mustCall(() => {

req.on('response', common.mustNotCall());
req.resume();
req.on('end', common.mustCall());
}));
2 changes: 0 additions & 2 deletions test/parallel/test-http2-compat-serverresponse-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ server.listen(0, common.mustCall(() => {
req.on('close', common.mustCall(() => countdown.dec()));

req.resume();
req.on('end', common.mustCall());
}

{
Expand All @@ -78,6 +77,5 @@ server.listen(0, common.mustCall(() => {
req.on('close', common.mustCall(() => countdown.dec()));

req.resume();
req.on('end', common.mustCall());
}
}));
1 change: 0 additions & 1 deletion test/parallel/test-http2-max-concurrent-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ server.listen(0, common.mustCall(() => {
req.on('aborted', common.mustCall());
req.on('response', common.mustNotCall());
req.resume();
req.on('end', common.mustCall());
req.on('close', common.mustCall(() => countdown.dec()));
req.on('error', common.expectsError({
code: 'ERR_HTTP2_STREAM_ERROR',
Expand Down
1 change: 0 additions & 1 deletion test/parallel/test-http2-misused-pseudoheaders.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ server.listen(0, common.mustCall(() => {

req.on('response', common.mustCall());
req.resume();
req.on('end', common.mustCall());
req.on('close', common.mustCall(() => {
server.close();
client.close();
Expand Down
1 change: 0 additions & 1 deletion test/parallel/test-http2-multi-content-length.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ server.listen(0, common.mustCall(() => {
// header to be set for non-payload bearing requests...
const req = client.request({ 'content-length': 1 });
req.resume();
req.on('end', common.mustCall());
req.on('close', common.mustCall(() => countdown.dec()));
req.on('error', common.expectsError({
code: 'ERR_HTTP2_STREAM_ERROR',
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-http2-respond-file-fd-invalid.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ server.listen(0, () => {
req.on('response', common.mustCall());
req.on('error', common.mustCall(errorCheck));
req.on('data', common.mustNotCall());
req.on('end', common.mustCall(() => {
req.on('close', common.mustCall(() => {
assert.strictEqual(req.rstCode, NGHTTP2_INTERNAL_ERROR);
client.close();
server.close();
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-http2-respond-nghttperrors.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ function runTest(test) {
req.resume();
req.end();

req.on('end', common.mustCall(() => {
req.on('close', common.mustCall(() => {
client.close();

if (!tests.length) {
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-http2-respond-with-fd-errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ function runTest(test) {
req.resume();
req.end();

req.on('end', common.mustCall(() => {
req.on('close', common.mustCall(() => {
client.close();

if (!tests.length) {
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-http2-server-shutdown-before-respond.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ server.on('listening', common.mustCall(() => {
}));
req.resume();
req.on('data', common.mustNotCall());
req.on('end', common.mustCall(() => server.close()));
req.on('close', common.mustCall(() => server.close()));
}));
1 change: 0 additions & 1 deletion test/parallel/test-http2-server-socket-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,4 @@ server.on('listening', common.mustCall(() => {

req.on('aborted', common.mustCall());
req.resume();
req.on('end', common.mustCall());
}));
24 changes: 24 additions & 0 deletions test/parallel/test-stream-duplex-error-write.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict';

const common = require('../common');
const { Duplex } = require('stream');
const { strictEqual } = require('assert');

const duplex = new Duplex({
write(chunk, enc, cb) {
cb(new Error('kaboom'));
},
read() {
this.push(null);
}
});

duplex.on('error', common.mustCall(function() {
strictEqual(this._readableState.errorEmitted, true);
strictEqual(this._writableState.errorEmitted, true);
}));

duplex.on('end', common.mustNotCall());

duplex.end('hello');
duplex.resume();
15 changes: 15 additions & 0 deletions test/parallel/test-stream-readable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,18 @@ const { inherits } = require('util');
read.push('hi');
read.on('data', common.mustNotCall());
}

{
// double error case
const read = new Readable({
read() {}
});

read.on('close', common.mustCall());
read.on('error', common.mustCall());

read.destroy(new Error('kaboom 1'));
read.destroy(new Error('kaboom 2'));
assert.strictEqual(read._readableState.errorEmitted, true);
assert.strictEqual(read.destroyed, true);
}

0 comments on commit 0857790

Please sign in to comment.