Toggle navigation
首页
问答
文章
积分商城
专家
专区
更多专区...
文档中心
返回主站
搜索
提问
会员
中心
登录
注册
SAL
【rt-thread网络连载】第0篇:通过paho-mqtt软件包入门rt-thread的sal
5.00
发布于 2023-01-15 18:39:51 浏览:1245
订阅该版
[tocm] # 一、paho-mqtt软件包程序流程 ## 1.1 paho_mqtt_start 在rt_wlan_register_event_handler函数注册好RT_WLAN_EVT_READY的回调函数paho_mqtt_start,当wifi准备好后调用mq_start启动mqtt。在mq_start中,初始化MQTTClient结构体,设置mqtt连接的参数:mqtt的uri、mqtt的用户名(username)和密码(password)、mqtt发布和订阅的主题Topic、消息质量等级QoS,最后调用paho_mqtt_start创建处理mqtt的线程paho_mqtt_thread。 ```c static void mq_start(void) { /* init condata param by using MQTTPacket_connectData_initializer */ MQTTPacket_connectData condata = MQTTPacket_connectData_initializer; static char cid[20] = { 0 }; static int is_started = 0; if (is_started) { return; } /* config MQTT context param */ { client.isconnected = 0; client.uri = MQTT_URI; /* generate the random client ID */ rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get()); /* config connect param */ memcpy(&client.condata, &condata, sizeof(condata)); client.condata.clientID.cstring = cid; client.condata.keepAliveInterval = 60; client.condata.cleansession = 1; client.condata.username.cstring = MQTT_USERNAME; client.condata.password.cstring = MQTT_PASSWORD; /* config MQTT will param. */ client.condata.willFlag = 1; client.condata.will.qos = 1; client.condata.will.retained = 0; client.condata.will.topicName.cstring = MQTT_PUBTOPIC; client.condata.will.message.cstring = MQTT_WILLMSG; /* malloc buffer. */ client.buf_size = client.readbuf_size = 1024; client.buf = malloc(client.buf_size); client.readbuf = malloc(client.readbuf_size); if (!(client.buf && client.readbuf)) { LOG_E("no memory for MQTT client buffer!"); goto _exit; } /* set event callback function */ client.connect_callback = mqtt_connect_callback; client.online_callback = mqtt_online_callback; client.offline_callback = mqtt_offline_callback; /* set subscribe table and event callback */ client.messageHandlers[0].topicFilter = MQTT_SUBTOPIC; client.messageHandlers[0].callback = mqtt_sub_callback; client.messageHandlers[0].qos = QOS1; /* set default subscribe event callback */ client.defaultMessageHandler = mqtt_sub_default_callback; } /* run mqtt client */ paho_mqtt_start(&client); is_started = 1; _exit: return; } rt_wlan_register_event_handler(RT_WLAN_EVT_READY, (void (*)(int, struct rt_wlan_buff *, void *))mq_start, RT_NULL); ``` ## 1.2 paho_mqtt_thread 在paho_mqtt_thread中调用paho-mqtt提供的接口和rt-thread的sal的接口完成与mqtt服务器的交互,包括以下几个方面:与服务器的连接、订阅主题、向服务器发送心跳包、处理服务器发送下来的消息(CONNACK、PUBACK、SUBACK、PUBLISH、PUBREC、PUBCOMP、PINGRESP)、回环服务器通过topic发送下来的消息。 ```c static void paho_mqtt_thread(void *param) { MQTTClient *c = (MQTTClient *)param; int i, rc, len; int rc_t = 0; c->pub_sock = socket(AF_INET, SOCK_DGRAM, 0); if (c->pub_sock == -1) { debug_printf("create pub_sock error!\n"); goto _mqtt_exit; } /* bind publish socket. */ { struct sockaddr_in pub_server_addr; c->pub_port = pub_port; pub_port ++; pub_server_addr.sin_family = AF_INET; pub_server_addr.sin_port = htons((c->pub_port)); pub_server_addr.sin_addr.s_addr = INADDR_ANY; memset(&(pub_server_addr.sin_zero), 0, sizeof(pub_server_addr.sin_zero)); rc = bind(c->pub_sock, (struct sockaddr *)&pub_server_addr, sizeof(struct sockaddr)); if (rc == -1) { debug_printf("pub_sock bind error!\n"); goto _mqtt_exit; } } _mqtt_start: if (c->connect_callback) { c->connect_callback(c); } rc = net_connect(c); if (rc != 0) { goto _mqtt_restart; } rc = MQTTConnect(c); if (rc != 0) { goto _mqtt_restart; } for (i = 0; i < MAX_MESSAGE_HANDLERS; i++) { const char *topic = c->messageHandlers[i].topicFilter; if(topic == RT_NULL) continue; rc = MQTTSubscribe(c, topic, QOS2); debug_printf("Subscribe #%d %s %s!\n", i, topic, (rc < 0) ? ("fail") : ("OK")); if (rc != 0) { goto _mqtt_disconnect; } } if (c->online_callback) { c->online_callback(c); } c->tick_ping = rt_tick_get(); while (1) { int res; rt_tick_t tick_now; fd_set readset; struct timeval timeout; tick_now = rt_tick_get(); if (((tick_now - c->tick_ping) / RT_TICK_PER_SECOND) > (c->keepAliveInterval - 5)) { timeout.tv_sec = 1; //debug_printf("tick close to ping.\n"); } else { timeout.tv_sec = c->keepAliveInterval - 10 - (tick_now - c->tick_ping) / RT_TICK_PER_SECOND; //debug_printf("timeount for ping: %d\n", timeout.tv_sec); } timeout.tv_usec = 0; FD_ZERO(&readset); FD_SET(c->sock, &readset); FD_SET(c->pub_sock, &readset); /* int select(maxfdp1, readset, writeset, exceptset, timeout); */ res = select(((c->pub_sock > c->sock) ? c->pub_sock : c->sock) + 1, &readset, RT_NULL, RT_NULL, &timeout); if (res == 0) { len = MQTTSerialize_pingreq(c->buf, c->buf_size); rc = sendPacket(c, len); if (rc != 0) { debug_printf("[%d] send ping rc: %d \n", rt_tick_get(), rc); goto _mqtt_disconnect; } /* wait Ping Response. */ timeout.tv_sec = 5; timeout.tv_usec = 0; FD_ZERO(&readset); FD_SET(c->sock, &readset); res = select(c->sock + 1, &readset, RT_NULL, RT_NULL, &timeout); if (res <= 0) { debug_printf("[%d] wait Ping Response res: %d\n", rt_tick_get(), res); goto _mqtt_disconnect; } } /* res == 0: timeount for ping. */ if (res < 0) { debug_printf("select res: %d\n", res); goto _mqtt_disconnect; } if (FD_ISSET(c->sock, &readset)) { //debug_printf("sock FD_ISSET\n"); rc_t = MQTT_cycle(c); //debug_printf("sock FD_ISSET rc_t : %d\n", rc_t); if (rc_t < 0) goto _mqtt_disconnect; continue; } if (FD_ISSET(c->pub_sock, &readset)) { struct sockaddr_in pub_client_addr; uint32_t addr_len = sizeof(struct sockaddr); MQTTMessage *message; MQTTString topic = MQTTString_initializer; //debug_printf("pub_sock FD_ISSET\n"); len = recvfrom(c->pub_sock, c->readbuf, c->readbuf_size, MSG_DONTWAIT, (struct sockaddr *)&pub_client_addr, &addr_len); if (pub_client_addr.sin_addr.s_addr != *((uint32_t *)(&netif_default->ip_addr))) { #if 1 char client_ip_str[16]; /* ###.###.###.### */ strcpy(client_ip_str, inet_ntoa(*((struct in_addr *) & (pub_client_addr.sin_addr)))); debug_printf("pub_sock recvfrom len: %s, skip!\n", client_ip_str); #endif continue; } if (len < sizeof(MQTTMessage)) { c->readbuf[len] = '\0'; debug_printf("pub_sock recv %d byte: %s\n", len, c->readbuf); if (strcmp((const char *)c->readbuf, "DISCONNECT") == 0) { debug_printf("DISCONNECT\n"); goto _mqtt_disconnect_exit; } continue; } message = (MQTTMessage *)c->readbuf; message->payload = c->readbuf + sizeof(MQTTMessage); topic.cstring = (char *)c->readbuf + sizeof(MQTTMessage) + message->payloadlen; //debug_printf("pub_sock topic:%s, payloadlen:%d\n", topic.cstring, message->payloadlen); len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id, topic, (unsigned char *)message->payload, message->payloadlen); if (len <= 0) { debug_printf("MQTTSerialize_publish len: %d\n", len); goto _mqtt_disconnect; } if ((rc = sendPacket(c, len)) != PAHO_SUCCESS) // send the subscribe packet { debug_printf("MQTTSerialize_publish sendPacket rc: %d\n", rc); goto _mqtt_disconnect; } } /* pbulish sock handler. */ } /* while (1) */ _mqtt_disconnect: MQTTDisconnect(c); _mqtt_restart: if (c->offline_callback) { c->offline_callback(c); } net_disconnect(c); rt_thread_delay(RT_TICK_PER_SECOND * 5); debug_printf("restart!\n"); goto _mqtt_start; _mqtt_disconnect_exit: MQTTDisconnect(c); net_disconnect(c); _mqtt_exit: debug_printf("thread exit\n"); return; } ``` # 二、与mqtt broker的交互 paho-mqtt软件包提供了两种发布消息到mqtt broker的方式:udp和管道。在MQTTClient结构体中有三个成员与通信有关:sock、pub_sock、pub_pipe,其中sock是与mqtt broker通信的套接字,pub_sock和pub_pipe是两种不同的发布方式:pub_sock是通过udp的方式发布消息;pub_pipe是通过管道,最终由sock发布消息。如下面的代码所示,使用哪种方式可以通过宏来配置。下面展开描述这两种方式如何与mqtt broker交互的。 ```c /* publish interface */ #if defined(RT_USING_POSIX) && (defined(RT_USING_DFS_NET) || defined(SAL_USING_POSIX)) int pub_pipe[2]; #else int pub_sock; int pub_port; #endif ``` ## 2.1 管道(pipe)方式 在paho_mqtt_pipe.c中的paho_mqtt_thread,下面的代码完成了发布消息、接收订阅消息、处理心跳包的工作。下面以三个点细说。 - 当需要发布消息时,应用层需要调用MQTTPublish,这个函数会调用write向管道的写端pub_pipe[1]写入待发送的数据。而管道的读端pub_pipe[0]在select中被监听,当MQTTPublish被调用时,select可以往下执行,首先调用read从管道中读取数据,接着调用MQTTSerialize_publish将数据封包,最后调用sendPacket将数据发送出去。 - 当接收到订阅的消息时,select会往下执行,接着调用MQTT_cycle读取并解析出数据。 - select的超时时间是50s,如果50s没有消息处理,则向broker发送心跳包。 ```c FD_ZERO(&readset); FD_SET(c->sock, &readset); FD_SET(c->pub_pipe[0], &readset); /* int select(maxfdp1, readset, writeset, exceptset, timeout); */ res = select(((c->pub_pipe[0] > c->sock) ? c->pub_pipe[0] : c->sock) + 1, &readset, RT_NULL, RT_NULL, &timeout); if (res == 0) { len = MQTTSerialize_pingreq(c->buf, c->buf_size); rc = sendPacket(c, len); if (rc != 0) { LOG_E("[%d] send ping rc: %d ", rt_tick_get(), rc); goto _mqtt_disconnect; } /* wait Ping Response. */ timeout.tv_sec = 5; timeout.tv_usec = 0; FD_ZERO(&readset); FD_SET(c->sock, &readset); res = select(c->sock + 1, &readset, RT_NULL, RT_NULL, &timeout); if (res <= 0) { LOG_E("[%d] wait Ping Response res: %d", rt_tick_get(), res); goto _mqtt_disconnect; } } /* res == 0: timeount for ping. */ if (res < 0) { LOG_E("select res: %d", res); goto _mqtt_disconnect; } if (FD_ISSET(c->sock, &readset)) { //LOG_D("sock FD_ISSET"); rc_t = MQTT_cycle(c); //LOG_D("sock FD_ISSET rc_t : %d", rc_t); if (rc_t < 0) goto _mqtt_disconnect; continue; } if (FD_ISSET(c->pub_pipe[0], &readset)) { MQTTMessage *message; MQTTString topic = MQTTString_initializer; //LOG_D("pub_sock FD_ISSET"); len = read(c->pub_pipe[0], c->readbuf, c->readbuf_size); if (len < sizeof(MQTTMessage)) { c->readbuf[len] = '\0'; LOG_D("pub_sock recv %d byte: %s", len, c->readbuf); if (strcmp((const char *)c->readbuf, "DISCONNECT") == 0) { LOG_D("DISCONNECT"); goto _mqtt_disconnect_exit; } continue; } message = (MQTTMessage *)c->readbuf; message->payload = c->readbuf + sizeof(MQTTMessage); topic.cstring = (char *)c->readbuf + sizeof(MQTTMessage) + message->payloadlen; //LOG_D("pub_sock topic:%s, payloadlen:%d", topic.cstring, message->payloadlen); len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id, topic, (unsigned char *)message->payload, message->payloadlen); if (len <= 0) { LOG_D("MQTTSerialize_publish len: %d", len); goto _mqtt_disconnect; } if ((rc = sendPacket(c, len)) != PAHO_SUCCESS) // send the subscribe packet { LOG_D("MQTTSerialize_publish sendPacket rc: %d", rc); goto _mqtt_disconnect; } } ``` ## 2.2 udp方式 udp方式中,处理流程与管道方式基本相似。下面说明一下这种方式两个套接字的工作流程。 MQTTClient结构体中有两个socket,一个是基于tcp的负责控制与服务器连接的sock,另一个是基于udp协议的负责消息发布的pub_sock。 ### 2.2.1 sock - 连接:在net_connect调用socket、connet函数建立与服务器的tcp连接。 - 处理:sock接收到服务器的数据后,在MQTT_cycle中处理来自服务器的CONNACK、PUBACK、SUBACK、PUBLISH、PUBREC、PUBCOMP、PINGRESP消息。 - 断开连接:在net_disconnect函数中调用closesocket关闭与服务器的tcp连接。 ### 2.2.2 pub_sock - 连接:分为pub_sock的绑定和mqtt连接的建立 1、调用socket创建pub_sock,之后调用bind绑定pub_sock到udp端口。 2、在MQTTConnect函数中,通过sock发送connect消息给服务器,建立mqtt连接。 - 处理:先recvfrom将接受的数据拷贝到MQTTClient的readbuf,再将数据回环发布到服务器。 - 断开连接:通过sock向服务器发送DISCONNECT消息,断开mqtt连接。
2
条评论
默认排序
按发布时间排序
登录
注册新账号
关于作者
happycode999
这家伙很懒,什么也没写!
文章
28
回答
6
被采纳
0
关注TA
发私信
相关文章
1
SAL 不支持 PF_PACKET,如何修改可以支持?
2
socket(AF_INET, SOCK_RAW, IPPROTO_UDP);
3
SAL组件好像并不支持DTLS
4
添加SAL组件后socket相关函数均提示declared implicitly
5
sal组件socket等函数无法使用
6
SAL关闭自带的一个socket连接
7
wiznet与libmodbus软件包衔接对接SAL层传参问题
8
sal验证link.rt-thread.org:8101失败
9
请问 AT-SAL,recv() 函数在服务端断开之后没有返回,是设计如此吗?
10
多网卡的出现创建socket失败
推荐文章
1
RT-Thread应用项目汇总
2
玩转RT-Thread系列教程
3
国产MCU移植系列教程汇总,欢迎查看!
4
机器人操作系统 (ROS2) 和 RT-Thread 通信
5
五分钟玩转RT-Thread新社区
6
【技术三千问】之《玩转ART-Pi》,看这篇就够了!干货汇总
7
关于STM32H7开发板上使用SDIO接口驱动SD卡挂载文件系统的问题总结
8
STM32的“GPU”——DMA2D实例详解
9
RT-Thread隐藏的宝藏之completion
10
【ART-PI】RT-Thread 开启RTC 与 Alarm组件
热门标签
RT-Thread Studio
串口
Env
LWIP
SPI
AT
Bootloader
Hardfault
CAN总线
FinSH
ART-Pi
USB
DMA
文件系统
RT-Thread
SCons
RT-Thread Nano
线程
MQTT
STM32
RTC
FAL
rt-smart
ESP8266
I2C_IIC
UART
WIZnet_W5500
ota在线升级
PWM
cubemx
flash
freemodbus
BSP
packages_软件包
潘多拉开发板_Pandora
定时器
ADC
GD32
flashDB
socket
中断
编译报错
Debug
rt_mq_消息队列_msg_queue
SFUD
keil_MDK
msh
ulog
MicroPython
C++_cpp
本月问答贡献
出出啊
1517
个答案
342
次被采纳
小小李sunny
1444
个答案
290
次被采纳
张世争
812
个答案
177
次被采纳
crystal266
547
个答案
161
次被采纳
whj467467222
1222
个答案
148
次被采纳
本月文章贡献
出出啊
1
篇文章
2
次点赞
小小李sunny
1
篇文章
1
次点赞
张世争
1
篇文章
2
次点赞
crystal266
2
篇文章
2
次点赞
whj467467222
2
篇文章
2
次点赞
回到
顶部
发布
问题
投诉
建议
回到
底部