From b07da55240cd9e5dc7064937709c0743d3bf8616 Mon Sep 17 00:00:00 2001 From: Francesco Guardiani Date: Fri, 24 May 2024 00:12:57 +0200 Subject: [PATCH] Add clear state of a specific instance or of all instances. (#1530) --- cli/src/clients/datafusion_helpers.rs | 43 ++++++++++- cli/src/commands/state/clear.rs | 100 ++++++++++++++++++++++++++ cli/src/commands/state/mod.rs | 3 + 3 files changed, 145 insertions(+), 1 deletion(-) create mode 100644 cli/src/commands/state/clear.rs diff --git a/cli/src/clients/datafusion_helpers.rs b/cli/src/clients/datafusion_helpers.rs index f7b5177bb..591dfc238 100644 --- a/cli/src/clients/datafusion_helpers.rs +++ b/cli/src/clients/datafusion_helpers.rs @@ -24,10 +24,11 @@ use restate_admin_rest_model::deployments::DeploymentId; use anyhow::Result; use arrow_convert::{ArrowDeserialize, ArrowField}; +use bytes::Bytes; use chrono::{DateTime, Duration, Local, TimeZone}; use restate_admin_rest_model::services::ServiceType; use restate_service_protocol::awakeable_id::AwakeableIdentifier; -use restate_types::identifiers::InvocationId; +use restate_types::identifiers::{InvocationId, ServiceId}; static JOURNAL_QUERY_LIMIT: usize = 100; @@ -991,3 +992,43 @@ pub async fn get_invocation_journal( journal.reverse(); Ok(journal) } + +#[derive(Debug, Clone, PartialEq, ArrowField, ArrowDeserialize)] +pub struct StateKeysQueryResult { + service_name: Option, + service_key: Option, + key: Option, + value: Option>, +} + +pub(crate) async fn get_state_keys( + client: &DataFusionHttpClient, + service: &str, + key: Option<&str>, +) -> Result>> { + let filter = if let Some(k) = key { + format!("service_name = '{}' AND service_key = '{}'", service, k) + } else { + format!("service_name = '{}'", service) + }; + let sql = format!( + "SELECT service_name, service_key, key, value FROM state WHERE {}", + filter + ); + let query_result_iter = client + .run_query_and_map_results::(sql) + .await?; + + #[allow(clippy::mutable_key_type)] + let mut user_state: HashMap> = HashMap::new(); + for row in query_result_iter { + user_state + .entry(ServiceId::new( + row.service_name.expect("service_name"), + row.service_key.expect("service_key"), + )) + .or_default() + .insert(row.key.expect("key"), Bytes::from(row.value.expect("key"))); + } + Ok(user_state) +} diff --git a/cli/src/commands/state/clear.rs b/cli/src/commands/state/clear.rs new file mode 100644 index 000000000..b74700b09 --- /dev/null +++ b/cli/src/commands/state/clear.rs @@ -0,0 +1,100 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::c_indent_table; +use crate::cli_env::CliEnv; +use crate::clients::datafusion_helpers::get_state_keys; +use crate::commands::state::util::{compute_version, update_state}; +use crate::console::c_println; +use crate::ui::console::{confirm_or_exit, StyledTable}; +use anyhow::{bail, Result}; +use cling::prelude::*; +use comfy_table::{Cell, Table}; +use crossterm::style::Stylize; +use itertools::Itertools; +use std::collections::HashMap; + +#[derive(Run, Parser, Collect, Clone)] +#[cling(run = "run_clear")] +pub struct Clear { + /// A string with either service name and key, or only the service name, e.g.: + /// * `virtualObjectName` + /// * `virtualObjectName/key` + /// * `workflowName` + /// * `workflowName/key` + query: String, + + /// Force means, ignore the current version + #[clap(long, short)] + force: bool, +} + +pub async fn run_clear(State(env): State, opts: &Clear) -> Result<()> { + clear(&env, opts).await +} + +async fn clear(env: &CliEnv, opts: &Clear) -> Result<()> { + let sql_client = crate::clients::DataFusionHttpClient::new(env)?; + + let (svc, key) = match opts.query.split_once('/') { + None => (opts.query.as_str(), None), + Some((svc, key)) => (svc, Some(key)), + }; + + #[allow(clippy::mutable_key_type)] + let services_state = get_state_keys(&sql_client, svc, key).await?; + if services_state.is_empty() { + bail!("No state found!"); + } + + let mut table = Table::new_styled(&env.ui_config); + table.set_styled_header(vec!["SERVICE/KEY", "STATE"]); + for (svc_id, svc_state) in &services_state { + table.add_row(vec![ + Cell::new(format!("{}/{}", svc_id.service_name, svc_id.key)), + Cell::new(format!("[{}]", svc_state.keys().join(", "))), + ]); + } + c_indent_table!(0, table); + + c_println!(); + + c_println!( + "Going to {} all the aforementioned state entries.", + "remove".bold().red() + ); + c_println!("About to submit the new state mutation to the system for processing."); + c_println!("If there are currently active invocations, then this mutation will be enqueued to be processed after them."); + c_println!(); + confirm_or_exit(env, "Are you sure?")?; + + c_println!(); + + for (svc_id, svc_state) in services_state { + let version = if opts.force { + None + } else { + Some(compute_version(&svc_state)) + }; + update_state( + env, + version, + &svc_id.service_name, + &svc_id.key, + HashMap::default(), + ) + .await?; + } + + c_println!(); + c_println!("Enqueued successfully for processing"); + + Ok(()) +} diff --git a/cli/src/commands/state/mod.rs b/cli/src/commands/state/mod.rs index 825836b72..fb2c6a611 100644 --- a/cli/src/commands/state/mod.rs +++ b/cli/src/commands/state/mod.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +mod clear; mod edit; mod get; mod util; @@ -20,4 +21,6 @@ pub enum ServiceState { Get(get::Get), /// Edit the persisted state stored for a service key Edit(edit::Edit), + /// Clear of the state of a given service + Clear(clear::Clear), }