【一个很好用的软件包】RT-Thread之mqttclient软件包

发布于 2020-02-28 19:31:14
    本帖最后由 杰杰 于 2020-2-28 19:44 编辑


[md]# 关于mqttclient软件包

**一个基于socket API之上的跨平台MQTT客户端**

基于socket API的MQTT客户端,拥有非常简洁的API接口,以极少的资源实现QOS2的服务质量,并且无缝衔接了mbedtls加密库。此仓库是专门为RT-Thread做的软件包,原始仓库位于:[]()

## 优势:
- **基于标准BSD socket之上开发**,只要是兼容BSD socket的系统均可使用。
- **稳定**:无论是`掉线重连`,`丢包重发`,都是严格`遵循MQTT协议标准`执行,除此之外对**大数据量**的测试无论是收是发,都是非常稳定(一次发送`135K`数据,3秒一次),高频测试也是非常稳定(7个主题同时收发,每秒一次,也就是1秒14个mqtt报文,服务质量QoS0、QoS1、QoS2都有)。因为作者以极少的资源设计了`记录机制`,对采用QoS1服务质量的报文必须保证到达一次,对QoS2服务质量的报文有且只有收到一次(如果不相信它稳定性的同学可以自己去修改源码,专门为QoS2服务质量去测试,故意不回复`PUBREC`,让服务器重发QoS2报文,看看客户端是否有且只有处理一次),而对于掉线重连的稳定性,则是**基本操作**了,没啥好说的,因此在测试中稳定性极好。
- **轻量级**:整个代码工程极其简单,不使用mbedtls情况下,占用资源极少,作者曾使用esp8266模组与云端通信,整个工程代码消耗的RAM不足15k(包括系统占用的开销,对数据的处理开销,而此次还是未优化的情况下,还依旧完美保留了掉线重连的稳定性,但是对应qos1 qos2服务质量的报文则未做测试,因为STM32F103C8T6芯片资源实在是太少了,折腾不起)。
- **无缝衔接mbedtls加密传输**,让网络传输更加安全。
- **拥有极简的API接口**,随意配置,使用起来非常简单。
- **有非常好的代码风格与思想**:整个代码采用分层式设计,代码实现采用异步处理的思想,降低耦合,提高性能。
- **MQTT协议支持主题通配符`“#”、“+”`**。
- **订阅的主题与消息处理完全分离**,让编程逻辑更加简单易用,用户无需理会错综复杂的逻辑关系。
- **不对外产生依赖。**
- **mqttclient内部已实现保活处理机制**,无需用户过多关心理会,用户只需专心处理应用功能即可。


## 整体框架

拥有非常明确的分层框架。

![整体框架]()

# API
`mqttclient`拥有非常简洁的`api`接口,并且api见名知其义,非常易于使用。
```c
int mqtt_init(mqtt_client_t* c, client_init_params_t* init);
int mqtt_release(mqtt_client_t* c);
int mqtt_connect(mqtt_client_t* c);
int mqtt_disconnect(mqtt_client_t* c);
int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t msg_handler);
int mqtt_unsubscribe(mqtt_client_t* c, const char* topic_filter);
int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg);

int mqtt_keep_alive(mqtt_client_t* c);
int mqtt_yield(mqtt_client_t* c, int timeout_ms);
```
## 核心
**mqtt_client_t 是核心结构**
```c
typedef struct mqtt_client {
unsigned short packet_id;
unsigned char ping_outstanding;
unsigned char ack_handler_number;
unsigned char *read_buf;
unsigned char *write_buf;
unsigned int cmd_timeout;
unsigned int read_buf_size;
unsigned int write_buf_size;
unsigned int reconnect_try_duration;
void *reconnect_date;
reconnect_handler_t reconnect_handler;
client_state_t client_state;
platform_mutex_t write_lock;
platform_mutex_t global_lock;
list_t msg_handler_list;
list_t ack_handler_list;
network_t *network;
platform_thread_t *thread;
platform_timer_t reconnect_timer;
platform_timer_t last_sent;
platform_timer_t last_received;
connect_params_t *connect_params;
} mqtt_client_t;
```

该结构主要维护以下内容:
1. 读写数据缓冲区`read_buf、write_buf`
2. 命令超时时间`cmd_timeout`(主要是读写阻塞时间、等待响应的时间、重连等待时间)
3. 维护`ack`链表`ack_handler_list`,这是异步实现的核心,所有等待响应的报文都会被挂载到这个链表上
4. 维护消息处理列表`msg_handler_list`,这是`mqtt`协议必须实现的内容,所有来自服务器的`publish`报文都会被处理(前提是订阅了对应的消息)
5. 维护一个网卡接口`network`
6. 维护一个内部线程`thread`,所有来自服务器的mqtt包都会在这里被处理!
7. 两个定时器,分别是掉线重连定时器与保活定时器`reconnect_timer、last_sent、last_received`
8. 一些连接的参数`connect_params`

## 初始化
主要是配置`mqtt_client_t`结构的相关信息,如果没有指定初始化参数,则系统会提供默认的参数。
但连接部分的参数则必须指定:
```c
init_params.connect_params.network_params.addr = "[你的mqtt服务器IP地址或者是域名]";
init_params.connect_params.network_params.port = "1883"; //端口号
init_params.connect_params.user_name = "jiejietop";
init_params.connect_params.password = "123456";
init_params.connect_params.client_id = "clientid";

mqtt_init(&client, &init_params);
```
## 连接服务器
```c
mqtt_connect(&client);
```
## 订阅报文
参数只有 `mqtt_client_t` 类型的指针,字符串类型的`主题`(支持通配符"#" "+"),主题的`服务质量`,以及收到报文的`处理函数`,如不指定则有默认处理函数。
```c
mqtt_subscribe(&client, "testtopic0", QOS0, topic_test1_handler);
mqtt_subscribe(&client, "testtopic1", QOS1, NULL);
mqtt_subscribe(&client, "testtopic2", QOS2, NULL);
```

## 发布报文
参数只有 `mqtt_client_t` 类型的指针,字符串类型的`主题`(支持通配符),要发布的消息(包括`服务质量`、`消息主体`)。
```c
mqtt_message_t msg;

msg.qos = 2;
msg.payload = (void *) buf;

mqtt_publish(&client, "testtopic1", &msg);
```

其他的API接口都是非常简单的,在后文会提及到。

[/md]

查看更多

关注者
0
被浏览
1.4k
49 个回答
杰杰
杰杰 2020-02-28
    本帖最后由 杰杰 于 2020-2-28 19:43 编辑


[md]# 使用mqttclient软件包
目前作者已经将mqttclient制作成RT-Thread的软件包了,大家可以通过env工具或者 RT-Thread Studio 直接使用软件包。

## env工具
随着 `package` 系统的不断壮大,会有越来越多的软件包加入进来,所以本地看到 `menuconfig` 中的软件包列表可能会与服务器 不同步 。使用 `pkgs --upgrade` 命令即可解决该问题,这个命令不仅会对本地的包信息进行更新同步,还会对 env 的功能脚本进行升级,建议定期使用。

**本次测试使用野火STM32F429挑战者开发板**

1. 所以用 `pkgs --upgrade` 命令先同步一下软件包。
2. menuconfig命令打开配置。
3. 在以下路径下选中mqttclient软件包,除此之外还要打开lwip、以太网接口 或者 SAL->套接字接口。
```bash
Location:
-> RT-Thread online packages
-> IoT - internet of things
-> mqttclient
```
4. 然后就是随意配置了。

![mqttclient配置]()

### mbedtls

默认不打开mbedtls。

### salof

[salof](https://github.com/jiejieTop/salof) 全称是:`Synchronous Asynchronous Log Output Framework`(同步异步日志输出框架),它是一个异步日志输出库,在空闲时候输出对应的日志信息,并且该库与mqttclient无缝衔接。

**配置对应的日志输出级别:**

```c
#define BASE_LEVEL (0)
#define ASSERT_LEVEL (BASE_LEVEL + 1) /* 日志输出级别:断言级别(非常高优先级) */
#define ERR_LEVEL (ASSERT_LEVEL + 1) /* 日志输出级别:错误级别(高优先级) */
#define WARN_LEVEL (ERR_LEVEL + 1) /* 日志输出级别:警告级别(中优先级) */
#define INFO_LEVEL (WARN_LEVEL + 1) /* 日志输出级别:信息级别(低优先级) */
#define DEBUG_LEVEL (INFO_LEVEL + 1) /* 日志输出级别:调试级别(更低优先级) */

#define LOG_LEVEL WARN_LEVEL /* 日志输出级别 */
```

**日志其他选项:**

- 终端带颜色
- 时间戳
- 标签

### mqtt

配置mqtt等待应答列表的最大值,对于qos1 qos2服务质量有要求的可以将其设置大一点,当然也必须资源跟得上,它主要是保证qos1 qos2的mqtt报文能准确到达服务器。

```c
#define MQTT_ACK_HANDLER_NUM_MAX 64
```

选择MQTT协议的版本,默认为4,表示使用MQTT 3.1.1版本,而3则表示为MQTT 3.1版本。

```c
#define MQTT_VERSION 4 // 4 is mqtt 3.1.1
```

设置默认的保活时间,它主要是保证MQTT客户端与服务器的保持活性连接,单位为 秒 ,比如MQTT客户端与服务器100S没有发送数据了,有没有接收到数据,此时MQTT客户端会发送一个ping包,确认一下这个会话是否存在,如果收到服务器的应答,那么说明这个会话还是存在的,可以随时收发数据,而如果不存在了,就清除会话。

```c
#define MQTT_KEEP_ALIVE_INTERVAL 100 // unit: second
```

默认的命令超时,它主要是用于socket读写超时,在MQTT初始化时可以指定:

```
#define MQTT_DEFAULT_CMD_TIMEOUT 4000
```

默认主题的长度,主题是支持通配符的,如果主题太长则会被截断:

```c
#define MQTT_TOPIC_LEN_MAX 64
```

默认的算法数据缓冲区的大小,如果要发送大量数据则修改大一些,在MQTT初始化时可以指定:

```c
#define MQTT_DEFAULT_BUF_SIZE 1024
```

线程相关的配置,如线程栈,线程优先级,线程时间片等:
在linux环境下可以是不需要理会这些参数的,而在RTOS平台则需要配置,如果不使用mbedtls,线程栈2048字节已足够,而使用mbedtls加密后,需要配置4096字节以上。
```c
#define MQTT_THREAD_STACK_SIZE 2048 // 线程栈
#define MQTT_THREAD_PRIO 5 // 线程优先级
#define MQTT_THREAD_TICK 50 // 线程时间片
```

默认的重连时间间隔,当发生掉线时,会以这个时间间隔尝试重连:
```c
#define MQTT_RECONNECT_DEFAULT_DURATION 1000
```

其他不需要怎么配置的东西:
```c
#define MQTT_MAX_PACKET_ID (0xFFFF - 1) // mqtt报文id
#define MQTT_MAX_CMD_TIMEOUT 20000 //最大的命令超时参数
#define MQTT_MIN_CMD_TIMEOUT 1000 //最小的命令超时参数
```

> ps:以上参数基本不需要怎么配置的,直接用即可~

5. 最后通过`scons --target=mdk5`命令生成mdk工程,然后编译下载到开发板后运行就行了(需要使用mqttclient测试代码),目前作者提供服务器仅供测试。

## RT-Thread Studio使用

1. 通过RT-Thread Setting打开lwip、以太网接口然后选择在线软件包添加到工程中,然后保存配置就可以看到工程已经添加了mqttclient软件包了。

![添加软件包]()

**注意**:如果遇到添加软件包失败的话,很可能是因为RT-Thread Studio中的软件包还没更新或者更新失败,那么可以到软件安装目录`RT-ThreadStudio\platform\env_released\env\packages\packages`下手动更新软件包,然后将master重置到最新的分支就行了(如果实在是不行就直接用env吧):

![手动更新软件包]()[/md]
杰杰
杰杰 2020-02-28
取消订阅
与订阅报文的逻辑基本差不多的~

1. 序列化订阅报文并且发送给服务器
c<br/>MQTTSerialize_unsubscribe(c->write_buf, c->write_buf_size, 0, packet_id, 1, &topic)<br/>mqtt_send_packet(c, len, &timer)<br/>
2. 创建对应的消息处理节点,这个消息节点在收到服务器的UNSUBACK取消订阅应答报文后将消息处理列表msg_handler_list上的已经订阅的主题消息节点销毁
c<br/>mqtt_msg_handler_create((const char*)topic_filter, QOS0, NULL)<br/>
3. 在发送了报文给服务器那就要等待服务器的响应了,先记录这个等待UNSUBACK
c<br/>mqtt_ack_list_record(c, UNSUBACK, packet_id, len, msg_handler)<br/>
杰杰
杰杰 2020-02-28

发布收到与发布释放报文的处理

c<br/>static int mqtt_pubrec_and_pubrel_packet_handle(mqtt_client_t *c, platform_timer_t *timer)<br/>
1. 反序列化报文
c<br/>MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->read_buf, c->read_buf_size)<br/>
2. 产生一个对应的应答报文
c<br/>mqtt_publish_ack_packet(c, packet_id, packet_type);<br/>
3. 取消对应的ack记录
c<br/>mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler)<br/>
杰杰
杰杰 2020-02-28
新人发帖随便溜达溜达~:lol
pjdu
pjdu 2020-02-28
多谢分享,马克
杰杰
杰杰 2020-02-28
whj467467222 发表于 2020-2-28 21:27
杰神,666


你怎么回帖了600多次,,,:lol:lol:lol:lol:lol:lol:lol:lol
pkokoc
pkokoc 2020-02-29
太牛了,谢谢分享
翌雨芩风
翌雨芩风 2020-03-02
版主:
我的模块使用的是ESP:8266,这个使用的是LWIP,我把 LWIP调用的头文件全去了,直接用AP -SAL封装的 SOCK接口没有什么问题吧。
另:我要连接 ONE-NET还应该怎么操用。
谢谢!!!




1.png

2.png




杰杰
杰杰 2020-03-02
翌雨芩风 发表于 2020-3-2 11:46
版主:
我的模块使用的是ESP:8266,这个使用的是LWIP,我把 LWIP调用的头文件全去了,直接用AP -SAL封装的 SO ...


rtt是支持BSDsocket的,去掉没有问题的,你已经连上了我的服务器了,连接onenet就把你的域名、端口号、用户名密码之类的改了就好了
翌雨芩风
翌雨芩风 2020-03-02
杰杰 发表于 2020-3-2 13:36
rtt是支持BSDsocket的,去掉没有问题的,你已经连上了我的服务器了,连接onenet就把你的域名、端口号、用 ...


但是:上要要求的输入没有我的产品号,KEY,设备号,KEY,没法输入,




1.png

2.png
杰杰
杰杰 2020-03-02
翌雨芩风 发表于 2020-3-2 15:19
但是:上要要求的输入没有我的产品号,KEY,设备号,KEY,没法输入,




它这些 产品号,KEY、设备号的核心就是生成用户名与密码,这是他们自己的算法,具体你去了解一下就行了,没有哪个MQTT协议是用这些设备三元组连接的,都是为了生成用户名与密码
杰杰
杰杰 2020-03-03
armink 发表于 2020-3-2 20:04
此包必火~~


大神来了{:2_37:}{:2_37:}
翌雨芩风
翌雨芩风 2020-03-04
楼主:按照你提供的例子:我移植到ONE-NET,结果出现AT超时,可能是啥情况??




1.png
杰杰
杰杰 2020-03-04
翌雨芩风 发表于 2020-3-4 17:16
楼主:按照你提供的例子:我移植到ONE-NET,结果出现AT超时,可能是啥情况??




没有读到数据呀,你是不是没给板子发数据
杰杰
杰杰 2020-03-04
翌雨芩风 发表于 2020-3-4 17:16
楼主:按照你提供的例子:我移植到ONE-NET,结果出现AT超时,可能是啥情况??




你应该是连接上了,晚点我把log去掉,没收到数据就不用理他
hejiang177
hejiang177 2020-03-06
不错不错,这才是完整的mqttclient
杰杰
杰杰 2020-03-06
hejiang177 发表于 2020-3-6 10:05
不错不错,这才是完整的mqttclient


谢谢~:loveliness:
hejiang177
hejiang177 2020-03-11
楼主你好,MQTT在ENV环境中配置使能后,pkgs --update scons --target=mdk5后,工程目录里没有这个软件包是什么原因呢
whj467467222
whj467467222 2020-03-11
hejiang177 发表于 2020-3-11 13:59
楼主你好,MQTT在ENV环境中配置使能后,pkgs --update scons --target=mdk5后,工程目录里没有这个软件包是 ...


pkgs --upgrade
hejiang177 发表于 2020-3-11 13:59
楼主你好,MQTT在ENV环境中配置使能后,pkgs --update scons --target=mdk5后,工程目录里没有这个软件包是 ...


版本选1.0.0就可以了
有个问题,如果不想用连接了,怎么释放资源呢?
试过了mqtt_disconnect和mqtt_release,都不能达到要求,而且只调用mqtt_disconnect也会assertion报错,似乎是内存重复释放了


还有个小问题,在platform_thread.c文件中,rt_thread_init创建的线程,需要用rt_thread_detach删除,这是个小bug

杰杰
杰杰 2020-03-12
踩姑娘的小蘑菇 发表于 2020-3-12 18:30
有个问题,如果不想用连接了,怎么释放资源呢?
试过了mqtt_disconnect和mqtt_release,都不能达到要求,而 ...


好的~感谢,我修复一下
hejiang177
hejiang177 2020-03-23
踩姑娘的小蘑菇 发表于 2020-3-12 18:24
版本选1.0.0就可以了


嗯,了解,是Kconfig里的名称不一样了
pjdu
pjdu 2020-03-25
package里面有kawaii-mqtt和mqttclient,这两个有什么差别吗
杰杰
杰杰 2020-03-28
pjdu 发表于 2020-3-25 15:28
package里面有kawaii-mqtt和mqttclient,这两个有什么差别吗


用kawaii-mqtt,mqttclient因为宏的命名太通用了(可能会造成全局污染),我已经修改了
haov000
haov000 2020-04-02
有几个地方请教下:
1、在实际应用的时候如果mqtt_connect()这个函数调用返回失败,是不是需要不断调用直到成功才可以?不然是没法建立线程,维持mqtt的连接的
2、我看到mqtt_yield_thread()这个主线程也有可能出错退出,这种情况下是不是就不能维持mqtt的连接了?还是一旦mqtt_yield_thread()这个线程退出,说明发生了灾难性的错误也不能维持mqtt的连接了?
tanc
tanc 2020-04-03
我这边使用4g模块AT组件的方式使用kawaii-mqtt跑历程,程序直接挂掉,请问什么原因?帖子
haov000 发表于 2020-4-2 17:06
有几个地方请教下:
1、在实际应用的时候如果mqtt_connect()这个函数调用返回失败,是不是需要不断调用直 ...


连接超时会尝试重连的,只有用户尝试关闭mqtt连接的时候 才会退出mqtt_yield_thread
haov000
haov000 2020-04-07
踩姑娘的小蘑菇 发表于 2020-4-3 17:27
连接超时会尝试重连的,只有用户尝试关闭mqtt连接的时候 才会退出mqtt_yield_thread ...


奥, 对, 关闭连接时,应该退出
木成舟
木成舟 2020-04-15
楼主您好,我在STM32F103RET6上移植了您的MQTT,现在是能发主题,但是接收不到订阅的主题的回调,有时候还会AT socket timeout.想问下这是什么情况,我用的是您的test_sample测试的。
yeshenmeng
yeshenmeng 2020-04-16
杰神你好,我在输入空的用户名和密码的时候发现连接MQTT服务器有问题,之后查看代码发现有这么一句话:
在MQTTSerialize_connectLength函数中:
if (options->username.cstring || options->username.lenstring.data)
len += MQTTstrlen(options->username)+2;
if (options->password.cstring || options->password.lenstring.data)
len += MQTTstrlen(options->password)+2;
在MQTTSerialize_connect函数中:
if (options->username.cstring || options->username.lenstring.data)
flags.bits.username = 1;
if (options->password.cstring || options->password.lenstring.data)
flags.bits.password = 1;
判断用户名与密码的时候这里用的||的关系,我改成&&的关系后就能正常连接到不需要用户名与密码的MQTT服务器,刚接触MQTT不太熟悉,有不对的地方请见谅
杰杰
杰杰 2020-05-01
木成舟 发表于 2020-4-15 07:34
楼主您好,我在STM32F103RET6上移植了您的MQTT,现在是能发主题,但是接收不到订阅的主题的回调,有时候还 ...


没订阅成功,看看订阅时候返回的错误代码~
杰杰
杰杰 2020-05-01
yeshenmeng 发表于 2020-4-16 13:49
杰神你好,我在输入空的用户名和密码的时候发现连接MQTT服务器有问题,之后查看代码发现有这么一句话:
在M ...


我的测试不用用户名、密码是可以正常连接的~
杰杰
杰杰 2020-05-01
最近软件包又有新的更新~大家可以去看看:

微信截图_20200501211902.png
木成舟
木成舟 2020-05-03
杰杰 发表于 2020-5-1 21:08
没订阅成功,看看订阅时候返回的错误代码~


嗯嗯,已经解决了,还有个小疑问,现在这个是没有遗愿机制吗?我看初始化函数里就没有初始化will_flag,will_option
杰杰
杰杰 2020-05-05
木成舟 发表于 2020-5-3 00:14
嗯嗯,已经解决了,还有个小疑问,现在这个是没有遗愿机制吗?我看初始化函数里就没有初始化will_flag,w ...


有的,在指定连接的参数中是可以设置的,我测试只是让他默认了而已,参考connect_params_t结构体的设置,在mqtt_init()中指定
AlexZ
AlexZ 2020-05-21
你好 使用你的 kawaii-mqtt 软件包的时候出现 AT socket (0) receive timeout 问题,https://fastadmin.rt-thread.org/ask/question/424547.html
jerry2cool
jerry2cool 2020-05-21
感谢大神。。。。
Space
Space 2020-05-27
rtt的包没更新1.04吗?
杰杰
杰杰 2020-05-29
Space 发表于 2020-5-27 11:48
rtt的包没更新1.04吗?


最近会更新,要1.0.5了~到时候会更新在这里~,一直在修改,精力有限请谅解哈~
杰杰
杰杰 2020-06-18
Space 发表于 2020-5-27 11:48
rtt的包没更新1.04吗?


已经更新了,这次是比较大版本的更新,跨越了v1.0.x,直接命名为v1.1.0版本,重构部分代码,优化MQTT处理的逻辑,提升整体的稳定性,支持多客户端,支持设置遗嘱,优化API接口,增加多个云平台的测试代码与说明文档,增加在线代码生成工具、在线裁剪配置工具

撰写答案

请登录后再发布答案,点击登录

发布
问题

分享
好友