Toggle navigation
首页
问答
文章
积分商城
专家
专区
更多专区...
文档中心
返回主站
搜索
提问
会员
中心
登录
注册
MQTT
原创征文
RyanMqtt 移植指南(三)
发布于 2023-09-28 15:55:47 浏览:168
订阅该版
[tocm] 移植代码可去 https://github.com/Ryan-CW-Code/RyanMqtt platform/quecopen查看 测试环境:stm32F401RCT6、RT-Thread版本: v4.1.0、RT-Thread Studio版本: 2.2.6、网络硬件使用ec800m移植at_socket使用sal框架。 ## 1、移植介绍 _RyanMqtt 库希望应用程序为以下接口提供实现:_ #### system 接口 _RyanMqtt 需要 RTOS 支持,必须实现如下接口才可以保证 mqtt 客户端的正常运行_ | 函数名称 | 函数简介 | | --------------------- | ------------------- | | platformMemoryMalloc | 申请内存 | | platformMemoryFree | 释放已申请内存 | | platformPrint | 打印字符串 | | platformDelay | 毫秒延时 | | platformThreadInit | 初始化线程 | | platformThreadStart | 开启线程 | | platformThreadStop | 挂起线程 | | platformThreadDestroy | 销毁线程 | | platformMutexInit | 初始化互斥锁 | | platformMutexLock | 获取互斥锁 | | platformMutexUnLock | 释放互斥锁 | | platformMutexDestroy | 销毁互斥锁 | | platformCriticalEnter | 进入临界区 / 关中断 | | platformCriticalExit | 退出临界区 / 开中断 | #### network 接口 _RyanMqtt 依赖于底层传输接口 API,必须实现该接口 API 才能在网络上发送和接收数据包_ _MQTT 协议要求基础传输层能够提供有序的、可靠的、双向传输(从客户端到服务端 和从服务端到客户端)的字节流_ | 函数名称 | 函数简介 | | ------------------------ | ------------------------ | | platformNetworkConnect | 根据 ip 和端口连接服务器 | | platformNetworkRecvAsync | 非阻塞接收数据 | | platformNetworkSendAsync | 非阻塞发送数据 | | platformNetworkClose | 断开 mqtt 服务器连接 | #### time 接口 _RyanMqtt 依靠函数生成毫秒时间戳,用于计算持续时间和超时,内部已经做了数值溢出处理_ | 函数名称 | 函数简介 | | ---------------- | ------------------------ | | platformUptimeMs | 自系统启动以来 ms 时间戳 | ## 2、开始移植 得益于RT-Thread驱动应用层分离的思想和SAL框架,**platform/rtthread的适配层可以适应任何RT-Thread代码**,所以我们就不拿RT-Thread来移植了。 ***使用FreeRTOS内核来移植,使用CMSIS-RTOS V2兼容层。*** #### system 接口 系统接口,需要移植RTOS的接口。为方便管理类型使用平台结构体,修改platformSystem.h里面的结构体 就是线程和互斥锁 ```c typedef struct { osThreadId_t thread; } platformThread_t; typedef struct { osMutexId_t mutex; } platformMutex_t; ``` **再来实现platformSystem.c里面的函数定义** 注意里面的 platformPrint 函数,由于FreeRTOS没有官方的打印接口。记得修改为你的打印接口 ```c #include "platformSystem.h" /** * @brief 申请内存 * * @param size * @return void* */ void *platformMemoryMalloc(size_t size) { return pvPortMalloc(size); } /** * @brief 释放内存 * * @param ptr */ void platformMemoryFree(void *ptr) { vPortFree(ptr); } /** * @brief ms延时 * * @param ms */ void platformDelay(uint32_t ms) { osDelay(ms); } /** * @brief 打印字符串函数,可通过串口打印出去 * * @param str * @param strLen */ void platformPrint(char *str, uint16_t strLen) { } /** * @brief 初始化并运行线程 * * @param userData * @param platformThread * @param name * @param entry * @param param * @param stackSize * @param priority * @return RyanMqttError_e */ RyanMqttError_e platformThreadInit(void *userData, platformThread_t *platformThread, const char *name, void (*entry)(void *), void *const param, uint32_t stackSize, uint32_t priority) { const osThreadAttr_t myTask02_attributes = { .name = name, .stack_size = stackSize, .priority = (osPriority_t)priority, }; platformThread->thread = osThreadNew(entry, param, &myTask02_attributes); if (NULL == platformThread->thread) return RyanMqttNoRescourceError; return RyanMqttSuccessError; } /** * @brief 销毁自身线程 * * @param userData * @param platformThread * @return RyanMqttError_e */ RyanMqttError_e platformThreadDestroy(void *userData, platformThread_t *platformThread) { osThreadTerminate(platformThread->thread); return RyanMqttSuccessError; } /** * @brief 开启线程 * * @param userData * @param platformThread * @return RyanMqttError_e */ RyanMqttError_e platformThreadStart(void *userData, platformThread_t *platformThread) { osThreadResume(platformThread->thread); return RyanMqttSuccessError; } /** * @brief 挂起线程 * * @param userData * @param platformThread * @return RyanMqttError_e */ RyanMqttError_e platformThreadStop(void *userData, platformThread_t *platformThread) { osThreadSuspend(platformThread->thread); return RyanMqttSuccessError; } /** * @brief 互斥锁初始化 * * @param userData * @param platformMutex * @return RyanMqttError_e */ RyanMqttError_e platformMutexInit(void *userData, platformMutex_t *platformMutex) { const osMutexAttr_t myMutex01_attributes = { .name = "mqttMutex"}; platformMutex->mutex = osMutexNew(&myMutex01_attributes); return RyanMqttSuccessError; } /** * @brief 销毁互斥锁 * * @param userData * @param platformMutex * @return RyanMqttError_e */ RyanMqttError_e platformMutexDestroy(void *userData, platformMutex_t *platformMutex) { osMutexDelete(platformMutex->mutex); return RyanMqttSuccessError; } /** * @brief 阻塞获取互斥锁 * * @param userData * @param platformMutex * @return RyanMqttError_e */ RyanMqttError_e platformMutexLock(void *userData, platformMutex_t *platformMutex) { osMutexAcquire(platformMutex->mutex, osWaitForever); return RyanMqttSuccessError; } /** * @brief 释放互斥锁 * * @param userData * @param platformMutex * @return RyanMqttError_e */ RyanMqttError_e platformMutexUnLock(void *userData, platformMutex_t *platformMutex) { osMutexRelease(platformMutex->mutex); return RyanMqttSuccessError; } /** * @brief 进入临界区 / 关中断 * */ void platformCriticalEnter(void) { osKernelLock(); } /** * @brief 退出临界区 / 开中断 * */ void platformCriticalExit(void) { osKernelUnlock(); } ``` #### time 接口 time接口,只需要提供一个ms时间戳就行,直接修改函数,这里使用FreeRTOS的心跳。 ```c uint32_t platformUptimeMs(void) { if (1000 == osKernelGetTickFreq()) return (uint32_t)osKernelGetTickCount(); else { uint32_t tick = 0; tick = osKernelGetTickCount() * 1000; return (uint32_t)((tick + osKernelGetTickCount() - 1) / osKernelGetTickCount()); } } ``` #### network 接口 **_MQTT 协议要求基础传输层能够提供有序的、可靠的、双向传输(从客户端到服务端 和从服务端到客户端)的字节流_** 由于FreeRTOS没有规定标准的网络层,你可以选择 FreeRTOS-Plus-TCP / FreeRTOS-Cellular-Interface/ lwip / W5500等网络方法,几乎RT-Thread支持的你也可以在FreeRTOS仓库找到。 这里以lwip为例,使用socket接口来实现,网络阻塞发送和接收使用 SO_SNDTIMEO 和 SO_RCVTIMEO 来实现,你也可以选择select / poll / epoll等方式。 修改 platformNetwork_t 结构体以支持 socket ```c typedef struct { int socket; } platformNetwork_t; ``` 接着实现platformNetwork.c里面的函数 ```c #define rlogEnable 1 // 是否使能日志 #define rlogColorEnable 1 // 是否使能日志颜色 #define rlogLevel (rlogLvlWarning) // 日志打印等级 #define rlogTag "RyanMqttNet" // 日志tag #include "platformNetwork.h" #include "RyanMqttLog.h" /** * @brief 连接mqtt服务器 * * @param userData * @param platformNetwork * @param host * @param port * @return RyanMqttError_e * 成功返回RyanMqttSuccessError, 失败返回错误信息 */ RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platformNetwork, const char *host, const char *port) { RyanMqttError_e result = RyanMqttSuccessError; struct addrinfo *addrList = NULL; struct addrinfo hints = { .ai_family = AF_UNSPEC, .ai_socktype = SOCK_STREAM, .ai_protocol = IPPROTO_TCP}; if (getaddrinfo(host, port, &hints, &addrList) != 0) { result = RyanSocketFailedError; goto exit; } platformNetwork->socket = socket(addrList->ai_family, addrList->ai_socktype, addrList->ai_protocol); if (platformNetwork->socket < 0) { result = RyanSocketFailedError; goto exit; } if (connect(platformNetwork->socket, addrList->ai_addr, addrList->ai_addrlen) != 0) { platformNetworkClose(userData, platformNetwork); result = RyanMqttSocketConnectFailError; goto exit; } exit: if (NULL != addrList) freeaddrinfo(addrList); return result; } /** * @brief 非阻塞接收数据 * * @param userData * @param platformNetwork * @param recvBuf * @param recvLen * @param timeout * @return RyanMqttError_e * socket错误返回 RyanSocketFailedError * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError * 接收成功 RyanMqttSuccessError */ RyanMqttError_e platformNetworkRecvAsync(void *userData, platformNetwork_t *platformNetwork, char *recvBuf, int recvLen, int timeout) { int32_t recvResult = 0; int32_t offset = 0; int32_t timeOut2 = timeout; struct timeval tv = {0}; platformTimer_t timer = {0}; if (-1 == platformNetwork->socket) return RyanSocketFailedError; platformTimerCutdown(&timer, timeout); while ((offset < recvLen) && (0 != timeOut2)) { tv.tv_sec = timeOut2 / 1000; tv.tv_usec = timeOut2 % 1000 * 1000; if (tv.tv_sec <= 0 && tv.tv_usec <= 100) { tv.tv_sec = 0; tv.tv_usec = 100; } setsockopt(platformNetwork->socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval)); // 设置错做模式为非阻塞 recvResult = recv(platformNetwork->socket, recvBuf + offset, recvLen - offset, 0); if (recvResult <= 0) // 小于零,表示错误,个别错误不代表socket错误 { // 下列3种表示没问题,但需要推出发送 if ((errno == EAGAIN || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时 errno == EWOULDBLOCK || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空 errno == EINTR)) // 操作被信号中断 break; return RyanSocketFailedError; } offset += recvResult; timeOut2 = platformTimerRemain(&timer); } if (offset != recvLen) return RyanMqttRecvPacketTimeOutError; return RyanMqttSuccessError; } /** * @brief 非阻塞发送数据 * * @param userData * @param platformNetwork * @param sendBuf * @param sendLen * @param timeout * @return RyanMqttError_e * socket错误返回 RyanSocketFailedError * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError * 接收成功 RyanMqttSuccessError */ RyanMqttError_e platformNetworkSendAsync(void *userData, platformNetwork_t *platformNetwork, char *sendBuf, int sendLen, int timeout) { int32_t sendResult = 0; int32_t offset = 0; int32_t timeOut2 = timeout; struct timeval tv = {0}; platformTimer_t timer = {0}; if (-1 == platformNetwork->socket) return RyanSocketFailedError; platformTimerCutdown(&timer, timeout); while ((offset < sendLen) && (0 != timeOut2)) { tv.tv_sec = timeOut2 / 1000; tv.tv_usec = timeOut2 % 1000 * 1000; if (tv.tv_sec <= 0 && tv.tv_usec <= 100) { tv.tv_sec = 0; tv.tv_usec = 100; } setsockopt(platformNetwork->socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(struct timeval)); // 设置错做模式为非阻塞 sendResult = send(platformNetwork->socket, sendBuf + offset, sendLen - offset, 0); if (sendResult <= 0) // 小于零,表示错误,个别错误不代表socket错误 { // 下列3种表示没问题,但需要推出发送 if ((errno == EAGAIN || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时 errno == EWOULDBLOCK || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空 errno == EINTR)) // 操作被信号中断 break; return RyanSocketFailedError; } offset += sendResult; timeOut2 = platformTimerRemain(&timer); } if (offset != sendLen) return RyanMqttSendPacketTimeOutError; return RyanMqttSuccessError; } /** * @brief 断开mqtt服务器连接 * * @param userData * @param platformNetwork * @return RyanMqttError_e */ RyanMqttError_e platformNetworkClose(void *userData, platformNetwork_t *platformNetwork) { if (platformNetwork->socket >= 0) { closesocket(platformNetwork->socket); platformNetwork->socket = -1; } return RyanMqttSuccessError; } ``` ## 3、总结 可以看到,RyanMqtt移植非常简单,有专门的platform层用来移植。
4
条评论
默认排序
按发布时间排序
登录
注册新账号
关于作者
Ryan_CW
这家伙很懒,什么也没写!
文章
9
回答
53
被采纳
12
关注TA
发私信
相关文章
1
umqtt 软件包使用后,连接上emqx服务器,过一会儿就掉线了
2
使用正点原子的 潘多拉 开发板 的例程19_iot_mqtt
3
mqtt软件包,不支持直接关闭?
4
kawaii_mqtt 申请内存崩溃
5
_signal_entry() 函数中dbg_enter在哪里定义呢?
6
start to connect mqtt server 失败
7
MQTT 在“ read 0:1, break “后断开重连
8
paho_mqtt线程相关疑问
9
RT thread studio kawaii mqtt 无法连接EMQ
10
调试bc26 ,断言错误failed at rt_thread_timeout
推荐文章
1
RT-Thread应用项目汇总
2
玩转RT-Thread系列教程
3
机器人操作系统 (ROS2) 和 RT-Thread 通信
4
五分钟玩转RT-Thread新社区
5
国产MCU移植系列教程汇总,欢迎查看!
6
【技术三千问】之《玩转ART-Pi》,看这篇就够了!干货汇总
7
关于STM32H7开发板上使用SDIO接口驱动SD卡挂载文件系统的问题总结
8
STM32的“GPU”——DMA2D实例详解
9
RT-Thread隐藏的宝藏之completion
10
【ART-PI】RT-Thread 开启RTC 与 Alarm组件
热门标签
RT-Thread Studio
串口
LWIP
Env
AT
SPI
Bootloader
FinSH
ART-Pi
CAN总线
Hardfault
USB
文件系统
RT-Thread
DMA
SCons
线程
MQTT
RT-Thread Nano
STM32
RTC
rt-smart
ESP8266
flash
ota在线升级
WIZnet_W5500
FAL
I2C
packages_软件包
UART
cubemx
freemodbus
潘多拉开发板_Pandora
定时器
BSP
PWM
ADC
socket
中断
rt_mq_消息队列_msg_queue
keil_MDK
SDIO
Debug
AB32VG1
MicroPython
编译报错
C++_cpp
msh
ulog
QEMU
本月问答贡献
出出啊
1501
个答案
338
次被采纳
小小李sunny
1390
个答案
276
次被采纳
张世争
715
个答案
157
次被采纳
crystal266
522
个答案
153
次被采纳
whj467467222
1216
个答案
146
次被采纳
本月文章贡献
出出啊
1
篇文章
12
次点赞
小小李sunny
1
篇文章
1
次点赞
张世争
2
篇文章
2
次点赞
crystal266
2
篇文章
5
次点赞
whj467467222
2
篇文章
1
次点赞
回到
顶部
发布
问题
投诉
建议
回到
底部