Skip to content

Commit

Permalink
cleanup tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Oct 11, 2023
1 parent 5565a18 commit dd63dfd
Showing 1 changed file with 11 additions and 95 deletions.
106 changes: 11 additions & 95 deletions iroh/tests/sync.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
#![cfg(feature = "mem-db")]

use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
use std::{
future::Future,
net::SocketAddr,
sync::Arc,
time::{Duration, Instant},
};

use anyhow::{anyhow, bail, Context, Result};
use bytes::Bytes;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use futures::{Stream, StreamExt};
use iroh::{
client::mem::Doc,
node::{Builder, Node},
rpc_protocol::ShareMode,
sync_engine::{LiveEvent, SyncEvent},
};
use iroh_net::key::{PublicKey, SecretKey};
use iroh_net::{
key::{PublicKey, SecretKey},
PeerAddr,
};
use quic_rpc::transport::misc::DummyServerEndpoint;
use rand::{CryptoRng, Rng, SeedableRng};
use tracing::{debug, info};
Expand Down Expand Up @@ -352,51 +360,6 @@ async fn sync_subscribe_stop() -> Result<()> {
Ok(())
}

async fn wait_for_events(
mut events: impl Stream<Item = Result<LiveEvent>> + Send + Unpin + 'static,
expected_n: usize,
timeout_per_event: Duration,
me: String,
matcher: impl Fn(LiveEvent) -> bool,
) -> anyhow::Result<()> {
let mut i = 0;
while i < expected_n {
let event = tokio::time::timeout(timeout_per_event, events.next())
.await
.map_err(|_| {
anyhow!("timeout while waiting for next event after {i} (expected {expected_n})")
})?
.ok_or_else(|| anyhow!("end of event stream after {i} (expected {expected_n})"))??;
if matcher(event) {
i += 1;
debug!(%me, "recv event {i} of {expected_n}");
}
}
Ok(())
}

async fn assert_all_docs(
docs: &[Doc],
peer_ids: &[PublicKey],
expected: &Vec<ExpectedEntry>,
label: &str,
) {
info!("validate all peers: {label}");
for (i, doc) in docs.iter().enumerate() {
let entries = get_all(doc).await.unwrap_or_else(|err| {
panic!("failed to get entries for peer {:?}: {err:?}", peer_ids[i])
});
assert_eq!(
&entries,
expected,
"{label}: peer {i} {:?} failed (have {} but expected {})",
peer_ids[i],
entries.len(),
expected.len()
);
}
}

#[derive(Debug, Ord, Eq, PartialEq, PartialOrd, Clone)]
struct ExpectedEntry {
author: AuthorId,
Expand Down Expand Up @@ -430,24 +393,6 @@ impl PartialEq<ExpectedEntry> for (Entry, Bytes) {
}
}

async fn publish(
docs: &[Doc],
expected: &mut Vec<ExpectedEntry>,
n: usize,
cb: impl Fn(usize, usize) -> (AuthorId, String, String),
) -> anyhow::Result<()> {
for (i, doc) in docs.iter().enumerate() {
for j in 0..n {
let (author, key, value) = cb(i, j);
doc.set_bytes(author, key.as_bytes().to_vec(), value.as_bytes().to_vec())
.await?;
expected.push(ExpectedEntry { author, key, value });
}
}
expected.sort();
Ok(())
}

async fn assert_latest(doc: &Doc, key: &[u8], value: &[u8]) {
let content = get_latest(doc, key).await.unwrap();
assert_eq!(content, value.to_vec());
Expand Down Expand Up @@ -588,32 +533,3 @@ fn match_sync_finished(event: &LiveEvent, peer: PublicKey, namespace: NamespaceI
finished: e.finished,
}
}

/// Collect an iterator into futures by joining them all and failing if any future failed.
async fn collect_futures<T>(
futs: impl IntoIterator<Item = impl Future<Output = anyhow::Result<T>>>,
) -> anyhow::Result<Vec<T>> {
futures::future::join_all(futs)
.await
.into_iter()
.collect::<Result<Vec<_>>>()
}

/// Get all entries of a document.
async fn get_all(doc: &Doc) -> anyhow::Result<Vec<Entry>> {
let entries = doc.get_many(GetFilter::All).await?;
let entries = entries.collect::<Vec<_>>().await;
entries.into_iter().collect()
}

/// Get all entries of a document with the blob content.
async fn get_all_with_content(doc: &Doc) -> anyhow::Result<Vec<(Entry, Bytes)>> {
let entries = doc.get_many(GetFilter::All).await?;
let entries = entries.and_then(|entry| async {
let content = doc.read_to_bytes(&entry).await;
content.map(|c| (entry, c))
});
let entries = entries.collect::<Vec<_>>().await;
let entries = entries.into_iter().collect::<Result<Vec<_>>>()?;
Ok(entries)
}

0 comments on commit dd63dfd

Please sign in to comment.