Skip to content

Commit

Permalink
Merge pull request #2405 from albinsuresh/refactor/tedge-plugins-init…
Browse files Browse the repository at this point in the history
…-use-file-util-move_file

Tedge ops plugins init logic use async move_file
  • Loading branch information
albinsuresh authored Nov 3, 2023
2 parents f2c7c61 + a7e018d commit d07775a
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 30 deletions.
16 changes: 11 additions & 5 deletions crates/extensions/tedge_config_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ mod tests;

use actor::*;
pub use config::*;
use std::fs::rename;
use std::path::PathBuf;
use tedge_actors::futures::channel::mpsc;
use tedge_actors::Builder;
Expand All @@ -25,7 +24,9 @@ use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::TopicFilter;
use tedge_utils::file::create_directory_with_defaults;
use tedge_utils::file::create_file_with_defaults;
use tedge_utils::file::move_file;
use tedge_utils::file::FileError;
use tedge_utils::file::PermissionEntry;

/// An instance of the config manager
///
Expand All @@ -41,7 +42,7 @@ pub struct ConfigManagerBuilder {
}

impl ConfigManagerBuilder {
pub fn try_new(
pub async fn try_new(
config: ConfigManagerConfig,
mqtt: &mut impl ServiceProvider<MqttMessage, MqttMessage, TopicFilter>,
fs_notify: &mut impl MessageSource<FsWatchEvent, PathBuf>,
Expand All @@ -52,7 +53,7 @@ impl ConfigManagerBuilder {
>,
uploader_actor: &mut impl ServiceProvider<ConfigUploadRequest, ConfigUploadResult, NoConfig>,
) -> Result<Self, FileError> {
Self::init(&config)?;
Self::init(&config).await?;

let plugin_config = PluginConfig::new(config.plugin_config_path.as_path());

Expand Down Expand Up @@ -88,7 +89,7 @@ impl ConfigManagerBuilder {
})
}

pub fn init(config: &ConfigManagerConfig) -> Result<(), FileError> {
pub async fn init(config: &ConfigManagerConfig) -> Result<(), FileError> {
if config.plugin_config_path.exists() {
return Ok(());
}
Expand All @@ -101,7 +102,12 @@ impl ConfigManagerBuilder {
.join("c8y")
.join("c8y-configuration-plugin.toml");
if legacy_plugin_config.exists() {
rename(legacy_plugin_config, &config.plugin_config_path)?;
move_file(
legacy_plugin_config,
&config.plugin_config_path,
PermissionEntry::default(),
)
.await?;
return Ok(());
}

Expand Down
22 changes: 13 additions & 9 deletions crates/extensions/tedge_config_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ fn prepare() -> Result<TempTedgeDir, anyhow::Error> {
}

#[allow(clippy::type_complexity)]
fn new_config_manager_builder(
async fn new_config_manager_builder(
temp_dir: &Path,
) -> (
ConfigManagerBuilder,
Expand Down Expand Up @@ -100,6 +100,7 @@ fn new_config_manager_builder(
&mut downloader_builder,
&mut uploader_builder,
)
.await
.unwrap();

(
Expand All @@ -111,15 +112,16 @@ fn new_config_manager_builder(
)
}

fn spawn_config_manager_actor(
async fn spawn_config_manager_actor(
temp_dir: &Path,
) -> (
MqttMessageBox,
SimpleMessageBox<NoMessage, FsWatchEvent>,
DownloaderMessageBox,
UploaderMessageBox,
) {
let (actor_builder, mqtt, fs, downloader, uploader) = new_config_manager_builder(temp_dir);
let (actor_builder, mqtt, fs, downloader, uploader) =
new_config_manager_builder(temp_dir).await;
let actor = actor_builder.build();
tokio::spawn(async move { actor.run().await });
(mqtt, fs, downloader, uploader)
Expand All @@ -128,7 +130,7 @@ fn spawn_config_manager_actor(
#[tokio::test]
async fn config_manager_reloads_config_types() -> Result<(), anyhow::Error> {
let tempdir = prepare()?;
let (mut mqtt, _fs, _downloader, _uploader) = spawn_config_manager_actor(tempdir.path());
let (mut mqtt, _fs, _downloader, _uploader) = spawn_config_manager_actor(tempdir.path()).await;

let config_snapshot_reload_topic = Topic::new_unchecked("te/device/main///cmd/config_snapshot");
let config_update_reload_topic = Topic::new_unchecked("te/device/main///cmd/config_update");
Expand Down Expand Up @@ -161,7 +163,8 @@ async fn config_manager_reloads_config_types() -> Result<(), anyhow::Error> {
#[tokio::test]
async fn config_manager_uploads_snapshot() -> Result<(), anyhow::Error> {
let tempdir = prepare()?;
let (mut mqtt, _fs, _downloader, mut uploader) = spawn_config_manager_actor(tempdir.path());
let (mut mqtt, _fs, _downloader, mut uploader) =
spawn_config_manager_actor(tempdir.path()).await;

let config_topic = Topic::new_unchecked("te/device/main///cmd/config_snapshot/1234");

Expand Down Expand Up @@ -224,7 +227,8 @@ async fn config_manager_uploads_snapshot() -> Result<(), anyhow::Error> {
#[tokio::test]
async fn config_manager_download_update() -> Result<(), anyhow::Error> {
let tempdir = prepare()?;
let (mut mqtt, _fs, mut downloader, _uploader) = spawn_config_manager_actor(tempdir.path());
let (mut mqtt, _fs, mut downloader, _uploader) =
spawn_config_manager_actor(tempdir.path()).await;

let config_topic = Topic::new_unchecked("te/device/main///cmd/config_update/1234");

Expand Down Expand Up @@ -289,7 +293,7 @@ async fn config_manager_download_update() -> Result<(), anyhow::Error> {
#[tokio::test]
async fn request_config_snapshot_that_does_not_exist() -> Result<(), anyhow::Error> {
let tempdir = prepare()?;
let (mut mqtt, _fs, _downloader, _uploader) = spawn_config_manager_actor(tempdir.path());
let (mut mqtt, _fs, _downloader, _uploader) = spawn_config_manager_actor(tempdir.path()).await;

let config_topic = Topic::new_unchecked("te/device/main///cmd/config_snapshot/1234");

Expand Down Expand Up @@ -335,7 +339,7 @@ async fn request_config_snapshot_that_does_not_exist() -> Result<(), anyhow::Err
#[tokio::test]
async fn ignore_topic_for_another_device() -> Result<(), anyhow::Error> {
let tempdir = prepare()?;
let (mut mqtt, _fs, _downloader, _uploader) = spawn_config_manager_actor(tempdir.path());
let (mut mqtt, _fs, _downloader, _uploader) = spawn_config_manager_actor(tempdir.path()).await;

// Check for child device topic
let another_device_topic = Topic::new_unchecked("te/device/child01///cmd/config-snapshot/1234");
Expand Down Expand Up @@ -363,7 +367,7 @@ async fn ignore_topic_for_another_device() -> Result<(), anyhow::Error> {
#[tokio::test]
async fn send_incorrect_payload() -> Result<(), anyhow::Error> {
let tempdir = prepare()?;
let (mut mqtt, _fs, _downloader, _uploader) = spawn_config_manager_actor(tempdir.path());
let (mut mqtt, _fs, _downloader, _uploader) = spawn_config_manager_actor(tempdir.path()).await;

let config_topic = Topic::new_unchecked("te/device/main///cmd/config_snapshot/1234");

Expand Down
16 changes: 11 additions & 5 deletions crates/extensions/tedge_log_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ mod tests;
pub use actor::*;
pub use config::*;
use log_manager::LogPluginConfig;
use std::fs::rename;
use std::path::PathBuf;
use tedge_actors::adapt;
use tedge_actors::Builder;
Expand All @@ -27,7 +26,9 @@ use tedge_file_system_ext::FsWatchEvent;
use tedge_mqtt_ext::*;
use tedge_utils::file::create_directory_with_defaults;
use tedge_utils::file::create_file_with_defaults;
use tedge_utils::file::move_file;
use tedge_utils::file::FileError;
use tedge_utils::file::PermissionEntry;

/// This is an actor builder.
pub struct LogManagerBuilder {
Expand All @@ -39,13 +40,13 @@ pub struct LogManagerBuilder {
}

impl LogManagerBuilder {
pub fn try_new(
pub async fn try_new(
config: LogManagerConfig,
mqtt: &mut impl ServiceProvider<MqttMessage, MqttMessage, TopicFilter>,
fs_notify: &mut impl MessageSource<FsWatchEvent, PathBuf>,
uploader_actor: &mut impl ServiceProvider<LogUploadRequest, LogUploadResult, NoConfig>,
) -> Result<Self, FileError> {
Self::init(&config)?;
Self::init(&config).await?;
let plugin_config = LogPluginConfig::new(&config.plugin_config_path);

let box_builder = SimpleMessageBoxBuilder::new("Log Manager", 16);
Expand All @@ -70,7 +71,7 @@ impl LogManagerBuilder {
})
}

pub fn init(config: &LogManagerConfig) -> Result<(), FileError> {
pub async fn init(config: &LogManagerConfig) -> Result<(), FileError> {
if config.plugin_config_path.exists() {
return Ok(());
}
Expand All @@ -80,7 +81,12 @@ impl LogManagerBuilder {

let legacy_plugin_config = config.config_dir.join("c8y").join("c8y-log-plugin.toml");
if legacy_plugin_config.exists() {
rename(legacy_plugin_config, &config.plugin_config_path)?;
move_file(
legacy_plugin_config,
&config.plugin_config_path,
PermissionEntry::default(),
)
.await?;
return Ok(());
}

Expand Down
19 changes: 10 additions & 9 deletions crates/extensions/tedge_log_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ fn prepare() -> Result<TempTedgeDir, anyhow::Error> {
/// Create a log manager actor builder
/// along two boxes to exchange MQTT and HTTP messages with the log actor
#[allow(clippy::type_complexity)]
fn new_log_manager_builder(
async fn new_log_manager_builder(
temp_dir: &Path,
) -> (
LogManagerBuilder,
Expand Down Expand Up @@ -108,6 +108,7 @@ fn new_log_manager_builder(
&mut fs_watcher_builder,
&mut uploader_builder,
)
.await
.unwrap();

(
Expand All @@ -119,14 +120,14 @@ fn new_log_manager_builder(
}

/// Spawn a log manager actor and return 2 boxes to exchange MQTT and HTTP messages with it
fn spawn_log_manager_actor(
async fn spawn_log_manager_actor(
temp_dir: &Path,
) -> (
MqttMessageBox,
SimpleMessageBox<NoMessage, FsWatchEvent>,
UploaderMessageBox,
) {
let (actor_builder, mqtt, fs, uploader) = new_log_manager_builder(temp_dir);
let (actor_builder, mqtt, fs, uploader) = new_log_manager_builder(temp_dir).await;
let actor = actor_builder.build();
tokio::spawn(async move { actor.run().await });
(mqtt, fs, uploader)
Expand All @@ -135,7 +136,7 @@ fn spawn_log_manager_actor(
#[tokio::test]
async fn log_manager_reloads_log_types() -> Result<(), anyhow::Error> {
let tempdir = prepare()?;
let (mut mqtt, _fs, _uploader) = spawn_log_manager_actor(tempdir.path());
let (mut mqtt, _fs, _uploader) = spawn_log_manager_actor(tempdir.path()).await;

let log_reload_topic = Topic::new_unchecked("te/device/main///cmd/log_upload");

Expand All @@ -156,7 +157,7 @@ async fn log_manager_reloads_log_types() -> Result<(), anyhow::Error> {
#[tokio::test]
async fn log_manager_upload_log_files_on_request() -> Result<(), anyhow::Error> {
let tempdir = prepare()?;
let (mut mqtt, _fs, mut uploader) = spawn_log_manager_actor(tempdir.path());
let (mut mqtt, _fs, mut uploader) = spawn_log_manager_actor(tempdir.path()).await;

let logfile_topic = Topic::new_unchecked("te/device/main///cmd/log_upload/1234");

Expand Down Expand Up @@ -220,7 +221,7 @@ async fn log_manager_upload_log_files_on_request() -> Result<(), anyhow::Error>
#[tokio::test]
async fn request_logtype_that_does_not_exist() -> Result<(), anyhow::Error> {
let tempdir = prepare()?;
let (mut mqtt, _fs, _uploader) = spawn_log_manager_actor(tempdir.path());
let (mut mqtt, _fs, _uploader) = spawn_log_manager_actor(tempdir.path()).await;

let logfile_topic = Topic::new_unchecked("te/device/main///cmd/log_upload/1234");

Expand Down Expand Up @@ -267,7 +268,7 @@ async fn request_logtype_that_does_not_exist() -> Result<(), anyhow::Error> {
#[tokio::test]
async fn ignore_topic_for_another_device() -> Result<(), anyhow::Error> {
let tempdir = prepare()?;
let (mut mqtt, _http, _fs) = spawn_log_manager_actor(tempdir.path());
let (mut mqtt, _http, _fs) = spawn_log_manager_actor(tempdir.path()).await;

// Check for child device topic
let another_device_topic = Topic::new_unchecked("te/device/child01///cmd/log_upload/1234");
Expand Down Expand Up @@ -297,7 +298,7 @@ async fn ignore_topic_for_another_device() -> Result<(), anyhow::Error> {
#[tokio::test]
async fn send_incorrect_payload() -> Result<(), anyhow::Error> {
let tempdir = prepare()?;
let (mut mqtt, _fs, _uploader) = spawn_log_manager_actor(tempdir.path());
let (mut mqtt, _fs, _uploader) = spawn_log_manager_actor(tempdir.path()).await;

let logfile_topic = Topic::new_unchecked("te/device/main///cmd/log_upload/1234");

Expand Down Expand Up @@ -326,7 +327,7 @@ async fn send_incorrect_payload() -> Result<(), anyhow::Error> {
#[tokio::test]
async fn read_log_from_file_that_does_not_exist() -> Result<(), anyhow::Error> {
let tempdir = prepare()?;
let (mut mqtt, _fs, _uploader) = spawn_log_manager_actor(tempdir.path());
let (mut mqtt, _fs, _uploader) = spawn_log_manager_actor(tempdir.path()).await;

let logfile_topic = Topic::new_unchecked("te/device/main///cmd/log_upload/1234");

Expand Down
3 changes: 2 additions & 1 deletion plugins/tedge_configuration_plugin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ async fn run_with(
&mut fs_watch_actor,
&mut downloader_actor,
&mut uploader_actor,
)?;
)
.await?;

// Shutdown on SIGINT
let signal_actor = SignalActor::builder(&runtime.get_handle());
Expand Down
3 changes: 2 additions & 1 deletion plugins/tedge_log_plugin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ async fn run_with(
&mut mqtt_actor,
&mut fs_watch_actor,
&mut uploader_actor,
)?;
)
.await?;

// Shutdown on SIGINT
let signal_actor = SignalActor::builder(&runtime.get_handle());
Expand Down

1 comment on commit d07775a

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
356 0 3 356 100 59m27.855s

Please sign in to comment.