public void process() {
consume().flatMap(this::recordToNotifyObject)
.groupBy(Message::key)
.flatMap(flux -> flux.buffer(Duration.ofSeconds(30)))
.flatMap(this::notify)
.flatMap(this::saveNotifyResult)
.subscribe();
}
webflux , kafka consume을 통해 받는데 메세지 모아서 보내기 위해서 buffer 사용함.
❓ 근데 이러면 저 시간 30초동안 delay가 생기는가? 그럼 webflux를 쓰는 이유가 비동기인데 이렇게 되면 동기적으로 변하는건가?
buffer(Duration.ofSeconds(30)) => 30초 동안 메시지들을 모으고 그룹화된 각 키에 대한 메시지들을 한 번에 처리함
이 부분은 실제로 메시지 수집 동안 다른 작업이 진행될 수 없는 것처럼 보일 수 있지만, 전체적인 WebFlux의 비동기 및 논블로킹 특성에는 영향을 주지 않습니다.
WebFlux 프레임워크는 반응형 프로그래밍을 가능하게 해서, I/O 작업(예를 들어 Kafka에서 메시지를 소비하는 것)이 블로킹 되지 않고, 시스템의 다른 부분이 계속해서 동작할 수 있도록 합니다. buffer의 사용은 단지 특정 시간 동안 메시지를 모으고 배치로 처리하기 위한 것이며, 이 기간 동안 다른 비동기 작업들은 계속 진행될 수 있습니다. 예를 들어, 다른 네트워크 요청을 처리하거나, 다른 메시지 키에 대한 처리를 동시에 진행할 수 있습니다.
즉, buffer를 사용하는 동안 특정 메시지 스트림의 처리가 지연될 수는 있지만, 이것이 전체 시스템의 비동기적 특성을 동기적으로 바꾸는 것은 아닙니다. 다른 이벤트 처리는 계속해서 비동기적으로 진행되므로, WebFlux를 사용하는 주된 이유인 시스템의 반응성과 비동기성은 그대로 유지됩니다.
여기서 groupBy(Message::key) 메소드는 메시지들을 그들의 키에 따라 그룹화합니다. 이렇게 하면 각 키별로 별도의 스트림이 생성되고, 각 스트림은 독립적으로 buffer(Duration.ofSeconds(30))에 의해 관리됩니다. 이는 각 키별로 30초 동안 메시지를 모으고, 그 기간이 끝나면 한 번에 모인 메시지들을 처리하는 것을 의미합니다.
이 과정에서, 각 키별 스트림은 독립적으로 동작하므로, 한 키에 대한 메시지 처리가 다른 키의 메시지 처리와 상호 간섭하지 않습니다. 하지만 특정 키의 스트림 내에서는 메시지들이 30초간 모이는 동안 추가적인 작업을 기다리게 됩니다. 즉, 그룹화된 키에 해당하는 메시지 스트림은 '배치 처리' 방식으로 동기적인 작업 패턴을 보이는 셈입니다. 그러나 이것은 그룹화된 스트림 내에서만 해당되며, 전체 애플리케이션 또는 시스템의 다른 비동기 작업에는 영향을 미치지 않습니다.
따라서, 동일한 Message::key 값을 가진 메시지들은 같은 그룹 내에서 30초 간격으로 처리되기 때문에, 이들은 동기적으로 배치 처리되는 것처럼 보일 수 있습니다. 하지만 전체 프로세스는 비동기적이며, WebFlux 프레임워크의 논블로킹 특성은 계속 유지됩니다.