IO复用与并发服务器
在上一节中,我们探讨了TCP协议,并使用套接字实现了一个echo服务。这是个很初级的echo服务,有一个非常尴尬的问题,就是同一时间它只能处理一个连接的echo请求。 这主要是因为accept和read两个调用都是阻塞的,应用程序一旦进入,只要不满足返回条件就会一直等在那里。而客户端什么时候建立连接、什么时候发送数据并不是服务器所能控制的。 如果有人很坏,一直在尝试建立连接,即不发送数据,也不主动断开连接,那么这个echo服务器基本上就跪了。
在UNP一书中将这种服务器模型称为迭代服务器,这种服务器在当前客户被处理完之前,是不可能处理其它业务的。 而服务器同时处理多个连接的情况是十分普遍的,比如"双十一"这种动辄几千万上亿的并发量,使用这种模型肯定是要被骂成狗的。
本文我们先简单讨论一下UNP一书中提到的若干种服务器模型。 选择基于poll调用的IO复用方式的单线程reactor模式重构echo服务器。
1. 服务器模型初讨论
UNP一书在第27章中讨论了差不多10种,在20世纪90年代,常用的编写服务器的方法。它们大体上可以分为6类:
服务器模型 | 特点 | |
---|---|---|
1 | accept之后立即通过处理事务 | 同一时间只能处理一个连接,不具有并发性。 |
2 | accept之后fork一个进程来处理事务 | 具有一定的并发性,但是可以fork的进程数量受限于操作系统。而且随着连接和进程数量的增加,也会增加系统调度的负担。 |
3 | accept之后创建一个线程来处理事务 | 和2没有本质上的区别,只是通过线程来实现并行操作,各个连接和线程之间的通信相对比较灵活。 |
4 | 通过select, poll, epoll等方法实现IO复用 | 让进程阻塞在select,poll,epoll这样的方法上,监听多个文件描述符的特定事件。不用一直阻塞在accept,read上,可以在单个进程上并发的串行的处理所有连接的事务。 是目前大量使用的reactor模式大多是在这个模型上实现的。 |
5 | 预先fork出一个进程池 | 可以理解为2和4的组合版本。相比于2而言,进程的数量是固定的,不会因为连接的数量增加而给操作系统带来过多的负担。 相比于4可以通过多进程增加并行处理能力,在多核的机器上可以实现真正的并行。 |
6 | 预先创建出一个线程池 | 可以理解为3和4的组合版本。相比于3而言,线程的数量是固定的,不会因为连接的数量增加而给操作系统带来过多的负担。 相比于4可以通过多线程增加并行处理能力,在多核的机器上可以实现真正的并行。 |
对于高并发的应用而言,1,2,3就不要用了。4在实现难度和系统性能上可以是一个折中的选择,单线程不需要考虑并行程序的各种资源竞争的问题,它还有一定的并发能力。 如果想压榨多核计算机的并行能力时,可以考虑5或者6,如果各个连接之间存在一定的耦合关系的话,可能6是一种比较舒服的方案,因为在同一个进程内各个线程之间的通信要比进程间方便很多。
2. poll调用
我们先用模型4对echo服务器进行重构,选用poll作为IO复用方案。UNP一书中介绍了select和poll两种IO复用的方法, 它们两个本质上没有什么不同,只是select能够监听的文件描述符数量受到FD_SETSIZE的约束,而poll不受这个约束。所以我更倾向使用这种方案。后来又出现了epoll的方案, 这种方案下的IO复用更像是一种事件机制,减少了对监听的文件描述符的遍历过程,似乎更高效一点,我们在以后的改进过程中再来讨论它。
通过指令$ man poll
我们查看poll的相关文档,它的原型定义如下,有三个输入参数。其中fds是一个struct pollfd
类型的数组,
nfds描述了这个数组中一共有多少个元素,timeout是一个ms为单位的超时。调用poll之后,程序就会被阻塞,直到fds所监听的事件发生了或者超时了,才会返回。
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
如右侧的代码片段所示,pollfd有三个字段,其中fd是将要监听的文件描述符,如果该值小于0,poll将忽略它。events的每一位表示一个需要监听fd的事件, 我们可以通过位与“&”或者位或“|”来获取或者设置指定事件。revents与events一一对应,是poll负责的字段,返回之后,各位是'1'还是'0'表示相应的事件发生与否。
那么我们的服务器就变成了,在poll上阻塞。如果返回了,就依次遍历fds中的各个文件描述符,检查revents,判定关心的事件是否发生了并相应的做处理就好了。在调用poll之前, 应当用一个数组,把监听端口的套接字文件,和各个连接的套接字文件保存起来,修改events,监听POLLRDBAND事件。
3. 重构echo服务器
下面是重构之后的echo服务器的代码片段,为了节省篇幅,也为了方便编程,我们用IPv4和Socket对网络地址和socket接口进行了简单的封装,不再展开。 一开始,我们先定义了整型max_conn来限制服务器同时处理的连接数量。然后创建了套接字sock来监听65530端口。
int main() {
int max_conn = 3;
xiaotu::net::IPv4 serverIp(65530);
xiaotu::net::Socket sock(AF_INET, SOCK_STREAM, 0);
sock.SetReuseAddr(true);
sock.SetKeepAlive(true);
sock.BindOrDie(serverIp);
sock.ListenOrDie(max_conn);
接着,创建一个连接池connections用于记录建立连接的对方IP地址和端口,并为池中的每个对象创建一个缓存,由二维数组buf提供。
xiaotu::net::IPv4 connections[max_conn];
char buf[max_conn][1024];
然后,为了本文的主角,我们构建了一个struct pollfd
类型的数组,数组大小是max_conn+1,前max_conn个元素分别对应着连接池中的一个对象,
最后的那个元素用来处理监听端口的文件描述符。因为一开始还没有连接建立,所以将前max_conn的fd设置为-1。
struct pollfd pollFds[max_conn + 1];
pollFds[max_conn].fd = sock.GetFd();
pollFds[max_conn].events = POLLRDNORM;
for (int i = 0; i < max_conn; i++)
pollFds[i].fd = -1;
最后,我们改写业务循环。在while循环一开始,先调用poll监听各个文件描述符。我们用nready记录poll的返回值,它表示pollFds中有多少个元素的revents非零,如果出错将返回-1。
while (1) {
int nready = poll(pollFds, max_conn + 1, 100000);
std::cout << "nready = " << nready << std::endl;
如果poll返回了,意味着有人搞事情了。我们先不管是谁在搞事,先来看看监听套接字是否是发现有新的连接了。
if (POLLRDNORM & pollFds[max_conn].revents) {
如果有新的连接,就在pollFds中找一个空闲的元素用来记录新建连接的套接字文件描述符。
int idx = 0;
for (; idx < max_conn; idx++) {
if (pollFds[idx].fd < 0)
break;
}
std::cout << "idx = " << idx << std::endl;
然后,调用accept新建连接。如果超过了最大连接数量就直接关闭掉该连接,否则就更新pollFds将新连接添加到监听数组中。
int conn_fd = sock.Accept(&connections[idx]);
if (max_conn == idx) {
std::cout << "连接太多了" << std::endl;
close(conn_fd);
} else {
pollFds[idx].fd = conn_fd;
pollFds[idx].events = POLLRDNORM;
}
}
紧接着,我们在一个for循环中,遍历所有的连接文件描述符,如果有可读的事件发生了,就用read把内核中的数据搬到缓存中,并通过send在发送回去。如果read返回0表示客户端已经终止连接了, -1则是出错了,此时我们将close相应的文件描述符释放资源。
for (int i = 0; i < max_conn; i++) {
if (pollFds[i].fd < 0)
continue;
int conn_fd = pollFds[i].fd;
if (pollFds[i].revents & (POLLRDNORM | POLLERR)) {
int nread = read(conn_fd, buf[i], 1024);
if (nread <= 0) {
std::cout << "close conn_fd = " << conn_fd << std::endl;
close(conn_fd);
pollFds[i].fd = -1;
} else {
send(conn_fd, buf[i], nread, 0);
} } } } }
经过改写之后,我们的echo服务器就可以同时响应多个请求了。
4. 完
本文,我们简单讨论了UNP一书中提到的服务器模型。个人认为,单线程的IO复用使用很好的模型,它具有一定的并发能力, 同时不需要考虑过多的并行编程的问题,在性能与开发难度上是一个很好的折中。而且从这个模型出发,我们可以很容易改进出多线程版本的服务器,来压榨多核系统的并行能力,这点我们将在后续的文章中看到。