What?
: Flink는 시간 특성을 설정해서 스트림 처리에 사용되는 시간을 정의할 수 있다.
Why, When?
: 스트림 처리 시 데이터 속성과 도출 목적에 따라 시간 특성을 별도로 정의하여 상황에 맞는 시간 정의가 필요하다.
Type?
: ProcessingTime, EventTime, IngestionTime
What?
: 스트림 처리 시 실행 중인 Flink 로컬 서버의 시스템 시간을 기준으로 처리한다.
Pros
Cons
When?
Example
// 스칼라
# 스트리밍 실행 환경설정
val env = StreamExecutionEnvironment.getExecutionEnvironment
# ProcessingTime 설정
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default
# 파이썬
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream import TimeCharacteristic
# 스트리밍 실행 환경설정
env = StreamExecutionEnvironment.get_execution_environment()
# ProcessingTime 설정
env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
What?
: 이벤트에서 타임스탬프를 가지고 있다. 이 때, 타임스탬프는 이벤트 발생 디바이스의 시간이거나, 애플리케이션에서 할당한 시간일 수 있다.
Pros
Cons
When?
Example
// 스칼라
# 스트리밍 실행 환경설정
val env = StreamExecutionEnvironment.getExecutionEnvironment
# ProcessingTime 설정
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
# 파이썬
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream import TimeCharacteristic
# 스트리밍 실행 환경설정
env = StreamExecutionEnvironment.get_execution_environment()
# ProcessingTime 설정
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
📌 스트림 처리 결과에 대해 완벽히 보장이 아닌 최대한 보장인 이유?
What?
: 이벤트가 Flink 에 입수 되는 시점의 타임스탬프를 이용하고, 자동으로 워터마크를 생성한다.
Pros
Cons
When?
Code
// 스칼라
# 스트리밍 실행 환경설정
val env = StreamExecutionEnvironment.getExecutionEnvironment
# ProcessingTime 설정
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
# 파이썬
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream import TimeCharacteristic
# 스트리밍 실행 환경설정
env = StreamExecutionEnvironment.get_execution_environment()
# ProcessingTime 설정
env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
What?
How?
📌 참고
Timestamp Assigner(사용자 정의 타임스탬프 할당자)
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// configure watermark interval
env.getConfig.setAutoWatermarkInterval(1000L)
// ingest sensor stream
val sensorData: DataStream[SensorReading] = env
// SensorSource generates random temperature readings
.addSource(new SensorSource)
// assign timestamps and watermarks which are required for event time
.assignTimestampsAndWatermarks(new SensorTimeAssigner)
val avgTemp: DataStream[SensorReading] = sensorData
// convert Fahrenheit to Celsius using an inlined map function
.map(r =>
SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)))
// organize stream by sensorId
.keyBy(_.id)
// group readings in 1 second windows
.timeWindow(Time.seconds(1))
// compute average temperature using a user-defined function
.apply(new TemperatureAverager)
SensorTimeAssigner
는 AssignerWithPeriodicWatermarks
나 AssignerWithPunctuatedWatermarks
중 하나가 될 수 있다.SensorTimeAssigner
를 확인해보면 BoundedOutOfOrdernessTimestampExtractor
를 확장하여 워터마크를 지정하고 있는 걸 확인할 수 있다. 이어서 BoundedOutOfOrdernessTimestampExtractor
를 확인해보면 AssignerWithPeriodicWatermarks
를 사용하고 있는 것도 확인할 수 있다.// SensorTimeAssigner.scala
class SensorTimeAssigner
extends BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(5)) {
/** Extracts timestamp from SensorReading. */
override def extractTimestamp(r: SensorReading): Long = r.timestamp
// BoundedOutOfOrdernessTimestampExtractor.scala
package org.apache.flink.streaming.api.functions.timestamps;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
private static final long serialVersionUID = 1L;
private long currentMaxTimestamp;
private long lastEmittedWatermark = Long.MIN_VALUE;
private final long maxOutOfOrderness;
public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
if (maxOutOfOrderness.toMilliseconds() < 0L) {
throw new RuntimeException("Tried to set the maximum allowed lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
} else {
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
}
}
...
What?
Config
ExecutionConfig.setAutoWatermarkInterval()
메서드로 시간 간격을 설정할 수 있다. getCurrentWatermark()
메서드를 호출하게 된다.getCurrentWatermark()
메서드가 호출 될 때 이전 워터마크의 타임스탬프보다 더 큰 타임스탬프를 반환하면 새 워터마크를 내보내게 된다.(단, Null이 아닌 값이여야 한다.)val env = StreamExecutionEnvironment.getExecutionEnvironment
// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// configure watermark interval - 5초마다 워터마크 생성
env.getConfig.setAutoWatermarkInterval(5000)
Example
: 아래 코드는 타임스탬프 할당자가 새 워터마크 요청을 받으면 가장 큰 타임스탬프에서 1분의 허용 오차(tolerance interval)를 뺸 값을 타임스탬프로 반환하는 예제이다.
// WatermarkGeneration.scala
class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {
// 1 min in ms
val bound: Long = 60 * 1000
// the maximum observed timestamp // 최초 실행된다고 했을 때 MinValue로 시작해서 maxTs가 계속 갱신 되는 형태인 것 같다.
var maxTs: Long = Long.MinValue
override def getCurrentWatermark: Watermark = {
new Watermark(maxTs - bound)
}
override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
// update maximum timestamp
maxTs = maxTs.max(r.timestamp)
// return record timestamp
r.timestamp
}
}
일반적으로 많이 사용하는 AssignerWithPeriodicWatermarks의 구현체 2가지
1. assignAscendingTimeStamps
// AssignerWithPeriodicWatermarks - assignAscendingTimeStamps
val stream: DataStream[SensorReading] = ...
val withTimestampAndWatermarks = stream
.assignAscendingTimestamps(e => e.timestamp) // 사실 이 부분이 왜 현재 타임스탬프를 사용한다는 것인지 잘 모르겠다..
BoundedOutOfOrdernessTimestampExtractor
// AssignerWithPeriodicWatermarks - BoundedOutOfOrdernessTimestampExtractor
val stream: DataStream[SensorReading] = ...
val output = stream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[SensorReading](
Time.seconds(10))(e => .timestamp) // 최대 지연 값을 10초로 설정
: 입력 이벤트의 속성에 따라 워터마크를 내보낸다.
What?
Config
AssignerWithPunctuatedWatermarks
인터페이스는 이벤트마다 extractTimestamp()
메서드를 호출한 직후 checkAndGetNextWatermark()
메서드를 호출하고, 이 메서드는 새 워터마크를 생성 할지 말지 결정할 수 있다. 이 메서드가 가장 최근에 내보낸 워터마크보다 더 큰 타임스탬프 값을 가진 워터마크를 반환하면 새 워터마크로 내보낸다.(단, 반환 되는 값이 Null 이 아니여야한다.)AssignerWithPunctuatedWatermarks
이다.// WatermarkGeneration.scala
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {
// 1 min in ms
val bound: Long = 60 * 1000
override def checkAndGetNextWatermark(r: SensorReading, extractedTS: Long): Watermark = {
if (r.id == "sensor_1") {
// emit watermark if reading is from sensor_1
new Watermark(extractedTS - bound)
} else {
// do not emit a watermark
null
}
}
override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
// assign record timestamp
r.timestamp
}
}
느슨한 워터마크
빠듯한 워터마크
What?
When?
Type( Flink는 8개의 ProcessFunction을 갖고 있다. )
8개의 ProcessFunction 함수들의 공통점
// RichFunction.class
package org.apache.flink.api.common.functions;
import org.apache.flink.annotation.Public;
import org.apache.flink.configuration.Configuration;
@Public
public interface RichFunction extends Function {
void open(Configuration var1) throws Exception;
void close() throws Exception;
RuntimeContext getRuntimeContext();
IterationRuntimeContext getIterationRuntimeContext();
void setRuntimeContext(RuntimeContext var1);
}
KeyedProcessFunction 함수에 확장 되는 두 가지 메서드(🔒 솔직히 잘 모르겠다)
: 8개 ProcessFunction 함수들의 공통점 이외에도 KeyedProcessFunction은 아래 두 메서드를 추가로 제공한다.
processElement(v: IN, ctx: Context, out: Collector[OUT])
onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])
processElement(), onTimer() 동기화
// KeyedProcessFunction.class
@PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public KeyedProcessFunction() {
}
public abstract void processElement(I var1, KeyedProcessFunction<K, I, O>.Context var2, Collector<O> var3) throws Exception;
public void onTimer(long timestamp, KeyedProcessFunction<K, I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
}
public abstract class OnTimerContext extends KeyedProcessFunction<K, I, O>.Context {
public OnTimerContext() {
super();
}
public abstract TimeDomain timeDomain();
public abstract K getCurrentKey();
}
public abstract class Context {
public Context() {
}
public abstract Long timestamp();
public abstract TimerService timerService();
public abstract <X> void output(OutputTag<X> var1, X var2);
public abstract K getCurrentKey();
}
}
Timer(타이머)
TimerService
Timer Types
: Context와 OntimerContext 객체의 TimerService는 아래 코드와 같은 메서드 6개를 제공한다.
// TimerService.class
package org.apache.flink.streaming.api;
import org.apache.flink.annotation.PublicEvolving;
@PublicEvolving
public interface TimerService {
String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";
String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";
long currentProcessingTime(); // Long 현재 처리 시간을 반환한다.
long currentWatermark(); // Long 현재 워터마크의 타임스탬프를 반환한다.
void registerProcessingTimeTimer(long var1); // Unit 현재 키의 처리 시간 타이머를 등록한다. 현재 실행중인 장비의 처리 시간이 인자로 넘긴 시간에 도달하면 타이머가 동작한다.
void registerEventTimeTimer(long var1); // Unit 현재 키의 이벤트 시간 타이머를 등록한다. 워터마크가 타이머의 시간과 같거나 큰 타임스탬프로 갱신 되면 타이머가 동작한다.
void deleteProcessingTimeTimer(long var1); // Unit 현재 키의 타이머 중 이전에 등록한 처리 시간 타이머를 제거한다. 해당 타이머가 존재하지 않으면 이 메서드는 아무런 동작을 하지 않는다.
void deleteEventTimeTimer(long var1); // Unit 현재 키의 타이머 중 이전에 등록한 이벤트 시간 타이머를 제거한다. 해당 타이머가 존재하지 않으면 이 메서드는 아무런 동작을 하지 않는다.
}
Timer & TimerService - 장애복구 대응
: 해당 장의 내용을 여러번 보았는데, 책에 기재되어 있는 상세 내용이 아직 잘 이해가지 않는다. 내가 이해한 중간 요약을 적어보고 스터디에서 공유해보자.
// ProcessFunctionTimers.scala
package io.github.streamingwithflink.chapter6
import io.github.streamingwithflink.util.{SensorReading, SensorSource}
import org.apache.flink.api.scala._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
object ProcessFunctionTimers {
def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// ingest sensor stream
val readings: DataStream[SensorReading] = env
// SensorSource generates random temperature readings
.addSource(new SensorSource)
// 설명1. ProcessFunction은 KeyedStream에서 사용 가능 하므로
// 설명1-1. keyby() 를 사용해서 KeyeStream으로 스트림을 만들고,
// 설명1-2. 온도 상승을 모니터링 하기 위해 TempIncreaseAlertFunction 로 사용자 정의 ProcessFunction을 사용한다.
val warnings = readings
// key by sensor id
.keyBy(_.id)
// apply ProcessFunction to monitor temperatures
.process(new TempIncreaseAlertFunction)
warnings.print()
env.execute("Monitor sensor temperatures.")
}
}
/** Emits a warning if the temperature of a sensor
* monotonically increases for 1 second (in processing time).
*/
class TempIncreaseAlertFunction
extends KeyedProcessFunction[String, SensorReading, String] {
// 설명2. ProcessFunction에서는 getRuntimeContext() 메서드를 제공하므로
// 설명2-1. 아래와 같이 메서드를 이용해 센서 레코드의 lastTemp 값과 timer 값을 가져올 수 있다.
// hold temperature of last sensor reading
lazy val lastTemp: ValueState[Double] =
getRuntimeContext.getState(
new ValueStateDescriptor[Double]("lastTemp", Types.of[Double])
)
// hold timestamp of currently active timer
lazy val currentTimer: ValueState[Long] =
getRuntimeContext.getState(
new ValueStateDescriptor[Long]("timer", Types.of[Long])
)
// 설명3. processElement()가 각 레코드에서 호출되어 아래 플로우를 탄다.
override def processElement(
r: SensorReading,
ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
out: Collector[String]): Unit = {
// get previous temperature
val prevTemp = lastTemp.value()
// update last temperature
lastTemp.update(r.temperature)
val curTimerTimestamp = currentTimer.value()
if (prevTemp == 0.0) {
// first sensor reading for this key.
// we cannot compare it with a previous value.
}
else if (r.temperature < prevTemp) {
// 설명4. 현재 온도가 상승 되지 않았을 경우 모니터할 필요가 없기 때문에 타이머를 clear() 해준다.
// temperature decreased. Delete current timer.
ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp)
currentTimer.clear()
}
else if (r.temperature > prevTemp && curTimerTimestamp == 0) {
// 설명5. 현재 온도가 상승되었고, 등록된 타이머가 없다면 인자값으로 전달 받은 Context()에서 timerService에 접근하여 현재 레코드의 현재 처리 시간을 반환 받아 온다.
// 설명5-1. 그리고 현재 처리 시간에 1초를 더해서 1초간 온도 상승을 체크한다.
// 설명5-2. 1초간 온도가 상승 될 경우 1초 후 현재 시점의 처리 시간 타임스탬프값을 타이머에 전달하여 업데이트 한다.
// 설명5-3. 타이머가 갱신 되면서 onTimer() 콜백 함수를 호출 한다.
// temperature increased and we have not set a timer yet.
// set timer for now + 1 second
val timerTs = ctx.timerService().currentProcessingTime() + 1000
ctx.timerService().registerProcessingTimeTimer(timerTs)
// remember current timer
currentTimer.update(timerTs)
out.collect("getCurrentKey: "+ ctx.getCurrentKey + "temperature: " + r.temperature + " | prevTemp: "+ prevTemp)
}
}
override def onTimer(
ts: Long,
ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
out: Collector[String]): Unit = {
// 설명6. 타이머가 갱신 되면서 onTimer() 함수가 호출 되고, 1초간 온도가 상승 되었다는 알람을 Collector에 전송하고 현재 타이머를 clear() 한다.
out.collect("Temperature of sensor '" + ctx.getCurrentKey +
"' monotonically increased for 1 second.")
// reset current timer
currentTimer.clear()
}
}
1> getCurrentKey: sensor_58temperature: 68.2771877870539 | prevTemp: 67.66293569327982
1> getCurrentKey: sensor_58temperature: 70.22113915422284 | prevTemp: 69.45767478207411
1> getCurrentKey: sensor_58temperature: 69.70030532502547 | prevTemp: 69.63249876364078
1> getCurrentKey: sensor_58temperature: 69.85322985787988 | prevTemp: 69.0365850638901
1> getCurrentKey: sensor_58temperature: 69.80593202149456 | prevTemp: 69.8000324175976
1> Temperature of sensor 'sensor_58' monotonically increased for 1 second.
사이드 아웃풋
public abstract class Context {
public Context() {
}
public abstract Long timestamp();
public abstract TimerService timerService();
// 사이드 아웃풋은 OutputTag<x> 객체로 식별한다. << 이게 무슨 말일까?
public abstract <X> void output(OutputTag<X> var1, X var2);
public abstract K getCurrentKey();
}
// Scala
val outputTag = OutputTag[String]("side-output")
# Python
output_tag = OutputTag("side-output", Types.STRING())
// SideOutputs.scala
package io.github.streamingwithflink.chapter6
import io.github.streamingwithflink.util.{SensorReading, SensorSource, SensorTimeAssigner}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
object SideOutputs {
def main(args: Array[String]): Unit = {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// checkpoint every 10 seconds
env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// configure watermark interval
env.getConfig.setAutoWatermarkInterval(1000L)
// ingest sensor stream
val readings: DataStream[SensorReading] = env
// SensorSource generates random temperature readings
.addSource(new SensorSource)
// assign timestamps and watermarks which are required for event time
.assignTimestampsAndWatermarks(new SensorTimeAssigner)
// 설명1. 센서 데이터에서 어는점을 확인하기 위해 FreezingMonitor라는 ProcessFunction을 사용한다.
val monitoredReadings: DataStream[SensorReading] = readings
// monitor stream for readings with freezing temperatures
.process(new FreezingMonitor)
// retrieve and print the freezing alarms
// 설명5. FreezingMonitor에서 사이드 아웃풋으로 내보내진 스트림을 읽고 출력한다.
monitoredReadings
.getSideOutput(new OutputTag[String]("freezing-alarms"))
.print()
// print the main output
// 설명6. collect()로 내보내진 모든 레코드를 출력한다.
readings.print()
env.execute()
}
}
/** Emits freezing alarms to a side output for readings with a temperature below 32F. */
class FreezingMonitor extends ProcessFunction[SensorReading, SensorReading] {
// define a side output tag
// 설명2. 사이드 아웃풋의 태그를 'freezing-alarms'라 식별하여 선언한다.
lazy val freezingAlarmOutput: OutputTag[String] =
new OutputTag[String]("freezing-alarms")
override def processElement(
r: SensorReading,
ctx: ProcessFunction[SensorReading, SensorReading]#Context,
out: Collector[SensorReading]): Unit = {
// emit freezing alarm if temperature is below 32F.
// 설명3. 각 레코드의 온도가 32도 이하인지 확인하고
// 설명3-1. 온도가 32도 이하일 경우 사이드 아웃풋 태그 데이터와, 문자열 데이터를 output으로 내보낸다.
if (r.temperature < 32.0) {
ctx.output(freezingAlarmOutput, s"Freezing Alarm for ${r.id}")
}
// forward all readings to the regular output
// 설명4. 모든 레코드 정보는 일반 collect()로 내보낸다
out.collect(r)
}
}
// 6> SensorReading(sensor_56,1682266031768,30.563619836059083)
// 6> Freezing Alarm for sensor_56
What?
How?
Example(Q. 잘 이해 못함)
package io.github.streamingwithflink.chapter6
import io.github.streamingwithflink.util.{SensorReading, SensorSource}
import org.apache.flink.api.scala._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
object CoProcessFunctionTimers {
def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// 센서 이벤트를 내보내는 필터 스위치
// 설명1. 스트림1 -> 센서 이벤트를 내보내는 필터 스위치
// filterSwitches 변수의 데이터 타입은 DataStream[(String, Long)]이다
// env 객체에서 제공하는 fromCollection 메서드를 사용하여 값을 할당 받는다.
// Seq 메서드는 문자열과 long 값으로 이루어진 튜플의 시퀀스를 생성하는 데 사용된다.
val filterSwitches: DataStream[(String, Long)] = env
.fromCollection(Seq(
("sensor_2", 10 * 1000L), // 10초 동안 sensor_2 방출
("sensor_7", 60 * 1000L)) // 1분 동안 sensor_7 방출
)
// ingest sensor stream
// 설명2. 스트림2 -> 수집된 모든 센서 데이터 레코드
val readings: DataStream[SensorReading] = env
// SensorSource generates random temperature readings
.addSource(new SensorSource)
// 설명3. 스트림2에 스트림1을 connect() 로 연결하고 _id를 기준으로 keyby() 진행하여
// 설명3-1. ReadingFilter에서 사용자 정의 CoProcessFunction 사용
val forwardedReadings = readings
// connect readings and switches
.connect(filterSwitches)
// key by sensor ids
.keyBy(_.id, _._1)
// apply filtering CoProcessFunction
.process(new ReadingFilter)
forwardedReadings
.print()
env.execute("Monitor sensor temperatures.")
}
}
class ReadingFilter
extends CoProcessFunction[SensorReading, (String, Long), SensorReading] {
// switch to enable forwarding
// 내보내기 가능한 스위치(레코드를 내보내는 스위치)
lazy val forwardingEnabled: ValueState[Boolean] =
getRuntimeContext.getState(
new ValueStateDescriptor[Boolean]("filterSwitch", Types.of[Boolean])
)
// hold timestamp of currently active disable timer
// 센서 읽음 레코드를 내보내는 것을 중지하는 타이머의 타임스탬프 저장
lazy val disableTimer: ValueState[Long] =
getRuntimeContext.getState(
new ValueStateDescriptor[Long]("timer", Types.of[Long])
)
// 설명4.
// ProcessElement1 에서 센서(아이디, 온도, 타임스탬프) 읽음 레코드가 들어오면
// forwardingEnabled 값을 확인하고 true 이면 레코드를 collect()로 내보낸다.
override def processElement1(
reading: SensorReading,
ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#Context,
out: Collector[SensorReading]): Unit = {
// check if we may forward the reading
// 설명4-1. 레코드를 받아서 출력 가능한지 체크하고 가능할 경우 일반 collect()로 레코드를 내보낸다.
if (forwardingEnabled.value()) {
out.collect(reading)
}
}
// 설명5.
// processElement2에서 filterSwitches 스위치 데이터가 들어오면,
// forwardingEnabled 값을 true로 바꾸고
// 현재 레코드의 처리 시간 + 10초를 더한 타임스탬프 값을 timerTimestamp변수에 저장한다.
// 그 다음, 최근 타이머 중지 시 업데이트 했던 타이머의 타임스탬프 값을 disableTimer 변수에 저장한다.
// 최근 타임스탬프 값이 새로 선언한 것보다 작으면 기존 타임스탬프 값을 삭제하고 새로운 타이머의 타임스탬프 값을 선언한다.
// disableTimer 을 새로운 타임스탬프 값으로 선언하여 타이머를 트리깅한다.
override def processElement2(
switch: (String, Long),
ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#Context,
out: Collector[SensorReading]): Unit = {
// enable reading forwarding
// 설명6. 센서 읽음 레코드를 내보내도록 forwardingEnabled 값을 true 선언한다.
forwardingEnabled.update(true)
// set disable forward timer
val timerTimestamp = ctx.timerService().currentProcessingTime() + switch._2
val curTimerTimestamp = disableTimer.value()
if (timerTimestamp > curTimerTimestamp) {
// remove current timer and register new timer
ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp)
ctx.timerService().registerProcessingTimeTimer(timerTimestamp)
disableTimer.update(timerTimestamp)
}
}
// 설명7.
// 타이머가 새로 갱신 되면 onTimer 콜백 함수가 호출되고
// forwardingEnabled 값을 false 로 바꾸고
// disableTimer 값을 초기화하여 다음 타이머가 선언될 수 있도록한다.
override def onTimer(
ts: Long,
ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#OnTimerContext,
out: Collector[SensorReading]): Unit = {
// remove all state. Forward switch will be false by default.
forwardingEnabled.clear()
disableTimer.clear()
}
}
Example 데이터 결과
: 결과적으로 10초 동안 sensor_2 레코드가 출력되고, 1분 동안 sensor_7 레코드가 출력되는 것을 확인할 수 있다.
8> SensorReading(sensor_7,1682271490317,88.46913758179487)
2> SensorReading(sensor_2,1682271490317,75.91437154538198)
2> SensorReading(sensor_2,1682271490421,75.27610125572255)
8> SensorReading(sensor_7,1682271490421,88.2588389889681)
8> SensorReading(sensor_7,1682271490521,87.71091885536485)
2> SensorReading(sensor_2,1682271490521,75.0144029640268)
2> SensorReading(sensor_2,1682271490628,74.70574187268318)
8> SensorReading(sensor_7,1682271490628,86.9227533362839)
2> SensorReading(sensor_2,1682271490728,74.96867312979697)
8> SensorReading(sensor_7,1682271490728,86.46586606680201)
2> SensorReading(sensor_2,1682271490831,75.66137461955259)
8> SensorReading(sensor_7,1682271490831,86.44857582441172)
... 10초가 지나면 sensor_2 레코드는 출력되지 않고, 1분이 지나면 sensor_7 레코드도 출력되지 않는다.
What?
How?
Example(윈도우 연산 상황)
Config
KeyedStream에 적용하는 윈도우 연산자는 병렬 태스크로 처리되고, 그 외의 스트림에 적용한 윈도우 연산자는 단일 태스크로 처리된다.
윈도우 연산자를 생성할 때 필수로 지정해야하는 두 가지 윈도우 컴포넌트:
// 윈도우 연산자 함수 적용하는 예제
// KeyedStream 에 윈도우 연산자 정의
stream
.keyBy(...)
.window(...) // WindowAssigner 지정
.reduce or aggregate or process(...) // 윈도우 함수 지정
// KeyedStream 이 외의 스트림에 WindowAll 연산자 정의
stream
.windowAll(...) // WindowAssiger 지정
.reduce or aggregate or process(...) // 윈도우 함수 지정
시간 기반 윈도우
What?
Config
개수 기반 윈도우
텀블링 윈도우
What?
Type?
Config
Example
// Scala
val sensorData: DataStream[SensorReading] ...
val avgTemp = sensorData
.keyBy(_.id)
.window(TumblingEventTimeWindows.of(Time.seconds(1))) // 1초 이벤트 시간 윈도우
.process(new TemperatureAverager)
val avgTemp = sensorData
.keyBy(_.kd)
.window(TumblingProcessingTimeWindows.of(Time.seconds(1))) // 1초 처리 시간 윈도우
.process(new TemperatureAverager)
val avgTemp = sensorData
.keyBy(_.id)
.timeWindow(Time.seconds(1)) // window(TumblingEventTimeWindows.of(Time.seconds(1))) 의 단축 표현
.process(new TemperatureAverager)
// 00:15:00 -> 01:15:00 -> 02:15:00 와 같이 텀블링 윈도우가 흘러간다
val avgTemp = sensorData
.keyBy(_.id)
.window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15))) // 1시간 이벤트 시간 윈도우 + 15분 오프셋 지정
.process(new TemperatureAverager)
# Python
input = ... # type: DataStream
# tumbling event-time windows
input \
.key_by(<key selector>) \
.window(TumblingEventTimeWindows.of(Time.seconds(5))) \
.<windowed transformation>(<window function>)
# tumbling processing-time windows
input \
.key_by(<key selector>) \
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) \
.<windowed transformation>(<window function>)
# daily tumbling event-time windows offset by -8 hours.
input \
.key_by(<key selector>) \
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) \
.<windowed transformation>(<window function>)
슬라이딩 윈도우
What?
Config
Example
// 이벤트 시간 슬라이딩 WindowAssigner
val slidingAvgTemp = sensorData
.keyBy(_.id)
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(15))) // 15분마다 1시간 길이의 이벤트 시간 윈도우를 생성한다.
.process(new TemperatureAverager)
// 처리 시간 슬라이딩 WindowAssigner
val slidingAvgTemp = sensorData
.keyBy(_.id)
.window(SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(15))) // 15분마다 1시간 길이의 처리 시간 윈도우를 생성한다.
.process(new TemperatureAverager)
// 단축 표현 메서드를 사용하는 슬라이딩 WindowAssigner
val slidingAvgTemp = sensorData
.keyBy(_.id)
.timeWindow(Time.hours(1), Time.minutes(15)) // .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(15))) 의 단축표현
.process(new TemperatureAverager)
# Python
input = ... # type: DataStream
# sliding event-time windows
input \
.key_by(<key selector>) \
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) \
.<windowed transformation>(<window function>)
# sliding processing-time windows
input \
.key_by(<key selector>) \
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) \
.<windowed transformation>(<window function>)
# sliding processing-time windows offset by -8 hours
input \
.key_by(<key selector>) \
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) \
.<windowed transformation>(<window function>)
세션 윈도우
What?
Config
Example
// 이벤트 시간 세션 WindowAssigner
val sessionWindows = sensorData
.keyBy(_.id)
.window(EventTimeSessionWindows.withGap(Time.minutes(15))) // 15분 격차로 정의한 이벤트 시간 세션 윈도우 생성
.process(...)
// 처리 시간 세션 WindowAssigner
val sessionWindows = sensorData
.keyBy(_.id)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(15))) // 15분 격차로 정의한 처리 시간 세션 윈도우 생성
.process(...)
input = ... # type: DataStream
class MySessionWindowTimeGapExtractor(SessionWindowTimeGapExtractor):
def extract(self, element: tuple) -> int:
# determine and return session gap
# event-time session windows with static gap
input \
.key_by(<key selector>) \
.window(EventTimeSessionWindows.with_gap(Time.minutes(10))) \
.<windowed transformation>(<window function>)
# event-time session windows with dynamic gap
input \
.key_by(<key selector>) \
.window(EventTimeSessionWindows.with_dynamic_gap(MySessionWindowTimeGapExtractor())) \
.<windowed transformation>(<window function>)
# processing-time session windows with static gap
input \
.key_by(<key selector>) \
.window(ProcessingTimeSessionWindows.with_gap(Time.minutes(10))) \
.<windowed transformation>(<window function>)
# processing-time session windows with dynamic gap
input \
.key_by(<key selector>) \
.window(DynamicProcessingTimeSessionWindows.with_dynamic_gap(MySessionWindowTimeGapExtractor())) \
.<windowed transformation>(<window function>)
What?
Type
ReduceFunction
: 5장 KeyedStream 변환 연산 절에서 어떻게 적용 되는지 소개 되었다.
What?
Work?
Pros
Cons
Example
// ReduceFunction을 람다 함수로 적용
// 15초마다 각 센서의 최소 온도를 계산
val minTempPerWindow: DataStream[(String, Double)] = sensorData
.map(r => (r.id, r.temperature))
.keyBy(_.id)
.timeWindow(Time.seconds(15))
.reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
AggregateFunction
What?
Work?
Pros
Cons
Example
// AggregateFunction.class
package org.apache.flink.api.common.functions;
import java.io.Serializable;
import org.apache.flink.annotation.PublicEvolving;
@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
ACC createAccumulator();
ACC add(IN var1, ACC var2);
OUT getResult(ACC var1);
ACC merge(ACC var1, ACC var2);
}
class AvgTempFunction
extends AggregateFunction[(String, Double), (String, Double, Int), (String, Double)] {
override def createAccumulator() = {
("", 0.0, 0)
}
override def add(in: (String, Double), acc: (String, Double, Int)) = {
// in._1: 새로 들어온 레코드의 id값
// in._2: 새로 들어온 레코드의 온도값
// acc._2: 지금까지 집계된 온도값(합계)
// acc._3: 지금까지 집계된 레코드 개수
(in._1, in._2 + acc._2, 1 + acc._3)
}
override def getResult(acc: (String, Double, Int)) = {
// id 값, 평균값 계산
(acc._1, acc._2 / acc._3)
}
override def merge(acc1: (String, Double, Int), acc2: (String, Double, Int)) = {
// 이 부분에 왜 merge가 들어오는지는 잘 모르겠다.
(acc1._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
}
}
ProcessWindowFunction
What?
When?
Pros
Cons
아래 코드(ProcessWindowFunction.class
)와 같이 ProcessWindowFunction 의 process() 메서드는 4개의 인자값을 받는다.
// ProcessWindowFunction.class
package org.apache.flink.streaming.api.scala.function
@_root_.org.apache.flink.annotation.PublicEvolving
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: _root_.org.apache.flink.streaming.api.windowing.windows.Window] extends _root_.org.apache.flink.api.common.functions.AbstractRichFunction {
@_root_.scala.throws[_root_.scala.Exception]
def process(key: KEY, context: ProcessWindowFunction.this.Context, elements: _root_.scala.Iterable[IN], out: _root_.org.apache.flink.util.Collector[OUT]): _root_.scala.Unit
abstract class Context {
def window: W
def currentProcessingTime: _root_.scala.Long
def currentWatermark: _root_.scala.Long
def windowState: _root_.org.apache.flink.api.common.state.KeyedStateStore
def globalState: _root_.org.apache.flink.api.common.state.KeyedStateStore
def output[X](outputTag: _root_.org.apache.flink.streaming.api.scala.OutputTag[X], value: X): _root_.scala.Unit
}
@_root_.scala.throws[_root_.scala.Exception]
def clear(context: ProcessWindowFunction.this.Context): _root_.scala.Unit = ???
}
윈도우별 상태(Q. 잘 이해 못함.)
글로벌 상태(Q. 잘 이해 못함.)
Example
// WindowFunctions.scala
// output the lowest and highest temperature reading every 5 seconds
// 센서 읽음 스트림을 5초 길이의 텀블링 윈도우로 모아 윈도우 내에서 발생한 가장 낮은 온도와 높은 온도를 구한다.
// HighAndLowTempProcessFunction에서는 각 윈도우별로 윈도우의 시작과 종료 타임스탬프, 최대 온도와 최소 온도를 포함하는 레코드 하나를 내보낸다.
val minMaxTempPerWindow: DataStream[MinMaxTemp] = sensorData
.keyBy(_.id)
.timeWindow(Time.seconds(5))
.process(new HighAndLowTempProcessFunction)
case class MinMaxTemp(id: String, min: Double, max:Double, endTs: Long)
// 윈도우별로 최저 및 최고 온도 레코드를 계산하고 윈도우의 종료 타임스탬프와 함께 내보내는 ProcessWindowFunction 이다.
class HighAndLowTempProcessFunction
extends ProcessWindowFunction[SensorReading, MinMaxTemp, String, TimeWindow] {
override def process(
key: String, // 윈도우 키
ctx: Context, // Context 객체
vals: Iterable[SensorReading], // 윈도우의 모든 레코드에 접근할 수 있는 Iterable 객체
out: Collector[MinMaxTemp]): Unit = { // 결과를 내보내는 Collector 객체
val temps = vals.map(_.temperature)
val windowEnd = ctx.window.getEnd
out.collect(MinMaxTemp(key, temps.min, temps.max, windowEnd))
}
}
// * 참고: ProcessWindowFunction으로 평가할 윈도우는 윈도우에 할당된 모든 이벤트를 ListState에 저장한다.
증분 집계 함수와 ProcessWindowFunction(함께 사용)
When?
How?
Example
// WindowsFunctions.scala
val minMaxTempPerWindow2: DataStream[MinMaxTemp] = sensorData
.map(r => (r.id, r.temperature, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(5))
.reduce(
// incrementally compute min and max temperature
// 최대, 최소 온도의 증분 계산
(r1: (String, Double, Double), r2: (String, Double, Double)) => {
(r1._1, r1._2.min(r2._2), r1._3.max(r2._3))
},
// finalize result in ProcessWindowFunction
// 증분 계산 후 집계 결과를 아래 ProcessWindowFunction인 AssignWindowEndProcessFunction에서 결과 처리
new AssignWindowEndProcessFunction()
)
class AssignWindowEndProcessFunction
extends ProcessWindowFunction[(String, Double, Double), MinMaxTemp, String, TimeWindow] {
override def process(
key: String,
ctx: Context,
minMaxIt: Iterable[(String, Double, Double)], // 이 때 Iterable한 인자의 요소는 1개
out: Collector[MinMaxTemp]): Unit = {
val minMax = minMaxIt.head
val windowEnd = ctx.window.getEnd
out.collect(MinMaxTemp(key, minMax._2, minMax._3, windowEnd))
}
}
When?
How?
Work
What?
Flow?
Trigger 발생 시점 - 증분 집계 함수 사용 시
Trigger 발생 시점 - 전체 윈도우 함수 사용 시
Trigger 발생 시점 - (증분 집계 함수 + 전체 윈도우 함수)를 함께 사용 시
What?
When?
Evictor 사용법
// Evictor
stream.
.keyBy(...)
.window(...) // WindowAssiger 설정
[.trigger(...)] // 선택 사항: Trigger 설정
[.evictor(...)] // 선택 사항: Evictor 설정
.reduce or aggregate or process(...) // 윈도우 함수 설정
// evictBefore() 윈도우 함수 이전에 적용할 ecvitor 로직을 구현할 때 사용한다.
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
// evictAfter() 윈도우 함수 이후에 적용할 ecvitor 로직을 구현할 때 사용한다.
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
What?
윈도우 생성 시점
윈도우 상태(정보)
What?
Example
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.windowing.time.Time
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream
.keyBy(elem => /* select key */)
.intervalJoin(greenStream.keyBy(elem => /* select key */))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process(new ProcessJoinFunction[Integer, Integer, String] {
override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
out.collect(left + "," + right)
}
})
What?
Config
Work
Example
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>);
Type
Tumbling Window Join
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
.where(elem => /* select key */)
.equalTo(elem => /* select key */)
.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
.apply { (e1, e2) => e1 + "," + e2 }
Sliding Window Join
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
.where(elem => /* select key */)
.equalTo(elem => /* select key */)
.window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
.apply { (e1, e2) => e1 + "," + e2 }
Session Window Join
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
.where(elem => /* select key */)
.equalTo(elem => /* select key */)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
.apply { (e1, e2) => e1 + "," + e2 }