Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RLink Forwarding Issue #2080

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 36 additions & 19 deletions crossbar/worker/rlink.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,23 +74,27 @@ def on_subscription_create(sub_session, sub_details, details=None):
The handler will then also subscribe on the other router, and when receiving
events, re-publish those on this router.

:param sub_id:
:param sub_session:
:param sub_details:
:param details:
:return:
"""
if sub_details["uri"].startswith("wamp."):
return

if sub_details["id"] in self._subs:
# this should not happen actually, but not sure ..
sub_id = sub_details["id"]

if sub_id in self._subs and self._subs[sub_id]["sub"]:
# This will happen if, partway through the subscription process, the RLink disconnects
self.log.error('on_subscription_create: sub ID {sub_id} already in map {method}',
sub_id=sub_details["id"],
sub_id=sub_id,
method=hltype(BridgeSession._setup_event_forwarding))
return

self._subs[sub_details["id"]] = sub_details
self._subs[sub_details["id"]]["sub"] = None
sub_details_local = copy.deepcopy(sub_details)
if sub_id not in self._subs:
sub_details_local["sub"] = None
self._subs[sub_id] = sub_details_local

uri = sub_details['uri']
ERR_MSG = [None]
Expand Down Expand Up @@ -162,17 +166,17 @@ def on_event(*args, **kwargs):
uri))
return

if sub_details["id"] not in self._subs:
if sub_id not in self._subs:
self.log.info("subscription already gone: {uri}", uri=sub_details['uri'])
yield sub.unregister()
yield sub.unsubscribe()
else:
self._subs[sub_details["id"]]["sub"] = sub
self._subs[sub_id]["sub"] = sub

self.log.debug(
"created forwarding subscription: me={me} other={other} sub_id={sub_id} sub_details={sub_details} details={details} sub_session={sub_session}",
me=self._session_id,
other=other,
sub_id=sub_details["id"],
sub_id=sub_id,
sub_details=sub_details,
details=details,
sub_session=sub_session,
Expand Down Expand Up @@ -222,12 +226,21 @@ def forward_current_subs():
def on_remote_join(_session, _details):
yield forward_current_subs()

def on_remote_leave(_session, _details):
# The remote session has ended, clear subscription records.
# Clearing this dictionary helps avoid the case where
# local procedures are not subscribed on the remote leg
# on reestablishment of remote session.
# See: /~https://github.com/crossbario/crossbar/issues/1909
self._subs = {}

if self.IS_REMOTE_LEG:
yield forward_current_subs()
else:
# from the local leg, don't try to forward events on the
# remote leg unless the remote session is established.
other.on('join', on_remote_join)
other.on('leave', on_remote_leave)

# listen to when new subscriptions are created on the local router
yield self.subscribe(on_subscription_create,
Expand Down Expand Up @@ -267,15 +280,19 @@ def on_registration_create(reg_session, reg_details, details=None):
if reg_details['uri'].startswith("wamp."):
return

if reg_details['id'] in self._regs:
# this should not happen actually, but not sure ..
reg_id = reg_details["id"]

if reg_id in self._regs and self._regs[reg_id]["reg"]:
# This will happen if, partway through the registration process, the RLink disconnects
self.log.error('on_registration_create: reg ID {reg_id} already in map {method}',
reg_id=reg_details['id'],
reg_id=reg_id,
method=hltype(BridgeSession._setup_invocation_forwarding))
return

self._regs[reg_details['id']] = reg_details
self._regs[reg_details['id']]['reg'] = None
reg_details_local = copy.deepcopy(reg_details)
if reg_id not in self._regs:
reg_details_local["reg"] = None
self._regs[reg_id] = reg_details_local

uri = reg_details['uri']
ERR_MSG = [None]
Expand Down Expand Up @@ -365,17 +382,17 @@ def on_call(*args, **kwargs):
# on the "other" router, *this* router may have already
# un-registered. If that happened, our registration will
# be gone, so we immediately un-register on the other side
if reg_details['id'] not in self._regs:
if reg_id not in self._regs:
self.log.info("registration already gone: {uri}", uri=reg_details['uri'])
yield reg.unregister()
else:
self._regs[reg_details['id']]['reg'] = reg
self._regs[reg_id]['reg'] = reg

self.log.info(
self.log.debug(
"created forwarding registration: me={me} other={other} reg_id={reg_id} reg_details={reg_details} details={details} reg_session={reg_session}",
me=self._session_id,
other=other._session_id,
reg_id=reg_details['id'],
reg_id=reg_id,
reg_details=reg_details,
details=details,
reg_session=reg_session,
Expand Down
Loading