Overview

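The MQTT raw data subscription service uses the MQTT protocol to push raw sensor data from the UbiBot (轻松连) platform to clients in real time. A client can subscribe to channels in its own account, or in accounts that have shared devices with it, and receives data as the devices report it. To keep a subscription active, the client must send a heartbeat request periodically.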

Note: This service is available only to users whose UbiBot Plus membership is Bronze tier or above. Free accounts cannot use it.

MQTT Connection

  • Host: mqtt-api.ubibot.cn
  • Port: 1883, or 8883 for SSL-encrypted connections
  • WebSocket Port: 8083, or 8084 for SSL-encrypted connections
  • WebSocket Path: /mqtt
  • Username: in the format user_id=USER_ID
  • Password: in the format account_key=ACCOUNT_KEY

Replace USER_ID and ACCOUNT_KEY with the credentials of your own account. To find them, log in to the UbiBot console.
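The connection parameters above can be gathered in one place before they are handed to an MQTT client. The following is a minimal sketch, not an official helper: the names `ConnectionSettings` and `connection_settings` are hypothetical, and the `transport` values simply follow the names paho-mqtt uses for its own `transport` argument.

```python
from dataclasses import dataclass
from typing import Optional

MQTT_HOST = "mqtt-api.ubibot.cn"

# (websocket, tls) -> port, taken from the list above
PORTS = {
    (False, False): 1883,
    (False, True): 8883,
    (True, False): 8083,
    (True, True): 8084,
}


@dataclass(frozen=True)
class ConnectionSettings:
    """Hypothetical container for the documented connection parameters."""
    host: str
    port: int
    username: str
    password: str
    transport: str                 # "tcp" or "websockets"
    websocket_path: Optional[str]  # "/mqtt" for WebSocket connections, else None


def connection_settings(user_id, account_key, tls=False, websocket=False):
    """Build the settings for one of the four documented port choices."""
    return ConnectionSettings(
        host=MQTT_HOST,
        port=PORTS[(websocket, tls)],
        username=f"user_id={user_id}",
        password=f"account_key={account_key}",
        transport="websockets" if websocket else "tcp",
        websocket_path="/mqtt" if websocket else None,
    )
```

With paho-mqtt, for example, the `transport` value would go to the `Client` constructor, the path to `ws_set_options(path=...)`, and an encrypted port would additionally need `tls_set()` before connecting.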

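MQTT Subscription Heartbeat

To keep receiving real-time subscription data, the subscribing client must also send a heartbeat request at regular intervals. Send it as an HTTP GET request to: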

https://webapi.ubibot.cn/mqtt-user-feeds/subcribe-ping

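The client must send a heartbeat request at least once every 300 seconds. If a heartbeat does not arrive in time, the server stops pushing data for that account. An interval of 240 seconds is recommended. Avoid sending heartbeats too frequently (for example, once every 30 seconds).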
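One way to honor this timing rule is a loop that sends a heartbeat immediately and then waits a fixed interval between requests. The sketch below assumes a caller-supplied `send` function that performs the HTTP GET. The names `run_heartbeat`, `send`, and `stop` are illustrative and not part of the UbiBot API.

```python
import threading

HEARTBEAT_INTERVAL = 240  # seconds; the recommended value, below the 300-second limit


def run_heartbeat(send, interval=HEARTBEAT_INTERVAL, stop=None):
    """Call send() right away, then once per interval until stop is set.

    send: caller-supplied function that performs the heartbeat request
    stop: optional threading.Event used to end the loop cleanly
    """
    stop = stop if stop is not None else threading.Event()
    while not stop.is_set():
        try:
            send()
        except Exception as exc:
            # A failed heartbeat is logged and retried on the next cycle
            print(f"[HEARTBEAT] Failed: {exc}")
        # Event.wait returns early if stop is set during the pause
        stop.wait(interval)
```

Running this in a daemon thread, for instance `threading.Thread(target=run_heartbeat, args=(send,), daemon=True).start()`, keeps the heartbeat from blocking the MQTT loop or holding the process open on exit.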

Heartbeat Request Parameters

  • account_key (string, required)
  • user_id (optional): comma-separated list of the user IDs of the owners whose shared devices you want to subscribe to
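The two parameters above map directly onto a query string. As a small sketch, assuming the owner IDs are held in a Python sequence, the heartbeat URL could be assembled like this (`heartbeat_url` and `owner_user_ids` are hypothetical names):

```python
from urllib.parse import urlencode

HEARTBEAT_URL = "https://webapi.ubibot.cn/mqtt-user-feeds/subcribe-ping"


def heartbeat_url(account_key, owner_user_ids=()):
    """Return the heartbeat URL with a percent-encoded query string."""
    params = {"account_key": account_key}
    if owner_user_ids:
        # user_id is optional: a comma-separated list of device owners' user IDs
        params["user_id"] = ",".join(str(uid) for uid in owner_user_ids)
    return f"{HEARTBEAT_URL}?{urlencode(params)}"
```

`urlencode` percent-encodes the comma, so `heartbeat_url("abc", ["101", "202"])` ends in `?account_key=abc&user_id=101%2C202`. The server sees the comma-separated list again once it decodes the query string.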

MQTT Topics

  • Subscribe to all raw data feeds of your own account:
    /user/USER_ID/channel_feeds/#
  • Subscribe to the raw data feed of a specific device under a specific account:
    /user/USER_ID/channel_feeds/CHANNEL_ID

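Replace USER_ID and CHANNEL_ID accordingly. To subscribe to a device under another account, use the user_id of that device's owner.
Note: The other user must enable data sharing (sharing permission) for your account in the UbiBot console. Otherwise you will not receive data from their devices.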
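The topic patterns above are easy to build and take apart in code. The helpers below are an illustrative sketch (`feed_topic` and `parse_feed_topic` are hypothetical names). They assume that messages delivered through the wildcard subscription arrive on topics of the form /user/USER_ID/channel_feeds/CHANNEL_ID, as the second pattern suggests.

```python
def feed_topic(user_id, channel_id=None):
    """Build a subscription topic: every device of the account, or one device."""
    suffix = "#" if channel_id is None else str(channel_id)
    return f"/user/{user_id}/channel_feeds/{suffix}"


def parse_feed_topic(topic):
    """Return (user_id, channel_id) for a feed topic, or None if it does not match."""
    parts = topic.split("/")
    # A matching topic splits into ["", "user", USER_ID, "channel_feeds", CHANNEL_ID]
    if (
        len(parts) != 5
        or parts[0] != ""
        or parts[1] != "user"
        or parts[3] != "channel_feeds"
    ):
        return None
    return parts[2], parts[4]
```

Inside a message callback, `parse_feed_topic(msg.topic)` would then tell which account and device a payload belongs to.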

 

Python – MQTT Raw Data Subscription Example

# -*- coding: utf-8 -*-
# UbiBot MQTT Feed Subscription with Heartbeat (Python)

import threading

import paho.mqtt.client as mqtt
import requests

# Replace with your actual credentials
USER_ID = "your_user_id"
ACCOUNT_KEY = "your_account_key"
OTHER_USER_IDS = ""  # Optional, e.g., "user1,user2"

# MQTT connection settings
MQTT_HOST = "mqtt-api.ubibot.cn"
MQTT_PORT = 1883
MQTT_USERNAME = f"user_id={USER_ID}"
MQTT_PASSWORD = f"account_key={ACCOUNT_KEY}"
MQTT_TOPIC = f"/user/{USER_ID}/channel_feeds/#"

# Heartbeat settings
HEARTBEAT_URL = "https://webapi.ubibot.cn/mqtt-user-feeds/subcribe-ping"
HEARTBEAT_INTERVAL = 240  # seconds

# Heartbeat function
def send_heartbeat():
    params = {
        "account_key": ACCOUNT_KEY
    }
    if OTHER_USER_IDS:
        params["user_id"] = OTHER_USER_IDS

    try:
        response = requests.get(HEARTBEAT_URL, params=params, timeout=5)
        print(f"[HEARTBEAT] Sent. Status: {response.status_code}, Response: {response.text}")
    except Exception as e:
        print(f"[HEARTBEAT] Failed: {e}")

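    # Schedule next heartbeat; a daemon timer does not keep the process
    # alive (and pinging) after the MQTT loop has exited
    timer = threading.Timer(HEARTBEAT_INTERVAL, send_heartbeat)
    timer.daemon = True
    timer.start()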

# MQTT Callbacks
def on_message(client, userdata, msg):
    print(f"[RECV] Topic: {msg.topic}")
    print(f"[RECV] Payload: {msg.payload.decode()}")

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("[INFO] Connected successfully.")
        client.subscribe(MQTT_TOPIC)
        print(f"[INFO] Subscribed to: {MQTT_TOPIC}")
    else:
        print(f"[ERROR] Connection failed with code {rc}")

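# Start MQTT client
# paho-mqtt 2.x requires an explicit callback API version; VERSION1 keeps the
# callback signatures used above. paho-mqtt 1.x has no such argument.
if hasattr(mqtt, "CallbackAPIVersion"):
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
else:
    client = mqtt.Client()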
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
client.on_connect = on_connect
client.on_message = on_message

print("[INFO] Connecting to MQTT broker...")
client.connect(MQTT_HOST, MQTT_PORT, 60)

# Start heartbeat thread
send_heartbeat()

# Start MQTT loop
client.loop_forever()

Node.js – MQTT Raw Data Subscription Example

// Node.js – MQTT Feed Subscription Example with Heartbeat

const mqtt = require('mqtt');
const https = require('https');
const querystring = require('querystring');

// Replace with your actual credentials
const USER_ID = 'your_user_id';
const ACCOUNT_KEY = 'your_account_key';
const OTHER_USER_IDS = ''; // Optional, e.g., 'user1,user2'

const options = {
  username: `user_id=${USER_ID}`,
  password: `account_key=${ACCOUNT_KEY}`
};

const topic = `/user/${USER_ID}/channel_feeds/#`;

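const client = mqtt.connect('mqtt://mqtt-api.ubibot.cn:1883', options);

// Heartbeat timer handle; kept so the timer is created only once
let heartbeatTimer = null;

client.on('connect', () => {
  console.log('[INFO] Connected to MQTT broker.');
  client.subscribe(topic, (err) => {
    if (!err) {
      console.log('[INFO] Subscribed to:', topic);
    } else {
      console.error('[ERROR] Subscribe failed:', err.message);
    }
  });

  // 'connect' fires again after every automatic reconnect, so start the
  // heartbeat only once to avoid stacking timers and pinging too often
  if (!heartbeatTimer) {
    sendHeartbeat();
    heartbeatTimer = setInterval(sendHeartbeat, 240000); // every 240 seconds
  }
});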

client.on('message', (topic, message) => {
  console.log(`[RECV] Topic: ${topic}`);
  console.log(`[RECV] Payload: ${message.toString()}`);
});

function sendHeartbeat() {
  const params = {
    account_key: ACCOUNT_KEY
  };

  if (OTHER_USER_IDS) {
    params.user_id = OTHER_USER_IDS;
  }

  const query = querystring.stringify(params);
  const url = `https://webapi.ubibot.cn/mqtt-user-feeds/subcribe-ping?${query}`;

  https.get(url, (res) => {
    let data = '';
    res.on('data', (chunk) => { data += chunk; });
    res.on('end', () => {
      console.log(`[HEARTBEAT] Status: ${res.statusCode}, Response: ${data}`);
    });
  }).on('error', (err) => {
    console.error(`[HEARTBEAT] Error: ${err.message}`);
  });
}

C# – MQTT Raw Data Subscription Example

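// C# – MQTT Feed Subscription Example with Heartbeat
// Requires MQTTnet (via NuGet) and System.Net.Http.
// Note: the handler-style calls used below (UseConnectedHandler and similar)
// match the MQTTnet 3.x API; later major versions changed this API.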

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using System;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    private static readonly string USER_ID = "your_user_id";
    private static readonly string ACCOUNT_KEY = "your_account_key";
    private static readonly string OTHER_USER_IDS = ""; // Optional: "user1,user2"
    private static readonly string TOPIC = $"/user/{USER_ID}/channel_feeds/#";
    private static readonly string HEARTBEAT_URL = "https://webapi.ubibot.cn/mqtt-user-feeds/subcribe-ping";
    private static readonly int HEARTBEAT_INTERVAL = 240; // seconds

    private static readonly HttpClient httpClient = new HttpClient();

    static async Task Main(string[] args)
    {
        var factory = new MqttFactory();
        var mqttClient = factory.CreateMqttClient();

        var options = new MqttClientOptionsBuilder()
            .WithTcpServer("mqtt-api.ubibot.cn", 1883)
            .WithCredentials($"user_id={USER_ID}", $"account_key={ACCOUNT_KEY}")
            .WithCleanSession()
            .Build();

        mqttClient.UseConnectedHandler(async e =>
        {
            Console.WriteLine("[INFO] Connected to MQTT broker.");
            await mqttClient.SubscribeAsync(TOPIC);
            Console.WriteLine($"[INFO] Subscribed to: {TOPIC}");

            // Start heartbeat loop
            _ = Task.Run(() => StartHeartbeatLoop());
        });

        mqttClient.UseApplicationMessageReceivedHandler(e =>
        {
            Console.WriteLine($"[RECV] Topic: {e.ApplicationMessage.Topic}");
            Console.WriteLine($"[RECV] Payload: {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
        });

        mqttClient.UseDisconnectedHandler(e =>
        {
            Console.WriteLine("[WARN] Disconnected from MQTT broker.");
        });

        Console.WriteLine("[INFO] Connecting...");
        await mqttClient.ConnectAsync(options);

        Console.WriteLine("[INFO] Press Enter to exit.");
        Console.ReadLine();
    }

    static async Task StartHeartbeatLoop()
    {
        while (true)
        {
            try
            {
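                var uriBuilder = new UriBuilder(HEARTBEAT_URL);
                // Percent-encode the values so special characters survive in the query string
                var query = $"account_key={Uri.EscapeDataString(ACCOUNT_KEY)}";
                if (!string.IsNullOrEmpty(OTHER_USER_IDS))
                {
                    query += $"&user_id={Uri.EscapeDataString(OTHER_USER_IDS)}";
                }
                uriBuilder.Query = query;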

                var response = await httpClient.GetAsync(uriBuilder.Uri);
                var result = await response.Content.ReadAsStringAsync();
                Console.WriteLine($"[HEARTBEAT] Status: {response.StatusCode}, Response: {result}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"[HEARTBEAT] Error: {ex.Message}");
            }

            await Task.Delay(HEARTBEAT_INTERVAL * 1000);
        }
    }
}