Notice
Recent Posts
Recent Comments
Link
개발자는 기록이 답이다
패스트캠퍼스 환급챌린지 9일차 미션 (2월 9일) : Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지 강의 후기 본문
패스트캠퍼스
패스트캠퍼스 환급챌린지 9일차 미션 (2월 9일) : Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지 강의 후기
slow-walker 2024. 2. 9. 23:081. Reactive stream 구조
![](https://blog.kakaocdn.net/dn/uwzi4/btsEHHGOLem/Yi6ikRlQiKgeg4I2jrukNk/img.png)
- 데이터 혹은 이벤트를 제공하는 Publisher
- 데이터 혹은 이벤트를 제공받는 Subscriber
- 데이터 흐름을 조절하는 Subscription
2. Publisher
- subscribe함수제공해서publisher에다수의 subscriber 등록 지원
- subscription을포함하고Subscriber가추가 되면 subscription 제공
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber); }
3. Subscriber
- subscribe하는시점에publisher로부터 subscription을 받을 수 있는 인자 제공
- onNext, onError, onComplete를 통해서 값 이나이벤트를받을수있다
- onNext는 여러 번, onError나 onComplete 는딱한번만호출된다
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
4. Subscription
- back-pressure를조절할수있는request함 수
- Publisher가onNext를통해서값을전달하는 것을 취소할 수 있는 cancel 함수
public static interface Subscription { public void request(long n);
public void cancel();
}
5. Publisher, Subscriber 연동하기
1) FixedIntPublisher
- Flow.Publisher를구현
- 고정된숫자의integer를전달하는publisher
- 8개의integer를전달후complete
- iterator를생성해서subscription을생성하 고 subscriber에게 전달
- requestCount를세기위해서Result객체 사용
public class FixedIntPublisher
implements Flow.Publisher<FixedIntPublisher.Result> {
@Data
public static class Result {
private final Integer value;
private final Integer requestCount;
}
@Override
public void subscribe(Flow.Subscriber<? super Result> subscriber) { var numbers = Collections.synchronizedList(
new ArrayList!<>(List.of(1, 2, 3, 4, 5, 6, 7))
);
Iterator<Integer> iterator = numbers.iterator();
var subscription = new IntSubscription(subscriber, iterator); subscriber.onSubscribe(subscription);
}
2) IntSubscription
- Flow.Subscription을구현
- subscriber의onNext와subscription의
- request가 동기적으로 동작하면 안되기 때문 에 executor를 이용해서 별도의 쓰레드에서 실행
- 요청횟수를count에저장하고결과에함께전 달
- 더이상iterator에값이없으면, onComplete 호출
private final Flow.Subscriber<? super Result> subscriber;
private final Iterator<Integer> numbers;
private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final AtomicInteger count = new AtomicInteger(1);
private final AtomicBoolean isCompleted = new AtomicBoolean(false);
@Override
public void request(long n) { executor.submit(() !-> {
for (int i = 0; i < n; i!++) { if (numbers.hasNext()) {
int number = numbers.next();
numbers.remove();
subscriber.onNext(new Result(number, count.get()));
} else {
var isChanged = isCompleted.compareAndSet(false, true); if (isChanged) {
executor.shutdown();
subscriber.onComplete();
isCompleted.set(true);
}
break; }
}
count.incrementAndGet();
});
}
@Override
public void cancel() {
subscriber.onComplete();
}
Hot Publisher
- subscriber가 없더라도 데이터를 생성하고 stream에 push하는 publisher
- 트위터게시글읽기,공유리소스변화 등
- 여러 subscriber에게 동일한 데이터 전달
Cold Publisher
- subscribe가 시작되는 순간부터 데이 터를 생성하고 전송
- 파일읽기, 웹API요청등
- subscriber에 따라 독립적인 데이터 스 트림 제공
본 포스팅은 패스트캠퍼스 환급 챌린지 참여를 위해 작성하였습니다. https://bit.ly/48sS29N