基于asyncio 异步协程框架实现收集B站直播弹幕_python

前言

虽然标题是全站,但目前只做了等级 top 100 直播间的全天弹幕收集。

弹幕收集系统基于之前的B 站直播弹幕姬 Python 版修改而来。具体协议分析可以看上一篇文章。

直播弹幕协议是直接基于 TCP 协议,所以如果 B 站对类似我这种行为做反制措施,比较困难。应该有我不知道的技术手段来检测类似我这种恶意行为。

我试过同时连接 100 个房间,和连接单个房间 100 次的实验,都没有问题。>150 会被关闭链接。

直播间的选取

现在弹幕收集系统在选取直播间上比较简单,直接选取了等级 top100。

以后会修改这部分,改成定时去 http://live.bilibili.com/all 查看新开播的直播间,并动态添加任务。

异步任务和弹幕存储

收集系统仍旧使用了 asyncio 异步协程框架,对于每一个直播间都使用如下方法来加进 loop 中。

danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
task1 = asyncio.ensure_future(danmuji.connectServer())
task2 = asyncio.ensure_future(danmuji.HeartbeatLoop())

其实若将心跳任务 HeartbeatLoop 放入 connectorServer 中去启动,代码看起来更优雅一些。但这么做是因为我需要维护一个任务列表,后面会有描述。

在弹幕存储上我花了些时间选择。

数据库存储是一个同步 IO 的过程,Insert 的时候会阻塞弹幕收集的任务。虽然有 aiomysql 这种异步接口,但配置数据库太麻烦,我的设想是这个小系统能够方便地部署。

最终我选择使用自带的 sqlite3。但 sqlite3 无法做并行操作,故开了一个线程单独进行数据库存储。在另一个线程中,100 * 2 个任务搜集所有的弹幕、人数信息,并塞进队列 commentq, numq 中。存储线程每隔 10s 唤醒一次,将队列中的数据写进 sqlite3 中,并清空队列。

在多线程和异步的配合下,网络流量没有被阻塞。

可能的连接失败场景处理

弹幕协议是直接基于 TCP,位与位直接关联性较强,一旦解析错误,很容易就抛 Exception(个人感觉,虽然 TCP 是可靠传输,但B站服务器自身发生错误也是有可能的)。所以有必要设计一个自动重连机制。

在 asyncio 文档中提到,

Done means either that a result / exception are available, or that the future was cancelled.

函数正常返回、抛出异常或者是被 cancel,都会退出当前任务。可以使用 done() 来判断。

每一个直播间对应两个任务,解析任务是最容易挂的,但并不会影响心跳任务,所以必须找出并将对应心跳任务结束。
在创建任务的时候使用字典记录每个房间的两个任务,

self.tasks[url] = [task1, task2]

在运行过程中,每隔 10s 做一次检查,

for url in self.tasks:
  item = self.tasks[url]
  task1 = item[0]
  task2 = item[1]
  if task1.done() == True or task2.done() == True:
    if task1.done() == False:
      task1.cancel()
    if task2.done() == False:
      task2.cancel()
    danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
    task11 = asyncio.ensure_future(danmuji.connectServer())
    task22 = asyncio.ensure_future(danmuji.HeartbeatLoop())
    self.tasks[url] = [task11, task22]

实际我只见过一次任务失败的场景,是因为主播房间被封了,导致无法进入直播间。

结论

  1. B站人数是按照连接弹幕服务器的链接数量统计的。通过操纵链接量,可以瞬间增加任意人数观看,有商机?
  2. 运行的这几天中,发现即使大部分房间不在直播,也能有 >5 的人数,包括凌晨。我只能猜测也有和我一样的人在 24h 收集弹幕。
  3. top100 平均一天 40M 弹幕数据。
  4. 收集的弹幕能做什么?还没想好,可能可以拿来做用户行为分析 -_^

最后附上本源码的GITHUB地址 https://github.com/lyyyuna/bilibili_danmu_colloector

以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索python
, asyncio
, 异步协程框架
Python收集弹幕
python asyncio 协程、asyncio 异步http、协程 异步、lua 协程 异步、协程和异步性能比较,以便于您获取更多的相关知识。

时间: 2024-09-20 19:35:43

基于asyncio 异步协程框架实现收集B站直播弹幕_python的相关文章

Java协程框架kilim碰到的问题,求解答,万分感谢!

问题描述 Java协程框架kilim碰到的问题,求解答,万分感谢! 最近看到kilim框架提供的java的协程功能,便想运行一下,但是每次都报错,运行不起来. 报错如下图: 查看的文档和链接是http://blog.csdn.net/chenyi8888/article/details/7047099 求各位帮忙解答一下这个报错,或者有什么解决方案,万分感谢了! 解决方案 Java协程框架--Kilim常见问题解答

PHP实现协程

在服务器编程当中,为了实现异步,经常性的需要回调函数,例如以下这段代码 function send($value) { $data = process($value); onReceive($data); } function onReceive($recv_value) { var_dump($recv_value); } function process($value) { return $value+1; } $send_value = 1; send($send_value); 实现的东

一个使用 asyncio 协程的网络爬虫(二)

协程 还记得我们对你许下的承诺么?我们可以写出这样的异步代码,它既有回调方式的高效,也有多线程代码的简洁.这个结合是同过一种称为协程coroutine的模式来实现的.使用 Python3.4 标准库 asyncio 和一个叫"aiohttp"的包,在协程中获取一个网页是非常直接的( @asyncio.coroutine 修饰符并非魔法.事实上,如果它修饰的是一个生成器函数,并且没有设置 PYTHONASYNCIODEBUG 环境变量的话,这个修饰符基本上没啥用.它只是为了框架的其它部分

Python3.5 协程原理

本文讲的是Python3.5 协程原理, 作为 Python 核心开发者之一,让我很想了解这门语言是如何运作的.我发现总有一些阴暗的角落我对其中错综复杂的细节不是很清楚,但是为了能够有助于 Python 的一些问题和其整体设计,我觉得我应该试着去理解 Python 的核心语法和内部运作机制. 但是直到最近我才理解 Python 3.5 中 async/await 的原理.我知道 Python 3.3 中的 yield from 和 Python 3.4 中的asyncio 组合得来这一新语法.但

[译]C语言协程

C语言协程 by Simon Tatham 原文链接:http://www.chiark.greenend.org.uk/~sgtatham/coroutines.html 引言 为大型程序设计一个良好的结构通常是一件困难的事情.其中一个经常出现的问题是:如果你有一段代码产生数据,另一段代码消费数据,那么谁应该作为调用者,谁应该作为被调用者? 下面是一段很简单的Run-Length(游程编码)解压缩代码(Decompressor): /* Decompression code */ while

介绍Python的Tornado框架中的协程异步实现原理

  介绍Python的Tornado框架中的协程异步实现原理        这篇文章主要介绍了简单介绍Python的Tornado框架中的协程异步实现原理,作者基于Python的生成器讲述了Tornado异步的特点,需要的朋友可以参考下 Tornado 4.0 已经发布了很长一段时间了, 新版本广泛的应用了协程(Future)特性. 我们目前已经将 Tornado 升级到最新版本, 而且也大量的使用协程特性. 很长时间没有更新博客, 今天就简单介绍下 Tornado 协程实现原理, Tornad

简单介绍Python的Tornado框架中的协程异步实现原理_python

Tornado 4.0 已经发布了很长一段时间了, 新版本广泛的应用了协程(Future)特性. 我们目前已经将 Tornado 升级到最新版本, 而且也大量的使用协程特性. 很长时间没有更新博客, 今天就简单介绍下 Tornado 协程实现原理, Tornado 的协程是基于 Python 的生成器实现的, 所以首先来回顾下生成器.生成器 Python 的生成器可以保存执行状态 并在下次调用的时候恢复, 通过在函数体内使用 yield 关键字 来创建一个生成器, 通过内置函数 next 或生成

基于Python生成器的Tornado协程异步

Tornado 4.0 已经发布了很长一段时间了, 新版本广泛的应用了协程(Future)特性. 我们目前已经将 Tornado 升级到最新版本, 而且也大量的使用协程特性. 很长时间没有更新博客, 今天就简单介绍下 Tornado 协程实现原理, Tornado 的协程是基于 Python 的生成器实现的, 所以首先来回顾下生成器. 生成器 Python 的生成器可以保存执行状态 并在下次调用的时候恢复, 通过在函数体内使用 yield 关键字 来创建一个生成器, 通过内置函数 next 或生

一个使用 asyncio 协程的网络爬虫(三)

使用协程 我们将从描述爬虫如何工作开始.现在是时候用 asynio 去实现它了. 我们的爬虫从获取第一个网页开始,解析出链接并把它们加到队列中.此后它开始傲游整个网站,并发地获取网页.但是由于客户端和服务端的负载限制,我们希望有一个最大数目的运行的 worker,不能再多.任何时候一个 worker 完成一个网页的获取,它应该立即从队列中取出下一个链接.我们会遇到没有那么多事干的时候,所以一些 worker 必须能够暂停.一旦又有 worker 获取一个有很多链接的网页,队列会突增,暂停的 wo