
사용자 삭제 API 호출 시 상태값을 INACTIVE로 변경하는 soft-delete 기능을 구현하였다.
이후 해당 상태의 사용자를 조회하여 사용자의 이용기록을 삭제해준 뒤 최종적으로 사용자 정보를 완전히 삭제 하는 Batch 프로그램을 만들어 보자.
그리고 이를 SCDF Task로 등록하여 스케줄링 및 모니터링이 가능하도록 설정하려고 한다.
Batch의 경우 다양한 구현 방법이 있는 데 해당 내용은 아래 링크에서 잘 설명해주고 있다.
https://khj93.tistory.com/entry/Spring-Batch%EB%9E%80-%EC%9D%B4%ED%95%B4%ED%95%98%EA%B3%A0-%EC%82%AC%EC%9A%A9%ED%95%98%EA%B8%B0
@Configuration
public class BatchConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
public BatchConfig(JobRepository jobRepository,
@Qualifier("jhomeTransactionManager") PlatformTransactionManager transactionManager
) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
}
@Bean
public Job userCleanUpJob(Step userCleanupStep) {
return new JobBuilder("userCleanUpJob", jobRepository)
.start(userCleanupStep)
.build();
}
@Bean
public Step userCleanupStep(ItemReader<UserEntity> userReader,
ItemProcessor<UserEntity,UserEntity> userProcessor,
ItemWriter<UserEntity> userWriter) {
return new StepBuilder("userCleanupStep", jobRepository)
.<UserEntity, UserEntity>chunk(2, transactionManager)
.reader(userReader)
.processor(userProcessor)
.writer(userWriter)
.build();
}
@Bean
public RepositoryItemReader<UserEntity> userReader(UserRepository userRepository) {
return new RepositoryItemReaderBuilder<UserEntity>()
.name("userReader")
.repository(userRepository)
.methodName("findByStatus")
.pageSize(10)
.arguments(UserStatus.INACTIVE)
.sorts(Map.of("id", Sort.Direction.ASC))
.build();
}
@Bean
public ItemProcessor<UserEntity, UserEntity> userProcessor() {
return item -> item;
}
@Bean
public RepositoryItemWriter<UserEntity> userWriter(UserRepository userRepository) {
return new RepositoryItemWriterBuilder<UserEntity>()
.repository(userRepository)
.methodName("delete")
.build();
}
Spring Batch를 실행하면 연동 된 db에 메타데이터 테이블을 생성하도록 되어 있다.

그런데 spring batch-core 5.x 이후 버전과 이전 버전의 테이블 구조가 다르다.

자세히 보면 batch_job_execution 테이블과 batch_job_execution_params 테이블의 컬럼이 추가되거나 변경된 것을 볼 수 있다.
이를 SCDF에서는 Dataflow가 실행될 때 테이블을 생성하는데 기존 테이블 네이밍을 따르는("BATCH_"로 시작하는) 테이블 번들과 "boot3_"로 시작하는 5.x이상 버전용 테이블 번들 두가지로 생성하고 있다.
그리고 SCDF application 등록 시 boot 버전을 2 또는 3으로 선택하도록하여 각가의 테이블을 사용하도록 하고 있다.
필자는 SCDF를 활용한 Batch 및 Task 관리를 하고 싶었기에 SCDF Dataflow의 db를 사용하기로 했다.
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP NOT NULL,
START_TIME TIMESTAMP DEFAULT NULL ,
END_TIME TIMESTAMP DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP,
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
PARAMETER_NAME VARCHAR(100) NOT NULL ,
PARAMETER_TYPE VARCHAR(100) NOT NULL ,
PARAMETER_VALUE VARCHAR(2500) ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP NOT NULL,
START_TIME TIMESTAMP DEFAULT NULL ,
END_TIME TIMESTAMP DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP,
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
Spring Batch 5.1.1 버전에서 제공하는 postgresql용 schema.sql 전문이다.
SCDF 테이블 기준으로 "boot3"로 시작하는 테이블 구조를 따르고 있는 것을 볼 수 있다.
하지만 테이블의 이름은 "boot3"로 시작하지 않는다.
SCDF 테이블들의 기본 테이블("boot3_"로 시작하지 않는 테이블)은 boot 2 버전 테이블 구조를 따르고 있어 BATCH_JOB_EXECUTION_PARAMS 테이블을 참조할 때에 에러가 발생한다.
즉, SCDF Dataflow 서버에서 사용하는 메타데이터 테이블을 사용하기 위해선 "boot3_"로 시작하는 테이블을 사용해야하도록 설정해주어야한다.
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({BatchRegistrar.class, ScopeConfiguration.class, AutomaticJobRegistrarBeanPostProcessor.class, BatchObservabilityBeanPostProcessor.class})
public @interface EnableBatchProcessing {
boolean modular() default false;
String dataSourceRef() default "dataSource";
String databaseType() default "";
String transactionManagerRef() default "transactionManager";
String executionContextSerializerRef() default "executionContextSerializer";
String charset() default "UTF-8";
String tablePrefix() default "BATCH_";
int maxVarCharLength() default 2500;
String incrementerFactoryRef() default "incrementerFactory";
String jobKeyGeneratorRef() default "jobKeyGenerator";
String lobHandlerRef() default "lobHandler";
int clobType() default 2005;
String isolationLevelForCreate() default "ISOLATION_SERIALIZABLE";
String taskExecutorRef() default "taskExecutor";
String conversionServiceRef() default "conversionService";
}
EnableBatchProcessing 인터페이스를 보면 tablePrefix() 라는게 존재한다.
spring:
batch:
jdbc:
table-prefix: BOOT3_BATCH_
그래서 이런 설정을 추가 해주면 table 생성 시 "BATCH"가 아닌 "BOOT3_BATCH"로 시작하는 테이블을 만들고 이를 활용하게 된다.
이제 Batch 프로그램 작성 및 테스트에는 성공했다.
Task 설정을 시작하기에 앞서 본 Batch 프로그램에서는 두개의 DB를 참조한다.
@Configuration
public class DataSourceConfig {
@Primary
@Bean(name = "dataSource")
@ConfigurationProperties(prefix = "spring.datasource-meta")
public DataSource dataSource() {
return DataSourceBuilder.create().build();
}
@Primary
@Bean
public PlatformTransactionManager transactionManager() {
return new DataSourceTransactionManager(dataSource());
}
}
@Configuration
@EnableJpaRepositories(
basePackages = "com.jhome.user.repository",
entityManagerFactoryRef = "jhomeEntityManager",
transactionManagerRef = "jhomeTransactionManager"
)
public class JHomeDataSourceConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource-jhome")
public DataSource jhomeDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
public LocalContainerEntityManagerFactoryBean jhomeEntityManager() {
LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
em.setDataSource(jhomeDataSource());
em.setPackagesToScan("com.jhome.user.domain");
em. setJpaVendorAdapter(new HibernateJpaVendorAdapter());
HashMap<String, Object> properties = new HashMap<>();
properties.put("hibernate.ddl-auto", "update");
properties.put("hibernate.show-sql", "true");
em.setJpaPropertyMap(properties);
return em;
}
@Bean
public PlatformTransactionManager jhomeTransactionManager() {
JpaTransactionManager transactionManager = new JpaTransactionManager();
transactionManager.setEntityManagerFactory(jhomeEntityManager().getObject());
return transactionManager;
}
}
두개의 Database를 연결해주기 위해선 위 코드와 같이 DataSource설정을 해줘야한다.
이렇게 하면 어플리케이션 실행 시 @Primary로 설정 된 DB에 메타 데이터 테이블을 생성 또는 참조하고 BatchConfig에선 @Qualifier("jhomeTransactionManager")로 사용할 DB를 명시해주어 정상적으로 Batch가 동작한다.
이 Batch 프로그램을 Spring Cloud Task를 활용하여 SCDF에서 배포하는 것이 목표이기 때문에 아래 의존성 추가 및 설정을 진행했다.
implementation 'org.springframework.cloud:spring-cloud-starter-task'
SCDF Task 메타데이터 테이블을 보면,


마찬가지로 "boot3_"로 시작하는 테이블과 일반 테이블로 나뉘지만 테이블 내 컬럼은 동일하다.
Task는 따로 설정 없이 잘 될 것만 같다.
@EnableTask
@Configuration
public class TaskConfig {
private final DataSource dataSource;
public TaskConfig(@Qualifier("dataSource") DataSource dataSource) {
this.dataSource = dataSource;
}
@Bean
public TaskConfigurer taskConfigurer() {
return new DefaultTaskConfigurer(dataSource);
}
}
@EnableTask로 Task를 활성하해주면 DefaultTaskConfigurer로 자동 설정이 가능하다.
하지만 해당 Batch 프로그램은 두개의 DB를 사용하기 때문에 DefaultTaskConfigurer에 들어갈 DataSource를 명시해줘야한다.
하여 위 TaskConfig를 작성해주고 @Qualifier를 통해 기본 DB를 명시해주었다.
이후 실행하면,
Caused by: java.lang.IllegalArgumentException: Invalid TaskExecution, ID 4 not found
이런 에러나
org.springframework.dao.DataIntegrityViolationException: PreparedStatementCallback; SQL [INSERT INTO BOOT3_TASK_TASK_BATCH (TASK_EXECUTION_ID, JOB_EXECUTION_ID) VALUES (?, ?)]; ERROR: insert or update on table "boot3_task_task_batch" violates foreign key constraint "boot3_exec_batch_fk"
Detail: Key (task_execution_id)=(9) is not present in table "boot3_task_execution".
이런 에러가 발생한다.
위 Batch에서와 동일하게 task 메타데이터 테이블이 boot2와 boot3가 다르기 때문에 발생하는 문제이다.
spring:
cloud:
task:
table-prefix: BOOT3_TASK_
동일하게 위 설정을 추가해주면 되는데 task의 경우 한가지 더 확인해주어야한다.
public class DefaultTaskConfigurer implements TaskConfigurer {
private static final Log logger = LogFactory.getLog(DefaultTaskConfigurer.class);
private TaskRepository taskRepository;
private TaskExplorer taskExplorer;
private PlatformTransactionManager transactionManager;
private DataSource dataSource;
private ApplicationContext context;
public DefaultTaskConfigurer() {
this("TASK_");
}
public DefaultTaskConfigurer(DataSource dataSource) {
this(dataSource, "TASK_", (ApplicationContext)null);
}
public DefaultTaskConfigurer(String tablePrefix) {
this((DataSource)null, tablePrefix, (ApplicationContext)null);
}
public DefaultTaskConfigurer(DataSource dataSource, String tablePrefix, ApplicationContext context) {
this.dataSource = dataSource;
this.context = context;
TaskExecutionDaoFactoryBean taskExecutionDaoFactoryBean;
if (this.dataSource != null) {
taskExecutionDaoFactoryBean = new TaskExecutionDaoFactoryBean(this.dataSource, tablePrefix);
} else {
taskExecutionDaoFactoryBean = new TaskExecutionDaoFactoryBean();
}
this.taskRepository = new SimpleTaskRepository(taskExecutionDaoFactoryBean);
this.taskExplorer = new SimpleTaskExplorer(taskExecutionDaoFactoryBean);
}
public TaskRepository getTaskRepository() {
return this.taskRepository;
}
public TaskExplorer getTaskExplorer() {
return this.taskExplorer;
}
public DataSource getTaskDataSource() {
return this.dataSource;
}
public TaskNameResolver getTaskNameResolver() {
return new SimpleTaskNameResolver();
}
public PlatformTransactionManager getTransactionManager() {
if (this.transactionManager == null) {
if (this.isDataSourceAvailable()) {
try {
Class.forName("jakarta.persistence.EntityManager");
if (this.context != null && this.context.getBeanNamesForType(EntityManager.class).length > 0) {
logger.debug("EntityManager was found, using JpaTransactionManager");
this.transactionManager = new JpaTransactionManager();
}
} catch (ClassNotFoundException var5) {
logger.debug("No EntityManager was found, using DataSourceTransactionManager");
} finally {
if (this.transactionManager == null) {
this.transactionManager = new JdbcTransactionManager(this.dataSource);
}
}
} else {
logger.debug("No DataSource was found, using ResourcelessTransactionManager");
this.transactionManager = new ResourcelessTransactionManager();
}
}
return this.transactionManager;
}
private boolean isDataSourceAvailable() {
return this.dataSource != null;
}
}
TaskConfig에 DefaultTaskConfigurer(DataSource)를 생성하였다.
그럼 tablePrefix 값에 "TASK_"가 자동으로 들어가게 된다.
그래서 DefaultTastConfigurer 부분을 아래와 같이 수정해줘야한다.
@Bean
public TaskConfigurer taskConfigurer() {
return new DefaultTaskConfigurer(dataSource, "BOOT3_TASK_", null);
}
이제 실행하면 정상적으로 실행되는 것을 볼 수 있다.
SCDF Task 등록에 대한 내용은 아래 글에 작성해두었다.
https://velog.io/@given02/SCDF-Spring-Batch
Spring Batch에 대한 기초 지식 없이 시작하여 며칠이 걸려 간단한 Batch 프로그램을 만들고 배포해봤다.
SQL 관련 에러로 어려움을 겪었지만 하나씩 코드를 확인해보며 해결할 수 있었다.
이제 사용자 서비스 코드를 작성하며 테스트 코드를 작성하지 않았는데 테스트 코드 작성하며 코드 리팩토링을 한 차례 거친 뒤 다음 배포 환경 구축을 해보려 한다.