Skip to content

Commit

Permalink
feat(sdk-metrics-base): detect resets on async metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
legendecas committed May 31, 2022
1 parent ba3e320 commit 672368f
Show file tree
Hide file tree
Showing 31 changed files with 685 additions and 283 deletions.
4 changes: 4 additions & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ All notable changes to experimental packages in this project will be documented
* fix(sdk-metrics-base): misbehaving aggregation temporality selector tolerance #2958 @legendecas
* feat(trace-otlp-grpc): configure security with env vars #2827 @svetlanabrennan
* feat(sdk-metrics-base): async instruments callback timeout #2742 @legendecas
* feat(sdk-metrics-base): detect resets on async metrics #2990 @legendecas
* Added monotonicity support in SumAggregator.
* Added reset and gaps detection for async metric instruments.
* Fixed the start time and end time of an exported metric with regarding to resets and gaps.

### :bug: (Bug Fix)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,8 @@ describe('OTLPMetricExporter - node with json over http', () => {
assert.ok(typeof metric3 !== 'undefined', "histogram doesn't exist");
ensureHistogramIsCorrect(
metric3,
core.hrTimeToNanoseconds(metrics.scopeMetrics[0].metrics[1].dataPoints[0].endTime),
core.hrTimeToNanoseconds(metrics.scopeMetrics[0].metrics[1].dataPoints[0].startTime),
core.hrTimeToNanoseconds(metrics.scopeMetrics[0].metrics[2].dataPoints[0].endTime),
core.hrTimeToNanoseconds(metrics.scopeMetrics[0].metrics[2].dataPoints[0].startTime),
[0, 100],
[0, 2, 0]
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import * as api from '@opentelemetry/api';
import * as metrics from '@opentelemetry/api-metrics';
import { ObservableCallback } from '@opentelemetry/api-metrics';
import { hrTime } from '@opentelemetry/core';
import { InstrumentDescriptor } from './InstrumentDescriptor';
import { ObservableRegistry } from './state/ObservableRegistry';
import { AsyncWritableMetricStorage, WritableMetricStorage } from './state/WritableMetricStorage';
Expand All @@ -31,7 +32,7 @@ export class SyncInstrument {
);
value = Math.trunc(value);
}
this._writableMetricStorage.record(value, attributes, context);
this._writableMetricStorage.record(value, attributes, context, hrTime());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ export class DropAggregator implements Aggregator<undefined> {
_descriptor: InstrumentDescriptor,
_aggregationTemporality: AggregationTemporality,
_accumulationByAttributes: AccumulationRecord<undefined>[],
_startTime: HrTime,
_endTime: HrTime): Maybe<MetricData> {
return undefined;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ function createNewEmptyCheckpoint(boundaries: number[]): Histogram {

export class HistogramAccumulation implements Accumulation {
constructor(
public startTime: HrTime,
private readonly _boundaries: number[],
private _current: Histogram = createNewEmptyCheckpoint(_boundaries)
) {}
Expand All @@ -60,6 +61,10 @@ export class HistogramAccumulation implements Accumulation {
this._current.buckets.counts[this._boundaries.length] += 1;
}

setStartTime(startTime: HrTime): void {
this.startTime = startTime;
}

toPointValue(): Histogram {
return this._current;
}
Expand All @@ -77,8 +82,8 @@ export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
*/
constructor(private readonly _boundaries: number[]) {}

createAccumulation() {
return new HistogramAccumulation(this._boundaries);
createAccumulation(startTime: HrTime) {
return new HistogramAccumulation(startTime, this._boundaries);
}

/**
Expand All @@ -98,7 +103,7 @@ export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
mergedCounts[idx] = previousCounts[idx] + deltaCounts[idx];
}

return new HistogramAccumulation(previousValue.buckets.boundaries, {
return new HistogramAccumulation(previous.startTime, previousValue.buckets.boundaries, {
buckets: {
boundaries: previousValue.buckets.boundaries,
counts: mergedCounts,
Expand All @@ -123,7 +128,7 @@ export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
diffedCounts[idx] = currentCounts[idx] - previousCounts[idx];
}

return new HistogramAccumulation(previousValue.buckets.boundaries, {
return new HistogramAccumulation(current.startTime, previousValue.buckets.boundaries, {
buckets: {
boundaries: previousValue.buckets.boundaries,
counts: diffedCounts,
Expand All @@ -137,7 +142,6 @@ export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
descriptor: InstrumentDescriptor,
aggregationTemporality: AggregationTemporality,
accumulationByAttributes: AccumulationRecord<HistogramAccumulation>[],
startTime: HrTime,
endTime: HrTime): Maybe<HistogramMetricData> {
return {
descriptor,
Expand All @@ -146,7 +150,7 @@ export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
dataPoints: accumulationByAttributes.map(([attributes, accumulation]) => {
return {
attributes,
startTime,
startTime: accumulation.startTime,
endTime,
value: accumulation.toPointValue(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@ import { Maybe } from '../utils';
import { AggregationTemporality } from '../export/AggregationTemporality';

export class LastValueAccumulation implements Accumulation {
constructor(private _current: number = 0, public sampleTime: HrTime = [0, 0]) {}
constructor(public startTime: HrTime, private _current: number = 0, public sampleTime: HrTime = [0, 0]) {}

record(value: number): void {
this._current = value;
this.sampleTime = hrTime();
}

setStartTime(startTime: HrTime): void {
this.startTime = startTime;
}

toPointValue(): LastValue {
return this._current;
}
Expand All @@ -39,8 +43,8 @@ export class LastValueAccumulation implements Accumulation {
export class LastValueAggregator implements Aggregator<LastValueAccumulation> {
public kind: AggregatorKind.LAST_VALUE = AggregatorKind.LAST_VALUE;

createAccumulation() {
return new LastValueAccumulation();
createAccumulation(startTime: HrTime) {
return new LastValueAccumulation(startTime);
}

/**
Expand All @@ -51,7 +55,7 @@ export class LastValueAggregator implements Aggregator<LastValueAccumulation> {
merge(previous: LastValueAccumulation, delta: LastValueAccumulation): LastValueAccumulation {
// nanoseconds may lose precisions.
const latestAccumulation = hrTimeToMicroseconds(delta.sampleTime) >= hrTimeToMicroseconds(previous.sampleTime) ? delta : previous;
return new LastValueAccumulation(latestAccumulation.toPointValue(), latestAccumulation.sampleTime);
return new LastValueAccumulation(previous.startTime, latestAccumulation.toPointValue(), latestAccumulation.sampleTime);
}

/**
Expand All @@ -63,14 +67,13 @@ export class LastValueAggregator implements Aggregator<LastValueAccumulation> {
diff(previous: LastValueAccumulation, current: LastValueAccumulation): LastValueAccumulation {
// nanoseconds may lose precisions.
const latestAccumulation = hrTimeToMicroseconds(current.sampleTime) >= hrTimeToMicroseconds(previous.sampleTime) ? current : previous;
return new LastValueAccumulation(latestAccumulation.toPointValue(), latestAccumulation.sampleTime);
return new LastValueAccumulation(current.startTime, latestAccumulation.toPointValue(), latestAccumulation.sampleTime);
}

toMetricData(
descriptor: InstrumentDescriptor,
aggregationTemporality: AggregationTemporality,
accumulationByAttributes: AccumulationRecord<LastValueAccumulation>[],
startTime: HrTime,
endTime: HrTime): Maybe<SingularMetricData> {
return {
descriptor,
Expand All @@ -79,7 +82,7 @@ export class LastValueAggregator implements Aggregator<LastValueAccumulation> {
dataPoints: accumulationByAttributes.map(([attributes, accumulation]) => {
return {
attributes,
startTime,
startTime: accumulation.startTime,
endTime,
value: accumulation.toPointValue(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,19 @@ import { Maybe } from '../utils';
import { AggregationTemporality } from '../export/AggregationTemporality';

export class SumAccumulation implements Accumulation {
constructor(private _current: number = 0) {}
constructor(public startTime: HrTime, public monotonic: boolean, private _current: number = 0, public reset = false) {}

record(value: number): void {
if (this.monotonic && value < 0) {
return;
}
this._current += value;
}

setStartTime(startTime: HrTime): void {
this.startTime = startTime;
}

toPointValue(): Sum {
return this._current;
}
Expand All @@ -37,29 +44,43 @@ export class SumAccumulation implements Accumulation {
export class SumAggregator implements Aggregator<SumAccumulation> {
public kind: AggregatorKind.SUM = AggregatorKind.SUM;

createAccumulation() {
return new SumAccumulation();
constructor (public monotonic: boolean) {}

createAccumulation(startTime: HrTime) {
return new SumAccumulation(startTime, this.monotonic);
}

/**
* Returns the result of the merge of the given accumulations.
*/
merge(previous: SumAccumulation, delta: SumAccumulation): SumAccumulation {
return new SumAccumulation(previous.toPointValue() + delta.toPointValue());
if (delta.reset) {
return new SumAccumulation(delta.startTime, this.monotonic, delta.toPointValue(), delta.reset);
}
return new SumAccumulation(previous.startTime, this.monotonic, previous.toPointValue() + delta.toPointValue());
}

/**
* Returns a new DELTA aggregation by comparing two cumulative measurements.
*/
diff(previous: SumAccumulation, current: SumAccumulation): SumAccumulation {
return new SumAccumulation(current.toPointValue() - previous.toPointValue());
const prevPv = previous.toPointValue();
const currPv = current.toPointValue();
/**
* If the SumAggregator is a monotonic one and the previous point value is
* greater than the current one, a reset is deemed to be happened.
* Return the current point value to prevent the value from been reset.
*/
if (this.monotonic && (prevPv > currPv)) {
return new SumAccumulation(current.startTime, this.monotonic, currPv, true);
}
return new SumAccumulation(current.startTime, this.monotonic, current.toPointValue() - previous.toPointValue());
}

toMetricData(
descriptor: InstrumentDescriptor,
aggregationTemporality: AggregationTemporality,
accumulationByAttributes: AccumulationRecord<SumAccumulation>[],
startTime: HrTime,
endTime: HrTime): Maybe<SingularMetricData> {
return {
descriptor,
Expand All @@ -68,7 +89,7 @@ export class SumAggregator implements Aggregator<SumAccumulation> {
dataPoints: accumulationByAttributes.map(([attributes, accumulation]) => {
return {
attributes,
startTime,
startTime: accumulation.startTime,
endTime,
value: accumulation.toPointValue(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export interface Histogram {
* An Aggregator accumulation state.
*/
export interface Accumulation {
setStartTime(startTime: HrTime): void;
record(value: number): void;
}

Expand All @@ -81,7 +82,7 @@ export interface Aggregator<T> {
/**
* Create a clean state of accumulation.
*/
createAccumulation(): T;
createAccumulation(startTime: HrTime): T;

/**
* Returns the result of the merge of the given accumulations.
Expand Down Expand Up @@ -109,13 +110,11 @@ export interface Aggregator<T> {
*
* @param descriptor the metric instrument descriptor.
* @param accumulationByAttributes the array of attributes and accumulation pairs.
* @param startTime the start time of the metric data.
* @param endTime the end time of the metric data.
* @return the {@link MetricData} that this {@link Aggregator} will produce.
*/
toMetricData(descriptor: InstrumentDescriptor,
aggregationTemporality: AggregationTemporality,
accumulationByAttributes: AccumulationRecord<T>[],
startTime: HrTime,
endTime: HrTime): Maybe<MetricData>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricSto
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
}

record(measurements: AttributeHashMap<number>) {
record(measurements: AttributeHashMap<number>, observationTime: HrTime) {
const processed = new AttributeHashMap<number>();
Array.from(measurements.entries()).forEach(([attributes, value]) => {
processed.set(this._attributesProcessor.process(attributes), value);
});
this._deltaMetricStorage.batchCumulate(processed);
this._deltaMetricStorage.batchCumulate(processed, observationTime);
}

/**
Expand All @@ -64,7 +64,6 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricSto
collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
sdkStartTime: HrTime,
collectionTime: HrTime,
): Maybe<MetricData> {
const accumulations = this._deltaMetricStorage.collect();
Expand All @@ -74,7 +73,6 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricSto
collectors,
this._instrumentDescriptor,
accumulations,
sdkStartTime,
collectionTime
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import { Context } from '@opentelemetry/api';
import { Context, HrTime } from '@opentelemetry/api';
import { MetricAttributes } from '@opentelemetry/api-metrics';
import { Maybe } from '../utils';
import { Accumulation, Aggregator } from '../aggregator/types';
Expand All @@ -35,31 +35,36 @@ export class DeltaMetricProcessor<T extends Maybe<Accumulation>> {

constructor(private _aggregator: Aggregator<T>) {}

/** Bind an efficient storage handle for a set of attributes. */
private bind(attributes: MetricAttributes) {
return this._activeCollectionStorage.getOrDefault(attributes, () => this._aggregator.createAccumulation());
}

record(value: number, attributes: MetricAttributes, _context: Context) {
const accumulation = this.bind(attributes);
record(value: number, attributes: MetricAttributes, _context: Context, collectionTime: HrTime) {
const accumulation = this._activeCollectionStorage.getOrDefault(
attributes,
() => this._aggregator.createAccumulation(collectionTime)
);
accumulation?.record(value);
}

batchCumulate(measurements: AttributeHashMap<number>) {
batchCumulate(measurements: AttributeHashMap<number>, collectionTime: HrTime) {
Array.from(measurements.entries()).forEach(([attributes, value, hashCode]) => {
let accumulation = this._aggregator.createAccumulation();
const accumulation = this._aggregator.createAccumulation(collectionTime);
accumulation?.record(value);
let delta = accumulation;
if (this._cumulativeMemoStorage.has(attributes, hashCode)) {
// previous must present.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const previous = this._cumulativeMemoStorage.get(attributes, hashCode)!;
accumulation = this._aggregator.diff(previous, accumulation);
delta = this._aggregator.diff(previous, accumulation);
}

// Save the current record and the delta record.
this._cumulativeMemoStorage.set(attributes, accumulation, hashCode);
this._activeCollectionStorage.set(attributes, accumulation, hashCode);
this._activeCollectionStorage.set(attributes, delta, hashCode);
});
}

/**
* Returns a collection of delta metrics. Their start time is the when first
* time event collected.
*/
collect() {
const unreportedDelta = this._activeCollectionStorage;
this._activeCollectionStorage = new AttributeHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ export class HashMap<KeyType, ValueType, HashCodeType> {
return this._valueMap.has(hashCode);
}

*keys(): IterableIterator<[KeyType, HashCodeType]> {
const keyIterator = this._keyMap.entries();
let next = keyIterator.next();
while (next.done !== true) {
yield [ next.value[1], next.value[0]];
next = keyIterator.next();
}
}

*entries(): IterableIterator<[KeyType, ValueType, HashCodeType]> {
const valueIterator = this._valueMap.entries();
let next = valueIterator.next();
Expand Down
Loading

0 comments on commit 672368f

Please sign in to comment.