Week 4 Day 2: Queue 송수신

학습 목표

  • xQueueSend()xQueueReceive()의 Blocking과 Timeout 동작 원리를 정확히 이해한다
  • Queue가 가득 찼거나 비어있을 때 각 API가 어떻게 반응하는지 시나리오별로 파악한다
  • pdMS_TO_TICKS() 변환과 portMAX_DELAY의 차이를 이해하고 상황에 맞게 선택한다
  • 실제 센서 데이터를 Queue로 전달하는 생산자-소비자 시스템을 구현한다

Day 16에서 Queue의 기본 구조와 생성 방법, API 개요를 학습했습니다. 이번 시간에는 xQueueSend()와 xQueueReceive()의 Blocking 동작을 중심으로 실제 시스템에서 자주 마주치는 포화(Full)와 공백(Empty) 상황의 처리 방법을 다룹니다.


1. Blocking 동작 원리

1.1 Task가 차단되는 시점

xQueueSend()는 Queue가 가득 찼을 때, xQueueReceive()는 Queue가 비어있을 때 xTicksToWait 매개변수에 지정된 시간만큼 호출한 Task를 Blocked 상태로 전환합니다. 차단된 Task는 CPU를 소비하지 않으며, FreeRTOS 스케줄러가 Queue 상태가 변할 때까지 다른 Task를 실행합니다.

[xQueueReceive() Blocking 흐름]

Consumer Task (우선순위 2)
  xQueueReceive(q, &data, pdMS_TO_TICKS(500))
       │
   Queue 비어있음?
       │ YES
       ↓
  Blocked 상태 진입 (최대 500ms 대기)
  CPU → 다른 Task (Producer 등) 실행
       │
   Producer가 xQueueSend()로 데이터 추가
       │
   FreeRTOS: Consumer를 Ready 상태로 전환
       │
   Consumer 스케줄링 → 데이터 수신 → pdTRUE 반환
       │
   500ms 내에 데이터가 오지 않으면
       ↓
  pdFALSE 반환 (타임아웃)

Blocked 상태의 Task는 CPU를 전혀 소비하지 않습니다. while(1)로 Queue를 폴링하는 구현 대신 항상 Blocking 방식을 사용하여 다른 Task에 CPU를 양보해야 합니다.

1.2 xTicksToWait 설정 기준

/*
 * xTicksToWait 옵션 비교
 *
 *  0                  : 비차단(Non-blocking), Queue 상태 무관하게 즉시 반환
 *                       ISR이 아닌 Task에서도 사용 가능
 *                       반환값으로 성공/실패 판단 후 직접 처리
 *
 *  pdMS_TO_TICKS(N)   : N 밀리초 동안 대기 후 실패 시 pdFALSE 반환
 *                       주기적 작업이 있거나 타임아웃 감지가 필요한 경우
 *
 *  portMAX_DELAY      : Queue 조건이 충족될 때까지 무한 대기
 *                       타임아웃 처리가 필요 없는 데이터 스트림 처리에 적합
 *                       반환값이 항상 pdTRUE (인터럽트나 vTaskDelete로만 종료)
 *
 * pdMS_TO_TICKS(N) 내부 동작:
 *   FreeRTOSConfig.h의 configTICK_RATE_HZ = 1000 이면 1 tick = 1ms
 *   configTICK_RATE_HZ = 100 이면 1 tick = 10ms
 *   pdMS_TO_TICKS(100) = (100 * configTICK_RATE_HZ) / 1000
 */

#include "FreeRTOS.h"
#include "queue.h"
#include <stdio.h>

static QueueHandle_t xDemoQueue;

void vNonBlockingProducer(void *pvParameters)
{
    uint32_t data = 0;

    while (1)
    {
        data++;

        /* 0: 즉시 반환 — 큐가 꽉 찼으면 데이터를 버리고 계속 진행 */
        BaseType_t result = xQueueSend(xDemoQueue, &data, 0);
        if (result != pdTRUE)
        {
            printf("[P] 큐 포화, 드롭: %lu\n", data);
        }

        vTaskDelay(pdMS_TO_TICKS(10));
    }
}

void vTimeoutConsumer(void *pvParameters)
{
    uint32_t received;

    while (1)
    {
        /* 200ms 동안 기다린 후 없으면 타임아웃 처리 */
        BaseType_t result = xQueueReceive(xDemoQueue, &received, pdMS_TO_TICKS(200));

        if (result == pdTRUE)
        {
            printf("[C] 수신: %lu\n", received);
        }
        else
        {
            /* 타임아웃: 200ms 동안 데이터 없음 → 워치독 갱신 또는 경고 */
            printf("[C] 타임아웃, 큐 비어있음\n");
        }
    }
}

2. Queue 포화(Full) 처리

2.1 포화 시나리오별 대응 전략

생산자가 소비자보다 빠를 때 Queue가 포화됩니다. 시스템의 요구사항에 따라 세 가지 전략 중 하나를 선택합니다.

전략 1: 대기 후 재전송 (손실 허용 불가)

#include "FreeRTOS.h"
#include "queue.h"
#include <stdio.h>

/*
 * 센서 경보 데이터처럼 손실이 허용되지 않는 경우
 * 최대 1초까지 대기하되, 1초 후에도 실패하면 오류 처리
 */

typedef struct
{
    uint8_t  sensor_id;
    uint16_t raw_value;
    uint8_t  alarm_level;   /* 0=정상, 1=경고, 2=위험 */
} AlarmMsg_t;

static QueueHandle_t xAlarmQueue;

void vAlarmProducerTask(void *pvParameters)
{
    AlarmMsg_t msg;
    msg.sensor_id   = 1;
    msg.alarm_level = 2;   /* 위험 경보 */

    while (1)
    {
        msg.raw_value = (uint16_t)(HAL_ADC_GetValue(&hadc1));

        /* 최대 1초 대기, 그래도 안 되면 오류 처리 */
        BaseType_t result = xQueueSend(xAlarmQueue, &msg, pdMS_TO_TICKS(1000));

        if (result != pdTRUE)
        {
            /* 큐 포화가 1초 이상 지속 → 소비자 Task 이상 의심 */
            printf("[ALARM] 큐 포화 오류! 소비자 상태 확인 필요\n");
            /* 필요 시 소비자 Task 재시작 또는 시스템 리셋 */
        }

        vTaskDelay(pdMS_TO_TICKS(100));
    }
}

전략 2: 드롭 후 통계 기록 (손실 허용, 연속 스트림)

/*
 * ADC 파형 데이터처럼 최신 값이 더 중요하고 일부 손실은 허용되는 경우
 * 큐가 꽉 찼으면 즉시 포기하고 드롭 카운터를 증가시켜 나중에 분석
 */

static uint32_t g_drop_count = 0;

void vStreamProducerTask(void *pvParameters)
{
    uint16_t sample;

    while (1)
    {
        sample = (uint16_t)(HAL_ADC_GetValue(&hadc1));

        if (xQueueSend(xStreamQueue, &sample, 0) != pdTRUE)
        {
            g_drop_count++;
        }

        vTaskDelay(pdMS_TO_TICKS(1));   /* 1kHz 샘플링 */
    }
}

/* 1초마다 드롭 통계 출력 */
void vStatsTask(void *pvParameters)
{
    uint32_t last_drop = 0;

    while (1)
    {
        vTaskDelay(pdMS_TO_TICKS(1000));

        uint32_t current_drop = g_drop_count;
        printf("[Stats] 드롭: %lu/s\n", current_drop - last_drop);
        last_drop = current_drop;
    }
}

전략 3: 가장 오래된 항목 제거 후 삽입 (오버라이트 모드)

/*
 * 가장 최신 상태만 중요한 경우(예: 표시용 센서 값)
 * FreeRTOS에는 내장 오버라이트 모드가 없으므로 직접 구현합니다.
 * Queue가 꽉 찼으면 가장 오래된 항목(FIFO 앞)을 버리고 새 항목을 삽입합니다.
 */

void send_overwrite(QueueHandle_t xQueue, const void *pvItem, UBaseType_t uxItemSize)
{
    (void)uxItemSize;

    /* 큐가 꽉 찼으면 가장 오래된 항목 제거 */
    if (uxQueueSpacesAvailable(xQueue) == 0)
    {
        uint8_t dummy[32];   /* 최대 아이템 크기만큼 임시 버퍼 */
        xQueueReceive(xQueue, dummy, 0);   /* 가장 오래된 항목 버리기 */
    }

    xQueueSend(xQueue, pvItem, 0);
}

/*
 * 주의: 위 패턴은 Task 컨텍스트에서만 안전합니다.
 * ISR에서 사용하면 xQueueReceive() 호출 자체가 금지됩니다.
 * ISR에서 오버라이트가 필요한 경우 xQueueOverwriteFromISR()을
 * 지원하는 Queue Set 또는 별도의 단일 슬롯 Queue를 사용합니다.
 */

3. Queue 공백(Empty) 처리

3.1 소비자가 너무 빠른 경우

소비자 Task가 생산자보다 빠르면 Queue가 자주 비어있는 상태가 됩니다. 이 경우 portMAX_DELAY로 대기하면 데이터가 들어올 때까지 CPU를 양보하여 전력과 처리 시간을 절약합니다.

#include "FreeRTOS.h"
#include "queue.h"
#include <stdio.h>
#include <stdint.h>

typedef struct
{
    uint8_t  channel;
    uint16_t raw;
    uint32_t timestamp_ms;
} SensorData_t;

static QueueHandle_t xSensorQueue;

/*
 * 소비자: Queue에 데이터가 올 때까지 무한 대기
 * CPU 사용률 0% (Blocked 상태) → 다른 Task가 CPU 활용
 */
void vSensorConsumerTask(void *pvParameters)
{
    SensorData_t data;

    while (1)
    {
        /*
         * portMAX_DELAY:
         *   INCLUDE_vTaskSuspend = 1 이면 실제 무한 대기
         *   INCLUDE_vTaskSuspend = 0 이면 최대 틱 값(약 49일) 대기
         */
        xQueueReceive(xSensorQueue, &data, portMAX_DELAY);

        float voltage = data.raw * 3.3f / 4095.0f;
        printf("[Consumer] ch%u %.3fV @%lums\n",
               data.channel, voltage, data.timestamp_ms);
    }
}

/*
 * 생산자: 500ms마다 Queue에 삽입
 * Consumer Task는 데이터가 없는 동안 자동으로 Blocked 상태 유지
 */
void vSensorProducerTask(void *pvParameters)
{
    SensorData_t data;
    uint8_t ch = 0;

    while (1)
    {
        data.channel      = ch;
        data.raw          = (uint16_t)(1000 + ch * 300);   /* 시뮬레이션 */
        data.timestamp_ms = xTaskGetTickCount();

        xQueueSend(xSensorQueue, &data, portMAX_DELAY);   /* 큐 여유 생길 때까지 대기 */

        ch = (ch + 1) % 4;
        vTaskDelay(pdMS_TO_TICKS(500));
    }
}

3.2 xQueuePeek()으로 확인 후 처리

/*
 * xQueuePeek(): 큐에서 데이터를 꺼내지 않고 복사만 합니다.
 *               이후 xQueueReceive()로 실제로 꺼낼 수 있습니다.
 *
 * 사용 사례:
 *   - 데이터를 처리할 수 있는 상태인지 먼저 확인 후 꺼내는 패턴
 *   - 여러 큐 중 가장 먼저 도착한 데이터 타입을 보고 처리 순서 결정
 */

void vPeekExampleTask(void *pvParameters)
{
    SensorData_t data;

    while (1)
    {
        /* 데이터 타입을 확인만 함 (큐에 남아있음) */
        if (xQueuePeek(xSensorQueue, &data, pdMS_TO_TICKS(100)) == pdTRUE)
        {
            /* 채널 0 데이터만 우선 처리, 나머지는 나중에 */
            if (data.channel == 0)
            {
                xQueueReceive(xSensorQueue, &data, 0);   /* 실제로 꺼냄 */
                printf("[Priority] ch0 처리: raw=%u\n", data.raw);
            }
            else
            {
                /* ch0이 아니면 지금 처리하지 않고 잠시 대기 */
                vTaskDelay(pdMS_TO_TICKS(10));
            }
        }
    }
}

4. 실습: 센서 데이터 Queue 전송

실습 1: 멀티 채널 센서 집계 시스템

4개 ADC 채널을 순서대로 측정하여 Queue에 보내고, 처리 Task가 채널별 이동평균을 계산합니다.

#include "FreeRTOS.h"
#include "task.h"
#include "queue.h"
#include "stm32f4xx_hal.h"
#include <stdio.h>
#include <stdint.h>

#define ADC_CH_COUNT     4
#define QUEUE_DEPTH      16
#define AVG_WINDOW       8     /* 이동평균 윈도우 크기 */

typedef struct
{
    uint8_t  channel;
    uint16_t raw;
    uint32_t timestamp_ms;
} AdcMsg_t;

static QueueHandle_t xAdcQueue;
static uint32_t      g_drop_count = 0;

/* 이동평균 필터 상태 */
static uint16_t g_avg_buf[ADC_CH_COUNT][AVG_WINDOW] = {0};
static uint8_t  g_avg_idx[ADC_CH_COUNT] = {0};

static float moving_average(uint8_t ch, uint16_t new_val)
{
    g_avg_buf[ch][g_avg_idx[ch]] = new_val;
    g_avg_idx[ch] = (g_avg_idx[ch] + 1) % AVG_WINDOW;

    uint32_t sum = 0;
    for (uint8_t i = 0; i < AVG_WINDOW; i++)
        sum += g_avg_buf[ch][i];

    return (float)sum / AVG_WINDOW;
}

/* ADC 수집 Task: 100ms마다 4채널 순차 측정 */
void vAdcCollectTask(void *pvParameters)
{
    AdcMsg_t msg;

    while (1)
    {
        for (uint8_t ch = 0; ch < ADC_CH_COUNT; ch++)
        {
            msg.channel      = ch;
            msg.raw          = (uint16_t)(500 + ch * 800 + (xTaskGetTickCount() % 100));
            msg.timestamp_ms = xTaskGetTickCount();

            if (xQueueSend(xAdcQueue, &msg, pdMS_TO_TICKS(20)) != pdTRUE)
            {
                g_drop_count++;
            }

            vTaskDelay(pdMS_TO_TICKS(25));   /* 채널 간 25ms 간격 */
        }
    }
}

/* 처리 Task: Queue에서 꺼내 이동평균 계산 */
void vAdcProcessTask(void *pvParameters)
{
    AdcMsg_t msg;

    while (1)
    {
        xQueueReceive(xAdcQueue, &msg, portMAX_DELAY);

        float avg = moving_average(msg.channel, msg.raw);
        float voltage = avg * 3.3f / 4095.0f;

        printf("[ADC] ch%u raw=%u avg=%.1f volt=%.3fV @%lums drop=%lu\n",
               msg.channel, msg.raw, avg, voltage,
               msg.timestamp_ms, g_drop_count);
    }
}

int main(void)
{
    HAL_Init();
    SystemClock_Config();

    xAdcQueue = xQueueCreate(QUEUE_DEPTH, sizeof(AdcMsg_t));
    configASSERT(xAdcQueue != NULL);

    xTaskCreate(vAdcCollectTask, "AdcCollect", 256, NULL, 3, NULL);
    xTaskCreate(vAdcProcessTask, "AdcProcess", 512, NULL, 2, NULL);

    vTaskStartScheduler();
    while (1);
}

실습 2: Queue 포화 감지 및 복구

Queue가 포화 상태에 가까워지면 생산자 속도를 줄이는 적응형 속도 제어를 구현합니다.

#include "FreeRTOS.h"
#include "task.h"
#include "queue.h"
#include <stdio.h>
#include <stdint.h>

#define QUEUE_DEPTH      12
#define THROTTLE_THRESH  8    /* 8개 이상 쌓이면 생산 속도 감소 */

static QueueHandle_t xDataQueue;

void vAdaptiveProducerTask(void *pvParameters)
{
    uint32_t count = 0;
    TickType_t delay_ms = 50;   /* 기본 50ms */

    while (1)
    {
        count++;

        UBaseType_t waiting = uxQueueMessagesWaiting(xDataQueue);

        /* 큐 상태에 따라 생산 속도 조절 */
        if (waiting >= THROTTLE_THRESH)
        {
            delay_ms = 150;   /* 속도 감소 */
            printf("[P] 쓰로틀: 큐 %u/%u, delay=%lums\n",
                   (unsigned)waiting, QUEUE_DEPTH, delay_ms);
        }
        else if (waiting < THROTTLE_THRESH / 2)
        {
            delay_ms = 50;    /* 속도 복원 */
        }

        xQueueSend(xDataQueue, &count, pdMS_TO_TICKS(10));
        vTaskDelay(pdMS_TO_TICKS(delay_ms));
    }
}

void vSlowConsumerTask(void *pvParameters)
{
    uint32_t received;

    while (1)
    {
        xQueueReceive(xDataQueue, &received, portMAX_DELAY);
        printf("[C] 수신: %lu\n", received);
        vTaskDelay(pdMS_TO_TICKS(120));   /* 소비가 느림 */
    }
}

int main(void)
{
    xDataQueue = xQueueCreate(QUEUE_DEPTH, sizeof(uint32_t));
    configASSERT(xDataQueue != NULL);

    xTaskCreate(vAdaptiveProducerTask, "Prod", 256, NULL, 2, NULL);
    xTaskCreate(vSlowConsumerTask,     "Cons", 256, NULL, 2, NULL);

    vTaskStartScheduler();
    while (1);
}

학습 정리

오늘 핵심은 xTicksToWait 매개변수가 Task의 CPU 양보 방식을 제어한다는 점입니다. 0은 비차단, pdMS_TO_TICKS(N)은 타임아웃, portMAX_DELAY는 무한 대기이며, 각 상황에 맞게 선택해야 시스템 응답성과 CPU 효율을 동시에 확보할 수 있습니다.

항목내용
xQueueSend() Blocking큐 포화 시 Task를 Blocked 상태로 전환, CPU 양보
xQueueReceive() Blocking큐 공백 시 Task를 Blocked 상태로 전환, CPU 양보
xTicksToWait = 0비차단, 즉시 반환, ISR이 아닌 Task에서도 사용 가능
pdMS_TO_TICKS(N)N ms 후 타임아웃, 주기적 작업이나 워치독 패턴에 적합
portMAX_DELAY무한 대기, 연속 데이터 스트림 소비자에 적합
큐 포화 시 대응대기/드롭/오버라이트 중 요구사항에 맞게 선택
uxQueueMessagesWaiting()현재 큐 항목 수 조회, 적응형 속도 제어에 활용

실습 과제

  1. xTicksToWait를 0, 100ms, portMAX_DELAY로 각각 설정했을 때 Producer/Consumer 속도 차이에 따라 시스템이 어떻게 반응하는지 세 가지 시나리오를 직접 빌드하고 관찰하십시오.
  2. Queue 깊이를 2로 줄이고 Producer가 10ms마다, Consumer가 50ms마다 동작하도록 설정한 뒤 드롭 카운터가 얼마나 빠르게 증가하는지 측정하십시오.
  3. 실습 2의 적응형 속도 제어 코드에 THROTTLE_THRESH를 동적으로 변경하는 설정 Task를 추가하고, UART로 값을 입력받아 실시간으로 조정하는 기능을 구현하십시오.

디버깅 팁

  1. xQueueSend()errQUEUE_FULL을 반환하는데 소비자 Task가 있다면 소비자 우선순위를 확인한다. 생산자 우선순위가 너무 높아 소비자가 실행되지 못하면 큐가 금방 포화된다.
  2. Queue 전체를 지우려면 xQueueReset()을 사용한다. 시스템 오류 복구 시 이전 데이터를 전부 버리고 새로 시작할 때 유용하다.
  3. vTaskDelay(0)은 동일 우선순위의 다른 Task에 CPU를 양보하는 최소 단위 지연이다. 소비자가 없어 큐가 포화되는 상황에서 생산자 Task 내부에 vTaskDelay(0)을 넣으면 스케줄러가 소비자를 실행할 기회를 얻는다.
  4. FreeRTOS Trace(Tracealyzer 등)를 사용하면 Queue 송수신 이벤트 타임라인을 시각적으로 확인할 수 있어 Blocking과 Timeout 동작을 직관적으로 분석할 수 있다.
profile
당신의 코딩 메이트

0개의 댓글