해커톤 프로젝트 - Spring batch 관련 : SenuriKakaoNotificationJobConfig

Chooooo·2023년 9월 1일
0

TIL

목록 보기
9/28
post-thumbnail

해당 배치는 노인 공고가 업데이트 될 시에 알림 카카오톡(나에게 메세지)를 통해 카톡으로 공고를 알려주는 배치이다.

😎 SenuriKakaoNotificationJobConfig 전체 코드

@Slf4j
//@Configuration
@RequiredArgsConstructor
public class SenuriKakaoNotificationJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager platformTransactionManager;

    private final KakaoMessageService kakaoMessageService;

    private final SenuriServiceDetailRepository senuriRepository;
    private final KakaoMemberFavRepository kakaoFavRepository;

    @Bean
    public Job senuriKakaoNotificationJob(Step fetchAllKakaoMembers, Step sendKakaoNotification) {
        log.info("[SenuriKakaoNotificationJob] Job Launched");

        return new JobBuilder("senuriKakaoNotificationJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(fetchAllKakaoMembers).next(sendKakaoNotification)
                .build();
    }

    @Bean
    @JobScope
    public Step fetchAllKakaoMembers(Tasklet fetchAllKakaoMembersTasklet) {
        return new StepBuilder("fetchAllKakaoMembers", jobRepository)
                .allowStartIfComplete(true)
                .tasklet(fetchAllKakaoMembersTasklet, platformTransactionManager)
                .build();
    }

    @Bean
    @JobScope
    public Step sendKakaoNotification(ItemReader<KakaoMemberAndSenuri> kakaoMemberFavItemReader,
                                      ItemProcessor<KakaoMemberAndSenuri, List<KakaoMessageFeedAndToken>> senuriKakaoNotificationProcessor,
                                      ItemWriter<List<KakaoMessageFeedAndToken>> senuriKakaoNotificationWriter) {
        log.info("[SenuriKakaoNotificationJob] Insert Info Step Launched");

        return new StepBuilder("fetchSenuriServiceDetailStep", jobRepository)
                .allowStartIfComplete(true)
                .<KakaoMemberAndSenuri, List<KakaoMessageFeedAndToken>>chunk(5, platformTransactionManager)
                .reader(kakaoMemberFavItemReader)
                .processor(senuriKakaoNotificationProcessor)
                .writer(senuriKakaoNotificationWriter)
                .build();
    }

    @Bean
    @StepScope
    public ItemReader<KakaoMemberAndSenuri> kakaoMemberFavItemReader(@Value("#{jobExecutionContext['members']}") List<Long> members) {
        return new ItemReader<KakaoMemberAndSenuri>() {

            private final int size = members.size();
            private int index = 0;
            private String today = LocalDate.now().minusDays(10L).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
            //10일 전 기준으로 이후 데이터 공고 메시지 보내지는지 확인.
            @Override
            public KakaoMemberAndSenuri read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                if (index >= size) {
                    return null;
                }

                List<KakaoMemberFav> favs = kakaoFavRepository.findAllByKakaoMemberUserId(members.get(index));
                index++;
                if (favs.isEmpty() || favs.size() == 0){
                    return KakaoMemberAndSenuri.of(null, null);
                }
                KakaoMember member = favs.get(0).getKakaoMember();
                List<SenuriServiceDetailCheck> senuriServiceDetailChecks = favs.stream()
                        .map(f -> senuriRepository.findCreateDayAfterTodayAndCityOrderByToAcptDdAsc(today, f.getAreaName()))
                        .flatMap(Collection::stream)
                        .collect(Collectors.toList());

                return KakaoMemberAndSenuri.of(member, senuriServiceDetailChecks);
            }

        };
    }

    @Bean
    @StepScope
    public ItemProcessor<KakaoMemberAndSenuri, List<KakaoMessageFeedAndToken>> senuriKakaoNotificationProcessor() {

        return item -> {
            if(item.kakaoMember == null){
                return new ArrayList<>();
            }

            String accessToken = item.getKakaoMember().getAccessToken();

            return item.getSenuriServiceDetailCheck().stream()
                    .map(s -> {
                        KakaoMessageFeedRequest request = new KakaoMessageFeedRequest();
                        request.setObject_type("feed");
                        request.updateContent("회원님의 관심 지역에 새로운 구직 정보가 등록됐어요!",
                                "",
                                "https://ifh.cc/g/CG3xKK.png",
                                640,
                                300,
                                "http://www.naver.com",
                                "www.naver.com",
                                "contentId=100",
                                "contentId=100");
                        request.updateItemContent("채용제목", s.getWantedTitle(),
                                "채용공고형태", s.getEmplymShpNm(),
                                "접수방법", s.getAcptMthdCd(),
                                "담장자 연락처", s.getClerkContt());

                        return new KakaoMessageFeedAndToken(accessToken, request);
                    }).collect(Collectors.toList());
        };
    }

    @Bean
    @StepScope
    public ItemWriter<List<KakaoMessageFeedAndToken>> senuriKakaoNotificationWriter() {
        log.info("[SenuriKakaoNotificationWriter] Writer");

        return items -> items.forEach(i ->
                i.stream().filter(ii -> ii.accessToken != null)
                                .forEach(ii -> kakaoMessageService.sendFeedMessage(ii.getAccessToken(), ii.getKakaoMessageFeedRequest())));
    }

    @Getter
    @Setter
    @ToString
    @NoArgsConstructor
    @AllArgsConstructor
    private static class KakaoMemberAndSenuri{

        private KakaoMember kakaoMember;
        private List<SenuriServiceDetailCheck> senuriServiceDetailCheck;

        public static KakaoMemberAndSenuri of(KakaoMember kakaoMember, List<SenuriServiceDetailCheck> senuriServiceDetailCheck) {
            return new KakaoMemberAndSenuri(kakaoMember, senuriServiceDetailCheck);
        }
    }

    @Getter
    @Setter
    @ToString
    @NoArgsConstructor
    @AllArgsConstructor
    private static class KakaoMessageFeedAndToken{

        private String accessToken;
        private KakaoMessageFeedRequest kakaoMessageFeedRequest;

    }


}

해당 배치는 카카오 메시지를 통해 공고 정보를 알리는 작업을 수행한다.

🖤 senuriKakaoNotificationJob 메서드: Job을 정의하는 메서드. JobBuilder를 사용하여 Job을 생성하고, 두 개의 Step(fetchAllKakaoMembers와 sendKakaoNotification)를 순차적으로 실행하도록 설정합니다. 또한 incrementer를 사용하여 Job 파라미터를 증가시키고, 실행이 가능한지 여부에 따라 다음 Step을 지정한다.

🖤 fetchAllKakaoMembers 메서드: 첫 번째 Step인 fetchAllKakaoMembers를 정의하는 메서드. tasklet을 사용하여 작업을 수행하며, platformTransactionManager를 통해 트랜잭션 관리를 합니다. allowStartIfComplete로 스텝이 이미 완료되었을 경우 다시 시작할 수 있도록 설정한다.

🖤 sendKakaoNotification 메서드: 두 번째 Step인 sendKakaoNotification을 정의하는 메서드입니다. 이 Step은 공고 정보를 카카오 메시지로 전송하는 역할을 한다. chunk 크기는 5로 설정되어 한 번에 5개의 아이템을 처리하며, ItemReader, ItemProcessor, ItemWriter를 설정하여 step을 진행. 이 역시 allowStartIfComplete로 스텝이 이미 완료되었을 경우 다시 시작할 수 있도록 설정한다.

🖤 kakaoMemberFavItemReader 메서드: 카카오 멤버와 관련된 정보를 읽어오는 ItemReader를 정의하는 메서드입니다. jobExecutionContext에서 members를 가져와서 사용합니다. 이 멤버들에 대한 정보를 읽어오고 처리하는 역할을 합니다.

🖤senuriKakaoNotificationProcessor 메서드: 읽어온 데이터를 처리하는 ItemProcessor를 정의하는 메서드. 카카오 멤버와 관련된 세누리 서비스 상세 확인 항목들을 이용하여 카카오 메시지를 생성하고, KakaoMessageFeedAndToken 객체로 묶어서 반환.

🖤 senuriKakaoNotificationWriter 메서드: 생성된 카카오 메시지를 전송하는 ItemWriter를 정의하는 메서드입니다. 카카오 메시지를 보내는 작업을 수행하고, kakaoMessageService를 통해 메시지를 전송한다.

이렇게 코드는 Spring Batch를 사용하여 공고 정보를 읽어오고, 카카오 메시지를 통해 멤버들에게 전송하는 작업을 구성하고 있다. 이 작업은 두 개의 스텝으로 구성되며, 첫 번째 스텝에서는 데이터를 읽어오고 두 번째 스텝에서는 읽어온 데이터를 처리하여 카카오 메시지를 전송한다.

  • 코드를 부분 별로 분석해보자.
    reader, processor, writer 를 통해 step의 구성 요건들을 보면서 위로 올라가자.

👻 ItemReader

@Bean
    @StepScope
    public ItemReader<KakaoMemberAndSenuri> kakaoMemberFavItemReader(@Value("#{jobExecutionContext['members']}") List<Long> members) {
        return new ItemReader<KakaoMemberAndSenuri>() {

            private final int size = members.size();
            private int index = 0;
            private String today = LocalDate.now().minusDays(10L).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
            //10일 전 기준으로 이후 데이터 공고 메시지 보내지는지 확인.
            @Override
            public KakaoMemberAndSenuri read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                if (index >= size) {
                    return null;
                }

                List<KakaoMemberFav> favs = kakaoFavRepository.findAllByKakaoMemberUserId(members.get(index));
                index++;
                if (favs.isEmpty() || favs.size() == 0){
                    return KakaoMemberAndSenuri.of(null, null);
                }
                KakaoMember member = favs.get(0).getKakaoMember();
                List<SenuriServiceDetailCheck> senuriServiceDetailChecks = favs.stream()
                        .map(f -> senuriRepository.findCreateDayAfterTodayAndCityOrderByToAcptDdAsc(today, f.getAreaName()))
                        .flatMap(Collection::stream)
                        .collect(Collectors.toList());

                return KakaoMemberAndSenuri.of(member, senuriServiceDetailChecks);
            }

        };
    }

🖤 public ItemReader<KakaoMemberAndSenuri> kakaoMemberFavItemReader(@Value("#{jobExecutionContext['members']}") List<Long> members): 이 메서드는 ItemReader를 생성하는 메서드이다. members라는 파라미터를 받아서 사용한다. 이 파라미터는 JobExecutionContext에서 가져온다.

🖤 size, index, today: 멤버 리스트의 크기, 현재 처리 중인 멤버의 인덱스, 그리고 오늘 날짜로 사용할 문자열을 선언

🖤 read() 메서드: ItemReader 인터페이스를 구현한 실제 읽기 작업을 수행하는 메서드.

  • index >= size인 경우, 더 이상 읽을 데이터가 없으므로 null을 반환하여 읽기를 종료합니다.
    kakaoFavRepository.findAllByKakaoMemberUserId(members.get(index))를 사용하여 특정
    멤버의 즐겨찾기 목록을 가져온다.

index를 증가시키고, 만약 favs가 비어있거나 크기가 0인 경우 KakaoMemberAndSenuri.of(null, null)를 반환합니다.
그렇지 않은 경우, 첫 번째 즐겨찾기 항목의 회원 정보를 가져옵니다. 그리고 해당 회원의 즐겨찾기 항목들을 이용하여 senuriRepository에서 특정 날짜 이후의 세누리 서비스 상세 확인 항목들을 가져와서 리스트로 만듭니다.
마지막으로 KakaoMemberAndSenuri.of(member, senuriServiceDetailChecks)를 사용하여 KakaoMemberAndSenuri 객체를 생성하고 반환합니다.

👻 ItemProcessor

@Bean
    @StepScope
    public ItemProcessor<KakaoMemberAndSenuri, List<KakaoMessageFeedAndToken>> senuriKakaoNotificationProcessor() {

        return item -> {
            if(item.kakaoMember == null){
                return new ArrayList<>();
            }

            String accessToken = item.getKakaoMember().getAccessToken();

            return item.getSenuriServiceDetailCheck().stream()
                    .map(s -> {
                        KakaoMessageFeedRequest request = new KakaoMessageFeedRequest();
                        request.setObject_type("feed");
                        request.updateContent("회원님의 관심 지역에 새로운 구직 정보가 등록됐어요!",
                                "",
                                "https://ifh.cc/g/CG3xKK.png",
                                640,
                                300,
                                "http://www.naver.com",
                                "www.naver.com",
                                "contentId=100",
                                "contentId=100");
                        request.updateItemContent("채용제목", s.getWantedTitle(),
                                "채용공고형태", s.getEmplymShpNm(),
                                "접수방법", s.getAcptMthdCd(),
                                "담장자 연락처", s.getClerkContt());

                        return new KakaoMessageFeedAndToken(accessToken, request);
                    }).collect(Collectors.toList());
        };
    }

🖤 processor에서 카카오 멤버와 관련된 정보를 카카오 메시지로 가공하는 빈을 정의.

🖤 ItemProcessor<KakaoMemberAndSenuri, List<KakaoMessageFeedAndToken>>: ItemProcessor 인터페이스를 구현한 익명 내부 클래스를 반환. 이 ItemProcessor는 KakaoMemberAndSenuri 객체를 입력으로 받아 카카오 메시지로 가공하는 역할을 한다.

👻 ItemWriter

@Bean
    @StepScope
    public ItemWriter<List<KakaoMessageFeedAndToken>> senuriKakaoNotificationWriter() {
        log.info("[SenuriKakaoNotificationWriter] Writer");

        return items -> items.forEach(i ->
                i.stream().filter(ii -> ii.accessToken != null)
                                .forEach(ii -> kakaoMessageService.sendFeedMessage(ii.getAccessToken(), ii.getKakaoMessageFeedRequest())));
    }

🖤 ItemWriter<List<KakaoMessageFeedAndToken>>: ItemWriter 인터페이스를 구현한 익명 내부 클래스를 반환합니다. 이 ItemWriter는 KakaoMessageFeedAndToken 객체 리스트를 받아서 Kakao 메시지를 전송하는 역할을 합니다.

🖤 items -> { ... }: 람다 표현식을 사용하여 ItemWriter의 write 메서드를 구현합니다. 입력으로 받은 items를 처리하고 Kakao 메시지를 전송합니다.

🖤 log.info("[SenuriKakaoNotificationWriter] Writer");: 로그 메시지를 출력하여 이 작업이 수행되고 있음을 알립니다.

🖤 items.forEach(i -> ... ): items 리스트를 순회하면서 아래 내용을 수행합니다.

🖤 i.stream().filter(ii -> ii.accessToken != null): i의 스트림을 생성하고, accessToken이 null이 아닌 경우만 필터링합니다.

🖤 .forEach(ii -> kakaoMessageService.sendFeedMessage(ii.getAccessToken(), ii.getKakaoMessageFeedRequest())): 각각의 KakaoMessageFeedAndToken 객체에 대해 kakaoMessageService를 사용하여 Kakao 메시지를 전송합니다. getAccessToken() 메서드로 액세스 토큰을 가져오고, getKakaoMessageFeedRequest() 메서드로 Kakao 메시지를 가져와서 전송합니다.

이렇게 코드는 입력으로 받은 KakaoMessageFeedAndToken 객체 리스트를 처리하여 Kakao 메시지를 전송합니다. ItemWriter는 Spring Batch의 마지막 단계로, 처리된 데이터를 최종 출력하는 역할을 수행합니다.

profile
back-end, 지속 성장 가능한 개발자를 향하여

0개의 댓글