Spring Batch 외부API 호출 후 DB 저장

김상인·2025년 5월 7일

Spring Batch

목록 보기
1/1


스프링 배치를 구성해 주기적으로 실행하던 중 슬로우 쿼리가 발생해 이를 개선한 내용을 작성한 글이다.

문제 사항

외부 API를 통해 최대 1,000건씩 페이징 방식으로 데이터를 조회할 수 있다.
이 API를 반복 호출해 수만 건의 데이터를 가져온 뒤 DB에 저장된 기존 데이터와 비교해 변경된 항목은 수정하고 신규 데이터는 저장하려면 스프링 배치를 어떻게 구성하는 것이 좋을까?


기존 프로세스

  1. 페이징이 끝날 때까지 외부 API를 반복 호출하여 모든 데이터를 메모리에 저장
  2. 비교를 위해 기준이 되는 기존 데이터를 DB에서 모두 조회하여 메모리에 저장
  3. API에서 가져온 데이터와 DB에서 가져온 데이터를 비교해 DB에 반영할 데이터 선별
  4. 필터링된 데이터를 chunk 단위로 DB에 수정 또는 저장

위와 같은 과정으로 처리하고 있었지만 2번 과정에서 슬로우 쿼리가 발생했다.
수많은 데이터를 한 번에 조회했기 때문이다.

또 어떤 문제가 발생할 수 있을까?


문제점

  1. 외부 API와 DB의 모든 데이터를 한꺼번에 메모리에 적재하기 때문에 OOM 발생 위험
  2. DB 오류 시에도 외부 API는 최대 페이지까지 호출하기 때문에 불필요한 API 호출 발견
  3. 데이터가 지속적으로 증가할 경우 현재 방식은 확장성이 떨어져 장기적인 운영에 부적합

해답은 간단하다.
페이징 크기 단위로 데이터를 처리하도록 Reader - Processor - Writer 를 구성한 배치 Job으로 처리하면 된다.


개선 프로세스

  1. 외부 API를 페이징 단위로 호출하여 데이터를 메모리에 저장
  2. 비교를 위해 기준이 되는 기존 데이터를 외부 API 호출로 얻은 페이징 크기만큼 DB에서 조회하여 메모리에 저장
  3. API에서 가져온 데이터와 DB에서 가져온 데이터를 비교해 DB에 반영할 데이터 선별
  4. 필터링된 데이터를 DB에 수정 또는 저장
  5. 1~3번 과정을 반복하며 지정한 chunk 크기만큼 DB에 반영할 데이터가 쌓이면 4번 과정 실행 (이런 과정을 외부 API 페이징 조회가 끝날 때까지 반복)
  6. 외부 API의 페이징이 끝날 때까지 1~5번 반복 수행

위 프로세스를 코드로 구현해보자


구현

chunk 기반으로 Reader - Writer 만 구성해서 해결하였다.

Job

@Configuration
public class TestBatchConfiguration {
	Bean
	public Job excuteJob(JobRepository jobRepository, Step step) {
		return new JobBuilder("testJob", jobRepository)
			.start(step)
			.incrementer(new RunIdIncrementer())
			.build();
	}

	@Bean
	public Step step(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager) {
		return new StepBuilder("testStep", jobRepository)
			.<~>chunk(1000, platformTransactionManager)
			.reader(apiPagingItemReader)
			.writer(writer())
			.build();
	}
}

배치 잡은 위와 같이 간단하게 구성하였다.
눈여겨 볼 점은 apiPagingItemReader(reader) 로 커스텀으로 구현한 것이며 writer는 단순하게 batch insert를 하는 용도이므로 넘어가도록 하겠다.

Reader

@Component
public class ApiPagingItemReader implements ItemReader<Item> {
	private static final int PAGE_SIZE = 1000;

	private final Queue<Item> buffer = new LinkedList<>();
    private final ItemService itemService;
	private final ApiClient apiClient;

	private int page = 1;
	private boolean lastPageReached;

	@Override
	public Employee read() throws
		Exception,
		UnexpectedInputException,
		ParseException,
		NonTransientResourceException {
        // 이미 버퍼에 저장된 데이터가 있다면 하나씩 꺼내어 그 값을 반환
		if (!buffer.isEmpty()) {
			return buffer.poll();
		}

		// 마지막 페이지에 도달했으면 더이상 호출할 필요 없음
		if (lastPageReached) {
			return null;
		}

		while (true) {
			// 외부 API 호출
			List<Item> items = apiClient.request(page++);

			// 외부 API 값이 비어있다면 종료
			if (CollectionUtils.isEmpty(items)) {
				return null;
			}

			// 불필요한 API 호출을 막기 위해 마지막 페이지에 도달했는지 확인
			if (items.size() < PAGE_SIZE) {
				lastPageReached = true;
			}

			// 외부API에서 받아온 데이터 중 DB에 반영할 데이터만 추출
			List<Item> upsertItems = itemService.extractNewAndModified(items);
            
            // 추출한 데이터가 비어있다면 다시 외부API 호출
			if (CollectionUtils.isEmpty(newItems)) {
				continue;
			}

			// 필터링한 데이터를 버퍼에 저장
			buffer.addAll(newItems);
			return buffer.poll();
		}
	}
}

read()는 데이터를 하나씩 읽으며 지정된 chunk 크기만큼 모이면 write()를 호출하는 형태이고 데이터가 더 이상 없으면 즉 null을 반환하면 그 시점까지 쌓인 데이터를 처리한 뒤 배치는 종료된다.
이 흐름을 인지하고 구현해야 된다.

커스텀으로 구현한 reader 흐름은 다음과 같다.

  1. 버퍼에 데이터가 있으면 반환한다.
  2. 외부API를 호출해서 데이터를 조회한다.
    2-1. 데이터가 없다면 reader() 종료
  3. 가져온 데이터 중 DB에 반영할 데이터만 추출한다.
    3-1. 데이터가 없다면 2번으로 이동
    3-2. 데이터가 있다면 버퍼에 저장

3-1번 과정에서 2번으로 이동하는 이유는 외부API의 최대 페이지까지 조회를 안했기 때문이고 null을 반환해서는 안된다.


결과

더 이상 슬로우 쿼리는 발생하지 않았고, 지정한 chunk 크기 단위로 데이터를 처리하기 때문에 OOM 발생 위험도 사라졌다.

외부 API에서 받은 데이터를 그대로 저장했다면 더 쉽게 구현할 수 있었겠지만 필요한 데이터만 선별해 저장해야 했기에 고민과 구현에 시간이 다소 걸렸다ㅜㅜㅜ

profile
백엔드 희망자

0개의 댓글