Merge branch 'main' into sync-propagation
divagant-martian committed Oct 6, 2023
2 parents 99359b8 + 2fd31b7 commit e5c0f3a
Showing 23 changed files with 449 additions and 277 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock


4 changes: 2 additions & 2 deletions iroh-bytes/Cargo.toml
@@ -13,7 +13,7 @@ rust-version = "1.70"

[dependencies]
anyhow = { version = "1", features = ["backtrace"] }
bao-tree = { version = "0.8.0", features = ["tokio_fsm"], default-features = false }
bao-tree = { version = "0.9.1", features = ["tokio_fsm"], default-features = false }
bytes = { version = "1.4", features = ["serde"] }
chrono = "0.4.31"
data-encoding = "2.3.3"
@@ -22,7 +22,7 @@ flume = "0.10.14"
futures = "0.3.25"
genawaiter = { version = "0.99.1", features = ["futures03"] }
hex = "0.4.3"
iroh-io = { version = "0.2.2" }
iroh-io = { version = "0.3.0" }
multibase = "0.9.1"
num_cpus = "1.15.0"
once_cell = "1.17.0"
5 changes: 5 additions & 0 deletions iroh-bytes/src/get.rs
@@ -394,6 +394,11 @@ pub mod fsm {
&self.ranges
}

/// Hash of the root blob
pub fn hash(&self) -> &Hash {
&self.hash
}

/// Go into the next state, reading the header
///
/// For the collection we already know the hash, since it was part of the request
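For orientation, the hunk above adds a borrowing getter next to the existing `ranges()` accessor, so callers can read the root hash without consuming the fsm state. A condensed sketch of the pattern; `RequestState` is a stand-in name, since the real state type is not visible in this excerpt:

    struct RequestState {
        hash: Hash,
        // other fsm fields elided
    }

    impl RequestState {
        /// Hash of the root blob
        fn hash(&self) -> &Hash {
            &self.hash
        }
    }

A caller can then log or compare the hash while the request is still in flight, e.g. `debug!("fetching {}", state.hash())`.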
124 changes: 97 additions & 27 deletions iroh-bytes/src/provider.rs
@@ -7,9 +7,12 @@ use anyhow::{Context, Result};
use bao_tree::io::fsm::{encode_ranges_validated, Outboard};
use bytes::Bytes;
use futures::future::BoxFuture;
use iroh_io::stats::{
SliceReaderStats, StreamWriterStats, TrackingSliceReader, TrackingStreamWriter,
};
use iroh_io::{AsyncStreamWriter, TokioStreamWriter};
use serde::{Deserialize, Serialize};
use tokio::io::AsyncWrite;
use tracing::{debug, debug_span, warn};
use tracing::{debug, debug_span, info, trace, warn};
use tracing_futures::Instrument;

use crate::baomap::*;
@@ -65,15 +68,15 @@ pub enum Event {
request_id: u64,
/// The number of blobs in the collection.
num_blobs: Option<u64>,
/// The total blob size of the data.
total_blobs_size: Option<u64>,
},
/// A collection request was completed and the data was sent to the client.
TransferCollectionCompleted {
/// A unique connection id.
connection_id: u64,
/// An identifier uniquely identifying this transfer request.
request_id: u64,
/// Statistics about the transfer.
stats: Box<TransferStats>,
},
/// A blob in a collection was transferred.
TransferBlobCompleted {
@@ -94,9 +97,23 @@ pub enum Event {
connection_id: u64,
/// An identifier uniquely identifying this request.
request_id: u64,
/// Statistics about the transfer. This is `None` if the transfer
/// was aborted before any data was sent.
stats: Option<Box<TransferStats>>,
},
}

/// The stats for a transfer of a collection or blob.
#[derive(Debug, Clone, Copy, Default)]
pub struct TransferStats {
/// Stats for sending to the client.
pub send: StreamWriterStats,
/// Stats for reading from disk.
pub read: SliceReaderStats,
/// The total duration of the transfer.
pub duration: Duration,
}
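Given the `+=` accumulation used later in this diff and the `total()` accessor used in `print_stats` below, a consumer could derive a throughput figure from a finished transfer along these lines (a sketch under those assumptions, not an API from this diff):

    fn throughput_bytes_per_sec(stats: &TransferStats) -> f64 {
        // bytes handed to the network writer, over the wall-clock time of
        // the whole transfer (reading, sending, and everything in between)
        let sent = stats.send.total().size as f64;
        sent / stats.duration.as_secs_f64().max(f64::EPSILON)
    }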

/// Progress updates for the add operation.
#[derive(Debug, Serialize, Deserialize)]
pub enum AddProgress {
@@ -268,6 +285,7 @@ pub async fn transfer_collection<D: Map, E: EventSender, C: CollectionParser>(
mut outboard: D::Outboard,
mut data: D::DataReader,
collection_parser: C,
stats: &mut TransferStats,
) -> Result<SentStatus> {
let hash = request.hash;

@@ -282,7 +300,6 @@ pub async fn transfer_collection<D: Map, E: EventSender, C: CollectionParser>(
connection_id: writer.connection_id(),
request_id: writer.request_id(),
num_blobs: stats.num_blobs,
total_blobs_size: stats.total_blob_size,
})
.await;
Some(c)
@@ -292,16 +309,22 @@ pub async fn transfer_collection<D: Map, E: EventSender, C: CollectionParser>(

let mut prev = 0;
for (offset, ranges) in request.ranges.iter_non_empty() {
// create a tracking writer so we can get some stats for writing
let mut tw = writer.tracking_writer();
if offset == 0 {
debug!("writing ranges '{:?}' of collection {}", ranges, hash);
// wrap the data reader in a tracking reader so we can get some stats for reading
let mut tracking_reader = TrackingSliceReader::new(&mut data);
// send the root
encode_ranges_validated(
&mut data,
&mut tracking_reader,
&mut outboard,
&ranges.to_chunk_ranges(),
&mut writer.inner,
&mut tw,
)
.await?;
stats.read += tracking_reader.stats();
stats.send += tw.stats();
debug!(
"finished writing ranges '{:?}' of collection {}",
ranges, hash
@@ -315,7 +338,9 @@ pub async fn transfer_collection<D: Map, E: EventSender, C: CollectionParser>(
}
if let Some(hash) = c.next().await? {
tokio::task::yield_now().await;
let (status, size) = send_blob(db, hash, ranges, &mut writer.inner).await?;
let (status, size, blob_read_stats) = send_blob(db, hash, ranges, &mut tw).await?;
stats.send += tw.stats();
stats.read += blob_read_stats;
if SentStatus::NotFound == status {
writer.inner.finish().await?;
return Ok(status);
@@ -340,7 +365,6 @@ pub async fn transfer_collection<D: Map, E: EventSender, C: CollectionParser>(
}

debug!("done writing");
writer.inner.finish().await?;
Ok(SentStatus::Sent)
}
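Condensed, the instrumentation pattern above is wrap, use, harvest: each raw endpoint is wrapped in a tracking adapter for the duration of one encode, and the adapter's counters are folded into the per-transfer totals afterwards. A sketch, assuming `data`, `writer`, `outboard`, the chunk ranges, and `stats` in scope as in the function above:

    // wrap: the adapters count bytes and time spent in each I/O call
    let mut tracking_reader = TrackingSliceReader::new(&mut data);
    let mut tw = writer.tracking_writer();
    // use: run the ranged encode through the wrapped endpoints
    encode_ranges_validated(&mut tracking_reader, &mut outboard, &chunk_ranges, &mut tw).await?;
    // harvest: fold this round's counters into the running totals
    stats.read += tracking_reader.stats();
    stats.send += tw.stats();

Keeping the adapters short-lived means each loop iteration contributes exactly its own reads and writes to the totals.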

@@ -422,7 +446,7 @@ async fn handle_stream<D: Map, E: EventSender, C: CollectionParser>(
let request = match read_request(reader).await {
Ok(r) => r,
Err(e) => {
writer.notify_transfer_aborted().await;
writer.notify_transfer_aborted(None).await;
return Err(e);
}
};
@@ -433,7 +457,7 @@ async fn handle_stream<D: Map, E: EventSender, C: CollectionParser>(
.authorize(request.token().cloned(), &request)
.await
{
writer.notify_transfer_aborted().await;
writer.notify_transfer_aborted(None).await;
return Err(e);
}

Expand Down Expand Up @@ -494,25 +518,29 @@ pub async fn handle_get<D: Map, E: EventSender, C: CollectionParser>(
match db.get(&hash) {
// Collection or blob request
Some(entry) => {
let mut stats = Box::<TransferStats>::default();
let t0 = std::time::Instant::now();
// 5. Transfer data!
match transfer_collection(
let res = transfer_collection(
request,
&db,
&mut writer,
entry.outboard().await?,
entry.data_reader().await?,
collection_parser,
&mut stats,
)
.await
{
.await;
stats.duration = t0.elapsed();
match res {
Ok(SentStatus::Sent) => {
writer.notify_transfer_completed().await;
writer.notify_transfer_completed(&hash, stats).await;
}
Ok(SentStatus::NotFound) => {
writer.notify_transfer_aborted().await;
writer.notify_transfer_aborted(Some(stats)).await;
}
Err(e) => {
writer.notify_transfer_aborted().await;
writer.notify_transfer_aborted(Some(stats)).await;
return Err(e);
}
}
@@ -521,7 +549,7 @@ pub async fn handle_get<D: Map, E: EventSender, C: CollectionParser>(
}
None => {
debug!("not found {}", hash);
writer.notify_transfer_aborted().await;
writer.notify_transfer_aborted(None).await;
writer.inner.finish().await?;
}
};
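The timing added in `handle_get` is a simple wrap: record a start instant, run the transfer to completion, then store the elapsed wall-clock time before matching on the result, which is why even the aborted and error arms can report a duration. The same shape as a reusable helper (a sketch, not part of this diff):

    use std::time::{Duration, Instant};

    async fn timed<T>(fut: impl std::future::Future<Output = T>) -> (T, Duration) {
        // measures wall-clock time around any future, success or failure
        let t0 = Instant::now();
        let out = fut.await;
        (out, t0.elapsed())
    }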
@@ -538,6 +566,12 @@ pub struct ResponseWriter<E> {
}

impl<E: EventSender> ResponseWriter<E> {
fn tracking_writer(
&mut self,
) -> TrackingStreamWriter<TokioStreamWriter<&mut quinn::SendStream>> {
TrackingStreamWriter::new(TokioStreamWriter(&mut self.inner))
}

fn connection_id(&self) -> u64 {
self.connection_id
}
@@ -546,20 +580,56 @@ impl<E: EventSender> ResponseWriter<E> {
self.inner.id().index()
}

async fn notify_transfer_completed(&self) {
fn print_stats(stats: &TransferStats) {
let send = stats.send.total();
let read = stats.read.total();
let total_sent_bytes = send.size;
let send_duration = send.stats.duration;
let read_duration = read.stats.duration;
let total_duration = stats.duration;
let other_duration = total_duration
.saturating_sub(send_duration)
.saturating_sub(read_duration);
let avg_send_size = total_sent_bytes / send.stats.count.max(1);
info!(
"sent {} bytes in {}s",
total_sent_bytes,
total_duration.as_secs_f64()
);
debug!(
"{}s sending, {}s reading, {}s other",
send_duration.as_secs_f64(),
read_duration.as_secs_f64(),
other_duration.as_secs_f64()
);
trace!(
"send_count: {} avg_send_size {}",
send.stats.count,
avg_send_size,
)
}

async fn notify_transfer_completed(&self, hash: &Hash, stats: Box<TransferStats>) {
info!("trasnfer completed for {}", hash);
Self::print_stats(&stats);
self.events
.send(Event::TransferCollectionCompleted {
connection_id: self.connection_id(),
request_id: self.request_id(),
stats,
})
.await;
}

async fn notify_transfer_aborted(&self) {
async fn notify_transfer_aborted(&self, stats: Option<Box<TransferStats>>) {
if let Some(stats) = &stats {
Self::print_stats(stats);
};
self.events
.send(Event::TransferAborted {
connection_id: self.connection_id(),
request_id: self.request_id(),
stats,
})
.await;
}
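On the receiving side of these events, a handler might use the reworked variants as below; the match arms and field names come from the `Event` enum above, while the surrounding function is assumed:

    fn on_event(event: Event) {
        match event {
            Event::TransferCollectionCompleted { connection_id, request_id, stats } => {
                println!(
                    "conn {connection_id} req {request_id}: sent {} bytes in {:?}",
                    stats.send.total().size,
                    stats.duration,
                );
            }
            Event::TransferAborted { connection_id, request_id, stats } => {
                // stats is None when the transfer aborted before any data moved
                let sent = stats.map(|s| s.send.total().size).unwrap_or(0);
                println!("conn {connection_id} req {request_id}: aborted after {sent} bytes");
            }
            _ => {}
        }
    }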
@@ -575,18 +645,18 @@ pub enum SentStatus {
}

/// Send a blob, if it is present in the database
pub async fn send_blob<D: Map, W: AsyncWrite + Unpin + Send + 'static>(
pub async fn send_blob<D: Map, W: AsyncStreamWriter>(
db: &D,
name: Hash,
ranges: &RangeSpec,
writer: &mut W,
) -> Result<(SentStatus, u64)> {
writer: W,
) -> Result<(SentStatus, u64, SliceReaderStats)> {
match db.get(&name) {
Some(entry) => {
let outboard = entry.outboard().await?;
let size = outboard.tree().size().0;
let mut file_reader = entry.data_reader().await?;
let res = bao_tree::io::fsm::encode_ranges_validated(
let mut file_reader = TrackingSliceReader::new(entry.data_reader().await?);
let res = encode_ranges_validated(
&mut file_reader,
outboard,
&ranges.to_chunk_ranges(),
Expand All @@ -596,11 +666,11 @@ pub async fn send_blob<D: Map, W: AsyncWrite + Unpin + Send + 'static>(
debug!("done sending blob {} {:?}", name, res);
res?;

Ok((SentStatus::Sent, size))
Ok((SentStatus::Sent, size, file_reader.stats()))
}
_ => {
debug!("blob not found {}", hex::encode(name));
Ok((SentStatus::NotFound, 0))
Ok((SentStatus::NotFound, 0, SliceReaderStats::default()))
}
}
}
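With the new signature, a call site supplies an `AsyncStreamWriter` and folds the returned read stats into its totals, mirroring the call in `transfer_collection` above (a sketch with `db`, `hash`, `ranges`, `writer`, and `stats` assumed in scope):

    let mut tw = writer.tracking_writer();
    let (status, size, blob_read_stats) = send_blob(db, hash, ranges, &mut tw).await?;
    stats.send += tw.stats();
    stats.read += blob_read_stats;
    debug!("sent blob {} ({} bytes): {:?}", hash, size, status);

Returning the reader stats, rather than threading a `&mut TransferStats` through, keeps `send_blob` usable on its own.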
2 changes: 1 addition & 1 deletion iroh-gossip/src/metrics.rs
@@ -44,6 +44,6 @@ impl Default for Metrics {

impl Metric for Metrics {
fn name() -> &'static str {
"Iroh Gossip"
"gossip"
}
}
2 changes: 1 addition & 1 deletion iroh-net/src/derp/metrics.rs
@@ -126,6 +126,6 @@ impl Default for Metrics {

impl Metric for Metrics {
fn name() -> &'static str {
"Derpserver"
"derpserver"
}
}
2 changes: 1 addition & 1 deletion iroh-net/src/magicsock/metrics.rs
@@ -140,6 +140,6 @@ impl Default for Metrics {

impl Metric for Metrics {
fn name() -> &'static str {
"Magicsock"
"magicsock"
}
}
2 changes: 1 addition & 1 deletion iroh-net/src/netcheck/metrics.rs
@@ -36,6 +36,6 @@ impl Default for Metrics {

impl Metric for Metrics {
fn name() -> &'static str {
"Netcheck"
"netcheck"
}
}
2 changes: 1 addition & 1 deletion iroh-net/src/portmapper/metrics.rs
@@ -63,6 +63,6 @@ impl Default for Metrics {

impl Metric for Metrics {
fn name() -> &'static str {
"Portmap"
"portmap"
}
}
2 changes: 1 addition & 1 deletion iroh-sync/src/metrics.rs
@@ -36,6 +36,6 @@ impl Default for Metrics {

impl Metric for Metrics {
fn name() -> &'static str {
"iroh-sync"
"iroh_sync"
}
}
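The metric renames in the last six files converge on one convention: `Metric::name()` returns a lowercase snake_case identifier rather than a display string, presumably so the name can be used directly as a stable metrics prefix. For a hypothetical new subsystem the impl would look like this (`Metric` as used in the impls above; the trait's other items are elided):

    // hypothetical new subsystem following the naming convention adopted here
    struct Metrics;

    impl Metric for Metrics {
        fn name() -> &'static str {
            // lowercase snake_case, e.g. "iroh_sync", not "Iroh Sync"
            "my_subsystem"
        }
    }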