Skip to content

Commit

Permalink
Merge pull request #2479 from didier-wenzek/feat/restart-with-context
Browse files Browse the repository at this point in the history
Add support for operation workflow triggering device restart
  • Loading branch information
didier-wenzek authored Dec 1, 2023
2 parents ea254bf + b34e5ae commit 1c9e284
Show file tree
Hide file tree
Showing 36 changed files with 782 additions and 431 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 18 additions & 12 deletions crates/common/logged_command/src/logged_command.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use log::error;
use nix::unistd::Pid;
use std::ffi::OsStr;
use std::os::unix::process::ExitStatusExt;
use std::process::Output;
use std::process::Stdio;
use std::time::Duration;
use tokio::fs::File;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::io::BufWriter;
use tokio::process::Child;
Expand Down Expand Up @@ -198,25 +200,29 @@ impl LoggedCommand {
})
}

async fn log_outcome(
pub async fn log_outcome(
command_line: &str,
result: &Result<Output, std::io::Error>,
logger: &mut BufWriter<File>,
logger: &mut (impl AsyncWrite + Unpin),
) -> Result<(), std::io::Error> {
logger
.write_all(format!("----- $ {}\n", command_line).as_bytes())
.await?;
if !command_line.is_empty() {
logger
.write_all(format!("----- $ {}\n", command_line).as_bytes())
.await?;
}

match result.as_ref() {
Ok(output) => {
match &output.status.code() {
None => logger.write_all(b"exit status: unknown\n\n").await?,
Some(code) => {
logger
.write_all(format!("exit status: {}\n\n", code).as_bytes())
.await?
}
if let Some(code) = &output.status.code() {
logger
.write_all(format!("exit status: {}\n\n", code).as_bytes())
.await?
};
if let Some(signal) = &output.status.signal() {
logger
.write_all(format!("killed by signal: {}\n\n", signal).as_bytes())
.await?
}
logger.write_all(b"stdout <<EOF\n").await?;
logger.write_all(&output.stdout).await?;
logger.write_all(b"EOF\n\n").await?;
Expand Down
1 change: 1 addition & 0 deletions crates/common/mqtt_channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ fastrand = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
rumqttc = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt", "time"] }
zeroize = { workspace = true }
Expand Down
4 changes: 3 additions & 1 deletion crates/common/mqtt_channel/src/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use crate::errors::MqttError;
use crate::Message;
use rumqttc::QoS;
use rumqttc::SubscribeFilter;
use serde::Deserialize;
use serde::Serialize;
use std::convert::TryInto;

/// An MQTT topic
#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
pub struct Topic {
pub name: String,
}
Expand Down
1 change: 1 addition & 0 deletions crates/core/plugin_sm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ async-trait = { workspace = true }
csv = { workspace = true }
download = { workspace = true }
logged_command = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
23 changes: 5 additions & 18 deletions crates/core/plugin_sm/src/operation_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,28 +80,15 @@ impl OperationLogs {

pub fn remove_outdated_logs(&self) -> Result<(), OperationLogsError> {
let mut log_tracker: HashMap<String, BinaryHeap<Reverse<String>>> = HashMap::new();
let re = regex::Regex::new("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}")
.expect("Regex matching a date");

// FIXME: this is a hotfix to map "software-list" and "software-update" to "software-management"
// this should be fixed in /~https://github.com/thin-edge/thin-edge.io/issues/1077
for file in (self.log_dir.read_dir()?).flatten() {
if let Some(path) = file.path().file_name().and_then(|name| name.to_str()) {
if path.starts_with("software-list") {
if let Some(date_match) = re.find(path) {
let (prefix, _) = path.split_at(date_match.start());
log_tracker
.entry("software-list".to_string())
.or_default()
.push(Reverse(path.to_string()));
} else if path.starts_with("software-update") {
log_tracker
.entry("software-update".to_string())
.or_default()
.push(Reverse(path.to_string()));
} else {
let file_name = path
.split('-')
.next()
.ok_or(OperationLogsError::FileFormatError)?;
log_tracker
.entry(file_name.to_string())
.entry(prefix.to_string())
.or_default()
.push(Reverse(path.to_string()));
}
Expand Down
1 change: 1 addition & 0 deletions crates/core/tedge_agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ futures = { workspace = true }
hyper = { workspace = true, features = ["full"] }
lazy_static = { workspace = true }
log = { workspace = true }
logged_command = { workspace = true }
path-clean = { workspace = true }
plugin_sm = { workspace = true }
reqwest = { workspace = true }
Expand Down
4 changes: 3 additions & 1 deletion crates/core/tedge_agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::restart_manager::builder::RestartManagerBuilder;
use crate::restart_manager::config::RestartManagerConfig;
use crate::software_manager::builder::SoftwareManagerBuilder;
use crate::software_manager::config::SoftwareManagerConfig;
use crate::state_repository::state::agent_state_dir;
use crate::tedge_operation_converter::builder::TedgeOperationConverterBuilder;
use crate::tedge_to_te_converter::converter::TedgetoTeConverter;
use crate::AgentOpt;
Expand Down Expand Up @@ -172,7 +173,7 @@ impl Agent {
#[instrument(skip(self), name = "sm-agent")]
pub fn init(&self) -> Result<(), anyhow::Error> {
// `config_dir` by default is `/etc/tedge` (or whatever the user sets with --config-dir)
create_directory_with_defaults(self.config.config_dir.join(".agent"))?;
create_directory_with_defaults(agent_state_dir(self.config.config_dir.clone()))?;
create_directory_with_defaults(&self.config.log_dir)?;
create_directory_with_defaults(&self.config.data_dir)?;
create_directory_with_defaults(&self.config.http_config.file_transfer_dir)?;
Expand Down Expand Up @@ -209,6 +210,7 @@ impl Agent {
self.config.mqtt_topic_root.as_ref(),
self.config.mqtt_device_topic_id.clone(),
workflows,
self.config.log_dir.clone(),
&mut software_update_builder,
&mut restart_actor_builder,
&mut mqtt_actor_builder,
Expand Down
100 changes: 36 additions & 64 deletions crates/core/tedge_agent/src/restart_manager/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ use crate::restart_manager::restart_operation_handler::restart_operation::create
use crate::restart_manager::restart_operation_handler::restart_operation::has_rebooted;
use crate::state_repository::error::StateError;
use crate::state_repository::state::AgentStateRepository;
use crate::state_repository::state::RestartOperationStatus;
use crate::state_repository::state::State;
use crate::state_repository::state::StateRepository;
use crate::state_repository::state::StateStatus;
use async_trait::async_trait;
use std::time::Duration;
use tedge_actors::Actor;
Expand All @@ -18,7 +14,6 @@ use tedge_actors::RuntimeRequest;
use tedge_actors::Sender;
use tedge_actors::SimpleMessageBox;
use tedge_api::messages::CommandStatus;
use tedge_api::messages::RestartCommandPayload;
use tedge_api::RestartCommand;
use tedge_config::system_services::SystemConfig;
use tedge_config::system_services::SystemSpecificCommands;
Expand All @@ -37,7 +32,7 @@ const SUDO: &str = "echo";

pub struct RestartManagerActor {
config: RestartManagerConfig,
state_repository: AgentStateRepository,
state_repository: AgentStateRepository<RestartCommand>,
message_box: SimpleMessageBox<RestartCommand, RestartCommand>,
}

Expand All @@ -51,6 +46,7 @@ impl Actor for RestartManagerActor {
if let Some(response) = self.process_pending_restart_operation().await {
self.message_box.send(response).await?;
}
self.clear_state_repository().await;

while let Some(request) = self.message_box.recv().await {
if request.status() != CommandStatus::Scheduled {
Expand Down Expand Up @@ -112,10 +108,8 @@ impl RestartManagerActor {
config: RestartManagerConfig,
message_box: SimpleMessageBox<RestartCommand, RestartCommand>,
) -> Self {
let state_repository = AgentStateRepository::new_with_file_name(
config.config_dir.clone(),
"restart-current-operation",
);
let state_repository =
AgentStateRepository::new(config.config_dir.clone(), "restart-current-operation");
Self {
config,
state_repository,
Expand All @@ -125,71 +119,49 @@ impl RestartManagerActor {

async fn process_pending_restart_operation(&mut self) -> Option<RestartCommand> {
match self.state_repository.load().await {
Ok(State {
operation_id: Some(operation_id),
operation: Some(operation),
}) => {
self.clear_state_repository().await;

let command = RestartCommand {
target: self.config.device_topic_id.clone(),
cmd_id: operation_id,
payload: RestartCommandPayload::default(),
};

match operation {
StateStatus::Restart(RestartOperationStatus::Restarting) => {
let command = match has_rebooted(&self.config.tmp_dir) {
Ok(true) => {
info!("Device restart successful");
command.with_status(CommandStatus::Successful)
}
Ok(false) => {
let error = "Device failed to restart";
error!(error);
command.with_error(error.to_string())
}
Err(err) => {
let error = format!("Fail to detect a restart: {err}");
error!(error);
command.with_error(error)
}
};

Some(command)
Ok(Some(command)) if command.status() == CommandStatus::Executing => {
let command = match has_rebooted(&self.config.tmp_dir) {
Ok(true) => {
info!("Device restart successful");
command.with_status(CommandStatus::Successful)
}
StateStatus::Restart(RestartOperationStatus::Pending) => {
let error = "The agent has been restarted but not the device";
Ok(false) => {
let error = "Device failed to restart";
error!(error);
Some(command.with_error(error.to_string()))
command.with_error(error.to_string())
}
StateStatus::Software(_) | StateStatus::UnknownOperation => {
error!("UnknownOperation in store.");
None
Err(err) => {
let error = format!("Fail to detect a restart: {err}");
error!(error);
command.with_error(error)
}
}
};

Some(command)
}
Ok(Some(command)) => {
let error = "The agent has been restarted but not the device";
error!(error);
Some(command.with_error(error.to_string()))
}
Err(StateError::LoadingFromFileFailed { source, .. })
if source.kind() == std::io::ErrorKind::NotFound =>
{
// file missing means the operation has never been performed, so just do nothing
None
}
Err(err) => {
match err {
// file missing means we don't have to perform the operation, so just do nothing
StateError::LoadingFromFileFailed { source, .. }
if source.kind() == std::io::ErrorKind::NotFound => {}
// if read failed for some other reason, we should probably log it
_ => error!("{err}"),
}
// if read failed for some other reason, we should probably log it
error!("{err}");
None
}
Ok(_) => None,
Ok(None) => None,
}
}

async fn update_state_repository(&mut self, command: RestartCommand) -> RestartCommand {
let state = State {
operation_id: Some(command.cmd_id.clone()),
operation: Some(StateStatus::Restart(RestartOperationStatus::Restarting)),
};

if let Err(err) = self.state_repository.store(&state).await {
let command = command.with_status(CommandStatus::Executing);
if let Err(err) = self.state_repository.store(&command).await {
let reason = format!(
"Fail to update the restart state in {} due to: {}",
self.state_repository.state_repo_path, err
Expand All @@ -207,7 +179,7 @@ impl RestartManagerActor {
return command.with_error(reason);
}

command.with_status(CommandStatus::Executing)
command
}

/// Run the restart command
Expand Down
Loading

1 comment on commit 1c9e284

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
388 0 3 388 100 53m30.622s

Please sign in to comment.