try:
from collections.abc import MutableMapping
except ImportError:
from collections import MutableMapping
import logging
import threading
from typing import Callable, Dict, Iterable, List, Optional, Union
try:
import can
from can import Listener
from can import CanError
except ImportError:
# Do not fail if python-can is not installed
can = None
CanError = Exception
class Listener:
""" Dummy listener """
from canopen.node import RemoteNode, LocalNode
from canopen.sync import SyncProducer
from canopen.timestamp import TimeProducer
from canopen.nmt import NmtMaster
from canopen.lss import LssMaster
from canopen.objectdictionary.eds import import_from_node
from canopen.objectdictionary import ObjectDictionary
logger = logging.getLogger(__name__)
Callback = Callable[[int, bytearray, float], None]
[docs]
class Network(MutableMapping):
"""Representation of one CAN bus containing one or more nodes."""
def __init__(self, bus=None):
"""
:param can.BusABC bus:
A python-can bus instance to re-use.
"""
#: A python-can :class:`can.BusABC` instance which is set after
#: :meth:`canopen.Network.connect` is called
self.bus = bus
#: A :class:`~canopen.network.NodeScanner` for detecting nodes
self.scanner = NodeScanner(self)
#: List of :class:`can.Listener` objects.
#: Includes at least MessageListener.
self.listeners = [MessageListener(self)]
self.notifier = None
self.nodes: Dict[int, Union[RemoteNode, LocalNode]] = {}
self.subscribers: Dict[int, List[Callback]] = {}
self.send_lock = threading.Lock()
self.sync = SyncProducer(self)
self.time = TimeProducer(self)
self.nmt = NmtMaster(0)
self.nmt.network = self
self.lss = LssMaster()
self.lss.network = self
self.subscribe(self.lss.LSS_RX_COBID, self.lss.on_message_received)
[docs]
def subscribe(self, can_id: int, callback: Callback) -> None:
"""Listen for messages with a specific CAN ID.
:param can_id:
The CAN ID to listen for.
:param callback:
Function to call when message is received.
"""
self.subscribers.setdefault(can_id, list())
if callback not in self.subscribers[can_id]:
self.subscribers[can_id].append(callback)
[docs]
def unsubscribe(self, can_id, callback=None) -> None:
"""Stop listening for message.
:param int can_id:
The CAN ID from which to unsubscribe.
:param callback:
If given, remove only this callback. Otherwise all callbacks for
the CAN ID.
"""
if callback is None:
del self.subscribers[can_id]
else:
self.subscribers[can_id].remove(callback)
[docs]
def connect(self, *args, **kwargs) -> "Network":
"""Connect to CAN bus using python-can.
Arguments are passed directly to :class:`can.BusABC`. Typically these
may include:
:param channel:
Backend specific channel for the CAN interface.
:param str bustype:
Name of the interface. See
`python-can manual <https://python-can.readthedocs.io/en/stable/configuration.html#interface-names>`__
for full list of supported interfaces.
:param int bitrate:
Bitrate in bit/s.
:raises can.CanError:
When connection fails.
"""
# If bitrate has not been specified, try to find one node where bitrate
# has been specified
if "bitrate" not in kwargs:
for node in self.nodes.values():
if node.object_dictionary.bitrate:
kwargs["bitrate"] = node.object_dictionary.bitrate
break
self.bus = can.interface.Bus(*args, **kwargs)
logger.info("Connected to '%s'", self.bus.channel_info)
self.notifier = can.Notifier(self.bus, self.listeners, 1)
return self
[docs]
def disconnect(self) -> None:
"""Disconnect from the CAN bus.
Must be overridden in a subclass if a custom interface is used.
"""
for node in self.nodes.values():
if hasattr(node, "pdo"):
node.pdo.stop()
if self.notifier is not None:
self.notifier.stop()
if self.bus is not None:
self.bus.shutdown()
self.bus = None
self.check()
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.disconnect()
[docs]
def add_node(
self,
node: Union[int, RemoteNode, LocalNode],
object_dictionary: Union[str, ObjectDictionary, None] = None,
upload_eds: bool = False,
) -> RemoteNode:
"""Add a remote node to the network.
:param node:
Can be either an integer representing the node ID, a
:class:`canopen.RemoteNode` or :class:`canopen.LocalNode` object.
:param object_dictionary:
Can be either a string for specifying the path to an
Object Dictionary file or a
:class:`canopen.ObjectDictionary` object.
:param upload_eds:
Set ``True`` if EDS file should be uploaded from 0x1021.
:return:
The Node object that was added.
"""
if isinstance(node, int):
if upload_eds:
logger.info("Trying to read EDS from node %d", node)
object_dictionary = import_from_node(node, self)
node = RemoteNode(node, object_dictionary)
self[node.id] = node
return node
[docs]
def create_node(
self,
node: int,
object_dictionary: Union[str, ObjectDictionary, None] = None,
) -> LocalNode:
"""Create a local node in the network.
:param node:
An integer representing the node ID.
:param object_dictionary:
Can be either a string for specifying the path to an
Object Dictionary file or a
:class:`canopen.ObjectDictionary` object.
:return:
The Node object that was added.
"""
if isinstance(node, int):
node = LocalNode(node, object_dictionary)
self[node.id] = node
return node
[docs]
def send_message(self, can_id: int, data: bytes, remote: bool = False) -> None:
"""Send a raw CAN message to the network.
This method may be overridden in a subclass if you need to integrate
this library with a custom backend.
It is safe to call this from multiple threads.
:param int can_id:
CAN-ID of the message
:param data:
Data to be transmitted (anything that can be converted to bytes)
:param bool remote:
Set to True to send remote frame
:raises can.CanError:
When the message fails to be transmitted
"""
if not self.bus:
raise RuntimeError("Not connected to CAN bus")
msg = can.Message(is_extended_id=can_id > 0x7FF,
arbitration_id=can_id,
data=data,
is_remote_frame=remote)
with self.send_lock:
self.bus.send(msg)
self.check()
[docs]
def send_periodic(
self, can_id: int, data: bytes, period: float, remote: bool = False
) -> "PeriodicMessageTask":
"""Start sending a message periodically.
:param can_id:
CAN-ID of the message
:param data:
Data to be transmitted (anything that can be converted to bytes)
:param period:
Seconds between each message
:param remote:
indicates if the message frame is a remote request to the slave node
:return:
An task object with a ``.stop()`` method to stop the transmission
"""
return PeriodicMessageTask(can_id, data, period, self.bus, remote)
[docs]
def notify(self, can_id: int, data: bytearray, timestamp: float) -> None:
"""Feed incoming message to this library.
If a custom interface is used, this function must be called for each
message read from the CAN bus.
:param can_id:
CAN-ID of the message
:param data:
Data part of the message (0 - 8 bytes)
:param timestamp:
Timestamp of the message, preferably as a Unix timestamp
"""
if can_id in self.subscribers:
callbacks = self.subscribers[can_id]
for callback in callbacks:
callback(can_id, data, timestamp)
self.scanner.on_message_received(can_id)
[docs]
def check(self) -> None:
"""Check that no fatal error has occurred in the receiving thread.
If an exception caused the thread to terminate, that exception will be
raised.
"""
if self.notifier is not None:
exc = self.notifier.exception
if exc is not None:
logger.error("An error has caused receiving of messages to stop")
raise exc
def __getitem__(self, node_id: int) -> Union[RemoteNode, LocalNode]:
return self.nodes[node_id]
def __setitem__(self, node_id: int, node: Union[RemoteNode, LocalNode]):
assert node_id == node.id
if node_id in self.nodes:
# Remove old callbacks
self.nodes[node_id].remove_network()
self.nodes[node_id] = node
node.associate_network(self)
def __delitem__(self, node_id: int):
self.nodes[node_id].remove_network()
del self.nodes[node_id]
def __iter__(self) -> Iterable[int]:
return iter(self.nodes)
def __len__(self) -> int:
return len(self.nodes)
[docs]
class PeriodicMessageTask:
"""
Task object to transmit a message periodically using python-can's
CyclicSendTask
"""
def __init__(
self,
can_id: int,
data: bytes,
period: float,
bus,
remote: bool = False,
):
"""
:param can_id:
CAN-ID of the message
:param data:
Data to be transmitted (anything that can be converted to bytes)
:param period:
Seconds between each message
:param can.BusABC bus:
python-can bus to use for transmission
"""
self.bus = bus
self.period = period
self.msg = can.Message(is_extended_id=can_id > 0x7FF,
arbitration_id=can_id,
data=data, is_remote_frame=remote)
self._task = None
self._start()
def _start(self):
self._task = self.bus.send_periodic(self.msg, self.period)
[docs]
def stop(self):
"""Stop transmission"""
self._task.stop()
[docs]
def update(self, data: bytes) -> None:
"""Update data of message
:param data:
New data to transmit
"""
new_data = bytearray(data)
old_data = self.msg.data
self.msg.data = new_data
if hasattr(self._task, "modify_data"):
self._task.modify_data(self.msg)
elif new_data != old_data:
# Stop and start (will mess up period unfortunately)
self._task.stop()
self._start()
[docs]
class MessageListener(Listener):
"""Listens for messages on CAN bus and feeds them to a Network instance.
:param network:
The network to notify on new messages.
"""
def __init__(self, network: Network):
self.network = network
[docs]
def on_message_received(self, msg):
if msg.is_error_frame or msg.is_remote_frame:
return
try:
self.network.notify(msg.arbitration_id, msg.data, msg.timestamp)
except Exception as e:
# Exceptions in any callbaks should not affect CAN processing
logger.error(str(e))
[docs]
class NodeScanner:
"""Observes which nodes are present on the bus.
Listens for the following messages:
- Heartbeat (0x700)
- SDO response (0x580)
- TxPDO (0x180, 0x280, 0x380, 0x480)
- EMCY (0x80)
:param canopen.Network network:
The network to use when doing active searching.
"""
#: Activate or deactivate scanning
active = True
SERVICES = (0x700, 0x580, 0x180, 0x280, 0x380, 0x480, 0x80)
def __init__(self, network: Optional[Network] = None):
self.network = network
#: A :class:`list` of nodes discovered
self.nodes: List[int] = []
def on_message_received(self, can_id: int):
service = can_id & 0x780
node_id = can_id & 0x7F
if node_id not in self.nodes and node_id != 0 and service in self.SERVICES:
self.nodes.append(node_id)
[docs]
def reset(self):
"""Clear list of found nodes."""
self.nodes = []
[docs]
def search(self, limit: int = 127) -> None:
"""Search for nodes by sending SDO requests to all node IDs."""
if self.network is None:
raise RuntimeError("A Network is required to do active scanning")
sdo_req = b"\x40\x00\x10\x00\x00\x00\x00\x00"
for node_id in range(1, limit + 1):
self.network.send_message(0x600 + node_id, sdo_req)