Messenger

골덕·2025년 4월 5일
  • Messenger & Dispatcher & SharedThreadPool

구조
PG는 OSD의 여러 Shard(Thread) 중 하나에 매핑됨
PG 자신이 Shard는 아니고, OSD 내부에서 특정 Shard가 해당 PG를 담당
하나의 OSDShard는 여러 개의 PG를 관리

예를 들어:

하나의 OSD에 1000개의 PG가 있고,

osd_op_num_shards_ssd = 8 (default)로 설정되면 → 8개의 OSDShard가 생성

각 OSDShard가 약 125개씩 PG를 나눠 갖는 구조가 되는 거야.

각 OSD Shard는 자신이 관리하는 PG에 대한 meta를 pgslot이라는 구조체로 갖고 있음
PG 하나당 Object의 논리적 묶음인 collection을 갖고 있음 ( coll_t(pgid) )
코드 흐름 : ceph_osd.cc (OSD) → Messenger  
int main(int argc, const char **argv)

osdptr = new OSD(g_ceph_context,
std::move(store),
whoami,
ms_cluster,
ms_public,
ms_hb_front_client,
ms_hb_back_client,
ms_hb_front_server,
ms_hb_back_server,
ms_objecter,
&mc,
data_path,
journal_path,
poolctx);

// Determine scheduler type for this OSD

op_queue_type_t op_queue = get_op_queue_type();

아마 op_queue_type_t::mClockScheduler

// Determine op queue cutoff

unsigned op_queue_cut_off = get_op_queue_cut_off()

num_shards = get_num_op_shards();

cct->_conf->osd_op_num_shards_ssd

for (uint32_t i = 0; i < num_shards; i++) {
OSDShard *one_shard = new OSDShard(
i,
cct,
this,
op_queue,
op_queue_cut_off);
shards.push_back(one_shard);
}

int err = osdptr->pre_init();

ms_public->start();
ms_hb_front_client->start();
ms_hb_back_client->start();
ms_hb_front_server->start();
ms_hb_back_server->start();
ms_cluster->start();
ms_objecter->start();

// start osd

err = osdptr->init();

// load up pgs (as they previously existed)

load_pgs();

vector<coll_t> ls;
int r = store->list_collections(ls);

for (vector<coll_t>::iterator it = ls.begin();
it != ls.end();
++it) {

pgid 불러옴

spg_t pgid;

it->is_temp(&pgid)

pg의 epoch 확인

epoch_t map_epoch = 0;

int r = PG::peek_map_epoch(store.get(), pgid, &map_epoch);

pg 객체 생성

if (map_epoch > 0) 면

OSDMapRef pgosdmap = service.try_get_map(map_epoch);

pg = _make_pg(pgosdmap, pgid);

아니면

pg = _make_pg(get_osdmap(), pgid);

pg initialize

pg->lock();

pg->ch = store->open_collection(pg->coll);

// read pg state, log

pg->read_state(store.get());

// "PG는 OSD의 여러 Shard(Thread) 중 하나에 매핑되며, 그 때 사용하는 Index가 Shard Index이다. PG 자신이 Shard는 아니고, OSD 내부에서 특정 Shard가 해당 PG를 담당

uint32_t shard_index = pgid.hash_to_shard(shards.size());

store->set_collection_commit_queue(pg->coll, &(shards[shard_index]->context_queue));

이 코드는 PG를 어떤 Shard에 매핑하고, 해당 Shard의 context_queue (작업 큐) 를 해당 PG(Collection)의 커밋 큐로 설정

register_pg(pg);

PGSlot은 특정 PG가 어떤 OSDShard에 속해 있고,
그 PG가 어떤 epoch에 동작하고 있는지를 담는 메타 구조체
즉, OSDShard ↔ PG 연결을 담당하는 구조체

auto r = sdata->pg_slots.emplace(pgid, make_unique());

pgid를 키로 pg_slots map에 OSDShardPGSlot을 하나 생성해서 넣음

sdata->_attach_pg(slot, pg.get());

양방향 링크 설정

slot이 어떤 PG를 가리키는지

PG가 어떤 OSDShard에 속해 있는지, 어떤 slot에 연결되었는지

// i'm ready!
client_messenger->add_dispatcher_tail(&mgrc);
client_messenger->add_dispatcher_tail(this);
cluster_messenger->add_dispatcher_head(this);
hb_front_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_back_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
objecter_messenger->add_dispatcher_head(service.objecter.get());
osd_op_tp.start();

osdptr->final_init();

profile
다시 시작하는 개발자

0개의 댓글