IV. Flink Windows

Window Overview

  • A window is, in essence, a way of turning an unbounded stream into bounded chunks: it splits the stream data into buckets of a fixed, finite size over which the analysis is performed.

  • Windows can be opened per specified key (the resulting stream is called a keyed stream), or over all the data without a key (a non-keyed stream).

  • For keyed streams, the windowed computation can be executed by multiple tasks in parallel; all elements with the same key are sent to the same parallel task.

  • For non-keyed streams, the original stream is not split: all window logic is executed by a single task, i.e. with a parallelism of 1.

// Keyed Windows
stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"


// Non-Keyed Windows
stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

Window Assigners

  • A window assigner is responsible for assigning each incoming element to one or more windows.

  • The built-in assigners are tumbling windows, sliding windows, session windows, and global windows; you can also extend the class org.apache.flink.streaming.api.windowing.assigners.WindowAssigner to implement a custom window assigner, as in the sketch below.
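
For illustration only, here is a minimal sketch of a custom assigner (assuming the Flink 1.x WindowAssigner API, where getDefaultTrigger still takes a StreamExecutionEnvironment argument); it behaves like a 30-second tumbling event-time window, and the class name is made up:

import java.util
import java.util.Collections

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.triggers.{EventTimeTrigger, Trigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Assigns every element to a fixed-size, non-overlapping 30-second event-time window.
class ThirtySecondTumblingAssigner extends WindowAssigner[Object, TimeWindow] {

  private val size = 30 * 1000L // window length in milliseconds

  override def assignWindows(element: Object,
                             timestamp: Long,
                             context: WindowAssigner.WindowAssignerContext): util.Collection[TimeWindow] = {
    // align the window start to a multiple of the window size (simplified: assumes a valid timestamp)
    val start = timestamp - (timestamp % size)
    Collections.singletonList(new TimeWindow(start, start + size))
  }

  // fire when the watermark passes the end of the window
  override def getDefaultTrigger(env: StreamExecutionEnvironment): Trigger[Object, TimeWindow] =
    EventTimeTrigger.create()

  override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] =
    new TimeWindow.Serializer()

  override def isEventTime(): Boolean = true
}

It is then used like the built-in assigners: .keyBy(<key selector>).window(new ThirtySecondTumblingAssigner).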

Tumbling Windows

Data is sliced into windows of a fixed length: the windows are aligned in time, have a fixed size, and do not overlap.

Usage example:

val input: DataStream[T] = ...

// tumbling event-time windows
// a 5-second event-time tumbling window
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// tumbling processing-time windows
// a 5-second processing-time tumbling window
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// daily tumbling event-time windows offset by -8 hours.
// a daily (24-hour) event-time tumbling window shifted by -8 hours; the second argument is usually only needed to adjust for time zones
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)
Sliding Windows

A sliding window is a more general form of the tumbling window: it is defined by a fixed window size plus a slide interval. The window length is fixed, and windows may overlap.

Usage example:

val input: DataStream[T] = ...

// sliding event-time windows
// event-time sliding window
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows
// processing-time sliding window
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows offset by -8 hours
// processing-time sliding window shifted by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)
Session Windows

A session window groups a series of events separated by a gap of inactivity of a configured length: when no new data arrives for the gap duration, the current window closes and the next element starts a new one. Unlike tumbling and sliding windows, session windows are not aligned in time.

Usage example:

val input: DataStream[T] = ...

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)
Global Windows

There is no time-based windowing: all elements with the same key are assigned to one single global window. This scheme is only useful together with a custom trigger, because the default trigger of a global window never fires (see Triggers below).

Usage example:

val input: DataStream[T] = ...

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>)

Window Functions

A window function defines the computation that is applied to the contents of a window once the window has been created.

ReduceFunction

The most basic aggregation function: two input elements of the same type are combined into a single output element of that same type.

// sums up the second field of the tuples in each window
val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
AggregateFunction

AggregateFunction is a more general version of ReduceFunction and is also an incremental aggregation function. An AggregateFunction is defined over three types: an input type (IN), an accumulator type (ACC), and an output type (OUT).

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)

  // convert to Double before dividing so the average keeps its fractional part
  override def getResult(accumulator: (Long, Long)) = accumulator._1.toDouble / accumulator._2

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate)
ProcessWindowFunction

A ProcessWindowFunction gets an Iterable over all the elements of the window. This flexibility comes at the cost of performance and resource consumption, because all elements of the window have to be buffered before the function can be invoked.

The ProcessWindowFunction class is defined as follows:

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {

  /**
    * Evaluates the window and outputs none or several elements.
    *
    * @param key      The key for which this window is evaluated.
    * @param context  The context in which the window is being evaluated.
    * @param elements The elements in the window being evaluated.
    * @param out      A collector for emitting elements.
    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    */
  def process(
      key: KEY,
      context: Context,
      elements: Iterable[IN],
      out: Collector[OUT])

  /**
    * The context holding window metadata
    */
  abstract class Context {
    /**
      * Returns the window that is being evaluated.
      */
    def window: W

    /**
      * Returns the current processing time.
      */
    def currentProcessingTime: Long

    /**
      * Returns the current event-time watermark.
      */
    def currentWatermark: Long

    /**
      * State accessor for per-key and per-window state.
      */
    def windowState: KeyedStateStore

    /**
      * State accessor for per-key global state.
      */
    def globalState: KeyedStateStore
  }

}

The key parameter is the key extracted for this window by the key selector specified in keyBy().

Example: the following ProcessWindowFunction counts the elements in each window.

val input: DataStream[(String, Long)] = ...

input
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new MyProcessWindowFunction())

/* ... */

class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

  def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}
ProcessWindowFunction with Incremental Aggregation

A ProcessWindowFunction can be combined with an incremental aggregation function (a ReduceFunction or an AggregateFunction): elements are aggregated incrementally as they arrive, and when the window fires the ProcessWindowFunction receives the single pre-aggregated result.

Incremental Window Aggregation with ReduceFunction

Example: compute the minimum temperature in each window and emit it together with the window start time.

package com.wy.wc.apitest

case class SensorReading(
                          id: String,
                          timestamp: Long,
                          temperature: Double
                        )
package com.wy.wc.apitest.window

import com.wy.wc.apitest.SensorReading
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.time.Duration

object ProcessWindowFunctionAndReduceFunctionTest {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        env.setParallelism(1)

        val parameterTool = ParameterTool.fromArgs(args)
        val host = parameterTool.get("host", "192.168.10.7")
        val port = parameterTool.getInt("port", 7777)
        val inputStream = env.socketTextStream(host, port)

        val dataStream = inputStream.map(i => {
            val arr = i.split(", ")
            SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        }).assignTimestampsAndWatermarks(WatermarkStrategy
          .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(2L))
          .withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
              override def extractTimestamp(element: SensorReading, recordTimestamp: Long): Long = element.timestamp * 1000L
          })
          .withIdleness(Duration.ofSeconds(3L)))

        val resultStream = dataStream
          .keyBy(_.id)
          .window(TumblingEventTimeWindows.of(Time.seconds(5L)))
          .reduce(new MyReduceFunction(), new MyProcessWindowFunction())

        resultStream.print("minTemp")

        env.execute()
    }

    class MyReduceFunction extends ReduceFunction[SensorReading] {
        override def reduce(value1: SensorReading, value2: SensorReading): SensorReading = {
            if (value1.temperature > value2.temperature) value2 else value1
        }
    }

    class MyProcessWindowFunction extends ProcessWindowFunction[SensorReading, (Long, SensorReading), String, TimeWindow] {
        def process(key: String,
                    context: Context,
                    elements: Iterable[SensorReading],
                    out: Collector[(Long, SensorReading)]): Unit = {
            out.collect((context.window.getStart, elements.iterator.next()))
        }
    }

}

Incremental Window Aggregation with AggregateFunction

Example: compute the average temperature in each window and emit it together with the window start time.

package com.wy.wc.apitest

case class SensorReading(
                          id: String,
                          timestamp: Long,
                          temperature: Double
                        )
package com.wy.wc.apitest.window

import com.wy.wc.apitest.SensorReading
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.time.Duration

object ProcessWindowFunctionAndAggregateFunctionTest {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        env.setParallelism(1)

        val parameterTool = ParameterTool.fromArgs(args)
        val host = parameterTool.get("host", "192.168.10.7")
        val port = parameterTool.getInt("port", 7777)
        val inputStream = env.socketTextStream(host, port)

        val dataStream = inputStream.map(i => {
            val arr = i.split(", ")
            SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        }).assignTimestampsAndWatermarks(WatermarkStrategy
          .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(2L))
          .withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
              override def extractTimestamp(element: SensorReading, recordTimestamp: Long): Long = element.timestamp * 1000L
          })
          .withIdleness(Duration.ofSeconds(3L)))

        val resultStream = dataStream
          .keyBy(_.id)
          .window(TumblingEventTimeWindows.of(Time.seconds(5L)))
          .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())

        resultStream.print("avgTemp")

        env.execute()
    }

    class MyAggregateFunction extends AggregateFunction[SensorReading, (Double, Long), Double] {
        override def createAccumulator(): (Double, Long) = (0D, 0L)

        override def add(value: SensorReading, accumulator: (Double, Long)): (Double, Long) = {
            (accumulator._1 + value.temperature, accumulator._2 + 1)
        }

        override def getResult(accumulator: (Double, Long)): Double = {
            if (accumulator._2 != 0) {
                accumulator._1 / accumulator._2
            } else {
                0
            }
        }

        override def merge(a: (Double, Long), b: (Double, Long)): (Double, Long) = {
            (a._1 + b._1, a._2 + b._2)
        }
    }

    class MyProcessWindowFunction extends ProcessWindowFunction[Double, (Long, Double), String, TimeWindow] {
        def process(key: String,
                    context: Context,
                    elements: Iterable[Double],
                    out: Collector[(Long, Double)]): Unit = {
            out.collect((context.window.getStart, elements.iterator.next()))
        }
    }

}

Triggers

A trigger defines when a window fires, i.e. when the window computation is triggered and results are emitted; every assigner comes with a default trigger that can be replaced via .trigger(...).
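
The original text gives no trigger example, so here is a hedged sketch (input data and names are assumptions): it pairs a global window with the built-in CountTrigger, wrapped in a PurgingTrigger so the window contents are cleared after each firing, and emits the maximum of every two readings per key.

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

object CountTriggerExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // hypothetical input: (sensorId, temperature) pairs
    val readings = env.fromElements(
      ("sensor_1", 35.0), ("sensor_1", 36.5),
      ("sensor_2", 20.1), ("sensor_1", 37.2), ("sensor_2", 21.3))

    readings
      .keyBy(_._1)
      .window(GlobalWindows.create())
      // fire once 2 elements have been collected for a key, then purge the window contents
      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](2)))
      .reduce((a, b) => (a._1, math.max(a._2, b._2)))
      .print()

    env.execute("count-trigger-sketch")
  }
}

This combination mirrors how count-based windows are commonly built on top of global windows.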

Evictors

An evictor defines logic for removing certain elements from the window before (and/or after) the window function is applied; it is set via .evictor(...).
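
As a hedged sketch (the socket source, host, port, and names are assumptions, not from the original text), the snippet below keeps only the 10 most recent elements of each 5-second processing-time window before the reduce function runs, using the built-in CountEvictor:

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object CountEvictorExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // hypothetical input: lines such as "sensor_1,35.2" read from a socket
    val readings = env
      .socketTextStream("localhost", 7777)
      .map { line =>
        val arr = line.split(",")
        (arr(0), arr(1).trim.toDouble)
      }

    readings
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      // before the window function runs, evict everything except the last 10 elements
      .evictor(CountEvictor.of[TimeWindow](10))
      .reduce((a, b) => (a._1, a._2 + b._2))
      .print()

    env.execute("count-evictor-sketch")
  }
}

Note that once an evictor is specified, all elements of the window are kept in state and the reduce is applied only when the window fires, so incremental pre-aggregation is disabled.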

Allowed Lateness

Allows late data to still be processed for a configured amount of time after the window has fired. Used together with .sideOutputLateData(), elements that arrive too late can be redirected to a side output stream.

val lateOutputTag = OutputTag[T]("late-data")

val input: DataStream[T] = ...

val result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>)

val lateStream = result.getSideOutput(lateOutputTag)