diff --git a/crossbar/worker/rlink.py b/crossbar/worker/rlink.py index 7c33690b3..f33e7c1d2 100644 --- a/crossbar/worker/rlink.py +++ b/crossbar/worker/rlink.py @@ -74,7 +74,7 @@ def on_subscription_create(sub_session, sub_details, details=None): The handler will then also subscribe on the other router, and when receiving events, re-publish those on this router. - :param sub_id: + :param sub_session: :param sub_details: :param details: :return: @@ -82,15 +82,19 @@ def on_subscription_create(sub_session, sub_details, details=None): if sub_details["uri"].startswith("wamp."): return - if sub_details["id"] in self._subs: - # this should not happen actually, but not sure .. + sub_id = sub_details["id"] + + if sub_id in self._subs and self._subs[sub_id]["sub"]: + # This will happen if, partway through the subscription process, the RLink disconnects self.log.error('on_subscription_create: sub ID {sub_id} already in map {method}', - sub_id=sub_details["id"], + sub_id=sub_id, method=hltype(BridgeSession._setup_event_forwarding)) return - self._subs[sub_details["id"]] = sub_details - self._subs[sub_details["id"]]["sub"] = None + sub_details_local = copy.deepcopy(sub_details) + if sub_id not in self._subs: + sub_details_local["sub"] = None + self._subs[sub_id] = sub_details_local uri = sub_details['uri'] ERR_MSG = [None] @@ -162,17 +166,17 @@ def on_event(*args, **kwargs): uri)) return - if sub_details["id"] not in self._subs: + if sub_id not in self._subs: self.log.info("subscription already gone: {uri}", uri=sub_details['uri']) - yield sub.unregister() + yield sub.unsubscribe() else: - self._subs[sub_details["id"]]["sub"] = sub + self._subs[sub_id]["sub"] = sub self.log.debug( "created forwarding subscription: me={me} other={other} sub_id={sub_id} sub_details={sub_details} details={details} sub_session={sub_session}", me=self._session_id, other=other, - sub_id=sub_details["id"], + sub_id=sub_id, sub_details=sub_details, details=details, sub_session=sub_session, @@ -222,12 +226,21 @@ def forward_current_subs(): def on_remote_join(_session, _details): yield forward_current_subs() + def on_remote_leave(_session, _details): + # The remote session has ended, clear subscription records. + # Clearing this dictionary helps avoid the case where + # local procedures are not subscribed on the remote leg + # on reestablishment of remote session. + # See: /~https://github.com/crossbario/crossbar/issues/1909 + self._subs = {} + if self.IS_REMOTE_LEG: yield forward_current_subs() else: # from the local leg, don't try to forward events on the # remote leg unless the remote session is established. other.on('join', on_remote_join) + other.on('leave', on_remote_leave) # listen to when new subscriptions are created on the local router yield self.subscribe(on_subscription_create, @@ -267,15 +280,19 @@ def on_registration_create(reg_session, reg_details, details=None): if reg_details['uri'].startswith("wamp."): return - if reg_details['id'] in self._regs: - # this should not happen actually, but not sure .. + reg_id = reg_details["id"] + + if reg_id in self._regs and self._regs[reg_id]["reg"]: + # This will happen if, partway through the registration process, the RLink disconnects self.log.error('on_registration_create: reg ID {reg_id} already in map {method}', - reg_id=reg_details['id'], + reg_id=reg_id, method=hltype(BridgeSession._setup_invocation_forwarding)) return - self._regs[reg_details['id']] = reg_details - self._regs[reg_details['id']]['reg'] = None + reg_details_local = copy.deepcopy(reg_details) + if reg_id not in self._regs: + reg_details_local["reg"] = None + self._regs[reg_id] = reg_details_local uri = reg_details['uri'] ERR_MSG = [None] @@ -365,17 +382,17 @@ def on_call(*args, **kwargs): # on the "other" router, *this* router may have already # un-registered. If that happened, our registration will # be gone, so we immediately un-register on the other side - if reg_details['id'] not in self._regs: + if reg_id not in self._regs: self.log.info("registration already gone: {uri}", uri=reg_details['uri']) yield reg.unregister() else: - self._regs[reg_details['id']]['reg'] = reg + self._regs[reg_id]['reg'] = reg - self.log.info( + self.log.debug( "created forwarding registration: me={me} other={other} reg_id={reg_id} reg_details={reg_details} details={details} reg_session={reg_session}", me=self._session_id, other=other._session_id, - reg_id=reg_details['id'], + reg_id=reg_id, reg_details=reg_details, details=details, reg_session=reg_session,