webserver--3.半同步半反应堆线程池

webserver--3.半同步半反应堆线程池

码农世界 2024-05-26 前端 94 次浏览 0个评论

文章目录

  • 半同步/半反应堆线程池
    • 1.头文件
    • 2.类的声明
    • 3.构造函数
    • 4.析构函数
    • 5.append()方法
    • 6.append_p()方法
    • 7.worker()方法
    • 8.run()方法

      半同步/半反应堆线程池

      使用一个工作队列完全解除了主线程和工作线程的耦合关系:主线程往工作队列中插入任务,工作线程通过竞争来取得任务并执行它。

      • 同步I/O模拟proactor模式
      • 半同步/半反应堆
      • 线程池

      采用模板template

      只有一个类threadpool,类方法实现都在头文件中

      包括:构造函数threadpool()、析构函数~threadpool()以及append()方法、append_p()方法、worker()方法、run()方法

      1.头文件

      #include 
      #include 
      #include 
      #include 
      #include "../lock/locker.h"
      #include "../CGImysql/sql_connection_pool.h"
      

      2.类的声明

      template 
      class threadpool
      {
      public:
          /*thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理的请求的数量*/
          threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000);
          ~threadpool();
          bool append(T *request, int state);
          bool append_p(T *request);
      private:
          /*工作线程运行的函数,它不断从工作队列中取出任务并执行之*/
          static void *worker(void *arg);
          void run();
      private:
          int m_thread_number;        //线程池中的线程数
          int m_max_requests;         //请求队列中允许的最大请求数
          pthread_t *m_threads;       //描述线程池的数组,其大小为m_thread_number
          std::list m_workqueue; //请求队列
          locker m_queuelocker;       //保护请求队列的互斥锁
          sem m_queuestat;            //是否有任务需要处理
          connection_pool *m_connPool;  //数据库
          int m_actor_model;          //模型切换
      };
      

      3.构造函数

      重点是最后一个循环,使用 pthread 库创建多个线程,并将它们分离(detach)到父线程中。循环内部会迭代 thread_number 次,每次创建一个新的线程。

      具体的操作如下:

      • 通过调用 pthread_create 函数来创建新线程。该函数的第一个参数是指向线程标识符的指针,第二个参数是线程属性,第三个参数是指向函数的指针,这个函数会作为线程的入口点,并传递一个指向当前对象的指针作为参数。如果创建线程成功,则返回0。
      • 如果 pthread_create 返回非零值,则表示线程创建失败,此时代码会抛出 std::exception 异常,并删除已经创建的线程数组 m_threads。
      • 接着,通过调用 pthread_detach 函数将线程从父线程中分离。如果分离成功,则返回0。
      • 如果 pthread_detach 返回非零值,则表示线程分离失败,此时代码会抛出 std::exception 异常,并删除已经创建的线程数组 m_threads。

        线程分离后,其资源将由系统自动回收。因此,无法通过调用 pthread_join() 来获取线程的返回值。

        线程池的参数包括:

        • actor_model 表示选择的反应堆模型
        • connPool 表示连接池指针(connection_pool定义在sql_connection_pool中)
        • thread_number 表示线程数量
        • max_requests 表示请求队列中最大请求数量
          template 
          threadpool::threadpool( int actor_model, connection_pool *connPool, int thread_number, int max_requests) : m_actor_model(actor_model),m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL),m_connPool(connPool)
          {
              // 通过形参给m_actor_model,m_thread_number,m_max_requests,m_connPool赋值、m_threads(描述线程池的数组,大小为m_thread_number)初始化为NULL
              // 线程数量or表示请求队列中最大请求数量小于等于0抛出异常
              if (thread_number <= 0 || max_requests <= 0)
                  throw std::exception();
              // 创建m_threads数组,大小为m_thread_number
              m_threads = new pthread_t[m_thread_number];
              // 如果m_threads数组为null抛出异常
              if (!m_threads)
                  throw std::exception();
              // 遍历m_thread数组,为线程数组分配内存,并通过循环创建线程。
              // worker是一个回调函数。它被传递给pthread_create作为新线程的入口点,它将在新线程上运行。
              // 这个回调函数执行线程池中实际处理任务的工作。每个线程都执行 worker 函数,
              // worker 函数中调用run()函数,从请求队列中取出请求并处理,直到请求队列为空或者达到最大请求数
              for (int i = 0; i < thread_number; ++i)
              {
                  if (pthread_create(m_threads + i, NULL, worker, this) != 0)
                  {
                      delete[] m_threads;
                      throw std::exception();
                  }
                  if (pthread_detach(m_threads[i]))
                  {
                      delete[] m_threads;
                      throw std::exception();
                  }
              }
          }
          

          4.析构函数

          析构函数 ~threadpool() 的作用是在线程池对象被销毁时,释放线程数组 m_threads 的内存。在该析构函数中,只有一行代码,即 delete[] m_threads;,它会释放之前在构造函数中通过 new 操作符分配的内存,因为在整个线程池生命周期中,m_threads 数组都是由线程池对象管理的。

          template 
          threadpool::~threadpool()
          {
              delete[] m_threads;
          }
          

          5.append()方法

          这个方法是用于向线程池工作队列中添加任务。

          函数的参数包括:

          • T* request 是指向要添加的任务对象的指针
          • int state 是要为该任务设置的状态值

            函数返回值 为一个布尔值,表示任务是否成功添加到工作队列中。如果工作队列已经满了,那么就不会添加到队列中,返回 false;否则将任务添加到队列中,并返回 true。

            template 
            bool threadpool::append(T *request, int state)
            {
                // 保护请求队列的互斥锁,上锁
                m_queuelocker.lock();
                // 检查队列是否已满,满了则解锁并返回false
                if (m_workqueue.size() >= m_max_requests)
                {
                    m_queuelocker.unlock();
                    return false;
                }
                // 队列没满,给新任务的状态赋值 m_state代表是读还是写
                request->m_state = state;
                // 添加到队列末尾
                m_workqueue.push_back(request);
                 // 保护请求队列的互斥锁,解锁
                m_queuelocker.unlock();
                // post()对信号量释放,通过m_queuestat对等待的线程进行信号通知,让他们开始执行任务
                m_queuestat.post();
                return true;
            }
            

            6.append_p()方法

            方法append和append_p的区别在于:

            • append方法有一个额外的参数state,用于设置任务的状态。
            • append方法会在向工作队列中添加任务之前,将任务的状态设置为传递进来的参数state。

              因此,append方法比append_p方法多了一个设置任务状态的步骤。(state在这个项目表示读或者写)

              template 
              bool threadpool::append_p(T *request)
              {
                  m_queuelocker.lock();
                  if (m_workqueue.size() >= m_max_requests)
                  {
                      m_queuelocker.unlock();
                      return false;
                  }
                  m_workqueue.push_back(request);
                  m_queuelocker.unlock();
                  m_queuestat.post();
                  return true;
              }
              

              7.worker()方法

              定义了一个返回类型为 void* 的 worker 函数,之前在构造函数中以回调函数形式出现

              这个方法是一个静态函数,它是线程池的工作线程函数,用于在线程池中执行具体任务。

              在每次有新的任务需要执行时,会创建一个新的线程,并将该函数作为线程的入口点。在工作线程中,首先从参数中获取线程池对象的指针,然后调用线程池对象的 run() 方法执行具体的任务。最后,将线程池对象的指针作为返回值返回。

              template 
              void *threadpool::worker(void *arg)
              {
                  // 将传入参数 arg 强制转换为线程池指针类型,赋值给 pool 指针
                  threadpool *pool = (threadpool *)arg;
                  // 调用线程池对象的 run() 方法,开始执行任务
                  pool->run();
                  // 由于线程池对象需要在多个线程之间共享,因此将当前线程执行完毕后的线程池对象指针作为返回值返回
                  return pool;
              }
              

              8.run()方法

              该方法是线程池中的工作线程运行的主要逻辑代码,无限循环地从任务队列中取出任务,并执行任务中的处理逻辑。

              template 
              void threadpool::run()
              {
                  while (true)
                  {
                      // 信号量,使当前线程等待直到任务队列中有任务可供执行
                      m_queuestat.wait();
                      // 获取任务队列锁
                      m_queuelocker.lock();
                      // 检查任务队列是否为空,如果为空则立即释放锁并继续下一次循环
                      if (m_workqueue.empty())
                      {
                          m_queuelocker.unlock();
                          continue;
                      }
                      // 如果任务队列不为空,则从任务队列中取出队头元素request,并将其从队列中删除
                      T *request = m_workqueue.front();
                      m_workqueue.pop_front();
                      // 释放任务队列锁
                      m_queuelocker.unlock();
                      // 检查获取到的任务指针是否为空,如果是空指针则立即继续下一次循环
                      if (!request)
                          continue;
                      // 如果当前采用的是Reactor模式 m_actor_model == 1
                      if (1 == m_actor_model)
                      {
                          // 判断任务当前状态是否为读 request->m_state == 0
                          if (0 == request->m_state)
                          {
                              // 如果读操作成功,调用request->read_once()方法进行读操作
                              if (request->read_once())
                              {
                                  // 将request->improv属性设置为 1,表示已经得到改进
                                  request->improv = 1;
                                  // 创建connectionRAII对象(该类在sql_connection_pool中定义),自动获取一个数据库连接对象
                                  connectionRAII mysqlcon(&request->mysql, m_connPool);
                                  // 调用request->process()方法进行业务处理
                                  request->process();
                              }
                              // 如果读操作失败
                              else
                              {
                                  // 将request->improv属性设置为 1,表示已经得到改进
                                  request->improv = 1;
                                  // 将request->timer_flag属性设置为 1,表示需要添加一个定时器来重试该任务
                                  request->timer_flag = 1;
                              }
                          }
                          // 如果当前任务状态为写
                          else
                          {
                              // 调用request->write()方法进行写操作
                              if (request->write())
                              {
                                  // 如果写操作成功,则将request->improv属性设置为 1,表示已经得到改进
                                  request->improv = 1;
                              }
                              else
                              {
                                  // 如果写操作失败,则将request->improv属性设置为 1,表示已经得到改进
                                  request->improv = 1;
                                  // 将request->timer_flag属性设置为 1,表示需要添加一个定时器来重试该任务
                                  request->timer_flag = 1;
                              }
                          }
                      }
                      // 如果当前采用的是 Proactor 模式 m_actor_model != 1  Proactor的读写不在工作队列处理,这个只进行主程序读取后的数据处理
                      else
                      {
                          // 则创建connectionRAII对象,自动获取一个数据库连接对象
                          connectionRAII mysqlcon(&request->mysql, m_connPool);
                          // 调用request->process()方法进行业务处理
                          request->process();
                      }
                  }
              }
              

转载请注明来自码农世界,本文标题:《webserver--3.半同步半反应堆线程池》

百度分享代码,如果开启HTTPS请参考李洋个人博客
每一天,每一秒,你所做的决定都会改变你的人生!

发表评论

快捷回复:

评论列表 (暂无评论,94人围观)参与讨论

还没有评论,来说两句吧...

Top