| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- package monitor
- import (
- "bufio"
- "context"
- "encoding/json"
- "fmt"
- "net/http"
- "strings"
- "time"
- "AI-Status-Light/internal/event"
- )
- type EventCallback func(port int, evt *event.SSEEvent)
- type Monitor struct {
- baseURL string
- port int
- client *http.Client
- callback EventCallback
- lastActivity time.Time
- timeout time.Duration
- }
- func New(host string, port int, callback EventCallback) *Monitor {
- return &Monitor{
- baseURL: fmt.Sprintf("http://%s:%d", host, port),
- port: port,
- client: &http.Client{Timeout: 2 * time.Second},
- callback: callback,
- lastActivity: time.Now(),
- timeout: 5 * time.Minute,
- }
- }
- func (m *Monitor) CheckHealth() bool {
- resp, err := m.client.Get(m.baseURL + "/global/health")
- if err != nil {
- return false
- }
- defer resp.Body.Close()
- return resp.StatusCode == 200
- }
- func (m *Monitor) Run(ctx context.Context) {
- req, err := http.NewRequestWithContext(ctx, "GET", m.baseURL+"/event", nil)
- if err != nil {
- return
- }
- req.Header.Set("Accept", "text/event-stream")
- resp, err := http.DefaultClient.Do(req)
- if err != nil {
- return
- }
- defer resp.Body.Close()
- m.lastActivity = time.Now()
- // 启动超时检测 goroutine
- timeoutChan := make(chan struct{})
- go func() {
- ticker := time.NewTicker(30 * time.Second)
- defer ticker.Stop()
- for {
- select {
- case <-ctx.Done():
- return
- case <-timeoutChan:
- return
- case <-ticker.C:
- if time.Since(m.lastActivity) > m.timeout {
- fmt.Printf("端口 %d 超时 %v,回收监控\n", m.port, m.timeout)
- return
- }
- }
- }
- }()
- scanner := bufio.NewScanner(resp.Body)
- for scanner.Scan() {
- select {
- case <-ctx.Done():
- close(timeoutChan)
- return
- default:
- }
- line := scanner.Text()
- if !strings.HasPrefix(line, "data:") {
- continue
- }
- data := strings.TrimSpace(line[5:])
- if data == "" {
- continue
- }
- var evt event.SSEEvent
- if err := json.Unmarshal([]byte(data), &evt); err != nil {
- continue
- }
- m.lastActivity = time.Now()
- if m.callback != nil {
- m.callback(m.port, &evt)
- }
- }
- close(timeoutChan)
- }
|