monitor.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package monitor
  2. import (
  3. "bufio"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "net/http"
  8. "strings"
  9. "time"
  10. "AI-Status-Light/internal/event"
  11. )
  12. type EventCallback func(port int, evt *event.SSEEvent)
  13. type Monitor struct {
  14. baseURL string
  15. port int
  16. client *http.Client
  17. callback EventCallback
  18. lastActivity time.Time
  19. timeout time.Duration
  20. }
  21. func New(host string, port int, callback EventCallback) *Monitor {
  22. return &Monitor{
  23. baseURL: fmt.Sprintf("http://%s:%d", host, port),
  24. port: port,
  25. client: &http.Client{Timeout: 2 * time.Second},
  26. callback: callback,
  27. lastActivity: time.Now(),
  28. timeout: 5 * time.Minute,
  29. }
  30. }
  31. func (m *Monitor) CheckHealth() bool {
  32. resp, err := m.client.Get(m.baseURL + "/global/health")
  33. if err != nil {
  34. return false
  35. }
  36. defer resp.Body.Close()
  37. return resp.StatusCode == 200
  38. }
  39. func (m *Monitor) Run(ctx context.Context) {
  40. req, err := http.NewRequestWithContext(ctx, "GET", m.baseURL+"/event", nil)
  41. if err != nil {
  42. return
  43. }
  44. req.Header.Set("Accept", "text/event-stream")
  45. resp, err := http.DefaultClient.Do(req)
  46. if err != nil {
  47. return
  48. }
  49. defer resp.Body.Close()
  50. m.lastActivity = time.Now()
  51. // 启动超时检测 goroutine
  52. timeoutChan := make(chan struct{})
  53. go func() {
  54. ticker := time.NewTicker(30 * time.Second)
  55. defer ticker.Stop()
  56. for {
  57. select {
  58. case <-ctx.Done():
  59. return
  60. case <-timeoutChan:
  61. return
  62. case <-ticker.C:
  63. if time.Since(m.lastActivity) > m.timeout {
  64. fmt.Printf("端口 %d 超时 %v,回收监控\n", m.port, m.timeout)
  65. return
  66. }
  67. }
  68. }
  69. }()
  70. scanner := bufio.NewScanner(resp.Body)
  71. for scanner.Scan() {
  72. select {
  73. case <-ctx.Done():
  74. close(timeoutChan)
  75. return
  76. default:
  77. }
  78. line := scanner.Text()
  79. if !strings.HasPrefix(line, "data:") {
  80. continue
  81. }
  82. data := strings.TrimSpace(line[5:])
  83. if data == "" {
  84. continue
  85. }
  86. var evt event.SSEEvent
  87. if err := json.Unmarshal([]byte(data), &evt); err != nil {
  88. continue
  89. }
  90. m.lastActivity = time.Now()
  91. if m.callback != nil {
  92. m.callback(m.port, &evt)
  93. }
  94. }
  95. close(timeoutChan)
  96. }