Source code for canopen.sync

from __future__ import annotations

from typing import Optional, TYPE_CHECKING

if TYPE_CHECKING:
    import canopen.network


class SyncProducer:
    """Transmits a SYNC message periodically.

    The producer sends through the network object it was created with. It
    remembers the last accepted transmission period in :attr:`period` and
    keeps the handle of the running periodic task, if any.
    """

    #: COB-ID of the SYNC message
    cob_id = 0x80

    def __init__(self, network: canopen.network.Network):
        """Bind the producer to *network* without starting any transmission.

        :param network:
            Object used to send the SYNC messages; its ``send_message`` and
            ``send_periodic`` methods are called by this class.
        """
        self.network = network
        #: Period of the SYNC message in seconds, ``None`` until configured
        self.period: Optional[float] = None
        # Handle returned by ``network.send_periodic``; ``None`` while stopped.
        self._task: Optional[canopen.network.PeriodicMessageTask] = None

    def transmit(self, count: Optional[int] = None):
        """Send out a SYNC message once.

        :param count:
            Counter to add in message. When omitted, the message carries
            no data bytes.
        :raises ValueError:
            If the counter value does not fit in one byte.
        """
        # bytes([count]) raises ValueError for values outside range(256).
        data = bytes([count]) if count is not None else b""
        self.network.send_message(self.cob_id, data)

    def start(self, period: Optional[float] = None):
        """Start periodic transmission of SYNC message in a background thread.

        :param period:
            Period of SYNC message in seconds. When omitted, the value
            already stored in :attr:`period` is used.
        :raises RuntimeError:
            If a periodic transmission is already started.
        :raises ValueError:
            If no valid (positive) period is set via argument nor the
            instance attribute. In that case :attr:`period` is left
            unchanged.
        """
        if self._task is not None:
            raise RuntimeError("Periodic SYNC transmission task already running")

        # Validate the candidate before storing it, so that a rejected
        # argument cannot clobber a previously configured, valid period.
        effective = self.period if period is None else period
        if not effective or effective < 0:
            raise ValueError("A valid transmission period has not been given")

        self.period = effective
        self._task = self.network.send_periodic(self.cob_id, b"", effective)

    def stop(self):
        """Stop periodic transmission of SYNC message.

        Does nothing when no periodic transmission is running.
        """
        if self._task is not None:
            self._task.stop()
            # Forget the handle so that start() may be called again.
            self._task = None