package main import ( "context" "flag" "fmt" "os" "os/signal" "sort" "strconv" "strings" "sync" "syscall" "time" "ai-status-light/internal/api" "ai-status-light/internal/database" "ai-status-light/internal/discovery" "ai-status-light/internal/event" "ai-status-light/internal/logger" "ai-status-light/internal/monitor" mqttcli "ai-status-light/internal/mqtt" ) const defaultDBPath = "./data/config.db" var Version = "dev" func main() { if len(os.Args) < 2 { printUsage() return } switch os.Args[1] { case "monitor": runMonitor(os.Args[2:]) case "config": runConfig(os.Args[2:]) case "serve": runServe(os.Args[2:]) case "version": fmt.Printf("opencode-monitor %s\n", Version) default: printUsage() } } func printUsage() { fmt.Printf("opencode-monitor %s\n\n", Version) fmt.Println("用法: opencode-monitor <命令> [选项]") fmt.Println("") fmt.Println("命令:") fmt.Println(" monitor 启动监控") fmt.Println(" config 管理 MQTT 配置") fmt.Println(" serve 启动 API 服务") fmt.Println(" version 显示版本信息") fmt.Println("") fmt.Println("运行 'opencode-monitor <命令> -h' 查看命令帮助") } func runServe(args []string) { fs := flag.NewFlagSet("serve", flag.ExitOnError) addr := fs.String("addr", ":8080", "监听地址") dbPath := fs.String("db", defaultDBPath, "数据库路径") logFile := fs.String("log-file", "./logs", "日志文件路径(默认 ./logs/monitor.log)") logLevel := fs.String("log-level", "info", "日志级别 (debug/info/warn/error)") tls := fs.Bool("tls", false, "启用 HTTPS (使用自签名证书)") tlsCert := fs.String("tls-cert", "./data/tls/cert.pem", "TLS 证书文件路径") tlsKey := fs.String("tls-key", "./data/tls/key.pem", "TLS 私钥文件路径") fs.Parse(args) logger.SetLevel(logger.ParseLevel(*logLevel)) if err := logger.InitFileLog(*logFile); err != nil { fmt.Printf("初始化日志文件失败: %v\n", err) return } defer logger.Close() db, err := database.New(*dbPath) if err != nil { logger.Error("打开数据库失败: %v", err) fmt.Printf("打开数据库失败: %v\n", err) return } defer db.Close() logger.Info("数据库已连接: %s", *dbPath) server := api.New(db, *addr) if *tls { if err := api.EnsureSelfSignedCert(*tlsCert, *tlsKey); err != nil { logger.Error("生成自签名证书失败: %v", err) fmt.Printf("生成自签名证书失败: %v\n", err) return } server.EnableTLS(*tlsCert, *tlsKey) logger.Info("HTTPS 已启用") fmt.Println("HTTPS 已启用 (自签名证书)") } else { fmt.Printf("API 服务已启动: %s\n", *addr) } fmt.Println("接口文档:") fmt.Println(" GET /api/health - 健康检查") fmt.Println(" GET /api/mqtt - 获取所有配置") fmt.Println(" POST /api/mqtt - 创建配置") fmt.Println(" GET /api/mqtt/:id - 获取单个配置") fmt.Println(" PUT /api/mqtt/:id - 更新配置") fmt.Println(" DELETE /api/mqtt/:id - 删除配置") if err := server.Start(); err != nil { logger.Error("服务启动失败: %v", err) fmt.Printf("服务启动失败: %v\n", err) } } func runMonitor(args []string) { fs := flag.NewFlagSet("monitor", flag.ExitOnError) host := fs.String("host", "127.0.0.1", "主机地址") portsFlag := fs.String("ports", "", "端口列表,逗号分隔 (如: 4096,4097,4098)") scanFlag := fs.String("scan", "", "扫描端口范围 (如: 4096-4100)") intervalFlag := fs.Int("interval", 1, "动态扫描间隔(秒), 默认1") dbPath := fs.String("db", defaultDBPath, "数据库路径") apiAddr := fs.String("api-addr", "", "API 服务地址 (如: :8080)") tls := fs.Bool("tls", false, "启用 HTTPS (使用自签名证书)") tlsCert := fs.String("tls-cert", "./data/tls/cert.pem", "TLS 证书文件路径") tlsKey := fs.String("tls-key", "./data/tls/key.pem", "TLS 私钥文件路径") logFile := fs.String("log-file", "./logs", "日志文件路径(默认 ./logs/monitor.log)") logLevel := fs.String("log-level", "info", "日志级别 (debug/info/warn/error)") fs.Parse(args) logger.SetLevel(logger.ParseLevel(*logLevel)) if err := logger.InitFileLog(*logFile); err != nil { fmt.Printf("初始化日志文件失败: %v\n", err) return } defer logger.Close() var scanRange *[2]int if *scanFlag != "" { parts := strings.Split(*scanFlag, "-") if len(parts) == 2 { start, err1 := strconv.Atoi(parts[0]) end, err2 := strconv.Atoi(parts[1]) if err1 == nil && err2 == nil { scanRange = &[2]int{start, end} } } } ctx, cancel := context.WithCancel(context.Background()) defer cancel() sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) db, err := database.New(*dbPath) if err != nil { logger.Error("打开数据库失败: %v", err) fmt.Printf("打开数据库失败: %v\n", err) return } defer db.Close() logger.Info("数据库已连接: %s", *dbPath) var mqttClient *mqttcli.Client cfg, err := db.GetMQTTConfig() if err != nil { logger.Error("读取 MQTT 配置失败: %v", err) fmt.Printf("读取 MQTT 配置失败: %v\n", err) } else if cfg != nil { mqttClient = mqttcli.NewFromConfig(cfg) if err := mqttClient.Connect(); err != nil { logger.Error("MQTT 连接失败: %v", err) fmt.Printf("MQTT 连接失败: %v\n", err) mqttClient = nil } else { defer mqttClient.Disconnect() logger.Info("MQTT 已连接: %s (主题: %s)", cfg.Broker, cfg.Topic) fmt.Printf("MQTT 已连接: %s (主题: %s)\n", cfg.Broker, cfg.Topic) } } else { logger.Info("未配置 MQTT,跳过 MQTT 连接") } var apiServer *api.Server if *apiAddr != "" { apiServer = api.New(db, *apiAddr) if *tls { if err := api.EnsureSelfSignedCert(*tlsCert, *tlsKey); err != nil { logger.Error("生成自签名证书失败: %v", err) fmt.Printf("生成自签名证书失败: %v\n", err) return } apiServer.EnableTLS(*tlsCert, *tlsKey) } go func() { scheme := "http" if *tls { scheme = "https" } logger.Info("API 服务已启动: %s://%s", scheme, *apiAddr) fmt.Printf("API 服务已启动: %s://%s\n", scheme, *apiAddr) if err := apiServer.Start(); err != nil { logger.Error("API 服务失败: %v", err) fmt.Printf("API 服务失败: %v\n", err) } }() } callback := createCallback(mqttClient, apiServer) if *portsFlag != "" { runFixedMode(ctx, *host, *portsFlag, callback, sigChan) cancel() return } if scanRange != nil { fmt.Printf("扫描端口范围 %d-%d...\n", scanRange[0], scanRange[1]) } else { fmt.Println("查找 OpenCode 实例...") } runDynamicMode(ctx, *host, scanRange, *intervalFlag, callback, sigChan) cancel() } func runConfig(args []string) { if len(args) < 1 { printConfigUsage() return } dbPath := defaultDBPath logFile := "./logs" logLevel := "info" for i, arg := range args { if arg == "--db" && i+1 < len(args) { dbPath = args[i+1] } if arg == "--log-file" && i+1 < len(args) { logFile = args[i+1] } if arg == "--log-level" && i+1 < len(args) { logLevel = args[i+1] } } logger.SetLevel(logger.ParseLevel(logLevel)) if err := logger.InitFileLog(logFile); err != nil { fmt.Printf("初始化日志文件失败: %v\n", err) return } defer logger.Close() // 过滤全局选项,避免传递给子命令的 FlagSet var filtered []string for i := 0; i < len(args); i++ { switch args[i] { case "--db", "--log-file", "--log-level": i++ // 跳过值 default: filtered = append(filtered, args[i]) } } args = filtered db, err := database.New(dbPath) if err != nil { logger.Error("打开数据库失败: %v", err) fmt.Printf("打开数据库失败: %v\n", err) return } defer db.Close() logger.Info("数据库已连接: %s", dbPath) switch args[0] { case "list": configs, err := db.ListMQTTConfigs() if err != nil { logger.Error("查询配置失败: %v", err) fmt.Printf("查询失败: %v\n", err) return } if len(configs) == 0 { fmt.Println("未配置 MQTT") return } logger.Info("查询到 %d 条 MQTT 配置", len(configs)) for _, cfg := range configs { status := "禁用" if cfg.Enabled { status = "启用" } auth := "" if cfg.Username != "" { auth = fmt.Sprintf(" [认证: %s]", cfg.Username) } fmt.Printf("[%d] %s | %s | %s%s | %s\n", cfg.ID, cfg.Broker, cfg.ClientID, cfg.Topic, auth, status) } case "set": fs := flag.NewFlagSet("config set", flag.ExitOnError) broker := fs.String("broker", "", "MQTT Broker 地址 (如: tcp://127.0.0.1:1883)") clientID := fs.String("client-id", "opencode-monitor", "MQTT 客户端 ID") username := fs.String("username", "", "MQTT 用户名") password := fs.String("password", "", "MQTT 密码") topic := fs.String("topic", "opencode/status", "MQTT 主题") enabled := fs.Bool("enabled", true, "是否启用") fs.Parse(args[1:]) if *broker == "" { fmt.Println("必须指定 --broker") return } cfg := &database.MQTTConfig{ Broker: *broker, ClientID: *clientID, Username: *username, Password: *password, Topic: *topic, Enabled: *enabled, } if err := db.SaveMQTTConfig(cfg); err != nil { logger.Error("保存 MQTT 配置失败: %v", err) fmt.Printf("保存失败: %v\n", err) return } logger.Info("MQTT 配置已保存: %s (主题: %s)", cfg.Broker, cfg.Topic) fmt.Println("配置已保存") case "delete": if len(args) < 2 { fmt.Println("必须指定配置 ID") return } id, err := strconv.Atoi(args[1]) if err != nil { logger.Warn("无效的配置 ID: %s", args[1]) fmt.Println("无效的 ID") return } if err := db.DeleteMQTTConfig(id); err != nil { logger.Error("删除配置失败: id=%d, %v", id, err) fmt.Printf("删除失败: %v\n", err) return } logger.Info("MQTT 配置已删除: id=%d", id) fmt.Println("配置已删除") default: printConfigUsage() } } func printConfigUsage() { fmt.Println("用法: opencode-monitor config <子命令> [选项]") fmt.Println("") fmt.Println("子命令:") fmt.Println(" list 列出所有配置") fmt.Println(" set 设置 MQTT 配置") fmt.Println(" delete 删除配置") fmt.Println("") fmt.Println("选项:") fmt.Println(" --broker MQTT Broker 地址") fmt.Println(" --client-id MQTT 客户端 ID") fmt.Println(" --username MQTT 用户名") fmt.Println(" --password MQTT 密码") fmt.Println(" --topic MQTT 主题") fmt.Println(" --enabled 是否启用 (true/false)") fmt.Println(" --db 数据库路径") fmt.Println(" --log-file 日志文件路径(默认 ./logs/monitor.log)") fmt.Println(" --log-level 日志级别 (debug/info/warn/error)") } func createCallback(mqttClient *mqttcli.Client, apiServer *api.Server) monitor.EventCallback { lastStatus := make(map[int]string) var mu sync.Mutex publish := func(port int, status string, code string) { if mqttClient != nil { payload := map[string]interface{}{ "port": port, "status": status, "code": code, "timestamp": time.Now().Format(time.RFC3339), } if err := mqttClient.PublishRaw(mqttClient.GetTopic(), payload); err != nil { logger.Error("MQTT 发送失败: %v", err) fmt.Printf("MQTT 发送失败: %v\n", err) } } if apiServer != nil { apiServer.BroadcastStatus(port, status, code) } } return func(port int, evt *event.SSEEvent) { msg := event.FormatEvent(port, evt) if msg != "" { fmt.Println(msg) } if mqttClient == nil && apiServer == nil { return } var status, code string switch evt.Type { case "session.status": if s, ok := evt.Properties["status"].(map[string]interface{}); ok { if t, ok := s["type"].(string); ok { code = t } status = event.ParseStatus(s) } case "session.idle": status = "空闲" code = "idle" case "message.part.updated": if part, ok := evt.Properties["part"].(map[string]interface{}); ok { switch part["type"].(string) { case "tool": if st, ok := part["state"].(map[string]interface{}); ok { if s, ok := st["status"].(string); ok { code = s } status = event.ParseToolState(st) } case "reasoning": status = "思考中" code = "reasoning" default: status = "使用工具中" code = "using_tool" } } case "permission.updated": code = "permission" if title, ok := evt.Properties["title"].(string); ok && title != "" { status = "等待权限: " + title } else { status = "等待权限" } case "session.error": status = "错误" code = "error" } if status != "" { mu.Lock() prev := lastStatus[port] if status == "空闲" && prev != "" && prev != "空闲" { publish(port, "会话完成", "session_completed") } lastStatus[port] = status mu.Unlock() publish(port, status, code) } } } func runFixedMode(ctx context.Context, host string, portsFlag string, callback monitor.EventCallback, sigChan chan os.Signal) { var ports []int for _, p := range strings.Split(portsFlag, ",") { p = strings.TrimSpace(p) if port, err := strconv.Atoi(p); err == nil { ports = append(ports, port) } } if len(ports) == 0 { fmt.Println("未指定端口") return } logger.Info("固定模式启动,监控端口: %v", ports) fmt.Printf("监控端口: %v\n", ports) fmt.Println("Ctrl+C 停止") fmt.Println(strings.Repeat("-", 40)) var wg sync.WaitGroup for _, port := range ports { wg.Add(1) go func(p int) { defer wg.Done() logger.Info("开始监控端口: %d", p) m := monitor.New(host, p, callback) m.Run(ctx) logger.Info("端口 %d 监控已停止", p) }(port) } <-sigChan logger.Info("收到停止信号,正在退出") fmt.Println("\n已停止") wg.Wait() logger.Info("所有监控协程已退出") } func runDynamicMode(ctx context.Context, host string, scanRange *[2]int, interval int, callback monitor.EventCallback, sigChan chan os.Signal) { scanner := discovery.NewScanner(host, scanRange) monitoredPorts := make(map[int]bool) runningMonitors := make(map[int]context.CancelFunc) var mu sync.Mutex startMonitor := func(port int) { mu.Lock() if _, exists := runningMonitors[port]; exists { mu.Unlock() return } monitorCtx, monitorCancel := context.WithCancel(ctx) runningMonitors[port] = monitorCancel mu.Unlock() go func() { logger.Info("开始监控端口: %d", port) fmt.Printf("开始监控端口: %d\n", port) m := monitor.New(host, port, callback) m.Run(monitorCtx) // 连接断开后清理记录,允许重新连接 mu.Lock() delete(runningMonitors, port) delete(monitoredPorts, port) mu.Unlock() logger.Info("端口 %d 监控已停止,等待重新连接", port) fmt.Printf("端口 %d 监控已停止,等待重新连接\n", port) }() } initial := scanner.Discover() if len(initial) == 0 { logger.Info("未找到运行中的 OpenCode 实例,等待自动检测") fmt.Println("未找到运行中的 OpenCode 实例") fmt.Println("请先执行: opencode serve --port 4096") fmt.Println("启动后会自动检测,等待中...") } for _, port := range initial { monitoredPorts[port] = true startMonitor(port) } if len(monitoredPorts) > 0 { ports := make([]int, 0, len(monitoredPorts)) for p := range monitoredPorts { ports = append(ports, p) } sort.Ints(ports) logger.Info("找到 %d 个实例: %v", len(monitoredPorts), ports) fmt.Printf("找到 %d 个实例: %v\n", len(monitoredPorts), ports) } logger.Info("动态模式启动,每 %d 秒扫描新实例", interval) fmt.Printf("每 %d 秒扫描新实例,Ctrl+C 停止\n", interval) fmt.Println(strings.Repeat("-", 40)) scanTicker := time.NewTicker(time.Duration(interval) * time.Second) defer scanTicker.Stop() go func() { for { select { case <-ctx.Done(): return case <-scanTicker.C: newPorts := scanner.Discover() if len(newPorts) > 0 { logger.Debug("扫描到 %d 个端口: %v", len(newPorts), newPorts) } for _, port := range newPorts { mu.Lock() alreadyMonitored := monitoredPorts[port] if !alreadyMonitored { monitoredPorts[port] = true } mu.Unlock() if !alreadyMonitored { logger.Info("发现新实例端口: %d,开始监控", port) startMonitor(port) } } } } }() <-sigChan logger.Info("收到停止信号,正在退出") fmt.Println("\n已停止") }