Skip to content

Commit

Permalink
Sepatate Aggregate measure and collect functions
Browse files Browse the repository at this point in the history
  • Loading branch information
fraillt committed Jan 1, 2025
1 parent 42b4f2f commit 4813ad3
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 167 deletions.
94 changes: 40 additions & 54 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,6 @@ pub(crate) trait Measure<T>: Send + Sync + 'static {
fn call(&self, measurement: T, attrs: &[KeyValue]);
}

impl<F, T> Measure<T> for F
where
F: Fn(T, &[KeyValue]) + Send + Sync + 'static,
{
fn call(&self, measurement: T, attrs: &[KeyValue]) {
self(measurement, attrs)
}
}

/// Stores the aggregate of measurements into the aggregation and returns the number
/// of aggregate data-points output.
pub(crate) trait ComputeAggregation: Send + Sync + 'static {
Expand All @@ -47,15 +38,23 @@ pub(crate) trait ComputeAggregation: Send + Sync + 'static {
fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>);
}

impl<T> ComputeAggregation for T
/// Separate `measure` and `collect` functions for an aggregate.
pub(crate) struct AggregateFns<T> {
pub(crate) measure: Arc<dyn Measure<T>>,
pub(crate) collect: Arc<dyn ComputeAggregation>,
}

/// Creates aggregate functions out of aggregate instance
impl<A, T> From<A> for AggregateFns<T>
where
T: Fn(Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>)
+ Send
+ Sync
+ 'static,
A: Measure<T> + ComputeAggregation,
{
fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
self(dest)
fn from(value: A) -> Self {
let inst = Arc::new(value);
Self {
measure: inst.clone(),
collect: inst,
}
}
}

Expand Down Expand Up @@ -144,30 +143,18 @@ impl<T: Number> AggregateBuilder<T> {
}

/// Builds a last-value aggregate function input and output.
pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) {
let lv = Arc::new(LastValue::new(self.temporality, self.filter.clone()));
(lv.clone(), lv)
pub(crate) fn last_value(&self) -> AggregateFns<T> {
LastValue::new(self.temporality, self.filter.clone()).into()
}

/// Builds a precomputed sum aggregate function input and output.
pub(crate) fn precomputed_sum(
&self,
monotonic: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let s = Arc::new(PrecomputedSum::new(
self.temporality,
self.filter.clone(),
monotonic,
));

(s.clone(), s)
pub(crate) fn precomputed_sum(&self, monotonic: bool) -> AggregateFns<T> {
PrecomputedSum::new(self.temporality, self.filter.clone(), monotonic).into()
}

/// Builds a sum aggregate function input and output.
pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure<T>, impl ComputeAggregation) {
let s = Arc::new(Sum::new(self.temporality, self.filter.clone(), monotonic));

(s.clone(), s)
pub(crate) fn sum(&self, monotonic: bool) -> AggregateFns<T> {
Sum::new(self.temporality, self.filter.clone(), monotonic).into()
}

/// Builds a histogram aggregate function input and output.
Expand All @@ -176,16 +163,15 @@ impl<T: Number> AggregateBuilder<T> {
boundaries: Vec<f64>,
record_min_max: bool,
record_sum: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let h = Arc::new(Histogram::new(
) -> AggregateFns<T> {
Histogram::new(
self.temporality,
self.filter.clone(),
boundaries,
record_min_max,
record_sum,
));

(h.clone(), h)
)
.into()
}

/// Builds an exponential histogram aggregate function input and output.
Expand All @@ -195,17 +181,16 @@ impl<T: Number> AggregateBuilder<T> {
max_scale: i8,
record_min_max: bool,
record_sum: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let h = Arc::new(ExpoHistogram::new(
) -> AggregateFns<T> {
ExpoHistogram::new(
self.temporality,
self.filter.clone(),
max_size,
max_scale,
record_min_max,
record_sum,
));

(h.clone(), h)
)
.into()
}
}

Expand All @@ -221,7 +206,7 @@ mod tests {

#[test]
fn last_value_aggregation() {
let (measure, agg) =
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value();
let mut a = Gauge {
data_points: vec![GaugeDataPoint {
Expand All @@ -235,7 +220,7 @@ mod tests {
let new_attributes = [KeyValue::new("b", 2)];
measure.call(2, &new_attributes[..]);

let (count, new_agg) = agg.call(Some(&mut a));
let (count, new_agg) = collect.call(Some(&mut a));

assert_eq!(count, 1);
assert!(new_agg.is_none());
Expand All @@ -247,7 +232,7 @@ mod tests {
#[test]
fn precomputed_sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) =
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(temporality, None).precomputed_sum(true);
let mut a = Sum {
data_points: vec![
Expand All @@ -274,7 +259,7 @@ mod tests {
let new_attributes = [KeyValue::new("b", 2)];
measure.call(3, &new_attributes[..]);

let (count, new_agg) = agg.call(Some(&mut a));
let (count, new_agg) = collect.call(Some(&mut a));

assert_eq!(count, 1);
assert!(new_agg.is_none());
Expand All @@ -289,7 +274,8 @@ mod tests {
#[test]
fn sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None).sum(true);
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(temporality, None).sum(true);
let mut a = Sum {
data_points: vec![
SumDataPoint {
Expand All @@ -315,7 +301,7 @@ mod tests {
let new_attributes = [KeyValue::new("b", 2)];
measure.call(3, &new_attributes[..]);

let (count, new_agg) = agg.call(Some(&mut a));
let (count, new_agg) = collect.call(Some(&mut a));

assert_eq!(count, 1);
assert!(new_agg.is_none());
Expand All @@ -330,7 +316,7 @@ mod tests {
#[test]
fn explicit_bucket_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None)
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
.explicit_bucket_histogram(vec![1.0], true, true);
let mut a = Histogram {
data_points: vec![HistogramDataPoint {
Expand All @@ -354,7 +340,7 @@ mod tests {
let new_attributes = [KeyValue::new("b", 2)];
measure.call(3, &new_attributes[..]);

let (count, new_agg) = agg.call(Some(&mut a));
let (count, new_agg) = collect.call(Some(&mut a));

assert_eq!(count, 1);
assert!(new_agg.is_none());
Expand All @@ -373,7 +359,7 @@ mod tests {
#[test]
fn exponential_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None)
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
.exponential_bucket_histogram(4, 20, true, true);
let mut a = ExponentialHistogram {
data_points: vec![ExponentialHistogramDataPoint {
Expand Down Expand Up @@ -406,7 +392,7 @@ mod tests {
let new_attributes = [KeyValue::new("b", 2)];
measure.call(3, &new_attributes[..]);

let (count, new_agg) = agg.call(Some(&mut a));
let (count, new_agg) = collect.call(Some(&mut a));

assert_eq!(count, 1);
assert!(new_agg.is_none());
Expand Down
Loading

0 comments on commit 4813ad3

Please sign in to comment.