Python实现并行抓取整站40万条房价数据(可更换抓取城市)_python

写在前面

这次的爬虫是关于房价信息的抓取,目的在于练习10万以上的数据处理及整站式抓取。

数据量的提升最直观的感觉便是对函数逻辑要求的提高,针对Python的特性,谨慎的选择数据结构。以往小数据量的抓取,即使函数逻辑部分重复,I/O请求频率密集,循环套嵌过深,也不过是1~2s的差别,而随着数据规模的提高,这1~2s的差别就有可能扩展成为1~2h。

因此对于要抓取数据量较多的网站,可以从两方面着手降低抓取信息的时间成本。

1)优化函数逻辑,选择适当的数据结构,符合Pythonic的编程习惯。例如,字符串的合并,使用join()要比“+”节省内存空间。

2)依据I/O密集与CPU密集,选择多线程、多进程并行的执行方式,提高执行效率。

一、获取索引

包装请求request,设置超时timeout

# 获取列表页面
def get_page(url):
  headers = {
    'User-Agent': r'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
           r'Chrome/45.0.2454.85 Safari/537.36 115Browser/6.0.3',
    'Referer': r'http://bj.fangjia.com/ershoufang/',
    'Host': r'bj.fangjia.com',
    'Connection': 'keep-alive'
  }
  timeout = 60
  socket.setdefaulttimeout(timeout) # 设置超时
  req = request.Request(url, headers=headers)
  response = request.urlopen(req).read()
  page = response.decode('utf-8')
  return page

一级位置:区域信息

二级位置:板块信息(根据区域位置得到板块信息,以key_value对的形式存储在dict中)

以dict方式存储,可以快速的查询到所要查找的目标。-> {'朝阳':{'工体','安贞','健翔桥'......}}

三级位置:地铁信息(搜索地铁周边房源信息)

将所属位置地铁信息,添加至dict中。  -> {'朝阳':{'工体':{'5号线','10号线' , '13号线'},'安贞','健翔桥'......}}

对应的url:http://bj.fangjia.com/ershoufang/--r-%E6%9C%9D%E9%98%B3%7Cw-5%E5%8F%B7%E7%BA%BF%7Cb-%E6%83%A0%E6%96%B0%E8%A5%BF%E8%A1%97

解码后的url:http://bj.fangjia.com/ershoufang/--r-朝阳|w-5号线|b-惠新西街

根据url的参数模式,可以有两种方式获取目的url:

1)根据索引路径获得目的url

# 获取房源信息列表(嵌套字典遍历)
def get_info_list(search_dict, layer, tmp_list, search_list):
  layer += 1 # 设置字典层级
  for i in range(len(search_dict)):
    tmp_key = list(search_dict.keys())[i] # 提取当前字典层级key
    tmp_list.append(tmp_key)  # 将当前key值作为索引添加至tmp_list
    tmp_value = search_dict[tmp_key]
    if isinstance(tmp_value, str):  # 当键值为url时
      tmp_list.append(tmp_value)  # 将url添加至tmp_list
      search_list.append(copy.deepcopy(tmp_list))  # 将tmp_list索引url添加至search_list
      tmp_list = tmp_list[:layer] # 根据层级保留索引
    elif tmp_value == '':  # 键值为空时跳过
      layer -= 2      # 跳出键值层级
      tmp_list = tmp_list[:layer]  # 根据层级保留索引
    else:
      get_info_list(tmp_value, layer, tmp_list, search_list) # 当键值为列表时,迭代遍历
      tmp_list = tmp_list[:layer]
  return search_list

2)根据dict信息包装url

 {'朝阳':{'工体':{'5号线'}}}

参数:

——  r-朝阳

——  b-工体

——  w-5号线

组装参数:http://bj.fangjia.com/ershoufang/--r-朝阳|w-5号线|b-工体

1 # 根据参数创建组合url
2 def get_compose_url(compose_tmp_url, tag_args, key_args):
3   compose_tmp_url_list = [compose_tmp_url, '|' if tag_args != 'r-' else '', tag_args, parse.quote(key_args), ]
4   compose_url = ''.join(compose_tmp_url_list)
5   return compose_url

二、获取索引页最大页数

# 获取当前索引页面页数的url列表
def get_info_pn_list(search_list):
  fin_search_list = []
  for i in range(len(search_list)):
    print('>>>正在抓取%s' % search_list[i][:3])
    search_url = search_list[i][3]
    try:
      page = get_page(search_url)
    except:
      print('获取页面超时')
      continue
    soup = BS(page, 'lxml')
    # 获取最大页数
    pn_num = soup.select('span[class="mr5"]')[0].get_text()
    rule = re.compile(r'\d+')
    max_pn = int(rule.findall(pn_num)[1])
    # 组装url
    for pn in range(1, max_pn+1):
      print('************************正在抓取%s页************************' % pn)
      pn_rule = re.compile('[|]')
      fin_url = pn_rule.sub(r'|e-%s|' % pn, search_url, 1)
      tmp_url_list = copy.deepcopy(search_list[i][:3])
      tmp_url_list.append(fin_url)
      fin_search_list.append(tmp_url_list)
  return fin_search_list

三、抓取房源信息Tag

这是我们要抓取的Tag:

['区域', '板块', '地铁', '标题', '位置', '平米', '户型', '楼层', '总价', '单位平米价格']

# 获取tag信息
def get_info(fin_search_list, process_i):
  print('进程%s开始' % process_i)
  fin_info_list = []
  for i in range(len(fin_search_list)):
    url = fin_search_list[i][3]
    try:
      page = get_page(url)
    except:
      print('获取tag超时')
      continue
    soup = BS(page, 'lxml')
    title_list = soup.select('a[class="h_name"]')
    address_list = soup.select('span[class="address]')
    attr_list = soup.select('span[class="attribute"]')
    price_list = soup.find_all(attrs={"class": "xq_aprice xq_esf_width"}) # select对于某些属性值(属性值中间包含空格)无法识别,可以用find_all(attrs={})代替
    for num in range(20):
      tag_tmp_list = []
      try:
        title = title_list[num].attrs["title"]
        print(r'************************正在获取%s************************' % title)
        address = re.sub('\n', '', address_list[num].get_text())
        area = re.search('\d+[\u4E00-\u9FA5]{2}', attr_list[num].get_text()).group(0)
        layout = re.search('\d[^0-9]\d.', attr_list[num].get_text()).group(0)
        floor = re.search('\d/\d', attr_list[num].get_text()).group(0)
        price = re.search('\d+[\u4E00-\u9FA5]', price_list[num].get_text()).group(0)
        unit_price = re.search('\d+[\u4E00-\u9FA5]/.', price_list[num].get_text()).group(0)
        tag_tmp_list = copy.deepcopy(fin_search_list[i][:3])
        for tag in [title, address, area, layout, floor, price, unit_price]:
          tag_tmp_list.append(tag)
        fin_info_list.append(tag_tmp_list)
      except:
        print('【抓取失败】')
        continue
  print('进程%s结束' % process_i)
  return fin_info_list

四、分配任务,并行抓取

对任务列表进行分片,设置进程池,并行抓取。

# 分配任务
def assignment_search_list(fin_search_list, project_num): # project_num每个进程包含的任务数,数值越小,进程数越多
  assignment_list = []
  fin_search_list_len = len(fin_search_list)
  for i in range(0, fin_search_list_len, project_num):
    start = i
    end = i+project_num
    assignment_list.append(fin_search_list[start: end]) # 获取列表碎片
  return assignment_list
  p = Pool(4) # 设置进程池
  assignment_list = assignment_search_list(fin_info_pn_list, 3) # 分配任务,用于多进程
  result = [] # 多进程结果列表
  for i in range(len(assignment_list)):
    result.append(p.apply_async(get_info, args=(assignment_list[i], i)))
  p.close()
  p.join()
  for result_i in range(len(result)):
    fin_info_result_list = result[result_i].get()
    fin_save_list.extend(fin_info_result_list) # 将各个进程获得的列表合并

通过设置进程池并行抓取,时间缩短为单进程抓取时间的3/1,总计时间3h。

电脑为4核,经过测试,任务数为3时,在当前电脑运行效率最高。

五、将抓取结果存储到excel中,等待可视化数据化处理

# 存储抓取结果
def save_excel(fin_info_list, file_name):
  tag_name = ['区域', '板块', '地铁', '标题', '位置', '平米', '户型', '楼层', '总价', '单位平米价格']
  book = xlsxwriter.Workbook(r'C:\Users\Administrator\Desktop\%s.xls' % file_name) # 默认存储在桌面上
  tmp = book.add_worksheet()
  row_num = len(fin_info_list)
  for i in range(1, row_num):
    if i == 1:
      tag_pos = 'A%s' % i
      tmp.write_row(tag_pos, tag_name)
    else:
      con_pos = 'A%s' % i
      content = fin_info_list[i-1] # -1是因为被表格的表头所占
      tmp.write_row(con_pos, content)
  book.close()

附上源码

#! -*-coding:utf-8-*-
# Function: 房价调查
# Author:蘭兹
from urllib import parse, request
from bs4 import BeautifulSoup as BS
from multiprocessing import Pool
import re
import lxml
import datetime
import cProfile
import socket
import copy
import xlsxwriter
starttime = datetime.datetime.now()
base_url = r'http://bj.fangjia.com/ershoufang/'
test_search_dict = {'昌平': {'霍营': {'13号线': 'http://bj.fangjia.com/ershoufang/--r-%E6%98%8C%E5%B9%B3|w-13%E5%8F%B7%E7%BA%BF|b-%E9%9C%8D%E8%90%A5'}}}
search_list = [] # 房源信息url列表
tmp_list = [] # 房源信息url缓存列表
layer = -1
# 获取列表页面
def get_page(url):
  headers = {
    'User-Agent': r'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
           r'Chrome/45.0.2454.85 Safari/537.36 115Browser/6.0.3',
    'Referer': r'http://bj.fangjia.com/ershoufang/',
    'Host': r'bj.fangjia.com',
    'Connection': 'keep-alive'
  }
  timeout = 60
  socket.setdefaulttimeout(timeout) # 设置超时
  req = request.Request(url, headers=headers)
  response = request.urlopen(req).read()
  page = response.decode('utf-8')
  return page
# 获取查询关键词dict
def get_search(page, key):
  soup = BS(page, 'lxml')
  search_list = soup.find_all(href=re.compile(key), target='')
  search_dict = {}
  for i in range(len(search_list)):
    soup = BS(str(search_list[i]), 'lxml')
    key = soup.select('a')[0].get_text()
    value = soup.a.attrs['href']
    search_dict[key] = value
  return search_dict
# 获取房源信息列表(嵌套字典遍历)
def get_info_list(search_dict, layer, tmp_list, search_list):
  layer += 1 # 设置字典层级
  for i in range(len(search_dict)):
    tmp_key = list(search_dict.keys())[i] # 提取当前字典层级key
    tmp_list.append(tmp_key)  # 将当前key值作为索引添加至tmp_list
    tmp_value = search_dict[tmp_key]
    if isinstance(tmp_value, str):  # 当键值为url时
      tmp_list.append(tmp_value)  # 将url添加至tmp_list
      search_list.append(copy.deepcopy(tmp_list))  # 将tmp_list索引url添加至search_list
      tmp_list = tmp_list[:layer] # 根据层级保留索引
    elif tmp_value == '':  # 键值为空时跳过
      layer -= 2      # 跳出键值层级
      tmp_list = tmp_list[:layer]  # 根据层级保留索引
    else:
      get_info_list(tmp_value, layer, tmp_list, search_list) # 当键值为列表时,迭代遍历
      tmp_list = tmp_list[:layer]
  return search_list
# 获取房源信息详情
def get_info_pn_list(search_list):
  fin_search_list = []
  for i in range(len(search_list)):
    print('>>>正在抓取%s' % search_list[i][:3])
    search_url = search_list[i][3]
    try:
      page = get_page(search_url)
    except:
      print('获取页面超时')
      continue
    soup = BS(page, 'lxml')
    # 获取最大页数
    pn_num = soup.select('span[class="mr5"]')[0].get_text()
    rule = re.compile(r'\d+')
    max_pn = int(rule.findall(pn_num)[1])
    # 组装url
    for pn in range(1, max_pn+1):
      print('************************正在抓取%s页************************' % pn)
      pn_rule = re.compile('[|]')
      fin_url = pn_rule.sub(r'|e-%s|' % pn, search_url, 1)
      tmp_url_list = copy.deepcopy(search_list[i][:3])
      tmp_url_list.append(fin_url)
      fin_search_list.append(tmp_url_list)
  return fin_search_list
# 获取tag信息
def get_info(fin_search_list, process_i):
  print('进程%s开始' % process_i)
  fin_info_list = []
  for i in range(len(fin_search_list)):
    url = fin_search_list[i][3]
    try:
      page = get_page(url)
    except:
      print('获取tag超时')
      continue
    soup = BS(page, 'lxml')
    title_list = soup.select('a[class="h_name"]')
    address_list = soup.select('span[class="address]')
    attr_list = soup.select('span[class="attribute"]')
    price_list = soup.find_all(attrs={"class": "xq_aprice xq_esf_width"}) # select对于某些属性值(属性值中间包含空格)无法识别,可以用find_all(attrs={})代替
    for num in range(20):
      tag_tmp_list = []
      try:
        title = title_list[num].attrs["title"]
        print(r'************************正在获取%s************************' % title)
        address = re.sub('\n', '', address_list[num].get_text())
        area = re.search('\d+[\u4E00-\u9FA5]{2}', attr_list[num].get_text()).group(0)
        layout = re.search('\d[^0-9]\d.', attr_list[num].get_text()).group(0)
        floor = re.search('\d/\d', attr_list[num].get_text()).group(0)
        price = re.search('\d+[\u4E00-\u9FA5]', price_list[num].get_text()).group(0)
        unit_price = re.search('\d+[\u4E00-\u9FA5]/.', price_list[num].get_text()).group(0)
        tag_tmp_list = copy.deepcopy(fin_search_list[i][:3])
        for tag in [title, address, area, layout, floor, price, unit_price]:
          tag_tmp_list.append(tag)
        fin_info_list.append(tag_tmp_list)
      except:
        print('【抓取失败】')
        continue
  print('进程%s结束' % process_i)
  return fin_info_list
# 分配任务
def assignment_search_list(fin_search_list, project_num): # project_num每个进程包含的任务数,数值越小,进程数越多
  assignment_list = []
  fin_search_list_len = len(fin_search_list)
  for i in range(0, fin_search_list_len, project_num):
    start = i
    end = i+project_num
    assignment_list.append(fin_search_list[start: end]) # 获取列表碎片
  return assignment_list
# 存储抓取结果
def save_excel(fin_info_list, file_name):
  tag_name = ['区域', '板块', '地铁', '标题', '位置', '平米', '户型', '楼层', '总价', '单位平米价格']
  book = xlsxwriter.Workbook(r'C:\Users\Administrator\Desktop\%s.xls' % file_name) # 默认存储在桌面上
  tmp = book.add_worksheet()
  row_num = len(fin_info_list)
  for i in range(1, row_num):
    if i == 1:
      tag_pos = 'A%s' % i
      tmp.write_row(tag_pos, tag_name)
    else:
      con_pos = 'A%s' % i
      content = fin_info_list[i-1] # -1是因为被表格的表头所占
      tmp.write_row(con_pos, content)
  book.close()
if __name__ == '__main__':
  file_name = input(r'抓取完成,输入文件名保存:')
  fin_save_list = [] # 抓取信息存储列表
  # 一级筛选
  page = get_page(base_url)
  search_dict = get_search(page, 'r-')
  # 二级筛选
  for k in search_dict:
    print(r'************************一级抓取:正在抓取【%s】************************' % k)
    url = search_dict[k]
    second_page = get_page(url)
    second_search_dict = get_search(second_page, 'b-')
    search_dict[k] = second_search_dict
  # 三级筛选
  for k in search_dict:
    second_dict = search_dict[k]
    for s_k in second_dict:
      print(r'************************二级抓取:正在抓取【%s】************************' % s_k)
      url = second_dict[s_k]
      third_page = get_page(url)
      third_search_dict = get_search(third_page, 'w-')
      print('%s>%s' % (k, s_k))
      second_dict[s_k] = third_search_dict
  fin_info_list = get_info_list(search_dict, layer, tmp_list, search_list)
  fin_info_pn_list = get_info_pn_list(fin_info_list)
  p = Pool(4) # 设置进程池
  assignment_list = assignment_search_list(fin_info_pn_list, 2) # 分配任务,用于多进程
  result = [] # 多进程结果列表
  for i in range(len(assignment_list)):
    result.append(p.apply_async(get_info, args=(assignment_list[i], i)))
  p.close()
  p.join()
  for result_i in range(len(result)):
    fin_info_result_list = result[result_i].get()
    fin_save_list.extend(fin_info_result_list) # 将各个进程获得的列表合并
  save_excel(fin_save_list, file_name)
  endtime = datetime.datetime.now()
  time = (endtime - starttime).seconds
  print('总共用时:%s s' % time)

总结:

当抓取数据规模越大,对程序逻辑要求就愈严谨,对python语法要求就越熟练。如何写出更加pythonic的语法,也需要不断学习掌握的。

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,同时也希望多多支持!

以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索python
, 数据
并行抓取
整站抓取工具、网站整站抓取工具、html网站整站抓取工具、整站抓取、整站url链接抓取,以便于您获取更多的相关知识。

时间: 2024-08-02 12:04:37

Python实现并行抓取整站40万条房价数据(可更换抓取城市)_python的相关文章

Python如何实现并行抓取整站40万条房价数据的教程(可更换抓取城市)

写在前面 这次的爬虫是关于房价信息的抓取,目的在于练习10万以上的数据处理及整站式抓取. 数据量的提升最直观的感觉便是对函数逻辑要求的提高,针对Python的特性,谨慎的选择数据结构.以往小数据量的抓取,即使函数逻辑部分重复,I/O请求频率密集,循环套嵌过深,也不过是1~2s的差别,而随着数据规模的提高,这1~2s的差别就有可能扩展成为1~2h. 因此对于要抓取数据量较多的网站,可以从两方面着手降低抓取信息的时间成本. 1)优化函数逻辑,选择适当的数据结构,符合Pythonic的编程习惯.例如,

hibernate 查询40万条记录出现 java.sql.SQLException 数字溢出 异常

问题描述 大家好,我用hibernate 查询40万条记录,结果出现 java.sql.SQLException 数字溢出 异常,怎么回事啊?难道 hibernate 支持不了几十万的数据检索???我的代码: List list = session.createQuery("from 表 ").list(); 解决方案 你有没有设置Lazy Loading呀..如果你设的为立即加载的话,几十万的话数据.溢出很正常呀..解决方案二:大哥 你一起弄这么多数据出来能不溢出么?用LAZY解决方

整治网络谣言取成果 清21万条信息关42家网站

摘要: 国家互联网信息办网络新闻协调局局长刘正荣.工业和信息化部通信保障局副局长赵志国今天就整治网络谣言的有关情况接受媒体采访.两位负责人通报了近期整治网络谣言的举措和成 国家互联网信息办网络新闻协调局局长刘正荣.工业和信息化部通信保障局副局长赵志国今天就整治网络谣言的有关情况接受媒体采访.两位负责人通报了近期整治网络谣言的举措和成效,并对即将深入开展的相关工作进行了介绍. 拘留6名网上造谣者 刘正荣.赵志国在接受采访时指出,各类网络谣言经常出现,特别是最近一段时间,有个别网民在互联网上编造.传

A/B向上取整的方法

1.问题 A,B都是整数并且 A>1, B>1 求 ┌ A/B ┐ 即 A/B 的上取整. 当 A/B 整除,往上取整返回值 为 A/B. 当 不整除,返回值是 int(A/B) + 1 这个算法的一个应用:如果你有一个动态增长的缓冲区,增长的步长是 B, 某一次缓冲区申请的大小是 A,这个时候,就可以用这个算法,计算出缓冲区的一个合 适大小了,正好可以容纳A,并且不会过于得多,多余部分不会比B多. 2.方法 int( (A+B-1)/B ) 3.HUNTON 的证明 上取整用UP表示 由于A

javascript 取整

<script language="javascript"> var t = 3.1415; alert("int(" + t + ") = " + int(t)); /******** 内置取整函数 *********/ alert("parseInt(" + t + ") = " + parseInt(t)); /******** floor是地板的意思,顾名思义是向下取整 ********

纳尼,mysqldump导出的数据居然少了40万?

0.导读 用mysqldump备份数据时,加上 -w 条件选项过滤部分数据,发现导出结果比实际少了40万,什么情况? 本文约1500字,阅读时间约5分钟. 1.问题 我的朋友小文前几天遇到一个怪事,他用mysqldump备份数据时,加上了 -w 选项过滤部分数据,发现导出的数据比实际上少了40万. 要进行备份表DDL见下: CREATE TABLE `oldbiao` (   `aaaid` int(11) NOT NULL,   `bbbid` int(11) NOT NULL,   `ccc

SEO资料站长丘仕达访谈:整站优化心得

这个采访是应群里一个朋友而做的.其实不能算采访,因为我也不是名人.不过里面有些东西或许会对一些朋友有帮助,转发过来供大家交流吧. 内容提要 1.SEO资料站是如何做到第一的 2.做为一名在校学生您是如何走向研究SEO道路的. 3.为什么有采取免费分析网站优化的想法呢 4.如何控制百度对内页的收录和更新! 5.如何利用AD来赚钱? 6. 相关搜索蜘蛛SEO的出现说明什么? 7. 关于分词技术? 8. 您对点石 SEO的看法. 9. 整站的优化的心得! 10.SEO资料站未来的发展 小兵徐州SEO

WordPress整站全面SEO优化指南

WordPress是大家熟知的一款建站程序.它功能强大.模板丰富.插件充足.安装简单以及定制性强的特点使得我们在建立个人博客,甚至企业网站或其他类型网站使也会选择Wordpress.我对WordPress SEO有一些了解和经验,以下就来说说过去7天,我对它做了些什么.Wordpress模板的SEO大同小异,希望能够通过本文给有需要的朋友开拓一些思路. 在<网站整体标准化对SEO的影响>一文中,我介绍了网站的四个层面:基础层.结构层.内容层和表现层.   在进行WordPress SEO时,我

网站外链平均分布利于节省资源提升整站权重

内容为王外链为皇的时代虽然已经过了,虽然这几年搜索引擎也不断在进步,把外链的影响因素不断降低,但目前来看外链对网站排名还是起着决定性的影响.特别是在网站的前期对排名更是起着决定性的作用,当然外链的重要性已经不用我再来多讲了,相信大家都清楚,今天我要讲的是怎么把外链用好,怎么样把最小的资源价值最大人,比如一个关键词只需要十条外链就能进入前三名,而你却给它做一百条外链,那多余的90条就被浪费了,如何把这浪费的资源盘活价值最大化是我们每个人都需要做的. 其实现在很多人在做外链的时候陷入一个极大的误区,