[0620] Stream API / Future(못함)

ㅇㅇㅈ·2025년 6월 20일

Stream API

데이터 컬렉션 (List, Set, Map)을 함수형 방식으로 처리하는 도구

예전에는 for문으로 리스트를 돌렸다면 이제는 stream()을 붙이는 것으로 데이터 흐름-가공을 한다고 한다!!

List<String> result = names.stream()  // names에서
    .filter(n -> n.startsWith("A"))   // 이름이 A로 시작하는 것만 골라서
    .map(String::toUpperCase)         // 전부 대문자로 바꾸고
    .collect(Collectors.toList());    // 결과를 리스트로 모아라
  • for-loop 없이도 간결하게 데이터 변환/필터링/매핑이 가능하다!
  • for문 반복이 줄어든다!
  • 불변성 유지: 원본 데이터를 변경하지 않고 새로운 스트림을 생성하여 처리
  • parallelStream()을 활용하면 멀티코어 CPU에서 병렬 처리 가능

 

Stream = for문 + 조건 + 변환 + 결과 모으기

 

깜자 이거 완전 밥도둑이 따로 없다. 생긴 건 공포 그 자체지만..

 


1) 선언형 프로그래밍 지원

  • for-loop 없이 stream()으로 데이터를 처리
    • for문 : 어떻게 처리할 지 하나씩 적음
      stream() : 무엇을 할지 선언만 한다.
// for문
for(String s : list) { ... }

// stream
list.stream().filter(...).map(...).collect(...)

2) 중간 연산 (intermediate operations) 지원

  • filter(). map(), sorted() 등 여러 단계로 데이터 변환(손질/가공/선별)이 가능하다.
    • 필터링 → 변환 → 정렬
      여러 가공을 체인처럼 연결하여 처리한다.
list.stream()
    .filter(x -> x > 10)  // 10보다 큰 것만 남김
    .map(x -> x * 2)      // 2배로 변환
    .sorted()             // 정렬

3) 최종 연산 (terminal operations) 지원

  • 최종적으로 데이터가 어떤 형태가 될지
  • collect(), count(), forEach() 등으로 데이터를 최종 처리 가능
List<String> names = List.of("Tom", "Amy", "Bob");

names.stream()
    .filter(n -> n.startsWith("A"))
    .map(String::toUpperCase); // ← 여기까지는 그냥 “계획”만 함!
    .collect(Collectors.toList()); // ← “최종연산”! 진짜로 실행!

4) 병렬 처리 (parallelStream) 가능

  • 내부적으로 ForJoinPool을 사용하여 멀티 코어 활용
    • 데이터가 너무 많거나 시간이 오래 걸리면
      .prallelStream()으로 여러 코어가 동시에 처리
list.parallelStream()
    .filter(...)
    .map(...)
    .collect(...)

단, 병렬 처리가 반드시 빠른 것은 아니다.

5) Lazy Evolution (지연 연산)

  • stream()은 연산을 정의만 하고, 실행은 최종 연산 시 수행됨.
    • 중간연산을 아무리 많이 나열해도
      최종연산(collect, forEach 등) 없으면 아무 일도 안 일어남
    • 실질적인 데이터 처리는 최종 연산 시 수행됨
list.stream().filter(...).map(...); // 아무 일도 안 함!
list.stream().filter(...).map(...).collect(...); // 그제서야 실행!

Stream API 활용 예제

리스트 데이터를 필터링하고 가공하는 예제

List<String>filtered = names.stream()
						.filter(each -> each.startsWith("A"))
                        .map(String::toUpperCase)
                        .sorted()
                        .collect(Collectors.toList());

 

 


병렬 스트림

  • parallelStream()을 사용하면 내부적으로 멀티코어 CPU를 활용하여 데이터를 병렬로 처리
  • 내부적으로 ForkJoinPool을 사용하여 작업을 나누어 병렬로 실행

 

  • 데이터가 너무 많거나
    시간이 너무 오래 걸리는 복잡한 작업의 경우
  • 하나의 CPU가 일하면 느림
  • 여러 CPU가 나눠서 하면 빠르다!
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

numbers.stream().forEach(each -> 
    System.out.println(Thread.currentThread().getName() + ": " + each)
);
// 순차 스트림 : main 스레드가 차례대로 진행

numbers.parallelStream().forEach(each -> 
    System.out.println(Thread.currentThread().getName() + ": " + each)
);
// 병렬 스트림 : main과 4개의 ForkJoinPool 스레드들이 함께 처리
// → 다만 동시에 처리하기에 순서는 뒤죽박죽이다.

Fork-Join 알고리즘

깜자

참 알고리즘 관련 이야기가 많이 나오는 것 같다.

내가 일단 열심히 들어봤는데, 대충 이진탐색의 원리라는 것 같다.


병렬 스트림과 ForkJoinPool의 차이점

병렬 스트림 ParallelStream()

  • List.parallelStream() 사용
    → 리스트(혹은 Set, Map 등의 컬렉션)에 바로 붙여서 사용
  • 자동 분할
  • 컬렉션 대상 (List, Set, Map)
  • 메소드 체이닝 가능
    → .map().filter().collect() 이런 식으로 연달아(체이닝) 붙일 수 있음
  • 데이터 변환, 필터링, 집계 연산에 적합

한 줄로 쉽게 / 알아서 데이터 쪼개고 / 여러 CPU로 처리

ForkJoinPool

  • ForkJoinTask 또는 RecursiveTask 사용
    → 직접 “작업 단위”를 만들어서 나눔(분할-정복 패턴)
  • 개발자가 직접 작업을 분할 (fork(), join())
  • 대량 데이터 처리, 재귀적 연산에 적합
    → (예: 병렬로 퀵소트, 피보나치 계산 등 복잡한 알고리즘,
    단순 반복이 아니라 “계속 쪼개야 하는” 문제)

진짜 복잡하고 커스텀한 병령 처리가 필요할 때
(내가 일일히 어떻게 쪼개고 합칠지 정해야함)

 

솔직히: 이거왜쓰지?

  • 병렬 처리
  • 여러 개의 CPU 동시에 사용
  • 사용자가 직접 길게 작성(ForkJoinPool)
    은 당장 어제도 배웠는데
상황추천 방식이유
데이터 가공/집계, 변환, 단순 반복parallelStream1줄로 쉽고, 내부 병렬처리 자동
작업 분배/예외 처리/커스텀 동작 등직접 멀티스레드세밀한 제어 필요
여러 스레드가 동시에 Map을 써야 할 때ConcurrentHashMap동시성 안전

 
뭐지? 진짜 모르겠다

오늘 한 예제코드 확인하고 와야할 것 같다

예제코드

import java.util.List;
import java.util.stream.LongStream;
// 리스트와 스트림을 불러왔다.

public class StreamPerformanceComparison {
    public static void main(String[] args) {
        List<Long> numbers = LongStream.rangeClosed(1, 100_000_000L)
        // 1부터 1억까지의 숫자를
                .boxed()
                // long 기본형에서 Long 객체로 바꿨다.
                //  list애는 숫자(primitive)가 들어가지 않고 객체가 들어가기 때문. primitive(long)
                .toList();
                // 그리고 나서 Long 객체들을 리스트로 변환했다. 
                

        long start = System.currentTimeMillis();
        // 시간 측정용
        long sumSequential = numbers.stream()
                .filter(n -> n % 2 == 0)
                // 람다: 짝수만 필터링
                .mapToLong(Long::longValue)
                // 람다: primitive로 다시 변환
                .sum();
                // primitive타입에 대한 작업
        long end = System.currentTimeMillis();
        // 작업 끝난 시점의 시간을 저장
        System.out.println("sequential: " + sumSequential + ", time: " + (end - start) + "ms");
        // 결과와 시간 출력
        //여기까지가 stream()

        start = System.currentTimeMillis();
        long sumParallel = numbers.parallelStream()
        //병렬 스트림 시작
                .filter(n -> n % 2 == 0)
                .mapToLong(Long::longValue)
                .sum();
        end = System.currentTimeMillis();
        System.out.println("parallel: " + sumParallel + ", time: " + (end - start) + "ms");
    }
}

 
그니까 이 코드는,

스트림에 primitive 값을 넣음
값을 객체화
리스트에 저장

StreamAPI에서 람다 연산(필터링)
List<Long>의 값을 primitive 타입화
sum() 작업

으로 이루어져 있다.

잘 이해가 안된다.

List<Long>을 위해 long을 Long 객체로 바꾸었다는 것은 알겠다.
람다 연산(.filter)에서 사용해야 하니까.

그런데 왜 이후의 .sum()을 위해 long(primitive 타입)으로 바꾸는 건가?

이미 리스트에 들어가 있는 객체들을 잠시 long 값으로 전환한 건가?

[1, 2, 3, 4, 5, ... 100_000_000]   ← (primitive long 값들)
          │
      .boxed()
          ↓
[Long, Long, Long, ...]            ← (Long 객체가 담긴 리스트)
          │
      .stream()
          ↓
     .filter(n -> n % 2 == 0)      ← (람다식! 짝수만 뽑음)
          ↓
   [Long, Long, Long, ...]         ← (여전히 Long 객체)
          │
    .mapToLong(Long::longValue)
          ↓
[long, long, long, ...]            ← (primitive long 스트림, 리스트에는 안 들어감!)
          │
        .sum()
          ↓
     (최종 합계값)
깜자

잠깐 쉬고나서 Future 공부를 하겠다.
일단 노트 필기만 해두고 시간을 들여가면서 정독해야할 것 같다.

0개의 댓글