首页 关于
树枝想去撕裂天空 / 却只戳了几个微小的窟窿 / 它透出天外的光亮 / 人们把它叫做月亮和星星
目录

HTTP发送大文件

前文中,我们用一个队列、两个线程、三个阶段,实现了一个有一定并发能力的静态 HTTP 服务器。 把 Http 的一次服务过程拆分成了解析请求报文、填充响应报文、发送响应报文三个阶段,并分别有两个线程来完成。 PollLoop 线程负责解析请求报文和发送响应报文两个阶段。PollLoop 线程成功解析Http请求报文之后,根据报文的首部构建 Http 响应任务,放置到任务队列的对尾。 TaskFifo 线程负责消费任务队列,填充响应报文。

我们设计这么一个服务框架的目的,主要是为了解决服务器没有响应的问题。在此之前,我们只有一个线程,并在一个函数中完成了从处理请求报文到发送响应报文的所有过程。 如果这个过程长时间不退出,服务器就不能够响应其它网络事件。我们将 HTTP 服务拆成3个阶段,并专门增加了一个线程来填充响应报文。看似服务器在根据请求报文提供的服务的同时, 还可以处理其它网络事件。实际上这个拆分并不完善,如果 TaskFifo 线程碰到某个任务需要很长的运行时间,整个系统仍然不能及时的对其它 HTTP 请求做出响应。

除此之外,我们在发送大文件的时候,效率也很低下。如下面的代码片段所示,我们在函数 OnGetRequest 中处理 GET 请求。 通过 HttpResponse 的 AppendContent 接口读取请求报文中URL指定的资源文件并填充到响应报文中, 需要一次读取所有的文件内容。如果碰上几个G甚至更大的文件,就需要很大的内存,甚至需要较长的数据读取时间。而且后续的发送过程还需要多次拷贝,所以整个系统的运行效率此时会非常低下。

        void OnGetRequest(HttpRequestPtr const & req, HttpResponsePtr const & res)
        {
            // 省略检查 URL 的代码
            res->SetStatusCode(HttpResponse::e200_OK);
            res->AppendContent(path, 0, s.st_size);
        }
        // 读取资源文件
        void HttpResponse::AppendContent(std::string const & fname, uint64_t off, uint64_t len)
        {
            size_t n_ori = mContent.size();

            mContent.resize(n_ori + len);
            ReadBinary(fname, mContent.data() + n_ori, 0, len);
        }
        size_t ReadBinary(std::string const & path, uint8_t * buf, uint64_t off, uint64_t len)
        {
            FILE * fp = fopen(path.c_str(), "r");
            int re = fseek(fp, off, SEEK_SET);
            size_t aclen = fread((void*) buf, 1, len, fp);
            fclose(fp);
            return aclen;
        }

本文中,我们重点考虑解决发送大文件的效率问题,通过少量多次的加载文件予以解决。期间,我们也会对服务器没有响应的问题做一些粗浅的思考, 具体的解决方法将在下一篇文章中参考 nginx 的设计思想进行调整。

1. 分段读取文件

解决大文件发送的问题,最直接的方式就是分段读取文件,一次只读取部分文件内容并发送,多次重复这个过程直到所有的文件数据发送完毕。 这样我们的服务过程就不能简单的分为三个顺序执行的阶段了。在第二阶段读取了部分文件,在第三阶段发送出去之后,应当判定是否还有数据需要发送,若有应当回到第二阶段继续读数据发数据。

首先我们给 HttpResponse 增加一个 LoadContent 的接口,用于加载指定长度的文件内容,如下面左侧的代码所示。同时调整函数 HandleGetRequest 让它最多读取 mDefaultLoadSize 个字节的数据。这个运行参数 mDefaultLoadSize 目前是硬编码的 8192 个字节,以后会通过配置文件修改。HandleGetRequest 在退出之前会通过 session 的 WakeUp 接口唤醒 PollLoop 循环,标志着一次读取任务的结束。我们将原来的回调函数 HandleResponse 更名为 OnTaskFinished,在该函数中完成部分数据的发送,如果还有数据未发送将重新构建一个读取数据的任务,具体细节我们将在后文中展开。

        bool HttpServer::HandleGetRequest(
            HttpSessionWeakPtr const & weakptr)
        {
            HttpSessionPtr session = weakptr.lock();
            if (nullptr == session)
                return false;

            HttpRequestPtr req = session->GetRequest();
            HttpResponsePtr res = session->GetResponse();

            HttpServer::OnHeadRequest(req, res);
            if (HttpResponse::e404_NotFound == res->GetStatusCode()) {
                res->AppendContent("Error:404");
            } else {
                res->LoadContent(mDefaultLoadSize);
            }

            session->WakeUp();
            return true;
        }
        void HttpResponse::LoadContent(int len)
        {
            if (NULL == mFilePtr)
                mFilePtr = fopen(mFilePath.c_str(), "r");
            assert(mHeadEndIdx > 0 && NULL != mFilePtr);

            size_t need = mHeadEndIdx + len;
            if (mContent.size() < need)
                mContent.resize(need);

            uint8_t * begin = mContent.data() + mHeadEndIdx;
            size_t aclen = fread((void*)begin, 1, len, mFilePtr);
            mDataSize -= aclen;
            mLoadCount++;

            if (aclen < len) {
                fclose(mFilePtr);
                mFilePtr = NULL;
            }
        }

上面右侧的代码是类 HttpResponse 新增的接口 LoadContent,它有一个输入参数 len 指定了加载的字节数量。在调用该函数之前,我们还需要先后调用 HttpResponse 的成员函数 SetFile 指定目标文件,和成员函数 LockHead 锁定响应报文的首部数据。这些改动是为了减少一次数据拷贝,我们将在后文中详细介绍。

我们用一个 vector 容器 mContent 记录了所有的响应报文。它可以分为首部和正文两个部分,字段 mHeadEndIdx 记录了首部结束的字节索引。所以在第7行中,mHeadEndIdx + len 就可以得到将要发送的响应报文长度 need。在第11行中,通过mContent.data() + mHeadEndIdx获得正文的起始地址。之后通过标准库函数 fread 读取文件内容。字段 mDataSize 负责记录剩余的字节数,所以第13行需要减去实际读取的字节数量 aclen。

2. 从第三阶段回到第二阶段

        TaskPtr const & currTask = session->mCurrTask;
        if (currTask->success) {
            if (currTask->OnSuccess)
                currTask->Success();
            else
                HttpServer::OnTaskSuccessDefault(con, session);
        } else {
            if (currTask->OnFailure)
                currTask->Failure();
            else
                HttpServer::OnTaskFailureDefault(con, session);
        }

在 TaskFifo 线程中,每完成一个任务,都会通过HttpSession对象的WakeUp接口通知 PollLoop 线程任务运行结束,响应报文填充完毕。PollLoop 线程将调用回调函数 OnTaskFinished 完成第三阶段发送响应报文的具体工作。由于我们打算分段发送大文件,在该函数中还需要有机制能够判定是否还有剩余数据需要发送,是否需要回到第二阶段继续填充响应报文。

右侧的代码片段是函数 HttpServer::OnTaskFinished 最后的一段代码逻辑,它通过会话对象 session 获取当前正在处理的任务 currTask,然后根据其字段 success 判定任务是否正常执行结束,分别调用成功和失败的回调函数。针对分段读取大文件数据并发送的需求,我们给类型 Task 增加了字段 OnSuccess 用于记录成功执行后的回调函数,和一个接口函数 Success 来完成指定回调函数的调用。如果用户在构建 Task 的时候没有特别指定,我们就在20行中执行一个默认的成功回调。类似的针对任务执行失败的情况我们给类型 Task增加了字段 OnFailure 和接口函数 Failure。

        typedef std::function< bool ()> TaskFunc;
        class Task {
            public:
                Task(TaskFunc func)
                {
                    success = false;
                    mFunction = std::move(func);
                }

                void SetSuccessFunc(TaskFunc func) { OnSuccess = std::move(func); }
                void SetFailureFunc(TaskFunc func) { OnFailure = std::move(func); }

                void Finish() { success = mFunction(); }
                void Success() { OnSuccess(); }
                void Failure() { OnFailure(); }

                bool success;
                TaskFunc OnSuccess;
                TaskFunc OnFailure;
            private:
                TaskFunc mFunction;
        };

如下面的代码片段所示,我们在函数HttpServer::HandleRequest中响应GET请求,构建了一个任务 task 并通过其成员函数 SetSuccessFunc 指定了一个特殊的成功回调函数 HttpServer::OnTaskSuccessGet。在该回调函数中我们将判定剩余的字节数量,并根据需要回到第二阶段。

        case HttpRequest::eGET:
            task = std::make_shared<Task>(std::bind(&HttpServer::HandleGetRequest, HttpSessionWeakPtr(ptr)));
            task->SetSuccessFunc(std::bind(&HttpServer::OnTaskSuccessGet, ConnectionWeakPtr(con), worker));
            break;

下面是函数HttpServer::OnTaskSuccessGet的代码片段,我们在第4行中获取了响应报文首部结束位置的字节索引 endidx,第5行中获取加载文件的次数 loadCount, 在第7行中通过类HttpResponse的接口GetDataSize获取剩余数据的字节数量 nleft。

        bool HttpServer::OnTaskSuccessGet(ConnectionWeakPtr const & conptr, ThreadWorkerPtr const & worker) {
            // 省略部分构建局部变量的代码
            HttpResponsePtr res = session->GetResponse();
            int endidx = res->GetHeadEndIdx();
            int loadCount = res->GetLoadCount();
            std::vector<uint8_t> const & buf = res->GetContent();
            size_t nleft = (0 == loadCount) ? 0 : res->GetDataSize();

如果我们是第一次加载数据,意味着还没有发送过响应报文,此时需要连带报文首部一起发送出去,如下面的第9行所示。如果 loadCount 大于 1,意味着我们已经发送过报文首部了。根据 HTTP 协议,我们只需要在第11行发送正文部分的内容就可以了。

            if (loadCount <= 1)
                con->SendBytes(buf.data(), buf.size());
            else
                con->SendBytes(buf.data() + endidx, buf.size() - endidx);

接下来在第12行中,我们根据变量 nleft 判定是否还有数据需要发送。若有,我们将构建一个新的任务 task 放置到任务队列中,当 TaskFifo 线程处理该任务的时候就回到了第二阶段。在第13行构建任务的时候,我们指定了加载数据的回调函数 HttpServer::HandleGetLoadContent 继续读取数据并填充响应报文。在第14行中仍将HttpServer::OnTaskSuccessGet注册为任务成功的回调函数。

            if (nleft > 0) {
                TaskPtr task = std::make_shared(std::bind(&HttpServer::HandleGetLoadContent, HttpSessionWeakPtr(session)));
                task->SetSuccessFunc(std::bind(&HttpServer::OnTaskSuccessGet, ConnectionWeakPtr(con), worker));
                worker->AddTask(task);
            } else {
                if (res->CloseConnection())
                    con->Close();
                session->Reset();
            }
            return true;
        }

如果第12行的判定条件不满足,意味着我们已经读取并发送完文件的所有内容,此时正常重置会话就可以了。再次回到第二阶段运行的回调函数HttpServer::HandleGetLoadContent十分简单,如下所示,只需要调用接口函数 LoadContent 再次加载数据就可以了。

        bool HttpServer::HandleGetLoadContent(HttpSessionWeakPtr const & weakptr) {
            HttpSessionPtr session = weakptr.lock();
            if (nullptr == session)
                return false;
            HttpResponsePtr res = session->GetResponse();
            res->LoadContent(mDefaultLoadSize);
            session->WakeUp();
            return true;
        }

3. 减少一次数据拷贝

        std::vector<uint8_t> buf;
        res->ToUint8Vector(buf);
        con->SendBytes(buf.data(), buf.size());

如右侧的代码片段所示,在此之前我们发送响应报文的时候,需要临时构建一个字节容器,通过接口函数 ToUint8Vector 将具体的报文内容填充到该容器中,最后通过连接对象的接口函数 SendBytes 发送给客户端。

显然这样每次发送响应报文的时候,我们至少需要拷贝两次数据才能完成。当时这样设计的原因是,类HttpResonse 分别用一个std::map类型的容器记录着首部的键值对,一个std::vector类型记录数据。如果想要一次发送所有的响应报文就需要把它们写到一段连续的缓存中才行。这样的效率就比较低下,但是修改首部比较灵活,可以在不影响正文的情况下对首部进行增删改。

对于大文件而言,多次的数据拷贝将导致效率显著下降。如果我们在生成响应报文的时候,就把首部和正文放到一个连续的缓存中就可以避免一次数据拷贝。所以,我们在类HttpResponse中增加了字段 mHeadEndIdx 用于记录首部结束的字节索引。并增加了接口函数 LockHead 把首部的键值对序列化为一串字节写到 std::vector 容器 mContent 中同时更新 mHeadEndIdx。

        void HttpResponse::LockHead(size_t size)
        {
            mDataSize = size;
            mHeadEndIdx = 0;
            mContent.clear();

            AppendContent(StartLine());
            AppendContent("Content-Length: ");
            AppendContent(std::to_string(mDataSize));

            if (CloseConnection())
                AppendContent("\r\nConnection: close\r\n");
            else
                AppendContent("\r\nConnection: Keep-Alive\r\n");

            for (auto it = mHeaders.begin(); it != mHeaders.end(); ++it)
                AppendContent(it->first + ": " + it->second + "\r\n");
            AppendContent("\r\n");

            mHeadEndIdx = mContent.size();
        }

在填充报文正文之前必须先调用 LockHead。如果在填充报文正文期间需要修改首部内容,则需要调用函数 UnlockHead 重置 mHeadEndIdx,修改完成之后需要再次调用 LockHead,并且需要重新填充正文。

            void HttpResponse::UnlockHead() {
            mDataSize = 0;
            mHeadEndIdx = -1;
            mContent.clear();
        }

4. 完

本文中为了提高大文件的发送效率,我们引入了分段读取文件的机制。这只是一个临时的补丁,我们将在下一篇文章中参考 nginx 的设计思想重构 HTTP 服务器,将整个服务过程拆分成更多的阶段,并通过模块和流水线的方式实现。

最后我们讨论了一个减少数据拷贝的补丁,这个补丁以牺牲操作灵活度的方式换取了缓存的连续性。减少数据拷贝的根本还是要想办法处理输出缓存和发送数据的接口。之前我们曾尝试探讨缓存的问题,但后来发现当时的一些考虑和优化不是很合适,使用起来也不那么顺手。我们将在后续的文章中参考nginx等开源项目继续深入探讨内存和缓存管理方面的内容。




Copyright @ 高乙超. All Rights Reserved. 京ICP备16033081号-1