Spring AMQP

calis_ws·2023년 8월 7일
0

Spring AMQP

Spring AMQP : Spring에서 AMQP기반 메시징 애플리케이션을 개발할 수 있도록 Spring의 개념을 적용한 라이브러리

AMQP(Advanced Message Queuing Protocol) : MQ(Message Queuing)기반의 프로토콜

동작 원리

Exchange가 Producer로부터 메시지를 받고 Queue에 전달한다. Queue는 Consumer에게 메시지를 전달한다.

  • Producer(Publisher) : 메시지를 보내는 곳
  • Consumer(Subscriber) : 메시지를 받는 곳
  • Exchange : Producer로부터 메시지를 수신하는 곳. 수신한 메시지를 큐에 분배한다.
  • Queue : 메시지를 저장하는 곳. 저장했다가 Consumer에게 전달한다.
  • Binding : Exchange와 Queue의 mapping. 1:1 또는 1:N

사용하는 이유

백엔드 서버를 기능별(사용자 인증 처리, 서비스 사용 이력 저장 등)로 나누게 된다면, 상황에 따라 서버들 끼리 연락을 주고받아야 하는 상황이 발생할 수 있다.

이때 우리가 배운 HTTP 통신을 이용해도 되지만

  • HTTP 통신은 요청을 보내면 이후 응답을 받을때까지 요청을 보낸 서버의 수행이 중단된다.
  • 여러 서버에 동시에 정보를 전달하기 어렵다.
  • 어디에 요청을 보내야 하는지를 정확히 알고 있어야 하기 때문에 결합성이 증가한다.

라는 단점이 있다.

이 상황에서 Message Broker 미들웨어를 활용해 메시지를 보내는 서버에서 비동기식으로 메시지로 보내면 메시지를 받는 서버가 메시지를 정상적으로 받았는지를 확인할 필요가 없어지고

  • 서버와 서버 사이의 결합성(의존성)이 감소한다.
  • 확장성을 확보한다.
  • 비동기 통신을 기반으로 한 다양한 기능을 구현하는데 도움을 준다.

라는 장점이 있어 Spring AMQP를 사용한다.

Spring AMQP를 사용하면 서버와 서버 사이의 결합성이 감소하고 확장성을 확보한다.

RabbitMQ

AMQP를 구현한 오픈소스 Message Broker이다. producers에서 consumers로 메세지(요청)를 전달할 때 중간에서 브로커 역할을 한다.

Job Queue

  • 메세지를 주고 받는 방식을 정의한 디자인 패턴 (messaging pattern) 의 일종
  • 한 서버에서 처리를 필요로 하는 작업을 생성하고, Message Broker를 통해 Queue 형태로 보관하여 대기시킨다. Queue에 있는 작업을 다른 서버에서 꺼내서 처리한다.

장점

  • 비동기식 처리 : 서버가 Queue에 작업을 등록하면, 그 작업이 완료되기를 기다릴 필요가 없다.

  • 분산 작업 처리 : 작업을 처리하는 서버가 여러개가 존재하면 한번에 여러개씩 처리가 가능하여 작업시간을 줄일 수 있다.

  • 작업 우선순위 관리 : 작업 큐는 작업에 우선순위를 부여하여 처리 순서를 관리할 수 있다.

  • 오류 처리와 재시도 : 작업을 처리하는 서버가 일시적으로 오류로 인해 작업을 실패하면 해당 작업은 큐에 다시 등록되어 재시도될 수 있다.

Job Queue는 분산 시스템에서 확장성과 유연성을 강화하고, 작업 처리를 효율적으로 관리하기 위한 중요한 디자인 패턴이다.

Spring Boot AMQP를 활용한 Job Queue

오래 걸리는 작업(특정 파일을 변환하는 등)을 가정하고, 해당 작업을 여러 서버에서 처리할 수 있도록 한다.

  1. 작업을 생산하는 Producer와 작업을 처리하는 Consumer가 있다.
  2. Producer의 HTTP 요청을 통해 오래걸리는 작업을 Queue에 등록하고, DB에 기록한다.
  3. Consumer은 Queue에서 작업을 꺼내 처리한다.
  4. Consumer은 작업이 완료되면 DB에 해당 작업이 완료되었음(DONE)을 기록한다.

Producer

Message Queue에 메세지를 등록하는 역할을 하는 경우 일반적으로 Producer라고 한다.

producer 프로젝트

application.yaml username, password 수정

CloudAMQP

User & Vhost & PW 기입

Entity, DTO 생성

생성한 작업을 기록하기 위한 JobEntity
Entity를 다루는 Repository
JobRequest 는 작업을 생성하는 요청에 대한 DTO
JobPayload 는 작업을 처리하는 서버에 데이터를 전달하기 위한 DTO
JobPayload 는 작업을 처리하는 서버에 데이터를 전달하기 위한 DTO
JobStatus 는 작업의 상태를 확인하기 위한 DTO

ProducerConfig

@Configuration
public class ProducerConfig {
@Bean
public Queue queue(){
return new Queue("boot.amqp.worker-queue", true, false, true);
	}
}

ProducerService

@Slf4j
@Service
@RequiredArgsConstructor
public class ProducerService {
    // RabbitMQ에 메세지를 적재하기위한 클래스
    private final RabbitTemplate rabbitTemplate;
    // Config에 정의한 처리할 작업정보가 대기하는 Queue
    private final Queue jobQueue;

    private final JobRepository jobRepository;
    // 객체를 쉽게 JSON 문자열로 직렬화 해주는 라이브러리
    private final Gson gson;

    // filename을 인자로 받고
    // filename을 바탕으로 JSON으로 직렬화된 작업 정보를
    // Queue에 적재한 뒤
    // 사용자에게 JobTicket을 반환하는 메소드
    public JobTicket send(String filename) {
        // jobId 발행
        String jobId = UUID.randomUUID().toString();
        JobTicket jobTicket = new JobTicket(jobId);

        // JobPayload 생성 (Consumer가 확인하는 데이터)
        JobPayload payload = new JobPayload(
                jobId,
                filename,
                String.format("/media/user-uploaded/raw/%s", filename)
        );

        // JobEntity로 작업 내역 입력 기록
        JobEntity jobEntity = new JobEntity();
        jobEntity.setJobId(jobId);
        jobEntity.setStatus("IDLE");
        jobRepository.save(jobEntity);

        // Message Broker에게 메세지 전달
        rabbitTemplate.convertAndSend(
                // 어떤 Queue에 적재할지에 대한 이름
                jobQueue.getName(),
                // 메세지로 보낼 문자열
                gson.toJson(payload)
        );
        // 사용자에게 추후 확인용 JobTicket 전달
        log.info("Sent Job: {}", jobTicket.getJobId());
        return jobTicket;
    }
}

ProducerController

@RequiredArgsConstructor
@RestController
public class ProducerController {
    private final ProducerService producerService;

    @PostMapping("/make-job")
    // 사용자는 Make Job을 통해 처리하고 싶은 파일의 정보를 전달
    public JobTicket makeJob(
            @RequestBody
            JobRequest request) {
        // 추후 작업의 처리 상태를 판단하기 위한 jobId를 담고 있는
        // JobTicket 객체 반환
        return producerService.send(request.getFilename());
    }
}

Queue에 메세지 적재 확인

POST test

DB

IDLE 상태의 작업이 쌓인다.

Queue 1 증가

RabbitMQ

Consumer

Queue에서 작업을 꺼내는 역할을 하는 의미에서 Consumer, 작업을 진행해서 Worker라고 부르기도 한다.

consumer 프로젝트

application.yaml none 수정

JobEntity, JobRepository, JobPayload 생성

ConsumerConfig

@Configuration
public class ConsumerConfig {
    @Bean
    public Queue queue(){
        return new Queue(
                //이름: Producer와 Consumer가 같은 Queue를 사용하기 위해 작성하는 식별자
                "boot.amqp.worker-queue",
                //durable : 서버(Producer)가 종료된 후에도 Queue가 유지될지
                true,
                //exclusive : 지금 이 서버만 큐를 사용할 수 있는지
                false,
                //autoDelete : 사용되고 있지 않은 큐를 자동으로 삭제할 것인지
                true
        );
    }
}

ConsumerService

@Slf4j
@Service
@RabbitListener(queues = "boot.amqp.worker-queue")
@RequiredArgsConstructor
public class ConsumerService {
    private final JobRepository jobRepository;
    private final Gson gson;

    @RabbitHandler
    // message에 Queue에 담겼던 문자열이 전달된다.
    public void receive(String message) throws InterruptedException {
        // Message Broker로 부터 온 데이터 역직렬화
        JobPayload newJob
                = gson.fromJson(message, JobPayload.class);
        String jobId = newJob.getJobId();
        log.info("Received Job: {}", jobId);
        // Entity 검색
        Optional<JobEntity> optionalJob = jobRepository.findByJobId(jobId);
        // TODO 예외처리를 해줘야 마땅하나 잠시 생략

        // 요청을 처리상태로 업데이트
        JobEntity jobEntity = optionalJob.get();
        jobEntity.setStatus("PROCESSING");
        jobEntity = jobRepository.save(jobEntity);

        log.info("Start Processing Job: {}", jobId);
        // 처리하는데 시간이 5초 걸린다고 치자
        TimeUnit.SECONDS.sleep(5);

        // 요청을 완료 상태로 업데이트
        jobEntity.setStatus("DONE");
        jobEntity.setResultPath(
                String.format("/media/user-uploaded/processed/%s", newJob.getFilename())
        );
        jobRepository.save(jobEntity);
        // 기록
        log.info("Finished Job: {}", jobId);
    }
}

중복 실행 설정

producer DB

status가 바뀐다.

출처 : 멋사 5기 백엔드 위키 14팀 오후2시

profile
반갑습니다람지

2개의 댓글

comment-user-thumbnail
2023년 8월 7일

정리가 잘 된 글이네요. 도움이 됐습니다.

1개의 답글