Source code for omnitiles.tile

"""High-level asynchronous API for a single OmniTile."""

from __future__ import annotations

import asyncio
import struct
from collections.abc import Callable
from typing import TYPE_CHECKING

from omnitiles.hardware import M1_CONFIG, M2_CONFIG, ActuatorConfig
from omnitiles.protocol import MessageId, StreamParser, encode
from omnitiles.telemetry import Telemetry
from omnitiles.transport import BleakTransport, Transport

if TYPE_CHECKING:
    from omnitiles.transport import TileInfo

TelemetryCallback = Callable[[Telemetry], None]
Unsubscribe = Callable[[], None]


[docs] class Tile: """Async interface to a single OmniTile over BLE. A :class:`Tile` owns a :class:`Transport` and a :class:`StreamParser`. Every command method returns once the packet has been handed to the transport — the protocol does not ACK, so a successful ``await`` means "sent", not "executed". Subscribe to telemetry to observe effects. Tiles are async context managers:: async with Tile(info) as tile: await tile.m1_extend() """ def __init__( self, info: "TileInfo", *, transport: Transport | None = None, ) -> None: self._info = info self._transport: Transport = transport or BleakTransport(info.address) self._parser = StreamParser() self._latest: Telemetry | None = None self._callbacks: list[TelemetryCallback] = [] self._new_frame_event = asyncio.Event() self._transport.set_notify_handler(self._on_bytes) self._transport.set_disconnect_handler(self._on_transport_disconnect) self._loop: asyncio.AbstractEventLoop | None = None self._reconnect_task: asyncio.Task[None] | None = None self._user_disconnected = False # ---- identity ---- @property def name(self) -> str: return self._info.name @property def address(self) -> str: return self._info.address @property def connected(self) -> bool: return self._transport.connected def __repr__(self) -> str: return f"Tile(name={self.name!r}, connected={self.connected})" # ---- connection ----
[docs] async def connect(self) -> None: self._loop = asyncio.get_running_loop() self._user_disconnected = False await self._transport.connect()
[docs] async def disconnect(self) -> None: self._user_disconnected = True if self._reconnect_task is not None and not self._reconnect_task.done(): self._reconnect_task.cancel() await self._transport.disconnect()
def _on_transport_disconnect(self) -> None: """Called from bleak's disconnect callback (runs on the SDK loop).""" if self._user_disconnected: return if self._loop is None or self._loop.is_closed(): return if self._reconnect_task is not None and not self._reconnect_task.done(): return self._reconnect_task = self._loop.create_task(self._reconnect_loop()) async def _reconnect_loop(self) -> None: """Retry :meth:`Transport.connect` with bounded exponential backoff.""" delay = 1.0 max_delay = 10.0 while not self._user_disconnected: try: await self._transport.connect() return except asyncio.CancelledError: raise except Exception: await asyncio.sleep(delay) delay = min(delay * 2.0, max_delay) async def __aenter__(self) -> "Tile": await self.connect() return self async def __aexit__(self, exc_type, exc, tb) -> None: await self.disconnect() # ---- commands ----
[docs] async def ping(self) -> None: await self._send(MessageId.PING)
[docs] async def m1_extend(self, speed: int = 255) -> None: """Drive M1 in the extend direction at the given PWM speed (0-255).""" await self._send(MessageId.M1_EXTEND, _u8(speed))
[docs] async def m1_retract(self, speed: int = 255) -> None: await self._send(MessageId.M1_RETRACT, _u8(speed))
[docs] async def m1_brake(self) -> None: await self._send(MessageId.M1_BRAKE)
[docs] async def m1_set_position(self, position: int) -> None: """Command a closed-loop M1 target, scaled 0-255 over the stroke.""" await self._send(MessageId.M1_SET_POSITION, _u8(position))
[docs] async def m1_set_position_mm(self, mm: float) -> None: """Convenience: set M1 target in millimeters along the stroke.""" await self.m1_set_position(_mm_to_scaled(mm, M1_CONFIG))
[docs] async def m2_extend(self, speed: int = 255) -> None: await self._send(MessageId.M2_EXTEND, _u8(speed))
[docs] async def m2_retract(self, speed: int = 255) -> None: await self._send(MessageId.M2_RETRACT, _u8(speed))
[docs] async def m2_brake(self) -> None: await self._send(MessageId.M2_BRAKE)
[docs] async def m2_set_position(self, position: int) -> None: await self._send(MessageId.M2_SET_POSITION, _u8(position))
[docs] async def m2_set_position_mm(self, mm: float) -> None: await self.m2_set_position(_mm_to_scaled(mm, M2_CONFIG))
[docs] async def base_velocity(self, vx: int, vy: int, omega: int) -> None: """Command open-loop mobile-base velocity. Each component is int8.""" payload = struct.pack("<bbb", _i8(vx), _i8(vy), _i8(omega)) await self._send(MessageId.BASE_VELOCITY, payload)
[docs] async def base_brake(self) -> None: await self._send(MessageId.BASE_BRAKE)
# ---- telemetry ---- @property def telemetry(self) -> Telemetry | None: """Most recently received telemetry frame, or ``None``.""" return self._latest
[docs] def on_telemetry(self, callback: TelemetryCallback) -> Unsubscribe: """Register ``callback`` to run on every incoming telemetry frame. Returns a zero-argument function that removes the subscription. Callbacks fire on whatever thread/loop the transport delivers notifications on, so they must not block. """ self._callbacks.append(callback) def _unsubscribe() -> None: try: self._callbacks.remove(callback) except ValueError: pass return _unsubscribe
[docs] async def wait_for_telemetry(self, timeout: float | None = None) -> Telemetry: """Await the next telemetry frame and return it.""" self._new_frame_event.clear() await asyncio.wait_for(self._new_frame_event.wait(), timeout=timeout) assert self._latest is not None return self._latest
# ---- internals ---- async def _send(self, msg_id: MessageId, payload: bytes = b"") -> None: await self._transport.send(encode(msg_id, payload)) def _on_bytes(self, data: bytes) -> None: for frame in self._parser.feed(data): self._latest = frame self._new_frame_event.set() for cb in list(self._callbacks): try: cb(frame) except Exception: pass
def _u8(value: int) -> bytes: return bytes([max(0, min(255, int(value)))]) def _i8(value: int) -> int: return max(-128, min(127, int(value))) def _mm_to_scaled(mm: float, config: ActuatorConfig) -> int: """Map a millimeter target to the firmware's 0-255 set-position scale.""" frac = mm / config.stroke_mm return max(0, min(255, int(round(frac * 255))))