[Spring-batch] Jpa ItemReader

오늘내일·2024년 6월 27일
0

보통 JPA 기반으로 프로젝트를 많이 진행하고 있기 때문에 batch에서 JPA 기반 ItemReader를 살펴보자. spring-batch에서 제공해주는 구현체를 이해하고 사용하면, 언젠가 좀 더 내 상황에 맞게 customizing 하기 편리할 거 같기 때문이다.

spring-batch에서 제공해주는 JPA 기반 ItemReader 구현체는 JpaPagingItemReader와 JpaCursorItemReader가 있다. 먼저 객체 이름에서 Paging과 Cursor가 눈에 띈다. 두 방식의 차이는 아래와 같다.

  • Cursor 방식 : DB와 커넥션을 맺은 후, Cursor를 한 칸씩 옮기면서 데이터를 읽어온다. 즉, 한번에 1Row씩 읽어오는 방식이다.
  • Paging 방식 : 한번에 개발자가 지정한 PageSize만큼 데이터를 읽어온다. 예를 들어 PageSize를 10으로 설정한다면 한번에 10Row씩 읽어오는 방식이다.

위 내용은 아래 블로그를 참고하였다.

참고 : https://jojoldu.tistory.com/336

JpaCursorItemReader

JpaCursorItemReader 객체 구조는 아래와 같다.

제일 위에서부터 살펴보자.

  • @FunctionalInterface : 람다식을 사용 가능하게 하는 어노테이션이다.
  • ItemReader(인터페이스) : read 메서드가 있는 것으로 보아 직접 데이터를 읽어오는 작업과 관련한 인터페이스이다.
  • ItemStream (인터페이스) : batch 작업 실행상태에 관련한 인터페이스이다.
  • ItemStreamSupport(추상클래스) :
    • ItemStream 인터페이스의 구현체로 executionContext(Job이나 Step의 실행 상태를 저장하고 복구하는데 사용)를 지원하는 ExecutionContextUserSupport 객체의 이름을 정하고 가져오는 메서드를 구현했다.(open, update, close 메서드는 deprecated된 상태다)
  • AbstractItemStreamItemReader : 별다른 기능이 없는지 필드, 메서드가 전무하다.
  • AbstractItemCountingItemStreamItemReader :
    • read() 메서드를 통해 아이템을 읽을 때마다 읽은 아이템의 개수를 추적한다.
    • open() 메서드를 통해 스트림을 열고 리소스를 초기화한다. 초기화 시 executionContext로부터 현재까지 읽은 아이템의 개수를 호출하여 복구할 수 있다.
    • close()메서드를 통해 스트림을 닫는 기능을 제공한다.
    • update() 메서드를 통해 현재 읽은 아이템의 개수를 executionContext에 저장한다.
    • 아이템의 개수를 기반으로 스트림의 상태를 관리하여, 배치 잡의 중단과 재시작을 지원한다.
    • 추상 메서드
      • doRead(), doOpen(), doClose() 를 통해 서브클래스에서 구체적인 읽기, 열기, 닫기 로직을 구현할 수 있다.
  • JpaCursorItemReader :
    • JpaCursorItemReader에서는 위 AbstractItemCountingItemStreamItemReader의 추상메서드인 doOpen(), doRead(), doClose()를 구현하고 있다.
    • doOpen() 메서드
      @Override
      	@SuppressWarnings("unchecked")
      	protected void doOpen() throws Exception {
      		this.entityManager = this.entityManagerFactory.createEntityManager();
      		if (this.entityManager == null) {
      			throw new DataAccessResourceFailureException("Unable to create an EntityManager");
      		}
      		if (this.queryProvider != null) {
      			this.queryProvider.setEntityManager(this.entityManager);
      		}
      		Query query = createQuery();
      		if (this.parameterValues != null) {
      			this.parameterValues.forEach(query::setParameter);
      		}
      		this.iterator = query.getResultStream().iterator();
      	}
      
      	private Query createQuery() {
      		if (this.queryProvider == null) {
      			return this.entityManager.createQuery(this.queryString);
      		}
      		else {
      			return this.queryProvider.createQuery();
      		}
      	}
      • entityManager를 설정
      • queryProvider 또는 queryString을 통해서 query 생성
      • query에 파라미터 설정
      • query실행결과를 Iterator로 반환
    • doRead() 메서드
      @Override
      	protected T doRead() {
      		return this.iterator.hasNext() ? this.iterator.next() : null;
      	}
      • Iterator를 하나씩 순회하며 데이터를 읽는다.
    • doClose() 메서드
      @Override
      	protected void doClose() {
      		if (this.entityManager != null) {
      			this.entityManager.close();
      		}
      	}
      • entityManager를 닫아준다.

클래스 다이어그램은 그리는 법은 아래 블로그에서 참조했다.

JpaPagingItemReader

JpaPagingItemReader 객체 구조는 아래와 같다.

  • 객체 구조를 살펴보면 AbstractItemCountingItemStreamItemReader 객체까지는 JpaCursorItemReader객체와 구조가 동일하다. 따라서 달라지는 AbstractPagingItemReader 객체부터 살펴보자.
  • AbstractPagingItemReader : 주요 메서드 및 필드를 살펴보자.
    • initialized(초기화 플래그), pageSize(한 번에 읽어올 데이터의 개수), current(현재 페이지 내에서의 현재 위치를 나타내는 인덱스), page(페이지 번호), results(현재 페이지의 읽어온 List), lock(동기화를 위한 객체) 필드는 전부 페이지로 데이터를 읽어올 때 필요한 필드들로 volatile(변수의 값을 여러 스레드 간에 일관되게 유지하기 위해 사용되는 키워드) 로 관리된다.
    • doOpen()
      @Override
      protected void doOpen() throws Exception {
      
          Assert.state(!initialized, "Cannot open an already opened ItemReader, call close first");
          initialized = true;
      
      }
      • reader를 열 때 초기화 상태를 체크하고 초기화 플래그를 true로 설정한다.
    • doRead()
      @Nullable
      	@Override
      	protected T doRead() throws Exception {
      
      		synchronized (lock) {
      
      			if (results == null || current >= pageSize) {
      
      				if (logger.isDebugEnabled()) {
      					logger.debug("Reading page " + getPage());
      				}
      
      				doReadPage();
      				page++;
      				if (current >= pageSize) {
      					current = 0;
      				}
      
      			}
      
      			int next = current++;
      			if (next < results.size()) {
      				return results.get(next);
      			}
      			else {
      				return null;
      			}
      
      		}
      
      	}
      • 페이지 관리를 위해 동기화 블럭 안에서 page, current 등을 관리한다. 실제로 데이터를 읽어오는 메서드는 추상 메서드 doReadPage()로 서브 객체에서 구현하도록 되어있다.
    • doClose()
      @Override
      protected void doClose() throws Exception {
      
          synchronized (lock) {
              initialized = false;
              current = 0;
              page = 0;
              results = null;
          }
      
      }
      • 초기화 플래그를 해제하고, 현재 위치와 페이지를 초기화하며, 결과 리스트를 null로 설정한다.
  • JpaPagingItemReader
    • doOpen()
      @Override
      	protected void doOpen() throws Exception {
      		super.doOpen();
      
      		entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
      		if (entityManager == null) {
      			throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
      		}
      		// set entityManager to queryProvider, so it participates
      		// in JpaPagingItemReader's managed transaction
      		if (queryProvider != null) {
      			queryProvider.setEntityManager(entityManager);
      		}
      
      	}
      • 부모 객체인 AbstractPagingItemReader에서 구현한 doOpen() 메서드에 추가적으로 entityManager와 queryProvider를 설정한다.
    • doReadPage()
      @Override
      @SuppressWarnings("unchecked")
      protected void doReadPage() {
      
          EntityTransaction tx = null;
      
          if (transacted) {
              tx = entityManager.getTransaction();
              tx.begin();
      
              entityManager.flush();
              entityManager.clear();
          }
      
          Query query = createQuery().setFirstResult(getPage() * getPageSize()).setMaxResults(getPageSize());
      
          if (parameterValues != null) {
              for (Map.Entry<String, Object> me : parameterValues.entrySet()) {
                  query.setParameter(me.getKey(), me.getValue());
              }
          }
      
          if (results == null) {
              results = new CopyOnWriteArrayList<>();
          } else {
              results.clear();
          }
      
          if (!transacted) {
              List<T> queryResult = query.getResultList();
              for (T entity : queryResult) {
                  entityManager.detach(entity);
                  results.add(entity);
              }
          } else {
              results.addAll(query.getResultList());
              tx.commit();
          }
      }
      • AbstractPagingItemReader에서 구현하지 않았던 doReadPage() 메서드를 JpaPagingItemReader에서 구현하고 있다.
      • 트랜잭션 관리를 위해 트랜잭션을 사용할 지 여부의 정보를 담고있는 flag를 transacted를 사용한다.
      • 트랜잭션을 사용한다면 entityManager의 flush() 메서드를 통해 변경 사항을 DB에 저장하고, clear() 메서드를 통해 영속성 컨텍스를 비움으로 모든 엔티티들을 분리 상태(DB와 동기화 되지 않도록)로 만든다.
      • page와 pageSize를 정보를 가지고 있는 query 생성
      • query에 파라미터 설정
      • query의 결과를 받을 results 변수가 null 이면 CopyOnWriteArrayList(읽기와 쓰기를 동시에 안전하게 수행할 수 있도록 설계된 리스트)로 초기화하고, null 이 아니면 clear()를 통해 비운다.
      • 트랜잭션을 사용하지 않으면 query의 실행결과를 entityManager를 통해 전부 detach상태로 만들고 results에 entity를 저장한다.
      • 트랜잭션을 사용하면 results에 query 실행결과를 바로 저장하고 트랜잭션을 commit한다.

volatile : volatile이 붙은 변수는 메인 메모리에 직접 저장되고 읽히게 된다. 즉, 스레드가 변수의 값을 캐시하지 않고 항상 메인 메모리에서 읽고 쓰도록 강제하여 변수의 값을 스레드 간에 일관되게 유지한다.

JpaCursorItemReaderBuilder

  • JpaCursorItemReader 객체는 아래와 같이 JpaCursorItemReaderBuilder 객체를 사용하여 필요한 값을 설정하여 객체를 생성할 수 있다. JpaCursorItemReaderBuilder의 주요 메서드를 살펴보자.
    public class JpaCursorItemReaderExample {
        public JpaCursorItemReader<Person> createReader(EntityManagerFactory entityManagerFactory) {
            Map<String, Object> params = new HashMap<>();
            params.put("status", "ACTIVE");
    
            return new JpaCursorItemReaderBuilder<Person>()
                .name("personReader")
                .entityManagerFactory(entityManagerFactory)
                .queryString("SELECT p FROM Person p WHERE p.status = :status")
                .parameterValues(params)
                .saveState(true)
                .currentItemCount(0)
                .maxItemCount(100)
                .build();
        }
    }
    • name() : reader의 이름을 설정
    • entityManagerFactory() : entityManagerFactory 설정
    • queryString() : 실행할 query 문 설정
    • parameterValues() : query 파라미터 설정(Map을 사용)
    • saveState() : reader의 상태를 저장할지 여부를 설정(기본값 : true)
    • currentItemCount() : reader가 시작할 아이템의 인덱스를 설정(기본값 : 0)
    • maxItemCount() : 읽어올 최대 아이템 수를 설정 (기본값 : Integer.MAX_VALUE)
    • queryProvider() : 동적으로 쿼리를 생성하기 위한 queryProvider 설정

JpaPagingItemReaderBuilder

  • JpaPagingtemReader 객체 또한 아래와 같이 JpaPagingItemReaderBuilder 객체를 사용하여 필요한 값을 설정하여 객체를 생성할 수 있다. JpaPagingItemReaderBuilder의 주요 메서드는 JpaCursorItemReaderBuilder의 메서드와 동일한 부분이 많다. JpaPagingItemReaderBuilder만 가지고 있는 메서드 위주로 살펴보자.
    @Configuration
    public class BatchConfiguration {
    
        @Bean
        public CustomJpaQueryProvider customJpaQueryProvider() {
            CustomJpaQueryProvider provider = new CustomJpaQueryProvider();
            provider.setBaseQuery("SELECT e FROM Employee e");
            return provider;
        }
    
        @Bean
        public JpaPagingItemReader<Employee> jpaPagingItemReader(EntityManagerFactory entityManagerFactory,
                                                                CustomJpaQueryProvider customJpaQueryProvider) {
            Map<String, Object> params = new HashMap<>();
            params.put("status", "ACTIVE");
    
            return new JpaPagingItemReaderBuilder<Employee>()
                .name("employeeReader")
                .entityManagerFactory(entityManagerFactory)
                .queryProvider(customJpaQueryProvider)
                .parameterValues(params)
                .pageSize(10)
                .build();
        }
    }
    • pageSize() : 한 페이지에 읽어올 데이터의 개수(기본값 : 10)
    • transacted() : 트랜잭션 사용 여부(기본값 : true)
profile
다시 시작합니다.

0개의 댓글