프로듀서로부터 데이터를 전달받으면 카프카 브로커는 프로듀서가 요청한 토픽의 파티션에 데이터를 저장
ls /tmp/kafka-logs
log.dir
옵션에 정의한 디렉토리에 데이터를 저장한다.각 파티션에 대한 폴더에 들어가면 파티션에 존재하는 데이터를 확인할 수 있다.
log
에는 메시지와 메타데이터를 저장한다.
index
에는 메시지의 오프셋을 인덱싱한 정보를 담은 파일이다.
timestamp
파일에는 메시지에 포함된 timestamp값을 기준으로 인덱싱한 정보가 담겨있다.
컨슈머가 데이터를 요청하면 파티션에 저장된 데이터를 전달
카프카는 페이지캐시를 사용하여 디스크 입출력속도를 높이는 방법을 사용했다.
페이지캐시
란 OS에서 파일 입출력의 성능향상을 위해 만들어 놓은 메모리 영역(캐시 영역)을 뜻하며, 한번 읽은 파일의 내용은 메모리의 페이징 캐시영역에 저장시킨다. 추후 동일한 파일의 접근이 일어나면 디스크에서 읽지 않고 메모리에서 이를 읽어 속도문제를 해결할 수 있다.데이터 복제(Replication)는 카프카를 장애 허용시스템으로 동작하도록 하는 원동력
카프카의 데이터 복제는 파티션 단위로 이루어진다.
파티션의 복제 개수(replication factor)
도 같이 설정되는데 직접 옵션을 설정하지 않으면, 브로커에 설정된 옵션값을 따라감.
출처 : https://engineering.linkedin.com/kafka/intra-cluster-replication-apache-kafka
위 그림을 보면, 각 브로커마다 복제된 파티션을 볼 수 있다.
팔로워 파티션은 리더 파티션의 오프셋을 확인하여, 자신이 가지고 있는 오프셋과 차이가 나는 경우 리더파티션으로부터 데이터를 가져와서 자신의 파티션에 저장, 이 과정을 복제(Replication)이라 한다.
복제 개수만큼 저장용량이 증가한다는 단점이 존재하지만, 데이터를 안전하게 사용할수 있다는 강력한 장점때문에 카프카를 운영할 때 2이상의 복제 개수를 정하는 것이 중요하다.
또한 브로커1에서 장애가 발생한다면, 브로커2나 브로커3중의 파티션 하나가 리더 파티션 지위를 넘겨받는다.
데이터가 일부 유실되어도 무관하고 데이터 처리 속도가 중요하다면 1또는 2로 설정
금융권 정보와 같이 유실이 일어나면 안되는 데이터의 경우 복제 개수를 3으로 설정하여 최대 2개의 브로커에서 동시에 장애가 발생하더라도 데이터를 안정적으로 유지할 수 있도록 한다.
log.segment.bytes
ehsms log.segment.ms
옵션에 값이 설정되면 세그먼트 파일이 닫힌다.Trade off
에 주의할 것.카프카는 데이터를 삭제하지 않고 메시지키를 기준으로 오래된 데이터를 압축하는 정책을 가져갈 수도 있다.
여기까지 브로커의 역할에 대해 알아보았다. 그렇다면 카프카 클러스터를 운영할 때 주키퍼가 하는 역할은 무엇일까?
주키퍼는 카프카의 메타데이터를 관리하는데 사용된다. 어떤 데이터를 저장하는지 쉘 명령어를 통해 사용할 수 있는 내부 명령어들에 대한 설명은 주키퍼 공식 홈페이지에 확인할 수 있다. 주키퍼 쉘 명령어는 zookeeper-shell.sh로 실행할 수 있으며 bin 폴더 내에 존재.
출처 : https://www.cloudkarafka.com/blog/part1-kafka-for-beginners-what-is-apache-kafka.html
__consumer_offsets
,__transaction_state
)은 생성 불가능하다.WARNING
표시토픽이름을 모호하게 작성하면 유지보수시 큰 어려움을 겪을 수 있다.
따라서 토픽이름을 통해 어떤 개발환경에서 사용되는 것인지 판단하고, 어떤 데이터 타입을 다루는지 유추할 수 있어야 한다.
토픽 작명 예시
중요한 것은 토픽이름에 대한 규칙을 사전에 정의하고 그 규칙을 잘따르는 것이 중요하다.
타임스탬프
, 메시지키
, 메시지 값
, 오프셋
으로 구성되어있다.StringSerializer
로 직렬화한 메시지 값을 컨슈머가 IntegerDeserializer
로 역직렬화하면 정상적인 데이터를 얻을 수 없다.plugins {
id 'java'
}
group 'org.example'
version '1.0-SNAPSHOT'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.apache.kafka:kafka-clients:2.7.0'
implementation 'org.slf4j:slf4j-simple:2.0.0-alpha0'
implementation 'org.projectlombok:lombok:1.18.18'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
}
test {
useJUnitPlatform()
}
@Slf4j
public class Producer {
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVER = "localhost:9092";
public static void main(String... args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
String messageValue = "testMessage";
ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,messageValue);
producer.send(record);
log.info("{}",record);
producer.flush();
producer.close();
}
}
이제 카프카 프로듀서가 전송할 토픽을 생성하자
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partition 3
프로듀서 어플리케이션을 실행할 때 설정해야 하는 필수 옵션과 선택 옵션이 있다.
필수 옵션
옵션명 | 설명 |
---|---|
bootstrap.servers | 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트:포트를 1개이상 작성한다. |
key.serializer | 레코드의 메시지 키를 직렬화하는 클래스를 지정한다. |
value.serializer | 레코드의 메시지 값을 직렬화하는 클래스를 지정한다. |
선택옵션
옵션명 | 설명 |
---|---|
acks | 프로듀서가 전송한 데이터가 브로커들에 정상적으로 저장되었는지 전송 성공 여부를 확인하는데에 사용하는 옵션이다. 0 ,1 ,-1 중 하나로 설정할 수 있다.1 : 기본값으로써 리더 파티션에 데이터가 저장되면 전송 성공으로 판단 0 : 프로듀서가 전송한 즉시 브로커에 데이터 저장여부와 상관없이 성공으로 판단 -1 : 토픽의 min.insync.replicas 개수에 해당하는 리더 파티션과 팔로워 파티션에 데이터가 저장되면 성공하는 것으로 판단. |
buffer.memory | 브로커로 전송할 데이터를 배치로 모으기 위해 설정할 버퍼 메모리양을 정한다. 기본값은 32MB |
retries | 프로듀서가 브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수를 지정한다 기본값은 Integer.MAX |
batch.size | 배치로 전송할 레코드 최대 용량을 지정한다. 너무 작게 설정하면 프로듀서가 브로커로 더 자주 보내기 때문에 네트워크 부담이 있고 너무크게 설정하면 메모리를 더 많이 사용하게 되는 점을 주의해야한다 default : 16384 |
linger.ms | 배치를 전송하기전까지 기다리는 최소시간이다. 기본값은 0이다 |
partitioner.class | 레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스를 지정한다. 기본값은 org.apache.kafka.clients.producer.internals.DefaultPartioner |
transactional.id | 프로듀서가 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶을지 여부를 설정한다. 프로듀서의 고유한 트랜잭션 아이디를 설정할 수 있다. |
// Donggeun 이 key가 들어갈 자리
ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,"DongGeun",messageValue);
파티션을 직접 넣고 싶다면 다음과 같이 설정하면 된다.
int partitionNo = 0;
ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME, partitionNo, messageKey, messageValue);
프로듀서 사용환경에 따라 특정 데티러를 가지는 레코드를 특정 파티션으로 보내야 할때가 있다. 기본 설정 파티셔너를 사용할 경우 메시지 키의 해시값을 파티션에 매칭하여 데이터를 전송하므로 어느 파티션에 들어가는지 알 수 없다. 이때 Partitioner 인터페이스를 사용하여 사용자 정의 파티셔너를 생성하면 특정값을 가진 메시지키에 대해서 무조건 정해진 파티션으로 데이터를 전송하도록 설정할 수 있다.
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if(keyBytes ==null){
throw new InvalidPartitionsException("Need message key");
}
//특정 파티션 값으로 보내도록
if(((String)key).equals("DongGeun")){
return 0;
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
//기본 값
return Utils.toPositive(Utils.murmur2(keyBytes) % numPartitions);
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
커스텀 파티셔너를 지정한 경우 ProducerConfig
의 PARTITONER_CLASS_CONFIG
을 사용자 생성 파티셔너로 설정하여 KafkaProducer 인스턴스를 생성해야 된다.
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
KafkaProducer
의 send() 메소드는 Future
객체를 반환한다.RecordMetadata
의 비동기 결과를 표현하는 것으로 ProducerRecord가 카프카 브로커에 정상적으로 적재되었는지에 대한 데이터가 포함되어있다.ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
RecordMetadata metadata = producer.send(record).get();
logger.info(metadata.toString());
토픽이름
과 파티션 번호
, 오프셋 번호
가 출력된다. public class Consumer {
private final static Logger logger = LoggerFactory.getLogger(Consumer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String GROUP_ID = "test-group";
public static void main(String... args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
//컨슈머 그룹 ID 선언
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//카프카 컨슈머 선언
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
//해당 bootstrap의 토픽 구독
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
logger.info("{}","start");
while(true){
//초당 데이터를 가져와서
ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));
for(ConsumerRecord<String,String> record : records){
//로그로 뿌리는 역할
logger.info("{}",record.toString());
}
}
}
}
컨슈머 그룹을 기준으로 offset을 관리하기 때문에
subscribe()
메소드를 사용하여 토픽을 구독하는 경우 컨슈머 그룹을 선언해야 한다. 컨슈머가 중단되거나 재시작되더라도 컨슈머 그룹의 컨슈머 오프셋을 기준으로 이후 데이터 처리를 하기 때문이다. 컨슈머 그룹을 선언하지 않으면 어떤 그룹에도 속하지 않는 컨슈머로 동작하게 된다.
토픽의 파티션으로부터 데이터를 가져가기 위해 컨슈머를 운영하는 방법은 크게 2가지가 존재.
예를들어 컨슈머가 파티션 개수보다 많을 경우, 파티션은 컨슈머에 1대1로 할당되고 남는 컨슈머는 데이터를 처리하지 못하고 불필요한 스레드로 남게 된다.
카프카 궁금했는데 써주신 글들 대략 보며 개념을 잡았내요 감사합니다 !