# Source code for canopen.sdo.base

import binascii
from typing import Iterable, Union
try:
    from collections.abc import Mapping
except ImportError:
    from collections import Mapping

from canopen import objectdictionary
from canopen.objectdictionary import ObjectDictionary
from canopen import variable


class CrcXmodem:
    """Running 16-bit CRC with a crccheck-style ``CrcXmodem`` interface.

    Only ``process()`` and ``final()`` are provided. The checksum itself is
    computed by :func:`binascii.crc_hqx`, starting from zero.
    """

    def __init__(self):
        # Checksum accumulated so far; nothing has been processed yet.
        self._value = 0

    def process(self, data):
        """Fold *data* into the running checksum.

        :param data:
            Bytes-like chunk to include in the CRC.
        """
        previous = self._value
        self._value = binascii.crc_hqx(data, previous)

    def final(self):
        """Return the checksum of everything processed so far."""
        checksum = self._value
        return checksum


class SdoBase(Mapping):
    """Base class for SDO endpoints, presented as a read-only mapping.

    Keys are those of the wrapped object dictionary. Looking one up wraps the
    matching entry in an ``SdoVariable``, ``SdoArray`` or ``SdoRecord`` bound
    to this node. ``upload()`` and ``download()`` raise
    ``NotImplementedError`` in this class.
    """

    #: The CRC algorithm used for block transfers
    crc_cls = CrcXmodem

    def __init__(self, rx_cobid: int, tx_cobid: int, od: ObjectDictionary):
        """
        :param rx_cobid:
            COB-ID that the server receives on (usually 0x600 + node ID)
        :param tx_cobid:
            COB-ID that the server responds with (usually 0x580 + node ID)
        :param od:
            Object Dictionary to use for communication
        """
        self.od = od
        # No network is attached at construction time.
        self.network = None
        self.rx_cobid = rx_cobid
        self.tx_cobid = tx_cobid

    def __getitem__(
        self, index: Union[str, int]
    ) -> Union["SdoVariable", "SdoArray", "SdoRecord"]:
        """Return the SDO wrapper for the object dictionary entry at *index*.

        :param index:
            Key passed straight to the object dictionary lookup.
        :returns:
            A wrapper matching the entry type, or ``None`` when the entry is
            none of variable, array or record.
        """
        entry = self.od[index]
        if isinstance(entry, objectdictionary.ODVariable):
            return SdoVariable(self, entry)
        if isinstance(entry, objectdictionary.ODArray):
            return SdoArray(self, entry)
        if isinstance(entry, objectdictionary.ODRecord):
            return SdoRecord(self, entry)
        # Unrecognised entry types yield None rather than raising.
        return None

    def __iter__(self) -> Iterable[int]:
        """Iterate over the object dictionary."""
        return iter(self.od)

    def __len__(self) -> int:
        """Return the length of the object dictionary."""
        return len(self.od)

    def __contains__(self, key: Union[int, str]) -> bool:
        """Return whether *key* is in the object dictionary."""
        return key in self.od

    def upload(self, index: int, subindex: int) -> bytes:
        """Not implemented in this class.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError()

    def download(
        self,
        index: int,
        subindex: int,
        data: bytes,
        force_segment: bool = False,
    ) -> None:
        """Not implemented in this class.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError()


class SdoRecord(Mapping):
    """Mapping over a record entry whose members are wrapped as SDO variables."""

    def __init__(self, sdo_node: SdoBase, od: ObjectDictionary):
        """
        :param sdo_node:
            SDO node handed to every variable created from this record.
        :param od:
            Object dictionary entry backing this record.
        """
        self.od = od
        self.sdo_node = sdo_node

    def __getitem__(self, subindex: Union[int, str]) -> "SdoVariable":
        """Return the member at *subindex* wrapped in an ``SdoVariable``."""
        member = self.od[subindex]
        return SdoVariable(self.sdo_node, member)

    def __iter__(self) -> Iterable[int]:
        """Iterate over the backing entry."""
        return iter(self.od)

    def __len__(self) -> int:
        """Return the length of the backing entry."""
        return len(self.od)

    def __contains__(self, subindex: Union[int, str]) -> bool:
        """Return whether *subindex* is in the backing entry."""
        return subindex in self.od
class SdoArray(Mapping):
    """Mapping over an array entry whose elements are wrapped as SDO variables.

    The length is not taken from the backing entry: it is the ``raw`` value
    of the variable at subindex 0.
    """

    def __init__(self, sdo_node: SdoBase, od: ObjectDictionary):
        """
        :param sdo_node:
            SDO node handed to every variable created from this array.
        :param od:
            Object dictionary entry backing this array.
        """
        self.od = od
        self.sdo_node = sdo_node

    def __getitem__(self, subindex: Union[int, str]) -> "SdoVariable":
        """Return the element at *subindex* wrapped in an ``SdoVariable``."""
        element = self.od[subindex]
        return SdoVariable(self.sdo_node, element)

    def __iter__(self) -> Iterable[int]:
        """Iterate over subindexes 1 through ``len(self)`` inclusive."""
        last_subindex = len(self)
        return iter(range(1, last_subindex + 1))

    def __len__(self) -> int:
        """Return the ``raw`` value of the variable at subindex 0."""
        length_variable = self[0]
        return length_variable.raw

    def __contains__(self, subindex: int) -> bool:
        """Return whether *subindex* lies between 0 and ``len(self)`` inclusive."""
        # The length is only evaluated once the lower bound has passed.
        return 0 <= subindex and subindex <= len(self)
class SdoVariable(variable.Variable):
    """Access object dictionary variable values using SDO protocol."""

    def __init__(self, sdo_node: SdoBase, od: ObjectDictionary):
        """
        :param sdo_node:
            SDO node used for uploads and downloads.
        :param od:
            Object dictionary entry passed on to ``variable.Variable``.
        """
        self.sdo_node = sdo_node
        variable.Variable.__init__(self, od)

    def get_data(self) -> bytes:
        """Upload this variable's data through the SDO node."""
        entry = self.od
        return self.sdo_node.upload(entry.index, entry.subindex)

    def set_data(self, data: bytes):
        """Download *data* to this variable through the SDO node.

        The ``force_segment`` argument is set when the entry's data type
        equals ``objectdictionary.DOMAIN``.
        """
        entry = self.od
        is_domain = entry.data_type == objectdictionary.DOMAIN
        self.sdo_node.download(entry.index, entry.subindex, data, is_domain)

    def open(self, mode="rb", encoding="ascii", buffering=1024, size=None,
             block_transfer=False, request_crc_support=True):
        """Open the data stream as a file like object.

        The call is forwarded to ``open()`` on the SDO node together with
        this variable's index and subindex.

        :param str mode:
            ========= ==========================================================
            Character Meaning
            --------- ----------------------------------------------------------
            'r'       open for reading (default)
            'w'       open for writing
            'b'       binary mode (default)
            't'       text mode
            ========= ==========================================================
        :param str encoding:
            The str name of the encoding used to decode or encode the file.
            This will only be used in text mode.
        :param int buffering:
            An optional integer used to set the buffering policy. Pass 0 to
            switch buffering off (only allowed in binary mode), 1 to select
            line buffering (only usable in text mode), and an integer > 1 to
            indicate the size in bytes of a fixed-size chunk buffer.
        :param int size:
            Size of the data that will be transmitted.
        :param bool block_transfer:
            If block transfer should be used.
        :param bool request_crc_support:
            If crc calculation should be requested when using block transfer

        :returns:
            A file like object.
        """
        entry = self.od
        return self.sdo_node.open(
            entry.index,
            entry.subindex,
            mode,
            encoding,
            buffering,
            size,
            block_transfer,
            request_crc_support=request_crc_support,
        )
# For compatibility: the classes stay reachable under their un-prefixed names.
# Each alias must sit on its own line; on the comment line they are never bound.
Record = SdoRecord
Array = SdoArray
Variable = SdoVariable