MQTT
介绍
MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布/订阅模型、可靠的消息传递协议,广泛应用于物联网领域,如智能家居、工业自动化、车联网等
Broker
MQTT代理一般运行在服务器端,用于连接管理(连接多个发布者、订阅者)及话题消息过滤、分发
Client
MQTT客户端有两种:发布者和接收者,发布者向MQTT代理发布消息,订阅者则订阅它们想要接收的消息
Mosquitto
Eclipse mosquitto是一个开源(EPL/EDL许可)消息代理,它实现了MQTT协议5.0、3.1.1和3.1版本。mosquitto是轻量级的,适用于从低功耗单板计算机到全服务器的所有设备
安装
Ubuntu22.04.3下安装
sudo apt install mosquitto
sudo apt install mosquitto-clients
sudo apt install mosquitto-dev
状态使能
通过systemctl命令查看mosquitto mqtt服务端状态
systemctl status mosquitto
systemctl start mosquitto
systemctl stop mosquitto
命令
mosquitto_pub
- -h:指定服务器ip
- -p:指定服务器端口号,一般默认是1883
- -t:指定话题名
- -m:指定payload内容
- -u:指定用户名
- -P:指定密码
mosquitto_sub
- -h:指定服务器ip
- -p:指定服务器端口号,一般默认是1883
- -t:指定话题名
- -u:指定用户名
- -P:指定密码
测试
服务端
发布者(测试环境为树莓派4B、系统为Ubuntu22.04.3 LTS),下面的例程用于发布树莓派cpu温度和当前工作电压
#!/bin/bash
while true
do
volt=`sudo vcgencmd measure_volts core | awk -F= '{print $2}' | awk -FV '{print $1}'`
echo "volt : $volt"
temp=`sudo vcgencmd measure_temp | awk -F= '{print $2}' | awk -F\' '{print $1}'`
echo "temp : $temp"
mosquitto_pub -t /rasp -m "{\"temp\":$temp,\"volt\":$volt}" -h 192.168.56.131 -p 1883
sleep 1
done
客户端
订阅者(测试环境为电脑虚拟机,系统为Ubuntu22.04.3 LTS)
mosquitto_sub -t /rasp -h 192.168.56.131 -p 1883
嵌入式网络库,如mongoose也能实现mqtt通信功能,以下是客户端代码示范(测试环境为Windows10):
#include "mongoose.h"
static const char *s_url = "mqtt://192.168.56.131:1883"; // MQTT URL
static const char *s_sub_topic = "/rasp"; // Publish topic
static const char *s_pub_topic = "mg/clnt/test"; // Subscribe topic
static int s_qos = 1; // MQTT QoS
static struct mg_connection *s_conn; // Client connection
// Handle interrupts, like Ctrl-C
static int s_signo;
static void signal_handler(int signo) {
s_signo = signo;
}
static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (ev == MG_EV_OPEN) {
MG_INFO(("%lu CREATED", c->id));
// c->is_hexdumping = 1;
} else if (ev == MG_EV_ERROR) {
// On error, log error message
MG_ERROR(("%lu ERROR %s", c->id, (char *) ev_data));
} else if (ev == MG_EV_MQTT_OPEN) {
// MQTT connect is successful
struct mg_str subt = mg_str(s_sub_topic);
struct mg_str pubt = mg_str(s_pub_topic), data = mg_str("hello");
MG_INFO(("%lu CONNECTED to %s", c->id, s_url));
struct mg_mqtt_opts sub_opts;
memset(&sub_opts, 0, sizeof(sub_opts));
sub_opts.topic = subt;
sub_opts.qos = s_qos;
mg_mqtt_sub(c, &sub_opts);
MG_INFO(("%lu SUBSCRIBED to %.*s", c->id, (int) subt.len, subt.ptr));
struct mg_mqtt_opts pub_opts;
memset(&pub_opts, 0, sizeof(pub_opts));
pub_opts.topic = pubt;
pub_opts.message = data;
pub_opts.qos = s_qos, pub_opts.retain = false;
mg_mqtt_pub(c, &pub_opts);
MG_INFO(("%lu PUBLISHED %.*s -> %.*s", c->id, (int) data.len, data.ptr,
(int) pubt.len, pubt.ptr));
} else if (ev == MG_EV_MQTT_MSG) {
// When we get echo response, print it
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;
MG_INFO(("%lu RECEIVED %.*s <- %.*s", c->id, (int) mm->data.len,
mm->data.ptr, (int) mm->topic.len, mm->topic.ptr));
} else if (ev == MG_EV_CLOSE) {
MG_INFO(("%lu CLOSED", c->id));
s_conn = NULL; // Mark that we're closed
}
(void) fn_data;
}
// Timer function - recreate client connection if it is closed
static void timer_fn(void *arg) {
struct mg_mgr *mgr = (struct mg_mgr *) arg;
struct mg_mqtt_opts opts = {.clean = true,
.qos = s_qos,
.topic = mg_str(s_pub_topic),
.version = 4,
.message = mg_str("bye")};
if (s_conn == NULL) s_conn = mg_mqtt_connect(mgr, s_url, &opts, fn, NULL);
}
int main(int argc, char *argv[]) {
struct mg_mgr mgr;
int i;
// Parse command-line flags
for (i = 1; i < argc; i++) {
if (strcmp(argv[i], "-u") == 0 && argv[i + 1] != NULL) {
s_url = argv[++i];
} else if (strcmp(argv[i], "-p") == 0 && argv[i + 1] != NULL) {
s_pub_topic = argv[++i];
} else if (strcmp(argv[i], "-s") == 0 && argv[i + 1] != NULL) {
s_sub_topic = argv[++i];
} else if (strcmp(argv[i], "-v") == 0 && argv[i + 1] != NULL) {
mg_log_set(atoi(argv[++i]));
} else {
MG_ERROR(("Unknown option: %s. Usage:", argv[i]));
MG_ERROR(
("%s [-u mqtts://SERVER:PORT] [-p PUB_TOPIC] [-s SUB_TOPIC] "
"[-v DEBUG_LEVEL]",
argv[0], argv[i]));
return 1;
}
}
signal(SIGINT, signal_handler); // Setup signal handlers - exist event
signal(SIGTERM, signal_handler); // manager loop on SIGINT and SIGTERM
mg_mgr_init(&mgr);
struct mg_tls_opts opts = {.client_ca = mg_str(CA_ALL)};
mg_tls_ctx_init(&mgr, &opts);
mg_timer_add(&mgr, 3000, MG_TIMER_REPEAT | MG_TIMER_RUN_NOW, timer_fn, &mgr);
while (s_signo == 0) mg_mgr_poll(&mgr, 1000); // Event loop, 1s timeout
mg_mgr_free(&mgr); // Finished, cleanup
return 0;
}
Wireshark
mqtt过滤结果
通过查看payload部分,可以大概看出消息内容
总结
阅读剩余
版权声明:
作者:hywing
链接:https://iotstuff.cn/mqtt/
文章版权归作者所有,未经允许请勿转载。
THE END