A: "Flow에서 back pressure현상이 일어났을 때 어떻게 처리해야하나요?"
나: "...그게 뭐에요...?"
어디선가 이런 질문을 받았는데, back pressure라는 키워드를 들어 본 적도 없어서 대답하지 못했다.
질문 해주셨던 분이 대략적으로 설명을 해 주셔서 그나마 나의 생각을 말했는데...
긴장해서 잘 기억은 안나지만, "SharedFlow에 버퍼 있으니 버퍼를 이용해서 오래된 것들을 버리면 되지 않을까요...?"라는 애매한 대답을 한 것 같다. (근데 또 대충 맞았던 것도 신기하다.)
Back Pressure에 대한 트러블 경험이 없어서 오히려 아무 생각 없이 개발했다는 생각이 든다.
이번 글에서는 데이터의 발행 속도가 소비 속도보다 빠를 때 생기는 문제인 Back Pressure에 대해 알아보자!
Back Pressure는 데이터의 소비 속도가 공급 속도보다 느릴 때 일어나는 현상이다. 데이터가 소비되지 않고 계속 뒤에서 쌓이게 되면 데이터가 처리되지 않아 OutOfMemory가 일어날 수 있다.
아래의 예시 코드를 보자.
val backPressureFlow = flow {
while(true) {
delay(100L) // 100ms 간격으로 방출
emit("emit data")
}
}
launch {
backPressureFlow.collect {
delay(1000L)// 1000ms 간격으로 소비
println(it)
}
}
먼저 데이터의 발행을 담당하는 backPressureFlow는 100ms 주기로 무한히 String데이터를 발행하고 있다.
Android App으로 예시로 들자면 앱을 켜놓는 동안에 주기적으로 데이터를 방출하거나 갱신시키는 정도로 생각할 수 있다.
이는 평소에도 잘 일어날 수 있는 시나리오인 "앱 오래 켜두기"로 볼 수 있다.
데이터를 소비하는 쪽은 collect로 소비를 하고 있다.
그리고 소비의 주기는 1000ms간격이다. 1초에 한 번씩 처리를 하고 있는 셈이다.
이는 데이터의 발행보다 10배는 느린 상황이다.
Android App을 예시로 보자면, 데이터를 통해 화면을 그리는 행위에 해당할 수 있다.
보통 데이터를 생성해 발행하는 것 보다 데이터로 화면을 그리는 행위가 더 오래걸릴 것으로 예상한다.
문제점은 데이터의 발행보다 데이터의 소비가 10배는 느리다는 것이다.
데이터를 발행하는 쪽에서는 0.1초 마다 데이터를 발행하지만, 소비하는 쪽은 1초에 하나씩 소비하기 때문에
1초 간격으로 9개의 데이터가 쌓이게 된다.
이런 상황에서 앱을 오래 켜두게 되면 1초에 9개씩 계속 데이터가 쌓이게 때문에 언젠가 OOM(OutOfMemory) Exception으로 앱이 비정상 종료될 수 있다.
이 문제의 해결 방법은 매우 가까이에 있었다. 그리고 다들 한번씩은 사용해보았을 것이다.
여기서 키워드는 다량의 데이터를 잘못 소비하면 OOM이 발생한다는 점인데, 이와 똑같은 문제를 해결하기 위해 사용하는 방법이 있다. 바로 파일을 읽어올 때 쓰는 방식이다.
파일을 읽어올 때, 한번에 읽어올 수 있다. 아래처럼 readText()를 사용하면 파일의 텍스트를 한번에 읽어오게 된다.
fun main() {
val file = File("file/path")
println(file.readText())
}
하지만 이는 파일의 크기가 작을 때만 가능하다. 만약 한번에 읽어올 파일의 크기가 크다면 메모리 영역을 가득 채울 것이고, OOM을 발생시켜 프로세스가 종료될 것이다.
파일의 크기가 얼마나 될 지는 보통 알 수 없기 때문에 한번에 불러오지 않고, Buffer를 두어 그 크기 만큼 나눠 읽는 경우가 일반적이다.
fun main() {
val file = File("file/path")
file.bufferedReader().use { reader ->
reader.forEachLine {
println(it)
}
}
}
만약 읽어올 단위를 정하고 싶다면 buffueredReader()에 값을 정하면 된다.
fun main() {
val file = File("file/path")
file.bufferedReader(bufferSize = 512).use { reader ->
reader.forEachLine {
println(it)
}
}
}
위의 방식에 buffer를 넘는 데이터에 대해 처리하는 적절한 정책까지 정해진다면, Flow에도 적용시킬 수 있을 것이다!
Flow에도 buffer()라는 Extension이 존재한다. 그래서 얼마만큼의 데이터만을 다룰 것인지 결정할 수 있다.
val backPressureFlow = flow {
while (true) { //매우 많은 양의 데이터 방출
delay(100L) // 100ms 간격으로 방출
emit("emit data")
}
}
//...중략
launch {
backPressureFlow.buffer(capacity = 100).collect { //최대 100개만 처리하자.
delay(1000L)// 1000ms 간격으로 소비
println(it)
}
}
backPressureFlow에 buffer라는 Extension을 달아주고, capacity를 100으로 설정하였다.
이렇게 하면 버퍼 값인 100개 만큼의 데이터만 다루게 된다.
만약 버퍼 값이 가득 찬 상태에서 데이터가 발행되면 어떻게 될까?
이럴 때 사용할 수 있는 정책이 3가지가 있는데, 코드를 직접 살펴보면 아래와 같다.

위 세 개가 buffer가 가득찼을 경우 설정할 수 있는 정책이다.
만약 최신 값을 반영하고 싶다면 DROP_OLDEST를 아래와 같이 설정하면 된다...!!
val backPressureFlow = flow {
while (true) { //매우 많은 양의 데이터 방출
delay(100L) // 100ms 간격으로 방출
emit("emit data")
}
}
//...중략
launch {
//오래된 값 버리기
backPressureFlow.buffer(capacity = 100, onBufferOverflow = BufferOverflow.DROP_OLDEST)
.collect {
delay(1000L)// 1000ms 간격으로 소비
println(it)
}
}
어디서 많이 보지 않았나...?
그렇다.
SharedFlow에서도 위와 같이 버퍼와 버퍼를 비우는 정책을 설정할 수 있다...!
val sharedFlow = MutableSharedFlow<Int>(extraBufferCapacity = 100, onBufferOverflow = BufferOverflow.DROP_OLDEST)
(내가 완전히 틀린 대답을 한 것이 아니었다.)
이렇게 버퍼를 두고 값을 버리거나 발행을 중지하면서 Back Pressure를 예방할 수 있다.
collect는 값을 소비하고 lambda 안에 있는 코드를 반드시 실행시킨다.
이는 실행하는 코드가 매우 느릴 경우(앞선 코드의 delay) Back Pressure를 발생시키는 원인이 된다.
collectLatest는 새로운 값이 발행되었을 때, 이전 작업을 취소시키고 현재 방출된 값으로 다시 코드를 실행시킨다.
val backPressureFlow = flow {
while (true) { //매우 많은 양의 데이터 방출
delay(100L) // 100ms 간격으로 방출
emit("emit data")
}
}
//...중략
launch {
backPressureFlow.collectLatest { //새로운 값이 들어오면 이전 작업을 취소시킨다.
delay(1000L)// 1000ms 간격으로 소비
println(it)
}
}
매우 간단하게 Back Pressure를 해결할 수 있다!
Back Pressure를 해결하는 두 가지 방법을 보았는데
두 코드를 비교해 보면 collectLatest가 가장 간단하게 보인다. 이것만 쓰면 될까?
상황에 따라 다르다고 할 수 있다.
collectLatest는 이전 작업을 취소시키기 때문에 곤란한 경우가 있을 수 있다.
"데이터가 들어온 이상 무조건 실행"해야 하고 Back Pressure를 예방하려면 buffer 방법을, "취소되어도 괜찮은 코드"라면 collectLatest가 좋은 것 같다.
buffercollectLatest답은 X 이다.
이전 작업은 취소시키고 "항상 최신 값만 반영"한다는 특성때문에 자칫 오해하기 쉽지만, collectLatest를 쓴다고 Hot Flow가 되는 것은 아니다.
(하지만 매우 유사해지긴 한다.)
Cold Flow에서 collectLatest를 사용하는 것과 Hot Flow에서 collectLatest를 사용하는 것의 차이점들을 정리해 보면 아래와 같다.
Android에서는 UI 업데이트에
collectLatest를 사용할 것을 권장하고 있다.
UI를 업데이트 할 때에 성능을 위해 이전 값을 취소시키는 collectLatest가 적절한 것 같다.
하지만 꼭 실행시켜야 하는 UI 업데이트 작업이 있거나, 무한히 방출되는 값에 의해 계속 취소되는 상황이라면 collect와 buffer, conflate와 같이 쓰는 방법으로collectLatest를 대체할 수 있을 것이다.
이제 두 가지를 잘 알았으니 적절한 곳에 사용해보아야 겠다!