Toggle navigation
首页
问答
文章
积分商城
专家
专区
更多专区...
文档中心
返回主站
搜索
提问
会员
中心
登录
注册
ART-Pi Smart
POSIX
【ART-PI Smart 抛砖引玉 六】基于POSIX的应用开发之消息队列
发布于 2022-05-13 23:43:07 浏览:596
订阅该版
[tocm] # POSIX 多线程通信之消息队列 ## 消息队列引入 在多线程应用开发中,除了前几篇文章提到的临界区的问题以外,还需要考虑一种应用场景,那就是多个线程中间关键数据(如控制指令等)需要按照某种循序一次传输到另一个线程中。在这种场景下,是否能有一种机制,一方面可以使重要的数据按照某种确定的次序排列缓存,另一方面可以使等待获取这些数据的线程根据整个数据串的状态做出相对应的同步响应(阻塞/解除阻塞/超时等)呢?对于上述需求,POSIX已经提供了完善的机制来应对上述问题,那就是消息队列。 消息队列可以简单的理解为,某种数据块,按照时间序列或者优先级顺序,被组织排列成一串内存块,类似于FIFO但是又不同于简单的FIFO,它又有着多线程间操作需要保证的一些复杂细节。线程或中断服务例程可以将一条或多条消息放入消息队列中。同样,一个或多个线程也可以从消息队列中获得消息。当有多个消息发送到消息队列时,通常将先进入消息队列的消息先传给线程,也就是说,线程先得到的是最先进入消息队列的消息,即先进先出原则 (FIFO)  ## 消息队列基本操作API ### 创建/打开消息队列 ```c int mqd_t mq_open(const char *name, int oflag, /* mode_t mode, struct mq_attr* attr */); ``` name 参数标识出了消息队列。 oflag 参数是一个位掩码,它控制着 mq_open()操作的各个方面。下表对这个掩码中可以包含的值进行了总结。 | 标 记 | 描 述 | | :--------------------------- | -------------------------------------------------- | | O_CREAT / O_EXCL | 队列不存在时创建队列 与 O_CREAT 一起排它地创建队列 | | O_RDONLY / O_WRONLY / O_RDWR | 只读打开 只写打开 读写打开 | | O_NONBLOCK | 以非阻塞模式打开 | oflag 参数的其中一个用途是,确定是打开一个既有队列还是创建和打开一个新队列。如果在 oflag 中不包含 O_CREAT,那么将会打开一个既有队列。如果在 oflag 中包含了 O_CREAT,并且与给定的 name 对应的队列不存在,那么就会创建一个新的空队列。如果在 oflag 中同时包含 O_CREAT 和 O_EXCL, 并且与给定的 name 对应的队列已经存在, 那么 mq_open()就会失败。oflag 参数还能够通过包含 O_RDONLY、 O_WRONLY 以及 O_RDWR 这三个值中的一个来表明调用进程在消息队列上的访问方式。剩下的一个标记值 O_NONBLOCK 将会导致以非阻塞的模式打开队列。如果后续的 mq_receive()或 mq_send()调用无法在不阻塞的情况下执行, 那么调用就会立即返回 EAGAIN 错误。mq_open()通常用来打开一个既有消息队列,这种调用只需要两个参数,但如果在 flags中指定了 O_CREAT,那么就还需要另外两个参数: mode 和 attr。(如果通过 name 指定的队列已经存在,那么这两个参数会被忽略。 )这些参数的用法如下。 - mode 参数是一个位掩码,它指定了施加于新消息队列之上的权限。这个参数可取的位值与文件上的掩码值是一样的,并且与 open()一样, mode 中的值会与进程的 umask取掩码。要从一个队列中读取消息mq_receive()就必须要将读权限赋予相应的用户,要向队列写入消息 mq_send()就需要写权限。 - attr 参数是一个 mq_attr 结构,它指定了新消息队列的特性。如果 attr 为 NULL,那么将使用实现定义的默认特性创建队列。 mq_open()在成功结束时会返回一个消息队列描述符,它是一个类型为 mqd_t 的值,在后续的调用中将会使用它来引用这个打开着的消息队列。 ### 关闭消息队列 关闭一个消息队列,其数原型如下所示: ```c int mq_close(mqd_t mqdes); ``` 与标准文件操作上的 close()一样,关闭一个消息队列并不会删除该队列。要删除队列则需要使用mq_unlink()。 ### 删除消息队列 ```c int mq_unlink(const char *name); ``` 此函数会根据消息队列名称 name 查找消息队列,若找到,则将消息队列置为分离状态,之后若持有计数为 0,则删除消息队列,并释放消息队列占有的资源。 ### 发送消息到队列 mq_send()函数会将位于 msg_ptr 指向的缓冲区中的消息添加到描述符 mqdes 所引用的消息队列中 。msg_len 参数指定了 msg_ptr 指向的消息的长度, 其值必须小于或等于队列的 mq_msgsize特性,否则 mq_send()就会返回 EMSGSIZE 错误。长度为零的消息是允许的。每条消息都拥有一个用非负整数表示的优先级,它通过 msg_prio 参数指定。消息在队列 中是按照优先级倒序排列的(即 0 表示优先级最低)。 当一条消息被添加到队列中时,它会被放置在队列中具有相同的优先级的所有消息之后。如果一个应用程序无需使用消息优先级,那么只需要将 msg_prio 指定为 0 即可 。如果消息队列已经满了(即已经达到了队列的 mq_maxmsg 限制),那么后续的 mq_send()调用会阻塞直到队列中存在可用空间为止或者在 O_NONBLOCK 标记起作用时立即失败并返回 EAGAIN 错误 。 ```c int mq_send(mqd_t mqdes,const char *msg_ptr,size_t msg_len,unsigned msg_prio); int mq_timedsend(mqd_t mqdes,const char *msg_ptr,size_t msg_len,unsigned msg_prio,const struct timespec *abs_timeout); ``` mq_timedsend与mq_send在操作上类似,唯一的区别在于mq_timedsend函数允许当前操作阻塞设置的时间再返回。两种消息发送函数视情况而定。 ### 从队列获取消息 mq_receive()函数从 mqdes 引用的消息队列中删除一条优先级最高、存在时间最长的消息并将删除的消息放置在 msg_ptr 指向的缓冲区 。 ```C ssize_t mq_receive(mqd_t mqdes,char *msg_ptr,size_t msg_len,unsigned *msg_prio); ssize_t mq_timedreceive(mqd_t mqdes,char *msg_ptr,size_t msg_len,unsigned *msg_prio,const struct timespec *abs_timeout); ``` 调用者使用 msg_len 参数来指定 msg_ptr 指向的缓冲区中的可用字节数 。不管消息的实际大小是什么, msg_len(即 msg_ptr 指向的缓冲区的大小)必须要大于或等于队列的 mq_msgsize 特性,否则 mq_receive()就会失败并返回 EMSGSIZE 错误。如果不清楚一个队列的 mq_msgsize 特性的值,那么可以使用 mq_getattr()来获取这个值。 如果 msg_prio 不为 NULL,那么接收到的消息的优先级会被复制到 msg_prio 指向的位置处。如果消息队列当前为空,那么mq_receive()会阻塞直到存在可用的消息或在O_NONBLOCK 标记起作用时会立即失败并返回 EAGAIN 错误 。 mq_receive与mq_timedreceive在接收上的区别主要是后者能够按照设定的超时时间自动解除阻塞。两种接收函数可视具体情况选择。 ### 获取消息队列属性 ```c int mq_getattr(mqd_t mqdes, struct mq_attr* attr); ``` mq_getattr()函数返回一个包含与描述符 mqdes 相关联的消息队列描述和消息队列的相关信息的 mq_attr 结构。 除了上面已经介绍的 mq_maxmsg 和 mq_msgsize 字段之外, attr 指向的返回结构中还包含下列字段。 - mq_flags:与描述符 mqdes 相关联的打开的消息队列描述的标记,其取值只有一个O_NONBLOCK。这个标记是根据 mq_open()的 oflag 参数来初始化的,并且使用 mq_setattr()可以修改这个标记。 - mq_curmsgs:这个当前位于队列中的消息数。这个信息在 mq_getattr()返回时可能已经发生了改变,前 提是存在其他进程从队列中读取消息或向队列写入消息。 ### 设置/修改消息队列属性 ```c int mq_setattr(mqd_t mqdes, const struct mq_attr* newattr, struct mq_attr* oldattr); ``` mq_setattr()函数执行下列任务: - 它使用 newattr 指向的 mq_attr 结构中的 mq_flags 字段来修改与描述符 mqdes 相关联 的消息队列描述的标记。 - 如果 oldattr 不为 NULL,那么就返回一个包含之前的消息队列描述标记和消息队列特 性的 mq_attr 结构(即与 mq_getattr()执行的任务一样)。 应用程序应该通过使用 mq_getattr() 来获取 mq_flags 值并修改O_NONBLOCK 位来修改 O_NONBLOCK 标记的状态以及调用 mq_setattr()来修改 mq_flags 设置。 ## 消息队列在多线程通信中的应用 ### 消息发送线程设计 发送队列线程会发送16个消息,发送完毕后自动退出线程,在发送过程中,最后一个数据为退出指令,接收线程收到该指令,意味着发送线程发送完毕,其自身主动退出。 线程用随机睡眠时间,来模拟发送端随机发送消息的效果。 ```C static void* send_task(void* argc) { char data = 0xaa; char end = 0x55; for(int i=0; i<16; i++) { if(i==15) { mq_send(mq, &end, 1, 0); } else { mq_send(mq, &data, 1, 0); } sleep(rand()%3); } return NULL; } ``` ### 消息接收线程设计 消息结束线程采用阻塞等待的方式来获取消息队列中的数据,当获取到退出指令0x55时,则会主动退出线程。其余数据每次收到之后,将会比对校验数据是否正确,如果数据接收错误,则输出错误信息。 线程用随机睡眠时间,来模拟接收端随机接收消息的效果。 ```C static void* receive_task(void* argc) { char msg = 0x00; while(1) { mq_receive(mq, &msg, 1, 0); if(msg == 0x55) { break; } else if(msg != 0xaa) { printf("[error] msg recv: 0x%x %s\r\n",msg,strerror(errno)); } sleep(rand()%3); } return NULL; } ``` ### 主程序设计 在主程序之前,首先定义了宏来表示消息队列的名称。 ```C #define MQ_NAME "/mq0" ``` 主程序中,首先设置消息队列句柄mqattr的参数,本程序中设置最大消息数64,一个消息大小为1字节。然后通过mq_open函数创建一个符合设置参数的消息队列,之后分别创建消息发送线程和接收线程,并且把各个子线程设置为连接状态,此时主程序阻塞等待子线程执行完毕后退出。 发送线程会发送两种数据,0x55和0xaa,分别表示退出指令和普通数据;接收线程收到消息队列的数据,通过简单比对,如果数据等于0x55,则表示该消息收发正确,否则打印错误信息。当接收线程收到0xaa指令,则线程退出。 ```C int msgqueue_demo(void) { pthread_t tid_send, tid_receive; struct mq_attr mqattr; printf("\n [msgqueue_demo]\r\n"); mqattr.mq_maxmsg = 64; mqattr.mq_msgsize = 1; mq = mq_open(MQ_NAME, O_RDWR|O_CREAT|O_EXCL, S_IRWXU|S_IRWXG|S_IRWXO, &mqattr); printf("mq : %d %s\n", mq,strerror(errno)); if(mq == (-1)) { printf("mq : %d %s\n", mq,strerror(errno)); return 0; } pthread_create(&tid_send, NULL, send_task, NULL); pthread_create(&tid_receive, NULL, receive_task, NULL); pthread_join(tid_send, NULL); pthread_join(tid_receive, NULL); mq_close(mq); mq_unlink(MQ_NAME); return 0; } ``` ## 总结 本篇介绍了多线程程序中另一种机制---线程间通信。消息队列作为最重要的多线程编程机制之一,在很多场合都有着重要的作用,比如利用消息队列实现处理指令的缓冲等。文章设计了消息队列的一种典型的应用场合,通过例程的设计和执行。可以很清晰的看到消息队列在多线程数据交互传输之间的应用细节。 本篇是POSIX标准接口应用开发基础的的最后一篇,由于能力有限,只能把自己在Smart系统上尝试过的这些基础操作分享出来。后续如果时间允许,会继续分享在POSIX标准下开发的一些Smart应用实例。 结尾照例分享一首喜欢的诗,生活如沧浪之水,清浊难辨,好坏难分,灰色是生命的常态,平凡是生活的圣经。 > 目击众神死亡的草原上野花一片 远在远方的风比远方更远 我的琴声呜咽 泪水全无 我把这远方的远归还草原 ——海子《九月》
3
条评论
默认排序
按发布时间排序
登录
注册新账号
关于作者
RickFlying
something in the way
文章
10
回答
45
被采纳
0
关注TA
发私信
相关文章
1
移植rt-thread2.1.0缺少components/pthreads里的posix_types.h文件中包含的sy
2
[征集自愿者] POSIX相关章节文档编写
3
RT-Thread POSIX支持
4
POSIX标准接口针对文件系统没有导出fsync
5
POSIX接口的select还没实现?
6
pthread 组建bug反馈
7
RTT中的POSIX支持
8
RT-Thread 如何用POSIX接口
9
使用 RT_USING_POSIX finsh能显示,但不能输入
10
请教、讨论POSIX接口、dfs中的pos和size问题
推荐文章
1
RT-Thread应用项目汇总
2
玩转RT-Thread系列教程
3
国产MCU移植系列教程汇总,欢迎查看!
4
机器人操作系统 (ROS2) 和 RT-Thread 通信
5
五分钟玩转RT-Thread新社区
6
【技术三千问】之《玩转ART-Pi》,看这篇就够了!干货汇总
7
关于STM32H7开发板上使用SDIO接口驱动SD卡挂载文件系统的问题总结
8
STM32的“GPU”——DMA2D实例详解
9
RT-Thread隐藏的宝藏之completion
10
【ART-PI】RT-Thread 开启RTC 与 Alarm组件
热门标签
RT-Thread Studio
串口
Env
LWIP
SPI
AT
Bootloader
Hardfault
CAN总线
FinSH
ART-Pi
DMA
USB
文件系统
RT-Thread
SCons
RT-Thread Nano
线程
MQTT
STM32
RTC
rt-smart
FAL
I2C_IIC
UART
ESP8266
cubemx
WIZnet_W5500
ota在线升级
PWM
BSP
flash
freemodbus
packages_软件包
潘多拉开发板_Pandora
定时器
ADC
GD32
flashDB
socket
编译报错
中断
Debug
rt_mq_消息队列_msg_queue
keil_MDK
ulog
SFUD
msh
C++_cpp
MicroPython
本月问答贡献
RTT_逍遥
10
个答案
3
次被采纳
xiaorui
3
个答案
2
次被采纳
winfeng
2
个答案
2
次被采纳
三世执戟
8
个答案
1
次被采纳
KunYi
8
个答案
1
次被采纳
本月文章贡献
catcatbing
3
篇文章
5
次点赞
lizimu
2
篇文章
9
次点赞
swet123
1
篇文章
4
次点赞
Days
1
篇文章
4
次点赞
YZRD
1
篇文章
2
次点赞
回到
顶部
发布
问题
投诉
建议
回到
底部