
기존 데이터 시스템의 구조는 각 애플리케이션과 데이터베이스가 end-to-end로 직접 연결되어 있었습니다. 이러한 구조는 간단하지만 각각의 데이터 파이프라인이 분리되어 있어, 요구사항이 증가함에 따라 시스템의 복잡도를 높이는 결과를 가져왔고, 크게 아래와 같은 문제점들이 발생했습니다.
시스템 복잡도의 증가
중앙화된 데이터 전송 영역이 없어, 데이터의 흐름을 파악하기 어렵고, 시스템 관리가 복잡함.
시스템의 일부분에 문제가 발생하면, 연결된 모든 애플리케이션들을 확인해야 함.
데이터 일관성 유지의 어려움
* 데이터가 여러 시스템과 데이터베이스에 분산되어 있는 경우, 한 시스템에서 변경된 데이터가 다른 시스템에 즉시 반영되지 않아 데이터의 일관성을 유지하기 어려움.
데이터 실시간 처리의 어려움
전통적인 메시지 큐 시스템이나 데이터베이스는 대부분 배치 처리 방식을 사용함.
이는 데이터를 실시간으로 처리하는 것이 어렵다는 것을 의미.
확장성 제한
대부분의 전통적인 메시지 큐 시스템은 한정된 리소스 내에서 작동하므로, 대량의 데이터를 처리하는 데 제한이 있음.
데이터의 양이 증가하면서 시스템을 확장해야 하는 상황에서 이런 제한이 큰 문제가 될 수 있음.
아파치 카프카(Apache Kafka)는 이런 문제점들을 해결하기 위해 링크드인에서 개발되었고, 현재는 Apache Software Foundation의 오픈 소스 프로젝트로 유지 관리되는 분산 스트리밍 플랫폼입니다.
프로듀서(Producer)
프로듀서는 Kafka에 메시지를 발행하는 역할을 하는 컴포넌트입니다.
프로듀서는 다양한 데이터 소스로부터 데이터를 가져와 Kafka의 특정 토픽에 메시지를 발행합니다.
브로커(Broker)
브로커는 Kafka의 핵심 서버 컴포넌트로, 프로듀서로부터 메시지를 받아서 저장하고, 컨슈머에게 메시지를 전달하는 역할을 합니다.
Kafka 클러스터는 여러 브로커들로 구성되며, 각 브로커는 하나 이상의 토픽의 메시지를 저장하고 관리합니다.
토픽(Topic)
토픽은 Kafka에서 데이터를 분류하는 단위입니다.
프로듀서는 메시지를 특정 토픽에 발행하고, 컨슈머는 토픽을 구독하여 메시지를 소비합니다. 토픽은 여러 파티션으로 나뉘어질 수 있고, 이를 통해 데이터를 병렬로 처리할 수 있습니다. 각 파티션은 순서가 보장된 메시지 스트림을 제공하며, 브로커가 클러스터 내에서 파티션을 분산하여 저장합니다.
컨슈머(Consumer)
컨슈머는 Kafka의 특정 토픽을 구독하고, 해당 토픽의 메시지를 소비하는 역할을 하는 컴포넌트입니다.
컨슈머는 하나 이상의 토픽을 구독할 수 있으며, 토픽의 파티션을 동시에 소비할 수 있습니다.
// kafka
implementation 'org.springframework.kafka:spring-kafka'
2 카프카서버
카프카란 것은 우리가 스프링과 외부의 DB (mysql)같은것을 연결해서 사용하듯이 외부의 카프카 서비스를 사용하는것 예시에서는 ms에서 제공하는 카프카 서비스를 이용하는 예시
서버를 하나 파서 도커로 카프카이미지 돌려서 손쉽게 구동할수도있음
3 yml설정
spring:
kafka:
producer:
bootstrap-servers: developeryhub.servicebus.windows.net:9093 # 브로커 주소
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 키값 직렬화방법
value-serializer: org.apache.kafka.common.serialization.StringSerializer #벨류값 직렬화방법
properties: #연결서버 인증방법 상황에 맞게 설정해줘야함 ms제공 카프카에서는 이런식으로 하라고 되어있음
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://developeryhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=7MeGYZsuZ6hVYxtKv9wW+KW0o7qUJokz88rBw=";
sasl.mechanism: PLAIN
security.protocol: SASL_SSL
컨트롤러
@RestController
@RequestMapping("/kafka")
public class TestController {
@Autowired
TestService service;
@GetMapping("/insert")
public String insert() {
return service.insert();
}
}
서비스
@Slf4j
@Service
public class TestService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public String insert( ) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("hub1", LocalDateTime.now().toString());
future.addCallback(successCallback -> {
log.info("[producer] successCallback. offset: " + successCallback.getRecordMetadata().offset() + "partition: " + successCallback.getRecordMetadata().partition());
},
errorCallback -> {
log.error("[producer] errorCallback. msg: " + errorCallback.getMessage());
}
);
return "{}";
}
}
기본적으로 yml파일에 카프카 설정이 있으면 카프카 사용한다고 생각하고 가장 기본의 KafkaTemplate 빈으로 등록해줘서 바로사용가능 <String, String>은 아까 설정에서 스트링 직렬화 사용한다고 했기때문에 그것으로 등록
kafkaTemplate.send("hub1", LocalDateTime.now().toString()); 에서 "hub1" 카프카 서버에 등록해둔 토픽(그룹) LocalDateTime.now().toString()보내고싶은 데이터 넣어줌
ListenableFuture<SendResult<String, String>> future 여기서 kafkaTemplate.send는 비동기로 동작하기에 일단 값을 받아두고 나중에 해결하기위한 타입
future.addCallback 결과가 어떤지에 따라서 화살표함수 동작을 작성해줄곳 첫번째 매개변수 = 성공, 두번째 매개변수 = 실패 성공의 경우 successCallback에서 이런저런 값 꺼낼 수 있음
localhost:8080/kafka/insert/ 여러번 요청 보내보면
결과 뽑을 수 있음
partition은 카프카에 해당 토픽에 대한 파티션 개수를 내가 설정해줘서(예시에서는 2개) 같은 요청을 보낼때 해당 토픽에 요청이 골고루 들어가는것을 확인가능 (0,1)
offset은 해당 토픽의 파티션에 오프셋 값은 '파티션 내에서' 고유하고, 순차적으로 표기됩니다. 테이블의 pk (id) 개념과 비슷하다고 볼 수 있습니다. 예시에서는 몇번 시도를 미리 해둬서 조금 높은숫자부터 각파티션에서 독립적으로 1씩 증가하는걸 볼 수 있습니다.
spring:
kafka:
producer:
bootstrap-servers: developeryhub.servicebus.windows.net:9093 # 브로커 주소
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://developeryhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=7MeGYZsuZ6hVYxtKv9wW+KW0o7qUJokz88rBw=";
sasl.mechanism: PLAIN
security.protocol: SASL_SSL
# 같은 프로젝트에서 컨슈머 설정할거라yml에 컨슈머만 추가 프로듀서쪽이랑 의미는 같음
consumer:
bootstrap-servers: developery-kafka.servicebus.windows.net:9093 # 브로커 주소
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
yml 컨슈머 추가
@Slf4j
@Service
public class KafkaConsumer {
@KafkaListener(topics = "hub1", groupId = "myGroup1")
public void consume(String message) throws IOException {
log.info("[consumer] Consumed message : {}", message);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
log.info("end");
}
}
컨슈머 서비스 클래스 추가
@KafkaListener(topics = "hub1", groupId = "myGroup1")이부분이 중요한데 topics = "hub1"카프카의 어떠한 토픽을 읽을지, groupId = "myGroup1"컨슈머 그룹이름설정
이렇게 하면 카프카 hub1 토픽에 메시지가 들어오면 자동으로 아래 함수가 실행되는것!
프로듀서쪽에서 LocalDateTime.now().toString())스트링 값으로 보내줬기때문에 public void consume(String message)함수의 매개변수타입 스트링으로
저 함수안에서 받은 메시지로 작업할거 작업하고 DB에 넣고 하는것들 다 해주면 됨 예시로 sleep넣어줌
localhost:8080/kafka/insert/ 여러번 요청 보내보면 위와같은 결과 확인 가능
다만 지금 상황에서는 문제가 있는데 4번의 요청을 연타해서 보내면 public void consume함수는 sleep때문에 첫번째 요청-> 5초작업 ->두번째 요청 ->5초작업 ...
싱글 스레드인 지금 상황에서는 마지막 줄까지 출력되는데 20초가 걸리는 상황
방법 1 컨슈머스프링서버 여러개 띄우기
스프링부트를 여러개 띄우면 각서버가 토픽의 파티션개수 (예시에서는 2개)에 맞게 분할해서 붙기때문에 지금 예시에서는 서버2개를 띄우면 토픽의 각 파티션에 각서버거 붙어서 한번에 2개씩 일 처리 가능하다 (실무에서는 더 많은 파티션 사용함)
방법 2 스레드 늘리기
물론 파티션 10개 사용한다고 서버를 10개를 돌릴수는 없기때문에 하나의 스프링부트에서 카프카 리스너의 스레드를 여러개 돌릴수있도록 설정함
스레드를 늘리기위해서는 카프카 config로 빈으로 수동설정해서 등록해줘야함
@Configuration
@AllArgsConstructor
public class KafkaConfig {
//우리가 사용할 카프카 메시지 키벨류값
final ConsumerFactory<String, Object> consumerFactory; // 자동 주입됨.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(2); // 이곳의 숫자만큼 consumer thread가 구동됨.
ContainerProperties properties = factory.getContainerProperties();
properties.setAckMode(AckMode.MANUAL); // 실무에서는 auto commit 대신 매뉴얼commit을 이용.
return factory;
}
}
주석참조
properties.setAckMode(AckMode.MANUAL); 기본값은 auto로 카프카쪽에서 컨슈머 쪽으로 메시지 보내면 그냥 끝 다음꺼 보낼 수 있으면 보냄
AckMode.MANUAL로 하면 컨슈머쪽에서 리턴값 넘겨줘야지 다음작업함
@Slf4j
@Service
public class KafkaConsumer {
@KafkaListener(topics = "hub1", groupId = "myGroup1")
public void consume(@Payload String message,
@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment,
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
log.info("[consumer] Consumed key: {}, partition: {}, offset: {}, topic: {}, ts: {}, message : {}",
key, partition, offset, topic, ts, message);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
acknowledgment.acknowledge(); // commit
log.info("end");
}
}
@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment,이런식으로 추가적인 값들을 받을 수 있고 아까 AckMode.MANUAL로 수동설정해줬기때문에 Acknowledgment acknowledgment이건 꼭 받고나서 다 끝나면 acknowledgment.acknowledge(); // commit해줘야함 안해주면 카프카 큐애서 안빠짐
처음에 스프링 부트 구동 시키면 설정한데로 스레드 1개는 토픽의 파티션 1번으로 다른 1개를 토픽의 파이션2번으로 붙은 설정 확인할 수 있음
이번에 요청을 2연타 해보면 2개가 별개로 동작하는걸 확인할 수 있음
만약 오류가 중간에 발생하면 어떻게 될까
@Slf4j
@Service
public class KafkaConsumer {
..
.
.
if (true) throw now RuntimeException("myexce") //강제 오류 추가
.
.
이 코드를 넣고 한번의 요청을 보내면
x10
위의 오류가 10번뜨게된다 이유는 카프카에서는 수동설정일때 acknowledgment.acknowledge();가 돌아오지않으면 내부적으로 10번의 재시도를 하는것이 기본값이다.
이를 해결해보자
KafkaConfig 수정
@Slf4j
@Configuration
@AllArgsConstructor
public class KafkaConfig {
final ConsumerFactory<String, Object> consumerFactory; // 자동 주입됨.
final KafkaTemplate<String, Object> kafkaTemplate;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(2); // 이곳의 숫자만큼 consumer thread가 구동됨.
// deprecated 인데 예제들이 대부분 이거라서...
@SuppressWarnings("deprecation")
final DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, exception) -> {
// 에러 핸들러에서 지정한 재시도만큼 모두 최종 실패, 이곳이 실행됨. 이곳에서 어느 토픽으로 보낼지를 리턴해야함.
log.info("run recoverer. original record: {}, exception: {}", record, exception);
// DLQ 용도의 토픽을 만들어서 리턴함. 보통 .dlt 를 suffix 로 하고, record.partition 즉 동일 파티션으로 지정하나
// 통일성있게 규칙을 만들어서 따르면 됨.
return new TopicPartition(record.topic() + ".dlt", 0);
});
factory.setRetryTemplate(retryTemplate());
// 아래 setRecoveryCallback 은 안해도 잘 동작함.
// retryTemplate 이 모두 실패시 콜백을 받아서 뭔가 처리하고 싶을때 구현하면 됨.
factory.setRecoveryCallback((context -> {
if(context.getLastThrowable().getCause() instanceof RecoverableDataAccessException){
log.info("여기는 호출이 되지 않더군..");
}
else{
log.error("retryTemplate의 최대 재시도횟수까지 해도 실패한 경우 여기가 호출됨. setRecoveryCallback. context: " + context);
// SeekToCurrentErrorHandler 을 쓰지 않을 경우, 아래 recoverer.accept() 코드 필요
recoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
(Exception)context.getLastThrowable());
// exception을 throw 해야지 최종적으로 commit 이 수행되어 해당 offset 을 무시하고 다음 offset을 시도함.
// 에러 핸들러가 동작하기 위해서는 exception throw 해야함. 그래야 DLQ로 메시지 넘어감.
throw new RuntimeException(context.getLastThrowable().getMessage());
}
return null;
}));
// retry template 를 반복하는 횟수. 1인 경우 (1)+1=2회, 3인 경우 (1)+3 = 4회,
// 1000L은 retry template 간 interval.
// ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(1000L, 1L));
// factory.setErrorHandler(errorHandler);
// SeekToCurrentErrorHandler 쓰지 않는다면 아래처럼 일반적인 에러 핸들러를 만들고 로깅만 하면 됨.
factory.setErrorHandler((exception, record) -> {
log.info("errorHandler. exception: {}, record: {}", exception, record);
});
ContainerProperties properties = factory.getContainerProperties();
properties.setAckMode(AckMode.MANUAL); // 실무에서는 auto commit 대신 매뉴얼 commit을 이용.
return factory;
}
private RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(500L); // 500ms 쉬었다가 ...
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(5); // 최대 5번까지 재시도
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
}
실무에서는 실패한 요청 바로 재시도해도 안될가능성이 높기때문에 몇초기다리는 옵션 사용
1. RetryTemplate함수
2. factory.setRetryTemplate(retryTemplate()); 넣어주면 적용완료
3. 이상태로만 사용하면 바로 적용안됨, 실패했을때 어떻게 할것인가 exception handler등 이거 다해줘야적용가능
4. final KafkaTemplate<String, Object> kafkaTemplate; 카프카 프로듀서 만들었을때 그것
5. @SuppressWarnings("deprecation") recoverer 설정 추가해줌 재시도까지 다하고나서도 실패했을때 어떻게 할것인지
지금 deprecated인데 최신은 옆쪽 블로그 참조 (https://sejoung.github.io/2022/02/2022-02-10-dead_letter_queue/#spring-kafka-dead-letter-queue-%EC%84%A4%EC%A0%95)
6. factory.setRecoveryCallback으로 완전히 실패시 동작할 함수 recoverer 만든것(.dlt 토픽을 전송) 실행
7. factory.setErrorHandler 최종 에러헨들러 마지막에 동작
이동된 토픽에서 다시 요청받을 컨슈머
@KafkaListener(topics = "hub1.dlt", groupId = "myGroup1")
public void dltConsume(@Payload String message,
@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment,
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
log.info("[dltConsume] dltConsumed key: {}, partition: {}, offset: {}, topic: {}, ts: {}, message : {}",
key, partition, offset, topic, ts, message);
acknowledgment.acknowledge(); // commit
log.info("end");
}
카프카에서 .dlt로 새로 만들어줍니다.
localhost:8080/kafka/insert/로 기존꺼 오류뜨는 코드로 요청
23:11:14.736 : [producer] successCallback. partition: 0, offset: 50
23:11:14.742 : [consumer] Consumed key: k:0, partition: 0, offset: 50, topic: hub1, ts: 1638799872909, message : 2021-12-06T23:11:11.183551900
23:11:15.253 : [consumer] Consumed key: k:0, partition: 0, offset: 50, topic: hub1, ts: 1638799872909, message : 2021-12-06T23:11:11.183551900
23:11:15.764 : [consumer] Consumed key: k:0, partition: 0, offset: 50, topic: hub1, ts: 1638799872909, message : 2021-12-06T23:11:11.183551900
23:11:16.275 : [consumer] Consumed key: k:0, partition: 0, offset: 50, topic: hub1, ts: 1638799872909, message : 2021-12-06T23:11:11.183551900
23:11:16.790 : [consumer] Consumed key: k:0, partition: 0, offset: 50, topic: hub1, ts: 1638799872909, message : 2021-12-06T23:11:11.183551900
23:11:16.792 : retryTemplate의 최대 재시도횟수까지 해도 실패한 경우 여기가 호출됨. setRecoveryCallback. context: [RetryContext: count=5, lastException=org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.developery.azure.KafkaConsumer.consume(java.lang.String,org.springframework.kafka.support.Acknowledgment,java.lang.String,int,long,java.lang.String,long)' threw exception; nested exception is java.lang.RuntimeException: myException, exhausted=false]
23:11:16.802 : run recoverer. original record: ConsumerRecord(topic = hub1, partition = 0, leaderEpoch = null, offset = 50, CreateTime = 1638799872909, serialized key size = 3, serialized value size = 29, headers = RecordHeaders(headers = [], isReadOnly = false), key = k:0, value = 2021-12-06T23:11:11.183551900), exception: {}
23:11:17.002 : errorHandler. exception: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is java.lang.RuntimeException: Listener method 'public void com.developery.azure.KafkaConsumer.consume(java.lang.String,org.springframework.kafka.support.Acknowledgment,java.lang.String,int,long,java.lang.String,long)' threw exception; nested exception is java.lang.RuntimeException: myException, record: ConsumerRecord(topic = hub1, partition = 0, leaderEpoch = null, offset = 50, CreateTime = 1638799872909, serialized key size = 3, serialized value size = 29, headers = RecordHeaders(headers = [], isReadOnly = false), key = k:0, value = 2021-12-06T23:11:11.183551900)
23:11:17.225 : [dltConsume] dltConsumed key: k:0, partition: 0, offset: 15, topic: hub1.dlt, ts: 1638799877000, message : 2021-12-06T23:11:11.183551900
23:11:17.226 : end