Source code for canopen.nmt

import threading
import logging
import struct
import time
from typing import Callable, Optional

logger = logging.getLogger(__name__)

# Maps the numeric NMT state code (the value kept in ``NmtBase._state``) to
# its human-readable name, as returned by the ``NmtBase.state`` property.
NMT_STATES = {
    0: 'INITIALISING',
    4: 'STOPPED',
    5: 'OPERATIONAL',
    80: 'SLEEP',
    96: 'STANDBY',
    127: 'PRE-OPERATIONAL'
}

# Maps the names accepted by the ``NmtBase.state`` setter to the NMT command
# code that is handed to ``send_command``.  Note that 'INITIALISING' and
# 'RESET' both map to the same command code (129).
NMT_COMMANDS = {
    'OPERATIONAL': 1,
    'STOPPED': 2,
    'SLEEP': 80,
    'STANDBY': 96,
    'PRE-OPERATIONAL': 128,
    'INITIALISING': 129,
    'RESET': 129,
    'RESET COMMUNICATION': 130
}

# Maps an NMT command code (a value of NMT_COMMANDS) to the numeric state code
# (a key of NMT_STATES) that is recorded once that command has been issued or
# received.  Both reset commands (129 and 130) lead to state 0.
COMMAND_TO_STATE = {
    1: 5,
    2: 4,
    80: 80,
    96: 96,
    128: 127,
    129: 0,
    130: 0
}


class NmtBase:
    """
    Can set the state of the node it controls using NMT commands and monitor
    the current state using the heartbeat protocol.

    This base class only does the local bookkeeping: the numeric state is
    kept in ``_state`` and updated from received or issued NMT commands.
    Nothing is transmitted by this class itself; ``network`` starts out as
    ``None`` and is not used here.
    """

    def __init__(self, node_id: int):
        """Create the NMT bookkeeping for the node with id *node_id*.

        The state starts at 0 ('INITIALISING').
        """
        self.id = node_id
        self.network = None
        self._state = 0

    @staticmethod
    def _state_name(state):
        """Return the name for a numeric state, or the number if unknown.

        ``_state`` may hold a code that is not a key of ``NMT_STATES``;
        looking it up with a plain subscript would raise ``KeyError`` while
        merely building a log message.
        """
        return NMT_STATES.get(state, state)

    def on_command(self, can_id, data, timestamp):
        """Handle a received NMT command message.

        :param can_id:
            Identifier of the received message (not used here).
        :param data:
            Buffer whose first two bytes are the command code and the
            addressed node id.
        :param timestamp:
            Reception timestamp (not used here).

        The command is applied when it is addressed to this node's id or to
        node id 0.  Command codes missing from ``COMMAND_TO_STATE`` are
        logged but otherwise ignored.
        """
        cmd, node_id = struct.unpack_from("BB", data)
        if node_id in (self.id, 0):
            logger.info("Node %d received command %d", self.id, cmd)
            if cmd in COMMAND_TO_STATE:
                new_state = COMMAND_TO_STATE[cmd]
                if new_state != self._state:
                    # The current state may be an unknown code, so resolve
                    # names tolerantly instead of indexing NMT_STATES.
                    logger.info("New NMT state %s, old state %s",
                                self._state_name(new_state),
                                self._state_name(self._state))
                self._state = new_state

    def send_command(self, code: int):
        """Send an NMT command code to the node.

        In this base class only the locally tracked state is updated; codes
        that are not in ``COMMAND_TO_STATE`` leave the state untouched.

        :param code:
            NMT command code.
        """
        if code in COMMAND_TO_STATE:
            new_state = COMMAND_TO_STATE[code]
            # Tolerant name lookup: an unknown current state must not turn
            # this log call into a KeyError that aborts the command.
            logger.info("Changing NMT state on node %d from %s to %s",
                        self.id, self._state_name(self._state),
                        self._state_name(new_state))
            self._state = new_state

    @property
    def state(self) -> str:
        """Attribute to get or set node's state as a string.

        Can be one of:

        - 'INITIALISING'
        - 'PRE-OPERATIONAL'
        - 'STOPPED'
        - 'OPERATIONAL'
        - 'SLEEP'
        - 'STANDBY'
        - 'RESET'
        - 'RESET COMMUNICATION'

        Reading yields one of the names in ``NMT_STATES``; if the current
        numeric state is not in that table the raw number is returned
        instead.  'RESET' and 'RESET COMMUNICATION' can only be assigned.

        :raises ValueError:
            When assigning a name that is not a key of ``NMT_COMMANDS``.
        """
        return self._state_name(self._state)

    @state.setter
    def state(self, new_state: str):
        if new_state not in NMT_COMMANDS:
            raise ValueError("'%s' is an invalid state. Must be one of %s." %
                             (new_state, ", ".join(NMT_COMMANDS)))

        self.send_command(NMT_COMMANDS[new_state])


class NmtMaster(NmtBase):
    """NMT handling for a remote node as seen from the controlling side.

    Commands are transmitted through ``self.network`` and the node's state is
    tracked from the messages handed to :meth:`on_heartbeat`.  Threads can
    block on ``state_update`` (a :class:`threading.Condition`) until the next
    message arrives, which is what the ``wait_for_*`` helpers do.
    """

    def __init__(self, node_id: int):
        super(NmtMaster, self).__init__(node_id)
        # Raw state of the most recent heartbeat; reset to None by the
        # wait_for_* helpers before they start waiting.
        self._state_received = None
        # Periodic task returned by network.send_periodic, or None when node
        # guarding is not running.
        self._node_guarding_producer = None
        #: Timestamp of last heartbeat message
        self.timestamp: Optional[float] = None
        self.state_update = threading.Condition()
        self._callbacks = []

    def on_heartbeat(self, can_id, data, timestamp):
        """Process a received heartbeat / boot-up message.

        :param can_id:
            Identifier of the received message (only used for logging).
        :param data:
            Buffer whose first byte carries the state.
        :param timestamp:
            Reception timestamp, stored in :attr:`timestamp`.

        Registered callbacks are invoked with the received state before the
        tracked state is updated, and while ``state_update`` is held.
        """
        with self.state_update:
            self.timestamp = timestamp
            new_state, = struct.unpack_from("B", data)
            # Mask out toggle bit
            new_state &= 0x7F
            logger.debug("Received heartbeat can-id %d, state is %d",
                         can_id, new_state)
            for callback in self._callbacks:
                callback(new_state)
            if new_state == 0:
                # Boot-up, will go to PRE-OPERATIONAL automatically
                self._state = 127
            else:
                self._state = new_state
            self._state_received = new_state
            self.state_update.notify_all()

    def send_command(self, code: int):
        """Send an NMT command code to the node.

        The locally tracked state is updated first, then the two-byte
        message ``[code, node id]`` is sent with identifier 0.

        :param code:
            NMT command code.
        """
        super(NmtMaster, self).send_command(code)
        logger.info(
            "Sending NMT command 0x%X to node %d", code, self.id)
        self.network.send_message(0, [code, self.id])

    def wait_for_heartbeat(self, timeout: float = 10):
        """Wait until a heartbeat message is received.

        :param timeout:
            Maximum time in seconds to wait.
        :return:
            The value of :attr:`state` after the message was received.
        :raises NmtError:
            If nothing was received within *timeout*.
        """
        with self.state_update:
            self._state_received = None
            self.state_update.wait(timeout)
        if self._state_received is None:
            raise NmtError("No boot-up or heartbeat received")
        return self.state

    def wait_for_bootup(self, timeout: float = 10) -> None:
        """Wait until a boot-up message is received.

        Heartbeats reporting any other state are ignored and waiting
        continues until the deadline.

        :param timeout:
            Maximum time in seconds to wait.
        :raises NmtError:
            If no boot-up message arrived before the deadline.
        """
        end_time = time.time() + timeout
        while True:
            now = time.time()
            with self.state_update:
                self._state_received = None
                # Wait for the remaining time plus a 0.1 s grace period.
                self.state_update.wait(end_time - now + 0.1)
                # Read the result while still holding the lock so a
                # concurrent heartbeat cannot change it under us.
                received = self._state_received
            # Check for success first: a boot-up that arrives during the
            # last wait must not be reported as a timeout.
            if received == 0:
                return
            # Compare against the current time, not the pre-wait timestamp,
            # so expiry is detected right after the wait that ran out.
            if time.time() > end_time:
                raise NmtError("Timeout waiting for boot-up message")

    def add_heartbeat_callback(self, callback: Callable[[int], None]):
        """Add function to be called on heartbeat reception.

        :param callback:
            Function that should accept an NMT state as only argument.
        """
        self._callbacks.append(callback)

    def add_hearbeat_callback(self, callback: Callable[[int], None]):
        """Misspelled alias of :meth:`add_heartbeat_callback`.

        Kept so that existing callers using the original name keep working.

        :param callback:
            Function that should accept an NMT state as only argument.
        """
        self.add_heartbeat_callback(callback)

    def start_node_guarding(self, period: float):
        """Starts the node guarding mechanism.

        A guarding task that is already running is stopped first.

        :param period:
            Period (in seconds) at which the node guarding should be
            advertised to the slave node.
        """
        if self._node_guarding_producer is not None:
            self.stop_node_guarding()
        # NOTE(review): the trailing ``None, ..., True`` arguments are passed
        # through unchanged - presumably "no payload" and "remote frame";
        # confirm against the network's send_periodic signature.
        self._node_guarding_producer = self.network.send_periodic(
            0x700 + self.id, None, period, True)

    def stop_node_guarding(self):
        """Stops the node guarding mechanism."""
        if self._node_guarding_producer is not None:
            self._node_guarding_producer.stop()
            self._node_guarding_producer = None
class NmtSlave(NmtBase):
    """
    Keeps the local node's NMT state and runs its heartbeat service.
    """

    def __init__(self, node_id: int, local_node):
        super().__init__(node_id)
        self._local_node = local_node
        self._heartbeat_time_ms = 0
        # Periodic transmit task while the heartbeat is running, else None.
        self._send_task = None

    def on_command(self, can_id, data, timestamp):
        """Apply a received NMT command, then refresh the heartbeat payload."""
        super().on_command(can_id, data, timestamp)
        self.update_heartbeat()

    def send_command(self, code: int) -> None:
        """Send an NMT command code to the node.

        :param code:
            NMT command code.
        """
        state_before = self._state
        super().send_command(code)

        if self._state == 0:
            logger.info("Sending boot-up message")
            self.network.send_message(0x700 + self.id, [0])

        # The heartbeat service should start on the transition
        # between INITIALIZING and PRE-OPERATIONAL state
        entering_preop = state_before == 0 and self._state == 127
        if not entering_preop:
            self.update_heartbeat()
            return
        # Interval is taken from object 0x1017 of the local node.
        self.start_heartbeat(self._local_node.sdo[0x1017].raw)

    def on_write(self, index, data, **kwargs):
        """React to a write of object 0x1017; other indices are ignored.

        The written value is decoded as a little-endian unsigned 16-bit
        number: zero stops the heartbeat, anything else (re)starts it with
        that interval.
        """
        if index != 0x1017:
            return
        (interval_ms,) = struct.unpack_from("<H", data)
        if interval_ms:
            self.start_heartbeat(interval_ms)
        else:
            self.stop_heartbeat()

    def start_heartbeat(self, heartbeat_time_ms: int):
        """Start the heartbeat service.

        Any heartbeat that is already running is stopped first.

        :param heartbeat_time_ms:
            The heartbeat time in ms. If the heartbeat time is 0
            the heartbeating will not start.
        """
        self._heartbeat_time_ms = heartbeat_time_ms
        self.stop_heartbeat()

        if not heartbeat_time_ms > 0:
            return

        logger.info("Start the hearbeat timer, interval is %d ms",
                    self._heartbeat_time_ms)
        period_s = heartbeat_time_ms / 1000.0
        self._send_task = self.network.send_periodic(
            0x700 + self.id, [self._state], period_s)

    def stop_heartbeat(self):
        """Stop the heartbeat service, if it is running."""
        if self._send_task is None:
            return
        logger.info("Stop the heartbeat timer")
        self._send_task.stop()
        self._send_task = None

    def update_heartbeat(self):
        """Push the current state into the running heartbeat task, if any."""
        if self._send_task is None:
            return
        self._send_task.update([self._state])
class NmtError(Exception):
    """Some NMT operation failed.

    Raised by ``NmtMaster.wait_for_heartbeat`` and
    ``NmtMaster.wait_for_bootup`` when the awaited message does not arrive
    in time.
    """