From 21c416879ec47b1c952793f4eeee73777816528f Mon Sep 17 00:00:00 2001 From: Povilas Versockas Date: Fri, 8 Nov 2024 15:43:48 +0200 Subject: [PATCH] feat(sdk-metrics): add aggregation cardinality limit --- CHANGELOG.md | 2 + .../src/state/DeltaMetricProcessor.ts | 19 +++- .../sdk-metrics/src/state/MeterSharedState.ts | 6 +- .../src/state/SyncMetricStorage.ts | 8 +- packages/sdk-metrics/src/view/View.ts | 18 ++++ .../sdk-metrics/test/MeterProvider.test.ts | 88 +++++++++++++++++++ 6 files changed, 135 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d7fc0c04ae4..a451c457ab8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ For semantic convention package changes, see the [semconv CHANGELOG](packages/se ### :boom: Breaking Change +* feat(sdk-metrics): Add support for aggregation cardinality limit with a default limit of 2000. This limit can be customized via views [#5182](/~https://github.com/open-telemetry/opentelemetry-js/pull/5128) + ### :rocket: (Enhancement) ### :bug: (Bug Fix) diff --git a/packages/sdk-metrics/src/state/DeltaMetricProcessor.ts b/packages/sdk-metrics/src/state/DeltaMetricProcessor.ts index 2764727de24..4c6e6c80bf8 100644 --- a/packages/sdk-metrics/src/state/DeltaMetricProcessor.ts +++ b/packages/sdk-metrics/src/state/DeltaMetricProcessor.ts @@ -31,8 +31,13 @@ export class DeltaMetricProcessor> { // TODO: find a reasonable mean to clean the memo; // /~https://github.com/open-telemetry/opentelemetry-specification/pull/2208 private _cumulativeMemoStorage = new AttributeHashMap(); - - constructor(private _aggregator: Aggregator) {} + private _cardinalityLimit: number; + constructor( + private _aggregator: Aggregator, + aggregationCardinalityLimit?: number + ) { + this._cardinalityLimit = aggregationCardinalityLimit ?? 2000; + } record( value: number, @@ -40,6 +45,16 @@ export class DeltaMetricProcessor> { _context: Context, collectionTime: HrTime ) { + if (this._activeCollectionStorage.size >= this._cardinalityLimit) { + const overflowAttributes = { 'otel.metric.overflow': true }; + const overflowAccumulation = this._activeCollectionStorage.getOrDefault( + overflowAttributes, + () => this._aggregator.createAccumulation(collectionTime) + ); + overflowAccumulation?.record(value); + return; + } + const accumulation = this._activeCollectionStorage.getOrDefault( attributes, () => this._aggregator.createAccumulation(collectionTime) diff --git a/packages/sdk-metrics/src/state/MeterSharedState.ts b/packages/sdk-metrics/src/state/MeterSharedState.ts index 2c0c1a5105b..3737e3c403f 100644 --- a/packages/sdk-metrics/src/state/MeterSharedState.ts +++ b/packages/sdk-metrics/src/state/MeterSharedState.ts @@ -142,7 +142,8 @@ export class MeterSharedState { viewDescriptor, aggregator, view.attributesProcessor, - this._meterProviderSharedState.metricCollectors + this._meterProviderSharedState.metricCollectors, + view.aggregationCardinalityLimit ) as R; this.metricStorageRegistry.register(viewStorage); return viewStorage; @@ -190,6 +191,7 @@ interface MetricStorageConstructor { instrumentDescriptor: InstrumentDescriptor, aggregator: Aggregator>, attributesProcessor: AttributesProcessor, - collectors: MetricCollectorHandle[] + collectors: MetricCollectorHandle[], + aggregationCardinalityLimit?: number ): MetricStorage; } diff --git a/packages/sdk-metrics/src/state/SyncMetricStorage.ts b/packages/sdk-metrics/src/state/SyncMetricStorage.ts index 2e97d20d8d1..9d01e263861 100644 --- a/packages/sdk-metrics/src/state/SyncMetricStorage.ts +++ b/packages/sdk-metrics/src/state/SyncMetricStorage.ts @@ -42,10 +42,14 @@ export class SyncMetricStorage> instrumentDescriptor: InstrumentDescriptor, aggregator: Aggregator, private _attributesProcessor: AttributesProcessor, - collectorHandles: MetricCollectorHandle[] + collectorHandles: MetricCollectorHandle[], + private _aggregationCardinalityLimit?: number ) { super(instrumentDescriptor); - this._deltaMetricStorage = new DeltaMetricProcessor(aggregator); + this._deltaMetricStorage = new DeltaMetricProcessor( + aggregator, + this._aggregationCardinalityLimit + ); this._temporalMetricStorage = new TemporalMetricProcessor( aggregator, collectorHandles diff --git a/packages/sdk-metrics/src/view/View.ts b/packages/sdk-metrics/src/view/View.ts index 1e8d4fb0e05..97a49c5124d 100644 --- a/packages/sdk-metrics/src/view/View.ts +++ b/packages/sdk-metrics/src/view/View.ts @@ -61,6 +61,15 @@ export type ViewOptions = { * aggregation: new LastValueAggregation() */ aggregation?: Aggregation; + /** + * Alters the metric stream: + * Sets a limit on the number of unique attribute combinations (cardinality) that can be aggregated. + * If not provided, the default limit will be used. + * + * @example sets the cardinality limit to 1000 + * aggregationCardinalityLimit: 1000 + */ + aggregationCardinalityLimit?: number; /** * Instrument selection criteria: * The original type of the Instrument(s). @@ -138,6 +147,7 @@ export class View { readonly attributesProcessor: AttributesProcessor; readonly instrumentSelector: InstrumentSelector; readonly meterSelector: MeterSelector; + readonly aggregationCardinalityLimit?: number; /** * Create a new {@link View} instance. @@ -182,6 +192,13 @@ export class View { * @param viewOptions.meterSchemaUrl * Instrument selection criteria: * The schema URL of the Meter. No wildcard support, schema URL must match exactly. + * @param viewOptions.aggregationCardinalityLimit + * Alters the metric stream: + * Sets a limit on the number of unique attribute combinations (cardinality) that can be aggregated. + * If not provided, the default limit will be used. + * + * @example sets the cardinality limit to 1000 + * aggregationCardinalityLimit: 1000 * * @example * // Create a view that changes the Instrument 'my.instrument' to use to an @@ -232,5 +249,6 @@ export class View { version: viewOptions.meterVersion, schemaUrl: viewOptions.meterSchemaUrl, }); + this.aggregationCardinalityLimit = viewOptions.aggregationCardinalityLimit; } } diff --git a/packages/sdk-metrics/test/MeterProvider.test.ts b/packages/sdk-metrics/test/MeterProvider.test.ts index f06305ad2af..0948a98ac7d 100644 --- a/packages/sdk-metrics/test/MeterProvider.test.ts +++ b/packages/sdk-metrics/test/MeterProvider.test.ts @@ -21,6 +21,7 @@ import { DataPointType, ExplicitBucketHistogramAggregation, HistogramMetricData, + DataPoint, } from '../src'; import { assertScopeMetrics, @@ -541,6 +542,93 @@ describe('MeterProvider', () => { }); }); + describe('aggregationCardinalityLimit with view should apply the cardinality limit', () => { + it('should respect the aggregationCardinalityLimit', async () => { + const reader = new TestMetricReader(); + const meterProvider = new MeterProvider({ + resource: defaultResource, + readers: [reader], + views: [ + new View({ + instrumentName: 'test-counter', + aggregationCardinalityLimit: 2, // Set cardinality limit to 2 + }), + ], + }); + + const meter = meterProvider.getMeter('meter1', 'v1.0.0'); + const counter = meter.createCounter('test-counter'); + + // Add values with different attributes + counter.add(1, { attr1: 'value1' }); + counter.add(1, { attr2: 'value2' }); + counter.add(1, { attr3: 'value3' }); + + // Perform collection + const { resourceMetrics, errors } = await reader.collect(); + + assert.strictEqual(errors.length, 0); + assert.strictEqual(resourceMetrics.scopeMetrics.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); + + const metricData = resourceMetrics.scopeMetrics[0].metrics[0]; + assert.strictEqual(metricData.dataPoints.length, 3); + + // Check if the overflow data point is present + const overflowDataPoint = ( + metricData.dataPoints as DataPoint[] + ).find((dataPoint: DataPoint) => + Object.prototype.hasOwnProperty.call( + dataPoint.attributes, + 'otel.metric.overflow' + ) + ); + assert.ok(overflowDataPoint); + assert.strictEqual(overflowDataPoint.value, 1); + }); + }); + + describe('default aggregationCardinalityLimit should apply the cardinality limit', () => { + it('should respect the default aggregationCardinalityLimit', async () => { + const reader = new TestMetricReader(); + const meterProvider = new MeterProvider({ + resource: defaultResource, + readers: [reader], + }); + + const meter = meterProvider.getMeter('meter1', 'v1.0.0'); + const counter = meter.createCounter('test-counter'); + + // Add values with different attributes + for (let i = 0; i < 2002; i++) { + const attributes = { [`attr${i}`]: `value${i}` }; + counter.add(1, attributes); + } + + // Perform collection + const { resourceMetrics, errors } = await reader.collect(); + + assert.strictEqual(errors.length, 0); + assert.strictEqual(resourceMetrics.scopeMetrics.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); + + const metricData = resourceMetrics.scopeMetrics[0].metrics[0]; + assert.strictEqual(metricData.dataPoints.length, 2001); + + // Check if the overflow data point is present + const overflowDataPoint = ( + metricData.dataPoints as DataPoint[] + ).find((dataPoint: DataPoint) => + Object.prototype.hasOwnProperty.call( + dataPoint.attributes, + 'otel.metric.overflow' + ) + ); + assert.ok(overflowDataPoint); + assert.strictEqual(overflowDataPoint.value, 2); + }); + }); + describe('shutdown', () => { it('should shutdown all registered metric readers', async () => { const reader1 = new TestMetricReader();