Toggle navigation
首页
问答
文章
积分商城
专家
专区
更多专区...
文档中心
返回主站
搜索
提问
会员
中心
登录
注册
内核
数据结构
RT-Thread 隐藏的宝藏之 data_queue
发布于 2021-02-27 14:55:47 浏览:1656
订阅该版
[tocm] ### 1. data_queue 是什么 `data_queue` 直接翻译过来是 **数据队列**。这个名字和 **消息队列** 很像。那么他们有什么区别呢? **消息队列**:消息队列能够接收来自线程或中断服务例程中不固定长度的消息,并把消息缓存在自己的内存空间中。其他线程也能够从消息队列中读取相应的消息,而当消息队列是空的时候,可以挂起读取线程。当有新的消息到达时,挂起的线程将被唤醒以接收并处理消息。消息队列是一种异步的通信方式。(摘自 [RT-Thread文档中心](https://www.rt-thread.org/document/site/programming-manual/ipc2/ipc2/#_13)). **数据队列**:没有找到官方详细的说明,只是在 [RT-Thread API参考手册](https://www.rt-thread.org/document/api/group__dataqueue.html),有介绍。数据队列能够接收来自线程中不固定长度的数据,数据 **不会** 缓存在自己的内存空间中,自己的内存空间只有一个指向这包数据的指针。其他线程也能够从数据队列获取数据,当数据队列为空的时候,可以挂起线程。当有新的数据到达时,挂起的线程将被唤醒以接收并处理消息。数据队列是一种异步的通信方式。 **消息队列** 是用于线程消息传递的,属于**线程间同步异步 IPC**;消息队列在 `recv` 数据之后,这组数据就没了。 **数据队列** 更多的使用在流式数据传递,属于**线程间通信 IPC**;数据队列可以使用 `peak` 的方式 `舔一下` 这组数据不会丢失。自带高、低水位,可以对锯齿速度(压入数据的间隔不一致,时快时慢的)情况进行调节. ### 2. data_queue 怎么使用 `data_queue` 的 **API** 很少,使用起来也非常简单。 ##### 1. 定义一个数据队列 ``` struct rt_data_queue queue; ``` ##### 2. 初始化数据队列 ``` rt_data_queue_init(struct rt_data_queue *queue, rt_uint16_t size, rt_uint16_t lwm, void (*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event)) ``` **struct rt_data_queue** : 数据队列控制块 **size** : 数据队列的容量 **lwm** : 数据队列的阈值 **(*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event)**: 通知回调函数 ##### 3. 反初始化数据队列 ``` rt_err_t rt_data_queue_deinit(struct rt_data_queue *queue) ``` ##### 4. 往数据队列压入数据 ``` rt_err_t rt_data_queue_push(struct rt_data_queue *queue, const void *data_ptr, rt_size_t data_size, rt_int32_t timeout) ``` **const void *data_ptr** : 保存数据的指针 **data_size** : 数据的大小 ##### 5. 从数据队列弹出数据 ``` rt_err_t rt_data_queue_pop(struct rt_data_queue *queue, const void** data_ptr, rt_size_t *size, rt_int32_t timeout) ``` **data_ptr** : 指向了保存数据的地址 **size** : 收到数据的大小 ##### 6. 取出但保留数据队列中的数据 ``` rt_err_t rt_data_queue_peak(struct rt_data_queue *queue, const void** data_ptr, rt_size_t *size) ``` **data_ptr** : 指向了保存数据的地址 **size** : 收到数据的大小 ##### 7. 重置数据队列 ``` void rt_data_queue_reset(struct rt_data_queue *queue) ``` ### 3. data_queue 的实现 ##### 1. 初始化数据队列 伪代码: ``` rt_err_t rt_data_queue_init(struct rt_data_queue *queue, rt_uint16_t size, rt_uint16_t lwm, void (*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event)) { queue->evt_notify = evt_notify; queue->magic = DATAQUEUE_MAGIC; queue->size = size; queue->lwm = lwm; queue->get_index = 0; queue->put_index = 0; rt_list_init(&(queue->suspended_push_list)); rt_list_init(&(queue->suspended_pop_list)); queue->queue = (struct rt_data_item *)rt_malloc(sizeof(struct rt_data_item) * size); return RT_EOK; } ``` 给 `struct rt_data_queue` 的成员赋值: **evt_notify** : 事件通知回调函数 **size** : 数据队列的容量 **lwm** : 数据队列的阈值 **magic** :数据队列的魔法字 **get_index** :获取数据的下标 **put_index** :压入数据的下标 **suspended_push_list** :压入数据的挂起线程的链表 **suspended_pop_list** :弹出数据的挂起线程的链表 ##### 2. 反初始化数据队列 伪代码: ``` rt_err_t rt_data_queue_deinit(struct rt_data_queue *queue) { rt_data_queue_reset(queue); queue->magic = 0; rt_free(queue->queue); return RT_EOK; } ``` 释放 `data_queue` 申请的内存。 ##### 3. 向数据队列中写入数据 伪代码: ``` rt_err_t rt_data_queue_push(struct rt_data_queue *queue, const void *data_ptr, rt_size_t data_size, rt_int32_t timeout) { rt_ubase_t level; rt_thread_t thread; rt_err_t result; result = RT_EOK; thread = rt_thread_self();//获取当前线程 level = rt_hw_interrupt_disable();//关中断 while (queue->put_index - queue->get_index == queue->size)//相等说明,队列已经满了 { /* queue is full */ if (timeout == 0) //已经满了,超时时间写的是0,那么就立即返回 -RT_ETIMEOUT { result = -RT_ETIMEOUT; goto __exit; } /* suspend thread on the push list */ rt_thread_suspend(thread);//挂起这个线程,然后把线程的链表插入到队列里面 rt_list_insert_before(&(queue->suspended_push_list), &(thread->tlist)); /* 设置线程自带的定时器 */ if (timeout > 0) { /* reset the timeout of thread timer and start it */ rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); rt_timer_start(&(thread->thread_timer)); } /* 进行调度 */ rt_schedule(); /* 获取线程的状态 */ result = thread->error; if (result != RT_EOK) goto __exit; } queue->queue[queue->put_index % queue->size].data_ptr = data_ptr;//把实际的数据的地址赋值给数据队列 queue->queue[queue->put_index % queue->size].data_size = data_size;//把实际的数据的大小赋值给数据队列 queue->put_index += 1;// 压入数据的下标加 1 /* 如果有线程在挂起链表上 */ if (!rt_list_isempty(&(queue->suspended_pop_list))) { /* get thread entry */ thread = rt_list_entry(queue->suspended_pop_list.next, struct rt_thread, tlist); /* 恢复这个线程 */ rt_thread_resume(thread); rt_schedule(); return result; } __exit: /* 如果定义了时间回调函数,则会发送一个 RT_DATAQUEUE_EVENT_PUSH 的事件*/ if ((result == RT_EOK) && queue->evt_notify != RT_NULL) { queue->evt_notify(queue, RT_DATAQUEUE_EVENT_PUSH); } return result; } ``` ##### 4. 取出数据队列中的数据 伪代码 : ``` rt_err_t rt_data_queue_pop(struct rt_data_queue *queue, const void** data_ptr, rt_size_t *size, rt_int32_t timeout) { rt_thread_t thread; rt_err_t result; result = RT_EOK; thread = rt_thread_self();//获取当前线程 while (queue->get_index == queue->put_index) // put 和 get 相等,说明队列为空 { /* 超时时间设置为0,那么就立即返回超时 */ if (timeout == 0) { result = -RT_ETIMEOUT; goto __exit; } /* suspend thread on the pop list */ rt_thread_suspend(thread);//挂起这个线程,并把线程的链表挂到 pop 链表上 rt_list_insert_before(&(queue->suspended_pop_list), &(thread->tlist)); /* 启动定时器 */ if (timeout > 0) { /* reset the timeout of thread timer and start it */ rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); rt_timer_start(&(thread->thread_timer)); } /* 进行调度 */ rt_schedule(); /* 获取线程的状态 */ result = thread->error; if (result != RT_EOK) goto __exit; } *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr;//获取数据的地址 *size = queue->queue[queue->get_index % queue->size].data_size;//获取数据的大小 queue->get_index += 1;// get 的下标加 1 if ((queue->put_index - queue->get_index) <= queue->lwm) //到达阈值 { /* 如果有线程在挂起链表上 */ if (!rt_list_isempty(&(queue->suspended_push_list))) { /* 获取这个线程的控制块 */ thread = rt_list_entry(queue->suspended_push_list.next, struct rt_thread, tlist); /* 恢复这个线程 */ rt_thread_resume(thread); rt_schedule(); } else { rt_hw_interrupt_enable(level); } /* 如果定义了事件回调函数,则发送 RT_DATAQUEUE_EVENT_LWM*/ if (queue->evt_notify != RT_NULL) queue->evt_notify(queue, RT_DATAQUEUE_EVENT_LWM); return result; } __exit: /* 如果定义了事件回调函数,则发送 RT_DATAQUEUE_EVENT_POP */ if ((result == RT_EOK) && (queue->evt_notify != RT_NULL)) { queue->evt_notify(queue, RT_DATAQUEUE_EVENT_POP); } return result; } ``` ##### 5. 取出但保留数据队列中的数据 ``` rt_err_t rt_data_queue_peak(struct rt_data_queue *queue, const void** data_ptr, rt_size_t *size) { rt_ubase_t level; level = rt_hw_interrupt_disable(); if (queue->get_index == queue->put_index) // 相等说明队列为空,立即返回 { rt_hw_interrupt_enable(level); return -RT_EEMPTY; } *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr; //获取数据的地址 *size = queue->queue[queue->get_index % queue->size].data_size; //获取数据的大小 rt_hw_interrupt_enable(level); return RT_EOK; } ``` **注意** : 这里没有移动 `queue->get_index += 1;` 所以数据没有被取走,下次读还是在这里。这也就是 `pop` 和 `peak` 的区别。 ##### 6. 重置数据队列 伪代码: ``` void rt_data_queue_reset(struct rt_data_queue *queue) { struct rt_thread *thread; register rt_ubase_t temp; rt_enter_critical();//进入临界区 /* wakeup all suspend threads */ /* 唤醒所有 pop list 上的线程 */ while (!rt_list_isempty(&(queue->suspended_pop_list))) //如果 pop list 上有挂起的线程 { /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* get next suspend thread */ thread = rt_list_entry(queue->suspended_pop_list.next, //获取线程控制块 struct rt_thread, tlist); /* set error code to RT_ERROR */ thread->error = -RT_ERROR;// 设置线程的状态为 RT_ERROR /* * resume thread * In rt_thread_resume function, it will remove current thread from * suspend list */ rt_thread_resume(thread);// 恢复线程 /* enable interrupt */ rt_hw_interrupt_enable(temp); } /* 唤醒所有 push list 上的线程 */ while (!rt_list_isempty(&(queue->suspended_push_list)))//如果 push list 上有挂起的线程 { /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* get next suspend thread */ thread = rt_list_entry(queue->suspended_push_list.next,//获取线程控制块 struct rt_thread, tlist); /* set error code to RT_ERROR */ thread->error = -RT_ERROR;// 设置线程的状态为 RT_ERROR /* * resume thread * In rt_thread_resume function, it will remove current thread from * suspend list */ rt_thread_resume(thread);// 恢复线程 /* enable interrupt */ rt_hw_interrupt_enable(temp);// 开中断 } rt_exit_critical();//退出临界区 rt_schedule();//进行调度 } ``` ### 4. 最后 1. 标准版 **RT-Thread** 中的 **rt_data_queue** 源码在 `"\rt-thread\components\drivers\src\dataqueue.c"`在你要使用的文件中`#include dataqueue.h`直接就可以使用。 2. Nano 版 **RT-Thread** 直接拷贝 `dataqueue.c` 和 `dataqueue` 添加到工程就可以使用。 3. `rt_data_queue` **不是** 从 `rt_object` 派生出来的。 `rt_data_queue` 运用于流数据的特定场景。
1
条评论
默认排序
按发布时间排序
登录
注册新账号
关于作者
whj467467222
开源,分享,交流,共同进步
文章
32
回答
1222
被采纳
148
关注TA
发私信
相关文章
1
rt-thread的学习疑惑
2
基于stm32的RTT在RTT Studio IDE环境中的启动顺序求解
3
关于 rt_object_detach 脱离内核对象函数的作用求解
4
RT-Thread内核什么时候考虑加入MPU功能?
5
rt_hw_board_init中开中断后,触发SysTick_Handler
6
Cortex-M0在bootloader环境下的上下文切换问题?
7
关于ART-PI的bootloader是怎么烧写进去的
8
为什么内核代码和bootloader的代码一样的
9
线程对象结构体为什么不直接选择继承内核对象?
10
使用rt_memset给线程栈初始化,为什么选择字符‘#’,而不是‘\0’?
推荐文章
1
RT-Thread应用项目汇总
2
玩转RT-Thread系列教程
3
国产MCU移植系列教程汇总,欢迎查看!
4
机器人操作系统 (ROS2) 和 RT-Thread 通信
5
五分钟玩转RT-Thread新社区
6
【技术三千问】之《玩转ART-Pi》,看这篇就够了!干货汇总
7
关于STM32H7开发板上使用SDIO接口驱动SD卡挂载文件系统的问题总结
8
STM32的“GPU”——DMA2D实例详解
9
RT-Thread隐藏的宝藏之completion
10
【ART-PI】RT-Thread 开启RTC 与 Alarm组件
热门标签
RT-Thread Studio
串口
Env
LWIP
SPI
AT
Bootloader
Hardfault
CAN总线
FinSH
ART-Pi
USB
DMA
文件系统
RT-Thread
SCons
RT-Thread Nano
线程
MQTT
STM32
RTC
FAL
rt-smart
ESP8266
I2C_IIC
UART
WIZnet_W5500
ota在线升级
freemodbus
PWM
flash
cubemx
packages_软件包
BSP
潘多拉开发板_Pandora
定时器
ADC
flashDB
GD32
socket
中断
编译报错
Debug
SFUD
rt_mq_消息队列_msg_queue
msh
keil_MDK
ulog
C++_cpp
MicroPython
本月问答贡献
a1012112796
10
个答案
1
次被采纳
踩姑娘的小蘑菇
4
个答案
1
次被采纳
红枫
4
个答案
1
次被采纳
张世争
4
个答案
1
次被采纳
Ryan_CW
4
个答案
1
次被采纳
本月文章贡献
catcatbing
3
篇文章
5
次点赞
YZRD
2
篇文章
5
次点赞
qq1078249029
2
篇文章
2
次点赞
xnosky
2
篇文章
1
次点赞
Woshizhapuren
1
篇文章
5
次点赞
回到
顶部
发布
问题
投诉
建议
回到
底部