스프링 배치 - Job 설정과 실행

600g (Kim Dong Geun)·2022년 2월 20일
0

굉장한 스크롤 압박 예정 👨‍💻
기존 Document랑 프레임워크를 뜯어가면서 동작을 이해하려 하다보니 양이 쓸데없이 많아졌다.

Configuring a Job

Job 인터페이스 구현체는 다양하고, 설정방법도 다르고 Builder 패턴으로 이를 추상화한다.

@Bean
public Job footballJob(){
	return this.jobBuilderFactory.get("footballJob")
			.start(playerLoad())
			.next(gameLoad())
			.next(playerSummarization())
			.end()
			.build();
}

Job(보통 Step을 가지고 있는)은 JobRepository가 필요하다.
JobRepositoryBatchConfigurer를 통해 설정한다.

위에 보이는 Job은 3개의 Step을 가지고 있다.
job빌더로 step 뿐 아니라 병렬화, 선언적 flow 제어, flow 정의 외부화 같은 다른 요소도 설정 가능하다.

Restarability

배치 Job을 실행할 때 발생하는 주요 이슈는 Job이 재시작 할 때의 동작과 관련 있다. job을 실행할 때 특정 JobInstance의 JobInstance가 이미 존재한다면 재시작으로 간주 한다.

모든 Job이 중단된 지점부터 재시작할 수 잇다면 이상적이지만, 불가능하여 새로 시작할 경우 개발자가 직접 JobInstance를 생성해야 한다.

결국 그때 실행한 JobInstance는 유일하다를 보장하는거네.

스프링 배치에서는 restartable 프로퍼티 값을 false로 설정하면 절대 Job을 재실행하지 않고 항상 새 JobInstance로 실행한다.

@Bean
public Job footballJob(){
	return this.jobBuilderFactory.get("footballJob")
			.preventRestart() // jobInstance를 항상 재시작한다.
			...
			.build();
}
  • 이때 재시작을 지원하지 않는 Job을 재시작하려고 하면 JobRestartException이 발생한다.
Job job = new SimpleJob();
job.setRestartable(false);

JobParameters jobParameters = new JobParameters();

JobExecution firstExecution = jobRepository.createJobExecution(job, jobParameters);
jobRepository.saveOrUpdate(firstExecution);

try {
    jobRepository.createJobExecution(job, jobParameters);
    fail();
}
catch (JobRestartException e) {
    // expected
}

Interceptiong Job Execution

public interface JobExecutionListener {

    void beforeJob(JobExecution jobExecution);

    void afterJob(JobExecution jobExecution);

}

Job은 JobExecutionListener 구현체를 등록해서 Job 전후로 이벤트를 등록할 수 있다.


@Component  
@RequiredArgsConstructor  
public class TestComponent {  
	 private final JobBuilderFactory jobBuilderFactory;  
	 private final StepBuilderFactory stepBuilderFactory;  
  
 @Bean  
 public Step simpleStep(){  
	 return stepBuilderFactory.get("simpleStep1")  
		 .tasklet((contribution, chunkContext) -> {  
			 System.out.println("step1");  
			 return RepeatStatus.FINISHED;  
		 }).build();  
 }  


//Job에 jobExecutionListener 등록
 @Bean  
 public Job simpleJob(){  
	 return jobBuilderFactory.get("simpleJob2")  
	 .listener(jobExecutionListener())  
	 .start(simpleStep())  
	 .build();  
 }  


//JobExecutionListener 구현
 @Bean  
 public JobExecutionListener jobExecutionListener(){  
 JobExecutionListener jobExecutionListener = new JobExecutionListener() {  
	 @Override  
	 public void beforeJob(JobExecution jobExecution) {  
	 System.out.println("job 들어간다");  
	 }  
 
	@Override  
		 public void afterJob(JobExecution jobExecution) {  
	 System.out.println("job 끝난다.");  
	 } };  
	 return jobExecutionListener;  
 }  }

실행결과

  • 또한 인터페이스와 동일한 어노테이션도 지원한다.
  • @BeforeJob
  • @AfterJob

JobParametersValidator

XML 네임 스페이스에서 선언되었거나 AbstractJob의 서브클래스를 사용한 job은 원한다면 런타임에 사용할 Job파라미터 validator를 지정할 수 있다.
Job 실행에 꼭 필요한 파라미터를 검증하는 용도로 유용하다.
간단한 필수/옵션 파라미터를 검증하는데 사용할 수 있는 DefaultJobParametersValidator를 지원하며, 좀 더 복잡한 제약 조건이 있다면 인터페이스를 직접 구현할 수 있다.

JavaBuilder를 통해 validator를 설정한다.

defaultJobParametersValidator를 열어보니 다음과 같다.

  • parameter에 대한 기본적인 검증이 들어감.
    - 코드를 살펴보니 OptionalKey와 requiredKey에 대한 검증이 있는것 같은데 requiredKey가 JobParameter에 들어가지 않으면 runtime error(JobParametersInvalidException)를 내는 것 같다.
  • 커스텀해서 원하는 validator 역할도 만들 수 있을 듯.

등록은 다음과 같이한다

@Bean
public Job job1() {
    return this.jobBuilderFactory.get("job1")
                     .validator(parametersValidator())
                     ...
                     .build();
}

Java Config

스프링 배치 2.2.0부터 자바코드로 배치 Job을 설정할 수 있다.
@EnableBatchProcessing을 이용하여 다음과 같은 빈들에 대한 설정을 초기화 해준다.

  • JobRepository -> bean name "jobRepository"
  • JobLauncher -> bean name "jobLauncher"
  • JobRegistry -> bean name "jobRegistry"
  • PlatformTransactionManager -> bean name "transactionManager"
  • JobBuilderFactory -> bean name "jobBuilder"
  • StepBuilderFactory -> bean name "stepBuilders"

이 설정을 위한 핵심 인터페이스는 BatchConfigurer이다.
디폴트 구현체는 위에 언급된 빈을 제공하며 컨텍스트에 DataSource 빈을 설정해 줘야 한다.

  • defaultBatchConfigurer 뜯어본 모습
  • 실제로 DataSource를 주입받고 있고, 이는 AbstractBatchConfiguration에 들어가 있어서 @EnableBatchProcessing을 실행하면 해당코드가 실행되는것을 확인할 수 있다.

물론 BatchConfiguerer 인터페이스를 구현하면 직접 이 빈들을 커스터마이징 할 수도 있다.

일반적으로 DefaultBatchConfiguerer를 상속받아 getter를 overriding 하는 것만으로도 충분하다.

아래는 batchConfiguerer를 override하여 custom trnasaction manager를 사용하는 방법이다.

@Bean
public BatchConfigurer batchConfigurer(){
	return new DefaultBatchConfigurer(){
		@Override
		public PlatformTransactionManager getTransactionManager(){
	return new MyTransactionManager();
		}
	}
}

이 기반 설정만 해주면 기본으로 제공하는 빌더 팩토리로 job을 만들 수 있다.

@Configuration
@EnableBatchProcessing
@Import(DataSourceConfiguration.class) //datasource에 대한 설정및 빈을 가지고 있겠구만..

public class AppConfig{

	@Autowired
	private JobBuilderFactory jobs; //inject (o)

	@Autowired
	private StepBuilderFactory jobs; //inject (o)
}

Configuring a JobRepository

EnableBatchProcessing을 이용하면 JobRepository를 바로 사용할 수 있다.

  • BatchConfigurer를 custom 해서 JobRepository에 대한 설정을 커스텀 할 수 있다.
...
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
    factory.setTablePrefix("BATCH_");
    factory.setMaxVarCharLength(1000);
    return factory.getObject();
}
...

아까 위에 defaultBatchConfigurer를 뜯어 봤는데, 그 부분을 살펴보자.

Transaction Configuration for the JobRepository

네임스페이스나 기본으로 제공하는 FactoryBean을 사용한다면 Repository를 위한 트랜잭션 advice가 자동으로 생성된다( 순간 읽다 이해 안됐는데 ->이게 commit, rollback에 대한 동작에 대한 aop를 지원한다라고 이해)
이는 재시작 할때 필요한 state를 포함한 메타 데이터가 안전하게 저장됨을 보장한다.
두 프로세스가 동시에 같은 job을 실행하려고 하면, 유일하게 실행되어야 하기 때문에 create* 메소드의 레벨을 별도로 지정한다.
JobRepository의 default isolation levelSERIALIZED이다. (READ_COMMITED도 충분히 잘 동작한다고 하고 실제로도 그럴 것 같다.)

그렇지만, create* 메소드 호출이 매우 짧은 호출이기 때문에 SERIALIZED도 딱히 문제가 되진 않는다고 한다.

그게 아니더라도 Override 옵션을 제공한다

@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
    return factory.getObject();
}

따로 Namespace나 팩토리 빈을 사용안하면 AOP 사용해서 트랜잭션을 제어하는 게 좋다고한다.

@Bean
public TransactionProxyFactoryBean baseProxy() {
	TransactionProxyFactoryBean transactionProxyFactoryBean = new TransactionProxyFactoryBean();
	Properties transactionAttributes = new Properties();
	transactionAttributes.setProperty("*", "PROPAGATION_REQUIRED");
	transactionProxyFactoryBean.setTransactionAttributes(transactionAttributes);
	transactionProxyFactoryBean.setTarget(jobRepository());
	transactionProxyFactoryBean.setTransactionManager(transactionManager());
	return transactionProxyFactoryBean;
}

Changing The Table Prefix

JobRepository 에서 table prefix 를 바꿀수 있다.

// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setTablePrefix("SYSTEM.TEST_"); //SYSTEM.TEST_JOB_EXECUTION으로 변경된다.
    return factory.getObject();
}

In-Memory Repository

성능등의 이유로 도메인 오브젝트를 굳이 데이터베이스에 저장하지 않고 인메모리에 저장할 수도 있다. 아니면 상태를 저장할 필요가 없는 Job이거나 위와같은 이유로 스프링 배치는 인메모리 Map 버전의 Job Repository를 제공한다.

// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
    factory.setTransactionManager(transactionManager);
    return factory.getObject();
}

위대로 MapJobRepositoryFactoryBean을 이용하여 인메모리 job repo를 inject 할 수 있다.
InMemory 레포는 휘발성이여서 jvm을 새로 띄우는식의 재실행이 불가능하다.
또한 같은 파라미터로 두개의 job인스턴스를 동시에 실행시킬 수 없으며, 멀티쓰레드 job이나 프로그램 내에서 파티셔닝한 Step에는 적합하지 않다. -> 이경우가 어떤 경우를 말하는가?

프로그램내에서 파티셔닝하는 작업이 뭘까..? 데이터를 불러놓고 데이터 chunk를 따로 만드는듯한 작업을 말하는 것인것 같다. partitioning 하는 작업자체가 메모리를 많이 잡아먹는경우가 많은데, 이를 메인메모리에 저장하기에는 부담스러울 수도 있기 때문인듯?

인메모리 레포지토리도 트랜잭션 매니저를 등록해야한다. 비즈니스 로직에서는 트랜잭션이 필요할수도 있기 때문이다.

테스트가 목적이면 ResourcelessTransactionManager가 유용하다고 한다.

테스트 추가로 해보기

Non-standard Database Types in a Repository

사용하려는 데이터 베이스 플랫폼이 지원되지 않는다면 SQL이 크게 다르지 않은 경우에 한해 지원되는 타입 중 하나를 골라 사용하는 방법도 존재한다.
손쉬운 네임스페이스 대신 Raw 레벨의 JobRepositoryFactoryBean을 사용해서 가장 유사한 database를 설정하면 된다.

// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setDatabaseType("db2");
    factory.setTransactionManager(transactionManager);
    return factory.getObject();
}


4.3.4 버전에서는 지원하고 있는 데이터 타입인데, 거의 대부분의 사용 RDBMS를 지원한다.
mariaDB 말고 쓸일이 있는건가?

(데이터베이스 타입을 지정하지 않으면 JobRepositoryFactoryBeanDataSource로부터 자동으로 타입을 알아낸다.) 플랫폼마다 주로 Primary Key를 증가시키는 전략이 다르기 때문에, 필요하다면 incrementerFactory를 오버라이드해야한다. -> postgresql이었나?

Configuring JobLauncher

위에서도 언급했듯이 @EnableBatchProcessing을 사용하면 JobRepository가 제공된다.
JobLauncher 인터페이스의 가장 기본적인 구현체는 SimpleJobLauncher다. JobRepository의존성만 있으면 실행가능하다.

...
// This would reside in your BatchConfigurer implementation
@Override
protected JobLauncher createJobLauncher() throws Exception {
	SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
	jobLauncher.setJobRepository(jobRepository); //jobRepo를 주입받아야 한다.
	jobLauncher.afterPropertiesSet();
	return jobLauncher;
}
...
  • 스케줄러가 job을 실행시킬때 flow

    스케쥴러가 job을 기동하면 순서대로 잘 동작한다.

문제는 HTTP 요청으로 job을 시작할 때 생긴다. 이때는 요청을 비동기로 처리해서 SimpleJobLauncher가 요청 즉시 caller에게 결과를 리턴해줘야한다. (내부구조가 어떤지, Callable인지 deferedResult인지 뜯어볼 수 있으면 뜯어보자..)

그게 아니라, http 요청자체를 비동기로 처리해주는게 올바른 처리 방식이고 jobLauncher를 async하게 만들어주는 것을 권고하기 위해 한말이었다.


이런 경우 SimpleJobLauncher에 TaskExecutor를 설정하면 된다.

@Bean
public JobLauncher jobLauncher() {
	SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
	jobLauncher.setJobRepository(jobRepository());
	jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
	jobLauncher.afterPropertiesSet();
	return jobLauncher;
}

Running jobs from the command line

The CommandLineJobRunner

bash나 script로 스프링 배치 job을 실행시키기 위한 클래스.

CommandLineJob은 다음 4가지 기능을 수행한다.

  • 적절한 ApplicationContext로딩
  • 커맨드라인에서 받은 인자를 JobParameters로 변환
  • 인자에 따른 적절한 Job 선택
  • 어플리케이션 컨텍스트에 JobLauncher로 job 실행

Spring Boot의 경우 진입점이 main class 가 되기 때문에 따로 변경해줘야한다.

package com.example.springbatchtutorial;

/**
import들 생략
*/

@Configuration  
@EnableBatchProcessing  
@RequiredArgsConstructor  
public class TestComponent {  
 private final JobBuilderFactory jobBuilderFactory;  
 private final StepBuilderFactory stepBuilderFactory;  
  
	 @Bean  
	 public Step simpleStep(){  
		 return stepBuilderFactory.get("simpleStep1")  
			 .tasklet((contribution, chunkContext) -> {  
				 System.out.println("step1");  
				 return RepeatStatus.FINISHED;  
			 }) .build();  
	 }

	 @Bean  
	 public Job simpleJob(){  
		 return jobBuilderFactory.get("simpleJob")  
			 .listener(jobExecutionListener())  
			 .start(simpleStep())  
		 .build();  
	 }

위와 같이 선언이 되어있다면

java -jar SpringBatchTutorial-0.0.1-SNAPSHOT.jar com.example.springbatchtutorial.TestComponent simpleJob

# java 어플리케이션 클래스명 jobname jobparameter=value 형식이 되겠다.

다음과 같이 bash 명령어로 job을 실행할 수 있다.

다만 SpringBoot의 경우 진입점 자체가 application의 main으로 되어있기 때문에 gradle에서 다음과 같은 옵션으로 CommandLineJobRunner로 진입되도록 옵션을 변경해줘야 한다.

# 추가필요

plugins {  
 id 'application'  
}

bootJar {  
 mainClassName = 'org.springframework.batch.core.launch.support.CommandLineJobRunner'  
}

이 부분은 gradle 설정을 좀 더 자세히 볼 필요가 있을 것 같다.

ExitCodes

커맨드 라인으로 배치를 실행할 땐 보통 엔터프라이즈 스케쥴러(Cron 말하는건가?)를 사용한다. 대부분 스케쥴러는 그렇게 똑똑하지 않다. 왜냐하면 프로세스 수준에서 작동하기 때문이다. 이 말은 쉘스크립트 등의 운영체제 프로세스만 다룬다는 뜻이다. 따라서 스케쥴러를 사용하면 리턴코드로 잡의성공/실패 여부를 판단해야 한다.

이말 덕분에 빨리 이해가 됐다.
즉 어플리케이션 단이 아니라 배치는 운영체제 레벨단에서 운용되는 스케쥴러로 실행되는 경우가 많기 때문에 다양한처리를 하기가 어렵다. 특히 성공 실패 여부를 리턴코드로 받아야 되는등등

리턴 코드는 프로세스가 스케줄러에게 리턴하는 수자로 실행결과를 나타낸다.
가장 간단하게는 0은 성공이고 1은 실패를 의미한다.
여기서 더 복잡한 케이스 등이 있는데 이런 exitCode와 어플리케이션 내에서 통용되는 formatter, converter? 같은 기능을 하는것이 ExitCode라고 보면될 것 같다.

exitCodeMapper를 통해서 exitCode에 따라서 어떤 값을 return 해줄지에 대한 정의가 가능하다.

public interface ExitCodeMapper {

    public int intValue(String exitCode);

}

JobRunner가 디폴트로 사용하는 구현체는 SimpleJvmExitCodeMapper이며 성공시에는 0, 일반적인 에러라면 1, 컨텍스트 내에서 Job을 찾지못하는 경우에는 2를 리턴한다.


  • 다음과 같이 구현이 되어 있다.

따라서 해당 부분을 스크립트로 만들어서 실행을 해보았다.

#!/bin/bash

JOBNAME=simpleJob

java -jar SpringBatchTutorial-0.0.1-SNAPSHOT.jar com.example.springbatchtutorial.TestComponent $JOBNAME

# $? 은 마지막에 실행된 명령의 리턴값을 의미
STATUS=$?

echo "status : $STATUS"
  • 오류일때

  • 오류가 아닐때

운영체제나 특정 Jenkins 단에서 Batch에 대한 조작을 할 수 있게 만들 수 있다.

Running Jobs from within a Web Container

위에서 예제는 오프라인에서 실행할 때 였다.
그러나 HttpRequest로 실행해야 될때가 있다.(웹 어플리케이션, 리포팅 용도, ad-hoc 등)


배치 Job은 보통 긴 작업을 의미하기 때문에 job은 비동기로 실행되어져야한다.
( = 응답을 주고(커넥션을 유지하지않고), 작업은 back단에서 계속 돌아가게끔 한다 )

컨트롤러가 Job을 실행시키는데, 이때 요청 즉시 JobExecution을 리턴하게 설정한 JobLauncher를 사용한다.

만약 커넥션을 유지하고 동기식으로 동작한다면, jobLauncher가 asyncTaskExecutor가 주입되어있는지 확인해보자

@Bean  
SimpleJobLauncher simpleJobLauncher(JobRepository jobRepository){  
 SimpleJobLauncher jobLauncher = new SimpleJobLauncher();  
 jobLauncher.setJobRepository(jobRepository);  
 jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());  
 return jobLauncher;  
}
@RestController  
@RequestMapping("/api")  
public class TestController {  
  
 @Autowired  
 @Qualifier("simpleJobLauncher")  
 private JobLauncher jobLauncher;  
  
 @Autowired  
 @Qualifier("simpleJob")  
 private Job job;  
  
 @GetMapping  
 public ResponseEntity<JobExecution> get(){  
 JobExecution result;  
 try{  
 result = jobLauncher.run(job, new JobParameters());  
 } catch (Exception e){  
 throw new RuntimeException();  
 }  
 return ResponseEntity.ok(result);  
 }  
}

Advanced Meta-Data Usage

  • JobLauncherJobExecution 오브젝트를 생성하고 실행하기 위해 JobRepository를 사용한다.
  • 그다음으론 JobStep 구현체가 Job 실행중에 Execution을 업데이트 하기 위해 JobRepository를 사용한다.

그러나 기본적인 시나리오는 간단한 동작으로 충분하지만, 배치 job이 수백개에 달하고 스케줄링까지 복잡하다면 메타 데이터에 접근하기 위한 고급 기술이 필요하다.
-> 이런 복잡한 메타 데이터 관리를 위해 JobExplorerJobOperator 인터페이스를 통해 추가 기능을 제공한다.

Querying the Repository

  • JobExplorer 인터페이스는 JobExecution을 요청하는 가장 기본적인 기능을 제공한다.
public interface JobExplorer {

    List<JobInstance> getJobInstances(String jobName, int start, int count);

    JobExecution getJobExecution(Long executionId);

    StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);

    JobInstance getJobInstance(Long instanceId);

    List<JobExecution> getJobExecutions(JobInstance jobInstance);

    Set<JobExecution> findRunningJobExecutions(String jobName);
}

JobRepository와 결정적인 차이는 이름그대로 조회를 책임지는 객체라는 성격이 강한 건가? -> 맞네

위의 메소드에서 알 수 있듯이, JobExplorerJobRepository의 ReadOnly 버전이며, JobRepository와 동일하게 팩토리 빈을 통해 손쉽게 설정이 가능하다.

@Override
public JobExplorer getJobExplorer() throws Exception {
	JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();

	factoryBean.setDataSource(this.dataSource);
	factoryBean.setTablePrefix("SYSTEM."); //jobExplorer도 같은 테이블에서 동작하므로 당연히 프리픽스를 변경할 수 있다.
	return factoryBean.getObejct();
}

JobRegistry

JobRegistry는 필수는 아니지만, 컨텍스트 내에 있는 Job을 추적하고 싶을 때 유용하다.

여러 곳에서 job을 생성하는 환경이라면 어플리케이션 컨텍스트에서 job을 수집할 때도 사용할 수 있다.

커스텀 JobRegistry 구현체 또한 등록된 Job의 이름이나 프로퍼티를 관리할 때 유용할 수 있다. 기본으로 제공하는 구현체는 Job이름을 JobInstance에 매핑한 간단한 Map기반이다.

@EnableBatchProcessing을 선언하면 JobRegistry를 기본으로 제공한다.

JobRegistry는 2가지 메커니즘을 통해서 등록이 가능하다

  • JobRegistryBeanPostProcessor
  • register LifeCycle
JobRegistryBeanPostProcessor

빈 post-processor는 생성된 모든 job을 등록할 수 있다.

@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
    JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
    postProcessor.setJobRegistry(jobRegistry());
    return postProcessor;
}

이름으로 유추하자면, 후처리기 같은데, 어떻게 동작할까를 찾아봤다.


BeanPostProcessor를 구현한 구현체로 빈이 생성된 이후 해당 Bean이 Job일 경우 JobRegistry에 등록해주고 있다.

스프링은 참 방대하구나 한번 더 생각했다.

AutomaticJobRegister

AutoMAticJobRegister는 자식 컨텍스트를 생성하고 각 job을 하위 컨텍스트에 등록하는 라이프사이클 컴포넌트다.
이렇게 하면 자식 컨텍스트에 있는 Job이름은 registry 전역에서 유일해야하지만, job이 의존성을 갖는 Bean이름은 그럴필요가 없어진다, 예를들어 Job을 하나씩 가진 여러 XML설정 파일에 빈이름이 같은 ItemReader 가 여러개 있어도 상관없다.

무슨 말일까?
AutoMaticJobRegister내에 job을 등록하는 컨텍스트가 존재,
컨텍스트 내에서는 Registry가 유일해야하지만, job이 의존성을 갖는 Bean이름은 여러번 정의하지 않아도 configuration 설정만 바꾸면 주입할 수 있다 이걸까?
출처 : https://opennote46.tistory.com/77

@Bean
public AutomaticJobRegistrar registrar() {

    AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
    registrar.setJobLoader(jobLoader());
    registrar.setApplicationContextFactories(applicationContextFactories());
    registrar.afterPropertiesSet();
    return registrar;

}

registrar 필수 프로퍼티는 ApplicationContextFactory배열과 JobLoader 2개다.
JobLoader가 자식 컨텍스트와 JobRegistry에 등록된 job의 라이프 사이클을 관리한다.

ApplicationContextFactory 가 자식 컨텍스트 생성을 담당

JobOperator

앞서 말한바와 같이 JobRepository는 메타데이터에 대한 CRUD Operation을 JobExplorer는 리드온리 오퍼레이션을 제공한다.
그런데 이 두 오퍼레이션을 함께 사용하는 배치에서 흔히쓰는 중단, 재시작 요약등의 모니터링이 가능해진다.
스프링 배치는 JobOperator 인터페이스를 통해 이를 지원한다.

public interface JobOperator {

    List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;

    List<Long> getJobInstances(String jobName, int start, int count)
          throws NoSuchJobException;

    Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;

    String getParameters(long executionId) throws NoSuchJobExecutionException;

    Long start(String jobName, String parameters)
          throws NoSuchJobException, JobInstanceAlreadyExistsException;

    Long restart(long executionId)
          throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
                  NoSuchJobException, JobRestartException;

    Long startNextInstance(String jobName)
          throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
                 JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;

    boolean stop(long executionId)
          throws NoSuchJobExecutionException, JobExecutionNotRunningException;

    String getSummary(long executionId) throws NoSuchJobExecutionException;

    Map<Long, String> getStepExecutionSummaries(long executionId)
          throws NoSuchJobExecutionException;

    Set<String> getJobNames();

}

위 오퍼레이션은 JobLauncher, JobRepository, JobExplorer, JobRegistry 같은 인터페이스를 사용하는 메소드가 많다.

JobOperator를 구현한 SimpleJobOperator는 의존성이 높다.

주석에도 표시되있다 ㅋㄷㅋㄷ

JobParametersIncrementer

JobOperator의 메소드는 대부분 설명이 필요 없으며, 자세한 설명은 javadocㅇ을 참조

startNextInstance 메소드는 주의깊게 살펴볼 필요가 있다.
-> 이 메소드는 항상 새 JobInstance를 실행.

JobExecution에서 심각한 이슈가 발생해서 job을 새로 시작해야할경우 유용하다. jobLauncher는 이전 파라미터 셋과는 다른 값으로 새 JobInstance를 실행시키려면 새로운 JobParameters 오브젝트가 필요한 반면에, startNextInstance 메소드는 Job에 상응하는 JobParametersIncrementer를 통해 새 메소드를 만들도록 강제한다

이걸 이용하면 다음과 같은 처리가 가능할 것 같다
배치 execution 3번실행 -> 실패 -> startNextInstance 3번 실행 -> 실패 -> 수동 복구 로그에 기록

public interface JobParametersIncrementer {

    JobParameters getNext(JobParameters parameters);

}

JobParametersIncrementer 는 JobParameter에서 필요한 값을 이용하여 다음에 사용될 JobParameter 오브젝트를 리턴한다,

다음은 JobParameterIncrementer에 대한 간단한 예제이다.

public class SampleIncrementer implements JobParametersIncrementer {

    public JobParameters getNext(JobParameters parameters) {
        if (parameters==null || parameters.isEmpty()) {
            return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
        }
        long id = parameters.getLong("run.id",1L) + 1;
        return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
    }
}
  • 등록과정
@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
    				 .incrementer(sampleIncrementer())
    				 ...
                                 .build();
}

Stopping a job

job을 정상적으로(gracefully) 종료하는데에도 JobOperator를 사용한다.

Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());

해당 명령어는 Job을 바로 종료시키지 않는다, 강제로 즉시 종료하는건 불가능하다.

현재 실행중인 execution이 개발자가 작성한 비즈니스 로직을 담당하는 코드라면, 프레임 워크에서 컨트롤할 수 없다.

그러나 프레임워크가 다시 제어할 수 있는 상태가 되면 종료하기전의 StepExecution의 상태를 BatchStatus.STOPPED로 바꾸고 저장한 다음 JobExecution에도 동일한 처리를 한다.

Aborting a Job

실패한 Job은 재실행이 가능하다. (Restartable하다면), ABANDONED 상태인 Job은 프레임워크에 의해 재실행되지 않는다.

job을 재실행할 때 특정 step을 스킵하고 싶을때도 ABANDONED를 사용한다.

프로세스가 죽으면 당연히 job은 실행중이 아님에도 Job Repository는 프로세스가 죽었다는 통지를 받을수가 없기 때문에 이를 알아 채지 못한다.

Execution 이 실패하거나 중단되었다고 판단되면 이를 수동으로 알려줘야 한다
(즉 상태를 FAILEDABANDONED 로 변경해야 한다.)

자료 출처 : 토리맘의 한글화 프로젝트

profile
수동적인 과신과 행운이 아닌, 능동적인 노력과 치열함

0개의 댓글