import logging
import time
import struct
try:
import queue
except ImportError:
import Queue as queue
logger = logging.getLogger(__name__)
# Command Specifier (CS)
CS_SWITCH_STATE_GLOBAL = 0x04
CS_CONFIGURE_NODE_ID = 0x11
CS_CONFIGURE_BIT_TIMING = 0x13
CS_ACTIVATE_BIT_TIMING = 0x15
CS_STORE_CONFIGURATION = 0x17
CS_SWITCH_STATE_SELECTIVE_VENDOR_ID = 0x40
CS_SWITCH_STATE_SELECTIVE_PRODUCT_CODE = 0x41
CS_SWITCH_STATE_SELECTIVE_REVISION_NUMBER = 0x42
CS_SWITCH_STATE_SELECTIVE_SERIAL_NUMBER = 0x43
CS_SWITCH_STATE_SELECTIVE_RESPONSE = 0x44
CS_IDENTIFY_REMOTE_SLAVE_VENDOR_ID = 0x46 # m -> s
CS_IDENTIFY_REMOTE_SLAVE_PRODUCT_CODE = 0x47 # m -> s
CS_IDENTIFY_REMOTE_SLAVE_REVISION_NUMBER_LOW = 0x48 # m -> s
CS_IDENTIFY_REMOTE_SLAVE_REVISION_NUMBER_HIGH = 0x49 # m -> s
CS_IDENTIFY_REMOTE_SLAVE_SERIAL_NUMBER_LOW = 0x4A # m -> s
CS_IDENTIFY_REMOTE_SLAVE_SERIAL_NUMBER_HIGH = 0x4B # m -> s
CS_IDENTIFY_NON_CONFIGURED_REMOTE_SLAVE = 0x4C # m -> s
CS_IDENTIFY_SLAVE = 0x4F # s -> m
CS_IDENTIFY_NON_CONFIGURED_SLAVE = 0x50 # s -> m
CS_FAST_SCAN = 0x51 # m -> s
CS_INQUIRE_VENDOR_ID = 0x5A
CS_INQUIRE_PRODUCT_CODE = 0x5B
CS_INQUIRE_REVISION_NUMBER = 0x5C
CS_INQUIRE_SERIAL_NUMBER = 0x5D
CS_INQUIRE_NODE_ID = 0x5E
# obsolete
SWITCH_MODE_GLOBAL = 0x04
CONFIGURE_NODE_ID = 0x11
CONFIGURE_BIT_TIMING = 0x13
STORE_CONFIGURATION = 0x17
INQUIRE_NODE_ID = 0x5E
ERROR_NONE = 0
ERROR_INADMISSIBLE = 1
ERROR_STORE_NONE = 0
ERROR_STORE_NOT_SUPPORTED = 1
ERROR_STORE_ACCESS_PROBLEM = 2
ERROR_VENDOR_SPECIFIC = 0xff
ListMessageNeedResponse = [
CS_CONFIGURE_NODE_ID,
CS_CONFIGURE_BIT_TIMING,
CS_STORE_CONFIGURATION,
CS_SWITCH_STATE_SELECTIVE_SERIAL_NUMBER,
CS_FAST_SCAN,
CS_INQUIRE_VENDOR_ID,
CS_INQUIRE_PRODUCT_CODE,
CS_INQUIRE_REVISION_NUMBER,
CS_INQUIRE_SERIAL_NUMBER,
CS_INQUIRE_NODE_ID,
]
[docs]
class LssMaster:
"""The Master of Layer Setting Services"""
LSS_TX_COBID = 0x7E5
LSS_RX_COBID = 0x7E4
WAITING_STATE = 0x00
CONFIGURATION_STATE = 0x01
# obsolete
NORMAL_MODE = 0x00
CONFIGURATION_MODE = 0x01
#: Max time in seconds to wait for response from server
RESPONSE_TIMEOUT = 0.5
def __init__(self):
self.network = None
self._node_id = 0
self._data = None
self.responses = queue.Queue()
[docs]
def send_switch_state_global(self, mode):
"""switch mode to CONFIGURATION_STATE or WAITING_STATE
in the all slaves on CAN bus.
There is no reply for this request
:param int mode:
CONFIGURATION_STATE or WAITING_STATE
"""
# LSS messages are always a full 8 bytes long.
# Unused bytes are reserved and should be initialized with 0.
message = bytearray(8)
message[0] = CS_SWITCH_STATE_GLOBAL
message[1] = mode
self.__send_command(message)
[docs]
def send_switch_mode_global(self, mode):
"""obsolete"""
self.send_switch_state_global(mode)
[docs]
def send_switch_state_selective(self,
vendorId, productCode, revisionNumber, serialNumber):
"""switch mode from WAITING_STATE to CONFIGURATION_STATE
only if 128bits LSS address matches with the arguments.
It sends 4 messages for each argument.
Then wait the response from the slave.
There will be no response if there is no matching slave
:param int vendorId:
object index 0x1018 subindex 1
:param int productCode:
object index 0x1018 subindex 2
:param int revisionNumber:
object index 0x1018 subindex 3
:param int serialNumber:
object index 0x1018 subindex 4
:return:
True if any slave responds.
False if there is no response.
:rtype: bool
"""
self.__send_lss_address(CS_SWITCH_STATE_SELECTIVE_VENDOR_ID, vendorId)
self.__send_lss_address(CS_SWITCH_STATE_SELECTIVE_PRODUCT_CODE, productCode)
self.__send_lss_address(CS_SWITCH_STATE_SELECTIVE_REVISION_NUMBER, revisionNumber)
response = self.__send_lss_address(CS_SWITCH_STATE_SELECTIVE_SERIAL_NUMBER, serialNumber)
cs = struct.unpack_from("<B", response)[0]
if cs == CS_SWITCH_STATE_SELECTIVE_RESPONSE:
return True
return False
[docs]
def inquire_node_id(self):
"""Read the node id.
CANopen node id must be within the range from 1 to 127.
:return:
node id. 0 means it is not read by LSS protocol
:rtype: int
"""
return self.__send_inquire_node_id()
[docs]
def inquire_lss_address(self, req_cs):
"""Read the part of LSS address.
VENDOR_ID, PRODUCT_CODE, REVISION_NUMBER, or SERIAL_NUMBER
:param int req_cs:
command specifier for request
:return:
part of LSS address
:rtype: int
"""
return self.__send_inquire_lss_address(req_cs)
[docs]
def activate_bit_timing(self, switch_delay_ms):
"""Activate the bit timing.
:param uint16_t switch_delay_ms:
The slave that receives this message waits for switch delay,
then activate the bit timing. But it shouldn't send any message
until another switch delay is elapsed.
"""
message = bytearray(8)
message[0] = CS_ACTIVATE_BIT_TIMING
message[1:3] = struct.pack('<H', switch_delay_ms)
self.__send_command(message)
[docs]
def store_configuration(self):
"""Store node id and baud rate.
"""
self.__send_configure(CS_STORE_CONFIGURATION)
[docs]
def send_identify_remote_slave(self,
vendorId, productCode,
revisionNumberLow, revisionNumberHigh,
serialNumberLow, serialNumberHigh):
"""This command sends the range of LSS address to find the slave nodes
in the specified range
:param int vendorId:
:param int productCode:
:param int revisionNumberLow:
:param int revisionNumberHigh:
:param int serialNumberLow:
:param int serialNumberHigh:
:return:
True if any slave responds.
False if there is no response.
:rtype: bool
"""
# TODO it should handle the multiple respones from slaves
self.__send_lss_address(CS_IDENTIFY_REMOTE_SLAVE_VENDOR_ID, vendorId)
self.__send_lss_address(CS_IDENTIFY_REMOTE_SLAVE_PRODUCT_CODE, productCode)
self.__send_lss_address(CS_IDENTIFY_REMOTE_SLAVE_REVISION_NUMBER_LOW, revisionNumberLow)
self.__send_lss_address(CS_IDENTIFY_REMOTE_SLAVE_REVISION_NUMBER_HIGH, revisionNumberHigh)
self.__send_lss_address(CS_IDENTIFY_REMOTE_SLAVE_SERIAL_NUMBER_LOW, serialNumberLow)
self.__send_lss_address(CS_IDENTIFY_REMOTE_SLAVE_SERIAL_NUMBER_HIGH, serialNumberHigh)
def send_identify_non_configured_remote_slave(self):
# TODO it should handle the multiple respones from slaves
message = bytearray(8)
message[0] = CS_IDENTIFY_NON_CONFIGURED_REMOTE_SLAVE
self.__send_command(message)
[docs]
def fast_scan(self):
"""This command sends a series of fastscan message
to find unconfigured slave with lowest number of LSS idenities
:return:
True if a slave is found.
False if there is no candidate.
list is the LSS identities [vendor_id, product_code, revision_number, serial_number]
:rtype: bool, list
"""
lss_id = [0] * 4
lss_bit_check = 128
lss_sub = 0
lss_next = 0
if self.__send_fast_scan_message(lss_id[0], lss_bit_check, lss_sub, lss_next):
time.sleep(0.01)
while lss_sub < 4:
lss_bit_check = 32
while lss_bit_check > 0:
lss_bit_check -= 1
if not self.__send_fast_scan_message(lss_id[lss_sub], lss_bit_check, lss_sub, lss_next):
lss_id[lss_sub] |= 1<<lss_bit_check
time.sleep(0.01)
lss_next = (lss_sub + 1) & 3
if not self.__send_fast_scan_message(lss_id[lss_sub], lss_bit_check, lss_sub, lss_next):
return False, None
time.sleep(0.01)
# Now the next 32 bits will be scanned
lss_sub += 1
# Now lss_id contains the entire 128 bits scanned
return True, lss_id
return False, None
def __send_fast_scan_message(self, id_number, bit_checker, lss_sub, lss_next):
message = bytearray(8)
message[0:8] = struct.pack('<BIBBB', CS_FAST_SCAN, id_number, bit_checker, lss_sub, lss_next)
try:
recv_msg = self.__send_command(message)
except LssError:
return False
cs = struct.unpack_from("<B", recv_msg)[0]
if cs == CS_IDENTIFY_SLAVE:
return True
return False
def __send_lss_address(self, req_cs, number):
message = bytearray(8)
message[0] = req_cs
message[1:5] = struct.pack('<I', number)
response = self.__send_command(message)
# some device needs these delays between messages
# because it can't handle messages arriving with no delay
time.sleep(0.2)
return response
def __send_inquire_node_id(self):
"""
:return:
Current node id
:rtype: int
"""
message = bytearray(8)
message[0] = CS_INQUIRE_NODE_ID
response = self.__send_command(message)
cs, current_node_id = struct.unpack_from("<BB", response)
if cs != CS_INQUIRE_NODE_ID:
raise LssError("Response message is not for the request")
return current_node_id
def __send_inquire_lss_address(self, req_cs):
"""
:return:
part of address. e.g., vendor ID or product code, ..
:rtype: int
"""
message = bytearray(8)
message[0] = req_cs
response = self.__send_command(message)
res_cs, part_of_address = struct.unpack_from("<BI", response)
if res_cs != req_cs:
raise LssError("Response message is not for the request")
return part_of_address
def __send_configure(self, req_cs, value1=0, value2=0):
"""Send a message to set a key with values"""
message = bytearray(8)
message[0] = req_cs
message[1] = value1
message[2] = value2
response = self.__send_command(message)
res_cs, error_code = struct.unpack_from("<BB", response)
if res_cs != req_cs:
raise LssError("Response message is not for the request")
if error_code != ERROR_NONE:
error_msg = "LSS Error: %d" % error_code
raise LssError(error_msg)
def __send_command(self, message):
"""Send a LSS operation code to the network
:param bytearray message:
LSS request message.
:return:
response
None if there is no response
:rtype: bytes
"""
message_str = " ".join(["{:02x}".format(x) for x in message])
logger.info(
"Sending LSS message {}".format(message_str))
response = None
if not self.responses.empty():
logger.info("There were unexpected messages in the queue")
self.responses = queue.Queue()
self.network.send_message(self.LSS_TX_COBID, message)
if not bool(message[0] in ListMessageNeedResponse):
return response
# Wait for the slave to respond
# TODO check if the response is LSS response message
try:
response = self.responses.get(
block=True, timeout=self.RESPONSE_TIMEOUT)
except queue.Empty:
raise LssError("No LSS response received")
return response
def on_message_received(self, can_id, data, timestamp):
self.responses.put(bytes(data))
[docs]
class LssError(Exception):
"""Some LSS operation failed."""