Spring Batch와 Telegram API를 활용하여 간단한 알람 시스템 만들기

KIYOUNG KWON·2022년 1월 24일
1

차후 진행할 토이 프로젝트에 Spring Batch의 기능을 활용하면 좋을 것 같아서 간단한 예제를 만들면서 Spring Batch에 대해 공부를 해보려고 한다. 해당 예제는 telegeram의 bot을 사용하여 특정시간 마다 알람을 전송하는 기능을 구현해 보려고 한다.

우선 텔레그램의 봇을 생성해보자.

그림과 같이 텔레그램의 BotFather에게 /newbot 명령어를 사용하여 새로운 봇을 생성할 수 있다. 이름은 대충 지어주자. 생성하고 나면 token을 확인할 수 있는데 이 token을 활용하여 bot으로 메시지를 전달하거나 bot에게 전달된 메시지를 확인하는 것이 가능하다.

우선 BaseUrl은 다음과 같다.

https://api.telegram.org/bot{token}/

사용할 method는 다음 2가지이다.

https://api.telegram.org/bot{token}/getUpdates
https://api.telegram.org/bot{token}/sendmessage

만약 token이 5112580382:AAGcb-nF-UkDf1rcfVyW9kPKkwINzyeWLsc이라고 하면 다음과 같이 request를 전달하면 된다.

GET https://api.telegram.org/bot5112580382:AAGcb-nF-UkDf1rcfVyW9kPKkwINzyeWLsc/getUpdates

더 많은 API들을 확인하고 싶다면 링크에서 확인

우선 bot에게 메시지를 하나날려본다. 그런뒤에 GET /getUpdates를 호출해보자. 그러면 아래와 같이 반환되는 것을 확인할 수 있다.

{
    "ok": true,
    "result": [
        {
            "update_id": 416670593,
            "message": {
                "message_id": 1,
                "from": {
                    "id": 52940587,
                    "is_bot": false,
                    "first_name": "Ki Young",
                    "last_name": "Kwon",
                    "language_code": "ko"
                },
                "chat": {
                    "id": 52940587,
                    "first_name": "Ki Young",
                    "last_name": "Kwon",
                    "type": "private"
                },
                "date": 1642948898,
                "text": "/start",
                "entities": [
                    {
                        "offset": 0,
                        "length": 6,
                        "type": "bot_command"
                    }
                ]
            }
        },
        {
            "update_id": 416670594,
            "message": {
                "message_id": 2,
                "from": {
                    "id": 52940587,
                    "is_bot": false,
                    "first_name": "Ki Young",
                    "last_name": "Kwon",
                    "language_code": "ko"
                },
                "chat": {
                    "id": 52940587,
                    "first_name": "Ki Young",
                    "last_name": "Kwon",
                    "type": "private"
                },
                "date": 1642949217,
                "text": "hi"
            }
        },
        {
            "update_id": 416670595,
            "message": {
                "message_id": 16,
                "from": {
                    "id": 52940587,
                    "is_bot": false,
                    "first_name": "Ki Young",
                    "last_name": "Kwon",
                    "language_code": "ko"
                },
                "chat": {
                    "id": 52940587,
                    "first_name": "Ki Young",
                    "last_name": "Kwon",
                    "type": "private"
                },
                "date": 1642964676,
                "text": "hello bot"
            }
        }
    ]
}

여기서 chat의 id를 확인해두자. 해당 id를 사용하여 bot이 나에게 메시지를 보낼 것 이다. 위를 기준으로 하면 chat의 id는 52940587이다. 그러면 다음과 같이 보내면 된다.

GET https://api.telegram.org/bot5112580382:AAGcb-nF-UkDf1rcfVyW9kPKkwINzyeWLsc/sendmessage?text=hello bot&chat_id=52940587

그럼 bot이 나에게 메시지를 전송한다. 그러면 이제 Spring Batch를 사용해서 특정시간에 나에게 메시지를 보내도록 해주자. 프로젝트에서 사용해볼 라이브러리 구성은 다음과 같다.

  • Spring Boot 2.5.8
  • Spring Boot Batch
  • Spring Boot Quarts
  • FeignClient 3.0.4

FeignClient는 텔레그램의 API를 사용할 때 활용할 것이다. 여기서 주의할 점은 Spring Boot의 버전과 FeignClient의 버전(Spring Cloud)을 맞추어 주지 않으면 동작하지 않는다. boot 2.5.8은 3.0.4와 호환된다. 다른버전을 사용하는 경우 확인이 필요할 것 이다.

gradle을 기준으로 프로젝트를 구성해보도록 하자. 다음 3개를 추가하면 된다.

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.boot:spring-boot-starter-quartz'
    implementation 'org.springframework.cloud:spring-cloud-starter-openfeign:3.0.4'
}

Spring Batch의 사용방법은 링크를 참조하여 작업하였다.

우선 위에서 설명한 bot이 메시지를 전달하는 API를 FeignClient를 사용하여 정의해야 한다.

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;

@FeignClient(name="telegram", url="https://api.telegram.org")
public interface TelegramAPIClient {

    @GetMapping(value = "/bot{token}/sendmessage")
    Object sendMessage(@PathVariable("token")String token, @RequestParam("text") String text, @RequestParam("chat_id") String chatId);
}

FeignClient의 사용방법은 우리가 Spring MVC 어플리케이션을 개발할 때와 동일하게 작성해주면 된다. 다만 인터페이스이므로 함수의 body는 존재하지 않는다.

token은 path variable, 전달할 메시지의 내용과 채팅방의 id는 url 패러미터로 전달해주면 된다. 사용할 텔레그램 API의 정의가 완료되었으니 job과 tasklet을 정의해보도록 하자.

우선 tasklet을 정의해야 한다. tasklet은 job에서 step의 단위 중 하나이다. 다음과 같이 텔레그램의 API를 호출하는 기능을 한다.

import com.example.kky.feign.TelegramAPIClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

@Slf4j
public class ExampleTasklet implements Tasklet {

    private final TelegramAPIClient telegramAPIClient;

    public ExampleTasklet(TelegramAPIClient telegramAPIClient) {
        this.telegramAPIClient = telegramAPIClient;
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        log.info("send message start");
        telegramAPIClient.sendMessage("5112580382:AAGcb-nF-UkDf1rcfVyW9kPKkwINzyeWLsc","hello bot", "52940587");
        log.info("send message finished");
        return RepeatStatus.FINISHED;
    }
}

Tasklet이라는 인터페이스를 상속받아 execute함수를 구현해주면 된다. job의 step에서 해당함수가 호출될 것이다.

이제 job의 내용을 설정해주어야 한다.

import com.example.kky.feign.TelegramAPIClient;
import com.example.kky.tasklets.ExampleTasklet;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ExampleConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final TelegramAPIClient telegramAPIClient;

    public ExampleConfig(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, TelegramAPIClient telegramAPIClient) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.telegramAPIClient = telegramAPIClient;
    }

    @Bean
    public Job tutorialJob() {
        return jobBuilderFactory.get("exampleJob")
                .start(tutorialStep())  // Step 설정
                .build();
    }

    @Bean
    public Step tutorialStep() {
        return stepBuilderFactory.get("exampleStep")
                .tasklet(new ExampleTasklet(telegramAPIClient))
                .build();
    }
}

job을 정의하는데 필요한 내용을 주입해주고 Spring Bean으로 Step과 Job을 정의해준다. FeignClient의 인터페이스도 Spring의 bean으로 등록되므로 생성자 DI를 통해 주입해서 사용해주도록 하자.

그리고 Spring 어플리케이션의 Main에 다음 annotation을 추가해야 한다.

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableBatchProcessing //추가
@EnableFeignClients //추가
public class KkyApplication {

    public static void main(String[] args) {
        SpringApplication.run(KkyApplication.class, args);
    }

}

아마도 Job과 관련된 bean과 FeignClient의 인터페이스를 bean으로 생성해주는데 필요한 부분이 아닐까 싶다. 실행하기 전에 spring batch관련 테이블을 생성해주어야 한다. application.properties를 다음과 같이 작성해주자.

spring.datasource.username=yourusername
spring.datasource.password=yourpassword
spring.datasource.url=jdbc:mysql://localhost:3306/yourdb
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.jpa.hibernate.ddl-auto=none
spring.sql.init.schema-locations=classpath:schema.sql
spring.sql.init.mode=always

mysql을 기준으로 설정을 진행하였다. schema.sql은 아래와 같다.

DROP TABLE IF EXISTS BATCH_JOB_INSTANCE;
CREATE TABLE BATCH_JOB_INSTANCE
(
    JOB_INSTANCE_ID BIGINT       NOT NULL PRIMARY KEY,
    VERSION         BIGINT,
    JOB_NAME        VARCHAR(100) NOT NULL,
    JOB_KEY         VARCHAR(32)  NOT NULL,
    constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;

DROP TABLE IF EXISTS BATCH_JOB_EXECUTION;
CREATE TABLE BATCH_JOB_EXECUTION
(
    JOB_EXECUTION_ID           BIGINT   NOT NULL PRIMARY KEY,
    VERSION                    BIGINT,
    JOB_INSTANCE_ID            BIGINT   NOT NULL,
    CREATE_TIME                DATETIME NOT NULL,
    START_TIME                 DATETIME DEFAULT NULL,
    END_TIME                   DATETIME DEFAULT NULL,
    STATUS                     VARCHAR(10),
    EXIT_CODE                  VARCHAR(2500),
    EXIT_MESSAGE               VARCHAR(2500),
    LAST_UPDATED               DATETIME,
    JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
    constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
        references BATCH_JOB_INSTANCE (JOB_INSTANCE_ID)
) ENGINE=InnoDB;

DROP TABLE IF EXISTS BATCH_JOB_EXECUTION_PARAMS;
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS
(
    JOB_EXECUTION_ID BIGINT       NOT NULL,
    TYPE_CD          VARCHAR(6)   NOT NULL,
    KEY_NAME         VARCHAR(100) NOT NULL,
    STRING_VAL       VARCHAR(250),
    DATE_VAL         DATETIME DEFAULT NULL,
    LONG_VAL         BIGINT,
    DOUBLE_VAL       DOUBLE PRECISION,
    IDENTIFYING      CHAR(1)      NOT NULL,
    constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
        references BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE=InnoDB;

DROP TABLE IF EXISTS BATCH_STEP_EXECUTION;
CREATE TABLE BATCH_STEP_EXECUTION
(
    STEP_EXECUTION_ID  BIGINT       NOT NULL PRIMARY KEY,
    VERSION            BIGINT       NOT NULL,
    STEP_NAME          VARCHAR(100) NOT NULL,
    JOB_EXECUTION_ID   BIGINT       NOT NULL,
    START_TIME         DATETIME     NOT NULL,
    END_TIME           DATETIME DEFAULT NULL,
    STATUS             VARCHAR(10),
    COMMIT_COUNT       BIGINT,
    READ_COUNT         BIGINT,
    FILTER_COUNT       BIGINT,
    WRITE_COUNT        BIGINT,
    READ_SKIP_COUNT    BIGINT,
    WRITE_SKIP_COUNT   BIGINT,
    PROCESS_SKIP_COUNT BIGINT,
    ROLLBACK_COUNT     BIGINT,
    EXIT_CODE          VARCHAR(2500),
    EXIT_MESSAGE       VARCHAR(2500),
    LAST_UPDATED       DATETIME,
    constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
        references BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE=InnoDB;

DROP TABLE IF EXISTS BATCH_STEP_EXECUTION_CONTEXT;
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT
(
    STEP_EXECUTION_ID  BIGINT        NOT NULL PRIMARY KEY,
    SHORT_CONTEXT      VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT,
    constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
        references BATCH_STEP_EXECUTION (STEP_EXECUTION_ID)
) ENGINE=InnoDB;

DROP TABLE IF EXISTS BATCH_JOB_EXECUTION_CONTEXT;
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT
(
    JOB_EXECUTION_ID   BIGINT        NOT NULL PRIMARY KEY,
    SHORT_CONTEXT      VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT,
    constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
        references BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE=InnoDB;

DROP TABLE IF EXISTS BATCH_STEP_EXECUTION_SEQ;
CREATE TABLE BATCH_STEP_EXECUTION_SEQ
(
    ID         BIGINT  NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY)
select *
from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
where not exists(select * from BATCH_STEP_EXECUTION_SEQ);

DROP TABLE IF EXISTS BATCH_JOB_EXECUTION_SEQ;
CREATE TABLE BATCH_JOB_EXECUTION_SEQ
(
    ID         BIGINT  NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY)
select *
from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
where not exists(select * from BATCH_JOB_EXECUTION_SEQ);

DROP TABLE IF EXISTS BATCH_JOB_SEQ;
CREATE TABLE BATCH_JOB_SEQ
(
    ID         BIGINT  NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY)
select *
from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
where not exists(select * from BATCH_JOB_SEQ);

예제 실행의 편의성을 위해 drop table을 실행하도록 했다. 테이블을 미리 생성해두면 스키마관련 설정은 따로 해주지 않아도 된다. 위의 테이블을 사용해 Job의 수행정보를 관리하고 Step별 진척상황을 확인하는 것으로 보인다. 그리고 실행을 하면 아래와 같이 bot이 메시지를 전달한다.

여기서 Spring 어플리케이션을 한번더 실행하면

이미 실행된 Job이라 실행할 수 없다고 한다. 그렇다면 Job은 Table의 정보를 바탕으로 고유성을 체크한다는 의미이다. parameter가 없는 Job은 다시 실행할 수 없다는 의미가 된다. 만약 동일 job을 한번더 실행하려면 job parameter를 다르게 입력하여 실행해야 한다. 관련 내용은 위 Spring Batch관련 글에 잘나와있으니 참고 바란다.

db diagram

Job의 메타데이터 테이블의 다이어그램이다. 테이블의 구조를 살펴보면

  • 크게 JOB, JOB EXECUTION , JOB STEP EXECUTION 으로 구분
  • 정의 된 BATCH_JOB_INSTANCE 가 존재하고 이를 실행하면 BATCH_JOB_EXECUTION 이 됨
  • 실행 된 BATCH_JOB_EXECUTION 에 대한 Step 별 진행 정보를 BATCH_STEP_EXECUTION 에 저장하고 있는 것으로 보임
  • 동일한 이름의 Job의 실행이 성공했을 경우 JOB Parameter가 달라야 실행 가능

그러면 이제 특정시간마다 실행하도록 변경해보자. Job을 특정시간마다 실행시키는 역할은 Quarts를 사용하여 구현할 것 이다.

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
@RequiredArgsConstructor
public class ExampleScheduler {

    private final Job job;
    private final JobLauncher jobLauncher;

    @Scheduled(cron="0 30 8 * * *")
    public void executeJob () {
        try {
            jobLauncher.run(
                    job,
                    new JobParametersBuilder()
                            .addString("datetime", LocalDateTime.now().toString())
                            .toJobParameters()  // job parameter 설정
            );
        } catch (JobExecutionException ex) {
            System.out.println(ex.getMessage());
            ex.printStackTrace();
        }
    }

}

Job의 경우 bean이 하나뿐이므로 앞에서 정의했던 exampleJob이 주입될 것이다. 그리고 Scheduled 어노테이션을 사용하여 어떤식으로 Job을 실행할지 정의할 수 있다. 여기서 Scheduled 어노테이션에 cron이라는 변수에 값을 넣어 특정시간마다 함수를 실행하도록 지정할 수 있다. 여기서 "0 30 8 * * *"의 의미는 8시30분 마다 해당 함수를 실행하라는 의미이다.

"{초} {분} {시} {일} {달} {년}" <- 이렇게 보면 되고 *은 all의 의미이다. 이제 main함수에 다음을 추가해 주자.

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableBatchProcessing
@EnableFeignClients
@EnableScheduling //추가
public class KkyApplication {

    public static void main(String[] args) {
        SpringApplication.run(KkyApplication.class, args);
    }

}

그리고 application.properties에 다음을 추가해주자.

spring.batch.job.enabled=false # Spring 어플리케이션 시작 시에 Job을 실행하지 않겠다는 의미

그러면 이제 내가 지정한 시간마다 bot이 메시지를 전달해 줄 것이다. 소스코드는 링크에서 확인할 수 있다.

0개의 댓글