| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 |
- package main
- import (
- "context"
- "flag"
- "fmt"
- "os"
- "os/signal"
- "sort"
- "strconv"
- "strings"
- "sync"
- "syscall"
- "time"
- "AI-Status-Light/internal/api"
- "AI-Status-Light/internal/database"
- "AI-Status-Light/internal/discovery"
- "AI-Status-Light/internal/event"
- "AI-Status-Light/internal/monitor"
- mqttcli "AI-Status-Light/internal/mqtt"
- )
- const defaultDBPath = "./data/config.db"
- var Version = "dev"
- func main() {
- if len(os.Args) < 2 {
- printUsage()
- return
- }
- switch os.Args[1] {
- case "monitor":
- runMonitor(os.Args[2:])
- case "config":
- runConfig(os.Args[2:])
- case "serve":
- runServe(os.Args[2:])
- case "version":
- fmt.Printf("opencode-monitor %s\n", Version)
- default:
- printUsage()
- }
- }
- func printUsage() {
- fmt.Printf("opencode-monitor %s\n\n", Version)
- fmt.Println("用法: opencode-monitor <命令> [选项]")
- fmt.Println("")
- fmt.Println("命令:")
- fmt.Println(" monitor 启动监控")
- fmt.Println(" config 管理 MQTT 配置")
- fmt.Println(" serve 启动 API 服务")
- fmt.Println(" version 显示版本信息")
- fmt.Println("")
- fmt.Println("运行 'opencode-monitor <命令> -h' 查看命令帮助")
- }
- func runServe(args []string) {
- fs := flag.NewFlagSet("serve", flag.ExitOnError)
- addr := fs.String("addr", ":8080", "监听地址")
- dbPath := fs.String("db", defaultDBPath, "数据库路径")
- fs.Parse(args)
- db, err := database.New(*dbPath)
- if err != nil {
- fmt.Printf("打开数据库失败: %v\n", err)
- return
- }
- defer db.Close()
- server := api.New(db, *addr)
- fmt.Printf("API 服务已启动: %s\n", *addr)
- fmt.Println("接口文档:")
- fmt.Println(" GET /api/health - 健康检查")
- fmt.Println(" GET /api/mqtt - 获取所有配置")
- fmt.Println(" POST /api/mqtt - 创建配置")
- fmt.Println(" GET /api/mqtt/:id - 获取单个配置")
- fmt.Println(" PUT /api/mqtt/:id - 更新配置")
- fmt.Println(" DELETE /api/mqtt/:id - 删除配置")
- if err := server.Start(); err != nil {
- fmt.Printf("服务启动失败: %v\n", err)
- }
- }
- func runMonitor(args []string) {
- fs := flag.NewFlagSet("monitor", flag.ExitOnError)
- host := fs.String("host", "127.0.0.1", "主机地址")
- portsFlag := fs.String("ports", "", "端口列表,逗号分隔 (如: 4096,4097,4098)")
- scanFlag := fs.String("scan", "", "扫描端口范围 (如: 4096-4100)")
- pidFlag := fs.Bool("pid", false, "通过进程号查找端口")
- intervalFlag := fs.Int("interval", 5, "动态扫描间隔(秒), 默认5")
- dbPath := fs.String("db", defaultDBPath, "数据库路径")
- apiAddr := fs.String("api-addr", "", "API 服务地址 (如: :8080)")
- fs.Parse(args)
- var scanRange *[2]int
- if *scanFlag != "" {
- parts := strings.Split(*scanFlag, "-")
- if len(parts) == 2 {
- start, err1 := strconv.Atoi(parts[0])
- end, err2 := strconv.Atoi(parts[1])
- if err1 == nil && err2 == nil {
- scanRange = &[2]int{start, end}
- }
- }
- }
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- sigChan := make(chan os.Signal, 1)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
- db, err := database.New(*dbPath)
- if err != nil {
- fmt.Printf("打开数据库失败: %v\n", err)
- return
- }
- defer db.Close()
- if *apiAddr != "" {
- apiServer := api.New(db, *apiAddr)
- go func() {
- fmt.Printf("API 服务已启动: %s\n", *apiAddr)
- if err := apiServer.Start(); err != nil {
- fmt.Printf("API 服务失败: %v\n", err)
- }
- }()
- }
- var mqttClient *mqttcli.Client
- cfg, err := db.GetMQTTConfig()
- if err != nil {
- fmt.Printf("读取 MQTT 配置失败: %v\n", err)
- } else if cfg != nil {
- mqttClient = mqttcli.NewFromConfig(cfg)
- if err := mqttClient.Connect(); err != nil {
- fmt.Printf("MQTT 连接失败: %v\n", err)
- mqttClient = nil
- } else {
- defer mqttClient.Disconnect()
- fmt.Printf("MQTT 已连接: %s (主题: %s)\n", cfg.Broker, cfg.Topic)
- }
- }
- callback := createCallback(mqttClient)
- if *portsFlag != "" {
- runFixedMode(ctx, *host, *portsFlag, callback, sigChan)
- cancel()
- return
- }
- if scanRange != nil {
- fmt.Printf("扫描端口范围 %d-%d...\n", scanRange[0], scanRange[1])
- } else if *pidFlag {
- fmt.Println("通过进程号查找 OpenCode 端口...")
- } else {
- fmt.Println("查找 OpenCode 实例...")
- }
- runDynamicMode(ctx, *host, scanRange, *intervalFlag, callback, sigChan)
- cancel()
- }
- func runConfig(args []string) {
- if len(args) < 1 {
- printConfigUsage()
- return
- }
- dbPath := defaultDBPath
- for i, arg := range args {
- if arg == "--db" && i+1 < len(args) {
- dbPath = args[i+1]
- break
- }
- }
- db, err := database.New(dbPath)
- if err != nil {
- fmt.Printf("打开数据库失败: %v\n", err)
- return
- }
- defer db.Close()
- switch args[0] {
- case "list":
- configs, err := db.ListMQTTConfigs()
- if err != nil {
- fmt.Printf("查询失败: %v\n", err)
- return
- }
- if len(configs) == 0 {
- fmt.Println("未配置 MQTT")
- return
- }
- for _, cfg := range configs {
- status := "禁用"
- if cfg.Enabled {
- status = "启用"
- }
- fmt.Printf("[%d] %s | %s | %s | %s\n", cfg.ID, cfg.Broker, cfg.ClientID, cfg.Topic, status)
- }
- case "set":
- fs := flag.NewFlagSet("config set", flag.ExitOnError)
- broker := fs.String("broker", "", "MQTT Broker 地址 (如: tcp://127.0.0.1:1883)")
- clientID := fs.String("client-id", "opencode-monitor", "MQTT 客户端 ID")
- topic := fs.String("topic", "opencode/status", "MQTT 主题")
- enabled := fs.Bool("enabled", true, "是否启用")
- fs.Parse(args[1:])
- if *broker == "" {
- fmt.Println("必须指定 --broker")
- return
- }
- cfg := &database.MQTTConfig{
- Broker: *broker,
- ClientID: *clientID,
- Topic: *topic,
- Enabled: *enabled,
- }
- if err := db.SaveMQTTConfig(cfg); err != nil {
- fmt.Printf("保存失败: %v\n", err)
- return
- }
- fmt.Println("配置已保存")
- case "delete":
- if len(args) < 2 {
- fmt.Println("必须指定配置 ID")
- return
- }
- id, err := strconv.Atoi(args[1])
- if err != nil {
- fmt.Println("无效的 ID")
- return
- }
- if err := db.DeleteMQTTConfig(id); err != nil {
- fmt.Printf("删除失败: %v\n", err)
- return
- }
- fmt.Println("配置已删除")
- default:
- printConfigUsage()
- }
- }
- func printConfigUsage() {
- fmt.Println("用法: opencode-monitor config <子命令> [选项]")
- fmt.Println("")
- fmt.Println("子命令:")
- fmt.Println(" list 列出所有配置")
- fmt.Println(" set 设置 MQTT 配置")
- fmt.Println(" delete <id> 删除配置")
- fmt.Println("")
- fmt.Println("选项:")
- fmt.Println(" --broker MQTT Broker 地址")
- fmt.Println(" --client-id MQTT 客户端 ID")
- fmt.Println(" --topic MQTT 主题")
- fmt.Println(" --enabled 是否启用 (true/false)")
- fmt.Println(" --db 数据库路径")
- }
- func createCallback(mqttClient *mqttcli.Client) monitor.EventCallback {
- return func(port int, evt *event.SSEEvent) {
- msg := event.FormatEvent(port, evt)
- if msg != "" {
- fmt.Println(msg)
- }
- if mqttClient == nil {
- return
- }
- var evtType, status, tool, state, title string
- evtType = evt.Type
- switch evt.Type {
- case "session.status":
- if s, ok := evt.Properties["status"].(map[string]interface{}); ok {
- status = event.ParseStatus(s)
- }
- case "session.idle":
- status = "空闲"
- case "message.part.updated":
- if part, ok := evt.Properties["part"].(map[string]interface{}); ok {
- pt, _ := part["type"].(string)
- switch pt {
- case "tool":
- tool, _ = part["tool"].(string)
- if st, ok := part["state"].(map[string]interface{}); ok {
- state = event.ParseToolState(st)
- }
- case "reasoning":
- state = "思考中"
- }
- }
- case "permission.updated":
- title, _ = evt.Properties["title"].(string)
- case "session.error":
- status = "错误"
- }
- if err := mqttClient.Publish(port, evtType, status, tool, state, title); err != nil {
- fmt.Printf("MQTT 发送失败: %v\n", err)
- }
- }
- }
- func runFixedMode(ctx context.Context, host string, portsFlag string, callback monitor.EventCallback, sigChan chan os.Signal) {
- var ports []int
- for _, p := range strings.Split(portsFlag, ",") {
- p = strings.TrimSpace(p)
- if port, err := strconv.Atoi(p); err == nil {
- ports = append(ports, port)
- }
- }
- if len(ports) == 0 {
- fmt.Println("未指定端口")
- return
- }
- fmt.Printf("监控端口: %v\n", ports)
- fmt.Println("Ctrl+C 停止")
- fmt.Println(strings.Repeat("-", 40))
- var wg sync.WaitGroup
- for _, port := range ports {
- wg.Add(1)
- go func(p int) {
- defer wg.Done()
- m := monitor.New(host, p, callback)
- m.Run(ctx)
- }(port)
- }
- <-sigChan
- fmt.Println("\n已停止")
- wg.Wait()
- }
- func runDynamicMode(ctx context.Context, host string, scanRange *[2]int, interval int, callback monitor.EventCallback, sigChan chan os.Signal) {
- scanner := discovery.NewScanner(host, scanRange)
- monitoredPorts := make(map[int]bool)
- runningMonitors := make(map[int]context.CancelFunc)
- var mu sync.Mutex
- startMonitor := func(port int) {
- mu.Lock()
- if _, exists := runningMonitors[port]; exists {
- mu.Unlock()
- return
- }
- monitorCtx, monitorCancel := context.WithCancel(ctx)
- runningMonitors[port] = monitorCancel
- mu.Unlock()
- go func() {
- m := monitor.New(host, port, callback)
- m.Run(monitorCtx)
- }()
- }
- initial := scanner.Discover()
- if len(initial) == 0 {
- fmt.Println("未找到运行中的 OpenCode 实例")
- fmt.Println("请先执行: opencode serve --port 4096")
- fmt.Println("启动后会自动检测,等待中...")
- }
- for _, port := range initial {
- monitoredPorts[port] = true
- startMonitor(port)
- }
- if len(monitoredPorts) > 0 {
- ports := make([]int, 0, len(monitoredPorts))
- for p := range monitoredPorts {
- ports = append(ports, p)
- }
- sort.Ints(ports)
- fmt.Printf("找到 %d 个实例: %v\n", len(monitoredPorts), ports)
- }
- fmt.Printf("每 %d 秒扫描新实例,Ctrl+C 停止\n", interval)
- fmt.Println(strings.Repeat("-", 40))
- scanTicker := time.NewTicker(time.Duration(interval) * time.Second)
- defer scanTicker.Stop()
- go func() {
- for {
- select {
- case <-ctx.Done():
- return
- case <-scanTicker.C:
- newPorts := scanner.Discover()
- for _, port := range newPorts {
- mu.Lock()
- if !monitoredPorts[port] {
- monitoredPorts[port] = true
- startMonitor(port)
- }
- mu.Unlock()
- }
- }
- }
- }()
- <-sigChan
- fmt.Println("\n已停止")
- }
|