"""
Object Dictionary module
"""
from __future__ import annotations
import logging
import struct
from collections.abc import Mapping, MutableMapping
from typing import Dict, Iterator, List, Optional, TextIO, Union
from canopen.objectdictionary.datatypes import *
from canopen.objectdictionary.datatypes import IntegerN, UnsignedN
from canopen.utils import pretty_index
logger = logging.getLogger(__name__)
[docs]
def export_od(
od: ObjectDictionary,
dest: Union[str, TextIO, None] = None,
doc_type: Optional[str] = None
) -> None:
"""Export an object dictionary.
:param od:
The object dictionary to be exported.
:param dest:
The export destination as a filename, a file-like object, or ``None``.
If ``None``, the document is written to :data:`sys.stdout`.
:param doc_type:
The type of document to export.
If *dest* is a file-like object or ``None``,
*doc_type* must be explicitly provided.
If *dest* is a filename and its extension is ``.eds`` or ``.dcf``,
*doc_type* defaults to that extension (the preceeding dot excluded);
else, it defaults to ``eds``.
:raises ValueError:
When exporting to an unknown format.
"""
supported_doctypes = {"eds", "dcf"}
if doc_type and doc_type not in supported_doctypes:
supported = ", ".join(supported_doctypes)
raise ValueError(
f"Cannot export to the {doc_type!r} format; "
f"supported formats: {supported}"
)
opened_here = False
try:
if isinstance(dest, str):
if doc_type is None:
for t in supported_doctypes:
if dest.endswith(f".{t}"):
doc_type = t
break
else:
doc_type = "eds"
dest = open(dest, 'w')
opened_here = True
if doc_type == "eds":
from canopen.objectdictionary import eds
return eds.export_eds(od, dest)
elif doc_type == "dcf":
from canopen.objectdictionary import eds
return eds.export_dcf(od, dest)
finally:
# If dest is opened in this fn, it should be closed
if opened_here:
dest.close()
[docs]
def import_od(
source: Union[str, TextIO, None],
node_id: Optional[int] = None,
) -> ObjectDictionary:
"""Parse an EDS, DCF, or EPF file.
:param source:
The path to object dictionary file, a file like object, or an EPF XML tree.
:param node_id:
For EDS and DCF files, the node ID to use.
For other formats, this parameter is ignored.
:raises ObjectDictionaryError:
For object dictionary errors and inconsistencies.
:raises ValueError:
When passed a file of an unknown format.
"""
if source is None:
return ObjectDictionary()
if hasattr(source, "read"):
# File like object
filename = source.name
elif hasattr(source, "tag"):
# XML tree, probably from an EPF file
filename = "od.epf"
else:
# Path to file
filename = source
suffix = filename[filename.rfind("."):].lower()
if suffix in (".eds", ".dcf"):
from canopen.objectdictionary import eds
return eds.import_eds(source, node_id)
elif suffix == ".epf":
from canopen.objectdictionary import epf
return epf.import_epf(source)
else:
doc_type = suffix[1:]
allowed = ", ".join(["eds", "dcf", "epf"])
raise ValueError(
f"Cannot import from the {doc_type!r} format; "
f"supported formats: {allowed}"
)
[docs]
class ObjectDictionary(MutableMapping):
"""Representation of the object dictionary as a Python dictionary."""
def __init__(self):
self.indices = {}
self.names = {}
self.comments = ""
#: Default bitrate if specified by file
self.bitrate: Optional[int] = None
#: Node ID if specified by file
self.node_id: Optional[int] = None
#: Some information about the device
self.device_information = DeviceInformation()
def __getitem__(
self, index: Union[int, str]
) -> Union[ODArray, ODRecord, ODVariable]:
"""Get object from object dictionary by name or index."""
item = self.names.get(index)
if item is None:
item = self.indices.get(index)
if item is None:
if isinstance(index, str) and '.' in index:
idx, sub = index.split('.', maxsplit=1)
return self[idx][sub]
raise KeyError(f"{pretty_index(index)} was not found in Object Dictionary")
return item
def __setitem__(
self, index: Union[int, str], obj: Union[ODArray, ODRecord, ODVariable]
):
assert index == obj.index or index == obj.name
self.add_object(obj)
def __delitem__(self, index: Union[int, str]):
obj = self[index]
del self.indices[obj.index]
del self.names[obj.name]
def __iter__(self) -> Iterator[int]:
return iter(sorted(self.indices))
def __len__(self) -> int:
return len(self.indices)
def __contains__(self, index: Union[int, str]):
return index in self.names or index in self.indices
[docs]
def add_object(self, obj: Union[ODArray, ODRecord, ODVariable]) -> None:
"""Add object to the object dictionary.
:param obj:
Should be either one of
:class:`~canopen.objectdictionary.ODVariable`,
:class:`~canopen.objectdictionary.ODRecord`, or
:class:`~canopen.objectdictionary.ODArray`.
"""
obj.parent = self
self.indices[obj.index] = obj
self.names[obj.name] = obj
[docs]
def get_variable(
self, index: Union[int, str], subindex: int = 0
) -> Optional[ODVariable]:
"""Get the variable object at specified index (and subindex if applicable).
:return: ODVariable if found, else `None`
"""
obj = self.get(index)
if isinstance(obj, ODVariable):
return obj
elif isinstance(obj, (ODRecord, ODArray)):
return obj.get(subindex)
[docs]
class ODRecord(MutableMapping):
"""Groups multiple :class:`~canopen.objectdictionary.ODVariable` objects using
subindices.
"""
#: Description for the whole record
description = ""
def __init__(self, name: str, index: int):
#: The :class:`~canopen.ObjectDictionary` owning the record.
self.parent: Optional[ObjectDictionary] = None
#: 16-bit address of the record
self.index = index
#: Name of record
self.name = name
#: Storage location of index
self.storage_location = None
self.subindices = {}
self.names = {}
def __repr__(self) -> str:
return f"<{type(self).__qualname__} {self.name!r} at {pretty_index(self.index)}>"
def __getitem__(self, subindex: Union[int, str]) -> ODVariable:
item = self.names.get(subindex) or self.subindices.get(subindex)
if item is None:
raise KeyError(f"Subindex {pretty_index(None, subindex)} was not found")
return item
def __setitem__(self, subindex: Union[int, str], var: ODVariable):
assert subindex == var.subindex
self.add_member(var)
def __delitem__(self, subindex: Union[int, str]):
var = self[subindex]
del self.subindices[var.subindex]
del self.names[var.name]
def __len__(self) -> int:
return len(self.subindices)
def __iter__(self) -> Iterator[int]:
return iter(sorted(self.subindices))
def __contains__(self, subindex: Union[int, str]) -> bool:
return subindex in self.names or subindex in self.subindices
def __eq__(self, other: ODRecord) -> bool:
return self.index == other.index
[docs]
def add_member(self, variable: ODVariable) -> None:
"""Adds a :class:`~canopen.objectdictionary.ODVariable` to the record."""
variable.parent = self
self.subindices[variable.subindex] = variable
self.names[variable.name] = variable
[docs]
class ODArray(Mapping):
"""An array of :class:`~canopen.objectdictionary.ODVariable` objects using
subindices.
Actual length of array must be read from the node using SDO.
"""
#: Description for the whole array
description = ""
def __init__(self, name: str, index: int):
#: The :class:`~canopen.ObjectDictionary` owning the record.
self.parent = None
#: 16-bit address of the array
self.index = index
#: Name of array
self.name = name
#: Storage location of index
self.storage_location = None
self.subindices = {}
self.names = {}
def __repr__(self) -> str:
return f"<{type(self).__qualname__} {self.name!r} at {pretty_index(self.index)}>"
def __getitem__(self, subindex: Union[int, str]) -> ODVariable:
var = self.names.get(subindex) or self.subindices.get(subindex)
if var is not None:
# This subindex is defined
pass
elif isinstance(subindex, int) and 0 < subindex < 256:
# Create a new variable based on first array item
template = self.subindices[1]
name = f"{template.name}_{subindex:x}"
var = ODVariable(name, self.index, subindex)
var.parent = self
for attr in ("data_type", "unit", "factor", "min", "max", "default",
"access_type", "description", "value_descriptions",
"bit_definitions", "storage_location"):
if attr in template.__dict__:
var.__dict__[attr] = template.__dict__[attr]
else:
raise KeyError(f"Could not find subindex {pretty_index(None, subindex)}")
return var
def __len__(self) -> int:
return len(self.subindices)
def __iter__(self) -> Iterator[int]:
return iter(sorted(self.subindices))
def __eq__(self, other: ODArray) -> bool:
return self.index == other.index
[docs]
def add_member(self, variable: ODVariable) -> None:
"""Adds a :class:`~canopen.objectdictionary.ODVariable` to the record."""
variable.parent = self
self.subindices[variable.subindex] = variable
self.names[variable.name] = variable
[docs]
class ODVariable:
"""Simple variable."""
STRUCT_TYPES: dict[int, struct.Struct] = {
# Use struct module to pack/unpack data where possible and use the
# custom IntegerN and UnsignedN classes for the special data types.
BOOLEAN: struct.Struct("?"),
INTEGER8: struct.Struct("b"),
INTEGER16: struct.Struct("<h"),
INTEGER24: IntegerN(24),
INTEGER32: struct.Struct("<l"),
INTEGER40: IntegerN(40),
INTEGER48: IntegerN(48),
INTEGER56: IntegerN(56),
INTEGER64: struct.Struct("<q"),
UNSIGNED8: struct.Struct("B"),
UNSIGNED16: struct.Struct("<H"),
UNSIGNED24: UnsignedN(24),
UNSIGNED32: struct.Struct("<L"),
UNSIGNED40: UnsignedN(40),
UNSIGNED48: UnsignedN(48),
UNSIGNED56: UnsignedN(56),
UNSIGNED64: struct.Struct("<Q"),
REAL32: struct.Struct("<f"),
REAL64: struct.Struct("<d")
}
def __init__(self, name: str, index: int, subindex: int = 0):
#: The :class:`~canopen.ObjectDictionary`,
#: :class:`~canopen.objectdictionary.ODRecord` or
#: :class:`~canopen.objectdictionary.ODArray` owning the variable
self.parent = None
#: 16-bit address of the object in the dictionary
self.index = index
#: 8-bit sub-index of the object in the dictionary
self.subindex = subindex
#: String representation of the variable
self.name = name
#: Physical unit
self.unit: str = ""
#: Factor between physical unit and integer value
self.factor: float = 1
#: Minimum allowed value
self.min: Optional[int] = None
#: Maximum allowed value
self.max: Optional[int] = None
#: Default value at start-up
self.default: Optional[int] = None
#: Is the default value relative to the node-ID (only applies to COB-IDs)
self.relative = False
#: The value of this variable stored in the object dictionary
self.value: Optional[int] = None
#: Data type according to the standard as an :class:`int`
self.data_type: Optional[int] = None
#: Access type, should be "rw", "ro", "wo", or "const"
self.access_type: str = "rw"
#: Description of variable
self.description: str = ""
#: Dictionary of value descriptions
self.value_descriptions: Dict[int, str] = {}
#: Dictionary of bitfield definitions
self.bit_definitions: Dict[str, List[int]] = {}
#: Storage location of index
self.storage_location = None
#: Can this variable be mapped to a PDO
self.pdo_mappable = False
def __repr__(self) -> str:
subindex = self.subindex if isinstance(self.parent, (ODRecord, ODArray)) else None
return f"<{type(self).__qualname__} {self.qualname!r} at {pretty_index(self.index, subindex)}>"
@property
def qualname(self) -> str:
"""Fully qualified name of the variable. If the variable is a subindex
of a record or array, the name will be prefixed with the parent's name."""
if isinstance(self.parent, (ODRecord, ODArray)):
return f"{self.parent.name}.{self.name}"
return self.name
def __eq__(self, other: ODVariable) -> bool:
return (self.index == other.index and
self.subindex == other.subindex)
def __len__(self) -> int:
if self.data_type in self.STRUCT_TYPES:
return self.STRUCT_TYPES[self.data_type].size * 8
else:
return 8
@property
def writable(self) -> bool:
return "w" in self.access_type
@property
def readable(self) -> bool:
return "r" in self.access_type or self.access_type == "const"
[docs]
def add_value_description(self, value: int, descr: str) -> None:
"""Associate a value with a string description.
:param value: Value to describe
:param desc: Description of value
"""
self.value_descriptions[value] = descr
[docs]
def add_bit_definition(self, name: str, bits: List[int]) -> None:
"""Associate bit(s) with a string description.
:param name: Name of bit(s)
:param bits: List of bits as integers
"""
self.bit_definitions[name] = bits
@property
def fixed_size(self) -> bool:
"""Indicate whether the amount of needed data is known in advance."""
# Only for types which we parse using a structure.
return self.data_type in self.STRUCT_TYPES
def decode_raw(self, data: bytes) -> Union[int, float, str, bytes, bytearray]:
if self.data_type == VISIBLE_STRING:
# Strip any trailing NUL characters from C-based systems
return data.decode("ascii", errors="ignore").rstrip("\x00")
elif self.data_type == UNICODE_STRING:
# The CANopen standard does not specify the encoding. This
# library assumes UTF-16, being the most common two-byte encoding format.
# Strip any trailing NUL characters from C-based systems
return data.decode("utf_16_le", errors="ignore").rstrip("\x00")
elif self.data_type in self.STRUCT_TYPES:
try:
value, = self.STRUCT_TYPES[self.data_type].unpack(data)
return value
except struct.error:
raise ObjectDictionaryError(
"Mismatch between expected and actual data size")
else:
# Just return the data as is
return data
def encode_raw(self, value: Union[int, float, str, bytes, bytearray]) -> bytes:
if isinstance(value, (bytes, bytearray)):
return value
elif self.data_type == VISIBLE_STRING:
return value.encode("ascii")
elif self.data_type == UNICODE_STRING:
return value.encode("utf_16_le")
elif self.data_type in (DOMAIN, OCTET_STRING):
return bytes(value)
elif self.data_type in self.STRUCT_TYPES:
if self.data_type in INTEGER_TYPES:
value = int(value)
if self.data_type in NUMBER_TYPES:
if self.min is not None and value < self.min:
logger.warning(
"Value %d is less than min value %d", value, self.min)
if self.max is not None and value > self.max:
logger.warning(
"Value %d is greater than max value %d",
value, self.max)
try:
return self.STRUCT_TYPES[self.data_type].pack(value)
except struct.error:
raise ValueError("Value does not fit in specified type")
elif self.data_type is None:
raise ObjectDictionaryError("Data type has not been specified")
else:
raise TypeError(
f"Do not know how to encode {value!r} to data type 0x{self.data_type:X}")
def decode_phys(self, value: int) -> Union[int, bool, float, str, bytes]:
if self.data_type in INTEGER_TYPES:
value *= self.factor
return value
def encode_phys(self, value: Union[int, bool, float, str, bytes]) -> int:
if self.data_type in INTEGER_TYPES:
value /= self.factor
value = int(round(value))
return value
def decode_desc(self, value: int) -> str:
if not self.value_descriptions:
raise ObjectDictionaryError("No value descriptions exist")
elif value not in self.value_descriptions:
raise ObjectDictionaryError(
f"No value description exists for {value}")
else:
return self.value_descriptions[value]
def encode_desc(self, desc: str) -> int:
if not self.value_descriptions:
raise ObjectDictionaryError("No value descriptions exist")
else:
for value, description in self.value_descriptions.items():
if description == desc:
return value
valid_values = ", ".join(self.value_descriptions.values())
raise ValueError(
f"No value corresponds to '{desc}'. Valid values are: {valid_values}")
def decode_bits(self, value: int, bits: List[int]) -> int:
try:
bits = self.bit_definitions[bits]
except (TypeError, KeyError):
pass
mask = 0
for bit in bits:
mask |= 1 << bit
return (value & mask) >> min(bits)
def encode_bits(self, original_value: int, bits: List[int], bit_value: int):
try:
bits = self.bit_definitions[bits]
except (TypeError, KeyError):
pass
temp = original_value
mask = 0
for bit in bits:
mask |= 1 << bit
temp &= ~mask
temp |= bit_value << min(bits)
return temp
class DeviceInformation:
def __init__(self):
self.allowed_baudrates = set()
self.vendor_name:Optional[str] = None
self.vendor_number:Optional[int] = None
self.product_name:Optional[str] = None
self.product_number:Optional[int] = None
self.revision_number:Optional[int] = None
self.order_code:Optional[str] = None
self.simple_boot_up_master:Optional[bool] = None
self.simple_boot_up_slave:Optional[bool] = None
self.granularity:Optional[int] = None
self.dynamic_channels_supported:Optional[bool] = None
self.group_messaging:Optional[bool] = None
self.nr_of_RXPDO:Optional[bool] = None
self.nr_of_TXPDO:Optional[bool] = None
self.LSS_supported:Optional[bool] = None
[docs]
class ObjectDictionaryError(Exception):
"""Unsupported operation with the current Object Dictionary."""
# Compatibility for old names
Record = ODRecord
Array = ODArray
Variable = ODVariable