api.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. package api
  2. import (
  3. "encoding/json"
  4. "net/http"
  5. "sort"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/gorilla/websocket"
  11. "ai-status-light/internal/database"
  12. "ai-status-light/internal/logger"
  13. "ai-status-light/internal/web"
  14. )
// ClientStatus is the latest status recorded for one port. Values are stored
// in Server.statusMap (keyed by Port) and serialized to JSON by the
// /api/clients handler.
type ClientStatus struct {
	// Port identifies the client; it is also the key under which the value is
	// stored in Server.statusMap.
	Port int `json:"port"`
	// Status is the status string passed to BroadcastStatus.
	Status string `json:"status"`
	// Code is the code string passed to BroadcastStatus.
	Code string `json:"code"`
	// Timestamp is the time of the update, formatted as RFC 3339 by
	// BroadcastStatus.
	Timestamp string `json:"timestamp"`
}
// Server bundles the HTTP API, the WebSocket endpoint and the in-memory
// per-port status table.
type Server struct {
	// db backs the MQTT configuration endpoints.
	db *database.DB
	// server is the underlying HTTP server created in New.
	server *http.Server
	// clients is the set of currently connected WebSocket peers; it is
	// accessed while holding clientsMu.
	clients   map[*websocket.Conn]bool
	clientsMu sync.Mutex
	// upgrader upgrades incoming /ws requests to WebSocket connections.
	upgrader websocket.Upgrader
	// statusMap holds the latest ClientStatus per port; it is accessed while
	// holding statusMu.
	statusMap map[int]*ClientStatus
	statusMu  sync.RWMutex
	// certFile and keyFile are set by EnableTLS; Start serves HTTPS only when
	// both are non-empty.
	certFile string
	keyFile  string
}
// Response is the JSON envelope written by every API handler in this file.
type Response struct {
	// Code is 0 on success and -1 on failure in the handlers of this file.
	Code int `json:"code"`
	// Message is a short human-readable result or error description.
	Message string `json:"message"`
	// Data carries the optional payload; it is omitted from the JSON output
	// when empty.
	Data interface{} `json:"data,omitempty"`
}
  37. func New(db *database.DB, addr string) *Server {
  38. s := &Server{
  39. db: db,
  40. clients: make(map[*websocket.Conn]bool),
  41. statusMap: make(map[int]*ClientStatus),
  42. upgrader: websocket.Upgrader{
  43. CheckOrigin: func(r *http.Request) bool {
  44. return true
  45. },
  46. },
  47. }
  48. mux := http.NewServeMux()
  49. mux.HandleFunc("/api/clients", s.handleClients)
  50. mux.HandleFunc("/api/mqtt", s.handleMQTT)
  51. mux.HandleFunc("/api/mqtt/", s.handleMQTTByID)
  52. mux.HandleFunc("/api/health", s.handleHealth)
  53. mux.HandleFunc("/ws", s.handleWebSocket)
  54. mux.HandleFunc("/", web.Handler())
  55. s.server = &http.Server{
  56. Addr: addr,
  57. Handler: corsMiddleware(mux),
  58. }
  59. return s
  60. }
  61. func (s *Server) EnableTLS(certFile, keyFile string) {
  62. s.certFile = certFile
  63. s.keyFile = keyFile
  64. }
  65. func (s *Server) Start() error {
  66. if s.certFile != "" && s.keyFile != "" {
  67. logger.Info("API 服务器开始监听 (HTTPS): %s", s.server.Addr)
  68. err := s.server.ListenAndServeTLS(s.certFile, s.keyFile)
  69. if err != nil && err != http.ErrServerClosed {
  70. logger.Error("API 服务器监听失败: %v", err)
  71. }
  72. return err
  73. }
  74. logger.Info("API 服务器开始监听: %s", s.server.Addr)
  75. err := s.server.ListenAndServe()
  76. if err != nil && err != http.ErrServerClosed {
  77. logger.Error("API 服务器监听失败: %v", err)
  78. }
  79. return err
  80. }
  81. func corsMiddleware(next http.Handler) http.Handler {
  82. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  83. w.Header().Set("Access-Control-Allow-Origin", "*")
  84. w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
  85. w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
  86. if r.Method == "OPTIONS" {
  87. w.WriteHeader(http.StatusOK)
  88. return
  89. }
  90. next.ServeHTTP(w, r)
  91. })
  92. }
  93. func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
  94. writeJSON(w, http.StatusOK, Response{Code: 0, Message: "ok"})
  95. }
  96. func (s *Server) handleClients(w http.ResponseWriter, r *http.Request) {
  97. if r.Method != http.MethodGet {
  98. writeJSON(w, http.StatusMethodNotAllowed, Response{Code: -1, Message: "方法不允许"})
  99. return
  100. }
  101. s.statusMu.RLock()
  102. result := make([]*ClientStatus, 0, len(s.statusMap))
  103. for _, cs := range s.statusMap {
  104. result = append(result, cs)
  105. }
  106. s.statusMu.RUnlock()
  107. sort.Slice(result, func(i, j int) bool {
  108. return result[i].Port < result[j].Port
  109. })
  110. writeJSON(w, http.StatusOK, Response{Code: 0, Message: "success", Data: result})
  111. }
  112. func (s *Server) handleMQTT(w http.ResponseWriter, r *http.Request) {
  113. logger.Debug("HTTP %s %s", r.Method, r.URL.Path)
  114. switch r.Method {
  115. case http.MethodGet:
  116. s.listMQTTConfigs(w, r)
  117. case http.MethodPost:
  118. s.createMQTTConfig(w, r)
  119. default:
  120. logger.Warn("不支持的 HTTP 方法: %s %s", r.Method, r.URL.Path)
  121. writeJSON(w, http.StatusMethodNotAllowed, Response{Code: -1, Message: "方法不允许"})
  122. }
  123. }
  124. func (s *Server) handleMQTTByID(w http.ResponseWriter, r *http.Request) {
  125. logger.Debug("HTTP %s %s", r.Method, r.URL.Path)
  126. idStr := strings.TrimPrefix(r.URL.Path, "/api/mqtt/")
  127. id, err := strconv.Atoi(idStr)
  128. if err != nil {
  129. logger.Warn("无效的配置 ID: %s", idStr)
  130. writeJSON(w, http.StatusBadRequest, Response{Code: -1, Message: "无效的 ID"})
  131. return
  132. }
  133. switch r.Method {
  134. case http.MethodGet:
  135. s.getMQTTConfig(w, id)
  136. case http.MethodPut:
  137. s.updateMQTTConfig(w, r, id)
  138. case http.MethodDelete:
  139. s.deleteMQTTConfig(w, id)
  140. default:
  141. logger.Warn("不支持的 HTTP 方法: %s %s", r.Method, r.URL.Path)
  142. writeJSON(w, http.StatusMethodNotAllowed, Response{Code: -1, Message: "方法不允许"})
  143. }
  144. }
  145. func (s *Server) listMQTTConfigs(w http.ResponseWriter, r *http.Request) {
  146. configs, err := s.db.ListMQTTConfigs()
  147. if err != nil {
  148. logger.Error("查询 MQTT 配置列表失败: %v", err)
  149. writeJSON(w, http.StatusInternalServerError, Response{Code: -1, Message: err.Error()})
  150. return
  151. }
  152. logger.Debug("查询 MQTT 配置列表: %d 条", len(configs))
  153. writeJSON(w, http.StatusOK, Response{Code: 0, Message: "ok", Data: configs})
  154. }
  155. func (s *Server) createMQTTConfig(w http.ResponseWriter, r *http.Request) {
  156. var cfg database.MQTTConfig
  157. if err := json.NewDecoder(r.Body).Decode(&cfg); err != nil {
  158. logger.Warn("创建配置请求体解析失败: %v", err)
  159. writeJSON(w, http.StatusBadRequest, Response{Code: -1, Message: "无效的请求体"})
  160. return
  161. }
  162. if cfg.Broker == "" {
  163. logger.Warn("创建配置: broker 为空")
  164. writeJSON(w, http.StatusBadRequest, Response{Code: -1, Message: "broker 不能为空"})
  165. return
  166. }
  167. if cfg.ClientID == "" {
  168. cfg.ClientID = "opencode-monitor"
  169. }
  170. if cfg.Topic == "" {
  171. cfg.Topic = "opencode/status"
  172. }
  173. if err := s.db.SaveMQTTConfig(&cfg); err != nil {
  174. logger.Error("创建 MQTT 配置失败: %v", err)
  175. writeJSON(w, http.StatusInternalServerError, Response{Code: -1, Message: err.Error()})
  176. return
  177. }
  178. logger.Info("MQTT 配置已创建: id=%d, broker=%s, topic=%s", cfg.ID, cfg.Broker, cfg.Topic)
  179. writeJSON(w, http.StatusCreated, Response{Code: 0, Message: "创建成功", Data: cfg})
  180. }
  181. func (s *Server) getMQTTConfig(w http.ResponseWriter, id int) {
  182. configs, err := s.db.ListMQTTConfigs()
  183. if err != nil {
  184. logger.Error("查询 MQTT 配置失败: id=%d, %v", id, err)
  185. writeJSON(w, http.StatusInternalServerError, Response{Code: -1, Message: err.Error()})
  186. return
  187. }
  188. for _, cfg := range configs {
  189. if cfg.ID == id {
  190. logger.Debug("查询 MQTT 配置: id=%d", id)
  191. writeJSON(w, http.StatusOK, Response{Code: 0, Message: "ok", Data: cfg})
  192. return
  193. }
  194. }
  195. logger.Warn("MQTT 配置不存在: id=%d", id)
  196. writeJSON(w, http.StatusNotFound, Response{Code: -1, Message: "配置不存在"})
  197. }
  198. func (s *Server) updateMQTTConfig(w http.ResponseWriter, r *http.Request, id int) {
  199. var cfg database.MQTTConfig
  200. if err := json.NewDecoder(r.Body).Decode(&cfg); err != nil {
  201. logger.Warn("更新配置请求体解析失败: id=%d, %v", id, err)
  202. writeJSON(w, http.StatusBadRequest, Response{Code: -1, Message: "无效的请求体"})
  203. return
  204. }
  205. cfg.ID = id
  206. if cfg.Broker == "" {
  207. logger.Warn("更新配置: broker 为空, id=%d", id)
  208. writeJSON(w, http.StatusBadRequest, Response{Code: -1, Message: "broker 不能为空"})
  209. return
  210. }
  211. if cfg.ClientID == "" {
  212. cfg.ClientID = "opencode-monitor"
  213. }
  214. if cfg.Topic == "" {
  215. cfg.Topic = "opencode/status"
  216. }
  217. if err := s.db.SaveMQTTConfig(&cfg); err != nil {
  218. logger.Error("更新 MQTT 配置失败: id=%d, %v", id, err)
  219. writeJSON(w, http.StatusInternalServerError, Response{Code: -1, Message: err.Error()})
  220. return
  221. }
  222. logger.Info("MQTT 配置已更新: id=%d, broker=%s, topic=%s", id, cfg.Broker, cfg.Topic)
  223. writeJSON(w, http.StatusOK, Response{Code: 0, Message: "更新成功", Data: cfg})
  224. }
  225. func (s *Server) deleteMQTTConfig(w http.ResponseWriter, id int) {
  226. if err := s.db.DeleteMQTTConfig(id); err != nil {
  227. logger.Error("删除 MQTT 配置失败: id=%d, %v", id, err)
  228. writeJSON(w, http.StatusInternalServerError, Response{Code: -1, Message: err.Error()})
  229. return
  230. }
  231. logger.Info("MQTT 配置已删除: id=%d", id)
  232. writeJSON(w, http.StatusOK, Response{Code: 0, Message: "删除成功"})
  233. }
  234. func writeJSON(w http.ResponseWriter, statusCode int, data interface{}) {
  235. w.Header().Set("Content-Type", "application/json")
  236. w.WriteHeader(statusCode)
  237. json.NewEncoder(w).Encode(data)
  238. }
  239. func (s *Server) GetAddr() string {
  240. return s.server.Addr
  241. }
  242. func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
  243. conn, err := s.upgrader.Upgrade(w, r, nil)
  244. if err != nil {
  245. logger.Error("WebSocket 升级失败: %v", err)
  246. return
  247. }
  248. s.clientsMu.Lock()
  249. s.clients[conn] = true
  250. s.clientsMu.Unlock()
  251. logger.Info("WebSocket 客户端已连接,当前连接数: %d", len(s.clients))
  252. go func() {
  253. defer func() {
  254. s.clientsMu.Lock()
  255. delete(s.clients, conn)
  256. s.clientsMu.Unlock()
  257. conn.Close()
  258. logger.Info("WebSocket 客户端已断开,当前连接数: %d", len(s.clients))
  259. }()
  260. for {
  261. _, _, err := conn.ReadMessage()
  262. if err != nil {
  263. break
  264. }
  265. }
  266. }()
  267. }
  268. func (s *Server) BroadcastStatus(port int, status string, code string) {
  269. ts := time.Now().Format(time.RFC3339)
  270. s.statusMu.Lock()
  271. s.statusMap[port] = &ClientStatus{
  272. Port: port,
  273. Status: status,
  274. Code: code,
  275. Timestamp: ts,
  276. }
  277. s.statusMu.Unlock()
  278. s.clientsMu.Lock()
  279. defer s.clientsMu.Unlock()
  280. if len(s.clients) == 0 {
  281. return
  282. }
  283. payload := map[string]interface{}{
  284. "port": port,
  285. "status": status,
  286. "code": code,
  287. "timestamp": ts,
  288. }
  289. data, err := json.Marshal(payload)
  290. if err != nil {
  291. logger.Error("序列化广播消息失败: %v", err)
  292. return
  293. }
  294. for client := range s.clients {
  295. err := client.WriteMessage(websocket.TextMessage, data)
  296. if err != nil {
  297. logger.Debug("WebSocket 写入失败,移除客户端: %v", err)
  298. client.Close()
  299. delete(s.clients, client)
  300. }
  301. }
  302. }