Flink 案例整合

1.概述

  Flink 1.1.0 版本已经在官方发布了,官方博客于 2016-08-08 更新了 Flink 1.1.0 的变动。在这 Flink 版本的发布,添加了 SQL 语法这一特性。这对于业务场景复杂,依赖于 SQL 来分析统计数据,算得上是一个不错的福利。加上之前有同学和朋友邮件中提到,Flink 官方给的示例运行有困难,能否整合一下 Flink 的案例。笔者通过本篇博客来解答一下相关疑问。

2.内容

2.1 集群部署

  首先,集群的部署需要 JDK 环境。下载 JDK 以及配置 JAVA_HOME 环境,这里就不详述了,比较简单。然后,我们去下载 Flink 1.1.0 的安装包,进入到下载页面,如下图所示:

  这里需要注意的是,Flink 集群的部署,本身不依赖 Hadoop 集群,如果用到 HDFS 或是 HBase 中的存储数据,就需要选择对应的 Hadoop 版本。大家可以根据 Hadoop 集群的版本,选择相应的 Flink 版本下载。

  下载好 Flink 1.1.0 后,按以下步骤进行:

  • 解压 Flink 安装包到 Master 节点

tar xzf flink-*.tgz
cd flink-*
  • 配置 Master 和 Slaves

vi $FLINK_HOME/conf/master
vi $FLINK_HOME/conf/slaves
  • 分发

scp -r flink-1.1.0 hadoop@dn2:/opt/soft/flink
scp -r flink-1.1.0 hadoop@dn3:/opt/soft/flink

  这里只用了2个 slave 节点。另外,在 flink-conf.yaml 文件中,可以按需配置,较为简单。就不多赘述了。

  • 启动集群

bin/start-cluster.sh

  注意,这里没有使用 YARN 来启动集群,若是需要使用 YARN 启动集群,可以参考官方文档进行启动。地址

  Flink 集群启动后,系统有一个 WebUI 监控界面,如下图所示:

2.2 案例

  这里,我们使用 Flink SQL 的 API 来运行一个场景,对一个销售表做一个聚合计算。这里,笔者将实现代码进行了分解,首先是获取操作 Flink 系统的对象,如下所示:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

  接着是读取数据源,并注册为表,如下所示:

CsvTableSource csvTableSource = new CsvTableSource(inPath, new String[] { "trans_id", "part_dt", "lstg_format_name", "leaf_categ_id", "lstg_site_id", "slr_segment_cd", "price", "item_count", "seller_id" },
                    new TypeInformation<?>[] { Types.LONG(), Types.STRING(), Types.STRING(), Types.LONG(), Types.INT(), Types.INT(), Types.FLOAT(), Types.LONG(), Types.LONG() });
tableEnv.registerTableSource("user", csvTableSource);
Table tab = tableEnv.scan("user");

  这里 inPath 使用了 HDFS 上的数据路径。类型可以在 Hive 中使用 desc 命令查看该表的类型。然后,将“表”转化为数据集,如下所示:

DataSet<KylinSalesDomain> ds = tableEnv.toDataSet(tab, KylinSalesDomain.class);

tableEnv.registerDataSet("user2", ds, "trans_id,part_dt,lstg_format_name,leaf_categ_id,lstg_site_id,slr_segment_cd,price,item_count,seller_id");

Table result = tableEnv.sql("SELECT lstg_format_name as username,SUM(FLOOR(price)) as total FROM user2 group by lstg_format_name");

  最后,对结果进行存储,这里笔者将结果存在了 HDFS 上。如下所示:

TableSink<?> sink = new CsvTableSink(outPath, "|");

result.writeToSink(sink);

env.setParallelism(1);
env.execute("Flink Sales SUM");

  注意,这里并发数是可以设置的,通过 setParallelism 方法来设置并发数。

  完整示例,如下所示:

try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

            CsvTableSource csvTableSource = new CsvTableSource(args[0], new String[] { "trans_id", "part_dt", "lstg_format_name", "leaf_categ_id", "lstg_site_id", "slr_segment_cd", "price", "item_count", "seller_id" },
                    new TypeInformation<?>[] { Types.LONG(), Types.STRING(), Types.STRING(), Types.LONG(), Types.INT(), Types.INT(), Types.FLOAT(), Types.LONG(), Types.LONG() });
            tableEnv.registerTableSource("user", csvTableSource);
            Table tab = tableEnv.scan("user");

            DataSet<KylinSalesDomain> ds = tableEnv.toDataSet(tab, KylinSalesDomain.class);

            tableEnv.registerDataSet("user2", ds, "trans_id,part_dt,lstg_format_name,leaf_categ_id,lstg_site_id,slr_segment_cd,price,item_count,seller_id");

            Table result = tableEnv.sql("SELECT lstg_format_name as username,SUM(FLOOR(price)) as total FROM user2 group by lstg_format_name");

            TableSink<?> sink = new CsvTableSink(args[1], "|");
            // write the result Table to the TableSink
            result.writeToSink(sink);

            // execute the program
            env.setParallelism(1);
            env.execute("Flink Sales SUM");
        } catch (Exception e) {
            e.printStackTrace();
        }

  最后,我们将应用提交到 Flink 集群。如下所示:

flink run flink_sales_sum.jar hdfs://master:8020/user/hive/warehouse/kylin_sales/DEFAULT.KYLIN_SALES.csv hdfs://master:8020/tmp/result3

3.Hive 对比

  同样的语句,在 Hive 下运行之后,与在 Flink 集群下运行之后,结果如下所示:

  • Hive 运行结果:

  • Flink 运行结果:

 

  通过 WebUI 监控界面观察,任务在 Flink 集群中运行所花费的时间在 2s 以内。其运行速度是比较具有诱惑力的。

4.总结

  总体来说,Flink 集群的部署较为简单,其 SQL 的 API 编写需要对官方的文档比较熟悉,需要注意的是,在本地运行 Flink 代码,若是要读取远程 HDFS 文件,那么获取 Flink 对象操作环境,需要采用远程接口(HOST & PORT),或者在本地部署一个开发集群环境,将远程数据源提交到本地 Flink 集群环境运行。若是,读取本地文件,则不需要。其中的原因是当你以集群的方式运行,Flink 会检查本地是否有 Flink 集群环境存在,如若不存在,则会出现远程数据源(如:HDFS 路径地址无法解析等错误)。

5.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

时间: 2024-09-13 04:56:01

Flink 案例整合的相关文章

【资料合集】Apache Flink 精选PDF下载

Apache Flink是一款分布式.高性能的开源流式处理框架,在2015年1月12日,Apache Flink正式成为Apache顶级项目.目前Flink在阿里巴巴.Bouygues Teleccom.Capital One等公司得到应用,如阿里巴巴对Apache Flink的应用案例. 为了更好地让大家了解和使用Apache Flink,我们收集了25+个Flink相关的演讲PDF(资料来自Apache Flink官网推荐)和相关文章,供大家参考. PDF下载 Robert Metzger:

汽车点评买车惠获微营销精品案例银奖

"小而美.微而智"2013中国微营销盛典于3721.html">2014年1月10日在北京北方佳苑饭店隆重举行.此次微营销盛典通过寻找影响2013年的微营销精品案例,为有创新力为营销做出突出贡献的企业颁发一系列微博类.微信类.整合类.微电影类.工具类等微营销奖项.汽车点评(xgo.com.cn)买车惠作为汽车电商化产品,凭借其2013全年优异的表现,斩获微营销精品案例整合类银奖,成为唯一汽车电商类获奖产品. 微营销是在内容化.视频化.社会化.移动化趋势下应运而生的新型营

SpringBoot集成Redis来实现缓存技术方案

概述 在我们的日常项目开发过程中缓存是无处不在的,因为它可以极大的提高系统的访问速度,关于缓存的框架也种类繁多,今天主要介绍的是使用现在非常流行的NoSQL数据库(Redis)来实现我们的缓存需求. Redis简介 Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库.缓存和消息中间件,Redis 的优势包括它的速度.支持丰富的数据类型.操作原子性,以及它的通用性. 案例整合 本案例是在之前一篇SpringBoot+ Mybatis + RESTful的基础上来集成

GDS2017全球域名峰会在厦举行,“信站大全”在这里居然……

7月7日至9日,全球域名界的目光都聚焦在厦门.域名业界最具影响力的会议--GDS2017全球域名峰会在厦门举行,来自全球近百家域名注册局.域名注册管理机构.域名注册交易服务商以及域名投资人.企业代表等近千人齐聚一堂,畅谈域名发展.       行业领军企业共商域名发展   据介绍,厦门是业界公认的"域名之都".这是GDS全球域名峰会首次在厦举行.本次峰会由Verisign.商务中国.百度云.阿里云.Godaddy.易名中国等域名行业领军企业.域名行业领袖.业界专家共同参与举办.峰会旨在

阿里云合作伙伴媒介匣获千万级Pre-A轮融资 竞逐企业营销服务市场

2017年10月16日,媒介匣宣布完成Per-A轮千万级融资,这是继与阿里云合作之后,媒介匣再一次迎来战略升级,投资方为华滨创投.本轮融资将主要用于市场和资源的拓展. 6月22日媒介匣与阿里云达成合作,正式登陆阿里云市场,为用户提供全面.专业的营销推广服务,助力中小企业品牌推广. 阿里云市场是一个开放.平等.价格透明的一站式服务平台,中小企业可以在上面找到所需的各类企业应用和服务,并通过线上的方式实现快速的交易与交付,目前媒介匣在阿里云云市场上架了品牌SEO优化.新闻营销.舆情监测.软文撰写.百

谁帮我调试下jsp源代码,源代码出错了,我私信给你发过去。驱动是否注册我不会调试。

问题描述 谁帮我调试下jsp源代码,源代码出错了,我私信给你发过去.驱动是否注册我不会调试.是<实战突击:Java项目开发案例整合>第九章的例子.使用ProxoolConnectionPoolsqle=java.sql.SQLException:Nosuitabledriverfoundforproxool.net 解决方案 解决方案二:那个驱动注册的好像被注释了吧,你看的是不是企业门户新闻网srccomwsyConnsqlserver.java我看这个源码里那一段被注释了/*try{cn=D

Flink流处理之迭代案例

当前Flink将迭代的重心集中在批处理上,之前我们谈及了批量迭代和增量迭代主要是针对批处理(DataSet)API而言的,并且Flink为批处理中的迭代提供了针对性的优化.但是对于流处理(DataStream),Flink同样提供了对迭代的支持,这一节我们主要来分析流处理中的迭代,我们将会看到流处理中的迭代相较于批处理有相似之处,但差异也是十分之明显. 可迭代的流处理程序允许定义"步函数"(step function)并将其内嵌到一个可迭代的流(IterativeStream)中.因为

Spring整合Mongodb,Maven的依赖,Spring配置,MongoDB的公共操作类,使用SpringMVC的Controller进行测试并返回结果的案例

在和Spring和MongoDB进行整合的时候需要如下三个jar,分别是: spring-data-commons spring-data-mongodb mongo-java-driver 下面讲解Spring和MongoDB2.x进行整合的Spring配置(下面案例以下面的方式进行说明:) Maven的Pom文件的配置如下: <dependency> <groupId>org.springframework.data</groupId> <artifactId

Spring Boot 整合 Thymeleaf 完整 Web 案例

摘要: 原创出处:www.bysocket.com 泥瓦匠BYSocket 希望转载,保留摘要,谢谢! Thymeleaf 是一种模板语言.那模板语言或模板引擎是什么?常见的模板语言都包含以下几个概念:数据(Data).模板(Template).模板引擎(Template Engine)和结果文档(Result Documents). 数据 数据是信息的表现形式和载体,可以是符号.文字.数字.语音.图像.视频等.数据和信息是不可分离的,数据是信息的表达,信息是数据的内涵.数据本身没有意义,数据只