《Flink官方文档》Batch Examples(一)

批处理示例

下面的程序展示了从简单的单词词频统计到图算法等不同的Flink应用。代码展示了Flink数据集API的使用方法。

下面案例和更多案例的完整源码可以参见Flink源码中的flink-examples-batch和 flink-examples-streaming模块。

运行实例

为了运行Flink的例子,我们假设你拥有已经启动的Flink实例。在导航栏中的“Quickstart” 和 “Setup”介绍了启动Flink的几种不同方法。

最简单的方法是运行脚本./bin/start-local.sh,执行后一个启动本地JobManager。

每个编译好的Flink源码包含了一个实例目录,其中包括了此页面每个例子的jar包。

执行如下命令,来运行WordCount例子

./bin/flink run ./examples/batch/WordCount.jar
其他的例子都可以用类似的方式执行

如果运行例子的时候没有带参数,默认使用缺省参数。如果希望使用真实数据来运行WordCount,需要将数据的路径传递进去

./bin/flink run ./examples/batch/WordCount.jar –input /path/to/some/text/data –output /path/to/result
注意非本地文件系统需要标明数据库前缀,比如HDFS://

词频统计

单词词频统计是大数据处理系统“hello world”程序。它计算了文本中的词频。算法分成两步,第一部分,将文本分隔成不同单词。第二步,讲这些单词分组并计数。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> text = env.readTextFile("/path/to/file");

DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .sum(1);

counts.writeAsCsv(outputPath, "\n", " ");

// User-defined functions
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}

词频统计例子实现了上述描述的算法,需要两个输入参数。–input –output 。测试数据可以替换为任何文本。

val env = ExecutionEnvironment.getExecutionEnvironment

// get input data
val text = env.readTextFile("/path/to/file")

val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  .map { (_, 1) }
  .groupBy(0)
  .sum(1)

counts.writeAsCsv(outputPath, "\n", " ")

Page Rank

PageRank算法计算了图中页面的重要性,一个页面到另一页面的点形成了链接,这些链接定义成图。它是迭代式的算法,意味着相同的计算会被重复执行。在每次迭代中,每个页面对它的邻居贡献出相同的评分,并接受来自它的邻居的加权评分作为新的评分。PageRank算法因google搜索引擎众所周知,它被用来计算网页搜索查询结果的评分。

这个例子中,PageRank通过一批迭代和固定次数的迭代来完成。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read the pages and initial ranks by parsing a CSV file
DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
                           .types(Long.class, Double.class)

// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);

// set iterative data set
IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);

DataSet<Tuple2<Long, Double>> newRanks = iteration
        // join pages with outgoing edges and distribute rank
        .join(pageLinkLists).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
        // collect and sum ranks
        .groupBy(0).sum(1)
        // apply dampening factor
        .map(new Dampener(DAMPENING_FACTOR, numPages));

DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
        newRanks,
        newRanks.join(iteration).where(0).equalTo(0)
        // termination condition
        .filter(new EpsilonFilter()));

finalPageRanks.writeAsCsv(outputPath, "\n", " ");

// User-defined functions

public static final class JoinVertexWithEdgesMatch
                    implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>,
                                            Tuple2<Long, Double>> {

    @Override
    public void join(<Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
                        Collector<Tuple2<Long, Double>> out) {
        Long[] neighbors = adj.f1;
        double rank = page.f1;
        double rankToDistribute = rank / ((double) neigbors.length);

        for (int i = 0; i < neighbors.length; i++) {
            out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
        }
    }
}

public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
    private final double dampening, randomJump;

    public Dampener(double dampening, double numVertices) {
        this.dampening = dampening;
        this.randomJump = (1 - dampening) / numVertices;
    }

    @Override
    public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
        value.f1 = (value.f1 * dampening) + randomJump;
        return value;
    }
}

public static final class EpsilonFilter
                implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {

    @Override
    public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
        return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
    }
}

pagerank 程序实现了上面的例子。需要下面的运行参数–pages –links –output –numPages –iterations 。

scala

// User-defined types
case class Link(sourceId: Long, targetId: Long)
case class Page(pageId: Long, rank: Double)
case class AdjacencyList(sourceId: Long, targetIds: Array[Long])

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read the pages and initial ranks by parsing a CSV file
val pages = env.readCsvFile[Page](pagesInputPath)

// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
val links = env.readCsvFile[Link](linksInputPath)

// assign initial ranks to pages
val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages))

// build adjacency list from link input
val adjacencyLists = links
  // initialize lists
  .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
  // concatenate lists
  .groupBy("sourceId").reduce {
  (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
  }

// start iteration
val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
  currentRanks =>
    val newRanks = currentRanks
      // distribute ranks to target pages
      .join(adjacencyLists).where("pageId").equalTo("sourceId") {
        (page, adjacent, out: Collector[Page]) =>
        for (targetId <- adjacent.targetIds) {
          out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
        }
      }
      // collect ranks and sum them up
      .groupBy("pageId").aggregate(SUM, "rank")
      // apply dampening factor
      .map { p =>
        Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
      }

    // terminate if no rank update was significant
    val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
      (current, next, out: Collector[Int]) =>
        // check for significant update
        if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
    }

    (newRanks, termination)
}

val result = finalRanks

// emit result
result.writeAsCsv(outputPath, "\n", " ")

输入文件必须是普通文本文件而且文件必须是遵循下列格式:

–Pages 用long型的ID表示,并以换行符分隔,如”1\n2\n12\n42\n63\n”体现了5个页面,id分别是1, 2, 12, 42, and 63。

–Links表示了多对pageId的组合,每对之间通过空格分隔,不同links用换行符分隔。”1 2\n2 12\n1 12\n42 63\n”表示了(1)->(2), (2)->(12), (1)->(12), and (42)->(63)四个有向链接。

为了这个简单实现至少需要每个页面至少有一个入链接和一个出链接。一个页面可以链接到他自己。

时间: 2024-07-28 14:11:08

《Flink官方文档》Batch Examples(一)的相关文章

《Apache Flink官方文档》 Apache Flink介绍

下面是关于Apache Flink(以下简称Filnk)框架和流式计算的概述.为了更专业.更技术化的介绍,在Flink文档中推荐了一些"概念性"的文章. 1.无穷数据集的持续计算 在我们详细介绍Flink前,复习一下当我们计算数据选择运算模型时,很可能会遇到的一个更高级别的数据集类型.下面有两个观点经常容易混淆,很有必要去澄清它们. (1)两种数据集类型: ①无穷数据集:无穷的持续集成的数据集合. ②有界数据集:有限不会改变的数据集合. 很多现实中传统地认为有界或者批量的数据集合实际上

《Flink官方文档》Batch Examples(二)

连通分支 连通分支算法识别会一个更大的图,这部分图通过被相同的组件ID链接的所有顶点连接.类似PageRank,连通组件是一个迭代算法.在每个步骤中,每个顶点都将其当前组件ID传给所有邻居.如果小于自己的组件ID,一个顶点从邻居接受组件ID. 此实现使用增量迭代:组件ID未变化的顶点不参与下一步骤.因为后来的迭代通常只处理一些离群顶点,这将产生更好的性能. // read vertex and edge data DataSet<Long> vertices = getVertexDataSe

《Flink官方文档》Python 编程指南测试版(二)

为元组定义keys 最简单的情形是对一个数据集中的元组按照一个或多个域进行分组: reduced = data \ .group_by(0) \ .reduce_group(<do something>) 数据集中的元组被按照第一个域分组.对于接下来的group-reduce函数,输入的数据组中,每个元组的第一个域都有相同的值. grouped = data \ .group_by(0,1) \ .reduce(/*do something*/) 在上面的例子中,数据集的分组基于第一个和第二个

《Flink官方文档》Python 编程指南测试版(一)

Flink中的分析程序实现了对数据集的某些操作 (例如,数据过滤,映射,合并,分组).这些数据最初来源于特定的数据源(例如来自于读文件或数据集合).操作执行的结果通过数据池以写入数据到(分布式)文件系统或标准输出(例如命令行终端)的形式返回.Flink程序可以运行在不同的环境中,既能够独立运行,也可以嵌入到其他程序中运行.程序可以运行在本地的JVM上,也可以运行在服务器集群中. 为了创建你自己的Flink程序,我们鼓励你从program skeleton(程序框架)开始,并逐渐增加你自己的tra

《Apache Flink 官方文档》前言

本文档针对的是Apache Flink的 1.2.0版本. Apache Flink是一个分布式流式和批量数据处理程序的开源平台.Flink的核心是流式数据引擎,Flink通过数据流的分布式计算的方式提供数据的分发.通信和容错.Flink也构建了流引擎之上的批处理,覆盖本地迭代上的支持,内存管理和程序优化. 1.第一步 基本概念:先从Flink的数据流程序模型和分布式实时环境的基本概念开始.这会帮助你完全理解文档的其他部分,包括安装和编程指南.强烈推荐先阅读这部分内容. 快速开始:在你的本机上运

《Flink官方文档》监控Wikipedia 编辑流(二)

本示例应该能让你开始写自己的Flink程序.为了学习更多,你可以打开我们的基本概念和DataStream API的指南.如果你想学习如何构建一个Flink集群在自己机器上并将结果写入Kafka,请看接下来的激励练习. 激励练习:在一个Flink集群上运行,并将结果写入Kafka 请按照我们的快速开始里面的内容来在你的机器上构建一个Flink分布式,再参考Kafka的快速开始来安装Kafka,然后我们继续. 第一步,我们为了能使用Kafka连接器,需要添加Flink Kafka连接器的依赖.将这个

《Flink官方文档》示例总览

示例 Java 的示例项目和Scala 的示例项目指导了构建Maven和SBT项目,并包含了一个单词计数程序的简单实现. 监控Wikipedia编辑是一个更复杂的流式分析应用 用 Apache Flink.Elasticsearch和 Kibana 构建实时面板应用是发布在elastic.co上的一个博客,展示了如何用 Apache Flink.Elasticsearch和 Kibana去构建实时面板来解决流数据分析. 捆绑示例 Flink 资源包含了很多流式(java/scala) 和批处理(

《Spark官方文档》Spark操作指南

原文链接   译者:小村长 Spark–Quick Start 本项目是 Apache Spark官方文档的中文翻译版,致力于打造一个全新的大数据处理平台来满足大数据处理和分析的各个使用场景,本次翻译主要针对对Spark感兴趣和致力于从事大数据方法开发的人员提供有价值的中文资料,希望能够对大家的工作和学习有所帮助. Spark最近几年在国内外都比较火,在淘宝.百度.腾讯.高伟达等一些公司有比较成熟的应用,做大数据方面的开发人员或多或少都与其有接触.Spark的中文资料相对前几年相对较多,但是我认

《Log4j 2 官方文档》多余性(Additivity)

如果我们希望输出com.foo.Bar的TRACE等级的日志,而不像影响其他日志的输出.简单的改变日志等级是不能达到我们想要的目的:但是修改也很简单,只要我们添加一个新的Logger定义就可以达到目标. <Logger name="com.foo.Bar" level="TRACE"/> <Root level="ERROR"> <AppenderRef ref="STDOUT"> <