디자인 패턴 공부 - 옵저버 패턴

이혁진·2023년 1월 30일
0

옵저버 패턴

옵저버 패턴은 여러 객체가 특정 객체의 상태 변화를 감지하고 알림을 받는 패턴이다. 정확히 말하면 객체 관찰하고자 하는 객체에 여러 옵저버를 등록해놓고, 상태 변화가 있을 때마다 메서드를 호출(=메시지를 전송)하여 옵저버에게 직접 통지하는 것이다. 원래는 상태 변화를 감지하고 이에 따라 처리를 하는 경우에, 처리를 하는 측에서 상태를 계속해서 관찰해야 했다. 이 경우에, 불필요하게 상태를 관찰하는 작업(상태 변화가 없는 경우)이 일어나기 마련이다. 옵저버 패턴은 옵저버가 관찰하는 것이 아닌, 관찰 대상이 상태 변화를 통보하는 식으로 설계하여 그러한 문제를 해결한다.

구현

일단 이렇게 생겼다.

observerCollection이 옵저버 집합이다. register와 unregister을 통해 등록하고 해지할 수 있다. notifyObservers()는 어떤 상태 변화가 Subject에 가해질 때, 그 상태를 변화하는 메소드 내에서 호출될 수 있도록 한다. 여기에서 observer들에게 notify()라는 메시지를 전달(호출)한다.(updated()라는 작명이 나은 것 같다.) 인자로 이벤트 발행자나 여타 정보들이 전달될 수 있다. 그러면 그 안에서 구상 옵저버들의 응답이 수행된다. 또한, register와 unregister 이외에도 임시로 이벤트 알림을 멈추거나 재개하여 이벤트가 엄청 많을 때 이를 조절해주는 메소드를 구현하기도 한다.

주의해야 할 사항은 순환 실행이다. 옵저버의 핸들러(콜백, notify())는 종종 Subject의 상태를 바꾸기도 하는데, 이것으로 인해서 또 옵저버에 알림이 오고, 핸들러가 계속해서 호출되는 상황이 일어날 수 있다. 이를 막는 메커니즘이 필요할 수도 있다.

옵저버 패턴은 크게 두 가지 방식이 있다. pull과 push방식이 그것인데, 각각을 모두 구현해보겠다.

옵저버 패턴 미 적용

Subject에서 onEvent()를 통해서 내부 상태를 변경할 수 있다. getState..()로 상태 정보를 가져올 수 있다. 옵저버들은 상태가 변했는지 알기 위해서 쓰레드로 1초에 한 번씩 상태를 확인해야 한다.

public class Subject {
    private String state1;
    private String state2;
	private String state3;

	// event
	public void onEvent(String state1, String state2, String state3) {
        this.state1 = state1;
	    this.state2 = state2;
    	this.state3 = state3;
	}

	public String getState1() {
    	return state1;
	}

    public String getState2() {
	    return state2;
	}

	public String getState3() {
    	return state3;
	}
}

public class Observer1 implements Runnable {
	private final Subject subject;
	private final long freq;

	public Observer1(Subject subject) {
    	this.subject = subject;
    	this.freq = 1000;
	}

	@Override
	public void run() {
    	while (true) {
        	display(subject.getState1(),
            	    subject.getState2(),
                	subject.getState3());


        	try {
            	Thread.sleep(freq);
        	} catch (InterruptedException e) {
            	e.printStackTrace();
        	}
    	}
	}

	private void display(String state1, String state2, String state3) {
    	String prefix = "[Observer1]";
    	System.out.println(prefix + " state1 : " + state1);
    	System.out.println(prefix + " state2 : " + state2);
    	System.out.println(prefix + " state3 : " + state3);
	}
}

public class Observer2 implements Runnable {
	// Observer1과 동일
}

public class Observer3 implements Runnable {
	// Observer1과 동일
}

public class Main {
    public static void main(String[] args) throws InterruptedException {
	    Subject subject = new Subject();
    	Observer1 observer1 = new Observer1(subject);
    	Observer2 observer2 = new Observer2(subject);
    	Observer3 observer3 = new Observer3(subject);

   	 	Thread thread1 = new Thread(observer1);
    	Thread thread2 = new Thread(observer2);
    	Thread thread3 = new Thread(observer3);

    	thread1.start();
    	thread2.start();
    	thread3.start();

    	Thread.sleep(4000);
    	subject.onEvent("hello1", "hello2", "hello3");
    	Thread.sleep(4000);
    	subject.onEvent("good1", "good2", "good3");
    	Thread.sleep(4000);
    	subject.onEvent("bye1", "bye2", "bye3");
	}
}

Result :

	...
    
	[Observer3] state2 : hello2  <<
	[Observer3] state3 : hello3  <<
	[Observer2] state1 : hello1
	[Observer2] state2 : hello2
	[Observer2] state3 : hello3
	[Observer1] state1 : hello1  
	[Observer1] state2 : hello2
	[Observer1] state3 : hello3
	[Observer3] state1 : hello1
	[Observer3] state2 : hello2  << not changed
	[Observer3] state3 : hello3  << not changed
	[Observer2] state1 : good1
	[Observer2] state2 : good2
	[Observer2] state3 : good3
	[Observer1] state1 : good1
	[Observer1] state2 : good2
	[Observer1] state3 : good3
	[Observer3] state1 : good1
    [Observer3] state2 : good2  << changed
	[Observer3] state3 : good3  << changed
	[Observer2] state1 : good1
	[Observer2] state2 : good2

    ...

계속 상태가 변했는지 체크를 해야 한다. 옵저버 방식은 이렇게 옵저버가 확인하는 것이 아니라, 서브젝트가 통보를 하는 식이다.

push 기반 옵저버

서브젝트에서 오브젝트에게 변화를 통보할 때, 필요한 상태들을 인자로 전달한다. setChanged를 쓰는 이유는 아마 back pressure 라고 불리는 현상 때문인 듯 하다. 뒤에서 다루겠다.

public class Subject {
	private final List<Observer> observerList;
	private String state1;
	private String state2;
	private String state3;
	private boolean changed;

	public Subject() {
    	observerList = new ArrayList<>();
    	changed = false;
	}

	public void addObserver(Observer observer) {
    	observerList.add(observer);
	}

	public void deleteObserver(Observer observer) {
    	observerList.remove(observer);
	}

	public void onEvent(String state1, String state2, String state3) {
    	this.state1 = state1;
    	this.state2 = state2;
    	this.state3 = state3;
    	setChanged();
    	// control back pressure
    	notifyObserver();
	}

	private void notifyObserver() {
    	if(changed) {
        	for(Observer observer : observerList) {
            	observer.update(state1, state2, state3);
        	}
        	changed = false;
    	}
    	else {
        	System.out.println("not changed");
    	}
	}

	private void setChanged() {
    	this.changed = true;
	}
}

public interface Observer {
	public void update(String state1, String state2, String state3);
}

public class Observer1 implements Observer {
	public void display(String state1, String state2, String state3) {
	    String prefix = "[Observer1]";
    	System.out.println(prefix + " state1 : " + state1);
    	System.out.println(prefix + " state2 : " + state2);
    	System.out.println(prefix + " state3 : " + state3);
	}

	@Override
	public void update(String state1, String state2, String state3) {
    	display(state1, state2, state3);
	}
}

public class Observer2 implements Observer {
	// Observer1과 같다.
}

public class Observer3 implements Observer {
	// Observer1과 같다.
}

public class Main {
	public static void main(String[] args) {
    	Subject subject = new Subject();
    	Observer observer1 = new Observer1();
    	Observer observer2 = new Observer2();
    	Observer observer3 = new Observer3();

    	subject.addObserver(observer1);
    	subject.addObserver(observer2);
    	subject.addObserver(observer3);

    	subject.onEvent("a", "b", "c");
    	subject.onEvent("d", "e", "f");
    	subject.onEvent("u", "c", "k");
	}
}

Result :
	[Observer1] state1 : a
	[Observer1] state2 : b
	[Observer1] state3 : c
	[Observer2] state1 : a
	[Observer2] state2 : b
	[Observer2] state3 : c
	[Observer3] state1 : a
	[Observer3] state2 : b
	[Observer3] state3 : c
	[Observer1] state1 : d
    
    ...
    

상태가 변할 때에만 데이터를 확인하고 시각화하는 것을 볼 수 있다.

pull 기반 옵저버

pull 기반 옵저버는 상태 정보를 변화와 함께 통지하지 않는다. 변화 시에는 변화했음을 알리고, 각 옵저버들은 Subject의 getState..()를 활용하여 상태를 가져간다.

public class Subject {
	private final List<Observer> observerList;
	private String state1;
	private String state2;
	private String state3;
	private boolean changed;

	public Subject() {
    	observerList = new ArrayList<>();
    	changed = false;
	}

	public void addObserver(Observer observer) {
    	observerList.add(observer);
	}

	public void deleteObserver(Observer observer) {
    	observerList.remove(observer);
	}

	public void onEvent(String state1, String state2, String state3) {
    	this.state1 = state1;
    	this.state2 = state2;
    	this.state3 = state3;
    	setChanged();
    	// control back pressure
    	notifyObserver();
	}

	private void notifyObserver() {
    	if(changed) {
        	for(Observer observer : observerList) {
            	observer.update();
        	}
        	changed = false;
    	}
    	else {
        	System.out.println("not changed");
    	}
	}

	private void setChanged() {
    	this.changed = true;
	}

	public String getState1() {
    	return state1;
	}

	public String getState2() {
    	return state2;
	}

	public String getState3() {
    	return state3;
	}
}

public interface Observer {
	public void update();
}

public class Observer1 implements Observer {
	private final Subject subject;

	public Observer1(Subject subject) {
    	this.subject = subject;
	}

	public void display(String state1, String state2, String state3) {
    	String prefix = "[Observer1]";
    	System.out.println(prefix + " state1 : " + state1);
    	System.out.println(prefix + " state2 : " + state2);
    	System.out.println(prefix + " state3 : " + state3);
	}

	@Override
	public void update() {
    	display(subject.getState1(), subject.getState2(), subject.getState3());
	}
}

public class Observer2 implements Observer {
	// Observer1 과 같다.
}

public class Observer3 implements Observer {
	// Observer1 과 같다.
}

public class Main {
	public static void main(String[] args) {
    	// push 에서는 각 상태들에 의존했다.
    	// pull 은 객체 자체를 필드로 넘긴다.
    	// 강하게 결합하여, 메시지로써 상태를 딱 맞게 탐색할 수 있지만
    	// 옵저버가 Subject 에 의존하게 된다.

    	Subject subject = new Subject();
    	Observer observer1 = new Observer1(subject);
    	Observer observer2 = new Observer2(subject);
    	Observer observer3 = new Observer3(subject);

    	subject.addObserver(observer1);
    	subject.addObserver(observer2);
    	subject.addObserver(observer3);

    	subject.onEvent("a", "b", "c");
    	subject.onEvent("d", "e", "f");
    	subject.onEvent("g", "h", "i");
    	subject.onEvent("j", "k", "l");
	}
}

pull 방식 옵저버 vs push 방식 옵저버

위에 써있는 것처럼 pull 방식과 push 방식의 차이는 전달할 상태의 캡슐화 여부이다. push에서는 상태 정보가 Subject 뿐만 아니라 Concrete Observer 에도 흩어져 있다. 따라서, 해당 로직의 정책 변화(상태가 늘어나거나 없어지거나)가 여러 클래스에 걸쳐 영향을 미치게 된다. 반면 상태 정보를 캡슐화된 getter를 통해서 가져온다면, 다른 클래스에 미치는 영향이 줄어든다. 무슨 소리냐면

가령 Subject에 새로운 상태인 state4를 추가하고, 옵저버1이 state4를 관찰하여 출력 한다고 해보자.

push 방식일 경우

  • Subject 에서 state4 추가
  • Subject 에서 observer.update(...) 인자 변경
  • Observer 에서 update(...) 매개변수 변경
  • Observer1 에서 update(...) 매개변수 변경 <<
  • Observer2 에서 update(...) 매개변수 변경 <<
  • Observer3 에서 update(...) 매개변수 변경 <<
  • Observer1 에서 display(...) 인자/매개변수 state4 추가

pull 방식일 경우

  • Subject 에서 state4 추가
  • Subject 에서 getter 추가
  • Observer1 에서 display(...) 인자/매개변수 state4 추가

옵저버 수를 n개라 할 때, push는 n+4개, pull은 3개의 수정 포인트가 생긴다고 생각하면 좀 더 와닿는다.

state 변화에 좀 더 유연한 것이 pull 이라면, push 방식은 각 옵저버들이 Subject에 연관 자체를 안하기 때문에, Subject가 뭐가 되든 잘 돌아간다. 다만, 그 상태 정보가 달라진다면 어차피 다 고쳐야 해서 이 방식보다는 pull 방식이 나은 듯 하다? 물론 상황마다 다를 순 있지만, 여기에선 응집도를 높이는 쪽이 좋을 것 같다.

concrte observer vs abstract observer

설계 변화를 그림으로 나타내면 다음과 같다.

너무 당연한 이야기지만, Subject가 옵저버에 연관할 때에는 구상 클래스인 옵저버보다는 인터페이스 옵저버가 유연하다. 추상적인 대상에 연관함으로써 같은 연관 관계라도 보다 느슨한 형태의 관계를 제공한다.(DIC)

자바 내장 옵저버 패턴

한편, 위키피디아에서는 자바에 내장된 Observable과 Observer를 활용하여 주기적으로 사용자의 콘솔 입력을 받는 EventSource와 그러한 입력에 응답하는 ResponseHandler를 구현하여 옵저버 패턴을 설명하기도 한다. 근데 이거 곧 사라진다고 한다.

public class EventSource extends Observable implements Runnable {
	@Override
	public void run() {
    	// 콘솔에서 입력받는 스트림
    	final InputStreamReader isr = new InputStreamReader(System.in);
    	final BufferedReader br = new BufferedReader(isr);

    	// 계속 입력받는다. 이벤트 루프
    	while(true) {
        	try {
            	final String response = br.readLine();
            	// 변경되었음을 알리는 state 를 true 로 바꾸는 듯
            	super.setChanged();
            	// 등록된 옵저버들에게 response 전달하고 콜백 호출
            	super.notifyObservers(response);

        	} catch (IOException e) {
            	e.printStackTrace();
        	}
    	}
	}
}

public class ResponseHandler implements Observer {
	@Override
	public void update(Observable o, Object arg) {
    	if (arg instanceof String response) {
        	System.out.println("\nReceived Response: " + response);
    	}
	}
}

public class Main {
	public static void main(String[] args) {
    	System.out.println("Enter Text >");

    	// 이벤트 발행자를 생성
    	final EventSource evSrc = new EventSource();
    	final ResponseHandler respHandler = new ResponseHandler();

    	evSrc.addObserver(respHandler);

    	// 입력 이벤트를 계속 발행하는 쓰레드 시작.
    	// 이벤트 : 상태 변화 혹은 어떤 사건의 발생
    	// 이벤트 소스 : 마우스, 키보드 입력 .. 등 내외부 입력
    	// 이벤트 알림 : 이벤트 발생 시 이벤트 소비자에게 알린다.(소비자에게 메시지로 전달 = 소비자의 메소드 호출)
    	Thread thread = new Thread(evSrc);
    	thread.start();
	}
}

Result : 
	Enter Text >
	fdsa

	Received Response: fdsa
	fds

	Received Response: fds
	af
    
	Received Response: af
	...
    

역압(backpressure)

챗 GPT에게 물어본 역압 또는 배압이란 이것이다.

스트리밍 시스템의 배압은 데이터 생성 속도가 데이터 소비 속도를 압도하지 않도록 데이터 생성 및 소비 속도를 제어하는 데 사용되는 메커니즘입니다. 즉, 메모리 오버플로, 속도 저하 및 충돌로 이어질 수 있는 시스템의 데이터 축적을 방지하는 데 도움이 됩니다. 배압의 기본 개념은 데이터 생산자가 소비자의 능력을 인식하여 그에 따라 생산 속도를 조정할 수 있도록 하는 것입니다. 이는 흐름 제어, 버퍼링 및 속도 제한과 같은 다양한 기술을 통해 달성할 수 있습니다.

내가 이해하기로는 소비자가 처리할 수 있을 정도로, 이벤트가 생산자 -> 소비자에게 전달되는 흐름과 반대로, 소비자가 생산자에게 파라미터든 뭐든 메세지를 줘서 압력을 조절하는 것 같다.

생산자는 Subject, 소비자는 Observer 라고 보면 되겠다. 생산보다 소비를 느리게 하면 위 코드에서는 다음 소비자에게 이벤트 통지가 지연된다. 딱 그 정도지만, 대규모 시스템에서는

마치 서버가 요청을 너무 많이 받으면 터지는 것처럼 소비자(Consumer)도 터지게 된다.

그래서 시스템이 터지지 않도록 배압을 준다. 방법은 크게 세 가지가 있다고 한다.

  1. 이벤트 발행 속도 자체를 낮춘다. 가장 기본적인 방법이다. 이러면 당연히 소비자가 터지지 않겠지만, 이벤트 발행 속도를 낮추는 것은 시스템의 데이터가 현실을 제대로 반영하지 못하게 한다. 무슨 소리냐면, 온도를 관측하는 시스템에서 변화의 발행을 느리게 해보자. 몇개를 빼먹고 처리를 하니까 시스템 정확도가 당연히 줄어든다. 만약 관측 데이터가 별로 변하지 않거나 정확도가 중요하지 않은 경우 좋은 방법이 될 수 있겠다.

  2. 이벤트를 버퍼링한다. 데이터가 들어오면 전부 저장해놓고, 적절한 속도로 소비자에게 전달한다. 이러면 이벤트가 유실되지 않는다. 하지만 이벤트를 저장하느라 메모리가 터질 수 있다.

  3. 발생한 이벤트 중 소비자가 해결하지 못하는 것들을 버린다. 이건 1번과 비슷한 것 같다.

1번은 해볼 수 있겠는데, 나머지는 너무 깊게 들어가는 것 같다. 이벤트 객체를 만들고 큐에다가 담으면 되겠는데, 그러면 또 옵저버 패턴이 크게 바뀌기 때문에 힘들 것 같다.

public class Subject {
	...
    private boolean changed;
    private long period_millis;
    private long pre_time;
    
    public Subject() {
    	...
        changed = false;
        period_millis = 1000; // can be injected by Observer
        pre_time = System.currentTimeMillis();
    }
    
    ...
    
    public void onEvent(...) {
    	...
		// 시간을 측정한다.
		long cur_time = System.currentTimeMillis();
    
		// 이전 이벤트 실행 시간이랑 1000밀리초(기본) 이상 차이나면 변화 플래그 True
		// 그리고 이전 이벤트 실행 시간을 갱신
		if(cur_time - pre_time > period_millis) {
			setChanged();
			pre_time = cur_time;
		}
    
		// controlled back pressure
		notifyObserver();
	}
}

이러면 되지 않을까 싶다. 그리고 subscribe를 할 때 period_millis를 인자로 넘기든 setter로 넘기든 해서 역압을 가하면 될 것 같다.

아무튼 이런 의도로 책이나 위키피디아 같은 데에서 notify와 changed를 분리한 것 같다.

그런 줄 알았는데, 진짜 그런 용도라고 한다.

리액티브 프로그래밍은 옵저버 패턴의 서브젝트와 옵저버의 개념과 유사한 발행자와 구독자를 사용해 데이터를 통지하고 처리합니다. 데이터를 제공하는 측에서 데이터를 소비하는 측에 통지하는 방식을 일반적으로 푸시 기반 (Push-Based)이라고 부릅니다. 리액티브 프로그래밍은 옵저버 패턴의 단순한 데이터 통지 기능에 더해서 백프레셔라는 특징을 통해 데이터를 처리하는 측에서 데이터를 전달받는 개수를 역으로 요청할 수 있습니다. 그러므로 데이터를 통지하는 주체의 통지 속도를 제어하고 처리하는 측에서는 자신이 처리 가능한 만큼의 데이터만 받아서 처리할 수 있게 됩니다.

또, 저기 request(10) 하는게 아마 period_millis의 setter인 듯하다. 물론 period_millis는 밀리초 단위이지만, 이벤트 처리 속도를 통해 이벤트 사이즈로 변환하면 될 것 같다.

아무튼 정리하면, 발행자가 구독자보다 이벤트 발행하는 속도가 빠를 때, 구독자에서 발행자에 역압을 가해 이벤트 발행? 량을 조절하고 시스템의 붕괴를 막는다.(버퍼를 쓰지 않고) 그 방식으로 옵저버 패턴에 기반해 여러 비동기적인 요소 간 상호작용하는 것을 리액티브 스트림이라 한다. 자바에서는 Flow API를 활용해서 이를 구현할 수 있다고 한다.

장점

불필요한 상태 확인을 없애고, 유연한 시스템을 설계할 수 있다.

Pub-Sub 패턴과의 관계

옵저버 패턴은 이벤트 기반 아키텍쳐를 구현하는 방식 중 하나인 발행-구독 패턴과 유사하다. 하지만 두 패턴은 다르다. 브로커라고 하는 중간 계층의 유무가 그 차이인데

옵저버 패턴은 일단 인터페이스라고 하더라도, Subject가 Observer를 알기는 해야 한다. 그래야 Observer 목록의 추가/삭제를 구현하고 그들에게 통보를 할 수 있기 때문이다. 반면, PubSub 패턴은 Publisher와 Subscriber가 서로 전혀 모르고, 브로커만을 알고 있다. 그리고 모든 작업을 브로커를 통해 수행한다. 가령 Publisher는 이벤트의 발행을 브로커에 해 놓으면, Subscriber는 누가 발행했는지 모를 이벤트를 토픽만 정해서 가져오는 것이다. 서로 의존하지 않기에 보다 느슨하게 결합되었다고 할 수도 있겠다. 그 브로커 중 하나가 Kafka이다.

발행-구독 모델은 비동기 메시징 패러다임이다. 발행-구독 모델에서 발신자의 메시지는 특별한 수신자가 정해져 있지 않다. 대신 발행된 메시지는 정해진 범주에 따라, 각 범주에 대한 구독을 신청한 수신자에게 전달된다. 수신자는 발행자에 대한 지식이 없어도 원하는 메시지만을 수신할 수 있다. 이러한 발행자와 구독자의 디커플링은 더 다이나믹한 네트워크 토폴로지와 높은 확장성을 허용한다.

또, 이벤트 기반 아키텍처를 구현하는 패턴이 두 가지 있단다.

메시징 큐 방식(RabbitMQ, ActiveMQ)에서는 브로커가 구독자에게 메시지를 push한다면, Pub/Sub(=스트리밍, Kafka)에서는 구독자가 직접 메시지를 가져가는 pull방식이다. 가져오는 양을 구독자가 조절할 수 있고, 보다 느슨하다고(메시징 큐 기반은 브로커가 구독자를 알아야 함) 한다.

	<Producer>
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
    for (int i = 0; i < 5; i++) {
        producer.send(new ProducerRecord<String, String>("mytopic", ""+i));
    }
    
    producer.flush();
    producer.close();
    
    
    <Consumer>
	KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
    consumer.subscribe(Arrays.asList("mytopic")); // topic 설정

	while (true) {
		ConsumerRecords<String, String> records = consumer.poll(1000);
		for (ConsumerRecord<String, String> record : records) {
			String input = record.topic();
            if ("mytopic".equals(input)) {
                System.out.println(record.value());
            } else {
                ...
            }
		}	
	}

카프카를 이렇게 쓴다고 한다. 아무튼 중요한 것은 컨슈머와 프로듀서가 서로 모른다는 것이다.

예시1 - Flow API

flow api는 자바에서 리액티브 프로그래밍을 지원하기 위한 Flow 클래스와 그 인터페이스를 말한다.

먼저 Publisher는 subscribe()에서 Subscriber를 리스너로 등록할 수 있고, 내부 구현에 따라 리스너에게 이벤트를 전달할 수 있다.

@FunctionalInterface
public static interface publisher<T> {
	public void subscribe(Subscribe<? super T> subscriber);
}

Subscriber는 Publisher가 이벤트를 발행 시 처리할 수 있도록 메소드 4개를 가지고 있다. 이들은 Publisher의 내부에서 호출되는 콜백 메소드들이다.

public static interface Subscriber<T> {
	public void onSubscribe(Subscription subscription);
	public void onNext(T item);
    pubilc void onError(Throwable throwable);
    public void onComplete();
}

onSubscribe()는 Publisher에 등록될 때 가장 처음 호출되는 메소드이고, 초기 세팅? 을 할 수 있는가보다. 그리고 onNext()가 push받은 데이터를 통해 처리할 작업이 될 것이다. 옵저버에서 update()에 해당되는 듯. onComplete()를 통해 이벤트 스트림이 종료되었음을 알릴 수 있고, 에러가 발생했을 때에는 onError()를 호출할 수 있다.

public static interface Subscribtion {
	public void request(long n);
    public void cancel();
}

아마 이거는 역압을 조절하는 인터페이스인 듯하다. request()로 처리할 수 있는 이벤트에 따라 그 개수를 알릴 수 있고, cancel()로 이벤트를 받지 않음을 알린다.

public static interface<T, R> extends Subscriber<T>, Publisher<R> {

}

이벤트를 변환하는 거라고 하는데, 잘 모르겠다. 쓸 때는 다음과 같이 쓰면 된다. hello flow라는 데이터를 담아 이벤트를 발행하고, 구독자가 그걸 출력한다.

// event source
Flow.Publisher<String> publisher = new Flow.Publisher<String>() {
	@Override
	public void subscribe(Flow.Subscriber<? super String> subscriber) {
		subscriber.onNext("hellow flow 1");
        subscriber.onNext("hellow flow 2");
        subscriber.onNext("hellow flow 3");
		subscriber.onComplete();
	}
};

// event handler
Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
	@Override
    public void onSubscribe(Flow.Subscription subscription) {
    
    }
    
    @Override
    public void onNext(String item) {
    	System.out.println("onNext");
        System.out.println(item);
    }
    
    @Override
    public void onError(Throwable throwable) {
    	
    }
    
    @Override
    public void onComplete() {
    	System.out.println("completed");
    }
}

publisher.subscribe(subscriber);
System.out.println("end");

Result : 
    onNext
    hello flow 1
    onNext
    hello flow 2
    onNext
    hello flow 3
    completed
    end
    

동기적으로 작동하기 때문에(단일한 쓰레드) end가 모든 이벤트 처리가 끝난 후에 출력된다. 비동기 처리를 하고 싶으면 아래와 같이 SubmissionPublisher를 활용한다. publisher를 바꾸고 subscriber 구현만 살짝 바꾸면 된다.

publisher = new SubmissionPublisher<>();

Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
	@Override
	public void onSubscribe(Flow.Subscription subscription) {
		System.out.println("OnSubscribe!");
		subscription.request(1);
	}

	@Override
	public void onNext(String item) {
		System.out.println("onNext");
		System.out.println(Thread.currentThread().getName());
		System.out.println(item);
	}
	
    ...
    
};

publisher.subscribe(subscriber);
((SubmissionPublisher)publisher).submit("hello sub 1");
((SubmissionPublisher)publisher).submit("hello sub 2");
System.out.println("end");
Thread.sleep(3000);

Result :
	end
	OnSubscribe!
	onNext
	ForkJoinPool.commonPool-worker-1
	hello sub 1
    

내부 뜯어보니까 아마 request를 할 때, 거기에서 쓰레드를 만들어서 처리하는 것 같다. 메인 쓰레드랑 다른 쓰레드를 돌리니까, 메인 쓰레드가 end를 먼저 출력할 수도 있는 것이다. 아마 아까 동기적 처리에서 currentThread()의 이름을 출력하면 메인 쓰레드가 뜰 것이다.

아까랑 다르게 OnSubscribe!가 출력되었다. 당연하게도 아까는 OnSubscribe를 이벤트 소스에서 실행하지 않았기 때문인데, SubissionPublisher는 내부에서 Subscription의 request와 cancle을 구현한 후, 그것을 인자로 넘겨 OnSubscribe를 실행한다. 그래서 출력되는 것이다.

또 중요한 점은

((SubmissionPublisher)publisher).submit("hello sub 2");

이거는 작동이 안되는 것에 의문이 생길 수 있다. 내가 생각하기론 이게 백프레셔이다. request를 할 때, 1만 넣어서 줬는데 이러면 이벤트 하나만 받아서 처리한다. 2를 넣어주면 다 출력된다. 안에 뜯어보니까, 일단 request 될 때 아마 이벤트 수를 받고, 이벤트 수신 할 때마다 상태를 바꿔가면서 받은 이벤트 수와 어떤 계산을 해서 조건이 안맞으면 다음 이벤트는 수신하지 않는 식인 듯 하다. 비트마스킹인가 싶기도 하다.

아마 여기 if 절 이 그 부분인 듯하다. tryStart()를 하면, task를 만들어서 쓰레드로 돌린다.

ConsumerTask를 잘 보면, Executer의 excute()? 그게 실행될 때 실행되는 run() 메소드에 consumer.consume() 이 있다.

일단 여기 있는 subscriber는 아까 Main에서 publisher.subscribe(subscriber); 여기에 있는 subscriber가 주입된 것이다. 아무튼 이거를 통해서 이벤트를 처리한다. 대표적으로 subscribeOnOpen(s)에서 onSubscribe()가 호출되고, takeItems(...)에서 onNext()가 호출된다.



예시2 - Spring event

스프링 이벤트는 스프링에서 이벤트 기반 프로그래밍(EBA랑 다른 용어인 것 같은데, 아직 잘 모르니까 혼용할 수도 있다.)을 지원하는 방식이다.

스프링 이벤트를 쓰면, 옵저버 패턴에서 설계를 계속 {이벤트 확인 -> 이벤트 발생 -> 처리}의 설계를 {이벤트 발생 -> 이벤트 발행 -> 이벤트 처리} 처럼 이벤트에 의해 제어 흐름이 변화되는 식으로 바뀌겠지 싶다.

아래 그림이 스프링 이벤트의 동작을 단적으로 잘 보여주는 예시이다

이러면, 서비스 끼리 굉장히 강하게 결합하게 된다. 지금이야 한쪽이 여러 곳에 요청을 하는 1:N관계이지만, N:N이 될 경우 이것보다 훨씬 복잡해질 것이다.

설계를 바꾸기 위해 일단, 한 곳에서 여러 서비스에 요청하는 부분을 보자. 예를 들어 저 사진에서 UserService가 AdminService, CouponService, EmailService에 의존하고 있다. 여러 곳에 의존하는 것은 좋지 않으므로, 공통 인터페이스를 추출하고 그것을 구현하도록 한다.

이때 인터페이스 이름을 ApplicationListener이라고 하고, 아까 서비스들의 이름을 AdminEventListener, CouponEventListener, EmailEventListener라고 하자. 옵저버 패턴이라고 볼 수도 있는데, 설계는 이렇게 바뀐다.

수신사의 확장성이 좋아지기는 했지만, 아직 발신자의 변화에 취약하다. 가령 다른 발신자인 NewUserService가 있다고 하자. 그러면 수신자들은 if문과 instanceof를 쓰든 해서 요청을 구분해야 한다. 즉, 서비스가 서로 의존적이게 된다.

이런 식이다. 근데 생각해보면, 수신자들이 발신자에 의존하게 된 이유는 요청(이벤트) 때문이다. 이벤트를 구분하기 위해 발신자를 알아야 하는 것이다. 그렇다면, 발신자로부터 요청(이벤트)를 분리한다. 당연히 인터페이스로 분리한다. 그러면 발신자들도 이벤트를 받아 전달해야 하므로, 이벤트에 의존하게 된다.

아직 발신자가 수신자의 인터페이스에 의존하고 있는 상태이다. 여기서 해볼 수 있는 고민은 pubilsh(Event e) 부분이다. 이벤트를 발행하는 부분이 서비스에 담겨 있는 것이 과연 맞는 것일까? 아무래도 따로 Publisher라는 클래스로 빼내는 것이 좋아보인다.

수신자와 발신자의 결합은 분리했다지만, 아직 걸리는 부분이 있다. 수신자에서 전달된 이벤트가 무엇인지 구분하기 위해서 여러 조건문 :

public void update(ApplicationEvent e) {	
	if (e instanceof ConcreteEvent1) {
    	...
    }
    elif if (e instanceof ConcreteEvent2) {
    	...
    }
    elif if (e instanceof ConcreteEvent3) {
    	...
    }
    ...
    else {
    	...
    }
}

난 이거 좀 불편하다. 중재자 패턴에서도 이런 로직을 중재자에게 할당했던 거 같은데, 그것처럼 따로 클래스를 분리해서, 이벤트를 다 뿌리고 수신자가 알아서 구분하는 것이 아닌, 이벤트별로 그것을 원하는 수신자에게 각각 뿌리도록 하는 것이 좋아보인다. 아마 이벤트를 받아 그것을 수신하길 원하는 수신자에게 뿌려주는 클래스가 필요하겠다. 허술하긴 하지만, 다음과 같은 아이디어를 떠올려보았다.

첫째, 리스너에서 이벤트 핸들러마다 수신하고자 하는 이벤트를 어노테이션이든 매개변수로든 넣는다.

public class AdminServiceEventListener {
	public void onApplicationEvent(UserAdminEvent event) {
    	log.info("어드민 서비스: {}님이 회원으로 등록되었습니다." event.getUsername());
	}
}

둘째, 뿌려주는 클래스에서 어플리케이션 로딩할 때, 클래스 다 리플렉션으로 읽어서 이벤트를 키로 하고, 그것을 수신하길 원하는 메소드? 들을 리스트에 담는다. 가령 저기 AdminServiceEventListener의 onApplicationEvent는 매개변수가 UserAdminEvent니까 그 이벤트를 키로 하는 리스트에 넣게 된다. 굳이 매개변수가 아니라 어노테이션이어도 될 것 같다.

<뿌리기 클래스>
Map<Event, List<수신자핸들러?>> map = new HashMap<>();

셋째, 퍼블리셔에서 뿌리기 클래스에 보낼 메시지를 정하자. 이 메시지는 Event를 받아서 그것에 해당되는 리스너에게 notify() 메시지를 전달한다.

<뿌리기 클래스>
public void notifyListeners(ApplicationEvent e) {
	// 받은 이벤트에 해당하는 수신자 리스트 반환
	List<수신자핸들러?> list = map.get(e의 클래스 이름?);
    for(Listener l : list) {
    	l.notify(e);
    }
}

<Publisher>
public void publish(ApplicationEvent e) {
	뿌리기클래스객체.notifyListeners(e);
}

이때, 뿌리기 클래스를 ApplicationContext라고 해보자.

이벤트만 똑 떼놓고 보자. 서비스와 리스너가 느슨한 결합도 아니고, 그냥 완전히 떨어져버렸다. 그리고 밑에 있는 스프링 이벤트의 설계도와 거의 똑같은 것을 확인할 수 있다. 물론 다른 의도가 있을 수 있지만, 내가 생각하기에 스프링 이벤트가 저런 디자인인 이유는 이상의 과정과 같다.

쓸 때에는 전달할 이벤트를 이렇게 만들고

// 유저의 회원가입 이벤트
public class RegisterEvent extends ApplicationEvent {
	private String name;
    
    public RegisterEvent(Object source, String name) {
    	super(source);
        this.name = name;
    }
    
    public String getName() {
    	return name;
    }
}

서비스에서 퍼블리셔를 주입받아서 이벤트를 만들고 발행한다. UserService에서는 유저의 회원가입 시 레포지토리에 넣고 이벤트를 발행한다. 그러면 여러 리스너들에서 이벤트를 처리하겠디.

@Service
public class UserService {
	private final ApplicationEventPublisher publisher;
    private fianl UserRepository userRepository;
    
    public void register(Request request) {
		User user = new User(
        	request.getName(),
            request.getEmail(),
            request.getPhoneNumber()
        );
        userRepository.save(user);
        publisher.publishEvent(new RegisterEvent(this, user.getName()));
    }
}

리스너는 송신 서비스를 모른채 그냥 이벤트만 지정해서 받는다.

@Component
@Slf4j
public class UserServiceEventListener implements ApplicationListener<UserAdminEvent> {
	@Override
	public void onApplicationEvent(RegisterEvent event) {
    	log.info("회원가입 : {}님이 회원으로 등록되었습니다.", event.getUsername());
	}
}

onApplicationEvent가 마치 notify()나 update()메소드 같은 느낌. 퍼블리셔가 이벤트를 발행하면, ApplicationContext가 빈으로 등록된 이 리스너의 onApplicationEvent를 호출하는 듯. 이렇게 어노테이션만 쓸 수도 있다. 또, Async붙이면 알아서 비동기로 처리해줌()

@Component
@Slf4j
public class UserServiceEventListener {
	@EventListener
    @Async
	public void onApplicationEvent(RegisterEvent event) {
    	log.info("회원가입 : {}님이 회원으로 등록되었습니다.", event.getUsername());
	}
}

아무튼 스프링 이벤트는 옵저버 패턴에서 존재하던 인터페이스 결합마저도 끊어서 서비스 간 결합을 아예 없애버린 구조라 볼 수 있겠다.

profile
한양대학교 정보시스템학과 22학번 이혁진 입니다

0개의 댓글