Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an edit deployment API #2515

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions crates/admin-rest-model/src/deployments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,3 +320,66 @@ pub struct DetailedDeploymentResponse {
/// List of services exposed by this deployment.
pub services: Vec<ServiceMetadata>,
}

// RegisterDeploymentRequest except without `force`
#[serde_as]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum UpdateDeploymentRequest {
Http {
/// # Uri
///
/// Uri to use to discover/invoke the http deployment.
#[serde_as(as = "serde_with::DisplayFromStr")]
#[cfg_attr(feature = "schema", schemars(with = "String"))]
uri: Uri,

/// # Additional headers
///
/// Additional headers added to the discover/invoke requests to the deployment.
///
additional_headers: Option<SerdeableHeaderHashMap>,

/// # Use http1.1
///
/// If `true`, discovery will be attempted using a client that defaults to HTTP1.1
/// instead of a prior-knowledge HTTP2 client. HTTP2 may still be used for TLS servers
/// that advertise HTTP2 support via ALPN. HTTP1.1 deployments will only work in
/// request-response mode.
///
#[serde(default = "restate_serde_util::default::bool::<false>")]
use_http_11: bool,

/// # Dry-run mode
///
/// If `true`, discovery will run but the deployment will not be registered.
/// This is useful to see the impact of a new deployment before registering it.
#[serde(default = "restate_serde_util::default::bool::<false>")]
dry_run: bool,
},
Lambda {
/// # ARN
///
/// ARN to use to discover/invoke the lambda deployment.
arn: String,

/// # Assume role ARN
///
/// Optional ARN of a role to assume when invoking the addressed Lambda, to support role chaining
assume_role_arn: Option<String>,

/// # Additional headers
///
/// Additional headers added to the discover/invoke requests to the deployment.
///
additional_headers: Option<SerdeableHeaderHashMap>,

/// # Dry-run mode
///
/// If `true`, discovery will run but the deployment will not be registered.
/// This is useful to see the impact of a new deployment before registering it.
#[serde(default = "restate_serde_util::default::bool::<false>")]
dry_run: bool,
},
}
99 changes: 99 additions & 0 deletions crates/admin/src/rest_api/deployments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,102 @@ pub async fn delete_deployment<V>(
Ok(StatusCode::NOT_IMPLEMENTED)
}
}

/// Update a deployment
#[openapi(
summary = "Update deployment",
description = "Update deployment. Invokes the endpoint and replaces the existing deployment metadata with the discovered information. This is a dangerous operation that should be used only when there are failing invocations on the deployment that cannot be resolved any other way. Sense checks are applied to test that the new deployment is sufficiently similar to the old one.",
operation_id = "update_deployment",
tags = "deployment",
parameters(path(
name = "deployment",
description = "Deployment identifier",
schema = "std::string::String"
))
)]
pub async fn update_deployment<V>(
State(state): State<AdminServiceState<V>>,
Extension(version): Extension<AdminApiVersion>,
Path(deployment_id): Path<DeploymentId>,
#[request_body(required = true)] Json(payload): Json<UpdateDeploymentRequest>,
) -> Result<Json<DetailedDeploymentResponse>, MetaApiError> {
let (discover_endpoint, dry_run) = match payload {
UpdateDeploymentRequest::Http {
uri,
additional_headers,
use_http_11,
dry_run,
} => {
// Verify URI is absolute!
if uri.scheme().is_none() || uri.authority().is_none() {
return Err(MetaApiError::InvalidField(
"uri",
format!(
"The provided uri {uri} is not absolute, only absolute URIs can be used."
),
));
}

let is_using_https = uri.scheme().unwrap() == &Scheme::HTTPS;

(
DiscoverEndpoint::new(
Endpoint::Http(
uri,
if use_http_11 {
Some(http::Version::HTTP_11)
} else if is_using_https {
// ALPN will sort this out
None
} else {
// By default, we use h2c on HTTP
Some(http::Version::HTTP_2)
},
),
additional_headers.unwrap_or_default().into(),
),
dry_run,
)
}
UpdateDeploymentRequest::Lambda {
arn,
assume_role_arn,
additional_headers,
dry_run,
} => (
DiscoverEndpoint::new(
Endpoint::Lambda(
arn.parse().map_err(|e: InvalidLambdaARN| {
MetaApiError::InvalidField("arn", e.to_string())
})?,
assume_role_arn.map(Into::into),
),
additional_headers.unwrap_or_default().into(),
),
dry_run,
),
};

let apply_mode = if dry_run {
ApplyMode::DryRun
} else {
ApplyMode::Apply
};

let (deployment, services) = state
.schema_registry
.update_deployment(deployment_id, discover_endpoint, apply_mode)
.await
.inspect_err(|e| warn_it!(e))?;

Ok(Json(
restate_admin_rest_model::converters::convert_detailed_deployment_response(
version,
DetailedDeploymentResponse {
id: deployment.id,
deployment: deployment.metadata.into(),
services,
},
),
))
}
9 changes: 3 additions & 6 deletions crates/admin/src/rest_api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::schema_registry::error::{
DeploymentError, SchemaError, SchemaRegistryError, ServiceError,
};
use crate::schema_registry::error::{SchemaError, SchemaRegistryError, ServiceError};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::Json;
Expand Down Expand Up @@ -77,9 +75,8 @@ impl IntoResponse for MetaApiError {
MetaApiError::Schema(schema_error) => match schema_error {
SchemaError::NotFound(_) => StatusCode::NOT_FOUND,
SchemaError::Override(_)
| SchemaError::Service(ServiceError::DifferentType { .. })
| SchemaError::Service(ServiceError::RemovedHandlers { .. })
| SchemaError::Deployment(DeploymentError::IncorrectId { .. }) => {
| SchemaError::Service(ServiceError::DifferentServiceType { .. })
| SchemaError::Service(ServiceError::RemovedHandlers { .. }) => {
StatusCode::CONFLICT
}
SchemaError::Service(_) => StatusCode::BAD_REQUEST,
Expand Down
5 changes: 5 additions & 0 deletions crates/admin/src/rest_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod services;
mod subscriptions;
mod version;

use axum_integration::put;
use okapi_operation::axum_integration::{delete, get, patch, post};
use okapi_operation::*;
use restate_types::identifiers::PartitionKey;
Expand Down Expand Up @@ -49,6 +50,10 @@ where
"/deployments/:deployment",
delete(openapi_handler!(deployments::delete_deployment)),
)
.route(
"/deployments/:deployment",
put(openapi_handler!(deployments::update_deployment)),
)
.route("/services", get(openapi_handler!(services::list_services)))
.route(
"/services/:service",
Expand Down
14 changes: 7 additions & 7 deletions crates/admin/src/schema_registry/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub enum ServiceError {
ReservedName(String),
#[error("detected a new service '{0}' revision with a service type different from the previous revision. Service type cannot be changed across revisions")]
#[code(restate_errors::META0006)]
DifferentType(ServiceName),
DifferentServiceType(ServiceName),
#[error("the service '{0}' already exists but the new revision removed the handlers {1:?}")]
#[code(restate_errors::META0006)]
RemovedHandlers(ServiceName, Vec<String>),
Expand Down Expand Up @@ -135,12 +135,12 @@ pub enum SubscriptionError {

#[derive(Debug, thiserror::Error, codederror::CodedError)]
pub enum DeploymentError {
#[error("existing deployment id is different from requested (requested = {requested}, existing = {existing})")]
#[code(restate_errors::META0004)]
IncorrectId {
requested: DeploymentId,
existing: DeploymentId,
},
#[error("an update deployment operation must provide an endpoint with the same services and handlers. The update tried to remove the services {0:?}")]
#[code(restate_errors::META0006)]
RemovedServices(Vec<String>),
#[error("multiple deployments ({0:?}) were found that reference the discovered endpoint. a deployment can only be force updated when it uniquely owns its endpoint. First delete one or more of the deployments")]
#[code(restate_errors::META0006)]
MultipleExistingDeployments(Vec<DeploymentId>),
}

impl From<ReadModifyWriteError<SchemaError>> for SchemaRegistryError {
Expand Down
85 changes: 82 additions & 3 deletions crates/admin/src/schema_registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl<V> SchemaRegistry<V> {
apply_mode: ApplyMode,
) -> Result<(DeploymentId, Vec<ServiceMetadata>), SchemaRegistryError> {
// The number of concurrent discovery calls is bound by the number of concurrent
// register_deployment calls. If it should become a problem that a user tries to register
// {register,update}_deployment calls. If it should become a problem that a user tries to register
// the same endpoint too often, then we need to add a synchronization mechanism which
// ensures that only a limited number of discover calls per endpoint are running.
let discovered_metadata = self.service_discovery.discover(discover_endpoint).await?;
Expand Down Expand Up @@ -135,7 +135,6 @@ impl<V> SchemaRegistry<V> {
// suppress logging output in case of a dry run
let id = tracing::subscriber::with_default(NoSubscriber::new(), || {
updater.add_deployment(
None,
deployment_metadata,
discovered_metadata.services,
force.force_enabled(),
Expand All @@ -162,7 +161,6 @@ impl<V> SchemaRegistry<V> {
);

new_deployment_id = Some(updater.add_deployment(
None,
deployment_metadata.clone(),
discovered_metadata.services.clone(),
force.force_enabled(),
Expand All @@ -187,6 +185,87 @@ impl<V> SchemaRegistry<V> {
Ok((id, services))
}

pub async fn update_deployment(
&self,
deployment_id: DeploymentId,
discover_endpoint: DiscoverEndpoint,
apply_mode: ApplyMode,
) -> Result<(Deployment, Vec<ServiceMetadata>), SchemaRegistryError> {
// The number of concurrent discovery calls is bound by the number of concurrent
// {register,update}_deployment calls. If it should become a problem that a user tries to register
// the same endpoint too often, then we need to add a synchronization mechanism which
// ensures that only a limited number of discover calls per endpoint are running.
let discovered_metadata = self.service_discovery.discover(discover_endpoint).await?;

let deployment_metadata = match discovered_metadata.endpoint {
DiscoveredEndpoint::Http(uri, http_version) => DeploymentMetadata::new_http(
uri.clone(),
discovered_metadata.protocol_type,
http_version,
DeliveryOptions::new(discovered_metadata.headers),
discovered_metadata.supported_protocol_versions,
),
DiscoveredEndpoint::Lambda(arn, assume_role_arn) => DeploymentMetadata::new_lambda(
arn,
assume_role_arn,
DeliveryOptions::new(discovered_metadata.headers),
discovered_metadata.supported_protocol_versions,
),
};

if !apply_mode.should_apply() {
let mut updater = SchemaUpdater::new(
Metadata::with_current(|m| m.schema()).deref().clone(),
self.experimental_feature_kafka_ingress_next,
);

// suppress logging output in case of a dry run
tracing::subscriber::with_default(NoSubscriber::new(), || {
updater.update_deployment(
deployment_id,
deployment_metadata,
discovered_metadata.services,
)
})?;

let schema_information = updater.into_inner();
Ok(schema_information
.get_deployment_and_services(&deployment_id)
.expect("deployment was just added"))
} else {
let schema_information = self
.metadata_writer
.metadata_store_client()
.read_modify_write(
SCHEMA_INFORMATION_KEY.clone(),
|schema_information: Option<Schema>| {
let mut updater = SchemaUpdater::new(
schema_information.unwrap_or_default(),
self.experimental_feature_kafka_ingress_next,
);

updater.update_deployment(
deployment_id,
deployment_metadata.clone(),
discovered_metadata.services.clone(),
)?;
Ok(updater.into_inner())
},
)
.await?;

let (deployment, services) = schema_information
.get_deployment_and_services(&deployment_id)
.expect("deployment was just updated");

self.metadata_writer
.update(Arc::new(schema_information))
.await?;

Ok((deployment, services))
}
}

pub async fn delete_deployment(
&self,
deployment_id: DeploymentId,
Expand Down
Loading
Loading