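Overview
The MQTT raw data subscription service is built on the MQTT protocol and lets clients receive real-time pushes of raw sensor data from the UbiBot (轻松连) platform. A client can subscribe to the channels of its own account, or of accounts that have shared devices with it, and receive data as the devices report it. To keep a subscription active, the client must send heartbeat requests periodically.
Note: This service is available only to users whose UbiBot Plus membership is Bronze level or above. Free accounts cannot use this feature.
MQTT Connection Details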
- Host: mqtt-api.ubibot.cn
- Port: 1883, or 8883 for SSL-encrypted connections
- WebSocket Port: 8083, or 8084 for SSL-encrypted connections
- WebSocket Path: /mqtt
- Username: user_id=USER_ID
- Password: account_key=ACCOUNT_KEY
Replace USER_ID and ACCOUNT_KEY with the credentials of your own account. To find them, log in to the UbiBot (轻松连) console.
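The connection options listed above can be gathered in one place so that switching between plain TCP, SSL, and WebSocket is a single argument change. The following is a minimal sketch rather than an official client: the helper name connection_settings and the layout of the returned dictionary are assumptions made for illustration. Only the host, ports, WebSocket path, and credential formats come from the list above.

```python
MQTT_HOST = "mqtt-api.ubibot.cn"

# Ports taken from the connection list: (transport, tls) -> port
PORTS = {
    ("tcp", False): 1883,
    ("tcp", True): 8883,
    ("websockets", False): 8083,
    ("websockets", True): 8084,
}


def connection_settings(user_id, account_key, transport="tcp", tls=False):
    """Return connection parameters for the chosen transport (hypothetical helper)."""
    if (transport, tls) not in PORTS:
        raise ValueError(f"unsupported transport: {transport!r}")
    settings = {
        "host": MQTT_HOST,
        "port": PORTS[(transport, tls)],
        "username": f"user_id={user_id}",
        "password": f"account_key={account_key}",
    }
    if transport == "websockets":
        # The WebSocket endpoints are served under this path
        settings["path"] = "/mqtt"
    return settings


if __name__ == "__main__":
    print(connection_settings("your_user_id", "your_account_key", tls=True))
```

With paho-mqtt, these values would typically be passed to mqtt.Client(transport=...), ws_set_options(path=...), tls_set(), username_pw_set(), and connect().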
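Subscription Heartbeat
To keep receiving real-time subscription data, the subscribing client must also send a heartbeat request at regular intervals. Send it as an HTTP GET request to:
https://webapi.ubibot.cn/mqtt-user-feeds/subcribe-ping
The client must send a heartbeat request at least once every 300 seconds. If it does not, the server stops pushing data for that account. An interval of 240 seconds is recommended. Avoid sending heartbeats too frequently (for example, once every 30 seconds).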
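The timing rule above can be enforced in code so that a misconfigured interval fails early. The following is a minimal sketch, assuming a hypothetical HeartbeatTimer class that is not part of any UbiBot library. It takes the actual request as a send callable, rejects intervals of 30 seconds or less (the example of "too frequent" given above) and intervals above the 300-second limit, and can be stopped cleanly.

```python
import threading


class HeartbeatTimer:
    """Call `send` immediately, then every `interval` seconds until stopped."""

    def __init__(self, send, interval=240):
        # Bounds derived from the text: 30 s is named as too frequent,
        # and the server stops pushing after 300 s without a heartbeat.
        if not 30 < interval <= 300:
            raise ValueError("interval must be above 30 s and at most 300 s")
        self._send = send
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        while True:
            try:
                self._send()
            except Exception as exc:
                # A failed heartbeat should not end the loop; retry next cycle
                print(f"[HEARTBEAT] Failed: {exc}")
            # wait() returns True as soon as stop() is called
            if self._stop.wait(self._interval):
                break

    def start(self):
        self._thread.start()

    def stop(self):
        """Stop the loop; call only after start()."""
        self._stop.set()
        self._thread.join()
```

A client would create the timer with a function that performs the HTTP GET request, call start() after connecting, and call stop() on shutdown.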
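Heartbeat Request Parameters
- account_key (string, required)
- user_id (optional): a comma-separated list of the user IDs of the owners whose shared devices you want to subscribe to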
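To make the two parameters concrete, the heartbeat URL can be assembled with the Python standard library. This is a minimal sketch: the function name build_heartbeat_url and its owner_user_ids argument are hypothetical names chosen for illustration, while the endpoint and parameter names are the ones documented above.

```python
from urllib.parse import urlencode

HEARTBEAT_URL = "https://webapi.ubibot.cn/mqtt-user-feeds/subcribe-ping"


def build_heartbeat_url(account_key, owner_user_ids=()):
    """Build the heartbeat GET URL (hypothetical helper).

    owner_user_ids: user IDs of the owners whose shared devices
    should be included in the subscription; may be empty.
    """
    params = {"account_key": account_key}
    if owner_user_ids:
        # The API expects a single comma-separated value
        params["user_id"] = ",".join(str(uid) for uid in owner_user_ids)
    return f"{HEARTBEAT_URL}?{urlencode(params)}"


if __name__ == "__main__":
    print(build_heartbeat_url("your_account_key", ["user1", "user2"]))
```

Note that urlencode percent-encodes the comma as %2C, which is standard query-string encoding.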
MQTT Topics
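Replace USER_ID and CHANNEL_ID in the topic with your own values. To subscribe to devices under another account, use the user_id of that device's owner instead. The examples in this document subscribe to /user/USER_ID/channel_feeds/#, where # is the MQTT multi-level wildcard.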
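Note: The other user must enable data sharing (share permission) for your account in the UbiBot (轻松连) console. Otherwise you will not receive data from their devices.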
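When a client follows both its own devices and devices shared by other owners, it needs one topic filter per owner. The sketch below assumes a hypothetical helper named feed_topic_filters. The topic pattern is the one used by the examples in this document, and applying it to a shared owner's user_id follows the rule stated above.

```python
def feed_topic_filters(own_user_id, shared_owner_ids=()):
    """Return one wildcard topic filter per account (hypothetical helper).

    own_user_id: the subscriber's own user ID.
    shared_owner_ids: user IDs of owners who have shared devices with it.
    """
    owners = [own_user_id, *shared_owner_ids]
    # '#' is the MQTT multi-level wildcard: it matches every topic below the prefix
    return [f"/user/{uid}/channel_feeds/#" for uid in owners]


if __name__ == "__main__":
    for topic in feed_topic_filters("your_user_id", ["owner_user_id"]):
        print(topic)
```

With paho-mqtt, the resulting list could be subscribed in one call, for example client.subscribe([(t, 0) for t in filters]). The same owner IDs would also need to be passed in the user_id parameter of the heartbeat request.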
Python – MQTT Raw Data Subscription Example
# -*- coding: utf-8 -*-
# UbiBot MQTT Feed Subscription with Heartbeat (Python)
import threading

import paho.mqtt.client as mqtt
import requests

# Replace with your actual credentials
USER_ID = "your_user_id"
ACCOUNT_KEY = "your_account_key"
OTHER_USER_IDS = ""  # Optional, e.g., "user1,user2"

# MQTT connection settings
MQTT_HOST = "mqtt-api.ubibot.cn"
MQTT_PORT = 1883
MQTT_USERNAME = f"user_id={USER_ID}"
MQTT_PASSWORD = f"account_key={ACCOUNT_KEY}"
MQTT_TOPIC = f"/user/{USER_ID}/channel_feeds/#"

# Heartbeat settings
HEARTBEAT_URL = "https://webapi.ubibot.cn/mqtt-user-feeds/subcribe-ping"
HEARTBEAT_INTERVAL = 240  # seconds


# Heartbeat function
def send_heartbeat():
    params = {
        "account_key": ACCOUNT_KEY
    }
    if OTHER_USER_IDS:
        params["user_id"] = OTHER_USER_IDS
    try:
        response = requests.get(HEARTBEAT_URL, params=params, timeout=5)
        print(f"[HEARTBEAT] Sent. Status: {response.status_code}, Response: {response.text}")
    except requests.RequestException as e:
        print(f"[HEARTBEAT] Failed: {e}")
    # Schedule the next heartbeat, whether or not this one succeeded
    timer = threading.Timer(HEARTBEAT_INTERVAL, send_heartbeat)
    timer.daemon = True  # do not keep the process alive after the main thread exits
    timer.start()


# MQTT callbacks
def on_message(client, userdata, msg):
    print(f"[RECV] Topic: {msg.topic}")
    print(f"[RECV] Payload: {msg.payload.decode()}")


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("[INFO] Connected successfully.")
        client.subscribe(MQTT_TOPIC)
        print(f"[INFO] Subscribed to: {MQTT_TOPIC}")
    else:
        print(f"[ERROR] Connection failed with code {rc}")


# Start MQTT client
# The callbacks above use the paho-mqtt 1.x signatures. On paho-mqtt 2.x,
# create the client with mqtt.Client(mqtt.CallbackAPIVersion.VERSION1) instead.
client = mqtt.Client()
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
client.on_connect = on_connect
client.on_message = on_message

print("[INFO] Connecting to MQTT broker...")
client.connect(MQTT_HOST, MQTT_PORT, 60)

# Send the first heartbeat; each call schedules the next one
send_heartbeat()

# Start MQTT loop (blocks and handles reconnects)
client.loop_forever()
Node.js – MQTT Raw Data Subscription Example
// Node.js – MQTT Feed Subscription Example with Heartbeat
const mqtt = require('mqtt');
const https = require('https');
const querystring = require('querystring');

// Replace with your actual credentials
const USER_ID = 'your_user_id';
const ACCOUNT_KEY = 'your_account_key';
const OTHER_USER_IDS = ''; // Optional, e.g., 'user1,user2'

const HEARTBEAT_INTERVAL_MS = 240000; // 240 seconds
let heartbeatTimer = null;

const options = {
  username: `user_id=${USER_ID}`,
  password: `account_key=${ACCOUNT_KEY}`
};
const topic = `/user/${USER_ID}/channel_feeds/#`;
const client = mqtt.connect('mqtt://mqtt-api.ubibot.cn:1883', options);

client.on('connect', () => {
  console.log('[INFO] Connected to MQTT broker.');
  client.subscribe(topic, (err) => {
    if (!err) {
      console.log('[INFO] Subscribed to:', topic);
    } else {
      console.error('[ERROR] Subscribe failed:', err.message);
    }
  });
  // Start the heartbeat only once: 'connect' fires again after every
  // reconnect, and a second interval would send heartbeats too often.
  if (!heartbeatTimer) {
    sendHeartbeat();
    heartbeatTimer = setInterval(sendHeartbeat, HEARTBEAT_INTERVAL_MS);
  }
});

client.on('message', (topic, message) => {
  console.log(`[RECV] Topic: ${topic}`);
  console.log(`[RECV] Payload: ${message.toString()}`);
});

function sendHeartbeat() {
  const params = {
    account_key: ACCOUNT_KEY
  };
  if (OTHER_USER_IDS) {
    params.user_id = OTHER_USER_IDS;
  }
  const query = querystring.stringify(params);
  const url = `https://webapi.ubibot.cn/mqtt-user-feeds/subcribe-ping?${query}`;
  https.get(url, (res) => {
    let data = '';
    res.on('data', (chunk) => { data += chunk; });
    res.on('end', () => {
      console.log(`[HEARTBEAT] Status: ${res.statusCode}, Response: ${data}`);
    });
  }).on('error', (err) => {
    console.error(`[HEARTBEAT] Error: ${err.message}`);
  });
}
C# – MQTT Raw Data Subscription Example
// C# – MQTT Feed Subscription Example with Heartbeat
// Requires MQTTnet (via NuGet) and System.Net.Http
// Note: this code uses the MQTTnet 3.x API (MQTTnet.Client.Options, Use...Handler)
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using System;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    private static readonly string USER_ID = "your_user_id";
    private static readonly string ACCOUNT_KEY = "your_account_key";
    private static readonly string OTHER_USER_IDS = ""; // Optional: "user1,user2"
    private static readonly string TOPIC = $"/user/{USER_ID}/channel_feeds/#";
    private static readonly string HEARTBEAT_URL = "https://webapi.ubibot.cn/mqtt-user-feeds/subcribe-ping";
    private static readonly int HEARTBEAT_INTERVAL = 240; // seconds
    private static readonly HttpClient httpClient = new HttpClient();

    static async Task Main(string[] args)
    {
        var factory = new MqttFactory();
        var mqttClient = factory.CreateMqttClient();
        var options = new MqttClientOptionsBuilder()
            .WithTcpServer("mqtt-api.ubibot.cn", 1883)
            .WithCredentials($"user_id={USER_ID}", $"account_key={ACCOUNT_KEY}")
            .WithCleanSession()
            .Build();

        mqttClient.UseConnectedHandler(async e =>
        {
            Console.WriteLine("[INFO] Connected to MQTT broker.");
            await mqttClient.SubscribeAsync(TOPIC);
            Console.WriteLine($"[INFO] Subscribed to: {TOPIC}");
            // Start heartbeat loop in the background
            _ = Task.Run(() => StartHeartbeatLoop());
        });

        mqttClient.UseApplicationMessageReceivedHandler(e =>
        {
            Console.WriteLine($"[RECV] Topic: {e.ApplicationMessage.Topic}");
            Console.WriteLine($"[RECV] Payload: {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
        });

        mqttClient.UseDisconnectedHandler(e =>
        {
            Console.WriteLine("[WARN] Disconnected from MQTT broker.");
        });

        Console.WriteLine("[INFO] Connecting...");
        await mqttClient.ConnectAsync(options);
        Console.WriteLine("[INFO] Press Enter to exit.");
        Console.ReadLine();
    }

    static async Task StartHeartbeatLoop()
    {
        while (true)
        {
            try
            {
                var uriBuilder = new UriBuilder(HEARTBEAT_URL);
                // URL-encode the values so special characters do not break the query string
                var query = $"account_key={Uri.EscapeDataString(ACCOUNT_KEY)}";
                if (!string.IsNullOrEmpty(OTHER_USER_IDS))
                {
                    query += $"&user_id={Uri.EscapeDataString(OTHER_USER_IDS)}";
                }
                uriBuilder.Query = query;
                var response = await httpClient.GetAsync(uriBuilder.Uri);
                var result = await response.Content.ReadAsStringAsync();
                Console.WriteLine($"[HEARTBEAT] Status: {response.StatusCode}, Response: {result}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"[HEARTBEAT] Error: {ex.Message}");
            }
            await Task.Delay(HEARTBEAT_INTERVAL * 1000);
        }
    }
}