Skip to content

Commit

Permalink
Unify duration parsing in admin api and ingress (#1547)
Browse files Browse the repository at this point in the history
* Unify duration parsing in admin api and ingress

* Add little test

* Add a few tests more, remove the duration milliseconds parsing

* Fix compilation error from other PR
  • Loading branch information
slinkydeveloper authored May 24, 2024
1 parent b07da55 commit c768db1
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 15 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cli/src/commands/state/clear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub async fn run_clear(State(env): State<CliEnv>, opts: &Clear) -> Result<()> {
}

async fn clear(env: &CliEnv, opts: &Clear) -> Result<()> {
let sql_client = crate::clients::DataFusionHttpClient::new(env)?;
let sql_client = crate::clients::DataFusionHttpClient::new(env).await?;

let (svc, key) = match opts.query.split_once('/') {
None => (opts.query.as_str(), None),
Expand Down
19 changes: 13 additions & 6 deletions crates/admin-rest-model/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;

// Export schema types to be used by other crates without exposing the fact
// that we are using proxying to restate-schema-api or restate-types
Expand Down Expand Up @@ -38,19 +39,25 @@ pub struct ModifyServiceRequest {
///
/// Modify the retention of idempotent requests for this service.
///
/// Can be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format.
#[serde(default, with = "serde_with::As::<Option<serde_with::DisplayFromStr>>")]
/// Can be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format or the ISO8601.
#[serde(
default,
with = "serde_with::As::<Option<restate_serde_util::DurationString>>"
)]
#[cfg_attr(feature = "schema", schemars(with = "Option<String>"))]
pub idempotency_retention: Option<humantime::Duration>,
pub idempotency_retention: Option<Duration>,

/// # Workflow completion retention
///
/// Modify the retention of the workflow completion. This can be modified only for workflow services!
///
/// Can be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format.
#[serde(default, with = "serde_with::As::<Option<serde_with::DisplayFromStr>>")]
/// Can be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format or the ISO8601.
#[serde(
default,
with = "serde_with::As::<Option<restate_serde_util::DurationString>>"
)]
#[cfg_attr(feature = "schema", schemars(with = "Option<String>"))]
pub workflow_completion_retention: Option<humantime::Duration>,
pub workflow_completion_retention: Option<Duration>,
}

#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
Expand Down
4 changes: 2 additions & 2 deletions crates/admin/src/rest_api/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ pub async fn modify_service<V>(
}
if let Some(new_idempotency_retention) = idempotency_retention {
modify_request.push(ModifyServiceChange::IdempotencyRetention(
new_idempotency_retention.into(),
new_idempotency_retention,
));
}
if let Some(new_workflow_completion_retention) = workflow_completion_retention {
modify_request.push(ModifyServiceChange::WorkflowCompletionRetention(
new_workflow_completion_retention.into(),
new_workflow_completion_retention,
));
}

Expand Down
2 changes: 1 addition & 1 deletion crates/ingress-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ options_schema = ["dep:schemars"]
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-ingress-dispatcher = { workspace = true }
restate-serde-util = { workspace = true }
restate-schema-api = { workspace = true, features = ["service", "invocation_target"]}
restate-types = { workspace = true }
restate-service-protocol = { workspace = true, features = [ "awakeable-id" ] }
Expand Down Expand Up @@ -60,7 +61,6 @@ schemars = { workspace = true, optional = true }
thiserror = { workspace = true }
urlencoding = "2.1"
pin-project-lite = "0.2.13"
iso8601 = "0.6.1"
humantime = { workspace = true }

[dev-dependencies]
Expand Down
46 changes: 42 additions & 4 deletions crates/ingress-http/src/handler/service_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use restate_types::invocation::{
Header, InvocationTarget, InvocationTargetType, ServiceInvocation, Source, SpanRelation,
WorkflowHandlerType,
};
use serde::Serialize;
use serde::de::IntoDeserializer;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::time::{Duration, Instant, SystemTime};
use tracing::{info, trace, warn, Instrument};

Expand Down Expand Up @@ -317,6 +319,11 @@ fn parse_headers(headers: HeaderMap) -> Result<Vec<Header>, HandlerError> {
.collect()
}

#[serde_as]
#[derive(Deserialize)]
#[serde(transparent)]
struct DurationQueryParam(#[serde_as(as = "restate_serde_util::DurationString")] Duration);

fn parse_delay(query: Option<&str>) -> Result<Option<Duration>, HandlerError> {
if query.is_none() {
return Ok(None);
Expand All @@ -325,9 +332,11 @@ fn parse_delay(query: Option<&str>) -> Result<Option<Duration>, HandlerError> {
for (k, v) in url::form_urlencoded::parse(query.unwrap().as_bytes()) {
if k.eq_ignore_ascii_case(DELAY_QUERY_PARAM) {
return Ok(Some(
iso8601::duration(v.as_ref())
.map_err(HandlerError::BadDelayDuration)?
.into(),
DurationQueryParam::deserialize(v.as_ref().into_deserializer())
.map_err(|e: serde::de::value::Error| {
HandlerError::BadDelayDuration(e.to_string())
})?
.0,
));
}
if k.eq_ignore_ascii_case(DELAYSEC_QUERY_PARAM) {
Expand All @@ -353,3 +362,32 @@ fn parse_idempotency(headers: &HeaderMap) -> Result<Option<ByteString>, HandlerE

Ok(Some(idempotency_key))
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn delay() {
assert_eq!(
parse_delay(Some("delay=PT60S")).unwrap().unwrap(),
Duration::from_secs(60),
);
assert_eq!(
parse_delay(Some("delay=60+sec")).unwrap().unwrap(),
Duration::from_secs(60),
);
assert_eq!(
parse_delay(Some("delay=60sec")).unwrap().unwrap(),
Duration::from_secs(60),
);
assert_eq!(
parse_delay(Some("delay=60ms")).unwrap().unwrap(),
Duration::from_millis(60),
);
assert_eq!(
parse_delay(Some("delay=60000ms")).unwrap().unwrap(),
Duration::from_millis(60000),
);
}
}
2 changes: 2 additions & 0 deletions crates/serde-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ proto = ["dep:prost", "dep:bytes"]
bytes = { workspace = true, optional = true }
bytesize = { version = "1.3.0" }
http = { workspace = true }
humantime = { workspace = true }
iso8601 = "0.6.1"
prost = { workspace = true, optional = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
Expand Down
107 changes: 107 additions & 0 deletions crates/serde-util/src/duration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use serde::de::Error;
use serde::{Deserialize, Deserializer, Serializer};
use serde_with::{DeserializeAs, SerializeAs};

/// Serializable/Deserializable duration to use with serde_with.
///
/// When serializing the humantime format is used.
///
/// When deserializing, the following formats are accepted:
///
/// * ISO8601 durations
/// * Humantime durations
pub struct DurationString;

impl<'de> DeserializeAs<'de, std::time::Duration> for DurationString {
fn deserialize_as<D>(deserializer: D) -> Result<std::time::Duration, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
if s.starts_with('P') {
Ok(iso8601::duration(&s).map_err(Error::custom)?.into())
} else {
humantime::parse_duration(&s).map_err(Error::custom)
}
}
}

impl SerializeAs<std::time::Duration> for DurationString {
fn serialize_as<S>(source: &std::time::Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.collect_str(&humantime::Duration::from(*source))
}
}

#[cfg(test)]
mod tests {
use super::*;

use serde::{Deserialize, Serialize};

use serde_with::serde_as;

#[serde_as]
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
struct MyDuration(#[serde_as(as = "DurationString")] std::time::Duration);

#[test]
fn serialize_humantime() {
let d = std::time::Duration::from_secs(60 * 23);

let result_string =
serde_json::from_str::<String>(&serde_json::to_string(&MyDuration(d)).unwrap())
.unwrap();

assert_eq!(result_string, humantime::Duration::from(d).to_string());
}

#[test]
fn deserialize_iso8601() {
let d = std::time::Duration::from_secs(10);

assert_eq!(
serde_json::from_value::<MyDuration>(serde_json::Value::String("PT10S".to_owned()))
.unwrap()
.0,
d
);
}

#[test]
fn deserialize_humantime() {
let d = std::time::Duration::from_secs(60 * 23);

assert_eq!(
serde_json::from_value::<MyDuration>(serde_json::Value::String(
humantime::Duration::from(d).to_string()
))
.unwrap()
.0,
d
);
}
}
2 changes: 2 additions & 0 deletions crates/serde-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ mod header_map;
mod proto;

pub mod default;
mod duration;
pub mod header_value;

pub use byte_count::*;
pub use duration::DurationString;
pub use header_map::SerdeableHeaderHashMap;
pub use header_value::HeaderValueSerde;
#[cfg(feature = "proto")]
Expand Down

0 comments on commit c768db1

Please sign in to comment.