Flink의 상태는 관리 범위에 따라 Keyed State와 Operator State로 구분된다.
Keyed State는 keyBy 기준으로 분리되어 각 key마다 독립적으로 관리되는 상태이며, Operator State는 연산자 인스턴스 단위로 관리되는 상태이다.
Keyed State는 keyBy가 적용된 스트림에서 사용되며, 동일한 key를 가진 이벤트는 항상 동일한 상태 공간에 누적된다. 각 key의 상태는 서로 완전히 분리되어 관리되므로, 사용자·계좌·디바이스와 같이 논리적으로 독립된 엔티티 단위의 상태를 안전하게 유지할 수 있다.
Flink는 Keyed State를 내부적으로 자동 분산 저장하며, 병렬도가 변경되는 경우에도 상태를 재배치하여 처리의 연속성을 유지한다. 대규모 상태 저장이 필요한 경우에는 RocksDB 기반 State Backend를 사용하여 디스크 기반으로 확장할 수 있다.
Operator State는 특정 key에 귀속되지 않고 연산자 인스턴스에 직접 귀속되는 상태이다. 이벤트의 의미보다는 연산자의 처리 진행 상황이나 내부 동작 정보를 저장하는 데 목적이 있다.
각 병렬 태스크가 자신의 상태를 독립적으로 관리하며, 병렬도 변경 시에는 상태를 재분배할 수 있도록 설계되어 있다. 이러한 특성으로 인해 배치 전송을 위한 버퍼, 외부 시스템 연동을 위한 오프셋과 같이 스트림 데이터와 직접 대응되지 않는 내부 처리 정보를 저장하는 용도로 주로 사용된다.
Flink는 상태의 구조와 접근 방식을 명확하게 표현할 수 있도록 여러 형태의 상태 객체를 제공한다. 이들은 주로 Keyed State 환경에서 사용되지만, 상태 모델 자체는 동일하다.
ValueState는 하나의 key에 대해 하나의 값만 저장하는 가장 기본적인 상태 타입이다. 최근 상태나 메타 정보를 저장하는 데 사용된다.
public class LastLoginFunction extends KeyedProcessFunction<String, Event, String> {
private ValueState<Long> lastLoginTime;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Long> descriptor =
new ValueStateDescriptor<>("lastLoginTime", Long.class);
lastLoginTime = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
Long last = lastLoginTime.value();
if (last == null) {
out.collect("first login");
}
lastLoginTime.update(value.timestamp());
}
}
ListState는 하나의 key에 대해 여러 값을 순서대로 저장하는 상태 타입이다. 이벤트 이력이나 제한된 히스토리 관리에 사용된다.
public class RecentEventFunction extends KeyedProcessFunction<String, Event, String> {
private ListState<String> recentEvents;
@Override
public void open(Configuration parameters) {
ListStateDescriptor<String> descriptor =
new ListStateDescriptor<>("recentEvents", String.class);
recentEvents = getRuntimeContext().getListState(descriptor);
}
@Override
public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
recentEvents.add(value.type());
for (String e : recentEvents.get()) {
// iterate history
}
}
}
MapState는 하나의 key 내부에서 key-value 구조를 관리하는 상태 타입이다. 다수의 속성이나 카운터를 함께 관리할 때 사용된다.
public class CartFunction extends KeyedProcessFunction<String, Order, String> {
private MapState<String, Integer> cart;
@Override
public void open(Configuration parameters) {
MapStateDescriptor<String, Integer> descriptor =
new MapStateDescriptor<>("cart", String.class, Integer.class);
cart = getRuntimeContext().getMapState(descriptor);
}
@Override
public void processElement(Order value, Context ctx, Collector<String> out) throws Exception {
Integer count = cart.get(value.productId());
cart.put(value.productId(), count == null ? 1 : count + 1);
}
}
ReducingState는 값이 추가될 때마다 reduce 함수가 적용되어 하나의 결과만 유지되는 상태 타입이다.
ReducingStateDescriptor<Long> descriptor =
new ReducingStateDescriptor<>(
"totalSales",
Long::sum,
Long.class
);
ReducingState<Long> totalSales = getRuntimeContext().getReducingState(descriptor);
// usage
totalSales.add(amount);
Long result = totalSales.get();
AggregatingState는 내부 accumulator를 통해 복잡한 집계를 수행하고, 최종 결과를 별도의 타입으로 반환하는 상태 타입이다.
public class AvgAggregate implements AggregateFunction<Long, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return Tuple2.of(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Long value, Tuple2<Long, Long> acc) {
return Tuple2.of(acc.f0 + value, acc.f1 + 1);
}
@Override
public Double getResult(Tuple2<Long, Long> acc) {
return acc.f0 / (double) acc.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
}
AggregatingStateDescriptor<Long, Tuple2<Long, Long>, Double> descriptor =
new AggregatingStateDescriptor<>(
"avgState",
new AvgAggregate(),
Types.TUPLE(Types.LONG, Types.LONG)
);
AggregatingState<Long, Double> avgState = getRuntimeContext().getAggregatingState(descriptor);
// usage
avgState.add(value);
Double avg = avgState.get();
Broadcast State는 모든 병렬 태스크가 동일한 데이터를 참조해야 할 때 사용하는 상태 구조이다. 일반적인 Keyed State는 key 기준으로 분산되기 때문에, 전역 규칙이나 설정 정보를 공유하기에는 적합하지 않다.
사기 탐지 규칙, 필터 조건, 라우팅 테이블과 같이 런타임 중 변경되는 정책 정보를 전달하는 데 주로 사용되며, 규칙이 변경되면 모든 태스크에 즉시 반영된다.
스트리밍 애플리케이션은 장기간 실행되는 경우가 많기 때문에, 사용되지 않는 상태가 계속 누적되면 저장소 용량 증가와 복구 시간 증가로 이어진다. 이를 방지하기 위해 Flink는 State TTL(Time To Live) 기능을 제공한다. TTL은 StateDescriptor 단위로 설정하며, 상태를 생성할 때 함께 구성한다.
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(24))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<Long> descriptor =
new ValueStateDescriptor<>("lastLoginTime", Long.class);
descriptor.enableTimeToLive(ttlConfig);
ValueState<Long> lastLoginTime = getRuntimeContext().getState(descriptor);
TTL은 사용자 세션 관리, 임시 통계 데이터, 단기 이벤트 추적과 같은 경우에 필수적으로 사용된다.TTL을 설정하지 않으면 RocksDB 용량 증가, 체크포인트 지연, 장애 복구 시간 증가와 같은 운영 문제가 발생할 수 있다.