首页 关于
树枝想去撕裂天空 / 却只戳了几个微小的窟窿 / 它透出天外的光亮 / 人们把它叫做月亮和星星
目录

IO复用与并发服务器

在上一节中,我们探讨了TCP协议,并使用套接字实现了一个echo服务。这是个很初级的echo服务,有一个非常尴尬的问题,就是同一时间它只能处理一个连接的echo请求。 这主要是因为accept和read两个调用都是阻塞的,应用程序一旦进入,只要不满足返回条件就会一直等在那里。而客户端什么时候建立连接、什么时候发送数据并不是服务器所能控制的。 如果有人很坏,一直在尝试建立连接,即不发送数据,也不主动断开连接,那么这个echo服务器基本上就跪了。

UNP一书中将这种服务器模型称为迭代服务器,这种服务器在当前客户被处理完之前,是不可能处理其它业务的。 而服务器同时处理多个连接的情况是十分普遍的,比如"双十一"这种动辄几千万上亿的并发量,使用这种模型肯定是要被骂成狗的。

本文我们先简单讨论一下UNP一书中提到的若干种服务器模型。 选择基于poll调用的IO复用方式的单线程reactor模式重构echo服务器。

1. 服务器模型初讨论

UNP一书在第27章中讨论了差不多10种,在20世纪90年代,常用的编写服务器的方法。它们大体上可以分为6类:

服务器模型 特点
1 accept之后立即通过处理事务 同一时间只能处理一个连接,不具有并发性。
2 accept之后fork一个进程来处理事务 具有一定的并发性,但是可以fork的进程数量受限于操作系统。而且随着连接和进程数量的增加,也会增加系统调度的负担。
3 accept之后创建一个线程来处理事务 和2没有本质上的区别,只是通过线程来实现并行操作,各个连接和线程之间的通信相对比较灵活。
4 通过select, poll, epoll等方法实现IO复用 让进程阻塞在select,poll,epoll这样的方法上,监听多个文件描述符的特定事件。不用一直阻塞在accept,read上,可以在单个进程上并发的串行的处理所有连接的事务。 是目前大量使用的reactor模式大多是在这个模型上实现的。
5 预先fork出一个进程池 可以理解为2和4的组合版本。相比于2而言,进程的数量是固定的,不会因为连接的数量增加而给操作系统带来过多的负担。 相比于4可以通过多进程增加并行处理能力,在多核的机器上可以实现真正的并行。
6 预先创建出一个线程池 可以理解为3和4的组合版本。相比于3而言,线程的数量是固定的,不会因为连接的数量增加而给操作系统带来过多的负担。 相比于4可以通过多线程增加并行处理能力,在多核的机器上可以实现真正的并行。

对于高并发的应用而言,1,2,3就不要用了。4在实现难度和系统性能上可以是一个折中的选择,单线程不需要考虑并行程序的各种资源竞争的问题,它还有一定的并发能力。 如果想压榨多核计算机的并行能力时,可以考虑5或者6,如果各个连接之间存在一定的耦合关系的话,可能6是一种比较舒服的方案,因为在同一个进程内各个线程之间的通信要比进程间方便很多。

2. poll调用

我们先用模型4对echo服务器进行重构,选用poll作为IO复用方案。UNP一书中介绍了select和poll两种IO复用的方法, 它们两个本质上没有什么不同,只是select能够监听的文件描述符数量受到FD_SETSIZE的约束,而poll不受这个约束。所以我更倾向使用这种方案。后来又出现了epoll的方案, 这种方案下的IO复用更像是一种事件机制,减少了对监听的文件描述符的遍历过程,似乎更高效一点,我们在以后的改进过程中再来讨论它。

通过指令$ man poll我们查看poll的相关文档,它的原型定义如下,有三个输入参数。其中fds是一个struct pollfd类型的数组, nfds描述了这个数组中一共有多少个元素,timeout是一个ms为单位的超时。调用poll之后,程序就会被阻塞,直到fds所监听的事件发生了或者超时了,才会返回。

       struct pollfd {
           int   fd;         /* file descriptor */
           short events;     /* requested events */
           short revents;    /* returned events */
       };
       #include <poll.h>
       int poll(struct pollfd *fds, nfds_t nfds, int timeout);

如右侧的代码片段所示,pollfd有三个字段,其中fd是将要监听的文件描述符,如果该值小于0,poll将忽略它。events的每一位表示一个需要监听fd的事件, 我们可以通过位与“&”或者位或“|”来获取或者设置指定事件。revents与events一一对应,是poll负责的字段,返回之后,各位是'1'还是'0'表示相应的事件发生与否。

那么我们的服务器就变成了,在poll上阻塞。如果返回了,就依次遍历fds中的各个文件描述符,检查revents,判定关心的事件是否发生了并相应的做处理就好了。在调用poll之前, 应当用一个数组,把监听端口的套接字文件,和各个连接的套接字文件保存起来,修改events,监听POLLRDBAND事件。

3. 重构echo服务器

下面是重构之后的echo服务器的代码片段,为了节省篇幅,也为了方便编程,我们用IPv4和Socket对网络地址和socket接口进行了简单的封装,不再展开。 一开始,我们先定义了整型max_conn来限制服务器同时处理的连接数量。然后创建了套接字sock来监听65530端口。

        int main() {
            int max_conn = 3;
            xiaotu::net::IPv4 serverIp(65530);
            xiaotu::net::Socket sock(AF_INET, SOCK_STREAM, 0);

            sock.SetReuseAddr(true);
            sock.SetKeepAlive(true);
            sock.BindOrDie(serverIp);
            sock.ListenOrDie(max_conn);

接着,创建一个连接池connections用于记录建立连接的对方IP地址和端口,并为池中的每个对象创建一个缓存,由二维数组buf提供。

            xiaotu::net::IPv4 connections[max_conn];
            char buf[max_conn][1024];

然后,为了本文的主角,我们构建了一个struct pollfd类型的数组,数组大小是max_conn+1,前max_conn个元素分别对应着连接池中的一个对象, 最后的那个元素用来处理监听端口的文件描述符。因为一开始还没有连接建立,所以将前max_conn的fd设置为-1。

            struct pollfd pollFds[max_conn + 1];
            pollFds[max_conn].fd = sock.GetFd();
            pollFds[max_conn].events = POLLRDNORM;
            for (int i = 0; i < max_conn; i++)
                pollFds[i].fd = -1;

最后,我们改写业务循环。在while循环一开始,先调用poll监听各个文件描述符。我们用nready记录poll的返回值,它表示pollFds中有多少个元素的revents非零,如果出错将返回-1。

            while (1) {
                int nready = poll(pollFds, max_conn + 1, 100000);
                std::cout << "nready = " << nready << std::endl;

如果poll返回了,意味着有人搞事情了。我们先不管是谁在搞事,先来看看监听套接字是否是发现有新的连接了。

                if (POLLRDNORM & pollFds[max_conn].revents) {

如果有新的连接,就在pollFds中找一个空闲的元素用来记录新建连接的套接字文件描述符。

                    int idx = 0;
                    for (; idx < max_conn; idx++) {
                        if (pollFds[idx].fd < 0)
                            break;
                    }
                    std::cout << "idx = " << idx << std::endl;

然后,调用accept新建连接。如果超过了最大连接数量就直接关闭掉该连接,否则就更新pollFds将新连接添加到监听数组中。

                    int conn_fd = sock.Accept(&connections[idx]);
                    if (max_conn == idx) {
                        std::cout << "连接太多了" << std::endl;
                        close(conn_fd);
                    } else {
                        pollFds[idx].fd = conn_fd;
                        pollFds[idx].events = POLLRDNORM;
                    }
                }

紧接着,我们在一个for循环中,遍历所有的连接文件描述符,如果有可读的事件发生了,就用read把内核中的数据搬到缓存中,并通过send在发送回去。如果read返回0表示客户端已经终止连接了, -1则是出错了,此时我们将close相应的文件描述符释放资源。

                    for (int i = 0; i < max_conn; i++) {
                    if (pollFds[i].fd < 0)
                        continue;

                    int conn_fd = pollFds[i].fd;
                    if (pollFds[i].revents & (POLLRDNORM | POLLERR)) {
                        int nread = read(conn_fd, buf[i], 1024);
                        if (nread <= 0) {
                            std::cout << "close conn_fd = " << conn_fd << std::endl;
                            close(conn_fd);
                            pollFds[i].fd = -1;
                        } else {
                            send(conn_fd, buf[i], nread, 0);
        }   }   }   }   }

经过改写之后,我们的echo服务器就可以同时响应多个请求了。

4. 完

本文,我们简单讨论了UNP一书中提到的服务器模型。个人认为,单线程的IO复用使用很好的模型,它具有一定的并发能力, 同时不需要考虑过多的并行编程的问题,在性能与开发难度上是一个很好的折中。而且从这个模型出发,我们可以很容易改进出多线程版本的服务器,来压榨多核系统的并行能力,这点我们将在后续的文章中看到。




Copyright @ 高乙超. All Rights Reserved. 京ICP备16033081号-1