12. 스트림

stream이란 단어는 시냇물이라는 뜻을 갖고 있다. 그리고 자바에는 스트림이라는 기능이 있다. 정확히는 Java 8부터 Stream이라는 객체가 추가됐는데, 요소들이 하나씩 흘러가면서 처리할 수 있다는 의미에서 붙여진 명칭이다.

1회독 때, 스트림 이후로 실용적인 부분에서 엄청 많이 활용되는 것을 기억하고 이를 잘못 이해해서, 스트림의 예를 들면 입출력 스트림...인 줄 알았는데 둘은 서로 취급하는 게 다르다. 즉 공부를 똑바로 안 했다는 뜻이다 그래서 그런가... 공부할 게 엄청 많아 보인다. 목차만 해도 무려 13개(...)

참조 링크
https://tcpschool.com/java/java_stream_concept
https://tcpschool.com/java/java_io_stream

1) 스트림 정의

스트림 정의를 논하기 전에 컬렉션의 Set이 내부 요소들을 어떻게 처리하는 지에 대한 방법 2가지 중 하나였던 Iterator(반복자)를 잠시 떠올려야 한다. 왜냐면 스트림이랑 비교하기 딱 좋은 대상이기 떄문. 사실 Stream도 어떻게 보면 반복자니까.

(1) Iterator와 Stream

보통 배열이나 컬렉션 내부의 저장 데이터들을 처리하는 것을 반복할 때, for문을 사용하거나, Set을 사용한 경우에는 Iterator 반복자를 활용하였다.

// for문
List<String> list = // ...

for(int i = 0; i < list.size(); i++) {
	String item = list.get(i);
    // ...
}

// Iterator 반복자
Set<String> set = // ...

Iterator<String> iterator = set.iterator();
while(iterator.hasNext()) {
	String item = iterator.next();
}

근데 보면 데이터 처리 로직이 요소를 하나씩 꺼내서(get(), next()) 처리하는 느낌이다. 기존에 잘 나열되어 있는 일련의 데이터 묶음을 포장지 단위로 벗기면서 작업을 처리하는(?) 성격의 코드인 것 같다.

스트림도 반복되는 작업의 처리에 있어 쓰인다는 점은 같다. 자바에서의 스트림 표현은 아래와 같이 이뤄진다.

// 스트림 객체 생성
Stream<String> stream = list.stream();

// 내부 반복하면서 람다식으로 데이터 처리
strea.forEach(item - > /* ... */)

표현의 방식만 다르지 결국 반복 처리를 위한 기능이 아닌가라는 생각이 들었지만, Stream의 다른 특징은 내부 반복자라는 점이었다.

(2) 내부 반복자

아까 위에서 말했듯이 단순 반복문이나 Iterator는 저장된 데이터를 하나씩 꺼내서 개발자가 직접 작성한 코드에 처리하려고 가져다 쓰는 과정이라고 말했다. 이 점이 Stream과의 차이점이라고 볼 수 있는데, 왜냐하면 Stream은 개발자가 작성한 코드를 (람다식 등을 활용해서) 아예 컬렉션 내부로 주입시켜서 요소를 반복 처리하게 된다.

즉, 단순 주입은 그저 데이터 처리 로직을 넣기만 하면 되는데, 일반적인 반복 처리는 데이터 처리 로직을 활용하려면 그것을 꺼내 오는 로직까지 포함되어야 한다는 점이 대표적인 차이점이라고 할 수 있다.

  1. 내부 반복자는 처리 속도가 빠르고 병렬 처리에 효율적이다.
  2. 람다식으로 다양한 요소 처리를 정의할 수 있다.
  3. 중간 처리와 최종 처리를 수행하도록 파이프 라인을 형성할 수 있다.

2) Stream 객체

(1) 내부 반복 & 병렬 처리

아까 말했듯이 Stream 객체는 내부에서 반복하기 떄문에 처리 속도가 빠르다는 장점이 있었다. 또한, 병렬 처리가 가능하다고 했는데 이 병렬 처리를 가능케하는 것이 pararellStream() 메소드다.

병렬 처리에서 감이 오겠지만, 멀티 스레드를 활용할 때 해당 메소드를 많이 접하게 된다.

Stream<String> parallel = list.parallelStream(); // 병렬 스트림 얻기

// 내부 요소를 반복(forEach) + 병렬 처리하게 됨
// 처리를 맡은 스레드 명칭 갖고 오는 작업 처리
parallel.forEach(name -> {
            System.out.println(name + " : " + Thread.currentThread().getName());
        });

(2) 파이프라인

Stream을 작성하다 보면 .을 기준으로 줄줄이 소세지처럼 연결돼서 코드를 작성하게 된다. 이 장난감 기차 같은 모양을 스트림 파이프라인이라고 한다.

스트림 파이프라인은 오리지널 스트림, 중간 스트림, 최종 처리로 이뤄진다.

오리지널 스트림

Stream<Student> studentStream = list.stream();

여기 Student 클래스로 요소를 이루는 List가 있다.
이것을 Stream 객체로 스트림을 얻었다. 이것이 오리지널 스트림이다.

중간 스트림

// 각 학생의 점수를 얻음(매핑)
IntStream scoreStream = studentSTream.mapToInt(student -> student.getScore());

최종 처리를 위해 요소를 걸러내거나(필터링), 요소를 변환시키거나(매핑), 요소를 정렬하는 작업을 수행한다. 이것이 중간 스트림이다.

최종 처리

// 평균을 얻음
double avg = scoreStream.average().getAsDouble();

최종 처리에서는 중간 처리에서 정제된 요소를 반복, 집계(카운팅, 총합, 평균) 작업을 수행한다.

파이프라인

double avg = list.stream().
				mapToInt(student->student.getScore()).
				average().getAsDouble();

위의 세 단계를 메소드 체이닝 패턴을 통해서 연결하며 처리할 수 있다. 이렇게 작성된 것을 스트림 파이프라인이라고 하며, 스트림 파이프라인의 맨 마지막은 언제나 최종 처리가 되어야만 한다.

3) 오리지널 스트림 : 리소스에서 스트림 얻기

위에서는 Stream 인터페이스를 주로 봤는데, 사실 그것 이전에 java.util.stream 패키지를 먼저 들여봐야 한다. 왜냐하면 요소를 처리하는 모든 스트림 인터페이스의 부모 스트림 인터페이스인 BaseStream 인터페이스가 있기 때문이다.

BaseStream 인터페이스를 기반으로 총 4개의 스트림이 존재한다.

  • Stream : 객체 요소 처리
  • IntStream : int 요소 처리
  • LongStream : long 요소 처리
  • DoubleStream : double 요소 처리

스트림들은 다양한 리소스에서 얻을 수 있는데 그걸 전부 외우는 건 사실 비효율적인 것 같고... 우선 어떤 리소스에서 얻을 수 있는지 리소스 별로 정리를 하며 손에 익혀봅시다.

(1) 컬렉션

정확히는 컬렉션 중에서 Set 컬렉션List 컬렉션으로부터 Stream<T>를 얻을 수 있다. 위에서 봤던 흔한 예시들에서도 느껴지듯이 주로 컬렉션으로부터 스트림 객체를 얻게 된다. 왜냐하면 컬렉션 인터페이스는 stream() 메소드 외에도 병렬 처리를 위한 pararellStream()을 지니고 있기 때문이다.

(2) 배열

java.util.Arrays 클래스에서 구현된 여러 종류의 배열에서도 스트림을 얻게 된다. 아래 예제 코드를 보고 그렇구나... 정도로 생각하기. 다만, 요소별로 적합한 스트림을 써야 한다.

String[] strings = {"홍길동", "김자바", "치킨냠"};
// 데이터 처리를 위한 스트림화
Stream<String> stringStream = Arrays.stream(strings);
stringStream.forEach(item -> System.out.print(item + ", "));

int[] ints = {1, 2, 3, 4, 5};
// 데이터 처리를 위한 스트림화
IntStream intStream = Arrays.stream(ints);
intStream.forEach(item -> System.out.print(item + ", "));

(3) 숫자 범위

숫자 범위라는 것은 단순한 일련의 수열을 의미한다고 생각하면 된다. IntStream 혹은 LongStream의 정적 메소드인 range()(초과, 미만) 혹은 rangeClosed()(이상, 이하) 메소드로 일련의 데이터 처리를 수행할 수 있다.

(4) 파일

java.nio.file.Files에 있는 line() 메소드는 .txt 파일의 행 단위 스트림을 얻게 해준다. 즉 텍스트 파일의 한 줄 한 줄씩 스트림을 얻게 해서 그것을 읽는 등의 처리를 수행할 수 있다.

// Path의 경로를 URI 객체로 변환 후 반환
Path path = Paths.get(StreamExample.class.getResource("data.txt").toURI());

// 각 라인 단위 스트림별로 처리(여기서는 읽기)
stream.forEach(System.out::println);

여담으로 스트림은 결국 시스템 자원을 소모해서 모든 작업 처리가 끝나면 스트림을 폐쇄하는 것이 좋다. 방법은 close() 메소드를 호출하거나, try-with-resource 문법을 활용하면 된다.

// close() 메소드 호출
stream.close();

// try-with-resource 문법 활용
try (
	Stream<String> stream = Files.lines(path, Charset.defaultCharset())) {
    stream.forEach(System.out::println);
} catch (Exception e) {
    e.printStackTrace();
}

4) 중간 스트림 : 필터링, 매핑, 정렬, 루핑

(1) 필터링 : 요소 걸러내기

필터링 관련 중간 스트림 메소드에는 distinct()filter()가 있다.

  • distinct() : 중복 제거
  • filter() : 조건 필터링

distinct() 메소드의 중복 제거 로직은 Stream 객체의 경우, equals() 메소드의 리턴값으로 판별하며, 그 외의 스트림 객체는 같은 값인지의 여부로 판별한다.

filter() 메소드는 매개값으로 객체(T)를 조사하는 Predicate<T> 함수형 인터페이스, 각각 int, long, double을 조사하는 IntPredicate, LongPredicate, DoublePredicate 함수형 인터페이스가 주어지며 매개값을 조사 후, boolean 값을 리턴한다.

// distinct() : 중복 요소 제거
list.stream().distinct().forEach(e -> System.out.print(e + ", "));

// filter() : 해당되는 요소만 걸러내서 반환
list.stream().filter(e -> e.startsWith("신"))
        .forEach(e -> System.out.print(e + ", "));

(2) 매핑 : 요소 변환

자바스크립트의 고차함수였던 map()을 생각하면 편하다. 요소 별로 mapXxx() 메소드로 요소를 변환시키며, 해당 메소드의 매개타입은 Function 인터페이스로 매개값을 리턴하는 applyXxx() 메소드를 보유하고 있다.

// Student 스트림을 score 스트림으로 변환
studentList.stream().mapToInt(Student::getScore)
        .forEach(System.out::println); 

단순하게 기본 타입(int, double, long) 간의 변환이거나 기본 타입을 래퍼 타입(Integer, Double, Long) 객체로 변환하라면 asLongStream(), asDoubleStream(), boxed() 메소드를 활용할 수 있다.

여담으로, 스트림은 한 번 사용되면 소비되었다고 간주되므로 기존에 할당시킨 변수에 새로운 스트림을 할당하는 것이 가능하다. 아래 예제로 확인할 것

int[] ints = {1, 2, 3, 4, 5};

IntStream intStream = Arrays.stream(ints);
intStream.asDoubleStream(). // int -> double 로 정수 스트림을 실수 스트림으로 변환
        forEach(System.out::println);
// 여기서 기존의 intStream은 사용되었고, 
// asDoubleStream() 메서드를 통해 새로운 DoubleStream이 생성

System.out.println();

// 한 번 사용된 스트림은 '소비(consumed)'되었다고 간주
// 그래서 새로운 IntStream을 생성하여 intStream에 할당

intStream = Arrays.stream(ints); // 기본 스트림(요게 새로운 IntStream을 생성)

// 기존의 intStream 변수에 새로운 IntStream을 할당
intStream.boxed(). // int -> Integer 로 래퍼 스트림 변환
        forEach(e -> System.out.println(e.intValue())); 
        // 그것의 int 값을 갖고와서 출력

요소 별로 복수 개의 요소들로 변환하는 것도 매핑의 종류에 포함된다. flatMap() 메소드를 사용한다. 해당 메소드를 사용함으로써 복수 개의 요소를 지니는 새로운 스트림이 생성된다.

flatMap() 메소드가 조금 이해가 안 돼서 찾아보니, 스트림 평면화라고도 일컫는다. 아래 예제의 흐름으로 다시 공부해보기

// 리스트(list1)를 하나 생성했다.
List<String> list1 = new ArrayList<>();
list1.add("this is java");
list1.add("i am a best developer");

list1.stream().
        flatMap(data -> Arrays.stream(data.split(" "))).
        // 우선 data -> Arrays.stream(data.split(" "))를 통해서
        // 하나의 요소(문장 문자열)를 여러 개의 요소
        // (분리된 문자열 배열을 생성하고, 이를 스트림으로 변환)
        // 그리고 flatMap을 통해서 각 스트림을 전부 펼쳐서 하나의 스트림으로 만들어버림
        // Arrays.stream()에 의헤 String[] 배열을 하나의 Stream<String>으로 만듦
        // 전체 평탄화
        forEach(System.out::println);
List<String> list2 = Arrays.asList("10, 20, 30", "40, 50");
list2.stream().
        flatMapToInt(data -> {
            String[] strings = data.split(",");
            int[] ints = new int[strings.length];
            for (int i=0; i< strings.length; i++) {
                ints[i] = Integer.parseInt(strings[i].trim());
            }
            return Arrays.stream(ints);
            // Arrays.stream() 메소드가 ints 배열을 하나의 IntStream으로 만듦
        }) // 전체 평탄화
        .forEach(System.out::println);

(3) 정렬

통상 정렬은 오름차순 혹은 내림차순으로 중간 처리를 맡는다. sorted() 메소드를 사용해서 요소의 정렬이 이뤄지는데, 컬렉션에서 검색 기능 강화를 위해 봤던 Comparable 인터페이스와 Comparator 인터페이스를 기억해내야 한다. 기억하라고 좀

Comparable 인터페이스는 compareTo() 메소드를 오버라이딩 및 구현함으로써 객체의 정렬 기준을 정의할 수 있고, Comparator 인터페이스는 compare() 메소드를 오버라이딩 및 구현함으로써 객체의 정렬 기준을 정의할 수 있다.

보통 Comparable 구현을 우선으로 생각하고, 만약 그 조건을 충족하지 못 한 상태라면 그 다음에 Comparator 구현을 람다식으로 작성하는 순서로 개념을 생각하자.

스트림의 요소가 객체일 경우에는 객체가 Comparable 인터페이스를 구현하고 있어야만 sorted() 메소드를 사용 가능하다.

대부분의 sorted() 메소드는 오름차순으로 정렬하는 것이 기본이지만 만약 내림차순으로 정렬하고 싶으면 sorted(Comparator.reverseOrder()) 메소드를 사용하면 된다.

// list는 Comparable 인터페이스를 구현한 Student 인스턴스들의 리스트

list.stream()
        .sorted() // 오름차순 정렬
        .forEach(s -> System.out.println(s.getName() + " : " + s.getScore()));

System.out.println();

list.stream()
        .sorted(Comparator.reverseOrder()) // 내림차순 정렬
        // Comparator.reverseOrder() 메소드로 내림차순 재정렬 가능
        .forEach(s -> System.out.println(s.getName() + " : " + s.getScore()));

만약 요소 객체가 Comparable을 구현하지 않고 있으면, 비교자Comparator를 구현한 객체를 람다식으로 작성하면서 요소 정렬을 수행할 수 있다. 정확히는 sorted() 메소드의 매개값으로 람다식을 제공하게 된다.

// Comparable을 구현하지 않은 Student 클래스 인스턴스를 리스트화
list.stream()
        .sorted(Comparator.comparingInt(Student::getScore))
        // .sorted((s1, s2) -> Integer.compare(s1.getScore(), s2.getScore()))
        .forEach(s -> System.out.println(s.getName() + " : " + s.getScore()));

        System.out.println();

list.stream()
        .sorted((s1, s2) -> Integer.compare(s2.getScore(), s1.getScore()))
        .forEach(s -> System.out.println(s.getName() + " : " + s.getScore()));
        
// sorted() 메소드 내부에 Comparator 구현 비교자를 람다식화해서 매개값 제공

(4) 루핑 : 요소 하나씩 반복 처리

주의해야 할 점이 있다. 루핑의 작업 분류를 중간 처리로 집어넣기는 했지만, 루핑 관련 메소드는 중간 처리 메소드인 peek()과 최종 처리 메소드인 forEach()로 나뉘어진다. 즉, 루핑은 스트림 파이프라인에서 중간 스트림으로써의 처리를 맡을 수도 있고, 최종 처리를 맡을 수도 있는 것이다.

루핑은 간단하게 말해서, 요소를 이렇게 처리하고, 다음 요소도 이렇게 처리하고, 그다음 요소도 이렇게 처리하고...의 느낌이다.

peek() 메소드와 forEach() 메소드는 매개타입으로 Consumer 함수형 인터페이스를 받는다. 즉, Consumer 인터페이스를 구현한 람다식을 매개값으로 제공한다.

int[] ints = {1, 2, 3, 4, 5};

// peek() : 중간 처리 과정이라서 뒤에 또 이어줘야함
int totalValue = Arrays.stream(ints)
        .filter(e -> e%2==0)
        .peek(System.out::println) // 여기서 끝내면 오류 발생
        .sum();

System.out.println("총합 : " + totalValue);
System.out.println();

// forEach() : 최종 처리 과정이라서 얘가 끝이어도 무방
Arrays.stream(ints)
        .filter(e -> e%2==0)
        .forEach(System.out::println);

5) 최종 처리 : 매칭, 집계, 수집

(1) 매칭 : 특정 조건 만족 여부

특정 조건 만족 여부에서 감이 오지만, boolean 값을 리턴한다. 매칭과 관련된 메소드로는 allMatch(), anyMatch(), noneMatch() 메소드가 있으며, 매개값으로 주어진 Predicate 함수형 인터페이스(아까 filter() 메소드에서 봤던)를 구현한 람다식을 매개값으로 제공할 수 있다.

int[] ints = {1, 2, 3, 4, 5};

boolean result = Arrays.stream(ints)
        .allMatch(e -> e%2==0); // 스트림 소비, 변수는 재할당으로 재사용 가능
System.out.println("전부 짝수인가요 ? : " + result + "\n");

result = Arrays.stream(ints)
        .anyMatch(e -> e%3==0); // 새로운 스트림 뿅. 아까 위에서 사용된 변수 재할당
System.out.println("저들 중 하나라도 3의 배수인가요 ? : " + result + "\n");

result = Arrays.stream(ints)
        .noneMatch(e -> e%3==0);
System.out.println("쟤네들은 전부 3의 배수가 아니죠 ? : " + result);

(2) 집계

최종 처리에서 상당히 많은 부분을 차지하는 기능이다. 스트림이 제공하는 기본 집계부터, Optional 클래스로 디폴트값을 설정하는 집계reduce() 메소드를 기반으로 한 집계 커스터마이징이 있다. 어떻게 보면 일련되는 대량의 데이터를 하나로 축약하는 과정이어서 리덕션(Reduction)이라고도 칭한다.


스트림 기본 집계

count(), sum(), average(), max(), min() 등이 있다. 이들 중에서 리턴 타입이 원시 타입이 아닌 경우가 있는데, 이거는 Optional 클래스로 리턴하기 때문에 get(), getAsInt(), getAsDouble() 등으로 추가 최종 처리를 해줘야 한다.


Optional 클래스 디폴트 세팅

아까 위에서 리턴 타입이 원시 타입이 아닌 Optional 클래스 타입으로 리턴하는 경우가 있었다. 이 경우, 추가 최종 처리가 필요했는데 예를 들어서 빈 리스트에 대해 스트림 객체를 얻어 평균값을 얻으려고 하면 NoSuchElementException 예외가 발생한다. 이런 예외 처리를 위해서 Optional 클래스에서 제공하는 메소드(isPresent(), orElse(), ifPresent())를 활용해서 디폴트값을 설정하면 된다.

public class OptionalExample {
    public static void main(String[] args) {
        // Optional 클래스의 존재 의의
        // : 컬렉션의 동적 요소 추가로 인해 요소 부존재로 인한 NoSuchElementException 예외 막기

        List<Integer> list = new ArrayList<>();

//        double average = list.stream()
//                .mapToInt(Integer::intValue)
//                .average()
//                .getAsDouble();
//        System.out.println("평균(근데 아마 에러 발생할 걸?) : " + average);
//        // 이렇게 아무 것도 없는데 평균 구하려고 하면 에러 뙇 하고 발생

        // TODO : 방법 1. isPresent()로 값이 존재할 때만 나타내고 분기 가르기
        OptionalDouble optionalDouble = list.stream()
                .mapToInt(Integer::intValue)
                .average();

        if(optionalDouble.isPresent()) {
            System.out.println("isPresent()가 참, 즉 평균값이 존재하면 : " + optionalDouble);
        } else {
            System.out.println("평균값이 존재하지 않으면 : " + 0.0);
        } // 약간 야매식 디폴트값...


        // TODO : 방법 2. orELse()로 값이 존재하지 않을 경우의 디폴트 값을 정해두기
        double average = list.stream()
                .mapToInt(Integer::intValue)
                .average()
                .orElse(0.0);
        System.out.println("값이 없으면 디폴트 값으로 : " + average);


        // TODO : 방법 3. ifPresent()로 집계값 조건부 Consumer 람다식 작성
        list.stream()
                .mapToInt(Integer::intValue)
                .average()
                .ifPresent(e -> System.out.println("아마 나오진 않겠지만 : " + e));
    }
}

reduce() : 집계 커스터마이징

스트림이 제공하는 기본 집계 메소드 외에도 직접 집계 결과를 끌어낼 수 있는 reduce() 메소드에 매개값으로 BinaryOperator 함수형 인터페이스를 람다식으로 제공해서 작성할 수도 있다. BinaryOperator 함수형 인터페이스는 두 개의 매개값을 받아서 하나의 값을 리턴하는 apply() 메소드를 지니고 있어서 해당 메소드를 람다식으로 구현하면서 다양한 집계를 할 수 있다.

또한 reduce() 메소드의 매개값으로 람다식과 더불어 디폴트 값을 제공할 수 있어서 Optional 클래스에서의 디폴트 값 세팅 역시 가능하다.

// 스트림 기본 집계
int sum1 = studentList.stream()
        .mapToInt(Student::getScore)
        .sum();

// 커스텀 reudce()
int sum2 = studentList.stream()
        .mapToInt(Student::getScore)
        .reduce(0, Integer::sum); // 훨씬 더 간단하게
//      .reduce(0 /* 디폴트 값 : 0 */, (a,b) -> a+b);

(3) 수집

요소들을 필터링 또는 매핑한 요소들을 수집하는 최종 처리 메소드 collect() 메소드를 활용해서 필요 요소 수집, 그룹핑 후의 추가 집계가 가능하다


필터링 또는 매핑의 결과물 처리

collect() 메소드에는 Collector<T, A, R>를 매개값으로 받는데, 이를 활용해서 필터링 또는 매핑된 요소들을 새로운 컬렉션에 수집하고 리턴할 수 있다. T는 요소, A는 누적기, R은 컬렉션이며 T 요소를 A 누적기가 R에 저장한다는 메커니즘이다.

Collectors의 정적 메소드인 toList(), toSet(), toMap() 메소드를 활용해서 컬렉션으로 리턴받을 수 있다.

아래의 예시 코드로 이해할 것!

// 스트림 요소 객체 대상이 될 Student 클래스
public class Student {
    private String name;
    private String sex;
    private int score;

    public Student(String name, String sex, int score) {
        this.name = name;
        this.sex = sex;
        this.score = score;
    }

    public String getName() {
        return name;
    }

    public String getSex() {
        return sex;
    }

    public int getScore() {
        return score;
    }
}
public class CollectExample {
    public static void main(String[] args) {
        List<Student> totalList = new ArrayList<>();
        totalList.add(new Student("홍길동", "M", 92));
        totalList.add(new Student("김수영", "F", 87));
        totalList.add(new Student("감자칩", "M", 95));
        totalList.add(new Student("오해영", "F", 93));

        List<Student> maleList = totalList.stream()
                .filter(e -> e.getSex().equals("M"))
                .toList();

        maleList.forEach(e -> System.out.println(e.getName()));

        System.out.println();

        Map<String, Integer> map = totalList.stream()
                .collect(
                        Collectors.toMap(
//                                e -> e.getName(),
//                                e -> e.getScore()
                                Student::getName, // Student 객체에서 키가 될 부분
                                Student::getScore // Student 객체에서 값이 될 부분
                        )
                );

        System.out.println(map);
    }
}

요소 그룹핑 : 컬렉션의 요소를 그룹화해서 Map 객체 생성

collect() 메소드에 매개값으로 Collectors의 정적 메소드 중에 groupingBy()가 있는데, 해당 메소드의 매개값 람다식의 기준으로 그룹핑시키게 된다.

Map<String, List<Student>> map = totalList.stream()
							.collect(
                            	// 그룸핑 키 리턴
                            	Collectors.groupingBy(s -> s.getSex())
                            );

그룹핑 이후에 추가 작업을 할 수 있도록 groupingBy() 메소드에 두 번째 매개값인 Collector를 부여할 수 있다. 즉, 그룹핑 해서 단순히 그 기준 키에 상응하는 인스턴스들로 구별 분류하는 것을 넘어서 그룹핑 기준에 해당하는 각각의 개수, 평균, 최대값, 최소값, 커스텀 집계 값 등으로 값을 부여하는 것이 가능하다.

Map<String, List<Student>> map = totalList.stream()
							.collect(
                            	Collectors.groupingBy(
                                	// 그룹핑 키
                                    s -> s.getSex(),
                                    // 그룹핑 키 기준으로 상응하는 값(여기서는 각 점수의 평균)
                                    Collectors.averagingDouble(s -> s.getScore())
                                )
                            );

5) 병렬 스트림

스트림의 특징(너무 오래된 것 같은 개념...) 중에 하나가 병렬 처리가 가능하다는 점이었다. 멀티 코어 CPU 환경에서 전체 요소를 분할해서 각각의 코어가 병렬적으로 처리하게 된다.

(1) 동시성과 병렬성, 포크조인 프레임워크

멀티 스레드는 멀티 작업을 위해 멀티 스레드가 하나의 코어에서 번갈아가며 실행하는 동시성 또는 멀티 작업을 위해서 멀티 코어를 각각 이용해서 병렬로 실행하는 병렬성으로 실된다.

동시성은 한 시점에 하나의 작업이 처리되는 것이며, 너무 빠르게 번갈아서 처리하다보니 동시에 처리되는 것처럼 보일 뿐이다. 병렬성은 반대로 한 시점에 여러 작업을 병렬로 실행하게 된다. 병렬성에는 전체 데이터를 분할해서 서브 데이터셋으로 만들어서 작업을 진행하는 데이터 병렬성서로 다른 작업을 병렬적으로 처리하는 작업 병렬성이 있다.

자바의 병렬 스트림은 포크조인 프레임워크으로 포크 단계에서 전체 요소들을 서브 요소셋으로 분할(분할 정복 알고리즘)하고 멀티 코어에서 병렬적으로 처리시킨 후, 조인 단계에서 서브 결과를 결합해서 최종 결과를 만들어낸다. 여담으로 포크조인 프레임워크은 병렬 처리를 위해서 스레드풀을 사용하며, ExcutorService를 구현한 ForkJoinPool을 활용해서 작업 스레드를 관리하게 된다.

(2) 병렬 스트림 메소드

위의 내용은 개념적인 내용이었고 결국 이를 써먹으려면 ListSet으로부터 병렬 스트림을 바로 얻는 parallelStream() 메소드 혹은 기존 스트림을 병렬 처리 스트림으로 변환시키는 parallel() 메소드를 사용해서 얻게 된다.

public class ParallelExample {
    public static void main(String[] args) {
        Random random = new Random();

        List<Integer> scores = new ArrayList<>();
        for(int i=0; i<100_000_000; i++) {
            scores.add(random.nextInt(101));
        }

        double average = 0.0;
        long startTime = 0;
        long endTime = 0;
        long time = 0;
        long parallelTime = 0;

        Stream<Integer> stream = scores.stream();
        startTime = System.nanoTime();
        average = stream
                .mapToInt(Integer::intValue)
                .average()
                .getAsDouble();
        endTime = System.nanoTime();
        time = endTime - startTime;
        System.out.println("평균 : " + average + "\n" + "일반 스트림 처리 걸린 시간 : " + time);

        System.out.println();

        Stream<Integer> parallelStream = scores.parallelStream();
        startTime = System.nanoTime();
        average = parallelStream
                .mapToInt(Integer::intValue)
                .average()
                .getAsDouble();
        endTime = System.nanoTime();
        parallelTime = endTime - startTime;
        System.out.println("평균 : " + average + "\n" + "병렬 스트림 처리 걸린 시간 : " + parallelTime);

        System.out.println();

        if (time == parallelTime) {
            System.out.println("동일하게 걸림");
        } else {
            System.out.println(
                    parallelTime < time ? "병렬 스트림 처리가 더 빠름" : "일반 스트림 처리가 더 빠름"
            );
        }
    }
}

병렬 처리 성능은 결국 요소의 수가 적고, 요소당 처리 시간이 짧을 수록, 스트림 소스가 인덱스로 요소를 관리하는 경우(ArrayList, 배열)일 수록, 코어의 수가 많을 수록 좋다. 병렬 처리는 포크 및 조인 단계가 있고 스레드 풀을 생성하는 추가적인 비용이 발생하며, 인덱스로 관리하지 않는 HashSet, TreeSet이나 LinkedList 등은 요소 분리가 쉽지 않아 처리 속도가 느려서 효율이 좋지 않다.

다만, 무조건 병렬 스트림이 최고다라고는 말하기 어려운 게, 스레드 수가 증가할 수록 동시성이 증가하기 때문에 오히려 느려질 가능성이 있어서 코어의 수가 적을 때는 일반 스트림이 더 빠를 수도 있다.

profile
scientia est potentia / 벨로그 이사 예정...

0개의 댓글