Skip to content

Commit

Permalink
feat: Remove Request.id from the protocol (#782)
Browse files Browse the repository at this point in the history
Since we now only issue a single request per stream this field was not
used.  Using it in the provider to identify requests would introduce
complexity and overhead for the bookkeeping of request IDs.  Instead
remove the field from the protocol.

The provider still has a use-case for identifying which request events
relate to.  It now uses the QUIC stream ID for this, but does not
promise so on an API-level.  This also simplifies the events since
this new request_id is always known, while previously it was only
known after having decoded the request message.
  • Loading branch information
flub authored Feb 22, 2023
1 parent 42a6235 commit fd37cab
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ where
// 2. Send Request
{
debug!("sending request");
let req = Request { id: 1, name: hash };
let req = Request { name: hash };

let used = postcard::to_slice(&req, &mut out_buffer)?;
write_lp(&mut writer, used).await?;
Expand Down
2 changes: 0 additions & 2 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,12 @@ impl Handshake {

#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone, MaxSize)]
pub(crate) struct Request {
pub id: u64,
/// blake3 hash
pub name: Hash,
}

#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone)]
pub(crate) struct Response {
pub id: u64,
pub data: Res,
}

Expand Down
55 changes: 24 additions & 31 deletions src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ pub enum Event {
RequestReceived {
/// An unique connection id.
connection_id: u64,
/// The request id.
/// An identifier uniquely identifying this transfer request.
request_id: u64,
/// The hash for which the client wants to receive data.
hash: Hash,
Expand All @@ -237,16 +237,15 @@ pub enum Event {
TransferCompleted {
/// An unique connection id.
connection_id: u64,
/// The request id.
/// An identifier uniquely identifying this transfer request.
request_id: u64,
},
/// A request was aborted because the client disconnected.
TransferAborted {
/// The quic connection id.
connection_id: u64,
/// The request id. When `None`, the transfer was aborted before or during reading and decoding
/// the transfer request.
request_id: Option<u64>,
/// An identifier uniquely identifying this request.
request_id: u64,
},
}

Expand Down Expand Up @@ -414,8 +413,6 @@ async fn transfer_collection(
mut writer: quinn::SendStream,
// Buffer used when writing to writer.
buffer: &mut BytesMut,
// The id of the transfer request.
request_id: u64,
// The bao outboard encoded data.
outboard: &Bytes,
// The actual blob data.
Expand All @@ -441,7 +438,6 @@ async fn transfer_collection(
write_response(
&mut writer,
buffer,
request_id,
Res::FoundCollection {
total_blobs_size: c.total_blobs_size,
},
Expand All @@ -452,11 +448,10 @@ async fn transfer_collection(
writer.write_buf(&mut data).await?;
for (i, blob) in c.blobs.iter().enumerate() {
debug!("writing blob {}/{}", i, c.blobs.len());
let (status, writer1) =
send_blob(db.clone(), blob.hash, writer, buffer, request_id).await?;
let (status, writer1) = send_blob(db.clone(), blob.hash, writer, buffer).await?;
writer = writer1;
if SentStatus::NotFound == status {
write_response(&mut writer, buffer, request_id, Res::NotFound).await?;
write_response(&mut writer, buffer, Res::NotFound).await?;
writer.finish().await?;
return Ok(status);
}
Expand All @@ -466,11 +461,7 @@ async fn transfer_collection(
Ok(SentStatus::Sent)
}

fn notify_transfer_aborted(
events: broadcast::Sender<Event>,
connection_id: u64,
request_id: Option<u64>,
) {
fn notify_transfer_aborted(events: broadcast::Sender<Event>, connection_id: u64, request_id: u64) {
let _ = events.send(Event::TransferAborted {
connection_id,
request_id,
Expand All @@ -487,10 +478,14 @@ async fn handle_stream(
let mut out_buffer = BytesMut::with_capacity(1024);
let mut in_buffer = BytesMut::with_capacity(1024);

// The stream ID index is used to identify this request. Requests only arrive in
// bi-directional RecvStreams initiated by the client, so this uniquely identifies them.
let request_id = reader.id().index();

// 1. Read Handshake
debug!("reading handshake");
if let Err(e) = read_handshake(&mut reader, &mut in_buffer, token).await {
notify_transfer_aborted(events, connection_id, None);
notify_transfer_aborted(events, connection_id, request_id);
return Err(e);
}

Expand All @@ -499,17 +494,17 @@ async fn handle_stream(
let request = match read_request(reader, &mut in_buffer).await {
Ok(r) => r,
Err(e) => {
notify_transfer_aborted(events, connection_id, None);
notify_transfer_aborted(events, connection_id, request_id);
return Err(e);
}
};

let hash = request.name;
debug!("got request({})", request.id);
debug!("got request for ({hash})");
let _ = events.send(Event::RequestReceived {
connection_id,
request_id: request.id,
hash,
request_id,
});

// 4. Attempt to find hash
Expand All @@ -518,27 +513,27 @@ async fn handle_stream(
Some(BlobOrCollection::Collection(d)) => d,
_ => {
debug!("not found {}", hash);
notify_transfer_aborted(events, connection_id, Some(request.id));
write_response(&mut writer, &mut out_buffer, request.id, Res::NotFound).await?;
notify_transfer_aborted(events, connection_id, request_id);
write_response(&mut writer, &mut out_buffer, Res::NotFound).await?;
writer.finish().await?;

return Ok(());
}
};

// 5. Transfer data!
match transfer_collection(&db, writer, &mut out_buffer, request.id, outboard, data).await {
match transfer_collection(&db, writer, &mut out_buffer, outboard, data).await {
Ok(SentStatus::Sent) => {
let _ = events.send(Event::TransferCompleted {
connection_id,
request_id: request.id,
request_id,
});
}
Ok(SentStatus::NotFound) => {
notify_transfer_aborted(events, connection_id, Some(request.id));
notify_transfer_aborted(events, connection_id, request_id);
}
Err(e) => {
notify_transfer_aborted(events, connection_id, Some(request.id));
notify_transfer_aborted(events, connection_id, request_id);
return Err(e);
}
}
Expand All @@ -558,15 +553,14 @@ async fn send_blob<W: AsyncWrite + Unpin + Send + 'static>(
name: Hash,
mut writer: W,
buffer: &mut BytesMut,
id: u64,
) -> Result<(SentStatus, W)> {
match db.get(&name) {
Some(BlobOrCollection::Blob(Data {
outboard,
path,
size,
})) => {
write_response(&mut writer, buffer, id, Res::Found).await?;
write_response(&mut writer, buffer, Res::Found).await?;
let path = path.clone();
let outboard = outboard.clone();
let size = *size;
Expand All @@ -590,7 +584,7 @@ async fn send_blob<W: AsyncWrite + Unpin + Send + 'static>(
Ok((SentStatus::Sent, writer))
}
_ => {
write_response(&mut writer, buffer, id, Res::NotFound).await?;
write_response(&mut writer, buffer, Res::NotFound).await?;
Ok((SentStatus::NotFound, writer))
}
}
Expand Down Expand Up @@ -766,10 +760,9 @@ pub async fn create_collection(data_sources: Vec<DataSource>) -> Result<(Databas
async fn write_response<W: AsyncWrite + Unpin>(
mut writer: W,
buffer: &mut BytesMut,
id: u64,
res: Res,
) -> Result<()> {
let response = Response { id, data: res };
let response = Response { data: res };

// TODO: do not transfer blob data as part of the responses
if buffer.len() < 1024 {
Expand Down

0 comments on commit fd37cab

Please sign in to comment.