Apache Beam WordCount编程实战及源码解读

概述:Apache Beam WordCount编程实战及源码解读,并通过intellij IDEA和terminal两种方式调试运行WordCount程序,Apache Beam对大数据的批处理和流处理,提供一套先进的统一的编程模型,并可以运行大数据处理引擎上。完整项目Github源码

负责公司大数据处理相关架构,但是具有多样性,极大的增加了开发成本,急需统一编程处理,Apache Beam,一处编程,处处运行,故将折腾成果分享出来。

1.Apache Beam编程实战–前言,Apache Beam的特点与关键概念。

Apache Beam 于2017年1月10日成为Apache新的顶级项目。

1.1.Apache Beam 特点:

  • 统一:对于批处理和流媒体用例使用单个编程模型。
  • 方便:支持多个pipelines环境运行,包括:Apache Apex, Apache Flink, Apache Spark, 和 Google Cloud Dataflow。
  • 可扩展:编写和分享新的SDKs,IO连接器和transformation库
    部分翻译摘自官网:Apacher Beam 官网

1.2.Apache Beam关键概念:

1.2.1.Apache Beam SDKs

主要是开发API,为批处理和流处理提供统一的编程模型。目前(2017)支持JAVA语言,而Python正在紧张开发中。

1.2.2. Apache Beam Pipeline Runners(Beam的执行器/执行者们),支持Apache Apex,Apache Flink,Apache Spark,Google Cloud Dataflow多个大数据计算框架。可谓是一处Apache Beam编程,多计算框架运行。

1.2.3. 他们的对如下的支持情况详见

2.Apache Beam编程实战–Apache Beam源码解读

基于maven,intellij IDEA,pom.xm查看 完整项目Github源码 。直接通过IDEA的项目导入功能即可导入完整项目,等待MAVEN下载依赖包,然后按照如下解读步骤即可顺利运行。

2.1.源码解析-Apache Beam 数据流处理原理解析:

关键步骤:

  • 创建Pipeline
  • 将转换应用于Pipeline
  • 读取输入文件
  • 应用ParDo转换
  • 应用SDK提供的转换(例如:Count)
  • 写出输出
  • 运行Pipeline

2.2.源码解析,完整项目Github源码,附WordCount,pom.xml等

/**
 * MIT.
 * Author: wangxiaolei(王小雷).
 * Date:17-2-20.
 * Project:ApacheBeamWordCount.
 */

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class WordCount {

    /**
     *1.a.通过Dofn编程Pipeline使得代码很简洁。b.对输入的文本做单词划分,输出。
     */
    static class ExtractWordsFn extends DoFn<String, String> {
        private final Aggregator<Long, Long> emptyLines =
                createAggregator("emptyLines", Sum.ofLongs());

        @ProcessElement
        public void processElement(ProcessContext c) {
            if (c.element().trim().isEmpty()) {
                emptyLines.addValue(1L);
            }

            // 将文本行划分为单词
            String[] words = c.element().split("[^a-zA-Z']+");
            // 输出PCollection中的单词
            for (String word : words) {
                if (!word.isEmpty()) {
                    c.output(word);
                }
            }
        }
    }

    /**
     *2.格式化输入的文本数据,将转换单词为并计数的打印字符串。
     */
    public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
        @Override
        public String apply(KV<String, Long> input) {
            return input.getKey() + ": " + input.getValue();
        }
    }
    /**
     *3.单词计数,PTransform(PCollection Transform)将PCollection的文本行转换成格式化的可计数单词。
     */
    public static class CountWords extends PTransform<PCollection<String>,
            PCollection<KV<String, Long>>> {
        @Override
        public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

            // 将文本行转换成单个单词
            PCollection<String> words = lines.apply(
                    ParDo.of(new ExtractWordsFn()));

            // 计算每个单词次数
            PCollection<KV<String, Long>> wordCounts =
                    words.apply(Count.<String>perElement());

            return wordCounts;
        }
    }

    /**
     *4.可以自定义一些选项(Options),比如文件输入输出路径
     */
    public interface WordCountOptions extends PipelineOptions {

        /**
         * 文件输入选项,可以通过命令行传入路径参数,路径默认为gs://apache-beam-samples/shakespeare/kinglear.txt
         */
        @Description("Path of the file to read from")
        @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
        String getInputFile();
        void setInputFile(String value);

        /**
         * 设置结果文件输出路径,在intellij IDEA的运行设置选项中或者在命令行中指定输出文件路径,如./pom.xml
         */
        @Description("Path of the file to write to")
        @Required
        String getOutput();
        void setOutput(String value);
    }
    /**
     * 5.运行程序
     */
    public static void main(String[] args) {
        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(WordCountOptions.class);
        Pipeline p = Pipeline.create(options);

        p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
                .apply(new CountWords())
                .apply(MapElements.via(new FormatAsTextFn()))
                .apply("WriteCounts", TextIO.Write.to(options.getOutput()));

        p.run().waitUntilFinish();
    }
}

3.支持Spark,Flink,Apex等大数据数据框架来运行该WordCount程序。完整项目Github源码(推荐,注意pom.xml模块加载是否成功,在工具中开发大数据程序,利于调试,开发体验较好)

3.1.intellij IDEA(社区版)中Spark大数据框架运行Pipeline计算程序

  • Spark运行

    • 设置VM options

      -DPapex-runner
    • 设置Programe arguments
      --inputFile=pom.xml --output=counts

3.2.intellij IDEA(社区版)中Apex,Flink等支持的大数据框架均可运行WordCount的Pipeline计算程序,完整项目Github源码

  • Apex运行

    • 设置VM options

      -DPapex-runner
    • 设置Programe arguments
      --inputFile=pom.xml --output=counts
  • Flink运行等等
    • 设置VM options

      -DPflink-runner
    • 设置Programe arguments
      --inputFile=pom.xml --output=counts

4.终端运行(Terminal)(不推荐,第一次下载过程很慢,开发体验较差)

4.1.以下命令是下载官方示例源码,第一次运行下载较慢,如果失败了就多运行几次,(推荐下载,完整项目Github源码)直接用上述解读在intellij IDEA中运行。

mvn archetype:generate       -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots       -DarchetypeGroupId=org.apache.beam       -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples       -DarchetypeVersion=LATEST       -DgroupId=org.example       -DartifactId=word-count-beam       -Dversion="0.1"       -Dpackage=org.apache.beam.examples       -DinteractiveMode=false

4.2.打包并运行

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount      -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner

4.3.成功运行结果

4.3.1.显示运行成功

4.3.2.WordCount输出计算结果

时间: 2024-10-11 22:24:48

Apache Beam WordCount编程实战及源码解读的相关文章

Apache OFbiz entity engine源码解读

简介 最近一直在看Apache OFbiz entity engine的源码.为了能够更透彻得理解,也因为之前没有看人别人写过分析它的文章,所以决定自己来写一篇. 首先,我提出一个问题,如果你有兴趣可以想一下它的答案: JDBC真的给数据访问提供了足够的抽象,以至于你可以在多个支持jdbc访问的数据库之间任意切换而完全不需要担心你的数据访问代码吗? 我曾经在微博上有过关于该问题的思考: 其实这个感慨正是来自于我之前在看的一篇关于jdbc的文章,里面提到了jdbc中的一些设计模式(工厂方法),提供

求助:急需一份《Visual C++.NET多媒体编程》的源码

问题描述 本人急需一份<VisualC++.NET多媒体编程>的源码?能帮帮忙么,在网上苦熬两天了,找不到,想买书得到那光盘,可现在书也没处买,实在需要,兄弟帮个忙吧!大恩不言谢.白客同志介绍过来地.我在技术交流1群.qq:30363882camerabuff@yahoo.cncsdn昵称:qbdpqbdp先谢了,在线等. 解决方案 解决方案二:不明LZ在说什么解决方案三:都是很好的建议!值得学习

基于Docker的TensorFlow机器学习框架搭建和实例源码解读

概述:基于Docker的TensorFlow机器学习框架搭建和实例源码解读,TensorFlow作为最火热的机器学习框架之一,Docker是的容器,可以很好的结合起来,为机器学习或者科研人员提供便捷的机器学习开发环境,探索人工智能的奥秘,容器随开随用方便快捷.源码解析TensorFlow容器创建和示例程序运行,为热爱机器学者降低学习难度. 默认机器已经装好了Docker(Docker安装和使用可以看我另一篇博文:Ubuntu16.04安装Docker1.12+开发实例+hello world+w

Spark jdbc postgresql数据库连接和写入操作源码解读

概述:Spark postgresql jdbc 数据库连接和写入操作源码解读,详细记录了SparkSQL对数据库的操作,通过java程序,在本地开发和运行.整体为,Spark建立数据库连接,读取数据,将DataFrame数据写入另一个数据库表中.附带完整项目源码(完整项目源码github). 1.首先在postgreSQL中创建一张测试表,并插入数据.(完整项目源码Github) 1.1. 在postgreSQL中的postgres用户下,创建 products CREATE TABLE pr

SpringMVC源码解读之HandlerMapping_java

概述 对于Web开发者,MVC模型是大家再熟悉不过的了,SpringMVC中,满足条件的请求进入到负责请求分发的DispatcherServlet,DispatcherServlet根据请求url到控制器的映射(HandlerMapping中保存),HandlerMapping最终返回HandlerExecutionChain,其中包含了具体的处理对象handler(也即我们编程时写的controller)以及一系列的拦截器interceptors,此时DispatcherServlet会根据返

jQuery源码解读之removeAttr()方法分析

 这篇文章主要介绍了jQuery源码解读之removeAttr()方法分析,较为详细的分析了removeAttr方法的实现技巧,非常具有实用价值,需要的朋友可以参考下     本文较为详细的分析了jQuery源码解读之removeAttr()方法.分享给大家供大家参考.具体分析如下: 扩展jQuery原型对象的方法: 代码如下: jQuery.fn.extend({ //name,传入要DOM元素要移除的属性名. removeAttr: function( name ) {   //使用jQue

jQuery源码解读之hasClass()方法分析

 这篇文章主要介绍了jQuery源码解读之hasClass()方法,以注释形式较为详细的分析了hasClass()方法的实现技巧,具有一定参考借鉴价值,需要的朋友可以参考下     本文较为详细的分析了jQuery源码解读之hasClass()方法.分享给大家供大家参考.具体分析如下:   代码如下: jQuery.fn.extend({ hasClass: function( selector ) { //将要检查的类名selector赋值给className, l为选择器选择的当前要检查的j

jQuery源码解读之addClass()方法分析

 这篇文章主要介绍了jQuery源码解读之addClass()方法,注释形式较为详细的分析了addClass()方法的实现技巧与相关注意事项,具有一定参考借鉴价值,需要的朋友可以参考下     本文较为详细的分析了jQuery源码解读之addClass()方法.分享给大家供大家参考.具体分析如下: 给jQuery原型对象扩展addClass功能,jQuery.fn就是jQuery.prototype 代码如下: jQuery.fn.extend({ /* 可以看出这是一个函数名叫addClass

jQuery源码解读之removeClass()方法分析

 这篇文章主要介绍了jQuery源码解读之removeClass()方法,以注释形式较为详细的分析了removeClass()方法的实现技巧与使用注意事项,需要的朋友可以参考下     本文较为详细的分析了jQuery源码解读之removeClass()方法.分享给大家供大家参考.具体分析如下: removeClass()方法和addClass()差别不大.这就来看看: 代码如下: jQuery.fn.extend({ removeClass: function( value ) { var c