本示例应该能让你开始写自己的Flink程序。为了学习更多,你可以打开我们的基本概念和DataStream API的指南。如果你想学习如何构建一个Flink集群在自己机器上并将结果写入Kafka,请看接下来的激励练习。
激励练习:在一个Flink集群上运行,并将结果写入Kafka
请按照我们的快速开始里面的内容来在你的机器上构建一个Flink分布式,再参考Kafka的快速开始来安装Kafka,然后我们继续。
第一步,我们为了能使用Kafka连接器,需要添加Flink Kafka连接器的依赖。将这个添加的pom.xml文件的依赖模块中:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
接下来,我们需要修改我们的程序。我们会移除print(),替换成使用Kafka 接收器。新的代码示例如下:
result
.map(new MapFunction<Tuple2<String,Long>, String>() {
@Override
public String map(Tuple2<String, Long> tuple) {
return tuple.toString();
}
})
.addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
也需要引入相关的类:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.MapFunction;
注意我们首先是如何使用一个MapFunction将Tuple2<String, Long>的流转换成一个字符串流的。我们就是在做这个,因为这个对写入简单的字符串到Kafka更容易。因而,我们创建了一个Kafka接收器。你肯能要适配一下你设置的主机名和端口号。”wiki-result”是运行我们程序之前我们将会创建的Kafka 流的名字。因为我们需要一个在集群上运行的jar文件,故用Maven 命令构建这个项目:
$ mvn clean package
产生的jar文件会在target的子文件夹中: target/wiki-edits-0.1.jar。我们接下来会用到这个。现在我们准备安装一个Flink集群,并在其上运行写入到Kafka的程序。到你安装的Flink目录下,开启一个本地的集群:
$ cd my/flink/directory
$ bin/start-local.sh
我们也需要创建这个Kafka Topic,以便我们的程序能写入:
$ cd my/kafka/directory
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic wiki-results
现在我们准备在本地的Flink集群上运行我们的jar文件:
$ cd my/flink/directory
$ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar
如果一切按照计划执行,命令行输出会跟下面的相似:
03/08/2016 15:09:27 Job execution switched to status RUNNING.
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING
你可以看到各个操作是如何开始运行的。只有两个操作是因为由于性能原因窗口后面的操作折叠成了一个。在Flink中,我们称这个为chaining。
你可以用Kafka 控制台消费者通过检测Kafka主题来观察程序的输出:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wiki-result
你可以查看运行在 http://localhost:8081上面的Flink仪表盘。你可以对你的集群资源和运行的任务有个整体的感知:
如果你点击运行的任务,你会看到一个可以观察单个操作的视图,例如,看到执行的元素的数量:
结束了我们的Flink之旅,如果你有如何问题,请不要犹豫在我们的Mailing Lists提问。