
여러 Event를 만들어 던지고, 또한 조건에 따라 EventListener를 이용하여 받을 때, 하나의 EventBus를 사용하면 코드가 간결해질 수 있습니다. 이런 IDEA로 만들어진 것이 Guava의 EventBus입니다. Guava 라이브러리의 EventBus를 사용하면 굉장히 쉽고 간단하게 Publish/Subscribe 방식의 이벤트 처리를 구현할 수 있습니다.
하나의 EventBus를 생성 후, Listener interface를 상속하지 않은 pojo 클래스를 하나 생성하고 이벤트를 받을 메소드에 @Subscribe 를 추가하면 됩니다. 클래스에 @Subscribe 를 추가만 해도 Event는 전달되도록 되어 있습니다.
그리고 EventBus.post()의 인자 type에 따라 Event를 전달하는 방식이라, 객체 지향적으로 코드를 개발할 수 있습니다.
[pom.xml]
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1.1-jre</version>
</dependency>
EventBus는 이벤트를 처리할 각 이벤트 리스너를 등록하고 각 리스너에게 이벤트를 전파하는 역할을 수행합니다. EventBus 객체는 다음과 같이 생성자를 호출해서 간단하게 생성할 수 있습니다.
EventBus eventBus = new EventBus();
혹은 생성자 파라미터에 문자열 값을 넘겨서 이벤트버스의 이름을 지정할 수 있습니다.
EventBus eventBus = new EventBus("testEventBus");
참고로 이름을 설정하지 않으면 기본 이름은 "default"로 설정됩니다. 만약 스프링 환경에서 사용한다면 이벤트버스를 빈으로 등록해 사용하면 됩니다.
이벤트 리스너는 이벤트를 받아 처리하는 객체입니다. 이벤트 리스너를 만들기 위해 특정 interface를 구현할 필요는 없습니다. 아래 코드처럼 이벤트를 처리하는 메서드를 구현하고 @Subscibe (com.google.common.eventbus.Subscribe) 어노테이션을 달아주면 해당 메서드가 이벤트 핸들러 메서드의 역할을 하게됩니다. 여기서는 단순하게 String으로된 이벤트를 처리하는 리스너를 예로 들었습니다.
public class StringEventListener {
@Subscribe
public void handle(String event) {
System.out.println("string event received : " + event);
}
}
이벤트 리스너를 만들기 위해서 특정 타입을 강요하지 않기 때문에 상당히 유연한 장점이 있습니다. 단 아래 사항을 주의해서 구현해야 합니다.
이벤트 객체는 반드시 primitive 타입이 아닌 reference 타입의 객체여야 한다.
핸들러 메서드는 이벤트 객체를 받는 단 하나의 파라미터만 존재해야 한다.
핸들러 메서드의 시그니처는 public void methodName(Object event) 형태여야 한다.
이렇게 구현한 이벤트 리스너를 eventBus의 register() 메서드로 등록하게 되면 이벤트 구독을 할 수 있는 상태가 됩니다.
// 이벤트 리스너를 이벤트버스에 등록
eventBus.register(new StringEventListener());
EventBus의 post() 메서드로 이벤트를 발행할 수 있습니다. 이벤트 버스에 이벤트를 발행하면 해당 이벤트를 구독하고 있는 이벤트 리스너에서 이벤트를 받아 처리하게 됩니다.
@Test
public void postEvent() {
EventBus eventBus = new EventBus("testEventBus");
eventBus.register(new StringEventListener());
eventBus.post("TEST EVENT");
}
// 출력
// string event received : TEST EVENT
String, Integer, Long 같은 기본 타입 뿐만 아니라 직접 이벤트 객체를 정의해서 발행, 구독 하는것도 가능합니다.
먼저 커스텀 이벤트를 하나 정의합니다.
@Getter
@AllArgsConstructor
public class MyEvent {
private String value;
}
커스텀 이벤트를 처리할 이벤트 리스너도 하나 만듭니다.
public class MyEventListener {
@Subscribe
public void handle(MyEvent myEvent) {
System.out.println("MyEvent : " + myEvent.getValue());
}
}
앞의 예제와 마찬가지로 이벤트 버스에 리스너를 등록하여 커스텀 이벤트를 발행 할 수 있습니다.
public class EventBusTest {
public static void main(String[] args) {
EventBus eventBus = new EventBus("testEventBus");
eventBus.register(new MyEventListener());
eventBus.post(new MyEvent("test"));
}
}
// MyEvent : test
어떤 이벤트가 발행 되었는데 만약 이를 처리할 수 있는 이벤트 리스너가 없다면, 이벤트 버스는 이를 죽은 이벤트로 간주하고 DeadEvent 라는 클래스로 감싸서 발행합니다. 아래처럼 DeadEvent 처리를 위한 이벤트 리스너를 등록하면 이렇게 죽은 이벤트들도 놓치지 않고 다시 처리할 수 있습니다.
public class DeadEventListener {
@Subscribe
public void handle(DeadEvent deadEvent) {
System.out.println("dead event posted");
System.out.println("dead event : " + deadEvent.getEvent());
System.out.println("dead event source : " + deadEvent.getSource());
}
}
deadEvent.getEvent() : 발행된 이벤트 객체
deadEvent.getSource() : 이벤트가 발행된 이벤트 버스
public class EventBusTest {
public static void main(String[] args) {
EventBus eventBus = new EventBus("myEventBus");
eventBus.register(new MyEventListener());
eventBus.register(new DeadEventListener());
eventBus.post(new MyEvent("test"));
eventBus.post("this is dead event");
}
}
위의 코드를 보면 마지막에 String 타입의 이벤트를 발행하지만 이를 처리할 수 있는 리스너가 등록되지 않았기 때문에 DeadEvent로 처리가 됩니다.
package co.kr.dean.testapi.consumer;
import co.kr.dean.testapi.event.AsyncWorkerService;
import co.kr.dean.testapi.event.MessageListener;
import com.google.common.eventbus.AsyncEventBus;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Component
@Slf4j
public class TestConsumer {
private final ExecutorService executor = Executors.newFixedThreadPool(5);
@Autowired
AsyncWorkerService asyncWorkerService;
@KafkaListener(topics = {"testapi-test-topic"})
public void testapiTestTopic(ConsumerRecord<String, String> consumerRecord) {
String topicName = consumerRecord.topic();
String recordValue = consumerRecord.value();
log.info("[BEG] [consumer] {} : {}", topicName, recordValue);
// guava event bus 를 통한 비동기 처리
AsyncEventBus asyncEventBus = new AsyncEventBus(topicName, executor);
asyncEventBus.register(new MessageListener());
asyncEventBus.post(recordValue);
// spring annotation 을 통한 비동기 처리
asyncWorkerService.method1(recordValue);
asyncWorkerService.method2(recordValue);
log.info("[END] [consumer] {} : {}", topicName, recordValue);
}
}
package co.kr.dean.testapi.event;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.DeadEvent;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class MessageListener {
@AllowConcurrentEvents
@Subscribe
public void receive(String message) throws InterruptedException {
log.info("[BEG] : {}", message);
TimeUnit.SECONDS.sleep(1L);
log.info("[END] : {}", message);
}
@Subscribe
public void handle(DeadEvent deadEvent) {
log.error("dead event : {} , {}", deadEvent.getEvent(), deadEvent.getSource());
}
}