RabbitMQ C 예제

백엔드&인프라 추종자·2025년 11월 4일

좋습니다 정우님. 👌
이전 대화에서 RabbitMQ C API로 amqp_login, amqp_channel_open, amqp_get_rpc_reply 등을 사용한 생산자(producer) 코드가 있었죠.
이번엔 그것을 기반으로, 소비자(consumer) 버전으로 확장한 예제를 완성해드리겠습니다.


🧩 개요

이 코드는 다음 역할을 수행합니다.

  1. RabbitMQ 서버 연결
  2. 채널 열기
  3. 큐(task_queue) 선언
  4. 큐로부터 메시지를 계속 소비 (amqp_consume_message)
  5. 메시지 처리 후 ACK 전송
  6. 에러 또는 종료 시 정리 후 종료

🧠 전체 구조

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <amqp.h>
#include <amqp_tcp_socket.h>

#define HOSTNAME "localhost"
#define PORT 5672
#define USERNAME "admin"
#define PASSWORD "admin123"
#define QUEUE_NAME "task_queue"

static void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
    switch (x.reply_type) {
        case AMQP_RESPONSE_NORMAL:
            return;

        case AMQP_RESPONSE_NONE:
            fprintf(stderr, "%s: missing RPC reply type!\n", context);
            break;

        case AMQP_RESPONSE_LIBRARY_EXCEPTION:
            fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x.library_error));
            break;

        case AMQP_RESPONSE_SERVER_EXCEPTION:
            if (x.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
                amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
                fprintf(stderr, "%s: server connection error %uh, message: %.*s\n",
                        context, m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
            } else if (x.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
                amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
                fprintf(stderr, "%s: server channel error %uh, message: %.*s\n",
                        context, m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
            } else {
                fprintf(stderr, "%s: unknown server error\n", context);
            }
            break;
    }

    exit(1);
}

int main(void) {
    int status;
    amqp_connection_state_t conn = amqp_new_connection();
    if (!conn) {
        fprintf(stderr, "Failed to create connection object.\n");
        return 1;
    }

    amqp_socket_t *socket = amqp_tcp_socket_new(conn);
    if (!socket) {
        fprintf(stderr, "Failed to create TCP socket.\n");
        return 1;
    }

    status = amqp_socket_open(socket, HOSTNAME, PORT);
    if (status) {
        fprintf(stderr, "Failed to open socket: %s\n", amqp_error_string2(status));
        return 1;
    }

    die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 60, AMQP_SASL_METHOD_PLAIN, USERNAME, PASSWORD),
                      "Logging in");

    amqp_channel_open(conn, 1);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

    amqp_queue_declare(conn, 1, amqp_cstring_bytes(QUEUE_NAME), 0, 0, 0, 1, amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");

    amqp_basic_consume(conn, 1, amqp_cstring_bytes(QUEUE_NAME), amqp_empty_bytes,
                       0, 0, 0, amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");

    printf("[*] Waiting for messages from queue '%s'. To exit press CTRL+C\n", QUEUE_NAME);

    for (;;) {
        amqp_envelope_t envelope;
        amqp_maybe_release_buffers(conn);

        amqp_rpc_reply_t res = amqp_consume_message(conn, &envelope, NULL, 0);

        if (res.reply_type != AMQP_RESPONSE_NORMAL) {
            fprintf(stderr, "Error consuming message: %d\n", res.reply_type);
            break;
        }

        printf("[x] Received message: %.*s\n",
               (int)envelope.message.body.len,
               (char *)envelope.message.body.bytes);

        // ✅ 여기서 ffmpegd 변환 로직을 호출할 수 있음.
        // 예: convert_video((char*)envelope.message.body.bytes);

        // 메시지 처리 완료 후 ACK 전송
        amqp_basic_ack(conn, 1, envelope.delivery_tag, 0);

        amqp_destroy_envelope(&envelope);
    }

    die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
    die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
    amqp_destroy_connection(conn);

    return 0;
}

🧩 주요 포인트 정리

구문설명
amqp_queue_declare큐가 존재하지 않으면 생성 (consumer도 선언 가능)
amqp_basic_consume큐 구독(consume) 시작
amqp_consume_message블로킹 수신. 메시지가 오면 반환
amqp_basic_ack메시지 정상 처리 후 ACK 응답
amqp_destroy_envelope수신 메시지 메모리 해제
amqp_maybe_release_buffers내부 버퍼 정리(성능 및 메모리 관리용)

🧠 응용 아이디어 (ffmpegd용)

이 구조 안에서:

  • 수신 메시지: 변환할 presigned download URL
  • ffmpeg 실행 후 결과물 업로드 (presigned upload URL 사용)
  • 처리 성공 시 ACK, 실패 시 NACK 또는 재큐잉
if (convert_video(url) == 0) {
    amqp_basic_ack(conn, 1, envelope.delivery_tag, 0);
} else {
    amqp_basic_nack(conn, 1, envelope.delivery_tag, 0, 1); // requeue = 1
}

원하신다면, 위 코드에
presigned download URL 처리
ffmpeg -i ... -vn -acodec mp3 ... 실행
upload 완료 후 ACK
까지 실제 ffmpegd용 consumer 버전으로 이어서 완성해드릴까요?

profile
AI 답변 글을 주로 올립니다.

0개의 댓글