From 2c78916fccef7cc72e38ad5c6308400222e0e2aa Mon Sep 17 00:00:00 2001 From: Alexis Metge Date: Wed, 11 Jan 2023 13:49:22 +0100 Subject: [PATCH] Interpret Null UserIdentityToken as Anonymous during ActivateSession OPC-UA specification Part 4, 5.6.3 specifies that a Null or empty user token shall always be interpreted as anonymous. Add a test for this case and a fix to properly handle it. --- asyncua/server/internal_session.py | 4 ++++ tests/test_server.py | 16 ++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/asyncua/server/internal_session.py b/asyncua/server/internal_session.py index 89940a448..20fb6ec3a 100644 --- a/asyncua/server/internal_session.py +++ b/asyncua/server/internal_session.py @@ -89,6 +89,10 @@ def activate_session(self, params, peer_certificate): for _ in params.ClientSoftwareCertificates: result.Results.append(ua.StatusCode()) id_token = params.UserIdentityToken + if isinstance(id_token, ua.ExtensionObject) and id_token.TypeId == ua.NodeId(ua.ObjectIds.Null): + # https://reference.opcfoundation.org/Core/Part4/v104/docs/5.6.3 + # Null or empty user token shall always be interpreted as anonymous. + id_token = ua.AnonymousIdentityToken() # Check if security policy is supported if not isinstance(id_token, self.iserver.supported_tokens): self.logger.error('Rejected active session UserIdentityToken not supported') diff --git a/tests/test_server.py b/tests/test_server.py index 085b051dc..877054120 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -769,3 +769,19 @@ def test_port_in_use(self): server1.stop() server2.stop() """ + +async def test_null_auth(server): + """ + OPC-UA Specification Part 4, 5.6.3 specifies that a: + > Null or empty user token shall always be interpreted as anonymous + + Ensure a Null token is accepted as an anonymous connection token. + """ + client = Client(server.endpoint.geturl()) + # Modify the authentication creation in the client request + def _add_null_auth(self, params): + params.UserIdentityToken = ua.ExtensionObject(ua.NodeId(ua.ObjectIds.Null)) + client._add_anonymous_auth = _add_null_auth.__get__(client, Client) + # Attempt to connect, this should be accepted without error + async with client: + pass