Dart에서 제공하는 비동기 프로그래밍의 타입은 Future 또는 Stream 타입입니다. 그 중에서 Stream에 대해서 한번 알아보도록 하겠습니다.
Stream을 이해하기 위해서 Future에 대해서 생각해봅시다. Future타입은 즉시 완료되지 않는 데이터타입을 의미합니다. Future로 반환되는 비동기 함수는 작업이 종료되지 않는다면 Future타입을 반환하고 완료되면 반환값의 타입이 반환되죠. 자세한 내용은 아래의 글에서 더 확인할 수 있습니다.
[Dart]비동기 프로그래밍
이제 Stream을 알아봅시다. Stream은 이러한 Future의 반복이라고 할 수 있습니다. 공식문서에서 제공하는 간단한 Stream 예제를 봅시다.
Stream<int> countStream(int max) async* {
for (int i = 0; i < max; i++) {
yield i;
}
}
이는 간단한 정수타입의 Stream입니다. Stream 타입의 함수는 async* 키워드를 이용하여 선언하며, return이 아니라 yield를 사용합니다. 이렇게 생성한 Stream은 listen 메소드를 통해서 읽어볼 수 있습니다.
void main() {
countStream(10).listen((number) => print(number));
}
Stream<int> countStream(int max) async* {
for (int i = 0; i < max; i++) {
yield i;
}
}
0
1
2
3
4
5
6
7
8
9
또한, Stream을 통해서 합계같은 것을 구하고 싶다면 아래와 같이 사용해볼 수 있습니다.
Future<int> sumStream(Stream<int> stream) async {
int sum = 0;
await for (int value in stream) {
sum += value;
}
return sum;
}
이 예제는 Stream의 데이터를 await 키워드로 기다려서 합계를 계산한 후 반환합니다.
void main() async {
Stream<int> stream = countStream(10);
int sum = await sumStream(stream);
print(sum); // 45
}
Future<int> sumStream(Stream<int> stream) async {
int sum = 0;
await for (int value in stream) {
sum += value;
}
return sum;
}
Stream<int> countStream(int max) async* {
for (int i = 0; i < max; i++) {
yield i;
}
}
이처럼, Stream은 물이 흐르는 파이프라인과 같고, 안에 흐르는 물은 비동기 데이터입니다. 그리고 비동기 데이터는 이벤트(Event)라고 합니다.
Stream은 이벤트들이 모두 완료되면 종료됩니다. 하지만, 어떠한 경우 Stream이 완료되기 이전에 에러가 발생할 수 있습니다. 예를 들어, 원격 서버에서 데이터 수신 중 네트워크 에러가 발생하는 경우 또는 코드 자체에서 버그가 있을 수 있습니다. 이러한 에러를 핸들링하는 방법은 try catch를 이용하는 것입니다.
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (final value in stream) {
sum += value;
}
return sum;
}
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
if (i == 4) {
throw Exception('Intentional exception');
} else {
yield i;
}
}
}
void main() async {
var stream = countStream(10);
var sum = await sumStream(stream);
print(sum); // -1
}
이 코드는 i가 4인 경우 에러가 발생하는 코드입니다. 이러한 에러를 잡기 위해서 try catch를 await for 문에 감싸줍니다.
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
// 에러 핸들링
try {
await for (final value in stream) {
sum += value;
}
} catch (e) {
return -1;
}
return sum;
}
이제 기존의 코드는 에러가 발생하는 경우 -1을 반환합니다.
listen()메소드는 Stream을 수신할 수 있도록 해줍니다. 가장 상단에서 보여준 예제는 listen()메소드를 통해 Stream을 읽어드려 하나하나 이벤트를 수행했습니다.
void main() {
countStream(10).listen((number) => print(number));
}
Stream<int> countStream(int max) async* {
for (int i = 0; i < max; i++) {
yield i;
}
}
0
1
2
3
4
5
6
7
8
9
이때, 수신하기 전 Stream은 불활성 객체(Inert Object)이며, 수신하면 StreamSubscription 객체가 반환됩니다. 아래는 listen()메소드의 정의입니다.
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
정의된 선언을 보면 각 데이터의 이벤트 또는 오류에 대해서 콜백을 지정할 수 있으며, 콜백은 Stream이 종료되면 호출됩니다.