Kafka 정리 - 1

유호준·2024년 1월 7일

Kafka

목록 보기
1/6

카프카의 특징

주요 기능

  • 메시지 큐처럼 레코드를 읽고 쓴다
  • 내결함성으로 레코드를 저장한다
  • 스트림이 발생할 때 처리한다.

일반화

  • 홈 엔터테인먼트 시스템에 있는 수신기
  • 데이터 스트림을 처리하고 다른쪽 끝난에 연결된 외부 장치에 적합한 형식으로 보낸다.
  • 프로듀서 : 데이터 스트림을 카프카 프로커에 전송
  • 컨슈머 : 데이터를 읽어 처리할 수 있는 클아이언트

전송 방식

  • at-least-once semantics: 메시지는 수진확인이 될 때까지 재발송한다
  • at-most-once semantics: 메시지는 단 한번 보내며 실패하더라도 재발송하지 않는다
  • exactly-once semantics: 메시지는 이 메시지의 컨슈머에게 단 한번만 보인다.

개발자를 위한 카프카

-직원이 유급 휴가를 제출하는 데 사용하는 HR 시스템을 예로 들어보자

  • 휴가제출은 급여 뿐만아니라 작업 예측을 위한 프로젝트 번다운 차트도 처리될 가능성이 있다.
  • 두가지를 함께 묶을 것인가?
  • 급여 시스템이 다운되면?

카프카 시작

# 주키퍼 서버 시작
bin/zookeeper-server-start.sh config/zookeeper.properties

# 카프카 서버 클러스터 시작
bin/kafka-server-start.sh config/server0.properties
bin/kafka-server-start.sh config/server1.properties
bin/kafka-server-start.sh config/server2.properties

레코드

  • 데이터의 기본 요소
  • 타임스탬프, 값, 키를 가지고 있다

브로커

  • 카프카의 서버 측면
  • 명령줄을 염두에 두고 개발되었다.

토픽 만들기

bin/kafka-topics.sh --create --bootstrap-server localhost:9094 --topic kinaction_helloworld --partitions 3 --replication-factor 3
  • --partitions : 얼마나 많은 부분으로 분할할 것인가
    - 브로커가 3개 있기 때문에 파티션 3개를 사용하면 브로커당 1개씩 제공된다.
  • --bootstrap-server : 로컬 카프카 브로커를 가리킨다

토픽 조회

bin/kafka-topics.sh --list --bootstrap-server localhost:9094
bin/kafka-topics.sh --bootstrap-server localhost:9094 --describe --topic kinaction_helloworld
  • describe : 세부 정보 조회

메시지 전송/수신

bin/kafka-console-producer.sh --bootstrap-server localhost:9094 --topic kinaction_helloworld
bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic kinaction_helloworld --from-beginning
  • --from-beginning : 소비자가 시작하기 전 받은 메시지도 보여준다

프로듀서와 컨슈머

  • 프로듀서 : 데이터의 소스나 메시지를 카프카에 보낸다
  • 컨슈머 : 컨슈머나 싱크에 의해 카프카에서 데이터를 가져온다.

메시지를 보내는 로직

Alert alert = new Alert(...);
ProducerRecord<Alert, String> pr = new ProducerRecord<>("kinaction_alert", "alert", alert.getAlertMessage());
producer.send(pr, new AlertCallback()); # 콜백을 사용할 수도 있음
producer.close()

메시지를 소비하는 로직

consumer.subscribe(List.of("kinaction_audit")); //토픽 구독
while(keepConsuming){
	var records = consumer.poll(Duriation.ofMillis(250));
    for(ConsumerRecord<String, String> record : records){
    	log.info("kinaction_info offset = {}, kinaction_value = {}", record.offset(), record.value());
        OffsetAndMetaData offsetMeta = new OffsetAndMetadata(++record.offset(), "");
        
        Map<TopicPartition, OffsetAndMetadata> kaOffsetMap = new HashMap<>();
        kaOffsetMap.put(new TopicPartition("kinaction_audit", record.partition()), offsetMeta);
        
        consumer.commitSync(kaOffsetMap);
    }
}

토픽 개요

  • 대다수 사용자가 어떤 메시지가 어디로 가야하는지에 관한 로직을 생각하기 시작하는 곳
  • 파티션이라는 단위로 구성된다.
  • 파티션 복사본 중 하나가 리더가 된다.
    - 파티션 3개로 된 토픽이 있고 3개의 복사본을 갖고 있다면 모든 파티션은 각각 리더 레플리카를 선출했을 것이다.
    - 다른 두 복사본은 리더 레플리카에의해 데이터를 업데이트 받는 팔로워가 된다.

주키퍼의 용도

  • 디스커버리, 컨피규레이션, 동기화 서비스를 고가용성 방식으로 제공되는 분산 저장소

카프카의 고가용성 아키텍처

  • 수백만 개의 메시지를 빠르게 처리할 수 있는 것은 페이지 캐시 사용이다.
  • 브로커가 힙에 캐시되지 않도록 하여 큰 힙으로 인해 발생하는 문제를 방지한다
  • 다른 고려 사항은 데이터의 접근 패턴
    - 새 메시지가 유입될 때 많은 컨슈머가 가장 최신 메시지와 상호작용할 가능성이 높다.
    - 최신 메시지는 캐시로부터 제공받을 수 있다.

커밋 로그

  • 오프셋을 사용해 메시지가 로그에서 어디에 위치한 지 알 수 있다.
  • 이벤트가 항상 로그 마지막에 추가된다

다양한 소스 코드 패키지와 역할

카프카 스트림즈

  • 독립된 프로세싱 클러스터가 필요하지 않다
  • 스트리밍 애플리케이션을 쉽게 작성하게 해준다.

카프카 커넥트

  • 다른 시스템을 쉽게 통합하기 위해 만들어졌다.
  • 소스 커넥터 : 소스에서 카프카로 데이터를 임포트하기 위해 사용된다.
  • 커넥트소스 : 메시지를 카프카로 생산하기 위해
  • 싱크 커넥터 : 카프카에서 다른 시스템으로 데이터를 익스포트하기 위해 사용된다.

컨플루언트 클라이언트

public class HelloWorldProducer{
	public static main(String[] args){
   		Properties kaProperties = new Properties();
        kaProperties.put("bootstrap.servers", "localhost:9092", "localhost:9093", "localhost:9094");
        
        kaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        try(Producer<String, String> producer = new KafkaProducer<>(kaProperties))
        
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kinaction_helloworld", null, "hello world again");
        producer.send(producerRecord)
    }
}

컨슈머

public class HelloWorldConsumer{
	...
    
    public static void main(String[] args){
    	Properties kaProperties = new Properties();
        kaProperties.put("bootstrap.servers", "localhost:9092", "localhost:9093", "localhost:9094")
        kaProperties.put("group.id","kinaction_helloconsumer");
        kaProperties.put("enable.auto.commit","true");
        kaProperties.put("auto.commit.interval.ms","1000");
        kaProperties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        kaProperties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        
        HelloWorldConsumer helloWorldConsumer = new HelloWorldConsumer();
        helloWorldConsumer.consume(kaProperties);
        Runtime.getRuntime().addShutdownHook(new Thread(helloWorldconsumer::shutdown));
    }
    
    private void consume(Properties kaProperties){
    	try(KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kaProperties)){
        	consumer.subscribe(List.of("kinaction_helloworld"));
            while(keepConsuming){
            	ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(250));
                ...
            }
        }
    }
}
  • 프로듀서와 달리 스레드 안전하지 않다
    - 단일 컨슈머를 넘어서 확장할 때 스레드 안전 문제를 고려해야한다.
  • 단일 poll로 여러개의 메시지가 들어올 수 있다

스트림 처리

  • 데이터가 계속 도착하며 끝나지 않는다
  • 이 데이터를 항상 처리해야 하며 실행 요청이나 시간 프레임을 기다리지 않아야 한다
  • 데이터를 일괄 처리 및 그룹 단위로 처리하지 않는다.
    - 야간, 월간 실행이라는 개념도 스트림 처리라는 워크 플로와 무관하다
  • 이벤트 메시지가 클러스터에 들어오는 동안 컨슈머 애플리케이션은 이벤트의 정적인 스냅숏을 가져오기 위한 쿼리 결과를 기다리는 것이 아니라 업데이트된 정보를 계속해서 제공한다.
    - 최신 이벤트를 보기 위해 사용자가 새로고침 할 필요가 없다.
profile
포트폴리오 - https://drive.google.com/file/d/152OM9p7JQorjUfvR4BaxqGuP5xtQ8-fM/view?usp=sharing

0개의 댓글