"""
Object Dictionary module
"""
import struct
from typing import Dict, Iterable, List, Optional, TextIO, Union
try:
from collections.abc import MutableMapping, Mapping
except ImportError:
from collections import MutableMapping, Mapping
import logging
from canopen.objectdictionary.datatypes import *
from canopen.objectdictionary.datatypes_24bit import Integer24, Unsigned24
logger = logging.getLogger(__name__)
def export_od(od, dest:Union[str,TextIO,None]=None, doc_type:Optional[str]=None):
""" Export :class: ObjectDictionary to a file.
:param od:
:class: ObjectDictionary object to be exported
:param dest:
export destination. filename, or file-like object or None.
if None, the document is returned as string
:param doc_type: type of document to export.
If a filename is given for dest, this default to the file extension.
Otherwise, this defaults to "eds"
:rtype: str or None
"""
doctypes = {"eds", "dcf"}
if isinstance(dest, str):
if doc_type is None:
for t in doctypes:
if dest.endswith(f".{t}"):
doc_type = t
break
if doc_type is None:
doc_type = "eds"
dest = open(dest, 'w')
assert doc_type in doctypes
if doc_type == "eds":
from canopen.objectdictionary import eds
return eds.export_eds(od, dest)
elif doc_type == "dcf":
from canopen.objectdictionary import eds
return eds.export_dcf(od, dest)
# If dest is opened in this fn, it should be closed
if type(dest) is str:
dest.close()
def import_od(
source: Union[str, TextIO, None],
node_id: Optional[int] = None,
) -> "ObjectDictionary":
"""Parse an EDS, DCF, or EPF file.
:param source:
Path to object dictionary file or a file like object or an EPF XML tree.
:return:
An Object Dictionary instance.
"""
if source is None:
return ObjectDictionary()
if hasattr(source, "read"):
# File like object
filename = source.name
elif hasattr(source, "tag"):
# XML tree, probably from an EPF file
filename = "od.epf"
else:
# Path to file
filename = source
suffix = filename[filename.rfind("."):].lower()
if suffix in (".eds", ".dcf"):
from canopen.objectdictionary import eds
return eds.import_eds(source, node_id)
elif suffix == ".epf":
from canopen.objectdictionary import epf
return epf.import_epf(source)
else:
raise NotImplementedError("No support for this format")
[docs]
class ObjectDictionary(MutableMapping):
"""Representation of the object dictionary as a Python dictionary."""
def __init__(self):
self.indices = {}
self.names = {}
self.comments = ""
#: Default bitrate if specified by file
self.bitrate: Optional[int] = None
#: Node ID if specified by file
self.node_id: Optional[int] = None
#: Some information about the device
self.device_information = DeviceInformation()
def __getitem__(
self, index: Union[int, str]
) -> Union["ODArray", "ODRecord", "ODVariable"]:
"""Get object from object dictionary by name or index."""
item = self.names.get(index) or self.indices.get(index)
if item is None:
name = "0x%X" % index if isinstance(index, int) else index
raise KeyError("%s was not found in Object Dictionary" % name)
return item
def __setitem__(
self, index: Union[int, str], obj: Union["ODArray", "ODRecord", "ODVariable"]
):
assert index == obj.index or index == obj.name
self.add_object(obj)
def __delitem__(self, index: Union[int, str]):
obj = self[index]
del self.indices[obj.index]
del self.names[obj.name]
def __iter__(self) -> Iterable[int]:
return iter(sorted(self.indices))
def __len__(self) -> int:
return len(self.indices)
def __contains__(self, index: Union[int, str]):
return index in self.names or index in self.indices
[docs]
def add_object(self, obj: Union["ODArray", "ODRecord", "ODVariable"]) -> None:
"""Add object to the object dictionary.
:param obj:
Should be either one of
:class:`~canopen.objectdictionary.ODVariable`,
:class:`~canopen.objectdictionary.ODRecord`, or
:class:`~canopen.objectdictionary.ODArray`.
"""
obj.parent = self
self.indices[obj.index] = obj
self.names[obj.name] = obj
[docs]
def get_variable(
self, index: Union[int, str], subindex: int = 0
) -> Optional["ODVariable"]:
"""Get the variable object at specified index (and subindex if applicable).
:return: ODVariable if found, else `None`
"""
obj = self.get(index)
if isinstance(obj, ODVariable):
return obj
elif isinstance(obj, (ODRecord, ODArray)):
return obj.get(subindex)
[docs]
class ODRecord(MutableMapping):
"""Groups multiple :class:`~canopen.objectdictionary.ODVariable` objects using
subindices.
"""
#: Description for the whole record
description = ""
def __init__(self, name: str, index: int):
#: The :class:`~canopen.ObjectDictionary` owning the record.
self.parent: Optional[ObjectDictionary] = None
#: 16-bit address of the record
self.index = index
#: Name of record
self.name = name
#: Storage location of index
self.storage_location = None
self.subindices = {}
self.names = {}
def __getitem__(self, subindex: Union[int, str]) -> "ODVariable":
item = self.names.get(subindex) or self.subindices.get(subindex)
if item is None:
raise KeyError("Subindex %s was not found" % subindex)
return item
def __setitem__(self, subindex: Union[int, str], var: "ODVariable"):
assert subindex == var.subindex
self.add_member(var)
def __delitem__(self, subindex: Union[int, str]):
var = self[subindex]
del self.subindices[var.subindex]
del self.names[var.name]
def __len__(self) -> int:
return len(self.subindices)
def __iter__(self) -> Iterable[int]:
return iter(sorted(self.subindices))
def __contains__(self, subindex: Union[int, str]) -> bool:
return subindex in self.names or subindex in self.subindices
def __eq__(self, other: "ODRecord") -> bool:
return self.index == other.index
[docs]
def add_member(self, variable: "ODVariable") -> None:
"""Adds a :class:`~canopen.objectdictionary.ODVariable` to the record."""
variable.parent = self
self.subindices[variable.subindex] = variable
self.names[variable.name] = variable
[docs]
class ODArray(Mapping):
"""An array of :class:`~canopen.objectdictionary.ODVariable` objects using
subindices.
Actual length of array must be read from the node using SDO.
"""
#: Description for the whole array
description = ""
def __init__(self, name: str, index: int):
#: The :class:`~canopen.ObjectDictionary` owning the record.
self.parent = None
#: 16-bit address of the array
self.index = index
#: Name of array
self.name = name
#: Storage location of index
self.storage_location = None
self.subindices = {}
self.names = {}
def __getitem__(self, subindex: Union[int, str]) -> "ODVariable":
var = self.names.get(subindex) or self.subindices.get(subindex)
if var is not None:
# This subindex is defined
pass
elif isinstance(subindex, int) and 0 < subindex < 256:
# Create a new variable based on first array item
template = self.subindices[1]
name = "%s_%x" % (template.name, subindex)
var = ODVariable(name, self.index, subindex)
var.parent = self
for attr in ("data_type", "unit", "factor", "min", "max", "default",
"access_type", "description", "value_descriptions",
"bit_definitions", "storage_location"):
if attr in template.__dict__:
var.__dict__[attr] = template.__dict__[attr]
else:
raise KeyError("Could not find subindex %r" % subindex)
return var
def __len__(self) -> int:
return len(self.subindices)
def __iter__(self) -> Iterable[int]:
return iter(sorted(self.subindices))
def __eq__(self, other: "ODArray") -> bool:
return self.index == other.index
[docs]
def add_member(self, variable: "ODVariable") -> None:
"""Adds a :class:`~canopen.objectdictionary.ODVariable` to the record."""
variable.parent = self
self.subindices[variable.subindex] = variable
self.names[variable.name] = variable
[docs]
class ODVariable:
"""Simple variable."""
STRUCT_TYPES = {
BOOLEAN: struct.Struct("?"),
INTEGER8: struct.Struct("b"),
INTEGER16: struct.Struct("<h"),
INTEGER24: Integer24(),
INTEGER32: struct.Struct("<l"),
INTEGER64: struct.Struct("<q"),
UNSIGNED8: struct.Struct("B"),
UNSIGNED16: struct.Struct("<H"),
UNSIGNED24: Unsigned24(),
UNSIGNED32: struct.Struct("<L"),
UNSIGNED64: struct.Struct("<Q"),
REAL32: struct.Struct("<f"),
REAL64: struct.Struct("<d")
}
def __init__(self, name: str, index: int, subindex: int = 0):
#: The :class:`~canopen.ObjectDictionary`,
#: :class:`~canopen.objectdictionary.ODRecord` or
#: :class:`~canopen.objectdictionary.ODArray` owning the variable
self.parent = None
#: 16-bit address of the object in the dictionary
self.index = index
#: 8-bit sub-index of the object in the dictionary
self.subindex = subindex
#: String representation of the variable
self.name = name
#: Physical unit
self.unit: str = ""
#: Factor between physical unit and integer value
self.factor: float = 1
#: Minimum allowed value
self.min: Optional[int] = None
#: Maximum allowed value
self.max: Optional[int] = None
#: Default value at start-up
self.default: Optional[int] = None
#: Is the default value relative to the node-ID (only applies to COB-IDs)
self.relative = False
#: The value of this variable stored in the object dictionary
self.value: Optional[int] = None
#: Data type according to the standard as an :class:`int`
self.data_type: Optional[int] = None
#: Access type, should be "rw", "ro", "wo", or "const"
self.access_type: str = "rw"
#: Description of variable
self.description: str = ""
#: Dictionary of value descriptions
self.value_descriptions: Dict[int, str] = {}
#: Dictionary of bitfield definitions
self.bit_definitions: Dict[str, List[int]] = {}
#: Storage location of index
self.storage_location = None
#: Can this variable be mapped to a PDO
self.pdo_mappable = False
def __eq__(self, other: "ODVariable") -> bool:
return (self.index == other.index and
self.subindex == other.subindex)
def __len__(self) -> int:
if self.data_type in self.STRUCT_TYPES:
return self.STRUCT_TYPES[self.data_type].size * 8
else:
return 8
@property
def writable(self) -> bool:
return "w" in self.access_type
@property
def readable(self) -> bool:
return "r" in self.access_type or self.access_type == "const"
[docs]
def add_value_description(self, value: int, descr: str) -> None:
"""Associate a value with a string description.
:param value: Value to describe
:param desc: Description of value
"""
self.value_descriptions[value] = descr
[docs]
def add_bit_definition(self, name: str, bits: List[int]) -> None:
"""Associate bit(s) with a string description.
:param name: Name of bit(s)
:param bits: List of bits as integers
"""
self.bit_definitions[name] = bits
def decode_raw(self, data: bytes) -> Union[int, float, str, bytes, bytearray]:
if self.data_type == VISIBLE_STRING:
return data.rstrip(b"\x00").decode("ascii", errors="ignore")
elif self.data_type == UNICODE_STRING:
# Is this correct?
return data.rstrip(b"\x00").decode("utf_16_le", errors="ignore")
elif self.data_type in self.STRUCT_TYPES:
try:
value, = self.STRUCT_TYPES[self.data_type].unpack(data)
return value
except struct.error:
raise ObjectDictionaryError(
"Mismatch between expected and actual data size")
else:
# Just return the data as is
return data
def encode_raw(self, value: Union[int, float, str, bytes, bytearray]) -> bytes:
if isinstance(value, (bytes, bytearray)):
return value
elif self.data_type == VISIBLE_STRING:
return value.encode("ascii")
elif self.data_type == UNICODE_STRING:
# Is this correct?
return value.encode("utf_16_le")
elif self.data_type in self.STRUCT_TYPES:
if self.data_type in INTEGER_TYPES:
value = int(value)
if self.data_type in NUMBER_TYPES:
if self.min is not None and value < self.min:
logger.warning(
"Value %d is less than min value %d", value, self.min)
if self.max is not None and value > self.max:
logger.warning(
"Value %d is greater than max value %d",
value,
self.max)
try:
return self.STRUCT_TYPES[self.data_type].pack(value)
except struct.error:
raise ValueError("Value does not fit in specified type")
elif self.data_type is None:
raise ObjectDictionaryError("Data type has not been specified")
else:
raise TypeError(
"Do not know how to encode %r to data type %Xh" % (
value, self.data_type))
def decode_phys(self, value: int) -> Union[int, bool, float, str, bytes]:
if self.data_type in INTEGER_TYPES:
value *= self.factor
return value
def encode_phys(self, value: Union[int, bool, float, str, bytes]) -> int:
if self.data_type in INTEGER_TYPES:
value /= self.factor
value = int(round(value))
return value
def decode_desc(self, value: int) -> str:
if not self.value_descriptions:
raise ObjectDictionaryError("No value descriptions exist")
elif value not in self.value_descriptions:
raise ObjectDictionaryError(
"No value description exists for %d" % value)
else:
return self.value_descriptions[value]
def encode_desc(self, desc: str) -> int:
if not self.value_descriptions:
raise ObjectDictionaryError("No value descriptions exist")
else:
for value, description in self.value_descriptions.items():
if description == desc:
return value
valid_values = ", ".join(self.value_descriptions.values())
error_text = "No value corresponds to '%s'. Valid values are: %s"
raise ValueError(error_text % (desc, valid_values))
def decode_bits(self, value: int, bits: List[int]) -> int:
try:
bits = self.bit_definitions[bits]
except (TypeError, KeyError):
pass
mask = 0
for bit in bits:
mask |= 1 << bit
return (value & mask) >> min(bits)
def encode_bits(self, original_value: int, bits: List[int], bit_value: int):
try:
bits = self.bit_definitions[bits]
except (TypeError, KeyError):
pass
temp = original_value
mask = 0
for bit in bits:
mask |= 1 << bit
temp &= ~mask
temp |= bit_value << min(bits)
return temp
class DeviceInformation:
def __init__(self):
self.allowed_baudrates = set()
self.vendor_name:Optional[str] = None
self.vendor_number:Optional[int] = None
self.product_name:Optional[str] = None
self.product_number:Optional[int] = None
self.revision_number:Optional[int] = None
self.order_code:Optional[str] = None
self.simple_boot_up_master:Optional[bool] = None
self.simple_boot_up_slave:Optional[bool] = None
self.granularity:Optional[int] = None
self.dynamic_channels_supported:Optional[bool] = None
self.group_messaging:Optional[bool] = None
self.nr_of_RXPDO:Optional[bool] = None
self.nr_of_TXPDO:Optional[bool] = None
self.LSS_supported:Optional[bool] = None
[docs]
class ObjectDictionaryError(Exception):
"""Unsupported operation with the current Object Dictionary."""
# Compatibility for old names
Record = ODRecord
Array = ODArray
Variable = ODVariable