개발자는 기록이 답이다

스트림과 병렬처리(4) - 수집, 병렬처리 본문

언어/Java

스트림과 병렬처리(4) - 수집, 병렬처리

slow-walker 2023. 12. 4. 10:48

 

1. 수집(collect())

 

 collect() : 스트림은 요소들을 필터링 또는 매핑한 후 요소들을 수집하는 최종 처리 메소드

이 메소드를 이용하면 필요한 요소만 컬렉션으로 담을 수 있고, 요소들을 그룹핑한 후 집계(리덕션)할 수 있다.

 

필터링한 요소 수집

 

Stream의 collect(Collector<T,A,R> collector) 메소드는 필터링 또는 매핑된 요소들을 새로운 컬렉션에 수집하고, 이 컬렉션을 리턴한다.

 

매개값인 Collector(수집기)는 어떤 요소를 어떤 컬렉션에 수집할 것인지 결정한다. Collector의 타입 파라미터 T의 요소이고, A는 누적기(accumulator)이다. 그리고 R은 요소가 저장될 컬렉션이다. 풀어서 해석하면 T요소를 A 누적기가 R에 저장한다는 의미이다.

 

Collector의 구현 객체는 아래와 같이 Collectors클래스의 다양한 정적 메소드를 이용해서 얻을 수 있다.

 

리턴값인 Collector를 보며 A(누적기)가 ?로 되어 있는데, 이것은 Collector가 R(컬렉션)에 T(요소)를 저장하는 방법을 알고 있어 A(누적기)가 필요 없기 때문이다. Map과 ConcurrentMap의 차이점은 Map은 스레드에 안전하지 않고, ConcurrentMap은 스레드에 안전하다. 멀티 스레드 환경에서 사용하려면 ConcurrentMap을 얻는 것이 좋다.

 

아래는 전체 학생중에서 남학생들만 필터링해서 별도의 List로 생성한다.

 

  1. 전체 학생 List에서 Stream을 얻는다.
  2. 남학생만 필터링해서 Stream을 얻는다.
  3. List에 Student를 수집하는 Collector를 얻는다.
  4. Stream에서 collect()메소드로 Studnet를 수집해서 새로운 List를 얻는다.

 

 

아래 코드는 전체 학생 중에서 여학생들만 필터링해서 별도의 HashSet으로 생성한다.

 

  1. 전체 학생 List에서 Stream을 얻는다
  2. 여학생만 필터링해서 Stream을 얻는다
  3. 새로운 HashSet을 공급하는 Supplier를 얻는다.
  4. Supplier가 공급하는 HashSet에 Student를 수집하는 Collector를 얻는다.
  5. Stream에서  collect()메소드로 Student를 수집해서 새로운 HashSet을 얻는다.
package org.example.chapter16.collect;

import org.example.chapter16.Student;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ToListExample {
    public static void main(String[] args) {
        List<Student> totalList = Arrays.asList(
                new Student("홍길동",10, Student.Sex.MALE),
                new Student("김수애",6, Student.Sex.FEMALE),
                new Student("신용권",10, Student.Sex.MALE),
                new Student("박수미",6, Student.Sex.FEMALE)
                );

        // 남학생들만 묶어 List 생성
        List<Student> maleList = totalList.stream()
                .filter(s -> s.getSex() == Student.Sex.MALE)
                .collect(Collectors.toList());
        maleList.stream()
                .forEach(s -> System.out.println(s.getName()));

        System.out.println();

        // 남학생들만 묶어 List 생성
        Set<Student> femaleList = totalList.stream()
                .filter(s -> s.getSex() == Student.Sex.FEMALE)
                .collect(Collectors.toCollection(HashSet :: new));
        femaleList.stream()
                .forEach(s -> System.out.println(s.getName()));

        System.out.println();
    }
}

 

사용자 정의 컨테이너에 수집하기

 

List, Set, Map과 같은 컬렉션이 아니라 사용자 정의 컨테이너 객체에 수집하는 방법에 대해 알아보자.

스트림은 요소들을 필터링, 또는 매핑해서 사용자 정의 컨테이너 객체에 수집할 수 있도록 아래와 같이 collect()메소드를 츠가적으로 제공한다.

 

  • 첫번째 Supplier는 요소들이 수집될 컨테이너 객체(R)를 생성하는 역할을 한다. 순차 처리(싱글 스레드)스트림에서는 단 한번 Supplier가 실행되고 하나의 컨테이너 객체를 생성한다. 병렬 처리(멀티 스레드) 스트림에서는 여러 번 Supplier가 실행되고 스레드별로 여러 개의 컨테이너 객체를 생성한다. 하지만 최종적으로 하나의 컨테이너 객체로 결합된다
  • 두번째 XXXConsumer는 컨테이너 객체(R)애 요소(T)를 수집하는 역할을 한다. 스트림에서 요소를 컨테이너에 수집할 때마다 XXXConsumer가 실행된다.
  • 세 번째 XXXConsumer는 컨테이너 객체(R)를 결합하는 역할을 한다. 순차 처리 스트림에서는 호출되지 않고, 병렬 처리 스트림에서만 호출되어 스레드별로 생성된 컨테이너 객체를 결합해서 최종 컨테이너 객체를 완성한다.

 

리턴 타입 R은 요소들이 최종 수집된 컨테이너 객체이다. 순차 처리 스트림에서는 리턴 객체가 첫번째 Supplier가 생성한 객체지만, 병렬 처리 스트림에서는 최종 결합된 컨테이너 객체가 된다. 병렬처리는 다음 절에서 살펴보고, 순차 처리를 이용해서 사용자 정의 객체에 요소를 수집하는 것을 살펴보기로 하자.

 

학생들 중에서는 남학생만 수집하는 MaleStudent 컨테이너가 아래와 같이 정의되어 있다고 가정해보자.

package org.example.chapter16.collect;

import org.example.chapter16.Student;

import java.util.ArrayList;
import java.util.List;

public class MaleStudent {
    private List<Student> list; // dythfmf wjwkdgkf zjffprtus
    
    public MaleStudent() {
        list = new ArrayList<Student>();
                                // 생성자를 호출하는 스레드 이름
        System.out.println("[" + Thread.currentThread().getName() + "] MaleStudent");
    }
    
    // 요소를 수집하는 메소드
    public void accumulate(Student student) {
        list.add(student);
        System.out.println("[" + Thread.currentThread().getName() + "] accumulate()");
    }

    // 두 MaleStudent 를 결합하는 메소드(병렬 처리 시에만 호출)
    public void combine(MaleStudent other) {
        list.addAll(other.getList());
        System.out.println("[" + Thread.currentThread().getName() + "] combine()");
    }
    // 요소가 저장된 컬렉션을 리턴
    public List<Student> getList() {
        return list;
    }
}

 

 

  • list필드는 남학생들이 수집될 필드다.
  • 생성자가 몇 번 호출되었는지 확인하기 위해 호출한 스레드의 이름과 함께 생성자 이름을 출력한다. 순차 처리 스트림에서 MaleStudent() 생성자는 딱 한번 호출되고, 하나의 MaleStudent 객체만 생성된다.
  • accumulate()메소드는 매개값으로 받은 Student를 list필드에 수집하는데, 14라인에서 accumulate()메소드가 몇 번 실행되었는지 확인하기 위해 호출한 스레드의 이름과 함께 메소드 이름을 출력한다.
  • combine()메소드는 병렬 처리 스트림을 사용할 때 다른 MaleStudent와 결합할 목적으로 실행된다. 순차 처리 스트림에서는 호출되지 않기 때문에 정의할 필요가 없다고 생각되지만, collect()메소드의 세 번째 매개값인 BiConsumer를 생성하기 위해서는 필요하다.

 

스트림에서 남학생을 MaleStudent에 수집하는 코드를 살펴보자.

 

  1. 전체 학생 List에서 Stream을 얻는다
  2. 남학생만 필터링해서 Stream을 얻는다
  3. MaleStudent를 공급하는 Supplier를 얻는다
  4. MaleStudent와 Student를 매개값으로 받아서 MaleStudent의 accumulate()메소드로 Studnet를 수집하는 BiConsumer를 얻는다
  5. supplier가 제공하는 MaleStudent에 accumualter가 Stundet를 수집해서 최정 처리된 MaleStudent를 얻는다
  6. 싱글 스레드에서는 combiner는 사용되지 않는다. 

 

 

아래 예제는 순차 스트림을 이용해서 사용자 정의 컨테이너인 MaleStudent에 남학생만 수집한다.

package org.example.chapter16.collect;

import org.example.chapter16.Student;

import java.util.Arrays;
import java.util.List;

public class MaleStudentExample {
    public static void main(String[] args) {
        List<Student> totalList = Arrays.asList(
                new Student("홍길동", 10, Student.Sex.MALE),
                new Student("김수애", 6, Student.Sex.FEMALE),
                new Student("신용권", 10, Student.Sex.MALE),
                new Student("박수미", 6, Student.Sex.FEMALE)
                );

        MaleStudent maleStudent = totalList.stream()
                .filter(s -> s.getSex() == Student.Sex.MALE)
                .collect(MaleStudent::new, MaleStudent::accumulate, MaleStudent::combine);

        maleStudent.getList().stream()
                .forEach(s -> System.out.println(s.getName()));
    }
}

 

실행 결과를 보면 순차 처리를 담당한 스레드는 main 스레드임을 알 수 있다. MaleStudent()생성자가 딱 한번 호출되었기 때문에 한 개의 MaleStudent객체가 생성되었고, accumulate()가 두 번 호출 되었기 때문에 요소들이 2번 수집되었다. 그래서 collect()가 리턴한 최종 MaleStudent에는 남학생 두명이 저장되어 있는 것을 볼 수 있다.

 

요소를 그룹핑해서 수집

 

collect()메소드는 단순히 요소를 수집하는 기능 이외에 컬렉션의 요소들을 그룹핑해서 Map객체를 생성하는 기능도 제공한다

collect()를 호출할때 Collectors의 groupingBy()또는 groupingByConcurrent()가 리턴하는 Collector를 매개값으로 대입하면 된다.

groupingBy()는 스레드에 안전하지 않은 Map을 생성하지만, groupingByConcurrent()는 스레드에 안전한 ConcurrentMap을 생성한다.

 

 

아래 코드는 학생들을 성별로 그룹핑하고 나서, 같은 그룹에 속하는 학생 List를 생성한 후, 성별을 키로, 학생 List를 값으로 갖는 Map을 생성한다. collect()의 매개값으로 groupingBy(Function<T,K>classifier)를 사용했다.

Map<Student.Sex, List<Student>> mapBySex = totalList.stream()
        .collect(Collectors.groupingBy(Student::getSex));
  1. 전체 학생 List에서 Stream을 얻는다
  2. Student를 Student.Sex로 매핑하는 Function을 얻는다
  3. Student.Sex가 키가 되고, 그룹핑된 List<Student>가 값인 Map을 생성하는 Collector를 얻는다
  4. Stream의 collect()메소드로 Student를 Student.Sex별로 그룹핑해서 Map을 얻는다

 

아래 예제는 학생들을 거주 도시별로 그룹핑하고나서, 같은 그룹에 속하는 학생들의 이름 List를 생성한 후, 거주 도시를 키로, 이름 List를 값으로 갖는 Map을 생성한다. collect()의 매개값으로 groupingBy(Function<t,k>classifier, Collector<T,A,D> collecotr)를 사용했다.

 

Map<Student.City, List<String>> mapByCity = totalList.stream()
        .collect(
                Collectors.groupingBy(
                        Student::getCity,
                        Collectors.mapping(Student::getName, Collectors.toList())
                )
        );
  1. 전체 학생 List에서 Stream을 얻는다
  2. Student를 Student.City로 매핑하는 Functiondmf djesmsek
  3. 3~5번에서는 Student의 이름을 List에 수집하는 Collector를 얻는다
  4. Student를 이름으로 매핑하는 Function을 얻는다
  5. 이름을 List에 수집하는 Collector를 얻는다
  6. Collectorsdml mapping()메소드로 Studnet를 이름으로 매핑하고 이름을 List에 수집하는 Collector를 얻는다
  7. Student.City가 키이고, 그룹핑된 이름 List가 값인 Map을 생성하는 Collector를 얻는다
  8. Stream의 collect()메소드로 Studnet를 Student.City별로 그룹핑해서 Map을 얻는다.
package org.example.chapter16.collect;

import org.example.chapter16.Student;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupingByExample {
    public static void main(String[] args) {
        List<Student> totalList = Arrays.asList(
                new Student("홍길동", 10, Student.Sex.MALE, Student.City.Seoul),
                new Student("김수애", 6, Student.Sex.FEMALE, Student.City.Pusan),
                new Student("신용권", 10, Student.Sex.MALE, Student.City.Pusan),
                new Student("박수미", 6, Student.Sex.FEMALE, Student.City.Seoul)
        );

        Map<Student.Sex, List<Student>> mapBySex = totalList.stream()
                .collect(Collectors.groupingBy(Student::getSex));

        System.out.print("[남학생] ");
        mapBySex.get(Student.Sex.MALE).stream()
                .forEach(s-> System.out.print(s.getName() + " "));

        System.out.print("\n[여학생] ");
        mapBySex.get(Student.Sex.FEMALE).stream()
                .forEach(s-> System.out.print(s.getName() + " "));

        System.out.println();

        Map<Student.City, List<String>> mapByCity = totalList.stream()
                .collect(
                        Collectors.groupingBy(
                                Student::getCity,
                                Collectors.mapping(Student::getName, Collectors.toList())
                        )
                );

        System.out.print("\n[서울] ");
        mapByCity.get(Student.City.Seoul).stream().forEach(s-> System.out.print(s + " "));

        System.out.print("\n[부산] ");
        mapByCity.get(Student.City.Pusan).stream().forEach(s-> System.out.print(s + " "));
    }
}

 

그룹핑 후 매핑 및 집계

 

Collectors.groupingBy()메소드는 그룹핑 후, 매핑이나 집계(평규느 카운팅, 연결, 최대, 최소, 합계)를 할 수 있도록 두 번째 매개값을 Collector를 가질 수 있다. 이전 예제에서는 그룹핑된 학생 객체를 학생이름으로 매핑하기 위해 mapping()메소드로 Collector를 얻었다. Collectors는 mapping()메소드 이외에도 집계를 위해 다양한 Collector를 리턴하는 메소드가 있다.

 

아래 코드는 학생들을 성별로 그룹핑한 다음 같은 그룹에 속하는 학생들의 평균 점수를 구하고, 성별을 키로, 평균 점수를 값으로 갖는 Map을 생성한다.

 

Map<Student.Sex, Double> mapBySex = totalList.stream()
        .collect(
                Collectors.groupingBy(
                        Student::getSex,
                        Collectors.averagingDouble(Student::getScore)
                )
        );
  1. 전체 학생 List에서 Stream을 얻는다
  2. Student를 Student.Sex로 매핑하는 Function을 얻는다
  3. Student를 점수로 매핑하는 ToDoubleFunction을 얻는다
  4. 학생 점수의 평균을 산출하는 Collector를 얻는다
  5. Student.Sex가 키이고, 평균 점수 Double이 값인 Map을 생성하는 Collector를 얻는다
  6. Stream의 collect()메소드로 Student를 Student.Sex별로 그룹핑해서 Map을 얻는다

아래 코드는 학생들을 성별로 그룹핑한 다음 같은 그룹에 속하는 학생 이름을 쉼표로 구분해서 문장려로 만들고, 성별을 키로, 이름 문자열 값으로 갖는 Map을 생성한다.

Map<Student.Sex, String> mapByName = totalList.stream()
        .collect(
                Collectors.groupingBy(
                        Student::getSex,
                        Collectors.mapping(
                                Student::getName,
                                Collectors.joining(",")
                        )
                )
        );
package org.example.chapter16.collect;

import org.example.chapter16.Student;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupingAndReductionExample {
    public static void main(String[] args) {
        List<Student> totalList = Arrays.asList(
                new Student("홍길동", 10, Student.Sex.MALE),
                new Student("김수애", 12, Student.Sex.FEMALE),
                new Student("신용권", 10, Student.Sex.MALE),
                new Student("박수미", 12, Student.Sex.FEMALE)
        );

        //성별로 평균 점수를 저장하는 Map얻기
        Map<Student.Sex, Double> mapBySex = totalList.stream()
                .collect(
                        Collectors.groupingBy(
                                Student::getSex,
                                Collectors.averagingDouble(Student::getScore)
                        )
                );

        System.out.println("남학생 평균 점수: " + mapBySex.get(Student.Sex.MALE));
        System.out.println("남학생 평균 점수: " + mapBySex.get(Student.Sex.FEMALE));

        //성별을 쉼표로 구분한 이름을 저장하는 Map얻기
        Map<Student.Sex, String> mapByName = totalList.stream()
                .collect(
                        Collectors.groupingBy(
                                Student::getSex,
                                Collectors.mapping(
                                        Student::getName,
                                        Collectors.joining(",")
                                )
                        )
                );

        System.out.println("남학생 전체 이름: " + mapByName.get(Student.Sex.MALE));
        System.out.println("남학생 전체 이름: " + mapByName.get(Student.Sex.FEMALE));
    }
}

 

 

 

2. 병렬 처리

 

병렬 처리(Parallel Operation)란? 멀티 코어 CPU 환경에서 하나의 작업을 분할해서 각각의 코어가 병렬적으로 처리하는 것을 말하는데, 병렬 처리의 목적은 작업 처리 시간을 줄이기 위한 것이다.

 

자바 8부터 요소를 병렬 처리할 수 있도록 하기 위해 병렬 스트림을 제공하기 때문에 컬렉션(배열)의 전체 요소 처리 시간을 줄여준다.

 

동시성(Concurrency)과 병렬성(Parallelism)

 

멀티 스레드는 동시성 또는 병렬서응로 실행되기 때문에 이 용어들에 정확히 해야해야 한다.

 

이 둘은 멀티 스레드의 동작 방식이라는 점에서 동일하지만 서로 다른 목적을 가지고 있다.

  • 동시성은 멀티 작업을 위해 멀티 스레드가 번걸아가며 실행하는 성질을 말하고,
  • 병렬성은 멀티 작업을 위해 멀티 코어를 이용해서 동시에 실행하는 성질을 말한다.

싱글 코어 CPU를 이용한 멀티 작업은 병렬적으로 실행되는 것처럼 보이지만, 사실은 번갈아가며 실행하는 동시성 작업이다.

번갈아가며 실행하는 것이 워낙 빠르다보니 병렬성으로 보일 뿐이다.

 

병렬성은 데이터 병렬성(Data parallelism)과 작업 병렬성(Task parallelism)으로 구분될 수 있다.

 

데이터 병렬성

 

데이터 병렬성은 전체 데이터를 쪼개어 서브 데이터들로 만들고 이 서브 데이터들을 병렬 처리해서 작업을 빨리 끝내는 것을 말한다.

자바 8에서 지원하는 병렬 스트림은 데이터 병렬성을 구현한 것이다. 멀티 코어의 수만큼 대용량 요소를 서브 요소들로 나누고, 각각의 서브 요소들을 분리된 스레드에서 병렬 처리시킨다. 예를 들어 쿼드 코어 CPU일 경우 4개의 서브 요소들로 나누고, 4개의 스레드가 각각 서브 요소들을 병렬 처리한다.

 

작업 병렬성

 

작업 병렬성은 서로 다른 작업을 병렬 처리하는 것을 말한다. 작업 병렬성의 대표적인 예는 웹 서버(Web server)이다.

웹 서버는 각각의 브라우저에서 요청된 내용을 개별 스레드에서 병렬로 처리한다.

 

포크조인(ForkJoin)프레임워크

 

병렬 스트림은 요소들을 병렬 처리하기 위해 포크조인(ForkJoin) 프레임워크(Framework)를 사용한다. 병렬 스트림을 이용하면 런타임 시에 포크 조인 프레임워크가 동작하는데, 포크 단계에서는 전체 데이터를 서브 데이터로 분리한다. 그리고 나서 서브 데이터를 멀티 코어에서 병렬 처리한다. 조인 단계에서는 서브 결과를 결합해서 최종 결과를 만들어 낸다. 예를 들어 쿼드 코어 CPU에서 병렬 스트림으로 작업을 처리할 경우, 스트림의 요소를 N개라고 보았을 때 포크 단계에서는 전체 요소를 4등분 한다. 그리고 1등분씩 개별 코어에서 처리하고 조인단계에서는 3번의 결합과정을 거쳐 최종 결과를 산출한다.

 

병렬 처리 스틤은 실제로 포크 단계에서 차례대로 요소를 4등분하지 않는다. 이해하기 쉽도록 하기 위해 위 그림은 차례대로 4등분 했지만, 내부적으로 서브 요소를 나누는 알고리즘이 있다. 포크조인 프레임워크는 포크와 조인 기능 이외에 스레드 풀인 ForkJoinPool을 제공한다. 각각의 코어에서 서브 요소를 처리하는 것은 개별 스레드가 해야 하므로 스레드 관리가 필요하다. 포크조인 프레임워크는 ExecutorService의 구현 객체인 ForkJoinPool을 사용해서 작업스레드를 관리한다.

 

병렬 스트림 생성

 

병렬 처리를 위해 코드에서 포크조인 프레임워크를 직접 사용할 수는 있지만, 병렬 스트림을 이용할 경우에는 백그라운드에서 포크 조인 프레임워크가 사용되기 때문에 개발자는 매우 쉽게 병렬 처리를 할 수 있다.

 

parallelStream()메소드는 컬렉션으로부터 병렬 스트림을 바로 리턴한다. parallel()메소드는 순차 처리 스트림을 병렬 처리 스트림으로 변환해서 리턴한다. 어떤 방법으로 병렬 스트림을 얻더라도 이후 요소 처리 과정은 병렬 처리 된다.

 

내부적으로 전체 요소를 서브요소들로 나누고, 이 서브 요소들을 개별 스레드가 처리한다. 서브 처리 결과가 나오면 결합해서 마지막으로 최종 처리 결과를 리턴한다. 내부적인 동작을 확인하려면 사용자 정의 컨테이너에 수집하기에서 살펴본 예제를 병렬 스트림으로 수정해서 실행해보면 된다.

 

 

위의 코드를 보면 전체 학생 목록에서 stream()메소드로 순차 처리 스트림을 얻었기 때문에 MaleStudent객체는 하나만 생성되고, 남학생을 MaleStudent에 수집하기 위해 accumulate()가 호출된다. combine()메소드는 전혀 호출되지 않았는데, 그 이유는 순차 처리 스트림이므로 결합할 서브 작업이 없기 때문이다. 해당 코드를 병렬 처리 스트림으로 변경하면 아래와 같다.

MaleStudent maleStudent = totalList.parallelStream()
        .filter(s -> s.getSex() == Student.Sex.MALE)
        .collect(MaleStudent::new, MaleStudent::accumulate, MaleStudent::combine);

 

단순히 stream()메소드 호출이 parallelStream() 메소드 호출로 변경되었지만 내부 동작은 전혀 다르게 진행된다.

 

  • 쿼드 코어 CPU에서 실행된다면 전체 요소는 4개의 서브 요소로 나눠지고, 4개의 스레드가 병렬 처리한다. 각 스레드는 서브 요소를 수집해야 하므로 4개의 MaleStudent 객체를 생성하기 위해 collect()의 첫 번째 메소드 참조인 MaleStudent :: new 를 4번 실행 시킨다.
  • 각 스레드는 MaleStudent 객체에 남학생 요소를 수집하기 위해 두 번째 메소드 참조인 MaleStudent :: accumulate를 매번 실행시킨다
  • 수집이 완료된 4개의 MaleStudent는 3번의 결합으로 최종 MaleStudent가 만들어질 수 있으므로 세 번째 메소드 참조인 MaleStudent :: combine가 3번 실행된다
package org.example.chapter16.collect;

import org.example.chapter16.Student;

import java.util.Arrays;
import java.util.List;

public class MaleStudentExample {
    public static void main(String[] args) {
        List<Student> totalList = Arrays.asList(
                new Student("홍길동", 10, Student.Sex.MALE),
                new Student("김수애", 6, Student.Sex.FEMALE),
                new Student("신용권", 10, Student.Sex.MALE),
                new Student("박수미", 6, Student.Sex.FEMALE)
                );
        //단일스레드
        MaleStudent maleStudent1 = totalList.stream()
                .filter(s -> s.getSex() == Student.Sex.MALE)
                .collect(MaleStudent::new, MaleStudent::accumulate, MaleStudent::combine);

        maleStudent1.getList().stream()
                .forEach(s -> System.out.println(s.getName()));
        System.out.println();
        
        //멀티 스레드
        MaleStudent maleStudent2 = totalList.parallelStream()
                .filter(s -> s.getSex() == Student.Sex.MALE)
                .collect(MaleStudent::new, MaleStudent::accumulate, MaleStudent::combine);

        maleStudent2.getList().stream()
                .forEach(s -> System.out.println(s.getName()));
    }
}

 

 

실행결과를 보면 main스레드와 ForkJoinPool에서 3개의 스레드가 사용되어 총 4개의 스레드가 동작한다. 이것은 필자의 컴퓨터가 쿼드 코어 CPU를 사용하기 때문이다. 각각의 스레드가 하나의 서브 작업이라고 본다면 총 4개의 서브 작업으로 분리되었다고 생각하면 된다. 각 서브 작업은 남학생을 누적시킬 MaleStudent 객체를 별도로 생성하기 때문에 MaleStudent()생성자가 4번 실행되었다. 하지만 전체 학생 중에서 남학생이 2명밖에 없으므로 accumulate()는 2번밖에 호출되지 않았다. 누적이 완료된 4개의 MaleStudent 객체는 3번의 결합으로 최종 MaleStudent가 만들어지므로 combine메소드가 3번 호출되었다.

 

학생수를 100명으로 늘리고 쿼드 코어 CPU내에서 테스트한 결과를 보면 4개의 코어가 병렬적으로 요소를 처리하고 있는걸 볼 수 있다.

병렬 처리 성능

 

스트림 병렬 처리가 스트림 순차 처리보다 항상 실행 성능이 좋다고 판단해서는 안된다. 병렬 처리에 영향을 미치는 다음 3가지 요인을 살펴봐야 한다.

 

요소의 수와 요소당 처리 시간

 

컬렉션에서 요수의 수가 적고 요소당 처리 시간이 짧으면 순차 처리가 오히려 병렬처리 보다 빠를 수 있다. 병렬 처리는 스레드풀 생성, 스레드 생성이라는 추가적인 비용을 발생하기 때문이다.

 

스트림 소스의 종류

 

ArrayList, 배열은 인덱스로 요소를 관리하기 때문에 포크 단계에서 요소를 쉽게 분리할 수 있어 병렬 처리 시간이 절약된다.

반면에 HashSet, TreeSet은 요소 분리가 쉽지 않고, LinkedList 역시 링크를 따라가야 하므로 요소 분리가 쉽지 않다. 따라서 이 소스들은 ArrayList, 배열보다는 상대적으로 병렬 처리가 늦다.

 

코어(Core)의 수

 

싱글 코어 CPU일 경우에는 순차 처리가 빠르다. 병렬 스트림을 사용할 경우 스레드의 수만 증가하고 동시성 작업으로 처리되기 때문에 좋지 못한 결과를 준다. 코어의 수가 많을 수록 병렬 작업 처리 속도는 빨라진다.

 

package org.example.chapter16.parallel;

import java.util.Arrays;
import java.util.List;

public class SequencialVsParallelExample {
    // 요소처리
    public static void work(int value) {
        try {
            Thread.sleep(100); // 값이 작을수록 순차 처리가 빠름
        } catch (InterruptedException e) {

        }
    }
        //순차 처리
        public static long testSequencial(List<Integer> list) {
            long start = System.nanoTime();
            list.stream().parallel().forEach((a) -> work(a));
            long end = System.nanoTime();
            long runTime = end - start;
            return runTime;
        }

        public static void main(String... args) {
            //소스 컬렉션
            List<Integer> list = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

            //순차 스트림 처리 시간 구하기
            long timeSequencial = testSequencial(list);

            //병렬 스트림 처리 시간 구하기
            long timeParallel = testSequencial(list);

            if (timeSequencial < timeParallel) {
                System.out.println("성능 테스트 결과: 순차 처리가 더 빠름");
            } else {
                System.out.println("성능 테스트 결과: 병렬 처리가 더 빠름");
            }
        }
}

 

위 예제의 실행 결과는 work()의 요소 처리 시간에 따라 달라진다. Thread.sleep(10)으로 할 경우 순차 처리가 더 빠르다. 그렇기 때문에 실제 작업 내용을 작성한 후에는 순차 처리와 병렬 처리 중 어떤 처리가 더 유리한지 테스태봐야 한다.

 

아래 예제는 스트림 소스가 ArrayList인 경우와 LinkedList일 경우 대용량 데이터의 병렬 처리 성능을 테스트한 것이다. 백만 개의 Integer 객체를 각각 ArrayList와 LinkedList에 저장하고 테스트했다.

package org.example.chapter16.parallel;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class ArrayListVsLinkedListExample {
    //요소 처리
    public static void work(int value) {

    }

    //병렬 처리
    public static long testParallel(List<Integer> list) {
        long start = System.nanoTime();
        list.stream().parallel().forEach((a) -> work(a));
        long end = System.nanoTime();
        long runTime = end - start;
        return runTime;
    }

    public static void  main(String... args) {
        //소스 컬렉션
        ArrayList<Integer> arrayList = new ArrayList<>();
        LinkedList<Integer> linkedList = new LinkedList<>();
        for (int i = 0; i < 100000; i++) {
            arrayList.add(i);
            linkedList.add(i);
        }

        //워밍업
        long arrayListParallel = testParallel(arrayList);
        long linkedListParallel = testParallel(linkedList);

        //순차 스트림 처리 시간 구하기
        arrayListParallel = testParallel(arrayList);
        linkedListParallel = testParallel(linkedList);

        if (arrayListParallel < linkedListParallel) {
            System.out.println("성능 테스트 결과: ArrayList 처리가 더 빠름");
        } else {
            System.out.println("성능 테스트 결과: LinkedList 처리가 더 빠름");
        }
    }
}

실행 결과는 ArrayList가 빠른 실행 성능을 보였다. 하지만 요소의 개수가 작을 경우에는 LinkedList가 더 빠른 성능을 보였다. 29~30라인에서 워밍업을 둔 이유는 실행 준비 과정에서의 오차를 줄이기 위해서다.

 

워밍업은 실제 성능 측정 이전에 몇 번의 작업을 수행하여 JIT(Just-In-Time) 컴파일러가 코드를 최적화하도록 유도하는 것을 의미합니다. 따라서 병렬 처리를 위한 최적화가 이미 진행된 상태에서 실제 성능 측정이 이루어집니다.

 

 

항상 ArrayList가 빠른게 아니고, LinkedList 더 빠르다고 나오는 경우도 있는데 이유가 뭘까?

 

  1. HotSpot JIT 컴파일러 최적화: Java의 HotSpot JIT 컴파일러는 프로그램이 실행되면서 동적으로 코드를 최적화합니다. 초기에는 실행 특성에 기반한 최적화가 이루어지지 않았을 수 있으며, 시간이 지남에 따라 이 최적화가 발생하여 성능이 향상될 수 있습니다. 이는 각 자료구조(예: ArrayList 또는 LinkedList)에 대한 최적화로 이어질 수 있습니다.

  2. 캐시 효과: 캐시 효과는 메모리에 접근할 때 데이터를 캐시에 저장하고 재사용함으로써 성능을 향상시킬 수 있는 현상을 나타냅니다. 시간이 지남에 따라 데이터에 대한 캐시 히트율이 증가할 수 있고, 특정 자료구조의 패턴이 캐시 효과를 더욱 잘 이용하게 될 수 있습니다.

  3. Garbage Collection 영향: Java에서는 가비지 컬렉션이 주기적으로 수행되며, 이는 메모리 관리에 영향을 미칩니다. 시간이 지남에 따라 가비지 컬렉션의 효율이 달라질 수 있으며, 이는 자료구조의 성능에도 영향을 미칠 수 있습니다.

  4. 데이터 크기 변화: 초기에는 데이터 크기가 작아서 ArrayList가 더 유리했을 수 있습니다. 그러나 시간이 지나면서 데이터 크기가 증가하면서 LinkedList가 더 효과적인 경우가 발생할 수 있습니다.