@Service
public class ConsumerManager {
private Properties props;
@Value("${bootstrap.servers}")
private String bootstrapServers;
private static List<ConsumerWorker> workers = new ArrayList<>();
@PostConstruct
private void init(){
props = new Properties();
props.setProperty("bootstrap.servers", bootstrapServers);
props.setProperty("sasl.mechanism", "PLAIN");
props.setProperty("key.deserializer", StringDeserializer.class.getName());
props.setProperty("value.deserializer", StringDeserializer.class.getName());
props.setProperty("auto.offset.reset", "earliest");
}
public Boolean addConsumerWorker(Long chattingRoomId, Long userId){
try {
ConsumerWorker worker = new ConsumerWorker(props, chattingRoomId, userId);
workers.add(worker);
Thread thread = new Thread(worker);
thread.start();
}catch(Exception e){
return false;
}
return true;
}
public Boolean subConsumerWorker(Long chattingRoomId, Long userId){
String threadName = "consumer-thread" + chattingRoomId + userId;
for(ConsumerWorker worker : workers){
if(threadName.equals(worker.getThreadName())){
worker.wakeup();
workers.remove(worker);
return true;
}
}
return false;
}
}
@RequiredArgsConstructor
@ToString
public class ConsumerWorker implements Runnable{
Logger log = LoggerFactory.getLogger(ConsumerWorker.class.getSimpleName());
private Properties prop;
private String topic;
private String threadName;
private KafkaConsumer<String, String> consumer;
public ConsumerWorker(Properties prop, Long chattingRoomId, Long userId) {
this.prop = prop;
// 각 워커마다 다른 group.id를 부여하지 않으면 파티션을 공유하게 된다.
prop.setProperty("group.id", String.valueOf(chattingRoomId) + userId);
this.topic = "test" + chattingRoomId;
this.threadName = "consumer-thread" + chattingRoomId + userId;
}
@Override
public void run() {
Thread.currentThread().setName(threadName);
consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singletonList(topic));
try{
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for(ConsumerRecord<String, String> record : records){
log.info("{}", record);
}
}
}catch (WakeupException e){
log.info(this.threadName + " wakeup");
}catch (Exception e){
log.error(e.getMessage(), e);
}finally {
log.info("컨슈머 종료");
consumer.close();
}
}
public void wakeup(){
consumer.wakeup();
}
public String getThreadName() {
return threadName;
}
}
@SpringBootTest
@AutoConfigureMockMvc
class ConsumerManagerTest {
@Autowired
private ConsumerManager consumerManager;
Logger log = LoggerFactory.getLogger(ConsumerManagerTest.class.getSimpleName());
@Test
@Order(1)
@DisplayName("워커 생성 테스트")
void addConsumerWorkerTest() {
log.info("addConsumerWorkerTest");
//given
Long chattingRoomId = 1L;
Long userId = 1L;
//when
Boolean isAdded = consumerManager.addConsumerWorker(chattingRoomId, userId);
//then
assertTrue(isAdded);
}
@Test
@Order(2)
@DisplayName("워커 삭제 테스트")
void subConsumerWorkerTest() {
//given
Long chattingRoomId = 1L;
Long userId = 1L;
//when
Boolean isRemoved = consumerManager.subConsumerWorker(chattingRoomId, userId);
//then
assertTrue(isRemoved);
}
}
위 테스트코드를 실행하면, 컨슈머가 null이므로 wakeup() 메소드를 실행할 수 없다고 함.
테스트코드에서 addConsumerWorker() 다음으로 subConsumerWorker()를 호출했더라도, run() 메서드 내에서 컨슈머가 초기화되므로 wakeup() 메서드를 실행할 때 컨슈머가 생성되지 않았을 수 있기 때문으로 보인다.