Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable workflow request/response calls from ingress, and invoke them only once from context client #1603

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/types/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub mod codes {
pub struct InvocationError {
code: InvocationErrorCode,
message: Cow<'static, str>,
#[serde(skip_serializing_if = "Option::is_none")]
description: Option<Cow<'static, str>>,
}

Expand Down Expand Up @@ -214,6 +215,9 @@ pub const ATTACH_NOT_SUPPORTED_INVOCATION_ERROR: InvocationError =
pub const ALREADY_COMPLETED_INVOCATION_ERROR: InvocationError =
InvocationError::new_static(codes::CONFLICT, "promise was already completed");

pub const WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR: InvocationError =
InvocationError::new_static(codes::CONFLICT, "the workflow method was already invoked");

/// Error parsing/decoding a resource ID.
#[derive(Debug, thiserror::Error, Clone, Eq, PartialEq)]
pub enum IdDecodeError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use restate_storage_api::Result as StorageResult;
use restate_types::errors::{
InvocationError, InvocationErrorCode, ALREADY_COMPLETED_INVOCATION_ERROR,
ATTACH_NOT_SUPPORTED_INVOCATION_ERROR, CANCELED_INVOCATION_ERROR, GONE_INVOCATION_ERROR,
KILLED_INVOCATION_ERROR, NOT_FOUND_INVOCATION_ERROR,
KILLED_INVOCATION_ERROR, NOT_FOUND_INVOCATION_ERROR, WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR,
};
use restate_types::identifiers::{
EntryIndex, IdempotencyId, InvocationId, JournalEntryId, PartitionKey, ServiceId,
Expand Down Expand Up @@ -328,27 +328,38 @@ where
// If locked, then we check the original invocation
if let VirtualObjectStatus::Locked(original_invocation_id) = service_status {
if let Some(response_sink) = service_invocation.response_sink {
let invocation_status =
state.get_invocation_status(&original_invocation_id).await?;

match invocation_status {
InvocationStatus::Completed(
CompletedInvocation { response_result, .. }) => {
self.send_response_to_sinks(
effects,
iter::once(response_sink),
response_result,
Some(service_invocation.invocation_id),
Some(&service_invocation.invocation_target),
);
}
InvocationStatus::Free => panic!("Unexpected state, the InvocationStatus cannot be Free for invocation {} given it's in locked status", original_invocation_id),
is => effects.append_response_sink(
original_invocation_id,
is,
response_sink
)
}
// --- ATTACH business logic below, this is currently disabled due to the pending discussion about equality check.
// We instead simply fail the invocation with CONFLICT status code
//
// let invocation_status =
// state.get_invocation_status(&original_invocation_id).await?;
//
// match invocation_status {
// InvocationStatus::Completed(
// CompletedInvocation { response_result, .. }) => {
// self.send_response_to_sinks(
// effects,
// iter::once(response_sink),
// response_result,
// Some(service_invocation.invocation_id),
// Some(&service_invocation.invocation_target),
// );
// }
// InvocationStatus::Free => panic!("Unexpected state, the InvocationStatus cannot be Free for invocation {} given it's in locked status", original_invocation_id),
// is => effects.append_response_sink(
// original_invocation_id,
// is,
// response_sink
// )
// }

self.send_response_to_sinks(
effects,
iter::once(response_sink),
ResponseResult::Failure(WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR),
Some(original_invocation_id),
Some(&service_invocation.invocation_target),
);
}

Self::send_submit_notification_if_needed(
Expand Down
52 changes: 34 additions & 18 deletions crates/worker/src/partition/state_machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1617,6 +1617,7 @@ mod tests {
use restate_storage_api::invocation_status_table::{CompletedInvocation, StatusTimestamps};
use restate_storage_api::service_status_table::ReadOnlyVirtualObjectStatusTable;
use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind};
use restate_types::errors::WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR;
use restate_types::invocation::{
AttachInvocationRequest, InvocationQuery, InvocationTarget,
};
Expand Down Expand Up @@ -1684,10 +1685,25 @@ mod tests {
.await;
assert_that!(
actions,
not(contains(pat!(Action::Invoke {
invocation_id: eq(invocation_id),
invoke_input_journal: pat!(InvokeInputJournal::CachedJournal(_, _))
})))
all!(
not(contains(pat!(Action::Invoke {
invocation_id: eq(invocation_id),
invoke_input_journal: pat!(InvokeInputJournal::CachedJournal(_, _))
}))),
// We get back this error due to the fact that we disabled the attach semantics
contains(pat!(Action::IngressResponse(pat!(
IngressResponseEnvelope {
target_node: eq(node_id),
inner: pat!(ingress::InvocationResponse {
request_id: eq(request_id_2),
invocation_id: some(eq(invocation_id)),
response: eq(IngressResponseResult::Failure(
WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR
))
})
}
))))
)
);

// Send output, then end
Expand Down Expand Up @@ -1727,7 +1743,8 @@ mod tests {
})
}
)))),
contains(pat!(Action::IngressResponse(pat!(
// This is a not() because we currently disabled the attach semantics on request/response
not(contains(pat!(Action::IngressResponse(pat!(
IngressResponseEnvelope {
target_node: eq(node_id),
inner: pat!(ingress::InvocationResponse {
Expand All @@ -1739,7 +1756,7 @@ mod tests {
))
})
}
)))),
))))),
contains(pat!(Action::ScheduleInvocationStatusCleanup {
invocation_id: eq(invocation_id)
}))
Expand All @@ -1760,7 +1777,7 @@ mod tests {
})))
);

// Sending a new request will be completed immediately
// Sending a new request will not be completed because we don't support attach semantics
let request_id_3 = IngressRequestId::default();
let actions = state_machine
.apply(Command::Invoke(ServiceInvocation {
Expand All @@ -1779,11 +1796,10 @@ mod tests {
IngressResponseEnvelope {
target_node: eq(node_id),
inner: pat!(ingress::InvocationResponse {
invocation_id: some(eq(invocation_id)),
request_id: eq(request_id_3),
response: eq(IngressResponseResult::Success(
invocation_target.clone(),
response_bytes.clone()
invocation_id: some(eq(invocation_id)),
response: eq(IngressResponseResult::Failure(
WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR
))
})
}
Expand Down Expand Up @@ -1919,16 +1935,16 @@ mod tests {
})))
);

// Sending a new request will be completed immediately
// Sending another attach will be completed immediately
let actions = state_machine
.apply(Command::Invoke(ServiceInvocation {
invocation_id,
invocation_target: invocation_target.clone(),
response_sink: Some(ServiceInvocationResponseSink::Ingress {
.apply(Command::AttachInvocation(AttachInvocationRequest {
invocation_query: InvocationQuery::Workflow(
invocation_target.as_keyed_service_id().unwrap(),
),
response_sink: ServiceInvocationResponseSink::Ingress {
node_id,
request_id: request_id_3,
}),
..ServiceInvocation::mock()
},
}))
.await;
assert_that!(
Expand Down
2 changes: 1 addition & 1 deletion docs/dev/local-development.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Optionally, you can install [just](/~https://github.com/casey/just) to make use of
To setup these on Fedora, run:

```
sudo dnf install clang lld lldb libcxx cmake openssl-devel rocksdb-devel protobuf-compiler just
sudo dnf install clang lld lldb libcxx cmake openssl-devel rocksdb-devel protobuf-compiler just liburing-devel
```

On MacOS, you can use [homebrew](https://brew.sh)
Expand Down
Loading