보통 JPA 기반으로 프로젝트를 많이 진행하고 있기 때문에 batch에서 JPA 기반 ItemReader를 살펴보자. spring-batch에서 제공해주는 구현체를 이해하고 사용하면, 언젠가 좀 더 내 상황에 맞게 customizing 하기 편리할 거 같기 때문이다.
spring-batch에서 제공해주는 JPA 기반 ItemReader 구현체는 JpaPagingItemReader와 JpaCursorItemReader가 있다. 먼저 객체 이름에서 Paging과 Cursor가 눈에 띈다. 두 방식의 차이는 아래와 같다.
위 내용은 아래 블로그를 참고하였다.
참고 : https://jojoldu.tistory.com/336
JpaCursorItemReader 객체 구조는 아래와 같다.
제일 위에서부터 살펴보자.
@Override
@SuppressWarnings("unchecked")
protected void doOpen() throws Exception {
this.entityManager = this.entityManagerFactory.createEntityManager();
if (this.entityManager == null) {
throw new DataAccessResourceFailureException("Unable to create an EntityManager");
}
if (this.queryProvider != null) {
this.queryProvider.setEntityManager(this.entityManager);
}
Query query = createQuery();
if (this.parameterValues != null) {
this.parameterValues.forEach(query::setParameter);
}
this.iterator = query.getResultStream().iterator();
}
private Query createQuery() {
if (this.queryProvider == null) {
return this.entityManager.createQuery(this.queryString);
}
else {
return this.queryProvider.createQuery();
}
}
@Override
protected T doRead() {
return this.iterator.hasNext() ? this.iterator.next() : null;
}
@Override
protected void doClose() {
if (this.entityManager != null) {
this.entityManager.close();
}
}
클래스 다이어그램은 그리는 법은 아래 블로그에서 참조했다.
JpaPagingItemReader 객체 구조는 아래와 같다.
@Override
protected void doOpen() throws Exception {
Assert.state(!initialized, "Cannot open an already opened ItemReader, call close first");
initialized = true;
}
@Nullable
@Override
protected T doRead() throws Exception {
synchronized (lock) {
if (results == null || current >= pageSize) {
if (logger.isDebugEnabled()) {
logger.debug("Reading page " + getPage());
}
doReadPage();
page++;
if (current >= pageSize) {
current = 0;
}
}
int next = current++;
if (next < results.size()) {
return results.get(next);
}
else {
return null;
}
}
}
@Override
protected void doClose() throws Exception {
synchronized (lock) {
initialized = false;
current = 0;
page = 0;
results = null;
}
}
@Override
protected void doOpen() throws Exception {
super.doOpen();
entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
if (entityManager == null) {
throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
}
// set entityManager to queryProvider, so it participates
// in JpaPagingItemReader's managed transaction
if (queryProvider != null) {
queryProvider.setEntityManager(entityManager);
}
}
@Override
@SuppressWarnings("unchecked")
protected void doReadPage() {
EntityTransaction tx = null;
if (transacted) {
tx = entityManager.getTransaction();
tx.begin();
entityManager.flush();
entityManager.clear();
}
Query query = createQuery().setFirstResult(getPage() * getPageSize()).setMaxResults(getPageSize());
if (parameterValues != null) {
for (Map.Entry<String, Object> me : parameterValues.entrySet()) {
query.setParameter(me.getKey(), me.getValue());
}
}
if (results == null) {
results = new CopyOnWriteArrayList<>();
} else {
results.clear();
}
if (!transacted) {
List<T> queryResult = query.getResultList();
for (T entity : queryResult) {
entityManager.detach(entity);
results.add(entity);
}
} else {
results.addAll(query.getResultList());
tx.commit();
}
}
volatile : volatile이 붙은 변수는 메인 메모리에 직접 저장되고 읽히게 된다. 즉, 스레드가 변수의 값을 캐시하지 않고 항상 메인 메모리에서 읽고 쓰도록 강제하여 변수의 값을 스레드 간에 일관되게 유지한다.
public class JpaCursorItemReaderExample {
public JpaCursorItemReader<Person> createReader(EntityManagerFactory entityManagerFactory) {
Map<String, Object> params = new HashMap<>();
params.put("status", "ACTIVE");
return new JpaCursorItemReaderBuilder<Person>()
.name("personReader")
.entityManagerFactory(entityManagerFactory)
.queryString("SELECT p FROM Person p WHERE p.status = :status")
.parameterValues(params)
.saveState(true)
.currentItemCount(0)
.maxItemCount(100)
.build();
}
}
@Configuration
public class BatchConfiguration {
@Bean
public CustomJpaQueryProvider customJpaQueryProvider() {
CustomJpaQueryProvider provider = new CustomJpaQueryProvider();
provider.setBaseQuery("SELECT e FROM Employee e");
return provider;
}
@Bean
public JpaPagingItemReader<Employee> jpaPagingItemReader(EntityManagerFactory entityManagerFactory,
CustomJpaQueryProvider customJpaQueryProvider) {
Map<String, Object> params = new HashMap<>();
params.put("status", "ACTIVE");
return new JpaPagingItemReaderBuilder<Employee>()
.name("employeeReader")
.entityManagerFactory(entityManagerFactory)
.queryProvider(customJpaQueryProvider)
.parameterValues(params)
.pageSize(10)
.build();
}
}