개발자는 기록이 답이다

스트림과 병렬처리(1) - 스트림이란 무엇인가? 본문

언어/Java

스트림과 병렬처리(1) - 스트림이란 무엇인가?

slow-walker 2023. 12. 3. 16:03

1. 스트림 소개

스트림(Stream)은 자바 8부터 추가된 컬렉션(배열 포함)의 저장요소를 하나씩 참조해서 람다식(함수적-스타일)로 처리할 수 있도록 해주는 반복자이다.

 

반복자 스트림

 

자바 7이전까지는 List<String>컬렉션에서 요소를 순차적으로 처리하기 위해 Iterator반복자를 사용해왔다.

List<String> list = Arrays.asList("홍길동","신용권","감자바");
Iterator<String> iterator = list.iterator();
while(iterator.hasNext()) {
    String name = iterator.next();
    System.out.println(name);
}

 

위의 코드를 Stream을 사용해서 변경하면 다음과 같다.

List<String> list = Arrays.asList("홍길동","신용권","감자바");
Stream<String> stream = list.stream();
stream.forEach( name -> System.out.println(name));

 

컬렉션(java.util.Collection)의 stream() 메소드로 스트림 객체를 얻고나서 stream.forEach( name -> System.out.println(name)); 메소드를 통해 컬렉션의 요소르 하나씩 콘솔에 출력한다.

 

forEach()메소드는 다음과 같이 Consumer 함수적 인터페이스 타입의 매개값을 가지므로 컬렉션의 요소를 소비할 코드를 람다식으로 기술할 수 있다.

 

Iterator를 사용한 코드와 Stream을 사용한 코드를 비교해보면 Stream을 사용하는 것이 훨씬 단순해 보인다.

 

스트림의 특징

 

🚩 Stream은 Iterator와 비슷한 역할을 하는 반복자이지만, 아래와 같은 측면에서 많은 차이를 갖고 있다.

1. 람다식으로 요소 처리 코드를 제공하는 점
2. 내부 반복자를 사용하므로 병렬 처리가 쉽다는 점
3. 중간 처리와 최종 처리 작업을 수행하는 점

 

💡 람다식으로 요소 처리 코드를 제공한다

 

Stream이 제공하는 대부분의 요소 처리 메소드는 함수적 인터페이스 매개 타입을 가지기 때문에 람다식 또는 메소드 참조를 이용해서 요소 처리 내용을 매개값으로 전달할 수 있다.

 

💡 내부 반복자를 사용하므로 병렬 처리가 쉽다

 

외부 반복자(external iterator)란 개발자가 코드로 직접 컬렉션의 요소를 반복해서 가져오는 코드 패턴을 말한다.

Index를 이용한 for문 그리고 Iterator를 이용하는 while문은 모두 외부 반복자를 이용하는 것이다.

 

반면에 내부 반복자(internal iterator)는 컬렉션 내부에서 요소들을 반복시키고, 개발자는 요소당 처리해야 할 코드만 제공하는 코드 패턴을 말한다.

 

내부 반복자를 사용해서 얻는 이점은 컬렉션 내부에서 어떻게 요소를 반복시킬 것인가는 컬렉션에게 맡겨두고, 개발자는 요소 처리 코드에만 집중할 수 있다는 것이다. 내부 반복자는 요소들의 반복 순서를 변경하거나, 멀티 코어 CPU를 최대한 활용하기 위해 요소들을 분배시켜 병렬 작업을 할수 있게 도와주기 때문에 하나씩 처리하는 순차적 외부 반복자보다는 효율적으로 요소를 반복시킬 수 있다.

 

 

Iterator는 컬렉션의 요소를 가져오는 것에서부터 처리하는 것 까지 모두 개발자가 작성해야 하지만, 스트림은 람다식으로 요소 처리 내용만 전달할 뿐, 반복은 컬렉션 내부에서 일어난다. 스트림을 사용하면 코드도 간결해지지만, 무엇보다도 요소의 병렬 처리가 컬렉션 내부에서 처리되므로 일석이조의 효과를 가져온다.

 

🚩  병렬(parallel)처리란?
한 가지 작업을 서브 작업으로 나누고, 서브 작업들을 분리된 스레드에서 병렬적으로 처리하는 것을 말한다. 

 

병렬 처리 스트림을 이용하면 런타임 시 하나의 작업을 서브 작업으로 자동으로 나누고, 서브 작업의 결과를 자동으로 결합해서 최종 결과물을 생성한다.

 

예를 들어 컬렉션의 요소 총합을 구할 때 순차 처리 스트림은 하나의 스레드가 요소들을 순차적으로 읽어 합을 구하지만, 병렬 처리 스트림을 이용하면 여러 개의 스레드가 요소들을 부분적으로 합하고 이 부분 합을 최종 결합해서 전체 합을 생성한다.

 

다음 예제는 순차 처리 스트림과 병렬 처리 스트림을 이용할 경우, 사용된 스레드의 이름이 무엇인지 콘솔에 출력한다.

살행 결과를 보면 병렬 처리 스트림은 main스레드를 포함해서 ForkJoinPool(스레드 풀)의 작업 스레드들이 병렬적으로 요소를 처리하는 것을 볼 수 있다.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class ParallelExample {

    public static void main(String[] args) {
        List<String> list = Arrays.asList(
                "홍길동", "신용권", "김자바",
                "람다식", "박병렬");

        // 순차 처리
        Stream<String> stream = list.stream();
                        // 메소드 참조(s -> ParallelExampleprint(s)와 동일)
        stream.forEach(ParallelExample :: print);
        System.out.println();

        // 병렬 처리
        Stream<String> parallelStream = list.parallelStream();
                                // 메서드 참조
        parallelStream.forEach(ParallelExample::print);

    }

    public static void print(String str) {
        System.out.println(str+ " : " + Thread.currentThread().getName());
    }
}

 

 

💡 스트림은 중간 처리와 최종 처리를 할 수 있다

 

스트림은 컬렉션의 요소에 대한 중간 처리와 최종 처리를 수행할 수 있는데, 중간 처리에서는 매핑, 필터링, 정렬을 수행하고 최종 처리에서는 반복, 카운팅, 평균, 총합 등의 집계 처리를 수행한다.

 

예를 들어 학생 객체를 요소로 가지는 컬렉션이 있다고 가정해보자. 중간 처리에서는 학생의 점수를 뽑아내고, 최종 처리에서는 점수의 평균값을 산출한다.

 

 

import java.util.Arrays;
import java.util.List;

public class MapAndReduceExample {

    public static void main(String[] args) {
        List<Student> studentList = Arrays.asList(
                new Student("홍길동",10),
                new Student("신용권",20),
                new Student("유미선",30)
        );

        double avg = studentList.stream()
                // 중간처리(학생 객체를 점수로 매핑)
                .mapToInt(Student :: getScore)
                // 최종 처리(평균 점수)
                .average()
                .getAsDouble();
        System.out.println("평균 점수 : " + avg);
    }

    static class Student {
        private String name;
        private int score;

        public Student(String name, int score) {
            this.name = name;
            this.score = score;
        }

        public String getName() {
            return name;
        }

        public int getScore() {
            return score;
        }
    }
}

 

2. 스트림의 종류

 

자바 8부터 새로 추가된 java.util.stream 패키지에는 스트림(stream) API들이 포진하고 있다.

패키지 내용을 보면 BaseStream 인터페이스를 부모로 해서 자식 인터페이스들이 다음과 같은 상속 관계를 이루고 있다.

BaseStream인터페이스에는 모든 스트림에서 사용할 수 있는 공통 메소드들이 정의되어 있을 뿐 코드에서 직접적으로 사용되지는 않는다 하위 스트림인 Stream, IntStream, LongStream, DoubleStream이 직접적으로 이용되는 스트림인데, Stream은 객체 요소를 처리하는 스트림이고, IntStream, LongStream, DoubleStream은 각각 기본 타입인 int, long, double 요소를 처리하는 스트림이다.

 

이 스트림 인터페이스의 구현 객체는 다양한 소스로부터 얻을 수 있다. 주로 컬렉션과 배열에서 얻지만 다음 과 같은 소스로 부터 스트림 구현 객체를 얻을 수 있다.

 

컬렉션으로부터 스트림 얻기
package org.example.chapter16;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class FromCollectionExample {
    public static void main(String[] args) {
        List<Student> studentList = Arrays.asList(
                new Student("홍길동",10),
                new Student("신용권",20),
                new Student("유미선",30)
                );

        Stream<Student> stream = studentList.stream();
        stream.forEach(s -> System.out.println(s.getName()));
    }
}
package org.example.chapter16;

public class Student {
    private String name;
    private int score;

    public Student(String name, int score) {
        this.name = name;
        this.score = score;
    }

    public String getName() {
        return name;
    }

    public int getScore() {
        return score;
    }
}

 

배열로부터 스트림 얻기
package org.example.chapter16;

import java.util.Arrays;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class FromArrayExample {
    public static void main(String[] args) {
        String[] strArray = {"홍길동","신용권","김미나"};
        Stream<String> strStream = Arrays.stream(strArray);
        strStream.forEach(a -> System.out.println(a + ","));
        System.out.println();

        int[] intArray = {1,2,3,4,5};
        IntStream intStream = Arrays.stream(intArray);
        intStream.forEach(a -> System.out.println(a + " "));
        System.out.println();
    }
}

 

숫자 범위로부터 스트림얻기

 

rangeClosed()는 첫번째 매개값에샤부터 두번째 매개값까지 순차적으로 제공하는 IntStream을 리턴한다

package org.example.chapter16;

import java.util.stream.IntStream;

public class FromIntRangeExample {
    public static int sum;

    public static void main(String[] args) {
        IntStream stream = IntStream.rangeClosed(1, 100);
        stream.forEach(a -> sum += a);
        System.out.println("총합: "+ sum);
    }
}

 

파일로부터 스트림 얻기

 

Files의 정적 메소드인 lines()와 BufferedReader의 lines()메소드를 이용하여 문자 파일의 내용을 스트림을 통해 행 단위로 읽고 출력

package org.example.chapter16;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class FromFileContentExample {
    public static void main(String[] args) throws IOException {

        Path path = Paths.get("src/main/java/org/example/chapter16/linedata.txt");
        Stream<String> stream;

        //Files.lines()메소드 이용
        stream = Files.lines(path, Charset.defaultCharset()); // 운영체제의 기본 문자셋
                    //메서드 참조(s -> System.out.println(s)와 동일)
        stream.forEach(System.out :: println);
        System.out.println();

        //BufferedReader의 lines()메소드 이용
        File file = path.toFile();
        FileReader fileReader = new FileReader(file);
        BufferedReader br = new BufferedReader(fileReader);
        stream = br.lines();
        stream.forEach(System.out :: println);
    }
}

 

디렉토리로부터 스트림얻기

 

Files()의 정적 메소드인 list()를 이용해서 디렉토리의 내용(서브 디렉토리 또는 파일 목록)을 스트림을 통해 읽는다

package org.example.chapter16;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class FromDirectoryExample {
    public static void main(String[] args) throws IOException {
        Path path = Paths.get("src/main/java/org/example");
        Stream<Path> stream = Files.list(path);
        // p:서브 디렉토리 또는 파일에 해당하는 Path객체
        stream.forEach(p -> System.out.println(p.getFileName()));
    }
}

 

3. 스트림 파이프라인

 

🚩 리덕션(Reduction) 대량의 데이터를 가공해서 축소하는 것

 

데이터의 합계, 평균값, 카운팅, 최대값, 최소값 등이 대표적인 리덕션의 결과물이라고 볼 수 있다.

그러나 컬렉션의 요소를 리덕션의 결과물로 바로 집계할 수 없을 경우에는 집계하기 좋도록 필터링, 매핑, 정렬, 그룹핑 등의 중간 처리가 필요하다.

 

중간 처리와 최종 처리

 

스트림은 데이터의 필터링, 매핑, 정렬, 그룹핑 등의 중간 처리와 합계, 평균, 카운팅, 최대값, 최소값 등의 최종 처리를 파이프라인으로 해결한다. 파이프라인은 여러 개의 스트림이 연결되어 있는 구조를 말한다. 파이프라인에서 최종 처리를 제외하고는 모두 중간 처리 스트림이다.

 

 

중간 스트림이 생성될 때 요소들이 바로 중간처리(필터링, 매핑, 정렬)되는 것이 아니라 최종 처리가 시작되기 전까지 중간 처리는 지연(lazy)된다. 최종 처리가 시작되면 비로소 컬렉션의 요소가 하나씩 중간 스트림에서 처리되고 최종 처리까지 오게 된다.

 

stream인터페이스에서는 필터링, 매핑 ,정렬 등의 많은 중간 처리 메소드가 있는데, 이 메소드들은 중간 처리된 스트림을 리턴한다.

그리고 이 스트림에서 다시 중간 처리 메소드를 호출해서 파이프라인을 형성하게 된다.

 

예를 들어, 회원 컬렉션에서 남자만 필터링하는 중간 스트림을 연결하고, 다시 남자의 나이로 매핑하는 스트림을 연결한 후, 최종 남자 평균 나이를 집계한다면 다음 그림처럼 파이프라인이 형성된다.

 

위의 내용을 자바코드로 작성하면 아래와 같다.

 

좌측의 로컬 변수를 생략하고 메소드 체이닝으로 연결하면, 우측의 사진처럼 파이프라인 코드만 남는다.

 

package org.example.chapter16;

import java.util.Arrays;
import java.util.List;

public class StreamPipelinesExample {

    public static void main(String[] args) {
        List<Member> list = Arrays.asList(
                new Member("홍길동",Member.MALE, 30),
                new Member("김나리",Member.FEMALE, 20),
                new Member("신용권",Member.MALE, 45),
                new Member("박수미",Member.FEMALE, 27)
        );

        double ageAvg = list.stream()
                .filter(m -> m.getSex() == Member.MALE)
                .mapToInt(Member :: getAge)
                .average()
                .getAsDouble();

        System.out.println("남자 평균 나이: " + ageAvg);
    }

}
package org.example.chapter16;

public class Member {
    public static int MALE = 0;
    public static int FEMALE = 1;

    private String name;
    private int sex;
    private int age;

    public Member(String name, int sex, int age) {
        this.name = name;
        this.sex = sex;
        this.age = age;
    }

    public int getSex() {
        return sex;
    }

    public int getAge() {
        return age;
    }
}

 

 

중간 처리 메소드와 최정 처리 메소드

 

중간처리 메서드와 최정 처리 메소드를 쉽게 구분하는 방법은 리턴 타입을 보면 된다.

리턴 타입이 스트림이라면 중간 처리 메소드이고, 기본 타입이거나 OptionalXXX라면 최종 처리 메소드이다.

소속된 인터페이스에서 공통의 의미는 Stream, IntStream, LongStream, DoubleStream에서 모두 제공된다는 의미이다.

 


 

추가적으로 참고할만한 블로그

 

JAVA Stream 병렬 처리와 성능 분석