Skip to content

Commit

Permalink
Feat: Request Tokens (#1109)
Browse files Browse the repository at this point in the history
Plumb opaque request authorization tokens into the protocol. Max token size is set to match the max size of an HTTP cookie (4096 bytes). 
Using Request tokens requires defining a custom `AuthorizationHandler`, which is set to reject any token by default

---------

Co-authored-by: Divma <26765164+divagant-martian@users.noreply.github.com>
Co-authored-by: dignifiedquire <me@dignifiedquire.com>
  • Loading branch information
3 people authored Jun 27, 2023
1 parent 488a6a4 commit dbd7bfb
Show file tree
Hide file tree
Showing 8 changed files with 359 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ jobs:
run: cross check --all --target ${{ matrix.target }}

- name: test
run: cross test --all --target ${{ matrix.target }} -- --test-threads=8
run: cross test --all --target ${{ matrix.target }} -- --test-threads=4

check_fmt_and_docs:
name: Checking fmt and docs
Expand Down
86 changes: 84 additions & 2 deletions iroh-bytes/src/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Protocol for communication between provider and client.
use std::fmt::{self, Display};
use std::io;
use std::str::FromStr;

use anyhow::{bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
Expand Down Expand Up @@ -31,13 +33,77 @@ impl Handshake {
}
}

/// Maximum size of a request token, matches a browser cookie max size:
/// https://datatracker.ietf.org/doc/html/rfc2109#section-6.3
const MAX_REQUEST_TOKEN_SIZE: usize = 4096;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, From)]
/// A Request token is an opaque byte sequence associated with a single request.
/// Applications can use request tokens to implement request authorization,
/// user association, etc.
pub struct RequestToken {
bytes: Bytes,
}

impl RequestToken {
pub fn new(bytes: impl Into<Bytes>) -> Result<Self> {
let bytes: Bytes = bytes.into();
ensure!(
bytes.len() < MAX_REQUEST_TOKEN_SIZE,
"request token is too large"
);
Ok(Self { bytes })
}

/// Returns a reference the token bytes.
pub fn as_bytes(&self) -> &Bytes {
&self.bytes
}
}

impl FromStr for RequestToken {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let bytes = data_encoding::BASE32_NOPAD.decode(s.to_ascii_uppercase().as_bytes())?;
RequestToken::new(bytes)
}
}

/// Serializes to base32.
impl Display for RequestToken {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut text = data_encoding::BASE32_NOPAD.encode(&self.bytes);
text.make_ascii_lowercase();
write!(f, "{text}")
}
}

#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone, From)]
/// A request to the provider
pub enum Request {
/// A get request for a blob or collection
Get(GetRequest),
/// A get request that allows the receiver to create a collection
CustomGet(Bytes),
CustomGet(CustomGetRequest),
}

impl Request {
pub fn token(&self) -> Option<&RequestToken> {
match self {
Request::Get(get) => get.token(),
Request::CustomGet(get) => get.token.as_ref(),
}
}
}

#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone)]
/// A get request that allows the receiver to create a collection
/// Custom request handlers will receive this struct destructured into
/// handler arguments
pub struct CustomGetRequest {
pub token: Option<RequestToken>,
pub data: Bytes,
}

/// Currently all requests are get requests. But that won't always be the case.
Expand All @@ -54,18 +120,25 @@ pub struct GetRequest {
///
/// The first element is the parent, all subsequent elements are children.
pub ranges: RangeSpecSeq,
/// Optional Request token
token: Option<RequestToken>,
}

impl GetRequest {
/// Request a blob or collection with specified ranges
pub fn new(hash: Hash, ranges: RangeSpecSeq) -> Self {
Self { hash, ranges }
Self {
hash,
ranges,
token: None,
}
}

/// Request a collection and all its children
pub fn all(hash: Hash) -> Self {
Self {
hash,
token: None,
ranges: RangeSpecSeq::all(),
}
}
Expand All @@ -74,9 +147,18 @@ impl GetRequest {
pub fn single(hash: Hash) -> Self {
Self {
hash,
token: None,
ranges: RangeSpecSeq::new([RangeSet2::all()]),
}
}

pub fn with_token(self, token: Option<RequestToken>) -> Self {
Self { token, ..self }
}

pub fn token(&self) -> Option<&RequestToken> {
self.token.as_ref()
}
}

/// Write the given data to the provider sink, with a unsigned varint length prefix.
Expand Down
78 changes: 72 additions & 6 deletions iroh-bytes/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use tracing_futures::Instrument;
use walkdir::WalkDir;

use crate::blobs::Collection;
use crate::protocol::{read_lp, write_lp, GetRequest, Handshake, RangeSpec, Request, VERSION};
use crate::protocol::{
read_lp, write_lp, CustomGetRequest, GetRequest, Handshake, RangeSpec, Request, RequestToken,
VERSION,
};
use crate::tokio_util::read_as_bytes;
use crate::util::{canonicalize_path, Hash, Progress, RpcError};
use crate::IROH_BLOCK_SIZE;
Expand Down Expand Up @@ -50,6 +53,8 @@ pub enum Event {
connection_id: u64,
/// An identifier uniquely identifying this transfer request.
request_id: u64,
/// Token requester gve for this request, if any
token: Option<RequestToken>,
/// The hash for which the client wants to receive data.
hash: Hash,
},
Expand All @@ -59,6 +64,8 @@ pub enum Event {
connection_id: u64,
/// An identifier uniquely identifying this transfer request.
request_id: u64,
/// Token requester gve for this request, if any
token: Option<RequestToken>,
/// The size of the custom get request.
len: usize,
},
Expand Down Expand Up @@ -139,12 +146,47 @@ pub enum ProvideProgress {
Abort(RpcError),
}

/// hook into the request handling to process authorization by examining
/// the request and any given token. Any error returned will abort the request,
/// and the error will be sent to the requester.
pub trait RequestAuthorizationHandler: Send + Sync + Clone + 'static {
/// Handle the authorization request, given an opaque data blob from the requester.
fn authorize(
&self,
db: Database,
token: Option<RequestToken>,
request: &Request,
) -> BoxFuture<'static, anyhow::Result<()>>;
}

/// Define RequestAuthorizationHandler for () so we can use it as a no-op default.
impl RequestAuthorizationHandler for () {
fn authorize(
&self,
_db: Database,
token: Option<RequestToken>,
_request: &Request,
) -> BoxFuture<'static, anyhow::Result<()>> {
async move {
if let Some(token) = token {
anyhow::bail!(
"no authorization handler defined, but token was provided: {:?}",
token
);
}
Ok(())
}
.boxed()
}
}

/// A custom get request handler that allows the user to make up a get request
/// on the fly.
pub trait CustomGetHandler: Send + Sync + Clone + 'static {
/// Handle the custom request, given an opaque data blob from the requester.
fn handle(
&self,
token: Option<RequestToken>,
request: Bytes,
db: Database,
) -> BoxFuture<'static, anyhow::Result<GetRequest>>;
Expand All @@ -154,6 +196,7 @@ pub trait CustomGetHandler: Send + Sync + Clone + 'static {
impl CustomGetHandler for () {
fn handle(
&self,
_token: Option<RequestToken>,
_request: Bytes,
_db: Database,
) -> BoxFuture<'static, anyhow::Result<GetRequest>> {
Expand Down Expand Up @@ -395,11 +438,16 @@ pub trait EventSender: Clone + Send + 'static {
fn send(&self, event: Event) -> Option<Event>;
}

pub async fn handle_connection<C: CustomGetHandler, E: EventSender>(
pub async fn handle_connection<
C: CustomGetHandler,
E: EventSender,
A: RequestAuthorizationHandler,
>(
connecting: quinn::Connecting,
db: Database,
events: E,
custom_get_handler: C,
authorization_handler: A,
rt: crate::runtime::Handle,
) {
// let _x = NonSend::default();
Expand Down Expand Up @@ -427,9 +475,18 @@ pub async fn handle_connection<C: CustomGetHandler, E: EventSender>(
events.send(Event::ClientConnected { connection_id });
let db = db.clone();
let custom_get_handler = custom_get_handler.clone();
let authorization_handler = authorization_handler.clone();
rt.local_pool().spawn_pinned(|| {
async move {
if let Err(err) = handle_stream(db, reader, writer, custom_get_handler).await {
if let Err(err) = handle_stream(
db,
reader,
writer,
custom_get_handler,
authorization_handler,
)
.await
{
warn!("error: {err:#?}",);
}
}
Expand All @@ -446,6 +503,7 @@ async fn handle_stream<E: EventSender>(
mut reader: quinn::RecvStream,
writer: ResponseWriter<E>,
custom_get_handler: impl CustomGetHandler,
authorization_handler: impl RequestAuthorizationHandler,
) -> Result<()> {
let mut in_buffer = BytesMut::with_capacity(1024);

Expand All @@ -466,6 +524,10 @@ async fn handle_stream<E: EventSender>(
}
};

authorization_handler
.authorize(db.clone(), request.token().cloned(), &request)
.await?;

match request {
Request::Get(request) => handle_get(db, request, writer).await,
Request::CustomGet(request) => {
Expand All @@ -475,17 +537,20 @@ async fn handle_stream<E: EventSender>(
}
async fn handle_custom_get<E: EventSender>(
db: Database,
request: Bytes,
request: CustomGetRequest,
mut writer: ResponseWriter<E>,
custom_get_handler: impl CustomGetHandler,
) -> Result<()> {
let _ = writer.events.send(Event::CustomGetRequestReceived {
len: request.len(),
len: request.data.len(),
connection_id: writer.connection_id(),
request_id: writer.request_id(),
token: request.token.clone(),
});
// try to make a GetRequest from the custom bytes
let request = custom_get_handler.handle(request, db.clone()).await?;
let request = custom_get_handler
.handle(request.token, request.data, db.clone())
.await?;
// write it to the requester as the first thing
let data = postcard::to_stdvec(&request)?;
write_lp(&mut writer.inner, &data).await?;
Expand All @@ -504,6 +569,7 @@ pub async fn handle_get<E: EventSender>(
hash,
connection_id: writer.connection_id(),
request_id: writer.request_id(),
token: request.token().cloned(),
});

// 4. Attempt to find hash
Expand Down
Loading

0 comments on commit dbd7bfb

Please sign in to comment.