Toggle navigation
首页
问答
文章
积分商城
专家
专区
更多专区...
文档中心
返回主站
搜索
提问
会员
中心
登录
注册
MQTT
原创征文
RyanMqtt 移植指南(三)
发布于 2023-09-28 15:55:47 浏览:495
订阅该版
[tocm] 移植代码可去 https://github.com/Ryan-CW-Code/RyanMqtt platform/quecopen查看 测试环境:stm32F401RCT6、RT-Thread版本: v4.1.0、RT-Thread Studio版本: 2.2.6、网络硬件使用ec800m移植at_socket使用sal框架。 ## 1、移植介绍 _RyanMqtt 库希望应用程序为以下接口提供实现:_ #### system 接口 _RyanMqtt 需要 RTOS 支持,必须实现如下接口才可以保证 mqtt 客户端的正常运行_ | 函数名称 | 函数简介 | | --------------------- | ------------------- | | platformMemoryMalloc | 申请内存 | | platformMemoryFree | 释放已申请内存 | | platformPrint | 打印字符串 | | platformDelay | 毫秒延时 | | platformThreadInit | 初始化线程 | | platformThreadStart | 开启线程 | | platformThreadStop | 挂起线程 | | platformThreadDestroy | 销毁线程 | | platformMutexInit | 初始化互斥锁 | | platformMutexLock | 获取互斥锁 | | platformMutexUnLock | 释放互斥锁 | | platformMutexDestroy | 销毁互斥锁 | | platformCriticalEnter | 进入临界区 / 关中断 | | platformCriticalExit | 退出临界区 / 开中断 | #### network 接口 _RyanMqtt 依赖于底层传输接口 API,必须实现该接口 API 才能在网络上发送和接收数据包_ _MQTT 协议要求基础传输层能够提供有序的、可靠的、双向传输(从客户端到服务端 和从服务端到客户端)的字节流_ | 函数名称 | 函数简介 | | ------------------------ | ------------------------ | | platformNetworkConnect | 根据 ip 和端口连接服务器 | | platformNetworkRecvAsync | 非阻塞接收数据 | | platformNetworkSendAsync | 非阻塞发送数据 | | platformNetworkClose | 断开 mqtt 服务器连接 | #### time 接口 _RyanMqtt 依靠函数生成毫秒时间戳,用于计算持续时间和超时,内部已经做了数值溢出处理_ | 函数名称 | 函数简介 | | ---------------- | ------------------------ | | platformUptimeMs | 自系统启动以来 ms 时间戳 | ## 2、开始移植 得益于RT-Thread驱动应用层分离的思想和SAL框架,**platform/rtthread的适配层可以适应任何RT-Thread代码**,所以我们就不拿RT-Thread来移植了。 ***使用FreeRTOS内核来移植,使用CMSIS-RTOS V2兼容层。*** #### system 接口 系统接口,需要移植RTOS的接口。为方便管理类型使用平台结构体,修改platformSystem.h里面的结构体 就是线程和互斥锁 ```c typedef struct { osThreadId_t thread; } platformThread_t; typedef struct { osMutexId_t mutex; } platformMutex_t; ``` **再来实现platformSystem.c里面的函数定义** 注意里面的 platformPrint 函数,由于FreeRTOS没有官方的打印接口。记得修改为你的打印接口 ```c #include "platformSystem.h" /** * @brief 申请内存 * * @param size * @return void* */ void *platformMemoryMalloc(size_t size) { return pvPortMalloc(size); } /** * @brief 释放内存 * * @param ptr */ void platformMemoryFree(void *ptr) { vPortFree(ptr); } /** * @brief ms延时 * * @param ms */ void platformDelay(uint32_t ms) { osDelay(ms); } /** * @brief 打印字符串函数,可通过串口打印出去 * * @param str * @param strLen */ void platformPrint(char *str, uint16_t strLen) { } /** * @brief 初始化并运行线程 * * @param userData * @param platformThread * @param name * @param entry * @param param * @param stackSize * @param priority * @return RyanMqttError_e */ RyanMqttError_e platformThreadInit(void *userData, platformThread_t *platformThread, const char *name, void (*entry)(void *), void *const param, uint32_t stackSize, uint32_t priority) { const osThreadAttr_t myTask02_attributes = { .name = name, .stack_size = stackSize, .priority = (osPriority_t)priority, }; platformThread->thread = osThreadNew(entry, param, &myTask02_attributes); if (NULL == platformThread->thread) return RyanMqttNoRescourceError; return RyanMqttSuccessError; } /** * @brief 销毁自身线程 * * @param userData * @param platformThread * @return RyanMqttError_e */ RyanMqttError_e platformThreadDestroy(void *userData, platformThread_t *platformThread) { osThreadTerminate(platformThread->thread); return RyanMqttSuccessError; } /** * @brief 开启线程 * * @param userData * @param platformThread * @return RyanMqttError_e */ RyanMqttError_e platformThreadStart(void *userData, platformThread_t *platformThread) { osThreadResume(platformThread->thread); return RyanMqttSuccessError; } /** * @brief 挂起线程 * * @param userData * @param platformThread * @return RyanMqttError_e */ RyanMqttError_e platformThreadStop(void *userData, platformThread_t *platformThread) { osThreadSuspend(platformThread->thread); return RyanMqttSuccessError; } /** * @brief 互斥锁初始化 * * @param userData * @param platformMutex * @return RyanMqttError_e */ RyanMqttError_e platformMutexInit(void *userData, platformMutex_t *platformMutex) { const osMutexAttr_t myMutex01_attributes = { .name = "mqttMutex"}; platformMutex->mutex = osMutexNew(&myMutex01_attributes); return RyanMqttSuccessError; } /** * @brief 销毁互斥锁 * * @param userData * @param platformMutex * @return RyanMqttError_e */ RyanMqttError_e platformMutexDestroy(void *userData, platformMutex_t *platformMutex) { osMutexDelete(platformMutex->mutex); return RyanMqttSuccessError; } /** * @brief 阻塞获取互斥锁 * * @param userData * @param platformMutex * @return RyanMqttError_e */ RyanMqttError_e platformMutexLock(void *userData, platformMutex_t *platformMutex) { osMutexAcquire(platformMutex->mutex, osWaitForever); return RyanMqttSuccessError; } /** * @brief 释放互斥锁 * * @param userData * @param platformMutex * @return RyanMqttError_e */ RyanMqttError_e platformMutexUnLock(void *userData, platformMutex_t *platformMutex) { osMutexRelease(platformMutex->mutex); return RyanMqttSuccessError; } /** * @brief 进入临界区 / 关中断 * */ void platformCriticalEnter(void) { osKernelLock(); } /** * @brief 退出临界区 / 开中断 * */ void platformCriticalExit(void) { osKernelUnlock(); } ``` #### time 接口 time接口,只需要提供一个ms时间戳就行,直接修改函数,这里使用FreeRTOS的心跳。 ```c uint32_t platformUptimeMs(void) { if (1000 == osKernelGetTickFreq()) return (uint32_t)osKernelGetTickCount(); else { uint32_t tick = 0; tick = osKernelGetTickCount() * 1000; return (uint32_t)((tick + osKernelGetTickCount() - 1) / osKernelGetTickCount()); } } ``` #### network 接口 **_MQTT 协议要求基础传输层能够提供有序的、可靠的、双向传输(从客户端到服务端 和从服务端到客户端)的字节流_** 由于FreeRTOS没有规定标准的网络层,你可以选择 FreeRTOS-Plus-TCP / FreeRTOS-Cellular-Interface/ lwip / W5500等网络方法,几乎RT-Thread支持的你也可以在FreeRTOS仓库找到。 这里以lwip为例,使用socket接口来实现,网络阻塞发送和接收使用 SO_SNDTIMEO 和 SO_RCVTIMEO 来实现,你也可以选择select / poll / epoll等方式。 修改 platformNetwork_t 结构体以支持 socket ```c typedef struct { int socket; } platformNetwork_t; ``` 接着实现platformNetwork.c里面的函数 ```c #define rlogEnable 1 // 是否使能日志 #define rlogColorEnable 1 // 是否使能日志颜色 #define rlogLevel (rlogLvlWarning) // 日志打印等级 #define rlogTag "RyanMqttNet" // 日志tag #include "platformNetwork.h" #include "RyanMqttLog.h" /** * @brief 连接mqtt服务器 * * @param userData * @param platformNetwork * @param host * @param port * @return RyanMqttError_e * 成功返回RyanMqttSuccessError, 失败返回错误信息 */ RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platformNetwork, const char *host, const char *port) { RyanMqttError_e result = RyanMqttSuccessError; struct addrinfo *addrList = NULL; struct addrinfo hints = { .ai_family = AF_UNSPEC, .ai_socktype = SOCK_STREAM, .ai_protocol = IPPROTO_TCP}; if (getaddrinfo(host, port, &hints, &addrList) != 0) { result = RyanSocketFailedError; goto exit; } platformNetwork->socket = socket(addrList->ai_family, addrList->ai_socktype, addrList->ai_protocol); if (platformNetwork->socket < 0) { result = RyanSocketFailedError; goto exit; } if (connect(platformNetwork->socket, addrList->ai_addr, addrList->ai_addrlen) != 0) { platformNetworkClose(userData, platformNetwork); result = RyanMqttSocketConnectFailError; goto exit; } exit: if (NULL != addrList) freeaddrinfo(addrList); return result; } /** * @brief 非阻塞接收数据 * * @param userData * @param platformNetwork * @param recvBuf * @param recvLen * @param timeout * @return RyanMqttError_e * socket错误返回 RyanSocketFailedError * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError * 接收成功 RyanMqttSuccessError */ RyanMqttError_e platformNetworkRecvAsync(void *userData, platformNetwork_t *platformNetwork, char *recvBuf, int recvLen, int timeout) { int32_t recvResult = 0; int32_t offset = 0; int32_t timeOut2 = timeout; struct timeval tv = {0}; platformTimer_t timer = {0}; if (-1 == platformNetwork->socket) return RyanSocketFailedError; platformTimerCutdown(&timer, timeout); while ((offset < recvLen) && (0 != timeOut2)) { tv.tv_sec = timeOut2 / 1000; tv.tv_usec = timeOut2 % 1000 * 1000; if (tv.tv_sec <= 0 && tv.tv_usec <= 100) { tv.tv_sec = 0; tv.tv_usec = 100; } setsockopt(platformNetwork->socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval)); // 设置错做模式为非阻塞 recvResult = recv(platformNetwork->socket, recvBuf + offset, recvLen - offset, 0); if (recvResult <= 0) // 小于零,表示错误,个别错误不代表socket错误 { // 下列3种表示没问题,但需要推出发送 if ((errno == EAGAIN || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时 errno == EWOULDBLOCK || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空 errno == EINTR)) // 操作被信号中断 break; return RyanSocketFailedError; } offset += recvResult; timeOut2 = platformTimerRemain(&timer); } if (offset != recvLen) return RyanMqttRecvPacketTimeOutError; return RyanMqttSuccessError; } /** * @brief 非阻塞发送数据 * * @param userData * @param platformNetwork * @param sendBuf * @param sendLen * @param timeout * @return RyanMqttError_e * socket错误返回 RyanSocketFailedError * 接收超时或者接收数据长度不等于期待数据接受长度 RyanMqttRecvPacketTimeOutError * 接收成功 RyanMqttSuccessError */ RyanMqttError_e platformNetworkSendAsync(void *userData, platformNetwork_t *platformNetwork, char *sendBuf, int sendLen, int timeout) { int32_t sendResult = 0; int32_t offset = 0; int32_t timeOut2 = timeout; struct timeval tv = {0}; platformTimer_t timer = {0}; if (-1 == platformNetwork->socket) return RyanSocketFailedError; platformTimerCutdown(&timer, timeout); while ((offset < sendLen) && (0 != timeOut2)) { tv.tv_sec = timeOut2 / 1000; tv.tv_usec = timeOut2 % 1000 * 1000; if (tv.tv_sec <= 0 && tv.tv_usec <= 100) { tv.tv_sec = 0; tv.tv_usec = 100; } setsockopt(platformNetwork->socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(struct timeval)); // 设置错做模式为非阻塞 sendResult = send(platformNetwork->socket, sendBuf + offset, sendLen - offset, 0); if (sendResult <= 0) // 小于零,表示错误,个别错误不代表socket错误 { // 下列3种表示没问题,但需要推出发送 if ((errno == EAGAIN || // 套接字已标记为非阻塞,而接收操作被阻塞或者接收超时 errno == EWOULDBLOCK || // 发送时套接字发送缓冲区已满,或接收时套接字接收缓冲区为空 errno == EINTR)) // 操作被信号中断 break; return RyanSocketFailedError; } offset += sendResult; timeOut2 = platformTimerRemain(&timer); } if (offset != sendLen) return RyanMqttSendPacketTimeOutError; return RyanMqttSuccessError; } /** * @brief 断开mqtt服务器连接 * * @param userData * @param platformNetwork * @return RyanMqttError_e */ RyanMqttError_e platformNetworkClose(void *userData, platformNetwork_t *platformNetwork) { if (platformNetwork->socket >= 0) { closesocket(platformNetwork->socket); platformNetwork->socket = -1; } return RyanMqttSuccessError; } ``` ## 3、总结 可以看到,RyanMqtt移植非常简单,有专门的platform层用来移植。
4
条评论
默认排序
按发布时间排序
登录
注册新账号
关于作者
Ryan_CW
这家伙很懒,什么也没写!
文章
9
回答
69
被采纳
13
关注TA
发私信
相关文章
1
umqtt 软件包使用后,连接上emqx服务器,过一会儿就掉线了
2
使用正点原子的 潘多拉 开发板 的例程19_iot_mqtt
3
mqtt软件包,不支持直接关闭?
4
kawaii_mqtt 申请内存崩溃
5
_signal_entry() 函数中dbg_enter在哪里定义呢?
6
start to connect mqtt server 失败
7
MQTT 在“ read 0:1, break “后断开重连
8
paho_mqtt线程相关疑问
9
RT thread studio kawaii mqtt 无法连接EMQ
10
调试bc26 ,断言错误failed at rt_thread_timeout
推荐文章
1
RT-Thread应用项目汇总
2
玩转RT-Thread系列教程
3
国产MCU移植系列教程汇总,欢迎查看!
4
机器人操作系统 (ROS2) 和 RT-Thread 通信
5
五分钟玩转RT-Thread新社区
6
【技术三千问】之《玩转ART-Pi》,看这篇就够了!干货汇总
7
关于STM32H7开发板上使用SDIO接口驱动SD卡挂载文件系统的问题总结
8
STM32的“GPU”——DMA2D实例详解
9
RT-Thread隐藏的宝藏之completion
10
【ART-PI】RT-Thread 开启RTC 与 Alarm组件
热门标签
RT-Thread Studio
串口
Env
LWIP
SPI
AT
Bootloader
Hardfault
CAN总线
FinSH
ART-Pi
USB
DMA
文件系统
RT-Thread
SCons
RT-Thread Nano
线程
MQTT
STM32
RTC
FAL
rt-smart
ESP8266
I2C_IIC
WIZnet_W5500
UART
ota在线升级
PWM
cubemx
freemodbus
flash
packages_软件包
BSP
潘多拉开发板_Pandora
定时器
ADC
GD32
flashDB
socket
中断
Debug
编译报错
msh
SFUD
rt_mq_消息队列_msg_queue
keil_MDK
ulog
MicroPython
C++_cpp
本月问答贡献
a1012112796
20
个答案
3
次被采纳
张世争
11
个答案
3
次被采纳
踩姑娘的小蘑菇
7
个答案
3
次被采纳
rv666
9
个答案
2
次被采纳
用户名由3_15位
13
个答案
1
次被采纳
本月文章贡献
程序员阿伟
9
篇文章
2
次点赞
hhart
3
篇文章
4
次点赞
RTT_逍遥
1
篇文章
6
次点赞
大龄码农
1
篇文章
5
次点赞
ThinkCode
1
篇文章
1
次点赞
回到
顶部
发布
问题
投诉
建议
回到
底部