230319 TIL (2)

김수아·2023년 3월 19일
0

2023_TIL

목록 보기
9/9
post-thumbnail

자바 병렬 처리

1. 리액티브 스트림

1-1.

List<Foo> foos = Flux.fromIterable(barList) // 리스트의 요소를 하나씩 발행하는 Flux를 생성
    .parallel() // 이를 병렬 처리를 위한 ParallelFlux로 변환
    .runOn(Schedulers.parallel()) // parallel 연산자 이후의 연산을 실행할 스케줄러를 지정
    .flatMap(b -> method(b)) // 요소마다 method를 실행한 결과를 Flux로 만들음
    .sequential() // 다시 순차적으로 처리하기 위한 Flux로 변환
    .collectList() // 결과값을 리스트로 수집
    .block(); // 결과를 동기적으로 받음

1-2. 타임아웃 적용

CountDownLatch countDownLatch = new CountDownLatch(1);
List<Foo> result = new ArrayList<>();
List<Foo> foos = Flux.fromIterable(barList)
    .parallel()
    .runOn(Schedulers.parallel())
    .flatMap(b -> method(b)) 
    .sequential()
    .doFinally((signal) -> {
                countDownLatch.countDown();
            })
    .subscribe(results::add);
    
try {
            countDownLatch.await(20000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            log.error("ERROR!!!");
        }
  • 병렬로 실행되는 스레드에서 처리된 결과를 모아서 반환하기 위해, CountDownLatch.await() 메서드를 사용한다.
  • try catch 문 안의 CountDownLatch.await() 메서드를 호출은 스레드가 결과를 기다리도록 하는 목적으로 사용된다. 0이 되지 않고 설정한 20초가 지나면 에러가 발생하는 식이다. 20초는 MAX 값이라서 항상 20초를 기다리는 것은 아니다.
  • CountDownLatch를 사용하지 않으면 병렬로 실행되는 각각의 스레드가 처리된 결과를 추가할 때, 스레드 간 동기화 문제가 발생할 수 있다. 이는 결과적으로 results 리스트에 원하는 대로 결과가 저장되지 않거나, 중복된 결과가 포함될 수 있다.
  • 참고: CountDownLatch 사용 방법

2. 스트림 API

  • Java 8에서 추가된 Stream API의 parallel() 메서드는 스트림 요소를 병렬 처리할 수 있는 기능을 제공함
  • 주의: seq(리스트).parallel()과 같이 병렬처리를 시도하는 것은 무의미하다. 각각 순차 처리와 병렬 처리를 위한 메서드이기 때문이다. seq() 메서드를 사용하면 parallel() 메서드로 병렬 처리된 요소들이 메인 쓰레드에서 다시 순차적으로 처리된다. 그러니 그 반대로 parallelStream() 메서드를 호출하여 병렬 처리를 시작하고, map() 메서드로 요소들을 변환한 후 sequential() 메서드를 호출하여 다시 순차 처리를 시작하는 것이 올바른 방향이다.

스프링배치 병렬 처리

배치 로직 안에서 위와 같이 병렬 처리를 할 수도 있고, 스프링 배치 설정에 taskExecutor로 SimpleAsyncTaskExecutor를 지정해서 병렬 처리를 시도할 수 있다.

@Bean
public Job job() {
    return jobBuilderFactory.get("job")
        .start(splitFlow())
        .next(step4())
        .build()        //builds FlowJobBuilder instance
        .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
        .split(taskExecutor())
        .add(flow1(), flow2())
        .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
        .start(step1())
        .next(step2())
        .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
        .start(step3())
        .build();
}

@Bean
public TaskExecutor taskExecutor(){
    return new SimpleAsyncTaskExecutor("spring_batch");
}

0개의 댓글