main.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602
  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "os"
  7. "os/signal"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "syscall"
  13. "time"
  14. "AI-Status-Light/internal/api"
  15. "AI-Status-Light/internal/database"
  16. "AI-Status-Light/internal/discovery"
  17. "AI-Status-Light/internal/event"
  18. "AI-Status-Light/internal/logger"
  19. "AI-Status-Light/internal/monitor"
  20. mqttcli "AI-Status-Light/internal/mqtt"
  21. )
  22. const defaultDBPath = "./data/config.db"
  23. var Version = "dev"
  24. func main() {
  25. if len(os.Args) < 2 {
  26. printUsage()
  27. return
  28. }
  29. switch os.Args[1] {
  30. case "monitor":
  31. runMonitor(os.Args[2:])
  32. case "config":
  33. runConfig(os.Args[2:])
  34. case "serve":
  35. runServe(os.Args[2:])
  36. case "version":
  37. fmt.Printf("opencode-monitor %s\n", Version)
  38. default:
  39. printUsage()
  40. }
  41. }
  42. func printUsage() {
  43. fmt.Printf("opencode-monitor %s\n\n", Version)
  44. fmt.Println("用法: opencode-monitor <命令> [选项]")
  45. fmt.Println("")
  46. fmt.Println("命令:")
  47. fmt.Println(" monitor 启动监控")
  48. fmt.Println(" config 管理 MQTT 配置")
  49. fmt.Println(" serve 启动 API 服务")
  50. fmt.Println(" version 显示版本信息")
  51. fmt.Println("")
  52. fmt.Println("运行 'opencode-monitor <命令> -h' 查看命令帮助")
  53. }
  54. func runServe(args []string) {
  55. fs := flag.NewFlagSet("serve", flag.ExitOnError)
  56. addr := fs.String("addr", ":8080", "监听地址")
  57. dbPath := fs.String("db", defaultDBPath, "数据库路径")
  58. logFile := fs.String("log-file", "./logs", "日志文件路径(默认 ./logs/monitor.log)")
  59. logLevel := fs.String("log-level", "info", "日志级别 (debug/info/warn/error)")
  60. tls := fs.Bool("tls", false, "启用 HTTPS (使用自签名证书)")
  61. tlsCert := fs.String("tls-cert", "./data/tls/cert.pem", "TLS 证书文件路径")
  62. tlsKey := fs.String("tls-key", "./data/tls/key.pem", "TLS 私钥文件路径")
  63. fs.Parse(args)
  64. logger.SetLevel(logger.ParseLevel(*logLevel))
  65. if err := logger.InitFileLog(*logFile); err != nil {
  66. fmt.Printf("初始化日志文件失败: %v\n", err)
  67. return
  68. }
  69. defer logger.Close()
  70. db, err := database.New(*dbPath)
  71. if err != nil {
  72. logger.Error("打开数据库失败: %v", err)
  73. fmt.Printf("打开数据库失败: %v\n", err)
  74. return
  75. }
  76. defer db.Close()
  77. logger.Info("数据库已连接: %s", *dbPath)
  78. server := api.New(db, *addr)
  79. if *tls {
  80. if err := api.EnsureSelfSignedCert(*tlsCert, *tlsKey); err != nil {
  81. logger.Error("生成自签名证书失败: %v", err)
  82. fmt.Printf("生成自签名证书失败: %v\n", err)
  83. return
  84. }
  85. server.EnableTLS(*tlsCert, *tlsKey)
  86. logger.Info("HTTPS 已启用")
  87. fmt.Println("HTTPS 已启用 (自签名证书)")
  88. } else {
  89. fmt.Printf("API 服务已启动: %s\n", *addr)
  90. }
  91. fmt.Println("接口文档:")
  92. fmt.Println(" GET /api/health - 健康检查")
  93. fmt.Println(" GET /api/mqtt - 获取所有配置")
  94. fmt.Println(" POST /api/mqtt - 创建配置")
  95. fmt.Println(" GET /api/mqtt/:id - 获取单个配置")
  96. fmt.Println(" PUT /api/mqtt/:id - 更新配置")
  97. fmt.Println(" DELETE /api/mqtt/:id - 删除配置")
  98. if err := server.Start(); err != nil {
  99. logger.Error("服务启动失败: %v", err)
  100. fmt.Printf("服务启动失败: %v\n", err)
  101. }
  102. }
  103. func runMonitor(args []string) {
  104. fs := flag.NewFlagSet("monitor", flag.ExitOnError)
  105. host := fs.String("host", "127.0.0.1", "主机地址")
  106. portsFlag := fs.String("ports", "", "端口列表,逗号分隔 (如: 4096,4097,4098)")
  107. scanFlag := fs.String("scan", "", "扫描端口范围 (如: 4096-4100)")
  108. intervalFlag := fs.Int("interval", 1, "动态扫描间隔(秒), 默认1")
  109. dbPath := fs.String("db", defaultDBPath, "数据库路径")
  110. apiAddr := fs.String("api-addr", "", "API 服务地址 (如: :8080)")
  111. tls := fs.Bool("tls", false, "启用 HTTPS (使用自签名证书)")
  112. tlsCert := fs.String("tls-cert", "./data/tls/cert.pem", "TLS 证书文件路径")
  113. tlsKey := fs.String("tls-key", "./data/tls/key.pem", "TLS 私钥文件路径")
  114. logFile := fs.String("log-file", "./logs", "日志文件路径(默认 ./logs/monitor.log)")
  115. logLevel := fs.String("log-level", "info", "日志级别 (debug/info/warn/error)")
  116. fs.Parse(args)
  117. logger.SetLevel(logger.ParseLevel(*logLevel))
  118. if err := logger.InitFileLog(*logFile); err != nil {
  119. fmt.Printf("初始化日志文件失败: %v\n", err)
  120. return
  121. }
  122. defer logger.Close()
  123. var scanRange *[2]int
  124. if *scanFlag != "" {
  125. parts := strings.Split(*scanFlag, "-")
  126. if len(parts) == 2 {
  127. start, err1 := strconv.Atoi(parts[0])
  128. end, err2 := strconv.Atoi(parts[1])
  129. if err1 == nil && err2 == nil {
  130. scanRange = &[2]int{start, end}
  131. }
  132. }
  133. }
  134. ctx, cancel := context.WithCancel(context.Background())
  135. defer cancel()
  136. sigChan := make(chan os.Signal, 1)
  137. signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
  138. db, err := database.New(*dbPath)
  139. if err != nil {
  140. logger.Error("打开数据库失败: %v", err)
  141. fmt.Printf("打开数据库失败: %v\n", err)
  142. return
  143. }
  144. defer db.Close()
  145. logger.Info("数据库已连接: %s", *dbPath)
  146. var mqttClient *mqttcli.Client
  147. cfg, err := db.GetMQTTConfig()
  148. if err != nil {
  149. logger.Error("读取 MQTT 配置失败: %v", err)
  150. fmt.Printf("读取 MQTT 配置失败: %v\n", err)
  151. } else if cfg != nil {
  152. mqttClient = mqttcli.NewFromConfig(cfg)
  153. if err := mqttClient.Connect(); err != nil {
  154. logger.Error("MQTT 连接失败: %v", err)
  155. fmt.Printf("MQTT 连接失败: %v\n", err)
  156. mqttClient = nil
  157. } else {
  158. defer mqttClient.Disconnect()
  159. logger.Info("MQTT 已连接: %s (主题: %s)", cfg.Broker, cfg.Topic)
  160. fmt.Printf("MQTT 已连接: %s (主题: %s)\n", cfg.Broker, cfg.Topic)
  161. }
  162. } else {
  163. logger.Info("未配置 MQTT,跳过 MQTT 连接")
  164. }
  165. var apiServer *api.Server
  166. if *apiAddr != "" {
  167. apiServer = api.New(db, *apiAddr)
  168. if *tls {
  169. if err := api.EnsureSelfSignedCert(*tlsCert, *tlsKey); err != nil {
  170. logger.Error("生成自签名证书失败: %v", err)
  171. fmt.Printf("生成自签名证书失败: %v\n", err)
  172. return
  173. }
  174. apiServer.EnableTLS(*tlsCert, *tlsKey)
  175. }
  176. go func() {
  177. scheme := "http"
  178. if *tls {
  179. scheme = "https"
  180. }
  181. logger.Info("API 服务已启动: %s://%s", scheme, *apiAddr)
  182. fmt.Printf("API 服务已启动: %s://%s\n", scheme, *apiAddr)
  183. if err := apiServer.Start(); err != nil {
  184. logger.Error("API 服务失败: %v", err)
  185. fmt.Printf("API 服务失败: %v\n", err)
  186. }
  187. }()
  188. }
  189. callback := createCallback(mqttClient, apiServer)
  190. if *portsFlag != "" {
  191. runFixedMode(ctx, *host, *portsFlag, callback, sigChan)
  192. cancel()
  193. return
  194. }
  195. if scanRange != nil {
  196. fmt.Printf("扫描端口范围 %d-%d...\n", scanRange[0], scanRange[1])
  197. } else {
  198. fmt.Println("查找 OpenCode 实例...")
  199. }
  200. runDynamicMode(ctx, *host, scanRange, *intervalFlag, callback, sigChan)
  201. cancel()
  202. }
  203. func runConfig(args []string) {
  204. if len(args) < 1 {
  205. printConfigUsage()
  206. return
  207. }
  208. dbPath := defaultDBPath
  209. logFile := "./logs"
  210. logLevel := "info"
  211. for i, arg := range args {
  212. if arg == "--db" && i+1 < len(args) {
  213. dbPath = args[i+1]
  214. }
  215. if arg == "--log-file" && i+1 < len(args) {
  216. logFile = args[i+1]
  217. }
  218. if arg == "--log-level" && i+1 < len(args) {
  219. logLevel = args[i+1]
  220. }
  221. }
  222. logger.SetLevel(logger.ParseLevel(logLevel))
  223. if err := logger.InitFileLog(logFile); err != nil {
  224. fmt.Printf("初始化日志文件失败: %v\n", err)
  225. return
  226. }
  227. defer logger.Close()
  228. // 过滤全局选项,避免传递给子命令的 FlagSet
  229. var filtered []string
  230. for i := 0; i < len(args); i++ {
  231. switch args[i] {
  232. case "--db", "--log-file", "--log-level":
  233. i++ // 跳过值
  234. default:
  235. filtered = append(filtered, args[i])
  236. }
  237. }
  238. args = filtered
  239. db, err := database.New(dbPath)
  240. if err != nil {
  241. logger.Error("打开数据库失败: %v", err)
  242. fmt.Printf("打开数据库失败: %v\n", err)
  243. return
  244. }
  245. defer db.Close()
  246. logger.Info("数据库已连接: %s", dbPath)
  247. switch args[0] {
  248. case "list":
  249. configs, err := db.ListMQTTConfigs()
  250. if err != nil {
  251. logger.Error("查询配置失败: %v", err)
  252. fmt.Printf("查询失败: %v\n", err)
  253. return
  254. }
  255. if len(configs) == 0 {
  256. fmt.Println("未配置 MQTT")
  257. return
  258. }
  259. logger.Info("查询到 %d 条 MQTT 配置", len(configs))
  260. for _, cfg := range configs {
  261. status := "禁用"
  262. if cfg.Enabled {
  263. status = "启用"
  264. }
  265. auth := ""
  266. if cfg.Username != "" {
  267. auth = fmt.Sprintf(" [认证: %s]", cfg.Username)
  268. }
  269. fmt.Printf("[%d] %s | %s | %s%s | %s\n", cfg.ID, cfg.Broker, cfg.ClientID, cfg.Topic, auth, status)
  270. }
  271. case "set":
  272. fs := flag.NewFlagSet("config set", flag.ExitOnError)
  273. broker := fs.String("broker", "", "MQTT Broker 地址 (如: tcp://127.0.0.1:1883)")
  274. clientID := fs.String("client-id", "opencode-monitor", "MQTT 客户端 ID")
  275. username := fs.String("username", "", "MQTT 用户名")
  276. password := fs.String("password", "", "MQTT 密码")
  277. topic := fs.String("topic", "opencode/status", "MQTT 主题")
  278. enabled := fs.Bool("enabled", true, "是否启用")
  279. fs.Parse(args[1:])
  280. if *broker == "" {
  281. fmt.Println("必须指定 --broker")
  282. return
  283. }
  284. cfg := &database.MQTTConfig{
  285. Broker: *broker,
  286. ClientID: *clientID,
  287. Username: *username,
  288. Password: *password,
  289. Topic: *topic,
  290. Enabled: *enabled,
  291. }
  292. if err := db.SaveMQTTConfig(cfg); err != nil {
  293. logger.Error("保存 MQTT 配置失败: %v", err)
  294. fmt.Printf("保存失败: %v\n", err)
  295. return
  296. }
  297. logger.Info("MQTT 配置已保存: %s (主题: %s)", cfg.Broker, cfg.Topic)
  298. fmt.Println("配置已保存")
  299. case "delete":
  300. if len(args) < 2 {
  301. fmt.Println("必须指定配置 ID")
  302. return
  303. }
  304. id, err := strconv.Atoi(args[1])
  305. if err != nil {
  306. logger.Warn("无效的配置 ID: %s", args[1])
  307. fmt.Println("无效的 ID")
  308. return
  309. }
  310. if err := db.DeleteMQTTConfig(id); err != nil {
  311. logger.Error("删除配置失败: id=%d, %v", id, err)
  312. fmt.Printf("删除失败: %v\n", err)
  313. return
  314. }
  315. logger.Info("MQTT 配置已删除: id=%d", id)
  316. fmt.Println("配置已删除")
  317. default:
  318. printConfigUsage()
  319. }
  320. }
  321. func printConfigUsage() {
  322. fmt.Println("用法: opencode-monitor config <子命令> [选项]")
  323. fmt.Println("")
  324. fmt.Println("子命令:")
  325. fmt.Println(" list 列出所有配置")
  326. fmt.Println(" set 设置 MQTT 配置")
  327. fmt.Println(" delete <id> 删除配置")
  328. fmt.Println("")
  329. fmt.Println("选项:")
  330. fmt.Println(" --broker MQTT Broker 地址")
  331. fmt.Println(" --client-id MQTT 客户端 ID")
  332. fmt.Println(" --username MQTT 用户名")
  333. fmt.Println(" --password MQTT 密码")
  334. fmt.Println(" --topic MQTT 主题")
  335. fmt.Println(" --enabled 是否启用 (true/false)")
  336. fmt.Println(" --db 数据库路径")
  337. fmt.Println(" --log-file 日志文件路径(默认 ./logs/monitor.log)")
  338. fmt.Println(" --log-level 日志级别 (debug/info/warn/error)")
  339. }
  340. func createCallback(mqttClient *mqttcli.Client, apiServer *api.Server) monitor.EventCallback {
  341. lastStatus := make(map[int]string)
  342. var mu sync.Mutex
  343. publish := func(port int, status string, code string) {
  344. if mqttClient != nil {
  345. payload := map[string]interface{}{
  346. "port": port,
  347. "status": status,
  348. "code": code,
  349. "timestamp": time.Now().Format(time.RFC3339),
  350. }
  351. if err := mqttClient.PublishRaw(mqttClient.GetTopic(), payload); err != nil {
  352. logger.Error("MQTT 发送失败: %v", err)
  353. fmt.Printf("MQTT 发送失败: %v\n", err)
  354. }
  355. }
  356. if apiServer != nil {
  357. apiServer.BroadcastStatus(port, status, code)
  358. }
  359. }
  360. return func(port int, evt *event.SSEEvent) {
  361. msg := event.FormatEvent(port, evt)
  362. if msg != "" {
  363. fmt.Println(msg)
  364. }
  365. if mqttClient == nil && apiServer == nil {
  366. return
  367. }
  368. var status, code string
  369. switch evt.Type {
  370. case "session.status":
  371. if s, ok := evt.Properties["status"].(map[string]interface{}); ok {
  372. if t, ok := s["type"].(string); ok {
  373. code = t
  374. }
  375. status = event.ParseStatus(s)
  376. }
  377. case "session.idle":
  378. status = "空闲"
  379. code = "idle"
  380. case "message.part.updated":
  381. if part, ok := evt.Properties["part"].(map[string]interface{}); ok {
  382. switch part["type"].(string) {
  383. case "tool":
  384. if st, ok := part["state"].(map[string]interface{}); ok {
  385. if s, ok := st["status"].(string); ok {
  386. code = s
  387. }
  388. status = event.ParseToolState(st)
  389. }
  390. case "reasoning":
  391. status = "思考中"
  392. code = "reasoning"
  393. default:
  394. status = "使用工具中"
  395. code = "using_tool"
  396. }
  397. }
  398. case "permission.updated":
  399. code = "permission"
  400. if title, ok := evt.Properties["title"].(string); ok && title != "" {
  401. status = "等待权限: " + title
  402. } else {
  403. status = "等待权限"
  404. }
  405. if t, ok := evt.Properties["type"].(string); ok {
  406. code = "permission:" + t
  407. }
  408. case "session.error":
  409. status = "错误"
  410. code = "error"
  411. }
  412. if status != "" {
  413. mu.Lock()
  414. prev := lastStatus[port]
  415. if status == "空闲" && prev != "" && prev != "空闲" {
  416. publish(port, "会话完成", "session_completed")
  417. }
  418. lastStatus[port] = status
  419. mu.Unlock()
  420. publish(port, status, code)
  421. }
  422. }
  423. }
  424. func runFixedMode(ctx context.Context, host string, portsFlag string, callback monitor.EventCallback, sigChan chan os.Signal) {
  425. var ports []int
  426. for _, p := range strings.Split(portsFlag, ",") {
  427. p = strings.TrimSpace(p)
  428. if port, err := strconv.Atoi(p); err == nil {
  429. ports = append(ports, port)
  430. }
  431. }
  432. if len(ports) == 0 {
  433. fmt.Println("未指定端口")
  434. return
  435. }
  436. logger.Info("固定模式启动,监控端口: %v", ports)
  437. fmt.Printf("监控端口: %v\n", ports)
  438. fmt.Println("Ctrl+C 停止")
  439. fmt.Println(strings.Repeat("-", 40))
  440. var wg sync.WaitGroup
  441. for _, port := range ports {
  442. wg.Add(1)
  443. go func(p int) {
  444. defer wg.Done()
  445. logger.Info("开始监控端口: %d", p)
  446. m := monitor.New(host, p, callback)
  447. m.Run(ctx)
  448. logger.Info("端口 %d 监控已停止", p)
  449. }(port)
  450. }
  451. <-sigChan
  452. logger.Info("收到停止信号,正在退出")
  453. fmt.Println("\n已停止")
  454. wg.Wait()
  455. logger.Info("所有监控协程已退出")
  456. }
  457. func runDynamicMode(ctx context.Context, host string, scanRange *[2]int, interval int, callback monitor.EventCallback, sigChan chan os.Signal) {
  458. scanner := discovery.NewScanner(host, scanRange)
  459. monitoredPorts := make(map[int]bool)
  460. runningMonitors := make(map[int]context.CancelFunc)
  461. var mu sync.Mutex
  462. startMonitor := func(port int) {
  463. mu.Lock()
  464. if _, exists := runningMonitors[port]; exists {
  465. mu.Unlock()
  466. return
  467. }
  468. monitorCtx, monitorCancel := context.WithCancel(ctx)
  469. runningMonitors[port] = monitorCancel
  470. mu.Unlock()
  471. go func() {
  472. logger.Info("开始监控端口: %d", port)
  473. fmt.Printf("开始监控端口: %d\n", port)
  474. m := monitor.New(host, port, callback)
  475. m.Run(monitorCtx)
  476. // 连接断开后清理记录,允许重新连接
  477. mu.Lock()
  478. delete(runningMonitors, port)
  479. delete(monitoredPorts, port)
  480. mu.Unlock()
  481. logger.Info("端口 %d 监控已停止,等待重新连接", port)
  482. fmt.Printf("端口 %d 监控已停止,等待重新连接\n", port)
  483. }()
  484. }
  485. initial := scanner.Discover()
  486. if len(initial) == 0 {
  487. logger.Info("未找到运行中的 OpenCode 实例,等待自动检测")
  488. fmt.Println("未找到运行中的 OpenCode 实例")
  489. fmt.Println("请先执行: opencode serve --port 4096")
  490. fmt.Println("启动后会自动检测,等待中...")
  491. }
  492. for _, port := range initial {
  493. monitoredPorts[port] = true
  494. startMonitor(port)
  495. }
  496. if len(monitoredPorts) > 0 {
  497. ports := make([]int, 0, len(monitoredPorts))
  498. for p := range monitoredPorts {
  499. ports = append(ports, p)
  500. }
  501. sort.Ints(ports)
  502. logger.Info("找到 %d 个实例: %v", len(monitoredPorts), ports)
  503. fmt.Printf("找到 %d 个实例: %v\n", len(monitoredPorts), ports)
  504. }
  505. logger.Info("动态模式启动,每 %d 秒扫描新实例", interval)
  506. fmt.Printf("每 %d 秒扫描新实例,Ctrl+C 停止\n", interval)
  507. fmt.Println(strings.Repeat("-", 40))
  508. scanTicker := time.NewTicker(time.Duration(interval) * time.Second)
  509. defer scanTicker.Stop()
  510. go func() {
  511. for {
  512. select {
  513. case <-ctx.Done():
  514. return
  515. case <-scanTicker.C:
  516. newPorts := scanner.Discover()
  517. if len(newPorts) > 0 {
  518. logger.Debug("扫描到 %d 个端口: %v", len(newPorts), newPorts)
  519. }
  520. for _, port := range newPorts {
  521. mu.Lock()
  522. alreadyMonitored := monitoredPorts[port]
  523. if !alreadyMonitored {
  524. monitoredPorts[port] = true
  525. }
  526. mu.Unlock()
  527. if !alreadyMonitored {
  528. logger.Info("发现新实例端口: %d,开始监控", port)
  529. startMonitor(port)
  530. }
  531. }
  532. }
  533. }
  534. }()
  535. <-sigChan
  536. logger.Info("收到停止信号,正在退出")
  537. fmt.Println("\n已停止")
  538. }