Spark 如何写入HBase/Redis/MySQL/Kafka

一些概念

一个partition 对应一个task,一个task 必定存在于一个Executor,一个Executor 对应一个JVM.

  • Partition 是一个可迭代数据集合
  • Task 本质是作用于Partition的线程

问题

Task 里如何使用Kafka Producer 将数据发送到Kafaka呢。 其他譬如HBase/Redis/MySQL 也是如此。

解决方案

直观的解决方案自然是能够在Executor(JVM)里有个Prodcuer Pool(或者共享单个Producer实例),但是我们的代码都是现在Driver端执行,然后将一些函数序列化到Executor端执行,这里就有序列化问题,正常如Pool,Connection都是无法序列化的。

一个简单的解决办法是定义个Object 类,

譬如

object SimpleHBaseClient {
  private val DEFAULT_ZOOKEEPER_QUORUM = "127.0.0.1:2181"

  private lazy val (table, conn) = createConnection

  def bulk(items:Iterator) = {
      items.foreach(conn.put(_))
      conn.flush....
  }
 ......
}

然后保证这个类在map,foreachRDD等函数下使用,譬如:

dstream.foreachRDD{ rdd =>
    rdd.foreachPartition{iter=>
        SimpleHBaseClient.bulk(iter)
    }
}

为什么要保证放到foreachRDD /map 等这些函数里呢?Spark的机制是先将用户的程序作为一个单机运行(运行者是Driver),Driver通过序列化机制,将对应算子规定的函数发送到Executor进行执行。这里,foreachRDD/map 等函数都是会发送到Executor执行的,Driver端并不会执行。里面引用的object 类 会作为一个stub 被序列化过去,object内部属性的的初始化其实是在Executor端完成的,所以可以避过序列化的问题。

Pool也是类似的做法。然而我们并不建议使用pool,因为Spark 本身已经是分布式的,举个例子可能有100个executor,如果每个executor再搞10个connection的pool,则会有100*10 个链接,Kafka也受不了。一个Executor 维持一个connection就好。

关于Executor挂掉丢数据的问题,其实就看你什么时候flush,这是一个性能的权衡。

时间: 2024-09-16 14:00:59

Spark 如何写入HBase/Redis/MySQL/Kafka的相关文章

spark streaming 中使用saveAsNewAPIHadoopDataset方法写入hbase中,从checkpoint中恢复时报错

问题描述 最近写了一个从Kafka读取数据,处理之后通过saveAsNewAPIHadoopDataset方法写入到hbase中,正常运行的时候没有报错,写入也正常,但是当手动停止应用,再次执行(通过Checkpoint恢复)的时候就会报错,跪求大神们解答!!报错信息如下:15/12/2216:26:52WARNVerifiableProperties:Propertyserializer.classisnotvalid15/12/2216:26:57WARNFileOutputCommitte

如何提高spark批量读取HBase数据的性能

问题描述 Configurationconf=HBaseConfiguration.create();StringtableName="testTable";Scanscan=newScan();scan.setCaching(10000);scan.setCacheBlocks(false);conf.set(TableInputFormat.INPUT_TABLE,tableName);ClientProtos.Scanproto=ProtobufUtil.toScan(scan)

NODEJS连接REDIS/MYSQL

问题描述 哪位大侠提供window下的NODEJS和REDIS.MYSQL的搭建流程 解决方案 解决方案二:还有NODEJS和ZeroMQ的例子解决方案三:不清楚,顶一下解决方案四:github上面有很多库的,自己可以去找一下

【Spark Summit East 2017】使用Kafka, Spark, and Kudu构建实时BI系统

本讲义出自Ruhollah Farchtchi在Spark Summit East 2017上的演讲,主要介绍了在面对处理实时流数据时的一个关键性挑战就是被捕获到的数据的格式不是查询中的最佳解析格式,那么如何构建实时的商业智能系统就成为了一个挑战,本讲义介绍了如何使用Kafka, Spark, and Kudu构建实时BI系统.

【Spark Summit East 2017】 使用Kafka Connect和Spark Streaming构建实时数据管道

本讲义出自Ewen Cheslack Postava在Spark Summit East 2017上的演讲,主要介绍了面对使用Spark Streaming构建大规模实时数据管道的挑战,Kafka项目最近推出了新的工具-- Kafka Connect,该工具将帮助简化从Kafka导入和导出数据,Ewen Cheslack Postava分享了如何使用Kafka Connect和Spark Streaming构建实时数据管道.

spark批量插入hbase

问题描述 当rdd是自定义的list的时候可以这么写valrdd=List((1,"lilei",14),(2,"hanmei",18),(3,"someone",38))vallocalData=sc.parallelize(rdd).map(convert)localData.saveAsHadoopDataset(jobConf)defconvert(triple:(String,String,String))={valp=newPut(B

从MySQL到Redis,提升数据迁移的效率

做开发的同学都知道,一旦设计到底层存储优化,数据结构甚至数据库的变更,通常都会进行数据迁移的工作.如果系统运行时间过长,数据迁移的数量可能非常庞大.这时候,如何进行高效的数据迁移,实际也是上线质量的直接影响因素之一. 下面内容是转载的一个小技巧(原文),无法适用于各种变化的场景,仅供大家参考. 场景是从MySQL中将数据导入到Redis的Hash结构中.当然,最直接的做法就是遍历MySQL数据,一条一条写入到Redis中.这样可能没什么错,但是速度会非常慢.而如果能够使MySQL的查询输出数据直

从MySQL到Redis 提升数据迁移的效率

场景是从MySQL中将数据导入到Redis的Hash结构中.当然,最直接的做法就是遍历MySQL数据,一条一条写入到Redis中.这样可能没什么错,但是速度会非常慢.而如果能够使MySQL的查询输出数据直接能够与Redis命令行的输入数据协议相吻合,可能就省事多了. 根据测试800w的数据迁移,时间从90分钟缩短到2分钟. 具体案例如下: MySQL数据表结构: CREATE TABLE events_all_time ( id int(11) unsigned NOT NULL AUTO_IN

redis作为mysql的缓存服务器(读写分离) (转)

一.redis简介Redis是一个key-value存储系统.和Memcached类似,为了保证效率,数据都是缓存在内存中.区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步.在部分场合可以对关系数据库起到很好的补充作用.它提供了Java,C/C++(hiredis),C#,PHP,JavaScript,Perl,Object-C,Python,Ruby等客户端,使用很方便. 二.架构图<ignore_js_