diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index cbcbf1fd69cc3b..813da0f8ef52b8 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -210,11 +210,13 @@ function fromAsyncGen(fn) { const value = fn(async function*() { while (true) { const { chunk, done, cb } = await promise; + promise = null; + resolve = null; process.nextTick(cb); if (done) return; if (signal.aborted) throw new AbortError(); - yield chunk; ({ promise, resolve } = createDeferredPromise()); + yield chunk; } }(), { signal }); diff --git a/test/parallel/test-stream-duplex-from.js b/test/parallel/test-stream-duplex-from.js index 265b61dfd062f9..446768d6eef3e3 100644 --- a/test/parallel/test-stream-duplex-from.js +++ b/test/parallel/test-stream-duplex-from.js @@ -2,7 +2,7 @@ const common = require('../common'); const assert = require('assert'); -const { Duplex, Readable, Writable } = require('stream'); +const { Duplex, Readable, Writable, pipeline } = require('stream'); { const d = Duplex.from({ @@ -118,3 +118,29 @@ const { Duplex, Readable, Writable } = require('stream'); assert.strictEqual(d.readable, false); })); } + +{ + // /~https://github.com/nodejs/node/issues/40497 + pipeline( + ['abc\ndef\nghi'], + Duplex.from(async function * (source) { + let rest = ''; + for await (const chunk of source) { + const lines = (rest + chunk.toString()).split('\n'); + rest = lines.pop(); + for (const line of lines) { + yield line; + } + } + yield rest; + }), + async function * (source) { + let ret = ''; + for await (const x of source) { + ret += x; + } + assert.strictEqual(ret, 'abcdefghi'); + }, + common.mustCall(() => {}), + ); +}