高性能网络服务器编程:为什么linux下epoll

高性能网络服务器编程:为什么linux下epoll,第1张

基本的IO编程过程(包括网络IO和文件IO)是,打开文件描述符(windows是handler,Java是stream或channel),多路捕获(Multiplexe,即select和poll和epoll)IO可读写的状态,而后可以读写的文件描述符进行IO读写,由于IO设备速度和CPU内存比速度会慢,为了更好的利用CPU和内存,会开多线程,每个线程读写一个文件描述符。

但C10K问题,让我们意识到在超大数量的网络连接下,机器设备和网络速度不再是瓶颈,瓶颈在于操作系统和IO应用程序的沟通协作的方式。

举个例子,一万个socket连接过来,传统的IO编程模型要开万个线程来应对,还要注意,socket会关闭打开,一万个线程要不断的关闭线程重建线程,资源都浪费在这上面了,我们算建立一个线程耗1M内存,1万个线程机器至少要10G内存,这在IA-32的机器架构下基本是不可能的(要开PAE),现在x64架构才有可能舒服点,要知道,这仅仅是粗略算的内存消耗。别的资源呢?

所以,高性能的网络编程(即IO编程),第一,需要松绑IO连接和应用程序线程的对应关系,这就是非阻塞(nonblocking)、异步(asynchronous)的要求的由来(构造一个线程池,epoll监控到有数的fd,把fd传入线程池,由这些worker thread来读写io)。第二,需要高性能的OS对IO设备可读写(数据来了)的通知方式:从level-triggered notification到edge-triggered notification,关于这个通知方式,我们稍后谈。

需要注意异步,不等于AIO(asynchronous IO),Linux的AIO和java的AIO都是实现异步的一种方式,都是渣,这个我们也接下来会谈到。

针对前面说的这两点,我们看看select和poll的问题

这两个函数都在每次调用的时候要求我们把需要监控(看看有没有数据)的文件描述符,通过数组传递进入内核,内核每次都要扫描这些文件描述符,去理解它们,建立一个文件描述符和IO对应的数组(实际内核工作会有好点的实现方式,但可以这么理解先),以便IO来的时候,通知这些文件描述符,进而通知到进程里等待的这些select、poll。当有一万个文件描述符要监控的时候呢(一万个网络连接)?这个工作效率是很低的,资源要求却很高。

我们看epoll

epoll很巧妙,分为三个函数,第一个函数创建一个session类似的东西,第二函数告诉内核维持这个session,并把属于session内的fd传给内核,第三个函数epoll_wait是真正的监控多个文件描述符函数,只需要告诉内核,我在等待哪个session,而session内的fd,内核早就分析过了,不再在每次epoll调用的时候分析,这就节省了内核大部分工作。这样每次调用epoll,内核不再重新扫描fd数组,因为我们维持了session。

说道这里,只有一个字,开源,赞,众人拾柴火焰高,赞。

epoll的效率还不仅仅体现在这里,在内核通知方式上,也改进了,我们先看select和poll的通知方式,也就是level-triggered notification,内核在被DMA中断,捕获到IO设备来数据后,本来只需要查找这个数据属于哪个文件描述符,进而通知线程里等待的函数即可,但是,select和poll要求内核在通知阶段还要继续再扫描一次刚才所建立的内核fd和io对应的那个数组,因为应用程序可能没有真正去读上次通知有数据后的那些fd,应用程序上次没读,内核在这次select和poll调用的时候就得继续通知,这个os和应用程序的沟通方式效率是低下的。只是方便编程而已(可以不去读那个网络io,方正下次会继续通知)。

于是epoll设计了另外一种通知方式:edge-triggered notification,在这个模式下,io设备来了数据,就只通知这些io设备对应的fd,上次通知过的fd不再通知,内核不再扫描一大堆fd了。

基于以上分析,我们可以看到epoll是专门针对大网络并发连接下的os和应用沟通协作上的一个设计,在linux下编网络服务器,必然要采用这个,nginx、PHP的国产异步框架swool、varnish,都是采用这个。

注意还要打开epoll的edge-triggered notification。而java的NIO和NIO.2都只是用了epoll,没有打开edge-triggered notification,所以不如JBoss的Netty。

接下来我们谈谈AIO的问题,AIO希望的是,你select,poll,epoll都需要用一个函数去监控一大堆fd,那么我AIO不需要了,你把fd告诉内核,你应用程序无需等待,内核会通过信号等软中断告诉应用程序,数据来了,你直接读了,所以,用了AIO可以废弃select,poll,epoll。

但linux的AIO的实现方式是内核和应用共享一片内存区域,应用通过检测这个内存区域(避免调用nonblocking的read、write函数来测试是否来数据,因为即便调用nonblocking的read和write由于进程要切换用户态和内核态,仍旧效率不高)来得知fd是否有数据,可是检测内存区域毕竟不是实时的,你需要在线程里构造一个监控内存的循环,设置sleep,总的效率不如epoll这样的实时通知。所以,AIO是渣,适合低并发的IO操作。所以java7引入的NIO.2引入的AIO对高并发的网络IO设计程序来说,也是渣,只有Netty的epoll+edge-triggered notification最牛,能在linux让应用和OS取得最高效率的沟通。

socket接受线程:C语言为了高并发所以选择了epoll。当程序启动的时候(g_net_update.c文件中main函数,会启动一个thread见函数create_accept_task)这个thread就处理一件事情,只管接收客户端的连接,当有连接进来的时候 通过epoll_ctl函数,把socket fd 加入到epoll里面去,epoll设置监听事件EPOLLIN | EPOLLET主要是监听的是加入到epoll中的socket是否可读(因为我的需求是客户端连上了server就会马上向server发送一份数据的)。其它的部分在主线程中处理。

主线程:是一个无线循环,epoll_wait 函数相当于把客户端的连接从epoll中拿出来(因为我们监听的是EPOLLIN | EPOLLET)说明这个时候客户端有数据发送过来)。再通过recv_buffer_from_fd 函数把客户端发送过来的数据读出来。然后其他的一切就抛给线程池去处理。

线程池:(代码中我会在池里面创建15个线程) 双向链表。加入线程就是在链表后面加一个链表项,链表的前面会一个一个被拿出来处理。主要是malloc 函数free函数,sem_wait函数sem_post的处理(sem_wait 会阻塞当值大于0是会减一,sem_post是值加一)。typedef void* (FUNC)(void arg, int index)是我们自定义的线程的逻辑处理部分,arg是参数,index是第几个线程处理(我们隐形的给每个线程都标了号),例如代码中的respons_stb_info,更加具体可以看看代码里面是怎么实现的。聪明的你也可以改掉这块的内容改成动态线程池,当某个时刻的处理比较多的时候能够动态的增加线程,而不像我代码里面的是固定的。

数据库连接池:按照我的需求在处理客户端请求数据的时候是要访问数据库的。就是一下子创建出一堆的数据连接。要访问数据库的时候先去数据库连接池中找出空闲的连接,具体可以看下代码。使用的时候可以参考下database_process.c文件(代码中数据库连接池和线程池中的个数是一样的)。这里我想说下get_db_connect_from_pool这个函数,我用了随机数,我是为了不想每次都从0开始去判断哪个连接没有用到。为了数据库连接池中的每个链接都能等概率的使用到,具体的还是可以看下代码的实现。

#include  <unistd.h>

#include  <sys/types.h>       /* basic system data types */

#include  <sys/socket.h>      /* basic socket definitions */

#include  <netinet/in.h>      /* sockaddr_in{} and other Internet defns */

#include  <arpa/inet.h>       /* inet(3) functions */

#include  <sys/epoll.h> /* epoll function */

#include  <fcntl.h>     /* nonblocking */

#include  <sys/resource.h> /*setrlimit */

#include <stdlib.h>

#include <errno.h>

#include <stdio.h>

#include <string.h>

#define MAXEPOLLSIZE 10000

#define MAXLINE 10240

int handle(int connfd)

int setnonblocking(int sockfd)

{

    if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK) == -1) {

        return -1

    }

    return 0

}

int main(int argc, char **argv)

{

    int  servPort = 6888

    int listenq = 1024

    int listenfd, connfd, kdpfd, nfds, n, nread, curfds,acceptCount = 0

    struct sockaddr_in servaddr, cliaddr

    socklen_t socklen = sizeof(struct sockaddr_in)

    struct epoll_event ev

    struct epoll_event events[MAXEPOLLSIZE]

    struct rlimit rt

    char buf[MAXLINE]

    /* 设置每个进程允许打开的最大文件数 */

    rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE

    if (setrlimit(RLIMIT_NOFILE, &rt) == -1)

    {

        perror("setrlimit error")

        return -1

    }

    bzero(&servaddr, sizeof(servaddr))

    servaddr.sin_family = AF_INET

    servaddr.sin_addr.s_addr = htonl(INADDR_ANY)

    servaddr.sin_port = htons(servPort)

    listenfd = socket(AF_INET, SOCK_STREAM, 0)

    if (listenfd == -1) {

      perror("can't create socket file")

        return -1

    }

    int opt = 1

    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))

    if (setnonblocking(listenfd) < 0) {

        perror("setnonblock error")

    }

    if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof(struct sockaddr)) == -1)

    {

        perror("bind error")

        return -1

    }

    if (listen(listenfd, listenq) == -1)

    {

        perror("listen error")

        return -1

    }

    /* 创建 epoll 句柄,把监听 socket 加入到 epoll 集合里 */

    kdpfd = epoll_create(MAXEPOLLSIZE)

    ev.events = EPOLLIN | EPOLLET

    ev.data.fd = listenfd

    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listenfd, &ev) < 0)

    {

        fprintf(stderr, "epoll set insertion error: fd=%d\n", listenfd)

        return -1       

     }

    curfds = 1

    printf("epollserver startup,port %d, max connection is %d, backlog is %d\n", servPort, MAXEPOLLSIZE, listenq)

    for () {

        /* 等待有事件发生 */

        nfds = epoll_wait(kdpfd, events, curfds, -1)

        if (nfds == -1)

        {

            perror("epoll_wait")

            continue

        }

        /* 处理所有事件 */

        for (n = 0 n < nfds ++n)

        {

            if (events[n].data.fd == listenfd)

            {

                connfd = accept(listenfd, (struct sockaddr *)&cliaddr,&socklen)

                if (connfd < 0)

                {

                    perror("accept error")

                    continue

                }

                sprintf(buf, "accept form %s:%d\n", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port)

                printf("%d:%s", ++acceptCount, buf)    

               if (curfds >= MAXEPOLLSIZE) {

                    fprintf(stderr, "too many connection, more than %d\n", MAXEPOLLSIZE)

                    close(connfd)

                    continue

                }

                if (setnonblocking(connfd) < 0) {

                    perror("setnonblocking error")

                }

                ev.events = EPOLLIN | EPOLLET

                ev.data.fd = connfd

                if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, connfd, &ev) < 0)

                {

                    fprintf(stderr, "add socket '%d' to epoll failed: %s\n", connfd, strerror(errno))

                    return -1

                }

                curfds++

                continue

            }

            // 处理客户端请求

            if (handle(events[n].data.fd) < 0) {

                epoll_ctl(kdpfd, EPOLL_CTL_DEL, events[n].data.fd,&ev)

                curfds--

            }

        }  

      }

    close(listenfd)

    return 0

}

int handle(int connfd) {

    int nread

    char buf[MAXLINE]

    nread = read(connfd, buf, MAXLINE)//读取客户端socket流

    if (nread == 0) {

        printf("client close the connection\n")

        close(connfd)

        return -1

    }

    if (nread < 0) {

        perror("read error")

        close(connfd)

        return -1

    }

    write(connfd, buf, nread)//响应客户端  

    return 0

}

epoll   server端;

epoll   client端

#include  <unistd.h>

#include  <sys/types.h>       /* basic system data types */

#include  <sys/socket.h>      /* basic socket definitions */

#include  <netinet/in.h>      /* sockaddr_in{} and other Internet defns */

#include  <arpa/inet.h>       /* inet(3) functions */

#include <netdb.h> /*gethostbyname function */

#include <stdlib.h>

#include <errno.h>

#include <stdio.h>

#include <string.h>

#define MAXLINE 1024

void handle(int connfd)

int main(int argc, char **argv)

{

    char * servInetAddr = "127.0.0.1"

    int servPort = 6888

    char buf[MAXLINE]

    int connfd

    struct sockaddr_in servaddr

    if (argc == 2) {

        servInetAddr = argv[1]

    }

    if (argc == 3) {

        servInetAddr = argv[1]

        servPort = atoi(argv[2])

    }

    if (argc > 3) {

        printf("usage: echoclient <IPaddress> <Port>\n")

         return -1

    }

    connfd = socket(AF_INET, SOCK_STREAM, 0)

    //bzero(&servaddr, sizeof(servaddr))

    servaddr.sin_family = AF_INET

    servaddr.sin_port = htons(servPort)

    //inet_pton(AF_INET, servInetAddr, &servaddr.sin_addr)

    servaddr.sin_addr.s_addr = inet_addr(servInetAddr)

    bzero(&(servaddr.sin_zero), 0)

    if (connect(connfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) {

        perror("connect error")

        return -1

    }

    printf("welcome to echoclient\n")

    handle(connfd)     /* do it all */

    close(connfd)

    printf("exit\n")

    exit(0)

}

void handle(int sockfd)

{

    char sendline[MAXLINE], recvline[MAXLINE]

    int n

    for () {

        if (fgets(sendline, MAXLINE, stdin) == NULL)

        {

            break//read eof

        }

      

        n = write(sockfd, sendline, strlen(sendline))

        n = read(sockfd, recvline, MAXLINE)

        if (n == 0) {

            printf("echoclient: server terminated prematurely\n")

            break

        }

        write(STDOUT_FILENO, recvline, n)

    }

}


欢迎分享,转载请注明来源:夏雨云

原文地址:https://www.xiayuyun.com/zonghe/193915.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-03-31
下一篇2023-03-31

发表评论

登录后才能评论

评论列表(0条)

    保存