package monitor import ( "bufio" "context" "encoding/json" "errors" "fmt" "io" "net/http" "strings" "time" "ai-status-light/internal/event" "ai-status-light/internal/logger" ) type EventCallback func(port int, evt *event.SSEEvent) type Monitor struct { baseURL string port int client *http.Client callback EventCallback lastActivity time.Time timeout time.Duration } func New(host string, port int, callback EventCallback) *Monitor { return &Monitor{ baseURL: fmt.Sprintf("http://%s:%d", host, port), port: port, client: &http.Client{Timeout: 2 * time.Second}, callback: callback, lastActivity: time.Now(), timeout: 5 * time.Minute, } } func (m *Monitor) CheckHealth() bool { resp, err := m.client.Get(m.baseURL + "/global/health") if err != nil { logger.Debug("端口 %d 健康检查失败: %v", m.port, err) return false } defer resp.Body.Close() healthy := resp.StatusCode == 200 if healthy { logger.Debug("端口 %d 健康检查通过", m.port) } else { logger.Debug("端口 %d 健康检查返回状态码: %d", m.port, resp.StatusCode) } return healthy } func (m *Monitor) Run(ctx context.Context) { for { if ctx.Err() != nil { return } if !m.CheckHealth() { logger.Debug("端口 %d 健康检查失败,等待重试", m.port) select { case <-ctx.Done(): return case <-time.After(5 * time.Second): } continue } err := m.connectAndRead(ctx) if ctx.Err() != nil { return } if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { logger.Info("端口 %d 连接关闭: %v, 停止监控", m.port, err) return } logger.Warn("端口 %d 连接断开: %v, 3秒后重连", m.port, err) select { case <-ctx.Done(): return case <-time.After(3 * time.Second): } } } func (m *Monitor) connectAndRead(ctx context.Context) error { req, err := http.NewRequestWithContext(ctx, "GET", m.baseURL+"/event", nil) if err != nil { logger.Error("端口 %d 创建请求失败: %v", m.port, err) return err } req.Header.Set("Accept", "text/event-stream") resp, err := http.DefaultClient.Do(req) if err != nil { logger.Debug("端口 %d 连接事件流失败: %v", m.port, err) return err } defer resp.Body.Close() logger.Info("端口 %d 已连接到事件流: %s/event", m.port, m.baseURL) m.lastActivity = time.Now() // 启动超时检测 goroutine timeoutChan := make(chan struct{}) go func() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-timeoutChan: return case <-ticker.C: if time.Since(m.lastActivity) > m.timeout { logger.Warn("端口 %d 超时 %v,回收监控", m.port, m.timeout) fmt.Printf("端口 %d 超时 %v,回收监控\n", m.port, m.timeout) return } } } }() scanner := bufio.NewScanner(resp.Body) scanner.Buffer(make([]byte, 0, 1024*1024), 1024*1024) for scanner.Scan() { select { case <-ctx.Done(): logger.Debug("端口 %d 上下文已取消,退出事件循环", m.port) close(timeoutChan) return nil default: } line := scanner.Text() if !strings.HasPrefix(line, "data:") { continue } data := strings.TrimSpace(line[5:]) if data == "" { continue } var evt event.SSEEvent if err := json.Unmarshal([]byte(data), &evt); err != nil { logger.Debug("端口 %d 解析事件失败: %v", m.port, err) continue } m.lastActivity = time.Now() if m.callback != nil { m.callback(m.port, &evt) } } close(timeoutChan) logger.Debug("端口 %d 事件流读取结束", m.port) return scanner.Err() }