(Addendum) When acks is set to all, min.insync.replicas should also be raised to 2 or more; otherwise, once followers drop out of the ISR, the leader alone can still acknowledge the write.
# Build Kafka from source (only needed when running from a source checkout rather than a released binary)
./gradlew jar -PscalaVersion=2.13.5

# Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start the brokers; each broker needs its own copy of server.properties
# with a unique broker.id, listener port, and log.dirs
bin/kafka-server-start.sh config/server.properties
bin/kafka-server-start.sh config/server.properties1
bin/kafka-server-start.sh config/server.properties2
spring:
  kafka:
    producer:
      properties:
        min:
          insync:
            replicas: 2
kafka:
  bootstrapAddress: localhost:9092,localhost:9093
  admin:
    properties:
  topic:
    alarm:
      name: alarm
      replicationFactor: 2
      numPartitions: 2
  consumer:
    alarm:
      rdb-group-id: createAlarmInRDB
      redis-group-id: publishInRedis
    autoOffsetResetConfig: latest
  producer:
    acksConfig: all
    retry: 3
    enable-idempotence: true
    max-in-flight-requests-per-connection: 3
@Configuration
public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value("${kafka.topic.alarm.name}")
    private String topicName;

    @Value("${kafka.topic.alarm.numPartitions}")
    private String numPartitions;

    @Value("${kafka.topic.alarm.replicationFactor}")
    private String replicationFactor;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    /**
     * The replication factor is 2 because only two brokers are configured.
     * The partition count is also 2 because at most two WAS instances (the consumers of this event) are run;
     * setting more partitions than consumers would not make consumption any faster.
     */
    @Bean
    public NewTopic newTopic() {
        return new NewTopic(topicName, Integer.parseInt(numPartitions), Short.parseShort(replicationFactor));
    }
}
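Relating back to the addendum about min.insync.replicas above: since it is a broker/topic-level setting, it can also be attached to the topic itself when the topic is created. Below is a minimal sketch, assuming spring-kafka 2.3+ so that TopicBuilder is available; it is not part of the original configuration, and the hard-coded values simply mirror the settings above.

@Bean
public NewTopic alarmTopicWithMinIsr() {
    // Topic-level override: with a replication factor of 2, both replicas must be in sync
    // before a produce with acks=all is acknowledged.
    return TopicBuilder.name("alarm")
            .partitions(2)
            .replicas(2)
            .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") // org.apache.kafka.common.config.TopicConfig
            .build();
}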
@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.bootstrapAddress}")
    private String bootstrapServers;

    /**
     * acks=all: the broker acknowledges a record only after it has been written to every
     * in-sync replica. This is the slowest option, but it gives the strongest guarantee
     * that a produced event is not lost.
     */
    @Value("${kafka.producer.acksConfig}")
    private String acksConfig;

    @Value("${kafka.producer.retry}")
    private Integer retry;

    @Value("${kafka.producer.enable-idempotence}")
    private Boolean enableIdempotence;

    @Value("${kafka.producer.max-in-flight-requests-per-connection}")
    private Integer maxInFlightRequestsPerConnection;

    /**
     * enable.idempotence=true requires retries greater than 0, acks=all,
     * and max.in.flight.requests.per.connection of 5 or less.
     */
    @Bean
    public ProducerFactory<String, AlarmEvent> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.ACKS_CONFIG, acksConfig);
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(ProducerConfig.RETRIES_CONFIG, retry);
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequestsPerConnection);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, AlarmEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
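For reference, a producer that publishes AlarmEvent through this KafkaTemplate could look like the sketch below. The AlarmProducer class and the choice of userId as the record key are assumptions for illustration and are not part of the original code.

@Component
@RequiredArgsConstructor
public class AlarmProducer {

    private final KafkaTemplate<String, AlarmEvent> kafkaTemplate;

    @Value("${kafka.topic.alarm.name}")
    private String topicName;

    public void send(AlarmEvent alarmEvent) {
        // Keying by userId (an assumption) keeps all events for one user on the same partition,
        // so they are consumed in order by a single consumer in each group.
        kafkaTemplate.send(topicName, String.valueOf(alarmEvent.getUserId()), alarmEvent);
    }
}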
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrapAddress}")
    private String bootstrapServers;

    @Value("${kafka.consumer.autoOffsetResetConfig}")
    private String autoOffsetResetConfig;

    @Value("${kafka.consumer.alarm.rdb-group-id}")
    private String rdbGroupId;

    @Value("${kafka.consumer.alarm.redis-group-id}")
    private String redisGroupId;

    @Bean
    // @Qualifier("alarmEventRDBConsumerFactory")
    public ConsumerFactory<String, AlarmEvent> alarmEventRDBConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, rdbGroupId);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
        return new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(),
                new JsonDeserializer<>(AlarmEvent.class));
    }

    @Bean
    public ConsumerFactory<String, AlarmEvent> alarmEventRedisConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, redisGroupId);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
        return new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(),
                new JsonDeserializer<>(AlarmEvent.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, AlarmEvent> kafkaListenerContainerFactoryRDB() {
        ConcurrentKafkaListenerContainerFactory<String, AlarmEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(alarmEventRDBConsumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);
        // ContainerProperties containerProperties = factory.getContainerProperties();
        // containerProperties.setIdleBetweenPolls(1000);
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, AlarmEvent> kafkaListenerContainerFactoryRedis() {
        ConcurrentKafkaListenerContainerFactory<String, AlarmEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(alarmEventRedisConsumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);
        return factory;
    }
}
Next, implement the logic that runs when an event is consumed.
@KafkaListener is resolved against a specific ConcurrentKafkaListenerContainerFactory (named via the containerFactory attribute), so the listeners are written with that in mind.
The same produced event has to be consumed in two ways (written to the RDB and published to Redis), so two consumer groups are defined and a separate KafkaListener is registered for each.
To acknowledge offsets explicitly inside a KafkaListener, the container factory must be configured with
factory.getContainerProperties().setAckMode(AckMode.MANUAL); without it, a serialization-related error occurs.
@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmConsumer {

    private final AlarmService alarmService;

    /**
     * auto.offset.reset is overridden to earliest per listener via the properties attribute.
     * AUTO_OFFSET_RESET_CONFIG is statically imported from org.apache.kafka.clients.consumer.ConsumerConfig.
     * https://stackoverflow.com/questions/57163953/kafkalistener-consumerconfig-auto-offset-reset-doc-earliest-for-multiple-listene
     */
    @KafkaListener(topics = "${kafka.topic.alarm.name}", groupId = "${kafka.consumer.alarm.rdb-group-id}",
            properties = {AUTO_OFFSET_RESET_CONFIG + ":earliest"}, containerFactory = "kafkaListenerContainerFactoryRDB")
    public void createAlarmInRDBConsumerGroup(AlarmEvent alarmEvent, Acknowledgment ack) {
        log.info("createAlarmInRDBConsumerGroup");
        alarmService.createAlarm(alarmEvent.getUserId(), alarmEvent.getType(), alarmEvent.getArgs());
        ack.acknowledge();
    }

    @KafkaListener(topics = "${kafka.topic.alarm.name}", groupId = "${kafka.consumer.alarm.redis-group-id}",
            properties = {AUTO_OFFSET_RESET_CONFIG + ":earliest"}, containerFactory = "kafkaListenerContainerFactoryRedis")
    public void redisPublishConsumerGroup(AlarmEvent alarmEvent, Acknowledgment ack) {
        log.info("redisPublishConsumerGroup");
        alarmService.send(alarmEvent.getUserId(), alarmEvent.getEventName());
        ack.acknowledge();
    }
}
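For context, the AlarmEvent payload handled by JsonSerializer/JsonDeserializer would look roughly like the sketch below. The field types (and the AlarmType/AlarmArgs names) are assumptions inferred from the getters used above, since the original class is not shown here; the no-args constructor matters because JsonDeserializer (Jackson) needs it to instantiate the object.

@Getter
@NoArgsConstructor
@AllArgsConstructor
public class AlarmEvent {
    private Long userId;       // assumed type; consumed via getUserId()
    private AlarmType type;    // hypothetical enum; consumed via getType()
    private AlarmArgs args;    // hypothetical value object; consumed via getArgs()
    private String eventName;  // assumed type; consumed via getEventName()
}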