[Apache Kafka] docker-compose를 통한 kafka 구축 과 크롤링 데이터 전송

youngtae·2023년 9월 4일
1

Apache Kafka

목록 보기
2/3
post-thumbnail

Docker-Compose로 kafka 클러스터 구축

EC2 인스턴스에 kafka를 직접 설치하는 방식이 아닌 docker-compose를 통해 다중 브로커를 만들고 실행해보자

1. EC2 접속 후 도커 컴포즈 설치

$ sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
$ sudo chmod +x /usr/local/bin/docker-compose

2. docker-compose 파일 작성

# 폴더 생성
$ mkdir ./kafka
# compose 파일 작성
$ vi docker-compose.yml
---
version: '3.8'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - '32181:32181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000

  kafka-1:
    image: confluentinc/cp-kafka:latest
    ports:
      - '9092:9092'
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3

  kafka-2:
    image: confluentinc/cp-kafka:latest
    ports:
      - '9093:9093'
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3

  kafka-3:
    image: confluentinc/cp-kafka:latest
    ports:
      - '9094:9094'
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29094,EXTERNAL://localhost:9094
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
  • kafka 브로커 3개로 구성된 클러스터 생성
  • 포트는 9092, 9093, 9094로 설정 (ec2 인스턴스 보안설정에서도 개방해줘야한다)

3. docker-compose-kafka-ui

  • 모니터링을 위한 kafka-ui 파일도 작성
$ vi docker-compose-kafka-ui.yml
version: '2'
services:
  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8989:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-1:29092,kafka-2:29093,kafka-3:29094
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper-1:22181

4. docker 실행

# 작성한 위치에서 도커 실행
$ docker-compose up -d

5. kafka-ui 확인

localhost:8989로 접속

  • online 상태인 클러스터에 브로커 3개가 잘 생성된 것을 확인할 수 있다.

6. 크롤링 데이터 kafka로 전송

  • ec2 인스턴스 내에서 파이썬 파일을 작성해서 localhost:9092로 전송
  • 쿠팡의 상품목록 3페이지 크롤링
  • kafka의 producer역할
# 같은 위치에 파이썬 크롤링 파일 작성
$ vi crawling.py
import requests
import re
from bs4 import BeautifulSoup
import json
from kafka import KafkaProducer

# Kafka 설정
producer = KafkaProducer(bootstrap_servers='localhost:9092')

HEADERS = {
    "User-Agent": "...",
    "Accept-Language": "ko-KR,ko;q=0.8,en-US;q=0.5,en;q=0.3"
}

SEARCH_PRODUCT_CLASS = re.compile("^search-product")

for i in range(1, 4):
    #print("페이지 :", i)
    url = "https://www.coupang.com/np/search?component=&q=%EC%95%84%EC%9D%B4%ED%8C%A8%EB%93%9C+%EC%97%90%EC%96%B4+5&channel=auto"

    try:
        res = requests.get(url, headers=HEADERS)
        res.raise_for_status()
    except requests.RequestException as e:
        print(f"Request failed: {e}")
        continue

    soup = BeautifulSoup(res.text, "html.parser")
    items = soup.find_all("li", attrs={"class": SEARCH_PRODUCT_CLASS})

    for item in items:

        # 광고 제품은 제외
        ad_badge = item.find("span", attrs={"class":"ad-badge-text"})
        if ad_badge:
            #print("  <광고 상품 제외합니다>")
            continue

        name = item.find("div", attrs={"class":"name"}).get_text() # 제품명

        price_tag = item.find("strong", attrs={"class": "price-value"})
        if price_tag:
            price = price_tag.get_text()
        else:
            print("가격 정보를 찾을 수 없습니다.")
            continue

        # 리뷰 10개 이상, 평점 3 이상 되는 것만 조회
        rate = item.find("em", attrs={"class":"rating"}) # 평점
        if rate:
            rate = rate.get_text()
        else:
            #rate = "평점 없음"
            #print("  <평점 없는 상품 제외합니다>")
            continue

        rate_cnt = item.find("span", attrs={"class":"rating-total-count"}) # 평점 수
        if rate_cnt:
            rate_cnt = rate_cnt.get_text()[1:-1] # 예 : (26), 괄호 없애기

        else:
            #rate_cnt = "평점 수 없음"
            #print("  <평점 수 없는 상품 제외합니다>")
            continue

        link = item.find("a", attrs={"class":"search-product-link"})["href"]

        if float(rate) >= 3 and int(rate_cnt) >= 10:
            #print(name, price, rate, rate_cnt)
            #print(f"제품명 : {name}")
            #print(f"가격 : {price}")
            #print(f"평점 : {rate}점 ({rate_cnt})개")
            #print("바로가기 : {}".format("https://www.coupang.com/"+link))
            #print("-"*100)
            product_info = {
                "name": name,
                "price": price,
                "rate": rate,
                "rate_cnt": rate_cnt,
                "link": f"https://www.coupang.com/{link}"
            }
            producer.send('product_topic', value=json.dumps(product_info).encode('utf-8'))

  • 크롤링 데이터가 메시지로 kafka에 전송된 것을 확인

참고

Kafka-UI Tool 을 이용하여 Kafka 관리하기
쿠팡 크롤링

profile
나의 개발기록

0개의 댓글