Consumer이면서 동시에 Producer가 되는 경우에 대해 살펴본다.
실시간으로 발생되는 금융 거래 내역이 있다. 금융 거래 내역을 지불 유형에 따라 분류하고, 이 중에서 수상한 거래를 Detection해서 처리하는 로직을 구성해본다.
로직은 크게 다음과 같이 구성된다.
시나리오는 지불 유형에 따라 Topic을 분류했지만, 이상 거래 데이터만 따로 수집하는 Topic을 만들어 데이터를 분류할 수도 있을 것이다.
Kafdrop에서 3개의 Topic을 만들어 준다.
payments
card_payments
coin_payments
Kafka 환경 및 Kafdrop에 대한 설명
Muti Broker Kafka Cluster와 통신하는 간단한 Producer & Consumer 만들기
https://velog.io/@jskim/Muti-Broker-Kafka-Cluster%EC%99%80-%ED%86%B5%EC%8B%A0%ED%95%98%EB%8A%94-%EA%B0%84%EB%8B%A8%ED%95%9C-Producer-Consumer-%EB%A7%8C%EB%93%A4%EA%B8%B0
payment_producer.py
from kafka import KafkaProducer
from datetime import datetime
import pytz
import time
import random
import json
TOPIC = 'payments'
brokers = ['localhost:9091', 'localhost:9092', 'localhost:9093']
producer = KafkaProducer(bootstrap_servers=brokers)
def get_datetime():
utc_now = pytz.utc.localize(datetime.utcnow())
kst_now = utc_now.astimezone(pytz.timezone('Asia/Seoul'))
d = kst_now.strftime('%Y-%m-%d')
t = kst_now.strftime('%H:%M:%S')
return d, t
def generate_payment_data():
payment_type = random.choice(['VISA', 'MASTERCARD', 'BITCOIN'])
amt = random.randint(0, 1000000)
to = random.choice(['me', 'dad', 'mom', 'friend', 'stranger'])
return payment_type, amt, to
while True:
d, t = get_datetime()
payment_type, amt, to = generate_payment_data()
tx = {
'DATE': d,
'TIME': t,
'PAYMENT_TYPE': payment_type,
'AMT': amt,
'TO': to
}
producer.send(TOPIC, json.dumps(tx).encode('utf-8'))
print(tx)
time.sleep(1)
정상적으로 transaction이 생성되는 것을 확인할 수 있다.
Topic을 확인해보면 Total available messages나 Offset이 변경되어 잘 전달되는 것을 확인할 수 있다.
payment_classifier.py
from kafka import KafkaProducer, KafkaConsumer
import json
PAYMENT_TOPIC = 'payments'
COIN_TOPIC = 'coin_payments'
CARD_TOPIC = 'card_payments'
brokers = ['localhost:9091', 'localhost:9092', 'localhost:9093']
consumer = KafkaConsumer(PAYMENT_TOPIC, bootstrap_servers=brokers)
producer = KafkaProducer(bootstrap_servers=brokers)
def is_suspicious(tx):
if tx['PAYMENT_TYPE'] == 'BITCOIN':
return True
return False
for message in consumer:
msg = json.loads(message.value.decode())
print(msg)
print(is_suspicious(msg))
topic = COIN_TOPIC if is_suspicious(msg) else CARD_TOPIC
producer.send(topic, json.dumps(msg).encode('utf-8'))
card_processor.py
from kafka import KafkaProducer, KafkaConsumer
import json
CARD_TOPIC = 'card_payments'
brokers = ['localhost:9091', 'localhost:9092', 'localhost:9093']
consumer = KafkaConsumer(CARD_TOPIC, bootstrap_servers=brokers)
for message in consumer:
msg = json.loads(message.value.decode())
to = msg['TO']
amt = msg['AMT']
if msg['PAYMENT_TYPE'] == 'VISA':
print(f'[VISA] payment to : {to} - {amt}')
elif msg['PAYMENT_TYPE'] == 'MASTERCARD':
print(f'[MASTERCARD] payment to : {to} - {amt}')
else:
print('[ALERT] unable to process payments')
coin_processor.py
from kafka import KafkaProducer, KafkaConsumer
import json
COIN_TOPIC = 'coin_payments'
brokers = ['localhost:9091', 'localhost:9092', 'localhost:9093']
consumer = KafkaConsumer(COIN_TOPIC, bootstrap_servers=brokers)
for message in consumer:
msg = json.loads(message.value.decode())
to = msg['TO']
amt = msg['AMT']
if msg['TO'] == 'stranger':
print(f'[ALERT] fraud detected payment to : {to} - {amt}')
else:
print(f'[PROCESSING BITCOIN] payment to : {to} - {amt}')
결과는 다음과 같다.
payment_producer.py
payment_classifier.py
card_processor.py
VISA
와 MASTERCARD
가 나뉘어 로그가 기록되고 있다.coin_processor.py
BITCOIN
이 정상적으로 거래된 경우는 [PROCESSING BITCOIN]
로그를 기록, stranger
에게 송금된 경우는 이상 거래로 판단하는 Alert이 발생하고 있다.