main.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "os"
  7. "os/signal"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "syscall"
  13. "time"
  14. "AI-Status-Light/internal/api"
  15. "AI-Status-Light/internal/database"
  16. "AI-Status-Light/internal/discovery"
  17. "AI-Status-Light/internal/event"
  18. "AI-Status-Light/internal/monitor"
  19. mqttcli "AI-Status-Light/internal/mqtt"
  20. )
// defaultDBPath is the database file used when no database path is given on
// the command line.
const defaultDBPath = "./data/config.db"

// Version is the version string printed by the "version" command and in the
// usage banner. It defaults to "dev".
// NOTE(review): presumably overridden at build time (e.g. with
// -ldflags "-X main.Version=..."), which is why it is a var — confirm against
// the build scripts.
var Version = "dev"
  23. func main() {
  24. if len(os.Args) < 2 {
  25. printUsage()
  26. return
  27. }
  28. switch os.Args[1] {
  29. case "monitor":
  30. runMonitor(os.Args[2:])
  31. case "config":
  32. runConfig(os.Args[2:])
  33. case "serve":
  34. runServe(os.Args[2:])
  35. case "version":
  36. fmt.Printf("opencode-monitor %s\n", Version)
  37. default:
  38. printUsage()
  39. }
  40. }
  41. func printUsage() {
  42. fmt.Printf("opencode-monitor %s\n\n", Version)
  43. fmt.Println("用法: opencode-monitor <命令> [选项]")
  44. fmt.Println("")
  45. fmt.Println("命令:")
  46. fmt.Println(" monitor 启动监控")
  47. fmt.Println(" config 管理 MQTT 配置")
  48. fmt.Println(" serve 启动 API 服务")
  49. fmt.Println(" version 显示版本信息")
  50. fmt.Println("")
  51. fmt.Println("运行 'opencode-monitor <命令> -h' 查看命令帮助")
  52. }
  53. func runServe(args []string) {
  54. fs := flag.NewFlagSet("serve", flag.ExitOnError)
  55. addr := fs.String("addr", ":8080", "监听地址")
  56. dbPath := fs.String("db", defaultDBPath, "数据库路径")
  57. fs.Parse(args)
  58. db, err := database.New(*dbPath)
  59. if err != nil {
  60. fmt.Printf("打开数据库失败: %v\n", err)
  61. return
  62. }
  63. defer db.Close()
  64. server := api.New(db, *addr)
  65. fmt.Printf("API 服务已启动: %s\n", *addr)
  66. fmt.Println("接口文档:")
  67. fmt.Println(" GET /api/health - 健康检查")
  68. fmt.Println(" GET /api/mqtt - 获取所有配置")
  69. fmt.Println(" POST /api/mqtt - 创建配置")
  70. fmt.Println(" GET /api/mqtt/:id - 获取单个配置")
  71. fmt.Println(" PUT /api/mqtt/:id - 更新配置")
  72. fmt.Println(" DELETE /api/mqtt/:id - 删除配置")
  73. if err := server.Start(); err != nil {
  74. fmt.Printf("服务启动失败: %v\n", err)
  75. }
  76. }
  77. func runMonitor(args []string) {
  78. fs := flag.NewFlagSet("monitor", flag.ExitOnError)
  79. host := fs.String("host", "127.0.0.1", "主机地址")
  80. portsFlag := fs.String("ports", "", "端口列表,逗号分隔 (如: 4096,4097,4098)")
  81. scanFlag := fs.String("scan", "", "扫描端口范围 (如: 4096-4100)")
  82. pidFlag := fs.Bool("pid", false, "通过进程号查找端口")
  83. intervalFlag := fs.Int("interval", 5, "动态扫描间隔(秒), 默认5")
  84. dbPath := fs.String("db", defaultDBPath, "数据库路径")
  85. apiAddr := fs.String("api-addr", "", "API 服务地址 (如: :8080)")
  86. fs.Parse(args)
  87. var scanRange *[2]int
  88. if *scanFlag != "" {
  89. parts := strings.Split(*scanFlag, "-")
  90. if len(parts) == 2 {
  91. start, err1 := strconv.Atoi(parts[0])
  92. end, err2 := strconv.Atoi(parts[1])
  93. if err1 == nil && err2 == nil {
  94. scanRange = &[2]int{start, end}
  95. }
  96. }
  97. }
  98. ctx, cancel := context.WithCancel(context.Background())
  99. defer cancel()
  100. sigChan := make(chan os.Signal, 1)
  101. signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
  102. db, err := database.New(*dbPath)
  103. if err != nil {
  104. fmt.Printf("打开数据库失败: %v\n", err)
  105. return
  106. }
  107. defer db.Close()
  108. if *apiAddr != "" {
  109. apiServer := api.New(db, *apiAddr)
  110. go func() {
  111. fmt.Printf("API 服务已启动: %s\n", *apiAddr)
  112. if err := apiServer.Start(); err != nil {
  113. fmt.Printf("API 服务失败: %v\n", err)
  114. }
  115. }()
  116. }
  117. var mqttClient *mqttcli.Client
  118. cfg, err := db.GetMQTTConfig()
  119. if err != nil {
  120. fmt.Printf("读取 MQTT 配置失败: %v\n", err)
  121. } else if cfg != nil {
  122. mqttClient = mqttcli.NewFromConfig(cfg)
  123. if err := mqttClient.Connect(); err != nil {
  124. fmt.Printf("MQTT 连接失败: %v\n", err)
  125. mqttClient = nil
  126. } else {
  127. defer mqttClient.Disconnect()
  128. fmt.Printf("MQTT 已连接: %s (主题: %s)\n", cfg.Broker, cfg.Topic)
  129. }
  130. }
  131. callback := createCallback(mqttClient)
  132. if *portsFlag != "" {
  133. runFixedMode(ctx, *host, *portsFlag, callback, sigChan)
  134. cancel()
  135. return
  136. }
  137. if scanRange != nil {
  138. fmt.Printf("扫描端口范围 %d-%d...\n", scanRange[0], scanRange[1])
  139. } else if *pidFlag {
  140. fmt.Println("通过进程号查找 OpenCode 端口...")
  141. } else {
  142. fmt.Println("查找 OpenCode 实例...")
  143. }
  144. runDynamicMode(ctx, *host, scanRange, *intervalFlag, callback, sigChan)
  145. cancel()
  146. }
  147. func runConfig(args []string) {
  148. if len(args) < 1 {
  149. printConfigUsage()
  150. return
  151. }
  152. dbPath := defaultDBPath
  153. for i, arg := range args {
  154. if arg == "--db" && i+1 < len(args) {
  155. dbPath = args[i+1]
  156. break
  157. }
  158. }
  159. db, err := database.New(dbPath)
  160. if err != nil {
  161. fmt.Printf("打开数据库失败: %v\n", err)
  162. return
  163. }
  164. defer db.Close()
  165. switch args[0] {
  166. case "list":
  167. configs, err := db.ListMQTTConfigs()
  168. if err != nil {
  169. fmt.Printf("查询失败: %v\n", err)
  170. return
  171. }
  172. if len(configs) == 0 {
  173. fmt.Println("未配置 MQTT")
  174. return
  175. }
  176. for _, cfg := range configs {
  177. status := "禁用"
  178. if cfg.Enabled {
  179. status = "启用"
  180. }
  181. auth := ""
  182. if cfg.Username != "" {
  183. auth = fmt.Sprintf(" [认证: %s]", cfg.Username)
  184. }
  185. fmt.Printf("[%d] %s | %s | %s%s | %s\n", cfg.ID, cfg.Broker, cfg.ClientID, cfg.Topic, auth, status)
  186. }
  187. case "set":
  188. fs := flag.NewFlagSet("config set", flag.ExitOnError)
  189. broker := fs.String("broker", "", "MQTT Broker 地址 (如: tcp://127.0.0.1:1883)")
  190. clientID := fs.String("client-id", "opencode-monitor", "MQTT 客户端 ID")
  191. username := fs.String("username", "", "MQTT 用户名")
  192. password := fs.String("password", "", "MQTT 密码")
  193. topic := fs.String("topic", "opencode/status", "MQTT 主题")
  194. enabled := fs.Bool("enabled", true, "是否启用")
  195. fs.Parse(args[1:])
  196. if *broker == "" {
  197. fmt.Println("必须指定 --broker")
  198. return
  199. }
  200. cfg := &database.MQTTConfig{
  201. Broker: *broker,
  202. ClientID: *clientID,
  203. Username: *username,
  204. Password: *password,
  205. Topic: *topic,
  206. Enabled: *enabled,
  207. }
  208. if err := db.SaveMQTTConfig(cfg); err != nil {
  209. fmt.Printf("保存失败: %v\n", err)
  210. return
  211. }
  212. fmt.Println("配置已保存")
  213. case "delete":
  214. if len(args) < 2 {
  215. fmt.Println("必须指定配置 ID")
  216. return
  217. }
  218. id, err := strconv.Atoi(args[1])
  219. if err != nil {
  220. fmt.Println("无效的 ID")
  221. return
  222. }
  223. if err := db.DeleteMQTTConfig(id); err != nil {
  224. fmt.Printf("删除失败: %v\n", err)
  225. return
  226. }
  227. fmt.Println("配置已删除")
  228. default:
  229. printConfigUsage()
  230. }
  231. }
  232. func printConfigUsage() {
  233. fmt.Println("用法: opencode-monitor config <子命令> [选项]")
  234. fmt.Println("")
  235. fmt.Println("子命令:")
  236. fmt.Println(" list 列出所有配置")
  237. fmt.Println(" set 设置 MQTT 配置")
  238. fmt.Println(" delete <id> 删除配置")
  239. fmt.Println("")
  240. fmt.Println("选项:")
  241. fmt.Println(" --broker MQTT Broker 地址")
  242. fmt.Println(" --client-id MQTT 客户端 ID")
  243. fmt.Println(" --username MQTT 用户名")
  244. fmt.Println(" --password MQTT 密码")
  245. fmt.Println(" --topic MQTT 主题")
  246. fmt.Println(" --enabled 是否启用 (true/false)")
  247. fmt.Println(" --db 数据库路径")
  248. }
  249. func createCallback(mqttClient *mqttcli.Client) monitor.EventCallback {
  250. return func(port int, evt *event.SSEEvent) {
  251. msg := event.FormatEvent(port, evt)
  252. if msg != "" {
  253. fmt.Println(msg)
  254. }
  255. if mqttClient == nil {
  256. return
  257. }
  258. var status string
  259. switch evt.Type {
  260. case "session.status":
  261. if s, ok := evt.Properties["status"].(map[string]interface{}); ok {
  262. status = event.ParseStatus(s)
  263. }
  264. case "session.idle":
  265. status = "空闲"
  266. case "message.part.updated":
  267. if part, ok := evt.Properties["part"].(map[string]interface{}); ok {
  268. switch part["type"].(string) {
  269. case "tool":
  270. if st, ok := part["state"].(map[string]interface{}); ok {
  271. status = event.ParseToolState(st)
  272. }
  273. case "reasoning":
  274. status = "思考中"
  275. }
  276. }
  277. case "session.error":
  278. status = "错误"
  279. }
  280. if status != "" && msg != "" {
  281. if err := mqttClient.PublishRaw(mqttClient.GetTopic(), msg); err != nil {
  282. fmt.Printf("MQTT 发送失败: %v\n", err)
  283. }
  284. }
  285. }
  286. }
  287. func runFixedMode(ctx context.Context, host string, portsFlag string, callback monitor.EventCallback, sigChan chan os.Signal) {
  288. var ports []int
  289. for _, p := range strings.Split(portsFlag, ",") {
  290. p = strings.TrimSpace(p)
  291. if port, err := strconv.Atoi(p); err == nil {
  292. ports = append(ports, port)
  293. }
  294. }
  295. if len(ports) == 0 {
  296. fmt.Println("未指定端口")
  297. return
  298. }
  299. fmt.Printf("监控端口: %v\n", ports)
  300. fmt.Println("Ctrl+C 停止")
  301. fmt.Println(strings.Repeat("-", 40))
  302. var wg sync.WaitGroup
  303. for _, port := range ports {
  304. wg.Add(1)
  305. go func(p int) {
  306. defer wg.Done()
  307. m := monitor.New(host, p, callback)
  308. m.Run(ctx)
  309. }(port)
  310. }
  311. <-sigChan
  312. fmt.Println("\n已停止")
  313. wg.Wait()
  314. }
  315. func runDynamicMode(ctx context.Context, host string, scanRange *[2]int, interval int, callback monitor.EventCallback, sigChan chan os.Signal) {
  316. scanner := discovery.NewScanner(host, scanRange)
  317. monitoredPorts := make(map[int]bool)
  318. runningMonitors := make(map[int]context.CancelFunc)
  319. var mu sync.Mutex
  320. startMonitor := func(port int) {
  321. mu.Lock()
  322. if _, exists := runningMonitors[port]; exists {
  323. mu.Unlock()
  324. return
  325. }
  326. monitorCtx, monitorCancel := context.WithCancel(ctx)
  327. runningMonitors[port] = monitorCancel
  328. mu.Unlock()
  329. go func() {
  330. fmt.Printf("开始监控端口: %d\n", port)
  331. m := monitor.New(host, port, callback)
  332. m.Run(monitorCtx)
  333. // 连接断开后清理记录,允许重新连接
  334. mu.Lock()
  335. delete(runningMonitors, port)
  336. delete(monitoredPorts, port)
  337. mu.Unlock()
  338. fmt.Printf("端口 %d 监控已停止,等待重新连接\n", port)
  339. }()
  340. }
  341. initial := scanner.Discover()
  342. if len(initial) == 0 {
  343. fmt.Println("未找到运行中的 OpenCode 实例")
  344. fmt.Println("请先执行: opencode serve --port 4096")
  345. fmt.Println("启动后会自动检测,等待中...")
  346. }
  347. for _, port := range initial {
  348. monitoredPorts[port] = true
  349. startMonitor(port)
  350. }
  351. if len(monitoredPorts) > 0 {
  352. ports := make([]int, 0, len(monitoredPorts))
  353. for p := range monitoredPorts {
  354. ports = append(ports, p)
  355. }
  356. sort.Ints(ports)
  357. fmt.Printf("找到 %d 个实例: %v\n", len(monitoredPorts), ports)
  358. }
  359. fmt.Printf("每 %d 秒扫描新实例,Ctrl+C 停止\n", interval)
  360. fmt.Println(strings.Repeat("-", 40))
  361. scanTicker := time.NewTicker(time.Duration(interval) * time.Second)
  362. defer scanTicker.Stop()
  363. go func() {
  364. for {
  365. select {
  366. case <-ctx.Done():
  367. return
  368. case <-scanTicker.C:
  369. newPorts := scanner.Discover()
  370. for _, port := range newPorts {
  371. mu.Lock()
  372. if !monitoredPorts[port] {
  373. monitoredPorts[port] = true
  374. startMonitor(port)
  375. }
  376. mu.Unlock()
  377. }
  378. }
  379. }
  380. }()
  381. <-sigChan
  382. fmt.Println("\n已停止")
  383. }