Skip to content

Commit

Permalink
Merge pull request #479 from kelvinfan001/broadcast-before-reboot
Browse files Browse the repository at this point in the history
agent: Broadcast message to logged in users before rebooting
  • Loading branch information
Luca Bruno authored Feb 25, 2021
2 parents 0581ade + de4c260 commit 8b11c09
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 3 deletions.
1 change: 1 addition & 0 deletions dist/systemd/system/zincati.service
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ After=systemd-machine-id-commit.service
[Service]
User=zincati
Group=zincati
SupplementaryGroups=tty
Environment=ZINCATI_VERBOSITY="-v"
Type=notify
ExecStart=/usr/libexec/zincati agent ${ZINCATI_VERBOSITY}
Expand Down
23 changes: 20 additions & 3 deletions src/update_agent/actor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Update agent actor.
use super::{UpdateAgent, UpdateAgentState};
use super::{broadcast, UpdateAgent, UpdateAgentState};
use crate::rpm_ostree::{self, Release};
use actix::prelude::*;
use failure::Error;
Expand Down Expand Up @@ -384,10 +384,27 @@ impl UpdateAgent {
return Box::pin(actix::fut::err(()));
}

log::info!(
"staged deployment '{}' available, proceeding to finalize it",
// Warn logged in users of imminent reboot.
let msg = format!(
"staged deployment '{}' available, proceeding to finalize it and reboot",
release.version
);
log::info!("{}", &msg);
match broadcast(&msg) {
Ok((sessions_total, sessions_broadcasted)) => {
if sessions_total != sessions_broadcasted {
log::warn!(
"{} sessions found, but only broadcasted to {}",
sessions_total,
sessions_broadcasted
);
}
}
Err(e) => {
log::error!("failed to broadcast to user sessions: {}", e);
}
}

let msg = rpm_ostree::FinalizeDeployment { release };
let upgrade = self
.rpm_ostree_actor
Expand Down
97 changes: 97 additions & 0 deletions src/update_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use crate::rpm_ostree::{Release, RpmOstreeClient};
use crate::strategy::UpdateStrategy;
use actix::Addr;
use chrono::prelude::*;
use failure::{bail, Fallible, ResultExt};
use prometheus::IntGauge;
use serde::{Deserialize, Deserializer};
use std::fs;
use std::time::Duration;

/// Default refresh interval for steady state (in seconds).
Expand Down Expand Up @@ -37,6 +40,28 @@ lazy_static::lazy_static! {
)).unwrap();
}

/// JSON output from `loginctl list-sessions --output=json`
#[derive(Debug, Deserialize)]
pub struct SessionsJSON {
user: String,
#[serde(deserialize_with = "empty_string_as_none")]
tty: Option<String>,
}

/// Function to deserialize field to `Option<String>`, where empty strings are
/// deserialized into `None`.
fn empty_string_as_none<'de, D>(deserializer: D) -> Result<Option<String>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
if s.is_empty() {
Ok(None)
} else {
Ok(Some(s))
}
}

/// State machine for the agent.
#[derive(Clone, Debug, PartialEq, Eq)]
enum UpdateAgentState {
Expand Down Expand Up @@ -238,6 +263,78 @@ impl UpdateAgent {
}
}

/// Attempt to broadcast msg to all sessions registered in systemd's login manager.
/// Returns a Result with a tuple of total sessions found and sessions broadcasted to,
/// if no error.
fn broadcast(msg: &str) -> Fallible<(usize, usize)> {
let sessions = get_user_sessions()?;
let sessions_total = sessions.len();
let mut sessions_broadcasted: usize = 0;

let broadcast_msg = format!(
"\nBroadcast message from Zincati at {}:\n{}\n",
chrono::Utc::now().format("%a %Y-%m-%d %H:%M:%S %Z"),
msg
);

// Iterate over sessions and attempt to write to each session's tty.
for session in sessions.into_iter() {
let user = session.user;
let tty_dev = match session.tty {
Some(mut tty) => {
tty.insert_str(0, "/dev/");
tty
}
None => {
log::debug!(
"found user {} with no tty, skipping broadcast to this user",
user
);
continue;
}
};

log::trace!(
"Attempting to broadcast a message to user {} at {}",
user,
tty_dev
);

{
if let Err(e) = fs::write(&tty_dev, &broadcast_msg) {
log::error!("failed to write to {}: {}", &tty_dev, e);
continue;
};
}

sessions_broadcasted = sessions_broadcasted.saturating_add(1);
}

Ok((sessions_total, sessions_broadcasted))
}

/// Get sessions with users logged in using `loginctl`.
/// Returns a Result with vector of `SessionsJSON`, if no error.
fn get_user_sessions() -> Fallible<Vec<SessionsJSON>> {
let cmdrun = std::process::Command::new("loginctl")
.arg("list-sessions")
.arg("--output=json")
.output()
.context("failed to run `loginctl` binary")?;

if !cmdrun.status.success() {
bail!(
"`loginctl` failed to list current sessions: {}",
String::from_utf8_lossy(&cmdrun.stderr)
);
}

let sessions = serde_json::from_slice(&cmdrun.stdout)
.context("failed to deserialize output of `loginctl`")?;

Ok(sessions)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 8b11c09

Please sign in to comment.