reactor模式的echo服务器
在上一节中,我们通过poll同时监听多个文件描述符,实现了在一个线程中并发的提供echo服务。
调用poll之前,需要有一个struct pollfd
类型的数组,记录了需要监听的文件描述符和读写事件。
数组中没有使用的元素需要将结构体struct pollfd
中的字段fd置为-1,这样在调用poll的时候操作系统就会忽略它。
上一节中的echo例程还不能算是一个reactor模式的服务器,因为我们还是在一个while循环中主动的查询文件描述符,然后读取数据并转发出去。使用reactor模式时, 我们需要转换一下思想,就是有一个监听器一直在帮我们关注着感兴趣的事件,当有事发生时通知我们,我们再处理它。也就是说我们需要一个对象来封装poll调用, 根据文件描述符和事件的不同调用对应的回调函数,同时需要提供注册回调函数的接口。
本文,我们将poll封装到一个PollLoop的类中,并为每一个文件描述符提供一个PollEventHandler,用于分发该文件描述符上发生的事件。PollLoop和PollEventHandler实现了我们的reactor模式, 针对TCP服务器的特性,我们又在PollEventHandler的基础上构建了Acceptor用于监听服务端口,Connection用于响应各个TCP连接,并基于此构建类型TcpServer。 最后,我们再次重构例程,实现reactor模式的echo服务器。
1. 事件监听器PollLoop
事件监听器PollLoop的使命就是帮我们调用poll,当poll返回后,就通过事件分发器PollEventHandler通知持有对应文件描述符的对象。 下面是定义在PollLoop.h中的类型PollLoop。
class PollLoop {
public:
void LoopOnce(int timeout);
private:
int Register(PollEventHandlerPtr const & handler);
void UnRegister(PollEventHandlerPtr const & handler);
friend void ApplyHandlerOnLoop(PollEventHandlerPtr const & h, PollLoopPtr const & loop);
friend void UnApplyHandlerOnLoop(PollEventHandlerPtr const & h, PollLoopPtr const & loop);
private:
std::vector<int> mIdleIdx;
std::vector<struct pollfd> mPollFdList;
std::vector<PollEventHandlerPtr> mHandlerList;
};
1.1 事件循环
它有三个私有的成员变量。其中mIdleIdx与事件分发器的注册和注销有关,我们将放到下一小节中介绍。
mPollFdList是一个关于struct pollfd
的容器,作为服务器,我们并不知道有多少客户端对我们的服务感兴趣,
所以像上一节那样,定义一个固定长度的一维数组并不是一个灵活性很高的选择,
选用vector我们就可以比较灵活的增加监听文件描述符了。
c++的std标准库中,vector容器是一种连续存储的线性表,
我们可以通过对首个元素取址来获取数组的首地址,也可以通过c++11标准中的接口data()获得。
mHandlerList则是一个与mPollFdList一一对应的容器,它保存了各个文件描述符的事件分发器,这里的PollEventHandlerPtr是一个智能指针std::shared_ptr。之所以选用智能指针, 是为了以后构建线程安全的数据类型做准备的,有了它我们就可以节省很多管理对象生命周期的精力。因为std::shared_ptr是一种强引用关系,只要还有一个指向某对象的shared_ptr指针存在, 该对象就一定不会被释放,只有最后一个shared_ptr释放了对象之后,才会销毁它。这样我们基本上不用担心野指针和重复释放的问题。
成员函数LoopOnce用于完成一次poll调用和事件分发,使用时需要把它放到一个循环体中。如下面的代码片段所示,它有一个超时的输入参数,单位是ms。 我们在第2行中调用poll函数,通过data()接口获取mPollFdList的首地址。程序在执行了这个语句后就会阻塞,有当监听的事件发生了,或者超时(timeout)了,才会返回。
void PollLoop::LoopOnce(int timeout) {
int nready = poll(mPollFdList.data(), mPollFdList.size(), timeout);
std::cout << "nready = " << nready << std::endl;
for (int i = 0; i < mPollFdList.size(); i++) {
if (mPollFdList[i].fd < 0) {
mHandlerList[i] = NULL;
continue;
}
mHandlerList[i]->HandleEvents(mPollFdList[i]);
}
}
当poll返回了,我们就遍历一下mPollFdList,调用事件分发器的HandleEvents接口来看看有哪些文件描述符在搞事。后面我们会看到该接口将把事件具体的分为读事件、写事件、错误事件等不同的类型, 并调用相应的回调函数来处理。如果某个对象的字段fd为负数,我们就跳过该对象。
1.2 事件分发器的注册与注销
对于TCP服务器而言,连接的建立和关闭是常有的事情。由于我们选用的是vector容器,如果每次关闭连接我们都从容器mPollFdList和mHandlerList中将响应的对象移除的话, 会产生很多额外的数据拷贝操作,比较影响效率。而且有新的连接建立时,还得再重新分配内存,想想都很麻烦。如果能够重复利用容器的成员,将节省很多资源。
容器mIdleIdx就是用来记录mPollFdList和mHandlerList中已注销对象的索引,这样当有新的事件分发器注册的时候,我们就可以直接先从注销记录中查找可用的资源,没有的话再扩容。 如下面的代码片段所示,我们在注册函数Register中,先检查容器mIdleIdx是否为空。空则mPollFdList和mHandlerList已满,直接扩容。否则找一个可用的资源填充之。
int PollLoop::Register(PollEventHandlerPtr const & handler) {
if (mIdleIdx.empty()) {
mPollFdList.push_back(handler->GetPollFd());
mHandlerList.push_back(handler);
return (mHandlerList.size() - 1);
} else {
int idx = mIdleIdx.back();
std::cout << "register idx:" << idx << std::endl;
mIdleIdx.pop_back();
mPollFdList[idx] = handler->GetPollFd();
mHandlerList[idx] = handler;
return idx;
}
}
如下代码所示,我们通过消息分发器的GetLoopIdx获取消息分发器在PollLoop中的索引,并重置相应的资源。最后将该索引放置到容器mIdleIdx中。
void PollLoop::UnRegister(PollEventHandlerPtr const & handler) {
int idx = handler->GetLoopIdx();
assert(idx >= 0);
assert(idx < mPollFdList.size() && mPollFdList.size() == mHandlerList.size());
assert(mPollFdList[idx].fd == handler->GetFd());
assert(mHandlerList[idx] == handler);
mPollFdList[idx].fd = -1;
mPollFdList[idx].events = 0;
mPollFdList[idx].revents = 0;
mHandlerList[idx].reset();
mIdleIdx.push_back(idx);
}
我们看到,PollEventHandler与PollLoop之间存在一个耦合关系——索引。PollEventHandler中有一个私有的成员变量mLoopIdx记录了它在mHandlerList中的位置, 正常情况下注册和注销事件分发器的之后,我们都应当相应的更新这个记录。这个记录很关键,我们并不想把它暴露出来。所以就又写了两个函数ApplyHandlerOnLoop和UnApplyHandlerOnLoop, 当作PollLoop和PollEventHandler的友元,如下所示,同时更新两个对象。
|
|
2. 事件分发器PollEventHandler
事件分发器PollEventHandler的任务就是分析它所持有的文件描述符的事件,调用对应的回调函数,同时提供注册回调函数的接口。 该类型定义在头文件EventHandler.h中,它有三个私有的成员变量:
int mLoopIdx;
记录了事件分发器在PollLoop中的索引,如果没有注册该值为-1。PollLoopPtr mLoop;
记录了注册的PollLoop对象,如果没有注册将指向空。struct pollfd mPollFd;
记录了持有的文件描述符,和需要监听的事件。
所有的消息分发实在成员函数HandleEvents中完成的。如下面的代码所示,我们先断言PollLoop对象传递的文件描述符与该对象所持有的是同一个,而且它们监听着相同的事件。 接着,我们把poll返回的revents更新到当前对象的mPollFd上。最后通过if语句检查是否发生了读事件POLLIN,若是,则检查是否有注册回调函数,并调用之。针对写事件POLLOUT、 错误事件我们都可以按照相同的套路在这里依次讨论,完成事件分发。由于echo服务器例程只用到了读事件,所以这里就偷了点懒。
void PollEventHandler::HandleEvents(struct pollfd const & pollFd) {
assert(mPollFd.fd == pollFd.fd);
assert(mPollFd.events == pollFd.events);
mPollFd.revents = pollFd.revents;
if (pollFd.revents & POLLIN) {
if (mReadCallBk)
mReadCallBk();
}
}
如下所示,我们通过typdedef定义了事件回调函数的原型,并在函数SetReadCallBk中使用std::move更新mReadCallBk完成回调函数的注册。 在后面介绍Tcp服务器的时候,我们会看到,用户在调用接口SetReadCallBk注册读事件回调函数的时候,需要通过std::bind来完成。
typedef std::function<void()> EventCallBk;
EventCallBk mReadCallBk;
void SetReadCallBk(EventCallBk cb) { mReadCallBk = std::move(cb); }
3. TCP服务器TcpServer
对于一个TCP服务器而言,我们有两方面的需求。其一,需要创建一个套接字监听指定端口以被动建立新连接,我们将之封装成Acceptor类。其二,需要管理新连接,控制连接通道上数据的收发, 我们将之封装成Connection类。
3.1 Acceptor
Acceptor用私有成员mAccpSock来监听指定端口,并为之构建事件分发器对象来监听读事件。如下所示,是Acceptor的构造函数。第2到6行通过封装后的对象mAccpSock,完成socket-->bind-->listen监听端口。 我们在第8到11行中构建了事件分发器,并通过接口设置监听读事件,最后通过std::bind注册读事件的回调函数。
Acceptor::Acceptor(int port, int qsize) : mAccpSock(AF_INET, SOCK_STREAM, 0) {
IPv4 addr(port);
mAccpSock.SetReuseAddr(true);
mAccpSock.SetKeepAlive(true);
mAccpSock.BindOrDie(addr);
mAccpSock.ListenOrDie(qsize);
mEventHandler = PollEventHandlerPtr(new PollEventHandler(mAccpSock.GetFd()));
mEventHandler->EnableRead(true);
mEventHandler->EnableWrite(false);
mEventHandler->SetReadCallBk(std::bind(&Acceptor::OnReadEvent, this));
}
std::bind可以把特定的对象和参数与指定的函数绑定在一起。OnReadEvent是Acceptor的成员函数,所以编译过程中会给它安排一个this指针作为一个输入参数。
而刚刚看到的读事件回调函数的原型是void()
没有输入参数。std::bind会构建一个可调用的对象,并把this指针与函数OnReadEvent绑定在一起。
这样一来,虽然PollEventHandler看到的是void()的回调函数,但是实际上该调用对象会在执行的时候,把绑定的this指针传参进来。
下面是读事件的回调函数OnReadEvent的实现。在该回调函数中,我们通过mAccpSock的Accept接口调用accept来被动建立新连接。如果Acceptor的持有者注册了新建连接的回调, 我们就把accept生成的IP地址和新连接的文件描述符作为参数调用回调函数。
void Acceptor::OnReadEvent() {
IPv4Ptr pear_addr = IPv4Ptr(new IPv4);
int fd = mAccpSock.Accept(pear_addr);
if (mNewConnCallBk) {
mNewConnCallBk(fd, pear_addr);
} else {
std::cout << "未注册new conn callback" << std::endl;
::close(fd);
}
}
下面是新建连接的回调函数定义和注册接口。它与PollEventHandler的读事件回调机制的套路类似,只是这测的回调有参数。后面我们会看到如何通过std::bind的占位符来注册这类带参数的回调。
typedef std::function<void(int, IPv4Ptr const &)> NewConnCallBk;
NewConnCallBk mNewConnCallBk;
void SetNewConnCallBk(NewConnCallBk cb) { mNewConnCallBk = std::move(cb); }
3.2 Connection
如下面的代码所示,Connection在它的构造函数中根据输入的文件描述符和连接地址,构建了mPeerAddr和mEventHandler对象。
Connection::Connection(int fd, IPv4Ptr const & peer) : mPeerAddr(peer) {
mEventHandler = PollEventHandlerPtr(new PollEventHandler(fd));
mEventHandler->EnableRead(true);
mEventHandler->EnableWrite(false);
mEventHandler->SetReadCallBk(std::bind(&Connection::OnReadEvent, this));
}
void Connection::OnReadEvent() {
int md = mEventHandler->GetFd();
int nread = read(md, mReadBuf, 1024);
if (nread <= 0) {
std::cout << "close fd = " << md << std::endl;
close(md);
if (mCloseCallBk)
mCloseCallBk();
} else {
if (mRecvRawCallBk) {
RawMsgPtr msg(new RawMsg(nread));
msg->assign(mReadBuf, mReadBuf + nread);
mRecvRawCallBk(msg);
}
}
}
Connection用于管理连接通道上的数据收发。客户端什么时候发送数据不是服务器能够控制的,所以在构造函数中注册了一个读事件的回调函数来接收客户端的数据。 而什么时候发送数据,通常都是服务器自己决定的,目前先直接调用了send将数据发送到通道上,如下面的代码所示:
void Connection::SendRawData(char const * buf,
int num) {
int md = mEventHandler->GetFd();
send(md, buf, num, 0);
}
右侧的代码是回调OnReadEvent的实现。我们调用read来读数据,mReadBuf是一个1024字节的char型的数组,用作读缓存。如果read返回负数说明出错了,若为0,则说明连接关闭了。 此时我们将调用close关闭文件描述符,如果持有者注册了连接关闭的回调mCloseCallBk,Connection将调用该回调通知持有者。
正常情况下,read将返回实际读取的字节数量。若持有者注册了读数据的回调mRecvRawCallBk,Connection将构建一个RawMsg的对象msg,并把mReadBuf中的数据拷贝到msg中, 然后将msg作为参数传递给回调。
3.3 TcpServer
TcpServer对Acceptor和Connection进行了封装。下面是该类型的构造函数,它有三个输入参数。loop是一个事件监听器对象,port是将要监听的端口,max_conn是服务器支持的最大连接数量。
TcpServer::TcpServer(PollLoopPtr const & loop, int port, int max_conn)
: mLoop(loop), mMaxConn(max_conn), mAcceptor(new Acceptor(port, max_conn)) {
mAcceptor->SetNewConnCallBk(std::bind(&TcpServer::OnNewConnection, this, _1, _2));
ApplyHandlerOnLoop(mAcceptor->GetHandler(), mLoop);
}
TcpServer构造之初,就需要一个Acceptor对象来监听端口。我们在构造函数的初始化列表中完成了mAcceptor的构建,在函数体中先注册了新连接的回调函数,再将Acceptor的事件分发器注册到loop对象上了。
在刚刚分析Acceptor的新连接回调函数的原型时,我们看到它有两个输入参数。算上this指针,TcpServer的成员函数OnNewConnection一共有三个输入参数。 所以这里的std::bind在this之后又增加了两个占位符_1和_2。在Acceptor调用该回调的时候,会先用fd和IPv4对象填充这两个占位符,然后std::bind生成的可调用对象会把this与_1,_2一起传递给OnNewConnection。
一般我们写回调函数的时候,参数的顺序都是与原型的顺序一致的。std::bind的占位符允许我们打乱这种顺序,也就是说如果对掉下面的函数OnNewConnection的fd和peer_addr的顺序, 那么我们只需要对应的交换占位符_1,_2的顺序即可。但这样的写法对人类来说不是那么友好,不建议使用。
在新建连接的回调OnNewConnection中,我们首先检查当前的连接数量,如果达到了连接上限就主动关闭连接。否则就用fd和peer_addr参数构建Connection对象,并注册关闭连接和接收数据的回调。 然后通过ApplyHandlerOnLoop注册事件分发器,并把新连接放置到容器mConnList中。最后,如果用户注册了TcpServer的新连接回调,就调用之。
void TcpServer::OnNewConnection(int fd, IPv4Ptr const &peer_addr) {
if (mConnList.size() >= mMaxConn) {
std::cout << "连接太多了" << std::endl;
close(fd);
} else {
ConnectionPtr conn(new Connection(fd, peer_addr));
conn->SetCloseCallBk(std::bind(&TcpServer::OnCloseConnection, this, conn));
conn->SetRecvRawCallBk(std::bind(&TcpServer::OnNewRawMsg, this, conn, _1));
ApplyHandlerOnLoop(conn->GetHandler(), mLoop);
mConnList.push_back(conn);
if (mNewConnCallBk)
mNewConnCallBk(conn);
} }
我们知道Connection的关闭连接回调的原型是void(),没有任何输入参数。但是下面的回调OnCloseConnection中,我们却定义了一个参数。如果按照C语言中的函数指针的方式写回调的话, 这里肯定会有一个语法错误。但是有了std::bind的伟大存在,如上面的第7行所示,使得我们在注册回调的同时,就把将要传参的conn对象给绑定上了。如此一来,我们就可以在回调函数中了解到是哪个连接被关闭了。
在这个回调里,我们依次遍历容器mConnList找到被关闭的连接对象,将它从容器中移除。同时注销其事件分发器。
void TcpServer::OnCloseConnection(ConnectionPtr const & con) {
for (auto it = mConnList.begin(); it != mConnList.end(); it++) {
ConnectionPtr & ptr = *it;
if (ptr == con) {
UnApplyHandlerOnLoop(ptr->GetHandler(), mLoop);
mConnList.erase(it);
if (mCloseConnCallBk)
mCloseConnCallBk(con);
break;
} } }
最后,我们再来看一下TcpServer接收到消息时的回调函数OnNewRawMsg,它有两个输入参数,其中con是接收到消息的对象,在注册回调的时候通过std::bind绑定。msg是Connection对象调用回调的时候的实际传参。 函数体没什么好解释的,直接调用用户的回调(如果有的话)。
void TcpServer::OnNewRawMsg(ConnectionPtr const & con, RawMsgPtr const & msg) {
if (mNewRawMsgCallBk)
mNewRawMsgCallBk(con, msg);
}
4. 重构echo服务器
从PollLoop到TcpServer,经过层层封装之后,我们构建一个echo服务器的形式就比较简单了。如下面的main函数所示,我们只需要先构建PollLoop和TcpServer对象,再注册回调函数OnNewRawMsg, 在回调函数中把接收到的数据在发送出去就好了。
int main() {
PollLoopPtr loop(new xiaotu::net::PollLoop);
TcpServer tcp(loop, 65530, 3);
tcp.SetNewConnCallBk(std::bind(OnNewConnection, _1));
tcp.SetCloseConnCallBk(std::bind(OnCloseConnection, _1));
tcp.SetNewRawMsgCallBk(std::bind(OnNewRawMsg, _1, _2));
while (1) {
loop->LoopOnce(100000);
} }
下面是用户回调的实现,OnNewConnection和OnCloseConnection没什么实质操作,只是打印了一些日志。OnNewRawMsg只是将接收到的消息再原本的发送出去而已。
void OnNewConnection(ConnectionPtr const & conn) {
std::cout << "新建连接:" << conn->GetPeerAddr().GetIpPort() << std::endl;
}
void OnCloseConnection(ConnectionPtr const & conn) {
std::cout << "关闭连接:" << conn->GetPeerAddr().GetIpPort() << std::endl;
}
void OnNewRawMsg(ConnectionPtr const & conn, RawMsgPtr const & msg) {
conn->SendRawData(msg->data(), msg->size());
}
5. 完
本文我们介绍了五个类型,并在此基础上重构了echo服务器。本着够用的原则,有很多功能还没有实现,我们会在后续文章中逐渐扩展。
- PollLoop:作为事件监视器,提供LoopOnce接口来调用poll,监视文件描述符的各种事件。由于poll的调用是阻塞的,所以它将独立运行在一个线程中。 目前echo服务器在一个线程中就可以实现并发的服务,所以目前没考虑任何线程安全性的问题。
- PollEventHandler:事件分发器,由具体的Acceptor,Connection等对象持有。它被注册到一个PollLoop对象上,将其持有的文件描述符上发生的事件拆分为读事件、 写事件等各种事件,提供回调机制处理这些事件。同样由于echo服务器只用到了读事件,所以目前只分发了读事件。
- Acceptor:是TCP服务器的一个基本组件,用来监听指定的端口。它持有一个PollEventHandler,在读事件的回调中accept新连接。
- Connection:TCP服务器的另一个基本组件,用来管理各个连接的数据收发。同样持有一个PollEventHandler。
- TcpServer:目前由一个Acceptor和若干个Connection构成,echo服务器通过注册它的NewRawCallBk来接收数据,并在回调中原样发送回去。