7. I/O 复用

0.前情提要-IO多路转接服务器设计思路

1.前情提要

I/O 复用使得程序能够同时监听多个文件描述符,从而提高程序的性能。I/O 复用本身是阻塞的。Linux 下实现 I/O 复用的系统调用主要有 selectpollepoll

IO复用和多路IO复用只是表述上不同,实际上指的是同一个东西。

Reactor 模型基本原理回顾

  • Reactor 模型是一种事件驱动的设计模式,主要包含事件分离器(Reactor)和事件处理器(Handler)。事件分离器负责监听 I/O 事件,当事件发生时,将事件分发给相应的事件处理器进行处理。、

而本章的select,poll,epoll都是Reactor模型的一种实现方式。

select 是 Reactor 模型的一种实现方式

  • 事件监听机制:select 函数充当事件分离器的角色。它可以同时监听多个文件描述符(包括套接字)的可读、可写和异常事件。程序将需要监听的文件描述符集合传递给 select 函数,select 会阻塞等待这些文件描述符上的事件发生。
  • 事件分发处理:当 select 返回时,表示有事件发生。此时,程序需要遍历文件描述符集合,检查每个文件描述符的状态,以确定是哪个文件描述符发生了哪种事件(可读、可写或异常)。然后,根据事件类型调用相应的处理函数,这些处理函数就是事件处理器。例如,对于可读事件,可以调用接收数据的函数;对于可写事件,可以调用发送数据的函数。

poll 也是 Reactor 模型的实现方式

  • 与 select 类似的事件监听:poll 函数的功能和 select 类似,也是用于监听多个文件描述符的事件。它使用一个结构体数组来表示文件描述符及其等待的事件类型,通过不断地轮询这些文件描述符来检查事件是否发生。
  • 事件分发处理机制相同:当 poll 返回后,程序同样需要遍历结构体数组来确定发生事件的文件描述符和事件类型,然后调用相应的处理函数进行处理。和 select 一样,它符合 Reactor 模型中事件分离器监听事件并将事件分发给事件处理器的基本架构。

epoll 是对 Reactor 模型更高效的实现

  • 事件监听的改进:epoll 在 Linux 系统中是一种高效的 I/O 复用机制,也可以看作是 Reactor 模型的实现。epoll 通过在内核中维护一个事件表来管理文件描述符。程序可以向这个事件表中添加、删除或修改要监听的文件描述符及其事件类型。epoll_wait 函数用于等待事件的发生,它只会返回发生了事件的文件描述符,而不像 select 和 poll 那样需要遍历所有的文件描述符来确定事件发生的位置。
  • 高效的事件分发处理:当 epoll_wait 返回后,程序直接获取了发生事件的文件描述符列表,然后针对这些文件描述符对应的事件进行处理,这使得事件的分发和处理更加高效。它同样遵循 Reactor 模型的基本思想,即由事件分离器(epoll 机制)监听事件,然后将事件分发给相应的事件处理器(处理函数)进行处理。

2.IO多路转接服务器设计思路

10-多路IO转接服务器设计思路_哔哩哔哩_bilibili

讲的比书上讲的好太多了,只能说没法比

以select为例子

image-20241212192051846

服务器看做老板,select看做秘书,但是这个秘书不太聪明,她只知道有事情了(请求,读,写,异常)的话就去叫老板

下面来说一下客户c1,c2,c3连接服务器的过程

1.当前还没有任何客户连接

2.c1请求连接

3.select秘书通知老板srver有请求

4.老板sever创建监听套接字lfd

5.小秘书拿走lfd

6.老板sever创建连接套接字cfd1

7.小秘书拿走cfd1和c1建立联系,同时开始监听c1上的读,写,异常请求

8.小秘书继续监听是否有请求,同时监听和c1建立的连接上是否有读,写,异常的请求,有的话再次通知老板处理读写请求

9.c2请求,老板创建cfd2

10.小秘书拿走cfd2和c2建立联系,同时开始监听c2上的读,写,异常请求

11.小秘书继续监听是否有请求,同时监听cfd1,cfd2上是否有读,写,异常的请求,有的话再次通知老板处理读写请求

12.c3请求,老板创建cfd3

13.小秘书拿走cfd3和c3建立联系,同时开始监听c3上的读,写,异常请求

14.小秘书继续监听是否有请求,同时监听cfd1,cfd2,cfd3上是否有读,写,异常的请求,有的话再次通知老板处理读写请求

select此时掌管着四个socket,在没有任何请求的时候,老板sever该干啥干啥就行了。你有事你和我秘书说,秘书说了我来处理

而在没有这个秘书的情况下

老板sever只能阻塞等待,等着有请求的时候去连接,除此之外什么都干不了

1.select 系统调用

1.select API

select系统调用:在一段指定时间内,监听用户感兴趣的文件描述符上的可读、可写、异常事件:

1
2
#include <sys/select.h>
int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);

参数:

  • nfds:指定监听的文件描述符的总数,通常是select监听的所有文件描述符中的最大值+1。(因为文件描述符从 0 开始计数)。

  • readfds, writefds, exceptfds:分别指向可读、可写、异常等事件对应文件描述符集合。应用调用select时,我们通过这3个参数传入自己感兴趣的文件描述符,当select函数返回时,内核将修改它们来通知应用进程哪些文件描述符已经就绪。都是传入传出参数,本质上是一个位图。

    传入的时候比如 读事件集合是3,5,6, 写事件集合4,6 异常事件7

    传出的时候就只传出实际发出了请求的 比如3,5发出了读请求,6没有,那读事件集合传出的时候就只剩下3,5了

  • image-20241212194932191

    传0就是轮询,就是非阻塞的那种状态

    NULL就是阻塞,只不过不是进程阻塞,而是select阻塞,这部分是由内核完成的

  • fd_set 结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <typesizes.h>
#define __FD_SETSIZE 1024

#include <sys/select.h>
#define FD_SETSIZE __FD_SETSIZE
typedef long int __fd_mask;
#undef __NFDBITS
#define __NFDBITS (8 * (int) sizeof(__fd_mask))
typedef struct
{
#ifdef __USE_XOPEN
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
#define __FDS_BITS(set) ((set)->fds_bits);
#else
__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
#define __FDS_BITS(set) ((set)->__fds_bits);
} fd_set;

fd_set能容纳的文件描述符数量由FD_SETSIZE指定

  • timeout:设置select函数的超时时间。
1
2
3
4
struct timeval{
long tv_sec; //秒数
long tv_usec; //微秒数
}

返回值:

select成功时返回就绪(可读、可写、异常)文件描述符的总数,如果在超时时间内没有任何文件描述符就绪,select返回0。select失败返回会-1并设置errno,如果在select等待期间,程序收到信号,select立刻返回-1并设置errnoEINTR

监听集合对应函数

void FD_ZERO(fd_set *set); --清空一个文件描述符集合

1
fd_set rset; FD_ZERO(&rset); //将rset集合清空

void FD_SET(int fd, fd_set *set); --将待监听的文件描述符添加到监听集合中

1
FD_SET(3,&rset);FD_SET(5,&rset); //将文件描述符3和5加到rset集合中

void FD_CLR(int fd, fd_set *set); --将一个文件描述符从监听集合中移除

int FD_ISSET(int fd, fd_set *set); --判断一个文件描述符是否在该集合中

返回1就在,0就不在

2.文件描述符就绪条件

哪些情况下文件描述符可以被认为是可读、可写或者出现异常:

下列情况socket可读

  1. socket内核接收缓冲区字节数大于等于低水位标记 SO_RCVLOWAT
  2. socket通信对方关闭连接,此时socket读操作返回0;
  3. 监听socket上有新的连接请求;
  4. socket 上有未处理错误,可用getsockopt读取和清除错误。

下列情况socket可写

  1. socket内核发送缓冲区可用字节数大于等于低水位标记 SO_SNDLOWAT
  2. socket通信写操作被关闭,对写操作关闭的socket执行写操作会触发SIGPIPE信号;
  3. socket使用非阻塞connect连接成功或失败(超时)之后;
  4. socket上有未处理错误,可用getsockopt读取和清除错误。

socket能处理的异常

  1. 只有一种情况:socket接收到带外数据。

3.处理带外数据

socket上接收到普通数据和带外数据都将使select函数返回,但 socket 处于不同的就绪状态:前者处于可读状态,后者处于异常状态。

下面的代码清单描述了 select 如何同时处理二者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/*
#include ...
*/

int main(int argc, char *argv[]) {

// 省略其他操作

while (1) {
memset(buf, '\0', sizeof(buf));

// 设置fdset的位fd
FD_SET(connfd, &read_fds);
FD_SET(connfd, &exception_fds);

ret = select(connfd + 1, &read_fds, NULL, &exception_fds, NULL);

if (ret < 0) {
printf("selection failure\n");
break;
}

// 可读事件,采用普通的recv函数读取数据
if (FD_ISSET(connfd, &read_fds)) {
ret = recv(connfd, buf, sizeof(buf) - 1, 0);
if (ret <= 0) {
break;
}
printf("get %d bytes of normal data: %s\n", ret, buf);
}
// 异常事件,采用带MSG_OOB标志的recv函数读取带外数据
else if (FD_ISSET(connfd, &exception_fds)) {
ret = recv(connfd, buf, sizeof(buf) - 1, MSG_OOB);
if (ret <= 0) {
break;
}
printf("get %d bytes of oob data: %s\n", ret, buf);
}
}

close(connfd);
close(listenfd);
return 0;
}

关于accept只调用了一次的原因

  • 代码中从始至终确实只调用了一次accept函数。
  • 原因是这段代码的主要目的是演示如何在一个已建立的连接上处理多个读事件(包括普通数据读取和带外数据读取),而不是处理多个连接的建立。如果要处理多个连接的建立,需要在while循环内添加accept函数调用,并对每个新连接进行相应的处理。例如,可以在select函数检测到监听套接字(listenfd)有可读事件时(表示有新的连接请求),调用accept函数接受新连接,并将新连接的套接字加入到select监听的文件描述符集合中。

select 调用同时接收普通数据和带外数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#include "TCPServerSocket.h"
#include <stdio.h>
#include <iostream>
#include <sys/select.h>
#include <stdlib.h>
#include <errno.h>

#define BUFF_SIZE 1024

int main(int argc, char* argv[]){

if(argc <= 2){
printf("Usage: %s ip_address port_number.\n", basename(argv[0]));
}

const char* ip = argv[1]; // ip地址
int port = atoi(argv[2]); // 端口号

TCPServerSocket server(ip, port);

// 绑定socket到指定的地址和端口
if (server.bindSocket() < 0) {
throw std::runtime_error("Failed to bind socket");
}

// 开始监听连接,backlog参数指定了可以挂起(等待被accept)的连接数
if (server.listenSocket(5) < 0) {
throw std::runtime_error("Failed to listen on socket");
}

// 等待并接受一个客户端连接
struct sockaddr_in client_addr;
socklen_t client_addrlen = sizeof(client_addr);
int client_sockfd = server.acceptConnection(&client_addr, &client_addrlen);

if (client_sockfd < 0) {
throw std::runtime_error("Failed to accept connection");
}

char buf[BUFF_SIZE];
fd_set read_fds;
fd_set exception_fds;
FD_ZERO(&read_fds); /* 清除 fd_set 的所有位 */
FD_ZERO(&exception_fds);

while(1)
{
memset(buf, '\0', BUFF_SIZE);
/* 每次调用select之前都要重新设置文件描述符,因为事件发生之后,文件描述符集合会被内核修改 */
FD_SET(client_sockfd, &read_fds);
FD_SET(client_sockfd, &exception_fds);
int ret = select(client_sockfd + 1, &read_fds, NULL, &exception_fds, NULL);
if(ret < 0) {
printf("Selection failure.\n");
return 1;
}
/* 处理可读事件,对于多个文件描述符,需要循环询问是否可读 */
if (FD_ISSET(client_sockfd, &read_fds)) {
ret = recv(client_sockfd, buf, BUFF_SIZE - 1, 0);
if(ret <= 0 ){
return 1;
}
printf("Read: get %d bytes of normal data: %s\n", ret, buf);
}
/* 处理异常事件, 采用带MSG_OOB标志的recv函数读取带外数据 */
else if (FD_ISSET(client_sockfd, &exception_fds)){
ret = recv(client_sockfd, buf, BUFF_SIZE - 1, MSG_OOB);
if(ret <= 0 ){
return 1;
}
printf("Exception: get %d bytes of oob data: %s\n", ret, buf);
}
}
close(client_sockfd);
return 0;
}

编译文件:

1
g++ -o select_server select.cpp

运行:

1
2
./select_server 127.0.0.1 12345		// 服务器端
./client 127.0.0.1 12345 // 客户端

img

这里大家发现了,服务器终端并没有显示接收到了带外数据,是什么原因导致的呢?我们在第五节测试的时候,明明 TCP 服务器终端是打印出了带外数据的(使用带 MSG_OOB 标志的 recv 函数),说明问题肯定没有出在数据的发送上(因为这两次数据发送使用的是同一个客户端程序)。

那么数据接收出了什么问题呢?猜测可能是由于数据发送间隔太短,导致服务器没有时间处理带外数据。

于是我们在 test_sned_oob.cpp 函数中加入延时:

img

重新编译运行,结果如下:

img

成功了!

4.select实现多路IO转接服务器

读者转至笔者这篇博客学习

linux网络编程 | c | select实现IO多路转接服务器-CSDN博客

5.优缺点

缺点:

1.监听上限受限于文件描述符(一个进程也就1024个,如果我们有5000个客户端那还得用多进程)

2.检测满足条件的fd的时候,它自身的内部只能去轮询文件描述符,效率较低(特别是遇到只有两个文件描述符一个3一个1023的时候效率最低),需要咱们自己去添加业务逻辑,提高了编码难度

优点:

跨平台

2.poll 系统调用

要求:能看懂看,看不懂也没啥大事,现在基本都用epoll代替了

在指定时间内轮询一定数量的文件描述符,以测试其中是否有就绪者。

1
2
#include <poll.h>
int poll(struct pollfd* fds, nfds_t nfds, int timeout);

参数:

  • fds:指定所有我们感兴趣的文件描述符上发生的可读、可写、异常事件。
1
2
3
4
5
struct pollfd{
int fd; // 待监听的文件描述符
short events; // 待监听的文件描述符对应的监听事件 events成员告诉poll函数监听fd成员上的哪些事件,它是一系列事件的按位或
short revents; // 由内核修改,以通知应用程序fd上实际发生了哪些事件。传入的时候给0值,如果满足事件的话就返回那三个
};

就是传入的是events传出的是revents,比select做的改进就是把传入和传出的分开了,而select传入传出是一个,其实没太大作用

  • nfds:指定被监听事件集合fds参数数组的大小
1
typedef unsigned long int nfds_t
  • timeout:指定 poll 的超时值,单位是 ms。timeout == -1poll调用会一直阻塞,直到某个事件发生;当timeout == 0poll调用立刻返回。

poll 支持的事件类型:

img

img

自Linux内核2.6.17开始,GNU为poll系统调用增加了POLLRDHUP事件,它在socket上接收到对方关闭连接的请求后触发。

返回值:

  • 与 select 相同。

select成功时返回就绪(可读、可写、异常)文件描述符的总数,如果在超时时间内没有任何文件描述符就绪,select返回0。select失败返回会-1并设置errno,如果在select等待期间,程序收到信号,select立刻返回-1并设置errnoEINTR

poll实现多路IO转接服务器

3.epoll 系列系统调用

image-20241213084531049

连接服务器的客户端很多,但是活跃的客户端很少,这种情况最适合epoll

内核事件表

epoll函数是Linux特有的I/O复用函数,它在实现和使用上与selectpoll函数有很大差异:

  • epoll函数使用一组函数来完成任务,而非单个函数;
  • epoll函数把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像selectpoll函数那样每次调用都要重复传入文件描述符集或事件集;
  • epoll需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表,这个文件描述符使用epoll_create函数来创建(创建一颗监听红黑树):
1
2
#include <sys/epoll.h>
int epoll_create(int size);

该函数返回的文件描述符将用作其他所有epoll系统调用的第1个参数,以指定要访问的内核事件表,失败返回-1。

底层是红黑树,我们传入的size是我们自己预计epoll要监听多少文件描述符,供内核参考,如果以后超过了这个值,内核会帮我们动态扩容。

image-20241213082846695

操作内核事件表 epoll_ctl

1
2
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数:
  • fd:要操作的文件描述符。待监听的fd
  • op:指定操作类型。

操作类型有如下三种:

EPOLL_CTL_ADD:往事件表中注册fd上的事件。添加fd到监听红黑树

EPOLL_CTL_MOD:修改fd上的注册事件。修改fd在监听红黑树上的监听事件 原来是读可以改为写,写也可以改成读之类的

EPOLL_CTL_DEL:删除fd上注册事件。将一个fd从红黑树上摘下

  • event:指定事件。本质是struct epoll_event 结构体的地址

    events:EPOLLIN/EPOLLOUT/EPOLLERR 读/写/异常

    data:是一个联合体

1
2
3
4
struct epoll_event{
__uin32_t events; // 表示epoll事件
epoll_data_t data; // 用户数据
};

events 成员描述事件类型,epoll函数支持的事件类型和poll函数基本相同,表示epoll事件类型的宏是在poll对应的宏前加上E,如epoll的数据可读事件是EPOLLIN,但epoll有两个额外的事件类型EPOLLETEPOLLONESHOTepoll_data_t定义如下:

1
2
3
4
5
6
typedef union epoll_data{
void *ptr;
int fd;//对应监听事件的fd 和第二个参数是对应的
uint32_t u32;
uint64_t u64;
} epoll_data_t;

epoll_data_t是一个联合体,其4个成员中使用最多的是fd成员,它指定事件所从属的文件描述符。prt成员是指向用户定义数据的指针,但由于epoll_data_t是一个联合体,我们不能同时使用其ptr成员和fd成员。

返回值:

epoll_ctl函数成功时返回0,失败时返回-1并设置errno

epoll_wait 函数

epoll系列系统调用的主要接口是epoll_wait,它在一段超时时间内等待一组文件描述符上的事件:

1
2
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
参数:
  • timeout:设置epoll_wait函数的超时时间。

    image-20241213084258685

  • maxevents:指定最多监听多少个事件,必须大于 0。就是数组元素的总个数

    比如这个例子中,就是传入1024

    1
    struct epoll_event events[1024]
  • eventsepoll_wait函数如果检测到事件,就将所有就绪的事件从内核事件表(由epfd参数指定)中复制到它的第二个参数events指向的数组中,这个数组只用于输出epoll_wait函数检测到的就绪事件,而不像selectpoll函数的数组参数那样既用于传入用户注册的事件,又用于输出内核检测到的就绪事件,这样就极大地提高了应用进程索引就绪文件描述符的效率。

    一个典型的传出参数,传出的就是满足对应事件的文件描述符集合,是一个数组,存储的就是这些文件描述符,遍历的时候直接遍历这个数组就行。

例如,我们要索引 poll 返回的就绪文件描述符:

1
2
3
4
5
6
7
8
9
int ret = poll(fds, MAX_EVENT_NUMBER, -1);

// 遍历所有已注册文件描述符并找到其中的就绪者(当然可用ret来稍做优化)
for (int i = 0; i < MAX_EVENT_NUMBER; ++i) {
if ( fds[i].revents & POLLIN ) { // 判断第i个文件描述符是否就绪
int sockfd = fds[i].fd;
// 处理socket
}
}

而索引 epoll 返回的就绪文件描述符:

1
2
3
4
5
6
int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
// 仅遍历就绪的ret个文件描述符
for (int i = 0; i < ret; ++i) {
int sockfd = events[i].data.fd;
// socketfd肯定就绪,直接处理
}
返回值:

成功时返回就绪的文件描述符个数(监听到的满足条件的文件数量,可以用作循环上限),失败时返回-1并设置errno

返回0的话说明没有满足条件的文件数量。

LT 模式和 ET 模式

10-ET和LT模_哔哩哔哩_bilibili

image-20241213092050204

ET模式客户端必须得有下一次动作才能把缓冲区剩余未读进去的 给读进去

image-20241213092107620

epoll对文件描述符的操作有两种模式:LT(Level Trigger,电平触发)模式和ET(Edge Trigger,边沿触发)模式。

LT模式是默认的工作模式,这种模式下epoll相当于一个效率较高的poll。当往epoll内核事件表中注册一个文件描述符上的EPOLLET事件时,epoll将以ET模式来操作该文件描述符。ET模式是epoll的高效工作模式。

对于采用LT工作模式的文件描述符,当epoll_wait函数检测到其上有事件发生并将此事件通知应用进程后,应用进程可以不立即处理该事件,这样,当应用进程下次调用epoll_wait时,epoll_wait函数还会再次向应用进程通告此事件,直到该事件被处理。

而对于采用ET工作模式的文件描述符,当epoll_wait函数检测到其上有事件发生并将此事件通知应用进程后,应用进程应立即处理该事件,因为后续的epoll_wait调用将不再向应用进程通知这一事件。可见ET模式降低了同一个epoll事件被重复触发的次数,因此效率比LT模式高。以下代码体现了LT和ET在工作方式上的差异(这里只给出了主要的代码段,想了解完整代码的请参阅 《Linux 高性能服务器编程》第九章 P154-157):

LT 模式的工作流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void lt(epoll_event *events, int number, int epollfd, int listenfd) {
char buf[BUFFER_SIZE];
for (int i = 0; i < number; ++i) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
addfd(epollfd, connfd, false);
} else if (events[i].events & EPOLLIN) {
/* 只要socket读缓存中还有未读出的数据,这段代码就被触发 */
printf("event trigger once\n");
memset(buf, '\0', BUFFER_SIZE);
int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
if (ret <= 0) {
close(sockfd);
continue;
}
printf("get %d bytes of content: %s\n", ret, buf);
} else {
printf("something else happened\n");
}
}
}

ET 模式工作流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void et(epoll_event *events, int number, int epollfd, int listenfd) {
char buf[BUFFER_SIZE];
for (int i = 0; i < number; ++i) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
addfd(epollfd, connfd, true);
} else if (events[i].events & EPOLLIN) {
// 这段代码不会被重复触发,所以需要循环读取数据
printf("event trigger once\n");
while (1) {
memset(buf, '\0', BUFFER_SIZE);
int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
if (ret < 0) {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
printf("read later\n");
break;
}
close(sockfd);
break;
} else if (ret == 0) {
close(sockfd);
} else {
printf("get %d bytes of content: %s\n", ret, buf);
}
}
} else {
printf("something else happened\n");
}
}
}

ET模式下事件被触发的次数比LT模式下少很多。

使用ET模式的文件描述符应该是非阻塞的。如果文件描述符是阻塞的,那么读或写操作将会因为没有后续事件而一直处于阻塞状态。

重点:非阻塞socket+ET比LT要高效

epoll的et模式下阻塞socket不能正常工作吗?

  1. epollET模式与阻塞套接字结合的问题
    • epollET模式下,阻塞套接字可以工作,但会出现一些问题,导致不能很好地发挥ET模式的优势,甚至可能导致数据丢失或程序逻辑异常。
    • 事件触发机制的冲突:
      • ET模式是边缘触发模式,epoll只会在套接字状态发生变化的边缘(如从不可读到可读,从不可写到可写)触发事件通知。当epoll通知套接字可读后,如果是阻塞套接字,在read操作时可能会因为数据量较大等原因一直阻塞,等待读取所有数据。
      • 例如,假设客户端发送了一个较大的数据包,epoll检测到套接字可读并通知应用程序。应用程序使用阻塞套接字的read函数读取数据,由于数据量超过了一次读取的缓冲区大小,read函数会一直阻塞,等待读取剩余的数据。
    • 数据丢失风险:
      • 在上述阻塞读取的过程中,如果客户端又发送了新的数据,epoll不会再次通知套接字可读,因为在ET模式下,只有状态再次发生变化边缘时才会触发通知。这样,新到达的数据就可能因为程序阻塞在之前的read操作上而无法被及时处理,导致数据丢失。
  2. 对比非阻塞套接字的优势
    • 非阻塞套接字的高效处理:
      • 非阻塞套接字在epollET模式下可以更好地工作。当epoll通知套接字可读后,应用程序使用非阻塞套接字的read函数进行读取。如果数据没有读完,read函数会返回EAGAINEWOULDBLOCK,表示当前没有数据可读,但这不是错误情况。
      • 程序可以记录读取的进度,然后继续处理其他任务(如检查其他套接字的事件)。下一次epoll循环或者在合适的时机,程序可以再次对这个套接字进行read操作,继续读取剩余的数据,从而避免了数据丢失,并且可以高效地利用系统资源,实现对多个套接字的并发处理。

重点 :数据丢失的例子:

客户端一次输入10个字节

依次输入5个a 5个b (1)

下一次输入 5个c 5个d (2)

服务器端一次读5个

ET模式下+阻塞socket:

第一次读5个a,第二次读5个b,如果后续没有读事件,那么5c和5d就丢失了

LT模式下:

第一次就能读出5a5b,第二次就能读出5c5d

ET模式下+非阻塞socket:

第一次读出5a后,程序会知道可能有数据没读完,继续尝试读取数据,所以5b会被读出

第二次重复一样的过程,所以不会出现数据丢失的情况

LT模式:

  • LT模式下,只要缓冲区中有数据,epoll就会一直触发可读事件。所以如你所说,在这种模式下,第一次能读出 “5a5b”,第二次能读出 “5c5d”。因为当第一次读取部分数据后,由于缓冲区中还有剩余数据,epoll会持续通知套接字可读,直到所有数据被读完。

ET模式 + 阻塞套接字:

  • 你的分析正确。当服务器端使用阻塞套接字时,在ET模式下,第一次读取出 “5a” 后,由于是阻塞read,会继续读取直到把 “5b” 也读完。如果后续没有新的读事件触发(因为ET模式只有状态变化边缘才触发,而此时套接字状态没有新的从不可读到可读的变化),那么当客户端发送 “5c5d” 时,服务器端无法知晓,数据就会丢失。

ET模式 + 非阻塞套接字

  • 当服务器端使用ET模式和非阻塞套接字时,第一次epoll触发可读事件后,read操作会读取 “5a”,由于是非阻塞套接字,read不会阻塞等待更多数据。返回 “5a” 后,程序会知道可能还有数据没读完,会继续尝试读取,所以可以接着读取 “5b”。

  • 当客户端发送下一批 “5c5d” 时,因为ET模式下,套接字状态从之前读完数据后的不可读到又有新数据可读,会再次触发epoll的可读事件。然后服务器端又可以通过非阻塞read操作读取这批新数据,所以不会出现数据丢失的情况。

EPOLLONESHOT 事件

即使我们使用ET模式,一个socket上的某个事件还是可能被触发多次,这在并发程序中会引起问题,比如一个线程(或进程,下同)在读取完某个socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN再次被触发),此时另一个线程被唤醒来读取这些新数据,于是就出现了两个线程同时操作一个socket的局面,这不是我们所期望的,我们期望的是一个socket连接在任一时刻都只被一个线程处理,这可用EPOLLONESHOT事件实现。

对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写、异常事件,且只触发一次,除非我们使用epoll_ctl函数重置该文件描述符上注册的EPOLLONESHOT事件,这样,当一个线程在处理某个socket时,其他线程不可能有机会操作socket。注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下次可读时,其EPOLLIN事件能触发,从而让其他线程有机会处理这个socket。

以下代码展示了EPOLLONESHOT事件的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024

struct fds {
int epollfd;
int sockfd;
};

int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}

// 将fd参数上的EPOLLIN和EPOLLET事件注册到epollfd参数指示的内核事件表中
// 参数oneshot指定是否注册fd参数上的EPOLLONESHOT事件
void addfd(int epollfd, int fd, bool oneshot) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
if (oneshot) {
event.events |= EPOLLONESHOT;
}
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}

// 重置fd参数上的事件,这样操作后,可以再次触发fd参数上的事件
void reset_oneshot(int epollfd, int fd) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}

// 工作线程
void *worker(void *arg) {
int sockfd = ((fds *)arg)->sockfd;
int epollfd = ((fds *)arg)->epollfd;
printf("start new thread to receive data on fd: %d\n", sockfd);
char buf[BUFFER_SIZE];
memset(buf, '\0', BUFFER_SIZE);
// 循环读取sockfd上的数据,直到遇到EAGAIN错误
while (1) {
int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
if (ret == 0) {
close(sockfd);
printf("foreiner closed the connection\n");
break;
} else if (ret < 0) {
if (errno == EAGAIN) {
reset_oneshot(epollfd, sockfd);
printf("read later\n");
break;
}
} else {
printf("get content: %s\n", buf);
// 休眠5s,模拟数据处理过程
sleep(5);
}
}
printf("end thread receiving data on fd: %d\n", sockfd);
}

int main(int argc, char *argv[]) {
if (argc != 3) {
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);

int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);

int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);

ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
assert(ret != -1);

ret = listen(listenfd, 5);
assert(ret != -1);

epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);
assert(epollfd != -1);
// 监听socket上不能注册EPOLLONESHOT事件,否则只能处理一个客户连接
// 后续的连接请求将不再触发listenfd上的EPOLLIN事件
addfd(epollfd, listenfd, false);

while (1) {
int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if (ret < 0) {
printf("epoll failure\n");
break;
}

for (int i = 0; i < ret; ++i) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
// 对每个非监听文件描述符都注册EPOLLONESHOT事件
addfd(epollfd, connfd, true);
} else if (events[i].events & EPOLLIN) {
pthread_t thread;
fds fds_for_new_worker;
fds_for_new_worker.epollfd = epollfd;
fds_for_new_worker.sockfd = sockfd;
// 对每个客户请求都启动一个工作线程为其服务
pthread_create(&thread, NULL, worker, (void *)&fds_for_new_worker);
} else {
printf("something else happened\n");
}
}
}

close(listenfd);
return 0;
}

从工作线程函数worker来看,如果一个工作线程处理完某个socket上的一次请求(我们用休眠5秒来模拟此过程)之后,又接收到该socket上新的客户请求,则该线程将继续为这个socket服务,并且由于该socket上注册了EPOLLONESHOT事件,主线程中epoll_wait函数不会返回该描述符的可读事件,从而不会有其他线程读这个socket,如果工作线程等待5秒后仍没收到该socket上的下一批客户数据,则它将放弃为该socket服务,同时调用reset_oneshot来重置该socket上的注册事件,这将使epoll有机会再次检测到该socket上的EPOLLIN事件,进而使得其他线程有机会为该socket服务。

有了EPOLLONESHOT,尽管一个socket在不同时间可能被不同的线程处理,但同一时刻肯定只有一个线程在为它服务,这就保证了连接的完整性,从而避免了很多可能的竞态条件。

EPOLLONSHOT和锁的比较

  1. EPOLLONESHOT事件的作用原理
    • EPOLLONESHOTepoll事件机制中的一个标志位。当一个文件描述符(fd)被注册了EPOLLONESHOT事件后,在该文件描述符对应的事件触发并被处理一次后,内核会自动将该文件描述符从epoll的就绪事件列表中移除,并且之后不会再对这个文件描述符的相同事件进行通知,直到通过epoll_ctl函数重新设置这个文件描述符的事件。
  2. 与锁的相似之处
    • 独占性和单次操作保障:
      • 就像锁可以保证在某一时刻只有一个线程(或进程)能够访问被锁定的资源一样,EPOLLONESHOT在一定程度上保证了对于一个文件描述符的事件处理是独占性的单次操作。当一个线程(或进程)在处理该文件描述符的事件时,其他线程(或进程)不会收到相同事件的通知,避免了多个线程(或进程)同时处理同一个文件描述符事件可能导致的并发问题。
    • 资源访问控制方面:
      • 锁用于控制对共享资源的访问顺序和并发访问数量,EPOLLONESHOT可以控制对文件描述符事件的处理顺序和次数。例如,在多线程服务器环境下,如果多个线程共享一个epoll实例来处理客户端连接的事件,使用EPOLLONESHOT可以防止多个线程同时处理同一个客户端连接的可读事件,从而避免数据混乱。
  3. 与锁的不同之处
    • 作用范围和对象不同:
      • 锁主要用于保护共享资源(如共享变量、共享数据结构等),它是在内存资源访问层面起作用。而EPOLLONESHOT主要是针对epoll事件通知机制,用于控制文件描述符事件的处理流程,作用对象是文件描述符及其相关的事件。
    • 实现机制不同:
      • 锁的实现通常基于操作系统提供的原子操作、互斥量等机制,通过阻塞或等待来控制资源访问。而EPOLLONESHOTepoll内部的一种事件控制策略,通过内核自动调整文件描述符在epoll就绪事件列表中的状态来实现其功能,它不涉及像锁那样的阻塞等待其他线程释放资源的过程。
    • 应用场景侧重点不同:
      • 锁更多地用于解决多线程(或多进程)环境下的数据一致性和并发访问冲突问题,例如在多个线程同时读写共享数据时保证数据的正确性。EPOLLONESHOT主要用于优化epoll事件驱动的程序设计,确保每个文件描述符的事件在合适的时机被正确处理一次,更侧重于事件处理的顺序和完整性。

epoll优缺点

优点:高效

缺点:不能跨平台

epoll实现多路IO转接服务器

linux网络编程 | c | epoll实现IO多路转接服务器-CSDN博客

4.三组 I/O 复用函数的比较

事件集

1
int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);

select函数的参数类型fd_set没有将文件描述符和事件绑定,它仅仅是一个文件描述符集合,因此select函数需要3个fd_set类型参数来分别传入和输出可读、可写、异常事件,这使得select函数不能处理更多类型的事件,另一方面,由于内核对fd_set集合的修改,应用进程下次调用select前不得不重置这3个fd_set集合。

1
2
3
4
5
6
7
int poll(struct pollfd* fds, nfds_t nfds, int timeout);

struct pollfd{
int fd; // 文件描述符
short events; // events成员告诉poll函数监听fd成员上的哪些事件,它是一系列事件的按位或
short revents; // 由内核修改,以通知应用程序fd上实际发生了哪些事件。
};

poll函数的参数类型pollfd把文件描述符和事件都定义其中,任何事件都被统一处理,从而使得编程接口简洁地多,且内核每次修改的是pollfd结构体revents成员,而events成员保持不变,因此下次调用poll时应用进程无需重置pollfd类型的事件集参数。

1
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

由于每次selectpoll函数都返回整个用户注册的事件集合(包括就绪和未就绪的),所以应用索引就绪文件描述符的时间复杂度为O(n)epoll则采用与selectpoll函数不同的方式来管理用户注册的事件,它在内核中维护一个事件表,并提供一个独立的系统调用epoll_ctl来往内核事件表中添加、删除、修改事件,这样,每次epoll_wait调用都直接从内核表中取得用户注册的事件,而无须反复从用户空间读入这些事件,epoll_wait函数的events参数仅用来返回就绪的事件,这使得应用进程索引就绪文件描述符的时间复杂度达到O(1)

最大支持的文件描述符数

pollepoll_wait函数分别用nfdsmaxevents参数指定最多监听多少文件描述符和事件,这两个数值都能达到系统允许打开的最大文件描述符数目,即65535(cat /proc/sys/fd/file-max)。而select函数允许监听的最大文件描述符数量通常有限制,虽然用户可以修改这个限制,但这可能导致不可预期的后果。

  1. epoll内部机制并非简单的数组结构
    • epoll不受 1024 个文件描述符限制不是因为使用可以随意扩大的数组。实际上,epoll内部主要使用了红黑树和链表两种数据结构来管理文件描述符和事件。
    • 红黑树用于高效管理文件描述符:
      • epoll使用红黑树来存储要监视的文件描述符及其相关的事件信息。红黑树是一种自平衡二叉搜索树,它的插入、删除和查找操作的时间复杂度都是 。这使得epoll在添加、删除和查找文件描述符时能够保持高效,无论文件描述符的数量有多少。
      • 例如,当通过epoll_ctl函数添加一个文件描述符到epoll实例中时,epoll会将这个文件描述符及其事件信息插入到红黑树中。红黑树会自动根据文件描述符的值进行排序和平衡,方便后续的查找和操作。这种结构不会像数组那样受到固定大小的限制,并且能够高效地处理大量的文件描述符。
    • 链表用于事件通知:
      • 当文件描述符对应的事件发生时,epoll会将这个文件描述符添加到一个就绪事件链表中。epoll_wait函数返回时,就是从这个就绪事件链表中获取已经发生事件的文件描述符列表。这样的设计使得epoll可以高效地通知应用程序哪些文件描述符发生了事件,而不需要像select那样遍历所有可能的文件描述符。
  2. 与传统select基于数组的机制对比
    • select的限制:
      • select模型中,使用固定大小的数组(以位图形式)来表示文件描述符集合。这个数组的大小通常是有限的,并且在许多系统中被限制为 1024 个文件描述符。这是因为select需要遍历这个数组来检查每个文件描述符是否有事件发生,当文件描述符数量过多时,遍历的效率会变得很低,并且固定大小的数组无法轻易扩展。
    • epoll的优势:
      • epoll通过红黑树和链表的组合,避免了select的这种限制。它能够动态地管理大量的文件描述符,并且在事件通知方面更加高效,不受固定大小的文件描述符集合的限制。无论是添加新的文件描述符、删除已有的文件描述符,还是等待和处理事件,epoll的机制都能够很好地适应不同规模的文件描述符数量。

工作模式

selectpoll函数只能工作在相对低效的LT模式,而epoll函数能工作在高效的ET模式,且epoll函数还支持EPOLLONESHOT事件,该事件能进一步减少可读、可写、异常事件触发的次数。

具体实现

从实现原理上说,selectpoll函数采用的都是轮询方式,即每次调用都要扫描整个注册文件描述符集合,因此它们检测就绪事件的算法时间复杂度是O(n)

epoll_wait函数采用回调的方式,内核检测到就绪的文件描述符时,将触发回调函数,回调函数将该文件描述符上对应的事件插入内核就绪事件队列,然后内核在适当的时机将该就绪事件队列中的内容拷贝到用户空间,因此epoll_wait函数无须轮询整个文件描述符集合来检测哪些事件已经就绪,其算法时间复杂度为O(1)

当活动连接比较多时,epoll_wait 的回调函数被触发的过于频繁,会导致 epoll_wait 的效率反而降低,因此 epoll_wait 适用于连接数量多,但是活动连接较少的情况。

img

5.I/O 复用的高级应用一:非阻塞 connect

connect系统调用的man手册中有如下一段内容:

img

这段话描述了connect函数出错时的一种errno值(EINPROGRESS),这种错误发生在对非阻塞的socket调用connect,而连接又没有立即建立时,此时,我们可以调用selectpoll等函数来监听这个连接失败的socket上的可写事件,当selectpoll等函数返回后,再利用getsockopt函数来读取错误码并清除该socket上的错误,如果错误码是0,表示连接成功建立,否则连接建立失败。

通过非阻塞connect,我们就能同时发起多个连接并一起等待。但是这种方法存在几处移植性问题。

6.I/O 复用的高级应用二:聊天室程序

像ssh这样的登录服务通常需要同时处理套接字描述符和用户输入输出描述符,这可用I/O复用来实现,下面用poll函数为例实现一个简单的聊天室程序,该聊天室程序能让所有用户同时在线群聊,它分为客户端和服务器两部分。

客户端有两个功能

  • 一是从标准输入终端读入用户数据,并将用户数据发送至服务器;
  • 二是往标准输出终端打印服务器发来的数据。

服务器的功能是接收客户数据,并把客户数据发送给每个登录到该服务器上的客户端(数据发送者除外)。

客户端程序使用poll函数同时监听用户输入和网络连接,并利用splice函数将用户输入内容直接定向到网络连接上发送,从而实现数据零拷贝,提高了程序执行效率。splice函数

服务器使用poll函数同时管理监听socket和连接socket,且使用牺牲空间换取事件的策略来提高服务器性能。

7.I/O 复用的高级应用三:同时处理 TCP 和 UDP 服务

以上讨论的服务器程序只监听一个端口,实际应用中,有些服务器程序能同时监听多个端口,如超级服务器inetd和android的调试服务adbd。

bind系统调用的参数来看,一个socket只能与一个socket地址绑定,即一个socket只能用来监听一个端口,因此,如果服务器要同时监听多个端口,就必须创建多个socket,并将它们分别绑定到各个端口上,这样,服务器就需要同时管理多个监听socket,这可使用I/O复用技术实现。另外,即使是同一个端口,如果服务器要同时处理该端口上的TCP和UDP请求,也需要创建两个不同的socket,一个是流socket,另一个是数据报socket,并将它们都绑定到该端口上。

8.超级服务 xinetd

Linux因特网服务inetd是超级服务,它同时管理着多个子服务,即监听多个端口,现在Linux上使用的inetd服务程序通常是其升级版本xinetd,xinetd程序的原理与inetd的相同,但增加了一些控制选项,并提高了安全性。