开源技术 * IBM 微讲堂:Kubeflow 系列(10 月 29 日,8:00PM) 了解详情

协程和 asyncio

在本系列的 第 1 部分 中,您了解了 Python 迭代器;在 第 2 部分 中,您了解了 itertools。在这一部分,将了解一种称为协程(Coroutines)的特殊的生成器函数。您还将了解另一个功能强大但十分复杂的标准库模块: asyncio

想象您走进一家小餐馆。餐馆里有 3 张桌子,且仅有一位服务员。您知道将会发生什么。服务员为您递上菜单,然后回来取走您的订单。在厨师准备好您订购的食物后,服务员会将其端给您。用完餐后,服务员会将账单带给您,并在您准备好付款时回到您的桌边。

其他桌的客人正在愉快地用餐,因为当您考虑点什么菜时,或者在厨师准备您的食物时,抑或在您用餐时,服务员可以对其他桌的客人执行这些步骤。当多个桌子的客人同时需要服务员的关注时,您可能需要等几分钟,但等待时间不会太长。

如果只有一桌客人,从客人走进餐馆到离开餐馆平均需要 1 小时的时间;如果还有另一桌客人,可能需要花费 1 小时 10 分钟的时间;如果其他两桌都有客人,则会花费 1 小时 20 分钟的时间。这种情况不算太糟糕。

现在假设您走进了餐馆,但服务员只关注一桌客人,直到他们完成所有的步骤。您甚至可能需要等待两个小时后才开始用餐,因为服务员率先处理了其他桌客人的用餐。这不会是一家受欢迎的餐馆。

同步与异步

令人惊讶的是,我们编写的许多计算机代码的工作方式类似于这家效率极低的餐馆。在计算机术语中,不受欢迎的餐馆情形称为串行操作,服务员的操作称为同步操作。我们习惯的餐馆情形是,服务员可以同时关注不同桌子的客人,满足他们的需求,这种情形称为并行操作,服务员的操作称为异步操作。

我在这个类比上花这么多时间的原因是,它阐明了一项最重要的技术,开发人员应该适当学习这项技术,以便编写能够使用以输入/输出 (I/O) 为基础的数据库、网络和其他这类资源的可扩展应用程序。现实世界的餐馆之所以使用异步流程,是因为如果不这样做,它们就不会有吸引力,也不会有竞争力。理想情况下,实际程序倾向于使用异步流程,但开发人员需要利用正确的工具、库、技能和实践才能实现异步流程。本教程是教程系列的第 3 部分,将介绍如何在 Python 中这么做。

我想说的是,Python 支持异步编程的工具纷繁复杂,有许多不同的方法来实现异步编程。这些工具中有些是最近才添加的,有些辅助功能尚处于试验阶段。不过,这个主题很重要,值得坚持。我会通过这些工具中的一个实用子集,有意识地引导您熟悉基本概念,然后您就可以自行探索其他方法。

协程

您在前面的教程中了解了生成器函数,以及它们与常规函数的区别。当调用者调用常规函数时,流程是从顶部开始的,并根据函数的逻辑从某个位置退出。借助生成器,调用者可以多次进出单个函数,暂停并恢复其执行。

可以多次进出、每次暂停并恢复执行的函数被称为协程。生成器只是一种简化的协程。Python 有多种类型的协程,但本教程的重点是设计用来支持异步编程的协程类型。让我们回到餐馆的类比上。每桌的菜单/下单/用餐/结账/付款步骤是一个单独的协程,但服务员会暂停并重新关注每桌的客人,以便所有 3 个协程能同时运行(可能处于流程的不同阶段)。服务员训练有素的大脑充当着处理这些并行协程的调度程序。

在同步餐馆中,所有操作都是常规函数。在客人到达时,进入函数一次;在客人离开时,退出函数一次。一次只能运行一个这样的函数,所以客人可能需要等上两个小时才能开始自己的用餐体验。

在异步餐馆中,会在客人到达时首次进入协程函数,创建一个协程对象,并在客人离开时最终退出函数,此时不再需要协程对象。但是,在服务员提供菜单后,他们可以暂停这桌客人的协程对象,检查是否需要关注其他任何桌子的客人。在任何指定桌子的客人执行下单、结账等步骤后,服务员也会执行同样的操作。

餐馆服务员代码

利用 Python 的可读性几乎与伪代码一样的事实,下面给出了服务员协程的一个实际实现。

async def serve_table(table_number):
    await get_menus()
    print('Welcome.Please sit at table', table_number, 'Here are your menus')
    order = await get_order()
    print('Table', table_number, 'what will you be having today?')
    await prepare_order(order)
    print('Table', table_number, 'here is your meal:', order)
    await eat()
    print('Table', table_number, 'here is your check')
    await get_payment()
    print('Thanks for visiting us! (table', table_number, ')')

此函数是使用 async def 而不只是使用 def 定义的。这会将它标记为异步协程函数。顺便提一下,还有一些异步协程 生成器 函数,其主体中的某处有一个 yield 语句,但这些都是特殊情况,超出了本教程系列的讨论范畴。老实说,Python 3 中众多的函数/生成器/协程类型让人眼花缭乱,但本教程系列会忽略一些可能情况,仅提供一个简单的入门途径。

serve_table 的主体中包含一系列 await 语句。这会从调用的协程函数创建一个协程对象并调用此对象,同时将控制权转交给其他任何准备运行的协程。这相当于餐馆服务员启动了一个流程,比如让厨师开始准备食物,同时检查是否有任何其他桌的客人需要关注。

对任务的这种处理发生在训练有素的服务员的大脑中,在 Python 中,类似情况称为事件循环。我们稍后再介绍事件循环。

更多协程

让我们看看 serve_table 调用的其他协程的实现。

async def get_menus():
    delay_minutes = random.randrange(3) #0 to 3 minutes
    await asyncio.sleep(delay_minutes) #Pretend a second is a minute

async def get_order():
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)
    order = random.choice(['Special of the day', 'Fish & Chips', 'Pasta'])
    return order

async def prepare_order(order):
    delay_minutes = random.randrange(10, 20) #10 to 20 minutes
    await asyncio.sleep(delay_minutes)
    print('   [Order ready from kitchen: ', order, ']')

async def eat():
    delay_minutes = random.randrange(20, 40)
    await asyncio.sleep(delay_minutes)

async def get_payment():
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)

这些函数使用一个睡眠计时器来模拟如何花时间做一些处理。 random.randrange 函数提供了一个整数范围,用于从中随机挑选一个整数。 asyncio.sleep 函数是一个特殊的协程,它会在给定的秒数内暂停操作。当然,在此睡眠期间,事件循环可以自由运行其他任何准备好的协程。像往常一样,您可以使用 await 关键字调用此函数。

此时顺便提一下,您只能在异步协程函数(比如使用 async def 定义的函数)的主体中使用 await 关键字。在其他任何地方使用 await 都是一个语法错误。

请注意, get_order 协程返回了一个值。此值是在调用者的 await 语句中传回的。

一步到位:事件循环

我之前提到过事件循环。您需要使用一些特殊的设置代码来进入异步模式,创建一个事件循环来调度和管理各个协程,就像您将它们编码为协作任务一样。 asyncio 协程也可简化称为任务。当某个协程使用 await 将控制权转交给另一个协程时,它实际上是将控制权转交回事件循环。事件循环就像服务员训练有素的大脑。

以下是运行我们目前为止定义的餐馆服务员协程的代码。

#Create coroutines for three tables
gathered_coroutines = asyncio.gather(
    serve_table(1),
    serve_table(2),
    serve_table(3)
)

#asyncio uses event loops to manage its operation
loop = asyncio.get_event_loop()
#This is the entry from synchronous to asynchronous code.It will block
#Until the coroutine passed in has completed
loop.run_until_complete(gathered_coroutines)
#We're done with the event loop
loop.close()

特殊协程 asyncio.gather 采用一个或多个其他协程,安排它们全部运行,且在所有集合协程都运行完才算完成。这里使用它在事件循环中运行 3 个桌子的协程,我们已经先使用 asyncio.get_event_loop 获得了事件循环。下一行代码运行了给定的协程,直至它完成。由于向该协程传递了收集的 3 个协程,因此在所有 3 个协程都完成后它才结束运行。当然,每个 serve_table 协程都会调用其他协程,比如使用 await 调用 get_menusget_order ,然后使用事件循环来调度这些协程。

完整程序

清单 1. serve_tables.py 是完整的程序
import random
import asyncio

async def get_menus():
    delay_minutes = random.randrange(3) #0 to 3 minutes
    await asyncio.sleep(delay_minutes) #Pretend a second is a minute

async def get_order():
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)
    order = random.choice(['Special of the day', 'Fish & Chips', 'Pasta'])
    return order

async def prepare_order(order):
    delay_minutes = random.randrange(10, 20) #10 to 20 minutes
    await asyncio.sleep(delay_minutes)
    print('   [Order ready from kitchen: ', order, ']')

async def eat():
    delay_minutes = random.randrange(20, 40)
    await asyncio.sleep(delay_minutes)

async def get_payment():
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)

async def serve_table(table_number):
    await get_menus()
    print('Welcome.Please sit at table', table_number, 'Here are your menus')
    order = await get_order()
    print('Table', table_number, 'what will you be having today?')
    await prepare_order(order)
    print('Table', table_number, 'here is your meal:', order)
    await eat()
    print('Table', table_number, 'here is your check')
    await get_payment()
    print('Thanks for visiting us! (table', table_number, ')')

#Create coroutines for three tables
gathered_coroutines = asyncio.gather(
    serve_table(1),
    serve_table(2),
    serve_table(3)
)

#asyncio uses event loops to manage its operation
loop = asyncio.get_event_loop()
#This is the entry from synchronous to asynchronous code.It will block
#Until the coroutine passed in has completed
loop.run_until_complete(gathered_coroutines)
#We're done with the event loop
loop.close()

这是运行此程序的输出示例。

Welcome.Please sit at table 1 Here are your menus
Welcome.Please sit at table 2 Here are your menus
Table 1 what will you be having today?
Welcome.Please sit at table 3 Here are your menus
Table 3 what will you be having today?
Table 2 what will you be having today?
   [Order ready from kitchen:  Pasta ]
Table 1 here is your meal: Pasta
   [Order ready from kitchen:  Fish & Chips ]
Table 3 here is your meal: Fish & Chips
   [Order ready from kitchen:  Special of the day ]
Table 2 here is your meal: Special of the day
Table 3 here is your check
Table 1 here is your check
Thanks for visiting us! (table 3 )
Thanks for visiting us! (table 1 )
Table 2 here is your check
Thanks for visiting us! (table 2 )

请注意,大部分行之间都会有几秒的延迟。这是各个协程中的睡眠延迟,它模拟了在餐馆中处理事务所花费的时间。时间被压缩,程序中的一秒表示餐馆中的一分钟。由于睡眠延迟时长是随机的,所以每次运行程序时,消息的出现顺序都是不同的。

另请注意,该程序并不总是完全从 1 号桌开始,然后是 2 号桌、3 号桌。 asyncio.gather 协程会调度您为它提供的协程,但没有特定的顺序。

这里要注意的主要事情是协作式多任务的流程。研究上面的完整清单,同时运行并调整代码,直到您完全了解协程如何释放和重获控制权。有时,所有 3 个 serve_table 协程对象都在调用其他某个协程,所有协程都在等待睡眠延迟。在这几秒内,您看不到任何输出。在这些时刻,事件循环会耐心地检查每个协程,查看它们何时可以恢复执行。

添加协程

我提到了如何获得在运行清单 1 中程序获得的输出之间的延迟。显示某种进度指示器会更加方便用户的使用。您可以使用协作式多任务的强大功能来实现此目的。下面的协程函数每秒显示一个点两次,以此作为进度指示器。

async def progress_indicator(delay, loop):
    while True:
        try:
            await asyncio.sleep(delay)
        except asyncio.CancelledError:
            break
        #Print a dot, with no newline afterward & force the output to appear immediately
        print('.', end='', flush=True)
        #Check if this is the last remaining task, and exit if so
        num_active_tasks = [ task for task in asyncio.Task.all_tasks(loop)
                                  if not task.done() ]
        if len(num_active_tasks) == 1:
            break

此函数采用了两个参数:打印点的最短延迟和事件循环对象。例如,在清单 1 底部附近创建的就是这种对象。为了确保一组受控协程之间能够保持协作,您需要将一个循环对象传递给许多 asyncio 。在本例中,将事件循环传递给了 asyncio.Task.all_tasks ,然后,后者返回所有在该事件循环中调度的任务(即协程)的列表,包括那些已完成的任务。为了仅获得未完成的任务,请使用 task.done 进一步筛选该列表。

假设您利用此函数创建了一个协程对象,并传入 0.5 作为延迟。它会直接进入一个无限循环,就像您可能还记得的之前教程中的无限生成器那样。然后,它会调用睡眠延迟,但在外部实体取消该协程(可以通过多种方式)时,会出现异常。在这些情况下,协程会中断并抛出 asyncio.CancelledError 异常,导致我们跳出无限循环。

通常,协程恢复正常后,它会输出一个点,然后检查其他所有协程是否在正常运行。如果 progress_indicator 是唯一剩下的协程,它会跳出无限循环。

清单 2.为了使用 progress_indicator 协程而更新的完整清单
import random
import asyncio

async def get_menus():
    delay_minutes = random.randrange(3) #0 to 3 minutes
    await asyncio.sleep(delay_minutes) #Pretend a second is a minute

async def get_order():
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)
    order = random.choice(['Special of the day', 'Fish & Chips', 'Pasta'])
    return order

async def prepare_order(order):
    delay_minutes = random.randrange(10, 20) #10 to 20 minutes
    await asyncio.sleep(delay_minutes)
    print('   [Order ready from kitchen: ', order, ']')

async def eat():
    delay_minutes = random.randrange(20, 40)
    await asyncio.sleep(delay_minutes)

async def get_payment():
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)

async def progress_indicator(delay, loop):
    while True:
        try:
            await asyncio.sleep(delay)
        except asyncio.CancelledError:
            break
        #Print a dot, with no newline afterward & force the output to appear immediately
        print('.', end='', flush=True)
        #Check if this is the last remaining task, and exit if so
        num_active_tasks = [ task for task in asyncio.Task.all_tasks(loop)
                                  if not task.done() ]
        if len(num_active_tasks) == 1:
            break

async def serve_table(table_number):
    await get_menus()
    print('Welcome.Please sit at table', table_number, 'Here are your menus')
    order = await get_order()
    print('Table', table_number, 'what will you be having today?')
    await prepare_order(order)
    print('Table', table_number, 'here is your meal:', order)
    await eat()
    print('Table', table_number, 'here is your check')
    await get_payment()
    print('Thanks for visiting us! (table', table_number, ')')

#asyncio uses event loops to manage its operation
loop = asyncio.get_event_loop()

#Create coroutines for three tables
gathered_coroutines = asyncio.gather(
    serve_table(1),
    serve_table(2),
    serve_table(3),
    progress_indicator(0.5, loop)
)

#This is the entry from synchronous to asynchronous code.It will block
#Until the coroutine passed in has completed
loop.run_until_complete(gathered_coroutines)
#We're done with the event loop
loop.close()

请注意,现在,在创建协程集合之前,事件循环已经出现了。这是因为必须将此循环传递给 progress_indicator ,正如您在要收集的协程列表中看到的那样。

下面的输出来自一个样本运行:

.Welcome.Please sit at table 3 Here are your menus
..Welcome.Please sit at table 2 Here are your menus
Welcome.Please sit at table 1 Here are your menus
........Table 3 what will you be having today?
......Table 1 what will you be having today?
....Table 2 what will you be having today?
..........[Order ready from kitchen:  Fish & Chips ]
Table 3 here is your meal: Fish & Chips
..............[Order ready from kitchen:  Fish & Chips ]
Table 2 here is your meal: Fish & Chips
......[Order ready from kitchen:  Pasta ]
Table 1 here is your meal: Pasta
..............................Table 3 here is your check
Thanks for visiting us! (table 3 )
..........Table 2 here is your check
..Thanks for visiting us! (table 2 )
......................Table 1 here is your check
..........Thanks for visiting us! (table 1 )
.

进度指示器的点会有规律地出现,约半秒出现一个。

这是何种类型的多任务?

如果您在 Python 中实现过多线程或多处理,您可能想知道它们与这种 asyncio 协作式多任务方法有何不同。主要区别在于,在 asyncio 方法中,不会实际尝试让两个协程在同一时间做一些事情,就像餐馆服务员无法在为 3 号桌上餐的同时为 1 号桌提供菜单一样。 asyncio 事件循环所做的是利用任务内的自然停机时间,允许协程在有工作要做时执行工作,但在其他协程空闲时将控制权转交给它们。

协程无法控制它再次运行的时间,此方法称为协作式多任务是有原因的。如果某个协程花了太长时间而没有将控制权转交回事件循环,它会阻塞一切操作,导致不必要的延迟,而且您将失去多任务的优势。这意味着您首先必须确保您的程序适合用这种方式实现,然后必须小心地编写程序代码,将它分解为会在恰当时机相互释放控制权的协程。这可能比听起来还要复杂,因为您可能无意识地从某个协程调用常规函数,这会花费很长时间,而且问题不会很明显。

一般来说, asyncio 事件循环最适合经常联网的程序,或大量查询数据库和类似对象的程序。等待远程服务器或数据库响应请求或查询时,是将控制权释放给事件循环的理想时刻。在过去,程序员倾向于在这种情况下使用线程,但与多线程相比, asyncio 事件循环是一种更加清晰和灵活的编程方式。一个难点是,要充分发挥 asyncio 事件循环的优势,需要在 asyncio 协程中对网络和数据库 API 进行编码。幸运的是,现在许多 Python 第三方库已实现了对 asyncio 的充分利用。

不过,有时您可能会遇到这样的情况:您想要使用 asyncio ,但需要使用不支持 asyncio 的库。换言之,您需要从异步代码中调用同步代码,而不破坏多线程。可以使用在单独的线程或流程中运行同步代码的 asyncio 执行器实现此目的。我提到这一点是因为您可能想了解这一点,但更多的细节超出了这些教程的讨论范畴。

结束语

随着您越来越精通 asyncio ,您会了解到与该技术相关的其他外来概念,包括令人印象深刻的”future”。您还会了解到,协程可通过不同方式将控制权释放给事件循环,包括 async with ,如果您使用的是 Python 3.6 或更高版本,还包括 async for 。由于本教程系列要求的最低版本是 Python 3.5,所以我不会讨论后者,但在下一篇教程中,您将学习 async with ,以及其他很酷的技术。

相关主题

参阅关于 Python 的 技术讲座、Code Pattern 和博客 。 浏览 cognitiveclass.ai 上的 Python 课程

本文翻译自:Coroutines and asyncio(2018-07-25)