系列文章地址这是一篇译文,原文地址。
在函数前标记 async 关键字,再以 await 关键字调用它,你的程序突然就变成异步的了——可以在等待一件事,比如 I/O 操作的同时做其它事情。
带 async/await 关键字的代码看上去和普通同步代码没什么区别,但运行方式完全不同。要理解其中逻辑,就必须了解很多重要概念,包括并发、并行、事件循环、I/O 多路复用、异步、协作多任务与协程等。而在 Python 中,你还必须了解生成器、基于生成器的协程、原生协程、yield 与 yield from 关键字等。由于其复杂性,很多使用 async/await 的程序员并不了解其原理。我认为,只要从基础概念着手, async/await 模式是可以用简单的语言解释清楚的,这就是我们今天要做的。
注意:文章中默认使用 CPython 3.9,随着 CPython 的演化,一些实现细节可能会有所调整,我会尽力留意一些重要变化并更新文章内容。1. 并发
计算机是按顺序,一个指令接一个指令地执行程序的。但一个程序通常要运行多种任务,有时无法等待一个任务完成后再开始另一个,比如说,一个下棋应用必须在等待玩家操作的同时更新计时数字。这种同时执行多个任务的能力称为并发。并发并不是说这些任务必须在同一个物理时间执行,它们也可以交替运行:某个任务运行一段时间后挂起,让其它任务运行,之后再回来运行这个任务。基于这种机制,只有几个核心的设备可以运行成千上万个程序。在多核计算机或者计算集群中,如果多个任务于同一个物理时间运行,则称并行,是一种特殊的并发场景。
concurrency哪怕没有任何编程语言层面的支持,你也可以写出并发代码。比如要执行两个任务,分别封装在两个函数中:
def do_task1():
# …
def do_task2():
# …
def main():
do_task1()
do_task2()
如果任务之间是独立的,则可以将其拆分成多个部分,交替调用:
def do_task1_part1():
# …
def do_task1_part2():
# …
def do_task2_part1():
# …
def do_task2_part2():
# …
def main():
do_task1_part1()
do_task2_part1()
do_task1_part2()
do_task2_part2()
当然,上面这个例子有点过度简化了。我想说的是,编程语言本身并不会影响你写并发程序的能力,但可以让你更容易地写出并发程序。我们今天要讨论的 async/await 就是一个例子。
为理解从普通并发到 async/await 的演变过程,我们会写一个真实的并发程序——可以同时响应多个客户端请求的 TCP 服务器。一开始是最简单的,无并发的顺序处理版本,然后使用系统线程实现并发,再通过单线程 I/O 多路复用和事件循环实现一个版本,进而是基于生成器、协程实现的版本,最后引入 async/await。
2. 顺序处理服务器
只处理一个连接的 TCP 服务器实现起来比较直接。服务器在某个端口上监听请求,与某个客户端建立连接后与其通信,直到连接关闭,然后监听新的连接请求。可以用一个 socket 程序实现:
# echo_01_seq.py
import socket
def run_server(host=127.0.0.1, port=55555):
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
sock.listen()
while True:
client_sock, addr = sock.accept()
print(Connection from, addr)
handle_client(client_sock)
def handle_client(sock):
while True:
received_data = sock.recv(4096)
if not received_data:
break
sock.sendall(received_data)
print(Client disconnected:, sock.getpeername())
sock.close()
if __name__ == __main__:
run_server()
这段代码后面会一直用到,可以仔细看看。如果想温习一下 socket 相关知识,可以参考 Beej 网络编程指引或 socket 模块文档,上面代码的逻辑可以描述如下:
socket.socket() 创建一个新的 TCP/IP 套接字;sock.bind() 将套接字绑定到一个地址和端口上;sock.listen() 将套接字标记为监听状态;sock.accept() 建立新的连接;sock.recv() 从客户端接收数据,sock.sendall() 将数据发送回客户端;这个版本的服务器并不支持并发,多个客户端同时连接时,其中一个连接成功并占用服务器,其它客户端必须等待该客户端断开连接后才能连接。我写了一个模拟程序来演示其运行结果:
$ python clients.py
[00.097034] Client 0 tries to connect.
[00.097670] Client 1 tries to connect.
[00.098334] Client 2 tries to connect.
[00.099675] Client 0 connects.
[00.600378] Client 0 sends “Hello”.
[00.601602] Client 0 receives “Hello”.
[01.104952] Client 0 sends “world!”.
[01.105166] Client 0 receives “world!”.
[01.105276] Client 0 disconnects.
[01.106323] Client 1 connects.
[01.611248] Client 1 sends “Hello”.
[01.611609] Client 1 receives “Hello”.
[02.112496] Client 1 sends “world!”.
[02.112691] Client 1 receives “world!”.
[02.112772] Client 1 disconnects.
[02.113569] Client 2 connects.
[02.617032] Client 2 sends “Hello”.
[02.617288] Client 2 receives “Hello”.
[03.120725] Client 2 sends “world!”.
[03.120944] Client 2 receives “world!”.
[03.121044] Client 2 disconnects.
客户端连接服务器,发送两条消息后断开,每条消息的输入需要 0.5 秒,处理三个客户端的请求总共花费 3 秒。不过,任意客户端可能占用服务器任意长的时间,因此,有必要实现并发服务器。
3. 系统线程
实现并发服务器最简单的方式是使用系统线程,只需在另一个线程执行 handle_client() 函数即可,其它代码保持不变:
# echo_02_threads.py
import socket
import threading
def run_server(host=127.0.0.1, port=55555):
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
sock.listen()
while True:
client_sock, addr = sock.accept()
print(Connection from, addr)
thread = threading.Thread(target=handle_client, args=[client_sock])
thread.start()
def handle_client(sock):
while True:
received_data = sock.recv(4096)
if not received_data:
break
sock.sendall(received_data)
print(Client disconnected:, sock.getpeername())
sock.close()
if __name__ == __main__:
run_server()
此时,多个客户端可以同时与服务器通信:
$ python clients.py
[00.095948] Client 0 tries to connect.
[00.096472] Client 1 tries to connect.
[00.097019] Client 2 tries to connect.
[00.099666] Client 0 connects.
[00.099768] Client 1 connects.
[00.100916] Client 2 connects.
[00.602212] Client 0 sends “Hello”.
[00.602379] Client 1 sends “Hello”.
[00.602506] Client 2 sends “Hello”.
[00.602702] Client 0 receives “Hello”.
[00.602779] Client 1 receives “Hello”.
[00.602896] Client 2 receives “Hello”.
[01.106935] Client 0 sends “world!”.
[01.107088] Client 1 sends “world!”.
[01.107188] Client 2 sends “world!”.
[01.107342] Client 0 receives “world!”.
[01.107814] Client 0 disconnects.
[01.108217] Client 1 receives “world!”.
[01.108305] Client 1 disconnects.
[01.108345] Client 2 receives “world!”.
[01.108395] Client 2 disconnects.
$每$个线程处理一个客户端请求的方式实现起来很容易,但无法支持高并发量。系统线程会消耗大量内存,能同时运行的数量有限。例如,本网站所用的 Linux 服务器最多可以同时运行 8k 个线程,实际上可能不到这个数量就崩溃了。因此,服务器只能在高负载下艰难维持,并且很容易遭受 DoS 攻击。
程池中线程的最大数量,服务器不会创建太多线程。下面是基于线程池实现的服务器版本,使用了concurrent.futures 标准库:
# echo_03_thread_pool.py
import socket
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(max_workers=20)
def run_server(host=127.0.0.1, port=55555):
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
sock.listen()
while True:
client_sock, addr = sock.accept()
print(Connection from, addr)
pool.submit(handle_client, client_sock)
def handle_client(sock):
while True:
received_data = sock.recv(4096)
if not received_data:
break
sock.sendall(received_data)
print(Client disconnected:, sock.getpeername())
sock.close()
if __name__ == __main__:
run_server()
线程池版本的服务器既简单又实用,但要防止单个客户端长时间占用某个线程。为此,你可以主动断开长连接、要求客户端维持一个最低吞吐率、让线程将任务重新放回队列,或采取组合策略等。这里要说的是,使用系统线程实现并发并不像看上去那么简单,我们还需探索更有效的并发方式。
4. I/O 多路复用与事件循环
我们再来看一下顺序服务器,会发现它总是在等待某个事件发生,没有连接时,它等待新的客户端连接,有连接后,又等待客户端发送数据。为了实现并发,应该
怎么让服务器知道下一个要处理的是什么事件呢?默认情况下,socket 的 accept()、recv()、sendall() 等方法都是阻塞的,调用 accept() 时,会保持阻塞状态,直到新的客户端接入,并不能同时调用其它客户端 socket 的 recv() 方法。不过,我们可以对阻塞方法设置超时,sock.settimeout(timeout),或将 socket 设置为非阻塞模式,sock.setblocking(False),然后同时保持多个 socket 连接,并在一个无限循环中调用每个 socket 对应的事件方法。对于还在监听新连接的 socket,就调用 accept(),对于等待客户端数据的 socket,就调用 recv()。
这个方法的问题在于,轮询时间很难准确配置。如果所有 socket 都设置为非阻塞模式,或超时时间设置得太短,服务器就会一直执行调用,消耗大量 CPU,如果超时时间设置得太长,又会导致响应很慢。
一个更好的选择是询问操作系统哪个 socket 已经就绪。显然,操作系统是掌握这个信息的,新数据包到达网络接口后,会通知操作系统,操作系统随即将其解码并唤醒正在等待读取该 socket 的进程。对于处理进程来说,除了等待读取该 socket,还可以通过 I/O 多路复用机制告诉操作系统,它准备读或写哪些 socket,如select()、poll() 或 epoll()等,当某个 socket 可读或可写时,操作系统也会唤醒该进程。
Python selectors标准库封装了不同的 I/O 多路复用机制,暴露的高层接口称为选择器(selector)。其中,SelectSelector 对应 select() 机制,EpollSelector 对应 epoll() 机制,而 DefaultSelector 对应当前操作系统支持的效率最高的机制。
下面一起来看看如何使用 selectors 库。首先,创建一个选择器:
sel = selectors.DefaultSelector()
然后调用 register() 方法,传入 socket 、监听的事件(socket可读写)以及辅加数据,注册监听:
sel.register(sock, selectors.EVENT_READ, my_data)
最后,调用 select() 方法:
keys_events = sel.select()
这个方法会返回一个列表,内容为 (key, events) 元组,每个元组表示一个就绪的 socket:
key 是一个对象,存储着 socket(key.fileobj)及该 socket 的辅加数据(key.data);events 是一个掩码,表示就绪的事件(selectors.EVENT_READ 或 selectors.EVENT_WRITE);调用 select() 方法时,如果有 socket 就绪,则立即返回,如果没有,则阻塞,直到某个 socket 就绪。操作系统阻塞 select() 方法的逻辑与阻塞 socket 的 recv() 等方法的逻辑是一致的。
不再监听某个 socket 时,可以通过 unregister() 方法取消注册。
那么,socket 就绪之后怎么操作呢?这个问题我们在注册 socket 的时候是心里有数的,因此,每个 socket 注册的时候都可以带上一个就绪时的回调函数。这也正是附加数据的用途。
现在,我们可以通过 I/O 多路复用实现一个单线程版本的并发服务器:
# echo_04_io_multiplexing.py
import socket
import selectors
sel = selectors.DefaultSelector()
def setup_listening_socket(host=127.0.0.1, port=55555):
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
sock.listen()
sel.register(sock, selectors.EVENT_READ, accept)
def accept(sock):
client_sock, addr = sock.accept()
print(Connection from, addr)
sel.register(client_sock, selectors.EVENT_READ, recv_and_send)
def recv_and_send(sock):
received_data = sock.recv(4096)
if received_data:
# assume sendall wont block
sock.sendall(received_data)
else:
print(Client disconnected:, sock.getpeername())
sel.unregister(sock)
sock.close()
def run_event_loop():
while True:
for key, _ in sel.select():
callback = key.data
sock = key.fileobj
callback(sock)
if __name__ == __main__:
setup_listening_socket()
run_event_loop()
先给监听的 socket 注册一个 accept() 回调,接受新连接请求并对每个客户端 socket 注册一个 recv_and_send() 回调。程序的核心逻辑是一个事件循环——迭代处理就绪 socket,调用对应回调的无限循环。
事件循环版本的服务器可以正常处理多个客户端连接。与多线程版本相比,主要问题在于代码的组织方式比较奇怪,是围绕着回调实现的。上面的代码看着还好,是因为有些逻辑的处理并不严谨,比如,写 socket 的逻辑也可能在写队列满时被阻塞,我们现应该检查 socket 是否可写,然后再调用 sock.sendall() 方法,也就是说,recv_and_send() 函数需要一分为二,其中一个再次注册为回调。如果服务器逻辑复杂一些,而不是直接返回客户端数据的话,代码实现会更麻烦。
使用系统线程时,我们可以在非回调模式下实现并发。为什么呢?关键在于操作系统有挂起和恢复线程执行的能力。如果我们写的函数也可以像系统线程一样挂起和恢复,就可以实现单线程并发了。Python 是支持这类函数的。
5. 生成器函数与生成器
生成器函数是指代码中有 yield 表达式的函数,比如:
$ python -q
>>> def gen():
… yield 1
… yield 2
… return 3
…
>>>
调用生成器函数时,Python 并不会像普通函数一样运行代码,而是返回一个生成器对象,或称生成器:
>>> g = gen()
>>> g
<generator object gen at 0x105655660>
将生成器作为参数传给内置的next() 函数,可以运行其代码。next() 函数调用生成器的 __next__()方法,运行代码至第一个 yield 表达式,返回 yield 参数后挂起。再次调用 next() 函数可以让生成器从挂起的地方开始运行,返回下一个 yield 参数后再次挂起:
>>> next(g)
1
>>> next(g)
2
如果后面已经没有 yield 表达式,则抛出 StopIteration 异常:
>>> next(g)
Traceback (most recent call last):
File “<stdin>”, line 1, in <module>
StopIteration: 3
此时,如果生成器有返回值,则异常对象会携带此值:
>>> g = gen()
>>> next(g)
1
>>> next(g)
2
>>> try:
… next(g)
… except StopIteration as e:
… e.value
…
3
Python 引入生成器的最初目的是作为迭代器的替代。Python 中,可以迭代处理(比如在 for 循环中)的对象称为可迭代对象,可迭代对象实现了__iter__() 特殊方法,返回一个迭代器。而迭代器则实现了 __next__()方法,每次调用时返回下一个值,
>>> for i in gen():
… i
…
1
2
迭代器可迭代,是因为它们本身也是可迭代对象。迭代器的 __iter__() 方法返回的是该迭代器本身。
生成器允许用户使用 yield 关键字,将函数作为迭代器,而不用定义类并实现对应的特殊方法。Python 会自动帮用户填充特殊方法,使生成器变成迭代器。
生成器以懒加载模式按需返回值,因而更节约内存空间,甚至可以用于生成无限序列,相关用例可以参考 PEP 255。不过,我们这里关心的不是内存效率,而是生成器可以被挂起和恢复的特性。
6. 生成器作为协程
对多任务程序,可以在每个任务对应的函数中按需插入 yield 表达式,将其转为生成器,再轮流运行这些生成器:以固定顺序循环调用 next() 函数,直到生成器停止。得到一个按下面逻辑运行的并发程序:
generators下面,我们用这种策略修改前面的顺序服务器。首先要插入 yield 语句,我建议在每次阻塞操作前插入;然后运行生成器,我建议我们可以写一个类,提供 create_task() 方法将生成器添加到一个运行队列中,run() 方法轮流运行被添加的任务。这里把这个类命名为 EventLoopNoIO,因为它处理事件循环,但没有用到 I/O 多路复用。服务器代码如下:
# echo_05_yield_no_io.py
import socket
from event_loop_01_no_io import EventLoopNoIO
loop = EventLoopNoIO()
def run_server(host=127.0.0.1, port=55555):
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
sock.listen()
while True:
yield
client_sock, addr = sock.accept()
print(Connection from, addr)
loop.create_task(handle_client(client_sock))
def handle_client(sock):
while True:
yield
received_data = sock.recv(4096)
if not received_data:
break
yield
sock.sendall(received_data)
print(Client disconnected:, sock.getpeername())
sock.close()
if __name__ == __main__:
loop.create_task(run_server())
loop.run()
事件循环代码如下:
# event_loop_01_no_io.py
from collections import deque
class EventLoopNoIO:
def __init__(self):
self.tasks_to_run = deque([])
def create_task(self, coro):
self.tasks_to_run.append(coro)
def run(self):
while self.tasks_to_run:
task = self.tasks_to_run.popleft()
try:
next(task)
except StopIteration:
continue
self.create_task(task)
这也算一种并发服务器了,不过你可能已经注意到,它的并发是很有限的,所有任务都以固定顺序交替运行。假如当前任务在等待新连接,则处理已连接客户端的任务必须等待新连接进入后才能开始。
换句话说,事件循环并没有检查 socket 操作是否被阻塞。如前面所说,我们可以引入 I/O 多路复用解决这个问题。此时,事件循环根据任务加入顺序轮流执行,而是在任务等待的 socket 可读(或可写)时执行任务。任务可以自己注册监听 socket 事件,也可以用 yield 关键字将需求提交给事件循环。下面的代码采用的是后一种方式:
# echo_06_yield_io.py
import socket
from event_loop_02_io import EventLoopIo
loop = EventLoopIo()
def run_server(host=127.0.0.1, port=55555):
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
sock.listen()
while True:
yield wait_read, sock
client_sock, addr = sock.accept()
print(Connection from, addr)
loop.create_task(handle_client(client_sock))
def handle_client(sock):
while True:
yield wait_read, sock
received_data = sock.recv(4096)
if not received_data:
break
yield wait_write, sock
sock.sendall(received_data)
print(Client disconnected:, sock.getpeername())
sock.close()
if __name__ == __main__:
loop.create_task(run_server())
loop.run()
使用 I/O 多路复用的事件循环代码如下:
# event_loop_02_io.py
from collections import deque
import selectors
class EventLoopIo:
def __init__(self):
self.tasks_to_run = deque([])
self.sel = selectors.DefaultSelector()
def create_task(self, coro):
self.tasks_to_run.append(coro)
def run(self):
while True:
if self.tasks_to_run:
task = self.tasks_to_run.popleft()
try:
op, arg = next(task)
except StopIteration:
continue
if op == wait_read:
self.sel.register(arg, selectors.EVENT_READ, task)
elif op == wait_write:
self.sel.register(arg, selectors.EVENT_WRITE, task)
else:
raise ValueError(Unknown event loop operation:, op)
else:
for key, _ in self.sel.select():
task = key.data
sock = key.fileobj
self.sel.unregister(sock)
self.create_task(task)
这段代码怎么样呢?首先,服务器可以正常处理多个客户端连接:
$ python clients.py
[00.160966] Client 0 tries to connect.
[00.161494] Client 1 tries to connect.
[00.161783] Client 2 tries to connect.
[00.163256] Client 0 connects.
[00.163409] Client 1 connects.
[00.163470] Client 2 connects.
[00.667343] Client 0 sends “Hello”.
[00.667491] Client 1 sends “Hello”.
[00.667609] Client 2 sends “Hello”.
[00.667886] Client 0 receives “Hello”.
[00.668160] Client 1 receives “Hello”.
[00.668237] Client 2 receives “Hello”.
[01.171159] Client 0 sends “world!”.
[01.171320] Client 1 sends “world!”.
[01.171439] Client 2 sends “world!”.
[01.171610] Client 0 receives “world!”.
[01.171839] Client 0 disconnects.
[01.172084] Client 1 receives “world!”.
[01.172154] Client 1 disconnects.
[01.172190] Client 2 receives “world!”.
[01.172237] Client 2 disconnects.
其次,我们写的代码和顺序服务器代码很接近。当然,这里还添加了事件循环的代码,不过一般情况下,我们可以使用一些库提供的事件循环,在 Python 中,最常见的是asyncio。
在多任务中使用的生成器,称为协程。协程即可以挂起并移交控制权的函数。根据这个定义,带有 yield 表达式的简单生成器也可以称为协程。但真正的协程应该可以调用其它协程并将控制器移交给对方,而不像生成器,只能将控制权移交给调用者。
把生成器部分的代码拆为子生成器,就可以看出真正的协程有什么区别。例如 handle_client() 中有两行代码:
yield wait_read, sock
received_data = sock.recv(4096)
如果能拆为独立函数的话:
def async_recv(sock, n):
yield wait_read, sock
return sock.recv(n)
可以直接调用:
received_data = async_recv(sock, 4096)
但这样拆分是不行的。async_recv() 返回的是一个生成器,而非数据,因此,handle_client() 生成器必须以 next() 函数调用 async_recv() 子生成器。但它并不能直接调用 next() 直到子生成器停止,子生成器将值抛给事件循环,因此,handle_client() 必须将其值再向上层移交,同时还要处理 S
为解决这个问题,Python 做过几次尝试,首先是 PEP 342在 Python 2.5 对生成器做了增强,提供了send() 方法,功能类似于 __next__() ,但同时可以把值发送给生成器,作为生成器挂起时 yield 表达式的值:
>>> def consumer():
… val = yield 1
… print(Got, val)
… val = yield
… print(Got, val)
…
>>> c = consumer()
>>> next(c)
1
>>> c.send(2)
Got 2
>>> c.send(3)
Got 3
Traceback (most recent call last):
File “<stdin>”, line 1, in <module>
StopIteration
__next__() 方法可以简单看做 send(None)。
同时,生成器还新增了 throw() 方法与 close()方法,throw() 方法在生成器挂起时抛出异常,close() 方法抛出 GeneratorExit 异常。
这次增强解决了子生成器问题。此时,生成器不再直接运行子生成器,而是将其 yield 给事件循环,由事件循环运行,事件循环可以用 send() 方法将子生成器的结果返回给生成器(或在子生成器抛出异常时传递给生成器)。调用方式如下:
received_data = yield async_recv(sock)
这种调用方式的结果和协程互相调用是一致的。
这种方式要在事件循环中增加不少处理逻辑,也不太好理解。不过,你不一定要理解这种方式,因为 PEP 380给 Python 3.3 引入了另一种更符合直觉的协程实现方式。
7. yield from
yield from iterable
效果与下面两行代码一致:
for i in iterable:
yield i
而在生成器中使用 yield from 的效果要复杂得多,它可以让生成器直接运行子生成器,这也是我们讨论这个语句的原因。其工作逻辑如下:
首先以 send(None) 方式运行子生成器,如果抛出 StopIteration 异常,则捕获异常,将异常结果作为 yield from 表达式的值并停止运行;如果 send() 返回一个值,则 yield 该值并接收一个发送给该生成器的值;接收到值时,重复步骤 1,但 send() 的参数为接收到的值;yield from 会自动调用 throw() 和 close() 方法,将异常传递给子生成器。在处理可迭代对象时,其逻
用户只要记住:yield from 可以让子生成器的代码就像生成器的一部分一样。下面这个调用:
received_data = yield from async_recv(sock)
就像将这行代码替换为 async_recv() 函数的代码。这也是一种协程调用,但和前面基于 yield 语句的调用不同的是,事件循环的逻辑不受影响。
下面就以 yield from 语句来简化服务器代码。首先提取出所有 yield 语句和之后的 socket 操作,作为单独的生成器函数,将这些函数放到事件循环中:
# event_loop_03_yield_from.py
from collections import deque
import selectors
class EventLoopYieldFrom:
def __init__(self):
self.tasks_to_run = deque([])
self.sel = selectors.DefaultSelector()
def create_task(self, coro):
self.tasks_to_run.append(coro)
def sock_recv(self, sock, n):
yield wait_read, sock
return sock.recv(n)
def sock_sendall(self, sock, data):
yield wait_write, sock
sock.sendall(data)
def sock_accept(self, sock):
yield wait_read, sock
return sock.accept()
def run(self):
while True:
if self.tasks_to_run:
task = self.tasks_to_run.popleft()
try:
op, arg = next(task)
except StopIteration:
continue
if op == wait_read:
self.sel.register(arg, selectors.EVENT_READ, task)
elif op == wait_write:
self.sel.register(arg, selectors.EVENT_WRITE, task)
else:
raise ValueError(Unknown event loop operation:, op)
else:
for key, _ in self.sel.select():
task = key.data
sock = key.fileobj
self.sel.unregister(sock)
self.create_task(task)
然后再服务器代码中 yield from 这些生成器:
# echo_07_yield_from.py
import socket
from event_loop_03_yield_from import EventLoopYieldFrom
loop = EventLoopYieldFrom()
def run_server(host=127.0.0.1, port=55555):
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
sock.listen()
while True:
client_sock, addr = yield from loop.sock_accept(sock)
print(Connection from, addr)
loop.create_task(handle_client(client_sock))
def handle_client(sock):
while True:
received_data = yield from loop.sock_recv(sock, 4096)
if not received_data:
break
yield from loop.sock_sendall(sock, received_data)
print(Client disconnected:, sock.getpeername())
sock.close()
if __name__ == __main__:
loop.create_task(run_server())
loop.run()
搞定!只要利用生成器、yield 与 yield from,我们就可实现协程,像写普通代码一样写出异步、并发的代码。那 async/await 呢?它们不过是基于生成器提供的语法糖,用于解决生成器写法不够清楚明确的问题。
8. async/await
看到一个生成器函数时,你并不能确定它会被用作普通生成器还是协程,两者看上去都是包含一些 yield 或 yield from 表达式,由 def 关键字定义的函数。为使协程更明确,PEP 492 在 Python 3.5 中引入了 async 与 await 关键字。
用户可以使用 async def 定义一个原生(native)协程函数:
>>> async def coro():
… return 1
…
调用此函数时,返回一个原生协程对象,或称原生协程。原生协程与生成器类似,只是类型不同,也没有实现 __next__() 方法。事件循环可以调用 send(None) 以运行原生协程:
>>> coro().send(None)
Traceback (most recent call last):
File “<stdin>”, line 1, in <module>
StopIteration: 1
原生协程间可以通过 await 关键字互相调用:
>>> async def coro2():
… r = await coro()
… return 1 + r
…
>>> coro2().send(None)
Traceback (most recent call last):
File “<stdin>”, line 1, in <module>
StopIteration: 2
await 关键字的作用与 yield from 一样。事实上,它就是通过 yield from 实现的,只是增加了一些判断,确保处理对象不是生成器或其它可迭代对象。
将生成器用作协程时,yield from 调用链的终点必须是一个能 yield 的生成器。同样地,await 调用链的终点也必须有 yield 表达式。不过,如果在 async def 函数中使用 yield 表达式,返回的并不是原生协程,而是一种称为异步生成器的对象:
>>> async def g():
… yield 2
…
>>> g()
<async_generator object g at 0x1046c6790>
这里不过多讨论异步生成器,简单地说,它们实现了异步版本的迭代器接口,包括__aiter__() 和 __anext__() 特殊方法(具体请参考 PEP 525 )。对我们来说,重要的是, __anext__() 是可等待的(awaitable),而异步生成器不行。因此,await 不能调用包含 yield 表达式的 async def 函数。那么,await 调用链的终点应该是什么呢?有两个选项。
第一个选项是在常规生成器函数前加一个 @types.coroutine 装饰器,这个装饰器会在生成器背后的函数上设置标记,使其可以像原生协程一样用在 await 表达式中。
>>> import types
>>> @types.coroutine
… def gen_coro():
… yield 3
…
>>> async def coro3():
… await gen_coro()
…
>>> coro3().send(None)
3
生成器用 @types.coroutine 装饰后称为基于生成器的协程。如果直接允许在 await 后使用常规生成器,大家还是容易混淆生成器与协程的概念区别,而通过这个装饰器,可以显式地将一个生成器标记为协程。
第二个选项是给需要等待的对象提供 __await__()特殊方法。await 表达式会先判断等待对象是否为原生协程或基于生成器的协程,如果是,则 “yields from” 协程,如果不是,则 “yields from” 该对象的 __await__() 方法返回的迭代器。因为生成器都是迭代器,因此 __await__() 方法可以写成一个普通的生成器函数。
>>> class A:
… def __await__(self):
… yield 4
…
>>> async def coro4():
… await A()
…
>>> coro4().send(None)
4
下面,我们用 async/await 实现服务器的最终版本。首先,在函数前增加 async 关键字,把 yield from 调用改为 await 调用:
# echo_08_async_await.py
import socket
from event_loop_04_async_await import EventLoopAsyncAwait
loop = EventLoopAsyncAwait()
async def run_server(host=127.0.0.1, port=55555):
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
sock.listen()
while True:
client_sock, addr = await loop.sock_accept(sock)
print(Connection from, addr)
loop.create_task(handle_client(client_sock))
async def handle_client(sock):
while True:
received_data = await loop.sock_recv(sock, 4096)
if not received_data:
break
await loop.sock_sendall(sock, received_data)
print(Client disconnected:, sock.getpeername())
sock.close()
if __name__ == __main__:
loop.create_task(run_server())
loop.run()
然后修改事件循环。在生成器函数前增加 @types.coroutine 装饰器,并在任务运行时以 send(None) 替换 next() 函数:
# event_loop_04_async_await.py
from collections import deque
import selectors
import types
class EventLoopAsyncAwait:
def __init__(self):
self.tasks_to_run = deque([])
self.sel = selectors.DefaultSelector()
def create_task(self, coro):
self.tasks_to_run.append(coro)
@types.coroutine
def sock_recv(self, sock, n):
yield wait_read, sock
return sock.recv(n)
@types.coroutine
def sock_sendall(self, sock, data):
yield wait_write, sock
sock.sendall(data)
@types.coroutine
def sock_accept(self, sock):
yield wait_read, sock
return sock.accept()
def run(self):
while True:
if self.tasks_to_run:
task = self.tasks_to_run.popleft()
try:
op, arg = task.send(None)
except StopIteration:
continue
if op == wait_read:
self.sel.register(arg, selectors.EVENT_READ, task)
elif op == wait_write:
self.sel.register(arg, selectors.EVENT_WRITE, task)
else:
raise ValueError(Unknown event loop operation:, op)
else:
for key, _ in self.sel.select():
task = key.data
sock = key.fileobj
self.sel.unregister(sock)
self.create_task(task)
上面就是 async/await 版本并发服务器的所有代码了。它的效果和 yield from 版本一致,只是语法稍有不同。
现在,你已经知道 async/await 是干什么的了。但对生成器、协程、yield、 yield from 和 await 的具体实现还不太了解。下一节我们就讨论这个问题。
9. 生成器与协程的实现 *
如果读过本系列前面的文章,你实际上已经知道生成器是如何实现的了。模组、函数、类都是代码块,编译器会为每个代码块创建代码对象,代码对象描述了代码块的具体内容,包括代码对应的字节码、常数、变量名和其它相关信息。而函数则通过函数对象保存其代码对象、函数名、默认参数以及 __doc__ 属性等信息。
生成器函数也是函数,只是其代码对象带有 CO_GENERATOR 标记。用户调用生成器函数时,Python 会检查此标记,如果存在,则不执行函数,而是返回生成器对象。类似地,原生协程函数也是函数,只是代码对象带有 CO_COROUTINE 标记,看到此标记时,Python 会直接返回原生协程对象。
执行函数时,Python 会创建函数帧对象,用于保存代码对象的执行状态,包括代码对象本身以及局部变量的值、全局变量与内置变量字典的引用、值栈、指令指针等等。
生成器对象保存着生成器函数的函数帧,以及一些辅助数据,如生成器名称、运行标记等。send() 方法会调用_PyEval_EvalFrameDefault() ,进入求值循环,像运行普通函数一样运行生成器的函数帧。求值循环会逐个执行函数的字节码。唯一但关键的区别在于,运行普通函数时,每次创建一个新的函数帧,而运行生成器时,使用的是同一个函数帧,因而能保存其运行状态。
Python 是如何执行 yield 表达式的呢?编译器会生成 YIELD_VALUE 字节码指令。通过dis 标准库即可看到:
# yield.py
def g():
yield 1
val = yield 2
return 3
$ python -m dis yield.py
…
Disassembly of <code object g at 0x105b1c710, file “yield.py”, line 3>:
4 0 LOAD_CONST 1 (1)
2 YIELD_VALUE
4 POP_TOP
5 6 LOAD_CONST 2 (2)
8 YIELD_VALUE
10 STORE_FAST 0 (val)
6 12 LOAD_CONST 3 (3)
14 RETURN_VALUE
求值循环遇到 YIELD_VALUE 指令时,会停止该帧,并返回栈顶的值(在我们的案例中,即 send() 的返回值)。这个逻辑和 return 语句生成的 RETURN_VALUE 指令类似,区别在于,RETURN_VALUE 指令会将函数帧的 f_stacktop 字段设为 NULL,而 YIELD_VALUE 会将其设为栈顶指针。基于这个机制,send() 可以判断生成器当前是在 yield 还是 return。如果是 yield,直接返回值,如果是 return,则抛出包含值的 StopIteration 异常。
send() 第一次执行函数帧时,不会将参数提供给生成器。不过它会检查用户提供的参数是否为 None,避免忽略有意义的参数值。
>>> def g():
… val = yield
…
>>> g().send(42)
Traceback (most recent call last):
File “<stdin>”, line 1, in <module>
TypeError: cant send non-None value to a just-started generator
而在之后运行时,send() 会将参数值推入栈中,随后通过 STORE_FAST (或类似指令)赋值给某个变量,如果 yield 没有接收该值的话,就由 POP_TOP 指令直接抛出。你可能一直搞不清楚生成器是先抛出值还是先接收值,现在应该明白了:先 YIELD_VALUE,再 STORE_FAST。
对 yield from 关键字,编译器会生成 GET_YIELD_FROM_ITER、 LOAD_CONST 以及 YIELD_FROM 指令:
# yield_from.py
def g():
res = yield from another_gen
$ python -m dis yield_from.py
…
Disassembly of <code object g at 0x1051117c0, file “yield_from.py”, line 3>:
4 0 LOAD_GLOBAL 0 (another_gen)
2 GET_YIELD_FROM_ITER
4 LOAD_CONST 0 (None)
6 YIELD_FROM
8 STORE_FAST 0 (res)
…
GET_YIELD_FROM_ITER 指令会确保当前栈顶用于 yield from 的对象是一个迭代器。如果不是,则将该对象替换为 iter(obj)。
YIELD_FROM 指令的第一个工作是抛出栈顶的值。通常这个值是 send() 推入的,不过 send() 第一次运行时不会推入值,因此这里有一个 LOAD_CONST 往栈顶推入 None。
之后,YIELD_FROM 指令会检查 yield from 的对象,如果 send() 值为 None,则调用 obj.__next__(),否则调用 obj.send(value)。如果调用抛出 StopIteration 异常,则处理异常:将栈顶对象(即 yield from 的对象)替换为异常值,让函数帧继续执行。如果调用返回的是正常值,则停止执行函数帧,并将该值返回给 send(),同时修改指令指针,使下一次进入函数帧时从 YIELD_FROM 开始执行。之后执行时,变化的只有 yield from 对象的状态以及返回的值。
原生协程本质上就是一个类型不同的生成器对象。区别在于 generator 类实现了 __iter__() 与 __next__() 方法,coroutine 类实现的是 __await__() 方法,而 send() 的实现是一致的。
编译器给 await 表达式生成的字节码指令与 yield from 类似,只是用 GET_AWAITABLE 替换了 GET_YIELD_FROM_ITER :
# await.py
async def coro():
res = await another_coro
$ python -m dis await.py
…
Disassembly of <code object coro at 0x10d96e7c0, file “await.py”, line 3>:
4 0 LOAD_GLOBAL 0 (another_coro)
2 GET_AWAITABLE
4 LOAD_CONST 0 (None)
6 YIELD_FROM
8 STORE_FAST 0 (res)
…
GET_AWAITABLE 指令会检查 yield from 的对象是否为原生协程或基于生成器的协程,如果不是,则替换为 obj.__await__()。
以上就是生成器与协程的基本原理。如果有更多想了解的,建议参考 CPython 源码。关于代码对象的定义,可以参考Include/cpython/code.h,关于函数对象的定义,可以参考 Include/funcobject.h,函数帧的定义可以参考 Include/cpython/frameobject.h ,关于生成器和协程,可以参考 Objects/genobject.c ,而在 Python/ceval.c 中,可以看到不同字节码指令的具体逻辑。
我们已经搞清楚 async/await 的原理,但对运行 async/await 的事件循环还没怎么讨论。工作中,由于比较繁琐,我们一般不会自己写事件循环,而是用一些事件循环库。结束本篇文章之前,让我们一起看看你最可能用到的一个事件循环库。
10. asyncio
引入 async/await 关键字的同时,asyncio 进入了 Python 标准库(PEP 3156)。它本质上就是提供了一个事件循环,以及一些异步编程中用到的类、函数与协程。
asyncio 的事件循环与我们前面写的 EventLoopAsyncAwait 略有不同。EventLoopAsyncAwait 中维护了一个协程队列,然后逐个调用 send(None),协程给出的值统一为 (event, socket),表示协程等待 socket 的 event 事件。然后将 socket 传入选择器中,事件发生后再往队列中增加协程。
asyncio 维护的并不是协程队列,而是一组回调,同时提供了loop.create_task() 及其它方法帮助用户运行协程。我们来看看具体逻辑。
事件循环中的回调分为三种:
已就绪回调,保存在 loop._ready 队列中,可以通过loop.call_soon() 或 loop.call_soon_threadsafe() 方法添加;等待时间的回调,保存在 loop._scheduled 优先队列中,可以通过 loop.call_later() 或 loop.call_at() 方法添加;等待文件描述符的回调,通过选择器管理,可以通过 loop.add_reader() 或 loop.add_writer() 注册;以上这些方法都返回封装了回调的Handle 或 TimerHandle 实例,用户可以通过 handle.cancel() 方法取消回调。TimerHandle 是 Handle 的子类,封装了等待时间的回调,实现了 __le__() 特殊方法,越快就绪的回调,其值越小,因此 loop._scheduled 优先队列可以按时间排序。
loop._run_once() 方法可以迭代处理事件循环,迭代逻辑如下:
从 loop._scheduled 移除已取消的回调;调用 loop._selector.select() 方法,处理事件,将回调添加到 loop._ready 队列中;将时间到了的回调从 loop._scheduled 队列那么,基于回调的事件循环如何运行协程呢?我们一起来看看 loop.create_task()方法。调度协程时,会将其封装在Task实例中,而 Task.__init__() 会调用 loop.call_soon() 方法添加 task.__step() 回调,实际运行协程的是 task.__step()。
task.__step() 方法会调用 coro.send(None) 以运行协程。这个协程并不直接抛出(yield)消息,而是抛出 None 或 Future 实例。抛出 None 表示转移控制权,比如调用了 asyncio.sleep(0) 方法。此时,task.__step() 会将自己重新添加到队列。
抛出 Future实例表示某种操作的结果未就绪,大概就是告诉事件循环:“我在等待这个结果,暂时可能还没好,所以先让出控制权,等结果好了再唤醒我。”
此时,task.__step() 会调用 future.add_done_callback() 方法,往 Future 实例中添加一个回调,等结果就绪时重新将task.__step() 放到任务队列中。如果结果已就绪,则直接执行,否则就等待,直到其它人调用 future.set_result() 方法给出结果。
原生协程不能直接 yield。是不是说 yield Future 实例的时候必须用基于生成器的协程呢?并不是,原生协程可以 await Future 实例,例如:
async def future_waiter():
res = await some_future
为此,Future 实例实现了 __await__() 方法,抛出 Future 实例自身并返回结果:
class Future:
# …
def __await__(self):
if not self.done():
self._asyncio_future_blocking = True
yield self # This tells Task to wait for completion.
if not self.done():
raise RuntimeError(“await wasnt used with future”)
return self.result() # May raise too.
谁会给出 Future 实例的结果呢?我们可以看一个接收 socket 数据的例子,函数实现如下:
创建一个新的 Future 实例;调用 loop.add_reader() ,为 socket 注册回调,该回调负责从 socket 读取数据,并将其设置为 Future 实例的结果;将 Future 实例返回给调用者;某个任务等待此 Future 实例时,会将其抛出给 task.__step() 方法,而 task.__step() 会往 Future 实例中添加回调,用于在步骤2给出结果时,将任务重新添加到队列。
我们知道,协程可以等待另一个协程的结果:
async def coro():
res = await another_coro()
实际上,协程还可以调度另一个协程,创建 Task 实例并等待:
async def coro():
task = asyncio.create_task(another_coro())
res = await task
Task 是 Future 的子类,因此也是可等待对象。当 coro.send(None) 抛出 StopIteration 异常,task.__step() 会处理异常并给出 Task 实例的结果。
以上就是 asyncio 库的基本工作逻辑。读者应该记住两点,首先,事件循环是基于回调的,对协程的支持也是基于同样的机制;其次,协程并不抛出消息,而是抛出 Future 对象,Future 对象允许协程等待包括 I/O 事件的各种情况。例如,协程可以创建一个独立线程完成复杂的计算任务,并等待其结果。我们也可以基于 socket 实现类似协程,但基于 Future 对象的实现要更优雅,也更通用。
11. 总结
最近几年,async/await 模式变得越来越流行。并发编程的价值日益凸显,而传统并发模式,如系统线程、回调等并不能用于所有场景,有时候编程语言/应用层实现的并发更合适。技术上说,基于回调的事件循环与 async/await 模式一样好,但谁会喜欢写回调呢?
async/await 并不是并发模式的唯一选择,许多人可能更倾向于其它实现。如Go 与 Clojure 中的通信顺序进程模型(CSP),或 Erlang 与 Akka 中的 Actor 模型。不过,在 Python 中,async/await 似乎是目前的最佳选项。
async/await 模式并不是 Python 独有的,C#、JavaScript、Rust 以及 Swift等编程语言中都有实现。这里侧重讨论 Python 中的实现,只是因为我对它更熟悉。客观地说,Python 中的实现并不精致,它把生成器、基于生成器的协程、原生协程、yield from 以及 await 等概念混在一起,增加了理解难度。但你只要理解了这些概念,也就并不复杂了。
asyncio 库很可靠,但也有自己的问题。基于回调的事件循环使其可以同时支持回调模式或 async/await 模式。但如前面的服务器代码所展示的,直接运行协程的事件循环,在实现和使用上都要简单得多。比如 curio 与 trio 库就是直接运行协程的。
总的来说,并发编程本身就比较复杂,没有哪种编程模型可以将其彻底简化。但在一些模型帮助下,可以让并发可控可管理,本文讨论的 Python 的 async/await 模型就是其中一种。
P.S.
本文代码已上传到 github。David Beazley 的《协程与并发》和 Eli Bendersky 的《并发服务器》系列对本文帮助很大。
本篇是 Python 幕后系列的最终篇,目前我在计划写一些其它东西,但或许也还会就本序列写一些续集。如果有什么写作主题建议,可以发到我的邮箱 [email protected]。
2021 年 8 月 27 日更新:并发与并行的关系很微妙,一般来说,并发被看做程序自身的属性,而并行被看做程序执行的属性。因此,并行可能不一定基于并发——即便是看上去顺序执行的程序,在指令与比特层级上也可能是并行的。而任务层级的并行则是并发的一种特例。