Python3高级特性:并发

Published:

最近打算使用 Tornado 做个小 App,官方文档中涉及协程、期物概念,想到之前看的《Fluent Python》书中有讲述这一块,但因长时间未使用,忘了个大概,复习时顺便分享在此,以备不时之需。

前言

可迭代对象、迭代器、生成器

python中的 list, set 等是可迭代对象,可迭代对象使用 iter() 方法可以将其变成迭代器。

可迭代对象有 __iter__ 方法,该方法返回一个迭代器对象,迭代器对象是本身是可迭代对象,因此内部也有 __iter__ 方法,相比“单纯”的可迭代对象,迭代器内部需要实现 __next__ 方法,若该迭代器可以停止,那么 __next__ 方法内部在某种情况下抛出 StopIteration 异常。

另一个可以产生可迭代对象的方法是使用生成器函数,该函数和普通函数不同,python3.3 之前,生成器函数中使用 return 关键字会造成语法错误。Luciano Ramalho在《Fluent Python》中这样诟病

The only syntax distinguishing a plain function from a generator function is the fact that the latter has a yield keyword some‐ where in its body. Some argued that a new keyword like gen should be used for generator functions instead of def, but Guido did not agree。

直观理解上,生成器类是直接返回生成器的类,而生成器函数本身是不断产生数据的生成器。判别生成器函数的方法是看其内部是否有使用 yield 关键字,在 python3 中观察是否使用 yield from 关键字,或迭代生成器(列表生成器中的[]换成())。

使用生成器的优点:惰性加载、节省内存。

协程中的 yiledyield from

函数内部使用 yield 关键字时,若关键字左边可赋值,如: data_in = yield data_out,那么可对该函数可作为协程使用,可对协程对象调用 next().send() 函数,分别对函数中 pull 和 push,pull 和 push 的协作即产生了协程的概念。在 I/O 密集计算上,使用协程可以产生高效的“多线程”并发。

caller, delegating generatorsubgenerator 概念会出现在使用 yieldyield from 的应用中,以此实现灵活、高效的功能。

使用 Futures(期物)处理并发(concurrent)/并行(parallel)

什么是期物

Future 中文翻译为期物,个人理解期物为预期要处理的事物。在 python 中,期物是 concurrent.futuresasyncio 模块的内部组件,python3.4 版本后,期物是标准库 concurrent.futures.Futureasyncio.Future 的实例。二者作用相同,其实例都可以代表一个已经完成或未完成的推迟的任务(Tornado框架中的Future类也是如此)。在程序中,期物不是手动创建的,而手动要做的只是指明某个任务将要去运行,一旦创建了计划任务,其状态是由框架进行控制的。

创建计划任务

创建计划任务时,首先指明执行者,这里使用 Executor。

Executor.submit() 接受可调用函数和函数的参数作为参数,返回期物。

concurrent.futures.Futureasyncio.Future 均有 .done() 方法,该方法不会堵塞调用者,返回 bool 值代表该期物是否已经完成。通常情况下,不会轮询检查并做期物完成后的操作,前者直接使用回调函数 .add_done_callback() 方法,从而期物完成后自动触发某方法,后者则在 yield from 语句之后调用需要继续执行的任务。

两种Future均有 .result() 方法,它返回任务执行结果或抛出的异常。不同点在于 concurrent.futures.Future 的 result 方法可以接受时间参数,超时后抛出超时异常,而 asyncio.Future 若未运行完毕则直接抛出 asyncio.InvalidStateError 异常,因此在 asyncio.Future 中常使用 yield from 获取结果,而 yield from 无法使用在前者。

如果需要获取多个期物的异步执行结果,可以使 futures.as_completed() 函数,它接受包含 Future 实例的可迭代序列,并逐个返回最先完成的任务的期物。

使用 Executor.map 函数返回一迭代器,迭代得到的对象是每个任务的执行结果,因此此时获得的是任务的结果,而不是由该计划任务产生的期物。需要注意的是,其返回执行结果和map进入的顺序是一致的,所以如果执行 Executor.map(sleep_with_seconds, [10, 9, 1]) ,那么在停顿10秒会,立马返回 sleep_with_seconds(10)sleep_with_seconds(9)sleep_with_seconds(1) 的结果。

Executor类型

在python中,有两种Executor:concurrent.futures.ThreadPoolExecutorconcurrent.futures.ProcessPoolExecutor

因为 GIL 的限制,就算有多核 CPU,前者也仅仅是并发,而后者则可以做到并行。而对于 I/O 密集型的运算,前者在单核下也可以达到很好的效果。

在实际编码时,二者明显区别在于,初始化 Executor 实例时,前者需明确指定最大并发数量,而后者并发数量默认为 os.cpu_count() 获取的cpu核数。

在 python3 中,为避免重复,使用更高级 threading 模块,并启用了 thread 模块。 concurrent.futures.ThreadPoolExecutor 则是在 threading 模块上进行封装的,如果需要更灵活的操作,可以使用 threading 模块中的 Thread, Lock, Semaphore 等类进行实施。相应的,multiprocessing 模块则是 concurrent.futures.ProcessPoolExecutor 的实现基础,因此可以使用 multiprocessing 进行实施更灵活的操作。

使用协程处理并发

协程是什么?

和 python 中的多线程(threading)一样,协程也是实现并发(concurrent)的一种方式,而在多线程中,各个线程之间何时切换、如何切换则是由系统自行处理,所以可能会造成一些不可控的现象发生,如中断了不该中断的重要线程或中途产生了不合理的数据等。而在协程中,不存在被中断一说,只有通过 yieldyield from 主动释放 CPU 资源,让步给其他协程,以此实现多线程的并发。

借助多线程理解协程过程

  • 任务创建和运行

多线程使用 threading.Thread(target=func, arg=(*args)) 进行创建一个线程任务,并调用 .start() 附属于主线程运行。

协程的任务不是手动创建,直接使用 asyncio.async(func(*args))loop.create_task(func(*args)) 获取已经计划好启动的 Task 对象。该任务直接驱动协程函数。(Task 是 asyncio.Future 的子类,因此可以将该对象类比为前面讲述的 Executor.submit() 创建的期物对象)

  • 任务的驱动和管理

多线程中,func(*args) 直接由线程回调,在协程中进行驱动任务。

在协程中,协程由 yield from 驱动,每次在 yield from 调用时,控制权限会 back to the main loop

  • 任务的终止

多线程中,没有主动终止线程的API,因为如果实现主动终止,那么终止位置可能在任何地方,这样会造成终止时,系统处于不稳定状态。因此一般借助其它变量触发终止条件。

协程本身就是主动让步,因此在让步时,若外部调用了 Task.cancel() 函数,则将 CancelledError 传递给在 yiled 挂起的位置,以此进行终止。

  • 协程的其它注意点

推荐使用 @asyncio.coroutine 进行修饰协程函数,该方式不是强制性的,但有如下好处:

  1. 显示指明该函数是协程
  2. 当协程函数在非 yiled from 处被垃圾回收时,会触发警告,因为可能因BUG产生而导致某些操作未完成就被回收。

另外,该修饰符不是预启动修饰符(priming decorator),因此可以和 yield from 配合(yield from 内部实现了预启动功能)。在 python3.4 以后引入 asyncio,可简化操作。

当某个协程需要停顿若干时间时,使用 yield from asyncio.sleep(DELAY) 代替多线程中的 time.sleep(DELAY) 以防堵塞所有协程。

透彻理解 asyncio

使用 yield from 驱动 Future, Task 和 协程

从上文可以发现,yield from 既可以驱动 asyncio.Future 又可以驱动 Task 对象,还可以驱动协程对象(调用协程函数返回的是协程对象)。所以如果 foo() 是协程函数或返回 FutureTask 实例的函数,那么就可以这样写 res = yield from foo()。这也是 asyncio 包API中很多地方可以互换协程和期物的原因之一。

为了排定协程的运行计划,需要将协程包装成 Task 对象,有两种两种方式:

  • loop.create_task(coro)

    该方法排定协程的运行时间,返回一个 asyncio.Task 对象。如果在自定义的 BaseEventLoop 子类上调用,返回的对象可能是外部库(如Tornado)中与 Task 类兼容的某个类的实例。

  • asyncio.async(coro_or_future, *, loop=None)

    这个函数统一了协程和期物,如果传入的是 TaskFuture 对象,则原封不动返回;如果传入的是协程,则调用 loop.create_task(coro) 方法创建 Task 对象。loop= 参数用于传入时间循环,如果未传入,那么 async 函数会通过调用 asyncio.get_event_loop() 函数获得循环对象。

asyncio 包中有许多函数会自动把参数指定的协程包装在 asyncio.Task 对象中,例如 BaseEventLoop.run_until_complete() 方法。

In this documentation, some methods are documented as coroutines, even if they are plain Python functions returning a Future. This is intentional to have a freedom of tweaking the implementation of these functions in the future. - 摘自 python-docs

yield from 在做什么

  • yeild from 充当管道,直至最深处的 yield ,协程,TaskFuture 可以相互嵌套,用管道相链接,整个“管道系统”的运行方式就是协程的运行方式。

  • 使用 yield from 链接的多个协程最终必须由不是协程的调用方驱动,调用方显式或隐式的调用 next().send() 方法(如在 loop.run_until_conplete() 中的 for 循环)进行驱动协程。

  • yield from 最内层的子生成器必须是简单的生成器(只使用 yield)或迭代对象(如 asyncio.sleep() 函数或 aiohttp.request() 函数或自己实现的函数)。

  • 每当 yiled from 后的阻塞操作执行完毕后,会调用底层 API 通知 main loop ,主循环会安排下次排定时间。

  • 概括起来,使用 asyncio 包时,我们编写的异步代码中包含由 asyncio 本身驱动的协程(即 delegating generator),而生成器最终把任务委托给了 asyncio 或第三方库中 subgenerator (即上面提到的 asyncio.sleep() 函数或 aiohttp.request() 函数),而 caller 则是最外层可以由 loop.run_until_conplete() 驱动的非协程方,若需要驱动多个协程,则先用 asyncio.wait() 进行封装成协程并返回。

其它

  • asyncio 中使用 Executor

    往本地写文件时,若文件比较大,便会造堵塞,此时使用 loop.run_in_executor(executor, func, *args) 即可解决。

  • 相比回调处理异步,协程的优缺点
    • 优点:避免回调地域
      • 异步代码书写在同一个函数体内,使代码具有更好地易读性
      • 各协程之间的变量容易控制和分享
      • 异常处理更加方便
    • 缺点
      • 适应 yield from 的使用
      • 协程任务需要排定
  • 期务和协程的例子