main.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566
  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "os"
  7. "os/signal"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "syscall"
  13. "time"
  14. "AI-Status-Light/internal/api"
  15. "AI-Status-Light/internal/database"
  16. "AI-Status-Light/internal/discovery"
  17. "AI-Status-Light/internal/event"
  18. "AI-Status-Light/internal/logger"
  19. "AI-Status-Light/internal/monitor"
  20. mqttcli "AI-Status-Light/internal/mqtt"
  21. )
  22. const defaultDBPath = "./data/config.db"
  23. var Version = "dev"
  24. func main() {
  25. if len(os.Args) < 2 {
  26. printUsage()
  27. return
  28. }
  29. switch os.Args[1] {
  30. case "monitor":
  31. runMonitor(os.Args[2:])
  32. case "config":
  33. runConfig(os.Args[2:])
  34. case "serve":
  35. runServe(os.Args[2:])
  36. case "version":
  37. fmt.Printf("opencode-monitor %s\n", Version)
  38. default:
  39. printUsage()
  40. }
  41. }
  42. func printUsage() {
  43. fmt.Printf("opencode-monitor %s\n\n", Version)
  44. fmt.Println("用法: opencode-monitor <命令> [选项]")
  45. fmt.Println("")
  46. fmt.Println("命令:")
  47. fmt.Println(" monitor 启动监控")
  48. fmt.Println(" config 管理 MQTT 配置")
  49. fmt.Println(" serve 启动 API 服务")
  50. fmt.Println(" version 显示版本信息")
  51. fmt.Println("")
  52. fmt.Println("运行 'opencode-monitor <命令> -h' 查看命令帮助")
  53. }
  54. func runServe(args []string) {
  55. fs := flag.NewFlagSet("serve", flag.ExitOnError)
  56. addr := fs.String("addr", ":8080", "监听地址")
  57. dbPath := fs.String("db", defaultDBPath, "数据库路径")
  58. logFile := fs.String("log-file", "./logs", "日志文件路径(默认 ./logs/monitor.log)")
  59. logLevel := fs.String("log-level", "info", "日志级别 (debug/info/warn/error)")
  60. fs.Parse(args)
  61. logger.SetLevel(logger.ParseLevel(*logLevel))
  62. if err := logger.InitFileLog(*logFile); err != nil {
  63. fmt.Printf("初始化日志文件失败: %v\n", err)
  64. return
  65. }
  66. defer logger.Close()
  67. db, err := database.New(*dbPath)
  68. if err != nil {
  69. logger.Error("打开数据库失败: %v", err)
  70. fmt.Printf("打开数据库失败: %v\n", err)
  71. return
  72. }
  73. defer db.Close()
  74. logger.Info("数据库已连接: %s", *dbPath)
  75. server := api.New(db, *addr)
  76. logger.Info("API 服务已启动: %s", *addr)
  77. fmt.Printf("API 服务已启动: %s\n", *addr)
  78. fmt.Println("接口文档:")
  79. fmt.Println(" GET /api/health - 健康检查")
  80. fmt.Println(" GET /api/mqtt - 获取所有配置")
  81. fmt.Println(" POST /api/mqtt - 创建配置")
  82. fmt.Println(" GET /api/mqtt/:id - 获取单个配置")
  83. fmt.Println(" PUT /api/mqtt/:id - 更新配置")
  84. fmt.Println(" DELETE /api/mqtt/:id - 删除配置")
  85. if err := server.Start(); err != nil {
  86. logger.Error("服务启动失败: %v", err)
  87. fmt.Printf("服务启动失败: %v\n", err)
  88. }
  89. }
  90. func runMonitor(args []string) {
  91. fs := flag.NewFlagSet("monitor", flag.ExitOnError)
  92. host := fs.String("host", "127.0.0.1", "主机地址")
  93. portsFlag := fs.String("ports", "", "端口列表,逗号分隔 (如: 4096,4097,4098)")
  94. scanFlag := fs.String("scan", "", "扫描端口范围 (如: 4096-4100)")
  95. intervalFlag := fs.Int("interval", 1, "动态扫描间隔(秒), 默认1")
  96. dbPath := fs.String("db", defaultDBPath, "数据库路径")
  97. apiAddr := fs.String("api-addr", "", "API 服务地址 (如: :8080)")
  98. logFile := fs.String("log-file", "./logs", "日志文件路径(默认 ./logs/monitor.log)")
  99. logLevel := fs.String("log-level", "info", "日志级别 (debug/info/warn/error)")
  100. fs.Parse(args)
  101. logger.SetLevel(logger.ParseLevel(*logLevel))
  102. if err := logger.InitFileLog(*logFile); err != nil {
  103. fmt.Printf("初始化日志文件失败: %v\n", err)
  104. return
  105. }
  106. defer logger.Close()
  107. var scanRange *[2]int
  108. if *scanFlag != "" {
  109. parts := strings.Split(*scanFlag, "-")
  110. if len(parts) == 2 {
  111. start, err1 := strconv.Atoi(parts[0])
  112. end, err2 := strconv.Atoi(parts[1])
  113. if err1 == nil && err2 == nil {
  114. scanRange = &[2]int{start, end}
  115. }
  116. }
  117. }
  118. ctx, cancel := context.WithCancel(context.Background())
  119. defer cancel()
  120. sigChan := make(chan os.Signal, 1)
  121. signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
  122. db, err := database.New(*dbPath)
  123. if err != nil {
  124. logger.Error("打开数据库失败: %v", err)
  125. fmt.Printf("打开数据库失败: %v\n", err)
  126. return
  127. }
  128. defer db.Close()
  129. logger.Info("数据库已连接: %s", *dbPath)
  130. var mqttClient *mqttcli.Client
  131. cfg, err := db.GetMQTTConfig()
  132. if err != nil {
  133. logger.Error("读取 MQTT 配置失败: %v", err)
  134. fmt.Printf("读取 MQTT 配置失败: %v\n", err)
  135. } else if cfg != nil {
  136. mqttClient = mqttcli.NewFromConfig(cfg)
  137. if err := mqttClient.Connect(); err != nil {
  138. logger.Error("MQTT 连接失败: %v", err)
  139. fmt.Printf("MQTT 连接失败: %v\n", err)
  140. mqttClient = nil
  141. } else {
  142. defer mqttClient.Disconnect()
  143. logger.Info("MQTT 已连接: %s (主题: %s)", cfg.Broker, cfg.Topic)
  144. fmt.Printf("MQTT 已连接: %s (主题: %s)\n", cfg.Broker, cfg.Topic)
  145. }
  146. } else {
  147. logger.Info("未配置 MQTT,跳过 MQTT 连接")
  148. }
  149. var apiServer *api.Server
  150. if *apiAddr != "" {
  151. apiServer = api.New(db, *apiAddr)
  152. go func() {
  153. logger.Info("API 服务已启动: %s", *apiAddr)
  154. fmt.Printf("API 服务已启动: %s\n", *apiAddr)
  155. if err := apiServer.Start(); err != nil {
  156. logger.Error("API 服务失败: %v", err)
  157. fmt.Printf("API 服务失败: %v\n", err)
  158. }
  159. }()
  160. }
  161. callback := createCallback(mqttClient, apiServer)
  162. if *portsFlag != "" {
  163. runFixedMode(ctx, *host, *portsFlag, callback, sigChan)
  164. cancel()
  165. return
  166. }
  167. if scanRange != nil {
  168. fmt.Printf("扫描端口范围 %d-%d...\n", scanRange[0], scanRange[1])
  169. } else {
  170. fmt.Println("查找 OpenCode 实例...")
  171. }
  172. runDynamicMode(ctx, *host, scanRange, *intervalFlag, callback, sigChan)
  173. cancel()
  174. }
  175. func runConfig(args []string) {
  176. if len(args) < 1 {
  177. printConfigUsage()
  178. return
  179. }
  180. dbPath := defaultDBPath
  181. logFile := "./logs"
  182. logLevel := "info"
  183. for i, arg := range args {
  184. if arg == "--db" && i+1 < len(args) {
  185. dbPath = args[i+1]
  186. }
  187. if arg == "--log-file" && i+1 < len(args) {
  188. logFile = args[i+1]
  189. }
  190. if arg == "--log-level" && i+1 < len(args) {
  191. logLevel = args[i+1]
  192. }
  193. }
  194. logger.SetLevel(logger.ParseLevel(logLevel))
  195. if err := logger.InitFileLog(logFile); err != nil {
  196. fmt.Printf("初始化日志文件失败: %v\n", err)
  197. return
  198. }
  199. defer logger.Close()
  200. // 过滤全局选项,避免传递给子命令的 FlagSet
  201. var filtered []string
  202. for i := 0; i < len(args); i++ {
  203. switch args[i] {
  204. case "--db", "--log-file", "--log-level":
  205. i++ // 跳过值
  206. default:
  207. filtered = append(filtered, args[i])
  208. }
  209. }
  210. args = filtered
  211. db, err := database.New(dbPath)
  212. if err != nil {
  213. logger.Error("打开数据库失败: %v", err)
  214. fmt.Printf("打开数据库失败: %v\n", err)
  215. return
  216. }
  217. defer db.Close()
  218. logger.Info("数据库已连接: %s", dbPath)
  219. switch args[0] {
  220. case "list":
  221. configs, err := db.ListMQTTConfigs()
  222. if err != nil {
  223. logger.Error("查询配置失败: %v", err)
  224. fmt.Printf("查询失败: %v\n", err)
  225. return
  226. }
  227. if len(configs) == 0 {
  228. fmt.Println("未配置 MQTT")
  229. return
  230. }
  231. logger.Info("查询到 %d 条 MQTT 配置", len(configs))
  232. for _, cfg := range configs {
  233. status := "禁用"
  234. if cfg.Enabled {
  235. status = "启用"
  236. }
  237. auth := ""
  238. if cfg.Username != "" {
  239. auth = fmt.Sprintf(" [认证: %s]", cfg.Username)
  240. }
  241. fmt.Printf("[%d] %s | %s | %s%s | %s\n", cfg.ID, cfg.Broker, cfg.ClientID, cfg.Topic, auth, status)
  242. }
  243. case "set":
  244. fs := flag.NewFlagSet("config set", flag.ExitOnError)
  245. broker := fs.String("broker", "", "MQTT Broker 地址 (如: tcp://127.0.0.1:1883)")
  246. clientID := fs.String("client-id", "opencode-monitor", "MQTT 客户端 ID")
  247. username := fs.String("username", "", "MQTT 用户名")
  248. password := fs.String("password", "", "MQTT 密码")
  249. topic := fs.String("topic", "opencode/status", "MQTT 主题")
  250. enabled := fs.Bool("enabled", true, "是否启用")
  251. fs.Parse(args[1:])
  252. if *broker == "" {
  253. fmt.Println("必须指定 --broker")
  254. return
  255. }
  256. cfg := &database.MQTTConfig{
  257. Broker: *broker,
  258. ClientID: *clientID,
  259. Username: *username,
  260. Password: *password,
  261. Topic: *topic,
  262. Enabled: *enabled,
  263. }
  264. if err := db.SaveMQTTConfig(cfg); err != nil {
  265. logger.Error("保存 MQTT 配置失败: %v", err)
  266. fmt.Printf("保存失败: %v\n", err)
  267. return
  268. }
  269. logger.Info("MQTT 配置已保存: %s (主题: %s)", cfg.Broker, cfg.Topic)
  270. fmt.Println("配置已保存")
  271. case "delete":
  272. if len(args) < 2 {
  273. fmt.Println("必须指定配置 ID")
  274. return
  275. }
  276. id, err := strconv.Atoi(args[1])
  277. if err != nil {
  278. logger.Warn("无效的配置 ID: %s", args[1])
  279. fmt.Println("无效的 ID")
  280. return
  281. }
  282. if err := db.DeleteMQTTConfig(id); err != nil {
  283. logger.Error("删除配置失败: id=%d, %v", id, err)
  284. fmt.Printf("删除失败: %v\n", err)
  285. return
  286. }
  287. logger.Info("MQTT 配置已删除: id=%d", id)
  288. fmt.Println("配置已删除")
  289. default:
  290. printConfigUsage()
  291. }
  292. }
  293. func printConfigUsage() {
  294. fmt.Println("用法: opencode-monitor config <子命令> [选项]")
  295. fmt.Println("")
  296. fmt.Println("子命令:")
  297. fmt.Println(" list 列出所有配置")
  298. fmt.Println(" set 设置 MQTT 配置")
  299. fmt.Println(" delete <id> 删除配置")
  300. fmt.Println("")
  301. fmt.Println("选项:")
  302. fmt.Println(" --broker MQTT Broker 地址")
  303. fmt.Println(" --client-id MQTT 客户端 ID")
  304. fmt.Println(" --username MQTT 用户名")
  305. fmt.Println(" --password MQTT 密码")
  306. fmt.Println(" --topic MQTT 主题")
  307. fmt.Println(" --enabled 是否启用 (true/false)")
  308. fmt.Println(" --db 数据库路径")
  309. fmt.Println(" --log-file 日志文件路径(默认 ./logs/monitor.log)")
  310. fmt.Println(" --log-level 日志级别 (debug/info/warn/error)")
  311. }
  312. func createCallback(mqttClient *mqttcli.Client, apiServer *api.Server) monitor.EventCallback {
  313. lastStatus := make(map[int]string)
  314. var mu sync.Mutex
  315. publish := func(port int, status string, code string) {
  316. if mqttClient != nil {
  317. payload := map[string]interface{}{
  318. "port": port,
  319. "status": status,
  320. "code": code,
  321. "timestamp": time.Now().Format(time.RFC3339),
  322. }
  323. if err := mqttClient.PublishRaw(mqttClient.GetTopic(), payload); err != nil {
  324. logger.Error("MQTT 发送失败: %v", err)
  325. fmt.Printf("MQTT 发送失败: %v\n", err)
  326. }
  327. }
  328. if apiServer != nil {
  329. apiServer.BroadcastStatus(port, status, code)
  330. }
  331. }
  332. return func(port int, evt *event.SSEEvent) {
  333. msg := event.FormatEvent(port, evt)
  334. if msg != "" {
  335. fmt.Println(msg)
  336. }
  337. if mqttClient == nil && apiServer == nil {
  338. return
  339. }
  340. var status, code string
  341. switch evt.Type {
  342. case "session.status":
  343. if s, ok := evt.Properties["status"].(map[string]interface{}); ok {
  344. if t, ok := s["type"].(string); ok {
  345. code = t
  346. }
  347. status = event.ParseStatus(s)
  348. }
  349. case "session.idle":
  350. status = "空闲"
  351. code = "idle"
  352. case "message.part.updated":
  353. if part, ok := evt.Properties["part"].(map[string]interface{}); ok {
  354. switch part["type"].(string) {
  355. case "tool":
  356. if st, ok := part["state"].(map[string]interface{}); ok {
  357. if s, ok := st["status"].(string); ok {
  358. code = s
  359. }
  360. status = event.ParseToolState(st)
  361. }
  362. case "reasoning":
  363. status = "思考中"
  364. code = "reasoning"
  365. default:
  366. status = "使用工具中"
  367. code = "using_tool"
  368. }
  369. }
  370. case "permission.updated":
  371. status = "等待权限"
  372. code = "permission"
  373. case "session.error":
  374. status = "错误"
  375. code = "error"
  376. }
  377. if status != "" {
  378. mu.Lock()
  379. prev := lastStatus[port]
  380. if status == "空闲" && prev != "" && prev != "空闲" {
  381. publish(port, "会话完成", "session_completed")
  382. }
  383. lastStatus[port] = status
  384. mu.Unlock()
  385. publish(port, status, code)
  386. }
  387. }
  388. }
  389. func runFixedMode(ctx context.Context, host string, portsFlag string, callback monitor.EventCallback, sigChan chan os.Signal) {
  390. var ports []int
  391. for _, p := range strings.Split(portsFlag, ",") {
  392. p = strings.TrimSpace(p)
  393. if port, err := strconv.Atoi(p); err == nil {
  394. ports = append(ports, port)
  395. }
  396. }
  397. if len(ports) == 0 {
  398. fmt.Println("未指定端口")
  399. return
  400. }
  401. logger.Info("固定模式启动,监控端口: %v", ports)
  402. fmt.Printf("监控端口: %v\n", ports)
  403. fmt.Println("Ctrl+C 停止")
  404. fmt.Println(strings.Repeat("-", 40))
  405. var wg sync.WaitGroup
  406. for _, port := range ports {
  407. wg.Add(1)
  408. go func(p int) {
  409. defer wg.Done()
  410. logger.Info("开始监控端口: %d", p)
  411. m := monitor.New(host, p, callback)
  412. m.Run(ctx)
  413. logger.Info("端口 %d 监控已停止", p)
  414. }(port)
  415. }
  416. <-sigChan
  417. logger.Info("收到停止信号,正在退出")
  418. fmt.Println("\n已停止")
  419. wg.Wait()
  420. logger.Info("所有监控协程已退出")
  421. }
  422. func runDynamicMode(ctx context.Context, host string, scanRange *[2]int, interval int, callback monitor.EventCallback, sigChan chan os.Signal) {
  423. scanner := discovery.NewScanner(host, scanRange)
  424. monitoredPorts := make(map[int]bool)
  425. runningMonitors := make(map[int]context.CancelFunc)
  426. var mu sync.Mutex
  427. startMonitor := func(port int) {
  428. mu.Lock()
  429. if _, exists := runningMonitors[port]; exists {
  430. mu.Unlock()
  431. return
  432. }
  433. monitorCtx, monitorCancel := context.WithCancel(ctx)
  434. runningMonitors[port] = monitorCancel
  435. mu.Unlock()
  436. go func() {
  437. logger.Info("开始监控端口: %d", port)
  438. fmt.Printf("开始监控端口: %d\n", port)
  439. m := monitor.New(host, port, callback)
  440. m.Run(monitorCtx)
  441. // 连接断开后清理记录,允许重新连接
  442. mu.Lock()
  443. delete(runningMonitors, port)
  444. delete(monitoredPorts, port)
  445. mu.Unlock()
  446. logger.Info("端口 %d 监控已停止,等待重新连接", port)
  447. fmt.Printf("端口 %d 监控已停止,等待重新连接\n", port)
  448. }()
  449. }
  450. initial := scanner.Discover()
  451. if len(initial) == 0 {
  452. logger.Info("未找到运行中的 OpenCode 实例,等待自动检测")
  453. fmt.Println("未找到运行中的 OpenCode 实例")
  454. fmt.Println("请先执行: opencode serve --port 4096")
  455. fmt.Println("启动后会自动检测,等待中...")
  456. }
  457. for _, port := range initial {
  458. monitoredPorts[port] = true
  459. startMonitor(port)
  460. }
  461. if len(monitoredPorts) > 0 {
  462. ports := make([]int, 0, len(monitoredPorts))
  463. for p := range monitoredPorts {
  464. ports = append(ports, p)
  465. }
  466. sort.Ints(ports)
  467. logger.Info("找到 %d 个实例: %v", len(monitoredPorts), ports)
  468. fmt.Printf("找到 %d 个实例: %v\n", len(monitoredPorts), ports)
  469. }
  470. logger.Info("动态模式启动,每 %d 秒扫描新实例", interval)
  471. fmt.Printf("每 %d 秒扫描新实例,Ctrl+C 停止\n", interval)
  472. fmt.Println(strings.Repeat("-", 40))
  473. scanTicker := time.NewTicker(time.Duration(interval) * time.Second)
  474. defer scanTicker.Stop()
  475. go func() {
  476. for {
  477. select {
  478. case <-ctx.Done():
  479. return
  480. case <-scanTicker.C:
  481. newPorts := scanner.Discover()
  482. if len(newPorts) > 0 {
  483. logger.Debug("扫描到 %d 个端口: %v", len(newPorts), newPorts)
  484. }
  485. for _, port := range newPorts {
  486. mu.Lock()
  487. alreadyMonitored := monitoredPorts[port]
  488. if !alreadyMonitored {
  489. monitoredPorts[port] = true
  490. }
  491. mu.Unlock()
  492. if !alreadyMonitored {
  493. logger.Info("发现新实例端口: %d,开始监控", port)
  494. startMonitor(port)
  495. }
  496. }
  497. }
  498. }
  499. }()
  500. <-sigChan
  501. logger.Info("收到停止信号,正在退出")
  502. fmt.Println("\n已停止")
  503. }