diff --git a/asyncua/client/client.py b/asyncua/client/client.py index 6909e87d5..a2606be44 100644 --- a/asyncua/client/client.py +++ b/asyncua/client/client.py @@ -454,8 +454,13 @@ async def _monitor_server_loop(self): await asyncio.sleep(timeout) # @FIXME handle state change _ = await self.nodes.server_state.read_value() + except ConnectionError as e: + _logger.info("connection error in watchdog loop %s", e, exc_info=True) + await self.uaclient.inform_subscriptions(ua.StatusCodes.BadShutdown) + raise except Exception: _logger.exception("Error in watchdog loop") + await self.uaclient.inform_subscriptions(ua.StatusCodes.BadShutdown) raise async def _renew_channel_loop(self): @@ -474,6 +479,9 @@ async def _renew_channel_loop(self): await self.open_secure_channel(renew=True) val = await self.nodes.server_state.read_value() _logger.debug("server state is: %s ", val) + except ConnectionError as e: + _logger.info("connection error in watchdog loop %s", e, exc_info=True) + raise except Exception: _logger.exception("Error while renewing session") raise diff --git a/asyncua/client/ua_client.py b/asyncua/client/ua_client.py index 41e15cfb8..d939dae6b 100644 --- a/asyncua/client/ua_client.py +++ b/asyncua/client/ua_client.py @@ -490,6 +490,25 @@ async def create_subscription( self._publish_task = asyncio.create_task(self._publish_loop()) return response.Parameters + async def inform_subscriptions(self, status: ua.StatusCode): + """ + Inform all current subscriptions with a status code. This calls the handlers status_change_notification + """ + status_message = ua.StatusChangeNotification(Status=status) + notification_message = ua.NotificationMessage(NotificationData=[status_message]) + for subid, callback in self._subscription_callbacks.items(): + try: + parameters = ua.PublishResult( + subid, + NotificationMessage_=notification_message + ) + if asyncio.iscoroutinefunction(callback): + await callback(parameters) + else: + callback(parameters) + except Exception: # we call user code, catch everything! + self.logger.exception("Exception while calling user callback: %s") + async def update_subscription( self, params: ua.ModifySubscriptionParameters ) -> ua.ModifySubscriptionResult: diff --git a/tests/test_connections.py b/tests/test_connections.py index 4f7063706..be86eed8d 100644 --- a/tests/test_connections.py +++ b/tests/test_connections.py @@ -50,6 +50,12 @@ async def test_safe_disconnect(): async def test_client_connection_lost(): + class LostSubHandler: + def __init__(self) -> None: + self.status = ua.StatusCodes.Good + + def status_change_notification(self, status): + self.status = status # Test the disconnect behavoir port = find_free_port() srv = Server() @@ -57,11 +63,15 @@ async def test_client_connection_lost(): srv.set_endpoint(f'opc.tcp://127.0.0.1:{port}') await srv.start() async with Client(f'opc.tcp://127.0.0.1:{port}', timeout=0.5, watchdog_intervall=1) as cl: + myhandler = LostSubHandler() + _ = await cl.create_subscription(1, myhandler) await srv.stop() await asyncio.sleep(2) with pytest.raises(ConnectionError): # check if connection is alive await cl.check_connection() + # check if the status_change_notification was triggered + assert myhandler.status == ua.StatusCodes.BadShutdown # check if exception is correct rethrown on second call with pytest.raises(ConnectionError): await cl.check_connection()