Applying Spring Batch

tinyeye · February 6, 2023

This project was my first time using Spring Batch, so some of this may be wrong or unpolished!
If you spot a mistake, let me know and I will fix it! :)

1. Why I used it

I adopted Spring Batch because I wanted to send notifications in bulk at a fixed interval or at a set time every day, and to delete data that matches certain conditions.

2. Errors I ran into and how I fixed them

The seq tables among the tables Spring Batch creates (the metadata tables) were not created properly, which caused an error

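  1. Press Shift twice (on Windows) to open the search box and search for "schema-".
  2. Open the SQL file that matches the database you use (I use MariaDB, so I chose the mysql one).
  3. Creating the tables by hand, one statement at a time, from that SQL file made the job run.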

CREATE TABLE BATCH_JOB_INSTANCE  (
	JOB_INSTANCE_ID BIGINT  NOT NULL PRIMARY KEY ,
	VERSION BIGINT ,
	JOB_NAME VARCHAR(100) NOT NULL,
	JOB_KEY VARCHAR(32) NOT NULL,
	constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;

CREATE TABLE BATCH_JOB_EXECUTION  (
	JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
	VERSION BIGINT  ,
	JOB_INSTANCE_ID BIGINT NOT NULL,
	CREATE_TIME DATETIME(6) NOT NULL,
	START_TIME DATETIME(6) DEFAULT NULL ,
	END_TIME DATETIME(6) DEFAULT NULL ,
	STATUS VARCHAR(10) ,
	EXIT_CODE VARCHAR(2500) ,
	EXIT_MESSAGE VARCHAR(2500) ,
	LAST_UPDATED DATETIME(6),
	JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
	constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
	references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
	JOB_EXECUTION_ID BIGINT NOT NULL ,
	TYPE_CD VARCHAR(6) NOT NULL ,
	KEY_NAME VARCHAR(100) NOT NULL ,
	STRING_VAL VARCHAR(250) ,
	DATE_VAL DATETIME(6) DEFAULT NULL ,
	LONG_VAL BIGINT ,
	DOUBLE_VAL DOUBLE PRECISION ,
	IDENTIFYING CHAR(1) NOT NULL ,
	constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_STEP_EXECUTION  (
	STEP_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
	VERSION BIGINT NOT NULL,
	STEP_NAME VARCHAR(100) NOT NULL,
	JOB_EXECUTION_ID BIGINT NOT NULL,
	START_TIME DATETIME(6) NOT NULL ,
	END_TIME DATETIME(6) DEFAULT NULL ,
	STATUS VARCHAR(10) ,
	COMMIT_COUNT BIGINT ,
	READ_COUNT BIGINT ,
	FILTER_COUNT BIGINT ,
	WRITE_COUNT BIGINT ,
	READ_SKIP_COUNT BIGINT ,
	WRITE_SKIP_COUNT BIGINT ,
	PROCESS_SKIP_COUNT BIGINT ,
	ROLLBACK_COUNT BIGINT ,
	EXIT_CODE VARCHAR(2500) ,
	EXIT_MESSAGE VARCHAR(2500) ,
	LAST_UPDATED DATETIME(6),
	constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (
	STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
	SERIALIZED_CONTEXT TEXT ,
	constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
	references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
	JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
	SERIALIZED_CONTEXT TEXT ,
	constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
	ID BIGINT NOT NULL,
	UNIQUE_KEY CHAR(1) NOT NULL,
	constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
	ID BIGINT NOT NULL,
	UNIQUE_KEY CHAR(1) NOT NULL,
	constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_SEQ (
	ID BIGINT NOT NULL,
	UNIQUE_KEY CHAR(1) NOT NULL,
	constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);

JobBuilderFactory and StepBuilderFactory showed up with a strikethrough in the code (deprecated)

  • Cause: using Spring Boot 3.0 or later raised batch-core to 5.0 or later, where these classes are deprecated

    Resolved by lowering the Spring Boot version to '2.7.8'


I used two Jobs, and while B-Job was running it picked up A-Job's entity, causing a casting error

  • Cause: duplicate @Bean names from copy and paste...
  1. The reader and writer methods of each Job were given identical names (for example, both readers were named reader)

A-Job's reader method

    @Bean
    @StepScope
    public JpaPagingItemReader<Todo> reader(@Value("#{jobParameters[requestDate]}") String requestDate) {
        *** omitted ***
    }

B-Job's reader method

    @Bean
    @StepScope
    public JpaPagingItemReader<Todo> reader(@Value("#{jobParameters[requestDate]}") String requestDate) {
        *** omitted ***
    }
  2. An error reported a duplicate bean, so I added 'spring.main.allow-bean-definition-overriding=true' to the properties
  3. The application then started, but B-Job tried to process A-Job's entity, which caused the casting error

Resolved by giving each @Bean-annotated reader and writer method a different (unique) name
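
To see why 'spring.main.allow-bean-definition-overriding=true' only hid the problem, here is a plain-Java sketch of a registry keyed by name. It is a simplified model and not Spring's actual container; `TinyRegistry` and the empty `Todo` and `Friend` classes are stand-ins made up for illustration, and which definition wins in a real application depends on registration order.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Simplified model of a bean registry keyed by name (not Spring's real container).
class TinyRegistry {
    // Stand-in entity types for the two jobs.
    static class Todo {}
    static class Friend {}

    private final Map<String, Supplier<List<?>>> definitions = new HashMap<>();
    private final boolean allowOverriding;

    TinyRegistry(boolean allowOverriding) {
        this.allowOverriding = allowOverriding;
    }

    // Registers a definition; a duplicate name fails unless overriding is allowed.
    void register(String name, Supplier<List<?>> definition) {
        if (definitions.containsKey(name) && !allowOverriding) {
            throw new IllegalStateException("Duplicate bean name: " + name);
        }
        definitions.put(name, definition); // the later definition silently replaces the earlier one
    }

    List<?> get(String name) {
        return definitions.get(name).get();
    }

    // Registers both readers under "reader", then lets the Friend job consume the result.
    static String runFriendJob(boolean allowOverriding) {
        TinyRegistry registry = new TinyRegistry(allowOverriding);
        try {
            registry.register("reader", () -> List.of(new Friend()));
            registry.register("reader", () -> List.of(new Todo()));
            for (Object item : registry.get("reader")) {
                Friend friend = (Friend) item; // fails when the Todo reader replaced the Friend reader
            }
            return "ok";
        } catch (IllegalStateException | ClassCastException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(runFriendJob(false)); // the duplicate name is rejected up front
        System.out.println(runFriendJob(true));  // the override is accepted, the cast fails later
    }
}
```

With overriding disabled the mistake surfaces at registration time; with it enabled the same mistake only shows up when an item is cast, which matches the order of events described above.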


JpaItemWriter does not support delete, so I implemented it

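Resolved by creating a class that extends JpaItemWriter and overriding a method

  1. Extended the class and changed merge() in doWrite to remove()
  2. Got an error because I was trying to delete an entity that <is not? || may or may not be?> in the persistence context (I am not certain about this)
  3. Checked with contains(): if the entity is already managed it is used as is, otherwise it is merged first, and then it is deleted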
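
The error in step 2 is consistent with how JPA specifies `EntityManager.remove`: it throws `IllegalArgumentException` when it is given a detached instance, which is why merging first helps. The sketch below models that rule in plain Java. `TinyContext` is a hypothetical stand-in for a persistence context, not a JPA class, and its `merge` is simplified to track the same object rather than a copy.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical, minimal model of a persistence context (not a JPA class).
class TinyContext {
    // Managed instances are tracked by identity.
    private final Set<Object> managed = Collections.newSetFromMap(new IdentityHashMap<>());

    boolean contains(Object entity) {
        return managed.contains(entity);
    }

    // Starts tracking the instance and returns the managed instance.
    // Real JPA copies state into a separate managed instance; this model reuses the object.
    <T> T merge(T entity) {
        managed.add(entity);
        return entity;
    }

    // Mirrors the JPA rule: removing a detached instance is an error.
    void remove(Object entity) {
        if (!managed.contains(entity)) {
            throw new IllegalArgumentException("Removing a detached instance");
        }
        managed.remove(entity);
    }

    // The pattern from step 3: use the item if it is managed, otherwise merge it first.
    <T> void safeRemove(T item) {
        remove(contains(item) ? item : merge(item));
    }

    public static void main(String[] args) {
        TinyContext context = new TinyContext();
        Object detached = new Object();
        try {
            context.remove(detached);
        } catch (IllegalArgumentException e) {
            System.out.println("direct remove failed: " + e.getMessage());
        }
        context.safeRemove(detached); // merge first, then remove
        System.out.println("still managed: " + context.contains(detached));
    }
}
```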

3. Code

  • TomorrowBatchConfig
@Configuration
@RequiredArgsConstructor
@Log4j2
@EnableBatchProcessing
public class TomorrowBatchConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    private final NoticeService noticeService;
    private final TodoService todoService;
    private final EntityManagerFactory entityManagerFactory;

    @Bean
    public Job notificationJob() {
        return jobBuilderFactory.get("tomorrowNotificationJob")
                .start(sendWeekNotification())
                .build();
    }

    @Bean
    @JobScope
    public Step sendWeekNotification() {
        return stepBuilderFactory.get("tomorrowNotificationStep1")
                .<Todo, Notice>chunk(1000)
                .reader(todoReader(null))
                .processor(todoItemProcessor(null))
                .writer(noticeItemWriter(null))
                .build();
    }

    @Bean
    @StepScope
    public JpaPagingItemReader<Todo> todoReader(@Value("#{jobParameters[requestDate]}") String requestDate) {
        String tomorrowDate = todoService.dateToString(LocalDate.now().plusDays(1));

        return new JpaPagingItemReaderBuilder<Todo>()
                .name("jpaPagingItemReader")
                .entityManagerFactory(entityManagerFactory) // access goes through EntityManagerFactory, not DataSource
                .pageSize(1000)
                .queryString("SELECT t FROM Todo t WHERE finish_date = " + tomorrowDate + " ORDER BY todo_id ASC")  // an ORDER BY clause is required!
                .build();
    }

    @Bean
    @StepScope
    public ItemProcessor<Todo, Notice> todoItemProcessor(@Value("#{jobParameters[requestDate]}") String requestDate) {
        return new ItemProcessor<Todo, Notice>() {
            @Override
            public Notice process(Todo item) throws Exception {
                Notice notice = noticeService.makeFinishDateTomorrowNotice(item.getMember().getMemberId(), item.getTodo());
                return notice;
            }
        };
    }

    @Bean
    @StepScope
    public JpaItemWriter<Notice> noticeItemWriter(@Value("#{jobParameters[requestDate]}") String requestDate) {
        return new JpaItemWriterBuilder<Notice>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }
}
  • FollowerDeleteBatchConfig
@Configuration
@RequiredArgsConstructor
@Log4j2
@EnableBatchProcessing
public class FollowerDeleteBatchConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final FriendRepository friendRepository;
    private final TodoService todoService;
    private final EntityManagerFactory entityManagerFactory;

    @Bean
    public Job followerDeleteJob() {
        return jobBuilderFactory.get("FollowerDeleteJob")
                .start(followerDeleteStep())
                .build();
    }

    @Bean
    @JobScope
    public Step followerDeleteStep() {
        return stepBuilderFactory.get("FollowerDeleteStep1")
                .<Friend, Friend>chunk(1000)
                .reader(friendReader(null))
                .writer(FriendItemWriter(null))
                .build();
    }

    @Bean
    @StepScope
    public JpaPagingItemReader<Friend> friendReader(@Value("#{jobParameters[requestDate]}") String requestDate) {
        String oneDaysAgo = todoService.dateToString(LocalDate.now().minusDays(1));

        return new JpaPagingItemReaderBuilder<Friend>()
                .name("jpaPagingItemReaderForFD")
                .entityManagerFactory(entityManagerFactory) // access goes through EntityManagerFactory, not DataSource
                .pageSize(1000)
                .queryString("SELECT f FROM Friend f WHERE follow_status = 'Reject' and mod_date < " + oneDaysAgo + " ORDER BY friend_id ASC")  // an ORDER BY clause is required!
                .build();
    }


    @Bean
    @StepScope
    public JpaItemWriter<Friend> FriendItemWriter(@Value("#{jobParameters[requestDate]}") String requestDate) {
        JpaItemWriterCustom<Friend> writer = new JpaItemWriterCustom<>();
        writer.setEntityManagerFactory(entityManagerFactory);
        writer.setUsePersist(false);

        return writer;
    }

}
  • JpaItemWriterCustom
import java.util.List;
import javax.persistence.EntityManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.database.JpaItemWriter;

// A JpaItemWriter variant that deletes each item instead of merging or persisting it.
public class JpaItemWriterCustom<T> extends JpaItemWriter<T> {

    private static final Log log = LogFactory.getLog(JpaItemWriterCustom.class);

    @Override
    protected void doWrite(EntityManager entityManager, List<? extends T> items) {

        if (log.isDebugEnabled()) {
            log.debug("Deleting " + items.size() + " items through JPA.");
        }

        for (T item : items) {
            // remove() only accepts managed entities, so a detached item
            // is merged into the persistence context first.
            T managed = entityManager.contains(item) ? item : entityManager.merge(item);
            entityManager.remove(managed);
        }

    }
}
  • NotificationScheduler
@Log4j2
@EnableScheduling
@RequiredArgsConstructor
@Component
public class NotificationScheduler {

    private final TomorrowBatchConfig tomorrowBatchConfig;
    private final FollowerDeleteBatchConfig followerDeleteBatchConfig;
    private final JobLauncher jobLauncher;


    @Scheduled(cron = "0 0/1 * * * *")
    public void tomorrowNotificationSchedule() {
        try {
            Map <String, JobParameter> jobParametersMap = new HashMap <> ();

            SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date time = new Date();
            String time1 = format1.format(time);

            jobParametersMap.put("requestDate", new JobParameter(time1));

            JobParameters parameters = new JobParameters(jobParametersMap);

            JobExecution jobExecution = jobLauncher.run(tomorrowBatchConfig.notificationJob(), parameters);

        } catch (JobExecutionAlreadyRunningException e) {
            e.printStackTrace();
        } catch (JobRestartException e) {
            e.printStackTrace();
        } catch (JobInstanceAlreadyCompleteException e) {
            e.printStackTrace();
        } catch (JobParametersInvalidException e) {
            e.printStackTrace();
        }
    }

    // second minute hour day-of-month month day-of-week
    @Scheduled(cron = "30 */1 * * * *")
    public void followerDeleteSchedule() {
        try {
            Map <String, JobParameter> jobParametersMap = new HashMap <> ();

            SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date time = new Date();
            String time1 = format1.format(time);

            jobParametersMap.put("requestDate", new JobParameter(time1));

            JobParameters parameters = new JobParameters(jobParametersMap);

            JobExecution jobExecution = jobLauncher.run(followerDeleteBatchConfig.followerDeleteJob(), parameters);


        } catch (JobExecutionAlreadyRunningException e) {
            e.printStackTrace();
        } catch (JobRestartException e) {
            e.printStackTrace();
        } catch (JobInstanceAlreadyCompleteException e) {
            e.printStackTrace();
        } catch (JobParametersInvalidException e) {
            e.printStackTrace();
        }
    }
}
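
Both scheduled methods pass the current time as `requestDate`. Spring Batch treats a job name plus the same identifying parameters as the same JobInstance, so a value that changes on every launch lets the job run again each minute. A minimal sketch of building that value with `java.time` instead of `SimpleDateFormat` follows; `RequestDateSketch` and its method are illustrative names, not part of the project.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Illustrative helper (not part of the project) for building the requestDate value.
class RequestDateSketch {
    // Same pattern the scheduler uses; DateTimeFormatter is immutable and thread-safe.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static String requestDate(LocalDateTime now) {
        return now.format(FORMAT);
    }

    public static void main(String[] args) {
        // Two launches one minute apart produce different parameter values.
        LocalDateTime first = LocalDateTime.of(2023, 2, 6, 9, 0, 0);
        System.out.println(requestDate(first));
        System.out.println(requestDate(first.plusMinutes(1)));
    }
}
```

In the scheduler, the returned string could be passed to `new JobParameter(...)` exactly where `time1` is used now.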
  • application.properties
# spring batch
spring.batch.jdbc.initialize-schema = always
spring.batch.job.enabled = false