Flink - DataStream

Let's start with an example:

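import static java.util.concurrent.TimeUnit.MILLISECONDS;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Long, Long>> stream = env.addSource(...);
stream
    .keyBy(0)                                  // key by the first field of the tuple
    .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))  // 2500 ms windows, sliding every 500 ms
    .reduce(new SummingReducer())              // user-defined ReduceFunction (not shown)
    .addSink(new SinkFunction<Tuple2<Long, Long>>() {...});

env.execute();                                 // the job only runs once execute() is called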

The biggest difference from the batch API is that we work with a DataStream instead of a DataSet.

/**
 * A DataStream represents a stream of elements of the same type. A DataStream
 * can be transformed into another DataStream by applying a transformation as
 * for example:
 * <ul>
 * <li>{@link DataStream#map},
 * <li>{@link DataStream#filter}, or
 * </ul>
 *
 * @param <T> The type of the elements in this Stream
 */
public class DataStream<T> {

    protected final StreamExecutionEnvironment environment;

    protected final StreamTransformation<T> transformation;

    /**
     * Create a new {@link DataStream} in the given execution environment with
     * partitioning set to forward by default.
     *
     * @param environment The StreamExecutionEnvironment
     */
    public DataStream(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
        this.environment = Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
        this.transformation = Preconditions.checkNotNull(transformation, "Stream Transformation must not be null.");
    }

    // ... the various operations on a DataStream ...
    // map, reduce, keyBy, ...
}

The core of a DataStream is its field StreamTransformation<T> transformation, which describes how the stream is produced.

 

StreamTransformation

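A StreamTransformation represents the operation that creates a DataStream. 
It does not necessarily correspond to a physical operation at runtime; it may be a purely logical concept, as in the example below.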

/**
 * A {@code StreamTransformation} represents the operation that creates a
 * {@link org.apache.flink.streaming.api.datastream.DataStream}. Every
 * {@link org.apache.flink.streaming.api.datastream.DataStream} has an underlying
 * {@code StreamTransformation} that is the origin of said DataStream.
 *
 * <p>
 * API operations such as {@link org.apache.flink.streaming.api.datastream.DataStream#map} create
 * a tree of {@code StreamTransformation}s underneath. When the stream program is to be executed this
 * graph is translated to a {@link StreamGraph} using
 * {@link org.apache.flink.streaming.api.graph.StreamGraphGenerator}.
 *
 * <p>
 * A {@code StreamTransformation} does not necessarily correspond to a physical operation
 * at runtime. Some operations are only logical concepts. Examples of this are union,
 * split/select data stream, partitioning.
 *
 * <p>
 * The following graph of {@code StreamTransformations}:
 *
 * <pre>{@code
 *   Source              Source
 *      +                   +
 *      |                   |
 *      v                   v
 *  Rebalance          HashPartition
 *      +                   +
 *      |                   |
 *      |                   |
 *      +------>Union<------+
 *                +
 *                |
 *                v
 *              Split
 *                +
 *                |
 *                v
 *              Select
 *                +
 *                v
 *               Map
 *                +
 *                |
 *                v
 *              Sink
 * }</pre>
 *
 * Would result in this graph of operations at runtime:
 *
 * <pre>{@code
 *  Source              Source
 *    +                   +
 *    |                   |
 *    |                   |
 *    +------->Map<-------+
 *              +
 *              |
 *              v
 *             Sink
 * }</pre>
 *
 * The information about partitioning, union, split/select end up being encoded in the edges
 * that connect the sources to the map operation.
 *
 * @param <T> The type of the elements that result from this {@code StreamTransformation}
 */
public abstract class StreamTransformation<T>
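The javadoc's claim that union, split/select and partitioning disappear at runtime and end up encoded in the edges can be illustrated with a toy translator. This is a minimal sketch, not Flink's StreamGraphGenerator: the Node class, its physical flag and the edge strings are hypothetical, and it assumes a tree (no node reachable by two paths). Logical nodes are skipped while walking upstream, and their names are attached to the edge between the two physical operators they sit between.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical node in a transformation tree (not Flink's StreamTransformation).
class Node {
    final String name;
    final boolean physical;   // false for logical nodes such as union, split/select, partitioning
    final List<Node> inputs;

    Node(String name, boolean physical, Node... inputs) {
        this.name = name;
        this.physical = physical;
        this.inputs = Arrays.asList(inputs);
    }
}

final class LogicalCollapse {
    private LogicalCollapse() {}

    // Lists the edges between physical nodes upstream of `sink`,
    // annotating each edge with the logical nodes that were folded into it.
    static List<String> physicalEdges(Node sink) {
        List<String> edges = new ArrayList<>();
        collectUpstream(sink, edges);
        return edges;
    }

    private static void collectUpstream(Node physicalNode, List<String> edges) {
        for (Node in : physicalNode.inputs) {
            walk(in, physicalNode, new ArrayList<>(), edges);
        }
    }

    // Moves upstream through logical nodes until a physical node is found.
    private static void walk(Node current, Node downstream, List<String> via, List<String> edges) {
        if (current.physical) {
            edges.add(current.name + " -> " + downstream.name + " " + via);
            collectUpstream(current, edges);
            return;
        }
        for (Node in : current.inputs) {
            List<String> path = new ArrayList<>(via);
            path.add(0, current.name);   // keep logical nodes in upstream-to-downstream order
            walk(in, downstream, path, edges);
        }
    }
}
```

Built for the javadoc's example (Sink fed by Map, Map fed by Select, Split and Union, and the two Sources behind Rebalance and HashPartition), physicalEdges(sink) returns three edges: Map -> Sink with nothing folded in, Source1 -> Map via [Rebalance, Union, Split, Select], and Source2 -> Map via [HashPartition, Union, Split, Select]. That matches the runtime picture of two sources feeding one Map, then the Sink.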

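StreamTransformation itself only defines the output, i.e. the result stream the transformation produces. 
It is an abstract class and cannot be used directly; the logic that actually produces a stream still has to be encapsulated in a concrete operator.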

The following examples illustrate the difference between a transformation and an operator; the design here is a bit convoluted.

 

OneInputTransformation extends StreamTransformation with an input (and an operator):

/**
 * This Transformation represents the application of a
 * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} to one input
 * {@link org.apache.flink.streaming.api.transformations.StreamTransformation}.
 *
 * @param <IN> The type of the elements in the input {@code StreamTransformation}
 * @param <OUT> The type of the elements that result from this {@code OneInputTransformation}
 */
public class OneInputTransformation<IN, OUT> extends StreamTransformation<OUT> {

    private final StreamTransformation<IN> input;

    private final OneInputStreamOperator<IN, OUT> operator;

    private KeySelector<IN, ?> stateKeySelector;

    private TypeInformation<?> stateKeyType;
}

So it holds 
the StreamTransformation<IN> input, which produces the input stream, 
and the OneInputStreamOperator<IN, OUT> operator, which turns the input into the output.
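To make the split concrete, here is a toy model in which the transformation is only a graph node (its input plus its operator) and the operator carries the per-element logic. The Toy* names and the simplified processElement(element, out) signature are hypothetical stand-ins for OneInputTransformation and OneInputStreamOperator, not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified stand-in for OneInputStreamOperator: holds the per-element logic.
interface ToyOneInputOperator<IN, OUT> {
    void processElement(IN element, List<OUT> out);
}

// A map-like operator wrapping a user function.
class ToyMapOperator<IN, OUT> implements ToyOneInputOperator<IN, OUT> {
    private final Function<IN, OUT> mapper;
    ToyMapOperator(Function<IN, OUT> mapper) { this.mapper = mapper; }

    @Override
    public void processElement(IN element, List<OUT> out) {
        out.add(mapper.apply(element));
    }
}

// Stand-in for StreamTransformation: only a name and an output type T.
abstract class ToyTransformation<T> {
    final String name;
    ToyTransformation(String name) { this.name = name; }
}

class ToySourceTransformation<T> extends ToyTransformation<T> {
    ToySourceTransformation(String name) { super(name); }
}

// Stand-in for OneInputTransformation: a graph node = input transformation + operator.
class ToyOneInputTransformation<IN, OUT> extends ToyTransformation<OUT> {
    final ToyTransformation<IN> input;
    final ToyOneInputOperator<IN, OUT> operator;

    ToyOneInputTransformation(String name, ToyTransformation<IN> input, ToyOneInputOperator<IN, OUT> operator) {
        super(name);
        this.input = input;
        this.operator = operator;
    }
}

final class ToyRunner {
    private ToyRunner() {}

    // At "runtime" only the operator does work; the transformation just says which operator to use.
    static <IN, OUT> List<OUT> run(ToyOneInputTransformation<IN, OUT> t, List<IN> elements) {
        List<OUT> out = new ArrayList<>();
        for (IN e : elements) t.operator.processElement(e, out);
        return out;
    }
}
```

Wrapping a source node with a map operator that doubles integers and prefixes "v", then running [1, 2, 3] through it, yields [v2, v4, v6]; the transformation's input still points at the source node, which is all the graph needs to know.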

For comparison, here is TwoInputTransformation:

public class TwoInputTransformation<IN1, IN2, OUT> extends StreamTransformation<OUT> {

    private final StreamTransformation<IN1> input1;
    private final StreamTransformation<IN2> input2;

    private final TwoInputStreamOperator<IN1, IN2, OUT> operator;
}

 

Now compare SourceTransformation with SinkTransformation:

public class SourceTransformation<T> extends StreamTransformation<T> {

    private final StreamSource<T> operator;
}

public class SinkTransformation<T> extends StreamTransformation<Object> {

    private final StreamTransformation<T> input;

    private final StreamSink<T> operator;
}

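This makes the role of a transformation easy to see. 
A source has no input, so there is no transformation representing one. 
A sink does have an input, but its operator is not an ordinary stream operator: it is a StreamSink, the end point of the stream.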

 

transform

This method uses a user-defined operator to turn the current stream into a stream of a user-specified type.

/**
 * Method for passing user defined operators along with the type
 * information that will transform the DataStream.
 *
 * @param operatorName
 *            name of the operator, for logging purposes
 * @param outTypeInfo
 *            the output type of the operator
 * @param operator
 *            the object containing the transformation logic
 * @param <R>
 *            type of the return stream
 * @return the data stream constructed
 */
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<T, R>(
            this.transformation,
            operatorName,
            operator,
            outTypeInfo,
            environment.getParallelism());

    @SuppressWarnings({ "unchecked", "rawtypes" })
    SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}

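So the user supplies the output TypeInformation and a OneInputStreamOperator (plus an operator name, used for logging).

The implementation creates a OneInputTransformation whose input is this.transformation and whose operator is the one passed in, 
so resultTransform is what turns the current stream into the target stream. resultTransform is also registered with the environment via addOperator.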
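The steps of transform can be mirrored with a few stand-in classes. In this minimal sketch ToyEnv, ToyStream and Tx are hypothetical; the point is only the shape: a new node whose input is this.transformation, registered with the environment, and wrapped in a new stream object. The source node is deliberately not registered here; it stays reachable through the input references.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical transformation node: a name plus the upstream node (null for a source).
class Tx {
    final String name;
    final Tx input;
    Tx(String name, Tx input) { this.name = name; this.input = input; }
}

// Stand-in for StreamExecutionEnvironment: just collects registered transformations.
class ToyEnv {
    final List<Tx> transformations = new ArrayList<>();

    void addOperator(Tx t) { transformations.add(t); }

    // In this sketch the source node is created but not registered;
    // later nodes still reach it through their `input` references.
    ToyStream fromSource(String name) { return new ToyStream(this, new Tx(name, null)); }
}

// Stand-in for DataStream: an environment plus one transformation, nothing more.
class ToyStream {
    final ToyEnv env;
    final Tx transformation;
    ToyStream(ToyEnv env, Tx transformation) { this.env = env; this.transformation = transformation; }

    // Same shape as DataStream.transform: a new node with this.transformation as its input,
    // registered with the environment, wrapped in a new stream object.
    ToyStream transform(String operatorName) {
        Tx resultTransform = new Tx(operatorName, this.transformation);
        env.addOperator(resultTransform);
        return new ToyStream(env, resultTransform);
    }
}
```

Calling fromSource("source").transform("map").transform("filter") registers two nodes, and the returned stream's node links back filter -> map -> source.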

The result is then wrapped in a SingleOutputStreamOperator. What is that?

/**
 * The SingleOutputStreamOperator represents a user defined transformation
 * applied on a {@link DataStream} with one predefined output type.
 *
 * @param <T> The type of the elements in this Stream
 * @param <O> Type of the operator.
 */
public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<T, O>> extends DataStream<T> {

    protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
        super(environment, transformation);
    }
}

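Put simply, it is a DataStream that wraps the transformation built around the user's operator.

The naming in this part of Flink is somewhat confusing: despite its name, SingleOutputStreamOperator is a stream (it extends DataStream), not an operator, so the two concepts, operator and transformation, are easy to mix up.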
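The recursive bound O extends SingleOutputStreamOperator<T, O> is, as far as I can tell, the usual Java idiom for fluent setters that return the concrete subclass instead of the base class. The sketch below assumes that is its purpose here; ToySingleOutput, ToyMapStream and their setters are hypothetical.

```java
// Hypothetical base class using the same recursive ("self type") generic bound.
class ToySingleOutput<T, O extends ToySingleOutput<T, O>> {
    String name = "Unnamed";
    int parallelism = 1;

    // Safe as long as every subclass binds O to itself.
    @SuppressWarnings("unchecked")
    O self() { return (O) this; }

    // Both setters return O, so chained calls keep the subclass type.
    O name(String name) { this.name = name; return self(); }
    O setParallelism(int parallelism) { this.parallelism = parallelism; return self(); }
}

// A concrete subclass binds O to itself and adds its own method.
class ToyMapStream<T> extends ToySingleOutput<T, ToyMapStream<T>> {
    String describe() { return name + " x" + parallelism; }
}
```

new ToyMapStream<String>().name("myMap").setParallelism(4).describe() compiles without casts and returns "myMap x4", because each setter returns ToyMapStream<String> rather than the base type.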

 

In the example above, keyBy(0) produces a KeyedStream.

KeyedStream
The essential parts of a KeyedStream are keySelector and keyType: 
how the key is extracted from each element, and what type the key has.

/**
 * A {@code KeyedStream} represents a {@link DataStream} on which operator state is
 * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
 * {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of
 * partitioning methods such as shuffle, forward and keyBy.
 *
 * <p>
 * Reduce-style operations, such as {@link #reduce}, {@link #sum} and {@link #fold} work on elements
 * that have the same key.
 *
 * @param <T> The type of the elements in the Keyed Stream.
 * @param <KEY> The type of the key in the Keyed Stream.
 */
public class KeyedStream<T, KEY> extends DataStream<T> {

    /** The key selector that can get the key by which the stream is partitioned from the elements */
    private final KeySelector<T, KEY> keySelector;

    /** The type of the key by which the stream is partitioned */
    private final TypeInformation<KEY> keyType;
}

 
Now look at its transform: it calls DataStream.transform and then sets keySelector and keyType on the resulting transformation.

// ------------------------------------------------------------------------
//  basic transformations
// ------------------------------------------------------------------------

@Override
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
        TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);

    // inject the key selector and key type
    OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
    transform.setStateKeySelector(keySelector);
    transform.setStateKeyType(keyType);

    return returnStream;
}
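The override pattern above (let DataStream.transform build the node, then decorate it with key information) can be sketched without Flink. keyBy(0) on a tuple boils down to a key selector that reads field 0; here a long[] stands in for Tuple2<Long, Long>, output types and keyType are left out, and all class names are hypothetical.

```java
import java.util.function.Function;

// Hypothetical transformation node that may carry a state key selector.
class KeyedTx<T> {
    final String name;
    Function<T, ?> stateKeySelector;   // stays null for non-keyed streams
    KeyedTx(String name) { this.name = name; }
}

// Stand-in for DataStream: transform() just builds the node.
class PlainStream<T> {
    KeyedTx<T> transform(String operatorName) {
        return new KeyedTx<>(operatorName);
    }
}

// Stand-in for KeyedStream: reuses the parent's transform(), then injects the key selector.
class ToyKeyedStream<T, K> extends PlainStream<T> {
    private final Function<T, K> keySelector;
    ToyKeyedStream(Function<T, K> keySelector) { this.keySelector = keySelector; }

    @Override
    KeyedTx<T> transform(String operatorName) {
        KeyedTx<T> tx = super.transform(operatorName);   // same node the parent would build
        tx.stateKeySelector = keySelector;               // then decorate it with the key selector
        return tx;
    }
}
```

With new ToyKeyedStream<long[], Long>(t -> t[0]) playing the role of keyBy(0), the node returned by transform("window") extracts key 7 from the element {7, 100}, while a node built by a plain stream has no key selector at all.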

 

Most importantly, KeyedStream serves as the transition to WindowedStream, 

so it offers a set of methods that create windowed streams:

// ------------------------------------------------------------------------
//  Windowing
// ------------------------------------------------------------------------

/**
 * Windows this {@code KeyedStream} into tumbling time windows.
 *
 * <p>
 * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
 * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
 * set using
 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
 *
 * @param size The size of the window.
 */
public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
    return window(TumblingTimeWindows.of(size));
}
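With a tumbling window each element falls into exactly one window. One simple way to find it is to round the timestamp down to a multiple of the window size. This is a minimal sketch assuming non-negative timestamps and windows aligned to 0; Tumbling.windowFor is a hypothetical helper, not TumblingTimeWindows.

```java
// Hypothetical helper (not Flink's TumblingTimeWindows): the [start, end) tumbling
// window that contains a timestamp. Assumes timestamp >= 0 and windows aligned to 0.
final class Tumbling {
    private Tumbling() {}

    static long[] windowFor(long timestamp, long size) {
        long start = timestamp - (timestamp % size);   // round down to a multiple of size
        return new long[] {start, start + size};
    }
}
```

With a 2500 ms size, timestamp 6200 lands in [5000, 7500), and timestamp 5000 starts that same window.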

 

WindowedStream

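In the example, 
.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
passes both a window size and a slide, so it calls the two-argument overload of timeWindow rather than the one shown above; that overload creates sliding windows (2500 ms long, starting every 500 ms) instead of tumbling ones.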
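With a 2500 ms size and a 500 ms slide, windows overlap and each element belongs to 2500 / 500 = 5 of them. The sketch enumerates those windows by starting from the latest window start at or before the timestamp and stepping back one slide at a time. It assumes windows aligned to 0, and Sliding.windowStartsFor is a hypothetical helper, not SlidingTimeWindows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not Flink's SlidingTimeWindows): start times of all sliding
// windows [start, start + size) that contain a timestamp. Assumes windows aligned to 0;
// for timestamps smaller than size, some of the returned starts are negative.
final class Sliding {
    private Sliding() {}

    static List<Long> windowStartsFor(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);   // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

For the example's parameters, timestamp 6200 lies in the windows starting at 6000, 5500, 5000, 4500 and 4000; the window starting at 3500 ends at 6000 and no longer contains it.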

 

/**
 * A {@code WindowedStream} represents a data stream where elements are grouped by
 * key, and for each key, the stream of elements is split into windows based on a
 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
 * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
 *
 * <p>
 * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
 * different points for each key.
 *
 * <p>
 * If an {@link Evictor} is specified it will be used to evict elements from the window after
 * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
 * When using an evictor window performance will degrade significantly, since
 * pre-aggregation of window results cannot be used.
 *
 * <p>
 * Note that the {@code WindowedStream} is purely an API construct, during runtime
 * the {@code WindowedStream} will be collapsed together with the
 * {@code KeyedStream} and the operation over the window into one single operation.
 *
 * @param <T> The type of elements in the stream.
 * @param <K> The type of the key by which elements are grouped.
 * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
 */
public class WindowedStream<T, K, W extends Window> {

    /** The keyed data stream that is windowed by this stream */
    private final KeyedStream<T, K> input;

    /** The window assigner */
    private final WindowAssigner<? super T, W> windowAssigner;

    /** The trigger that is used for window evaluation/emission. */
    private Trigger<? super T, ? super W> trigger;

    /** The evictor that is used for evicting elements before window evaluation. */
    private Evictor<? super T, ? super W> evictor;

Note that WindowedStream does not extend DataStream; 

instead it takes a KeyedStream as its input.

It also holds everything a window needs: the WindowAssigner, the Trigger and the (optional) Evictor.

 

Continuing the example with .reduce(new SummingReducer()), here is WindowedStream's reduce:

/**
 * Applies a reduce function to the window. The window function is called for each evaluation
 * of the window for each key individually. The output of the reduce function is interpreted
 * as a regular non-windowed stream.
 * <p>
 * This window will try and pre-aggregate data as much as the window policies permit. For example,
 * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
 * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
 * so a few elements are stored per key (one per slide interval).
 * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
 * aggregation tree.
 *
 * @param function The reduce function.
 * @return The data stream that is the result of applying the reduce function to the window.
 */
public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) {
    //clean the closure
    function = input.getExecutionEnvironment().clean(function);

    // udfName (a display name for the reduce call) is defined in lines omitted from this excerpt
    String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
    KeySelector<T, K> keySel = input.getKeySelector();

    OneInputStreamOperator<T, T> operator;

    boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;

    if (evictor != null) {
        operator = new EvictingWindowOperator<>(windowAssigner,
                windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                keySel,
                input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                new HeapWindowBuffer.Factory<T>(),
                new ReduceWindowFunction<K, W, T>(function),
                trigger,
                evictor).enableSetProcessingTime(setProcessingTime);

    } else {
        operator = new WindowOperator<>(windowAssigner,
                windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                keySel,
                input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                new PreAggregatingHeapWindowBuffer.Factory<>(function), // pre-aggregating: stores only the aggregated value instead of caching the actual elements, which saves space
                new ReduceWindowFunction<K, W, T>(function),
                trigger).enableSetProcessingTime(setProcessingTime);
    }

    return input.transform(opName, input.getType(), operator);
}

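The key point is that reduce picks a different window operator depending on whether an evictor is set: with an evictor it builds an EvictingWindowOperator backed by a HeapWindowBuffer, which keeps every element; without one it builds a WindowOperator backed by a PreAggregatingHeapWindowBuffer, which keeps only the running aggregate.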
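The choice of buffer is what makes the evictor expensive, as the WindowedStream javadoc warns: an evictor may drop individual elements after the trigger fires, so every element has to be kept, whereas a plain reduce can fold each element into a single running value. A minimal sketch of the two storage strategies; HeapBuffer and PreAggregatingBuffer are hypothetical simplifications, not the code of HeapWindowBuffer or PreAggregatingHeapWindowBuffer, and they assume non-null elements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical buffer that stores every element, so an evictor can still remove some of them.
class HeapBuffer<T> {
    final List<T> elements = new ArrayList<>();

    void add(T value) { elements.add(value); }

    // Reduces only when the window is evaluated. Assumes non-null elements.
    T result(BinaryOperator<T> reducer) {
        T acc = null;
        for (T e : elements) acc = (acc == null) ? e : reducer.apply(acc, e);
        return acc;
    }

    int storedCount() { return elements.size(); }
}

// Hypothetical buffer that folds each element into one running value as it arrives.
class PreAggregatingBuffer<T> {
    private final BinaryOperator<T> reducer;
    private T acc;   // null means "empty"; assumes non-null elements

    PreAggregatingBuffer(BinaryOperator<T> reducer) { this.reducer = reducer; }

    void add(T value) { acc = (acc == null) ? value : reducer.apply(acc, value); }

    T result() { return acc; }

    int storedCount() { return acc == null ? 0 : 1; }
}
```

After adding 1, 2, 3, 4 with Integer::sum, the heap buffer holds four elements and the pre-aggregating buffer one value, both giving 10; only the heap buffer can still produce the right answer (9) after an evictor removes the first element.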

reduce then calls input.transform to turn the WindowedStream into a SingleOutputStreamOperator,

where input is the KeyedStream:

// ------------------------------------------------------------------------
//  basic transformations
// ------------------------------------------------------------------------

@Override
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
        TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);

    // inject the key selector and key type
    OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
    transform.setStateKeySelector(keySelector);
    transform.setStateKeyType(keyType);

    return returnStream;
}

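The parameter here is a OneInputStreamOperator, and WindowOperator implements this interface.

On top of what StreamOperator already declares (typically inherited from AbstractStreamOperator, as the javadoc suggests), a OneInputStreamOperator only has to implement two methods, processElement and processWatermark; its focus is on how each input element is handled.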

/**
 * Interface for stream operators with one input. Use
 * {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if
 * you want to implement a custom operator.
 *
 * @param <IN> The input type of the operator
 * @param <OUT> The output type of the operator
 */
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {

    /**
     * Processes one element that arrived at this operator.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     */
    void processElement(StreamRecord<IN> element) throws Exception;

    /**
     * Processes a {@link Watermark}.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     *
     * @see org.apache.flink.streaming.api.watermark.Watermark
     */
    void processWatermark(Watermark mark) throws Exception;
}

KeyedStream.transform then calls super.transform, i.e. DataStream.transform.
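To see how processElement and processWatermark divide the work, here is a toy tumbling-window sum operator: processElement only updates per-window, per-key state, and processWatermark emits and discards every window whose end the watermark has reached. This is a simplified sketch of the idea behind WindowOperator, not its implementation; ToyOperator, KV and the firing rule (window end <= watermark) are assumptions of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified counterpart of OneInputStreamOperator.
interface ToyOperator<IN> {
    void processElement(IN element, long timestamp);
    void processWatermark(long watermark);
}

// Record type used in the sketch: (key, value).
final class KV {
    final String key;
    final long value;
    KV(String key, long value) { this.key = key; this.value = value; }
}

// Tumbling-window sum per key: buffers in processElement, emits in processWatermark.
class ToyTumblingSumOperator implements ToyOperator<KV> {
    private final long size;
    // window end -> (key -> running sum); TreeMap keeps windows ordered by end time
    private final TreeMap<Long, Map<String, Long>> windows = new TreeMap<>();
    final List<String> output = new ArrayList<>();

    ToyTumblingSumOperator(long size) { this.size = size; }

    @Override
    public void processElement(KV element, long timestamp) {
        long end = timestamp - (timestamp % size) + size;   // assumes timestamp >= 0
        windows.computeIfAbsent(end, w -> new TreeMap<>())
               .merge(element.key, element.value, Long::sum);
    }

    @Override
    public void processWatermark(long watermark) {
        // Fire every window whose end is at or before the watermark, then drop its state.
        while (!windows.isEmpty() && windows.firstKey() <= watermark) {
            Map.Entry<Long, Map<String, Long>> fired = windows.pollFirstEntry();
            for (Map.Entry<String, Long> e : fired.getValue().entrySet()) {
                output.add(e.getKey() + "@" + fired.getKey() + "=" + e.getValue());
            }
        }
    }
}
```

With a 2500 ms size, feeding (a,1)@100, (a,2)@2000, (b,5)@2400 and (a,10)@2600 emits nothing at watermark 2499, emits a@2500=3 and b@2500=5 at watermark 2500, and a@5000=10 at watermark 5000.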

 

At the end of the example,

.addSink(new SinkFunction<Tuple2<Long, Long>>() {...});

actually calls SingleOutputStreamOperator.addSink, which is simply the inherited DataStream.addSink:

/**
 * Adds the given sink to this DataStream. Only streams with sinks added
 * will be executed once the {@link StreamExecutionEnvironment#execute()}
 * method is called.
 *
 * @param sinkFunction
 *            The object containing the sink's invoke function.
 * @return The closed DataStream.
 */
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

    StreamSink<T> sinkOperator = new StreamSink<T>(clean(sinkFunction));

    DataStreamSink<T> sink = new DataStreamSink<T>(this, sinkOperator);

    getExecutionEnvironment().addOperator(sink.getTransformation());
    return sink;
}

 

The SinkFunction interface:

public interface SinkFunction<IN> extends Function, Serializable {

    /**
     * Function for standard sink behaviour. This function is called for every record.
     *
     * @param value The input record.
     * @throws Exception
     */
    void invoke(IN value) throws Exception;
}

 

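StreamSink is a OneInputStreamOperator, so the method that matters is processElement, which passes each value to the user's SinkFunction.invoke; watermarks are ignored.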

public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
        implements OneInputStreamOperator<IN, Object> {

    public StreamSink(SinkFunction<IN> sinkFunction) {
        super(sinkFunction);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        userFunction.invoke(element.getValue());
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // ignore it for now, we are a sink, after all
    }
}
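StreamSink is a thin adapter between the operator interface and the user's SinkFunction: it unwraps each record and hands the value to invoke, and it drops watermarks because nothing sits downstream of a sink. A self-contained sketch with hypothetical stand-in types (ToySinkFunction, ToyRecord, ToyStreamSink) instead of the Flink classes:

```java
// Stand-in for SinkFunction: called once per record.
interface ToySinkFunction<IN> {
    void invoke(IN value) throws Exception;
}

// Stand-in for StreamRecord: wraps a value (timestamp omitted).
final class ToyRecord<T> {
    private final T value;
    ToyRecord(T value) { this.value = value; }
    T getValue() { return value; }
}

// Stand-in for StreamSink: unwraps records for the user function, ignores watermarks.
class ToyStreamSink<IN> {
    private final ToySinkFunction<IN> userFunction;
    ToyStreamSink(ToySinkFunction<IN> userFunction) { this.userFunction = userFunction; }

    void processElement(ToyRecord<IN> element) throws Exception {
        userFunction.invoke(element.getValue());
    }

    void processWatermark(long watermark) {
        // A sink has no downstream operator, so the watermark is simply dropped.
    }
}
```

Passing received::add as the sink function collects every value handed to processElement, while processWatermark leaves the list untouched.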

 

DataStreamSink is just a wrapper around a SinkTransformation:

/**
 * A Stream Sink. This is used for emitting elements from a streaming topology.
 *
 * @param <T> The type of the elements in the Stream
 */
public class DataStreamSink<T> {

    SinkTransformation<T> transformation;

    @SuppressWarnings("unchecked")
    protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
        this.transformation = new SinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());
    }
}

 

Finally, 

addSink adds the SinkTransformation to the environment's List<StreamTransformation<?>> transformations (via getExecutionEnvironment().addOperator).

 

And at last we reach env.execute();
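What execute() does internally is beyond this post, but the StreamTransformation javadoc above says the transformation graph is translated into a StreamGraph by StreamGraphGenerator. The sketch below shows one plausible shape for such a translation, assuming a recursive walk that translates inputs first and remembers nodes it has already handled, so sources that were never registered are still reached. TNode and ToyGraphGenerator are hypothetical, not Flink's generator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical transformation node: a name plus its inputs.
class TNode {
    final String name;
    final List<TNode> inputs;
    TNode(String name, TNode... inputs) { this.name = name; this.inputs = Arrays.asList(inputs); }
}

final class ToyGraphGenerator {
    private final Set<TNode> alreadyTransformed = new HashSet<>();
    private final List<String> order = new ArrayList<>();

    // Translates every registered transformation; inputs are always emitted before their consumers.
    static List<String> generate(List<TNode> registered) {
        ToyGraphGenerator g = new ToyGraphGenerator();
        for (TNode t : registered) g.transform(t);
        return g.order;
    }

    private void transform(TNode t) {
        if (!alreadyTransformed.add(t)) return;   // already handled via another path
        for (TNode in : t.inputs) transform(in);  // inputs first (assumes no cycles)
        order.add(t.name);
    }
}
```

Registering only map, window and sink (not the source) still yields the order source, map, window, sink, and a second sink sharing the same upstream nodes adds only itself.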

Date: 2024-08-25 14:39:07
