Spring AMQP : Spring에서 AMQP기반 메시징 애플리케이션을 개발할 수 있도록 Spring의 개념을 적용한 라이브러리
AMQP(Advanced Message Queuing Protocol) : MQ(Message Queuing)기반의 프로토콜
Exchange가 Producer로부터 메시지를 받고 Queue에 전달한다. Queue는 Consumer에게 메시지를 전달한다.
Producer(Publisher)
: 메시지를 보내는 곳Consumer(Subscriber)
: 메시지를 받는 곳Exchange
: Producer로부터 메시지를 수신하는 곳. 수신한 메시지를 큐에 분배한다.Queue
: 메시지를 저장하는 곳. 저장했다가 Consumer에게 전달한다.Binding
: Exchange와 Queue의 mapping. 1:1 또는 1:N백엔드 서버를 기능별(사용자 인증 처리, 서비스 사용 이력 저장 등)로 나누게 된다면, 상황에 따라 서버들 끼리 연락을 주고받아야 하는 상황이 발생할 수 있다.
이때 우리가 배운 HTTP 통신을 이용해도 되지만
라는 단점이 있다.
이 상황에서 Message Broker 미들웨어를 활용해 메시지를 보내는 서버에서 비동기식으로 메시지로 보내면 메시지를 받는 서버가 메시지를 정상적으로 받았는지를 확인할 필요가 없어지고
라는 장점이 있어 Spring AMQP를 사용한다.
Spring AMQP를 사용하면 서버와 서버 사이의 결합성이 감소하고 확장성을 확보한다.
AMQP를 구현한 오픈소스 Message Broker이다. producers에서 consumers로 메세지(요청)를 전달할 때 중간에서 브로커 역할을 한다.
비동기식 처리 : 서버가 Queue에 작업을 등록하면, 그 작업이 완료되기를 기다릴 필요가 없다.
분산 작업 처리 : 작업을 처리하는 서버가 여러개가 존재하면 한번에 여러개씩 처리가 가능하여 작업시간을 줄일 수 있다.
작업 우선순위 관리 : 작업 큐는 작업에 우선순위를 부여하여 처리 순서를 관리할 수 있다.
오류 처리와 재시도 : 작업을 처리하는 서버가 일시적으로 오류로 인해 작업을 실패하면 해당 작업은 큐에 다시 등록되어 재시도될 수 있다.
Job Queue는 분산 시스템에서 확장성과 유연성을 강화하고, 작업 처리를 효율적으로 관리하기 위한 중요한 디자인 패턴이다.
오래 걸리는 작업(특정 파일을 변환하는 등)을 가정하고, 해당 작업을 여러 서버에서 처리할 수 있도록 한다.
Producer
와 작업을 처리하는 Consumer
가 있다.Producer
의 HTTP 요청을 통해 오래걸리는 작업을 Queue에 등록하고, DB에 기록한다.Consumer
은 Queue에서 작업을 꺼내 처리한다.Consumer
은 작업이 완료되면 DB에 해당 작업이 완료되었음(DONE)을 기록한다.Message Queue에 메세지를 등록하는 역할을 하는 경우 일반적으로 Producer라고 한다.
CloudAMQP
User & Vhost & PW 기입
생성한 작업을 기록하기 위한 JobEntity
Entity를 다루는 Repository
JobRequest
는 작업을 생성하는 요청에 대한 DTO
JobPayload
는 작업을 처리하는 서버에 데이터를 전달하기 위한 DTO
JobPayload
는 작업을 처리하는 서버에 데이터를 전달하기 위한 DTO
JobStatus
는 작업의 상태를 확인하기 위한 DTO
@Configuration
public class ProducerConfig {
@Bean
public Queue queue(){
return new Queue("boot.amqp.worker-queue", true, false, true);
}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class ProducerService {
// RabbitMQ에 메세지를 적재하기위한 클래스
private final RabbitTemplate rabbitTemplate;
// Config에 정의한 처리할 작업정보가 대기하는 Queue
private final Queue jobQueue;
private final JobRepository jobRepository;
// 객체를 쉽게 JSON 문자열로 직렬화 해주는 라이브러리
private final Gson gson;
// filename을 인자로 받고
// filename을 바탕으로 JSON으로 직렬화된 작업 정보를
// Queue에 적재한 뒤
// 사용자에게 JobTicket을 반환하는 메소드
public JobTicket send(String filename) {
// jobId 발행
String jobId = UUID.randomUUID().toString();
JobTicket jobTicket = new JobTicket(jobId);
// JobPayload 생성 (Consumer가 확인하는 데이터)
JobPayload payload = new JobPayload(
jobId,
filename,
String.format("/media/user-uploaded/raw/%s", filename)
);
// JobEntity로 작업 내역 입력 기록
JobEntity jobEntity = new JobEntity();
jobEntity.setJobId(jobId);
jobEntity.setStatus("IDLE");
jobRepository.save(jobEntity);
// Message Broker에게 메세지 전달
rabbitTemplate.convertAndSend(
// 어떤 Queue에 적재할지에 대한 이름
jobQueue.getName(),
// 메세지로 보낼 문자열
gson.toJson(payload)
);
// 사용자에게 추후 확인용 JobTicket 전달
log.info("Sent Job: {}", jobTicket.getJobId());
return jobTicket;
}
}
@RequiredArgsConstructor
@RestController
public class ProducerController {
private final ProducerService producerService;
@PostMapping("/make-job")
// 사용자는 Make Job을 통해 처리하고 싶은 파일의 정보를 전달
public JobTicket makeJob(
@RequestBody
JobRequest request) {
// 추후 작업의 처리 상태를 판단하기 위한 jobId를 담고 있는
// JobTicket 객체 반환
return producerService.send(request.getFilename());
}
}
POST test
IDLE
상태의 작업이 쌓인다.
Queue에서 작업을 꺼내는 역할을 하는 의미에서 Consumer, 작업을 진행해서 Worker라고 부르기도 한다.
@Configuration
public class ConsumerConfig {
@Bean
public Queue queue(){
return new Queue(
//이름: Producer와 Consumer가 같은 Queue를 사용하기 위해 작성하는 식별자
"boot.amqp.worker-queue",
//durable : 서버(Producer)가 종료된 후에도 Queue가 유지될지
true,
//exclusive : 지금 이 서버만 큐를 사용할 수 있는지
false,
//autoDelete : 사용되고 있지 않은 큐를 자동으로 삭제할 것인지
true
);
}
}
@Slf4j
@Service
@RabbitListener(queues = "boot.amqp.worker-queue")
@RequiredArgsConstructor
public class ConsumerService {
private final JobRepository jobRepository;
private final Gson gson;
@RabbitHandler
// message에 Queue에 담겼던 문자열이 전달된다.
public void receive(String message) throws InterruptedException {
// Message Broker로 부터 온 데이터 역직렬화
JobPayload newJob
= gson.fromJson(message, JobPayload.class);
String jobId = newJob.getJobId();
log.info("Received Job: {}", jobId);
// Entity 검색
Optional<JobEntity> optionalJob = jobRepository.findByJobId(jobId);
// TODO 예외처리를 해줘야 마땅하나 잠시 생략
// 요청을 처리상태로 업데이트
JobEntity jobEntity = optionalJob.get();
jobEntity.setStatus("PROCESSING");
jobEntity = jobRepository.save(jobEntity);
log.info("Start Processing Job: {}", jobId);
// 처리하는데 시간이 5초 걸린다고 치자
TimeUnit.SECONDS.sleep(5);
// 요청을 완료 상태로 업데이트
jobEntity.setStatus("DONE");
jobEntity.setResultPath(
String.format("/media/user-uploaded/processed/%s", newJob.getFilename())
);
jobRepository.save(jobEntity);
// 기록
log.info("Finished Job: {}", jobId);
}
}
status가 바뀐다.
출처 : 멋사 5기 백엔드 위키 14팀 오후2시
정리가 잘 된 글이네요. 도움이 됐습니다.