-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathroutemsg.py
27 lines (25 loc) · 1.15 KB
/
routemsg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import json
import re
from pprint import pprint
def routemsg(r, agave_dest):
    """Kick off downstream Reactors whose routing patterns match a URI.

    Every route in ``r.settings.routings`` maps a route name to a list of
    patterns. The patterns are applied to ``agave_dest`` with ``re.search``
    (they are regular expressions, despite being called globs). On the first
    pattern that matches, the message ``{'uri': agave_dest}`` is dispatched
    to the actor id found under ``r.settings.linked_reactors[routename]['id']``
    and, once that dispatch completes without error, the remaining patterns
    of that route are skipped.

    When ``r.local`` is anything other than ``False`` no message is sent;
    the message is pretty-printed instead.

    Args:
        r: Reactor-like object. This function reads ``r.settings.routings``,
            ``r.settings.linked_reactors``, ``r.local`` and ``r.logger`` and
            calls ``r.send_message(actor_id, message=...)`` and
            ``r.on_failure(text, exc)``.
        agave_dest: String the routing patterns are searched in; it is also
            the ``uri`` value of the outgoing message.

    Returns:
        None. Dispatch errors are not raised; they are handed to
        ``r.on_failure`` together with the causing exception. A route that
        matches but has no linked actor id is reported the same way (with a
        ``ValueError``) instead of being sent to actor ``None``.
    """
    message = {'uri': agave_dest}
    for routename, globs in r.settings.routings.items():
        # A route may be missing from linked_reactors, or present with an
        # empty value; either way the id resolves to None and is reported
        # at dispatch time rather than crashing here.
        linked = r.settings.linked_reactors.get(routename, {}) or {}
        actor_id = linked.get('id')
        for glob in globs:
            if not re.search(glob, agave_dest):
                continue
            r.logger.debug(
                'Route: dest={}, content={}'.format(actor_id, message))
            try:
                if r.local is False:
                    if actor_id is None:
                        # Fail before contacting the service: there is no
                        # actor to deliver this route's message to.
                        raise ValueError(
                            'No linked reactor id configured for route '
                            '{}'.format(routename))
                    resp = r.send_message(actor_id, message=message)
                    if resp is not None and 'executionId' in resp:
                        r.logger.debug(
                            'Route: executionId={}, actorId={}'.format(
                                resp['executionId'], actor_id))
                else:
                    pprint(message)
                # One notification per route is enough, even when several
                # of its patterns would match.
                break
            except Exception as exc:
                # No break here: if on_failure returns, the route's next
                # pattern is still tried.
                r.on_failure('Route: Failed to launch {}:{} for {}'.format(
                    routename, actor_id, agave_dest), exc)