Skip to content

Commit

Permalink
refactor: cleanup ProgressSliceWriter (#2000)
Browse files Browse the repository at this point in the history
## Description

The `ProgressSliceWriter` is currently duplicated code in `iroh_bytes`
and `iroh`. Also there's two versions of it, one that takes a fallible
and one that takes an infallible one.

This PR removes the code duplication and makes the naming
understandable.

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- [ ] Documentation updates if relevant.
- [ ] Tests if relevant.
  • Loading branch information
Frando authored Feb 6, 2024
1 parent 7844577 commit 7edd7ab
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 143 deletions.
50 changes: 4 additions & 46 deletions iroh-bytes/src/get/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
use std::path::PathBuf;
use std::time::Duration;

use bytes::Bytes;
use futures::Future;
use futures::{future::LocalBoxFuture, FutureExt, StreamExt};
use futures::StreamExt;
use iroh_base::{hash::Hash, rpc::RpcError};
use serde::{Deserialize, Serialize};

use crate::protocol::RangeSpec;
use crate::util::progress::FallibleProgressSliceWriter;
use std::io;

use crate::hashseq::parse_hash_seq;
Expand Down Expand Up @@ -196,7 +196,7 @@ async fn get_blob_inner<D: BaoStore>(
})?;
Ok(())
};
let mut pw = ProgressSliceWriter2::new(df, on_write);
let mut pw = FallibleProgressSliceWriter::new(df, on_write);
// use the convenience method to write all to the two vfs objects
let end = at_content
.write_all_with_outboard(of.as_mut(), &mut pw)
Expand Down Expand Up @@ -263,7 +263,7 @@ async fn get_blob_inner_partial<D: BaoStore>(
})?;
Ok(())
};
let mut pw = ProgressSliceWriter2::new(df, on_write);
let mut pw = FallibleProgressSliceWriter::new(df, on_write);
// use the convenience method to write all to the two vfs objects
let at_end = at_content
.write_all_with_outboard(of.as_mut(), &mut pw)
Expand Down Expand Up @@ -599,45 +599,3 @@ pub enum DownloadProgress {
/// We are done with the whole operation.
AllDone,
}

/// A slice writer that adds a synchronous progress callback
#[derive(Debug)]
struct ProgressSliceWriter2<W, F>(W, F);

impl<W: AsyncSliceWriter, F: Fn(u64, usize) -> io::Result<()> + 'static>
ProgressSliceWriter2<W, F>
{
/// Create a new `ProgressSliceWriter` from an inner writer and a progress callback
pub fn new(inner: W, on_write: F) -> Self {
Self(inner, on_write)
}
}

impl<W: AsyncSliceWriter + 'static, F: Fn(u64, usize) -> io::Result<()> + 'static> AsyncSliceWriter
for ProgressSliceWriter2<W, F>
{
type WriteBytesAtFuture<'a> = LocalBoxFuture<'a, io::Result<()>>;
fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> Self::WriteBytesAtFuture<'_> {
// todo: get rid of the boxing
async move {
(self.1)(offset, data.len())?;
self.0.write_bytes_at(offset, data).await
}
.boxed_local()
}

type WriteAtFuture<'a> = W::WriteAtFuture<'a>;
fn write_at<'a>(&'a mut self, offset: u64, bytes: &'a [u8]) -> Self::WriteAtFuture<'a> {
self.0.write_at(offset, bytes)
}

type SyncFuture<'a> = W::SyncFuture<'a>;
fn sync(&mut self) -> Self::SyncFuture<'_> {
self.0.sync()
}

type SetLenFuture<'a> = W::SetLenFuture<'a>;
fn set_len(&mut self, size: u64) -> Self::SetLenFuture<'_> {
self.0.set_len(size)
}
}
114 changes: 112 additions & 2 deletions iroh-bytes/src/util/progress.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
//! Utilities for reporting progress.
//!
//! The main entry point is the [ProgressSender] trait.
use futures::{FutureExt, TryFutureExt};
use std::marker::PhantomData;
use bytes::Bytes;
use futures::{future::LocalBoxFuture, FutureExt, TryFutureExt};
use iroh_io::AsyncSliceWriter;
use std::{io, marker::PhantomData};

/// A general purpose progress sender. This should be usable for reporting progress
/// from both blocking and non-blocking contexts.
Expand Down Expand Up @@ -391,3 +393,111 @@ impl From<ProgressSendError> for std::io::Error {
std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)
}
}

/// A slice writer that adds a synchronous progress callback.
///
/// This wraps any `AsyncSliceWriter`, passes through all operations to the inner writer, and
/// calls the passed `on_write` callback whenever data is written.
#[derive(Debug)]
pub struct ProgressSliceWriter<W, F>(W, F);

impl<W: AsyncSliceWriter, F: FnMut(u64)> ProgressSliceWriter<W, F> {
/// Create a new `ProgressSliceWriter` from an inner writer and a progress callback
///
/// The `on_write` function is called for each write, with the `offset` as the first and the
/// length of the data as the second param.
pub fn new(inner: W, on_write: F) -> Self {
Self(inner, on_write)
}

/// Return the inner writer
pub fn into_inner(self) -> W {
self.0
}
}

impl<W: AsyncSliceWriter + 'static, F: FnMut(u64, usize) + 'static> AsyncSliceWriter
for ProgressSliceWriter<W, F>
{
type WriteBytesAtFuture<'a> = W::WriteBytesAtFuture<'a>;
fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> Self::WriteBytesAtFuture<'_> {
(self.1)(offset, data.len());
self.0.write_bytes_at(offset, data)
}

type WriteAtFuture<'a> = W::WriteAtFuture<'a>;
fn write_at<'a>(&'a mut self, offset: u64, data: &'a [u8]) -> Self::WriteAtFuture<'a> {
(self.1)(offset, data.len());
self.0.write_at(offset, data)
}

type SyncFuture<'a> = W::SyncFuture<'a>;
fn sync(&mut self) -> Self::SyncFuture<'_> {
self.0.sync()
}

type SetLenFuture<'a> = W::SetLenFuture<'a>;
fn set_len(&mut self, size: u64) -> Self::SetLenFuture<'_> {
self.0.set_len(size)
}
}

/// A slice writer that adds a fallible progress callback.
///
/// This wraps any `AsyncSliceWriter`, passes through all operations to the inner writer, and
/// calls the passed `on_write` callback whenever data is written. `on_write` must return an
/// `io::Result`, and can abort the download by returning an error.
#[derive(Debug)]
pub struct FallibleProgressSliceWriter<W, F>(W, F);

impl<W: AsyncSliceWriter, F: Fn(u64, usize) -> io::Result<()> + 'static>
FallibleProgressSliceWriter<W, F>
{
/// Create a new `ProgressSliceWriter` from an inner writer and a progress callback
///
/// The `on_write` function is called for each write, with the `offset` as the first and the
/// length of the data as the second param. `on_write` must return a future which resolves to
/// an `io::Result`. If `on_write` returns an error, the download is aborted.
pub fn new(inner: W, on_write: F) -> Self {
Self(inner, on_write)
}

/// Return the inner writer.
pub fn into_inner(self) -> W {
self.0
}
}

impl<W: AsyncSliceWriter + 'static, F: Fn(u64, usize) -> io::Result<()> + 'static> AsyncSliceWriter
for FallibleProgressSliceWriter<W, F>
{
type WriteBytesAtFuture<'a> = LocalBoxFuture<'a, io::Result<()>>;
fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> Self::WriteBytesAtFuture<'_> {
// todo: get rid of the boxing
async move {
(self.1)(offset, data.len())?;
self.0.write_bytes_at(offset, data).await
}
.boxed_local()
}

type WriteAtFuture<'a> = LocalBoxFuture<'a, io::Result<()>>;
fn write_at<'a>(&'a mut self, offset: u64, data: &'a [u8]) -> Self::WriteAtFuture<'a> {
// todo: get rid of the boxing
async move {
(self.1)(offset, data.len())?;
self.0.write_at(offset, data).await
}
.boxed_local()
}

type SyncFuture<'a> = W::SyncFuture<'a>;
fn sync(&mut self) -> Self::SyncFuture<'_> {
self.0.sync()
}

type SetLenFuture<'a> = W::SetLenFuture<'a>;
fn set_len(&mut self, size: u64) -> Self::SetLenFuture<'_> {
self.0.set_len(size)
}
}
6 changes: 3 additions & 3 deletions iroh/src/downloader/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use iroh_bytes::{
hashseq::parse_hash_seq,
protocol::{GetRequest, RangeSpecSeq},
store::{MapEntry, PartialMapEntry, PossiblyPartialEntry, Store},
util::progress::FallibleProgressSliceWriter,
BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
};
#[cfg(feature = "metrics")]
Expand All @@ -24,7 +25,6 @@ use tracing::trace;

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;
use crate::util::progress::ProgressSliceWriter2;

use super::{DownloadKind, FailureAction, GetFut, Getter};

Expand Down Expand Up @@ -320,7 +320,7 @@ async fn get_blob_inner<D: Store>(
None
};
let on_write = move |_offset: u64, _length: usize| Ok(());
let mut pw = ProgressSliceWriter2::new(df, on_write);
let mut pw = FallibleProgressSliceWriter::new(df, on_write);
// use the convenience method to write all to the two vfs objects
let end = content
.write_all_with_outboard(of.as_mut(), &mut pw)
Expand Down Expand Up @@ -363,7 +363,7 @@ async fn get_blob_inner_partial<D: Store>(
None
};
let on_write = move |_offset: u64, _length: usize| Ok(());
let mut pw = ProgressSliceWriter2::new(df, on_write);
let mut pw = FallibleProgressSliceWriter::new(df, on_write);
// use the convenience method to write all to the two vfs objects
let end = content
.write_all_with_outboard(of.as_mut(), &mut pw)
Expand Down
92 changes: 0 additions & 92 deletions iroh/src/util/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Poll;

use bytes::Bytes;
use futures::future::LocalBoxFuture;
use futures::FutureExt;
use iroh_bytes::util::io::TrackingWriter;
use iroh_io::AsyncSliceWriter;
use portable_atomic::{AtomicU16, AtomicU64};
use tokio::io::{self, AsyncRead, AsyncWrite};
use tokio::sync::{broadcast, mpsc};
Expand Down Expand Up @@ -150,94 +146,6 @@ where
}
}

/// A slice writer that adds a synchronous progress callback
#[derive(Debug)]
pub struct ProgressSliceWriter<W, S>(W, S);

impl<W: AsyncSliceWriter, S: FnMut(u64)> ProgressSliceWriter<W, S> {
/// Create a new `ProgressSliceWriter` from an inner writer and a progress callback
pub fn new(inner: W, on_write: S) -> Self {
Self(inner, on_write)
}

/// Return the inner writer
pub fn into_inner(self) -> W {
self.0
}
}

impl<W: AsyncSliceWriter + 'static, S: FnMut(u64) + 'static> AsyncSliceWriter
for ProgressSliceWriter<W, S>
{
type WriteBytesAtFuture<'a> = W::WriteBytesAtFuture<'a>;
fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> Self::WriteBytesAtFuture<'_> {
(self.1)(offset);
self.0.write_bytes_at(offset, data)
}

type WriteAtFuture<'a> = W::WriteAtFuture<'a>;
fn write_at<'a>(&'a mut self, offset: u64, bytes: &'a [u8]) -> Self::WriteAtFuture<'a> {
self.0.write_at(offset, bytes)
}

type SyncFuture<'a> = W::SyncFuture<'a>;
fn sync(&mut self) -> Self::SyncFuture<'_> {
self.0.sync()
}

type SetLenFuture<'a> = W::SetLenFuture<'a>;
fn set_len(&mut self, size: u64) -> Self::SetLenFuture<'_> {
self.0.set_len(size)
}
}

/// A slice writer that adds a synchronous progress callback
#[derive(Debug)]
pub struct ProgressSliceWriter2<W, F>(W, F);

impl<W: AsyncSliceWriter, F: Fn(u64, usize) -> io::Result<()> + 'static>
ProgressSliceWriter2<W, F>
{
/// Create a new `ProgressSliceWriter` from an inner writer and a progress callback
pub fn new(inner: W, on_write: F) -> Self {
Self(inner, on_write)
}

/// Return the inner writer
pub fn into_inner(self) -> W {
self.0
}
}

impl<W: AsyncSliceWriter + 'static, F: Fn(u64, usize) -> io::Result<()> + 'static> AsyncSliceWriter
for ProgressSliceWriter2<W, F>
{
type WriteBytesAtFuture<'a> = LocalBoxFuture<'a, io::Result<()>>;
fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> Self::WriteBytesAtFuture<'_> {
// todo: get rid of the boxing
async move {
(self.1)(offset, data.len())?;
self.0.write_bytes_at(offset, data).await
}
.boxed_local()
}

type WriteAtFuture<'a> = W::WriteAtFuture<'a>;
fn write_at<'a>(&'a mut self, offset: u64, bytes: &'a [u8]) -> Self::WriteAtFuture<'a> {
self.0.write_at(offset, bytes)
}

type SyncFuture<'a> = W::SyncFuture<'a>;
fn sync(&mut self) -> Self::SyncFuture<'_> {
self.0.sync()
}

type SetLenFuture<'a> = W::SetLenFuture<'a>;
fn set_len(&mut self, size: u64) -> Self::SetLenFuture<'_> {
self.0.set_len(size)
}
}

/// A writer that tries to send the total number of bytes written after each write
///
/// It sends the total number instead of just an increment so the update is self-contained
Expand Down

0 comments on commit 7edd7ab

Please sign in to comment.