본글은 어라운드허브스튜디오 강의를 참고하여 작성하였다.
https://youtu.be/gpgOt9XM5EQ?si=ByuY6Xt-ZACy3uqv
ExecutorService와 @Async를 활용한 블로킹+비동기 작업이 아닌,
Spring Webflux기반의 논블로킹+비동기 작업에 대해 알아보려한다.
Reactive Stream
- Publisher : 데이터 생성 및 제공
- Subsciber : 데이터 수신 및 처리
- Subscription : 전달 받을 데이터의 흐름 조절
- Processor : Publisher 및 Subscriber의 기능 모두 포함
Reactive 선언문
- MEANS(방법) : Message Driven - 비동기 메시지 기반으로 Back Pressure가 필요
- FORM(형태) : Elastic(탄력성)&Resilent(회복성) - 확장성 있는 설계 & 장애 전파 격리
- VALUE(가치) : Responsive - 신속 일관성 있는 응답 제공
Publisher와 Subscriber 간 흐름

Reactor란
: Reactor는 Reactive Streams를 구현한 비동기 스트림 라이브러리
- BackPrssure 등의 리액티브 스트림즈의 명세 기능 지원
- Spring WebFlux에서 공식적으로 지원
Publisher
1. Mono : 1개의 요소를 방출하는데 특화된 Publisher 구현체
2. Flux : n개의 요소를 방출하는 Publisher 구현체

Buffer
: backpressure 관리 및 데이터 집합 처리에 주로 사용됨.
1. 크기 기반 Buffer : 데이터를 n개씩 모아서 전달
//1.크기기반
fastPublisher
.buffer(3)
.subscribe(bufferedData ->{
System.out.println("Buffered data: " + bufferedData); //1 2 3 , 4 5 6 , 7 8 9
});
2. 시간 기반 Buffer : 데이터를 n 시간동안 모아 전달
//2.시간기반
fastPublisher
.buffer(Duration.ofSeconds(1))
.subscribe(bufferedData ->{
System.out.println("Buffered data: " + bufferedData); //1 2 3 4 ,5 6 , 7 8 9 ...
});
3. 크기 및 시간 기반 Buffer : 위의 두개 짬뽕
//3.짬뽕
fastPublisher
.bufferTimeout(3, Duration.ofSeconds(1))
.subscribe(bufferedData ->{
System.out.println("Buffered data: " + bufferedData); //1 2 3 ,4 5 , 6 7 8 ...
});
기타.
- bufferUtil / bufferWhile
: Predicate가 true를 반환할때까지 버퍼를 생성
Take
: 데이터를 방출하는데에 초점(특정 조건이 되면 그만 보내게 통제하는 방식)
1. 크기 기반 take
//1. 크기기반 take
Flux<Integer> flux = Flux.range(1,10);
//처음 3개의 데이터만 방출
flux.take(3)
.subscribe(data -> Sytstem.out.println("Received: " + data"));
2. 시간 기반 take
//2. 시간기반 take
//500ms마다 데이터를 방출하는 flux
Flux<Integer> flux = Flux.interval(Duration.ofMillis(500));
//2초동안 데이터 방출
flux.take(Duration.ofSeconds(2))
.subscribe(data -> Sytstem.out.println("Received: " + data"));
3. 조건 기반 take : takeWhile
4. 취소 기반 take : takeUntil
5. 취소 신호 기반 : takeUntilOther (다른 publisher에서 신호가 발생할때까지 데이터를 방출하고, 신호가 발생하면 스트림 종료)
Subscriber
subscribe()
- initialContext를 통해 전파하는 유형의 subscribe()오버로딩 메소드
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,//일반적인 데이터 처리
@Nullable Consumer<? super T> errorConsumer,//error 처리
@Nullable Runnable complete Consumer,//complete 시그널 처리
@Nullable Context initialContext//Context를 생성하고 전파하여 특정 체인에서 이를 읽거나 수정 및 분기처리를 통한 활용 가능
)
- Basesubscriber를 통한 BackPressure 설정
1. 소비자가 데이터를 하나씩 소비
2. Publisher는 빠르게 데이터를 방출하고싶어도
3. 소비자가 요청하는 속도에 맞춰 데이터를 방출
Flux<Integer> fastPublisher = Flux.range(1,100)
.doOnRequest(n -> System.out.println("Requesting " + n + "items"))//요청할 때 로그 출력
.doOnNext(i->System.out.println("Publishing " + i);
//BackPressure 설정
BaseSubScriber<Integer> slowSubscriber = new BaseSubscriber<Integer>(){
@Override
protected void hookOnSubscribe(Subscription subscription){
//처음에는 1개의 데이터만 요청
request(1);
}
}
@Override
protected void hookOnNext(Integer value){
System.out.println("Received: " + value);
//데이터를 천천히 처리하는 시뮬레이션(1초 대기)
try{
Thread.sleep(1000);//데이터 처리하는게 걸리는시간 예시
}catch(InterruptedException e){
e.printStackTrace();
}
//하나의 데이터를 처리한 후 -> 다음 데이터 하나를 요청
request(1);
}
Reactor의 스케줄러
- Schedulers.immediate()
: 현재 실행중인 스레드에서 즉시 작업 실행
- Schedulers.single()
: 단일 스레드를 사용하여 작업을 처리하는 스케줄러(순차적으로 실행하므로 동기화 필요 작업에 쓰임)
- Schedulers.boundedElastic()
: 일정 수의 스레드까지만 생성하고 이를 넘어서면 재사용(기본적으로 CPU 코어 수의 10배수만큼 스레드 사용 / 최신 버전에서는 Virtual Thread도 사용 가능)
* I/O bound 작업에 사용 -> 네트워크/파일시스템 등의 입출력 작업에 주로 시간을 소모하는 작업(CPU는 대기상태에 머무름)
- Schedulers.parallel()
: 고정된 수의 스레드를 사용하여 병렬로 작업을 실행하는 스케줄러(CPU 많이쓰는 연산 작업에 적합)
* CPU bound 작업에 사용 -> 입출력 대기시간이 짧고, CPU 사용량이 큼(암복호화, 동영상 등)
- Schedulers.fromExecutorService(ExecutorService executor)
: ExecutorService를 기반으로 커스텀 스케줄러를 생성하는 방식
- Schedulers.newSingle(String name)
: 지정된 이름을 가진 단일 스레드 스케줄러를 생성(특정 작업을 독립적으로 처리하거나 디버깅 용도)
- Schedulers.newBoundedElastic(int threadCap, intqueuedTakskCap, String name)
: 스레드 최대수와 큐에 쌓일 수 있는 최대작업수를 설정하여 제한하는 스케줄러
- subscribeOn()
: 구독이 시작되는 시점의 스레드를 제어 : Upstream 작업을 어떤 스레드로 시작할지 결정(위치에 상관없이 이 스레드에서 실행) - publishOn()
: 데이터가 흐르는 도중 스레드를 전환할때 사용 : Downstream 작업을 어느 스레드에서 실행할지 결정(이 연산자를 만나는 시점 이후의 작업에만 영향을 미침)
//subscribeOn() 예시
Flux.range(1,5)
.map(i->{
return i;
})
.subscribeOn(Schedulers.boundedElastic())//상류작업부터 Main 쓰레드가 아닌 이 쓰레드에서 실행됨
.map(i->{
return i;
})
.subscribe(i->System.out.pintln("Received : " + i);
//publishOn() 예시
Flux.range(1,5)
.map(i->{
return i;
})
.publishOn(Schedulers.boundedElastic())//Main쓰레드가 아닌 이 쓰레드에서 실행됨
.map(i->{
return i;
})
.subscribe(i->System.out.pintln("Received : " + i);
'웹(Web)' 카테고리의 다른 글
| 2025 개발자 로드맵, 학습방법, AI로 나만의 로드맵 만들기 (1) | 2024.12.29 |
|---|