""" AI-Light BLE Relay 从stdin读取JSON状态消息,通过BLE发送到AI-Light设备 用法: python ble_relay.py [选项] 选项: --device 蓝牙设备名称 (默认: AI-Light) --service-uuid 服务UUID (默认: b8b7e001-7a6b-4f4f-9a8b-11c0ffee0001) --char-uuid 特征UUID (默认: b8b7e002-7a6b-4f4f-9a8b-11c0ffee0001) """ import sys import asyncio import json import argparse import logging from bleak import BleakClient, BleakScanner logging.basicConfig( level=logging.INFO, format="%(asctime)s [BLE] %(message)s", datefmt="%H:%M:%S" ) logger = logging.getLogger(__name__) STATUS_MAP = { "idle": "idle", "busy": "busy", "retry": "thinking", "pending": "thinking", "reasoning": "thinking", "using_tool": "ai", "running": "ai", # "completed" 和 "session_completed" 跳动太快,不发送到灯 "permission": "alarm", "error": "error" } async def find_device(device_name: str): logger.info(f"Searching for {device_name}...") device = await BleakScanner.find_device_by_name(device_name, timeout=10.0) if device is None: logger.error(f"Device '{device_name}' not found") return None logger.info(f"Found: {device.name} ({device.address})") return device async def run_relay(device_name: str, service_uuid: str, char_uuid: str): device = await find_device(device_name) if not device: sys.exit(1) async with BleakClient(device) as client: logger.info(f"Connected to {device.name}") for line in sys.stdin: try: line = line.strip() if not line: continue data = json.loads(line) code = data.get("code", "idle") mode = STATUS_MAP.get(code) if mode is None: continue await client.write_gatt_char(char_uuid, mode.encode("utf-8")) logger.info(f"Sent: {mode} (from code: {code})") except json.JSONDecodeError: logger.warning(f"Invalid JSON: {line}") except Exception as e: logger.error(f"Error: {e}") # Try to reconnect logger.info("Attempting to reconnect...") device = await find_device(device_name) if not device: logger.error("Reconnection failed") break client = BleakClient(device) await client.__aenter__() logger.info("Reconnected") def main(): parser = argparse.ArgumentParser(description="AI-Light BLE Relay") parser.add_argument("--device", default="AI-Light", help="BLE device name (default: AI-Light)") parser.add_argument("--service-uuid", default="b8b7e001-7a6b-4f4f-9a8b-11c0ffee0001", help="BLE service UUID") parser.add_argument("--char-uuid", default="b8b7e002-7a6b-4f4f-9a8b-11c0ffee0001", help="BLE characteristic UUID") args = parser.parse_args() logger.info(f"BLE Relay starting...") logger.info(f"Device: {args.device}") logger.info(f"Waiting for status messages on stdin...") try: asyncio.run(run_relay(args.device, args.service_uuid, args.char_uuid)) except KeyboardInterrupt: logger.info("Stopped") except Exception as e: logger.error(f"Fatal error: {e}") sys.exit(1) if __name__ == "__main__": main()