s3에서 로컬 디렉터리로 파일을 다운로드하는 단일 스텝의 배치 잡을 클라우드 네이티브 배치로 발전시켜보자.
(책에 있는 코드 참조 617)
메인 클래스
Job Configuration
DownloadingJobExecutionListener
EnrichmentProcessor
이외에도 컨트롤러, application.yml, 디펜던시 추가
배치 처리는 매우 효율적. 하지만 REST API가 과부하가 걸려서 반복 호출이 일어나는 것이 의미가 있을까? 이를 위해서 또 다른 요청을 하기 전에 API 과부하 상태가 해소될 때까지 기다려주는 기회를 준다면 어떨까?
서킷 브레이커로 감쌀 메서드 식별
해당 메서드에서 발생한 예외 건수가 임계값을 초과하면 서킷 브레이커가 해당 메서드에 대한 호출을 중지하고 대체 메서드(다른 방식으로 처리, ex. 기본값 리턴)로 트래픽을 라우팅
서킷 브레이커가 특정 알고리즘을 기반으로 다시 원래 메서드로 트래픽 되돌린 뒤 정상인 상태로 돌아왔는지 테스트 후 재설정
내결함성 스텝을 사용하는 대신 서킷 브레이커를 사용하는 이유
1. 성능 : 작업 재시도시 트랜잭션 롤백, 커밋 카운트 1로 설정, 자체 트랜잭션에서 각 아이템 대상으로 처리 작업 재시도
-> 성능 저하
2. 사용 사례 : 아이템의 대한 처리를 재시도할 수 있지만 문제가 있는 코드 부하를 줄이는 기능 제공하지 않음.
핵심은 두개의 애너테이션
@CircuitBreaker : 서킷 브레이커에서 무언가 래핑해야함
@Recover : 재시도 가능한 메서드가 실패하거나 서킷 브레이커가 플립될 때 호출하는 메서드
둘이 적용된 메서드는 동일한 시그니처를 가져야 함
@EnableRetry 를 메인 클래스에 추가하여 재시도 가능한 메서드 호출을 프록시하기 위한 스프링 메커니즘 부트스트랩
코드는 책 참조 630
구성 파일이 소스 코드와 함께 제공되기 때문에 환경에 따라 쉽게 변화할 수 없는 점에서 클라우드 네이티브 환경에서 문제. 민감한 정보가 평문 형태로 저장되어 있기 때문에 보안 문제.
깃, DB 백엔드에 저장된 구성을 제공하기 위한 구성 서버
클라이언트에 의존성 및 name, failFast(예외 발생)라는 두가지 프로퍼티 구성
서버에서 CLI 이용하여 컨피그 서버 구성 (구성 파일 git에 저장 후 uri 전달) 이 때 암호화 유틸리티 사용
서비스 바인딩 사용
유레카는 클라이이언트와 서버 존재.
애플리케이션은 서버에 등록돼 서비스로 검색될 수 있게 식별됨.
배치 잡은 시작될 때 유레카로부터 통신 방법에 대한 정보 얻음.
bootstrap.yml 파일에서 어플리케이션 name 추가
@EnableDiscoveryClient(autoRegister = false)로 수정
REST API를 유레카에 서비스로 등록하기를 원했지만 하나의 잡으로 등록하려는 것이 아니고 다른 서비스의 구성 상세 정보를 얻으려고 한 것이기 때문이다.
@LoadBalanced를 RestTemplate에 적용 -> 유레카 클라리언트를 활성화 후 사용하기 위해서
EnrichmentProcessor : 유레카를 사용한다면 서비스 이름만 지정하면 되며 나머지느 스프링 클라우드가 처리
CLI로 유레카 명령어 사용하여 실행
로그에 제공된 url에서 대시보드 확인 가능
이제 잡을 실행
스프링 배치에는 특정 시간에 잡을 처리하기 위한 메커니즘이 없다. 하지만 스프링 포트폴리오에는 데이터 처리 애플리케이션을 오케스트레이션하는 도구를 가지고 있다. 스프링 클라우드 데이터 플로우 이다. 스트리밍이나 태스크 기반 워크로드를 오케스트레이션하기 위해 완전하게 개발된 도구이다.
사용중인 플랫폼에서 배치잡을 시작하는 서버 애플리케이션
배치 잡을 배포하고 시작하는 스프링 클라우드 데이터 플로우 서버, 이를 조작하기 위한 셸 또는 GUI로 이루어진다. 이 때 위의 구성요소들은 모두 스프링 부트 기반이다.
스프링 클라우드 데이터 플로우 다운로드
jobRepository 구성
실행 후 메이븐 좌표를 이용해 애플리케이션 등록
태스크를 시작하기 위한 태스크 정의 만들기
잡 시작 방법 정의하기. 이 때 시간 또는 이벤트 driven 방식으로 정의 가능
잡이 실행 중일 때 jobRepository 또는 GUI를 통해서 모니터링 가능
GUI에서는 태스크, 잡, 스탭실행 상세 페이지 등을 확인 가능하다.
테스할 때 잡과 스텝 스코프에 대한 의존성 문제를 어떻게 해결할까?
첫번째는 TestExecutionListener를 사용하는 방법이다. 해당 리스너는 테스크 메서드 실행 전후에 수행되야 하는 일을 정의하는 스프링 API이다. 이 중에서 구현체인 StepScopeTestExecutionListener를 사용해보자. 먼저 테스트케이스에서 팩토리 메서드를 이용해서 StepExecution을 가져오고 반환된 컨텍스트를 현재 테스트 메서드의 컨텍스트로 사용한다. 또한 스텝 컨텍스트를 제공한다.
@ExtendWith(SpringExtension.class) // 스프링 기능 사용하기 위해
// 애플리케이션 컨텍스트를 만드는 클래스를 지정하는 곳
@ContextConfiguration(classes = {ImportJobConfiguration.class, // @Configuration
// ImportJobConfiguration이 사용하는 두개의 빈
CustomerItemValidator.class,
AccountItemProcessor.class})
@JdbcTest // 데이터베이스 사용하기 위해
@EnableBatchProcessing // JobRepository 사용하기 위해
@SpringBatchTest // ApplicationContext에 자동으로 테스트할 수 있는 많은 유틸리티 제공
public class FlatFileItemReaderTests {
@Autowired
private FlatFileItemReader<CustomerUpdate> customerUpdateItemReader;
public StepExecution getStepExecution() {
// JobParameters 객체 생성
JobParameters jobParameters = new JobParametersBuilder()
.addString("customerUpdateFile", "classpath:customerUpdateFile.csv") // 입력 파일 가리킴
.toJobParameters();
return MetaDataInstanceFactory.createStepExecution(jobParameters); // StepExecution, JobExecution 생성. 하지만 JobRepository에 저장되지 않음
}
@Test
public void testTypeConversion() throws Exception {
this.customerUpdateItemReader.open(new ExecutionContext()); // reader의 open 호출
// 유형 검증
assertTrue(this.customerUpdateItemReader.read() instanceof CustomerAddressUpdate);
assertTrue(this.customerUpdateItemReader.read() instanceof CustomerContactUpdate);
assertTrue(this.customerUpdateItemReader.read() instanceof CustomerNameUpdate);
}
}
@ExtendWith(SpringExtension.class) // Junit5가 제공하는 기능 쓸 수 있게 해줌
// ApplicationContext 빌드하는 데 필요한 클래스 제공
@ContextConfiguration(classes = {ImportJobConfiguration.class,
CustomerItemValidator.class,
AccountItemProcessor.class,
BatchAutoConfiguration.class})
@JdbcTest // 데이터베이스 사용
@SpringBatchTest // JobLauncherTestUtils를 사용하기 위해서
@Transactional(propagation = Propagation.NOT_SUPPORTED) // @JdbcTest는 각 테스트 메서드를 하나의 트랜잭션으로 래핑하고 메서드 실행 완료시 롤백. 하지만 스프링 배치가 트랙잭션을 관리하면서 다른 트랜잭션으로 래핑하면 오류 발생하므로 비활성화
public class ImportCustomerUpdatesTests {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private DataSource dataSource;
private JdbcOperations jdbcTemplate;
@BeforeEach
public void setUp() {
this.jdbcTemplate = new JdbcTemplate(this.dataSource);
}
@Test
public void test() {
// jobParameters 정의
JobParameters jobParameters = new JobParametersBuilder()
.addString("customerUpdateFile", "classpath:customerUpdateFile.csv")
.toJobParameters();
// JobLauncher 호출해서 스텝 실행
JobExecution jobExecution =
this.jobLauncherTestUtils.launchStep("importCustomerUpdates",
jobParameters);
// 아래는 테스트
assertEquals(BatchStatus.COMPLETED,
jobExecution.getStatus());
List<Map<String, String>> results =
this.jdbcTemplate.query("select * from customer where customer_id = 5",
(rs, rowNum) -> {
Map<String, String> item = new HashMap<>();
item.put("customer_id", rs.getString("customer_id"));
item.put("first_name", rs.getString("first_name"));
item.put("middle_name", rs.getString("middle_name"));
item.put("last_name", rs.getString("last_name"));
item.put("address1", rs.getString("address1"));
item.put("address2", rs.getString("address2"));
item.put("city", rs.getString("city"));
item.put("state", rs.getString("state"));
item.put("postal_code", rs.getString("postal_code"));
item.put("ssn", rs.getString("ssn"));
item.put("email_address", rs.getString("email_address"));
item.put("home_phone", rs.getString("home_phone"));
item.put("cell_phone", rs.getString("cell_phone"));
item.put("work_phone", rs.getString("work_phone"));
item.put("notification_pref", rs.getString("notification_pref"));
return item;
});
Map<String, String> result = results.get(0);
assertEquals("5", result.get("customer_id"));
assertEquals("Rozelle", result.get("first_name"));
assertEquals("Heda", result.get("middle_name"));
assertEquals("Farnill", result.get("last_name"));
assertEquals("36 Ronald Regan Terrace", result.get("address1"));
assertEquals("P.O. Box 33", result.get("address2"));
assertEquals("Montgomery", result.get("city"));
assertEquals("Alabama", result.get("state"));
assertEquals("36134", result.get("postal_code"));
assertEquals("832-86-3661", result.get("ssn"));
assertEquals("tlangelay4@mac.com", result.get("email_address"));
assertEquals("240-906-7652", result.get("home_phone"));
assertEquals("907-709-2649", result.get("cell_phone"));
assertEquals("316-510-9138", result.get("work_phone"));
assertEquals("2", result.get("notification_pref"));
}
}
전체 잡을 테스트하는 일은 매우 어렵고 복잡할 수 있지만 잡 실행과 결과 검증을 자동화할 수 있다는 장점을 무시할 수는 없다. 따라서 이 예제 수준의 테스트를 만들어 자동화하는 것이 좋다.
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = {JobTests.BatchConfiguration.class, BatchAutoConfiguration.class})
@SpringBatchTest
@Transactional(propagation = Propagation.NOT_SUPPORTED)
public class JobTests {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void test() throws Exception {
JobExecution jobExecution =
this.jobLauncherTestUtils.launchJob(); // jobLauncher 사용해서 job 실행
// 아래는 테스트
assertEquals(BatchStatus.COMPLETED,
jobExecution.getStatus());
StepExecution stepExecution =
jobExecution.getStepExecutions().iterator().next();
assertEquals(BatchStatus.COMPLETED, stepExecution.getStatus());
assertEquals(3, stepExecution.getReadCount());
assertEquals(3, stepExecution.getWriteCount());
}
@Configuration
@EnableBatchProcessing
public static class BatchConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public ListItemReader<String> itemReader() {
return new ListItemReader<>(Arrays.asList("foo", "bar", "baz"));
}
// System.out.println하는 잡
@Bean
public ItemWriter<String> itemWriter() {
return (list -> {
list.forEach(System.out::println);
});
}
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.build();
}
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1())
.build();
}
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder().build();
}
}
}