외부 API를 호출하여 날씨 데이터를 읽어오고, 읽어온 데이터를 DB에 저장하는 Batch 작업을 구현하여 보자.
주간날씨 정보를 기상청 open API(이하 ‘외부 API’)를 통해 호출하여 데이터를 읽어오고, 읽어온 데이터를 DB에 저장하는 Batch 작업을 구현해보려 한다.
혹시 외부 API 호출하는 작업이나 이 프로젝트 관련하여 혹시 궁금하신 점이 있으신 분들은 아래에서 전체 코드를 확인할 수 있다.
@Scheduled(cron = "${schedules.cron.update.weather}")
@Transactional
public void updateLongTermWeather()
throws URISyntaxException, NoSuchFieldException, IllegalAccessException {
LocalDateTime now = LocalDateTime.now();
List<Location> allLocation = locationRepository.findAll();
for (Location location : allLocation) {
WeeklyForecastItem forecast = getWeeklyForecastFromApi(now,
location.getLocationLandCode());
WeeklyTemperatureItem temperature = getWeeklyTemperatureFromApi(now,
location.getLocationCode());
for (int i = 2; i <= 6; i++) {
String minTemperature = "minTemperaturePlus" + i + "Days";
String maxTemperature = "maxTemperaturePlus" + i + "Days";
String morningForecast = "weatherForecastAmPlus" + i + "Days";
String afternoonForecast = "weatherForecastPmPlus" + i + "Days";
WeeklyWeather weeklyWeather = WeeklyWeather.builder()
.date(now.toLocalDate().plusDays(i))
.minTemperature(
getField(temperature.getClass(), minTemperature).getInt(temperature))
.maxTemperature(
getField(temperature.getClass(), maxTemperature).getInt(temperature))
.morningWeatherForecast(
getField(forecast.getClass(), morningForecast).get(forecast).toString())
.afternoonWeatherForecast(
getField(forecast.getClass(), afternoonForecast).get(forecast).toString())
.location(location)
.build();
weeklyWeatherRepository.save(weeklyWeather);
}
}
weeklyWeatherRepository.deleteAllByCreatedAtBefore(now);
}
@Bean
public JpaPagingItemReader<Location> locationReader(EntityManagerFactory emf) {
return new JpaPagingItemReaderBuilder<Location>()
.name("locationReader")
.entityManagerFactory(emf)
.queryString("select L from Location L")
.pageSize(5)
.build();
}
@Scheduled(cron = "${schedules.cron.update.weather}")
@Transactional
public void updateLongTermWeather()
throws URISyntaxException, NoSuchFieldException, IllegalAccessException {
LocalDateTime now = LocalDateTime.now();
List<Location> allLocation = locationRepository.findAll();
for (Location location : allLocation) {
List<WeeklyWeather> weeklyWeathers = getWeeklyWeatherByLocation(now, location);
weeklyWeatherRepository.saveAll(weeklyWeathers);
}
weeklyWeatherRepository.deleteAllByCreatedAtBefore(now);
}
public List<WeeklyWeather> getWeeklyWeatherByLocation(LocalDateTime now,
Location location)
throws URISyntaxException, NoSuchFieldException, IllegalAccessException {
List<WeeklyWeather> weeklyWeathers = new CopyOnWriteArrayList<>();
WeeklyForecastItem forecast = getWeeklyForecastFromApi(now,
location.getLocationLandCode());
WeeklyTemperatureItem temperature = getWeeklyTemperatureFromApi(now,
location.getLocationCode());
for (int i = 2; i <= 6; i++) {
String minTemperature = "minTemperaturePlus" + i + "Days";
String maxTemperature = "maxTemperaturePlus" + i + "Days";
String morningForecast = "weatherForecastAmPlus" + i + "Days";
String afternoonForecast = "weatherForecastPmPlus" + i + "Days";
WeeklyWeather weeklyWeather = WeeklyWeather.builder()
.date(now.toLocalDate().plusDays(i))
.minTemperature(
getField(temperature.getClass(), minTemperature).getInt(temperature))
.maxTemperature(
getField(temperature.getClass(), maxTemperature).getInt(temperature))
.morningWeatherForecast(
getField(forecast.getClass(), morningForecast).get(forecast).toString())
.afternoonWeatherForecast(
getField(forecast.getClass(), afternoonForecast).get(forecast).toString())
.location(location)
.build();
weeklyWeathers.add(weeklyWeather);
}
weeklyWeatherRepository.deleteAllByLocation(location);
return weeklyWeathers;
}
@RequiredArgsConstructor
@Setter
public class LocationItemProcessor implements ItemProcessor<Location, List<WeeklyWeather>> {
// 날씨 호출 서비스
private final LongTermWeatherService longTermWeatherService;
private LocalDateTime now;
@Override
public List<WeeklyWeather> process(Location item) throws Exception {
return longTermWeatherService.getWeeklyWeatherByLocation(now, item);
}
}
@Bean
public LocationItemProcessor locationProcessor(LongTermWeatherService longTermWeatherService) {
LocationItemProcessor locationItemProcessor = new LocationItemProcessor(
longTermWeatherService);
locationItemProcessor.setNow(LocalDateTime.now());
return locationItemProcessor;
}
public class WeeklyWeatherItemWriter<T> implements ItemWriter<List<T>> {
private JpaItemWriter<T> wrapped;
public WeeklyWeatherItemWriter(JpaItemWriter<T> wrapped) {
this.wrapped = wrapped;
}
@Override
public void write(Chunk<? extends List<T>> items) {
Chunk<T> chunk = new Chunk<>();
for (List<T> subList : items) {
chunk.addAll(subList);
}
wrapped.write(chunk);
}
}
@Bean
public WeeklyWeatherItemWriter<WeeklyWeather> weeklyWeatherWriter(
EntityManagerFactory emf) {
JpaItemWriter<WeeklyWeather> writer = new JpaItemWriterBuilder<WeeklyWeather>()
.entityManagerFactory(emf)
.usePersist(true)
.build();
return new WeeklyWeatherItemWriter<>(writer);
}
@Bean
public Step weeklyWeatherStep(JobRepository jobRepository,
JpaTransactionManager transactionManager,
JpaPagingItemReader<Location> reader,
ItemProcessor<Location, List<WeeklyWeather>> processor,
WeeklyWeatherItemWriter<WeeklyWeather> writer) {
return new StepBuilder("weeklyWeatherStep", jobRepository)
.<Location, List<WeeklyWeather>>chunk(chunkSize, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Configuration
public class JpaTransactionConfiguration {
@Bean
public JpaTransactionManager transactionManager() {
return new JpaTransactionManager();
}
}
@Bean
public Job weeklyWeatherJob(JobRepository jobRepository, Step weeklyWeatherStep) {
return new JobBuilder("weeklyWeatherJob", jobRepository)
.start(weeklyWeatherStep)
.build();
}
spring.batch.jdbc.initialize-schema=*always*
LocalContainerEntityManagerFactoryBean
객체에서 EntityManagerFactory를 반환하는 변수명이 emf로 되어 있는 것을 발견하여, 주입받는 EntityManagerFactory의 property의 변수명을 emf로 변경하였더니 정상 작동하였다. Jpa를 사용하기 위한 EntityManagerFactory bean id가 emf로 등록되어 있는듯하다.@Configuration
@RequiredArgsConstructor
public class WeeklyWeatherJobConfiguration {
@Value("${spring.batch.chunkSize}")
private int chunkSize;
@Value("${spring.batch.pageSize}")
private int pageSize;
private final EntityManagerFactory emf;
private final LongTermWeatherService longTermWeatherService;
private final JobRepository jobRepository;
private final JpaTransactionManager transactionManager;
@Bean
public JpaPagingItemReader<Location> locationReader() {
return new JpaPagingItemReaderBuilder<Location>()
.name("locationReader")
.entityManagerFactory(emf)
.queryString("select L from Location L")
.pageSize(pageSize)
.build();
}
@Bean
public ItemProcessor<Location, List<WeeklyWeather>> locationProcessor() {
LocationItemProcessor locationItemProcessor = new LocationItemProcessor(
longTermWeatherService);
locationItemProcessor.setNow(LocalDateTime.now());
return locationItemProcessor;
}
@Bean
public WeeklyWeatherItemWriter<WeeklyWeather> weeklyWeatherWriter() {
JpaItemWriter<WeeklyWeather> writer = new JpaItemWriterBuilder<WeeklyWeather>()
.entityManagerFactory(emf)
.usePersist(true)
.build();
return new WeeklyWeatherItemWriter<>(writer);
}
@Bean
public Step weeklyWeatherStep() {
return new StepBuilder("weeklyWeatherStep", jobRepository)
.<Location, List<WeeklyWeather>>chunk(chunkSize, transactionManager)
.reader(locationReader())
.processor(locationProcessor())
.writer(weeklyWeatherWriter())
.build();
}
@Bean
public Job weeklyWeatherJob() {
return new JobBuilder("weeklyWeatherJob", jobRepository)
.start(weeklyWeatherStep())
.build();
}
}
@Component
@Slf4j
@RequiredArgsConstructor
public class BatchScheduler {
private final JobLauncher jobLauncher;
private final WeeklyWeatherJobConfiguration weeklyWeatherJobConfiguration;
@Scheduled(cron = "${schedules.cron.update.weather}", zone = "Asia/Seoul")
public void runGetWeeklyWeatherJob() {
JobParameters jobParameters = new JobParametersBuilder()
.addDate("date", new Date())
.toJobParameters();
try {
log.info("WeeklyWeatherJobConfiguration.run() start");
jobLauncher.run(weeklyWeatherJobConfiguration.weeklyWeatherJob(), jobParameters);
} catch (Exception e) {
log.error("WeeklyWeatherJobConfiguration.run() error", e);
}
}
}
testImplementation 'org.springframework.batch:spring-batch-test'
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
class WeeklyWeatherJobConfigurationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private WeeklyWeatherJobConfiguration weeklyWeatherJobConfiguration;
@Autowired
private LocationRepository locationRepository;
@Autowired
private WeeklyWeatherRepository weeklyWeatherRepository;
@BeforeEach
void setUp() {
saveTestData();
}
@Test
public void testJob() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addDate("date", new Date())
.toJobParameters();
jobLauncherTestUtils.setJob(weeklyWeatherJobConfiguration.weeklyWeatherJob());
JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);
assertEquals("COMPLETED", jobExecution.getExitStatus().getExitCode());
checkBatchData();
}
private void checkBatchData() {
LocalDate now = LocalDate.now();
List<Location> locationList = locationRepository.findAll();
for (Location location : locationList) {
List<WeeklyWeather> weeklyWeathers = weeklyWeatherRepository.findAllByLocation(
location);
weeklyWeathers.sort(Comparator.comparing(WeeklyWeather::getDate));
for (int i = 2; i <= 6; i++) {
assertEquals(weeklyWeathers.get(i - 2).getDate(), now.plusDays(i));
}
}
}
private void saveTestData() {
List<Location> locationList = new ArrayList<>();
List<WeeklyWeather> weeklyWeatherList = new ArrayList<>();
BufferedReader br = null;
try {
File csv = new File("src/main/resources/location/location.csv");
br = new BufferedReader(new FileReader(csv));
String line = br.readLine(); // 제일 첫줄이 제목이라 한 줄 읽고 시작
while ((line = br.readLine()) != null) {
String[] input = line.split(",");
Location location = Location.builder()
.sido(input[0])
.sigungu(input[1])
.xCoordinate(Integer.parseInt(input[2]))
.yCoordinate(Integer.parseInt(input[3]))
.locationCode(input[4])
.locationLandCode(input[5])
.build();
locationList.add(location);
LocalDate now = LocalDate.now();
for (int i = 1; i <= 5; i++) {
WeeklyWeather weeklyWeather = WeeklyWeather.builder()
.location(location)
.maxTemperature(i)
.minTemperature(i)
.afternoonWeatherForecast(String.valueOf(i))
.morningWeatherForecast(String.valueOf(i))
.date(now.plusDays(i))
.build();
weeklyWeatherList.add(weeklyWeather);
}
}
locationRepository.saveAll(locationList);
weeklyWeatherRepository.saveAll(weeklyWeatherList);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (br != null) {
br.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
testImplementation 'org.awaitility:awaitility'
@SpringBootTest(properties = {"schedules.cron.update.weather=* * * * * *",
"schedules.zone.update.weather=Asia/Seoul"})
class BatchSchedulerTest {
@SpyBean
private BatchScheduler batchScheduler;
@Test
void schedulerActionTest() {
await()
.atMost(Duration.of(1500, ChronoUnit.MILLIS))
.untilAsserted(() -> verify(batchScheduler, atLeastOnce()).runGetWeeklyWeatherJob());
}
@Test
@DisplayName("scheduled cron 표현식 확인")
void batchScheduleCronTest() {
String cronExpression = "0 5 6,18 * * *";
String zone = "Asia/Seoul";
Instant initialTime = LocalDateTime.of(2024, 7, 9, 20, 50)
.atZone(ZoneId.of("Asia/Seoul")).toInstant();
List<Instant> expectedTimes = Stream.of(
LocalDateTime.of(2024, 7, 10, 6, 5),
LocalDateTime.of(2024, 7, 10, 18, 5),
LocalDateTime.of(2024, 7, 11, 6, 5)
).map(time -> time.atZone(ZoneId.of("Asia/Seoul")).toInstant()).toList();
ScheduleTestUtils.assertCronExpression(cronExpression, zone, initialTime, expectedTimes);
}
public static class ScheduleTestUtils {
public static void assertCronExpression(String cronExpression, String zone,
Instant initialTime, List<Instant> expectedTimes) {
CronTrigger trigger = getTrigger(cronExpression, zone);
SimpleTriggerContext context = new SimpleTriggerContext(initialTime,
initialTime, initialTime);
for (Instant expected : expectedTimes) {
Instant actual = trigger.nextExecution(context);
assertEquals(actual, expected);
context.update(actual, actual, actual);
}
}
private static CronTrigger getTrigger(String cronExpression, String zone) {
if (StringUtils.hasText(zone)) {
return new CronTrigger(cronExpression,
StringUtils.parseTimeZoneString(zone));
} else {
return new CronTrigger(cronExpression);
}
}
}
}
spring-batch를 구현하는데 성공하였으나 아쉬운 부분이 있다. 기왕이면 spring-batch를 비동기 방식으로 사용하여 성능을 개선하였으면 하는 부분이다. 비동기 적용 사항은 추후 별도의 글로 정리해보도록 하겠다.