mqtt.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package mqtt
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "time"
  6. mqttlib "github.com/eclipse/paho.mqtt.golang"
  7. "AI-Status-Light/internal/database"
  8. "AI-Status-Light/internal/logger"
  9. )
  10. type Client struct {
  11. broker string
  12. clientID string
  13. username string
  14. password string
  15. topic string
  16. client mqttlib.Client
  17. }
  18. type StatusMessage struct {
  19. Port int `json:"port"`
  20. Type string `json:"type"`
  21. Status string `json:"status,omitempty"`
  22. Tool string `json:"tool,omitempty"`
  23. State string `json:"state,omitempty"`
  24. Title string `json:"title,omitempty"`
  25. Timestamp string `json:"timestamp"`
  26. }
  27. func New(broker, clientID, username, password, topic string) *Client {
  28. return &Client{
  29. broker: broker,
  30. clientID: clientID,
  31. username: username,
  32. password: password,
  33. topic: topic,
  34. }
  35. }
  36. func NewFromConfig(cfg *database.MQTTConfig) *Client {
  37. return &Client{
  38. broker: cfg.Broker,
  39. clientID: cfg.ClientID,
  40. username: cfg.Username,
  41. password: cfg.Password,
  42. topic: cfg.Topic,
  43. }
  44. }
  45. func (c *Client) Connect() error {
  46. opts := mqttlib.NewClientOptions()
  47. opts.AddBroker(c.broker)
  48. opts.SetClientID(c.clientID)
  49. if c.username != "" {
  50. opts.SetUsername(c.username)
  51. }
  52. if c.password != "" {
  53. opts.SetPassword(c.password)
  54. }
  55. opts.SetAutoReconnect(true)
  56. opts.SetConnectTimeout(10 * time.Second)
  57. opts.SetOnConnectHandler(func(client mqttlib.Client) {
  58. logger.Info("MQTT 已连接: %s", c.broker)
  59. })
  60. opts.SetConnectionLostHandler(func(client mqttlib.Client, err error) {
  61. logger.Warn("MQTT 连接断开: %v", err)
  62. })
  63. c.client = mqttlib.NewClient(opts)
  64. token := c.client.Connect()
  65. token.Wait()
  66. if token.Error() != nil {
  67. return fmt.Errorf("MQTT 连接失败: %w", token.Error())
  68. }
  69. return nil
  70. }
  71. func (c *Client) Publish(port int, eventType, status, tool, state, title string) error {
  72. msg := StatusMessage{
  73. Port: port,
  74. Type: eventType,
  75. Status: status,
  76. Tool: tool,
  77. State: state,
  78. Title: title,
  79. Timestamp: time.Now().Format(time.RFC3339),
  80. }
  81. payload, err := json.Marshal(msg)
  82. if err != nil {
  83. return fmt.Errorf("序列化消息失败: %w", err)
  84. }
  85. token := c.client.Publish(c.topic, 0, false, payload)
  86. token.Wait()
  87. return token.Error()
  88. }
  89. func (c *Client) PublishRaw(topic string, payload interface{}) error {
  90. var data []byte
  91. switch v := payload.(type) {
  92. case string:
  93. data = []byte(v)
  94. default:
  95. var err error
  96. data, err = json.Marshal(v)
  97. if err != nil {
  98. return err
  99. }
  100. }
  101. token := c.client.Publish(topic, 0, false, data)
  102. token.Wait()
  103. return token.Error()
  104. }
  105. func (c *Client) GetTopic() string {
  106. return c.topic
  107. }
  108. func (c *Client) Disconnect() {
  109. if c.client != nil && c.client.IsConnected() {
  110. c.client.Disconnect(1000)
  111. }
  112. }