
Make streams default pause mode apply for non data events (such as 'error') #39722

Closed
eyalroth opened this issue Aug 9, 2021 · 10 comments
Labels
stream Issues and PRs related to the stream subsystem.

eyalroth commented Aug 9, 2021

Is your feature request related to a problem? Please describe.

I am trying to build a pipeline of streams combined with promises/async code in the middle of orchestrating the pipeline:

const util = require('util');
const stream = require('stream');
const fs = require('fs');

const pipeline = util.promisify(stream.pipeline);

async function queryToFile(querySql, path) {
  // promise that resolves on first result from the DB or rejects on SQL error
  const queryResults = await dbClient.query(querySql).promiseFirstResult();
  const dbStream = queryResults.stream();

  const formattingStream = new stream.Transform(...);
  return pipeline(dbStream, formattingStream,  fs.createWriteStream(path));
}

This should work without a problem.

However, one might decide to split this function into multiple ones:

async function queryStream(querySql) {
  const queryResults = await dbClient.query(querySql).promiseFirstResult();
  return queryResults.stream();
}

async function writeStream(inputStream, path) {
  const formattingStream = new stream.Transform(...);
  return pipeline(inputStream, formattingStream,  fs.createWriteStream(path));
}

async function queryToFile(querySql, path) {
  const dbStream = await queryStream(querySql);
  return writeStream(dbStream, path);
}

Now, though, the code isn't safe.

If I understand correctly, between the time the queryStream function produces a stream and the time the writeStream function subscribes to that stream's events (via pipeline), events may already be emitted on the stream, since these functions do not run synchronously and therefore leave room for other work -- such as I/O -- to run in between.

This is fine for incoming data, since the stream starts in paused mode and buffers data until it switches to flowing mode when pipeline pipes it. In contrast, other events such as error or close are not buffered; they are emitted immediately as they occur, which means the subscriber in writeStream will never see them, and that is a problem.
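
To make the race concrete, here is a minimal, self-contained sketch (PassThrough and setTimeout stand in for the DB stream and the async gap; this is only an illustration, not my actual code):

const { PassThrough } = require('stream');

const source = new PassThrough();
source.write('buffered'); // data sits in the internal buffer (paused mode)

// 'error' is not buffered: it fires while nobody is listening yet, and with
// no 'error' handler attached the emit throws and crashes the process.
setTimeout(() => source.emit('error', new Error('boom')), 0);

// This listener is attached one tick too late and never observes the error.
setTimeout(() => {
  source.on('error', (err) => console.log('never reached:', err.message));
}, 10);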

Describe the solution you'd like

I would love for the default paused mode -- i.e., buffering events until the first "subscriber" -- to also apply to non-data events (at least the built-in ones). Whether or not it should be the default behavior is a separate question, but the ability to opt in via an option at stream construction could prove extremely useful.
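
Purely as an illustration of the shape of such an option (it is hypothetical -- no such option exists in Node.js today, and the name is made up):

const stream = require('stream');

// Hypothetical option: buffer 'error'/'close' emitted before the first
// matching listener, the same way data is buffered while the stream is paused.
const dbStream = new stream.PassThrough({ bufferLifecycleEvents: ['error', 'close'] });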

As a first step though, it might be helpful to add this scenario to the documentation on streams.

Describe alternatives you've considered

I tried a workaround where, before returning a stream from an async function, I added listeners for these events in order to buffer them, and then overrode each of the event-registration methods (on, once, etc.) to replay the buffered events to any new subscriber; a rough sketch follows after the list. I encountered two problems:

  1. Using pipeline, which in turn uses pipe, means that if the stream already has event listeners attached before the call, it will not propagate these events further on (I guess it assumes that a handler is already in place).
  2. The order in which pipeline/pipe registers its event listeners is not necessarily (and often is not) the same as the order in which the events were emitted; for instance, error is usually emitted before close, while the registration for these events happens in the opposite order.
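
For reference, a rough sketch of that workaround (simplified; the helper name and details are illustrative, not my exact code):

// Buffer early lifecycle events and replay them to listeners that register later.
function bufferLifecycleEvents(st, events = ['error', 'close']) {
  const buffered = [];
  for (const name of events) {
    // These recorder listeners are exactly what trips up pipe/pipeline
    // (problem 1 above): the stream now "already has" an 'error' listener.
    st.on(name, (...args) => buffered.push([name, args]));
  }

  const originalOn = st.on.bind(st);
  st.on = st.addListener = function (name, listener) {
    const result = originalOn(name, listener);
    // Replay anything that was emitted before this subscriber showed up;
    // replay order follows registration order, hence problem 2 above.
    for (const [bufferedName, args] of buffered) {
      if (bufferedName === name) listener(...args);
    }
    return result;
  };

  return st;
}
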
@targos targos added the stream Issues and PRs related to the stream subsystem. label Aug 9, 2021
targos (Member) commented Aug 9, 2021

@nodejs/streams

eyalroth (Author) commented Aug 10, 2021

Adding a test case to replicate:

const stream = require('stream');
const chai = require('chai');

async function fork(f) {
  return new Promise((resolve, reject) => setTimeout(() => {
    try {
      resolve(f());
    } catch (e) {
      reject(e);
    }
  }, 0));
}

async function main() {
  const st1 = new stream.PassThrough();
  const st2 = new stream.PassThrough();
  const st3 = new stream.PassThrough();

  const data = [];
  const errors = [[], []];

  await fork(() => {
    st1.write('a');
    st1.write('b');
    st1.write('c');
  });

  const pp1 = await fork(() => stream.pipeline(st1, st2, (err) => {
    if (err) {
      errors[0].push(err);
    }
  }));

  await fork(() => {
    st1.emit('error', 500);
    st1.emit('close', null); // can either send 'close' or invoke `end()`
    // st1.end();
  });

  const pp2 = await fork(() =>
    stream.pipeline(pp1, st3, (err) => {
      if (err) {
        errors[1].push(err);
      }
    })
  );
  pp2.on('data', (d) => data.push(d.toString()));

  await fork(() => {}); // do nothing and let the event loop process events

  chai.expect(data).to.deep.eq(['a', 'b', 'c']);
  chai.expect(errors[0]).to.deep.eq([500]);
  chai.expect(errors[1]).to.deep.eq([500]); // fails (error output of pipeline #2 is empty)
  console.log('success');
}

main().catch(console.log);

Trying to replace the last fork that does nothing with finished:

// await fork(() => {});

await new Promise(resolve => stream.finished(pp2, () => {
  console.log('this is never printed');
  resolve();
}));

Results in the process exiting without printing anything (not even the success message).

benjamingr (Member):

Thanks for the report.

I'm really not sure I see the case where this is a problem?

Is the scenario:

  • You create a stream
  • You await something else related to it or otherwise introduce a microtick delay.
  • While awaiting (immediately or within a microtick) the stream emits an error.
  • Since streams are not values like promises, that error is emitted globally and crashes the process instead of being "buffered"?

If the stream emits the error synchronously you won't catch it in either case - only if it is emitted "in a microtask" are you able to catch it in the first case but not the second. (I vaguely recall discussions about the timing of error events on streams.)

I am not sure how that's different from the following scenario:

  • You create a promise by calling any async function
  • You await something else related to it, or something else that introduces a delay
  • While awaiting the promise rejects
  • Since you did not attach a catch handler immediately the error is lost and the process crashes with an unhandled rejection

Can you elaborate on when you ran into this?

(As a note promises don't wait for I/O and pending promises don't block Node.js from exiting)
(I am in favour of adding documentation about always attaching error handlers synchronously, regardless of pipeline/compose)
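
A minimal sketch of what "attach error handlers synchronously" looks like here (PassThrough stands in for the DB stream; illustrative only):

const { PassThrough } = require('stream');

function makeStream() {
  const source = new PassThrough();
  // Attached in the same tick the stream is created, before it crosses any
  // async boundary, so an early error cannot crash the process.
  source.on('error', (err) => console.error('handled early:', err.message));
  return source;
}

const s = makeStream();
// The error fires before the "real" consumer wires anything up...
setTimeout(() => s.destroy(new Error('early failure')), 0);
// ...so this late listener still misses it (which is the issue being discussed),
// but the process stays alive and the failure is at least logged.
setTimeout(() => s.on('error', (err) => console.error('late listener:', err.message)), 10);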

eyalroth (Author):

@benjamingr Thanks for the reply!

I am not sure how that's different from the following scenario:

  • You create a promise by calling any async function
  • You await something else related to it, or something else that introduces a delay
  • While awaiting the promise rejects
  • Since you did not attach a catch handler immediately the error is lost and the process crashes with an unhandled rejection

I'm not sure I follow this scenario, as I don't see how this would happen with promises. For instance, here:

async function foo() {
  const responsePromise = callApi().promise();
  await longProcessing();

  try {
    await responsePromise;
  } catch { ... }
  // or
  responsePromise.catch(...);
}

I would expect to catch an error even if the responsePromise was rejected during the (async) execution of longProcessing. It's worth mentioning that my expectation is based mostly on experience with Java/Scala and not so much with Node.js, to which I'm relatively new.

According to this code it seems that my expectation is met:

async function lateErrorHandling() {
    const p = new Promise((resolve, reject) => setTimeout(() => reject('foo'), 0));
    await new Promise(resolve => setTimeout(resolve, 1000));
    try {
        await p;
        console.log('success');
    } catch (e) {
        console.log(`error: ${e}`);
    }
}

async function main() {
    console.log('start');
    await lateErrorHandling();
    console.log('end');
}

main().catch(console.log);

which prints:

start
error: foo
end

However, it does come with a deprecation warning "Unhandled promise rejections are deprecated" and so on (I'm using node 14.16.1), which seems very unfortunate to me.
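
For what it's worth, a minimal sketch of one way to avoid that warning while still handling the error late (mirroring the timings of the example above; this is just an illustration):

async function lateErrorHandlingWithoutWarning() {
    const p = new Promise((resolve, reject) => setTimeout(() => reject(new Error('foo')), 0));
    // Attaching a no-op catch synchronously marks the rejection as handled,
    // so no unhandled-rejection warning is emitted; the real handling is below.
    p.catch(() => {});
    await new Promise(resolve => setTimeout(resolve, 1000));
    try {
        await p;
        console.log('success');
    } catch (e) {
        console.log(`error: ${e.message}`);
    }
}

lateErrorHandlingWithoutWarning().catch(console.log);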

Can you elaborate on when you ran into this?

In short, when trying to stream a result set from a mysql database to S3, while formatting and zipping the results in between the two.

I am using node-mysql2 as a client with their promise-wrapping API (mysql/promises), while implementing a custom workaround -- described here -- to produce code similar to my initial (and a bit contrived) example in the original post (dbClient.query(querySql).promiseFirstResult()).

mcollina (Member):

The "big" example could be simplified to the following:

const assert = require('assert')
const { PassThrough, pipeline } = require('stream')

const s1 = new PassThrough()
const s2 = new PassThrough()

s1.destroy()
s1.on('close', function () {
  let called = false
  pipeline(s1, s2, function (err) {
    // this function is never called
    called = true
    console.log(err)
  })

  setImmediate(function () {
    assert.strictEqual(called, true)
  })
})

The ask is for pipeline to destroy the other streams if the first one is destroyed.

(Note, the proper way to error a stream is to call stream.destroy(err) - there is quite a bit of machinery behind it and emitting error will not be equivalent).
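
A small sketch of the difference (listeners are added so the bare emit doesn't crash the process):

const { PassThrough } = require('stream')

const viaEmit = new PassThrough()
viaEmit.on('error', () => {})
viaEmit.emit('error', new Error('boom')) // only notifies listeners
console.log(viaEmit.destroyed)           // false: no teardown happened

const viaDestroy = new PassThrough()
viaDestroy.on('error', () => {})
viaDestroy.destroy(new Error('boom'))    // runs the destroy machinery,
console.log(viaDestroy.destroyed)        // true: then emits 'error' and 'close'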

mcollina (Member):

wdyt @ronag?

ronag (Member) commented Aug 10, 2021

I think this is already fixed on master?

mcollina (Member):

I think this is already fixed on master?

Indeed it is! Which PR did you fix this in? Are we planning to backport?

ronag (Member) commented Aug 10, 2021

#39235 semver-major

eyalroth (Author) commented Aug 11, 2021

@mcollina Is this part of the nightly build? I just tried both "tests" with v17.0.0-nightly202108113914354cd7 and they both fail.

My bad. Running from IntelliJ doesn't seem to use the right version (I used npm from the shell instead).

With the nightly build it seems that the "short" test case passes and prints:

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at new NodeError (node:internal/errors:371:5)
    at onclose (node:internal/streams/end-of-stream:122:30)
    at processTicksAndRejections (node:internal/process/task_queues:78:11) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
}

(not sure if this is intended)

However, the "long" test case fails on the assertion of the data:

AssertionError: expected [] to deeply equal [ 'a', 'b', 'c' ]
    at streams3 (/Users/eyal/repos/sandbox-node/src/streams.js:57:29) {
  showDiff: true,
  actual: [],
  expected: [ 'a', 'b', 'c' ],
  operator: 'deepStrictEqual'
}

Removing this assertion and leaving only the error assertions passes the test.
