Skip to content

Commit

Permalink
deserialize messages
Browse files Browse the repository at this point in the history
  • Loading branch information
DrewRidley committed Jun 16, 2024
1 parent 81ca091 commit 55d3826
Show file tree
Hide file tree
Showing 9 changed files with 216 additions and 0 deletions.
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bevy_interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod prelude {
pub use crate::description::{CloneableDescription, Description};
pub use crate::endpoint::{BevyConnection, BevyEndpoint, ConnectError, Connections};
pub use crate::stream_headers::{
headers::{HeaderId, HeaderPlugin},
EndpointStreamHeaders, HeaderStreamEvent, HeaderStreamEventType, HeaderStreamId,
StreamHeaderPlugin,
};
Expand Down
49 changes: 49 additions & 0 deletions crates/bevy_interface/src/stream_headers/headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use bevy::prelude::*;
use std::marker::PhantomData;

use super::Header;

/// add this plugin to assign a [HeaderId] for `T`
pub struct HeaderPlugin<T>(PhantomData<T>);

impl<T> Default for HeaderPlugin<T> {
fn default() -> Self {
HeaderPlugin(PhantomData)
}
}

impl<T: Send + Sync + 'static> Plugin for HeaderPlugin<T> {
fn build(&self, app: &mut App) {
let header = if let Some(mut next_header_id) = app.world.get_resource_mut::<NextHeaderId>()
{
let header = next_header_id.0;
next_header_id.0 += 1;
header
} else {
let header = 0;
app.world.insert_resource(NextHeaderId(header + 1));
header
};

app.insert_resource(HeaderId::<T> {
_p: PhantomData,
header,
});
}
}

#[derive(Resource)]
struct NextHeaderId(Header);

/// use this resource to get the header id for `T`
#[derive(Resource)]
pub struct HeaderId<T> {
_p: PhantomData<T>,
header: Header,
}

impl<T> HeaderId<T> {
pub fn get(&self) -> Header {
self.header
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use bevy::{ecs::schedule::ScheduleLabel, prelude::*, utils::intern::Interned};
use transport_interface::StreamEventType;

pub mod headers;

use crate::{
connections::{BevyConnectionMut, StreamError},
prelude::{BevyStreamEvent, BevyStreamId, Connections, Description},
Expand Down
1 change: 1 addition & 0 deletions crates/bevy_interface/src\stream_headers/header.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Empty file.
10 changes: 10 additions & 0 deletions crates/nevy_messaging/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "nevy_messaging"
version = "0.1.0"
edition = "2021"

[dependencies]
bevy_interface.path = "../bevy_interface"
bevy.workspace = true
serde = { version = "1.0", features = ["derive"] }
bincode = "1.3"
5 changes: 5 additions & 0 deletions crates/nevy_messaging/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use std::collections::VecDeque;

use bevy::prelude::Resource;

pub mod message_id;
129 changes: 129 additions & 0 deletions crates/nevy_messaging/src/message_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use std::{collections::VecDeque, marker::PhantomData};

use bevy::{ecs::schedule::ScheduleLabel, prelude::*, utils::intern::Interned};
use serde::de::DeserializeOwned;

pub struct MessageDeserializationPlugin {
schedule: Interned<dyn ScheduleLabel>,
messages: Vec<Box<dyn MessageIdBuilder>>,
}

trait MessageIdBuilder: Send + Sync + 'static {
fn build(&self, schedule: Interned<dyn ScheduleLabel>, message_id: u16, app: &mut App);
}

struct MessageIdBuilderType<T>(PhantomData<T>);

impl MessageDeserializationPlugin {
pub fn new(schedule: impl ScheduleLabel) -> Self {
MessageDeserializationPlugin {
schedule: schedule.intern(),
messages: Vec::new(),
}
}
}

impl Default for MessageDeserializationPlugin {
fn default() -> Self {
MessageDeserializationPlugin::new(PreUpdate)
}
}

impl MessageDeserializationPlugin {
pub fn add_message<T: DeserializeOwned + Send + Sync + 'static>(&mut self) -> &mut Self {
self.messages
.push(Box::new(MessageIdBuilderType::<T>(PhantomData)));

self
}
}

impl Plugin for MessageDeserializationPlugin {
fn build(&self, app: &mut App) {
app.init_resource::<ReceivedSerializedMessages>();

for (message_id, builder) in self.messages.iter().enumerate() {
builder.build(self.schedule, message_id as u16, app);
}
}
}

impl<T: DeserializeOwned + Send + Sync + 'static> MessageIdBuilder for MessageIdBuilderType<T> {
fn build(&self, schedule: Interned<dyn ScheduleLabel>, message_id: u16, app: &mut App) {
app.insert_resource(MessageId::<T> {
_p: PhantomData,
message_id,
});

app.insert_resource(ReceivedMessages::<T> {
messages: VecDeque::default(),
});

app.add_systems(schedule, deserialize_messages::<T>);
}
}

#[derive(Resource, Default)]
pub(crate) struct ReceivedSerializedMessages {
buffers: Vec<VecDeque<Box<[u8]>>>,
}

impl ReceivedSerializedMessages {
pub fn push_message(&mut self, message_id: u16, message: Box<[u8]>) {
let buffer = loop {
if let Some(buffer) = self.buffers.get_mut(message_id as usize) {
break buffer;
} else {
self.buffers.push(VecDeque::new());
}
};

buffer.push_back(message);
}

pub fn poll_message_received(&mut self, message_id: u16) -> Option<Box<[u8]>> {
let buffer = self.buffers.get_mut(message_id as usize)?;
buffer.pop_front()
}
}

#[derive(Resource)]
pub(crate) struct MessageId<T> {
_p: PhantomData<T>,
message_id: u16,
}

impl<T> MessageId<T> {
pub fn get(&self) -> u16 {
self.message_id
}
}

#[derive(Resource, Default)]
pub struct ReceivedMessages<T> {
messages: VecDeque<T>,
}

impl<T> ReceivedMessages<T> {
pub fn drain(&mut self) -> impl Iterator<Item = T> + '_ {
self.messages.drain(..)
}
}

fn deserialize_messages<T: DeserializeOwned + Send + Sync + 'static>(
message_id: Res<MessageId<T>>,
mut serialized_messages: ResMut<ReceivedSerializedMessages>,
mut deserialized_messages: ResMut<ReceivedMessages<T>>,
) {
while let Some(bytes) = serialized_messages.poll_message_received(message_id.get()) {
let Ok(deserialized) = bincode::deserialize(bytes.as_ref()) else {
warn!(
"failed to deserialize a \"{}\" message",
std::any::type_name::<T>()
);
continue;
};

deserialized_messages.messages.push_back(deserialized);
}
}

0 comments on commit 55d3826

Please sign in to comment.