package main import ( "context" "flag" "fmt" "io" "os" "os/exec" "os/signal" "path/filepath" "sort" "strconv" "strings" "sync" "syscall" "time" "ai-status-light/internal/api" "ai-status-light/internal/database" "ai-status-light/internal/discovery" "ai-status-light/internal/event" "ai-status-light/internal/logger" "ai-status-light/internal/monitor" mqttcli "ai-status-light/internal/mqtt" ) const defaultDBPath = "./data/config.db" var Version = "dev" func main() { if len(os.Args) < 2 { printUsage() return } switch os.Args[1] { case "monitor": runMonitor(os.Args[2:]) case "config": runConfig(os.Args[2:]) case "serve": runServe(os.Args[2:]) case "version": fmt.Printf("opencode-monitor %s\n", Version) default: printUsage() } } func printUsage() { fmt.Printf("opencode-monitor %s\n\n", Version) fmt.Println("用法: opencode-monitor <命令> [选项]") fmt.Println("") fmt.Println("命令:") fmt.Println(" monitor 启动监控") fmt.Println(" config 管理 MQTT 和 BLE 配置") fmt.Println(" serve 启动 API 服务") fmt.Println(" version 显示版本信息") fmt.Println("") fmt.Println("运行 'opencode-monitor <命令> -h' 查看命令帮助") } func runServe(args []string) { fs := flag.NewFlagSet("serve", flag.ExitOnError) addr := fs.String("addr", ":8080", "监听地址") dbPath := fs.String("db", defaultDBPath, "数据库路径") logFile := fs.String("log-file", "./logs", "日志文件路径(默认 ./logs/monitor.log)") logLevel := fs.String("log-level", "info", "日志级别 (debug/info/warn/error)") tls := fs.Bool("tls", false, "启用 HTTPS (使用自签名证书)") tlsCert := fs.String("tls-cert", "./data/tls/cert.pem", "TLS 证书文件路径") tlsKey := fs.String("tls-key", "./data/tls/key.pem", "TLS 私钥文件路径") fs.Parse(args) logger.SetLevel(logger.ParseLevel(*logLevel)) if err := logger.InitFileLog(*logFile); err != nil { fmt.Printf("初始化日志文件失败: %v\n", err) return } defer logger.Close() db, err := database.New(*dbPath) if err != nil { logger.Error("打开数据库失败: %v", err) fmt.Printf("打开数据库失败: %v\n", err) return } defer db.Close() logger.Info("数据库已连接: %s", *dbPath) server := api.New(db, *addr) if *tls { if err := api.EnsureSelfSignedCert(*tlsCert, *tlsKey); err != nil { logger.Error("生成自签名证书失败: %v", err) fmt.Printf("生成自签名证书失败: %v\n", err) return } server.EnableTLS(*tlsCert, *tlsKey) logger.Info("HTTPS 已启用") fmt.Println("HTTPS 已启用 (自签名证书)") } else { fmt.Printf("API 服务已启动: %s\n", *addr) } fmt.Println("接口文档:") fmt.Println(" GET /api/health - 健康检查") fmt.Println(" GET /api/mqtt - 获取所有 MQTT 配置") fmt.Println(" POST /api/mqtt - 创建 MQTT 配置") fmt.Println(" GET /api/mqtt/:id - 获取单个 MQTT 配置") fmt.Println(" PUT /api/mqtt/:id - 更新 MQTT 配置") fmt.Println(" DELETE /api/mqtt/:id - 删除 MQTT 配置") fmt.Println(" GET /api/ble - 获取所有 BLE 配置") fmt.Println(" POST /api/ble - 创建 BLE 配置") fmt.Println(" GET /api/ble/:id - 获取单个 BLE 配置") fmt.Println(" PUT /api/ble/:id - 更新 BLE 配置") fmt.Println(" DELETE /api/ble/:id - 删除 BLE 配置") if err := server.Start(); err != nil { logger.Error("服务启动失败: %v", err) fmt.Printf("服务启动失败: %v\n", err) } } func runMonitor(args []string) { fs := flag.NewFlagSet("monitor", flag.ExitOnError) host := fs.String("host", "127.0.0.1", "主机地址") portsFlag := fs.String("ports", "", "端口列表,逗号分隔 (如: 4096,4097,4098)") scanFlag := fs.String("scan", "", "扫描端口范围 (如: 4096-4100)") intervalFlag := fs.Int("interval", 1, "动态扫描间隔(秒), 默认1") dbPath := fs.String("db", defaultDBPath, "数据库路径") apiAddr := fs.String("api-addr", "", "API 服务地址 (如: :8080)") tls := fs.Bool("tls", false, "启用 HTTPS (使用自签名证书)") tlsCert := fs.String("tls-cert", "./data/tls/cert.pem", "TLS 证书文件路径") tlsKey := fs.String("tls-key", "./data/tls/key.pem", "TLS 私钥文件路径") logFile := fs.String("log-file", "./logs", "日志文件路径(默认 ./logs/monitor.log)") logLevel := fs.String("log-level", "info", "日志级别 (debug/info/warn/error)") fs.Parse(args) logger.SetLevel(logger.ParseLevel(*logLevel)) if err := logger.InitFileLog(*logFile); err != nil { fmt.Printf("初始化日志文件失败: %v\n", err) return } defer logger.Close() var scanRange *[2]int if *scanFlag != "" { parts := strings.Split(*scanFlag, "-") if len(parts) == 2 { start, err1 := strconv.Atoi(parts[0]) end, err2 := strconv.Atoi(parts[1]) if err1 == nil && err2 == nil { scanRange = &[2]int{start, end} } } } ctx, cancel := context.WithCancel(context.Background()) defer cancel() sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) db, err := database.New(*dbPath) if err != nil { logger.Error("打开数据库失败: %v", err) fmt.Printf("打开数据库失败: %v\n", err) return } defer db.Close() logger.Info("数据库已连接: %s", *dbPath) var mqttClient *mqttcli.Client cfg, err := db.GetMQTTConfig() if err != nil { logger.Error("读取 MQTT 配置失败: %v", err) fmt.Printf("读取 MQTT 配置失败: %v\n", err) } else if cfg != nil { mqttClient = mqttcli.NewFromConfig(cfg) if err := mqttClient.Connect(); err != nil { logger.Error("MQTT 连接失败: %v", err) fmt.Printf("MQTT 连接失败: %v\n", err) mqttClient = nil } else { defer mqttClient.Disconnect() logger.Info("MQTT 已连接: %s (主题: %s)", cfg.Broker, cfg.Topic) fmt.Printf("MQTT 已连接: %s (主题: %s)\n", cfg.Broker, cfg.Topic) } } else { logger.Info("未配置 MQTT,跳过 MQTT 连接") } var bleStdin io.WriteCloser bleCfg, err := db.GetBLEConfig() if err != nil { logger.Error("读取 BLE 配置失败: %v", err) fmt.Printf("读取 BLE 配置失败: %v\n", err) } else if bleCfg != nil { bleStdin = startBLERelay(bleCfg, ctx) } else { logger.Info("未配置 BLE,跳过 BLE 中继") } var apiServer *api.Server if *apiAddr != "" { apiServer = api.New(db, *apiAddr) if *tls { if err := api.EnsureSelfSignedCert(*tlsCert, *tlsKey); err != nil { logger.Error("生成自签名证书失败: %v", err) fmt.Printf("生成自签名证书失败: %v\n", err) return } apiServer.EnableTLS(*tlsCert, *tlsKey) } go func() { scheme := "http" if *tls { scheme = "https" } logger.Info("API 服务已启动: %s://%s", scheme, *apiAddr) fmt.Printf("API 服务已启动: %s://%s\n", scheme, *apiAddr) if err := apiServer.Start(); err != nil { logger.Error("API 服务失败: %v", err) fmt.Printf("API 服务失败: %v\n", err) } }() } callback := createCallback(mqttClient, apiServer, bleStdin) if *portsFlag != "" { runFixedMode(ctx, *host, *portsFlag, callback, sigChan) cancel() return } if scanRange != nil { fmt.Printf("扫描端口范围 %d-%d...\n", scanRange[0], scanRange[1]) } else { fmt.Println("查找 OpenCode 实例...") } runDynamicMode(ctx, *host, scanRange, *intervalFlag, callback, sigChan) cancel() } func runConfig(args []string) { if len(args) < 1 { printConfigUsage() return } dbPath := defaultDBPath logFile := "./logs" logLevel := "info" for i, arg := range args { if arg == "--db" && i+1 < len(args) { dbPath = args[i+1] } if arg == "--log-file" && i+1 < len(args) { logFile = args[i+1] } if arg == "--log-level" && i+1 < len(args) { logLevel = args[i+1] } } logger.SetLevel(logger.ParseLevel(logLevel)) if err := logger.InitFileLog(logFile); err != nil { fmt.Printf("初始化日志文件失败: %v\n", err) return } defer logger.Close() // 过滤全局选项,避免传递给子命令的 FlagSet var filtered []string for i := 0; i < len(args); i++ { switch args[i] { case "--db", "--log-file", "--log-level": i++ // 跳过值 default: filtered = append(filtered, args[i]) } } args = filtered db, err := database.New(dbPath) if err != nil { logger.Error("打开数据库失败: %v", err) fmt.Printf("打开数据库失败: %v\n", err) return } defer db.Close() logger.Info("数据库已连接: %s", dbPath) switch args[0] { case "list": configs, err := db.ListMQTTConfigs() if err != nil { logger.Error("查询配置失败: %v", err) fmt.Printf("查询失败: %v\n", err) return } if len(configs) == 0 { fmt.Println("未配置 MQTT") return } logger.Info("查询到 %d 条 MQTT 配置", len(configs)) for _, cfg := range configs { status := "禁用" if cfg.Enabled { status = "启用" } auth := "" if cfg.Username != "" { auth = fmt.Sprintf(" [认证: %s]", cfg.Username) } fmt.Printf("[%d] %s | %s | %s%s | %s\n", cfg.ID, cfg.Broker, cfg.ClientID, cfg.Topic, auth, status) } case "set": fs := flag.NewFlagSet("config set", flag.ExitOnError) broker := fs.String("broker", "", "MQTT Broker 地址 (如: tcp://127.0.0.1:1883)") clientID := fs.String("client-id", "opencode-monitor", "MQTT 客户端 ID") username := fs.String("username", "", "MQTT 用户名") password := fs.String("password", "", "MQTT 密码") topic := fs.String("topic", "opencode/status", "MQTT 主题") enabled := fs.Bool("enabled", true, "是否启用") fs.Parse(args[1:]) if *broker == "" { fmt.Println("必须指定 --broker") return } cfg := &database.MQTTConfig{ Broker: *broker, ClientID: *clientID, Username: *username, Password: *password, Topic: *topic, Enabled: *enabled, } if err := db.SaveMQTTConfig(cfg); err != nil { logger.Error("保存 MQTT 配置失败: %v", err) fmt.Printf("保存失败: %v\n", err) return } logger.Info("MQTT 配置已保存: %s (主题: %s)", cfg.Broker, cfg.Topic) fmt.Println("配置已保存") case "delete": if len(args) < 2 { fmt.Println("必须指定配置 ID") return } id, err := strconv.Atoi(args[1]) if err != nil { logger.Warn("无效的配置 ID: %s", args[1]) fmt.Println("无效的 ID") return } if err := db.DeleteMQTTConfig(id); err != nil { logger.Error("删除配置失败: id=%d, %v", id, err) fmt.Printf("删除失败: %v\n", err) return } logger.Info("MQTT 配置已删除: id=%d", id) fmt.Println("配置已删除") case "ble": runBleConfig(db, args[1:]) default: printConfigUsage() } } func printConfigUsage() { fmt.Println("用法: opencode-monitor config <子命令> [选项]") fmt.Println("") fmt.Println("子命令:") fmt.Println(" list 列出所有 MQTT 配置") fmt.Println(" set 设置 MQTT 配置") fmt.Println(" delete 删除 MQTT 配置") fmt.Println(" ble list 列出所有 BLE 配置") fmt.Println(" ble set 设置 BLE 配置") fmt.Println(" ble delete 删除 BLE 配置") fmt.Println("") fmt.Println("MQTT 选项:") fmt.Println(" --broker MQTT Broker 地址") fmt.Println(" --client-id MQTT 客户端 ID") fmt.Println(" --username MQTT 用户名") fmt.Println(" --password MQTT 密码") fmt.Println(" --topic MQTT 主题") fmt.Println(" --enabled 是否启用 (true/false)") fmt.Println("") fmt.Println("BLE 选项:") fmt.Println(" --device 蓝牙设备名称 (默认: AI-Light)") fmt.Println(" --service-uuid BLE 服务 UUID") fmt.Println(" --char-uuid BLE 特征 UUID") fmt.Println(" --enabled 是否启用 (true/false)") fmt.Println("") fmt.Println("全局选项:") fmt.Println(" --db 数据库路径") fmt.Println(" --log-file 日志文件路径(默认 ./logs/monitor.log)") fmt.Println(" --log-level 日志级别 (debug/info/warn/error)") } func startBLERelay(bleCfg *database.BLEConfig, ctx context.Context) io.WriteCloser { if len(embeddedBLERelay) == 0 { logger.Warn("BLE 中继未嵌入,请使用 make build-with-ble 构建") fmt.Println("警告: BLE 中继未嵌入,已跳过。请使用 make build-with-ble 构建") return nil } // 释放嵌入的 exe 到临时目录 tmpDir := filepath.Join(os.TempDir(), "ai-status-light") if err := os.MkdirAll(tmpDir, 0755); err != nil { logger.Error("创建临时目录失败: %v", err) return nil } var tmpExe string if strings.Contains(strings.ToLower(os.Getenv("OS")), "windows") { tmpExe = filepath.Join(tmpDir, "ble_relay.exe") } else { tmpExe = filepath.Join(tmpDir, "ble_relay") } if err := os.WriteFile(tmpExe, embeddedBLERelay, 0755); err != nil { logger.Error("释放 BLE 中继失败: %v", err) return nil } logger.Debug("BLE 中继已释放到: %s", tmpExe) args := []string{ "--device", bleCfg.DeviceName, "--service-uuid", bleCfg.ServiceUUID, "--char-uuid", bleCfg.CharUUID, } cmd := exec.CommandContext(ctx, tmpExe, args...) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr stdin, err := cmd.StdinPipe() if err != nil { logger.Error("创建 BLE 中继 stdin 管道失败: %v", err) return nil } if err := cmd.Start(); err != nil { logger.Error("启动 BLE 中继失败: %v", err) return nil } logger.Info("BLE 中继已启动: %s (PID: %d)", bleCfg.DeviceName, cmd.Process.Pid) fmt.Printf("BLE 中继已启动: %s (PID: %d)\n", bleCfg.DeviceName, cmd.Process.Pid) go func() { if err := cmd.Wait(); err != nil { logger.Error("BLE 中继退出: %v", err) } }() return stdin } func runBleConfig(db *database.DB, args []string) { if len(args) < 1 { printBleConfigUsage() return } switch args[0] { case "list": configs, err := db.ListBLEConfigs() if err != nil { logger.Error("查询 BLE 配置失败: %v", err) fmt.Printf("查询失败: %v\n", err) return } if len(configs) == 0 { fmt.Println("未配置 BLE") return } logger.Info("查询到 %d 条 BLE 配置", len(configs)) for _, cfg := range configs { status := "禁用" if cfg.Enabled { status = "启用" } fmt.Printf("[%d] %s | %s | %s | %s\n", cfg.ID, cfg.DeviceName, cfg.ServiceUUID, cfg.CharUUID, status) } case "set": fs := flag.NewFlagSet("config ble set", flag.ExitOnError) deviceName := fs.String("device", "AI-Light", "蓝牙设备名称") serviceUUID := fs.String("service-uuid", "b8b7e001-7a6b-4f4f-9a8b-11c0ffee0001", "BLE 服务 UUID") charUUID := fs.String("char-uuid", "b8b7e002-7a6b-4f4f-9a8b-11c0ffee0001", "BLE 特征 UUID") enabled := fs.Bool("enabled", true, "是否启用") fs.Parse(args[1:]) cfg := &database.BLEConfig{ DeviceName: *deviceName, ServiceUUID: *serviceUUID, CharUUID: *charUUID, Enabled: *enabled, } if err := db.SaveBLEConfig(cfg); err != nil { logger.Error("保存 BLE 配置失败: %v", err) fmt.Printf("保存失败: %v\n", err) return } logger.Info("BLE 配置已保存: %s", cfg.DeviceName) fmt.Println("配置已保存") case "delete": if len(args) < 2 { fmt.Println("必须指定配置 ID") return } id, err := strconv.Atoi(args[1]) if err != nil { logger.Warn("无效的配置 ID: %s", args[1]) fmt.Println("无效的 ID") return } if err := db.DeleteBLEConfig(id); err != nil { logger.Error("删除 BLE 配置失败: id=%d, %v", id, err) fmt.Printf("删除失败: %v\n", err) return } logger.Info("BLE 配置已删除: id=%d", id) fmt.Println("配置已删除") default: printBleConfigUsage() } } func printBleConfigUsage() { fmt.Println("用法: opencode-monitor config ble <子命令> [选项]") fmt.Println("") fmt.Println("子命令:") fmt.Println(" list 列出所有 BLE 配置") fmt.Println(" set 设置 BLE 配置") fmt.Println(" delete 删除 BLE 配置") fmt.Println("") fmt.Println("选项:") fmt.Println(" --device 蓝牙设备名称 (默认: AI-Light)") fmt.Println(" --service-uuid BLE 服务 UUID") fmt.Println(" --char-uuid BLE 特征 UUID") fmt.Println(" --enabled 是否启用 (true/false)") } func createCallback(mqttClient *mqttcli.Client, apiServer *api.Server, bleStdin io.Writer) monitor.EventCallback { lastStatus := make(map[int]string) var mu sync.Mutex publish := func(port int, status string, code string) { if mqttClient != nil { payload := map[string]interface{}{ "port": port, "status": status, "code": code, "timestamp": time.Now().Format(time.RFC3339), } if err := mqttClient.PublishRaw(mqttClient.GetTopic(), payload); err != nil { logger.Error("MQTT 发送失败: %v", err) fmt.Printf("MQTT 发送失败: %v\n", err) } } if apiServer != nil { apiServer.BroadcastStatus(port, status, code) } if bleStdin != nil { msg := fmt.Sprintf(`{"port":%d,"code":"%s"}`+"\n", port, code) if _, err := bleStdin.Write([]byte(msg)); err != nil { logger.Error("BLE 发送失败: %v", err) } } } return func(port int, evt *event.SSEEvent) { msg := event.FormatEvent(port, evt) if msg != "" { fmt.Println(msg) } if mqttClient == nil && apiServer == nil && bleStdin == nil { return } var status, code string switch evt.Type { case "session.status": if s, ok := evt.Properties["status"].(map[string]interface{}); ok { if t, ok := s["type"].(string); ok { code = t } status = event.ParseStatus(s) } case "session.idle": status = "空闲" code = "idle" case "message.part.updated": if part, ok := evt.Properties["part"].(map[string]interface{}); ok { switch part["type"].(string) { case "tool": if st, ok := part["state"].(map[string]interface{}); ok { if s, ok := st["status"].(string); ok { code = s } status = event.ParseToolState(st) } case "reasoning": status = "思考中" code = "reasoning" default: status = "使用工具中" code = "using_tool" } } case "permission.updated": code = "permission" if title, ok := evt.Properties["title"].(string); ok && title != "" { status = "等待权限: " + title } else { status = "等待权限" } case "session.error": status = "错误" code = "error" } if status != "" { mu.Lock() prev := lastStatus[port] if status == "空闲" && prev != "" && prev != "空闲" { publish(port, "会话完成", "session_completed") } lastStatus[port] = status mu.Unlock() publish(port, status, code) } } } func runFixedMode(ctx context.Context, host string, portsFlag string, callback monitor.EventCallback, sigChan chan os.Signal) { var ports []int for _, p := range strings.Split(portsFlag, ",") { p = strings.TrimSpace(p) if port, err := strconv.Atoi(p); err == nil { ports = append(ports, port) } } if len(ports) == 0 { fmt.Println("未指定端口") return } logger.Info("固定模式启动,监控端口: %v", ports) fmt.Printf("监控端口: %v\n", ports) fmt.Println("Ctrl+C 停止") fmt.Println(strings.Repeat("-", 40)) var wg sync.WaitGroup for _, port := range ports { wg.Add(1) go func(p int) { defer wg.Done() logger.Info("开始监控端口: %d", p) m := monitor.New(host, p, callback) m.Run(ctx) logger.Info("端口 %d 监控已停止", p) }(port) } <-sigChan logger.Info("收到停止信号,正在退出") fmt.Println("\n已停止") wg.Wait() logger.Info("所有监控协程已退出") } func runDynamicMode(ctx context.Context, host string, scanRange *[2]int, interval int, callback monitor.EventCallback, sigChan chan os.Signal) { scanner := discovery.NewScanner(host, scanRange) monitoredPorts := make(map[int]bool) runningMonitors := make(map[int]context.CancelFunc) var mu sync.Mutex startMonitor := func(port int) { mu.Lock() if _, exists := runningMonitors[port]; exists { mu.Unlock() return } monitorCtx, monitorCancel := context.WithCancel(ctx) runningMonitors[port] = monitorCancel mu.Unlock() go func() { logger.Info("开始监控端口: %d", port) fmt.Printf("开始监控端口: %d\n", port) m := monitor.New(host, port, callback) m.Run(monitorCtx) // 连接断开后清理记录,允许重新连接 mu.Lock() delete(runningMonitors, port) delete(monitoredPorts, port) mu.Unlock() logger.Info("端口 %d 监控已停止,等待重新连接", port) fmt.Printf("端口 %d 监控已停止,等待重新连接\n", port) }() } initial := scanner.Discover() if len(initial) == 0 { logger.Info("未找到运行中的 OpenCode 实例,等待自动检测") fmt.Println("未找到运行中的 OpenCode 实例") fmt.Println("请先执行: opencode serve --port 4096") fmt.Println("启动后会自动检测,等待中...") } for _, port := range initial { monitoredPorts[port] = true startMonitor(port) } if len(monitoredPorts) > 0 { ports := make([]int, 0, len(monitoredPorts)) for p := range monitoredPorts { ports = append(ports, p) } sort.Ints(ports) logger.Info("找到 %d 个实例: %v", len(monitoredPorts), ports) fmt.Printf("找到 %d 个实例: %v\n", len(monitoredPorts), ports) } logger.Info("动态模式启动,每 %d 秒扫描新实例", interval) fmt.Printf("每 %d 秒扫描新实例,Ctrl+C 停止\n", interval) fmt.Println(strings.Repeat("-", 40)) scanTicker := time.NewTicker(time.Duration(interval) * time.Second) defer scanTicker.Stop() go func() { for { select { case <-ctx.Done(): return case <-scanTicker.C: newPorts := scanner.Discover() if len(newPorts) > 0 { logger.Debug("扫描到 %d 个端口: %v", len(newPorts), newPorts) } for _, port := range newPorts { mu.Lock() alreadyMonitored := monitoredPorts[port] if !alreadyMonitored { monitoredPorts[port] = true } mu.Unlock() if !alreadyMonitored { logger.Info("发现新实例端口: %d,开始监控", port) startMonitor(port) } } } } }() <-sigChan logger.Info("收到停止信号,正在退出") fmt.Println("\n已停止") }