한글로 작성된 스프링 카프카 자료가 부족하여 직접 카프카 명령어를 스프링에서 실행하도록 작성하였다.
public String createKafkaTopic(String topicName){
String command = createTopicPrefix
+ " " + topicName + " "
+ createConsumerSuffix;
ProcessBuilder pb = new ProcessBuilder("sh", "-c", command);
return runScript(pb);
}
public String runScript(ProcessBuilder pb){
try {
// Run script
Process process = pb.start();
// Read output
StringBuilder output = new StringBuilder();
BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()));
String line;
while ((line = reader.readLine()) != null) {
output.append(line + "\n");
}
System.out.println(output.toString());
return output.toString();
}catch (Exception e){
e.printStackTrace();
return e.getMessage();
}
}
위 방식의 문제점으로는 스프링 카프카의 예외처리를 사용할 수 없어 매 번 카프카 명령어를 수행하고 나서 정상적으로 실행되었는지 확인하는 과정과, 예외 처리를 직접 구현해야 한다는 것이다.
토픽을 생성하는 코드를 다음과 같이 수정하였다.
public void createKafkaTopic(Integer topicNum){
try(AdminClient client = AdminClient.create(kafkaAdmin.getConfigurationProperties())){
try {
String topic = "chat." + topicNum;
client.createTopics(Collections.singleton(
TopicBuilder.name(topic).partitions(1).replicas(1).build())).all().get();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}catch (TopicExistsException e){
// 토픽이 이미 존재 예외처리 추가
}
}
}