CarbonData集群模式体验

官方提供了一个快速上手的Quick-Start ,不过是采用spark-shell local模式的。我这里在实际集群环境做了下测试,并且记录了下过程,希望对大家有所帮助。

前言

之前专门写过一篇CarbonData的文章;由CarbonData想到了存储和计算的关系。可惜碍于时间问题到现在才开始真正的尝试。

编译打包

截止到本文章发出,CarbonData 明确支持的Spark版本是 1.5.2(默认) 以及 1.6.1。 而相应的,hadoop版本有2.2.0 和 2.7.2,理论上大部分2.0 之后的hadoop版本应该都是兼容的。

  • 下载源码:
git clone https://github.com/apache/incubator-carbondata.git carbondata
  • 安装 thrift (0.9.3)

Note:Thrift 主要是用来编译carbon-format模块用的,里面都是一些thrift文件,需要生成java文件。其他一些版本应该也是可以的,比如我用的就是0.9版本

  • 编译打包

打开pom.xml文件,然后找到<profiles>标签,然后加入

<profile>
      <id>hadoop-2.6.0</id>
      <properties>
        <hadoop.version>2.6.0</hadoop.version>
      </properties>
    </profile>

之后就可以指定hadoop 2.6.0 编译了。不过这个是可选项,如前所述,理论上大部分版本都是兼容的。

现在可以执行打包指令了:

cd carbondata
mvn package -DskipTests -Pspark-1.6.1 -Phadoop-2.6.0

我编译过很多次,都没遇到啥问题。如果有问题,不妨留言给我。这个时候你应该得到了carbondata的jar包了:

assembly/target/scala-2.10/carbondata_2.10-0.1.0-SNAPSHOT-shade-hadoop2.6.0.jar
  • 依赖说明

CarbonData 现阶段依赖于Kettle 以及 Hive Metastore。 依赖于Kettle 是因为一些数据处理逻辑Kettle已经有实现(譬如多线程等),而使用Hive Metastore 则是因为用Hive的人多。后面考虑会去除这些依赖,当前要体验的话,需要额外做些配置。

  • Kettle plugins 
 cd carbondata
 cp -r processing/carbonplugins/*  carbondata-kettle
 tar czvf carbondata-kettle.tar.gz carbondata-kettle

接着将这个包分发到各个Slave节点上(hadoop集群上),假定最后的目录是:

/data/soft/lib/java/carbondata-kettle

配置完成后检查下,确保carbondata-kettle下有个.kettle 的隐藏目录,该目录有kettle.properties文件。各个Slave节点都会加载该配置文件

  • Hive MetaStore 配置

首先下载一个mysql-connector,放到你准备提交Spark任务的机器上(有SPARK_HOME的机器上)的某个目录,比如我这里是:

  /Users/allwefantasy/Softwares/spark-1.6.1-bin-hadoop2.6/lib/mysql-connector-java-6.0.3.jar

然后将你的Hive 的hive-site.xml 文件拷贝到你的SPAKR_HOME/conf 目录下。conf 目录会被自动打包发送到集群上。另外一种选择是在提交的时候通过--files 指定hive-site.xml文件也是OK的,我们推荐第一种方式。

hive-site.xml文件一般会配置两个目录:

hive.exec.scratchdir
hive.metastore.warehouse.dir

你需要确保你之后需要运行的程序对着两个目录相应的权限。如果权限不足,程序会较为明显的告诉你问题所在,所以关注下命令行的输出即可。

  • 运行CarbonData

在 SPARK_HOME/lib 下还有三个datanucleus开头的包,我们也通过--jars 参数加上

./bin/spark-shell   \
--master yarn-client \
--num-executors 10 \
--executor-cores 3 \
--executor-memory 5G \
--driver-memory 3G \
--jars /Users/allwefantasy/CSDNWorkSpace/incubator-carbondata/assembly/target/scala-2.10/carbondata_2.10-0.1.0-SNAPSHOT-shade-hadoop2.6.0.jar,/Users/allwefantasy/Softwares/spark-1.6.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar,/Users/allwefantasy/Softwares/spark-1.6.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar,/Users/allwefantasy/Softwares/spark-1.6.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar,/Users/allwefantasy/Softwares/spark-1.6.1-bin-hadoop2.6/lib/mysql-connector-java-5.1.35.jar

所以--jars 一共有五个包:

  1. 我们编译好的carbondata_2.10-0.1.0-SNAPSHOT-shade-hadoop2.6.0.jar
  2. 我们下载的 mysql-connector-java-5.1.35.jar
  3. SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar
  4. SPARK_HOME/lib/datanucleus-core-3.2.10.jar
  5. SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar

然后就运行起来了,进入spark shell。

构建CarbonContext 对象

import org.apache.spark.sql.CarbonContext
import java.io.File
import org.apache.hadoop.hive.conf.HiveConf

val cc = new CarbonContext(sc, "hdfs://xxx/data/carbondata01/store")

CarbonContext 的第二个参数是主存储路径,确保你设置的目录,spark-shell 启动账号是具有写入权限。通常我会做如下操作:

hdfs dfs -chmod 777  /data/carbondata01/store

一些表信息,索引信息都是存在该目录的。如果写入权限不足,load数据的时候,会出现如下的异常:

ERROR 05-07 13:42:49,783 - table:williamtable02 column:bkup generate global dictionary file failed
ERROR 05-07 13:42:49,783 - table:williamtable02 column:bc generate global dictionary file failed
ERROR 05-07 13:42:49,783 - table:williamtable02 column:bid generate global dictionary file failed
ERROR 05-07 13:42:49,783 - generate global dictionary files failed
ERROR 05-07 13:42:49,783 - generate global dictionary failed
ERROR 05-07 13:42:49,783 - main
java.lang.Exception: Failed to generate global dictionary files
    at org.carbondata.spark.util.GlobalDictionaryUtil$.org$carbondata$spark$util$GlobalDictionaryUtil$$checkStatus(GlobalDictionaryUtil.scala:441)
    at org.carbondata.spark.util.GlobalDictionaryUtil$.generateGlobalDictionary(GlobalDictionaryUtil.scala:485)

如果下次你在启动spark-shell或者提交新的应用时,需要保持这个路径(storePath)的不变,否则会出现表不存在的问题。类似:

AUDIT 05-07 16:12:10,889 - [allwefantasy][allwefantasy][Thread-1]Table Not Found: williamtable02
org.spark-project.guava.util.concurrent.UncheckedExecutionException: org.apache.spark.sql.catalyst.analysis.NoSuchTableException
    at org.spark-project.guava.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4882)
    at org.spark-project.guava.cache.LocalCache$LocalLoadingCache.apply(LocalCache.java:4898)
    at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:394)
    at

设置Kettle 相关

因为引入了Kettle的库,而该库需要在运行的服务器上读取一些配置文件(如kettle.properties),所以需要做一个配置。我们前面已经将kettle 分发到各个节点了,现在把路径要告诉Carbon,通过如下的方式:

cc.setConf("carbon.kettle.home","/data/soft/lib/java/carbondata-kettle")

如果这个目录在Slave节点不存在,你进入Spark 各个节点(Executor)的日志,可以看到很明显的错误,提示 kettle.properties 找不到。 而更明显的现象是,数据载入会不成功。

Hive  相关配置

理论上hive-site.xml的配置里已经有这些信息了,不过也可以显示设置下。

cc.setConf("hive.metastore.warehouse.dir", "hdfs://cdncluster/user/hive/warehouse")
cc.setConf(HiveConf.ConfVars.HIVECHECKFILEFORMAT.varname, "false")

生产数据

到目前为止 CarbonContext 已经设置完毕,可以往里面装载数据了。现阶段,CarbonData 支持CSV数据直接装载进CarbonData。

如果你已经有或者可以自己产生csv相关的数据,则可以忽略本节。

另外其实CarbonData 也提供了标准的Spark SQL API(Datasource)方便你导入数据,参看Carbondata-Interfaces。内部本质也是帮你把数据转化成csv然后再导入的:

def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
      // To avoid derby problem, dataframe need to be writen and read using CarbonContext
      require(dataFrame.sqlContext.isInstanceOf[CarbonContext],
        "Error in saving dataframe to carbon file, must use CarbonContext to save dataframe"
      )

      val storePath = dataFrame.sqlContext.asInstanceOf[CarbonContext].storePath
      val options = new CarbonOption(parameters)
      val dbName = options.dbName
      val tableName = options.tableName

      // temporary solution: write to csv file, then load the csv into carbon
      val tempCSVFolder = s"$storePath/$dbName/$tableName/tempCSV"
      dataFrame.write
        .format(csvPackage)
        .option("header", "true")
        .mode(SaveMode.Overwrite)
        .save(tempCSVFolder)

这里也介绍另外一种方式,以从ES导出数据为csv为例:

  • 下载一个配置文件配置文件,根据里面的要求进行修改

并且将修改后的配置上传到hdfs上。假设路径是: 

hdfs://cluster/tmp/test.json

下载一个jar包:

链接: http://pan.baidu.com/s/1bZWphO 密码: kf5y
  • 提交到集群
./bin/spark-submit   \
--class streaming.core.StreamingApp   \
--name "es导出成csv文件"  \
--master yarn-cluster   \
--executor-memory 2G   \
--driver-memory 6G   \
--conf "spark.locality.wait=10ms"   \
--num-executors 35   \
--executor-cores 3  \\/Users/allwefantasy/CSDNWorkSpace/streamingpro/target/streamingpro-0.2.0-SNAPSHOT-online-1.6.1.jar  \
-streaming.name estocsvn \
-streaming.job.file.path hdfs://cluster/tmp/test.json \
-streaming.platform spark

这样你就生成了一个csv格式的数据

创建表

cc.sql("create table if not exists williamtable04 (sid string,  r double,time string,domain string,month Int,day Int,mid string) STORED BY 'org.apache.carbondata.format'")

貌似不支持float,需要用double类型。

装载CSV数据

cc.sql(s"load data inpath 'hdfs://cluster/tmp/csv-table1/part-00001.csv' into table williamtable04")

csv文件需要是.csv 为后缀,并且需要带有header。当然,如果你生成的csv文件没有header,也可以通过在load data时指定FIELDHEADER来完成。

查询

cc.sql("select count(*) from williamtable04").show

后话

因为现阶段CarbonData 依赖于Hive/Kettle,所以需要做一些额外配置,自身的配置已经足够简单,只需要个storePath。在集群环境里,我们还需要注意权限相关的问题。

时间: 2024-11-02 21:24:30

CarbonData集群模式体验的相关文章

从单机到集群会话的管理之集群模式二(更大的集群)

<从单机到集群会话的管理之集群模式一>中讲到的全节点复制的网络流量随节点数量增加呈平方趋势增长,也正是因为这个因素导致无法构建较大规模的集群,为了使集群节点能更加大,首要解决的就是数据复制时流量增长的问题,下面将介绍另外一种会话管理方式,每个会话只会有一个备份,它使会话备份的网络流量随节点数量的增加呈线性趋势增长,大大减少了网络流量和逻辑操作,可构建较大的集群. 下面看看这种方式具体的工作机制,集群一般是通过负载均衡对外提供整体服务,所有节点被隐藏在后端组成一个整体.前面各种模式的实现都无需负

Zookeeper集群模式无法部署云服务器【java.net.BindException: 无法指定被请求的地址 (Bind failed)】的解决

Zookeeper集群模式无法部署云服务器[java.net.BindException: 无法指定被请求的地址 (Bind failed)]的解决. 正文 在云服务器(阿里云.腾讯云)上部署Zookeeper集群模式时,无法成功的原因有很多,网上主要提到了端口被占用(未开放)和防火墙开启两种问题,类似的博客很多,本文不再赘述. 如果你已经针对上述两种情况做了修改依然没有成功,那么就可能出现了本篇的问题. Zookeeper在启动时,并不会打印信息,即使集群启动失败,依然会显示: 这样我们看不到

《Spark官方文档》集群模式概览

Spark 1.6.0  译者:dlbrant 集群模式概览 本文简要描述了Spark在集群中各个组件如何运行.想了解如何在集群中启动Spark应用,请参考application submission guide . 组件 Spark应用在集群上运行时,包括了多个独立的进程,这些进程之间通过你的主程序(也叫作驱动器,即:driver)中的SparkContext对象来进行协调. 特别要指出的是,SparkContext能与多种集群管理器通信(包括:Spark独立部署时自带的集群管理器,Mesos

NetApp集群模式Data ONTAP展露新颜

 NetApp(纳斯达克股票代码:NTAP)宣布推出集群模式Data ONTAP 8.2,这一新版本是其旗舰存储操作系统打出的又一记重拳.新的软件产品使得企业和云服务提供商能够快速且经济高效地提供新的服务及性能,同时最大限度地支持应用程序无中断正常运行.集群模式 Data ONTAP 8.2 突破了传统孤立硬件的性能.可用性和效率极限,支持 IT 根据不断变化的业务和应用程序需求无中断地调整存储基础架构. 集群模式Data ONTAP具备行业领先的功能: ● 无中断运行--可靠性超过99.999

NetApp借助集群模式Data ONTAP发力云服务

提到云服务提供的难度, 那么部署和管理公有云.私有云以及混合云的基础设施应该算是其中之一. 众多http://www.aliyun.com/zixun/aggregation/13793.html">云服务提供商本身的云都是各有思路, 实现方法千差万别,标准的缺失一定程度上阻碍了云的大规模推广.当下 NetApp提出了一种 新的解决方案用于管理不同的云模型,这种解决方案基于三种战略理念:通用的数据平台(universal data platform). 动态的数据移植(dynamic da

从单机到集群会话的管理之集群模式一

为什么要使用集群?主要有两方面原因:一是对于一些核心系统要求长期不能中断服务,为了提供高可用性我们需要由多台机器组成的集群:另外一方面,随着访问量越来越大且业务逻辑越来越复杂,单台机器的处理能力已经不足以处理如此多且复杂的逻辑,于是需要增加若干台机器使整个服务处理能力得到提升. 如果说一个web应用不涉及会话的话,那么做集群是相当简单的,因为节点都是无状态的,集群内各个节点无需互相通信,只需要将各个请求均匀分配到集群节点即可.但基本所有web应用都会使用会话机制,所以做web应用集群时整个难点在

MongoDB 集群,主从复制集群模式,replSet集群模式

一.MongoDB -Master-Slave mode: 1.master 节点配置(192.168.99.16) master节点创建/data/master 数据目录,给mongo的data文件夹添加写权限 sudo mkdir -pv /data/masterdb && sudo chmod 777 /data/masterd 运行master mongd sudo nohup mongod --master --dbpath /data/masterdb & 2.slav

如何管理一台集群的虚拟机

现如今的企业组织需要保证全年的每一天都24小时全天候的正常运营.而且,他们的服务必须保持时刻在线,否则就会失去客户.同时,在今天的全球市场上,确保企业的系统总是可用,以使得企业组织保持竞争优势,进而保持和提升用户满意度是至关重要的.数据中心的高度可用性可以通过几种方式来实现,但最常见的和最简单的方式则是通过服务器虚拟化和故障转移群集(Failover Cluster). 服务器虚拟化为许多企业组织的IT部门带来了诸多的益处,包括允许他们将多款不同的系统整合到一台单一的主机服务器,带来了更高的资源

基于淘宝平台价值实现的“小而美”产业集群

[论道"小而美"(第三季)] 1.基于工业文明的"小而美"产业集群 (1)意大利"小而美"产业集群 意大利中小企业集群的形成有悠久的历史.从意大利中部到北部地区农村一直是小农形态,产生大量的农村剩余劳动力,农业负担较大,农民不得不从事副业生产.贵族阶层和有钱人的住房.服装.装饰品.家具.工艺品便成为农民副业生产的对象,逐渐形成了手工业传统.二战后,在意大利手工业基础上迅速发展形成了与人们生活息息相关的日用品产业集群.根据意大利统计局的评判标准,全