责任编辑Sonbhadra讲诉Python 3.5后再次出现的async/await的采用方式,和它的许多采用目地,假如严重错误,热烈欢迎尖萼。
昨晚看见David Beazley在16年的两个演说:Fear and Awaiting in Async,给了我许多的体悟和启迪,只好想剖析下他们的路子,因此有了下列这首诗。
Python在3.5版中导入了有关PulseAudio的句法糖async和await,有关PulseAudio的基本概念能先看我在上一首诗提及的文本。
瞧瞧Python中常用的三种表达式方式:
1. 一般表达式
def function():
return 1
2. 计算机程序表达式
def generator():
yield 1
在3.5后,他们能采用async润色将一般表达式和计算机程序表达式包装袋成触发器表达式和触发器计算机程序。
3. 触发器表达式(PulseAudio)
async def async_function():
return 1
4. 触发器计算机程序
async def async_generator():
yield 1
通过类型判断能验证表达式的类型
import types
print(type(function) is types.FunctionType)
print(type(generator()) is types.GeneratorType)
print(type(async_function()) is types.CoroutineType)
print(type(async_generator()) is types.AsyncGeneratorType)
直接调用触发器表达式不会返回结果,而是返回两个coroutine对象:
print(async_function())
# <coroutine object async_function at 0x102ff67d8>
PulseAudio需要通过其他方式来驱动,因此能采用这个PulseAudio对象的send方式给PulseAudio发送两个值:
print(async_function().send(None))
不幸的是,假如通过上面的调用会抛出两个异常:
StopIteration: 1
因为计算机程序/PulseAudio在正常返回退出时会抛出两个StopIteration异常,而原来的返回值会存放在St
try:
async_function().send(None)
except StopIteration as e:
print(e.value)
# 1
通过上面的方式来新建两个run表达式来驱动PulseAudio表达式:
def run(coroutine):
try:
coroutine.send(None)
except StopIteration as e:
return e.value
在PulseAudio表达式中,能通过await句法来挂起自身的PulseAudio,并等待另两个PulseAudio完成直到返回结果:
async def async_function():
return 1
async def await_coroutine():
result = await async_function()
print(result)
run(await_coroutine())
# 1
要注意的是,await句法只能出现在通过async润色的表达式中,否则会报SyntaxError严重错误。
而且await后面的对象需要是两个Awaitable,或者实现了相关的协议。
查看Awaitable抽象类的代码,表明了只要两个类实现了__await__方式,那么通过它构造出来的实例就是两个Awaitable:
class Awaitable(metaclass=ABCMeta):
__slots__ = ()
@abstractmethod
def __await__(self):
yield
@classmethod
def __subclasshook__(cls, C):
if cls is Awaitable:
return _check_methods(C, “__await__”)
return NotImplemented
而且能看见,Coroutine类也继承了Awaitable,而且实现了send,throw和close方式。因此await两个调用触发器表达式返回的PulseAudio对象是合法的。
class Coroutine(Awaitable):
__slots__ = ()
@abstractmethod
def send(self, value):
…
@abstractmethod
def throw(self, typ, val=None, tb=None):
…
def close(self):
…
@classmethod
def __subclasshook__(cls, C):
if cls is Coroutine:
return _check_methods(C, __await__, send, throw, close)
return NotImplemented
接下来是触发器计算机程序,来看两个例子:
假如我要到一家超市去购买土豆,而超市货架上的土豆数量是有限的:
class Potato:
@classmethod
def make(cls, num, *args, **kws):
potatos = []
for i in range(num):
potatos.append(cls.__new__(cls, *args, **kws))
return potatos
all_potatos = Potato.make(5)
现在我想要买50个土豆,每次从货架上拿走两个土豆放到篮子:
def take_potatos(num):
count = 0
while True:
if len(all_potatos) == 0:
sleep(.1)
else:
potato = all_potatos.pop()
yield potato
count += 1
if count == num:
break
def buy_potatos():
bucket = []
for p in take_potatos(50):
bucket.append(p)
对应到代码中,就是迭代两个计算机程序的模型,显然,当货架上的土豆不够的时候,这时只能够死等,而且在上面例子中等多长时间都不会有结果(因为一切都是同步的),也许能用多进程和多线程解决,而在现实生活中,更应该像是这样的:
async def take_potatos(num):
count = 0
while True:
if len(all_potatos) == 0:
await ask_for_potato()
potato = all_potatos.pop()
yield potato
count += 1
if count == num:
break
当货架上的土豆没有了后,我可以询问超市请求需要更多的土豆,这时候需要等待一段时间直到生产者完成生产的过程:
async def ask_for_potato():
await asyncio.sleep(random.random())
all_potatos.extend(Potato.make(random.randint(1, 10)))
当生产者完成和返回后,这是便能从await挂起的地方继续往下跑,完成消费的过程。而这整两个过程,就是两个触发器计算机程序迭代的流程:
async def buy_potatos():
bucket = []
async for p in take_potatos(50):
bucket.append(p)
print(fGot potato {id(p)}…)
async for句法表示他们要后面迭代的是两个触发器计算机程序。
def main():
import asyncio
loop = asyncio.get_event_loop()
res = loop.run_until_complete(buy_potatos())
loop.close()
用asyncio运行这段代码,结果是这样的:
Got potato 4338641384...
Got potato 4338641160...
Got potato 4338614736...
Got potato 4338614680...
Got potato 4338614568...
Got potato 4344861864...
Got potato 4344843456...
Got potato 4344843400...
Got potato 4338641384...
Got potato 4338641160...
…
既然是触发器的,在请求后不一定要死等,而是能做其他事情。比如除了土豆,我还想买番茄,这时只需要在事件循环中再添加两个过程:
def main():
import asyncio
loop = asyncio.get_event_loop()
res = loop.run_until_complete(asyncio.wait([buy_potatos(), buy_tomatos()]))
loop.close()
再来运行这段代码:
Got potato 4423119312...
Got tomato 4423119368...
Got potato 4429291024...
Got potato 4421640768...
Got tomato 4429331704...
Got tomato 4429331760...
Got tomato 4423119368...
Got potato 4429331760...
Got potato 4429331704...
Got potato 4429346688...
Got potato 4429346072...
Got tomato 4429347360...
…
瞧瞧AsyncGenerator的定义,它需要实现__aiter__和__anext__两个核心方式,和asend,athrow,aclose方式。
class AsyncGenerator(AsyncIterator):
__slots__ = ()
async def __anext__(self):
…
@abstractmethod
async def asend(self, value):
…
@abstractmethod
async def athrow(self, typ, val=None, tb=None):
…
async def aclose(self):
…
@classmethod
def __subclasshook__(cls, C):
if cls is AsyncGenerator:
return _check_methods(C, __aiter__, __anext__,
asend, athrow, aclose)
return NotImplemented
触发器生成器是在3.6后才有的特性,同样的还有触发器推导表达式,因此在上面的例子中,也能写成这样:
bucket = [p async for p in take_potatos(50)]
类似的,还有await表达式:
result = [await fun() for fun in funcs if await condition()]
除了表达式之外,类实例的一般方式也能用async句法润色:
class ThreeTwoOne:
async def begin(self):
print(3)
await asyncio.sleep(1)
print(2)
await asyncio.sleep(1)
print(1)
await asyncio.sleep(1)
return
async def game():
t = ThreeTwoOne()
await t.begin()
print(start)
实例方式的调用同样是返回两个coroutine:
function = ThreeTwoOne.begin
method = function.__get__(ThreeTwoOne, ThreeTwoOne())
import inspect
assert inspect.isfunction(function)
assert inspect.ismethod(method)
assert inspect.iscoroutine(method())
同理还有类方式:
class ThreeTwoOne:
@classmethod
async def begin(cls):
print(3)
await asyncio.sleep(1)
print(2)
await asyncio.sleep(1)
print(1)
await asyncio.sleep(1)
return
async def game():
await ThreeTwoOne.begin()
print(start)
根据PEP 492中,async也能应用到上下文管理器中,__aenter__和__aexit__需要返回两个Awaitable:
class GameContext:
async def __aenter__(self):
print(game loading…)
await asyncio.sleep(1)
async def __aexit__(self, exc_type, exc, tb):
print(game exit…)
await asyncio.sleep(1)
async def game():
async with GameContext():
print(game start…)
await asyncio.sleep(2)
在3.7版,contextlib中会新增两个asynccontextmanager装饰器来包装袋两个实现触发器协议的上下文管理器:
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_connection():
conn = await acquire_db_connection()
try:
yield
finally:
await release_db_connection(conn)
async润色符也能用在__call__方式上:
class GameContext:
async def __aenter__(self):
self._started = time()
print(game loading…)
await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc, tb):
print(game exit…)
await asyncio.sleep(1)
async def __call__(self, *args, **kws):
if args[0] == time:
return time() – self._started
async def game():
async with GameContext() as ctx:
print(game start…)
await asyncio.sleep(2)
print(game time:, await ctx(time))
await和yield from
Python3.3的yield from句法能把计算机程序的操作委托给另两个计算机程序,计算机程序的调用方能直接与子计算机程序进行通信:
def sub_gen():
yield 1
yield 2
yield 3
def gen():
return (yield from sub_gen())
def main():
for val in gen():
print(val)
# 1
# 2
# 3
利用这一特性,采用yield from能够编写出类似PulseAudio效果的表达式调用,在3.5之前,asyncio正是采用@asyncio.coroutine和yield from句法来创建PulseAudio:
# https://docs.python.org/3.4/library/asyncio-task.html
import asyncio
@asyncio.coroutine
def compute(x, y):
print(“Compute %s + %s …” % (x, y))
yield from asyncio.sleep(1.0)
return x + y
@asyncio.coroutine
def print_sum(x, y):
result = yield from compute(x, y)
print(“%s + %s = %s“ % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
然而,用yield from容易在表示PulseAudio和计算机程序中混淆,没有良好的语义性,因此在Python 3.5推出了更新的async/await表达式来作为PulseAudio的句法。
因此类似下列的调用是等价的:
async with lock:
…
with (yield from lock):
…
######################
def main():
return (yield from coro())
def main():
return (await coro())
那么,怎么把计算机程序包装袋为两个PulseAudio对象呢?这时候能用到types包中的coroutine装饰器(假如采用asyncio做驱动的话,那么也能采用asyncio的coroutine装饰器),@types.coroutine装饰器会将两个计算机程序表达式包装袋为PulseAudio对象:
import asyncio
import types
@types.coroutine
def compute(x, y):
print(“Compute %s + %s …” % (x, y))
yield from asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print(“%s + %s = %s“ % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
尽管两个表达式分别采用了新旧句法,但他们都是PulseAudio对象,也分别称作native coroutine和generator-based coroutine,因此不用担心句法问题。
下面观察两个asyncio中Future的例子:
import asyncio
future = asyncio.Future()
async def coro1():
await asyncio.sleep(1)
future.set_result(data)
async def coro2():
print(await future)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
coro1(),
coro2()
]))
loop.close()
两个PulseAudio在在事件循环中,PulseAudiocoro1在执行第一句后挂起自身切到asyncio.sleep,而PulseAudiocoro2一直等待future的结果,让出事件循环,计时器结束后coro1执行了第二句设置了future的值,被挂起的coro2恢复执行,打印出future的结果data。
future能被await证明了future对象是两个Awaitable,进入Future类的源码能看见有一段代码显示了future实现了__await__协议:
class Future:
…
def __iter__(self):
if not self.done():
self._asyncio_future_blocking = True
yield self # This tells Task to wait for completion.
assert self.done(), “yield from wasnt used with future”
return self.result() # May raise too.
if compat.PY35:
__await__ = __iter__ # make compatible with await expression
当执行await future这行代码时,future中的这段代码就会被执行,首先future检查它自身是否已经完成,假如没有完成,挂起自身,告知当前的Task(任务)等待future完成。
当future执行set_result方式时,会触发下列的代码,设置结果,标记future已经完成:
def set_result(self, result):
…
if self._state != _PENDING:
raise InvalidStateError({}: {!r}.format(self._state, self))
self._result = result
self._state = _FINISHED
self._schedule_callbacks()
最后future会调度自身的回调表达式,触发Task._step()告知Task驱动future从之前挂起的点恢复执行,不难看出,future会执行下面的代码:
class Future:
…
def __iter__(self):
…
assert self.done(), “yield from wasnt used with future”
return self.result() # May raise too.
最终返回结果给调用方。
前面讲了那么多有关asyncio的例子,那么除了asyncio,就没有其他PulseAudio库了吗?asyncio作为python的标准库,自然受到很多青睐,但它有时候还是显得太重量了,尤其是提供了许多复杂的轮子和协议,不便于采用。
你能理解为,asyncio是采用async/await句法开发的PulseAudio库,而不是有asyncio才能用async/await,除了asyncio之外,curio和trio是更加轻量级的替代物,而且也更容易采用。
curio的作者是David Beazley,下面是采用curio创建tcp server的例子,据说这是dabeaz理想中的两个触发器服务器的样子:
from curio import run, spawn
from curio.socket import *
async def echo_server(address):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
print(Server listening at, address)
async with sock:
while True:
client, addr = await sock.accept()
await spawn(echo_client, client, addr)
async def echo_client(client, addr):
print(Connection from, addr)
async with client:
while True:
data = await client.recv(100000)
if not data:
break
await client.sendall(data)
print(Connection closed)
if __name__ == __main__:
run(echo_server, (,25000))
无论是asyncio还是curio,或者是其他触发器协程库,在背后往往都会借助于IO的事件循环来实现触发器,下面用几十行代码来展示两个简陋的基于事件驱动的echo服务器:
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from selectors import DefaultSelector, EVENT_READ
selector = DefaultSelector()
pool = {}
def request(client_socket, addr):
client_socket, addr = client_socket, addr
def handle_request(key, mask):
data = client_socket.recv(100000)
if not data:
client_socket.close()
selector.unregister(client_socket)
del pool[addr]
else:
client_socket.sendall(data)
return handle_request
def recv_client(key, mask):
sock = key.fileobj
client_socket, addr = sock.accept()
req = request(client_socket, addr)
pool[addr] = req
selector.register(client_socket, EVENT_READ, req)
def echo_server(address):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
selector.register(sock, EVENT_READ, recv_client)
try:
while True:
events = selector.select()
for key, mask in events:
callback = key.data
callback(key, mask)
except KeyboardInterrupt:
sock.close()
if __name__ == __main__:
echo_server((,25000))
验证一下:
# terminal 1$ nc localhost25000
hello world
hello world
# terminal 2
$ nc localhost 25000
hello world
hello world
现在知道,完成触发器的代码不一定要用async/await,采用了async/await的代码也不一定能做到触发器,async/await是PulseAudio的句法糖,使PulseAudio之间的调用变得更加清晰,采用async润色的函数调用时会返回两个PulseAudio对象,await只能放在async润色的表达式里面采用,await后面必须要跟着两个PulseAudio对象或Awaitable,await的目地是等待PulseAudio控制流的返回,而实现暂停并挂起表达式的操作是yield。
个人认为,async/await和PulseAudio是Python未来实现触发器编程的趋势,他们Sonbhadra在更多的地方看见他们的身影,例如PulseAudio库的curio和trio,web框架的sanic,数据库驱动的asyncpg等等…在Python 3主导的今天,作为开发者,应该及时拥抱和适应新的变化,而基于async/await的PulseAudio凭借良好的可读性和易用性日渐登上舞台,看见这里,你还不赶紧上车?
参考:
PEP 492PEP 525