From 526554329e3ef5e3bb6ea95c5ea14f9fc2518a95 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 12 Jul 2024 04:34:54 +0300 Subject: [PATCH] Trace and copy Promisify* from @google-cloud/promisify --- samples/observability-traces.js | 29 +++++-- src/backup.ts | 2 +- src/batch-transaction.ts | 3 +- src/database.ts | 22 ++++- src/index.ts | 3 +- src/instance-config.ts | 2 +- src/instance.ts | 3 +- src/session.ts | 3 +- src/table.ts | 3 +- src/transaction-runner.ts | 2 +- src/transaction.ts | 3 +- src/v1/instrument.ts | 144 +++++++++++++++++++++++++++++++- 12 files changed, 188 insertions(+), 31 deletions(-) diff --git a/samples/observability-traces.js b/samples/observability-traces.js index 12dc1bdda..11c2849cc 100644 --- a/samples/observability-traces.js +++ b/samples/observability-traces.js @@ -97,12 +97,15 @@ function exportSpans(instanceId, databaseId, projectId) { instanceId, databaseId, () => { - span.end(); - console.log('main span.end'); - setTimeout(() => { - exporter.forceFlush(); - console.log('finished delete and creation of the database'); - }, 18000); + insertUsingDml(tracer, database, () => { + console.log('main span.end'); + span.end(); + setTimeout(() => { + spanner.close(); + exporter.forceFlush(); + console.log('finished delete and creation of the database'); + }, 8000); + }); } ); }); @@ -265,7 +268,7 @@ function runMutations(tracer, database) { }); } -function insertUsingDml(tracer, database) { +function insertUsingDml(tracer, database, callback) { tracer.startActiveSpan('insertUsingDML', span => { database.runTransaction(async (err, transaction) => { if (err) { @@ -293,12 +296,20 @@ function insertUsingDml(tracer, database) { ); await transaction.commit(); - span.end(); } catch (err) { console.error('ERROR:', err); } finally { // Close the database when finished. - setTimeout(() => {}, 8000); + console.log('exiting insertUsingDml'); + tracer.startActiveSpan('timingOutToExport-insertUsingDML', eSpan => { + setTimeout(() => { + eSpan.end(); + span.end(); + if (callback) { + callback(); + } + }, 50); + }); } }); }); diff --git a/src/backup.ts b/src/backup.ts index ba4753bb8..9acf81084 100644 --- a/src/backup.ts +++ b/src/backup.ts @@ -13,7 +13,6 @@ * limitations under the License. */ -import {promisifyAll, callbackifyAll} from '@google-cloud/promisify'; import {Instance} from './instance'; import { IOperation, @@ -33,6 +32,7 @@ import { import {DateStruct, PreciseDate} from '@google-cloud/precise-date'; import {google as databaseAdmin} from '../protos/protos'; import {common as p} from 'protobufjs'; +import {promisifyAll, callbackifyAll} from './v1/instrument'; export type CreateBackupCallback = LongRunningCallback; export type CopyBackupCallback = LongRunningCallback; diff --git a/src/batch-transaction.ts b/src/batch-transaction.ts index aab800bd4..9e0db1f76 100644 --- a/src/batch-transaction.ts +++ b/src/batch-transaction.ts @@ -15,7 +15,6 @@ */ import {PreciseDate} from '@google-cloud/precise-date'; -import {promisifyAll} from '@google-cloud/promisify'; import * as extend from 'extend'; import * as is from 'is'; import {Snapshot} from './transaction'; @@ -25,7 +24,7 @@ import { CLOUD_RESOURCE_HEADER, addLeaderAwareRoutingHeader, } from '../src/common'; -import {tracer, SPAN_CODE_ERROR} from './v1/instrument'; +import {promisifyAll, tracer, SPAN_CODE_ERROR} from './v1/instrument'; export interface TransactionIdentifier { session: string | Session; diff --git a/src/database.ts b/src/database.ts index d6f0ce7b0..2ccb72012 100644 --- a/src/database.ts +++ b/src/database.ts @@ -23,7 +23,6 @@ import { } from '@google-cloud/common'; // eslint-disable-next-line @typescript-eslint/no-var-requires const common = require('./common-grpc/service-object'); -import {promisify, promisifyAll} from '@google-cloud/promisify'; import * as extend from 'extend'; import * as r from 'teeny-request'; import * as streamEvents from 'stream-events'; @@ -99,7 +98,13 @@ import Policy = google.iam.v1.Policy; import FieldMask = google.protobuf.FieldMask; import IDatabase = google.spanner.admin.database.v1.IDatabase; import snakeCase = require('lodash.snakecase'); -import {tracer, SPAN_CODE_ERROR, callbackifyAll} from './v1/instrument'; +import { + tracer, + SPAN_CODE_ERROR, + callbackifyAll, + promisify, + promisifyAll, +} from './v1/instrument'; export type GetDatabaseRolesCallback = RequestCallback< IDatabaseRole, @@ -3084,6 +3089,7 @@ class Database extends common.GrpcServiceObject { optionsOrRunFn: RunTransactionOptions | RunTransactionCallback, fn?: RunTransactionCallback ): void { + console.log('database.runTransaction', fn); tracer.startActiveSpan( 'cloud.google.com/nodejs/spanner/Database.runTransaction', span => { @@ -3097,6 +3103,7 @@ class Database extends common.GrpcServiceObject { : {}; this.pool_.getSession((err, session?, transaction?) => { + console.log('getSession', err); if (err) { span.setStatus({ code: SPAN_CODE_ERROR, @@ -3131,12 +3138,19 @@ class Database extends common.GrpcServiceObject { ); runner.run().then(release, err => { + console.log('runner.result', err); + if (err) { + span.setStatus({ + code: SPAN_CODE_ERROR, + message: err.toString(), + }); + } + span.end(); + if (isSessionNotFoundError(err)) { - span.end(); release(); this.runTransaction(options, runFn!); } else { - span.end(); setImmediate(runFn!, err); release(); } diff --git a/src/index.ts b/src/index.ts index fb92293e4..ea7c9f6b1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -17,7 +17,6 @@ import {GrpcService, GrpcServiceConfig} from './common-grpc/service'; import {PreciseDate} from '@google-cloud/precise-date'; import {replaceProjectIdToken} from '@google-cloud/projectify'; -import {promisifyAll} from '@google-cloud/promisify'; import * as extend from 'extend'; import {GoogleAuth, GoogleAuthOptions} from 'google-auth-library'; import * as path from 'path'; @@ -79,7 +78,7 @@ import { import grpcGcpModule = require('grpc-gcp'); const grpcGcp = grpcGcpModule(grpc); import * as v1 from './v1'; -import {tracer, SPAN_CODE_ERROR} from './v1/instrument'; +import {promisifyAll, tracer, SPAN_CODE_ERROR} from './v1/instrument'; // eslint-disable-next-line @typescript-eslint/no-var-requires const gcpApiConfig = require('./spanner_grpc_config.json'); diff --git a/src/instance-config.ts b/src/instance-config.ts index bc069b5d7..114bde18a 100644 --- a/src/instance-config.ts +++ b/src/instance-config.ts @@ -34,9 +34,9 @@ import { RequestConfig, Spanner, } from './index'; -import {promisifyAll} from '@google-cloud/promisify'; import {CallOptions, grpc} from 'google-gax'; import extend = require('extend'); +import {promisifyAll} from './v1/instrument'; export type IOperation = instanceAdmin.longrunning.IOperation; diff --git a/src/instance.ts b/src/instance.ts index e6c575320..c9e6d4db3 100644 --- a/src/instance.ts +++ b/src/instance.ts @@ -18,7 +18,6 @@ import arrify = require('arrify'); import {ServiceObjectConfig, GetConfig} from '@google-cloud/common'; // eslint-disable-next-line @typescript-eslint/no-var-requires const common = require('./common-grpc/service-object'); -import {promisifyAll} from '@google-cloud/promisify'; import * as extend from 'extend'; import snakeCase = require('lodash.snakecase'); import {Database, SessionPoolConstructor} from './database'; @@ -51,7 +50,7 @@ import {google as instanceAdmin} from '../protos/protos'; import {google as databaseAdmin} from '../protos/protos'; import {google as spannerClient} from '../protos/protos'; import {CreateInstanceRequest} from './index'; -import {tracer, SPAN_CODE_ERROR} from './v1/instrument'; +import {promisifyAll, tracer, SPAN_CODE_ERROR} from './v1/instrument'; export type IBackup = databaseAdmin.spanner.admin.database.v1.IBackup; export type IDatabase = databaseAdmin.spanner.admin.database.v1.IDatabase; diff --git a/src/session.ts b/src/session.ts index c8550a796..70886ed7f 100644 --- a/src/session.ts +++ b/src/session.ts @@ -20,7 +20,6 @@ // eslint-disable-next-line @typescript-eslint/no-var-requires const common = require('./common-grpc/service-object'); -import {promisifyAll} from '@google-cloud/promisify'; import * as extend from 'extend'; import * as r from 'teeny-request'; import { @@ -44,7 +43,7 @@ import { import {grpc, CallOptions} from 'google-gax'; import IRequestOptions = google.spanner.v1.IRequestOptions; import {Spanner} from '.'; -import {tracer, SPAN_CODE_ERROR} from './v1/instrument'; +import {promisifyAll, tracer, SPAN_CODE_ERROR} from './v1/instrument'; export type GetSessionResponse = [Session, r.Response]; diff --git a/src/table.ts b/src/table.ts index d057387bc..b3513f1ae 100644 --- a/src/table.ts +++ b/src/table.ts @@ -14,7 +14,6 @@ * limitations under the License. */ -import {promisifyAll} from '@google-cloud/promisify'; import * as through from 'through2'; import {Operation as GaxOperation, CallOptions} from 'google-gax'; import {Database, UpdateSchemaCallback, UpdateSchemaResponse} from './database'; @@ -31,7 +30,7 @@ import { import {google as databaseAdmin} from '../protos/protos'; import {Schema, LongRunningCallback} from './common'; import IRequestOptions = databaseAdmin.spanner.v1.IRequestOptions; -import {tracer, SPAN_CODE_ERROR} from './v1/instrument'; +import {promisifyAll, tracer, SPAN_CODE_ERROR} from './v1/instrument'; export type Key = string | string[]; diff --git a/src/transaction-runner.ts b/src/transaction-runner.ts index 61d979e8c..fe092681d 100644 --- a/src/transaction-runner.ts +++ b/src/transaction-runner.ts @@ -14,7 +14,6 @@ * limitations under the License. */ -import {promisify} from '@google-cloud/promisify'; import {grpc} from 'google-gax'; import {Root} from 'protobufjs'; import * as through from 'through2'; @@ -26,6 +25,7 @@ import {isSessionNotFoundError} from './session-pool'; import {Database} from './database'; import {google} from '../protos/protos'; import IRequestOptions = google.spanner.v1.IRequestOptions; +import {promisify} from './v1/instrument'; // eslint-disable-next-line @typescript-eslint/no-var-requires const jsonProtos = require('../protos/protos.json'); diff --git a/src/transaction.ts b/src/transaction.ts index d5df06fd4..0d0b304c8 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -15,7 +15,6 @@ */ import {DateStruct, PreciseDate} from '@google-cloud/precise-date'; -import {promisifyAll} from '@google-cloud/promisify'; import arrify = require('arrify'); import Long = require('long'); import {EventEmitter} from 'events'; @@ -45,7 +44,7 @@ import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions; import IRequestOptions = google.spanner.v1.IRequestOptions; import {Database, Spanner} from '.'; import ReadLockMode = google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode; -import {tracer, SPAN_CODE_ERROR} from './v1/instrument'; +import {promisifyAll, tracer, SPAN_CODE_ERROR} from './v1/instrument'; export type Rows = Array; const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo'; diff --git a/src/v1/instrument.ts b/src/v1/instrument.ts index 5c94e5cce..45c4b5265 100644 --- a/src/v1/instrument.ts +++ b/src/v1/instrument.ts @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -import {SpanStatusCode} from '@opentelemetry/api'; +import {Span, SpanStatusCode} from '@opentelemetry/api'; // Ensure that we've registered the gRPC instrumentation. const {GrpcInstrumentation} = require('@opentelemetry/instrumentation-grpc'); @@ -24,6 +24,10 @@ const { CallbackMethod, Class, CallbackifyAllOptions, + PromiseMethod, + PromisifyAllOptions, + PromisifyOptions, + WithPromise, } = require('@google-cloud/promisify'); import {trace} from '@opentelemetry/api'; @@ -66,6 +70,9 @@ export function startTraceExport(exporter) { * @param {object=} options - Callback options. * @param {boolean} options.singular - Pass to the callback a single arg instead of an array. * @return {function} wrapped + * + * This code although modified for OpenTelemetry instrumentation, is copied from + * /~https://github.com/googleapis/nodejs-promisify/blob/main/src/index.ts */ function callbackify(originalMethod: typeof CallbackMethod) { if (originalMethod.callbackified_) { @@ -81,11 +88,12 @@ function callbackify(originalMethod: typeof CallbackMethod) { const cb = Array.prototype.pop.call(arguments); console.log('cb.name', cb.name); + const that = this; tracer.startActiveSpan( - 'cloud.google.com/nodejs/Spanner' + cb.name, + 'cloud.google.com/nodejs/Spanner.' + cb.name + '.callbackify', span => { - originalMethod.apply(this, arguments).then( + originalMethod.apply(that, arguments).then( // tslint:disable-next-line:no-any (res: any) => { res = Array.isArray(res) ? res : [res]; @@ -114,6 +122,9 @@ function callbackify(originalMethod: typeof CallbackMethod) { * * @param {module:common/service} Class - Service class. * @param {object=} options - Configuration object. + * + * This code although modified for OpenTelemetry instrumentation, is copied from + * /~https://github.com/googleapis/nodejs-promisify/blob/main/src/index.ts */ export function callbackifyAll( // tslint:disable-next-line:variable-name @@ -139,3 +150,130 @@ export function callbackifyAll( } }); } + +/** + * Wraps a callback style function to conditionally return a promise. + * + * @param {function} originalMethod - The method to promisify. + * @param {object=} options - Promise options. + * @param {boolean} options.singular - Resolve the promise with single arg instead of an array. + * @return {function} wrapped + * + * This code although modified for OpenTelemetry instrumentation, is copied from + * /~https://github.com/googleapis/nodejs-promisify/blob/main/src/index.ts + */ +export function promisify( + originalMethod: typeof PromiseMethod, + options?: typeof PromisifyOptions +) { + if (originalMethod.promisified_) { + return originalMethod; + } + + options = options || {}; + + const slice = Array.prototype.slice; + + const wrapper: any = function (this: typeof WithPromise) { + const that = this; + + return tracer.startActiveSpan( + 'cloud.google.com/nodejs/Spanner.' + originalMethod.name + '.promisify', + span => { + // tslint:disable-next-line:no-any + let last; + + for (last = arguments.length - 1; last >= 0; last--) { + const arg = arguments[last]; + + if (typeof arg === 'undefined') { + continue; // skip trailing undefined. + } + + if (typeof arg !== 'function') { + break; // non-callback last argument found. + } + + return originalMethod.apply(that, arguments); + } + + // peel trailing undefined. + const args = slice.call(arguments, 0, last + 1); + + // tslint:disable-next-line:variable-name + let PromiseCtor = Promise; + + // Because dedupe will likely create a single install of + // @google-cloud/common to be shared amongst all modules, we need to + // localize it at the Service level. + if (that && that.Promise) { + PromiseCtor = that.Promise; + } + + return new PromiseCtor((resolve, reject) => { + // tslint:disable-next-line:no-any + args.push((...args: any[]) => { + const callbackArgs = slice.call(args); + const err = callbackArgs.shift(); + + if (err) { + span.setStatus({ + code: SPAN_CODE_ERROR, + message: err.toString(), + }); + span.end(); + return reject(err); + } + + span.end(); + if (options!.singular && callbackArgs.length === 1) { + resolve(callbackArgs[0]); + } else { + resolve(callbackArgs); + } + }); + + originalMethod.apply(that, args); + }); + } + ); + }; + + wrapper.promisified_ = true; + return wrapper; +} + +/** + * Promisifies certain Class methods. This will not promisify private or + * streaming methods. + * + * @param {module:common/service} Class - Service class. + * @param {object=} options - Configuration object. + * + * This code although modified for OpenTelemetry instrumentation, is copied from + * /~https://github.com/googleapis/nodejs-promisify/blob/main/src/index.ts + */ +// tslint:disable-next-line:variable-name +export function promisifyAll( + Class: Function, + options?: typeof PromisifyAllOptions +) { + const exclude = (options && options.exclude) || []; + const ownPropertyNames = Object.getOwnPropertyNames(Class.prototype); + const methods = ownPropertyNames.filter(methodName => { + // clang-format off + return ( + !exclude.includes(methodName) && + typeof Class.prototype[methodName] === 'function' && // is it a function? + !/(^_|(Stream|_)|promise$)|^constructor$/.test(methodName) // is it promisable? + ); + // clang-format on + }); + + methods.forEach(methodName => { + const originalMethod = Class.prototype[methodName]; + if (!originalMethod.promisified_) { + Class.prototype[methodName] = exports.promisify(originalMethod, options); + } + }); +}