Source code for canopen.sync
from typing import Optional
[docs]
class SyncProducer:
"""Transmits a SYNC message periodically."""
#: COB-ID of the SYNC message
cob_id = 0x80
def __init__(self, network):
self.network = network
self.period: Optional[float] = None
self._task = None
[docs]
def transmit(self, count: Optional[int] = None):
"""Send out a SYNC message once.
:param count:
Counter to add in message.
"""
data = [count] if count is not None else []
self.network.send_message(self.cob_id, data)
[docs]
def start(self, period: Optional[float] = None):
"""Start periodic transmission of SYNC message in a background thread.
:param period:
Period of SYNC message in seconds.
"""
if period is not None:
self.period = period
if not self.period:
raise ValueError("A valid transmission period has not been given")
self._task = self.network.send_periodic(self.cob_id, [], self.period)
[docs]
def stop(self):
"""Stop periodic transmission of SYNC message."""
if self._task is not None:
self._task.stop()