9. 定时器
网络程序需要处理定时事件,如定期检测一个客户连接的活动状态。服务器程序通常管理着众多定时事件,有效地组织这些定时事件,使其在预期的时间被触发且不影响服务器的主要逻辑,对于服务器的性能有至关重要的影响。为此,我们要将每个定时事件分别封装成定时器,并使用某种容器类数据结构,如链表、排序链表、时间轮,将所有定时器串联起来,以实现对定时事件的统一管理。本章主要讨论两种高效的管理定时器的容器:时间轮和时间堆。
定时指在一段时间后触发某段代码的机制,我们可以在这段代码中依次处理所有到期的定时器,即定时机制是定时器得以被处理的原动力。Linux提供三种定时方法:
- socket套接字选项
SO_RCVTIMEO
和SO_SNDTIMEO
;
SIGALRM
信号;
- I/O复用系统调用的超时参数。
1.socket 选项 SO_RCVTIMEO
和 SO_SNDTIMEO
SO_RCVTIMEO
设置 socket 接收数据超时时间。
SO_SNDTIMEO
设置 socket 发送数据超时时间。
这两个数据仅对与数据接收和发送相关的 socket 系统调用 send
、sendmsg
、recv
、recvmsg
、accept
和 connect
。

在程序中,我们根据上述系统调用的返回值以及 errno
来判断超时时间是否已到,进而决定是否开始处理定时任务。
2.使用 SO_SNDTIMEO
选项设置定时
setsockopt讲解
1
| int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);
|
参数
- sockfd:
- 类型:int
- 解释:这是一个整型变量,表示要操作的套接字的文件描述符(file descriptor)。
- level:
- 类型:int
- 解释:这个参数指定了选项所属的协议层次,如SOL_SOCKET(套接字级别)、IPPROTO_IP(IP协议级别)等。
- optname:
- 类型:int
- 解释:这是选项的名称,比如SO_REUSEADDR、SO_TIMEOUT等,是套接字选项库中预定义的一个整数常量。
- optval:
- 类型:const void*
- 解释:这个参数指向一个包含选项值的内存区域。选项值的类型取决于特定的选项,可能是一个整数、字符数组、枚举值等。
- optlen:
- 类型:socklen_t
- 解释:这是一个长度类型的参数,表示optval所指向的内存区域大小。在调用前,应预先根据选项和可能的值设置其正确的大小。
返回值
- 成功:返回0。
- 失败:返回-1(SOCKET_ERROR),并设置errno来表示错误的原因。可能的错误代码包括EBADF(sock不是有效的文件描述词)、EFAULT(optval指向的内存并非有效的进程空间)、EINVAL(在调用setsockopt()时,optlen无效)、ENOPROTOOPT(指定的协议层不能识别选项)、ENOTSOCK(sock描述的不是套接字)等。
常用选项示例
- SO_RCVBUF和SO_SNDBUF:用于设置/读取发送缓冲区和接收缓冲区大小。选项值类型为int,指定新的缓冲区大小。对setsockopt和getsockopt有效。设置缓冲区大小只能在TCP连接建立之前进行。
- SO_REUSEADDR:用于复用socket地址(端口号)。选项值类型为int,0表示不能复用,1表示可以复用,默认值为0。对setsockopt和getsockopt有效。
- SO_KEEPALIVE:用于TCP socket检测/保持网络连接。选项值类型为int,0表示不能发送探测数据段,1表示可以发送,默认值为0。对setsockopt和getsockopt有效。
- TCP_NODELAY:用于禁止TCP协议的Nagle算法,使小数据包(小于最大数据段大小)立即发送,可能会降低TCP协议的效率。
setsockopt函数的作用是设置与指定的套接字关联的选项,从而影响套接字的行为。通过调用setsockopt函数,可以设置这些选项的值,以满足不同的网络编程需求。
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <stdlib.h> #include <assert.h> #include <stdio.h> #include <errno.h> #include <fcntl.h> #include <unistd.h> #include <string.h> #include <libgen.h> /* 超时连接函数 * 这个函数的作用是尝试与指定IP地址和端口的服务器建立连接, * 并设置一个连接超时时间。 */ int timeout_connect(const char *ip, int port, int time) { int ret = 0; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(port); int sockfd = socket(PF_INET, SOCK_STREAM, 0); assert(sockfd >= 0); // SO_RCVTIMEO和SO_SNDTIMEO套接字选项对应的值类型为timeval,这和select函数的超时参数类型相同 struct timeval timeout; timeout.tv_sec = time; timeout.tv_usec = 0; socklen_t len = sizeof(timeout); ret = setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len); assert(ret != -1); /* * 如果连接在指定时间内没有成功建立,connect() 函数将返回 -1, * 并且 errno 会被设置为 EINPROGRESS,表示连接超时。 */ printf("Attempting to connect...\n"); ret = connect(sockfd, (struct sockaddr *)&address, sizeof(address)); printf("Connect returned: %d\n", ret); if (ret == -1) { // 超时对应的错误号是EINPROGRESS,此时就可执行定时任务了 if (errno == EINPROGRESS) { printf("conencting timeout, process timeout logic\n"); return -1; } printf("error occur when connecting to server\n"); return -1; } return sockfd; } int main(int argc, char *argv[]) { if (argc != 3) { printf("usage: %s ip_address port_number\n", basename(argv[0])); return 1; } const char *ip = argv[1]; int port = atoi(argv[2]); /* 超时时间 10s */ int sockfd = timeout_connect(ip, port, 10); if (sockfd < 0) { return 1; } return 0; }
|
编译:
1
| g++ -o client connect_timeout.cpp
|
运行:
1
| ./client 192.168.100.100 8080 /* 这是一个无效的IP地址 */
|
结果:

3.SIGALRM 信号
由alarm
和setitimer
函数设置的实时闹钟一旦超时,就会触发SIGALRM
信号,我们可以使用该信号的信号处理函数来处理定时任务。
1.基于升序链表的定时器
定时器通常至少要包含两个成员:一个超时时间(相对时间或绝对时间)和一个任务回调函数。有时还可能包含回调函数被执行时需要传入的参数,以及是否重启定时器等信息。如果使用链表作为容器来串联所有定时器,则每个定时器还要包含指向下一定时器的指针成员,如果链表是双向的,则每个定时器还需要包含指向前一个定时器的指针成员。
从执行效率来看,添加定时器的时间复杂度是O(n)
,删除定时器的时间复杂度是O(1)
,执行定时任务的时间复杂度平均是O(1)
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
| #ifndef LST_TIMER #include<time.h> #include<sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <errno.h> #include <string.h> #include <signal.h> #include <fcntl.h> #include <libgen.h>
#define BUFFER_SIZE 64 class util_timer;
struct client_data { sockaddr_in address; int sockfd; char buf[BUFFER_SIZE]; util_timer* timer; };
class util_timer { public: util_timer():prev(NULL),next(NULL){} util_timer* prev; util_timer* next; time_t expire; void (*cb_func)(client_data *); client_data * user_data; };
class sort_timer_1st { public: sort_timer_1st():head(NULL),tail(NULL){} ~sort_timer_1st() { util_timer* tmp=head; while(tmp) { head=tmp->next; delete tmp; tmp=head; } } void add_timer(util_timer* timer) { if(!timer) return ; if(!head) { head=tail=timer; return; } if(timer->expire<head->expire) { timer->next=head; head->prev=timer; head=timer; return; } add_timer(timer,head); } void adjust(util_timer* timer) { if(!timer) { return; } util_timer* tmp=timer->next; if(!tmp||(timer->expire<tmp->expire)) { return; } if(timer==head) { head=head->next; head->prev=NULL; timer->next=NULL; add_timer(timer,head); } else { timer->prev->next=timer->next; timer->next->prev=timer->prev; add_timer(timer,timer->next); } } void del_timer(util_timer* timer) { if (!timer) { return; } if ((timer == head) && (timer == tail)) { delete timer; head = NULL; tail = NULL; return; } if (timer == head) { head = head->next; head->prev = NULL; delete timer; return; } if (timer == tail) { tail = tail->prev; tail->next = NULL; delete timer; return; } timer->prev->next = timer->next; timer->next->prev = timer->prev; delete timer; } void tick() { if (!head) { return; } printf("timer tick\n"); time_t cur = time(NULL); util_timer* tmp = head; while (tmp) { if (cur < tmp->expire) { break; } tmp->cb_func(tmp->user_data); head = tmp->next; if (head) { head->prev = NULL; } delete tmp; tmp = head; } } private: void add_timer(util_timer* timer,util_timer* lst_head) { util_timer* prev=lst_head; util_timer* tmp=prev->next; while(tmp) { if(timer->expire<tmp->expire) { prev->next=timer; timer->next=tmp; tmp->prev=timer; timer->prev=prev; break; } prev=tmp; tmp=tmp->next; } if(!tmp) { prev->next=timer; timer->prev=prev; timer->next=NULL; tail=timer; } } util_timer* head; util_timer* tail; };
#endif
|
核心函数tick相当于一个心博函数,每隔一段固定的时间就执行一次,以检测并处理到期的任务。
判断任务到期的依据是定时器党的expire值小于当前的系统时间。
2.处理非活动连接
现在考虑以上升序定时器链表的实际应用 ——处理非活动连接。服务器进程通常要定期处理非活动连接:给客户端发一个重连请求,或关闭该连接,或者其他。
Linux在内核中提供了对连接是否处于活动状态的定期检查机制,我们可通过socket选项KEEPALIVE
来激活它,不过这样会使得应用程序对连接的管理变得复杂。我们考虑在应用层实现类似KEEPALIVE
的机制,以管理所有长时间处于非活动状态的连接。
处理非活动连接:利用 alarm 函数周期性触发 SIGALRM 信号
如以下代码利用alarm
函数周期性地触发SIGALRM
信号,该信号的信号处理函数利用管道通知主循环执行定时器链表上的定时任务——关闭非活动的连接。
服务器端
TIMESLOT
定义了定时器的基本时间单位,用于设定定时器回调函数的调用频率。这个值代表了服务器在触发 SIGALRM
信号后多长时间再次触发该信号,从而执行定时任务。在代码示例中,TIMESLOT
被设置为 5 秒,也就是说,基本时间单位是 5 秒。
alarm
函数用于安排信号在未来的某个时间点发送给进程。这个函数是标准库中的一部分,定义在 <unistd.h>
头文件中。当调用 alarm(TIMESLOT);
时,设定了一个闹钟,让操作系统在 TIMESLOT
秒后向当前进程发送一个 SIGALRM
信号。
alarm(TIMESLOT);
在 timer_handler()
函数中调用,意味着每隔 TIMESLOT
秒,操作系统将向进程发送一个 SIGALRM
信号。收到这个信号后,sig_handler
处理函数将被执行,进而调用 timer_handler
函数来处理所有定时任务。
当一个新的客户端连接被接受,服务器为这个连接创建一个定时器,并设置其超时时间:
1
| timer->expire = cur + 3 * TIMESLOT;
|
cur
是当前时间,3 * TIMESLOT
表示定时器的超时时间是15秒(因为 3 乘以 5秒)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
| #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <signal.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <pthread.h> #include <libgen.h> #include "lst_timer.h" #define FD_LIMIT 65535 #define MAX_EVENT_NUMBER 1024 #define TIMESLOT 5 static int pipefd[2]; static sort_timer_lst timer_lst; static int epollfd = 0;
int setnonblocking(int fd) { int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; }
void addfd(int epollfd, int fd) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); setnonblocking(fd); }
void sig_handler(int sig) { int save_errno = errno; int msg = sig; send(pipefd[1], (char *)&msg, 1, 0); errno = save_errno; }
void addsig(int sig) { struct sigaction sa; memset(&sa, '\0', sizeof(sa)); sa.sa_handler = sig_handler; sa.sa_flags |= SA_RESTART; sigfillset(&sa.sa_mask); assert(sigaction(sig, &sa, NULL) != -1); } void timer_handler() { timer_lst.tick(); alarm(TIMESLOT); } void cb_func(client_data *user_data) { epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0); assert(user_data); close(user_data->sockfd); printf("close fd %d\n", user_data->sockfd); } int main(int argc, char *argv[]) { if (argc != 3) { printf("usage: %s ip_address port_number\n", basename(argv[0])); return 1; } const char *ip = argv[1]; int port = atoi(argv[2]); int ret = 0; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(port); int listenfd = socket(PF_INET, SOCK_STREAM, 0); assert(listenfd >= 0); ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address)); assert(ret != -1); ret = listen(listenfd, 5); assert(ret != -1); epoll_event events[MAX_EVENT_NUMBER]; int epollfd = epoll_create(5); assert(epollfd != -1); addfd(epollfd, listenfd); ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd); assert(ret != -1); setnonblocking(pipefd[1]); addfd(epollfd, pipefd[0]); addsig(SIGALRM); addsig(SIGTERM); bool stop_server = false; client_data *users = new client_data[FD_LIMIT]; bool timeout = false; alarm(TIMESLOT); while (!stop_server) { int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); if ((number < 0) && (errno != EINTR)) { printf("epoll failure\n"); break; } for (int i = 0; i < number; ++i) { int sockfd = events[i].data.fd; if (sockfd == listenfd) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof(client_address); int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength); addfd(epollfd, connfd); users[connfd].address = client_address; users[connfd].sockfd = connfd; util_timer *timer = new util_timer; timer->user_data = &users[connfd]; timer->cb_func = cb_func; time_t cur = time(NULL); timer->expire = cur + 3 * TIMESLOT; users[connfd].timer = timer; timer_lst.add_timer(timer); } else if ((sockfd == pipefd[0]) && (events[i].events & EPOLLIN)) { int sig; char signals[1024]; ret = recv(pipefd[0], signals, sizeof(signals), 0); if (ret == -1) { continue; } else if (ret == 0) { continue; } else { for (int i = 0; i < ret; ++i) { switch (signals[i]) { case SIGALRM: timeout = true; break; case SIGTERM: stop_server = true; break; } } } } else if (events[i].events & EPOLLIN) { memset(users[sockfd].buf, '\0', BUFFER_SIZE); ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE - 1, 0); printf("get %d bytes of client data %s from %d\n", ret, users[sockfd].buf, sockfd); util_timer *timer = users[sockfd].timer; if (ret < 0) { if (errno != EAGAIN) { cb_func(&users[sockfd]); if (timer) { timer_lst.del_timer(timer); } } } else if (ret == 0) { cb_func(&users[sockfd]); if (timer) { timer_lst.del_timer(timer); } } else { if (timer) { time_t cur = time(NULL); timer->expire = cur + 3 * TIMESLOT; printf("adjust timer once\n"); timer_lst.adjust_timer(timer); } } } } if (timeout) { timer_handler(); timeout = false; } } close(listenfd); close(pipefd[1]); close(pipefd[0]); delete[] users; return 0; }
|
整体讲解:
这段代码实现了一个基于 epoll
的网络服务器程序,它具备以下主要功能特点:
- 利用
epoll
实现高效的 I/O 复用,能同时处理多个客户端连接的 I/O 事件,并且采用了非阻塞套接字结合边缘触发(ET)模式来提升性能。
- 通过定时器链表(
sort_timer_lst
)来管理连接的超时时间,实现定时关闭长时间无活动的客户端连接功能,并且能够动态调整定时器时间(比如客户端有数据交互时延长连接的超时时间)。
- 可以处理系统信号,例如
SIGALRM
(用于定时任务触发)和 SIGTERM
(用于优雅地终止服务器进程),并在相应信号到来时执行特定的操作。
和10.4统一事件源思想类似,把信号处理函数定义为对管道写数据,写的就是信号的编号,然后通过对管道进行事件监听,一旦监听到管道有读事件,那就是有信号发过来了,recv接收并处理信号,本题中的信号就是定时器信号,5秒一次,到了的话就去处理定时器链表里面的超时事件
同时监听客户的读事件,如果客户在5秒内进行了写事件,那么就要重新设置定时器表示这个客户是活跃的
客户端
为了测试服务器能够按照预定的定时器逻辑关闭非活动连接,我们可以编写一个简单的客户端程序,该程序连接到服务器后不发送任何数据,仅保持连接一定时间后(sleep(20);
)关闭,看服务器是否会在设定的超时时间后自动关闭该连接。
服务端中,定时器的超时时间被设置为3 * TIMESLOT
,即15秒(TIMESLOT
设置为5秒)。定时器的目的是在客户端在指定时间内没有任何活动(例如数据交换)的情况下自动关闭该连接。
SIGALRM
信号每次触发就在其信号处理函数(如果使用统一事件源,则是主函数)中执行一次tick
函数,处理链表上的到期任务,并在终端打印”timer tick”。
1 2 3 4 5 6
| void timer_handler() { timer_lst.tick(); alarm(TIMESLOT); }
|
在服务器端,在没有收到任何数据的情况下,服务器在定时器到期后关闭了连接的日志消息,例如 “close fd <fd>
“ 的输出,表示服务器正确处理了非活动连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> int main(int argc, char *argv[]) { if (argc != 3) { printf("Usage: %s <ip> <port>\n", argv[0]); return 1; } const char *server_ip = argv[1]; int server_port = atoi(argv[2]); int sock = socket(AF_INET, SOCK_STREAM, 0); if (sock < 0) { perror("Socket creation failed"); return 1; } struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(server_port); if (inet_pton(AF_INET, server_ip, &server_addr.sin_addr) <= 0) { perror("Invalid address/ Address not supported"); return 1; } if (connect(sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { perror("Connection Failed"); return 1; } printf("Connected to server, now sleeping...\n"); sleep(20); close(sock); printf("Disconnected from server\n"); return 0; }
|
效果

4.I/O 复用系统调用的超时函数
Linux下的3组I/O复用系统调用都带有超时参数,因此它们不仅能统一处理信号(通过管道在信号处理函数中通知主进程)和I/O事件,也能统一处理定时事件,但由于I/O复用系统调用可能在超时时间到期前就返回(有I/O事件发生),所以如果我们要利用它们来定时,就需要不断更新定时参数以反映剩余的时间,如下代码所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| #define TIMEOUT 5000
int timeout = TIMEOUT; time_t start = time(NULL); time_t end = time(NULL); while (1) { printf("the timeout is now %d mil-seconds\n", timeout); start = time(NULL); int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, timeout); if ((number < 0) && (errno != EINTR)) { printf("epoll failure\n"); break; } if (number == 0) { timeout = TIMEOUT; continue; } end = time(NULL); timeout -= (end - start) * 1000; if (timeout <= 0) { timeout = TIMEOUT; } }
|
基于 epoll
的事件循环与定时机制实现
这段代码实现了一个基于 epoll
的事件循环,并结合了定时处理的功能,整体功能围绕高效处理 I/O 事件以及定时任务展开,具体如下:
- 定时相关变量初始化
首先定义了一个宏 TIMEOUT
表示超时时间(单位为毫秒,不过代码中实际换算有些问题,后续会详细说明),初始化为 5000
。然后声明了一个整型变量 timeout
并初始化为 TIMEOUT
,用于记录剩余的超时时间。同时,通过 time(NULL)
函数获取当前时间分别赋值给 start
和 end
,用于后续计算时间差来跟踪时间流逝情况。
- 循环主体功能
进入一个无限循环(while (1)
),在每次循环中执行以下操作:
- 打印剩余超时时间信息:
使用 printf
函数打印当前剩余的超时时间(这里代码将 timeout
当作毫秒来处理,但实际 time(NULL)
获取的是秒数,后面计算时间差时需要进行相应转换才能准确对应毫秒单位,不过暂且按现有逻辑分析),方便查看定时情况。
- 启动
epoll_wait
并处理返回结果:
- 调用
epoll_wait
:每次循环都会调用 epoll_wait
函数,传入 epollfd
(epoll
实例对应的文件描述符)、events
(用于接收就绪事件的数组)、MAX_EVENT_NUMBER
(数组大小,限定了最多能接收的事件数量)以及 timeout
(本次等待的超时时间,单位在代码逻辑中期望是毫秒,虽然前面提到换算有问题)。这个函数会阻塞等待文件描述符上有感兴趣的事件发生或者超时。
- 处理
epoll_wait
返回值小于 0 的情况:如果 epoll_wait
返回值小于 0
,并且错误原因不是因为被信号中断(errno!= EINTR
),则说明 epoll
操作出现了真正的错误,此时会打印 “epoll failure”
提示信息,并通过 break
跳出循环,结束整个事件处理流程。
- 处理
epoll_wait
返回值等于 0 的情况(超时情况):当 epoll_wait
返回 0
时,意味着在设定的超时时间内没有文件描述符就绪事件发生,也就是超时时间到了。此时,代码会将 timeout
重新设置为初始的 TIMEOUT
值,用于下一轮循环继续等待事件时使用初始设定的超时时间,然后通过 continue
语句直接进入下一轮循环,去再次等待事件或者超时。
- 处理
epoll_wait
返回值大于 0 的情况(有文件描述符就绪):
- 更新时间记录:当
epoll_wait
返回值大于 0
时,表示有文件描述符上的事件就绪了。此时先获取当前时间赋值给 end
,然后通过 end - start
计算出本次 epoll_wait
调用所持续的时间(这里获取的时间是以秒为单位,实际代码逻辑期望换算成毫秒去更新 timeout
,存在前面提到的单位换算问题)。
- 更新剩余超时时间:用当前剩余的超时时间
timeout
减去本次 epoll_wait
调用持续时间换算后的毫秒数((end - start) * 1000
,这里代码本意是将秒换算成毫秒,但实际 time(NULL)
函数获取的是从 1970 年 1 月 1 日 00:00:00 UTC 到当前时间的秒数,所以换算逻辑不准确),以此来更新 timeout
的值,反映剩余的超时时间还有多少。
- 处理超时时间耗尽情况:如果更新后的
timeout
值小于等于 0
,说明本次 epoll_wait
在有文件描述符就绪的同时,超时时间也刚好到达(或者已经超过了设定的超时时间),这时会将 timeout
重新设置为初始的 TIMEOUT
值,准备下一轮循环继续基于初始超时时间进行事件等待与定时处理,同时也意味着此时需要去处理定时任务(虽然代码中没有明确展示具体的定时任务处理内容,只是做了这个时间重置的相关操作)。
- 处理就绪的连接(注释中的
handle connections
部分):
这部分代码没有具体展开,但可以推测在这里会对 epoll_wait
返回的就绪文件描述符对应的连接进行相应的读、写等 I/O 操作处理,比如接收客户端发送的数据、向客户端发送响应数据等操作,完成服务器与客户端之间基于网络连接的交互功能。
总体而言,这段代码通过不断循环调用 epoll_wait
函数来同时兼顾 I/O 事件的处理以及定时任务的触发,利用超时时间的计算和更新机制,使得程序既能及时响应文件描述符上的就绪事件,又能按照设定的时间间隔(虽然存在时间单位换算问题)来处理定时任务,在网络服务器等需要同时处理 I/O 和定时逻辑的场景中较为常用。不过代码中的时间计算部分需要修正时间单位换算逻辑,才能准确实现预期的定时功能。
time_t time(time_t *t);
函数原型:
参数
t
: 这是一个指向 time_t
类型的指针,用于存储时间值。如果参数是 NULL
,函数只返回当前时间。
返回值
- 函数返回一个
time_t
类型的值,它表示从Epoch至今的秒数。如果出现错误,则返回 (time_t) -1
。
6.高性能定时器(较难,需复习)
时间轮
基于排序链表的定时器存在一个问题:添加定时器的效率偏低(添加定时器的时间复杂度是**O(n)**
)。下面讨论的时间轮解决了这个问题,一种简单的时间轮如下图:

上图中,时间轮内部的实线指针指向轮子上的一个槽(slot),它以恒定的速度顺时针转动,每转动一步就指向下一个槽(虚线指针所指的槽),每次转动称为一个滴答(tick),一个滴答的时间称为时间轮的槽间隔si(slot interval),它实际上就是心搏时间。上图中的时间轮共有N个槽,因此它转动一周的时间是N * si
。每个槽指向一条定时器链表,每条链表上的定时器具有相同的特征:它们的定时事件相差N * si
的整数倍,时间轮正是利用这个关系将定时器散列到不同的链表中。假如现在指针指向槽cs
,我们要添加一个定时事件为ti
的定时器,则该定时器将被插入槽ts
(timer slot)对应的链表中:
基于排序链表的定时器使用唯一的一条链表来管理所有定时器,所以插入操作的效率随着定时器数目的增多而降低,而时间轮使用哈希表的思想,将定时器散列到不同的链表上,这样每条链表上的定时器数目都将明显少于原来的排序链表上的定时器数目,插入操作的效率基本不受定时器数目的影响。
对时间轮而言,要想提高定时精度,就要使si
足够小,要提高执行效率,就要求N足够大(N越大,散列冲突的概率就越小)。
以下代码描述了一种简单的时间轮,因为它只有一个轮子,而复杂的时间轮可能有多个轮子,不同的轮子有不同的粒度,相邻的两个轮子,精度高的转一圈,精度低的仅仅往前移动一槽,就像水表一样。
对时间轮而言,如果一共有n个定时器,则添加一个定时器的时间复杂度为O(1)
;删除一个定时器的时间复杂度平均也是O(1)
,但最坏情况下可能所有节点都在一个槽中,此时删除定时器的时间复杂度为O(n)
;执行一个定时器的时间复杂度是O(n)
,实际上执行一个定时器任务的效率要比O(n)
好得多,因为时间轮将所有定时器散列到了不同的链表上,时间轮的槽越多,等价于散列表的入口(entry)越多,从而每条链表上的定时器数量越少。此外,以上代码中只使用了1个时间轮,当使用多个轮子来实现时间轮时,执行一个定时器任务的时间复杂度将接近O(1)
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
|
#ifndef TIME_WHEEL_TIMER #define TIME_WHEEL_TIMER
#include <time.h> #include <netinet/in.h> #include <stdio.h>
#define BUFFER_SIZE 64 class tw_timer;
struct client_data { sockaddr_in address; int sockfd; char buf[BUFFER_SIZE]; tw_timer *timer; }
class tw_timer { public: tw_timer(int rot, int ts) :next(NULL), prev(NULL), rotation(rot), time_slot(ts){}
public: int rotation; int time_slot; void (*cb_func)(client_data *); client_data *user_data; tw_timer *next; tw_timer *prev; }
class time_wheel { public: time_wheel() : cur_slot(0) { for(int i = 0; i < N; ++i) { slots[i] = NULL; } } ~time_wheel() { for(int i = 0; i < N; ++i) { tw_timer *tmp = slots[i]; while(tmp) { slots[i] = tmp->next; delete tmp; tmp = slots[i]; } } }
tw_timer* add_timer(int timeout) { if(timeout < 0) { return NULL; } int ticks = 0; if(timeout < SI) { ticks = 1; } else { ticks = timeout / SI; } int rotation = ticks / N; int ts = (cur_slot + (ticks % N)) % N; tw_timer *timer = new tw_timer(rotation, ts); if(!slots[ts]) { printf("add timer, rotation is %d, ts is %d, cur_slot is %d\n", rotation, ts, cur_slot); } else { timer->next = slots[ts]; slots[ts]->prev = timer; slots[ts] = timer; } return timer; }
void del_timer(tw_timer *timer) { if(!timer) { return; } int ts = timer->time_slot; if(timer == slots[ts]) { slots[ts] = slots[ts]->next; if(slots[ts]) { slots[ts]->prev = NULL; } delete timer; } else { timer->prev->next = timer->next; if(timer->next) { timer->next->prev = timer->prev; } delete timer; }
}
void tick() { tw_timer *tmp = slots[cur_slot]; printf("current slot is %d\n", cur_slot); while(tmp) { printf("tick the timer once\n"); if(tmp->rotation > 0) { tmp->rotation--; tmp = tmp->next; } else { tmp->cb_func(tmp->user_data); if(tmp = slots[cur_slot]) { printf("delete header in cur_slot\n"); slots[cur_slot] = tmp->next; delete tmp; if(slots[cur_slot]) { slots[cur_slot]->prev = tmp->prev; } tmp = slots[cur_slot]; } else { tmp->prev->next = tmp->next; if(tmp->next) { tmp->next->prev = tmp->prev; } tw_timer *tmp2 = tmp->next; delete tmp; tmp = tmp2; } } } cur_slot = (++cur_slot) % N; } private: static const int N = 60; static const int SI = 1; tw_timer* slots[N]; int cur_slot; };
|
复杂度分析
由于添加一个定时器是链表头插,则时间复杂度为 O ( 1 ) O(1)O(1)
删除一个定时器的时间复杂的也为O ( 1 ) O(1)O(1)
执行一个定时器的复杂度为O ( n ) O(n)O(n).但实际执行一个定时任务效率要比O ( n ) O(n)O(n)好,因为时间轮将所有定时器散列到不同的链表上。
若使用多个轮子实现时间轮,执行一个定时器任务的复杂度可以降到O ( 1 ) O(1)O(1)
时间堆
以上讨论的定时方案都是以固定频率调用心搏函数tick
,并在其中依次检测到期的定时器,然后执行到期定时器上的回调函数。设计定时器的另一种思路是:将所有定时器中超时时间最小的定时器的超时值作为心搏间隔,这样,一旦心搏函数tick
被调用,超时时间最小的定时器必然到期,我们就可在tick
函数中处理该定时器,然后,再次从剩余定时器中找出超时时间最小的一个,并将这段最小时间设为下一次心搏间隔。
最小堆很适合这种定时方案:

最小堆是每个节点的值都小于或等于其子节点的值的完全二叉树。
由于最小堆其实就是一棵树,所以其实现可以用链表,也可以用数组。用最小堆实现的定时器称为时间堆。对时间堆而言,添加一个定时器的时间复杂度是O(lgn)
;删除一个定时器的时间复杂度是O(1)
;执行一个定时器的时间复杂度是O(1)
。因此,时间堆的效率很高。
最小堆很适合这种定时方案。本文实现最小堆有以下关键特点:
- 根节点值小于其孩子节点的值(递归成立);
- 插入节点是在最后一个节点添加新节点,然后进行上滤保证最小堆特性;
- 删除节点是删除其根节点上的元素,然后把最后一个元素移动到根节点,进行下滤操作保证最小堆特性;
- 将N个元素的数组(普通二叉树)初始化为最小堆,即从二叉树最后一个非叶节点到根节点(第[ ( N − 1 ) / 2 ] [(N-1) / 2][(N−1)/2] ~ 0 个元素)执行下滤操作。
- 本文实现的最小堆底层是用数组进行存储,是一个适配器,联想C++的
priority_queue<int, vector<int>, greater<int>>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
| #ifndef MIN_HEAP #define MIN_HEAP
#include <iostream> #include <netinet/in.h> #include <time.h> using std::exception;
#define BUFFER_SIZE 64 class heap_timer;
struct client_data { sockaddr_in address; int sockfd; char buf[BUFFER_SIZE]; heap_timer *timer; };
class heap_timer { public: heap_timer(int delay) { expire = time(NULL) + delay; }
public: time_t expire = expire; void (*cb_func)(client_data *); client_data *user_data; };
class time_heap { public: time_heap(int cap) throw (std::exception) : capacity(cap), cur_size(0) { array = new heap_timer*[capacity]; if(!array) { throw std::exception(); } else { for(int i = 0; i < capacity; ++i) { array[i] = NULL; } } }
time_heap(heap_timer **init_array, int size, int capacity) throw (std::exception) : cur_size(size), capacity(capacity) { if(capacity < size) { throw std::exception(); } array = new heap_timer*[capacity]; if(!array) { throw std::exception(); } for(int i = 0; i < capacity; ++i) { array[i] = NULL; } if(size != 0) { for(int i = 0; i < size; ++i) { array[i] = init_array[i]; } for(int i = (cur_size - 1); i >= 0; --i) { percolate_down(i); } } } ~time_heap() { for(int i = 0; i < cur_size; ++i) { delete array[i]; } delete []array; }
public: void add_timer(heap_timer *timer) throw (std::exception) { if(!timer) { return; } if(cur_size >= capacity) { resize(); } int hole = cur_size++; int parent = 0; for(; hole > 0; hole = parent) { parent = (hole - 1) / 2; if(array[parent]->expire <= timer->expire) { break; } array[hole] = array[parent]; } array[hole] = timer; } void del_timer(heap_timer *timer) { if(!timer) { return; } timer->cb_func = NULL; }
heap_timer* top() const { if(empty()) { return NULL; } return array[0]; }
void pop_timer() { if(empty()) { return ; } if(array[0]) { delete array[0]; array[0] = array[--cur_size]; percolate_down(0); } }
void tick() { heap_timer *tmp = array[0]; time_t cur = time(NULL); while(!empty()) { if(!tmp) { break; } if(tmp->expire > cur) { break; } if(array[0]->cb_func) { array[0]->cb_func(array[0]->user_data); } pop_timer(); tmp = array[0]; } }
bool empty() const { return cur_size == 0; }
private: void percolate_down(int hole) { heap_timer *temp = array[hole]; int child = 0; for(; (hole * 2 + 1) <= (cur_size - 1); hole = child) { child = hole * 2 + 1; if((child < (cur_size - 1)) && (array[child + 1]->expire < array[child]->expire)) { child++; } if(array[child]->expire < temp->expire) { array[hole] = array[child]; } else { break; } } array[hole] = temp; }
void resize() throw (std::exception) { heap_timer **temp = new heap_timer*[2 * capacity]; for(int i = 0; i < 2 * capacity; ++i) { temp[i] = NULL; } if(!temp) { throw std::exception(); } capacity = 2 * capacity; for(int i = 0; i < cur_size; ++i) { temp[i] = array[i]; } delete []array; array = temp; } private: heap_timer **array; int capacity; int cur_size; };
#endif
|
复杂度分析
- 对时间堆而言,添加一个定时器的时间复杂度为O ( l o g n ) O(logn)O(log**n)(由于需要上滤操作)
- 删除一个定时器的时间复杂度为O ( 1 ) O(1)O(1),这是因为只是将目标定时器的回调函数设置为空
- 执行一个定时器的时间复杂度为O ( 1 ) O(1)O(1)