임시

함궈·2023년 10월 31일
0

아파치카프카

목록 보기
10/10

ConsumerManager.class


@Service
public class ConsumerManager {


    private Properties props;

    @Value("${bootstrap.servers}")
    private String bootstrapServers;

    private static List<ConsumerWorker> workers = new ArrayList<>();
  

    @PostConstruct
    private void init(){
        props = new Properties();

        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());
        props.setProperty("auto.offset.reset", "earliest");
    }

    public Boolean addConsumerWorker(Long chattingRoomId, Long userId){

        try {
            ConsumerWorker worker = new ConsumerWorker(props, chattingRoomId, userId);

            workers.add(worker);

            Thread thread = new Thread(worker);
            thread.start();

        }catch(Exception e){
            return false;
        }

        return true;
    }

    public Boolean subConsumerWorker(Long chattingRoomId, Long userId){

        String threadName = "consumer-thread" + chattingRoomId + userId;

        for(ConsumerWorker worker : workers){
            if(threadName.equals(worker.getThreadName())){

                worker.wakeup();
                workers.remove(worker);

                return true;
            }
        }

        return false;
    }
}

ConsumerWorker.class

@RequiredArgsConstructor
@ToString
public class ConsumerWorker implements Runnable{

    Logger log = LoggerFactory.getLogger(ConsumerWorker.class.getSimpleName());

    private Properties prop;
    private String topic;

    private String threadName;
    private KafkaConsumer<String, String> consumer;


    public ConsumerWorker(Properties prop, Long chattingRoomId, Long userId) {
        this.prop = prop;

        // 각 워커마다 다른 group.id를 부여하지 않으면 파티션을 공유하게 된다.
        prop.setProperty("group.id", String.valueOf(chattingRoomId) + userId);

        this.topic = "test" + chattingRoomId;
        this.threadName = "consumer-thread" + chattingRoomId + userId;
    }

    @Override
    public void run() {
    
        Thread.currentThread().setName(threadName);

        consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singletonList(topic));
        
        try{
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

                for(ConsumerRecord<String, String> record : records){
                    log.info("{}", record);
                }
            }
        }catch (WakeupException e){
            log.info(this.threadName + " wakeup");
        }catch (Exception e){
            log.error(e.getMessage(), e);
        }finally {
            log.info("컨슈머 종료");
            consumer.close();
        }
    }

    public void wakeup(){
        consumer.wakeup();
    }

    public String getThreadName() {
        return threadName;
    }

}

ConsumerManagerTest.class


@SpringBootTest
@AutoConfigureMockMvc
class ConsumerManagerTest {

    @Autowired
    private ConsumerManager consumerManager;

    Logger log = LoggerFactory.getLogger(ConsumerManagerTest.class.getSimpleName());

    @Test
    @Order(1)
    @DisplayName("워커 생성 테스트")
    void addConsumerWorkerTest() {
        log.info("addConsumerWorkerTest");

        //given
        Long chattingRoomId = 1L;
        Long userId = 1L;

        //when
        Boolean isAdded = consumerManager.addConsumerWorker(chattingRoomId, userId);

        //then
        assertTrue(isAdded);
    }

    @Test
    @Order(2)
    @DisplayName("워커 삭제 테스트")
    void subConsumerWorkerTest() {

        //given
        Long chattingRoomId = 1L;
        Long userId = 1L;

        //when
        Boolean isRemoved = consumerManager.subConsumerWorker(chattingRoomId, userId);

        //then
        assertTrue(isRemoved);
    }

}

위 테스트코드를 실행하면, 컨슈머가 null이므로 wakeup() 메소드를 실행할 수 없다고 함.

테스트코드에서 addConsumerWorker() 다음으로 subConsumerWorker()를 호출했더라도, run() 메서드 내에서 컨슈머가 초기화되므로 wakeup() 메서드를 실행할 때 컨슈머가 생성되지 않았을 수 있기 때문으로 보인다.

0개의 댓글