MQTT的发布功能是客户端向Broker发送消息到指定主题的核心操作,结合paho-mqtt C++库,实现发布功能需遵循连接Broker→构造消息→发布消息→处理发布结果的流程。以下是具体步骤、代码示例及关键细节说明。
一、实现发布功能的核心步骤
- 初始化客户端并连接Broker:先建立与MQTT Broker的连接(基础前提)。
- 构造MQTT消息:指定消息的主题、负载(内容)、QoS等级、保留标志等属性。
- 调用发布接口:通过客户端实例发送消息,支持同步/异步发布。
- 处理发布结果:通过回调或返回值确认消息是否发布成功(尤其QoS>0时)。
- 断开连接(可选):发布完成后按需断开与Broker的连接。
二、基础发布功能实现(同步发布)
步骤1:配置基础信息
定义Broker地址、客户端ID、发布主题等常量:
1 2 3 4 5 6 7
| #include <iostream> #include <mqtt/client.h>
const std::string BROKER_ADDRESS = "tcp://test.mosquitto.org:1883"; const std::string CLIENT_ID = "mqtt_publisher_demo"; const std::string PUBLISH_TOPIC = "test/cpp/publish";
|
步骤2:创建客户端并连接Broker
初始化MQTT客户端实例,配置连接选项并建立连接:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| mqtt::client client(BROKER_ADDRESS, CLIENT_ID);
mqtt::connect_options conn_opts; conn_opts.set_clean_session(true); conn_opts.set_keep_alive_interval(20);
try { std::cout << "连接Broker: " << BROKER_ADDRESS << std::endl; client.connect(conn_opts); std::cout << "连接成功!" << std::endl; } catch (const mqtt::exception& e) { std::cerr << "连接失败: " << e.what() << std::endl; return 1; }
|
步骤3:构造并发布消息
通过mqtt::message类构造消息,调用client.publish()发布:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| std::string payload = "Hello, MQTT from C++ Publisher!"; int qos = 1; bool retained = false;
mqtt::message msg(PUBLISH_TOPIC, payload, qos, retained);
try { std::cout << "发布消息到主题: " << PUBLISH_TOPIC << std::endl; client.publish(msg); std::cout << "消息发布成功!内容: " << payload << std::endl; } catch (const mqtt::exception& e) { std::cerr << "发布失败: " << e.what() << std::endl; client.disconnect(); return 1; }
|
步骤4:断开连接(可选)
发布完成后断开与Broker的连接:
1 2 3
| client.disconnect(); std::cout << "已断开与Broker的连接" << std::endl;
|
完整代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| #include <iostream> #include <mqtt/client.h>
const std::string BROKER_ADDRESS = "tcp://test.mosquitto.org:1883"; const std::string CLIENT_ID = "mqtt_publisher_demo"; const std::string PUBLISH_TOPIC = "test/cpp/publish";
int main() { mqtt::client client(BROKER_ADDRESS, CLIENT_ID);
mqtt::connect_options conn_opts; conn_opts.set_clean_session(true); conn_opts.set_keep_alive_interval(20);
try { client.connect(conn_opts); std::cout << "连接Broker成功!" << std::endl; } catch (const mqtt::exception& e) { std::cerr << "连接失败: " << e.what() << std::endl; return 1; }
std::string payload = "Hello, MQTT from C++ Publisher!"; mqtt::message msg(PUBLISH_TOPIC, payload, 1, false);
try { client.publish(msg); std::cout << "消息发布成功!" << std::endl; std::cout << "主题: " << PUBLISH_TOPIC << std::endl; std::cout << "内容: " << payload << std::endl; } catch (const mqtt::exception& e) { std::cerr << "发布失败: " << e.what() << std::endl; client.disconnect(); return 1; }
client.disconnect(); std::cout << "断开连接成功!" << std::endl;
return 0; }
|
编译运行
1 2 3 4 5
| g++ -std=c++11 mqtt_publisher.cpp -o mqtt_publisher -lpaho-mqttpp3 -lpaho-mqtt3as
./mqtt_publisher
|
三、进阶发布功能实现
1. 异步发布与发布确认(QoS>0)
对于QoS=1或2的消息,可通过异步发布+回调确认消息是否送达Broker:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| #include <iostream> #include <mqtt/client.h> #include <mqtt/callback.h>
class PublishCallback : public virtual mqtt::callback { public: void delivery_complete(mqtt::delivery_token_ptr tok) override { if (tok) { std::cout << "\n发布完成!消息ID: " << tok->get_message_id() << std::endl; std::cout << "主题: " << tok->get_message()->get_topic() << std::endl; } } };
int main() { mqtt::client client(BROKER_ADDRESS, CLIENT_ID); PublishCallback cb; client.set_callback(cb);
mqtt::connect_options conn_opts; conn_opts.set_clean_session(true); client.connect(conn_opts);
std::string payload = "异步发布的MQTT消息"; mqtt::message msg(PUBLISH_TOPIC, payload, 1, false); mqtt::delivery_token_ptr tok = client.publish(msg);
tok->wait_for_completion(); std::cout << "异步发布完成!" << std::endl;
client.disconnect(); return 0; }
|
2. 批量发布多条消息
循环发布多条消息到同一或不同主题:
1 2 3 4 5 6 7 8
| for (int i = 0; i < 5; ++i) { std::string payload = "批量消息 " + std::to_string(i + 1); mqtt::message msg(PUBLISH_TOPIC, payload, 0); client.publish(msg); std::cout << "已发布: " << payload << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); }
|
3. 发布保留消息
保留消息会被Broker存储,后续订阅该主题的客户端会立即收到最后一条保留消息:
1 2 3 4
| mqtt::message msg(PUBLISH_TOPIC, "保留消息内容", 1, true); client.publish(msg); std::cout << "保留消息发布成功!" << std::endl;
|
4. 带认证的发布(用户名/密码)
若Broker需身份认证,需在连接选项中配置用户名和密码:
1 2 3 4 5 6 7
| conn_opts.set_user_name("your_username"); conn_opts.set_password("your_password");
client.connect(conn_opts); client.publish(PUBLISH_TOPIC, "带认证的消息", 1);
|
四、关键参数说明
| 参数 |
作用 |
可选值/示例 |
| QoS等级 |
消息交付保证机制 |
0(最多一次)、1(至少一次)、2(恰好一次) |
| 保留标志 |
Broker是否存储最后一条消息,供新订阅者接收 |
true(保留)、false(不保留) |
| 发布令牌(delivery_token) |
跟踪异步发布的状态,用于确认发布完成 |
tok->wait_for_completion() 等待完成 |
| 主题(Topic) |
消息的分类标识,支持层级(如sensor/temp) |
test/cpp/publish、sensor/#(通配符) |
五、常见问题排查
1. 消息发布成功但订阅端未收到?
- 检查发布主题与订阅主题是否完全匹配(区分大小写)。
- 确认QoS配置:订阅端QoS需≥发布端QoS才能接收对应消息。
- 若使用QoS=1/2,确保客户端发布后未立即断开连接(需等待Broker确认)。
2. 发布时抛出异常?
- 检查Broker连接是否正常(未连接时发布会抛出异常)。
- 确认主题格式合法(不能包含空格、通配符仅用于订阅)。
- 若Broker启用认证,需配置正确的用户名/密码。
3. 异步发布的回调未触发?
- 确保设置了回调类(
client.set_callback(cb))。
- 确认消息的QoS>0(QoS=0无发布确认,不会触发回调)。
六、总结
实现MQTT发布功能的核心流程为:连接Broker→构造消息→调用publish接口→处理结果。根据业务需求,可选择同步/异步发布、设置不同QoS等级、发布保留消息等。关键需注意Broker连接状态、主题匹配规则及QoS机制,确保消息可靠送达。