输出缓存与分包发送
上一节我们研究非阻塞IO通信的时候就简单提到过, 非阻塞的发送实际上是把将要发送的数据丢给系统内核之后就直接返回了。如果内核的发送缓冲区不够了,会直接报错EWOULDBLOCK或者EAGAIN。 显然,内核的发送缓存区是有限度的。此外我们的通信载体以太网和IP报文也是有长度限制的,也就是常说的最大传输单元(MTU, Maximum Transmission Unit)。 如果需要传输的数据超过MTU限制,我们就需要分包发送。
实际上对于我们基于套接字的TCP编程来说,MTU很难进入到我们的代码视野中。MTU的存在并不是因为网络传输的物理特性上有什么不可克服的因素, 纯粹是为交互通信提供一个足够快的响应时间。这样不至于因为发送大数据量的报文使得用户以为网络发生了拥堵。本文讨论的重点不在协议族上的MTU,这里就不再展开讲了。
本文中,我们先研究一下查询和修改系统的发送缓存配置方法。同时针对缓存配置的临界条件,我们设计一些测试用例来研究发送缓存对于服务器应用程序的影响。 最后,将修改我们的网络工具箱,提供一个带有输出缓存的发送机制。
1. 系统写缓存查询与配置
$ sysctl -a | grep net.ipv4.tcp_wmem
net.ipv4.tcp_wmem = 4096 16384 4194304
$ sysctl -a | grep net.core.wmem_default
net.core.wmem_default = 212992
$ sysctl -a | grep net.core.wmem_max
net.core.wmem_max = 212992
经过搜索引擎的一顿查找,我发现可以简单通过如右侧第一行的指令那样获取套接字发送缓存大小。 经过grep过滤之后,我们得到了关于tcp_wmem的三个配置:最少字节数、默认字节数和最大字节数。 其中后两个数值会被net.core.wmem_default和net.core.wmem_max所覆盖,类似的我们也可以通过sysctl和grep查询这两个值,如右侧的第3行和第5行的代码所示。
根据输出结果,可以看到在我的机器上,输出缓存最少应该分配4096个字节,最多分配212992个字节。但是在没有特别申请的情况下就会按照最多的配置进行分配,即212922个字节。 对于我们之前一直举例的echo服务而言,这已经是一个挺大的配置了。后来在翻UNP的时候发现,其实套接字的接口中有两个函数getsockopt和setsockopt可以用来获取或者设置套接字的配置。 而且还有SO_RCVBUF, SO_SNDBUF, SO_RCVLOWAT, SO_SNDLOWAT四个选项分别用来控制接收/发送缓存和低潮限度。我们可以通过如下的C代码来查询一个套接字的输出缓存大小:
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
int main() {
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == fd) {
perror("创建socket失败");
exit(1);
}
int send_buf_size = 0;
socklen_t optlen = sizeof(send_buf_size);
int err = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &send_buf_size, &optlen);
if (err < 0) {
perror("获取发送缓冲区大小失败\n");
exit(1);
}
printf("发送缓冲区大小:%d字节\n", send_buf_size);
close(fd);
return 0;
}
在我本机上编译运行得到实际分配的缓存大小只有16384个字节,如右侧的截图所示,这跟用搜索引擎找到的结果好像有些出入,按照搜索引擎的结果我们的输出缓存应当是212992个字节呀。
于是我又按照搜索引擎告诉我的方法通过'sysctl -w'指令来修改配置'net.ipv4.tcp_wmem',让系统默认给套接字分配16000个字节。再次运行上面的例程,确实在起作用。 个人认为,前文所说的“net.ipv4.tcp_wmem的配置会被net.core.wmem*的参数所覆盖”应该是说反了,应当是ipv4的配置会覆盖core的。
我又尝试通过下面的例程,调用setsockopt来设置发送缓存“SO_SNDBUF”。
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
int GetSockSendBufSize(int fd) {
int send_buf_size = 0;
socklen_t optlen = sizeof(send_buf_size);
int err = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &send_buf_size, &optlen);
if (err < 0) {
perror("获取发送缓冲区大小失败\n");
exit(1);
}
return send_buf_size;
}
void SetSockSendBufSize(int fd, int send_buf_size) {
socklen_t optlen = sizeof(send_buf_size);
int err = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &send_buf_size, optlen);
if (err < 0) {
perror("设置发送缓冲区大小失败\n");
exit(1);
}
}
int main(int argc, char *argv[]) {
assert(argc == 2);
int buf_size = atoi(argv[1]);
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == fd) {
perror("创建socket失败");
exit(1);
}
int send_buf_size = GetSockSendBufSize(fd);
printf("发送缓冲区大小:%d字节\n", send_buf_size);
printf("尝试修改发送缓存大小:%d字节\n", buf_size);
SetSockSendBufSize(fd, buf_size);
send_buf_size = GetSockSendBufSize(fd);
printf("修改后发送缓冲区大小:%d字节\n", send_buf_size);
close(fd);
return 0;
}
运行该例程的时候,tcp_wmem的配置是'4096 16384 4194304'。当我尝试给4095、4096、4097等不同的字节时,得到的却是申请值的两倍,如右图所示。 在socket(7)的手册中找到了如下的一段话,是说通过setsockopt修改发送缓存大小的时候,内核会double一下。
SO_SNDBUF
Sets or gets the maximum socket send buffer in bytes. The kernel doubles this value (to allow space for bookkeeping overhead) when it is set using setsockopt(2), and this doubled value is returned by getsockopt(2).
关于net.ipv4.tcp_wmem, net.core.wmem_default, setsockopt的配置关系,在知乎上还有这么一段描述, 也不知是真是假。
如果指定了tcp_wmem,则net.core.wmem_default被tcp_wmem的覆盖。send Buffer在tcp_wmem的最小值和最大值之间自动调整。如果调用setsockopt()设置了socket选项SO_SNDBUF, 将关闭发送端缓冲的自动调节机制,tcp_wmem将被忽略,SO_SNDBUF的最大值由net.core.wmem_max限制。
2. 当内核发送缓存不够用时
我们在上一节的非阻塞echo服务器的基础上做些简单的修改,来研究非阻塞和阻塞的发送都有哪些特性。 这里是一个非阻塞的echo服务器例程,下面是省略了一些日志输出、变量定义后的核心代码片段。
int conn_fd = accept4(listen_fd, (struct sockaddr *)&cli_addr, &len, SOCK_NONBLOCK | SOCK_CLOEXEC);
int send_buf_size = xiaotu::net::GetSockSendBufSize(conn_fd);
int sbuf_size = 2 * send_buf_size;
char * as = (char *)malloc(sbuf_size);
memset(as, 'A', sbuf_size);
我们通过 accept4 接受一个连接请求,并为之创建一个非阻塞的文件描述符 conn_fd。然后通过刚才封装的 GetSockSendBufSize 获取系统为之分配的发送缓存大小, 并申请一个两倍大的空间将之全部写成字符 'A'。然后在一个 while 循环中不断地从 conn_fd 中尝试读取数据。每次成功读取到数据之后,就先把刚刚 malloc 的空间中的 'A' 都发送出去, 再 echo 接收到的数据。
while (1) {
int nread = read(conn_fd, buf, 1024);
if (-1 == nread) {
if (EAGAIN & eno || EWOULDBLOCK & eno)
continue;
else
break;
}
send(conn_fd, as, sbuf_size, 0);
send(conn_fd, buf, nread, 0);
}
为了方便测试,我们写了一个测试用例,把标准输入 stdin 中的数据通过ipv4的网络连接发送出去, 并把从ipv4网络连接中收到的数据都输出到标准输出 stdout 上,同时计数接收到的字节数量。下面这行代码是其运行方式,它尝试与本机的 65530 端口建立连接。
$ ./build/t_stdin_ipv4_talk.exe 127.0.0.1 65530
下面左图是非阻塞例程的运行结果。从输出的日志中可以看到,系统给 conn_fd 分配了 2626560 个字节,我们成功 malloc 了 5253120 个字节的空间。 接着就收到了一个字节,触发了发送过程。首先尝试把 5253120 个 'A'发送出去,但是由于数据太大,超出了系统的缓存大小,实际只发送了 2588672 个字节。 接着尝试把接收到的那个字节发送出去时就失败了。保存信息是说资源暂时不可用,这实际就是 EAGAIN 或者 EWOULDBLOCK 的错误信息, 表示系统当前没有足够的缓存了,请等一会儿再试。
下面右图是阻塞例程的运行结果。在该例程中,只是在调用 accept4 时没有指定 SOCK_NONBLOCK。 于是得到了一个阻塞的文件描述符 conn_fd,其它的代码没有任何差异。从输出的日志中可以看到阻塞方式成功的把 5253120 个字节发送了出去,只是耗时比较长。
图 1(a). 非阻塞echo 系统默认缓存配置 | 图 1(b). 阻塞echo 系统默认缓存配置 |
看到日志,我有几个方面的疑问,为什么系统给分配的发送缓冲区会那么大呢?我们不是已经通过 sysctl 看到了默认缓存只有 16384 吗?最大也才 4194304 的呀? 而到了实际发送的时候一次最多只能发送 2588672 个字节,这好像远比系统分配的缓冲区小呀?
如图2(a)所示,接着我又尝试在 accept4 成功接受了一个连接,并生成文件描述符 conn_fd 之后,通过封装的接口 xiaotu:net::SetSockSendBufSize 手动修改一下发送缓存大小为 4096。如下面的左图所示, 系统对配置 double 了一下,给分配了8192个字节。然后我们构建了16384个字符 'A'。结果很成功的一次就发送完了,好像这个缓存大小不存在一样。
图 2(a). 非阻塞echo 发送缓存大小4096 发送16384个'A' | 图 2(b). 非阻塞echo 发送缓存大小4096 发送5253120个'A' |
如图2(b)所示,在4096的发送缓存配置下,请求发送 5253120 个字节, 实际发送 32768 个字节,而且下次发送接收字节时也能成功发送。看样子还是有一个缓存大小在起作用, 但是由于实际发送的数据比较少,内核可以很快消费完或者部分消费缓存数据,所以再次请求发送数据时还是有可能成功的。
系统缓存大小确实有些费解,不太清楚系统在后面做了些什么工作。本想着在一些临界的条件下研究一下系统缓存的影响,现在看来有点困难。暂时先这样吧, 毕竟我们是来研究提供应用层的缓存的必要性的,我们还会回来看这个问题的。
3. 输出缓存设计
在非阻塞的通信模式下有必要提供一个应用层的缓存。因为在应用层,用户请求发送大量数据的情形是经常存在的。 刚才的实验中发送 5253120 个字节都会充满系统的缓存,这才5M多。而网络上传送的文件、图片、视频等资源轻轻松松就是几百兆。用户发送数据的时候肯定希望只请求一次, 至于网络库和系统是怎么工作的,他们是不会关心的。这就要求我们提供一个应用层的缓存,当系统缓存充满时,可以接管用户的数据,等到系统缓存中有空闲的空间时接着发送剩余的数据。
根据TCP协议的数据流式的设计,我们的缓存应当也支持这种形式,即先写入缓存中的数据应当先发送出去。这是一种典型的队列的使用方式。
我们把XiaoTuDataBox中的DataQueue拷贝了过来,
并增加了如下的一些接口,方便整块的拷贝数据。
同时将类Connection 中的成员 mWriteBuf 的数据类型修改为
DataQueue<char>
。
bool PushBack(T const *buf, int n); // 在队尾插入n个数据 size = size + n
bool PopFront(T *buf, int n); // 从対首取出n个数据 size = size - n
bool DropFront(int n); // 抛弃掉対首的n个数据 size = size -n
T const * GetBeginAddr() const; // 获取有效数据的起始地址
bool Folded() const; // 判定队列是否折叠
int FoldedHead() const; // 若折叠,获取头部的长度
int FoldedTail() const; // 获取尾部的长度
用户通过接口SendBytes请求发送数据,如下面的代码片段所示。我们在这里做了一点优化,当缓存队列为空的时候就直接发送数据。这避免一些数据拷贝,对于小数据量的通信而言应该是比较友好的。
void Connection::SendBytes(char const *buf, int num) {
int nsend = 0;
if (mEventHandler->GetLoopTid() == ThreadTools::GetCurrentTid() && mWriteBuf.Empty()) {
nsend = SendRawData(buf, num);
num -= nsend;
buf += nsend;
}
在第3行中判定调用该函数的线程ID是否与polloop循环的是一个,若是并且应用缓存仍然是空的,就直接通过SendRawData把数据发送出去。 同时记录发送出去的数据量。目前还没有认真讨论过多线程的事情,但是我们还是稍微注意一下,保证只在一个线程里通过套接字发送数据。
那么剩下的数据都是需要放到缓存队列里的,有polloop循环来调度发送。在第9行中,我们通过PushBack的接口把剩余数据填入队列中。 然后让事件分发器开始监听可写事件,如果在不同的线程中调用该函数,还需要唤醒polloop线程。
if (num > 0) {
mWriteBuf.PushBack(buf, num);
std::cout << "writebuf.size = " << mWriteBuf.Size() << std::endl;
mEventHandler->EnableWrite(true);
if (mEventHandler->GetLoopTid() != ThreadTools::GetCurrentTid())
mEventHandler->WakeUpLoop();
}
}
在可写事件的回调函数中,我们断言缓存队列一定非空。如果缓存队列为空的话,应当关闭事件分发器的可写事件的监听。所以有可写事件发生, 就意味着有数据需要发送。
void Connection::OnWriteEvent() {
assert(!mWriteBuf.Empty());
因为函数SendRawData所用的系统调用send要求一次发送的数据在内存上是连续的,所以我们在这里检查一下队列是否发生了折叠。 若折叠了则先发送头部数据,待下次可写事件发生时在发送尾部数据。
int n = mWriteBuf.Folded() ? mWriteBuf.FoldedHead() : mWriteBuf.Size();
int nsend = SendRawData(mWriteBuf.GetBeginAddr(), n);
std::cout << __FUNCTION__ << ":" << n << ":" << nsend<< std::endl;
局部变量nsend记录了实际发送的字节数量,然后我们通过接口DropFront从缓存队列中剔除已发送的数据。
mWriteBuf.DropFront(nsend);
若此时缓存队列为空,意味着所有的数据都已经发送完了,可以关闭可写事件的监听了。否则仍然需要监听之。
if (mWriteBuf.Empty())
mEventHandler->EnableWrite(false);
else
mEventHandler->EnableWrite(true);
}
以上讨论的这两个函数就是输出缓存的主要内容了, 我们提供了阻塞和非阻塞两个版本的 polloop echo 服务器。读者可以配合t_stdin_ipv4_talk.cpp来测试运行。
4. 完
本文中,我们初步探讨了套接字的系统发送缓存,我们也找到了修改系统缓存大小的方法。只是内核做了一些优化和自适应的操作,以至于我并不能够精准的控制缓存大小,所以目前还不能找到比较好的边界条件测试方法。 关于这点我们还会回来深入探讨的。最后,我们借助队列数据结构DataQueue实现了一个应用层的输出缓存机制。