개발자는 기록이 답이다

패스트캠퍼스 환급챌린지 9일차 미션 (2월 9일) : Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지 강의 후기 본문

패스트캠퍼스

패스트캠퍼스 환급챌린지 9일차 미션 (2월 9일) : Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지 강의 후기

slow-walker 2024. 2. 9. 23:08

1. Reactive stream 구조

  • 데이터 혹은 이벤트를 제공하는 Publisher
  • 데이터 혹은 이벤트를 제공받는 Subscriber
  • 데이터 흐름을 조절하는 Subscription

2. Publisher

  • subscribe함수제공해서publisher에다수의 subscriber 등록 지원
  • subscription을포함하고Subscriber가추가 되면 subscription 제공
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber); }

 

3. Subscriber

  • subscribe하는시점에publisher로부터 subscription을 수 있는 인자 제공
  • onNext, onError, onComplete를 통해서 이나이트를을수있다
  • onNext는 여러 , onErroronComplete 만호출된다
public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
        public void onComplete();
}

 

4. Subscription

  • back-pressure를조절할수있는request함 수
  • Publisher가onNext를통해서값을전달하는 것을 취소할 수 있는 cancel 함수

 

public static interface Subscription { public void request(long n);
    public void cancel();
}

 

5. Publisher, Subscriber 연동하기

 

1) FixedIntPublisher

  • Flow.Publisher를구현
  • 고정된자의integer를전달하는publisher
  • 8개의integer를전달후complete
  • iterator를생성해서subscription을생성하 고 subscriber에게 전달
  • requestCount를세기위해서Result객체 사용
public class FixedIntPublisher
        implements Flow.Publisher<FixedIntPublisher.Result> {
        @Data
            public static class Result {
                private final Integer value;
                private final Integer requestCount;
            }
        @Override
        public void subscribe(Flow.Subscriber<? super Result> subscriber) { var numbers = Collections.synchronizedList(
        new ArrayList!<>(List.of(1, 2, 3, 4, 5, 6, 7))
        );
        Iterator<Integer> iterator = numbers.iterator();
        var subscription = new IntSubscription(subscriber, iterator); subscriber.onSubscribe(subscription);
}

2) IntSubscription

  • Flow.Subscription을구현
  • subscriberonNextsubscription
  • request가 동기적으로 동작하면 안되기 문 에 executor를 이용해서 별도의 쓰레드에서 실행
  • 요청수를count에저장하고결과에함전 달
  • 더이상iterator이없으면, onComplete 호출
private final Flow.Subscriber<? super Result> subscriber;
private final Iterator<Integer> numbers;
private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final AtomicInteger count = new AtomicInteger(1);
private final AtomicBoolean isCompleted = new AtomicBoolean(false);
@Override
public void request(long n) { executor.submit(() !-> {
for (int i = 0; i < n; i!++) { if (numbers.hasNext()) {
int number = numbers.next();
numbers.remove();
subscriber.onNext(new Result(number, count.get()));
} else {
var isChanged = isCompleted.compareAndSet(false, true); if (isChanged) {
                    executor.shutdown();
                    subscriber.onComplete();
                    isCompleted.set(true);
}
break; }
}
        count.incrementAndGet();
    });
}
@Override
public void cancel() {
    subscriber.onComplete();
}

 

Hot Publisher

  • subscriber가 없더라도 이터를 생성하고 streampush하는 publisher
  • 트위터게시읽기,공유리소스변화
  • 여러 subscriber에게 동일한 이터 전달

Cold Publisher

  • subscribe가 시작되는 순간부터 데이 터를 생성하고 전송
  • 파일읽기, 웹API요청등
  • subscriber에 따라 독립적인 데이터 스 트림 제공

본 포스팅은 패스트캠퍼스 환급 챌린지 참여를 위해 작성하였습니다. https://bit.ly/48sS29N