diff --git a/crates/admin-rest-model/src/deployments.rs b/crates/admin-rest-model/src/deployments.rs index e9c4c8f42..2f8595198 100644 --- a/crates/admin-rest-model/src/deployments.rs +++ b/crates/admin-rest-model/src/deployments.rs @@ -320,3 +320,66 @@ pub struct DetailedDeploymentResponse { /// List of services exposed by this deployment. pub services: Vec, } + +// RegisterDeploymentRequest except without `force` +#[serde_as] +#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +#[derive(Debug, Serialize, Deserialize)] +#[serde(untagged)] +pub enum UpdateDeploymentRequest { + Http { + /// # Uri + /// + /// Uri to use to discover/invoke the http deployment. + #[serde_as(as = "serde_with::DisplayFromStr")] + #[cfg_attr(feature = "schema", schemars(with = "String"))] + uri: Uri, + + /// # Additional headers + /// + /// Additional headers added to the discover/invoke requests to the deployment. + /// + additional_headers: Option, + + /// # Use http1.1 + /// + /// If `true`, discovery will be attempted using a client that defaults to HTTP1.1 + /// instead of a prior-knowledge HTTP2 client. HTTP2 may still be used for TLS servers + /// that advertise HTTP2 support via ALPN. HTTP1.1 deployments will only work in + /// request-response mode. + /// + #[serde(default = "restate_serde_util::default::bool::")] + use_http_11: bool, + + /// # Dry-run mode + /// + /// If `true`, discovery will run but the deployment will not be registered. + /// This is useful to see the impact of a new deployment before registering it. + #[serde(default = "restate_serde_util::default::bool::")] + dry_run: bool, + }, + Lambda { + /// # ARN + /// + /// ARN to use to discover/invoke the lambda deployment. + arn: String, + + /// # Assume role ARN + /// + /// Optional ARN of a role to assume when invoking the addressed Lambda, to support role chaining + assume_role_arn: Option, + + /// # Additional headers + /// + /// Additional headers added to the discover/invoke requests to the deployment. + /// + additional_headers: Option, + + /// # Dry-run mode + /// + /// If `true`, discovery will run but the deployment will not be registered. + /// This is useful to see the impact of a new deployment before registering it. + #[serde(default = "restate_serde_util::default::bool::")] + dry_run: bool, + }, +} diff --git a/crates/admin/src/rest_api/deployments.rs b/crates/admin/src/rest_api/deployments.rs index b1ebb10ac..4c9dea292 100644 --- a/crates/admin/src/rest_api/deployments.rs +++ b/crates/admin/src/rest_api/deployments.rs @@ -258,3 +258,102 @@ pub async fn delete_deployment( Ok(StatusCode::NOT_IMPLEMENTED) } } + +/// Update a deployment +#[openapi( + summary = "Update deployment", + description = "Update deployment. Invokes the endpoint and replaces the existing deployment metadata with the discovered information. This is a dangerous operation that should be used only when there are failing invocations on the deployment that cannot be resolved any other way. Sense checks are applied to test that the new deployment is sufficiently similar to the old one.", + operation_id = "update_deployment", + tags = "deployment", + parameters(path( + name = "deployment", + description = "Deployment identifier", + schema = "std::string::String" + )) +)] +pub async fn update_deployment( + State(state): State>, + Extension(version): Extension, + Path(deployment_id): Path, + #[request_body(required = true)] Json(payload): Json, +) -> Result, MetaApiError> { + let (discover_endpoint, dry_run) = match payload { + UpdateDeploymentRequest::Http { + uri, + additional_headers, + use_http_11, + dry_run, + } => { + // Verify URI is absolute! + if uri.scheme().is_none() || uri.authority().is_none() { + return Err(MetaApiError::InvalidField( + "uri", + format!( + "The provided uri {uri} is not absolute, only absolute URIs can be used." + ), + )); + } + + let is_using_https = uri.scheme().unwrap() == &Scheme::HTTPS; + + ( + DiscoverEndpoint::new( + Endpoint::Http( + uri, + if use_http_11 { + Some(http::Version::HTTP_11) + } else if is_using_https { + // ALPN will sort this out + None + } else { + // By default, we use h2c on HTTP + Some(http::Version::HTTP_2) + }, + ), + additional_headers.unwrap_or_default().into(), + ), + dry_run, + ) + } + UpdateDeploymentRequest::Lambda { + arn, + assume_role_arn, + additional_headers, + dry_run, + } => ( + DiscoverEndpoint::new( + Endpoint::Lambda( + arn.parse().map_err(|e: InvalidLambdaARN| { + MetaApiError::InvalidField("arn", e.to_string()) + })?, + assume_role_arn.map(Into::into), + ), + additional_headers.unwrap_or_default().into(), + ), + dry_run, + ), + }; + + let apply_mode = if dry_run { + ApplyMode::DryRun + } else { + ApplyMode::Apply + }; + + let (deployment, services) = state + .schema_registry + .update_deployment(deployment_id, discover_endpoint, apply_mode) + .await + .inspect_err(|e| warn_it!(e))?; + + Ok(Json( + restate_admin_rest_model::converters::convert_detailed_deployment_response( + version, + DetailedDeploymentResponse { + id: deployment.id, + deployment: deployment.metadata.into(), + services, + }, + ), + )) +} diff --git a/crates/admin/src/rest_api/error.rs b/crates/admin/src/rest_api/error.rs index b5561c49e..81ddb157b 100644 --- a/crates/admin/src/rest_api/error.rs +++ b/crates/admin/src/rest_api/error.rs @@ -8,9 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::schema_registry::error::{ - DeploymentError, SchemaError, SchemaRegistryError, ServiceError, -}; +use crate::schema_registry::error::{SchemaError, SchemaRegistryError, ServiceError}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use axum::Json; @@ -77,9 +75,8 @@ impl IntoResponse for MetaApiError { MetaApiError::Schema(schema_error) => match schema_error { SchemaError::NotFound(_) => StatusCode::NOT_FOUND, SchemaError::Override(_) - | SchemaError::Service(ServiceError::DifferentType { .. }) - | SchemaError::Service(ServiceError::RemovedHandlers { .. }) - | SchemaError::Deployment(DeploymentError::IncorrectId { .. }) => { + | SchemaError::Service(ServiceError::DifferentServiceType { .. }) + | SchemaError::Service(ServiceError::RemovedHandlers { .. }) => { StatusCode::CONFLICT } SchemaError::Service(_) => StatusCode::BAD_REQUEST, diff --git a/crates/admin/src/rest_api/mod.rs b/crates/admin/src/rest_api/mod.rs index eb5d5786a..99f46c7f9 100644 --- a/crates/admin/src/rest_api/mod.rs +++ b/crates/admin/src/rest_api/mod.rs @@ -19,6 +19,7 @@ mod services; mod subscriptions; mod version; +use axum_integration::put; use okapi_operation::axum_integration::{delete, get, patch, post}; use okapi_operation::*; use restate_types::identifiers::PartitionKey; @@ -49,6 +50,10 @@ where "/deployments/:deployment", delete(openapi_handler!(deployments::delete_deployment)), ) + .route( + "/deployments/:deployment", + put(openapi_handler!(deployments::update_deployment)), + ) .route("/services", get(openapi_handler!(services::list_services))) .route( "/services/:service", diff --git a/crates/admin/src/schema_registry/error.rs b/crates/admin/src/schema_registry/error.rs index 990d6311e..9d7f131ad 100644 --- a/crates/admin/src/schema_registry/error.rs +++ b/crates/admin/src/schema_registry/error.rs @@ -80,7 +80,7 @@ pub enum ServiceError { ReservedName(String), #[error("detected a new service '{0}' revision with a service type different from the previous revision. Service type cannot be changed across revisions")] #[code(restate_errors::META0006)] - DifferentType(ServiceName), + DifferentServiceType(ServiceName), #[error("the service '{0}' already exists but the new revision removed the handlers {1:?}")] #[code(restate_errors::META0006)] RemovedHandlers(ServiceName, Vec), @@ -135,12 +135,12 @@ pub enum SubscriptionError { #[derive(Debug, thiserror::Error, codederror::CodedError)] pub enum DeploymentError { - #[error("existing deployment id is different from requested (requested = {requested}, existing = {existing})")] - #[code(restate_errors::META0004)] - IncorrectId { - requested: DeploymentId, - existing: DeploymentId, - }, + #[error("an update deployment operation must provide an endpoint with the same services and handlers. The update tried to remove the services {0:?}")] + #[code(restate_errors::META0006)] + RemovedServices(Vec), + #[error("multiple deployments ({0:?}) were found that reference the discovered endpoint. a deployment can only be force updated when it uniquely owns its endpoint. First delete one or more of the deployments")] + #[code(restate_errors::META0006)] + MultipleExistingDeployments(Vec), } impl From> for SchemaRegistryError { diff --git a/crates/admin/src/schema_registry/mod.rs b/crates/admin/src/schema_registry/mod.rs index 39481e6d2..ca09f7a5d 100644 --- a/crates/admin/src/schema_registry/mod.rs +++ b/crates/admin/src/schema_registry/mod.rs @@ -105,7 +105,7 @@ impl SchemaRegistry { apply_mode: ApplyMode, ) -> Result<(DeploymentId, Vec), SchemaRegistryError> { // The number of concurrent discovery calls is bound by the number of concurrent - // register_deployment calls. If it should become a problem that a user tries to register + // {register,update}_deployment calls. If it should become a problem that a user tries to register // the same endpoint too often, then we need to add a synchronization mechanism which // ensures that only a limited number of discover calls per endpoint are running. let discovered_metadata = self.service_discovery.discover(discover_endpoint).await?; @@ -135,7 +135,6 @@ impl SchemaRegistry { // suppress logging output in case of a dry run let id = tracing::subscriber::with_default(NoSubscriber::new(), || { updater.add_deployment( - None, deployment_metadata, discovered_metadata.services, force.force_enabled(), @@ -162,7 +161,6 @@ impl SchemaRegistry { ); new_deployment_id = Some(updater.add_deployment( - None, deployment_metadata.clone(), discovered_metadata.services.clone(), force.force_enabled(), @@ -187,6 +185,87 @@ impl SchemaRegistry { Ok((id, services)) } + pub async fn update_deployment( + &self, + deployment_id: DeploymentId, + discover_endpoint: DiscoverEndpoint, + apply_mode: ApplyMode, + ) -> Result<(Deployment, Vec), SchemaRegistryError> { + // The number of concurrent discovery calls is bound by the number of concurrent + // {register,update}_deployment calls. If it should become a problem that a user tries to register + // the same endpoint too often, then we need to add a synchronization mechanism which + // ensures that only a limited number of discover calls per endpoint are running. + let discovered_metadata = self.service_discovery.discover(discover_endpoint).await?; + + let deployment_metadata = match discovered_metadata.endpoint { + DiscoveredEndpoint::Http(uri, http_version) => DeploymentMetadata::new_http( + uri.clone(), + discovered_metadata.protocol_type, + http_version, + DeliveryOptions::new(discovered_metadata.headers), + discovered_metadata.supported_protocol_versions, + ), + DiscoveredEndpoint::Lambda(arn, assume_role_arn) => DeploymentMetadata::new_lambda( + arn, + assume_role_arn, + DeliveryOptions::new(discovered_metadata.headers), + discovered_metadata.supported_protocol_versions, + ), + }; + + if !apply_mode.should_apply() { + let mut updater = SchemaUpdater::new( + Metadata::with_current(|m| m.schema()).deref().clone(), + self.experimental_feature_kafka_ingress_next, + ); + + // suppress logging output in case of a dry run + tracing::subscriber::with_default(NoSubscriber::new(), || { + updater.update_deployment( + deployment_id, + deployment_metadata, + discovered_metadata.services, + ) + })?; + + let schema_information = updater.into_inner(); + Ok(schema_information + .get_deployment_and_services(&deployment_id) + .expect("deployment was just added")) + } else { + let schema_information = self + .metadata_writer + .metadata_store_client() + .read_modify_write( + SCHEMA_INFORMATION_KEY.clone(), + |schema_information: Option| { + let mut updater = SchemaUpdater::new( + schema_information.unwrap_or_default(), + self.experimental_feature_kafka_ingress_next, + ); + + updater.update_deployment( + deployment_id, + deployment_metadata.clone(), + discovered_metadata.services.clone(), + )?; + Ok(updater.into_inner()) + }, + ) + .await?; + + let (deployment, services) = schema_information + .get_deployment_and_services(&deployment_id) + .expect("deployment was just updated"); + + self.metadata_writer + .update(Arc::new(schema_information)) + .await?; + + Ok((deployment, services)) + } + } + pub async fn delete_deployment( &self, deployment_id: DeploymentId, diff --git a/crates/admin/src/schema_registry/updater.rs b/crates/admin/src/schema_registry/updater.rs index 288076a80..18847fbf1 100644 --- a/crates/admin/src/schema_registry/updater.rs +++ b/crates/admin/src/schema_registry/updater.rs @@ -32,7 +32,7 @@ use restate_types::schema::subscriptions::{ use restate_types::schema::Schema; use std::collections::hash_map::Entry; use std::collections::HashMap; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; /// Responsible for updating the provided [`Schema`] with new /// schema information. It makes sure that the version of schema information @@ -68,65 +68,60 @@ impl SchemaUpdater { pub fn add_deployment( &mut self, - requested_deployment_id: Option, deployment_metadata: DeploymentMetadata, services: Vec, force: bool, ) -> Result { - let deployment_id: Option; - let proposed_services: HashMap<_, _> = services .into_iter() .map(|c| ServiceName::try_from(c.name.to_string()).map(|name| (name, c))) .collect::, _>>()?; - // Did we find an existing deployment with same id or with a conflicting endpoint url? - let found_existing_deployment = requested_deployment_id - .and_then(|id| self.schema_information.find_existing_deployment_by_id(&id)) - .or_else(|| { - self.schema_information - .find_existing_deployment_by_endpoint(&deployment_metadata.ty) - }); + // Did we find an existing deployment with a conflicting endpoint url? + let mut existing_deployments = self + .schema_information + .find_existing_deployments_by_endpoint(&deployment_metadata.ty); let mut services_to_remove = Vec::default(); - if let Some((existing_deployment_id, existing_deployment)) = found_existing_deployment { - if requested_deployment_id.is_some_and(|dp| &dp != existing_deployment_id) { - // The deployment id is different from the existing one, we don't accept that even - // if force is used. It means that the user intended to update another deployment. - return Err(SchemaError::Deployment(DeploymentError::IncorrectId { - requested: requested_deployment_id.expect("must be set"), - existing: *existing_deployment_id, - })); - } - + let deployment_id = if let Some((existing_deployment_id, existing_deployment)) = + existing_deployments.next() + { if force { - deployment_id = Some(*existing_deployment_id); + // Even under force we will only accept exactly one existing deployment with this endpoint + if let Some((another_existing_deployment_id, _)) = existing_deployments.next() { + let mut existing_deployment_ids = + vec![*existing_deployment_id, *another_existing_deployment_id]; + existing_deployment_ids + .extend(existing_deployments.map(|(deployment_id, _)| *deployment_id)); + + return Err(SchemaError::Deployment( + DeploymentError::MultipleExistingDeployments(existing_deployment_ids), + )); + } for service in &existing_deployment.services { // If a service is not available anymore in the new deployment, we need to remove it if !proposed_services.contains_key(&service.name) { warn!( restate.deployment.id = %existing_deployment_id, - restate.deployment.address = %deployment_metadata.address_display(), + restate.deployment.address = %existing_deployment.metadata.address_display(), "Going to remove service {} due to a forced deployment update", service.name ); services_to_remove.push(service.name.clone()); } } + + *existing_deployment_id } else { return Err(SchemaError::Override(format!( "deployment with id '{existing_deployment_id}'" ))); } } else { - // New deployment. Use the supplied deployment_id if passed, otherwise, generate one. - deployment_id = requested_deployment_id.or_else(|| Some(DeploymentId::new())); - } - - // We must have a deployment id by now, either a new or existing one. - let deployment_id = deployment_id.unwrap(); + DeploymentId::new() + }; let mut services_to_add = HashMap::with_capacity(proposed_services.len()); @@ -186,7 +181,7 @@ impl SchemaUpdater { service_type ); } else { - return Err(SchemaError::Service(ServiceError::DifferentType( + return Err(SchemaError::Service(ServiceError::DifferentServiceType( service_name, ))); } @@ -232,6 +227,8 @@ impl SchemaUpdater { services_to_add.insert(service_name, service_schema); } + drop(existing_deployments); + for service_to_remove in services_to_remove { self.schema_information.services.remove(&service_to_remove); } @@ -260,6 +257,169 @@ impl SchemaUpdater { Ok(deployment_id) } + pub fn update_deployment( + &mut self, + deployment_id: DeploymentId, + deployment_metadata: DeploymentMetadata, + services: Vec, + ) -> Result<(), SchemaError> { + let proposed_services: HashMap<_, _> = services + .into_iter() + .map(|c| ServiceName::try_from(c.name.to_string()).map(|name| (name, c))) + .collect::, _>>()?; + + // Look for an existing deployment with this ID + let Some((_, existing_deployment)) = self + .schema_information + .find_existing_deployment_by_id(&deployment_id) + else { + return Err(SchemaError::NotFound(format!( + "deployment with id '{deployment_id}'" + ))); + }; + + let mut services_to_remove = Vec::default(); + + for service in &existing_deployment.services { + if !proposed_services.contains_key(&service.name) { + services_to_remove.push(service.name.clone()); + } + } + + if !services_to_remove.is_empty() { + // we don't allow removing services as part of update deployment + return Err(SchemaError::Deployment(DeploymentError::RemovedServices( + services_to_remove, + ))); + } + + let mut services_to_add = HashMap::with_capacity(proposed_services.len()); + + // Compute service schemas + for (service_name, service) in proposed_services { + let service_type = ServiceType::from(service.ty); + let handlers = DiscoveredHandlerMetadata::compute_handlers( + service + .handlers + .into_iter() + .map(|h| { + DiscoveredHandlerMetadata::from_schema( + service_name.as_ref(), + service_type, + h, + ) + }) + .collect::, _>>()?, + ); + + let service_schema = if let Some(existing_service) = + self.schema_information.services.get(service_name.as_ref()) + { + let removed_handlers: Vec = existing_service + .handlers + .keys() + .filter(|name| !handlers.contains_key(*name)) + .map(|name| name.to_string()) + .collect(); + + if !removed_handlers.is_empty() { + return Err(SchemaError::Service(ServiceError::RemovedHandlers( + service_name, + removed_handlers, + ))); + } + + if existing_service.ty != service_type { + return Err(SchemaError::Service(ServiceError::DifferentServiceType( + service_name, + ))); + } + + let mut service_schemas = existing_service.clone(); + service_schemas.revision = existing_service.revision.wrapping_add(1); + service_schemas.ty = service_type; + service_schemas.handlers = handlers; + service_schemas.location.latest_deployment = deployment_id; + service_schemas.service_openapi_cache = Default::default(); + service_schemas.documentation = service.documentation; + service_schemas.metadata = service.metadata; + + service_schemas + } else { + ServiceSchemas { + revision: 1, + handlers, + ty: service_type, + location: ServiceLocation { + latest_deployment: deployment_id, + public: true, + }, + idempotency_retention: DEFAULT_IDEMPOTENCY_RETENTION, + workflow_completion_retention: if service_type == ServiceType::Workflow { + Some(DEFAULT_WORKFLOW_COMPLETION_RETENTION) + } else { + None + }, + inactivity_timeout: None, + abort_timeout: None, + service_openapi_cache: Default::default(), + documentation: service.documentation, + metadata: service.metadata, + } + }; + + services_to_add.insert(service_name, service_schema); + } + + let services_metadata = services_to_add + .into_iter() + .map(|(name, schema)| { + let metadata = schema.as_service_metadata(name.clone().into_inner()); + match self.schema_information.services.get(name.as_ref()) { + Some(ServiceSchemas { + location: ServiceLocation { latest_deployment, .. }, + .. + }) if latest_deployment == &deployment_id => { + // This deployment is the latest for this service, so we should update the service schema + info!( + rpc.service = %name, + "Overwriting existing service schemas" + ); + self.schema_information + .services + .insert(name.into_inner(), schema); + }, + Some(_) => { + debug!( + rpc.service = %name, + "Keeping existing service schema as this update operation affected a draining deployment" + ); + }, + None => { + // we have a new service, it deserves a schema as normal + self.schema_information + .services + .insert(name.into_inner(), schema); + } + } + + metadata + }) + .collect(); + + self.schema_information.deployments.insert( + deployment_id, + DeploymentSchemas { + services: services_metadata, + metadata: deployment_metadata, + }, + ); + + self.modified = true; + + Ok(()) + } + pub fn remove_deployment(&mut self, deployment_id: DeploymentId) { if let Some(deployment) = self.schema_information.deployments.remove(&deployment_id) { for service_metadata in deployment.services { @@ -686,7 +846,8 @@ impl DiscoveredHandlerMetadata { mod tests { use super::*; - use restate_test_util::{assert, assert_eq, let_assert}; + use http::HeaderName; + use restate_test_util::{assert, assert_eq}; use restate_types::schema::deployment::{Deployment, DeploymentResolver}; use restate_types::schema::service::ServiceMetadataResolver; @@ -753,24 +914,16 @@ mod tests { let initial_version = schema_information.version(); let mut updater = SchemaUpdater::new(schema_information, false); - let deployment = Deployment::mock(); - let deployment_id = updater - .add_deployment( - Some(deployment.id), - deployment.metadata.clone(), - vec![greeter_service()], - false, - ) + let mut deployment = Deployment::mock(); + deployment.id = updater + .add_deployment(deployment.metadata.clone(), vec![greeter_service()], false) .unwrap(); - // Ensure we are using the pre-determined id - assert_eq!(deployment.id, deployment_id); - let schema = updater.into_inner(); assert!(initial_version < schema.version()); schema.assert_service_revision(GREETER_SERVICE_NAME, 1); - schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment_id); + schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment.id); schema.assert_service_handler(GREETER_SERVICE_NAME, "greet"); } @@ -778,13 +931,12 @@ mod tests { fn register_new_deployment_add_unregistered_service() { let mut updater = SchemaUpdater::default(); - let deployment_1 = Deployment::mock_with_uri("http://localhost:9080"); - let deployment_2 = Deployment::mock_with_uri("http://localhost:9081"); + let mut deployment_1 = Deployment::mock_with_uri("http://localhost:9080"); + let mut deployment_2 = Deployment::mock_with_uri("http://localhost:9081"); // Register first deployment - updater + deployment_1.id = updater .add_deployment( - Some(deployment_1.id), deployment_1.metadata.clone(), vec![greeter_service()], false, @@ -799,9 +951,8 @@ mod tests { .is_none()); updater = SchemaUpdater::new(schemas, false); - updater + deployment_2.id = updater .add_deployment( - Some(deployment_2.id), deployment_2.metadata.clone(), vec![greeter_service(), another_greeter_service()], false, @@ -819,14 +970,10 @@ mod tests { #[test] fn force_deploy_private_service() -> Result<(), SchemaError> { let mut updater = SchemaUpdater::default(); - let deployment = Deployment::mock(); + let mut deployment = Deployment::mock(); - updater.add_deployment( - Some(deployment.id), - deployment.metadata.clone(), - vec![greeter_service()], - false, - )?; + deployment.id = + updater.add_deployment(deployment.metadata.clone(), vec![greeter_service()], false)?; let schemas = updater.into_inner(); @@ -844,12 +991,8 @@ mod tests { assert!(!schemas.assert_service(GREETER_SERVICE_NAME).public); updater = SchemaUpdater::new(schemas, false); - updater.add_deployment( - Some(deployment.id), - deployment.metadata.clone(), - vec![greeter_service()], - true, - )?; + deployment.id = + updater.add_deployment(deployment.metadata.clone(), vec![greeter_service()], true)?; let schemas = updater.into_inner(); assert!(!schemas.assert_service(GREETER_SERVICE_NAME).public); @@ -867,12 +1010,11 @@ mod tests { fn register_new_deployment_fails_changing_instance_type() { let mut updater = SchemaUpdater::default(); - let deployment_1 = Deployment::mock_with_uri("http://localhost:9080"); + let mut deployment_1 = Deployment::mock_with_uri("http://localhost:9080"); let deployment_2 = Deployment::mock_with_uri("http://localhost:9081"); - updater + deployment_1.id = updater .add_deployment( - Some(deployment_1.id), deployment_1.metadata.clone(), vec![greeter_service()], false, @@ -883,14 +1025,13 @@ mod tests { schemas.assert_service_deployment(GREETER_SERVICE_NAME, deployment_1.id); let compute_result = SchemaUpdater::new(schemas, false).add_deployment( - Some(deployment_2.id), deployment_2.metadata, vec![greeter_virtual_object()], false, ); assert!(let &SchemaError::Service( - ServiceError::DifferentType(_) + ServiceError::DifferentServiceType(_) ) = compute_result.unwrap_err()); } } @@ -899,10 +1040,9 @@ mod tests { fn override_existing_deployment_removing_a_service() { let mut updater = SchemaUpdater::default(); - let deployment = Deployment::mock(); - updater + let mut deployment = Deployment::mock(); + deployment.id = updater .add_deployment( - Some(deployment.id), deployment.metadata.clone(), vec![greeter_service(), another_greeter_service()], false, @@ -914,14 +1054,12 @@ mod tests { schemas.assert_service_deployment(ANOTHER_GREETER_SERVICE_NAME, deployment.id); updater = SchemaUpdater::new(schemas, false); - updater - .add_deployment( - Some(deployment.id), - deployment.metadata.clone(), - vec![greeter_service()], - true, - ) - .unwrap(); + assert_eq!( + updater + .add_deployment(deployment.metadata.clone(), vec![greeter_service()], true,) + .unwrap(), + deployment.id + ); let schemas = updater.into_inner(); @@ -935,76 +1073,34 @@ mod tests { fn cannot_override_existing_deployment_endpoint_conflict() { let mut updater = SchemaUpdater::default(); - let deployment = Deployment::mock(); - updater - .add_deployment( - Some(deployment.id), - deployment.metadata.clone(), - vec![greeter_service()], - false, - ) + let mut deployment = Deployment::mock(); + deployment.id = updater + .add_deployment(deployment.metadata.clone(), vec![greeter_service()], false) .unwrap(); assert!(let SchemaError::Override(_) = updater.add_deployment( - Some(deployment.id), deployment.metadata, vec![greeter_service()], false).unwrap_err() ); } - #[test] - fn cannot_override_existing_deployment_existing_id_mismatch() { - let mut updater = SchemaUpdater::default(); - - let deployment = Deployment::mock(); - updater - .add_deployment( - Some(deployment.id), - deployment.metadata.clone(), - vec![greeter_service()], - false, - ) - .unwrap(); - - let new_id = DeploymentId::new(); - - let rejection = updater - .add_deployment( - Some(new_id), - deployment.metadata, - vec![greeter_service()], - false, - ) - .unwrap_err(); - let_assert!( - SchemaError::Deployment(DeploymentError::IncorrectId { - requested, - existing - }) = rejection - ); - assert_eq!(new_id, requested); - assert_eq!(deployment.id, existing); - } - #[test] fn register_two_deployments_then_remove_first() { let mut updater = SchemaUpdater::default(); - let deployment_1 = Deployment::mock_with_uri("http://localhost:9080"); - let deployment_2 = Deployment::mock_with_uri("http://localhost:9081"); + let mut deployment_1 = Deployment::mock_with_uri("http://localhost:9080"); + let mut deployment_2 = Deployment::mock_with_uri("http://localhost:9081"); - updater + deployment_1.id = updater .add_deployment( - Some(deployment_1.id), deployment_1.metadata.clone(), vec![greeter_service(), another_greeter_service()], false, ) .unwrap(); - updater + deployment_2.id = updater .add_deployment( - Some(deployment_2.id), deployment_2.metadata.clone(), vec![greeter_service()], false, @@ -1085,28 +1181,18 @@ mod tests { fn reject_removing_existing_methods() { let mut updater = SchemaUpdater::default(); - let deployment_1 = Deployment::mock_with_uri("http://localhost:9080"); + let mut deployment_1 = Deployment::mock_with_uri("http://localhost:9080"); let deployment_2 = Deployment::mock_with_uri("http://localhost:9081"); - updater - .add_deployment( - Some(deployment_1.id), - deployment_1.metadata, - vec![greeter_v1_service()], - false, - ) + deployment_1.id = updater + .add_deployment(deployment_1.metadata, vec![greeter_v1_service()], false) .unwrap(); let schemas = updater.into_inner(); schemas.assert_service_revision(GREETER_SERVICE_NAME, 1); updater = SchemaUpdater::new(schemas, false); let rejection = updater - .add_deployment( - Some(deployment_2.id), - deployment_2.metadata, - vec![greeter_v2_service()], - false, - ) + .add_deployment(deployment_2.metadata, vec![greeter_v2_service()], false) .unwrap_err(); let schemas = updater.into_inner(); @@ -1120,4 +1206,456 @@ mod tests { check!(missing_methods == &["doSomething"]); } } + + #[test] + fn update_latest_deployment() { + let mut updater = SchemaUpdater::default(); + + let mut deployment_1 = Deployment::mock_with_uri("http://localhost:9080"); + + deployment_1.id = updater + .add_deployment( + deployment_1.metadata.clone(), + vec![greeter_virtual_object()], + false, + ) + .unwrap(); + + assert!(let &SchemaError::NotFound(_) = updater.update_deployment( + DeploymentId::new(), + deployment_1.metadata.clone(), + vec![], + ).unwrap_err()); + + assert!(let &SchemaError::Deployment( + DeploymentError::RemovedServices(_) + ) = updater.update_deployment( + deployment_1.id, + deployment_1.metadata.clone(), + vec![], + ).unwrap_err()); + + { + let mut greeter_virtual_object = greeter_virtual_object(); + greeter_virtual_object.ty = endpoint_manifest::ServiceType::Service; + + assert!(let &SchemaError::Service( + ServiceError::DifferentServiceType(_) + ) = updater.update_deployment( + deployment_1.id, + deployment_1.metadata.clone(), + vec![greeter_virtual_object], + ).unwrap_err()); + } + + assert!(let &SchemaError::Service( + ServiceError::RemovedHandlers(_, _) + ) = updater.update_deployment( + deployment_1.id, + deployment_1.metadata.clone(), + vec![endpoint_manifest::Service { + handlers: Default::default(), + ..greeter_virtual_object() + }], + ).unwrap_err()); + + deployment_1 + .metadata + .delivery_options + .additional_headers + .insert( + HeaderName::from_static("foo"), + HeaderValue::from_static("bar"), + ); + + updater + .update_deployment( + deployment_1.id, + deployment_1.metadata.clone(), + vec![greeter_virtual_object()], + ) + .unwrap(); + + let schemas = updater.into_inner(); + + schemas.assert_service_deployment(GREETER_SERVICE_NAME, deployment_1.id); + schemas.assert_service_revision(GREETER_SERVICE_NAME, 2); + + let (_, updated_deployment) = schemas + .find_existing_deployment_by_id(&deployment_1.id) + .unwrap(); + + assert!(updated_deployment + .metadata + .delivery_options + .additional_headers + .contains_key(&HeaderName::from_static("foo"))); + } + + #[test] + fn update_draining_deployment() { + let mut updater = SchemaUpdater::default(); + + let mut deployment_1 = Deployment::mock_with_uri("http://localhost:9080"); + let mut deployment_2 = Deployment::mock_with_uri("http://localhost:9081"); + + deployment_1.id = updater + .add_deployment( + deployment_1.metadata.clone(), + vec![greeter_service()], + false, + ) + .unwrap(); + + deployment_2.id = updater + .add_deployment( + deployment_2.metadata.clone(), + vec![greeter_service(), another_greeter_service()], + false, + ) + .unwrap(); + + deployment_1 + .metadata + .delivery_options + .additional_headers + .insert( + HeaderName::from_static("foo"), + HeaderValue::from_static("bar"), + ); + + updater + .update_deployment( + deployment_1.id, + deployment_1.metadata.clone(), + vec![greeter_service()], + ) + .unwrap(); + + let schemas = updater.into_inner(); + + schemas.assert_service_deployment(GREETER_SERVICE_NAME, deployment_2.id); + schemas.assert_service_revision(GREETER_SERVICE_NAME, 2); + + let (_, updated_deployment) = schemas + .find_existing_deployment_by_id(&deployment_1.id) + .unwrap(); + + assert!(updated_deployment + .metadata + .delivery_options + .additional_headers + .contains_key(&HeaderName::from_static("foo"))); + } + + #[test] + fn update_deployment_same_uri() { + let mut updater = SchemaUpdater::default(); + + let mut deployment_1 = Deployment::mock_with_uri("http://localhost:9080"); + let mut deployment_2 = Deployment::mock_with_uri("http://localhost:9081"); + + deployment_1.id = updater + .add_deployment( + deployment_1.metadata.clone(), + vec![greeter_service()], + false, + ) + .unwrap(); + + // patching new invocations + deployment_2.id = updater + .add_deployment( + deployment_2.metadata.clone(), + vec![greeter_service()], + false, + ) + .unwrap(); + + // oh, i have some old failing invocations, wish those were on the patched version too + + deployment_1.metadata.ty = deployment_2.metadata.ty.clone(); + updater + .update_deployment( + deployment_1.id, + deployment_1.metadata.clone(), + vec![greeter_service()], + ) + .unwrap(); + + // there are now two deployment IDs pointing to :9081, so we shouldn't be able to force either of them + assert!(let &SchemaError::Deployment( + DeploymentError::MultipleExistingDeployments(_) + ) = updater.add_deployment( + deployment_1.metadata.clone(), + vec![greeter_service(), greeter_virtual_object()], + true, + ).unwrap_err()); + + assert!(let &SchemaError::Deployment( + DeploymentError::MultipleExistingDeployments(_) + ) = updater.add_deployment( + deployment_2.metadata.clone(), + vec![greeter_service(), greeter_virtual_object()], + true, + ).unwrap_err()); + + updater + .schema_information + .assert_service_deployment(GREETER_SERVICE_NAME, deployment_2.id); + updater + .schema_information + .assert_service_revision(GREETER_SERVICE_NAME, 2); + + let (_, updated_deployment_1) = updater + .schema_information + .find_existing_deployment_by_id(&deployment_1.id) + .unwrap(); + + let (_, updated_deployment_2) = updater + .schema_information + .find_existing_deployment_by_id(&deployment_2.id) + .unwrap(); + + assert_eq!( + updated_deployment_1.metadata.ty, + updated_deployment_2.metadata.ty + ); + + // the failing invocations have drained so I can safely delete the original deployment + updater.remove_deployment(deployment_1.id); + + assert_eq!( + deployment_2.id, + updater + .add_deployment( + deployment_2.metadata.clone(), + vec![greeter_service(), greeter_virtual_object()], + true, + ) + .unwrap() + ); + } + + #[test] + fn update_latest_deployment_add_handler() { + let mut updater = SchemaUpdater::default(); + + let mut deployment_1 = Deployment::mock_with_uri("http://localhost:9081"); + + deployment_1.id = updater + .add_deployment( + deployment_1.metadata.clone(), + vec![greeter_service()], + false, + ) + .unwrap(); + + let mut updated_greeter_service = greeter_service(); + updated_greeter_service + .handlers + .push(endpoint_manifest::Handler { + documentation: None, + name: "greetAgain".parse().unwrap(), + ty: None, + input: None, + output: None, + metadata: Default::default(), + }); + + updater + .update_deployment( + deployment_1.id, + deployment_1.metadata.clone(), + vec![updated_greeter_service], + ) + .unwrap(); + + updater + .schema_information + .assert_service_deployment(GREETER_SERVICE_NAME, deployment_1.id); + updater + .schema_information + .assert_service_revision(GREETER_SERVICE_NAME, 2); + + let (_, updated_deployment_1) = updater + .schema_information + .find_existing_deployment_by_id(&deployment_1.id) + .unwrap(); + + assert_eq!( + updated_deployment_1 + .services + .iter() + .find(|svc| svc.name == GREETER_SERVICE_NAME) + .unwrap() + .handlers + .len(), + 2 + ); + } + + #[test] + fn update_draining_deployment_add_handler() { + let mut updater = SchemaUpdater::default(); + + let mut deployment_1 = Deployment::mock_with_uri("http://localhost:9080"); + let mut deployment_2 = Deployment::mock_with_uri("http://localhost:9081"); + + deployment_1.id = updater + .add_deployment( + deployment_1.metadata.clone(), + vec![greeter_service()], + false, + ) + .unwrap(); + + deployment_2.id = updater + .add_deployment( + deployment_2.metadata.clone(), + vec![greeter_service()], + false, + ) + .unwrap(); + + let mut updated_greeter_service = greeter_service(); + updated_greeter_service + .handlers + .push(endpoint_manifest::Handler { + documentation: None, + name: "greetAgain".parse().unwrap(), + ty: None, + input: None, + output: None, + metadata: Default::default(), + }); + + updater + .update_deployment( + deployment_1.id, + deployment_1.metadata.clone(), + vec![updated_greeter_service], + ) + .unwrap(); + + updater + .schema_information + .assert_service_deployment(GREETER_SERVICE_NAME, deployment_2.id); + updater + .schema_information + .assert_service_revision(GREETER_SERVICE_NAME, 2); + + let (_, updated_deployment_1) = updater + .schema_information + .find_existing_deployment_by_id(&deployment_1.id) + .unwrap(); + + let (_, updated_deployment_2) = updater + .schema_information + .find_existing_deployment_by_id(&deployment_2.id) + .unwrap(); + + assert_eq!( + updated_deployment_1 + .services + .iter() + .find(|svc| svc.name == GREETER_SERVICE_NAME) + .unwrap() + .handlers + .len(), + 2 + ); + + assert_eq!( + updated_deployment_2 + .services + .iter() + .find(|svc| svc.name == GREETER_SERVICE_NAME) + .unwrap() + .handlers + .len(), + 1 + ) + } + + #[test] + fn update_latest_deployment_add_service() { + let mut updater = SchemaUpdater::default(); + + let mut deployment_1 = Deployment::mock_with_uri("http://localhost:9080"); + + deployment_1.id = updater + .add_deployment( + deployment_1.metadata.clone(), + vec![greeter_service()], + false, + ) + .unwrap(); + + updater + .update_deployment( + deployment_1.id, + deployment_1.metadata.clone(), + vec![greeter_service(), another_greeter_service()], + ) + .unwrap(); + + updater + .schema_information + .assert_service_deployment(GREETER_SERVICE_NAME, deployment_1.id); + updater + .schema_information + .assert_service_deployment(ANOTHER_GREETER_SERVICE_NAME, deployment_1.id); + updater + .schema_information + .assert_service_revision(GREETER_SERVICE_NAME, 2); + updater + .schema_information + .assert_service_revision(ANOTHER_GREETER_SERVICE_NAME, 1); + } + + #[test] + fn update_draining_deployment_add_service() { + let mut updater = SchemaUpdater::default(); + + let mut deployment_1 = Deployment::mock_with_uri("http://localhost:9080"); + let mut deployment_2 = Deployment::mock_with_uri("http://localhost:9081"); + + deployment_1.id = updater + .add_deployment( + deployment_1.metadata.clone(), + vec![greeter_service()], + false, + ) + .unwrap(); + + deployment_2.id = updater + .add_deployment( + deployment_2.metadata.clone(), + vec![greeter_service()], + false, + ) + .unwrap(); + + updater + .update_deployment( + deployment_1.id, + deployment_1.metadata.clone(), + vec![greeter_service(), another_greeter_service()], + ) + .unwrap(); + + updater + .schema_information + .assert_service_deployment(GREETER_SERVICE_NAME, deployment_2.id); + updater + .schema_information + .assert_service_deployment(ANOTHER_GREETER_SERVICE_NAME, deployment_1.id); + updater + .schema_information + .assert_service_revision(GREETER_SERVICE_NAME, 2); + updater + .schema_information + .assert_service_revision(ANOTHER_GREETER_SERVICE_NAME, 1); + } } diff --git a/crates/types/src/schema/mod.rs b/crates/types/src/schema/mod.rs index eca19ec0f..c26f1036d 100644 --- a/crates/types/src/schema/mod.rs +++ b/crates/types/src/schema/mod.rs @@ -56,12 +56,12 @@ impl Schema { self.version = self.version.next(); } - /// Find existing deployment that knows about a particular endpoint - pub fn find_existing_deployment_by_endpoint( - &self, - endpoint: &DeploymentType, - ) -> Option<(&DeploymentId, &DeploymentSchemas)> { - self.deployments.iter().find(|(_, schemas)| { + /// Find existing deployments that know about a particular endpoint + pub fn find_existing_deployments_by_endpoint<'a>( + &'a self, + endpoint: &'a DeploymentType, + ) -> impl Iterator { + self.deployments.iter().filter(|(_, schemas)| { schemas.metadata.ty.protocol_type() == endpoint.protocol_type() && schemas.metadata.ty.normalized_address() == endpoint.normalized_address() }) diff --git a/crates/types/src/schema/service.rs b/crates/types/src/schema/service.rs index 7dcb99467..015a62f22 100644 --- a/crates/types/src/schema/service.rs +++ b/crates/types/src/schema/service.rs @@ -125,7 +125,7 @@ pub struct ServiceMetadata { } // This type is used only for exposing the handler metadata, and not internally. See [ServiceAndHandlerType]. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] pub enum HandlerMetadataType { Exclusive, @@ -214,6 +214,21 @@ pub struct HandlerSchemas { pub metadata: HashMap, } +impl HandlerSchemas { + pub fn as_handler_metadata(&self, name: String) -> HandlerMetadata { + HandlerMetadata { + name, + ty: self.target_meta.target_ty.into(), + documentation: self.documentation.clone(), + metadata: self.metadata.clone(), + input_description: self.target_meta.input_rules.to_string(), + output_description: self.target_meta.output_rules.to_string(), + input_json_schema: self.target_meta.input_rules.json_schema(), + output_json_schema: self.target_meta.output_rules.json_schema(), + } + } +} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct ServiceSchemas { pub revision: ServiceRevision, @@ -241,16 +256,7 @@ impl ServiceSchemas { handlers: self .handlers .iter() - .map(|(h_name, h_schemas)| HandlerMetadata { - name: h_name.clone(), - ty: h_schemas.target_meta.target_ty.into(), - documentation: h_schemas.documentation.clone(), - metadata: h_schemas.metadata.clone(), - input_description: h_schemas.target_meta.input_rules.to_string(), - output_description: h_schemas.target_meta.output_rules.to_string(), - input_json_schema: h_schemas.target_meta.input_rules.json_schema(), - output_json_schema: h_schemas.target_meta.output_rules.json_schema(), - }) + .map(|(h_name, h_schemas)| h_schemas.as_handler_metadata(h_name.clone())) .collect(), ty: self.ty, documentation: self.documentation.clone(),