但C10K问题,让我们意识到在超大数量的网络连接下,机器设备和网络速度不再是瓶颈,瓶颈在于操作系统和IO应用程序的沟通协作的方式。
举个例子,一万个socket连接过来,传统的IO编程模型要开万个线程来应对,还要注意,socket会关闭打开,一万个线程要不断的关闭线程重建线程,资源都浪费在这上面了,我们算建立一个线程耗1M内存,1万个线程机器至少要10G内存,这在IA-32的机器架构下基本是不可能的(要开PAE),现在x64架构才有可能舒服点,要知道,这仅仅是粗略算的内存消耗。别的资源呢?
所以,高性能的网络编程(即IO编程),第一,需要松绑IO连接和应用程序线程的对应关系,这就是非阻塞(nonblocking)、异步(asynchronous)的要求的由来(构造一个线程池,epoll监控到有数的fd,把fd传入线程池,由这些worker thread来读写io)。第二,需要高性能的OS对IO设备可读写(数据来了)的通知方式:从level-triggered notification到edge-triggered notification,关于这个通知方式,我们稍后谈。
需要注意异步,不等于AIO(asynchronous IO),Linux的AIO和java的AIO都是实现异步的一种方式,都是渣,这个我们也接下来会谈到。
针对前面说的这两点,我们看看select和poll的问题
这两个函数都在每次调用的时候要求我们把需要监控(看看有没有数据)的文件描述符,通过数组传递进入内核,内核每次都要扫描这些文件描述符,去理解它们,建立一个文件描述符和IO对应的数组(实际内核工作会有好点的实现方式,但可以这么理解先),以便IO来的时候,通知这些文件描述符,进而通知到进程里等待的这些select、poll。当有一万个文件描述符要监控的时候呢(一万个网络连接)?这个工作效率是很低的,资源要求却很高。
我们看epoll
epoll很巧妙,分为三个函数,第一个函数创建一个session类似的东西,第二函数告诉内核维持这个session,并把属于session内的fd传给内核,第三个函数epoll_wait是真正的监控多个文件描述符函数,只需要告诉内核,我在等待哪个session,而session内的fd,内核早就分析过了,不再在每次epoll调用的时候分析,这就节省了内核大部分工作。这样每次调用epoll,内核不再重新扫描fd数组,因为我们维持了session。
说道这里,只有一个字,开源,赞,众人拾柴火焰高,赞。
epoll的效率还不仅仅体现在这里,在内核通知方式上,也改进了,我们先看select和poll的通知方式,也就是level-triggered notification,内核在被DMA中断,捕获到IO设备来数据后,本来只需要查找这个数据属于哪个文件描述符,进而通知线程里等待的函数即可,但是,select和poll要求内核在通知阶段还要继续再扫描一次刚才所建立的内核fd和io对应的那个数组,因为应用程序可能没有真正去读上次通知有数据后的那些fd,应用程序上次没读,内核在这次select和poll调用的时候就得继续通知,这个os和应用程序的沟通方式效率是低下的。只是方便编程而已(可以不去读那个网络io,方正下次会继续通知)。
于是epoll设计了另外一种通知方式:edge-triggered notification,在这个模式下,io设备来了数据,就只通知这些io设备对应的fd,上次通知过的fd不再通知,内核不再扫描一大堆fd了。
基于以上分析,我们可以看到epoll是专门针对大网络并发连接下的os和应用沟通协作上的一个设计,在linux下编网络服务器,必然要采用这个,nginx、PHP的国产异步框架swool、varnish,都是采用这个。
注意还要打开epoll的edge-triggered notification。而java的NIO和NIO.2都只是用了epoll,没有打开edge-triggered notification,所以不如JBoss的Netty。
接下来我们谈谈AIO的问题,AIO希望的是,你select,poll,epoll都需要用一个函数去监控一大堆fd,那么我AIO不需要了,你把fd告诉内核,你应用程序无需等待,内核会通过信号等软中断告诉应用程序,数据来了,你直接读了,所以,用了AIO可以废弃select,poll,epoll。
但linux的AIO的实现方式是内核和应用共享一片内存区域,应用通过检测这个内存区域(避免调用nonblocking的read、write函数来测试是否来数据,因为即便调用nonblocking的read和write由于进程要切换用户态和内核态,仍旧效率不高)来得知fd是否有数据,可是检测内存区域毕竟不是实时的,你需要在线程里构造一个监控内存的循环,设置sleep,总的效率不如epoll这样的实时通知。所以,AIO是渣,适合低并发的IO操作。所以java7引入的NIO.2引入的AIO对高并发的网络IO设计程序来说,也是渣,只有Netty的epoll+edge-triggered notification最牛,能在linux让应用和OS取得最高效率的沟通。
socket接受线程:C语言为了高并发所以选择了epoll。当程序启动的时候(g_net_update.c文件中main函数,会启动一个thread见函数create_accept_task)这个thread就处理一件事情,只管接收客户端的连接,当有连接进来的时候 通过epoll_ctl函数,把socket fd 加入到epoll里面去,epoll设置监听事件EPOLLIN | EPOLLET主要是监听的是加入到epoll中的socket是否可读(因为我的需求是客户端连上了server就会马上向server发送一份数据的)。其它的部分在主线程中处理。
主线程:是一个无线循环,epoll_wait 函数相当于把客户端的连接从epoll中拿出来(因为我们监听的是EPOLLIN | EPOLLET)说明这个时候客户端有数据发送过来)。再通过recv_buffer_from_fd 函数把客户端发送过来的数据读出来。然后其他的一切就抛给线程池去处理。
线程池:(代码中我会在池里面创建15个线程) 双向链表。加入线程就是在链表后面加一个链表项,链表的前面会一个一个被拿出来处理。主要是malloc 函数free函数,sem_wait函数sem_post的处理(sem_wait 会阻塞当值大于0是会减一,sem_post是值加一)。typedef void* (FUNC)(void arg, int index)是我们自定义的线程的逻辑处理部分,arg是参数,index是第几个线程处理(我们隐形的给每个线程都标了号),例如代码中的respons_stb_info,更加具体可以看看代码里面是怎么实现的。聪明的你也可以改掉这块的内容改成动态线程池,当某个时刻的处理比较多的时候能够动态的增加线程,而不像我代码里面的是固定的。
数据库连接池:按照我的需求在处理客户端请求数据的时候是要访问数据库的。就是一下子创建出一堆的数据连接。要访问数据库的时候先去数据库连接池中找出空闲的连接,具体可以看下代码。使用的时候可以参考下database_process.c文件(代码中数据库连接池和线程池中的个数是一样的)。这里我想说下get_db_connect_from_pool这个函数,我用了随机数,我是为了不想每次都从0开始去判断哪个连接没有用到。为了数据库连接池中的每个链接都能等概率的使用到,具体的还是可以看下代码的实现。
#include <unistd.h>#include <sys/types.h> /* basic system data types */
#include <sys/socket.h> /* basic socket definitions */
#include <netinet/in.h> /* sockaddr_in{} and other Internet defns */
#include <arpa/inet.h> /* inet(3) functions */
#include <sys/epoll.h> /* epoll function */
#include <fcntl.h> /* nonblocking */
#include <sys/resource.h> /*setrlimit */
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#define MAXEPOLLSIZE 10000
#define MAXLINE 10240
int handle(int connfd)
int setnonblocking(int sockfd)
{
if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK) == -1) {
return -1
}
return 0
}
int main(int argc, char **argv)
{
int servPort = 6888
int listenq = 1024
int listenfd, connfd, kdpfd, nfds, n, nread, curfds,acceptCount = 0
struct sockaddr_in servaddr, cliaddr
socklen_t socklen = sizeof(struct sockaddr_in)
struct epoll_event ev
struct epoll_event events[MAXEPOLLSIZE]
struct rlimit rt
char buf[MAXLINE]
/* 设置每个进程允许打开的最大文件数 */
rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE
if (setrlimit(RLIMIT_NOFILE, &rt) == -1)
{
perror("setrlimit error")
return -1
}
bzero(&servaddr, sizeof(servaddr))
servaddr.sin_family = AF_INET
servaddr.sin_addr.s_addr = htonl(INADDR_ANY)
servaddr.sin_port = htons(servPort)
listenfd = socket(AF_INET, SOCK_STREAM, 0)
if (listenfd == -1) {
perror("can't create socket file")
return -1
}
int opt = 1
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))
if (setnonblocking(listenfd) < 0) {
perror("setnonblock error")
}
if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof(struct sockaddr)) == -1)
{
perror("bind error")
return -1
}
if (listen(listenfd, listenq) == -1)
{
perror("listen error")
return -1
}
/* 创建 epoll 句柄,把监听 socket 加入到 epoll 集合里 */
kdpfd = epoll_create(MAXEPOLLSIZE)
ev.events = EPOLLIN | EPOLLET
ev.data.fd = listenfd
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listenfd, &ev) < 0)
{
fprintf(stderr, "epoll set insertion error: fd=%d\n", listenfd)
return -1
}
curfds = 1
printf("epollserver startup,port %d, max connection is %d, backlog is %d\n", servPort, MAXEPOLLSIZE, listenq)
for () {
/* 等待有事件发生 */
nfds = epoll_wait(kdpfd, events, curfds, -1)
if (nfds == -1)
{
perror("epoll_wait")
continue
}
/* 处理所有事件 */
for (n = 0 n < nfds ++n)
{
if (events[n].data.fd == listenfd)
{
connfd = accept(listenfd, (struct sockaddr *)&cliaddr,&socklen)
if (connfd < 0)
{
perror("accept error")
continue
}
sprintf(buf, "accept form %s:%d\n", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port)
printf("%d:%s", ++acceptCount, buf)
if (curfds >= MAXEPOLLSIZE) {
fprintf(stderr, "too many connection, more than %d\n", MAXEPOLLSIZE)
close(connfd)
continue
}
if (setnonblocking(connfd) < 0) {
perror("setnonblocking error")
}
ev.events = EPOLLIN | EPOLLET
ev.data.fd = connfd
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, connfd, &ev) < 0)
{
fprintf(stderr, "add socket '%d' to epoll failed: %s\n", connfd, strerror(errno))
return -1
}
curfds++
continue
}
// 处理客户端请求
if (handle(events[n].data.fd) < 0) {
epoll_ctl(kdpfd, EPOLL_CTL_DEL, events[n].data.fd,&ev)
curfds--
}
}
}
close(listenfd)
return 0
}
int handle(int connfd) {
int nread
char buf[MAXLINE]
nread = read(connfd, buf, MAXLINE)//读取客户端socket流
if (nread == 0) {
printf("client close the connection\n")
close(connfd)
return -1
}
if (nread < 0) {
perror("read error")
close(connfd)
return -1
}
write(connfd, buf, nread)//响应客户端
return 0
}
epoll server端;
epoll client端
#include <unistd.h>#include <sys/types.h> /* basic system data types */
#include <sys/socket.h> /* basic socket definitions */
#include <netinet/in.h> /* sockaddr_in{} and other Internet defns */
#include <arpa/inet.h> /* inet(3) functions */
#include <netdb.h> /*gethostbyname function */
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#define MAXLINE 1024
void handle(int connfd)
int main(int argc, char **argv)
{
char * servInetAddr = "127.0.0.1"
int servPort = 6888
char buf[MAXLINE]
int connfd
struct sockaddr_in servaddr
if (argc == 2) {
servInetAddr = argv[1]
}
if (argc == 3) {
servInetAddr = argv[1]
servPort = atoi(argv[2])
}
if (argc > 3) {
printf("usage: echoclient <IPaddress> <Port>\n")
return -1
}
connfd = socket(AF_INET, SOCK_STREAM, 0)
//bzero(&servaddr, sizeof(servaddr))
servaddr.sin_family = AF_INET
servaddr.sin_port = htons(servPort)
//inet_pton(AF_INET, servInetAddr, &servaddr.sin_addr)
servaddr.sin_addr.s_addr = inet_addr(servInetAddr)
bzero(&(servaddr.sin_zero), 0)
if (connect(connfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) {
perror("connect error")
return -1
}
printf("welcome to echoclient\n")
handle(connfd) /* do it all */
close(connfd)
printf("exit\n")
exit(0)
}
void handle(int sockfd)
{
char sendline[MAXLINE], recvline[MAXLINE]
int n
for () {
if (fgets(sendline, MAXLINE, stdin) == NULL)
{
break//read eof
}
n = write(sockfd, sendline, strlen(sendline))
n = read(sockfd, recvline, MAXLINE)
if (n == 0) {
printf("echoclient: server terminated prematurely\n")
break
}
write(STDOUT_FILENO, recvline, n)
}
}
欢迎分享,转载请注明来源:夏雨云
评论列表(0条)