Source code for canopen.node.remote

from __future__ import annotations

import logging
from typing import TextIO, Union

import canopen.network
from canopen.emcy import EmcyConsumer
from canopen.nmt import NmtMaster
from canopen.node.base import BaseNode
from canopen.objectdictionary import ODArray, ODRecord, ODVariable, ObjectDictionary
from canopen.pdo import PDO, RPDO, TPDO
from canopen.sdo import SdoAbortedError, SdoClient, SdoCommunicationError


logger = logging.getLogger(__name__)


[docs] class RemoteNode(BaseNode): """A CANopen remote node. :param node_id: Node ID (set to None or 0 if specified by object dictionary) :param object_dictionary: Object dictionary as either a path to a file, an ``ObjectDictionary`` or a file like object. :param load_od: Enable the Object Dictionary to be sent through SDO's to the remote node at startup. """ def __init__( self, node_id: int, object_dictionary: Union[ObjectDictionary, str, TextIO], load_od: bool = False, ): super(RemoteNode, self).__init__(node_id, object_dictionary) #: Enable WORKAROUND for reversed PDO mapping entries self.curtis_hack = False self.sdo_channels = [] self.sdo = self.add_sdo(0x600 + self.id, 0x580 + self.id) self.tpdo = TPDO(self) self.rpdo = RPDO(self) self.pdo = PDO(self, self.rpdo, self.tpdo) self.nmt = NmtMaster(self.id) self.emcy = EmcyConsumer() if load_od: self.load_configuration() def associate_network(self, network: canopen.network.Network): if self.has_network(): raise RuntimeError("Node is already associated with a network") self.network = network self.sdo.network = network self.pdo.network = network self.tpdo.network = network self.rpdo.network = network self.nmt.network = network for sdo in self.sdo_channels: network.subscribe(sdo.tx_cobid, sdo.on_response) network.subscribe(0x700 + self.id, self.nmt.on_heartbeat) network.subscribe(0x80 + self.id, self.emcy.on_emcy) network.subscribe(0, self.nmt.on_command) def remove_network(self) -> None: if not self.has_network(): return for sdo in self.sdo_channels: self.network.unsubscribe(sdo.tx_cobid, sdo.on_response) self.network.unsubscribe(0x700 + self.id, self.nmt.on_heartbeat) self.network.unsubscribe(0x80 + self.id, self.emcy.on_emcy) self.network.unsubscribe(0, self.nmt.on_command) self.network = canopen.network._UNINITIALIZED_NETWORK self.sdo.network = canopen.network._UNINITIALIZED_NETWORK self.pdo.network = canopen.network._UNINITIALIZED_NETWORK self.tpdo.network = canopen.network._UNINITIALIZED_NETWORK self.rpdo.network = canopen.network._UNINITIALIZED_NETWORK self.nmt.network = canopen.network._UNINITIALIZED_NETWORK
[docs] def add_sdo(self, rx_cobid, tx_cobid): """Add an additional SDO channel. The SDO client will be added to :attr:`sdo_channels`. :param int rx_cobid: COB-ID that the server receives on :param int tx_cobid: COB-ID that the server responds with :return: The SDO client created :rtype: canopen.sdo.SdoClient """ client = SdoClient(rx_cobid, tx_cobid, self.object_dictionary) self.sdo_channels.append(client) if self.has_network(): self.network.subscribe(client.tx_cobid, client.on_response) return client
[docs] def store(self, subindex=1): """Store parameters in non-volatile memory. :param int subindex: 1 = All parameters\n 2 = Communication related parameters\n 3 = Application related parameters\n 4 - 127 = Manufacturer specific """ self.sdo.download(0x1010, subindex, b"save")
[docs] def restore(self, subindex=1): """Restore default parameters. :param int subindex: 1 = All parameters\n 2 = Communication related parameters\n 3 = Application related parameters\n 4 - 127 = Manufacturer specific """ self.sdo.download(0x1011, subindex, b"load")
def __load_configuration_helper(self, index, subindex, name, value): """Helper function to send SDOs to the remote node :param index: Object index :param subindex: Object sub-index (if it does not exist e should be None) :param name: Object name :param value: Value to set in the object """ try: if subindex is not None: logger.info('SDO [0x%04X][0x%02X]: %s: %#06x', index, subindex, name, value) self.sdo[index][subindex].raw = value else: self.sdo[index].raw = value logger.info('SDO [0x%04X]: %s: %#06x', index, name, value) except SdoCommunicationError as e: logger.warning(str(e)) except SdoAbortedError as e: # WORKAROUND for broken implementations: the SDO is set but the error # "Attempt to write a read-only object" is raised any way. if e.code != 0x06010002: # Abort codes other than "Attempt to write a read-only object" # should still be reported. logger.warning('[ERROR SETTING object 0x%04X:%02X] %s', index, subindex, e) raise
[docs] def load_configuration(self) -> None: """Load the configuration of the node from the Object Dictionary. Iterate through all objects in the Object Dictionary and download the values to the remote node via SDO. To avoid PDO mapping conflicts, PDO-related objects are handled through the methods :meth:`canopen.pdo.PdoBase.read` and :meth:`canopen.pdo.PdoBase.save`. """ # First apply PDO configuration from object dictionary self.pdo.read(from_od=True) self.pdo.save() # Now apply all other records in object dictionary for obj in self.object_dictionary.values(): if 0x1400 <= obj.index < 0x1c00: # Ignore PDO related objects continue if isinstance(obj, ODRecord) or isinstance(obj, ODArray): for subobj in obj.values(): if isinstance(subobj, ODVariable) and subobj.writable and (subobj.value is not None): self.__load_configuration_helper(subobj.index, subobj.subindex, subobj.name, subobj.value) elif isinstance(obj, ODVariable) and obj.writable and (obj.value is not None): self.__load_configuration_helper(obj.index, None, obj.name, obj.value)