Oh, with the ItemReader set up like this, it fetches 100 records at once and then hands them out one at a time through read().
@Override
public Object read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
    log.info("Reading the information of the next book");
    if (bookDataIsNotInitialized()) { // no data loaded yet: call the API
        bookData = fetchBookDataFromAPI(currentPage, numOfRows);
    }
    BookDTO nextBook = null;
    if (nextBookIndex < bookData.size()) {
        nextBook = bookData.get(nextBookIndex);
        nextBookIndex += 1;
    } else {
        // Page used up: reset the state and return null, which Spring Batch treats as end of input
        currentPage += 1;
        nextBookIndex = 0;
        bookData = null;
    }
    log.info("Found book: {}", nextBook);
    return nextBook;
}
Since the chunk size is currently set to 10, you can see the ItemWriter receiving the items in groups of 10. For now the ItemWriter only prints a log line.
2022-08-15 19:25:44.085 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Fetching book data from an external API by using the url: http://api.kcisa.kr/openapi/service/rest/meta16/getNlTot?serviceKey=55ae7991-0659-438a-89d5-cd3e13defa92&numOfRows=100&pageNo=1
2022-08-15 19:25:46.377 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Found book: com.ckstn0777.batch.dto.BookDTO@e7d0db2
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Reading the information of the next book
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Found book: com.ckstn0777.batch.dto.BookDTO@26bce60d
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Reading the information of the next book
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Found book: com.ckstn0777.batch.dto.BookDTO@76eadc5a
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Reading the information of the next book
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Found book: com.ckstn0777.batch.dto.BookDTO@1e9d7366
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Reading the information of the next book
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Found book: com.ckstn0777.batch.dto.BookDTO@20914835
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Reading the information of the next book
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Found book: com.ckstn0777.batch.dto.BookDTO@615c4ea4
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Reading the information of the next book
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Found book: com.ckstn0777.batch.dto.BookDTO@417446d9
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Reading the information of the next book
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Found book: com.ckstn0777.batch.dto.BookDTO@34070bd2
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Reading the information of the next book
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Found book: com.ckstn0777.batch.dto.BookDTO@1e7d3d87
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Reading the information of the next book
2022-08-15 19:25:46.379 INFO 2285 --- [ main] com.ckstn0777.batch.job.RESTBookReader : Found book: com.ckstn0777.batch.dto.BookDTO@1dd76982
2022-08-15 19:25:46.380 INFO 2285 --- [ main] com.ckstn0777.batch.job.BookWriter : Writing books: [com.ckstn0777.batch.dto.BookDTO@e7d0db2, com.ckstn0777.batch.dto.BookDTO@26bce60d, com.ckstn0777.batch.dto.BookDTO@76eadc5a, com.ckstn0777.batch.dto.BookDTO@1e9d7366, com.ckstn0777.batch.dto.BookDTO@20914835, com.ckstn0777.batch.dto.BookDTO@615c4ea4, com.ckstn0777.batch.dto.BookDTO@417446d9, com.ckstn0777.batch.dto.BookDTO@34070bd2, com.ckstn0777.batch.dto.BookDTO@1e7d3d87, com.ckstn0777.batch.dto.BookDTO@1dd76982]
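To make the read/write rhythm in the log concrete, here is a simplified, framework-free model of a chunk-oriented step. It is a sketch, not Spring Batch's actual implementation: `run` keeps calling the reader until it has `chunkSize` items or the reader returns null, then passes the whole list to the writer in a single call. `singlePageReader` is a hypothetical stand-in for the first RESTBookReader (one page of strings instead of BookDTOs, then null).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Simplified model of a chunk-oriented step (not Spring Batch's real code).
class ChunkLoopSketch {

    // Hypothetical stand-in for the first RESTBookReader: serves one page of
    // pageSize items, then returns null, which ends the step.
    static Supplier<String> singlePageReader(int pageSize) {
        List<String> page = new ArrayList<>();
        for (int i = 1; i <= pageSize; i++) {
            page.add("book-" + i); // plays the role of a BookDTO from the API
        }
        int[] nextIndex = {0};
        return () -> nextIndex[0] < page.size() ? page.get(nextIndex[0]++) : null;
    }

    // Reads until chunkSize items are collected or the reader returns null,
    // then hands the whole chunk to the writer. Returns the number of chunks written.
    static int run(Supplier<String> reader, int chunkSize, Consumer<List<String>> writer) {
        int chunksWritten = 0;
        while (true) {
            List<String> chunk = new ArrayList<>();
            String item = null;
            while (chunk.size() < chunkSize && (item = reader.get()) != null) {
                chunk.add(item);
            }
            if (!chunk.isEmpty()) {
                writer.accept(chunk); // one write call per chunk, like "Writing books: [...]"
                chunksWritten++;
            }
            if (chunk.size() < chunkSize) {
                return chunksWritten; // the reader returned null: no more input
            }
        }
    }

    public static void main(String[] args) {
        int chunks = run(singlePageReader(100), 10,
                chunk -> System.out.println("Writing " + chunk.size() + " items"));
        System.out.println(chunks + " chunks written");
    }
}
```

With a 100-item page and a chunk size of 10, the writer is called 10 times with 10 items each, the same pattern as one "Writing books: [...]" line per ten "Found book" lines in the log.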
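In the version above, once the page runs out read() returns null, which Spring Batch takes as the end of the input, so only the first 100 items ever get processed. So I changed the ItemReader to compare currentPage and numOfRows against totalPage, letting it keep fetching the next page.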
@Override
public Object read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
    log.info("Reading the information of the next book");
    if (bookDataIsNotInitialized()) { // no data loaded yet: call the API
        bookData = fetchBookDataFromAPI(currentPage, numOfRows);
    }
    BookDTO nextBook = null;
    if (nextBookIndex < bookData.size()) {
        nextBook = bookData.get(nextBookIndex);
        nextBookIndex += 1;
        // End of this page: move on to the next page, but stop once we've gone past totalPage.
        // (totalPage is compared with currentPage * numOfRows, i.e. the number of items fetched so far.)
        if (nextBookIndex == bookData.size()) {
            if (totalPage <= currentPage * numOfRows) return null; // note: nextBook is discarded here
            currentPage += 1;
            nextBookIndex = 0;
            bookData = null;
        }
    }
    log.info("Found book: {}", nextBook);
    return nextBook;
}
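In the version above, `return null` runs after `nextBook` has already been taken from the list, so the last item of the final page never reaches the writer. Below is a minimal framework-free sketch of one way around that: finish the current page first, and check the end condition only before fetching the next page. `fetchPage` is a hypothetical stand-in for `fetchBookDataFromAPI`, and `totalCount` plays the role of `totalPage` in the post (it is compared against items fetched so far, so it is really an item count). How the real reader obtains that number from the API response is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Paging reader sketch that keeps the last item of the last page.
// fetchPage is a hypothetical stand-in for fetchBookDataFromAPI(pageNo, numOfRows).
class PagingReaderSketch<T> {
    private final BiFunction<Integer, Integer, List<T>> fetchPage;
    private final int numOfRows;
    private final int totalCount; // total number of items (the post's totalPage)
    private int currentPage = 1;
    private int nextIndex = 0;
    private List<T> page = null;

    PagingReaderSketch(BiFunction<Integer, Integer, List<T>> fetchPage, int numOfRows, int totalCount) {
        this.fetchPage = fetchPage;
        this.numOfRows = numOfRows;
        this.totalCount = totalCount;
    }

    // Same contract as ItemReader.read(): the next item, or null once input is exhausted.
    T read() {
        if (page == null) {
            // Check the end condition before fetching, not after taking an item.
            if ((currentPage - 1) * numOfRows >= totalCount) {
                return null;
            }
            page = fetchPage.apply(currentPage, numOfRows);
        }
        if (nextIndex >= page.size()) {
            return null; // the API returned an empty page: treat it as end of input
        }
        T item = page.get(nextIndex++);
        if (nextIndex == page.size()) {
            // Page finished: prepare the next fetch, but still return this item.
            currentPage++;
            nextIndex = 0;
            page = null;
        }
        return item;
    }

    public static void main(String[] args) {
        int total = 25;
        // Fake API: page pageNo holds items (pageNo-1)*rows+1 .. min(pageNo*rows, total)
        PagingReaderSketch<String> reader = new PagingReaderSketch<>((pageNo, rows) -> {
            List<String> items = new ArrayList<>();
            for (int i = (pageNo - 1) * rows + 1; i <= Math.min(pageNo * rows, total); i++) {
                items.add("book-" + i);
            }
            return items;
        }, 10, total);
        int count = 0;
        String last = null;
        for (String b = reader.read(); b != null; b = reader.read()) {
            count++;
            last = b;
        }
        System.out.println(count + " items, last = " + last);
    }
}
```

With 25 items and 10 rows per page, the sketch makes three fetches and returns all 25 items, including the final one, before returning null.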
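Next up is actually saving the data… To bridge the gap between BookDTO and the Book entity, I added an ItemProcessor. Then, to save to the database the JPA way, I injected the EntityManagerFactory and passed it to the JpaItemWriter via setEntityManagerFactory.
(Ah, I originally meant to split this out into a separate BookWriter.java file, but for some reason that didn't work out… )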
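import javax.persistence.EntityManagerFactory;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.web.client.RestTemplate;

import com.ckstn0777.batch.dto.BookDTO;
import com.ckstn0777.batch.job.RESTBookReader;
// plus the import for the Book entity from the project's own package

@Slf4j
@RequiredArgsConstructor
@Configuration
public class SimpleJobConfiguration {
    private static final String PROPERTY_REST_API_URL = "rest.api.url"; // property key for the API request URL
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    @Bean
    public Job simpleJob(Step collectStep) { // so you can also get a dependency injected like this (as a @Bean method parameter)
        return jobBuilderFactory.get("simpleJob")
                .start(collectStep)
                .build();
    }

    @Bean
    @JobScope
    public Step collectStep(ItemReader<BookDTO> reader, JpaItemWriter<Book> writer) {
        return stepBuilderFactory.get("collectStep")
                .<BookDTO, Book>chunk(10)
                .reader(reader)
                .processor(jpaItemProcessor())
                .writer(writer)
                .build();
    }

    @Bean
    public ItemReader<BookDTO> reader(Environment environment, RestTemplate restTemplate) {
        // Fetch the data through the REST API.
        return new RESTBookReader(environment.getRequiredProperty(PROPERTY_REST_API_URL),
                restTemplate);
    }

    @Bean
    public ItemProcessor<BookDTO, Book> jpaItemProcessor() {
        // Convert each fetched BookDTO into a Book entity.
        return bookDTO -> Book.builder()
                .title(bookDTO.getTitle())
                .author(bookDTO.getPerson())
                .build();
    }

    @Bean
    public JpaItemWriter<Book> writer() {
        // Save to the database through JPA.
        JpaItemWriter<Book> jpaItemWriter = new JpaItemWriter<>();
        jpaItemWriter.setEntityManagerFactory(entityManagerFactory);
        return jpaItemWriter;
    }
}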
Then add the JPA-related settings.
# jpa
spring.jpa.show-sql=true
spring.jpa.generate-ddl=true
spring.jpa.hibernate.ddl-auto=create
spring.jpa.properties.hibernate.format_sql=true
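Looking at the output, the inserts go out together, but each row still seems to be its own statement. That matches how JpaItemWriter works: it merges each item of the chunk and then flushes once, so the 10 INSERTs are sent at flush time, one INSERT per entity, and unless JDBC batching is configured (hibernate.jdbc.batch_size) they reach the database one by one. Anyway, it finally works…!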
Hibernate:
insert
into
book
(author, title)
values
(?, ?)
Hibernate:
insert
into
book
(author, title)
values
(?, ?)
...
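I roughly bumped numOfRows up to 1000 and ran it, and 77,000 rows piled up in no time. Oh, love it. Now all that's left is to pull the data out of the database and bulk-index it into Elasticsearch.
That said, this is only easy because everything goes into a single table; loading into entities with a more complex structure will need more digging. Still, it doesn't look impossible.
Full code on GitHub: https://github.com/ckstn0777/elasticsearch-test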
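A quick way to see why raising numOfRows helps: the number of API requests needed to page through every item is a ceiling division. This sketch assumes a total of exactly 77,000 items, and `pagesNeeded` is a hypothetical helper, not part of the project.

```java
// Ceiling division: how many pages (API requests) it takes to read totalCount items.
class PageMath {
    // Hypothetical helper, not part of the project.
    static int pagesNeeded(int totalCount, int numOfRows) {
        return (totalCount + numOfRows - 1) / numOfRows;
    }

    public static void main(String[] args) {
        // Assumes exactly 77,000 items in total.
        System.out.println(pagesNeeded(77_000, 1000) + " requests at numOfRows=1000");
        System.out.println(pagesNeeded(77_000, 100) + " requests at numOfRows=100");
    }
}
```

At 1000 rows per page that is 77 requests instead of 770 at 100 rows per page, so far fewer round trips to the API.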
Your post helped me a lot, thank you ~
I'm not sure if I'm right, but
if (totalPage <= currentPage * numOfRows) return null;
written like the above, wouldn't the last item end up not being saved??