이전 시간에는 RxKotlin에 대한 개념과 Observable을 생성하는 메서드를 확인해봤습니다. Observable을 생성하여 데이터를 발행할 때, 발행되는 아이템을 변환하여 다른 아이템으로 변경할 수 있습니다. 이번 글에서는 Observable을 변형하는 연산자에 대해 알아보도록 하겠습니다.
map 연산자는 발행되는 아이템을 변환하는 가장 기본적인 방법이자 가장 많이 사용하는 연산자입니다.
발행되는 값에 대해 원하는 수식을 적용하거나 다른 타입으로 변환시킬 수 있습니다. 즉, 원하는 형태로 데이터를 가공하는 것입니다.
마블 다이어그램을 보면 각 원 모양의 데이터에 마름모로 만들어주는 함수를 적용하여 마름모 데이터로 가공하는 모습을 확인할 수 있습니다.
val intObservable = Observable.just(1, 2, 3)
val multiplyIntObservable = intObservable.map {
it * 10
}
multiplyIntObservable.subscribe {
println(it)
}
/*
결과
10
20
30
*/
flatMap도 이름에 map이 들어가 있으므로 기본적으로 데이터를 다른 형태로 가공하는 메서드입니다.
flatMap 연산자는 제공한 함수를 Observable에서 제공하는 각 아이템에 적용한 후 그대로 return하지 않고, Observable들로 return한 다음, 이것들을 병합해서 다시 Observable로 return 합니다. 다만, flatMap은 아이템 순서를 전혀 신경쓰지 않습니다.
마블 다이어그램을 보면 빨간색 원을 넣으면 빨간 마름모 2개가 나왔습니다. 즉, flatMap의 결과 값이 Observable이므로 여러 개의 데이터를 발행하는 모습을 볼 수 있습니다.
val strObservable = Observable.just("a", "b", "c")
strObservable.flatMap { str ->
Observable.just(str + 1, str + 2)
}.subscribe {
println(it)
}
/*
결과
a1
a2
b1
b2
c1
c2
*/
Observable.range(2, 8)
.flatMap { x ->
Observable.range(1, 9)
.map { y -> String.format("%d * %d = %d", x, y, x * y) }
}.subscribe { println(it) }
/*
결과
2 * 1 = 2
2 * 2 = 2
...
9 * 8 = 72
9 * 9 = 81
*/
switchMap도 이름에 map이 들어가 있으므로 기본적으로 데이터를 다른 형태로 가공하는 메서드입니다.
switchMap 연산자도 flatMap 연산자와 같이 연산자에 제공한 함수를 Observable에서 제공하는 각 아이템에 적용한 후 그대로 return하지 않고, Observable들로 return한 다음, 이것들을 병합해서 다시 Observable로 return 합니다. 다만, 가장 최근에 발행된 아이템을 방출하여 반환합니다. 즉, 새로운 데이터가 들어올 경우 이전 동작을 멈추고 현재 들어온 작업을 수행하는 것입니다.
해당 연산자를 사용하면 검색 기능을 구현하기 좋습니다. 사용자가 검색어를 자음, 모음, 알파벳등 조합하여 입력을 하는데 실시간 결과를 조회해야 하기 때문에 아주 짧은 시간마다 쿼리를 해야 합니다. 그럴때마다 아직 응답을 받지 못한 데이터를 무시하고 최근에 쿼리 요청한 검색어만 처리를 해야 하기 때문에 이 연산자를 사용하면 유용하게 대처할 수 있습니다.
마블 다이어그램을 보면 초록색원을 넣고 주어진 함수에 따라서 초록색 마름모와 초록색 사각형이 발행되어야 하는데, 초록색 마름모가 발행된 후 파란색원이 들어와서 가장 최근에 발행된 아이템으로 변경하여 파란색 마름모와 파란색 사각형이 발행되는 모습을 확인할 수 있습니다.
val observable = Observable.just("a", "b", "c")
observable.switchMap { str ->
val delay = Random().nextInt(2)
Observable.just("${str}1", "${str}2")
.delay(delay.toLong(), TimeUnit.SECONDS)
}
.toList()
.subscribe { result ->
println("Result : $result")
}
Thread.sleep(7000)
/*
[a1, a2, c1, c2]
*/
concatMap도 이름에 map이 들어가 있으므로 기본적으로 데이터를 다른 형태로 가공하는 메서드입니다.
concatMap 연산자도 flatMap 연산자와 같이 연산자에 제공한 함수를 Observable에서 제공하는 각 아이템에 적용한 후 그대로 return하지 않고, Observable들로 return한 다음, 이것들을 병합해서 다시 Observable로 return 합니다. 다만, 처음에 발행된 아이템의 순서를 유지하고 있습니다.
마블 다이어그램을 보면 아이템이 발행된 순서대로 인자로 주어진 함수를 적용하여 발행된 것을 확인할 수 있습니다.
val observable = Observable.just("a", "b", "c")
observable.concatMap { str ->
val delay = Random().nextInt(2)
Observable.just("${str}1", "${str}2")
.delay(delay.toLong(), TimeUnit.SECONDS)
}
.toList()
.subscribe { result ->
println("Result : $result")
}
Thread.sleep(7000)
/*
[a1, a2, b1, b2, c1, c2]
*/
buffer는 변형 연산자로 넣기는 했지만 흐름제어를 하는 연산자입니다. ReactiveX에서 흐름제어는 Observable이 데이터를 발행하는 속도와 Observer가 데이터를 받아서 처리하는 속도 사이의 차이가 발생할 때 사용하는 함수입니다. 예를 들면, 네트워크를 통해서 값을 받고 이를 그래프로 그려주는 기능을 생각해보겠습니다. 네트워크를 통해 값을 받는 속도와 그래프를 갱신하는 속도에 있어서 차이가 발생할 수 있습니다. 이럴 때, 흐름제어를 사용해서 속도의 차이를 처리하는 것입니다.
buffer() 함수는 일정 시간 동안 데이터를 모아두었다가 한꺼번에 방출합니다. 즉, Observable이 발행하는 아이템을 묶어서 List로 발행합니다. 따라서 넘쳐나는 데이터 흐름을 제어하면서 모든 데이터를 받아야 하는 경우 활용할 수 있습니다.
에러를 발행하는 경우 이미 발행된 아이템들이 버퍼에 포함되더라도 버퍼를 발행하지 않고 에러를 즉시 전달합니다.
아래의 예제에서는 buffer(count: Int) 함수를 사용해서 인자로 주어진 count 값만큼의 아이템을 모아서 발행하도록 하였습니다. buffer 연산자는 이외에도 여러 타입의 값을 받을 수 있도록 오버로딩이 되어 있습니다.
마블 다이어크램을 보면 buffer의 인자로 3을 주었습니다. 따라서 Observable이 데이터를 발행하면 모아 두었다가 해당 개수가 3개가 되면 이름 Observer에게 전달하는 모습을 확인할 수 있습니다.
Observable.range(0, 10)
.buffer(3)
.subscribe { integers ->
println("버퍼 아이템 발행")
integers.forEach {
println("#$it")
}
}
/*
결과
버퍼 아이템 발행
#0
#1
#2
버퍼 아이템 발행
#3
#4
#5
버퍼 아이템 발행
#6
#7
#8
버퍼 아이템 발행
#9
*/
scan 연산자는 순차적으로 발행되는 아이템들의 연산을 다음 아이템 발행의 첫 번째 인자로 전달합니다.
계속해서 아이템들이 누적되기에 scan 연산자를 누산기(accumulator)라고 부르기도 합니다.
마블 다이어그램을 보면 첫 번째 아이템인 별이 첫 번째 결과로 나오고, 해당 값이 두 번째 아이템인 빨간색 원에 들어가서 두 번째 결과로 나옵니다. 이러한 과정이 반복적으로 이루어져서 결국 모든 아이템이 누적된 결과를 확인할 수 있습니다.
Observable.range(1, 5)
.scan { x, y ->
print(String.format("%d + %d = ", x, y))
x + y
}.subscribe {
println(it)
}
/*
결과
1
1 + 2 = 3
3 + 3 = 6
6 + 4 = 10
10 + 5 = 15
*/
Observable.just("a", "b", "c", "d", "e")
.scan { x, y -> x + y }
.subscribe {
println(it)
}
/*
결과
a
ab
abc
abcd
abcde
*/
아이템들을 특정 그룹화된 GroupObservable로 재정의할 수 있습니다. 즉, 단일 Observable을 여러 개로 이루어진 Observable 그룹으로 만드는 기능을 수행합니다.
마블 다이어그램에서 원 모양의 아이템들을 하나의 GroupObservable로, 삼각형 모양의 아이템들을 하나의 GroupObservable로 묶는 모습을 확인할 수 있습니다.
Observable.just(
"Magenta Circle",
"Cyan Circle",
"Yellow Triangle",
"Yellow Circle",
"Magenta Triangle",
"Cyan Triangle"
).groupBy { item ->
if(item.contains("Circle")) {
"Circle"
} else if(item.contains("Triangle")) {
"Triangle"
} else {
"None"
}
}.subscribe { group ->
group.subscribe { shape ->
println("${group.key} : $shape")
}
}
/*
결과
Circle : Magenta Circle
Circle : Cyan Circle
Triangle : Yellow Triangle
Circle : Yellow Circle
Triangle : Magenta Triangle
Triangle : Cyan Triangle
*/
참조 및 참고
틀린 부분은 댓글로 남겨주시면 바로 수정하겠습니다..!!
2022-09-23
에 작성되었습니다.
아키텍처를 알아야 앱 개발이 보인다.
RxJava Docs