Skip to content

Commit

Permalink
sending routing packets works
Browse files Browse the repository at this point in the history
  • Loading branch information
LaurinZ committed Jun 9, 2024
1 parent 0723d51 commit 662c377
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 16 deletions.
16 changes: 1 addition & 15 deletions src/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,24 +94,10 @@ pub async fn handle_console( state: Arc<Mutex<Shared>>) -> Result<(), Box<dyn Er
//spawn asynchronous handler
tokio::spawn(async move {
tracing::info!("connected to: {}",destination_addr);
if let Err(e) = process(proccess_state, stream, destination_addr).await {
if let Err(e) = process(proccess_state, stream, destination_addr,true).await {
tracing::info!("an error occurred; error = {:?}", e);
}
});
{
let lock = state.lock().await;
let peer = match lock.peers.get(&addr) {
Some(peer) => peer,
None => {
tracing::error!("Maybe too early for CR?: {}",addr);
continue;
}
};
//send to channel
if let Err(e) = peer.send(ChannelEvent::Routing(CR)) {
tracing::info!("Error sending the CR. error = {:?}", e);
}
}
} else if line.starts_with("contacts") {
// diplay the routing table
{
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Spawn our handler to be run asynchronously.
tokio::spawn(async move {
tracing::info!("accepted connection to {}",addr);
if let Err(e) = process(state, stream, addr).await {
if let Err(e) = process(state, stream, addr, false).await {
tracing::info!("an error occurred; error = {:?}", e);
}
});
Expand Down
29 changes: 29 additions & 0 deletions src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::net::SocketAddr;
use std::sync::Arc;

use crate::peer::Peer;
use crate::protocol::routing_packet::{RoutingEntry, RoutingPacket};
use crate::protocol::Packet;
use crate::protocol::{MESSAGE, CR, CRR, SCC, SCCR, STU};
use crate::protocol::shared_header::SharedHeader;
Expand All @@ -26,6 +27,7 @@ pub async fn process(
state: Arc<Mutex<Shared>>,
stream: TcpStream,
addr: SocketAddr,
send_cr: bool,
) -> Result<(), Box<dyn Error>> {
let local_addr = match stream.local_addr() {
Ok(local_addr) => local_addr,
Expand All @@ -43,6 +45,18 @@ pub async fn process(
let mut state = state.lock().await;
let msg = tracing::info!("{addr} has joined the chat");
state.broadcast(addr, &ChannelEvent::Join(addr.to_string())).await;
if send_cr {
match state.peers.get(&addr) {
Some(entry) => {
if let Err(e) = entry.send(ChannelEvent::Routing(CR)) {
tracing::info!("Error sending the CR. error = {:?}", e);
}
},
None => {
tracing::error!("Maybe too early for CR?: {}",addr);
}
};
}
}

// Process incoming messages until our stream is exhausted by a disconnect.
Expand Down Expand Up @@ -73,6 +87,20 @@ pub async fn process(
ChannelEvent::Forward(packet) => {
peer.swag_coder.send(packet).await?;
}
ChannelEvent::Routing(type_id) => {
//get current routing table
let rt: Vec<RoutingEntry>;
{
let mut lock = state.lock().await;
rt = lock.get_routing_table(addr).await;
}
let routing_packet = RoutingPacket {
header,
table: rt,
};
tracing::info!("sending a routing packet.");
peer.swag_coder.send(Packet::RoutingPacket(routing_packet,type_id)).await?;
}
_ => tracing::error!("Received Event: {:#?} is not implemented!", event),
}

Expand Down Expand Up @@ -139,6 +167,7 @@ pub async fn process(
}
}
Packet::RoutingPacket(routing_packet, type_id) => {
tracing::info!("received a routing packet.");
//we received a routing packet, check which one and handle it:
match *type_id {
//routing packet type_ids:
Expand Down

0 comments on commit 662c377

Please sign in to comment.