Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Duplex.from({ writable, readable }) breaks on backpressure #44925

Closed
pavelhoral opened this issue Oct 8, 2022 · 20 comments
Closed

Duplex.from({ writable, readable }) breaks on backpressure #44925

pavelhoral opened this issue Oct 8, 2022 · 20 comments
Labels
confirmed-bug Issues with confirmed bugs. stream Issues and PRs related to the stream subsystem.

Comments

@pavelhoral
Copy link
Contributor

pavelhoral commented Oct 8, 2022

Version

v18.10.0

Platform

Microsoft Windows NT 10.0.19044.0 x64

Subsystem

No response

What steps will reproduce the bug?

I have created repository with failing test case - /~https://github.com/pavelhoral/node-duplexify-issue that can be cloned and run:

import { Duplex, PassThrough, Readable, Transform } from 'node:stream';

describe('Duplex.from({ writable, readable })', () => {

  it('flushes stream after filling buffer', async () => {
    // Simple pass-through as a placeholder for more complex setup
    const through = new PassThrough({ objectMode: true });

    // Stream prepared values, pipe through simple duplex and async transformer for backpressure
    const stream = Readable.from(['foo', 'bar'], { objectMode: true })
      .pipe(Duplex.from({
        writable: through,
        readable: through
      }))
      .pipe(new Transform({
        objectMode: true,
        highWaterMark: 1, // Setting 1 to force backpressure after a single item
        transform(chunk, encoding, callback) {
          setTimeout(() => callback(null, chunk), 0);
        }
      }));

    // This never finishes when high water mark is reached
    const result = await stream.toArray();

    expect(result).toEqual(['foo', 'bar']);
  });

});

How often does it reproduce? Is there a required condition?

The issue happens every time in my test case when the internal buffers are full and read must be paused (I guess this condition must happen - /~https://github.com/nodejs/node/blob/v18.10.0/lib/internal/streams/duplexify.js#L350).

What is the expected behavior?

Stream should correctly finish processing after read buffers are free again.

What do you see instead?

Duplex stream never correctly resumes its operation after the read has to pause.

Additional information

No response

@pavelhoral
Copy link
Contributor Author

Maybe a better issue title would be Duplex.from({ writable, readable }) breaks on backpressure?

@VoltrexKeyva VoltrexKeyva added the stream Issues and PRs related to the stream subsystem. label Oct 8, 2022
@xtx1130
Copy link
Contributor

xtx1130 commented Oct 9, 2022

In objectMode, highWaterMark default set to 16, please see: /~https://github.com/nodejs/node/blob/v18.10.0/lib/internal/streams/state.js#L15-L17

The stream will temporarily stop reading data from the underlying resource, and toArray method is an async function, so nodejs will consider the promise is not resolved yet, and exit with code 13. You can set:

new Transform({
        objectMode: true,
        emitClose: false,
        highWaterMark:100,
        ...

to avoid this.

@rluvaton
Copy link
Member

rluvaton commented Oct 9, 2022

@xtx1130 this sounds like a bug, no?

@bnoordhuis
Copy link
Member

You're calling await stream.toArray() inside an async function - i.e., a promise - but what awaits that promise? Try excluding vitest and using only built-in modules.

@pavelhoral
Copy link
Contributor Author

pavelhoral commented Oct 9, 2022

I think you are missing the point. It is not about a specific highWaterMark number, it is about resuming the stream after the buffer limit is reached (btw. I have updated the test in the original comment).

And vitest and toArray here are also irrelevant - the stream never correctly resumes after backpressure occurs. That is the reason the promise is never resolved. If you change hightWaterMark so that the backpressure is never an issue the test passes.

@pavelhoral
Copy link
Contributor Author

pavelhoral commented Oct 9, 2022

Without vitest and promises (I expect to see foo and bar in the console, but get only foo):

const { PassThrough, Duplex, Transform, Readable } = require('node:stream');

// Simple pass-through as a placeholder for more complex setup
const through = new PassThrough({ objectMode: true });

// Stream prepared values, pipe through simple duplex and async transformer for backpressure
Readable.from(['foo', 'bar'], { objectMode: true })
  .pipe(Duplex.from({
    writable: through,
    readable: through
  }))
  .pipe(new Transform({
    objectMode: true,
    highWaterMark: 1, // Setting 1 to force backpressure after a single item
    transform(chunk, encoding, callback) {
      setTimeout(() => callback(null, chunk), 0);
    }
  }))
  .on('data', chunk => console.log(chunk));

@pavelhoral pavelhoral changed the title Duplex.from({ writable, readable }) breaks when the internal buffers are full Duplex.from({ writable, readable }) breaks on backpressure Oct 9, 2022
@climba03003
Copy link
Contributor

Seems like setTimeout inside the transform break something.
Changing setTimeout to process.nextTick works perfectly without error.

@targos
Copy link
Member

targos commented Oct 11, 2022

@nodejs/streams

@pavelhoral
Copy link
Contributor Author

pavelhoral commented Oct 11, 2022

Tried to clone NodeJS repository to write a test for this and found out there are other things broken :(. First thing I did was to change this line to make some existing test fail and start from there

assert.strictEqual(ret, 'abcdefghi');
and the test did not start failing :(. I guess this is off-topic for this particular issue but wanted to write a comment here.

@ronag
Copy link
Member

ronag commented Oct 11, 2022

I'm a little low on time atm. But if someone can improve tests (i.e. add failing tests), then I could try to have a quick look at this.

@climba03003
Copy link
Contributor

climba03003 commented Oct 11, 2022

Interesting, if I implement the logic with new Duplex also works fine with setTimeout.
It can be further narrow down to Duplex.from is not compatible to setTimeout?

import { Duplex, PassThrough, Readable, Transform } from 'node:stream';

// Hold node process for 3s to see result
setTimeout(() => {}, 2999)

// Simple pass-through as a placeholder for more complex setup
const through = new PassThrough({
  objectMode: true, 
  highWaterMark: 1,
  transform(chunk, encoding, callback) { 
    console.log('passthrough', chunk)
    callback(null, chunk)
  }
});

// Self implement of duplex
const duplex = new Duplex({
  readableObjectMode: true,
  writableObjectMode: true,
  read(size) {
    return this.push(passthrough.read(size))
  },
  write(chunk, encoding, callback) {
    passthrough.write(chunk, encoding, callback)
  }
})

// Stream prepared values, pipe through simple duplex and async transformer for backpressure
Readable.from(['foo', 'bar', 'baz'])
  // working with self implemented duplex
  .pipe(duplex)
  // not working with Duplex.from
  // .pipe(Duplex.from({
  //   writable: passthrough,
  //   readable: passthrough
  // }))
  .pipe(new Transform({
    objectMode: true,
    highWaterMark: 1, // Setting 1 to force backpressure after a single item
    transform(chunk, encoding, callback) {
      console.log('transform', chunk)
      // setTimeout is not working with Duplex.from
      setTimeout(() => {
        callback(null, chunk)
      }, 100);
    }
  }))
  .on('data', (chunk) => { console.log('onData', chunk) })

The duplexify logic is too complicated and hard to follow. Hope my finding give some insight to the others to troubleshoot deeper.

@mcollina mcollina added the confirmed-bug Issues with confirmed bugs. label Oct 11, 2022
@pavelhoral
Copy link
Contributor Author

pavelhoral commented Oct 11, 2022

I have added failing test here - /~https://github.com/pavelhoral/node/tree/duplex-issue / pavelhoral@69c7752. The interesting thing is that it fails even without forcing the asynchronous processing (i.e. setTimeout), which I thought was significant part of the issue. So I am not that sure about anything anymore 🗡️

cc @ronag

P.S.: there is another issue in tests there #44925 (comment) that should be fixed - pavelhoral@d5c069b

@climba03003
Copy link
Contributor

climba03003 commented Oct 11, 2022

The interesting thing is that it fails even without forcing the asynchronous processing

toArray is actually asynchronous processing. It using async iterators and returns a Promise.
That means whenever it comes to next event cycle, it will be failed.

@xtx1130
Copy link
Contributor

xtx1130 commented Oct 11, 2022

I think this is not associated with toArray, either use Promise or nextTick can not reproduce the problem. And I have found the difference between use setTimeout and do not use: /~https://github.com/nodejs/node/blob/main/lib/internal/streams/transform.js#L181-L184
Using setTimeout will first trigger this.push(val), but val is null and second is foo.
But if you do not use setTimeout, the first is foo and second is bar. I'm trying to find more info, but the logic is too complicated

@climba03003
Copy link
Contributor

climba03003 commented Oct 11, 2022

I think this is not associated with toArray.

Yes, it should be when the value is resolved or pushed in the next event cycle. Then, the stream do not process remaining.
The symptoms is using readable.toArray which is async iterators and setTimeout.
Unlike the above two, process.nextTick is resolved in the current event cycle. So, it does cause any problem.

@climba03003
Copy link
Contributor

climba03003 commented Oct 11, 2022

@xtx1130

Removed all the unnecessary code. It should be small enough to see the symptoms.

Async Iterators

import { Duplex, PassThrough, Readable } from 'node:stream';

const passthrough = new PassThrough({ objectMode: true });

const stream = Readable.from(['foo', 'bar', 'baz'])
  .pipe(Duplex.from({
    writable: passthrough,
    readable: passthrough
  }))
  .pipe(new PassThrough({ highWaterMark: 1 }))

for await (const chunk of stream) {
    console.log('async iterator', chunk)
}

setTimeout, setImmediate

import { Duplex, PassThrough, Readable, Transform } from 'node:stream';

const passthrough = new PassThrough({ objectMode: true });

Readable.from(['foo', 'bar', 'baz'])
  .pipe(Duplex.from({
    writable: passthrough,
    readable: passthrough
  }))
  .pipe(new Transform({
    highWaterMark: 1,
    transform(chunk, encoding, callback) {
      console.log('transform', chunk)
      // either one
      setTimeout(() => callback(null, chunk), 0)
      // setImmediate(() => callback(null, chunk))
    }
  }))

@xtx1130
Copy link
Contributor

xtx1130 commented Oct 11, 2022

@climba03003 highWaterMark param is needed I think.

   // ...
  .pipe(new Transform({
    highWaterMark: 1,
    transform(chunk, encoding, callback) {
      // ...
    }
  }))

@climba03003
Copy link
Contributor

@climba03003 highWaterMark param is needed I think.

   // ...
  .pipe(new Transform({
    highWaterMark: 1,
    transform(chunk, encoding, callback) {
      // ...
    }
  }))

@xtx1130 Updated

@pavelhoral
Copy link
Contributor Author

pavelhoral commented Oct 14, 2022

Does this have anything to do with the observed behaviour?

// a "bug" where we check needDrain before calling _write and not after.

Maybe the issue has to do more with Transform / PassThrough than Duplex.from / Duplexify?

@ronag
Copy link
Member

ronag commented Oct 23, 2022

Simpler tes:

  const through = new PassThrough({ objectMode: true });

  let res = '';
  const d = Readable.from(['foo', 'bar'], { objectMode: true })
    .pipe(Duplex.from({
      writable: through,
      readable: through
    }));

  d.on('data', (data) => {
    d.pause();
    process.nextTick(() => {
      process.nextTick(() => {
        d.resume()
      });
    });
    res += data;
  }).on('end', common.mustCall(() => {
    assert.strictEqual(res, 'foobar');
  }));

Seems related to pause, tick, tick and resume... somehow... note there has to be 2 ticks before resume.

ronag added a commit to nxtedition/node that referenced this issue Oct 23, 2022
The duplexified Duplex should be autoDestroyed instead of
prematurely destroyed when the readable and writable sides
have finished without error.

Fixes: nodejs#44925
RafaelGSS pushed a commit that referenced this issue Nov 1, 2022
The duplexified Duplex should be autoDestroyed instead of
prematurely destroyed when the readable and writable sides
have finished without error.

Fixes: #44925

PR-URL: #45133
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Juan José Arboleda <soyjuanarbol@gmail.com>
RafaelGSS pushed a commit that referenced this issue Nov 10, 2022
The duplexified Duplex should be autoDestroyed instead of
prematurely destroyed when the readable and writable sides
have finished without error.

Fixes: #44925

PR-URL: #45133
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Juan José Arboleda <soyjuanarbol@gmail.com>
danielleadams pushed a commit that referenced this issue Dec 30, 2022
The duplexified Duplex should be autoDestroyed instead of
prematurely destroyed when the readable and writable sides
have finished without error.

Fixes: #44925

PR-URL: #45133
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Juan José Arboleda <soyjuanarbol@gmail.com>
danielleadams pushed a commit that referenced this issue Dec 30, 2022
The duplexified Duplex should be autoDestroyed instead of
prematurely destroyed when the readable and writable sides
have finished without error.

Fixes: #44925

PR-URL: #45133
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Juan José Arboleda <soyjuanarbol@gmail.com>
danielleadams pushed a commit that referenced this issue Jan 3, 2023
The duplexified Duplex should be autoDestroyed instead of
prematurely destroyed when the readable and writable sides
have finished without error.

Fixes: #44925

PR-URL: #45133
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Juan José Arboleda <soyjuanarbol@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
confirmed-bug Issues with confirmed bugs. stream Issues and PRs related to the stream subsystem.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

10 participants