이번 장에서는 리눅스 커널의 radix tree
에 대해 살펴 볼 것이다. 리눅스의 radix tree
는 (포인터) 값에 연관되어질 수 있는 (long) 정수 키이다. 이는 저장의 측면에서 그리고 탐색에 있어서도 합리적인 효율성을 가진다. 추가적으로, 리눅스 커널에서 radix tree
는 커널 요구 사항으로부터 유래된 일부 특징(특정 엔트리의 태그들과 연관하는 능력)들을 가진다.
상단의 다이어그램은 리눅스 radix tree
의 리프 노드의 모습을 보여준다. 노드는 다수의 슬롯들을 포함하고 이들은 트리의 제작자가 관심을 가지는 것들에 대한 무언가를 가르키는 포인터를 포함한다. 이 트리는 커널 2.6.16-rc
에 넒게 분포하여 있다. 각 radix tree
노드는 64
개의 슬롯이 있다. 슬롯은 정수 키의 한 부분으로 색인되어 진다. 만일 가장 큰 키의 값이 64 이하라면, 전체 트리는 단일 노드로 표현되어 질 수 있다. 그러나 일반적으로, 더 큰 키의 세트가 사용되어 진다. 그렇지 않다면 단순한 배열이 사용되어 진다. 따라서 더 큰 트리는 아래와 같이 나타날 수 있다:
위 트리는 3 단계
의 깊이이다. 만일 커널이 특정한 키를 찾는다면, 최상단 6 비트
는 루트 노드에서 적합한 슬롯을 찾는데 사용되어 질 것이다. 그 다음 6 비트
는 중간 노드의 슬롯을 색인하는데 사용된다. 그리고 가장 최하단 6 비트
는 실제 값이 저장되어 있는 슬롯(item
)을 나타내는데 사용되어진다.
radix tree
테스트 필자는 리눅스 커널에서 제공하는 테스트 코드를 활용하여 radix tree
에 대해 간단하게 설명하려 한다. 해당 테스트 코드는 tools/testing/radix-tree/
경로에 위치해 있다.
우선 tools/testing/radix-tree/
경로에 있는 프로그램을 빌드하기 위해서는 몇 가지 필요한 라이브러리가 있다.
> user-space rcu 설치
$ git clone "git://git.liburcu.org/userspace-rcu.git"
$ cd userspace-rcu
$ ./bootstrap
$ ./configure
$ make
# make install
# dnf install userspace-rcu
# dnf install userspace-rcu-devel
> `ls` 로 아무 것도 안 나오면 아래 명령어로 `libasan 설치`
$ ls /usr/lib64 | grep libasan
# dnf install libasan
위 명령어를 입력하여 필요한 라이브러리를 설치한다. 각 행의 시작에 나타나는 #
기호는 root
권한을 의미하고, $
기호는 일반 유저 권한을 의미한다.따라서 #
으로 시작하는 명령어는 반드시 sudo
를 선행하여 입력하라.
> `radix-tree` 테스트 코드 빌드
$ cd tools/testing/radix-tree/
# make
테스트 코드 빌드가 끝났다면 ./main
를 실행하여 실행결과를 확인하라. 약간의 시간이 걸릴 수 있다. 필자의 경우 실행에 대략 1분 정도 걸렸다.
더 자세한 결과를 확인하고 싶다면 main
함수에 extern int test_verbose = 2;
라는 구문을 적으면 된다. 그럼 아래와 같이 더 디테일할 실행 결과를 확인할 수 있다:
가장 먼저 실행되는 main
함수부터 천천히 아래로 내려가면서 코드를 분석해보겠다. 코드 분량이 매우 많아 전부 다 분석하는 것은 불가능하고 radix tree
와 관련된 부분만 분석하려 한다.
main.c
코드 분석 전에 include/linux/radix-tree.h
코드부터 잠깐 보려 한다.
/* Keep unconverted code working */
#define radix_tree_root xarray
#define radix_tree_node xa_node
struct radix_tree_iter {
unsigned long index;
unsigned long next_index;
unsigned long tags;
struct radix_tree_node *node;
};
radix tree
는 기본적으로 xarray
를 통해 구현된다. radix tree
를 초기화하는 코드 역시 이전 장에서 봤던 매크로를 통해 구현된다.
#define RADIX_TREE_INIT(name, mask) XARRAY_INIT(name, mask)
#define RADIX_TREE(name, mask) \
struct radix_tree_root name = RADIX_TREE_INIT(name, mask)
#define INIT_RADIX_TREE(root, mask) xa_init_flags(root, mask)
radix_tree_init()
함수 radix_tree_init()
함수는 radix_tree_node_cachep
변수를 초기화한다.
void __init radix_tree_init(void)
{
int ret;
BUILD_BUG_ON(RADIX_TREE_MAX_TAGS + __GFP_BITS_SHIFT > 32);
BUILD_BUG_ON(ROOT_IS_IDR & ~GFP_ZONEMASK);
BUILD_BUG_ON(XA_CHUNK_SIZE > 255);
radix_tree_node_cachep = kmem_cache_create("radix_tree_node",
sizeof(struct radix_tree_node), 0,
SLAB_PANIC | SLAB_RECLAIM_ACCOUNT,
radix_tree_node_ctor);
ret = cpuhp_setup_state_nocalls(CPUHP_RADIX_DEAD, "lib/radix:dead",
NULL, radix_tree_cpu_dead);
WARN_ON(ret < 0);
}
kmem_cache_create
함수는 슬랩 캐시 객체를 생성하는 함수인데 이 내용은 그것 자체로 설명할 내용이 너무나 많다. 간단하게 "radix_tree_node"
라는 캐시 객체를 생성하고, 이는 이후 radix tree
의 노드 생성에 사용된다.
BUILD_BUG_ON
매크로는 인자로 전달된 값이 0 인지 아닌지 확인하고 0 이 아니라면(true
) 컴파일을 중단 시킨다.
xarray_tests()
함수 xarray_tests()
함수는 다시 xarray_checks()
함수를 호출하는 래핑 함수이다.
void xarray_tests(void)
{
xarray_checks();
xarray_exit();
}
static int xarray_checks(void)
{
check_xa_err(&array);
check_xas_retry(&array);
check_xa_load(&array);
check_xa_mark(&array);
check_xa_shrink(&array);
check_xas_erase(&array);
check_insert(&array);
check_cmpxchg(&array);
check_reserve(&array);
check_reserve(&xa0);
check_multi_store(&array);
check_get_order(&array);
check_xa_alloc();
check_find(&array);
check_find_entry(&array);
check_pause(&array);
check_account(&array);
check_destroy(&array);
check_move(&array);
check_create_range(&array);
check_store_range(&array);
check_store_iter(&array);
check_align(&xa0);
check_split(&array);
check_workingset(&array, 0);
check_workingset(&array, 64);
check_workingset(&array, 4096);
printk("XArray: %u of %u tests passed\n", tests_passed, tests_run);
return (tests_run == tests_passed) ? 0 : -EINVAL;
}
xarray_checks()
함수는 각종 xarray
API 함수를 호출하고 그 결과를 반환한다. xarray
를 통해 구현되는 radix tree
의 동작에 대한 무결성을 검사했으므로 우리는 radix tree
에 집중해서 그 내용을 확인하면 된다.
regressionX_test()
함수이후에 나오는 regressionX_test()
류 함수들은 모두 radix tree
의 동작을 검사하는 함수들이다. 필자는 가장 처음 나온 regression1_test()
함수만 확인하려 한다.
void regression1_test(void)
{
int nr_threads;
int i;
long arg;
/* Regression #1 */
printv(1, "running regression test 1, should finish in under a minute\n");
nr_threads = 2;
pthread_barrier_init(&worker_barrier, NULL, nr_threads);
threads = malloc(nr_threads * sizeof(pthread_t *));
for (i = 0; i < nr_threads; i++) {
arg = i;
if (pthread_create(&threads[i], NULL, regression1_fn, (void *)arg)) {
perror("pthread_create");
exit(1);
}
}
for (i = 0; i < nr_threads; i++) {
if (pthread_join(threads[i], NULL)) {
perror("pthread_join");
exit(1);
}
}
free(threads);
printv(1, "regression test 1, done\n");
}
regression1_test()
함수는 두 개의 쓰레드를 생성해서 두 쓰레드는 regression1_fn
함수로 다시 분기된다. 쓰레드와 관련되어서는 POSIX Thread
라이브러리를 사용하는데 이는 여기에서 설명할만한 내용이 아니므로 설명을 생략하겠다. pthreads
에 대해서 잘 모른다면 다음을 참고하길 바란다.
regression1_fn()
함수static void *regression1_fn(void *arg)
{
rcu_register_thread();
if (pthread_barrier_wait(&worker_barrier) ==
PTHREAD_BARRIER_SERIAL_THREAD) {
int j;
for (j = 0; j < 1000000; j++) {
struct page *p;
p = page_alloc(0);
xa_lock(&mt_tree);
radix_tree_insert(&mt_tree, 0, p);
xa_unlock(&mt_tree);
p = page_alloc(1);
xa_lock(&mt_tree);
radix_tree_insert(&mt_tree, 1, p);
xa_unlock(&mt_tree);
xa_lock(&mt_tree);
p = radix_tree_delete(&mt_tree, 1);
pthread_mutex_lock(&p->lock);
p->count--;
pthread_mutex_unlock(&p->lock);
xa_unlock(&mt_tree);
page_free(p);
xa_lock(&mt_tree);
p = radix_tree_delete(&mt_tree, 0);
pthread_mutex_lock(&p->lock);
p->count--;
pthread_mutex_unlock(&p->lock);
xa_unlock(&mt_tree);
page_free(p);
}
} else {
int j;
for (j = 0; j < 100000000; j++) {
struct page *pages[10];
find_get_pages(0, 10, pages);
}
}
rcu_unregister_thread();
return NULL;
}
regression1_fn()
함수는 배리어를 통해 두 쓰레드의 행동 방식을 갈라친다. barrier
의 serial_thread
로 결정된 쓰레드는 for
문의 몸체를 10,000,000 번 반복 수행한다.
page_alloc()
함수 page_alloc()
함수는 struct page
구조체를 동적할당하고, 해당 구조체의 멤버 변수를 초기화한다. mutex
역시 초기화한다:
struct page {
pthread_mutex_t lock;
struct rcu_head rcu;
int count;
unsigned long index;
};
static struct page *page_alloc(int index)
{
struct page *p;
p = malloc(sizeof(struct page));
p->count = 1;
p->index = index;
pthread_mutex_init(&p->lock, NULL);
return p;
}
radix_tree_insert()
함수page
를 동적할당한 이후 radix_tree_insert()
함수를 호출하여 해당 객체를 radix tree
에 삽입한다.
#define RADIX_TREE_INIT(name, mask) XARRAY_INIT(name, mask)
#define RADIX_TREE(name, mask) \
struct radix_tree_root name = RADIX_TREE_INIT(name, mask)
static RADIX_TREE(mt_tree, GFP_KERNEL);
xa_lock(&mt_tree);
radix_tree_insert(&mt_tree, 0, p);
xa_unlock(&mt_tree);
p = page_alloc(1);
xa_lock(&mt_tree);
radix_tree_insert(&mt_tree, 1, p);
xa_unlock(&mt_tree);
mt_tree
는 전역으로 선언된 radix tree
객체이다. RADIX_TREE
매크로를 통해 선언하는데 이는 최종적으로 XARRAY_INIT
의 확장이다. 따라서 mt_tree
에 대해 xa_lock
, xa_unlock
(XArray
API 함수들)을 호출하는 것은 문제가 되지 않는다.
radix_tree_delete()
함수 radix_tree_delete()
함수는 두 번째 인자로 전달된 인덱스 키에 해당하는 엔트리를 삭제한다.
xa_lock(&mt_tree);
p = radix_tree_delete(&mt_tree, 1);
pthread_mutex_lock(&p->lock);
p->count--;
pthread_mutex_unlock(&p->lock);
xa_unlock(&mt_tree);
page_free(p);
xa_lock(&mt_tree);
p = radix_tree_delete(&mt_tree, 0);
pthread_mutex_lock(&p->lock);
p->count--;
pthread_mutex_unlock(&p->lock);
xa_unlock(&mt_tree);
page_free(p);
트리에 등록한 순서는 0, 1 이고 삭제는 그 반대인 1, 0 으로 진행한다.
find_get_pages()
함수 find_get_pages()
함수는 xarray
(이는 mt_tree
의 radix tree
이다) 를 순회하며 그 결과를 pages
포인터 배열에 저장한다. 조회 중에 버그가 발생할 수 있는데 자세한 사항은 regression1.c
코드 상단에 써 있는 주석문을 읽기 바란다.
static unsigned find_get_pages(unsigned long start,
unsigned int nr_pages, struct page **pages)
{
XA_STATE(xas, &mt_tree, start);
struct page *page;
unsigned int ret = 0;
rcu_read_lock();
xas_for_each(&xas, page, ULONG_MAX) {
if (xas_retry(&xas, page))
continue;
pthread_mutex_lock(&page->lock);
if (!page->count)
goto unlock;
/* don't actually update page refcount */
pthread_mutex_unlock(&page->lock);
/* Has the page moved? */
if (unlikely(page != xas_reload(&xas)))
goto put_page;
pages[ret] = page;
ret++;
continue;
unlock:
pthread_mutex_unlock(&page->lock);
put_page:
xas_reset(&xas);
}
rcu_read_unlock();
return ret;
}
benchmark()
함수 benchmark()
함수는 benchmark_size()
함수로 확장되고, benchmark_size()
함수는 다시 benchmark_iter()
함수로 확장되어 진다.
benchmark()
함수void benchmark(void)
{
unsigned long size[] = {1 << 10, 1 << 20, 0};
unsigned long step[] = {1, 2, 7, 15, 63, 64, 65,
128, 256, 512, 12345, 0};
int c, s;
printv(1, "starting benchmarks\n");
printv(1, "RADIX_TREE_MAP_SHIFT = %d\n", RADIX_TREE_MAP_SHIFT);
for (c = 0; size[c]; c++)
for (s = 0; step[s]; s++)
benchmark_size(size[c], step[s]);
}
size
그리고 step
배열의 값을 통해 benchmark_size()
함수를 반복적으로 호출한다.
benchmark_size()
함수static void benchmark_size(unsigned long size, unsigned long step)
{
RADIX_TREE(tree, GFP_KERNEL);
long long normal, tagged;
benchmark_insert(&tree, size, step);
benchmark_tagging(&tree, size, step);
tagged = benchmark_iter(&tree, true);
normal = benchmark_iter(&tree, false);
printv(2, "Size: %8ld, step: %8ld, tagged iteration: %8lld ns\n",
size, step, tagged);
printv(2, "Size: %8ld, step: %8ld, normal iteration: %8lld ns\n",
size, step, normal);
benchmark_delete(&tree, size, step);
item_kill_tree(&tree);
rcu_barrier();
}
benchmark_size()
함수는 radix tree
를 생성하고 tree
에 엔트리를 삽입, 태그를 붙여 benchmark_iter()
함수를 호출한다.
benchmark_iter()
함수static long long benchmark_iter(struct radix_tree_root *root, bool tagged)
{
volatile unsigned long sink = 0;
struct radix_tree_iter iter;
struct timespec start, finish;
long long nsec;
int l, loops = 1;
void **slot;
#ifdef BENCHMARK
again:
#endif
clock_gettime(CLOCK_MONOTONIC, &start);
for (l = 0; l < loops; l++) {
if (tagged) {
radix_tree_for_each_tagged(slot, root, &iter, 0, 0)
sink ^= (unsigned long)slot;
} else {
radix_tree_for_each_slot(slot, root, &iter, 0)
sink ^= (unsigned long)slot;
}
}
clock_gettime(CLOCK_MONOTONIC, &finish);
nsec = (finish.tv_sec - start.tv_sec) * NSEC_PER_SEC +
(finish.tv_nsec - start.tv_nsec);
#ifdef BENCHMARK
if (loops == 1 && nsec * 5 < NSEC_PER_SEC) {
loops = NSEC_PER_SEC / nsec / 4 + 1;
goto again;
}
#endif
nsec /= loops;
return nsec;
}
benchmark_iter()
함수는 radix_tree_for_each_tagged
와 radix_tree_for_each_slot
매크로(이들은 다시 radix_tree_next_chunk()
로 확장된다) 를 통해 트리의 슬롯들을 탐색하여 걸린 시간을 출력한다.
radix tree
테스트 모듈 작성 위에서 살펴본 기본적인 API 함수들을 바탕으로 직접 radix tree
를 테스트하는 코드를 작성해보겠다.
#include <linux/xarray.h>
#include <linux/radix-tree.h>
#include <linux/module.h>
#include <linux/slab.h>
struct item {
struct rcu_head rcu_head;
unsigned long index;
unsigned int order;
};
struct item *item_create(unsigned long index, unsigned int order)
{
struct item *item = kmalloc(sizeof(*item), GFP_KERNEL);
item->index = index;
item->order = order;
return item;
}
int item_insert(struct radix_tree_root *root, unsigned long index)
{
struct item *item;
int err;
item = item_create(index, (index + 1) * 100);
err = radix_tree_insert(root, item->index, item);
if (err)
kfree(item);
return err;
}
int item_delete(struct radix_tree_root *root, unsigned long index)
{
struct item *item;
item = radix_tree_delete(root, index);
if (!item)
return 0;
kfree(item);
return 1;
}
static int __init radix_test_init(void)
{
int i;
struct item *item;
RADIX_TREE(rdx, GFP_KERNEL);
pr_info("item_test starting...\n");
for (i = 0; i < 5; i++) {
item_insert(&rdx, i);
pr_info("item_insert(): index = %d\n", i);
}
for (i = 0; i < 5; i++) {
item = radix_tree_lookup(&rdx, i);
pr_info("lookup(): index = %d, order = %d\n",
item->index, item->order);
}
for (i = 0; i < 5; i++) {
item_delete(&rdx, i);
pr_info("item_delete(): index = %d\n", i);
}
pr_info("item_test end.\n");
return 0;
}
static void __exit radix_test_exit(void)
{
pr_info("xarray_exit() end.\n");
}
module_init(radix_test_init);
module_exit(radix_test_exit);
MODULE_AUTHOR("Yeounsu Moon <yyyynoom@gmail.com>");
MODULE_LICENSE("GPL");
위 코드는 radix tree
에 struct item
구조체를 5번 동적할당하고 삽입하여 그 결과를 조회한 후 다시 삭제하는 과정을 진행하는 단순한 코드이다. 해당 코드를 컴파일하여 모듈을 삽입하고, 제거하면 실행 결과를 확인할 수 있다.
[사이트] https://liburcu.org/
[사이트] https://lwn.net/Articles/175432/
[사이트] https://github.com/google/sanitizers/wiki/AddressSanitizer
[사이트][책] 리눅스 커널 소스 해설: 기초 입문 (정재준 저)