This post digs into how KubeKey installs a Kubernetes cluster, focusing on its main internal building blocks: Pipeline, Module, Task, and Action.
The VS Code launch.json below is convenient for stepping through the create/upgrade/delete cluster commands while following the code:
{
"version": "0.2.0",
"configurations": [
{
"name": "Create cluster(utcl)",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceRoot}/cmd/kk",
"args": ["create", "cluster", "-f", "${workspaceRoot}/bin/utcl-cluster.yaml", "-y"]
},
{
"name": "Upgrade cluster(utcl)",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceRoot}/cmd/kk",
"args": ["upgrade", "-f", "${workspaceRoot}/bin/utcl-cluster.yaml", "-y", "--skip-dependency-check"]
},
{
"name": "Delete cluster(utcl)",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceRoot}/cmd/kk",
"args": ["delete", "cluster", "-f", "${workspaceRoot}/bin/utcl-cluster.yaml", "-A", "-y"]
}
]
}
The packages fall broadly into two groups: the core packages, and the packages that implement the actual logic on top of the core (e.g., pipelines, plugins, and others).
KubeKey's subcommands are built with the cobra library; when a subcommand's Run() method is called, the corresponding pipeline function is executed. A Pipeline is composed of N Modules, a Module is composed of N Tasks, and each Task runs an Action (for remote tasks, once per target host).
As an example, let's walk through how a cluster is created.
cmd/kk/cmd/create/cluster.go
// Function called when the kubekey create cluster command is run
func NewCmdCreateCluster() *cobra.Command {
o := NewCreateClusterOptions()
cmd := &cobra.Command{
Use: "cluster",
Short: "Create a Kubernetes or KubeSphere cluster",
Run: func(cmd *cobra.Command, args []string) {
util.CheckErr(o.Complete(cmd, args))
util.CheckErr(o.Validate(cmd, args))
util.CheckErr(o.Run())
},
}
o.CommonOptions.AddCommonFlag(cmd)
o.AddFlags(cmd)
if err := completionSetting(cmd); err != nil {
panic(fmt.Sprintf("Got error with the completion setting"))
}
return cmd
}
...
// The function that actually invokes the pipeline
func (o *CreateClusterOptions) Run() error {
arg := common.Argument{
FilePath: o.ClusterCfgFile,
KubernetesVersion: o.Kubernetes,
KsEnable: o.EnableKubeSphere,
KsVersion: o.KubeSphere,
SkipPullImages: o.SkipPullImages,
SkipPushImages: o.SkipPushImages,
SecurityEnhancement: o.SecurityEnhancement,
Debug: o.CommonOptions.Verbose,
IgnoreErr: o.CommonOptions.IgnoreErr,
SkipConfirmCheck: o.CommonOptions.SkipConfirmCheck,
ContainerManager: o.ContainerManager,
Artifact: o.Artifact,
InstallPackages: o.InstallPackages,
Namespace: o.CommonOptions.Namespace,
}
// Whether to deploy the local PV provisioner
if o.localStorageChanged {
deploy := o.LocalStorage
arg.DeployLocalStorage = &deploy
}
// Call the pipeline, passing the options given after `create cluster` and the command the user configured for downloading extra binaries
return pipelines.CreateCluster(arg, o.DownloadCmd)
}
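In other words, the subcommand only translates CLI flags into a common.Argument and hands it to the pipelines package. Below is a minimal sketch of driving the same entry point from your own Go program; only the Argument field names come from the excerpt above, while the import paths and the download-command template are assumptions:

package main

import (
	"log"

	// Assumed module path for KubeKey v3; adjust to the repository's actual go.mod module.
	"github.com/kubesphere/kubekey/v3/cmd/kk/pkg/common"
	"github.com/kubesphere/kubekey/v3/cmd/kk/pkg/pipelines"
)

func main() {
	arg := common.Argument{
		FilePath:         "./utcl-cluster.yaml", // cluster config file, as passed with -f
		ContainerManager: "containerd",
		SkipConfirmCheck: true, // equivalent to -y
	}
	// The second parameter mirrors o.DownloadCmd, the template kk uses to build
	// binary download commands; the value below is only a placeholder assumption.
	if err := pipelines.CreateCluster(arg, "curl -L -o %s %s"); err != nil {
		log.Fatal(err)
	}
}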
The pipeline containing the modules that create the cluster is shown below.
Excerpt from cmd/kk/pkg/pipelines/create_cluster.go
func NewCreateClusterPipeline(runtime *common.KubeRuntime) error {
noArtifact := runtime.Arg.Artifact == ""
skipPushImages := runtime.Arg.SkipPushImages || noArtifact || (!noArtifact && runtime.Cluster.Registry.PrivateRegistry == "")
skipLocalStorage := true
if runtime.Arg.DeployLocalStorage != nil {
skipLocalStorage = !*runtime.Arg.DeployLocalStorage
} else if runtime.Cluster.KubeSphere.Enabled {
skipLocalStorage = false
}
m := []module.Module{
&precheck.GreetingsModule{},
&customscripts.CustomScriptsModule{Phase: "PreInstall", Scripts: runtime.Cluster.System.PreInstall},
&precheck.NodePreCheckModule{},
&confirm.InstallConfirmModule{},
&artifact.UnArchiveModule{Skip: noArtifact},
&os.RepositoryModule{Skip: noArtifact || !runtime.Arg.InstallPackages},
&binaries.NodeBinariesModule{},
&os.ConfigureOSModule{Skip: runtime.Cluster.System.SkipConfigureOS},
&kubernetes.StatusModule{},
&container.InstallContainerModule{},
&container.InstallCriDockerdModule{Skip: runtime.Cluster.Kubernetes.ContainerManager != "docker"},
&images.CopyImagesToRegistryModule{Skip: skipPushImages},
&images.PullModule{Skip: runtime.Arg.SkipPullImages},
&etcd.PreCheckModule{Skip: runtime.Cluster.Etcd.Type != kubekeyapiv1alpha2.KubeKey},
&etcd.CertsModule{},
&etcd.InstallETCDBinaryModule{Skip: runtime.Cluster.Etcd.Type != kubekeyapiv1alpha2.KubeKey},
&etcd.ConfigureModule{Skip: runtime.Cluster.Etcd.Type != kubekeyapiv1alpha2.KubeKey},
&etcd.BackupModule{Skip: runtime.Cluster.Etcd.Type != kubekeyapiv1alpha2.KubeKey},
&kubernetes.InstallKubeBinariesModule{},
// init kubeVip on first master
&loadbalancer.KubevipModule{Skip: !runtime.Cluster.ControlPlaneEndpoint.IsInternalLBEnabledVip()},
&kubernetes.InitKubernetesModule{},
&dns.ClusterDNSModule{},
&kubernetes.StatusModule{},
&kubernetes.JoinNodesModule{},
// deploy kubeVip on other masters
&loadbalancer.KubevipModule{Skip: !runtime.Cluster.ControlPlaneEndpoint.IsInternalLBEnabledVip()},
&loadbalancer.HaproxyModule{Skip: !runtime.Cluster.ControlPlaneEndpoint.IsInternalLBEnabled()},
&network.DeployNetworkPluginModule{},
&kubernetes.ConfigureKubernetesModule{},
&filesystem.ChownModule{},
&certs.AutoRenewCertsModule{Skip: !runtime.Cluster.Kubernetes.EnableAutoRenewCerts()},
&kubernetes.SecurityEnhancementModule{Skip: !runtime.Arg.SecurityEnhancement},
&kubernetes.SaveKubeConfigModule{},
&plugins.DeployPluginsModule{},
&addons.AddonsModule{},
&storage.DeployLocalVolumeModule{Skip: skipLocalStorage},
&kubesphere.DeployModule{Skip: !runtime.Cluster.KubeSphere.Enabled},
&kubesphere.CheckResultModule{Skip: !runtime.Cluster.KubeSphere.Enabled},
&customscripts.CustomScriptsModule{Phase: "PostInstall", Scripts: runtime.Cluster.System.PostInstall},
}
p := pipeline.Pipeline{
Name: "CreateClusterPipeline",
Modules: m,
Runtime: runtime,
}
if err := p.Start(); err != nil {
return err
}
if runtime.Cluster.KubeSphere.Enabled {
fmt.Print(`Installation is complete.
Please check the result using the command:
kubectl logs -n kubesphere-system $(kubectl get pod -n kubesphere-system -l 'app in (ks-install, ks-installer)' -o jsonpath='{.items[0].metadata.name}') -f
`)
} else {
fmt.Print(`Installation is complete.
Please check the result using the command:
kubectl get pod -A
`)
}
return nil
}
The pipeline containing the modules that upgrade the cluster is shown below.
func NewUpgradeClusterPipeline(runtime *common.KubeRuntime) error {
noArtifact := runtime.Arg.Artifact == ""
skipUpgradeETCD := (runtime.Cluster.Etcd.Type != kubekeyapiv1alpha2.KubeKey) || (runtime.Arg.EtcdUpgrade == false)
m := []module.Module{
&precheck.GreetingsModule{},
&precheck.NodePreCheckModule{},
&precheck.ClusterPreCheckModule{SkipDependencyCheck: runtime.Arg.SkipDependencyCheck},
&confirm.UpgradeConfirmModule{Skip: runtime.Arg.SkipConfirmCheck},
&artifact.UnArchiveModule{Skip: noArtifact},
&binaries.NodeBinariesModule{},
&container.InstallCriDockerdModule{Skip: runtime.Cluster.Kubernetes.ContainerManager != "docker"},
&etcd.PreCheckModule{Skip: skipUpgradeETCD},
&etcd.CertsModule{Skip: skipUpgradeETCD},
&etcd.InstallETCDBinaryModule{Skip: skipUpgradeETCD},
&etcd.ConfigureModule{Skip: skipUpgradeETCD},
&etcd.BackupModule{Skip: skipUpgradeETCD},
&kubernetes.SetUpgradePlanModule{Step: kubernetes.ToV121},
&kubernetes.ProgressiveUpgradeModule{Step: kubernetes.ToV121},
&loadbalancer.HaproxyModule{Skip: !runtime.Cluster.ControlPlaneEndpoint.IsInternalLBEnabled()},
&kubesphere.CleanClusterConfigurationModule{Skip: !runtime.Cluster.KubeSphere.Enabled},
&kubesphere.ConvertModule{Skip: !runtime.Cluster.KubeSphere.Enabled},
&kubesphere.DeployModule{Skip: !runtime.Cluster.KubeSphere.Enabled},
&kubesphere.CheckResultModule{Skip: !runtime.Cluster.KubeSphere.Enabled},
&kubernetes.SetUpgradePlanModule{Step: kubernetes.ToV122},
&kubernetes.ProgressiveUpgradeModule{Step: kubernetes.ToV122},
&filesystem.ChownModule{},
&certs.AutoRenewCertsModule{Skip: !runtime.Cluster.Kubernetes.EnableAutoRenewCerts()},
}
p := pipeline.Pipeline{
Name: "UpgradeClusterPipeline",
Modules: m,
Runtime: runtime,
}
if err := p.Start(); err != nil {
return err
}
return nil
}
Pipeline lives under the core packages and is defined by the Pipeline struct, which holds the array of Modules to execute along with an array of module.PostHookInterface hooks that can be run after each module.
cmd/kk/pkg/core/pipeline/pipeline.go
var logo = `
_ __ _ _ __
| | / / | | | | / /
| |/ / _ _| |__ ___| |/ / ___ _ _
| \| | | | '_ \ / _ \ \ / _ \ | | |
| |\ \ |_| | |_) | __/ |\ \ __/ |_| |
\_| \_/\__,_|_.__/ \___\_| \_/\___|\__, |
__/ |
|___/
`
type Pipeline struct {
Name string
Modules []module.Module
Runtime connector.Runtime
SpecHosts int
PipelineCache *cache.Cache
ModuleCachePool sync.Pool
ModulePostHooks []module.PostHookInterface
}
func (p *Pipeline) Init() error {
fmt.Print(logo)
p.PipelineCache = cache.NewCache()
p.SpecHosts = len(p.Runtime.GetAllHosts())
//if err := p.Runtime.GenerateWorkDir(); err != nil {
// return err
//}
//if err := p.Runtime.InitLogger(); err != nil {
// return err
//}
return nil
}
func (p *Pipeline) Start() error {
if err := p.Init(); err != nil {
return errors.Wrapf(err, "Pipeline[%s] execute failed", p.Name)
}
for i := range p.Modules {
m := p.Modules[i]
if m.IsSkip() {
continue
}
moduleCache := p.newModuleCache()
m.Default(p.Runtime, p.PipelineCache, moduleCache)
m.AutoAssert()
m.Init()
for j := range p.ModulePostHooks {
m.AppendPostHook(p.ModulePostHooks[j])
}
res := p.RunModule(m)
err := m.CallPostHook(res)
if res.IsFailed() {
return errors.Wrapf(res.CombineResult, "Pipeline[%s] execute failed", p.Name)
}
if err != nil {
return errors.Wrapf(err, "Pipeline[%s] execute failed", p.Name)
}
p.releaseModuleCache(moduleCache)
}
p.releasePipelineCache()
// close ssh connect
for _, host := range p.Runtime.GetAllHosts() {
p.Runtime.GetConnector().Close(host)
}
if p.SpecHosts != len(p.Runtime.GetAllHosts()) {
return errors.Errorf("Pipeline[%s] execute failed: there are some error in your spec hosts", p.Name)
}
logger.Log.Infof("Pipeline[%s] execute successfully", p.Name)
return nil
}
func (p *Pipeline) RunModule(m module.Module) *ending.ModuleResult {
m.Slogan()
result := ending.NewModuleResult()
for {
switch m.Is() {
case module.TaskModuleType:
m.Run(result)
if result.IsFailed() {
return result
}
case module.GoroutineModuleType:
go func() {
m.Run(result)
if result.IsFailed() {
os.Exit(1)
}
}()
default:
m.Run(result)
if result.IsFailed() {
return result
}
}
stop, err := m.Until()
if err != nil {
result.LocalErrResult(err)
return result
}
if stop == nil || *stop {
break
}
}
return result
}
func (p *Pipeline) newModuleCache() *cache.Cache {
moduleCache, ok := p.ModuleCachePool.Get().(*cache.Cache)
if ok {
return moduleCache
}
return cache.NewCache()
}
func (p *Pipeline) releasePipelineCache() {
p.PipelineCache.Clean()
}
func (p *Pipeline) releaseModuleCache(c *cache.Cache) {
c.Clean()
p.ModuleCachePool.Put(c)
}
The Module interface and its base implementations live in cmd/kk/pkg/core/module.
type Module interface {
IsSkip() bool // whether this module should be skipped
Default(runtime connector.Runtime, pipelineCache *cache.Cache, moduleCache *cache.Cache) // inject the runtime and the pipeline/module caches
Init() // module-specific initialization (name, description, tasks)
Is() string // module type (e.g. task module or goroutine module)
Run(result *ending.ModuleResult)
Until() (*bool, error)
Slogan()
AutoAssert()
AppendPostHook(h PostHookInterface)
CallPostHook(result *ending.ModuleResult) error
}
type BaseModule struct {
Name string
Desc string
Skip bool
ModuleCache *cache.Cache
PipelineCache *cache.Cache
Runtime connector.ModuleRuntime
PostHook []PostHookInterface
}
type BaseTaskModule struct {
BaseModule
Tasks []task.Interface
}
// Run executes the action of every task defined in Tasks.
func (b *BaseTaskModule) Run(result *ending.ModuleResult) {
for i := range b.Tasks {
t := b.Tasks[i]
t.Init(b.Runtime.(connector.Runtime), b.ModuleCache, b.PipelineCache)
logger.Log.Infof("[%s] %s", b.Name, t.GetDesc())
res := t.Execute()
for j := range res.ActionResults {
ac := res.ActionResults[j]
logger.Log.Infof("%s: [%s]", ac.Status.String(), ac.Host.GetName())
result.AppendHostResult(ac)
if _, ok := t.(*task.RemoteTask); ok {
if b.Runtime.GetIgnoreErr() {
if len(b.Runtime.GetAllHosts()) > 0 {
if ac.GetStatus() == ending.FAILED {
res.Status = ending.SUCCESS
b.Runtime.DeleteHost(ac.Host)
}
} else {
result.ErrResult(errors.Wrapf(res.CombineErr(), "Module[%s] exec failed", b.Name))
return
}
}
}
}
if res.IsFailed() {
t.ExecuteRollback()
result.ErrResult(errors.Wrapf(res.CombineErr(), "Module[%s] exec failed", b.Name))
return
}
}
result.NormalResult()
}
The Task types live in cmd/kk/pkg/core/task.
// The Task interface
type Interface interface {
GetDesc() string
Init(runtime connector.Runtime, moduleCache *cache.Cache, pipelineCache *cache.Cache)
Execute() *ending.TaskResult
ExecuteRollback()
}
// The LocalTask struct
type LocalTask struct {
Name string
Desc string
Prepare prepare.Prepare
Action action.Action
Rollback rollback.Rollback
Retry int
Delay time.Duration
Timeout time.Duration
PipelineCache *cache.Cache
ModuleCache *cache.Cache
Runtime connector.Runtime
tag string
IgnoreError bool
TaskResult *ending.TaskResult
}
// The RemoteTask struct
type RemoteTask struct {
Name string
Desc string
Hosts []connector.Host
Prepare prepare.Prepare
Action action.Action
Rollback rollback.Rollback
Parallel bool
Retry int
Delay time.Duration
Timeout time.Duration
Concurrency float64
PipelineCache *cache.Cache
ModuleCache *cache.Cache
Runtime connector.Runtime
IgnoreError bool
TaskResult *ending.TaskResult
}
// Execute (abridged): only the per-host dispatch loop is shown. The preparation of
// ctx (from t.Timeout), wg, routinePool (the concurrency limiter used in RunWithTimeout)
// and selfRuntime, as well as waiting for every host and returning t.TaskResult,
// is omitted from this excerpt.
func (t *RemoteTask) Execute() *ending.TaskResult {
for i := range t.Hosts {
if t.Parallel {
go t.RunWithTimeout(ctx, selfRuntime, t.Hosts[i], i, wg, routinePool)
} else {
t.RunWithTimeout(ctx, selfRuntime, t.Hosts[i], i, wg, routinePool)
}
}
// ... wait for all hosts, then return t.TaskResult
}
func (t *RemoteTask) RunWithTimeout(ctx context.Context, runtime connector.Runtime, host connector.Host, index int,
wg *sync.WaitGroup, pool chan struct{}) {
pool <- struct{}{}
resCh := make(chan error)
go t.Run(runtime, host, index, resCh)
select {
case <-ctx.Done():
t.TaskResult.AppendErr(host, fmt.Errorf("execute task timeout, Timeout=%s", util.ShortDur(t.Timeout)))
case e := <-resCh:
if e != nil {
t.TaskResult.AppendErr(host, e)
}
}
<-pool
wg.Done()
}
// Run (abridged): executes the action with retries and reports the error (or nil)
// back to RunWithTimeout over resCh; the declaration of res and the deferred send
// on resCh are omitted from this excerpt.
func (t *RemoteTask) Run(runtime connector.Runtime, host connector.Host, index int, resCh chan error) {
if err := t.ExecuteWithRetry(runtime); err != nil {
res = err
return
}
}
// ExecuteWithRetry runs the task's Action, retrying up to t.Retry times with t.Delay between attempts.
func (t *RemoteTask) ExecuteWithRetry(runtime connector.Runtime) error {
err := fmt.Errorf("[%s] exec failed after %d retries: ", t.Name, t.Retry)
for i := 0; i < t.Retry; i++ {
e := t.Action.Execute(runtime) // Action.Execute() 함수 실행
if e != nil {
logger.Log.Messagef(runtime.RemoteHost().GetName(), e.Error())
if i == t.Retry-1 {
err = errors.New(err.Error() + e.Error())
continue
}
logger.Log.Infof("retry: [%s]", runtime.GetRunner().Host.GetName())
time.Sleep(t.Delay)
continue
} else {
err = nil
break
}
}
return err
}
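Two standard Go patterns are at work in RunWithTimeout above: a buffered channel used as a semaphore to cap how many hosts are processed at once (routinePool), and a select that races the per-host work against the context deadline. The same mechanics in a self-contained sketch, independent of the KubeKey types:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runOnHost stands in for RemoteTask.Run: it does the work for one host
// and reports the outcome on resCh.
func runOnHost(host string, resCh chan error) {
	time.Sleep(100 * time.Millisecond) // simulated remote work
	resCh <- nil
}

// runWithTimeout mirrors RunWithTimeout: acquire a slot in the pool,
// run the work in a goroutine, and race it against the context deadline.
func runWithTimeout(ctx context.Context, host string, wg *sync.WaitGroup, pool chan struct{}) {
	defer wg.Done()
	pool <- struct{}{}        // acquire a concurrency slot
	defer func() { <-pool }() // release it

	resCh := make(chan error, 1)
	go runOnHost(host, resCh)

	select {
	case <-ctx.Done():
		fmt.Printf("%s: timed out\n", host)
	case err := <-resCh:
		fmt.Printf("%s: done, err=%v\n", host, err)
	}
}

func main() {
	hosts := []string{"master1", "worker1", "worker2"}
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	pool := make(chan struct{}, 2) // at most 2 hosts in flight, like routinePool
	wg := &sync.WaitGroup{}
	for _, h := range hosts {
		wg.Add(1)
		go runWithTimeout(ctx, h, wg, pool)
	}
	wg.Wait()
}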
The Action types live in cmd/kk/pkg/core/action.
// The Action interface
type Action interface {
Execute(runtime connector.Runtime) (err error)
Init(cache *cache.Cache, rootCache *cache.Cache)
AutoAssert(runtime connector.Runtime)
}
// The BaseAction struct
type BaseAction struct {
ModuleCache *cache.Cache
PipelineCache *cache.Cache
}
func (b *BaseAction) Init(moduleCache *cache.Cache, pipelineCache *cache.Cache) {
b.ModuleCache = moduleCache
b.PipelineCache = pipelineCache
}
func (b *BaseAction) Execute(runtime connector.Runtime) error {
return nil
}
func (b *BaseAction) AutoAssert(runtime connector.Runtime) {
}
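A concrete action only needs to embed BaseAction (inheriting Init and the no-op defaults) and implement Execute. The sketch below is hypothetical, not code from the repository: the type and command are invented, the import paths assume the github.com/kubesphere/kubekey/v3 module, and the runner call SudoCmd(cmd, printOutput) is an assumption based on its use elsewhere in the code base.

package sample

import (
	"github.com/pkg/errors"

	"github.com/kubesphere/kubekey/v3/cmd/kk/pkg/core/action"
	"github.com/kubesphere/kubekey/v3/cmd/kk/pkg/core/connector"
)

// EnableKubelet is a hypothetical action that enables kubelet on the remote host.
type EnableKubelet struct {
	action.BaseAction
}

func (e *EnableKubelet) Execute(runtime connector.Runtime) error {
	// Assumption: the runner exposes SudoCmd(cmd string, printOutput bool) (string, error).
	if _, err := runtime.GetRunner().SudoCmd("systemctl enable --now kubelet", false); err != nil {
		return errors.Wrap(err, "failed to enable kubelet")
	}
	return nil
}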
Kubernetes-related modules generally embed the KubeModule struct, and their actions embed KubeAction.
cmd/kk/pkg/common/kube_module.go
type KubeConf struct {
ClusterHosts []string
ClusterName string
Cluster *kubekeyapiv1alpha2.ClusterSpec
Kubeconfig string
Arg Argument
}
type KubeModule struct {
module.BaseTaskModule
KubeConf *KubeConf
}
cmd/kk/pkg/common/kube_action.go
type KubeAction struct {
action.BaseAction
KubeConf *KubeConf
}
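Putting the pieces together: a concrete module embeds common.KubeModule (and therefore BaseTaskModule and BaseModule), and its Init() fills in Name, Desc, and Tasks, with each RemoteTask pointing at an action such as the one sketched above. A hypothetical example; the names are invented, but every field used below appears in the structs quoted earlier:

package sample

import (
	"github.com/kubesphere/kubekey/v3/cmd/kk/pkg/common"
	"github.com/kubesphere/kubekey/v3/cmd/kk/pkg/core/task"
)

// EnableKubeletModule is a hypothetical module; Skip, the caches and Runtime come
// from the embedded BaseModule, Tasks from the embedded BaseTaskModule.
type EnableKubeletModule struct {
	common.KubeModule
}

func (m *EnableKubeletModule) Init() {
	m.Name = "EnableKubeletModule"
	m.Desc = "Enable kubelet on all nodes"
	// Cluster-wide settings are available through m.KubeConf (see KubeConf above).

	enable := &task.RemoteTask{
		Name:     "EnableKubelet",
		Desc:     "Enable and start kubelet",
		Hosts:    m.Runtime.GetAllHosts(), // real modules usually filter hosts by role
		Action:   new(EnableKubelet),      // the hypothetical action from the previous sketch
		Parallel: true,
		Retry:    2,
	}

	m.Tasks = []task.Interface{enable}
}

Most of the modules listed in the create and upgrade pipelines above follow this shape.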
// GetArgs folds user-supplied "key=value" strings into argsMap, then appends every
// map entry back onto args as "key=value" and returns the sorted slice plus the map.
func GetArgs(argsMap map[string]string, args []string) ([]string, map[string]string) {
for _, arg := range args {
splitArg := strings.SplitN(arg, "=", 2)
if len(splitArg) < 2 {
continue
}
argsMap[splitArg[0]] = splitArg[1]
}
for arg, value := range argsMap {
cmd := fmt.Sprintf("%s=%s", arg, value)
args = append(args, cmd)
}
sort.Strings(args)
return args, argsMap
}
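The variant below does the same thing but first copies the incoming map and slice, so the caller's argsMap and args are not modified in place: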
func GetArgs(argsMap map[string]string, args []string) ([]string, map[string]string) {
targetMap := make(map[string]string, len(argsMap))
for k, v := range argsMap {
targetMap[k] = v
}
targetSlice := make([]string, len(args))
copy(targetSlice, args)
for _, arg := range targetSlice {
splitArg := strings.SplitN(arg, "=", 2)
if len(splitArg) < 2 {
continue
}
targetMap[splitArg[0]] = splitArg[1]
}
for arg, value := range targetMap {
cmd := fmt.Sprintf("%s=%s", arg, value)
targetSlice = append(targetSlice, cmd)
}
sort.Strings(targetSlice)
return targetSlice, targetMap
}
For reference, unpacking an offline artifact and recording its MD5 checksum by hand looks like this:
tar -zxvf artifact-v3.1.1.tar.gz -C kubesphere/kubekey
openssl dgst -md5 artifact-v3.1.1.tar.gz | cut -d '=' -f2 | tr -d ' ' | tr -d '\12' > kubesphere/artifact.md5