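Window Overview
- A window converts an unbounded stream into bounded chunks: it splits the stream into finite-sized buckets, and computations are applied to each bucket.
- Windows can be opened per key (on a keyed stream, created with keyBy(...)) or over all of the data without a key (on a non-keyed stream).
- On a keyed stream, the windowed computation can run in multiple parallel tasks; all elements with the same key are sent to the same parallel task.
- On a non-keyed stream, the original stream is not split, so all window logic is executed by a single task, i.e. with a parallelism of 1.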
// Keyed Windows
stream
    .keyBy(...)                   <-  keyed versus non-keyed windows
    .window(...)                  <-  required: "assigner"
   [.trigger(...)]                <-  optional: "trigger" (else default trigger)
   [.evictor(...)]                <-  optional: "evictor" (else no evictor)
   [.allowedLateness(...)]        <-  optional: "lateness" (else zero)
   [.sideOutputLateData(...)]     <-  optional: "output tag" (else no side output for late data)
    .reduce/aggregate/apply()     <-  required: "function"
   [.getSideOutput(...)]          <-  optional: "output tag"

// Non-Keyed Windows
stream
    .windowAll(...)               <-  required: "assigner"
   [.trigger(...)]                <-  optional: "trigger" (else default trigger)
   [.evictor(...)]                <-  optional: "evictor" (else no evictor)
   [.allowedLateness(...)]        <-  optional: "lateness" (else zero)
   [.sideOutputLateData(...)]     <-  optional: "output tag" (else no side output for late data)
    .reduce/aggregate/apply()     <-  required: "function"
   [.getSideOutput(...)]          <-  optional: "output tag"
Window Assigners
- A window assigner is responsible for assigning each incoming element to one or more windows.
- Flink ships with assigners for tumbling windows, sliding windows, session windows, and global windows. You can also implement a custom assigner by extending the class org.apache.flink.streaming.api.windowing.assigners.WindowAssigner.
Tumbling Windows
A tumbling window assigner splits the data by a fixed window length. Windows are aligned in time, have a fixed size, and do not overlap.
Usage example:
val input: DataStream[T] = ...

// tumbling event-time windows of 5 seconds
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// tumbling processing-time windows of 5 seconds
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// daily (24-hour) tumbling event-time windows offset by -8 hours;
// the second argument is usually only needed to align windows with a time zone other than UTC
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)
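To make the effect of the window size and the offset concrete, here is a minimal plain-Scala sketch of how a timestamp can be mapped to the start of its tumbling window. It has no Flink dependency; the object and method names (TumblingWindowSketch, windowStart) are hypothetical, and the arithmetic is modeled on the usual "timestamp minus remainder" rule rather than copied from Flink.

```scala
object TumblingWindowSketch {
  // Start (in ms) of the tumbling window that contains `timestamp`.
  // `offset` shifts the window boundaries; `size` is the window length.
  def windowStart(timestamp: Long, offset: Long, size: Long): Long = {
    val remainder = (timestamp - offset) % size
    // On the JVM, % keeps the sign of the dividend, so negative remainders are adjusted.
    if (remainder < 0) timestamp - (remainder + size) else timestamp - remainder
  }

  def main(args: Array[String]): Unit = {
    val hour = 60L * 60 * 1000
    val day = 24 * hour
    // 5-second windows without offset: 12345 ms falls into [10000, 15000)
    println(windowStart(12345L, 0L, 5000L)) // 10000
    // Daily windows offset by -8 hours: an event at 00:30 UTC on day 1 falls into
    // the window starting at 16:00 UTC on day 0, which is 00:00 in UTC+8
    println(windowStart(day + hour / 2, -8 * hour, day)) // 57600000
  }
}
```

With an offset of -8 hours, daily windows start at 16:00 UTC, which is midnight in UTC+8. That is why the offset is typically only set when time zones matter.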
Sliding Windows
A sliding window is a generalization of the tumbling window: it is defined by a fixed window length and a slide interval. The window length is fixed, and windows overlap whenever the slide is smaller than the window length.
Usage example:
val input: DataStream[T] = ...

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)
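Because sliding windows overlap, a single element can belong to several windows. The following plain-Scala sketch (no Flink dependency; SlidingWindowSketch and assignWindows are hypothetical names, and the logic is a simplified model) lists every [start, end) window that contains a given timestamp.

```scala
object SlidingWindowSketch {
  // Start of the latest slide-aligned window that contains `timestamp`.
  private def lastStart(timestamp: Long, offset: Long, slide: Long): Long = {
    val remainder = (timestamp - offset) % slide
    if (remainder < 0) timestamp - (remainder + slide) else timestamp - remainder
  }

  // All [start, end) windows of length `size` that contain `timestamp`,
  // walking backwards from the latest window in steps of `slide`.
  def assignWindows(timestamp: Long, size: Long, slide: Long, offset: Long = 0L): List[(Long, Long)] =
    Iterator
      .iterate(lastStart(timestamp, offset, slide))(_ - slide)
      .takeWhile(_ > timestamp - size)
      .map(start => (start, start + size))
      .toList

  def main(args: Array[String]): Unit = {
    // 10-second windows sliding every 5 seconds: the element lands in two windows
    println(assignWindows(12000L, 10000L, 5000L)) // List((10000,20000), (5000,15000))
  }
}
```

When the slide equals the window length, the function returns exactly one window, which is the tumbling case.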
Session Windows
A session window groups a series of events delimited by a gap of inactivity (a timeout) of a given length: when no new element arrives for that long, the current session closes and the next element starts a new window. Unlike tumbling and sliding windows, session windows are not aligned in time and have no fixed start or end.
Usage example:
val input: DataStream[T] = ...

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)
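The way a static gap splits a stream into sessions can be sketched in plain Scala. This is an illustrative model only, with no Flink dependency: each element opens a window [ts, ts + gap), and windows that overlap or touch are merged. The names SessionWindowSketch and sessions are hypothetical, and treating touching windows as mergeable is an assumption of this sketch.

```scala
object SessionWindowSketch {
  // Groups timestamps (ms) into merged [start, end) session windows.
  def sessions(timestamps: Seq[Long], gap: Long): List[(Long, Long)] =
    timestamps.sorted.foldLeft(List.empty[(Long, Long)]) {
      // The element arrives before the current session has timed out: extend it.
      case ((start, end) :: rest, ts) if ts <= end =>
        (start, math.max(end, ts + gap)) :: rest
      // Otherwise the gap has elapsed: start a new session.
      case (acc, ts) =>
        (ts, ts + gap) :: acc
    }.reverse

  def main(args: Array[String]): Unit = {
    // With a 5-second gap, 1000 and 3000 share a session; 20000 starts a new one
    println(sessions(Seq(1000L, 3000L, 20000L), 5000L)) // List((1000,8000), (20000,25000))
  }
}
```

The resulting windows have different lengths and arbitrary start times, which is what "not aligned" means for session windows.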
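Global Windows
A global window assigner does not partition the data by time: all elements (with the same key, on a keyed stream) are assigned to the same single window. Because a global window never ends on its own, it only produces results when combined with a custom trigger.
Usage example:
val input: DataStream[T] = ...

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>)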
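Window Functions
A window function defines the computation performed on the contents of each window once the elements have been assigned to it.
ReduceFunction
A basic incremental aggregation function: it combines two input elements of the same type into one output element of that type.
// Sums the second field of the tuples in each window
val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }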
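As a rough illustration of what the reduce lambda above does to the contents of one window, the same function can be applied to an ordinary Scala collection. This sketch has no Flink dependency, and the names ReduceSketch, sum and windowResult are hypothetical.

```scala
object ReduceSketch {
  // Same logic as the reduce lambda above: keep the key, add up the values.
  def sum(v1: (String, Long), v2: (String, Long)): (String, Long) =
    (v1._1, v1._2 + v2._2)

  // Combines the elements pairwise, so only one running value is needed at a time.
  // An empty window yields None.
  def windowResult(elements: Seq[(String, Long)]): Option[(String, Long)] =
    elements.reduceOption((a, b) => sum(a, b))

  def main(args: Array[String]): Unit = {
    val window = Seq(("sensor_1", 3L), ("sensor_1", 5L), ("sensor_1", 7L))
    println(windowResult(window)) // Some((sensor_1,15))
  }
}
```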
AggregateFunction
AggregateFunction is a generalized version of ReduceFunction and is also an incremental aggregation function. It has three type parameters: the input type (IN), the accumulator type (ACC), and the output type (OUT).
import org.apache.flink.api.common.functions.AggregateFunction

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)

  // Convert to Double before dividing; Long / Long would truncate the average
  override def getResult(accumulator: (Long, Long)): Double =
    accumulator._1.toDouble / accumulator._2

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate)
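The roles of the accumulator, add, merge and getResult can be tried out without a Flink cluster. The sketch below mirrors the four methods as plain Scala functions; AverageSketch and aggregate are hypothetical names, and the sum is converted to Double before dividing because dividing two Long values would truncate the average.

```scala
object AverageSketch {
  type Acc = (Long, Long) // (running sum, element count)

  def createAccumulator(): Acc = (0L, 0L)

  // Fold one element into the accumulator.
  def add(value: (String, Long), acc: Acc): Acc = (acc._1 + value._2, acc._2 + 1L)

  // Combine two partial accumulators, e.g. when two windows are merged.
  def merge(a: Acc, b: Acc): Acc = (a._1 + b._1, a._2 + b._2)

  // Convert to Double first; Long / Long would drop the fractional part.
  def getResult(acc: Acc): Double = acc._1.toDouble / acc._2

  // Aggregates a whole window by folding `add` over its elements.
  def aggregate(elements: Seq[(String, Long)]): Acc =
    elements.foldLeft(createAccumulator())((acc, v) => add(v, acc))

  def main(args: Array[String]): Unit = {
    val acc = aggregate(Seq(("a", 3L), ("a", 4L)))
    println(getResult(acc)) // 3.5
  }
}
```

Merging two partial accumulators gives the same result as aggregating all elements in one pass, which is the property merge is expected to satisfy.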
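ProcessWindowFunction
A ProcessWindowFunction receives an Iterable containing all the elements of the window. This flexibility comes at the cost of performance and resource consumption, because the elements cannot be aggregated incrementally and must be buffered until the window is ready to be processed.
The ProcessWindowFunction class is defined as follows: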
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {

  /**
    * Evaluates the window and outputs none or several elements.
    *
    * @param key      The key for which this window is evaluated.
    * @param context  The context in which the window is being evaluated.
    * @param elements The elements in the window being evaluated.
    * @param out      A collector for emitting elements.
    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    */
  def process(
      key: KEY,
      context: Context,
      elements: Iterable[IN],
      out: Collector[OUT])

  /**
    * The context holding window metadata
    */
  abstract class Context {
    /**
      * Returns the window that is being evaluated.
      */
    def window: W

    /**
      * Returns the current processing time.
      */
    def currentProcessingTime: Long

    /**
      * Returns the current event-time watermark.
      */
    def currentWatermark: Long

    /**
      * State accessor for per-key and per-window state.
      */
    def windowState: KeyedStateStore

    /**
      * State accessor for per-key global state.
      */
    def globalState: KeyedStateStore
  }
}
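The key parameter is the key extracted by the key selector passed to keyBy().
The following ProcessWindowFunction example counts the elements in each window.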
val input: DataStream[(String, Long)] = ...

input
    .keyBy(_._1)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .process(new MyProcessWindowFunction())

/* ... */

class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

  override def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
    // Count the buffered elements of this window
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}
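ProcessWindowFunction with Incremental Aggregation
A ProcessWindowFunction can be combined with an incremental aggregation function (ReduceFunction or AggregateFunction): elements are aggregated incrementally as they arrive, and when the window fires, the ProcessWindowFunction receives only the aggregated result, together with the window metadata available through its Context.
Incremental Window Aggregation with ReduceFunction
Example: compute the minimum temperature within each window.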
package com.wy.wc.apitest

case class SensorReading(
  id: String,
  timestamp: Long,
  temperature: Double
)

// The job below lives in a separate source file
package com.wy.wc.apitest.window

import com.wy.wc.apitest.SensorReading
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.time.Duration

object ProcessWindowFunctionAndReduceFunctionTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read host and port from the command line, falling back to defaults
    val parameterTool = ParameterTool.fromArgs(args)
    val host = parameterTool.get("host", "192.168.10.7")
    val port = parameterTool.getInt("port", 7777)

    val inputStream = env.socketTextStream(host, port)

    // Each input line is parsed as "id, timestamp, temperature"
    val dataStream = inputStream.map(i => {
      val arr = i.split(", ")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    }).assignTimestampsAndWatermarks(WatermarkStrategy
      .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(2L))
      .withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
        // Event timestamps must be in milliseconds
        override def extractTimestamp(element: SensorReading, recordTimestamp: Long): Long = element.timestamp * 1000L
      })
      .withIdleness(Duration.ofSeconds(3L)))

    val resultStream = dataStream
      .keyBy(_.id)
      .window(TumblingEventTimeWindows.of(Time.seconds(5L)))
      .reduce(new MyReduceFunction(), new MyProcessWindowFunction())

    resultStream.print("minTemp")

    env.execute()
  }

  // Incrementally keeps the reading with the lower temperature
  class MyReduceFunction extends ReduceFunction[SensorReading] {
    override def reduce(value1: SensorReading, value2: SensorReading): SensorReading = {
      if (value1.temperature > value2.temperature) value2 else value1
    }
  }

  // Receives only the reduced result and pairs it with the window start time
  class MyProcessWindowFunction extends ProcessWindowFunction[SensorReading, (Long, SensorReading), String, TimeWindow] {
    override def process(key: String,
                         context: Context,
                         elements: Iterable[SensorReading],
                         out: Collector[(Long, SensorReading)]): Unit = {
      out.collect((context.window.getStart, elements.iterator.next()))
    }
  }
}
Incremental Window Aggregation with AggregateFunction
Example: compute the average temperature within each window.
package com.wy.wc.apitest

case class SensorReading(
  id: String,
  timestamp: Long,
  temperature: Double
)

// The job below lives in a separate source file
package com.wy.wc.apitest.window

import com.wy.wc.apitest.SensorReading
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.time.Duration

object ProcessWindowFunctionAndAggregateFunctionTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read host and port from the command line, falling back to defaults
    val parameterTool = ParameterTool.fromArgs(args)
    val host = parameterTool.get("host", "192.168.10.7")
    val port = parameterTool.getInt("port", 7777)

    val inputStream = env.socketTextStream(host, port)

    // Each input line is parsed as "id, timestamp, temperature"
    val dataStream = inputStream.map(i => {
      val arr = i.split(", ")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    }).assignTimestampsAndWatermarks(WatermarkStrategy
      .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(2L))
      .withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
        // Event timestamps must be in milliseconds
        override def extractTimestamp(element: SensorReading, recordTimestamp: Long): Long = element.timestamp * 1000L
      })
      .withIdleness(Duration.ofSeconds(3L)))

    val resultStream = dataStream
      .keyBy(_.id)
      .window(TumblingEventTimeWindows.of(Time.seconds(5L)))
      .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())

    resultStream.print("avgTemp")

    env.execute()
  }

  // Accumulator: (sum of temperatures, number of readings)
  class MyAggregateFunction extends AggregateFunction[SensorReading, (Double, Long), Double] {
    override def createAccumulator(): (Double, Long) = (0D, 0L)

    override def add(value: SensorReading, accumulator: (Double, Long)): (Double, Long) = {
      (accumulator._1 + value.temperature, accumulator._2 + 1)
    }

    override def getResult(accumulator: (Double, Long)): Double = {
      // Guard against division by zero for an empty accumulator
      if (accumulator._2 != 0) {
        accumulator._1 / accumulator._2
      } else {
        0D
      }
    }

    override def merge(a: (Double, Long), b: (Double, Long)): (Double, Long) = {
      (a._1 + b._1, a._2 + b._2)
    }
  }

  // Receives only the aggregated average and pairs it with the window start time
  class MyProcessWindowFunction extends ProcessWindowFunction[Double, (Long, Double), String, TimeWindow] {
    override def process(key: String,
                         context: Context,
                         elements: Iterable[Double],
                         out: Collector[(Long, Double)]): Unit = {
      out.collect((context.window.getStart, elements.iterator.next()))
    }
  }
}
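Triggers
A trigger determines when a window is ready to be processed by the window function, i.e. when the computation fires and the result is emitted.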
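As an illustration of a firing condition, the sketch below models an event-time rule in plain Scala: a window fires once the watermark has reached the window's last timestamp (end - 1). This is a simplified model under that assumption, not Flink's trigger API; EventTimeTriggerSketch, Window and shouldFire are hypothetical names.

```scala
object EventTimeTriggerSketch {
  // A [start, end) time window in milliseconds.
  final case class Window(start: Long, end: Long) {
    // The largest timestamp that still belongs to this window.
    def maxTimestamp: Long = end - 1
  }

  // The window fires once the watermark has reached its last timestamp.
  def shouldFire(window: Window, watermark: Long): Boolean =
    watermark >= window.maxTimestamp

  def main(args: Array[String]): Unit = {
    val w = Window(0L, 5000L)
    println(shouldFire(w, 4998L)) // false
    println(shouldFire(w, 4999L)) // true
  }
}
```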
Evictors
An evictor defines the logic for removing certain elements from a window.
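One possible eviction policy is count-based: keep only the most recent N elements of the window and drop the rest. The plain-Scala sketch below illustrates that policy on an ordinary collection; the policy choice and the names CountEvictorSketch and evict are assumptions made for illustration, not taken from the text above.

```scala
object CountEvictorSketch {
  // Keeps at most `maxCount` elements, evicting from the beginning of the window.
  def evict[T](elements: Seq[T], maxCount: Int): Seq[T] =
    elements.takeRight(maxCount)

  def main(args: Array[String]): Unit = {
    println(evict(Seq(1, 2, 3, 4, 5), 3)) // List(3, 4, 5)
  }
}
```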
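Allowed Lateness
Allowed lateness specifies how long after the watermark has passed the end of a window late elements are still accepted into it. Combined with .sideOutputLateData(), elements that arrive even later are not dropped but emitted to a side output.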
val lateOutputTag = OutputTag[T]("late-data")

val input: DataStream[T] = ...

val result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>)

val lateStream = result.getSideOutput(lateOutputTag)
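Whether an element still makes it into its window can be expressed as a small check. The plain-Scala sketch below assumes event time and models the rule that a window is kept until the watermark reaches its last timestamp plus the allowed lateness; after that, elements for this window are considered late. LatenessSketch and isWindowLate are hypothetical names, and the rule is a simplified model rather than Flink's implementation.

```scala
object LatenessSketch {
  // True if elements for the window ending at `windowEnd` (exclusive, ms) are too late,
  // i.e. the watermark has passed the window's last timestamp plus the allowed lateness.
  def isWindowLate(windowEnd: Long, allowedLateness: Long, watermark: Long): Boolean =
    (windowEnd - 1) + allowedLateness <= watermark

  def main(args: Array[String]): Unit = {
    // Window [0, 5000) with 2 seconds of allowed lateness
    println(isWindowLate(5000L, 2000L, 6998L)) // false: still accepted into the window
    println(isWindowLate(5000L, 2000L, 6999L)) // true: dropped or sent to the side output
  }
}
```

With an allowed lateness of zero, the check reduces to comparing the watermark with the window's last timestamp.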