Skip to content

MQTT 客户端接入与开发指南

本文档提供在硬件终端(支持 Node.js 环境或 Python 的嵌入式操作系统)上对接安全 MQTT Broker 的开发指南。所有客户端实现均必须包含持久化会话遗嘱消息 (LWT)、以及防雪崩的指数退避重连机制。


1. JavaScript (MQTT.js) 接入规范

在以 Node.js 为基础的边缘网关或客户端应用上,使用官方最流行的 mqtt 库实现接入。

1.1 核心依赖安装

bash
npm install mqtt --save

1.2 高可用客户端示例代码

新建一个 device.js 文件,实现标准的事件驱动和断线自愈:

javascript
const mqtt = require('mqtt');

const DEVICE_ID = 'dev_f49b10a2';
const BROKER_URL = 'wss://mqtt.example.com:9001'; // 生产环境使用加密 WebSocket 接口

// 1. 建立设备 LWT 遗嘱载荷
const lwtPayload = JSON.stringify({
  deviceId: DEVICE_ID,
  timestamp: Date.now(),
  status: 'offline',
  reason: 'LWT_TRIGGERED'
});

// 2. 连接参数调优
const options = {
  clientId: DEVICE_ID, // ClientID 必须与 ACL 中的 %c 占位符保持一致
  username: 'device_client_01',
  password: 'secret_token_abc',
  clean: false,        // 必须设为 false,开启持久化会话!设备掉线时 Broker 缓存 QoS 1 消息
  keepalive: 60,       // 心跳时间
  reconnectPeriod: 1000, // 初始重连延时 (1秒)
  connectTimeout: 30 * 1000,
  
  // 遗嘱消息配置
  will: {
    topic: `devices/${DEVICE_ID}/status`,
    payload: Buffer.from(lwtPayload),
    qos: 1,
    retain: true
  }
};

const client = mqtt.connect(BROKER_URL, options);

// 用于重连延迟自适应累加(指数退避)
let retryCount = 0;
const MAX_RECONNECT_DELAY = 60 * 1000; // 最大重连延迟为 60 秒

client.on('connect', (connack) => {
  console.log(`[MQTT] 成功连接至 Broker. SessionPresent: ${connack.sessionPresent}`);
  retryCount = 0; // 重置指数退避计数
  options.reconnectPeriod = 1000; // 恢复初始延迟

  // 主动发布上线状态以覆盖 LWT 保留消息
  const onlinePayload = JSON.stringify({
    deviceId: DEVICE_ID,
    timestamp: Date.now(),
    status: 'online',
    reason: 'DEVICE_CONNECTED'
  });
  client.publish(`devices/${DEVICE_ID}/status`, onlinePayload, { qos: 1, retain: true });

  // 订阅云端下发的命令主题 (QoS 1)
  client.subscribe(`devices/${DEVICE_ID}/commands`, { qos: 1 }, (err) => {
    if (err) console.error('[MQTT] 订阅指令主题失败:', err);
  });
});

// 3. 处理云端下发命令并异步 ACK 应答
client.on('message', async (topic, message) => {
  if (topic === `devices/${DEVICE_ID}/commands`) {
    try {
      const command = JSON.parse(message.toString());
      console.log(`[MQTT] 收到云端控制指令:`, command);
      
      // 模拟硬件操作执行 (如改变采样率等)
      const executionSuccess = await executeHardwareCommand(command);
      
      if (executionSuccess) {
        // 反馈 ACK 状态给业务网关
        const ackPayload = JSON.stringify({
          deviceId: DEVICE_ID,
          commandId: command.commandId,
          status: 'SUCCESS',
          timestamp: Date.now()
        });
        client.publish(`devices/${DEVICE_ID}/commands/ack`, ackPayload, { qos: 1 });
      }
    } catch (e) {
      console.error('[MQTT] 解析指令异常:', e);
    }
  }
});

// 4. 指数退避重连算法实施
client.on('offline', () => {
  console.warn('[MQTT] 连接已断开,正在准备自适应重连...');
  if (options.reconnectPeriod < MAX_RECONNECT_DELAY) {
    retryCount++;
    // 指数计算延迟:delay = initial * (2 ^ retry)
    options.reconnectPeriod = Math.min(1000 * Math.pow(2, retryCount), MAX_RECONNECT_DELAY);
    console.log(`[MQTT] 自适应退避重连延迟已调整为: ${options.reconnectPeriod / 1000} 秒`);
  }
});

client.on('error', (err) => {
  console.error('[MQTT] 异常错误:', err);
});

async function executeHardwareCommand(cmd) {
  // 模拟设备控制逻辑
  return new Promise((resolve) => setTimeout(() => resolve(true), 500));
}

2. Python (paho-mqtt) 接入规范

在搭载嵌入式 Linux(如树莓派)上,通常推荐使用官方的 paho-mqtt 包。

2.1 依赖安装

bash
pip install paho-mqtt

2.2 健壮接入示例代码

python
import time
import json
import math
import ssl
import paho.mqtt.client as mqtt

DEVICE_ID = "dev_f49b10a2"
BROKER_HOST = "mqtt.example.com"
BROKER_PORT = 8883 # 生产环境 TLS 加密接口

reconnect_attempts = 0

def on_connect(client, userdata, flags, rc):
    global reconnect_attempts
    if rc == 0:
        print(f"[MQTT] 连接成功。清除指数计数。Flags: {flags}")
        reconnect_attempts = 0
        
        # 订阅云端命令
        client.subscribe(f"devices/{DEVICE_ID}/commands", qos=1)
        
        # 发布上线消息
        online_msg = {
            "deviceId": DEVICE_ID,
            "timestamp": int(time.time() * 1000),
            "status": "online",
            "reason": "DEVICE_CONNECTED"
        }
        client.publish(f"devices/{DEVICE_ID}/status", json.dumps(online_msg), qos=1, retain=True)
    else:
        print(f"[MQTT] 连接失败,错误代码: {rc}")

def on_message(client, userdata, msg):
    if msg.topic == f"devices/{DEVICE_ID}/commands":
        print(f"[MQTT] 收到云端指令: {msg.payload.decode('utf-8')}")
        # 执行完毕后发送 ACK 的逻辑同 JS 示例

def on_disconnect(client, userdata, rc):
    global reconnect_attempts
    print(f"[MQTT] 连接断开,返回代码: {rc}。启动退避延迟算法...")
    reconnect_attempts += 1
    
    # 指数退避计算:delay = min(1.5 ^ attempts, 60)
    delay = min(math.pow(1.5, reconnect_attempts), 60.0)
    print(f"[MQTT] 等待 {delay:.1f} 秒后尝试自动恢复重连...")
    time.sleep(delay)

client = mqtt.Client(client_id=DEVICE_ID, clean_session=False) # 必须设 clean_session=False
client.username_pw_set("device_client_01", "secret_token_abc")

# 注册回调
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect

# 注册遗嘱消息 (Will)
lwt_msg = {
    "deviceId": DEVICE_ID,
    "timestamp": int(time.time() * 1000),
    "status": "offline",
    "reason": "LWT_TRIGGERED"
}
client.will_set(f"devices/{DEVICE_ID}/status", json.dumps(lwt_msg), qos=1, retain=True)

# 开启安全 SSL 通信
context = ssl.create_default_context()
client.tls_set_context(context)

# 建立非阻塞后台事件循环
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
client.loop_forever()