스프링부트 실행 후 Kafka Topic 추가하기

함궈·2023년 6월 23일

한글로 작성된 스프링 카프카 자료가 부족하여 직접 카프카 명령어를 스프링에서 실행하도록 작성하였다.

public String createKafkaTopic(String topicName){
        String command = createTopicPrefix 
        	+ " " + topicName + " "
            + createConsumerSuffix;

        ProcessBuilder pb = new ProcessBuilder("sh", "-c", command);
        return runScript(pb);
    }
public String runScript(ProcessBuilder pb){
        try {
            // Run script
            Process process = pb.start();

            // Read output
            StringBuilder output = new StringBuilder();
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(process.getInputStream()));

            String line;
            while ((line = reader.readLine()) != null) {
                output.append(line + "\n");
            }

            System.out.println(output.toString());

            return output.toString();

        }catch (Exception e){
            e.printStackTrace();
            return e.getMessage();
        }
    }

위 방식의 문제점으로는 스프링 카프카의 예외처리를 사용할 수 없어 매 번 카프카 명령어를 수행하고 나서 정상적으로 실행되었는지 확인하는 과정과, 예외 처리를 직접 구현해야 한다는 것이다.

토픽을 생성하는 코드를 다음과 같이 수정하였다.

public void createKafkaTopic(Integer topicNum){
        try(AdminClient client = AdminClient.create(kafkaAdmin.getConfigurationProperties())){

            try {
                String topic = "chat." + topicNum;
                client.createTopics(Collections.singleton(
                        TopicBuilder.name(topic).partitions(1).replicas(1).build())).all().get();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();

            }catch (TopicExistsException e){
                // 토픽이 이미 존재 예외처리 추가
            }
        }
    }

0개의 댓글