forked from paritytech/jsonrpc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpubsub-macros.rs
105 lines (87 loc) · 2.69 KB
/
pubsub-macros.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
extern crate jsonrpc_core;
extern crate jsonrpc_pubsub;
#[macro_use]
extern crate jsonrpc_macros;
extern crate jsonrpc_tcp_server;
use std::thread;
use std::sync::{atomic, Arc, RwLock};
use std::collections::HashMap;
use jsonrpc_core::{Error, ErrorCode, Result};
use jsonrpc_core::futures::Future;
use jsonrpc_pubsub::{Session, PubSubHandler, SubscriptionId};
use jsonrpc_macros::pubsub;
build_rpc_trait! {
pub trait Rpc {
type Metadata;
/// Adds two numbers and returns a result
#[rpc(name = "add")]
fn add(&self, u64, u64) -> Result<u64>;
#[pubsub(name = "hello")] {
/// Hello subscription
#[rpc(name = "hello_subscribe", alias = ["hello_sub", ])]
fn subscribe(&self, Self::Metadata, pubsub::Subscriber<String>, u64);
/// Unsubscribe from hello subscription.
#[rpc(name = "hello_unsubscribe")]
fn unsubscribe(&self, Option<Self::Metadata>, SubscriptionId) -> Result<bool>;
}
}
}
#[derive(Default)]
struct RpcImpl {
uid: atomic::AtomicUsize,
active: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<String>>>>,
}
impl Rpc for RpcImpl {
type Metadata = Arc<Session>;
fn add(&self, a: u64, b: u64) -> Result<u64> {
Ok(a + b)
}
fn subscribe(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber<String>, param: u64) {
if param != 10 {
subscriber.reject(Error {
code: ErrorCode::InvalidParams,
message: "Rejecting subscription - invalid parameters provided.".into(),
data: None,
}).unwrap();
return;
}
let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
let sub_id = SubscriptionId::Number(id as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
self.active.write().unwrap().insert(sub_id, sink);
}
fn unsubscribe(&self, _meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool> {
let removed = self.active.write().unwrap().remove(&id);
if removed.is_some() {
Ok(true)
} else {
Err(Error {
code: ErrorCode::InvalidParams,
message: "Invalid subscription.".into(),
data: None,
})
}
}
}
fn main() {
let mut io = PubSubHandler::default();
let rpc = RpcImpl::default();
let active_subscriptions = rpc.active.clone();
thread::spawn(move || {
loop {
{
let subscribers = active_subscriptions.read().unwrap();
for sink in subscribers.values() {
let _ = sink.notify(Ok("Hello World!".into())).wait();
}
}
thread::sleep(::std::time::Duration::from_secs(1));
}
});
io.extend_with(rpc.to_delegate());
let server = jsonrpc_tcp_server::ServerBuilder
::with_meta_extractor(io, |context: &jsonrpc_tcp_server::RequestContext| Arc::new(Session::new(context.sender.clone())))
.start(&"0.0.0.0:3030".parse().unwrap())
.expect("Server must start with no issues");
server.wait()
}