并发请求处理
多线程实现的必要性
当一个客户端与服务器建立连接以后,服务器端
accept()返回,进而准备循环接收客户端发过来的数据。如果客户端暂时没发数据,服务端会在
recv()处阻塞。此时,其他客户端向服务器发起连接后,由于服务器阻塞了,无法执行
accept()接受连接,也就是其他客户端发送的数据,服务器无法读取。服务器也就无法并发同时处理多个客户端。
服务器如何实现同时处理多个客户端的同时通信:有如下几种方案:多线程,多进程,select,poll,epoll,以及使用epoll+reactor
多线程
采用pthread多线程来分别处理单个连接,实现多个客户端连接的同时处理
1 2 3 4 5 6 7 8 9 10 11 while (1 ) { struct sockaddr_in client ; socklen_t len = sizeof (client); if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1 ) { printf ("accept socket error: %s(errno: %d)\n" , strerror(errno), errno); return 0 ; } pthread_t threadid; pthread_create(&threadid, NULL , client_routine, (void *)&connfd); }
accept以后创建线程处理响应。服务端接受一个客户端的连接后,创建一个线程(或者进程),然后在新创建的线程或进程中循环处理数据。主线程(父进程)只负责监听客户端的连接,并使用
accept()接受连接,不进行数据的处理。
多线程
实验代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 #include <errno.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <sys/types.h> #include <unistd.h> #include <pthread.h> #define MAXLNE 4096 #define POLL_SIZE 1024 void *client_routine (void *arg) { int connfd = *(int *)arg; char buff[MAXLNE]; while (1 ) { int n = recv(connfd, buff, MAXLNE, 0 ); if (n > 0 ) { buff[n] = '\0' ; printf ("recv msg from client: %s\n" , buff); send(connfd, buff, n, 0 ); } else if (n == 0 ) { close(connfd); break ; } } return NULL ; } int main (int argc, char **argv) { int listenfd, connfd, n; struct sockaddr_in servaddr ; char buff[MAXLNE]; if ((listenfd = socket(AF_INET, SOCK_STREAM, 0 )) == -1 ) { printf ("create socket error: %s(errno: %d)\n" , strerror(errno), errno); return 0 ; } memset (&servaddr, 0 , sizeof (servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(10000 ); if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof (servaddr)) == -1 ) { printf ("bind socket error: %s(errno: %d)\n" , strerror(errno), errno); return 0 ; } if (listen(listenfd, 10 ) == -1 ) { printf ("listen socket error: %s(errno: %d)\n" , strerror(errno), errno); return 0 ; } while (1 ) { struct sockaddr_in client ; socklen_t len = sizeof (client); if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1 ) { printf ("accept socket error: %s(errno: %d)\n" , strerror(errno), errno); return 0 ; } pthread_t threadid; pthread_create(&threadid, NULL , client_routine, (void *)&connfd); } return 0 ; }
g++ pthreadserver.cpp -pthread
这种方法的不足:
但是每次连接生成创建一个线程,消耗内存会很大。另外进行IO操作时,例如发生
IO 操作 read 时,它会经历两个阶段:
等待数据准备就绪
将数据从内核拷贝到进程或者线程中 因此当使用默认的阻塞套接字时,由于1
个线程捆绑处理 1
个连接,这两个阶段合而为一,这样操作套接字的代码所在的线程就得睡眠来等待消息准备好,这导致了高并发下线程会频繁的睡眠、唤醒,从而影响了
CPU 的使用效率。
select
原理
select:将文件描述符放入一个集合中,调用select时,将这个集合从用户空间拷贝到内核空间(缺点:每次都要复制,开销大 ),由内核根据就绪状态修改该集合的内容。一个多线程编程很麻烦又容易出错,二是如果连接有几千个的话,线程间切换的开销确实是很大。能能够在一个线程里就实现这个效果?
这个又叫做非阻塞IO多路复用,就是进程或线程执行此函数时不必非要等待事件的发生,一旦执行肯定返回,以返回值的不同来反映函数的执行情况,如果事件发生则与阻塞方式相同,若事件没有发生则返回一个代码来告知事件未发生,而进程或线程继续执行,所以效率较高。
select
用户程序访问不了内核空间,调用 select 会把所有要管理的 socket 的 fd
(文件描述符,Linux下皆为文件,简单理解就是通过 fd 能找到这个
socket)传到内核中。此时,要遍历所有
socket,看看是否有感兴趣的事件发生。如果没有一个 socket 有事件发生,那么
select 的线程就需要让出 cpu
阻塞等待,这个等待可以是不设置超时时间的死等,也可以是设置 timeout
的有超时时间的等待。
假设此时客户端发送了数据,网卡接收到的数据塞到对应的 socket
的接收队列中,此时 socket 知道来数据了,那如何唤醒 select 呢?
其实每个 socket 有个属于自己的睡眠队列,select
会安排一个内应,即在被管理的 socket 的睡眠队列里面塞入一个 entry。当
socket 接收到网卡的数据后,就会去它的睡眠队列里遍历 entry,调用 entry
设置的 callback 方法,这个 callback 方法里就能唤醒 select !
所以 select 在每个被它管理的 socket 的睡眠队列里都塞入一个与它相关的
entry,这样不论哪个 socket 来数据了,它立马就能被唤醒然后干活!
但是,select 的实现不太好,因为唤醒的 select
此时只知道来活了,并不知道具体是哪个 socket
来数据了,所以只能傻傻地遍历所有 socket ,看看到底是哪个 scoket
来活了,然后把所有来活的 socket
封装成事件返回。这样用户程序就能获得发生的事件,然后进行 I/O
和业务处理了。
select2
说到select的IO多路复用就不得不提fd_set这个变量类型,首先我们打开Linux的fd_set数据结构的源码我们可以看到,就是一个长度为32的long
int类型的数组(要注意,windows的源码和Linux的不一样)。每一位可以代表一个文件描述符(位图),所以fd_set最多表示1024个文件描述符。
使用例子
fd_set定义的几个宏
1 2 3 4 5 #include <sys/select.h> int FD_ZERO (fd_set *fdset) ; int FD_SET (int fd, fd_set *fd_set) ; int FD_ISSET (int fd, fd_set *fdset) ;int FD_CLR (int fd, fd_set *fdset) ;
我们假设fdset就一个字节,就是8位,那么
(1)执行FD_ZERO(&fdset),则set用位表示是00000000,就是所有位都清空成0,一般刚开始的时候就需要清空。
(2)执行FD_SET(fd,&fdset),若fd=5,后set变为00010000,第5位置为1,就是将客户端连接的描述字(一般就是一个整数啦)放入到set当中。
(3)执行FD_ISSET(fd,&fdset),若fd=5,则就是判断set的第5位是否是1,一般用来判断是否客户端的连接。
(4)执行FD_CLR(fd,&fdset),若fd=5,则就是将第5位置成0,在断开客户端连接的时候,一定要记得调用这个。
select的函数定义。
1 int select (int maxfdp, fd_set *readset, fd_set *writeset, fd_set *exceptset,struct timeval *timeout) ;
参数说明:
maxfdp:被监听的文件描述符的总数,它比所有文件描述符集合中的文件描述符的最大值大1,因为文件描述符是从0开始计数的;
fd_set *readset:
该参数是我们所关心的文件是否可读的文件描述符的集合,如果这个集合中有个文件可读了,那select返回一个大于0的数,表示有文件可读了,比如说服务端接收到客户端的数据,服务端都是读的状态,所以正常读的文件都放在这里。
fd_set
*writeset:那这个大家就比较好理解了,服务端发到客户端的数据,要写入到缓冲区,那么所有正常写的文件都放在这里。
fd_set
*exceptset:在所有正常读和正常写的时候,产生了异常情况,那么异常文件就放在这里。
timeval
*timeout:用于设置select函数的超时时间,即告诉内核select等待多长时间之后就放弃等待。这个参数使select处于三种状态:(1)timeout传入NULL,则select一直等到文件状态有变化时才返回,这段时间一直处于阻塞状态。(2):timeout传入0,则select会立即返回(非阻塞),如果文件状态有变化则返回一个大于0的值没有变化则返回0;(3)timeout传入一个大于0的数,则select在timeout时间内阻塞,一旦文件状态有变化就会返回,超时后不管怎样都会返回值同样是文件状态右边话就返回一个大于0的值,无变化则返回0;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 #include <stdio.h> #include <sys/socket.h> #include <netinet/in.h> #include <stdlib.h> #include <arpa/inet.h> #include <unistd.h> #include <string.h> #define PORT 80 #define MAX_FD_NUM 3 #define BUF_SIZE 512 #define ERR_EXIT(m) \ do \ { \ perror(m); \ exit(EXIT_FAILURE); \ } while (0) int main () { int m_sockfd = socket(AF_INET, SOCK_STREAM, 0 ); if (m_sockfd < 0 ) { ERR_EXIT("create socket fail" ); } struct sockaddr_in server_addr ; int server_len = sizeof (server_addr); memset (&server_addr, 0 , server_len); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = INADDR_ANY; server_addr.sin_port = htons(PORT); int m_bindfd = bind(m_sockfd, (struct sockaddr *)&server_addr, server_len); if (m_bindfd < 0 ) { ERR_EXIT("bind ip and port fail" ); } int m_listenfd = listen(m_sockfd, MAX_FD_NUM); if (m_listenfd < 0 ) { ERR_EXIT("listen client fail" ); } printf ("client accept success\n" ); struct sockaddr_in client_addr ; socklen_t client_len = sizeof (client_addr); char buffer[BUF_SIZE]; int array_fd[MAX_FD_NUM]; int client_count = 0 ; fd_set tmpfd; int max_fd = m_sockfd; struct timeval timeout ; for (int i = 0 ; i < MAX_FD_NUM; i++) { array_fd[i] = -1 ; } while (1 ) { FD_ZERO(&tmpfd); FD_SET(m_sockfd, &tmpfd); int i; for (i = 0 ; i < MAX_FD_NUM; i++) { if (array_fd[i] > 0 ) { FD_SET(array_fd[i], &tmpfd); if (max_fd < array_fd[i]) { max_fd = array_fd[i]; } } } int ret = select(max_fd + 1 , &tmpfd, NULL , NULL , NULL ); if (ret < 0 ) { ERR_EXIT("select fail" ); } else if (ret == 0 ) { printf ("select timeout\n" ); continue ; } if (FD_ISSET(m_sockfd, &tmpfd)) { int m_connfd = accept(m_sockfd, (struct sockaddr *)&client_addr, &client_len); if (m_connfd < 0 ) { ERR_EXIT("server accept fail" ); } if (client_count >= MAX_FD_NUM) { printf ("max connections arrive!!!\n" ); close(m_connfd); continue ; } client_count++; printf ("we got a new connection, client_socket=%d, client_count=%d, ip=%s, port=%d\n" , m_connfd, client_count, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port)); for (i = 0 ; i < MAX_FD_NUM; i++) { if (array_fd[i] == -1 ) { array_fd[i] = m_connfd; break ; } } } for (i = 0 ; i < MAX_FD_NUM; i++) { if (array_fd[i] < 0 ) { continue ; } else { if (FD_ISSET(array_fd[i], &tmpfd)) { memset (buffer, 0 , sizeof (buffer)); int recv_len = recv(array_fd[i], buffer, sizeof (buffer) - 1 , 0 ); if (recv_len < 0 ) { ERR_EXIT("recv data fail" ); } else if (recv_len == 0 ) { client_count--; printf ("client_socket=[%d] close, client_count=[%d], ip=%s, port=%d\n\n" , array_fd[i], client_count, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port)); close(array_fd[i]); FD_CLR(array_fd[i], &tmpfd); array_fd[i] = -1 ; } else { printf ("server recv:%s\n" , buffer); strcat (buffer, "+ACK" ); send(array_fd[i], buffer, sizeof (buffer) - 1 , 0 ); } } } } } close(m_sockfd); printf ("server socket closed!!!\n" ); return 0 ; }
采用select实现并发处理请求的缺点:
1:
每次都要将文件描述符集合从用户空间拷贝到内核空间复制,处理完事件之后又需要将监听的fd从用户态拷贝到内核态,开销大
2:
保存fd数组的大小有限,并且监听的fd越多,性能可能越差(当活跃的fd较少时)。
3:
需要轮询遍历所有文件描述符才能知道哪些fd句柄有事件发生,轮询方式效率慢
poll
poll:和select几乎没有区别,区别在于文件描述符的存储方式不同,poll采用链表的方式存储,没有最大存储数量的限制;
用到比较少这里不多介绍
epoll
原理
其它比较好的参考博客
参考博客2
select 的几个可以优化的点。为什么每次 select 需要把监控的 fds
传输到内核里?
为什么 socket 只唤醒 select,不能告诉它是哪个 socket 来数据了?
epoll 主要就是基于上面两点做了优化。
首先,搞了个叫 epoll_ctl 的方法,这方法就是用来管理维护 epoll
所监控的哪些 socket。
如果你的 epoll 要新加一个 socket 来管理,那就调用
epoll_ctl,要删除一个 socket 也调用
epoll_ctl,通过不同的入参来控制增删改。
epoll_ctl
这样,在内核里面就维护了此 epoll 管理的 socket
集合,这样就不用每次调用的时候都得把所有管理的 fds 拷贝到内核了。然后和
select 类似,每个 socket 的睡眠队列里都会加个 entry,当每个 socket
来数据之后,同样也会调用 entry 对应的 callback。 socket
集合底层通过红黑树来描述。
epoll
与 select 不同的是,引入了一个 ready_list 双向链表,callback
里面会把当前的 socket 加入到 ready_list 然后唤醒 epoll。
在使用epoll_wait调用时,仅观察这个list中有没有数据即可。这样被唤醒的
epoll 只需要遍历 ready_list 即可,这个链表里一定是有数据可读的
socket,相比于 select 就不会做无用的遍历了。同时收集到的可读的 fd
按理是要拷贝到用户空间的,这里又做了个优化,利用了
mmp内存映射,让用户空间和内核空间映射到同一块内存中,这样就避免了拷贝。
readylist
整体示意图
服务器向客户端发送文件的时候整个过程是这样的:
相关函数接口
events
1 2 3 4 struct epoll_event events [EPOLL_SIZE ] = {0 }; struct epoll_event ev ; ev.events = EPOLLIN; ev.data.fd = listenfd;
events可以是以下几个宏的集合:
EPOLLIN : 表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT: 表示对应的文件描述符可以写; EPOLLPRI:
表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR: 表示对应的文件描述符发生错误; EPOLLHUP:
表示对应的文件描述符被挂断; EPOLLET: 将 EPOLL设为边缘触发(Edge
Triggered)模式(默认为水平触发),这是相对于水平触发(Level
Triggered)来说的。 EPOLLONESHOT:
只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
epoll_create:文件描述符的创建
epoll需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表。
这个文件描述符使用如下epoll_create函数来创建;
epoll把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传入文件描述符集或事件集。
调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点(epoll_create创建的文件描述符),在内核cache里建了个
红黑树用于存储以后epoll_ctl传来的socket外,还会再建立一个list链表,用于存储准备就绪的事件
eventpoll对象也是文件系统中的一员,和socket一样,它也会有等待队列。
1 2 3 4 5 6 7 8 9 10 struct eventpoll { spin_lock_t lock; struct mutex mtx ; wait_queue_head_t wq; wait_queue_head_t poll_wait; struct list_head rdllist ; struct rb_root rbr ; struct epitem *ovflist ; }
总结:
epoll_create创建额外的文件描述符,来唯一标识内核中的这个内核事件表(eventpoll对象)
creat
epoll_ctl:注册监控事件
创建epoll对象后,可以用epoll_ctl添加或删除所要监听的socket
。以添加socket为例,如果通过epoll_ctl添加sock1、sock2和sock3的监视,内核会将eventpoll添加到这三个socket的等待队列中。
ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
epoll的事件注册函数,即注册要监听的事件类型。
第一个参数是epoll_create()的返回值,
第二个参数表示动作,用三个宏来表示: EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd; 第三个参数是需要监听的fd,
第四个参数是告诉内核需要监听什么事
epoll_wait:事件等待,返回就绪事件
当socket收到数据后,中断程序会给eventpoll的“就绪列表”添加socket引用。如果是sock2和sock3收到数据后,中断程序让rdlist引用这两个socket,而不是像select轮询所有的sock。eventpoll对象相当于是socket和进程之间的中介,socket的数据接收并不直接影响进程,而是通过改变eventpoll的就绪列表来改变进程状态。当程序执行到epoll_wait时,如果rdlist已经引用了socket,那么epoll_wait直接返回,如果rdlist为空,阻塞进程。当socket接收到数据,中断程序一方面修改rdlist,另一方面唤醒eventpoll等待队列中的进程,进程A再次进入运行状态(如下图)。也因为rdlist的存在,进程A可以知道哪些socket发生了变化。
wait
事件类型
epoll的两种模式ET和LT模式
水平触发LT: 高电平代表1
只要缓冲区中有数据, 就一直通知
边缘触发ET: 电平有变化就代表1
缓冲区中有数据只会通知一次,
之后再有数据才会通知.(若是读数据的时候没有读完, 则剩余的数据不会再通知,
直到有新的数据到来)
epoll默认是水平触发LT
,在需要高性能的场景下,可以改成边缘ET非阻塞
方式来提高效率。
ET模式由于只通知一次, 所以在读的时候要循环读, 直到读完,
但是当读完之后read就会阻塞,
所以应该将该文件描述符设置为非阻塞模式(fcntl函数)。如果不循环读,读完的话还好,没读完的话数据就会留在读缓冲区
,影响数据的接收和判断。read函数在非阻塞模式下读的时候,
若返回-1, 且errno为EAGAIN, 则表示当前资源不可用,
也就是说缓冲区无数据(缓冲区的数据已经读完了);
或者当read返回的读到的数据长度小于
请求的数据长度时,就可以确定此时缓冲区中已没有数据可读了,也就可以认为此时读事件已处理完成。
程序框架
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 关于epoll_wait返回值的一个简单测试 void test (int epollfd) { struct epoll_event events [MAX_EVENT_NUMBER ]; int number; while (1 ) { number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1 ); printf ("number : %2d\n\n" , number); for (i = 0 ; i < number; i++) { sockfd = events[i].data.fd; if (sockfd == listenfd) { } else if (events[i].events & EPOLLIN) { } else if (events[i].events & EPOLLOUT) { } else { } } } }
使用例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 #include <errno.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <sys/types.h> #include <unistd.h> #include <sys/epoll.h> #include <pthread.h> #include <fcntl.h> #define MAXLNE 4096 #define EPOLL_SIZE 1024 int main () { int listenfd, connfd, n; struct sockaddr_in servaddr ; char buff[MAXLNE]; if ((listenfd = socket(AF_INET, SOCK_STREAM, 0 )) == -1 ) { printf ("create socket error: %s(errno: %d)\n" , strerror(errno), errno); return 0 ; } memset (&servaddr, 0 , sizeof (servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(10000 ); if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof (servaddr)) == -1 ) { printf ("bind socket error: %s(errno: %d)\n" , strerror(errno), errno); return 0 ; } if (listen(listenfd, 10 ) == -1 ) { printf ("listen socket error: %s(errno: %d)\n" , strerror(errno), errno); return 0 ; } int epfd = epoll_create(1 ); struct epoll_event events [EPOLL_SIZE ] = {0 }; struct epoll_event ev ; ev.events = EPOLLIN; ev.data.fd = listenfd; epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); fcntl(listenfd, F_SETFL, fcntl(listenfd, F_GETFD, 0 )| O_NONBLOCK); while (1 ) { int nready = epoll_wait(epfd, events, EPOLL_SIZE, 5 ); if (nready == -1 ) { continue ; } for (int i = 0 ; i < nready; i++) { int clientfd = events[i].data.fd; if (clientfd == listenfd) { struct sockaddr_in client ; socklen_t len = sizeof (client); if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1 ) { printf ("accept socket error: %s(errno: %d)\n" , strerror(errno), errno); return 0 ; } printf ("accept Success fd= %d\n" , connfd); ev.events = EPOLLIN; ev.data.fd = connfd; epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev); fcntl(connfd, F_SETFL,fcntl(connfd, F_GETFD, 0 )| O_NONBLOCK); send(connfd,"Welcome to My server" ,21 ,0 ); } else if (events[i].events & EPOLLIN) { n = recv(clientfd, buff, MAXLNE, 0 ); if (n > 0 ) { buff[n] = '\0' ; printf ("recv msg from client: %s\n" , buff); send(clientfd, buff, n, 0 ); } else if (n == 0 ) { ev.events = EPOLLIN; ev.data.fd = clientfd; epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &ev); close(clientfd); } } } } return 0 ; }
epoll-reactor模型原理
reactor
reactor是一种高并发服务器模型,是一种框架,一个概念,所以reactor没有一个固定的代码,可以有很多变种.
reactor中的IO使用的是select,poll,epoll这些IO多路复用,使用IO多路复用系统不必创建维护大量线程,只使用一个线程、一个选择器就可同时处理成千上万连接,大大减少了系统开销。
I/O 复用结合线程池,这就是 Reactor 模式基本设计思想,如下图:
reactor
Reactor
模式,是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。
服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor
模式也叫 Dispatcher 模式。
即 I/O 多了复用统一监听事件,收到事件后分发(Dispatch
给某进程),是编写高性能网络服务器的必备技术之一。
Reactor 模式中有 2 个关键组成:
1)Reactor:Reactor
在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO
事件做出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人;
2)Handlers:处理程序执行 I/O
事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor
通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作。
根据 Reactor 的数量和处理资源池线程的数量不同,有 3
种典型的实现:
1)单 Reactor 单线程;
2)单 Reactor 多线程;
3)主从 Reactor 多线程。