在前文介绍了利用互斥量实现多线程程序中,对共享资源的保护,实现了多线程的同步操作。但是在消费者线程中,需要对共享资源进行循环检测,这样将会导致过多的CPU资源的占用。针对这一问题,能否有一种机制,可以让消费者线程等待在某处,直到生产者线程主动将其唤醒。这样就不需要轮询检测共享资源的状态了,从而解放CPU的占用,提高程序执行的效率呢?答案是有,即POSIX标准下的另一个线程间同步机制——条件变量。
条件变量并不保存状态信息,只是传递应用程序状态信息的一种通讯机制。发送信号时若无任何线程在等待该条件变量,这个信号也就会不了了之。线程如在此后等待该条件变量,只有当再次收到此变量的下一信号时,方可解除阻塞状态 。因此,条件变量必须配合互斥量来使用,当线程阻塞到某个条件变量上时,必须指明此时等待的条件,应该受控与哪一把“锁”(mutex),否则条件变量将毫无意义。
使用宏 PTHREAD_COND_INITIALIZER可以对条件变量在创建的同时进行静态初始化。条件变量用 pthread_cond_t 数据类型 表示 ,用条件变量前必须对其初始化。对于经由静态分配的条件变量,将其赋值为PTHREAD_COND_INITALIZER 即完成初始化操作。
#define PTHREAD_COND_INITIALIZER
具体初始化条件变量的操作如下:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
除了使用宏进行静态初始化之外,还可以先定义条件变量,然后在合适位置进行初始化,或者在堆中动态分配的条件变量,譬如使用 malloc()函数申请分配的条件变量对象,函数原型如下:
int pthread_cond_init(pthread_cond_t *cond,pthread_condattr_t *attr);
函数参数和返回值含义如下:
其中pthread_condattr_t类型对象用于定义互斥锁的属性,若将参数 attr 设置为 NULL,则表示将条件变量的属性设置为默认值,在这种情况下其实就等价于PTHREAD_COND_INITIALIZER 这种方式初始化,而不同之处在于,使用宏不进行错误检查。
返回值: 成功返回 0;失败将返回一个非 0 的错误码。
int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);
int pthread_cond_timewait(pthread_cond_t *cond,pthread_mutex_t *mutex,const timespec *abstime);
调用 pthread_cond_wait()函数可以使当前线程阻塞在该条件变量上,直到被另一个线程通过该条件变量唤醒,
功,函数调用将立刻返回;pthread_cond_timewait函数则可以设置阻塞的超时时间,在超时时间到了之后,自动解除阻塞。
条件变量与互斥量之间存在天然关系,同时等待相同条件变量的所有线程在调用 pthread_cond_wait()或 pthread_cond_timedwait()时必须指定同一互斥量。实际上, pthread_cond_wait()在调用期间能将条件变量与一个唯一的互斥量做动态绑定 。如果脱离了互斥量,那么条件变量就没有存在的意义,所以,必须保证被唤醒的线程,能够“指向”共享资源被“锁定”的线程使用的那把“锁”,才能做到线程等待条件、获取锁操作共享资源的整个流程。
函数 pthread_cond_signal()和 pthread_cond_broadcast()均可针对由参数 cond 所指定的条件变量而发送信号 。函数 pthread_cond_signal()和 pthread_cond_broadcast()之间的差别在于,二者对阻塞于 pthread_cond_wait()的多个线程处理方式不同。 pthread_cond_signal()函数只保证唤醒至少一条遭到阻塞的线程,而 pthread_cond_broadcast()则会唤醒所有遭阻塞的线程。
其函数原型如下所示:
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
参数 cond指向目标互斥锁,成功返回 0,失败返回一个非 0 值的错误码。
使用函数 pthread_cond_broadcast()总能产生正确结果(因为所有线程应都能处理多余和虚假的唤醒动作),但函数 pthread_cond_signal()会更为高效。不过,只有当仅需唤醒一条(且无论是其中哪条)等待线程来处理共享变量的状态变化时,才应使用 pthread_cond_signal()。应用这种方式的典型情况是,所有等待线程都在执行完全相同的任务。基于这些假设,函数pthread_cond_signal()会比 pthread_cond_broadcast()更具效率,因为这可以避免发生如下情况:
当不再需要条件变量时, 应该将其销毁,通过调用 pthread_cond_destroy()函数来销毁互斥锁,其函数原
型如下所示:
int pthread_cond_destroy(pthread_cond_t *cond);
使用该函数需要包含头文件<pthread.h>,参数 cond指向目标互斥锁;同样在调用成功情况下返回 0,
失败返回一个非 0 值的错误码。
结合前一篇文章【ART-PI Smart 抛砖引玉 二】基于POSIX的应用开发之多线程) 中通过互斥量实现的生产者消费者模型,来添加条件变量的使用,配合互斥量,实现线程间的同步的同时,也保证CPU较低的占用。
首先设计共享资源对象,因为增加了条件变量,因此本篇的共享数据块的具体数据结构有所改变,如下:
struct safe_shared_obj
{
pthread_mutex_t* buf_mutex;
pthread_mutex_t* readycnt_mutex;
pthread_cond_t* ready_cond;
int* buf;
int len;
int pos;
int nextval;
int readycnt;
};
typedef struct safe_shared_obj safe_shared_t;
该数据结构包含了共享资源的地址,数据块长度,当前数据存放的位置,已经下一个要存放的值,并且包含了两个互斥锁指针,一个用来保护身共享数据块,另一个用来保护readycnt变量;该变量用来标记当前可读取的数据个数,该变量会随着生产者在内存块中填入数据而累加,也会随着消费者取走数据块中的内容而递减。
依旧采用动态内存分配的方式来创建共享数据块对象句柄,这种方式较为灵活,可以根据需求创建符合需求的数据块大小。该函数入口参数是需要创建的数据块的大小(默认int型数据),返回创建好的共享数据块的句柄指针
safe_shared_t* safe_shared_create(int size)
{
safe_shared_t* pargv = NULL;
if(size==0)
{
printf("[error] init shared buf size invaild!\r\n");
return NULL;
}
pargv = (safe_shared_t*)malloc(sizeof(struct safe_critical_obj));
if(pargv == NULL)
{
printf("[error] malloc shared error!\r\n");
return NULL;
}
pargv->buf_mutex = NULL;
pargv->readycnt_mutex = NULL;
pargv->ready_cond = NULL;
pargv->buf = NULL;
pargv->len = 0;
pargv->pos = 0;
pargv->nextval = 0;
pargv->readycnt = 0;
pargv->buf_mutex = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
pargv->readycnt_mutex = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
if(pargv->buf_mutex == NULL||pargv->readycnt_mutex == NULL ||
pthread_mutex_init(pargv->buf_mutex, NULL) != 0 ||
pthread_mutex_init(pargv->readycnt_mutex, NULL) != 0)
{
printf("[error] init shared mutexs error!\r\n");
return NULL;
}
pargv->ready_cond = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
if(pargv->ready_cond == NULL || pthread_cond_init(pargv->ready_cond,NULL) != 0)
{
printf("[error] init shared cond error!\r\n");
return NULL;
}
pargv->buf = (int*)malloc(sizeof(int)*size);
if(pargv->buf == NULL)
{
printf("[error] malloc shared buf error!\r\n");
return NULL;
}
memset(pargv->buf,0,size);
pargv->len=size;
return pargv;
}
动态分配的数据块对象句柄,在这个应用程序退出时,应该把从系统内配的内存统一释放归还给系统,避免出现不可预测内存问题。所以,在程序结束时,调用该销毁函数,实现对共享资源的释放。
int safe_shared_destroy(safe_shared_t* pargv)
{
if(pargv == NULL)
{
printf("[error] destroy shared error!\r\n");
return -1;
}
pthread_mutex_destroy(pargv->buf_mutex);
pthread_mutex_destroy(pargv->readycnt_mutex);
pthread_cond_destroy(pargv->ready_cond);
free(pargv->buf_mutex);
pargv->buf_mutex =NULL;
free(pargv->readycnt_mutex);
pargv->readycnt_mutex=NULL;
free(pargv->ready_cond);
pargv->ready_cond=NULL;
free(pargv);
pargv = NULL;
return 0;
}
生产者线程在前一篇设计的基础之上,增加了新的互斥锁和条件变量来完善整个共享资源操作的流程buf_mutex用来保护数据块内容的更新以及相关变量的更新。readycnt_mutex互斥量用来专门保护当前可消费的数据个数的更新。并且在更新完该变量之后,对阻塞在条件变量ready_cond上的消费者线程进行唤醒。
并且,与之前相同的是,在执行完临界区操作之后,对从主线程传入生产者子线程的一个计数变量进行累加,该变量从主线程传入子线程,用来累计生产者线程在执行完数据写入之后总的写入次数,对于多生产者线程的应用情境下,如果共享数据块的互斥锁机制正常起作用,最终在主线程退出时,各个生产者子线程的所有计数和应该等于共享内存数据长度,及代表各个子线程在访问和操作共享资源的过程中,合理的对互斥锁进行了获取和释放。
在每次数据写入后,让线程随机休眠一段时间,来模拟不同生产者线程对共享资源访问的随机性。当数据写入位置到数据块的末尾时,生产者线程则退出。
static void* produce_entry(void* argc)
{
while(1)
{
pthread_mutex_lock(safe_shared_handle->buf_mutex);
if(safe_shared_handle->pos >= safe_shared_handle->len)
{
pthread_mutex_unlock(safe_shared_handle->buf_mutex);
break;
}
safe_shared_handle->buf[safe_shared_handle->pos] = safe_shared_handle->nextval;
safe_shared_handle->pos++;
safe_shared_handle->nextval++;
pthread_mutex_unlock(safe_shared_handle->buf_mutex);
pthread_mutex_lock(safe_shared_handle->readycnt_mutex);
safe_shared_handle->readycnt++;
pthread_cond_signal(safe_shared_handle->ready_cond);
pthread_mutex_unlock(safe_shared_handle->readycnt_mutex);
*((int* )argc)+=1;
sleep(rand()%3);
}
return NULL;
}
消费者线程会按照创建的共享数据块的长度来遍历该共享资源内的数据,在没有访问到数据末尾的时候,利用每次把数据取出进行数据比对校验来模拟消费者线程对共享资源的访问和操作,如果数据比对失败,则意味着生产者填入的某个数据出现了错误。消费者线程用随机睡眠时间,来模拟消费者对共享资源的访问随机效果。
static void* consume_entry(void* argc)
{
for(int i=0; i<safe_shared_handle->len; i++)
{
pthread_mutex_lock(safe_shared_handle->readycnt_mutex);
while(safe_shared_handle->readycnt==0)
{
pthread_cond_wait(safe_shared_handle->ready_cond,safe_shared_handle->readycnt_mutex);
}
safe_shared_handle->readycnt--;
pthread_mutex_unlock(safe_shared_handle->readycnt_mutex);
pthread_mutex_lock(safe_shared_handle->buf_mutex);
if(safe_shared_handle->buf[i] != i)
{
printf("[error] shared_handle->buf[%d] : %d \r\n", i, safe_shared_handle->buf[i]);
}
pthread_mutex_unlock(safe_shared_handle->buf_mutex);
sleep(rand()%3);
}
pthread_exit(0);
return NULL;
}
在改进过后的消费者线程中,共享数据块分别有两把锁,来一次实现对不同资源对象的保护,值得注意的是,在程序中,while(safe_shared_handle->readycnt==0)循环判断的作用。在该代码块中,当前线程阻塞在ready_cond这个条件变量上,并且和readycnt_mutex互斥锁绑定。此时的while判定,主要是用来防止当前线程被虚假唤醒,比如在另一个线程中用到了pthread_cond_broadcast,将会产生“惊群”现象,即所有阻塞在该条件变量上的线程都会接触唤醒,如果不对具体要操作的变量进行判断,则会导致在某变量 不符合程序逻辑要求的情况下自动往下执行后续逻辑,将会产生不好的结果。因此,利用while循环判断,来防止线程阻塞在条件变量上被虚假唤醒的问题。
while(safe_shared_handle->readycnt==0)
{
pthread_cond_wait(safe_shared_handle->ready_cond,safe_shared_handle->readycnt_mutex);
}
在主程序之前,首先定义了宏来表示要定义的实际共享数据块的大小以及要创建的生产者线程的数量。
#define MAX_DATAS 10000
#define MAX_PRODUCETHREADS 3
本设计中,设定共享数据块的大小为10000个int数据的容量,生产者线程数量3,消费者线程数量默认为1。创建完各个线程之后,即把各个线程一次设置为可连接状态,此时主线程会阻塞,直到子线程执行完毕,然后统一打印各个子线程的执行情况,已经总的消费者线程的累计计数值,正如前文所述,如果各个子线程累计数据写入计数值等于共享变量的最大容量(int为单位),则说明共享数据块的互斥锁机制正确的起了作用。最后,在主线程退出之前,调用销毁接口来释放共享数据块的资源。
int cond_demo(void)
{
int count_tbl[MAX_PRODUCETHREADS] = {0};
pthread_t consume_thread;
pthread_t produce_thread[MAX_PRODUCETHREADS];
printf("\n [cond_demo]\r\n");
safe_shared_handle = safe_shared_create(MAX_DATAS);
if(safe_shared_handle == NULL)
{
printf("[error] create shared error!\r\n");
return -1;
}
printf("produce thread : %d\r\n",MAX_PRODUCETHREADS);
printf("consume thread : %d\r\n",1);
for(int i=0; i<MAX_PRODUCETHREADS; i++)
{
pthread_create(&produce_thread[i], NULL, produce_entry, (void* )&count_tbl[i]);
}
pthread_create(&consume_thread, NULL, consume_entry, NULL);
for(int i=0; i<MAX_PRODUCETHREADS; i++)
{
pthread_join(produce_thread[i], NULL);
printf("count[%d] : %d\r\n", i, count_tbl[i]);
}
int add=0;
for(int i=0; i<sizeof(count_tbl)/sizeof(count_tbl[0]); i++)
{
add+=count_tbl[i];
}
printf("all cnt : %d\r\n", add);
pthread_join(consume_thread, NULL);
printf("join over.\r\n");
safe_shared_destroy(safe_shared_handle);
printf("demo close.\r\n");
return 0;
}
本篇在互斥量实现线程同步来安全访问共享数据资源的基础上,引入了条件变量这一机制,条件变量的引入,完善了mutex机制,二者配合使用,在实现了对共享资源的保护的同时,也能做到根据情况去唤醒阻塞在某个条件下的线程,使其来竞争下一轮的互斥锁使用权,这就避免了单一使用互斥量需要反复轮询变量状态条件所带来的高CPU占用。在实际开发中,互斥锁+条件变量的机制应用较为广泛。
本篇的结束,照例分享一段喜欢的诗,作为凌晨的早安,宇宙很大,生活更大。
我咽下一枚铁做的月亮
他们把它叫做螺丝
我咽下这工业的废水,失业的订单
那些低于机台的青春早早夭亡
我咽下奔波,咽下流离失所
咽下人行天桥,咽下长满水锈的生活
我再咽不下了
所有我曾经咽下的现在都从喉咙汹涌而出
在祖国的领土上铺成一首
耻辱的诗
---2013-12-19 许立志(富士康流工人/90后诗人/于2014-10-1自杀身亡)