class SyncProducer:
    """Transmits a SYNC message periodically."""

    #: COB-ID of the SYNC message
    cob_id = 0x80

    def __init__(self, network: canopen.network.Network):
        """Bind the producer to the network used for sending.

        :param network: Object whose ``send_message`` and ``send_periodic``
            methods are used to emit the SYNC message.
        """
        self.network = network
        # Period in seconds used by start(); None until one has been accepted.
        self.period: Optional[float] = None
        # Handle returned by network.send_periodic() while a periodic
        # transmission is active; None when idle.
        self._task: Optional[canopen.network.PeriodicMessageTask] = None

    def transmit(self, count: Optional[int] = None):
        """Send out a SYNC message once.

        :param count:
            Counter to add in message. When omitted, the message is sent
            with an empty payload.
        :raises ValueError:
            If the counter value does not fit in one byte.
        """
        # bytes([count]) itself raises ValueError for values outside 0..255.
        data = bytes([count]) if count is not None else b""
        self.network.send_message(self.cob_id, data)

    def start(self, period: Optional[float] = None):
        """Start periodic transmission of SYNC message in a background thread.

        A failed call leaves :attr:`period` untouched, so a previously
        accepted period is not lost when an invalid one is passed.

        :param period:
            Period of SYNC message in seconds. Falls back to the instance
            attribute :attr:`period` when omitted.
        :raises RuntimeError: If a periodic transmission is already started.
        :raises ValueError:
            If no period is set via argument nor the instance attribute, or
            if the resulting period is not strictly positive.
        """
        if self._task is not None:
            raise RuntimeError("Periodic SYNC transmission task already running")

        effective = self.period if period is None else period

        # ``not effective > 0`` (rather than ``effective <= 0``) also rejects
        # NaN, for which every ordering comparison is False.
        if effective is None or not effective > 0:
            raise ValueError("A valid transmission period has not been given")

        # Commit the period only after validation succeeded.
        self.period = effective
        self._task = self.network.send_periodic(self.cob_id, b"", effective)

    def stop(self):
        """Stop periodic transmission of SYNC message.

        Does nothing when no periodic transmission is active.
        """
        if self._task is not None:
            self._task.stop()
            self._task = None