코드 위치 : https://github.com/CUBRID/cubrid
storage/double_write_buffer.c: 2820
/*
 * dwb_create () - Create DWB.
 *
 * return : Error code. NO_ERROR when the DWB already exists or was created;
 *          otherwise the error returned by dwb_starts_structure_modification ()
 *          or dwb_create_internal ().
 * thread_p (in): The thread entry.
 * dwb_path_p (in) : The double write buffer volume path.
 * db_name_p (in) : The database name.
 */
int dwb_create(THREAD_ENTRY *thread_p, const char *dwb_path_p, const char *db_name_p)
{
  UINT64 current_position_with_flags;
  int error_code = NO_ERROR;

  /* Begin the structure modification; on success the helper hands back the current position/flags word. */
  error_code = dwb_starts_structure_modification(thread_p, &current_position_with_flags);
  if (error_code != NO_ERROR)
    {
      /* The modification was never started here, so there is nothing to end. */
      dwb_log_error("Can't create DWB: error = %d\n", error_code);
      return error_code;
    }

  /* DWB structure modification started, no other transaction can modify the global position with flags */
  if (DWB_IS_CREATED(current_position_with_flags))
    {
      /* Already created, restore the modification flag. */
      goto end;
    }

  /*
   * Build the DWB volume name into the global dwb_Volume_name from the path and database name.
   * NOTE(review): the original annotation describes the result as "<dwb_path_p>/<db_name_p>_dwb"
   * (no extra '/' when the path already ends with one) -- confirm against fileio_make_dwb_name ().
   */
  fileio_make_dwb_name(dwb_Volume_name, dwb_path_p, db_name_p);

  /* Performs the actual allocation / initialization of dwb_Global and updates the position/flags word. */
  error_code = dwb_create_internal(thread_p, dwb_Volume_name, &current_position_with_flags);
  if (error_code != NO_ERROR)
    {
      dwb_log_error("Can't create DWB: error = %d\n", error_code);
    }

end:
  /* Ends the modification, allowing to others to modify global position with flags. */
  dwb_ends_structure_modification(thread_p, current_position_with_flags);
  return error_code;
}
storage/double_write_buffer.c: 806
/*
* dwb_starts_structure_modification () - Starts structure modifications.
*
* return : Error code. NO_ERROR on success; ER_FAILED when the modify-structure flag is
*          already set; otherwise the error code returned by dwb_flush_block ().
* thread_p (in): The thread entry.
* current_position_with_flags(out): The current position with flags. Written only on success.
*
* Note: This function must be called before changing structure of DWB.
*       On success the modify-structure flag stays set in dwb_Global.position_with_flags.
*/
STATIC_INLINE int
dwb_starts_structure_modification(THREAD_ENTRY *thread_p, UINT64 *current_position_with_flags)
{
UINT64 local_current_position_with_flags, new_position_with_flags, min_version;
unsigned int block_no;
int error_code = NO_ERROR;
unsigned int start_block_no, blocks_count;
DWB_BLOCK *file_sync_helper_block;
assert(current_position_with_flags != NULL);
do
{
local_current_position_with_flags = ATOMIC_INC_64(&dwb_Global.position_with_flags, 0ULL);
// Incrementing by 0 is used here to obtain the current value of dwb_Global.position_with_flags.
if (DWB_IS_MODIFYING_STRUCTURE(local_current_position_with_flags))
// The modify-structure flag is already set in the value just read.
{
/* Only one thread can change the structure */
return ER_FAILED;
// Fail instead of waiting: the structure may be changed by a single thread at a time.
}
new_position_with_flags = DWB_STARTS_MODIFYING_STRUCTURE(local_current_position_with_flags);
/* Start structure modifications, the threads that want to flush afterwards, have to wait. */
} while (!ATOMIC_CAS_64(&dwb_Global.position_with_flags, local_current_position_with_flags, new_position_with_flags));
// Compare-and-swap: the new value is installed only if the global word still equals the value read above;
// otherwise the loop body runs again with a fresh read.
// Per the original annotation, a thread that loses this race is expected to re-read, see the
// modify-structure flag set by the winner, and return ER_FAILED.
#if defined(SERVER_MODE)
while ((ATOMIC_INC_32(&dwb_Global.blocks_flush_counter, 0) > 0) || dwb_flush_block_daemon_is_running() || dwb_file_sync_helper_daemon_is_running())
// Keep polling while blocks_flush_counter is positive or either daemon reports that it is running:
// the structure cannot be changed while a flush thread may still access the DWB.
{
/* Can't modify structure while flush thread can access DWB. */
thread_sleep(20);
}
#endif
/* Since we set the modify structure flag, I'm the only thread that access the DWB. */
file_sync_helper_block = dwb_Global.file_sync_helper_block;
if (file_sync_helper_block != NULL)
{
/* All remaining blocks are flushed by me. */
(void)ATOMIC_TAS_ADDR(&dwb_Global.file_sync_helper_block, (DWB_BLOCK *)NULL);
dwb_log("Structure modification, needs to flush DWB block = %d having version %lld\n",
file_sync_helper_block->block_no, file_sync_helper_block->version);
}
local_current_position_with_flags = ATOMIC_INC_64(&dwb_Global.position_with_flags, 0ULL);
/* Need to flush incomplete blocks, ordered by version. */
start_block_no = DWB_NUM_TOTAL_BLOCKS;
min_version = 0xffffffffffffffff;
// Start from the largest 64-bit value so that any block version compares lower.
blocks_count = 0;
for (block_no = 0; block_no < DWB_NUM_TOTAL_BLOCKS; block_no++)
{
if (DWB_IS_BLOCK_WRITE_STARTED(local_current_position_with_flags, block_no))
// Count this block and remember the one with the lowest version.
{
if (dwb_Global.blocks[block_no].version < min_version)
{
min_version = dwb_Global.blocks[block_no].version;
start_block_no = block_no;
}
blocks_count++;
}
}
// NOTE(review): per the original annotation, DWB_IS_BLOCK_WRITE_STARTED (pos, block_no) expands to
// ((pos) & (1ULL << (63 - (block_no)))) != 0, i.e. it tests the block_no-th bit counted from the
// most significant bit -- confirm against the macro definition.
block_no = start_block_no;
while (blocks_count > 0)
{
if (DWB_IS_BLOCK_WRITE_STARTED(local_current_position_with_flags, block_no))
// Same test as above: only blocks whose write-started bit is set get flushed.
{
/* Flush all pages from current block. I must flush all remaining data. */
error_code =
dwb_flush_block(thread_p, &dwb_Global.blocks[block_no], false, &local_current_position_with_flags);
// Flushes this block; the local position/flags copy is passed by address to the callee.
if (error_code != NO_ERROR)
{
/* Something wrong happened. */
dwb_log_error("Can't flush block = %d having version %lld\n", block_no,
dwb_Global.blocks[block_no].version);
// NOTE(review): this path returns without clearing the modify-structure flag set by the CAS above,
// and the visible caller (dwb_create) does not end the modification on error -- confirm this is intended.
return error_code;
}
// NOTE(review): a successful flush is reported through dwb_log_error -- confirm this is intended.
dwb_log_error("DWB flushed %d block having version %lld\n", block_no, dwb_Global.blocks[block_no].version);
blocks_count--;
// One fewer block left to flush.
}
block_no = (block_no + 1) % DWB_NUM_TOTAL_BLOCKS;
// Walk forward from the lowest-version block, wrapping around to index 0 after the last block.
}
local_current_position_with_flags = ATOMIC_INC_64(&dwb_Global.position_with_flags, 0ULL);
// Re-read the global position/flags word after the flushes.
assert(DWB_GET_BLOCK_STATUS(local_current_position_with_flags) == 0);
// Debug-build check: the block status extracted from the word must be 0 at this point.
*current_position_with_flags = local_current_position_with_flags;
// Hand the final value back through the output parameter.
return NO_ERROR;
}
storage/double_write_buffer.c: 908
/*
 * dwb_ends_structure_modification () - Ends structure modifications.
 *
 * return : Nothing.
 * thread_p (in): The thread entry.
 * current_position_with_flags(in): The current position with flags; expected to equal
 *                                  dwb_Global.position_with_flags (checked by assert).
 */
STATIC_INLINE void
dwb_ends_structure_modification(THREAD_ENTRY *thread_p, UINT64 current_position_with_flags)
{
  /* Position word with the modify-structure flag cleared, since the modification is finished. */
  UINT64 position_after_modification = DWB_ENDS_MODIFYING_STRUCTURE(current_position_with_flags);

  /* The caller's value must still match the global word. */
  assert(dwb_Global.position_with_flags == current_position_with_flags);

  /* Publish the new position word. */
  ATOMIC_TAS_64(&dwb_Global.position_with_flags, position_after_modification);

  /* Signal the other threads. */
  dwb_signal_structure_modificated(thread_p);
}
storage/double_write_buffer.c: 1147
/*
 * dwb_create_internal () - Create double write buffer.
 *
 * return : Error code. NO_ERROR when the DWB was created, or when the configured size or
 *          block count is 0 (DWB not used). ER_FAILED when the DWB volume cannot be
 *          formatted; otherwise the error returned by dwb_create_blocks ().
 * thread_p (in): The thread entry.
 * dwb_volume_name (in) : The double write buffer volume name.
 * current_position_with_flags (in/out): Current position with flags. Updated only when the
 *                                       DWB is actually created.
 *
 * Note: Is user responsibility to ensure that no other transaction can access DWB structure, during creation.
 */
STATIC_INLINE int
dwb_create_internal(THREAD_ENTRY *thread_p, const char *dwb_volume_name, UINT64 *current_position_with_flags)
{
  int error_code = NO_ERROR;
  unsigned int double_write_buffer_size, num_blocks = 0;
  unsigned int i, num_pages, num_block_pages;
  unsigned int log2_num_block_pages, remaining_pages;
  int vdes = NULL_VOLDES;
  DWB_BLOCK *blocks = NULL;
  UINT64 new_position_with_flags;
  const int freelist_block_count = 2;
  const int freelist_block_size = DWB_SLOTS_FREE_LIST_SIZE;

  assert(dwb_volume_name != NULL && current_position_with_flags != NULL);

  /*
   * Configured DWB size and block count.
   * NOTE(review): the original annotation reports defaults of 2M and 2 blocks -- confirm
   * against the system parameter definitions.
   */
  double_write_buffer_size = prm_get_integer_value(PRM_ID_DWB_SIZE);
  num_blocks = prm_get_integer_value(PRM_ID_DWB_BLOCKS);
  if (double_write_buffer_size == 0 || num_blocks == 0)
    {
      /* Do not use double write buffer. */
      return NO_ERROR;
    }

  dwb_adjust_write_buffer_values(&double_write_buffer_size, &num_blocks);

  /* Total pages in the DWB and pages per block, derived from the adjusted values. */
  num_pages = double_write_buffer_size / IO_PAGESIZE;
  num_block_pages = num_pages / num_blocks;

  assert(IS_POWER_OF_2(num_blocks));
  assert(IS_POWER_OF_2(num_pages));
  assert(IS_POWER_OF_2(num_block_pages));
  assert(num_blocks <= DWB_MAX_BLOCKS);

  /* Create and open DWB volume first */
  vdes = fileio_format(thread_p, boot_db_full_name(), dwb_volume_name, LOG_DBDWB_VOLID, num_block_pages, true,
                       false, false, IO_PAGESIZE, 0, false);
  if (vdes == NULL_VOLDES)
    {
      /*
       * error_code is still NO_ERROR at this point; without setting it the caller would be
       * told that creation succeeded although no volume exists.
       */
      error_code = ER_FAILED;
      goto exit_on_error;
    }

  /* Needs to flush dirty page before activating DWB. */
  fileio_synchronize_all(thread_p, false);

  /* Create DWB blocks */
  error_code = dwb_create_blocks(thread_p, num_blocks, num_block_pages, &blocks);
  if (error_code != NO_ERROR)
    {
      goto exit_on_error;
    }

  /*
   * Integer base-2 logarithm of num_block_pages (a power of 2, see the asserts above).
   * Computed with shifts so the result never depends on floating-point rounding.
   */
  log2_num_block_pages = 0;
  for (remaining_pages = num_block_pages; remaining_pages > 1; remaining_pages >>= 1)
    {
      log2_num_block_pages++;
    }

  /* Publish the new structure in the global DWB descriptor. */
  dwb_Global.blocks = blocks;
  dwb_Global.num_blocks = num_blocks;
  dwb_Global.num_pages = num_pages;
  dwb_Global.num_block_pages = num_block_pages;
  dwb_Global.log2_num_block_pages = log2_num_block_pages;
  dwb_Global.blocks_flush_counter = 0;
  dwb_Global.next_block_to_flush = 0;
  pthread_mutex_init(&dwb_Global.mutex, NULL);
  dwb_init_wait_queue(&dwb_Global.wait_queue);
  dwb_Global.vdes = vdes;
  dwb_Global.file_sync_helper_block = NULL;
  dwb_Global.slots_hashmap.init(dwb_slots_Ts, THREAD_TS_DWB_SLOTS, DWB_SLOTS_HASH_SIZE, freelist_block_size,
                                freelist_block_count, slots_entry_Descriptor);

  /*
   * Set creation flag: reset the position part of the word, then raise the "created" flag.
   * NOTE(review): exactly which bits DWB_RESET_POSITION preserves is not visible here --
   * confirm against the macro definition.
   */
  new_position_with_flags = DWB_RESET_POSITION(*current_position_with_flags);
  new_position_with_flags = DWB_STARTS_CREATION(new_position_with_flags);
  if (!ATOMIC_CAS_64(&dwb_Global.position_with_flags, *current_position_with_flags, new_position_with_flags))
    {
      /* Impossible. */
      assert(false);
    }
  *current_position_with_flags = new_position_with_flags;

  return NO_ERROR;

exit_on_error:
  /* Undo whatever was created before the failure. */
  if (vdes != NULL_VOLDES)
    {
      fileio_dismount(thread_p, vdes);
      fileio_unformat(NULL, dwb_volume_name);
    }

  if (blocks != NULL)
    {
      for (i = 0; i < num_blocks; i++)
        {
          dwb_finalize_block(&blocks[i]);
        }
      free_and_init(blocks);
    }

  return error_code;
}
storage/double_write_buffer.c: 994
/*
 * dwb_create_blocks () - Create the blocks.
 *
 * return : Error code. NO_ERROR on success, ER_OUT_OF_VIRTUAL_MEMORY when any allocation fails.
 * thread_p (in) : The thread entry.
 * num_blocks(in): The number of blocks (at most DWB_MAX_BLOCKS).
 * num_block_pages(in): The number of block pages.
 * p_blocks(out): The created blocks; set to NULL on failure.
 *
 * Note: On failure every allocation made by this function is released before returning.
 */
STATIC_INLINE int
dwb_create_blocks(THREAD_ENTRY *thread_p, unsigned int num_blocks, unsigned int num_block_pages,
                  DWB_BLOCK **p_blocks)
{
  /* Per-block allocations, indexed by block number; unused entries stay NULL. */
  char *write_buffers[DWB_MAX_BLOCKS] = { NULL };
  DWB_SLOT *slot_arrays[DWB_MAX_BLOCKS] = { NULL };
  FLUSH_VOLUME_INFO *volume_infos[DWB_MAX_BLOCKS] = { NULL };
  DWB_BLOCK *new_blocks = NULL;
  FILEIO_PAGE *page = NULL;
  unsigned int block_buffer_size, blk, pos;
  size_t blocks_bytes, buffer_bytes, slots_bytes, infos_bytes;
  int error_code = NO_ERROR;

  assert(num_blocks <= DWB_MAX_BLOCKS);

  *p_blocks = NULL;

  /* The zero-filled array of block descriptors. */
  blocks_bytes = num_blocks * sizeof(DWB_BLOCK);
  new_blocks = (DWB_BLOCK *)malloc(blocks_bytes);
  if (new_blocks == NULL)
    {
      er_set(ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, blocks_bytes);
      error_code = ER_OUT_OF_VIRTUAL_MEMORY;
      goto exit_on_error;
    }
  memset(new_blocks, 0, blocks_bytes);

  /* One zero-filled write buffer per block, large enough for all pages of that block. */
  block_buffer_size = num_block_pages * IO_PAGESIZE;
  buffer_bytes = block_buffer_size * sizeof(char);
  for (blk = 0; blk < num_blocks; blk++)
    {
      write_buffers[blk] = (char *)malloc(buffer_bytes);
      if (write_buffers[blk] == NULL)
        {
          er_set(ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, buffer_bytes);
          error_code = ER_OUT_OF_VIRTUAL_MEMORY;
          goto exit_on_error;
        }
      memset(write_buffers[blk], 0, buffer_bytes);
    }

  /* One zero-filled slot array per block, with one slot per page of the block. */
  slots_bytes = num_block_pages * sizeof(DWB_SLOT);
  for (blk = 0; blk < num_blocks; blk++)
    {
      slot_arrays[blk] = (DWB_SLOT *)malloc(slots_bytes);
      if (slot_arrays[blk] == NULL)
        {
          er_set(ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, slots_bytes);
          error_code = ER_OUT_OF_VIRTUAL_MEMORY;
          goto exit_on_error;
        }
      memset(slot_arrays[blk], 0, slots_bytes);
    }

  /* One zero-filled FLUSH_VOLUME_INFO array per block, with one entry per page of the block. */
  infos_bytes = num_block_pages * sizeof(FLUSH_VOLUME_INFO);
  for (blk = 0; blk < num_blocks; blk++)
    {
      volume_infos[blk] = (FLUSH_VOLUME_INFO *)malloc(infos_bytes);
      if (volume_infos[blk] == NULL)
        {
          er_set(ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, infos_bytes);
          error_code = ER_OUT_OF_VIRTUAL_MEMORY;
          goto exit_on_error;
        }
      memset(volume_infos[blk], 0, infos_bytes);
    }

  /* Wire every slot to its page inside the block's write buffer, then initialize the block itself. */
  for (blk = 0; blk < num_blocks; blk++)
    {
      /* No need to initialize FILEIO_PAGE header here, since is overwritten before flushing */
      for (pos = 0; pos < num_block_pages; pos++)
        {
          /* Address of the pos-th page within this block's write buffer. */
          page = (FILEIO_PAGE *)(write_buffers[blk] + pos * IO_PAGESIZE);
          fileio_initialize_res(thread_p, page, IO_PAGESIZE);
          /* The slot receives its page, its index within the block and the block index. */
          dwb_initialize_slot(&slot_arrays[blk][pos], page, pos, blk);
        }

      /*
       * NOTE(review): the two literal 0 arguments and num_block_pages are presumably the initial
       * page count, the initial flush-volume count and the capacity of volume_infos -- confirm
       * against dwb_initialize_block ().
       */
      dwb_initialize_block(&new_blocks[blk], blk, 0, write_buffers[blk], slot_arrays[blk], volume_infos[blk], 0,
                           num_block_pages);
    }

  *p_blocks = new_blocks;

  return NO_ERROR;

exit_on_error:
  /* Release every per-block allocation made so far, then the descriptor array. */
  for (blk = 0; blk < DWB_MAX_BLOCKS; blk++)
    {
      if (slot_arrays[blk] != NULL)
        {
          free_and_init(slot_arrays[blk]);
        }
      if (write_buffers[blk] != NULL)
        {
          free_and_init(write_buffers[blk]);
        }
      if (volume_infos[blk] != NULL)
        {
          free_and_init(volume_infos[blk]);
        }
    }

  if (new_blocks != NULL)
    {
      free_and_init(new_blocks);
    }

  return error_code;
}
본 시리즈의 글들은 CUBRID DB엔진 오픈 스터디를 진행하며 팀원들과 함께 공부한 내용을 정리한 것입니다.
Github 링크