
GIF 출처 : https://www.amigoscode.com/courses/java
1 ) 스트림 API ( Stream API ) 란?
2 ) 스트림 구조
3 ) 스트림 동작 변경
4 ) 원시 스트림 ( Primitive Stream )
5 ) 무한 스트림 ( Infinite Stream )
6 ) 다운 스트림 콜렉터 ( Down Stream Collector )
7 ) 스트림의 병렬 데이터 처리
스트림 API ( Stream API ) 는 java 8에서 도입된 기능으로 컬렉션 ( Collection ) 을 함수형 스타일로 처리할 수 있게 도와주는 API이다. 스트림은 다음과 같은 특징을 가지고있다.
| 특징 | 설명 |
|---|---|
| 선언형 | 반복문 대신 filter, map, collect 등 연산자를 사용 |
| 중간 연산 vs 최종 연산 | filter, map은 중간 연산. collect, forEach는 최종 연산. |
| 지연 처리(lazy) | 최종 연산이 실행되기 전까지 중간 연산은 실행되지 않음 |
| 병렬 처리 가능 | .parallelStream()을 쓰면 내부적으로 멀티스레드로 처리 가능 |
| 불변성 | 원본 데이터는 변경되지 않음. 새로운 결과를 생성함 |
스트림에 대해 들어가기 앞서 데이터 처리에 있어 왜 스트림을 도입했는가를 이해할 필요가 있다.
넓은 관점에서 모든 데이터 처리는 파이프라인 방식으로 작동한다. 컬렉션 ( Collection ) 과 같은 자료 구조는 요소들을 제공하며 , 필터링이나 변환과 같은 다양한 작업을 거쳐 결과를 제공하는데 이러한 방식에 있어 두 가지가 존재한다.
① 외부 반복
② 내부 반복
① 외부 반복
: 외부 반복은 전통적인 명령형 접근방법을 의미한다. 예를 들어 책을 찾는 메소드를 담은 클래스가 존재한다고 가정해보자.
public class Book {
// 필드
// 생성자 & setter
// 빌더
List<Book> books = ... ;
Collection.sort(books,Comparator.comparing(Book::title));
// 메서드 루프
List<String> result = new ArrayList<>();
for(var book:books){
if(book.year()>= 1970){
continue;
}
}
// 이후 나머지 메소드 ( 메서드 루프 포함 )
}
해당 클래스에서 코드는 루프를 통해 반복적인 데이터 처리 로직을 구현하고 있다. 이러한 보일러플레이트 코드는 불필요한 코드 사용 및 가독성을 저해시킨다.
근본적으로 이러한 문제의 원인은 ' 무엇을 하느냐 ( 데이터 처리 ) ' 와 ' 어떻게 수행하는가 ( 요소 처리를 반복 ) ' 를 혼용하고 있다. 이를 ' 외부 반복 ' 이라고 한다.
② 내부 반복
: 내부 반복은 외부 반복의 반대 접근법으로 개발자가 순회 과정을 직접 제어하는 것을 포기하고 , 데이터 소스 자체가 ' 어떻게 수행되는지 ' 를 담당하도록 한다.
즉 , 순회를 직접 제어하기 위해 반복자 ( Iterator ) 를 사용하는 대신 데이터 처리 로직은 스스로 반복을 수행하는 파이프라인을 사전에 구성한다.

스트림은 이러한 내부 반복자 ( Internal Iterator ) 라는 장점이 있으며 이는 함수형 관점에서 여러 장점을 가진다.
- 스트림의 장점
① 선언적 접근법
: 단일 호출 체인을 통해 간결하고 명료한 다단계 데이터 파이프라인을 구축할 수 있다.
② 지연 처리
: 모든 요소를 일일이 순회하는 대신 , 마지막 연산이 파이프라인에 연결된 이후에 각 요소가 하나씩 순차적으로 파이프라인을 통해 처리된다.
③ 병럴 데이터 처리
: 스트림은 내장된 병렬 처리 기능을 가지고 있어 호출 체인에서 다인 호출을 간단히 변경해서 활용할 수 있다.
- 스트림의 단점
① 스트림 재사용 불가능
: 스트림 파이프라인은 단 한번만 사용할 수 있다. 스트림 파이프라인은 데이터 소스에 연결되어 있으며 , 종료 연산이 호출된 후 정확히 한번만 전달된다.
이때 , 스트림을 다시 사용하려고 하면 IllegalStateException 예외가 발생한다.
② 지나친 추상화
: 스트림의 메소드인 map(),filter,collect() 등은 연산 융합 ( Fusion ) 이 가능하기 때문에 지나친 중첩 사용은 코드 가독성을 떨어뜨린다.
③ 상태 기반 연산 시 동시성 문제
: 상태를 가지는 연산을 병렬 스트림에서 사용할 경우 , 동기화가 필요하다. 또한 의도치 않은 동기화 비용이 발생할 수 있다.
대부분의 데이터 처리는 동일한 체계를 따르며 기본적으로 세 가지 연산 유형으로 축약된다.
① 요소 선택
② 요소 매핑
③ 최종 연산
위 메소드들은 Collection타입에서 쉽게 사용할 수 있는 메소드들이며 모든 데이터 처리의 기본적인 구성 요소이다.
: 요소 선택은 데이터 처리의 가장 기본적인 작업으로 특정 조건에 따라 선택하는 것을 의미한다. 이는 Predicate를 이용한 필터링 혹은 요소의 개수를 기반으로 선택한다.
( 1 )
Stream filter (Predicate <? super T> predicate)
: 요소 필터링의 가장 직관적인 방법으로Predicate의 결과가 true일 경우에는 해동 요소를 후속 처리를 위해 선택한다.
ex )
List<User> filteredUsers = users.stream()
.filter(User::lengthLargerThan3)
// 정적 메서드 참조
// ( User에 해당 정적 메소드가 존재한다고 가정 )
.collect(Collectors.toList());
이때 , 자바에서는 메서드 참조 혹은 람다식의 경우 boolean을 반환할 경우 자동으로 Predicate 타입으로 추론한다.
( 2 )
Stream<T> limit(long maxSize)
: 요소의 최대 개수를maxSize로 제한하는 메소드이다.
ex )
List<User> filteredUsers = users.stream()
.filter(User::lengthLargerThan3)
.limit(4L) // 상위 4개의 값을 가져온다.
.collect(Collectors.toList());
( 3 )
Stream<T> distinct()
: 요소들을 비교하며 중복되지 않은 요소만 반환한다. 이때 , 해당 연산은 요소를 비교하기 위해 모든 요소를 버퍼에 저장한다.
ex )
List<User> filteredUsers = users.stream()
.filter(User.lengthLargerThan3)
.distinct() // 중복 값 제외
.collect(Collectors.toList());
( 4 )
Stream<T> sorted()
: 요소들을 특정 기준에 따라 정렬시킨다. 이때 ,Comparator를 사용하여 유연한 정렬기준을 설정할 수 있다.
ex )
List<User> filteredUsers = users.stream()
.sorted(Comparator
.comparing(user -> user.getName().length())) // 비교 기준
.collect(Collectors.toList());
: 요소 매핑은 원하는 형식으로 스트림의 요소를 반환하거나 추출하는 작업이다. filter 와 다르게 원하는 형태로 가공할 수 있다.
( 1 )
Stream<T> map(Function<? super T, ? extends R> mapper)
: 해당 메소드는 요소 매핑의 기본적인 메소드로 mapper 함수가 요소에 적용되어 새로운 요소가 스트림으로 반환된다.
ex )
List<User> filteredUsers = users.stream()
.filter(User::lengthLargerThan3)
.map(user -> User.builder() // map()으로 status 필드를 0으로 초기화
.username(user.getUsername())
.password(user.getPassword())
.status(0)
.build())
.collect(Collectors.toList());
( 2 )
Stream<R> flatMap(Function<? super T, ? extends Stream<? extemds R>> mapper)
: 해당 메소드는 요소들을 가공해서 여러 개의 하위 요소(Stream)를 만들고 요소들을 평탄화하여 하나의 연속된 스트림으로 반환한다.
ex )
List<User> userAsia = ...; // map을 통해 추출한 List
List<User> userEurope = ...; // map을 통해 추출한 List
List<List<User>> combined = List.of(userAsia, userEurope);
List<User> userTotal = combined.stream()
.flatMap(List::stream) // 두 리스트를 하나로 평탄화하기
.collect(Collectors.toList());
: 최종 연산은 요소를 실제로 처리하여 결과나 사이드 이펙트를 생성하기 위한 스트림 파이프라인의 마지막 단계이다. 이때 , 최종 연산은 총 네 가지 그룹으로 나뉜다.
: 누적 연산자를 반복적으로 적용하여 스트림의 요소들을 하나의 결과값으로 만든다. 이때 , 이전의 결과를 현재의 요소와 결합하여 새로운 결과를 만든다.
T reduce(T identity, BiFunctional<U, ? super T, U> accumulator, BinaryOperator<U> combiner)
: 해당 함수에서 identity는 초기값을 , BiFunctional<U, ? super T, U> accumulator는 스트림의 요소와 누적 결과를 합치는 누적 함수를 의미한다.
마지막으로 BinaryOperator<U> combiner은 병렬 처리용 병합 함수로 병렬 스트림에서 여러 스레드가 결과를 만들어내면 모든 결과를 합쳐 하나의 최종 결과로 만드는 함수이다.
ex )
var reduce = Stream.of("apple","orange","banana")
.reduce(0, // 초기값
(acc,str) -> acc + str.length(), // 누적함수
Integer::sum); // 병합 함수
: 모든 데이터 처리 과정을 마친 후 , 결과를 새로운 자료 구조로 집계한다.
Collector<T,A,R>
public interface Collector<T, A, R> {
Supplier<A> supplier();
// 누적기의 초기값 공급자
// 예: () -> new ArrayList<T>()
BiConsumer<A, T> accumulator();
// 누적 연산
// 예: (list, element) -> list.add(element)
BinaryOperator<A> combiner();
// 병렬 처리 시 여러 누적 결과 병합
// 예: (list1, list2) -> { list1.addAll(list2); return list1; }
Function<A, R> finisher();
// 누적기를 최종 결과로 변환
// 예: 리스트를 다른 컬렉션으로 변환하거나 불변 객체로 변환
Set<Characteristics> characteristics();
// 수집기의 특성 (CONCURRENT, UNORDERED, IDENTITY_FINISH 등)
}
Collector함수는 인터페이스로 정의되어 있으며 다음과 같은 구성요소를 가진다. 이때 , 다음 순서로 여러 함수형 인터페이스를 거쳐 결과를 수집하게 된다. 
먼저 Supplier<A>에 의해 (1) 가변 컨테이너에 객체가 전달된다. 그 이후 (2) 누적기에 해당하는 BiConsumer<A,T>인터페이스를 통해 누적한 뒤 (3) 결합기에 해당하는 BinaryOperator<A>에서 이를 결합한다.
마지막으로 (4) 반환기에 해당하는 Function<A,R>을 통해 결과를 반환한 뒤 결과를 수집하게 된다.
이때 , 결과를 toset(),toCollection(),toList()등 다양한 컬렉션 자료구조로 변경할 수 있다.
ex )
List<User> = users.stream()
.filter(User::lengthLargerThan3)
.collect(Collectors.toList()); // 리스트 형태의 자료구조로 반환
: 특정한 요소를 찾아내거나 조건에 맞는 요소를 찾아내면 boolean 타입의 결과를 도출한다.
1.
Optinal<T> findAny()
: 스트림 내 임의의 요소를 반환하고 비어있다면 빈 Optional를 반환한다.
2.
boolean anyMatch(Predicate<? super T> predicate)
: predicate와 일치하는 요소가 스트림에 하나라도 존재하면 true를 반환한다.
: 종료 연산의 마지막 그룹으로 사이드 이펙트 전용 연산이다. 여기서 사이드 이펙트 전용 연산이란 반환 결과에 중점을 두지 않고 외부에 변화를 일으키는 것에 목적을 두는 연산을 의미한다.
1.
void forEach(Consumer<? super T> action)
: 각 요소마다 주어진 동작을 수행하도록 한다. 이때 , 병렬 스트림에서는 최적의 성능을 위해 실행 순서는 명시적으로 비결정적이다.
ex )
List<User> userAsia = ...;
List<User> userEurope = ...;
List<List<user>> combined = List.of(userAsia,userEurope);
List<User> userTotal = combined.stream()
.flatMap(List::stream)
.map(user -> user.getUsername().toUpperCase())
.forEach(System.out::println); // 각 요소를 System.out의 println() 메소드를 실행
기본적으로 스트림은 직렬 스트림 ( Serial Stream ) 으로 요소가 순서대로 처리된다. 이는 단일 스레드 이기 때문에 막대한 데이터를 다루거나 많은 트래픽이 존재할 경우 병목현상이 발생할 수 있다.
이를 위해 자바에서는 직렬 스트림을 병렬 스트림으로 변경하는 메소드를 제공하여 병렬 처리를 할 수 있도록 한다.
1.
default Stream<E> parallelStream()
: 컬렉션에 담긴 데이터를 병렬 스트림으로 전환해준다. 내부적으로 ForkJoinPool을 사용해서 병렬처리를 한다. 이때 , parallelStream() 처리 시 기존의 스트림 처리방식에서 Collection.spliterator() + 병렬처리를 실행한다.
ex )
List<String> names = List.of("Alice", "Bob", "Charlie", "David", "Eve");
// 병렬 스트림 생성
names.parallelStream()
.forEach(name -> {
System.out.println(Thread.currentThread().getName() + " → " + name);
});
2.
Stream<T> parallel()
: 기존의 단일스트림에서 적용 시점에 컬렉션에 담긴 데이터를 병렬 스트림으로 전환한다.
ex )
List<String> names = List.of("Alice","Bob","Charlie","David","Eve");
List<String> namesAfter = names.stream()
.filter(Name::startWithA) // Name클래스의 startWithA라는 메소드가 존재한다고 가정
.parallel()
.map(String::toUpperCase)
.collect(Collectors.toList());
3.
Stream<T> sequential()
: 현재의 스트림을 직렬 스트림으로 전환한다. parallelStream()을 호출한 이후 해당 메소드를 호출하면 병렬 스트림에서 직렬 스트림으로 전환된다.
ex )
List<String> names = List.of("Alice", "Bob", "Charlie", "David", "Eve");
names.parallelStream() // 병렬 스트림
.map(String::toUpperCase)
.sequential() // 직렬로 전환
.forEach(System.out::println); // 순서는 무작위이나 실행 순서는 직렬
: 원시 스트림은 원시 타입에서 사용할 수 있는 스트림을 의미한다. 일단 원시타입의 경우 제네릭에서 타입으로 지정해 사용할 수 없기 때문에 객체 타입으로 변경해야 한다. 이때 , 객체 타입으로 변경하기 위해서는 2가지 방법 중 하나를 택해야 한다.
① 오토박싱 ( AutoBoxing )
② 스트림 변형 ( Stream Variant )
오토 박싱의 경우 원시 타입을 객체로 변경할 때 오버헤드가 발생한다. 단일 스트림에서 오버헤드는 큰 문제가 되지 않으나 데이터 처리 파이프라인에서는 지속적인 래퍼 타입의 생성으로 인해 오버헤드가 누적된다.
이 때문에 오토 박싱 방식은 지양되고 있다. 또한 원시 타입을 객체 타입으로 변환하는 것이 아닌 래퍼 타입으로 처리하는 경우 null 값이 반환될 가능성이 있다.
이를 위해 JDK의 다른 함수형 기능들 처럼 스트림 API도 원시타입에 대한 특별한 버전을 제공한다. 이를 원시 스트림이라 한다.
| 원시 타입 | 원시 스트림 | 박싱된 스트림 |
|---|---|---|
int | IntStream | Stream<Integer> |
long | LongStream | Stream<Long> |
double | DoubleStream | Stream<Double> |
Stream<T> boxed() // 원시 스트림을 박싱된 스트림으로 변경
Stream<U> mapToObj(IntFunction<? extends U> mapper) // 원시 스트림을 객체 타입으로 변경
int sum() // 더하기
OptinalInt min() // 최소치 구하기
OptionalInt max() // 최대치 구하기
OptionalDouble average() // 평균값 구하기
1.
<T> Stream<T> generate(Supplier<T> s)
: 정적 편의 메서드 중 하나로 무한한 스트림을 생성한다. 이때 , 스트림은 UNORDERED 상태를 유지한다.
generate 메소드의 경우 Supplier에 의해 생성된 순서 없는 스트림으로 랜덤값과 같이 상호의존성이 없는 요소의 시퀀스에서 활용될 수 있다.
Stream<UUID> createStream(long count){
return Stream.generate(UUID::randomUUID)
.limit(count);
2.
<T> Stream<T> iterate(T seed, UnaryOperator<T> f)
: 초기값과 연산을 기반으로 다음 값을 생성한다.
ex ) 자연수 생성기
List<Integer> iterateForNaturalNumber = Stream.iterate(0,n->n+1)
.limit(5)
.collect(Collectors.toList());
다운 스트림 콜렉터 ( Down Stream collector ) 란?
: 다운 스트림 콜렉터란 그룹화한 메소드 안에서 각 그룹에 속하는 요소들을 어떤 방식으로 수집 , 가공 , 저장할지를 지정하는 콜렉터를 의미한다.
다운 스트림 콜렉터는 일반적으로 다음과 같은 작업을 수행한다.
① 변환 ( Transforming )
② 축소 ( Reducing )
③ 평탄화 ( Flattering )
④ 필터링 ( Filtering )
⑤ 복합 컬렉터 연산 ( Composite Collector Operation )
: groupingBy 메소드를 통해 그룹화한 요소들을 추가적으로 다른 형태로 변환하기 위해 사용한다 .
ex )
Map<String , List<User>> lookUp = users.stream()
.collect(Collectors
.groupingBy(User::group)); // 그룹화 한 뒤 메서드 참조의 리턴값은 String 타입을 받아와야한다.
뿐만 아니라 mapping 메소드를 통해서 내부의 그룹화 요소를 컬렉션 객체에 담아 세분화된 그룹화가 가능하다.
ex )
Map<String,Set<UUID>> lookup = users.stream()
.collect(groupingBy(User::group
,mapping(User::id,toSet())));
// User::group이 String 타입을 리턴해야함
// User::id가 UUID 타입의 객체를 리턴하고 해당 객체의 컬렉션을 Set타입으로 리턴
: 변환된 요소를 다운스트림 콜렉터를 통해서 새로운 컬렉션 군집으로 만들 수 있다.
Collection<T,?,U> reducing(U identity,Function<? super T,? extends U> mapper,BinaryOperator<U> op)
:reduce매소드와 마찬가지로 다운스트림 작업을 위한 축소 컬렉터이며 여러 단계를 하나의 작업으로 합치는 메소드이다.
ex )
Map<UUID,Integer> logCountPerUserId = users.stream()
.collect(Collectors
.groupingBy(User::id
,UserCount::count));
: 개별 요소를 평탄화 하여 파이프라인에서 후속 작업을 수행할 수 있도록 한다.
static Collector<T,?,R> flatMapping(Function<T,Stream<U>> mapper,Collector<U,A,R> downstream)
: 다운스트림 콜렉터의 그룹화 중 평탄화를 실행하는 메소드이다.
ex )
var downstream = Collectors.flatMapping((User user)->user.logEntries()
.stream()
.Collectors.toList());
Map<String,List<String>> result = users.stream()
.collect(Collectors
.groupingBy(User::group
,downstream));
: 그룹화 도중에 필터링을 통해 원하는 요소들만을 선택한다.
static<T,A,R> Collector<T,?,R> filtering(Predicate<T> predicate,Collector<T,A,R> downstream
ex )
var startOfDay = LocalDate.now()
.atStartOfDay();
Predicate<User> loggedInToday = Predicate.not(user->user.lastLogin()
.isBefore(startOfDay));
Map<String,Set<UUID>> todaysLoginsByGroupWithFilterOp = users.stream()
.filter(loggedInToday)
.collect(groupingBy(User::group,
mapping(User::id,toSet()))));
: 한번에 두개의 다운 스트림 컬렉터를 동시에 처리하고 해당 결과를 하나로 통합하는 연산
public static <T, R1, R2, R> Collector<T, ?, R> teeing(Collector<? super T, ?, R1> downstream1,Collector<? super T, ?, R2> downstream2,BiFunction<? super R1, ? super R2, R> merger)
ex )
record UserStatus(long total,long neverLoggedIn){
// 바디 생략
}
UserStats result = user.stream()
.collect(Collectos
.teeing(Collectors.counting(), // 첫번째 컬렉션
Collectors.filtering(user->user.lastLogin() == null,
Collectors.counting()),
UserStats::new));
동시성 ( Concurrency )
: 여러 작업이 중복되는 시간 동안 한정된 리소스에 대한 경쟁을 하면서 실행되는것을 의미한다. 동시성 환경에서는 단일코어가 여러 작업을 중단 및 전환하면서 마치 동시에 여러 작업을 수행하는 것 처럼 보인다.
동시성 환경에서는 자료구조를 사용하려면 해당 구조가 스레드 안전 ( Thread - safe ) 하도록 설계되어야한다. 이를 위해 락 ( Lock ) , 세마포어 ( Semaphore ) 등의 조정이 필요하다.

스트림에서는 병렬 처리 기능을 가진 직관적인 데이터 처리 파이프라인을 제공한다. 이는 앞선 챕터에서 언급한 여러 병렬 처리 관련 메소드들을 참고하면 된다. 또한 스트림은 순차적으로 처리된 스트림으로의 변환메소드 ( sequential )도 제공한다.
병렬 스트림은 재귀적 분해 ( Recursive Decomposition ) 을 사용한다. 이는 요소를 Spliterator를 사용하여 분할ㄹ하며 요소의 덩어리를 병렬로 처리하므로써 데이터 소스를 분할하고 정복하는 것을 의미한다.
이때 , 스트림 API는 ForkJoinPool을 활용하여 작업을 여러 스레드로 분할하고 해당 스레드들은 ForkJoinPool에서 관리하도록 한다.
: 이에 대한 답은 NO이다. 병렬 스트림은 정말 많은 데이터 소스들을 동시에 처리하므로써 하나씩 처리하는 방식보다 효율성을 가져오기 위해 고안된 API임은 틀림없다.
그러나 병렬 스트림을 사용하는 것이 성능 향상을 항상 보장하지는 않으며 다양한 요소에 영향을 받을 수 있기 때문에 무조건적인 병렬 스트림 활용을 옳지않다. 또한 병렬 스트림은 요소들의 순서를 보장하지 않기 때문에 순서가 보장된 자료구조가 필요한 경우 문제가 될 수 있다.
따라서 데이터 소스의 크기가 크지 않거나 직렬 스트림으로 처리하는 것이 성능적으로 효율적인 경우인지를 판단한뒤에 설계할 필요가 있다.
병렬 스트림을 사용할때 순차 스트림에 비해 초기에 더 큰 제약을 감수해야한다. 스트림 구조의 오버헤드 외에도 데이터 소스를 분해나는 데 드는 비용 , ForkJoinPool에 의한 스레드 관리 그리고 최종 결과를 재조합하는 등 전체 프로세스의 모든 측면을 고려해야한다. 이는 암달의 법칙에 의해 설명된다.

