좋습니다 정우님. 👌
이전 대화에서 RabbitMQ C API로 amqp_login, amqp_channel_open, amqp_get_rpc_reply 등을 사용한 생산자(producer) 코드가 있었죠.
이번엔 그것을 기반으로, 소비자(consumer) 버전으로 확장한 예제를 완성해드리겠습니다.
이 코드는 다음 역할을 수행합니다.
task_queue) 선언amqp_consume_message)#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#define HOSTNAME "localhost"
#define PORT 5672
#define USERNAME "admin"
#define PASSWORD "admin123"
#define QUEUE_NAME "task_queue"
static void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
switch (x.reply_type) {
case AMQP_RESPONSE_NORMAL:
return;
case AMQP_RESPONSE_NONE:
fprintf(stderr, "%s: missing RPC reply type!\n", context);
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x.library_error));
break;
case AMQP_RESPONSE_SERVER_EXCEPTION:
if (x.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
fprintf(stderr, "%s: server connection error %uh, message: %.*s\n",
context, m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
} else if (x.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
fprintf(stderr, "%s: server channel error %uh, message: %.*s\n",
context, m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
} else {
fprintf(stderr, "%s: unknown server error\n", context);
}
break;
}
exit(1);
}
int main(void) {
int status;
amqp_connection_state_t conn = amqp_new_connection();
if (!conn) {
fprintf(stderr, "Failed to create connection object.\n");
return 1;
}
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
if (!socket) {
fprintf(stderr, "Failed to create TCP socket.\n");
return 1;
}
status = amqp_socket_open(socket, HOSTNAME, PORT);
if (status) {
fprintf(stderr, "Failed to open socket: %s\n", amqp_error_string2(status));
return 1;
}
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 60, AMQP_SASL_METHOD_PLAIN, USERNAME, PASSWORD),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_queue_declare(conn, 1, amqp_cstring_bytes(QUEUE_NAME), 0, 0, 0, 1, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
amqp_basic_consume(conn, 1, amqp_cstring_bytes(QUEUE_NAME), amqp_empty_bytes,
0, 0, 0, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
printf("[*] Waiting for messages from queue '%s'. To exit press CTRL+C\n", QUEUE_NAME);
for (;;) {
amqp_envelope_t envelope;
amqp_maybe_release_buffers(conn);
amqp_rpc_reply_t res = amqp_consume_message(conn, &envelope, NULL, 0);
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
fprintf(stderr, "Error consuming message: %d\n", res.reply_type);
break;
}
printf("[x] Received message: %.*s\n",
(int)envelope.message.body.len,
(char *)envelope.message.body.bytes);
// ✅ 여기서 ffmpegd 변환 로직을 호출할 수 있음.
// 예: convert_video((char*)envelope.message.body.bytes);
// 메시지 처리 완료 후 ACK 전송
amqp_basic_ack(conn, 1, envelope.delivery_tag, 0);
amqp_destroy_envelope(&envelope);
}
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
amqp_destroy_connection(conn);
return 0;
}
| 구문 | 설명 |
|---|---|
amqp_queue_declare | 큐가 존재하지 않으면 생성 (consumer도 선언 가능) |
amqp_basic_consume | 큐 구독(consume) 시작 |
amqp_consume_message | 블로킹 수신. 메시지가 오면 반환 |
amqp_basic_ack | 메시지 정상 처리 후 ACK 응답 |
amqp_destroy_envelope | 수신 메시지 메모리 해제 |
amqp_maybe_release_buffers | 내부 버퍼 정리(성능 및 메모리 관리용) |
이 구조 안에서:
if (convert_video(url) == 0) {
amqp_basic_ack(conn, 1, envelope.delivery_tag, 0);
} else {
amqp_basic_nack(conn, 1, envelope.delivery_tag, 0, 1); // requeue = 1
}
원하신다면, 위 코드에
✅ presigned download URL 처리
✅ ffmpeg -i ... -vn -acodec mp3 ... 실행
✅ upload 완료 후 ACK
까지 실제 ffmpegd용 consumer 버전으로 이어서 완성해드릴까요?