카프카는 단순한 메시지 버스에서 데이터 통합 시스템 및 데이터 스트림으로 대거 활용하게 되었다. 버전 0.10.0 부터 카프카는 단순한 널리 쓰이는 스트림 프로세싱 프레임워크를 위한 신회성 있는 데이터 스트림 저장소 역할을 넘어서기 시작했다. 카프카는 카프카 스트림즈라는 강력한 스트림 처리 라이브러리를 클라이언트 라이브러리의 일부로서 포함한다. 카프카 스트림즈를 사용하면 외부 처리 프레임워크에 의존할 필요없이 어플리케이션에 이벤트를 읽고 처리하고 쓰는 기능을 구현할 수 있다.
우선 데이터 스트림(이벤트 스트림 또는 스트리밍 데이터)란 무한히 늘어나는 데이터세트를 추상화한 것이다. 카드 결제, 주식 거래, 택배 배송, 이메일 전송 등 이벤트들이 무한히 계속 연속 발생하는 것이다. '무한'의 특징말고도 다른 몇몇 특징들이 존재하는데..
이벤트 스트림에는 순서가 있다
이벤트는 그 자체로 다른 이벤트 전, 후에 발생했다는 의미를 가진다. 출금 -> 입금과 입금 -> 출금의 차이가 분명한거처럼 말이다. 데이터베이스 테이블는 항상 순서가 없는 것으로 간주되기에 직접 order by 쿼리를 섞지만 스트림에는 순서가 존재한다는 점이 데이터베이스와 다르다.
데이터 레코드는 불변하다
이벤트는 한번 발생한 뒤 바뀌거나 고칠 수 없다. 구매 후 취소하여 반품하여도 구매부터 취소까지 모든 레코드를 저장한다. 만약 데이터베이스라면 데이터가 지웠다 다시 추가되어 다시 존재하는 거처럼 보인다. (redo log 에는 제거 후 다시 추가한 트랜잭션 로그가 남긴하다)
이벤트 스트림은 재생이 가능하다
몇달 전, 몇년 전 발생한 raw stream 을 그대로 재생할 수도 있다. 이는 에러를 수정하거나 새로운 분석방법을 시도하거나, 감사 수행 등을 할 때 매우 중요한 역할을 한다. 카프카는 이벤트 스트림을 캡처하고 재생할 수 있다.
이제 이벤트 스트림에 대해서 살펴봤다면 다음은 스트림 처리가 무엇인지 이해해보자. 스트림 처리란 하나 이상의 이벤트 스트림을 처리하는 것을 의미한다. 스트림 처리는 요청-응답, 배치 처리와 마찬가지로 프로그래밍 패러다임 중 하나다. 스트림 처리가 소프트웨어 아키텍처에서 차지하는 위치를 이해하기 위해 서로 다른 프로그래밍 패러다임들을 잠시 살펴보자.
요청-응답
응답시간이 밀리초 수준인 패러다임으로 가장 지연이 적다. 다만 처리 방식이 보통 블로킹 방식이라 요청 -> 응답까지 대기하는 것이 보통이다. 이 패러다임은 OLTP(online transaction processing)으로 알려져 있으며 보통 카드 결제, 시간 추적 시스템이 이 패러다임으로 작동한다.
배치 처리
이것은 지연이 크지만 처리량 또한 크다. 이러한 부류 시스템은 특정 시간에 작업이 시작하여 대량의 데이터를 배치단위로 쪼개어 처리된다. 한번 리포트가 생성되면 다음 리포트까지 충분히 적재가 된후 보아야한다는 단점이 있지만 최근 효율적인 의사결정을 위해 짧은 주기로 배치 처리하기도 한다.
스트림 처리
이것은 연속적이고 논 블로킹하게 작동하는 방식이다. 이는 요청-응답과 배치 처리 간 격차를 메워준다. 대부분의 비즈니스 프로세스는 연속적으로 발생하며, 비즈니스 리포트가 지속적으로 업데이트되고, 최일선의 비즈니스 어플리케이션들이 역시 계속해서 응답할 수만 있다면 굳이 수 밀리초 내의 응답 같은 걸 기다릴 필요 없이 처리할 수 있다. 의심스러운 카드결제, 네트워크 사용 내역, 물품 배송 추가 등 연속적이지만 논 블로킹한 처리에 딱 맞는다.
이러한 스트림 처리에 대한 정의가 특정한 프레임워크나 API, 기능을 요구하지 않는다는 점을 명시해라. 무한한 크기의 데이터세트에서 지속적으로 데이터를 읽어와 처리하고 결과를 내보낸는 것이 스트림 처리이다. 만약 오후 2시작 시작되어서 500개 데이터를 가져와 처리하고 결과를 보낸뒤 프로세스가 종료되면 그것은 스트림 처리가 아니다.
스트림 처리는 다른 데이터 처리와 유사하게 데이터를 읽고, 처리하고, 결과물을 쓰는 방식으로 비슷하다. 하지만 스트림 처리 고유의 핵심 개념이 몇 개 있다.
스트림 처리 어플리케이션은 하나 이상의 처리 토폴로지를 포함한다. 처리 토폴로지는 하나 이상의 소스 스트림, 스트림 프로세서의 그래프, 하나 이상의 싱크 스트림이 서로 연결된 것이다. 그리고 하나 이상의 소스 스트림에서 시작된 이벤트 스트림은 연결된 스트림 프로세서들을 거쳐가면서 처리되다가 마지막에는 하나 이상의 싱크 스트림에 결과물을 쓰는 것으로 끝나게된다. 스트림 프로세서는 이벤트를 변환하기 위해 가해지는 연산 단계로서 filter, map, group by, left join 등이 있다.
스트림 처리의 맥락에서 대부분의 스트림 어플리케이션이 시간 윈도우에 대해 작업을 수행하는 만큼 시간에 대해 공통적인 개념을 가지는 것은 매우 중요하다. 예를 들어, 최근 5분 사이의 주가의 이동 평균을 구하는 스트림 어플리케이션을 생각해보자. 만약 데이터를 쓰는쪽에서 이슈가 발생하여 2시간동안 묵혀놨던 데이터를 한꺼번에 보냈을 때 어떻게 처리를 해야할까? 이미 연산결과가 끝난 5분 길이의 시간 윈도우에 대해서나 의미가 있을 것이다.
이벤트 시간
이벤트가 발생하여 레코드가 생성된 시점이다. 상품이 팔린 시간, 유저가 조회한 시간 등. 0.10.0 버전 이후부터는 프로듀서 레코드 생성할 때 기본적으로 현재시간을 추가한다. 만약 이것이 어플리케이션의 이벤트 시간 개념과 일치하지 않는다면(예를 들어, 이벤트 발생하고 시간이 조금 지난 뒤에 데이터베이스 레코드를 기준으로 카프카 레코드 생성한다면), 레코드에 이벤트 시간을 가리키는 필드를 하나 추가함으로써 나중에 처리할 때 두 시간을 모두 활용할 수 있게 하는 방법을 권한다. 스트림 데이터를 처리할 때 대부분 이벤트 시간이 중요하기 때문이다.
로그 추가 시간
이것은 이벤트가 카프카 브로커에 전달된 시점이며, 접수시간이라고도 불린다. 이 또한 0.10.0 버전 이후 자동으로 레코드에 필드가 추가된다. 비교적 해당 시간 개념은 이벤트 시간보다 덜 중요하다. (이벤트 시간이 기록되지 않은 경우 로그 추가 시간을 통해서 추적할 수도 있음)
처리 시간
스트림 처리 어플리케이션이 뭔가 연산을 수행하기 위해 이벤트를 받은 시간이다. 이는 이벤트 시간보다 몇 초뒤 또는 몇 달뒤가 될 수도 있다.
카프카 스트림즈는 TimeStampExtractor 인터페이스를 사용하여 각각의 이벤트에 시간 부여한다. 카프카 스트림즈가 결과물을 카프카 토픽에 쓸 때의 다음과 같은 규칙으로 타임스탬프를 부여한다.
punctuate() 와 같이 입력과 상관없이 특정한 스케쥴에 따라 데이터를 생성하는 함수에 의해 생성된 결과 레코드 경우, 애플리케이션의 현재 내부시각에 따라 결정그리고 카프카 스트림즈의 DSL 대신 저수준 API 를 사용하는 경우 직접 타임스탬프 의미 구조를 개발할 수도 있다
시간대에 주의하라!
전체 데이터 파이프라인이 표준화된 시간대 하나만 써야한다. 그렇지 않으면 혼란을 야기한다. 만약 들어온 두 개 이상의 데이터의 타임스탬프 시간대가 다르다면 맞춰줘야할 필요가 있다.
스트림을 처리하는 동안 연산자가 내부적으로 저장하고 유지하는 데이터를 말한다.즉, “들어오는 이벤트 하나만 보고 처리하는 게 아니라”, 과거 데이터나 집계 결과를 기억해야 할 때 필요한 저장 공간을 의미한다.
로컬 혹은 내부 상태
스트림 처리 애플리케이션의 특정 인스턴스에서만 사용할 수 있는 상태로서 애플리케이션에 포함되어 구동되는 내장형 인메모리 데이터베이스를 사용해서 유지 관리한다. 이는 빠르지만, 사용 가능한 메모리 크기의 제한을 받는다.
외부 상태
NoSQL 등 외부 시스템을 사용해서 저장한다. 외부를 사용하기 때문에 크기에 제한 없음, 여러 인스턴스, 어플리케이션에서 접근 가능하지만 다른 시스템을 추가하는데에 따른 지연, 복잡도, 가용성 문제 등이 있다.
테이블은 여러 상태 변경의 결과물인 현재 상태만 저장하지만, 스트림은 변경 내역을 연속적으로 저장한다.
테이블 -> 스트림으로 변환한다면 추가, 변경, 삭제 등 이벤트를 가져와서 스트림으로 저장해야한다. 이러한 변경점들을 잡아내기 위해 CDC 솔루션을 적용하며, 카프카 커넥터를 통해 스트림 처리를 할 수 있도록 만든다.
스트림 -> 테이블로 변환한다면 스트림에 포함된 모든 변경 사항을 테이블에 적용하여 저장하면 된다. 빨간색 양말을 예시로 들면,
1. 빨간색 양말 100개 입고
2. 빨간색 양말 2개 판매
3. 빨간색 양말 1개 반품
위와 같이 스트림 내 이벤트들이 존재한다면 테이블에는 위 이벤트들이 모두 적용되어 최종적으로 빨간색 양말 99개만 남아있다.
대부분 스트림 작업은 시간을 윈도우라는 구간 단위로 잘라서 처리한다. 이동 평균을 계산하거나, 이번 주 얼마나 팔렸는지 계산하거나 등. 두 스트림을 조인하는 것 또한 윈도우 작업이다. 즉, 동일한 시간 내 발생한 이벤트끼리 조인하는 것이다. 만약 이동 평균을 계산하는 경우 다음과 같은 사항을 알아두어야한다.
윈도우 크기가 커질수록 이동 평균 완만해지지만 랙(lag) 역시 커진다. 큰 윈도우로 잡으면 작은 윈도우보다 갑작스러운 주가 변동에 더 알아차리기 어렵다). 그리고 카프카 스트림은 윈도우의 크기가 비활동 기간의 길이에 따라 결정되는 세션 윈도우 역시 지원한다. 예를 들어,
# 비활성 간격 5분으로 설정했다 가정
10:00
10:02
10:04
(6분 침묵)
10:10
10:12
와 같이 이벤트가 들어왔다면 윈도우는 다음과 같이 나뉘어진다.
[10:00 ~ 10:04] ← 세션1
[10:10 ~ 10:12] ← 세션2
윈도우의 크기와 윈도우 사이의 고정된 시간 간격이 같은 '호핑 윈도우'와 집행 간격과 윈도우 크기가 같은 윈도우 '텀블링 윈도우' 가 있다.
# 텀블링 윈도우
|----5분----|----5분----|----5분----|
# 호핑 윈도우
|----5분----|
|----5분----|
|----5분----|
00:00 ~ 00:05 시간 윈도우에 대해 5분 단위 이동 평균을 계산했다고 가정하자. 이로부터 1시간 뒤 00:02 의 입력 레코드가 들어온 경우 어떻게 처리해야할까? 이럴 때는 윈도우에 이벤트가 추가될 수 있는 기한을 두고, 만약 기한을 넘어서 들어왔다면 무시하는 것이 좋다.
스트림 처리 어플리케이션에 있어서 핵심적인 요구 조건 중 하나는 장애가 발생했을 경우에도 각각의 레코드를 한 번만 처리할 수 있는 능력이다. 8장에서 논의한 것과 같이 아파치 카프카는 트랜잭션적이고 멱등적 프로듀서 기능을 통해 '정확히 한 번' 의미 구조를 지난다. processing.guarantee 설정을 excatly_once_v2 로 사용하여 정확히 한 번 보장 기능을 활성화시킬 수 있다.
가장 간단한 형태로 컨슈머, 처리로직, 프로듀서를 엮어 놓은게 있는가하면, 클러스터 상에서 기계 학습 라이브러리와 함께 돌아가는 스파크 스트리밍과 같은것도 있다. 스트림 처리 관련 알려진 패턴 중 몇가지를 살펴보자.
가장 기본적인 스트림 처리 패턴이다.
각 이벤트를 독립적으로 처리하며, 다른 이벤트와의 관계나 상태(state)를 고려하지 않는다.
주로 불필요한 이벤트를 스트림에서 걸러내거라 각 이벤트를 변환할 떄 사용한다.
[Producer]
│
▼
┌─────────┐
│ Event │
└─────────┘
│
▼
┌──────────────┐
│ Processing │ ← 상태 저장 없음
│ (Stateless) │
└──────────────┘
│
▼
┌─────────┐
│ Output │
└─────────┘
│
▼
[Consumer]
대부분의 스트림 처리 어플리케이션은 윈도우 집계와 같이 정보의 집계에 초점을 맞춘다. 매일 주식 최저가와 최고가를 찾고 주가의 이동평균을 구하는 등. 이처럼 집계를 할 때 스트림의 상태를 유지할 필요가 있다.
즉, 단일 이벤트 처리는 상태를 사용하지 않지만, 실제 많은 스트림 처리는 이벤트 간의 관계를 기억해야 한다. 이때 사용하는 것이 로컬 상태(Local State) 이다.
각 처리 인스턴스는 자신이 담당하는 파티션에 대해 상태를 로컬에 저장하고 갱신하면서 처리한다.
즉, 이벤트를 처리할 때 이전 이벤트의 결과를 참고할 수 있다.
[Stream]
│
▼
┌──────────────┐
│ Processor │
│ + State │ ← 로컬 상태 저장소
└──────────────┘
│
▼
[Output]
로컬 상태는 주로 메모리 안에 저장하여 성능상 이점을 가져가며, 어플리케이션이 종료되어도 유실되지 않도록 카프카 스트림즈에 내장된 RocksDB 를 사용하여 인메모리 뿐만 아니라 디스크에도 저장할 수 있다. 그리고 로컬 상태 변경 내역은 카프카 토픽에도 보내짐으로 스트림 처리를 담당하고 있는 노드에 장애가 발생한다 하더라도 유실되지 않는다.
마지막으로 파티션은 리밸런싱에 의해 다른 컨슈머에게 할당될 수도 있다. 이때 컨슈머에게 이전 마지막 상태를 전달함으로써 이전 상태를 복구시키도록 해야한다.
그룹별 집계가 필요할 때 로컬 상태 뿐만 아니라 사용 가능한 모든 정보를 사용해서 내야하는 경우를 살펴보자. 상위 10개 주식 전체가 서로 다른 인스턴스에 할당된 파티션에 분산되어 있을 수 있는 탓에 이 경우 각각의 어플리케이션 인스턴스에서 따로 작업하는 것으로는 충분하지 않다. 따라서 이런 경우 여런 단계를 거쳐서 처리를 해야한다.
주식거래토픽 (여러 파티션)
↓
상승/하락 처리
↓
상승하락토픽 (단일 파티션)
↓
상위 10개 종목 산출
↓
상위 10개 토픽 (단일 파티션)
위처럼 기존에 주식들의 상승/하락을(여러 파티션들을) 2개 이상의 단계를 거쳐 하나의 파티션으로 처리하는 것이 다단계 처리/리파티셔닝 패턴이다.
[주식거래토픽]
│
▼
┌──────────────────┐
│ 1단계: 상승/하락 처리 │ (각 주식들 처리)
└──────────────────┘
│
▼
[상승하락토픽]
│
▼ ← Repartition 발생
┌──────────────────┐
│ 2단계: 상위 10 계산 │ (전역 비교)
└──────────────────┘
│
▼
[상위 10개 토픽]
스트림 데이터만으로는 충분하지 않은 경우가 있다. 이때 외부 테이블의 데이터를 조회하여 함께 처리한다.
간단하게 데이터 확장을 위해 외부 테이블을 조인하는 경우 다음과 같은 아키텍쳐를 생각할 수 있다.
┌────────────────────┐
│ 프로필 테이터베이스 │
└────────────────────┘
▲
│ (확장)
▼
[클릭 이벤트 토픽] ─────┼────────▶ [확장된 클릭 이벤트 토픽]
그러나 스트림 처리 시스템은 초당 10만~50만을 이벤트를 처리하는 반면, 데이터베이스는 초당 만개 가량 처리를 할 수 있는게 보통이기에 성능과 가용성 측면에서 많은 복잡성이 요구된다.
성능과 가용성, 두마리 토끼를 잡기 위해 스트림 처리 어플리케이션 안에 데이터베이스 관련 캐시를 적용할 필요가 있다. 데이터베이스에 직접 이벤트 변경점을 갖고 오면 데이터베이스에 부하가 생길 수 있으므로 데이터베이스에서 변경이 발생할때마다 데이터베이스 변경 토픽에 변경점을 저장하여 또 다른 스트림으로 받아오도록 하면 깔끔하게 해결할 수 있다(데이터베이스 변경 내역을 이벤트 스트림으로 받아오는 것을 CDC 라고 하며, 카프카 커넥트 등을 활용해 적용할 수 있다)
┌────────────────────┐
│ 프로필 테이터베이스 │
└────────────────────┘
│ (변경점 캡처)
▼
[사용자 프로필 토픽]
▼
[클릭 이벤트 토픽] ─────┼────────▶ [확장된 클릭 이벤트 토픽]
(조인)
두 개의 테이블(상태 데이터) 을 서로 조인하는 패턴이다. 스트림 이벤트를 즉시 처리하는 것이 아니라, 각 테이블의 상태가 변경될 때 결과가 갱신된다.
카프카 스트림즈는 역시 두 개의 테이블에 대해 외래 키 조인을 지원한다. 한 스트림 혹은 테이블의 키와 다른 스트림 혹은 테이블의 임의의 필드를 조인할 수 있는 것이다.
두 개의 스트림을 서로 조인하는 패턴이다. 두 스트림 모두 계속 이벤트가 들어오며, 일정 시간 범위 안에서 조건이 맞는 이벤트끼리 매칭한다.
테이블 끼리 조인은 현재 상태를 조인해서 과거 이벤트는 신경안써도 되지만 스트림 간 조인은 일정 시간 내 이벤트들을 조인하는 것이기 때문에 과거 이벤트도 중요하다. 한쪽 스트림 내 이벤트의 키와 동일한 다른 스트림 내 이벤트 간 일정 시간 내 이벤트들을 모두 조인한다.
[주문스트림] ──┐
├── Window 내 매칭 ──▶ [결과 스트림]
[체결스트림] ──┘
TODO: 440 페이지 그림 대해서 물어보기
스트림 데이터는 항상 순서대로 도착하지 않는다. 네트워크 지연, 재전송, 파티션 차이 등으로 인해 이전 시점의 이벤트가 나중에 도착할 수 있다. 이를 비순차 이벤트(Out-of-Order Event) 라고 한다.
# 정상
10:00 → 180
10:01 → 181
10:02 → 179
# 비순차
10:00 → 180
10:02 → 179
10:01 → 181 ← 늦게 도착
비순차 이벤트가 발생한 경우, 스트림 어플리케이션은 순서가 벗어낫음을 즉각 알아차려야하며, 복구 시간 영역 내 발생했다면 처리하고 그렇지 않다면 과감히 버린다.
만약 배치작업이였다면 일괄적으로 어제 작업을 다시 돌리면 되지만, 스트림 작업은 계속 돌아가기 때문에 주어진 시점 기준으로 오래된 이벤트와 새로운 이벤트 모두 처리 가능해야한다. 그리고 스트림 처리 결과가 데이터베이스에 쓰여진다면 특정 경우에는 업데이트가 필요할 수 있다. 카프카 스트림즈 API 는 언제나 집계 결과를 결과 토픽에 쓴다. 따라서 집계 윈도우의 결과가 늦게 도착한 이벤트로 변경되어야 하는 경우, 카프카 스트림즈는 단순히 해당 집계 윈도우의 새로운 결과값을 씀으로써 기존 결과값을 대체한다.
만약 기존 스트림 어플리케이션에서 새로운 버전의 스트림 어플리케이션으로 변경한다면, 다음 방법을 고려할 수 있다.
이때 장점은 기존 결과를 바로 교체하지 않는다는 것이다. 즉, 일정 기간 동안 두 버전의 결과를 비교할 수 있다. 따라서 이 경우 오랫동안 이벤트 스트림이 온전히 저장된다.
어플리케이션을 초기화해서 입력 스트림의 맨 처음부터 다시 처리하고, 기존 스트림 내용 역시 재처리해야 될 수도 있다. 이 경우 기존 결과를 저장하지 않아 위험할 수 있으니 되도록이면 위 방법을 권장한다.
스트림 처리는 보통 결과를 토픽으로 내보내는 것으로 끝난다. 하지만 경우에 따라 외부 애플리케이션이 현재 상태를 직접 조회해야 할 때가 있다. 이때 사용하는 패턴이 인터랙티브 쿼리이다.
스트림 애플리케이션이 유지하고 있는 로컬 상태 저장소를 외부에서 질의(Query) 할 수 있도록 하는 방식이다.
아파치 카프카는 2개의 스트림 API 를 제공한다. 하나는 저수준의 Processor API 이고, 다른 하나는 고수준의 스트림즈 DSL 이다. 다음 예제에서는 고수준의 스트림즈 DSL 에 대해서만 살펴본다.
저수준의 Processor API 는 https://kafka.apache.org/28/streams/developer-guide/processor-api 를 참고하자!
카프카 스트림즈(Kafka Streams)의 첫번째 살펴볼 예제는 WordCount(단어 개수 세기) 이다. 이 예제는 입력 토픽에서 문자열 데이터를 읽어 각 단어의 등장 횟수를 집계한 후, 그 결과를 다른 토픽으로 출력하는 간단한 스트림 처리 애플리케이션이다.
우선 가장 먼저해야될 것은 카프카 스트림즈를 설정하는 것이다.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
다음으로 스트림즈 토폴리지를 생성해보자.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("wordcount-input");
final Pattern pattern = Pattern.compile("\\W+");
KStream counts = source.flatMapValues(value-> Arrays.asList(pattern.split(value.toLowerCase())))
.map((key, value) -> new KeyValue<Object, Object>(value, value))
.filter((key, value) -> (!value.equals("the")))
.groupByKey()
.count().mapValues(value->Long.toString(value)).toStream();
counts.to("wordcount-output");
위 토폴로지에서 프로세스는 다음과 같이 처리한다.
입력 토픽(wordcount-input)
↓
문장 → 단어 분리
↓
(key, word) 형태로 재구성
↓
"the" 제거
↓
단어 기준 그룹화
↓
단어 개수 집계
↓
Long → String 변환
↓
출력 토픽(wordcount-output)
예를 들어, "kafka streams is fun" 문장이 들어왔다면 처리과정은 다음과 같다.
입력 토픽(wordcount-input)
↓
문장 → 단어 분리 # kafka, streams, is, fun 으로 쪼개짐
↓
(key, word) 형태로 재구성 # ("kafka", "kafka"), ("streams", "streams"), ("is", "is"), ("fun", "fun")
↓
"the" 제거
↓
단어 기준 그룹화 # ("kafka", "kafka"), ("streams", "streams"), ("is", "is"), ("fun", "fun")
↓
단어 개수 집계 # kafka 1개, streams 1개, ...
↓
Long → String 변환 # "1" -> 1
↓
출력 토픽(wordcount-output)
최종적으로 출력토픽에는 kafka 1개, streams 1개, is 1개, fun 1개로 토픽에 저장될 것이다.
이제 어플리케이션이 수행할 변환의 흐름을 정의했으므로 다음과 같이 실행시키기만 하면 된다.
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
... # 스트림이 동작하는 중
# 이후 닫고싶을 때
streams.close();
이번에는 주식 시장 거래에 있어서 다음과 같은 결과를 얻어보자.
예를 단순하게 하기 위해서, 거래소에는 10개의 종목만 거래되어있다고 가정하자.
설정은 앞서 14.4.1 과 비슷하다.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, TradeSerde.String().getClass().getName());
Value 만 TradeSerde 로 변경되었는데 이는 구글이 개발한 Gson 라이브러리를 사용해서 별도로 만든 커스텀 시리얼/디시리얼라이저이다.
이제 토폴로지를 생성해보자.
StreamsBuilder builder = new StreamsBuilder();
// (1) 입력 스트림: key=종목(symbol), value=Trade
KStream<String, Trade> trades = builder.stream("stocktrades-input");
// (2) 5초 윈도우 정의
TimeWindows windows = TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(5));
// (3) groupByKey -> windowedBy -> aggregate
KTable<Windowed<String>, TradeStats> aggregated = trades
.groupByKey() // 1) 종목 코드(key) 기준으로 그룹화
.windowedBy(windows) // 2) 5초 단위 시간 윈도우 적용
.aggregate(
TradeStats::new, // 초기값: 윈도우/종목별 TradeStats 생성
(symbol, trade, stats) -> stats.add(trade), // 새 Trade가 들어올 때마다 누적(add)
Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as("stockstats-store")
.withKeySerde(Serdes.String())
.withValueSerde(new TradeStatsSerde()) // 3) 상태 저장소 value serde 지정
);
// (4) KTable -> KStream 변환 (윈도우 집계 결과를 이벤트 스트림으로 내보내기 위함)
KStream<Windowed<String>, TradeStats> statsStream = aggregated.toStream();
// (5) 평균가 계산을 포함한 최종 변환
KStream<Windowed<String>, TradeStats> stats = statsStream
.mapValues(tradeStats -> tradeStats.computeAvgPrice());
// (6) 출력 토픽에 기록
stats.to(
"stockstats-output",
Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), new TradeStatsSerde())
);
최신 Kafka Streams에서는 TimeWindows.of()는 deprecated
→ ofSizeWithNoGrace() 사용 권장
만약 입력 토픽으로부터 다음과 같이 데이터가 들어왔다고 가정하자
key=“AAPL”, value={price: 101.2, shares: 50, side: “SELL”, ...}
key=“AAPL”, value={price: 100.9, shares: 10, side: “SELL”, ...}
key=“MSFT”, value={price: 55.1, shares: 20, side: “SELL”, ...}
...
위와 같이 윈도우가 적용된 집계 연산을 스트림에 대해 수행하는 방법이 대게 많이 사용된다. 하지만 이 어플리케이션은 여러 인스턴스로 확장이 가능하면서도 각각의 인스턴스에 장애가 발생할 경우, 일부 파티션에 대한 처리 작업을 다른 인스턴스로 이전함으로써 자동으로 복구된다. (요건 14.5 에서 살펴보자)
이번에는 클릭 스트림 + 프로필 데이터베이스 테이블에 대해서 살펴보자. 우선 아래와 같이 클릭이벤트에 프로필 데이터를 추가하여 확장시키는 예시를 보자.
[클릭 이벤트] KStream
userId=100, page="sports"
+
[프로필] KTable
userId=100, region="Seoul", age=29
↓ KStream + KTable Join
[확장 결과]
userId=100, page="sports", region="Seoul", age=29
이제 다수의 스트림을 조인하는 토폴로지를 살펴보자
// 입력
KStream<Integer, PageView> views = builder.stream(
"pageviews-input",
Consumed.with(Serdes.Integer(), new PageViewSerde())
);
KStream<Integer, Search> searches = builder.stream(
"searches-input",
Consumed.with(Serdes.Integer(), new SearchSerde())
);
KTable<Integer, UserProfile> profiles = builder.table(
"userprofiles-input",
Consumed.with(Serdes.Integer(), new UserProfileSerde())
);
위와 같이 조인하고자하는 두 개의 스트림 객체인 클릭과 검색을 생성, 그리고 사용자 프로필을 저장할 테이블을 정의한다. 여기서 profiles 는 KTable 객체로서 KTable 은 변경 스트림에 의해 갱신되는 구체화된 저장소(materialized store)이다.
이제 다수의 스트림을 조인하는 토폴로지를 생성하자.
# 1번
KStream<Integer, UserActivity> viewsWithProfile =
views.leftJoin(
profiles,
(page, profile) -> {
// 프로필은 없을 수 있음 (left join)
if (profile != null) {
return new UserActivity(profile.getUserID(), ..., page.getPage());
} else {
return new UserActivity(-1, ..., page.getPage());
}
}
);
# 2번
KStream<Integer, UserActivity> userActivityKStream =
viewsWithProfile.leftJoin(
searches,
(userActivity, search) -> {
if (search != null) {
userActivity.updateSearch(search.getSearchTerms());
}
return userActivity;
},
JoinWindows.of(Duration.ofSeconds(1))
.before(Duration.ofSeconds(0)),
StreamJoined.with(
Serdes.Integer(),
new UserActivitySerde(),
new SearchSerde()
)
);
1번 코드
viewsWithProfile 은 클릭 데이터에 프로필 데이터를 추가하여 확장한 코드이다. leftJoin 을 함으로써 프로필이 없더라도 클릭 이벤트는 그대로 보존되게끔 한다. 최종적으로는 UserActivity 객체를 반환한다.
2번 코드
UserActivity 와 검색 데이터를 조인한다. 만약 검색이 있다면 userActivity 의 검색 단어를 업데이트해준다. 이후 1초 간격으로 윈도우를 생성한 뒤 0초 간격의 before 를 호출해서 검색 후 1초 동안 발생한 클릭만 조인한다. 그 결과는 관련이 있는 클릭과 검색어, 그리고 사용자 프로필을 포함하게 될 것이다. 마지막으로 조인 결과에 대한 Serde 를 정의한다. 이것은 조인 양쪽에 공통인 키값에 대한 Serde와 조인 결과에 포함될 양쪽의 밸류값에 대한 Serde를 포함한다. 이 경우, 키는 사용자 ID 이므로 단순한 Integer형 Serde 를 사용한다.
앞서 카프카 스트림을 어떻게 사용하는지 디자인 패턴에 대해 알아보았다면, 이번에는 카프카 스트림 내부에 어떻게 동작하는지 살펴보자.
토폴로지는 다른 스트림 처리 프레임워크에서는 DAG 혹은 유향 비순환 그래프로 표현되는데, 모든 이벤트가 입력에서 출력으로 이동하는 동안 수행되는 작업과 변환 처리의 집합이라고 볼 수 있다.
┌──────────────────────────┐
│ Kafka Topic │
│ wordcount-input │
└─────────────┬────────────┘
│ (Source Processor)
▼
┌──────────────────────────┐
│ KStream<String, String> │
│ source │
└─────────────┬────────────┘
│ flatMapValues: 문장 → 단어들
▼
┌──────────────────────────┐
│ KStream<String, String> │
│ (null, "kafka") ... │
└─────────────┬────────────┘
│ map: (key,value) -> (word, word)
▼
┌──────────────────────────┐
│ KStream<String, String> │
│ ("kafka","kafka") ... │
└─────────────┬────────────┘
│ filter: 예) "the" 제거 (선택)
▼
┌──────────────────────────┐
│ KStream<String, String> │
│ ("kafka","kafka") ... │
└─────────────┬────────────┘
│ groupByKey()
│ (필요 시 repartition 토픽 생성)
▼
┌──────────────────────────────┐
│ (optional) Kafka internal │
│ <appId>-repartition │
└──────────────┬───────────────┘
▼
┌───────────────────────────────────────────┐
│ COUNT (Stateful Aggregation) │
│ .count() -> KTable<String, Long> │
└─────────────┬─────────────────────────────┘
│
│ 로컬 상태 저장 (각 인스턴스에 유지)
▼
┌─────────────────────────────────┐
│ Local State Store (RocksDB 등) │
│ wordcount-store │
│ key=word, value=count(Long) │
└──────────────┬──────────────────┘
│ 내구성/복구를 위한 변경로그 # 추후 인스턴스가 바뀔 수 있으므로
▼
┌─────────────────────────────────┐
│ Kafka internal changelog topic │
│ <appId>-wordcount-store-changelog │
└─────────────────────────────────┘
│
│ toStream() + mapValues(Long->String)
▼
┌──────────────────────────┐
│ KStream<String, String> │
│ ("kafka","3") ... │
└─────────────┬────────────┘
│ to("wordcount-output")
▼
┌──────────────────────────┐
│ Kafka Topic │
│ wordcount-output │
└──────────────────────────┘
카프카 스트림즈는 DSL API 를 사용해서 개발된 어플리케이션의 각 DSL 메서드를 독립적으로 저수준 API 로 변환하여 실행한다. 각각의 DSL 메서드를 독립적으로 ㅂ녀환하기 때문에 결과 토폴로지는 전체적으로 그리 최적화되지 않은 상태일 수 있다.
카프카 스트림즈 어플리케이션의 실행은 아래 3단계로 이루어진다.
1. KStream, KTable 객체를 생성하고 여기에 필터, 조인과 같은 DSL 작업을 수행함으로써 논리적 토폴로지를 정의한다.
2. StreamsBuilder.build() 메서드가 논리적 토폴로지로부터 물리적 토폴로지를 생성한다.
3. KafkaStreams.start() 가 토폴로지를 실행시키낟. 데이터를 읽고, 처리하고, 쓰는 곳이 바로 여기다.
위 단계에서 두번째 단계에 최적화를 적용할 수 있다. StreamsConfig.TOPOLOGY_OPTIMIZATION 설정값을 StreamsConfig.OPTIMIZE 로 잡아준뒤 build(props) 호출함으로써 활성화시킬 수 있다.
카프카 스트림즈 어플리케이션에서의 주된 테스트 툴은 TopologyTestDriver다. 1.1.0 버전에서 처음 도입되고, 2.4.0 부터 사용이 더 쉬워졌다. 입력 데이터를 정의하고, 목업(mock) 입력 토픽에 데이터를 쓰고, 테스트 드라이버를 써서 토폴로지를 실행시키고, 목업(mock) 출력 토픽에 결과를 읽어 예상 값과 동일한지 확인한다.
그리고 통합테스트에서는 EmbeddedKafkaCluster, Testcontainers 의 두 통합 테스트 프레임워크가 자주쓰인다.
카프카 스트림즈는 하나의(또는 다수의) 어플리케이션 인스턴스 안에서 다수의 스레드가 실행될 수 있게 함으로써 규모 확장과 서로 다른 어플리케이션 인스턴스 간에 부하 분산이 이루어지도록 한다.
카프카 스트림즈 엔진은 토폴로지의 실행을 다수의 태스크로 분할함으로써 병렬 처리한다. 스트림즈 엔진은 어플리케이션이 처리하는 토픽의 파티션 수에 따라 태스크 수를 결정한다. 각 태스크는 전체 파티션 중 일부를 책임져 처리한다. 즉, 각 태스크는 자신이 담당하는 파티션들을 구독해서 이벤트를 읽어오고 처리해서 싱크에 쓴다. 이러한 태스크는 서로 완전히 독립적으로 실행되기에 병렬 처리의 기본 단위가 된다.
Kafka Cluster
┌─────────────────────────────────────────────┐
│ Topic: wordcount-input │
│ │
│ ┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐ │
│ │ P0 │ │ P1 │ │ P2 │ │ P3 │ │
│ └───┬───┘ └───┬───┘ └───┬───┘ └───┬───┘ │
└───────┼─────────┼─────────┼─────────┼───────┘
│ │ │ │
▼ ▼ ▼ ▼
Kafka Streams Application
┌────────────────────────────────────────────────────┐
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Thread 1 │ │ Thread 2 │ │
│ │ │ │ │ │
│ │ ┌────────┐ │ │ ┌────────┐ │ │
│ │ │ Task 0 │ │ │ │ Task 2 │ │ │
│ │ │ (P0) │ │ │ │ (P2) │ │ │
│ │ └────────┘ │ │ └────────┘ │ │
│ │ │ │ │ │
│ │ ┌────────┐ │ │ ┌────────┐ │ │
│ │ │ Task 1 │ │ │ │ Task 3 │ │ │
│ │ │ (P1) │ │ │ │ (P3) │ │ │
│ │ └────────┘ │ │ └────────┘ │ │
│ └─────────────┘ └─────────────┘ │
│ │
└────────────────────────────────────────────────────┘
Kafka Cluster
┌───────────────────────────────────────────────────────────┐
│ Topic: wordcount-input (Partitions = Tasks = 8) │
│ │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │ P0 │ │ P1 │ │ P2 │ │ P3 │ │ P4 │ │ P5 │ ...│
│ └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ │
│ │ │ │ │ │ │ │
│ ┌──────┐ ┌──────┐ │
│ │ P6 │ │ P7 │ │
│ └──┬───┘ └──┬───┘ │
└─────┼────────┼───────────────────────────────────────────┘
│ │
│ (각 파티션은 특정 Task에 할당됨)
▼ ▼
Kafka Streams Application (여러 서버/인스턴스)
┌───────────────────────────────────────────────────────────┐
│ Server A │
│ ┌───────────────────────────────┐ │
│ │ Tasks (2개) │ │
│ │ ┌──────────────┐ │ │
│ │ │ Task 0 (P0) │ ← P0 구독 │ │
│ │ │ - process │ │ │
│ │ │ - state store│ │ │
│ │ └──────────────┘ │ │
│ │ ┌──────────────┐ │ │
│ │ │ Task 1 (P1) │ ← P1 구독 │ │
│ │ │ - process │ │ │
│ │ │ - state store│ │ │
│ │ └──────────────┘ │ │
│ └───────────────────────────────┘ │
│ │
│ Server B │
│ ┌───────────────────────────────┐ │
│ │ Tasks (2개) │ │
│ │ ┌──────────────┐ │ │
│ │ │ Task 2 (P2) │ ← P2 구독 │ │
│ │ │ - process │ │ │
│ │ │ - state store│ │ │
│ │ └──────────────┘ │ │
│ │ ┌──────────────┐ │ │
│ │ │ Task 3 (P3) │ ← P3 구독 │ │
│ │ │ - process │ │ │
│ │ │ - state store│ │ │
│ │ └──────────────┘ │ │
│ └───────────────────────────────┘ │
│ │
│ Server C │
│ ┌───────────────────────────────┐ │
│ │ Tasks (2개) │ │
│ │ ┌──────────────┐ │ │
│ │ │ Task 4 (P4) │ ← P4 구독 │ │
│ │ │ - process │ │ │
│ │ │ - state store│ │ │
│ │ └──────────────┘ │ │
│ │ ┌──────────────┐ │ │
│ │ │ Task 5 (P5) │ ← P5 구독 │ │
│ │ │ - process │ │ │
│ │ │ - state store│ │ │
│ │ └──────────────┘ │ │
│ └───────────────────────────────┘ │
│ │
│ Server D │
│ ┌───────────────────────────────┐ │
│ │ Tasks (2개) │ │
│ │ ┌──────────────┐ │ │
│ │ │ Task 6 (P6) │ ← P6 구독 │ │
│ │ │ - process │ │ │
│ │ │ - state store│ │ │
│ │ └──────────────┘ │ │
│ │ ┌──────────────┐ │ │
│ │ │ Task 7 (P7) │ ← P7 구독 │ │
│ │ │ - process │ │ │
│ │ │ - state store│ │ │
│ │ └──────────────┘ │ │
│ └───────────────────────────────┘ │
└───────────────────────────────────────────────────────────┘
조인 등 작업으로 두 개 이상의 파티션에서 입력을 가져와야하는 경우
조인 작업에 필요한 모든 파티션들을 하나의 태스크에 할당함으로써 해결한다. 이렇기 때문에 조인 작업에 사용될 모든 토픽에 대해 동일한 조인 키로 파티션된 동일한 수의 파티션을 가질 것을 요구하는 것이다.
리파티셔닝하는 경우 (groupBy 등)
리파티셔닝을 하는 경우 소스 토픽 -> 리파티션 토픽 -> 싱크 토픽 순으로 처리된다. 즉, 전체 토폴로지를 소스 -> 리파티션, 리파티션 -> 싱크 2개의 서브 토폴로지로 분할한다.
입력 토픽 (Input Topic)
Partitions
┌──────┐ ┌──────┐ ┌──────┐
│ P0 │ │ P1 │ │ P2 │
└──┬───┘ └──┬───┘ └──┬───┘
│ │ │
▼ ▼ ▼
태스크 (기존 키 기준 처리)
┌──────┐ ┌──────┐ ┌──────┐
│Task0 │ │Task1 │ │Task2 │
└──┬───┘ └──┬───┘ └──┬───┘
\ | /
\ | /
\ | /
▼ ▼ ▼
리파티션 토픽 (<appId>-repartition)
┌──────┐ ┌──────┐ ┌──────┐
│ R0 │ │ R1 │ │ R2 │
└──┬───┘ └──┬───┘ └──┬───┘
│ │ │
▼ ▼ ▼
태스크 (새 키 기준 처리)
┌──────┐ ┌──────┐ ┌──────┐
│Task3 │ │Task4 │ │Task5 │
└──────┘ └──────┘ └──────┘
결론: 카프카 스트림은 확장을 용이하게 할 수 있으며, 이는 데이터를 처리하는 스레드 내 테스크들이 서로 독립적으로 처리하기 때문에 가능하다.
카프카는 매우 가용성이 높은 시스템이다. 만약 장애가 발생하여 재시작이 필요하다면, 장애가 발생하기 전 마지막으로 커밋된 오프셋을 카프카에서 가져옴으로써 처리하던 스트림의 마지막으로 처리된 지점부터 처리를 재개할 수 있다. 단, 로컬 상태 저장소가 유실되었을 경우(예를 들어, 로컬 상태 저장소를 가지고 있던 서버를 새로운 걸로 바꾸는 경우), 스트림즈 어플리케이션은 항상 카프카로부터 변경 로그를 읽어와 로컬 상태 저장소를 복구한다는 점을 염두에 두자.
카프카 스트림즈는 태스크 고가용성을 지원하기 위해 카프카의 컨슈머 코디네이션 기능을 사용한다. 즉, 카프카에서 컨슈머 그룹 내 컨슈머들을 관리하는 것과 비슷하게, 만약 특정 테스크에 장애가 났다면 사용 가능한 다른 테스크의 스레드를 찾아 재시작하게 된다.
장애가 났을 때 카프카로부터 저장된 내부 토픽을 다시 읽어와서 카프카 스트림즈의 상태 저장소를 업데이트한다면 이 작업을 수행하는 동안 일부 데이터에 대해서 처리가 지연될 것이다. 따라서 복구 시간을 줄이기 위해서는 빠르게 읽어올 수 있도록 카프카 스트림즈 토픽에 매우 강력한 압착 설정을 걸어놓는 것이다. min.compaction.lag.ms 는 낮추고 세크먼트 크기는 기본값인 1GB 대신 100MB 정도로 낮추는 방법이 있다. (단, 마지막 활성화된 세그먼트는 압착되지 않음)
더 빠른 장애 복구를 하는 방법은 스탠바이 레플리카를 설정하는 방법이 있다. 이 경우 스탠바이 역할을 하는 서버가 이미 최신상태를 유지하기 때문에 바로 교체하여 바로 복구할 수 있다.
대형 호텔 체인에서는 예약 생성, 취소, 일정 변경, 객실 타입 변경, 결제 상태 변경 등 고객 여정에서 발생하는 이벤트가 매우 많다. 전통적인 방식(배치 동기화, 시스템 간 직접 연동)에서는 변경 사항이 늦게 반영되거나, 시스템별 데이터 불일치가 발생하기 쉽다. 스트림 처리를 도입하면 예약 이벤트를 단일 이벤트 스트림으로 발행하고, 호텔 체인의 각 시스템이 이를 실시간 구독하여 자신의 상태를 즉시 갱신할 수 있다.
IoT 환경에서는 센서(온도, 진동, 전력, 위치 등)에서 초당 수십~수천 건의 텔레메트리 데이터가 발생한다. 이 데이터는 저장만 해두면 가치가 제한적이고, 실시간으로 처리해야 운영 효율과 안전성을 높일 수 있다. 스트림 처리는 센서 데이터의 유입 즉시 필터링, 정규화, 윈도우 집계(예: 1분 평균/최대값), 이상치 탐지(갑작스러운 급등/급락) 등을 수행하고, 그 결과를 모니터링 대시보드나 알림 시스템으로 전달한다.
사기 탐지는 “사기 거래가 발생한 뒤”가 아니라, 거래가 발생하는 순간에 탐지하고 차단하는 것이 핵심이다. 카드 결제, 계좌 이체, 포인트 전환, 로그인 시도 같은 이벤트는 실시간으로 들어오며, 스트림 처리 시스템은 짧은 시간 안에 사용자 행동 패턴을 분석해 위험도를 계산한다.
스트림 처리 프렘이워크를 선택할 때는 어떠한 형태의 어플리케이션을 개발하고자 하는지를 고려해야한다.
데이터 수집
스크림 처리 시스템이 필요한 것인지, 아니면 수집에 최적화된 카프카 커넥트 같은 시스템이 필요한 것인지 숙고해 볼 필요가 있다.
밀리초 단위 작업
이 경우 대체로 요청-응답 패턴이 더 낫다. 스트림 처리 시스템을 필요로 하는 것이 확실하다면, 마이크로 배치 방식을 택하는 것보다 이벤트 단위 저지연 방식을 지원하는 쪽을 선택해야 할 것이다.
비동기 마이크로 서비스
사용하는 메시지 버스(카프카 등)와 잘 통합되고, 업스트림의 변경 사항을 마이크로 서비스의 로컬 상태에 쉽게 반영할 수 있으며, 로컬 상태를 캐시 혹은 구체화된 휴 형태로 활용 가능한 스트림 처리 시스템이 필요하다.
복잡한 분석 엔진
이 경우 복잡한 집계 연산, 윈도우, 조인 등을 지원이 필요할 때 스트림 처리 시스템이 필요하다.
이벤트 단위 저지연 방식이란?
각 이벤트(레코드)를 개별적으로, 도착 즉시 처리하는 방식이다.
• 데이터를 모아서 처리하지 않고
• 이벤트 1건이 들어오면 바로 처리
• 지연(latency)을 밀리초(ms) 단위까지 낮추는 처리 모델이벤트 마이크로 배치와는 약간 다르다. 마이크로 배치는
• Spark Streaming 초기 모델
• 1초, 500ms, 100ms 등
• 일정 시간 동안 이벤트를 모았다가
• 한 번에 묶어서 처리
특정한 활용 사례에 국한된 고려사항 외에도 보편적으로 고려해야할 사항 역시 있다.