class LocalNode(BaseNode):
    """Node that serves its own object dictionary from a local data store.

    Reads and writes are resolved through :meth:`get_data` and
    :meth:`set_data`, which consult registered callbacks and the
    in-memory ``data_store`` (``index -> subindex -> raw bytes``).
    """

    def __init__(
        self,
        node_id: int,
        object_dictionary: Union[ObjectDictionary, str],
    ):
        """Create the node and the service objects attached to it.

        :param node_id: Node id, forwarded to the base class.
        :param object_dictionary: Object dictionary (or a string accepted
            by the base class), forwarded to the base class.
        """
        super().__init__(node_id, object_dictionary)

        # Raw values written to this node, keyed by index then subindex.
        self.data_store: Dict[int, Dict[int, bytes]] = {}
        self._read_callbacks = []
        self._write_callbacks = []

        # Service objects; the COB-IDs are offsets from the node id.
        self.sdo = SdoServer(0x600 + self.id, 0x580 + self.id, self)
        self.tpdo = TPDO(self)
        self.rpdo = RPDO(self)
        self.pdo = PDO(self, self.rpdo, self.tpdo)
        self.nmt = NmtSlave(self.id, self)
        # Let self.nmt handle writes for 0x1017
        self.add_write_callback(self.nmt.on_write)
        self.emcy = EmcyProducer(0x80 + self.id)

    def associate_network(self, network: canopen.network.Network):
        """Attach this node and its services to *network*.

        :param network: Network to hand to the node and its services.
        :raises RuntimeError: If the node already has a network.
        """
        if self.has_network():
            raise RuntimeError("Node is already associated with a network")

        self.network = network
        for service in (self.sdo, self.tpdo, self.rpdo, self.nmt, self.emcy):
            service.network = network

        # Route incoming SDO requests and NMT commands (COB-ID 0) to us.
        network.subscribe(self.sdo.rx_cobid, self.sdo.on_request)
        network.subscribe(0, self.nmt.on_command)

    def remove_network(self) -> None:
        """Detach this node and its services from the current network.

        Does nothing when no network is associated.
        """
        if not self.has_network():
            return

        # Unsubscribe while self.network still refers to the real network.
        self.network.unsubscribe(self.sdo.rx_cobid, self.sdo.on_request)
        self.network.unsubscribe(0, self.nmt.on_command)

        placeholder = canopen.network._UNINITIALIZED_NETWORK
        self.network = placeholder
        for service in (self.sdo, self.tpdo, self.rpdo, self.nmt, self.emcy):
            service.network = placeholder

    def add_read_callback(self, callback):
        """Register *callback* to be consulted by :meth:`get_data`.

        The callback is invoked with keyword arguments ``index``,
        ``subindex`` and ``od``; a non-``None`` return value is encoded
        and used as the read result.
        """
        self._read_callbacks.append(callback)

    def add_write_callback(self, callback):
        """Register *callback* to be invoked by :meth:`set_data`.

        The callback is invoked with keyword arguments ``index``,
        ``subindex``, ``od`` and ``data``.
        """
        self._write_callbacks.append(callback)

    def get_data(
        self, index: int, subindex: int, check_readable: bool = False
    ) -> bytes:
        """Return the raw value of the object at *index*/*subindex*.

        Sources are tried in order: read callbacks, the local data store,
        the object's ``value``, and finally the object's ``default``.

        :param index: Object dictionary index.
        :param subindex: Object dictionary subindex.
        :param check_readable: When true, refuse objects that are not
            readable.
        :raises SdoAbortedError: If the object does not exist, is not
            readable (with *check_readable*), or no value is available.
        """
        entry = self._find_object(index, subindex)

        if check_readable and not entry.readable:
            raise SdoAbortedError(0x06010001)

        # The first callback that produces a value wins.
        for read_callback in self._read_callbacks:
            produced = read_callback(index=index, subindex=subindex, od=entry)
            if produced is not None:
                return entry.encode_raw(produced)

        try:
            # Previously written data is already stored in raw form.
            return self.data_store[index][subindex]
        except KeyError:
            # Nothing stored: fall back to ParameterValue in EDS ...
            if entry.value is not None:
                return entry.encode_raw(entry.value)
            # ... and then to the default value.
            if entry.default is not None:
                return entry.encode_raw(entry.default)

        # Resource not available
        logger.info("Resource unavailable for 0x%04X:%02X", index, subindex)
        raise SdoAbortedError(0x060A0023)

    def set_data(
        self,
        index: int,
        subindex: int,
        data: bytes,
        check_writable: bool = False,
    ) -> None:
        """Store raw *data* for the object at *index*/*subindex*.

        Write callbacks run before the data is stored, so a callback that
        raises prevents the store.

        :param index: Object dictionary index.
        :param subindex: Object dictionary subindex.
        :param data: Raw value to store.
        :param check_writable: When true, refuse objects that are not
            writable.
        :raises SdoAbortedError: If the object does not exist, is not
            writable (with *check_writable*), or the data length does not
            match a numeric object's size.
        """
        entry = self._find_object(index, subindex)

        if check_writable and not entry.writable:
            raise SdoAbortedError(0x06010002)

        # Check length matches type (length of od variable is in bits)
        is_number = entry.data_type in objectdictionary.NUMBER_TYPES
        if is_number and 8 * len(data) != len(entry):
            raise SdoAbortedError(0x06070010)

        for write_callback in self._write_callbacks:
            write_callback(index=index, subindex=subindex, od=entry, data=data)

        # Store a bytes copy under index -> subindex.
        per_index = self.data_store.setdefault(index, {})
        per_index[subindex] = bytes(data)

    def _find_object(self, index, subindex):
        """Look up the object dictionary entry for *index*/*subindex*.

        A variable found directly at *index* is returned as is; for any
        other entry type the member at *subindex* is returned.

        :raises SdoAbortedError: If the index or the subindex does not
            exist.
        """
        dictionary = self.object_dictionary
        if index not in dictionary:
            # Index does not exist
            raise SdoAbortedError(0x06020000)

        entry = dictionary[index]
        if isinstance(entry, objectdictionary.ODVariable):
            return entry

        # Group or array
        if subindex not in entry:
            # Subindex does not exist
            raise SdoAbortedError(0x06090011)
        return entry[subindex]