Skip to content

Commit

Permalink
refactor: make AsyncSliceDecoder pub(crate)
Browse files Browse the repository at this point in the history
The public interface is now BaoDecoder. A benefit of this is that we
- hide the exact way we decode bao (bao crate or our own thing)
- hide the s2n_quic::stream::ReceiveStream
  • Loading branch information
rklaehn committed Feb 3, 2023
1 parent 9095718 commit 6bc00b3
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/bao_slice_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ impl<R: Read> Read for SliceDecoder<R> {
}

#[derive(Debug)]
pub struct AsyncSliceDecoder<R: tokio::io::AsyncRead + Unpin> {
pub(crate) struct AsyncSliceDecoder<R: tokio::io::AsyncRead + Unpin> {
inner: SliceValidator<R>,
current_item: Option<StreamItem>,
}
Expand Down
45 changes: 36 additions & 9 deletions src/get.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::fmt::Debug;
use std::io;
use std::net::SocketAddr;
use std::time::{Duration, Instant};

Expand All @@ -9,7 +10,7 @@ use postcard::experimental::max_size::MaxSize;
use s2n_quic::stream::ReceiveStream;
use s2n_quic::Connection;
use s2n_quic::{client::Connect, Client};
use tokio::io::AsyncRead;
use tokio::io::{AsyncRead, ReadBuf};
use tracing::debug;

use crate::bao_slice_decoder::AsyncSliceDecoder;
Expand Down Expand Up @@ -66,6 +67,34 @@ pub struct Stats {
pub mbits: f64,
}

#[repr(transparent)]
#[derive(Debug)]
pub struct BaoDecoder(AsyncSliceDecoder<ReceiveStream>);

impl BaoDecoder {
fn new(inner: ReceiveStream, hash: bao::Hash) -> Self {
BaoDecoder(AsyncSliceDecoder::new(inner, hash, 0, u64::MAX))
}

async fn read_size(&mut self) -> io::Result<u64> {
self.0.read_size().await
}

fn into_inner(self) -> ReceiveStream {
self.0.into_inner()
}
}

impl AsyncRead for BaoDecoder {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut ReadBuf,
) -> std::task::Poll<std::io::Result<()>> {
std::pin::Pin::new(&mut self.0).poll_read(cx, buf)
}
}

pub async fn run<A, B, C, FutA, FutB, FutC>(
hash: bao::Hash,
token: AuthToken,
Expand All @@ -79,8 +108,8 @@ where
FutA: Future<Output = Result<()>>,
B: FnMut(Collection) -> FutB,
FutB: Future<Output = Result<()>>,
C: FnMut(bao::Hash, AsyncSliceDecoder<ReceiveStream>, Option<String>) -> FutC,
FutC: Future<Output = Result<AsyncSliceDecoder<ReceiveStream>>>,
C: FnMut(bao::Hash, BaoDecoder, Option<String>) -> FutC,
FutC: Future<Output = Result<BaoDecoder>>,
{
let now = Instant::now();
let (_client, mut connection) = setup(opts).await?;
Expand Down Expand Up @@ -208,13 +237,11 @@ where
///
/// Returns an `AsyncReader`
/// The `AsyncReader` can be used to read the content.
async fn handle_blob_response<
R: AsyncRead + futures::io::AsyncRead + Send + Sync + Unpin + 'static,
>(
async fn handle_blob_response(
hash: bao::Hash,
mut reader: R,
mut reader: ReceiveStream,
buffer: &mut BytesMut,
) -> Result<AsyncSliceDecoder<R>> {
) -> Result<BaoDecoder> {
match read_lp_data(&mut reader, buffer).await? {
Some(response_buffer) => {
let response: Response = postcard::from_bytes(&response_buffer)?;
Expand All @@ -231,7 +258,7 @@ async fn handle_blob_response<
// next blob in collection will be sent over
Res::Found => {
assert!(buffer.is_empty());
let decoder = AsyncSliceDecoder::new(reader, hash, 0, u64::MAX);
let decoder = BaoDecoder::new(reader, hash);
Ok(decoder)
}
}
Expand Down

0 comments on commit 6bc00b3

Please sign in to comment.