웹(Web)

Spring Webflux란

SK_MOUSE 2025. 12. 2. 00:03
반응형

본글은 어라운드허브스튜디오 강의를 참고하여 작성하였다.

https://youtu.be/gpgOt9XM5EQ?si=ByuY6Xt-ZACy3uqv

 

ExecutorService와 @Async를 활용한 블로킹+비동기 작업이 아닌,

Spring Webflux기반의 논블로킹+비동기 작업에 대해 알아보려한다.

Reactive Stream

- Publisher : 데이터 생성 및 제공

- Subsciber : 데이터 수신 및  처리

- Subscription : 전달 받을 데이터의 흐름 조절

- Processor : Publisher 및 Subscriber의 기능 모두 포함

 

Reactive 선언문

- MEANS(방법) : Message Driven - 비동기 메시지 기반으로 Back Pressure가 필요

- FORM(형태) : Elastic(탄력성)&Resilent(회복성) - 확장성 있는 설계 & 장애 전파 격리

- VALUE(가치) : Responsive - 신속 일관성 있는 응답 제공

 

Publisher와 Subscriber 간 흐름

Subscription 전달 및 요청

 

Reactor란

: Reactor는 Reactive Streams를 구현한 비동기 스트림 라이브러리

- BackPrssure 등의 리액티브 스트림즈의 명세 기능 지원

- Spring WebFlux에서 공식적으로 지원

 

Publisher

1. Mono : 1개의 요소를 방출하는데 특화된 Publisher 구현체

2. Flux :   n개의 요소를 방출하는 Publisher 구현체

Mono/Flux는 Publisher의 구현체다

 

Buffer

: backpressure 관리 및 데이터 집합 처리에 주로 사용됨.

 

1. 크기 기반 Buffer : 데이터를 n개씩 모아서 전달

//1.크기기반
fastPublisher
	.buffer(3)
    .subscribe(bufferedData ->{
    	System.out.println("Buffered data: " + bufferedData); //1 2 3 , 4 5 6 , 7 8 9
    });

 

2. 시간 기반 Buffer : 데이터를 n 시간동안 모아 전달

//2.시간기반
fastPublisher
	.buffer(Duration.ofSeconds(1))
    .subscribe(bufferedData ->{
    	System.out.println("Buffered data: " + bufferedData); //1 2 3 4 ,5 6 , 7 8 9 ...
    });

 

3. 크기 및 시간 기반 Buffer : 위의 두개 짬뽕

//3.짬뽕
fastPublisher
	.bufferTimeout(3, Duration.ofSeconds(1))
    .subscribe(bufferedData ->{
    	System.out.println("Buffered data: " + bufferedData); //1 2 3  ,4 5  , 6 7 8  ...
    });

 

기타.

- bufferUtil / bufferWhile

 : Predicate가 true를 반환할때까지 버퍼를 생성

 

Take

: 데이터를 방출하는데에 초점(특정 조건이 되면 그만 보내게 통제하는 방식)

 

1. 크기 기반 take

//1. 크기기반 take
Flux<Integer> flux = Flux.range(1,10);

//처음 3개의 데이터만 방출
flux.take(3)
	.subscribe(data -> Sytstem.out.println("Received: " + data"));

 

 

2. 시간 기반 take

//2. 시간기반 take
//500ms마다 데이터를 방출하는 flux
Flux<Integer> flux = Flux.interval(Duration.ofMillis(500));

//2초동안 데이터 방출
flux.take(Duration.ofSeconds(2))
	.subscribe(data -> Sytstem.out.println("Received: " + data"));

 

3. 조건 기반 take : takeWhile

4. 취소 기반 take : takeUntil

5. 취소 신호 기반 : takeUntilOther (다른 publisher에서 신호가 발생할때까지 데이터를 방출하고, 신호가 발생하면 스트림 종료)

 

 

Subscriber

subscribe()

- initialContext를 통해 전파하는 유형의 subscribe()오버로딩 메소드

public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,//일반적인 데이터 처리
@Nullable Consumer<? super T> errorConsumer,//error 처리
@Nullable Runnable complete Consumer,//complete 시그널 처리
@Nullable Context initialContext//Context를 생성하고 전파하여 특정 체인에서 이를 읽거나 수정 및 분기처리를 통한 활용 가능
)

 

- Basesubscriber를 통한 BackPressure 설정

 

1. 소비자가 데이터를 하나씩 소비

2. Publisher는 빠르게 데이터를 방출하고싶어도

3. 소비자가 요청하는 속도에 맞춰 데이터를 방출

Flux<Integer> fastPublisher = Flux.range(1,100)
	.doOnRequest(n -> System.out.println("Requesting " + n + "items"))//요청할 때 로그 출력
    .doOnNext(i->System.out.println("Publishing " + i);
    
//BackPressure 설정
BaseSubScriber<Integer> slowSubscriber = new BaseSubscriber<Integer>(){
	@Override
    protected void hookOnSubscribe(Subscription subscription){
     //처음에는 1개의 데이터만 요청
     request(1);
    }
}

@Override
protected void hookOnNext(Integer value){
	System.out.println("Received: " + value);
    //데이터를 천천히 처리하는 시뮬레이션(1초 대기)
    try{
    	Thread.sleep(1000);//데이터 처리하는게 걸리는시간 예시
    }catch(InterruptedException e){
    	e.printStackTrace();
    }
    
    //하나의 데이터를 처리한 후 -> 다음 데이터 하나를 요청
    request(1);
}

 

 

 


Reactor의 스케줄러

- Schedulers.immediate()

: 현재 실행중인 스레드에서 즉시 작업 실행

 

- Schedulers.single()

: 단일 스레드를 사용하여 작업을 처리하는 스케줄러(순차적으로 실행하므로 동기화 필요 작업에 쓰임)

 

- Schedulers.boundedElastic()

: 일정 수의 스레드까지만 생성하고 이를 넘어서면 재사용(기본적으로 CPU 코어 수의 10배수만큼 스레드 사용 / 최신 버전에서는 Virtual Thread도 사용 가능)

* I/O bound 작업에 사용 -> 네트워크/파일시스템 등의 입출력 작업에 주로 시간을 소모하는 작업(CPU는 대기상태에 머무름)

 

- Schedulers.parallel()

: 고정된 수의 스레드를 사용하여 병렬로 작업을 실행하는 스케줄러(CPU 많이쓰는 연산 작업에 적합)

* CPU bound 작업에 사용 -> 입출력 대기시간이 짧고, CPU 사용량이 큼(암복호화, 동영상 등)

 

- Schedulers.fromExecutorService(ExecutorService executor)

: ExecutorService를 기반으로 커스텀 스케줄러를 생성하는 방식

 

- Schedulers.newSingle(String name)

: 지정된 이름을 가진 단일 스레드 스케줄러를 생성(특정 작업을 독립적으로 처리하거나 디버깅 용도)

 

- Schedulers.newBoundedElastic(int threadCap, intqueuedTakskCap, String name)

: 스레드 최대수와 큐에 쌓일 수 있는 최대작업수를 설정하여 제한하는 스케줄러

 

  • subscribeOn()
    : 구독이 시작되는 시점의 스레드를 제어 : Upstream 작업을 어떤 스레드로 시작할지 결정(위치에 상관없이 이 스레드에서 실행)

  • publishOn()
    : 데이터가 흐르는 도중 스레드를 전환할때 사용 : Downstream 작업을 어느 스레드에서 실행할지 결정(이 연산자를 만나는 시점 이후의 작업에만 영향을 미침)
//subscribeOn() 예시
Flux.range(1,5)
	.map(i->{
    	return i;
    })
    .subscribeOn(Schedulers.boundedElastic())//상류작업부터 Main 쓰레드가 아닌 이 쓰레드에서  실행됨
    .map(i->{
    	return i;
    })
    .subscribe(i->System.out.pintln("Received : " + i);
    
//publishOn() 예시
Flux.range(1,5)
	.map(i->{
    	return i;
    })
    .publishOn(Schedulers.boundedElastic())//Main쓰레드가 아닌 이 쓰레드에서 실행됨
    .map(i->{
    	return i;
    })
    .subscribe(i->System.out.pintln("Received : " + i);

 

반응형