From ab3c57e5618cad637265772071032a67aac60bc9 Mon Sep 17 00:00:00 2001 From: Francesco Guardiani Date: Wed, 15 May 2024 22:14:46 +0200 Subject: [PATCH] Add Workflow service type and handler type (#1506) * Adds the new service type and handler type, plus reorganizes a bit the internal representation of service types * Implements the run-once semantics of workflow methods * Allows the user to tweak the completion_retention_time of the workflow from the admin api --- cli/src/ui/service_handlers.rs | 3 +- crates/admin/src/rest_api/services.rs | 6 + crates/admin/src/schema_registry/error.rs | 10 +- crates/admin/src/schema_registry/mod.rs | 3 +- crates/admin/src/schema_registry/updater.rs | 86 +++-- crates/ingress-dispatcher/src/dispatcher.rs | 16 +- crates/ingress-dispatcher/src/lib.rs | 21 +- crates/ingress-http/src/handler/error.rs | 7 +- .../ingress-http/src/handler/path_parsing.rs | 16 +- .../src/handler/service_handler.rs | 56 ++-- crates/ingress-http/src/handler/tests.rs | 23 +- crates/ingress-http/src/lib.rs | 13 +- crates/invoker-api/src/entry_enricher.rs | 2 + crates/meta-rest-model/src/services.rs | 10 + .../partition-store/tests/integration_test.rs | 25 +- .../tests/invocation_status_table_test/mod.rs | 8 +- .../tests/journal_table_test/mod.rs | 10 +- crates/schema-api/src/invocation_target.rs | 30 +- crates/schema-api/src/lib.rs | 53 ++- crates/schema/src/service.rs | 4 +- crates/service-protocol/src/codec.rs | 8 +- .../proto/dev/restate/storage/v1/domain.proto | 42 +-- .../src/invocation_status_table/mod.rs | 34 +- crates/storage-api/src/storage.rs | 239 +++++++------- .../src/invocation_status/row.rs | 1 + .../src/journal/tests.rs | 1 + .../src/service/row.rs | 1 + crates/types/src/identifiers.rs | 4 +- crates/types/src/invocation.rs | 187 +++++++---- crates/types/src/journal/enriched.rs | 2 + crates/worker/src/invoker_integration.rs | 84 ++--- .../state_machine/command_interpreter/mod.rs | 231 +++++++------ .../command_interpreter/tests.rs | 6 +- .../state_machine/effect_interpreter.rs | 18 +- .../src/partition/state_machine/effects.rs | 14 + .../worker/src/partition/state_machine/mod.rs | 305 +++++++++++++++--- 36 files changed, 1048 insertions(+), 531 deletions(-) diff --git a/cli/src/ui/service_handlers.rs b/cli/src/ui/service_handlers.rs index 966e6e9c3..c9b16f79b 100644 --- a/cli/src/ui/service_handlers.rs +++ b/cli/src/ui/service_handlers.rs @@ -78,7 +78,8 @@ pub fn create_service_handlers_table_diff( pub fn icon_for_service_type(svc_type: &ServiceType) -> Icon { match svc_type { ServiceType::Service => Icon("", ""), - ServiceType::VirtualObject => Icon("⬅️ 🚶🚶🚶", "keyed"), + ServiceType::VirtualObject => Icon("⬅️ 🚶🚶🚶", "virtual object"), + ServiceType::Workflow => Icon("📝", "workflow"), } } diff --git a/crates/admin/src/rest_api/services.rs b/crates/admin/src/rest_api/services.rs index 99877fc5a..4b250817e 100644 --- a/crates/admin/src/rest_api/services.rs +++ b/crates/admin/src/rest_api/services.rs @@ -87,6 +87,7 @@ pub async fn modify_service( #[request_body(required = true)] Json(ModifyServiceRequest { public, idempotency_retention, + workflow_completion_retention, }): Json, ) -> Result, MetaApiError> { let mut modify_request = vec![]; @@ -98,6 +99,11 @@ pub async fn modify_service( new_idempotency_retention.into(), )); } + if let Some(new_workflow_completion_retention) = workflow_completion_retention { + modify_request.push(ModifyServiceChange::WorkflowCompletionRetention( + new_workflow_completion_retention.into(), + )); + } if modify_request.is_empty() { // No need to do anything diff --git a/crates/admin/src/schema_registry/error.rs b/crates/admin/src/schema_registry/error.rs index a4a46c3b3..708e5a49f 100644 --- a/crates/admin/src/schema_registry/error.rs +++ b/crates/admin/src/schema_registry/error.rs @@ -14,8 +14,10 @@ use http::Uri; use restate_core::metadata_store::ReadModifyWriteError; use restate_core::ShutdownError; use restate_schema_api::invocation_target::BadInputContentType; +use restate_service_protocol::discovery::schema; use restate_types::errors::GenericError; use restate_types::identifiers::DeploymentId; +use restate_types::invocation::ServiceType; #[derive(Debug, thiserror::Error, codederror::CodedError)] pub enum SchemaRegistryError { @@ -87,6 +89,12 @@ pub enum ServiceError { #[error("the handler '{0}' output content-type is not valid: {1}")] #[code(unknown)] BadOutputContentType(String, InvalidHeaderValue), + #[error("invalid combination of service type and handler type '({0}, {1:?})'")] + #[code(unknown)] + BadServiceAndHandlerType(ServiceType, Option), + #[error("modifying retention time for service type {0} is unsupported")] + #[code(unknown)] + CannotModifyRetentionTime(ServiceType), } #[derive(Debug, thiserror::Error, codederror::CodedError)] @@ -107,7 +115,7 @@ pub enum SubscriptionError { InvalidServiceSinkAuthority(Uri), #[error("invalid sink URI '{0}': cannot find service/handler specified in the sink URI.")] SinkServiceNotFound(Uri), - #[error("invalid sink URI '{0}': virtual object shared handlers cannot be used as sinks.")] + #[error("invalid sink URI '{0}': shared handlers cannot be used as sinks.")] InvalidSinkSharedHandler(Uri), #[error(transparent)] diff --git a/crates/admin/src/schema_registry/mod.rs b/crates/admin/src/schema_registry/mod.rs index 9852d577e..e6637d90b 100644 --- a/crates/admin/src/schema_registry/mod.rs +++ b/crates/admin/src/schema_registry/mod.rs @@ -65,6 +65,7 @@ impl ApplyMode { pub enum ModifyServiceChange { Public(bool), IdempotencyRetention(Duration), + WorkflowCompletionRetention(Duration), } /// Responsible for updating the registered schema information. This includes the discovery of @@ -213,7 +214,7 @@ impl SchemaRegistry { .is_some() { let mut updater = SchemaUpdater::from(schema_information); - updater.modify_service(service_name.clone(), changes.clone()); + updater.modify_service(service_name.clone(), changes.clone())?; Ok(updater.into_inner()) } else { Err(SchemaError::NotFound(format!( diff --git a/crates/admin/src/schema_registry/updater.rs b/crates/admin/src/schema_registry/updater.rs index dc76e57c3..ce64cda03 100644 --- a/crates/admin/src/schema_registry/updater.rs +++ b/crates/admin/src/schema_registry/updater.rs @@ -19,14 +19,16 @@ use restate_schema::Schema; use restate_schema_api::deployment::DeploymentMetadata; use restate_schema_api::invocation_target::{ InputRules, InputValidationRule, InvocationTargetMetadata, OutputContentTypeRule, OutputRules, - DEFAULT_IDEMPOTENCY_RETENTION, + DEFAULT_IDEMPOTENCY_RETENTION, DEFAULT_WORKFLOW_COMPLETION_RETENTION, }; use restate_schema_api::subscription::{ EventReceiverServiceType, Sink, Source, Subscription, SubscriptionValidator, }; use restate_service_protocol::discovery::schema; use restate_types::identifiers::{DeploymentId, SubscriptionId}; -use restate_types::invocation::{HandlerType, ServiceType}; +use restate_types::invocation::{ + InvocationTargetType, ServiceType, VirtualObjectHandlerType, WorkflowHandlerType, +}; use serde::{Deserialize, Serialize}; use std::collections::hash_map::Entry; use std::collections::HashMap; @@ -127,7 +129,6 @@ impl SchemaUpdater { for (service_name, service) in proposed_services { let service_type = ServiceType::from(service.ty); let handlers = DiscoveredHandlerMetadata::compute_handlers( - service_type, service .handlers .into_iter() @@ -201,6 +202,11 @@ impl SchemaUpdater { public: true, }, idempotency_retention: DEFAULT_IDEMPOTENCY_RETENTION, + workflow_completion_retention: if service_type == ServiceType::Workflow { + Some(DEFAULT_WORKFLOW_COMPLETION_RETENTION) + } else { + None + }, } }; @@ -329,16 +335,19 @@ impl SchemaUpdater { )) })?; - let ty = match (service_schemas.ty, handler_schemas.target_meta.handler_ty) { - (ServiceType::VirtualObject, HandlerType::Exclusive) => { + let ty = match handler_schemas.target_meta.target_ty { + InvocationTargetType::Workflow(WorkflowHandlerType::Workflow) => { + EventReceiverServiceType::Workflow + } + InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive) => { EventReceiverServiceType::VirtualObject } - (ServiceType::VirtualObject, HandlerType::Shared) => { + InvocationTargetType::Workflow(_) | InvocationTargetType::VirtualObject(_) => { return Err(SchemaError::Subscription( SubscriptionError::InvalidSinkSharedHandler(sink), )) } - (ServiceType::Service, _) => EventReceiverServiceType::Service, + InvocationTargetType::Service => EventReceiverServiceType::Service, }; Sink::Service { @@ -382,7 +391,11 @@ impl SchemaUpdater { } } - pub fn modify_service(&mut self, name: String, changes: Vec) { + pub fn modify_service( + &mut self, + name: String, + changes: Vec, + ) -> Result<(), SchemaError> { if let Some(schemas) = self.schema_information.services.get_mut(&name) { for command in changes { match command { @@ -398,18 +411,38 @@ impl SchemaUpdater { h.target_meta.idempotency_retention = new_idempotency_retention; } } + ModifyServiceChange::WorkflowCompletionRetention( + new_workflow_completion_retention, + ) => { + if schemas.ty != ServiceType::Workflow { + return Err(SchemaError::Service( + ServiceError::CannotModifyRetentionTime(schemas.ty), + )); + } + schemas.workflow_completion_retention = + Some(new_workflow_completion_retention); + for h in schemas.handlers.values_mut().filter(|w| { + w.target_meta.target_ty + == InvocationTargetType::Workflow(WorkflowHandlerType::Workflow) + }) { + h.target_meta.completion_retention = + Some(new_workflow_completion_retention); + } + } } } } self.modified = true; + + Ok(()) } } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] struct DiscoveredHandlerMetadata { name: String, - ty: HandlerType, + ty: InvocationTargetType, input: InputRules, output: OutputRules, } @@ -419,15 +452,27 @@ impl DiscoveredHandlerMetadata { service_type: ServiceType, handler: schema::Handler, ) -> Result { - let handler_type = match handler.ty { - None => HandlerType::default_for_service_type(service_type), - Some(schema::HandlerType::Exclusive) => HandlerType::Exclusive, - Some(schema::HandlerType::Shared) => HandlerType::Shared, + let ty = match (service_type, handler.ty) { + (ServiceType::Service, None | Some(schema::HandlerType::Shared)) => { + InvocationTargetType::Service + } + (ServiceType::VirtualObject, None | Some(schema::HandlerType::Exclusive)) => { + InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive) + } + (ServiceType::VirtualObject, Some(schema::HandlerType::Shared)) => { + InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Shared) + } + _ => { + return Err(ServiceError::BadServiceAndHandlerType( + service_type, + handler.ty, + )) + } }; Ok(Self { name: handler.name.to_string(), - ty: handler_type, + ty, input: handler .input .map(|s| DiscoveredHandlerMetadata::input_rules_from_schema(&handler.name, s)) @@ -495,7 +540,6 @@ impl DiscoveredHandlerMetadata { } fn compute_handlers( - service_ty: ServiceType, handlers: Vec, ) -> HashMap { handlers @@ -507,8 +551,14 @@ impl DiscoveredHandlerMetadata { target_meta: InvocationTargetMetadata { public: true, idempotency_retention: DEFAULT_IDEMPOTENCY_RETENTION, - service_ty, - handler_ty: handler.ty, + completion_retention: if handler.ty + == InvocationTargetType::Workflow(WorkflowHandlerType::Workflow) + { + Some(DEFAULT_WORKFLOW_COMPLETION_RETENTION) + } else { + None + }, + target_ty: handler.ty, input_rules: handler.input, output_rules: handler.output, }, @@ -662,7 +712,7 @@ mod tests { updater.modify_service( GREETER_SERVICE_NAME.to_owned(), vec![ModifyServiceChange::Public(false)], - ); + )?; let schemas = updater.into_inner(); assert!(version_before_modification < schemas.version()); diff --git a/crates/ingress-dispatcher/src/dispatcher.rs b/crates/ingress-dispatcher/src/dispatcher.rs index f2b44be30..75a7a2f9b 100644 --- a/crates/ingress-dispatcher/src/dispatcher.rs +++ b/crates/ingress-dispatcher/src/dispatcher.rs @@ -211,7 +211,7 @@ mod tests { use restate_test_util::{let_assert, matchers::*}; use restate_types::identifiers::{IdempotencyId, InvocationId, WithPartitionKey}; use restate_types::invocation::{ - HandlerType, Idempotency, InvocationTarget, ResponseResult, ServiceInvocation, + InvocationTarget, ResponseResult, ServiceInvocation, VirtualObjectHandlerType, }; use restate_types::logs::{LogId, Lsn, SequenceNumber}; use restate_types::partition_table::{FindPartition, FixedPartitionTable}; @@ -245,7 +245,7 @@ mod tests { "MySvc", "MyKey", "pippo", - HandlerType::Exclusive, + VirtualObjectHandlerType::Exclusive, ); let argument = Bytes::from_static(b"nbfjksdfs"); let idempotency_key = ByteString::from_static("123"); @@ -265,10 +265,8 @@ mod tests { restate_types::invocation::Source::Ingress, ); invocation.argument = argument.clone(); - invocation.idempotency = Some(Idempotency { - key: idempotency_key.clone(), - retention: Duration::from_secs(60), - }); + invocation.idempotency_key = Some(idempotency_key.clone()); + invocation.completion_retention_time = Some(Duration::from_secs(60)); let (ingress_req, _, res) = IngressDispatcherRequest::invocation(invocation); dispatcher.dispatch_ingress_request(ingress_req).await?; @@ -296,10 +294,8 @@ mod tests { invocation_id: eq(invocation_id), invocation_target: eq(invocation_target.clone()), argument: eq(argument.clone()), - idempotency: some(eq(Idempotency { - key: idempotency_key.clone(), - retention: Duration::from_secs(60), - })) + idempotency_key: some(eq(idempotency_key.clone())), + completion_retention_time: some(eq(Duration::from_secs(60))) }) ); diff --git a/crates/ingress-dispatcher/src/lib.rs b/crates/ingress-dispatcher/src/lib.rs index 2db9fb2c3..7891cb81b 100644 --- a/crates/ingress-dispatcher/src/lib.rs +++ b/crates/ingress-dispatcher/src/lib.rs @@ -9,14 +9,15 @@ // by the Apache License, Version 2.0. use bytes::Bytes; +use bytestring::ByteString; use restate_core::metadata; use restate_schema_api::subscription::{EventReceiverServiceType, Sink, Subscription}; use restate_types::identifiers::{ partitioner, IdempotencyId, InvocationId, PartitionKey, WithPartitionKey, }; use restate_types::invocation::{ - HandlerType, Idempotency, InvocationResponse, InvocationTarget, ResponseResult, - ServiceInvocation, ServiceInvocationResponseSink, SpanRelation, + InvocationResponse, InvocationTarget, ResponseResult, ServiceInvocation, + ServiceInvocationResponseSink, SpanRelation, VirtualObjectHandlerType, WorkflowHandlerType, }; use restate_types::message::MessageIndex; use std::fmt::Display; @@ -102,7 +103,7 @@ impl IngressDispatcherRequest { let correlation_id = ingress_correlation_id( &service_invocation.invocation_id, &service_invocation.invocation_target, - service_invocation.idempotency.as_ref(), + service_invocation.idempotency_key.as_ref(), ); let my_node_id = metadata().my_node_id(); @@ -168,7 +169,15 @@ impl IngressDispatcherRequest { .map_err(|e| anyhow::anyhow!("The key must be valid UTF-8: {e}"))? .to_owned(), &**handler, - HandlerType::Exclusive, + VirtualObjectHandlerType::Exclusive, + ), + EventReceiverServiceType::Workflow => InvocationTarget::workflow( + &**name, + std::str::from_utf8(&key) + .map_err(|e| anyhow::anyhow!("The key must be valid UTF-8: {e}"))? + .to_owned(), + &**handler, + WorkflowHandlerType::Workflow, ), EventReceiverServiceType::Service => { InvocationTarget::service(&**name, &**handler) @@ -211,13 +220,13 @@ impl IngressDispatcherRequest { pub fn ingress_correlation_id( id: &InvocationId, invocation_target: &InvocationTarget, - idempotency: Option<&Idempotency>, + idempotency: Option<&ByteString>, ) -> IngressCorrelationId { if let Some(idempotency) = idempotency { IngressCorrelationId::IdempotencyId(IdempotencyId::combine( *id, invocation_target, - idempotency.key.clone(), + idempotency.clone(), )) } else { IngressCorrelationId::InvocationId(*id) diff --git a/crates/ingress-http/src/handler/error.rs b/crates/ingress-http/src/handler/error.rs index 003ffef12..d5e975bee 100644 --- a/crates/ingress-http/src/handler/error.rs +++ b/crates/ingress-http/src/handler/error.rs @@ -55,6 +55,10 @@ pub(crate) enum HandlerError { "cannot use the delay query parameter with calls. The delay is supported only with sends" )] UnsupportedDelay, + #[error( + "cannot use the idempotency key with workflow handlers. The handler invocation will already be idempotent by the workflow key itself." + )] + UnsupportedIdempotencyKey, #[error("bad awakeable id '{0}': {1}")] BadAwakeableId(String, IdDecodeError), } @@ -89,7 +93,8 @@ impl HandlerError { | HandlerError::UnsupportedDelay | HandlerError::BadHeader(_, _) | HandlerError::BadAwakeableId(_, _) - | HandlerError::InputValidation(_) => StatusCode::BAD_REQUEST, + | HandlerError::InputValidation(_) + | HandlerError::UnsupportedIdempotencyKey => StatusCode::BAD_REQUEST, HandlerError::Body(_) => StatusCode::INTERNAL_SERVER_ERROR, HandlerError::Unavailable => StatusCode::SERVICE_UNAVAILABLE, HandlerError::MethodNotAllowed => StatusCode::METHOD_NOT_ALLOWED, diff --git a/crates/ingress-http/src/handler/path_parsing.rs b/crates/ingress-http/src/handler/path_parsing.rs index e01e10150..37c031b15 100644 --- a/crates/ingress-http/src/handler/path_parsing.rs +++ b/crates/ingress-http/src/handler/path_parsing.rs @@ -13,7 +13,6 @@ use super::HandlerError; use http::Uri; use restate_schema_api::service::ServiceMetadataResolver; -use restate_types::invocation::ServiceType; use std::collections::VecDeque; pub(crate) enum AwakeableRequestType { @@ -42,8 +41,8 @@ impl AwakeableRequestType { } pub(crate) enum TargetType { - Service, - VirtualObject { key: String }, + Unkeyed, + Keyed { key: String }, } pub(crate) enum InvokeType { @@ -68,19 +67,20 @@ impl ServiceRequestType { Schemas: ServiceMetadataResolver + Clone + Send + Sync + 'static, { // We need to query the service type before continuing to parse - let ct = schemas + let service_type = schemas .resolve_latest_service_type(&service_name) .ok_or(HandlerError::NotFound)?; - let target_type = match ct { - ServiceType::Service => TargetType::Service, - ServiceType::VirtualObject => TargetType::VirtualObject { + let target_type = if service_type.is_keyed() { + TargetType::Keyed { key: urlencoding::decode( path_parts.pop_front().ok_or(HandlerError::BadServicePath)?, ) .map_err(HandlerError::UrlDecodingError)? .into_owned(), - }, + } + } else { + TargetType::Unkeyed }; let handler = path_parts diff --git a/crates/ingress-http/src/handler/service_handler.rs b/crates/ingress-http/src/handler/service_handler.rs index 59b5dc5aa..8577e8ac6 100644 --- a/crates/ingress-http/src/handler/service_handler.rs +++ b/crates/ingress-http/src/handler/service_handler.rs @@ -23,7 +23,8 @@ use restate_ingress_dispatcher::{DispatchIngressRequest, IngressDispatcherReques use restate_schema_api::invocation_target::{InvocationTargetMetadata, InvocationTargetResolver}; use restate_types::identifiers::InvocationId; use restate_types::invocation::{ - Header, Idempotency, InvocationTarget, ResponseResult, ServiceInvocation, Source, SpanRelation, + Header, InvocationTarget, InvocationTargetType, ResponseResult, ServiceInvocation, Source, + SpanRelation, WorkflowHandlerType, }; use serde::Serialize; use std::time::{Duration, Instant, SystemTime}; @@ -82,25 +83,40 @@ where }; // Check if Idempotency-Key is available - let idempotency = - parse_idempotency(req.headers(), invocation_target_meta.idempotency_retention)?; + let idempotency_key = parse_idempotency(req.headers())?; + if idempotency_key.is_some() + && invocation_target_meta.target_ty + == InvocationTargetType::Workflow(WorkflowHandlerType::Workflow) + { + return Err(HandlerError::UnsupportedIdempotencyKey); + } // Craft Invocation Target and Id - let invocation_target = if let TargetType::VirtualObject { key } = target { - InvocationTarget::virtual_object( - &*service_name, - key, - &*handler_name, - invocation_target_meta.handler_ty, - ) + let invocation_target = if let TargetType::Keyed { key } = target { + match invocation_target_meta.target_ty { + InvocationTargetType::VirtualObject(handler_ty) => { + InvocationTarget::virtual_object( + &*service_name, + key, + &*handler_name, + handler_ty, + ) + } + InvocationTargetType::Workflow(handler_ty) => { + InvocationTarget::workflow(&*service_name, key, &*handler_name, handler_ty) + } + InvocationTargetType::Service => { + panic!("Unexpected keyed target, this should have been checked before in the path parsing.") + } + } } else { InvocationTarget::service(&*service_name, &*handler_name) }; - let invocation_id = if let Some(ref idempotency) = idempotency { + let invocation_id = if let Some(ref idempotency_key) = idempotency_key { // We need this to make sure the internal services will deliver correctly this idempotent invocation always // to the same partition. This piece of logic could be improved and moved into ingress-dispatcher with // /~https://github.com/restatedev/restate/issues/1329 - InvocationId::generate_with_idempotency_key(&invocation_target, &idempotency.key) + InvocationId::generate_with_idempotency_key(&invocation_target, idempotency_key) } else { InvocationId::generate(&invocation_target) }; @@ -150,7 +166,11 @@ where let mut service_invocation = ServiceInvocation::initialize(invocation_id, invocation_target, Source::Ingress); service_invocation.with_related_span(SpanRelation::Parent(ingress_span_context)); - service_invocation.idempotency = idempotency; + service_invocation.completion_retention_time = + invocation_target_meta.compute_retention(idempotency_key.is_some()); + if let Some(key) = idempotency_key { + service_invocation.idempotency_key = Some(key); + } service_invocation.headers = headers; service_invocation.argument = body; @@ -336,10 +356,7 @@ fn parse_delay(query: Option<&str>) -> Result, HandlerError> { Ok(None) } -fn parse_idempotency( - headers: &HeaderMap, - retention: Duration, -) -> Result, HandlerError> { +fn parse_idempotency(headers: &HeaderMap) -> Result, HandlerError> { let idempotency_key = if let Some(idempotency_key) = headers.get(IDEMPOTENCY_KEY) { ByteString::from( idempotency_key @@ -350,8 +367,5 @@ fn parse_idempotency( return Ok(None); }; - Ok(Some(Idempotency { - key: idempotency_key, - retention, - })) + Ok(Some(idempotency_key)) } diff --git a/crates/ingress-http/src/handler/tests.rs b/crates/ingress-http/src/handler/tests.rs index 6304b4017..4d4f669a9 100644 --- a/crates/ingress-http/src/handler/tests.rs +++ b/crates/ingress-http/src/handler/tests.rs @@ -28,7 +28,7 @@ use restate_schema_api::invocation_target::{ }; use restate_test_util::{assert, assert_eq}; use restate_types::identifiers::IdempotencyId; -use restate_types::invocation::{HandlerType, Header, Idempotency, ResponseResult, ServiceType}; +use restate_types::invocation::{Header, InvocationTargetType, ResponseResult}; use std::time::Duration; use tokio::sync::mpsc; use tower::ServiceExt; @@ -112,7 +112,7 @@ async fn call_service_with_get() { input_rules: InputRules { input_validation_rules: vec![InputValidationRule::NoBodyAndContentType], }, - ..InvocationTargetMetadata::mock(ServiceType::Service, HandlerType::Shared) + ..InvocationTargetMetadata::mock(InvocationTargetType::Service) }, ), |ingress_req| { @@ -359,11 +359,12 @@ async fn idempotency_key_parsing() { )) ); assert_eq!( - service_invocation.idempotency, - Some(Idempotency { - key: ByteString::from_static("123456"), - retention: Duration::from_secs(60 * 60 * 24) - }) + service_invocation.idempotency_key, + Some(ByteString::from_static("123456")) + ); + assert_eq!( + service_invocation.completion_retention_time, + Some(Duration::from_secs(60 * 60 * 24)) ); response_tx @@ -490,7 +491,7 @@ async fn private_service() { "greet", InvocationTargetMetadata { public: false, - ..InvocationTargetMetadata::mock(ServiceType::Service, HandlerType::Shared) + ..InvocationTargetMetadata::mock(InvocationTargetType::Service) }, ), request_handler_not_reached, @@ -519,7 +520,7 @@ async fn invalid_input() { ), }], }, - ..InvocationTargetMetadata::mock(ServiceType::Service, HandlerType::Shared) + ..InvocationTargetMetadata::mock(InvocationTargetType::Service) }, ), request_handler_not_reached, @@ -542,7 +543,7 @@ async fn set_custom_content_type_on_response() { has_json_schema: false, }, }, - ..InvocationTargetMetadata::mock(ServiceType::Service, HandlerType::Shared) + ..InvocationTargetMetadata::mock(InvocationTargetType::Service) }, ); let req = hyper::Request::builder() @@ -589,7 +590,7 @@ async fn set_custom_content_type_on_empty_response() { has_json_schema: false, }, }, - ..InvocationTargetMetadata::mock(ServiceType::Service, HandlerType::Shared) + ..InvocationTargetMetadata::mock(InvocationTargetType::Service) }, ); let req = hyper::Request::builder() diff --git a/crates/ingress-http/src/lib.rs b/crates/ingress-http/src/lib.rs index be3ea0061..470be3c83 100644 --- a/crates/ingress-http/src/lib.rs +++ b/crates/ingress-http/src/lib.rs @@ -46,7 +46,7 @@ mod mocks { use restate_schema_api::service::mocks::MockServiceMetadataResolver; use restate_schema_api::service::{HandlerMetadata, ServiceMetadata, ServiceMetadataResolver}; use restate_types::identifiers::DeploymentId; - use restate_types::invocation::{HandlerType, ServiceType}; + use restate_types::invocation::{InvocationTargetType, ServiceType, VirtualObjectHandlerType}; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -76,15 +76,16 @@ mod mocks { name: service_name.to_string(), handlers: vec![HandlerMetadata { name: handler_name.to_string(), - ty: invocation_target_metadata.handler_ty, + ty: invocation_target_metadata.target_ty.into(), input_description: "any".to_string(), output_description: "any".to_string(), }], - ty: invocation_target_metadata.service_ty, + ty: invocation_target_metadata.target_ty.into(), deployment_id: DeploymentId::default(), revision: 0, public: invocation_target_metadata.public, idempotency_retention: DEFAULT_IDEMPOTENCY_RETENTION.into(), + workflow_completion_retention: None, }); self.1 .add(service_name, [(handler_name, invocation_target_metadata)]); @@ -135,12 +136,14 @@ mod mocks { mock_schemas.add_service_and_target( "greeter.Greeter", "greet", - InvocationTargetMetadata::mock(ServiceType::Service, HandlerType::Shared), + InvocationTargetMetadata::mock(InvocationTargetType::Service), ); mock_schemas.add_service_and_target( "greeter.GreeterObject", "greet", - InvocationTargetMetadata::mock(ServiceType::VirtualObject, HandlerType::Exclusive), + InvocationTargetMetadata::mock(InvocationTargetType::VirtualObject( + VirtualObjectHandlerType::Exclusive, + )), ); mock_schemas diff --git a/crates/invoker-api/src/entry_enricher.rs b/crates/invoker-api/src/entry_enricher.rs index c02221167..8c6d3da0f 100644 --- a/crates/invoker-api/src/entry_enricher.rs +++ b/crates/invoker-api/src/entry_enricher.rs @@ -66,6 +66,7 @@ pub mod mocks { enrichment_result: Some(CallEnrichmentResult { invocation_id: InvocationId::mock_random(), invocation_target: InvocationTarget::service("", ""), + completion_retention_time: None, span_context: current_invocation_span_context.clone(), }), } @@ -81,6 +82,7 @@ pub mod mocks { enrichment_result: CallEnrichmentResult { invocation_id: InvocationId::mock_random(), invocation_target: InvocationTarget::service("", ""), + completion_retention_time: None, span_context: current_invocation_span_context.clone(), }, }, diff --git a/crates/meta-rest-model/src/services.rs b/crates/meta-rest-model/src/services.rs index 2e6be46b7..12c5fbd00 100644 --- a/crates/meta-rest-model/src/services.rs +++ b/crates/meta-rest-model/src/services.rs @@ -33,6 +33,7 @@ pub struct ModifyServiceRequest { /// If false, the service can be invoked only from another Restate service. #[serde(default)] pub public: Option, + /// # Idempotency retention /// /// Modify the retention of idempotent requests for this service. @@ -41,6 +42,15 @@ pub struct ModifyServiceRequest { #[serde(default, with = "serde_with::As::>")] #[cfg_attr(feature = "schema", schemars(with = "Option"))] pub idempotency_retention: Option, + + /// # Workflow completion retention + /// + /// Modify the retention of the workflow completion. This can be modified only for workflow services! + /// + /// Can be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format. + #[serde(default, with = "serde_with::As::>")] + #[cfg_attr(feature = "schema", schemars(with = "Option"))] + pub workflow_completion_retention: Option, } #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] diff --git a/crates/partition-store/tests/integration_test.rs b/crates/partition-store/tests/integration_test.rs index af6ca7c89..2d60bb01c 100644 --- a/crates/partition-store/tests/integration_test.rs +++ b/crates/partition-store/tests/integration_test.rs @@ -13,7 +13,6 @@ use std::fmt::Debug; use std::ops::RangeInclusive; use std::pin::pin; -use bytes::Bytes; use futures::Stream; use tokio_stream::StreamExt; @@ -24,7 +23,7 @@ use restate_storage_api::StorageError; use restate_types::arc_util::Constant; use restate_types::config::{CommonOptions, WorkerOptions}; use restate_types::identifiers::{InvocationId, PartitionId, PartitionKey, ServiceId}; -use restate_types::invocation::{InvocationTarget, ServiceInvocation, Source, SpanRelation}; +use restate_types::invocation::{InvocationTarget, ServiceInvocation, Source}; use restate_types::state_mut::ExternalStateMutation; mod idempotency_table_test; @@ -75,7 +74,6 @@ async fn test_read_write() { // run the tests // inbox_table_test::run_tests(rocksdb.clone()).await; - journal_table_test::run_tests(rocksdb.clone()).await; outbox_table_test::run_tests(rocksdb.clone()).await; state_table_test::run_tests(rocksdb.clone()).await; invocation_status_table_test::run_tests(rocksdb.clone()).await; @@ -85,17 +83,18 @@ async fn test_read_write() { pub(crate) fn mock_service_invocation(service_id: ServiceId) -> ServiceInvocation { let invocation_target = InvocationTarget::mock_from_service_id(service_id); - ServiceInvocation::new( - InvocationId::generate(&invocation_target), + ServiceInvocation { + invocation_id: InvocationId::generate(&invocation_target), invocation_target, - Bytes::new(), - Source::Ingress, - None, - SpanRelation::None, - vec![], - None, - None, - ) + argument: Default::default(), + source: Source::Ingress, + response_sink: None, + span_context: Default::default(), + headers: vec![], + execution_time: None, + completion_retention_time: None, + idempotency_key: None, + } } pub(crate) fn mock_random_service_invocation() -> ServiceInvocation { diff --git a/crates/partition-store/tests/invocation_status_table_test/mod.rs b/crates/partition-store/tests/invocation_status_table_test/mod.rs index 73ec1df70..e6f7c9364 100644 --- a/crates/partition-store/tests/invocation_status_table_test/mod.rs +++ b/crates/partition-store/tests/invocation_status_table_test/mod.rs @@ -23,7 +23,7 @@ use restate_storage_api::invocation_status_table::{ }; use restate_types::identifiers::InvocationId; use restate_types::invocation::{ - HandlerType, InvocationTarget, ServiceInvocationSpanContext, Source, + InvocationTarget, ServiceInvocationSpanContext, Source, VirtualObjectHandlerType, }; use restate_types::time::MillisSinceEpoch; use std::collections::HashSet; @@ -33,7 +33,7 @@ const INVOCATION_TARGET_1: InvocationTarget = InvocationTarget::VirtualObject { name: ByteString::from_static("abc"), key: ByteString::from_static("1"), handler: ByteString::from_static("myhandler"), - handler_ty: HandlerType::Exclusive, + handler_ty: VirtualObjectHandlerType::Exclusive, }; static INVOCATION_ID_1: Lazy = Lazy::new(|| InvocationId::generate(&INVOCATION_TARGET_1)); @@ -42,7 +42,7 @@ const INVOCATION_TARGET_2: InvocationTarget = InvocationTarget::VirtualObject { name: ByteString::from_static("abc"), key: ByteString::from_static("2"), handler: ByteString::from_static("myhandler"), - handler_ty: HandlerType::Exclusive, + handler_ty: VirtualObjectHandlerType::Exclusive, }; static INVOCATION_ID_2: Lazy = Lazy::new(|| InvocationId::generate(&INVOCATION_TARGET_2)); @@ -51,7 +51,7 @@ const INVOCATION_TARGET_3: InvocationTarget = InvocationTarget::VirtualObject { name: ByteString::from_static("abc"), key: ByteString::from_static("3"), handler: ByteString::from_static("myhandler"), - handler_ty: HandlerType::Exclusive, + handler_ty: VirtualObjectHandlerType::Exclusive, }; static INVOCATION_ID_3: Lazy = Lazy::new(|| InvocationId::generate(&INVOCATION_TARGET_3)); diff --git a/crates/partition-store/tests/journal_table_test/mod.rs b/crates/partition-store/tests/journal_table_test/mod.rs index 10d67c78b..584116ee4 100644 --- a/crates/partition-store/tests/journal_table_test/mod.rs +++ b/crates/partition-store/tests/journal_table_test/mod.rs @@ -8,11 +8,12 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::storage_test_environment; + use bytes::Bytes; use bytestring::ByteString; use futures_util::StreamExt; use once_cell::sync::Lazy; -use restate_partition_store::PartitionStore; use restate_storage_api::journal_table::{JournalEntry, JournalTable}; use restate_storage_api::Transaction; use restate_types::identifiers::{InvocationId, InvocationUuid}; @@ -21,6 +22,7 @@ use restate_types::journal::enriched::{ CallEnrichmentResult, EnrichedEntryHeader, EnrichedRawEntry, }; use std::pin::pin; +use std::time::Duration; // false positive because of Bytes #[allow(clippy::declare_interior_mutable_const)] @@ -39,6 +41,7 @@ static MOCK_INVOKE_JOURNAL_ENTRY: Lazy = Lazy::new(|| { name: ByteString::from_static("MySvc"), handler: ByteString::from_static("MyHandler"), }, + completion_retention_time: Some(Duration::from_secs(10)), span_context: ServiceInvocationSpanContext::empty(), }), }, @@ -124,7 +127,10 @@ async fn verify_journal_deleted(txn: &mut T) { } } -pub(crate) async fn run_tests(mut rocksdb: PartitionStore) { +#[tokio::test] +async fn journal_tests() { + let mut rocksdb = storage_test_environment().await; + let mut txn = rocksdb.transaction(); populate_data(&mut txn).await; diff --git a/crates/schema-api/src/invocation_target.rs b/crates/schema-api/src/invocation_target.rs index 25098b7b2..72d40063e 100644 --- a/crates/schema-api/src/invocation_target.rs +++ b/crates/schema-api/src/invocation_target.rs @@ -11,24 +11,40 @@ use bytes::Bytes; use bytestring::ByteString; use itertools::Itertools; -use restate_types::invocation::{HandlerType, ServiceType}; -use std::fmt; +use restate_types::invocation::InvocationTargetType; use std::str::FromStr; use std::time::Duration; +use std::{cmp, fmt}; pub const DEFAULT_IDEMPOTENCY_RETENTION: Duration = Duration::from_secs(60 * 60 * 24); +pub const DEFAULT_WORKFLOW_COMPLETION_RETENTION: Duration = Duration::from_secs(60 * 60 * 24); #[derive(Debug, Clone)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct InvocationTargetMetadata { pub public: bool, + /// Retention timer to be used for the completion. See [`InvocationTargetMetadata::compute_retention`] for more details. + pub completion_retention: Option, + /// Retention timer that should be used only if an idempotency key is set. See [`InvocationTargetMetadata::compute_retention`] for more details. pub idempotency_retention: Duration, - pub service_ty: ServiceType, - pub handler_ty: HandlerType, + pub target_ty: InvocationTargetType, pub input_rules: InputRules, pub output_rules: OutputRules, } +impl InvocationTargetMetadata { + pub fn compute_retention(&self, has_idempotency_key: bool) -> Option { + if has_idempotency_key { + Some(cmp::max( + self.completion_retention.unwrap_or_default(), + self.idempotency_retention, + )) + } else { + self.completion_retention + } + } +} + /// This API resolves invocation targets. pub trait InvocationTargetResolver { /// Returns None if the service handler doesn't exist, Some(basic_service_metadata) otherwise. @@ -411,12 +427,12 @@ pub mod mocks { } impl InvocationTargetMetadata { - pub fn mock(service_ty: ServiceType, handler_ty: HandlerType) -> Self { + pub fn mock(invocation_target_type: InvocationTargetType) -> Self { Self { public: true, idempotency_retention: DEFAULT_IDEMPOTENCY_RETENTION, - service_ty, - handler_ty, + completion_retention: None, + target_ty: invocation_target_type, input_rules: Default::default(), output_rules: Default::default(), } diff --git a/crates/schema-api/src/lib.rs b/crates/schema-api/src/lib.rs index 44e9e991d..66cd28561 100644 --- a/crates/schema-api/src/lib.rs +++ b/crates/schema-api/src/lib.rs @@ -290,7 +290,9 @@ pub mod deployment { #[cfg(feature = "service")] pub mod service { use restate_types::identifiers::{DeploymentId, ServiceRevision}; - use restate_types::invocation::{HandlerType, ServiceType}; + use restate_types::invocation::{ + InvocationTargetType, ServiceType, VirtualObjectHandlerType, WorkflowHandlerType, + }; #[derive(Debug, Clone)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] @@ -331,6 +333,46 @@ pub mod service { )] #[cfg_attr(feature = "serde_schema", schemars(with = "String"))] pub idempotency_retention: humantime::Duration, + + /// # Workflow completion retention + /// + /// The retention duration of workflows. Only available on workflow services. + #[cfg_attr( + feature = "serde", + serde( + with = "serde_with::As::>", + skip_serializing_if = "Option::is_none", + default + ) + )] + #[cfg_attr(feature = "serde_schema", schemars(with = "Option"))] + pub workflow_completion_retention: Option, + } + + // This type is used only for exposing the handler metadata, and not internally. See [ServiceAndHandlerType]. + #[derive(Debug, Clone)] + #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] + #[cfg_attr(feature = "serde_schema", derive(schemars::JsonSchema))] + pub enum HandlerMetadataType { + Exclusive, + Shared, + Workflow, + } + + impl From for HandlerMetadataType { + fn from(value: InvocationTargetType) -> Self { + match value { + InvocationTargetType::Service => HandlerMetadataType::Shared, + InvocationTargetType::VirtualObject(h_ty) => match h_ty { + VirtualObjectHandlerType::Exclusive => HandlerMetadataType::Exclusive, + VirtualObjectHandlerType::Shared => HandlerMetadataType::Shared, + }, + InvocationTargetType::Workflow(h_ty) => match h_ty { + WorkflowHandlerType::Workflow => HandlerMetadataType::Workflow, + WorkflowHandlerType::Shared => HandlerMetadataType::Shared, + }, + } + } } #[derive(Debug, Clone)] @@ -339,7 +381,7 @@ pub mod service { pub struct HandlerMetadata { pub name: String, - pub ty: HandlerType, + pub ty: HandlerMetadataType, // # Human readable input description // @@ -410,7 +452,7 @@ pub mod service { .into_iter() .map(|s| HandlerMetadata { name: s.as_ref().to_string(), - ty: HandlerType::Shared, + ty: HandlerMetadataType::Shared, input_description: "any".to_string(), output_description: "any".to_string(), }) @@ -420,6 +462,7 @@ pub mod service { revision: 0, public: true, idempotency_retention: std::time::Duration::from_secs(60).into(), + workflow_completion_retention: None, } } @@ -433,7 +476,7 @@ pub mod service { .into_iter() .map(|s| HandlerMetadata { name: s.as_ref().to_string(), - ty: HandlerType::Exclusive, + ty: HandlerMetadataType::Exclusive, input_description: "any".to_string(), output_description: "any".to_string(), }) @@ -443,6 +486,7 @@ pub mod service { revision: 0, public: true, idempotency_retention: std::time::Duration::from_secs(60).into(), + workflow_completion_retention: None, } } } @@ -488,6 +532,7 @@ pub mod subscription { #[cfg_attr(feature = "serde_schema", derive(schemars::JsonSchema))] pub enum EventReceiverServiceType { VirtualObject, + Workflow, Service, } diff --git a/crates/schema/src/service.rs b/crates/schema/src/service.rs index 797bc051b..86ee6ca5d 100644 --- a/crates/schema/src/service.rs +++ b/crates/schema/src/service.rs @@ -27,6 +27,7 @@ pub struct ServiceSchemas { pub ty: ServiceType, pub location: ServiceLocation, pub idempotency_retention: Duration, + pub workflow_completion_retention: Option, } impl ServiceSchemas { @@ -38,7 +39,7 @@ impl ServiceSchemas { .iter() .map(|(h_name, h_schemas)| HandlerMetadata { name: h_name.clone(), - ty: h_schemas.target_meta.handler_ty, + ty: h_schemas.target_meta.target_ty.into(), input_description: h_schemas.target_meta.input_rules.to_string(), output_description: h_schemas.target_meta.output_rules.to_string(), }) @@ -48,6 +49,7 @@ impl ServiceSchemas { revision: self.revision, public: self.location.public, idempotency_retention: self.idempotency_retention.into(), + workflow_completion_retention: self.workflow_completion_retention.map(Into::into), } } } diff --git a/crates/service-protocol/src/codec.rs b/crates/service-protocol/src/codec.rs index 204042c7c..7b95ec59e 100644 --- a/crates/service-protocol/src/codec.rs +++ b/crates/service-protocol/src/codec.rs @@ -174,7 +174,7 @@ mod mocks { InputEntryMessage, OneWayCallEntryMessage, OutputEntryMessage, SetStateEntryMessage, }; use restate_types::identifiers::InvocationId; - use restate_types::invocation::{HandlerType, InvocationTarget}; + use restate_types::invocation::{InvocationTarget, VirtualObjectHandlerType}; use restate_types::journal::enriched::{ AwakeableEnrichmentResult, CallEnrichmentResult, EnrichedEntryHeader, EnrichedRawEntry, }; @@ -247,8 +247,9 @@ mod mocks { name: entry.request.service_name.clone(), key: entry.request.key.clone(), handler: entry.request.handler_name.clone(), - handler_ty: HandlerType::Exclusive, + handler_ty: VirtualObjectHandlerType::Exclusive, }, + completion_retention_time: None, span_context: Default::default(), }), }, @@ -282,8 +283,9 @@ mod mocks { name: entry.request.service_name.clone(), key: entry.request.key.clone(), handler: entry.request.handler_name.clone(), - handler_ty: HandlerType::Exclusive, + handler_ty: VirtualObjectHandlerType::Exclusive, }, + completion_retention_time: None, span_context: Default::default(), }, }, diff --git a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto index 4efcd3e58..6f1da6690 100644 --- a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto +++ b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto @@ -12,20 +12,16 @@ message InvocationTarget { enum Ty { UNKNOWN_TY = 0; SERVICE = 1; - VIRTUAL_OBJECT = 2; + VIRTUAL_OBJECT_EXCLUSIVE = 2; + VIRTUAL_OBJECT_SHARED = 3; + WORKFLOW_WORKFLOW = 4; + WORKFLOW_SHARED = 5; } - enum HandlerType { - UNKNOWN_HANDLER_TYPE = 0; - EXCLUSIVE = 1; - SHARED = 2; - } - - Ty ty = 1; + Ty service_and_handler_ty = 1; bytes name = 2; bytes handler = 3; bytes key = 4; - HandlerType handler_ty = 5; } message ServiceId { @@ -93,10 +89,7 @@ message InvocationStatus { } Source source = 9; Duration completion_retention_time = 10; - oneof idempotency_key { - google.protobuf.Empty idempotency_key_none = 11; - string idempotency_key_value = 12; - } + optional string idempotency_key = 11; } message Suspended { @@ -112,10 +105,7 @@ message InvocationStatus { } Source source = 10; Duration completion_retention_time = 11; - oneof idempotency_key { - google.protobuf.Empty idempotency_key_none = 12; - string idempotency_key_value = 13; - } + optional string idempotency_key = 12; } message Completed { @@ -125,10 +115,7 @@ message InvocationStatus { uint64 creation_time = 4; uint64 modification_time = 5; - oneof idempotency_key { - google.protobuf.Empty idempotency_key_none = 12; - string idempotency_key_value = 13; - } + optional string idempotency_key = 12; } message Free { @@ -147,7 +134,8 @@ message InvocationStatus { SpanContext span_context = 9; repeated Header headers = 10; uint64 execution_time = 11; - IdempotentRequestMetadata idempotency = 12; + Duration completion_retention_time = 12; + optional string idempotency_key = 13; } oneof status { @@ -229,7 +217,8 @@ message ServiceInvocation { Source source = 6; repeated Header headers = 7; uint64 execution_time = 8; - IdempotentRequestMetadata idempotency = 9; + Duration completion_retention_time = 9; + optional string idempotency_key = 10; } message StateMutation { @@ -255,6 +244,7 @@ message InvocationResolutionResult { InvocationId invocation_id = 1; InvocationTarget invocation_target = 2; SpanContext span_context = 3; + Duration completion_retention_time = 4; } oneof result { @@ -267,6 +257,7 @@ message BackgroundCallResolutionResult { InvocationId invocation_id = 1; InvocationTarget invocation_target = 2; SpanContext span_context = 3; + Duration completion_retention_time = 4; } message EnrichedEntryHeader { @@ -480,9 +471,4 @@ message DedupSequenceNumber { message IdempotencyMetadata { InvocationId invocation_id = 1; -} - -message IdempotentRequestMetadata { - string key = 1; - Duration retention = 2; } \ No newline at end of file diff --git a/crates/storage-api/src/invocation_status_table/mod.rs b/crates/storage-api/src/invocation_status_table/mod.rs index eb50758db..238e64652 100644 --- a/crates/storage-api/src/invocation_status_table/mod.rs +++ b/crates/storage-api/src/invocation_status_table/mod.rs @@ -14,7 +14,7 @@ use bytestring::ByteString; use futures_util::Stream; use restate_types::identifiers::{DeploymentId, EntryIndex, InvocationId, PartitionKey}; use restate_types::invocation::{ - Header, Idempotency, InvocationInput, InvocationTarget, ResponseResult, ServiceInvocation, + Header, InvocationInput, InvocationTarget, ResponseResult, ServiceInvocation, ServiceInvocationResponseSink, ServiceInvocationSpanContext, Source, }; use restate_types::time::MillisSinceEpoch; @@ -218,7 +218,9 @@ pub struct InboxedInvocation { pub headers: Vec
, /// Time when the request should be executed pub execution_time: Option, - pub idempotency: Option, + /// If zero, the invocation completion will not be retained. + pub completion_retention_time: Duration, + pub idempotency_key: Option, } impl InboxedInvocation { @@ -236,7 +238,10 @@ impl InboxedInvocation { span_context: service_invocation.span_context, headers: service_invocation.headers, execution_time: service_invocation.execution_time, - idempotency: service_invocation.idempotency, + completion_retention_time: service_invocation + .completion_retention_time + .unwrap_or_default(), + idempotency_key: service_invocation.idempotency_key, } } } @@ -258,10 +263,6 @@ impl InFlightInvocationMetadata { pub fn from_service_invocation( service_invocation: ServiceInvocation, ) -> (Self, InvocationInput) { - let (completion_retention_time, idempotency_key) = service_invocation - .idempotency - .map(|idempotency| (idempotency.retention, Some(idempotency.key))) - .unwrap_or((Duration::ZERO, None)); ( Self { invocation_target: service_invocation.invocation_target, @@ -270,8 +271,10 @@ impl InFlightInvocationMetadata { response_sinks: service_invocation.response_sink.into_iter().collect(), timestamps: StatusTimestamps::now(), source: service_invocation.source, - completion_retention_time, - idempotency_key, + completion_retention_time: service_invocation + .completion_retention_time + .unwrap_or_default(), + idempotency_key: service_invocation.idempotency_key, }, InvocationInput { argument: service_invocation.argument, @@ -283,11 +286,6 @@ impl InFlightInvocationMetadata { pub fn from_inboxed_invocation( mut inboxed_invocation: InboxedInvocation, ) -> (Self, InvocationInput) { - let (completion_retention_time, idempotency_key) = inboxed_invocation - .idempotency - .map(|idempotency| (idempotency.retention, Some(idempotency.key))) - .unwrap_or((Duration::ZERO, None)); - inboxed_invocation.timestamps.update(); ( @@ -298,8 +296,8 @@ impl InFlightInvocationMetadata { response_sinks: inboxed_invocation.response_sinks, timestamps: inboxed_invocation.timestamps, source: inboxed_invocation.source, - completion_retention_time, - idempotency_key, + completion_retention_time: inboxed_invocation.completion_retention_time, + idempotency_key: inboxed_invocation.idempotency_key, }, InvocationInput { argument: inboxed_invocation.argument, @@ -376,7 +374,7 @@ pub trait InvocationStatusTable: ReadOnlyInvocationStatusTable { mod mocks { use super::*; - use restate_types::invocation::HandlerType; + use restate_types::invocation::VirtualObjectHandlerType; impl InFlightInvocationMetadata { pub fn mock() -> Self { @@ -385,7 +383,7 @@ mod mocks { "MyService", "MyKey", "mock", - HandlerType::Exclusive, + VirtualObjectHandlerType::Exclusive, ), journal_metadata: JournalMetadata::initialize(ServiceInvocationSpanContext::empty()), deployment_id: None, diff --git a/crates/storage-api/src/storage.rs b/crates/storage-api/src/storage.rs index d0beee92b..6b28b0827 100644 --- a/crates/storage-api/src/storage.rs +++ b/crates/storage-api/src/storage.rs @@ -107,12 +107,11 @@ pub mod v1 { enriched_entry_header, inbox_entry, invocation_resolution_result, invocation_status, invocation_target, outbox_message, response_result, source, span_relation, timer, virtual_object_status, BackgroundCallResolutionResult, DedupSequenceNumber, Duration, - EnrichedEntryHeader, EpochSequenceNumber, Header, IdempotencyMetadata, - IdempotentRequestMetadata, InboxEntry, InvocationId, InvocationResolutionResult, - InvocationStatus, InvocationTarget, JournalEntry, JournalMeta, KvPair, OutboxMessage, - ResponseResult, SequenceNumber, ServiceId, ServiceInvocation, - ServiceInvocationResponseSink, Source, SpanContext, SpanRelation, StateMutation, Timer, - VirtualObjectStatus, + EnrichedEntryHeader, EpochSequenceNumber, Header, IdempotencyMetadata, InboxEntry, + InvocationId, InvocationResolutionResult, InvocationStatus, InvocationTarget, + JournalEntry, JournalMeta, KvPair, OutboxMessage, ResponseResult, SequenceNumber, + ServiceId, ServiceInvocation, ServiceInvocationResponseSink, Source, SpanContext, + SpanRelation, StateMutation, Timer, VirtualObjectStatus, }; use crate::StorageError; @@ -340,15 +339,7 @@ pub mod v1 { value.completion_retention_time.unwrap_or_default(), )?; - let idempotency_key = match value - .idempotency_key - .ok_or(ConversionError::missing_field("idempotency_key"))? - { - invocation_status::invoked::IdempotencyKey::IdempotencyKeyValue(key) => { - Some(ByteString::from(key)) - } - invocation_status::invoked::IdempotencyKey::IdempotencyKeyNone(_) => None, - }; + let idempotency_key = value.idempotency_key.map(ByteString::from); Ok(crate::invocation_status_table::InFlightInvocationMetadata { invocation_target, @@ -396,14 +387,7 @@ pub mod v1 { modification_time: timestamps.modification_time().as_u64(), source: Some(Source::from(source)), completion_retention_time: Some(Duration::from(completion_retention_time)), - idempotency_key: Some(match idempotency_key { - Some(key) => { - invocation_status::invoked::IdempotencyKey::IdempotencyKeyValue( - key.to_string(), - ) - } - _ => invocation_status::invoked::IdempotencyKey::IdempotencyKeyNone(()), - }), + idempotency_key: idempotency_key.map(|s| s.to_string()), } } } @@ -461,15 +445,7 @@ pub mod v1 { value.completion_retention_time.unwrap_or_default(), )?; - let idempotency_key = match value - .idempotency_key - .ok_or(ConversionError::missing_field("idempotency_key"))? - { - invocation_status::suspended::IdempotencyKey::IdempotencyKeyValue(key) => { - Some(ByteString::from(key)) - } - invocation_status::suspended::IdempotencyKey::IdempotencyKeyNone(_) => None, - }; + let idempotency_key = value.idempotency_key.map(ByteString::from); Ok(( crate::invocation_status_table::InFlightInvocationMetadata { @@ -528,14 +504,7 @@ pub mod v1 { completion_retention_time: Some(Duration::from( metadata.completion_retention_time, )), - idempotency_key: Some(match metadata.idempotency_key { - Some(key) => { - invocation_status::suspended::IdempotencyKey::IdempotencyKeyValue( - key.to_string(), - ) - } - _ => invocation_status::suspended::IdempotencyKey::IdempotencyKeyNone(()), - }), + idempotency_key: metadata.idempotency_key.map(|s| s.to_string()), } } } @@ -586,10 +555,11 @@ pub mod v1 { Some(MillisSinceEpoch::new(value.execution_time)) }; - let idempotency = value - .idempotency - .map(restate_types::invocation::Idempotency::try_from) - .transpose()?; + let completion_retention_time = std::time::Duration::try_from( + value.completion_retention_time.unwrap_or_default(), + )?; + + let idempotency_key = value.idempotency_key.map(ByteString::from); Ok(crate::invocation_status_table::InboxedInvocation { inbox_sequence_number: value.inbox_sequence_number, @@ -603,7 +573,8 @@ pub mod v1 { headers, argument: value.argument, execution_time, - idempotency, + idempotency_key, + completion_retention_time, invocation_target, }) } @@ -621,7 +592,8 @@ pub mod v1 { span_context, headers, execution_time, - idempotency, + completion_retention_time, + idempotency_key, } = value; let headers = headers.into_iter().map(Into::into).collect(); @@ -640,7 +612,8 @@ pub mod v1 { headers, argument, execution_time: execution_time.map(|m| m.as_u64()).unwrap_or_default(), - idempotency: idempotency.map(Into::into), + completion_retention_time: Some(Duration::from(completion_retention_time)), + idempotency_key: idempotency_key.map(|s| s.to_string()), } } } @@ -661,15 +634,7 @@ pub mod v1 { .ok_or(ConversionError::missing_field("source"))?, )?; - let idempotency_key = match value - .idempotency_key - .ok_or(ConversionError::missing_field("idempotency_key"))? - { - invocation_status::completed::IdempotencyKey::IdempotencyKeyValue(key) => { - Some(ByteString::from(key)) - } - invocation_status::completed::IdempotencyKey::IdempotencyKeyNone(_) => None, - }; + let idempotency_key = value.idempotency_key.map(ByteString::from); Ok(crate::invocation_status_table::CompletedInvocation { invocation_target, @@ -703,14 +668,7 @@ pub mod v1 { result: Some(ResponseResult::from(response_result)), creation_time: timestamps.creation_time().as_u64(), modification_time: timestamps.modification_time().as_u64(), - idempotency_key: Some(match idempotency_key { - Some(key) => { - invocation_status::completed::IdempotencyKey::IdempotencyKeyValue( - key.to_string(), - ) - } - _ => invocation_status::completed::IdempotencyKey::IdempotencyKeyNone(()), - }), + idempotency_key: idempotency_key.map(|s| s.to_string()), } } } @@ -860,7 +818,8 @@ pub mod v1 { source, headers, execution_time, - idempotency, + idempotency_key, + completion_retention_time, } = value; let invocation_id = restate_types::identifiers::InvocationId::try_from( @@ -896,10 +855,12 @@ pub mod v1 { Some(MillisSinceEpoch::new(execution_time)) }; - let idempotency = idempotency - .map(restate_types::invocation::Idempotency::try_from) + let completion_retention_time = completion_retention_time + .map(std::time::Duration::try_from) .transpose()?; + let idempotency_key = idempotency_key.map(ByteString::from); + Ok(restate_types::invocation::ServiceInvocation { invocation_id, invocation_target, @@ -909,7 +870,8 @@ pub mod v1 { span_context, headers, execution_time, - idempotency, + completion_retention_time, + idempotency_key, }) } } @@ -931,32 +893,8 @@ pub mod v1 { source: Some(source), headers, execution_time: value.execution_time.map(|m| m.as_u64()).unwrap_or_default(), - idempotency: value.idempotency.map(Into::into), - } - } - } - - impl TryFrom for restate_types::invocation::Idempotency { - type Error = ConversionError; - - fn try_from(value: IdempotentRequestMetadata) -> Result { - let retention: std::time::Duration = value - .retention - .ok_or(ConversionError::missing_field("retention"))? - .try_into()?; - - Ok(Self { - key: ByteString::from(value.key), - retention, - }) - } - } - - impl From for IdempotentRequestMetadata { - fn from(value: restate_types::invocation::Idempotency) -> Self { - Self { - key: value.key.to_string(), - retention: Some(value.retention.into()), + completion_retention_time: value.completion_retention_time.map(Duration::from), + idempotency_key: value.idempotency_key.map(|s| s.to_string()), } } } @@ -1005,42 +943,56 @@ pub mod v1 { type Error = ConversionError; fn try_from(value: InvocationTarget) -> Result { - match invocation_target::Ty::try_from(value.ty) { + let name = + ByteString::try_from(value.name).map_err(ConversionError::invalid_data)?; + let handler = + ByteString::try_from(value.handler).map_err(ConversionError::invalid_data)?; + + match invocation_target::Ty::try_from(value.service_and_handler_ty) { Ok(invocation_target::Ty::Service) => { - Ok(restate_types::invocation::InvocationTarget::Service { - name: ByteString::try_from(value.name) - .map_err(ConversionError::invalid_data)?, - handler: ByteString::try_from(value.handler) + Ok(restate_types::invocation::InvocationTarget::Service { name, handler }) + } + Ok(invocation_target::Ty::VirtualObjectExclusive) => { + Ok(restate_types::invocation::InvocationTarget::VirtualObject { + name, + handler, + key: ByteString::try_from(value.key) .map_err(ConversionError::invalid_data)?, + handler_ty: + restate_types::invocation::VirtualObjectHandlerType::Exclusive, }) } - Ok(invocation_target::Ty::VirtualObject) => { + Ok(invocation_target::Ty::VirtualObjectShared) => { Ok(restate_types::invocation::InvocationTarget::VirtualObject { - name: ByteString::try_from(value.name) + name, + handler, + key: ByteString::try_from(value.key) .map_err(ConversionError::invalid_data)?, - handler: ByteString::try_from(value.handler) + handler_ty: restate_types::invocation::VirtualObjectHandlerType::Shared, + }) + } + Ok(invocation_target::Ty::WorkflowWorkflow) => { + Ok(restate_types::invocation::InvocationTarget::Workflow { + name, + handler, + key: ByteString::try_from(value.key) .map_err(ConversionError::invalid_data)?, + handler_ty: restate_types::invocation::WorkflowHandlerType::Workflow, + }) + } + Ok(invocation_target::Ty::WorkflowShared) => { + Ok(restate_types::invocation::InvocationTarget::Workflow { + name, + handler, key: ByteString::try_from(value.key) .map_err(ConversionError::invalid_data)?, - handler_ty: match invocation_target::HandlerType::try_from( - value.handler_ty, - ) { - Ok(invocation_target::HandlerType::Exclusive) => { - restate_types::invocation::HandlerType::Exclusive - } - Ok(invocation_target::HandlerType::Shared) => { - restate_types::invocation::HandlerType::Shared - } - _ => { - return Err(ConversionError::unexpected_enum_variant( - "handler_ty", - value.handler_ty, - )) - } - }, + handler_ty: restate_types::invocation::WorkflowHandlerType::Shared, }) } - _ => Err(ConversionError::unexpected_enum_variant("ty", value.ty)), + _ => Err(ConversionError::unexpected_enum_variant( + "ty", + value.service_and_handler_ty, + )), } } } @@ -1050,9 +1002,9 @@ pub mod v1 { match value { restate_types::invocation::InvocationTarget::Service { name, handler } => { InvocationTarget { - ty: invocation_target::Ty::Service.into(), name: name.into_bytes(), handler: handler.into_bytes(), + service_and_handler_ty: invocation_target::Ty::Service.into(), ..InvocationTarget::default() } } @@ -1062,16 +1014,34 @@ pub mod v1 { handler, handler_ty, } => InvocationTarget { - ty: invocation_target::Ty::VirtualObject.into(), name: name.into_bytes(), handler: handler.into_bytes(), key: key.into_bytes(), - handler_ty: match handler_ty { - restate_types::invocation::HandlerType::Shared => { - invocation_target::HandlerType::Shared + service_and_handler_ty: match handler_ty { + restate_types::invocation::VirtualObjectHandlerType::Shared => { + invocation_target::Ty::VirtualObjectShared } - restate_types::invocation::HandlerType::Exclusive => { - invocation_target::HandlerType::Exclusive + restate_types::invocation::VirtualObjectHandlerType::Exclusive => { + invocation_target::Ty::VirtualObjectExclusive + } + } + .into(), + }, + restate_types::invocation::InvocationTarget::Workflow { + name, + key, + handler, + handler_ty, + } => InvocationTarget { + name: name.into_bytes(), + handler: handler.into_bytes(), + key: key.into_bytes(), + service_and_handler_ty: match handler_ty { + restate_types::invocation::WorkflowHandlerType::Shared => { + invocation_target::Ty::WorkflowShared + } + restate_types::invocation::WorkflowHandlerType::Workflow => { + invocation_target::Ty::WorkflowWorkflow } } .into(), @@ -1671,10 +1641,15 @@ pub mod v1 { .ok_or(ConversionError::missing_field("span_context"))?, )?; + let completion_retention_time = Some(std::time::Duration::try_from( + success.completion_retention_time.unwrap_or_default(), + )?); + Some(restate_types::journal::enriched::CallEnrichmentResult { invocation_id, invocation_target, span_context, + completion_retention_time, }) } }; @@ -1694,11 +1669,15 @@ pub mod v1 { invocation_id, invocation_target, span_context, + completion_retention_time, } => invocation_resolution_result::Result::Success( invocation_resolution_result::Success { invocation_id: Some(InvocationId::from(invocation_id)), invocation_target: Some(invocation_target.into()), span_context: Some(SpanContext::from(span_context)), + completion_retention_time: Some(Duration::from( + completion_retention_time.unwrap_or_default(), + )), }, ), }, @@ -1734,10 +1713,15 @@ pub mod v1 { .ok_or(ConversionError::missing_field("span_context"))?, )?; + let completion_retention_time = Some(std::time::Duration::try_from( + value.completion_retention_time.unwrap_or_default(), + )?); + Ok(restate_types::journal::enriched::CallEnrichmentResult { invocation_id, span_context, invocation_target, + completion_retention_time, }) } } @@ -1750,6 +1734,9 @@ pub mod v1 { invocation_id: Some(InvocationId::from(value.invocation_id)), invocation_target: Some(value.invocation_target.into()), span_context: Some(SpanContext::from(value.span_context)), + completion_retention_time: Some(Duration::from( + value.completion_retention_time.unwrap_or_default(), + )), } } } diff --git a/crates/storage-query-datafusion/src/invocation_status/row.rs b/crates/storage-query-datafusion/src/invocation_status/row.rs index 5d89f5b5e..7e4e9dba2 100644 --- a/crates/storage-query-datafusion/src/invocation_status/row.rs +++ b/crates/storage-query-datafusion/src/invocation_status/row.rs @@ -38,6 +38,7 @@ pub(crate) fn append_invocation_status_row( row.target_service_ty(match invocation_target.service_ty() { ServiceType::Service => "service", ServiceType::VirtualObject => "virtual_object", + ServiceType::Workflow => "workflow", }); } diff --git a/crates/storage-query-datafusion/src/journal/tests.rs b/crates/storage-query-datafusion/src/journal/tests.rs index 25a015cbe..10efa2003 100644 --- a/crates/storage-query-datafusion/src/journal/tests.rs +++ b/crates/storage-query-datafusion/src/journal/tests.rs @@ -61,6 +61,7 @@ async fn get_entries() { enrichment_result: Some(CallEnrichmentResult { invocation_id: invoked_invocation_id, invocation_target: invoked_invocation_target.clone(), + completion_retention_time: None, span_context: Default::default(), }), }, diff --git a/crates/storage-query-datafusion/src/service/row.rs b/crates/storage-query-datafusion/src/service/row.rs index e3f573e59..9ef4a62c8 100644 --- a/crates/storage-query-datafusion/src/service/row.rs +++ b/crates/storage-query-datafusion/src/service/row.rs @@ -28,5 +28,6 @@ pub(crate) fn append_service_row( row.ty(match service_metadata.ty { ServiceType::Service => "service", ServiceType::VirtualObject => "virtual_object", + ServiceType::Workflow => "workflow", }) } diff --git a/crates/types/src/identifiers.rs b/crates/types/src/identifiers.rs index e79d9be56..a78e0929e 100644 --- a/crates/types/src/identifiers.rs +++ b/crates/types/src/identifiers.rs @@ -823,7 +823,7 @@ mod mocks { mod tests { use super::*; - use crate::invocation::HandlerType; + use crate::invocation::VirtualObjectHandlerType; #[test] fn service_id_and_invocation_id_partition_key_should_match() { @@ -831,7 +831,7 @@ mod tests { "MyService", "MyKey", "MyMethod", - HandlerType::Exclusive, + VirtualObjectHandlerType::Exclusive, ); let invocation_id = InvocationId::generate(&invocation_target); diff --git a/crates/types/src/invocation.rs b/crates/types/src/invocation.rs index f9338095c..7120f6ca5 100644 --- a/crates/types/src/invocation.rs +++ b/crates/types/src/invocation.rs @@ -34,11 +34,12 @@ pub use opentelemetry::trace::TraceId; pub enum ServiceType { Service, VirtualObject, + Workflow, } impl ServiceType { pub fn is_keyed(&self) -> bool { - matches!(self, ServiceType::VirtualObject) + matches!(self, ServiceType::VirtualObject | ServiceType::Workflow) } pub fn has_state(&self) -> bool { @@ -52,28 +53,83 @@ impl fmt::Display for ServiceType { } } -#[derive(Eq, Hash, PartialEq, Clone, Copy, Debug, serde::Serialize, serde::Deserialize)] +#[derive( + Eq, Hash, PartialEq, Clone, Copy, Debug, Default, serde::Serialize, serde::Deserialize, +)] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] -pub enum HandlerType { +pub enum VirtualObjectHandlerType { + #[default] Exclusive, Shared, } -impl HandlerType { - pub fn default_for_service_type(service_type: ServiceType) -> Self { - match service_type { - ServiceType::Service => HandlerType::Shared, - ServiceType::VirtualObject => HandlerType::Exclusive, - } +impl fmt::Display for VirtualObjectHandlerType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(self, f) + } +} + +#[derive( + Eq, Hash, PartialEq, Clone, Copy, Debug, Default, serde::Serialize, serde::Deserialize, +)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +pub enum WorkflowHandlerType { + #[default] + Workflow, + Shared, +} + +impl fmt::Display for WorkflowHandlerType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(self, f) + } +} + +#[derive(Eq, Hash, PartialEq, Clone, Copy, Debug, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +pub enum InvocationTargetType { + Service, + VirtualObject(VirtualObjectHandlerType), + Workflow(WorkflowHandlerType), +} + +impl InvocationTargetType { + pub fn is_keyed(&self) -> bool { + matches!( + self, + InvocationTargetType::VirtualObject(_) | InvocationTargetType::Workflow(_) + ) + } + + pub fn can_read_state(&self) -> bool { + self.is_keyed() + } + + pub fn can_write_state(&self) -> bool { + matches!( + self, + InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive) + | InvocationTargetType::Workflow(WorkflowHandlerType::Workflow) + ) } } -impl fmt::Display for HandlerType { +impl fmt::Display for InvocationTargetType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Debug::fmt(self, f) } } +impl From for ServiceType { + fn from(value: InvocationTargetType) -> Self { + match value { + InvocationTargetType::Service => ServiceType::Service, + InvocationTargetType::VirtualObject(_) => ServiceType::VirtualObject, + InvocationTargetType::Workflow(_) => ServiceType::Workflow, + } + } +} + #[derive(Eq, Hash, PartialEq, Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum InvocationTarget { Service { @@ -84,7 +140,13 @@ pub enum InvocationTarget { name: ByteString, key: ByteString, handler: ByteString, - handler_ty: HandlerType, + handler_ty: VirtualObjectHandlerType, + }, + Workflow { + name: ByteString, + key: ByteString, + handler: ByteString, + handler_ty: WorkflowHandlerType, }, } @@ -100,7 +162,7 @@ impl InvocationTarget { name: impl Into, key: impl Into, handler: impl Into, - handler_ty: HandlerType, + handler_ty: VirtualObjectHandlerType, ) -> Self { Self::VirtualObject { name: name.into(), @@ -110,10 +172,25 @@ impl InvocationTarget { } } + pub fn workflow( + name: impl Into, + key: impl Into, + handler: impl Into, + handler_ty: WorkflowHandlerType, + ) -> Self { + Self::Workflow { + name: name.into(), + key: key.into(), + handler: handler.into(), + handler_ty, + } + } + pub fn service_name(&self) -> &ByteString { match self { InvocationTarget::Service { name, .. } => name, InvocationTarget::VirtualObject { name, .. } => name, + InvocationTarget::Workflow { name, .. } => name, } } @@ -121,6 +198,7 @@ impl InvocationTarget { match self { InvocationTarget::Service { .. } => None, InvocationTarget::VirtualObject { key, .. } => Some(key), + InvocationTarget::Workflow { key, .. } => Some(key), } } @@ -128,6 +206,7 @@ impl InvocationTarget { match self { InvocationTarget::Service { handler, .. } => handler, InvocationTarget::VirtualObject { handler, .. } => handler, + InvocationTarget::Workflow { handler, .. } => handler, } } @@ -137,6 +216,9 @@ impl InvocationTarget { InvocationTarget::VirtualObject { name, key, .. } => { Some(ServiceId::new(name.clone(), key.clone())) } + InvocationTarget::Workflow { name, key, .. } => { + Some(ServiceId::new(name.clone(), key.clone())) + } } } @@ -144,13 +226,19 @@ impl InvocationTarget { match self { InvocationTarget::Service { .. } => ServiceType::Service, InvocationTarget::VirtualObject { .. } => ServiceType::VirtualObject, + InvocationTarget::Workflow { .. } => ServiceType::Workflow, } } - pub fn handler_ty(&self) -> Option { + pub fn invocation_target_ty(&self) -> InvocationTargetType { match self { - InvocationTarget::Service { .. } => None, - InvocationTarget::VirtualObject { handler_ty, .. } => Some(*handler_ty), + InvocationTarget::Service { .. } => InvocationTargetType::Service, + InvocationTarget::VirtualObject { handler_ty, .. } => { + InvocationTargetType::VirtualObject(*handler_ty) + } + InvocationTarget::Workflow { handler_ty, .. } => { + InvocationTargetType::Workflow(*handler_ty) + } } } } @@ -178,13 +266,8 @@ pub struct ServiceInvocation { pub headers: Vec
, /// Time when the request should be executed pub execution_time: Option, - pub idempotency: Option, -} - -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub struct Idempotency { - pub key: ByteString, - pub retention: Duration, + pub completion_retention_time: Option, + pub idempotency_key: Option, } impl ServiceInvocation { @@ -202,45 +285,14 @@ impl ServiceInvocation { span_context: ServiceInvocationSpanContext::empty(), headers: vec![], execution_time: None, - idempotency: None, + completion_retention_time: None, + idempotency_key: None, } } pub fn with_related_span(&mut self, span_relation: SpanRelation) { self.span_context = ServiceInvocationSpanContext::start(&self.invocation_id, span_relation); } - - /// Create a new [`ServiceInvocation`]. - /// - /// This method returns the [`Span`] associated to the created [`ServiceInvocation`]. - /// It is not required to keep this [`Span`] around for the whole lifecycle of the invocation. - /// On the contrary, it is encouraged to drop it as soon as possible, - /// to let the exporter commit this span to jaeger/zipkin to visualize intermediate results of the invocation. - #[allow(clippy::too_many_arguments)] - pub fn new( - invocation_id: InvocationId, - invocation_target: InvocationTarget, - argument: impl Into, - source: Source, - response_sink: Option, - related_span: SpanRelation, - headers: Vec
, - execution_time: Option, - idempotency: Option, - ) -> Self { - let span_context = ServiceInvocationSpanContext::start(&invocation_id, related_span); - Self { - invocation_id, - invocation_target, - argument: argument.into(), - source, - response_sink, - span_context, - headers, - execution_time, - idempotency, - } - } } impl WithPartitionKey for ServiceInvocation { @@ -317,6 +369,15 @@ pub enum ServiceInvocationResponseSink { Ingress(GenerationalNodeId), } +impl ServiceInvocationResponseSink { + pub fn partition_processor(caller: InvocationId, entry_index: EntryIndex) -> Self { + Self::PartitionProcessor { + caller, + entry_index, + } + } +} + /// Source of an invocation #[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] pub enum Source { @@ -693,7 +754,16 @@ mod mocks { generate_string(), generate_string(), generate_string(), - HandlerType::Exclusive, + VirtualObjectHandlerType::Exclusive, + ) + } + + pub fn mock_workflow() -> Self { + InvocationTarget::workflow( + generate_string(), + generate_string(), + generate_string(), + WorkflowHandlerType::Workflow, ) } @@ -702,7 +772,7 @@ mod mocks { service_id.service_name, service_id.key, "MyMethod", - HandlerType::Exclusive, + VirtualObjectHandlerType::Exclusive, ) } } @@ -721,7 +791,8 @@ mod mocks { span_context: Default::default(), headers: vec![], execution_time: None, - idempotency: None, + completion_retention_time: None, + idempotency_key: None, } } } diff --git a/crates/types/src/journal/enriched.rs b/crates/types/src/journal/enriched.rs index 644e0fc14..d3f2d3364 100644 --- a/crates/types/src/journal/enriched.rs +++ b/crates/types/src/journal/enriched.rs @@ -13,6 +13,7 @@ use super::*; use crate::identifiers::InvocationId; use crate::invocation::{InvocationTarget, ServiceInvocationSpanContext}; +use std::time::Duration; pub type EnrichedEntryHeader = EntryHeader; pub type EnrichedRawEntry = RawEntry; @@ -22,6 +23,7 @@ pub type EnrichedRawEntry = RawEntry, // When resolving the service and generating its id, we also generate the associated span pub span_context: ServiceInvocationSpanContext, diff --git a/crates/worker/src/invoker_integration.rs b/crates/worker/src/invoker_integration.rs index 8b66e5a41..bacf25d81 100644 --- a/crates/worker/src/invoker_integration.rs +++ b/crates/worker/src/invoker_integration.rs @@ -16,7 +16,7 @@ use restate_service_protocol::awakeable_id::AwakeableIdentifier; use restate_types::errors::{codes, InvocationError}; use restate_types::identifiers::InvocationId; use restate_types::invocation::{ - HandlerType, InvocationTarget, ServiceInvocationSpanContext, ServiceType, SpanRelation, + InvocationTarget, InvocationTargetType, ServiceInvocationSpanContext, SpanRelation, }; use restate_types::journal::enriched::{ AwakeableEnrichmentResult, CallEnrichmentResult, EnrichedEntryHeader, EnrichedRawEntry, @@ -61,31 +61,40 @@ where .map_err(InvocationError::internal)?; let request = request_extractor(entry); - let invocation_target = match self + let meta = self .schemas .resolve_latest_invocation_target(&request.service_name, &request.handler_name) - { - Some(meta) => match meta.service_ty { - ServiceType::Service => { - InvocationTarget::service(request.service_name, request.handler_name) - } - ServiceType::VirtualObject => InvocationTarget::virtual_object( - request.service_name.clone(), - ByteString::try_from(request.key.clone().into_bytes()).map_err(|e| { - InvocationError::from(anyhow!( - "The request key is not a valid UTF-8 string: {e}" - )) - })?, - request.handler_name, - meta.handler_ty, - ), - }, - None => { - return Err(InvocationError::service_handler_not_found( + .ok_or_else(|| { + InvocationError::service_handler_not_found( &request.service_name, &request.handler_name, - )) + ) + })?; + + let invocation_target = match meta.target_ty { + InvocationTargetType::Service => { + InvocationTarget::service(request.service_name, request.handler_name) } + InvocationTargetType::VirtualObject(h_ty) => InvocationTarget::virtual_object( + request.service_name.clone(), + ByteString::try_from(request.key.clone().into_bytes()).map_err(|e| { + InvocationError::from(anyhow!( + "The request key is not a valid UTF-8 string: {e}" + )) + })?, + request.handler_name, + h_ty, + ), + InvocationTargetType::Workflow(h_ty) => InvocationTarget::workflow( + request.service_name.clone(), + ByteString::try_from(request.key.clone().into_bytes()).map_err(|e| { + InvocationError::from(anyhow!( + "The request key is not a valid UTF-8 string: {e}" + )) + })?, + request.handler_name, + h_ty, + ), }; let invocation_id = InvocationId::generate(&invocation_target); @@ -96,6 +105,7 @@ where Ok(CallEnrichmentResult { invocation_id, invocation_target, + completion_retention_time: meta.compute_retention(false), span_context, }) } @@ -120,38 +130,35 @@ where PlainEntryHeader::GetState { is_completed } => { can_read_state( &header.as_entry_type(), - ¤t_invocation_target.service_ty(), + ¤t_invocation_target.invocation_target_ty(), )?; EnrichedEntryHeader::GetState { is_completed } } PlainEntryHeader::SetState {} => { can_write_state( &header.as_entry_type(), - ¤t_invocation_target.service_ty(), - current_invocation_target.handler_ty(), + ¤t_invocation_target.invocation_target_ty(), )?; EnrichedEntryHeader::SetState {} } PlainEntryHeader::ClearState {} => { can_write_state( &header.as_entry_type(), - ¤t_invocation_target.service_ty(), - current_invocation_target.handler_ty(), + ¤t_invocation_target.invocation_target_ty(), )?; EnrichedEntryHeader::ClearState {} } PlainEntryHeader::GetStateKeys { is_completed } => { can_read_state( &header.as_entry_type(), - ¤t_invocation_target.service_ty(), + ¤t_invocation_target.invocation_target_ty(), )?; EnrichedEntryHeader::GetStateKeys { is_completed } } PlainEntryHeader::ClearAllState => { can_write_state( &header.as_entry_type(), - ¤t_invocation_target.service_ty(), - current_invocation_target.handler_ty(), + ¤t_invocation_target.invocation_target_ty(), )?; EnrichedEntryHeader::ClearAllState {} } @@ -229,14 +236,14 @@ where #[inline] fn can_read_state( entry_type: &EntryType, - service_type: &ServiceType, + invocation_target_type: &InvocationTargetType, ) -> Result<(), InvocationError> { - if !service_type.has_state() { + if !invocation_target_type.can_read_state() { return Err(InvocationError::new( codes::BAD_REQUEST, format!( - "The service type {} does not have state and, therefore, does not support the entry type {}", - service_type, entry_type + "The service/handler type {} does not have state and, therefore, does not support the entry type {}", + invocation_target_type, entry_type ), )); } @@ -246,16 +253,15 @@ fn can_read_state( #[inline] fn can_write_state( entry_type: &EntryType, - service_type: &ServiceType, - handler_type: Option, + invocation_target_type: &InvocationTargetType, ) -> Result<(), InvocationError> { - can_read_state(entry_type, service_type)?; - if handler_type != Some(HandlerType::Exclusive) { + can_read_state(entry_type, invocation_target_type)?; + if !invocation_target_type.can_write_state() { return Err(InvocationError::new( codes::BAD_REQUEST, format!( - "The service type {} with handler type {:?} has no exclusive state access and, therefore, does not support the entry type {}", - service_type, handler_type, entry_type + "The service/handler type {} has no exclusive state access and, therefore, does not support the entry type {}", + invocation_target_type, entry_type ), )); } diff --git a/crates/worker/src/partition/state_machine/command_interpreter/mod.rs b/crates/worker/src/partition/state_machine/command_interpreter/mod.rs index 98a63158b..2d1987b3e 100644 --- a/crates/worker/src/partition/state_machine/command_interpreter/mod.rs +++ b/crates/worker/src/partition/state_machine/command_interpreter/mod.rs @@ -37,9 +37,9 @@ use restate_types::identifiers::{ }; use restate_types::ingress::IngressResponse; use restate_types::invocation::{ - HandlerType, InvocationResponse, InvocationTarget, InvocationTermination, ResponseResult, - ServiceInvocation, ServiceInvocationResponseSink, ServiceInvocationSpanContext, Source, - SpanRelationCause, TerminationFlavor, + InvocationResponse, InvocationTarget, InvocationTargetType, InvocationTermination, + ResponseResult, ServiceInvocation, ServiceInvocationResponseSink, ServiceInvocationSpanContext, + Source, SpanRelationCause, TerminationFlavor, VirtualObjectHandlerType, WorkflowHandlerType, }; use restate_types::journal::enriched::{ AwakeableEnrichmentResult, CallEnrichmentResult, EnrichedEntryHeader, EnrichedRawEntry, @@ -56,6 +56,7 @@ use restate_wal_protocol::Command; use std::collections::HashSet; use std::fmt::{Debug, Formatter}; use std::future::Future; +use std::iter; use std::marker::PhantomData; use std::ops::RangeInclusive; use std::pin::pin; @@ -223,27 +224,33 @@ where effects.set_parent_span_context(&service_invocation.span_context); // If an idempotency key is set, handle idempotency - if let Some(idempotency) = &service_invocation.idempotency { - let idempotency_id = IdempotencyId::combine( - service_invocation.invocation_id, - &service_invocation.invocation_target, - idempotency.key.clone(), - ); - if self - .try_resolve_idempotent_request( - effects, - state, - &idempotency_id, - &service_invocation.invocation_id, - service_invocation.response_sink.as_ref(), - ) - .await? + if let Some(idempotency_key) = &service_invocation.idempotency_key { + if service_invocation.invocation_target.invocation_target_ty() + == InvocationTargetType::Workflow(WorkflowHandlerType::Workflow) { - // Invocation was either resolved, or the sink was enqueued. Nothing else to do here. - return Ok(()); + warn!("The idempotency key for workflow methods is ignored!"); + } else { + let idempotency_id = IdempotencyId::combine( + service_invocation.invocation_id, + &service_invocation.invocation_target, + idempotency_key.clone(), + ); + if self + .try_resolve_idempotent_request( + effects, + state, + &idempotency_id, + &service_invocation.invocation_id, + service_invocation.response_sink.as_ref(), + ) + .await? + { + // Invocation was either resolved, or the sink was enqueued. Nothing else to do here. + return Ok(()); + } + // Idempotent invocation needs to be processed for the first time, let's roll! + effects.store_idempotency_id(idempotency_id, service_invocation.invocation_id); } - // Idempotent invocation needs to be processed for the first time, let's roll! - effects.store_idempotency_id(idempotency_id, service_invocation.invocation_id); } // If an execution_time is set, we schedule the invocation to be processed later @@ -258,7 +265,9 @@ where } // If it's exclusive, we need to acquire the exclusive lock - if service_invocation.invocation_target.handler_ty() == Some(HandlerType::Exclusive) { + if service_invocation.invocation_target.invocation_target_ty() + == InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive) + { let keyed_service_id = service_invocation .invocation_target .as_keyed_service_id() @@ -285,6 +294,47 @@ where } } + if service_invocation.invocation_target.invocation_target_ty() + == InvocationTargetType::Workflow(WorkflowHandlerType::Workflow) + { + let keyed_service_id = service_invocation + .invocation_target + .as_keyed_service_id() + .expect("When the handler type is Workflow, the invocation target must have a key"); + + let service_status = state.get_virtual_object_status(&keyed_service_id).await?; + + // If locked, then we check the original invocation + if let VirtualObjectStatus::Locked(original_invocation_id) = service_status { + if let Some(response_sink) = service_invocation.response_sink { + let invocation_status = + state.get_invocation_status(&original_invocation_id).await?; + + match invocation_status { + InvocationStatus::Completed( + CompletedInvocation { response_result, .. }) => { + self.send_response_to_sinks( + effects, + &service_invocation.invocation_id, + None, + iter::once(response_sink), + response_result, + ); + } + InvocationStatus::Free => panic!("Unexpected state, the InvocationStatus cannot be Free for invocation {} given it's in locked status", original_invocation_id), + is => effects.append_response_sink( + original_invocation_id, + is, + response_sink + ) + } + } + + // TODO ADD ATTACH INVOCATION ID NOTIFICATION TO INGRESS!!!!!! + return Ok(()); + } + } + // We're ready to invoke the service! effects.invoke_service(service_invocation); Ok(()) @@ -600,8 +650,8 @@ where } = inboxed_invocation; // Reply back to callers with error, and publish end trace - let idempotency_id = inboxed_invocation.idempotency.map(|idempotency| { - IdempotencyId::combine(invocation_id, &invocation_target, idempotency.key) + let idempotency_id = inboxed_invocation.idempotency_key.map(|idempotency| { + IdempotencyId::combine(invocation_id, &invocation_target, idempotency) }); self.send_response_to_sinks( @@ -830,20 +880,36 @@ where match Self::get_invocation_status_and_trace(state, &invocation_id, effects).await? { InvocationStatus::Completed(CompletedInvocation { invocation_target, - idempotency_key: Some(idempotency_key), + idempotency_key, .. }) => { effects.free_invocation(invocation_id); - // Also cleanup the associated idempotency key - effects.delete_idempotency_id(IdempotencyId::combine( - invocation_id, - &invocation_target, - idempotency_key, - )); - } - InvocationStatus::Completed(_) => { - // Just free the invocation - effects.free_invocation(invocation_id); + + // Also cleanup the associated idempotency key if any + if let Some(idempotency_key) = idempotency_key { + effects.delete_idempotency_id(IdempotencyId::combine( + invocation_id, + &invocation_target, + idempotency_key, + )); + } + + // For workflow, we should also clean up the service lock, associated state and promises. + if invocation_target.invocation_target_ty() + == InvocationTargetType::Workflow(WorkflowHandlerType::Workflow) + { + let service_id = invocation_target + .as_keyed_service_id() + .expect("Workflow methods must have keyed service id"); + + effects.unlock_service_id(service_id.clone()); + effects.clear_all_state( + service_id.clone(), + invocation_id, + ServiceInvocationSpanContext::empty(), + ); + // TODO CLEANUP PROMISES + } } InvocationStatus::Free => { // Nothing to do @@ -1143,8 +1209,10 @@ where } fn try_pop_inbox(effects: &mut Effects, invocation_target: &InvocationTarget) { - // Inbox exists only for exclusive handler cases - if invocation_target.handler_ty() == Some(HandlerType::Exclusive) { + // Inbox exists only for virtual object exclusive handler cases + if invocation_target.invocation_target_ty() + == InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive) + { effects.pop_inbox(invocation_target.as_keyed_service_id().expect( "When the handler type is Exclusive, the invocation target must have a key", )) @@ -1312,7 +1380,7 @@ where span_context, invocation_id: callee_invocation_id, invocation_target: callee_invocation_target, - .. + completion_retention_time, }) = enrichment_result { let_assert!( @@ -1320,18 +1388,25 @@ where journal_entry.deserialize_entry_ref::()? ); - let service_invocation = Self::create_service_invocation( - *callee_invocation_id, - callee_invocation_target.clone(), - request, - Source::Service( + let service_invocation = ServiceInvocation { + invocation_id: *callee_invocation_id, + invocation_target: callee_invocation_target.clone(), + argument: request.parameter, + source: Source::Service( invocation_id, invocation_metadata.invocation_target.clone(), ), - Some((invocation_id, entry_index)), - span_context.clone(), - None, - ); + response_sink: Some(ServiceInvocationResponseSink::partition_processor( + invocation_id, + entry_index, + )), + span_context: span_context.clone(), + headers: vec![], + execution_time: None, + completion_retention_time: *completion_retention_time, + idempotency_key: None, + }; + self.handle_outgoing_message( OutboxMessage::ServiceInvocation(service_invocation), effects, @@ -1347,7 +1422,7 @@ where invocation_id: callee_invocation_id, invocation_target: callee_invocation_target, span_context, - .. + completion_retention_time, } = enrichment_result; let_assert!( @@ -1364,15 +1439,21 @@ where Some(MillisSinceEpoch::new(invoke_time)) }; - let service_invocation = Self::create_service_invocation( - *callee_invocation_id, - callee_invocation_target.clone(), - request, - Source::Service(invocation_id, invocation_metadata.invocation_target.clone()), - None, - span_context.clone(), - delay, - ); + let service_invocation = ServiceInvocation { + invocation_id: *callee_invocation_id, + invocation_target: callee_invocation_target.clone(), + argument: request.parameter, + source: Source::Service( + invocation_id, + invocation_metadata.invocation_target.clone(), + ), + response_sink: None, + span_context: span_context.clone(), + headers: vec![], + execution_time: delay, + completion_retention_time: *completion_retention_time, + idempotency_key: None, + }; let pointer_span_id = match span_context.span_cause() { Some(SpanRelationCause::Linked(_, span_id)) => Some(*span_id), @@ -1588,40 +1669,6 @@ where } Ok(status) } - - #[allow(clippy::too_many_arguments)] - fn create_service_invocation( - invocation_id: InvocationId, - invocation_target: InvocationTarget, - invoke_request: InvokeRequest, - source: Source, - response_target: Option<(InvocationId, EntryIndex)>, - span_context: ServiceInvocationSpanContext, - execution_time: Option, - ) -> ServiceInvocation { - let InvokeRequest { parameter, .. } = invoke_request; - - let response_sink = if let Some((caller, entry_index)) = response_target { - Some(ServiceInvocationResponseSink::PartitionProcessor { - caller, - entry_index, - }) - } else { - None - }; - - ServiceInvocation { - invocation_id, - invocation_target, - argument: parameter, - source, - response_sink, - span_context, - headers: vec![], - execution_time, - idempotency: None, - } - } } /// Projected [`InvocationStatus`] for cancellation purposes. diff --git a/crates/worker/src/partition/state_machine/command_interpreter/tests.rs b/crates/worker/src/partition/state_machine/command_interpreter/tests.rs index 8278ef5d1..6222e092f 100644 --- a/crates/worker/src/partition/state_machine/command_interpreter/tests.rs +++ b/crates/worker/src/partition/state_machine/command_interpreter/tests.rs @@ -455,7 +455,8 @@ async fn kill_inboxed_invocation() -> Result<(), Error> { span_context: Default::default(), headers: vec![], execution_time: None, - idempotency: None, + completion_retention_time: Default::default(), + idempotency_key: None, }), ); @@ -567,6 +568,7 @@ fn completed_invoke_entry(invocation_id: InvocationId) -> JournalEntry { enrichment_result: Some(CallEnrichmentResult { invocation_id, invocation_target: InvocationTarget::mock_service(), + completion_retention_time: None, span_context: ServiceInvocationSpanContext::empty(), }), }, @@ -580,6 +582,7 @@ fn background_invoke_entry(invocation_id: InvocationId) -> JournalEntry { enrichment_result: CallEnrichmentResult { invocation_id, invocation_target: InvocationTarget::mock_service(), + completion_retention_time: None, span_context: ServiceInvocationSpanContext::empty(), }, }, @@ -594,6 +597,7 @@ fn uncompleted_invoke_entry(invocation_id: InvocationId) -> JournalEntry { enrichment_result: Some(CallEnrichmentResult { invocation_id, invocation_target: InvocationTarget::mock_service(), + completion_retention_time: None, span_context: ServiceInvocationSpanContext::empty(), }), }, diff --git a/crates/worker/src/partition/state_machine/effect_interpreter.rs b/crates/worker/src/partition/state_machine/effect_interpreter.rs index b58d9b5c5..72323deaf 100644 --- a/crates/worker/src/partition/state_machine/effect_interpreter.rs +++ b/crates/worker/src/partition/state_machine/effect_interpreter.rs @@ -24,7 +24,9 @@ use restate_storage_api::service_status_table::VirtualObjectStatus; use restate_storage_api::timer_table::{Timer, TimerKey}; use restate_storage_api::Result as StorageResult; use restate_types::identifiers::{EntryIndex, InvocationId, ServiceId}; -use restate_types::invocation::{HandlerType, InvocationInput}; +use restate_types::invocation::{ + InvocationInput, InvocationTargetType, VirtualObjectHandlerType, WorkflowHandlerType, +}; use restate_types::journal::enriched::EnrichedRawEntry; use restate_types::journal::raw::{PlainRawEntry, RawEntryCodec}; use restate_types::journal::{Completion, CompletionResult, EntryType}; @@ -303,6 +305,11 @@ impl EffectInterpreter { message, }); } + Effect::UnlockService(service_id) => { + state_storage + .store_service_status(&service_id, VirtualObjectStatus::Unlocked) + .await?; + } Effect::SetState { service_id, key, @@ -541,8 +548,13 @@ impl EffectInterpreter { // In our current data model, ServiceInvocation has always an input, so initial length is 1 in_flight_invocation_metadata.journal_metadata.length = 1; - if in_flight_invocation_metadata.invocation_target.handler_ty() - == Some(HandlerType::Exclusive) + let invocation_target_type = in_flight_invocation_metadata + .invocation_target + .invocation_target_ty(); + if invocation_target_type + == InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive) + || invocation_target_type + == InvocationTargetType::Workflow(WorkflowHandlerType::Workflow) { state_storage .store_service_status( diff --git a/crates/worker/src/partition/state_machine/effects.rs b/crates/worker/src/partition/state_machine/effects.rs index 06770761f..d9f6e6b9e 100644 --- a/crates/worker/src/partition/state_machine/effects.rs +++ b/crates/worker/src/partition/state_machine/effects.rs @@ -77,6 +77,9 @@ pub(crate) enum Effect { sequence_number: MessageIndex, }, + // Lock status + UnlockService(ServiceId), + // State SetState { service_id: ServiceId, @@ -338,6 +341,13 @@ impl Effect { "Effect: Delete inbox entry", ); } + Effect::UnlockService(service_id) => { + debug_if_leader!( + is_leader, + rpc.service = %service_id.service_name, + "Effect: Unlock service id", + ); + } Effect::TruncateOutbox(seq_number) => { trace!(restate.outbox.seq = seq_number, "Effect: Truncate outbox") } @@ -747,6 +757,10 @@ impl Effects { self.effects.push(Effect::FreeInvocation(invocation_id)) } + pub(crate) fn unlock_service_id(&mut self, service_id: ServiceId) { + self.effects.push(Effect::UnlockService(service_id)) + } + pub(crate) fn enqueue_into_inbox(&mut self, seq_number: MessageIndex, inbox_entry: InboxEntry) { self.effects.push(Effect::EnqueueIntoInbox { seq_number, diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index ccccd3362..01ba5339a 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -115,8 +115,8 @@ mod tests { use restate_types::identifiers::{InvocationId, PartitionId, PartitionKey, ServiceId}; use restate_types::ingress::IngressResponse; use restate_types::invocation::{ - HandlerType, InvocationResponse, InvocationTarget, InvocationTermination, ResponseResult, - ServiceInvocation, ServiceInvocationResponseSink, Source, + InvocationResponse, InvocationTarget, InvocationTermination, ResponseResult, + ServiceInvocation, ServiceInvocationResponseSink, Source, VirtualObjectHandlerType, }; use restate_types::journal::enriched::EnrichedRawEntry; use restate_types::journal::{Completion, CompletionResult, EntryResult}; @@ -251,8 +251,12 @@ mod tests { .run_in_scope("mock-state-machine", None, MockStateMachine::create()) .await; - let invocation_target = - InvocationTarget::virtual_object("MySvc", "MyKey", "MyHandler", HandlerType::Shared); + let invocation_target = InvocationTarget::virtual_object( + "MySvc", + "MyKey", + "MyHandler", + VirtualObjectHandlerType::Shared, + ); // Let's lock the virtual object let mut tx = state_machine.rocksdb_storage.transaction(); @@ -659,7 +663,8 @@ mod tests { span_context: Default::default(), headers: vec![], execution_time: None, - idempotency: None, + completion_retention_time: None, + idempotency_key: None, })) .await; assert_that!( @@ -732,7 +737,7 @@ mod tests { use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind}; use restate_types::errors::GONE_INVOCATION_ERROR; use restate_types::identifiers::IdempotencyId; - use restate_types::invocation::{Idempotency, InvocationTarget}; + use restate_types::invocation::InvocationTarget; use restate_wal_protocol::timer::TimerKeyValue; use test_log::test; @@ -746,17 +751,15 @@ mod tests { .run_in_scope("mock-state-machine", None, MockStateMachine::create()) .await; - let idempotency = Idempotency { - key: ByteString::from_static("my-idempotency-key"), - retention: Duration::from_secs(60) * 60 * 24, - }; + let idempotency_key = ByteString::from_static("my-idempotency-key"); + let retention = Duration::from_secs(60) * 60 * 24; let invocation_target = InvocationTarget::mock_virtual_object(); let invocation_id = InvocationId::generate_with_idempotency_key( &invocation_target, - Some(idempotency.key.clone()), + Some(idempotency_key.clone()), ); let idempotency_id = - IdempotencyId::combine(invocation_id, &invocation_target, idempotency.key.clone()); + IdempotencyId::combine(invocation_id, &invocation_target, idempotency_key.clone()); // Send fresh invocation with idempotency key let actions = state_machine @@ -766,7 +769,8 @@ mod tests { response_sink: Some(ServiceInvocationResponseSink::Ingress( GenerationalNodeId::new(1, 1), )), - idempotency: Some(idempotency), + idempotency_key: Some(idempotency_key), + completion_retention_time: Some(retention), ..ServiceInvocation::mock() })) .await; @@ -851,17 +855,14 @@ mod tests { .run_in_scope("mock-state-machine", None, MockStateMachine::create()) .await; - let idempotency = Idempotency { - key: ByteString::from_static("my-idempotency-key"), - retention: Duration::from_secs(60) * 60 * 24, - }; + let idempotency_key = ByteString::from_static("my-idempotency-key"); let invocation_target = InvocationTarget::mock_virtual_object(); let invocation_id = InvocationId::generate_with_idempotency_key( &invocation_target, - Some(idempotency.key.clone()), + Some(idempotency_key.clone()), ); let idempotency_id = - IdempotencyId::combine(invocation_id, &invocation_target, idempotency.key.clone()); + IdempotencyId::combine(invocation_id, &invocation_target, idempotency_key.clone()); let response_bytes = Bytes::from_static(b"123"); let ingress_id = GenerationalNodeId::new(1, 1); @@ -875,7 +876,7 @@ mod tests { InvocationStatus::Completed(CompletedInvocation { invocation_target: invocation_target.clone(), source: Source::Ingress, - idempotency_key: Some(idempotency.key.clone()), + idempotency_key: Some(idempotency_key.clone()), timestamps: StatusTimestamps::now(), response_result: ResponseResult::Success(response_bytes.clone()), }), @@ -886,14 +887,14 @@ mod tests { // Send a request, should be completed immediately with result let second_invocation_id = InvocationId::generate_with_idempotency_key( &invocation_target, - Some(idempotency.key.clone()), + Some(idempotency_key.clone()), ); let actions = state_machine .apply(Command::Invoke(ServiceInvocation { invocation_id: second_invocation_id, invocation_target: invocation_target.clone(), response_sink: Some(ServiceInvocationResponseSink::Ingress(ingress_id)), - idempotency: Some(idempotency), + idempotency_key: Some(idempotency_key), ..ServiceInvocation::mock() })) .await; @@ -926,17 +927,15 @@ mod tests { .run_in_scope("mock-state-machine", None, MockStateMachine::create()) .await; - let idempotency = Idempotency { - key: ByteString::from_static("my-idempotency-key"), - retention: Duration::from_secs(60) * 60 * 24, - }; + let idempotency_key = ByteString::from_static("my-idempotency-key"); + let retention = Duration::from_secs(60) * 60 * 24; let invocation_target = InvocationTarget::mock_virtual_object(); let invocation_id = InvocationId::generate_with_idempotency_key( &invocation_target, - Some(idempotency.key.clone()), + Some(idempotency_key.clone()), ); let idempotency_id = - IdempotencyId::combine(invocation_id, &invocation_target, idempotency.key.clone()); + IdempotencyId::combine(invocation_id, &invocation_target, idempotency_key.clone()); let ingress_id = GenerationalNodeId::new(1, 1); @@ -949,14 +948,15 @@ mod tests { // Send a request, should be completed immediately with result let second_invocation_id = InvocationId::generate_with_idempotency_key( &invocation_target, - Some(idempotency.key.clone()), + Some(idempotency_key.clone()), ); let actions = state_machine .apply(Command::Invoke(ServiceInvocation { invocation_id: second_invocation_id, invocation_target, response_sink: Some(ServiceInvocationResponseSink::Ingress(ingress_id)), - idempotency: Some(idempotency), + idempotency_key: Some(idempotency_key), + completion_retention_time: Some(retention), ..ServiceInvocation::mock() })) .await; @@ -989,19 +989,17 @@ mod tests { .run_in_scope("mock-state-machine", None, MockStateMachine::create()) .await; - let idempotency = Idempotency { - key: ByteString::from_static("my-idempotency-key"), - retention: Duration::from_secs(60) * 60 * 24, - }; + let idempotency_key = ByteString::from_static("my-idempotency-key"); + let retention = Duration::from_secs(60) * 60 * 24; let invocation_target = InvocationTarget::mock_virtual_object(); let first_invocation_id = InvocationId::generate_with_idempotency_key( &invocation_target, - Some(idempotency.key.clone()), + Some(idempotency_key.clone()), ); let idempotency_id = IdempotencyId::combine( first_invocation_id, &invocation_target, - idempotency.key.clone(), + idempotency_key.clone(), ); let ingress_id_1 = GenerationalNodeId::new(1, 1); @@ -1013,7 +1011,8 @@ mod tests { invocation_id: first_invocation_id, invocation_target: invocation_target.clone(), response_sink: Some(ServiceInvocationResponseSink::Ingress(ingress_id_1)), - idempotency: Some(idempotency.clone()), + idempotency_key: Some(idempotency_key.clone()), + completion_retention_time: Some(retention), ..ServiceInvocation::mock() })) .await; @@ -1028,14 +1027,14 @@ mod tests { // Latch to existing invocation let second_invocation_id = InvocationId::generate_with_idempotency_key( &invocation_target, - Some(idempotency.key.clone()), + Some(idempotency_key.clone()), ); let actions = state_machine .apply(Command::Invoke(ServiceInvocation { invocation_id: second_invocation_id, invocation_target: invocation_target.clone(), response_sink: Some(ServiceInvocationResponseSink::Ingress(ingress_id_2)), - idempotency: Some(idempotency), + idempotency_key: Some(idempotency_key), ..ServiceInvocation::mock() })) .await; @@ -1089,17 +1088,14 @@ mod tests { .run_in_scope("mock-state-machine", None, MockStateMachine::create()) .await; - let idempotency = Idempotency { - key: ByteString::from_static("my-idempotency-key"), - retention: Duration::from_secs(60) * 60 * 24, - }; + let idempotency_key = ByteString::from_static("my-idempotency-key"); let invocation_target = InvocationTarget::mock_virtual_object(); let invocation_id = InvocationId::generate_with_idempotency_key( &invocation_target, - Some(idempotency.key.clone()), + Some(idempotency_key.clone()), ); let idempotency_id = - IdempotencyId::combine(invocation_id, &invocation_target, idempotency.key.clone()); + IdempotencyId::combine(invocation_id, &invocation_target, idempotency_key.clone()); // Prepare idempotency metadata and completed status let mut txn = state_machine.storage().transaction(); @@ -1110,7 +1106,7 @@ mod tests { InvocationStatus::Completed(CompletedInvocation { invocation_target, source: Source::Ingress, - idempotency_key: Some(idempotency.key.clone()), + idempotency_key: Some(idempotency_key.clone()), timestamps: StatusTimestamps::now(), response_result: ResponseResult::Success(Bytes::from_static(b"123")), }), @@ -1151,6 +1147,220 @@ mod tests { } } + mod workflow { + use super::*; + use std::time::Duration; + + use restate_storage_api::invocation_status_table::{CompletedInvocation, StatusTimestamps}; + use restate_storage_api::service_status_table::ReadOnlyVirtualObjectStatusTable; + use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind}; + use restate_types::invocation::InvocationTarget; + use restate_wal_protocol::timer::TimerKeyValue; + use test_log::test; + + #[test(tokio::test)] + async fn start_workflow_method() { + let tc = TaskCenterBuilder::default() + .default_runtime_handle(tokio::runtime::Handle::current()) + .build() + .expect("task_center builds"); + let mut state_machine = tc + .run_in_scope("mock-state-machine", None, MockStateMachine::create()) + .await; + + let invocation_target = InvocationTarget::mock_workflow(); + let invocation_id = InvocationId::mock_random(); + + // Send fresh invocation + let actions = state_machine + .apply(Command::Invoke(ServiceInvocation { + invocation_id, + invocation_target: invocation_target.clone(), + completion_retention_time: Some(Duration::from_secs(60)), + response_sink: Some(ServiceInvocationResponseSink::Ingress( + GenerationalNodeId::new(1, 1), + )), + ..ServiceInvocation::mock() + })) + .await; + assert_that!( + actions, + contains(pat!(Action::Invoke { + invocation_id: eq(invocation_id), + invoke_input_journal: pat!(InvokeInputJournal::CachedJournal(_, _)) + })) + ); + + // Assert service is locked + let mut txn = state_machine.storage().transaction(); + assert_that!( + txn.get_virtual_object_status(&invocation_target.as_keyed_service_id().unwrap()) + .await + .unwrap(), + eq(VirtualObjectStatus::Locked(invocation_id)) + ); + txn.commit().await.unwrap(); + + // Sending another invocation won't re-execute + let actions = state_machine + .apply(Command::Invoke(ServiceInvocation { + invocation_id: InvocationId::mock_random(), + invocation_target: invocation_target.clone(), + response_sink: Some(ServiceInvocationResponseSink::Ingress( + GenerationalNodeId::new(2, 2), + )), + ..ServiceInvocation::mock() + })) + .await; + assert_that!( + actions, + not(contains(pat!(Action::Invoke { + invocation_id: eq(invocation_id), + invoke_input_journal: pat!(InvokeInputJournal::CachedJournal(_, _)) + }))) + ); + + // Send output, then end + let response_bytes = Bytes::from_static(b"123"); + let actions = state_machine + .apply_multiple([ + Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::JournalEntry { + entry_index: 1, + entry: ProtobufRawEntryCodec::serialize_enriched(Entry::output( + EntryResult::Success(response_bytes.clone()), + )), + }, + }), + Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::End, + }), + ]) + .await; + + // Assert response and cleanup timer + assert_that!( + actions, + all!( + contains(pat!(Action::IngressResponse(pat!(IngressResponse { + target_node: eq(GenerationalNodeId::new(1, 1)), + invocation_id: eq(invocation_id), + response: eq(ResponseResult::Success(response_bytes.clone())) + })))), + contains(pat!(Action::IngressResponse(pat!(IngressResponse { + target_node: eq(GenerationalNodeId::new(2, 2)), + invocation_id: eq(invocation_id), + response: eq(ResponseResult::Success(response_bytes.clone())) + })))), + contains(pat!(Action::ScheduleInvocationStatusCleanup { + invocation_id: eq(invocation_id) + })) + ) + ); + + // InvocationStatus contains completed + let invocation_status = state_machine + .storage() + .transaction() + .get_invocation_status(&invocation_id) + .await + .unwrap(); + assert_that!( + invocation_status, + pat!(InvocationStatus::Completed(pat!(CompletedInvocation { + response_result: eq(ResponseResult::Success(response_bytes.clone())) + }))) + ); + + // Sending a new request will be completed immediately + let actions = state_machine + .apply(Command::Invoke(ServiceInvocation { + invocation_id, + invocation_target: invocation_target.clone(), + response_sink: Some(ServiceInvocationResponseSink::Ingress( + GenerationalNodeId::new(1, 1), + )), + ..ServiceInvocation::mock() + })) + .await; + assert_that!( + actions, + contains(pat!(Action::IngressResponse(pat!(IngressResponse { + target_node: eq(GenerationalNodeId::new(1, 1)), + invocation_id: eq(invocation_id), + response: eq(ResponseResult::Success(response_bytes)) + })))) + ); + } + + #[test(tokio::test)] + async fn timer_cleanup() { + let tc = TaskCenterBuilder::default() + .default_runtime_handle(tokio::runtime::Handle::current()) + .build() + .expect("task_center builds"); + let mut state_machine = tc + .run_in_scope("mock-state-machine", None, MockStateMachine::create()) + .await; + + let invocation_target = InvocationTarget::mock_workflow(); + let invocation_id = InvocationId::mock_random(); + + // Prepare idempotency metadata and completed status + let mut txn = state_machine.storage().transaction(); + txn.put_invocation_status( + &invocation_id, + InvocationStatus::Completed(CompletedInvocation { + invocation_target: invocation_target.clone(), + source: Source::Ingress, + idempotency_key: None, + timestamps: StatusTimestamps::now(), + response_result: ResponseResult::Success(Bytes::from_static(b"123")), + }), + ) + .await; + txn.put_virtual_object_status( + &invocation_target.as_keyed_service_id().unwrap(), + VirtualObjectStatus::Locked(invocation_id), + ) + .await; + txn.commit().await.unwrap(); + + // Send timer fired command + let _ = state_machine + .apply(Command::Timer(TimerKeyValue::new( + TimerKey { + kind: TimerKeyKind::Invoke { + invocation_uuid: invocation_id.invocation_uuid(), + }, + timestamp: 0, + }, + Timer::CleanInvocationStatus(invocation_id), + ))) + .await; + assert_that!( + state_machine + .storage() + .transaction() + .get_invocation_status(&invocation_id) + .await + .unwrap(), + pat!(InvocationStatus::Free) + ); + assert_that!( + state_machine + .storage() + .transaction() + .get_virtual_object_status(&invocation_target.as_keyed_service_id().unwrap()) + .await + .unwrap(), + pat!(VirtualObjectStatus::Unlocked) + ); + } + } + async fn mock_start_invocation_with_service_id( state_machine: &mut MockStateMachine, service_id: ServiceId, @@ -1178,7 +1388,8 @@ mod tests { span_context: Default::default(), headers: vec![], execution_time: None, - idempotency: None, + completion_retention_time: None, + idempotency_key: None, })) .await;