오늘은 개인적으로 Stream processing의 꽃이라고 생각하는 window operator에 대해 적어 보려고 합니다. Window operator는 간단하게 설명하자면, stream으로 흘들어 오는 이벤트를 묶어서 처리 하는 것입니다. 실시간을 들어오는 이벤트를 묶어서 처리해야 하는 여러가지 상황이 있지만, 아래와 같은 상황에 사용할 수 있습니다.
위 상황들 말고도 여러 상황에서 window operator는 유효하게 사용 가능하며, 상황만 알맞은 곳에 적용한다면, 제 개인적으로는, 트래픽 대비 서버 사용량도 효율적으로 대폭 감소 시킨 경험이 있습니다.
Window operaton은 keyed와 non-keyed가 존재 하는데, 결국 window operator는 특정 이벤트를 묶어서 처리해주는 기능이라면, key는 어떤 방식으로 이벤트가 같이 묶여서 처리되는 지에 대한 partition 설정이라고 보실수 있습니다. Key를 설정하게 된다면, 이후 이벤트 처리에서 같은 키들은 묶여서 처리되며, 다른 키는 결코 같이 묶일수 없으며, (이벤트 처리후 join을 강제적으로 하지 않는 이상...) 분리되어서 처리된다고 볼수 있고, Non-keyed로 진행한다면, 전체의 이벤트가 window operator가 작업이 되는 동안에 같이 window로 묶일 가능성이 생깁니다. (무조건 묶이는게 아니라, 그후에 추가로 묶이는 logic에 의해 결정됨) 추가로, key로 진행 된 window 같은 경우 여러 parition으로 나뉘어 지기 때문에, 각각의 key window로 병렬 처리가 가능합니다.
아래에 예제 코드는 간단하게, keyed window와 non-keyed window의 선언법 입니다.
// keyed window
stream
.keyBy(...)
.window(...)
// non-keyed window
stream
.windowAll(...)
Window assigner는 어떻게 넘어온 이벤트가 어떤 형식으로 window를 생성하고 닫는 지에 대한 정의를 합니다. Flink에서는 기본적으로 아래 4가지의 window를 제공합니다.
// 5 초간 윈도우가 열리고 닫힌다.
stream
.keyBy()
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 한국 시간 기준
stream
.keyBy()
.window(TumblingEventTimeWindows.of(Time.seconds(5), Time.hours(9))))
// 10초간 윈도우가 열리고, 5초 마다 새로운 window가 시작된다.
stream
.keyBy()
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
// 한국 시간 기준
stream
.keyBy()
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5), Time.hours(9))))
```java
// 10분간 새로운 이벤트가 윈도우에 들어오지 않을경우 윈도우가 닫힌다.
stream
.keyBy()
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
```
//
stream
.keyBy()
.window(GlobalWindows.create())
이미지 출처: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
Window function은 이전에 적은 window로서 뭉친 이벤트를 실제로 처리하는 logic 정하는 곳 입니다. 최초로는 아래와 같이 3가지 처리방식을 제공 합니다.
// 튜플에 2번째를 sum 하는 reduce function
steam
.keyBy()
.window()
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
// 평균을 구하는 aggregate function
stream
.keyBy()
.window()
.aggregate(new AverageAggregate());
// 윈도우에 몇개에 이벤트가 넘어 왔는 지 string으로 output 내보내는 process function
stream
.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new MyProcessWindowFunction());
public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
}
}
Flink window process를 적다 보니 part1 과 part2로 나누게 되었습니다. 공식 문서에서도 이야기 하듯이 window는 Flink에서 가장 메인이 되는 기능이기도 하며, 다른 stream에서는 디테일하게 제공되지 않는 기능이기도 합니다. (제가 Flink를 선택한 가장 큰 이유이기도 하구요 ㅎ) 다음 파트는 Flink를 통해 제공된 window가 아닌 어떻게 커스텀하게 나만의 window operator 만들 수 있는 지 대해, 적어보겠습니다.
좋은 글 감사합니다.