diff --git a/observability-test/database.ts b/observability-test/database.ts new file mode 100644 index 000000000..bf1536fa6 --- /dev/null +++ b/observability-test/database.ts @@ -0,0 +1,315 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import {grpc} from 'google-gax'; +import {google} from '../protos/protos'; +import {Database, Instance, SessionPool, Snapshot, Spanner} from '../src'; +import protobuf = google.spanner.v1; +import * as mock from '../test/mockserver/mockspanner'; +import * as mockInstanceAdmin from '../test/mockserver/mockinstanceadmin'; +import * as mockDatabaseAdmin from '../test/mockserver/mockdatabaseadmin'; +const { + AlwaysOnSampler, + NodeTracerProvider, + InMemorySpanExporter, +} = require('@opentelemetry/sdk-trace-node'); +// eslint-disable-next-line n/no-extraneous-require +const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); + +/** A simple result set for SELECT 1. */ +function createSelect1ResultSet(): protobuf.ResultSet { + const fields = [ + protobuf.StructType.Field.create({ + name: 'NUM', + type: protobuf.Type.create({code: protobuf.TypeCode.INT64}), + }), + ]; + const metadata = new protobuf.ResultSetMetadata({ + rowType: new protobuf.StructType({ + fields, + }), + }); + return protobuf.ResultSet.create({ + metadata, + rows: [{values: [{stringValue: '1'}]}], + }); +} + +interface setupResults { + server: grpc.Server; + spanner: Spanner; + spannerMock: mock.MockSpanner; +} + +async function setup(): Promise { + const server = new grpc.Server(); + + const spannerMock = mock.createMockSpanner(server); + mockInstanceAdmin.createMockInstanceAdmin(server); + mockDatabaseAdmin.createMockDatabaseAdmin(server); + + const port: number = await new Promise((resolve, reject) => { + server.bindAsync( + '0.0.0.0:0', + grpc.ServerCredentials.createInsecure(), + (err, assignedPort) => { + if (err) { + reject(err); + } else { + resolve(assignedPort); + } + } + ); + }); + + const selectSql = 'SELECT 1'; + const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; + spannerMock.putStatementResult( + selectSql, + mock.StatementResult.resultSet(createSelect1ResultSet()) + ); + spannerMock.putStatementResult( + updateSql, + mock.StatementResult.updateCount(1) + ); + + const spanner = new Spanner({ + projectId: 'observability-project-id', + servicePath: 'localhost', + port, + sslCreds: grpc.credentials.createInsecure(), + }); + + return Promise.resolve({ + spanner: spanner, + server: server, + spannerMock: spannerMock, + }); +} + +describe('Database', () => { + let server: grpc.Server; + let spanner: Spanner; + let database: Database; + let spannerMock: mock.MockSpanner; + let traceExporter: typeof InMemorySpanExporter; + + after(() => { + spanner.close(); + server.tryShutdown(() => {}); + }); + + before(async () => { + const setupResult = await setup(); + spanner = setupResult.spanner; + server = setupResult.server; + spannerMock = setupResult.spannerMock; + + const selectSql = 'SELECT 1'; + const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; + spannerMock.putStatementResult( + selectSql, + mock.StatementResult.resultSet(createSelect1ResultSet()) + ); + spannerMock.putStatementResult( + updateSql, + mock.StatementResult.updateCount(1) + ); + + traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + + const provider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + const instance = spanner.instance('instance'); + database = instance.database('database'); + database.observabilityConfig = { + tracerProvider: provider, + enableExtendedTracing: false, + }; + }); + + beforeEach(() => { + spannerMock.resetRequests(); + }); + + afterEach(() => { + traceExporter.reset(); + }); + + it('getSessions', async () => { + const [rows] = await database.getSessions(); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = ['CloudSpanner.Database.getSessions']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + }); + it('getSnapshot', done => { + database.getSnapshot((err, transaction) => { + assert.ifError(err); + + transaction!.run('SELECT 1', (err, rows) => { + assert.ifError(err); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = ['CloudSpanner.Database.getSnapshot']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + done(); + }); + }); + }); + + it('runStream', done => { + database + .runStream('SELECT 1') + .on('data', row => {}) + .on('error', assert.ifError) + .on('end', () => { + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = ['CloudSpanner.Database.runStream']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + done(); + }); + }); + + it('run', async () => { + const [rows] = await database.run('SELECT 1'); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); + + // Sort the spans by duration. + spans.sort((spanA, spanB) => { + spanA.duration < spanB.duration; + }); + + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.runStream', + 'CloudSpanner.Database.run', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that RunStream is a child span of createQueryPartitions. + const spanRunStream = spans[0]; + const spanRun = spans[1]; + assert.ok( + spanRun.spanContext().traceId, + 'Expected that createQueryPartitions has a defined traceId' + ); + assert.ok( + spanRunStream.spanContext().traceId, + 'Expected that RunStream has a defined traceId' + ); + assert.deepStrictEqual( + spanRunStream.spanContext().traceId, + spanRun.spanContext().traceId, + 'Expected that both spans share a traceId' + ); + assert.ok( + spanRun.spanContext().spanId, + 'Expected that createQueryPartitions has a defined spanId' + ); + assert.ok( + spanRunStream.spanContext().spanId, + 'Expected that RunStream has a defined spanId' + ); + assert.deepStrictEqual( + spanRunStream.parentSpanId, + spanRun.spanContext().spanId, + 'Expected that run is the parent to runStream' + ); + }); + + it('runTransaction', done => { + database.runTransaction((err, transaction) => { + assert.ifError(err); + transaction!.run('SELECT 1', (err, rows) => { + assert.ifError(err); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = ['CloudSpanner.Database.runTransaction']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + done(); + }); + }); + }); +}); diff --git a/src/database.ts b/src/database.ts index 70a650675..4b0df4165 100644 --- a/src/database.ts +++ b/src/database.ts @@ -92,7 +92,7 @@ import { Schema, addLeaderAwareRoutingHeader, } from './common'; -import {Duplex, Readable, Transform} from 'stream'; +import {finished, Duplex, Readable, Transform} from 'stream'; import {PreciseDate} from '@google-cloud/precise-date'; import {EnumKey, RequestConfig, TranslateEnumKeys, Spanner} from '.'; import arrify = require('arrify'); @@ -102,7 +102,13 @@ import Policy = google.iam.v1.Policy; import FieldMask = google.protobuf.FieldMask; import IDatabase = google.spanner.admin.database.v1.IDatabase; import snakeCase = require('lodash.snakecase'); -import {getActiveOrNoopSpan} from './instrument'; +import { + ObservabilityOptions, + getActiveOrNoopSpan, + startTrace, + setSpanError, + setSpanErrorAndException, +} from './instrument'; export type GetDatabaseRolesCallback = RequestCallback< IDatabaseRole, @@ -337,6 +343,7 @@ class Database extends common.GrpcServiceObject { databaseDialect?: EnumKey< typeof databaseAdmin.spanner.admin.database.v1.DatabaseDialect > | null; + observabilityConfig: ObservabilityOptions | undefined; constructor( instance: Instance, name: string, @@ -459,6 +466,7 @@ class Database extends common.GrpcServiceObject { Object.assign({}, queryOptions), Database.getEnvironmentQueryOptions() ); + this.observabilityConfig = instance.observabilityConfig; } /** * @typedef {array} SetDatabaseMetadataResponse @@ -840,6 +848,7 @@ class Database extends common.GrpcServiceObject { }); }); } + /** * Create a new session. * @@ -967,6 +976,7 @@ class Database extends common.GrpcServiceObject { } ); } + /** * @typedef {array} CreateTableResponse * @property {Table} 0 The new {@link Table}. @@ -1855,32 +1865,39 @@ class Database extends common.GrpcServiceObject { delete (gaxOpts as GetSessionsOptions).pageToken; } - this.request< - google.spanner.v1.ISession, - google.spanner.v1.IListSessionsResponse - >( - { - client: 'SpannerClient', - method: 'listSessions', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }, - (err, sessions, nextPageRequest, ...args) => { - let sessionInstances: Session[] | null = null; - if (sessions) { - sessionInstances = sessions.map(metadata => { - const session = self.session(metadata.name!); - session.metadata = metadata; - return session; - }); + const q = {opts: this.observabilityConfig}; + return startTrace('Database.getSessions', q, span => { + this.request< + google.spanner.v1.ISession, + google.spanner.v1.IListSessionsResponse + >( + { + client: 'SpannerClient', + method: 'listSessions', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }, + (err, sessions, nextPageRequest, ...args) => { + let sessionInstances: Session[] | null = null; + if (sessions) { + sessionInstances = sessions.map(metadata => { + const session = self.session(metadata.name!); + session.metadata = metadata; + return session; + }); + } + const nextQuery = nextPageRequest! + ? extend({}, options, nextPageRequest!) + : null; + if (err) { + setSpanError(span, err); + } + span.end(); + callback!(err, sessionInstances!, nextQuery, ...args); } - const nextQuery = nextPageRequest! - ? extend({}, options, nextPageRequest!) - : null; - callback!(err, sessionInstances!, nextQuery, ...args); - } - ); + ); + }); } /** @@ -2021,43 +2038,51 @@ class Database extends common.GrpcServiceObject { optionsOrCallback?: TimestampBounds | GetSnapshotCallback, cb?: GetSnapshotCallback ): void | Promise<[Snapshot]> { - const callback = - typeof optionsOrCallback === 'function' - ? (optionsOrCallback as GetSnapshotCallback) - : cb; - const options = - typeof optionsOrCallback === 'object' - ? (optionsOrCallback as TimestampBounds) - : {}; - - this.pool_.getSession((err, session) => { - if (err) { - callback!(err as ServiceError); - return; - } - - const snapshot = session!.snapshot(options, this.queryOptions_); - - snapshot.begin(err => { - const span = getActiveOrNoopSpan(); + const q = {opts: this.observabilityConfig}; + return startTrace('Database.getSnapshot', q, span => { + const callback = + typeof optionsOrCallback === 'function' + ? (optionsOrCallback as GetSnapshotCallback) + : cb; + const options = + typeof optionsOrCallback === 'object' + ? (optionsOrCallback as TimestampBounds) + : {}; + + this.pool_.getSession((err, session) => { if (err) { - if (isSessionNotFoundError(err)) { - span.addEvent('No session available', { - 'session.id': session?.id, - }); - session!.lastError = err; - this.pool_.release(session!); - this.getSnapshot(options, callback!); - } else { - span.addEvent('Using Session', {'session.id': session?.id}); - this.pool_.release(session!); - callback!(err); - } + setSpanError(span, err); + span.end(); + callback!(err as ServiceError); return; } - this._releaseOnEnd(session!, snapshot); - callback!(err, snapshot); + const snapshot = session!.snapshot(options, this.queryOptions_); + + snapshot.begin(err => { + if (err) { + setSpanError(span, err); + if (isSessionNotFoundError(err)) { + span.addEvent('No session available', { + 'session.id': session?.id, + }); + session!.lastError = err; + this.pool_.release(session!); + this.getSnapshot(options, callback!); + span.end(); + } else { + span.addEvent('Using Session', {'session.id': session?.id}); + this.pool_.release(session!); + span.end(); + callback!(err); + } + return; + } + + this._releaseOnEnd(session!, snapshot); + span.end(); + callback!(err, snapshot); + }); }); }); } @@ -2133,12 +2158,13 @@ class Database extends common.GrpcServiceObject { if (options.excludeTxnFromChangeStreams) { transaction!.excludeTxnFromChangeStreams(); } + + const span = getActiveOrNoopSpan(); + if (!err) { - const span = getActiveOrNoopSpan(); span.addEvent('Using Session', {'session.id': session?.id}); this._releaseOnEnd(session!, transaction!); } else if (isSessionNotFoundError(err as grpc.ServiceError)) { - const span = getActiveOrNoopSpan(); span.addEvent('No session available', { 'session.id': session?.id, }); @@ -2324,6 +2350,8 @@ class Database extends common.GrpcServiceObject { delete (gaxOpts as GetDatabaseRolesOptions).pageSize; delete (gaxOpts as GetDatabaseRolesOptions).pageToken; } + + const q = {opts: this.observabilityConfig}; this.request< IDatabaseRole, databaseAdmin.spanner.admin.database.v1.ListDatabaseRolesResponse @@ -2363,12 +2391,14 @@ class Database extends common.GrpcServiceObject { callback?: PoolRequestCallback ): void | Promise { const pool = this.pool_; + const span = getActiveOrNoopSpan(); + pool.getSession((err, session) => { if (err) { callback!(err as ServiceError, null); return; } - const span = getActiveOrNoopSpan(); + span.addEvent('Using Session', {'session.id': session?.id}); config.reqOpts.session = session!.formattedName_; this.request(config, (err, ...args) => { @@ -2409,9 +2439,9 @@ class Database extends common.GrpcServiceObject { session = null; } } + const span = getActiveOrNoopSpan(); waitForSessionStream.on('reading', () => { pool.getSession((err, session_) => { - const span = getActiveOrNoopSpan(); if (err) { if (isSessionNotFoundError(err as grpc.ServiceError)) { span.addEvent('No session available', { @@ -2540,6 +2570,7 @@ class Database extends common.GrpcServiceObject { reqOpts.encryptionConfig = (options as RestoreOptions).encryptionConfig; } + const q = {opts: this.observabilityConfig}; return this.request( { client: 'DatabaseAdminClient', @@ -2727,32 +2758,39 @@ class Database extends common.GrpcServiceObject { optionsOrCallback?: TimestampBounds | RunCallback, cb?: RunCallback ): void | Promise { - let stats: ResultSetStats; - let metadata: ResultSetMetadata; - const rows: Row[] = []; - const callback = - typeof optionsOrCallback === 'function' - ? (optionsOrCallback as RunCallback) - : cb; - const options = - typeof optionsOrCallback === 'object' - ? (optionsOrCallback as TimestampBounds) - : {}; - - this.runStream(query, options) - .on('error', callback!) - .on('response', response => { - if (response.metadata) { - metadata = response.metadata; - } - }) - .on('stats', _stats => (stats = _stats)) - .on('data', row => { - rows.push(row); - }) - .on('end', () => { - callback!(null, rows, stats, metadata); - }); + const q = {sql: query, opts: this.observabilityConfig}; + return startTrace('Database.run', q, span => { + let stats: ResultSetStats; + let metadata: ResultSetMetadata; + const rows: Row[] = []; + const callback = + typeof optionsOrCallback === 'function' + ? (optionsOrCallback as RunCallback) + : cb; + const options = + typeof optionsOrCallback === 'object' + ? (optionsOrCallback as TimestampBounds) + : {}; + + this.runStream(query, options) + .on('error', err => { + setSpanError(span, err); + callback!(err as grpc.ServiceError, rows, stats, metadata); + }) + .on('response', response => { + if (response.metadata) { + metadata = response.metadata; + } + }) + .on('stats', _stats => (stats = _stats)) + .on('data', row => { + rows.push(row); + }) + .on('end', () => { + span.end(); + callback!(null, rows, stats, metadata); + }); + }); } /** * Partitioned DML transactions are used to execute DML statements with a @@ -2952,61 +2990,78 @@ class Database extends common.GrpcServiceObject { query: string | ExecuteSqlRequest, options?: TimestampBounds ): PartialResultStream { - const proxyStream: Transform = through.obj(); - const span = getActiveOrNoopSpan(); + const q = {sql: query, opts: this.observabilityConfig}; + return startTrace('Database.runStream', q, span => { + const proxyStream: Transform = through.obj(); - this.pool_.getSession((err, session) => { - if (err) { - proxyStream.destroy(err); - return; - } + this.pool_.getSession((err, session) => { + if (err) { + setSpanError(span, err); + proxyStream.destroy(err); + span.end(); + return; + } - const span = getActiveOrNoopSpan(); - span.addEvent('Using Session', {'session.id': session?.id}); + span.addEvent('Using Session', {'session.id': session?.id}); - const snapshot = session!.snapshot(options, this.queryOptions_); + const snapshot = session!.snapshot(options, this.queryOptions_); - this._releaseOnEnd(session!, snapshot); + this._releaseOnEnd(session!, snapshot); - let dataReceived = false; - let dataStream = snapshot.runStream(query); - const endListener = () => snapshot.end(); - dataStream - .once('data', () => (dataReceived = true)) - .once('error', err => { - if ( - !dataReceived && - isSessionNotFoundError(err as grpc.ServiceError) - ) { - // If it is a 'Session not found' error and we have not yet received - // any data, we can safely retry the query on a new session. - // Register the error on the session so the pool can discard it. - if (session) { - session.lastError = err as grpc.ServiceError; + let dataReceived = false; + let dataStream = snapshot.runStream(query); + + const endListener = () => { + snapshot.end(); + }; + dataStream + .once('data', () => (dataReceived = true)) + .once('error', err => { + setSpanError(span, err); + + if ( + !dataReceived && + isSessionNotFoundError(err as grpc.ServiceError) + ) { + // If it is a 'Session not found' error and we have not yet received + // any data, we can safely retry the query on a new session. + // Register the error on the session so the pool can discard it. + if (session) { + session.lastError = err as grpc.ServiceError; + } + span.addEvent('No session available', { + 'session.id': session?.id, + }); + // Remove the current data stream from the end user stream. + dataStream.unpipe(proxyStream); + dataStream.removeListener('end', endListener); + dataStream.end(); + snapshot.end(); + // Create a new data stream and add it to the end user stream. + dataStream = this.runStream(query, options); + dataStream.pipe(proxyStream); + } else { + proxyStream.destroy(err); + snapshot.end(); } - span.addEvent('No session available', { - 'session.id': session?.id, - }); - // Remove the current data stream from the end user stream. - dataStream.unpipe(proxyStream); - dataStream.removeListener('end', endListener); - dataStream.end(); - snapshot.end(); - // Create a new data stream and add it to the end user stream. - dataStream = this.runStream(query, options); - dataStream.pipe(proxyStream); - } else { - proxyStream.destroy(err); - snapshot.end(); - } - }) - .on('stats', stats => proxyStream.emit('stats', stats)) - .on('response', response => proxyStream.emit('response', response)) - .once('end', endListener) - .pipe(proxyStream); - }); + }) + .on('stats', stats => proxyStream.emit('stats', stats)) + .on('response', response => proxyStream.emit('response', response)) + .once('end', () => { + endListener(); + }) + .pipe(proxyStream); + }); + + finished(proxyStream, err => { + if (err) { + setSpanError(span, err); + } + span.end(); + }); - return proxyStream as PartialResultStream; + return proxyStream as PartialResultStream; + }); } /** @@ -3107,58 +3162,75 @@ class Database extends common.GrpcServiceObject { optionsOrRunFn: RunTransactionOptions | RunTransactionCallback, fn?: RunTransactionCallback ): void { - const runFn = - typeof optionsOrRunFn === 'function' - ? (optionsOrRunFn as RunTransactionCallback) - : fn; - const options = - typeof optionsOrRunFn === 'object' && optionsOrRunFn - ? (optionsOrRunFn as RunTransactionOptions) - : {}; - - this.pool_.getSession((err, session?, transaction?) => { - const span = getActiveOrNoopSpan(); - if (err && isSessionNotFoundError(err as grpc.ServiceError)) { - span.addEvent('No session available', { - 'session.id': session?.id, - }); - this.runTransaction(options, runFn!); - return; - } - if (err) { - runFn!(err as grpc.ServiceError); - return; - } - if (options.optimisticLock) { - transaction!.useOptimisticLock(); - } - if (options.excludeTxnFromChangeStreams) { - transaction!.excludeTxnFromChangeStreams(); - } - - const release = this.pool_.release.bind(this.pool_, session!); - const runner = new TransactionRunner( - session!, - transaction!, - runFn!, - options - ); + const q = {opts: this.observabilityConfig}; + startTrace('Database.runTransaction', q, span => { + const runFn = + typeof optionsOrRunFn === 'function' + ? (optionsOrRunFn as RunTransactionCallback) + : fn; + const options = + typeof optionsOrRunFn === 'object' && optionsOrRunFn + ? (optionsOrRunFn as RunTransactionOptions) + : {}; + + this.pool_.getSession((err, session?, transaction?) => { + if (err) { + setSpanError(span, err); + } - runner.run().then(release, err => { - const span = getActiveOrNoopSpan(); - if (isSessionNotFoundError(err)) { + if (err && isSessionNotFoundError(err as grpc.ServiceError)) { span.addEvent('No session available', { 'session.id': session?.id, }); - release(); + span.end(); this.runTransaction(options, runFn!); - } else { - if (!err) { - span.addEvent('Using Session', {'session.id': session!.id}); - } - setImmediate(runFn!, err); - release(); + return; + } + + if (err) { + span.end(); + runFn!(err as grpc.ServiceError); + return; + } + if (options.optimisticLock) { + transaction!.useOptimisticLock(); + } + if (options.excludeTxnFromChangeStreams) { + transaction!.excludeTxnFromChangeStreams(); } + + const release = () => { + span.end(); + this.pool_.release(session!); + }; + + const runner = new TransactionRunner( + session!, + transaction!, + (err, resp) => { + span.end(); + runFn!(err, resp); + }, + options + ); + + runner.run().then(release, err => { + setSpanError(span, err!); + + if (isSessionNotFoundError(err)) { + span.addEvent('No session available', { + 'session.id': session?.id, + }); + release(); + this.runTransaction(options, runFn!); + } else { + if (!err) { + span.addEvent('Using Session', {'session.id': session!.id}); + } + setImmediate(runFn!, err); + release(); + } + }); }); }); } @@ -3350,15 +3422,15 @@ class Database extends common.GrpcServiceObject { ): NodeJS.ReadableStream { const proxyStream: Transform = through.obj(); + const span = getActiveOrNoopSpan(); + this.pool_.getSession((err, session) => { if (err) { proxyStream.destroy(err); return; } - const span = getActiveOrNoopSpan(); span.addEvent('Using Session', {'session.id': session?.id}); - const gaxOpts = extend(true, {}, options?.gaxOptions); const reqOpts = Object.assign( {} as spannerClient.spanner.v1.BatchWriteRequest, @@ -3390,7 +3462,9 @@ class Database extends common.GrpcServiceObject { if (session) { session.lastError = err as grpc.ServiceError; } - span.addEvent('No session available', {'session.id': session?.id}); + span.addEvent('No session available', { + 'session.id': session?.id, + }); // Remove the current data stream from the end user stream. dataStream.unpipe(proxyStream); dataStream.end(); @@ -3401,7 +3475,9 @@ class Database extends common.GrpcServiceObject { proxyStream.destroy(err); } }) - .once('end', () => this.pool_.release(session!)) + .once('end', () => { + this.pool_.release(session!); + }) .pipe(proxyStream); }); @@ -3474,32 +3550,64 @@ class Database extends common.GrpcServiceObject { optionsOrCallback?: CallOptions | CommitCallback, callback?: CommitCallback ): void | Promise { - const cb = - typeof optionsOrCallback === 'function' - ? (optionsOrCallback as CommitCallback) - : callback; - const options = - typeof optionsOrCallback === 'object' && optionsOrCallback - ? (optionsOrCallback as CallOptions) - : {}; - this.pool_.getSession((err, session?, transaction?) => { - const span = getActiveOrNoopSpan(); - if (err && isSessionNotFoundError(err as grpc.ServiceError)) { - span.addEvent('No session available', { - 'session.id': session?.id, - }); - this.writeAtLeastOnce(mutations, options, cb!); - return; - } + const proxyStream: Transform = through.obj(); + + this.pool_.getSession((err, session) => { if (err) { - cb!(err as grpc.ServiceError); + proxyStream.destroy(err); return; } + + const span = getActiveOrNoopSpan(); span.addEvent('Using Session', {'session.id': session?.id}); - this._releaseOnEnd(session!, transaction!); - transaction?.setQueuedMutations(mutations.proto()); - return transaction?.commit(options, cb!); + + const gaxOpts = extend(true, {}, options?.gaxOptions); + const reqOpts = Object.assign( + {} as spannerClient.spanner.v1.BatchWriteRequest, + { + session: session!.formattedName_!, + mutationGroups: mutationGroups.map(mg => mg.proto()), + requestOptions: options?.requestOptions, + excludeTxnFromChangeStream: options?.excludeTxnFromChangeStreams, + } + ); + let dataReceived = false; + let dataStream = this.requestStream({ + client: 'SpannerClient', + method: 'batchWrite', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }); + dataStream + .once('data', () => (dataReceived = true)) + .once('error', err => { + if ( + !dataReceived && + isSessionNotFoundError(err as grpc.ServiceError) + ) { + // If there's a 'Session not found' error and we have not yet received + // any data, we can safely retry the writes on a new session. + // Register the error on the session so the pool can discard it. + if (session) { + session.lastError = err as grpc.ServiceError; + } + span.addEvent('No session available', {'session.id': session?.id}); + // Remove the current data stream from the end user stream. + dataStream.unpipe(proxyStream); + dataStream.end(); + // Create a new stream and add it to the end user stream. + dataStream = this.batchWriteAtLeastOnce(mutationGroups, options); + dataStream.pipe(proxyStream); + } else { + proxyStream.destroy(err); + } + }) + .once('end', () => this.pool_.release(session!)) + .pipe(proxyStream); }); + + return proxyStream as NodeJS.ReadableStream; } /**