在多线程应用开发中,除了前几篇文章提到的临界区的问题以外,还需要考虑一种应用场景,那就是多个线程中间关键数据(如控制指令等)需要按照某种循序一次传输到另一个线程中。在这种场景下,是否能有一种机制,一方面可以使重要的数据按照某种确定的次序排列缓存,另一方面可以使等待获取这些数据的线程根据整个数据串的状态做出相对应的同步响应(阻塞/解除阻塞/超时等)呢?对于上述需求,POSIX已经提供了完善的机制来应对上述问题,那就是消息队列。
消息队列可以简单的理解为,某种数据块,按照时间序列或者优先级顺序,被组织排列成一串内存块,类似于FIFO但是又不同于简单的FIFO,它又有着多线程间操作需要保证的一些复杂细节。线程或中断服务例程可以将一条或多条消息放入消息队列中。同样,一个或多个线程也可以从消息队列中获得消息。当有多个消息发送到消息队列时,通常将先进入消息队列的消息先传给线程,也就是说,线程先得到的是最先进入消息队列的消息,即先进先出原则 (FIFO)
int mqd_t mq_open(const char *name, int oflag, /* mode_t mode, struct mq_attr* attr */);
name 参数标识出了消息队列。
oflag 参数是一个位掩码,它控制着 mq_open()操作的各个方面。下表对这个掩码中可以包含的值进行了总结。
标 记 | 描 述 |
---|---|
O_CREAT / O_EXCL | 队列不存在时创建队列 与 O_CREAT 一起排它地创建队列 |
O_RDONLY / O_WRONLY / O_RDWR | 只读打开 只写打开 读写打开 |
O_NONBLOCK | 以非阻塞模式打开 |
oflag 参数的其中一个用途是,确定是打开一个既有队列还是创建和打开一个新队列。如果在 oflag 中不包含 O_CREAT,那么将会打开一个既有队列。如果在 oflag 中包含了 O_CREAT,并且与给定的 name 对应的队列不存在,那么就会创建一个新的空队列。如果在 oflag 中同时包含 O_CREAT 和 O_EXCL, 并且与给定的 name 对应的队列已经存在, 那么 mq_open()就会失败。oflag 参数还能够通过包含 O_RDONLY、 O_WRONLY 以及 O_RDWR 这三个值中的一个来表明调用进程在消息队列上的访问方式。剩下的一个标记值 O_NONBLOCK 将会导致以非阻塞的模式打开队列。如果后续的
mq_receive()或 mq_send()调用无法在不阻塞的情况下执行, 那么调用就会立即返回 EAGAIN 错误。mq_open()通常用来打开一个既有消息队列,这种调用只需要两个参数,但如果在 flags中指定了 O_CREAT,那么就还需要另外两个参数: mode 和 attr。(如果通过 name 指定的队列已经存在,那么这两个参数会被忽略。 )这些参数的用法如下。
mq_open()在成功结束时会返回一个消息队列描述符,它是一个类型为 mqd_t 的值,在后续的调用中将会使用它来引用这个打开着的消息队列。
关闭一个消息队列,其数原型如下所示:
int mq_close(mqd_t mqdes);
与标准文件操作上的 close()一样,关闭一个消息队列并不会删除该队列。要删除队列则需要使用mq_unlink()。
int mq_unlink(const char *name);
此函数会根据消息队列名称 name 查找消息队列,若找到,则将消息队列置为分离状态,之后若持有计数为 0,则删除消息队列,并释放消息队列占有的资源。
mq_send()函数会将位于 msg_ptr 指向的缓冲区中的消息添加到描述符 mqdes 所引用的消息队列中 。msg_len 参数指定了 msg_ptr 指向的消息的长度, 其值必须小于或等于队列的 mq_msgsize特性,否则 mq_send()就会返回 EMSGSIZE 错误。长度为零的消息是允许的。每条消息都拥有一个用非负整数表示的优先级,它通过 msg_prio 参数指定。消息在队列
中是按照优先级倒序排列的(即 0 表示优先级最低)。
当一条消息被添加到队列中时,它会被放置在队列中具有相同的优先级的所有消息之后。如果一个应用程序无需使用消息优先级,那么只需要将 msg_prio 指定为 0 即可 。如果消息队列已经满了(即已经达到了队列的 mq_maxmsg 限制),那么后续的 mq_send()调用会阻塞直到队列中存在可用空间为止或者在 O_NONBLOCK 标记起作用时立即失败并返回 EAGAIN 错误 。
int mq_send(mqd_t mqdes,const char *msg_ptr,size_t msg_len,unsigned msg_prio);
int mq_timedsend(mqd_t mqdes,const char *msg_ptr,size_t msg_len,unsigned msg_prio,const struct timespec *abs_timeout);
mq_timedsend与mq_send在操作上类似,唯一的区别在于mq_timedsend函数允许当前操作阻塞设置的时间再返回。两种消息发送函数视情况而定。
mq_receive()函数从 mqdes 引用的消息队列中删除一条优先级最高、存在时间最长的消息并将删除的消息放置在 msg_ptr 指向的缓冲区 。
ssize_t mq_receive(mqd_t mqdes,char *msg_ptr,size_t msg_len,unsigned *msg_prio);
ssize_t mq_timedreceive(mqd_t mqdes,char *msg_ptr,size_t msg_len,unsigned *msg_prio,const struct timespec *abs_timeout);
调用者使用 msg_len 参数来指定 msg_ptr 指向的缓冲区中的可用字节数 。不管消息的实际大小是什么, msg_len(即 msg_ptr 指向的缓冲区的大小)必须要大于或等于队列的 mq_msgsize 特性,否则 mq_receive()就会失败并返回 EMSGSIZE 错误。如果不清楚一个队列的 mq_msgsize 特性的值,那么可以使用 mq_getattr()来获取这个值。 如果 msg_prio 不为 NULL,那么接收到的消息的优先级会被复制到 msg_prio 指向的位置处。如果消息队列当前为空,那么mq_receive()会阻塞直到存在可用的消息或在O_NONBLOCK 标记起作用时会立即失败并返回 EAGAIN 错误 。
mq_receive与mq_timedreceive在接收上的区别主要是后者能够按照设定的超时时间自动解除阻塞。两种接收函数可视具体情况选择。
int mq_getattr(mqd_t mqdes, struct mq_attr* attr);
mq_getattr()函数返回一个包含与描述符 mqdes 相关联的消息队列描述和消息队列的相关信息的 mq_attr 结构。 除了上面已经介绍的 mq_maxmsg 和 mq_msgsize 字段之外, attr 指向的返回结构中还包含下列字段。
int mq_setattr(mqd_t mqdes, const struct mq_attr* newattr, struct mq_attr* oldattr);
mq_setattr()函数执行下列任务:
应用程序应该通过使用 mq_getattr() 来获取 mq_flags 值并修改O_NONBLOCK 位来修改 O_NONBLOCK 标记的状态以及调用 mq_setattr()来修改 mq_flags 设置。
发送队列线程会发送16个消息,发送完毕后自动退出线程,在发送过程中,最后一个数据为退出指令,接收线程收到该指令,意味着发送线程发送完毕,其自身主动退出。
线程用随机睡眠时间,来模拟发送端随机发送消息的效果。
static void* send_task(void* argc)
{
char data = 0xaa;
char end = 0x55;
for(int i=0; i<16; i++)
{
if(i==15)
{
mq_send(mq, &end, 1, 0);
}
else
{
mq_send(mq, &data, 1, 0);
}
sleep(rand()%3);
}
return NULL;
}
消息结束线程采用阻塞等待的方式来获取消息队列中的数据,当获取到退出指令0x55时,则会主动退出线程。其余数据每次收到之后,将会比对校验数据是否正确,如果数据接收错误,则输出错误信息。
线程用随机睡眠时间,来模拟接收端随机接收消息的效果。
static void* receive_task(void* argc)
{
char msg = 0x00;
while(1)
{
mq_receive(mq, &msg, 1, 0);
if(msg == 0x55)
{
break;
}
else if(msg != 0xaa)
{
printf("[error] msg recv: 0x%x %s\r\n",msg,strerror(errno));
}
sleep(rand()%3);
}
return NULL;
}
在主程序之前,首先定义了宏来表示消息队列的名称。
#define MQ_NAME "/mq0"
主程序中,首先设置消息队列句柄mqattr的参数,本程序中设置最大消息数64,一个消息大小为1字节。然后通过mq_open函数创建一个符合设置参数的消息队列,之后分别创建消息发送线程和接收线程,并且把各个子线程设置为连接状态,此时主程序阻塞等待子线程执行完毕后退出。
发送线程会发送两种数据,0x55和0xaa,分别表示退出指令和普通数据;接收线程收到消息队列的数据,通过简单比对,如果数据等于0x55,则表示该消息收发正确,否则打印错误信息。当接收线程收到0xaa指令,则线程退出。
int msgqueue_demo(void)
{
pthread_t tid_send, tid_receive;
struct mq_attr mqattr;
printf("\n [msgqueue_demo]\r\n");
mqattr.mq_maxmsg = 64;
mqattr.mq_msgsize = 1;
mq = mq_open(MQ_NAME, O_RDWR|O_CREAT|O_EXCL, S_IRWXU|S_IRWXG|S_IRWXO, &mqattr);
printf("mq : %d %s\n", mq,strerror(errno));
if(mq == (-1))
{
printf("mq : %d %s\n", mq,strerror(errno));
return 0;
}
pthread_create(&tid_send, NULL, send_task, NULL);
pthread_create(&tid_receive, NULL, receive_task, NULL);
pthread_join(tid_send, NULL);
pthread_join(tid_receive, NULL);
mq_close(mq);
mq_unlink(MQ_NAME);
return 0;
}
本篇介绍了多线程程序中另一种机制---线程间通信。消息队列作为最重要的多线程编程机制之一,在很多场合都有着重要的作用,比如利用消息队列实现处理指令的缓冲等。文章设计了消息队列的一种典型的应用场合,通过例程的设计和执行。可以很清晰的看到消息队列在多线程数据交互传输之间的应用细节。
本篇是POSIX标准接口应用开发基础的的最后一篇,由于能力有限,只能把自己在Smart系统上尝试过的这些基础操作分享出来。后续如果时间允许,会继续分享在POSIX标准下开发的一些Smart应用实例。
结尾照例分享一首喜欢的诗,生活如沧浪之水,清浊难辨,好坏难分,灰色是生命的常态,平凡是生活的圣经。
目击众神死亡的草原上野花一片
远在远方的风比远方更远
我的琴声呜咽 泪水全无
我把这远方的远归还草原
——海子《九月》
刚好用的到,感谢分享
大佬,你的这个demo是可以运行成功的吗?为什么我运行不正常呢?内核需要修改吗?

list_msgqueue也没有对应的mq建立

@AngerCoke 😂我是个菜鸟,不是大佬。这个是可以运行运行成功的,不过因为是用的posix那套接口,目测应该list的话可能会看不到吧,我没有注意这个问题。如果没有特殊需求,还是跑RT官方的那套API比较好哦。我这个分享主要是针对想移植老程序的操作才进行的测试。哈哈。