ble_relay.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
"""
AI-Light BLE Relay

Reads JSON status messages from stdin and sends them to the AI-Light device over BLE.

Usage:
    python ble_relay.py [options]

Options:
    --device        Bluetooth device name (default: AI-Light)
    --service-uuid  Service UUID (default: b8b7e001-7a6b-4f4f-9a8b-11c0ffee0001)
    --char-uuid     Characteristic UUID (default: b8b7e002-7a6b-4f4f-9a8b-11c0ffee0001)
"""
  11. import sys
  12. import asyncio
  13. import json
  14. import argparse
  15. import logging
  16. from bleak import BleakClient, BleakScanner
  17. logging.basicConfig(
  18. level=logging.INFO,
  19. format="%(asctime)s [BLE] %(message)s",
  20. datefmt="%H:%M:%S"
  21. )
  22. logger = logging.getLogger(__name__)
  23. STATUS_MAP = {
  24. "idle": "idle",
  25. "busy": "busy",
  26. "retry": "thinking",
  27. "pending": "thinking",
  28. "reasoning": "thinking",
  29. "using_tool": "ai",
  30. "running": "ai",
  31. # "completed" 和 "session_completed" 跳动太快,不发送到灯
  32. "permission": "alarm",
  33. "error": "error"
  34. }
  35. async def find_device(device_name: str):
  36. logger.info(f"Searching for {device_name}...")
  37. device = await BleakScanner.find_device_by_name(device_name, timeout=10.0)
  38. if device is None:
  39. logger.error(f"Device '{device_name}' not found")
  40. return None
  41. logger.info(f"Found: {device.name} ({device.address})")
  42. return device
  43. async def run_relay(device_name: str, service_uuid: str, char_uuid: str):
  44. device = await find_device(device_name)
  45. if not device:
  46. sys.exit(1)
  47. async with BleakClient(device) as client:
  48. logger.info(f"Connected to {device.name}")
  49. for line in sys.stdin:
  50. try:
  51. line = line.strip()
  52. if not line:
  53. continue
  54. data = json.loads(line)
  55. code = data.get("code", "idle")
  56. mode = STATUS_MAP.get(code)
  57. if mode is None:
  58. continue
  59. await client.write_gatt_char(char_uuid, mode.encode("utf-8"))
  60. logger.info(f"Sent: {mode} (from code: {code})")
  61. except json.JSONDecodeError:
  62. logger.warning(f"Invalid JSON: {line}")
  63. except Exception as e:
  64. logger.error(f"Error: {e}")
  65. # Try to reconnect
  66. logger.info("Attempting to reconnect...")
  67. device = await find_device(device_name)
  68. if not device:
  69. logger.error("Reconnection failed")
  70. break
  71. client = BleakClient(device)
  72. await client.__aenter__()
  73. logger.info("Reconnected")
  74. def main():
  75. parser = argparse.ArgumentParser(description="AI-Light BLE Relay")
  76. parser.add_argument("--device", default="AI-Light",
  77. help="BLE device name (default: AI-Light)")
  78. parser.add_argument("--service-uuid", default="b8b7e001-7a6b-4f4f-9a8b-11c0ffee0001",
  79. help="BLE service UUID")
  80. parser.add_argument("--char-uuid", default="b8b7e002-7a6b-4f4f-9a8b-11c0ffee0001",
  81. help="BLE characteristic UUID")
  82. args = parser.parse_args()
  83. logger.info(f"BLE Relay starting...")
  84. logger.info(f"Device: {args.device}")
  85. logger.info(f"Waiting for status messages on stdin...")
  86. try:
  87. asyncio.run(run_relay(args.device, args.service_uuid, args.char_uuid))
  88. except KeyboardInterrupt:
  89. logger.info("Stopped")
  90. except Exception as e:
  91. logger.error(f"Fatal error: {e}")
  92. sys.exit(1)
# Start the relay only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()