7장에서는 리액티브 마이크로서비스 개발 방법 (==스프링을 사용해 논블로킹 동기 REST API 및 비동기 이벤트 기반 서비스를 개발하는 방법)을 배우고, 이 두 가지 방식의 선택 기준도 배운다. 끝으로 리액티브 마이크로서비스 환경을 위한 수동 및 자동 테스트를 작성하고 실행하는 방법을 살펴본다.
논블로킹
- 논 블로킹 : 처리되어야 하는 한 작업이 전체적인 작업의 흐름을 안막는다
- 블로킹 : 막는다
다른 작업을 수행하는 주체가 어떻게 상대하는지에 따라 달라짐동기와 비동기
처리해야 할 작업들을 어떤 흐름으로 처리 할 것인가에 대한 관점
- 동기 : 호출되는 함수의 작업 완료 여부를 신경쓴다
- 비동기 : 안쓴다.
참고 https://velog.io/@hayeon/%EB%8F%99%EA%B8%B0-%EB%B9%84%EB%8F%99%EA%B8%B0-%EB%B8%94%EB%A1%9C%ED%82%B9-%EB%85%BC%EB%B8%94%EB%A1%9C%ED%82%B9
이번 장에서 사용한 소스 코드는
https://github.com/PacktPublishing/Hands-On-Microservices-with-Spring-Boot-and-Spring-Cloud/tree/master/Chapter07
버전 :
스프링 클라우드 2.1.0
스프링 부트 2.1.2, 스프링 5.1.4
리액티브 마이크로서비스를 개발 시 1. 논블로킹 동기 API와 2. 이벤트 기반 비동기 서비스 중 어떤걸 사용할지 정해져잇는 것 XX 자율적 OO
if ) 런타임 의존성 최소화(=느슨한 결합) ==>동기 API << 이벤트 기반 비동기 메시지 전달 방식 선호
b/c 런타임에 다른 여러 서비스에 메시징 시스템에만 접근하면 되기 때문
if2 ) 최종 사용자가 응답을 기다리는 읽기 작업 ==> 논블로킹 동기 API >> 이벤트 기반 비동기 서비스
if 3 ) 모바일 앱이나 SPA 웹 애플리케이션처럼 동기 API가 알맞은 클라이언트 플랫폼 ==> 논블로킹 동기 API >> 이벤트 기반 비동기 서비스
if 4 ) 클라이언트가 다른 조직의 서비스에 연결할 시 ==> 논블로킹 동기 API >> 이벤트 기반 비동기 서비스
(b/c 여러 조직이 공통 메시징 시스템을 공유불가 )
이번에 시스템 환경구성은
product-composite : 동기 api (b/c1 복합 마이크로서비스는 웹 및 모바일 플랫폼을 대상 + b/c2 시스템 환경을 운영하는 조직이 아닌 다른 조직의 클라이언트를 주로 상대한다고 가정)
핵심 마이크로서비스가 제공하는 읽기 서비스 : 논블로킹 동기 API (b/c 응답을 기다리는 최종 사용자가 있기 때문)
핵심 마이크로서비스의 생성/ 삭제 서비스 : 이벤트 기반 비동기 서비스 ( 복합 마이크로서비스가 제품 집계 정보의 생성 및 삭제를 위해 제공하는 동기 API는, 핵심 서비스가 수신하는 토픽에 생성 및 삭제 이벤트를 게시한 후 바로 200(OK) 응답을 반환)
스프링 5의 리액티브 지원은 project reactor 기반이다.
project reactor는 reactive 애플리케이션 구축의 표준인 reactive stream 사양을 기반으로 한다.
스프링 webflux, 스프링 webclient, 스프링 data ==> 스프링 리액터를 사용해 리액티브 및 논블로킹 기능을 제공
@Test
public void TestFlux() {
List<Integer> list = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.filter(n -> n % 2 == 0)
.map(n -> n * 2)
.log()
.subscribe(n -> list.add(n));
assertThat(list).containsExactly(4, 8);
}
1,2,3,4 정수로 스트림 시작-> 홀수만 filer -> 스트림 값에 *2 해서 변환(map) -> map 끝나면 log에 스트림 데이터 기록 -> 스트림을 처리하려면 구독자가 있어야 한다.!! 호출된 subscribe 메서드로 구독자를 등록. 스트림에서 받은 각 요소에 람다 lambda 함수를 적용하며, 스트림에서 받은 각 요소를 list에 add -> 스트림 처리 결과가 담긴 list에 assertThat 예상 값인 정수 4, 8이 있는지 검증 containsExactly
출력 로그
onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
: 스트림을 구독하는 구독자가 스트림 처리를 시작하고 스트림 콘텐츠를 요청
onComplete()
: 구독자가 onComplete() 호출해 스트림 종료를 알리면 처리가 종료
를 통해서 product 및 recommendation 서비스의 MongoDB 기반 리포지토리를 reactive 하게 만듦
변경해주니까 테스트 코드가 에러나고 난리났느데 테스트 코드도 변경해주자
영속성 메서드가 Mono나 Flux 객체를 반환하므로 테스트 메서드는 반환된 리액티브 객체에서 결과를 받을 때까지 기다려야 함
StepVerifier 클래스를 사용해 repository 검색하고 결과를 확인하는 처리 절차를 설 정하는 방법도 있음
설정을 다 하고 verifyComplete() 메서드를 호출하면 시퀀스 시작함
핵심서비스도 코드 변경해야함
api 변경 (ProductService, RecommendationService, ReviewService)
원래처럼 Product 객체를 반환하는게 아니라
Mono 객체 반환하도록 변경
서비스 구현 변경 (ProductServiceImpl, RecommendaionServiceImpl)
기존의 repository 에서 findByProductId를 통해서 가져와서 List< RecommendationEntity>로 저장했던 거에서
가져와서 log찍고
mapper.entityToApi() 메서드를 호출해 영속성 계층에서 가져온 엔티티를 API모델 객체로 변환 -> 모델 객체의 serviceAddress 필드에 요청을 처리한 마이크로서비스의 DNS 이름과 IP 주소를 설정하는 map 까지
getProduct도 동일하다
테스트 코드 변경
비동기 동작을 처리하고자 Mono 및 Flux를 반환(리액티브 객체)하고, 테스트 코드 block() 메서드를 호출하거나 StepVerifier 헬퍼 클래스 사용
기존의
에서
등으로 바꾸는건데
책에서는 너무 설명을 안해준다 ㅡㅡ
jpa 이용하는 관계형 db에 접근한는 review는 논블로킹 프로그래밍 모델을 사용하지 않고 Scheduler를 사용해 블로킹 코드를 실행
스케줄러는 일정 수의 스레드를 보유한 전용 스레드 풀의 스레드에서 블로킹 코드를 실행 (마이크로서비스의 논블로킹 처리에 영향없음 b/c 스레드의 고갈을 방지하므로)
메인 클래스에 스레드 풀 구성
-> 서비스 구현 클래스에 scheduler 주입
근데 책이랑 코드가 좀 다르다 뒤에 내용이랑 합쳐진건가..
암튼
asyncFlux (getByProductId (productId)).log(); 로 getByProductId() 메서드에 블로킹 코드를 구현하고
getReviews() 메서드는 asyncFlux() 메서드를 사용해 스레드 풀의 스레드에서 블로킹 코드를 실행하게 된다.
통합계층 변경
ProductCompositeIntegration 통합 클래스에서 사용하는 블로킹 방식 HTTP 클라이언트
인 RestTemplate 스프링 5에서 제공하는 논블로킹 방식 HTTP 클라이언트인 WebClient
로 대체한다.
그리고 기존의
에서
-> webclient 인스턴스 사용한 논블로킹 방식으로 product 서비스 호출
서비스 구현 변경
zip 메서드로 다수의 병렬 요청을 처리. 처리 완료 시 하나로 압축
createProductAggregate 헬퍼 메서드로 3개의 api 호출 결과를 집계
각각은 mono객체 반환
테스트 코드 변경
기존의 코드에서
통합 클래스를 대체하는 모의 객체의 설정을 변경한다 (helper 메서드인 Mono.just() 메서드와 Flux.fromIterable() 메서드를 사용해 Mono 및 Flux 객체를 반환하도록)
이벤트 기반의 비동기 생성 서비스와 삭제 서비스를 개발
복합 서비스는 create / delete 이벤트를 각 핵심 서비스의 topic에 게시한 후 핵심 서비스의 처리를 기다리지 않고 호출자에게 OK 응답을 반환
topic에 메시지 게시(publish) 하는 코드는
mysource.output().send(MessageBuilder.withPayload(message).build());
메시지 소비하는 코드
@StreamListener(target = Sink.INPUT)
public void receive(MyMessage message) {
LOG.info("Received: {}",message);
이 프로그래밍 모델은 rabbitMQ나 kafka같은 메시징 시스템 과 독립적으로 사용.
동기 API 호출보다는 비동기 메시지 방식이 갖는 문제가 있어서 스프링 클라우드 스트림으로 문제 처리 방법 알아본다.
문제 :소비자 인스턴스 수 늘리면 product 마이크로서비스의 모든 인스턴스가 같은 메시지를 소비한다
해결 : 소비자 유형별로 하나의 인스턴스가 메시지를 처리하게 한다
스프링 클라우드 스트림의 소비자 그룹은 소비자 쪽에서 구성
소비자 그룹은 group: 으로 묶는다 -> 스프링 클라우드 스트림은 product 마이크로서비스 인스턴스 중 한 인스턴스로만 product 토픽에 게시된 메시지를 전달
문제 :
소비자가 메시지 처리 실패시 -> 메시지는 성공할때까지 대기열로 다시 보내지거나 사라짐
내용이 잘못된 메시지(=포이즌 메시지 poison message)인 경우 수동으로 메시지를 제거할 때까지 다른 메시지를 처리하지 못하도록 소비자를 차단
참고 :
해결 : 결함 분석 및 수정을 위해 메시지를 다른 저장소로 이동하기 전에 수행할 재시도 횟수를 지정
문제 : 비즈니스 로직에서 메시지가 전송된 순서대로 메시지를 소비하고 처리해야 하는 경우엔 소비자 그룹을 사용불가 (=여러 개의 소비자 인스턴스를 사용해 처리 성능을 높일 수 없음) --> 들어오는 메시지를 처리할 때 발생하는 지연 시간이 지나치게 길어질 수 있는 문제 존재
참고 : 파티션 == 하위ㅣ토픽
메시징 시스템이 같은 key를 가진 메시지 사이의 순서를 보장하고자 사용할 키를 각 메시지에 지정. -> 메시징 시스템은 키 기준으로 특정 파티션에 메시지 배치 (=같은 키 가지는 메시지는 always 같은 파티션에 배치)
->메시지 순서 보장을 위해 소비자 그룹 안의 각 파티션마다 하나의 소비자 인스턴스가 배정
파티션 수가 늘어나면 소비자 인스턴스를 추가해 전달 순서를 유지하면서 성능을 향상 가능
해결 : 파티션을 사용하면 성능과 확장성을 잃지 않으면서도 전송됐을 때의 순서 그대로 메시지 전달 가능
메시징 시스템은 보통 헤더와 본문으로 구성된 메시지를 다룸
event : 어떤 상황이 발생했다는 것을 설명하는 메시지
이벤트 메시지 본문 ⊂ 이벤트 유형 (type) , 이벤트 데이터 (data), 타임스탬프(이벤트 발생 시간)(timestamp), 데이터 식별을 위한 키(key)
메시지 소스를 선언하고 통합 계층에서 이벤트를 게시
이벤트를 다른 토픽에 게시하려면 토픽별 MessageChannel을 선언한 자바 인터페이스를 만들고, EnableBinding 애노테이션을 선언해 활성화해야 함
ProductCompositeIntegration에 메세지 채널 선언하고, 인스턴스를 생성자로 주입
주입된 messageSources 객체로 이벤트 게시
ex) 제품 삭제 이벤트-> outputProducts() 메서드로 제품 토픽을 위한
messageChannel을 가져온 다음 send() 메서드로 이벤트를 게시
MessageBuilder 클래스를 사용해 이ㅣ벤트 담긴 메시지 작성
이벤트 게시를 위한 구성을 추가
기본 메시징 시스템 : rabbit mq
기본 컨텐츠 유형 : application/json
출력 채널과 토픽이름 bind (output)
rabbitMQ, kafka 연결정보 선언
이벤트 게시를 테스트할 수 있도록 테스트를 변경
스프링 클라우드 스트림이제공하는 TestSupportBinder를 사용하면 테스트 중에 메시징 시스템을 사용하지 않아도 어떤 메시지가 전송됐는지 확인가능
MessageCollector 헬퍼 클래스를 사용하면 테스트 중에 전송된 모든 메시지 가져올 수 있음
ProductServiceImpl과 RecommendationServiceImpl코드에서
메시지 프로세서는 블로킹 프로그래밍 모델을 기반으로 하므로 영속성 계층에서 받은 Mono 객체의 block() 메서드를 호출한 후에 메시지 프로세서로 결과를 반환해야 한다.
if ) block() 메서드를 호출하지 않으면 : 메시징 시스템이 서비스 구현에서 발생한 오류를 처리하지 못하므로 이벤트가 대기열로 다시 들어가지 못하고 데드 레터 대기열로 이동하게 되기 때문이다.
review 는 jpa 블로킹 방식이므로 변경X
이벤트 소비를 위한 구성을 추가
이전과 동일함 기본 메시징 시스템 rabbitmq, json이 기본
이벤트의 비동기 처리를 테스트할 수 있도록 테스트를 변경
create / delete이벤트를 수신하므로 (이전의)REST API를 호출하는 대신 이벤트를 보내게 해야 한다.
-- input 메서드 채널의 send() 메서드를 사용해 이벤트를 전송
input 채널의 send() 메서드 호출 : 메시지 프로세서에 의한 동기 방식 처리
=== 메시지 프로세서의 process() 메서드를 직접 호출하는 것과 다를 바 없음
-> 테스트 코드가 이벤트를 비동기로 처리하고자 동기화 로직이나 wait Logic을 구현할 필요가 없음
docker compose 파일 3가지 구성으로 테스트
파티션 없이 RabbitMQ 사용
토픽당 2개의 파티션으로 RabbitMQ 사용
토픽당 2개의 파티션으로 카프카 사용
마이크로서비스 테스트 위해서 환경 단순화
product-composite 서비스의 health 엔드포인트는 product
composite 서비스 자신과 3가지 핵심 마이크로서비스 모두가 정상인 경우에만 UP으로 응답 (=http 200)
그리고 헬퍼 메서드를 사용해 세 가지 핵심 마이크로서비스의 상태 정보를 등록
이제 curl 테스트 해봐야함
일단 빌드
아하 역시 한방에 될리가 없쥬?
bean 이 없다고 하는 에러다
일단 기본코드랑 비교해보면
product의 persistence에 이부분이 없었고
메인 메소드 쪽에 template 가 안바뀌어있었다
이 후에 다시 build해보면
동일하게 recommendation도 같은 애러가 난다
product의 persistence에 이부분이 없었고
메인 메소드 쪽에 template 가 안바뀌어있던 부분 변경해줬음
*책내용 그대로 따라한다고 한거같은데 왜 안되어있엇지 🤯
암튼 빌드 성공
컨테이너 띄우고
curl localhost:8080/actuator/health -s
를 해보면 up은 나오지만 책처럼은 안나온다
저자 코드를 이용해서 해보면 책처럼 나온다 🤔 무엇이 문젤까 🤔
했는데 에러가 나오길래
코드 차이를 분석해서 여러개 빠트린 것을 채워넣어서 해봤다
책 순서대로 따라한다고 했는데 중간에 정신을 놓았던 모양이다..
음. .
.....
일단 저자 코드로 하겟다. ..
참내...
body='{"productId":1,"name":"product name C","weight":300, "recommendations":[
{"recommendationId": 1, "author": "author 1","rate": 1, "content":"content 1"},
{"recommendationId":2,"author":"author 2","rate":2,"content":"content 2"},
{"recommendationId":3,"author":"author 3","rate":3,"content":"content 3"}
], "reviews":[
{"reviewId": 1, "author": "author 1", "subject":"subject 1","content":"content
1"},
{"reviewId":2,"author":"author 2","subject":"subject 2","content":"content
2"},
{"reviewId":3,"author":"author 3","subject":"subject 3","content":"content
3"}
]}'
curl -X POST localhost:8080/product-composite -H "Content-Type: application/
json" --data "$body"
curl로 하려고 했는데 많은 양은안된다 -> postman으로한다
rabiitmq 페이지간다 -> 아이디 페스워드 guest , guest 로 로그인한다
curl로도 조회
토픽당 2개의 파티션으로 RabbitMQ를 사용하고자 별도의 도커 컴포즈 파일 docker-compose-partitions.yml
-> 스프링 클라우드 스트림이 생성한 대기열은 파티션별로 대기열이 하나씩
이번 장에서는
스프링을 사용해 논블로킹 동기 REST API 및 비동기 이벤트 기반 서비스를 개발하는 방법으로 리액티브 마이크로서비스를 구현해봤다. 코드 에러는 많이 나지만..
논 블로킹 동기 api는 처리되어야 하는 한 작업이 전체적인 작업의 흐름을 막지않고 호출되는 함수의 작업 완료 여부를 신경쓰는 api이고 비동기 이벤트 기반은 이벤트가 발생함에 따라서 호출되는 함수의 작업 완료 여부와 상관없는 api인 것으로 이해했다.
스프링 webflux 및 스프링 webclient, spring data는 스프링 리액터를 사용해 리액티브및 논블로킹 기능을 제공한다.
스프링 데이터의 MongoDB 지원 기능(flux와 mono객체를 반환하게 만들고 repository 클래스를 ReactiveCrudRepository로 만드는 것)을 사용해서 데이터베이스 응답을 기다리는 동안 스레드를 차단하지 않는 논블로킹 방식으로 MongoDB 데이터베이스에 접속할 수 있다.
jpa를 이용하는 관계형 db (review 서비스 ) 는 Scheduler를 사용해 블로킹 코드를 실행했다.
스프링 데이터 스트림과 메시징 시스템인 RabbitMQ 및 카프카를 이용해 코드 변경
없이 이벤트 기반 비동기 서비스를 개발하는 방법을 살펴봤는데 이부분이 아마 코드 구현이 잘 안됐던것 같다. 그래도 배웠던 내용은 1소비자 그룹으로 문제를 해결하는법, 2재시도및 데드 레터 대기열, 3파티션로 문제해결하는 법을 통해서스프링 클라우드 스트림의 기능을 사용해 여러 가지 비동기 메시징 문제를 처리할 수 있다는것을 알았다.
리액티브 시스템은 메시지 기반의 비동기 통신을 토대로 유연성, 확장성, 탄력성을 확보하므로 장애에 강하고 유연성과 탄력성이 있기 때문에 적시에 반응할 수 있지만 이를 다 이용하기 위해서는 알아야할 것들이 많아서 적용의 스케일을 컨트롤 가능한 만큼만 하는 것이 좋겠다