Notice
Recent Posts
Recent Comments
Link
개발자는 기록이 답이다
패스트캠퍼스 환급챌린지 23일차 미션 (2월 23일) : Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지 강의 후기 본문
패스트캠퍼스
패스트캠퍼스 환급챌린지 23일차 미션 (2월 23일) : Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지 강의 후기
slow-walker 2024. 2. 23. 23:27content
Q. Publisher와 연산자 사이에 어떻게 값들을 전달해야 할까?
Java에서 Publisher와 연산자 사이에 값을 전달하려면 Reactive Streams 프로토콜을 따라야 한다.
Reactive Streams는 비동기 및 이벤트 기반 애플리케이션을 위한 표준 인터페이스를 제공하는 스펙이다.
일반적으로 Publisher에서 생성한 데이터는 연산자를 통해 가공되고, 이후 Subscriber에게 전달된다.
ThreadLocal
- 하나의 쓰레드에 값을 저장하고 해당 쓰레드 내에서 어디서든지 접근 가능
- 만약 subscribeOn, publishOn으로 실행 쓰레드가 달라진다면?
ThreadLocal<String> threadLocal = new ThreadLocal!<>(); threadLocal.set("wooman");
Flux.create(sink !-> {
log.info("threadLocal: " + threadLocal.get()); sink.next(1);
})
.publishOn(Schedulers.parallel() ).map(value !-> {
log.info("threadLocal: " + threadLocal.get());
return value;
})
.publishOn(Schedulers.boundedElastic() ).map(value !-> {
log.info("threadLocal: " + threadLocal.get());
return value;
})
.subscribeOn(Schedulers.single() ).subscribe();
Thread.sleep(1000);
Context
- Context는 파이프라인내부 어디에서든접근 가능한 key value 저장소
- 특정key의 value에 접근하고key의value를 수정할 수 있는 수단을제공
- Map과유사
- 읽기전용(read only)인 ContextView와 쓰기(추가, 삭제, 갱신)를할수있는 Context로 구분
public interface ContextView {
<T> T get(Object key);
boolean hasKey(Object key);
boolean isEmpty();
int size();
}
public interface Context extends ContextView {
Context put(Object key, Object value);
Context delete(Object key);
Context putAll(Context context);
}
contextWrite
- Context를인자로받고Context를반환하는 함수형 인터페이스를 제공
- 이를 통해서 기존의 Context에 값을 추가하거나 변경, 삭제가능
- Context는immutable하기때문에각각의작 업은 새로운 Context를 생성
public final Mono<T> contextWrite(Function<Context, Context> contextModifier) {}
Context write
- subscribe부터시작하여점차위로올라가며 contextWrite를 만나면 실행하고 새로운 context를 생성해서 위에 있는 연산자에 전달
- context write는 subscribe부터 위로 전파
Flux.just(1)
.flatMap(v !-> ContextLogger.logContext(v, "1")) .contextWrite(context !->
context.put("name", "wooman")) .flatMap(v !-> ContextLogger.logContext(v, "2")) .contextWrite(context !->
context.put("name", "taewoo")) .flatMap(v !-> ContextLogger.logContext(v, "3")) .subscribe();
Context 초기화
- subscribe에 4번째인자로 초기값 전달 가능
- 이 경우에도subscribe부터 위로 전파
var initialContext = Context.of("name", "taewoo");
Flux.just(1)
.flatMap(v !-> ContextLogger.logContext(v, "1"))
.contextWrite(context !->
context.put("name", "wooman"))
.flatMap(v !-> ContextLogger.logContext(v, "2"))
.subscribe(null, null, null, initialContext);
Context read
- source에sink가있다면sink.contextView로 접근 가능
var initialContext = Context.of("name", “wooman");
Flux.create(sink !-> {
var name = sink.contextView().get("name");
log.info("name in create: " + name); sink.next(1);
})
.contextWrite(context !-> context.put("name", "taewoo")
).subscribe(null, null, null, initialContext);
defer
- publisher를생성하는Consumer를인자로 받아서 publisher를 생성하고
- 생성된publisher의이벤트를아래로전달
public static <T> Mono<T> defer(
Supplier<? extends Mono<? extends T!>> supplier)
Mono.defer(() !-> { return Mono.just(1);
}).subscribe(n !-> {
log.info("next: {}", n); });
defer와 flatMap
- Mono.defer는주어진supplier를실행해서 Mono를 구하고 해당 Mono로 이벤트를 전달
- v라는 값이 있을때 Mono.defer(() -> Mono.just(v))를 한다면?
- 이는결국Mono.just(v)와동일
- flatMap(v -> Mono.defer(() -> Mono.just(v)))
- = flatMap(v -> Mono.just(v)))
- = map(v -> v)
Mono.just(1)
.flatMap(v !-> Mono.defer(() !-> {
return Mono.just(v); })).subscribe(n !-> {
log.info("next: {}", n); });
deferContextual
- defer와비슷하지만아무런값을전달하지않 는 Consumer가 아닌 ContextView를 인자 로 받는 Function을 받는다
- Context를인자로받고생성된publisher의 이벤트를 아래로 전달
public static <T> Mono<T> deferContextual(
Function<ContextView, ? extends Mono<? extends T!>> contextualMonoFactory) {
Context read
- Mono.deferContextual가contextView를 인자로 전달하고 Mono를 반환값으로 받아서 Mono를 생성
- 이렇게생성된Mono를flatMap으로처리
Flux.just(1) .flatMap(value !-> {
return Mono.deferContextual(contextView !-> { String name = contextView.get("name"); log.info("name: " + name);
return Mono.just(value);
}); }).contextWrite(context !->
context.put("name", “taewoo") ).subscribe();
※ 본 포스팅은 패스트캠퍼스 환급 챌린지 참여를 위해 작성하였습니다. https://bit.ly/48sS29N