《Flink官方文档》Batch Examples(二)

连通分支

连通分支算法识别会一个更大的图,这部分图通过被相同的组件ID链接的所有顶点连接。类似PageRank,连通组件是一个迭代算法。在每个步骤中,每个顶点都将其当前组件ID传给所有邻居。如果小于自己的组件ID,一个顶点从邻居接受组件ID。

此实现使用增量迭代:组件ID未变化的顶点不参与下一步骤。因为后来的迭代通常只处理一些离群顶点,这将产生更好的性能。

// read vertex and edge data
DataSet<Long> vertices = getVertexDataSet(env);
DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());

// assign the initial component IDs (equal to the vertex ID)
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());

// open a delta iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
        verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);

// apply the step logic:
DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
        // join with the edges
        .join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
        // select the minimum neighbor component ID
        .groupBy(0).aggregate(Aggregations.MIN, 1)
        // update if the component ID of the candidate is smaller
        .join(iteration.getSolutionSet()).where(0).equalTo(0)
        .flatMap(new ComponentIdFilter());

// close the delta iteration (delta and new workset are identical)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

// emit result
result.writeAsCsv(outputPath, "\n", " ");

// User-defined functions

public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {

    @Override
    public Tuple2<T, T> map(T vertex) {
        return new Tuple2<T, T>(vertex, vertex);
    }
}

public static final class UndirectEdge
                    implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();

    @Override
    public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
        invertedEdge.f0 = edge.f1;
        invertedEdge.f1 = edge.f0;
        out.collect(edge);
        out.collect(invertedEdge);
    }
}

public static final class NeighborWithComponentIDJoin
                implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

    @Override
    public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
        return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
    }
}

public static final class ComponentIdFilter
                    implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>,
                                            Tuple2<Long, Long>> {

    @Override
    public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value,
                        Collector<Tuple2<Long, Long>> out) {
        if (value.f0.f1 < value.f1.f1) {
            out.collect(value.f0);
        }
    }
}

scala

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read vertex and edge data
// assign the initial components (equal to the vertex id)
val vertices = getVerticesDataSet(env).map { id => (id, id) }

// undirected edges by emitting for each input edge the input edges itself and an inverted
// version
val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }

// open a delta iteration
val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
  (s, ws) =>

    // apply the step logic: join with the edges
    val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
      (edge._2, vertex._2)
    }

    // select the minimum neighbor
    val minNeighbors = allNeighbors.groupBy(0).min(1)

    // update if the component of the candidate is smaller
    val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
      (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
        if (newVertex._2 < oldVertex._2) out.collect(newVertex)
    }

    // delta and new workset are identical
    (updatedComponents, updatedComponents)
}

verticesWithComponents.writeAsCsv(outputPath, "\n", " ")

该连通分支程序实现了上述例子。它需要运行下列参数:–vertices –edges –output –iterations 。
输入文件是纯文本文件,必须格式化如下:

–Vertices 以IDS表示的顶点,由换行字符分隔。例如“1\n2\n12\n42\n63\n”给出了五个订单(1)、(2)、(12)、(42)和(63)。

–Edges 边通过以空格分隔的两个顶点ID表示。不同边是由换行符分隔。例如“1 2\n2 12\n1 12\n42 63\n”表示了四个无方向链接(1)-(2)、(2)-(12)、(1)-(12)和(42)-(63)。

关系型查询

关系型查询示例假定会使用两张表,一张订单表,另一张是TPC-H决策支持基准测试表。TPC-H是数据库行业标准基准测试。如何生成输入数据请参见下面的说明。

该示例实现以下sql查询。
SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
FROM orders, lineitem
WHERE l_orderkey = o_orderkey
AND o_orderstatus = "F"
AND YEAR(o_orderdate) > 1993
AND o_orderpriority LIKE "5%"
GROUP BY l_orderkey, o_shippriority;

Flink程序中按照如下的方式进行sql查询

// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);
// get lineitem data set: (orderkey, extendedprice)
DataSet<Tuple2<Integer, Double>> lineitems = getLineitemDataSet(env);

// orders filtered by year: (orderkey, custkey)
DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
        // filter orders
        orders.filter(
            new FilterFunction<Tuple5<Integer, String, String, String, Integer>>() {
                @Override
                public boolean filter(Tuple5<Integer, String, String, String, Integer> t) {
                    // status filter
                    if(!t.f1.equals(STATUS_FILTER)) {
                        return false;
                    // year filter
                    } else if(Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) {
                        return false;
                    // order priority filter
                    } else if(!t.f3.startsWith(OPRIO_FILTER)) {
                        return false;
                    }
                    return true;
                }
            })
        // project fields out that are no longer required
        .project(0,4).types(Integer.class, Integer.class);

// join orders with lineitems: (orderkey, shippriority, extendedprice)
DataSet<Tuple3<Integer, Integer, Double>> lineitemsOfOrders =
        ordersFilteredByYear.joinWithHuge(lineitems)
                            .where(0).equalTo(0)
                            .projectFirst(0,1).projectSecond(1)
                            .types(Integer.class, Integer.class, Double.class);

// extendedprice sums: (orderkey, shippriority, sum(extendedprice))
DataSet<Tuple3<Integer, Integer, Double>> priceSums =
        // group by order and sum extendedprice
        lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2);

// emit result
priceSums.writeAsCsv(outputPath);

缺少scala例子(译者注)

关系查询程序实现了上述查询。它需要以下参数运行–orders –lineitem –output 。
order和lineitem文件可以使用TPC-H基准测试套件的数据生成工具(DBGEN)生成。采取以下步骤生成需提供给flink程序输入的任意大小的数据文件。

1、下载并解压DBGEN

2、复制makefile.suite并更名为Makefile,编辑修改如下:

DATABASE = DB2
MACHINE  = LINUX
WORKLOAD = TPCH
CC       = gcc

1、使用make命令构建DBGEN

2、使用DBGEN生成lineitem和orders表。-s命令传入1,将会一个生成约1 GB的大小的数据集。

./dbgen -T o -s 1

转载自 并发编程网 - ifeve.com

时间: 2024-08-02 08:17:00

《Flink官方文档》Batch Examples(二)的相关文章

《Apache Flink官方文档》 Apache Flink介绍

下面是关于Apache Flink(以下简称Filnk)框架和流式计算的概述.为了更专业.更技术化的介绍,在Flink文档中推荐了一些"概念性"的文章. 1.无穷数据集的持续计算 在我们详细介绍Flink前,复习一下当我们计算数据选择运算模型时,很可能会遇到的一个更高级别的数据集类型.下面有两个观点经常容易混淆,很有必要去澄清它们. (1)两种数据集类型: ①无穷数据集:无穷的持续集成的数据集合. ②有界数据集:有限不会改变的数据集合. 很多现实中传统地认为有界或者批量的数据集合实际上

《Flink官方文档》Batch Examples(一)

批处理示例 下面的程序展示了从简单的单词词频统计到图算法等不同的Flink应用.代码展示了Flink数据集API的使用方法. 下面案例和更多案例的完整源码可以参见Flink源码中的flink-examples-batch和 flink-examples-streaming模块. 运行实例 为了运行Flink的例子,我们假设你拥有已经启动的Flink实例.在导航栏中的"Quickstart" 和 "Setup"介绍了启动Flink的几种不同方法. 最简单的方法是运行脚

《Flink官方文档》Python 编程指南测试版(二)

为元组定义keys 最简单的情形是对一个数据集中的元组按照一个或多个域进行分组: reduced = data \ .group_by(0) \ .reduce_group(<do something>) 数据集中的元组被按照第一个域分组.对于接下来的group-reduce函数,输入的数据组中,每个元组的第一个域都有相同的值. grouped = data \ .group_by(0,1) \ .reduce(/*do something*/) 在上面的例子中,数据集的分组基于第一个和第二个

《Flink官方文档》监控Wikipedia 编辑流(二)

本示例应该能让你开始写自己的Flink程序.为了学习更多,你可以打开我们的基本概念和DataStream API的指南.如果你想学习如何构建一个Flink集群在自己机器上并将结果写入Kafka,请看接下来的激励练习. 激励练习:在一个Flink集群上运行,并将结果写入Kafka 请按照我们的快速开始里面的内容来在你的机器上构建一个Flink分布式,再参考Kafka的快速开始来安装Kafka,然后我们继续. 第一步,我们为了能使用Kafka连接器,需要添加Flink Kafka连接器的依赖.将这个

《Flink官方文档》Python 编程指南测试版(一)

Flink中的分析程序实现了对数据集的某些操作 (例如,数据过滤,映射,合并,分组).这些数据最初来源于特定的数据源(例如来自于读文件或数据集合).操作执行的结果通过数据池以写入数据到(分布式)文件系统或标准输出(例如命令行终端)的形式返回.Flink程序可以运行在不同的环境中,既能够独立运行,也可以嵌入到其他程序中运行.程序可以运行在本地的JVM上,也可以运行在服务器集群中. 为了创建你自己的Flink程序,我们鼓励你从program skeleton(程序框架)开始,并逐渐增加你自己的tra

《Apache Flink 官方文档》前言

本文档针对的是Apache Flink的 1.2.0版本. Apache Flink是一个分布式流式和批量数据处理程序的开源平台.Flink的核心是流式数据引擎,Flink通过数据流的分布式计算的方式提供数据的分发.通信和容错.Flink也构建了流引擎之上的批处理,覆盖本地迭代上的支持,内存管理和程序优化. 1.第一步 基本概念:先从Flink的数据流程序模型和分布式实时环境的基本概念开始.这会帮助你完全理解文档的其他部分,包括安装和编程指南.强烈推荐先阅读这部分内容. 快速开始:在你的本机上运

《Flink官方文档》示例总览

示例 Java 的示例项目和Scala 的示例项目指导了构建Maven和SBT项目,并包含了一个单词计数程序的简单实现. 监控Wikipedia编辑是一个更复杂的流式分析应用 用 Apache Flink.Elasticsearch和 Kibana 构建实时面板应用是发布在elastic.co上的一个博客,展示了如何用 Apache Flink.Elasticsearch和 Kibana去构建实时面板来解决流数据分析. 捆绑示例 Flink 资源包含了很多流式(java/scala) 和批处理(

android-怎样才能看安卓官方文档?

问题描述 怎样才能看安卓官方文档? 找了一些Android Developer(应该是镜像站),其中的api都不全,起码我想找RecyclerView类就没有.看的不少博客等的地方,都说"根据官方文档的介绍",我就想问问怎样才能看到安卓官方文档,能看到所有类,接口等的定义,用法介绍,还有新推出了什么控件等等的,求大神指点指点. 另外我下个android.support.v7的jar包里,也是找不到RecyclerView的,其他一些类就只有代码,我还是想看看有介绍的那些,所以希望大神指

《Spark官方文档》Spark操作指南

原文链接   译者:小村长 Spark–Quick Start 本项目是 Apache Spark官方文档的中文翻译版,致力于打造一个全新的大数据处理平台来满足大数据处理和分析的各个使用场景,本次翻译主要针对对Spark感兴趣和致力于从事大数据方法开发的人员提供有价值的中文资料,希望能够对大家的工作和学习有所帮助. Spark最近几年在国内外都比较火,在淘宝.百度.腾讯.高伟达等一些公司有比较成熟的应用,做大数据方面的开发人员或多或少都与其有接触.Spark的中文资料相对前几年相对较多,但是我认