
어제에 이어 리액티브 프로그래밍을 배워본다.
디자인 패턴 중 하나인 옵저버 패턴을 통해 프로그래밍하는 패러다임이다.
기존의 절차지향, 객체지향 프로그래밍은 명령형(Imperative) 프로그래밍에 해당한다. 반응형은 다른 방식으로 동작한다.
작업 대기열(Buffer)에 작업을 추가하면서, 조건을 만족 시 작업을 수행하는 방식의 프로그래밍 패러다임인 리액티브 프로그래밍을 유니티에서 쓸 수 있게 하는 리액티브 라이브러리다.
쉽게 말해서 "~하면 ~한다" 라는 식의 프로그래밍이다.
리액티브 프로그래밍은 일반적인 모바일 게임들에서 사용자의 입력을 기점으로 로직이 수행되는 게임들에 적합한 프로그래밍 패러다임이다.
Imperative Programming
Reactive Programming
Reactive 프로그래밍에서 사용되는 개념들을 알아보자.
이벤트를 발생시키는 주체인 Subject와 Subject를 관찰하며, 이벤트를 구독하는 Subscriber로 이루어진 디자인 패턴이다. 정확한 내용은 Observer Pattern 포스트를 참고하자.

유니티에서 리액티브 프로그래밍을 더 쉽게 해주는 라이브러리다.
더블 클릭을 구현한다고 해보자. 기존의 명령형 코드로 구현하면 아래와 같다.
private Coroutine doubleClickRoutine;
private bool isActiveCoroutine;
private void Update()
{
if(Input.GetMouseButtonDown(0))
{
if(!isActiveCoroutine)
{
doubleClickRoutine = StartCoroutine(DoubleClickCoroutine());
}
}
}
private IEnumerator DoubleClickCoroutine()
{
isActiveCoroutine = true;
int clickCount = 1;
float timeLimit = 0.3f;
while(0 < timeLimit && clickCount < 2)
{
yield return null;
if(Input.GetMouseButtonDown(0))
{
clickCount++;
}
timeLimit -= Time.deltaTime;
}
if(clickCount == 2)
{
Debug.Log("더블 클릭");
}
isActiveCoroutine = false;
yield break;
}
이 코드를 UniRx 형식으로 쓰자면 아래와 같다.
private void Awake()
{
var updateStream = this.UpdateAsObservable()
.Where(_ => Input.GetMouseButtonDown(0));
updateStream
.Buffer(updateStream.Throttle(TimeSpan.FromMilliseconds(300)))
.Where(x => x.Count == 2)
.Subscribe(x => Debug.Log("더블 클릭"));
}
코드가 매우 짧아지고, 가독성이 좋아졌다.
gameObject 이후 Observable을 써보면, 모든 Unity Event함수에 해당하는 Observable이 있다는 것을 알 수 있다.
Subscribe가 수행되는 내용을 구독한다.
// 점프
gameObject.UpdateAsObservable() // 관찰자
.Where(x => Input.GetKeyDown(KeyCode.Space)) // 조건 1
.Where(x => isGround == true) // 조건 2
.Subscribe(x => rigid.AddForce(Vector3.up * 10, ForceMode.Impulse)); // 수행 내용 구독
// 이동
gameObject.UpdateAsObservable()
.Where(x => Input.GetAxis("Horizontal") != 0)
.Subscribe(x => transform.Translate(Vector3.right * Input.GetAxis("Horizontal") * Time.deltaTime));
// 땅 판정
gameObject.OnCollisionEnter2DAsObservable()
.Where(collision => collision.collider.tag == "Ground")
.Subscribe(x => isGround = true);
// 땅 떨어짐
gameObject.OnCollisionExit2DAsObservable()
.Where(collision => collision.collider.tag == "Ground")
.Subscribe(x => isGround = false);
위와 같이 각 기능들을 분리하기가 쉽다.
UI 요소들 또한 많은 Observable이 있다.
button.OnPointerEnterAsObservable() // 포인터 들어왔을 시
이러한 코딩 방식은 MVP 패턴을 적용하기 수월해진다.
먼저, Model 스크립트
// MVPModel
public ReactiveProperty<int> Count; // 옵저버 패턴을 적용 시킬 수 있는 프로퍼티
private void Update()
{
if(Input.GetKeyDown(KeyCode.Space)
{
Count++;
}
}
이어서 Presenter 스크립트 작성.
//MVPTester
public MVPModel model;
public void Awake()
{
model.Count
.Where(value => value % 2 == 0)
.Subscribe(value => textUI.text = value.ToString());
}
ReactiveProperty는 Subscribe가 되어 있는 대상에게, 초기값을 먼저 세팅해준다. 즉, UI를 연결하는 코드를 작성할 필요가 없다.
옵저버 패턴에서 중요한 점은 3가지다.
1. 옵저버가 주체를 구독하는 형태로 구현
2. 주체가 구독중인 옵저버들에게 알림을 전송
3. 주체는 본인을 구독중인 옵저버들을 모름.
// 플레이어 스탯 스크립트
public Subject<int> ScoreSubject { get; private set; } = new Subject<int>();
private int score;
public int Score
{
get => score;
private set
{
score = value;
ScoreSubect.OnNext(score); // 알림전송
}
}
private void Update()
{
if(Input.GetMouseButtonDown(0))
{
Score += 1;
}
}
// 플레이어 뷰어 스크립트
[SerializeField] private PlayerStat stat;
private void Awake()
{
stat.ScoreSubject // ScoreSubject를 대상으로
.Subscribe(x => Debug.Log($"점수: {x}")) // 구독한다. x는 입력받은 인자다.
.AddTo(this); // 뷰어 객체가 파괴될 시, 구독 해지.
}
다양한 종류의 이벤트 스트림을 생성하고, 통신하는 데 사용되는 다목적 객체이다. 수동으로 이벤트를 발행(OnNext())해야 한다.
여러 Observable(관찰 대상)에게 이벤트를 전달하는 중개자 역할을 함.
이벤트 메세지를 직접 발행해야 하므로, 제어를 신경써야 한다.
Subject 객체를 생성 후, OnNext() 메소드를 통해 이벤트를 수동으로 발행한다.
게임 내 이벤트 발생 시점이나 특정 조건이 충족 되었을 때, 메세지를 여러 곳에 발송해야 하는 상황에서 유용하다.
UniRx의 Subject의 한 종류다.
특정 변수의 값이 변경될 때마다 자동으로 이벤트를 발행한다.
변수의 값을 변경하는 것만으로도 스트림에 이벤트가 발행되므로, UI요소의 상태 변화와 같은 것들의 관리에 용이하다.
ReactiveProperty<T>로 변수를 선언한다. 해당 변수의 값이 변경되면, 자동으로 스트림에 이벤트가 발행된다.
UI 요소의 상태 변화를 감지하고, 이를 스트림을 통해 다른 곳으로 전달하거나, 게임 오브젝트의 상태(체력, 점수 등)을 실시간으로 UI에 반영할 때 주로 사용된다.
private ReactiveProperty<int> data1 = new ReactiveProperty<int>();
[field: SerializeField] private FloatReactiveProperty data2 { get; private set; } = new FloatReactiveProperty();
public ReadOnlyReactiveProperty<double> data3;
플레이어의 능력치 변화에 따라 UI에 반영하는 코드를 작성해본다.
// 플레이어 스탯 스크립트
// 생성자를 통해 초기값을 지정 가능
private ReactiveProperty<int> _hp = new ReactiveProperty<int>(100);
public ReadOnlyReactiveProperty<int> Hp { get; private set; }
// 어트리뷰트를 통해 인스펙터에서 초기값을 지정할 수 있다
[SerializeField] private FloatReactiveProperty _mp = new FloatReactiveProperty(50);
public ReadOnlyReactiveProperty<float> Mp { get; private set; }
private void Awake()
{
Init();
}
private void Init()
{
// 외부에서는 Readonly로만 hp를 사용할 수 있게 한다.
// 내부에서 값을 변경할 때는 hp의 값을 변경하면 된다.
Hp = _hp.ToReadOnlyReactiveProperty();
Mp = _mp.ToReadOnlyReactiveProperty();
}
private void Update()
{
if(Input.GetKeyDown(KeyCode.Alpha1))
{
_hp.Value -= 1;
}
if(Input.GetKey(KeyCode.Alpha2))
{
_mp.Value -= 0.01f;
}
}
ToReactiveProperty
IObservable<T>를ReactiveProperty<T>로 변환한다.
ToReadonlyReactiveProperty
IObservable<T>를Readonly형식의ReactiveProperty<T>로 변환한다. 캡슐화와 데이터보호에 유리한 기법으로 사용된다. 외부에는 값만 공개하고, 내부에서는 수정이 가능하다.
// 플레이어 스탯 UI 스크립트
[field: SerializeField] public PlayerStat Stat { get; private set; }
[field: SerializeField] public TextMeshProUGUI Hp { get; private set; }
[field: SerializeField] public TextMeshProUGUI Mp { get; private set; }
private void Start()
{
Init();
}
private void Init()
{
Stat.Hp
.Where(x => x >= 0) // 조건식을 추가할 수 있다
.Subscribe(x => RefreshText(Hp, $"HP : {x}"))
.AddTo(this);
Stat.Mp
.Where(x => x >= 0)
.Subscribe(x => RefreshText(Mp, $"MP : {x}"))
.AddTo(this);
}
private void RefreshText(TextMeshProUGUI target, string text)
{
target.text = text;
}
MonoBehaviour의 콜백들인 Update(), OnTriggerEnter(), OnCollisionStay() 등을 Reactive Extensions 스타일(Observable)로 변환해주는 모듈이다.
즉, 기존 Unity 이벤트 함수를 오버라이드하거나 직접 호출하지 않고, 스트림 기반으로 이벤트를 구독할 수 있도록 만들어주는 것이다. 모든 Unity 이벤트 함수는 Observable이 될 수 있다.
UpdateAsObservable()
LateUpdateAsObservable()
FixedUpdateAsObservable()
2D용도 동일하다.
OnTriggerEnterAsObservable()
OnTriggerExitAsObservable()
OnCollisionEnterAsObservable()
OnCollisionExitAsObservable() 등등
OnEnableAsObservable()
OnDisableAsObservable()
OnDestroyAsObservable() 등등
OnMouseDownAsObservable()
OnMouseEnterAsObservable()
OnMouseExitAsObservable()
OnPointerEnterAsObservable() 등등
// 마우스 더블 클릭 스크립트
private void Init()
{
// ClickOnUpdateStream 변수는
var ClickOnUpdateStream =
// 매 프레임 체크되는 스트림(작업흐름) 중에서
this.UpdateAsObservable()
// 마우스 좌클릭이 일어난 스트림만 모아놓은 것이다.
.Where(_ => Input.GetMouseButtonDown(0));
// ClickOnUpdateStream 변수(마우스 좌클릭이 일어난 스트림)를 기준으로
ClickOnUpdateStream
// ClickOnUpdateStream(클릭이 일어난 시점)부터 0.5초동안 유지되는 조건(Throttle)으로
// (클릭)이벤트를 모으는 Buffer(일시적인 영역)를 생성
.Buffer(ClickOnUpdateStream.Throttle(TimeSpan.FromMilliseconds(500)))
// Buffer가 가진 이벤트 횟수가 2번일 때만
.Where(x => x.Count == 2)
// 로그를 출력하는 로직을 구독시킨다.
.Subscribe(x => Debug.Log($"더블 클릭"))
// 이 객체가 파괴되면 자동으로 구독 해지시킨다.
.AddTo(this);
}
Throttle 함수는 밑에서 확인하자
// 버튼 클릭 스크립트
private void Init()
{
// 버튼의 클릭을 관찰하는 스트림 변수
var normalStream = _normalButton.OnClickAsObservable();
// ---버튼이 눌릴때마다 로그를 출력하는 기능
// normalStream(버튼이 클릭된 스트림)에서
normalStream
// 로그를 출력하는 로직을 구독
.Subscribe(_ => Debug.Log("Normal Button 클릭"))
// 객체 파괴시 구독 해지
.AddTo(this);
// ---버튼이 5회째 눌릴때마다 로그를 출력하는 기능
// NormalStream(버튼이 클릭된 스트림)에서
normalStream
// 버튼의 클릭을 4회 무시한다(아무것도 하지 않는다) => 5번째 클릭부터 반응한다.
.Skip(4)
// 최초로 반응이 발생할 때(위 코드에서는 5회째에서)만 반응한다(이후엔 반응하지 않는다)
.First()
// 이 스트림을 반복해서 사용한다(5회째 클릭마다 반응한다)
.Repeat()
// 로그를 출력하는 로직을 구독
.Subscribe(_ => Debug.Log("5회 눌렸음"))
// 객체 파괴시 구독 해지
.AddTo(this);
// ---더블클릭 버튼이 최초 클릭부터 0.3초 안에 더블클릭 될 경우 로그를 출력하는 기능
// 버튼의 클릭을 관찰하는 스트림 변수
var doubleClickStream = _doubleClickButton.OnClickAsObservable();
// doubleClickStream 변수(버튼 클릭이 일어난 스트림)를 기준으로
doubleClickStream
// 0.3초간 유지되는 조건(Throttle)으로 버튼이 클릭되는 이벤트를 모으는 Buffer 생성
.Buffer(doubleClickStream.Throttle(TimeSpan.FromMilliseconds(300)))
// 버튼 클릭 수가 2회일 ㄹ경우에만
.Where(x => x.Count == 2)
// 로그를 출력하는 로직을 구독
.Subscribe(x => Debug.Log("더블 클릭"))
// 객체 파괴시 구독 해지
.AddTo(this);
}
UniRx에서 다뤄지는 메서드들을 알아보자
| 오퍼레이터 | 내용 |
|---|---|
| Create | Observable을 직접 생성하고, 이벤트를 내보낼 때 사용한다 |
| Return | 구독자가 구독할 때 하나의 메시지만 전달한다 |
| Repeat | 메시지를 반복해서 전달한다 |
| Range | 특정 범위의 수치의 메시지를 전달한다 |
| Timer | 설정 시간 경과 후 한번 이벤트를 내보낸다. |
| Interval | 설정 시간마다 이벤트를 내보낸다 |
| Empty | 값을 전달하지 않고 이벤트를 완료한다(OnCompleted) |
| FromEvent | Unity Event를 Observable로 변환하고, 이벤트가 발생할 때마다 값을 전달한다. |
| EveryUpdate | 매 프레임마다 이벤트를 내보낸다. |
| FixedEveryUpdate | 고정 프레임마다 이벤트를 내보낸다. |
| 오퍼레이터 | 내용 |
|---|---|
| Where | 조건식이 true일 때만 통과시킨다 |
| Distinct | 중복된 메세지들을 제거한다(중복되지 않은 값만 통과시킨다) |
| DistinctUntilChanged | 값의 변화가 있을 때만 통과시킨다. |
| Throttle | 지정한 간격(시간) 내에 들어온 메시지 중 마지막 메시지만 통과시킨다 |
| ThrottleFrame | 지정한 간격(프레임)내에 들어온 메시지 중 마지막 메시지만 통과시킨다 |
| First | 가장 먼저 발생한 메시지만 전달하고 Observable을 완료한다. (별도의 오퍼레이터가 없다면 한 번만 동작한다) |
| FirstOrDefault | 가장 먼저 발생한 메시지만 전달하거나 값이 없다면 기본값을 전달한다 |
| Take | 지정한 수 만큼의 메시지만 전달한다 |
| TakeUntil | 인자로 입력받은 Observable에게서 메시지를 받을때까지 지속적으로 메시지를 전달한다 |
| TakeWhile | 조건이 true인 경우 지속적으로 메시지를 전달한다 |
| Skip | 지정한 수 만큼 메시지를 스킵한다(무시한다) |
| SkipUntil | 인자로 입력받은 Observable에게서 메시지를 받을때까지 메시지를 스킵한다 |
| SkipWhile | 조건이 true인 경우 지속적으로 메시지를 스킵한다 |
| 오퍼레이터 | 내용 |
|---|---|
| Amb | 여러 Observable 중에서 가장 먼저 메시지를 전달한 Observable의 데이터를 선택해 전달한다 |
| Zip | 각 Observable에서 1개씩 메시지를 뽑아서 합성한다 |
| ZipLatest | 각 Observable에서 가장 최신의 1개의 메시지를 뽑아서 합성한다 |
| CombineLatest | 각 Observable에서 가장 최신의 1개의 메시지를 합성하고, 다음 메시지를 받기 전까지 재활용한다 |
| WithLatestFrom | 2개의 Observable중 하나를 메인으로 지정하고, 메인에서 새 메시지를 받으면 다른 Observable의 최신 메시지와 함께 전달한다 |
| Merge | 복수의 Observable을 하나로 합성한다 |
| Concat | 지정한 Observable에서 OnCompleted가 발생하면(완료되면) 다른 Observable로 대체한다 |
| SelectMany | 원본 Observable에 대해 새로운 Observable을 생성하고, 생성된 Observable들의 값을 하나의 Observable로 합성한다 |
| 오퍼레이터 | 내용 |
|---|---|
| ToReactiveProperty | Observable을 ReactiveProperty로 변환한다 |
| ToReadOnlyReactiveProperty | Observable을 ReadOnlyReactiveProperty(읽기 전용, 값 수정 불가)로 변환한다 |
| ToYieldInstruction | Observable을 코루틴으로 변환하고, 코루틴 내에서 대기하며 처리할 수 있도록 한다 |
| Select | 현재 Observable이 처리하는 메시지를 다른 Observable로 변환할 수 있다 |
| Cast<T> | 메시지의 형변환에 사용한다. 형 변환이 불가능할 경우 예외가 발생한다 |
| TimeInterval | 이전 메시지와 최신 메시지까지의 시간 차를 전달한다. |
| TimeStamp | 메시지가 발생한 시점을 기록하고 메시지로 전달한다. |
| AsUnitObservable | 메시지를 Unit형으로 변환한다. 메시지가 전달되는 사실 자체에만 초점을 둘 경우 사용할 수 있다. |
| 오퍼레이터 | 내용 |
|---|---|
| Scan | 현재 메시지의 값과 이전 메시지 결과를 이용해 누적 연산 처리할 수 있다 |
| Buffer | 메시지를 지정한 수만큼 모아서 전달한다 |
| 오퍼레이터 | 내용 |
|---|---|
| Observable.Catch | 복수의 Observable에서 메시지가 성공적으로 전달될 때까지 차례로 실행한다(에러 처리 용도) |