5. Flink Watermark

Time semantics

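  • Event Time: the time at which the event was created

  • Ingestion Time: the time at which the data enters Flink

  • Processing Time: the local system time of the machine executing the operator, so it depends on that machine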

In Flink 1.15 the default time semantics is Event Time (it has been the default since Flink 1.12).

Impact of out-of-order data

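When Flink processes a stream in Event Time mode, its time-based operators are driven by the timestamps carried in the data. Network delays and distributed processing can make data arrive out of order, and out-of-order data makes window computations inaccurate.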
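To make the problem concrete, here is a plain-Java simulation (not Flink code). It models a 5-second tumbling event-time window [0, 5000) that closes as soon as any record with a later timestamp arrives, so it has no tolerance for disorder. The class name `OutOfOrderDemo` and the timestamps are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation, not Flink API: window [0, 5000) is closed the moment
// any record with timestamp >= 5000 arrives (zero tolerance for disorder).
final class OutOfOrderDemo {
    static final long WINDOW_END = 5_000L;

    /** Returns the timestamps that were counted in window [0, 5000) before it closed. */
    static List<Long> countedInFirstWindow(long[] arrivalOrder) {
        List<Long> counted = new ArrayList<>();
        boolean closed = false;
        for (long ts : arrivalOrder) {
            if (ts >= WINDOW_END) {
                closed = true;          // a later event is taken as proof the window is complete
            } else if (!closed) {
                counted.add(ts);        // window still open: the event is included
            }
            // otherwise the window already closed and this out-of-order event is lost
        }
        return counted;
    }

    public static void main(String[] args) {
        // The 4000 ms event was created before the 6000 ms event but arrives after it.
        System.out.println(countedInFirstWindow(new long[] {1_000L, 3_000L, 6_000L, 4_000L}));
    }
}
```

This prints `[1000, 3000]`. The 4000 ms event belongs to the first window but is lost because it arrived after the 6000 ms event. Delaying the window's close by some bound is what lets it wait for such stragglers.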

Watermark

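A watermark is a mechanism for measuring the progress of Event Time, and it can be configured to delay triggering. Watermarks are used to handle out-of-order events and are usually combined with windows: an event-time window is triggered when the watermark reaches the window's last timestamp.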
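Here is a plain-Java sketch of that relationship, assuming zero window offset and non-negative timestamps. A tumbling window is the half-open interval [start, start + size), where start is the timestamp rounded down to a multiple of the size. The window's last timestamp is `end - 1`, and Flink's default event-time trigger fires once the watermark is at or past that value. The class `WindowFiring` and its method names are hypothetical.

```java
// Plain-Java sketch of event-time tumbling windows (zero offset, non-negative timestamps).
final class WindowFiring {
    /** Start of the tumbling window that contains {@code timestamp}. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);   // round down to a multiple of size
    }

    /** Windows are half-open [start, start + size); the last timestamp they hold is end - 1. */
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    /** Trigger condition: fire once the watermark has reached the window's last timestamp. */
    static boolean shouldFire(long start, long size, long watermark) {
        return maxTimestamp(start, size) <= watermark;
    }

    public static void main(String[] args) {
        long size = 5_000L;
        long start = windowStart(7_300L, size);
        System.out.println(start);                            // 5000
        System.out.println(shouldFire(start, size, 9_998L));  // false
        System.out.println(shouldFire(start, size, 9_999L));  // true
    }
}
```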

Characteristics of watermarks

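  • A watermark is a special record in the data stream
  • Watermarks must be monotonically increasing, so that the task's event-time clock moves forward and never goes backward
  • A watermark is derived from the timestamps of the data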
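Monotonicity can be enforced by whatever forwards the watermarks: a candidate that does not move past the current watermark is not emitted. The plain-Java class below is a minimal sketch of that rule. The name `MonotonicWatermarkClock` is hypothetical. Flink's own watermark emission performs a similar check internally, so user code normally does not need to do this.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: keep a task's event-time clock moving forward only.
final class MonotonicWatermarkClock {
    private long current = Long.MIN_VALUE;          // no watermark seen yet
    private final List<Long> emitted = new ArrayList<>();

    /** Emits the candidate only if it advances the clock; returns whether it was emitted. */
    boolean offer(long candidate) {
        if (candidate <= current) {
            return false;                           // would stall or move time backwards: drop it
        }
        current = candidate;
        emitted.add(candidate);
        return true;
    }

    long current() {
        return current;
    }

    List<Long> emitted() {
        return emitted;
    }
}
```

For example, offering 1000, 3000, 2000, 3000 and 4000 in that order emits only 1000, 3000 and 4000.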

Generating watermarks

  • Flink uses a TimestampAssigner to extract the event-time field from each event.
  • Flink uses a WatermarkGenerator to generate watermarks.

Example:

package com.justin.watermark;

import com.justin.model.SensorReading;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

/**
 * Usage for bounded out-of-orderness
 * nc -lk 7777
 */
public class BoundedOutOfOrdernessWatermarkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host", "192.168.10.7");
        int port = parameterTool.getInt("port", 7777);

        DataStream<String> inputStream = env.socketTextStream(host, port);

        DataStream<SensorReading> dataStream = inputStream.map(i -> {
            String[] arr = i.split(", ");
            return SensorReading.builder()
                    .id(arr[0])
                    .timestamp(Long.valueOf(arr[1]))
                    .temperature(Double.valueOf(arr[2]))
                    .build();
        }).assignTimestampsAndWatermarks(WatermarkStrategy
                // bounded out-of-orderness: the argument is the maximum delay tolerated for out-of-order events
                .<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
                .withTimestampAssigner((element, recordTimestamp) -> {
                    return element.getTimestamp() * 1000L;
                })
                .withIdleness(Duration.ofSeconds(3L)));

        DataStream<SensorReading> resultStream = dataStream.keyBy(SensorReading::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5L)))
                .reduce((v1, v2) -> {
                    return SensorReading.builder()
                            .id(v1.getId())
                            .timestamp(v2.getTimestamp())
                            .temperature(Math.min(v1.getTemperature(), v2.getTemperature()))
                            .build();
                });

        resultStream.print();

        env.execute();
    }
}
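/**
 * Usage for monotonically increasing timestamps
 * (put this class in its own file, MonotonousTimestampsWatermarkDemo.java, with the same package and imports as above)
 * nc -lk 7777
 */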
public class MonotonousTimestampsWatermarkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host", "192.168.10.7");
        int port = parameterTool.getInt("port", 7777);

        DataStream<String> inputStream = env.socketTextStream(host, port);

        DataStream<SensorReading> dataStream = inputStream.map(i -> {
            String[] arr = i.split(", ");
            return SensorReading.builder()
                    .id(arr[0])
                    .timestamp(Long.valueOf(arr[1]))
                    .temperature(Double.valueOf(arr[2]))
                    .build();
        }).assignTimestampsAndWatermarks(WatermarkStrategy
                // monotonically increasing timestamps (no out-of-orderness)
                .<SensorReading>forMonotonousTimestamps()
                .withTimestampAssigner((element, recordTimestamp) -> {
                    return element.getTimestamp() * 1000L;
                })
                .withIdleness(Duration.ofSeconds(3L)));

        DataStream<SensorReading> resultStream = dataStream.keyBy(SensorReading::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5L)))
                .reduce((v1, v2) -> {
                    return SensorReading.builder()
                            .id(v1.getId())
                            .timestamp(v2.getTimestamp())
                            .temperature(Math.min(v1.getTemperature(), v2.getTemperature()))
                            .build();
                });

        resultStream.print();

        env.execute();
    }
}
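The plain-Java replay below shows what the bounded-out-of-orderness demo does with concrete input. It applies the same rules to a few hypothetical lines typed into `nc`, using the `id, seconds, temperature` format that the `split(", ")` above expects, all for a single key. Timestamps are converted to milliseconds. The watermark is `maxTimestamp - 2000 - 1`, and a 5 s window [start, end) fires once the watermark reaches `end - 1`, emitting the minimum temperature like the `reduce` above. The sketch recomputes the watermark after every record and ignores parallelism and idleness, so treat it as an approximation of the real job, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java replay of the BoundedOutOfOrdernessWatermarkDemo rules for one key.
final class BoundedReplay {
    static final long OUT_OF_ORDERNESS_MS = 2_000L;
    static final long WINDOW_MS = 5_000L;

    /** Returns one "windowStart:minTemperature" entry per fired window, in firing order. */
    static List<String> replay(String[] lines) {
        List<String> fired = new ArrayList<>();
        TreeMap<Long, Double> openWindows = new TreeMap<>();   // window start -> min temperature so far
        long maxTs = Long.MIN_VALUE + OUT_OF_ORDERNESS_MS + 1; // start value that cannot underflow
        long watermark = Long.MIN_VALUE;
        for (String line : lines) {
            String[] arr = line.split(", ");
            long ts = Long.parseLong(arr[1]) * 1000L;          // seconds -> ms, as in the demo
            double temperature = Double.parseDouble(arr[2]);
            long start = ts - ts % WINDOW_MS;
            if (start + WINDOW_MS - 1 <= watermark) {
                continue;                                      // its window already fired: late record dropped
            }
            openWindows.merge(start, temperature, Double::min); // same min-reduce as the demo
            maxTs = Math.max(maxTs, ts);
            watermark = maxTs - OUT_OF_ORDERNESS_MS - 1;
            // Fire (and remove) every window whose last timestamp the watermark has reached.
            while (!openWindows.isEmpty() && openWindows.firstKey() + WINDOW_MS - 1 <= watermark) {
                Map.Entry<Long, Double> w = openWindows.pollFirstEntry();
                fired.add(w.getKey() + ":" + w.getValue());
            }
        }
        return fired;
    }
}
```

With the input `sensor_1, 1, 35.8`, `sensor_1, 4, 32.0`, `sensor_1, 6, 30.0`, `sensor_1, 3, 31.0`, `sensor_1, 7, 33.0`, `sensor_1, 2, 20.0`, `sensor_1, 12, 29.0`, the replay returns `[0:31.0, 5000:30.0]`. The 3 s record arrives after the 6 s record but is still within the 2 s bound, so it is counted. The 2 s record arrives after the watermark has reached 4999, so it is dropped.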

TimestampAssigner extracts the event-time field from each element.

@Public
@FunctionalInterface
public interface TimestampAssigner<T> {

    /**
     * The value that is passed to {@link #extractTimestamp} when there is no previous timestamp
     * attached to the record.
     */
    long NO_TIMESTAMP = Long.MIN_VALUE;

    /**
     * Assigns a timestamp to an element, in milliseconds since the Epoch. This is independent of
     * any particular time zone or calendar.
     *
     * <p>The method is passed the previously assigned timestamp of the element. That previous
     * timestamp may have been assigned from a previous assigner. If the element did not carry a
     * timestamp before, this value is {@link #NO_TIMESTAMP} (= {@code Long.MIN_VALUE}: {@value
     * Long#MIN_VALUE}).
     *
     * @param element The element that the timestamp will be assigned to.
     * @param recordTimestamp The current internal timestamp of the element, or a negative value, if
     *     no timestamp has been assigned yet.
     * @return The new timestamp.
     */
    long extractTimestamp(T element, long recordTimestamp);
}
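`extractTimestamp` receives the element together with the timestamp already attached to it, or `NO_TIMESTAMP` if there is none. The sketch below redeclares an interface of the same shape in plain Java so that it compiles without Flink on the classpath. `SimpleTimestampAssigner` and `SensorLineTimestampAssigner` are hypothetical names, and the `id, seconds, temperature` line format is taken from the demo above. Falling back to the existing timestamp when a line has no time field is one possible policy chosen for illustration, not Flink behavior.

```java
// Same shape as Flink's TimestampAssigner, redeclared so this compiles without Flink.
@FunctionalInterface
interface SimpleTimestampAssigner<T> {
    long NO_TIMESTAMP = Long.MIN_VALUE;

    long extractTimestamp(T element, long recordTimestamp);
}

// Hypothetical assigner for raw lines such as "sensor_1, 1547718199, 35.8".
final class SensorLineTimestampAssigner implements SimpleTimestampAssigner<String> {
    @Override
    public long extractTimestamp(String line, long recordTimestamp) {
        String[] arr = line.split(", ");
        if (arr.length < 2 || arr[1].isEmpty()) {
            // No time field: keep whatever timestamp the record already carries (possibly NO_TIMESTAMP).
            return recordTimestamp;
        }
        return Long.parseLong(arr[1]) * 1000L;   // epoch seconds -> epoch milliseconds
    }
}
```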

WatermarkGenerator generates watermarks.

package org.apache.flink.api.common.eventtime;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;

/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or periodically (in a
 * fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the {@code
 * AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine and remember the event
     * timestamps, or to emit a watermark based on the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks are generated depends on {@link
     * ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}
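This sketch shows roughly how a bounded-out-of-orderness generator uses the two callbacks. `onEvent` only records the largest timestamp seen, and `onPeriodicEmit` emits `max - delay - 1`. The interfaces are redeclared in plain Java so the sketch runs without Flink. `SimpleWatermarkOutput`, `SimpleWatermarkGenerator` and `BoundedDelayGenerator` are hypothetical names. Flink ships its own implementation (`BoundedOutOfOrdernessWatermarks`) behind `WatermarkStrategy.forBoundedOutOfOrderness`.

```java
// Plain-Java stand-ins with the same callback shape as Flink's WatermarkOutput/WatermarkGenerator.
interface SimpleWatermarkOutput {
    void emitWatermark(long watermark);
}

interface SimpleWatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, SimpleWatermarkOutput output);

    void onPeriodicEmit(SimpleWatermarkOutput output);
}

// Sketch: the watermark trails the largest timestamp seen by a fixed delay.
final class BoundedDelayGenerator<T> implements SimpleWatermarkGenerator<T> {
    private final long delayMs;
    private long maxTimestamp;

    BoundedDelayGenerator(long delayMs) {
        this.delayMs = delayMs;
        this.maxTimestamp = Long.MIN_VALUE + delayMs + 1;   // first watermark cannot underflow
    }

    @Override
    public void onEvent(T event, long eventTimestamp, SimpleWatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);  // remember only; nothing emitted per event
    }

    @Override
    public void onPeriodicEmit(SimpleWatermarkOutput output) {
        // Watermark t promises no more events with timestamp <= t, so the "- 1" still
        // admits an event that is exactly delayMs behind the largest timestamp.
        output.emitWatermark(maxTimestamp - delayMs - 1);
    }
}
```

After events at 6000 ms and 3000 ms with a 2000 ms delay, the next periodic emit produces 3999. A punctuated generator would instead call `emitWatermark` from `onEvent`.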

Choosing the watermark delay

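  1. If the watermark delay is set too long, results may arrive slowly. One remedy is to output an approximate result before the watermark arrives.

  2. If watermarks arrive too early, results may be incorrect, because events that show up after their window has fired are treated as late. Flink's mechanisms for handling late data address this.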
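Flink's window API exposes its late-data handling through `allowedLateness(...)` and `sideOutputLateData(OutputTag)` on a windowed stream. The plain-Java sketch below models the rule the window operator applies. A window keeps its state until the watermark reaches `end - 1 + allowedLateness`. A late record that arrives before that point updates the window and makes it fire again. A record that arrives after that point is dropped, or is sent to the side output if one is configured. The class `LatenessPolicy` and its enum are hypothetical names.

```java
// Plain-Java model of how an event-time window treats a record, given the current watermark.
final class LatenessPolicy {
    enum Outcome { BUFFERED, FIRE_AGAIN, TOO_LATE }

    static Outcome classify(long windowEnd, long allowedLatenessMs, long watermark) {
        long maxTimestamp = windowEnd - 1;                 // last timestamp that belongs to [start, end)
        if (maxTimestamp + allowedLatenessMs <= watermark) {
            return Outcome.TOO_LATE;                       // state already cleaned up: dropped or side output
        }
        if (maxTimestamp <= watermark) {
            return Outcome.FIRE_AGAIN;                     // late but within allowed lateness: update and re-emit
        }
        return Outcome.BUFFERED;                           // on time: wait for the watermark as usual
    }
}
```

For the window [0, 5000) with 1000 ms of allowed lateness, a record is buffered while the watermark is below 4999. From 4999 to 5998 it causes an extra firing, and from 5999 onward it is too late. With zero allowed lateness, every record that arrives after the window has fired is too late.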


About the author

justin