Duplex.from({ writable, readable }) breaks on backpressure #44925
Comments
Maybe a better issue title would be |
In The stream will temporarily stop reading data from the underlying resource, and new Transform({
objectMode: true,
emitClose: false,
highWaterMark:100,
... to avoid this. |
@xtx1130 this sounds like a bug, no? |
You're calling |
I think you are missing the point. It is not about a specific And |
Without vitest and promises (I expect to see
const { PassThrough, Duplex, Transform, Readable } = require('node:stream');

// Simple pass-through as a placeholder for more complex setup
const through = new PassThrough({ objectMode: true });

// Stream prepared values, pipe through simple duplex and async transformer for backpressure
Readable.from(['foo', 'bar'], { objectMode: true })
  .pipe(Duplex.from({
    writable: through,
    readable: through
  }))
  .pipe(new Transform({
    objectMode: true,
    highWaterMark: 1, // Setting 1 to force backpressure after a single item
    transform(chunk, encoding, callback) {
      setTimeout(() => callback(null, chunk), 0);
    }
  }))
  .on('data', chunk => console.log(chunk)); |
Seems like |
@nodejs/streams |
Tried to clone the Node.js repository to write a test for this and found out there are other things broken :(. The first thing I did was change this line to make an existing test fail and start from there: node/test/parallel/test-stream-duplex-from.js, line 143 in 36805e8
|
I'm a little low on time atm. But if someone can improve tests (i.e. add failing tests), then I could try to have a quick look at this. |
Interesting, if I implement the logic with
import { Duplex, PassThrough, Readable, Transform } from 'node:stream';

// Hold node process for 3s to see result
setTimeout(() => {}, 2999);

// Simple pass-through as a placeholder for more complex setup
// (declared as `passthrough` so that the references below resolve)
const passthrough = new PassThrough({
  objectMode: true,
  highWaterMark: 1,
  transform(chunk, encoding, callback) {
    console.log('passthrough', chunk);
    callback(null, chunk);
  }
});

// Self implement of duplex
const duplex = new Duplex({
  readableObjectMode: true,
  writableObjectMode: true,
  read(size) {
    return this.push(passthrough.read(size));
  },
  write(chunk, encoding, callback) {
    passthrough.write(chunk, encoding, callback);
  }
});

// Stream prepared values, pipe through simple duplex and async transformer for backpressure
Readable.from(['foo', 'bar', 'baz'])
  // working with self implemented duplex
  .pipe(duplex)
  // not working with Duplex.from
  // .pipe(Duplex.from({
  //   writable: passthrough,
  //   readable: passthrough
  // }))
  .pipe(new Transform({
    objectMode: true,
    highWaterMark: 1, // Setting 1 to force backpressure after a single item
    transform(chunk, encoding, callback) {
      console.log('transform', chunk);
      // setTimeout is not working with Duplex.from
      setTimeout(() => {
        callback(null, chunk);
      }, 100);
    }
  }))
  .on('data', (chunk) => { console.log('onData', chunk); });
The |
I have added a failing test here - https://github.com/pavelhoral/node/tree/duplex-issue / pavelhoral@69c7752. The interesting thing is that it fails even without forcing the asynchronous processing (i.e. cc @ronag P.S.: there is another issue in the tests there, #44925 (comment), that should be fixed - pavelhoral@d5c069b |
|
I think this is not associated with |
Yes, it should be when the value is resolved or pushed in the |
Removed all the unnecessary code. It should be small enough to see the symptoms.
import { Duplex, PassThrough, Readable } from 'node:stream';

const passthrough = new PassThrough({ objectMode: true });
const stream = Readable.from(['foo', 'bar', 'baz'])
  .pipe(Duplex.from({
    writable: passthrough,
    readable: passthrough
  }))
  .pipe(new PassThrough({ highWaterMark: 1 }))

for await (const chunk of stream) {
  console.log('async iterator', chunk)
}

import { Duplex, PassThrough, Readable, Transform } from 'node:stream';

const passthrough = new PassThrough({ objectMode: true });
Readable.from(['foo', 'bar', 'baz'])
  .pipe(Duplex.from({
    writable: passthrough,
    readable: passthrough
  }))
  .pipe(new Transform({
    highWaterMark: 1,
    transform(chunk, encoding, callback) {
      console.log('transform', chunk)
      // either one
      setTimeout(() => callback(null, chunk), 0)
      // setImmediate(() => callback(null, chunk))
    }
  })) |
@climba03003
// ...
  .pipe(new Transform({
    highWaterMark: 1,
    transform(chunk, encoding, callback) {
      // ...
    }
  })) |
@xtx1130 Updated |
Does this have anything to do with the observed behaviour? node/lib/internal/streams/transform.js Line 99 in bda460d
Maybe the issue has to do more with |
Simpler test:
// Assumes the Node.js test harness (`common`, `assert`) and the stream imports are in scope
const through = new PassThrough({ objectMode: true });
let res = '';
const d = Readable.from(['foo', 'bar'], { objectMode: true })
  .pipe(Duplex.from({
    writable: through,
    readable: through
  }));
d.on('data', (data) => {
  d.pause();
  process.nextTick(() => {
    process.nextTick(() => {
      d.resume();
    });
  });
  res += data;
}).on('end', common.mustCall(() => {
  assert.strictEqual(res, 'foobar');
}));
Seems related to pause, tick, tick and resume... somehow... note there have to be 2 ticks before resume. |
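For anyone who wants to try the pause, tick, tick, resume sequence outside the Node.js test harness, here is a minimal standalone sketch. The function name `runPauseResume` and the 1 second stall timeout are my own assumptions, not part of the original test; the timeout is only there so that a stalled stream is reported instead of the process exiting silently. On an affected release such as v18.10.0 I would expect the promise to reject; on a release that includes the fix it should resolve with the concatenated chunks.

```javascript
const { Duplex, PassThrough, Readable } = require('node:stream');

// Hypothetical helper: resolves with the concatenated data once 'end' fires,
// rejects if the stream stalls for longer than one second.
function runPauseResume() {
  return new Promise((resolve, reject) => {
    const through = new PassThrough({ objectMode: true });
    let res = '';

    // Assumed stall detector, not part of the original test.
    const timer = setTimeout(
      () => reject(new Error(`stream stalled after "${res}"`)),
      1000
    );

    const d = Readable.from(['foo', 'bar'], { objectMode: true })
      .pipe(Duplex.from({ writable: through, readable: through }));

    d.on('data', (data) => {
      // Pause on every chunk and resume two ticks later.
      d.pause();
      process.nextTick(() => {
        process.nextTick(() => d.resume());
      });
      res += data;
    }).on('end', () => {
      clearTimeout(timer);
      resolve(res);
    });
  });
}

runPauseResume().then(
  (res) => console.log('finished with', res),
  (err) => console.error(err.message)
);
```

Wrapping the scenario in a promise keeps it usable from both a plain script and a test runner.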
The duplexified Duplex should be autoDestroyed instead of prematurely destroyed when the readable and writable sides have finished without error. Fixes: nodejs#44925
The duplexified Duplex should be autoDestroyed instead of prematurely destroyed when the readable and writable sides have finished without error. Fixes: #44925 PR-URL: #45133 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Juan José Arboleda <soyjuanarbol@gmail.com>
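To make the commit message above more concrete, the following sketch records the lifecycle events of a duplexified stream. The helper name `observeLifecycle` and the stall timeout are assumptions for illustration only. With the fix in place, I would expect all data to be delivered, then 'end', and 'close' only after both sides have finished, rather than the duplex being destroyed while chunks are still buffered.

```javascript
const { Duplex, PassThrough, Readable } = require('node:stream');

// Hypothetical helper: resolves with the list of observed events
// once the duplexified stream emits 'close'.
function observeLifecycle() {
  return new Promise((resolve, reject) => {
    const through = new PassThrough({ objectMode: true });
    const d = Duplex.from({ writable: through, readable: through });
    const events = [];

    // Assumed stall detector so a stream that never closes is reported.
    const timer = setTimeout(
      () => reject(new Error(`no 'close' after: ${events.join(', ')}`)),
      1000
    );

    d.on('data', (chunk) => events.push(`data:${chunk}`));
    d.on('end', () => events.push('end'));
    d.on('close', () => {
      clearTimeout(timer);
      events.push('close');
      resolve(events);
    });

    Readable.from(['foo', 'bar']).pipe(d);
  });
}

observeLifecycle().then(
  (events) => console.log(events.join(' -> ')),
  (err) => console.error(err.message)
);
```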
Version
v18.10.0
Platform
Microsoft Windows NT 10.0.19044.0 x64
Subsystem
No response
What steps will reproduce the bug?
I have created a repository with a failing test case, https://github.com/pavelhoral/node-duplexify-issue, that can be cloned and run:
How often does it reproduce? Is there a required condition?
The issue happens every time in my test case when the internal buffers are full and reading must be paused (I guess this condition must be hit: https://github.com/nodejs/node/blob/v18.10.0/lib/internal/streams/duplexify.js#L350).
What is the expected behavior?
The stream should finish processing correctly once the read buffers are free again.
What do you see instead?
Duplex stream never correctly resumes its operation after the read has to pause.
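For context, this is a minimal sketch of how backpressure is signalled on the writable side in general, independent of `Duplex.from`. The helper name `demoBackpressure` is made up for illustration. In object mode with `highWaterMark: 1`, a single buffered chunk already fills the buffer, so `write()` returns `false` and the producer is expected to wait for `'drain'` before writing more. The bug reported here is that the stream never picks up again after such a pause.

```javascript
const { Writable } = require('node:stream');

// Hypothetical helper: resolves with the observed sequence of events.
function demoBackpressure() {
  return new Promise((resolve) => {
    const events = [];

    // Slow consumer: acknowledges each chunk asynchronously.
    const slow = new Writable({
      objectMode: true,
      highWaterMark: 1, // one buffered object is enough to signal backpressure
      write(chunk, encoding, callback) {
        setTimeout(callback, 0);
      }
    });

    // 'drain' fires once the buffer is empty again after write() returned false.
    slow.once('drain', () => {
      events.push('drain');
      resolve(events);
    });

    const ok = slow.write('foo');
    events.push(`write returned ${ok}`);
  });
}

demoBackpressure().then((events) => console.log(events));
```

`pipe()` handles this handshake automatically by pausing the source when `write()` returns `false` and resuming it on `'drain'`.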
Additional information
No response