카프카 스트림즈 - 조인

함궈·2023년 10월 8일
2

아파치카프카

목록 보기
7/10
post-thumbnail
Join operandsType(INNER)JOINLEFT JOINOUTER JOIN
KStream-to-KStreamWindowedOOO
KTable-to-KTableNon-WindowedOOO
KStream-to-KTableNon-WindowedOOX
KStream-to-GlobalKTableNon-WindowedOOX
KTable-to-GlobalKTableN/AXXX

Join Constraints Co-partitioning of data

  • KStream/KStream
  • KTable/KTable
  • KStream/KTable

위 세가지 조인은 코-파티셔닝 되어 있을때만 가능하다.
코파티셔닝 되어 있지 않은 토픽을 조인하려고 하면 TopologyException이 발생한다.

코파티셔닝(co-partitioning) 이란?

조인을 하는 2개 데이터의 파티션 개수가 동일하고 파티셔닝 전략(partitioning strategy)을 동일하게 맞추는 작업

파티션 개수가 동일하고 파티셔닝 전략이 같아야만 동일한 메시지 키를 가진 데이터가 동일한 태스크에 들어가는 것을 보장한다.

코파티셔닝되어 있지 않으면 리파티셔닝(repartitioning)과정을 거쳐야 한다.

Join Example

온라인 광고를 예시로 들어 조인의 차이를 알아보자.

특정 광고의 view event를 다루는 kafka topic이 있고,
click event를 다루는 kafka topic이 있다고 하자.

View들과 Click들은 ID를 각각의 토픽의 키로써 공유한다고 가정한다.

  • A) click event는 view 다음으로 1초 후에 발생한다.
  • B) click event는 view 다음으로 11초 후에 발생한다.
  • C) view event는 click 다음으로 1초 후에 발생한다.
  • D) view event는 있지만 click event는 없다.
  • E) click event는 있지만 view event는 없다.
  • F) 두개의 연속적인 view event 다음으로 1초 후에 click event가 발생한다.
  • G) view event에 이어서 두개의 click event가 연달아 발생한다.

그래프에 나타나는 11초의 B 클릭은 잘못되었고, 12초여야 맞다.

Inner KStream-KStream Join

KStream간의 조인은 Window를 사용한다.
따라서 KStream간의 조인을 사용하려면 window의 크기를 명시해주어야 한다.

KStream은 stateless 하다.
조인을 하려면, 어느 정도의 내부적인 상태를 저장할 필요가 있다.
그렇지 않으면 두 스트림들은 새로운 이벤트가 도착할 때 마다 스캔되어야 한다.
여기서의 상태는 time window의 범위 안에 스트림의 모든 요소들을 포함한다.

이 예시에서는 10초 범위의 window를 사용한다.

두 스트림 간의 Inner join은 윈도우 내에 두 스트림에 key가 모두 나타날 때에 Join result를 만들어 낸다.
위에서 언급했듯이 두 토픽의 key는 ID를 공유한다고 했다.

A와 C는 10초 이내에 두 스트림에서 키가 나타났기 때문에, 순서가 다르더라도 조인된 것을 알 수 있다.

B는 조인된 결과가 보이지 않는데, 스트림 간 같은 키의 레코드가 나타나더라도, time window 내에 두 레코드가 존재하지 않으면 조인하지 않는다.

D와 E는 두 스트림에서 키가 나타나지 않았기 때문에 조인 결과가 없다.

F와 G는 조인 후 두개의 레코드로 나타난다.

Left KStream-KStream Join

Kafka에서의 모든 조인은 발생 시간을 기반으로 하기 때문에, left join은 처리 순서에 있어서 고려할 것이 많으며, sql의 left join 과는 많이 다르다.

left join은 왼쪽 스트림에 event가 도착할 때마다 조인 결과를 만들어낸다.

만약 같은 key를 가지는 event가 오른쪽 스트림에 이미 존재했었다면, 조인되며, 그렇지 않으면 null을 포함하는 조인 결과를 나타낸다.

반면에, 오른쪽 스트림에 도달하는 모든 event들은 이전에 왼쪽 스트림에서 같은 키를 가지는 event가 존재할 경우에만 조인된다.

당연히, Inner join에서 보여진 결과는 모두 포함하고 있으며, 추가적으로 B와 D에 대한 결과와, 왼쪽 스트림의 차집합 부분도 포함하고 있다.

Inner KStream-KStream Join과 비슷하게, 조인된 결과는 event time에 따른 처리 순서에 따라 정렬된 것 처럼 보인다.
그러나, 스트림즈는 event time에 따른 처리를 보장하지 않으며,
여러 번 실행하면 결과가 약간 다를 수 있다.

Outer KStream-KStream Join

둘 중 어느 스트림이든 event가 도달하자마자 조인 결과를 내보낸다.( 한쪽이 비어있는 )

만약 window의 time 내에 다른 스트림에서 같은 키를 가지는 요소가 있다면, 두 요소간에 조인을 한다.

A는 views 스트림에 들어오자마자 바로 처리되어 (A, .)를 만들어 낸다.
같은 키를 가지는 click event가 도달하면, 두 스트림간에 조인된 결과를 만들어 낸다.

B는 window의 범위를 벗어났으므로, 각 스트림의 event는 조인되지 않아 (B, B)의 결과는 만들어 내지 못한다.

Inner KTable-KTable Join

KTable간의 조인은 Window를 사용하지 않는다. KTable에 저장되는 레

하나의 input table이 업데이트된다면, result KTable도 함께 업데이트 된다.
이 result table에 대한 업데이트는 새로운 조인 결과 레코드로 처리된다.

Window를 사용하지 않기 때문에, (B, B) 레코드가 조인 결과에서 나타나는 것을 알 수 있다.

F에 대한 결과 레코드는 하나인 반면, G에 대한 결과 레코드는 두개인데,
그 이유는 F click event가 view event인 F.1, F.2이후에 나타났기 때문이다.
F가 조인하기 전에 F.1에서 F.2로 업데이트되었기 때문이다.

G는 view event가 두 click event 전에 이미 테이블에 기록되어 있고,
G.1, G.2는 시간차를 두고 테이블에 기록되므로 두개의 조인 결과를 만들어 낸다.

KStream간의 조인과 비슷하게, 레코드들이 timestamp 순서대로 처리되는 것 처럼 보인다.
실제로는 조금 다를 수 있다.
예를 들어, F click event가 view event인 F.1, F.2보다 큰 timestamp값을 가지지만, 먼저 처리되었다면,
(F.2, F) 이전에 (F.1, F)라는 조인 결과가 나타났을 수가 있다.

Left KTable-KTable Join

KTable간의 inner join 결과를 포함하면서 왼쪽 KTable의 차집합 부분을 포함하는 것을 알 수 있다.

Outer KTable-KTable Join

조인 결과는 KTable간 left join 결과를 포함하면서 오른쪽 KTable의 차집합 부분을 포함하는 것을 알 수 있다.

KTable간의 조인은 SQL 의미론과 매우 유사하여 이해하기 쉽다.
일반 SQL과의 차이점은 입력 KTable이 업데이트되면 결과 KTable이 자동으로 업데이트된다는 것이다.
따라서, 결과 KTable은 조인을 통해 지속적으로 업데이트되는 것으로 보여질 수 있다.

Inner KStream-KTable Join

KTable간 조인과 비슷하게, KStream-KTable 조인도 window를 사용하지 않는다.

KStream간의 조인, KTable간의 조인이 symmetric 한 것과 대조적으로,
KStream-KTable의 조인은 asymmetric 하다.

"symmetric"하다는 것은 왼쪽 또는 오른쪽 스트림의 입력이 조인 연산을 발생시키고 조인 결과를 만들어낼 수 있다는 것이다.

그러나 Stream-Table join에서는 오직 (왼쪽) stream의 입력만이 조인 연산을 발생시킬 수 있다. 반면에 (오른쪽) table 입력은 자신을 업데이트할 뿐이다.

왜냐하면 이 조인은 window를 사용하지 않고, 왼쪽 stream의 입력은 stateless하기 때문에, table의 입력을 통해 조인 연산을 실행 할 수 없다.

버전 2.0.x 이전

KStream간의 조인, KTable간의 조인처럼, 실제 프로세싱 순서에 대한 의존성이 존재한다.
위 두 경우의 조인에서는 이 의존성에 대해 결과에서 큰 차이를 보이지 않지만,
KStream-KTable 조인에서는 큰 차이를 보일 수 있다.

그 이유는 KStream-KTable 조인은 asymmetric(비대칭) 하기 때문이다.
예를 들면, KStream의 레코드가 KTable의 것보다 작은 timestamp 값을 가짐에도 불구하고 KTable의 것보다 먼저 처리되었다고 하면, 조인된 결과는 없을 것이다.

symmetric 조인에서는 이런 일이 벌어지지 않는다.

버전 2.1.0 이후

timestamp 동기화 기능이 향상되었고 Kafka Streams에서 레코드들의 timestamp를 기반으로 하는 처리에 관하여 확실히 보장하게 되었다.

Left KStream-KTable Join

Inner KStream-KTable 조인의 결과를 포함하며 추가적으로 왼쪽 스트림의 차집합 부분을 포함한다.

Inner KStream-GlobalKTable Join

기본적으로 KStream-KTable 조인과 비슷하지만,
GlobalKTable과 KTable의 차이는 다음과 같다.

첫번째, GlobalKTable은 미리 완성되어 있다.
처음 시작될 때, GlobalKTable의 입력 토픽의 end offset을 읽고,
토픽에서 이 offset이 가리키는 지점까지 읽어들인 후 GlobalKTable에 채워넣는다.

두번째, 만약 입력 GlobalKTable에 갱신이 발생한다면, 이 갱신은 즉시 테이블에 반영된다.

G는 G.1을 제외하고 오직 G.2와 조인되는 것을 볼 수 있는데, GlobalKTable을 생성하는 과정에서 G.1은 G.2에 의해 덮어씌워졌기 때문이다.

Left KStream-GlobalKTable Join

Inner KStream-GlobalKTable 조인의 결과를 포함하면서 추가로 왼쪽 스트림의 차집합 부분을 포함한다.


출처

최원영, 아파치 카프카 애플리케이션 프로그래밍 with 자바, 비제이퍼블릭, 2021

Udemy Kafka Streams 강의

Confluent 블로그

0개의 댓글