Spring WebFlux 스케줄러 @Scheduler로 http client 요청 돌리기

아무튼 간에·2023년 1월 31일
0

Netty

목록 보기
3/4

Scheduler란 무엇인가?

(제목 전우치 톤으로 읽기)

@Scheduled is a fully viable option it is part of the org.springframework.scheduling.annotation package which means it has nothing to do with spring web etc.
@Scheduledorg.springframework.scheduling.annotation 패키지의 일부이므로 Spring Web 등과는 전혀 관계가 없다.
참고: How to create a scheduler in spring webflux?

처음엔 Quartz를 사용하려고 시도해봤는데 굳이 안 써도 될 것 같아서 기본 지원 스케줄러 사용했다.

스케쥴되는 메서드는 반환 타입이 반드시 void이고 아무런 인자도 받지 말아야 한다.
@Scheduled 어노테이션이 붙고 컨테이너에 일반적인 스프링 빈으로 등록된 bean 클래스에 @Configurable를 사용하지 말아야 한다.
@Scheduled@Async 어노테이션을 둘 다 활성화하려면 설정에서 task 네임스페이스에 'annotation-driven' 요소를 추가해라.
참고: [Spring 레퍼런스] 26장 태스크(Task) 실행과 스케줄링의 26.5 스케줄링과 비동기 실행에 대한 어노테이션 지원


구현 목표

구현하고자 하는 로직은

  • 1시간에 한번씩
  • DB에서 ip/port 데이터 리스트를 받아온 뒤
  • 그 리스트의 호스트(ip)마다 전체 리스트를 보내주는 스케줄이다

받은 리스트를 처리하는 부분도 같은 프로젝트 안에 구현되어 있음.
데이터를 받으면 그대로 다시 DB에 업데이트 해주는 로직.

그니까 한 프로젝트 안에

  • 호스트 리스트를 호스트마다 보내주는 스케줄러가 있고
  • 호스트 리스트를 받아서 DB 업데이트를 수행하는 서비스가 다 있는거임.
    자웅동체... 자공자수... 북치고 장구치고...

따라서 스케줄러를 실행해서 호스트 리스트를 보내면 받아서 처리하는 부분까지 한 프로젝트 안에서 확인 가능하다는 것


코드

아무튼..

전송객체 생성

HostEntity.java

@Setter
@Getter
@Builder
public class HostEntity {
    private String ip;
    private int port;
    private String type;
    private List<HostEntity> hostList;
}

Repository 작성

public interface HostRepository extends R2dbcRepository<HostEntity, String> {

    Mono<HostEntity> findByIp(String ip);
    
    @Query("select * from host_entity")
    Flux<HostEntity> findAll();

    @Modifying
    @Query("UPDATE host_entity SET port=:port, type=:type where ip=:ip")
    Mono<HostEntity> setHostFor(String port, String type, String ip);
}

> 스케줄러 작성

@Service SyncJobScheduleService.java

@Scheduled(cron = "*/30 * * * * *")
public void syncSchedule(){
	System.out.println("============스케줄러 start==============");

	List<HostEntity> entityList = 
    	hostRepository
        	.findAll()
            .log()
    		.collectList()
            .subscribe(list -> {
        		for(HostEntity entity : list){
            		System.out.println("this name: " + entity.getIp());
                    
                    // 수정 테스트
                	if("127.0.0.1".equals(entity.getIp())){
                		entity.setType("수정~~!");
					}
                    
                    // http post application/json 요청
                	nettyHttpRequest(HostEntity.builder()
                    	.ip(entity.getIp())
                    	.port(Integer.parseInt(entity.getPort()))
                    	.hostList(list).build());
				}
			}).dispose();
    }
}

처음에는 db 조회 후 http client 전송 로직을 수행하도록 구현함

  • @Scheduled: 스케줄링 메소드 어노테이션
    (cron = "*/30 * * * * *): 매 30초마다 실행. cron 표현식 (초 분 시 일 월 주 년)
    (cron = "@hourly") 1시간마다 돌릴 경우 이렇게도 표현 가능
    참고:
    - 크론 표현식(Cron Expressions)
    - 크론 표현식 에디터 crontab guru
  • nettyHttpRequest(): Netty http post application/json 요청 서비스.
    위에 작성한 의도대로 조회한 리스트를 각 호스트에 전송하는 로직이다.
    참고로 netty를 쓰려면 webflux 빌드를 해줘야함
    	implementation 'org.springframework.boot:spring-boot-starter-webflux'

받은 리스트 처리 서비스 작성

HostDto.java

@EqualsAndHashCode(callSuper = true)
@Getter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class HostDto{
    private List<HostEntity> hostList;
}

@Component routingHandler.java

public Mono<ServerResponse> syncHost(ServerRequest request){
	return request.bodyToMono(HostDto.class)
    	.flatMap(body -> {
        	log.info("=== Host synchronization 시작 ===");
            List<HostEntity> bodyList = body.getHostList();
            List<HostEntity> hostList = new ArrayList<>();
            for(HostEntity he : bodyList){
            	hostList.add(HostEntity.builder()
                        .ip(he.getIp())
                        .port(he.getPort())
                        .type(he.getType())
                        .build());
			}
            return ServerResponse.ok()
            	.body(syncService.SyncHost(hostList),String.class);
	}).onErrorResume(e->ServerResponse.badRequest().bodyValue(e.getMessage()));
}
  • RestController를 쓰지 않고 Router로 동작하는 시스템이어서 맞춰서 작성함.

@Service SyncService.java

public Mono<String> SyncHost(List<HostEntity> hostList) throws CustomException {
	return hostClient.saveAllHost(hostList)
    		.flatMap(result -> {
            	if(result){
                	return Mono.just(OK);
				}else{
                	throw new CustomException(FAILED_SYNC);
				}
			 });
}

@Service HostClient.java

@Transactional
public Mono<Boolean> saveAllHost(List<HostEntity> hostList) {
	System.out.println("saveAll");
	try {
    	for(HostEntity i: hostList){
        	hostRepository.findByIp(i.getIp())
            	.flatMap(found -> hostRepository.setHostFor(i.getPort(), i.getType(), i.getIp()).log())
                .switchIfEmpty(hostRepository.save(i)).log().subscribe();
            }
	}catch (Exception e){
    	e.printStackTrace();
        return Mono.just(false);
	}
    return Mono.just(true);
}

...안되넹

처음 스케줄을 실행할 때는 데이터를 넘기는 것까지 잘 되는 것 같았다. 그런데 데이터를 받아서 처리하는 부분에서는 쿼리 실행을 시도조차 하지 않고, 그 다음 스케줄을 실행할 때도 스케줄러 안의 findAll()조차 하지 않는다…

... 그러고보니 받는 로직도 같은 프로젝트 안에 있다고 했잖음?

  • WebFlux로 구성한 코드 상 이 모든 로직이 한 스레드에서 돌고 있기 때문에
  • 마찬가지로 쿼리 실행 후 값을 리턴하지 않고 계속 물고 있는 상태라 또 다른 쿼리를 실행할 수 없다?
    이런 생각에 미치게 됨. WebFlux도 r2d2도 온전히 이해하지 못한 상태에서 하려니...

바꾼 코드

아무튼 그래서 바꿈
@Service SyncJobScheduleService.java

@Scheduled(cron = "*/30 * * * * *")
public void invalidateOtp(){
	System.out.println("============스케줄러 start==============");
    
    // 바꾼 부분
	List<HostEntity> hostList = hostRepository.findAll().log().collectList().block();
    
	for(HostEntity entity : hostList){
    	System.out.println("this host: " + entity.getIp());
        
        // 수정 테스트
        if("127.0.0.1".equals(entity.getIp())){
        	entity.setType("수정^^~~!");
		}
        
        // http post application/json 요청
		nettyHttpRequest(HostEntity.builder()
        		.ip(entity.getIp())
                .port(Integer.parseInt(entity.getPort()))
                .type(entity.getType())
                .hostList(hostList).build());
	}
}
  • 어차피 본래의 스케줄 성격은 webflux와 관계없이 동작하고 리턴값도 없으니 조회한 데이터를 block()으로 List에 넣어도 되겠다 싶어서 위에처럼 처리했음. 이게 맞는 설명인지는 모르겠는데 느낌은 이럼.

사실... 맞는 방법인지 모르겠음
근데 됨.


오늘도 10%의 확신과 90%의 어 이게 왜 되지 마무리

웹플럭스가 너무 어렵당.. 공부하기 싫당....

profile
armton garnet

0개의 댓글