【Muduo】三大核心之EventLoop

【Muduo】三大核心之EventLoop

码农世界 2024-05-24 前端 68 次浏览 0个评论

Muduo网络库的EventLoop模块是网络编程框架中的核心组件,负责事件循环的驱动和管理。以下是对EventLoop模块的详细介绍:

作用与功能:

  • EventLoop是网络服务器中负责循环的重要模块,它持续地监听、获取和处理各种事件,如IO事件、定时器事件等。
  • 它通过轮询访问Poller(如EPollPoller),获取激活的Channel列表,然后使Channel根据自身情况调用相应的回调函数来处理事件。
  • EventLoop确保了每个Loop都是相互独立的,拥有自己的事件循环、Poller监听者和Channel监听通道列表。

    与Poller的关系:

    • Poller负责从事件监听器上获取监听结果,即哪些文件描述符(fd)上发生了哪些事件。
    • EventLoop会轮询访问Poller,以获取这些发生事件的fd及其相关事件。

      与Channel的关系:

      • Channel类是对文件描述符(fd)以及其相关事件的封装。它保存了fd的感兴趣事件、实际发生的事件以及每种事件对应的处理函数。
      • 当Poller检测到某个fd上有事件发生时,EventLoop会找到对应的Channel,并调用其上的回调函数来处理该事件。

        线程模型:

        • EventLoop遵循“one loop one thread”的原则,即每个EventLoop都在一个独立的线程上运行。
        • 这种设计使得事件处理更加高效和清晰,避免了多线程环境下的竞态条件和同步问题。

          mainLoop和subLoop

          在Muduo网络库中,mainLoop和subLoop都是EventLoop的实例,它们分别代表主事件循环和子事件循环。

          mainLoop(主事件循环)

          • mainLoop是整个Muduo网络库的核心事件循环。它负责监听服务器套接字(通常是listenfd),并接受来自客户端的连接请求。
          • mainLoop运行一个Accrptor,包含一个Poller,用于监听一个特定的非阻塞的服务器sockfd上的读事件。当Poller检测到有读事件发生时(一般是新用户连接),mainLoop会在线程池中通过轮询算法选择一个subLoop来处理这个连接的读写和关闭事件。Acceptor将在后续阐述。
          • mainLoop遵循 “one loop one thread” 的原则,即每个mainLoop都在一个独立的线程上运行。这确保了事件处理的高效性和清晰性,避免了多线程环境下的竞态条件和同步问题。

            subLoop(子事件循环):

            • subLoop是mainLoop的子事件循环,用于处理已建立的连接的读写和关闭事件。每个subLoop都在一个独立的线程上运行,有一个用于唤醒自身的fd和Channel,运行一个Poller,并保存自己管理的多个Channel,以实现并发处理多个连接的目的。
            • 当mainLoop接受到一个新的连接请求时,它会根据EventLoopThreadPool中的线程来选择一个subLoop,将新创建的TcpConnection的Channel放入这个subLoop中。这个subLoop会接管该连接的fd,并监听其上的读写和关闭事件。
            • subLoop中的事件处理逻辑与mainLoop类似,也是通过Poller来监听fd上的事件,并调用相应的回调函数来处理这些事件。
            • 由于subLoop是独立的线程,因此它们可以并行处理多个连接,从而提高了服务器的并发处理能力。

              总的来说,mainLoop和subLoop共同构成了Muduo网络库的事件驱动编程框架。mainLoop负责监听服务器套接字并接受连接请求,而subLoop则负责处理已建立的连接的读写和关闭事件。通过合理的线程调度和事件处理机制,Muduo网络库能够高效、稳定地处理大量的并发连接请求。

              EventLoop.h

              #pragma once
              #include "noncopyable.h"
              #include "Timestamp.h"
              #include "CurrentThread.h"
              #include "LogStream.h"
              #include 
              #include 
              #include 
              #include 
              #include 
              #include 
              class Channel;
              class Poller;
              /**
               * 事件循环类  两大模型:Channel  Poller
               * mainLoop只负责处理IO,并返回client的fd
               * subLoop负责监听poll,并处理相应的回调
               * 两者之间通过weakupfd进行通信
              */
              class EventLoop : noncopyable
              {
              public:
                  using Functor = std::function;
                  EventLoop();
                  ~EventLoop();
                  // 开启loop
                  void loop();
                  // 退出loop
                  void quit();
                  Timestamp pollReturnTime() const { return pollReturnTime_; }
                  // 在当前loop执行cb
                  void runInLoop(Functor cb);
                  // 把cb放入队列,唤醒subloop所在的线程,执行cb
                  void queueInLoop(Functor cb);
                  size_t queueSize() const;
                  // 唤醒loop所在的线程,EventLoop::queueInLoop中调用
                  void wakeup();
                  // EventLoop方法 =》 Poller的方法
                  void updateChannel(Channel *channel);
                  void removeChannel(Channel *channel);
                  bool hasChannel(Channel *channel);
                  // 判断EventLoop对象是否在自己的线程中
                  bool isInLoopThread() const {
                      return threadId_ == CurrentThread::tid();
                  }
              private:
                  // waked up后的一个操作 
                  void handleRead();       
                  // 执行回调
                  void doPendingFunctors(); 
                  using ChannelList = std::vector;
                  std::atomic_bool looping_; // 原子操作,通过CAS实现
                  std::atomic_bool quit_;    // 标识退出loop循环
                  const pid_t threadId_; // 记录当前loop所属的线程id
                  Timestamp pollReturnTime_; // poller返回发生事件的channels的时间点
                  std::unique_ptr poller_;
                  int wakeupFd_; // 当mainLoop获取一个新用户的channel,通过轮询算法选择一个subloop,通过该成员唤醒subloop处理channel。使用eventfd
                  // unlike in TimerQueue, which is an internal class,
                  // we don't expose Channel to client.
                  std::unique_ptr wakeupChannel_;
                  // scratch variables
                  ChannelList activeChannels_;
                  std::atomic_bool callingPendingFunctors_; // 标识当前loop是否有需要执行的回调操作,正在执行则为true
                  std::vector pendingFunctors_;    // 存储loop需要执行的所有回调操作
                  std::mutex mutex_;                        // 保护pendingFunctors_线程安全
              };

              EventLoop.cc

              #include "EventLoop.h"
              #include "LogStream.h"
              #include "Poller.h"
              #include "Channel.h"
              #include 
              #include 
              #include 
              #include 
              #include 
              // 防止一个线程创建多个EventLoop    threadLocal
              __thread EventLoop *t_loopInThisThread = nullptr;
              // 定义Poller超时时间
              const int kPollTimeMs = 10000;
              // 创建weakupfd,用来notify唤醒subReactor处理新来的channel
              int createEventfd()
              {
                  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
                  if (evtfd < 0)
                  {
                      LOG_FATAL << "Failed in eventfd" << errno;
                  }
                  return evtfd;
              }
              EventLoop::EventLoop()
                  : looping_(false),
                    quit_(false),
                    callingPendingFunctors_(false),
                    threadId_(CurrentThread::tid()),
                    poller_(Poller::newDefaultPoller(this)),
                    wakeupFd_(createEventfd()),
                    wakeupChannel_(new Channel(this, wakeupFd_))
              {
                  LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
                  if (t_loopInThisThread)
                  {
                      LOG_FATAL << "Another EventLoop " << t_loopInThisThread
                                << " exists in this thread " << threadId_;
                  }
                  else
                  {
                      t_loopInThisThread = this;
                  }
                  // 设置weakupfd的事件类型以及发生事件后的回调操作
                  wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
                  // we are always reading the wakeupfd
                  // 每一个EventLoop都将监听weakupChannel的EPOLLIN读事件了
                  // 作用是subloop在阻塞时能够被mainLoop通过weakupfd唤醒
                  wakeupChannel_->enableReading();
              }
              EventLoop::~EventLoop()
              {
                  LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_
                            << " destructs in thread " << CurrentThread::tid();
                  wakeupChannel_->disableAll();
                  wakeupChannel_->remove();
                  ::close(wakeupFd_);
                  t_loopInThisThread = NULL;
              }
              void EventLoop::handleRead()
              {
                  uint64_t one = 1;
                  ssize_t n = read(wakeupFd_, &one, sizeof one);
                  if (n != sizeof one)
                  {
                      LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
                  }
              }
              void EventLoop::loop()
              {
                  looping_ = true;
                  quit_ = false;
                  LOG_INFO << "EventLoop " << this << " start looping";
                  while (!quit_)
                  {
                      activeChannels_.clear();
                      // 当前EventLoop的Poll,监听两类fd,client的fd(正常通信的,在baseloop中)和 weakupfd(mainLoop 和 subLoop 通信用来唤醒sub的)
                      pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
                      for (Channel *channel : activeChannels_)
                      {
                          // Poller监听哪些channel发生事件了,然后上报给EventLoop,通知channel处理相应的事件
                          channel->handleEvent(pollReturnTime_);
                      }
                      // 执行当前EventLoop事件循环需要处理的回调操作
                      /**
                       * IO线程 mainLoop 只 accept 然后返回client通信用的fd <= 用channel打包 并分发给 subloop
                       * mainLoop事先注册一个回调cb(需要subLoop来执行),weakup subloop后,
                       * 执行下面的方法,执行之前mainLoop注册的cb操作(一个或多个)
                       */
                      doPendingFunctors();
                  }
                  LOG_INFO << "EventLoop " << this << " stop looping";
                  looping_ = false;
              }
              /**
               * 退出事件循环
               * 1、loop在自己的线程中 调用quit,此时肯定没有阻塞在poll中
               * 2、在其他线程中调用quit,如在subloop(woker)中调用mainLoop(IO)的qiut
               *
               *                  mainLoop
               * 
               *      Muduo库没有 生产者-消费者线程安全的队列 存储Channel
               *      直接使用wakeupfd进行线程间的唤醒       
               *
               * subLoop1         subLoop2        subLoop3
               */
              void EventLoop::quit()
              {
                  quit_ = true;
                  // 2中,此时,若当前woker线程不等于mainLoop线程,将本线程在poll中唤醒
                  if (!isInLoopThread())
                  {
                      wakeup();
                  }
              }
              void EventLoop::runInLoop(Functor cb)
              {
                  // LOG_DEBUG<<"EventLoop::runInLoop  cb:" << (cb != 0);
                  if (isInLoopThread()) // 产生段错误
                  { // 在当前loop线程中 执行cb
                      LOG_DEBUG << "在当前loop线程中 执行cb";
                      cb();
                  }
                  else
                  { // 在其他loop线程执行cb,需要唤醒其loop所在线程,执行cb
                      LOG_DEBUG << "在其他loop线程执行cb,需要唤醒其loop所在线程,执行cb";
                      queueInLoop(cb);
                  }
              }
              void EventLoop::queueInLoop(Functor cb)
              {
                  {
                      std::unique_lock ulock(mutex_);
                      pendingFunctors_.emplace_back(cb);
                  }
                  // 唤醒相应的,需要执行上面回调操作的loop线程
                  // 若当前线程正在执行回调doPendingFunctors,但是又有了新的回调cb
                  // 防止执行完回调后又阻塞在poll上无法执行新cb,所以预先wakeup写入一个数据
                  if (!isInLoopThread() || callingPendingFunctors_) 
                  {
                      wakeup(); // 唤醒loop所在线程
                  }
              }
              // 用来唤醒loop所在的线程,向wakeupfd写一个数据,wakeupChannel就发生读事件,当前loop线程就会被唤醒
              void EventLoop::wakeup()
              {
                  uint64_t one = 1;
                  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
                  if (n != sizeof one)
                  {
                      LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
                  }
              }
              void EventLoop::updateChannel(Channel *channel)
              {
                  // channel是发起方,通过loop调用poll
                  poller_->updateChannel(channel);
              }
              void EventLoop::removeChannel(Channel *channel)
              {
                  // channel是发起方,通过loop调用poll
                  poller_->removeChannel(channel);
              }
              bool EventLoop::hasChannel(Channel *channel)
              {
                  return poller_->hasChannel(channel);
              }
              // 执行回调,由TcpServer提供的回调函数
              void EventLoop::doPendingFunctors()
              {
                  std::vector functors;
                  callingPendingFunctors_ = true; // 正在执行回调操作
                  { // 使用swap,将原pendingFunctors_置空并且释放,其他线程不会因为pendingFunctors_阻塞
                      std::unique_lock lock(mutex_);
                      functors.swap(pendingFunctors_);
                  }
                  for (const Functor &functor : functors)
                  {
                      functor(); // 执行当前loop需要的回调操作
                  }
                  callingPendingFunctors_ = false;
              }
              

转载请注明来自码农世界,本文标题:《【Muduo】三大核心之EventLoop》

百度分享代码,如果开启HTTPS请参考李洋个人博客
每一天,每一秒,你所做的决定都会改变你的人生!

发表评论

快捷回复:

评论列表 (暂无评论,68人围观)参与讨论

还没有评论,来说两句吧...

Top