Source code for omnitiles.protocol.parser

"""Stateful parser for inbound telemetry frames."""

import struct
import time
from collections.abc import Iterator

from omnitiles.hardware import ADC_MAX, M1_CONFIG, M2_CONFIG
from omnitiles.protocol.messages import START_BYTE, MessageId
from omnitiles.telemetry import ImuSample, Telemetry

# Known telemetry packet lengths (bytes on the wire, including start byte and
# checksum). Each variant is distinguished only by length; add new entries
# here when the firmware grows the packet.
_TELEMETRY_LENGTHS = (7, 15, 17, 53)
_MAX_TELEMETRY_LEN = max(_TELEMETRY_LENGTHS)


[docs] class StreamParser: """Feed BLE notification bytes in, get :class:`Telemetry` frames out. The parser maintains an internal byte buffer and hunts for ``0xA5 0x60`` packets. Because telemetry packets don't carry an explicit length byte, the parser identifies variants by total length and validates the checksum. Invalid/corrupt frames are silently skipped. """ def __init__(self) -> None: self._buf = bytearray()
[docs] def feed(self, data: bytes | bytearray) -> list[Telemetry]: """Append ``data`` to the internal buffer and return any frames completed by the new bytes.""" self._buf.extend(data) return list(self._drain())
def _drain(self) -> Iterator[Telemetry]: while True: frame = self._try_consume_frame() if frame is None: return yield frame def _try_consume_frame(self) -> Telemetry | None: buf = self._buf while buf and buf[0] != START_BYTE: del buf[0] if len(buf) < 2: return None if buf[1] != MessageId.TELEMETRY: del buf[0] return None # Try the longest length we have enough bytes for first, so that # short prefixes of a longer packet don't cause a false positive on # a rare 1-in-256 checksum collision. If nothing validates at the # longest length but we might still be waiting for the tail of a # bigger packet, return None and wait for more bytes. if len(buf) < min(_TELEMETRY_LENGTHS): return None for length in sorted(_TELEMETRY_LENGTHS, reverse=True): if len(buf) < length: continue frame = _try_parse(bytes(buf[:length])) if frame is not None: del buf[:length] return frame if len(buf) < _MAX_TELEMETRY_LEN: return None # We have enough bytes for the largest possible packet and none # validated — this start byte is junk. Advance. del buf[0] return None
def _checksum_ok(packet: bytes) -> bool: total = 0 for byte in packet[1:-1]: total = (total + byte) & 0xFF return packet[-1] == total def _try_parse(packet: bytes) -> Telemetry | None: if not _checksum_ok(packet): return None length = len(packet) m1_pos_adc, m2_pos_adc = struct.unpack_from("<HH", packet, 2) m1_pos_mm = (m1_pos_adc / ADC_MAX) * M1_CONFIG.stroke_mm m2_pos_mm = (m2_pos_adc / ADC_MAX) * M2_CONFIG.stroke_mm uwb_mm: tuple[int | None, int | None, int | None, int | None] | None = None tof_mm: int | None = None imu: ImuSample | None = None m1_adcs: tuple[int, ...] = () m2_adcs: tuple[int, ...] = () if length >= 15: d0, d1, d2, d3 = struct.unpack_from("<HHHH", packet, 6) uwb_mm = ( None if d0 == 0xFFFF else d0, None if d1 == 0xFFFF else d1, None if d2 == 0xFFFF else d2, None if d3 == 0xFFFF else d3, ) if length >= 17: (tof_raw,) = struct.unpack_from("<H", packet, 14) tof_mm = None if tof_raw == 0xFFFF else tof_raw if length == 53: imu_vals = struct.unpack_from("<6f", packet, 16) imu = ImuSample(*imu_vals) m1_adcs = tuple(struct.unpack_from("<4H", packet, 40)) m2_adcs = tuple(struct.unpack_from("<2H", packet, 48)) return Telemetry( timestamp=time.monotonic(), m1_pos_adc=m1_pos_adc, m2_pos_adc=m2_pos_adc, m1_pos_mm=m1_pos_mm, m2_pos_mm=m2_pos_mm, m1_adcs=m1_adcs, m2_adcs=m2_adcs, uwb_mm=uwb_mm, tof_mm=tof_mm, imu=imu, raw=packet, )