From 2e9f7284a13c4bf942e2db2a8142cda31d54a94e Mon Sep 17 00:00:00 2001 From: Stephen Belanger Date: Fri, 31 Mar 2023 10:40:30 -0700 Subject: [PATCH] lib: add tracing channel to diagnostics_channel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-URL: /~https://github.com/nodejs/node/pull/44943 Reviewed-By: Matteo Collina Reviewed-By: Benjamin Gruenbaum Reviewed-By: Chengzhong Wu Reviewed-By: Colin Ihrig Reviewed-By: Gerhard Stöbich Reviewed-By: Rafael Gonzaga Reviewed-By: Bryan English --- doc/api/diagnostics_channel.md | 647 ++++++++++++++++++ lib/diagnostics_channel.js | 307 ++++++++- src/node_util.cc | 13 +- src/node_util.h | 1 - .../test-diagnostics-channel-bind-store.js | 108 +++ ...ics-channel-tracing-channel-async-error.js | 46 ++ ...agnostics-channel-tracing-channel-async.js | 60 ++ ...nel-tracing-channel-callback-run-stores.js | 29 + ...nnel-tracing-channel-promise-run-stores.js | 24 + ...tics-channel-tracing-channel-run-stores.js | 21 + ...tics-channel-tracing-channel-sync-error.js | 39 ++ ...iagnostics-channel-tracing-channel-sync.js | 46 ++ tools/doc/type-parser.mjs | 3 + 13 files changed, 1308 insertions(+), 36 deletions(-) create mode 100644 test/parallel/test-diagnostics-channel-bind-store.js create mode 100644 test/parallel/test-diagnostics-channel-tracing-channel-async-error.js create mode 100644 test/parallel/test-diagnostics-channel-tracing-channel-async.js create mode 100644 test/parallel/test-diagnostics-channel-tracing-channel-callback-run-stores.js create mode 100644 test/parallel/test-diagnostics-channel-tracing-channel-promise-run-stores.js create mode 100644 test/parallel/test-diagnostics-channel-tracing-channel-run-stores.js create mode 100644 test/parallel/test-diagnostics-channel-tracing-channel-sync-error.js create mode 100644 test/parallel/test-diagnostics-channel-tracing-channel-sync.js diff --git a/doc/api/diagnostics_channel.md b/doc/api/diagnostics_channel.md index 1c174f4f6f9f64..59e147e77ee60c 100644 --- a/doc/api/diagnostics_channel.md +++ b/doc/api/diagnostics_channel.md @@ -227,6 +227,56 @@ diagnostics_channel.subscribe('my-channel', onMessage); diagnostics_channel.unsubscribe('my-channel', onMessage); ``` +#### `diagnostics_channel.tracingChannel(nameOrChannels)` + + + +> Stability: 1 - Experimental + +* `nameOrChannels` {string|TracingChannel} Channel name or + object containing all the [TracingChannel Channels][] +* Returns: {TracingChannel} Collection of channels to trace with + +Creates a [`TracingChannel`][] wrapper for the given +[TracingChannel Channels][]. If a name is given, the corresponding tracing +channels will be created in the form of `tracing:${name}:${eventType}` where +`eventType` corresponds to the types of [TracingChannel Channels][]. + +```mjs +import diagnostics_channel from 'node:diagnostics_channel'; + +const channelsByName = diagnostics_channel.tracingChannel('my-channel'); + +// or... + +const channelsByCollection = diagnostics_channel.tracingChannel({ + start: diagnostics_channel.channel('tracing:my-channel:start'), + end: diagnostics_channel.channel('tracing:my-channel:end'), + asyncStart: diagnostics_channel.channel('tracing:my-channel:asyncStart'), + asyncEnd: diagnostics_channel.channel('tracing:my-channel:asyncEnd'), + error: diagnostics_channel.channel('tracing:my-channel:error'), +}); +``` + +```cjs +const diagnostics_channel = require('node:diagnostics_channel'); + +const channelsByName = diagnostics_channel.tracingChannel('my-channel'); + +// or... + +const channelsByCollection = diagnostics_channel.tracingChannel({ + start: diagnostics_channel.channel('tracing:my-channel:start'), + end: diagnostics_channel.channel('tracing:my-channel:end'), + asyncStart: diagnostics_channel.channel('tracing:my-channel:asyncStart'), + asyncEnd: diagnostics_channel.channel('tracing:my-channel:asyncEnd'), + error: diagnostics_channel.channel('tracing:my-channel:error'), +}); +``` + ### Class: `Channel` + +> Stability: 1 - Experimental + +* `store` {AsyncLocalStorage} The store to which to bind the context data +* `transform` {Function} Transform context data before setting the store context + +When [`channel.runStores(context, ...)`][] is called, the given context data +will be applied to any store bound to the channel. If the store has already been +bound the previous `transform` function will be replaced with the new one. +The `transform` function may be omitted to set the given context data as the +context directly. + +```mjs +import diagnostics_channel from 'node:diagnostics_channel'; +import { AsyncLocalStorage } from 'node:async_hooks'; + +const store = new AsyncLocalStorage(); + +const channel = diagnostics_channel.channel('my-channel'); + +channel.bindStore(store, (data) => { + return { data }; +}); +``` + +```cjs +const diagnostics_channel = require('node:diagnostics_channel'); +const { AsyncLocalStorage } = require('node:async_hooks'); + +const store = new AsyncLocalStorage(); + +const channel = diagnostics_channel.channel('my-channel'); + +channel.bindStore(store, (data) => { + return { data }; +}); +``` + +#### `channel.unbindStore(store)` + + + +> Stability: 1 - Experimental + +* `store` {AsyncLocalStorage} The store to unbind from the channel. +* Returns: {boolean} `true` if the store was found, `false` otherwise. + +Remove a message handler previously registered to this channel with +[`channel.bindStore(store)`][]. + +```mjs +import diagnostics_channel from 'node:diagnostics_channel'; +import { AsyncLocalStorage } from 'node:async_hooks'; + +const store = new AsyncLocalStorage(); + +const channel = diagnostics_channel.channel('my-channel'); + +channel.bindStore(store); +channel.unbindStore(store); +``` + +```cjs +const diagnostics_channel = require('node:diagnostics_channel'); +const { AsyncLocalStorage } = require('node:async_hooks'); + +const store = new AsyncLocalStorage(); + +const channel = diagnostics_channel.channel('my-channel'); + +channel.bindStore(store); +channel.unbindStore(store); +``` + +#### `channel.runStores(context, fn[, thisArg[, ...args]])` + + + +> Stability: 1 - Experimental + +* `context` {any} Message to send to subscribers and bind to stores +* `fn` {Function} Handler to run within the entered storage context +* `thisArg` {any} The receiver to be used for the function call. +* `...args` {any} Optional arguments to pass to the function. + +Applies the given data to any AsyncLocalStorage instances bound to the channel +for the duration of the given function, then publishes to the channel within +the scope of that data is applied to the stores. + +If a transform function was given to [`channel.bindStore(store)`][] it will be +applied to transform the message data before it becomes the context value for +the store. The prior storage context is accessible from within the transform +function in cases where context linking is required. + +The context applied to the store should be accesible in any async code which +continues from execution which began during the given function, however +there are some situations in which [context loss][] may occur. + +```mjs +import diagnostics_channel from 'node:diagnostics_channel'; +import { AsyncLocalStorage } from 'node:async_hooks'; + +const store = new AsyncLocalStorage(); + +const channel = diagnostics_channel.channel('my-channel'); + +channel.bindStore(store, (message) => { + const parent = store.getStore(); + return new Span(message, parent); +}); +channel.runStores({ some: 'message' }, () => { + store.getStore(); // Span({ some: 'message' }) +}); +``` + +```cjs +const diagnostics_channel = require('node:diagnostics_channel'); +const { AsyncLocalStorage } = require('node:async_hooks'); + +const store = new AsyncLocalStorage(); + +const channel = diagnostics_channel.channel('my-channel'); + +channel.bindStore(store, (message) => { + const parent = store.getStore(); + return new Span(message, parent); +}); +channel.runStores({ some: 'message' }, () => { + store.getStore(); // Span({ some: 'message' }) +}); +``` + +### Class: `TracingChannel` + + + +> Stability: 1 - Experimental + +The class `TracingChannel` is a collection of [TracingChannel Channels][] which +together express a single traceable action. It is used to formalize and +simplify the process of producing events for tracing application flow. +[`diagnostics_channel.tracingChannel()`][] is used to construct a +`TracingChannel`. As with `Channel` it is recommended to create and reuse a +single `TracingChannel` at the top-level of the file rather than creating them +dynamically. + +#### `tracingChannel.subscribe(subscribers)` + + + +> Stability: 1 - Experimental + +* `subscribers` {Object} Set of [TracingChannel Channels][] subscribers + * `start` {Function} The [`start` event][] subscriber + * `end` {Function} The [`end` event][] subscriber + * `asyncStart` {Function} The [`asyncStart` event][] subscriber + * `asyncEnd` {Function} The [`asyncEnd` event][] subscriber + * `error` {Function} The [`error` event][] subscriber + +Helper to subscribe a collection of functions to the corresponding channels. +This is the same as calling [`channel.subscribe(onMessage)`][] on each channel +individually. + +```mjs +import diagnostics_channel from 'node:diagnostics_channel'; + +const channels = diagnostics_channel.tracingChannel('my-channel'); + +channels.subscribe({ + start(message) { + // Handle start message + }, + end(message) { + // Handle end message + }, + asyncStart(message) { + // Handle asyncStart message + }, + asyncEnd(message) { + // Handle asyncEnd message + }, + error(message) { + // Handle error message + }, +}); +``` + +```cjs +const diagnostics_channel = require('node:diagnostics_channel'); + +const channels = diagnostics_channel.tracingChannel('my-channel'); + +channels.subscribe({ + start(message) { + // Handle start message + }, + end(message) { + // Handle end message + }, + asyncStart(message) { + // Handle asyncStart message + }, + asyncEnd(message) { + // Handle asyncEnd message + }, + error(message) { + // Handle error message + }, +}); +``` + +#### `tracingChannel.unsubscribe(subscribers)` + + + +> Stability: 1 - Experimental + +* `subscribers` {Object} Set of [TracingChannel Channels][] subscribers + * `start` {Function} The [`start` event][] subscriber + * `end` {Function} The [`end` event][] subscriber + * `asyncStart` {Function} The [`asyncStart` event][] subscriber + * `asyncEnd` {Function} The [`asyncEnd` event][] subscriber + * `error` {Function} The [`error` event][] subscriber +* Returns: {boolean} `true` if all handlers were successfully unsubscribed, + and `false` otherwise. + +Helper to unsubscribe a collection of functions from the corresponding channels. +This is the same as calling [`channel.unsubscribe(onMessage)`][] on each channel +individually. + +```mjs +import diagnostics_channel from 'node:diagnostics_channel'; + +const channels = diagnostics_channel.tracingChannel('my-channel'); + +channels.unsubscribe({ + start(message) { + // Handle start message + }, + end(message) { + // Handle end message + }, + asyncStart(message) { + // Handle asyncStart message + }, + asyncEnd(message) { + // Handle asyncEnd message + }, + error(message) { + // Handle error message + }, +}); +``` + +```cjs +const diagnostics_channel = require('node:diagnostics_channel'); + +const channels = diagnostics_channel.tracingChannel('my-channel'); + +channels.unsubscribe({ + start(message) { + // Handle start message + }, + end(message) { + // Handle end message + }, + asyncStart(message) { + // Handle asyncStart message + }, + asyncEnd(message) { + // Handle asyncEnd message + }, + error(message) { + // Handle error message + }, +}); +``` + +#### `tracingChannel.traceSync(fn[, context[, thisArg[, ...args]]])` + + + +> Stability: 1 - Experimental + +* `fn` {Function} Function to wrap a trace around +* `context` {Object} Shared object to correlate events through +* `thisArg` {any} The receiver to be used for the function call +* `...args` {any} Optional arguments to pass to the function +* Returns: {any} The return value of the given function + +Trace a synchronous function call. This will always produce a [`start` event][] +and [`end` event][] around the execution and may produce an [`error` event][] +if the given function throws an error. This will run the given function using +[`channel.runStores(context, ...)`][] on the `start` channel which ensures all +events should have any bound stores set to match this trace context. + +```mjs +import diagnostics_channel from 'node:diagnostics_channel'; + +const channels = diagnostics_channel.tracingChannel('my-channel'); + +channels.traceSync(() => { + // Do something +}, { + some: 'thing', +}); +``` + +```cjs +const diagnostics_channel = require('node:diagnostics_channel'); + +const channels = diagnostics_channel.tracingChannel('my-channel'); + +channels.traceSync(() => { + // Do something +}, { + some: 'thing', +}); +``` + +#### `tracingChannel.tracePromise(fn[, context[, thisArg[, ...args]]])` + + + +> Stability: 1 - Experimental + +* `fn` {Function} Promise-returning function to wrap a trace around +* `context` {Object} Shared object to correlate trace events through +* `thisArg` {any} The receiver to be used for the function call +* `...args` {any} Optional arguments to pass to the function +* Returns: {Promise} Chained from promise returned by the given function + +Trace a promise-returning function call. This will always produce a +[`start` event][] and [`end` event][] around the synchronous portion of the +function execution, and will produce an [`asyncStart` event][] and +[`asyncEnd` event][] when a promise continuation is reached. It may also +produce an [`error` event][] if the given function throws an error or the +returned promise rejects. This will run the given function using +[`channel.runStores(context, ...)`][] on the `start` channel which ensures all +events should have any bound stores set to match this trace context. + +```mjs +import diagnostics_channel from 'node:diagnostics_channel'; + +const channels = diagnostics_channel.tracingChannel('my-channel'); + +channels.tracePromise(async () => { + // Do something +}, { + some: 'thing', +}); +``` + +```cjs +const diagnostics_channel = require('node:diagnostics_channel'); + +const channels = diagnostics_channel.tracingChannel('my-channel'); + +channels.tracePromise(async () => { + // Do something +}, { + some: 'thing', +}); +``` + +#### `tracingChannel.traceCallback(fn[, position[, context[, thisArg[, ...args]]]])` + + + +> Stability: 1 - Experimental + +* `fn` {Function} callback using function to wrap a trace around +* `position` {number} Zero-indexed argument position of expected callback +* `context` {Object} Shared object to correlate trace events through +* `thisArg` {any} The receiver to be used for the function call +* `...args` {any} Optional arguments to pass to the function +* Returns: {any} The return value of the given function + +Trace a callback-receiving function call. This will always produce a +[`start` event][] and [`end` event][] around the synchronous portion of the +function execution, and will produce a [`asyncStart` event][] and +[`asyncEnd` event][] around the callback execution. It may also produce an +[`error` event][] if the given function throws an error or the returned +promise rejects. This will run the given function using +[`channel.runStores(context, ...)`][] on the `start` channel which ensures all +events should have any bound stores set to match this trace context. + +The `position` will be -1 by default to indicate the final argument should +be used as the callback. + +```mjs +import diagnostics_channel from 'node:diagnostics_channel'; + +const channels = diagnostics_channel.tracingChannel('my-channel'); + +channels.traceCallback((arg1, callback) => { + // Do something + callback(null, 'result'); +}, 1, { + some: 'thing', +}, thisArg, arg1, callback); +``` + +```cjs +const diagnostics_channel = require('node:diagnostics_channel'); + +const channels = diagnostics_channel.tracingChannel('my-channel'); + +channels.traceCallback((arg1, callback) => { + // Do something + callback(null, 'result'); +}, { + some: 'thing', +}, thisArg, arg1, callback); +``` + +The callback will also be run with [`channel.runStores(context, ...)`][] which +enables context loss recovery in some cases. + +```mjs +import diagnostics_channel from 'node:diagnostics_channel'; +import { AsyncLocalStorage } from 'node:async_hooks'; + +const channels = diagnostics_channel.tracingChannel('my-channel'); +const myStore = new AsyncLocalStorage(); + +// The start channel sets the initial store data to something +// and stores that store data value on the trace context object +channels.start.bindStore(myStore, (data) => { + const span = new Span(data); + data.span = span; + return span; +}); + +// Then asyncStart can restore from that data it stored previously +channels.asyncStart.bindStore(myStore, (data) => { + return data.span; +}); +``` + +```cjs +const diagnostics_channel = require('node:diagnostics_channel'); +const { AsyncLocalStorage } = require('node:async_hooks'); + +const channels = diagnostics_channel.tracingChannel('my-channel'); +const myStore = new AsyncLocalStorage(); + +// The start channel sets the initial store data to something +// and stores that store data value on the trace context object +channels.start.bindStore(myStore, (data) => { + const span = new Span(data); + data.span = span; + return span; +}); + +// Then asyncStart can restore from that data it stored previously +channels.asyncStart.bindStore(myStore, (data) => { + return data.span; +}); +``` + +### TracingChannel Channels + +A TracingChannel is a collection of several diagnostics\_channels representing +specific points in the execution lifecycle of a single traceable action. The +behaviour is split into five diagnostics\_channels consisting of `start`, +`end`, `asyncStart`, `asyncEnd`, and `error`. A single traceable action will +share the same event object between all events, this can be helpful for +managing correlation through a weakmap. + +These event objects will be extended with `result` or `error` values when +the task "completes". In the case of a synchronous task the `result` will be +the return value and the `error` will be anything thrown from the function. +With callback-based async functions the `result` will be the second argument +of the callback while the `error` will either be a thrown error visible in the +`end` event or the first callback argument in either of the `asyncStart` or +`asyncEnd` events. + +Tracing channels should follow a naming pattern of: + +* `tracing:module.class.method:start` or `tracing:module.function:start` +* `tracing:module.class.method:end` or `tracing:module.function:end` +* `tracing:module.class.method:asyncStart` or `tracing:module.function:asyncStart` +* `tracing:module.class.method:asyncEnd` or `tracing:module.function:asyncEnd` +* `tracing:module.class.method:error` or `tracing:module.function:error` + +#### `start(event)` + +* Name: `tracing:${name}:start` + +The `start` event represents the point at which a function is called. At this +point the event data may contain function arguments or anything else available +at the very start of the execution of the function. + +#### `end(event)` + +* Name: `tracing:${name}:end` + +The `end` event represents the point at which a function call returns a value. +In the case of an async function this is when the promise returned not when the +function itself makes a return statement internally. At this point, if the +traced function was synchronous the `result` field will be set to the return +value of the function. Alternatively, the `error` field may be present to +represent any thrown errors. + +It is recommended to listen specifically to the `error` event to track errors +as it may be possible for a traceable action to produce multiple errors. For +example, an async task which fails may be started internally before the sync +part of the task then throws an error. + +#### `asyncStart(event)` + +* Name: `tracing:${name}:asyncStart` + +The `asyncStart` event represents the callback or continuation of a traceable +function being reached. At this point things like callback arguments may be +available, or anything else expressing the "result" of the action. + +For callbacks-based functions, the first argument of the callback will be +assigned to the `error` field, if not `undefined` or `null`, and the second +argument will be assigned to the `result` field. + +For promises, the argument to the `resolve` path will be assigned to `result` +or the argument to the `reject` path will be assign to `error`. + +It is recommended to listen specifically to the `error` event to track errors +as it may be possible for a traceable action to produce multiple errors. For +example, an async task which fails may be started internally before the sync +part of the task then throws an error. + +#### `asyncEnd(event)` + +* Name: `tracing:${name}:asyncEnd` + +The `asyncEnd` event represents the callback of an asynchronous function +returning. It's not likely event data will change after the `asyncStart` event, +however it may be useful to see the point where the callback completes. + +#### `error(event)` + +* Name: `tracing:${name}:error` + +The `error` event represents any error produced by the traceable function +either synchronously or asynchronously. If an error is thrown in the +synchronous portion of the traced function the error will be assigned to the +`error` field of the event and the `error` event will be triggered. If an error +is received asynchronously through a callback or promise rejection it will also +be assigned to the `error` field of the event and trigger the `error` event. + +It is possible for a single traceable function call to produce errors multiple +times so this should be considered when consuming this event. For example, if +another async task is triggered internally which fails and then the sync part +of the function then throws and error two `error` events will be emitted, one +for the sync error and one for the async error. + ### Built-in Channels > Stability: 1 - Experimental @@ -462,8 +1097,20 @@ Emitted when a new TCP or pipe connection is received. Emitted when a new UDP socket is created. +[TracingChannel Channels]: #tracingchannel-channels [`'uncaughtException'`]: process.md#event-uncaughtexception +[`TracingChannel`]: #class-tracingchannel +[`asyncEnd` event]: #asyncendevent +[`asyncStart` event]: #asyncstartevent +[`channel.bindStore(store)`]: #channelbindstorestore-transform +[`channel.runStores(context, ...)`]: #channelrunstorescontext-fn-thisarg-args [`channel.subscribe(onMessage)`]: #channelsubscribeonmessage +[`channel.unsubscribe(onMessage)`]: #channelunsubscribeonmessage [`diagnostics_channel.channel(name)`]: #diagnostics_channelchannelname [`diagnostics_channel.subscribe(name, onMessage)`]: #diagnostics_channelsubscribename-onmessage +[`diagnostics_channel.tracingChannel()`]: #diagnostics_channeltracingchannelnameorchannels [`diagnostics_channel.unsubscribe(name, onMessage)`]: #diagnostics_channelunsubscribename-onmessage +[`end` event]: #endevent +[`error` event]: #errorevent +[`start` event]: #startevent +[context loss]: async_context.md#troubleshooting-context-loss diff --git a/lib/diagnostics_channel.js b/lib/diagnostics_channel.js index c20a9bffc8610b..e8c5529167a078 100644 --- a/lib/diagnostics_channel.js +++ b/lib/diagnostics_channel.js @@ -4,9 +4,14 @@ const { ArrayPrototypeIndexOf, ArrayPrototypePush, ArrayPrototypeSplice, - ObjectCreate, ObjectGetPrototypeOf, ObjectSetPrototypeOf, + Promise, + PromisePrototypeThen, + PromiseResolve, + PromiseReject, + ReflectApply, + SafeMap, SymbolHasInstance, } = primordials; @@ -23,11 +28,59 @@ const { triggerUncaughtException } = internalBinding('errors'); const { WeakReference } = internalBinding('util'); +function decRef(channel) { + if (channels.get(channel.name).decRef() === 0) { + channels.delete(channel.name); + } +} + +function incRef(channel) { + channels.get(channel.name).incRef(); +} + +function markActive(channel) { + // eslint-disable-next-line no-use-before-define + ObjectSetPrototypeOf(channel, ActiveChannel.prototype); + channel._subscribers = []; + channel._stores = new SafeMap(); +} + +function maybeMarkInactive(channel) { + // When there are no more active subscribers or bound, restore to fast prototype. + if (!channel._subscribers.length && !channel._stores.size) { + // eslint-disable-next-line no-use-before-define + ObjectSetPrototypeOf(channel, Channel.prototype); + channel._subscribers = undefined; + channel._stores = undefined; + } +} + +function defaultTransform(data) { + return data; +} + +function wrapStoreRun(store, data, next, transform = defaultTransform) { + return () => { + let context; + try { + context = transform(data); + } catch (err) { + process.nextTick(() => { + triggerUncaughtException(err, false); + }); + return next(); + } + + return store.run(context, next); + }; +} + // TODO(qard): should there be a C++ channel interface? class ActiveChannel { subscribe(subscription) { validateFunction(subscription, 'subscription'); ArrayPrototypePush(this._subscribers, subscription); + incRef(this); } unsubscribe(subscription) { @@ -36,12 +89,28 @@ class ActiveChannel { ArrayPrototypeSplice(this._subscribers, index, 1); - // When there are no more active subscribers, restore to fast prototype. - if (!this._subscribers.length) { - // eslint-disable-next-line no-use-before-define - ObjectSetPrototypeOf(this, Channel.prototype); + decRef(this); + maybeMarkInactive(this); + + return true; + } + + bindStore(store, transform) { + const replacing = this._stores.has(store); + if (!replacing) incRef(this); + this._stores.set(store, transform); + } + + unbindStore(store) { + if (!this._stores.has(store)) { + return false; } + this._stores.delete(store); + + decRef(this); + maybeMarkInactive(this); + return true; } @@ -61,12 +130,30 @@ class ActiveChannel { } } } + + runStores(data, fn, thisArg, ...args) { + let run = () => { + this.publish(data); + return ReflectApply(fn, thisArg, args); + }; + + for (const entry of this._stores.entries()) { + const store = entry[0]; + const transform = entry[1]; + run = wrapStoreRun(store, data, run, transform); + } + + return run(); + } } class Channel { constructor(name) { this._subscribers = undefined; + this._stores = undefined; this.name = name; + + channels.set(name, new WeakReference(this)); } static [SymbolHasInstance](instance) { @@ -76,8 +163,7 @@ class Channel { } subscribe(subscription) { - ObjectSetPrototypeOf(this, ActiveChannel.prototype); - this._subscribers = []; + markActive(this); this.subscribe(subscription); } @@ -85,18 +171,31 @@ class Channel { return false; } + bindStore(store, transform) { + markActive(this); + this.bindStore(store, transform); + } + + unbindStore() { + return false; + } + get hasSubscribers() { return false; } publish() {} + + runStores(data, fn, thisArg, ...args) { + return ReflectApply(fn, thisArg, args); + } } -const channels = ObjectCreate(null); +const channels = new SafeMap(); function channel(name) { let channel; - const ref = channels[name]; + const ref = channels.get(name); if (ref) channel = ref.get(); if (channel) return channel; @@ -104,33 +203,20 @@ function channel(name) { throw new ERR_INVALID_ARG_TYPE('channel', ['string', 'symbol'], name); } - channel = new Channel(name); - channels[name] = new WeakReference(channel); - return channel; + return new Channel(name); } function subscribe(name, subscription) { - const chan = channel(name); - channels[name].incRef(); - chan.subscribe(subscription); + return channel(name).subscribe(subscription); } function unsubscribe(name, subscription) { - const chan = channel(name); - if (!chan.unsubscribe(subscription)) { - return false; - } - - channels[name].decRef(); - if (channels[name].getRef() === 0) { - delete channels[name]; - } - return true; + return channel(name).unsubscribe(subscription); } function hasSubscribers(name) { let channel; - const ref = channels[name]; + const ref = channels.get(name); if (ref) channel = ref.get(); if (!channel) { return false; @@ -139,10 +225,179 @@ function hasSubscribers(name) { return channel.hasSubscribers; } +const traceEvents = [ + 'start', + 'end', + 'asyncStart', + 'asyncEnd', + 'error', +]; + +function assertChannel(value, name) { + if (!(value instanceof Channel)) { + throw new ERR_INVALID_ARG_TYPE(name, ['Channel'], value); + } +} + +class TracingChannel { + constructor(nameOrChannels) { + if (typeof nameOrChannels === 'string') { + this.start = channel(`tracing:${nameOrChannels}:start`); + this.end = channel(`tracing:${nameOrChannels}:end`); + this.asyncStart = channel(`tracing:${nameOrChannels}:asyncStart`); + this.asyncEnd = channel(`tracing:${nameOrChannels}:asyncEnd`); + this.error = channel(`tracing:${nameOrChannels}:error`); + } else if (typeof nameOrChannels === 'object') { + const { start, end, asyncStart, asyncEnd, error } = nameOrChannels; + + assertChannel(start, 'nameOrChannels.start'); + assertChannel(end, 'nameOrChannels.end'); + assertChannel(asyncStart, 'nameOrChannels.asyncStart'); + assertChannel(asyncEnd, 'nameOrChannels.asyncEnd'); + assertChannel(error, 'nameOrChannels.error'); + + this.start = start; + this.end = end; + this.asyncStart = asyncStart; + this.asyncEnd = asyncEnd; + this.error = error; + } else { + throw new ERR_INVALID_ARG_TYPE('nameOrChannels', + ['string', 'object', 'Channel'], + nameOrChannels); + } + } + + subscribe(handlers) { + for (const name of traceEvents) { + if (!handlers[name]) continue; + + this[name]?.subscribe(handlers[name]); + } + } + + unsubscribe(handlers) { + let done = true; + + for (const name of traceEvents) { + if (!handlers[name]) continue; + + if (!this[name]?.unsubscribe(handlers[name])) { + done = false; + } + } + + return done; + } + + traceSync(fn, context = {}, thisArg, ...args) { + const { start, end, error } = this; + + return start.runStores(context, () => { + try { + const result = ReflectApply(fn, thisArg, args); + context.result = result; + return result; + } catch (err) { + context.error = err; + error.publish(context); + throw err; + } finally { + end.publish(context); + } + }); + } + + tracePromise(fn, context = {}, thisArg, ...args) { + const { start, end, asyncStart, asyncEnd, error } = this; + + function reject(err) { + context.error = err; + error.publish(context); + asyncStart.publish(context); + // TODO: Is there a way to have asyncEnd _after_ the continuation? + asyncEnd.publish(context); + return PromiseReject(err); + } + + function resolve(result) { + context.result = result; + asyncStart.publish(context); + // TODO: Is there a way to have asyncEnd _after_ the continuation? + asyncEnd.publish(context); + return result; + } + + return start.runStores(context, () => { + try { + let promise = ReflectApply(fn, thisArg, args); + // Convert thenables to native promises + if (!(promise instanceof Promise)) { + promise = PromiseResolve(promise); + } + return PromisePrototypeThen(promise, resolve, reject); + } catch (err) { + context.error = err; + error.publish(context); + throw err; + } finally { + end.publish(context); + } + }); + } + + traceCallback(fn, position = -1, context = {}, thisArg, ...args) { + const { start, end, asyncStart, asyncEnd, error } = this; + + function wrappedCallback(err, res) { + if (err) { + context.error = err; + error.publish(context); + } else { + context.result = res; + } + + // Using runStores here enables manual context failure recovery + asyncStart.runStores(context, () => { + try { + if (callback) { + return ReflectApply(callback, this, arguments); + } + } finally { + asyncEnd.publish(context); + } + }); + } + + const callback = args.at(position); + if (typeof callback !== 'function') { + throw new ERR_INVALID_ARG_TYPE('callback', ['function'], callback); + } + ArrayPrototypeSplice(args, position, 1, wrappedCallback); + + return start.runStores(context, () => { + try { + return ReflectApply(fn, thisArg, args); + } catch (err) { + context.error = err; + error.publish(context); + throw err; + } finally { + end.publish(context); + } + }); + } +} + +function tracingChannel(nameOrChannels) { + return new TracingChannel(nameOrChannels); +} + module.exports = { channel, hasSubscribers, subscribe, + tracingChannel, unsubscribe, Channel, }; diff --git a/src/node_util.cc b/src/node_util.cc index f7467caf899d9c..a069fa2aa1665c 100644 --- a/src/node_util.cc +++ b/src/node_util.cc @@ -262,18 +262,13 @@ void WeakReference::Get(const FunctionCallbackInfo& args) { args.GetReturnValue().Set(weak_ref->target_.Get(isolate)); } -void WeakReference::GetRef(const FunctionCallbackInfo& args) { - WeakReference* weak_ref = Unwrap(args.Holder()); - Isolate* isolate = args.GetIsolate(); - args.GetReturnValue().Set( - v8::Number::New(isolate, weak_ref->reference_count_)); -} - void WeakReference::IncRef(const FunctionCallbackInfo& args) { WeakReference* weak_ref = Unwrap(args.Holder()); weak_ref->reference_count_++; if (weak_ref->target_.IsEmpty()) return; if (weak_ref->reference_count_ == 1) weak_ref->target_.ClearWeak(); + args.GetReturnValue().Set( + v8::Number::New(args.GetIsolate(), weak_ref->reference_count_)); } void WeakReference::DecRef(const FunctionCallbackInfo& args) { @@ -282,6 +277,8 @@ void WeakReference::DecRef(const FunctionCallbackInfo& args) { weak_ref->reference_count_--; if (weak_ref->target_.IsEmpty()) return; if (weak_ref->reference_count_ == 0) weak_ref->target_.SetWeak(); + args.GetReturnValue().Set( + v8::Number::New(args.GetIsolate(), weak_ref->reference_count_)); } static void GuessHandleType(const FunctionCallbackInfo& args) { @@ -365,7 +362,6 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) { registry->Register(ArrayBufferViewHasBuffer); registry->Register(WeakReference::New); registry->Register(WeakReference::Get); - registry->Register(WeakReference::GetRef); registry->Register(WeakReference::IncRef); registry->Register(WeakReference::DecRef); registry->Register(GuessHandleType); @@ -457,7 +453,6 @@ void Initialize(Local target, WeakReference::kInternalFieldCount); weak_ref->Inherit(BaseObject::GetConstructorTemplate(env)); SetProtoMethod(isolate, weak_ref, "get", WeakReference::Get); - SetProtoMethod(isolate, weak_ref, "getRef", WeakReference::GetRef); SetProtoMethod(isolate, weak_ref, "incRef", WeakReference::IncRef); SetProtoMethod(isolate, weak_ref, "decRef", WeakReference::DecRef); SetConstructorFunction(context, target, "WeakReference", weak_ref); diff --git a/src/node_util.h b/src/node_util.h index fa0faa618a61bc..715686856db879 100644 --- a/src/node_util.h +++ b/src/node_util.h @@ -21,7 +21,6 @@ class WeakReference : public SnapshotableObject { v8::Local target); static void New(const v8::FunctionCallbackInfo& args); static void Get(const v8::FunctionCallbackInfo& args); - static void GetRef(const v8::FunctionCallbackInfo& args); static void IncRef(const v8::FunctionCallbackInfo& args); static void DecRef(const v8::FunctionCallbackInfo& args); diff --git a/test/parallel/test-diagnostics-channel-bind-store.js b/test/parallel/test-diagnostics-channel-bind-store.js new file mode 100644 index 00000000000000..81fb299c2f637c --- /dev/null +++ b/test/parallel/test-diagnostics-channel-bind-store.js @@ -0,0 +1,108 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const dc = require('diagnostics_channel'); +const { AsyncLocalStorage } = require('async_hooks'); + +let n = 0; +const thisArg = new Date(); +const inputs = [ + { foo: 'bar' }, + { baz: 'buz' }, +]; + +const channel = dc.channel('test'); + +// Bind a storage directly to published data +const store1 = new AsyncLocalStorage(); +channel.bindStore(store1); +let store1bound = true; + +// Bind a store with transformation of published data +const store2 = new AsyncLocalStorage(); +channel.bindStore(store2, common.mustCall((data) => { + assert.strictEqual(data, inputs[n]); + return { data }; +}, 4)); + +// Regular subscribers should see publishes from runStores calls +channel.subscribe(common.mustCall((data) => { + if (store1bound) { + assert.deepStrictEqual(data, store1.getStore()); + } + assert.deepStrictEqual({ data }, store2.getStore()); + assert.strictEqual(data, inputs[n]); +}, 4)); + +// Verify stores are empty before run +assert.strictEqual(store1.getStore(), undefined); +assert.strictEqual(store2.getStore(), undefined); + +channel.runStores(inputs[n], common.mustCall(function(a, b) { + // Verify this and argument forwarding + assert.strictEqual(this, thisArg); + assert.strictEqual(a, 1); + assert.strictEqual(b, 2); + + // Verify store 1 state matches input + assert.strictEqual(store1.getStore(), inputs[n]); + + // Verify store 2 state has expected transformation + assert.deepStrictEqual(store2.getStore(), { data: inputs[n] }); + + // Should support nested contexts + n++; + channel.runStores(inputs[n], common.mustCall(function() { + // Verify this and argument forwarding + assert.strictEqual(this, undefined); + + // Verify store 1 state matches input + assert.strictEqual(store1.getStore(), inputs[n]); + + // Verify store 2 state has expected transformation + assert.deepStrictEqual(store2.getStore(), { data: inputs[n] }); + })); + n--; + + // Verify store 1 state matches input + assert.strictEqual(store1.getStore(), inputs[n]); + + // Verify store 2 state has expected transformation + assert.deepStrictEqual(store2.getStore(), { data: inputs[n] }); +}), thisArg, 1, 2); + +// Verify stores are empty after run +assert.strictEqual(store1.getStore(), undefined); +assert.strictEqual(store2.getStore(), undefined); + +// Verify unbinding works +assert.ok(channel.unbindStore(store1)); +store1bound = false; + +// Verify unbinding a store that is not bound returns false +assert.ok(!channel.unbindStore(store1)); + +n++; +channel.runStores(inputs[n], common.mustCall(() => { + // Verify after unbinding store 1 will remain undefined + assert.strictEqual(store1.getStore(), undefined); + + // Verify still bound store 2 receives expected data + assert.deepStrictEqual(store2.getStore(), { data: inputs[n] }); +})); + +// Contain transformer errors and emit on next tick +const fail = new Error('fail'); +channel.bindStore(store1, () => { + throw fail; +}); + +let calledRunStores = false; +process.once('uncaughtException', common.mustCall((err) => { + assert.strictEqual(calledRunStores, true); + assert.strictEqual(err, fail); +})); + +channel.runStores(inputs[n], common.mustCall()); +calledRunStores = true; diff --git a/test/parallel/test-diagnostics-channel-tracing-channel-async-error.js b/test/parallel/test-diagnostics-channel-tracing-channel-async-error.js new file mode 100644 index 00000000000000..7335e15de9dfb5 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-tracing-channel-async-error.js @@ -0,0 +1,46 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const channel = dc.tracingChannel('test'); + +const expectedError = new Error('test'); +const input = { foo: 'bar' }; +const thisArg = { baz: 'buz' }; + +function check(found) { + assert.deepStrictEqual(found, input); +} + +const handlers = { + start: common.mustCall(check, 2), + end: common.mustCall(check, 2), + asyncStart: common.mustCall(check, 2), + asyncEnd: common.mustCall(check, 2), + error: common.mustCall((found) => { + check(found); + assert.deepStrictEqual(found.error, expectedError); + }, 2) +}; + +channel.subscribe(handlers); + +channel.traceCallback(function(cb, err) { + assert.deepStrictEqual(this, thisArg); + setImmediate(cb, err); +}, 0, input, thisArg, common.mustCall((err, res) => { + assert.strictEqual(err, expectedError); + assert.strictEqual(res, undefined); +}), expectedError); + +channel.tracePromise(function(value) { + assert.deepStrictEqual(this, thisArg); + return Promise.reject(value); +}, input, thisArg, expectedError).then( + common.mustNotCall(), + common.mustCall((value) => { + assert.deepStrictEqual(value, expectedError); + }) +); diff --git a/test/parallel/test-diagnostics-channel-tracing-channel-async.js b/test/parallel/test-diagnostics-channel-tracing-channel-async.js new file mode 100644 index 00000000000000..03bdc85fb14e70 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-tracing-channel-async.js @@ -0,0 +1,60 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const channel = dc.tracingChannel('test'); + +const expectedResult = { foo: 'bar' }; +const input = { foo: 'bar' }; +const thisArg = { baz: 'buz' }; + +function check(found) { + assert.deepStrictEqual(found, input); +} + +const handlers = { + start: common.mustCall(check, 2), + end: common.mustCall(check, 2), + asyncStart: common.mustCall((found) => { + check(found); + assert.strictEqual(found.error, undefined); + assert.deepStrictEqual(found.result, expectedResult); + }, 2), + asyncEnd: common.mustCall((found) => { + check(found); + assert.strictEqual(found.error, undefined); + assert.deepStrictEqual(found.result, expectedResult); + }, 2), + error: common.mustNotCall() +}; + +channel.subscribe(handlers); + +channel.traceCallback(function(cb, err, res) { + assert.deepStrictEqual(this, thisArg); + setImmediate(cb, err, res); +}, 0, input, thisArg, common.mustCall((err, res) => { + assert.strictEqual(err, null); + assert.deepStrictEqual(res, expectedResult); +}), null, expectedResult); + +channel.tracePromise(function(value) { + assert.deepStrictEqual(this, thisArg); + return Promise.resolve(value); +}, input, thisArg, expectedResult).then( + common.mustCall((value) => { + assert.deepStrictEqual(value, expectedResult); + }), + common.mustNotCall() +); + +let failed = false; +try { + channel.traceCallback(common.mustNotCall(), 0, input, thisArg, 1, 2, 3); +} catch (err) { + assert.ok(/"callback" argument must be of type function/.test(err.message)); + failed = true; +} +assert.strictEqual(failed, true); diff --git a/test/parallel/test-diagnostics-channel-tracing-channel-callback-run-stores.js b/test/parallel/test-diagnostics-channel-tracing-channel-callback-run-stores.js new file mode 100644 index 00000000000000..874433efd2cb4a --- /dev/null +++ b/test/parallel/test-diagnostics-channel-tracing-channel-callback-run-stores.js @@ -0,0 +1,29 @@ +'use strict'; + +const common = require('../common'); +const { AsyncLocalStorage } = require('async_hooks'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const channel = dc.tracingChannel('test'); +const store = new AsyncLocalStorage(); + +const firstContext = { foo: 'bar' }; +const secondContext = { baz: 'buz' }; + +channel.start.bindStore(store, common.mustCall(() => { + return firstContext; +})); + +channel.asyncStart.bindStore(store, common.mustCall(() => { + return secondContext; +})); + +assert.strictEqual(store.getStore(), undefined); +channel.traceCallback(common.mustCall((cb) => { + assert.deepStrictEqual(store.getStore(), firstContext); + setImmediate(cb); +}), 0, {}, null, common.mustCall(() => { + assert.deepStrictEqual(store.getStore(), secondContext); +})); +assert.strictEqual(store.getStore(), undefined); diff --git a/test/parallel/test-diagnostics-channel-tracing-channel-promise-run-stores.js b/test/parallel/test-diagnostics-channel-tracing-channel-promise-run-stores.js new file mode 100644 index 00000000000000..bd88e5553359bd --- /dev/null +++ b/test/parallel/test-diagnostics-channel-tracing-channel-promise-run-stores.js @@ -0,0 +1,24 @@ +'use strict'; + +const common = require('../common'); +const { setTimeout } = require('node:timers/promises'); +const { AsyncLocalStorage } = require('async_hooks'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const channel = dc.tracingChannel('test'); +const store = new AsyncLocalStorage(); + +const context = { foo: 'bar' }; + +channel.start.bindStore(store, common.mustCall(() => { + return context; +})); + +assert.strictEqual(store.getStore(), undefined); +channel.tracePromise(common.mustCall(async () => { + assert.deepStrictEqual(store.getStore(), context); + await setTimeout(1); + assert.deepStrictEqual(store.getStore(), context); +})); +assert.strictEqual(store.getStore(), undefined); diff --git a/test/parallel/test-diagnostics-channel-tracing-channel-run-stores.js b/test/parallel/test-diagnostics-channel-tracing-channel-run-stores.js new file mode 100644 index 00000000000000..3ffe5e6720c0e4 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-tracing-channel-run-stores.js @@ -0,0 +1,21 @@ +'use strict'; + +const common = require('../common'); +const { AsyncLocalStorage } = require('async_hooks'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const channel = dc.tracingChannel('test'); +const store = new AsyncLocalStorage(); + +const context = { foo: 'bar' }; + +channel.start.bindStore(store, common.mustCall(() => { + return context; +})); + +assert.strictEqual(store.getStore(), undefined); +channel.traceSync(common.mustCall(() => { + assert.deepStrictEqual(store.getStore(), context); +})); +assert.strictEqual(store.getStore(), undefined); diff --git a/test/parallel/test-diagnostics-channel-tracing-channel-sync-error.js b/test/parallel/test-diagnostics-channel-tracing-channel-sync-error.js new file mode 100644 index 00000000000000..0965bf3fb495f0 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-tracing-channel-sync-error.js @@ -0,0 +1,39 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const channel = dc.tracingChannel('test'); + +const expectedError = new Error('test'); +const input = { foo: 'bar' }; +const thisArg = { baz: 'buz' }; + +function check(found) { + assert.deepStrictEqual(found, input); +} + +const handlers = { + start: common.mustCall(check), + end: common.mustCall(check), + asyncStart: common.mustNotCall(), + asyncEnd: common.mustNotCall(), + error: common.mustCall((found) => { + check(found); + assert.deepStrictEqual(found.error, expectedError); + }) +}; + +channel.subscribe(handlers); +try { + channel.traceSync(function(err) { + assert.deepStrictEqual(this, thisArg); + assert.strictEqual(err, expectedError); + throw err; + }, input, thisArg, expectedError); + + throw new Error('It should not reach this error'); +} catch (error) { + assert.deepStrictEqual(error, expectedError); +} diff --git a/test/parallel/test-diagnostics-channel-tracing-channel-sync.js b/test/parallel/test-diagnostics-channel-tracing-channel-sync.js new file mode 100644 index 00000000000000..b28b47256b755d --- /dev/null +++ b/test/parallel/test-diagnostics-channel-tracing-channel-sync.js @@ -0,0 +1,46 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const channel = dc.tracingChannel('test'); + +const expectedResult = { foo: 'bar' }; +const input = { foo: 'bar' }; +const thisArg = { baz: 'buz' }; +const arg = { baz: 'buz' }; + +function check(found) { + assert.strictEqual(found, input); +} + +const handlers = { + start: common.mustCall(check), + end: common.mustCall((found) => { + check(found); + assert.strictEqual(found.result, expectedResult); + }), + asyncStart: common.mustNotCall(), + asyncEnd: common.mustNotCall(), + error: common.mustNotCall() +}; + +assert.strictEqual(channel.start.hasSubscribers, false); +channel.subscribe(handlers); +assert.strictEqual(channel.start.hasSubscribers, true); +const result1 = channel.traceSync(function(arg1) { + assert.strictEqual(arg1, arg); + assert.strictEqual(this, thisArg); + return expectedResult; +}, input, thisArg, arg); +assert.strictEqual(result1, expectedResult); + +channel.unsubscribe(handlers); +assert.strictEqual(channel.start.hasSubscribers, false); +const result2 = channel.traceSync(function(arg1) { + assert.strictEqual(arg1, arg); + assert.strictEqual(this, thisArg); + return expectedResult; +}, input, thisArg, arg); +assert.strictEqual(result2, expectedResult); diff --git a/tools/doc/type-parser.mjs b/tools/doc/type-parser.mjs index 9aa9d67c7e958e..db31f6ed729b69 100644 --- a/tools/doc/type-parser.mjs +++ b/tools/doc/type-parser.mjs @@ -57,6 +57,8 @@ const customTypesMap = { 'Module Namespace Object': 'https://tc39.github.io/ecma262/#sec-module-namespace-exotic-objects', + 'AsyncLocalStorage': 'async_context.html#class-asynclocalstorage', + 'AsyncHook': 'async_hooks.html#async_hookscreatehookcallbacks', 'AsyncResource': 'async_hooks.html#class-asyncresource', @@ -108,6 +110,7 @@ const customTypesMap = { 'dgram.Socket': 'dgram.html#class-dgramsocket', 'Channel': 'diagnostics_channel.html#class-channel', + 'TracingChannel': 'diagnostics_channel.html#class-tracingchannel', 'Domain': 'domain.html#class-domain',