Skip to content

Commit

Permalink
add feat: subscription.py optional param for sampling_interval (#1087)
Browse files Browse the repository at this point in the history
* add feat: subscription.py optional param for sampling_interval

* fix: flake8 stuff

* fix: flake8 stuff

* add: missing typehints and small refactoring according to flake8

* Update subscription.py

* remove: unnecessary import

* update: comments

* update: fix wrong typehint
  • Loading branch information
AndreasHeine authored Oct 18, 2022
1 parent f49068a commit ff07391
Showing 1 changed file with 107 additions and 75 deletions.
182 changes: 107 additions & 75 deletions asyncua/common/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,6 @@
from .node import Node


class SubHandler:
"""
Subscription Handler. To receive events from server for a subscription
This class is just a sample class. Whatever class having these methods can be used
"""

def datachange_notification(self, node, val, data):
"""
called for every datachange notification from server
"""
pass

def event_notification(self, event):
"""
called for every event notification from server
"""
pass

def status_change_notification(self, status):
"""
called for every status change notification from server
"""
pass


class SubscriptionItemData:
"""
To store useful data from a monitored item.
Expand All @@ -55,7 +30,11 @@ class DataChangeNotif:
To be send to clients for every datachange notification from server.
"""

def __init__(self, subscription_data, monitored_item):
def __init__(
self,
subscription_data: SubscriptionItemData,
monitored_item: ua.MonitoredItemNotification
):
self.monitored_item = monitored_item
self.subscription_data = subscription_data

Expand All @@ -65,6 +44,31 @@ def __str__(self):
__repr__ = __str__


class SubHandler:
"""
Subscription Handler. To receive events from server for a subscription
This class is just a sample class. Whatever class having these methods can be used
"""

def datachange_notification(self, node: Node, val, data: DataChangeNotif):
"""
called for every datachange notification from server
"""
pass

def event_notification(self, event: ua.EventNotificationList):
"""
called for every event notification from server
"""
pass

def status_change_notification(self, status: ua.StatusChangeNotification):
"""
called for every status change notification from server
"""
pass


class Subscription:
"""
Subscription object returned by Server or Client objects.
Expand All @@ -85,14 +89,16 @@ def __init__(self, server, params: ua.CreateSubscriptionParameters, handler: Sub

async def init(self) -> ua.CreateSubscriptionResult:
response = await self.server.create_subscription(
self.parameters, callback=self.publish_callback
self.parameters,
callback=self.publish_callback
)
self.subscription_id = response.SubscriptionId # move to data class
self.logger.info("Subscription created %s", self.subscription_id)
return response

async def update(
self, params: ua.ModifySubscriptionParameters
self,
params: ua.ModifySubscriptionParameters
) -> ua.ModifySubscriptionResponse:
response = await self.server.update_subscription(params)
self.logger.info('Subscription updated %s', params.SubscriptionId)
Expand Down Expand Up @@ -141,7 +147,7 @@ async def _call_datachange(self, datachange: ua.DataChangeNotification):
tasks = [
self._handler.datachange_notification(*args) for args in known_handles_args
]
if asyncio.iscoroutinefunction(self._handler.datachange_notification):
if asyncio.iscoroutinefunction(self._handler.datachange_notification):
await asyncio.gather(*tasks)
except Exception as ex:
self.logger.exception("Exception calling data change handler. Error: %s", ex)
Expand Down Expand Up @@ -171,12 +177,14 @@ async def _call_status(self, status: ua.StatusChangeNotification):
except Exception:
self.logger.exception("Exception calling status change handler")

async def subscribe_data_change(self,
nodes: Union[Node, Iterable[Node]],
attr=ua.AttributeIds.Value,
queuesize=0,
monitoring=ua.MonitoringMode.Reporting,
) -> Union[int, List[Union[int, ua.StatusCode]]]:
async def subscribe_data_change(
self,
nodes: Union[Node, Iterable[Node]],
attr: ua.AttributeIds = ua.AttributeIds.Value,
queuesize: int = 0,
monitoring=ua.MonitoringMode.Reporting,
sampling_interval: ua.Duration = 0.0
) -> Union[int, List[Union[int, ua.StatusCode]]]:
"""
Subscribe to data change events of one or multiple nodes.
The default attribute used for the subscription is `Value`.
Expand All @@ -191,34 +199,37 @@ async def subscribe_data_change(self,
:param nodes: One Node or an Iterable of Nodes
:param attr: The Node attribute you want to subscribe to
:param queuesize: 0 or 1 for default queue size (shall be 1 - no queuing), n for FIFO queue
:param sampling_interval: ua.Duration
:return: Handle for changing/cancelling of the subscription
"""
return await self._subscribe(
nodes, attr, queuesize=queuesize, monitoring=monitoring
nodes, attr, queuesize=queuesize, monitoring=monitoring, sampling_interval=sampling_interval
)

async def _create_eventfilter(self, evtypes):
async def _create_eventfilter(self, evtypes: Union[ua.ObjectIds, List[ua.ObjectIds], ua.NodeId, List[ua.NodeId]]):
if not type(evtypes) in (list, tuple):
evtypes = [evtypes]
evtypes = [Node(self.server, evtype) for evtype in evtypes]
evfilter = await get_filter_from_event_type(evtypes)
return evfilter

async def subscribe_events(self,
sourcenode: Node = ua.ObjectIds.Server,
evtypes=ua.ObjectIds.BaseEventType,
evfilter=None,
queuesize=0) -> int:
async def subscribe_events(
self,
sourcenode: Node = ua.ObjectIds.Server,
evtypes: Union[ua.ObjectIds, List[ua.ObjectIds], ua.NodeId, List[ua.NodeId]] = ua.ObjectIds.BaseEventType,
evfilter: ua.EventFilter = None,
queuesize: int = 0
) -> int:
"""
Subscribe to events from a node. Default node is Server node.
In most servers the server node is the only one you can subscribe to.
If evtypes is not provided, evtype defaults to BaseEventType.
If evtypes is a list or tuple of custom event types, the events will be filtered to the supplied types.
A handle (integer value) is returned which can be used to modify/cancel the subscription.
:param sourcenode:
:param evtypes:
:param evfilter:
:param sourcenode: Node
:param evtypes: ua.ObjectIds or ua.NodeId
:param evfilter: ua.EventFilter which provides the SelectClauses and WhereClause
:param queuesize: 0 for default queue size, 1 for minimum queue size, n for FIFO queue,
MaxUInt32 for max queue size
:return: Handle for changing/cancelling of the subscription
Expand All @@ -228,21 +239,23 @@ async def subscribe_events(self,
evfilter = await self._create_eventfilter(evtypes)
return await self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter, queuesize=queuesize) # type: ignore

async def subscribe_alarms_and_conditions(self,
sourcenode: Node = ua.ObjectIds.Server,
evtypes=ua.ObjectIds.ConditionType,
evfilter=None,
queuesize=0) -> int:
async def subscribe_alarms_and_conditions(
self,
sourcenode: Node = ua.ObjectIds.Server,
evtypes: Union[ua.ObjectIds, List[ua.ObjectIds], ua.NodeId, List[ua.NodeId]] = ua.ObjectIds.ConditionType,
evfilter: ua.EventFilter = None,
queuesize: int = 0
) -> int:
"""
Subscribe to alarm and condition events from a node. Default node is Server node.
In many servers the server node is the only one you can subscribe to.
If evtypes is not provided, evtype defaults to ConditionType.
If evtypes is a list or tuple of custom event types, the events will be filtered to the supplied types.
A handle (integer value) is returned which can be used to modify/cancel the subscription.
:param sourcenode:
:param evtypes:
:param evfilter:
:param sourcenode: Node
:param evtypes: ua.ObjectIds or ua.NodeId
:param evfilter: ua.EventFilter which provides the SelectClauses and WhereClause
:param queuesize: 0 for default queue size, 1 for minimum queue size, n for FIFO queue,
MaxUInt32 for max queue size
:return: Handle for changing/cancelling of the subscription
Expand All @@ -259,20 +272,23 @@ async def subscribe_alarms_and_conditions(self,
evfilter.SelectClauses.append(conditionIdOperand)
return await self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter, queuesize=queuesize) # type: ignore

async def _subscribe(self,
nodes: Union[Node, Iterable[Node]],
attr=ua.AttributeIds.Value,
mfilter=None,
queuesize=0,
monitoring=ua.MonitoringMode.Reporting,
) -> Union[int, List[Union[int, ua.StatusCode]]]:
async def _subscribe(
self,
nodes: Union[Node, Iterable[Node]],
attr = ua.AttributeIds.Value,
mfilter: ua.MonitoringFilter = None,
queuesize: int = 0,
monitoring: ua.MonitoringMode = ua.MonitoringMode.Reporting,
sampling_interval: ua.Duration = 0.0
) -> Union[int, List[Union[int, ua.StatusCode]]]:
"""
Private low level method for subscribing.
:param nodes: One Node or an Iterable of Nodes.
:param attr: ua.AttributeId
:param mfilter: MonitoringFilter
:param attr: ua.AttributeId which shall be subscribed
:param mfilter: ua.MonitoringFilter which shall be applied
:param queuesize: queue size
:param monitoring: ua.MonitoringMode
:param sampling_interval: ua.Duration
:return: Integer handle or if multiple Nodes were given a List of Integer handles/ua.StatusCode
"""
is_list = True
Expand All @@ -285,7 +301,7 @@ async def _subscribe(self,
mirs = []
for node in nodes:
mir = self._make_monitored_item_request(
node, attr, mfilter, queuesize, monitoring
node, attr, mfilter, queuesize, monitoring, sampling_interval
)
mirs.append(mir)
# Await MonitoredItemCreateResult
Expand All @@ -298,20 +314,23 @@ async def _subscribe(self,
mids[0].check()
return mids[0] # type: ignore

def _make_monitored_item_request(self,
node: Node,
attr,
mfilter,
queuesize,
monitoring) -> ua.MonitoredItemCreateRequest:
def _make_monitored_item_request(
self,
node: Node,
attr,
mfilter,
queuesize,
monitoring,
sampling_interval
) -> ua.MonitoredItemCreateRequest:
rv = ua.ReadValueId()
rv.NodeId = node.nodeid
rv.AttributeId = attr
# rv.IndexRange //We leave it null, then the entire array is returned
mparams = ua.MonitoringParameters()
self._client_handle += 1
mparams.ClientHandle = self._client_handle
mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
mparams.SamplingInterval = sampling_interval
mparams.QueueSize = queuesize
mparams.DiscardOldest = True
if mfilter:
Expand All @@ -322,7 +341,7 @@ def _make_monitored_item_request(self,
mir.RequestedParameters = mparams
return mir

async def create_monitored_items(self, monitored_items) -> List[Union[int, ua.StatusCode]]:
async def create_monitored_items(self, monitored_items: List[ua.MonitoredItemCreateRequest]) -> List[Union[int, ua.StatusCode]]:
"""
low level method to have full control over subscription parameters.
Client handle must be unique since it will be used as key for internal registration of data.
Expand Down Expand Up @@ -374,8 +393,8 @@ async def unsubscribe(self, handle: Union[int, List[int]]):
for handle in handles:
if handle in handle_map:
del self._monitored_items[handle_map[handle]]

async def modify_monitored_item(self, handle: int, new_samp_time, new_queuesize=0, mod_filter_val=-1):
async def modify_monitored_item(self, handle: int, new_samp_time: ua.Duration, new_queuesize: int = 0, mod_filter_val: int = -1):
"""
Modify a monitored item.
:param handle: Handle returned when originally subscribing
Expand Down Expand Up @@ -411,15 +430,28 @@ async def modify_monitored_item(self, handle: int, new_samp_time, new_queuesize=
item_to_change.mfilter = results[0].FilterResult
return results

def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter, client_handle):
def _modify_monitored_item_request(
self,
new_queuesize: int,
new_samp_time: ua.Duration,
mod_filter: ua.DataChangeFilter,
client_handle: ua.IntegerId
):
req_params = ua.MonitoringParameters()
req_params.ClientHandle = client_handle
req_params.QueueSize = new_queuesize
req_params.Filter = mod_filter
req_params.SamplingInterval = new_samp_time
return req_params

def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value):
def deadband_monitor(
self,
var: Union[Node, Iterable[Node]],
deadband_val: ua.Double,
deadbandtype: ua.UInt32 = 1,
queuesize: int = 0,
attr: ua.AttributeIds = ua.AttributeIds.Value
):
"""
Method to create a subscription with a Deadband Value.
Default deadband value type is absolute.
Expand Down

0 comments on commit ff07391

Please sign in to comment.