*ReactiveElasticsearchOperations, ReactiveElasticsearchTemplate을 사용하면서 발생한 이슈입니다!
정확히는 Mono, Flux 에 대한 개념이 부족해서 발생한 이슈이지만,,
template.bulkUpdate(chunkedQueryModels, IndexCoordinates.of(esStatisticType.indexName))
.retryWhen(Retry.backoff(3, Duration.ofMillis(100)))
.doOnError { e ->
log.error(
"[${esStatisticType.indexName}] 통계 데이터 ES UPDATE 중 에러 발생 - ${chunkedQueryModels.size}건 - $chunkedQueryModels",
e
)
}
MySQL -> ES 로 데이터를 마이그레이션 하는 배치인데, insert update 모두 0이다.
(참고로, insert가 아니면 update에 표기되므로 두 개의 합은 항상 select와 동일해야한다.)
template.bulkUpdate(chunkedQueryModels, IndexCoordinates.of(esStatisticType.indexName))
.retryWhen(Retry.backoff(3, Duration.ofMillis(100)))
.doOnError { e ->
log.error(
"[${esStatisticType.indexName}] 통계 데이터 ES UPDATE 중 에러 발생 - ${chunkedQueryModels.size}건 - $chunkedQueryModels",
e
)
}.awaitFirstOrNull()
await 동작이 추가된게 전부이다.
template가 ReactiveElasticsearchOperations 이기 때문에
bulkUpdate() 메소드의 반환형은 Mono<Void> 인데 subscribe() 가 없기 때문에 동작이 완료되기 전에 건너뛰어 버린 것.
이후에 알게된 문서이지만, ReactiveElasticsearchOperations를 사용할 때는 반드시 subscribe() 를 해주라더라,,
Reactive 프로그래밍과 Mono, Flux에 대해 더 공부해보게 된 케이스이다.
Reference :
https://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/