[Java] 스트림(Stream)

송재호·2021년 4월 6일
1

Stream이 뭐지?

Stream은 Java 8(1.8)부터 추가된 컬렉션(배열 포함)의 저장 요소를 하나씩 참조해서 람다식으로 처리할 수 있도록 해주는 반복자다.

여러 컬렉션 타입과 마찬가지로 java.util 패키지에 뿌리를 두는데,
Stream은 java.util.stream 패키지에 정의되어있다.

다음은 컬렉션으로부터 stream() 메소드를 사용하여 Stream을 얻어와 사용하는 예시이다.

List<String> list = Arrays.asList("가", "나", "다");
Stream<String> stream = list.stream();
stream.forEach(name -> System.out.println(name));

예시에서 확인할 수 있듯 Stream은 데이터소스를 추상화하고,
데이터를 다루는데 자주 사용되는 메소드들을 정의해 놓았다.

따라서 데이터 소스가 무엇이든 동일한 방식으로 처리할 수 있고,
이는 코드의 재사용성을 높이는 데 일조한다.

Stream은 데이터 소스에 직접적인 변경을 가하지 않고, 다만 읽기만 할 수 있다.
Iterator와 마찬가지로 Stream은 일회용이며, 한 번 사용하면 닫혀서 다시 사용할 수 없다.

Stream의 특징

Stream은 Iterator와 비슷한 역할을 하는 반복자지만 다음과 같은 차이점을 가진다.

Stream은 람다식으로 요소 처리 코드를 제공한다.

Stream이 제공하는 대부분의 요소 처리 메소드는 함수적 인터페이스 매개 타입을 가지기 때문에 람다식 또는 메소드 참조를 이용해서 요소 처리 내용을 매개값으로 전달할 수 있다.

다음은 메소드 참조를 이용해서 요소 처리 내용을 매개값으로 전달하는 예시이다.
(람다식의 예제는 위의 예시를 참고한다.)

List<String> list = Arrays.asList("가", "나", "다");
Stream<String> stream = list.stream();
stream.forEach(System.out::println);

내부 반복자를 사용하므로 병렬 처리가 쉽다.

외부 반복자란 개발자가 코드로 직접 컬렉션의 요소를 반복해서 가져오는 코드 패턴을 말한다.
index를 이용하는 for문, iterator를 이용하는 while문이 이에 외부 반복자에 해당한다.

반면에 내부 반복자는 컬렉션 내부에서 요소들을 반복시키고,
개발자는 요소당 처리해야 할 코드만 제공하는 코드 패턴을 말한다.

컬렉션 내부에서 요소를 반복시키는 방법에 대해서는 컬렉션에게 맡겨두고,
개발자는 요소 처리 코드에만 집중할 수 있게 되는 것이다.

또한 내부 반복자는 요소들의 반복순서를 변경하거나, 멀티 코어 CPU를 최대한 활용하기 위해 요소들을 분배시켜 병렬 작업을 할 수 있게 도와주기 때문에 효율적으로 요소를 반복시킬 수 있다.

병렬 처리 스트림을 이용하면 런타임 시 하나의 작업을 서브작업으로 자동으로 나누고, 서브 작업의 결과를 자동으로 결합해서 최종 결과물을 생성해낸다.

public static void main(String[] args) {
	List<String> names = Arrays.asList("김", "나", "박", "이");

	// 순차 처리
	Stream<String> stream1 = names.stream();
	stream1.forEach(s -> {
		System.out.println(Thread.currentThread().getName() + "쓰레드, 이름 : " + s);
	});

	System.out.println("=============================================");
	
    	// 병렬 처리
	Stream<String> stream2 = names.parallelStream();
	stream2.forEach(s -> {
		System.out.println(Thread.currentThread().getName() + "쓰레드, 이름 : " + s);
	});
}

콘솔 출력 결과는 다음과 같다.

main쓰레드, 이름 : 김
main쓰레드, 이름 : 나
main쓰레드, 이름 : 박
main쓰레드, 이름 : 이
=============================================
main쓰레드, 이름 : 박
main쓰레드, 이름 : 이
ForkJoinPool.commonPool-worker-5쓰레드, 이름 : 김
ForkJoinPool.commonPool-worker-3쓰레드, 이름 : 나

병렬 처리 스트림 (list.parallelStream())을 사용하면 main쓰레드를 포함해서 ForkJoinPool(스레드풀)의 작업 스레드들이 병렬적으로 요소를 처리하는 것을 확인할 수 있다.

parallelStream을 사용할 때 유의해야 할 점은 parallelStream 별로 스레드풀을 만드는게 아니라는 점이다. 별도의 설정이 없다면 하나의 스레드 풀을 모든 parallelStream이 공유하게된다.
그래서 parallelStream 으로 blocking io 가 발생하는 작업을 하게되면 스레드풀 내부의 스레드들은 block 되게된다. 그러므로 꼭 병렬로 처리되어야 한다면 스레드 풀을 default 로 사용하지않고 parallelStream 마다 각각 커스텀하게 지정해주면 된다.

ForkJoinPool pool = new ForkJoinPool(4); 
long sum = pool.submit(() -> LongStream.range(0, 1_000_000_000).parallel() .sum()).get();

(출처: https://multifrontgarden.tistory.com/254 [우리집앞마당])

중간 처리와 최종 처리 작업을 수행할 수 있다.

스트림은 컬렉션의 요소에 대해 중간 처리와 최종 처리를 수행할 수 있는데, 중간 처리에서는 매핑,필터링,정렬을 수행한고 최종 처리에서는 반복,카운팅,평균,총합 등의 집계 처리를 수행한다.

다음은 List에 저장되어 있는 Student객체를 중간 처리에서 score필드값으로 매핑하고, 최종 처리에서 score의 평균값을 산출하는 예시이다.

public class Student {
	private String name;
	private int score;
    
	// 생성자 + getter,setter ....
}

Student 클래스를 만들어 주고, 아래와 같이 stream을 사용해서 순회한다.

List<Student> studentList = Arrays.asList(
		new Student("김김김", 10),
		new Student("나나나", 20),
		new Student("박박박", 30)
);
		
double avg = studentList.stream()
		.mapToInt(Student::getScore) // 중간 처리 (학생 객체를 점수로 매핑)
		.average() // 최종 처리 (평균 점수)
		.getAsDouble();
		
System.out.println("평균 점수 : " + avg); // 평균 점수 : 20.0

Stream의 종류

java.util.stream 패키지에는 BaseStream과 이를 부모로 삼는 자식 인터페이스인
Stream, IntStream, LongStream, DoubleStream이 존재한다.

BaseStream 인터페이스에는 공통 메소드들이 정의되어 있을 뿐 직접 사용하지 않는다.
Stream은 객체 요소를 처리하는 스트림이고,
IntStream, LongStream, DoubleStream은 각각 기본 타입인 int, long, double요소를 처리하는 스트림이다.

스트림 인터페이스의 구현 객체는 다양한 소스로부터 얻을 수 있다.
주로 컬렉션과 배열에서 얻지만, 다음과 같은 소스로부터 얻을 수도 있다.

  • java.util.Collections.stream() || .parallelStream()
  • Arrays.stream(T[]), Stream.of(T[])
  • IntStream.range(int, int) || .rangeClosed(int, int)
  • LongStream.rage(long, long) || .rangeClosed(long, long)
  • Files.lines(Path, Charset)
  • BufferedReader.lines()

Stream 파이프라인

스트림은 데이터의 필터링, 매핑, 정렬, 그룹핑 등의 중간 처리와 합계, 평균, 카운팅, 최대값, 최소값 등의 최종 처리를 파이프라인으로 해결한다. 파이프라인은 여러 개의 스트림이 연결되어있는 구조를 말한다. 파이프라인에서 최종 처리를 제외하고는 모두 중간 처리 스트림이다.

중간 스트림이 생성될 때 요소들이 바로 중간 처리(필터링, 매핑, 정렬)되는 것이 아니라 최종 처리가 시작되기 전까지 중간 처리는 지연(lazy)된다. 최종 처리가 시작되면 비로소 컬렉션의 요소가 하나씩 중간 스트림에서 처리되고 최종 처리까지 오게된다.

Stream 인터페이스에는 필터링, 매핑, 정렬 등의 많은 중간 처리 메소드가 있는데, 이 메소드를은 중간 처리된 스트림을 리턴한다. 그리고 이 스트림에서 다시 중간 처리 메소드를 호출해서 파이프라인을 형성하게 되는 것이다.

List<Student> studentList = Arrays.asList(
	new Student("김김김", 10),
   	new Student("김최최", 50),
   	new Student("나나나", 20),
   	new Student("박박박", 30),
   	new Student("박김김", 90)
);
		
double avg = studentList.stream() // 오리지날 스트림
	.filter(s -> s.getName().contains("김")) // 중간처리스트림1 : 김 들어가는 사람만 뽑음
   	.mapToInt(Student::getScore) // 중간처리스트림2 : 점수로만 묶어냄
   	.average() // 최종 처리
   	.getAsDouble();
		
System.out.println("평균 점수 : " + avg); // 평균 점수 : 50.0

스트림 파이프라인에서 중간 처리를 하는 메소드와 최종 처리를 하는 메소드의 종류는 API문서를 참고하자 : https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html

필터링(distinct(), filter())

필터링은 중간 처리 기능으로 요소를 걸러낸다. 필터렝 메소드인 distinct()와 filter()는
모든 스트림이 가지고 있는 공통 메소드다.

  • distinct() : 중복제거. (Int||Double||Long)Stream의 경우 값 비교,
    Stream인 경우 Object.equals(Object) 결과로 판단한다.

  • filter() : 매개값으로 주어진 Predicate가 true를 리턴하는 요소만 필터링.
    Predicate는 함수적 인터페이스로, public boolean test(Integer t) {}를 구현해야 하며 람다로 사용 가능

매핑(flatMapXXX(), mapXXX(), asXXXStream(), boxed())

매핑은 중간 처리 기능으로 스트림의 요소를 다른 요소로 대체하는 작업을 말한다.

  • flatMapXXX() : 요소를 대체하는 복수 개의 요소들로 구성된 새로운 스트림을 리턴.
public static void streamMethod4() {
	List<String> list = Arrays.asList("가 나", "다 라");
		
	list.stream()
		.flatMap(str -> Stream.of(str.split(" ")))
		.forEach(System.out::print);
}

console : 가나다라
public static void streamMethod5() {
	List<String> list = Arrays.asList("1, 2, 3", "4, 5, 6");
		
	list.stream()
		.flatMapToInt(str -> {
		String [] arr = str.split(",");
			int[] intArr = new int[arr.length];
			
			for (int i=0; i<arr.length; i++) {
				intArr[i] = Integer.parseInt(arr[i].trim());
			}
			return Arrays.stream(intArr);
		})
		.forEach(num -> {
			System.out.print(num + " ");
		});
	}

console : 1 2 3 4 5 6 
  • mapXXX() : 요소를 대체하는 요소로 구성된 새로운 스트림을 리턴.
    위에서 이미 예시로 들었던 mapToInt로 예시 대체

  • asXXXStream() : 타입을 변환해서 그 타입의 스트림을 생성하여 리턴.

  • boxed() : int, long, double요소를 Integer, Long, Double과 같은 wrapper로 박싱해서 Stream을 생성하여 리턴

public static void streamMethod6() {
	int[] intArr = {1, 2, 3, 4 ,5};
		
	IntStream intStream = Arrays.stream(intArr);
	intStream
		.asDoubleStream()
		.forEach(d -> System.out.print(d + " "));
		
	intStream = Arrays.stream(intArr);
	intStream
		.boxed()
		.forEach(obj -> System.out.print(obj.intValue() + " "));
			
}

console :
1 2 3 4 5 6 
1.0 2.0 3.0 4.0 5.0 1 2 3 4 5 

정렬 (sorted())

  • sorted([Comparator<T>]) : 기준에 따라 정렬된 스트림을 리턴
    Stream : 매개변수 없는 경우 Comparable 구현 방법에 따라 정렬,
    매개변수 Comparator가 있는 경우 주어진 Comparator에 따라 정렬
    DoubleStream, IntStream, LongStream : 매개변수 없이 sorted() 가능하며 default는 오름차순
    Comparator를 매개변수로 받기 때문에 당연히 Comparator.reverseOrder() 혹은 Comparator.naturalOrder() 가 사용 가능하다.
	public static void streamMethod7() {
		List<Student> studentList = Arrays.asList(
				new Student("김김김", 10),
				new Student("김최최", 50),
				new Student("나나나", 20),
				new Student("박박박", 30),
				new Student("박김김", 90)
		);
		
		studentList.stream()
			.sorted()
			.forEach(s -> System.out.print(s.getScore() + " , "));
		
		System.out.println();
		
		studentList.stream()
			.sorted(Comparator.reverseOrder())
			.forEach(s -> System.out.print(s.getScore() + " , "));
	}

console :
10 , 20 , 30 , 50 , 90 , 
90 , 50 , 30 , 20 , 10 , 

루핑(peek(), forEach())

루핑(looping)은 요소 전체를 반복하는 것을 말한다.

  • peek() : 중간 처리 메소드로, 전체 요소를 루핑하면서 추가적인 작업을 하기 위해 사용한다.
    최종 처리 메소드가 실행되지 않으면 지연(lazy)되기 때문에 반드시 최종 처리 메소드가 호출되어야 동작한다.
  • forEach() : 최종 처리 메소드로, 파이프라인 마지막에 루핑하면서 요소를 하나씩 처리한다.
    최종 처리 메소드이므로 이후에 sum()과 같은 다른 최종 메소드를 호출하면 안된다.
public static void streamMethod8() {
	IntStream intStream = Arrays.stream(new int[] {1, 2, 3, 4, 5});
	int evenSum = intStream
		.filter(i -> i%2==0)
		.peek(i -> System.out.print(i + " "))
		.sum();
	System.out.println("의 합은 : " + evenSum);
		
	intStream = Arrays.stream(new int[] {1, 2, 3, 4, 5});
	intStream
		.filter(i -> i%2!=0)
		.forEach(System.out::println);
}

console :
2 4 의 합은 : 6
1
3
5

매칭(allMatch(), anyMatch(), noneMatch())

스트림 클래스는 최종 처리 단계에서 요소들이 특정 조건에 만족하는지 조사할 수 있도록
다음과 같은 세 가지의 매칭 메소드들을 제공하고 있다.
당연히 리턴 값의 타입은 boolean이다.

  • allMatch() : 모든 요소들이 매개값으로 주어진 Predicate의 조건을 만족하는가?
  • anyMatch() : 최소 한 개의 요소가 매개값으로 주어진 Predicate의 조건을 만족하는가?
  • noneMatch() : 모든 요소들이 매개값으로 주어진 Predicate의 조건을 만족하지 않는가?
public static void streamMethod9() {
	int[] intArr = {2, 4, 6};
	
	boolean result = Arrays.stream(intArr)
			.allMatch(i -> i%2 == 0);
            
	System.out.println("allMatch : " + result);
	
	result = Arrays.stream(intArr)
			.anyMatch(i -> i%3 == 0);
            
	System.out.println("anyMatch : " + result);
	
	result = Arrays.stream(intArr)
			.noneMatch(i -> i%3 == 0);
            
	System.out.println("noneMatch : " + result);
}

console :
allMatch : true
anyMatch : true
noneMatch : false

기본 집계(sum(), count(), average(), max(), min())

집계는 최종 처리 기능으로 요소들을 처리해서 카운팅, 합계, 평균값, 최대값, 최소값등과 같이 하나의 값으로 산출하는 것을 말한다.
집계는 대량의 데이터를 가공해서 축소하는 리덕션이라고 볼 수 있다.

  • sum() : 요소 총합
  • average() : 요소 평균
  • count() : 요소 개수
  • findFirst() : 첫 번째 요소
  • max() : 최대 요소
    최대 요소는 max(Comparator<T>)를 받아 기준에 따라 제공 가능하다.
  • min() : 최소 요소
    최소 요소는 min(Comparator<T>)를 받아 기준에 따라 제공 가능하다.
public static void streamMethod10() {
	List<Student> studentList = Arrays.asList(
			new Student("김김김", 10),
			new Student("김최최", 50),
			new Student("나나나", 20),
			new Student("박박박", 30),
			new Student("박김김", 90)
	);
	
	Optional<Student> max = studentList.stream()
		.max((o1, o2) -> Integer.compare(o1.getScore(), o2.getScore()));
	
	System.out.println(max.get().getName());
	
	int[] intArr = {2, 4, 6};
	IntStream intStream = Arrays.stream(intArr);
	
	OptionalInt max2 = intStream.max();
		
	System.out.println(max2.getAsInt());
}

console :
박김김
6

findFirst()와 min(), 그리고 max()는 Optional<T> 혹은 OptionalXXX을 리턴하므로 Optional 클래스를 다루는 법을 알아야한다.

// 1. isPresent() 이용
OptionalDouble optional = list.stream()
	.mapToInt(Integer::intValue)
	.average();
if (optional.isPresend()) {
	...
}

// 2. orElse() 이용
double avg = list.stream()
	.mapToInt(Integer::intValue)
   	.average()
   	.orElse(0.0);
    
// 3. ifPresent() 이용
list.stream()
	.mapToInt(Integer::intValue)
   	.average()
   	.ifPresent(i -> System.out.println("평균 : " + i));

커스텀 집계 (reduce())

  • reduce() : 말 그대로 기본 집계를 대신 만들어 쓸 수 있는 커스텀 집계다.
    Stream 인터페이스 종류에 따라 매개 타입으로 XXXOperator를 받고, 리턴 타입으로 OptionalXXX를 가지는 reduce() 메소드가 존재한다.
    디폴트 값도 함께 전달할 수 있는 reduce(XXX identity, XXXOperator op) 가 오버로딩 되어 있는데, 디폴트 값이 존재하기 때문에 딱히 Optional을 리턴할 필요 없어 이 경우에는 원시 타입 혹은 제네릭 T 타입이 그대로 리턴된다.

아래 예시는
reduce(BinaryOperator<Integer> accumulator)
reduce(Integer identity, BinaryOperator<Integer> accumulator)
사용하여 각각 sum을 구하는 예시이다.

identity(디폴트값) 없는 경우 Optional<Integer>를 리턴한다고 했다.
이는 즉 값이 없으면 .get() 을 할 때 NoSuchElementException이 터질 수 있음을 의미한다.

public static void streamMethod11() {
	List<Student> studentList = Arrays.asList(
			new Student("김김김", 10),
			new Student("김최최", 50),
			new Student("나나나", 20),
			new Student("박박박", 30),
			new Student("박김김", 90)
	);
	
	// 둘은 하는 일이 같지만, 리스트가 빈 경우 sum1은 구하다가 NoSuchElementException이 터진다.
	
	int sum1 = studentList.stream()
			.map(Student::getScore)
			.reduce((a, b) -> a + b)
			.get();
	
	int sum2 = studentList.stream()
			.map(Student::getScore)
			.reduce(0, (a, b) -> a + b);
			
	
	System.out.println("sum1 : " + sum1);
	System.out.println("sum2 : " + sum2);
}

console :
sum1 : 200
sum2 : 200

수집 (collect())

  • collect() : 요소들을 필터링 또는 매핑한 후 요소들을 수집하는 최종 처리 메소드
    이 메소드를 이용하면 필요한 요소만 컬렉션으로 담을 수 있고, 요소들을 그룹핑한 후 집계(리덕션)할 수 있다.

Stream.collect(Collector<T, A, R> collector) 와 같이 Collector를 매개변수로 받는다.
Collector의 타입 파라미터 T는 요소, A는 누적기, R은 요소가 저장될 컬렉션이다.

이러한 Collector의 구현 객체는 다음과 같이 Collectors 클래스
다양한 정적 메소드를 이용해서 얻을 수 있다.

  1. toList()
  2. toSet()
  3. toCollection(Supplier<Collection<T>>)
  4. toMap(Function<T,K> keyMapper, Function<T,K> valueMapper)
  5. toConcurrentMap(Function<T,K> keyMapper, Function<T,K> valueMapper)
public static void streamMethod12() {
	List<Student> studentList = Arrays.asList(
			new Student("김김김", 10, Student.Sex.MALE),
			new Student("김최최", 50, Student.Sex.MALE),
			new Student("나나나", 20, Student.Sex.FEMALE),
			new Student("박박박", 30, Student.Sex.FEMALE),
			new Student("박김김", 90, Student.Sex.MALE)
	);
	
	// 남학생들만 묶어 List 생성
	List<Student> maleList = studentList.stream()
			.filter(s -> s.getSex() == Student.Sex.MALE)
			.collect(Collectors.toList());
	
	maleList.stream().forEach(s -> System.out.println(s.getName()));
	
	System.out.println();
	
	// 여학생들만 묶어 Set 생성
	Set<Student> femaleSet = studentList.stream()
			.filter(s -> s.getSex() == Student.Sex.FEMALE)
			.collect(Collectors.toCollection(HashSet::new));
	
	femaleSet.stream().forEach(s -> System.out.println(s.getName()));
}

console : 
김김김
김최최
박김김

박박박
나나나

collect()메소드는 단순히 요소를 수집하는 기능 이외에 컬렉션의 요소들을 그룹핑해서 Map객체를 생성하는 기능도 제공한다. collect()를 호출할 때 Collectors의 groupingBy() 또는 groupingByConcurrent()가 리턴하는 Collector를 매개값으로 대입하면 된다.

다음은 요소를 그룹핑해서 수집하는 예제다.

1.
Map<Student.Sex, List<Student>> mapBySex = totalList.stream()
	.collect(Collectors.groupingBy(Student::getSex));
    
2.
Map<Student.City, List<String>> mapByCity = totalList.stream()
	.collect(Collectors.groupingBy(
    		Student::getCity,
        	Collectors.mapping(Student::getName, Collectors.toList())
   	)

** 참고 도서 : 신용권. 이것이 자바다:신용권의 Java 프로그래밍 정복 (한빛미디어), 2015.

profile
식지 않는 감자

0개의 댓글