[计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

[计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

码农世界 2024-05-17 后端 65 次浏览 0个评论

[计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

文章目录

  • 一.网络层与传输层协议
    • sockaddr结构体继承体系(Linux体系)
    • 贯穿计算机系统的网络通信架构图示:
    • 二.实现并部署多线程并发Tcp服务器框架
      • 线程池模块
      • 序列化反序列化工具模块
      • 通信信道建立模块
      • 服务器主体模块
      • 任务回调模块(根据具体应用场景可重构)
      • Tips:DebugC++代码过程中遇到的问题记录

        [计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

        一.网络层与传输层协议

        • 网络层与传输层内置于操作系统的内核中,网络层一般使用ip协议,传输层常用协议为Tcp协议和Udp协议,Tcp协议和Udp协议拥有各自的特点和应用场景:

          [计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

          sockaddr结构体继承体系(Linux体系)

          • sockaddr_in结构体用于存储网络通信主机进程的ip和端口号等信息

            [计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

            贯穿计算机系统的网络通信架构图示:

            [计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

            二.实现并部署多线程并发Tcp服务器框架

            小项目的完整文件的gittee链接

            • Tcp服务器架构:

              [计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

              线程池模块

              #pragma once
              #include 
              #include 
              #include "log.hpp"
              #include 
              #include 
              #include 
              template
              class RingQueue{
              private:
                  pthread_mutex_t Clock_;
                  pthread_mutex_t Plock_;
                  sem_t Psem_;
                  sem_t Csem_;
                  std::vector Queue_;
                  int Pptr_;
                  int Cptr_;
                  int capacity_;
              public:
                  RingQueue(int capacity = 10) : Queue_(capacity),Pptr_(0),Cptr_(0),capacity_(capacity){
                      sem_init(&Psem_,0,capacity);
                      sem_init(&Csem_,0,0);
                      pthread_mutex_init(&Clock_,nullptr);
                      pthread_mutex_init(&Plock_,nullptr);
                  }
                  ~RingQueue(){
                      sem_destroy(&Psem_);
                      sem_destroy(&Csem_);
                      pthread_mutex_destroy(&Clock_);
                      pthread_mutex_destroy(&Plock_);
                  }
                  T Pop(){
                      sem_wait(&Csem_);
                      pthread_mutex_lock(&Clock_);
                      T tem = Queue_[Cptr_];
                      Cptr_++;
                      Cptr_ %= capacity_;
                      pthread_mutex_unlock(&Clock_);
                      sem_post(&Psem_);
                      return tem;
                  }
                  void Push(T t){
                      sem_wait(&Psem_);
                      pthread_mutex_lock(&Plock_);
                      Queue_[Pptr_] = t;
                      Pptr_++;
                      Pptr_%= capacity_;
                      pthread_mutex_unlock(&Plock_);
                      sem_post(&Csem_);
                  }
              };
              
              #pragma once
              #include "sem_cp.cpp"
              #include 
              #include 
              #include 
              #include 
              #include "CalTask.cpp"
              template
              class Thread_Pool{
                  struct Thread_Data{
                      int Thread_num;
                      pthread_t tid;
                  };
              private:
                  RingQueue Queue_;  //线程安全的环形队列
                  std::vector thread_arr; //管理线程的容器
                  static std::mutex lock_;            //单例锁
                  static Thread_Pool * ptr_;    //单例指针
              private:
                  Thread_Pool(int capacity_Of_queue = 20) : Queue_(capacity_Of_queue){}
                  Thread_Pool(const Thread_Pool& Tp) = delete;
                  Thread_Pool& operator=(const Thread_Pool & Tp) = delete;
              public:
                  ~Thread_Pool(){}
                  //获取线程池单例-->注意C++的类模板静态成员函数需要在类体外进行定义
                  static Thread_Pool * Getinstance();
                  //创建多线程
                  void Create_thread(int thread_num = 10){
                      Thread_Data T_data;
                      for(int i = 0 ; i < thread_num ; ++i){
                          //注意线程池对象的this指针传递给线程
                          pthread_create(&T_data.tid,nullptr,Routine,this);
                          T_data.Thread_num = i + 1;
                          thread_arr.push_back(T_data);
                      }
                  }
                  //线程等待
                  void Thread_join(){
                      for(int i = 0 ;i < thread_arr.size() ; ++i){
                          pthread_join(thread_arr[i].tid,nullptr);
                      }
                  }
                  //向线程池中加入任务
                  void Push(Task T){
                      Queue_.Push(T);
                  }
                  void Push(Task && T){
                      Queue_.Push(std::forward(T));
                  }
              private:
                  //线程函数-->该函数没有在类外调用,所以无须在类体外定义
                  static void* Routine(void * args){
                      Thread_Pool * Pool = static_cast *>(args);
                      while(true){
                          std::cout << "Thread prepare to work\n" << std::endl;
                          Task Thread_Task = Pool->Queue_.Pop();
                          //要求Task类重载()-->用于执行具体任务
                          Thread_Task();
                      }
                      return nullptr;
                  }
              };
              //初始化静态指针
              template
              Thread_Pool * Thread_Pool::ptr_ = nullptr;
              template
              std::mutex Thread_Pool::lock_;
              //注意C++的类模板静态成员函数需要在类体外进行定义
              template
              Thread_Pool * Thread_Pool::Getinstance(){
                  if(ptr_ == nullptr){
                      lock_.lock();
                      if(ptr_ == nullptr){
                          ptr_ = new Thread_Pool;
                      }
                      lock_.unlock();
                  }
                  return ptr_;
              }
              

              序列化反序列化工具模块

              • 序列反序列化是保证通信过程中数据完整性的关键步骤,保证数据语义完整,结构完整

                [计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

                #pragma once
                #include 
                #include 
                // 自定义序列化反序列化协议
                const std::string blank_space_sep = " ";
                const std::string protocol_sep = "\n";
                //封装报文
                std::string Encode(std::string &content){
                    //报文正文字节数
                    std::string package = std::to_string(content.size());
                    package += protocol_sep;
                    package += content;    //用分隔符封装正文
                    package += protocol_sep;
                    return package;
                }
                //解析报文package-->"正文长度"\n"正文"\n
                bool Decode(std::string &package, std::string& content){
                    size_t pos = package.find(protocol_sep);
                    if(pos == std::string::npos) return false;
                    //解析报文正文长度
                    size_t Len = std::atoi(package.substr(0,pos).c_str());
                    //确定报文是否完整
                    size_t total_Len = pos + Len + 2;
                    if(package.size() != total_Len) return false;
                    //获取正文内容
                    content = package.substr(pos+1,Len);
                    package.erase(0,total_Len);
                    return true;
                }
                //用户层协议请求结构体
                class Request{
                public:
                    int x;
                    int y;
                    char op; 
                public:
                    Request(int data1 , int data2 , char op)
                        : x(data1),y(data2),op(op){}
                    Request(){}
                public:
                    //请求结构体 序列化 成报文正文字符串 "x op y"
                    bool Serialize(std::string& out){
                        std::string content = std::to_string(x);
                        content += blank_space_sep;
                        content += op;
                        content += blank_space_sep;
                        content += std::to_string(y);
                        out = content;
                        return true;
                        // 等价的jason代码
                        // Json::Value root;
                        // root["x"] = x;
                        // root["y"] = y;
                        // root["op"] = op;
                        // // Json::FastWriter w;
                        // Json::StyledWriter w;
                        // out = w.write(root);
                        // return true;
                    }
                    //报文正文字符串 反序列化 成请求结构体
                    // "x op y"
                    bool Deserialize(const std::string &in) {
                        size_t left = in.find(blank_space_sep);
                        if(left == std::string::npos)return false;
                        x = std::stoi(in.substr(0,left).c_str());
                        std::size_t right = in.rfind(blank_space_sep);
                        if (right == std::string::npos)return false;
                        y = std::atoi(in.substr(right + 1).c_str());
                        if(left + 2 != right) return false;
                        op = in[left+1];
                        return true;
                        // 等价的jason代码
                        // Json::Value root;
                        // Json::Reader r;
                        // r.parse(in, root);
                        // x = root["x"].asInt();
                        // y = root["y"].asInt();
                        // op = root["op"].asInt();
                        // return true;
                    }
                    void DebugPrint()
                    {
                        std::cout << "新请求构建完成:  " << x << op << y << "=?" << std::endl;
                    }
                };
                //用户层协议请求回应结构体
                class Response{
                public:
                    int result;
                    int code; 
                public:
                    Response(int res , int c)
                        : result(res),code(c){}
                    Response(){}
                public:
                    //请求回应结构体 序列化 成报文正文字符串 "result code"
                    bool Serialize(std::string& out){
                        std::string s = std::to_string(result);
                        s += blank_space_sep;
                        s += std::to_string(code);
                        out = s;
                        return true;
                        // 等价的jason代码
                        // Json::Value root;
                        // root["result"] = result;
                        // root["code"] = code;
                        // // Json::FastWriter w;
                        // Json::StyledWriter w;
                        // out = w.write(root);
                        // return true;
                    }
                    //"result code"
                    //报文正文字符串 反序列化 成请求回应结构体
                    bool Deserialize(const std::string &in) 
                    {
                        std::size_t pos = in.find(blank_space_sep);
                        if (pos == std::string::npos)return false;
                        if(pos == 0 || pos == in.size() - 1) return false;
                        result = std::stoi(in.substr(0, pos).c_str());
                        code = std::stoi(in.substr(pos+1).c_str());
                        return true;
                        // 等价的jason代码
                        // Json::Value root;
                        // Json::Reader r;
                        // r.parse(in, root);
                        // result = root["result"].asInt();
                        // code = root["code"].asInt();
                        // return true;
                    }
                    void DebugPrint()
                    {
                        std::cout << "结果响应完成, result: " << result << ", code: "<< code << std::endl;
                    }
                };
                

                通信信道建立模块

                #pragma once
                #include 
                #include 
                #include    
                #include 
                #include "log.hpp"
                #include 
                #include 
                #include 
                namespace MySocket{
                    //Tcp通讯构建器
                    class TcpServer{
                        enum{
                            UsageError = 1,
                            SocketError,
                            BindError,
                            ListenError,
                        };
                    private:
                        int socketfd_ = -1;
                        std :: string ip_;
                        uint16_t port_;
                        int backlog_ = 10;
                    public:
                        TcpServer(const std::string& ip = "172.19.29.44", uint16_t port = 8081) : ip_(ip) , port_(port){}
                        ~TcpServer(){if(socketfd_ > 0) close(socketfd_);}
                    public:
                        //确定通信协议,建立文件描述符
                        void BuildSocket(){
                            socketfd_ = socket(AF_INET,SOCK_STREAM,0);
                            if(socketfd_ < 0){
                                lg(Fatal,"socket error,%s\n",strerror(errno));
                                exit(SocketError);
                            }
                        }
                        //文件描述符与服务器ip : 端口号绑定
                        void SocketBind(){
                            struct sockaddr_in addr;
                            memset(&addr,0,sizeof(addr));
                            addr.sin_port = htons(port_);
                            addr.sin_family = AF_INET;
                            addr.sin_addr.s_addr = inet_addr(ip_.c_str());
                            if(bind(socketfd_,(const sockaddr*)&addr,sizeof(addr)) < 0){
                                lg(Fatal,"socket bind error,%s\n",strerror(errno));
                                exit(BindError);
                            }
                            lg(Info,"socket bind success\n");
                        }
                        //启动服务监听,等待客户端的连接
                        void Socklisten(){
                            if(socketfd_ <= 0){
                                lg(Fatal,"socket error,%s\n",strerror(errno));
                                exit(SocketError);
                            }
                            if(listen(socketfd_,backlog_) < 0){
                                lg(Fatal, "listen error, %s: %d", strerror(errno), errno);
                                exit(ListenError);
                            }
                        }
                        //服务器接收客户端的连接-->并创建用于通信的文件描述符-->一个客户端连接对应一个文件描述符
                        int SockAccept(std::string& cilent_ip, uint16_t& cilent_port){
                            struct sockaddr_in client_addr;  // 输出型参数,用于获取用户的ip : 端口号
                            memset(&client_addr,0,sizeof(client_addr));
                            socklen_t Len = sizeof(client_addr);
                            int newfd = accept(socketfd_,(struct sockaddr*)&client_addr,&Len);
                            if(newfd < 0){
                                lg(Warning, "accept error, %s: %d", strerror(errno), errno);
                                return -1;
                            }
                            //提取客户端信息-->输出参数
                            char ipstr[64];
                            cilent_ip = inet_ntop(AF_INET,&client_addr.sin_addr,ipstr,sizeof(ipstr));
                            cilent_ip = ipstr;
                            cilent_port = ntohs(client_addr.sin_port);
                            return newfd;
                        }
                    public:
                        int Get_Server_fd(){
                            return socketfd_;
                        }
                        void Close_fd(){
                            if(socketfd_ > 0){
                                close(socketfd_);
                                socketfd_ = -1;
                            }
                        }
                    };
                };
                

                服务器主体模块

                [计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

                #pragma once
                #include "ThreadPool.cpp"
                #include "TcpServer.cpp"
                #include "CalTask.cpp"
                #include "log.hpp"
                #include 
                //构建计算器服务器
                class CalServer{
                    const int size = 2048;
                private:
                    Thread_Pool * Pool_ptr_;
                    MySocket::TcpServer Socket_;
                    int Socket_fd_ = -1;
                public:
                    CalServer(const std::string& de_ip = "172.19.29.44",uint16_t de_port = 8081)
                        : Socket_(de_ip,de_port)
                    {
                        Pool_ptr_ = Thread_Pool::Getinstance();
                        if(Pool_ptr_ == nullptr){
                            lg(Fatal,"Pool_ptr_ is nullptr\n");
                            return;
                        }
                        Pool_ptr_->Create_thread();
                    }
                    ~CalServer(){}
                public:
                    //建立Tcp连接条件
                    bool Init(){
                        Socket_.BuildSocket();
                        Socket_fd_ = Socket_.Get_Server_fd();
                        if(Socket_fd_ < 0){
                            lg(Fatal,"BuildSocket failed\n");
                            return true;
                        }
                        Socket_.SocketBind();
                        Socket_.Socklisten();
                        lg(Info, "init server .... done");
                        return true;
                    }
                    //启动服务器
                    void Start(){
                        signal(SIGCHLD, SIG_IGN);
                        signal(SIGPIPE, SIG_IGN);
                        char ReadBuffer[size];
                        while(true){
                            //接受用户请求
                            std::string client_ip;
                            uint16_t client_port;
                            int client_fd = Socket_.SockAccept(client_ip,client_port);
                            if(client_fd < 0){
                                lg(Warning,"SockAccept error\n");
                                continue;
                            }
                            lg(Info, "accept a new link, sockfd: %d, clientip: %s, clientport: %d", client_fd, client_ip.c_str(), client_port);
                            int n = read(client_fd,ReadBuffer,sizeof(ReadBuffer));
                            ReadBuffer[n] = 0;  
                            std::string TaskStr(ReadBuffer);
                            printf("receives mess from client : %s",ReadBuffer);
                            if(n < 0){
                                lg(Warning,"read error\n");
                                break;
                            }
                            CalTask task(client_fd,client_ip,client_port,TaskStr);
                            Pool_ptr_->Push(task);
                        }
                    }
                };
                

                任务回调模块(根据具体应用场景可重构)

                #pragma once
                #include 
                #include "ThreadPool.cpp"
                #include "Protocol.cpp"
                enum{
                    Div_Zero = 1,
                    Mod_Zero,
                    Other_Oper
                };
                class CalTask{
                private:
                    int socketfd_;                //网络通信文件描述符
                    std :: string ip_;            //客户端ip
                    uint16_t port_;               //客户端端口号
                    std::string package_;         //客户请求字符串
                public:
                    CalTask(int socketfd,const std::string& ip , uint16_t & port,std::string & str)
                        : socketfd_(socketfd),ip_(ip),port_(port),package_(str){}
                    CalTask(){}//类一定要有默认构造函数
                    ~CalTask(){}
                public:
                    //执行计算任务并将结果发送给用户
                    void operator() (){
                        std::cout << "Task Running ... \n" << std::endl;
                        std::string content;
                        //将用户发送的报文进行解包获取正文
                        bool r = Decode(package_, content);
                        if (!r)return;
                        //将报文正文进行反序列化
                        Request req;
                        r = req.Deserialize(content);
                        if (!r)return ;
                        req.DebugPrint();
                        content = ""; 
                        //构建计算结果                         
                        Response resp = CalculatorHelper(req);
                        resp.DebugPrint();
                        //计算结果序列化成字符串
                        resp.Serialize(content);
                        //字符串正文封装成报文发送给用户
                        std::string ResStr = Encode(content);
                        write(socketfd_,ResStr.c_str(),ResStr.size());
                        if(socketfd_ > 0)close(socketfd_);
                    }
                private:
                    Response CalculatorHelper(const Request &req){
                        //构建请求回应结构体
                        Response resp(0, 0);
                        switch (req.op){
                        case '+':
                            resp.result = req.x + req.y;
                            break;
                        case '-':
                            resp.result = req.x - req.y;
                            break;
                        case '*':
                            resp.result = req.x * req.y;
                            break;
                        case '/':{
                            if (req.y == 0)
                                resp.code = Div_Zero;
                            else
                                resp.result = req.x / req.y;
                        }
                        break;
                        case '%':{
                            if (req.y == 0)
                                resp.code = Mod_Zero;
                            else
                                resp.result = req.x % req.y;
                        }
                        break;
                        default:
                            resp.code = Other_Oper;
                            break;
                        }
                        return resp;
                    }
                };
                

                Tips:DebugC++代码过程中遇到的问题记录

                • 使用C++类模板时,若在类模板中定义了静态成员函数,且该静态成员函数在类外被调用,则该静态成员函数必须定义在类外,不然链接器无法找到函数体.
                • 注意类模板静态成员的声明格式需要加关键字temlpate<>
                • 声明类模板静态成员无需特化模版类型参数
                • 跨主机并发通信测试:

                  [计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

                  [计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

转载请注明来自码农世界,本文标题:《[计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)》

百度分享代码,如果开启HTTPS请参考李洋个人博客
每一天,每一秒,你所做的决定都会改变你的人生!

发表评论

快捷回复:

评论列表 (暂无评论,65人围观)参与讨论

还没有评论,来说两句吧...

Top