Răsfoiți Sursa

修复mqtt bug

moki 3 săptămâni în urmă
părinte
comite
7586574208

+ 181 - 0
README.md

@@ -0,0 +1,181 @@
+# OpenCode Monitor
+
+OpenCode 状态监控工具,支持实时监控多个 OpenCode 实例的状态,并通过 MQTT 推送状态信息。
+
+## 功能特性
+
+- 🔍 **自动发现** - 自动扫描并发现运行中的 OpenCode 实例
+- 📊 **实时监控** - 通过 SSE 事件流实时获取状态变化
+- 📡 **MQTT 推送** - 支持将状态信息推送到 MQTT Broker
+- 💾 **配置管理** - 使用 SQLite 存储 MQTT 配置
+- 🌐 **HTTP API** - 提供 RESTful API 接口管理配置
+- 🖥️ **跨平台** - 支持 Linux、macOS、Windows
+
+## 安装
+
+### 下载预编译版本
+
+从 `dist/` 目录下载对应平台的可执行文件:
+
+- `opencode-monitor-linux-amd64` - Linux x86_64
+- `opencode-monitor-windows-amd64.exe` - Windows x86_64
+
+### 从源码编译
+
+```bash
+# 克隆项目
+git clone <repository-url>
+cd AI-Status-Light
+
+# 编译当前平台
+go build -o bin/opencode-monitor ./cmd/monitor
+
+# 编译所有平台
+make build-all
+```
+
+## 使用方法
+
+### 启动监控
+
+```bash
+# 自动发现并监控所有 OpenCode 实例
+./opencode-monitor monitor
+
+# 监控指定端口
+./opencode-monitor monitor --ports 4096,4097
+
+# 启动监控 + API 服务
+./opencode-monitor monitor --api-addr :8080
+```
+
+### API 服务
+
+```bash
+# 启动独立的 API 服务
+./opencode-monitor serve --addr :8080
+```
+
+### 配置管理
+
+```bash
+# 查看 MQTT 配置
+./opencode-monitor config list
+
+# 设置 MQTT 配置
+./opencode-monitor config set --broker tcp://127.0.0.1:1883 --topic opencode/status
+
+# 删除配置
+./opencode-monitor config delete 1
+```
+
+## API 接口
+
+| 方法 | 路径 | 说明 |
+|------|------|------|
+| GET | /api/health | 健康检查 |
+| GET | /api/mqtt | 获取所有配置 |
+| POST | /api/mqtt | 创建配置 |
+| GET | /api/mqtt/:id | 获取单个配置 |
+| PUT | /api/mqtt/:id | 更新配置 |
+| DELETE | /api/mqtt/:id | 删除配置 |
+
+### 请求示例
+
+```bash
+# 创建 MQTT 配置
+curl -X POST http://localhost:8080/api/mqtt \
+  -H "Content-Type: application/json" \
+  -d '{"broker":"tcp://127.0.0.1:1883","topic":"opencode/status"}'
+
+# 获取所有配置
+curl http://localhost:8080/api/mqtt
+```
+
+## MQTT 消息格式
+
+```json
+{
+  "port": 4096,
+  "type": "session.status",
+  "status": "忙碌",
+  "tool": "bash",
+  "state": "运行中: ls -la",
+  "title": "",
+  "timestamp": "2026-06-03T10:30:00Z"
+}
+```
+
+### 消息类型
+
+| 类型 | 说明 |
+|------|------|
+| session.status | 会话状态变化 |
+| session.idle | 会话空闲 |
+| message.part.updated | 消息部分更新 |
+| permission.updated | 权限请求 |
+| session.error | 会话错误 |
+
+## 命令行参数
+
+### monitor 命令
+
+```bash
+./opencode-monitor monitor [选项]
+
+选项:
+  --host string        主机地址 (默认 "127.0.0.1")
+  --ports string       端口列表,逗号分隔 (如: 4096,4097,4098)
+  --scan string        扫描端口范围 (如: 4096-4100)
+  --pid                通过进程号查找端口
+  --interval int       动态扫描间隔(秒) (默认 5)
+  --db string          数据库路径 (默认 "./data/config.db")
+  --api-addr string    API 服务地址 (如: :8080)
+```
+
+### serve 命令
+
+```bash
+./opencode-monitor serve [选项]
+
+选项:
+  --addr string    监听地址 (默认 ":8080")
+  --db string      数据库路径 (默认 "./data/config.db")
+```
+
+## 项目结构
+
+```
+AI-Status-Light/
+├── cmd/monitor/          # 程序入口
+├── internal/
+│   ├── api/              # HTTP API 模块
+│   ├── database/         # SQLite 数据库模块
+│   ├── discovery/        # 端口发现模块
+│   ├── event/            # 事件处理模块
+│   ├── monitor/          # 监控器核心模块
+│   └── mqtt/             # MQTT 客户端模块
+├── docs/                 # 文档
+├── dist/                 # 编译产物
+└── Makefile              # 构建脚本
+```
+
+## 开发
+
+```bash
+# 安装依赖
+go mod tidy
+
+# 运行测试
+go test ./...
+
+# 构建
+make build
+
+# 构建所有平台
+make build-all
+```
+
+## 许可证
+
+MIT License

+ 26 - 12
cmd/monitor/main.go

@@ -204,13 +204,19 @@ func runConfig(args []string) {
 			if cfg.Enabled {
 				status = "启用"
 			}
-			fmt.Printf("[%d] %s | %s | %s | %s\n", cfg.ID, cfg.Broker, cfg.ClientID, cfg.Topic, status)
+			auth := ""
+			if cfg.Username != "" {
+				auth = fmt.Sprintf(" [认证: %s]", cfg.Username)
+			}
+			fmt.Printf("[%d] %s | %s | %s%s | %s\n", cfg.ID, cfg.Broker, cfg.ClientID, cfg.Topic, auth, status)
 		}
 
 	case "set":
 		fs := flag.NewFlagSet("config set", flag.ExitOnError)
 		broker := fs.String("broker", "", "MQTT Broker 地址 (如: tcp://127.0.0.1:1883)")
 		clientID := fs.String("client-id", "opencode-monitor", "MQTT 客户端 ID")
+		username := fs.String("username", "", "MQTT 用户名")
+		password := fs.String("password", "", "MQTT 密码")
 		topic := fs.String("topic", "opencode/status", "MQTT 主题")
 		enabled := fs.Bool("enabled", true, "是否启用")
 		fs.Parse(args[1:])
@@ -223,6 +229,8 @@ func runConfig(args []string) {
 		cfg := &database.MQTTConfig{
 			Broker:   *broker,
 			ClientID: *clientID,
+			Username: *username,
+			Password: *password,
 			Topic:    *topic,
 			Enabled:  *enabled,
 		}
@@ -265,6 +273,8 @@ func printConfigUsage() {
 	fmt.Println("选项:")
 	fmt.Println("  --broker          MQTT Broker 地址")
 	fmt.Println("  --client-id       MQTT 客户端 ID")
+	fmt.Println("  --username        MQTT 用户名")
+	fmt.Println("  --password        MQTT 密码")
 	fmt.Println("  --topic           MQTT 主题")
 	fmt.Println("  --enabled         是否启用 (true/false)")
 	fmt.Println("  --db              数据库路径")
@@ -281,8 +291,7 @@ func createCallback(mqttClient *mqttcli.Client) monitor.EventCallback {
 			return
 		}
 
-		var evtType, status, tool, state, title string
-		evtType = evt.Type
+		var status string
 
 		switch evt.Type {
 		case "session.status":
@@ -293,25 +302,23 @@ func createCallback(mqttClient *mqttcli.Client) monitor.EventCallback {
 			status = "空闲"
 		case "message.part.updated":
 			if part, ok := evt.Properties["part"].(map[string]interface{}); ok {
-				pt, _ := part["type"].(string)
-				switch pt {
+				switch part["type"].(string) {
 				case "tool":
-					tool, _ = part["tool"].(string)
 					if st, ok := part["state"].(map[string]interface{}); ok {
-						state = event.ParseToolState(st)
+						status = event.ParseToolState(st)
 					}
 				case "reasoning":
-					state = "思考中"
+					status = "思考中"
 				}
 			}
-		case "permission.updated":
-			title, _ = evt.Properties["title"].(string)
 		case "session.error":
 			status = "错误"
 		}
 
-		if err := mqttClient.Publish(port, evtType, status, tool, state, title); err != nil {
-			fmt.Printf("MQTT 发送失败: %v\n", err)
+		if status != "" && msg != "" {
+			if err := mqttClient.PublishRaw(mqttClient.GetTopic(), msg); err != nil {
+				fmt.Printf("MQTT 发送失败: %v\n", err)
+			}
 		}
 	}
 }
@@ -367,8 +374,15 @@ func runDynamicMode(ctx context.Context, host string, scanRange *[2]int, interva
 		mu.Unlock()
 
 		go func() {
+			fmt.Printf("开始监控端口: %d\n", port)
 			m := monitor.New(host, port, callback)
 			m.Run(monitorCtx)
+			// 连接断开后清理记录,允许重新连接
+			mu.Lock()
+			delete(runningMonitors, port)
+			delete(monitoredPorts, port)
+			mu.Unlock()
+			fmt.Printf("端口 %d 监控已停止,等待重新连接\n", port)
 		}()
 	}
 

+ 14 - 0
docs/api.md

@@ -38,6 +38,8 @@ GET /api/mqtt
       "id": 1,
       "broker": "tcp://127.0.0.1:1883",
       "client_id": "opencode-monitor",
+      "username": "user",
+      "password": "pass",
       "topic": "opencode/status",
       "enabled": true
     }
@@ -56,6 +58,8 @@ POST /api/mqtt
 {
   "broker": "tcp://127.0.0.1:1883",
   "client_id": "opencode-monitor",
+  "username": "user",
+  "password": "pass",
   "topic": "opencode/status",
   "enabled": true
 }
@@ -65,6 +69,8 @@ POST /api/mqtt
 |------|------|------|--------|------|
 | broker | string | 是 | - | MQTT Broker 地址 |
 | client_id | string | 否 | opencode-monitor | 客户端 ID |
+| username | string | 否 | - | MQTT 用户名 |
+| password | string | 否 | - | MQTT 密码 |
 | topic | string | 否 | opencode/status | 主题前缀 |
 | enabled | boolean | 否 | true | 是否启用 |
 
@@ -77,6 +83,8 @@ POST /api/mqtt
     "id": 1,
     "broker": "tcp://127.0.0.1:1883",
     "client_id": "opencode-monitor",
+    "username": "user",
+    "password": "pass",
     "topic": "opencode/status",
     "enabled": true
   }
@@ -103,6 +111,8 @@ GET /api/mqtt/:id
     "id": 1,
     "broker": "tcp://127.0.0.1:1883",
     "client_id": "opencode-monitor",
+    "username": "user",
+    "password": "pass",
     "topic": "opencode/status",
     "enabled": true
   }
@@ -125,6 +135,8 @@ PUT /api/mqtt/:id
 {
   "broker": "tcp://192.168.1.100:1883",
   "client_id": "my-monitor",
+  "username": "user",
+  "password": "pass",
   "topic": "my/topic",
   "enabled": false
 }
@@ -139,6 +151,8 @@ PUT /api/mqtt/:id
     "id": 1,
     "broker": "tcp://192.168.1.100:1883",
     "client_id": "my-monitor",
+    "username": "user",
+    "password": "pass",
     "topic": "my/topic",
     "enabled": false
   }

+ 21 - 9
internal/database/database.go

@@ -18,6 +18,8 @@ type MQTTConfig struct {
 	ID       int    `json:"id"`
 	Broker   string `json:"broker"`
 	ClientID string `json:"client_id"`
+	Username string `json:"username,omitempty"`
+	Password string `json:"password,omitempty"`
 	Topic    string `json:"topic"`
 	Enabled  bool   `json:"enabled"`
 }
@@ -51,20 +53,30 @@ func (d *DB) init() error {
 		id INTEGER PRIMARY KEY AUTOINCREMENT,
 		broker TEXT NOT NULL,
 		client_id TEXT NOT NULL,
+		username TEXT DEFAULT '',
+		password TEXT DEFAULT '',
 		topic TEXT NOT NULL,
 		enabled BOOLEAN DEFAULT 1
 	);
 	`
 	_, err := d.conn.ExecContext(context.Background(), query)
-	return err
+	if err != nil {
+		return err
+	}
+
+	for _, col := range []string{"username", "password"} {
+		alterQuery := fmt.Sprintf("ALTER TABLE mqtt_config ADD COLUMN %s TEXT DEFAULT ''", col)
+		d.conn.ExecContext(context.Background(), alterQuery)
+	}
+	return nil
 }
 
 func (d *DB) GetMQTTConfig() (*MQTTConfig, error) {
-	query := "SELECT id, broker, client_id, topic, enabled FROM mqtt_config WHERE enabled = 1 LIMIT 1"
+	query := "SELECT id, broker, client_id, username, password, topic, enabled FROM mqtt_config WHERE enabled = 1 LIMIT 1"
 	row := d.conn.QueryRowContext(context.Background(), query)
 
 	var cfg MQTTConfig
-	err := row.Scan(&cfg.ID, &cfg.Broker, &cfg.ClientID, &cfg.Topic, &cfg.Enabled)
+	err := row.Scan(&cfg.ID, &cfg.Broker, &cfg.ClientID, &cfg.Username, &cfg.Password, &cfg.Topic, &cfg.Enabled)
 	if err == sql.ErrNoRows {
 		return nil, nil
 	}
@@ -76,12 +88,12 @@ func (d *DB) GetMQTTConfig() (*MQTTConfig, error) {
 
 func (d *DB) SaveMQTTConfig(cfg *MQTTConfig) error {
 	if cfg.ID == 0 {
-		query := "INSERT INTO mqtt_config (broker, client_id, topic, enabled) VALUES (?, ?, ?, ?)"
-		_, err := d.conn.ExecContext(context.Background(), query, cfg.Broker, cfg.ClientID, cfg.Topic, cfg.Enabled)
+		query := "INSERT INTO mqtt_config (broker, client_id, username, password, topic, enabled) VALUES (?, ?, ?, ?, ?, ?)"
+		_, err := d.conn.ExecContext(context.Background(), query, cfg.Broker, cfg.ClientID, cfg.Username, cfg.Password, cfg.Topic, cfg.Enabled)
 		return err
 	}
-	query := "UPDATE mqtt_config SET broker = ?, client_id = ?, topic = ?, enabled = ? WHERE id = ?"
-	_, err := d.conn.ExecContext(context.Background(), query, cfg.Broker, cfg.ClientID, cfg.Topic, cfg.Enabled, cfg.ID)
+	query := "UPDATE mqtt_config SET broker = ?, client_id = ?, username = ?, password = ?, topic = ?, enabled = ? WHERE id = ?"
+	_, err := d.conn.ExecContext(context.Background(), query, cfg.Broker, cfg.ClientID, cfg.Username, cfg.Password, cfg.Topic, cfg.Enabled, cfg.ID)
 	return err
 }
 
@@ -92,7 +104,7 @@ func (d *DB) DeleteMQTTConfig(id int) error {
 }
 
 func (d *DB) ListMQTTConfigs() ([]MQTTConfig, error) {
-	query := "SELECT id, broker, client_id, topic, enabled FROM mqtt_config ORDER BY id"
+	query := "SELECT id, broker, client_id, username, password, topic, enabled FROM mqtt_config ORDER BY id"
 	rows, err := d.conn.QueryContext(context.Background(), query)
 	if err != nil {
 		return nil, err
@@ -102,7 +114,7 @@ func (d *DB) ListMQTTConfigs() ([]MQTTConfig, error) {
 	var configs []MQTTConfig
 	for rows.Next() {
 		var cfg MQTTConfig
-		if err := rows.Scan(&cfg.ID, &cfg.Broker, &cfg.ClientID, &cfg.Topic, &cfg.Enabled); err != nil {
+		if err := rows.Scan(&cfg.ID, &cfg.Broker, &cfg.ClientID, &cfg.Username, &cfg.Password, &cfg.Topic, &cfg.Enabled); err != nil {
 			continue
 		}
 		configs = append(configs, cfg)

+ 0 - 1
internal/discovery/discovery_darwin.go

@@ -3,7 +3,6 @@
 package discovery
 
 import (
-	"fmt"
 	"os/exec"
 	"regexp"
 	"sort"

+ 39 - 8
internal/monitor/monitor.go

@@ -15,18 +15,22 @@ import (
 type EventCallback func(port int, evt *event.SSEEvent)
 
 type Monitor struct {
-	baseURL  string
-	port     int
-	client   *http.Client
-	callback EventCallback
+	baseURL      string
+	port         int
+	client       *http.Client
+	callback     EventCallback
+	lastActivity time.Time
+	timeout      time.Duration
 }
 
 func New(host string, port int, callback EventCallback) *Monitor {
 	return &Monitor{
-		baseURL:  fmt.Sprintf("http://%s:%d", host, port),
-		port:     port,
-		client:   &http.Client{Timeout: 2 * time.Second},
-		callback: callback,
+		baseURL:      fmt.Sprintf("http://%s:%d", host, port),
+		port:         port,
+		client:       &http.Client{Timeout: 2 * time.Second},
+		callback:     callback,
+		lastActivity: time.Now(),
+		timeout:      5 * time.Minute,
 	}
 }
 
@@ -52,10 +56,33 @@ func (m *Monitor) Run(ctx context.Context) {
 	}
 	defer resp.Body.Close()
 
+	m.lastActivity = time.Now()
+
+	// 启动超时检测 goroutine
+	timeoutChan := make(chan struct{})
+	go func() {
+		ticker := time.NewTicker(30 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-timeoutChan:
+				return
+			case <-ticker.C:
+				if time.Since(m.lastActivity) > m.timeout {
+					fmt.Printf("端口 %d 超时 %v,回收监控\n", m.port, m.timeout)
+					return
+				}
+			}
+		}
+	}()
+
 	scanner := bufio.NewScanner(resp.Body)
 	for scanner.Scan() {
 		select {
 		case <-ctx.Done():
+			close(timeoutChan)
 			return
 		default:
 		}
@@ -74,8 +101,12 @@ func (m *Monitor) Run(ctx context.Context) {
 			continue
 		}
 
+		m.lastActivity = time.Now()
+
 		if m.callback != nil {
 			m.callback(m.port, &evt)
 		}
 	}
+
+	close(timeoutChan)
 }

+ 28 - 6
internal/mqtt/mqtt.go

@@ -14,6 +14,8 @@ import (
 type Client struct {
 	broker   string
 	clientID string
+	username string
+	password string
 	topic    string
 	client   mqttlib.Client
 }
@@ -28,10 +30,12 @@ type StatusMessage struct {
 	Timestamp string `json:"timestamp"`
 }
 
-func New(broker, clientID, topic string) *Client {
+func New(broker, clientID, username, password, topic string) *Client {
 	return &Client{
 		broker:   broker,
 		clientID: clientID,
+		username: username,
+		password: password,
 		topic:    topic,
 	}
 }
@@ -40,6 +44,8 @@ func NewFromConfig(cfg *database.MQTTConfig) *Client {
 	return &Client{
 		broker:   cfg.Broker,
 		clientID: cfg.ClientID,
+		username: cfg.Username,
+		password: cfg.Password,
 		topic:    cfg.Topic,
 	}
 }
@@ -48,6 +54,12 @@ func (c *Client) Connect() error {
 	opts := mqttlib.NewClientOptions()
 	opts.AddBroker(c.broker)
 	opts.SetClientID(c.clientID)
+	if c.username != "" {
+		opts.SetUsername(c.username)
+	}
+	if c.password != "" {
+		opts.SetPassword(c.password)
+	}
 	opts.SetAutoReconnect(true)
 	opts.SetConnectTimeout(10 * time.Second)
 	opts.SetOnConnectHandler(func(client mqttlib.Client) {
@@ -82,22 +94,32 @@ func (c *Client) Publish(port int, eventType, status, tool, state, title string)
 		return fmt.Errorf("序列化消息失败: %w", err)
 	}
 
-	topic := fmt.Sprintf("%s/%d", c.topic, port)
-	token := c.client.Publish(topic, 0, false, payload)
+	token := c.client.Publish(c.topic, 0, false, payload)
 	token.Wait()
 	return token.Error()
 }
 
 func (c *Client) PublishRaw(topic string, payload interface{}) error {
-	data, err := json.Marshal(payload)
-	if err != nil {
-		return err
+	var data []byte
+	switch v := payload.(type) {
+	case string:
+		data = []byte(v)
+	default:
+		var err error
+		data, err = json.Marshal(v)
+		if err != nil {
+			return err
+		}
 	}
 	token := c.client.Publish(topic, 0, false, data)
 	token.Wait()
 	return token.Error()
 }
 
+func (c *Client) GetTopic() string {
+	return c.topic
+}
+
 func (c *Client) Disconnect() {
 	if c.client != nil && c.client.IsConnected() {
 		c.client.Disconnect(1000)