Source code for omnitiles.fleet

"""Multi-tile management."""

from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable, Iterable, Iterator

from omnitiles.tile import Tile
from omnitiles.transport import DEFAULT_TILE_NAME_PREFIX, TileInfo, discover_tiles


[docs] async def scan( *, timeout: float = 5.0, name_prefix: str = DEFAULT_TILE_NAME_PREFIX, ) -> list[TileInfo]: """Scan for OmniTile devices and return their descriptors. Thin wrapper around :func:`omnitiles.transport.discover_tiles` that re-exports with the canonical ``scan`` name. """ return await discover_tiles(timeout=timeout, name_prefix=name_prefix)
[docs] class TileFleet: """A collection of :class:`Tile` instances addressed by name. :class:`TileFleet` is an async context manager: it connects every contained tile on ``__aenter__`` and disconnects them on ``__aexit__``. Use :meth:`discover` to build a fleet from a live BLE scan. """ def __init__(self, tiles: Iterable[Tile]) -> None: self._tiles: dict[str, Tile] = {} for tile in tiles: self._tiles[tile.name] = tile
[docs] @classmethod async def discover( cls, *, timeout: float = 5.0, name_prefix: str = DEFAULT_TILE_NAME_PREFIX, ) -> "TileFleet": """Scan for tiles and return a fleet wrapping all matches. The returned fleet is **not** yet connected — either ``await fleet.connect_all()`` or use it as an async context manager. """ infos = await scan(timeout=timeout, name_prefix=name_prefix) return cls(Tile(info) for info in infos)
# ---- collection interface ---- def __len__(self) -> int: return len(self._tiles) def __iter__(self) -> Iterator[Tile]: return iter(self._tiles.values()) def __getitem__(self, name: str) -> Tile: return self._tiles[name] def __contains__(self, name: object) -> bool: return name in self._tiles @property def names(self) -> list[str]: return list(self._tiles.keys()) # ---- connection ----
[docs] async def connect_all(self) -> None: await asyncio.gather(*(t.connect() for t in self._tiles.values()))
[docs] async def disconnect_all(self) -> None: await asyncio.gather( *(t.disconnect() for t in self._tiles.values()), return_exceptions=True, )
async def __aenter__(self) -> "TileFleet": await self.connect_all() return self async def __aexit__(self, exc_type, exc, tb) -> None: await self.disconnect_all() # ---- broadcast ----
[docs] async def broadcast( self, action: Callable[[Tile], Awaitable[None]], ) -> None: """Run an async action against every tile in parallel. Example:: await fleet.broadcast(lambda t: t.m1_set_position(128)) """ await asyncio.gather(*(action(tile) for tile in self._tiles.values()))