Skip to content

Commit

Permalink
Record nanos instead of millis for bifrost record creation time
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed May 24, 2024
1 parent c768db1 commit a21f9a0
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 13 deletions.
16 changes: 3 additions & 13 deletions crates/types/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};

use crate::flexbuffers_storage_encode_decode;
use crate::identifiers::PartitionId;
use crate::time::MillisSinceEpoch;
use crate::time::NanosSinceEpoch;

pub mod metadata;

Expand Down Expand Up @@ -111,17 +111,13 @@ where

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Header {
created_at: MillisSinceEpoch,
// additional custom headers can be added here. Those should be somewhat
// generic and values must be optional.
pub custom_data_1: Option<u64>,
pub created_at: NanosSinceEpoch,
}

impl Default for Header {
fn default() -> Self {
Self {
created_at: MillisSinceEpoch::now(),
custom_data_1: None,
created_at: NanosSinceEpoch::now(),
}
}
}
Expand Down Expand Up @@ -149,12 +145,6 @@ impl Payload {
}
}

/// Sets the custom data 1 field on the record header
pub fn with_custom_data_1(mut self, value: u64) -> Self {
self.header.custom_data_1 = Some(value);
self
}

pub fn body(&self) -> &Bytes {
&self.body
}
Expand Down
60 changes: 60 additions & 0 deletions crates/types/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,58 @@ impl From<MillisSinceEpoch> for SystemTime {
}
}

/// Nanos since the unix epoch. Used internally to get rough latency measurements across nodes.
/// It's vulnerable to clock skews and sync issues, so use with care. That said, it's fairly
/// accurate when used on the same node. This roughly maps to std::time::Instant except that the
/// value is portable across nodes.
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
#[serde(transparent)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct NanosSinceEpoch(u64);

impl NanosSinceEpoch {
pub fn now() -> Self {
SystemTime::now().into()
}

pub fn as_u64(&self) -> u64 {
self.0
}

pub fn elapsed(&self) -> Duration {
let now = Self::now();
Duration::from_nanos(now.0 - self.0)
}
}

impl Default for NanosSinceEpoch {
fn default() -> Self {
Self::now()
}
}

impl From<u64> for NanosSinceEpoch {
fn from(value: u64) -> Self {
Self(value)
}
}

impl From<SystemTime> for NanosSinceEpoch {
fn from(value: SystemTime) -> Self {
Self(
u64::try_from(
value
.duration_since(SystemTime::UNIX_EPOCH)
.expect("duration since Unix epoch should be well-defined")
.as_nanos(),
)
.expect("nanos since Unix epoch should fit in u64"),
)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -81,4 +133,12 @@ mod tests {
let t: SystemTime = MillisSinceEpoch::new(u64::MAX).into();
println!("{:?}", t);
}

#[test]
fn nanos_should_not_overflow() {
// it's ~580 years from unix epoch until u64 wouldn't become sufficient to store nanos.
let t = NanosSinceEpoch::now().as_u64();
assert!(t < u64::MAX);
println!("{:?}", t);
}
}

0 comments on commit a21f9a0

Please sign in to comment.