Skip to content

Commit

Permalink
feat(sdk-metrics): add aggregation cardinality limit
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv committed Nov 8, 2024
1 parent 72c9af9 commit 21c4168
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ For semantic convention package changes, see the [semconv CHANGELOG](packages/se

### :boom: Breaking Change

* feat(sdk-metrics): Add support for aggregation cardinality limit with a default limit of 2000. This limit can be customized via views [#5182](/~https://github.com/open-telemetry/opentelemetry-js/pull/5128)

### :rocket: (Enhancement)

### :bug: (Bug Fix)
Expand Down
19 changes: 17 additions & 2 deletions packages/sdk-metrics/src/state/DeltaMetricProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,30 @@ export class DeltaMetricProcessor<T extends Maybe<Accumulation>> {
// TODO: find a reasonable mean to clean the memo;
// /~https://github.com/open-telemetry/opentelemetry-specification/pull/2208
private _cumulativeMemoStorage = new AttributeHashMap<T>();

constructor(private _aggregator: Aggregator<T>) {}
private _cardinalityLimit: number;
constructor(
private _aggregator: Aggregator<T>,
aggregationCardinalityLimit?: number
) {
this._cardinalityLimit = aggregationCardinalityLimit ?? 2000;
}

record(
value: number,
attributes: Attributes,
_context: Context,
collectionTime: HrTime
) {
if (this._activeCollectionStorage.size >= this._cardinalityLimit) {
const overflowAttributes = { 'otel.metric.overflow': true };
const overflowAccumulation = this._activeCollectionStorage.getOrDefault(
overflowAttributes,
() => this._aggregator.createAccumulation(collectionTime)
);
overflowAccumulation?.record(value);
return;
}

const accumulation = this._activeCollectionStorage.getOrDefault(
attributes,
() => this._aggregator.createAccumulation(collectionTime)
Expand Down
6 changes: 4 additions & 2 deletions packages/sdk-metrics/src/state/MeterSharedState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ export class MeterSharedState {
viewDescriptor,
aggregator,
view.attributesProcessor,
this._meterProviderSharedState.metricCollectors
this._meterProviderSharedState.metricCollectors,
view.aggregationCardinalityLimit
) as R;
this.metricStorageRegistry.register(viewStorage);
return viewStorage;
Expand Down Expand Up @@ -190,6 +191,7 @@ interface MetricStorageConstructor {
instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<Maybe<Accumulation>>,
attributesProcessor: AttributesProcessor,
collectors: MetricCollectorHandle[]
collectors: MetricCollectorHandle[],
aggregationCardinalityLimit?: number
): MetricStorage;
}
8 changes: 6 additions & 2 deletions packages/sdk-metrics/src/state/SyncMetricStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>>
instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor,
collectorHandles: MetricCollectorHandle[]
collectorHandles: MetricCollectorHandle[],
private _aggregationCardinalityLimit?: number
) {
super(instrumentDescriptor);
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._deltaMetricStorage = new DeltaMetricProcessor(
aggregator,
this._aggregationCardinalityLimit
);
this._temporalMetricStorage = new TemporalMetricProcessor(
aggregator,
collectorHandles
Expand Down
18 changes: 18 additions & 0 deletions packages/sdk-metrics/src/view/View.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ export type ViewOptions = {
* aggregation: new LastValueAggregation()
*/
aggregation?: Aggregation;
/**
* Alters the metric stream:
* Sets a limit on the number of unique attribute combinations (cardinality) that can be aggregated.
* If not provided, the default limit will be used.
*
* @example <caption>sets the cardinality limit to 1000</caption>
* aggregationCardinalityLimit: 1000
*/
aggregationCardinalityLimit?: number;
/**
* Instrument selection criteria:
* The original type of the Instrument(s).
Expand Down Expand Up @@ -138,6 +147,7 @@ export class View {
readonly attributesProcessor: AttributesProcessor;
readonly instrumentSelector: InstrumentSelector;
readonly meterSelector: MeterSelector;
readonly aggregationCardinalityLimit?: number;

/**
* Create a new {@link View} instance.
Expand Down Expand Up @@ -182,6 +192,13 @@ export class View {
* @param viewOptions.meterSchemaUrl
* Instrument selection criteria:
* The schema URL of the Meter. No wildcard support, schema URL must match exactly.
* @param viewOptions.aggregationCardinalityLimit
* Alters the metric stream:
* Sets a limit on the number of unique attribute combinations (cardinality) that can be aggregated.
* If not provided, the default limit will be used.
*
* @example <caption>sets the cardinality limit to 1000</caption>
* aggregationCardinalityLimit: 1000
*
* @example
* // Create a view that changes the Instrument 'my.instrument' to use to an
Expand Down Expand Up @@ -232,5 +249,6 @@ export class View {
version: viewOptions.meterVersion,
schemaUrl: viewOptions.meterSchemaUrl,
});
this.aggregationCardinalityLimit = viewOptions.aggregationCardinalityLimit;
}
}
88 changes: 88 additions & 0 deletions packages/sdk-metrics/test/MeterProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
DataPointType,
ExplicitBucketHistogramAggregation,
HistogramMetricData,
DataPoint,
} from '../src';
import {
assertScopeMetrics,
Expand Down Expand Up @@ -541,6 +542,93 @@ describe('MeterProvider', () => {
});
});

describe('aggregationCardinalityLimit with view should apply the cardinality limit', () => {
it('should respect the aggregationCardinalityLimit', async () => {
const reader = new TestMetricReader();
const meterProvider = new MeterProvider({
resource: defaultResource,
readers: [reader],
views: [
new View({
instrumentName: 'test-counter',
aggregationCardinalityLimit: 2, // Set cardinality limit to 2
}),
],
});

const meter = meterProvider.getMeter('meter1', 'v1.0.0');
const counter = meter.createCounter('test-counter');

// Add values with different attributes
counter.add(1, { attr1: 'value1' });
counter.add(1, { attr2: 'value2' });
counter.add(1, { attr3: 'value3' });

// Perform collection
const { resourceMetrics, errors } = await reader.collect();

assert.strictEqual(errors.length, 0);
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);

const metricData = resourceMetrics.scopeMetrics[0].metrics[0];
assert.strictEqual(metricData.dataPoints.length, 3);

// Check if the overflow data point is present
const overflowDataPoint = (
metricData.dataPoints as DataPoint<number>[]
).find((dataPoint: DataPoint<number>) =>
Object.prototype.hasOwnProperty.call(
dataPoint.attributes,
'otel.metric.overflow'
)
);
assert.ok(overflowDataPoint);
assert.strictEqual(overflowDataPoint.value, 1);
});
});

describe('default aggregationCardinalityLimit should apply the cardinality limit', () => {
it('should respect the default aggregationCardinalityLimit', async () => {
const reader = new TestMetricReader();
const meterProvider = new MeterProvider({
resource: defaultResource,
readers: [reader],
});

const meter = meterProvider.getMeter('meter1', 'v1.0.0');
const counter = meter.createCounter('test-counter');

// Add values with different attributes
for (let i = 0; i < 2002; i++) {
const attributes = { [`attr${i}`]: `value${i}` };
counter.add(1, attributes);
}

// Perform collection
const { resourceMetrics, errors } = await reader.collect();

assert.strictEqual(errors.length, 0);
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);

const metricData = resourceMetrics.scopeMetrics[0].metrics[0];
assert.strictEqual(metricData.dataPoints.length, 2001);

// Check if the overflow data point is present
const overflowDataPoint = (
metricData.dataPoints as DataPoint<number>[]
).find((dataPoint: DataPoint<number>) =>
Object.prototype.hasOwnProperty.call(
dataPoint.attributes,
'otel.metric.overflow'
)
);
assert.ok(overflowDataPoint);
assert.strictEqual(overflowDataPoint.value, 2);
});
});

describe('shutdown', () => {
it('should shutdown all registered metric readers', async () => {
const reader1 = new TestMetricReader();
Expand Down

0 comments on commit 21c4168

Please sign in to comment.