kafka Producer를 java 로 구현해보자

Karim·2021년 10월 29일
3

kafka 예제

목록 보기
1/3
post-thumbnail

1. Version

💬

  • Kafka : 2.6.0
  • grdle : kafka-clients.2.8.1

2. build.gradle

💬 build.gredle dependencies

dependencies {
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.8.1'
}

3. Kafka Producer 구현

💬 java code

예제는 간단!
kafka 설치한 서버에 원하는 메세지를 보내는 코드이다.

package kafkaProducer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by karim
 * Date : 2021-10-29
 * Time : 오후 2:54
 */
public class KafkaProducerBasis {

    private static final String KAFKA_SINGLE_IP = "serverIp:9092";
    private static final String TOPIC_NAME = "karim-topic";

    public static void main(String[] args) {

        String sendMessage = "Karim velog gooooooooood";

        Properties configs = new Properties();
        configs.put("bootstrap.servers", KAFKA_SINGLE_IP); // kafka host 및 server 설정
        configs.put("acks", "1"); // 자신이 보낸 메시지에 대해 카프카로부터 확인을 기다리지 않는다.
        configs.put("block.on.buffer.full", "true"); // 서버로 보낼 레코드를 버퍼링 할 때 사용할 수 있는 전체 메모리의 바이트수
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // serialize 설정
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

        // kafka로 메세지 개시
        producer.send(new ProducerRecord<String, String>(TOPIC_NAME, sendMessage));
        
        producer.flush();
        // producer 닫기
        producer.close();
    }
}

4. runining log

💬 log

15:52:16.484 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [...:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	internal.auto.downgrade.txn.commit = false
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

15:52:17.128 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Starting Kafka producer I/O thread.
15:52:17.128 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'block.on.buffer.full' was supplied but isn't a known config.
15:52:17.129 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node ...:9092 (id: -1 rack: null) for sending metadata request
15:52:17.131 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node ...:9092 (id: -1 rack: null) using address /...
15:52:17.132 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.8.1
15:52:17.132 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 839b886f9b732b15
15:52:17.133 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1635490337129
15:52:17.134 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer started
15:52:17.143 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
15:52:17.377 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
15:52:17.377 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node -1.
15:52:17.400 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-1, correlationId=0) and timeout 30000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='2.8.1')
15:52:17.430 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-1, correlationId=0): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=8), ApiVersion(apiKey=1, minVersion=0, maxVersion=11), ApiVersion(apiKey=2, minVersion=0, maxVersion=5), ApiVersion(apiKey=3, minVersion=0, maxVersion=9), ApiVersion(apiKey=4, minVersion=0, maxVersion=4), ApiVersion(apiKey=5, minVersion=0, maxVersion=3), ApiVersion(apiKey=6, minVersion=0, maxVersion=6), ApiVersion(apiKey=7, minVersion=0, maxVersion=3), ApiVersion(apiKey=8, minVersion=0, maxVersion=8), ApiVersion(apiKey=9, minVersion=0, maxVersion=7), ApiVersion(apiKey=10, minVersion=0, maxVersion=3), ApiVersion(apiKey=11, minVersion=0, maxVersion=7), ApiVersion(apiKey=12, minVersion=0, maxVersion=4), ApiVersion(apiKey=13, minVersion=0, maxVersion=4), ApiVersion(apiKey=14, minVersion=0, maxVersion=5), ApiVersion(apiKey=15, minVersion=0, maxVersion=5), ApiVersion(apiKey=16, minVersion=0, maxVersion=4), ApiVersion(apiKey=17, minVersion=0, maxVersion=1), ApiVersion(apiKey=18, minVersion=0, maxVersion=3), ApiVersion(apiKey=19, minVersion=0, maxVersion=5), ApiVersion(apiKey=20, minVersion=0, maxVersion=4), ApiVersion(apiKey=21, minVersion=0, maxVersion=2), ApiVersion(apiKey=22, minVersion=0, maxVersion=3), ApiVersion(apiKey=23, minVersion=0, maxVersion=3), ApiVersion(apiKey=24, minVersion=0, maxVersion=1), ApiVersion(apiKey=25, minVersion=0, maxVersion=1), ApiVersion(apiKey=26, minVersion=0, maxVersion=1), ApiVersion(apiKey=27, minVersion=0, maxVersion=0), ApiVersion(apiKey=28, minVersion=0, maxVersion=3), ApiVersion(apiKey=29, minVersion=0, maxVersion=2), ApiVersion(apiKey=30, minVersion=0, maxVersion=2), ApiVersion(apiKey=31, minVersion=0, maxVersion=2), ApiVersion(apiKey=32, minVersion=0, maxVersion=3), ApiVersion(apiKey=33, minVersion=0, maxVersion=1), ApiVersion(apiKey=34, minVersion=0, maxVersion=1), ApiVersion(apiKey=35, minVersion=0, maxVersion=2), ApiVersion(apiKey=36, minVersion=0, maxVersion=2), ApiVersion(apiKey=37, minVersion=0, maxVersion=2), ApiVersion(apiKey=38, minVersion=0, maxVersion=2), ApiVersion(apiKey=39, minVersion=0, maxVersion=2), ApiVersion(apiKey=40, minVersion=0, maxVersion=2), ApiVersion(apiKey=41, minVersion=0, maxVersion=2), ApiVersion(apiKey=42, minVersion=0, maxVersion=2), ApiVersion(apiKey=43, minVersion=0, maxVersion=2), ApiVersion(apiKey=44, minVersion=0, maxVersion=1), ApiVersion(apiKey=45, minVersion=0, maxVersion=0), ApiVersion(apiKey=46, minVersion=0, maxVersion=0), ApiVersion(apiKey=47, minVersion=0, maxVersion=0), ApiVersion(apiKey=48, minVersion=0, maxVersion=0), ApiVersion(apiKey=49, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=-1, finalizedFeatures=[])
15:52:17.480 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node -1 has finalized features epoch: -1, finalized features: [], supported features: [], API versions: (Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4], StopReplica(5): 0 to 3 [usable: 3], UpdateMetadata(6): 0 to 6 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 7 [usable: 7], FindCoordinator(10): 0 to 3 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4 [usable: 4], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 3 [usable: 3], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 3 [usable: 3], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 2 [usable: 2], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 2 [usable: 2], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], DescribeClientQuotas(48): 0 [usable: 0], AlterClientQuotas(49): 0 [usable: 0], DescribeUserScramCredentials(50): UNSUPPORTED, AlterUserScramCredentials(51): UNSUPPORTED, AlterIsr(56): UNSUPPORTED, UpdateFeatures(57): UNSUPPORTED, DescribeCluster(60): UNSUPPORTED, DescribeProducers(61): UNSUPPORTED).
15:52:17.482 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='karim-topic')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node ...:9092 (id: -1 rack: null)
15:52:17.483 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=producer-1, correlationId=1) and timeout 30000 to node -1: MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='karim-topic')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
15:52:17.486 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received METADATA response from node -1 for request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=producer-1, correlationId=1): MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=1, host='...', port=1146, rack=null)], clusterId='ig7CcFmqSeCTqaj1ozuCEA', controllerId=1, topics=[MetadataResponseTopic(errorCode=0, name='karim-topic', topicId=AAAAAAAAAAAAAAAAAAAAAA, isInternal=false, partitions=[MetadataResponsePartition(errorCode=0, partitionIndex=0, leaderId=1, leaderEpoch=0, replicaNodes=[1], isrNodes=[1], offlineReplicas=[])], topicAuthorizedOperations=-2147483648)], clusterAuthorizedOperations=-2147483648)
15:52:17.490 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Updating last seen epoch for partition karim-topic-0 from null to epoch 0 from new metadata
15:52:17.494 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: ig7CcFmqSeCTqaj1ozuCEA
15:52:17.495 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Updated cluster metadata updateVersion 2 to MetadataCache{clusterId='ig7CcFmqSeCTqaj1ozuCEA', nodes={1=...:9092 (id: 1 rack: null)}, partitions=[PartitionMetadata(error=NONE, partition=karim-topic-0, leader=Optional[1], leaderEpoch=Optional[0], replicas=1, isr=1, offlineReplicas=)], controller=...:9092 (id: 1 rack: null)}
15:52:17.507 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node ...:9092 (id: 1 rack: null) using address /...
15:52:17.509 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 1
15:52:17.509 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node 1. Fetching API versions.
15:52:17.509 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node 1.
15:52:17.509 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-1, correlationId=2) and timeout 30000 to node 1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='2.8.1')
15:52:17.511 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received API_VERSIONS response from node 1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-1, correlationId=2): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=8), ApiVersion(apiKey=1, minVersion=0, maxVersion=11), ApiVersion(apiKey=2, minVersion=0, maxVersion=5), ApiVersion(apiKey=3, minVersion=0, maxVersion=9), ApiVersion(apiKey=4, minVersion=0, maxVersion=4), ApiVersion(apiKey=5, minVersion=0, maxVersion=3), ApiVersion(apiKey=6, minVersion=0, maxVersion=6), ApiVersion(apiKey=7, minVersion=0, maxVersion=3), ApiVersion(apiKey=8, minVersion=0, maxVersion=8), ApiVersion(apiKey=9, minVersion=0, maxVersion=7), ApiVersion(apiKey=10, minVersion=0, maxVersion=3), ApiVersion(apiKey=11, minVersion=0, maxVersion=7), ApiVersion(apiKey=12, minVersion=0, maxVersion=4), ApiVersion(apiKey=13, minVersion=0, maxVersion=4), ApiVersion(apiKey=14, minVersion=0, maxVersion=5), ApiVersion(apiKey=15, minVersion=0, maxVersion=5), ApiVersion(apiKey=16, minVersion=0, maxVersion=4), ApiVersion(apiKey=17, minVersion=0, maxVersion=1), ApiVersion(apiKey=18, minVersion=0, maxVersion=3), ApiVersion(apiKey=19, minVersion=0, maxVersion=5), ApiVersion(apiKey=20, minVersion=0, maxVersion=4), ApiVersion(apiKey=21, minVersion=0, maxVersion=2), ApiVersion(apiKey=22, minVersion=0, maxVersion=3), ApiVersion(apiKey=23, minVersion=0, maxVersion=3), ApiVersion(apiKey=24, minVersion=0, maxVersion=1), ApiVersion(apiKey=25, minVersion=0, maxVersion=1), ApiVersion(apiKey=26, minVersion=0, maxVersion=1), ApiVersion(apiKey=27, minVersion=0, maxVersion=0), ApiVersion(apiKey=28, minVersion=0, maxVersion=3), ApiVersion(apiKey=29, minVersion=0, maxVersion=2), ApiVersion(apiKey=30, minVersion=0, maxVersion=2), ApiVersion(apiKey=31, minVersion=0, maxVersion=2), ApiVersion(apiKey=32, minVersion=0, maxVersion=3), ApiVersion(apiKey=33, minVersion=0, maxVersion=1), ApiVersion(apiKey=34, minVersion=0, maxVersion=1), ApiVersion(apiKey=35, minVersion=0, maxVersion=2), ApiVersion(apiKey=36, minVersion=0, maxVersion=2), ApiVersion(apiKey=37, minVersion=0, maxVersion=2), ApiVersion(apiKey=38, minVersion=0, maxVersion=2), ApiVersion(apiKey=39, minVersion=0, maxVersion=2), ApiVersion(apiKey=40, minVersion=0, maxVersion=2), ApiVersion(apiKey=41, minVersion=0, maxVersion=2), ApiVersion(apiKey=42, minVersion=0, maxVersion=2), ApiVersion(apiKey=43, minVersion=0, maxVersion=2), ApiVersion(apiKey=44, minVersion=0, maxVersion=1), ApiVersion(apiKey=45, minVersion=0, maxVersion=0), ApiVersion(apiKey=46, minVersion=0, maxVersion=0), ApiVersion(apiKey=47, minVersion=0, maxVersion=0), ApiVersion(apiKey=48, minVersion=0, maxVersion=0), ApiVersion(apiKey=49, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=-1, finalizedFeatures=[])
15:52:17.511 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node 1 has finalized features epoch: -1, finalized features: [], supported features: [], API versions: (Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4], StopReplica(5): 0 to 3 [usable: 3], UpdateMetadata(6): 0 to 6 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 7 [usable: 7], FindCoordinator(10): 0 to 3 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4 [usable: 4], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 3 [usable: 3], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 3 [usable: 3], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 2 [usable: 2], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 2 [usable: 2], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], DescribeClientQuotas(48): 0 [usable: 0], AlterClientQuotas(49): 0 [usable: 0], DescribeUserScramCredentials(50): UNSUPPORTED, AlterUserScramCredentials(51): UNSUPPORTED, AlterIsr(56): UNSUPPORTED, UpdateFeatures(57): UNSUPPORTED, DescribeCluster(60): UNSUPPORTED, DescribeProducers(61): UNSUPPORTED).
15:52:17.518 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=producer-1, correlationId=3) and timeout 30000 to node 1: {acks=1,timeout=30000,partitionSizes=[karim-topic-0=92]}
15:52:17.520 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received PRODUCE response from node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=producer-1, correlationId=3): ProduceResponseData(responses=[TopicProduceResponse(name='karim-topic', partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, baseOffset=10, logAppendTimeMs=-1, logStartOffset=2, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
15:52:17.525 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
15:52:17.525 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Beginning shutdown of Kafka producer I/O thread, sending remaining records.
15:52:17.529 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Shutdown of Kafka producer I/O thread has completed.
15:52:17.529 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
15:52:17.529 [main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
15:52:17.530 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
15:52:17.530 [main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered
15:52:17.530 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer has been closed

Process finished with exit code 0

5. Kafka Console consumer

💬 console view

[karim@kafka_single bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic karim-topic
Karim velog gooooooooood
profile
나도 보기 위해 정리해 놓은 벨로그

0개의 댓글