本篇主要是asyncio这个包的使用,如何使用协程,以及协程和线程可以怎么配合使用,得到更加的使用效果

1. 定义协程

通过 async 来定义协程,并将协程丢到事件循环中执行,常用套路如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio
import time

now = lambda: time.time()


# 定义一个协程,丢到事件循环中执行
async def do_some_job(x):
print("do some : ", x)
time.sleep(1)


print("------- start coroutine ----------")

start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_job(2))
print("end cost: ", now() - start)

说明

  • 通过关键字async修饰函数,然后直接调用函数的方式,返回的就是一个协程coroutine
  • 协程需要丢到事件循环中执行,通过 asyncio.get_event_loop() 获取事件循环 loop
  • 执行协程 loop.run_until_complete(coroutine)

上面执行的输出如下

1
2
3
------- start coroutine ----------
do some : 2
end cost: 1.0031189918518066

2. 返回结果

上面的函数如果有返回结果,丢到事件循环中执行后,可以怎么获取返回呢?

case1:直接使用方式

直接获取loop.run_until_complete(xxx)返回即可,ans就是协程返回的value

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio
import time

now = lambda: time.time()


# 定义一个协程,丢到事件循环中执行
async def do_some_job(x):
print("do some : ", x)
time.sleep(1)
return x * x


print("------- start coroutine ----------")
start = now()
loop = asyncio.get_event_loop()
ans = loop.run_until_complete(do_some_job(2))
print("end cost: ", now() - start, ans)

case2:使用任务

另外一种方式就是使用任务,任务还有一种更强的方式就是绑定回调函数,当协程处理完毕之后,将结果丢到回调函数中,继续执行某些操作

一个简单的case如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
print("------- start callback ----------")


async def do_some_return_job(data):
print('do return data: ', data)
return data * 2


def callback(future):
print("do callback: ", future.result())


# 创建一个task,并绑定返回结果的回调
start = now()
task = asyncio.ensure_future(do_some_return_job(3))
task.add_done_callback(callback)
ans = loop.run_until_complete(task)
print("end callback cost: ", now() - start, ans)

这种使用场景也比较经典,步骤如下

  • 使用asyncio.ensure_future(coroutine)来创建一个task任务
  • 为task任务绑定回调task.add_done_callback(callback)
  • 将task丢到事件回调中执行

3. 多任务执行

前面两个只能算是演示使用协程的方式,但实际上因为只有一个task,所以并不能体现协程的并行优势;下面开始演示下多个任务的情况

在这种场景下,我们执行的任务加一个耗时的等待,模拟让出cpu给其他线程执行的case

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 函数内部通过调用 `asyncio.sleep(1)` + `await` 来表示现在执行耗时的操作了,让出cpu
async def do_some_wait_job(data):
print("do wait job:", data)
await asyncio.sleep(1)
return data * 2

print("------- start 多任务并行 ---------")

start = now()
tasks = [
asyncio.ensure_future(do_some_wait_job(2)),
asyncio.ensure_future(do_some_wait_job(3)),
asyncio.ensure_future(do_some_wait_job(4)),
]
# await方式,返回的结果通过task进行获取
# loop.run_until_complete(asyncio.wait(tasks))
# for task in tasks:
# print('task return : ', task.result())
# print("multi task cost : ", now() - start)

# gather 方式直接获取返回的结果
start = now()
ans = loop.run_until_complete(asyncio.gather(*tasks))
for v in ans:
print('gather task return: ', v)
print("multi2 task cost : ", now() - start)

多个任务执行中,首先当然也是创建task任务列表,然后将所有的任务列表丢到事件循环中即可

  • 定义task列表 [ asyncio.ensure_futre(xxx)...]
  • 执行所有的任务
    • loop.run_until_complete(asyncio.wait(tasks)) 获取任务返回的结果就需要通过task#result来获取
    • loop.run_until_complete(asyncio.gather(*tasks)) 直接返回任务执行的结果

上面代码执行之后输出如下

1
2
3
4
5
6
7
8
------- start 多任务并行 ---------
do wait job: 2
do wait job: 3
do wait job: 4
gather task return: 4
gather task return: 6
gather task return: 8
multi2 task cost : 1.0029327869415283

根据上面的输出结果,可以看到三个任务总共执行1s多一点,如果串行那么最少要3s

4. 线程+协程使用

我们考虑将定义的协程任务放在一个子线程中执行,然后主线程就负责往子线程中注册协程,这样就可以实现子线程负责干活,主线程则负责分配任务的功能,这种case可以如何处理呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
import threading
import time

'''
主线程中创建一个new_loop,然后在另外的子线程中开启一个无限事件循环

然后在主线程中通过`run_coroutine_threadsafe`来注册协程对象,子线程运行时间循环,主线程也不会被阻塞
'''


def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()


async def do_some_work(x):
print("do some work: ", x)
await asyncio.sleep(1)
print("do next work: ", x)
return x * x


now = lambda: time.time()

new_loop = asyncio.new_event_loop()

# 创建线程,在线程中开启事件循环
t = threading.Thread(target=start_loop, args=(new_loop,), name='loopThread')
t.start()

start = now()
result = asyncio.run_coroutine_threadsafe(do_some_work(3), new_loop)
print("haha---->", now() - start, result)

result = asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)
print("haha---->", now() - start)
print("end", now(), ' cost ', now() - start, result.result())

上面需要注意的几点是

  • 创建一个新的事件循环, asyncio.new_event_loop()
  • 主线程注册协程的逻辑 asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop), 两个参数,第一个为协程,第二个为前面创建的时间循环
  • 如果需要获取返回的结果,则根据返回的 result() 函数来get

II. 其他

1. 一灰灰Bloghttps://liuyueyi.github.io/hexblog

一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

2. 声明

尽信书则不如,已上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

3. 扫描关注

一灰灰blog

QrCode

知识星球

goals