monitor.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package monitor
  2. import (
  3. "bufio"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "net/http"
  8. "strings"
  9. "time"
  10. "AI-Status-Light/internal/event"
  11. "AI-Status-Light/internal/logger"
  12. )
  13. type EventCallback func(port int, evt *event.SSEEvent)
  14. type Monitor struct {
  15. baseURL string
  16. port int
  17. client *http.Client
  18. callback EventCallback
  19. lastActivity time.Time
  20. timeout time.Duration
  21. }
  22. func New(host string, port int, callback EventCallback) *Monitor {
  23. return &Monitor{
  24. baseURL: fmt.Sprintf("http://%s:%d", host, port),
  25. port: port,
  26. client: &http.Client{Timeout: 2 * time.Second},
  27. callback: callback,
  28. lastActivity: time.Now(),
  29. timeout: 5 * time.Minute,
  30. }
  31. }
  32. func (m *Monitor) CheckHealth() bool {
  33. resp, err := m.client.Get(m.baseURL + "/global/health")
  34. if err != nil {
  35. logger.Debug("端口 %d 健康检查失败: %v", m.port, err)
  36. return false
  37. }
  38. defer resp.Body.Close()
  39. healthy := resp.StatusCode == 200
  40. if healthy {
  41. logger.Debug("端口 %d 健康检查通过", m.port)
  42. } else {
  43. logger.Debug("端口 %d 健康检查返回状态码: %d", m.port, resp.StatusCode)
  44. }
  45. return healthy
  46. }
  47. func (m *Monitor) Run(ctx context.Context) {
  48. req, err := http.NewRequestWithContext(ctx, "GET", m.baseURL+"/event", nil)
  49. if err != nil {
  50. logger.Error("端口 %d 创建请求失败: %v", m.port, err)
  51. return
  52. }
  53. req.Header.Set("Accept", "text/event-stream")
  54. resp, err := http.DefaultClient.Do(req)
  55. if err != nil {
  56. logger.Debug("端口 %d 连接事件流失败: %v", m.port, err)
  57. return
  58. }
  59. defer resp.Body.Close()
  60. logger.Info("端口 %d 已连接到事件流: %s/event", m.port, m.baseURL)
  61. m.lastActivity = time.Now()
  62. // 启动超时检测 goroutine
  63. timeoutChan := make(chan struct{})
  64. go func() {
  65. ticker := time.NewTicker(30 * time.Second)
  66. defer ticker.Stop()
  67. for {
  68. select {
  69. case <-ctx.Done():
  70. return
  71. case <-timeoutChan:
  72. return
  73. case <-ticker.C:
  74. if time.Since(m.lastActivity) > m.timeout {
  75. logger.Warn("端口 %d 超时 %v,回收监控", m.port, m.timeout)
  76. fmt.Printf("端口 %d 超时 %v,回收监控\n", m.port, m.timeout)
  77. return
  78. }
  79. }
  80. }
  81. }()
  82. scanner := bufio.NewScanner(resp.Body)
  83. for scanner.Scan() {
  84. select {
  85. case <-ctx.Done():
  86. logger.Debug("端口 %d 上下文已取消,退出事件循环", m.port)
  87. close(timeoutChan)
  88. return
  89. default:
  90. }
  91. line := scanner.Text()
  92. if !strings.HasPrefix(line, "data:") {
  93. continue
  94. }
  95. data := strings.TrimSpace(line[5:])
  96. if data == "" {
  97. continue
  98. }
  99. var evt event.SSEEvent
  100. if err := json.Unmarshal([]byte(data), &evt); err != nil {
  101. logger.Debug("端口 %d 解析事件失败: %v", m.port, err)
  102. continue
  103. }
  104. m.lastActivity = time.Now()
  105. if m.callback != nil {
  106. m.callback(m.port, &evt)
  107. }
  108. }
  109. close(timeoutChan)
  110. logger.Debug("端口 %d 事件流读取结束", m.port)
  111. }