Source code for omnitiles.sync

"""Synchronous wrappers around :class:`Tile` and :class:`TileFleet`.

These are thin shims that run the async SDK on a shared background event
loop. They exist so callback-driven code (notably the Viser GUI) can call
``tile.m1_extend()`` without having to manage asyncio themselves.
"""

from __future__ import annotations

import asyncio
import threading
from collections.abc import Awaitable, Callable
from concurrent.futures import Future
from typing import TypeVar

from omnitiles.fleet import TileFleet, scan as async_scan
from omnitiles.telemetry import Telemetry
from omnitiles.tile import Tile, TelemetryCallback, Unsubscribe
from omnitiles.transport import DEFAULT_TILE_NAME_PREFIX, TileInfo

T = TypeVar("T")

_loop: asyncio.AbstractEventLoop | None = None
_loop_lock = threading.Lock()


def _ensure_loop() -> asyncio.AbstractEventLoop:
    """Return the SDK's shared background event loop, starting it lazily."""
    global _loop
    with _loop_lock:
        if _loop is not None and _loop.is_running():
            return _loop
        _loop = asyncio.new_event_loop()
        started = threading.Event()

        def _runner() -> None:
            assert _loop is not None
            asyncio.set_event_loop(_loop)
            started.set()
            _loop.run_forever()

        thread = threading.Thread(target=_runner, name="omnitiles-sdk-loop", daemon=True)
        thread.start()
        started.wait()
        return _loop


def _run(coro: Awaitable[T]) -> T:
    loop = _ensure_loop()
    future: Future[T] = asyncio.run_coroutine_threadsafe(coro, loop)  # type: ignore[arg-type]
    return future.result()


[docs] def scan_sync( *, timeout: float = 5.0, name_prefix: str = DEFAULT_TILE_NAME_PREFIX, ) -> list[TileInfo]: """Synchronous version of :func:`omnitiles.scan`.""" return _run(async_scan(timeout=timeout, name_prefix=name_prefix))
[docs] class SyncTile: """Blocking facade around :class:`Tile`. Every method delegates to the async :class:`Tile` running on the shared background loop. ``on_telemetry`` callbacks fire on that loop's thread; keep them fast and non-blocking. """ def __init__(self, tile: Tile) -> None: self._tile = tile
[docs] @classmethod def connect(cls, info: TileInfo) -> "SyncTile": tile = Tile(info) _run(tile.connect()) return cls(tile)
[docs] @classmethod def discover_one( cls, *, timeout: float = 5.0, name_prefix: str = DEFAULT_TILE_NAME_PREFIX, ) -> "SyncTile": """Scan, take the first match, and return a connected SyncTile.""" infos = scan_sync(timeout=timeout, name_prefix=name_prefix) if not infos: raise RuntimeError(f"No tiles found with name prefix {name_prefix!r}") return cls.connect(infos[0])
# ---- identity ---- @property def name(self) -> str: return self._tile.name @property def address(self) -> str: return self._tile.address @property def connected(self) -> bool: return self._tile.connected def __repr__(self) -> str: return f"SyncTile(name={self.name!r}, connected={self.connected})" # ---- connection ----
[docs] def disconnect(self) -> None: _run(self._tile.disconnect())
def __enter__(self) -> "SyncTile": return self def __exit__(self, exc_type, exc, tb) -> None: self.disconnect() # ---- commands ----
[docs] def ping(self) -> None: _run(self._tile.ping())
[docs] def m1_extend(self, speed: int = 255) -> None: _run(self._tile.m1_extend(speed))
[docs] def m1_retract(self, speed: int = 255) -> None: _run(self._tile.m1_retract(speed))
[docs] def m1_brake(self) -> None: _run(self._tile.m1_brake())
[docs] def m1_set_position(self, position: int) -> None: _run(self._tile.m1_set_position(position))
[docs] def m1_set_position_mm(self, mm: float) -> None: _run(self._tile.m1_set_position_mm(mm))
[docs] def m2_extend(self, speed: int = 255) -> None: _run(self._tile.m2_extend(speed))
[docs] def m2_retract(self, speed: int = 255) -> None: _run(self._tile.m2_retract(speed))
[docs] def m2_brake(self) -> None: _run(self._tile.m2_brake())
[docs] def m2_set_position(self, position: int) -> None: _run(self._tile.m2_set_position(position))
[docs] def m2_set_position_mm(self, mm: float) -> None: _run(self._tile.m2_set_position_mm(mm))
[docs] def base_velocity(self, vx: int, vy: int, omega: int) -> None: _run(self._tile.base_velocity(vx, vy, omega))
[docs] def base_brake(self) -> None: _run(self._tile.base_brake())
# ---- telemetry ---- @property def telemetry(self) -> Telemetry | None: return self._tile.telemetry
[docs] def on_telemetry(self, callback: TelemetryCallback) -> Unsubscribe: return self._tile.on_telemetry(callback)
[docs] def wait_for_telemetry(self, timeout: float | None = None) -> Telemetry: return _run(self._tile.wait_for_telemetry(timeout))
[docs] class SyncFleet: """Blocking facade around :class:`TileFleet`.""" def __init__(self, fleet: TileFleet) -> None: self._fleet = fleet self._tiles: dict[str, SyncTile] = {tile.name: SyncTile(tile) for tile in fleet}
[docs] @classmethod def discover( cls, *, timeout: float = 5.0, name_prefix: str = DEFAULT_TILE_NAME_PREFIX, ) -> "SyncFleet": fleet = _run(TileFleet.discover(timeout=timeout, name_prefix=name_prefix)) _run(fleet.connect_all()) return cls(fleet)
def __len__(self) -> int: return len(self._tiles) def __iter__(self): return iter(self._tiles.values()) def __getitem__(self, name: str) -> SyncTile: return self._tiles[name] def __contains__(self, name: object) -> bool: return name in self._tiles @property def names(self) -> list[str]: return list(self._tiles.keys())
[docs] def disconnect_all(self) -> None: _run(self._fleet.disconnect_all())
def __enter__(self) -> "SyncFleet": return self def __exit__(self, exc_type, exc, tb) -> None: self.disconnect_all()
[docs] def broadcast(self, action: Callable[[Tile], Awaitable[None]]) -> None: _run(self._fleet.broadcast(action))