-
Notifications
You must be signed in to change notification settings - Fork 30.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: flow for 'data' listeners upon removal of last 'readable' listener #21696
Conversation
/cc @nodejs/streams |
ping @mcollina @mafintosh. |
84c2cfa
to
945efbe
Compare
@mcollina CI has a seemingly not related failure, can we rerun or am I misunderstanding something? (I also tried --repeat 192 on my linux for those 3 tests and no failures). CITGM has a lot of build failures, I'm not sure how to interpret the results. Also, I've rebased on master just in case. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
doc/api/stream.md
Outdated
only when [`stream.read()`][stream-read] is called. | ||
only when [`stream.read()`][stream-read] is called. But when last `'readable'` | ||
was removed stream will try to [`stream.resume()`][stream-resume] to fulfill | ||
`'data'` listeners. Therefore you can use it like this (bar.txt has 'abc\n' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please avoid using you
in the docs. This last sentence can just be dropped.
945efbe
to
95c8e7b
Compare
@jasnell done. Though we will probably have to wait until CITGM machines are working properly again. |
I think landing this without a CITGM run is ok. CI: https://ci.nodejs.org/job/node-test-pull-request/16217/. |
So this basically restores the "magic" behaivor of the |
lib/_stream_readable.js
Outdated
if (!state.readableListening && | ||
self.listenerCount('data') > 0 && | ||
state.pipesCount === 0) { | ||
self.resume(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if the stream was explicitly paused before? Something like:
stream.pause()
stream.on('data', ...)
stream.once('readable', ...)
Seems bad that this would auto resume that, if that is the case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lundibundi can you please add a unit test in that case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I've added a test and now spotted a problem. After @mcollina PR to make readable take precedence over data there is no way now to determine whether it was user who called pause or we via (this.resume() in on('data') handler) because we use cf5f986#diff-ba6a0df0f5212f5cba5ca5179e209a17R878 and now state.flowing is false if we add 'data' listener after 'readable' listener even though user didn't explicitly pause the stream.
I'm not sure what's the correct solution here. If I add this.isPaused
check in here it will only work if a user called it like this
r.on('data', (chunk) => receivedData += chunk);
r.once('readable', common.mustCall());
and won't work for
r.once('readable', common.mustCall());
r.on('data', (chunk) => receivedData += chunk);
which is extremely bizarre.
I'm thinking of not setting flowing
at all until we have 0 'readable' listeners at which point check for data listeners and then set it to true/false (resume/pause).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm thinking of not setting flowing at all until we have 0 'readable' listeners at which point check for data listeners and then set it to true/false (resume/pause).
I think that would work.
doc/api/stream.md
Outdated
- version: REPLACEME | ||
pr-url: /~https://github.com/nodejs/node/pull/21696 | ||
description: > | ||
When using both `'readable'` and [`'data'`][] stream will try to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: the stream
doc/api/stream.md
Outdated
@@ -825,7 +831,33 @@ result in increased throughput. | |||
|
|||
If both `'readable'` and [`'data'`][] are used at the same time, `'readable'` | |||
takes precedence in controlling the flow, i.e. `'data'` will be emitted | |||
only when [`stream.read()`][stream-read] is called. | |||
only when [`stream.read()`][stream-read] is called. But when last `'readable'` | |||
was removed stream will try to [`stream.resume()`][stream-resume] to fulfill |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: the stream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks.
doc/api/stream.md
Outdated
console.log('readable'); | ||
}); | ||
rr.on('data', (chunk) => { | ||
console.log(chunk.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we might want to add rr.setEncoding('utf8')
, or pass the encoding: 'utf8'
option to createReadStream
rather than using .toString()
, to encourage best practices
(The current code could fail if bar.txt
contains UTF-8 characters outside the ASCII range)
lib/_stream_readable.js
Outdated
// (if pipesCount is not 0 then we have already 'flowed') | ||
if (!state.readableListening && | ||
self.listenerCount('data') > 0 && | ||
state.pipesCount === 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have a test covering what happens if the last readable
listener is removed but there is a pipe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
I’d prefer a CITGM run, btw. |
PTAL #21696 (comment) Also, currently if we have 'readable' listener on a Readable stream, if we pipe it to other stream nothing will happen, try this (uncomment/comment 'readable' listener): const data = ['foo', 'bar', 'baz'];
const r = new Readable({
read: () => {},
});
// r.on('readable', () => {});
r.pipe(process.stdout);
r.push(data[0]);
r.push(data[1]);
r.push(data[2]);
r.push(null); |
#21696 (comment) is as documented. |
@mcollina Yeah we have that in the documentation of
Therefore we should either change the doc or the code. And imo the latter will be better as we will restore the ability to tell if user manually called I have a fix (the one I mentioned here #21696 (comment)) but for some reason |
The docs would need changing, as
I'm not sure about |
@mcollina But then the problem with Also, Maybe it will be better for me to push (my fix that also restores pipe) so you could take a look? |
When there is at least one 'data' listener try to flow when last 'readable' listener gets removed and the stream is not piped. Currently if we have both 'readable' and 'data' listeners set only 'readable' listener will get called and stream will be stalled without 'data' and 'end' events if 'readable' listener neither read()'s nor resume()'s the stream. Fixes: nodejs#21398
Now state.flowing will be set only after all of the 'readable' listeners are gone and if we have at least one 'data' listener. * on('data') will not flow (flowing === null and not false) if there are 'readable' listeners * pipe() will work regardless of 'readable' listeners * isPause reports only user .pause call (before setting 'data' listener when there is already 'readable' listener also set flowing to false) * resume always sets stream to flowing state
95c8e7b
to
c07abd7
Compare
I've added an alternative implementation of the fix: see #22209. |
Fixes: nodejs#21398 See: nodejs#21696 PR-URL: nodejs#22209 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Mathias Buus <mathiasbuus@gmail.com>
Fixed in 98cf84f. |
When there is at least one 'data' listener try to flow when
last 'readable' listener gets removed and the stream is not piped.
Currently if we have both 'readable' and 'data' listeners set only
'readable' listener will get called and stream will be stalled without
'data' and 'end' events if 'readable' listener neither read()'s nor
resume()'s the stream.
Fixes: #21398
Checklist
make -j4 test
(UNIX) passes