개발자는 기록이 답이다

패스트캠퍼스 환급챌린지 23일차 미션 (2월 23일) :  Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지 강의 후기 본문

패스트캠퍼스

패스트캠퍼스 환급챌린지 23일차 미션 (2월 23일) :  Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지 강의 후기

slow-walker 2024. 2. 23. 23:27

content

Q. Publisher와 연산자 사이에 어떻게 값들을 전달해야 할까?

Java에서 Publisher와 연산자 사이에 값을 전달하려면 Reactive Streams 프로토콜을 따라야 한다.

Reactive Streams는 비동기 및 이벤트 기반 애플리케이션을 위한 표준 인터페이스를 제공하는 스펙이다.

일반적으로 Publisher에서 생성한 데이터는 연산자를 통해 가공되고, 이후 Subscriber에게 전달된다.

ThreadLocal

  • 하나의 쓰레드에 값을 저장하고 해당 쓰레드 내에서 어디서든지 접근 가능
  • 만약 subscribeOn, publishOn으로 실행 쓰레드가 달라진다면?
ThreadLocal<String> threadLocal = new ThreadLocal!<>(); threadLocal.set("wooman");
Flux.create(sink !-> {
	log.info("threadLocal: " + threadLocal.get()); sink.next(1);
	})
    .publishOn(Schedulers.parallel() ).map(value !-> {
    log.info("threadLocal: " + threadLocal.get());
	return value; 
    })
    .publishOn(Schedulers.boundedElastic() ).map(value !-> {
	log.info("threadLocal: " + threadLocal.get());
	return value; 
    })
    .subscribeOn(Schedulers.single() ).subscribe();
Thread.sleep(1000);

Context

  • Context는 파이프라인내부 어에서든접 가능한 key value 저장소
  • 특정keyvalue에 접하고keyvalue를 수정할 수 있는 수단을제공
  • Map과유사
  • 기전용(read only)인 ContextView와 쓰기(추가, 삭제, 갱신)를할수있 Context로 구분
public interface ContextView {
	<T> T get(Object key);
    boolean hasKey(Object key);
    boolean isEmpty();
	int size(); 
}
public interface Context extends ContextView {
	Context put(Object key, Object value);
    Context delete(Object key);
	Context putAll(Context context);
}

 

contextWrite

  • Context를인자로Context를반환하는 함수형 인터이스를 제공
  • 이를 통해서 기Context에 값을 추가하거나 변경, 삭제가능
  • Contextimmutable하기때문에각각의작 업은  Context를 생성
public final Mono<T> contextWrite(Function<Context, Context> contextModifier) {}

 

Context write

  • subscribe부터시작하여점차위로올라가며 contextWrite를 만나면 실행하고 새로운 context를 생성해서 위에 있는 연산자에 전달
  • context writesubscribe부터 위로 전파
Flux.just(1)
.flatMap(v !-> ContextLogger.logContext(v, "1")) .contextWrite(context !->
context.put("name", "wooman")) .flatMap(v !-> ContextLogger.logContext(v, "2")) .contextWrite(context !->
context.put("name", "taewoo")) .flatMap(v !-> ContextLogger.logContext(v, "3")) .subscribe();

 

Context 초기화

  • subscribe에 4번째인자로 초기값 전달 가능
  • 이 경우에도subscribe부터 위로 전파
var initialContext = Context.of("name", "taewoo");
Flux.just(1)
	.flatMap(v !-> ContextLogger.logContext(v, "1")) 
    .contextWrite(context !->
		context.put("name", "wooman")) 
    .flatMap(v !-> ContextLogger.logContext(v, "2")) 
    .subscribe(null, null, null, initialContext);

 

 

Context read

  • source에sink가있다면sink.contextView로 접근 가능
var initialContext = Context.of("name", “wooman");
Flux.create(sink !-> {
var name = sink.contextView().get("name"); 
	log.info("name in create: " + name); sink.next(1);
	})
    .contextWrite(context !-> context.put("name", "taewoo")
).subscribe(null, null, null, initialContext);

defer

  •  publisher를생성하는Consumer를인자로 받아서 publisher를 생성하고
  • 생성된publisher의이벤트를아로전달
public static <T> Mono<T> defer(
Supplier<? extends Mono<? extends T!>> supplier)

 

Mono.defer(() !-> { return Mono.just(1);
}).subscribe(n !-> {
log.info("next: {}", n); });

 

deferflatMap

  • Mono.defer는주어진supplier를실행해서 Mono를 구하고 해당 Mono로 이벤트를 전달
 
  • v라는 값이 있을때 Mono.defer(() -> Mono.just(v))를 한다면?
  • 이는결Mono.just(v)와동일
  • flatMap(v -> Mono.defer(() -> Mono.just(v)))
  •  = flatMap(v -> Mono.just(v)))
  • = map(v -> v)
Mono.just(1)
.flatMap(v !-> Mono.defer(() !-> {
return Mono.just(v); })).subscribe(n !-> {
log.info("next: {}", n); });

 

deferContextual

  • defer와비하지만아무런값을전달하지않 는 Consumer가 아ContextView를 인자 로 Function는다
  • Context를인자로고생성된publisher의 이벤트를 아로 전달
public static <T> Mono<T> deferContextual(
Function<ContextView, ? extends Mono<? extends T!>> contextualMonoFactory) {

 

Context read

  • Mono.deferContextual가contextView를 인자로 전달하고 Mono를 반환값으로 받아서 Mono를 생성
  • 게생성된MonoflatMap으로처리
Flux.just(1) .flatMap(value !-> {
return Mono.deferContextual(contextView !-> { String name = contextView.get("name"); log.info("name: " + name);
return Mono.just(value);
}); }).contextWrite(context !->
context.put("name", “taewoo") ).subscribe();

※ 본 포스팅은 패스트캠퍼스 환급 챌린지 참여를 위해 작성하였습니다. https://bit.ly/48sS29N