Consumer와 Producer, 두 가지 역할을 하는 로직 구성하기

bradley·2022년 9월 30일
1

Kafka

목록 보기
5/5
post-thumbnail

목적


Consumer이면서 동시에 Producer가 되는 경우에 대해 살펴본다.

시나리오


실시간으로 발생되는 금융 거래 내역이 있다. 금융 거래 내역을 지불 유형에 따라 분류하고, 이 중에서 수상한 거래를 Detection해서 처리하는 로직을 구성해본다.

로직은 크게 다음과 같이 구성된다.

  • 금융 거래 내역을 발생시키는 Generator (Producer)
  • 지불 유형에 따라 거래 내역을 분류하는 Classifier (Consumer & Producer)
    금융 거래 내역 Topic으로부터 데이터를 받은 후 (Consumer), Card 거래와 Coin 거래를 분류하여 각각의 Topic으로 전달한다.(Producer)
  • Card 거래 내역을 처리하는 Processor
  • Coin 거래 내역을 처리하는 Processor
    수상한 사람에게 보내지는 내역을 Detection하여 Alert을 알린다.

시나리오는 지불 유형에 따라 Topic을 분류했지만, 이상 거래 데이터만 따로 수집하는 Topic을 만들어 데이터를 분류할 수도 있을 것이다.

Topic 생성


Kafdrop에서 3개의 Topic을 만들어 준다.

  • payments
  • card_payments
  • coin_payments

Kafka 환경 및 Kafdrop에 대한 설명

Muti Broker Kafka Cluster와 통신하는 간단한 Producer & Consumer 만들기
https://velog.io/@jskim/Muti-Broker-Kafka-Cluster%EC%99%80-%ED%86%B5%EC%8B%A0%ED%95%98%EB%8A%94-%EA%B0%84%EB%8B%A8%ED%95%9C-Producer-Consumer-%EB%A7%8C%EB%93%A4%EA%B8%B0


금융 거래 내역 Generator 생성


payment_producer.py

from kafka import KafkaProducer
from datetime import datetime
import pytz
import time
import random
import json

TOPIC = 'payments'
brokers = ['localhost:9091', 'localhost:9092', 'localhost:9093']
producer = KafkaProducer(bootstrap_servers=brokers)

def get_datetime():
    utc_now = pytz.utc.localize(datetime.utcnow())
    kst_now = utc_now.astimezone(pytz.timezone('Asia/Seoul'))
    d = kst_now.strftime('%Y-%m-%d')
    t = kst_now.strftime('%H:%M:%S')
    return d, t

def generate_payment_data():
    payment_type = random.choice(['VISA', 'MASTERCARD', 'BITCOIN'])
    amt = random.randint(0, 1000000)
    to = random.choice(['me', 'dad', 'mom', 'friend', 'stranger'])
    return payment_type, amt, to

while True:
    d, t = get_datetime()
    payment_type, amt, to = generate_payment_data()

    tx = {
        'DATE': d,
        'TIME': t,
        'PAYMENT_TYPE': payment_type,
        'AMT': amt,
        'TO': to
    }

    producer.send(TOPIC, json.dumps(tx).encode('utf-8'))
    print(tx)
    time.sleep(1)

정상적으로 transaction이 생성되는 것을 확인할 수 있다.

Topic을 확인해보면 Total available messages나 Offset이 변경되어 잘 전달되는 것을 확인할 수 있다.


금융 거래 내역 Classifier 생성


payment_classifier.py

from kafka import KafkaProducer, KafkaConsumer
import json

PAYMENT_TOPIC = 'payments'
COIN_TOPIC = 'coin_payments'
CARD_TOPIC = 'card_payments'

brokers = ['localhost:9091', 'localhost:9092', 'localhost:9093']
consumer = KafkaConsumer(PAYMENT_TOPIC, bootstrap_servers=brokers)
producer = KafkaProducer(bootstrap_servers=brokers)

def is_suspicious(tx):
    if tx['PAYMENT_TYPE'] == 'BITCOIN':
        return True
    return False

for message in consumer:
    msg = json.loads(message.value.decode())
    print(msg)
    print(is_suspicious(msg))

    topic = COIN_TOPIC if is_suspicious(msg) else CARD_TOPIC
    producer.send(topic, json.dumps(msg).encode('utf-8'))

Card 내역 Processor 생성


card_processor.py

from kafka import KafkaProducer, KafkaConsumer
import json

CARD_TOPIC = 'card_payments'

brokers = ['localhost:9091', 'localhost:9092', 'localhost:9093']
consumer = KafkaConsumer(CARD_TOPIC, bootstrap_servers=brokers)

for message in consumer:
    msg = json.loads(message.value.decode())

    to = msg['TO']
    amt = msg['AMT']

    if msg['PAYMENT_TYPE'] == 'VISA':
        print(f'[VISA] payment to : {to} - {amt}')
    elif msg['PAYMENT_TYPE'] == 'MASTERCARD':
        print(f'[MASTERCARD] payment to : {to} - {amt}')
    else:
        print('[ALERT] unable to process payments')

Coin 내역 Processor 생성


coin_processor.py

from kafka import KafkaProducer, KafkaConsumer
import json

COIN_TOPIC = 'coin_payments'

brokers = ['localhost:9091', 'localhost:9092', 'localhost:9093']
consumer = KafkaConsumer(COIN_TOPIC, bootstrap_servers=brokers)

for message in consumer:
    msg = json.loads(message.value.decode())

    to = msg['TO']
    amt = msg['AMT']

    if msg['TO'] == 'stranger':
        print(f'[ALERT] fraud detected payment to : {to} - {amt}')
    else:
        print(f'[PROCESSING BITCOIN] payment to : {to} - {amt}')

실행 결과


결과는 다음과 같다.

  • 왼쪽 상단 : 금융 거래 내역 Generator - payment_producer.py
  • 왼쪽 하단 : 금융 거래 내역 Classifier - payment_classifier.py
  • 오른쪽 상단 : Card 거래 내역 Processor - card_processor.py
    VISAMASTERCARD가 나뉘어 로그가 기록되고 있다.
  • 오른쪽 하단 : Coin 거래 내역 Processor - coin_processor.py
    BITCOIN이 정상적으로 거래된 경우는 [PROCESSING BITCOIN] 로그를 기록, stranger에게 송금된 경우는 이상 거래로 판단하는 Alert이 발생하고 있다.

profile
데이터 엔지니어링에 관심이 많은 홀로 삽질하는 느림보

0개의 댓글