Spark处理存储于Hive中的Twitter数据

本文将介绍使用Spark batch作业处理存储于Hive中Twitter数据的一些实用技巧。

首先我们需要引入一些依赖包,参考如下:
name := "Sentiment"version := "1.0"
 
scalaVersion := "2.10.6"
 
assemblyJarName in assembly := "sentiment.jar"
 
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.6.0" % "provided"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"
 
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
 
assemblyMergeStrategy in assembly := { 
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard 
  case x => MergeStrategy.first}

编写一个Scala case class用于存储解析好的Twitter Json数据:
case class Tweet(coordinates: String, geo:String, handle: String,
                   hashtags: String, language: String,
                   location: String, msg: String, time: String,
                   tweet_id: String, unixtime: String,
                   user_name: String, tag: String,
                   profile_image_url: String,
                   source: String, place: String, friends_count: String,
                   followers_count: String, retweet_count: String,
                   time_zone: String, sentiment: String,
                   stanfordSentiment: String)

引入以下的包:
import java.util.Properties
import com.vader.SentimentAnalyzer
import edu.stanford.nlp.ling.CoreAnnotations
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations
import edu.stanford.nlp.pipeline.StanfordCoreNLP
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{<span class="wp_keywordlink_affiliate"><a data-original-title="View all posts in Spark" href="/archives/tag/spark" title="" target="_blank">Spark</a></span>Conf, <span class="wp_keywordlink_affiliate"><a data-original-title="View all posts in Spark" href="/archives/tag/spark" title="" target="_blank">Spark</a></span>Context}
import org.apache.spark.serializer.KryoSerializer

import org.apache.spark.sql._

用Scala编写的用于从Hive中读取数据的Spark代码片段:
def main(args: Array[String]) {
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
  val logger: Logger = Logger.getLogger("com.iteblog.sentiment.TwitterSentimentAnalysis")
  val sparkConf = new SparkConf().setAppName("TwitterSentimentAnalysis")
  sparkConf.set("spark.streaming.backpressure.enabled", "true")
  sparkConf.set("spark.cores.max", "32")
  sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
  sparkConf.set("spark.sql.tungsten.enabled", "true")
  sparkConf.set("spark.eventLog.enabled", "true")
  sparkConf.set("spark.app.id", "Sentiment")
  sparkConf.set("spark.io.compression.codec", "snappy")
  sparkConf.set("spark.rdd.compress", "true")
  val sc = new SparkContext(sparkConf)
  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
  import sqlContext.implicits._
  val tweets = sqlContext.read.json("hdfs://www.iteblog.com:8020/social/twitter")
  sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
  tweets.printSchema()
  tweets.count
  tweets.take(5).foreach(println)

其中我们需要注意的是我们需要创建Hive context而不是标准的SQL context

在运行我们的代码之前,先确认Hive中存储Twitter Json数据的表,以及用于存放结果数据的表格是否存在,本文用于存储结果数据的表格使用了ORC 格式
beeline
!connect jdbc:hive2://localhost:10000/default;
!set showHeader true;
set hive.vectorized.execution.enabled=true;
set hive.execution.engine=tez;
set hive.vectorized.execution.enabled =true;
set hive.vectorized.execution.reduce.enabled =true;
set hive.compute.query.using.stats=true;
set hive.cbo.enable=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
show tables;
describe sparktwitterorc;
describe twitterraw;
describe sparktwitterorc;
analyze table sparktwitterorc compute statistics;
analyze table sparktwitterorc compute statistics for columns;

上面名为twitterraw的表格是用于存放Twitter Json数据的表;而名为sparktwitterorc的表格是用于存放Spark处理结果的表。

如何将RDD或者DataFrame中的数据写入到Hive ORC表呢?操作如下:
outputTweets.toDF().write.format("orc").mode(SaveMode.Overwrite).saveAsTable("default.sparktwitterorc")

在编译的程序时候设置JVM相关参数
export SBT_OPTS="-Xmx2G -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=2G -Xss2M  -Duser.timezone=GMT"
sbt -J-Xmx4G -J-Xms4G assembly

将Spark作业提交到YARN集群:
spark-submit --class com.iteblog.sentiment.TwitterSentimentAnalysis --master yarn-client sentiment.jar --verbose

这里附上我们的rawtwitter表建表语句:
CREATE TABLE rawtwitter
(
   handle              STRING,
   hashtags            STRING,
   msg                 STRING,
   language            STRING,
   time                STRING,
   tweet_id            STRING,
   unixtime            STRING,
   user_name           STRING,
   geo                 STRING,
   coordinates         STRING,
   `location`          STRING,
   time_zone           STRING,
   retweet_count       STRING,
   followers_count     STRING,
   friends_count       STRING,
   place               STRING,
   source              STRING,
   profile_image_url   STRING,
   tag                 STRING,
   sentiment           STRING,
   stanfordsentiment   STRING
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 'hdfs://www.iteblog.com:8020/social/twitter'

时间: 2024-09-28 01:25:34

Spark处理存储于Hive中的Twitter数据的相关文章

Hive中如何查看数据来源文件和具体位置

通常用户在HIVE中用SELECT语句出来结果,无法确定结果是来自哪个文件或者具体位置信息,HIVE中考虑到了这点,在Virtual Column虚列中可以指定三个静态列: 1.  INPUT__FILE__NAME        map任务读入File的全路径 2.  BLOCK__OFFSET__INSIDE__FILE       如果是RCFile或者是SequenceFile块压缩格式文件则显示Block file Offset,也就是当前快在文件的第一个字偏移量,如果是TextFil

Hive中如何确定map数

Hive 是基于 Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的 sql 查询功能,可以将 sq l语句转换为 MapReduce 任务进行运行.当运行一个 hql 语句的时候,map 数是如何计算出来的呢?有哪些方法可以调整 map 数呢? 本文测试集群版本:cdh-4.3.0 . hive 默认的 input format 在 cdh-4.3.0 的 hive 中查看 hive.input.format 值(为什么是hive.input.format?

漫谈数据仓库之拉链表(原理、设计以及在Hive中的实现)

本文将会谈一谈在数据仓库中拉链表相关的内容,包括它的原理.设计.以及在我们大数据场景下的实现方式. 全文由下面几个部分组成: 先分享一下拉链表的用途.什么是拉链表. 通过一些小的使用场景来对拉链表做近一步的阐释,以及拉链表和常用的切片表的区别. 举一个具体的应用场景,来设计并实现一份拉链表,最后并通过一些例子说明如何使用我们设计的这张表(因为现在Hive的大规模使用,我们会以Hive场景下的设计为例). 分析一下拉链表的优缺点,并对前面的提到的一些内容进行补充说明,比如说拉链表和流水表的区别.

Hive中 分区表和桶

Hive分区表 在hive Select 查询中一般会扫描整个表内容,会消耗很多时间做没必要的工作.有时候只需要扫描表中关心的一部分数据,因此建表时引入了partition概念.分许表指的是在创建表时指定的partition的分区空间. Hive可以对数据按照某列或者某些列进行分区管理,所谓分区我们可以拿下面的列子进行解释. 当前互联网应用每天都要存储大量的日志文件.几G.十几G甚至更大都是有可能的.存储日志,其中必然有个属性是日志产生的日期.在产生分区时,就可以按照日志产生的日期列进行划分.把

hive中partition如何使用

网上有篇关于hive的partition的使用讲解的比较好,转载了: 一.背景 1.在Hive Select查询中一般会扫描整个表内容,会消耗很多时间做没必要的工作.有时候只需要扫描表中关心的一部分数据,因此建表时引入了partition概念. 2.分区表指的是在创建表时指定的partition的分区空间. 3.如果需要创建有分区的表,需要在create表的时候调用可选参数partitioned by,详见表创建的语法结构. 二.技术细节 1.一个表可以拥有一个或者多个分区,每个分区以文件夹的形

HIVE中关于collect_set与explode函数妙用

hive的复合数据类型 Hive中的列支持使用三类复杂的集合数据类型,即:array,map及struct,这些类型的名称是保留字,具体用法可参见该篇博文,里面有关于三类基本集合数据类型的操作实例,注:map中可嵌套array类型. 例如,定义表: create table example (      device_id string,      login_ip array<string>,      user_info map<string,array<string>&

Apache Kylin权威指南2.2 在Hive中准备数据

2.2 在Hive中准备数据 2.1节介绍了Kylin中的常见概念.本节将介绍准备Hive数据的一些注意事项.需要被分析的数据必须先保存为Hive表的形式,然后Kylin才能从Hive中导入数据,创建Cube. Apache Hive是一个基于Hadoop的数据仓库工具,最初由Facebook开发并贡献到Apache软件基金会.Hive可以将结构化的数据文件映射为数据库表,并可以将SQL语句转换为MapReduce或Tez任务进行运行,从而让用户以类SQL(HiveQL,也称HQL)的方式管理和

在 Apache Hive 中轻松生存的12个技巧

在 Apache Hive 中轻松生存的12个技巧 Hive 可以让你在 Hadoop 上使用 SQL,但是在分布式系统上优化 SQL 则有所不同.这里是让你可以轻松驾驭 Hive 的12个技巧. Hive 并不是关系型数据库(RDBMS),但是它大多数时候都表现得像是一个关系型数据库一样,它有表.可以运行 SQL.也支持 JDBC 和 ODBC. 这种表现既有好的一面,也有不好的一面:Hive 并不像关系型数据库那样执行 SQL 查询.我在 Hive 上花费了大量时间,光是我自己在工作中就为了

大数据-spark能在WEB项目中使用吗?

问题描述 spark能在WEB项目中使用吗? ssh的web项目中想使用spark大数据分析,导入spark的jar包 在初始化sparkconf时报求助啊.............大神在哪里 解决方案 用法有问题,根本不是包的问题.spark更像是操作系统,你的调用是在其上的,而不是平行调用以为加个包就可以了