Commit c09db518 authored by mikolajr's avatar mikolajr
Browse files

In OPC-UA comm protocol: improved error handling and corresponding tests,...

In OPC-UA comm protocol: improved error handling and corresponding tests, speed-up tests, format (black)
parent 9f81d0c0
......@@ -16,6 +16,7 @@ from .modbus_tcp import ( # noqa: F401
from .opc import ( # noqa: F401
OpcUaCommunication,
OpcUaCommunicationConfig,
OpcUaCommunicationIOError,
OpcUaSubHandler,
)
from .serial import ( # noqa: F401
......
......@@ -7,6 +7,7 @@ This protocol is used to interface with the "Supercube" PLC from Siemens.
import logging
from functools import wraps
from socket import gaierror
from typing import Iterable, Union, Optional
from opcua import Client, Node, Subscription
......@@ -67,7 +68,7 @@ def _wrap_ua_error(method):
Wrap any `UaError` raised from a `OpcUaCommunication` method into
`OpcUaCommunicationIOError`; additionally, log source error.
:param method: `OpcUaCommunication` method to wrap
:param method: `OpcUaCommunication` instance method to wrap
:return: Whatever `method` returns
"""
......@@ -78,6 +79,35 @@ def _wrap_ua_error(method):
except UaError as e:
self.logger.error(f"UA error: {str(e)}")
raise OpcUaCommunicationIOError from e
except gaierror as e:
self.logger.error(f"Socket address error: {str(e)}")
raise OpcUaCommunicationIOError from e
return wrapper
def _require_ua_opened(method):
"""
Check if `opcua.client.ua_client.UaClient` socket is opened and raise an
`OpcUaCommunicationIOError` if not.
NOTE: this checks should be implemented downstream in
`opcua.client.ua_client.UaClient`; currently you get `AttributeError: 'NoneType'
object has no attribute ...`.
:param method: `OpcUaCommunication` instance method to wrap
:return: Whatever `method` returns
"""
@wraps(method)
def wrapper(self, *args, **kwargs):
# BLAH: this checks should be implemented downstream in
# `opcua.client.ua_client.UaClient`
if self._client.uaclient._uasocket is None:
err_msg = f"Client's socket is not set in {str(self)}. Was it opened?"
self.logger.error(err_msg)
raise OpcUaCommunicationIOError(err_msg)
return method(self, *args, **kwargs)
return wrapper
......@@ -172,6 +202,7 @@ class OpcUaCommunication(CommunicationProtocol):
self._objects_node = None
self._client.disconnect()
@_require_ua_opened
@_wrap_ua_error
def read(self, node_id, ns_index):
"""
......@@ -189,6 +220,7 @@ class OpcUaCommunication(CommunicationProtocol):
NodeId(identifier=node_id, namespaceidx=ns_index)
).get_value()
@_require_ua_opened
@_wrap_ua_error
def write(self, node_id, ns_index, value) -> None:
"""
......@@ -206,6 +238,7 @@ class OpcUaCommunication(CommunicationProtocol):
NodeId(identifier=node_id, namespaceidx=ns_index)
).set_value(DataValue(value))
@_require_ua_opened
@_wrap_ua_error
def init_monitored_nodes(
self, node_id: Union[str, Iterable[str]], ns_index: int
......@@ -220,7 +253,7 @@ class OpcUaCommunication(CommunicationProtocol):
"""
if not self._subscription:
err_msg = f"Looks like {str(self)} was not correctly opened."
err_msg = f"Missing subscription in {str(self)}. Was it opened?"
self.logger.error(err_msg)
raise OpcUaCommunicationIOError(err_msg)
......
# Copyright (c) 2019 ETH Zurich, SIS ID and HVL D-ITET
#
"""
Tests for the OPC UA communication protocol.
"""
......@@ -6,24 +8,24 @@ from time import sleep
import pytest
from hvl_ccb.comm import OpcUaCommunication, OpcUaSubHandler
from hvl_ccb.comm import OpcUaCommunication, OpcUaCommunicationIOError, OpcUaSubHandler
from tests.opctools import DemoServer
@pytest.fixture(scope='module')
@pytest.fixture(scope="module")
def testconfig():
return {
'host': 'localhost',
'port': 14125,
'endpoint_name': '',
'sub_handler': MySubHandler(),
'update_period': 100
"host": "localhost",
"port": 14125,
"endpoint_name": "",
"sub_handler": MySubHandler(),
"update_period": 10,
}
@pytest.fixture(scope='module')
@pytest.fixture(scope="module")
def demo_opcua_server():
opcua_server = DemoServer(100, 'x', 14125)
opcua_server = DemoServer(100, "x", 14125)
opcua_server.start()
yield opcua_server
......@@ -31,7 +33,7 @@ def demo_opcua_server():
opcua_server.stop()
@pytest.fixture(scope='module')
@pytest.fixture(scope="module")
def connected_comm_protocol(testconfig, demo_opcua_server):
opc_comm = OpcUaCommunication(testconfig)
opc_comm.open()
......@@ -40,7 +42,6 @@ def connected_comm_protocol(testconfig, demo_opcua_server):
class MySubHandler(OpcUaSubHandler):
def __init__(self):
self.change_counter = 0
......@@ -50,46 +51,86 @@ class MySubHandler(OpcUaSubHandler):
self.change_counter += 1
def test_opcua(testconfig, demo_opcua_server):
def test_opcua_open_close(testconfig, demo_opcua_server):
# comm I/O errors on open
config_dict = dict(testconfig)
for config_key, wrong_value in (
("host", "Not a host"),
("port", 0),
):
config_dict[config_key] = wrong_value
opc_comm = OpcUaCommunication(config_dict)
assert opc_comm is not None
assert not opc_comm.is_open
with pytest.raises(OpcUaCommunicationIOError):
opc_comm.open()
assert not opc_comm.is_open
# successful open and close
opc_comm = OpcUaCommunication(testconfig)
assert opc_comm is not None
assert not opc_comm.is_open
opc_comm.open()
assert opc_comm.is_open
opc_comm.close()
assert not opc_comm.is_open
try:
assert not opc_comm.is_open
opc_comm.open()
assert opc_comm.is_open
finally:
opc_comm.close()
assert not opc_comm.is_open
def test_read(connected_comm_protocol, demo_opcua_server):
demo_opcua_server.add_var('testvar_read', 1.23, True)
assert connected_comm_protocol.read('testvar_read', 100) == 1.23
demo_opcua_server.add_var("testvar_read", 1.23, True)
assert connected_comm_protocol.read("testvar_read", 100) == 1.23
def test_write_read(testconfig, demo_opcua_server):
demo_opcua_server.add_var("testvar_write", 1.23, True)
comm_protocol = OpcUaCommunication(testconfig)
with pytest.raises(OpcUaCommunicationIOError):
comm_protocol.write("testvar_write", 100, 2.04)
with pytest.raises(OpcUaCommunicationIOError):
comm_protocol.read("testvar_write", 100)
comm_protocol.open()
try:
comm_protocol.write("testvar_write", 100, 2.04)
assert comm_protocol.read("testvar_write", 100) == 2.04
finally:
comm_protocol.close()
def test_write(connected_comm_protocol, demo_opcua_server):
demo_opcua_server.add_var('testvar_write', 1.23, True)
connected_comm_protocol.write('testvar_write', 100, 2.04)
assert connected_comm_protocol.read('testvar_write', 100) == 2.04
def test_init_monitored_nodes(testconfig, demo_opcua_server):
demo_opcua_server.add_var("mon1", 0, False)
demo_opcua_server.add_var("mon2", 0, False)
demo_opcua_server.add_var("mon3", 0, False)
comm_protocol = OpcUaCommunication(testconfig)
def test_init_monitored_nodes(connected_comm_protocol, demo_opcua_server):
demo_opcua_server.add_var('mon1', 0, False)
demo_opcua_server.add_var('mon2', 0, False)
demo_opcua_server.add_var('mon3', 0, False)
with pytest.raises(OpcUaCommunicationIOError):
comm_protocol.init_monitored_nodes("mon1", 100)
with pytest.raises(OpcUaCommunicationIOError):
comm_protocol.init_monitored_nodes(["mon2", "mon3"], 100)
connected_comm_protocol.init_monitored_nodes('mon1', 100)
connected_comm_protocol.init_monitored_nodes(['mon2', 'mon3'], 100)
sleep(0.15)
comm_protocol.open()
try:
comm_protocol.init_monitored_nodes("mon1", 100)
comm_protocol.init_monitored_nodes(["mon2", "mon3"], 100)
finally:
comm_protocol.close()
def test_datachange(connected_comm_protocol, demo_opcua_server):
demo_opcua_server.add_var('test_datachange', 0.1, True)
connected_comm_protocol.init_monitored_nodes('test_datachange', 100)
sleep(0.5)
demo_opcua_server.add_var("test_datachange", 0.1, True)
connected_comm_protocol.init_monitored_nodes("test_datachange", 100)
sleep(0.05)
counter_before = connected_comm_protocol._sub_handler.change_counter
demo_opcua_server.set_var('test_datachange', 0.2)
assert demo_opcua_server.get_var('test_datachange') == 0.2
sleep(0.15)
demo_opcua_server.set_var("test_datachange", 0.2)
assert demo_opcua_server.get_var("test_datachange") == 0.2
sleep(0.05)
counter_after = connected_comm_protocol._sub_handler.change_counter
assert counter_after == counter_before + 1
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment