0,1,all(또는 -1) 값을 가질 수 있다.
프로듀서가 전송한 데이터가 카프카 클러스터에 얼마나 신뢰성 높게 저장할 지 지정하는것
카프카 복제 개수가 2 이상으로 운영하는 경우에만 의미가 있다.(1인경우 성능차이가 없음)
acks=0
프로듀서가 리더 파티션으로 데이터를 전송했을때 리더 파티션으로 데이터가 저장되었는지 확인하지 않겠다는 의미
프로듀서가 데이터를 전송한 후 이 데이터가 몇 번째 오프셋에 저장되었는지 확인할 수 없음
전송이 실패한 경우를 알 수 없기 때문에 재시도 옵션인 retries도 의미가 없음
데이터 전송 속도는 1,all 경우보다 훨씬 빠르다. 데이터 유실이 발생하더라도 전송 속도가 중요한 경우에 사용
acks=1
정상적으로 적재되었는지 확인전송 속도가 느림데이터는 유실될 수 있음acks=all 또는 acks=-1
정상적으로 적재되었는지 확인일부 브로커 장애가 발생하더라도 프로듀서는 안전하게 데이터를 전송하고 저장하고 있음을 보장할 수 있다.동일한 데이터를 여러번 전송하더라도 카프카 클러스터에 단한 번만 저장됨을 의미한다.데이터를 병렬처리하기 위해서 파티션 개수와 컨슈머 개수를 동일하게 맞추는 것이 가장 좋은 방법이다.고려할 점이 매우 많지만 안정/지속적으로 운영할 수 있는 경우 매우 효율적으로 운영할 수 있다.멀티코어 CPU를 가진 서버 환경에서 멀티 컨슈머 스레드를 운영하면 제한된 리소스 내에서 최상의 성능을 발휘할 수 있다. 컨슈머를 멀티스레드로 활용하는 방식은 크게 2가지로 나뉜다.컨슈머 스레드는 1개만 실행하고 데이터 처리를 담당하는 워커 스레드를 여러개 실행 하는 방법(멀티 워커 스레드 전략)컨슈머 멀티 스레드 전략 
poll()을 통해 받은 데이터를 병렬처리함으로써 속도의 이점을 확실히 얻을 수 있다.
데이터 처리가 스레드에서 진행 중임에도 불구하고 다음 poll() 메서드를 호출 시 커밋을 할 수 있기 때문에 리밸런싱, 컨슈머 장애 발생시 데이터 유실이 발생할 수 있다.
스레드의 처리시간이 달라 레코드의 순서가 뒤바뀌는 현상이 발생할 수도 있다.
데이터 순서 상관없이 빠른 처리 속도가 필요한 데이터 처리에 적합
ex) 서버리소스 모니터링 파이프라인, IOT 서비스의 센서 데이터 수집 파이프라인
public class ConsumerWorker implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
private String recordValue;
ConsumerWorker(String recordValue){
this.recordValue = recordValue;
}
@Override
public void run(){
logger.info("thread:{}\trecord:{}", Thread.currentThread().getName(), ` recordValue)
}
}
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
ExecutorService executorService = Executors.newCachedThreadPool();
while(true){
ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(10));
for(ConsumerRecord<String, String> record : records){
ConsumerWorker worker = new ConsumerWorker(record.value());
executorService.execute(worker);
}
}

하나의 파티션은 동일 컨슈머 중 최대 1개까지 할당된다. 이런 특징을 가장 잘 살리는 방법은 1개의 애플리케이션에 구독하고자 하는 토픽의 파티션 개수만큼 컨슈머 스레드 개수를 늘려서 운영하는 것이다.
컨슈머 스레드가 파티션 개수보다 많아지면 할당할 파티션 개수가 더는 없으므로 파티션에 할당되지 못한 컨슈머 스레드는 데이터 처리를 하지 않게된다.
public class ConsumerWorker implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
private Properties prop;
private String topic;
private String threadName;
private KafkaConsumer<String, String> consumer;
ConsumerWorker(Properties prop, String topic, int number){
this.prop = prop;
this.topic = topic;
this.number = number;
}
@Override
public void run(){
consumer = new ConsumerWorker<(prop);
consumer.subscribe(Arrays.asList(topic));
while(true){
ConsumerRecord<String, String>records = consumer.poll(Duration.ofSeconds(1));
for(ConsumerRecord<String, String> record : records){
logger.info("{}", record);
}
}
}
}
public class MultiConsumerThread {
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
private final static String GROUP_ID = "test-group";
private final static String CONSUMER_COUNT = "3";
public static void main(String[] args){
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
ExecutorService executorService = Executors.newCachedThreadPool();
for(int i=0; i<CONSUMER_COUNT; i++){
ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
executorService.execute(worker);
}
}
}