Skip to content

Commit

Permalink
feat: add forceFlush-function to the BasicTracerProvider-class (#…
Browse files Browse the repository at this point in the history
…2191)

Co-authored-by: Weyert de Boer <weyert.deboer@tapico.io>
  • Loading branch information
weyert and tapico-weyert authored May 13, 2021
1 parent 1758fa6 commit 6885278
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 0 deletions.
56 changes: 56 additions & 0 deletions packages/opentelemetry-tracing/src/BasicTracerProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ import { BatchSpanProcessor } from './export/BatchSpanProcessor';
export type PROPAGATOR_FACTORY = () => TextMapPropagator;
export type EXPORTER_FACTORY = () => SpanExporter;

export enum ForceFlushState {
'resolved',
'timeout',
'error',
'unresolved',
}

/**
* This class represents a basic tracer provider which platform libraries can extend
*/
Expand Down Expand Up @@ -140,6 +147,55 @@ export class BasicTracerProvider implements TracerProvider {
}
}

forceFlush(): Promise<void> {
const timeout = this._config.forceFlushTimeoutMillis;
const promises = this._registeredSpanProcessors.map(
(spanProcessor: SpanProcessor) => {
return new Promise(resolve => {
let state: ForceFlushState;
const timeoutInterval = setTimeout(() => {
resolve(
new Error(
`Span processor did not completed within timeout period of ${timeout} ms`
)
);
state = ForceFlushState.timeout;
}, timeout);

spanProcessor
.forceFlush()
.then(() => {
clearTimeout(timeoutInterval);
if (state !== ForceFlushState.timeout) {
state = ForceFlushState.resolved;
resolve(state);
}
})
.catch(error => {
clearTimeout(timeoutInterval);
state = ForceFlushState.error;
resolve(error);
});
});
}
);

return new Promise<void>((resolve, reject) => {
Promise.all(promises)
.then(results => {
const errors = results.filter(
result => result !== ForceFlushState.resolved
);
if (errors.length > 0) {
reject(errors);
} else {
resolve();
}
})
.catch(error => reject([error]));
});
}

shutdown() {
return this.activeSpanProcessor.shutdown();
}
Expand Down
1 change: 1 addition & 0 deletions packages/opentelemetry-tracing/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const FALLBACK_OTEL_TRACES_SAMPLER = TracesSamplerValues.AlwaysOn;
*/
export const DEFAULT_CONFIG = {
sampler: buildSamplerFromEnv(env),
forceFlushTimeoutMillis: 30000,
traceParams: {
numberOfAttributesPerSpan: getEnv().OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT,
numberOfLinksPerSpan: getEnv().OTEL_SPAN_LINK_COUNT_LIMIT,
Expand Down
6 changes: 6 additions & 0 deletions packages/opentelemetry-tracing/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ export interface TracerConfig {
* The default idGenerator generates random ids
*/
idGenerator?: IdGenerator;

/**
* How long the forceFlush can run before it is cancelled.
* The default value is 30000ms
*/
forceFlushTimeoutMillis?: number;
}

/**
Expand Down
58 changes: 58 additions & 0 deletions packages/opentelemetry-tracing/test/BasicTracerProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,64 @@ describe('BasicTracerProvider', () => {
});
});

describe('.forceFlush()', () => {
it('should call forceFlush on all registered span processors', done => {
sinon.restore();
const forceFlushStub = sinon.stub(
NoopSpanProcessor.prototype,
'forceFlush'
);
forceFlushStub.resolves();

const tracerProvider = new BasicTracerProvider();
const spanProcessorOne = new NoopSpanProcessor();
const spanProcessorTwo = new NoopSpanProcessor();

tracerProvider.addSpanProcessor(spanProcessorOne);
tracerProvider.addSpanProcessor(spanProcessorTwo);

tracerProvider
.forceFlush()
.then(() => {
sinon.restore();
assert(forceFlushStub.calledTwice);
done();
})
.catch(error => {
sinon.restore();
done(error);
});
});

it('should throw error when calling forceFlush on all registered span processors fails', done => {
sinon.restore();

const forceFlushStub = sinon.stub(
NoopSpanProcessor.prototype,
'forceFlush'
);
forceFlushStub.returns(Promise.reject('Error'));

const tracerProvider = new BasicTracerProvider();
const spanProcessorOne = new NoopSpanProcessor();
const spanProcessorTwo = new NoopSpanProcessor();
tracerProvider.addSpanProcessor(spanProcessorOne);
tracerProvider.addSpanProcessor(spanProcessorTwo);

tracerProvider
.forceFlush()
.then(() => {
sinon.restore();
done(new Error('Successful forceFlush not expected'));
})
.catch(_error => {
sinon.restore();
sinon.assert.calledTwice(forceFlushStub);
done();
});
});
});

describe('.bind()', () => {
it('should bind context with NoopContextManager context manager', done => {
const tracer = new BasicTracerProvider().getTracer('default');
Expand Down

0 comments on commit 6885278

Please sign in to comment.