-
Notifications
You must be signed in to change notification settings - Fork 70
/
Copy pathpool.py
222 lines (186 loc) · 7.89 KB
/
pool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
"""Handling of ledger pool instances."""
import json
from datetime import datetime
from typing import Dict, Mapping, Sequence, Union
from . import bindings
from .error import VdrError, VdrErrorCode
from .ledger import Request, build_custom_request
from .utils import get_genesis_txns_from_did_indy_repo_by_name
class Pool:
    """An opened ledger pool instance.

    Wraps a pool handle from the native bindings and remembers the most
    recently decoded pool status (`last_status`) and the time of the last
    successful `refresh` (`last_refresh`).
    """

    def __init__(self, handle: bindings.PoolHandle):
        """Initialize the pool instance.

        Args:
            handle: The pool handle, such as the one returned by
                `bindings.pool_create`
        """
        self.handle = handle
        # Set by `refresh`; None until the first successful refresh
        self.last_refresh: Union[datetime, None] = None
        # Set by `refresh` and `get_status`; None until one of them succeeds
        self.last_status: Union[dict, None] = None

    def close(self):
        """Close and free the pool instance.

        Calling this on an already-closed pool does nothing.
        """
        # The hasattr guard is presumably there because `__del__` calls this
        # method and may run on an instance where `handle` was never assigned.
        if hasattr(self, "handle") and self.handle:
            bindings.pool_close(self.handle)
            self.handle = None

    async def get_status(self) -> dict:
        """Get the current status of the pool instance.

        The decoded status is also stored on `last_status`.

        Returns:
            A dict decoded from the JSON status reported by the bindings

        Raises:
            VdrError: If the pool has been closed
        """
        if not self.handle:
            raise VdrError(VdrErrorCode.WRAPPER, "pool is closed")
        result = await bindings.pool_get_status(self.handle)
        self.last_status = json.loads(result)
        # Return the decoded dict, as annotated and as `refresh` does, rather
        # than the raw JSON text.
        return self.last_status

    async def get_transactions(self) -> str:
        """Get the current pool transactions of the pool instance.

        Returns:
            The value produced by `bindings.pool_get_transactions`, unmodified

        Raises:
            VdrError: If the pool has been closed
        """
        if not self.handle:
            raise VdrError(VdrErrorCode.WRAPPER, "pool is closed")
        return await bindings.pool_get_transactions(self.handle)

    async def get_verifiers(self) -> dict:
        """Get the current set of active verifiers for the pool instance.

        Returns:
            The decoded JSON response from `bindings.pool_get_verifiers`

        Raises:
            VdrError: If the pool has been closed
        """
        if not self.handle:
            raise VdrError(VdrErrorCode.WRAPPER, "pool is closed")
        return json.loads(await bindings.pool_get_verifiers(self.handle))

    @property
    def last_refresh_seconds(self) -> Union[float, None]:
        """Get the number of seconds since the last verifier pool refresh.

        Returns:
            The elapsed seconds, or None if `refresh` has not completed yet
        """
        if not self.last_refresh:
            return None
        return (datetime.now() - self.last_refresh).total_seconds()

    async def refresh(self) -> dict:
        """Check the verifier pool and load any new pool transactions.

        Updates both `last_status` and `last_refresh`.

        Returns:
            The decoded pool status fetched after the refresh

        Raises:
            VdrError: If the pool has been closed
        """
        # Fail with the same error as the other pool operations instead of
        # handing a cleared handle to the bindings.
        if not self.handle:
            raise VdrError(VdrErrorCode.WRAPPER, "pool is closed")
        await bindings.pool_refresh(self.handle)
        result = await bindings.pool_get_status(self.handle)
        self.last_status = json.loads(result)
        self.last_refresh = datetime.now()
        return self.last_status

    async def submit_action(
        self,
        request: Union[str, bytes, dict, Request],
        node_aliases: Sequence[str] = None,
        timeout: int = None,
    ) -> dict:
        """Submit a pool action to all verifier nodes.

        The following requests are sent as actions:
            GET_VALIDATOR_INFO
            POOL_RESTART

        Args:
            request: May be a prepared `Request` instance, a JSON string or bytes
                instance, or a dict representing a new custom ledger request
            node_aliases: Passed through to `bindings.pool_submit_action`;
                presumably limits the action to the named nodes (confirm
                against the bindings)
            timeout: Passed through to `bindings.pool_submit_action`; the unit
                is not visible here (confirm against the bindings)

        Returns:
            A dict with the node aliases as keys and the node's responses
            as values

        Raises:
            VdrError: If the pool has been closed or the request has no handle
        """
        if not isinstance(request, Request):
            request = build_custom_request(request)
        if not self.handle:
            raise VdrError(VdrErrorCode.WRAPPER, "pool is closed")
        if not request.handle:
            raise VdrError(VdrErrorCode.WRAPPER, "no request handle")
        fut = bindings.pool_submit_action(
            self.handle, request.handle, node_aliases, timeout
        )
        # Cleared before awaiting: the request has been removed by the submit
        # call, so the same `Request` cannot be submitted again.
        request.handle = None
        result = await fut
        return json.loads(result)

    async def submit_request(self, request: Union[str, bytes, dict, Request]) -> dict:
        """Submit a ledger request.

        Args:
            request: May be a prepared `Request` instance, a JSON string or bytes
                instance, or a dict representing a new custom ledger request

        Returns:
            The value stored under the "result" key of the decoded JSON response

        Raises:
            VdrError: If the pool has been closed or the request has no handle
        """
        if not isinstance(request, Request):
            request = build_custom_request(request)
        if not self.handle:
            raise VdrError(VdrErrorCode.WRAPPER, "pool is closed")
        if not request.handle:
            raise VdrError(VdrErrorCode.WRAPPER, "no request handle")
        fut = bindings.pool_submit_request(self.handle, request.handle)
        # Cleared before awaiting: the request has been removed by the submit
        # call, so the same `Request` cannot be submitted again.
        request.handle = None
        result = await fut
        # FIXME improve handling of bad request
        # (a decoded response without a "result" key raises KeyError here)
        return json.loads(result)["result"]

    def __del__(self):
        """Close the pool instance when there are no more references to this object."""
        self.close()

    def __repr__(self) -> str:
        """Format the pool instance as a debug string."""
        if self.handle:
            last_refresh = self.last_refresh_seconds
            last_refresh = int(last_refresh) if last_refresh is not None else None
            # `last_status` may still be None, so fall back to empty values
            mt_size = self.last_status and self.last_status.get("mt_size")
            node_count = len(self.last_status and self.last_status.get("nodes") or ())
            status = (
                f"({self.handle.value}, count={node_count}, "
                f"last_refresh={last_refresh}, mt_size={mt_size})"
            )
        else:
            status = "(closed)"
        return f"{self.__class__.__name__}{status}"
async def open_pool(
    transactions_path: str = None,
    transactions: str = None,
    node_weights: Mapping[str, float] = None,
    no_refresh: bool = False,
    socks_proxy: str = None,
) -> Pool:
    """Open a ledger pool from a set of genesis transactions.

    Exactly one of `transactions` and `transactions_path` has to be given.

    Args:
        transactions_path: The path to a genesis transactions file
        transactions: A JSON string representing the genesis transactions
        node_weights: A dict with node aliases as the keys and priority weights
            as the values. The default weight is 1.0, so higher weights give the
            node a higher probability of being selected. A weight of zero means the
            node will never be selected.
        no_refresh: Skip the verifier pool refresh that is otherwise performed
            right after the pool is created
        socks_proxy: The socks proxy host name and port for ZMQ
            (example: proxy1.intranet.company.com:1080)

    Returns:
        A new `Pool` instance which may be used to submit ledger requests

    Raises:
        VdrError: If both or neither of the transaction sources are provided
    """
    has_path = bool(transactions_path)
    has_inline = bool(transactions)
    # Equal flags mean "both given" or "neither given"; each is rejected
    if has_path == has_inline:
        raise VdrError(
            VdrErrorCode.WRAPPER,
            "Must provide one of transactions or transactions_path",
        )

    if socks_proxy:
        bindings.set_socks_proxy(socks_proxy)

    handle = bindings.pool_create(
        {
            "transactions": transactions,
            "transactions_path": transactions_path,
            "node_weights": node_weights,
        }
    )
    pool = Pool(handle)
    if no_refresh:
        return pool
    await pool.refresh()
    return pool
async def open_pools(
    genesis_map: dict = None,
    ledgers: list = None,
    no_refresh: bool = False,
    socks_proxy: str = None,
) -> Dict[str, Pool]:
    """Open one ledger pool per namespace.

    Exactly one of `genesis_map` and `ledgers` has to be given.

    Args:
        genesis_map: A dict mapping namespaces to ledger genesis files
        ledgers: A list of ledger namespaces. The genesis files are looked up
            through `get_genesis_txns_from_did_indy_repo_by_name` (the did indy
            networks Github repo)
        no_refresh: Disable the initial verifier pool refresh
        socks_proxy: The socks proxy host name and port for ZMQ
            (example: proxy1.intranet.company.com:1080)

    Returns:
        A dict of namespace to `Pool` instance mappings which may be used to
        initialize a DID resolver.

    Raises:
        VdrError: If both or neither of `genesis_map` and `ledgers` are provided
    """
    # Equal flags mean "both given" or "neither given"; each is rejected
    if bool(genesis_map) == bool(ledgers):
        raise VdrError(
            VdrErrorCode.WRAPPER,
            "Must provide one of genesis_map or ledgers",
        )

    # After the check above exactly one source is truthy
    sources = genesis_map or get_genesis_txns_from_did_indy_repo_by_name(ledgers)

    opened = {}
    # Pools are opened one after another, in the mapping's iteration order
    for namespace, genesis_txn_path in sources.items():
        opened[namespace] = await open_pool(
            transactions_path=genesis_txn_path,
            no_refresh=no_refresh,
            socks_proxy=socks_proxy,
        )
    return opened