Connection to node 1001 (/127.0.0.1:9092) could not be established. Broker may not be available.
2022-07-02 10:15:06.672 INFO 23090 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-foo-1, groupId=foo] Node 1001 disconnected.
2022-07-02 10:15:06.674 WARN 23090 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-foo-1, groupId=foo] Node 1001 disconnected.
/opt/kafka/config/server.properties
에 다음 listeners 변수의 주석을 해제한다.
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092 ## <<
만든 토픽을 삭제하고 싶어 다음과 같은 명령어를 실행하면 토픽이 정상적으로 삭제되지 않고 리스트로 조회할 때 already marked for deletion
와 같은 문구가 붙어나오는 것을 볼 수 있다.
$ ./kafka-topics.sh —delete —zookeeper zookeeper:2181 —topic exam-topic
Topic exam-topic is already marked for deletion.
$ ./kafka-topics.sh --list —zookeeper zookeeper
__consumer_offsets
exam-topic - marked for deletion
quickstart-events
이는 server의 delete.topic.enable = true를 설정하지 않았기 때문에 나타나는 문제로 이 경우 다시 토픽을 생성할 수도 없고 해당 토픽으로 들어온 메세지도 처리가 되지 않는다.
때문에 server.properties에 해당 설정을 해주고 재기동해야한다.
/opt/kafka/config/server.properties
delete.topic.enable = true
zookeeper에 붙어서 처리하기
$ cd /kafka/bin
$ ./zookeeper-shell.sh zookeeper:2181
ls /brokers/topics # 존재하는 토픽 조회
[__consumer_offsets, exam-topic, quckstart-events]
deleteall /brokers/topics/exam-topic
참고로 rmr은 deprecated 되었으니 deleteall을 쓰자
다음은 Error는 아니지만 신경쓰이는 WARN이다. 말 그대로 다음 토픽의 리더 파티션을 찾지 못하는 듯하다.
이는 토픽을 찾지 못할 때 자동으로 생성해주는 auto.create.topics.enable이 true로 되어있지 않아서 그렇다.
하지만, 쓸데없는 사고를 막기 위해 자동 생성은 꺼두는 것이 좋다고 한다.
미리 토픽을 생성한 뒤 어플리케이션을 실행하면 이 로그가 뜨는 것을 막을 수 있다.
$ ./kafka-topics.sh --create --zookeeper cp-zookeeper-1:12181 --replication-factor 3 --partitions 3 --topic exam-topic
Created topic exam-topic.
Kafka의 컨슈밍이 되지 않을 때 어떻게 해야할까? 나와 같은 경우에는 어플리케이션을 작성 시 계속 host를 찾지 못하는 문제였다. Log1번의 해결책만 쓰면 될 것 같았는데 밑에 있는 advertiesd.listeners와는 역할이 조금 다른 듯 했다.
/opt/kafka/config/server.properties
.
.
advertised.listeners=PLANTEXT://공인ip:9092
.
.
listeners
카프카 브로커가 내부적으로 바인딩하는 주소
advertised.listeners
카프카 프로듀서, 컨슈머에게 노출할 주소. 설정하지 않을 경우 디폴트로 listeners 설정을 따른다.
만약 Kafka 서버가 3개의 랜카드를 장착 중이고 A,B,C 라는 IP를 각각 부여 받았을 때, 해당 서버에는 Kafka 서비스와 그 Kafka의 토픽을 구독중인 별도의 Test라는 서비스가 실행 중이라고 생각해보자. Test 서비스는 Kafka 서비스와 같은 PC에서 구동 중이기 때문에 localhost또는 127.0.0.1이라는 주소로 kafka에 접근이 가능하다.
하지만, A,B,C 라는 IP로 접근하려는 외부 서비스들이 있을 경우 특정 IP로 접근한 요청들은 Kafka에 접근하지 못하게 해야하는 경우가 있다.
예를들어, 우리의 서버는 localhost로 접근하는 내부 서비스와 B라는 IP로 접근하는 외부 서비스만 Kafka에 접근할 수 있게 하고 싶은 경우
listeners=PLAINTEXT://localhost:9092
advertiesed.listeners=PLAINTEXT://B:9092
라고 지정할 수 있다.
내부와 외부에 오픈할 특정 IP를 별도로 두기 위해 나눠놓았다고 볼 수 있다.