
(SASL architecture)
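Clients must authenticate. Kafka supports the following SASL mechanisms:
PLAIN: the broker must be restarted every time the credentials change.
SCRAM: uses hashing and passes a salt and an iteration count along with the username/password combination to improve security. The credentials are stored in ZooKeeper, so the brokers do not need to be restarted. Enable SSL encryption when using it.
OAUTHBEARER: authenticates with OAuth2 tokens; the details can be read in the corresponding KIP (Kafka Improvement Proposal).
GSSAPI (Kerberos): the client obtains a ticket and is authenticated once it proves its identity, which makes this a very secure method. It requires a ticket-validation server, and authentication becomes impossible if that server goes down, so manage it with care.
vi ../config/zookeeper_jaas.conf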
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin007"
    user_admin="admin007"
    user_karim="karim007";
};
Each user is declared as user_[UserName]="[Password]"
vi ../config/kafka_jaas.conf
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin007"
    user_admin="admin007"
    user_karim="karim007";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin007";
};
vi ../config/zookeeper.properties
# Make all clients authenticate with SASL.
zookeeper.sasl.client=true

# Connections from unauthenticated users are kept open, but they cannot perform any operations.
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

# Make every connection use the SASL authentication scheme.
requireClientAuthScheme=sasl
vi ../config/kafka.properties
# Name of the Kafka server's super user.
super.users=User:admin

# Settings for handling SASL authentication.
security.inter.broker.protocol=SASL_PLAINTEXT
# SASL mechanism used for inter-broker communication; the default is GSSAPI.
sasl.mechanism.inter.broker.protocol=PLAIN
# List of SASL mechanisms enabled on the Kafka server.
sasl.enabled.mechanisms=PLAIN
# Out-of-the-box Authorizer implementation that stores ACLs.
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# When no ACL is found for a resource, users other than the super users may access it as well.
allow.everyone.if.no.acl.found=true
# Host and port the broker listens on.
listeners=SASL_PLAINTEXT://:9092

# Host and port that producers and consumers connect to.
advertised.listeners=SASL_PLAINTEXT://localhost:9092
vi ../config/zookeeper-server-start.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=file:$base_dir/../config/zookeeper_jaas.conf"
vi ../config/kafka-server-start.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=file:$base_dir/../config/kafka_jaas.conf"
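The JAAS file passed through `java.security.auth.login.config` is read when the broker starts, so it can be worth checking which accounts it defines before restarting. The following is a minimal Python sketch, not part of Kafka: `parse_jaas_users` is a hypothetical helper, and it assumes the simple layout used in this post (one `user_<name>="<password>"` entry per account, with no quotes or braces inside passwords).

```python
import re


def parse_jaas_users(jaas_text, section="KafkaServer"):
    """Return {username: password} for the user_<name> entries of one JAAS section."""
    # Capture everything between "<section> {" and the closing "};".
    pattern = r"\b" + re.escape(section) + r"\s*\{(.*?)\}\s*;"
    match = re.search(pattern, jaas_text, re.DOTALL)
    if match is None:
        raise ValueError("section %r not found" % section)
    # PlainLoginModule declares each account as user_<name>="<password>".
    return dict(re.findall(r'\buser_(\w+)\s*=\s*"([^"]*)"', match.group(1)))


# Same content as the kafka_jaas.conf used in this post.
SAMPLE_JAAS = """
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin007"
    user_admin="admin007"
    user_karim="karim007";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin007";
};
"""

if __name__ == "__main__":
    for name in parse_jaas_users(SAMPLE_JAAS):
        print("defined user:", name)
```

The `Client` section carries only the login credentials and no `user_` entries, so the helper returns an empty dict for it.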
[2022-04-11 15:52:47,174] INFO KafkaConfig values:
        advertised.host.name = null
        advertised.listeners = SASL_PLAINTEXT://localhost:9092
        advertised.port = null
        listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
        listeners = SASL_PLAINTEXT://:9092
        sasl.client.callback.handler.class = null
        sasl.enabled.mechanisms = [PLAIN]
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism.inter.broker.protocol = PLAIN
        sasl.server.callback.handler.class = null
        security.inter.broker.protocol = SASL_PLAINTEXT 
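Only a handful of keys in the startup dump show whether the SASL settings took effect. As a rough sketch (Python, with hypothetical helper names, assuming the log keeps the `key = value` layout shown above), those keys could be checked like this:

```python
def parse_kafka_config_dump(log_text):
    """Collect the `key = value` lines of a KafkaConfig dump into a dict."""
    values = {}
    for line in log_text.splitlines():
        key, sep, value = line.strip().partition(" = ")
        # Config keys never contain spaces; this skips ordinary log lines.
        if sep and " " not in key:
            values[key] = value.strip()
    return values


# Values expected for the SASL_PLAINTEXT + PLAIN setup in this post.
EXPECTED = {
    "listeners": "SASL_PLAINTEXT://:9092",
    "sasl.enabled.mechanisms": "[PLAIN]",
    "sasl.mechanism.inter.broker.protocol": "PLAIN",
    "security.inter.broker.protocol": "SASL_PLAINTEXT",
}


def find_mismatches(log_text, expected=EXPECTED):
    """Return {key: actual value} for every expected key that differs or is missing."""
    actual = parse_kafka_config_dump(log_text)
    return {k: actual.get(k) for k, v in expected.items() if actual.get(k) != v}


SAMPLE_LOG = """\
[2022-04-11 15:52:47,174] INFO KafkaConfig values:
        advertised.listeners = SASL_PLAINTEXT://localhost:9092
        listeners = SASL_PLAINTEXT://:9092
        sasl.enabled.mechanisms = [PLAIN]
        sasl.mechanism.inter.broker.protocol = PLAIN
        security.inter.broker.protocol = SASL_PLAINTEXT
"""

if __name__ == "__main__":
    print(find_mismatches(SAMPLE_LOG) or "SASL settings look as expected")
```

An empty result means every expected key matched; a missing key is reported with the value `None`.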
  
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:karim --operation All --topic '*' --group '*'
With --cluster added:
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:karim --operation All --topic '*' --group '*' --cluster
[UserName] : the user to grant permissions to
[karim@cede526eeb56 bin]$ ./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:karim --operation All --topic '*' --group '*'
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=*, patternType=LITERAL)`:
        (principal=User:karim, host=*, operation=ALL, permissionType=ALLOW)
Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=*, patternType=LITERAL)`:
        (principal=User:karim, host=*, operation=ALL, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=*, patternType=LITERAL)`:
        (principal=User:karim, host=*, operation=ALL, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=*, patternType=LITERAL)`:
        (principal=User:karim, host=*, operation=ALL, permissionType=ALLOW)
vi ../config/consumer_jaas.conf
client.id=karim
group.id=local-group
bootstrap.servers=SASL_PLAINTEXT://localhost:9092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="karim" \
  password="karim007";
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group local-group --consumer.config=../config/consumer_jaas.conf
[2022-04-11 16:44:22,540] INFO [GroupCoordinator 1]: Preparing to rebalance group local-group in state PreparingRebalance with old generation 45 (__consumer_offsets-1) (reason: Adding new member interezen-b504b536-23f5-4965-92d1-55678f509c3c with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-11 16:44:23,392] INFO [GroupCoordinator 1]: Stabilized group local-group generation 46 (__consumer_offsets-1) (kafka.coordinator.group.GroupCoordinator)
[2022-04-11 16:44:23,626] INFO [GroupCoordinator 1]: Assignment received from leader for group local-group for generation 46 (kafka.coordinator.group.GroupCoordinator)
vi ../config/producer_jaas.conf
bootstrap.servers=SASL_PLAINTEXT://localhost:9092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="karim" \
  password="karim007";
./kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config=../config/producer_jaas.conf
[2022-04-11 16:46:11,647] INFO [GroupCoordinator 1]: Stabilized group local-group generation 47 (__consumer_offsets-1) (kafka.coordinator.group.GroupCoordinator)
[2022-04-11 16:46:11,648] INFO [GroupCoordinator 1]: Assignment received from leader for group local-group for generation 47 (kafka.coordinator.group.GroupCoordinator)
A client that connects without the SASL settings is rejected by the broker:
[2022-04-11 16:48:37,469] INFO [SocketServer brokerId=1] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
[2022-04-11 16:48:37,587] WARN [Producer clientId=console-producer] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
        kafka {
            group_id => "logstash_input"
            client_id => "logstash_client"
            bootstrap_servers => "127.0.0.1:9092"
            sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"karim\" password=\"karim007\";"
            sasl_mechanism => "PLAIN"
            security_protocol => "SASL_PLAINTEXT"
            topics => ["karim_topic"]
            auto_offset_reset => "latest"
            decorate_events => true
            codec => plain
        }
vi metricbeat/modules.d/kafka.yml
  # SASL authentication
  username: "karim"
  password: "karim007"
vi /filebeat/conf/filebeat.yml
output.kafka:
  enabled: true
  hosts: ["127.0.0.1:9092"]
  version: "0.10.2.1"
  topic: "karim_topic_test"
  username: "karim"
  password: "karim007"
  sasl.mechanism: "PLAIN"
  partition.round_robin.group_events: 1
  client_id: "agent_beat"
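The console consumer and producer configs above share the same four properties: the bootstrap server, the SASL_PLAINTEXT protocol, the PLAIN mechanism, and a `sasl.jaas.config` line carrying the username and password. A small Python sketch of generating such a file per user is given here; `render_client_properties` is a hypothetical helper, and it simply rejects passwords containing quotes or backslashes instead of escaping them, which is a simplifying assumption.

```python
def render_client_properties(bootstrap_servers, username, password):
    """Render a Kafka client .properties text for SASL_PLAINTEXT with the PLAIN mechanism."""
    for value in (username, password):
        # Escaping rules differ between JAAS and .properties files,
        # so this sketch refuses such values rather than guessing.
        if '"' in value or "\\" in value:
            raise ValueError("quotes and backslashes are not handled by this sketch")
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        + f'username="{username}" password="{password}";'
    )
    lines = [
        "bootstrap.servers=" + bootstrap_servers,
        "security.protocol=SASL_PLAINTEXT",
        "sasl.mechanism=PLAIN",
        "sasl.jaas.config=" + jaas,
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Credentials of the user that was granted ACLs earlier in this post.
    print(render_client_properties("localhost:9092", "karim", "karim007"), end="")
```

The output can be saved to a file and passed to the console tools with `--consumer.config` or `--producer.config`, as in the commands shown earlier.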
📌 Side notes
📚 References