// 亲自摸索,送给大家。原创文章,转载请注明出处。
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.hadoop.hbase.client.{Mutation, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.OutputFormat
/**
 * Spark Streaming ETL job: reads comma-separated records from Kafka, counts the
 * records of every non-empty micro-batch with Spark SQL, and writes that count
 * to the HBase table `obd_pv` (column `pv:pv`).
 *
 * Created by sunyulong on 16/9/19.
 *
 * The entry point is an explicit `main` method instead of `extends App`. The
 * Spark documentation warns that subclasses of `scala.App` may not work
 * correctly: the body of an `App` runs through `delayedInit`, so its fields are
 * not initialised when the object is loaded on an executor.
 */
object OBDSQL {

  /** Kafka topics to consume, mapped to the per-topic count handed to `KafkaUtils.createStream`. */
  val topics: Map[String, Int] = Map("aaa" -> 1)

  /** ZooKeeper quorum, used by both the Kafka receiver and the HBase client. */
  val zk: String = "10.1.11.71,10.1.11.72,10.1.11.73"

  /**
   * Builds the streaming pipeline, starts it and blocks until it terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): the master is hard-coded here; confirm this is intended rather
    // than leaving the choice to spark-submit.
    val conf = new SparkConf().setMaster("yarn-cluster").setAppName("SparkStreamingETL")

    // One micro-batch per second.
    val ssc = new StreamingContext(conf, Seconds(1))

    // Kafka messages arrive as (key, value) pairs; only the value (the text line) is kept.
    val lines = KafkaUtils.createStream(ssc, zk, "sparkStreaming", topics).map(_._2)

    val sqlContext = new SQLContext(ssc.sparkContext)
    // Brings `toDF` into scope for RDDs of case classes.
    import sqlContext.implicits._

    lines.foreachRDD { rdd =>
      // Skip empty batches so that no zero-count rows are written.
      if (!rdd.isEmpty) {
        // NOTE(review): a record with fewer than four fields, or with a non-numeric
        // value, makes the parsing task throw; confirm that failing the batch is
        // the intended reaction to malformed input.
        rdd
          .map(_.split(","))
          .map(p => Persion(p(0), p(1).trim.toDouble, p(2).trim.toInt, p(3).trim.toDouble))
          .toDF
          .registerTempTable("oldDriver")

        val rs = sqlContext.sql("select count(1) from oldDriver")

        rs.rdd
          .map { row =>
            // Row key: the current time in milliseconds, encoded as a long.
            val put = new Put(Bytes.toBytes(System.currentTimeMillis()))
            // The count is stored as its string representation in pv:pv.
            put.addColumn(Bytes.toBytes("pv"), Bytes.toBytes("pv"), Bytes.toBytes(row(0).toString))
            // The key of the pair is left empty; the Put carries the row key itself.
            (new ImmutableBytesWritable, put)
          }
          .saveAsNewAPIHadoopDataset(hbaseJobConf())
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /** Creates the Hadoop job configuration that routes output to the HBase table `obd_pv`. */
  private def hbaseJobConf(): JobConf = {
    val hconf = HBaseConfiguration.create()
    hconf.set("hbase.zookeeper.quorum", zk)
    hconf.set("hbase.zookeeper.property.clientPort", "2181")
    hconf.set("hbase.defaults.for.version.skip", "true")
    hconf.set(TableOutputFormat.OUTPUT_TABLE, "obd_pv")
    hconf.setClass(
      "mapreduce.job.outputformat.class",
      classOf[TableOutputFormat[String]],
      classOf[OutputFormat[String, Mutation]])
    new JobConf(hconf)
  }
}
/**
 * Row type for the records parsed from the Kafka stream. Spark SQL derives the
 * schema of the `oldDriver` temp table from these fields.
 *
 * @param gender    first comma-separated field, kept as text
 * @param tall      second field, parsed as a double
 * @param age       third field, parsed as an integer
 * @param driverAge fourth field, parsed as a double
 */
case class Persion(
  gender: String,
  tall: Double,
  age: Int,
  driverAge: Double
)
// 时间: 2024-12-27 05:23:16