보통 JPA 기반으로 프로젝트를 많이 진행하고 있기 때문에 batch에서 JPA 기반 ItemReader를 살펴보자. spring-batch에서 제공해주는 구현체를 이해하고 사용하면, 언젠가 좀 더 내 상황에 맞게 customizing 하기 편리할 거 같기 때문이다.
spring-batch에서 제공해주는 JPA 기반 ItemReader 구현체는 JpaPagingItemReader와 JpaCursorItemReader가 있다. 먼저 객체 이름에서 Paging과 Cursor가 눈에 띈다. 두 방식의 차이는 아래와 같다.
위 내용은 아래 블로그를 참고하였다.
참고 : https://jojoldu.tistory.com/336
JpaCursorItemReader 객체 구조는 아래와 같다.

제일 위에서부터 살펴보자.
@Override
	@SuppressWarnings("unchecked")
	protected void doOpen() throws Exception {
		this.entityManager = this.entityManagerFactory.createEntityManager();
		if (this.entityManager == null) {
			throw new DataAccessResourceFailureException("Unable to create an EntityManager");
		}
		if (this.queryProvider != null) {
			this.queryProvider.setEntityManager(this.entityManager);
		}
		Query query = createQuery();
		if (this.parameterValues != null) {
			this.parameterValues.forEach(query::setParameter);
		}
		this.iterator = query.getResultStream().iterator();
	}
	private Query createQuery() {
		if (this.queryProvider == null) {
			return this.entityManager.createQuery(this.queryString);
		}
		else {
			return this.queryProvider.createQuery();
		}
	}@Override
	protected T doRead() {
		return this.iterator.hasNext() ? this.iterator.next() : null;
	}@Override
	protected void doClose() {
		if (this.entityManager != null) {
			this.entityManager.close();
		}
	}클래스 다이어그램은 그리는 법은 아래 블로그에서 참조했다.
JpaPagingItemReader 객체 구조는 아래와 같다.

@Override
protected void doOpen() throws Exception {
    Assert.state(!initialized, "Cannot open an already opened ItemReader, call close first");
    initialized = true;
}@Nullable
	@Override
	protected T doRead() throws Exception {
		synchronized (lock) {
			if (results == null || current >= pageSize) {
				if (logger.isDebugEnabled()) {
					logger.debug("Reading page " + getPage());
				}
				doReadPage();
				page++;
				if (current >= pageSize) {
					current = 0;
				}
			}
			int next = current++;
			if (next < results.size()) {
				return results.get(next);
			}
			else {
				return null;
			}
		}
	}@Override
protected void doClose() throws Exception {
    synchronized (lock) {
        initialized = false;
        current = 0;
        page = 0;
        results = null;
    }
}@Override
	protected void doOpen() throws Exception {
		super.doOpen();
		entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
		if (entityManager == null) {
			throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
		}
		// set entityManager to queryProvider, so it participates
		// in JpaPagingItemReader's managed transaction
		if (queryProvider != null) {
			queryProvider.setEntityManager(entityManager);
		}
	}@Override
@SuppressWarnings("unchecked")
protected void doReadPage() {
    EntityTransaction tx = null;
    if (transacted) {
        tx = entityManager.getTransaction();
        tx.begin();
        entityManager.flush();
        entityManager.clear();
    }
    Query query = createQuery().setFirstResult(getPage() * getPageSize()).setMaxResults(getPageSize());
    if (parameterValues != null) {
        for (Map.Entry<String, Object> me : parameterValues.entrySet()) {
            query.setParameter(me.getKey(), me.getValue());
        }
    }
    if (results == null) {
        results = new CopyOnWriteArrayList<>();
    } else {
        results.clear();
    }
    if (!transacted) {
        List<T> queryResult = query.getResultList();
        for (T entity : queryResult) {
            entityManager.detach(entity);
            results.add(entity);
        }
    } else {
        results.addAll(query.getResultList());
        tx.commit();
    }
}volatile : volatile이 붙은 변수는 메인 메모리에 직접 저장되고 읽히게 된다. 즉, 스레드가 변수의 값을 캐시하지 않고 항상 메인 메모리에서 읽고 쓰도록 강제하여 변수의 값을 스레드 간에 일관되게 유지한다.
public class JpaCursorItemReaderExample {
    public JpaCursorItemReader<Person> createReader(EntityManagerFactory entityManagerFactory) {
        Map<String, Object> params = new HashMap<>();
        params.put("status", "ACTIVE");
        return new JpaCursorItemReaderBuilder<Person>()
            .name("personReader")
            .entityManagerFactory(entityManagerFactory)
            .queryString("SELECT p FROM Person p WHERE p.status = :status")
            .parameterValues(params)
            .saveState(true)
            .currentItemCount(0)
            .maxItemCount(100)
            .build();
    }
}@Configuration
public class BatchConfiguration {
    @Bean
    public CustomJpaQueryProvider customJpaQueryProvider() {
        CustomJpaQueryProvider provider = new CustomJpaQueryProvider();
        provider.setBaseQuery("SELECT e FROM Employee e");
        return provider;
    }
    @Bean
    public JpaPagingItemReader<Employee> jpaPagingItemReader(EntityManagerFactory entityManagerFactory,
                                                            CustomJpaQueryProvider customJpaQueryProvider) {
        Map<String, Object> params = new HashMap<>();
        params.put("status", "ACTIVE");
        return new JpaPagingItemReaderBuilder<Employee>()
            .name("employeeReader")
            .entityManagerFactory(entityManagerFactory)
            .queryProvider(customJpaQueryProvider)
            .parameterValues(params)
            .pageSize(10)
            .build();
    }
}