Sparkstreaming读取Kafka消息再结合SparkSQL,将结果保存到HBase

亲自摸索,送给大家,原创文章,转载注明哦。

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.hadoop.hbase.client.{Mutation, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.OutputFormat
/**
  * Created by sunyulong on 16/9/19.
  */
object OBDSQL extends App{
  //kafka topic
  val topics = List(("aaa",1)).toMap
  //zookeeper
  val zk = "10.1.11.71,10.1.11.72,10.1.11.73"
  val conf = new SparkConf() setMaster "yarn-cluster" setAppName "SparkStreamingETL"
  //create streaming context
  val ssc = new StreamingContext(conf , Seconds(1))
  //get every lines from kafka
  val lines = KafkaUtils.createStream(ssc,zk,"sparkStreaming",topics).map(_._2)
  //get spark context
  val sc = ssc.sparkContext
  //get sql context
  val sqlContext = new SQLContext(sc)
  //process every rdd AND save as HTable
  lines.foreachRDD(rdd => {
    //case class implicits
    import sqlContext.implicits._
    //filter empty rdd
    if (!rdd.isEmpty) {
      //register a temp table
      rdd.map(_.split(",")).map(p => Persion(p(0), p(1).trim.toDouble, p(2).trim.toInt, p(3).trim.toDouble)).toDF.registerTempTable("oldDriver")
      //use spark SQL
      val rs = sqlContext.sql("select count(1) from oldDriver")
      //create hbase conf
      val hconf = HBaseConfiguration.create()
      hconf.set("hbase.zookeeper.quorum",zk)
      hconf.set("hbase.zookeeper.property.clientPort", "2181")
      hconf.set("hbase.defaults.for.version.skip", "true")
      hconf.set(TableOutputFormat.OUTPUT_TABLE, "obd_pv")
      hconf.setClass("mapreduce.job.outputformat.class", classOf[TableOutputFormat[String]], classOf[OutputFormat[String, Mutation]])
      val jobConf = new JobConf(hconf)
      //convert every line to hbase lines
      rs.rdd.map(line => (System.currentTimeMillis(),line(0))).map(line =>{
        //create hbase put
        val put = new Put(Bytes.toBytes(line._1))
        //add column
        put.addColumn(Bytes.toBytes("pv"),Bytes.toBytes("pv"),Bytes.toBytes(line._2.toString))
        //retuen type
        (new ImmutableBytesWritable,put)
      }).saveAsNewAPIHadoopDataset(jobConf)     //save as HTable
    }
  })
  //streaming start
  ssc start()
  ssc awaitTermination()
}

//the entity of persion for SparkSQL
case class Persion(gender: String, tall: Double, age: Int, driverAge: Double)
时间: 2024-12-27 05:23:16

Sparkstreaming读取Kafka消息再结合SparkSQL,将结果保存到HBase的相关文章

SparkStreaming与Kafka整合遇到的问题及解决方案

前言 最近工作中是做日志分析的平台,采用了sparkstreaming+kafka,采用kafka主要是看中了它对大数据量处理的高性能,处理日志类应用再好不过了,采用了sparkstreaming的流处理框架 主要是考虑到它本身是基于spark核心的,以后的批处理可以一站式服务,并且可以提供准实时服务到elasticsearch中,可以实现准实时定位系统日志. 实现 Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式. 一. 基于Receiver方式

Kafka消息delivery可靠性保证(Message Delivery Semantics)

原文见:http://kafka.apache.org/documentation.html#semantics kafka在生产者和消费者之间的传输是如何保证的,我们可以知道有这么几种可能提供的delivery guarantee: At most once 消息可能会丢,但绝不会重复传输 At least one 消息绝不会丢,但可能会重复传输 Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的. 值得注意的是,当Producer向broker发送消息时

android-写入文件然后读取,然后再像一个字符串一样保存内容

问题描述 写入文件然后读取,然后再像一个字符串一样保存内容 我想写进一个文件,然后,我想把内容保存在一个 StringBuffer 里面.最后再通过一个 TextView显示,但是什么也不显示. public class MainActivity extends Activity { String finall; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceS

写文件-Java读取文件然后再修改回去

问题描述 Java读取文件然后再修改回去 有一个文件存着很多对象,现在读取其中的一个对象,然后 将其修改,最终再将这个对象再保存回原文件.这个怎么实现.C语言由于可以控制读文件指针,所以可以定位指正到指定的位置,可是Java怎么修改文件指针? 解决方案 可以试试RandomAccessFile类. 解决方案二: 在Java下使用DOM来读取/修改Xml文件java 修改 读取properties文件 解决方案三: 可以试试RandomAccessFile类. 解决方案四: 可以试试RandomA

Kafka 消息监控 - Kafka Eagle

1.概述 在开发工作当中,消费 Kafka 集群中的消息时,数据的变动是我们所关心的,当业务并不复杂的前提下,我们可以使用 Kafka 提供的命令工具,配合 Zookeeper 客户端工具,可以很方便的完成我们的工作.随着业务的复杂化,Group 和 Topic 的增加,此时我们使用 Kafka 提供的命令工具,已预感到力不从心,这时候 Kafka 的监控系统此刻便尤为显得重要,我们需要观察消费应用的详情. 监控系统业界有很多杰出的开源监控系统.我们在早期,有使用 KafkaMonitor 和

MSMQ如何异步一次性读取所有消息

问题描述 我用System.Messaging.Messagem=mq.EndReceive(asyncResult.AsyncResult);只能依依取出消息,我要一次性取出所有的消息 解决方案 解决方案二:没有人会吗解决方案三:这年头还有人用MSMQ么!?LZ要做么?提高性能也可以用其他方法的解决方案四:MSMQ很落后吗?我还以为多先进的呢解决方案五:MessageEnumeratormyEnumerator=mq.GetMessageEnumerator(); 一次性读取所有消息似乎不存在

Android编程之SMS读取短信并保存到SQLite的方法_Android

本文实例讲述了Android编程之SMS读取短信并保存到SQLite的方法.分享给大家供大家参考,具体如下: Android 之 SMS 短信在Android系统中是保存在SQLite数据库中的,但不让其它程序访问(Android系统的安全机制) 现在我们在读取手机内的SMS短信,先保存在我们自己定义的SQLite数据库中,然后读取SQLite数据库提取短信,并显示 SMS短信SQLite存取代码: package com.homer.sms; import java.sql.Date; imp

Android编程之SMS读取短信并保存到SQLite的方法

本文实例讲述了Android编程之SMS读取短信并保存到SQLite的方法.分享给大家供大家参考,具体如下: Android 之 SMS 短信在Android系统中是保存在SQLite数据库中的,但不让其它程序访问(Android系统的安全机制) 现在我们在读取手机内的SMS短信,先保存在我们自己定义的SQLite数据库中,然后读取SQLite数据库提取短信,并显示 SMS短信SQLite存取代码: package com.homer.sms; import java.sql.Date; imp

android读取Assets图片资源保存到SD卡实例

本文为大家详细介绍下android读取Assets图片资源保存到SD卡的具体实现,感兴趣的各位可以参考下哈,希望对大家有所帮助   复制代码 代码如下: public class ReadBitmap { public void readByte(Context c, String name, int indexInt) { byte[] b = null; int[] intArrat = c.getResources().getIntArray(indexInt); try { AssetM