Kafka acl SASL/PALIN 인증 설정을 해보자!

Karim·2022년 4월 11일
8

kafka 운영

목록 보기
9/9
post-thumbnail

1. Version

💬

  • Kafka : 2.6.0

2. SASL 란


(SASL 아키텍처)

  • SASL 은 연결 지향 프로토콜에서 교체 가능한 메커니즘을 통해 인증 및 데이터 보안 서비스를 제공하는 프레임워크이다.
  • Kafka 프로토콜이 데이터 교환 과정에서 Kafka가 지원하는 Kerberos, PLAIN, SCRAM, OAUTHBEARER 등의 메커니즘을 사용하여 인증/인가를 할 수 있도록 해주며, 인증/인가 교환이 성공했을 때, 후속 데이터 교환을 데이터 보안 계층 위에서 할 수 있도록 해 주는 기술
  • 간단하게 생각하면 이걸 사용할려면 인증을 해야한다.

💡 SASL이 지원하는 매커니즘

  • SASL/PLAINTEXT (0.9.0.0 ~)
    • PLAINTEXT로 username/password 를 설정하여 인증을 하는 가장 기본적이나 고전적인 방식
    • PLAINTEXT로 사용되는 username과 password는 kafka 브로커에 미리 저장되어 있어야 하며, 변경될 때마다 재시작이 필요
  • SASL/SCRAM (0.10.0.0 ~)
    • PBKDF2 암호화 알고리즘을 활용해 생성된 해시를 활용하며 username/password 조합에 salt / count를 부가적으로 전달하여 보안을 높인 방법
    • username 과 password의 해시는 zookeeper에 저장되므로 브로커를 재부팅 불 필요
    • 네트워크에서 자격증명이 PAINTEXT로 전송되지 않도록 ssl 암호화를 활성화
  • SASL/OAUITHBEARER (2.0 ~)
    • OAuth2 토큰을 기반으로 인증하는 방법으로 KIP(Kafka Improvement Proposals)를 통해 읽습니다.
  • SASL/KERBEROS(GSSAPI) (0.9.0.0 ~)
    • 노드간 통신에서 보안을 client티켓을 발급받아 본인의 신원을 증명하면 인증하는 매우 안전한 방법
    • 별도의 인증 및 티켓검증용 서버가 필요하며 서버가 불능이 될 경우 인증 불가하므로 관리에 주의

3. 구성 전 작업

4. 인증정보 conf 파일 구성

  • 모든 작업은 kafka/bin dir에서 작업합니다.

✒️ zookeeper_jaas.conf

vi ../config/zookeeper_jaas.conf

Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin007"
    user_admin="admin007"
    user_karim="karim007";
};
  • username, password : Super User 설정
  • user_[UserName] = "[PassWords]"

✒️ kafka_jaas.conf.conf

vi ../config/kafka_jaas.conf.conf

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin007"
    user_admin="admin007"
    user_karim="karim007";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin007";
};
  • KafkaServer : zookeeper_jaas.conf 와 똑같이 Super 유저와 일반 유저 설정
  • Client : Zookeeper Server에 접속할 유저 정보

5. server's 설정

  • 모든 작업은 kafka/bin dir에서 작업합니다.

✒️ zookeeper.properties

vi ../config/zookeeper.properties

# 모든 클라이언트들이 SASL 인증을 하도록 설정.
zookeeper.sasl.client=true
 
# 인증되지않은 사용자의 연결을 유지하지만, 조작은 할 수 없음.
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
 
# 모든 Connection이 SASL 인증방식을 사용하도록 설정.
requireClientAuthScheme=sasl

✒️ kafka.properties

vi ../config/kafka.properties

# Kafka Server의 Super 유저 이름.
super.users=User:admin
 
# SASL 인증 방식을 처리하기 위한 설정.
security.inter.broker.protocol=SASL_PLAINTEXT

#브로커간 통신에 사용할 SASL 메커니즘, 기본값은 GSSAPI
sasl.mechanism.inter.broker.protocol=PLAIN

#Kafka 서버에서 활성화 된 SASL 메커니즘의 리스트
sasl.enabled.mechanisms=PLAIN

# ACL을 저장하는 즉시 사용 가능한 Authorizer 구현 제공
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

# Super User를 제외한 User도 해당 리소스에 액세스를 제어 가능
allow.everyone.if.no.acl.found=true

# Broker가 사용하는 호스트와 포트를 지정
listeners=SASL_PLAINTEXT://:9092
 
# Producer와 Consumer가 접근할 호스트와 포트를 지정
advertised.listeners=SASL_PLAINTEXT://localhost:9092

6. 인증정보 conf 파일 실행 파일에 설정

  • 모든 작업은 kafka/bin dir에서 작업합니다.

✒️ zookeeper-server-start.sh

vi ../config/zookeeper-server-start.sh

  • 추가
export KAFKA_OPTS="-Djava.security.auth.login.config=file:$base_dir/../config/zookeeper_jaas.conf"

✒️ kafka-server-start.sh

vi ../config/kafka-server-start.sh

  • 추가
export KAFKA_OPTS="-Djava.security.auth.login.config=file:$base_dir/../config/kafka_jaas.conf"

7. zookeeper, kafka 실행

💻 zookeepr.out

💻 server.log

  • listeners 및 sasl 옵션 확인
[2022-04-11 15:52:47,174] INFO KafkaConfig values:
        advertised.host.name = null
        advertised.listeners = SASL_PLAINTEXT://localhost:9092
        advertised.port = null
PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
        listeners = SASL_PLAINTEXT://:9092
        sasl.client.callback.handler.class = null
        sasl.enabled.mechanisms = [PLAIN]
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism.inter.broker.protocol = PLAIN
        sasl.server.callback.handler.class = null
        security.inter.broker.protocol = SASL_PLAINTEXT

7. 계정 권한 부여 방법

  • 모든 작업은 kafka/bin dir에서 작업합니다.

✒️ single

./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:karim --operation All --topic '*' --group '*'

✒️ cluster

  • --cluster 추가
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:karim --operation All --topic '*' --group '*' --cluster
  • --allow-principal User:[UserName] : 권한 부여할 유저
  • --operation : producer, consumer, All 권한
  • --topic : topic, '*'
  • --group : group Id (producer만 지정할 땐 필요X)

💻 계정 권한 부여

[karim@cede526eeb56 bin]$ ./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:karim --operation All --topic '*' --group '*'


Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=*, patternType=LITERAL)`:
        (principal=User:karim, host=*, operation=ALL, permissionType=ALLOW)

Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=*, patternType=LITERAL)`:
        (principal=User:karim, host=*, operation=ALL, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=*, patternType=LITERAL)`:
        (principal=User:karim, host=*, operation=ALL, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=*, patternType=LITERAL)`:
        (principal=User:karim, host=*, operation=ALL, permissionType=ALLOW)

8. console consumer/producer 적용 및 사용법

  • 모든 작업은 kafka/bin dir에서 작업합니다.

✒️ consumer_jaas.conf

vi ../config/consumer_jaas.conf

client.id=karim
group.id=local-group

bootstrap.servers=SASL_PLAINTEXT://localhost:9092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="karim" \
  password="karim007";

💻 console consumer

 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group local-group --consumer.config=../config/consumer_jaas.conf

💻 console consumer server.log

[2022-04-11 16:44:22,540] INFO [GroupCoordinator 1]: Preparing to rebalance group local-group in state PreparingRebalance with old generation 45 (__consumer_offsets-1) (reason: Adding new member interezen-b504b536-23f5-4965-92d1-55678f509c3c with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-11 16:44:23,392] INFO [GroupCoordinator 1]: Stabilized group local-group generation 46 (__consumer_offsets-1) (kafka.coordinator.group.GroupCoordinator)
[2022-04-11 16:44:23,626] INFO [GroupCoordinator 1]: Assignment received from leader for group local-group for generation 46 (kafka.coordinator.group.GroupCoordinator)

✒️ producer_jaas.conf

vi ../config/producer_jaas.conf

bootstrap.servers=SASL_PLAINTEXT://localhost:9092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="karim" \
  password="karim007";

💻 console producer

./kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config=../config/producer_jaas.conf

💻 console producer server.log

[2022-04-11 16:46:11,647] INFO [GroupCoordinator 1]: Stabilized group local-group generation 47 (__consumer_offsets-1) (kafka.coordinator.group.GroupCoordinator)
[2022-04-11 16:46:11,648] INFO [GroupCoordinator 1]: Assignment received from leader for group local-group for generation 47 (kafka.coordinator.group.GroupCoordinator)

9. 허가되지 않은 user로 console consumer/producer 시

  • 못 붙고, 연결이 끊겼다라는 메세지가 뜬다.

💻 server.log

[2022-04-11 16:48:37,469] INFO [SocketServer brokerId=1] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

💻 console log

[2022-04-11 16:48:37,587] WARN [Producer clientId=console-producer] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

10. logstash 및 kafka 쓰는 Open-source id/pw 적용 방법

✒️ logstash

        kafka {
            group_id => "logstash_input"
            client_id => "logstash_client"
            bootstrap_servers => "127.0.0.1:9092"
            sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"karim\" password=\"karim007\";"
            sasl_mechanism => "PLAIN"
            security_protocol => "SASL_PLAINTEXT"
            topics => ["karim_topic"]
            auto_offset_reset => "latest"
            decorate_events => true
            codec => plain
        }

✒️ metricbeat

vi metricbeat/modules.d/kafka.yml

  # SASL authentication
  username: "karim"
  password: "karim007"

✒️ filebeat

vi /filebeat/conf/filebeat.yml

output.kafka:
  enabled: true
  hosts: ["127.0.0.1:9092"]
  version: "0.10.2.1"
  topic: "karim_topic_test"
  username: "karim"
  password: "karim007"
  sasl.mechanism: "PLAIN"
  partition.round_robin.group_events: 1
  client_id: "agent_beat"

📌 여담

  • 어쩌다 acl을 적용할 일이 있었는데 좋은건데 왜 지금까지 왜 안쓰고 있었을까..ㅎ

📚 참고

profile
나도 보기 위해 정리해 놓은 벨로그

0개의 댓글