文章目录
- 半同步/半反应堆线程池
- 1.头文件
- 2.类的声明
- 3.构造函数
- 4.析构函数
- 5.append()方法
- 6.append_p()方法
- 7.worker()方法
- 8.run()方法
半同步/半反应堆线程池
使用一个工作队列完全解除了主线程和工作线程的耦合关系:主线程往工作队列中插入任务,工作线程通过竞争来取得任务并执行它。
- 同步I/O模拟proactor模式
- 半同步/半反应堆
- 线程池
采用模板template
只有一个类threadpool,类方法实现都在头文件中
包括:构造函数threadpool()、析构函数~threadpool()以及append()方法、append_p()方法、worker()方法、run()方法
1.头文件
#include
-
#include
#include #include #include "../lock/locker.h" #include "../CGImysql/sql_connection_pool.h" 2.类的声明
template
class threadpool { public: /*thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理的请求的数量*/ threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000); ~threadpool(); bool append(T *request, int state); bool append_p(T *request); private: /*工作线程运行的函数,它不断从工作队列中取出任务并执行之*/ static void *worker(void *arg); void run(); private: int m_thread_number; //线程池中的线程数 int m_max_requests; //请求队列中允许的最大请求数 pthread_t *m_threads; //描述线程池的数组,其大小为m_thread_number std::list m_workqueue; //请求队列 locker m_queuelocker; //保护请求队列的互斥锁 sem m_queuestat; //是否有任务需要处理 connection_pool *m_connPool; //数据库 int m_actor_model; //模型切换 }; 3.构造函数
重点是最后一个循环,使用 pthread 库创建多个线程,并将它们分离(detach)到父线程中。循环内部会迭代 thread_number 次,每次创建一个新的线程。
具体的操作如下:
- 通过调用 pthread_create 函数来创建新线程。该函数的第一个参数是指向线程标识符的指针,第二个参数是线程属性,第三个参数是指向函数的指针,这个函数会作为线程的入口点,并传递一个指向当前对象的指针作为参数。如果创建线程成功,则返回0。
- 如果 pthread_create 返回非零值,则表示线程创建失败,此时代码会抛出 std::exception 异常,并删除已经创建的线程数组 m_threads。
- 接着,通过调用 pthread_detach 函数将线程从父线程中分离。如果分离成功,则返回0。
- 如果 pthread_detach 返回非零值,则表示线程分离失败,此时代码会抛出 std::exception 异常,并删除已经创建的线程数组 m_threads。
线程分离后,其资源将由系统自动回收。因此,无法通过调用 pthread_join() 来获取线程的返回值。
线程池的参数包括:
- actor_model 表示选择的反应堆模型
- connPool 表示连接池指针(connection_pool定义在sql_connection_pool中)
- thread_number 表示线程数量
- max_requests 表示请求队列中最大请求数量
template
threadpool ::threadpool( int actor_model, connection_pool *connPool, int thread_number, int max_requests) : m_actor_model(actor_model),m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL),m_connPool(connPool) { // 通过形参给m_actor_model,m_thread_number,m_max_requests,m_connPool赋值、m_threads(描述线程池的数组,大小为m_thread_number)初始化为NULL // 线程数量or表示请求队列中最大请求数量小于等于0抛出异常 if (thread_number <= 0 || max_requests <= 0) throw std::exception(); // 创建m_threads数组,大小为m_thread_number m_threads = new pthread_t[m_thread_number]; // 如果m_threads数组为null抛出异常 if (!m_threads) throw std::exception(); // 遍历m_thread数组,为线程数组分配内存,并通过循环创建线程。 // worker是一个回调函数。它被传递给pthread_create作为新线程的入口点,它将在新线程上运行。 // 这个回调函数执行线程池中实际处理任务的工作。每个线程都执行 worker 函数, // worker 函数中调用run()函数,从请求队列中取出请求并处理,直到请求队列为空或者达到最大请求数 for (int i = 0; i < thread_number; ++i) { if (pthread_create(m_threads + i, NULL, worker, this) != 0) { delete[] m_threads; throw std::exception(); } if (pthread_detach(m_threads[i])) { delete[] m_threads; throw std::exception(); } } } 4.析构函数
析构函数 ~threadpool() 的作用是在线程池对象被销毁时,释放线程数组 m_threads 的内存。在该析构函数中,只有一行代码,即 delete[] m_threads;,它会释放之前在构造函数中通过 new 操作符分配的内存,因为在整个线程池生命周期中,m_threads 数组都是由线程池对象管理的。
template
threadpool ::~threadpool() { delete[] m_threads; } 5.append()方法
这个方法是用于向线程池工作队列中添加任务。
函数的参数包括:
- T* request 是指向要添加的任务对象的指针
- int state 是要为该任务设置的状态值
函数返回值 为一个布尔值,表示任务是否成功添加到工作队列中。如果工作队列已经满了,那么就不会添加到队列中,返回 false;否则将任务添加到队列中,并返回 true。
template
bool threadpool ::append(T *request, int state) { // 保护请求队列的互斥锁,上锁 m_queuelocker.lock(); // 检查队列是否已满,满了则解锁并返回false if (m_workqueue.size() >= m_max_requests) { m_queuelocker.unlock(); return false; } // 队列没满,给新任务的状态赋值 m_state代表是读还是写 request->m_state = state; // 添加到队列末尾 m_workqueue.push_back(request); // 保护请求队列的互斥锁,解锁 m_queuelocker.unlock(); // post()对信号量释放,通过m_queuestat对等待的线程进行信号通知,让他们开始执行任务 m_queuestat.post(); return true; } 6.append_p()方法
方法append和append_p的区别在于:
- append方法有一个额外的参数state,用于设置任务的状态。
- append方法会在向工作队列中添加任务之前,将任务的状态设置为传递进来的参数state。
因此,append方法比append_p方法多了一个设置任务状态的步骤。(state在这个项目表示读或者写)
template
bool threadpool ::append_p(T *request) { m_queuelocker.lock(); if (m_workqueue.size() >= m_max_requests) { m_queuelocker.unlock(); return false; } m_workqueue.push_back(request); m_queuelocker.unlock(); m_queuestat.post(); return true; } 7.worker()方法
定义了一个返回类型为 void* 的 worker 函数,之前在构造函数中以回调函数形式出现
这个方法是一个静态函数,它是线程池的工作线程函数,用于在线程池中执行具体任务。
在每次有新的任务需要执行时,会创建一个新的线程,并将该函数作为线程的入口点。在工作线程中,首先从参数中获取线程池对象的指针,然后调用线程池对象的 run() 方法执行具体的任务。最后,将线程池对象的指针作为返回值返回。
template
void *threadpool ::worker(void *arg) { // 将传入参数 arg 强制转换为线程池指针类型,赋值给 pool 指针 threadpool *pool = (threadpool *)arg; // 调用线程池对象的 run() 方法,开始执行任务 pool->run(); // 由于线程池对象需要在多个线程之间共享,因此将当前线程执行完毕后的线程池对象指针作为返回值返回 return pool; } 8.run()方法
该方法是线程池中的工作线程运行的主要逻辑代码,无限循环地从任务队列中取出任务,并执行任务中的处理逻辑。
template
void threadpool ::run() { while (true) { // 信号量,使当前线程等待直到任务队列中有任务可供执行 m_queuestat.wait(); // 获取任务队列锁 m_queuelocker.lock(); // 检查任务队列是否为空,如果为空则立即释放锁并继续下一次循环 if (m_workqueue.empty()) { m_queuelocker.unlock(); continue; } // 如果任务队列不为空,则从任务队列中取出队头元素request,并将其从队列中删除 T *request = m_workqueue.front(); m_workqueue.pop_front(); // 释放任务队列锁 m_queuelocker.unlock(); // 检查获取到的任务指针是否为空,如果是空指针则立即继续下一次循环 if (!request) continue; // 如果当前采用的是Reactor模式 m_actor_model == 1 if (1 == m_actor_model) { // 判断任务当前状态是否为读 request->m_state == 0 if (0 == request->m_state) { // 如果读操作成功,调用request->read_once()方法进行读操作 if (request->read_once()) { // 将request->improv属性设置为 1,表示已经得到改进 request->improv = 1; // 创建connectionRAII对象(该类在sql_connection_pool中定义),自动获取一个数据库连接对象 connectionRAII mysqlcon(&request->mysql, m_connPool); // 调用request->process()方法进行业务处理 request->process(); } // 如果读操作失败 else { // 将request->improv属性设置为 1,表示已经得到改进 request->improv = 1; // 将request->timer_flag属性设置为 1,表示需要添加一个定时器来重试该任务 request->timer_flag = 1; } } // 如果当前任务状态为写 else { // 调用request->write()方法进行写操作 if (request->write()) { // 如果写操作成功,则将request->improv属性设置为 1,表示已经得到改进 request->improv = 1; } else { // 如果写操作失败,则将request->improv属性设置为 1,表示已经得到改进 request->improv = 1; // 将request->timer_flag属性设置为 1,表示需要添加一个定时器来重试该任务 request->timer_flag = 1; } } } // 如果当前采用的是 Proactor 模式 m_actor_model != 1 Proactor的读写不在工作队列处理,这个只进行主程序读取后的数据处理 else { // 则创建connectionRAII对象,自动获取一个数据库连接对象 connectionRAII mysqlcon(&request->mysql, m_connPool); // 调用request->process()方法进行业务处理 request->process(); } } }
还没有评论,来说两句吧...