《Flink官方文档》监控Wikipedia 编辑流(二)

本示例应该能让你开始写自己的Flink程序。为了学习更多,你可以打开我们的基本概念DataStream API的指南。如果你想学习如何构建一个Flink集群在自己机器上并将结果写入Kafka,请看接下来的激励练习。

激励练习:在一个Flink集群上运行,并将结果写入Kafka

请按照我们的快速开始里面的内容来在你的机器上构建一个Flink分布式,再参考Kafka的快速开始来安装Kafka,然后我们继续。

第一步,我们为了能使用Kafka连接器,需要添加Flink Kafka连接器的依赖。将这个添加的pom.xml文件的依赖模块中:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

接下来,我们需要修改我们的程序。我们会移除print(),替换成使用Kafka 接收器。新的代码示例如下:

result
    .map(new MapFunction<Tuple2<String,Long>, String>() {
        @Override
        public String map(Tuple2<String, Long> tuple) {
            return tuple.toString();
        }
    })
    .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));

也需要引入相关的类:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.MapFunction;

注意我们首先是如何使用一个MapFunction将Tuple2<String, Long>的流转换成一个字符串流的。我们就是在做这个,因为这个对写入简单的字符串到Kafka更容易。因而,我们创建了一个Kafka接收器。你肯能要适配一下你设置的主机名和端口号。”wiki-result”是运行我们程序之前我们将会创建的Kafka 流的名字。因为我们需要一个在集群上运行的jar文件,故用Maven 命令构建这个项目:

$ mvn clean package

产生的jar文件会在target的子文件夹中: target/wiki-edits-0.1.jar。我们接下来会用到这个。现在我们准备安装一个Flink集群,并在其上运行写入到Kafka的程序。到你安装的Flink目录下,开启一个本地的集群:

$ cd my/flink/directory
$ bin/start-local.sh

我们也需要创建这个Kafka Topic,以便我们的程序能写入:

$ cd my/kafka/directory
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic wiki-results

现在我们准备在本地的Flink集群上运行我们的jar文件:

$ cd my/flink/directory
$ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar

如果一切按照计划执行,命令行输出会跟下面的相似:

03/08/2016 15:09:27 Job execution switched to status RUNNING.
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING

你可以看到各个操作是如何开始运行的。只有两个操作是因为由于性能原因窗口后面的操作折叠成了一个。在Flink中,我们称这个为chaining。

你可以用Kafka 控制台消费者通过检测Kafka主题来观察程序的输出:

bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic wiki-result

你可以查看运行在 http://localhost:8081上面的Flink仪表盘。你可以对你的集群资源和运行的任务有个整体的感知:


如果你点击运行的任务,你会看到一个可以观察单个操作的视图,例如,看到执行的元素的数量:

结束了我们的Flink之旅,如果你有如何问题,请不要犹豫在我们的Mailing Lists提问。

转载自 并发编程网 - ifeve.com

时间: 2024-11-08 23:30:15

《Flink官方文档》监控Wikipedia 编辑流(二)的相关文章

《Flink官方文档》Batch Examples(二)

连通分支 连通分支算法识别会一个更大的图,这部分图通过被相同的组件ID链接的所有顶点连接.类似PageRank,连通组件是一个迭代算法.在每个步骤中,每个顶点都将其当前组件ID传给所有邻居.如果小于自己的组件ID,一个顶点从邻居接受组件ID. 此实现使用增量迭代:组件ID未变化的顶点不参与下一步骤.因为后来的迭代通常只处理一些离群顶点,这将产生更好的性能. // read vertex and edge data DataSet<Long> vertices = getVertexDataSe

《Flink官方文档》示例总览

示例 Java 的示例项目和Scala 的示例项目指导了构建Maven和SBT项目,并包含了一个单词计数程序的简单实现. 监控Wikipedia编辑是一个更复杂的流式分析应用 用 Apache Flink.Elasticsearch和 Kibana 构建实时面板应用是发布在elastic.co上的一个博客,展示了如何用 Apache Flink.Elasticsearch和 Kibana去构建实时面板来解决流数据分析. 捆绑示例 Flink 资源包含了很多流式(java/scala) 和批处理(

《Apache Flink官方文档》 Apache Flink介绍

下面是关于Apache Flink(以下简称Filnk)框架和流式计算的概述.为了更专业.更技术化的介绍,在Flink文档中推荐了一些"概念性"的文章. 1.无穷数据集的持续计算 在我们详细介绍Flink前,复习一下当我们计算数据选择运算模型时,很可能会遇到的一个更高级别的数据集类型.下面有两个观点经常容易混淆,很有必要去澄清它们. (1)两种数据集类型: ①无穷数据集:无穷的持续集成的数据集合. ②有界数据集:有限不会改变的数据集合. 很多现实中传统地认为有界或者批量的数据集合实际上

《Apache Flink 官方文档》前言

本文档针对的是Apache Flink的 1.2.0版本. Apache Flink是一个分布式流式和批量数据处理程序的开源平台.Flink的核心是流式数据引擎,Flink通过数据流的分布式计算的方式提供数据的分发.通信和容错.Flink也构建了流引擎之上的批处理,覆盖本地迭代上的支持,内存管理和程序优化. 1.第一步 基本概念:先从Flink的数据流程序模型和分布式实时环境的基本概念开始.这会帮助你完全理解文档的其他部分,包括安装和编程指南.强烈推荐先阅读这部分内容. 快速开始:在你的本机上运

《Flink官方文档》Python 编程指南测试版(一)

Flink中的分析程序实现了对数据集的某些操作 (例如,数据过滤,映射,合并,分组).这些数据最初来源于特定的数据源(例如来自于读文件或数据集合).操作执行的结果通过数据池以写入数据到(分布式)文件系统或标准输出(例如命令行终端)的形式返回.Flink程序可以运行在不同的环境中,既能够独立运行,也可以嵌入到其他程序中运行.程序可以运行在本地的JVM上,也可以运行在服务器集群中. 为了创建你自己的Flink程序,我们鼓励你从program skeleton(程序框架)开始,并逐渐增加你自己的tra

《Flink官方文档》Python 编程指南测试版(二)

为元组定义keys 最简单的情形是对一个数据集中的元组按照一个或多个域进行分组: reduced = data \ .group_by(0) \ .reduce_group(<do something>) 数据集中的元组被按照第一个域分组.对于接下来的group-reduce函数,输入的数据组中,每个元组的第一个域都有相同的值. grouped = data \ .group_by(0,1) \ .reduce(/*do something*/) 在上面的例子中,数据集的分组基于第一个和第二个

《Flink官方文档》Batch Examples(一)

批处理示例 下面的程序展示了从简单的单词词频统计到图算法等不同的Flink应用.代码展示了Flink数据集API的使用方法. 下面案例和更多案例的完整源码可以参见Flink源码中的flink-examples-batch和 flink-examples-streaming模块. 运行实例 为了运行Flink的例子,我们假设你拥有已经启动的Flink实例.在导航栏中的"Quickstart" 和 "Setup"介绍了启动Flink的几种不同方法. 最简单的方法是运行脚

《KAFKA官方文档》入门指南(二)

把功能组合起来 消息的传输,存储和流处理的组合看似不寻常却是Kafka作为流处理平台的关键. 像HDFS分布式文件系统,允许存储静态文件进行批量处理.像这样的系统允许存储和处理过去的历史数据. 传统的企业消息系统允许处理您订阅后才抵达的消息.这样的系统只能处理将来到达的数据. Kafka结合了这些功能,这种结合对Kafka作为流应用平台以及数据流处理的管道至关重要. 通过整合存储和低延迟订阅,流处理应用可以把过去和未来的数据用相同的方式处理.这样一个单独的应用程序,不但可以处理历史的,保存的数据

《Spring 5 官方文档》4. 资源(二)

4.6 资源依赖 如果bean本身将通过某种动态过程来确定和提供资源路径,那么bean可以使用ResourceLoader接口来加载资源. j假设以某种方式加载一个模板,其中需要的特定资源取决于用户的角色. 如果资源是静态的,那么完全消除ResourceLoader接口的使用是有意义的,只需让bean公开它需要的Resource属性,那么它们就会以你所期望的方式被注入. 什么使得它们轻松注入这些属性,是所有应用程序上下文注册和使用一个特殊的JavaBeans PropertyEditor,它可以