-
Notifications
You must be signed in to change notification settings - Fork 834
/
Copy pathprometheus.ts
320 lines (289 loc) · 10.3 KB
/
prometheus.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
/*!
* Copyright 2019, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
ExportResult,
NoopLogger,
hrTimeToMilliseconds,
} from '@opentelemetry/core';
import {
CounterSumAggregator,
LastValue,
MetricExporter,
MetricRecord,
MetricDescriptor,
MetricKind,
ObserverAggregator,
Sum,
} from '@opentelemetry/metrics';
import * as api from '@opentelemetry/api';
import { createServer, IncomingMessage, Server, ServerResponse } from 'http';
import { Counter, Gauge, labelValues, Metric, Registry } from 'prom-client';
import * as url from 'url';
import { ExporterConfig } from './export/types';
export class PrometheusExporter implements MetricExporter {
static readonly DEFAULT_OPTIONS = {
port: 9464,
startServer: false,
endpoint: '/metrics',
prefix: '',
};
private readonly _registry = new Registry();
private readonly _logger: api.Logger;
private readonly _port: number;
private readonly _endpoint: string;
private readonly _server: Server;
private readonly _prefix?: string;
private readonly _invalidCharacterRegex = /[^a-z0-9_]/gi;
// This will be required when histogram is implemented. Leaving here so it is not forgotten
// Histogram cannot have a label named 'le'
// private static readonly RESERVED_HISTOGRAM_LABEL = 'le';
/**
* Constructor
* @param config Exporter configuration
* @param callback Callback to be called after a server was started
*/
constructor(config: ExporterConfig = {}, callback?: () => void) {
this._logger = config.logger || new NoopLogger();
this._port = config.port || PrometheusExporter.DEFAULT_OPTIONS.port;
this._prefix = config.prefix || PrometheusExporter.DEFAULT_OPTIONS.prefix;
this._server = createServer(this._requestHandler);
this._endpoint = (
config.endpoint || PrometheusExporter.DEFAULT_OPTIONS.endpoint
).replace(/^([^/])/, '/$1');
if (config.startServer || PrometheusExporter.DEFAULT_OPTIONS.startServer) {
this.startServer(callback);
} else if (callback) {
callback();
}
}
/**
* Saves the current values of all exported {@link MetricRecord}s so that
* they can be pulled by the Prometheus backend.
*
* In its current state, the exporter saves the current values of all metrics
* when export is called and returns them when the export endpoint is called.
* In the future, this should be a no-op and the exporter should reach into
* the metrics when the export endpoint is called. As there is currently no
* interface to do this, this is our only option.
*
* @param records Metrics to be sent to the prometheus backend
* @param cb result callback to be called on finish
*/
export(records: MetricRecord[], cb: (result: ExportResult) => void) {
if (!this._server) {
// It is conceivable that the _server may not be started as it is an async startup
// However unlikely, if this happens the caller may retry the export
cb(ExportResult.FAILED_RETRYABLE);
return;
}
this._logger.debug('Prometheus exporter export');
for (const record of records) {
this._updateMetric(record);
}
cb(ExportResult.SUCCESS);
}
/**
* Shuts down the export server and clears the registry
*
* @param cb called when server is stopped
*/
shutdown(cb?: () => void) {
this._registry.clear();
this.stopServer(cb);
}
/**
* Updates the value of a single metric in the registry
*
* @param record Metric value to be saved
*/
private _updateMetric(record: MetricRecord) {
const metric = this._registerMetric(record);
if (!metric) return;
const labelValues = this._getLabelValues(
record.descriptor.labelKeys,
record.labels
);
const point = record.aggregator.toPoint();
if (metric instanceof Counter) {
// Prometheus counter saves internal state and increments by given value.
// MetricRecord value is the current state, not the delta to be incremented by.
// Currently, _registerMetric creates a new counter every time the value changes,
// so the increment here behaves as a set value (increment from 0)
metric.inc(
labelValues,
point.value as Sum,
hrTimeToMilliseconds(point.timestamp)
);
}
if (metric instanceof Gauge) {
if (record.aggregator instanceof CounterSumAggregator) {
metric.set(labelValues, point.value as Sum);
} else if (record.aggregator instanceof ObserverAggregator) {
metric.set(
labelValues,
point.value as LastValue,
hrTimeToMilliseconds(point.timestamp)
);
}
}
// TODO: only counter and gauge are implemented in metrics so far
}
private _getLabelValues(keys: string[], labels: api.Labels) {
const labelValues: labelValues = {};
for (let i = 0; i < keys.length; i++) {
if (labels[keys[i]] !== null) {
labelValues[keys[i]] = labels[keys[i]];
}
}
return labelValues;
}
private _registerMetric(record: MetricRecord): Metric | undefined {
const metricName = this._getPrometheusMetricName(record.descriptor);
const metric = this._registry.getSingleMetric(metricName);
/**
* Prometheus library does aggregation, which means its inc method must be called with
* the value to be incremented by. It does not have a set method. As our MetricRecord
* contains the current value, not the value to be incremented by, we destroy and
* recreate counters when they are updated.
*
* This works because counters are identified by their name and no other internal ID
* https://prometheus.io/docs/instrumenting/exposition_formats/
*/
if (metric instanceof Counter) {
metric.remove(
...record.descriptor.labelKeys.map(k => record.labels[k].toString())
);
}
if (metric) return metric;
return this._newMetric(record, metricName);
}
private _newMetric(record: MetricRecord, name: string): Metric | undefined {
const metricObject = {
name,
// prom-client throws with empty description which is our default
help: record.descriptor.description || 'description missing',
labelNames: record.descriptor.labelKeys,
// list of registries to register the newly created metric
registers: [this._registry],
};
switch (record.descriptor.metricKind) {
case MetricKind.COUNTER:
// there is no such thing as a non-monotonic counter in prometheus
return record.descriptor.monotonic
? new Counter(metricObject)
: new Gauge(metricObject);
case MetricKind.OBSERVER:
return new Gauge(metricObject);
default:
// Other metric types are currently unimplemented
return undefined;
}
}
private _getPrometheusMetricName(descriptor: MetricDescriptor): string {
return this._sanitizePrometheusMetricName(
this._prefix ? `${this._prefix}_${descriptor.name}` : descriptor.name
);
}
/**
* Ensures metric names are valid Prometheus metric names by removing
* characters allowed by OpenTelemetry but disallowed by Prometheus.
*
* https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
*
* 1. Names must match `[a-zA-Z_:][a-zA-Z0-9_:]*`
*
* 2. Colons are reserved for user defined recording rules.
* They should not be used by exporters or direct instrumentation.
*
* OpenTelemetry metric names are already validated in the Meter when they are created,
* and they match the format `[a-zA-Z][a-zA-Z0-9_.\-]*` which is very close to a valid
* prometheus metric name, so we only need to strip characters valid in OpenTelemetry
* but not valid in prometheus and replace them with '_'.
*
* @param name name to be sanitized
*/
private _sanitizePrometheusMetricName(name: string): string {
return name.replace(this._invalidCharacterRegex, '_'); // replace all invalid characters with '_'
}
/**
* Stops the Prometheus export server
* @param callback A callback that will be executed once the server is stopped
*/
stopServer(callback?: () => void) {
if (!this._server) {
this._logger.debug(
`Prometheus stopServer() was called but server was never started.`
);
if (callback) {
callback();
}
} else {
this._server.close(() => {
this._logger.debug(`Prometheus exporter was stopped`);
if (callback) {
callback();
}
});
}
}
/**
* Starts the Prometheus export server
*
* @param callback called once the server is ready
*/
startServer(callback?: () => void) {
this._server.listen(this._port, () => {
this._logger.debug(
`Prometheus exporter started on port ${this._port} at endpoint ${this._endpoint}`
);
if (callback) {
callback();
}
});
}
/**
* Request handler used by http library to respond to incoming requests
* for the current state of metrics by the Prometheus backend.
*
* @param request Incoming HTTP request to export server
* @param response HTTP response object used to respond to request
*/
private _requestHandler = (
request: IncomingMessage,
response: ServerResponse
) => {
if (url.parse(request.url!).pathname === this._endpoint) {
this._exportMetrics(response);
} else {
this._notFound(response);
}
};
/**
* Responds to incoming message with current state of all metrics.
*/
private _exportMetrics = (response: ServerResponse) => {
response.statusCode = 200;
response.setHeader('content-type', this._registry.contentType);
response.end(this._registry.metrics() || '# no registered metrics');
};
/**
* Responds with 404 status code to all requests that do not match the configured endpoint.
*/
private _notFound = (response: ServerResponse) => {
response.statusCode = 404;
response.end();
};
}