Skip to content

Commit

Permalink
refactor(sdk-metrics-base): meter shared states
Browse files Browse the repository at this point in the history
  • Loading branch information
legendecas committed Mar 8, 2022
1 parent c6dab2a commit e1fd1b8
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 91 deletions.
71 changes: 11 additions & 60 deletions experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,27 @@

import * as metrics from '@opentelemetry/api-metrics-wip';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { createInstrumentDescriptor, InstrumentDescriptor, InstrumentType } from './InstrumentDescriptor';
import { createInstrumentDescriptor, InstrumentType } from './InstrumentDescriptor';
import { Counter, Histogram, UpDownCounter } from './Instruments';
import { MeterProviderSharedState } from './state/MeterProviderSharedState';
import { MultiMetricStorage } from './state/MultiWritableMetricStorage';
import { SyncMetricStorage } from './state/SyncMetricStorage';
import { InstrumentationLibraryMetrics } from './export/MetricData';
import { isNotNullish } from './utils';
import { MetricCollectorHandle } from './state/MetricCollector';
import { HrTime } from '@opentelemetry/api';
import { AsyncMetricStorage } from './state/AsyncMetricStorage';
import { WritableMetricStorage } from './state/WritableMetricStorage';
import { MetricStorageRegistry } from './state/MetricStorageRegistry';
import { MeterSharedState } from './state/MeterSharedState';

/**
* This class implements the {@link metrics.Meter} interface.
*/
export class Meter implements metrics.Meter {
private _metricStorageRegistry = new MetricStorageRegistry();
private _meterSharedState: MeterSharedState;

constructor(private _meterProviderSharedState: MeterProviderSharedState, private _instrumentationLibrary: InstrumentationLibrary) {
this._meterProviderSharedState.meters.push(this);
constructor(meterProviderSharedState: MeterProviderSharedState, instrumentationLibrary: InstrumentationLibrary) {
this._meterSharedState = meterProviderSharedState.getMeterSharedState(instrumentationLibrary);
}

/**
* Create a {@link metrics.Histogram} instrument.
*/
createHistogram(name: string, options?: metrics.HistogramOptions): metrics.Histogram {
const descriptor = createInstrumentDescriptor(name, InstrumentType.HISTOGRAM, options);
const storage = this._registerMetricStorage(descriptor);
const storage = this._meterSharedState.registerMetricStorage(descriptor);
return new Histogram(storage, descriptor);
}

Expand All @@ -53,7 +45,7 @@ export class Meter implements metrics.Meter {
*/
createCounter(name: string, options?: metrics.CounterOptions): metrics.Counter {
const descriptor = createInstrumentDescriptor(name, InstrumentType.COUNTER, options);
const storage = this._registerMetricStorage(descriptor);
const storage = this._meterSharedState.registerMetricStorage(descriptor);
return new Counter(storage, descriptor);
}

Expand All @@ -62,7 +54,7 @@ export class Meter implements metrics.Meter {
*/
createUpDownCounter(name: string, options?: metrics.UpDownCounterOptions): metrics.UpDownCounter {
const descriptor = createInstrumentDescriptor(name, InstrumentType.UP_DOWN_COUNTER, options);
const storage = this._registerMetricStorage(descriptor);
const storage = this._meterSharedState.registerMetricStorage(descriptor);
return new UpDownCounter(storage, descriptor);
}

Expand All @@ -75,7 +67,7 @@ export class Meter implements metrics.Meter {
options?: metrics.ObservableGaugeOptions,
): void {
const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_GAUGE, options);
this._registerAsyncMetricStorage(descriptor, callback);
this._meterSharedState.registerAsyncMetricStorage(descriptor, callback);
}

/**
Expand All @@ -87,7 +79,7 @@ export class Meter implements metrics.Meter {
options?: metrics.ObservableCounterOptions,
): void {
const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_COUNTER, options);
this._registerAsyncMetricStorage(descriptor, callback);
this._meterSharedState.registerAsyncMetricStorage(descriptor, callback);
}

/**
Expand All @@ -99,47 +91,6 @@ export class Meter implements metrics.Meter {
options?: metrics.ObservableUpDownCounterOptions,
): void {
const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, options);
this._registerAsyncMetricStorage(descriptor, callback);
}

private _registerMetricStorage(descriptor: InstrumentDescriptor): WritableMetricStorage {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
const storages = views.map(view => this._metricStorageRegistry.register(SyncMetricStorage.create(view, descriptor)))
.filter(isNotNullish);

if (storages.length === 1) {
return storages[0];
}

// This will be a no-op WritableMetricStorage when length is null.
return new MultiMetricStorage(storages);
}

private _registerAsyncMetricStorage(descriptor: InstrumentDescriptor, callback: metrics.ObservableCallback) {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
views.forEach(view => {
this._metricStorageRegistry.register(AsyncMetricStorage.create(view, descriptor, callback));
});
}

/**
* @internal
* @param collector opaque handle of {@link MetricCollector} which initiated the collection.
* @param collectionTime the HrTime at which the collection was initiated.
* @returns the list of {@link MetricData} collected.
*/
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<InstrumentationLibraryMetrics> {
const metricData = await Promise.all(this._metricStorageRegistry.getStorages().map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
this._meterProviderSharedState.sdkStartTime,
collectionTime);
}));

return {
instrumentationLibrary: this._instrumentationLibrary,
metrics: metricData.filter(isNotNullish),
};
this._meterSharedState.registerAsyncMetricStorage(descriptor, callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/

import { HrTime } from '@opentelemetry/api';
import { hrTime } from '@opentelemetry/core';
import { hrTime, InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { Meter } from '../Meter';
import { ViewRegistry } from '../view/ViewRegistry';
import { MeterSharedState } from './MeterSharedState';
import { MetricCollector } from './MetricCollector';

/**
Expand All @@ -30,7 +30,15 @@ export class MeterProviderSharedState {

metricCollectors: MetricCollector[] = [];

meters: Meter[] = [];
meterSharedStates: MeterSharedState[] = [];

constructor(public resource: Resource) {}

getMeterSharedState(instrumentationLibrary: InstrumentationLibrary) {
// TODO: meter identity
// /~https://github.com/open-telemetry/opentelemetry-specification/pull/2317
const meterSharedState = new MeterSharedState(this, instrumentationLibrary);
this.meterSharedStates.push(meterSharedState);
return meterSharedState;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { HrTime } from '@opentelemetry/api';
import * as metrics from '@opentelemetry/api-metrics-wip';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { InstrumentationLibraryMetrics } from '../export/MetricData';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import { isNotNullish } from '../utils';
import { AsyncMetricStorage } from './AsyncMetricStorage';
import { MeterProviderSharedState } from './MeterProviderSharedState';
import { MetricCollectorHandle } from './MetricCollector';
import { MetricStorageRegistry } from './MetricStorageRegistry';
import { MultiMetricStorage } from './MultiWritableMetricStorage';
import { SyncMetricStorage } from './SyncMetricStorage';

/**
* An internal record for shared meter provider states.
*/
export class MeterSharedState {
private _metricStorageRegistry = new MetricStorageRegistry();

constructor(private _meterProviderSharedState: MeterProviderSharedState, private _instrumentationLibrary: InstrumentationLibrary) {}

registerMetricStorage(descriptor: InstrumentDescriptor) {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
const storages = views
.map(view => {
const viewDescriptor = createInstrumentDescriptorWithView(view, descriptor);
const aggregator = view.aggregation.createAggregator(viewDescriptor);
const storage = new SyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor);
return this._metricStorageRegistry.register(storage);
})
.filter(isNotNullish);
if (storages.length === 1) {
return storages[0];
}
return new MultiMetricStorage(storages);
}

registerAsyncMetricStorage(descriptor: InstrumentDescriptor, callback: metrics.ObservableCallback) {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
views.forEach(view => {
const viewDescriptor = createInstrumentDescriptorWithView(view, descriptor);
const aggregator = view.aggregation.createAggregator(viewDescriptor);
const viewStorage = new AsyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor, callback);
this._metricStorageRegistry.register(viewStorage);
});
}

/**
* @param collector opaque handle of {@link MetricCollector} which initiated the collection.
* @param collectionTime the HrTime at which the collection was initiated.
* @returns the list of {@link MetricData} collected.
*/
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<InstrumentationLibraryMetrics> {
/**
* 1. Call all observable callbacks first.
* 2. Collect metric result for the collector.
*/
const metricDataList = await Promise.all(Array.from(this._metricStorageRegistry.getStorages())
.map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
this._meterProviderSharedState.sdkStartTime,
collectionTime);
})
.filter(isNotNullish));

return {
instrumentationLibrary: this._instrumentationLibrary,
metrics: metricDataList.filter(isNotNullish),
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ export class MetricCollector implements MetricProducer {

async collect(): Promise<ResourceMetrics> {
const collectionTime = hrTime();
const instrumentationLibraryMetrics = (await Promise.all(this._sharedState.meters
.map(meter => meter.collect(this, collectionTime))));
const instrumentationLibraryMetrics = (await Promise.all(this._sharedState.meterSharedStates
.map(meterSharedState => meterSharedState.collect(this, collectionTime))));

return {
resource: this._sharedState.resource,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { ExportResult, ExportResultCode } from '@opentelemetry/core';
import { AggregationTemporality, PushMetricExporter, ResourceMetrics } from '../../src';

export class TestMetricExporter implements PushMetricExporter {
resourceMetricsList: ResourceMetrics[] = [];
export(resourceMetrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): void {
this.resourceMetricsList.push(resourceMetrics);
process.nextTick(() => resultCallback({ code: ExportResultCode.SUCCESS }));
}

async forceFlush(): Promise<void> {}
async shutdown(): Promise<void> {}

getPreferredAggregationTemporality(): AggregationTemporality {
return AggregationTemporality.CUMULATIVE;
}
}

export class TestDeltaMetricExporter extends TestMetricExporter {
override getPreferredAggregationTemporality(): AggregationTemporality {
return AggregationTemporality.DELTA;
}
}
Loading

0 comments on commit e1fd1b8

Please sign in to comment.