@Component
@RequiredArgsConstructor
public class SettlementTasklet implements Tasklet {
private final GetRegisteredBankAccountPort getRegisteredBankAccountPort;
private final PaymentPort paymentPort;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext){
// 1. order service에서 주문 완료된 주문 내역을 조회한다. (order service)
List<Payment> normalStatusPaymentList = paymentPort.getNormalStatusPayments();
// 2. 각 주문 내역의 franchiseId 에 해당하는 유저 정보(userId)에 대한 뱅킹 정보(계좌번호)를 가져온다. (user service)
Map<String, FirmbankingRequestInfo> franchiseIdToBankAccountMap = new HashMap<>();
for (Payment payment : normalStatusPaymentList) {
RegisteredBankAccountAggregateIdentifier entity = getRegisteredBankAccountPort.getRegisteredBankAccount(payment.getFranchiseId());
franchiseIdToBankAccountMap.put(payment.getFranchiseId()
, new FirmbankingRequestInfo(entity.getBankName(), entity.getBankAccountNumber()));
}
// 3. 각 franchiseId 별로, 정산 금액을 계산한다. (settlement service)
for (Payment payment : normalStatusPaymentList) {
FirmbankingRequestInfo firmbankingRequestInfo = franchiseIdToBankAccountMap.get(payment.getFranchiseId());
double fee = Double.parseDouble(payment.getFranchiseFeeRate());
int caculatedPrice = (int) ((100 - fee) * payment.getRequestPrice() * 100);
firmbankingRequestInfo.setMoneyAmount(firmbankingRequestInfo.getMoneyAmount() + caculatedPrice);
}
// 4. 계산된 금액을 펌뱅킹 요청한다. (펌뱅킹 외부 어댑터)
for (FirmbankingRequestInfo firmbankingRequestInfo : franchiseIdToBankAccountMap.values()) {
getRegisteredBankAccountPort.requestFirmbanking(
firmbankingRequestInfo.getBankName()
, firmbankingRequestInfo.getBankAccountNumber()
, firmbankingRequestInfo.getMoneyAmount());
}
// 5. 정산 완료된 주문 내역은 정산 완료 상태로 변경해준다. (order service)
for (Payment payment : normalStatusPaymentList) {
paymentPort.finishSettlement(payment.getPaymentId());
}
return RepeatStatus.FINISHED;
}
}
@Slf4j
@Configuration
public class KafkaConfiguration {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Autowired
private KafkaProperties kafkaProperties;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// Kafka Consumer Config
@Bean
public ConsumerFactory<String, String> consumerFactory() {
final Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getConsumer().getAutoOffsetReset());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, KafkaProperties.IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
// Retry Template(Kafka 2.8 이상부터 사용함.)
@Bean
public RetryTopicConfigurer retryTopicConfigurer() {
return RetryTopicConfigurer
.builder()
.create(kafkaTemplate);
}
// Retry Template(Kafka 2.8 이상부터 사용하지 않음.)
private RetryTemplate retryTemplate() {~~
RetryTemplate retryTemplate = new RetryTemplate();
// 재시도시 1초 후에 재 시도하도록 backoff delay 시간을 설정한다.
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(1000L);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
// 최대 재시도 횟수 설정
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(2);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;~~
}
// Roll Back Logic
private RecoveryCallback<Object> recoveryCallback() {
return new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) {
ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) context.getAttribute("record");
String topic = "dlt_" + record.topic();
try {
log.warn("[Send to dead letter topic] {} - value: {}.", topic, record.value());
kafkaTemplate.send(topic, record.value().toString());
} catch (Exception e) {
log.error("[Fail to send to dead letter topic]: {}", topic, e);
}
Acknowledgment acknowledgment = (Acknowledgment) context.getAttribute("acknowledgment");
if (acknowledgment != null) {
acknowledgment.acknowledge();
}
return Optional.empty();
}
};
}
// Kafka Listener
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(KafkaProperties kafkaProperties, KafkaTemplate<String, String> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(kafkaProperties));
// factory.setRetryTemplate(retryTemplate()); // Kafka 2.8 이상부터 사용하지 않음.
factory.setRecoveryCallback(recoveryCallback());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
}
consumerFactory
→ Kafka Consumer에 대한 설정 정보를 담아둔다.retryTopicConfigurer
→ Retry Template을 대체하기 위해 만든 Retry Topic 설정 정보이다.retryTopicConfigurer
로 수정하였습니다.recoveryCallback
→ Roll Back에 대한 비즈니스 로직을 만들어두었다.kafkaListenerContainerFactory
→ Kafka Listener가 사용할 수 있도록 설정을 미리 해두었다.@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaConsumer {
private final PaymentRepo paymentRepo;
@KafkaListener(topics = "Payment-topic", containerFactory = "kafkaListenerContainerFactory")
@RetryableTopic(attempts = "2", backoff = @Backoff(delay = 1000))
public void requestFirmBanking(String kafkaMessage) {
// 펌뱅킹을 요청하는 비즈니스 로직 처리
}
}
containerFactory
→ kafkaListenerContainerFactory로 설정이 되어있는데, kafkaListenerContainerFactory는 Kakfa Consumer의 세부 동작 방식을 구성하는데 사용된다.@RetryableTopic(attempts = "2", backoff = @Backoff(delay = 1000))
RetryableTopic
→ Kafka 메시지 처리 중 오류가 발생했을 때 재시도 로직을 구성할 수 있게 도와준다.attempts
→ 재 시도 회수는 최대 2번이다.backoff
→ 다시 요청하는데 지연시킬 시간은 1000ms이다.@Component
@RequiredArgsConstructor
public class SettlementTasklet implements Tasklet {
private final GetRegisteredBankAccountPort getRegisteredBankAccountPort;
private final PaymentPort paymentPort;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext){
// 1. order service에서 주문 완료된 주문 내역을 조회한다. (order service)
List<Payment> normalStatusPaymentList = paymentPort.getNormalStatusPayments();
// 2. 각 주문 내역의 franchiseId 에 해당하는 유저 정보(userId)에 대한 뱅킹 정보(계좌번호)를 가져온다. (user service)
Map<String, FirmbankingRequestInfo> franchiseIdToBankAccountMap = new HashMap<>();
for (Payment payment : normalStatusPaymentList) {
RegisteredBankAccountAggregateIdentifier entity = getRegisteredBankAccountPort.getRegisteredBankAccount(payment.getFranchiseId());
franchiseIdToBankAccountMap.put(payment.getFranchiseId()
, new FirmbankingRequestInfo(entity.getBankName(), entity.getBankAccountNumber()));
}
// 3. 각 franchiseId 별로, 정산 금액을 계산한다. (settlement service)
for (Payment payment : normalStatusPaymentList) {
FirmbankingRequestInfo firmbankingRequestInfo = franchiseIdToBankAccountMap.get(payment.getFranchiseId());
double fee = Double.parseDouble(payment.getFranchiseFeeRate());
int caculatedPrice = (int) ((100 - fee) * payment.getRequestPrice() * 100);
firmbankingRequestInfo.setMoneyAmount(firmbankingRequestInfo.getMoneyAmount() + caculatedPrice);
}
// 5. 정산 완료된 주문 내역은 정산 완료 상태로 변경해준다. (order service)
for (Payment payment : normalStatusPaymentList) {
paymentPort.finishSettlement(payment.getPaymentId());
}
// 4. 계산된 금액을 펌뱅킹 요청한다. (펌뱅킹 외부 어댑터)
for (FirmbankingRequestInfo firmbankingRequestInfo : franchiseIdToBankAccountMap.values()) {
getRegisteredBankAccountPort.requestFirmbanking(
firmbankingRequestInfo.getBankName()
, firmbankingRequestInfo.getBankAccountNumber()
, firmbankingRequestInfo.getMoneyAmount());
}
return RepeatStatus.FINISHED;
}
}