| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
"""
AI-Light BLE Relay

Reads JSON status messages from stdin and sends them to an AI-Light device over BLE.

Usage:
    python ble_relay.py [options]

Options:
    --device        Bluetooth device name (default: AI-Light)
    --service-uuid  Service UUID (default: b8b7e001-7a6b-4f4f-9a8b-11c0ffee0001)
    --char-uuid     Characteristic UUID (default: b8b7e002-7a6b-4f4f-9a8b-11c0ffee0001)
"""
- import sys
- import asyncio
- import json
- import argparse
- import logging
- from bleak import BleakClient, BleakScanner
# Root logging: INFO and above, each record prefixed with a wall-clock
# time (HH:MM:SS) and a "[BLE]" tag.
logging.basicConfig(
    format="%(asctime)s [BLE] %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)

# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
# Lookup from an incoming status code to the light mode name, written here
# grouped by mode (one mode may be selected by several codes).
# "completed" and "session_completed" flicker by too quickly, so they are
# deliberately left out and never sent to the light.
STATUS_MAP = {
    code: mode
    for mode, codes in (
        ("idle", ("idle",)),
        ("busy", ("busy",)),
        ("thinking", ("retry", "pending", "reasoning")),
        ("ai", ("using_tool", "running")),
        ("alarm", ("permission",)),
        ("error", ("error",)),
    )
    for code in codes
}
async def find_device(device_name: str):
    """Scan for a BLE device by name and report the outcome in the log.

    Asks ``BleakScanner.find_device_by_name`` for ``device_name`` with a
    10-second timeout.

    Returns:
        The device object returned by the scanner, or ``None`` (after
        logging an error) when the scanner returned ``None``.
    """
    logger.info(f"Searching for {device_name}...")
    match = await BleakScanner.find_device_by_name(device_name, timeout=10.0)

    if match is not None:
        logger.info(f"Found: {match.name} ({match.address})")
        return match

    logger.error(f"Device '{device_name}' not found")
    return None
def _parse_status(line: str):
    """Decode one non-empty stdin line into a ``(code, mode)`` pair.

    Returns ``None`` when the line should be ignored: the JSON value is not
    an object, or its "code" has no entry in ``STATUS_MAP``. A missing
    "code" key is treated as "idle". Raises ``json.JSONDecodeError`` for
    text that is not valid JSON.
    """
    data = json.loads(line)
    if not isinstance(data, dict):
        # Valid JSON but not an object (e.g. a list or a bare number): there
        # is no "code" field to read, and the BLE link is not at fault.
        logger.warning(f"Ignoring non-object JSON: {line}")
        return None

    code = data.get("code", "idle")
    # Only strings can match a STATUS_MAP key; an unhashable value (list,
    # nested object) would make the dict lookup itself raise.
    mode = STATUS_MAP.get(code) if isinstance(code, str) else None
    if mode is None:
        return None
    return code, mode


async def _reconnect(stale_client, device_name: str):
    """Close ``stale_client`` and return a newly connected client, or ``None``.

    ``None`` means the device could not be found or connected to again; the
    cause has already been logged.
    """
    try:
        # Release the old connection before opening a new one; the link may
        # already be gone, so a failure here is not fatal.
        await stale_client.disconnect()
    except Exception as e:
        logger.warning(f"Error while closing old connection: {e}")

    try:
        device = await find_device(device_name)
        if device is None:
            return None
        fresh_client = BleakClient(device)
        await fresh_client.connect()
    except Exception as e:
        logger.error(f"Error: {e}")
        return None
    return fresh_client


async def run_relay(device_name: str, service_uuid: str, char_uuid: str) -> None:
    """Forward status updates from stdin to the light until stdin is exhausted.

    Each non-blank stdin line is parsed as JSON; its "code" is translated
    through ``STATUS_MAP`` and the resulting mode name is written, UTF-8
    encoded, to the characteristic ``char_uuid``. Lines that are not valid
    JSON, are not JSON objects, or carry an unmapped code are skipped. If a
    write fails, the old connection is closed and the device is searched for
    and connected to again; the message whose write failed is not resent.
    The loop ends early when reconnecting fails.

    Args:
        device_name: Name used to locate the device when scanning.
        service_uuid: Accepted for interface compatibility; not used here.
        char_uuid: UUID of the characteristic the mode is written to.

    Exits the process with status 1 if the device is not found at startup.
    """
    device = await find_device(device_name)
    if not device:
        sys.exit(1)

    client = BleakClient(device)
    await client.connect()
    try:
        logger.info(f"Connected to {device.name}")
        # NOTE: iterating sys.stdin is a blocking read, so the event loop
        # does not run while this coroutine waits for the next line.
        for raw_line in sys.stdin:
            line = raw_line.strip()
            if not line:
                continue

            try:
                status = _parse_status(line)
            except json.JSONDecodeError:
                logger.warning(f"Invalid JSON: {line}")
                continue
            if status is None:
                continue
            code, mode = status

            try:
                await client.write_gatt_char(char_uuid, mode.encode("utf-8"))
            except Exception as e:
                # Only a failed write is treated as a broken link.
                logger.error(f"Error: {e}")
                logger.info("Attempting to reconnect...")
                replacement = await _reconnect(client, device_name)
                if replacement is None:
                    # The old client was already closed by _reconnect.
                    client = None
                    logger.error("Reconnection failed")
                    break
                client = replacement
                logger.info("Reconnected")
                continue

            logger.info(f"Sent: {mode} (from code: {code})")
    finally:
        # Always close whichever client is current, including one created
        # by a reconnect.
        if client is not None:
            await client.disconnect()
def main():
    """Parse command-line options and run the relay.

    Options are ``--device``, ``--service-uuid`` and ``--char-uuid``; all
    three values are passed to ``run_relay``. A ``KeyboardInterrupt`` is
    logged as a normal stop. Any other exception raised while running is
    logged and the process exits with status 1.
    """
    parser = argparse.ArgumentParser(description="AI-Light BLE Relay")
    parser.add_argument(
        "--device",
        default="AI-Light",
        help="BLE device name (default: AI-Light)",
    )
    parser.add_argument(
        "--service-uuid",
        default="b8b7e001-7a6b-4f4f-9a8b-11c0ffee0001",
        help="BLE service UUID",
    )
    parser.add_argument(
        "--char-uuid",
        default="b8b7e002-7a6b-4f4f-9a8b-11c0ffee0001",
        help="BLE characteristic UUID",
    )
    args = parser.parse_args()

    # Plain strings where there is nothing to interpolate.
    logger.info("BLE Relay starting...")
    logger.info(f"Device: {args.device}")
    logger.info("Waiting for status messages on stdin...")

    try:
        asyncio.run(run_relay(args.device, args.service_uuid, args.char_uuid))
    except KeyboardInterrupt:
        logger.info("Stopped")
    except Exception as e:
        # Top-level boundary: report the failure and signal it via exit code.
        logger.error(f"Fatal error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
|