차후 진행할 토이 프로젝트에 Spring Batch의 기능을 활용하면 좋을 것 같아서 간단한 예제를 만들면서 Spring Batch에 대해 공부를 해보려고 한다. 해당 예제는 telegeram의 bot을 사용하여 특정시간 마다 알람을 전송하는 기능을 구현해 보려고 한다.
우선 텔레그램의 봇을 생성해보자.
그림과 같이 텔레그램의 BotFather에게 /newbot 명령어를 사용하여 새로운 봇을 생성할 수 있다. 이름은 대충 지어주자. 생성하고 나면 token을 확인할 수 있는데 이 token을 활용하여 bot으로 메시지를 전달하거나 bot에게 전달된 메시지를 확인하는 것이 가능하다.
우선 BaseUrl은 다음과 같다.
https://api.telegram.org/bot{token}/
사용할 method는 다음 2가지이다.
https://api.telegram.org/bot{token}/getUpdates
https://api.telegram.org/bot{token}/sendmessage
만약 token이 5112580382:AAGcb-nF-UkDf1rcfVyW9kPKkwINzyeWLsc이라고 하면 다음과 같이 request를 전달하면 된다.
GET https://api.telegram.org/bot5112580382:AAGcb-nF-UkDf1rcfVyW9kPKkwINzyeWLsc/getUpdates
더 많은 API들을 확인하고 싶다면 링크에서 확인
우선 bot에게 메시지를 하나날려본다. 그런뒤에 GET /getUpdates를 호출해보자. 그러면 아래와 같이 반환되는 것을 확인할 수 있다.
{
"ok": true,
"result": [
{
"update_id": 416670593,
"message": {
"message_id": 1,
"from": {
"id": 52940587,
"is_bot": false,
"first_name": "Ki Young",
"last_name": "Kwon",
"language_code": "ko"
},
"chat": {
"id": 52940587,
"first_name": "Ki Young",
"last_name": "Kwon",
"type": "private"
},
"date": 1642948898,
"text": "/start",
"entities": [
{
"offset": 0,
"length": 6,
"type": "bot_command"
}
]
}
},
{
"update_id": 416670594,
"message": {
"message_id": 2,
"from": {
"id": 52940587,
"is_bot": false,
"first_name": "Ki Young",
"last_name": "Kwon",
"language_code": "ko"
},
"chat": {
"id": 52940587,
"first_name": "Ki Young",
"last_name": "Kwon",
"type": "private"
},
"date": 1642949217,
"text": "hi"
}
},
{
"update_id": 416670595,
"message": {
"message_id": 16,
"from": {
"id": 52940587,
"is_bot": false,
"first_name": "Ki Young",
"last_name": "Kwon",
"language_code": "ko"
},
"chat": {
"id": 52940587,
"first_name": "Ki Young",
"last_name": "Kwon",
"type": "private"
},
"date": 1642964676,
"text": "hello bot"
}
}
]
}
여기서 chat의 id를 확인해두자. 해당 id를 사용하여 bot이 나에게 메시지를 보낼 것 이다. 위를 기준으로 하면 chat의 id는 52940587이다. 그러면 다음과 같이 보내면 된다.
GET https://api.telegram.org/bot5112580382:AAGcb-nF-UkDf1rcfVyW9kPKkwINzyeWLsc/sendmessage?text=hello bot&chat_id=52940587
그럼 bot이 나에게 메시지를 전송한다. 그러면 이제 Spring Batch를 사용해서 특정시간에 나에게 메시지를 보내도록 해주자. 프로젝트에서 사용해볼 라이브러리 구성은 다음과 같다.
FeignClient는 텔레그램의 API를 사용할 때 활용할 것이다. 여기서 주의할 점은 Spring Boot의 버전과 FeignClient의 버전(Spring Cloud)을 맞추어 주지 않으면 동작하지 않는다. boot 2.5.8은 3.0.4와 호환된다. 다른버전을 사용하는 경우 확인이 필요할 것 이다.
gradle을 기준으로 프로젝트를 구성해보도록 하자. 다음 3개를 추가하면 된다.
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.boot:spring-boot-starter-quartz'
implementation 'org.springframework.cloud:spring-cloud-starter-openfeign:3.0.4'
}
Spring Batch의 사용방법은 링크를 참조하여 작업하였다.
우선 위에서 설명한 bot이 메시지를 전달하는 API를 FeignClient를 사용하여 정의해야 한다.
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient(name="telegram", url="https://api.telegram.org")
public interface TelegramAPIClient {
@GetMapping(value = "/bot{token}/sendmessage")
Object sendMessage(@PathVariable("token")String token, @RequestParam("text") String text, @RequestParam("chat_id") String chatId);
}
FeignClient의 사용방법은 우리가 Spring MVC 어플리케이션을 개발할 때와 동일하게 작성해주면 된다. 다만 인터페이스이므로 함수의 body는 존재하지 않는다.
token은 path variable, 전달할 메시지의 내용과 채팅방의 id는 url 패러미터로 전달해주면 된다. 사용할 텔레그램 API의 정의가 완료되었으니 job과 tasklet을 정의해보도록 하자.
우선 tasklet을 정의해야 한다. tasklet은 job에서 step의 단위 중 하나이다. 다음과 같이 텔레그램의 API를 호출하는 기능을 한다.
import com.example.kky.feign.TelegramAPIClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
@Slf4j
public class ExampleTasklet implements Tasklet {
private final TelegramAPIClient telegramAPIClient;
public ExampleTasklet(TelegramAPIClient telegramAPIClient) {
this.telegramAPIClient = telegramAPIClient;
}
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
log.info("send message start");
telegramAPIClient.sendMessage("5112580382:AAGcb-nF-UkDf1rcfVyW9kPKkwINzyeWLsc","hello bot", "52940587");
log.info("send message finished");
return RepeatStatus.FINISHED;
}
}
Tasklet이라는 인터페이스를 상속받아 execute함수를 구현해주면 된다. job의 step에서 해당함수가 호출될 것이다.
이제 job의 내용을 설정해주어야 한다.
import com.example.kky.feign.TelegramAPIClient;
import com.example.kky.tasklets.ExampleTasklet;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ExampleConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final TelegramAPIClient telegramAPIClient;
public ExampleConfig(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, TelegramAPIClient telegramAPIClient) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.telegramAPIClient = telegramAPIClient;
}
@Bean
public Job tutorialJob() {
return jobBuilderFactory.get("exampleJob")
.start(tutorialStep()) // Step 설정
.build();
}
@Bean
public Step tutorialStep() {
return stepBuilderFactory.get("exampleStep")
.tasklet(new ExampleTasklet(telegramAPIClient))
.build();
}
}
job을 정의하는데 필요한 내용을 주입해주고 Spring Bean으로 Step과 Job을 정의해준다. FeignClient의 인터페이스도 Spring의 bean으로 등록되므로 생성자 DI를 통해 주입해서 사용해주도록 하자.
그리고 Spring 어플리케이션의 Main에 다음 annotation을 추가해야 한다.
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableBatchProcessing //추가
@EnableFeignClients //추가
public class KkyApplication {
public static void main(String[] args) {
SpringApplication.run(KkyApplication.class, args);
}
}
아마도 Job과 관련된 bean과 FeignClient의 인터페이스를 bean으로 생성해주는데 필요한 부분이 아닐까 싶다. 실행하기 전에 spring batch관련 테이블을 생성해주어야 한다. application.properties를 다음과 같이 작성해주자.
spring.datasource.username=yourusername
spring.datasource.password=yourpassword
spring.datasource.url=jdbc:mysql://localhost:3306/yourdb
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.jpa.hibernate.ddl-auto=none
spring.sql.init.schema-locations=classpath:schema.sql
spring.sql.init.mode=always
mysql을 기준으로 설정을 진행하였다. schema.sql은 아래와 같다.
DROP TABLE IF EXISTS BATCH_JOB_INSTANCE;
CREATE TABLE BATCH_JOB_INSTANCE
(
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY,
VERSION BIGINT,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;
DROP TABLE IF EXISTS BATCH_JOB_EXECUTION;
CREATE TABLE BATCH_JOB_EXECUTION
(
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
VERSION BIGINT,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME DATETIME NOT NULL,
START_TIME DATETIME DEFAULT NULL,
END_TIME DATETIME DEFAULT NULL,
STATUS VARCHAR(10),
EXIT_CODE VARCHAR(2500),
EXIT_MESSAGE VARCHAR(2500),
LAST_UPDATED DATETIME,
JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE (JOB_INSTANCE_ID)
) ENGINE=InnoDB;
DROP TABLE IF EXISTS BATCH_JOB_EXECUTION_PARAMS;
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS
(
JOB_EXECUTION_ID BIGINT NOT NULL,
TYPE_CD VARCHAR(6) NOT NULL,
KEY_NAME VARCHAR(100) NOT NULL,
STRING_VAL VARCHAR(250),
DATE_VAL DATETIME DEFAULT NULL,
LONG_VAL BIGINT,
DOUBLE_VAL DOUBLE PRECISION,
IDENTIFYING CHAR(1) NOT NULL,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE=InnoDB;
DROP TABLE IF EXISTS BATCH_STEP_EXECUTION;
CREATE TABLE BATCH_STEP_EXECUTION
(
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
START_TIME DATETIME NOT NULL,
END_TIME DATETIME DEFAULT NULL,
STATUS VARCHAR(10),
COMMIT_COUNT BIGINT,
READ_COUNT BIGINT,
FILTER_COUNT BIGINT,
WRITE_COUNT BIGINT,
READ_SKIP_COUNT BIGINT,
WRITE_SKIP_COUNT BIGINT,
PROCESS_SKIP_COUNT BIGINT,
ROLLBACK_COUNT BIGINT,
EXIT_CODE VARCHAR(2500),
EXIT_MESSAGE VARCHAR(2500),
LAST_UPDATED DATETIME,
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE=InnoDB;
DROP TABLE IF EXISTS BATCH_STEP_EXECUTION_CONTEXT;
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT
(
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION (STEP_EXECUTION_ID)
) ENGINE=InnoDB;
DROP TABLE IF EXISTS BATCH_JOB_EXECUTION_CONTEXT;
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT
(
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE=InnoDB;
DROP TABLE IF EXISTS BATCH_STEP_EXECUTION_SEQ;
CREATE TABLE BATCH_STEP_EXECUTION_SEQ
(
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY)
select *
from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
where not exists(select * from BATCH_STEP_EXECUTION_SEQ);
DROP TABLE IF EXISTS BATCH_JOB_EXECUTION_SEQ;
CREATE TABLE BATCH_JOB_EXECUTION_SEQ
(
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY)
select *
from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
where not exists(select * from BATCH_JOB_EXECUTION_SEQ);
DROP TABLE IF EXISTS BATCH_JOB_SEQ;
CREATE TABLE BATCH_JOB_SEQ
(
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY)
select *
from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
where not exists(select * from BATCH_JOB_SEQ);
예제 실행의 편의성을 위해 drop table을 실행하도록 했다. 테이블을 미리 생성해두면 스키마관련 설정은 따로 해주지 않아도 된다. 위의 테이블을 사용해 Job의 수행정보를 관리하고 Step별 진척상황을 확인하는 것으로 보인다. 그리고 실행을 하면 아래와 같이 bot이 메시지를 전달한다.
여기서 Spring 어플리케이션을 한번더 실행하면
이미 실행된 Job이라 실행할 수 없다고 한다. 그렇다면 Job은 Table의 정보를 바탕으로 고유성을 체크한다는 의미이다. parameter가 없는 Job은 다시 실행할 수 없다는 의미가 된다. 만약 동일 job을 한번더 실행하려면 job parameter를 다르게 입력하여 실행해야 한다. 관련 내용은 위 Spring Batch관련 글에 잘나와있으니 참고 바란다.
Job의 메타데이터 테이블의 다이어그램이다. 테이블의 구조를 살펴보면
그러면 이제 특정시간마다 실행하도록 변경해보자. Job을 특정시간마다 실행시키는 역할은 Quarts를 사용하여 구현할 것 이다.
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Component
@RequiredArgsConstructor
public class ExampleScheduler {
private final Job job;
private final JobLauncher jobLauncher;
@Scheduled(cron="0 30 8 * * *")
public void executeJob () {
try {
jobLauncher.run(
job,
new JobParametersBuilder()
.addString("datetime", LocalDateTime.now().toString())
.toJobParameters() // job parameter 설정
);
} catch (JobExecutionException ex) {
System.out.println(ex.getMessage());
ex.printStackTrace();
}
}
}
Job의 경우 bean이 하나뿐이므로 앞에서 정의했던 exampleJob이 주입될 것이다. 그리고 Scheduled 어노테이션을 사용하여 어떤식으로 Job을 실행할지 정의할 수 있다. 여기서 Scheduled 어노테이션에 cron이라는 변수에 값을 넣어 특정시간마다 함수를 실행하도록 지정할 수 있다. 여기서 "0 30 8 * * *"의 의미는 8시30분 마다 해당 함수를 실행하라는 의미이다.
"{초} {분} {시} {일} {달} {년}" <- 이렇게 보면 되고 *은 all의 의미이다. 이제 main함수에 다음을 추가해 주자.
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableBatchProcessing
@EnableFeignClients
@EnableScheduling //추가
public class KkyApplication {
public static void main(String[] args) {
SpringApplication.run(KkyApplication.class, args);
}
}
그리고 application.properties에 다음을 추가해주자.
spring.batch.job.enabled=false # Spring 어플리케이션 시작 시에 Job을 실행하지 않겠다는 의미
그러면 이제 내가 지정한 시간마다 bot이 메시지를 전달해 줄 것이다. 소스코드는 링크에서 확인할 수 있다.