monitor.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package monitor
  2. import (
  3. "bufio"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net/http"
  10. "strings"
  11. "time"
  12. "ai-status-light/internal/event"
  13. "ai-status-light/internal/logger"
  14. )
  15. type EventCallback func(port int, evt *event.SSEEvent)
  16. type Monitor struct {
  17. baseURL string
  18. port int
  19. client *http.Client
  20. callback EventCallback
  21. lastActivity time.Time
  22. timeout time.Duration
  23. }
  24. func New(host string, port int, callback EventCallback) *Monitor {
  25. return &Monitor{
  26. baseURL: fmt.Sprintf("http://%s:%d", host, port),
  27. port: port,
  28. client: &http.Client{Timeout: 2 * time.Second},
  29. callback: callback,
  30. lastActivity: time.Now(),
  31. timeout: 5 * time.Minute,
  32. }
  33. }
  34. func (m *Monitor) CheckHealth() bool {
  35. resp, err := m.client.Get(m.baseURL + "/global/health")
  36. if err != nil {
  37. logger.Debug("端口 %d 健康检查失败: %v", m.port, err)
  38. return false
  39. }
  40. defer resp.Body.Close()
  41. healthy := resp.StatusCode == 200
  42. if healthy {
  43. logger.Debug("端口 %d 健康检查通过", m.port)
  44. } else {
  45. logger.Debug("端口 %d 健康检查返回状态码: %d", m.port, resp.StatusCode)
  46. }
  47. return healthy
  48. }
  49. func (m *Monitor) Run(ctx context.Context) {
  50. for {
  51. if ctx.Err() != nil {
  52. return
  53. }
  54. if !m.CheckHealth() {
  55. logger.Debug("端口 %d 健康检查失败,等待重试", m.port)
  56. select {
  57. case <-ctx.Done():
  58. return
  59. case <-time.After(5 * time.Second):
  60. }
  61. continue
  62. }
  63. err := m.connectAndRead(ctx)
  64. if ctx.Err() != nil {
  65. return
  66. }
  67. if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
  68. logger.Info("端口 %d 连接关闭: %v, 停止监控", m.port, err)
  69. return
  70. }
  71. logger.Warn("端口 %d 连接断开: %v, 3秒后重连", m.port, err)
  72. select {
  73. case <-ctx.Done():
  74. return
  75. case <-time.After(3 * time.Second):
  76. }
  77. }
  78. }
  79. func (m *Monitor) connectAndRead(ctx context.Context) error {
  80. req, err := http.NewRequestWithContext(ctx, "GET", m.baseURL+"/event", nil)
  81. if err != nil {
  82. logger.Error("端口 %d 创建请求失败: %v", m.port, err)
  83. return err
  84. }
  85. req.Header.Set("Accept", "text/event-stream")
  86. resp, err := http.DefaultClient.Do(req)
  87. if err != nil {
  88. logger.Debug("端口 %d 连接事件流失败: %v", m.port, err)
  89. return err
  90. }
  91. defer resp.Body.Close()
  92. logger.Info("端口 %d 已连接到事件流: %s/event", m.port, m.baseURL)
  93. m.lastActivity = time.Now()
  94. // 启动超时检测 goroutine
  95. timeoutChan := make(chan struct{})
  96. go func() {
  97. ticker := time.NewTicker(30 * time.Second)
  98. defer ticker.Stop()
  99. for {
  100. select {
  101. case <-ctx.Done():
  102. return
  103. case <-timeoutChan:
  104. return
  105. case <-ticker.C:
  106. if time.Since(m.lastActivity) > m.timeout {
  107. logger.Warn("端口 %d 超时 %v,回收监控", m.port, m.timeout)
  108. fmt.Printf("端口 %d 超时 %v,回收监控\n", m.port, m.timeout)
  109. return
  110. }
  111. }
  112. }
  113. }()
  114. scanner := bufio.NewScanner(resp.Body)
  115. scanner.Buffer(make([]byte, 0, 1024*1024), 1024*1024)
  116. for scanner.Scan() {
  117. select {
  118. case <-ctx.Done():
  119. logger.Debug("端口 %d 上下文已取消,退出事件循环", m.port)
  120. close(timeoutChan)
  121. return nil
  122. default:
  123. }
  124. line := scanner.Text()
  125. if !strings.HasPrefix(line, "data:") {
  126. continue
  127. }
  128. data := strings.TrimSpace(line[5:])
  129. if data == "" {
  130. continue
  131. }
  132. var evt event.SSEEvent
  133. if err := json.Unmarshal([]byte(data), &evt); err != nil {
  134. logger.Debug("端口 %d 解析事件失败: %v", m.port, err)
  135. continue
  136. }
  137. m.lastActivity = time.Now()
  138. if m.callback != nil {
  139. m.callback(m.port, &evt)
  140. }
  141. }
  142. close(timeoutChan)
  143. logger.Debug("端口 %d 事件流读取结束", m.port)
  144. return scanner.Err()
  145. }