From 68852780eb2b65cc4bf492f020ad48a9af96a4e9 Mon Sep 17 00:00:00 2001 From: Weyert de Boer Date: Thu, 13 May 2021 10:55:53 +0100 Subject: [PATCH] feat: add `forceFlush`-function to the `BasicTracerProvider`-class (#2191) Co-authored-by: Weyert de Boer --- .../src/BasicTracerProvider.ts | 56 ++++++++++++++++++ packages/opentelemetry-tracing/src/config.ts | 1 + packages/opentelemetry-tracing/src/types.ts | 6 ++ .../test/BasicTracerProvider.test.ts | 58 +++++++++++++++++++ 4 files changed, 121 insertions(+) diff --git a/packages/opentelemetry-tracing/src/BasicTracerProvider.ts b/packages/opentelemetry-tracing/src/BasicTracerProvider.ts index 01d6cb8b216..436bf313a75 100644 --- a/packages/opentelemetry-tracing/src/BasicTracerProvider.ts +++ b/packages/opentelemetry-tracing/src/BasicTracerProvider.ts @@ -41,6 +41,13 @@ import { BatchSpanProcessor } from './export/BatchSpanProcessor'; export type PROPAGATOR_FACTORY = () => TextMapPropagator; export type EXPORTER_FACTORY = () => SpanExporter; +export enum ForceFlushState { + 'resolved', + 'timeout', + 'error', + 'unresolved', +} + /** * This class represents a basic tracer provider which platform libraries can extend */ @@ -140,6 +147,55 @@ export class BasicTracerProvider implements TracerProvider { } } + forceFlush(): Promise { + const timeout = this._config.forceFlushTimeoutMillis; + const promises = this._registeredSpanProcessors.map( + (spanProcessor: SpanProcessor) => { + return new Promise(resolve => { + let state: ForceFlushState; + const timeoutInterval = setTimeout(() => { + resolve( + new Error( + `Span processor did not completed within timeout period of ${timeout} ms` + ) + ); + state = ForceFlushState.timeout; + }, timeout); + + spanProcessor + .forceFlush() + .then(() => { + clearTimeout(timeoutInterval); + if (state !== ForceFlushState.timeout) { + state = ForceFlushState.resolved; + resolve(state); + } + }) + .catch(error => { + clearTimeout(timeoutInterval); + state = ForceFlushState.error; + resolve(error); + }); + }); + } + ); + + return new Promise((resolve, reject) => { + Promise.all(promises) + .then(results => { + const errors = results.filter( + result => result !== ForceFlushState.resolved + ); + if (errors.length > 0) { + reject(errors); + } else { + resolve(); + } + }) + .catch(error => reject([error])); + }); + } + shutdown() { return this.activeSpanProcessor.shutdown(); } diff --git a/packages/opentelemetry-tracing/src/config.ts b/packages/opentelemetry-tracing/src/config.ts index 2db3db8adcb..420e84eb86a 100644 --- a/packages/opentelemetry-tracing/src/config.ts +++ b/packages/opentelemetry-tracing/src/config.ts @@ -36,6 +36,7 @@ const FALLBACK_OTEL_TRACES_SAMPLER = TracesSamplerValues.AlwaysOn; */ export const DEFAULT_CONFIG = { sampler: buildSamplerFromEnv(env), + forceFlushTimeoutMillis: 30000, traceParams: { numberOfAttributesPerSpan: getEnv().OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, numberOfLinksPerSpan: getEnv().OTEL_SPAN_LINK_COUNT_LIMIT, diff --git a/packages/opentelemetry-tracing/src/types.ts b/packages/opentelemetry-tracing/src/types.ts index 31561c19154..2633122db82 100644 --- a/packages/opentelemetry-tracing/src/types.ts +++ b/packages/opentelemetry-tracing/src/types.ts @@ -40,6 +40,12 @@ export interface TracerConfig { * The default idGenerator generates random ids */ idGenerator?: IdGenerator; + + /** + * How long the forceFlush can run before it is cancelled. + * The default value is 30000ms + */ + forceFlushTimeoutMillis?: number; } /** diff --git a/packages/opentelemetry-tracing/test/BasicTracerProvider.test.ts b/packages/opentelemetry-tracing/test/BasicTracerProvider.test.ts index 8f2f20269b8..9d60d452644 100644 --- a/packages/opentelemetry-tracing/test/BasicTracerProvider.test.ts +++ b/packages/opentelemetry-tracing/test/BasicTracerProvider.test.ts @@ -441,6 +441,64 @@ describe('BasicTracerProvider', () => { }); }); + describe('.forceFlush()', () => { + it('should call forceFlush on all registered span processors', done => { + sinon.restore(); + const forceFlushStub = sinon.stub( + NoopSpanProcessor.prototype, + 'forceFlush' + ); + forceFlushStub.resolves(); + + const tracerProvider = new BasicTracerProvider(); + const spanProcessorOne = new NoopSpanProcessor(); + const spanProcessorTwo = new NoopSpanProcessor(); + + tracerProvider.addSpanProcessor(spanProcessorOne); + tracerProvider.addSpanProcessor(spanProcessorTwo); + + tracerProvider + .forceFlush() + .then(() => { + sinon.restore(); + assert(forceFlushStub.calledTwice); + done(); + }) + .catch(error => { + sinon.restore(); + done(error); + }); + }); + + it('should throw error when calling forceFlush on all registered span processors fails', done => { + sinon.restore(); + + const forceFlushStub = sinon.stub( + NoopSpanProcessor.prototype, + 'forceFlush' + ); + forceFlushStub.returns(Promise.reject('Error')); + + const tracerProvider = new BasicTracerProvider(); + const spanProcessorOne = new NoopSpanProcessor(); + const spanProcessorTwo = new NoopSpanProcessor(); + tracerProvider.addSpanProcessor(spanProcessorOne); + tracerProvider.addSpanProcessor(spanProcessorTwo); + + tracerProvider + .forceFlush() + .then(() => { + sinon.restore(); + done(new Error('Successful forceFlush not expected')); + }) + .catch(_error => { + sinon.restore(); + sinon.assert.calledTwice(forceFlushStub); + done(); + }); + }); + }); + describe('.bind()', () => { it('should bind context with NoopContextManager context manager', done => { const tracer = new BasicTracerProvider().getTracer('default');