Skip to content

Commit

Permalink
impl derp::Client (#808)
Browse files Browse the repository at this point in the history
implementation of the `derp::Client`. Does not include any mutex locks, which may have to be added back in the future. The "correct" way to implement `mesh_key` is still up in the air.
  • Loading branch information
ramfox authored Mar 9, 2023
1 parent 160b760 commit 9d0f025
Show file tree
Hide file tree
Showing 10 changed files with 924 additions and 193 deletions.
376 changes: 230 additions & 146 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ ed25519-dalek = { version = "=2.0.0-pre.0", features = ["serde", "rand_core"] }
flume = "0.10.14"
futures = "0.3.25"
hex = "0.4.0"
governor = "0.5.1"
hostname = "0.3.1"
indicatif = { version = "0.17", features = ["tokio"], optional = true }
libc = "0.2.139"
Expand Down
84 changes: 49 additions & 35 deletions src/hp/derp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,24 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
/// including its on-wire framing overhead)
const MAX_PACKET_SIZE: usize = 64 * 1024;

const MAX_FRAME_SIZE: usize = 10 * 1024 * 1024;
const MAX_FRAME_SIZE: usize = 1024 * 1024;

/// The DERP magic number, sent in the FRAME_SERVER_KEY frame
/// upon initial connection
///
/// 8 bytes: 0x44 45 52 50 f0 9f 94 91
const MAGIC: &str = "DERP🔑";

const NONCE_LEN: u8 = 24;
const FRAME_HEADER_LEN: u8 = 1 + 4; // FrameType byte + 4 byte length
const KEY_LEN: u8 = 32;
const NONCE_LEN: usize = 24;
const FRAME_HEADER_LEN: usize = 1 + 4; // FrameType byte + 4 byte length
const KEY_LEN: usize = 32;
const MAX_INFO_LEN: usize = 1024 * 1024;
const KEEP_ALIVE: Duration = Duration::from_secs(60);

/// ProtocolVersion is bumped whenever there's a wire-incompatiable change.
/// - version 1 (zero on wire): consistent box headers, in use by employee dev nodes a bit
/// - version 2: received packets have src addrs in FRAME_RECV_PACKET at beginning
const PROTOCOL_VERSION: u8 = 2;
const PROTOCOL_VERSION: usize = 2;

/// The one byte frame type at the beginning of the frame
/// header. The second field is a big-endian u32 describing the
Expand Down Expand Up @@ -134,52 +134,39 @@ const FRAME_HEALTH: FrameType = 0x14;
/// more details on how the client should interpret them.
const FRAME_RESTARTING: FrameType = 0x15;

async fn read_frame_type_header(
reader: impl AsyncRead + Unpin,
want_type: FrameType,
) -> Result<u32> {
let (got_type, frame_len) = read_frame_header(reader).await?;
if want_type != got_type {
bail!("bad frame type {got_type:#04x}, want {want_type:#04x}");
}
Ok(frame_len)
}

async fn read_frame_header(mut reader: impl AsyncRead + Unpin) -> Result<(FrameType, u32)> {
async fn read_frame_header(mut reader: impl AsyncRead + Unpin) -> Result<(FrameType, usize)> {
let frame_type = reader.read_u8().await?;
let frame_len = reader.read_u32().await?;
Ok((frame_type, frame_len))
Ok((frame_type, frame_len.try_into()?))
}

/// AsyncReads a frame header and then reads its payload into `bytes` of
/// `frame_len`.
/// AsyncReads a frame header and then reads a `frame_len` of bytes into `buf`.
/// It resizes the `buf` to the expected `frame_len`.
///
/// If the frame header length is greater than `max_size`, `read_frame` returns
/// an error after reading the frame header.
///
/// If the frame is less than `max_size` but greater than the `bytes.len()`,
/// `bytes.len()` bytes are read, and there is no error. The `frame_type` and `frame_len`
/// are returned as a tuple `(FrameType, u32)`. If the number of bytes read are less than
/// `frame_len`, we DO NOT ERROR.
/// Also errors if we receive EOF before the end of the expected length of the frame.
async fn read_frame(
mut reader: impl AsyncRead + Unpin,
max_size: u32,
mut bytes: BytesMut,
) -> Result<(FrameType, u32)> {
max_size: usize,
mut buf: &mut BytesMut,
) -> Result<(FrameType, usize)> {
let (frame_type, frame_len) = read_frame_header(&mut reader).await?;
if frame_len > max_size {
bail!("frame header size {frame_len} exceeds reader limit of {max_size}");
}

reader.read_exact(&mut bytes).await?;
buf.resize(frame_len, 0u8);
reader.read_exact(&mut buf).await?;
Ok((frame_type, frame_len))
}

async fn write_frame_header(
mut writer: impl AsyncWrite + Unpin,
frame_type: FrameType,
frame_len: u32,
frame_len: usize,
) -> Result<()> {
let frame_len = u32::try_from(frame_len)?;
writer.write_u8(frame_type).await?;
writer.write_u32(frame_len).await?;
Ok(())
Expand All @@ -189,14 +176,41 @@ async fn write_frame_header(
async fn write_frame(
mut writer: impl AsyncWrite + Unpin,
frame_type: FrameType,
bytes: BytesMut,
bytes: Vec<&[u8]>,
) -> Result<()> {
if bytes.len() > MAX_FRAME_SIZE {
let bytes_len: usize = bytes.iter().map(|b| b.len()).sum();
if bytes_len > MAX_FRAME_SIZE {
bail!("unreasonably large frame write");
}
let frame_len = u32::try_from(bytes.len())?;
write_frame_header(&mut writer, frame_type, frame_len).await?;
writer.write_all(&bytes).await?;
write_frame_header(&mut writer, frame_type, bytes_len).await?;
for b in bytes {
writer.write_all(b).await?;
}
writer.flush().await?;
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_basic_read_write() -> Result<()> {
let (mut reader, mut writer) = tokio::io::duplex(1024);

write_frame_header(&mut writer, FRAME_PEER_GONE, 301).await?;
let (frame_type, frame_len) = read_frame_header(&mut reader).await?;
assert_eq!(frame_type, FRAME_PEER_GONE);
assert_eq!(frame_len, 301);

let expect_buf = b"hello world!";
write_frame(&mut writer, FRAME_HEALTH, vec![expect_buf]).await?;
println!("{:?}", reader);
let mut got_buf = BytesMut::new();
let (frame_type, frame_len) = read_frame(&mut reader, 1024, &mut got_buf).await?;
assert_eq!(FRAME_HEALTH, frame_type);
assert_eq!(expect_buf.len(), frame_len);
assert_eq!(expect_buf.as_slice(), &got_buf);
Ok(())
}
}
Loading

0 comments on commit 9d0f025

Please sign in to comment.