이번 프로젝트를 하면서 Spring Batch를 처음 써봐서 틀리거나 미숙한 점이 있을 수 있습니다!
틀린 부분이 있다면 말씀해 주세요. 바로 고치겠습니다! :)
일정 시간마다 또는 매일 지정 시간에 한꺼번에 알림이 가게 하거나, 조건에 맞는 데이터를 삭제하고 싶어서 스프링 배치를 적용하게 되었습니다.
- IntelliJ에서(윈도우 기준) Shift 키를 두 번 누르면 나오는 검색창에서 "schema-"를 검색합니다.
- 자신이 사용하고 있는 DB에 맞는 SQL 파일을 찾아서 들어갑니다(저는 MariaDB를 사용 중이므로 mysql 선택).
- 해당 SQL문으로 테이블을 하나씩 직접 생성하니 실행이 되었습니다.
-- Job instance table: one row per JOB_INSTANCE_ID, carrying the job name and a
-- 32-character job key. Referenced by BATCH_JOB_EXECUTION.
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
-- A given (JOB_NAME, JOB_KEY) pair may appear only once.
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;
-- Job execution table: one row per JOB_EXECUTION_ID with its timestamps
-- (microsecond precision), status and exit code/message.
-- Must be created after BATCH_JOB_INSTANCE because of the foreign key below.
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME DATETIME(6) NOT NULL,
START_TIME DATETIME(6) DEFAULT NULL ,
END_TIME DATETIME(6) DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME(6),
JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
-- Every execution belongs to an existing job instance.
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;
-- Job parameter table: one row per parameter of a job execution.
-- The value is held in one of the typed columns (STRING_VAL, DATE_VAL,
-- LONG_VAL, DOUBLE_VAL); presumably TYPE_CD says which one is used - confirm.
-- Note: this table declares no primary key.
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
TYPE_CD VARCHAR(6) NOT NULL ,
KEY_NAME VARCHAR(100) NOT NULL ,
STRING_VAL VARCHAR(250) ,
DATE_VAL DATETIME(6) DEFAULT NULL ,
LONG_VAL BIGINT ,
DOUBLE_VAL DOUBLE PRECISION ,
IDENTIFYING CHAR(1) NOT NULL ,
-- Parameters always belong to an existing job execution.
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
-- Step execution table: one row per STEP_EXECUTION_ID with the step name,
-- timestamps, status, exit code/message and the per-step counters
-- (commit, read, filter, write, skip and rollback counts).
-- Must be created after BATCH_JOB_EXECUTION because of the foreign key below.
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
START_TIME DATETIME(6) NOT NULL ,
END_TIME DATETIME(6) DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME(6),
-- Every step execution belongs to an existing job execution.
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
-- Step execution context table: at most one row per step execution
-- (STEP_EXECUTION_ID is both the primary key and a foreign key).
-- SHORT_CONTEXT is mandatory; SERIALIZED_CONTEXT is an optional TEXT column.
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;
-- Job execution context table: at most one row per job execution
-- (JOB_EXECUTION_ID is both the primary key and a foreign key).
-- SHORT_CONTEXT is mandatory; SERIALIZED_CONTEXT is an optional TEXT column.
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
-- Table holding an ID value for step executions; presumably used as an
-- emulated sequence for STEP_EXECUTION_ID - confirm.
-- The unique constraint on the one-character UNIQUE_KEY column limits how many
-- distinct rows the table can hold.
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
-- Seed the table with the row (0, '0'), but only when the table is still empty,
-- so re-running this statement does not add a second row.
INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);
-- Table holding an ID value for job executions; presumably used as an
-- emulated sequence for JOB_EXECUTION_ID - confirm.
-- The unique constraint on the one-character UNIQUE_KEY column limits how many
-- distinct rows the table can hold.
CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
-- Seed the table with the row (0, '0'), but only when the table is still empty,
-- so re-running this statement does not add a second row.
INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);
-- Table holding an ID value for job instances; presumably used as an
-- emulated sequence for JOB_INSTANCE_ID - confirm.
-- The unique constraint on the one-character UNIQUE_KEY column limits how many
-- distinct rows the table can hold.
CREATE TABLE BATCH_JOB_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
-- Seed the table with the row (0, '0'), but only when the table is still empty,
-- so re-running this statement does not add a second row.
INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);
스프링부트 버전을 '2.7.8'로 내려서 해결
A-Job의 reader메소드
/**
 * Step-scoped reader bean of the first job (body omitted in this excerpt).
 *
 * <p>A {@code @Bean} method without an explicit name is registered under its
 * method name, so this bean is named {@code reader}. Another configuration
 * class declaring a {@code @Bean} method with the same name clashes with it.
 *
 * @param requestDate the {@code requestDate} job parameter, injected via SpEL
 */
@Bean
@StepScope
public JpaPagingItemReader<Todo> reader(@Value("#{jobParameters[requestDate]}") String requestDate) {
*** 중략 ***
}
B-Job의 reader메소드
/**
 * Step-scoped reader bean of the second job (body omitted in this excerpt).
 *
 * <p>It has the same method name, and therefore the same default bean name
 * {@code reader}, as the reader of the other job; the two definitions collide
 * unless one of the methods is renamed.
 *
 * @param requestDate the {@code requestDate} job parameter, injected via SpEL
 */
@Bean
@StepScope
public JpaPagingItemReader<Todo> reader(@Value("#{jobParameters[requestDate]}") String requestDate) {
*** 중략 ***
}
@Bean이 붙어 있는 reader, writer 메소드의 이름을 Job마다 각각 다르게(unique) 설정해서 해결
JpaItemWriter를 상속 받은 클래스를 하나 만들어서 메소드 오버라이드로 해결
/**
 * Spring Batch configuration of the job that turns every {@code Todo} whose
 * finish date is tomorrow into a {@code Notice}.
 *
 * <p>The job consists of a single chunk-oriented step: a JPA paging reader
 * selects the matching todos, a processor asks {@code NoticeService} to build
 * the notice, and a JPA writer stores the notices.
 */
@Configuration
@RequiredArgsConstructor
@Log4j2
@EnableBatchProcessing
public class TomorrowBatchConfig {

    /** Number of items per chunk; the reader uses the same value as its page size. */
    private static final int CHUNK_SIZE = 1000;

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final NoticeService noticeService;
    private final TodoService todoService;
    private final EntityManagerFactory entityManagerFactory;

    /**
     * Defines the job {@code tomorrowNotificationJob}, which runs one step.
     *
     * @return the job definition
     */
    @Bean
    public Job notificationJob() {
        return jobBuilderFactory.get("tomorrowNotificationJob")
                .start(sendWeekNotification())
                .build();
    }

    /**
     * Defines the chunk-oriented step that reads todos, converts them to
     * notices and writes the notices.
     *
     * <p>The reader, processor and writer are step-scoped beans whose argument
     * is injected from the job parameters, so {@code null} is passed here only
     * to satisfy the method signatures.
     *
     * @return the step definition
     */
    @Bean
    @JobScope
    public Step sendWeekNotification() {
        return stepBuilderFactory.get("tomorrowNotificationStep1")
                .<Todo, Notice>chunk(CHUNK_SIZE)
                .reader(todoReader(null))
                .processor(todoItemProcessor(null))
                .writer(noticeItemWriter(null))
                .build();
    }

    /**
     * Creates the reader that pages through all todos finishing tomorrow.
     *
     * <p>The date is bound as a named query parameter instead of being
     * concatenated into the query text: concatenating an unquoted date string
     * lets the database read it as an arithmetic expression, and binding also
     * keeps the query text constant.
     *
     * @param requestDate the {@code requestDate} job parameter; not used by the query
     * @return the configured paging reader
     */
    @Bean
    @StepScope
    public JpaPagingItemReader<Todo> todoReader(@Value("#{jobParameters[requestDate]}") String requestDate) {
        String tomorrowDate = todoService.dateToString(LocalDate.now().plusDays(1));
        return new JpaPagingItemReaderBuilder<Todo>()
                .name("jpaPagingItemReader")
                // Access goes through the EntityManagerFactory, not a DataSource.
                .entityManagerFactory(entityManagerFactory)
                .pageSize(CHUNK_SIZE)
                // An ORDER BY clause is required so that paging is deterministic.
                // NOTE(review): finish_date / todo_id look like column names rather than
                // entity attribute names - confirm against the Todo entity.
                .queryString("SELECT t FROM Todo t WHERE finish_date = :tomorrowDate ORDER BY todo_id ASC")
                .parameterValues(java.util.Collections.<String, Object>singletonMap("tomorrowDate", tomorrowDate))
                .build();
    }

    /**
     * Creates the processor that converts a todo into the notice to be stored.
     *
     * @param requestDate the {@code requestDate} job parameter; not used by the processor
     * @return a processor delegating to {@code NoticeService.makeFinishDateTomorrowNotice}
     */
    @Bean
    @StepScope
    public ItemProcessor<Todo, Notice> todoItemProcessor(@Value("#{jobParameters[requestDate]}") String requestDate) {
        return item -> noticeService.makeFinishDateTomorrowNotice(item.getMember().getMemberId(), item.getTodo());
    }

    /**
     * Creates the JPA writer that stores the generated notices.
     *
     * @param requestDate the {@code requestDate} job parameter; not used by the writer
     * @return the configured JPA writer
     */
    @Bean
    @StepScope
    public JpaItemWriter<Notice> noticeItemWriter(@Value("#{jobParameters[requestDate]}") String requestDate) {
        return new JpaItemWriterBuilder<Notice>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }
}
/**
 * Spring Batch configuration of the job that deletes {@code Friend} rows whose
 * follow status is {@code 'Reject'} and whose modification date is older than
 * yesterday's date.
 *
 * <p>The job consists of a single chunk-oriented step: a JPA paging reader
 * selects the rows and {@link JpaItemWriterCustom} removes them.
 */
@Configuration
@RequiredArgsConstructor
@Log4j2
@EnableBatchProcessing
public class FollowerDeleteBatchConfig {

    /** Number of items per chunk; the reader uses the same value as its page size. */
    private static final int CHUNK_SIZE = 1000;

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final FriendRepository friendRepository;
    private final TodoService todoService;
    private final EntityManagerFactory entityManagerFactory;

    /**
     * Defines the job {@code FollowerDeleteJob}, which runs one step.
     *
     * @return the job definition
     */
    @Bean
    public Job followerDeleteJob() {
        return jobBuilderFactory.get("FollowerDeleteJob")
                .start(followerDeleteStep())
                .build();
    }

    /**
     * Defines the chunk-oriented step that reads the rejected friend rows and
     * hands them to the deleting writer.
     *
     * <p>The reader and writer are step-scoped beans whose argument is injected
     * from the job parameters, so {@code null} is passed here only to satisfy
     * the method signatures.
     *
     * @return the step definition
     */
    @Bean
    @JobScope
    public Step followerDeleteStep() {
        return stepBuilderFactory.get("FollowerDeleteStep1")
                .<Friend, Friend>chunk(CHUNK_SIZE)
                .reader(friendReader(null))
                .writer(FriendItemWriter(null))
                .build();
    }

    /**
     * Creates the reader that pages through the rejected friend rows to delete.
     *
     * <p>The cut-off date is bound as a named query parameter instead of being
     * concatenated into the query text: concatenating an unquoted date string
     * lets the database read it as an arithmetic expression, and binding also
     * keeps the query text constant.
     *
     * @param requestDate the {@code requestDate} job parameter; not used by the query
     * @return the configured paging reader
     */
    @Bean
    @StepScope
    public JpaPagingItemReader<Friend> friendReader(@Value("#{jobParameters[requestDate]}") String requestDate) {
        String oneDayAgo = todoService.dateToString(LocalDate.now().minusDays(1));
        return new JpaPagingItemReaderBuilder<Friend>()
                .name("jpaPagingItemReaderForFD")
                // Access goes through the EntityManagerFactory, not a DataSource.
                .entityManagerFactory(entityManagerFactory)
                .pageSize(CHUNK_SIZE)
                // An ORDER BY clause is required so that paging is deterministic.
                // NOTE(review): the writer deletes the rows this query selects, so within a
                // single run later pages may skip rows; skipped rows would presumably be
                // picked up by a later run - confirm.
                .queryString("SELECT f FROM Friend f WHERE follow_status = 'Reject' and mod_date < :oneDayAgo ORDER BY friend_id ASC")
                .parameterValues(java.util.Collections.<String, Object>singletonMap("oneDayAgo", oneDayAgo))
                .build();
    }

    /**
     * Creates the writer that removes the friend rows handed to it.
     *
     * @param requestDate the {@code requestDate} job parameter; not used by the writer
     * @return a {@link JpaItemWriterCustom} bound to the entity manager factory
     */
    @Bean
    @StepScope
    public JpaItemWriter<Friend> FriendItemWriter(@Value("#{jobParameters[requestDate]}") String requestDate) {
        JpaItemWriterCustom<Friend> writer = new JpaItemWriterCustom<>();
        writer.setEntityManagerFactory(entityManagerFactory);
        writer.setUsePersist(false);
        return writer;
    }
}
/**
 * A {@link JpaItemWriter} variant that deletes the items it receives instead
 * of persisting or merging them.
 *
 * <p>The inherited {@code setUsePersist} flag has no effect on this writer:
 * every item is removed regardless of its value.
 *
 * @param <T> the entity type to delete
 */
public class JpaItemWriterCustom<T> extends JpaItemWriter<T> {

    /** Logger of this class; kept under its own name so it does not hide the inherited {@code logger}. */
    private static final Log LOG = LogFactory.getLog(JpaItemWriterCustom.class);

    /**
     * Removes every item of the chunk through the given entity manager.
     *
     * <p>An item that is not part of the persistence context is merged first,
     * because {@code EntityManager.remove} only accepts managed entities. Items
     * that are already managed are removed directly rather than being skipped.
     *
     * @param entityManager the entity manager to delete through
     * @param items         the entities of the current chunk
     */
    @Override
    protected void doWrite(EntityManager entityManager, List<? extends T> items) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Writing to JPA with " + items.size() + " items.");
        }
        if (items.isEmpty()) {
            return;
        }

        long removedCount = 0;
        for (T item : items) {
            T managed = entityManager.contains(item) ? item : entityManager.merge(item);
            entityManager.remove(managed);
            removedCount++;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug(removedCount + " entities removed.");
        }
    }
}
@Log4j2
@EnableScheduling
@RequiredArgsConstructor
@Component
public class NotificationScheduler {
private final TomorrowBatchConfig tomorrowBatchConfig;
private final FollowerDeleteBatchConfig followerDeleteBatchConfig;
private final JobLauncher jobLauncher;
@Scheduled(cron = "0 0/1 * * * *")
public void tomorrowNotificationSchedule() {
try {
Map <String, JobParameter> jobParametersMap = new HashMap <> ();
SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date time = new Date();
String time1 = format1.format(time);
jobParametersMap.put("requestDate", new JobParameter(time1));
JobParameters parameters = new JobParameters(jobParametersMap);
JobExecution jobExecution = jobLauncher.run(tomorrowBatchConfig.notificationJob(), parameters);
} catch (JobExecutionAlreadyRunningException e) {
e.printStackTrace();
} catch (JobRestartException e) {
e.printStackTrace();
} catch (JobInstanceAlreadyCompleteException e) {
e.printStackTrace();
} catch (JobParametersInvalidException e) {
e.printStackTrace();
}
}
// 초 분 시 날짜 월 요일
@Scheduled(cron = "30 */1 * * * *")
public void followerDeleteSchedule() {
try {
Map <String, JobParameter> jobParametersMap = new HashMap <> ();
SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date time = new Date();
String time1 = format1.format(time);
jobParametersMap.put("requestDate", new JobParameter(time1));
JobParameters parameters = new JobParameters(jobParametersMap);
JobExecution jobExecution = jobLauncher.run(followerDeleteBatchConfig.followerDeleteJob(), parameters);
} catch (JobExecutionAlreadyRunningException e) {
e.printStackTrace();
} catch (JobRestartException e) {
e.printStackTrace();
} catch (JobInstanceAlreadyCompleteException e) {
e.printStackTrace();
} catch (JobParametersInvalidException e) {
e.printStackTrace();
}
}
}
# spring batch
# Schema initialization mode for the Spring Batch metadata tables; "always"
# requests initialization on every startup (see the Spring Boot reference).
spring.batch.jdbc.initialize-schema = always
# Disable automatic job execution at application startup; presumably the jobs
# are meant to be launched only by the scheduler - confirm.
spring.batch.job.enabled = false