Instead of installing Kafka directly on the EC2 instance, let's use docker-compose to create and run multiple brokers.
$ sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
$ sudo chmod +x /usr/local/bin/docker-compose
# Create a folder and move into it
$ mkdir ./kafka
$ cd ./kafka
# Write the compose file
$ vi docker-compose.yml
---
version: '3.8'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - '32181:32181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000

  kafka-1:
    image: confluentinc/cp-kafka:latest
    ports:
      - '9092:9092'
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3

  kafka-2:
    image: confluentinc/cp-kafka:latest
    ports:
      - '9093:9093'
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3

  kafka-3:
    image: confluentinc/cp-kafka:latest
    ports:
      - '9094:9094'
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29094,EXTERNAL://localhost:9094
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
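- This creates a cluster made up of three Kafka brokers
- The brokers are exposed on ports 9092, 9093, and 9094 (these ports must also be opened in the EC2 instance's security group settings)
- The EXTERNAL listener is advertised as localhost, so as written only clients running on the instance itself can connect through these ports; clients on other machines would need an address they can reach in place of localhost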
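Once the containers are running, a plain TCP connect is a quick way to confirm that each broker port is reachable from wherever the script runs. The following is a minimal sketch, assuming a hypothetical helper named `check_ports` (not part of any Kafka library). It only shows that something accepts connections on the port, not that a healthy Kafka broker is behind it.

```python
import socket


def check_ports(host, ports, timeout=2.0):
    """Return {port: True/False} depending on whether a TCP connect succeeds."""
    results = {}
    for port in ports:
        try:
            # create_connection raises OSError on refusal or timeout
            with socket.create_connection((host, port), timeout=timeout):
                results[port] = True
        except OSError:
            results[port] = False
    return results


if __name__ == "__main__":
    # Ports taken from docker-compose.yml above
    for port, ok in check_ports("localhost", [9092, 9093, 9094]).items():
        print(f"{port}: {'open' if ok else 'closed'}")
```

Running the same check from another machine against the instance's address would also show whether the security group rule is in place.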
$ vi docker-compose-kafka-ui.yml
version: '2'
services:
  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8989:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-1:29092,kafka-2:29093,kafka-3:29094
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper-1:32181
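# Start the containers from the directory where the files were written
$ docker-compose up -d
# kafka-ui lives in its own file, so it has to be started with -f
$ docker-compose -f docker-compose-kafka-ui.yml up -d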
Open localhost:8989 in a browser to reach kafka-ui.
# Write the Python crawling file in the same location
$ vi crawling.py
import json
import re
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer

# Kafka settings
producer = KafkaProducer(bootstrap_servers='localhost:9092')

HEADERS = {
    "User-Agent": "...",
    "Accept-Language": "ko-KR,ko;q=0.8,en-US;q=0.5,en;q=0.3"
}

SEARCH_PRODUCT_CLASS = re.compile("^search-product")

for i in range(1, 4):
    # page={i} requests the i-th page of the search results
    url = f"https://www.coupang.com/np/search?component=&q=%EC%95%84%EC%9D%B4%ED%8C%A8%EB%93%9C+%EC%97%90%EC%96%B4+5&channel=auto&page={i}"
    try:
        res = requests.get(url, headers=HEADERS, timeout=10)
        res.raise_for_status()
    except requests.RequestException as e:
        print(f"Request failed: {e}")
        continue

    soup = BeautifulSoup(res.text, "html.parser")
    items = soup.find_all("li", attrs={"class": SEARCH_PRODUCT_CLASS})

    for item in items:
        # Skip advertised products
        ad_badge = item.find("span", attrs={"class": "ad-badge-text"})
        if ad_badge:
            continue

        name = item.find("div", attrs={"class": "name"}).get_text()  # product name

        price_tag = item.find("strong", attrs={"class": "price-value"})
        if price_tag:
            price = price_tag.get_text()
        else:
            print("Could not find price information.")
            continue

        # Only keep products with at least 10 reviews and a rating of 3 or higher
        rate = item.find("em", attrs={"class": "rating"})  # rating
        if rate:
            rate = rate.get_text()
        else:
            continue  # skip products without a rating

        rate_cnt = item.find("span", attrs={"class": "rating-total-count"})  # number of ratings
        if rate_cnt:
            rate_cnt = rate_cnt.get_text()[1:-1]  # e.g. "(26)" -> "26", strip the parentheses
        else:
            continue  # skip products without a rating count

        link = item.find("a", attrs={"class": "search-product-link"})["href"]

        if float(rate) >= 3 and int(rate_cnt) >= 10:
            product_info = {
                "name": name,
                "price": price,
                "rate": rate,
                "rate_cnt": rate_cnt,
                # urljoin avoids a doubled slash when href already starts with "/"
                "link": urljoin("https://www.coupang.com/", link)
            }
            producer.send('product_topic', value=json.dumps(product_info).encode('utf-8'))

# send() is asynchronous; flush() blocks until the buffered messages are delivered
producer.flush()
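To check that the messages actually arrived in `product_topic`, they can be read back with a consumer. The following is a minimal sketch using kafka-python's `KafkaConsumer`, assuming the same `localhost:9092` broker and topic name as crawling.py. The helper names `decode_product` and `consume` are hypothetical, and the 10-second timeout is an arbitrary choice.

```python
import json


def decode_product(raw):
    """Decode one message value produced by crawling.py back into a dict."""
    return json.loads(raw.decode("utf-8"))


def consume(topic="product_topic", servers="localhost:9092"):
    # Imported here so decode_product can be used without kafka-python installed
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        auto_offset_reset="earliest",      # start from the oldest message when no offset is stored
        consumer_timeout_ms=10000,         # stop iterating after 10 s without new messages
        value_deserializer=decode_product,
    )
    for message in consumer:
        product = message.value
        print(product["name"], product["price"], product["rate"], product["rate_cnt"])
    consumer.close()
```

Calling `consume()` after crawling.py has run should print one line per product that passed the rating filter. The same messages should also be visible under the topic in kafka-ui.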