MQTT

介绍

MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布/订阅模型、可靠的消息传递协议,广泛应用于物联网领域,如智能家居、工业自动化、车联网等

Broker

MQTT代理一般运行在服务器端,用于连接管理(连接多个发布者、订阅者)及话题消息过滤、分发

Client

MQTT客户端有两种:发布者和接收者,发布者向MQTT代理发布消息,订阅者则订阅它们想要接收的消息

Mosquitto

Eclipse mosquitto是一个开源(EPL/EDL许可)消息代理,它实现了MQTT协议5.0、3.1.1和3.1版本。mosquitto是轻量级的,适用于从低功耗单板计算机到全服务器的所有设备

安装

Ubuntu22.04.3下安装

sudo apt install mosquitto
sudo apt install mosquitto-clients 
sudo apt install mosquitto-dev 

状态使能

通过systemctl命令查看mosquitto mqtt服务端状态

systemctl status mosquitto
systemctl start mosquitto
systemctl stop mosquitto

命令

mosquitto_pub

  • -h:指定服务器ip
  • -p:指定服务器端口号,一般默认是1883
  • -t:指定话题名
  • -m:指定payload内容
  • -u:指定用户名
  • -P:指定密码

mosquitto_sub

  • -h:指定服务器ip
  • -p:指定服务器端口号,一般默认是1883
  • -t:指定话题名
  • -u:指定用户名
  • -P:指定密码

测试

服务端

发布者(测试环境为树莓派4B、系统为Ubuntu22.04.3 LTS),下面的例程用于发布树莓派cpu温度和当前工作电压

#!/bin/bash

while true
do 
	volt=`sudo vcgencmd measure_volts core | awk -F= '{print $2}' | awk -FV '{print $1}'`
	echo "volt : $volt"
	temp=`sudo vcgencmd measure_temp | awk -F= '{print $2}' | awk -F\' '{print $1}'`
	echo "temp : $temp"
	mosquitto_pub -t /rasp -m "{\"temp\":$temp,\"volt\":$volt}"  -h 192.168.56.131 -p 1883
	sleep 1
done

客户端

订阅者(测试环境为电脑虚拟机,系统为Ubuntu22.04.3 LTS)

mosquitto_sub -t /rasp -h 192.168.56.131 -p 1883

嵌入式网络库,如mongoose也能实现mqtt通信功能,以下是客户端代码示范(测试环境为Windows10):

#include "mongoose.h"

static const char *s_url = "mqtt://192.168.56.131:1883";   // MQTT URL
static const char *s_sub_topic = "/rasp";               // Publish topic
static const char *s_pub_topic = "mg/clnt/test";        // Subscribe topic
static int s_qos = 1;                                   // MQTT QoS
static struct mg_connection *s_conn;                    // Client connection

// Handle interrupts, like Ctrl-C
static int s_signo;
static void signal_handler(int signo) {
    s_signo = signo;
}

static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
    if (ev == MG_EV_OPEN) {
        MG_INFO(("%lu CREATED", c->id));
        // c->is_hexdumping = 1;
    } else if (ev == MG_EV_ERROR) {
        // On error, log error message
        MG_ERROR(("%lu ERROR %s", c->id, (char *) ev_data));
    } else if (ev == MG_EV_MQTT_OPEN) {
        // MQTT connect is successful
        struct mg_str subt = mg_str(s_sub_topic);
        struct mg_str pubt = mg_str(s_pub_topic), data = mg_str("hello");
        MG_INFO(("%lu CONNECTED to %s", c->id, s_url));
        struct mg_mqtt_opts sub_opts;
        memset(&sub_opts, 0, sizeof(sub_opts));
        sub_opts.topic = subt;
        sub_opts.qos = s_qos;
        mg_mqtt_sub(c, &sub_opts);
        MG_INFO(("%lu SUBSCRIBED to %.*s", c->id, (int) subt.len, subt.ptr));
        struct mg_mqtt_opts pub_opts;
        memset(&pub_opts, 0, sizeof(pub_opts));
        pub_opts.topic = pubt;
        pub_opts.message = data;
        pub_opts.qos = s_qos, pub_opts.retain = false;
        mg_mqtt_pub(c, &pub_opts);
        MG_INFO(("%lu PUBLISHED %.*s -> %.*s", c->id, (int) data.len, data.ptr,
                 (int) pubt.len, pubt.ptr));
    } else if (ev == MG_EV_MQTT_MSG) {
        // When we get echo response, print it
        struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;
        MG_INFO(("%lu RECEIVED %.*s <- %.*s", c->id, (int) mm->data.len,
                 mm->data.ptr, (int) mm->topic.len, mm->topic.ptr));
    } else if (ev == MG_EV_CLOSE) {
        MG_INFO(("%lu CLOSED", c->id));
        s_conn = NULL;  // Mark that we're closed
    }
    (void) fn_data;
}

// Timer function - recreate client connection if it is closed
static void timer_fn(void *arg) {
    struct mg_mgr *mgr = (struct mg_mgr *) arg;
    struct mg_mqtt_opts opts = {.clean = true,
                .qos = s_qos,
                .topic = mg_str(s_pub_topic),
                .version = 4,
                .message = mg_str("bye")};
    if (s_conn == NULL) s_conn = mg_mqtt_connect(mgr, s_url, &opts, fn, NULL);
}

int main(int argc, char *argv[]) {
    struct mg_mgr mgr;
    int i;

    // Parse command-line flags
    for (i = 1; i < argc; i++) {
        if (strcmp(argv[i], "-u") == 0 && argv[i + 1] != NULL) {
            s_url = argv[++i];
        } else if (strcmp(argv[i], "-p") == 0 && argv[i + 1] != NULL) {
            s_pub_topic = argv[++i];
        } else if (strcmp(argv[i], "-s") == 0 && argv[i + 1] != NULL) {
            s_sub_topic = argv[++i];
        } else if (strcmp(argv[i], "-v") == 0 && argv[i + 1] != NULL) {
            mg_log_set(atoi(argv[++i]));
        } else {
            MG_ERROR(("Unknown option: %s. Usage:", argv[i]));
            MG_ERROR(
                        ("%s [-u mqtts://SERVER:PORT] [-p PUB_TOPIC] [-s SUB_TOPIC] "
                         "[-v DEBUG_LEVEL]",
                         argv[0], argv[i]));
            return 1;
        }
    }

    signal(SIGINT, signal_handler);   // Setup signal handlers - exist event
    signal(SIGTERM, signal_handler);  // manager loop on SIGINT and SIGTERM

    mg_mgr_init(&mgr);
    struct mg_tls_opts opts = {.client_ca = mg_str(CA_ALL)};
    mg_tls_ctx_init(&mgr, &opts);
    mg_timer_add(&mgr, 3000, MG_TIMER_REPEAT | MG_TIMER_RUN_NOW, timer_fn, &mgr);
    while (s_signo == 0) mg_mgr_poll(&mgr, 1000);  // Event loop, 1s timeout
    mg_mgr_free(&mgr);                             // Finished, cleanup

    return 0;
}

Wireshark

mqtt过滤结果

通过查看payload部分,可以大概看出消息内容

总结

mqtt很适合应用于分布式的嵌入式节点之间的信息交换,相比于ROS、ROS2的发布订阅模型,mqtt的payload显得更加轻量化一些

阅读剩余
THE END