Source code for canopen.sdo.client

import struct
import logging
import io
import time
try:
    import queue
except ImportError:
    import Queue as queue

from canopen.network import CanError
from canopen import objectdictionary
from canopen.sdo.base import SdoBase
from canopen.sdo.constants import *
from canopen.sdo.exceptions import *

logger = logging.getLogger(__name__)


class SdoClient(SdoBase):
    """Handles communication with an SDO server."""

    #: Max time in seconds to wait for response from server
    RESPONSE_TIMEOUT = 0.3

    #: Max number of request retries before raising error
    MAX_RETRIES = 1

    #: Seconds to wait before sending a request, for rate limiting
    PAUSE_BEFORE_SEND = 0.0

    def __init__(self, rx_cobid, tx_cobid, od):
        """
        :param int rx_cobid:
            COB-ID that the server receives on (usually 0x600 + node ID)
        :param int tx_cobid:
            COB-ID that the server responds with (usually 0x580 + node ID)
        :param canopen.ObjectDictionary od:
            Object Dictionary to use for communication
        """
        SdoBase.__init__(self, rx_cobid, tx_cobid, od)
        # Raw responses queued by on_response() and consumed by read_response()
        self.responses = queue.Queue()

    def on_response(self, can_id, data, timestamp):
        """Queue a received response so that :meth:`read_response` can pick it up.

        Only ``data`` is used; it is stored as an immutable ``bytes`` copy.
        """
        self.responses.put(bytes(data))

    def send_request(self, request):
        """Send an 8 byte SDO request on ``rx_cobid``.

        If sending fails with :class:`CanError` the send is retried after
        0.1 seconds until :attr:`MAX_RETRIES` attempts have failed, at which
        point the last error is re-raised.
        """
        retries_left = self.MAX_RETRIES
        while True:
            try:
                if self.PAUSE_BEFORE_SEND:
                    time.sleep(self.PAUSE_BEFORE_SEND)
                self.network.send_message(self.rx_cobid, request)
            except CanError as e:
                # Could be a buffer overflow. Wait some time before trying again
                retries_left -= 1
                # "<= 0" (not "== 0") so that MAX_RETRIES = 0 fails on the
                # first error instead of retrying forever.
                if retries_left <= 0:
                    raise
                logger.info(str(e))
                time.sleep(0.1)
            else:
                break

    def read_response(self):
        """Wait up to :attr:`RESPONSE_TIMEOUT` seconds for one response.

        :returns: The raw response.
        :rtype: bytes

        :raises SdoCommunicationError: If no response arrived in time.
        :raises SdoAbortedError: If the response is an abort message.
        """
        try:
            response = self.responses.get(
                block=True, timeout=self.RESPONSE_TIMEOUT)
        except queue.Empty:
            raise SdoCommunicationError("No SDO response received")
        res_command, = struct.unpack_from("B", response)
        if res_command == RESPONSE_ABORTED:
            # The abort code is stored in the last four bytes
            abort_code, = struct.unpack_from("<L", response, 4)
            raise SdoAbortedError(abort_code)
        return response

    def request_response(self, sdo_request):
        """Send a request and return the response to it.

        If no response is received the request is sent again until
        :attr:`MAX_RETRIES` attempts have failed. The transfer is then
        aborted with code 0x05040000 and the error is re-raised.
        """
        retries_left = self.MAX_RETRIES
        if not self.responses.empty():
            # There were unexpected messages in the queue; discard them by
            # starting over with a fresh queue.
            self.responses = queue.Queue()
        while True:
            self.send_request(sdo_request)
            # Wait for node to respond
            try:
                return self.read_response()
            except SdoCommunicationError as e:
                retries_left -= 1
                # "<= 0" (not "== 0") so that MAX_RETRIES = 0 cannot loop forever
                if retries_left <= 0:
                    self.abort(0x05040000)
                    raise
                logger.warning(str(e))

    def abort(self, abort_code=0x08000000):
        """Abort current transfer."""
        request = bytearray(8)
        request[0] = REQUEST_ABORTED
        # TODO: Is it necessary to include index and subindex?
        struct.pack_into("<L", request, 4, abort_code)
        self.send_request(request)
        logger.error("Transfer aborted by client with code 0x{:08X}".format(abort_code))

    def upload(self, index: int, subindex: int) -> bytes:
        """May be called to make a read operation without an Object Dictionary.

        :param index:
            Index of object to read.
        :param subindex:
            Sub-index of object to read.

        :return: A data object.

        :raises canopen.SdoCommunicationError:
            On unexpected response or timeout.
        :raises canopen.SdoAbortedError:
            When node responds with an error.
        """
        with self.open(index, subindex, buffering=0) as fp:
            response_size = fp.size
            data = fp.read()

        # If size is available through variable in OD, then use the smaller of the two sizes.
        # Some devices send U32/I32 even if variable is smaller in OD
        var = self.od.get_variable(index, subindex)
        if var is not None:
            # Found a matching variable in OD
            # If this is a data type (string, domain etc) the size is
            # unknown anyway so keep the data as is
            if var.data_type not in objectdictionary.DATA_TYPES:
                # Get the size in bytes for this variable
                var_size = len(var) // 8
                if response_size is None or var_size < response_size:
                    # Truncate the data to specified size
                    data = data[0:var_size]
        return data

    def download(
        self,
        index: int,
        subindex: int,
        data: bytes,
        force_segment: bool = False,
    ) -> None:
        """May be called to make a write operation without an Object Dictionary.

        :param index:
            Index of object to write.
        :param subindex:
            Sub-index of object to write.
        :param data:
            Data to be written.
        :param force_segment:
            Force use of segmented transfer regardless of data size.

        :raises canopen.SdoCommunicationError:
            On unexpected response or timeout.
        :raises canopen.SdoAbortedError:
            When node responds with an error.
        """
        with self.open(index, subindex, "wb", buffering=7, size=len(data),
                       force_segment=force_segment) as fp:
            fp.write(data)

    def open(self, index, subindex=0, mode="rb", encoding="ascii",
             buffering=1024, size=None, block_transfer=False, force_segment=False,
             request_crc_support=True):
        """Open the data stream as a file like object.

        :param int index:
            Index of object to open.
        :param int subindex:
            Sub-index of object to open.
        :param str mode:
            ========= ==========================================================
            Character Meaning
            --------- ----------------------------------------------------------
            'r'       open for reading (default)
            'w'       open for writing
            'b'       binary mode (default)
            't'       text mode
            ========= ==========================================================
        :param str encoding:
            The str name of the encoding used to decode or encode the file.
            This will only be used in text mode.
        :param int buffering:
            An optional integer used to set the buffering policy. Pass 0 to
            switch buffering off (only allowed in binary mode), 1 to select line
            buffering (only usable in text mode), and an integer > 1 to indicate
            the size in bytes of a fixed-size chunk buffer.
        :param int size:
            Size of data that will be transmitted.
        :param bool block_transfer:
            If block transfer should be used.
        :param bool force_segment:
            Force use of segmented download regardless of data size.
        :param bool request_crc_support:
            If crc calculation should be requested when using block transfer

        :returns:
            A file like object.

        :raises ValueError:
            If ``mode`` does not contain exactly one of 'r' and 'w'.
        """
        reading = "r" in mode
        writing = "w" in mode
        # Validate before any stream is created: constructing a stream already
        # starts an SDO transfer with the server.
        if reading == writing:
            raise ValueError(
                "mode must contain exactly one of 'r' and 'w', got %r" % (mode,))

        buffer_size = buffering if buffering > 1 else io.DEFAULT_BUFFER_SIZE
        if reading:
            if block_transfer:
                raw_stream = BlockUploadStream(
                    self, index, subindex, request_crc_support=request_crc_support)
            else:
                raw_stream = ReadableStream(self, index, subindex)
            if not buffering:
                return raw_stream
            buffered_stream = io.BufferedReader(raw_stream, buffer_size=buffer_size)
        else:
            if block_transfer:
                raw_stream = BlockDownloadStream(
                    self, index, subindex, size,
                    request_crc_support=request_crc_support)
            else:
                raw_stream = WritableStream(self, index, subindex, size, force_segment)
            if not buffering:
                return raw_stream
            buffered_stream = io.BufferedWriter(raw_stream, buffer_size=buffer_size)
        if "b" not in mode:
            # Text mode
            line_buffering = buffering == 1
            return io.TextIOWrapper(buffered_stream, encoding,
                                    line_buffering=line_buffering)
        return buffered_stream
class ReadableStream(io.RawIOBase):
    """File like object for reading from a variable."""

    #: Total size of data or ``None`` if not specified
    size = None

    def __init__(self, sdo_client, index, subindex=0):
        """
        :param canopen.sdo.SdoClient sdo_client:
            The SDO client to use for reading.
        :param int index:
            Object dictionary index to read from.
        :param int subindex:
            Object dictionary sub-index to read from.

        :raises SdoCommunicationError:
            If the response is not an upload response or refers to another
            index/sub-index.
        """
        self._done = False
        self.sdo_client = sdo_client
        self._toggle = 0
        self.pos = 0

        logger.debug("Reading 0x%X:%d from node %d",
                     index, subindex, sdo_client.rx_cobid - 0x600)
        request = bytearray(8)
        SDO_STRUCT.pack_into(request, 0, REQUEST_UPLOAD, index, subindex)
        response = sdo_client.request_response(request)
        res_command, res_index, res_subindex = SDO_STRUCT.unpack_from(response)
        res_data = response[4:8]

        if res_command & 0xE0 != RESPONSE_UPLOAD:
            raise SdoCommunicationError("Unexpected response 0x%02X" % res_command)

        # Check that the message is for us
        if res_index != index or res_subindex != subindex:
            raise SdoCommunicationError((
                "Node returned a value for 0x{:X}:{:d} instead, "
                "maybe there is another SDO client communicating "
                "on the same SDO channel?").format(res_index, res_subindex))

        # Data of an expedited upload, or None for a segmented upload
        self.exp_data = None
        if res_command & EXPEDITED:
            # Expedited upload
            if res_command & SIZE_SPECIFIED:
                self.size = 4 - ((res_command >> 2) & 0x3)
                self.exp_data = res_data[:self.size]
            else:
                self.exp_data = res_data
            self.pos += len(self.exp_data)
        elif res_command & SIZE_SPECIFIED:
            self.size, = struct.unpack("<L", res_data)
            logger.debug("Using segmented transfer of %d bytes", self.size)
        else:
            logger.debug("Using segmented transfer")

    def read(self, size=-1):
        """Read one segment which may be up to 7 bytes.

        :param int size:
            If size is -1, all data will be returned. Other values are ignored.

        :returns: 1 - 7 bytes of data or no bytes if EOF.
        :rtype: bytes

        :raises SdoCommunicationError:
            On an unexpected response or a toggle bit mismatch.
        """
        if self._done:
            return b""
        if self.exp_data is not None:
            # Expedited upload: all data arrived with the initial response
            self._done = True
            return self.exp_data
        if size is None or size < 0:
            return self.readall()

        command = REQUEST_SEGMENT_UPLOAD
        command |= self._toggle
        request = bytearray(8)
        request[0] = command
        response = self.sdo_client.request_response(request)
        res_command, = struct.unpack_from("B", response)
        if res_command & 0xE0 != RESPONSE_SEGMENT_UPLOAD:
            raise SdoCommunicationError("Unexpected response 0x%02X" % res_command)
        if res_command & TOGGLE_BIT != self._toggle:
            raise SdoCommunicationError("Toggle bit mismatch")
        # Bits 1-3 hold the number of bytes that do not contain data
        length = 7 - ((res_command >> 1) & 0x7)
        if res_command & NO_MORE_DATA:
            self._done = True
        self._toggle ^= TOGGLE_BIT
        self.pos += length
        return response[1:length + 1]

    def readinto(self, b):
        """
        Read bytes into a pre-allocated, writable bytes-like object b,
        and return the number of bytes read.
        """
        data = self.read(7)
        b[:len(data)] = data
        return len(data)

    def readable(self):
        return True

    def tell(self):
        return self.pos


class WritableStream(io.RawIOBase):
    """File like object for writing to a variable."""

    def __init__(self, sdo_client, index, subindex=0, size=None, force_segment=False):
        """
        :param canopen.sdo.SdoClient sdo_client:
            The SDO client to use for communication.
        :param int index:
            Object dictionary index to read from.
        :param int subindex:
            Object dictionary sub-index to read from.
        :param int size:
            Size of data in number of bytes if known in advance.
        :param bool force_segment:
            Force use of segmented transfer regardless of size.

        :raises SdoCommunicationError:
            If a segmented download could not be initiated.
        """
        self.sdo_client = sdo_client
        self.size = size
        self.pos = 0
        self._toggle = 0
        self._exp_header = None
        self._done = False

        # Expedited transfer can only carry 1 - 4 bytes: the "unused bytes"
        # field written below is two bits wide, so a size of 0 would overflow
        # into the neighbouring bit. Empty data therefore uses a segmented
        # transfer, which close() finishes with an empty last segment.
        if size is None or size < 1 or size > 4 or force_segment:
            # Initiate segmented download
            request = bytearray(8)
            command = REQUEST_DOWNLOAD
            if size is not None:
                command |= SIZE_SPECIFIED
                struct.pack_into("<L", request, 4, size)
            SDO_STRUCT.pack_into(request, 0, command, index, subindex)
            response = sdo_client.request_response(request)
            res_command, = struct.unpack_from("B", response)
            if res_command != RESPONSE_DOWNLOAD:
                raise SdoCommunicationError(
                    "Unexpected response 0x%02X" % res_command)
        else:
            # Expedited download
            # Prepare header (first 4 bytes in CAN message)
            command = REQUEST_DOWNLOAD | EXPEDITED | SIZE_SPECIFIED
            command |= (4 - size) << 2
            self._exp_header = SDO_STRUCT.pack(command, index, subindex)

    def write(self, b):
        """
        Write the given bytes-like object, b, to the SDO server, and return the
        number of bytes written. This will be at most 7 bytes.

        :raises RuntimeError:
            If all expected data has already been transmitted.
        :raises SdoCommunicationError:
            On an unexpected response.
        """
        if self._done:
            raise RuntimeError("All expected data has already been transmitted")
        if self._exp_header is not None:
            # Expedited download
            if len(b) < self.size:
                # Not enough data provided
                return 0
            if len(b) > 4:
                raise AssertionError("More data received than expected")
            data = b.tobytes() if isinstance(b, memoryview) else b
            request = self._exp_header + data.ljust(4, b"\x00")
            response = self.sdo_client.request_response(request)
            res_command, = struct.unpack_from("B", response)
            if res_command & 0xE0 != RESPONSE_DOWNLOAD:
                raise SdoCommunicationError(
                    "Unexpected response 0x%02X" % res_command)
            bytes_sent = len(b)
            self._done = True
        else:
            # Segmented download
            request = bytearray(8)
            command = REQUEST_SEGMENT_DOWNLOAD
            # Add toggle bit
            command |= self._toggle
            self._toggle ^= TOGGLE_BIT
            # Can send up to 7 bytes at a time
            bytes_sent = min(len(b), 7)
            if self.size is not None and self.pos + bytes_sent >= self.size:
                # No more data after this message
                command |= NO_MORE_DATA
                self._done = True
            # Specify number of bytes that do not contain segment data
            command |= (7 - bytes_sent) << 1
            request[0] = command
            request[1:bytes_sent + 1] = b[0:bytes_sent]
            response = self.sdo_client.request_response(request)
            res_command, = struct.unpack("B", response[0:1])
            if res_command & 0xE0 != RESPONSE_SEGMENT_DOWNLOAD:
                raise SdoCommunicationError(
                    "Unexpected response 0x%02X (expected 0x%02X)" %
                    (res_command, RESPONSE_SEGMENT_DOWNLOAD))
        # Advance position
        self.pos += bytes_sent
        return bytes_sent

    def close(self):
        """Closes the stream.

        An empty segmented SDO message may be sent saying there is no more data.
        """
        super(WritableStream, self).close()
        if not self._done and not self._exp_header:
            # Segmented download not finished
            command = REQUEST_SEGMENT_DOWNLOAD | NO_MORE_DATA
            command |= self._toggle
            # No data in this message
            command |= 7 << 1
            request = bytearray(8)
            request[0] = command
            self.sdo_client.request_response(request)
            self._done = True

    def writable(self):
        return True

    def tell(self):
        return self.pos


class BlockUploadStream(io.RawIOBase):
    """File like object for reading from a variable using block upload."""

    #: Total size of data or ``None`` if not specified
    size = None

    #: Number of segments per block requested from the server
    blksize = 127

    #: Whether the server indicated CRC support in its initiate response
    crc_supported = False

    def __init__(self, sdo_client, index, subindex=0, request_crc_support=True):
        """
        :param canopen.sdo.SdoClient sdo_client:
            The SDO client to use for reading.
        :param int index:
            Object dictionary index to read from.
        :param int subindex:
            Object dictionary sub-index to read from.
        :param bool request_crc_support:
            If crc calculation should be requested when using block transfer

        :raises SdoCommunicationError:
            If the response is not a block upload response or refers to
            another index/sub-index.
        """
        self._done = False
        self.sdo_client = sdo_client
        self.pos = 0
        self._crc = sdo_client.crc_cls()
        self._server_crc = None
        # Sequence number of the last segment received in order
        self._ackseq = 0

        logger.debug("Reading 0x%X:%d from node %d",
                     index, subindex, sdo_client.rx_cobid - 0x600)
        # Initiate Block Upload
        request = bytearray(8)
        command = REQUEST_BLOCK_UPLOAD | INITIATE_BLOCK_TRANSFER
        if request_crc_support:
            command |= CRC_SUPPORTED
        struct.pack_into("<BHBBB", request, 0,
                         command, index, subindex, self.blksize, 0)
        response = sdo_client.request_response(request)
        res_command, res_index, res_subindex = SDO_STRUCT.unpack_from(response)
        if res_command & 0xE0 != RESPONSE_BLOCK_UPLOAD:
            raise SdoCommunicationError("Unexpected response 0x%02X" % res_command)
        # Check that the message is for us
        if res_index != index or res_subindex != subindex:
            raise SdoCommunicationError((
                "Node returned a value for 0x{:X}:{:d} instead, "
                "maybe there is another SDO client communicating "
                "on the same SDO channel?").format(res_index, res_subindex))
        if res_command & BLOCK_SIZE_SPECIFIED:
            self.size, = struct.unpack_from("<L", response, 4)
            logger.debug("Size is %d bytes", self.size)
        self.crc_supported = bool(res_command & CRC_SUPPORTED)
        # Start upload
        request = bytearray(8)
        request[0] = REQUEST_BLOCK_UPLOAD | START_BLOCK_UPLOAD
        sdo_client.send_request(request)

    def read(self, size=-1):
        """Read one segment which may be up to 7 bytes.

        :param int size:
            If size is -1, all data will be returned. Other values are ignored.

        :returns: 1 - 7 bytes of data or no bytes if EOF.
        :rtype: bytes

        :raises SdoCommunicationError:
            If lost data could not be retransmitted, the transfer was not
            ended as expected or the CRC does not match.
        """
        if self._done:
            return b""
        if size is None or size < 0:
            return self.readall()

        try:
            response = self.sdo_client.read_response()
        except SdoCommunicationError:
            response = self._retransmit()
        res_command, = struct.unpack_from("B", response)
        seqno = res_command & 0x7F
        if seqno == self._ackseq + 1:
            self._ackseq = seqno
        else:
            # Wrong sequence number
            response = self._retransmit()
            res_command, = struct.unpack_from("B", response)
        if self._ackseq >= self.blksize or res_command & NO_MORE_BLOCKS:
            self._ack_block()
        if res_command & NO_MORE_BLOCKS:
            # _end_upload() returns the number of unused bytes in this segment
            n = self._end_upload()
            data = response[1:8 - n]
            self._done = True
        else:
            data = response[1:8]
        if self.crc_supported:
            self._crc.process(data)
            if self._done:
                if self._server_crc != self._crc.final():
                    self.sdo_client.abort(0x05040004)
                    raise SdoCommunicationError("CRC is not OK")
                logger.info("CRC is OK")
        self.pos += len(data)
        return data

    def _retransmit(self):
        """Acknowledge what has been received and wait for the next expected segment.

        :returns: The response carrying sequence number ``_ackseq + 1``.

        :raises SdoCommunicationError:
            If that segment does not arrive within ``RESPONSE_TIMEOUT``.
        """
        logger.info("Only %d sequences were received. Requesting retransmission",
                    self._ackseq)
        end_time = time.time() + self.sdo_client.RESPONSE_TIMEOUT
        self._ack_block()
        while time.time() < end_time:
            response = self.sdo_client.read_response()
            res_command, = struct.unpack_from("B", response)
            seqno = res_command & 0x7F
            if seqno == self._ackseq + 1:
                # We should be back in sync
                self._ackseq = seqno
                return response
        raise SdoCommunicationError("Some data were lost and could not be retransmitted")

    def _ack_block(self):
        """Send a block upload response carrying ``_ackseq`` and ``blksize``."""
        request = bytearray(8)
        request[0] = REQUEST_BLOCK_UPLOAD | BLOCK_TRANSFER_RESPONSE
        request[1] = self._ackseq
        request[2] = self.blksize
        self.sdo_client.send_request(request)
        if self._ackseq == self.blksize:
            # A complete block was received, start counting from zero again
            self._ackseq = 0

    def _end_upload(self):
        """Read the end-of-transfer response and store the CRC it contains.

        :returns: Number of bytes not used in the last segment.

        :raises SdoCommunicationError:
            If the response is not the expected end of block upload.
        """
        response = self.sdo_client.read_response()
        res_command, self._server_crc = struct.unpack_from("<BH", response)
        if res_command & 0xE0 != RESPONSE_BLOCK_UPLOAD:
            self.sdo_client.abort(0x05040001)
            raise SdoCommunicationError("Unexpected response 0x%02X" % res_command)
        if res_command & 0x3 != END_BLOCK_TRANSFER:
            self.sdo_client.abort(0x05040001)
            raise SdoCommunicationError("Server did not end transfer as expected")
        # Return number of bytes not used in last message
        return (res_command >> 2) & 0x7

    def close(self):
        """Close the stream and, if all data was read, end the block transfer."""
        if self.closed:
            return
        super(BlockUploadStream, self).close()
        if self._done:
            request = bytearray(8)
            request[0] = REQUEST_BLOCK_UPLOAD | END_BLOCK_TRANSFER
            self.sdo_client.send_request(request)

    def tell(self):
        return self.pos

    def readinto(self, b):
        """
        Read bytes into a pre-allocated, writable bytes-like object b,
        and return the number of bytes read.
        """
        data = self.read(7)
        b[:len(data)] = data
        return len(data)

    def readable(self):
        return True


class BlockDownloadStream(io.RawIOBase):
    """File like object for block download."""

    def __init__(self, sdo_client, index, subindex=0, size=None, request_crc_support=True):
        """
        :param canopen.sdo.SdoClient sdo_client:
            The SDO client to use for communication.
        :param int index:
            Object dictionary index to read from.
        :param int subindex:
            Object dictionary sub-index to read from.
        :param int size:
            Size of data in number of bytes if known in advance.
        :param bool request_crc_support:
            If crc calculation should be requested when using block transfer

        :raises SdoCommunicationError:
            If the response is not a block download response or refers to
            another index/sub-index.
        """
        self.sdo_client = sdo_client
        self.size = size
        self.pos = 0
        self._done = False
        self._seqno = 0
        self._crc = sdo_client.crc_cls()
        self._last_bytes_sent = 0
        # Chunks sent in the current block, kept for retransmission
        self._current_block = []
        self._retransmitting = False
        command = REQUEST_BLOCK_DOWNLOAD | INITIATE_BLOCK_TRANSFER
        if request_crc_support:
            command |= CRC_SUPPORTED
        request = bytearray(8)
        logger.info("Initiating block download for 0x%X:%d", index, subindex)
        if size is not None:
            logger.debug("Expected size of data is %d bytes", size)
            command |= BLOCK_SIZE_SPECIFIED
            struct.pack_into("<L", request, 4, size)
        else:
            logger.warning("Data size has not been specified")
        SDO_STRUCT.pack_into(request, 0, command, index, subindex)
        response = sdo_client.request_response(request)
        res_command, res_index, res_subindex = SDO_STRUCT.unpack_from(response)
        if res_command & 0xE0 != RESPONSE_BLOCK_DOWNLOAD:
            self.sdo_client.abort(0x05040001)
            raise SdoCommunicationError(
                "Unexpected response 0x%02X" % res_command)
        # Check that the message is for us
        if res_index != index or res_subindex != subindex:
            self.sdo_client.abort()
            raise SdoCommunicationError((
                "Node returned a value for 0x{:X}:{:d} instead, "
                "maybe there is another SDO client communicating "
                "on the same SDO channel?").format(res_index, res_subindex))
        self._blksize, = struct.unpack_from("B", response, 4)
        logger.debug("Server requested a block size of %d", self._blksize)
        self.crc_supported = bool(res_command & CRC_SUPPORTED)

    def write(self, b):
        """
        Write the given bytes-like object, b, to the SDO server, and return the
        number of bytes written. This will be at most 7 bytes.

        :param bytes b:
            Data to be transmitted.

        :returns:
            Number of bytes successfully sent or ``None`` if length of data is
            less than 7 bytes and the total size has not been reached yet.

        :raises RuntimeError:
            If all expected data has already been transmitted.
        """
        if self._done:
            raise RuntimeError("All expected data has already been transmitted")
        # Can send up to 7 bytes at a time.
        # Copy the chunk: it is kept in _current_block for retransmission and
        # b may be a view into a buffer that the caller reuses afterwards.
        data = bytes(b[0:7])
        if self.size is not None and self.pos + len(data) >= self.size:
            # This is the last data to be transmitted based on expected size
            self.send(data, end=True)
        elif len(data) < 7:
            # We can't send less than 7 bytes in the middle of a transmission
            return None
        else:
            self.send(data)
        return len(data)

    def send(self, b, end=False):
        """Send up to 7 bytes of data.

        :param bytes b:
            0 - 7 bytes of data to transmit.
        :param bool end:
            If this is the last data.
        """
        assert len(b) <= 7, "Max 7 bytes can be sent"
        if not end:
            assert len(b) == 7, "Less than 7 bytes only allowed if last data"
        self._seqno += 1
        command = self._seqno
        if end:
            command |= NO_MORE_BLOCKS
            self._done = True
            # Change expected ACK:ed sequence
            self._blksize = self._seqno
            # Save how many bytes this message contains since this is the last
            self._last_bytes_sent = len(b)
        request = bytearray(8)
        request[0] = command
        request[1:len(b) + 1] = b
        self.sdo_client.send_request(request)
        self.pos += len(b)
        # Add the sent data to the current block buffer
        self._current_block.append(b)
        # Don't calculate crc if retransmitting
        if self.crc_supported and not self._retransmitting:
            # Calculate CRC
            self._crc.process(b)
        if self._seqno >= self._blksize:
            # End of this block, wait for ACK
            self._block_ack()

    def tell(self):
        return self.pos

    def _block_ack(self):
        """Wait for the server to acknowledge the block that was just sent.

        Starts a retransmission if not all sequences were acknowledged.

        :raises SdoCommunicationError:
            If the response is not a block download response.
        """
        logger.debug("Waiting for acknowledgement of last block...")
        response = self.sdo_client.read_response()
        res_command, ackseq, blksize = struct.unpack_from("BBB", response)
        if res_command & 0xE0 != RESPONSE_BLOCK_DOWNLOAD:
            self.sdo_client.abort(0x05040001)
            raise SdoCommunicationError(
                "Unexpected response 0x%02X" % res_command)
        if res_command & 0x3 != BLOCK_TRANSFER_RESPONSE:
            self.sdo_client.abort(0x05040001)
            raise SdoCommunicationError("Server did not respond with a "
                                        "block download response")
        if ackseq != self._blksize:
            # Sequence error, try to retransmit
            self._retransmit(ackseq, blksize)
            # We should be back in sync
            return
        # Clear the current block buffer
        self._current_block = []
        logger.debug("All %d sequences were received successfully", ackseq)
        logger.debug("Server requested a block size of %d", blksize)
        self._blksize = blksize
        self._seqno = 0

    def _retransmit(self, ackseq, blksize):
        """Retransmit the failed block"""
        logger.info(("%d of %d sequences were received. "
                     "Will start retransmission") % (ackseq, self._blksize))
        # Sub blocks between ackseq and end of corrupted block need to be resent
        # Get the part of the block to resend
        block = self._current_block[ackseq:]
        # Go back to correct position in stream. The last chunk of a transfer
        # may be shorter than 7 bytes, so use the actual chunk lengths.
        self.pos -= sum(len(chunk) for chunk in block)
        # Reset the _current_block before starting the retransmission
        self._current_block = []
        # Reset _seqno and update blksize
        self._seqno = 0
        self._blksize = blksize
        if block:
            # The final chunk may be part of what is resent; write() refuses
            # to send anything while _done is set, and it will set the flag
            # again once the final chunk has been resent.
            self._done = False
        # We are retransmitting
        self._retransmitting = True
        try:
            # Resend the block
            for b in block:
                self.write(b)
        finally:
            self._retransmitting = False

    def close(self):
        """Closes the stream."""
        if self.closed:
            return
        super(BlockDownloadStream, self).close()
        if not self._done:
            logger.error("Block transfer was not finished")
        command = REQUEST_BLOCK_DOWNLOAD | END_BLOCK_TRANSFER
        # Specify number of bytes in last message that did not contain data
        command |= (7 - self._last_bytes_sent) << 2
        request = bytearray(8)
        request[0] = command
        if self.crc_supported:
            # Add CRC
            struct.pack_into("<H", request, 1, self._crc.final())
        logger.debug("Ending block transfer...")
        response = self.sdo_client.request_response(request)
        res_command, = struct.unpack_from("B", response)
        if not res_command & END_BLOCK_TRANSFER:
            raise SdoCommunicationError("Block download unsuccessful")
        logger.info("Block download successful")

    def writable(self):
        return True