# Source code for canopen.profiles.p402

# inspired by the NmtMaster code
import logging
import time

from canopen.node import RemoteNode
from canopen.sdo import SdoCommunicationError

logger = logging.getLogger(__name__)


class State402:
    """Lookup tables describing the DS402 power state machine.

    The class is a pure namespace: it holds the Controlword command codes,
    the Statusword decoding masks and the tables of allowed state
    transitions.  States are identified by upper-case strings such as
    ``'OPERATION ENABLED'``.
    """

    # Controlword (0x6040) commands
    CW_OPERATION_ENABLED = 0x000F
    CW_SHUTDOWN = 0x0006
    CW_SWITCH_ON = 0x0007
    CW_QUICK_STOP = 0x0002
    CW_DISABLE_VOLTAGE = 0x0000
    CW_SWITCH_ON_DISABLED = 0x0080

    # Command code -> name of the state the command leads to
    CW_CODE_COMMANDS = {
        CW_SWITCH_ON_DISABLED:          'SWITCH ON DISABLED',
        CW_DISABLE_VOLTAGE:             'DISABLE VOLTAGE',
        CW_SHUTDOWN:                    'READY TO SWITCH ON',
        CW_SWITCH_ON:                   'SWITCHED ON',
        CW_OPERATION_ENABLED:           'OPERATION ENABLED',
        CW_QUICK_STOP:                  'QUICK STOP ACTIVE',
    }

    # Reverse lookup of CW_CODE_COMMANDS (name -> command code), derived so
    # that the two tables cannot drift apart.
    CW_COMMANDS_CODE = {name: code for code, name in CW_CODE_COMMANDS.items()}

    # Statusword (0x6041) decoding: state name -> (bitmask, expected bits).
    # A state is active when ``statusword & bitmask == expected bits``.
    SW_MASK = {
        'NOT READY TO SWITCH ON':       (0x4F, 0x00),
        'SWITCH ON DISABLED':           (0x4F, 0x40),
        'READY TO SWITCH ON':           (0x6F, 0x21),
        'SWITCHED ON':                  (0x6F, 0x23),
        'OPERATION ENABLED':            (0x6F, 0x27),
        'FAULT':                        (0x4F, 0x08),
        'FAULT REACTION ACTIVE':        (0x4F, 0x0F),
        'QUICK STOP ACTIVE':            (0x6F, 0x07),
    }

    # Transition path to reach any state without a direct transition.
    # A key is either a single source state (a plain string) or a tuple of
    # source states sharing the same next hop.
    NEXTSTATE2ANY = {
        'START':                                                        'NOT READY TO SWITCH ON',
        ('FAULT', 'NOT READY TO SWITCH ON', 'QUICK STOP ACTIVE'):       'SWITCH ON DISABLED',
        'SWITCH ON DISABLED':                                           'READY TO SWITCH ON',
        'READY TO SWITCH ON':                                           'SWITCHED ON',
        'SWITCHED ON':                                                  'OPERATION ENABLED',
        'FAULT REACTION ACTIVE':                                        'FAULT',
    }

    # Transition table from the DS402 State Machine:
    # (current state, target state) -> Controlword command to send
    TRANSITIONTABLE = {
        # disable_voltage ---------------------------------------------------------------------
        ('READY TO SWITCH ON', 'SWITCH ON DISABLED'):     CW_DISABLE_VOLTAGE,  # transition 7
        ('OPERATION ENABLED', 'SWITCH ON DISABLED'):      CW_DISABLE_VOLTAGE,  # transition 9
        ('SWITCHED ON', 'SWITCH ON DISABLED'):            CW_DISABLE_VOLTAGE,  # transition 10
        ('QUICK STOP ACTIVE', 'SWITCH ON DISABLED'):      CW_DISABLE_VOLTAGE,  # transition 12
        # automatic ---------------------------------------------------------------------------
        ('NOT READY TO SWITCH ON', 'SWITCH ON DISABLED'): 0x00,  # transition 1
        ('START', 'NOT READY TO SWITCH ON'):              0x00,  # transition 0
        ('FAULT REACTION ACTIVE', 'FAULT'):               0x00,  # transition 14
        # shutdown ----------------------------------------------------------------------------
        ('SWITCH ON DISABLED', 'READY TO SWITCH ON'):     CW_SHUTDOWN,  # transition 2
        ('SWITCHED ON', 'READY TO SWITCH ON'):            CW_SHUTDOWN,  # transition 6
        ('OPERATION ENABLED', 'READY TO SWITCH ON'):      CW_SHUTDOWN,  # transition 8
        # switch_on ---------------------------------------------------------------------------
        ('READY TO SWITCH ON', 'SWITCHED ON'):            CW_SWITCH_ON,  # transition 3
        ('OPERATION ENABLED', 'SWITCHED ON'):             CW_SWITCH_ON,  # transition 5
        # enable_operation --------------------------------------------------------------------
        ('SWITCHED ON', 'OPERATION ENABLED'):             CW_OPERATION_ENABLED,  # transition 4
        ('QUICK STOP ACTIVE', 'OPERATION ENABLED'):       CW_OPERATION_ENABLED,  # transition 16
        # quickstop ---------------------------------------------------------------------------
        ('OPERATION ENABLED', 'QUICK STOP ACTIVE'):       CW_QUICK_STOP,  # transition 11
        # fault -------------------------------------------------------------------------------
        ('FAULT', 'SWITCH ON DISABLED'):                  CW_SWITCH_ON_DISABLED,  # transition 15
    }

    @staticmethod
    def next_state_indirect(_from):
        """Return the next state needed to reach any state indirectly.

        The chosen path always points toward the OPERATION ENABLED state, except when
        coming from QUICK STOP ACTIVE.  In that case, it will cycle through SWITCH ON
        DISABLED first, as there would have been a direct transition if the opposite was
        desired.

        :param str _from: State to start from.
        :return: Next state to switch to, or None if no indirect path is
            defined for the given state.
        :rtype: str
        """
        for cond, next_state in State402.NEXTSTATE2ANY.items():
            # String keys must be compared for equality: using ``in`` on a
            # string would be a substring test and match partial state names.
            sources = (cond,) if isinstance(cond, str) else cond
            if _from in sources:
                return next_state
        return None


class OperationMode:
    """Operation mode codes and their lookup tables.

    Modes are identified either by their numeric code (as written to object
    0x6060 and read from 0x6061 by ``BaseNode402``) or by an upper-case name.
    """

    NO_MODE = 0
    PROFILED_POSITION = 1
    VELOCITY = 2
    PROFILED_VELOCITY = 3
    PROFILED_TORQUE = 4
    HOMING = 6
    INTERPOLATED_POSITION = 7
    CYCLIC_SYNCHRONOUS_POSITION = 8
    CYCLIC_SYNCHRONOUS_VELOCITY = 9
    CYCLIC_SYNCHRONOUS_TORQUE = 10
    OPEN_LOOP_SCALAR_MODE = -1
    OPEN_LOOP_VECTOR_MODE = -2

    # Numeric mode code -> mode name.  Every code constant above has an
    # entry, otherwise decoding a reported mode raises KeyError.
    CODE2NAME = {
        NO_MODE:                        'NO MODE',
        PROFILED_POSITION:              'PROFILED POSITION',
        VELOCITY:                       'VELOCITY',
        PROFILED_VELOCITY:              'PROFILED VELOCITY',
        PROFILED_TORQUE:                'PROFILED TORQUE',
        HOMING:                         'HOMING',
        INTERPOLATED_POSITION:          'INTERPOLATED POSITION',
        CYCLIC_SYNCHRONOUS_POSITION:    'CYCLIC SYNCHRONOUS POSITION',
        CYCLIC_SYNCHRONOUS_VELOCITY:    'CYCLIC SYNCHRONOUS VELOCITY',
        CYCLIC_SYNCHRONOUS_TORQUE:      'CYCLIC SYNCHRONOUS TORQUE',
        OPEN_LOOP_SCALAR_MODE:          'OPEN LOOP SCALAR MODE',
        OPEN_LOOP_VECTOR_MODE:          'OPEN LOOP VECTOR MODE',
    }

    # Mode name -> numeric mode code; derived from CODE2NAME so that both
    # directions always stay in sync.
    NAME2CODE = {name: code for code, name in CODE2NAME.items()}

    # Mode name -> bit to test in the "supported modes" value (read from
    # object 0x6502 by ``BaseNode402.is_op_mode_supported``).
    SUPPORTED = {
        'NO MODE':                      0x0000,
        'PROFILED POSITION':            0x0001,
        'VELOCITY':                     0x0002,
        'PROFILED VELOCITY':            0x0004,
        'PROFILED TORQUE':              0x0008,
        'HOMING':                       0x0020,
        'INTERPOLATED POSITION':        0x0040,
        'CYCLIC SYNCHRONOUS POSITION':  0x0080,
        'CYCLIC SYNCHRONOUS VELOCITY':  0x0100,
        'CYCLIC SYNCHRONOUS TORQUE':    0x0200,
        # NOTE(review): the open loop modes use negative (vendor) codes; the
        # bits below are assumed to be the first two manufacturer-specific
        # bits of the supported-modes word -- confirm against the drive's
        # documentation.
        'OPEN LOOP SCALAR MODE':        0x10000,
        'OPEN LOOP VECTOR MODE':        0x20000,
    }


class Homing:
    """Constants used by the homing procedure of ``BaseNode402``."""

    # Extra Controlword bits used while in homing mode
    CW_START = 1 << 4
    CW_HALT = 1 << 8

    # Homing method codes.  Some methods come as a pair of codes, which is
    # expressed as a tuple.
    HM_ON_POSITIVE_FOLLOWING_ERROR = -8
    HM_ON_NEGATIVE_FOLLOWING_ERROR = -7
    HM_ON_POSITIVE_FOLLOWING_AND_INDEX_PULSE = -6
    HM_ON_NEGATIVE_FOLLOWING_AND_INDEX_PULSE = -5
    HM_ON_THE_POSITIVE_MECHANICAL_LIMIT = -4
    HM_ON_THE_NEGATIVE_MECHANICAL_LIMIT = -3
    HM_ON_THE_POSITIVE_MECHANICAL_LIMIT_AND_INDEX_PULSE = -2
    HM_ON_THE_NEGATIVE_MECHANICAL_LIMIT_AND_INDEX_PULSE = -1
    HM_NO_HOMING_OPERATION = 0
    HM_ON_THE_NEGATIVE_LIMIT_SWITCH_AND_INDEX_PULSE = 1
    HM_ON_THE_POSITIVE_LIMIT_SWITCH_AND_INDEX_PULSE = 2
    HM_ON_THE_POSITIVE_HOME_SWITCH_AND_INDEX_PULSE = (3, 4)
    HM_ON_THE_NEGATIVE_HOME_SWITCH_AND_INDEX_PULSE = (5, 6)
    HM_ON_THE_NEGATIVE_LIMIT_SWITCH = 17
    HM_ON_THE_POSITIVE_LIMIT_SWITCH = 18
    HM_ON_THE_POSITIVE_HOME_SWITCH = (19, 20)
    HM_ON_THE_NEGATIVE_HOME_SWITCH = (21, 22)
    HM_ON_NEGATIVE_INDEX_PULSE = 33
    HM_ON_POSITIVE_INDEX_PULSE = 34
    HM_ON_CURRENT_POSITION = 35

    # Homing status name -> (bitmask, expected bits) applied to the
    # Statusword.  All states share the mask 0x3400 (bits 10, 12 and 13) and
    # differ only in the expected bit pattern.
    STATES = {
        status: (0x3400, pattern)
        for status, pattern in (
            ('IN PROGRESS',                 0x0000),
            ('INTERRUPTED',                 0x0400),
            ('ATTAINED',                    0x1000),
            ('TARGET REACHED',              0x1400),
            ('ERROR VELOCITY IS NOT ZERO',  0x2000),
            ('ERROR VELOCITY IS ZERO',      0x2400),
        )
    }


class BaseNode402(RemoteNode):
    """A CANopen CiA 402 profile slave node.

    :param int node_id:
        Node ID (set to None or 0 if specified by object dictionary)
    :param object_dictionary:
        Object dictionary as either a path to a file, an ``ObjectDictionary``
        or a file like object.
    :type object_dictionary: :class:`str`, :class:`canopen.ObjectDictionary`
    """

    TIMEOUT_RESET_FAULT = 0.4           # seconds
    TIMEOUT_SWITCH_OP_MODE = 0.5        # seconds
    TIMEOUT_SWITCH_STATE_FINAL = 0.8    # seconds
    TIMEOUT_SWITCH_STATE_SINGLE = 0.4   # seconds
    TIMEOUT_CHECK_TPDO = 0.2            # seconds
    TIMEOUT_HOMING_DEFAULT = 30         # seconds

    def __init__(self, node_id, object_dictionary):
        super(BaseNode402, self).__init__(node_id, object_dictionary)
        self.tpdo_values = {}  # { index: value from last received TPDO }
        self.tpdo_pointers = {}  # { index: pdo.Map instance }
        self.rpdo_pointers = {}  # { index: pdo.Map instance }

    def setup_402_state_machine(self, read_pdos=True):
        """Configure the state machine by searching the PDO configuration.

        After setting up the PDOs, a warning is logged for each of the
        Controlword, Statusword and Operation Mode objects that is not mapped
        in any PDO.  Missing mappings are not an error: access then falls
        back to SDO.

        :param bool read_pdos: Upload current PDO configuration from node.
        """
        self.setup_pdos(read_pdos)
        self._check_controlword_configured()
        self._check_statusword_configured()
        self._check_op_mode_configured()

    def setup_pdos(self, upload=True):
        """Find the relevant PDO configuration to handle the state machine.

        :param bool upload:
            Retrieve up-to-date configuration via SDO.  If False, the node's
            mappings must already be configured in the object, matching the
            drive's settings.
        :raises AssertionError:
            When the node's NMT state disallows SDOs for reading the PDO
            configuration.
        """
        if upload:
            # The parentheses are essential: without them the second string
            # becomes the assertion message and the check degrades to a
            # substring test against 'PRE-OPERATIONAL'.
            assert self.nmt.state in ('PRE-OPERATIONAL', 'OPERATIONAL'), \
                'NMT state must be PRE-OPERATIONAL or OPERATIONAL to read PDOs'
            self.pdo.read()  # TPDO and RPDO configurations
        else:
            self.pdo.subscribe()  # Get notified on reception, usually a side-effect of read()
        self._init_tpdo_values()
        self._init_rpdo_pointers()

    def _init_tpdo_values(self):
        """Register the reception callback and index the enabled TPDOs."""
        for tpdo in self.tpdo.values():
            if tpdo.enabled:
                tpdo.add_callback(self.on_TPDOs_update_callback)
                for obj in tpdo:
                    logger.debug('Configured TPDO: %s', obj.index)
                    # Only the first TPDO mapping an index is remembered
                    if obj.index not in self.tpdo_values:
                        self.tpdo_values[obj.index] = 0
                        self.tpdo_pointers[obj.index] = obj

    def _init_rpdo_pointers(self):
        """Index the objects mapped in the enabled RPDOs."""
        # If RPDOs have overlapping indices, rpdo_pointers will point to
        # the first RPDO that has that index configured.
        for rpdo in self.rpdo.values():
            if rpdo.enabled:
                for obj in rpdo:
                    logger.debug('Configured RPDO: %s', obj.index)
                    if obj.index not in self.rpdo_pointers:
                        self.rpdo_pointers[obj.index] = obj

    def _check_controlword_configured(self):
        """Warn if the Controlword (0x6040) is not mapped in any RPDO."""
        if 0x6040 not in self.rpdo_pointers:  # Controlword
            logger.warning(
                "Controlword not configured in node %s's PDOs. "
                "Using SDOs can cause slow performance.", self.id)

    def _check_statusword_configured(self):
        """Warn if the Statusword (0x6041) is not mapped in any TPDO."""
        if 0x6041 not in self.tpdo_values:  # Statusword
            logger.warning(
                "Statusword not configured in node %s's PDOs. "
                "Using SDOs can cause slow performance.", self.id)

    def _check_op_mode_configured(self):
        """Warn if the operation mode objects are not mapped in any PDO."""
        if 0x6060 not in self.rpdo_pointers:  # Operation Mode
            logger.warning(
                "Operation Mode not configured in node %s's PDOs. "
                "Using SDOs can cause slow performance.", self.id)
        if 0x6061 not in self.tpdo_values:  # Operation Mode Display
            logger.warning(
                "Operation Mode Display not configured in node %s's PDOs. "
                "Using SDOs can cause slow performance.", self.id)

    def reset_from_fault(self):
        """Reset node from fault and set it to Operation Enable state."""
        if self.state == 'FAULT':
            # Resets the Fault Reset bit (rising edge 0 -> 1)
            self.controlword = State402.CW_DISABLE_VOLTAGE
            # FIXME! The rising edge happens with the transitions toward OPERATION
            # ENABLED below, but until then the loop will always reach the timeout!
            timeout = time.monotonic() + self.TIMEOUT_RESET_FAULT
            while self.is_faulted():
                if time.monotonic() > timeout:
                    break
                self.check_statusword()
            self.state = 'OPERATION ENABLED'

    def is_faulted(self):
        """Tell whether the last known Statusword indicates the FAULT state.

        :rtype: bool
        """
        bitmask, bits = State402.SW_MASK['FAULT']
        return self.statusword & bitmask == bits

    def _homing_status(self):
        """Interpret the current Statusword bits as homing state string."""
        # Wait to make sure a TPDO was received
        self.check_statusword()
        status = None
        for key, value in Homing.STATES.items():
            bitmask, bits = value
            if self.statusword & bitmask == bits:
                status = key
        return status

    def is_homed(self, restore_op_mode=False):
        """Switch to homing mode and determine its status.

        :param bool restore_op_mode:
            Switch back to the previous operation mode when done.
        :return: If the status indicates successful homing.
        :rtype: bool
        """
        previous_op_mode = self.op_mode
        if previous_op_mode != 'HOMING':
            logger.info('Switch to HOMING from %s', previous_op_mode)
            self.op_mode = 'HOMING'  # blocks until confirmed
        homingstatus = self._homing_status()
        if restore_op_mode:
            self.op_mode = previous_op_mode
        return homingstatus in ('TARGET REACHED', 'ATTAINED')

    def homing(self, timeout=None, restore_op_mode=False):
        """Execute the configured Homing method on the node.

        Failures during the homing loop (error status, timeout) are logged
        and reported through the return value instead of being raised.

        :param int timeout: Timeout value (default: 30, zero to disable).
        :param bool restore_op_mode:
            Switch back to the previous operation mode after homing
            (default: no).
        :return: If the homing was complete with success.
        :rtype: bool
        """
        if timeout is None:
            timeout = self.TIMEOUT_HOMING_DEFAULT
        if restore_op_mode:
            previous_op_mode = self.op_mode
        self.op_mode = 'HOMING'
        # The homing process will initialize at operation enabled
        self.state = 'OPERATION ENABLED'
        homingstatus = 'UNKNOWN'
        self.controlword = State402.CW_OPERATION_ENABLED | Homing.CW_START  # does not block
        # Wait for one extra cycle, to make sure the controlword was received
        self.check_statusword()
        t = time.monotonic() + timeout
        try:
            while homingstatus not in ('TARGET REACHED', 'ATTAINED'):
                homingstatus = self._homing_status()
                if homingstatus in ('INTERRUPTED', 'ERROR VELOCITY IS NOT ZERO',
                                    'ERROR VELOCITY IS ZERO'):
                    raise RuntimeError('Unable to home. Reason: {0}'.format(homingstatus))
                # A timeout of zero disables the check
                if timeout and time.monotonic() > t:
                    raise RuntimeError('Unable to home, timeout reached')
            logger.info('Homing mode carried out successfully.')
            return True
        except RuntimeError as e:
            logger.info('%s', e)
        finally:
            if restore_op_mode:
                self.op_mode = previous_op_mode
        return False

    @property
    def op_mode(self):
        """The node's Operation Mode stored in the object 0x6061.

        Uses SDO or PDO to access the current value.  The modes are passed as
        one of the following strings:

        - 'NO MODE'
        - 'PROFILED POSITION'
        - 'VELOCITY'
        - 'PROFILED VELOCITY'
        - 'PROFILED TORQUE'
        - 'HOMING'
        - 'INTERPOLATED POSITION'
        - 'CYCLIC SYNCHRONOUS POSITION'
        - 'CYCLIC SYNCHRONOUS VELOCITY'
        - 'CYCLIC SYNCHRONOUS TORQUE'
        - 'OPEN LOOP SCALAR MODE'
        - 'OPEN LOOP VECTOR MODE'

        When setting a mode, the new value is written to object 0x6060 and
        the call blocks until the node reports the new mode or the configured
        timeout expires.  A timeout or an SDO communication error while
        setting is logged as a warning, not raised.

        :raises TypeError:
            When setting a mode not advertised as supported by the node.
        :raises RuntimeError:
            When reading, if the periodic TPDO carrying the mode is not
            received within the configured timeout.
        """
        try:
            pdo = self.tpdo_pointers[0x6061].pdo_parent
            if pdo.is_periodic:
                timestamp = pdo.wait_for_reception(timeout=self.TIMEOUT_CHECK_TPDO)
                if timestamp is None:
                    raise RuntimeError("Timeout getting node {0}'s mode of operation.".format(
                        self.id))
            code = self.tpdo_values[0x6061]
        except KeyError:
            logger.warning('The object 0x6061 is not a configured TPDO, fallback to SDO')
            code = self.sdo[0x6061].raw
        return OperationMode.CODE2NAME[code]

    @op_mode.setter
    def op_mode(self, mode):
        try:
            if not self.is_op_mode_supported(mode):
                raise TypeError(
                    'Operation mode {m} not suppported on node {n}.'.format(n=self.id, m=mode))
            # Update operation mode in RPDO if possible, fall back to SDO
            if 0x6060 in self.rpdo_pointers:
                self.rpdo_pointers[0x6060].raw = OperationMode.NAME2CODE[mode]
                pdo = self.rpdo_pointers[0x6060].pdo_parent
                if not pdo.is_periodic:
                    pdo.transmit()
            else:
                self.sdo[0x6060].raw = OperationMode.NAME2CODE[mode]
            # Poll the reported mode until the switch is confirmed
            timeout = time.monotonic() + self.TIMEOUT_SWITCH_OP_MODE
            while self.op_mode != mode:
                if time.monotonic() > timeout:
                    raise RuntimeError(
                        "Timeout setting node {0}'s new mode of operation to {1}.".format(
                            self.id, mode))
            logger.info('Set node %s operation mode to %s.', self.id, mode)
        except SdoCommunicationError as e:
            logger.warning('[SDO communication error] Cause: %s', e)
        except (RuntimeError, ValueError) as e:
            logger.warning('%s', e)

    def _clear_target_values(self):
        """Write zero to each target value object known to the SDO client."""
        # [target velocity, target position, target torque]
        for target_index in [0x60FF, 0x607A, 0x6071]:
            if target_index in self.sdo.keys():
                self.sdo[target_index].raw = 0

    def is_op_mode_supported(self, mode):
        """Check if the operation mode is supported by the node.

        The object listing the supported modes is retrieved once using SDO,
        then cached for later checks.

        :param str mode: Same format as the :attr:`op_mode` property.
        :return: If the operation mode is supported.
        :rtype: bool
        """
        if not hasattr(self, '_op_mode_support'):
            # Cache value only on first lookup, this object should never change.
            self._op_mode_support = self.sdo[0x6502].raw
            logger.info('Caching node %s supported operation modes 0x%04X',
                        self.id, self._op_mode_support)
        bits = OperationMode.SUPPORTED[mode]
        return self._op_mode_support & bits == bits

    def on_TPDOs_update_callback(self, mapobject):
        """Cache updated values from a TPDO received from this node.

        :param mapobject: The received PDO message.
        :type mapobject: canopen.pdo.Map
        """
        for obj in mapobject:
            self.tpdo_values[obj.index] = obj.raw

    @property
    def statusword(self):
        """Return the last read value of the Statusword (0x6041) from the device.

        If the object 0x6041 is not configured in any TPDO it will fall back
        to the SDO mechanism and try to get the value.
        """
        try:
            return self.tpdo_values[0x6041]
        except KeyError:
            logger.warning('The object 0x6041 is not a configured TPDO, fallback to SDO')
            return self.sdo[0x6041].raw

    def check_statusword(self, timeout=None):
        """Report an up-to-date reading of the Statusword (0x6041) from the device.

        If the TPDO with the Statusword is configured as periodic, this
        method blocks until one was received.  If the Statusword is mapped in
        a non-periodic TPDO, the cached value is returned.  If it is not
        mapped at all, the value is read using SDO.

        :param timeout:
            Maximum time in seconds to wait for TPDO reception.  None or zero
            selects the default ``TIMEOUT_CHECK_TPDO``.
        :raises RuntimeError:
            Occurs when the given timeout expires without a TPDO.
        :return: Updated value of the ``statusword`` property.
        :rtype: int
        """
        if 0x6041 in self.tpdo_pointers:
            pdo = self.tpdo_pointers[0x6041].pdo_parent
            if pdo.is_periodic:
                timestamp = pdo.wait_for_reception(timeout or self.TIMEOUT_CHECK_TPDO)
                if timestamp is None:
                    raise RuntimeError('Timeout waiting for updated statusword')
        else:
            return self.sdo[0x6041].raw
        return self.statusword

    @property
    def controlword(self):
        """Send a state change command using PDO or SDO.

        :param int value: Controlword value to set.
        :raises RuntimeError: Read access to the controlword is not intended.
        """
        raise RuntimeError('The Controlword is write-only.')

    @controlword.setter
    def controlword(self, value):
        if 0x6040 in self.rpdo_pointers:
            self.rpdo_pointers[0x6040].raw = value
            pdo = self.rpdo_pointers[0x6040].pdo_parent
            # A periodic RPDO is left untouched here; only non-periodic
            # ones are transmitted explicitly.
            if not pdo.is_periodic:
                pdo.transmit()
        else:
            self.sdo[0x6040].raw = value

    @property
    def state(self):
        """Manipulate current state of the DS402 State Machine on the node.

        Uses the last received Statusword value for read access, and
        manipulates the :attr:`controlword` for changing states.  The states
        are passed as one of the following strings:

        - 'NOT READY TO SWITCH ON' (cannot be switched to deliberately)
        - 'SWITCH ON DISABLED'
        - 'READY TO SWITCH ON'
        - 'SWITCHED ON'
        - 'OPERATION ENABLED'
        - 'FAULT' (cannot be switched to deliberately)
        - 'FAULT REACTION ACTIVE' (cannot be switched to deliberately)
        - 'QUICK STOP ACTIVE'
        - 'DISABLE VOLTAGE' (only as a command when writing)

        Reading returns 'UNKNOWN' if the Statusword matches none of the
        known states.

        :raises RuntimeError:
            If the switch is not confirmed within the configured timeout.
        :raises ValueError:
            Trying to execute a illegal transition in the state machine.
        """
        for state, mask_val_pair in State402.SW_MASK.items():
            bitmask, bits = mask_val_pair
            if self.statusword & bitmask == bits:
                return state
        return 'UNKNOWN'

    @state.setter
    def state(self, target_state):
        timeout = time.monotonic() + self.TIMEOUT_SWITCH_STATE_FINAL
        while self.state != target_state:
            next_state = self._next_state(target_state)
            if self._change_state(next_state):
                # One hop succeeded, immediately try the next one
                continue
            if time.monotonic() > timeout:
                raise RuntimeError('Timeout when trying to change state')
            self.check_statusword()

    def _next_state(self, target_state):
        """Choose the next hop on the way from the current to the target state.

        :param str target_state: State to finally reach.
        :return:
            The target itself if a direct transition exists, otherwise the
            result of :meth:`State402.next_state_indirect` for the current
            state.
        :raises ValueError: If the target cannot be entered deliberately.
        """
        if target_state in ('NOT READY TO SWITCH ON',
                            'FAULT REACTION ACTIVE',
                            'FAULT'):
            raise ValueError(
                'Target state {} cannot be entered programmatically'.format(target_state))
        from_state = self.state
        if (from_state, target_state) in State402.TRANSITIONTABLE:
            return target_state
        else:
            return State402.next_state_indirect(from_state)

    def _change_state(self, target_state):
        """Command a single transition and wait for its confirmation.

        :param str target_state: State directly reachable from the current one.
        :return:
            True once the target state is reported, False if that did not
            happen within ``TIMEOUT_SWITCH_STATE_SINGLE``.
        :rtype: bool
        :raises ValueError:
            If there is no direct transition from the current state.
        """
        from_state = self.state
        # Only the table lookup is guarded, so that a KeyError raised while
        # writing the Controlword is not misreported as an illegal transition.
        try:
            command = State402.TRANSITIONTABLE[(from_state, target_state)]
        except KeyError:
            raise ValueError(
                'Illegal state transition from {f} to {t}'.format(f=from_state, t=target_state))
        self.controlword = command
        timeout = time.monotonic() + self.TIMEOUT_SWITCH_STATE_SINGLE
        while self.state != target_state:
            if time.monotonic() > timeout:
                return False
            self.check_statusword()
        return True