몇 안되는 최신 카프카 설명서인 카프카 핵심 가이드를 읽으며 스터디를 하고 있었다. 그런데 155쪽에 나온 이 그림을 두고 다들 서로 다르게 이해한 채로 책을 읽고 있었던 것…!
아무리 구글링해도 저게 뭔지 안 나오길래 카프카 코드를 직접 구경하고 왔다. 결론부터 말하면, 저 사각형은 그저 프로그램 안의 클래스 인스턴스 하나다.^,^.. 하지만 상황에 따라 서버 한 대를 의미할 수도 있다..! (이건 또 무슨 소리? 자세한것은 아래로 내려보자.)
151p. 브로커가 컨트롤러가 되면, 클러스터 메타데이터 관리와 리더 선출을 시작하기 전에 먼저 주키퍼로부터 최신 레플리카 상태 맵을 읽어온다.
...
브로커가 클러스터를 나갔다는 사실을 컨트롤러가 알아차리면,
컨트롤러는 해당 브로커가 리더를 맡고 있었던 모든 파티션에 대해 새로운 브로커를 할당해주게 된다.
컨트롤러는 새로운 리더가 필요한 모든 파티션을 순회해 가면서 새로운 리더가 될 브로커를 결정한다.
그러고 나서 새로운 상태를 주키퍼에 쓴 뒤, ...
그래서 Raft 도입 이전 버전의 카프카 그림을 그려보면 이렇게 된다. 카프카 클러스터와 주키퍼 클러스터가 각각 있을 때, 카프카 클러스터의 프로세스 중 하나가 컨트롤러
라는 역할을 맡게 되고 주키퍼와의 통신을 담당한다. 그림에서 3개의 카프카 프로세스 중, 누가 파티션 리더 브로커가 될 지는 주키퍼가 결정하고 주키퍼가 저장한다.
여기서부터 조금 헷갈리기 시작한다. KRaft 모드와 그 이전 버전에서 똑같은 단어를 사용하지만 의미가 달라진다. 껍데기만 ‘컨트롤러’로 같은 것이다. 아래는 KRaft 모드에서의 컨트롤러
에 대한 설명이다.
154p. KRaft 이후부터는 주키퍼 프로세스가 제거되기 때문에 카프카 프로세스 외에 다른 프로세스는 없다.
다만 카프카 프로세스가 다음 두 개 중 적어도 하나의 역할(role)을 가지게 된다(즉, 두 역할을 겸할 수도 있다)
: 컨트롤러, 브로커.
- 컨트롤러: 메타데이터관리
- 브로커: 클라이언트 요청을 받아서 카프카 레코드 관리
Kraft 모드에서는 주키퍼가 사라진다. 따라서 주키퍼 클러스터는 없어지고, 카프카 클러스터만 그림에 남게 된다.
__cluster_metadata
라는 카프카 토픽에서 관리하게 된다.
In KRaft mode, cluster metadata, reflecting the current state of all controller managed resources, is stored in a single partition Kafka topic called __cluster_metadata.
(https://developer.confluent.io/courses/architecture/control-plane/)
주키퍼가 없어지니 자연스럽게 주키퍼와의 통신을 담당하는 역할을 의미하던 “컨트롤러 브로커”는 사라지게 된다. 대신 이제 “컨트롤러”라는 용어는 다른 의미로 사용된다. 바로 카프카 프로세스가 가질 수 있는 역할의 일종이 되는 것이다. 책에 나온 것처럼 카프카 프로세스는 컨트롤러
이거나 브로커
이거나 컨트롤러 이면서 브로커
역할을 맡게된다. 더이상 카프카 프로세스 == 브로커
라는 공식이 통하지 않게 되는 것이다.
컨트롤러
역할을 맡은 모두가 리더 선출에 관여한다.(이제 컨트롤러라고 불리는 프로세스는 더이상 1개가 아니다!) 컨트롤러 역할은 브로커 역할과 겸임할 수 있다. 예를 들어, 카프카 프로세스 5개로 구성된 클러스터가 있을 때 이런 역할을 맡을 수 있다.
컨트롤러 1, 2, 3 즉, 카프카 프로세스 1, 2, 3으로 구성된 서버 풀을 '컨트롤러 풀' 이라고 부른다.
process.roles
를 controller
라고만 설정하면 브로커 역할은 안 하고 컨트롤러 역할만 하는 노드를 만들 수 있다.process.roles
를 설정값으로 받고// KRaftConfigs.java
public class KRaftConfigs {
/** KRaft mode configs */
public static final String PROCESS_ROLES_CONFIG = "process.roles";
parseProcessRoles()
에서 위 설정값을 보고 역할을 정해준다. 브로커이거나, 컨트롤러이거나, 혹은 둘 다가 될 수 있다.// KafkaConfig.scala
class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
extends AbstractKafkaConfig(KafkaConfig.configDef, props, Utils.castToStringObjectMap(props), doLog) with Logging {
// ...
val processRoles: Set[ProcessRole] = parseProcessRoles()
// ...
private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map {
case "broker" => ProcessRole.BrokerRole
case "controller" => ProcessRole.ControllerRole
case role => throw new ConfigException(s"Unknown process role '$role'" +
" (only 'broker' and 'controller' are allowed roles)")
}
val distinctRoles: Set[ProcessRole] = roles.toSet
if (distinctRoles.size != roles.size) {
throw new ConfigException(s"Duplicate role names found in `${KRaftConfigs.PROCESS_ROLES_CONFIG}`: $roles")
}
distinctRoles
}
// KafkaConfig.scala
// ...
def requiresZookeeper: Boolean = processRoles.isEmpty // (설정값으로 아무 역할도 지정 안 했다면 KRaft 쓰는게 아닌거임!)
def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty
// Kafka.scala
private def buildServer(props: Properties): Server = {
val config = KafkaConfig.fromProps(props, doLog = false)
if (config.requiresZookeeper) {
new KafkaServer(
config,
Time.SYSTEM,
threadNamePrefix = None,
enableForwarding = enableApiForwarding(config)
)
} else {
new KafkaRaftServer(
config,
Time.SYSTEM,
)
}
}
// KafkaRaftServer.scala
override def startup(): Unit = {
Mx4jLoader.maybeLoad()
// Controller component must be started before the broker component so that
// the controller endpoints are passed to the KRaft manager
controller.foreach(_.startup())
broker.foreach(_.startup())
AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
info(KafkaBroker.STARTED_MESSAGE)
}
override def shutdown(): Unit = {
// In combined mode, we want to shut down the broker first, since the controller may be
// needed for controlled shutdown. Additionally, the controller shutdown process currently
// stops the raft client early on, which would disrupt broker shutdown.
broker.foreach(_.shutdown())
controller.foreach(_.shutdown())
CoreUtils.swallow(AppInfoParser.unregisterAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics), this)
}
사실 하위호환을 생각하면 이런식으로 구성하는게 아주 자연스럽다! 하지만 클래스 필드 하나일 것이라고는 상상도 못 한 정체… 오늘도 하나 배웠다.
+) 번외: 추가로 든 생각과 스터디원의 대답
정리 잘 해주셔서 너무 잘 읽었습니다! 차라리 과거의 카프카를 몰랏다면,, 싶은 생각도 드네요