앞에서의 포스팅을 과정을 거쳐서 브로커의 토픽에 메세지(로그)들이 적재가 되었습니다.
이렇게 적재된 메세지들을 이제 컨슈머를 통해서 소비를하고 원하는 목적지에 배달을 해야하는데요
구현한 컨슈머를 통해서 포스팅을 진행하겠습니다.
BigQuery로 가는 "user2" 토픽의 메세지들은 모든로그를 취급합니다.
그러면서 BigQuery로 적재된 로그 데이터를 통해서 분석가들이 사용자를 분석하거나 그거에 맞는 모델을 만들기 위해서 사용하는 데이터 마트의 역할을 하게됩니다.
from kafka import KafkaConsumer
import json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file('GCP 키파일 경로')
client = bigquery.Client(credentials=credentials)
table_id = "빅쿼리 테이블 아이디"
# Kafka 처리를 위한 consumer 생성
consumer = KafkaConsumer(
'토픽 이름',
bootstrap_servers=['IP:PORT', 'IP:PORT', 'IP:PORT'],
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
# 로그 데이터를 처리하여 BigQuery에 저장하는 함수
def insert_data_bigquery(log_data):
errors = client.insert_rows_json(table_id, [log_data])
if errors == []:
print("새 행이 추가되었습니다.")
else:
print("다음 에러가 발생했습니다: ", errors)
for msg in consumer:
log_data = msg.value
insert_data_bigquery(log_data)
빅쿼리 연결
우선적으로 빅쿼리와 연결을 먼저 진행해야합니다.
처음에는 insert_data_bigquery함수안에 했지만 그것을 밖으로 빼서 함수가 작동할때마다 연결하는 시간을 단축시켰습니다.
카프카 연결
카프카 연결을 위해서 브로커들의 아이피와 포트, 토픽이름을 작성하고, 들어온 메세지에 대한 직렬화를 했습니다.
브로커의 아이피와 포트를 모두 쓴 이유는 카프카 클러스터의 한 브로커가 장애가 발생하더라도, 컨슈머는 다른 브로커와 연결하여 계속해서 데이터를 수신하여 장애에 대처하기 위함입니다.
메세지 빅쿼리에 insert
가져온 메세지를 빅쿼리에 넣기 위해서 테이블의 ID를 통해서 연결을 합니다.
만약 client.insert_rows_json(table_id, [log_data])
이 과정에서 에러가 없다면 errors
라는 변수가 비어있기 때문에 추가됐다고 출력을 하고 아니라면 에러의 내용을 출력하여 대처할수 있습니다.
Redis로 가는 메세지들은 "user2" 로그로 향하는 메세지들중 info
라는 키값에 search
또는 movie_detail
이라면 "user3" 라는 토픽에도 추가로 향하게 됩니다.
from kafka import KafkaConsumer
import json
import redis
redis_client = redis.Redis(host='IP', port=6379, db=0)
# Kafka 처리를 위한 consumer 생성
consumer = KafkaConsumer(
'토픽 이름',
bootstrap_servers=['IP:PORT', 'IP:PORT', 'IP:PORT'],
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
def insert_data_redis(key, log_data):
if key == 'popularity':
# 상세페이지 본 순위
redis_client.zincrby(key, 1, log_data['action_data']) # sorted set의 개념을 활용(순위)
# 최근 본 상세페이지 기능 구현
user_key = log_data['user_id'] # 최근 본 영화 구현을 위해
value=f'{log_data["action_data"]}' # 최근 본 영화 구현을 위해
redis_client.lpush(user_key, value) # 리스트의 왼쪽에 movie_id 값 추가
redis_client.ltrim(user_key, 0, 9) # 최근 본 영화를 10개만 보여주기위해서 밀어내면서 뒤에오는것은 삭제
else:
redis_client.zincrby(key, 1, log_data['action_data'])
print(f"레디스에 데이터 추가됨: {key} -> {log_data['action_data']}")
for msg in consumer:
log_data = msg.value
if log_data['info']=='movie_detail':
insert_key='popularity'
else:
insert_key='search'
insert_data_redis(insert_key ,log_data)
Redis 연결
우선적으로 Redis와 연결을 먼저 진행해야합니다.
카프카 연결
빅쿼리 컨슈머와 같은 방식으로 카프카의 토픽과 연결합니다.
들어온 메세지 처리
아래쪽의 반복문을 통해서 메세지가 들어오게되고 이때 조건문을 통해서 아래와 같이 키값을 다르게 했습니다.
info
라는 키의 값이 movie_detail
이라면 insert_key
는 popularity
info
라는 키의 값이 search
이라면 insert_key
는 search
클릭한 영화에대한 순위
key
가 popularity
라면 sorted set을 통해서 들어온 log_data['action_data']
의 영화아이디 값을 1씩 증가합니다.
사용자가 최근 클릭한 영화
사용자의 최근 본 영화는user_key
라는 리스트에 방금 본 영화의 아이디인 value
를 추가합니다. 이 때, Redis의 LPUSH 명령어를 사용하여 리스트의 가장 왼쪽(즉, 맨 앞)에 새로운 요소를 추가하게 됩니다.
검색어 순위
key
가 search
라면 sorted set을 통해서 들어온 log_data['action_data']
의 검색어 값을 1씩 증가합니다.
운영을 할때는 추가됐다는 출력문을 삭제하고, 디버깅을 통해서 이상유무를 빠르게 아는 방법으로 변경하는것이 좋겠습니다.
그리고 하드코딩으로 작성했던 IP와 PORT를 환경변수나 설정파일을 통해서 관리를 해야합니다.