译|Python幕后(12):async-await 机制

2023-01-16 0 1,065

系列文章地址这是一篇译文,原文地址

在函数前标记 async 关键字,再以 await 关键字调用它,你的程序突然就变成异步的了——可以在等待一件事,比如 I/O 操作的同时做其它事情。

带 async/await 关键字的代码看上去和普通同步代码没什么区别,但运行方式完全不同。要理解其中逻辑,就必须了解很多重要概念,包括并发、并行、事件循环、I/O 多路复用、异步、协作多任务与协程等。而在 Python 中,你还必须了解生成器、基于生成器的协程、原生协程、yield 与 yield from 关键字等。由于其复杂性,很多使用 async/await 的程序员并不了解其原理。我认为,只要从基础概念着手, async/await 模式是可以用简单的语言解释清楚的,这就是我们今天要做的。

注意:文章中默认使用 CPython 3.9,随着 CPython 的演化,一些实现细节可能会有所调整,我会尽力留意一些重要变化并更新文章内容。

1. 并发

计算机是按顺序,一个指令接一个指令地执行程序的。但一个程序通常要运行多种任务,有时无法等待一个任务完成后再开始另一个,比如说,一个下棋应用必须在等待玩家操作的同时更新计时数字。这种同时执行多个任务的能力称为并发。并发并不是说这些任务必须在同一个物理时间执行,它们也可以交替运行:某个任务运行一段时间后挂起,让其它任务运行,之后再回来运行这个任务。基于这种机制,只有几个核心的设备可以运行成千上万个程序。在多核计算机或者计算集群中,如果多个任务于同一个物理时间运行,则称并行,是一种特殊的并发场景。

译|Python幕后(12):async-await 机制
concurrency

哪怕没有任何编程语言层面的支持,你也可以写出并发代码。比如要执行两个任务,分别封装在两个函数中:

def do_task1(): # … def do_task2(): # … def main(): do_task1() do_task2()

如果任务之间是独立的,则可以将其拆分成多个部分,交替调用:

def do_task1_part1(): # … def do_task1_part2(): # … def do_task2_part1(): # … def do_task2_part2(): # … def main(): do_task1_part1() do_task2_part1() do_task1_part2() do_task2_part2()

当然,上面这个例子有点过度简化了。我想说的是,编程语言本身并不会影响你写并发程序的能力,但可以让你更容易地写出并发程序。我们今天要讨论的 async/await 就是一个例子。

为理解从普通并发到 async/await 的演变过程,我们会写一个真实的并发程序——可以同时响应多个客户端请求的 TCP 服务器。一开始是最简单的,无并发的顺序处理版本,然后使用系统线程实现并发,再通过单线程 I/O 多路复用和事件循环实现一个版本,进而是基于生成器、协程实现的版本,最后引入 async/await。

2. 顺序处理服务器

只处理一个连接的 TCP 服务器实现起来比较直接。服务器在某个端口上监听请求,与某个客户端建立连接后与其通信,直到连接关闭,然后监听新的连接请求。可以用一个 socket 程序实现:

# echo_01_seq.py import socket def run_server(host=127.0.0.1, port=55555): sock = socket.socket() sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port)) sock.listen() while True: client_sock, addr = sock.accept() print(Connection from, addr) handle_client(client_sock) def handle_client(sock): while True: received_data = sock.recv(4096) if not received_data: break sock.sendall(received_data) print(Client disconnected:, sock.getpeername()) sock.close() if __name__ == __main__: run_server()

这段代码后面会一直用到,可以仔细看看。如果想温习一下 socket 相关知识,可以参考 Beej 网络编程指引socket 模块文档,上面代码的逻辑可以描述如下:

socket.socket() 创建一个新的 TCP/IP 套接字;sock.bind() 将套接字绑定到一个地址和端口上;sock.listen() 将套接字标记为监听状态;sock.accept() 建立新的连接;sock.recv() 从客户端接收数据,sock.sendall() 将数据发送回客户端;

这个版本的服务器并不支持并发,多个客户端同时连接时,其中一个连接成功并占用服务器,其它客户端必须等待该客户端断开连接后才能连接。我写了一个模拟程序来演示其运行结果:

$ python clients.py [00.097034] Client 0 tries to connect. [00.097670] Client 1 tries to connect. [00.098334] Client 2 tries to connect. [00.099675] Client 0 connects. [00.600378] Client 0 sends “Hello”. [00.601602] Client 0 receives “Hello”. [01.104952] Client 0 sends “world!”. [01.105166] Client 0 receives “world!”. [01.105276] Client 0 disconnects. [01.106323] Client 1 connects. [01.611248] Client 1 sends “Hello”. [01.611609] Client 1 receives “Hello”. [02.112496] Client 1 sends “world!”. [02.112691] Client 1 receives “world!”. [02.112772] Client 1 disconnects. [02.113569] Client 2 connects. [02.617032] Client 2 sends “Hello”. [02.617288] Client 2 receives “Hello”. [03.120725] Client 2 sends “world!”. [03.120944] Client 2 receives “world!”. [03.121044] Client 2 disconnects.

客户端连接服务器,发送两条消息后断开,每条消息的输入需要 0.5 秒,处理三个客户端的请求总共花费 3 秒。不过,任意客户端可能占用服务器任意长的时间,因此,有必要实现并发服务器。

3. 系统线程

实现并发服务器最简单的方式是使用系统线程,只需在另一个线程执行 handle_client() 函数即可,其它代码保持不变:

# echo_02_threads.py import socket import threading def run_server(host=127.0.0.1, port=55555): sock = socket.socket() sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port)) sock.listen() while True: client_sock, addr = sock.accept() print(Connection from, addr) thread = threading.Thread(target=handle_client, args=[client_sock]) thread.start() def handle_client(sock): while True: received_data = sock.recv(4096) if not received_data: break sock.sendall(received_data) print(Client disconnected:, sock.getpeername()) sock.close() if __name__ == __main__: run_server()

此时,多个客户端可以同时与服务器通信:

$ python clients.py [00.095948] Client 0 tries to connect. [00.096472] Client 1 tries to connect. [00.097019] Client 2 tries to connect. [00.099666] Client 0 connects. [00.099768] Client 1 connects. [00.100916] Client 2 connects. [00.602212] Client 0 sends “Hello”. [00.602379] Client 1 sends “Hello”. [00.602506] Client 2 sends “Hello”. [00.602702] Client 0 receives “Hello”. [00.602779] Client 1 receives “Hello”. [00.602896] Client 2 receives “Hello”. [01.106935] Client 0 sends “world!”. [01.107088] Client 1 sends “world!”. [01.107188] Client 2 sends “world!”. [01.107342] Client 0 receives “world!”. [01.107814] Client 0 disconnects. [01.108217] Client 1 receives “world!”. [01.108305] Client 1 disconnects. [01.108345] Client 2 receives “world!”. [01.108395] Client 2 disconnects.

$每$个线程处理一个客户端请求的方式实现起来很容易,但无法支持高并发量。系统线程会消耗大量内存,能同时运行的数量有限。例如,本网站所用的 Linux 服务器最多可以同时运行 8k 个线程,实际上可能不到这个数量就崩溃了。因此,服务器只能在高负载下艰难维持,并且很容易遭受 DoS 攻击。

程池中线程的最大数量,服务器不会创建太多线程。下面是基于线程池实现的服务器版本,使用了concurrent.futures 标准库:

# echo_03_thread_pool.py import socket from concurrent.futures import ThreadPoolExecutor pool = ThreadPoolExecutor(max_workers=20) def run_server(host=127.0.0.1, port=55555): sock = socket.socket() sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port)) sock.listen() while True: client_sock, addr = sock.accept() print(Connection from, addr) pool.submit(handle_client, client_sock) def handle_client(sock): while True: received_data = sock.recv(4096) if not received_data: break sock.sendall(received_data) print(Client disconnected:, sock.getpeername()) sock.close() if __name__ == __main__: run_server()

线程池版本的服务器既简单又实用,但要防止单个客户端长时间占用某个线程。为此,你可以主动断开长连接、要求客户端维持一个最低吞吐率、让线程将任务重新放回队列,或采取组合策略等。这里要说的是,使用系统线程实现并发并不像看上去那么简单,我们还需探索更有效的并发方式。

4. I/O 多路复用与事件循环

我们再来看一下顺序服务器,会发现它总是在等待某个事件发生,没有连接时,它等待新的客户端连接,有连接后,又等待客户端发送数据。为了实现并发,应该

怎么让服务器知道下一个要处理的是什么事件呢?默认情况下,socket 的 accept()、recv()、sendall() 等方法都是阻塞的,调用 accept() 时,会保持阻塞状态,直到新的客户端接入,并不能同时调用其它客户端 socket 的 recv() 方法。不过,我们可以对阻塞方法设置超时,sock.settimeout(timeout),或将 socket 设置为非阻塞模式,sock.setblocking(False),然后同时保持多个 socket 连接,并在一个无限循环中调用每个 socket 对应的事件方法。对于还在监听新连接的 socket,就调用 accept(),对于等待客户端数据的 socket,就调用 recv()。

这个方法的问题在于,轮询时间很难准确配置。如果所有 socket 都设置为非阻塞模式,或超时时间设置得太短,服务器就会一直执行调用,消耗大量 CPU,如果超时时间设置得太长,又会导致响应很慢。

一个更好的选择是询问操作系统哪个 socket 已经就绪。显然,操作系统是掌握这个信息的,新数据包到达网络接口后,会通知操作系统,操作系统随即将其解码并唤醒正在等待读取该 socket 的进程。对于处理进程来说,除了等待读取该 socket,还可以通过 I/O 多路复用机制告诉操作系统,它准备读或写哪些 socket,如select()poll()epoll()等,当某个 socket 可读或可写时,操作系统也会唤醒该进程。

Python selectors标准库封装了不同的 I/O 多路复用机制,暴露的高层接口称为选择器(selector)。其中,SelectSelector 对应 select() 机制,EpollSelector 对应 epoll() 机制,而 DefaultSelector 对应当前操作系统支持的效率最高的机制。

下面一起来看看如何使用 selectors 库。首先,创建一个选择器:

sel = selectors.DefaultSelector()

然后调用 register() 方法,传入 socket 、监听的事件(socket可读写)以及辅加数据,注册监听:

sel.register(sock, selectors.EVENT_READ, my_data)

最后,调用 select() 方法:

keys_events = sel.select()

这个方法会返回一个列表,内容为 (key, events) 元组,每个元组表示一个就绪的 socket:

key 是一个对象,存储着 socket(key.fileobj)及该 socket 的辅加数据(key.data);events 是一个掩码,表示就绪的事件(selectors.EVENT_READ 或 selectors.EVENT_WRITE);

调用 select() 方法时,如果有 socket 就绪,则立即返回,如果没有,则阻塞,直到某个 socket 就绪。操作系统阻塞 select() 方法的逻辑与阻塞 socket 的 recv() 等方法的逻辑是一致的。

不再监听某个 socket 时,可以通过 unregister() 方法取消注册。

那么,socket 就绪之后怎么操作呢?这个问题我们在注册 socket 的时候是心里有数的,因此,每个 socket 注册的时候都可以带上一个就绪时的回调函数。这也正是附加数据的用途。

现在,我们可以通过 I/O 多路复用实现一个单线程版本的并发服务器:

# echo_04_io_multiplexing.py import socket import selectors sel = selectors.DefaultSelector() def setup_listening_socket(host=127.0.0.1, port=55555): sock = socket.socket() sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port)) sock.listen() sel.register(sock, selectors.EVENT_READ, accept) def accept(sock): client_sock, addr = sock.accept() print(Connection from, addr) sel.register(client_sock, selectors.EVENT_READ, recv_and_send) def recv_and_send(sock): received_data = sock.recv(4096) if received_data: # assume sendall wont block sock.sendall(received_data) else: print(Client disconnected:, sock.getpeername()) sel.unregister(sock) sock.close() def run_event_loop(): while True: for key, _ in sel.select(): callback = key.data sock = key.fileobj callback(sock) if __name__ == __main__: setup_listening_socket() run_event_loop()

先给监听的 socket 注册一个 accept() 回调,接受新连接请求并对每个客户端 socket 注册一个 recv_and_send() 回调。程序的核心逻辑是一个事件循环——迭代处理就绪 socket,调用对应回调的无限循环。

事件循环版本的服务器可以正常处理多个客户端连接。与多线程版本相比,主要问题在于代码的组织方式比较奇怪,是围绕着回调实现的。上面的代码看着还好,是因为有些逻辑的处理并不严谨,比如,写 socket 的逻辑也可能在写队列满时被阻塞,我们现应该检查 socket 是否可写,然后再调用 sock.sendall() 方法,也就是说,recv_and_send() 函数需要一分为二,其中一个再次注册为回调。如果服务器逻辑复杂一些,而不是直接返回客户端数据的话,代码实现会更麻烦。

使用系统线程时,我们可以在非回调模式下实现并发。为什么呢?关键在于操作系统有挂起和恢复线程执行的能力。如果我们写的函数也可以像系统线程一样挂起和恢复,就可以实现单线程并发了。Python 是支持这类函数的。

5. 生成器函数与生成器

生成器函数是指代码中有 yield 表达式的函数,比如:

$ python -q >>> def gen(): … yield 1 … yield 2 … return 3 … >>>

调用生成器函数时,Python 并不会像普通函数一样运行代码,而是返回一个生成器对象,或称生成器:

>>> g = gen() >>> g <generator object gen at 0x105655660>

将生成器作为参数传给内置的next() 函数,可以运行其代码。next() 函数调用生成器的 __next__()方法,运行代码至第一个 yield 表达式,返回 yield 参数后挂起。再次调用 next() 函数可以让生成器从挂起的地方开始运行,返回下一个 yield 参数后再次挂起:

>>> next(g) 1 >>> next(g) 2

如果后面已经没有 yield 表达式,则抛出 StopIteration 异常:

>>> next(g) Traceback (most recent call last): File “<stdin>”, line 1, in <module> StopIteration: 3

此时,如果生成器有返回值,则异常对象会携带此值:

>>> g = gen() >>> next(g) 1 >>> next(g) 2 >>> try: … next(g) … except StopIteration as e: … e.value … 3

Python 引入生成器的最初目的是作为迭代器的替代。Python 中,可以迭代处理(比如在 for 循环中)的对象称为可迭代对象,可迭代对象实现了__iter__() 特殊方法,返回一个迭代器。而迭代器则实现了 __next__()方法,每次调用时返回下一个值,

>>> for i in gen(): … i … 1 2

迭代器可迭代,是因为它们本身也是可迭代对象。迭代器的 __iter__() 方法返回的是该迭代器本身。

生成器允许用户使用 yield 关键字,将函数作为迭代器,而不用定义类并实现对应的特殊方法。Python 会自动帮用户填充特殊方法,使生成器变成迭代器。

生成器以懒加载模式按需返回值,因而更节约内存空间,甚至可以用于生成无限序列,相关用例可以参考 PEP 255。不过,我们这里关心的不是内存效率,而是生成器可以被挂起和恢复的特性。

6. 生成器作为协程

对多任务程序,可以在每个任务对应的函数中按需插入 yield 表达式,将其转为生成器,再轮流运行这些生成器:以固定顺序循环调用 next() 函数,直到生成器停止。得到一个按下面逻辑运行的并发程序:

译|Python幕后(12):async-await 机制
generators

下面,我们用这种策略修改前面的顺序服务器。首先要插入 yield 语句,我建议在每次阻塞操作前插入;然后运行生成器,我建议我们可以写一个类,提供 create_task() 方法将生成器添加到一个运行队列中,run() 方法轮流运行被添加的任务。这里把这个类命名为 EventLoopNoIO,因为它处理事件循环,但没有用到 I/O 多路复用。服务器代码如下:

# echo_05_yield_no_io.py import socket from event_loop_01_no_io import EventLoopNoIO loop = EventLoopNoIO() def run_server(host=127.0.0.1, port=55555): sock = socket.socket() sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port)) sock.listen() while True: yield client_sock, addr = sock.accept() print(Connection from, addr) loop.create_task(handle_client(client_sock)) def handle_client(sock): while True: yield received_data = sock.recv(4096) if not received_data: break yield sock.sendall(received_data) print(Client disconnected:, sock.getpeername()) sock.close() if __name__ == __main__: loop.create_task(run_server()) loop.run()

事件循环代码如下:

# event_loop_01_no_io.py from collections import deque class EventLoopNoIO: def __init__(self): self.tasks_to_run = deque([]) def create_task(self, coro): self.tasks_to_run.append(coro) def run(self): while self.tasks_to_run: task = self.tasks_to_run.popleft() try: next(task) except StopIteration: continue self.create_task(task)

这也算一种并发服务器了,不过你可能已经注意到,它的并发是很有限的,所有任务都以固定顺序交替运行。假如当前任务在等待新连接,则处理已连接客户端的任务必须等待新连接进入后才能开始。

换句话说,事件循环并没有检查 socket 操作是否被阻塞。如前面所说,我们可以引入 I/O 多路复用解决这个问题。此时,事件循环根据任务加入顺序轮流执行,而是在任务等待的 socket 可读(或可写)时执行任务。任务可以自己注册监听 socket 事件,也可以用 yield 关键字将需求提交给事件循环。下面的代码采用的是后一种方式:

# echo_06_yield_io.py import socket from event_loop_02_io import EventLoopIo loop = EventLoopIo() def run_server(host=127.0.0.1, port=55555): sock = socket.socket() sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port)) sock.listen() while True: yield wait_read, sock client_sock, addr = sock.accept() print(Connection from, addr) loop.create_task(handle_client(client_sock)) def handle_client(sock): while True: yield wait_read, sock received_data = sock.recv(4096) if not received_data: break yield wait_write, sock sock.sendall(received_data) print(Client disconnected:, sock.getpeername()) sock.close() if __name__ == __main__: loop.create_task(run_server()) loop.run()

使用 I/O 多路复用的事件循环代码如下:

# event_loop_02_io.py from collections import deque import selectors class EventLoopIo: def __init__(self): self.tasks_to_run = deque([]) self.sel = selectors.DefaultSelector() def create_task(self, coro): self.tasks_to_run.append(coro) def run(self): while True: if self.tasks_to_run: task = self.tasks_to_run.popleft() try: op, arg = next(task) except StopIteration: continue if op == wait_read: self.sel.register(arg, selectors.EVENT_READ, task) elif op == wait_write: self.sel.register(arg, selectors.EVENT_WRITE, task) else: raise ValueError(Unknown event loop operation:, op) else: for key, _ in self.sel.select(): task = key.data sock = key.fileobj self.sel.unregister(sock) self.create_task(task)

这段代码怎么样呢?首先,服务器可以正常处理多个客户端连接:

$ python clients.py [00.160966] Client 0 tries to connect. [00.161494] Client 1 tries to connect. [00.161783] Client 2 tries to connect. [00.163256] Client 0 connects. [00.163409] Client 1 connects. [00.163470] Client 2 connects. [00.667343] Client 0 sends “Hello”. [00.667491] Client 1 sends “Hello”. [00.667609] Client 2 sends “Hello”. [00.667886] Client 0 receives “Hello”. [00.668160] Client 1 receives “Hello”. [00.668237] Client 2 receives “Hello”. [01.171159] Client 0 sends “world!”. [01.171320] Client 1 sends “world!”. [01.171439] Client 2 sends “world!”. [01.171610] Client 0 receives “world!”. [01.171839] Client 0 disconnects. [01.172084] Client 1 receives “world!”. [01.172154] Client 1 disconnects. [01.172190] Client 2 receives “world!”. [01.172237] Client 2 disconnects.

其次,我们写的代码和顺序服务器代码很接近。当然,这里还添加了事件循环的代码,不过一般情况下,我们可以使用一些库提供的事件循环,在 Python 中,最常见的是asyncio

在多任务中使用的生成器,称为协程。协程即可以挂起并移交控制权的函数。根据这个定义,带有 yield 表达式的简单生成器也可以称为协程。但真正的协程应该可以调用其它协程并将控制器移交给对方,而不像生成器,只能将控制权移交给调用者。

把生成器部分的代码拆为子生成器,就可以看出真正的协程有什么区别。例如 handle_client() 中有两行代码:

yield wait_read, sock received_data = sock.recv(4096)

如果能拆为独立函数的话:

def async_recv(sock, n): yield wait_read, sock return sock.recv(n)

可以直接调用:

received_data = async_recv(sock, 4096)

但这样拆分是不行的。async_recv() 返回的是一个生成器,而非数据,因此,handle_client() 生成器必须以 next() 函数调用 async_recv() 子生成器。但它并不能直接调用 next() 直到子生成器停止,子生成器将值抛给事件循环,因此,handle_client() 必须将其值再向上层移交,同时还要处理 S

为解决这个问题,Python 做过几次尝试,首先是 PEP 342在 Python 2.5 对生成器做了增强,提供了send() 方法,功能类似于 __next__() ,但同时可以把值发送给生成器,作为生成器挂起时 yield 表达式的值:

>>> def consumer(): … val = yield 1 … print(Got, val) … val = yield … print(Got, val) … >>> c = consumer() >>> next(c) 1 >>> c.send(2) Got 2 >>> c.send(3) Got 3 Traceback (most recent call last): File “<stdin>”, line 1, in <module> StopIteration

__next__() 方法可以简单看做 send(None)。

同时,生成器还新增了 throw() 方法与 close()方法,throw() 方法在生成器挂起时抛出异常,close() 方法抛出 GeneratorExit 异常。

这次增强解决了子生成器问题。此时,生成器不再直接运行子生成器,而是将其 yield 给事件循环,由事件循环运行,事件循环可以用 send() 方法将子生成器的结果返回给生成器(或在子生成器抛出异常时传递给生成器)。调用方式如下:

received_data = yield async_recv(sock)

这种调用方式的结果和协程互相调用是一致的。

这种方式要在事件循环中增加不少处理逻辑,也不太好理解。不过,你不一定要理解这种方式,因为 PEP 380给 Python 3.3 引入了另一种更符合直觉的协程实现方式。

7. yield from

yield from iterable

效果与下面两行代码一致:

for i in iterable: yield i

而在生成器中使用 yield from 的效果要复杂得多,它可以让生成器直接运行子生成器,这也是我们讨论这个语句的原因。其工作逻辑如下:

首先以 send(None) 方式运行子生成器,如果抛出 StopIteration 异常,则捕获异常,将异常结果作为 yield from 表达式的值并停止运行;如果 send() 返回一个值,则 yield 该值并接收一个发送给该生成器的值;接收到值时,重复步骤 1,但 send() 的参数为接收到的值;

yield from 会自动调用 throw() 和 close() 方法,将异常传递给子生成器。在处理可迭代对象时,其逻

用户只要记住:yield from 可以让子生成器的代码就像生成器的一部分一样。下面这个调用:

received_data = yield from async_recv(sock)

就像将这行代码替换为 async_recv() 函数的代码。这也是一种协程调用,但和前面基于 yield 语句的调用不同的是,事件循环的逻辑不受影响。

下面就以 yield from 语句来简化服务器代码。首先提取出所有 yield 语句和之后的 socket 操作,作为单独的生成器函数,将这些函数放到事件循环中:

# event_loop_03_yield_from.py from collections import deque import selectors class EventLoopYieldFrom: def __init__(self): self.tasks_to_run = deque([]) self.sel = selectors.DefaultSelector() def create_task(self, coro): self.tasks_to_run.append(coro) def sock_recv(self, sock, n): yield wait_read, sock return sock.recv(n) def sock_sendall(self, sock, data): yield wait_write, sock sock.sendall(data) def sock_accept(self, sock): yield wait_read, sock return sock.accept() def run(self): while True: if self.tasks_to_run: task = self.tasks_to_run.popleft() try: op, arg = next(task) except StopIteration: continue if op == wait_read: self.sel.register(arg, selectors.EVENT_READ, task) elif op == wait_write: self.sel.register(arg, selectors.EVENT_WRITE, task) else: raise ValueError(Unknown event loop operation:, op) else: for key, _ in self.sel.select(): task = key.data sock = key.fileobj self.sel.unregister(sock) self.create_task(task)

然后再服务器代码中 yield from 这些生成器:

# echo_07_yield_from.py import socket from event_loop_03_yield_from import EventLoopYieldFrom loop = EventLoopYieldFrom() def run_server(host=127.0.0.1, port=55555): sock = socket.socket() sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port)) sock.listen() while True: client_sock, addr = yield from loop.sock_accept(sock) print(Connection from, addr) loop.create_task(handle_client(client_sock)) def handle_client(sock): while True: received_data = yield from loop.sock_recv(sock, 4096) if not received_data: break yield from loop.sock_sendall(sock, received_data) print(Client disconnected:, sock.getpeername()) sock.close() if __name__ == __main__: loop.create_task(run_server()) loop.run()

搞定!只要利用生成器、yield 与 yield from,我们就可实现协程,像写普通代码一样写出异步、并发的代码。那 async/await 呢?它们不过是基于生成器提供的语法糖,用于解决生成器写法不够清楚明确的问题。

8. async/await

看到一个生成器函数时,你并不能确定它会被用作普通生成器还是协程,两者看上去都是包含一些 yield 或 yield from 表达式,由 def 关键字定义的函数。为使协程更明确,PEP 492 在 Python 3.5 中引入了 async 与 await 关键字。

用户可以使用 async def 定义一个原生(native)协程函数:

>>> async def coro(): … return 1 …

调用此函数时,返回一个原生协程对象,或称原生协程。原生协程与生成器类似,只是类型不同,也没有实现 __next__() 方法。事件循环可以调用 send(None) 以运行原生协程:

>>> coro().send(None) Traceback (most recent call last): File “<stdin>”, line 1, in <module> StopIteration: 1

原生协程间可以通过 await 关键字互相调用:

>>> async def coro2(): … r = await coro() … return 1 + r … >>> coro2().send(None) Traceback (most recent call last): File “<stdin>”, line 1, in <module> StopIteration: 2

await 关键字的作用与 yield from 一样。事实上,它就是通过 yield from 实现的,只是增加了一些判断,确保处理对象不是生成器或其它可迭代对象。

将生成器用作协程时,yield from 调用链的终点必须是一个能 yield 的生成器。同样地,await 调用链的终点也必须有 yield 表达式。不过,如果在 async def 函数中使用 yield 表达式,返回的并不是原生协程,而是一种称为异步生成器的对象:

>>> async def g(): … yield 2 … >>> g() <async_generator object g at 0x1046c6790>

这里不过多讨论异步生成器,简单地说,它们实现了异步版本的迭代器接口,包括__aiter__()__anext__() 特殊方法(具体请参考 PEP 525 )。对我们来说,重要的是, __anext__() 是可等待的(awaitable),而异步生成器不行。因此,await 不能调用包含 yield 表达式的 async def 函数。那么,await 调用链的终点应该是什么呢?有两个选项。

第一个选项是在常规生成器函数前加一个 @types.coroutine 装饰器,这个装饰器会在生成器背后的函数上设置标记,使其可以像原生协程一样用在 await 表达式中。

>>> import types >>> @types.coroutine … def gen_coro(): … yield 3 … >>> async def coro3(): … await gen_coro() … >>> coro3().send(None) 3

生成器用 @types.coroutine 装饰后称为基于生成器的协程。如果直接允许在 await 后使用常规生成器,大家还是容易混淆生成器与协程的概念区别,而通过这个装饰器,可以显式地将一个生成器标记为协程。

第二个选项是给需要等待的对象提供 __await__()特殊方法。await 表达式会先判断等待对象是否为原生协程或基于生成器的协程,如果是,则 “yields from” 协程,如果不是,则 “yields from” 该对象的 __await__() 方法返回的迭代器。因为生成器都是迭代器,因此 __await__() 方法可以写成一个普通的生成器函数。

>>> class A: … def __await__(self): … yield 4 … >>> async def coro4(): … await A() … >>> coro4().send(None) 4

下面,我们用 async/await 实现服务器的最终版本。首先,在函数前增加 async 关键字,把 yield from 调用改为 await 调用:

# echo_08_async_await.py import socket from event_loop_04_async_await import EventLoopAsyncAwait loop = EventLoopAsyncAwait() async def run_server(host=127.0.0.1, port=55555): sock = socket.socket() sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port)) sock.listen() while True: client_sock, addr = await loop.sock_accept(sock) print(Connection from, addr) loop.create_task(handle_client(client_sock)) async def handle_client(sock): while True: received_data = await loop.sock_recv(sock, 4096) if not received_data: break await loop.sock_sendall(sock, received_data) print(Client disconnected:, sock.getpeername()) sock.close() if __name__ == __main__: loop.create_task(run_server()) loop.run()

然后修改事件循环。在生成器函数前增加 @types.coroutine 装饰器,并在任务运行时以 send(None) 替换 next() 函数:

# event_loop_04_async_await.py from collections import deque import selectors import types class EventLoopAsyncAwait: def __init__(self): self.tasks_to_run = deque([]) self.sel = selectors.DefaultSelector() def create_task(self, coro): self.tasks_to_run.append(coro) @types.coroutine def sock_recv(self, sock, n): yield wait_read, sock return sock.recv(n) @types.coroutine def sock_sendall(self, sock, data): yield wait_write, sock sock.sendall(data) @types.coroutine def sock_accept(self, sock): yield wait_read, sock return sock.accept() def run(self): while True: if self.tasks_to_run: task = self.tasks_to_run.popleft() try: op, arg = task.send(None) except StopIteration: continue if op == wait_read: self.sel.register(arg, selectors.EVENT_READ, task) elif op == wait_write: self.sel.register(arg, selectors.EVENT_WRITE, task) else: raise ValueError(Unknown event loop operation:, op) else: for key, _ in self.sel.select(): task = key.data sock = key.fileobj self.sel.unregister(sock) self.create_task(task)

上面就是 async/await 版本并发服务器的所有代码了。它的效果和 yield from 版本一致,只是语法稍有不同。

现在,你已经知道 async/await 是干什么的了。但对生成器、协程、yield、 yield from 和 await 的具体实现还不太了解。下一节我们就讨论这个问题。

9. 生成器与协程的实现 *

如果读过本系列前面的文章,你实际上已经知道生成器是如何实现的了。模组、函数、类都是代码块,编译器会为每个代码块创建代码对象,代码对象描述了代码块的具体内容,包括代码对应的字节码、常数、变量名和其它相关信息。而函数则通过函数对象保存其代码对象、函数名、默认参数以及 __doc__ 属性等信息。

生成器函数也是函数,只是其代码对象带有 CO_GENERATOR 标记。用户调用生成器函数时,Python 会检查此标记,如果存在,则不执行函数,而是返回生成器对象。类似地,原生协程函数也是函数,只是代码对象带有 CO_COROUTINE 标记,看到此标记时,Python 会直接返回原生协程对象。

执行函数时,Python 会创建函数帧对象,用于保存代码对象的执行状态,包括代码对象本身以及局部变量的值、全局变量与内置变量字典的引用、值栈、指令指针等等。

生成器对象保存着生成器函数的函数帧,以及一些辅助数据,如生成器名称、运行标记等。send() 方法会调用_PyEval_EvalFrameDefault() ,进入求值循环,像运行普通函数一样运行生成器的函数帧。求值循环会逐个执行函数的字节码。唯一但关键的区别在于,运行普通函数时,每次创建一个新的函数帧,而运行生成器时,使用的是同一个函数帧,因而能保存其运行状态。

Python 是如何执行 yield 表达式的呢?编译器会生成 YIELD_VALUE 字节码指令。通过dis 标准库即可看到:

# yield.py def g(): yield 1 val = yield 2 return 3 $ python -m dis yield.py … Disassembly of <code object g at 0x105b1c710, file “yield.py”, line 3>: 4 0 LOAD_CONST 1 (1) 2 YIELD_VALUE 4 POP_TOP 5 6 LOAD_CONST 2 (2) 8 YIELD_VALUE 10 STORE_FAST 0 (val) 6 12 LOAD_CONST 3 (3) 14 RETURN_VALUE

求值循环遇到 YIELD_VALUE 指令时,会停止该帧,并返回栈顶的值(在我们的案例中,即 send() 的返回值)。这个逻辑和 return 语句生成的 RETURN_VALUE 指令类似,区别在于,RETURN_VALUE 指令会将函数帧的 f_stacktop 字段设为 NULL,而 YIELD_VALUE 会将其设为栈顶指针。基于这个机制,send() 可以判断生成器当前是在 yield 还是 return。如果是 yield,直接返回值,如果是 return,则抛出包含值的 StopIteration 异常。

send() 第一次执行函数帧时,不会将参数提供给生成器。不过它会检查用户提供的参数是否为 None,避免忽略有意义的参数值。

>>> def g(): … val = yield … >>> g().send(42) Traceback (most recent call last): File “<stdin>”, line 1, in <module> TypeError: cant send non-None value to a just-started generator

而在之后运行时,send() 会将参数值推入栈中,随后通过 STORE_FAST (或类似指令)赋值给某个变量,如果 yield 没有接收该值的话,就由 POP_TOP 指令直接抛出。你可能一直搞不清楚生成器是先抛出值还是先接收值,现在应该明白了:先 YIELD_VALUE,再 STORE_FAST。

对 yield from 关键字,编译器会生成 GET_YIELD_FROM_ITER、 LOAD_CONST 以及 YIELD_FROM 指令:

# yield_from.py def g(): res = yield from another_gen $ python -m dis yield_from.py … Disassembly of <code object g at 0x1051117c0, file “yield_from.py”, line 3>: 4 0 LOAD_GLOBAL 0 (another_gen) 2 GET_YIELD_FROM_ITER 4 LOAD_CONST 0 (None) 6 YIELD_FROM 8 STORE_FAST 0 (res) …

GET_YIELD_FROM_ITER 指令会确保当前栈顶用于 yield from 的对象是一个迭代器。如果不是,则将该对象替换为 iter(obj)。

YIELD_FROM 指令的第一个工作是抛出栈顶的值。通常这个值是 send() 推入的,不过 send() 第一次运行时不会推入值,因此这里有一个 LOAD_CONST 往栈顶推入 None。

之后,YIELD_FROM 指令会检查 yield from 的对象,如果 send() 值为 None,则调用 obj.__next__(),否则调用 obj.send(value)。如果调用抛出 StopIteration 异常,则处理异常:将栈顶对象(即 yield from 的对象)替换为异常值,让函数帧继续执行。如果调用返回的是正常值,则停止执行函数帧,并将该值返回给 send(),同时修改指令指针,使下一次进入函数帧时从 YIELD_FROM 开始执行。之后执行时,变化的只有 yield from 对象的状态以及返回的值。

原生协程本质上就是一个类型不同的生成器对象。区别在于 generator 类实现了 __iter__() 与 __next__() 方法,coroutine 类实现的是 __await__() 方法,而 send() 的实现是一致的。

编译器给 await 表达式生成的字节码指令与 yield from 类似,只是用 GET_AWAITABLE 替换了 GET_YIELD_FROM_ITER :

# await.py async def coro(): res = await another_coro $ python -m dis await.py … Disassembly of <code object coro at 0x10d96e7c0, file “await.py”, line 3>: 4 0 LOAD_GLOBAL 0 (another_coro) 2 GET_AWAITABLE 4 LOAD_CONST 0 (None) 6 YIELD_FROM 8 STORE_FAST 0 (res) …

GET_AWAITABLE 指令会检查 yield from 的对象是否为原生协程或基于生成器的协程,如果不是,则替换为 obj.__await__()。

以上就是生成器与协程的基本原理。如果有更多想了解的,建议参考 CPython 源码。关于代码对象的定义,可以参考Include/cpython/code.h,关于函数对象的定义,可以参考 Include/funcobject.h,函数帧的定义可以参考 Include/cpython/frameobject.h ,关于生成器和协程,可以参考 Objects/genobject.c ,而在 Python/ceval.c 中,可以看到不同字节码指令的具体逻辑。

我们已经搞清楚 async/await 的原理,但对运行 async/await 的事件循环还没怎么讨论。工作中,由于比较繁琐,我们一般不会自己写事件循环,而是用一些事件循环库。结束本篇文章之前,让我们一起看看你最可能用到的一个事件循环库。

10. asyncio

引入 async/await 关键字的同时,asyncio 进入了 Python 标准库(PEP 3156)。它本质上就是提供了一个事件循环,以及一些异步编程中用到的类、函数与协程。

asyncio 的事件循环与我们前面写的 EventLoopAsyncAwait 略有不同。EventLoopAsyncAwait 中维护了一个协程队列,然后逐个调用 send(None),协程给出的值统一为 (event, socket),表示协程等待 socket 的 event 事件。然后将 socket 传入选择器中,事件发生后再往队列中增加协程。

asyncio 维护的并不是协程队列,而是一组回调,同时提供了loop.create_task() 及其它方法帮助用户运行协程。我们来看看具体逻辑。

事件循环中的回调分为三种:

已就绪回调,保存在 loop._ready 队列中,可以通过loop.call_soon()loop.call_soon_threadsafe() 方法添加;等待时间的回调,保存在 loop._scheduled 优先队列中,可以通过 loop.call_later() 或 loop.call_at() 方法添加;等待文件描述符的回调,通过选择器管理,可以通过 loop.add_reader()loop.add_writer() 注册;

以上这些方法都返回封装了回调的HandleTimerHandle 实例,用户可以通过 handle.cancel() 方法取消回调。TimerHandle 是 Handle 的子类,封装了等待时间的回调,实现了 __le__() 特殊方法,越快就绪的回调,其值越小,因此 loop._scheduled 优先队列可以按时间排序。

loop._run_once() 方法可以迭代处理事件循环,迭代逻辑如下:

从 loop._scheduled 移除已取消的回调;调用 loop._selector.select() 方法,处理事件,将回调添加到 loop._ready 队列中;将时间到了的回调从 loop._scheduled 队列

那么,基于回调的事件循环如何运行协程呢?我们一起来看看 loop.create_task()方法。调度协程时,会将其封装在Task实例中,而 Task.__init__() 会调用 loop.call_soon() 方法添加 task.__step() 回调,实际运行协程的是 task.__step()。

task.__step() 方法会调用 coro.send(None) 以运行协程。这个协程并不直接抛出(yield)消息,而是抛出 None 或 Future 实例。抛出 None 表示转移控制权,比如调用了 asyncio.sleep(0) 方法。此时,task.__step() 会将自己重新添加到队列。

抛出 Future实例表示某种操作的结果未就绪,大概就是告诉事件循环:“我在等待这个结果,暂时可能还没好,所以先让出控制权,等结果好了再唤醒我。”

此时,task.__step() 会调用 future.add_done_callback() 方法,往 Future 实例中添加一个回调,等结果就绪时重新将task.__step() 放到任务队列中。如果结果已就绪,则直接执行,否则就等待,直到其它人调用 future.set_result() 方法给出结果。

原生协程不能直接 yield。是不是说 yield Future 实例的时候必须用基于生成器的协程呢?并不是,原生协程可以 await Future 实例,例如:

async def future_waiter(): res = await some_future

为此,Future 实例实现了 __await__() 方法,抛出 Future 实例自身并返回结果:

class Future: # … def __await__(self): if not self.done(): self._asyncio_future_blocking = True yield self # This tells Task to wait for completion. if not self.done(): raise RuntimeError(“await wasnt used with future”) return self.result() # May raise too.

谁会给出 Future 实例的结果呢?我们可以看一个接收 socket 数据的例子,函数实现如下:

创建一个新的 Future 实例;调用 loop.add_reader() ,为 socket 注册回调,该回调负责从 socket 读取数据,并将其设置为 Future 实例的结果;将 Future 实例返回给调用者;

某个任务等待此 Future 实例时,会将其抛出给 task.__step() 方法,而 task.__step() 会往 Future 实例中添加回调,用于在步骤2给出结果时,将任务重新添加到队列。

我们知道,协程可以等待另一个协程的结果:

async def coro(): res = await another_coro()

实际上,协程还可以调度另一个协程,创建 Task 实例并等待:

async def coro(): task = asyncio.create_task(another_coro()) res = await task

Task 是 Future 的子类,因此也是可等待对象。当 coro.send(None) 抛出 StopIteration 异常,task.__step() 会处理异常并给出 Task 实例的结果。

以上就是 asyncio 库的基本工作逻辑。读者应该记住两点,首先,事件循环是基于回调的,对协程的支持也是基于同样的机制;其次,协程并不抛出消息,而是抛出 Future 对象,Future 对象允许协程等待包括 I/O 事件的各种情况。例如,协程可以创建一个独立线程完成复杂的计算任务,并等待其结果。我们也可以基于 socket 实现类似协程,但基于 Future 对象的实现要更优雅,也更通用。

11. 总结

最近几年,async/await 模式变得越来越流行。并发编程的价值日益凸显,而传统并发模式,如系统线程、回调等并不能用于所有场景,有时候编程语言/应用层实现的并发更合适。技术上说,基于回调的事件循环与 async/await 模式一样好,但谁会喜欢写回调呢?

async/await 并不是并发模式的唯一选择,许多人可能更倾向于其它实现。如GoClojure 中的通信顺序进程模型(CSP),或 ErlangAkka 中的 Actor 模型。不过,在 Python 中,async/await 似乎是目前的最佳选项。

async/await 模式并不是 Python 独有的,C#JavaScriptRust 以及 Swift等编程语言中都有实现。这里侧重讨论 Python 中的实现,只是因为我对它更熟悉。客观地说,Python 中的实现并不精致,它把生成器、基于生成器的协程、原生协程、yield from 以及 await 等概念混在一起,增加了理解难度。但你只要理解了这些概念,也就并不复杂了。

asyncio 库很可靠,但也有自己的问题。基于回调的事件循环使其可以同时支持回调模式或 async/await 模式。但如前面的服务器代码所展示的,直接运行协程的事件循环,在实现和使用上都要简单得多。比如 curiotrio 库就是直接运行协程的。

总的来说,并发编程本身就比较复杂,没有哪种编程模型可以将其彻底简化。但在一些模型帮助下,可以让并发可控可管理,本文讨论的 Python 的 async/await 模型就是其中一种。

P.S.

本文代码已上传到 github。David Beazley 的《协程与并发》和 Eli Bendersky 的《并发服务器》系列对本文帮助很大。

本篇是 Python 幕后系列的最终篇,目前我在计划写一些其它东西,但或许也还会就本序列写一些续集。如果有什么写作主题建议,可以发到我的邮箱 [email protected]

2021 年 8 月 27 日更新:并发与并行的关系很微妙,一般来说,并发被看做程序自身的属性,而并行被看做程序执行的属性。因此,并行可能不一定基于并发——即便是看上去顺序执行的程序,在指令比特层级上也可能是并行的。而任务层级的并行则是并发的一种特例。

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务