网络编程2:Reactor网络模型
本章核心:
epoll
事件驱动编程,reactor模型,引入连接队列管理随并发量大量增加的连接。
简单介绍网络层级模型和应用层并实现一个简单的http server。
***********************************我是分割线***********************************
我们在上一章讨论了Linux系统的基础网络接口和select()/poll()两种高级I/O。不难发现,后两者还有一些可以优化的地方。例如,每次调用时操作系统都会把fd集合从用户态拷贝到内核态遍历,这种行为在大并发场景下也仍谈不上是效率最高的做法。有没有什么方法可以在利用内核的事件通知机制的同时,避免频繁的fd全集传参和遍历呢?有些读者或许会想到,避免全集传参和便利的一种优化方法为,将传递fd全集改为将关注的fd交给内核一直监听,内核也只返回有事件就绪的fd,而非将所有fd标记后让用户态程序再检查一遍。这种方法大大减少了每一轮循环中传递fd数量的规模,也同时避免了反复地遍历。epoll的原理便大致如此。Linux从2.5.44起提供了一个新的可扩展I/O事件通知机制epoll,让需要大量操作fd的程序得以发挥更优异的性能。Linux在该版本发布后逐渐成为服务器开发的绝对主流与epoll的出现关系密切。
2.1 epoll
与上一章介绍select()/poll()一样,还是从函数的行为开始说起。沿用邮差收信的比喻,有的邮筒存放了信件,有的邮筒什么都没有。如果说poll()的机制是邮差不知道每个邮筒的情况,需要一个一个检查的话,epoll()就是由小区物业负责信件把整个小区的信件收集到快递驿站,邮差直接在驿站统一拿走所有信件。下面是一个简单的示意图。
经过上述优化后,fd集合一直由内核态负责管理,因此用户态程序调用epoll()时fd集合也无需再从用户态拷贝到内核态;调用epoll()时操作系统也不再返回没有事件的I/O,用户态程序拿到的fd集合中的fd全部都是有事件的fd而无需遍历。有的读者可能会疑惑:诚然从数据结构的角度可以理解其性能提升的原因,然而在实际行为层面只是用户态程序无需再检查一遍fd集合而已,对效率提升帮助很大吗?epoll()面对大数量活跃I/O时,即使和select()相比实际效率也谈不上能有质的提升,其独特优势体现在面对**稀疏活跃,**也即大数量I/O连接但其中仅少数活跃的场景。这个问题会在后面实现epoll()的时候有详细解释。
下面是epoll版本的服务器实现:
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h> // for close()
#include <sys/epoll.h>
#define BUF_SIZE 1024
int main(){
int recvfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in recv_addr;
memset(&recv_addr, 0, sizeof(recv_addr));
recv_addr.sin_family = AF_INET;
recv_addr.sin_port = htons(2000);
recv_addr.sin_addr.s_addr = INADDR_ANY;
bind(recvfd, (struct sockaddr*)&recv_addr, sizeof(recv_addr));
listen(recvfd, 10);
struct sockaddr_in client_addr;
socklen_t clientlen = sizeof(client_addr);
int epfd = epoll_create(1);
struct epoll_event recv_ev;
recv_ev.events = EPOLLIN;
recv_ev.data.fd = recvfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, recvfd, &recv_ev);
while(1){
struct epoll_event events[BUF_SIZE] = {0};
int nready = epoll_wait(epfd, events, 1024, -1);
for (int i = 0; i < nready; ++i){
int connfd = events[i].data.fd;
if (connfd == recvfd){
int clientfd = accept(recvfd, (struct sockaddr*)&client_addr, &clientlen);
struct epoll_event client_ev;
client_ev.events = EPOLLIN;
client_ev.data.fd = clientfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &client_ev);
} else if (events[i].events & EPOLLIN){
char buffer[BUF_SIZE] = {0};
ssize_t n = recv(connfd, buffer, BUF_SIZE-1, 0);
if (n > 0){
buffer[n] = '\0';
printf("Received from client: %s\n", buffer);
const char *reply = "Message received\n";
send(connfd, reply, strlen(reply), 0);
} else if (n == 0) {
close(connfd);
epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);
continue;
}
}
}
}
}
如前所述,epoll的机制比较类似于小区门口的快递驿站,也因此需要先调用epoll_create()申请一个epoll文件描述符epfd,起到类似小区驿站的作用。可以感性地认为,之后所有关注的fd都是通过被添加进epfd这个“快递驿站”而实现关注的。申请epfd需要指定关注fd的总数量,然而由于如今epoll的底层实现已经不再是老版本的数组而是链表,这个参数也变得不再重要了。保留该参数主要是为兼顾老版本的代码。
除epoll_create()以外,总共仅两个接口和一个结构体与epoll有关。首先是用于控制,也即管理epfd中fd的epoll_ctl(),需要4个参数:
第一个参数为epfd。
第二个参数op表示行为。EPOLL_CTL_ADD代表将fd添加进epfd中,EPOLL_CTL_DEL代表从epfd删除该fd,EPOLL_CTL_MOD代表修改(详见下)。
第三和第四个参数分别为被操作的fd和对应的事件指针。为表示不同的事件,select()维护三个不同的位数组,每次调用会传递三个位数组的指针给操作系统;poll()将fd和对应关注的事件封装进一个结构体中,每次调用传递该结构体的数组;epoll则无需封装,直接将fd和event成对地一个一个添加进入epfd中。
然后是epoll中对应select()/poll()的调用epoll_wait(),它需要四个参数:
第一个参数为epfd。
第二个参数为一个空事件数组。由于关注的fd已经交给epfd管理,从而无需再像select()或poll()一样传递fd集合,仅需在调用时传递一个空数组,供操作系统写入就绪的事件。之后用户态程序仅需遍历该列表,顺序处理就绪事件。
第三个参数为最大文件描述符,起类似maxfd的作用。
第四个参数为超时控制timeout,不再赘述。
剩余的代码逻辑和select()/poll()的版本基本一样就不赘述了。仅需注意,事件结构体epoll_event.data.fd中明明设置了fd,为什么调用epoll_ctl()还需要传递一个fd?一个简单的回答是当操作为EPOLL_CTL_DEL时无需指定事件,因此为了保证接口形式的一致性所以需要传递fd;然而更根本的原因是结构体epoll_event中的epoll_event.data是用户态程序自定义,也是供用户态程序在处理返回结果时使用的信息。在不同的场景下,epoll_event.data可以有很多种不同的用法,其中的epoll_event.data.fd甚至不一定需要与fd一致,只不过恰好在上面的例子里是一致的而已。
2.2 面向事件编程和连接队列
现在我们有了epoll这一强大工具。可当我们尝试基于epoll去扩展我们的代码,实现面向更多场景的网络业务时,很容易发现当前的编程范式导致了诸多限制。如果我们要扩展更多的事件,例如写事件,异常等等,该怎么做呢?基于目前的代码结构,我们得判断每个fd的值和返回了什么事件,并在if/else控制流中编写代码块。一旦功能开始扩展,if/else将会变得极其复杂,代码也将变得难以阅读。如果用流程图表示,conn下将会延伸出很多,且逻辑复杂的分支。
不妨试试从事件的角度思考。目前每个fd仅有EPOLLIN和EPOLLOUT两种事件需要被处理,但相同的事件由于fd的不同,处理方法也不同。例如,recvfd针对EPOLLIN调用accept(),而connfd针对EPOLLIN调用recv()。对此,我们可以在每个fd创建之初就为它分配处理对应事件的函数,并用一个数据结构管理起来。每当事件触发,我们只需要利用该数据结构,调用fd对应的函数即可,而无需再关心到底具体是哪个fd+哪种事件的组合。这种方式还隔离了事件具体的处理逻辑和服务器运行的主逻辑,降低了思维难度。
我们先试着考虑一下recvfd会如何处理事件。它被EPOLLIN触发时需要负责接收连接,创建connfd。这些功能可以封装在下面的函数中:
int accept_cb(int fd){
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
return 0;
}
recvfd没有EPOLLOUT事件,无需实现对应的回调函数。而EPOLLIN和EPOLLOUT都有可能触发connfd,所以两者对应的回调函数都需要实现:
int recv_cb(int fd){
char buffer[BUF_SIZE] = {0};
ssize_t n = recv(connfd, buffer, BUF_SIZE-1, 0);
if (count == 0){
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
return 0;
}
recv();
return count;
}
int send_cb(int fd){
// prepare for the data to be sent
int count = send();
return count;
}
上面的三个函数各自对应了一种fd+一种事件的组合:accept_cb()——recvfd+EPOLLIN,recv_cb()——connfd+EPOLLIN,send_cb()——connfd+EPOLLOUT。这种对应关系仅仅只需要在创建fd的时候进行管理,将fd+event的组合与某一个callback匹配并封装在数据结构中即可。每当后面调用epoll(),拿到操作系统返回的事件列表时,我们只需要将事件列表中的每个fd取出,判断事件并调用对应的函数。
通过上图可以很清晰地看出新的编程范式的逻辑。我们不再需要像之前的范式那样在主循环中写非常复杂的if/else判断,而是只关心事件+调用回调函数。这种新的编程范式因其强调事件event和与事件匹配的函数,因而被称为event-driven programming事件驱动编程,而对应的事件处理函数被称做RCALLBACK回调函数。这种编程范式在如今的程序设计中被广泛使用,甚至在有些场景下必不可少。
说完了事件驱动编程,我们还需要关注如何管理新封装的函数。相关元素已被包含在上图。最左边的conn_list连接队列即为前面提到过用于管理fd+event组合的数据结构。定义如下:
#define CONNECTION_SIZE 1024
#define BUFFER_SIZE 1024
typedef int (*RCALLBACK)(int fd);
struct Connection{
int fd;
RCALLBACK in_callback;
RCALLBACK out_callback;
};
struct Connection conn[CONNECTION_SIZE] = {0};
它使用类型定义模拟了面向对象的写法,把函数定义在结构体内,从而将fd+事件统一封装。基于它,程序可以直接:
int main(){
// ......
conn[recvfd].fd = recvfd;
conn[recvfd].in_callback = accept_cb;
// ......
while (1){
// ......
for (){
// ......
if (recvfd & EPOLLIN){
conn[recvfd].in_callback(recvfd);
}
// ......
}
}
}
这也就是上面说的,仅需在创建connfd时指定其对应的回调函数。在后面处理事件时我们甚至无需关心它对应的回调函数是哪一个,直接调用即可。
如何扩展事件的问题解决了,现在到了第二个问题:如何为每个fd都分配一个生命周期超出while循环的缓冲区。可以发现这个问题随着连接队列的诞生已经迎刃而解了。我们只需要扩展conn_list,为每个fd加入读写缓冲区即可:
#define CONNECTION_SIZE 1024
#define BUFFER_SIZE 1024
typedef int (*RCALLBACK)(int fd);
struct Connection{
int fd;
char rbuffer[BUFFER_SIZE];
int rlength;
char wbuffer[BUFFER_SIZE];
int wlength;
RCALLBACK in_callback;
RCALLBACK out_callback;
};
struct Connection conn[CONNECTION_SIZE] = {0};
基于这一版新的连接队列,上面的三个回调函数也要相应更新。下面直接展示完整的代码,其中包含了更新后的三个回调函数,和对应之前编程范式里尚未封装的功能(例如启动recvfd开始监听,切换事件等等)所对应的函数。
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#define BUFFER_SIZE 1024
#define CONNECTION_SIZE 1024
int epfd;
typedef int (*RCALLBACK)(int fd);
struct Connection{
int fd;
char rbuffer[BUFFER_SIZE];
int rlength;
char wbuffer[BUFFER_SIZE];
int wlength;
RCALLBACK out_callback;
RCALLBACK in_callback;
};
struct Connection conn[CONNECTION_SIZE] = {0};
void set_event(int fd, int event, int flag){
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
if (flag){
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
} else {
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
int recv_cb(int fd){
memset(conn[fd].rbuffer, 0, BUFFER_SIZE);
int count = recv(fd, conn[fd].rbuffer, BUFFER_SIZE, 0);
if (count == 0){
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
return 0;
}
conn[fd].rlength = count;
printf("Received from client: %s\n", conn[fd].rbuffer);
set_event(fd, EPOLLOUT, 0);
return count;
}
int send_cb(int fd){
const char *reply = "Message received\n";
conn[fd].wlength = strlen(reply);
strncpy(conn[fd].wbuffer, reply, conn[fd].wlength-1);
conn[fd].wlength = '\0';
int count = send(fd, conn[fd].wbuffer, conn[fd].wlength, 0);
set_event(fd, EPOLLIN, 0);
return count;
}
int accept_cb(int fd){
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
conn[clientfd].fd = clientfd;
conn[clientfd].out_callback = send_cb;
conn[clientfd].in_callback = recv_cb;
memset(conn[clientfd].rbuffer, 0, BUFFER_SIZE);
conn[clientfd].rlength = 0;
memset(conn[clientfd].wbuffer, 0, BUFFER_SIZE);
conn[clientfd].wlength = 0;
set_event(clientfd, EPOLLIN, 1);
return 0;
}
int init_server(unsigned short port){
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons (port);
bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr));
listen(sockfd, 10);
return sockfd;
}
int main(){
unsigned short port = 2000;
epfd = epoll_create(1);
int sockfd = init_server(port);
conn[sockfd].fd = sockfd;
conn[sockfd].in_callback = accept_cb;
set_event(sockfd, EPOLLIN, 1);
while (1){
struct epoll_event events[1024] = {0};
int n_ready = epoll_wait(epfd, events, 1024, -1);
for (int i = 0; i < n_ready; ++i){
int connfd = events[i].data.fd;
if (events[i].events & EPOLLIN){
conn[connfd].in_callback(connfd);
}
if (events[i].events & EPOLLOUT){
conn[connfd].out_callback(connfd);
}
}
}
}
在新的实现中,fd的行为从由控制流决定变为由事件决定:针对不同的事件,调用不同的回调函数。如果在未来需要扩展代码处理更多的事件,只需要定义新的cb函数,调整有关系的回调函数即可。主逻辑在这种实现下无需调整,从而大大降低了维护难度。这种基于事件驱动编程架构被称为Reactor。
至此,我们基本讲完了如何组织大并发场景下的socket编程。现如今的服务器底层实现基本都是事件驱动+连接队列+epoll/poll,各种功能也都是基于这个架构扩展功能得到的。如果希望尝试通过练习熟悉这个架构,可以把前面select和poll的代码都重新组织为一个Reactor。
2.3 网络分层模型和Http服务器
在继续工程问题的讨论之前,在这里有必要先介绍一点网络分层模型的内容。
还是沿用信件的比喻,但这次让我们来谈谈信件的内容。自然语言作为交流的统一标准存在于人类的交流之间。如果用中文给一个美国人写信,那么对方肯定看不懂,除非对方懂中文。同样的,客户端向服务器请求的格式和服务器针对该请求响应的格式需要事先约定好,使得双方都可以成功且高效地解析。这种计算机相互通信所遵循的规范即为协议。如下图所示,网络通信中存在针对不同场景的各种协议,并被划分进入了不同的层级。
网络的每一层级都有独属于该层的协议,不同层级的协议都服务于所属层级的功能,也因而有不同的组织和解析方式,表达的不同的含义。在操作系统中,上一层的协议会作为下一层的数据段,从上到下一层一层往外套,最终作为一个整体被发送出去。这个数据包在传输过程中会被经过的节点一层层往里解析到需要的那一层,再重新封装为一个完整的数据包发往下一个节点,直至最终到达目标主机。每一层协议的解析都是独立的,一个层级的协议格式错误对于其下的层级不会造成任何影响,就好像无论信件里写什么都不是邮局罢工的理由一样。上层的信息对于更低的层级来说不具备任何“语义”上的意义,都只是需要传输的数据罢了。例如当客户端组织了一段格式错误的请求,使用socket发送时不会产生任何错误,直到服务器接收到发现无法解析时才会将其识别为错误的HTTP协议,并回复能告知客户端“请求错误”的响应。但这对于底层的各种协议来说完全无所谓。
目前为止,我们所有的数据都是通过socket发送出去的,位于最顶层的应用层。如果在其中约定某种格式用于通信就属于应用层协议。例如,可以约定客户端通过socket发送如下数据给客户端,代表向客户端请求网页:
GET / HTTP/1.1
Host: www.google.com
服务器可能如此响应上述请求,同样通过socket回发给客户端网页文件:
HTTP/1.1 200 OK
Content-Length: 3059
Server: GWS/2.0
Date: Sat, 11 Jan 2003 02:44:04 GMT
Content-Type: text/html
Cache-control: private
Set-Cookie: PREF=ID=73d4aef52e57bae9:TM=1042253044:LM=1042253044:S=SMCc_HRPCQiqy
X9j; expires=Sun, 17-Jan-2038 19:14:07 GMT; path=/; domain=.google.com
Connection: keep-alive
// HTML
......
事实上,这就是浏览器和服务器之间通信的HTTP协议。在上面展示的请求中,客户端以GET开头,用于向指定的资源发出“显示”请求。这个字段可以更改。例如POST代表向指定资源提交数据,请求服务器进行处理。此外还有HEAD,PUT,DELETE等等不同的字段。在响应的开头有一个数字200,它代表请求被服务器理解,接收。这个字段同样可以被其他代表不同的意义的值替换,例如每个人都见过的404 NOT FOUND。4开头的响应码代表请求有错误,或者该请求服务器无法执行。尽管只介绍了HTTP协议众多字段中的两个,但应该已足够说明通信双方如何依赖应用层协议,乃至任何协议通信。协议规定了格式,指定了在什么位置的字段能有哪些值,又有什么含义,收发双方则依赖值的意义组织行为。在后面讲到其它层级的协议时你会发现,所有协议的工作方式都类似,仅仅是结构和字段的含义因需要实现的功能不同而不同罢了。
当某一类应用会被市面上多个不同厂商共同实现,那就需要一套大家都承认的协议,上面展示的网页浏览器和HTTP协议便是。然而由于应用层因应用程序种类繁多、需求各异,设计独属于自己app的协议也十分常见。每款应用程序都可以使用原创的协议而不必遵守任何规范,只要客户端和服务器能基于这款协议实现期望的功能即可。说到底,应用层协议只是“信的具体内容到底写什么”,双方都能看懂即可。
为具体展示HTTP协议如何工作,下面将基于3.1中的Reactor实现一个收发HTTP协议的服务器,也就是一个HTTP Server。由于Reactor已经实现了很好的封装,我们在这里仅需将收发的数据从展示用的“Hello!”更改为HTTP协议,并相应更改recv_cb()和send_cb()即可。它们不再直接调用recv()或send(),而是调用处理HTTP协议的函数,如下:
int recv_cb(int fd){
memset(conn[fd].rbuffer, 0, BUFFER_SIZE);
int count = recv(fd, conn[fd].rbuffer, BUFFER_SIZE, 0);
if (count == 0){
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
return 0;
}
conn[fd].rlength = count;
http_request(&conn[fd]);
set_event(fd, EPOLLOUT, 0);
return count;
}
int send_cb(int fd){
http_response(&conn[fd]);
int count = send(fd, conn[fd].wbuffer, conn[fd].wlength, 0);
set_event(fd, EPOLLIN, 0);
return count;
}
在这个函数逻辑下,http_request()负责处理收取到的请求,http_response()负责组织响应并填入缓冲区。实现如下:
#include <stdio.h>
#include "http.h"
int http_request(struct Connection *c){
printf("request: %s\n", c->rbuffer);
}
int http_response(struct Connection *c){
c->wlength = sprintf(c->wbuffer,
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/html; charset=UTF-8\r\n"
"Content-Length: 63\r\n"
"hello: world\r\n"
"\r\n"
"<!DOCTYPE html><html><body><p>Hello, world!</p></body></html>\r\n"
);
return c->wlength;
}
在这里我们只将请求打印,并组织确定的响应。在实际的HTTP Server中,在收到请求和返回响应之间还涉及组织SQL语句,查找资源等环节。
最后还要组织如下头文件:
#ifndef _HTTP_H_
#define _HTTP_H_
#define BUFFER_SIZE 1024
typedef int (*RCALLBACK)(int fd);
struct Connection{
int fd;
int status;
char rbuffer[BUFFER_SIZE];
int rlength;
char wbuffer[BUFFER_SIZE];
int wlength;
RCALLBACK out_callback;
RCALLBACK in_callback;
};
int http_request(struct Connection *c);
int http_response(struct Connection *c);
#endif
现在让我们尝试一下编译,运行,并用浏览器访问我们自己实现的服务器。启动服务器后在浏览器键入你运行该服务器的ip地址和端口2000,你将能看到下面的内容:
最后考虑一种情况。浏览器渲染网页的资源通常包括多种不同的文件:文本,图片,音视频等等……这多个文件的回发仅仅对应对该网页的一次请求。在上面的实现中,clientfd的关注事件在调用一次send_cb()后会被设置为EPOLLIN,但多个文件的回发意味着I/O需要连续处理多次EPOLLOUT事件,调用多次send_cb()。目前的代码结构无法实现这种控制,为解决这个问题,我们继续扩展连接队列,引入一个状态量。
struct Connection{
int fd;
int status;
char rbuffer[BUFFER_SIZE];
int rlength;
char wbuffer[BUFFER_SIZE];
int wlength;
RCALLBACK out_callback;
RCALLBACK in_callback;
};
例如,让status为0代表发送完毕,为1代表未完成。在这种设置下,多次回发的控制仅需在http_response()和send_cb()中对该参数进行判断即可实现(这实际上实现了一个有限状态机)。当然,这个量代表几个状态,分别用什么数字代表,具体如何处理逻辑……这些也是应用层协议如何设计的问题,在这里就不赘述了。
2.4 总结
本章介绍的架构在如今的服务器底层中非常常见,请务必理解透彻并上手实践。为帮助深化理解,并同时结合实践,下一章将以一个更具体的例子来展示epoll和Reactor的使用方式。我们将以Reactor为底层的网络框架,开发一个kv存储网络中间件。