Skip to content

Commit

Permalink
stream: add FileHandle support to Read/WriteStream
Browse files Browse the repository at this point in the history
Support creating a Read/WriteStream from a
FileHandle instead of a raw file descriptor
Add an EventEmitter to FileHandle with a single
'close' event.

Fixes: #35240

PR-URL: #35922
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
mmomtchev authored and danielleadams committed Dec 7, 2020
1 parent 754b7a7 commit 6033d30
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 41 deletions.
20 changes: 18 additions & 2 deletions doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1751,6 +1751,10 @@ fs.copyFileSync('source.txt', 'destination.txt', COPYFILE_EXCL);
<!-- YAML
added: v0.1.31
changes:
- version:
- REPLACEME
pr-url: /~https://github.com/nodejs/node/pull/35922
description: The `fd` option accepts FileHandle arguments.
- version:
- v13.6.0
- v12.17.0
Expand Down Expand Up @@ -1782,7 +1786,7 @@ changes:
* `flags` {string} See [support of file system `flags`][]. **Default:**
`'r'`.
* `encoding` {string} **Default:** `null`
* `fd` {integer} **Default:** `null`
* `fd` {integer|FileHandle} **Default:** `null`
* `mode` {integer} **Default:** `0o666`
* `autoClose` {boolean} **Default:** `true`
* `emitClose` {boolean} **Default:** `false`
Expand Down Expand Up @@ -1858,6 +1862,10 @@ If `options` is a string, then it specifies the encoding.
<!-- YAML
added: v0.1.31
changes:
- version:
- REPLACEME
pr-url: /~https://github.com/nodejs/node/pull/35922
description: The `fd` option accepts FileHandle arguments.
- version:
- v13.6.0
- v12.17.0
Expand Down Expand Up @@ -1887,7 +1895,7 @@ changes:
* `flags` {string} See [support of file system `flags`][]. **Default:**
`'w'`.
* `encoding` {string} **Default:** `'utf8'`
* `fd` {integer} **Default:** `null`
* `fd` {integer|FileHandle} **Default:** `null`
* `mode` {integer} **Default:** `0o666`
* `autoClose` {boolean} **Default:** `true`
* `emitClose` {boolean} **Default:** `false`
Expand Down Expand Up @@ -4697,6 +4705,14 @@ the promise-based API uses the `FileHandle` class in order to help avoid
accidental leaking of unclosed file descriptors after a `Promise` is resolved or
rejected.

#### Event: `'close'`
<!-- YAML
added: REPLACEME
-->

The `'close'` event is emitted when the `FileHandle` and any of its underlying
resources (a file descriptor, for example) have been closed.

#### `filehandle.appendFile(data, options)`
<!-- YAML
added: v10.0.0
Expand Down
17 changes: 17 additions & 0 deletions lib/internal/event_target.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ const {
ArrayFrom,
Boolean,
Error,
FunctionPrototypeCall,
NumberIsInteger,
ObjectAssign,
ObjectDefineProperties,
ObjectDefineProperty,
ObjectGetOwnPropertyDescriptor,
ObjectGetOwnPropertyDescriptors,
ReflectApply,
SafeMap,
String,
Expand Down Expand Up @@ -646,8 +648,23 @@ function defineEventHandler(emitter, name) {
enumerable: true
});
}

const EventEmitterMixin = (Superclass) => {
class MixedEventEmitter extends Superclass {
constructor(...args) {
super(...args);
FunctionPrototypeCall(EventEmitter, this);
}
}
const protoProps = ObjectGetOwnPropertyDescriptors(EventEmitter.prototype);
delete protoProps.constructor;
ObjectDefineProperties(MixedEventEmitter.prototype, protoProps);
return MixedEventEmitter;
};

module.exports = {
Event,
EventEmitterMixin,
EventTarget,
NodeEventTarget,
defineEventHandler,
Expand Down
38 changes: 26 additions & 12 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,16 @@ const {
} = require('internal/validators');
const pathModule = require('path');
const { promisify } = require('internal/util');
const { EventEmitterMixin } = require('internal/event_target');

const kHandle = Symbol('kHandle');
const kFd = Symbol('kFd');
const kRefs = Symbol('kRefs');
const kClosePromise = Symbol('kClosePromise');
const kCloseResolve = Symbol('kCloseResolve');
const kCloseReject = Symbol('kCloseReject');
const kRef = Symbol('kRef');
const kUnref = Symbol('kUnref');

const { kUsePromises } = binding;
const {
Expand All @@ -94,7 +97,7 @@ const lazyDOMException = hideStackFrames((message, name) => {
return new DOMException(message, name);
});

class FileHandle extends JSTransferable {
class FileHandle extends EventEmitterMixin(JSTransferable) {
constructor(filehandle) {
super();
this[kHandle] = filehandle;
Expand Down Expand Up @@ -197,6 +200,7 @@ class FileHandle extends JSTransferable {
);
}

this.emit('close');
return this[kClosePromise];
}

Expand Down Expand Up @@ -226,6 +230,22 @@ class FileHandle extends JSTransferable {
this[kHandle] = handle;
this[kFd] = handle.fd;
}

[kRef]() {
this[kRefs]++;
}

[kUnref]() {
this[kRefs]--;
if (this[kRefs] === 0) {
this[kFd] = -1;
PromisePrototypeThen(
this[kHandle].close(),
this[kCloseResolve],
this[kCloseReject]
);
}
}
}

async function fsCall(fn, handle, ...args) {
Expand All @@ -242,18 +262,10 @@ async function fsCall(fn, handle, ...args) {
}

try {
handle[kRefs]++;
handle[kRef]();
return await fn(handle, ...args);
} finally {
handle[kRefs]--;
if (handle[kRefs] === 0) {
handle[kFd] = -1;
PromisePrototypeThen(
handle[kHandle].close(),
handle[kCloseResolve],
handle[kCloseReject]
);
}
handle[kUnref]();
}
}

Expand Down Expand Up @@ -712,5 +724,7 @@ module.exports = {
readFile,
},

FileHandle
FileHandle,
kRef,
kUnref,
};
68 changes: 65 additions & 3 deletions lib/internal/fs/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,25 @@

const {
Array,
FunctionPrototypeBind,
MathMin,
ObjectDefineProperty,
ObjectSetPrototypeOf,
PromisePrototypeThen,
ReflectApply,
Symbol,
} = primordials;

const {
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE
ERR_OUT_OF_RANGE,
ERR_METHOD_NOT_IMPLEMENTED,
} = require('internal/errors').codes;
const { deprecate } = require('internal/util');
const { validateInteger } = require('internal/validators');
const { errorOrDestroy } = require('internal/streams/destroy');
const fs = require('fs');
const { kRef, kUnref, FileHandle } = require('internal/fs/promises');
const { Buffer } = require('buffer');
const {
copyObject,
Expand All @@ -28,6 +32,7 @@ const kIoDone = Symbol('kIoDone');
const kIsPerformingIO = Symbol('kIsPerformingIO');

const kFs = Symbol('kFs');
const kHandle = Symbol('kHandle');

function _construct(callback) {
const stream = this;
Expand Down Expand Up @@ -66,6 +71,35 @@ function _construct(callback) {
}
}

// This generates an fs operations structure for a FileHandle
const FileHandleOperations = (handle) => {
return {
open: (path, flags, mode, cb) => {
throw new ERR_METHOD_NOT_IMPLEMENTED('open()');
},
close: (fd, cb) => {
handle[kUnref]();
PromisePrototypeThen(handle.close(),
() => cb(), cb);
},
read: (fd, buf, offset, length, pos, cb) => {
PromisePrototypeThen(handle.read(buf, offset, length, pos),
(r) => cb(null, r.bytesRead, r.buffer),
(err) => cb(err, 0, buf));
},
write: (fd, buf, offset, length, pos, cb) => {
PromisePrototypeThen(handle.write(buf, offset, length, pos),
(r) => cb(null, r.bytesWritten, r.buffer),
(err) => cb(err, 0, buf));
},
writev: (fd, buffers, pos, cb) => {
PromisePrototypeThen(handle.writev(buffers, pos),
(r) => cb(null, r.bytesWritten, r.buffers),
(err) => cb(err, 0, buffers));
}
};
};

function close(stream, err, cb) {
if (!stream.fd) {
// TODO(ronag)
Expand All @@ -80,6 +114,32 @@ function close(stream, err, cb) {
}
}

function importFd(stream, options) {
stream.fd = null;
if (options.fd) {
if (typeof options.fd === 'number') {
// When fd is a raw descriptor, we must keep our fingers crossed
// that the descriptor won't get closed, or worse, replaced with
// another one
// /~https://github.com/nodejs/node/issues/35862
stream.fd = options.fd;
} else if (typeof options.fd === 'object' &&
options.fd instanceof FileHandle) {
// When fd is a FileHandle we can listen for 'close' events
if (options.fs)
// FileHandle is not supported with custom fs operations
throw new ERR_METHOD_NOT_IMPLEMENTED('FileHandle with fs');
stream[kHandle] = options.fd;
stream.fd = options.fd.fd;
stream[kFs] = FileHandleOperations(stream[kHandle]);
stream[kHandle][kRef]();
options.fd.on('close', FunctionPrototypeBind(stream.close, stream));
} else
throw ERR_INVALID_ARG_TYPE('options.fd',
['number', 'FileHandle'], options.fd);
}
}

function ReadStream(path, options) {
if (!(this instanceof ReadStream))
return new ReadStream(path, options);
Expand Down Expand Up @@ -115,10 +175,11 @@ function ReadStream(path, options) {

// Path will be ignored when fd is specified, so it can be falsy
this.path = toPathIfFileURL(path);
this.fd = options.fd === undefined ? null : options.fd;
this.flags = options.flags === undefined ? 'r' : options.flags;
this.mode = options.mode === undefined ? 0o666 : options.mode;

importFd(this, options);

this.start = options.start;
this.end = options.end;
this.pos = undefined;
Expand Down Expand Up @@ -287,10 +348,11 @@ function WriteStream(path, options) {

// Path will be ignored when fd is specified, so it can be falsy
this.path = toPathIfFileURL(path);
this.fd = options.fd === undefined ? null : options.fd;
this.flags = options.flags === undefined ? 'w' : options.flags;
this.mode = options.mode === undefined ? 0o666 : options.mode;

importFd(this, options);

this.start = options.start;
this.pos = undefined;
this.bytesWritten = 0;
Expand Down
50 changes: 50 additions & 0 deletions test/parallel/test-fs-promises-file-handle-read-worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
'use strict';
const common = require('../common');
const fs = require('fs');
const assert = require('assert');
const path = require('path');
const tmpdir = require('../common/tmpdir');
const file = path.join(tmpdir.path, 'read_stream_filehandle_worker.txt');
const input = 'hello world';
const { Worker, isMainThread, workerData } = require('worker_threads');

if (isMainThread || !workerData) {
tmpdir.refresh();
fs.writeFileSync(file, input);

fs.promises.open(file, 'r').then((handle) => {
handle.on('close', common.mustNotCall());
new Worker(__filename, {
workerData: { handle },
transferList: [handle]
});
});
fs.promises.open(file, 'r').then((handle) => {
fs.createReadStream(null, { fd: handle });
assert.throws(() => {
new Worker(__filename, {
workerData: { handle },
transferList: [handle]
});
}, {
code: 25,
});
});
} else {
let output = '';

const handle = workerData.handle;
handle.on('close', common.mustCall());
const stream = fs.createReadStream(null, { fd: handle });

stream.on('data', common.mustCallAtLeast((data) => {
output += data;
}));

stream.on('end', common.mustCall(() => {
handle.close();
assert.strictEqual(output, input);
}));

stream.on('close', common.mustCall());
}
Loading

0 comments on commit 6033d30

Please sign in to comment.