首页 关于
树枝想去撕裂天空 / 却只戳了几个微小的窟窿 / 它透出天外的光亮 / 人们把它叫做月亮和星星
目录

输入缓存与解码分发

上一节中,我们研究了输出缓存。 当时提到,受到报文长度,系统的缓存配置等影响,以至于经常需要分包发送数据。那么对于接收而言,我们就不能假设数据总是在一帧中就可以传送完毕。 而且socket的接口一般都会把通道上的数据看成一个数据流,也就是说它能保证接收到的字节顺序与发送时的一致。但接收端什么时候,接收多少数据是没有保证的。

为了满足应用程序对接收数据的解析需要,码农们通常都会通过接收缓存和消息分发器,来解决接收数据的时机和数量上不确定性的问题。本文中, 我们重点在于给XiaoTuNetBox提供一个输入缓存。关于消息的解码分发,本文将简单的看一下,在下一节中将详细介绍相关的框架。

        void Connection::OnReadEvent() {
            int md = mEventHandler->GetFd();
            int nread = read(md, mReadBuf, 1024);
            if (nread <= 0) {
                std::cout << "close fd = " << md << std::endl;
                close(md);
                if (mCloseCallBk)
                    mCloseCallBk();
            } else {
                if (mRecvRawCallBk) {
                    RawMsgPtr msg(new RawMsg(nread));
                    msg->assign(mReadBuf, mReadBuf + nread);
                    mRecvRawCallBk(msg);
                }
            }
        }

1. 目前接收数据的方法

在修改输入缓存之前,我们先来回顾一下目前XiaoTuNetBox是怎么接收数据的。我们知道,目前XiaoTuNetBox通过一个PollLoop来管理所有的连接的fd(文件描述符), 通过EventHandler分发各个fd的可读可写以及错误事件,由Connection的读事件回调OnReadEvent来读取fd所接收到的数据。

右侧的代码是回调OnReadEvent的实现。我们通过系统调用read来读数据,mReadBuf是类Connection中定义的数组char mReadBuf[1024];,用作读缓存。

如果read返回负数说明出错了,若为0,则说明连接关闭了。 此时我们将调用close关闭文件描述符,如果持有者注册了连接关闭的回调mCloseCallBk,Connection将调用该回调通知持有者。

正常情况下,read将返回实际读取的字节数量。若持有者注册了读数据的回调mRecvRawCallBk,Connection将构建一个RawMsg的对象msg,并把mReadBuf中的数据拷贝到msg中, 然后将msg作为参数传递给回调。

这里所谓的 RawMsg 实际上是通过 typedef 定义的 std::vector 的容器。

        typedef std::vector<char> RawMsg;
        typedef std::shared_ptr<RawMsg> RawMsgPtr;

刚写完这段代码的时候,我还挺自喜的,感觉写的挺好。现在回过头来看,还是存在挺多不合理的地方的。 首先,在第3行中,我们通过系统调用read把数据从内核搬到了用户空间,用Connect的成员变量mReadBuf存储。 这个缓存的大小是一个硬编码的值1024,这意味着我们每次最多搬用1024个字节的数据。 但是上一节对于系统缓存的粗浅研究发现, 系统的缓存远比这个大得多。出于效率的考虑,我们应该尽量减少系统调用的次数。这也就是说,我们应当尽可能多的从内核缓存中搬运数据。

其次,再来看第11行和12行,每次接收到了新的数据,我们都需要重新申请一段内存,再把缓存中的数据拷贝一遍。 这个过程基本没有什么效率可言,在数据量不大但是通信频次很高的情况下,就会频繁的申请小段内存,这对于操作系统而言并不友好, 随着系统的运行,它会产生大量的内存碎片,从而影响整个系统的效率。而且拷贝两边数据明显很多余。

其实这样做也不是一无是处,在多线程的情况下,处理起来比较方便,不需要考虑太多资源竞争的问题。因为,每次都新构建一个RawMsg的对象并用一个智能指针标记它。 这个对象一旦生成就只有被读取的功能,没有修改和写入的操作,所以不存在竞争的问题。这样在其它线程中,为保证线程安全需要对RawMsgPtr加锁后拷贝, 而对shared_ptr的拷贝操作是极快的,临界区很小。

2. 基于DataQueue的输入缓存

在处理输出缓存的时候,我们提到根据TCP协议的数据流式的设计,需要一个先进先出的队列来支持。 参考类Connection 中的成员 mWriteBuf,我们把成员 mReadBuf 的数据类型也修改为 DataQueue<char>。我们使用DataQueue这个数据结构,一方面是因为它的先入先出特性,更主要的是另一方面它可以根据新增数据量自适应的调整内存大小。

遗憾的是,自适应调整内存大小这个特点跟系统调用 read 的用法似乎有些矛盾。read 要求我们在调用它之前先申请一段缓存,并在调用的时候把缓存大小作为参数传递进去。 但是PollLoop只能告诉我们有新的数据可以读,并没有说有多少新数据。 那么也就无法判定需要预先申请多少内存。好在系统内核还为我们提供了 readv 的调用接口可以指定多段缓存,那么我们完全可以指定两个缓存,让 readv 先填充 mReadBuf, 剩余的数据都填充到一个固定大小的缓存中。然后,将写在这个固定大小缓存中的数据通过PushBack接口塞进 mReadBuf中,就可以触发数据结构 DataQueue 的自动增长机制,从而自适应的调整内存大小。

系统调用 readv 原型如下,有三个参数,fd为文件描述符;iov是一个结构体,它有两个成员 iov_base 和 iov_len 分别记录了缓存的其实地址和字节数;iovcnt 则描述了缓存块的数量。

        ssize_t readv(int fd, const struct iovec *iov, int iovcnt);
        struct iovec {
            void  *iov_base;    /* Starting address */
            size_t iov_len;     /* Number of bytes to transfer */
        };

基于上述思想,我们对 Connection 的成员函数 OnReadEvent 做如下的修改。首先声明了三个栈空间下的变量iovcnt, vec, extrabuf,分别表示缓存块数量,缓存描述符列表和固定大小的缓存。

        void Connection::OnReadEvent() {
            int iovcnt = 3;
            struct iovec vec[3];
            uint8_t extrabuf[1024];

接着给缓存描述符表赋予初值,准备调用 readv。这里之所以用了三个缓存块,是因为数据结构 DataQueue 是一个循环队列。 在队列非空的情况下队尾之后和队首之前的空间都是可写的,我们应当优先把新数据写到队尾之后。队列为空时,mReadBuf的所有空间都是可写的。 所以在第5行中我们根据队列是否非空分别调用了接口 GetStorBeginAddr 和 GetEndAddr 来获取缓存空间的首地址和队尾指针。并在第6行中通过 FreeTail 接口获取队尾之后的空间大小。

队尾之后的空间填满之后,DataQueue 不一定写满,因为循环队列的队首之前的那段空间也是可写的。所以还需要在第7和8行中获取队首之前的空间。 如果这两段空间都填满了,意味着 mReadBuf 的空间不够,需要扩充了。此时将尽可能多的填充 extrabuf,即第9,10行的工作。

            vec[0].iov_base = mReadBuf.Empty() ? mReadBuf.GetStorBeginAddr() : mReadBuf.GetEndAddr();
            vec[0].iov_len = mReadBuf.FreeTail();
            vec[1].iov_base = mReadBuf.GetStorBeginAddr();
            vec[1].iov_len = mReadBuf.FreeHead();
            vec[2].iov_base = extrabuf;
            vec[2].iov_len = 1024;

然后获取文件描述符,调用readv填充缓存就好了。系统调用 readv 将把读取到的字节数返回回来,这里用 n 来记录。

            int md = mEventHandler->GetFd();
            size_t n = readv(md, vec, iovcnt);

和read一样,如果返回0意味着连接关闭了,此时需要通过close释放文件描述符。如果是负数,则说明从内核空间中搬运数据发生了错误,这里偷了点懒没做异常处理。

            if (n <= 0) {
                std::cout << "close fd = " << md << std::endl;
                close(md);
                if (mCloseCallBk)
                    mCloseCallBk();
            } else {

系统调用readv只是根据缓存描述符表vec的指示填充数据而已,并不会维护数据结构 DataQueue 的指针。为此,我们专门增加了接口 AcceptBack 用于调整 DataQueue.mBegin 和 DataQueue.mEnd,把新填充的数据包进数据结构中。如果readv返回的字节数大于队列原本的空闲空间,意味着额外的缓存中写入了数据,需要将其PushBack进队列中。

                int ava = mReadBuf.Available();
                if (n > ava) {
                    mReadBuf.AcceptBack(ava);
                    mReadBuf.PushBack(extrabuf, n - ava);
                } else {
                    mReadBuf.AcceptBack(n);
                }

最后,为了和之前的例程兼容,我们仍然保留了下面这段代码,从缓存队列中把新接收到的数据再拷贝了一遍,来触发回调mRecvRawCallBk。这样看来并没有解决两次拷贝数据的问题。 针对这一问题,目前最直接的处理方式就是把缓存mReadBuf以参数的形式传递给回调函数。但是它的数据类型 DataQueue 不是线程安全的,在多线程的程序中往往会有比较大的资源竞争问题。

                if (mRecvRawCallBk) {
                    RawMsgPtr msg(new RawMsg(mReadBuf.Size()));
                    mReadBuf.PopFront(msg->data(), mReadBuf.Size());
                    mRecvRawCallBk(msg);
        }   }   }

那么接下来我们对 DataQueue 做一些简单的封装,让它是线程安全的。

3. 构建一个线程安全的Buffer

解决多线程情形下的资源竞争问题,用的最多的方法就是给公共资源加锁,进入临界区之后,再对公共资源进行读改写操作。下面我们先来定义一个带锁的数据类型 InputBuffer, 它有两个私有的成员变量,mReadBuf 和 mBufMutex,分别是 DataQueue 和 std::mutex 类型的数据。mReadBuf 用来缓存数据,而 mBufMutex 则是一个信号量,在对 mReadBuf 更新的之前需要先锁住该信号量。

        class InputBuffer {
            public:
                size_t Read(int md);
                inline int Size() { // 省略实现 }
                inline bool PeekFront(uint8_t *buf, int n, size_t offset = 0) { // 省略实现 }
                inline bool DropFront(int n) { // 省略实现 }
            private:
                DataQueue<uint8_t> mReadBuf;
                std::mutex mBufMutex;
        };

该数据类型还有四个公开的成员函数,Read() 用于读取文件描述符中的可读数据,它基本上是把上一节中的OnReadEvent接口拷贝过来简单修改而成的; Size() 用于获取 mReadBuf 中保存的数据长度;PeekFront() 和 DropFront() 是一对操作, 分别用于读取 mReadBuf 中的前 n 个数据,和抛弃其前 n 个数据。在这些函数中, 我们都通过std::lock_guard lock(mBufMutex);在一个局部的作用域中对信号量 mBufMutex 加锁。

        size_t InputBuffer::Read(int md) {
            std::lock_guard lock(mBufMutex);

            int iovcnt = 3;
            struct iovec vec[3];
            uint8_t extrabuf[1024];

            vec[0].iov_base = mReadBuf.Empty() ? mReadBuf.GetStorBeginAddr() : mReadBuf.GetEndAddr();
            vec[0].iov_len = mReadBuf.FreeTail();
            vec[1].iov_base = mReadBuf.GetStorBeginAddr();
            vec[1].iov_len = mReadBuf.FreeHead();
            vec[2].iov_base = extrabuf;
            vec[2].iov_len = 1024;

            size_t n = readv(md, vec, iovcnt);
            if (n > 0) {
                int ava = mReadBuf.Available();
                if (n > ava) {
                    mReadBuf.AcceptBack(ava);
                    mReadBuf.PushBack(extrabuf, n - ava);
                } else {
                    mReadBuf.AcceptBack(n);
                }
            }
            return n;
        }
        inline int Size() {
            int re = 0;
            {
                std::lock_guard lock(mBufMutex);
                re = mReadBuf.Size();
            }
            return re;
        }
        inline bool PeekFront(uint8_t *buf, int n, size_t offset = 0) {
            bool re = false;
            {
                std::lock_guard lock(mBufMutex);
                re = mReadBuf.PeekFront(buf, n, offset);
            }
            return re;
        }
        inline bool DropFront(int n) {
            bool re = false;
            {
                std::lock_guard lock(mBufMutex);
                re = mReadBuf.DropFront(n);
            }
            return re;
        }


这样一来,我们把 Connection 的输入缓存的数据类型从 DataQueue 转换到 InputBuffer 上就可以得到一个线程安全的输入缓存。现在我们把它作为参数传递给回调函数似乎不会有什么问题了。 后来我一分析发现,虽然我们成功的保证对缓存队列的操作是线程安全的,但是这个数据结构同一时间只能有一个消费者。这意味着,如果有两个线程都需要用同一个 Connection, 那么它们对 InputBuffer 的 PeekFront 和 DropFront 操作就会互相影响,这并不是一件好事。所以,我们还需要一个消息分发的机制来解决这一问题。

4. 通过观测器实现数据分发

在计算机领域有一句话:计算机的任何问题都可以通过增加一个虚拟层来解决。我们现在需要在 InputBuffer 之上在封装一个观测器 InBufObserver 来实现数据分发。 大体思想就是,给每一个需要使用 InputBuffer 的对象注册一个观测器,它有一个计数用来记录其消费的数据长度或者说是实际的缓存起始索引。这样间接的使用缓存数据,就不会相互影响了。

下面是类型 InBufObserver 的定义。它有三个私有的成员变量,mBuffer 是其要封装的缓存对象,mStartIdx 记录了有效缓存数据的起始索引,mIdx 记录了其在 mBuffer 的观察者列表中的索引位置。 为了防止对 InputBuffer 的误操作,我还特意用私有函数把 InBufObserver 的构造函数给隐藏起来,阻止用户擅自创建相关对象。它都将有 mBuffer 通过 CreateObserver 构建的。

        class InBufObserver {
            friend class InputBuffer;
            private:
                InBufObserver(InputBuffer & buffer, size_t idx) : mBuffer(buffer), mStartIdx(0), mIdx(idx) { }
            public:
                inline int Size() { return mBuffer.Size() - mStartIdx; }
                inline bool PeekFront(uint8_t * buf, int n) { return mBuffer.PeekFront(buf, n, mStartIdx); }
                inline bool PopFront(uint8_t *buf, int n) { // 省略实现 }
                inline bool DropFront(int n) { // 省略实现 }
            private:
                InputBuffer & mBuffer; 
                size_t mStartIdx;      //! 有效缓存数据的起始索引
                size_t mIdx;           //! 在 #mBuffer 的观察者列表中的索引位置
        };

如上面第6行中函数Size()的实现所示,实际消费者可用的数据就是缓存对象中存储的数据量减去已经消费的数据量。接口 PeekFront 的把已经消费的数据量作为 offset 传参给 InputBuffer 对象的 PeekFront。 下面是函数 PopFront 和 DropFront 的实现,它们主要还是在维护 mStartIdx,让其始终记录对有效缓存数据的起始索引。

inline bool PopFront(uint8_t *buf, int n)
{
    bool suc = mBuffer.PeekFront(buf, n, mStartIdx);
    if (suc) mStartIdx += n;
    return suc;
}
inline bool DropFront(int n)
{
    assert(n < Size());
    mStartIdx += n;
}

PopFront 和 DropFront 都不会释放掉已经消费的数据,它们只是在修改起始索引而已。实际释放数据的操作,由下面为 InputBuffer 增加的接口 DropHead来实现,在开始调用 InputBuffer 的 Read 接口之前, 先检查一遍观察者,把那些所有观察者都已经消费了的数据都释放掉。这存在一个内存泄露的风险,如果有一个恶意的 InBufObserver 它一直持有着一个 InputBuffer 但就是不消费其中的数据。 就会导致随着数据的越收越多,底层的循环队列 DataQueue 申请的内存就会越来越大,而且还从来没有释放的机会。

void InputBuffer::DropHead() {
    int min_idx = 0;

    for (int i = 0; i < mObservers.size(); ++i) {
        if (nullptr == mObservers[i])
            continue;

        if (min_idx > mObservers[i]->mStartIdx)
            min_idx = mObservers[i]->mStartIdx;
    }
    mReadBuf.DropFront(min_idx);
}
 InBufObserverPtr InputBuffer::CreateObserver() {
    int idx = 0;
    if (mObsHoles.empty()) {
        idx = mObservers.size();
        mObservers.push_back(nullptr);
    } else {
        idx = mObsHoles.back();
        mObsHoles.pop_back();
        mObservers[idx] = nullptr;
    }
    InBufObserverPtr ptr(new InBufObserver(*this, idx));
    mObservers[idx] = ptr;
    return ptr;
}

上面的右侧代码是 InputBuffer 的一个接口,用于构建 InBufObserver 对象。我们新增了容器 mObservers 记录所有的观察者,由于观察者可能先于 InputBuffer 释放掉。 为了方便管理,提高内存利用效率,维护 InBufObserver 的成员 mIdx 始终记录其在 InputBuffer.mObservers 中的索引,我们用一个容器 mObsHoles 记录历史释放掉的观察者索引。

目前的这个状态,还存在两个资源竞争的风险。首先 InputBuffer 的这两个容器,有可能在不同的进程中发生变动,所以理应加一把锁。其次,InBufObserver 的索引成员 mStartIdx, 也可能在接口 DropHead 中被竞争。关于这两点,我们将在代码中做出调整,这里就不在展开描述了。

5. 小结

本文中我们先分析了源码tag:v0.0.6b的输入缓存实现方法提到它有三个缺点:

  1. 硬编码的输入缓存大小,不能动态修改,面对流量较大的情形,需要多次系统调用才能把内核中的数据全部搬到用户空间中。
  2. 每次都需要临时申请一段内存来保存新接收的数据,内存申请过于频繁而且碎片化比较严重
  3. 数据传递给回调的过程实际上进行了两次数据拷贝,效率上不是很友好。

但是它也有一个优点,就是对于多进程的场景而言比较友好,资源竞争问题并不突出。

针对问题1,2我们用循环队列 DataQueue 替换了C语言原生的数组,并用 readv 替换了 read 系统调用来尽可能多的搬运内核数据。 针对问题3,我们引入了两层封装,分别提供了线程安全的队列,以及观测器用于数据分发。

本文用到了两个例程, 非阻塞echo_polloop.cpp是一个TCP服务器,用于提供 echo 服务。 t_stdin_ipv4_talk.cpp是一个客户端,用于向 echo 服务器发送数据。




Copyright @ 高乙超. All Rights Reserved. 京ICP备16033081号-1