线程池的实现源码及应用举例

1.线程池本质

​ 多个线程组成的一个集合,目的为了并发执行任务,定义时是一个结构体,成员有互斥锁,条件变量,任务链队列指针,任务链队列中等待的任务个数,当前活跃的线程数量,线程ID,线程销毁标记

2.线程池的关键技术
(1)万能函数指针(通用函数指针): *void *(*p)(void )
(使用技巧:函数的参数个数超过1个时,参数可以打包成结构体,多个参数就变成一个参数(全部包含在结构体里面了))

原理:该函数需要用到互斥锁在完成一个任务后减少任务数量,解锁后继续下一个任务,还要用到条件变量在任务全部完成时通过判断任务数量和结束标志位退出。

(2)封装线程池有关的接口函数
三大基本函数需要我们去封装
第一个:初始化线程池
原理:通过对线程池结构体中的成员初始化让线程池进入工作模式,随后使用循环创建对应数量的线程

第二个:添加任务

原理:通过动态分配内存准备新的内存空间分配给新的任务,在添加任务时利用互斥锁上锁防止任务函数完成任务时减少任务数量带来的冲中突,将任务尾插到任务链表中,并目对线程池结构体中各个成员变量进行更新。

第三个:线程池的销毁(回收线程,想办法让线程的任务函数退出)
原理:通过改变线程池结构体成员变量中的结束标志位,令所有线程能够退出,随后在任务函数中开始退出线程,并在这个线程池销毁函数中回收所有线程。

线程池实例源码列举如下:


头文件(thread_pool.h)

#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <pthread.h>

#define MAX_WAITING_TASKS 1000 // 处于等待状态的线程数量最大为1000
#define MAX_ACTIVE_THREADS 20  // 活跃的线程数量

// 任务结点  单向链表的节点,类型
struct task
{
	void *(*do_task)(void *arg); // 任务函数指针  指向线程要执行的任务  格式是固定的
	void *arg;					 // 需要传递给任务的参数,如果不需要,则NULL

	struct task *next; // 指向下一个任务结点的指针
};

// 线程池的管理结构体
typedef struct thread_pool
{
	pthread_mutex_t lock; // 互斥锁
	pthread_cond_t cond;  // 条件量

	bool shutdown;
	/*是否需要销毁线程池,用于指示线程池是否处于销毁状态。它的作用是在线程池需要被销毁时,向线程池中的工作线程发出信号,告知它们停止接受新的任务,并逐渐退出。具体来说,当 shutdown 标记被设置为 true 时,线程池将不再接受新的任务提交,但会继续执行已经提交的任务,直到所有任务都执行完毕。一旦线程池中的任务执行完毕,工作线程就会逐个退出,释放相关资源,最终销毁整个线程池。*/

	struct task *task_list; // 用于存储任务的链表

	pthread_t *tids; // 用于记录线程池中线程的ID

	unsigned max_waiting_tasks; // 线程池中线程的数量最大值
	unsigned waiting_tasks;		// 处于等待状态的线程数量
	unsigned active_threads;	// 正在活跃的线程数量
} thread_pool;

// 初始化线程池
bool init_pool(thread_pool *pool, unsigned int threads_number);

// 向线程池中添加任务
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);

// 先线程池中添加线程
int add_thread(thread_pool *pool, unsigned int additional_threads_number);

// 从线程池中删除线程
int remove_thread(thread_pool *pool, unsigned int removing_threads_number);

// 销毁线程池
bool destroy_pool(thread_pool *pool);

// 任务函数
void *routine(void *arg);

接口函数的实现源码(thread_pool.c):

#include "thread_pool.h"

/*
 *	@name   : handler
 *	@brief  : 接到取消请求之后进行释放互斥锁
 *	@params :
 *	          @arg : 传入每个添加的任务随机的10秒内的秒数
 *	@retval : NULL
 * 	@version:
 * 	@note   :
 */
void handler(void *arg)
{
	printf("[%u] is ended.\n",
		   (unsigned)pthread_self()); // 响应取消请求之后自动处理的例程:释放互斥锁,以确保不会因为线程被取消而导致资源泄漏或死锁等问题。
	pthread_mutex_unlock((pthread_mutex_t *)arg);
}

/*
 *	@name   : routine
 *	@brief  : 接到取消请求之后进行释放互斥锁
 *	@params :
 *	          @arg : 传递给线程任务的参数
 *	@retval : NULL
 * 	@version:
 * 	@note   :
 */
void *routine(void *arg)
{
// 调试
#ifdef DEBUG
	printf("[%u] is started.\n",
		   (unsigned)pthread_self());
#endif
	// 把需要传递给线程任务的参数进行备份
	thread_pool *pool = (thread_pool *)arg;
	struct task *p;

	while (1)
	{
		/*
		pthread_cleanup_push()是一个宏,用于向线程的取消处理器栈中注册(也可以简单理解为绑定)一个处理函数。作用是在线程退出时自动执行注册的处理函数,即handler
		*/
		pthread_cleanup_push(handler, (void *)&pool->lock);
		pthread_mutex_lock(&pool->lock); // 解锁,申请资源
		// 1,没有任务,且线程池没有被销毁,就挂起等待
		while (pool->waiting_tasks == 0 && !pool->shutdown)
		{
			pthread_cond_wait(&pool->cond, &pool->lock);
		}
		// 2, 没有任务,且线程池被标记销毁,就释放互斥锁并结束进程
		if (pool->waiting_tasks == 0 && pool->shutdown == true)
		{
			pthread_mutex_unlock(&pool->lock);
			pthread_exit(NULL); // CANNOT use 'break';
		}

		// 3, 从任务列表中取出一个任务进行消费,并更新线程池的任务列表和等待任务数量???如果线程数不够怎么办
		p = pool->task_list->next;
		pool->task_list->next = p->next;
		pool->waiting_tasks--;
		// 4, 释放互斥锁,并用pthread_cleanup_pop取消在pthread_cleanup_push中注册的清理处理器
		pthread_mutex_unlock(&pool->lock);
		pthread_cleanup_pop(0);

		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); // 设置取消状态为禁用,防止在执行任务时被取消
		(p->do_task)(p->arg);								  // 执行任务函数,传入任务参数
		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);  // 恢复取消状态为启用

		free(p); // 任务完成,释放任务所占用的内存空间
	}
}

/*
 *	@name   : init_pool
 *	@brief  : 初始化线程池
 *	@params :
 *	          @*pool : 线程池的管理结构体指针
 *	          @threads_number : 初始确定的线程数
 *	@retval : 成功返回true,失败false
 * 	@version:
 * 	@note   :
 */
bool init_pool(thread_pool *pool, unsigned int threads_number)
{

	pthread_mutex_init(&pool->lock, NULL); // 初始化互斥锁
	pthread_cond_init(&pool->cond, NULL);  // 初始化条件量

	pool->shutdown = false;										 //  设置销毁标志参数,初始化为不销毁
	pool->task_list = malloc(sizeof(struct task));				 // 给链表的节点申请堆内存
	pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS); // 申请堆内存,用于存储创建出来的TID
	// 错误处理,对malloc进行错误处理
	if (pool->task_list == NULL || pool->tids == NULL)
	{
		perror("allocate memory error");
		return false;
	}

	pool->task_list->next = NULL;				 // 对任务链表中的节点的指针域进行初始化
	pool->max_waiting_tasks = MAX_WAITING_TASKS; // 设置线程池中线程数量的最大值
	pool->waiting_tasks = 0;					 // 设置等待线程处理的任务的数量为0,说明现在没有任务
	pool->active_threads = threads_number;		 // 设置线程池中活跃的线程的数量

	for (int i = 0; i < pool->active_threads; i++) // 循环创建活跃线程
	{
		// 创建线程  把线程的ID存储在申请的堆内存
		if (pthread_create(&((pool->tids)[i]), NULL,
						   routine, (void *)pool) != 0)
		{
			perror("create threads error");
			return false;
		}
// 用于调试
#ifdef DEBUG
		printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
			   (unsigned)pthread_self(), __FUNCTION__,
			   i, (unsigned)pool->tids[i]);
#endif
	}
	return true;
}

/*
 *	@name   : add_task
 *	@brief  : 向线程池的任务链表中添加任务,并唤醒
 *	@params :
 *	          @*pool : 线程池的管理结构体指针
 *	          @threads_number : 初始确定的线程数
 *	@retval : 成功返回true,失败false
 * 	@version:
 * 	@note   :
 */
// 先线程池的任务链表中添加任务
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg)
{
	struct task *new_task = malloc(sizeof(struct task)); // 给任务链表节点申请内存
	if (new_task == NULL)
	{
		perror("allocate memory error");
		return false;
	}

	new_task->do_task = do_task;
	new_task->arg = arg;
	new_task->next = NULL; // 指针域设置为NULL

	pthread_mutex_lock(&pool->lock); // 进行线程池任务添加

	// 说明要处理的任务的数量大于能处理的任务数量,直接解锁释放资源并返回
	if (pool->waiting_tasks >= MAX_WAITING_TASKS)
	{
		pthread_mutex_unlock(&pool->lock);
		fprintf(stderr, "too many tasks.\n");
		free(new_task);
		return false;
	}

	struct task *tmp = pool->task_list; // 获取线程池单链表的头节点地址
	while (tmp->next != NULL)			// 遍历链表,找到单向链表的尾节点
		tmp = tmp->next;

	tmp->next = new_task;  // 把新的要处理的任务插入到链表的尾部  尾插
	pool->waiting_tasks++; // 要处理的任务的数量+1

	pthread_mutex_unlock(&pool->lock); // 解锁
// 调试
#ifdef DEBUG
	printf("[%u][%s] ==> a new task has been added.\n",
		   (unsigned)pthread_self(), __FUNCTION__);
#endif

	pthread_cond_signal(&pool->cond); // 唤醒第一个处于条件变量阻塞队列中的线程
	return true;
}

/*
 *	@name   : add_thread
 *	@brief  : 向线程池加入新线程
 *	@params :
 *	          @*pool : 线程池的管理结构体指针
 *	          @additional_threads : 需要添加的线程数
 *	@retval : 实际增加的线程数
 * 	@version:
 * 	@note   :
 */
int add_thread(thread_pool *pool, unsigned additional_threads)
{
	if (additional_threads == 0) // 判断需要添加的新线程的数量是否为0,是的话直接返回
		return 0;

	unsigned total_threads = pool->active_threads + additional_threads; // 计算线程池中总线程的数量

	int i, actual_increment = 0;													 // actual_increment 为实际增加的进程数
	for (i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++) // 对实际增加的进程数进行限制,使得总和不能超过最大总数
	{
		// 创建新线程
		if (pthread_create(&((pool->tids)[i]),
						   NULL, routine, (void *)pool) != 0) // 增加进程任务处理
		{
			perror("add threads error");
			// no threads has been created, return fail
			if (actual_increment == 0)
				return -1;

			break;
		}
		actual_increment++;

#ifdef DEBUG
		printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
			   (unsigned)pthread_self(), __FUNCTION__,
			   i, (unsigned)pool->tids[i]);
#endif
	}
	// 记录此时线程池中活跃线程的总数,并作为返回值
	pool->active_threads += actual_increment;
	return actual_increment;
}
/*
 *	@name   : remove_thread
 *	@brief  : 移除多余线程
 *	@params :
 *	          @*pool : 线程池的管理结构体指针
 *	          @*removing_threads : 需要移除的线程数量
 *	@retval : 移除后剩下的活跃线程数
 * 	@version:
 * 	@note   :
 */
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
	if (removing_threads == 0) // 判断需要添加的新线程的数量是否为0,是的话直接返回
		return pool->active_threads;

	int remaining_threads = pool->active_threads - removing_threads;
	remaining_threads = remaining_threads > 0 ? remaining_threads : 1; // 初步计算经过移除后,剩下的线程数,并判断;最终剩余的线程数不得小于1

	int i;
	for (i = pool->active_threads - 1; i > remaining_threads - 1; i--) // 依次移除线程,并进行错误判断
	{
		errno = pthread_cancel(pool->tids[i]);

		if (errno != 0)
			break;

#ifdef DEBUG
		printf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",
			   (unsigned)pthread_self(), __FUNCTION__,
			   i, (unsigned)pool->tids[i]);
#endif
	}

	if (i == pool->active_threads - 1) // 若没有移除成功,则返回-1
		return -1;
	else
	{
		pool->active_threads = i + 1;// 否则返回活跃的线程数
		return i + 1;
	}
}

/*
 *	@name   : destroy_pool
 *	@brief  : 销毁线程池
 *	@params :
 *	          @*pool : 线程池的管理结构体指针
 *	@retval : 成功返回true,失败false
 * 	@version:
 * 	@note   :
 */
bool destroy_pool(thread_pool *pool)
{
	pool->shutdown = true; // 1, 修改线程池结构体的成员,并通知所有线程
	pthread_cond_broadcast(&pool->cond);

	for (int i = 0; i < pool->active_threads; i++) // 2, 等待所有线程退出,并回收资源
	{
		errno = pthread_join(pool->tids[i], NULL);
		if (errno != 0)
		{
			printf("join tids[%d] error: %s\n",
				   i, strerror(errno));
		}
		else
			printf("[%u] is joined\n", (unsigned)pool->tids[i]);
	}

	free(pool->task_list); // 释放申请了的堆内存
	free(pool->tids);
	free(pool);

	return true;
}

应用举例(main.c):

#include "thread_pool.h"
/**
 * @file name:	--
 * @brief
 * @author ni456xinmie@163.com
 * @date 2024/04/25
 * @version 1.0 :版本
 * @property :
 * @note
 * CopyRight (c)  2023-2024   ni456xinmie@163.com   All Right Reseverd
 */

/*
 *	@name   : mytask
 *	@brief  : 给每个线程安排的具体任务
 *	@params :
 *	          @arg : 传入每个添加的任务随机的10秒内的秒数
 *	@retval : NULL
 * 	@version:
 * 	@note   :
 * 			  1.__FUNCTION__ 是 C 和 C++ 语言中的预定义宏,用于获取当前所在函数的名称(在 C++ 中,也包括成员函数)。它会在编译时被替换为当前函数的字符串字面值。
 * 			  2.这部分可以换成自己需要安排的进程任务
 */
void *mytask(void *arg)
{
	int n = (int)arg; // 定义整型变量接收参数

	printf("[%u][%s] ==> job will be done in %d sec...\n",
		   (unsigned)pthread_self(), __FUNCTION__, n);
	sleep(n);
	printf("[%u][%s] ==> job done!\n",
		   (unsigned)pthread_self(), __FUNCTION__);

	return NULL;
}

/*
 *	@name   : count_time
 *	@	    : 计时器,每隔一秒输出当前秒数
 *	@params :
 *	          @*arg : NULL
 *	@retval : NULL
 * 	@version:
 * 	@note   :
 */
void *count_time(void *arg)
{
	int i = 0;
	while (1)
	{
		sleep(1);
		printf("sec: %d\n", ++i);
	}
}

int main()
{
	// 创建线程进行实时输出时间
	pthread_t a;
	pthread_create(&a, NULL, count_time, NULL);

	// 1, initialize the pool 初始化带有2条线程的线程池
	thread_pool *pool = malloc(sizeof(thread_pool));
	init_pool(pool, 2);

	// 2, throw tasks  投入3个任务
	printf("throwing 3 tasks...\n");
	add_task(pool, mytask, (void *)(rand() % 10));
	add_task(pool, mytask, (void *)(rand() % 10));
	add_task(pool, mytask, (void *)(rand() % 10));

	// 3, check active threads number 显示当前的线程数量
	printf("current thread number: %d\n",
		   remove_thread(pool, 0));
	sleep(9);

	// 4, throw tasks 投入2个任务
	printf("throwing another 2 tasks...\n");
	add_task(pool, mytask, (void *)(rand() % 10));
	add_task(pool, mytask, (void *)(rand() % 10));
	// 5, add threads 增加2条线程
	add_thread(pool, 2);
	sleep(5);
	// 6, remove threads 移除3条线程
	printf("remove 3 threads from the pool, "
		   "current thread number: %d\n",
		   remove_thread(pool, 3));

	// 7, destroy the pool 销毁线程池
	destroy_pool(pool);
	return 0;
}

玄机博客
© 版权声明
THE END
喜欢就支持一下吧
点赞13 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片快捷回复

    暂无评论内容