병렬 처리(Parallel Operation)란 멀티 코어 환경에서 하나의 작업을 분할해 각각의 코어가 병렬적으로 처리하는 것이다.
자바7 이전에는 데이터 컬렉션을 병렬 처리하기 위해서는 데이터를 분할하고 각각의 스레드로 할당 해야한다. 하지만 스레드는 경쟁 상태(race condition)가 발생할 수 있어 동기화가 필요하고, 마지막에는 각 스레드에서 발생한 부분 결과를 하나로 합치는 과정이 필요하다.
하지만 병렬 스트림과 포크/조인 프레임워크를 사용하면 쉽게 병렬 처리가 가능하다
둘 다 멀티 스레드의 동작 방식이라는 점은 동일하지만 목적이 다르다.
멀티 작업을 위해 멀티 스레드가 번갈아가며 실행하는 성질이다. 싱글 코어 CPU를 이용한 멀티 작업은 병렬적으로 실행되는 것처럼 보이지만 실제로는 동시성 작업이다.
병렬성은 멀티 작업을 위해 멀티 코어를 이용해 동시에 실행하는 성질이다.
병렬 스트림은 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다. 때문에 병렬 스트림을 이용하면 멀티 코어 프로세서가 각각의 청크를 처리하도록 할당이 가능해진다.
parallelStream()을 호출하면 컬렉션에서 바로 병렬 스트림을 리턴한다.
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
list.parallelStream().reduce(0, Integer::sum);
병렬 스트림은 내부적으로 ForkJoinPool을 사용하는데, ForkJoinPool은 프로세서 수(Runtion.getRuntime().availableProcessors())의 스레드를 가진다.
이를 변경할 수는 있지만 모든 병렬 스트림 연산에 영향을 주기 때문에 기본값을 사용하는 것을 권장한다.
순차 스트림에 parallel()메소드를 호출하면 기존 스트림을 병렬로 변경이 가능하고, 병렬 스트림에 sequential()메소드를 호출하면 순차 스트림으로 변경할 수 있다.
이 메소드를 호출하면 내부적으로 병렬로 수행해야 함을 의미하는 불리언(boolean) 플래그가 설정되게 된다.
메소드를 통해 연산 마다 병렬 실행과 순차 실행을 수행할 지 제어할 수 있게 된다.
stream.parallel()
.map() // 병렬 실행
.sequential()
.reduce(); // 순차 실행
병렬 스트림이 항상 순차 스트림보다 빠른 것은 아니다.
병렬 스트림은 병렬화하기 위해 스트림을 재귀적으로 분할하고, 스레드를 할당하고, 최종적으로 부분 결과를 하나로 합치는 과정이 필요하기 때문에 오히려 속도가 느릴 수 있다.
병렬화할 수 있는 작업을 재귀적으로 서브 데이터로 분할한 다음 각각의 결과를 합쳐 전체 결과를 만들도록 설계되었다.
병렬스트림 내부에서 포크/조인 프레임워크를 활용하고 있기 때문에 정확한 동작방식을 이해하면 병렬스트림을 쉽게 이해할 수 있다.
스레드 풀을 사용하기 위해서는 RecursiveTask<R>의 서브 클래스를 구현하면 된다. 여기서 R은 병렬화를 통해 연산된 결과이다.
RecursiveTask를 구현하려면 compute() 추상메소드를 구현하면 된다. 이 메소드는 테스크를 서브 테스크로 분할하는 로직, 더이상 분할이 불가능할 때 서브 테스크의 결과를 생산할 알고리즘을 정의한다.
if(Task is small) { // 테스크가 작아 분할이 불가능
Execute the task // 순차적으로 테스크 계산
} else {
// 테스크를 두 서브 테스크로 분할
ForkJoinTask first = getFirstHalfTask();
first.fork();
ForkJoinTask second = getSecondHalfTask();
second.compute();
// 모든 서브 테스크의 연산이 완료될 때까지 기다리고, 결과를 합친다.
first.join();
}
각각의 서브 테스크의 크기가 작아질 때까지 재귀적으로 테스크를 분할한다. 더이상 분할이 불가능하면 서브 테스크를 병렬로 수행하고 나온 부분 결과를 조합해 최종 결과를 만든다.
import java.util.concurrent.RecursiveTask;
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
private final long[] numbers;
private final int start;
private final int end;
public static final long THRESHOLD = 1000;
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
// 서브 테스크를 재귀적으로 만들기 위해 사용할 비공개 생성자
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int len = end - start; // 서브 테스크의 배열의 길이
// 정해진 기준값 이하로 배열의 길이가 줄어들 경우 결과를 반환한다.
if (len <= THRESHOLD) {
return computeSequentially();
}
// 왼쪽 절반으로 분할
ForkJoinSumCalculator left = new ForkJoinSumCalculator(numbers, start, start + len / 2);
left.fork(); // ForkJoinFool의 다른 스레드로 새로 생성한 테스크를 비동기로 실행
// 나머지 절반을 분할
ForkJoinSumCalculator right = new ForkJoinSumCalculator(numbers, start + len / 2, end);
Long rightRet = right.compute(); // 두 번째 서브 테스크를 동기로 실행한다.
Long leftRet = left.join(); // 첫 번째 서브 테스크의 결과를 읽거나, 아직 결과가 나오지 않았다면 대기
return leftRet + rightRet; // 두 서브 테스크의 결과를 합해 반환환
}
private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
long ret = new ForkJoinPool().invoke(task); // 결과 반환
만약 서브 테스크를 1000개 이상으로 분리하면 각각의 테스크에 CPU을 할당할 수 없어 낭비같아 보일 수 있다. 하지만 실제로는 가능한 많이 분할하는 것이 좋다.
작업 훔치기(work stealing) 기법은 모든 스레드를 거의 공정하게 분할하게 된다. 각각의 스레드는 자신에게 할당된 테스크를 포함하는 이중 연결리스트를 참조하고, 작업이 끝날 때마다 큐의 헤드에서 다른 테스크를 가져와 작업을 처리한다.
한 스레드는 다른 스레드보다 빠르게 작업을 처리하게 되면 다른 스레드 큐의 꼬리(tail)에서 작업을 훔쳐온다. 모든 큐가 빌 때까지 과정을 반복하게 되는데 이 때문에 테스크의 크기를 작게 나누어야 스레드 간의 작업 부하를 비슷하게 유지할 수 있게 된다.
'분할할 수 있는 반복자'라는 뜻으로 Iterator처럼 요소 탐색 기능을 제공한다는 점은 동일하지만 병렬 작업에 특화되어 있다. 탐색하는 데이터를 어떻게 병렬화할지 정의한다.
컬렉션 프레임워크는 디폴트 Spliterator 구현을 제공하고 컬렉션은 Spliterator 인터페이스를 구현한다.
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action); // 탐색할 요소가 남았는지 여부
Spliterator<T> trySplit(); // 일부 요소를 분할해 두 번째 Spliterator를 생성
long estimateSize(); // 탐색해야 할 요소 수 정보
int characteristics(); // 특성 집합 정보
}
[Function] function 자세한설명 (0) | 2023.04.06 |
---|---|
[Optional] Optional클래스 (0) | 2023.04.05 |
[Function] 사용방식 (0) | 2023.04.05 |
[이넘] 윤달구하기 일수의 총합 (0) | 2023.04.05 |
[쓰레드 넘버프린트] wait notify 교대로 가다가 안되는거 해결 (0) | 2023.04.05 |
댓글 영역