Linux 信号
什么是信号
linux系统中无时无该产生着信号.信号在linux上又称软件中断 ,用来通知进程发生了异步事件 。在软件层次上是对中断机制的一种模拟,在原理上,一个进程收到一个信号与处理器收到一个中断请求可以说是一样的。信号是进程间通信机制中唯一的异步通信机制 ,一个进程不必通过任何操作来等待信号的到达,事实上,进程也不知道信号到底什么时候到达。进程之间可以互相通过系统调用kill发送软中断信号。内核也可以因为内部事件而给进程发送信号,通知进程发生了某个事件。信号机制除了基本通知功能外,还可以传递附加信息。
发送信号处理信号
linux下通过kill函数由一个进程向另一个进程发送信号
1 2 3 #include <sys/types.h> #include <signal.h> int kill (pid_d pid,int sig)
SIG_DFL 使用信号的默认处理方式。SIG_IGN 表示忽略目标信号
中断系统调用:
当程序(处于阻塞状态)执行系统调用时收到信号,且已经为信号设置了信号处理函数,则默认情况下系统调用将被中断 。可以使用sigaction
函数设置标志以自动重启被信号中断的系统调用。对于LINUX来说:如果默认行为是暂停进程的信号(SIGSTOP,SIGTTIN)就算我们不设置信号处理函数,这类信号也可以中断模型系统调用例如:(connect,epoll_wait)
信号集
linux使用数据结构sigset_t来表示一组信号,本质上是一个长整型数组,数组的每个元素的每个位表示一个信号(位图)
设置进程信号掩码以后,被屏蔽的信号不会被进程接收。
统一事件源
为啥要统一事件源:
信号处理是一种异步的事件:信号处理函数和程序主循环是两条不同的执行路线
为了避免一些竞态条件,信号处理期间,系统不会再次触发它(借助进程信号掩码的机制,进程即使多次收到同一个被挂起的信号,sigpending函数也只能反映一次,因此在使用sigprocmask使能挂起信号的时候信号处理函数只能触发一次)
所以说如果信号处理的时间过久,就相当于该类信号被屏蔽了很久,因此我们希望处理时间越短越好(有点类似于中断处理函数不能太长)
解决方法是:信号处理被触发时通知主循环执行对应处理的逻辑代码,信号处理函数往管道端写入信号值,主循环从管道端读出该信号值。
什么是统一事件源:
使用IO复用系统调用来监听管道描述符上的可读事件。这样信号事件就可以和其它的I/O事件一样被处理,即统一事件源。(指的是信号处理和其它I/O事件处理触发来源统一)
简单实现
部分较为重要的函数:
epoll_event 是 Linux 中 epoll 事件的结构体,定义在
<sys/epoll.h> 头文件中。它包含以下字段:
1 2 3 4 5 struct epoll_event { uint32_t events; epoll_data_t data; };
使用 epoll 的基本步骤如下:
创建一个 epoll
实例:int epoll_fd = epoll_create(MAX_EVENT_NUMBER);
向 epoll
实例中添加关注的事件:epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event);
等待事件发生:int event_count = epoll_wait(epoll_fd, events, MAX_EVENT_NUMBER, timeout);
处理发生的事件:遍历 events
数组,处理每个事件。
在这个过程中,events 数组就用于存储发生的 epoll
事件,每个元素都是一个 epoll_event 结构体。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <signal.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <pthread.h> #define MAX_EVENT_NUMBER 1024 static int pipefd[2 ];int setnonblocking ( int fd ) { int old_option = fcntl ( fd, F_GETFL ); int new_option = old_option | O_NONBLOCK; fcntl ( fd, F_SETFL, new_option ); return old_option; } void addfd ( int epollfd, int fd ) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; epoll_ctl ( epollfd, EPOLL_CTL_ADD, fd, &event ); setnonblocking ( fd ); } void sig_handler ( int sig ) { int save_errno = errno; int msg = sig; send ( pipefd[1 ], ( char * )&msg, 1 , 0 ); errno = save_errno; } void addsig ( int sig ) { struct sigaction sa; memset ( &sa, '\0' , sizeof ( sa ) ); sa.sa_handler = sig_handler; sa.sa_flags |= SA_RESTART; sigfillset ( &sa.sa_mask ); assert ( sigaction ( sig, &sa, NULL ) != -1 ); } int main ( int argc, char * argv[] ) { if ( argc <= 2 ) { printf ( "usage: %s ip_address port_number\n" , basename ( argv[0 ] ) ); return 1 ; } const char * ip = argv[1 ]; int port = atoi ( argv[2 ] ); int ret = 0 ; struct sockaddr_in address; bzero ( &address, sizeof ( address ) ); address.sin_family = AF_INET; inet_pton ( AF_INET, ip, &address.sin_addr ); address.sin_port = htons ( port ); int listenfd = socket ( PF_INET, SOCK_STREAM, 0 ); assert ( listenfd >= 0 ); ret = bind ( listenfd, ( struct sockaddr* )&address, sizeof ( address ) ); if ( ret == -1 ) { printf ( "errno is %d\n" , errno ); return 1 ; } ret = listen ( listenfd, 5 ); assert ( ret != -1 ); epoll_event events[ MAX_EVENT_NUMBER ]; int epollfd = epoll_create ( 5 ); assert ( epollfd != -1 ); addfd ( epollfd, listenfd ); ret = socketpair ( PF_UNIX, SOCK_STREAM, 0 , pipefd ); assert ( ret != -1 ); setnonblocking ( pipefd[1 ] ); addfd ( epollfd, pipefd[0 ] ); addsig ( SIGHUP ); addsig ( SIGCHLD ); addsig ( SIGTERM ); addsig ( SIGINT ); bool stop_server = false ; while ( !stop_server ) { int number = epoll_wait ( epollfd, events, MAX_EVENT_NUMBER, -1 ); if ( ( number < 0 ) && ( errno != EINTR ) ) { printf ( "epoll failure\n" ); break ; } for ( int i = 0 ; i < number; i++ ) { int sockfd = events[i].data.fd; if ( sockfd == listenfd ) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof ( client_address ); int connfd = accept ( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); addfd ( epollfd, connfd ); } 。 else if ( ( sockfd == pipefd[0 ] ) && ( events[i].events & EPOLLIN ) ) { int sig; char signals[1024 ]; ret = recv ( pipefd[0 ], signals, sizeof ( signals ), 0 ); if ( ret == -1 ) { continue ; } else if ( ret == 0 ) { continue ; } else { for ( int i = 0 ; i < ret; ++i ) { switch ( signals[i] ) { case SIGCHLD: case SIGHUP: { continue ; } case SIGTERM: case SIGINT: { stop_server = true ; } } } } } else { } } } printf ( "close fds\n" ); close ( listenfd ); close ( pipefd[1 ] ); close ( pipefd[0 ] ); return 0 ; }
I/O复用相关操作详见之前写的这章博客的记载:参考博客
网络编程相关信号
主要介绍在网络编程中几个密切相关的函数:SIGUP,SIGPIPE,SIGURG。
先来了解两个概念:进程组和会话。
进程组
进程组就是一系列相互关联的进程集合,系统中的每一个进程也必须从属于某一个进程组;每个进程组中都会有一个唯一的
ID(process group id),简称 PGID;PGID 一般等同于进程组的创建进程的
Process ID,而这个进进程一般也会被称为进程组先导(process group
leader),同一进程组中除了进程组先导外的其他进程都是其子进程;我们可以一次性发送一个信号量给同一进程组中的所有进程。
会话
会话(session)是一个若干进程组的集合,同样的,系统中每一个进程组也都必须从属于某一个会话;一个会话只拥有最多一个控制终端(也可以没有),该终端为会话中所有进程组中的进程所共用。一个会话中前台进程组只会有一个,只有其中的进程才可以和控制终端进行交互;除了前台进程组外的进程组,都是后台进程组;和进程组先导类似,会话中也有会话先导(session
leader)的概念,用来表示建立起到控制终端连接的进程。在拥有控制终端的会话中,session
leader 也被称为控制进程(controlling
process),一般来说控制进程也就是登入系统的 shell 进程(login shell);
SIGHUP
当挂起的进程控制终端时,触发SIGHUP 信号。SIGHUP
信号在用户终端连接(正常或非正常)结束 时发出,
通常是在终端的控制进程结束时, 通知同一session内的各个作业,
这时它们与控制终端不再关联.
系统对SIGHUP信号的默认处理是终止收到该信号的进程 。所以若程序中没有捕捉该信号,当收到该信号时,进程就会退出。
SIGHUP会在以下3种情况下被发送给相应的进程:
1、终端关闭时,该信号被发送到session首进程以及作为job提交的进程(即用
& 符号提交的进程);
2、session首进程退出时,该信号被发送到该session中的前台进程组中的每一个进程;
3、若父进程退出导致进程组成为孤儿进程组,且该进程组中有进程处于停止状态(收到SIGSTOP或SIGTSTP信号),该信号会被发送到该进程组中的每一个进程。
例如:在我们登录Linux时,系统会分配给登录用户一个终端(Session)。在这个终端运行的所有程序,包括前台进程组和后台进程组,一般都属于这个
Session。当用户退出Linux登录时,前台进程组和后台有对终端输出的进程将会收到SIGHUP信号。这个信号的默认操作为终止进程,因此前台进
程组和后台有终端输出的进程就会中止。
此外,对于与终端脱离关系的守护进程,这个信号用于通知它重新读取配置文件。
比如Linux超级守护进程——xinetd.
当xinetd程序在接收到SIGHUP信号之后调用hard_reconfig函数,它将循环读取/etc/xinetd.d/目录下的每个子配置文件,并检测其变化。如果某个正在运行的子服务的配置文件被修改以停止服务,则xinetd主进程讲给该子服务进程发送SIGTERM信号来结束它。如果某个子服务的配置文件被修改以开启服务,则xinetd将创建新的socket并将其绑定到该服务对应的端口上。
SIGPIPE
在网络编程中,SIGPIPE这个信号是很常见的。当往一个写端关闭 的管道或socket连接中连续写入数据时会引发SIGPIPE信号,引发SIGPIPE信号的写操作将设置errno为EPIPE。
在TCP通信中,当通信的双方中的一方close一个连接时,若另一方接着发数据,根据TCP协议的规定,会收到一个RST响应报文,若再往这个服务器发送数据时,系统会发出一个SIGPIPE信号给进程,告诉进程这个连接已经断开了,不能再写入数据。
此外,因为SIGPIPE信号的默认行为是结束进程 ,而我们绝对不希望因为写操作的错误而导致程序退出,尤其是作为服务器程序来说就更恶劣了。所以我们应该对这种信号加以处理,在这里,介绍两种处理SIGPIPE信号的方式:
1 、给SIGPIPE设置SIG_IGN信号处理函数,忽略该信号:
1 signal (SIGPIPE, SIG_IGN);
前文说过,引发SIGPIPE信号的写操作将设置errno为EPIPE。所以,第二次往关闭的socket中写入数据时,
会返回-1, 同时errno置为EPIPE.
这样,便能知道对端已经关闭,然后进行相应处理,而不会导致整个进程退出.
2、使用send函数的MSG_NOSIGNAL 标志来禁止写操作触发SIGPIPE信号。
1 send (sockfd , buf , size , MSG_NOSIGNAL);
同样,我们可以根据send函数反馈的errno来判断socket的读端是否已经关闭。此外,我们也可以通过IO复用函数来检测管道和socket连接的读端是否已经关闭。以POLL为例,当socket连接被对方关闭时,socket上的POLLRDHUP事件将被触发。
SIGURG
带外数据
带外数据用于迅速告知对方本端发生的重要的事件。它比普通的数据(带内数据)拥有更高的优先级,不论发送缓冲区中是否有排队等待发送的数据,它总是被立即发送。带外数据的传输可以使用一条独立的传输层连接,也可以映射到传输普通数据的连接中。
实际应用中,带外数据是使用很少见UDP没有没有实现带外数据传输,TCP也没有真正的带外数据。不过TCP利用头部的紧急指针标志和紧急指针,为应用程序提供了一种紧急方式,含义和带外数据类似。TCP的紧急方式利用传输普通数据的连接来传输紧急数据。
SIGURG信号的作用
内核通知应用程序带外数据到达的方式有两种:一种就是利用I/O复用技术的系统调用(如select)在接受到带外数据时将返回,并向应用程序报告socket上的异常事件。另一种方法就是使用SIGURG信号。
SIGURG信号检测带外数据是否到达的例程
sigaction :结构体定义如下:
1 2 3 4 5 6 7 struct sigaction { void (*sa_handler)(int ); void (*sa_sigaction)(int , siginfo_t *, void *); sigset_t sa_mask; int sa_flags; void (*sa_restorer)(void ); };
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <errno.h> #include <string.h> #include <signal.h> #include <fcntl.h> #define BUF_SIZE 1024 static int connfd;void sig_urg ( int sig ) { int save_errno = errno; char buffer[ BUF_SIZE ]; memset ( buffer, '\0' , BUF_SIZE ); int ret = recv ( connfd, buffer, BUF_SIZE-1 , MSG_OOB ); printf ( "got %d bytes of oob data '%s'\n" , ret, buffer ); errno = save_errno; } void addsig ( int sig, void ( *sig_handler )( int ) ) { struct sigaction sa; memset ( &sa, '\0' , sizeof ( sa ) ); sa.sa_handler = sig_handler; sa.sa_flags |= SA_RESTART; sigfillset ( &sa.sa_mask ); assert ( sigaction ( sig, &sa, NULL ) != -1 ); } int main ( int argc, char * argv[] ) { if ( argc <= 2 ) { printf ( "usage: %s ip_address port_number\n" , basename ( argv[0 ] ) ); return 1 ; } const char * ip = argv[1 ]; int port = atoi ( argv[2 ] ); struct sockaddr_in address; bzero ( &address, sizeof ( address ) ); address.sin_family = AF_INET; inet_pton ( AF_INET, ip, &address.sin_addr ); address.sin_port = htons ( port ); int sock = socket ( PF_INET, SOCK_STREAM, 0 ); assert ( sock >= 0 ); int ret = bind ( sock, ( struct sockaddr* )&address, sizeof ( address ) ); assert ( ret != -1 ); ret = listen ( sock, 5 ); assert ( ret != -1 ); struct sockaddr_in client; socklen_t client_addrlength = sizeof ( client ); connfd = accept ( sock, ( struct sockaddr* )&client, &client_addrlength ); if ( connfd < 0 ) { printf ( "errno is: %d\n" , errno ); } else { addsig ( SIGURG, sig_urg ); fcntl ( connfd, F_SETOWN, getpid () ); char buffer[ BUF_SIZE ]; while ( 1 ) { memset ( buffer, '\0' , BUF_SIZE ); ret = recv ( connfd, buffer, BUF_SIZE-1 , 0 ); if ( ret <= 0 ) { break ; } printf ( "got %d bytes of normal data '%s'\n" , ret, buffer ); } close ( connfd ); } close ( sock ); return 0 ; }
定时器
socket
SO_RCVTIMEO和SO_SNDTIMEO
例子:
1 2 3 4 5 6 7 int sockfd = socket ( PF_INET, SOCK_STREAM, 0 );assert ( sockfd >= 0 );struct timeval timeout; timeout.tv_sec = time; timeout.tv_usec = 0 ; socklen_t len = sizeof ( timeout );ret = setsockopt ( sockfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len );
这段代码是在设置一个套接字(socket)选项,具体来说,是设置发送操作的超时时间。
struct timeval timeout;:声明了一个timeval结构体变量,用于表示时间值。
timeout.tv_sec = time;:将timeout结构体的tv_sec字段设置为time变量的值。这个字段表示超时的秒数。
timeout.tv_usec = 0;:将timeout结构体的tv_usec字段设置为0。这个字段表示超时的微秒数。
socklen_t len = sizeof(timeout);:声明了一个socklen_t类型的变量len,并将其设置为timeout结构体的大小,即结构体中的字节数。
ret = setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len);:调用setsockopt函数设置套接字选项。具体来说,它将发送超时时间应用到指定的套接字上。
设置connect超时时间:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <stdlib.h> #include <assert.h> #include <stdio.h> #include <errno.h> #include <fcntl.h> #include <unistd.h> #include <string.h> int timeout_connect ( const char * ip, int port, int time ) { int ret = 0 ; struct sockaddr_in address; bzero ( &address, sizeof ( address ) ); address.sin_family = AF_INET; inet_pton ( AF_INET, ip, &address.sin_addr ); address.sin_port = htons ( port ); int sockfd = socket ( PF_INET, SOCK_STREAM, 0 ); assert ( sockfd >= 0 ); struct timeval timeout; timeout.tv_sec = time; timeout.tv_usec = 0 ; socklen_t len = sizeof ( timeout ); ret = setsockopt ( sockfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len ); assert ( ret != -1 ); ret = connect ( sockfd, ( struct sockaddr* )&address, sizeof ( address ) ); if ( ret == -1 ) { if ( errno == EINPROGRESS ) { printf ( "connecting timeout\n" ); return -1 ; } printf ( "error occur when connecting to server\n" ); return -1 ; } return sockfd; } int main ( int argc, char * argv[] ) { if ( argc <= 2 ) { printf ( "usage: %s ip_address port_number\n" , basename ( argv[0 ] ) ); return 1 ; } const char * ip = argv[1 ]; int port = atoi ( argv[2 ] ); int sockfd = timeout_connect ( ip, port, 10 ); if ( sockfd < 0 ) { return 1 ; } return 0 ; }
SIGALEM信号
定时信号SIGALRM的用途:
在编程的过程中,很多时候我们需要为程序设置一个闹钟,然后到了闹钟设定的时刻然后再去采取相关的操作。比如进行socket编程时,如果客户端长时间没有与服务器进行交互,需要服务器在一定时间之后主动关闭socket连接。在这种场景下,就可以在服务器收到客户端的socket的连接时,设置一个定时信号,然后在定时信号到来时,关闭掉socket连接即可。
通过alarm函数触发定时信号:
1 2 3 4 5 6 7 8 9 10 11 #include <unistd.h> unsigned int alarm (unsigned int seconds)
闹钟信号只会触发一次,若想循环触发,可以在闹钟到时后重新通过alarm函数触发
可以重新设置闹钟信号,即在上一个闹钟到时之前,通过alarm函数重新设定响铃时刻或取消闹钟
基于升序链表的定时器
定时器通常至少包含两个成员:超时时间+回调函数
用链表作为容器来串联所有的定时器。
下面代码实现了简单的升序定时器链表,将其中的定时器按照超时时间升序排序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 #ifndef LST_TIMER #define LST_TIMER #include <time.h> #define BUFFER_SIZE 64 class util_timer ;struct client_data { sockaddr_in address; int sockfd; char buf[ BUFFER_SIZE ]; util_timer* timer; }; class util_timer { public : util_timer () : prev ( NULL ), next ( NULL ){} public : time_t expire; void (*cb_func)( client_data* ); client_data* user_data; util_timer* prev; util_timer* next; }; class sort_timer_lst { public : sort_timer_lst () : head ( NULL ), tail ( NULL ) {} ~sort_timer_lst () { util_timer* tmp = head; while ( tmp ) { head = tmp->next; delete tmp; tmp = head; } } void add_timer ( util_timer* timer ) { if ( !timer ) { return ; } if ( !head ) { head = tail = timer; return ; } if ( timer->expire < head->expire ) { timer->next = head; head->prev = timer; head = timer; return ; } add_timer ( timer, head ); } void adjust_timer ( util_timer* timer ) { if ( !timer ) { return ; } util_timer* tmp = timer->next; if ( !tmp || ( timer->expire < tmp->expire ) ) { return ; } if ( timer == head ) { head = head->next; head->prev = NULL ; timer->next = NULL ; add_timer ( timer, head ); } else { timer->prev->next = timer->next; timer->next->prev = timer->prev; add_timer ( timer, timer->next ); } } void del_timer ( util_timer* timer ) { if ( !timer ) { return ; } if ( ( timer == head ) && ( timer == tail ) ) { delete timer; head = NULL ; tail = NULL ; return ; } if ( timer == head ) { head = head->next; head->prev = NULL ; delete timer; return ; } if ( timer == tail ) { tail = tail->prev; tail->next = NULL ; delete timer; return ; } timer->prev->next = timer->next; timer->next->prev = timer->prev; delete timer; } void tick () { if ( !head ) { return ; } printf ( "timer tick\n" ); time_t cur = time ( NULL ); util_timer* tmp = head; while ( tmp ) { if ( cur < tmp->expire ) { break ; } tmp->cb_func ( tmp->user_data ); head = tmp->next; if ( head ) { head->prev = NULL ; } delete tmp; tmp = head; } } private : void add_timer ( util_timer* timer, util_timer* lst_head ) { util_timer* prev = lst_head; util_timer* tmp = prev->next; while ( tmp ) { if ( timer->expire < tmp->expire ) { prev->next = timer; timer->next = tmp; tmp->prev = timer; timer->prev = prev; break ; } prev = tmp; tmp = tmp->next; } if ( !tmp ) { prev->next = timer; timer->prev = prev; timer->next = NULL ; tail = timer; } } private : util_timer* head; util_timer* tail; }; #endif
处理非活动连接
升序定时器链表的一个实际应用:处理非活动连接
服务器通常需要定期处理非活动连接:给客户端发一个重连接请求或者关闭该连接或其它。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <signal.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <pthread.h> #include "lst_timer.h" #define FD_LIMIT 65535 #define MAX_EVENT_NUMBER 1024 #define TIMESLOT 5 static int pipefd[2 ];static sort_timer_lst timer_lst;static int epollfd = 0 ;int setnonblocking ( int fd ) { int old_option = fcntl ( fd, F_GETFL ); int new_option = old_option | O_NONBLOCK; fcntl ( fd, F_SETFL, new_option ); return old_option; } void addfd ( int epollfd, int fd ) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; epoll_ctl ( epollfd, EPOLL_CTL_ADD, fd, &event ); setnonblocking ( fd ); } void sig_handler ( int sig ) { int save_errno = errno; int msg = sig; send ( pipefd[1 ], ( char * )&msg, 1 , 0 ); errno = save_errno; } void addsig ( int sig ) { struct sigaction sa; memset ( &sa, '\0' , sizeof ( sa ) ); sa.sa_handler = sig_handler; sa.sa_flags |= SA_RESTART; sigfillset ( &sa.sa_mask ); assert ( sigaction ( sig, &sa, NULL ) != -1 ); } void timer_handler () { timer_lst.tick (); alarm ( TIMESLOT ); } void cb_func ( client_data* user_data ) { epoll_ctl ( epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0 ); assert ( user_data ); close ( user_data->sockfd ); printf ( "close fd %d\n" , user_data->sockfd ); } int main ( int argc, char * argv[] ) { if ( argc <= 2 ) { printf ( "usage: %s ip_address port_number\n" , basename ( argv[0 ] ) ); return 1 ; } const char * ip = argv[1 ]; int port = atoi ( argv[2 ] ); int ret = 0 ; struct sockaddr_in address; bzero ( &address, sizeof ( address ) ); address.sin_family = AF_INET; inet_pton ( AF_INET, ip, &address.sin_addr ); address.sin_port = htons ( port ); int listenfd = socket ( PF_INET, SOCK_STREAM, 0 ); assert ( listenfd >= 0 ); ret = bind ( listenfd, ( struct sockaddr* )&address, sizeof ( address ) ); assert ( ret != -1 ); ret = listen ( listenfd, 5 ); assert ( ret != -1 ); epoll_event events[ MAX_EVENT_NUMBER ]; int epollfd = epoll_create ( 5 ); assert ( epollfd != -1 ); addfd ( epollfd, listenfd ); ret = socketpair ( PF_UNIX, SOCK_STREAM, 0 , pipefd ); assert ( ret != -1 ); setnonblocking ( pipefd[1 ] ); addfd ( epollfd, pipefd[0 ] ); addsig ( SIGALRM ); addsig ( SIGTERM ); bool stop_server = false ; client_data* users = new client_data[FD_LIMIT]; bool timeout = false ; alarm ( TIMESLOT ); while ( !stop_server ) { int number = epoll_wait ( epollfd, events, MAX_EVENT_NUMBER, -1 ); if ( ( number < 0 ) && ( errno != EINTR ) ) { printf ( "epoll failure\n" ); break ; } for ( int i = 0 ; i < number; i++ ) { int sockfd = events[i].data.fd; if ( sockfd == listenfd ) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof ( client_address ); int connfd = accept ( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); addfd ( epollfd, connfd ); users[connfd].address = client_address; users[connfd].sockfd = connfd; util_timer* timer = new util_timer; timer->user_data = &users[connfd]; timer->cb_func = cb_func; time_t cur = time ( NULL ); timer->expire = cur + 3 * TIMESLOT; users[connfd].timer = timer; timer_lst.add_timer ( timer ); } else if ( ( sockfd == pipefd[0 ] ) && ( events[i].events & EPOLLIN ) ) { int sig; char signals[1024 ]; ret = recv ( pipefd[0 ], signals, sizeof ( signals ), 0 ); if ( ret == -1 ) { continue ; } else if ( ret == 0 ) { continue ; } else { for ( int i = 0 ; i < ret; ++i ) { switch ( signals[i] ) { case SIGALRM: { timeout = true ; break ; } case SIGTERM: { stop_server = true ; } } } } } else if ( events[i].events & EPOLLIN ) { memset ( users[sockfd].buf, '\0' , BUFFER_SIZE ); ret = recv ( sockfd, users[sockfd].buf, BUFFER_SIZE-1 , 0 ); printf ( "get %d bytes of client data %s from %d\n" , ret, users[sockfd].buf, sockfd ); util_timer* timer = users[sockfd].timer; if ( ret < 0 ) { if ( errno != EAGAIN ) { cb_func ( &users[sockfd] ); if ( timer ) { timer_lst.del_timer ( timer ); } } } else if ( ret == 0 ) { cb_func ( &users[sockfd] ); if ( timer ) { timer_lst.del_timer ( timer ); } } else { if ( timer ) { time_t cur = time ( NULL ); timer->expire = cur + 3 * TIMESLOT; printf ( "adjust timer once\n" ); timer_lst.adjust_timer ( timer ); } } } else { } } if ( timeout ) { timer_handler (); timeout = false ; } } close ( listenfd ); close ( pipefd[1 ] ); close ( pipefd[0 ] ); delete [] users; return 0 ; }
I/O复用系统调用的超时参数
利用I.O复用的超时参数,进行超时设定,这期间也可以处理其他事情。在主循环中,一定要每次都更新超时参数,因为I/O复用系统调用可能在超时时间到期之前就返回(有I/O时间发生)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 #define TIMEOUT 5000 int timeout = TIMEOUT;time_t start = time ( NULL );time_t end = time ( NULL );while ( 1 ){ printf ( "the timeout is now %d mill-seconds\n" , timeout ); start = time ( NULL ); int number = epoll_wait ( epollfd, events, MAX_EVENT_NUMBER, timeout ); if ( ( number < 0 ) && ( errno != EINTR ) ) { printf ( "epoll failure\n" ); break ; } if ( number == 0 ) { timeout = TIMEOUT; continue ; } end = time ( NULL ); timeout -= ( end - start ) * 1000 ; if ( timeout <= 0 ) { timeout = TIMEOUT; } }
高性能定时器
时间轮
基于排序链表的定时器存在一个问题:添加定时器的效率偏低
简单的时间轮
时间轮上有多个槽(slot),指针以恒定时间转动,每转动一步指向下一个槽,每次转动位一个滴答(tick)。每个槽指向一个定时器链表
至于在每转到一个槽时都要检查是否到达运行时间,可以这样理解:时间轮进行散列的方法就是取余运算,假设每个槽的间隔为a
秒,共有n个槽,当前转到了第cur个槽,那么一个定时在 t
s以后运行的定时器就要放在第( cur + t /si ) % n个槽,并在运行t /
n圈后到达该槽时才会运行。因此一个槽中的定时器运行的时间是相差i(i >=
0)个周期的。
所以时间轮简单来说就是散列 +
链表,这样与使用升序链表相比,散列可以直接定位要插入的槽所在位置,可以提高添加定时器的效率,由O(N)到了O(1)。
想要提高定时精度就要使得a足够小,想要提高执行效率就要使得n足够大。复杂的时间轮可以有多个轮子,每个轮子有不同的粒度,类似齿轮
简单的时间轮实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 #ifndef TIME_WHEEL_TIMER #define TIME_WHEEL_TIMER #include <time.h> #include <netinet/in.h> #include <stdio.h> #define BUFFER_SIZE 64 class tw_timer ;struct client_data { sockaddr_in address; int sockfd; char buf[ BUFFER_SIZE ]; tw_timer* timer; }; class tw_timer { public : tw_timer ( int rot, int ts ) : next ( NULL ), prev ( NULL ), rotation ( rot ), time_slot ( ts ){} public : int rotation; int time_slot; void (*cb_func)( client_data* ); client_data* user_data; tw_timer* next; tw_timer* prev; }; class time_wheel { public : time_wheel () : cur_slot ( 0 ) { for ( int i = 0 ; i < N; ++i ) { slots[i] = NULL ; } } ~time_wheel () { for ( int i = 0 ; i < N; ++i ) { tw_timer* tmp = slots[i]; while ( tmp ) { slots[i] = tmp->next; delete tmp; tmp = slots[i]; } } } tw_timer* add_timer ( int timeout ) { if ( timeout < 0 ) { return NULL ; } int ticks = 0 ; if ( timeout < TI ) { ticks = 1 ; } else { ticks = timeout / TI; } int rotation = ticks / N; int ts = ( cur_slot + ( ticks % N ) ) % N; tw_timer* timer = new tw_timer ( rotation, ts ); if ( !slots[ts] ) { printf ( "add timer, rotation is %d, ts is %d, cur_slot is %d\n" , rotation, ts, cur_slot ); slots[ts] = timer; } else { timer->next = slots[ts]; slots[ts]->prev = timer; slots[ts] = timer; } return timer; } void del_timer ( tw_timer* timer ) { if ( !timer ) { return ; } int ts = timer->time_slot; if ( timer == slots[ts] ) { slots[ts] = slots[ts]->next; if ( slots[ts] ) { slots[ts]->prev = NULL ; } delete timer; } else { timer->prev->next = timer->next; if ( timer->next ) { timer->next->prev = timer->prev; } delete timer; } } void tick () { tw_timer* tmp = slots[cur_slot]; printf ( "current slot is %d\n" , cur_slot ); while ( tmp ) { printf ( "tick the timer once\n" ); if ( tmp->rotation > 0 ) { tmp->rotation--; tmp = tmp->next; } else { tmp->cb_func ( tmp->user_data ); if ( tmp == slots[cur_slot] ) { printf ( "delete header in cur_slot\n" ); slots[cur_slot] = tmp->next; delete tmp; if ( slots[cur_slot] ) { slots[cur_slot]->prev = NULL ; } tmp = slots[cur_slot]; } else { tmp->prev->next = tmp->next; if ( tmp->next ) { tmp->next->prev = tmp->prev; } tw_timer* tmp2 = tmp->next; delete tmp; tmp = tmp2; } } } cur_slot = ++cur_slot % N; } private : static const int N = 60 ; static const int TI = 1 ; tw_timer* slots[N]; int cur_slot; }; #endif
对时间轮而言,添加一个定时器的复杂度是O(1),删除一个定时器的复杂度是O(1)。执行定时器的时间复杂度是O(n).当使用很多轮子时执行复杂度接近O(1)
时间堆
前面讨论的方案都是以固定的频率调用心搏函数tick,并依次检测到期的定时器。然后执行定时器上的回调函数。
另一种思路是,将所有定时器中最小的定时器的超时值作为心搏间隔。这样一旦tick被调用,最小的定时器必然到期。我们可以找出剩余最小的超时值设为心搏间隔。使用小根堆比较容易实现这种定时方案
使用数组实现小根堆,实现时间堆代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 #ifndef intIME_HEAP #define intIME_HEAP #include <iostream> #include <netinet/in.h> #include <time.h> using std::exception;#define BUFFER_SIZE 64 class heap_timer ;struct client_data { sockaddr_in address; int sockfd; char buf[ BUFFER_SIZE ]; heap_timer* timer; }; class heap_timer { public : heap_timer ( int delay ) { expire = time ( NULL ) + delay; } public : time_t expire; void (*cb_func)( client_data* ); client_data* user_data; }; class time_heap { public : time_heap ( int cap ) throw ( std::exception ) : capacity ( cap ), cur_size ( 0 ) { array = new heap_timer* [capacity]; if ( ! array ) { throw std::exception (); } for ( int i = 0 ; i < capacity; ++i ) { array[i] = NULL ; } } time_heap ( heap_timer** init_array, int size, int capacity ) throw ( std::exception ) : cur_size ( size ), capacity ( capacity ) { if ( capacity < size ) { throw std::exception (); } array = new heap_timer* [capacity]; if ( ! array ) { throw std::exception (); } for ( int i = 0 ; i < capacity; ++i ) { array[i] = NULL ; } if ( size != 0 ) { for ( int i = 0 ; i < size; ++i ) { array[ i ] = init_array[ i ]; } for ( int i = (cur_size-1 )/2 ; i >=0 ; --i ) { percolate_down ( i ); } } } ~time_heap () { for ( int i = 0 ; i < cur_size; ++i ) { delete array[i]; } delete [] array; } public : void add_timer ( heap_timer* timer ) throw ( std::exception ) { if ( !timer ) { return ; } if ( cur_size >= capacity ) { resize (); } int hole = cur_size++; int parent = 0 ; for ( ; hole > 0 ; hole=parent ) { parent = (hole-1 )/2 ; if ( array[parent]->expire <= timer->expire ) { break ; } array[hole] = array[parent]; } array[hole] = timer; } void del_timer ( heap_timer* timer ) { if ( !timer ) { return ; } timer->cb_func = NULL ; } heap_timer* top () const { if ( empty () ) { return NULL ; } return array[0 ]; } void pop_timer () { if ( empty () ) { return ; } if ( array[0 ] ) { delete array[0 ]; array[0 ] = array[--cur_size]; percolate_down ( 0 ); } } void tick () { heap_timer* tmp = array[0 ]; time_t cur = time ( NULL ); while ( !empty () ) { if ( !tmp ) { break ; } if ( tmp->expire > cur ) { break ; } if ( array[0 ]->cb_func ) { array[0 ]->cb_func ( array[0 ]->user_data ); } pop_timer (); tmp = array[0 ]; } } bool empty () const { return cur_size == 0 ; } private : void percolate_down ( int hole ) { heap_timer* temp = array[hole]; int child = 0 ; for ( ; ((hole*2 +1 ) <= (cur_size-1 )); hole=child ) { child = hole*2 +1 ; if ( (child < (cur_size-1 )) && (array[child+1 ]->expire < array[child]->expire ) ) { ++child; } if ( array[child]->expire < temp->expire ) { array[hole] = array[child]; } else { break ; } } array[hole] = temp; } void resize () throw ( std::exception ) { heap_timer** temp = new heap_timer* [2 *capacity]; for ( int i = 0 ; i < 2 *capacity; ++i ) { temp[i] = NULL ; } if ( ! temp ) { throw std::exception (); } capacity = 2 *capacity; for ( int i = 0 ; i < cur_size; ++i ) { temp[i] = array[i]; } delete [] array; array = temp; } private : heap_timer** array; int capacity; int cur_size; }; #endif
对时间堆而言,添加一个定时器的复杂度是O(lgn),删除一个定时器的复杂度是O(1)