pyspark访问hive数据实战

数据分析都是直接使用hive脚本进行调用,随着APP用户行为和日志数据量的逐渐累积,跑每天的脚本运行需要花的时间越来越长,虽然进行了sql优化,但是上spark已经提上日程。

直接进行spark开发需要去学习scala,为了降低数据分析师的学习成本,决定前期先试用sparkSQL,能够让计算引擎无缝从MR切换到spark,现在主要使用pyspark访问hive数据。

以下是安装配置过程中的详细步骤:

1.安装spark

需要先安装JDK和scala,这不必多说,由于现有hadoop集群版本是采用的2.6.3,所以spark版本是下载的稳定版本spark-1.4.0-bin-hadoop2.6.tgz

我是先在一台机器上完成了Spark的部署,Master和Slave都在一台机器上。注意要配置免秘钥ssh登陆。

1.1 环境变量配置


  1. export JAVA_HOME=/usr/jdk1.8.0_73 
  2. export HADOOP_HOME=/usr/hadoop 
  3. export HADOOP_CONF_DIR=/usr/hadoop/etc/hadoop 
  4. export SCALA_HOME=/usr/local/scala-2.11.7 
  5. export SPARK_HOME=/home/hadoop/spark_folder/spark-1.4.0-bin-hadoop2.6 
  6. export SPARK_MASTER_IP=127.0.0.1 
  7. export SPARK_MASTER_PORT=7077 
  8. export SPARK_MASTER_WEBUI_PORT=8099 
  9.   
  10. export SPARK_WORKER_CORES=3     //每个Worker使用的CPU核数 
  11. export SPARK_WORKER_INSTANCES=1   //每个Slave中启动几个Worker实例 
  12. export SPARK_WORKER_MEMORY=10G    //每个Worker使用多大的内存 
  13. export SPARK_WORKER_WEBUI_PORT=8081 //Worker的WebUI端口号 
  14. export SPARK_EXECUTOR_CORES=1       //每个Executor使用使用的核数 
  15. export SPARK_EXECUTOR_MEMORY=1G     //每个Executor使用的内存 
  16.  
  17. export HIVE_HOME=/home/hadoop/hive 
  18. export SPARK_CLASSPATH=$HIVE_HOME/lib/mysql-connector-java-5.1.31-bin.jar:$SPARK_CLASSPATH 
  19. export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$HADOOP_HOME/lib/native 

1.2 配置slaves


  1. cp slaves.template slaves 
  2. vi slaves 添加以下内容:localhost 

1.3 启动master和slave


  1. cd $SPARK_HOME/sbin/ 
  2. ./start-master.sh 
  3.  
  4. 启动日志位于 $SPARK_HOME/logs/目录,访问 http://localhost:8099,即可看到Spark的WebUI界面 
  5.  
  6. 执行 ./bin/spark-shell,打开Scala到Spark的连接窗口    

2.SparkSQL与Hive的整合


  1. 拷贝$HIVE_HOME/conf/hive-site.xml和hive-log4j.properties到 $SPARK_HOME/conf/  
  2. 在$SPARK_HOME/conf/目录中,修改spark-env.sh,添加  
  3. export HIVE_HOME=/home/hadoop/hive 
  4. export SPARK_CLASSPATH=$HIVE_HOME/lib/mysql-connector-java-5.1.31-bin.jar:$SPARK_CLASSPATH 
  5. 另外也可以设置一下Spark的log4j配置文件,使得屏幕中不打印额外的INFO信息(如果不想受干扰可设置为更高):  
  6. log4j.rootCategory=WARN, console  
  7. 进入$SPARK_HOME/bin,执行 ./spark-sql –master spark://127.0.0.1:7077 进入spark-sql CLI: 
  8. [hadoop@hadoop spark]$ bin/spark-sql --help   
  9. Usage: ./bin/spark-sql [options] [cli option]   
  10. CLI options:   
  11.  -d,--define <keykey=value>          Variable subsitution to apply to hive   
  12.                                   commands. e.g. -d A=B or --define A=B   
  13.     --database <databasename>     Specify the database to use   
  14.  -e <quoted-query-string>         SQL from command line   
  15.  -f <filename>                    SQL from files   
  16.  -h <hostname>                    connecting to Hive Server on remote host   
  17.     --hiveconf <propertyproperty=value>   Use value for given property   
  18.     --hivevar <keykey=value>         Variable subsitution to apply to hive   
  19.                                   commands. e.g. --hivevar A=B   
  20.  -i <filename>                    Initialization SQL file   
  21.  -p <port>                        connecting to Hive Server on port number   
  22.  -S,--silent                      Silent mode in interactive shell   
  23.  -v,--verbose                     Verbose mode (echo executed SQL to the   
  24.                                   console)   

需要注意的是CLI不是使用JDBC连接,所以不能连接到ThriftServer;但可以配置conf/hive-site.xml连接到hive的metastore,然后对hive数据进行查询。下面我们接着说如何在python中连接hive数据表查询。

3.配置pyspark和示例代码

3.1 配置pyspark


  1. 打开/etc/profile: 
  2.         #PythonPath 将Spark中的pySpark模块增加的Python环境中 
  3.          export PYTHONPATH=/opt/spark-hadoop/python 
  4.         source /etc/profile   

执行./bin/pyspark ,打开Python到Spark的连接窗口,确认没有报错。

打开命令行窗口,输入python,Python版本为2.7.6,如图所示,注意Spark暂时不支持Python3。输入import pyspark不报错,证明开发前工作已经完成。

3.2 启动ThriftServer

启动ThriftServer,使之运行在spark集群中:

sbin/start-thriftserver.sh --master spark://localhost:7077 --executor-memory 5g

ThriftServer可以连接多个JDBC/ODBC客户端,并相互之间可以共享数据。

3.3 请求示例

查看spark官方文档说明,spark1.4和2.0对于sparksql调用hive数据的API变化并不大。都是用sparkContext 。


  1. from pyspark import SparkConf, SparkContext 
  2. from pyspark.sql import HiveContext 
  3.  
  4. conf = (SparkConf() 
  5.          .setMaster("spark://127.0.0.1:7077") 
  6.          .setAppName("My app") 
  7.          .set("spark.executor.memory", "1g")) 
  8. sc = SparkContext(conf = conf) 
  9. sqlContext = HiveContext(sc) 
  10. my_dataframe = sqlContext.sql("Select count(1) from logs.fmnews_dim_where") 
  11. my_dataframe.show() 

返回结果:

运行以后在webUI界面看到job运行详情。

4.性能比较

截取了接近一个月的用户行为数据,数据大小为2G,总共接近1600w条记录。

为了测试不同sql需求情况下的结果,我们选取了日常运行的2类sql:

1.统计数据条数:


  1. select count(1) from fmnews_user_log2; 

2.统计用户行为:


  1. SELECT device_id, min_time FROM 
  2.         (SELECT device_id,min(import_time) min_time FROM fmnews_user_log2 
  3.             GROUP BY device_id)a 
  4.         WHERE from_unixtime(int(substr(min_time,0,10)),'yyyy-MM-dd') = '2017-03-02'; 

3. 用户行为分析:


  1. select case when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '06:00' and '07:59' then 1 
  2.             when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '08:00' and '09:59' then 2 
  3.             when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '10:00' and '11:59' then 3 
  4.             when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '12:00' and '13:59' then 4 
  5.             when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '14:00' and '15:59' then 5 
  6.             when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '16:00' and '17:59' then 6 
  7.             when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '18:00' and '19:59' then 7 
  8.             when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '20:00' and '21:59' then 8 
  9.             when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '22:00' and '23:59' then 9 
  10.             else 0 end fmnews_time_type, count(distinct device_id) device_count,count(1) click_count 
  11.        from fmcm.fmnews_user_log2 
  12.      where from_unixtime(int(substr(import_time,0,10)),'yyyy-MM-dd') = '2017-03-02' 
  13.     group by case when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '06:00' and '07:59' then 1 
  14.             when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '08:00' and '09:59' then 2 
  15.             when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '10:00' and '11:59' then 3 
  16.             when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '12:00' and '13:59' then 4 
  17.             when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '14:00' and '15:59' then 5 
  18.             when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '16:00' and '17:59' then 6 
  19.             when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '18:00' and '19:59' then 7 
  20.             when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '20:00' and '21:59' then 8 
  21.             when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '22:00' and '23:59' then 9 
  22.             else 0 end; 

第一条sql的执行结果对比:hive 35.013 seconds

第一条sql的执行结果对比:sparksql 1.218 seconds

第二条sql的执行结果对比:hive 78.101 seconds

第二条sql的执行结果对比:sparksql 8.669 seconds

第三条sql的执行结果对比:hive 101.228 seconds

第三条sql的执行结果对比:sparksql 14.221 seconds

可以看到,虽然没有官网吹破天的100倍性能提升,但是根据sql的复杂度来看10~30倍的效率还是可以达到的。

不过这里要注意到2个影响因子:

1. 我们数据集并没有采取全量,在数据量达到TB级别两者的差距应该会有所减小。同时sql也没有针对hive做优化。

2. spark暂时是单机(内存足够)并没有搭建集群,hive使用的hadoop集群有4台datanode。

本文作者:aibati2008

来源:51CTO

时间: 2024-12-03 23:25:59

pyspark访问hive数据实战的相关文章

pyinstaller如何打包python源程序访问hive

1.需求 使用hvie server一段时间后,业务部门需要自己不定时的查询业务数据,之前这一块都是他们提需求我们来做,后来发现这样重复一样的工作放在我们这边做是在没有效率,遂提出给他们工具或者web UI自助查询,当然hive有自己的hwi可以通过网页UI进行自助查询,但是这对不懂sql的业务人员有点不太友好,目前有没时间去修改hwi的UI,所以还是给他们提供查询工具吧.我这边主要使用python thrift访问集群的hive,所以自然要将python源码打包成.exe,业务人员在windo

E-MapReduce的Presto组件默认支持访问oss数据

阿里云E-MapReduce从EMR-2.1.0版本镜像开始,Presto组件默认就支持访问oss数据了,不再需要引导操作额外支持. 如何使用 创建集群,版本选择EMR-2.1.0,软件勾选Presto,等待创建成功. 验证 hive创建oss数据表 下文举了创建数据在oss上的uservisits表的例子,请将表名,字段,oss信息替换为您oss数据对应的信息hive 进入hive cli vpc网络可以使用MetaService不需要指定ak等信息: CREATE EXTERNAL TABL

用引导操作打通presto访问oss数据

目前产品组件presto还不能直接访问oss数据,如果有需求,可以参考本文用引导操作打通presto访问oss数据. 准备脚本 下载 脚本,放在您的oss合适的目录里. 创建集群 参照 帮助文档 ,创建集群时点击添加引导操作,分别选择刚才上传的ossforpresto.sh脚本,创建以个引导操作步骤.集群创建好后,通过集群详情页的引导/软件配置:无异常来确定引导操作执行成功 验证 hive建表 下文举了一二创建数据在oss上的uservisits表的例子,请将表名,字段,oss信息替换为您oss

Hive简介、什么是Hive、为什么使用Hive、Hive的特点、Hive架构图、Hive基本组成、Hive与Hadoop的关系、Hive与传统数据库对比、Hive数据存储(来自学习资料)

1.1 Hive简介 1.1.1   什么是Hive Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能. 1.1.2   为什么使用Hive Ø  直接使用hadoop所面临的问题 人员学习成本太高 项目周期要求太短 MapReduce实现复杂查询逻辑开发难度太大   Ø  为什么要使用Hive 操作接口采用类SQL语法,提供快速开发的能力. 避免了去写MapReduce,减少开发人员的学习成本. 功能扩展很方便. 1.1.3   H

如何使用Big SQL访问大数据使用详解

在大数据技术推广.使用过程中,一个很大的挑战就是如何使用目前企业用户广泛使用的标准 SQL 来访问基于 Hadoop 平台的大数据,使用企业原有应用来访问大数据. 现在,使用大数据技术,通常使用 Hive.Pig 及 Java 程序来访问大数据,只能支持标准 SQL 的子集,需要用户学习新的编程语言,改写企业原有的应用,为了解决上述问题,IBM 推出了 Big SQL,它使用标准的 SQL 来访问基于 Hadoop 平台的 InfoSphere BigInsights,并提供标准的 JDBC.O

Excel 2000访问远程数据的四种方法

excel|访问|数据      Excel 2000作为一个电子表格软件,它不仅有强大的数据处理能力,而且它的报表功能也是十分强大.因而常常用Excel 2000去调用Access.SQL Server.Oracle.DB2等数据库软件建立的大型数据库的内容.用户可以在工作表中对这些数据进行筛选.排序.查询.编辑和打印报表,十分方便,这也是大多数人都熟悉的.但如何去调用这数据呢?本人在这里提供4种方法.    下面四种方法必须要先创建一个数据源,我们以SQL Server7.0内的样本数据库p

ASP.NET中利用SQLXML WEB服务访问XML数据

asp.net|sql|web|web服务|xml|访问|数据      引言 使用SQLXML 的Web 服务从你的ASP.net应用程序中直接访问XML 数据       SQLXML 是扩展SQL 服务器现有的对检索和储存XML 数据的支持的一套附加的工具. 有了SQLXML 3.0,你现在就能使用SQL服务器展示Web 服务了.在SQLXML的Web 服务可以让用户执行存储过程,用户定义的功能,并且它们支持模板.       在这篇文章中,你将看出怎样展示一个作为Web 服务并构建一个简

互联网公司网站访问的数据该如何应用?

文章描述:网站的访问数据为何很少被应用. 在互联网公司的众多数据中,营业的数据.财务的数据往往是比较容易得到应用,但是网站访问的数据往往是应用得最少的.这是什么原因,网站数据该如何应用呢? 先说应用少的原因: 1.首先是数据的重要程度的认识.         交易数据.财务数据往往是关系到公司的生死,如果这个数据出现问题了,那么公司的运转就会出现了紧张状况.所以上到CEO,CFO:下到公司的具体员工都在关心这个数据.         网站数据只关系到公司产品自身的好坏,而在公司发展的阶段,这点上

mfc-我已经通过MFC ODBC连上mysql数据库了,怎样以最简单的方式访问到数据?

问题描述 我已经通过MFC ODBC连上mysql数据库了,怎样以最简单的方式访问到数据? 已经连上mysql了,现在只需要读出数据库里的数据就行,希望能读出某一个字段中的所有值,按顺序这样读下来,存到我的程序里,怎么读最简单?本人比较菜,最好有易懂的代码,谢谢了 解决方案 就是select * from table这样的比较的理想 解决方案二: 参考:http://www.cnblogs.com/good90/archive/2012/03/04/2379371.htmlMFC通过ODBC连接