| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- package monitor
- import (
- "bufio"
- "context"
- "encoding/json"
- "fmt"
- "net/http"
- "strings"
- "time"
- "AI-Status-Light/internal/event"
- "AI-Status-Light/internal/logger"
- )
- type EventCallback func(port int, evt *event.SSEEvent)
- type Monitor struct {
- baseURL string
- port int
- client *http.Client
- callback EventCallback
- lastActivity time.Time
- timeout time.Duration
- }
- func New(host string, port int, callback EventCallback) *Monitor {
- return &Monitor{
- baseURL: fmt.Sprintf("http://%s:%d", host, port),
- port: port,
- client: &http.Client{Timeout: 2 * time.Second},
- callback: callback,
- lastActivity: time.Now(),
- timeout: 5 * time.Minute,
- }
- }
- func (m *Monitor) CheckHealth() bool {
- resp, err := m.client.Get(m.baseURL + "/global/health")
- if err != nil {
- logger.Debug("端口 %d 健康检查失败: %v", m.port, err)
- return false
- }
- defer resp.Body.Close()
- healthy := resp.StatusCode == 200
- if healthy {
- logger.Debug("端口 %d 健康检查通过", m.port)
- } else {
- logger.Debug("端口 %d 健康检查返回状态码: %d", m.port, resp.StatusCode)
- }
- return healthy
- }
- func (m *Monitor) Run(ctx context.Context) {
- req, err := http.NewRequestWithContext(ctx, "GET", m.baseURL+"/event", nil)
- if err != nil {
- logger.Error("端口 %d 创建请求失败: %v", m.port, err)
- return
- }
- req.Header.Set("Accept", "text/event-stream")
- resp, err := http.DefaultClient.Do(req)
- if err != nil {
- logger.Debug("端口 %d 连接事件流失败: %v", m.port, err)
- return
- }
- defer resp.Body.Close()
- logger.Info("端口 %d 已连接到事件流: %s/event", m.port, m.baseURL)
- m.lastActivity = time.Now()
- // 启动超时检测 goroutine
- timeoutChan := make(chan struct{})
- go func() {
- ticker := time.NewTicker(30 * time.Second)
- defer ticker.Stop()
- for {
- select {
- case <-ctx.Done():
- return
- case <-timeoutChan:
- return
- case <-ticker.C:
- if time.Since(m.lastActivity) > m.timeout {
- logger.Warn("端口 %d 超时 %v,回收监控", m.port, m.timeout)
- fmt.Printf("端口 %d 超时 %v,回收监控\n", m.port, m.timeout)
- return
- }
- }
- }
- }()
- scanner := bufio.NewScanner(resp.Body)
- for scanner.Scan() {
- select {
- case <-ctx.Done():
- logger.Debug("端口 %d 上下文已取消,退出事件循环", m.port)
- close(timeoutChan)
- return
- default:
- }
- line := scanner.Text()
- if !strings.HasPrefix(line, "data:") {
- continue
- }
- data := strings.TrimSpace(line[5:])
- if data == "" {
- continue
- }
- var evt event.SSEEvent
- if err := json.Unmarshal([]byte(data), &evt); err != nil {
- logger.Debug("端口 %d 解析事件失败: %v", m.port, err)
- continue
- }
- m.lastActivity = time.Now()
- if m.callback != nil {
- m.callback(m.port, &evt)
- }
- }
- close(timeoutChan)
- logger.Debug("端口 %d 事件流读取结束", m.port)
- }
|