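Time semantics
- Event Time: the time at which the event was created
- Ingestion Time: the time at which the data entered Flink
- Processing Time: the local system time of the machine executing the operator, so it depends on that machine's clock
Since Flink 1.12 (and therefore in Flink 1.15) the default time semantics is Event Time.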
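Impact of out-of-order data
When Flink processes a stream in Event Time mode, its time-based operators work from the timestamps carried in the records. Network delays and distributed processing can deliver records out of order, however, and out-of-order records make window results inaccurate.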
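A toy model (plain Java, not Flink code) shows why. Suppose a 5-second window is closed as soon as a record from a later window shows up. A record that was delayed in transit is then simply lost, and the count for its window comes out too low.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Toy model (not Flink code): 5-second tumbling windows that are closed as soon as
 * a record belonging to a later window arrives, i.e. with no tolerance for disorder.
 */
class OutOfOrderDemo {
    static final long WINDOW_SIZE_MS = 5_000L;

    /** Counts records per window start, dropping records whose window was already closed. */
    static Map<Long, Integer> countWithoutWaiting(long[] eventTimesMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        long latestWindowStart = Long.MIN_VALUE;
        for (long ts : eventTimesMs) {
            long windowStart = ts - Math.floorMod(ts, WINDOW_SIZE_MS);
            if (windowStart < latestWindowStart) {
                continue; // its window is already closed: the out-of-order record is lost
            }
            latestWindowStart = windowStart;
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Event times in arrival order; 4_000 arrives after 6_000 although it happened earlier
        long[] arrivals = {1_000L, 2_000L, 6_000L, 4_000L, 7_000L};
        System.out.println(countWithoutWaiting(arrivals)); // {0=2, 5000=2}
    }
}
```

The record with event time 4000 belongs to window [0, 5000), so the correct count for that window is 3, but the model reports 2. Waiting a little before closing a window is exactly what a watermark delay provides.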
Watermark
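A watermark is a mechanism for measuring the progress of event time, and it can be given a delay so that triggering waits for records that arrive late. Watermarks are used to handle out-of-order events and are usually combined with windows: an event-time window is triggered by the watermark.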
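The trigger condition fits in a few lines. This is a simplified model (not Flink code) of what I understand Flink's `EventTimeTrigger` to do for tumbling windows with offset 0: a window [start, end) fires once the watermark reaches end - 1, its last possible timestamp.

```java
/**
 * Simplified model (not Flink code) of when an event-time tumbling window fires:
 * a window [start, end) fires once the watermark reaches its last timestamp, end - 1.
 */
class WindowFiringDemo {
    /** Start of the tumbling window (offset 0) that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** True once the watermark has reached the window's maximum timestamp. */
    static boolean shouldFire(long windowStartMs, long sizeMs, long watermarkMs) {
        long maxTimestamp = windowStartMs + sizeMs - 1;
        return watermarkMs >= maxTimestamp;
    }

    public static void main(String[] args) {
        long size = 5_000L;
        long start = windowStart(7_300L, size);              // 5000, i.e. window [5000, 10000)
        System.out.println(shouldFire(start, size, 9_998L)); // false
        System.out.println(shouldFire(start, size, 9_999L)); // true
    }
}
```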
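Characteristics of watermarks
- A watermark is a special record that flows through the data stream
- Watermarks must increase monotonically, so that a task's event-time clock moves forward and never backward
- A watermark is derived from the timestamps of the data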
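The monotonicity rule can be modeled as a small clock that ignores any watermark that does not move it forward. This is a toy model for illustration, not Flink's implementation.

```java
/**
 * Toy model (not Flink code) of the monotonicity rule: a candidate watermark is only
 * forwarded if it is larger than the last one, so the event-time clock never moves back.
 */
class MonotonicWatermarkDemo {
    private long currentWatermark = Long.MIN_VALUE;

    /** Returns true if the candidate advanced the clock and would be emitted. */
    boolean offer(long candidate) {
        if (candidate <= currentWatermark) {
            return false; // stale or equal watermark: ignored
        }
        currentWatermark = candidate;
        return true;
    }

    long current() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        MonotonicWatermarkDemo clock = new MonotonicWatermarkDemo();
        System.out.println(clock.offer(3_000L)); // true
        System.out.println(clock.offer(2_000L)); // false, it would move time backwards
        System.out.println(clock.current());     // 3000
    }
}
```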
Generating watermarks
- Flink uses a TimestampAssigner to extract the event-time field from each event.
- Flink uses a WatermarkGenerator to generate watermarks.
Example:
package com.justin.watermark;

import com.justin.model.SensorReading;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

/**
 * Usage with bounded out-of-orderness.
 * Start the test source first: nc -lk 7777
 * Each input line is "id, timestampInSeconds, temperature", e.g. "sensor_1, 1547718199, 35.8".
 */
public class BoundedOutOfOrdernessWatermarkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host", "192.168.10.7");
        int port = parameterTool.getInt("port", 7777);

        DataStream<String> inputStream = env.socketTextStream(host, port);
        DataStream<SensorReading> dataStream = inputStream
                .map(line -> {
                    String[] arr = line.split(", ");
                    return SensorReading.builder()
                            .id(arr[0])
                            .timestamp(Long.valueOf(arr[1]))
                            .temperature(Double.valueOf(arr[2]))
                            .build();
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // Bounded out-of-orderness: the argument is the maximum out-of-orderness to tolerate;
                        // the watermark trails the largest timestamp seen so far by this amount
                        .<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
                        // The input timestamps are in seconds; Flink expects epoch milliseconds
                        .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp() * 1000L)
                        // Mark the input idle after 3 s without data so it does not hold back the watermark
                        .withIdleness(Duration.ofSeconds(3L)));

        // Minimum temperature per sensor in 5-second event-time tumbling windows
        DataStream<SensorReading> resultStream = dataStream
                .keyBy(SensorReading::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5L)))
                .reduce((v1, v2) -> SensorReading.builder()
                        .id(v1.getId())
                        .timestamp(v2.getTimestamp())
                        .temperature(Math.min(v1.getTemperature(), v2.getTemperature()))
                        .build());

        resultStream.print();
        env.execute();
    }
}
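To predict when the demo above prints something, the arithmetic can be replayed in plain Java. This is a model, not Flink code. It assumes the watermark is the largest timestamp seen minus the delay minus 1 ms (my understanding of Flink's built-in bounded-out-of-orderness generator), a single parallel instance, and a periodic emit after every record. The input values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model (not Flink code) of the bounded-out-of-orderness demo: seconds become
 * milliseconds, the watermark is maxTimestamp - delay - 1, and a 5-second window
 * [start, start + 5000) fires once the watermark reaches start + 4999.
 */
class BoundedDelayArithmetic {
    static final long DELAY_MS = 2_000L;
    static final long WINDOW_MS = 5_000L;

    /** Returns, for each input second, the watermark that would be emitted after it. */
    static List<Long> watermarks(long[] eventSeconds) {
        List<Long> result = new ArrayList<>();
        // Initial value chosen so that maxTimestamp - DELAY_MS - 1 cannot underflow
        long maxTimestamp = Long.MIN_VALUE + DELAY_MS + 1;
        for (long seconds : eventSeconds) {
            long ts = seconds * 1000L; // what the demo's TimestampAssigner returns
            maxTimestamp = Math.max(maxTimestamp, ts);
            result.add(maxTimestamp - DELAY_MS - 1);
        }
        return result;
    }

    /** Smallest event time (in seconds) whose arrival makes the window containing `seconds` fire. */
    static long firingSecond(long seconds) {
        long ts = seconds * 1000L;
        long windowEnd = ts - Math.floorMod(ts, WINDOW_MS) + WINDOW_MS;
        // watermark >= windowEnd - 1  <=>  maxTimestamp >= windowEnd + DELAY_MS
        return (windowEnd + DELAY_MS) / 1000L;
    }

    public static void main(String[] args) {
        long[] input = {1547718199L, 1547718201L, 1547718198L, 1547718202L};
        System.out.println(watermarks(input));
        // [1547718196999, 1547718198999, 1547718198999, 1547718199999]
        System.out.println(firingSecond(1547718199L)); // 1547718202
    }
}
```

So the record at second 1547718199 lands in window [1547718195000, 1547718200000), and that window's result only appears once a record with a timestamp of at least 1547718202 arrives. The out-of-order record at 1547718198 still makes it into the window.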
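package com.justin.watermark;

import com.justin.model.SensorReading;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

/**
 * Usage with monotonously increasing timestamps.
 * Goes in its own file, MonotonousTimestampsWatermarkDemo.java, since a Java source file
 * may declare only one public top-level class.
 * Start the test source first: nc -lk 7777
 */
public class MonotonousTimestampsWatermarkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host", "192.168.10.7");
        int port = parameterTool.getInt("port", 7777);

        DataStream<String> inputStream = env.socketTextStream(host, port);
        DataStream<SensorReading> dataStream = inputStream
                .map(line -> {
                    String[] arr = line.split(", ");
                    return SensorReading.builder()
                            .id(arr[0])
                            .timestamp(Long.valueOf(arr[1]))
                            .temperature(Double.valueOf(arr[2]))
                            .build();
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // Monotonously increasing timestamps: no out-of-orderness is tolerated
                        .<SensorReading>forMonotonousTimestamps()
                        // The input timestamps are in seconds; Flink expects epoch milliseconds
                        .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp() * 1000L)
                        // Mark the input idle after 3 s without data so it does not hold back the watermark
                        .withIdleness(Duration.ofSeconds(3L)));

        // Minimum temperature per sensor in 5-second event-time tumbling windows
        DataStream<SensorReading> resultStream = dataStream
                .keyBy(SensorReading::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5L)))
                .reduce((v1, v2) -> SensorReading.builder()
                        .id(v1.getId())
                        .timestamp(v2.getTimestamp())
                        .temperature(Math.min(v1.getTemperature(), v2.getTemperature()))
                        .build());

        resultStream.print();
        env.execute();
    }
}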
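As far as I know, `forMonotonousTimestamps()` behaves like `forBoundedOutOfOrderness(Duration.ZERO)`: the watermark is just the largest timestamp minus 1 ms. The plain-Java model below (not Flink code; same single-instance, emit-after-every-record assumptions as before, made-up input) shows the practical difference. A record that arrives out of order is discarded with a zero delay but kept with a 2-second delay.

```java
/**
 * Plain-Java model (not Flink code): is the record at `index` late, i.e. has its 5-second
 * window already fired? delayMs = 0 models forMonotonousTimestamps, 2000 models
 * forBoundedOutOfOrderness(Duration.ofSeconds(2)). No allowed lateness is assumed.
 */
class MonotonousLatenessDemo {
    static final long WINDOW_MS = 5_000L;

    static boolean isLate(long[] eventSeconds, int index, long delayMs) {
        long maxTs = Long.MIN_VALUE / 2; // far below any real timestamp; avoids underflow below
        for (int i = 0; i < index; i++) {
            maxTs = Math.max(maxTs, eventSeconds[i] * 1000L);
        }
        long watermark = maxTs - delayMs - 1; // watermark in effect when the record arrives
        long ts = eventSeconds[index] * 1000L;
        long windowMaxTs = ts - Math.floorMod(ts, WINDOW_MS) + WINDOW_MS - 1;
        return windowMaxTs <= watermark;      // window already fired: the record is discarded
    }

    public static void main(String[] args) {
        long[] input = {1547718199L, 1547718201L, 1547718198L};
        System.out.println(isLate(input, 2, 0L));     // true
        System.out.println(isLate(input, 2, 2_000L)); // false
    }
}
```

Monotonous timestamps therefore only suit sources that really are in order, such as a single partition written in time order.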
TimestampAssigner extracts the event-time field from each record, as epoch milliseconds.
package org.apache.flink.api.common.eventtime;

import org.apache.flink.annotation.Public;

@Public
@FunctionalInterface
public interface TimestampAssigner<T> {

    /**
     * The value that is passed to {@link #extractTimestamp} when there is no previous timestamp
     * attached to the record.
     */
    long NO_TIMESTAMP = Long.MIN_VALUE;

    /**
     * Assigns a timestamp to an element, in milliseconds since the Epoch. This is independent of
     * any particular time zone or calendar.
     *
     * <p>The method is passed the previously assigned timestamp of the element. That previous
     * timestamp may have been assigned from a previous assigner. If the element did not carry a
     * timestamp before, this value is {@link #NO_TIMESTAMP} (= {@code Long.MIN_VALUE}: {@value
     * Long#MIN_VALUE}).
     *
     * @param element The element that the timestamp will be assigned to.
     * @param recordTimestamp The current internal timestamp of the element, or a negative value, if
     *     no timestamp has been assigned yet.
     * @return The new timestamp.
     */
    long extractTimestamp(T element, long recordTimestamp);
}
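Because TimestampAssigner is a functional interface, the demos can pass a lambda. For illustration, here is an explicit implementation for a hypothetical `Reading` record. It is a stand-in for the post's `SensorReading`, whose fields I only infer from the builder calls. To keep the sketch runnable without Flink on the classpath, the interface is copied locally with the same method signature; in a real job you would implement Flink's interface instead.

```java
/** Converts the record's epoch seconds into the epoch milliseconds Flink expects. */
class SecondsToMillisAssigner implements TimestampAssigner<Reading> {
    @Override
    public long extractTimestamp(Reading element, long recordTimestamp) {
        // recordTimestamp is ignored: the event time is taken from the payload itself
        return element.timestampSeconds() * 1000L;
    }

    public static void main(String[] args) {
        TimestampAssigner<Reading> assigner = new SecondsToMillisAssigner();
        Reading r = new Reading("sensor_1", 1547718199L, 35.8);
        System.out.println(assigner.extractTimestamp(r, TimestampAssigner.NO_TIMESTAMP)); // 1547718199000
    }
}

/** Local copy of the interface shown above, so this sketch compiles without Flink. */
@FunctionalInterface
interface TimestampAssigner<T> {
    long NO_TIMESTAMP = Long.MIN_VALUE;

    long extractTimestamp(T element, long recordTimestamp);
}

/** Hypothetical record standing in for com.justin.model.SensorReading. */
record Reading(String id, long timestampSeconds, double temperature) {}
```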
WatermarkGenerator generates the watermarks.
package org.apache.flink.api.common.eventtime;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;

/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or periodically (in a
 * fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the {@code
 * AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine and remember the event
     * timestamps, or to emit a watermark based on the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks are generated depends on {@link
     * ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}
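A custom generator only needs these two callbacks. Below is a sketch of a periodic generator with a fixed out-of-orderness bound, the same idea behind `forBoundedOutOfOrderness`. To run without Flink, `SimpleWatermarkOutput` and `SimpleWatermarkGenerator` are hypothetical simplified stand-ins. The real `WatermarkOutput.emitWatermark` takes a `Watermark` object, and the real interface also has `markIdle()`.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of a periodic generator: remembers the max timestamp, emits max - delay - 1. */
class BoundedGeneratorSketch implements SimpleWatermarkGenerator<String> {
    private final long delayMs;
    private long maxTimestamp;

    BoundedGeneratorSketch(long delayMs) {
        this.delayMs = delayMs;
        // Start low enough that maxTimestamp - delayMs - 1 cannot underflow
        this.maxTimestamp = Long.MIN_VALUE + delayMs + 1;
    }

    @Override
    public void onEvent(String event, long eventTimestamp, SimpleWatermarkOutput output) {
        // Only remember the largest timestamp; nothing is emitted per event
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(SimpleWatermarkOutput output) {
        // Assume everything up to (max - delay - 1) has arrived
        output.emitWatermark(maxTimestamp - delayMs - 1);
    }

    public static void main(String[] args) {
        List<Long> emitted = new ArrayList<>();
        SimpleWatermarkOutput out = emitted::add;
        BoundedGeneratorSketch gen = new BoundedGeneratorSketch(2_000L);
        gen.onEvent("a", 10_000L, out);
        gen.onEvent("b", 8_000L, out); // out of order: max stays 10_000
        gen.onPeriodicEmit(out);
        System.out.println(emitted);   // [7999]
    }
}

/** Hypothetical simplified stand-in for Flink's WatermarkOutput. */
@FunctionalInterface
interface SimpleWatermarkOutput {
    void emitWatermark(long watermark);
}

/** Hypothetical simplified stand-in with the same two callbacks as WatermarkGenerator. */
interface SimpleWatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, SimpleWatermarkOutput output);

    void onPeriodicEmit(SimpleWatermarkOutput output);
}
```

A punctuated generator would do the opposite. It calls `emitWatermark` inside `onEvent`, for example when it sees a special marker record, and leaves `onPeriodicEmit` empty.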
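Choosing the watermark delay
- If the watermark delay is too long, results arrive slowly; one workaround is to emit an approximate result before the watermark arrives.
- If the watermark arrives too early, results may be wrong, because records that show up after it are treated as late; Flink's late-data handling, such as allowedLateness(...) and sideOutputLateData(...) on a windowed stream, addresses this.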
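The second point can be made concrete with a plain-Java model (not Flink code) of what happens to a record, depending on the current watermark and an allowed lateness. It follows my understanding of Flink's event-time windows with `allowedLateness(...)`. The window fires when the watermark reaches its last timestamp. A late record that arrives while the window is still within its allowed lateness makes it fire again with an updated result. Anything later is dropped, or goes to `sideOutputLateData(...)` if configured. The window size and lateness values are made up.

```java
/**
 * Plain-Java model (not Flink code) of a record's fate in a tumbling event-time window
 * (offset 0) given the current watermark and an allowed lateness.
 */
class LatenessModel {
    enum Fate { ON_TIME, LATE_REFIRES_WINDOW, TOO_LATE }

    static Fate classify(long eventTs, long watermark, long windowSize, long allowedLateness) {
        long windowMaxTs = eventTs - Math.floorMod(eventTs, windowSize) + windowSize - 1;
        if (windowMaxTs + allowedLateness <= watermark) {
            return Fate.TOO_LATE;            // window state already cleaned up: dropped or side output
        }
        if (windowMaxTs <= watermark) {
            return Fate.LATE_REFIRES_WINDOW; // window already fired; it fires again with the update
        }
        return Fate.ON_TIME;                 // window has not fired yet
    }

    public static void main(String[] args) {
        long size = 5_000L;
        long lateness = 1_000L;
        // A record at 4_000 belongs to window [0, 5000), whose last timestamp is 4_999
        System.out.println(classify(4_000L, 4_500L, size, lateness)); // ON_TIME
        System.out.println(classify(4_000L, 5_200L, size, lateness)); // LATE_REFIRES_WINDOW
        System.out.println(classify(4_000L, 6_000L, size, lateness)); // TOO_LATE
    }
}
```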