首页 关于
树枝想去撕裂天空 / 却只戳了几个微小的窟窿 / 它透出天外的光亮 / 人们把它叫做月亮和星星
目录

响应HTTP请求的任务队列

上文中,我们一起分析了 HTTP 请求报文的解析过程。 当成功解析了一帧请求报文之后,将调用函数 HandleRequest 根据请求报文的具体内容生成响应报文, 下面是源码tag:v0.0.7b中该函数的实现,大体上可以分为四个部分:检查报文首部、 构建 HttpResponse 对象,调用应用回调函数,发送响应。

        void HttpServer::HandleRequest(ConnectionPtr const & con, HttpRequestPtr const & req) {
            // req->PrintHeaders();
            // 1. 检查请求报文头部是否有 Connection 字段
            std::string con_key("Connection");
            std::string con_header;
            bool has_con = req->GetHeader(con_key, con_header);
            if (has_con) ToLower(con_header);
            // 2. 判定是否需要保持 TCP 连接,据此构建 HttpResponse 对象
            bool close = !has_con || (con_header != "keep-alive");
            HttpResponsePtr res(new HttpResponse(close));
            // 3. 设置响应报文默认状态码为 503,然后调用回调接口 mRequestCB
            res->SetStatusCode(HttpResponse::e503_ServiceUnavilable);
            if (mRequestCB)
                mRequestCB(req, res);
            // 4. 发送响应报文,如果需要关闭连接则闭之
            std::vector<uint8_t> buf;
            res->ToUint8Vector(buf);
            con->SendBytes(buf.data(), buf.size());
            if (res->CloseConnection())
                con->Close();
        }

我们在其中的第4到7行中,检查请求报文头部是否有 Connection 字段,如果该字段值为 "keep-alive" 表示需要保持 TCP 连接。 "keep-alive" 是告知服务器客户端希望保持当前正在通信的 TCP 连接,这是提高网络响应速度的一种方式。 我们知道 HTTP 协议是建立在 TCP 之上的,如果每次 HTTP 请求都需要新建一个 TCP 连接的话,就意味着需要频繁的进行握手。通过 "keep-alive" 保持一个长连接,可以避免这些操作, 达到提高速度和带宽的效果。 如果请求报文中配置了这一选项,那么服务器返回的响应报文就应当在首部中告知客户端正文的长度。否则,浏览器将一直等待接收消息,直到断开连接。

在第9、10行中,我们根据刚才检查的 "keep-alive" 配置构建新的 HttpResponse 对象。然后在12到14行中,设置响应报文的默认状态码,并调用应用回调函数。 在例程当中,参见下面的示例代码,我们注册了回调函数 OnOkHttpReques,并在该函数中修改了响应报文的状态码为200,并在正文中填充了一个 h1 标签。 最后,第16到20行中,我们通过 TCP 连接通道发送了响应报文。并根据 HttpResponse 的 CloseConnection 检查是否需要关闭连接。

        void OnOkHttpRequest(HttpRequestPtr const & req, HttpResponsePtr const & res) {
            res->SetStatusCode(HttpResponse::e200_OK);
            res->AppendContent("<h1>Hello</h1>");
        }
        int main() {
            PollLoopPtr loop = CreatePollLoop();
            HttpServer http(loop, 65530, 3);
            http.SetRequestCallBk(std::bind(&OnOkHttpRequest, _1, _2));
            loop->Loop(10);
            return 0;
        }

显然这个响应过程是比较粗糙的,存在很多改进空间,比如:

  1. 只检查了请求报文的 Connection 字段。实际请求报文的首部中还有很多描述,对于提高网络通信的可靠性和安全性上有很大帮助。 我们还可以根据具体的首部内容,了解到浏览器的一些特性。
  2. 这个例程中并没有区分请求类型,无论 GET 还是 POST,都返回一个 h1 的标签。
  3. 存在服务器没有响应的风险。因为目前的例程都是单线程的,通过 PollLoop 循环进行IO复用,在回调函数中完成各种事务。 这些事务最后都会归结到这里的应用回调函数中。如果该回调函数长时间不退出,服务器看起来就好像死了一样,最终影响服务器的并发性能。
  4. 发送报文时存在多次拷贝数据的操作。目前的例程中,我们先将整个响应报文拷贝到一个 vector 容器中,在交给 TCP 连接通道发送。

在本文中,我们重点关注问题3,增加响应 HTTP 请求的任务队列,并交由一个专门的线程完成具体的任务。

1. 分段处理请求

原本HttpServer接收到新的消息后,将在其 OnMessage 函数中完成请求报文的解析、响应报文的填充、响应报文的发送三个过程。其中响应报文的填充要调用应用程序注册的回调函数, 可能需要运行很长一段时间。我们可以将其放到一个独立的线程中运行,这样就不会影响 PollLoop 及时响应新的网络事件。

如下图所示,OnMessage 函数将被拆分成三段,放到两个线程中运行。为了方便,我们将响应报文的过程抽象成处理请求报文的任务,并定义一个任务队列来管理。 在一个线程中驱动 PollLoop循环,分发事件,解析Http请求报文,并根据报文的首部构建 Http 响应任务,放置到任务队列的对尾,后文中称该线程为 PollLoop 线程。 在第二个线程的超级循环中,消费任务队列,从队首中取出任务并执行,通知 PollLoop 线程,后文中称该线程为 TaskFifo 线程。最后由 PollLoop 线程负责发送响应报文。

下面左侧的代码片段是 HttpServer::OnMessage 中完成请求报文的解析后创建新任务的过程。首先新建了一个任务,并指定 HttpServer::HandleRequest 作为任务的处理函数。 由于两个线程都要用到 mTaskFifo,所以用信号量 mFifoMutex 对其加锁保护。放入任务队列 mTaskFifo 对尾之后,通过条件变量 mFifoCV 唤醒其它等待信号量 mFifoMutex 的线程。

if (HttpSession::eResponsing == ptr->mState) {
    HttpTaskPtr task(new HttpTask);
    task->SetTaskFunc(std::bind(&HttpServer::HandleRequest, this, con, ptr));

    std::unique_lock<std::mutex> lock(mFifoMutex);
    mTaskFifo.push_back(task);
    mFifoCV.notify_all();
} else if (HttpSession::eError == ptr->mState) {
    con->SendString("HTTP/1.1 400 Bad Request\r\n\r\n");
    con->Close();
}
void HttpServer::FinishTasks() {
    while (true) {
        std::unique_lock lock(mFifoMutex);
        while (mTaskFifo.empty())
            mFifoCV.wait(lock);

        HttpTaskPtr task = mTaskFifo.front();
        mTaskFifo.pop_front();
        task->Finish();
    }
}

上面右侧的函数是 TaskFifo 线程消费任务队列的超级循环。在该循环中,我们先通过信号量 mFifoMutex 和条件变量 mFifoCV 保护共有资源 mTaskFifo。 当任务队列非空,就取出队首的任务并完成它。下面的函数是修改之后的 HandleRequest,它本质上跟本文一开始提及的请求处理函数没有太大区别, 只是用第19行 mWakeUpper 唤醒 PollLoop 线程,替换原函数中第四部分直接发送消息的代码。

        void HttpServer::HandleRequest(ConnectionPtr const & con, HttpSessionPtr const & session) {
            HttpRequestPtr req = session->GetRequest();
            std::string con_key("Connection");
            std::string con_header;
            bool has_con = req->GetHeader(con_key, con_header);

            if (has_con) ToLower(con_header);
            HttpResponsePtr res = session->GetResponse();
            bool close = !has_con || (con_header != "keep-alive");
            res->SetClosing(close);
            res->SetStatusCode(HttpResponse::e503_ServiceUnavilable);

            if (mRequestCB) mRequestCB(req, res);
            session->mWakeUpper->WakeUp(4096);
        }

mWakeUpper是为了唤醒 PollLoop 线程之后,能够成功回到处理请求报文的过程中而定义的一个对象。 它是对eventfd的一个封装。 在建立新 Http 通信的时候构建,并绑定唤醒回调函数 HttpServer::HandleResponse。这个函数目前就是把原HandleRequest的第四部分搬到这里来了。

        void HttpServer::HandleReponse(ConnectionPtr const & con, HttpSessionPtr const & session) {
            HttpResponsePtr res = session->GetResponse();

            std::vector<uint8_t> buf;
            res->ToUint8Vector(buf);
            con->SendBytes(buf.data(), buf.size());
            if (res->CloseConnection())
                con->Close();

            session->Reset();
        }
        std::mutex mFifoMutex;    
        std::condition_variable mFifoCV;
        std::thread mTaskThread;
        std::deque<HttpTaskPtr> mTaskFifo;    

2. 任务队列

如右侧的代码片段所示,我们给 HttpServer 增加了四个成员变量。其中 mTaskFifo 就是用来记录待处理的任务队列,它是一个std::deque的容器。 PollLoop 线程负责生产任务,并将之放入对尾;TaskFifo 线程负责消费任务,从队首取出。这个生产消费关系一直在修改 mTaskFifo 的成员。 为了解决这一资源竞争的问题,我们增加了信号量 mFifoMutex,条件变量 mFifoCV。它们的使用方法,我们在前文分段处理请求报文的时候就已经介绍了。 变量 mTaskThread 是一个线程对象,在 HttpServer 的构造函数中构建,并通过如下的形式注册了线程函数 FinishTasks。

mTaskThread = std::thread(std::bind(&HttpServer::FinishTasks, this));

下面是类型 HttpTask 的实现,它只有一个 void() 类型的函数接口,我们可以通过std::bind绑定任务处理函数。借助std::bind我们还可以在注册的时候绑定一些参数。 接口也十分简单,只需要通过 SetTaskFunc 注册处理函数,之后就可以通过Finish接口或者重载的()操作符来运行之。

        class HttpTask {
    	    public:
    	        void operator () () { if (mFunction) mFunction(); }
    	        void Finish() { if (mFunction) mFunction(); }
    	    public:
    	        typedef std::function< void ()> TaskFunc;
    	        void SetTaskFunc(TaskFunc func) { mFunction = std::move(func); }
    	    private:
    	        TaskFunc mFunction;
    	};
    	typedef std::shared_ptr HttpTaskPtr;

3. 唤醒 PollLoop 线程

为了能够方便唤醒 PollLoop 线程找到 TCP 连接通道发送响应报文, 我们对eventfd进行了封装,下面左侧时该数据类型的定义。 它只有两个私有成员 mEventHandler 用于PollLoop监听唤醒事件,mFd则记录了eventfd的文件描述符。

右侧分别时类型 WakeUpper 的构造函数、注册回调函数的接口和可读事件的响应函数。在构造函数中,我们通过系统调用eventfd获得了一个文件描述符, 并据此实例化了 mEventHandler,注册通过 OnReadEvent 处理唤醒事件,并调用唤醒事件回调接口 mWakeUpCb。 应用程序通过调用接口 WakeUp 向mFd写入一个64位的数据触发mEventHandler的可读事件,达到唤醒 PollLoop 线程的效果。

class WakeUpper {
public:
    WakeUpper();
    WakeUpper(WakeUpper const &) = delete;
    WakeUpper & operator = (WakeUpper const &) = delete;
    ~WakeUpper() { close(mFd); }

    PollEventHandlerPtr & GetHandler() { return mEventHandler; }
    void WakeUp(uint64_t u) { write(mFd, &u, sizeof(u)); }

    typedef std::function<void()> EventCallBk;
    void SetWakeUpCallBk(EventCallBk cb);
    EventCallBk mWakeUpCb;

    void OnReadEvent();
private:
    PollEventHandlerPtr mEventHandler;
    int mFd;
};
WakeUpper::WakeUpper()
{
    mFd = eventfd(0, EFD_CLOEXEC);
    mEventHandler = PollEventHandlerPtr(new PollEventHandler(mFd));
    mEventHandler->EnableRead(true);
    mEventHandler->SetReadCallBk(std::bind(&WakeUpper::OnReadEvent,
                                    this));
}
void SetWakeUpCallBk(EventCallBk cb)
{
	mWakeUpCb = std::move(cb);
}
void WakeUpper::OnReadEvent() {
    uint64_t u;
    int nread = read(mFd, &u, sizeof(u));
    if (mWakeUpCb)
        mWakeUpCb();
}

4. 完

在本文中,为了防止因为处理请求报文的时间过长,导致服务器无法响应其它请求,影响服务器的并发性能。 我们把处理请求报文的任务拆分成了解析请求报文、填充响应报文、发送响应报文三个阶段,初步实现了一个任务队列专门用来填充响应报文。 由一个线程驱动 PollLoop 循环生成处理任务放入对尾,同时用另一个线程执行处理任务。

下面的几个函数是对本文一开始的例程中的 OnOkHttpRequest 的扩展,检查请求类型,针对 GET 请求,通过 HttpResponse 的 AppendContent 接口读取请求报文中URL指定的资源文件并填充到响应报文中。这样就可以通过浏览器加载网页了。

        std::string gRootPath("/home/gyc/tmp");
		void OnHeadRequest(HttpRequestPtr const & req, HttpResponsePtr const & res)
		{
		    res->SetStatusCode(HttpResponse::e200_OK);
		}
		
		void OnGetRequest(HttpRequestPtr const & req, HttpResponsePtr const & res)
		{
		    std::string urlpath = req->GetURLPath();
		    if ("/" == urlpath)
		        urlpath = "/index.html";
		
		    std::string path = gRootPath + urlpath;
		    std::cout << path << std::endl;
		
		    struct stat s;
		    if (-1 == stat(path.c_str(), &s)) {
		        res->SetStatusCode(HttpResponse::e404_NotFound);
		        res->AppendContent("Error:404");
		        return;
		    }
		
		    if (S_ISDIR(s.st_mode)) {
		        res->SetStatusCode(HttpResponse::e503_ServiceUnavilable);
		        return;
		    }
		
		    if (!S_ISREG(s.st_mode))
		        return;
		    res->SetStatusCode(HttpResponse::e200_OK);
		    res->AppendContent(path, 0, s.st_size);
		}
		
		
		void OnOkHttpRequest(HttpRequestPtr const & req, HttpResponsePtr const & res)
		{
		    switch (req->GetMethod()) {
		        case HttpRequest::eHEAD: {
		            OnHeadRequest(req, res);
		            break;
		        }
		        case HttpRequest::eGET: {
		            OnGetRequest(req, res);
		            break;
		        }
		        default: {
		            res->SetStatusCode(HttpResponse::e503_ServiceUnavilable);
		            break;
		        }
		    }
		}




Copyright @ 高乙超. All Rights Reserved. 京ICP备16033081号-1