一个使用 asyncio 协程的网络爬虫(三)

使用协程

我们将从描述爬虫如何工作开始。现在是时候用 asynio 去实现它了。

我们的爬虫从获取第一个网页开始,解析出链接并把它们加到队列中。此后它开始傲游整个网站,并发地获取网页。但是由于客户端和服务端的负载限制,我们希望有一个最大数目的运行的 worker,不能再多。任何时候一个 worker 完成一个网页的获取,它应该立即从队列中取出下一个链接。我们会遇到没有那么多事干的时候,所以一些 worker 必须能够暂停。一旦又有 worker 获取一个有很多链接的网页,队列会突增,暂停的 worker 立马被唤醒干活。最后,当任务完成后我们的程序必须马上退出。

假如你的 worker 是线程,怎样去描述你的爬虫算法?我们可以使用 Python 标准库中的同步队列。每次有新的一项加入,队列增加它的 “tasks” 计数器。线程 worker 完成一个任务后调用 task_done。主线程阻塞在Queue.join,直到“tasks”计数器与 task_done 调用次数相匹配,然后退出。

协程通过 asyncio 队列,使用和线程一样的模式来实现!首先我们导入它


  1. try:
  2. from asyncio import JoinableQueue as Queue
  3. except ImportError:
  4. # In Python 3.5, asyncio.JoinableQueue is
  5. # merged into Queue.
  6. from asyncio import Queue

我们把 worker 的共享状态收集在一个 crawler 类中,主要的逻辑写在 crawl 方法中。我们在一个协程中启动crawl,运行 asyncio 的事件循环直到 crawl 完成:


  1. loop = asyncio.get_event_loop()
  2. crawler = crawling.Crawler('http://xkcd.com',
  3. max_redirect=10)
  4. loop.run_until_complete(crawler.crawl())

crawler 用一个根 URL 和最大重定向数 max_redirect 来初始化,它把 (URL, max_redirect) 序对放入队列中。(为什么要这样做,请看下文)


  1. class Crawler:
  2. def __init__(self, root_url, max_redirect):
  3. self.max_tasks = 10
  4. self.max_redirect = max_redirect
  5. self.q = Queue()
  6. self.seen_urls = set()
  7. # aiohttp's ClientSession does connection pooling and
  8. # HTTP keep-alives for us.
  9. self.session = aiohttp.ClientSession(loop=loop)
  10. # Put (URL, max_redirect) in the queue.
  11. self.q.put((root_url, self.max_redirect))

现在队列中未完成的任务数是 1。回到我们的主程序,启动事件循环和 crawl 方法:


  1. loop.run_until_complete(crawler.crawl())

crawl 协程把 worker 们赶起来干活。它像一个主线程:阻塞在 join 上直到所有任务完成,同时 worker 们在后台运行。


  1. @asyncio.coroutine
  2. def crawl(self):
  3. """Run the crawler until all work is done."""
  4. workers = [asyncio.Task(self.work())
  5. for _ in range(self.max_tasks)]
  6. # When all work is done, exit.
  7. yield from self.q.join()
  8. for w in workers:
  9. w.cancel()

如果 worker 是线程,可能我们不会一次把它们全部创建出来。为了避免创建线程的昂贵代价,通常一个线程池会按需增长。但是协程很廉价,我们可以直接把他们全部创建出来。

怎么关闭这个 crawler 很有趣。当 join 完成,worker 存活但是被暂停:他们等待更多的 URL,所以主协程要在退出之前清除它们。否则 Python 解释器关闭并调用所有对象的析构函数时,活着的 worker 会哭喊到:


  1. ERROR:asyncio:Task was destroyed but it is pending!

cancel 又是如何工作的呢?生成器还有一个我们还没介绍的特点。你可以从外部抛一个异常给它:


  1. >>> gen = gen_fn()
  2. >>> gen.send(None) # Start the generator as usual.
  3. 1
  4. >>> gen.throw(Exception('error'))
  5. Traceback (most recent call last):
  6. File "<input>", line 3, in <module>
  7. File "<input>", line 2, in gen_fn
  8. Exception: error

生成器被 throw 恢复,但是它现在抛出一个异常。如过生成器的调用堆栈中没有捕获异常的代码,这个异常被传递到顶层。所以注销一个协程:


  1. # Method of Task class.
  2. def cancel(self):
  3. self.coro.throw(CancelledError)

任何时候生成器暂停,在某些 yield from 语句它恢复并且抛出一个异常。我们在 task 的 step 方法中处理注销。


  1. # Method of Task class.
  2. def step(self, future):
  3. try:
  4. next_future = self.coro.send(future.result)
  5. except CancelledError:
  6. self.cancelled = True
  7. return
  8. except StopIteration:
  9. return
  10. next_future.add_done_callback(self.step)

现在 task 知道它被注销了,所以当它被销毁时,它不再抱怨。

一旦 crawl 注销了 worker,它就退出。同时事件循环看见这个协程结束了(我们后面会见到的),也就退出。


  1. loop.run_until_complete(crawler.crawl())

crawl 方法包含了所有主协程需要做的事。而 worker 则完成从队列中获取 URL、获取网页、解析它们得到新的链接。每个 worker 独立地运行 work 协程:


  1. @asyncio.coroutine
  2. def work(self):
  3. while True:
  4. url, max_redirect = yield from self.q.get()
  5. # Download page and add new links to self.q.
  6. yield from self.fetch(url, max_redirect)
  7. self.q.task_done()

Python 看见这段代码包含 yield from 语句,就把它编译成生成器函数。所以在 crawl 方法中,我们调用了 10 次 self.work,但并没有真正执行,它仅仅创建了 10 个指向这段代码的生成器对象并把它们包装成 Task 对象。task 接收每个生成器所 yield 的 future,通过调用 send 方法,当 future 解决时,用 future 的结果做为 send 的参数,来驱动它。由于生成器有自己的栈帧,它们可以独立运行,带有独立的局部变量和指令指针。

worker 使用队列来协调其小伙伴。它这样等待新的 URL:


  1. url, max_redirect = yield from self.q.get()

队列的 get 方法自身也是一个协程,它一直暂停到有新的 URL 进入队列,然后恢复并返回该条目。

碰巧,这也是当主协程注销 worker 时,最后 crawl 停止,worker 协程暂停的地方。从协程的角度,yield from 抛出CancelledError 结束了它在循环中的最后旅程。

worker 获取一个网页,解析链接,把新的链接放入队列中,接着调用task_done减小计数器。最终一个worker遇到一个没有新链接的网页,并且队列里也没有任务,这次task_done的调用使计数器减为0,而crawl正阻塞在join方法上,现在它就可以结束了。

我们承诺过要解释为什么队列中要使用序对,像这样:


  1. # URL to fetch, and the number of redirects left.
  2. ('http://xkcd.com/353', 10)

新的 URL 的重定向次数是10。获取一个特别的 URL 会重定向一个新的位置。我们减小重定向次数,并把新的 URL 放入队列中。


  1. # URL with a trailing slash. Nine redirects left.
  2. ('http://xkcd.com/353/', 9)

我们使用的 aiohttp 默认会跟踪重定向并返回最终结果。但是,我们告诉它不要这样做,爬虫自己来处理重定向,以便它可以合并那些目的相同的重定向路径:如果我们已经在 self.seen_urls 看到一个 URL,说明它已经从其他的地方走过这条路了。

Figure 5.4 - Redirects

crawler 获取“foo”并发现它重定向到了“baz”,所以它会加“baz”到队列和 seen_urls 中。如果它获取的下一个页面“bar” 也重定向到“baz”,fetcher 不会再次将 “baz”加入到队列中。如果该响应是一个页面,而不是一个重定向,fetch 会解析它的链接,并把新链接放到队列中。


  1. @asyncio.coroutine
  2. def fetch(self, url, max_redirect):
  3. # Handle redirects ourselves.
  4. response = yield from self.session.get(
  5. url, allow_redirects=False)
  6. try:
  7. if is_redirect(response):
  8. if max_redirect > 0:
  9. next_url = response.headers['location']
  10. if next_url in self.seen_urls:
  11. # We have been down this path before.
  12. return
  13. # Remember we have seen this URL.
  14. self.seen_urls.add(next_url)
  15. # Follow the redirect. One less redirect remains.
  16. self.q.put_nowait((next_url, max_redirect - 1))
  17. else:
  18. links = yield from self.parse_links(response)
  19. # Python set-logic:
  20. for link in links.difference(self.seen_urls):
  21. self.q.put_nowait((link, self.max_redirect))
  22. self.seen_urls.update(links)
  23. finally:
  24. # Return connection to pool.
  25. yield from response.release()

如果这是多进程代码,就有可能遇到讨厌的竞争条件。比如,一个 worker 检查一个链接是否在 seen_urls中,如果没有它就把这个链接加到队列中并把它放到 seen_urls 中。如果它在这两步操作之间被中断,而另一个 worker 解析到相同的链接,发现它并没有出现在 seen_urls 中就把它加入队列中。这(至少)导致同样的链接在队列中出现两次,做了重复的工作和错误的统计。

然而,一个协程只在 yield from 时才会被中断。这是协程比多线程少遇到竞争条件的关键。多线程必须获得锁来明确的进入一个临界区,否则它就是可中断的。而 Python 的协程默认是不会被中断的,只有它明确 yield 时才主动放弃控制权。

我们不再需要在用回调方式时用的 fetcher 类了。这个类只是不高效回调的一个变通方法:在等待 I/O 时,它需要一个存储状态的地方,因为局部变量并不能在函数调用间保留。倒是 fetch 协程可以像普通函数一样用局部变量保存它的状态,所以我们不再需要一个类。

当 fetch 完成对服务器响应的处理,它返回到它的调用者 workwork 方法对队列调用 task_done,接着从队列中取出一个要获取的 URL。

当 fetch 把新的链接放入队列中,它增加未完成的任务计数器,并停留在主协程,主协程在等待 q.join,处于暂停状态。而当没有新的链接并且这是队列中最后一个 URL 时,当 work 调用task_done,任务计数器变为 0,主协程从join` 中退出。

与 worker 和主协程一起工作的队列代码像这样(实际的 asyncio.Queue 实现在 Future 所展示的地方使用asyncio.Event 。不同之处在于 Event 是可以重置的,而 Future 不能从已解决返回变成待决。)


  1. class Queue:
  2. def __init__(self):
  3. self._join_future = Future()
  4. self._unfinished_tasks = 0
  5. # ... other initialization ...
  6. def put_nowait(self, item):
  7. self._unfinished_tasks += 1
  8. # ... store the item ...
  9. def task_done(self):
  10. self._unfinished_tasks -= 1
  11. if self._unfinished_tasks == 0:
  12. self._join_future.set_result(None)
  13. @asyncio.coroutine
  14. def join(self):
  15. if self._unfinished_tasks > 0:
  16. yield from self._join_future

主协程 crawl yield from join。所以当最后一个 worker 把计数器减为 0,它告诉 crawl 恢复运行并结束。

旅程快要结束了。我们的程序从 crawl 调用开始:


  1. loop.run_until_complete(self.crawler.crawl())

程序如何结束?因为 crawl 是一个生成器函数,调用它返回一个生成器。为了驱动它,asyncio 把它包装成一个 task:


  1. class EventLoop:
  2. def run_until_complete(self, coro):
  3. """Run until the coroutine is done."""
  4. task = Task(coro)
  5. task.add_done_callback(stop_callback)
  6. try:
  7. self.run_forever()
  8. except StopError:
  9. pass
  10. class StopError(BaseException):
  11. """Raised to stop the event loop."""
  12. def stop_callback(future):
  13. raise StopError

当这个任务完成,它抛出 StopError,事件循环把这个异常当作正常退出的信号。

但是,task 的 add_done_callbock 和 result 方法又是什么呢?你可能认为 task 就像一个 future,不错,你的直觉是对的。我们必须承认一个向你隐藏的细节,task 是 future。


  1. class Task(Future):
  2. """A coroutine wrapped in a Future."""

通常,一个 future 被别人调用 set_result 解决。但是 task,当协程结束时,它自己解决自己。记得我们解释过当 Python 生成器返回时,它抛出一个特殊的 StopIteration 异常:


  1. # Method of class Task.
  2. def step(self, future):
  3. try:
  4. next_future = self.coro.send(future.result)
  5. except CancelledError:
  6. self.cancelled = True
  7. return
  8. except StopIteration as exc:
  9. # Task resolves itself with coro's return
  10. # value.
  11. self.set_result(exc.value)
  12. return
  13. next_future.add_done_callback(self.step)

所以当事件循环调用 task.add_done_callback(stop_callback),它就准备被这个 task 停止。在看一次run_until_complete


  1. # Method of event loop.
  2. def run_until_complete(self, coro):
  3. task = Task(coro)
  4. task.add_done_callback(stop_callback)
  5. try:
  6. self.run_forever()
  7. except StopError:
  8. pass

当 task 捕获 StopIteration 并解决自己,这个回调从循环中抛出 StopError。循环结束,调用栈回到run_until_complete。我们的程序结束。

总结

现代的程序越来越多是 I/O 密集型而不是 CPU 密集型。对于这样的程序,Python 的线程在两个方面不合适:全局解释器锁阻止真正的并行计算,并且抢占切换也导致他们更容易出现竞争。异步通常是正确的选择。但是随着基于回调的异步代码增加,它会变得非常混乱。协程是一个更整洁的替代者。它们自然地重构成子过程,有健全的异常处理和栈追溯。

如果我们换个角度看 yield from 语句,一个协程看起来像一个传统的做阻塞 I/O 的线程。甚至我们可以采用经典的多线程模式编程,不需要重新发明。因此,与回调相比,协程更适合有经验的多线程的编码者。

但是当我们睁开眼睛关注 yield from 语句,我们能看到协程放弃控制权、允许其它人运行的标志点。不像多线程,协程展示出我们的代码哪里可以被中断哪里不能。在 Glyph Lefkowitz 富有启发性的文章“Unyielding”:“线程让局部推理变得困难,然而局部推理可能是软件开发中最重要的事”。然而,明确的 yield,让“通过过程本身而不是整个系统理解它的行为(和因此、正确性)”成为可能。

这章写于 Python 和异步的复兴时期。你刚学到的基于生成器的的协程,在 2014 年发布在 Python 3.4 的 asyncio 模块中。2015 年 9 月,Python 3.5 发布,协程成为语言的一部分。这个原生的协程通过“async def”来声明, 使用“await”而不是“yield from”委托一个协程或者等待 Future。

除了这些优点,核心的思想不变。Python 新的原生协程与生成器只是在语法上不同,工作原理非常相似。事实上,在 Python 解释器中它们共用同一个实现方法。Task、Future 和事件循环在 asynico 中扮演着同样的角色。

你已经知道 asyncio 协程是如何工作的了,现在你可以忘记大部分的细节。这些机制隐藏在一个整洁的接口下。但是你对这基本原理的理解能让你在现代异步环境下正确而高效的编写代码。

原文发布时间为:2017-03-06

本文来自合作伙伴“Linux中国”

时间: 2024-11-03 22:22:51

一个使用 asyncio 协程的网络爬虫(三)的相关文章

一个使用 asyncio 协程的网络爬虫(二)

协程 还记得我们对你许下的承诺么?我们可以写出这样的异步代码,它既有回调方式的高效,也有多线程代码的简洁.这个结合是同过一种称为协程coroutine的模式来实现的.使用 Python3.4 标准库 asyncio 和一个叫"aiohttp"的包,在协程中获取一个网页是非常直接的( @asyncio.coroutine 修饰符并非魔法.事实上,如果它修饰的是一个生成器函数,并且没有设置 PYTHONASYNCIODEBUG 环境变量的话,这个修饰符基本上没啥用.它只是为了框架的其它部分

一个使用 asyncio 协程的网络爬虫(一)

介绍 经典的计算机科学强调高效的算法,尽可能快地完成计算.但是很多网络程序的时间并不是消耗在计算上,而是在等待许多慢速的连接或者低频事件的发生.这些程序暴露出一个新的挑战:如何高效的等待大量网络事件.一个现代的解决方案是异步 I/O. 这一章我们将实现一个简单的网络爬虫.这个爬虫只是一个原型式的异步应用,因为它等待许多响应而只做少量的计算.一次爬的网页越多,它就能越快的完成任务.如果它为每个动态的请求启动一个线程的话,随着并发请求数量的增加,它会在耗尽套接字之前,耗尽内存或者线程相关的资源.使用

基于asyncio 异步协程框架实现收集B站直播弹幕_python

前言 虽然标题是全站,但目前只做了等级 top 100 直播间的全天弹幕收集. 弹幕收集系统基于之前的B 站直播弹幕姬 Python 版修改而来.具体协议分析可以看上一篇文章. 直播弹幕协议是直接基于 TCP 协议,所以如果 B 站对类似我这种行为做反制措施,比较困难.应该有我不知道的技术手段来检测类似我这种恶意行为. 我试过同时连接 100 个房间,和连接单个房间 100 次的实验,都没有问题.>150 会被关闭链接. 直播间的选取 现在弹幕收集系统在选取直播间上比较简单,直接选取了等级 to

golang 裸写一个pool池控制协程的大小

这几天深入的研究了一下golang 的协程,读了一个好文 http://mp.weixin.qq.com/s?__biz=MjM5OTcxMzE0MQ==&mid=2653369770&idx=1&sn=044be64c577a11a9a13447b373e80082&chksm=bce4d5b08b935ca6ad59abb5cc733a341a5126fefc0e6600bd61c959969c5f77c95fbfb909e3&mpshare=1&sce

Python3.5 协程原理

本文讲的是Python3.5 协程原理, 作为 Python 核心开发者之一,让我很想了解这门语言是如何运作的.我发现总有一些阴暗的角落我对其中错综复杂的细节不是很清楚,但是为了能够有助于 Python 的一些问题和其整体设计,我觉得我应该试着去理解 Python 的核心语法和内部运作机制. 但是直到最近我才理解 Python 3.5 中 async/await 的原理.我知道 Python 3.3 中的 yield from 和 Python 3.4 中的asyncio 组合得来这一新语法.但

介绍Python的Tornado框架中的协程异步实现原理

  介绍Python的Tornado框架中的协程异步实现原理        这篇文章主要介绍了简单介绍Python的Tornado框架中的协程异步实现原理,作者基于Python的生成器讲述了Tornado异步的特点,需要的朋友可以参考下 Tornado 4.0 已经发布了很长一段时间了, 新版本广泛的应用了协程(Future)特性. 我们目前已经将 Tornado 升级到最新版本, 而且也大量的使用协程特性. 很长时间没有更新博客, 今天就简单介绍下 Tornado 协程实现原理, Tornad

简单介绍Python的Tornado框架中的协程异步实现原理_python

Tornado 4.0 已经发布了很长一段时间了, 新版本广泛的应用了协程(Future)特性. 我们目前已经将 Tornado 升级到最新版本, 而且也大量的使用协程特性. 很长时间没有更新博客, 今天就简单介绍下 Tornado 协程实现原理, Tornado 的协程是基于 Python 的生成器实现的, 所以首先来回顾下生成器.生成器 Python 的生成器可以保存执行状态 并在下次调用的时候恢复, 通过在函数体内使用 yield 关键字 来创建一个生成器, 通过内置函数 next 或生成

基于Python生成器的Tornado协程异步

Tornado 4.0 已经发布了很长一段时间了, 新版本广泛的应用了协程(Future)特性. 我们目前已经将 Tornado 升级到最新版本, 而且也大量的使用协程特性. 很长时间没有更新博客, 今天就简单介绍下 Tornado 协程实现原理, Tornado 的协程是基于 Python 的生成器实现的, 所以首先来回顾下生成器. 生成器 Python 的生成器可以保存执行状态 并在下次调用的时候恢复, 通过在函数体内使用 yield 关键字 来创建一个生成器, 通过内置函数 next 或生

理解Python的协程机制-Yield

根据PEP-0342 Coroutines via Enhanced Generators,原来仅仅用于生成器的yield关键字被扩展,成为Python协程实现的一部分.而之所以使用协程,主要是出于性能的考虑:一个活跃的Python线程大约占据8MB内存,而一个活跃线程只使用1KB不到内存.对于IO密集型的应用,显然轻量化的协程更适用. 概述 原来,yield是一个statement,即和return一样的语句,但是在PEP-0342后,yield statement被改造为了yield exp