Confluent CLI 사용해보기 (feat. 어떨 때 사용하면 좋을까)

Yunhong Min·2022년 1월 12일
0

Confluent CLI 를 사용해보고 어떤 목적으로 사용하는 것이 좋을지 판단하기 위해 테스트 해보았다

(사전 설정) Confluent Kafka Rest

* confluent cloud에서 rest endpoint를 제공해주는데 confluent cli와 함께 직접적으로 이 endpoint를 사용하는 방법은 찾지 못했다. 그래서 로컬에서 rest 서버를 띄워서 우회하여 사용하는 방법을 사용하였다.

로컬에서 confluent cloud로 confluent rest kafka를 사용하기 위해서는 Connecting REST Proxy to Confluent Cloud 의 내용대로 REST Proxy 을 실행시켜 두어야 한다.

confluent platform 다운로드 및 압축 풀기

confluent platform을 다운 받아야 하는데 Manual Install using ZIP and TAR Archives에 방법이 잘 나와있다. 이 메뉴얼을 참고하여 community componenets만 포함된 zip파일을 다운 받은 후 압축을 풀어주자.

설정 파일 생성

위에서 압축을 푼 디렉토리 내로 들어가서 아래와 같은 설정 파일을 만든다. # Kafka 아래 설정과 # Client Kafka 아래 설정은 완전 동일한 값인데 key 값 앞에 client. prefix만 붙은것이 다르다. (왜 이렇게 중복하여 값을 넣어줘야 하는지는 모르겠다.). 그 외에 producer, consumer, admin 관련 설정은 producer, consumer, admin 설정 키 앞에 producer., consumer., admin. prefix를 붙여주면 된다.

내가 사용하기 원하는 클러스터에서 생성한 API key와 API secret 을 넣어주어야 한다. confluent cloud의 endpoint url (e.g bootstrap server, rest endpoint)은 특이한 구조를 가지고 있는데, cluster 별로 endpoint url를 가지는 것이 아닌 environment가 다르더라도 end point는 같은 값을 가지고, 구분은 API key를 가지고 이루어진다.

# Kafka
bootstrap.servers=<myproject>.cloud:9092
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="<kafka-cluster-api-key>" password="<kafka-cluster-api-secret>";
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
# Client Kafka
client.bootstrap.servers=<myproject>.cloud:9092
client.security.protocol=SASL_SSL
client.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="<kafka-cluster-api-key>" password="<kafka-cluster-api-secret>";
client.ssl.endpoint.identification.algorithm=https
client.sasl.mechanism=PLAIN


# consumer only properties must be prefixed with consumer.
consumer.retry.backoff.ms=600
consumer.request.timeout.ms=25000

# producer only properties must be prefixed with producer.
producer.acks=1

# admin client only properties must be prefixed with admin.
admin.request.timeout.ms=50000

설정 파일 이름을 kafka-rest.config라 가정하자.

rest 실행

다음 명령어를 실행하면 실행이 된다. 에러는 신경쓰지 말자

./bin/kafka-rest-start kafka-rest.config
log4j:ERROR setFile(null,true) call failed.
java.io.FileNotFoundException: /kafka-rest.log (Read-only file system)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:133)
	at org.apache.log4j.FileAppender.setFile(FileAppender.java:294)

이제 http://localhost:8082 을 통해 rest proxy에 접근할 수 있다.confluent kafka partition describe 와 같이 Confluent Kafka REST를 사용해야만 하는 cli 명령어를 사용하면 된다.

Environment

List & Use

먼저 cli를 통해 사용하고 싶은 cloud environment를 먼저 선택해야 한다.

confluent environment list --o json

[
  {
    "id": "abc-ab123",
    "name": "your-env"
  }
]

사용할 environment를 고르자.

confluent environment use abc-ab123

Kafka Cluster

list

kafka cluster list 명령어를 사용하여 cluster list를 불러온다.

confluent kafka cluster list -o json
[
  {
    "availability": "single-zone",
    "id": "abc-7a5b0",
    "name": "your-cluster-standard",
    "provider": "aws",
    "region": "ap-northeast-2",
    "status": "UP",
    "type": "STANDARD"
  },
  {
    "availability": "single-zone",
    "id": "cba-a1b23",
    "name": "your-cluster-basic",
    "provider": "aws",
    "region": "ap-northeast-2",
    "status": "UP",
    "type": "BASIC"
  }
]

describe

더 자세히 살펴보고 싶은 클러스터가 있으면 kafka cluster describe 명령어를 사용하면 된다.

confluent kafka cluster describe lkc-v3g90 -o json
{
  "id": "cba-a1b23",
  "name": "your-cluster-basic",
  "type": "BASIC",
  "ingress": 100,
  "egress": 100,
  "storage": "5000",
  "provider": "aws",
  "region": "ap-northeast-2",
  "availability": "single-zone",
  "status": "UP",
  "endpoint": "SASL_SSL://abd-a12ab.ap-northeast-2.aws.confluent.cloud:9092",
  "rest_endpoint": "https://abd-a12ab.ap-northeast-2.aws.confluent.cloud:443"
}

Topic

생성

토픽 생성은 자주 사용하는 명령어이므로 한번 알아보자. cluster id가 cba-a1b23 인 cluster에 토픽을 생성하고 싶다면 confluent kafka topic create 명령어를 사용하면 된다.

confluent kafka topic create your-topic --partitions 2 --cluster cba-a1b23  --config min.insync.replicas=1,retention.ms=3600000

--dry-run flag를 사용하여 생성 테스트(실제로는 생성되지 않는다.)를 해볼수도 있다. --config 명령어를 사용하면 topic의 config를 변경할수 있다. 그러나 변경할 수 있는 설정값은 제한되어 있으며, 제한 범위는 confluent cloud에서 변경할(editable) 수 있는 값과 동일한 것 같다. 예를 들어 replication.factor=1 로 해서 설정값을 넣어도 에러는 나지 않으나 생성된 topic에는 반영이 되어 있지 않았다.

토픽 리스트

confluent kafka topic list --cluster cba-a1b23 -o json
[
  {
    "name": "your-topic"
  },
  {
    "name": "your-topic-1"
  }
 
]

cluster id의 클러스터에 있는 topic list를 보여준다. 진짜 topic list 보여준다.

토픽 설정 조회

kafka topic describe 명령어를 사용하면 topic의 설정값을 볼 수 있다. 설정값 외의 정보도 보여주면 좋은데 아쉽다.

confluent kafka topic describe your-topic --cluster cba-a1b23 -o json
{
  "topic_name": "your-topic",
  "config": {
    "cleanup.policy": "delete",
    "compression.type": "producer",
    "delete.retention.ms": "86400000",
    "file.delete.delay.ms": "60000",
    "flush.messages": "9223372036854775807",
    "flush.ms": "9223372036854775807",
    "follower.replication.throttled.replicas": "",
    "index.interval.bytes": "4096",
    "leader.replication.throttled.replicas": "",
    "max.compaction.lag.ms": "9223372036854775807",
    "max.message.bytes": "2097164",
    "message.downconversion.enable": "true",
    "message.format.version": "2.3-IV1",
    "message.timestamp.difference.max.ms": "9223372036854775807",
    "message.timestamp.type": "CreateTime",
    "min.cleanable.dirty.ratio": "0.5",
    "min.compaction.lag.ms": "0",
    "min.insync.replicas": "1",
    "num.partitions": "2",
    "preallocate": "false",
    "retention.bytes": "-1",
    "retention.ms": "3600000",
    "segment.bytes": "104857600",
    "segment.index.bytes": "10485760",
    "segment.jitter.ms": "0",
    "segment.ms": "604800000",
    "unclean.leader.election.enable": "false"
  }
}

토픽 파티션 리스트 조회

Confluent Kafka Rest 서버가 돌아가고 있어야 한다.

confluent kafka partition list --topic your-topic --url http://localhost:8082 --no-auth -o json
[
  {
    "cluster_id": "cba-a1b23",
    "leader_id": "0",
    "partition_id": "0",
    "topic_name": "your-topic"
  },
  {
    "cluster_id": "cba-a1b23",
    "leader_id": "2",
    "partition_id": "1",
    "topic_name": "your-topic"
  }
]

--no-authconfluent login을 사용하여 로그인한 상태이면 넣어준다. 그러면 추가 로그인이 필요 없다.

로컬에서 돌리고있는 kafka rest 서버의 설정값 (endpoint + API key)에 해당하는 cluster의 your-topic 의 partition에 대한 정보를 가져온다. leader_id 같은 정보만 주고 있는데, 정보가 너무 제한적이라 사용할 일은 없을것 같다.

토픽 파티션 하나 조회

Confluent Kafka Rest 서버가 돌아가고 있어야 한다.

confluent kafka partition describe 0 --topic your-topic --url http://localhost:8082 --no-auth -o json
{
  "cluster_id": "cba-a1b23",
  "topic_name": "your-topic",
  "partition_id": 0,
  "leader_id": 0
}

로컬에서 돌리고있는 kafka rest 서버의 설정값 (endpoint + API key)에 해당하는 cluster의 your-topic 의 partition 0 에 대한 정보를 가져온다. 파티션 리스트의 정보값과 동일하다. 정보가 너무 제한적이다.

결론

confluent CLI는 kafka 자체의 정보 조회에는 쓸모가 거의 없음을 확인하였다. confluent cloud의 리소스 관리시에는 설정 파일의 코드화가 가능해서 유용할 수 있으나, 그 외에는 kafka나 confluent platform에서 기본적으로 제공하는 툴 또는 서드파티 툴(kcat 등)을 사용하는 것이 더 좋을것 같다.

profile
Born as programmer

0개의 댓글