Skip to content

Commit

Permalink
Add clear state of a specific instance or of all instances. (#1530)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored May 23, 2024
1 parent c619ff1 commit b07da55
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 1 deletion.
43 changes: 42 additions & 1 deletion cli/src/clients/datafusion_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ use restate_admin_rest_model::deployments::DeploymentId;

use anyhow::Result;
use arrow_convert::{ArrowDeserialize, ArrowField};
use bytes::Bytes;
use chrono::{DateTime, Duration, Local, TimeZone};
use restate_admin_rest_model::services::ServiceType;
use restate_service_protocol::awakeable_id::AwakeableIdentifier;
use restate_types::identifiers::InvocationId;
use restate_types::identifiers::{InvocationId, ServiceId};

static JOURNAL_QUERY_LIMIT: usize = 100;

Expand Down Expand Up @@ -991,3 +992,43 @@ pub async fn get_invocation_journal(
journal.reverse();
Ok(journal)
}

#[derive(Debug, Clone, PartialEq, ArrowField, ArrowDeserialize)]
pub struct StateKeysQueryResult {
service_name: Option<String>,
service_key: Option<String>,
key: Option<String>,
value: Option<Vec<u8>>,
}

pub(crate) async fn get_state_keys(
client: &DataFusionHttpClient,
service: &str,
key: Option<&str>,
) -> Result<HashMap<ServiceId, HashMap<String, Bytes>>> {
let filter = if let Some(k) = key {
format!("service_name = '{}' AND service_key = '{}'", service, k)
} else {
format!("service_name = '{}'", service)
};
let sql = format!(
"SELECT service_name, service_key, key, value FROM state WHERE {}",
filter
);
let query_result_iter = client
.run_query_and_map_results::<StateKeysQueryResult>(sql)
.await?;

#[allow(clippy::mutable_key_type)]
let mut user_state: HashMap<ServiceId, HashMap<String, Bytes>> = HashMap::new();
for row in query_result_iter {
user_state
.entry(ServiceId::new(
row.service_name.expect("service_name"),
row.service_key.expect("service_key"),
))
.or_default()
.insert(row.key.expect("key"), Bytes::from(row.value.expect("key")));
}
Ok(user_state)
}
100 changes: 100 additions & 0 deletions cli/src/commands/state/clear.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::c_indent_table;
use crate::cli_env::CliEnv;
use crate::clients::datafusion_helpers::get_state_keys;
use crate::commands::state::util::{compute_version, update_state};
use crate::console::c_println;
use crate::ui::console::{confirm_or_exit, StyledTable};
use anyhow::{bail, Result};
use cling::prelude::*;
use comfy_table::{Cell, Table};
use crossterm::style::Stylize;
use itertools::Itertools;
use std::collections::HashMap;

#[derive(Run, Parser, Collect, Clone)]
#[cling(run = "run_clear")]
pub struct Clear {
/// A string with either service name and key, or only the service name, e.g.:
/// * `virtualObjectName`
/// * `virtualObjectName/key`
/// * `workflowName`
/// * `workflowName/key`
query: String,

/// Force means, ignore the current version
#[clap(long, short)]
force: bool,
}

pub async fn run_clear(State(env): State<CliEnv>, opts: &Clear) -> Result<()> {
clear(&env, opts).await
}

async fn clear(env: &CliEnv, opts: &Clear) -> Result<()> {
let sql_client = crate::clients::DataFusionHttpClient::new(env)?;

Check failure on line 44 in cli/src/commands/state/clear.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

the `?` operator can only be applied to values that implement `std::ops::Try`

let (svc, key) = match opts.query.split_once('/') {
None => (opts.query.as_str(), None),
Some((svc, key)) => (svc, Some(key)),
};

#[allow(clippy::mutable_key_type)]
let services_state = get_state_keys(&sql_client, svc, key).await?;
if services_state.is_empty() {
bail!("No state found!");
}

let mut table = Table::new_styled(&env.ui_config);
table.set_styled_header(vec!["SERVICE/KEY", "STATE"]);
for (svc_id, svc_state) in &services_state {
table.add_row(vec![
Cell::new(format!("{}/{}", svc_id.service_name, svc_id.key)),
Cell::new(format!("[{}]", svc_state.keys().join(", "))),
]);
}
c_indent_table!(0, table);

c_println!();

c_println!(
"Going to {} all the aforementioned state entries.",
"remove".bold().red()
);
c_println!("About to submit the new state mutation to the system for processing.");
c_println!("If there are currently active invocations, then this mutation will be enqueued to be processed after them.");
c_println!();
confirm_or_exit(env, "Are you sure?")?;

c_println!();

for (svc_id, svc_state) in services_state {
let version = if opts.force {
None
} else {
Some(compute_version(&svc_state))
};
update_state(
env,
version,
&svc_id.service_name,
&svc_id.key,
HashMap::default(),
)
.await?;
}

c_println!();
c_println!("Enqueued successfully for processing");

Ok(())
}
3 changes: 3 additions & 0 deletions cli/src/commands/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod clear;
mod edit;
mod get;
mod util;
Expand All @@ -20,4 +21,6 @@ pub enum ServiceState {
Get(get::Get),
/// Edit the persisted state stored for a service key
Edit(edit::Edit),
/// Clear of the state of a given service
Clear(clear::Clear),
}

0 comments on commit b07da55

Please sign in to comment.