From d1d72ab825762c53524cc46ac24bb5aff9f8e6d0 Mon Sep 17 00:00:00 2001 From: boxdot Date: Fri, 20 Jan 2023 12:35:02 +0100 Subject: [PATCH] Share websocket between clones of Manager (#97) Also add non-exhaustive debug impl for Registered state and manager in case State implements it. Fixes #96 --- Cargo.toml | 1 + src/manager.rs | 37 ++++++++++++++++++++++++++++--------- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f1f189d3d..23364be31 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ libsodium-sys = { version = "0.2.7", optional = true } matrix-sdk-store-encryption = { version = "0.2.0", optional = true } secrets = { version = "1.2.0", features = ["use-libsodium-sys"], optional = true } sled = { version = "0.34", optional = true } +parking_lot = "0.11.2" [build-dependencies] prost-build = "0.10" diff --git a/src/manager.rs b/src/manager.rs index d3506c033..3ac491f4d 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -1,7 +1,12 @@ -use std::time::{Duration, UNIX_EPOCH}; +use std::{ + fmt, + sync::Arc, + time::{Duration, UNIX_EPOCH}, +}; use futures::{channel::mpsc, channel::oneshot, future, pin_mut, AsyncReadExt, Stream, StreamExt}; use log::{debug, error, info, trace}; +use parking_lot::Mutex; use rand::{distributions::Alphanumeric, prelude::ThreadRng, Rng, RngCore}; use serde::{Deserialize, Serialize}; use url::Url; @@ -19,10 +24,7 @@ use libsignal_service::{ protocol::{KeyPair, PrivateKey, PublicKey}, Content, Envelope, GroupMasterKey, GroupSecretParams, PushService, Uuid, }, - proto::{ - sync_message::{self}, - AttachmentPointer, GroupContextV2, - }, + proto::{sync_message, AttachmentPointer, GroupContextV2}, provisioning::{ generate_registration_id, LinkingManager, ProvisioningManager, SecondaryDeviceProvisioning, VerificationCodeResponse, @@ -54,6 +56,14 @@ pub struct Manager { state: State, } +impl fmt::Debug for Manager { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Manager") + .field("state", &self.state) + .finish_non_exhaustive() + } +} + #[derive(Clone, Serialize, Deserialize)] pub struct RegistrationOptions<'a> { pub signal_servers: SignalServers, @@ -77,7 +87,7 @@ pub struct Registered { #[serde(skip)] push_service_cache: CacheCell, #[serde(skip)] - websocket: Option, + websocket: Arc>>, pub signal_servers: SignalServers, pub device_name: Option, @@ -95,6 +105,14 @@ pub struct Registered { profile_key: ProfileKey, } +impl fmt::Debug for Registered { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Registered") + .field("websocket", &self.websocket.lock().is_some()) + .finish_non_exhaustive() + } +} + impl Registered { pub fn device_id(&self) -> u32 { self.device_id.unwrap_or(DEFAULT_DEVICE_ID) @@ -309,7 +327,7 @@ impl Manager { config_store, state: Registered { push_service_cache: CacheCell::default(), - websocket: None, + websocket: Default::default(), signal_servers, device_name: Some(device_name), phone_number, @@ -424,7 +442,7 @@ impl Manager { config_store: self.config_store, state: Registered { push_service_cache: CacheCell::default(), - websocket: None, + websocket: Default::default(), signal_servers: self.state.signal_servers, device_name: None, phone_number, @@ -633,7 +651,7 @@ impl Manager { let pipe = MessageReceiver::new(self.push_service()?) .create_message_pipe(credentials) .await?; - self.state.websocket.replace(pipe.ws()); + self.state.websocket.lock().replace(pipe.ws()); Ok(pipe.stream()) } @@ -903,6 +921,7 @@ impl Manager { Ok(MessageSender::new( self.state .websocket + .lock() .clone() .ok_or(Error::MessagePipeNotStarted)?, self.push_service()?,