이번 게시글에서는 SCDF를 이용하여 Batch Job의 실행 결과에 대한 모니터링 기능을 구현해보겠다. 이번 위치추적모듈 프로젝트에서 정기 결제 기능을 구현하면서 결제 실행의 중복을 방지하기 위해 Batch Job을 사용하였다. Batch Server는 항상 실행되며 재결제 요청을 수신하면 Batch Job이 실행되는 구조를 가진다. 이러한 이유로 Jenkins를 이용하여 Batch 작업을 관리하지 못하였다. 또한, K8S 환경이 아니기 때문에 SCDF를 통한 스케줄링 기능을 사용할 수 없어 Spring Batch와 Quartz를 이용하여 구현하였다.
이러한 이유 때문에 Batch에 대한 모니터링 시스템이 필요하였고, 이 글에서는 SCDF를 이용한 Spring Batch의 Job 실행 결과 모니터링 방법을 상세히 설명하고자 한다.
Spring Cloud Data Flow의 로컬 머신 환경을 구축하기 위해서는 Docker Compose 방식을 추천한다. 아래 공식 문서를 참고하기 바란다.
하지만 Batch Job의 결과를 모니터링하기 위해서는 Skipper를 제외한 Dataflow만으로 충분하다. 그래서 Docker Compose 방식이 아닌 Dataflow의 jar 파일로도 충분하다. 아래 링크에서 필요한 파일을 다운로드할 수 있다.
다음 명령어를 사용하여 실행할 수 있다.
java -jar spring-cloud-dataflow-server-2.11.3.jar
--spring.datasource.url=jdbc:postgresql://{데이터베이스 주소}
--spring.datasource.username={username}
--spring.datasource.password={password}
--spring.datasource.driver-class-name=org.postgresql.Driver
--spring.cloud.dataflow.features.streams-enabled=false
--spring.flyway.enabled=true
Spring 3와 Batch 5로 업데이트되면서 주의해야 할 점이 있다. 관련 PR에서 확인할 수 있듯이, Batch 5 버전에서 BATCH_JOB_EXECUTION_PARAMS 테이블 구조가 변경되었기 때문에 SCDF 2.11.x 버전 이전에서는 오류가 발생할 수 있다.
https://github.com/spring-cloud/spring-cloud-dataflow/issues/5152
이 문제를 해결하기 위해 2.11.x 버전이 출시되었으며, 관련 내용을 아래에서 확인할 수 있다.
https://spring.io/blog/2023/09/21/spring-cloud-dataflow-2-11-0-released
따라서, Spring 3과 Batch 5를 사용하는 경우, 해당 문제를 해결하기 위해 SCDF 2.11.x 버전을 사용해야 하며, BOOT3_ 를 TablePrefix로 설정해야 SCDF에서 boot3로 인식하게 된다. 이를 염두에 두고 아래 내용을 살펴보기를 추천한다.
SCDF를 이용한 Batch 모니터링 기능을 구현하기 위해서는 Spring Cloud Task 기반의 Spring Batch로 전환해야 한다.
// Spring Cloud Task
implementation 'org.springframework.cloud:spring-cloud-task-core:3.1.1'
implementation 'org.springframework.cloud:spring-cloud-task-batch:3.1.1'
Spring Batch 프로젝트에 Spring Cloud Task 관련 의존성들을 추가한다.
@SpringBootApplication
@EnableJpaAuditing
@EnableTask
public class AccountBatchApplication {
public static void main(String[] args) {
SpringApplication.run(AccountBatchApplication.class, args);
}
@PostConstruct
public void initializeTimeZone(){
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
}
}
@SpringBootApplication 애노테이션을 통해 Spring Boot 애플리케이션을 설정하고, @EnableTask 애노테이션을 통해 Spring Cloud Task를 활성화한다.
spring:
cloud:
task:
initialize-enabled: false
name: account_server-batch
application.yml 파일에 Spring Cloud Task 설정을 추가하여 Task 이름을 설정하고 초기화를 비활성화한다.
public class CustomTaskConfigurer extends DefaultTaskConfigurer {
private final PlatformTransactionManager metaTransactionManager;
public CustomTaskConfigurer(DataSource dataSource, String tablePrefix, ApplicationContext applicationContext, PlatformTransactionManager metaTransactionManager) {
super(dataSource, tablePrefix, applicationContext);
this.metaTransactionManager = metaTransactionManager;
}
@Override
public PlatformTransactionManager getTransactionManager() {
return metaTransactionManager;
}
}
CustomTaskConfigurer 클래스를 정의하여 기본 Task 설정을 커스터마이징하고, 트랜잭션 매니저를 직접 정의할 수 있도록 한다.
@Configuration
public class TaskConfig {
private final DataSource metaDataSource;
private final PlatformTransactionManager metaTransactionManager;
private final ApplicationContext applicationContext;
public TaskConfig(@Qualifier(META_DATASOURCE)DataSource metaDataSource, @Qualifier(META_TRANSACTION_MANAGER)PlatformTransactionManager metaTransactionManager, ApplicationContext applicationContext) {
this.metaDataSource = metaDataSource;
this.metaTransactionManager = metaTransactionManager;
this.applicationContext = applicationContext;
}
@Bean
public TaskConfigurer taskConfigurer() {
return new CustomTaskConfigurer(metaDataSource, "BOOT3_TASK_", applicationContext, metaTransactionManager);
}
@Bean
public TaskBatchExecutionListener taskBatchExecutionListener() {
return new TaskBatchExecutionListener(new JdbcTaskBatchDao(metaDataSource, "BOOT3_TASK_"));
}
}
TaskConfig 클래스를 정의하여 Task 설정을 구성하고, 커스터마이징한 TaskConfigurer와 TaskBatchExecutionListener를 빈으로 등록한다. 앞에서 언급하였듯이, 테이블 접두사를 "BOOT3TASK"로 설정한다.
@Configuration
public class BatchConfig extends DefaultBatchConfiguration {
private final DataSource mainDataSource;
private final PlatformTransactionManager metaTransactionManager;
public BatchConfig(@Qualifier(DataSourceConfig.META_DATASOURCE) DataSource metaDataSource, @Qualifier(META_TRANSACTION_MANAGER) PlatformTransactionManager metaTransactionManager) {
this.mainDataSource = metaDataSource;
this.metaTransactionManager = metaTransactionManager;
}
@Override
protected ExecutionContextSerializer getExecutionContextSerializer() {
return new Jackson2ExecutionContextStringSerializer();
}
@Override
protected boolean getValidateTransactionState() {
return false;
}
@Override
protected DataSource getDataSource() {
return mainDataSource;
}
@Override
protected PlatformTransactionManager getTransactionManager() {
return metaTransactionManager;
}
@Override
protected String getTablePrefix() {
return "BOOT3_BATCH_";
}
}
BatchConfig 클래스를 정의하여 Spring Batch 설정을 구성하고, 메타 데이터 소스와 트랜잭션 매니저를 설정한다. 또한, 테이블 접두사를 "BOOT3BATCH"로 설정한다.
-- Spring Batch
DROP TABLE IF EXISTS BOOT3_BATCH_STEP_EXECUTION_CONTEXT CASCADE;
DROP TABLE IF EXISTS BOOT3_BATCH_JOB_EXECUTION_CONTEXT CASCADE;
DROP TABLE IF EXISTS BOOT3_BATCH_STEP_EXECUTION CASCADE;
DROP TABLE IF EXISTS BOOT3_BATCH_JOB_EXECUTION_PARAMS CASCADE;
DROP TABLE IF EXISTS BOOT3_BATCH_JOB_EXECUTION CASCADE;
DROP TABLE IF EXISTS BOOT3_BATCH_JOB_INSTANCE CASCADE;
DROP SEQUENCE IF EXISTS BOOT3_BATCH_STEP_EXECUTION_SEQ;
DROP SEQUENCE IF EXISTS BOOT3_BATCH_JOB_EXECUTION_SEQ;
DROP SEQUENCE IF EXISTS BOOT3_BATCH_JOB_SEQ;
CREATE TABLE BOOT3_BATCH_JOB_INSTANCE
(
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY,
VERSION BIGINT,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint BOOT3_JOB_INST_UN unique (JOB_NAME, JOB_KEY)
);
CREATE TABLE BOOT3_BATCH_JOB_EXECUTION
(
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
VERSION BIGINT,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP NOT NULL,
START_TIME TIMESTAMP DEFAULT NULL,
END_TIME TIMESTAMP DEFAULT NULL,
STATUS VARCHAR(10),
EXIT_CODE VARCHAR(2500),
EXIT_MESSAGE VARCHAR(2500),
LAST_UPDATED TIMESTAMP,
constraint BOOT3_JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BOOT3_BATCH_JOB_INSTANCE (JOB_INSTANCE_ID)
);
CREATE TABLE BOOT3_BATCH_JOB_EXECUTION_PARAMS
(
JOB_EXECUTION_ID BIGINT NOT NULL,
PARAMETER_NAME VARCHAR(100) NOT NULL,
PARAMETER_TYPE VARCHAR(100) NOT NULL,
PARAMETER_VALUE VARCHAR(2500),
IDENTIFYING CHAR(1) NOT NULL,
constraint BOOT3_JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BOOT3_BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
);
CREATE TABLE BOOT3_BATCH_STEP_EXECUTION
(
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP NOT NULL,
START_TIME TIMESTAMP DEFAULT NULL,
END_TIME TIMESTAMP DEFAULT NULL,
STATUS VARCHAR(10),
COMMIT_COUNT BIGINT,
READ_COUNT BIGINT,
FILTER_COUNT BIGINT,
WRITE_COUNT BIGINT,
READ_SKIP_COUNT BIGINT,
WRITE_SKIP_COUNT BIGINT,
PROCESS_SKIP_COUNT BIGINT,
ROLLBACK_COUNT BIGINT,
EXIT_CODE VARCHAR(2500),
EXIT_MESSAGE VARCHAR(2500),
LAST_UPDATED TIMESTAMP,
constraint BOOT3_JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BOOT3_BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
);
CREATE TABLE BOOT3_BATCH_STEP_EXECUTION_CONTEXT
(
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT,
constraint BOOT3_STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BOOT3_BATCH_STEP_EXECUTION (STEP_EXECUTION_ID)
);
CREATE TABLE BOOT3_BATCH_JOB_EXECUTION_CONTEXT
(
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT,
constraint BOOT3_JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BOOT3_BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
);
CREATE SEQUENCE BOOT3_BATCH_STEP_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BOOT3_BATCH_JOB_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BOOT3_BATCH_JOB_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
create index BOOT3_BATCH_STEP_EXECUTION_JOB_EXECUTION_ID_IX on BOOT3_BATCH_STEP_EXECUTION(JOB_EXECUTION_ID);
create index BOOT3_BATCH_JOB_EXECUTION_START_TIME_IX on BOOT3_BATCH_JOB_EXECUTION(START_TIME);
-- Spring Cloud Task
DROP TABLE IF EXISTS BOOT3_TASK_EXECUTION CASCADE;
DROP TABLE IF EXISTS BOOT3_TASK_EXECUTION_PARAMS CASCADE;
DROP TABLE IF EXISTS BOOT3_TASK_TASK_BATCH CASCADE;
DROP TABLE IF EXISTS BOOT3_TASK_LOCK CASCADE;
DROP SEQUENCE IF EXISTS BOOT3_TASK_SEQ;
CREATE TABLE BOOT3_TASK_EXECUTION
(
TASK_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
START_TIME TIMESTAMP DEFAULT NULL,
END_TIME TIMESTAMP DEFAULT NULL,
TASK_NAME VARCHAR(100),
EXIT_CODE INTEGER,
EXIT_MESSAGE VARCHAR(2500),
ERROR_MESSAGE VARCHAR(2500),
LAST_UPDATED TIMESTAMP,
EXTERNAL_EXECUTION_ID VARCHAR(255),
PARENT_EXECUTION_ID BIGINT
);
CREATE TABLE BOOT3_TASK_EXECUTION_PARAMS
(
TASK_EXECUTION_ID BIGINT NOT NULL,
TASK_PARAM VARCHAR(2500),
constraint BOOT3_TASK_EXEC_PARAMS_FK foreign key (TASK_EXECUTION_ID)
references BOOT3_TASK_EXECUTION (TASK_EXECUTION_ID)
);
CREATE TABLE BOOT3_TASK_TASK_BATCH
(
TASK_EXECUTION_ID BIGINT NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
constraint BOOT3_TASK_EXEC_BATCH_FK foreign key (TASK_EXECUTION_ID)
references BOOT3_TASK_EXECUTION (TASK_EXECUTION_ID)
);
CREATE TABLE BOOT3_TASK_LOCK
(
LOCK_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint BOOT3_LOCK_PK primary key (LOCK_KEY, REGION)
);
CREATE SEQUENCE BOOT3_TASK_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
create index BOOT3_TASK_TASK_BATCH_JOB_EXECUTION_ID_IX on BOOT3_TASK_TASK_BATCH(JOB_EXECUTION_ID);
Spring Batch와 Spring Cloud Task의 DDL 스크립트를 실행하여 필요한 테이블 및 시퀀스를 생성한다. DDL 스크립트를 참고한 주소는 아래에서 확인할 수 있다.
이러한 설정들을 모두 마친다면 아래와 같이 SCDF를 잘 실행할 수 있다.
앞서 언급한 것처럼, Spring Cloud Task 기반의 Spring Batch로 전환함에 따라 Spring Batch Server 실행 시 Task 실행에 대한 기록이 남는다.
이렇게 실행된 Batch Server는 Quartz를 이용하여 정기적으로 정기결제 또는 회원 탈퇴결제를 실행한다.
사진에서 볼 수 있듯이, 정기결제는 일주일 주기로 실행된다.
사진에서 볼 수 있듯이, 탈퇴결제는 하루 주기로 실행된다.
이렇게 실행된 정기결제 또는 회원탈퇴결제가 결제를 실행한다.
사진에서 볼 수 있듯이, 정기결제가 사용자의 카드 미등록으로 인해 실패한 모습이다. 또한 사용자가 카드를 등록한 후, 재결제 요청을 하였을 때 결제가 완료되었다면 완료로 기록된 모습이다.
이렇게 실행된 결제 Job에 대한 세부 정보를 확인 할 수 있다.
이번 프로젝트를 진행하며, 결제 중복 실행 문제를 방지하고 실행 결과를 모니터링하기 위해 Spring Batch와 SCDF를 사용하였다. 하지만 구현하면서 몇 가지 아쉬운 점이 있었다. 먼저, 결제 로직을 Batch Job으로 구현하여 중복 결제 문제와 모니터링 문제를 해결했지만, 결제를 Batch가 아닌 별도의 로직으로 구현하고, 정기적인 처리를 Spring Batch로 구현한 후 Jenkins를 이용해 스케줄링을 진행하며 모니터링을 수행하는 것이 관리 및 유지보수 측면에서 더 나았을 것이라 생각한다. 또한 SCDF를 단지 모니터링 용도로만 사용한 것도 아쉬움이 남는다. 비록 K8S 환경이 아니기 때문에 스케줄링 기능을 사용할 수 없었지만, 추후 K8S 환경에서는 SCDF를 이용하여 Batch를 관리하고 싶다는 생각이 든다.
다음에 Spring Batch를 사용할 경우, Jenkins 또는 SCDF를 이용하여 Batch를 관리할 것이다. 이번 프로젝트에서는 Batch 서버가 항상 실행되어야 한다는 점과 Jenkins 및 SCDF와 같은 좋은 솔루션을 충분히 활용하지 못한 점에서 아쉬움이 남는다.