Source code for canopen.sdo.client

import io
import logging
import queue
import struct
import time

from can import CanError

from canopen import objectdictionary
from canopen.sdo.base import SdoBase
from canopen.sdo.constants import *
from canopen.sdo.exceptions import *
from canopen.utils import pretty_index


logger = logging.getLogger(__name__)


[docs] class SdoClient(SdoBase): """Handles communication with an SDO server.""" #: Max time in seconds to wait for response from server RESPONSE_TIMEOUT = 0.3 #: Max number of request retries before raising error MAX_RETRIES = 1 #: Seconds to wait before sending a request, for rate limiting PAUSE_BEFORE_SEND = 0.0 #: Seconds to wait before retrying a request after a send error RETRY_DELAY = 0.1 def __init__(self, rx_cobid, tx_cobid, od): """ :param int rx_cobid: COB-ID that the server receives on (usually 0x600 + node ID) :param int tx_cobid: COB-ID that the server responds with (usually 0x580 + node ID) :param canopen.ObjectDictionary od: Object Dictionary to use for communication """ SdoBase.__init__(self, rx_cobid, tx_cobid, od) self.responses = queue.Queue() def on_response(self, can_id, data, timestamp): self.responses.put(bytes(data)) def send_request(self, request): retries_left = self.MAX_RETRIES if self.PAUSE_BEFORE_SEND: time.sleep(self.PAUSE_BEFORE_SEND) while True: try: self.network.send_message(self.rx_cobid, request) except CanError as e: # Could be a buffer overflow. Wait some time before trying again retries_left -= 1 if not retries_left: raise logger.info(str(e)) if self.RETRY_DELAY: time.sleep(self.RETRY_DELAY) else: break
[docs] def read_response(self): """Wait for an SDO response and handle timeout or remote abort. :raises canopen.SdoAbortedError: When receiving an SDO abort response from the server. :raises canopen.SdoCommunicationError: After timeout with no response received. """ try: response = self.responses.get( block=True, timeout=self.RESPONSE_TIMEOUT) except queue.Empty: raise SdoCommunicationError("No SDO response received") res_command, = struct.unpack_from("B", response) if res_command == RESPONSE_ABORTED: abort_code, = struct.unpack_from("<L", response, 4) raise SdoAbortedError(abort_code) return response
def request_response(self, sdo_request): retries_left = self.MAX_RETRIES if not self.responses.empty(): # logger.warning("There were unexpected messages in the queue") self.responses = queue.Queue() while True: self.send_request(sdo_request) # Wait for node to respond try: return self.read_response() except SdoCommunicationError as e: retries_left -= 1 if not retries_left: self.abort(ABORT_TIMED_OUT) raise logger.warning(str(e))
[docs] def abort(self, abort_code=ABORT_GENERAL_ERROR): """Abort current transfer.""" request = bytearray(8) request[0] = REQUEST_ABORTED # TODO: Is it necessary to include index and subindex? struct.pack_into("<L", request, 4, abort_code) self.send_request(request) logger.error("Transfer aborted by client with code 0x%08X", abort_code)
[docs] def upload(self, index: int, subindex: int) -> bytes: """May be called to make a read operation without an Object Dictionary. No validation against the Object Dictionary is performed, even if an object description would be available. The length of the returned data depends only on the transferred amount, possibly truncated to the size indicated by the server. :param index: Index of object to read. :param subindex: Sub-index of object to read. :return: A data object. :raises canopen.SdoCommunicationError: On unexpected response or timeout. :raises canopen.SdoAbortedError: When node responds with an error. """ with self.open(index, subindex, buffering=0) as fp: response_size = fp.size data = fp.read() if response_size and response_size < len(data): data = data[:response_size] return data
[docs] def download( self, index: int, subindex: int, data: bytes, force_segment: bool = False, ) -> None: """May be called to make a write operation without an Object Dictionary. :param index: Index of object to write. :param subindex: Sub-index of object to write. :param data: Data to be written. :param force_segment: Force use of segmented transfer regardless of data size. :raises canopen.SdoCommunicationError: On unexpected response or timeout. :raises canopen.SdoAbortedError: When node responds with an error. """ with self.open(index, subindex, "wb", buffering=7, size=len(data), force_segment=force_segment) as fp: fp.write(data)
[docs] def open(self, index, subindex=0, mode="rb", encoding="ascii", buffering=1024, size=None, block_transfer=False, force_segment=False, request_crc_support=True): """Open the data stream as a file like object. :param int index: Index of object to open. :param int subindex: Sub-index of object to open. :param str mode: ========= ========================================================== Character Meaning --------- ---------------------------------------------------------- 'r' open for reading (default) 'w' open for writing 'b' binary mode (default) 't' text mode ========= ========================================================== :param str encoding: The str name of the encoding used to decode or encode the file. This will only be used in text mode. :param int buffering: An optional integer used to set the buffering policy. Pass 0 to switch buffering off (only allowed in binary mode), 1 to select line buffering (only usable in text mode), and an integer > 1 to indicate the size in bytes of a fixed-size chunk buffer. :param int size: Size of data to that will be transmitted. :param bool block_transfer: If block transfer should be used. :param bool force_segment: Force use of segmented download regardless of data size. :param bool request_crc_support: If crc calculation should be requested when using block transfer :returns: A file like object. """ buffer_size = buffering if buffering > 1 else io.DEFAULT_BUFFER_SIZE if "r" in mode: if block_transfer: raw_stream = BlockUploadStream(self, index, subindex, request_crc_support=request_crc_support) else: raw_stream = ReadableStream(self, index, subindex) if buffering: buffered_stream = io.BufferedReader(raw_stream, buffer_size=buffer_size) else: return raw_stream if "w" in mode: if block_transfer: raw_stream = BlockDownloadStream(self, index, subindex, size, request_crc_support=request_crc_support) else: raw_stream = WritableStream(self, index, subindex, size, force_segment) if buffering: buffered_stream = io.BufferedWriter(raw_stream, buffer_size=buffer_size) else: return raw_stream if "b" not in mode: # Text mode line_buffering = buffering == 1 return io.TextIOWrapper(buffered_stream, encoding, line_buffering=line_buffering) return buffered_stream
class ReadableStream(io.RawIOBase): """File like object for reading from a variable.""" #: Total size of data or ``None`` if not specified size = None def __init__(self, sdo_client, index, subindex=0): """ :param canopen.sdo.SdoClient sdo_client: The SDO client to use for reading. :param int index: Object dictionary index to read from. :param int subindex: Object dictionary sub-index to read from. """ self._done = False self.sdo_client = sdo_client self._toggle = 0 self.pos = 0 logger.debug("Reading 0x%04X:%02X from node %d", index, subindex, sdo_client.rx_cobid - 0x600) request = bytearray(8) SDO_STRUCT.pack_into(request, 0, REQUEST_UPLOAD, index, subindex) response = sdo_client.request_response(request) res_command, res_index, res_subindex = SDO_STRUCT.unpack_from(response) res_data = response[4:8] if res_command & 0xE0 != RESPONSE_UPLOAD: raise SdoCommunicationError(f"Unexpected response 0x{res_command:02X}") # Check that the message is for us if res_index != index or res_subindex != subindex: raise SdoCommunicationError( f"Node returned a value for {pretty_index(res_index, res_subindex)} instead, " "maybe there is another SDO client communicating " "on the same SDO channel?") self.exp_data = None if res_command & EXPEDITED: # Expedited upload if res_command & SIZE_SPECIFIED: self.size = 4 - ((res_command >> 2) & 0x3) self.exp_data = res_data[:self.size] else: self.exp_data = res_data self.pos += len(self.exp_data) elif res_command & SIZE_SPECIFIED: self.size, = struct.unpack("<L", res_data) logger.debug("Using segmented transfer of %d bytes", self.size) else: logger.debug("Using segmented transfer") def read(self, size=-1): """Read one segment which may be up to 7 bytes. :param int size: If size is -1, all data will be returned. Other values are ignored. :returns: 1 - 7 bytes of data or no bytes if EOF. :rtype: bytes """ if self._done: return b"" if self.exp_data is not None: self._done = True return self.exp_data if size is None or size < 0: return self.readall() command = REQUEST_SEGMENT_UPLOAD command |= self._toggle request = bytearray(8) request[0] = command response = self.sdo_client.request_response(request) res_command, = struct.unpack_from("B", response) if res_command & 0xE0 != RESPONSE_SEGMENT_UPLOAD: self.sdo_client.abort(ABORT_INVALID_COMMAND_SPECIFIER) raise SdoCommunicationError(f"Unexpected response 0x{res_command:02X}") if res_command & TOGGLE_BIT != self._toggle: self.sdo_client.abort(ABORT_TOGGLE_NOT_ALTERNATED) raise SdoCommunicationError("Toggle bit mismatch") length = 7 - ((res_command >> 1) & 0x7) if res_command & NO_MORE_DATA: self._done = True self._toggle ^= TOGGLE_BIT self.pos += length return response[1:length + 1] def readinto(self, b): """ Read bytes into a pre-allocated, writable bytes-like object b, and return the number of bytes read. """ data = self.read(7) b[:len(data)] = data return len(data) def readable(self): return True def tell(self): return self.pos class WritableStream(io.RawIOBase): """File like object for writing to a variable.""" def __init__(self, sdo_client, index, subindex=0, size=None, force_segment=False): """ :param canopen.sdo.SdoClient sdo_client: The SDO client to use for communication. :param int index: Object dictionary index to read from. :param int subindex: Object dictionary sub-index to read from. :param int size: Size of data in number of bytes if known in advance. :param bool force_segment: Force use of segmented transfer regardless of size. """ self.sdo_client = sdo_client self.size = size self.pos = 0 self._toggle = 0 self._exp_header = None self._done = False if size is None or size < 1 or size > 4 or force_segment: # Initiate segmented download request = bytearray(8) command = REQUEST_DOWNLOAD if size is not None: command |= SIZE_SPECIFIED struct.pack_into("<L", request, 4, size) SDO_STRUCT.pack_into(request, 0, command, index, subindex) response = sdo_client.request_response(request) res_command, = struct.unpack_from("B", response) if res_command != RESPONSE_DOWNLOAD: self.sdo_client.abort(ABORT_INVALID_COMMAND_SPECIFIER) raise SdoCommunicationError( f"Unexpected response 0x{res_command:02X}") else: # Expedited download # Prepare header (first 4 bytes in CAN message) command = REQUEST_DOWNLOAD | EXPEDITED | SIZE_SPECIFIED command |= (4 - size) << 2 self._exp_header = SDO_STRUCT.pack(command, index, subindex) def write(self, b): """ Write the given bytes-like object, b, to the SDO server, and return the number of bytes written. This will be at most 7 bytes. """ if self._done: raise RuntimeError("All expected data has already been transmitted") if self._exp_header is not None: # Expedited download if len(b) < self.size: # Not enough data provided return 0 if len(b) > 4: raise AssertionError("More data received than expected") data = b.tobytes() if isinstance(b, memoryview) else b request = self._exp_header + data.ljust(4, b"\x00") response = self.sdo_client.request_response(request) res_command, = struct.unpack_from("B", response) if res_command & 0xE0 != RESPONSE_DOWNLOAD: self.sdo_client.abort(ABORT_INVALID_COMMAND_SPECIFIER) raise SdoCommunicationError( f"Unexpected response 0x{res_command:02X}") bytes_sent = len(b) self._done = True else: # Segmented download request = bytearray(8) command = REQUEST_SEGMENT_DOWNLOAD # Add toggle bit command |= self._toggle self._toggle ^= TOGGLE_BIT # Can send up to 7 bytes at a time bytes_sent = min(len(b), 7) if self.size is not None and self.pos + bytes_sent >= self.size: # No more data after this message command |= NO_MORE_DATA self._done = True # Specify number of bytes that do not contain segment data command |= (7 - bytes_sent) << 1 request[0] = command request[1:bytes_sent + 1] = b[0:bytes_sent] response = self.sdo_client.request_response(request) res_command, = struct.unpack("B", response[0:1]) if res_command & 0xE0 != RESPONSE_SEGMENT_DOWNLOAD: self.sdo_client.abort(ABORT_INVALID_COMMAND_SPECIFIER) raise SdoCommunicationError( f"Unexpected response 0x{res_command:02X} " f"(expected 0x{RESPONSE_SEGMENT_DOWNLOAD:02X})") # Advance position self.pos += bytes_sent return bytes_sent def close(self): """Closes the stream. An empty segmented SDO message may be sent saying there is no more data. """ super(WritableStream, self).close() if not self._done and not self._exp_header: # Segmented download not finished command = REQUEST_SEGMENT_DOWNLOAD | NO_MORE_DATA command |= self._toggle # No data in this message command |= 7 << 1 request = bytearray(8) request[0] = command self.sdo_client.request_response(request) self._done = True def writable(self): return True def tell(self): return self.pos class BlockUploadStream(io.RawIOBase): """File like object for reading from a variable using block upload.""" #: Total size of data or ``None`` if not specified size = None blksize = 127 crc_supported = False def __init__(self, sdo_client, index, subindex=0, request_crc_support=True): """ :param canopen.sdo.SdoClient sdo_client: The SDO client to use for reading. :param int index: Object dictionary index to read from. :param int subindex: Object dictionary sub-index to read from. :param bool request_crc_support: If crc calculation should be requested when using block transfer """ self._done = False self.sdo_client = sdo_client self.pos = 0 self._crc = sdo_client.crc_cls() self._server_crc = None self._ackseq = 0 self._error = False logger.debug("Reading 0x%04X:%02X from node %d", index, subindex, sdo_client.rx_cobid - 0x600) # Initiate Block Upload request = bytearray(8) command = REQUEST_BLOCK_UPLOAD | INITIATE_BLOCK_TRANSFER if request_crc_support: command |= CRC_SUPPORTED struct.pack_into("<BHBBB", request, 0, command, index, subindex, self.blksize, 0) response = sdo_client.request_response(request) res_command, res_index, res_subindex = SDO_STRUCT.unpack_from(response) if res_command & 0xE0 != RESPONSE_BLOCK_UPLOAD: self._error = True self.sdo_client.abort(ABORT_INVALID_COMMAND_SPECIFIER) raise SdoCommunicationError(f"Unexpected response 0x{res_command:02X}") # Check that the message is for us if res_index != index or res_subindex != subindex: self._error = True raise SdoCommunicationError( f"Node returned a value for {pretty_index(res_index, res_subindex)} instead, " "maybe there is another SDO client communicating " "on the same SDO channel?") if res_command & BLOCK_SIZE_SPECIFIED: self.size, = struct.unpack_from("<L", response, 4) logger.debug("Size is %d bytes", self.size) self.crc_supported = bool(res_command & CRC_SUPPORTED) # Start upload request = bytearray(8) request[0] = REQUEST_BLOCK_UPLOAD | START_BLOCK_UPLOAD sdo_client.send_request(request) def read(self, size=-1): """Read one segment which may be up to 7 bytes. :param int size: If size is -1, all data will be returned. Other values are ignored. :returns: 1 - 7 bytes of data or no bytes if EOF. :rtype: bytes """ if self._done: return b"" if size is None or size < 0: return self.readall() try: response = self.sdo_client.read_response() except SdoCommunicationError: response = self._retransmit() res_command, = struct.unpack_from("B", response) seqno = res_command & 0x7F if seqno == self._ackseq + 1: self._ackseq = seqno else: # Wrong sequence number response = self._retransmit() res_command, = struct.unpack_from("B", response) if self._ackseq >= self.blksize or res_command & NO_MORE_BLOCKS: self._ack_block() if res_command & NO_MORE_BLOCKS: n = self._end_upload() data = response[1:8 - n] self._done = True else: data = response[1:8] if self.crc_supported: self._crc.process(data) if self._done: if self._server_crc != self._crc.final(): self._error = True self.sdo_client.abort(ABORT_CRC_ERROR) raise SdoCommunicationError("CRC is not OK") logger.info("CRC is OK") self.pos += len(data) return data def _retransmit(self): logger.info("Only %d sequences were received. Requesting retransmission", self._ackseq) end_time = time.time() + self.sdo_client.RESPONSE_TIMEOUT self._ack_block() while time.time() < end_time: response = self.sdo_client.read_response() res_command, = struct.unpack_from("B", response) seqno = res_command & 0x7F if seqno == self._ackseq + 1: # We should be back in sync self._ackseq = seqno return response self._error = True self.sdo_client.abort(ABORT_TIMED_OUT) raise SdoCommunicationError("Some data was lost and could not be retransmitted") def _ack_block(self): request = bytearray(8) request[0] = REQUEST_BLOCK_UPLOAD | BLOCK_TRANSFER_RESPONSE request[1] = self._ackseq request[2] = self.blksize self.sdo_client.send_request(request) self._ackseq = 0 def _end_upload(self): try: response = self.sdo_client.read_response() except SdoCommunicationError: self.abort(ABORT_TIMED_OUT) raise res_command, self._server_crc = struct.unpack_from("<BH", response) if res_command & 0xE0 != RESPONSE_BLOCK_UPLOAD: self._error = True self.sdo_client.abort(ABORT_INVALID_COMMAND_SPECIFIER) raise SdoCommunicationError(f"Unexpected response 0x{res_command:02X}") if res_command & 0x3 != END_BLOCK_TRANSFER: self._error = True self.sdo_client.abort(ABORT_INVALID_COMMAND_SPECIFIER) raise SdoCommunicationError("Server did not end transfer as expected") # Return number of bytes not used in last message return (res_command >> 2) & 0x7 def close(self): if self.closed: return super(BlockUploadStream, self).close() if self._done and not self._error: request = bytearray(8) request[0] = REQUEST_BLOCK_UPLOAD | END_BLOCK_TRANSFER self.sdo_client.send_request(request) def tell(self): return self.pos def readinto(self, b): """ Read bytes into a pre-allocated, writable bytes-like object b, and return the number of bytes read. """ data = self.read(7) b[:len(data)] = data return len(data) def readable(self): return True class BlockDownloadStream(io.RawIOBase): """File like object for block download.""" def __init__(self, sdo_client, index, subindex=0, size=None, request_crc_support=True): """ :param canopen.sdo.SdoClient sdo_client: The SDO client to use for communication. :param int index: Object dictionary index to read from. :param int subindex: Object dictionary sub-index to read from. :param int size: Size of data in number of bytes if known in advance. :param bool request_crc_support: If crc calculation should be requested when using block transfer """ self.sdo_client = sdo_client self.size = size self.pos = 0 self._done = False self._seqno = 0 self._crc = sdo_client.crc_cls() self._last_bytes_sent = 0 self._current_block = [] self._retransmitting = False command = REQUEST_BLOCK_DOWNLOAD | INITIATE_BLOCK_TRANSFER if request_crc_support: command |= CRC_SUPPORTED request = bytearray(8) logger.info("Initiating block download for 0x%04X:%02X", index, subindex) if size is not None: logger.debug("Expected size of data is %d bytes", size) command |= BLOCK_SIZE_SPECIFIED struct.pack_into("<L", request, 4, size) else: logger.warning("Data size has not been specified") SDO_STRUCT.pack_into(request, 0, command, index, subindex) response = sdo_client.request_response(request) res_command, res_index, res_subindex = SDO_STRUCT.unpack_from(response) if res_command & 0xE0 != RESPONSE_BLOCK_DOWNLOAD: self.sdo_client.abort(ABORT_INVALID_COMMAND_SPECIFIER) raise SdoCommunicationError( f"Unexpected response 0x{res_command:02X}") # Check that the message is for us if res_index != index or res_subindex != subindex: self.sdo_client.abort() raise SdoCommunicationError( f"Node returned a value for {pretty_index(res_index, res_subindex)} instead, " "maybe there is another SDO client communicating " "on the same SDO channel?") self._blksize, = struct.unpack_from("B", response, 4) logger.debug("Server requested a block size of %d", self._blksize) self.crc_supported = bool(res_command & CRC_SUPPORTED) # Run this last, used later to determine if initialization was successful self._initialized = True def write(self, b): """ Write the given bytes-like object, b, to the SDO server, and return the number of bytes written. This will be at most 7 bytes. :param bytes b: Data to be transmitted. :returns: Number of bytes successfully sent or ``None`` if length of data is less than 7 bytes and the total size has not been reached yet. """ if self._done: raise RuntimeError("All expected data has already been transmitted") # Can send up to 7 bytes at a time data = b[0:7] if self.size is not None and self.pos + len(data) >= self.size: # This is the last data to be transmitted based on expected size self.send(data, end=True) elif len(data) < 7: # We can't send less than 7 bytes in the middle of a transmission return None else: self.send(data) return len(data) def send(self, b, end=False): """Send up to 7 bytes of data. :param bytes b: 0 - 7 bytes of data to transmit. :param bool end: If this is the last data. """ assert len(b) <= 7, "Max 7 bytes can be sent" if not end: assert len(b) == 7, "Less than 7 bytes only allowed if last data" self._seqno += 1 command = self._seqno if end: command |= NO_MORE_BLOCKS self._done = True # Change expected ACK:ed sequence self._blksize = self._seqno # Save how many bytes this message contains since this is the last self._last_bytes_sent = len(b) request = bytearray(8) request[0] = command request[1:len(b) + 1] = b self.sdo_client.send_request(request) self.pos += len(b) # Add the sent data to the current block buffer self._current_block.append(b) # Don't calculate crc if retransmitting if self.crc_supported and not self._retransmitting: # Calculate CRC self._crc.process(b) if self._seqno >= self._blksize: # End of this block, wait for ACK self._block_ack() def tell(self): return self.pos def _block_ack(self): logger.debug("Waiting for acknowledgement of last block...") try: response = self.sdo_client.read_response() except SdoCommunicationError: self.sdo_client.abort(ABORT_TIMED_OUT) raise res_command, ackseq, blksize = struct.unpack_from("BBB", response) if res_command & 0xE0 != RESPONSE_BLOCK_DOWNLOAD: self.sdo_client.abort(ABORT_INVALID_COMMAND_SPECIFIER) raise SdoCommunicationError( f"Unexpected response 0x{res_command:02X}") if res_command & 0x3 != BLOCK_TRANSFER_RESPONSE: self.sdo_client.abort(ABORT_INVALID_COMMAND_SPECIFIER) raise SdoCommunicationError("Server did not respond with a " "block download response") if ackseq != self._blksize: # Sequence error, try to retransmit self._retransmit(ackseq, blksize) # We should be back in sync return # Clear the current block buffer self._current_block = [] logger.debug("All %d sequences were received successfully", ackseq) logger.debug("Server requested a block size of %d", blksize) self._blksize = blksize self._seqno = 0 def _retransmit(self, ackseq, blksize): """Retransmit the failed block""" logger.info("%d of %d sequences were received. " "Will start retransmission", ackseq, self._blksize) # Sub blocks betwen ackseq and end of corrupted block need to be resent # Get the part of the block to resend block = self._current_block[ackseq:] # Go back to correct position in stream self.pos = self.pos - (len(block) * 7) # Reset the _current_block before starting the retransmission self._current_block = [] # Reset _seqno and update blksize self._seqno = 0 self._blksize = blksize # We are retransmitting self._retransmitting = True # Resend the block for b in block: self.write(b) self._retransmitting = False def close(self): """Closes the stream.""" if self.closed: return super(BlockDownloadStream, self).close() if not getattr(self, "_initialized", False): # Don't do finalization if initialization was not successful return if not self._done: logger.error("Block transfer was not finished") command = REQUEST_BLOCK_DOWNLOAD | END_BLOCK_TRANSFER # Specify number of bytes in last message that did not contain data command |= (7 - self._last_bytes_sent) << 2 request = bytearray(8) request[0] = command if self.crc_supported: # Add CRC struct.pack_into("<H", request, 1, self._crc.final()) logger.debug("Ending block transfer...") response = self.sdo_client.request_response(request) res_command, = struct.unpack_from("B", response) if not res_command & END_BLOCK_TRANSFER: raise SdoCommunicationError("Block download unsuccessful") logger.info("Block download successful") def writable(self): return True