main.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "os"
  7. "os/signal"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "syscall"
  13. "time"
  14. "AI-Status-Light/internal/api"
  15. "AI-Status-Light/internal/database"
  16. "AI-Status-Light/internal/discovery"
  17. "AI-Status-Light/internal/event"
  18. "AI-Status-Light/internal/monitor"
  19. mqttcli "AI-Status-Light/internal/mqtt"
  20. )
// defaultDBPath is the database path used when no -db/--db option is supplied.
const defaultDBPath = "./data/config.db"
// Version is the version string shown by the "version" command and the usage
// banner. It defaults to "dev".
// NOTE(review): presumably overridden at build time (e.g. via -ldflags) — confirm.
var Version = "dev"
  23. func main() {
  24. if len(os.Args) < 2 {
  25. printUsage()
  26. return
  27. }
  28. switch os.Args[1] {
  29. case "monitor":
  30. runMonitor(os.Args[2:])
  31. case "config":
  32. runConfig(os.Args[2:])
  33. case "serve":
  34. runServe(os.Args[2:])
  35. case "version":
  36. fmt.Printf("opencode-monitor %s\n", Version)
  37. default:
  38. printUsage()
  39. }
  40. }
  41. func printUsage() {
  42. fmt.Printf("opencode-monitor %s\n\n", Version)
  43. fmt.Println("用法: opencode-monitor <命令> [选项]")
  44. fmt.Println("")
  45. fmt.Println("命令:")
  46. fmt.Println(" monitor 启动监控")
  47. fmt.Println(" config 管理 MQTT 配置")
  48. fmt.Println(" serve 启动 API 服务")
  49. fmt.Println(" version 显示版本信息")
  50. fmt.Println("")
  51. fmt.Println("运行 'opencode-monitor <命令> -h' 查看命令帮助")
  52. }
  53. func runServe(args []string) {
  54. fs := flag.NewFlagSet("serve", flag.ExitOnError)
  55. addr := fs.String("addr", ":8080", "监听地址")
  56. dbPath := fs.String("db", defaultDBPath, "数据库路径")
  57. fs.Parse(args)
  58. db, err := database.New(*dbPath)
  59. if err != nil {
  60. fmt.Printf("打开数据库失败: %v\n", err)
  61. return
  62. }
  63. defer db.Close()
  64. server := api.New(db, *addr)
  65. fmt.Printf("API 服务已启动: %s\n", *addr)
  66. fmt.Println("接口文档:")
  67. fmt.Println(" GET /api/health - 健康检查")
  68. fmt.Println(" GET /api/mqtt - 获取所有配置")
  69. fmt.Println(" POST /api/mqtt - 创建配置")
  70. fmt.Println(" GET /api/mqtt/:id - 获取单个配置")
  71. fmt.Println(" PUT /api/mqtt/:id - 更新配置")
  72. fmt.Println(" DELETE /api/mqtt/:id - 删除配置")
  73. if err := server.Start(); err != nil {
  74. fmt.Printf("服务启动失败: %v\n", err)
  75. }
  76. }
  77. func runMonitor(args []string) {
  78. fs := flag.NewFlagSet("monitor", flag.ExitOnError)
  79. host := fs.String("host", "127.0.0.1", "主机地址")
  80. portsFlag := fs.String("ports", "", "端口列表,逗号分隔 (如: 4096,4097,4098)")
  81. scanFlag := fs.String("scan", "", "扫描端口范围 (如: 4096-4100)")
  82. pidFlag := fs.Bool("pid", false, "通过进程号查找端口")
  83. intervalFlag := fs.Int("interval", 5, "动态扫描间隔(秒), 默认5")
  84. dbPath := fs.String("db", defaultDBPath, "数据库路径")
  85. apiAddr := fs.String("api-addr", "", "API 服务地址 (如: :8080)")
  86. fs.Parse(args)
  87. var scanRange *[2]int
  88. if *scanFlag != "" {
  89. parts := strings.Split(*scanFlag, "-")
  90. if len(parts) == 2 {
  91. start, err1 := strconv.Atoi(parts[0])
  92. end, err2 := strconv.Atoi(parts[1])
  93. if err1 == nil && err2 == nil {
  94. scanRange = &[2]int{start, end}
  95. }
  96. }
  97. }
  98. ctx, cancel := context.WithCancel(context.Background())
  99. defer cancel()
  100. sigChan := make(chan os.Signal, 1)
  101. signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
  102. db, err := database.New(*dbPath)
  103. if err != nil {
  104. fmt.Printf("打开数据库失败: %v\n", err)
  105. return
  106. }
  107. defer db.Close()
  108. if *apiAddr != "" {
  109. apiServer := api.New(db, *apiAddr)
  110. go func() {
  111. fmt.Printf("API 服务已启动: %s\n", *apiAddr)
  112. if err := apiServer.Start(); err != nil {
  113. fmt.Printf("API 服务失败: %v\n", err)
  114. }
  115. }()
  116. }
  117. var mqttClient *mqttcli.Client
  118. cfg, err := db.GetMQTTConfig()
  119. if err != nil {
  120. fmt.Printf("读取 MQTT 配置失败: %v\n", err)
  121. } else if cfg != nil {
  122. mqttClient = mqttcli.NewFromConfig(cfg)
  123. if err := mqttClient.Connect(); err != nil {
  124. fmt.Printf("MQTT 连接失败: %v\n", err)
  125. mqttClient = nil
  126. } else {
  127. defer mqttClient.Disconnect()
  128. fmt.Printf("MQTT 已连接: %s (主题: %s)\n", cfg.Broker, cfg.Topic)
  129. }
  130. }
  131. callback := createCallback(mqttClient)
  132. if *portsFlag != "" {
  133. runFixedMode(ctx, *host, *portsFlag, callback, sigChan)
  134. cancel()
  135. return
  136. }
  137. if scanRange != nil {
  138. fmt.Printf("扫描端口范围 %d-%d...\n", scanRange[0], scanRange[1])
  139. } else if *pidFlag {
  140. fmt.Println("通过进程号查找 OpenCode 端口...")
  141. } else {
  142. fmt.Println("查找 OpenCode 实例...")
  143. }
  144. runDynamicMode(ctx, *host, scanRange, *intervalFlag, callback, sigChan)
  145. cancel()
  146. }
  147. func runConfig(args []string) {
  148. if len(args) < 1 {
  149. printConfigUsage()
  150. return
  151. }
  152. dbPath := defaultDBPath
  153. for i, arg := range args {
  154. if arg == "--db" && i+1 < len(args) {
  155. dbPath = args[i+1]
  156. break
  157. }
  158. }
  159. db, err := database.New(dbPath)
  160. if err != nil {
  161. fmt.Printf("打开数据库失败: %v\n", err)
  162. return
  163. }
  164. defer db.Close()
  165. switch args[0] {
  166. case "list":
  167. configs, err := db.ListMQTTConfigs()
  168. if err != nil {
  169. fmt.Printf("查询失败: %v\n", err)
  170. return
  171. }
  172. if len(configs) == 0 {
  173. fmt.Println("未配置 MQTT")
  174. return
  175. }
  176. for _, cfg := range configs {
  177. status := "禁用"
  178. if cfg.Enabled {
  179. status = "启用"
  180. }
  181. fmt.Printf("[%d] %s | %s | %s | %s\n", cfg.ID, cfg.Broker, cfg.ClientID, cfg.Topic, status)
  182. }
  183. case "set":
  184. fs := flag.NewFlagSet("config set", flag.ExitOnError)
  185. broker := fs.String("broker", "", "MQTT Broker 地址 (如: tcp://127.0.0.1:1883)")
  186. clientID := fs.String("client-id", "opencode-monitor", "MQTT 客户端 ID")
  187. topic := fs.String("topic", "opencode/status", "MQTT 主题")
  188. enabled := fs.Bool("enabled", true, "是否启用")
  189. fs.Parse(args[1:])
  190. if *broker == "" {
  191. fmt.Println("必须指定 --broker")
  192. return
  193. }
  194. cfg := &database.MQTTConfig{
  195. Broker: *broker,
  196. ClientID: *clientID,
  197. Topic: *topic,
  198. Enabled: *enabled,
  199. }
  200. if err := db.SaveMQTTConfig(cfg); err != nil {
  201. fmt.Printf("保存失败: %v\n", err)
  202. return
  203. }
  204. fmt.Println("配置已保存")
  205. case "delete":
  206. if len(args) < 2 {
  207. fmt.Println("必须指定配置 ID")
  208. return
  209. }
  210. id, err := strconv.Atoi(args[1])
  211. if err != nil {
  212. fmt.Println("无效的 ID")
  213. return
  214. }
  215. if err := db.DeleteMQTTConfig(id); err != nil {
  216. fmt.Printf("删除失败: %v\n", err)
  217. return
  218. }
  219. fmt.Println("配置已删除")
  220. default:
  221. printConfigUsage()
  222. }
  223. }
  224. func printConfigUsage() {
  225. fmt.Println("用法: opencode-monitor config <子命令> [选项]")
  226. fmt.Println("")
  227. fmt.Println("子命令:")
  228. fmt.Println(" list 列出所有配置")
  229. fmt.Println(" set 设置 MQTT 配置")
  230. fmt.Println(" delete <id> 删除配置")
  231. fmt.Println("")
  232. fmt.Println("选项:")
  233. fmt.Println(" --broker MQTT Broker 地址")
  234. fmt.Println(" --client-id MQTT 客户端 ID")
  235. fmt.Println(" --topic MQTT 主题")
  236. fmt.Println(" --enabled 是否启用 (true/false)")
  237. fmt.Println(" --db 数据库路径")
  238. }
  239. func createCallback(mqttClient *mqttcli.Client) monitor.EventCallback {
  240. return func(port int, evt *event.SSEEvent) {
  241. msg := event.FormatEvent(port, evt)
  242. if msg != "" {
  243. fmt.Println(msg)
  244. }
  245. if mqttClient == nil {
  246. return
  247. }
  248. var evtType, status, tool, state, title string
  249. evtType = evt.Type
  250. switch evt.Type {
  251. case "session.status":
  252. if s, ok := evt.Properties["status"].(map[string]interface{}); ok {
  253. status = event.ParseStatus(s)
  254. }
  255. case "session.idle":
  256. status = "空闲"
  257. case "message.part.updated":
  258. if part, ok := evt.Properties["part"].(map[string]interface{}); ok {
  259. pt, _ := part["type"].(string)
  260. switch pt {
  261. case "tool":
  262. tool, _ = part["tool"].(string)
  263. if st, ok := part["state"].(map[string]interface{}); ok {
  264. state = event.ParseToolState(st)
  265. }
  266. case "reasoning":
  267. state = "思考中"
  268. }
  269. }
  270. case "permission.updated":
  271. title, _ = evt.Properties["title"].(string)
  272. case "session.error":
  273. status = "错误"
  274. }
  275. if err := mqttClient.Publish(port, evtType, status, tool, state, title); err != nil {
  276. fmt.Printf("MQTT 发送失败: %v\n", err)
  277. }
  278. }
  279. }
  280. func runFixedMode(ctx context.Context, host string, portsFlag string, callback monitor.EventCallback, sigChan chan os.Signal) {
  281. var ports []int
  282. for _, p := range strings.Split(portsFlag, ",") {
  283. p = strings.TrimSpace(p)
  284. if port, err := strconv.Atoi(p); err == nil {
  285. ports = append(ports, port)
  286. }
  287. }
  288. if len(ports) == 0 {
  289. fmt.Println("未指定端口")
  290. return
  291. }
  292. fmt.Printf("监控端口: %v\n", ports)
  293. fmt.Println("Ctrl+C 停止")
  294. fmt.Println(strings.Repeat("-", 40))
  295. var wg sync.WaitGroup
  296. for _, port := range ports {
  297. wg.Add(1)
  298. go func(p int) {
  299. defer wg.Done()
  300. m := monitor.New(host, p, callback)
  301. m.Run(ctx)
  302. }(port)
  303. }
  304. <-sigChan
  305. fmt.Println("\n已停止")
  306. wg.Wait()
  307. }
  308. func runDynamicMode(ctx context.Context, host string, scanRange *[2]int, interval int, callback monitor.EventCallback, sigChan chan os.Signal) {
  309. scanner := discovery.NewScanner(host, scanRange)
  310. monitoredPorts := make(map[int]bool)
  311. runningMonitors := make(map[int]context.CancelFunc)
  312. var mu sync.Mutex
  313. startMonitor := func(port int) {
  314. mu.Lock()
  315. if _, exists := runningMonitors[port]; exists {
  316. mu.Unlock()
  317. return
  318. }
  319. monitorCtx, monitorCancel := context.WithCancel(ctx)
  320. runningMonitors[port] = monitorCancel
  321. mu.Unlock()
  322. go func() {
  323. m := monitor.New(host, port, callback)
  324. m.Run(monitorCtx)
  325. }()
  326. }
  327. initial := scanner.Discover()
  328. if len(initial) == 0 {
  329. fmt.Println("未找到运行中的 OpenCode 实例")
  330. fmt.Println("请先执行: opencode serve --port 4096")
  331. fmt.Println("启动后会自动检测,等待中...")
  332. }
  333. for _, port := range initial {
  334. monitoredPorts[port] = true
  335. startMonitor(port)
  336. }
  337. if len(monitoredPorts) > 0 {
  338. ports := make([]int, 0, len(monitoredPorts))
  339. for p := range monitoredPorts {
  340. ports = append(ports, p)
  341. }
  342. sort.Ints(ports)
  343. fmt.Printf("找到 %d 个实例: %v\n", len(monitoredPorts), ports)
  344. }
  345. fmt.Printf("每 %d 秒扫描新实例,Ctrl+C 停止\n", interval)
  346. fmt.Println(strings.Repeat("-", 40))
  347. scanTicker := time.NewTicker(time.Duration(interval) * time.Second)
  348. defer scanTicker.Stop()
  349. go func() {
  350. for {
  351. select {
  352. case <-ctx.Done():
  353. return
  354. case <-scanTicker.C:
  355. newPorts := scanner.Discover()
  356. for _, port := range newPorts {
  357. mu.Lock()
  358. if !monitoredPorts[port] {
  359. monitoredPorts[port] = true
  360. startMonitor(port)
  361. }
  362. mu.Unlock()
  363. }
  364. }
  365. }
  366. }()
  367. <-sigChan
  368. fmt.Println("\n已停止")
  369. }