Skip to content

Commit

Permalink
Share websocket between clones of Manager (#97)
Browse files Browse the repository at this point in the history
Also add non-exhaustive debug impl for Registered state and manager in
case State implements it.

Fixes #96
  • Loading branch information
boxdot authored Jan 20, 2023
1 parent fde2eaa commit d1d72ab
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ libsodium-sys = { version = "0.2.7", optional = true }
matrix-sdk-store-encryption = { version = "0.2.0", optional = true }
secrets = { version = "1.2.0", features = ["use-libsodium-sys"], optional = true }
sled = { version = "0.34", optional = true }
parking_lot = "0.11.2"

[build-dependencies]
prost-build = "0.10"
Expand Down
37 changes: 28 additions & 9 deletions src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use std::time::{Duration, UNIX_EPOCH};
use std::{
fmt,
sync::Arc,
time::{Duration, UNIX_EPOCH},
};

use futures::{channel::mpsc, channel::oneshot, future, pin_mut, AsyncReadExt, Stream, StreamExt};
use log::{debug, error, info, trace};
use parking_lot::Mutex;
use rand::{distributions::Alphanumeric, prelude::ThreadRng, Rng, RngCore};
use serde::{Deserialize, Serialize};
use url::Url;
Expand All @@ -19,10 +24,7 @@ use libsignal_service::{
protocol::{KeyPair, PrivateKey, PublicKey},
Content, Envelope, GroupMasterKey, GroupSecretParams, PushService, Uuid,
},
proto::{
sync_message::{self},
AttachmentPointer, GroupContextV2,
},
proto::{sync_message, AttachmentPointer, GroupContextV2},
provisioning::{
generate_registration_id, LinkingManager, ProvisioningManager, SecondaryDeviceProvisioning,
VerificationCodeResponse,
Expand Down Expand Up @@ -54,6 +56,14 @@ pub struct Manager<Store, State> {
state: State,
}

impl<Store, State: fmt::Debug> fmt::Debug for Manager<Store, State> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Manager")
.field("state", &self.state)
.finish_non_exhaustive()
}
}

#[derive(Clone, Serialize, Deserialize)]
pub struct RegistrationOptions<'a> {
pub signal_servers: SignalServers,
Expand All @@ -77,7 +87,7 @@ pub struct Registered {
#[serde(skip)]
push_service_cache: CacheCell<HyperPushService>,
#[serde(skip)]
websocket: Option<SignalWebSocket>,
websocket: Arc<Mutex<Option<SignalWebSocket>>>,

pub signal_servers: SignalServers,
pub device_name: Option<String>,
Expand All @@ -95,6 +105,14 @@ pub struct Registered {
profile_key: ProfileKey,
}

impl fmt::Debug for Registered {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Registered")
.field("websocket", &self.websocket.lock().is_some())
.finish_non_exhaustive()
}
}

impl Registered {
pub fn device_id(&self) -> u32 {
self.device_id.unwrap_or(DEFAULT_DEVICE_ID)
Expand Down Expand Up @@ -309,7 +327,7 @@ impl<C: Store> Manager<C, Linking> {
config_store,
state: Registered {
push_service_cache: CacheCell::default(),
websocket: None,
websocket: Default::default(),
signal_servers,
device_name: Some(device_name),
phone_number,
Expand Down Expand Up @@ -424,7 +442,7 @@ impl<C: Store> Manager<C, Confirmation> {
config_store: self.config_store,
state: Registered {
push_service_cache: CacheCell::default(),
websocket: None,
websocket: Default::default(),
signal_servers: self.state.signal_servers,
device_name: None,
phone_number,
Expand Down Expand Up @@ -633,7 +651,7 @@ impl<C: Store> Manager<C, Registered> {
let pipe = MessageReceiver::new(self.push_service()?)
.create_message_pipe(credentials)
.await?;
self.state.websocket.replace(pipe.ws());
self.state.websocket.lock().replace(pipe.ws());
Ok(pipe.stream())
}

Expand Down Expand Up @@ -903,6 +921,7 @@ impl<C: Store> Manager<C, Registered> {
Ok(MessageSender::new(
self.state
.websocket
.lock()
.clone()
.ok_or(Error::MessagePipeNotStarted)?,
self.push_service()?,
Expand Down

0 comments on commit d1d72ab

Please sign in to comment.