Toggle navigation
首页
问答
文章
积分商城
专家
专区
更多专区...
文档中心
返回主站
搜索
提问
会员
中心
登录
注册
ART-Pi Smart
POSIX
【ART-PI Smart 抛砖引玉 八】基于POSIX的应用开发实践之线程池
发布于 2022-05-16 22:26:58 浏览:673
订阅该版
[tocm] ## 线程池的引入 通常情况下的多线程应用程序流程大致为:接收任务请求->创建线程并执行任务->退出线程。这在一般应用场景下较为合适。甚至有些线程需要在程序运行期间持续轮询,或阻塞或运行。除了上述多线程应用的常规情况外,还会存在如上一篇文章[【ART-PI Smart 抛砖引玉 七】基于POSIX的应用开发实践之任务队列](https://club.rt-thread.org/ask/article/78ef443775d0bd0d.html)中提到的情形,即在短时间内,会出连续出现很多个任务请求等待执行,此时如果继续选择多线程的一般流程来设计程序,那么每一个任务的到来和执行都需要创建新的线程并且在执行完毕之后销毁,这将导致短时间内系统开销增大。针对类似上述应用情形,可以设计某种机制,使其在多任务连续请求的过程中,实现线程的重用,并且能够根据当前系统中的待执行任务的状态和当前重用线程的数量来动添加和删除工作线程,以此来使线程池的工作线程数量保持在一个合理的范围内,从而提高多任务场景下的执行效率,降低系统开销。这种方式即为线程池模式。 ![捕获.PNG](https://oss-club.rt-thread.org/uploads/20220516/c31bda1d452befcec9276eda21d429e9.png.webp) (图片来自于网络,仅供示意参考) ## 线程池的组成结构 一个线程池对象在程序结构上大致可以分为三个主要部分:任务队列、管理者线程、工作者线程。任务队列用来存放待处理的任务,在前一篇文章中已经对该模块进行了设计,在本篇线程池对象设计中,作为其对象数据结构成员变量即可。管理者线程是维持线程池运行的核心,其并不执行任务队列中的具体任务,而是按照设定好的监测频率和管理策略,根据当前任务队列的状态和当前工作线程的状态来动态管理工作线程的数量,是实现高效处理的核心。工作线程主要负责从任务队列中,获取相应的任务并且执行,并且根据管理者线程给出的状态,来实现自身的存留。 | 模块 | 功能说明 | | ---------- | ------------------------------------------ | | 任务队列 | 用来容纳系统要执行的任务。 | | 管理者线程 | 不参与执行任务,只负责监视管理工作者线程 | | 工作者线程 | 不参与线程管理,只负责执行任务队列中的任务 | ## 线程池对象设计 线程池对象由两部分组成,一部分用来对线程池自身特性进行描述,另一部分用来对线程池操作的行为进行描述。下面分别给出任务对象设计的两部分构成细则。 ### 线程池对象参数成员构成 | 成员参数 | 功能描述 | | ------------------ | -------------------------------------------------- | | *taskQueueHandle | 任务队列,用于存放待处理的任务,包含完整的队列操作 | | managerThread | 管理者线程 | | *workerThreadQueue | 工作者线程队列,用来容纳创建的工作者线程 | | minThreadNum | 线程池中最小存活工作者线程数 | | maxThreadNum | 线程池中工作者线程数量上限 | | busyThreadNum | 线程池当前忙碌的工作线程数 | | liveThreadNum | 线程池当前存活的工作线程数 | | killThreadNum | 线程池要销毁的线程数 | | shutdown | 销毁线程池标志 | | 成员参数 | 功能描述 | | ------------ | -------------------- | | mutexPool | 线程池互斥锁 | | mutexBusy | 工作线程忙碌数互斥锁 | | cond_isFull | 任务队列满条件变量 | | cond_isEmpty | 任务队列空条件变量 | ### 线程池对象的行为 线程池对象的行为主要是针对线程池对象实例在实际操作过程中要实现的具体功能进行了接口的封装,通过函数指针统一由线程池对象来初始化和管理,间接的把线程池对象的参数保护起来,通过功能接口来调用。 | 队列对象行为 | 功能描述 | | ------------------ | -------------------------- | | *addTask | 向线程池内添加任务 | | *getBusyThreadNum | 获取当前忙碌工作线程数 | | *getAliveThreadNum | 取当前存活线程数 | | *getKillThreadNum | 获取当前要退出的线程数 | | *getMinThreadNum | 线程池存在的最小工作线程数 | | *getMaxThreadNum | 线程池允许的最大工作线程数 | | *destroy | 线程池销毁 | ### 线程池对象数据结构 ```C typedef struct ThreadPool { taskQueue_t* taskQueueHandle; pthread_t managerThread; pthread_t *workerThreadQueue; int minThreadNum; int maxThreadNum; int busyThreadNum; int liveThreadNum; int killThreadNum; int shutdown; pthread_mutex_t mutexPool; pthread_mutex_t mutexBusy; pthread_cond_t cond_isFull; pthread_cond_t cond_isEmpty; void (*addTask)(void* this, void (*func)(void*), void* arg); int (*getBusyThreadNum)(void* this); int (*getAliveThreadNum)(void* this); int (*getKillThreadNum)(void* this); int (*getMinThreadNum)(void* this); int (*getMaxThreadNum)(void* this); int (*destroy)(void* this); }ThreadPool_t; ``` ## 线程池操作接口设计 | API接口 | 功能描述 | | :---------------- | -------------- | | threadpool_create | 创建线程池实例 | 线程池主要的API接口为线程池创建函数,函数入口参数分别为线程池中最小线程数、最大线程数、任务队列容量,该函数会逐一创建出线程池对象的实例,并且返回动态创建的对象内存块指针。在线程池使用完毕,调用线程池对象行为函数即可销毁当前线程池对象实例。 ```c ThreadPool_t* threadpool_create(int threadCntMin, int threadCntMax, int taskQueueSize) { ThreadPool_t* pool = (ThreadPool_t*)malloc(sizeof(ThreadPool_t)); do { if(NULL == pool) { printf("[error] malloc threadpool err!\n"); break; } pool->workerThreadQueue = (pthread_t*)malloc(sizeof(pthread_t)*threadCntMax); if(NULL == pool->workerThreadQueue) { printf("[error] malloc workerthreadQueue err!"); } memset(pool->workerThreadQueue,0,sizeof(pthread_t)*threadCntMax); pool->shutdown = 0; pool->minThreadNum = threadCntMin; pool->maxThreadNum = threadCntMax; pool->busyThreadNum = 0; pool->liveThreadNum = threadCntMin; pool->killThreadNum = 0; if( pthread_mutex_init(&pool->mutexPool, NULL) != 0 || pthread_mutex_init(&pool->mutexBusy, NULL) != 0 || pthread_cond_init(&pool->cond_isFull, NULL) != 0 || pthread_cond_init(&pool->cond_isEmpty,NULL) != 0 ) { printf("pools IPC init err! \n"); break; } pool->taskQueueHandle = taskQueue_create(taskQueueSize); pthread_create(&pool->managerThread, NULL, managerHandler, pool); for(int i=0; i
minThreadNum; i++) { pthread_create(&pool->workerThreadQueue[i], NULL, workerHandler, pool); } pool->addTask = threadpool_addTask; pool->getBusyThreadNum = threadpool_getBusyThreadNum; pool->getAliveThreadNum = threadpool_getAliveThreadNum; pool->getMinThreadNum = threadpool_getMinThreadNum; pool->getMaxThreadNum = threadpool_getMaxThreadNum; pool->destroy = threadpool_destroy; return pool; } while(0); if(pool && pool->workerThreadQueue) { free(pool->workerThreadQueue); } if(pool && pool->taskQueueHandle) { free(pool->taskQueueHandle); } if(pool) { free(pool); } return NULL; } ``` ## 线程池对象管理者线程设计 线程池的管理者线程是线程池整个执行的核心,它不执行具体的任务,主要是根据任务队列中任务的状态,来动态管理整个线程池中工作者线程。管理者线程按照一定的设定频率来循环检测,CHECK_TIM即表示管理者线程的监管周期,本设计采用的是固定设定值,后续优化可以根据整个系统的运行情况,设置成动态调整。 管理者线程主要执行两个功能,即向判断是否需要向工作者线程中添加线程,以及判断是否需要删除当前空闲的消费者线程。判断标准主要依据任务队列的状态和当前线程状态之间的关系。 ```C static void* managerHandler(void* arg) { ThreadPool_t* pool = (ThreadPool_t*)arg; while(!pool->shutdown) { usleep(CHECK_TIM); pthread_mutex_lock(&pool->mutexPool); int taskCurNum = pool->taskQueueHandle->getCurrentTaskNum(pool->taskQueueHandle); int liveThreadNum = pool->liveThreadNum; pthread_mutex_unlock(&pool->mutexPool); pthread_mutex_lock(&pool->mutexBusy); int busyThreadNum = pool->busyThreadNum; pthread_mutex_unlock(&pool->mutexBusy); if(taskCurNum > liveThreadNum && liveThreadNum < pool->maxThreadNum) { int cnt=0; pthread_mutex_lock(&pool->mutexPool); for(int i=0; i
maxThreadNum && cnt
liveThreadNum < pool->maxThreadNum; i++) { if(pool->workerThreadQueue[i]==0) { pthread_create(&pool->workerThreadQueue[i], NULL, workerHandler, pool); cnt++; pool->liveThreadNum++; } } pthread_mutex_unlock(&pool->mutexPool); } if(liveThreadNum > busyThreadNum*2 && liveThreadNum > pool->minThreadNum) { pthread_mutex_lock(&pool->mutexPool); pool->killThreadNum = RM_THREAD_NUM; pthread_mutex_unlock(&pool->mutexPool); for(int i=0; i
cond_isEmpty); } } } return NULL; } ``` ## 线程池对象工作者线程设计 工作者线程主要负责从任务队列中获取任务并且执行,如果任务队列里没有需要执行的数据,工作者线程会阻塞在“空”条件变量上等待唤醒,一旦被唤醒,则首先判断是否需要“自杀”,如果是正常唤醒,则继续判断当前状态下线程池是否要销毁;正常情况下,即可从任务队列中获取出一个任务,然后唤醒一次阻塞在“满”条件下的添加任务线程(生产者线程)继续添加任务,随后即可按照回调方式来执行任务函数。 ```C static void* workerHandler(void* arg) { ThreadPool_t* pool = (ThreadPool_t*)arg; task_t task; while(1) { pthread_mutex_lock(&pool->mutexPool); while(pool->taskQueueHandle->getCurrentTaskNum(pool->taskQueueHandle)==0 && !pool->shutdown) { pthread_cond_wait(&pool->cond_isEmpty, &pool->mutexPool); if(pool->killThreadNum>0) { pool->killThreadNum--; if(pool->liveThreadNum > pool->minThreadNum) { pool->liveThreadNum--; pthread_mutex_unlock(&pool->mutexPool); threadpool_currentThreadExit(pool); } } } if(pool->shutdown) { pool->liveThreadNum--; pthread_mutex_unlock(&pool->mutexPool); threadpool_currentThreadExit(pool); } task = pool->taskQueueHandle->getTask(pool->taskQueueHandle) pthread_cond_signal(&pool->cond_isFull); pthread_mutex_unlock(&pool->mutexPool); pthread_mutex_lock(&pool->mutexBusy); pool->busyThreadNum++; pthread_mutex_unlock(&pool->mutexBusy); task.func(task.arg); free(task.arg); task.arg=NULL; pthread_mutex_lock(&pool->mutexBusy); pool->busyThreadNum--; pthread_mutex_unlock(&pool->mutexBusy); } return NULL; } ``` ## 线程池对象行为函数设计 ### 销毁线程池 对应于线程池的创建,线程池的销毁包含在了线程池的行为函数中,当不需要再使用线程池时,通过线程池实例调用该接口函数即可实现自我销毁,释放占用的系统资源。值得注意的是,在销毁函数中,先通过线程连结阻塞等待管理者线程结束,之后遍历触发所有存活且阻塞等待在“空”条件变量的工作者线程,此时被唤醒的工作者线程会根据线程池销毁标志的置位而自动“自杀”。 ```C static int threadpool_destroy(void* this) { ThreadPool_t* pool = (ThreadPool_t*)this; int ret; if(pool == NULL) { return -1; } pool->shutdown =1; printf("\nstart destroy thread pool...\n"); pthread_join(pool->managerThread, NULL); pool->managerThread = 0; for(int i=0; i
liveThreadNum; i++) { pthread_cond_signal(&pool->cond_isEmpty); } if(pool->taskQueueHandle) { pool->taskQueueHandle->destroy(pool->taskQueueHandle); } if(pool->workerThreadQueue) { free(pool->workerThreadQueue); pool->workerThreadQueue=NULL; } pthread_mutex_destroy(&(pool->mutexPool)); pthread_mutex_destroy(&(pool->mutexBusy)); ret = pthread_cond_destroy(&(pool->cond_isFull)); ret = pthread_cond_destroy(&(pool->cond_isEmpty)); free(pool); pool=NULL; printf("destroy thread pool success.\n\n"); return 0; } ``` ### 向线程池中添加任务 向线程池中添加一个待处理的任务,本质上是向线程池包含的的任务队列对象中添加任务,在添加任务之前,首先判断当前任务队列中存在的任务数是否已经达到任务队列的上限,如果此时任务队列已经满员,则使当前所处的线程阻塞在“满”条件变量上,直到被唤醒。唤醒之后还不能立即添加任务,还需要判断醒来之后线程池是否需要销毁,如果是在线程池销毁时被唤醒,则不能再继续添加任务,而是要主动退出。除此之外,正常条件下被唤醒后,则可以向线程池中的任务队列里添加任务,并且唤醒此时在系统中阻塞在“空”条件下的工作者线程,令其来获取任务并且执行。 ```c static void threadpool_addTask(void* this, void (*func)(void*), void* arg) { ThreadPool_t* pool = (ThreadPool_t*)this; pthread_mutex_lock(&pool->mutexPool); while( pool->taskQueueHandle->getCurrentTaskNum(pool->taskQueueHandle) == pool->taskQueueHandle->getQueueCap(pool->taskQueueHandle) && !pool->shutdown ) { printf("[error] task queue overflow, block full cond.\n"); pthread_cond_wait(&pool->cond_isFull, &pool->mutexPool); } if(pool->shutdown) { pthread_mutex_unlock(&pool->mutexPool); return; } pool->taskQueueHandle->addTask(pool->taskQueueHandle, func, arg); pthread_cond_signal(&pool->cond_isEmpty); pthread_mutex_unlock(&pool->mutexPool); } ``` ### 获取线程池状态和参数 获取线程池的状态和参数主要是通过一系列的相关的接口函数来对外返回线程池内部相关的数据,例如获取线程池当前忙碌的工作者线程数目,主要是通过如下接口函数来实现,在获取到忙碌数目的保护锁之后,即可读取并把数据返回。 其他线程池状态和参数的获取,和示例函数的流程一致,在此不再赘述,外部会获取的参数见上文列表。 ```c static int threadpool_getBusyThreadNum(void* this) { ThreadPool_t* pool = (ThreadPool_t*)this; pthread_mutex_lock(&pool->mutexBusy); int num = pool->busyThreadNum; pthread_mutex_unlock(&pool->mutexBusy); return num; } ``` ### 工作者线程的退出 工作者线程在需要自行退出时,会调用该接口,该函数会先根据当前工作线程的id,在工作者线程队列中清零对应的id值,然后执行线程退出系统调用。 ```c static void threadpool_currentThreadExit(void* this) { ThreadPool_t* pool = (ThreadPool_t*)this; pthread_t tid = pthread_self(); for(int i=0; i
maxThreadNum; i++) { if(pool->workerThreadQueue[i] == tid) { pool->workerThreadQueue[i] = 0; break; } } pthread_exit(NULL); } ``` 获取任务队列其他状态和参数的流程和上述操作类似。在此不做赘述。 ## 主程序设计 ### 任务函数设计 在测试函数中,设计要执行的任务函数为如下形式,即任务执行主要负责把当前任务所处的工作者线程id号和任务对应的id号打印出来,即正常退出。任务id编号主要以任务入口参数的形式传递进来。 ```C void taskFunc(void* arg) { int id = *(int*)arg; printf("thread %p is working, taskID = %d\n",pthread_self(),id); } ``` ### 主函数设计 在主程序中,调用线程池创建函数来初始化一个线程池实例,然后循环向线程池添加上面设计的任务函数,此时线程池会随着任务的增多,而进行工作者线程的动态管理。实现多任务多线程高效执行的目的。在执行完毕之后调用销毁接口来释放线程池的资源,并退出。在程序运行过程中,通过在管理者线程等相关位置添加打印信息,即可看到在整个过程中,线程池的内部工作者线程的动态变化情况。具体的测试函数可以结合更复杂的应用场景另行发挥,本文此处只展示最简单的操作流程,意在突出线程池的特点。 ```C int threadpool_demo(void) { ThreadPool_t* pool = threadpool_create(3,10,50); for(int i=0; i<100; i++) { int* ptaskID = (int*)malloc(sizeof(int)); *ptaskID = i; pool->addTask(pool, taskFunc, ptaskID); } printf("all task add end.\n"); sleep(30); pool->destroy(pool); return 0; } ``` ## 总结 本设计通过对实际程序运行场景中会出现的问题来进入了线程池的概念,并且按照对象的思想给出类线程池的相关设计,并且给出了简单的测试示例。线程池的应用过程中,引用了任务队列模块,这种对象话的设计可以很好地把各个功能模块抽象成独立的模块,继而可以极大的方便程序的设计。线程池模块的设计,可以应用在更广的场合下,比如网络服务器的多线程并发设计中。后会续结合实际情况来对这一应用进行分享。 文章的最后,照例分享一段自己喜欢的诗。好的音乐就是一首好诗,文字本身是诗,旋律亦是诗;就像她的眼睛也是诗,一首藏在心底的诗。 > “当你又泪如雨下 就变成七月的模样 我是腐烂了花期的凶手 你是藏起花瓣的牧童” ————宋东野《卡比巴拉的海》
2
条评论
默认排序
按发布时间排序
登录
注册新账号
关于作者
RickFlying
something in the way
文章
10
回答
45
被采纳
0
关注TA
发私信
相关文章
1
移植rt-thread2.1.0缺少components/pthreads里的posix_types.h文件中包含的sy
2
[征集自愿者] POSIX相关章节文档编写
3
RT-Thread POSIX支持
4
POSIX标准接口针对文件系统没有导出fsync
5
POSIX接口的select还没实现?
6
pthread 组建bug反馈
7
RTT中的POSIX支持
8
RT-Thread 如何用POSIX接口
9
使用 RT_USING_POSIX finsh能显示,但不能输入
10
请教、讨论POSIX接口、dfs中的pos和size问题
推荐文章
1
RT-Thread应用项目汇总
2
玩转RT-Thread系列教程
3
国产MCU移植系列教程汇总,欢迎查看!
4
机器人操作系统 (ROS2) 和 RT-Thread 通信
5
五分钟玩转RT-Thread新社区
6
【技术三千问】之《玩转ART-Pi》,看这篇就够了!干货汇总
7
关于STM32H7开发板上使用SDIO接口驱动SD卡挂载文件系统的问题总结
8
STM32的“GPU”——DMA2D实例详解
9
RT-Thread隐藏的宝藏之completion
10
【ART-PI】RT-Thread 开启RTC 与 Alarm组件
热门标签
RT-Thread Studio
串口
Env
LWIP
SPI
AT
Bootloader
Hardfault
CAN总线
FinSH
ART-Pi
USB
DMA
文件系统
RT-Thread
SCons
RT-Thread Nano
线程
MQTT
STM32
RTC
FAL
rt-smart
ESP8266
I2C_IIC
UART
WIZnet_W5500
ota在线升级
freemodbus
PWM
flash
cubemx
packages_软件包
BSP
潘多拉开发板_Pandora
定时器
ADC
flashDB
GD32
socket
中断
编译报错
Debug
SFUD
rt_mq_消息队列_msg_queue
msh
keil_MDK
ulog
C++_cpp
MicroPython
本月问答贡献
a1012112796
10
个答案
1
次被采纳
踩姑娘的小蘑菇
4
个答案
1
次被采纳
红枫
4
个答案
1
次被采纳
张世争
4
个答案
1
次被采纳
Ryan_CW
4
个答案
1
次被采纳
本月文章贡献
catcatbing
3
篇文章
5
次点赞
YZRD
2
篇文章
5
次点赞
qq1078249029
2
篇文章
2
次点赞
xnosky
2
篇文章
1
次点赞
Woshizhapuren
1
篇文章
5
次点赞
回到
顶部
发布
问题
投诉
建议
回到
底部