| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- package mqtt
- import (
- "encoding/json"
- "fmt"
- "time"
- mqttlib "github.com/eclipse/paho.mqtt.golang"
- "AI-Status-Light/internal/database"
- "AI-Status-Light/internal/logger"
- )
- type Client struct {
- broker string
- clientID string
- username string
- password string
- topic string
- client mqttlib.Client
- }
// StatusMessage is the JSON payload sent by Client.Publish.
//
// Port, Type and Timestamp are always serialized; the remaining fields are
// dropped from the JSON output when they are empty.
type StatusMessage struct {
	// Port is serialized as "port".
	Port int `json:"port"`
	// Type is serialized as "type"; Publish fills it from its eventType
	// argument.
	Type string `json:"type"`
	// Status is serialized as "status" and omitted when empty.
	Status string `json:"status,omitempty"`
	// Tool is serialized as "tool" and omitted when empty.
	Tool string `json:"tool,omitempty"`
	// State is serialized as "state" and omitted when empty.
	State string `json:"state,omitempty"`
	// Title is serialized as "title" and omitted when empty.
	Title string `json:"title,omitempty"`
	// Timestamp is serialized as "timestamp"; Publish fills it with the
	// current time formatted as RFC 3339.
	Timestamp string `json:"timestamp"`
}
- func New(broker, clientID, username, password, topic string) *Client {
- return &Client{
- broker: broker,
- clientID: clientID,
- username: username,
- password: password,
- topic: topic,
- }
- }
- func NewFromConfig(cfg *database.MQTTConfig) *Client {
- return &Client{
- broker: cfg.Broker,
- clientID: cfg.ClientID,
- username: cfg.Username,
- password: cfg.Password,
- topic: cfg.Topic,
- }
- }
- func (c *Client) Connect() error {
- opts := mqttlib.NewClientOptions()
- opts.AddBroker(c.broker)
- opts.SetClientID(c.clientID)
- if c.username != "" {
- opts.SetUsername(c.username)
- }
- if c.password != "" {
- opts.SetPassword(c.password)
- }
- opts.SetAutoReconnect(true)
- opts.SetConnectTimeout(10 * time.Second)
- opts.SetOnConnectHandler(func(client mqttlib.Client) {
- logger.Info("MQTT 已连接: %s", c.broker)
- })
- opts.SetConnectionLostHandler(func(client mqttlib.Client, err error) {
- logger.Warn("MQTT 连接断开: %v", err)
- })
- c.client = mqttlib.NewClient(opts)
- token := c.client.Connect()
- token.Wait()
- if token.Error() != nil {
- return fmt.Errorf("MQTT 连接失败: %w", token.Error())
- }
- return nil
- }
- func (c *Client) Publish(port int, eventType, status, tool, state, title string) error {
- msg := StatusMessage{
- Port: port,
- Type: eventType,
- Status: status,
- Tool: tool,
- State: state,
- Title: title,
- Timestamp: time.Now().Format(time.RFC3339),
- }
- payload, err := json.Marshal(msg)
- if err != nil {
- return fmt.Errorf("序列化消息失败: %w", err)
- }
- token := c.client.Publish(c.topic, 0, false, payload)
- token.Wait()
- return token.Error()
- }
- func (c *Client) PublishRaw(topic string, payload interface{}) error {
- var data []byte
- switch v := payload.(type) {
- case string:
- data = []byte(v)
- default:
- var err error
- data, err = json.Marshal(v)
- if err != nil {
- return err
- }
- }
- token := c.client.Publish(topic, 0, false, data)
- token.Wait()
- return token.Error()
- }
- func (c *Client) GetTopic() string {
- return c.topic
- }
- func (c *Client) Disconnect() {
- if c.client != nil && c.client.IsConnected() {
- c.client.Disconnect(1000)
- }
- }
|