[지방어때] Spring Batch 를 이용한 API->DB 데이터 적재

이현우·2026년 1월 1일

지방어때

목록 보기
3/4
post-thumbnail

국토부 실거래가 API 연동 및 대용량 데이터 가공

이전 포스팅에서는 CSV 파일을 읽어 DB에 적재하는 과정을 다뤘다. 이번에는 외부 API(국토교통부 전/월세 실거래가)를 실시간으로 호출하여 데이터를 수집하고, 이를 서비스 필요에 맞게 가공(평균, 중앙값 계산)하여 DB에 저장하는 배치 작업을 정리한다.

외부 API 연동 시 가장 중요한 것은 네트워크 불안정성에 대한 대처(Retry)와 비정형적인 응답 포맷(JSON/XML) 처리라고 생각한다.

1. API 통신을 위한 Adapter 구현

Spring Batch의 로직과 API 통신 로직을 분리하기 위해 Adapter 패턴을 사용했다. 이 어댑터는 국토부 API를 호출하고 필요한 필드(aptNm, jibun, deposit, monthlyRent)만 추출하여 반환한다.

1-1. URI 빌드 및 호출

공공데이터 포털의 API 키는 인코딩된 상태로 제공되기도 하고 디코딩된 상태로 제공되기도 한다. 이를 UriComponentsBuilder를 사용해 유연하게 처리했다.

public List<RentRecord> fetchMonth(String sigunguCode, YearMonth yearMonth, int pageNo, int rows){
    // ... 키 처리 로직 ...
    String dealYmd = yearMonth.format(DateTimeFormatter.ofPattern("yyyyMM"));

    UriComponentsBuilder builder = UriComponentsBuilder
            .fromHttpUrl(baseUrl)
            .pathSegment(apiPath)
            .queryParam("LAWD_CD", sigunguCode)  // 시군구 코드
            .queryParam("DEAL_YMD", dealYmd)     // YYYYMM
            // ... 파라미터 설정 ...
            .queryParam("_type", "json"); // JSON 요청

    // ... 인코딩 여부에 따른 finalUrl 생성 ...

    try{
        ResponseEntity<String> resp = restTemplate.getForEntity(finalUrl, String.class);
        String body = resp.getBody();
        // 핵심: Content-Type이 불확실할 때를 대비한 파싱 로직
        JsonNode jsonNode = parseJsonWithXmlFallback(resp.getHeaders().getContentType(), body);
        return extractRecords(jsonNode);
    } catch (Exception e){
        log.error("[API ERROR] ... ", e);
        throw e;
    }
}

1-2. JSON/XML 하이브리드 파싱

공공데이터 API는 _type=json을 요청해도, 에러 상황이나 특정 조건에서 XML을 반환하는 경우가 있다. 이를 해결하기 위해 JSON 파싱을 먼저 시도하고, 실패하거나 형식이 다르면 XML로 간주하여 파싱하는 로직을 추가했다.

private JsonNode parseJsonWithXmlFallback(@Nullable MediaType ct, String body) {
    try {
        // Content-Type이 JSON이거나, 본문이 JSON 형태({, [)로 시작하는 경우
        if ((ct != null && MediaType.APPLICATION_JSON.includes(ct)) || looksLikeJson(body)) {
            return objectMapper.readTree(body);
        }
        // 그 외에는 XML로 간주하고 파싱 시도
        return xmlMapper.readTree(body.getBytes(StandardCharsets.UTF_8));
    } catch (Exception e) {
        log.warn("parseJsonWithXmlFallback failed: {}", e.getMessage());
        return objectMapper.createObjectNode();
    }
}

1-3. 데이터 추출 (Jackson)

응답 구조가 짧지 않고 깊기 때문에 JsonNode.at() 메서드를 사용하여 안전하게 데이터를 추출했다.

private List<RentRecord> extractRecords(JsonNode root) {
    // /response/body/items/item 경로로 직접 접근
    JsonNode items = root.at("/response/body/items/item");
    if (items == null || items.isMissingNode()) return List.of();

    List<RentRecord> list = new ArrayList<>();
    if (items.isArray()) {
        for (JsonNode item : items) {
            list.add(toRecord(item));
        }
    } else {
        // 아이템이 하나일 경우 배열이 아닌 객체로 오는 경우 대비
        list.add(toRecord(items));
    }
    return list;
}

2. Batch Job 구성

외부 API는 언제든 실패할 수 있다. 따라서 Spring Batch의 Fault Tolerant 기능을 활용해 재시도 로직을 구성했다.

  1. Step 설정

    DwellingBatch는 WorkItem(작업 단위)을 읽어 API를 호출하고 결과를 저장한다.

    @Bean
    public Step dwellingStep(ItemReader<WorkItem> dwellingReader,
                             ItemProcessor<WorkItem, DwellingUpsertDTO> dwellingProcessor,
                             JdbcBatchItemWriter<DwellingUpsertDTO> dwellingWriterJdbc) {
        return new StepBuilder("dwellingStep", jobRepository)
                .<WorkItem, DwellingUpsertDTO>chunk(10, platformTransactionManager)
                .reader(dwellingReader)
                .processor(dwellingProcessor)
                .writer(dwellingWriterJdbc)
                .faultTolerant() // falutTolerant 활성화
                .retry(org.springframework.web.client.ResourceAccessException.class) // Read Timeout 등
                .retry(java.net.SocketTimeoutException.class) // 소켓 연결 시간 초과
                .retryLimit(3) // 최대 3회 재시도
                .backOffPolicy(new FixedBackOffPolicy() {{ setBackOffPeriod(1000L); }}) // 재시도 전 1초 대기
                .build();
    }
    • retry(): 네트워크 타임아웃 발생 시 즉시 실패 처리하지 않고 재시도한다.

    • backOffPolicy(): API 서버 부하를 고려하여 1초 대기 후 재시도한다.

  2. Reader: 작업 단위(WorkItem) 생성

    DB에 있는 데이터를 직접 퍼올리는 것이 아니라, "어떤 시군구의, 언제부터 언제까지 데이터를 수집할 것인가"를 정의하는 WorkItem 객체를 만들어 Reader로 넘겼다.

    @Bean
    @StepScope
    public ItemReader<WorkItem> dwellingReader(
            @Value("#{jobParameters['dealYmd']}") String dealYmd,
            @Value("#{jobParameters['months']}") Long months
    ) {
        if (dealYmd == null || months == null) {
            throw new BusinessException(NOT_FOUND_YEARMONTH,"dealYmd or months is null");
        }
    
        // 날짜 범위 계산
        YearMonth to = YearMonth.parse(dealYmd, DateTimeFormatter.ofPattern("yyyyMM"));
        YearMonth from = to.minusMonths(months - 1);
    
        // 모든 시군구에 대해 작업 아이템 생성
        List<WorkItem> items = sigunguRepository.findAll().stream()
                .map(s -> new WorkItem(s.getSigunguCode(), from, to))
                .toList();
    
        return new IteratorItemReader<>(items);
    }
  3. Processor: API 호출 및 집계

    Processor에서는 WorkItem을 받아 실제 API를 호출(adapter.fetchMonth)하고, 받아온 데이터를 기반으로 전세/월세 평균 및 중앙값을 계산했다.

    @Bean
    public ItemProcessor<WorkItem, DwellingUpsertDTO> dwellingProcessor() {
        return work -> {
            List<RentRecord> all = new ArrayList<>();
            // 요청된 기간(months)만큼 반복하며 API 호출
            for (YearMonth ym = work.from(); !ym.isAfter(work.to()); ym = ym.plusMonths(1)) {
                List<RentRecord> records = adapter.fetchMonth(work.sigunguCode(), ym, 1, 1000);
                all.addAll(records);
            }
    
            // 전세/월세 데이터 분리
            List<Integer> monthValues = all.stream()
                    .map(RentRecord::getMonthlyRent)
                    .filter(v -> v != null && v > 0)
                    .toList();
    
            List<Integer> jeonseValues = all.stream()
                    // ... 전세 필터링 ...
                    .toList();
    
            // 데이터가 없으면 Skip
            if (monthValues.isEmpty() && jeonseValues.isEmpty()) return null;
    
            // 통계 DTO 변환 및 Upsert DTO 생성
            DwellingDTO dwellingDTO = toDTO(monthValues, jeonseValues, work.sigunguCode());
    
            return DwellingUpsertDTO.builder()
                    .sigunguCode(dwellingDTO.getSigunguCode())
                    .jeonseMid(dwellingDTO.getJeonseMid())
                    // ... 값 설정 ...
                    .build();
        };
    }
  4. Writer: JDBC Upsert

    가공된 통계 데이터는 JdbcBatchItemWriter를 사용해 DB에 저장했다. 이미 존재하는 시군구 데이터라면 업데이트(UPDATE)하고, 없으면 삽입(INSERT)하는 Upsert 쿼리를 사용했다.

    @Bean
    public JdbcBatchItemWriter<DwellingUpsertDTO> dwellingWriterJdbc() {
        final String upsertSql = """
        INSERT INTO Dwelling (sigungu_code, month_avg, month_mid, jeonse_avg, jeonse_mid)
        VALUES (:sigunguCode, :monthAvg, :monthMid, :jeonseAvg, :jeonseMid)
        ON DUPLICATE KEY UPDATE
            month_avg  = VALUES(month_avg),
            // ...
        """;
        // ... Writer 설정 ...
    }

3. 실행 (Runner)

서버가 구동될 때 ApplicationReadyEvent를 통해 배치 Job을 실행한다. 이때 JobParameters로 기준 날짜(dealYmd)와 기간(months)을 전달한다.

@Order(10)
@EventListener(ApplicationReadyEvent.class)
public void runOnceAfterStartup() throws Exception {
      // 멱등성 체크 (이미 수행된 버전이면 패스)
      if(guard.alreadyDone("dwellingJob", SEED_VERSION)) return;

      JobParameters params = new JobParametersBuilder()
              .addString("dealYmd", "202509") // 예시: 9월 기준
              .addLong("months", 12L)         // 최근 1년치 데이터
              .addString("seedVersion", SEED_VERSION)
              .toJobParameters();

      jobLauncher.run(dwellingJob, params);
}

이번 포스팅은 Spring Batch 안에서 외부 API를 호출하는 패턴을 정리했다.

핵심을 정리하자면

  • Adapter 분리 : API 호출 로직을 Reader/Processor와 분리하여 재사용성을 높였다.
  • Fault Tolerant : retry와 backOffPolicy를 통해 네트워크 장애에 유연하게 대처했다.
  • JSON/XML Fallback : 공공데이터 API의 불규칙한 응답 포맷을 방어 로직으로 해결했다.

배치 작업은 단순히 DB 간의 이동뿐만 아니라, 이처럼 외부 데이터를 수집하고 가공하는 파이프라인으로도 훌륭하게 활용될 수 있음을 직접 프로젝트에 적용함으로써 확인했다.

profile
백앤드 개발자

0개의 댓글