剖析前的准备知识 阻塞,非阻塞,同步,异步 典型的一次IO的两个阶段:数据准备和数据读写
数据准备:根据系统IO操作的就绪状态
阻塞:调用IO方法的线程进入阻塞状态
非阻塞:不会改变线程的状态,通过返回值判断
数据读写:根据应用程序和内核的交互方式
同步:就网络编程而言,不管是阻塞还是非阻塞,都是要一个结果(在那个时间段),需要程序员手动把数据放入自己准备的变量
异步:CPU把任务完成后通知我,不需要程序员手动把数据放入自己准备的变量,当前线程是处于在做其他任务的时候。
以epoll为例,它是同步,因为我们要把从缓冲区的数据提取到自己准备的变量。
陈硕大神:在处理IO的时候,阻塞和非阻塞都是同步IO。只有使用了特殊的API才是异步IO.
1 2 3 一个典型的网络IO接口调用,分为两个阶段,分别是"数据就绪" 和"数据读写" ,数据就绪阶段分为阻塞和非阻塞,表现的结果就是,阻塞当前线程或是直接返回。 同步表示A向B请求调用一个网络IO接口时(或者调用某个业务逻辑API接口时),数据的读写都是由请求方A自己来完成的(不管是阻塞还是非阻塞);异步表示A向B请求访问一个网络IO接口时(或者调用某个业务逻辑API接口时),向B传入请求的事件及事件发生时通知的方式,A就可以处理其他逻辑了,当B监听到事件处理完成后,会用事先约定好的通知方式,通知A处理结果。
1 2 3 4 同步阻塞 int size=recv(fd,buf,1024,0); 同步非阻塞 int size=recv(fd,buf,1024,0); 异步阻塞 这个不合理 异步非阻塞 异步大多和非阻塞搭配
Unix/Linux上的五种IO模型 阻塞 blocking
非阻塞 non-blocking
IO复用(IO multiplexing)
信号驱动(signal-driven)
内核在第一个阶段是异步,在第二个阶段是同步;与非阻塞IO的区别在于它提供了消息通知机制,不需要用户进程不断的轮询检查,减少了系统API的调用次数,提高了效率。
异步(asynchronous)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #include <aiocb.h> struct aiocb { int aio_fildes; off_t aio_offset; volatile void * aio_buf; size_t aio_nbytes; int aio_reqprio; struct sigevent aio_sigevent ; int aio_lio_opcode; };
典型的异步非阻塞状态。Node.js采用的网络IO模型
好的网络服务器设计 在这个多核时代,服务端网络编程如何选择线程模型呢?赞同libev作者的观点:one loop per thread is usually a good model,这样多线程服务端编程的问题转换为如何设计一个高效且易于使用的event loop,然后每个线程run一个event loop就行了(当然线程间的同步,互斥少不了,还有其他的耗时事件需要起另外的线程来做)。
event loop是non-blocking网络编程的核心,在现实生活中,non-blocking几乎总是和IO-multiplexing一起使用,原因有两点:
没有人真的会用轮询来检查某个non-blocking IO操作是否完成,这样太浪费CPU资源。
IO-multiplexing一般不能和blocking IO 用在一起,因为blocking IO中的read/write/accept/connect都有可能阻塞当前线程,这样线程就没有办法处理其他socket上的IO事件
所以,当我们提到non-blocking的时候,实际上指的是non-blocking+IO-multiplexing,但用其中任何一个都没有办法很好的实现功能。
epoll+fork和epoll+pthread都有不同的应用场景。
强大的nginx服务器采用了epoll+fork模型作为网络模块的架构设计,实现了简单好用的负载算法,使各个fork网络进程不会忙的越忙,闲的越闲,并且通过引入一把乐观锁解决了该模型导致的服务器惊群现象,功能十分强大。
Reactor模型 反应器模型(Reactor pattern)是一种为处理服务请求并发提交到一个或者多个服务处理程序的事件设计模式。当请求抵达后,服务处理程序使用解多路分配策略,然后同步地派发这些请求至相关的请求处理程序。
重要组件:Event事件,Reactor反应堆,Demultiplex事件分发器,EventHandler事件处理器
epoll select和poll的缺点 select的缺点:
单个进程能够监视的文件描述符的数量存在最大限制,通常是1024,当然可以更改数量,但由于select采用轮询的方式扫描文件描述符,文件描述符数量越多,性能越差;
内核/用户空间内存拷贝问题,select需要复制大量的句柄数据结构,产生巨大的开销
select返回的是含有整个句柄的数组,应用程序需要遍历整个数组才能发现哪些句柄发生了事件
select的触发方式是水平触发,应用程序如果没有完成对一个已经就绪的文件描述符进行IO操作,那么之后每次select调用还是会将这些文件描述符通知进程。
相比select模型,poll使用链表保存文件描述符,因此没有了监视文件数量的限制,但其他三个缺点存在。
epoll原理及其优势 设想如下场景:有100万个客户端同时与一个服务器进程保持TCP连接。而每一时刻,通常只有几百上千个TCP连接是活跃的。如何实现这样的高并发?
在select/poll时代,服务器进程每次都把这100万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,轮询结束后再将句柄复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,所以select/poll一般只能处理几千的并发连接。
epoll的设计和实现与select完全不同。epoll通过在Linux内核申请一个简易的文件系统(文件系统一般用什么数据结构:B+树,磁盘IO消耗低,效率高)。把原先的select/poll调用分为3个部分:
调用epoll_create()建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源)。
调用epoll_ctl向epoll对象添加这100万个连接的套接字。
调用epoll_wait手机发生的事件的fd资源
如此一来,要实现上面说的场景,只需要在进程启动时建立一个epoll对象,然后在需要的时候向这个epoll对象中添加或者删除。同时epoll_wait的效率很高,调用epoll_wait时,并没有向操作系统复制这100万个连接的句柄数据,内核也不需要去遍历全部的连接。
1 2 3 4 5 6 7 8 9 10 struct eventpoll { ... struct rb_root rbr ; struct list_head rdlist ; ... };
LT模式 内核数据没被读完,就会一直上报数据
ET模式 内核数据只上报一次
muduo采用的是LT
不会丢失数据或者消息。应用没有读取完数据,内核是会不断上报的。
低延迟处理。每次读数据只需要一次系统调用;照顾了多个连接的公平性,不会因为某个连接上的数据量过大而影响其他连接处理消息。
跨平台处理。像select(select和poll是没有ET的)一样可以跨平台使用。
muduo网络库基本使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 #include <muduo/net/TcpServer.h> #include <muduo/net/EventLoop.h> #include <iostream> #include <functional> #include <string> using namespace muduo::net;using namespace muduo;using namespace std;class ChatServer { public : ChatServer (EventLoop* loop, const InetAddress& listenAddr, const string& nameArg) :_server(loop,listenAddr,nameArg) ,_loop(loop) { _server.setConnectionCallback (std::bind (&ChatServer::onConnection,this ,placeholders::_1)); _server.setMessageCallback (std::bind (&ChatServer::onMessage,this ,placeholders::_1,_2,_3)); _server.setThreadNum (4 ); } void start () { _server.start (); } private : void onConnection (const TcpConnectionPtr&conn) { if (conn->connected ()) { std::cout<<conn->peerAddress ().toIpPort ()<<"->" <<conn->localAddress ().toIpPort ()<<"state:online" <<std::endl; } else { std::cout<<conn->peerAddress ().toIpPort ()<<"->" <<conn->localAddress ().toIpPort ()<<"state:offline" <<std::endl; conn->shutdown (); } } void onMessage (const TcpConnectionPtr&conn, Buffer*buffer, Timestamp time) { std::string buf=buffer->retrieveAllAsString (); std::cout<<"recv data: " <<buf<<"time:" <<time.toString ()<<std::endl; conn->send (buf); } muduo::net::TcpServer _server; muduo::net::EventLoop *_loop; }; int main () { EventLoop loop; InetAddress addr ("127.0.0.1" ,6000 ) ; ChatServer server (&loop,addr,"ChatServer" ) ; server.start (); loop.loop (); return 0 ; }
实现muduo功能代码 noncopyable类 noncopyable.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 #pragma once class noncopyable { public : noncopyable (const noncopyable&) = delete ; noncopyable& operator =(const noncopyable&)=delete ; protected : noncopyable ()=default ; ~noncopyable ()=default ; };
Logger日志实现 Timestamp.h(时间类)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #pragma once #include <iostream> #include <string> class Timestamp { public : Timestamp (); explicit Timestamp (int64_t microSecondsDinceEpoch) ; static Timestamp now () ; std::string toString () const ; private : int64_t microSecondsDinceEpoch_; };
Timestamp.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 #include "Timestamp.h" #include <time.h> Timestamp::Timestamp ():microSecondsDinceEpoch_ (0 ){} Timestamp::Timestamp (int64_t microSecondsDinceEpoch) :microSecondsDinceEpoch_ (microSecondsDinceEpoch) {} Timestamp Timestamp::now () { return Timestamp (time (NULL )); } std::string Timestamp::toString () const { char buf[128 ]={0 }; tm *tm_time=localtime (µSecondsDinceEpoch_); snprintf (buf,128 ,"%4d/%02d/%02d %02d:%02d:%02d" , tm_time->tm_year+1900 , tm_time->tm_mon+1 , tm_time->tm_mday, tm_time->tm_hour, tm_time->tm_min, tm_time->tm_sec); return buf; }
Logger.h(日志类)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 #pragma once #include "noncopyable.h" #include <string> #define LOG_INFO(LogmsgFormat,...)\ do\ {\ Logger &logger=Logger::instance();\ logger.setLogLevel(INFO);\ char buf[1024]={0};\ snprintf(buf,1024,LogmsgFormat,##__VA_ARGS__);\ logger.log(buf);\ }while(0) #define LOG_ERROR(LogmsgFormat,...)\ do\ {\ Logger &logger=Logger::instance();\ logger.setLogLevel(ERROR);\ char buf[1024]={0};\ snprintf(buf,1024,LogmsgFormat,##__VA_ARGS__);\ logger.log(buf);\ }while(0) #define LOG_FATAL(LogmsgFormat,...)\ do\ {\ Logger &logger=Logger::instance();\ logger.setLogLevel(FATAL);\ char buf[1024]={0};\ snprintf(buf,1024,LogmsgFormat,##__VA_ARGS__);\ logger.log(buf);\ exit(-1);\ }while(0) #ifdef MUDEBUG #define LOG_DEBUG(LogmsgFormat,...)\ do\ {\ Logger &logger=Logger::instance();\ logger.setLogLevel(DEBUG);\ char buf[1024]={0};\ snprintf(buf,1024,LogmsgFormat,##__VA_ARGS__);\ logger.log(buf);\ }while(0) #else #define LOG_DEBUG(LogmsgFormat,...) #endif enum LogLevel { INFO, ERROR, FATAL, DEBUG, }; class Logger :noncopyable{ public : static Logger& instance () ; void setLogLevel (int level) ; void log (std::string msg) ; private : int logLevel_; Logger (){} };
Logger.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #include "Logger.h" #include "Timestamp.h" #include <iostream> Logger& Logger::instance () { static Logger logger; return logger; } void Logger::setLogLevel (int level) { logLevel_=level; } void Logger::log (std::string msg) { switch (logLevel_) { case INFO: std::cout<<"[INFO]" ; break ; case ERROR: std::cout<<"[ERROR]" ; break ; case FATAL: std::cout<<"[FATAL]" ; break ; case DEBUG: std::cout<<"[DEBUG]" ; break ; default : break ; } std::cout << Timestamp::now ().toString () << " : " << msg <<std::endl; }
InetAddress类实现 InetAddress.h(socket地址类型)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 #pragma once #include <arpa/inet.h> #include <netinet/in.h> #include <string> class InetAddress { public : explicit InetAddress (uint16_t port,std::string ip="127.0.0.1" ) ; explicit InetAddress (const sockaddr_in &addr) :addr_(addr) { } std::string toIp () const ; std::string toIpPort () const ; uint16_t toPort () const ; const sockaddr_in* getSockAddr () const {return &addr_;} void setSockAddr (const sockaddr_in &addr) {addr_=addr;} private : sockaddr_in addr_; };
InetAddress.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 #include "InetAddress.h" #include <string.h> InetAddress::InetAddress (uint16_t port, std::string ip) { bzero (&addr_,sizeof (addr_)); addr_.sin_family=AF_INET; addr_.sin_port=htons (port); addr_.sin_addr.s_addr=inet_addr (ip.c_str ()); } std::string InetAddress::toIp () const { char buf[64 ]={0 }; inet_ntop (AF_INET,&addr_.sin_addr,buf,sizeof (buf)); return buf; } std::string InetAddress::toIpPort () const { char buf[64 ]={0 }; inet_ntop (AF_INET,&addr_.sin_addr,buf,sizeof (buf)); size_t end=strlen (buf); uint16_t port=ntohs (addr_.sin_port); snprintf (buf+end,sizeof (buf)-end,":%u" ,port); return buf; } uint16_t InetAddress::toPort () const { return ntohs (addr_.sin_port); }
Timestamp类实现 Timestamp.h(时间类)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #pragma once #include <iostream> #include <string> class Timestamp { public : Timestamp (); explicit Timestamp (int64_t microSecondsDinceEpoch) ; static Timestamp now () ; std::string toString () const ; private : int64_t microSecondsDinceEpoch_; };
Timestamp.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 #include "Timestamp.h" #include <time.h> Timestamp::Timestamp ():microSecondsDinceEpoch_ (0 ){} Timestamp::Timestamp (int64_t microSecondsDinceEpoch) :microSecondsDinceEpoch_ (microSecondsDinceEpoch) {} Timestamp Timestamp::now () { return Timestamp (time (NULL )); } std::string Timestamp::toString () const { char buf[128 ]={0 }; tm *tm_time=localtime (µSecondsDinceEpoch_); snprintf (buf,128 ,"%4d/%02d/%02d %02d:%02d:%02d" , tm_time->tm_year+1900 , tm_time->tm_mon+1 , tm_time->tm_mday, tm_time->tm_hour, tm_time->tm_min, tm_time->tm_sec); return buf; }
Channel类实现 Channel.h(某个文件描述符fd在事件循环中的代理对象)
Channel类主要封装了fd,events,revents,callbacks
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 #pragma once #include "noncopyable.h" #include "Timestamp.h" #include <functional> #include <memory> class EventLoop ;class Channel : noncopyable{ public : using EventCallback = std::function<void ()>; using ReadEventCallback = std::function<void (Timestamp)>; Channel (EventLoop *loop, int fd); ~Channel (); void handlerEvent (Timestamp receiveTime) ; void setReadCallback (ReadEventCallback cb) { readCallback_ = std::move (cb); } void setWriteCallback (EventCallback cb) { writeCallback_ = std::move (cb); } void setCloseCallback (EventCallback cb) { closeCallback_ = std::move (cb); } void setErrorCallback (EventCallback cb) { errorCallback_ = std::move (cb); } void tie (const std::shared_ptr<void > &) ; int fd () const { return fd_; } int events () const { return events_; } void set_revents (int revt) { revents_ = revt; } void enableReading () { events_ |= kReadEvent; update (); } void disableReading () { events_ &= ~kReadEvent; update (); } void enableWriting () { events_ |= kWriteEvent; update (); } void disableWriting () { events_ &= ~kWriteEvent; update (); } void disableAll () { events_ = kNoneEvent; update (); } bool isNoneEvent () const { return events_ == kNoneEvent; } bool isWriting () const { return events_ & kWriteEvent; } bool isReading () const { return events_ & kReadEvent; } int index () { return index_; } void set_index (int idx) { index_ = idx; } EventLoop *ownerLoop () { return loop_; } void remove () ; private : void update () ; void handleEventWithGuard (Timestamp receiveTime) ; static const int kNoneEvent; static const int kReadEvent; static const int kWriteEvent; EventLoop *loop_; const int fd_; int events_; int revents_; int index_; std::weak_ptr<void > tie_; bool tied_; ReadEventCallback readCallback_; EventCallback writeCallback_; EventCallback closeCallback_; EventCallback errorCallback_; };
Channel.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 #include "Channel.h" #include "EventLoop.h" #include "Logger.h" #include <sys/epoll.h> const int Channel::kNoneEvent = 0 ;const int Channel::kReadEvent = EPOLLIN | EPOLLPRI;const int Channel::kWriteEvent = EPOLLOUT;Channel::Channel (EventLoop *loop, int fd) : loop_ (loop), fd_ (fd), events_ (0 ), revents_ (0 ), index_ (-1 ), tied_ (false ) { } Channel::~Channel () { } void Channel::tie (const std::shared_ptr<void > &obj) { tie_ = obj; tied_ = true ; } void Channel::update () { loop_->updateChannel (this ) } void Channel::remove () { loop_->removeChannel (this ); } void Channel::handlerEvent (Timestamp receiveTime) { if (tied_) { std::shared_ptr<void > guard = tie_.lock (); if (guard) { handleEventWithGuard (receiveTime); } } else { handleEventWithGuard (receiveTime); } } void Channel::handleEventWithGuard (Timestamp receiveTime) { LOG_INFO ("Channel handlerEvent revents:%d" , revents_); if ((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN)) { if (closeCallback_) { closeCallback_ (); } } if (revents_ & EPOLLERR) { if (errorCallback_) { errorCallback_ (); } } if (revents_ & (EPOLLIN | EPOLLPRI)) { if (readCallback_) { readCallback_ (receiveTime); } } if (revents_ & EPOLLOUT) { if (writeCallback_) { writeCallback_ (); } } }
Poller类实现 Poller.h(IO 多路复用机制(epoll/poll/select)的封装)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #pragma once #include "noncopyable.h" #include "Timestamp.h" #include <vector> #include <unordered_map> class Channel ;class EventLoop ;class Poller : noncopyable{ public : using ChannelList = std::vector<Channel *>; Poller (EventLoop *loop); virtual ~Poller () = default ; virtual Timestamp poll (int timeoutMs, ChannelList *activeChannels) = 0 ; virtual void updateChannel (Channel *channel) = 0 ; virtual void removeChannel (Channel *channel) = 0 ; bool hasChannel (Channel *channel) const ; static Poller *newDefaultPoller (EventLoop *loop) ; protected : using ChannelMap = std::unordered_map<int , Channel *>; ChannelMap channels_; private : EventLoop *ownerLoop_; };
Poller.cc
1 2 3 4 5 6 7 8 9 10 11 12 #include "Poller.h" #include "Channel.h" Poller::Poller (EventLoop *loop) : ownerLoop_ (loop) { } bool Poller::hasChannel (Channel *channel) const { auto it = channels_.find (channel->fd ()); return it != channels_.end () && it->second == channel; }
DefaultPoller.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 #include "Poller.h" #include "EPollPoller.h" #include <stdlib.h> Poller *Poller::newDefaultPoller (EventLoop *loop) { if (getenv ("MUDUO_USE_POLL" )) { return nullptr ; } else { return new EPollPoller (loop); } }
EPollPoller类实现 EPollPoller.h(继承Poller,具体化epoll)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 #pragma once #include <vector> #include <sys/epoll.h> #include "Poller.h" class Channel ;class EPollPoller : public Poller{ public : EPollPoller (EventLoop *loop); ~EPollPoller () override ; Timestamp poll (int timeoutMs, ChannelList *activeChannels) override ; void updateChannel (Channel *channel) override ; void removeChannel (Channel *channel) override ; private : static const int kInitEventListSize = 16 ; void fillActiveChannels (int numEvents, ChannelList *activeChannels) const ; void update (int operation, Channel *channel) ; using EventList = std::vector<epoll_event>; int epollfd_; EventList events_; };
EPollPoller.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 #include "EPollPoller.h" #include "Logger.h" #include "errno.h" #include "Channel.h" #include <cstring> #include <unistd.h> const int kNew = -1 ; const int kAdded = 1 ;const int kDeleted = 2 ;EPollPoller::EPollPoller (EventLoop *loop) : Poller (loop), epollfd_ (epoll_create1 (EPOLL_CLOEXEC)), events_ (kInitEventListSize) { if (epollfd_ < 0 ) { LOG_FATAL ("epoll_create error:%d \n" , errno); } } EPollPoller::~EPollPoller () { close (epollfd_); } Timestamp EPollPoller::poll (int timeoutMs, ChannelList *activeChannels) { LOG_INFO ("func=%s -> fd total count:%lu\n" , __FUNCTION__, channels_.size ()); int numEvents = ::epoll_wait (epollfd_, &*events_.begin (), static_cast <int >(events_.size ()), timeoutMs); int savedError = errno; Timestamp now (Timestamp::now()) ; if (numEvents > 0 ) { LOG_INFO ("%d events happened\n" , numEvents); fillActiveChannels (numEvents, activeChannels); if (numEvents == events_.size ()) { events_.resize (events_.size () * 2 ); } } else if (numEvents == 0 ) { LOG_DEBUG ("%s timeout!\n" , __FUNCTION__); } else { if (savedError != EINTR) { errno = savedError; LOG_ERROR ("EPOLLPoller::poll() err!" ); } } return now; } void EPollPoller::updateChannel (Channel *channel) { const int index = channel->index (); LOG_INFO ("func=%s => fd=%d events=%d index=%d \n" , __FUNCTION__, channel->fd (), channel->events (), index); if (index == kNew || index == kDeleted) { if (index == kNew) { int fd = channel->fd (); channels_[fd] = channel; } channel->set_index (kAdded); update (EPOLL_CTL_ADD, channel); } else { int fd = channel->fd (); if (channel->isNoneEvent ()) { update (EPOLL_CTL_DEL, channel); channel->set_index (kDeleted); } else { update (EPOLL_CTL_MOD, channel); } } } void EPollPoller::removeChannel (Channel *channel) { int fd = channel->fd (); channels_.erase (fd); LOG_INFO ("func=%s => fd=%d \n" , __FUNCTION__, channel->fd ()); int index = channel->index (); if (index == kAdded) { update (EPOLL_CTL_DEL, channel); } channel->set_index (kNew); } void EPollPoller::fillActiveChannels (int numEvents, ChannelList *activeChannels) const { for (int i = 0 ; i < numEvents; ++i) { Channel *channel = static_cast <Channel *>(events_[i].data.ptr); channel->set_revents (events_[i].events); activeChannels->push_back (channel); } } void EPollPoller::update (int operation, Channel *channel) { epoll_event event; memset (&event, 0 , sizeof (event)); event.events = channel->events (); event.data.ptr = channel; int fd = channel->fd (); if (::epoll_ctl (epollfd_, operation, fd, &event) < 0 ) { LOG_ERROR ("epoll_ctl del error:%d\n" , errno); } else { LOG_FATAL ("epoll_ctl del error:%d\n" , errno); } }
获取线程tid CurrentThread.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 #pragma once namespace CurrentThread{ extern __thread int t_cachedTid; void cacheTid () ; inline int tid () { if (__builtin_expect(t_cachedTid==0 ,0 )) { cacheTid (); } return t_cachedTid; } }
CurrentThread.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #include "CurrentThread.h" #include <unistd.h> #include <sys/syscall.h> namespace CurrentThread{ __thread int t_cachedTid; void cacheTid () { if (t_cachedTid==0 ) { t_cachedTid=static_cast <pid_t >(::syscall (SYS_gettid)); } } }
EventLoop类实现 EventLoop.h(事件分发器)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 #pragma once #include <functional> #include <vector> #include <atomic> #include <memory> #include <mutex> #include "noncopyable.h" #include "Timestamp.h" #include "CurrentThread.h" class Channel ;class Poller ;class EventLoop : noncopyable{ public : using Functor = std::function<void ()>; EventLoop (); ~EventLoop (); void loop () ; void quit () ; Timestamp pollReturnTime () const { return pollReturnTime_; } void runInLoop (Functor cb) ; void queueInLoop (Functor cb) ; void wakeup () ; void updateChannel (Channel *channel) ; void removeChannel (Channel *channel) ; bool hasChannel (Channel *channel) ; bool isInLoopThread () const { return threadId_ == CurrentThread::tid (); } private : void handlerRead () ; void doPendingFunctors () ; using ChannelList = std::vector<Channel *>; std::atomic_bool looping_; std::atomic_bool quit_; const pid_t threadId_; Timestamp pollReturnTime_; std::unique_ptr<Poller> poller_; int wakeupFd_; std::unique_ptr<Channel> wakeupChannel_; ChannelList activeChannels_; std::atomic_bool callingPendingFunctors_; std::vector<Functor> PendingFunctors_; std::mutex mutex_; };
EventLoop.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 #include "EventLoop.h" #include "Logger.h" #include "Poller.h" #include "Channel.h" #include <sys/eventfd.h> #include <unistd.h> #include <fcntl.h> #include <errno.h> #include <memory> __thread EventLoop *t_loopThisThread = nullptr ; const int kPollTimeMs = 10000 ;int createEventfd () { int evtfd = ::eventfd (0 , EFD_NONBLOCK | EFD_CLOEXEC); if (evtfd < 0 ) { LOG_FATAL ("eventfd err:%d\n" , errno); } return evtfd; } EventLoop::EventLoop () : looping_ (false ), quit_ (false ), callingPendingFunctors_ (false ), threadId_ (CurrentThread::tid ()), poller_ (Poller::newDefaultPoller (this )), wakeupFd_ (createEventfd ()), wakeupChannel_ (new Channel (this , wakeupFd_)) { LOG_DEBUG ("EventLoop create %p in thread %d\n" , this , threadId_); if (t_loopThisThread) { LOG_FATAL ("Another EventLoop %p exists in this thread %d \n" , t_loopThisThread, threadId_); } else { t_loopThisThread = this ; } wakeupChannel_->setReadCallback (std::bind (&EventLoop::handlerRead, this )); wakeupChannel_->enableReading (); } EventLoop::~EventLoop () { wakeupChannel_->disableAll (); wakeupChannel_->remove (); ::close (wakeupFd_); t_loopThisThread = nullptr ; } void EventLoop::loop () { looping_ = true ; quit_ = false ; LOG_INFO ("EventLoop %p start looping\n" , this ); while (!quit_) { activeChannels_.clear (); pollReturnTime_ = poller_->poll (kPollTimeMs, &activeChannels_); for (Channel *channel : activeChannels_) { channel->handlerEvent (pollReturnTime_); } doPendingFunctors (); } LOG_INFO ("EventLoop %p stop looping" , this ); looping_ = false ; } void EventLoop::quit () { quit_ = true ; if (!isInLoopThread ()) { wakeup (); } } void EventLoop::runInLoop (Functor cb) { if (isInLoopThread ()) { cb (); } else { queueInLoop (cb); } } void EventLoop::queueInLoop (Functor cb) { { std::unique_lock<std::mutex> lokc (mutex_) ; PendingFunctors_.emplace_back (std::move (cb)); } if (!isInLoopThread () || callingPendingFunctors_) { wakeup (); } } void EventLoop::handlerRead () { uint64_t one = 1 ; ssize_t n = read (wakeupFd_, &one, sizeof (one)); if (n != sizeof (one)) { LOG_ERROR ("EventLoop::handleRead() reads %ld bytes instead of 8" , n); } } void EventLoop::wakeup () { uint64_t one = 1 ; ssize_t n = write (wakeupFd_, &one, sizeof (one)); if (n != sizeof (one)) { LOG_ERROR ("EventLoop::wakeup() writes %lu bytes instead of 8" , n); } } void EventLoop::updateChannel (Channel *channel) { poller_->updateChannel (channel); } void EventLoop::removeChannel (Channel *channel) { poller_->removeChannel (channel); } bool EventLoop::hasChannel (Channel *channel) { return poller_->hasChannel (channel); } void EventLoop::doPendingFunctors () { std::vector<Functor> functors; callingPendingFunctors_ = true ; { std::unique_lock<std::mutex> lock (mutex_) ; functors.swap (PendingFunctors_); } for (const Functor &functor : functors) { functor (); } callingPendingFunctors_ = false ; }
Thread类实现 Thread.h(对底层pthread的简单封装)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 #pragma once #include "noncopyable.h" #include <functional> #include <thread> #include <memory> #include <unistd.h> #include <string> #include <atomic> class Thread : noncopyable{ public : using ThreadFunc = std::function<void ()>; explicit Thread (ThreadFunc, const std::string &name = std::string()) ; ~Thread (); void start () ; void join () ; bool started () const { return started_; } pid_t tid () const { return tid_; } const std::string &name () const { return name_; } static int numCreated () { return numCreated_; } private : void setDefaultName () ; bool started_; bool joined_; std::shared_ptr<std::thread> thread_; pid_t tid_; ThreadFunc func_; std::string name_; static std::atomic_int numCreated_; };
Thread.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 #include "Thread.h" #include "CurrentThread.h" #include <semaphore.h> std::atomic_int Thread::numCreated_ (0 ) ;Thread::Thread (ThreadFunc func, const std::string &name) :started_ (false ) ,joined_ (false ) ,tid_ (0 ) ,func_ (std::move (func)) ,name_ (name) { setDefaultName (); } Thread::~Thread () { if (started_&&!joined_) { thread_->detach (); } } void Thread::start () { started_=true ; sem_t sem; sem_init (&sem,false ,0 ); thread_=std::shared_ptr <std::thread>(new std::thread ([&](){ tid_=CurrentThread::tid (); sem_post (&sem); func_ (); })); sem_wait (&sem); } void Thread::join () { joined_=true ; thread_->join (); } void Thread::setDefaultName () { int num=++numCreated_; if (name_.empty ()) { char buf[32 ]={0 }; snprintf (buf,sizeof (buf),"Thread%d" ,num); name_=buf; } }
EventLoopThread类实现 EventLoopThread.h(加入了EventLoop)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 #pragma once #include <functional> #include <mutex> #include <condition_variable> #include <string> #include "noncopyable.h" #include "Thread.h" class EventLoop ;class EventLoopThread : noncopyable{ public : using ThreadInitCallback = std::function<void (EventLoop *)>; EventLoopThread (const ThreadInitCallback &cb = ThreadInitCallback (), const std::string &name = std::string ()); ~EventLoopThread (); EventLoop *startLoop () ; private : void threadFunc () ; EventLoop *loop_; bool exiting_; Thread thread_; std::mutex mutex_; std::condition_variable cond_; ThreadInitCallback callback_; };
EventLoopThread.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 #include "EventLoopThread.h" #include "EventLoop.h" #include <memory> EventLoopThread::EventLoopThread (const ThreadInitCallback &cb, const std::string &name) : loop_ (nullptr ), exiting_ (false ), thread_ (std::bind (&EventLoopThread::threadFunc, this ), name), mutex_ (), cond_ (), callback_ (cb) { } EventLoopThread::~EventLoopThread () { exiting_ = true ; if (loop_ != nullptr ) { loop_->quit (); thread_.join (); } } EventLoop *EventLoopThread::startLoop () { thread_.start (); EventLoop *loop = nullptr ; { std::unique_lock<std::mutex> lock (mutex_) ; while (loop_ == nullptr ) { cond_.wait (lock); } loop = loop_; } return loop; } void EventLoopThread::threadFunc () { EventLoop loop; if (callback_) { callback_ (&loop); } { std::unique_lock<std::mutex> lock (mutex_) ; loop_ = &loop; cond_.notify_one (); } loop.loop (); std::unique_lock<std::mutex> lock (mutex_) ; loop_ = nullptr ; }
EventLoopThreadPool类实现 EventLoopThreadPool.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 #pragma once #include <functional> #include <string> #include <vector> #include <memory> #include "noncopyable.h" class EventLoop ;class EventLoopThread ;class EventLoopThreadPool : noncopyable{ public : using ThreadInitCallback = std::function<void (EventLoop *)>; EventLoopThreadPool (EventLoop *baseLoop, const std::string &nameArg); ~EventLoopThreadPool (); void setThreadNum (int numThreads) { numThreads_ = numThreads; } void start (const ThreadInitCallback &cb = ThreadInitCallback()) ; EventLoop *getNextLoop () ; std::vector<EventLoop*> getAllLoops () ; bool started () const {return started_;} const std::string& name () const {return name_;} private : EventLoop *baseLoop_; std::string name_; bool started_; int numThreads_; int next_; std::vector<std::unique_ptr<EventLoopThread>> threads_; std::vector<EventLoop *> loops_; };
EventLoopThreadPool.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 #include "EventLoopThreadPool.h" #include "EventLoopThread.h" #include <memory> EventLoopThreadPool::EventLoopThreadPool (EventLoop *baseLoop, const std::string &nameArg) : baseLoop_ (baseLoop), name_ (nameArg), started_ (false ), numThreads_ (0 ), next_ (0 ) { } EventLoopThreadPool::~EventLoopThreadPool () { } void EventLoopThreadPool::start (const ThreadInitCallback &cb) { started_ = true ; for (int i = 0 ; i < numThreads_; ++i) { char buf[name_.size () + 32 ]; snprintf (buf, sizeof (buf), "%s%d" , name_.c_str (), i); EventLoopThread *t = new EventLoopThread (cb, buf); threads_.push_back (std::unique_ptr <EventLoopThread>(t)); loops_.push_back (t->startLoop ()); } if (numThreads_ == 0 && cb) { cb (baseLoop_); } } EventLoop *EventLoopThreadPool::getNextLoop () { EventLoop *loop = baseLoop_; if (!loops_.empty ()) { loop = loops_[next_]; ++next_; if (next_ >= loops_.size ()) { next_ = 0 ; } } return loop; } std::vector<EventLoop *> EventLoopThreadPool::getAllLoops () { if (loops_.empty ()) { return std::vector <EventLoop *>(1 , baseLoop_); } else { return loops_; } }
Socket类实现 Socket.h(封装底层socket)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 #pragma once #include "noncopyable.h" class InetAddress ;class Socket : noncopyable{ public : explicit Socket (int sockfd) : sockfd_(sockfd) { } ~Socket (); int fd () const { return sockfd_; } void bindAddress (const InetAddress &localaddr) ; void listen () ; int accept (InetAddress *peeraddr) ; void shutdownWrite () ; void setTcpNoDelay (bool on) ; void setReuseAddr (bool on) ; void setReusePort (bool on) ; void setKeepAlive (bool on) ; private : const int sockfd_; };
Socket.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 #include "Socket.h" #include "Logger.h" #include "InetAddress.h" #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <string.h> #include <netinet/tcp.h> Socket::~Socket () { ::close (sockfd_); } void Socket::bindAddress (const InetAddress &localaddr) { if (0 !=::bind (sockfd_,(sockaddr *)localaddr.getSockAddr (),sizeof (sockaddr_in))) { LOG_FATAL ("bind sockfd:%d fail \n" ,sockfd_); } } void Socket::listen () { if (0 !=::listen (sockfd_,1024 )) { LOG_FATAL ("listen sockfd:%d fail \n" ,sockfd_); } } int Socket::accept (InetAddress *peeraddr) { sockaddr_in addr; socklen_t len=sizeof (addr); bzero (&addr,sizeof (addr)); int connfd=::accept4 (sockfd_,(sockaddr *)&addr,&len,SOCK_NONBLOCK|SOCK_CLOEXEC); if (connfd>=0 ) { peeraddr->setSockAddr (addr); } return connfd; } void Socket::shutdownWrite () { if (::shutdown (sockfd_,SHUT_RDWR)<0 ) { LOG_ERROR ("sockets::shutdownWrite error" ); } } void Socket::setTcpNoDelay (bool on) { int optval=on ? 1 :0 ; ::setsockopt (sockfd_,IPPROTO_TCP,TCP_NODELAY,&optval,sizeof (optval)); } void Socket::setReuseAddr (bool on) { int optval=on ? 1 :0 ; ::setsockopt (sockfd_,SOL_SOCKET,SO_REUSEADDR,&optval,sizeof (optval)); } void Socket::setReusePort (bool on) { int optval=on ? 1 :0 ; ::setsockopt (sockfd_,IPPROTO_TCP,SO_REUSEPORT,&optval,sizeof (optval)); } void Socket::setKeepAlive (bool on) { int optval=on ? 1 :0 ; ::setsockopt (sockfd_,IPPROTO_TCP,SO_KEEPALIVE,&optval,sizeof (optval)); }
Acceptor类实现 Acceptor.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 #pragma once #include "noncopyable.h" #include "Socket.h" #include "Channel.h" #include <functional> class EventLoop ;class InetAddress ;class Acceptor : noncopyable{ public : using newConnectionCallback = std::function<void (int sockfd, const InetAddress &)>; Acceptor (EventLoop *loop, const InetAddress &listenAddr, bool reuseport); ~Acceptor (); void setNewConnectionCallback (const newConnectionCallback &cb) { newConnectionCallback_ = cb; } bool listenning () const { return listenning_; } void listen () ; private : void handleRead () ; EventLoop *loop_; Socket acceptSocket_; Channel acceptChannel_; newConnectionCallback newConnectionCallback_; bool listenning_; };
Acceptor.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 #include "Acceptor.h" #include "Logger.h" #include "InetAddress.h" #include <sys/types.h> #include <sys/socket.h> #include <unistd.h> static int createNonblocking () { int sockfd = ::socket (AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0 ); if (sockfd < 0 ) { LOG_FATAL ("%s:%s:%d listen socket create err:%d \n" , __FILE__, __FUNCTION__, __LINE__, errno); } return sockfd; } Acceptor::Acceptor (EventLoop *loop, const InetAddress &listenAddr, bool reuseport) : loop_ (loop), acceptSocket_ (createNonblocking ()) ,acceptChannel_ (loop, acceptSocket_.fd ()), listenning_ (false ) { acceptSocket_.setReuseAddr (true ); acceptSocket_.setReusePort (true ); acceptSocket_.bindAddress (listenAddr); acceptChannel_.setReadCallback (std::bind (&Acceptor::handleRead, this )); } Acceptor::~Acceptor () { acceptChannel_.disableAll (); acceptChannel_.remove (); } void Acceptor::listen () { listenning_ = true ; acceptSocket_.listen (); acceptChannel_.enableReading (); } void Acceptor::handleRead () { InetAddress peerAddr; int connfd = acceptSocket_.accept (&peerAddr); if (connfd >= 0 ) { if (newConnectionCallback_) { newConnectionCallback_ (connfd, peerAddr); } else { ::close (connfd); } } else { LOG_ERROR ("%s:%s:%d accept err:%d \n" , __FILE__, __FUNCTION__, __LINE__, errno); if (errno == EMFILE) { LOG_ERROR ("%s:%s:%d sockfd reached limit! \n" , __FILE__, __FUNCTION__, __LINE__); } } }
Buffer类实现 Buffer.h(网络库底层的缓冲区类型定义)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 #pragma once #include <vector> #include <string> #include <algorithm> class Buffer { public : static const size_t kCheapPrepend = 8 ; static const size_t kInitialSize = 1024 ; explicit Buffer (size_t initialSize = kInitialSize) : buffer_(kCheapPrepend + initialSize), readerIndex_(kCheapPrepend), writerIndex_(kCheapPrepend) { } size_t readableBytes () const { return writerIndex_ - readerIndex_; } size_t writableBytes () const { return buffer_.size () - writerIndex_; } size_t prependableBytes () const { return readerIndex_; } const char *peek () const { return begin () + readerIndex_; } void retrieve (size_t len) { if (len < readableBytes ()) { readerIndex_ += len; } else { retrieveAll (); } } void retrieveAll () { readerIndex_ = writerIndex_ = kCheapPrepend; } std::string retrieveAllAsString () { return retrieveAsString (readableBytes ()); } std::string retrieveAsString (size_t len) { std::string result (peek(), len) ; retrieve (len); return result; } void ensureWriteableBytes (size_t len) { if (writableBytes () < len) { makeSpace (len); } } void append (const char *data, size_t len) { ensureWriteableBytes (len); std::copy (data, data + len, beginWrite ()); writerIndex_ += len; } char *beginWrite () { return begin () + writerIndex_; } const char *beginWrite () const { return begin () + writerIndex_; } ssize_t readFd (int fd, int *saveErrno) ; ssize_t writeFd (int fd,int *saveErrno) ; private : char *begin () { return &*buffer_.begin (); } const char *begin () const { return &*buffer_.begin (); } void makeSpace (size_t len) { if (writableBytes () + prependableBytes () < len + kCheapPrepend) { buffer_.resize (writerIndex_ + len); } else { size_t readable = readableBytes (); std::copy (begin () + readerIndex_, begin () + writerIndex_, begin () + kCheapPrepend); readerIndex_ = kCheapPrepend; writerIndex_ = readerIndex_ + readable; } } std::vector<char > buffer_; size_t readerIndex_; size_t writerIndex_; };
Buffer.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 #include "Buffer.h" #include <errno.h> #include <unistd.h> #include <sys/uio.h> ssize_t Buffer::readFd (int fd, int *saveErrno) { char extrabuf[65536 ] = {0 }; struct iovec vec[2 ]; const size_t writable = writableBytes (); vec[0 ].iov_base = begin () + writerIndex_; vec[0 ].iov_len = writable; vec[1 ].iov_base = extrabuf; vec[1 ].iov_len = sizeof (extrabuf); const int iovcnt = (writable < sizeof (extrabuf)) ? 2 : 1 ; const ssize_t n = ::readv (fd, vec, iovcnt); if (n < 0 ) { *saveErrno = errno; } else if (n <= writable) { writerIndex_ += n; } else { writerIndex_ = buffer_.size (); append (extrabuf, n - writable); } return n; } ssize_t Buffer::writeFd (int fd,int *saveErrno) { ssize_t n=::write (fd,peek (),readableBytes ()); if (n<0 ) { *saveErrno=errno; } return n; }
Callbacks.h(常用的回调类型) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #pragma once #include <memory> #include <functional> class Buffer ;class TcpConnection ;class Timestamp ;using TcpConnectionPtr=std::shared_ptr<TcpConnection>;using ConnectionCallback=std::function<void (const TcpConnectionPtr&)>;using CloseCallback=std::function<void (const TcpConnectionPtr&)>;using WriteCompleteCallback=std::function<void (const TcpConnectionPtr&)>;using MessageCallback=std::function<void (const TcpConnectionPtr&,Buffer*,Timestamp)>;using HignWaterMarkCallback=std::function<void (const TcpConnectionPtr&,size_t )>;
TcpConnection类实现 TcpConnection.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 #pragma once #include "noncopyable.h" #include "InetAddress.h" #include "Callbacks.h" #include "Buffer.h" #include "Timestamp.h" #include <memory> #include <string> #include <atomic> class Channel ;class EventLoop ;class Socket ;class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>{ public : TcpConnection (EventLoop *loop, const std::string &name, int sockfd, const InetAddress &localAddr, const InetAddress &peerAddr); ~TcpConnection (); EventLoop *getLoop () const { return loop_; } const std::string &name () const { return name_; } const InetAddress &localAddress () const { return localAddr_; } const InetAddress &peerAddress () const { return peerAddr_; } bool connected () const { return state_ == kConnected; } void send (const std::string &buf) ; void shutdown () ; void setConnectionCallback (const ConnectionCallback &cb) { connectionCallback_ = cb; } void setMessageCallback (const MessageCallback &cb) { messageCallback_ = cb; } void setWriteCompleteCallback (const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; } void setHighWaterMarkCallback (const HignWaterMarkCallback &cb, size_t highWaterMark) { hignWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; } void setCloseCallback (const CloseCallback &cb) { closeCallback_ = cb; } void connectEstablished () ; void connectedDestroyed () ; private : enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting }; void setState (StateE state) {state_=state;} void handleRead (Timestamp receiveTime) ; void handleWrite () ; void handleClose () ; void handleError () ; void sendInLoop (const void *message, size_t len) ; void shutdownInLoop () ; EventLoop *loop_; const std::string name_; std::atomic_int state_; bool reading_; std::unique_ptr<Socket> socket_; std::unique_ptr<Channel> channel_; const InetAddress localAddr_; const InetAddress peerAddr_; ConnectionCallback connectionCallback_; MessageCallback messageCallback_; WriteCompleteCallback writeCompleteCallback_; HignWaterMarkCallback hignWaterMarkCallback_; CloseCallback closeCallback_; size_t highWaterMark_; Buffer inputBuffer_; Buffer outputBuffer_; };
TcpConnection.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 #include "TcpConnection.h" #include "Logger.h" #include "Socket.h" #include "Channel.h" #include "Timestamp.h" #include "EventLoop.h" #include <functional> #include <unistd.h> #include <errno.h> #include <sys/types.h> #include <sys/socket.h> #include <string.h> #include <netinet/tcp.h> static EventLoop *CheckLoopNotNull (EventLoop *loop) { if (loop == nullptr ) { LOG_FATAL ("%s:%s:%d TcpConnection Loop is null! \n" , __FILE__, __FUNCTION__, __LINE__); } return loop; } TcpConnection::TcpConnection (EventLoop *loop, const std::string &nameArg, int sockfd, const InetAddress &localAddr, const InetAddress &peerAddr) : loop_ (CheckLoopNotNull (loop)) , name_ (nameArg), state_ (kConnecting) , reading_ (true ) , socket_ (new Socket (sockfd)) , channel_ (new Channel (loop, sockfd)) , localAddr_ (localAddr) , peerAddr_ (peerAddr) , highWaterMark_ (64 * 1024 * 1024 ) { channel_->setReadCallback ( std::bind (&TcpConnection::handleRead, this , std::placeholders::_1)); channel_->setWriteCallback ( std::bind (&TcpConnection::handleWrite, this )); channel_->setCloseCallback ( std::bind (&TcpConnection::handleClose, this )); channel_->setErrorCallback ( std::bind (&TcpConnection::handleError, this )); LOG_INFO ("TcpConnection::ctor[%s] at fd=%d\n" , name_.c_str (), sockfd); socket_->setKeepAlive (true ); } TcpConnection::~TcpConnection () { LOG_INFO ("TcpConnection::dtor[%s] at fd=%d state=%d\n" , name_.c_str (), channel_->fd (), (int )state_); } void TcpConnection::send (const std::string &buf) { if (state_==kConnected) { if (loop_->isInLoopThread ()) { sendInLoop (buf.c_str (),buf.size ()); } else { loop_->runInLoop ( std::bind (&TcpConnection::sendInLoop,this ,buf.c_str (),buf.size ()) ); } } } void TcpConnection::sendInLoop (const void *data, size_t len) { ssize_t nwrote=0 ; ssize_t remaining=len; bool faultError=false ; if (state_==kDisconnected) { LOG_ERROR ("disconnected,give up writing" ); return ; } if (!channel_->isWriting ()&&outputBuffer_.readableBytes ()==0 ) { nwrote=::write (channel_->fd (),data,len); if (nwrote>=0 ) { remaining=len-nwrote; if (remaining==0 &&writeCompleteCallback_) { loop_->queueInLoop ( std::bind (writeCompleteCallback_,shared_from_this ()) ); } } else { nwrote=0 ; if (errno!=EWOULDBLOCK) { LOG_ERROR ("TcpConnection::sendInLoop" ); if (errno==EPIPE||errno==ECONNRESET) { faultError=true ; } } } } if (!faultError&&remaining>0 ) { size_t oldLen=outputBuffer_.readableBytes (); if (oldLen+remaining>=highWaterMark_&&oldLen<highWaterMark_&&hignWaterMarkCallback_) { loop_->queueInLoop ( std::bind (hignWaterMarkCallback_,shared_from_this (),oldLen+remaining) ); } outputBuffer_.append ((char *)data+nwrote,remaining); if (!channel_->isWriting ()) { channel_->enableWriting (); } } } void TcpConnection::shutdown () { if (state_==kConnected) { setState (kDisconnecting); loop_->runInLoop ( std::bind (&TcpConnection::shutdownInLoop,this ) ); } } void TcpConnection::shutdownInLoop () { if (!channel_->isWriting ()) { socket_->shutdownWrite (); } } void TcpConnection::connectEstablished () { setState (kConnected); channel_->tie (shared_from_this ()); channel_->enableReading (); connectionCallback_ (shared_from_this ()); } void TcpConnection::connectedDestroyed () { if (state_==kConnected) { setState (kDisconnected); channel_->disableAll (); connectionCallback_ (shared_from_this ()); } channel_->remove (); } void TcpConnection::handleRead (Timestamp receiveTime) { int saveErrno=0 ; ssize_t n=inputBuffer_.readFd (channel_->fd (),&saveErrno); if (n>0 ) { messageCallback_ (shared_from_this (),&inputBuffer_,receiveTime); } else if (n==0 ) { handleClose (); } else { errno=saveErrno; LOG_ERROR ("TcpConnection::handleRead" ); handleError (); } } void TcpConnection::handleWrite () { if (channel_->isWriting ()) { int saveErrno=0 ; ssize_t n=outputBuffer_.writeFd (channel_->fd (),&saveErrno); if (n>0 ) { outputBuffer_.retrieve (n); if (outputBuffer_.readableBytes ()==0 ) { channel_->disableWriting (); if (writeCompleteCallback_) { loop_->queueInLoop ( std::bind (writeCompleteCallback_,shared_from_this ()) ); } if (state_==kDisconnecting) { shutdownInLoop (); } } } else { LOG_ERROR ("TcpConnection::handleWrite" ); } } else { LOG_ERROR ("TcpConnection fd=%d is down,no more writing \n" ,channel_->fd ()); } } void TcpConnection::handleClose () { LOG_INFO ("TcpConnection::handleClose fd=%d state=%d \n" ,channel_->fd (),(int )state_); setState (kDisconnected); channel_->disableAll (); TcpConnectionPtr connptr (shared_from_this()) ; connectionCallback_ (connptr); closeCallback_ (connptr); } void TcpConnection::handleError () { int optval; socklen_t optlen=sizeof (optval); int err=0 ; if (::getsockopt (channel_->fd (),SOL_SOCKET,SO_ERROR,&optval,&optlen)<0 ) { err=errno; } else { err=optval; } LOG_ERROR ("TcpConnection::handleError name:%s - SO_ERROR:%d\n" ,name_.c_str (),err); }
TcpServer类实现 TcpServer.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 #pragma once #include "EventLoop.h" #include "Acceptor.h" #include "InetAddress.h" #include "noncopyable.h" #include "EventLoopThreadPool.h" #include "Callbacks.h" #include "TcpConnection.h" #include "Buffer.h" #include <functional> #include <string> #include <memory> #include <atomic> #include <unordered_map> class TcpServer : noncopyable{ public : using ThreadInitCallback = std::function<void (EventLoop *)>; enum Option { kNoReusePort, kReusePort, }; TcpServer (EventLoop *loop, const InetAddress &listenAddr, const std::string &nameArg,Option option = kNoReusePort); ~TcpServer (); void setThreadInitCallback (const ThreadInitCallback &cb) { threadInitCallback_ = cb; } void setConnectionCallback (const ConnectionCallback &cb) { connectionCallback_ = cb; } void setMessageCallback (const MessageCallback &cb) { messageCallback_ = cb; } void setWriteCompleteCallback (const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; } void setThreadNum (int numThreads) ; void start () ; private : void newConnection (int sockfd,const InetAddress &peerAddr) ; void removeConnection (const TcpConnectionPtr&conn) ; void removeConnectionInLoop (const TcpConnectionPtr &conn) ; using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>; EventLoop *loop_; const std::string ipPort_; const std::string name_; std::unique_ptr<Acceptor> acceptor_; std::shared_ptr<EventLoopThreadPool> threadPool_; ConnectionCallback connectionCallback_; MessageCallback messageCallback_; WriteCompleteCallback writeCompleteCallback_; ThreadInitCallback threadInitCallback_; std::atomic_int started_; int nextConnId_; ConnectionMap connections_; };
TcpServer.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 #include "TcpServer.h" #include "Logger.h" #include "TcpConnection.h" #include <functional> #include <strings.h> EventLoop *CheckLoopNotNull (EventLoop *loop) { if (loop == nullptr ) { LOG_FATAL ("%s:%s:%d mainLoop is null! \n" , __FILE__, __FUNCTION__, __LINE__); } return loop; } TcpServer::TcpServer (EventLoop *loop, const InetAddress &listenAddr, const std::string &nameArg, Option option) : loop_ (CheckLoopNotNull (loop)) , ipPort_ (listenAddr.toIpPort ()) , name_ (nameArg) , acceptor_ (new Acceptor (loop, listenAddr, option == kReusePort)) , threadPool_ (new EventLoopThreadPool (loop, name_)) , connectionCallback_ () , messageCallback_ () , nextConnId_ (1 ) , started_ (0 ) { acceptor_->setNewConnectionCallback (std::bind (&TcpServer::newConnection, this , std::placeholders::_1, std::placeholders::_2)); } TcpServer::~TcpServer () { for (auto &item:connections_) { TcpConnectionPtr conn (item.second) ; item.second.reset (); conn->getLoop ()->runInLoop ( std::bind (&TcpConnection::connectedDestroyed,conn) ); } } void TcpServer::setThreadNum (int numThreads) { threadPool_->setThreadNum (numThreads); } void TcpServer::start () { if (started_++==0 ) { threadPool_->start (threadInitCallback_); loop_->runInLoop (std::bind (&Acceptor::listen,acceptor_.get ())); } } void TcpServer::newConnection (int sockfd,const InetAddress &peerAddr) { EventLoop *ioLoop=threadPool_->getNextLoop (); char buf[64 ]={0 }; snprintf (buf,sizeof (buf),"-%s#%d" ,ipPort_.c_str (),nextConnId_++); std::string connName=name_+buf; LOG_INFO ("TcpServer::newConnection [%s] - new connection [%s] from %s \n" ,name_.c_str (),connName.c_str (),peerAddr.toIpPort ().c_str ()); sockaddr_in local; ::bzero (&local,sizeof (local)); socklen_t addrlen=sizeof (local); if (::getsockname (sockfd,(sockaddr*)&local,&addrlen)<0 ) { LOG_ERROR ("sockets::getLocalAddr" ); } InetAddress localAddr (local) ; TcpConnectionPtr conn (new TcpConnection( ioLoop, connName, sockfd, localAddr, peerAddr )) ; connections_[connName]=conn; conn->setConnectionCallback (connectionCallback_); conn->setMessageCallback (messageCallback_); conn->setWriteCompleteCallback (writeCompleteCallback_); conn->setCloseCallback ( std::bind (&TcpServer::removeConnection,this ,std::placeholders::_1) ); ioLoop->runInLoop ( std::bind (&TcpConnection::connectEstablished,conn) ); } void TcpServer::removeConnection (const TcpConnectionPtr&conn) { loop_->runInLoop ( std::bind (&TcpServer::removeConnectionInLoop,this ,conn) ); } void TcpServer::removeConnectionInLoop (const TcpConnectionPtr &conn) { LOG_INFO ("TcpServer::removeConnectionInLoop [%s] - connection [%s] \n" ,name_.c_str (),conn->name ().c_str ()); size_t n=connections_.erase (conn->name ()); EventLoop *ioLoop=conn->getLoop (); ioLoop->queueInLoop ( std::bind (&TcpConnection::connectedDestroyed,conn) ); }
各种类的主要内容 1.Channel类:
fd,events,revents,callbacks listenfd->acceptorChannel connfd->connectionChannel
2.Poller和EPollPoller类:
std::unordered_map<int, Channel *>channels_
3.EventLoop类:
int wakeupFd_;std::unique_ptr wakeupChannel_;
ChannelList activeChannels_;
std::unique_ptr poller_;
4.EventLoopThreadPool类:
getNextLoop():通过轮询算法获取下一个subloop
一个thread对应一个loop=one loop per thread
5.Acceptor类:
主要封装了listenfd相关的操作 socket bind listen baseLoop
6.Buffer类:
缓冲区 应用写数据->缓冲区->Tcp发送缓冲区->send
7.TcpConnection类:
一个连接成功的客户端对应一个TcpConnection 封装了Channel和Sokcet和各种回调 发送和接收缓冲区
8.TcpServer类
Acceptor,EventLoopThreadPool,ConnectionMap connections_;