Skip to content

Commit

Permalink
streams: use private symbol for bitmap state
Browse files Browse the repository at this point in the history
PR-URL: nodejs#49993
  • Loading branch information
ronag committed Oct 1, 2023
1 parent aadfea4 commit eedb2d8
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 142 deletions.
45 changes: 23 additions & 22 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ const {
const { validateObject } = require('internal/validators');

const kPaused = Symbol('kPaused');
const kState = Symbol('kState');

const { StringDecoder } = require('string_decoder');
const from = require('internal/streams/from');
Expand Down Expand Up @@ -107,10 +108,10 @@ const kDataEmitted = 1 << 18;
function makeBitMapDescriptor(bit) {
return {
enumerable: false,
get() { return (this.state & bit) !== 0; },
get() { return (this[kState] & bit) !== 0; },
set(value) {
if (value) this.state |= bit;
else this.state &= ~bit;
if (value) this[kState] |= bit;
else this[kState] &= ~bit;
},
};
}
Expand Down Expand Up @@ -163,13 +164,13 @@ function ReadableState(options, stream, isDuplex) {

// Bit map field to store ReadableState more effciently with 1 bit per field
// instead of a V8 slot per field.
this.state = kEmitClose | kAutoDestroy | kConstructed | kSync;
this[kState] = kEmitClose | kAutoDestroy | kConstructed | kSync;
// Object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away.
if (options && options.objectMode) this.state |= kObjectMode;
if (options && options.objectMode) this[kState] |= kObjectMode;

if (isDuplex && options && options.readableObjectMode)
this.state |= kObjectMode;
this[kState] |= kObjectMode;

// The point at which it stops calling _read() to fill the buffer
// Note: 0 is a valid value, means "don't call _read preemptively ever"
Expand All @@ -188,10 +189,10 @@ function ReadableState(options, stream, isDuplex) {
this[kPaused] = null;

// Should close be emitted on destroy. Defaults to true.
if (options && options.emitClose === false) this.state &= ~kEmitClose;
if (options && options.emitClose === false) this[kState] &= ~kEmitClose;

// Should .destroy() be called after 'end' (and potentially 'finish').
if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy;
if (options && options.autoDestroy === false) this[kState] &= ~kAutoDestroy;


// Indicates whether the stream has errored. When true no further
Expand Down Expand Up @@ -296,7 +297,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
const state = stream._readableState;

let err;
if ((state.state & kObjectMode) === 0) {
if ((state[kState] & kObjectMode) === 0) {
if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (state.encoding !== encoding) {
Expand All @@ -323,11 +324,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
if (err) {
errorOrDestroy(stream, err);
} else if (chunk === null) {
state.state &= ~kReading;
state[kState] &= ~kReading;
onEofChunk(stream, state);
} else if (((state.state & kObjectMode) !== 0) || (chunk && chunk.length > 0)) {
} else if (((state[kState] & kObjectMode) !== 0) || (chunk && chunk.length > 0)) {
if (addToFront) {
if ((state.state & kEndEmitted) !== 0)
if ((state[kState] & kEndEmitted) !== 0)
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
else if (state.destroyed || state.errored)
return false;
Expand All @@ -338,7 +339,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
} else if (state.destroyed || state.errored) {
return false;
} else {
state.state &= ~kReading;
state[kState] &= ~kReading;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if (state.objectMode || chunk.length !== 0)
Expand All @@ -350,7 +351,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
}
}
} else if (!addToFront) {
state.state &= ~kReading;
state[kState] &= ~kReading;
maybeReadMore(stream, state);
}

Expand All @@ -366,7 +367,7 @@ function addChunk(stream, state, chunk, addToFront) {
stream.listenerCount('data') > 0) {
// Use the guard to avoid creating `Set()` repeatedly
// when we have multiple pipes.
if ((state.state & kMultiAwaitDrain) !== 0) {
if ((state[kState] & kMultiAwaitDrain) !== 0) {
state.awaitDrainWriters.clear();
} else {
state.awaitDrainWriters = null;
Expand All @@ -382,7 +383,7 @@ function addChunk(stream, state, chunk, addToFront) {
else
state.buffer.push(chunk);

if ((state.state & kNeedReadable) !== 0)
if ((state[kState] & kNeedReadable) !== 0)
emitReadable(stream);
}
maybeReadMore(stream, state);
Expand Down Expand Up @@ -437,7 +438,7 @@ function computeNewHighWaterMark(n) {
function howMuchToRead(n, state) {
if (n <= 0 || (state.length === 0 && state.ended))
return 0;
if ((state.state & kObjectMode) !== 0)
if ((state[kState] & kObjectMode) !== 0)
return 1;
if (NumberIsNaN(n)) {
// Only flow one buffer at a time.
Expand Down Expand Up @@ -468,7 +469,7 @@ Readable.prototype.read = function(n) {
state.highWaterMark = computeNewHighWaterMark(n);

if (n !== 0)
state.state &= ~kEmittedReadable;
state[kState] &= ~kEmittedReadable;

// If we're doing read(0) to trigger a readable event, but we
// already have a bunch of data in the buffer, then just trigger
Expand Down Expand Up @@ -519,7 +520,7 @@ Readable.prototype.read = function(n) {
// 3. Actually pull the requested chunks out of the buffer and return.

// if we need a readable event, then we need to do some reading.
let doRead = (state.state & kNeedReadable) !== 0;
let doRead = (state[kState] & kNeedReadable) !== 0;
debug('need readable', doRead);

// If we currently have less than the highWaterMark, then also read some.
Expand All @@ -537,18 +538,18 @@ Readable.prototype.read = function(n) {
debug('reading, ended or constructing', doRead);
} else if (doRead) {
debug('do read');
state.state |= kReading | kSync;
state[kState] |= kReading | kSync;
// If the length is currently zero, then we *need* a readable event.
if (state.length === 0)
state.state |= kNeedReadable;
state[kState] |= kNeedReadable;

// Call internal read method
try {
this._read(state.highWaterMark);
} catch (err) {
errorOrDestroy(this, err);
}
state.state &= ~kSync;
state[kState] &= ~kSync;

// If _read pushed data synchronously, then `reading` will be false,
// and we need to re-evaluate how much data we can return to the user.
Expand Down
Loading

0 comments on commit eedb2d8

Please sign in to comment.