[Spring Cloud] Spring Batch 구현 및 Spring Cloud Task를 활용한 SCDF Task 배포

이준영·2025년 1월 7일

Spring MSA 프로젝트

목록 보기
12/15
post-thumbnail

개요

사용자 삭제 API 호출 시 상태값을 INACTIVE로 변경하는 soft-delete 기능을 구현하였다.
이후 해당 상태의 사용자를 조회하여 사용자의 이용기록을 삭제해준 뒤 최종적으로 사용자 정보를 완전히 삭제 하는 Batch 프로그램을 만들어 보자.
그리고 이를 SCDF Task로 등록하여 스케줄링 및 모니터링이 가능하도록 설정하려고 한다.

Spring Batch 구현

Batch의 경우 다양한 구현 방법이 있는 데 해당 내용은 아래 링크에서 잘 설명해주고 있다.
https://khj93.tistory.com/entry/Spring-Batch%EB%9E%80-%EC%9D%B4%ED%95%B4%ED%95%98%EA%B3%A0-%EC%82%AC%EC%9A%A9%ED%95%98%EA%B8%B0

Reader, Processor, Writer

@Configuration
public class BatchConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    public BatchConfig(JobRepository jobRepository,
                       @Qualifier("jhomeTransactionManager") PlatformTransactionManager transactionManager
    ) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    @Bean
    public Job userCleanUpJob(Step userCleanupStep) {
        return new JobBuilder("userCleanUpJob", jobRepository)
                .start(userCleanupStep)
                .build();
    }

    @Bean
    public Step userCleanupStep(ItemReader<UserEntity> userReader,
                            ItemProcessor<UserEntity,UserEntity> userProcessor,
                            ItemWriter<UserEntity> userWriter) {
        return new StepBuilder("userCleanupStep", jobRepository)
                .<UserEntity, UserEntity>chunk(2, transactionManager)
                .reader(userReader)
                .processor(userProcessor)
                .writer(userWriter)
                .build();
    }

    @Bean
    public RepositoryItemReader<UserEntity> userReader(UserRepository userRepository) {
        return new RepositoryItemReaderBuilder<UserEntity>()
                .name("userReader")
                .repository(userRepository)
                .methodName("findByStatus")
                .pageSize(10)
                .arguments(UserStatus.INACTIVE)
                .sorts(Map.of("id", Sort.Direction.ASC))
                .build();
    }

    @Bean
    public ItemProcessor<UserEntity, UserEntity> userProcessor() {
        return item -> item;
    }

    @Bean
    public RepositoryItemWriter<UserEntity> userWriter(UserRepository userRepository) {
        return new RepositoryItemWriterBuilder<UserEntity>()
                .repository(userRepository)
                .methodName("delete")
                .build();
    }
  • userReader : INACTIVE 상태 사용자 조회
  • userProcessor : 사용자 이용 기록 삭제(예정), 현재는 item을 그대로 넘겨줌
  • userWriter : 최종적으로 사용자 데이터 hard-delete

Spring Batch 메타데이터

Spring Batch를 실행하면 연동 된 db에 메타데이터 테이블을 생성하도록 되어 있다.

그런데 spring batch-core 5.x 이후 버전과 이전 버전의 테이블 구조가 다르다.

자세히 보면 batch_job_execution 테이블과 batch_job_execution_params 테이블의 컬럼이 추가되거나 변경된 것을 볼 수 있다.

SCDF Batch 메타데이터 테이블

이를 SCDF에서는 Dataflow가 실행될 때 테이블을 생성하는데 기존 테이블 네이밍을 따르는("BATCH_"로 시작하는) 테이블 번들과 "boot3_"로 시작하는 5.x이상 버전용 테이블 번들 두가지로 생성하고 있다.

그리고 SCDF application 등록 시 boot 버전을 2 또는 3으로 선택하도록하여 각가의 테이블을 사용하도록 하고 있다.

Spring Batch 메타데이터 테이블

필자는 SCDF를 활용한 Batch 및 Task 관리를 하고 싶었기에 SCDF Dataflow의 db를 사용하기로 했다.

CREATE TABLE BATCH_JOB_INSTANCE  (
	JOB_INSTANCE_ID BIGINT  NOT NULL PRIMARY KEY ,
	VERSION BIGINT ,
	JOB_NAME VARCHAR(100) NOT NULL,
	JOB_KEY VARCHAR(32) NOT NULL,
	constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;

CREATE TABLE BATCH_JOB_EXECUTION  (
	JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
	VERSION BIGINT  ,
	JOB_INSTANCE_ID BIGINT NOT NULL,
	CREATE_TIME TIMESTAMP NOT NULL,
	START_TIME TIMESTAMP DEFAULT NULL ,
	END_TIME TIMESTAMP DEFAULT NULL ,
	STATUS VARCHAR(10) ,
	EXIT_CODE VARCHAR(2500) ,
	EXIT_MESSAGE VARCHAR(2500) ,
	LAST_UPDATED TIMESTAMP,
	constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
	references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;

CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
	JOB_EXECUTION_ID BIGINT NOT NULL ,
	PARAMETER_NAME VARCHAR(100) NOT NULL ,
	PARAMETER_TYPE VARCHAR(100) NOT NULL ,
	PARAMETER_VALUE VARCHAR(2500) ,
	IDENTIFYING CHAR(1) NOT NULL ,
	constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE TABLE BATCH_STEP_EXECUTION  (
	STEP_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
	VERSION BIGINT NOT NULL,
	STEP_NAME VARCHAR(100) NOT NULL,
	JOB_EXECUTION_ID BIGINT NOT NULL,
	CREATE_TIME TIMESTAMP NOT NULL,
	START_TIME TIMESTAMP DEFAULT NULL ,
	END_TIME TIMESTAMP DEFAULT NULL ,
	STATUS VARCHAR(10) ,
	COMMIT_COUNT BIGINT ,
	READ_COUNT BIGINT ,
	FILTER_COUNT BIGINT ,
	WRITE_COUNT BIGINT ,
	READ_SKIP_COUNT BIGINT ,
	WRITE_SKIP_COUNT BIGINT ,
	PROCESS_SKIP_COUNT BIGINT ,
	ROLLBACK_COUNT BIGINT ,
	EXIT_CODE VARCHAR(2500) ,
	EXIT_MESSAGE VARCHAR(2500) ,
	LAST_UPDATED TIMESTAMP,
	constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (
	STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
	SERIALIZED_CONTEXT TEXT ,
	constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
	references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;

CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
	JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
	SERIALIZED_CONTEXT TEXT ,
	constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_SEQ MAXVALUE 9223372036854775807 NO CYCLE;

Spring Batch 5.1.1 버전에서 제공하는 postgresql용 schema.sql 전문이다.
SCDF 테이블 기준으로 "boot3"로 시작하는 테이블 구조를 따르고 있는 것을 볼 수 있다.
하지만 테이블의 이름은 "boot3
"로 시작하지 않는다.

SCDF 테이블들의 기본 테이블("boot3_"로 시작하지 않는 테이블)은 boot 2 버전 테이블 구조를 따르고 있어 BATCH_JOB_EXECUTION_PARAMS 테이블을 참조할 때에 에러가 발생한다.

즉, SCDF Dataflow 서버에서 사용하는 메타데이터 테이블을 사용하기 위해선 "boot3_"로 시작하는 테이블을 사용해야하도록 설정해주어야한다.

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({BatchRegistrar.class, ScopeConfiguration.class, AutomaticJobRegistrarBeanPostProcessor.class, BatchObservabilityBeanPostProcessor.class})
public @interface EnableBatchProcessing {
    boolean modular() default false;

    String dataSourceRef() default "dataSource";

    String databaseType() default "";

    String transactionManagerRef() default "transactionManager";

    String executionContextSerializerRef() default "executionContextSerializer";

    String charset() default "UTF-8";

    String tablePrefix() default "BATCH_";

    int maxVarCharLength() default 2500;

    String incrementerFactoryRef() default "incrementerFactory";

    String jobKeyGeneratorRef() default "jobKeyGenerator";

    String lobHandlerRef() default "lobHandler";

    int clobType() default 2005;

    String isolationLevelForCreate() default "ISOLATION_SERIALIZABLE";

    String taskExecutorRef() default "taskExecutor";

    String conversionServiceRef() default "conversionService";
}

EnableBatchProcessing 인터페이스를 보면 tablePrefix() 라는게 존재한다.

spring:
  batch:
    jdbc:
      table-prefix: BOOT3_BATCH_

그래서 이런 설정을 추가 해주면 table 생성 시 "BATCH"가 아닌 "BOOT3_BATCH"로 시작하는 테이블을 만들고 이를 활용하게 된다.

DataSource 2개 연결하기

이제 Batch 프로그램 작성 및 테스트에는 성공했다.
Task 설정을 시작하기에 앞서 본 Batch 프로그램에서는 두개의 DB를 참조한다.

@Configuration
public class DataSourceConfig {

    @Primary
    @Bean(name = "dataSource")
    @ConfigurationProperties(prefix = "spring.datasource-meta")
    public DataSource dataSource() {
        return DataSourceBuilder.create().build();
    }

    @Primary
    @Bean
    public PlatformTransactionManager transactionManager() {
        return new DataSourceTransactionManager(dataSource());
    }

}
@Configuration
@EnableJpaRepositories(
        basePackages = "com.jhome.user.repository",
        entityManagerFactoryRef = "jhomeEntityManager",
        transactionManagerRef = "jhomeTransactionManager"
)
public class JHomeDataSourceConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource-jhome")
    public DataSource jhomeDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    public LocalContainerEntityManagerFactoryBean jhomeEntityManager() {
        LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
        em.setDataSource(jhomeDataSource());
        em.setPackagesToScan("com.jhome.user.domain");
        em. setJpaVendorAdapter(new HibernateJpaVendorAdapter());

        HashMap<String, Object> properties = new HashMap<>();
        properties.put("hibernate.ddl-auto", "update");
        properties.put("hibernate.show-sql", "true");
        em.setJpaPropertyMap(properties);

        return em;
    }

    @Bean
    public PlatformTransactionManager jhomeTransactionManager() {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(jhomeEntityManager().getObject());
        return transactionManager;
    }

}
  1. Batch 메타데이터용 DB (SCDF Dataflow)
  2. 사용자 정보 등 서비스에서 사용하는 DB

두개의 Database를 연결해주기 위해선 위 코드와 같이 DataSource설정을 해줘야한다.

이렇게 하면 어플리케이션 실행 시 @Primary로 설정 된 DB에 메타 데이터 테이블을 생성 또는 참조하고 BatchConfig에선 @Qualifier("jhomeTransactionManager")로 사용할 DB를 명시해주어 정상적으로 Batch가 동작한다.

Spring Cloud Task

이 Batch 프로그램을 Spring Cloud Task를 활용하여 SCDF에서 배포하는 것이 목표이기 때문에 아래 의존성 추가 및 설정을 진행했다.

implementation 'org.springframework.cloud:spring-cloud-starter-task'

SCDF Task 메타데이터 테이블을 보면,

마찬가지로 "boot3_"로 시작하는 테이블과 일반 테이블로 나뉘지만 테이블 내 컬럼은 동일하다.
Task는 따로 설정 없이 잘 될 것만 같다.

@EnableTask
@Configuration
public class TaskConfig {

    private final DataSource dataSource;

    public TaskConfig(@Qualifier("dataSource") DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Bean
    public TaskConfigurer taskConfigurer() {
        return new DefaultTaskConfigurer(dataSource);
    }
}

@EnableTask로 Task를 활성하해주면 DefaultTaskConfigurer로 자동 설정이 가능하다.
하지만 해당 Batch 프로그램은 두개의 DB를 사용하기 때문에 DefaultTaskConfigurer에 들어갈 DataSource를 명시해줘야한다.

하여 위 TaskConfig를 작성해주고 @Qualifier를 통해 기본 DB를 명시해주었다.

이후 실행하면,

Caused by: java.lang.IllegalArgumentException: Invalid TaskExecution, ID 4 not found

이런 에러나

org.springframework.dao.DataIntegrityViolationException: PreparedStatementCallback; SQL [INSERT INTO BOOT3_TASK_TASK_BATCH (TASK_EXECUTION_ID, JOB_EXECUTION_ID) VALUES (?, ?)]; ERROR: insert or update on table "boot3_task_task_batch" violates foreign key constraint "boot3_exec_batch_fk"
  Detail: Key (task_execution_id)=(9) is not present in table "boot3_task_execution".

이런 에러가 발생한다.

위 Batch에서와 동일하게 task 메타데이터 테이블이 boot2와 boot3가 다르기 때문에 발생하는 문제이다.

spring:
  cloud:
    task:
      table-prefix: BOOT3_TASK_

동일하게 위 설정을 추가해주면 되는데 task의 경우 한가지 더 확인해주어야한다.

public class DefaultTaskConfigurer implements TaskConfigurer {
    private static final Log logger = LogFactory.getLog(DefaultTaskConfigurer.class);
    private TaskRepository taskRepository;
    private TaskExplorer taskExplorer;
    private PlatformTransactionManager transactionManager;
    private DataSource dataSource;
    private ApplicationContext context;

    public DefaultTaskConfigurer() {
        this("TASK_");
    }

    public DefaultTaskConfigurer(DataSource dataSource) {
        this(dataSource, "TASK_", (ApplicationContext)null);
    }

    public DefaultTaskConfigurer(String tablePrefix) {
        this((DataSource)null, tablePrefix, (ApplicationContext)null);
    }

    public DefaultTaskConfigurer(DataSource dataSource, String tablePrefix, ApplicationContext context) {
        this.dataSource = dataSource;
        this.context = context;
        TaskExecutionDaoFactoryBean taskExecutionDaoFactoryBean;
        if (this.dataSource != null) {
            taskExecutionDaoFactoryBean = new TaskExecutionDaoFactoryBean(this.dataSource, tablePrefix);
        } else {
            taskExecutionDaoFactoryBean = new TaskExecutionDaoFactoryBean();
        }

        this.taskRepository = new SimpleTaskRepository(taskExecutionDaoFactoryBean);
        this.taskExplorer = new SimpleTaskExplorer(taskExecutionDaoFactoryBean);
    }

    public TaskRepository getTaskRepository() {
        return this.taskRepository;
    }

    public TaskExplorer getTaskExplorer() {
        return this.taskExplorer;
    }

    public DataSource getTaskDataSource() {
        return this.dataSource;
    }

    public TaskNameResolver getTaskNameResolver() {
        return new SimpleTaskNameResolver();
    }

    public PlatformTransactionManager getTransactionManager() {
        if (this.transactionManager == null) {
            if (this.isDataSourceAvailable()) {
                try {
                    Class.forName("jakarta.persistence.EntityManager");
                    if (this.context != null && this.context.getBeanNamesForType(EntityManager.class).length > 0) {
                        logger.debug("EntityManager was found, using JpaTransactionManager");
                        this.transactionManager = new JpaTransactionManager();
                    }
                } catch (ClassNotFoundException var5) {
                    logger.debug("No EntityManager was found, using DataSourceTransactionManager");
                } finally {
                    if (this.transactionManager == null) {
                        this.transactionManager = new JdbcTransactionManager(this.dataSource);
                    }

                }
            } else {
                logger.debug("No DataSource was found, using ResourcelessTransactionManager");
                this.transactionManager = new ResourcelessTransactionManager();
            }
        }

        return this.transactionManager;
    }

    private boolean isDataSourceAvailable() {
        return this.dataSource != null;
    }
}

TaskConfig에 DefaultTaskConfigurer(DataSource)를 생성하였다.
그럼 tablePrefix 값에 "TASK_"가 자동으로 들어가게 된다.
그래서 DefaultTastConfigurer 부분을 아래와 같이 수정해줘야한다.


@Bean
public TaskConfigurer taskConfigurer() {
    return new DefaultTaskConfigurer(dataSource, "BOOT3_TASK_", null);
}

이제 실행하면 정상적으로 실행되는 것을 볼 수 있다.
SCDF Task 등록에 대한 내용은 아래 글에 작성해두었다.

https://velog.io/@given02/SCDF-Spring-Batch

마치며

Spring Batch에 대한 기초 지식 없이 시작하여 며칠이 걸려 간단한 Batch 프로그램을 만들고 배포해봤다.
SQL 관련 에러로 어려움을 겪었지만 하나씩 코드를 확인해보며 해결할 수 있었다.

이제 사용자 서비스 코드를 작성하며 테스트 코드를 작성하지 않았는데 테스트 코드 작성하며 코드 리팩토링을 한 차례 거친 뒤 다음 배포 환경 구축을 해보려 한다.

참조문서

profile
환영합니다!

0개의 댓글