1.概要

python下的协程

asynciogevent都是基于携程来进行并发操作的。协程也被称为微线程。 协程只是在单一的线程里进行切换不同的协程,因此无法使用多CPU能力,对于CPU密集型程序还是使用多进程比较好。 协程相比较进程和线程来说占用的内容更少,同样的线程切换更多的是靠操作系统来控制,而协程的执行则由我们自己控制。

并发与异步区分

并发原理:当其中一个协程遇到io等待时,将会切换到另一个协程继续运行。

并发与异步:

并发强调的是N人干同样的事,要保证不争抢 (lock,atomic,synchronize,volatile, cas)

异步强调的是1/N人干不同的事,不该等的别等 (thread pool, future, async,reactive)

看到并发的时候经常会看到异步,原因是一般所说的并发,指的是
【每个业务活动频率很低,但是有大量同时进行的业务活动】
这样用异步代码自己维护每个业务状态,而不劳驾系统通过线程/进程的方式维护每个业务状态的方式,能把这个场景实现得性能更好,内存占用更少。
如果业务活动频率又高,又同时大量进行,那就超出异步的解决范围了。那是分布式处理的范畴。

需要协程原因

1.多线程运行过程容易被打断,有可能出现race condition的情况 2.线程切换本身存在定的消耗,若/0操作非常heavy,多线程很有可能满足不了高效率、高质量的需求。

sync和async的概念区分

  • sync即同步,指操作个接个地执行,下一个操作必须等上一个操作完成后才能执行。
  • asynd即异步,指不同的操作可以交替执行,如果其中某个操作被block了,程序并不会等待,而是会找出可执行的操作继续执行。

Asyncio的缺陷

在实际工作中,要想用好Asyncio,必须得有相应的python库支持。在之前的多线程例子中,我们用到的是requests库,而在这里使用的却是aiohttp库,原因就在于requests库与Asyncio不兼容,但aiohttp库兼容。但是兼容问题会随着版本的问题逐步减少。

此外,使用Asyncio使得我们在任务调度方面有更大的自主权,写代码时就得更加注意,否则容易出现错误。

比如,如果你需要await一系列的操作,就得使用asyncio.gather();如果只是单个的future,则用asyncio.wait()就可以了。那么,对于你的future,是想让它run_until_complete()还是run_forever()呢?此类问题都是在面对具体问题时需要去考虑的。

那么使用asyncio时 如何搭配第三方库

首先,最简单的办法就是搜一下这个库的源码里是否出现了 asyncioasync def 的字样,如果没有出现则几乎可以证明这个库没有对 asyncio 有做特别的支持。为了彻底证实,还应仔细阅读其代码,查看关键 I/O 部分是如何实现的。

对于暂不支持 asyncio 的第三方库,可以按以下步骤依次排查:

1. 确认其 I/O 时间比例是否占到大部分。比如用 SQLAlchemy 时,如果能基本上确保数据库操作都是瞬时的,那么[理论上](http://techspot.zzzeek.org/2015/02/15/asynchronous-python-and-databases/)是可以任由其阻塞主线程的。而对于明显 I/O 占大多数时间且时间不可预测的,比如 requests,就不能让其成为性能瓶颈;
2. 确认其 I/O 的并发能力是否会成为瓶颈。比如说还是用到了 requests,但平均下来每 10 分钟才会发一个请求,其他时间主要都用在数据库计算上了,那么完全可以把 requests [放到线程池里](https://docs.python.org/3/library/asyncio-eventloop.html#executor)去做;
3. 确实需要异步了,首先查找其 asyncio 的扩展,有时会有单独的库做 asyncio 支持,比如 peewee_async;
4. 查找其 asyncio 的替代,比如用 asyncpg 替代 psycopg2;
5. 实在没有了,再次看看线程池的解决方案对性能的影响到底是多少;
6. 如果线程池确实不行,此时一般碰到的库多半都会有自己的并发模型,走到这一步意味着该并发模型并不能兼容 asyncio。此时,可以看看并发库对 asyncio 的支持,比如 [gevent 与 asyncio 的桥接](https://github.com/gevent/gevent/issues/982);
7. 自己改了它吧,或者造新的轮子。

选择多线程还是Asyncio

在面对具体问题时,我们可以按照以下伪代码的规范去选择用多线程还是asyncio:

if io_bound:
    if io_slow:
        print('Use Asyncio')
    else:
        print('Use multi-threading')
else if cpu_bound:
    print('Use multi-processing')
1234567
  • 如果是I/O bound,并且I/O操作很慢,需要很多任务/线程协同实现,那么选用Asyncio更合适(任务难度大)
  • 如果是I/O bound,但是I/O操作很快,且只需要有限任务/线程协同实现,那么选择多线程就行(任务难度小)
  • 如果是CPU bound,则需要选用多进程来提高程序运行效率

总结

不同于多线程,Asyncio是单线程的,但其内部event loop的机制,使得它可以并发运行多个不同的任务,并且比多线程享有更大的自主控制权。

Asyncio中的任务,在运行过程中不会被打断,因此不会出现race condition的情况。

在I/O heavy的情况下,Asyncio的运行效率比多线程更好。这是因为:

  • 任务和线程切换损耗:Asyncio内部任务切换的损耗,远比线程切换的损耗要小
  • 任务和线程数量:Asyncio可以开启的任务数,也远比多线程中的线程数量多得多

2.重新理解Asyncio

Using Asyncio in Python书中接下来给出了asyncio的等级。阶梯越高越难。

img

Coroutines

Event_Loop

Future

Task

3.asyncio异步编程

基于async & await关键字的协程可以实现异步编程,这也是目前python异步相关的主流技术。

3.0 asyncio工作原理

asyncio和python主程序一样,只有一个主线程,但是可以进行多个不同任务(特殊的future对象,可类比为多线程里的多个线程),这些任务被一个event loop的对象所控制。

任务的状态:

预备状态:任务目前空闲,但随时待命准备运行任务的状态 等待状态:任务已经运行,但正在等待外部的操作完成,如I/O操作

执行过程:

选取预备状态的个任务(根据等待时间长短、占用资源等因素选取),使其运行——直到该任务把控制权[执行权、栈的切换]还给event loop为止
当接收到任务控制权后,event loop会根据其完成状态把任务放到对应的预备或等待状态的列表
遍历等待状态的列表,查列表中的任务是否完成(已完成:放到预备状态的列表
未完成:继续放在等待状态的列表)
原先在预备状态列表中的任务位置不变,因为它们仍未运行。当所有任务被重新放置在合适的列表后,新的轮循环又开始了:event loop继续从预备状态的列表中选取一个任务使其执行..如此周而复始,直到所有任务都完成
对于asyncio来说,它的任务在运行时不会被外部的些因素打断,不需要担心线程安全的问题了

3.1 协程快速上手

3.1.1 协程的定义

名词理解:

  • 协程函数:定义形式为 async def函数(coroutine / coro)

  • 协程对象:调用 协程函数 的返回值(实例对象) (一般“协程” 指的就是“协程对象”)

    asyncio文档中特别强调要区分协程函数和协程对象的区别:

    a coroutine function: an async def function; a coroutine object: an object returned by calling a coroutine function.

3.1.2 事件循环event loop

——简单来说,只有 loop 运行了,协程才可能运行。

asyncio和python主程序一样,只有一个主线程,但是可以进行多个不同任务(特殊的future对象,可类比为多线程里的多个线程),这些任务被一个event loop的对象所控制。

事件循环,可以把他当做是一个while循环,这个while循环在周期性的运行并执行一些任务,在特定条件下终止循环。

# 伪代码
"""任务列表 = [ 任务1, 任务2, 任务3,... ]"""
while True:    
	可执行的任务列表已完成的任务列表 = 去任务列表中检查所有的任务'可执行''已完成'的任务返回  
    for 就绪任务 in 已准备就绪的任务列表:  
        执行已就绪的任务  
    for 已完成的任务 in 已完成的任务列表:    
        在任务列表中移除 已完成的任务  
    
    如果 任务列表 中的任务都已完成则终止循环

在编写程序时候可以通过如下代码来获取和创建事件循环。

import asyncio
# 去生成或获取一个事件循环
loop = asyncio.get_event_loop()
# 将任务放到任务列表
loop.run_until_complete(任务)

协程无法直接执行

# 定义一个协程函数
async def main():
	await asyncio.sleep(1)
    return 123

# 狭义上的 coroutine 是指一个 coroutine 实例,通常是调用一个 async def 函数的返回值
coro = func()

注意:调用协程函数时——函数内部代码不会执行,只是会返回一个协程对象

这里的 coro 并不是 123,而是一个 coroutine 实例。此时,main() 函数也并没有开始执行。为了得到结果,一般会这么做:

loop = asyncio.get_event_loop()
rv = loop.run_until_complete(coro)

这时,rv 的值就会在 1 秒钟之后变成 123

先说明一下协程可以做哪些事。协程可以:

  • 等待一个 future 结束

  • 等待另一个协程(产生一个结果,或引发一个异常)

  • 产生一个结果给正在等它的协程

  • 引发一个异常给正在等它的协程

3.2 协程的启动方法

——asyncio.run、await、create_task/ensure_future

3.2.1 run

程序中,如果想要执行协程函数的内部代码,需要 事件循环协程对象 配合才能实现:

协程对象——当成任务添加到事件循环——处理async协程函数内部代码

如:

import asyncio
async def func():   
	print("协程内部代码")
    
# 调用协程函数,返回一个协程对象。
result = func()
	
# 方式一(python3.7 之前)【兼容性比较多的情况的写法】
# 创建一个事件循环
# loop = asyncio.get_event_loop() 
# 将协程当做任务提交到事件循环的任务列表中,协程执行完成之后终止。
# loop.run_until_complete(result_obj_asyc) 

# 方式二(python3.7以后)
# 本质上方式一是一样的,内部先创建事件循环 然后执行 run_until_complete,一个简便的写法。
# asyncio.run 函数在 Python 3.7 中加入 asyncio 模块,
asyncio.run(result)

这个过程可以简单理解为:将协程当做任务添加到 事件循环 的任务列表,然后事件循环检测列表中的协程是否 已准备就绪(默认可理解为就绪状态),如果准备就绪则执行其内部代码。

3.2.2 await

  • 让渡执行权:await是一个只能在协程函数体内使用的关键字,用于遇到IO操作时挂起 当前协程(任务),当前协程(任务)挂起过程中 事件循环可以去执行其他的协程(任务),当前协程IO处理完成时,可以再次切换回来执行await之后的代码。

    PS:await语句是生成器中yield from 的代替品。

await + 可等待对象(协程对象、Future、 Task对象 ——他们都可当做:IO等待)

理解await—— 等:

1.等它后面的可等待对象<给值了>,再继续往下走《下一步要依赖上一步结果》

2.等待过程中依然会切换到其他任务去执行

**示例1:**在一个外层协程函数的定义体中,通过await语句执行协程。

import asyncio
async def func():   
	print("执行协程函数内部代码")    
    # 遇到IO操作挂起当前协程(任务),(当其他任务已经执行完了,或者)<事件循环>检测到IO操作完成之后-有结果了,蔡再继续往下执行。   
    # 当前协程挂起时,事件循环可以去执行其他协程(任务)。   
    response = await asyncio.sleep(2)  # 模拟IO阻塞,产生等待
    print("IO请求结束,结果为:", response)  # None
    
result = func()

asyncio.run(result)

**示例2:**await后再跟一个(嵌套一个) 协程对象()

import asyncio
async def others():  
	print("start") 
	await asyncio.sleep(2) 
	print('end')  
	return '返回值'

# 在一个外层协程函数的定义体中,通过await语句执行协程:
async def outter_coro(): 
	print("执行协程函数内部代码")  
	# 遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。 
	response = await others()  # 调用上面的协程函数内部代码

    
asyncio.run( outter_coro() )  # 加入协程对象,运行

示例3: 一个协程函数里可有 多个await

import asyncio

async def others():    
    print("start")    
    await asyncio.sleep(2)   # IO等待
    print('end') 
    return '返回值'

# 在一个外层协程函数的定义体中,通过await语句执行协程:
async def func():   
    print("执行协程函数内部代码")    
    # 遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。  
    response1 = await others()  # 下面不会立即执行
    print("IO请求结束,结果为:", response1) 
    response2 = await others() 
    print("IO请求结束,结果为:", response2)
    
asyncio.run( func() )

说明:

  • **非并发:**上述的所有示例都只是创建了一个任务,即:事件循环的任务列表中只有一个任务,所以在IO等待时无法演示切换到其他任务效果。

  • main() 函数中直接用 await 调用协程函数,就是普通的函数调用。

在程序想要创建多个任务对象,需要使用Task对象来实现。

3.3 多协程并发执行

——asyncio.gather、wait、as_completed

3.2.3 Task对象

在并发任务中自动切换(相当于并发)多个任务:任务列表 = [ 任务1, 任务2, 任务3,… ]

Tasks are used to schedule coroutines concurrently.

When a coroutine is wrapped into a Task with functions like asyncio.create_task() the coroutine is automatically scheduled to run soon。

Tasks用于并发调度协程,通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task() 函数以外,还可以用低层级的 loop.create_task()ensure_future() 函数。不建议手动实例化 Task 对象。

本质上是将协程对象封装成task对象,并将协程立即加入事件循环,同时追踪协程的状态。

注意:asyncio.create_task() 函数在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用【低层级】的 asyncio.ensure_future() 函数。

示例1: 方法:在一个外层协程函数中使用await语句运行协程。

import asyncio

async def func():  
    print(1) 
    await asyncio.sleep(2)  
    print(2)  
    return "返回值"

async def main(): 
    print("main开始") 
    
    # 创建协程(Task对象),将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
    task1 = asyncio.create_task(func()) 
    ## 创建Task对象,将当前执行func函数任务添加到事件循环
    task2 = asyncio.create_task(func()) 
    
    print("main结束")   
    
    # 当执行某协程遇到IO操作时,会自动化切换执行其他任务。 
    # 此处的await是等待相对应的协程全都执行完毕(_state会表示状态)并获取结果  
    ret1 = await task1   
    ret2 = await task2  
    print(ret1, ret2)
    
asyncio.run( main() )

示例2: Task_List创建并装入task对象(多个协程对象)

import asyncioasync 
def func():  
    print(1)   
    await asyncio.sleep(2)  
    print(2)  
    return "返回值"

# 主函数
async def main():  
    print("main开始")  
    # 创建协程,将协程封装到一个Task对象中并添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。  
    # 等待列表中所有任务执行完毕
    task_list = [        
        asyncio.create_task(func(), name="n1"),  # 创建task时命名name
        asyncio.create_task(func(), name="n2")  
    ]   
    print("main结束")  
    
    ### await不能直接等上面的task列表
    # 当执行某协程遇到IO操作时,会自动化切换执行其他任务。   
    # 此处的await是等待所有协程执行完毕,并将所有协程的返回值保存到done 
    # 如果设置了timeout值,则意味着此处最多等待的秒,完成的协程返回值写入到done中(集合),未完成则写到pending中。  
    
    # 拿到返回的是集合(无序不可重复)——返回future对象,不返回直接结果
    done, pending = await asyncio.wait(task_list, timeout=None)  # 默认None
    print(done, pending)
    
asyncio.run(main())  # 事件循环已经创建

注意:asyncio.wait 源码内部会对列表中的每个协程执行ensure_future从而封装为Task对象,所以在和wait配合使用时task_list的值为[func(),func()] 也是可以的。

asyncio.wait() :接受一个包含awaitable的列表(其中的协程会自动包装为Task),并发执行所有awaitable。

示例3: asyncio.create_task 放在run(再创建事件循环)

import asyncio
async def func():  
    print("执行协程函数内部代码") 
    # 遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。  
    response = await asyncio.sleep(2)    
    print("IO请求结束,结果为:", response)
    
coroutine_list = [func(), func()]  # 只放入协程对象

# 错误:coroutine_list = [ asyncio.create_task(func()), asyncio.create_task(func()) ] 
# 此处不能直接 【asyncio.create_task】,因为将Task【立即加入到事件循环】的任务列表,
# 但此时事件循环还未创建,所以会报错。

# 使用asyncio.wait将列表封装为一个协程,并调用asyncio.run实现执行两个协程
# asyncio.wait内部会对列表中的每个协程执行ensure_future,封装为Task对象。

# 生成事件循环loop后会帮你创建
done,pending = asyncio.run( asyncio.wait(coroutine_list) )

3.2.4 asyncio.Future对象

A Futureis a special low-level awaitable object that represents an eventual result of an asynchronous operation.

asyncio中的Future对象是一个相对更偏向底层的可对象,通常我们不会直接用到这个对象,而是直接使用Task对象来完成任务的并和状态的追踪。( Task 是 Futrue的子类 )

Future为我们提供了异步编程中的 最终结果 的处理(Task类也具备状态处理的功能)。

示例1:

async def main():  
	# 获取当前事件循环    
	loop = asyncio.get_running_loop()  
	## 创建一个任务(Future对象),这个任务什么都不干。   
	fut = loop.create_future()   
	# 等待任务最终结果(Future对象),没有结果则会一直等下去。 
    await fut
    
await futasyncio.run(main())

示例2:

import asyncio
async def set_after(fut):   
    await asyncio.sleep(2)   
    fut.set_result("666")  # 对Future对象产生结果,fut就结束了
    
async def main():   
    # 获取当前事件循环   
    loop = asyncio.get_running_loop()    
    # 1、创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束。
    fut = loop.create_future()   
    # 2、创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值。
    # 即手动设置future任务的最终结果,那么fut就可以结束了。  
    await loop.create_task( set_after(fut) )  
    
    # 等待 Future对象获取 最终结果,否则一直等下去    
    data = await fut   
    print(data)
    
asyncio.run(main())

Future对象本身函数进行绑定(用于夯住函数-等待结果),所以想要让事件循环获取Future的结果,则需要手动设置。而Task对象继承了Future对象,其实就对Future进行扩展,他可以实现在对应绑定的函数执行完成之后,自动执行set_result,从而实现自动结束。

虽然,平时使用的是Task对象,但对于结果的处理本质是基于Future对象来实现的。

扩展:支持 await 对象语 法的对象可成为可等待对象,所以 协程对象Task对象Future对象 都可以被成为可等待对象。

3.2.5 futures.Future对象

在Python的concurrent.futures模块中也有一个Future对象,这个对象是基于线程池进程池实现异步操作时使用的对象。

import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

def func(value): 
    time.sleep(1)  
    print(value)
    # return 123  # 不写也默认return None
    
pool = ThreadPoolExecutor(max_workers=5)  # 只创建5个线程
# 或 pool = ProcessPoolExecutor(max_workers=5)

for i in range(10):  
    fut = pool.submit(func, i)  # 每次往线程池提交任务申请,就返回一个Future对象(但刚开始并没有处理得结果,只是个future对象,在线程池内部执行完之后(return),赋值给Future对象)
    print(fut)  # 但没有

两个Future对象是不同的,他们是为不同的应用场景而设计,例如:concurrent.futures.Future不支持await语法 等。

相同点:都是为了等待结果

官方提示两对象之间不同:

unlike asyncio Futures, concurrent.futures.Future instances cannot be awaited.

asyncio.Future.result() and asyncio.Future.exception() do not accept the timeout argument.

asyncio.Future.result() and asyncio.Future.exception() raise an InvalidStateError exception when the Future is not done.

Callbacks registered with asyncio.Future.add_done_callback() are not called immediately. They are scheduled with loop.call_soon() instead.

asyncio Future is not compatible with the concurrent.futures.wait() and concurrent.futures.as_completed() functions.

在Python提供了一个将futures.Future 对象包装成asyncio.Future对象的函数 asynic.wrap_future

接下里你肯定问:为什么python会提供这种功能?

其实,一般在程序开发中我们要么统一使用 asycio 的协程实现异步操作、要么都使用进程池和线程池实现异步操作。但如果 协程的异步进程池/线程池的异步 混搭,那么就会用到此功能了。

以后写代码存在交叉时间:例如crm项目80%都是基于协程异步编程+MySQL(不支持)——需要线程、进程做异步编程

import time
import asyncio
import concurrent.futures

def func1():  
    # 某个耗时操作  
    time.sleep(2)  
    return "SB"

async def main():   
    loop = asyncio.get_running_loop()    
    
    # 1. Run in the default loop's executor ( 默认ThreadPoolExecutor )  
    # 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行func1函数,并返回一个concurrent.futures.Future对象  
    # 第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装为asycio.Future对象。  
    # 因为concurrent.futures.Future对象不支持await语法,所以需要包装为 asycio.Future对象 才能使用。  
    fut = loop.run_in_executor(None, func1)  ## 默认会创建一个线程池,放入函数;
    result = await fut   
    print('default thread pool', result) 
    
    # 2. Run in a custom thread pool:   
    # with concurrent.futures.ThreadPoolExecutor() as pool:   
    #     result = await loop.run_in_executor(pool, func1)  # 传入线程池
    #     print('custom thread pool', result)  
    
    # 3. Run in a custom process pool:   
    # with concurrent.futures.ProcessPoolExecutor() as pool:   
    #     result = await loop.run_in_executor(pool, func1)  # 传入进程池
    #     print('custom process pool', result)
    
asyncio.run( main() )

应用场景:当项目以协程式的异步编程开发时,如果要使用一个第三方模块,而第三方模块不支持协程方式异步编程时,就需要用到这个功能,例如:

import asyncioimport requestsasync def download_image(url):    # 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)    print("开始下载:", url)    loop = asyncio.get_event_loop()    # requests模块默认不支持异步操作,所以就使用线程池来配合实现了。    future = loop.run_in_executor(None, requests.get, url)    response = await future    print('下载完成')    # 图片保存到本地文件    file_name = url.rsplit('_')[-1]    with open(file_name, mode='wb') as file_object:        file_object.write(response.content)if __name__ == '__main__':    url_list = [        'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',        'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',        'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'    ]    tasks = [download_image(url) for url in url_list]    loop = asyncio.get_event_loop()    loop.run_until_complete( asyncio.wait(tasks) )

3.2.6 异步迭代器

什么是异步迭代器

实现了 __aiter__()__anext__() 方法的对象。__anext__ 必须返回一个 awaitable 对象。async for 会处理异步迭代器的 __anext__() 方法所返回的可等待对象,直到其引发一个 StopAsyncIteration 异常。由 PEP 492 引入。

什么是异步可迭代对象?

可在 async for 语句中被使用的对象。必须通过它的 __aiter__() 方法返回一个 asynchronous iterator。由 PEP 492 引入。

import asyncio
class Reader(object):    
    """ 自定义异步迭代器(同时也是异步可迭代对象) """  
    def __init__(self):     
        self.count = 0   
        
    async def readline(self):      
        # await asyncio.sleep(1)    
        self.count += 1       
        if self.count == 100:       
            return None     
        return self.count   
    
    def __aiter__(self):    
        return self  
    
    async def __anext__(self): 
        val = await self.readline()  
        if val == None:        
            raise StopAsyncIteration      
            return val
        
async def func():    
    # 创建异步可迭代对象   
    async_iter = Reader()  
    # async for不能单独使用,必须要放在async def函数内(随便创建个),否则语法错误。
    async for item in async_iter: 
        print(item)
        
asyncio.run(func())

异步迭代器其实没什么太大的作用,只是支持了async for语法而已。

3.2.6 异步上下文管理器

此种对象通过定义 __aenter__()__aexit__() 方法来对 async with 语句中的环境进行控制。由 PEP 492 引入。

import asyncio

class AsyncContextManager:    
    def __init__(self):      
        self.conn = None   
        
    async def do_something(self):    
        # 异步操作数据库       
        return 666    
        
    async def __aenter__(self):      
        # 异步“链接数据库”(最先执行)      
        self.conn = await asyncio.sleep(1) 
        return self   
    
    async def __aexit__(self, exc_type, exc, tb):      
        # 异步“关闭数据库链接”(最后执行)
        await asyncio.sleep(1)

async def func():   
    async with AsyncContextManager() as f:  # 不能单独使用也必须嵌套到async def
        result = await f.do_something()  # 等待拿到结果
        print(result)
        
asyncio.run( func() )

这个异步的上下文管理器还是比较有用的,平时在开发过程中执行 打开、处理、关闭 操作时,就可以用这种方式来处理。

3.3 小结

在程序中只要看到asyncawait关键字,其内部就是基于协程实现的异步编程,这种异步编程是通过一个线程在IO等待时间去执行其他任务,从而实现并发。

以上就是异步编程的常见操作,内容参考官方文档。

  • 中文版:https://docs.python.org/zh-cn/3.8/library/asyncio.html
  • 英文本:https://docs.python.org/3.8/library/asyncio.html

4.uvloop

Python标准库中提供了asyncio模块,用于支持基于协程的异步编程。

uvloop是 asyncio 中的事件循环的替代方案,替换后可以使得asyncio性能提高。事实上,uvloop要比nodejs、gevent等其他python异步框架至少要快2倍,性能可以比肩Go语言。

安装uvloop

pip3 install uvloop

在项目中想要使用uvloop替换asyncio的事件循环也非常简单,只要在代码中这么做就行。

import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# 编写asyncio的代码,与之前写的代码一致。

# 内部的事件循环自动化会变为uvloop
asyncio.run(...)

注意:知名的asgi uvicorn内部就是使用的uvloop的事件循环。

4.gevent异步编程

gevent是第三方库,通过 greenlet 实现 coroutine 创建、调度的开销比 线程(thread) 还小,因此程序内部的 执行流 效率高。

gevent 实现了 python 标准库中一些阻塞库的非阻塞版本,如 socket、os、select 等 (全部的可参考 gevent1.0 的 monkey.py 源码),可用这些非阻塞的库替代 python 标准库中的阻塞的库。

  • 创建greenlet对象

    采用 gevent.spawn() 的 API :将事件函数交给gevent,生成一个普通的Greenlet对象 greenlet对象,并置于调度队列中

    g1 = gevent.spawn(func1, 传给函数参数)  # 接收第一个参数是 函数名, 第二个参数是传入函数的参数
    
  • 执行

    join() 或 join_all() 函数会阻塞当前流程,并执行所有给定的greenlet对象,执行流程只会在所有greenlet执行完后才会继续向下走。

    # 主线程到 hub greenlet instance 的切换
    g1.join()  # 阻塞直到 g1(greenlet对象)执行完毕
    # 或者,多个greenlet对象合到一起:
    # gevent.joinall([g1, g2, g3])
    

4.线程池

4.1

如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。

Exectuor 提供了如下常用方法:

  • submit(fn, *args, **kwargs):将 fn 函数提交给线程池。*args 代表传给 fn 函数的参数,*kwargs 代表以关键字参数的形式为 fn 函数传入参数。
  • map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
  • shutdown(wait=True):关闭线程池。

程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。

Future 提供了如下方法:

  • cancel():取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True。
  • cancelled():返回 Future 代表的线程任务是否被成功取消。
  • running():如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。
  • done():如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True。
  • result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
  • exception(timeout=None):获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None。
  • add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。该回调函数会在线程任务结束时获取其返回值。

使用线程池来执行线程任务的步骤如下:

  1. 调用 ThreadPoolExecutor 类的构造器创建一个线程池。

  2. 定义一个普通函数作为线程任务。

  3. 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。

  4. 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。

    另外,由于线程池实现了上下文管理协议(Context Manage Protocol),因此,程序可以使用 with 语句来管理线程池,这样即可避免手动关闭线程池