Item Writer

문지은·2022년 2월 10일
0

스프링 데이터의 ItemWriter

몽고 DB

스프링 배치에서는 MongoDBItemWriter를 제공하여 몽고 DB 컬렉션에 객체를 문서로 저장한다.
순서는 다음과 같다.

먼저 순수 Customer 객체를 생성한다.
그리고 몽고 DB를 사용하기 위해 spring-boot-starter-data-mongodb 의존성을 추가한다.
몽고 DB를 가리키도록 application.yml에서 spring.data.mongodb.database 프로퍼티를 수정한다.
매핑은 도메인 객체의 어노테이션을 이용해 처리한다.
그리고 mongoDB의 Job 코드를 보자.

@Configuration
public class MongoImportJob {

	private JobBuilderFactory jobBuilderFactory;

	private StepBuilderFactory stepBuilderFactory;

	public MongoImportJob(JobBuilderFactory jobBuilderFactory,
			StepBuilderFactory stepBuilderFactory) {

		this.jobBuilderFactory = jobBuilderFactory;
		this.stepBuilderFactory = stepBuilderFactory;
	}

	@Bean
	@StepScope
	public FlatFileItemReader<Customer> customerFileReader(
			@Value("#{jobParameters['customerFile']}")Resource inputFile) {
			@Value("#{jobParameters['customerFile']}")Resource inputFile) {

		return new FlatFileItemReaderBuilder<Customer>()
				.name("customerFileReader")
				.resource(inputFile)
				.delimited()
				.names(new String[] {"firstName",
						"middleInitial",
						"lastName",
						"address",
						"city",
						"state",
						"zip"})
				.targetType(Customer.class)
				.build();
	}

	@Bean
	public MongoItemWriter<Customer> mongoItemWriter(MongoOperations mongoTemplate) {
		return new MongoItemWriterBuilder<Customer>()
				.collection("customers")	// 데이터 베이스의 컬렉션 이름 구성
				.template(mongoTemplate)	// MongoOperations 인스턴스 제공
				.build();
                // deleteFlag를 사용하여 매칭되는 아이템을 삭제할지 저장할지 정할 수 있다.
	}

	@Bean
	public Step mongoFormatStep() throws Exception {
		return this.stepBuilderFactory.get("mongoFormatStep")
				.<Customer, Customer>chunk(10)
				.reader(customerFileReader(null))
				.writer(mongoItemWriter(null))
				.build();
	}

	@Bean
	public Job mongoFormatJob() throws Exception {
		return this.jobBuilderFactory.get("mongoFormatJob")
				.start(mongoFormatStep())
				.build();
	}
}

이 때 몽고 DB에 대한 논쟁이 있다. 몽고 DB가 ACID트랜잭션을 지원하지 않는 것이다. 이 때문에 스프링 배치는 트랜잭션을 지원하지 않는 다른 데이터 저장소처럼 몽고 DB를 취급해 커밋이 발생하기 직전까지 쓰기를 버퍼링하고 가장 마지막 순간에 실제 쓰기 작업을 수행한다.

네오4j

네오4j는 그래프 데이터베이스로 몽고 DB와 완전히 다른 NoSQL 저장소이다. 이를 사용하기 위해서 스프링 배치는 Neo4jItemWriter를 제공한다.

먼저 Customer 클래스에 @NodeEntity를 붙여서 해당 클래스가 그래프의 노드를 나타냄을 ItemWriter에게 알린다.
@Relationship 을 이용하면 노드 간의 관계를 매핑할 수도 있다.
네오4j는 id로 UUID를 사용한다.

@Id
@GeneratedValue(strategy = UuidStrategy.class)
private UUID id;

그 다음엔 spring-boot-start-data-neo4j 의존성을 추가하고 application.yml에 데이터베이스 정보를 추가한다.

@Configuration
public class Neo4jImportJob {

	private JobBuilderFactory jobBuilderFactory;

	private StepBuilderFactory stepBuilderFactory;

	public Neo4jImportJob(JobBuilderFactory jobBuilderFactory,
			StepBuilderFactory stepBuilderFactory) {

		this.jobBuilderFactory = jobBuilderFactory;
		this.stepBuilderFactory = stepBuilderFactory;
	}

	@Bean
	@StepScope
	public FlatFileItemReader<Customer> customerFileReader(
			@Value("#{jobParameters['customerFile']}")Resource inputFile) {

		return new FlatFileItemReaderBuilder<Customer>()
				.name("customerFileReader")
				.resource(inputFile)
				.delimited()
				.names(new String[] {"firstName",
						"middleInitial",
						"lastName",
						"address",
						"city",
						"state",
						"zip"})
				.targetType(Customer.class)
				.build();
	}

	@Bean
	public Neo4jItemWriter<Customer> neo4jItemWriter(SessionFactory sessionFactory) {
		return new Neo4jItemWriterBuilder<Customer>()
				.sessionFactory(sessionFactory)	// 세션 팩토리만 제공하면 된다.
				.build();
	}

	@Bean
	public Step neo4jFormatStep() throws Exception {
		return this.stepBuilderFactory.get("neo4jFormatStep")
				.<Customer, Customer>chunk(10)
				.reader(customerFileReader(null))
				.writer(neo4jItemWriter(null))
				.build();
	}

	@Bean
	public Job neo4jFormatJob() throws Exception {
		return this.jobBuilderFactory.get("neo4jFormatJob")
				.start(neo4jFormatStep())
				.build();
	}
}

피보탈 젬파이어와 아파치 지오드

피보탈 젬파이어는 인메모리 데이터 그리드로 키-값 저장소이며 HashMap이다. 그리고 아파치 지오드는 피보탈 젬파이어의 오픈소스 버전이다. 또한 캐시를 사용하여 검색 속도를 높인다.

@Region(value = "Customers")를 Customer 클래스에 추가해서 값이 저장될 리전임을 알린다. 그리고 마찬가지로 데이터 젬파이어와 스프링 셸 의존성을 추가한다. 이 때 젬파이어 로깅과의 충돌을 막기 위해서 스프링의 로깅 의존성을 제외한다.

@Configuration
// 애플리케이션 내에서 피보탈 젬파이어 서비스를 부트스트랩하기 위해
@PeerCacheApplication(name = "AccessingDataGemFireApplication", logLevel = "info")
public class GemfireImportJob {

	private JobBuilderFactory jobBuilderFactory;

	private StepBuilderFactory stepBuilderFactory;

	public GemfireImportJob(JobBuilderFactory jobBuilderFactory,
			StepBuilderFactory stepBuilderFactory) {

		this.jobBuilderFactory = jobBuilderFactory;
		this.stepBuilderFactory = stepBuilderFactory;
	}

	@Bean
	@StepScope
	public FlatFileItemReader<Customer> customerFileReader(
			@Value("#{jobParameters['customerFile']}")Resource inputFile) {

		return new FlatFileItemReaderBuilder<Customer>()
				.name("customerFileReader")
				.resource(inputFile)
				.delimited()
				.names(new String[] {"firstName",
						"middleInitial",
						"lastName",
						"address",
						"city",
						"state",
						"zip"})
				.targetType(Customer.class)
				.build();
	}

	@Bean
	public GemfireItemWriter<Long, Customer> gemfireItemWriter(GemfireTemplate gemfireTemplate) {
		return new GemfireItemWriterBuilder<Long, Customer>()
				.template(gemfireTemplate)	// 작성한 템플릿 넘겨줌
                // 피보탈 젬파이어에 저장될 아이템을 사용할 키로 변환
				.itemKeyMapper(new SpELItemKeyMapper<>(
						"firstName + middleInitial + lastName"))
				.build();
	}

	@Bean
	public Step gemfireFormatStep() throws Exception {
		return this.stepBuilderFactory.get("gemfireFormatStep")
				.<Customer, Customer>chunk(10)
				.reader(customerFileReader(null))
				.writer(gemfireItemWriter(null))
				.build();
	}

	@Bean
	public Job gemfireFormatJob() throws Exception {
		return this.jobBuilderFactory.get("gemfireFormatJob")
				.start(gemfireFormatStep())
				.build();
	}

	@Bean(name="customer")
	public Region<Long, Customer> getCustomer(final GemFireCache cache) throws Exception {
		LocalRegionFactoryBean<Long, Customer> customerRegion = new LocalRegionFactoryBean<>();
		customerRegion.setCache(cache);
		customerRegion.setName("customer");
		customerRegion.afterPropertiesSet();
		Region<Long, Customer> object = customerRegion.getRegion();
		return object;
	}

	// itemWriter에서 사용할 젬파이어 템플릿 생성
	@Bean
	public GemfireTemplate gemfireTemplate() throws Exception {
		return new GemfireTemplate(getCustomer(null));
	}


	// 결과를 확인할 GUI가 없기 때문에 커맨드 러너로 성공 여부 검증
	@Bean
	public CommandLineRunner validator(final GemfireTemplate gemfireTemplate) {
		return args -> {
			List<Object> customers = gemfireTemplate.find("select * from /customer").asList();

			for (Object customer : customers) {
				System.out.println(">> object: " + customer);
			}
		};
	}

}

리포지터리

데이터를 읽을 때는 PagingAndSortingRepository를 사용했지만 쓰기에는 페이징이나 정렬에 관해 걱정할 필요가 없으므로 CrudRepository를 사용한다.

@Entity, @Table 을 Customer 클래스에 추가한다.
그리고 리포지터리를 사용하기 위해서 CustomerRepository를 확장하는 인터페이스를 만든다.

@Configuration
// 사용할 도메인 객체
@EnableJpaRepositories(basePackageClasses = Customer.class)
public class RepositoryImportJob {

	private JobBuilderFactory jobBuilderFactory;

	private StepBuilderFactory stepBuilderFactory;

	public RepositoryImportJob(JobBuilderFactory jobBuilderFactory,
			StepBuilderFactory stepBuilderFactory) {

		this.jobBuilderFactory = jobBuilderFactory;
		this.stepBuilderFactory = stepBuilderFactory;
	}

	@Bean
	@StepScope
	public FlatFileItemReader<Customer> customerFileReader(
			@Value("#{jobParameters['customerFile']}")Resource inputFile) {

		return new FlatFileItemReaderBuilder<Customer>()
				.name("customerFileReader")
				.resource(inputFile)
				.delimited()
				.names(new String[] {"firstName",
						"middleInitial",
						"lastName",
						"address",
						"city",
						"state",
						"zip"})
				.targetType(Customer.class)
				.build();
	}

	@Bean
	public RepositoryItemWriter<Customer> repositoryItemWriter(CustomerRepository repository) {
		return new RepositoryItemWriterBuilder<Customer>()
				.repository(repository)		// 사용할 레포지토리
				.methodName("save")		// 사용할 함수
				.build();
	}

	@Bean
	public Step repositoryFormatStep() throws Exception {
		return this.stepBuilderFactory.get("repositoryFormatStep")
				.<Customer, Customer>chunk(10)
				.reader(customerFileReader(null))
				.writer(repositoryItemWriter(null))
				.build();
	}

	@Bean
	public Job repositoryFormatJob() throws Exception {
		return this.jobBuilderFactory.get("repositoryFormatJob")
				.start(repositoryFormatStep())
				.build();
	}
}

그밖의 출력 방식을 위한 ItemWriter

ItemWriterAdapter

ItemWriterAdapter를 사용해서 기존 서비스를 잡의 입력 소스로 사용할 수 있다.

	@Bean
	public ItemWriterAdapter<Customer> itemWriter(CustomerService customerService) {
		ItemWriterAdapter<Customer> customerItemWriterAdapter = new ItemWriterAdapter<>();

		customerItemWriterAdapter.setTargetObject(customerService);	// 사용할 클래스
		customerItemWriterAdapter.setTargetMethod("logCustomer");	// 사용할 메서드

		return customerItemWriterAdapter;
	}

PropertyExtractingDelegatingItemWriter

ItemWriterAdapter는 스프링 배치에서 읽은 아이템을 가져와서 기존 서비스에 전달하기만 하면 된다. 하지만 이렇게 간단한 경우는 드물다.
그렇기에 스프링 배치는 아이템에서 값을 추출한 후 이를 파라미터로 제공하는 메커니즘을 제공한다.

서비스 코드

//  이 파라미터만 쓰고 싶을 때!
public void logCustomerAddress(String address,
			String city,
			String state,
			String zip) {
		System.out.println(
				String.format("I just saved the address:\n%s\n%s, %s\n%s",
						address,
						city,
						state,
						zip));
	}

job 코드

	@Bean
	public PropertyExtractingDelegatingItemWriter<Customer> itemWriter(CustomerService customerService) {
		PropertyExtractingDelegatingItemWriter<Customer> itemWriter =
				new PropertyExtractingDelegatingItemWriter<>();

		itemWriter.setTargetObject(customerService);
		itemWriter.setTargetMethod("logCustomerAddress");
        // 필요한 필드만 추출
        // 파라미터와 동일한 순서
        // ItemWriter가 동적으로 아규먼트를 추출하기 때문에 arguments property 쓸 수 없다
		itemWriter.setFieldsUsedAsTargetMethodArguments(
				new String[] {"address", "city", "state", "zip"});

		return itemWriter;
	}

JsmItemWriter

JMS는 엔드 포인트 간의 통신하는 메세지 지향 방식이다. JmsItemWriter를 사용해서 JMS 큐에 메세지를 넣을 수 있다.
이 때 JMS 브로커를 사용해야 한다. 예제에서는 아파치의 액티브 MQ(간단한 인메모리 브로커)를 사용한다.

JMS 리소스 코드

	@Bean // Serialize message content to json using TextMessage
	public MessageConverter jacksonJmsMessageConverter() {
		MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
		converter.setTargetType(MessageType.TEXT);
		converter.setTypeIdPropertyName("_type");
		return converter;
	}

	// 스프링 부트가 JmsTemplate을 자동으로 구성해 제공하지만 예제에서는 CachingConnectionFactory를 사용하지 않으므로 직접 구성
	@Bean
	public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
		CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);
		cachingConnectionFactory.afterPropertiesSet();

		JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
		jmsTemplate.setDefaultDestinationName("customers");
		jmsTemplate.setReceiveTimeout(5000L);

		return jmsTemplate;
	}

job 코드

	@Bean
	@StepScope
	public StaxEventItemWriter<Customer> xmlOutputWriter(
			@Value("#{jobParameters['outputFile']}") Resource outputFile) {
		
		Map<String, Class> aliases = new HashMap<>();
		aliases.put("customer", Customer.class);

		XStreamMarshaller marshaller = new XStreamMarshaller();
		marshaller.setAliases(aliases);

		return new StaxEventItemWriterBuilder<Customer>()
				.name("xmlOutputWriter")
				.resource(outputFile)
                // write할 도메인 객체 지정
				.marshaller(marshaller)
				.rootTagName("customers")
				.build();
	}

SimpleMailMessageItemWriter

스프링 배치를 이용해서 이메일을 보낼 수도 있다. SimpleMessageItemReader를 사용하는데 이 때 받아들이는 객체는 SimpleMailMessage를 확장한 객체의 목록이어야 한다. 하지만 아이템이 해당 클래스를 상속 받을 필요 없이 ItemProcessor에서 해당 객체로 변환해주면 된다.

Customer 객체에 이메일 필드를 추가한다.
그리고 자바 메일에 대한 의존성을 추가하고 메일 보내기 권한에 대한 설정을 application.yml에 추가한다.

예제에서는 csv file을 읽고 sql에 저장하기 -> sql을 읽고 메일 보내기 이렇게 두 개의 step으로 이루어져 있다.

@Configuration
public class EmailSendingJob {

	private JobBuilderFactory jobBuilderFactory;

	private StepBuilderFactory stepBuilderFactory;

	public EmailSendingJob(JobBuilderFactory jobBuilderFactory,
			StepBuilderFactory stepBuilderFactory) {

		this.jobBuilderFactory = jobBuilderFactory;
		this.stepBuilderFactory = stepBuilderFactory;
	}

	@Bean
	@StepScope
	public FlatFileItemReader<Customer> customerEmailFileReader(
			@Value("#{jobParameters['customerFile']}")Resource inputFile) {

		return new FlatFileItemReaderBuilder<Customer>()
				.name("customerFileReader")
				.resource(inputFile)
				.delimited()
				.names(new String[] {"firstName",
						"middleInitial",
						"lastName",
						"address",
						"city",
						"state",
						"zip",
						"email"})
				.targetType(Customer.class)
				.build();
	}

	@Bean
	public JdbcBatchItemWriter<Customer> customerBatchWriter(DataSource dataSource) {

		return new JdbcBatchItemWriterBuilder<Customer>()
				.namedParametersJdbcTemplate(new NamedParameterJdbcTemplate(dataSource))
				.sql("INSERT INTO CUSTOMER (first_name, middle_initial, last_name, address, city, state, zip, email) " +
						"VALUES(:firstName, :middleInitial, :lastName, :address, :city, :state, :zip, :email)")
				.beanMapped()
				.build();
	}

	@Bean
	public JdbcCursorItemReader<Customer> customerCursorItemReader(DataSource dataSource) {

		return new JdbcCursorItemReaderBuilder<Customer>()
				.name("customerItemReader")
				.dataSource(dataSource)
				.sql("select * from customer")
                // 커스텀 rowMapper를 사용하지 않고 데이터와 칼럼 이름으로 매핑
				.rowMapper(new BeanPropertyRowMapper<>(Customer.class))
				.build();
	}

	@Bean
	public SimpleMailMessageItemWriter emailItemWriter(MailSender mailSender) {

		return new SimpleMailMessageItemWriterBuilder()
				// 스프링 부트가 제공하는 mailSender 의존성
				.mailSender(mailSender)
				.build();
	}

	@Bean
	public Step importStep() throws Exception {
		return this.stepBuilderFactory.get("importStep")
				.<Customer, Customer>chunk(10)
				.reader(customerEmailFileReader(null))
				.writer(customerBatchWriter(null))
				.build();
	}

	@Bean
	public Step emailStep() throws Exception {
		return this.stepBuilderFactory.get("emailStep")
				.<Customer, SimpleMailMessage>chunk(10)
				.reader(customerCursorItemReader(null))
				// SimpleMailMessage로 변환
				.processor((ItemProcessor<Customer, SimpleMailMessage>) customer -> {
					SimpleMailMessage mail = new SimpleMailMessage();

					mail.setFrom("prospringbatch@gmail.com");
					mail.setTo(customer.getEmail());
					mail.setSubject("Welcome!");
					mail.setText(String.format("Welcome %s %s,\nYou were imported into the system using Spring Batch!",
							customer.getFirstName(), customer.getLastName()));

					return mail;
				})
				.writer(emailItemWriter(null))
				.build();
	}

	@Bean
	public Job emailJob() throws Exception {
		return this.jobBuilderFactory.get("emailJob")
				.start(importStep())
				.next(emailStep())
				.build();
	}
}

여러 자원을 사용하는 ItemWriter

만약 100만개의 데이터를 csv 파일과 xml 파일로 write하고 싶다면 어떻게 해야 할까? 100만개 처리를 두번 반복한다면 리소스 부하로 너무 오랜 시간이 걸리고 아이템을 처리할 때마다 각 파일에 기록하는 커스텀 아이템 라이터를 구현한다면 직접 개발하는 공수가 너무 많이 든다.
이 때 MultiResourceItemWriter를 사용할 수 있다.

MultiResourceItemWriter

MultiResourceItemWriter는 처리한 레코드 수에 따라서 동적으로 출력 리소스를 만든다. 해당 라이터는 쓰기 작업을 각각의 라이터에 위임하며 아이템 수를 관리하여 필요시 리소스를 생성하는 역할을 한다.

write함수가 호출되면 열려있는 리소스가 있는지
확인해서 각각의 라이터에게 전달하고 파일을 읽다가 임계값에 도달하면 현재 파일을 닫는다. 이 때 청크 중간에 리소스를 생성하지 않는다.

MultiResourceJob

@Configuration
public class MultiResourceJob {

	private JobBuilderFactory jobBuilderFactory;

	private StepBuilderFactory stepBuilderFactory;

	public MultiResourceJob(JobBuilderFactory jobBuilderFactory,
			StepBuilderFactory stepBuilderFactory) {

		this.jobBuilderFactory = jobBuilderFactory;
		this.stepBuilderFactory = stepBuilderFactory;
	}

	@Bean
	public JdbcCursorItemReader<Customer> customerJdbcCursorItemReader(DataSource dataSource) {

		return new JdbcCursorItemReaderBuilder<Customer>()
				.name("customerItemReader")
				.dataSource(dataSource)
				.sql("select * from customer")
				.rowMapper(new BeanPropertyRowMapper<>(Customer.class))
				.build();
	}

	@Bean
	public MultiResourceItemWriter<Customer> multiCustomerFileWriter(CustomerOutputFileSuffixCreator suffixCreator) throws Exception {

		return new MultiResourceItemWriterBuilder<Customer>()
				.name("multiCustomerFileWriter")
				.delegate(delegateItemWriter(null)) // 위임 writer
				.itemCountLimitPerResource(25) // 각 리소스에 쓰기 작업을 진행할 아이템 수
				.resource(new FileSystemResource("Chapter09/target/customer")) // 생성할 리소스의 프로토타입
				.resourceSuffixCreator(suffixCreator) // 생성하는 파일 뒤에 접미사 붙임
				// ResourxeSuffixCreator를 상속받아 커스텀할 수 있다
				.build();
	}

	@Bean
	@StepScope
	public StaxEventItemWriter<Customer> delegateItemWriter(CustomerXmlHeaderCallback headerCallback) throws Exception {

		Map<String, Class> aliases = new HashMap<>();
		aliases.put("customer", Customer.class);

		XStreamMarshaller marshaller = new XStreamMarshaller();

		marshaller.setAliases(aliases);

		marshaller.afterPropertiesSet();

		return new StaxEventItemWriterBuilder<Customer>()
				.name("customerItemWriter")
				.marshaller(marshaller)
				.rootTagName("customers")
				.headerCallback(headerCallback)
				.build();
	}

	@Bean
	public Step multiXmlGeneratorStep() throws Exception {
		return this.stepBuilderFactory.get("multiXmlGeneratorStep")
				.<Customer, Customer>chunk(10)
				.reader(customerJdbcCursorItemReader(null))
				.writer(multiCustomerFileWriter(null))
				.build();
	}

	@Bean
	public Job xmlGeneratorJob() throws Exception {
		return this.jobBuilderFactory.get("xmlGeneratorJob")
				.start(multiXmlGeneratorStep())
				.build();
	}
}

여기서 궁금한 점
MultiResourceItemWriter는 write 작업을 위임하게 해줌 -> 일반 writer와의 차이점은 위임밖에 없는 것? 그렇다면 왜 굳이 위임해서 작업할까?

MultiResourceHeaderFooterJob

스텝이나 잡이 생성하는 파일은 파일 내에 헤더나 푸터를 생성할 수 있어야 한다.
헤더를 이용해 플랫 파일의 형식을 정의하거나 아이템과 관련 없는 별도의 섹션을 XML 파일에 포함시킬 수 있다.
푸터에는 처리된 레코드 수나 파일이 처리된 후 무결성 검사에 사용할 총계가 포함될 수 있다.

플랫 파일에서 헤더와 푸터 : 상단 또는 하단에 하나 이상의 레코드 추가
XML 파일에서 헤더와 푸터 : 맨위 또는 맨 아래에 XML 세그먼트 추가
이에 따라서 스프링 배칭에서는 두가지 인터페이스를 제공한다.

XML 파일

StaxWriterCallback 인터페이스를 사용한다.

@Component
public class CustomerXmlHeaderCallback implements StaxWriterCallback {

	@Override
	public void write(XMLEventWriter writer) throws IOException {
		XMLEventFactory factory = XMLEventFactory.newInstance();

		try {
        	// 태그(섹션) 생성
			writer.add(factory.createStartElement("", "", "identification"));
			writer.add(factory.createStartElement("", "", "author"));
            
            // author 섹션 안에 name attribute 추가
			writer.add(factory.createAttribute("name", "Michael Minella"));
            
			writer.add(factory.createEndElement("", "", "author"));
			writer.add(factory.createEndElement("", "", "identification"));
            
		} catch (XMLStreamException xmlse) {
			System.err.println("An error occured: " + xmlse.getMessage());
			xmlse.printStackTrace(System.err);
		}
	}
}

해당 객체는 접두사, 네임스페이스와 함께 staxEventWriterBuilder에 전달되며 출력 형태는 다음과 같다.

<customers>
  <identification>
    	<author name="Micheal Minella"/>
  </identification>
</customers>

플랫 파일

플랫 파일은 헤더를 생성하려면 푸터를 생성할 때와 다른 인터페이스를 구현해야 한다.

  • FlatFileHeaderCallback
  • FlatFileFooterCallback

FlatFileFooterCallback을 사용하려면 에스펙트를 사용해야 한다.

@Component
@Aspect
public class CustomerRecordCountFooterCallback implements FlatFileFooterCallback {

	private int itemsWrittenInCurrentFile = 0;

	@Override
	public void writeFooter(Writer writer) throws IOException {
		writer.write("This file contains " +
				itemsWrittenInCurrentFile + " items");
	}

	// FlatFileItemWriter.write 호출 전
	@Before("execution(* org.springframework.batch.item.file.FlatFileItemWriter.write(..))")
	public void beforeWrite(JoinPoint joinPoint) {
		List<Customer> items = (List<Customer>) joinPoint.getArgs()[0];

		// 아이템 개수만큼 카운트 증가
		this.itemsWrittenInCurrentFile += items.size();
	}

	// FlatFileItemWriter.open 호출 전
	@Before("execution(* org.springframework.batch.item.file.FlatFileItemWriter.open(..))")
	public void resetCounter() {
    	// 초기화
		this.itemsWrittenInCurrentFile = 0;
	}
}

ItemWriteListener.beforeWrite를 사용하지 않고 에스펙트를 사용하지 않는 이유는 뭘까?
그 이유는 호출 순서때문이다.
FlatFileItemWriter.write를 호출하기 전에 카운터를 초기화해야 하기 때문이다.

초기화 작업은 FlatFileItemWriter.open에서 이뤄진다.

이 때 ItemWriteListener.beforeWrite 사용하면

ItemWriteListener.beforeWrite -> ItemWriter.write
MultiResourceItemWriter.wirte 메서드 내에서 FlatFileItemWriter.open 호출하므로 초기화가 되지 않는다.

@Configuration
public class MultiResourceHeaderFooterJob {

	private JobBuilderFactory jobBuilderFactory;

	private StepBuilderFactory stepBuilderFactory;

	public MultiResourceHeaderFooterJob(JobBuilderFactory jobBuilderFactory,
			StepBuilderFactory stepBuilderFactory) {

		this.jobBuilderFactory = jobBuilderFactory;
		this.stepBuilderFactory = stepBuilderFactory;
	}

	@Bean
	public JdbcCursorItemReader<Customer> multiResourceJdbcReader(DataSource dataSource) {

		return new JdbcCursorItemReaderBuilder<Customer>()
				.name("customerItemReader")
				.dataSource(dataSource)
				.sql("select * from customer")
				.rowMapper(new BeanPropertyRowMapper<>(Customer.class))
				.build();
	}

	@Bean
	public MultiResourceItemWriter<Customer> multiFlatFileItemWriter() throws Exception {

		return new MultiResourceItemWriterBuilder<Customer>()
				.name("multiFlatFileItemWriter")
				.delegate(delegateCustomerItemWriter(null))
				.itemCountLimitPerResource(25)
				.resource(new FileSystemResource("Chapter09/target/customer"))
				.build();
	}

	@Bean
	@StepScope
	public FlatFileItemWriter<Customer> delegateCustomerItemWriter(CustomerRecordCountFooterCallback footerCallback) throws Exception {
		BeanWrapperFieldExtractor<Customer> fieldExtractor = new BeanWrapperFieldExtractor<>();
		fieldExtractor.setNames(new String[] {"firstName", "lastName", "address", "city", "state", "zip"});
		fieldExtractor.afterPropertiesSet();

		FormatterLineAggregator<Customer> lineAggregator = new FormatterLineAggregator<>();

		lineAggregator.setFormat("%s %s lives at %s %s in %s, %s.");
		lineAggregator.setFieldExtractor(fieldExtractor);

		FlatFileItemWriter<Customer> itemWriter = new FlatFileItemWriter<>();

		itemWriter.setName("delegateCustomerItemWriter");
		itemWriter.setLineAggregator(lineAggregator);
		itemWriter.setAppendAllowed(true);
		itemWriter.setFooterCallback(footerCallback);

		return itemWriter;
	}

	@Bean
	public Step multiFlatFileGeneratorStep() throws Exception {
		return this.stepBuilderFactory.get("multiXmlGeneratorStep")
				.<Customer, Customer>chunk(10)
				.reader(multiResourceJdbcReader(null))
				.writer(multiFlatFileItemWriter())
				.build();
	}

	@Bean
	public Job multiFlatFileGeneratorJob() throws Exception {
		return this.jobBuilderFactory.get("multiFlatFileGeneratorJob")
				.start(multiFlatFileGeneratorStep())
				.build();
	}
}

CompositeItemWriter

위의 예시는 하나의 출력 결과를 하나의 엔드 포인트에 저장했다. 하지만 여러개의 엔드 포인트에 출력 결과를 저장해야 할 상황도 있을 것이다. 이 때 CompositeItemWriter를 사용한다.

@Configuration
public class CompositeItemWriterJob {

	private JobBuilderFactory jobBuilderFactory;

	private StepBuilderFactory stepBuilderFactory;

	public CompositeItemWriterJob(JobBuilderFactory jobBuilderFactory,
			StepBuilderFactory stepBuilderFactory) {

		this.jobBuilderFactory = jobBuilderFactory;
		this.stepBuilderFactory = stepBuilderFactory;
	}

	@Bean
	@StepScope
	public FlatFileItemReader<Customer> compositewriterItemReader(
			@Value("#{jobParameters['customerFile']}")Resource inputFile) {

		return new FlatFileItemReaderBuilder<Customer>()
				.name("compositewriterItemReader")
				.resource(inputFile)
				.delimited()
				.names(new String[] {"firstName",
						"middleInitial",
						"lastName",
						"address",
						"city",
						"state",
						"zip",
						"email"})
				.targetType(Customer.class)
				.build();
	}

	@Bean
	@StepScope
	public StaxEventItemWriter<Customer> xmlDelegateItemWriter(
			@Value("#{jobParameters['outputFile']}") Resource outputFile) throws Exception {

		Map<String, Class> aliases = new HashMap<>();
		aliases.put("customer", Customer.class);

		XStreamMarshaller marshaller = new XStreamMarshaller();

		marshaller.setAliases(aliases);

		marshaller.afterPropertiesSet();

		return new StaxEventItemWriterBuilder<Customer>()
				.name("customerItemWriter")
				.resource(outputFile)
				.marshaller(marshaller)
				.rootTagName("customers")
				.build();
	}

	@Bean
	public JdbcBatchItemWriter<Customer> jdbcDelgateItemWriter(DataSource dataSource) {

		return new JdbcBatchItemWriterBuilder<Customer>()
				.namedParametersJdbcTemplate(new NamedParameterJdbcTemplate(dataSource))
				.sql("INSERT INTO CUSTOMER (first_name, " +
						"middle_initial, " +
						"last_name, " +
						"address, " +
						"city, " +
						"state, " +
						"zip, " +
						"email) " +
						"VALUES(:firstName, " +
						":middleInitial, " +
						":lastName, " +
						":address, " +
						":city, " +
						":state, " +
						":zip, " +
						":email)")
				.beanMapped()
				.build();
	}

	@Bean
	public CompositeItemWriter<Customer> compositeItemWriter() throws Exception {
		return new CompositeItemWriterBuilder<Customer>()
        		// xml과 jdbc에 write
				.delegates(Arrays.asList(xmlDelegateItemWriter(null),
						jdbcDelgateItemWriter(null)))
				.build();
	}


	@Bean
	public Step compositeWriterStep() throws Exception {
		return this.stepBuilderFactory.get("compositeWriterStep")
				.<Customer, Customer>chunk(10)
				.reader(compositewriterItemReader(null))
				.writer(compositeItemWriter())
				.build();
	}

	@Bean
	public Job compositeWriterJob() throws Exception {
		return this.jobBuilderFactory.get("compositeWriterJob")
				.start(compositeWriterStep())
				.build();
	}
}

이 때 100개의 레코드를 읽었다면 JobRepository에서 200개의 쓰기 처리를 기록하지 않을까 생각할 수 있지만 실제로는 100개의 쓰기 처리를 기록한다. 스프링 배치는 아이템 수를 기록하지 때문이다.

ClassifierCompositeItemWriter

미리 정한 기준에 따라 아이템의 쓰기 작업을 수행할 위치를 선택할 수도 있다.
ClassifierCompositeItemWriter는 서로 다른 유형의 아이템을 확인하고 어떤 ItemWriter를 사용해 쓰기 작업을 수행할지 판별한다.

ClassifierCompositeItemWriter는 Classifier 인터페이스의 구현체의 참조를 하고 있다.

Classify 인터페이스는 아이템을 입력받아 해당 아이템의 쓰기 작업을 수행할 ItemWriter를 반환한다. 즉, ItemWriter의 전략 구현체로 볼 수 있다.

CustomerClassifier

public class CustomerClassifier implements
		Classifier<Customer, ItemWriter<? super Customer>> {

	private ItemWriter<Customer> fileItemWriter;
	private ItemWriter<Customer> jdbcItemWriter;

	public CustomerClassifier(StaxEventItemWriter<Customer> fileItemWriter, JdbcBatchItemWriter<Customer> jdbcItemWriter) {
		this.fileItemWriter = fileItemWriter;
		this.jdbcItemWriter = jdbcItemWriter;
	}

	@Override
	public ItemWriter<Customer> classify(Customer customer) {
		if(customer.getState().matches("^[A-M].*")) {
			return fileItemWriter;
		} else {
			return jdbcItemWriter;
		}
	}
}

CustomerClassifier를 이용한 ItemWriter

	@Bean
	public ClassifierCompositeItemWriter<Customer> classifierCompositeItemWriter() throws Exception {
		Classifier<Customer, ItemWriter<? super Customer>> classifier = new CustomerClassifier(xmlDelegate(null), jdbcDelgate(null));

		return new ClassifierCompositeItemWriterBuilder<Customer>()
				.classifier(classifier)
				.build();
	}

하지만 실행하면 예외가 발생한다.
그 이유는 ItemStream에 있다.

ItemStream

ItemStream은 주기적으로 상태를 저장하고 복원하는 역할.

CompositeItemWriter와 ClassifierCompositeItemWriter의 차이점
: CompositeItemWriter는 ItemStream 인터페이스를 구현한 것.
ItemStream의 open 메서드는 위임 ItemWriter의 open 메서드를 호출한다.

이에 따라서 ClassifierCompositeItemWriter은 ItemStream을 구현하지 않았으므로 XML 파일이 열리지 않은 상태에서 XML에 대한 쓰기 처리가 시도된 것이다.

이를 해결하기 위해서 스프링 배치 스텝 내에 수동으로 ItemStream을 등록할 수 있는 기능을 제공한다.

    @Bean
	public Step classifierCompositeWriterStep() throws Exception {
		return this.stepBuilderFactory.get("classifierCompositeWriterStep")
				.<Customer, Customer>chunk(10)
				.reader(classifierCompositeWriterItemReader(null))
				.writer(classifierCompositeItemWriter())
				.stream(xmlDelegate(null))	// 수동으로 등록
				.build();
	}
profile
백엔드 개발자입니다.

0개의 댓글