|
|
@@ -11,8 +11,6 @@ import (
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
|
- "github.com/gorilla/websocket"
|
|
|
-
|
|
|
"ai-status-light/internal/database"
|
|
|
"ai-status-light/internal/logger"
|
|
|
mqttcli "ai-status-light/internal/mqtt"
|
|
|
@@ -20,8 +18,6 @@ import (
|
|
|
)
|
|
|
|
|
|
type ClientStatus struct {
|
|
|
- Port int `json:"port"`
|
|
|
- Status string `json:"status"`
|
|
|
Code string `json:"code"`
|
|
|
Timestamp string `json:"timestamp"`
|
|
|
}
|
|
|
@@ -31,12 +27,17 @@ type EventRequest struct {
|
|
|
Timestamp string `json:"timestamp,omitempty"`
|
|
|
}
|
|
|
|
|
|
+type SSEClient struct {
|
|
|
+ ch chan string
|
|
|
+ closed bool
|
|
|
+ mu sync.Mutex
|
|
|
+}
|
|
|
+
|
|
|
type Server struct {
|
|
|
db *database.DB
|
|
|
server *http.Server
|
|
|
- clients map[*websocket.Conn]bool
|
|
|
- clientsMu sync.Mutex
|
|
|
- upgrader websocket.Upgrader
|
|
|
+ sseClients map[*SSEClient]bool
|
|
|
+ sseMu sync.Mutex
|
|
|
statusMap map[int]*ClientStatus
|
|
|
statusMu sync.RWMutex
|
|
|
certFile string
|
|
|
@@ -53,25 +54,20 @@ type Response struct {
|
|
|
|
|
|
func New(db *database.DB, addr string) *Server {
|
|
|
s := &Server{
|
|
|
- db: db,
|
|
|
- clients: make(map[*websocket.Conn]bool),
|
|
|
- statusMap: make(map[int]*ClientStatus),
|
|
|
- upgrader: websocket.Upgrader{
|
|
|
- CheckOrigin: func(r *http.Request) bool {
|
|
|
- return true
|
|
|
- },
|
|
|
- },
|
|
|
+ db: db,
|
|
|
+ sseClients: make(map[*SSEClient]bool),
|
|
|
+ statusMap: make(map[int]*ClientStatus),
|
|
|
}
|
|
|
|
|
|
mux := http.NewServeMux()
|
|
|
mux.HandleFunc("/api/clients", s.handleClients)
|
|
|
mux.HandleFunc("/api/event", s.handleEvent)
|
|
|
+ mux.HandleFunc("/api/events", s.handleSSE)
|
|
|
mux.HandleFunc("/api/mqtt", s.handleMQTT)
|
|
|
mux.HandleFunc("/api/mqtt/", s.handleMQTTByID)
|
|
|
mux.HandleFunc("/api/ble", s.handleBLE)
|
|
|
mux.HandleFunc("/api/ble/", s.handleBLEByID)
|
|
|
mux.HandleFunc("/api/health", s.handleHealth)
|
|
|
- mux.HandleFunc("/ws", s.handleWebSocket)
|
|
|
mux.HandleFunc("/", web.Handler())
|
|
|
|
|
|
s.server = &http.Server{
|
|
|
@@ -169,12 +165,117 @@ func (s *Server) handleEvent(w http.ResponseWriter, r *http.Request) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // 广播到 WebSocket 客户端
|
|
|
- s.BroadcastStatus(0, req.Code, req.Code)
|
|
|
+ // 广播到 SSE 客户端
|
|
|
+ s.broadcastSSE(req.Code)
|
|
|
|
|
|
writeJSON(w, http.StatusOK, Response{Code: 0, Message: "ok"})
|
|
|
}
|
|
|
|
|
|
+func (s *Server) handleSSE(w http.ResponseWriter, r *http.Request) {
|
|
|
+ flusher, ok := w.(http.Flusher)
|
|
|
+ if !ok {
|
|
|
+ http.Error(w, "SSE 不支持", http.StatusInternalServerError)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ w.Header().Set("Content-Type", "text/event-stream")
|
|
|
+ w.Header().Set("Cache-Control", "no-cache")
|
|
|
+ w.Header().Set("Connection", "keep-alive")
|
|
|
+ w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
+
|
|
|
+ client := &SSEClient{
|
|
|
+ ch: make(chan string, 10),
|
|
|
+ }
|
|
|
+
|
|
|
+ s.sseMu.Lock()
|
|
|
+ s.sseClients[client] = true
|
|
|
+ s.sseMu.Unlock()
|
|
|
+
|
|
|
+ logger.Info("SSE 客户端已连接,当前连接数: %d", len(s.sseClients))
|
|
|
+
|
|
|
+ defer func() {
|
|
|
+ s.sseMu.Lock()
|
|
|
+ delete(s.sseClients, client)
|
|
|
+ s.sseMu.Unlock()
|
|
|
+ client.Close()
|
|
|
+ logger.Info("SSE 客户端已断开,当前连接数: %d", len(s.sseClients))
|
|
|
+ }()
|
|
|
+
|
|
|
+ // 发送初始连接消息
|
|
|
+ fmt.Fprintf(w, "event: connected\ndata: {\"status\":\"ok\"}\n\n")
|
|
|
+ flusher.Flush()
|
|
|
+
|
|
|
+ ctx := r.Context()
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-ctx.Done():
|
|
|
+ return
|
|
|
+ case msg, ok := <-client.ch:
|
|
|
+ if !ok {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ fmt.Fprintf(w, "event: status\ndata: %s\n\n", msg)
|
|
|
+ flusher.Flush()
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (s *Server) broadcastSSE(code string) {
|
|
|
+ ts := time.Now().Format(time.RFC3339)
|
|
|
+
|
|
|
+ s.statusMu.Lock()
|
|
|
+ s.statusMap[0] = &ClientStatus{
|
|
|
+ Code: code,
|
|
|
+ Timestamp: ts,
|
|
|
+ }
|
|
|
+ s.statusMu.Unlock()
|
|
|
+
|
|
|
+ s.sseMu.Lock()
|
|
|
+ defer s.sseMu.Unlock()
|
|
|
+
|
|
|
+ if len(s.sseClients) == 0 {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ payload := map[string]interface{}{
|
|
|
+ "code": code,
|
|
|
+ "timestamp": ts,
|
|
|
+ }
|
|
|
+
|
|
|
+ data, err := json.Marshal(payload)
|
|
|
+ if err != nil {
|
|
|
+ logger.Error("序列化 SSE 消息失败: %v", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ msg := string(data)
|
|
|
+ for client := range s.sseClients {
|
|
|
+ client.Send(msg)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (c *SSEClient) Send(msg string) {
|
|
|
+ c.mu.Lock()
|
|
|
+ defer c.mu.Unlock()
|
|
|
+ if c.closed {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ select {
|
|
|
+ case c.ch <- msg:
|
|
|
+ default:
|
|
|
+ logger.Debug("SSE 客户端缓冲区已满,丢弃消息")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (c *SSEClient) Close() {
|
|
|
+ c.mu.Lock()
|
|
|
+ defer c.mu.Unlock()
|
|
|
+ if !c.closed {
|
|
|
+ c.closed = true
|
|
|
+ close(c.ch)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (s *Server) handleClients(w http.ResponseWriter, r *http.Request) {
|
|
|
if r.Method != http.MethodGet {
|
|
|
writeJSON(w, http.StatusMethodNotAllowed, Response{Code: -1, Message: "方法不允许"})
|
|
|
@@ -189,7 +290,7 @@ func (s *Server) handleClients(w http.ResponseWriter, r *http.Request) {
|
|
|
s.statusMu.RUnlock()
|
|
|
|
|
|
sort.Slice(result, func(i, j int) bool {
|
|
|
- return result[i].Port < result[j].Port
|
|
|
+ return result[i].Code < result[j].Code
|
|
|
})
|
|
|
|
|
|
writeJSON(w, http.StatusOK, Response{Code: 0, Message: "success", Data: result})
|
|
|
@@ -469,76 +570,3 @@ func writeJSON(w http.ResponseWriter, statusCode int, data interface{}) {
|
|
|
func (s *Server) GetAddr() string {
|
|
|
return s.server.Addr
|
|
|
}
|
|
|
-
|
|
|
-func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
|
|
|
- conn, err := s.upgrader.Upgrade(w, r, nil)
|
|
|
- if err != nil {
|
|
|
- logger.Error("WebSocket 升级失败: %v", err)
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- s.clientsMu.Lock()
|
|
|
- s.clients[conn] = true
|
|
|
- s.clientsMu.Unlock()
|
|
|
-
|
|
|
- logger.Info("WebSocket 客户端已连接,当前连接数: %d", len(s.clients))
|
|
|
-
|
|
|
- go func() {
|
|
|
- defer func() {
|
|
|
- s.clientsMu.Lock()
|
|
|
- delete(s.clients, conn)
|
|
|
- s.clientsMu.Unlock()
|
|
|
- conn.Close()
|
|
|
- logger.Info("WebSocket 客户端已断开,当前连接数: %d", len(s.clients))
|
|
|
- }()
|
|
|
-
|
|
|
- for {
|
|
|
- _, _, err := conn.ReadMessage()
|
|
|
- if err != nil {
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- }()
|
|
|
-}
|
|
|
-
|
|
|
-func (s *Server) BroadcastStatus(port int, status string, code string) {
|
|
|
- ts := time.Now().Format(time.RFC3339)
|
|
|
-
|
|
|
- s.statusMu.Lock()
|
|
|
- s.statusMap[port] = &ClientStatus{
|
|
|
- Port: port,
|
|
|
- Status: status,
|
|
|
- Code: code,
|
|
|
- Timestamp: ts,
|
|
|
- }
|
|
|
- s.statusMu.Unlock()
|
|
|
-
|
|
|
- s.clientsMu.Lock()
|
|
|
- defer s.clientsMu.Unlock()
|
|
|
-
|
|
|
- if len(s.clients) == 0 {
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- payload := map[string]interface{}{
|
|
|
- "port": port,
|
|
|
- "status": status,
|
|
|
- "code": code,
|
|
|
- "timestamp": ts,
|
|
|
- }
|
|
|
-
|
|
|
- data, err := json.Marshal(payload)
|
|
|
- if err != nil {
|
|
|
- logger.Error("序列化广播消息失败: %v", err)
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- for client := range s.clients {
|
|
|
- err := client.WriteMessage(websocket.TextMessage, data)
|
|
|
- if err != nil {
|
|
|
- logger.Debug("WebSocket 写入失败,移除客户端: %v", err)
|
|
|
- client.Close()
|
|
|
- delete(s.clients, client)
|
|
|
- }
|
|
|
- }
|
|
|
-}
|