Skip to content

Commit

Permalink
Add an edit deployment API
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Jan 20, 2025
1 parent abf1b7f commit bed347c
Show file tree
Hide file tree
Showing 9 changed files with 961 additions and 174 deletions.
63 changes: 63 additions & 0 deletions crates/admin-rest-model/src/deployments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,3 +320,66 @@ pub struct DetailedDeploymentResponse {
/// List of services exposed by this deployment.
pub services: Vec<ServiceMetadata>,
}

// RegisterDeploymentRequest except without `force`
#[serde_as]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum UpdateDeploymentRequest {
Http {
/// # Uri
///
/// Uri to use to discover/invoke the http deployment.
#[serde_as(as = "serde_with::DisplayFromStr")]
#[cfg_attr(feature = "schema", schemars(with = "String"))]
uri: Uri,

/// # Additional headers
///
/// Additional headers added to the discover/invoke requests to the deployment.
///
additional_headers: Option<SerdeableHeaderHashMap>,

/// # Use http1.1
///
/// If `true`, discovery will be attempted using a client that defaults to HTTP1.1
/// instead of a prior-knowledge HTTP2 client. HTTP2 may still be used for TLS servers
/// that advertise HTTP2 support via ALPN. HTTP1.1 deployments will only work in
/// request-response mode.
///
#[serde(default = "restate_serde_util::default::bool::<false>")]
use_http_11: bool,

/// # Dry-run mode
///
/// If `true`, discovery will run but the deployment will not be registered.
/// This is useful to see the impact of a new deployment before registering it.
#[serde(default = "restate_serde_util::default::bool::<false>")]
dry_run: bool,
},
Lambda {
/// # ARN
///
/// ARN to use to discover/invoke the lambda deployment.
arn: String,

/// # Assume role ARN
///
/// Optional ARN of a role to assume when invoking the addressed Lambda, to support role chaining
assume_role_arn: Option<String>,

/// # Additional headers
///
/// Additional headers added to the discover/invoke requests to the deployment.
///
additional_headers: Option<SerdeableHeaderHashMap>,

/// # Dry-run mode
///
/// If `true`, discovery will run but the deployment will not be registered.
/// This is useful to see the impact of a new deployment before registering it.
#[serde(default = "restate_serde_util::default::bool::<false>")]
dry_run: bool,
},
}
99 changes: 99 additions & 0 deletions crates/admin/src/rest_api/deployments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,102 @@ pub async fn delete_deployment<V>(
Ok(StatusCode::NOT_IMPLEMENTED)
}
}

/// Update a deployment
#[openapi(
summary = "Update deployment",
description = "Update deployment. Invokes the endpoint and replaces the existing deployment metadata with the discovered information. This is a dangerous operation that should be used only when there are failing invocations on the deployment that cannot be resolved any other way. Sense checks are applied to test that the new deployment is sufficiently similar to the old one.",
operation_id = "update_deployment",
tags = "deployment",
parameters(path(
name = "deployment",
description = "Deployment identifier",
schema = "std::string::String"
))
)]
pub async fn update_deployment<V>(
State(state): State<AdminServiceState<V>>,
Extension(version): Extension<AdminApiVersion>,
Path(deployment_id): Path<DeploymentId>,
#[request_body(required = true)] Json(payload): Json<UpdateDeploymentRequest>,
) -> Result<Json<DetailedDeploymentResponse>, MetaApiError> {
let (discover_endpoint, dry_run) = match payload {
UpdateDeploymentRequest::Http {
uri,
additional_headers,
use_http_11,
dry_run,
} => {
// Verify URI is absolute!
if uri.scheme().is_none() || uri.authority().is_none() {
return Err(MetaApiError::InvalidField(
"uri",
format!(
"The provided uri {uri} is not absolute, only absolute URIs can be used."
),
));
}

let is_using_https = uri.scheme().unwrap() == &Scheme::HTTPS;

(
DiscoverEndpoint::new(
Endpoint::Http(
uri,
if use_http_11 {
Some(http::Version::HTTP_11)
} else if is_using_https {
// ALPN will sort this out
None
} else {
// By default, we use h2c on HTTP
Some(http::Version::HTTP_2)
},
),
additional_headers.unwrap_or_default().into(),
),
dry_run,
)
}
UpdateDeploymentRequest::Lambda {
arn,
assume_role_arn,
additional_headers,
dry_run,
} => (
DiscoverEndpoint::new(
Endpoint::Lambda(
arn.parse().map_err(|e: InvalidLambdaARN| {
MetaApiError::InvalidField("arn", e.to_string())
})?,
assume_role_arn.map(Into::into),
),
additional_headers.unwrap_or_default().into(),
),
dry_run,
),
};

let apply_mode = if dry_run {
ApplyMode::DryRun
} else {
ApplyMode::Apply
};

let (deployment, services) = state
.schema_registry
.update_deployment(deployment_id, discover_endpoint, apply_mode)
.await
.inspect_err(|e| warn_it!(e))?;

Ok(Json(
restate_admin_rest_model::converters::convert_detailed_deployment_response(
version,
DetailedDeploymentResponse {
id: deployment.id,
deployment: deployment.metadata.into(),
services,
},
),
))
}
9 changes: 3 additions & 6 deletions crates/admin/src/rest_api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::schema_registry::error::{
DeploymentError, SchemaError, SchemaRegistryError, ServiceError,
};
use crate::schema_registry::error::{SchemaError, SchemaRegistryError, ServiceError};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::Json;
Expand Down Expand Up @@ -77,9 +75,8 @@ impl IntoResponse for MetaApiError {
MetaApiError::Schema(schema_error) => match schema_error {
SchemaError::NotFound(_) => StatusCode::NOT_FOUND,
SchemaError::Override(_)
| SchemaError::Service(ServiceError::DifferentType { .. })
| SchemaError::Service(ServiceError::RemovedHandlers { .. })
| SchemaError::Deployment(DeploymentError::IncorrectId { .. }) => {
| SchemaError::Service(ServiceError::DifferentServiceType { .. })
| SchemaError::Service(ServiceError::RemovedHandlers { .. }) => {
StatusCode::CONFLICT
}
SchemaError::Service(_) => StatusCode::BAD_REQUEST,
Expand Down
5 changes: 5 additions & 0 deletions crates/admin/src/rest_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod services;
mod subscriptions;
mod version;

use axum_integration::put;
use okapi_operation::axum_integration::{delete, get, patch, post};
use okapi_operation::*;
use restate_types::identifiers::PartitionKey;
Expand Down Expand Up @@ -49,6 +50,10 @@ where
"/deployments/:deployment",
delete(openapi_handler!(deployments::delete_deployment)),
)
.route(
"/deployments/:deployment",
put(openapi_handler!(deployments::update_deployment)),
)
.route("/services", get(openapi_handler!(services::list_services)))
.route(
"/services/:service",
Expand Down
14 changes: 7 additions & 7 deletions crates/admin/src/schema_registry/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub enum ServiceError {
ReservedName(String),
#[error("detected a new service '{0}' revision with a service type different from the previous revision. Service type cannot be changed across revisions")]
#[code(restate_errors::META0006)]
DifferentType(ServiceName),
DifferentServiceType(ServiceName),
#[error("the service '{0}' already exists but the new revision removed the handlers {1:?}")]
#[code(restate_errors::META0006)]
RemovedHandlers(ServiceName, Vec<String>),
Expand Down Expand Up @@ -135,12 +135,12 @@ pub enum SubscriptionError {

#[derive(Debug, thiserror::Error, codederror::CodedError)]
pub enum DeploymentError {
#[error("existing deployment id is different from requested (requested = {requested}, existing = {existing})")]
#[code(restate_errors::META0004)]
IncorrectId {
requested: DeploymentId,
existing: DeploymentId,
},
#[error("an update deployment operation must provide an endpoint with the same services and handlers. The update tried to remove the services {0:?}")]
#[code(restate_errors::META0006)]
RemovedServices(Vec<String>),
#[error("multiple deployments ({0:?}) were found that reference the discovered endpoint. a deployment can only be force updated when it uniquely owns its endpoint. First delete one or more of the deployments")]
#[code(restate_errors::META0006)]
MultipleExistingDeployments(Vec<DeploymentId>),
}

impl From<ReadModifyWriteError<SchemaError>> for SchemaRegistryError {
Expand Down
85 changes: 82 additions & 3 deletions crates/admin/src/schema_registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl<V> SchemaRegistry<V> {
apply_mode: ApplyMode,
) -> Result<(DeploymentId, Vec<ServiceMetadata>), SchemaRegistryError> {
// The number of concurrent discovery calls is bound by the number of concurrent
// register_deployment calls. If it should become a problem that a user tries to register
// {register,update}_deployment calls. If it should become a problem that a user tries to register
// the same endpoint too often, then we need to add a synchronization mechanism which
// ensures that only a limited number of discover calls per endpoint are running.
let discovered_metadata = self.service_discovery.discover(discover_endpoint).await?;
Expand Down Expand Up @@ -135,7 +135,6 @@ impl<V> SchemaRegistry<V> {
// suppress logging output in case of a dry run
let id = tracing::subscriber::with_default(NoSubscriber::new(), || {
updater.add_deployment(
None,
deployment_metadata,
discovered_metadata.services,
force.force_enabled(),
Expand All @@ -162,7 +161,6 @@ impl<V> SchemaRegistry<V> {
);

new_deployment_id = Some(updater.add_deployment(
None,
deployment_metadata.clone(),
discovered_metadata.services.clone(),
force.force_enabled(),
Expand All @@ -187,6 +185,87 @@ impl<V> SchemaRegistry<V> {
Ok((id, services))
}

pub async fn update_deployment(
&self,
deployment_id: DeploymentId,
discover_endpoint: DiscoverEndpoint,
apply_mode: ApplyMode,
) -> Result<(Deployment, Vec<ServiceMetadata>), SchemaRegistryError> {
// The number of concurrent discovery calls is bound by the number of concurrent
// {register,update}_deployment calls. If it should become a problem that a user tries to register
// the same endpoint too often, then we need to add a synchronization mechanism which
// ensures that only a limited number of discover calls per endpoint are running.
let discovered_metadata = self.service_discovery.discover(discover_endpoint).await?;

let deployment_metadata = match discovered_metadata.endpoint {
DiscoveredEndpoint::Http(uri, http_version) => DeploymentMetadata::new_http(
uri.clone(),
discovered_metadata.protocol_type,
http_version,
DeliveryOptions::new(discovered_metadata.headers),
discovered_metadata.supported_protocol_versions,
),
DiscoveredEndpoint::Lambda(arn, assume_role_arn) => DeploymentMetadata::new_lambda(
arn,
assume_role_arn,
DeliveryOptions::new(discovered_metadata.headers),
discovered_metadata.supported_protocol_versions,
),
};

if !apply_mode.should_apply() {
let mut updater = SchemaUpdater::new(
Metadata::with_current(|m| m.schema()).deref().clone(),
self.experimental_feature_kafka_ingress_next,
);

// suppress logging output in case of a dry run
tracing::subscriber::with_default(NoSubscriber::new(), || {
updater.update_deployment(
deployment_id,
deployment_metadata,
discovered_metadata.services,
)
})?;

let schema_information = updater.into_inner();
Ok(schema_information
.get_deployment_and_services(&deployment_id)
.expect("deployment was just added"))
} else {
let schema_information = self
.metadata_writer
.metadata_store_client()
.read_modify_write(
SCHEMA_INFORMATION_KEY.clone(),
|schema_information: Option<Schema>| {
let mut updater = SchemaUpdater::new(
schema_information.unwrap_or_default(),
self.experimental_feature_kafka_ingress_next,
);

updater.update_deployment(
deployment_id,
deployment_metadata.clone(),
discovered_metadata.services.clone(),
)?;
Ok(updater.into_inner())
},
)
.await?;

let (deployment, services) = schema_information
.get_deployment_and_services(&deployment_id)
.expect("deployment was just updated");

self.metadata_writer
.update(Arc::new(schema_information))
.await?;

Ok((deployment, services))
}
}

pub async fn delete_deployment(
&self,
deployment_id: DeploymentId,
Expand Down
Loading

0 comments on commit bed347c

Please sign in to comment.