Java Reactor 에 대해 공부가 더 필요하지만 제가 알고 있는 수준에서 정리해보았습니다.
공식문서 참고하였으나 틀린 지식이 있을 수 있으니 유의해주시면 감사하겠습니다.
전체 요구사항은 이보다 더 많지만 간략하게 정리하고자 한다.
- 사용자는 추천경로 목록 중 원하는 경로를 선택한다.
- 원하는 경로의 막차도착예정정보를 조회한다.
- 각 구간의 이동수단에 따라 해당 수단의 막차도착예정정보를 조회한다.
- 막차도착예정정보에 따라 막차알림 이벤트를 발행한다.
아래와 같이 역할에 따라 계층을 분리하였다.
막차알림 파사드
막차도착예정정보 조회 서비스
를 호출한다.막차도착예정정보
에 따라 막차알림 이벤트
를 발행한다.막차도착예정정보 조회 서비스
버스 막차정보 조회 클라이언트
를 호출한다.지하철 막차정보 조회 클라이언트
를 호출한다.기차 막차정보 조회 클라이언트
를 호출한다.버스 막차정보 조회 클라이언트
버스 막차정보 API
를 호출한다.지하철 막차정보 조회 클라이언트
지하철 막차정보 API
를 호출한다.기차 막차정보 조회 클라이언트
기차 막차정보 API
를 호출한다.위와 같은 설계대로라면 Client Layer 에서 각 대중교통 타입에 따라
대중교통 API 를 호출하는 로직들이 있어야 한다.
하지만 대중교통 또한 도메인으로 묶여있었다.
대중교통 정보를 영속화 & 캐싱하고 있었고, API 로 정보제공도 하고 있고, 여러 지자체 API 를 거치다보니 인터페이스화 되어있었다.
더군다나 사실 따지고 보면 대중교통도 도메인이 맞긴하다.
현재 컨벤션 상 아래 규칙으로 인해 Client Layer 에서 대중교통 도메인을 호출할 수 없었다.
Facade 의미에 맞게 Facade 에서만 다른 도메인을 의존하도록 강제하였다.
그렇다면 어떻게 할 수 있을까,,,, 고민하던 찰나에 이벤트 발행 & 채널 개념을 사용할 수 있지 않을까 라고 생각들었고 이에 대해서 구현해보고자 찾아보았다.
도메인 모델, 포트 어댑터 패턴, 마이크로서비스,,,,
여러 설계안이 있을 것이다.
하지만 위 설계안 모두 도메인 기반 설계이다.
도메인 기반 설계에서는 도메인 간 이벤트 발행 및 큐잉이 필연적이라고 생각한다.
특히 마이크로서비스에서는,,,,
가장 대표적인 예시가 애그리거트 모델이다.
애그리거트 모델에서는 애그리거트 루트의 값이 변경됨에 따라
이벤트 발행하여 값이 변경되었음을 알린다.
이에 따라 구독자들을 이를 탐지하고 각 유스케이스에 따라 처리를 한다.
( 가령
주문
도메인에 대해환불 유스케이스
처리한 경우
환불 이벤트
발행을 하여 이에 관련된구독권한
을 해지한다던가,,, )
그렇다면 도메인 간 정보를 주고 받고자 한다면 어떻게 해야할까?
Kafaka 나 Redis PUB/SUB 을 사용하는 경우가 있을 것이다.
Kafaka 나 Redis PUB/SUB 에서는 발행자와 구독자가 이벤트 발행과 채널(카프카에서는 브로커)을 통해 데이터를 주고 받는다.
하지만 Kafaka 나 Redis PUB/SUB 에 대해서는 아직 기술 성숙도가 낮았다.
이에 따라 스프링부트의 ApplicationEventPublisher 와 FluxSink 를 사용하여 구현해보았다.
ApplicationEventPublisher 에 대해서는 아래 링크를 참고해보세요.
Mono.create() & MonoSink 도 아래에서 설명할 FluxSink 와 비슷하게 처리됩니다. 아래 링크를 참고해보세요.
Mono / Flux 생성 방법에 따라 처리방법을 조작하거나 결과값을 담아 보낼 수 있다.
해당 방법을 통해 Redis 채널 역할을 할 수 있다.
그 전에 결과값에 대해 제어할 수 있는 FluxSink 를 알아보자.
다운스트림 구독자를 래퍼 API로 감싸서 0 또는 1의 onError/onComplete가 뒤따르는 다음 신호를 원하는 개수만큼 방출할 수 있다.
onNext
signal.)create 를 사용하면 각 단계마다 값을 여러 개 생산하는 Flux 를 만들 수 있으며, 심지어 멀티스레드로도 가능하다.
이 메서드는 next, error, complete 메서드를 가지고 있는 FluxSink 를 노출하고 있다. 콜백에서 멀티스레드 기반 이벤트를 트리거할 수 있다.
Flux.create(fluxSink -> {
fluxSink.next(1);
fluxSink.next(2);
fluxSink.complete();
}).subscribe(...)
처럼 create
를 사용하면 Consumer
내부에서
next
로 내려줄지complete
완료 시그널을 줄 지error
에러 시그널을 줄 지 프로그래밍 적으로 결정할 수 있다.create 는 비동기 API와 함께 사용할 수 있다고 해서, 코드를 병렬화해 주거나 비동기로 만들어 주지는 않는다
람다 내에서 블로킹하면 교착 상태나 이와 유사한 부작용을 경험할 것이다.
람다에서 오랫동안 블로킹하고 있으면 ( sinke.next(t) 를 호출하는 무한 루프 등) 파이프라인이 잠겨 버릴 수 있다: 요청을 수행해야 할 스레드에서 루프를 실행하고 있기 때문에 요청을 수행할 수 없다. 이때는 subscribeOn(Scheduler, false) 메소드를 사용해라: create 는 requestOnSeparateThread = false 면 Scheduler 스레드를 사용하고, 기존 스레드에서 request 를 수행하기 때문에 데이터 흐름을 멈추지 않는다.
Flux create 는 Subscriber가 요청한 개수보다 Publisher 가 더 많은 데이터를 방출할 수 있다.
Flux<Integer> flux = Flux.create( (FluxSink<Integer> sink) -> {
// Subscriber가 요청한 것보다 3개 더 발생
sink.onRequest(request -> {
for (int i = 1; i <= request + 3 ; i++) {
sink.next(i);
}
});
});
이 코드는 Subscriber가 요청한 개수보다 3개 데이터를 더 발생한다.
이 경우 어떻게 될까?
기본적으로 Flux.create()로 생성한 Flux는 초과로 발생한 데이터를 버퍼에 보관한다.
버퍼에 보관된 데이터는 다음에 Subscriber가 데이터를 요청할 때 전달된다.
요청보다 발생한 데이터가 많을 때 선택할 수 있는 처리 방식은 다음과 같다.
Flux.create()의 두 번째 인자로 처리 방식을 전달하면 된다.
Flux.create(sink -> { ... }, FluxSink.OverflowStrategy.IGNORE);
limitRate() 를 사용하여 다운스트림 요청을 분할 할 수 있다.
예를 들어, limitRate(10) 에 100 을 요청하게 되면 업스트림으로 10 요청을 최대 10 개 전파한다.
fetchMetroLastDeparture(requestDto)
.limitRate(10) // Request 10 items at a time
.doOnNext(item -> {
// process each item
})
.subscribe();
create() 는 비동기 멀티스레드 Flux 를 생성하는 반면, 동기 Flux 를 생성하고자 한다면 generate() 를 사용해야한다.
Flux<String> flux = Flux.generate(
() -> 0, // (1)
(state, sink) -> {
sink.next("3 x " + state + " = " + 3*state); // (2)
if (state == 10) sink.complete(); // (3)
return state + 1; // (4)
});
위 Flux 의 동작과정은 아래와 같다.
https://velog.io/@redjen/Java-Reactive-Programming-4-Flux-%EC%83%9D%EC%84%B1
https://www.baeldung.com/flux-sequences-reactor
이 문제는 어디서 Flux Stream 을 생성 하느냐
에 달려있다.
본 설계에서의 Flux Stream 생성 위치는 이벤트 구독자이다.
다만 이벤트 구독자에는 @Async
를 통해 스레드 풀을 부여하였다.
따라서 이에 걸맞는 Flux.create()
를 사용하였다.
**여기서 설명은 안 했지만, Flux.push() 는 Async&Single-Thread Flux 를 생성하는데, Flux.create() 로 처리가능하므로 생략했다.
@Slf4j
@Component
@RequiredArgsConstructor
public class MetroEventHandler {
private final FetchMetroLastDepartureFacade fetchMetroLastDepartureFacade;
@Async("LastTransportAlarmAsyncThreadPool")
@Retryable(maxAttempts = 5, backoff = @Backoff(delay = 100))
@EventListener
public void handleFetchMetroLastDepartureEvent(FetchMetroLastDepartureEvent event) {
// Flux 파이프라인 생성
// Flux 파이프라인을 채널로 반환
}
}
방법은 간단하다.
Event 발행 시 FluxSink 를 Event 에 넘겨주고, 구독자는 FluxSink 에 값을 담는 것이다.
즉, FluxSink 가 채널의 역할을 하게되는 것이다.
각각 알아보자.
우선 아래와 같이 FluxSink 를 Event 의 필드값으로 둔다.
@Value
@Getter
@JsonSerialize
@JsonDeserialize
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor(access = AccessLevel.PUBLIC, force = true)
public class FetchMetroLastDepartureEvent {
MetroLastDepartureRequestDto requestDto;
FluxSink<MetroLastDeparture> resultSink;
}
Flux.create() 를 통해 Flux 생성, sink 를 Event 에 담는다.
이후 Event 를 발행한다.
public interface FetchSubwayClient {
Flux<MetroLastDeparture> fetchMetroLastDepartureOf(Leg leg);
}
@Slf4j
@Component
@RequiredArgsConstructor
public class FetchSubwayClientImpl implements FetchSubwayClient {
private final ApplicationEventPublisher applicationEventPublisher;
private final MetroLastDepartureRequestMapper requestMapper;
@Override
public Flux<MetroLastDeparture> fetchMetroLastDepartureOf(Leg leg) {
return Flux.<MetroLastDeparture>create(sink -> {
MetroLastDepartureRequestDto requestDto = requestMapper.toRequest(leg);
FetchMetroLastDepartureEvent event = FetchMetroLastDepartureEvent.of(requestDto, sink);
applicationEventPublisher.publishEvent(event);
});
}
}
발행된 Event 를 탐지한 구독자는 Flux 를 생성한다.
( 여기서는 Flux 이 되겠다. )
생성한 Flux 의 시그널에 따라 sink 를 제어한다.
만약 성공한다면 sink 에 대해 값이 전송된다.
@Slf4j
@Component
@RequiredArgsConstructor
public class MetroEventHandler {
private final FetchMetroLastDepartureFacade fetchMetroLastDepartureFacade;
@Async("LastTransportAlarmAsyncThreadPool")
@Retryable(maxAttempts = 5, backoff = @Backoff(delay = 100))
@EventListener
public void handleFetchMetroLastDepartureEvent(FetchMetroLastDepartureEvent event) {
MetroLastDepartureRequestDto requestDto = event.getRequestDto();
fetchMetroLastDepartureFacade
.fetchMetroLastDeparture(requestDto)
.doOnNext(event.getResultSink()::next)
.doOnError(error -> event.getResultSink().error(error))
.doOnComplete(event.getResultSink()::complete)
.limitRate(10)
.subscribe();
}
}
Flux 를 생성한다.
여기서는 서비스를 호출하여 API 호출을 하고,
비즈니스 로직을 처리했다. — filter() 나 take() 사용
public interface FetchMetroLastDepartureFacade {
Flux<MetroLastDeparture> fetchMetroLastDeparture(MetroLastDepartureRequestDto requestDto);
}
@Slf4j
@Component
@RequiredArgsConstructor
public class FetchMetroLastDepartureFacadeImpl implements FetchMetroLastDepartureFacade {
private final FetchMetroLastDeparture fetchMetroLastDeparture;
@Override
public Flux<MetroLastDeparture> fetchMetroLastDeparture(MetroLastDepartureRequestDto request) {
return fetchMetroLastDeparture
.fetchMetroLastDeparture(request)
// 잘못된 역이름 조회결과 필터
.filter(metroLastDeparture -> metroLastDeparture.stationNm().equalsIgnoreCase(request.subwayStationName()))
// 페이지 사이즈만큼 가져오기
.take(request.numOfRows());
}
}
이제 잘 돌아가는지 실제로 테스트를 할 차례다.
파라미터 테스트를 통해 더미 인자값을 만들어주었다. — TMAP JSON 생성
@SpringBootTest
//@CustomIntegrationTest
class FetchSubwayClientImplTest {
// 구간 소요시간 = 1시간 (60초 * 60분)
private final static int FIXED_SECTION_SECONDS = 60 * 60;
// 더미데이터 생성 라이브러리
private final static FixtureMonkey FIXTURE_MONKEY = FixtureMonkey.builder()
.defaultNotNull(Boolean.TRUE)
.objectIntrospector(ConstructorPropertiesArbitraryIntrospector.INSTANCE)
.build();
@Autowired
private FetchSubwayClientImpl fetchSubwayClient;
static Stream<Arguments> 종로역3호선생성() {
List<Station> stations = List.of(
new Station(
0,
"110321",
"종로3가",
"126.991825",
"37.571708"
),
new Station(
1,
"110322",
"을지로3가",
"126.992569",
"37.566803"
),
new Station(
2,
"110323",
"충무로",
"126.994439",
"37.560953"
)
);
Leg 종로3가 = FIXTURE_MONKEY
.giveMeBuilder(Leg.class)
.set(
javaGetter(Leg::mode),
Mode.SUBWAY
)
.set(
javaGetter(Leg::type),
3
)
.set(
javaGetter(Leg::start),
new Start(37.57170833333333, 126.991825, "종로3가")
)
.set(
javaGetter(Leg::passStopList),
FIXTURE_MONKEY.giveMeBuilder(PassStop.class).set(javaGetter(PassStop::stations), stations).sample()
)
.set(
javaGetter(Leg::sectionTime),
FIXED_SECTION_SECONDS
)
.sample();
return Stream.of(
Arguments.of(
종로3가
)
);
}
@ParameterizedTest
@DisplayName("이벤트 발행을 통해 지하철 막차 도착예정 정보 조회 시 성공합니다.")
@MethodSource("종로역3호선생성")
void 이벤트발행을통해_지하철막차도착예정정보_조회시_성공(Leg leg) {
// GIVEN
// WHEN
Flux<MetroLastDeparture> metroLastDepartureFlux = fetchSubwayClient.fetchMetroLastDepartureOf(leg);
// THEN
StepVerifier.create(metroLastDepartureFlux)
.thenConsumeWhile(metroLastDeparture -> {
assertNotNull(metroLastDeparture);
System.out.println("metroLastDeparture = " + metroLastDeparture);
return true;
})
.verifyComplete();
}
}
아래와 같이 성공하는 것을 볼 수 있다.
이 모든 뻘짓이,,,
모놀리식 구조로 마이크로서비스 흉내를 내다보니 발행하는 일이라고 생각한다.
마이크로서비스 흉내를 낼 것이라면,,,
도메인 단위로 찢어내는 게 가장 현명하고 빠른 길이라고 생각한다.
그러나 분리했음에도 다양한 방법이 있을 것이다.
다음은 필자가 생각하기에 좋은 방법이라고 고려하는 것들이다.