响应HTTP请求的任务队列
在上文中,我们一起分析了 HTTP 请求报文的解析过程。 当成功解析了一帧请求报文之后,将调用函数 HandleRequest 根据请求报文的具体内容生成响应报文, 下面是源码tag:v0.0.7b中该函数的实现,大体上可以分为四个部分:检查报文首部、 构建 HttpResponse 对象,调用应用回调函数,发送响应。
void HttpServer::HandleRequest(ConnectionPtr const & con, HttpRequestPtr const & req) {
// req->PrintHeaders();
// 1. 检查请求报文头部是否有 Connection 字段
std::string con_key("Connection");
std::string con_header;
bool has_con = req->GetHeader(con_key, con_header);
if (has_con) ToLower(con_header);
// 2. 判定是否需要保持 TCP 连接,据此构建 HttpResponse 对象
bool close = !has_con || (con_header != "keep-alive");
HttpResponsePtr res(new HttpResponse(close));
// 3. 设置响应报文默认状态码为 503,然后调用回调接口 mRequestCB
res->SetStatusCode(HttpResponse::e503_ServiceUnavilable);
if (mRequestCB)
mRequestCB(req, res);
// 4. 发送响应报文,如果需要关闭连接则闭之
std::vector<uint8_t> buf;
res->ToUint8Vector(buf);
con->SendBytes(buf.data(), buf.size());
if (res->CloseConnection())
con->Close();
}
我们在其中的第4到7行中,检查请求报文头部是否有 Connection 字段,如果该字段值为 "keep-alive" 表示需要保持 TCP 连接。 "keep-alive" 是告知服务器客户端希望保持当前正在通信的 TCP 连接,这是提高网络响应速度的一种方式。 我们知道 HTTP 协议是建立在 TCP 之上的,如果每次 HTTP 请求都需要新建一个 TCP 连接的话,就意味着需要频繁的进行握手。通过 "keep-alive" 保持一个长连接,可以避免这些操作, 达到提高速度和带宽的效果。 如果请求报文中配置了这一选项,那么服务器返回的响应报文就应当在首部中告知客户端正文的长度。否则,浏览器将一直等待接收消息,直到断开连接。
在第9、10行中,我们根据刚才检查的 "keep-alive" 配置构建新的 HttpResponse 对象。然后在12到14行中,设置响应报文的默认状态码,并调用应用回调函数。 在例程当中,参见下面的示例代码,我们注册了回调函数 OnOkHttpReques,并在该函数中修改了响应报文的状态码为200,并在正文中填充了一个 h1 标签。 最后,第16到20行中,我们通过 TCP 连接通道发送了响应报文。并根据 HttpResponse 的 CloseConnection 检查是否需要关闭连接。
void OnOkHttpRequest(HttpRequestPtr const & req, HttpResponsePtr const & res) {
res->SetStatusCode(HttpResponse::e200_OK);
res->AppendContent("<h1>Hello</h1>");
}
int main() {
PollLoopPtr loop = CreatePollLoop();
HttpServer http(loop, 65530, 3);
http.SetRequestCallBk(std::bind(&OnOkHttpRequest, _1, _2));
loop->Loop(10);
return 0;
}
显然这个响应过程是比较粗糙的,存在很多改进空间,比如:
- 只检查了请求报文的 Connection 字段。实际请求报文的首部中还有很多描述,对于提高网络通信的可靠性和安全性上有很大帮助。 我们还可以根据具体的首部内容,了解到浏览器的一些特性。
- 这个例程中并没有区分请求类型,无论 GET 还是 POST,都返回一个 h1 的标签。
- 存在服务器没有响应的风险。因为目前的例程都是单线程的,通过 PollLoop 循环进行IO复用,在回调函数中完成各种事务。 这些事务最后都会归结到这里的应用回调函数中。如果该回调函数长时间不退出,服务器看起来就好像死了一样,最终影响服务器的并发性能。
- 发送报文时存在多次拷贝数据的操作。目前的例程中,我们先将整个响应报文拷贝到一个 vector 容器中,在交给 TCP 连接通道发送。
在本文中,我们重点关注问题3,增加响应 HTTP 请求的任务队列,并交由一个专门的线程完成具体的任务。
1. 分段处理请求
原本HttpServer接收到新的消息后,将在其 OnMessage 函数中完成请求报文的解析、响应报文的填充、响应报文的发送三个过程。其中响应报文的填充要调用应用程序注册的回调函数, 可能需要运行很长一段时间。我们可以将其放到一个独立的线程中运行,这样就不会影响 PollLoop 及时响应新的网络事件。
如下图所示,OnMessage 函数将被拆分成三段,放到两个线程中运行。为了方便,我们将响应报文的过程抽象成处理请求报文的任务,并定义一个任务队列来管理。 在一个线程中驱动 PollLoop循环,分发事件,解析Http请求报文,并根据报文的首部构建 Http 响应任务,放置到任务队列的对尾,后文中称该线程为 PollLoop 线程。 在第二个线程的超级循环中,消费任务队列,从队首中取出任务并执行,通知 PollLoop 线程,后文中称该线程为 TaskFifo 线程。最后由 PollLoop 线程负责发送响应报文。
下面左侧的代码片段是 HttpServer::OnMessage 中完成请求报文的解析后创建新任务的过程。首先新建了一个任务,并指定 HttpServer::HandleRequest 作为任务的处理函数。 由于两个线程都要用到 mTaskFifo,所以用信号量 mFifoMutex 对其加锁保护。放入任务队列 mTaskFifo 对尾之后,通过条件变量 mFifoCV 唤醒其它等待信号量 mFifoMutex 的线程。
|
|
上面右侧的函数是 TaskFifo 线程消费任务队列的超级循环。在该循环中,我们先通过信号量 mFifoMutex 和条件变量 mFifoCV 保护共有资源 mTaskFifo。 当任务队列非空,就取出队首的任务并完成它。下面的函数是修改之后的 HandleRequest,它本质上跟本文一开始提及的请求处理函数没有太大区别, 只是用第19行 mWakeUpper 唤醒 PollLoop 线程,替换原函数中第四部分直接发送消息的代码。
void HttpServer::HandleRequest(ConnectionPtr const & con, HttpSessionPtr const & session) {
HttpRequestPtr req = session->GetRequest();
std::string con_key("Connection");
std::string con_header;
bool has_con = req->GetHeader(con_key, con_header);
if (has_con) ToLower(con_header);
HttpResponsePtr res = session->GetResponse();
bool close = !has_con || (con_header != "keep-alive");
res->SetClosing(close);
res->SetStatusCode(HttpResponse::e503_ServiceUnavilable);
if (mRequestCB) mRequestCB(req, res);
session->mWakeUpper->WakeUp(4096);
}
mWakeUpper是为了唤醒 PollLoop 线程之后,能够成功回到处理请求报文的过程中而定义的一个对象。 它是对eventfd的一个封装。 在建立新 Http 通信的时候构建,并绑定唤醒回调函数 HttpServer::HandleResponse。这个函数目前就是把原HandleRequest的第四部分搬到这里来了。
void HttpServer::HandleReponse(ConnectionPtr const & con, HttpSessionPtr const & session) {
HttpResponsePtr res = session->GetResponse();
std::vector<uint8_t> buf;
res->ToUint8Vector(buf);
con->SendBytes(buf.data(), buf.size());
if (res->CloseConnection())
con->Close();
session->Reset();
}
std::mutex mFifoMutex;
std::condition_variable mFifoCV;
std::thread mTaskThread;
std::deque<HttpTaskPtr> mTaskFifo;
2. 任务队列
如右侧的代码片段所示,我们给 HttpServer 增加了四个成员变量。其中 mTaskFifo 就是用来记录待处理的任务队列,它是一个std::deque的容器。 PollLoop 线程负责生产任务,并将之放入对尾;TaskFifo 线程负责消费任务,从队首取出。这个生产消费关系一直在修改 mTaskFifo 的成员。 为了解决这一资源竞争的问题,我们增加了信号量 mFifoMutex,条件变量 mFifoCV。它们的使用方法,我们在前文分段处理请求报文的时候就已经介绍了。 变量 mTaskThread 是一个线程对象,在 HttpServer 的构造函数中构建,并通过如下的形式注册了线程函数 FinishTasks。
mTaskThread = std::thread(std::bind(&HttpServer::FinishTasks, this));
下面是类型 HttpTask 的实现,它只有一个 void() 类型的函数接口,我们可以通过std::bind绑定任务处理函数。借助std::bind我们还可以在注册的时候绑定一些参数。 接口也十分简单,只需要通过 SetTaskFunc 注册处理函数,之后就可以通过Finish接口或者重载的()操作符来运行之。
class HttpTask {
public:
void operator () () { if (mFunction) mFunction(); }
void Finish() { if (mFunction) mFunction(); }
public:
typedef std::function< void ()> TaskFunc;
void SetTaskFunc(TaskFunc func) { mFunction = std::move(func); }
private:
TaskFunc mFunction;
};
typedef std::shared_ptr HttpTaskPtr;
3. 唤醒 PollLoop 线程
为了能够方便唤醒 PollLoop 线程找到 TCP 连接通道发送响应报文, 我们对eventfd进行了封装,下面左侧时该数据类型的定义。 它只有两个私有成员 mEventHandler 用于PollLoop监听唤醒事件,mFd则记录了eventfd的文件描述符。
右侧分别时类型 WakeUpper 的构造函数、注册回调函数的接口和可读事件的响应函数。在构造函数中,我们通过系统调用eventfd获得了一个文件描述符, 并据此实例化了 mEventHandler,注册通过 OnReadEvent 处理唤醒事件,并调用唤醒事件回调接口 mWakeUpCb。 应用程序通过调用接口 WakeUp 向mFd写入一个64位的数据触发mEventHandler的可读事件,达到唤醒 PollLoop 线程的效果。
|
|
4. 完
在本文中,为了防止因为处理请求报文的时间过长,导致服务器无法响应其它请求,影响服务器的并发性能。 我们把处理请求报文的任务拆分成了解析请求报文、填充响应报文、发送响应报文三个阶段,初步实现了一个任务队列专门用来填充响应报文。 由一个线程驱动 PollLoop 循环生成处理任务放入对尾,同时用另一个线程执行处理任务。
下面的几个函数是对本文一开始的例程中的 OnOkHttpRequest 的扩展,检查请求类型,针对 GET 请求,通过 HttpResponse 的 AppendContent 接口读取请求报文中URL指定的资源文件并填充到响应报文中。这样就可以通过浏览器加载网页了。
std::string gRootPath("/home/gyc/tmp");
void OnHeadRequest(HttpRequestPtr const & req, HttpResponsePtr const & res)
{
res->SetStatusCode(HttpResponse::e200_OK);
}
void OnGetRequest(HttpRequestPtr const & req, HttpResponsePtr const & res)
{
std::string urlpath = req->GetURLPath();
if ("/" == urlpath)
urlpath = "/index.html";
std::string path = gRootPath + urlpath;
std::cout << path << std::endl;
struct stat s;
if (-1 == stat(path.c_str(), &s)) {
res->SetStatusCode(HttpResponse::e404_NotFound);
res->AppendContent("Error:404");
return;
}
if (S_ISDIR(s.st_mode)) {
res->SetStatusCode(HttpResponse::e503_ServiceUnavilable);
return;
}
if (!S_ISREG(s.st_mode))
return;
res->SetStatusCode(HttpResponse::e200_OK);
res->AppendContent(path, 0, s.st_size);
}
void OnOkHttpRequest(HttpRequestPtr const & req, HttpResponsePtr const & res)
{
switch (req->GetMethod()) {
case HttpRequest::eHEAD: {
OnHeadRequest(req, res);
break;
}
case HttpRequest::eGET: {
OnGetRequest(req, res);
break;
}
default: {
res->SetStatusCode(HttpResponse::e503_ServiceUnavilable);
break;
}
}
}