이전 포스팅에서는 CSV 파일을 읽어 DB에 적재하는 과정을 다뤘다. 이번에는 외부 API(국토교통부 전/월세 실거래가)를 실시간으로 호출하여 데이터를 수집하고, 이를 서비스 필요에 맞게 가공(평균, 중앙값 계산)하여 DB에 저장하는 배치 작업을 정리한다.
외부 API 연동 시 가장 중요한 것은 네트워크 불안정성에 대한 대처(Retry)와 비정형적인 응답 포맷(JSON/XML) 처리라고 생각한다.
Spring Batch의 로직과 API 통신 로직을 분리하기 위해 Adapter 패턴을 사용했다. 이 어댑터는 국토부 API를 호출하고 필요한 필드(aptNm, jibun, deposit, monthlyRent)만 추출하여 반환한다.
1-1. URI 빌드 및 호출
공공데이터 포털의 API 키는 인코딩된 상태로 제공되기도 하고 디코딩된 상태로 제공되기도 한다. 이를 UriComponentsBuilder를 사용해 유연하게 처리했다.
public List<RentRecord> fetchMonth(String sigunguCode, YearMonth yearMonth, int pageNo, int rows){
// ... 키 처리 로직 ...
String dealYmd = yearMonth.format(DateTimeFormatter.ofPattern("yyyyMM"));
UriComponentsBuilder builder = UriComponentsBuilder
.fromHttpUrl(baseUrl)
.pathSegment(apiPath)
.queryParam("LAWD_CD", sigunguCode) // 시군구 코드
.queryParam("DEAL_YMD", dealYmd) // YYYYMM
// ... 파라미터 설정 ...
.queryParam("_type", "json"); // JSON 요청
// ... 인코딩 여부에 따른 finalUrl 생성 ...
try{
ResponseEntity<String> resp = restTemplate.getForEntity(finalUrl, String.class);
String body = resp.getBody();
// 핵심: Content-Type이 불확실할 때를 대비한 파싱 로직
JsonNode jsonNode = parseJsonWithXmlFallback(resp.getHeaders().getContentType(), body);
return extractRecords(jsonNode);
} catch (Exception e){
log.error("[API ERROR] ... ", e);
throw e;
}
}
1-2. JSON/XML 하이브리드 파싱
공공데이터 API는 _type=json을 요청해도, 에러 상황이나 특정 조건에서 XML을 반환하는 경우가 있다. 이를 해결하기 위해 JSON 파싱을 먼저 시도하고, 실패하거나 형식이 다르면 XML로 간주하여 파싱하는 로직을 추가했다.
private JsonNode parseJsonWithXmlFallback(@Nullable MediaType ct, String body) {
try {
// Content-Type이 JSON이거나, 본문이 JSON 형태({, [)로 시작하는 경우
if ((ct != null && MediaType.APPLICATION_JSON.includes(ct)) || looksLikeJson(body)) {
return objectMapper.readTree(body);
}
// 그 외에는 XML로 간주하고 파싱 시도
return xmlMapper.readTree(body.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
log.warn("parseJsonWithXmlFallback failed: {}", e.getMessage());
return objectMapper.createObjectNode();
}
}
1-3. 데이터 추출 (Jackson)
응답 구조가 짧지 않고 깊기 때문에 JsonNode.at() 메서드를 사용하여 안전하게 데이터를 추출했다.
private List<RentRecord> extractRecords(JsonNode root) {
// /response/body/items/item 경로로 직접 접근
JsonNode items = root.at("/response/body/items/item");
if (items == null || items.isMissingNode()) return List.of();
List<RentRecord> list = new ArrayList<>();
if (items.isArray()) {
for (JsonNode item : items) {
list.add(toRecord(item));
}
} else {
// 아이템이 하나일 경우 배열이 아닌 객체로 오는 경우 대비
list.add(toRecord(items));
}
return list;
}
외부 API는 언제든 실패할 수 있다. 따라서 Spring Batch의 Fault Tolerant 기능을 활용해 재시도 로직을 구성했다.
Step 설정
DwellingBatch는 WorkItem(작업 단위)을 읽어 API를 호출하고 결과를 저장한다.
@Bean
public Step dwellingStep(ItemReader<WorkItem> dwellingReader,
ItemProcessor<WorkItem, DwellingUpsertDTO> dwellingProcessor,
JdbcBatchItemWriter<DwellingUpsertDTO> dwellingWriterJdbc) {
return new StepBuilder("dwellingStep", jobRepository)
.<WorkItem, DwellingUpsertDTO>chunk(10, platformTransactionManager)
.reader(dwellingReader)
.processor(dwellingProcessor)
.writer(dwellingWriterJdbc)
.faultTolerant() // falutTolerant 활성화
.retry(org.springframework.web.client.ResourceAccessException.class) // Read Timeout 등
.retry(java.net.SocketTimeoutException.class) // 소켓 연결 시간 초과
.retryLimit(3) // 최대 3회 재시도
.backOffPolicy(new FixedBackOffPolicy() {{ setBackOffPeriod(1000L); }}) // 재시도 전 1초 대기
.build();
}
retry(): 네트워크 타임아웃 발생 시 즉시 실패 처리하지 않고 재시도한다.
backOffPolicy(): API 서버 부하를 고려하여 1초 대기 후 재시도한다.
Reader: 작업 단위(WorkItem) 생성
DB에 있는 데이터를 직접 퍼올리는 것이 아니라, "어떤 시군구의, 언제부터 언제까지 데이터를 수집할 것인가"를 정의하는 WorkItem 객체를 만들어 Reader로 넘겼다.
@Bean
@StepScope
public ItemReader<WorkItem> dwellingReader(
@Value("#{jobParameters['dealYmd']}") String dealYmd,
@Value("#{jobParameters['months']}") Long months
) {
if (dealYmd == null || months == null) {
throw new BusinessException(NOT_FOUND_YEARMONTH,"dealYmd or months is null");
}
// 날짜 범위 계산
YearMonth to = YearMonth.parse(dealYmd, DateTimeFormatter.ofPattern("yyyyMM"));
YearMonth from = to.minusMonths(months - 1);
// 모든 시군구에 대해 작업 아이템 생성
List<WorkItem> items = sigunguRepository.findAll().stream()
.map(s -> new WorkItem(s.getSigunguCode(), from, to))
.toList();
return new IteratorItemReader<>(items);
}
Processor: API 호출 및 집계
Processor에서는 WorkItem을 받아 실제 API를 호출(adapter.fetchMonth)하고, 받아온 데이터를 기반으로 전세/월세 평균 및 중앙값을 계산했다.
@Bean
public ItemProcessor<WorkItem, DwellingUpsertDTO> dwellingProcessor() {
return work -> {
List<RentRecord> all = new ArrayList<>();
// 요청된 기간(months)만큼 반복하며 API 호출
for (YearMonth ym = work.from(); !ym.isAfter(work.to()); ym = ym.plusMonths(1)) {
List<RentRecord> records = adapter.fetchMonth(work.sigunguCode(), ym, 1, 1000);
all.addAll(records);
}
// 전세/월세 데이터 분리
List<Integer> monthValues = all.stream()
.map(RentRecord::getMonthlyRent)
.filter(v -> v != null && v > 0)
.toList();
List<Integer> jeonseValues = all.stream()
// ... 전세 필터링 ...
.toList();
// 데이터가 없으면 Skip
if (monthValues.isEmpty() && jeonseValues.isEmpty()) return null;
// 통계 DTO 변환 및 Upsert DTO 생성
DwellingDTO dwellingDTO = toDTO(monthValues, jeonseValues, work.sigunguCode());
return DwellingUpsertDTO.builder()
.sigunguCode(dwellingDTO.getSigunguCode())
.jeonseMid(dwellingDTO.getJeonseMid())
// ... 값 설정 ...
.build();
};
}
Writer: JDBC Upsert
가공된 통계 데이터는 JdbcBatchItemWriter를 사용해 DB에 저장했다. 이미 존재하는 시군구 데이터라면 업데이트(UPDATE)하고, 없으면 삽입(INSERT)하는 Upsert 쿼리를 사용했다.
@Bean
public JdbcBatchItemWriter<DwellingUpsertDTO> dwellingWriterJdbc() {
final String upsertSql = """
INSERT INTO Dwelling (sigungu_code, month_avg, month_mid, jeonse_avg, jeonse_mid)
VALUES (:sigunguCode, :monthAvg, :monthMid, :jeonseAvg, :jeonseMid)
ON DUPLICATE KEY UPDATE
month_avg = VALUES(month_avg),
// ...
""";
// ... Writer 설정 ...
}
서버가 구동될 때 ApplicationReadyEvent를 통해 배치 Job을 실행한다. 이때 JobParameters로 기준 날짜(dealYmd)와 기간(months)을 전달한다.
@Order(10)
@EventListener(ApplicationReadyEvent.class)
public void runOnceAfterStartup() throws Exception {
// 멱등성 체크 (이미 수행된 버전이면 패스)
if(guard.alreadyDone("dwellingJob", SEED_VERSION)) return;
JobParameters params = new JobParametersBuilder()
.addString("dealYmd", "202509") // 예시: 9월 기준
.addLong("months", 12L) // 최근 1년치 데이터
.addString("seedVersion", SEED_VERSION)
.toJobParameters();
jobLauncher.run(dwellingJob, params);
}
이번 포스팅은 Spring Batch 안에서 외부 API를 호출하는 패턴을 정리했다.
핵심을 정리하자면
- Adapter 분리 : API 호출 로직을 Reader/Processor와 분리하여 재사용성을 높였다.
- Fault Tolerant : retry와 backOffPolicy를 통해 네트워크 장애에 유연하게 대처했다.
- JSON/XML Fallback : 공공데이터 API의 불규칙한 응답 포맷을 방어 로직으로 해결했다.
배치 작업은 단순히 DB 간의 이동뿐만 아니라, 이처럼 외부 데이터를 수집하고 가공하는 파이프라인으로도 훌륭하게 활용될 수 있음을 직접 프로젝트에 적용함으로써 확인했다.