本网站相关文章:

阻塞 IO

说明

首先, 默认情况下, 大部分的系统调用都是阻塞 IO. 当发生 IO 中断的时候, 整个进程会被阻塞, 直到想要的数据获取到之后, 进程才会接触阻塞, 重新开始运行, 其流程如下:

阻塞IO

这里recvfrom函数一旦调用就会阻塞等待客户端传递数据, 如果一直没有数据就会一直等待(理论上). 当然, 此类方法极大的降低了程序的执行效率, 在此模式下, 服务器只能通过增加进程或线程来提高对外服务的并发数量, 这会产生同步编程到异步编程技术演进中提及的 C10K 问题.

实例

常见的阻塞 socket 服务器代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def main():
""" 监听指定的端口, 启动一个简单的socket监听服务器 """
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('127.0.0.1', 8000))
serversocket.listen(5)

print('Start server: http://127.0.0.1:8000')
try:
while True:
conn, address = serversocket.accept()
handle_context(conn, address)
except BaseException as e:
print('Occur a exception: {}'.format(e))
finally:
serversocket.close()

非阻塞 IO

说明

非阻塞 IO不同于阻塞 IO, 其调用会立刻返回, 将整个主动权交由开发者, 一旦 IO 数据未准备完成, 立刻返回error, 后续由开发者自己决定是否继续轮询等待数据完成.

非阻塞 IO

实例

我们可以调整第一章的阻塞 IO 例子为非阻塞逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def main():
""" 监听指定的端口, 启动一个简单的socket监听服务器 """
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 开启非阻塞
serversocker.setblocking(False)
serversocket.bind(('127.0.0.1', 8000))
serversocket.listen(5)

print('Start server: http://127.0.0.1:8000')
try:
while True:
try:
conn, address = serversocket.accept()
except Exception as msg:
# 非阻塞, 没有连接的时候会抛出异常
time.sleep(1)
else:
handle_context(conn, address)
except BaseException as e:
print('Occur a exception: {}'.format(e))
finally:
serversocket.close()

最后, 注意非阻塞 IO 和异步 IO 的区别:

  • 前者虽然是非阻塞的, 但是仍然需要调用者不断的 roll polling 以获取任务当前的状态或者接口.
  • 后者在任务处理完成之后会通过指定的系统调用触发处理完成信号, 有点类似信号驱动 IO 逻辑, 但是一部 IO 会自动完成数据从内核到用户台的拷贝工作

IO 多路复用

说明

IO 多路复用就是目前最为普遍 IO 调用, select, poll, epoll都是归属于 IO 多路复用, 其又可以称为event driven IO. IO 多路复用成功的解决了大量并发连接发生时, 使用多线程/多进程进行阻塞调用时导致的服务器资源不足问题, 确保服务器的并发/并行效率大大提高.

多路复用 IO

select, poll, epoll他们的实现机制是不一样的,目前 epoll/kqueue已经替代前面两者成为目前大部分事件驱动 IO的底层实现机制. 注意, 在连接数不是非常高的前提下, IO 多路复用服务器性能并不一定比多进程+阻塞IO的服务器性能好, 此时因为 IO 多路复用的内在实现逻辑可能造成延时更大的情况. IO 多路复用的优势在于大量连接存在时能够平稳的对外提供相同的服务. 另外, 注意, IO 多路复用仍然属于阻塞IO范畴, 在 RW 事件就绪之后需要自己负责读写操作, 其相比多线程和多进程的最大优势就是:

  • 系统开销少, 不需要额外创建新线程和新进程
  • 代码维护性强, 不需要维护自己创建的线程和进程

IO 多路复用的使用场景:

  • 服务器同时处理 TCP 和 UDP
  • 服务器同时处理多个服务
  • TCP 服务器同时处理监控套接字和已连接套接字

在介绍这三种多路复用系统调用之前, 我们先简单的了解下如下知识点: 在某一个文件描述符已经准备就绪时, 系统内核如何通知等待进程? 其通知方式有如下两种:

  • 水平触发通知: 只要 FD 可以非阻塞的执行 IO 系统调用(仍然有数据可以读取), 则内核即认为该 FD 已就绪, 所以等待进程对于该 FD 内容可以多次读取, 每次 select 或 poll 调用一次读取一点, 当然这样性能可能比较大
  • 边缘触发通知: 只有检测到 FD 从上次开始到现在有新的 IO 获取(输入)时才会触发通知, 即使 FD 中还存在上次未读取的数据, 此时内核也认定为该 FD 未准备就绪.

好了, 下面我们用 C 语言简单演示下这三者的异同和实现方式.

select

select 函数以及各个参数说明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
nfds: 待监听的最大fd值 + 1
readfds: 待监听的可读文件fd集合
writefds: 待监听的可写文件fd集合
exceptfds: 待监听的异常文件fd集合
timeout: 指明愿意等待的时间
return返回: 返回满足条件的fd数量和,如果出错返回-1,如果是超时返回0

其中readfds, writefds, eceptfds可以为NULL, 表示不监控该类型描述符
*/
int select(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
// 其他配套函数
void FD_ZERO(fd_set *fdset); // 清空
void FD_SET(int fd, fd_set *set); // 将fd添加到set集合中
void FD_CLR(int fd, fd_set *set); // 将fd从set集合删除
int FD_ISSET(int fd, fd_set *set); // 检查set中的fd是否已经准备好了

其基本的实现逻辑:

  1. 初始化 fdset
  2. 分配 fd 给对应的 fdset
  3. 调用 select 函数等待监控到的 RW 事件
  4. 对 fd 进行检查判断是否已经准备好了FD_ISSET

其简单代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
int main()
{
fd_set original_socket, original_stdin;
fd_set readfds, writefds;
struct timeval tv;
int numfd, receive;
int sockfd = 0;
unsigned int addrLen;
char receive_data[MAX_LENGTH];
struct sockaddr_in caddr, saddr;
bool rst;

// clear the set ahead of time
// 清空
FD_ZERO(&original_socket);
FD_ZERO(&original_stdin);
FD_ZERO(&readfds);
FD_ZERO(&writefds);

// 服务器地址初始化: IP地址族, bind
rst = socket_init(&sockfd, &saddr);
if (rst != true) {
return -1;
}

// add our descriptors to the set (0 - stands for STDIN)
// 将sockfd存入到备份描述符集合/读描述符集合中
FD_SET(sockfd, &original_socket);//instead of 0 put sockfd
FD_SET(sockfd, &readfds);
FD_SET(0, &original_stdin);
FD_SET(0, &writefds);

// 设置描述符最大值
numfd = sockfd + 1;
addrLen = sizeof(struct sockaddr);
printf("\nUDP_Server Waiting for client to respond...\n");
printf("Type (q or Q) at anytime to quit\n");
fflush(stdout);

while (1) {
tv.tv_sec = 1;
tv.tv_usec = 500000;
// 从备份中读取描述符信息
readfds = original_socket;
writefds = original_stdin;

receive = select(numfd, &readfds, &writefds,/*NULL,*/ NULL, &tv);
if (receive == -1) {
_error("select"); // error occurred in select()
} else if (receive == 0) {
printf("Timeout occurred! No data after 10.5 seconds.\n");
} else {
// one or both of the descriptors have data
if (FD_ISSET(sockfd, &readfds)) {
FD_CLR(sockfd, &readfds);
// 使用recvfrom进行数据的接收
read_msg(sockfd, receive_data, &caddr, &addrLen);
} else {
printf("\nOOPS! What happened? SERVER");
}
} //end else
}//end while

close(sockfd);
return 0;
}

通过对比就可以上面的代码, 在多线程和多进程非阻塞 IO 架构中, 每次来个连接就需要单独启动一个线程或者进程去额外处理这些连接, 但是在 select 中可以一次性同时监听 1024 个连接并且不需要额外创建 1024 个线程或者进程, 这样使得C10K问题得以部分缓解. 当然, select 作为最早的 IO 多路复用技术, 其本身有一定的缺点:

  • 每次调用 select, 实际上都是将 fd 集合从用户态拷贝到内核态, 这个开销随着 fd 的增加而增加, 每次拷贝或其他操作都需要轮询传进来的 fd
  • 单个进程能够监视的文件描述符数量存在最大限制, 默认情况下 Linux 系统为 1024, 如果想要提升需要更改相关宏定义并重新编译内核.
  • 获取 FD 状态时都需要重新遍历最大 FD 个数, 降低查询效率

poll

select 和 poll 都是传统的 UNIX 多路复用技术, 他们的基本原理和性能没有太大的区别, 主要是各自的实现方式或者调用方式不同, poll 与 select 不同, 其将某一个 FD 感兴趣的事件(R, W)放到一个结构体中, 传入的结构体数组中可以有两个相同的 FD, 最终向内核传递一个 FD 结构体列表而非 select 的三个 FD 列表, 其基本数据结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct pollfd {
int fd; // 监控的文件描述符
short events; // 在该文件描述符上"感兴趣"的事件
short revents; // 在该文件描述符上"实际发生"的事件
};

/*
fds: 文件描述符集合
nfds: 数组fds元素个数, 注意相比select不是FD最大值而是当前需要监控的fd列表长度
timeout: 阻塞行为
-1: 一直阻塞直到 fds 数组中列出的文件描述符有一个达到就绪态
0: 不会阻塞,只是执行一次检查看看哪个文件描述符处于就绪态
>0: 至多阻塞 timeout 毫秒,直到 fds 列出的文件描述符中有一个达到就绪态,或者直到捕获到一个信号为止
*/
int poll (struct pollfd *fds, unsigned int nfds, int timeout);

注意, 通过对比 select 和 poll 可以发现 poll 有两个相比 select 更好的有点:

  • fds 是一个动态数组, 这个相比 select 的fd_set(固定数量数组, 1024)能够容纳无限的 FD 数量
  • select 每次调用都必须遍历一遍 nfds(1024 最大文件描述符数量)以便查明到底需要检查哪个文件描述符, 但是 poll 调用时内核仅仅需要检查我们指定的文件描述符链表即可.

epoll

在讲解 epoll 之前我们先简单的介绍下 select 和 poll 系统调用在大量文件描述符存在时的性能问题, 其中 select 的缺点在上面两个章节已经有过描述.

  • 时间复杂度较高: 每次调用 select 和 poll, 内核都必须轮询传递过来的 FD 文件描述符以判断是否就绪, 其中 select 更加离谱
  • 内核空间和内核空间文件描述符的拷贝: 其中 poll 的 FD 结构体比较大, 最主要的原因是每次调用都需要传递新的 FD 列表, 内核空间根本不记录这些 FD 信息, 从而造成极大的性能损耗

那么 epoll 是怎么解决上面的问题的呢? 一个 epoll 的基本流程如下:

  1. 通过 epoll_create 系统调用创建 epoll 实例, epoll 实例本身也是个文件, 所以会返回一个代表 epoll 实例的文件描述符 epfd
  2. 通过 epoll_ctl 系统调用修改 FD 兴趣列表并同时记录到内核空间中(这个很重要), 即传入内核空间的数据不会有重复, 做增删改即可.
  3. 通过 epoll_wait 系统调用阻塞等待文件描述符就绪事件

通过这个流程, epoll_ctl指定了需要监视的文件描述符 FD 时, 内核空间中就会相应的增加该 FD, 而当 IO 就绪时内核就会在 epoll 就绪列表中增加生效的 FD 以便等待 epoll_wait 调用获取就绪的 FD 列表, 在整个流程中有两个关键点:

  • 内核空间本身持有一份感兴趣的 FD 数据结构, 每次epoll_ctl仅仅传递某一个感兴趣文件描述符, 不需要像 select 或者 poll, 每次调用都从用户空间往内核空间拷贝所有 FD, 因为调用实在太频繁了, 过多重复数据影响性能
  • 内核空间的特殊数据结构极大的加快了 FD 轮询速度

下面是 epoll 三个基本函数的简单说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 1. 创建epfd
int epoll_create(int size);

# 2. 添加fd
typedef union epoll_data {
void *ptr; // 用户自定义数据
int fd; // 待监控的文件描述符
uint32_t u32; // 一个 32 位整数
uint64_t u64; // 一个 64 位整数
} epoll_data_t;

struct epoll_event {
uint32_t events; // 在该文件描述符上感兴趣的事件集合,用掩码表示
epoll_data_t data; // 用户数据,当描述符 fd 稍后成为就绪态时,可用来指定传回给调用进程的信息
};

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);

# 3. 阻塞等待
int epoll_wait(int epfd, struct epoll_event *evlist, int maxevents, int timeout);

异步 IO

说明

异步 IO 有点类似 IO 多路复用, 但是其仅仅针对某一个 fd, 在函数调用之后(类似注册)立刻返回, 后续内核会监听到数据准备完成之后, 给进程发送一个signal通知以便进行数据读写. 异步 IO 实际上分为信号驱动 IO(Signal Driven IO), 异步 IO, 这两者差别主要在实现方式上.

另外, 异步 IO 和信号驱动 IO 虽然也是非阻塞 IO, 但是后两者在 FD 数据为空的时候进程不需要进行轮询浪费 CPU 资源, 不需要休眠阻塞, 可以并发做其他事情并等待内核主动通知回调函数.

异步 IO

那么同步 IO(synchronous IO) 和 异步 IO(asynchronous)的区别是什么呢?

  • 同步 IO: 进行真实 IO 操作时, 进程会直接阻塞以便进行数据的传输, 即使是 IO 多路复用也是会轮询
  • 异步 IO: 进程发起 IO 操作之后, 直接返回, 之后一旦受到数据内核会自动开始传输, 完成之后通知进程

从上面可以了解异步 IO在真正发生 IO 以进行数据传输的那一刻实际上没有阻塞进程进行, 而同步 IO 不管是否阻塞, 在真正发生 IO 的那一刻都会立刻阻塞进程已进行数据的传输.上面的几个 IO 模型中, 属于同步 IO的有:

  • 阻塞 IO
  • 非阻塞 IO(虽然立刻返回, 但是在轮询并存在 IO 数据的时候, 立刻阻塞等待数据返回)
  • IO 多路复用

python 环境

一个异步 IO 模型一般都需要消息循环机制, 在消息循环主线程中不断的进行读取msg-处理msg的 action:

1
2
3
4
loop = get_event_loop()
while True:
event = loop.get_event()
process_event(event)

每次收到 FD 就绪时阻塞 IO 需要同步等待 IO 完成才可以继续进行其他逻辑(见 IO 多路复用的图示), 而异步 IO 在IO就绪->IO完成过程中主线程仍然可以继续处理其他消息(并非真正的并行啊). python 中常见的异步 IO:

  • asyncio
  • async/await
  • aiohttp

C 环境

linux 2.6 版本之后在 C 语言环境可以使用 aio 达到异步 IO 的效果, 其提供了一些基本的 API:

1
2
3
4
5
6
7
8
9
10
int aio_read(struct aiocb *paiocb); // 异步读操作, -1: 失败, 0-成功
int aio_write(struct aiocb *paiocb); // 异步写操作
int aio_error(struct aiocb *paiocb); // 获取其参数指定的读写操作的状态
ssize_t aio_return(struct aiocb *paiocb); // 用来返回其参数指定I/O操作的返回值

// 挂起当前进程等待事件完成(阻塞等待)
int aio_suspend(const struct aiocb *const cblist[],int n,const struct timespec *timeout);
int aio_cancel(struct aiocb *paiocb);
// 发起多个或多种I/O请求, 一次调用完成大量IO
int lio_listio(int mode,struct aiocb *list[],int nent,struct sigevent *sig);

上面已经简单介绍过异步 IO 有两种方式: 信号 IO, 异步 IO. 类似的, 当一个 IO 事件完成之后, 有两种方式进行通知:

  • 信号通知事件完成
  • 回调函数通知事件完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
void aio_completion_handler(sigval_t sigval)
{
//用来获取读aiocb结构的指针
struct aiocb *prd;
int ret;

prd = (struct aiocb *)sigval.sival_ptr;
printf("hello\n");
//判断请求是否成功
if(aio_error(prd) == 0)
{
//获取返回值
ret = aio_return(prd);
printf("读返回值为:%d\n",ret);
}
}

int main(int argc,char **argv)
{
// init
//填充aiocb的基本内容
bzero(&rd,sizeof(rd));

rd.aio_fildes = fd;
rd.aio_buf = (char *)malloc(sizeof(BUFFER_SIZE + 1));
rd.aio_nbytes = BUFFER_SIZE;
rd.aio_offset = 0;

//填充aiocb中有关回调通知的结构体sigevent
rd.aio_sigevent.sigev_notify = SIGEV_THREAD;//使用线程回调通知
rd.aio_sigevent.sigev_notify_function = aio_completion_handler;//设置回调函数
rd.aio_sigevent.sigev_notify_attributes = NULL;//使用默认属性
rd.aio_sigevent.sigev_value.sival_ptr = &rd;//在aiocb控制块中加入自己的引用

//异步读取文件
ret = aio_read(&rd);
if(ret < 0)
{
perror("aio_read");
}
return 0;
}

最终让我们简单看下异步读和异步写的简单实例从而更好的了解 aio

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// -------------->Tip1: 读: 对一个FD进行异步读操作
int aio_read(struct aiocb *paiocb);

// 1. 对rd结构体进行初始化
//将rd结构体清空
bzero(&rd,sizeof(rd));
//为rd.aio_buf分配空间
rd.aio_buf = malloc(BUFFER_SIZE + 1);
//填充rd结构体
rd.aio_fildes = fd;
rd.aio_nbytes = BUFFER_SIZE;
rd.aio_offset = 0;

// 2. 部分实例
ret = aio_read(&rd);
if (ret < 0) {
exit(1);
}
// 循环等待
counter = 0;
while (aio_error(&rd) == EINPROGRESS) {
printf('次数:%d', counter++);
}
// 获取异步值
ret = aio_return(&rd)
printf("\n\n返回值为:%d",ret);

// -------------->Tip2: 写
wr.aio_buf = str;
//填充aiocb结构
wr.aio_fildes = fd;
wr.aio_nbytes = 1024;
//异步写操作
ret = aio_write(&wr);
if(ret < 0) {
perror("aio_write");
}

//等待异步写完成
while(aio_error(&wr) == EINPROGRESS) {
printf("hello,world\n");
}
//获得异步写的返回值
ret = aio_return(&wr);

参考