[공부정리] SCDF를 이용하여 Batch Job 결과 모니터링

jeyong·2024년 6월 4일
0

공부 / 생각 정리  

목록 보기
75/121


이번 게시글에서는 SCDF를 이용하여 Batch Job의 실행 결과에 대한 모니터링 기능을 구현해보겠다. 이번 위치추적모듈 프로젝트에서 결제 기능을 구현하면서 복잡한 정기 결제 와 회원 탈퇴 결제의 로직을 처리하기 위해 Batch Job을 사용하였다. 이에 대한 내용은 [생각정리] Spring Batch의 필요성에 간단히 기록한바 있다.

Batch Server는 항상 실행되며 재결제 요청을 수신하면 Batch Job이 실행되는 구조를 가진다. 이러한 이유로 Jenkins를 이용하여 Batch 작업을 관리하지 못하였다. 또한, K8S 환경이 아니기 때문에 SCDF를 통한 스케줄링 기능을 사용할 수 없어 중복 실행 문제를 방지하며 배치작업의 스케줄링을 위해 Spring Batch와 Quartz를 이용하여 구현하였다.

그러나 Batch 결과가 데이터베이스에만 저장되어 있어 한눈에 파악하기 어려운 문제가 있었다. 이러한 이유로 Batch 작업을 한눈에 모니터링할 수 있는 시스템이 필요했고, 이 글에서는 SCDF를 이용한 Spring Batch의 Job 실행 결과 모니터링 방법을 상세히 설명하고자 한다.

1. SCDF 환경 구축

Spring Cloud Data Flow의 로컬 머신 환경을 구축하기 위해서는 Docker Compose 방식을 추천한다. 아래 공식 문서를 참고하기 바란다.

https://dataflow.spring.io/docs/installation/local/docker/

하지만 Batch Job의 결과를 모니터링하기 위해서는 Skipper를 제외한 Dataflow만으로 충분하다. 그래서 Docker Compose 방식이 아닌 Dataflow의 jar 파일로도 충분하다. 아래 링크에서 필요한 파일을 다운로드할 수 있다.

https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-dataflow-server/2.11.3/spring-cloud-dataflow-server-2.11.3.jar

다음 명령어를 사용하여 실행할 수 있다.

java -jar spring-cloud-dataflow-server-2.11.3.jar 
--spring.datasource.url=jdbc:postgresql://{데이터베이스 주소} 
--spring.datasource.username={username}
--spring.datasource.password={password} 
--spring.datasource.driver-class-name=org.postgresql.Driver 
--spring.cloud.dataflow.features.streams-enabled=false 
--spring.flyway.enabled=true

Spring 3와 Batch 5로 업데이트되면서 주의해야 할 점이 있다. 관련 PR에서 확인할 수 있듯이, Batch 5 버전에서 BATCH_JOB_EXECUTION_PARAMS 테이블 구조가 변경되었기 때문에 SCDF 2.11.x 버전 이전에서는 오류가 발생할 수 있다.

https://github.com/spring-cloud/spring-cloud-dataflow/issues/5152

이 문제를 해결하기 위해 2.11.x 버전이 출시되었으며, 관련 내용을 아래에서 확인할 수 있다.

https://spring.io/blog/2023/09/21/spring-cloud-dataflow-2-11-0-released

따라서, Spring 3과 Batch 5를 사용하는 경우, 해당 문제를 해결하기 위해 SCDF 2.11.x 버전을 사용해야 하며, BOOT3_ 를 TablePrefix로 설정해야 SCDF에서 boot3로 인식하게 된다. 이를 염두에 두고 아래 내용을 살펴보기를 추천한다.

1-1. Spring Cloud Task 설정

SCDF를 이용한 Batch 모니터링 기능을 구현하기 위해서는 Spring Cloud Task 기반의 Spring Batch로 전환해야 한다.

// Spring Cloud Task
implementation 'org.springframework.cloud:spring-cloud-task-core:3.1.1'
implementation 'org.springframework.cloud:spring-cloud-task-batch:3.1.1'

Spring Batch 프로젝트에 Spring Cloud Task 관련 의존성들을 추가한다.

@SpringBootApplication
@EnableJpaAuditing
@EnableTask
public class AccountBatchApplication {

    public static void main(String[] args) {
        SpringApplication.run(AccountBatchApplication.class, args);
    }

    @PostConstruct
    public void initializeTimeZone(){
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
    }
}

@SpringBootApplication 애노테이션을 통해 Spring Boot 애플리케이션을 설정하고, @EnableTask 애노테이션을 통해 Spring Cloud Task를 활성화한다.

spring:
  cloud:
    task:
      initialize-enabled: false
      name: account_server-batch

application.yml 파일에 Spring Cloud Task 설정을 추가하여 Task 이름을 설정하고 초기화를 비활성화한다.

public class CustomTaskConfigurer extends DefaultTaskConfigurer {

    private final PlatformTransactionManager metaTransactionManager;

    public CustomTaskConfigurer(DataSource dataSource, String tablePrefix, ApplicationContext applicationContext, PlatformTransactionManager metaTransactionManager) {
        super(dataSource, tablePrefix, applicationContext);
        this.metaTransactionManager = metaTransactionManager;
    }

    @Override
    public PlatformTransactionManager getTransactionManager() {
        return metaTransactionManager;
    }

}

CustomTaskConfigurer 클래스를 정의하여 기본 Task 설정을 커스터마이징하고, 트랜잭션 매니저를 직접 정의할 수 있도록 한다.

@Configuration
public class TaskConfig {

    private final DataSource metaDataSource;
    private final PlatformTransactionManager metaTransactionManager;
    private final ApplicationContext applicationContext;

    public TaskConfig(@Qualifier(META_DATASOURCE)DataSource metaDataSource, @Qualifier(META_TRANSACTION_MANAGER)PlatformTransactionManager metaTransactionManager, ApplicationContext applicationContext) {
        this.metaDataSource = metaDataSource;
        this.metaTransactionManager = metaTransactionManager;
        this.applicationContext = applicationContext;
    }

    @Bean
    public TaskConfigurer taskConfigurer() {
        return new CustomTaskConfigurer(metaDataSource, "BOOT3_TASK_", applicationContext, metaTransactionManager);
    }

    @Bean
    public TaskBatchExecutionListener taskBatchExecutionListener() {
        return new TaskBatchExecutionListener(new JdbcTaskBatchDao(metaDataSource, "BOOT3_TASK_"));
    }
}

TaskConfig 클래스를 정의하여 Task 설정을 구성하고, 커스터마이징한 TaskConfigurer와 TaskBatchExecutionListener를 빈으로 등록한다. 앞에서 언급하였듯이, 테이블 접두사를 "BOOT3TASK"로 설정한다.

1-2. Spring Batch 설정

@Configuration
public class BatchConfig extends DefaultBatchConfiguration {

    private final DataSource mainDataSource;
    private final PlatformTransactionManager metaTransactionManager;

    public BatchConfig(@Qualifier(DataSourceConfig.META_DATASOURCE) DataSource metaDataSource, @Qualifier(META_TRANSACTION_MANAGER) PlatformTransactionManager metaTransactionManager) {
        this.mainDataSource = metaDataSource;
        this.metaTransactionManager = metaTransactionManager;
    }

    @Override
    protected ExecutionContextSerializer getExecutionContextSerializer() {
        return new Jackson2ExecutionContextStringSerializer();
    }

    @Override
    protected boolean getValidateTransactionState() {
        return false;
    }

    @Override
    protected DataSource getDataSource() {
        return mainDataSource;
    }

    @Override
    protected PlatformTransactionManager getTransactionManager() {
        return metaTransactionManager;
    }

    @Override
    protected String getTablePrefix() {
        return "BOOT3_BATCH_";
    }
}

BatchConfig 클래스를 정의하여 Spring Batch 설정을 구성하고, 메타 데이터 소스와 트랜잭션 매니저를 설정한다. 또한, 테이블 접두사를 "BOOT3BATCH"로 설정한다.

1-3. DDL

-- Spring Batch
DROP TABLE IF EXISTS BOOT3_BATCH_STEP_EXECUTION_CONTEXT CASCADE;
DROP TABLE IF EXISTS BOOT3_BATCH_JOB_EXECUTION_CONTEXT CASCADE;
DROP TABLE IF EXISTS BOOT3_BATCH_STEP_EXECUTION CASCADE;
DROP TABLE IF EXISTS BOOT3_BATCH_JOB_EXECUTION_PARAMS CASCADE;
DROP TABLE IF EXISTS BOOT3_BATCH_JOB_EXECUTION CASCADE;
DROP TABLE IF EXISTS BOOT3_BATCH_JOB_INSTANCE CASCADE;

DROP SEQUENCE IF EXISTS BOOT3_BATCH_STEP_EXECUTION_SEQ;
DROP SEQUENCE IF EXISTS BOOT3_BATCH_JOB_EXECUTION_SEQ;
DROP SEQUENCE IF EXISTS BOOT3_BATCH_JOB_SEQ;

CREATE TABLE BOOT3_BATCH_JOB_INSTANCE
(
    JOB_INSTANCE_ID BIGINT       NOT NULL PRIMARY KEY,
    VERSION         BIGINT,
    JOB_NAME        VARCHAR(100) NOT NULL,
    JOB_KEY         VARCHAR(32)  NOT NULL,
    constraint BOOT3_JOB_INST_UN unique (JOB_NAME, JOB_KEY)
);

CREATE TABLE BOOT3_BATCH_JOB_EXECUTION
(
    JOB_EXECUTION_ID BIGINT    NOT NULL PRIMARY KEY,
    VERSION          BIGINT,
    JOB_INSTANCE_ID  BIGINT    NOT NULL,
    CREATE_TIME      TIMESTAMP NOT NULL,
    START_TIME       TIMESTAMP DEFAULT NULL,
    END_TIME         TIMESTAMP DEFAULT NULL,
    STATUS           VARCHAR(10),
    EXIT_CODE        VARCHAR(2500),
    EXIT_MESSAGE     VARCHAR(2500),
    LAST_UPDATED     TIMESTAMP,
    constraint BOOT3_JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
        references BOOT3_BATCH_JOB_INSTANCE (JOB_INSTANCE_ID)
);

CREATE TABLE BOOT3_BATCH_JOB_EXECUTION_PARAMS
(
    JOB_EXECUTION_ID BIGINT       NOT NULL,
    PARAMETER_NAME   VARCHAR(100) NOT NULL,
    PARAMETER_TYPE   VARCHAR(100) NOT NULL,
    PARAMETER_VALUE  VARCHAR(2500),
    IDENTIFYING      CHAR(1)      NOT NULL,
    constraint BOOT3_JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
        references BOOT3_BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
);

CREATE TABLE BOOT3_BATCH_STEP_EXECUTION
(
    STEP_EXECUTION_ID  BIGINT       NOT NULL PRIMARY KEY,
    VERSION            BIGINT       NOT NULL,
    STEP_NAME          VARCHAR(100) NOT NULL,
    JOB_EXECUTION_ID   BIGINT       NOT NULL,
    CREATE_TIME        TIMESTAMP    NOT NULL,
    START_TIME         TIMESTAMP DEFAULT NULL,
    END_TIME           TIMESTAMP DEFAULT NULL,
    STATUS             VARCHAR(10),
    COMMIT_COUNT       BIGINT,
    READ_COUNT         BIGINT,
    FILTER_COUNT       BIGINT,
    WRITE_COUNT        BIGINT,
    READ_SKIP_COUNT    BIGINT,
    WRITE_SKIP_COUNT   BIGINT,
    PROCESS_SKIP_COUNT BIGINT,
    ROLLBACK_COUNT     BIGINT,
    EXIT_CODE          VARCHAR(2500),
    EXIT_MESSAGE       VARCHAR(2500),
    LAST_UPDATED       TIMESTAMP,
    constraint BOOT3_JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
        references BOOT3_BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
);

CREATE TABLE BOOT3_BATCH_STEP_EXECUTION_CONTEXT
(
    STEP_EXECUTION_ID  BIGINT        NOT NULL PRIMARY KEY,
    SHORT_CONTEXT      VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT,
    constraint BOOT3_STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
        references BOOT3_BATCH_STEP_EXECUTION (STEP_EXECUTION_ID)
);

CREATE TABLE BOOT3_BATCH_JOB_EXECUTION_CONTEXT
(
    JOB_EXECUTION_ID   BIGINT        NOT NULL PRIMARY KEY,
    SHORT_CONTEXT      VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT,
    constraint BOOT3_JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
        references BOOT3_BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
);

CREATE SEQUENCE BOOT3_BATCH_STEP_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BOOT3_BATCH_JOB_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BOOT3_BATCH_JOB_SEQ MAXVALUE 9223372036854775807 NO CYCLE;

create index BOOT3_BATCH_STEP_EXECUTION_JOB_EXECUTION_ID_IX on BOOT3_BATCH_STEP_EXECUTION(JOB_EXECUTION_ID);
create index BOOT3_BATCH_JOB_EXECUTION_START_TIME_IX on BOOT3_BATCH_JOB_EXECUTION(START_TIME);

-- Spring Cloud Task
DROP TABLE IF EXISTS BOOT3_TASK_EXECUTION CASCADE;
DROP TABLE IF EXISTS BOOT3_TASK_EXECUTION_PARAMS CASCADE;
DROP TABLE IF EXISTS BOOT3_TASK_TASK_BATCH CASCADE;
DROP TABLE IF EXISTS BOOT3_TASK_LOCK CASCADE;

DROP SEQUENCE IF EXISTS BOOT3_TASK_SEQ;

CREATE TABLE BOOT3_TASK_EXECUTION
(
    TASK_EXECUTION_ID     BIGINT NOT NULL PRIMARY KEY,
    START_TIME            TIMESTAMP DEFAULT NULL,
    END_TIME              TIMESTAMP DEFAULT NULL,
    TASK_NAME             VARCHAR(100),
    EXIT_CODE             INTEGER,
    EXIT_MESSAGE          VARCHAR(2500),
    ERROR_MESSAGE         VARCHAR(2500),
    LAST_UPDATED          TIMESTAMP,
    EXTERNAL_EXECUTION_ID VARCHAR(255),
    PARENT_EXECUTION_ID   BIGINT
);

CREATE TABLE BOOT3_TASK_EXECUTION_PARAMS
(
    TASK_EXECUTION_ID BIGINT NOT NULL,
    TASK_PARAM        VARCHAR(2500),
    constraint BOOT3_TASK_EXEC_PARAMS_FK foreign key (TASK_EXECUTION_ID)
        references BOOT3_TASK_EXECUTION (TASK_EXECUTION_ID)
);

CREATE TABLE BOOT3_TASK_TASK_BATCH
(
    TASK_EXECUTION_ID BIGINT NOT NULL,
    JOB_EXECUTION_ID  BIGINT NOT NULL,
    constraint BOOT3_TASK_EXEC_BATCH_FK foreign key (TASK_EXECUTION_ID)
        references BOOT3_TASK_EXECUTION (TASK_EXECUTION_ID)
);


CREATE TABLE BOOT3_TASK_LOCK
(
    LOCK_KEY     CHAR(36)     NOT NULL,
    REGION       VARCHAR(100) NOT NULL,
    CLIENT_ID    CHAR(36),
    CREATED_DATE TIMESTAMP    NOT NULL,
    constraint BOOT3_LOCK_PK primary key (LOCK_KEY, REGION)
);

CREATE SEQUENCE BOOT3_TASK_SEQ MAXVALUE 9223372036854775807 NO CYCLE;

create index BOOT3_TASK_TASK_BATCH_JOB_EXECUTION_ID_IX on BOOT3_TASK_TASK_BATCH(JOB_EXECUTION_ID);

Spring Batch와 Spring Cloud Task의 DDL 스크립트를 실행하여 필요한 테이블 및 시퀀스를 생성한다. DDL 스크립트를 참고한 주소는 아래에서 확인할 수 있다.

이러한 설정들을 모두 마친다면 아래와 같이 SCDF를 잘 실행할 수 있다.

2. Batch Server 실행 기록

앞서 언급한 것처럼, Spring Cloud Task 기반의 Spring Batch로 전환함에 따라 Spring Batch Server 실행 시 Task 실행에 대한 기록이 남는다.

3. Batch Job 실행 기록

이렇게 실행된 Batch Server는 Quartz를 이용하여 정기적으로 정기결제 또는 회원 탈퇴결제를 실행한다.

사진에서 볼 수 있듯이, 정기결제는 일주일 주기로 실행된다.

사진에서 볼 수 있듯이, 탈퇴결제는 하루 주기로 실행된다.

이렇게 실행된 정기결제 또는 회원탈퇴결제가 결제를 실행한다.

사진에서 볼 수 있듯이, 정기결제가 사용자의 카드 미등록으로 인해 실패한 모습이다. 또한 사용자가 카드를 등록한 후, 재결제 요청을 하였을 때 결제가 완료되었다면 완료로 기록된 모습이다.

이렇게 실행된 결제 Job에 대한 세부 정보를 확인 할 수 있다.

4. SCDF에 대한 생각

이번 프로젝트를 진행하며, 결제 중복 실행 문제를 방지하고 실행 결과를 모니터링하기 위해 Spring Batch와 SCDF를 사용하였다. 하지만 구현하면서 몇 가지 아쉬운 점이 있었다. 먼저, 결제 로직을 Batch Job으로 구현하여 중복 결제 문제와 모니터링 문제를 해결했지만, 결제를 Batch가 아닌 별도의 로직으로 구현하고, 정기적인 처리를 Spring Batch로 구현한 후 Jenkins를 이용해 스케줄링을 진행하며 모니터링을 수행하는 것이 관리 및 유지보수 측면에서 더 나았을 것이라 생각한다. 또한 SCDF를 단지 모니터링 용도로만 사용한 것도 아쉬움이 남는다. 비록 K8S 환경이 아니기 때문에 스케줄링 기능을 사용할 수 없었지만, 추후 K8S 환경에서는 SCDF를 이용하여 Batch를 관리하고 싶다는 생각이 든다.

다음에 Spring Batch를 사용할 경우, Jenkins 또는 SCDF를 이용하여 Batch를 관리할 것이다. 이번 프로젝트에서는 Batch 서버가 항상 실행되어야 한다는 점 때문에 Jenkins 및 SCDF와 같은 좋은 솔루션을 충분히 활용하지 못한 점에서 아쉬움이 남는다.

profile
노를 젓다 보면 언젠가는 물이 들어오겠지.

0개의 댓글