import logging
from canopen.sdo.base import SdoBase
from canopen.sdo.constants import *
from canopen.sdo.exceptions import *
logger = logging.getLogger(__name__)
[docs]
class SdoServer(SdoBase):
"""Creates an SDO server."""
def __init__(self, rx_cobid, tx_cobid, node):
"""
:param int rx_cobid:
COB-ID that the server receives on (usually 0x600 + node ID)
:param int tx_cobid:
COB-ID that the server responds with (usually 0x580 + node ID)
:param canopen.LocalNode od:
Node object owning the server
"""
SdoBase.__init__(self, rx_cobid, tx_cobid, node.object_dictionary)
self._node = node
self._buffer = None
self._toggle = 0
self._index = None
self._subindex = None
self.last_received_error = 0x00000000
def on_request(self, can_id, data, timestamp):
command, = struct.unpack_from("B", data, 0)
ccs = command & 0xE0
try:
if ccs == REQUEST_UPLOAD:
self.init_upload(data)
elif ccs == REQUEST_SEGMENT_UPLOAD:
self.segmented_upload(command)
elif ccs == REQUEST_DOWNLOAD:
self.init_download(data)
elif ccs == REQUEST_SEGMENT_DOWNLOAD:
self.segmented_download(command, data)
elif ccs == REQUEST_BLOCK_UPLOAD:
self.block_upload(data)
elif ccs == REQUEST_BLOCK_DOWNLOAD:
self.block_download(data)
elif ccs == REQUEST_ABORTED:
self.request_aborted(data)
else:
self.abort(0x05040001)
except SdoAbortedError as exc:
self.abort(exc.code)
except KeyError as exc:
self.abort(0x06020000)
except Exception as exc:
self.abort()
logger.exception(exc)
def init_upload(self, request):
_, index, subindex = SDO_STRUCT.unpack_from(request)
self._index = index
self._subindex = subindex
res_command = RESPONSE_UPLOAD | SIZE_SPECIFIED
response = bytearray(8)
data = self._node.get_data(index, subindex, check_readable=True)
size = len(data)
if size <= 4:
logger.info("Expedited upload for 0x%X:%d", index, subindex)
res_command |= EXPEDITED
res_command |= (4 - size) << 2
response[4:4 + size] = data
else:
logger.info("Initiating segmented upload for 0x%X:%d", index, subindex)
struct.pack_into("<L", response, 4, size)
self._buffer = bytearray(data)
self._toggle = 0
SDO_STRUCT.pack_into(response, 0, res_command, index, subindex)
self.send_response(response)
def segmented_upload(self, command):
if command & TOGGLE_BIT != self._toggle:
# Toggle bit mismatch
raise SdoAbortedError(0x05030000)
data = self._buffer[:7]
size = len(data)
# Remove sent data from buffer
del self._buffer[:7]
res_command = RESPONSE_SEGMENT_UPLOAD
# Add toggle bit
res_command |= self._toggle
# Add nof bytes not used
res_command |= (7 - size) << 1
if not self._buffer:
# Nothing left in buffer
res_command |= NO_MORE_DATA
# Toggle bit for next message
self._toggle ^= TOGGLE_BIT
response = bytearray(8)
response[0] = res_command
response[1:1 + size] = data
self.send_response(response)
def block_upload(self, data):
# We currently don't support BLOCK UPLOAD
# according to CIA301 the server is allowed
# to switch to regular upload
logger.info("Received block upload, switch to regular SDO upload")
self.init_upload(data)
def request_aborted(self, data):
_, index, subindex, code = struct.unpack_from("<BHBL", data)
self.last_received_error = code
logger.info("Received request aborted for 0x%X:%d with code 0x%X", index, subindex, code)
def block_download(self, data):
# We currently don't support BLOCK DOWNLOAD
logger.error("Block download is not supported")
self.abort(0x05040001)
def init_download(self, request):
# TODO: Check if writable (now would fail on end of segmented downloads)
command, index, subindex = SDO_STRUCT.unpack_from(request)
self._index = index
self._subindex = subindex
res_command = RESPONSE_DOWNLOAD
response = bytearray(8)
if command & EXPEDITED:
logger.info("Expedited download for 0x%X:%d", index, subindex)
if command & SIZE_SPECIFIED:
size = 4 - ((command >> 2) & 0x3)
else:
size = 4
self._node.set_data(index, subindex, request[4:4 + size], check_writable=True)
else:
logger.info("Initiating segmented download for 0x%X:%d", index, subindex)
if command & SIZE_SPECIFIED:
size, = struct.unpack_from("<L", request, 4)
logger.info("Size is %d bytes", size)
self._buffer = bytearray()
self._toggle = 0
SDO_STRUCT.pack_into(response, 0, res_command, index, subindex)
self.send_response(response)
def segmented_download(self, command, request):
if command & TOGGLE_BIT != self._toggle:
# Toggle bit mismatch
raise SdoAbortedError(0x05030000)
last_byte = 8 - ((command >> 1) & 0x7)
self._buffer.extend(request[1:last_byte])
if command & NO_MORE_DATA:
self._node.set_data(self._index,
self._subindex,
self._buffer,
check_writable=True)
res_command = RESPONSE_SEGMENT_DOWNLOAD
# Add toggle bit
res_command |= self._toggle
# Toggle bit for next message
self._toggle ^= TOGGLE_BIT
response = bytearray(8)
response[0] = res_command
self.send_response(response)
def send_response(self, response):
self.network.send_message(self.tx_cobid, response)
[docs]
def abort(self, abort_code=0x08000000):
"""Abort current transfer."""
data = struct.pack("<BHBL", RESPONSE_ABORTED,
self._index, self._subindex, abort_code)
self.send_response(data)
# logger.error("Transfer aborted with code 0x{:08X}".format(abort_code))
[docs]
def upload(self, index: int, subindex: int) -> bytes:
"""May be called to make a read operation without an Object Dictionary.
:param index:
Index of object to read.
:param subindex:
Sub-index of object to read.
:return: A data object.
:raises canopen.SdoAbortedError:
When node responds with an error.
"""
return self._node.get_data(index, subindex)
[docs]
def download(
self,
index: int,
subindex: int,
data: bytes,
force_segment: bool = False,
):
"""May be called to make a write operation without an Object Dictionary.
:param index:
Index of object to write.
:param subindex:
Sub-index of object to write.
:param data:
Data to be written.
:raises canopen.SdoAbortedError:
When node responds with an error.
"""
return self._node.set_data(index, subindex, data)