FIFO implementation

Seungyun Lee·4일 전

RTOS

목록 보기
12/14

1.Wrap-around

그림을 보면 지시봉(PutPt와 GetPt)이 배열의 맨 아래(노란색 칸)까지 내려갔다가, 다음 번에는 배열의 맨 위(첫 번째 칸)로 휙! 하고 꺾여서 올라가는 화살표가 보이시죠?
이것이 바로 메모리 낭비와 폭발을 막아주는 환형 큐의 핵심입니다.

수정 사항 1 & 2: PutPt와 GetPt는 한 칸씩 전진하다가 배열의 끝(FIFOSIZE)에 도달하면, 다시 인덱스 0으로 돌아가야(Wrap-around) 합니다.

2. 완벽한 FIFO를 위한 3개의 세마포어 (중간고사 핵심)

아까 우리가 배운 '택배함' 비유에서 세마포어 하나가 더 추가되었습니다. 여러 명의 택배 기사(Multiple Producers)와 여러 명의 고객(Multiple Consumers)이 동시에 접근할 때 발생하는 충돌(Race Condition)을 막기 위해서입니다.

① RoomLeft (초기값 = FIFOSIZE)

  • 의미: "택배함에 빈자리가 몇 개 남았는가?"
  • 동작: 생산자(Fifo_Put)가 물건을 넣기 전에 이 값을 하나 깎고(Wait), 소비자(Fifo_Get)가 물건을 빼가면 빈자리가 생겼으니 값을 하나 늘려줍니다(Signal).
  • 블로킹: 빈자리가 0이면 생산자는 문 밖에서 꼼짝없이 기다려야(Block) 합니다.

② CurrentSize (초기값 = 0)

  • 의미: "택배함에 들어있는 물건이 몇 개인가?"
  • 동작: 생산자가 물건을 넣으면 값을 늘려주고(Signal), 소비자가 물건을 뺄 때 값을 하나 깎습니다(Wait).
  • 블로킹: 물건이 0개면 소비자는 물건이 올 때까지 꼼짝없이 기다려야(Block) 합니다.

③ FIFOmutex (초기값 = 1) 👈 NEW!

  • 의미: "지시봉(PutPt, GetPt)을 만질 수 있는 열쇠"
  • 왜 필요한가: 만약 택배 기사가 2명인데, 동시에 같은 빈자리에 물건을 쑤셔 넣으려고 하면 대참사가 일어납니다. 그래서 지시봉을 움직이는 아주 짧은 순간(Critical Section)에는 오직 한 명만 접근할 수 있도록 상호 배제(Mutual Exclusion) 방어막을 쳐주는 것입니다.

Two-pointer three-semaphore FIFO

#define FIFOSIZE 10 // Can be any size

uint32_t volatile *PutPt;  // Pointer for next put
uint32_t volatile *GetPt;  // Pointer for next get
uint32_t static Fifo[FIFOSIZE]; // Statically allocated array

int32_t CurrentSize; // Number of elements (0 means FIFO empty)
int32_t RoomLeft;    // Number of empty spaces (0 means FIFO full)
int32_t FIFOmutex;   // Binary semaphore for exclusive access

// ******** OS_Fifo_Init ************
// Initialize FIFO pointers and semaphores
void OS_Fifo_Init(void){
    PutPt = GetPt = &Fifo[0]; // Initially empty, both point to start
    
    OS_InitSemaphore(&CurrentSize, 0);        // Initially 0 items
    OS_InitSemaphore(&RoomLeft, FIFOSIZE);    // Initially 10 empty spaces
    OS_InitSemaphore(&FIFOmutex, 1);          // Initially unlocked (1)
}

// ******** OS_Fifo_Put ************
// Put data into FIFO. Blocks if full.
void OS_Fifo_Put(uint32_t data){
    OS_Wait(&RoomLeft);       // 1. Wait for empty space
    OS_Wait(&FIFOmutex);      // 2. Lock FIFO (Critical Section)
    
    *(PutPt) = data;          // 3. Put data
    PutPt++;                  // 4. Move pointer to next slot
    
    // Wrap-around logic
    if(PutPt == &Fifo[FIFOSIZE]){
        PutPt = &Fifo[0];     // Wrap back to the beginning
    }
    
    OS_Signal(&FIFOmutex);    // 5. Unlock FIFO
    OS_Signal(&CurrentSize);  // 6. Notify that a new item is available
}

// ******** OS_Fifo_Get ************
// Get data from FIFO. Blocks if empty.
uint32_t OS_Fifo_Get(void){ 
    uint32_t data;
    
    OS_Wait(&CurrentSize);    // 1. Wait for available data
    OS_Wait(&FIFOmutex);      // 2. Lock FIFO (Critical Section)
    
    data = *(GetPt);          // 3. Get data
    GetPt++;                  // 4. Move pointer to next slot
    
    // Wrap-around logic
    if(GetPt == &Fifo[FIFOSIZE]){
        GetPt = &Fifo[0];     // Wrap back to the beginning
    }
    
    OS_Signal(&FIFOmutex);    // 5. Unlock FIFO
    OS_Signal(&RoomLeft);     // 6. Notify that a space is freed up
    
    return data;
}

이 코드에서 절대 틀리면 안 되는 2가지

이 코드는 실무에서도 그대로 쓰는 교과서적인 정답입니다. 교수님들이 이 문제를 낼 때는 학생이 코드의 모양만 외웠는지, 아니면 동작 원리를 진짜 이해했는지 걸러내기 위해 두 가지 함정을 팝니다.

  1. Wrap-around (포인터 순간이동) 조건문의 위치
    • if(PutPt == &Fifo[FIFOSIZE]) 이 부분입니다.
    • 배열의 크기가 10(FIFOSIZE)이라면, 유효한 인덱스는 0부터 9까지입니다.
    • 포인터가 9번에 데이터를 쓰고 PutPt++를 하면 주소가 10번 인덱스 위치(&Fifo[10])를 가리키게 됩니다. 이때 정확히 &Fifo[0]으로 되돌려줘야 메모리 초과 에러(Hard Fault)를 막을 수 있습니다.
  2. "FIFO 구현 시, 반드시 Resource Semaphore (RoomLeft / CurrentSize)를 먼저 Wait 하고, Mutex를 나중에 Wait 해야 Deadlock을 방지할 수 있다."
OS_Wait(&RoomLeft);    // 빈자리 확인
OS_Wait(&FIFOmutex);   // 자물쇠부터 잠금

Question

On average over the long term, what is the relationship between the
number of times Wait is called compared to the number of times
Signal is called?

  • Over the long term, Wait and Signal must be equal. Excess Signals lead to infinite values, while excess Waits cause all threads to be blocked/stalled.

On average over the long term, what is the relationship between the number of times Put is successfully called compared to the number of times Get is successfully called?

  • "Successful Put and Get calls must average to be equal. Excess Puts make the FIFO full, and excess Gets make it empty. The difference between successful Puts and Gets is strictly bounded between 0 and N."

"Producer가 ISR(Event Thread)일 때 FIFO/ Two-semaphore FIFO

1. No Blocking for Event Threads (절대 멈추면 안 됨)

이 슬라이드의 존재 이유입니다. Producer Event Thread (ISR)는 타이머 인터럽트처럼 하드웨어에 의해 잠깐 불려 나와서 순식간에 일을 끝내고 사라져야 하는 아주 예민한 녀석입니다. 만약 FIFO가 꽉 찼다고 해서 ISR 안에서 빈자리가 날 때까지 기다리며 Block 되거나 Spin 해버리면? RTOS 스케줄러 전체가 그 자리에 멈춰서 완전히 뻗어버립니다 (System crash).

2. Handling Full FIFO (데이터 유실 허용)

기다릴 수 없으니 현실적인 선택지는 하나뿐입니다. FIFO가 꽉 차서(CurrentSize == FIFOSIZE) 넣을 공간이 없다면, 새로 들어온 데이터는 과감하게 버려야 합니다. 대신 시스템이 얼마나 데이터를 잃어버렸는지 추적하기 위해 LostData 카운터를 증가시키고, Put 함수는 실패했다는 의미로 Error값(-1)을 리턴하고 곧바로 종료합니다.

3. Only ONE Producer Allowed (Mutex 사용 불가)

이전 슬라이드에서 생산자가 여러 명일 때는 포인터(PutPt)가 꼬이는 Read-Modify-Write 충돌(Race Condition)을 막기 위해 FIFOmutex가 필요하다고 배웠었죠?
하지만 Event Thread 안에서는 Mutex를 획득하려고 기다릴(Wait/Block) 수 없습니다. 방패(Mutex)를 못 쓰니 충돌을 막을 유일한 방법은 "데이터를 집어넣는 Producer를 딱 1개(단일 인터럽트)로 엄격하게 제한"하는 것뿐입니다. (대신, 데이터를 빼가는 Main Thread Consumer는 여러 명이어도 상관없습니다.)

#define FIFOSIZE 10 // Can be any size

uint32_t volatile *PutPt; // Pointer for next put
uint32_t volatile *GetPt; // Pointer for next get
uint32_t static Fifo[FIFOSIZE];

int32_t CurrentSize; // 0 means FIFO empty
int32_t FIFOmutex;   // Exclusive access to FIFO for consumers
uint32_t LostData;   // Counter for discarded data

// ******** OS_Fifo_Init ************
void OS_Fifo_Init(void){
    PutPt = GetPt = &Fifo[0];         // Initialize to Empty
    OS_InitSemaphore(&CurrentSize, 0); 
    OS_InitSemaphore(&FIFOmutex, 1);
    LostData = 0;                     // Initialize error counter
}

// ******** OS_FIFO_Put ************
// Called by Producer (Event Thread / ISR)
// Returns 0 on success, -1 if full (Non-blocking)
int OS_FIFO_Put(uint32_t data){
    // 1. Check if full WITHOUT blocking
    if(CurrentSize == FIFOSIZE){
        LostData++; // Error: Data discarded
        return -1;
    }
    
    // 2. Put data and advance pointer
    *(PutPt) = data; 
    PutPt++; 
    
    // 3. Wrap-around logic
    if(PutPt == &Fifo[FIFOSIZE]){
        PutPt = &Fifo[0]; 
    }
    
    // 4. Notify consumer
    OS_Signal(&CurrentSize);
    return 0;
}

// ******** OS_FIFO_Get ************
// Called by Consumer (Main Thread)
// Returns data, blocks if empty
uint32_t OS_FIFO_Get(void){
    uint32_t data;
    
    OS_Wait(&CurrentSize); // Block if FIFO is empty
    OS_Wait(&FIFOmutex);   // Enter Critical Section for GetPt
    
    // 1. Get data and advance pointer
    data = *(GetPt); 
    GetPt++; 
    
    // 2. Wrap-around logic
    if(GetPt == &Fifo[FIFOSIZE]){
        GetPt = &Fifo[0]; 
    }
    
    OS_Signal(&FIFOmutex); // Exit Critical Section
    return data;
}
  1. RoomLeft Semaphore가 완전히 삭제됨 (Non-blocking)
  • 이전: 꽉 차면 빈자리가 날 때까지 대기했습니다 (OS_Wait(&RoomLeft)).
  • 현재: 꽉 차면 if(CurrentSize == FIFOSIZE)로 확인한 뒤, 기다리지 않고 LostData++를 실행하며 데이터를 버립니다 (Discard).
  • Why? Producer가 ISR (Event Thread)이기 때문에 절대 Block이나 Spin을 걸어 시스템을 멈추게 하면 안 되기 때문입니다.
  1. OS_FIFO_Put 함수 내부에 FIFOmutex가 없음 (Single Producer 제한)
  • OS_FIFO_Get에는 소비자들끼리의 충돌을 막기 위해 OS_Wait(&FIFOmutex)가 여전히 존재합니다. 하지만 Put 함수에는 자물쇠가 없습니다.
  • Why? ISR 안에서는 자물쇠(Mutex)가 열리기를 기다리며 멈출 수 없습니다. 그래서 이 설계는 애초에 "Producer는 단 1개(Single Producer)의 하드웨어 인터럽트만 존재한다"는 엄격한 전제 조건하에 작동합니다. 반면 소비자(Consumer)는 Main Thread이므로 여러 개(Multiple Consumers)가 존재할 수 있어 FIFOmutex로 보호합니다.

One-semaphore FIFO implementation


3개에서 시작했던 세마포어가 2개를 거쳐 드디어 딱 1개로 줄었습니다. 시험 문제에서 "생산자가 1명(Event Thread)이고 소비자가 1명(Main Thread)일 때, 코드를 어떻게 가장 간결하게 최적화할 수 있는가?"라는 질문이 나오면 바로 이 코드가 정답입니다.

#define FIFOSIZE 10 // Size of the FIFO

uint32_t PutI; // Index for the next put
uint32_t GetI; // Index for the next get
uint32_t Fifo[FIFOSIZE]; // Statically allocated array

int32_t CurrentSize; // Number of elements (0 = empty, FIFOSIZE = full)
uint32_t LostData;   // Counter for discarded data

// ******** OS_FIFO_Init ************
void OS_FIFO_Init(void){
    PutI = GetI = 0; // Initialize indices to 0 (Empty)
    OS_InitSemaphore(&CurrentSize, 0); // Initially empty
    LostData = 0;
}

// ******** OS_FIFO_Put ************
// Called by exactly ONE Producer (Event Thread / ISR)
int OS_FIFO_Put(uint32_t data){
    if(CurrentSize == FIFOSIZE){ // Check if full (Non-blocking)
        LostData++;              // Discard data
        return -1;               // Return error
    } else {
        Fifo[PutI] = data;       // Put data at current index
        PutI = (PutI + 1) % FIFOSIZE; // Wrap-around using modulo
        OS_Signal(&CurrentSize); // Notify consumer
        return 0;                // Success
    }
}

// ******** OS_FIFO_Get ************
// Called by exactly ONE Consumer (Main Thread)
uint32_t OS_FIFO_Get(void){
    uint32_t data;
    
    OS_Wait(&CurrentSize);       // Block if empty
    
    data = Fifo[GetI];           // Get data from current index
    GetI = (GetI + 1) % FIFOSIZE; // Wrap-around using modulo
    
    return data;
}
  1. FIFOmutex의 완전한 제거 (No Critical Section)
  • "With one producer and one consumer, they access different indices (PutI vs GetI). There is no critical section regarding the pointers, so the mutex is removed."
  1. 포인터 대신 인덱스와 Modulo 연산자 (%) 사용
  • 앞선 코드에서는 포인터 주소(PutPt++)를 쓴 다음, 끝에 도달했는지 확인하려고 if 문을 썼었죠?

  • 이번엔 배열의 인덱스 번호(PutI, GetI)를 사용합니다. 그리고 프로그래밍의 마법인 나머지 연산자(Modulo, %)를 사용해 if 문을 완전히 없애버렸습니다.

  • 작동 원리: PutI가 9일 때, (9 + 1) % 10을 계산하면 나머지가 0이 됩니다. if 문 없이도 완벽하게 맨 처음 위치(인덱스 0)로 랩어라운드(Wrap-around) 하는 가장 우아한 방법입니다.

profile
RTL, FPGA Engineer

0개의 댓글