-
Notifications
You must be signed in to change notification settings - Fork 441
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(queue): observability for queue #2721
Changes from 18 commits
722fb5d
1b2fd0b
e6dfb85
a75a8dd
1b176c6
66b85c5
54a2c18
2e3fe37
c4cb517
14b2f2e
7f6c674
059d50b
f70cc8a
8f38087
379abf5
3e8e0d8
6169ea3
908ec13
2dab35f
846e3ef
fc733e9
4c199f8
7261e40
677fcc6
51a353e
6742a03
167a7dd
210d36e
0f172ff
23924d3
49b2c36
9b0b3fa
e35233b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,14 @@ | ||
import { EventEmitter } from 'events'; | ||
import { QueueBaseOptions, RedisClient } from '../interfaces'; | ||
import { | ||
QueueBaseOptions, | ||
RedisClient, | ||
Span, | ||
Tracer, | ||
SetSpan, | ||
ContextManager, | ||
Propagation, | ||
Context, | ||
} from '../interfaces'; | ||
import { MinimalQueue } from '../types'; | ||
import { | ||
delay, | ||
|
@@ -11,6 +20,7 @@ import { RedisConnection } from './redis-connection'; | |
import { Job } from './job'; | ||
import { KeysMap, QueueKeys } from './queue-keys'; | ||
import { Scripts } from './scripts'; | ||
import { TelemetryAttributes, SpanKind } from '../enums'; | ||
|
||
/** | ||
* @class QueueBase | ||
|
@@ -30,6 +40,16 @@ export class QueueBase extends EventEmitter implements MinimalQueue { | |
protected connection: RedisConnection; | ||
public readonly qualifiedName: string; | ||
|
||
/** | ||
* Instance of a telemetry client | ||
* To use it create if statement in a method to observe with start and end of a span | ||
* It will check if tracer is provided and if not it will continue as is | ||
*/ | ||
private tracer: Tracer | undefined; | ||
private setSpan: SetSpan | undefined; | ||
protected contextManager: ContextManager | undefined; | ||
protected propagation: Propagation | undefined; | ||
|
||
/** | ||
* | ||
* @param name - The name of the queue. | ||
|
@@ -76,6 +96,27 @@ export class QueueBase extends EventEmitter implements MinimalQueue { | |
this.keys = queueKeys.getKeys(name); | ||
this.toKey = (type: string) => queueKeys.toKey(name, type); | ||
this.setScripts(); | ||
|
||
if (opts?.telemetry) { | ||
this.tracer = opts.telemetry.trace.getTracer(opts.telemetry.tracerName); | ||
manast marked this conversation as resolved.
Show resolved
Hide resolved
|
||
this.setSpan = opts.telemetry.trace.setSpan; | ||
this.contextManager = opts.telemetry.contextManager; | ||
this.propagation = opts.telemetry.propagation; | ||
|
||
this.contextManager.getMetadata = (context: Context) => { | ||
const metadata = {}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
this.propagation.inject(context, metadata); | ||
return metadata; | ||
}; | ||
|
||
this.contextManager.fromMetadata = ( | ||
activeContext: Context, | ||
metadata: Record<string, string>, | ||
) => { | ||
const context = this.propagation.extract(activeContext, metadata); | ||
return context; | ||
}; | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -175,4 +216,61 @@ export class QueueBase extends EventEmitter implements MinimalQueue { | |
} | ||
} | ||
} | ||
|
||
/** | ||
* Wraps the code with telemetry and provides span for configuration. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "... provides a span for configuration." |
||
* | ||
* @param spanKind - kind of the span: Producer, Consumer, Internal | ||
* @param getSpanName - name of the span | ||
* @param callback - code to wrap with telemetry | ||
* @param srcPropagationMedatada - | ||
* @returns | ||
*/ | ||
protected async trace<T>( | ||
spanKind: SpanKind, | ||
getSpanName: () => string, | ||
callback: ( | ||
span?: Span, | ||
dstPropagationMetadata?: Record<string, string>, | ||
) => Promise<T> | T, | ||
srcPropagationMetadata?: Record<string, string>, | ||
) { | ||
if (!this.tracer) { | ||
return callback(); | ||
} | ||
|
||
const span = this.tracer.startSpan(getSpanName(), { | ||
kind: spanKind, | ||
}); | ||
|
||
try { | ||
span.setAttributes({ | ||
[TelemetryAttributes.QueueName]: this.name, | ||
}); | ||
|
||
let activeContext = this.contextManager.active(); | ||
if (srcPropagationMetadata) { | ||
activeContext = this.contextManager.fromMetadata( | ||
activeContext, | ||
srcPropagationMetadata, | ||
); | ||
} | ||
|
||
let dstPropagationMetadata: undefined | Record<string, string>; | ||
if (spanKind === SpanKind.PRODUCER) { | ||
dstPropagationMetadata = this.contextManager.getMetadata(activeContext); | ||
} | ||
|
||
const messageContext = this.setSpan(activeContext, span); | ||
|
||
return await this.contextManager.with(messageContext, () => | ||
callback(span, dstPropagationMetadata), | ||
); | ||
} catch (err) { | ||
span.recordException(err as Error); | ||
throw err; | ||
} finally { | ||
span.end(); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not think this comment is correct anymore as we are using the trace helper.