Spring Batch ItemWriter 성능 개선

공병주(Chris)·2023년 10월 12일
0


Dandi 서비스에서는 날씨 데이터를 제공하고 있습니다. 해당 날씨 데이터는 공공데이터 포털에서 Batch를 통해 받아오고 있습니다. 하루에 8번 실행되는 해당 Batch Job은 1번의 execution당 약 40분이 소요됩니다. 따라서, Batch를 개선하고자 했습니다. 먼저 ItemWriter의 성능을 개선해보았습니다.

1. ItemWriter에서 이뤄지는 작업

@Bean
@StepScope
public ItemWriter<Weathers> itemWriters() {
    return new CompositeItemWriterBuilder<Weathers>()
            .delegates(previousWeatherItemDeletionWriter(), weatherItemWriter())
            .build();

}

@Bean
@StepScope
public ItemWriter<Weathers> previousWeatherItemDeletionWriter() {
    return items -> {
        List<Long> locationIds = items.stream()
                .map(Weathers::getWeatherLocationId)
                .collect(Collectors.toUnmodifiableList());
        weatherPersistencePort.deleteByLocationIds(locationIds);
    };
}

@Bean
@StepScope
public ItemWriter<Weathers> weatherItemWriter() {
    return items -> {
        List<Weathers> weathers = items.stream()
                .map(item -> (Weathers) item)
                .collect(Collectors.toUnmodifiableList());
        weatherPersistencePort.saveInBatch(weathers);
    };
}
  1. 기존의 날씨 정보들을 삭제한다.(delete)
  2. API Call을 통해 받아온 갱신된 날씨 정보를 추가한다.(insert)

2. delete 성능

delete 쿼리는 현재 In 절을 통해 Batch delete를 진행하고 있습니다.

@Modifying
@Query("DELETE FROM WeatherJpaEntity w WHERE w.locationAndDateTime.weatherLocationId IN :ids")
void deleteAllByWeatherLocationIds(@Param("ids") Iterable<Long> ids);
mysql> explain delete from weather where weather_location_id in (600 ~ 800); # 600~800은 편의상 표기
+----+-------------+---------+------------+-------+----------------------+----------------------+---------+-------+-------+----------+-------------+
| id | select_type | table   | partitions | type  | possible_keys        | key                  | key_len | ref   | rows  | filtered | Extra       |
+----+-------------+---------+------------+-------+----------------------+----------------------+---------+-------+-------+----------+-------------+
|  1 | DELETE      | weather | NULL       | range | weather_location_idx | weather_location_idx | 8       | const | 11256 |   100.00 | Using where |
+----+-------------+---------+------------+-------+----------------------+----------------------+---------+-------+-------+----------+-------------+
1 row in set, 1 warning (0.01 sec)

Index를 잘 타며 delete의 개선점은 없어보입니다.

Write는 아래와 같이 JpaRepository의 saveAll 메서드를 통해 날씨 정보를 저장하고 있습니다.

3. Insert 성능

@Component
public class WeatherPersistenceAdapter implements WeatherPersistencePort {

    private final WeatherRepository weatherRepository;

    @Override
    public void save(List<Weathers> weathers) {
        List<WeatherJpaEntity> weatherJpaEntities = weathers.stream()
                .map(this::parseToEntity)
                .flatMap(Collection::stream)
                .collect(Collectors.toUnmodifiableList());
        weatherRepository.saveAll(weatherJpaEntities);
    }

    private List<WeatherJpaEntity> parseToEntity(Weathers weathers) {
        long weatherLocationId = weathers.getWeatherLocationId();
        return weathers.getValues()
                .stream()
                .map(weather -> WeatherJpaEntity.ofWeather(weather, weatherLocationId))
                .collect(Collectors.toUnmodifiableList());
    }
}

현재 Insert는 JPA Repository의 saveAll 메서드를 통해 진행하고 있습니다.

3-1. saveAll 쿼리 메서드의 문제점

delete의 경우에는 1개의 쿼리로 모든 데이터들을 삭제할 수 있습니다. 하지만, insert의 경우에는 그렇지 않습니다. saveAll 메서드의 경우, JPA에서 Id 생성 전략이 IDENTITY인 엔티티 N개를 saveAll 메서드를 통해 저장한다면, Insert 쿼리가 1개로 N개를 저장하는 것이 아니라, N개의 Insert 쿼리가 실행됩니다. 따라서, 날씨 정보 10만개가 저장된다면 ChunkSize와는 무관하게 10만개의 Insert 쿼리가 실행됩니다.

3-2. Insert 쿼리 1개로 N개의 JPA Entity 저장하는 방법

그렇다면 1개의 쿼리로 N개의 엔티티를 저장하는 방법에 대해서 고민했습니다.

3-2-1. ID 생성 전략 변경

ID 생성 전략을 SEQUENCE 혹은 TABLE로 변경할 수 있습니다.

일단, SEQUENCE 방식은 DB가 Mysql이라면 사용 불가합니다.

또한, TABLE 방식의 경우에는 테이블 1개를 더 생성/관리 해야하기 때문에 좋은 방법이라고 생각되지 않았습니다.

3-2-2. saveAll 사용하지 않고 Jdbc의 batchUpdate 사용하기

다음으로, saveAll 메서드를 사용하지 않는 방법을 떠올렸습니다. JPA를 사용하는데 엔티티 저장에 saveAll을 사용하지 않는다고? 네. 저는 아래와 같이 JDBC의 SimpleJdbcInsert를 통해 1개의 Insert 쿼리를 실행하는 방식을 고려해보았습니다.

(참고)

JDBC의 batchUpdate를 사용하시려면 dataSource의 url에 rewriteBatchedStatements=true 옵션을 주셔야합니다.

@Component
public class WeatherPersistenceAdapter implements WeatherPersistencePort {

    private final WeatherRepository weatherRepository;
    private final SimpleJdbcInsert simpleJdbcInsert;

    public WeatherPersistenceAdapter(WeatherRepository weatherRepository, DataSource dataSource) {
        this.weatherRepository = weatherRepository;
        simpleJdbcInsert = new SimpleJdbcInsert(dataSource)
                .withTableName("weather")
                .usingGeneratedKeyColumns("weather_id");
    }

    @Override
    public void saveInBatch(List<Weathers> weathers) {
        simpleJdbcInsert.executeBatch(generateSqlParameterSources(weathers));
    }

    private SqlParameterSource[] generateSqlParameterSources(List<Weathers> weathers) {
        return weathers.stream()
                .map(this::generate)
                .flatMap(Arrays::stream)
                .toArray(SqlParameterSource[]::new);
    }

    private MapSqlParameterSource[] generate(Weathers weathers) {
        long weatherLocationId = aweathersgetWeatherLocationId();
        return weathers.getValues()
                .stream()
                .map(weather -> generateMapSqlParameterSource(weatherLocationId, weather))
                .toArray(MapSqlParameterSource[]::new);
    }

    private MapSqlParameterSource generateMapSqlParameterSource(long weatherLocationId, Weather weather) {
        return new MapSqlParameterSource()
                .addValue("date_time", weather.getDateTime())
                .addValue("forecasted_at", weather.getForecastedAt())
                .addValue("humidity", weather.getHumidity())
                .addValue("precipitation_amount", weather.getPrecipitationAmount())
                .addValue("precipitation_possibility", weather.getPrecipitationPossibility())
                .addValue("precipitation_type", weather.getPrecipitationType().name())
                .addValue("sky", weather.getSky().name())
                .addValue("temperature", weather.getTemperature())
                .addValue("wind_direction", weather.getWindDirection().name())
                .addValue("wind_speed", weather.getWindSpeed())
                .addValue("weather_location_id", weatherLocationId);
    }
}

(참고)

addValue 메서드를 체이닝 방식으로 사용하는 것을 보시고 addValue 메서드의 반환값이 MapSqlParameterSource 객체임을 추측하셨을텐데요. ChunkSize가 작은 해당 Job을 실행했을 때는, MapSqlParameterSource의 addValue 메서드가 새로운 객체를 계속 생성해서 메모리 문제가 있을 것 같다는 걱정이 들었습니다. 하지만, 실제로 addValue 내부 구현을 확인하고 직접 테스트해보니, 위의 맥락에서 addValue는 동일한 객체를 반환합니다. 체이닝 방식을 위해서 객체를 반환하는 것으로 보입니다.

4. 성능 측정

Batch 과정중 Processor에서 API 요청을 진행합니다. 쓰기 성능 측정에서 API 요청이라는 변인을 통제하기 위해 API 요청은 Mocking을 통해 진행했습니다. 단, 실제로 삽입하는 row의 개수는 실제와 동일하게 했습니다.

성능은 아래와 같은 쿼리를 통해 측정했습니다.

SELECT START_TIME, END_TIME FROM BATCH_JOB_EXECUTION;

쓰기 없이 실행

ItemWriter가 아무것도 하지않도록 하면 JOB 실행 시간이 0.2~0.3초 정도됩니다. 따라서, API CALL을 모킹해둔 상태에서, JOB 실행 시간의 대부분 시간은 DB에 Write하는데 소요됨을 알 수 있습니다.

START TIME                 | END_TIME
2023-10-12 08:15:44.531000 | 2023-10-12 08:15:44.858000
2023-10-12 08:16:06.813000 | 2023-10-12 08:16:07.101000
2023-10-12 08:16:23.946000 | 2023-10-12 08:16:24.241000

변경전 (약 104초)

START TIME                 | END_TIME
2023-10-12 07:48:00.004000 | 2023-10-12 07:49:52.509000 112s
2023-10-12 07:50:19.441000 | 2023-10-12 07:52:01.706000 102s
2023-10-12 07:52:40.728000 | 2023-10-12 07:54:23.144000 103s
2023-10-12 07:59:40.357000 | 2023-10-12 08:01:30.660000 110s
2023-10-12 08:04:06.886000 | 2023-10-12 08:05:51.804000 105s
2023-10-12 08:19:00.319000 | 2023-10-12 08:20:48.862000 108s
2023-10-12 08:21:17.716000 | 2023-10-12 08:22:58.598000 101s
2023-10-12 08:23:18.533000 | 2023-10-12 08:24:57.917000 99s
2023-10-12 08:25:21.708000 | 2023-10-12 08:27:05.610000 104s
2023-10-12 08:27:25.856000 | 2023-10-12 08:29:09.137000 104s

변경후 (약 7초)

START TIME                 | END_TIME
2023-10-12 07:39:26.190000 | 2023-10-12 07:39:33.862000 7s
2023-10-12 07:39:58.608000 | 2023-10-12 07:40:06.805000 8s
2023-10-12 07:41:06.052000 | 2023-10-12 07:41:13.254000 7s
2023-10-12 07:41:33.032000 | 2023-10-12 07:41:40.070000 7s
2023-10-12 07:42:08.452000 | 2023-10-12 07:42:15.313000 7s
2023-10-12 08:32:30.178000 | 2023-10-12 08:32:37.881000 7s
2023-10-12 08:32:50.559000 | 2023-10-12 08:32:57.672000 7s
2023-10-12 08:33:14.777000 | 2023-10-12 08:33:21.832000 7s
2023-10-12 08:33:39.904000 | 2023-10-12 08:33:46.819000 7s

약 15배의 Write 성능이 개선되었습니다.

0개의 댓글