-
-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathengineioxide.rs
87 lines (72 loc) · 2.72 KB
/
engineioxide.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
//! This is a end-to-end test server used with this [test suite](/~https://github.com/socketio/engine.io-protocol)
use std::{sync::Arc, time::Duration};
use bytes::Bytes;
use engineioxide::{
config::EngineIoConfig,
handler::EngineIoHandler,
service::EngineIoService,
socket::{DisconnectReason, Socket},
Str,
};
use hyper::server::conn::http1;
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tracing::Level;
use tracing_subscriber::FmtSubscriber;
#[derive(Debug, Clone)]
struct MyHandler;
impl EngineIoHandler for MyHandler {
type Data = ();
fn on_connect(self: Arc<Self>, socket: Arc<Socket<Self::Data>>) {
println!("socket connect {}", socket.id);
}
fn on_disconnect(&self, socket: Arc<Socket<Self::Data>>, reason: DisconnectReason) {
println!("socket disconnect {}: {:?}", socket.id, reason);
}
fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<Self::Data>>) {
println!("Ping pong message {:?}", msg);
socket.emit(msg).ok();
}
fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<Self::Data>>) {
println!("Ping pong binary message {:?}", data);
socket.emit_binary(data).ok();
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let subscriber = FmtSubscriber::builder()
.with_line_number(true)
.with_max_level(Level::DEBUG)
.finish();
tracing::subscriber::set_global_default(subscriber)?;
let config = EngineIoConfig::builder()
.ping_interval(Duration::from_millis(300))
.ping_timeout(Duration::from_millis(200))
.max_payload(1e6 as u64)
.build();
let svc = EngineIoService::with_config(Arc::new(MyHandler), config);
let listener = TcpListener::bind("127.0.0.1:3000").await?;
#[cfg(feature = "v3")]
tracing::info!("Starting server with v3 protocol");
#[cfg(feature = "v4")]
tracing::info!("Starting server with v4 protocol");
// We start a loop to continuously accept incoming connections
loop {
let (stream, _) = listener.accept().await?;
// Use an adapter to access something implementing `tokio::io` traits as if they implement
// `hyper::rt` IO traits.
let io = TokioIo::new(stream);
let svc = svc.clone();
// Spawn a tokio task to serve multiple connections concurrently
tokio::task::spawn(async move {
// Finally, we bind the incoming connection to our `hello` service
if let Err(err) = http1::Builder::new()
.serve_connection(io, svc)
.with_upgrades()
.await
{
println!("Error serving connection: {:?}", err);
}
});
}
}