今天看啥  ›  专栏  ›  Python搬运工

Python并发编程之实战异步IO框架:asyncio 下篇(十一)

Python搬运工  · 掘金  ·  · 2018-11-08 07:15

大家好,并发编程 进入第十一章。

前面两节,我们讲了协程中的单任务多任务

这节我们将通过一个小实战,来对这些内容进行巩固。


在实战中,将会用到以下知识点:

  • 多线程的基本使用
  • Queue消息队列的使用
  • Redis的基本使用
  • asyncio的使用

. 动态添加协程

在实战之前,我们要先了解下在asyncio中如何将协程态添加到事件循环中的。这是前提。

如何实现呢,有两种方法:

  • 主线程是同步
import timeimport asynciofrom queue import Queuefrom threading import Threaddef start_loop(loop):    # 一个在后台永远运行的事件循环    asyncio.set_event_loop(loop)    loop.run_forever()def do_sleep(x, queue, msg=""):    time.sleep(x)    queue.put(msg)queue = Queue()new_loop = asyncio.new_event_loop()# 定义一个线程,并传入一个事件循环对象t = Thread(target=start_loop, args=(new_loop,))t.start()print(time.ctime())# 动态添加两个协程# 这种方法,在主线程是同步的new_loop.call_soon_threadsafe(do_sleep, 6, queue, "第一个")new_loop.call_soon_threadsafe(do_sleep, 3, queue, "第二个")while True:    msg = queue.get()    print("{} 协程运行完..".format(msg))    print(time.ctime())

由于是同步的,所以总共耗时6+3=9秒.

输出结果

Thu May 31 22:11:16 2018第一个 协程运行完..Thu May 31 22:11:22 2018第二个 协程运行完..Thu May 31 22:11:25 2018
  • 主线程是异步的,这是重点,一定要掌握。。
import timeimport asynciofrom queue import Queuefrom threading import Threaddef start_loop(loop):    # 一个在后台永远运行的事件循环    asyncio.set_event_loop(loop)    loop.run_forever()async def do_sleep(x, queue, msg=""):    await asyncio.sleep(x)    queue.put(msg)queue = Queue()new_loop = asyncio.new_event_loop()# 定义一个线程,并传入一个事件循环对象t = Thread(target=start_loop, args=(new_loop,))t.start()print(time.ctime())# 动态添加两个协程# 这种方法,在主线程是异步的asyncio.run_coroutine_threadsafe(do_sleep(6, queue, "第一个"), new_loop)asyncio.run_coroutine_threadsafe(do_sleep(3, queue, "第二个"), new_loop)while True:    msg = queue.get()    print("{} 协程运行完..".format(msg))    print(time.ctime())

输出结果

由于是同步的,所以总共耗时max(6, 3)=6

Thu May 31 22:23:35 2018第二个 协程运行完..Thu May 31 22:23:38 2018第一个 协程运行完..Thu May 31 22:23:41 2018

. 实战:利用redis实现动态添加任务

对于并发任务,通常是用生成消费模型,对队列的处理可以使用类似master-worker的方式,master主要用户获取队列的msg,worker用户处理消息。

为了简单起见,并且协程更适合单线程的方式,我们的主线程用来监听队列,子线程用于处理队列。这里使用redis的队列。主线程中有一个是无限循环,用户消费队列。

先安装Redis
到 https://github.com/MicrosoftArchive/redis/releases 下载

解压到你的路径。

然后,在当前路径运行cmd,运行redis的服务端。

服务开启后,我们就可以运行我们的客户端了。
并依次输入key=queue,value=5,3,1的消息。

一切准备就绪之后,我们就可以运行我们的代码了。

import timeimport redisimport asynciofrom queue import Queuefrom threading import Threaddef start_loop(loop):    # 一个在后台永远运行的事件循环    asyncio.set_event_loop(loop)    loop.run_forever()async def do_sleep(x, queue):    await asyncio.sleep(x)    queue.put("ok")def get_redis():    connection_pool = redis.ConnectionPool(host='127.0.0.1', db=0)    return redis.Redis(connection_pool=connection_pool)def consumer():    while True:        task = rcon.rpop("queue")        if not task:            time.sleep(1)            continue        asyncio.run_coroutine_threadsafe(do_sleep(int(task), queue), new_loop)if __name__ == '__main__':    print(time.ctime())    new_loop = asyncio.new_event_loop()    # 定义一个线程,运行一个事件循环对象,用于实时接收新任务    loop_thread = Thread(target=start_loop, args=(new_loop,))    loop_thread.setDaemon(True)    loop_thread.start()    # 创建redis连接    rcon = get_redis()    queue = Queue()    # 子线程:用于消费队列消息,并实时往事件对象容器中添加新任务    consumer_thread = Thread(target=consumer)    consumer_thread.setDaemon(True)    consumer_thread.start()    while True:        msg = queue.get()        print("协程运行完..")        print("当前时间:", time.ctime())

稍微讲下代码

loop_thread:单独的线程,运行着一个事件对象容器,用于实时接收新任务。
consumer_thread:单独的线程,实时接收来自Redis的消息队列,并实时往事件对象容器中添加新任务。

输出结果

Thu May 31 23:42:48 2018协程运行完..当前时间: Thu May 31 23:42:49 2018协程运行完..当前时间: Thu May 31 23:42:51 2018协程运行完..当前时间: Thu May 31 23:42:53 2018

我们在Redis,分别发起了5s3s1s的任务。
从结果来看,这三个任务,确实是并发执行的,1s的任务最先结束,三个任务完成总耗时5s

运行后,程序是一直运行在后台的,我们每一次在Redis中输入新值,都会触发新任务的执行。。


好了,经过这个实战内容,你应该对asyncio的实际应用有了一个更深刻的认识了,至此,你已经可以使用asyncio来实现任务的并发。快去体验一下。如果有什么疑问,请在后台加我微信与我联系。。


快关注一下,成为Python高手
快关注一下,成为Python高手



原文地址:访问原文地址
快照地址: 访问文章快照