首页 关于
树枝想去撕裂天空 / 却只戳了几个微小的窟窿 / 它透出天外的光亮 / 人们把它叫做月亮和星星
目录

eventfd——唤醒PollLoop

现在我们的echo服务器已经改了三版了,它比较简单,只有一个线程,我们不需要过多的考虑各种资源竞争的问题。使用XiaoTuNetBox,不超过20行的代码就可以实现,看起来还比较整洁。 关键是使用了Reactor机制之后,我们的服务器具有一定的并发能力了,用起来还是比较方便的。这个例程看起来并不复杂,但它基本上覆盖了大部分常见的网络服务。 我们只需要简单修改接收到消息后的回调函数就可以实现各种网络服务,所以UNP一书会拿它来当例子介绍各种服务器编程的套路。

那么有什么服务是echo类服务器所不能覆盖的吗?

echo类服务器的业务逻辑基本上可以分为三步:接收客户请求 --> 计算 --> 返回结果。这类服务器发送数据是被动的,是由客户端发出的请求驱动的, 不妨称之为被动服务器。那有没有主动服务器呢?这类服务器,一直在进行着某种运算,当它达到某个状态时,主动地给某个连接发送数据。 主动的意义并不体现在服务器一直在做运算,而是是否发送数据。这类服务器存在吗?套用现在的echo服务器可行吗?

我们手机上常见的消息通知应该就是这类服务。我们获取新闻的方式有两种,其一是刷,其二是听。在刷的方式下,我们一直在向服务器发请求,服务器则被动的把我们可能感兴趣的消息发给我们。 这种方式对于服务器而言还是被动的。听的方式就是,我们跟服务器建立起联系后,命令它有事情要第一时间的告诉我们,然后就坐等。什么时候推送什么新闻,完全由服务器来决定。

看起来这类的服务就不能直接套用目前的echo服务器。因为我们需要在同一个线程中至少完成两个任务: 其一,监听accpet端口以建立新的连接。其二,向已建立的连接中写入数据。 如果服务器还混合了一部分的被动服务,那它还得监听各个连接的可读事件。虽然我们通过IO复用解决了同时监听多个事件的问题,但是poll调用会阻塞线程,那么我们再想完成一些计算, 判定是否需要发送数据就比较困难了。

当然,我们可以在调用poll的时候设置timeout为0,让它直接返回。但这种方法就要求我们每隔一段时间调用一次poll,来检查是不是有新的数据。检查的频率太低,有可能会错过一些事件, 检查的频率太高,又比较耗费系统资源,而且还需要计算任务尽快的完成,来保证这个检查频率。对于耗时的计算,就不是一种很好的选择。

这种推送消息的任务还是比较常见的,应该有比较成熟的解决方案。但我还没有仔细研究过,也不知道别人都是怎么实现的。但我想一个最直接的方法就是,用两个线程, 在一个线程中运行我们的PollLoop和TcpServer,监听各种事件。在另一个线程中完成计算,当有消息需要发送时,再通过TcpServer和PollLoop完成。 由于PollLoop将经常阻塞在poll调用上,我们就需要一种方法来唤醒它。

这种方法就是eventfd。本文,我们介绍eventfd,并再次修改PollLoop让它能够被其它线程唤醒,并以back服务器为例介绍它的使用方法。需要强调的是,这涉及到了多个线程之间的协作, 但是PollLoop还是单线程的设计,没有考虑过任何资源竞争的问题。所以back服务器只能是介绍eventfd怎么使用,不能拿来当真正的服务器用。

1. eventfd简介

通过指令$ man eventfd可以查看它的文档,如右图所示。我们可以通过调用eventfd创建一个文件描述符,用来在用户空间中等待或者通知一个事件(used as an event wait/notify mechanism by user-space applications, ...)。如果这个调用成功返回,将给我们提供一个新的文件描述符;若返回-1,则表示出错了。

这个调用的原型告诉我们,它有两个参数。其中initval是一个初始值。eventfd实际上维护了一个64位(8字节)的计数(counter),这个initval就是创建eventfd时为该计数赋予的初值。 第二个参数flags用来描述eventfd的一些特性,比如非阻塞、信号量等。

我们的出发点是要找一个能够在poll调用的框架下使用的唤醒机制,当计算线程发现系统进入了某个状态,需要发送数据时,能够唤醒PollLoop。现在eventfd给我们提供了一个文件描述符, 那么我们是不是可以在PollLoop中监听这个文件描述符的读事件,在计算线程中通过写操作来通知它该起来干活了呢?我们接着往下看文档, 会发现它有专门针对read(2), write(2), poll(2), select(2), close(2)等调用的说明。

如此看来,我们只需要在PollLoop中调用poll来监视POLLIN事件,来等待被人叫醒。在计算线程中调用write,就可以唤醒PollLoop了。 因为,我们在PollLoop的每个poll调用循环中都会遍历一下所有的文件描述符。所以我们不设置EFD_SEMAPHOR,当PollLoop发现它被eventfd唤醒了,直接调用read(2)清零计数。 一切看起来都很和谐,下面我们来修改PollLoop让它能够被唤醒。

2. 修改PollLoop

        class PollLoop {
        private:
            PollEventHandlerPtr mWakeUpHandler;
        }

reactor模式的echo服务器中,我们已经把监听某个文件描述符的时间封装到了事件分发器——PollEventHandler中, 同时定义了Acceptor和Connection持有一个PollEventHandler的对象,来监听端口和管理连接。仿照这个套路,我们让PollLoop持有一个PollEventHandler对象用于监听eventfd文件描述符的读事件。 修改头文件,给它加上这个成员mWakeUpHandler,如右侧代码片段所示。

修改它的构造函数,如下所示。在函数体中,我们调用eventfd获得一个文件描述符,并用之构造PollEventHandler。在调用eventfd时,我们设置初值为0,表示没有唤醒事件。同时还设置了EFD_CLOEXEC标识, 这个是从man文档中抄过来的参数,它的意思是调用exec的时候,会自动关闭之前的open的描述符。这个在多进程的情况下会有用,对于我们这种单进程多线程的程序没有实质的意义。

        PollLoop::PollLoop() : mTid(0), mLooping(false) {
            mWakeUpHandler = PollEventHandlerPtr(new PollEventHandler(eventfd(0, EFD_CLOEXEC)));
            mWakeUpHandler->EnableRead(true);
            mWakeUpHandler->SetReadCallBk(std::bind(&PollLoop::OnWakeUp, this));
        }

成功的创建了监听eventfd的事件分发器,完成mWakeUpHandler的构建之后,我们调用它的接口EnableRead完成监听读事件的设置。同时注册回调函数OnWakeUp来处理唤醒事件。 故事到这里并没有结束,因为在上一节封装PollLoop的时候,我们还需要把PollEventHandler注册到它的关注列表中。

        class PollLoop {
        friend PollLoopPtr CreatePollLoop();    
        private:
            PollLoop();
        }

由于我们使用std::shared_ptr来管理PollLoop和PollEventHandler,而在PollLoop的成员函数中又不能够直接通过this指针来指向对象本身的共享指针。 当然我们可以通过继承std::enable_shared_from_this并调用接口shared_from_this来获取这样的共享指针。但是我觉得这样代码有点丑,不想这么做。 所以我把它的构造函数改成私有的,以防止用户直接构建这个对象。同时又定义了一个友元函数CreatePollLoop,来完成实际的构造工作,和分发器的注册。

如下面的代码所示,在CreatePollLoop中,我们构建一个PollLoop对象并用共享指针来管理它。然后通过ApplyHandlerOnLoop将PollLoop的唤醒事件分发器注册到监听列表中。

        PollLoopPtr CreatePollLoop() {
            PollLoopPtr loop = PollLoopPtr(new PollLoop);
            ApplyHandlerOnLoop(loop->mWakeUpHandler, loop);
            return loop;
        }

如下面左侧的代码所示,在回调函数OnWakeUp中,我们通过mWakeUpHandler的接口GetFd获取eventfd的文件描述符,然后调用read读取计数并清零。

        void PollLoop::OnWakeUp() {
            uint64_t u;

            int md = mWakeUpHandler->GetFd();
            int nread = read(md, &u, sizeof(u));
        }
        void PollLoop::WakeUp(uint64_t u) {
            assert(0 != mTid);
            assert(mTid != ThreadTools::GetCurrentTid());
            int md = mWakeUpHandler->GetFd();
            int s = write(md, &u, sizeof(u));
        }

上面右侧的代码是用于唤醒PollLoop的接口。其中前两行的断言是在判定运行WakeUp接口的线程和PollLoop的循环不是同一个线程。我的本意是想PollLoop不是阻塞在poll调用上, 就是正在处理各个文件描述符的事件上。所以在PollLoop的循环中调用WakeUp是没有意义的。后两行中,我们通过write调用把输入参数u写到eventfd描述符中,以触发它的可读事件。

虽然我们给WakeUp接口的权限是public的,但我还是觉得直接调用PollLoop的这个接口有点奇怪。所以就又在分发器PollEventHandler中添加了一个接口WakeUpLoop用于持有分发器的对象主动的唤醒事件循环。

        void PollEventHandler::WakeUpLoop() {
            assert(mLoop);
            uint64_t idx = mLoopIdx;
            mLoop->WakeUp(idx);
        }

在这个接口中我们先获取分发器的注册索引,然后将该索引写到eventfd中。本意是想让PollLoop被唤醒的时候能够知道是哪个分发器干的。当时写这段代码的时候,没仔细看文档,对eventfd的机制还不是很熟悉。 现在看来这样做并不能达到我的意图。因为,write调用是在累加eventfd的计数,加入PollLoop通过read读取的计数是3,它有可能被索引是3的分发器唤醒的,也有可能是被索引为1的分发器叫了三次才醒。 所以PollLoop还是不能够知道到底是谁干的。

3. back服务

为了体现这个唤醒的操作,我们专门写了一个back的服务器,用来向最新的连接中每个1秒发送三个字符'dd\n'。 跟上一节的套路一样,我们在main函数中,先创建对象loop和tcp,然后注册回调函数。

        int main() {
            loop = CreatePollLoop();
            tcp = std::shared_ptr(new TcpServer(loop, 65530, 3));
            tcp->SetNewConnCallBk(std::bind(OnNewConnection, _1));
            tcp->SetCloseConnCallBk(std::bind(OnCloseConnection, _1));
            tcp->SetNewRawMsgCallBk(std::bind(OnNewRawMsg, _1, _2));

接着在一个线程中进行PollLoop。这个线程所运行的函数Loop只有一行就是loop->Loop(10000);

            std::thread thread1(Loop);

接下来的while循环才是今天的主角。在这个循环中,我们先创建一个RawMsg的对象,并把要发送的三个字符放进去。然后在一个由gmutex保护的邻接区中通过newconn把消息发布出去, 这个接口SendRawMsg将调用我们刚刚写好的WakeUpLoop来唤醒PollLoop。

                while (1) {
                xiaotu::net::RawMsgPtr msg = xiaotu::net::RawMsgPtr(new xiaotu::net::RawMsg);
                msg->push_back('d'); msg->push_back('d'); msg->push_back('\n');
                {
                    std::lock_guard<std::mutex> guard(gmutex);
                    if (newconn)
                        newconn->SendRawMsg(msg);
                }
                sleep(1);
            }

最后,等待线程退出。

            thread1.join();
            return 0;
        }

这里我们使用gmutex尝试保护的是连接对象newconn。因为,在新建和关闭连接的回调中,我们都会更新newconn,如下面的代码所示。而这两个回调都是在PollLoop线程中运行的,为了防止消息发送了一半时, 原来的连接对象被从PollLoop的关注列表中删除了,我们就对它加锁了。

        void OnNewConnection(ConnectionPtr const & conn) {
            std::lock_guard<std::mutex> guard(gmutex);
            newconn = conn;
        }
        void OnCloseConnection(ConnectionPtr const & conn) {
            std::lock_guard<std::mutex> guard(gmutex);
            if (conn == newconn)
                newconn.reset();
        }

正常情况下,编译应该没有问题,我们通过运行t_back_server_0.ext和t_back_client_0.exe就可以在客户端的终端里看到不断出现的"dd"和一个空行了。因为t_back_client_0的实现与我们的eventfd没什么关系, 就不再介绍了,参见源码

其实,一开始我是没打算写个锁在这里的,只是想要说明eventfd怎么用的。因为PollLoop的设计还没针对线程安全的考虑。但是想想,这个怎么说也是个多线程的程序呀,而且这个newconn看起来也是存在竞争的资源呀。 所以就加一个吧。我想线程安全应该是一个好用的网络工具箱必备的特性吧,所以会在后续的多篇文章中逐渐深入。这里权当一个餐前的开胃菜吧。

4. 完

本文在一开始的引言里面,详细的记录了我在看UNP一书时的一些思考。觉得有必要找到一种方法能够在其它线程中唤醒PollLoop, 让它主动的发送数据。针对这个问题,我们找到了eventfd这个方案,它能够完美的融合到poll的调用框架下。在解读了它的文档之后,我们对PollLoop做了必要的修改,并写了一个back服务器来验证这个方案。




Copyright @ 高乙超. All Rights Reserved. 京ICP备16033081号-1