Overview
- Process
- rook cmd startOperator -> (reconcile process) -> create mon, mgr, osd-prepare(runPrepareJob), osd yaml
- [in osd-prepare yaml] ceph cmd osd provision -> osd daemon
- [in osd yaml] run ceph-osd
Rook Code
- Executing a command in cmd/rook (startOperator) leads into the actual logic in packages like pkg/operator
- cmd src
- cmd/rook/ceph/operator.go
- operator
- opcontroller
- pkg/operator/ceph/controller
- The /rook/rook command run by workloads like the OSD-Prepare Deployment below is the command defined in cmd/rook; depending on the args passed, the corresponding real logic in pkg is executed (see the sketch at the end of this section)
- e.g. rook operator yaml
- containers:
- args:
- command is not defined. If command is not defined, the ENTRYPOINT of the Rook image's Dockerfile is executed
- ENTRYPOINT ["/usr/local/bin/rook"]
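A minimal sketch of this dispatch pattern, assuming spf13/cobra (which cmd/rook uses): a single rook binary whose subcommand path, selected by the container args, decides which pkg-level logic runs. The subcommand names mirror the ones in these notes; the bodies are placeholders, not the real entrypoints.

// sketch only: "rook ceph operator" and "rook ceph osd provision" map to pkg logic
package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func main() {
	rootCmd := &cobra.Command{Use: "rook"}

	// "rook ceph operator" -> startOperator (pkg/operator in the real code)
	operatorCmd := &cobra.Command{
		Use: "operator",
		RunE: func(cmd *cobra.Command, args []string) error {
			fmt.Println("startOperator: create and run the CephCluster controller")
			return nil
		},
	}

	// "rook ceph osd provision" -> prepareOSD (pkg/daemon/ceph/osd in the real code)
	provisionCmd := &cobra.Command{
		Use: "provision",
		RunE: func(cmd *cobra.Command, args []string) error {
			fmt.Println("prepareOSD: discover devices and provision OSDs via ceph-volume")
			return nil
		},
	}

	osdCmd := &cobra.Command{Use: "osd"}
	osdCmd.AddCommand(provisionCmd)

	cephCmd := &cobra.Command{Use: "ceph"}
	cephCmd.AddCommand(operatorCmd, osdCmd)
	rootCmd.AddCommand(cephCmd)

	if err := rootCmd.Execute(); err != nil {
		os.Exit(1)
	}
}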
cmd startOperator of rook command
- As mentioned above, when the operator comes up, startOperator is executed via the rook command and proceeds in the following order
- Create and run the controller (operator) that watches and reconciles the CephCluster CR
- op := operator.New(context, rookImage, serviceAccountName)
- o.clusterController = cluster.NewClusterController(context, rookImage)
- err = op.Run()
- o.runCRDManager() // Start the CRD manager
- go o.startCRDManager(opManagerContext, mgrCRDErrorChan) // Run the operator CRD manager
- src
- pkg/operator/ceph/cr_manager.go
- mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), mgrOpts) // setting up the controller-runtime manager
- err = o.addToManager(mgr, controllerOpts, context, *o.config) // Add the registered controllers to the manager (entrypoint for controllers)
- cluster.Add(m, c.ClusterdContext, o.clusterController, opManagerContext, opconfig)
- src
- pkg/operator/ceph/cluster/controller.go
- add(opManagerContext, mgr, newReconciler(mgr, ctx, clusterController, opManagerContext, opConfig), ctx, opConfig)
- This is the familiar controller interface that sets up a watch on a specific custom resource (a minimal sketch of this registration pattern follows the addToManager listing below)
- func (r *ReconcileCephCluster) Reconcile(context context.Context, request reconcile.Request) (reconcile.Result, error)
- This is also the familiar Reconcile controller interface
- reconcileResponse, cephCluster, err := r.reconcile(request)
- err := r.clusterController.reconcileCephCluster(cephCluster, ownerInfo) // Do reconcile here!
- cluster, ok := c.clusterMap[clusterObj.Namespace]
- (or) cluster = newCluster(c.OpManagerCtx, clusterObj, c.context, ownerInfo)
- c.initializeCluster(cluster) // Start the main ceph cluster orchestration
- This is where the actual Ceph cluster initialization logic begins
- err := c.configureExternalCephCluster(cluster)
- or
- clusterInfo, _, _, err := controller.LoadClusterInfo(c.context, c.OpManagerCtx, cluster.Namespace, cluster.Spec) // check already configured existing cluster
- Checks whether rook-ceph-mgr is up to determine whether an already-configured cluster exists. If the mgr is working properly, the mons are working properly too.
- If the mgr is healthy and an already-configured cluster is confirmed to be running, it immediately runs the monitoring configuration and returns
- err = c.configureLocalCephCluster(cluster)
- err = cluster.reconcileCephDaemons(c.rookImage, *cephVersion)
- From here the Ceph components are brought up
- clusterInfo, err := c.mons.Start(c.ClusterInfo, rookImage, cephVersion, *c.Spec) // Start the mon pods
- err = mgrs.Start() // Start Ceph manager
- err = osds.Start() // Start the OSDs
- logger.Infof("done reconciling ceph cluster in namespace %q", c.Namespace)
- controller.UpdateCondition(c.OpManagerCtx, c.context, c.namespacedName, cluster.observedGeneration, cephv1.ConditionReady, v1.ConditionTrue, cephv1.ClusterCreatedReason, "Cluster created successfully")
- go cluster.reportTelemetry() // Asynchronously report the telemetry to allow another reconcile to proceed if needed
- err := csi.SaveCSIDriverOptions(c.context.Clientset, cluster.Namespace, cluster.ClusterInfo)
- c.configureCephMonitoring(cluster, cluster.ClusterInfo) // Start the monitoring if not already started
- err := mgr.Start(context) // starting the controller-runtime manager
- logger.Info("successfully started the controller-runtime manager")
// AddToManagerFuncsMaintenance is a list of functions to add all Controllers to the Manager (entrypoint for controller)
var AddToManagerFuncsMaintenance = []func(manager.Manager, *controllerconfig.Context) error{
	clusterdisruption.Add,
}

// AddToManagerFuncs is a list of functions to add all Controllers to the Manager (entrypoint for controller)
var AddToManagerFuncs = []func(manager.Manager, *clusterd.Context, context.Context, opcontroller.OperatorConfig) error{
	nodedaemon.Add,
	pool.Add,
	objectuser.Add,
	realm.Add,
	zonegroup.Add,
	zone.Add,
	object.Add,
	file.Add,
	nfs.Add,
	rbd.Add,
	client.Add,
	mirror.Add,
	Add,
	csi.Add,
	bucket.Add,
	topic.Add,
	notification.Add,
	subvolumegroup.Add,
	radosnamespace.Add,
	cosi.Add,
}

// AddToManagerOpFunc is a list of functions to add all Controllers to the Manager (entrypoint for
// controller)
// var AddToManagerOpFunc = []func(manager.Manager, *clusterd.Context, opcontroller.OperatorConfig) error{}

// AddToManager adds all the registered controllers to the passed manager.
// each controller package will have an Add method listed in AddToManagerFuncs
// which will setup all the necessary watch
func (o *Operator) addToManager(m manager.Manager, c *controllerconfig.Context, opManagerContext context.Context, opconfig opcontroller.OperatorConfig) error {
	if c == nil {
		return errors.New("nil context passed")
	}

	// Run CephCluster CR
	if err := cluster.Add(m, c.ClusterdContext, o.clusterController, opManagerContext, opconfig); err != nil {
		return err
	}

	// Add Ceph child CR controllers
	for _, f := range AddToManagerFuncs {
		if err := f(m, c.ClusterdContext, opManagerContext, *o.config); err != nil {
			return err
		}
	}

	// Add maintenance controllers
	for _, f := range AddToManagerFuncsMaintenance {
		if err := f(m, c); err != nil {
			return err
		}
	}

	return nil
}
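The Add functions listed above each wire one controller into the controller-runtime manager. Below is a minimal self-contained sketch of that pattern (not the actual Rook code; the type and function names here are illustrative), covering manager creation, registering a CephCluster reconciler, and starting the manager.

package sketch

import (
	"context"

	cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// reconcileCephClusterSketch stands in for ReconcileCephCluster.
type reconcileCephClusterSketch struct {
	client client.Client
}

// Reconcile is the standard controller-runtime entrypoint; the real
// ReconcileCephCluster.Reconcile ends up in reconcileCephCluster, which runs
// the mon/mgr/OSD orchestration walked through above.
func (r *reconcileCephClusterSketch) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	cephCluster := &cephv1.CephCluster{}
	if err := r.client.Get(ctx, req.NamespacedName, cephCluster); err != nil {
		return reconcile.Result{}, client.IgnoreNotFound(err)
	}
	// ... orchestrate mons, mgrs, and OSDs here ...
	return reconcile.Result{}, nil
}

// Add mirrors the add/newReconciler pattern: register the reconciler with the
// manager and set up the watch on the CephCluster CR.
func Add(mgr manager.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&cephv1.CephCluster{}).
		Complete(&reconcileCephClusterSketch{client: mgr.GetClient()})
}

// Run mirrors the startCRDManager flow: build the manager, add the
// controllers, and start it (Start blocks until the context is cancelled).
func Run(ctx context.Context) error {
	scheme := runtime.NewScheme()
	if err := cephv1.AddToScheme(scheme); err != nil {
		return err
	}
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), manager.Options{Scheme: scheme})
	if err != nil {
		return err
	}
	if err := Add(mgr); err != nil {
		return err
	}
	return mgr.Start(ctx)
}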
cmd startOSD - daemon start
- startOSD already takes the target blockPath as a parameter when it starts. It looks as if something runs startOSD for each item of a list obtained in the prepare stage (ceph-volume info list). Where is that logic???
- Probably on the Operator side..
- /pkg/operator/ceph/cluster/osd/osd.go
- Looking at it again now, startOSD does not seem to be used. startOSD itself apparently just ends up running ceph-osd via ceph-volume (which in turn starts as ceph-volume lvm activate ~). It may be used in other situations, but at least on the initial bring-up it is not used; the OSD Deployment's container command/args run the ceph-osd command directly
Operator osds.Start()
- OSD Deployments are launched via the Operator's osds.Start(), not via cmd's startOSD
- /pkg/operator/ceph/cluster/osd/osd.go
- Through what path does the Operator's osds.Start() get executed?
- Summarized above under cmd startOperator
- osds.Start()
- osd-prepare(provision)
- updateQueue, deployments, err := c.getOSDUpdateInfo(errs) // prepare for updating existing OSDs
- updateQueue := newUpdateQueueWithCapacity(len(deps.Items))
- existenceList := newExistenceListWithCapacity(len(deps.Items))
- id, err := getOSDID(&deps.Items[i])
- existenceList.Add(id)
- updateQueue.Push(id)
- updateConfig := c.newUpdateConfig(config, updateQueue, deployments, osdsToSkipReconcile)
- statusConfigMaps := sets.New[string]() // prepare for creating new OSDs
- pvcConfigMaps, err := c.startProvisioningOverPVCs(config, errs) // start provisioning the OSDs on PVCs, if needed
- statusConfigMaps = statusConfigMaps.Union(pvcConfigMaps)
- nodeConfigMaps, err := c.startProvisioningOverNodes(config, errs) // start provisioning the OSDs on nodes, if needed
- statusConfigMaps = statusConfigMaps.Union(nodeConfigMaps)
- createConfig := c.newCreateConfig(config, statusConfigMaps, deployments)
- osd-deployment
- createConfig := c.newCreateConfig(config, statusConfigMaps, deployments)
- err = c.updateAndCreateOSDs(createConfig, updateConfig, errs) // do the update and create operations
- doLoop, err = c.updateAndCreateOSDsLoop(createConfig, updateConfig, minuteTicker, errs)
- c.createOSDsForStatusMap(&configMapList.Items[i], createConfig, errs) // start the OSD creation process for each node in the statusConfigMap (a sketch of this flow is at the end of this section)
- // Create OSD Deployments for OSDs reported by the prepare job status configmap.
- // Do not create OSD deployments if a deployment already exists for a given OSD.
- status := parseOrchestrationStatus(configMap.Data)
- logger.Infof("OSD orchestration status for %s %s is %q", nodeOrPVC, nodeOrPVCName, status.Status)
- createConfig.createNewOSDsFromStatus(status, nodeOrPVCName, errs)
- c.deleteStatusConfigMap(nodeOrPVCName) // remove the provisioning status configmap
- c.finishedStatusConfigMaps.Has(statusConfigMapName(nodeOrPVCName))
- for _, osd := range status.OSDs {
- // skip if an osd deployment with this osd.ID already exists
- if c.deployments.Exists(osd.ID) {
- // This OSD will be handled by the updater
- logger.Debugf("not creating deployment for OSD %d which already exists", osd.ID)
- continue
- }
- logger.Infof("creating OSD %d on PVC %q", osd.ID, nodeOrPVCName)
- err := createDaemonOnPVCFunc(c.cluster, osd, nodeOrPVCName, c.provisionConfig)
- or
- logger.Infof("creating OSD %d on node %q", osd.ID, nodeOrPVCName)
- err := createDaemonOnNodeFunc(c.cluster, osd, nodeOrPVCName, c.provisionConfig)
- c.doneWithStatus(nodeOrPVCName)
- c.deleteStatusConfigMap(nodeOrPVCName) // remove the provisioning status configmap
- From here on, it keeps looping with the updateTicker and performs the final state updates of the OSDs
- // tick after a short time of waiting for new OSD provision status configmaps to change state
- // in order to allow opportunistic deployment updates while we wait
- case <-updateTicker.C:
- // do an update
- updateConfig.updateExistingOSDs(errs)
- osdIDQuery, _ := c.queue.Pop()
- osdIDs, err = cephclient.OSDOkToStop(c.cluster.context, c.cluster.clusterInfo, osdIDQuery, maxUpdatesInParallel) // OSDOkToStop returns a list of OSDs that can be stopped that includes the OSD ID given. This is relevant, for example, when checking which OSDs can be updated. The number of OSDs returned is limited by the value set in maxReturned. // maxReturned=0 is the same as maxReturned=1.
- buf, err := NewCephCommand(context, clusterInfo, args).Run()
- This completes the whole process
- delete statusConfigMaps
- c.deleteAllStatusConfigMaps()
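A small sketch of the createNewOSDsFromStatus flow referenced above: parse the OrchestrationStatus JSON reported per node/PVC and create a deployment only for OSD IDs that do not already have one. The types and JSON field names here are illustrative, not the real Rook structs.

package sketch

import (
	"encoding/json"
	"fmt"
)

// Illustrative stand-ins for the status the prepare job reports per node/PVC.
type OSDInfo struct {
	ID        int    `json:"id"`
	BlockPath string `json:"block-path"`
}

type OrchestrationStatus struct {
	OSDs   []OSDInfo `json:"osds"`
	Status string    `json:"status"` // e.g. "orchestrating", "completed"
}

// createNewOSDsFromStatus creates a deployment for every OSD reported in the
// status, skipping IDs that already have one (those go to the update queue).
func createNewOSDsFromStatus(statusJSON string, existing map[int]bool, create func(OSDInfo) error) error {
	var status OrchestrationStatus
	if err := json.Unmarshal([]byte(statusJSON), &status); err != nil {
		return fmt.Errorf("failed to parse orchestration status: %w", err)
	}
	if status.Status != "completed" {
		// still provisioning; the ticker loop will revisit this ConfigMap later
		return nil
	}
	for _, osd := range status.OSDs {
		if existing[osd.ID] {
			// an OSD deployment with this ID already exists; handled by the updater
			continue
		}
		if err := create(osd); err != nil {
			return err
		}
	}
	return nil
}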
cmd OSD-Prepare
- cmd/rook
- ceph osd provision
- prepareOSD
- agent := osddaemon.NewAgent(context, dataDevices, cfg.metadataDevice, forceFormat, cfg.storeConfig, &clusterInfo, cfg.nodeName, kv, replaceOSD, cfg.pvcBacked)
- err = osddaemon.Provision(context, agent, crushLocation, topologyAffinity, deviceFilter, metaDevice)
- osddaemon.Provision은 pkg/daemon/ceph/osd/daemon.go에 존재
- status := oposd.OrchestrationStatus{Status: oposd.OrchestrationStatusOrchestrating}
- oposd.UpdateNodeOrPVCStatus(agent.clusterInfo.Context, agent.kv, agent.nodeName, status)
- // UpdateNodeOrPVCStatus updates the status ConfigMap for the OSD on the given node or PVC. It returns the name of the ConfigMap used.
- logger.Infof("discovering hardware")
- The code in between here is what scans the local disks
- It differs depending on whether it is pvcBacked or not
- logger.Info("creating and starting the osds")
- devices, err := getAvailableDevices(context, agent)
- desiredDevices := agent.devices
- logger.Debugf("desiredDevices are %+v", desiredDevices)
- logger.Debug("context.Devices are:")
- logger.Infof("skipping device %q because it contains a filesystem %q", device.Name, device.Filesystem)
- Internally it runs a command like this (a parsing sketch is at the end of this section)
- lsblk --bytes --nodeps --pairs --paths --output SIZE,ROTA,RO,TYPE,PKNAME,NAME,KNAME,MOUNTPOINT,FSTYPE
- deviceOSDs, err := agent.configureCVDevices(context, devices)
- rawOsds, err = GetCephVolumeRawOSDs(context, a.clusterInfo, a.clusterInfo.FSID, block, "", "", false, false)
- // GetCephVolumeRawOSDs list OSD prepared with raw mode.
- result, err := callCephVolume(context, args...)
- Internally it runs a command like this
- stdbuf -oL ceph-volume --log-path /tmp/ceph-log raw list --format json
- build the cephVolumeResult
- err = createOSDBootstrapKeyring(context, a.clusterInfo, cephConfigDir)
- // Let's fail if no OSDs were configured
- if len(deviceOSDs) == 0 {
logger.Warningf("skipping OSD configuration as no devices matched the storage settings for this node %q", agent.nodeName)
status = oposd.OrchestrationStatus{OSDs: deviceOSDs, Status: oposd.OrchestrationStatusCompleted, PvcBackedOSD: agent.pvcBacked}
oposd.UpdateNodeOrPVCStatus(agent.clusterInfo.Context, agent.kv, agent.nodeName, status)
return nil
}
- // Populate CRUSH location for each OSD on the host
for i := range deviceOSDs {
deviceOSDs[i].Location = crushLocation
deviceOSDs[i].TopologyAffinity = topologyAffinity
}
- logger.Infof("devices = %+v", deviceOSDs)
- status = oposd.OrchestrationStatus{OSDs: deviceOSDs, Status: oposd.OrchestrationStatusCompleted, PvcBackedOSD: agent.pvcBacked}
- oposd.UpdateNodeOrPVCStatus(agent.clusterInfo.Context, agent.kv, agent.nodeName, status)
- In the prepare stage, the real prepare logic runs only for those scanned node devices that need new configuration (bluestore + OSD installation); however, the deviceOSDs returned by provision contain information about every existing OSD (the ceph-volume list info), and this is stored in the status ConfigMap and handed over to the OSD deployment stage
- Note that it is stored per node or per PVC!
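A small sketch of parsing the lsblk --pairs output quoted above into one key/value map per device (illustrative, not the actual Rook parser; it assumes no spaces inside the quoted values).

package sketch

import (
	"os/exec"
	"strings"
)

// listLocalDevices runs the same lsblk invocation shown above and returns one
// KEY=value map per device line, e.g. {"NAME": "/dev/sda", "FSTYPE": "", ...}.
func listLocalDevices() ([]map[string]string, error) {
	out, err := exec.Command("lsblk", "--bytes", "--nodeps", "--pairs", "--paths",
		"--output", "SIZE,ROTA,RO,TYPE,PKNAME,NAME,KNAME,MOUNTPOINT,FSTYPE").Output()
	if err != nil {
		return nil, err
	}

	var devices []map[string]string
	for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") {
		if line == "" {
			continue
		}
		props := map[string]string{}
		// each line looks like: SIZE="500107862016" ROTA="0" ... NAME="/dev/sda"
		for _, field := range strings.Fields(line) {
			if kv := strings.SplitN(field, "=", 2); len(kv) == 2 {
				props[kv[0]] = strings.Trim(kv[1], `"`)
			}
		}
		devices = append(devices, props)
	}
	return devices, nil
}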
OSD Deployment
- initContainers
- activate
- command:
- /bin/bash -c "<shell_script>"
- volumeMounts:
- mountPath: /var/lib/ceph/osd/ceph-106 (path inside the container)
name: activate-osd
- expand-bluefs
- command:
- args:
- bluefs-bdev-expand
- '--path'
- /var/lib/ceph/osd/ceph-106
- chown-container-data-dir
- command:
- args:
- '--verbose'
- '--recursive'
- ceph:ceph
- /var/log/ceph
- /var/lib/ceph/crash
- /run/ceph
- volumes
- hostPath:
- path: >-
/var/lib/rook/rook-ceph/3dc5c7e0-34ab-4e75-9e09-15d2fe08ee31_f99e9ec0-1516-4077-8f0c-da866f66e969 (host path)
type: DirectoryOrCreate
name: activate-osd
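For reference, a sketch of the same OSD pod shape built with the client-go core/v1 types. Only the pieces captured in the notes above are filled in; anything the notes elide (the init containers' commands, the real ceph-osd args, images, env) is left out or marked as elided.

package sketch

import corev1 "k8s.io/api/core/v1"

// osdPodSpecSketch shapes a pod spec like the OSD Deployment captured above
// (OSD ID 106 in the notes); osdID, cephImage, and hostDataPath are placeholders.
func osdPodSpecSketch(osdID, cephImage, hostDataPath string) corev1.PodSpec {
	hostPathType := corev1.HostPathDirectoryOrCreate
	osdDataDir := "/var/lib/ceph/osd/ceph-" + osdID // path inside the container

	return corev1.PodSpec{
		InitContainers: []corev1.Container{
			{
				Name:    "activate",
				Image:   cephImage,
				Command: []string{"/bin/bash", "-c", "<shell_script>"},
				VolumeMounts: []corev1.VolumeMount{
					{Name: "activate-osd", MountPath: osdDataDir},
				},
			},
			{
				Name:  "expand-bluefs",
				Image: cephImage,
				// command elided in the notes above; only the args were captured
				Args: []string{"bluefs-bdev-expand", "--path", osdDataDir},
			},
			{
				Name:  "chown-container-data-dir",
				Image: cephImage,
				// command elided in the notes above; only the args were captured
				Args: []string{"--verbose", "--recursive", "ceph:ceph", "/var/log/ceph", "/var/lib/ceph/crash", "/run/ceph"},
			},
		},
		Containers: []corev1.Container{
			{
				Name:  "osd",
				Image: cephImage,
				// per the startOSD note above, the main container runs ceph-osd
				// directly via its command/args (the real args are not captured here)
				Command: []string{"ceph-osd"},
			},
		},
		Volumes: []corev1.Volume{
			{
				Name: "activate-osd",
				VolumeSource: corev1.VolumeSource{
					HostPath: &corev1.HostPathVolumeSource{
						Path: hostDataPath, // host path under /var/lib/rook/<cluster>/<fsid>_<...>
						Type: &hostPathType,
					},
				},
			},
		},
	}
}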