6. application.properties 설정
# 접속할 DB IP 주소
spring.data.mongodb.host=localhost
# 접속할 DB port 번호
spring.data.mongodb.prot=27017
# 접속할 계정이 위치한 DB 이름
spring.data.mongodb.authentication-database=kahyun_webflux
spring.data.mongodb.uri=mongodb://localhost:27017/kahyun_webflux
7. Vo
@Data
@Document(collection = "board")
public class BoardVO {
@Id
private String id;
private String seq;
private String title;
private String content;
private String authorId;
private LocalDateTime create_dt;
private LocalDateTime update_dt;
}
- NoSQLBooster for MongoDB
- mongoDB 다운로드
- Reactive
- Blocking I/O
- Non-Blocking I/O
- Spring MVC, Spring WebFlux
- Reactor
- Marble Diagram
- Publisher
- Sequence
- Backpressure
- Sinks
- Scheduler
- Context
- Responsive (응답성)
- Resilient (회복성, 시스템 장애 -> 응답성 유지)
- Elastic (탄력성, 작업량의 다양한 변화 -> 응답성 유지)
- Message Driven (메시지 기반 동작)
- 데이터 소스에 변경이 있을 때마다 데이터를 전파
- 선언형 프로그래밍 패러다임 -> 실행할 동작을 구체적으로 명시하지 않고 목표만 정함
- 함수형 프로그래밍 기법 사용
명령형 프로그래밍 예시
List<Integer> numbers = Arrays.asList(...);
int sum = 0;
for(...) {
if(...) {
sum += number;
}
}
선언형 프로그래밍 예시
List<Integer> numbers = Arrays.asList(...);
int sum = numbers.stream()
.filter( )
.mapToint(number -> number)
.sum();
리액티브 프로그래밍을 표준화한 명세
- Publisher : 데이터를 통제하는 주체
- Subscriber : 퍼블리셔에서 통제한 데이터를 구독하는 구독자
- Subscription : 구독 자체를 정의해놓은 인터페이스
- Processor : Publisher와 Subscriber 역할을 동시에 할 수 있는 인터페이스
Reactive Streams를 구현한 구현체
RxJava
Java 9 Flow API
Akka Streams
Reactor
그 외 RxJs, RxScala, Rx Android 등의 리액티브 확장
- 작업 쓰레드(지점 API)가 종료될 때까지 요청을 한 쓰레드(본사 API)는 차단된다.
- 이러한 단점 보완 -> 멀티 쓰레딩 기법을 통해 추가 쓰레드 할당 가능
- CPU 대비 많은 수의 쓰레드를 사용하는 애플리케이션은 비효율적
- 컨텍스트 스위칭(Context Switching)으로 인한 쓰레드 전환 비용 발생
- 메모리 사용에 있어서 오버헤드 발생
- 쓰레드 풀에서의 응답 지연 문제 발생
서블릿 기반의 애플리케이션이 쓰레드를 사용하고 사용이 끝난 쓰레드를 다시 쓰레드 풀에 반납하는 과정에서 응답이 지연된다.
- 쓰레드 풀에 아직 사용되지 않은 유휴 쓰레드가 없을 경우 응답 지연 시간이 길어진다.
- 쓰레드가 반납된다 하더라도 반납된 쓰레드가 사용 가능하도록 전환되는 과정에서 응답 지연이 발생한다.
- 실행 시점과 Idle Time의 종료 시점 불일치
* PCB에 저장하고 PCB에서 로드하는데 추가 시간이 들어가기 때문에 실행 시점과 유휴 시점이 딱 들어맞지 않는다.
* PCB에 저장하고 PCB에서 로드하는 시간동안 CPU는 대기하기 때문에 일을 할 수 없다.
* 컨텍스트 스위칭이 잦을수록 전체 대기 시간이 길어지므로 성능이 저하된다.
하나의 프로세스에 여러 개의 쓰레드(프로세스의 자식 개념)가 번갈아 가면서 실행
프로세스와 마찬가지로 쓰레드 같은 경우에도 번갈아 가면서 실행이 되기 때문에 컨텍스트의 스위치 비용이 발생한다.
- 작업 쓰레드의 종료와 상관없이 요청을 한 쓰레드는 차단되지 않는다.
- 적은 수의 쓰레드를 사용하기 때문에 쓰레드 전환 비용이 적게 발생한다.
- 따라서 CPU 대기 시간 및 사용량에 있어서도 효율적이다.
- CPU를 많이 사용하는 작업이 포함되어 있을 경우에는 성능 향상에 악영향을 준다.
- 사용자의 요청에서 응답까지의 과정에 Blocking I/O 요소가 포함되어 있을 경우 Non-Blocking의 이점을 발휘하기 힘들다.
spring MVC | Spring WebFlux |
---|---|
Blocking I/O 방식 | Non-Blocking I/O 방식 |
요청 당 하나의 Thread 사용 (OneRequest OneThread 모델) | 하나의 Thread로 대량의 요청 처리 가능 |
과도한 Thread 사용으로 인한 CPU 대기 시간이 늘어나고 메모리 사용에서 오버헤드가 발생한다. | 적은 수의 Thread를 사용하므로 CPU와 메모리를 효율적으로 사용한다. |
10년 이상 주도적으로 사용된 명령형 프로그래밍 기반 기술 | CPU를 많이 사용하는 복잡한 계산을 요청할 경우 다른 요청들을 처리할 수 없다. |
요청에서 응답까지 Fully Non-Blocking이어야 진정한 효과를 발휘한다. | |
선언형 프로그래밍 |
- Spring 5부터 지원하는 리액티브 웹 프레임워크
- 비동기(병렬적 처리) Non-Blocking I/O(입출력) 방식으로 적은 수의 Thread를 사용한다.
- Reactive Streams의 구현체 중 하나인 Reactor에 의존하여 비동기 로직을 구성하고 리액티브 스트림을 제공한다.
- Reactor 기반이지만 RxJava 등 다른 리액티브 확장 라이브러리를 쉽게 적용할 수 있다.
- Event Loop 방식을 통한 Spring WebFlux의 Non-Blocking Process
- Spring WebFlux를 사용하기 적합한 시스템
- Blocking I/O 방식으로 처리하는데 한계가 있는 대량의 요청 트래픽이 발생하는 시스템
- 마이크로 서비스 기반 시스템
많은 수의 I/O 발생
- 스트리밍 시스템 또는 실시간 시스템
- 네트워크 접속이 느린 클라이언트의 요청 처리
- 리액티브 프로그래밍을 위한 리액티브 라이브러리
- Reactive Streams 스펙을 구현한 구현체 중 하나이다.
- Spring 에코 시스템에서 Reactive Stack의 기반이 되며 Spring WebFlux 프레임워크에 포함되어 있다.
- Fully Non-Blocking
클라이언트의 요청을 시작으로 데이터 액세스 레이어를 거쳐서 다시 클라이언트의 Response를 보낼 때까지 Blocking I/O가 전혀 개입하지 않는다는 의미
- Functional API(함수형 API)
Functional API를 사용해서 Publisher와 Subsriber 간에 상호작용을 한다.
reactive 프로그래밍의 특징 : Functional API 사용 - Sequences 타입
Reactor에서 데이터를 생산해서 내보내는 퍼블리셔 타입 2가지
- Non-Blocking 애플리케이션(대량의 Request) 제작에 특화
마이크로 서비스에 적합한 라이브러리
- backpressure 지원
- Publisher : 발행자, 게시자, 생산자, 방출자 (Emitter)
- Subscriber : 구독자, 소비자
- Emit : Publisher가 데이터를 내보내는 것
- Sequence : Publisher가 emit하는 데이터의 연속적인 흐름을 정의해놓은 것으로 Operator(연산자) 체인 형태로 정의된다.
Flux, just, map 같은 연산자 체인으로 구성된 코드
- subscribe : Publisher를 통해서 정의된 Sequence를 Subscriber가 구독하는 것
- Dispose : Subscriber가 Sequence 구독을 해지하는 것
- 퍼블리셔가 데이터를 생성
- Operator(ex.map)를 사용해서 데이터를 가공
- 최종적으로 가공한 데이터를 Subscriber에 전달
: 리액티브 프로그래밍에서 데이터 처리 흐름을 그림으로 시각화해놓은 다이어그램
- 0개 또는 1개의 데이터를 emit하는 Publisher (Compare with RxJava Maybe)
- 데이터 emit 과정에서 에러가 발생하면 onError signal을 emit한다.
- 0 ~ N개의 데이터를 emit하는 Publisher
- 데이터 emit 과정에서 에러가 발생하면 onError signal을 emit한다.
Concat / concatWith
ConcatWith : 앞 쪽에 있는 Publisher와 ConcatWith의 parameter로 입력된 Publisher를 연결
@Slf4j
public class FluxExample3 {
public static void main(String[] args) {
Flux<Object> flux =
// just operator의 경우 null값을 포함할 수 없지만 justOrEmpty의 경우 null 값 포함 가능
Mono.justOrEmpty(null)
.concatWith(Mono.justOrEmpty("Jobs"));
// ConcatWith 를 이용해서 Mono 와 Mono 를 결합해 새로운 Flux 를 생성할 수 있다.
flux.subscribe(data->log.info("# result : {}", data));
}
}
concat : concat operator의 parameter로 입력된 Publisher 들을 연결
@Slf4j
public class FluxExample4 {
public static void main(String[] args) {
Flux.concat(
Mono.just("Venus"),
Flux.just("Earth"),
Flux.just("Moon", "Sun"),
Flux.just("Mars"))
// 하나의 리스트 안에 각각의 데이터들이 담겨 리스트로 전달
.collectList()
.subscribe(planetList -> log.info("# Solar System : {}", planetList)
);
}
}
- subscriber의 구독 시점이 달라도 구독을 할 때마다 Publisher가 데이터를 emit하는 과정을 처음부터 다시 시작하는 데이터의 흐름
- Subscriber가 구독을 할 때마다 타임라인에 처음부터 emit된 모든 데이터를 받을 수 있다.
- 시퀀스 타임라인이 구독을 할 때마다 cld sequence가 하나씩 더 생성
@Slf4j
public class ColdSequenceExample {
public static void main(String[] args) {
Flux<String> coldFlux = Flux.fromIterable(Arrays.asList("RED", "YELLOW", "PINK"))
.map(String::toLowerCase);
coldFlux.subscribe(country->log.info("# subscriber1 : {}", country));
log.info("-------------------------------");
coldFlux.subscribe(country->log.info("# subscriber2 : {}", country));
}
}
- Subscriber가 구독한 시점의 타임라인부터 emit된 데이터를 받을 수 있다.
- Publisher가 데이터를 emit하는 과정이 한 번만 일어나고 Subscriber가 각 구독 시점 이후에 emit된 데이터만 전달받는다.
- 구독이 여러 번 발생해도 타임라인은 하나만 생성
@Slf4j
public class HotSequence {
public static void main(String[] args) throws InterruptedException {
Flux<String> concertFlux =
Flux.fromStream(Stream.of("A","B","C","D","E"))
.delayElements(Duration.ofSeconds(1)).share();
// share -> 원본 Flux를 여러 subscriber가 공유하도록 한다. (cold sequence를 hot sequence로 변환해주는 operator)
concertFlux.subscribe(code -> log.info("# Subscriber {}", code));
TimeUnit.SECONDS.sleep(3);
concertFlux.subscribe(code -> log.info("# Subscriber 2 {}", code));
TimeUnit.SECONDS.sleep(4);
}
}
Publisher에서 emit되는 데이터들을 Subscriber 쪽에서 안정적으로 처리하기 위한 제어 기능
- 요청 데이터의 개수를 제어하는 방법
- Subscriber가 적절히 제어할 수 있는 수준의 데이터 개수를 Publisher에게 요청
- Backpressure 전략을 사용하는 방법
- Reactor에서 제공하는 Backpressure 전략을 사용
-
IGNORE
- Backpressure를 사용하지 않는다.
-
ERROR
- Downstream으로 전달할 데이터가 Buffer에 가득 찬 경우, Exception을 발생
-
DROP
-
Downstream으로 전달할 데이터가 Buffer에 가득 찰 경우, Buffer 밖에서 대기하는 먼저 emit된 데이터부터 DROP한다.
-
BUFFER가 비워질 때까지 하나씩 DROP
-
Buffer가 가득 찬 상태에서 데이터가 들어온 경우, 그 즉시 DROP
-
-
LATEST
-
Downstream으로 전달할 데이터가 Buffer에 가득 찰 경우, Buffer 밖에서 대기하는 가장 최근(나중)에 emit된 데이터부터 Buffer에 채운다.
-
Buffer가 가득 찬 상태에서 데이터가 들어온 경우 즉시 Drop 되지 않고 데이터 하나가 더 들어온 경우 폐기된다. (최신 데이터가 아닌 데이터가 폐기)
-
-
BUFFER 전략
-
Downstream으로 전달할 데이터가 Buffer에 가득 찰 경우, Buffer 안에 있는 데이터를 DROP 시킨다.
-
- Reactive Streams에서 발생하는 signal을 프로그래밍적으로 push 할 수 있는 기능을 가지고 있는 Publisher의 일종
- Thread-safe 가 보장되지 않을 수 있는 Processor 보다 더 나은 대안이 된다.
Thread-Safe
: 멀티스레드 환경에서 함수나 변수 같은 공유 자원에 동시 접근할 경우에도 프로그램의 실행에 문제가 없다.
Thread-Safe가 보장되지 않는다
: 여러 개의 스레드가 공유 변수에 동시 접근해서 올바르지 않은 값이 할당된다거나, 공유 함수에 동시에 접근할 때 교착상태에 빠지게 되는 것Processor의 경우 onNext, onComplete, onError 메소드를 직접적으로 호출하기 때문에 Thread-Safe 하지 않을 수 있다. Sinks의 경우에는 동시 접근을 감지하고 동시접근을 하는 Thread 중에서 하나가 빠르게 실패하기 때문에 Thread-Safe가 보장된다.
- Sinks는 Thread-Safe 하게 signal을 발생시킨다.
- Sinks는 Sinsk.Many 또는 Sinks.Oneinterface를 사용해서 Thread-safe하게 signal을 발생시킨다.
- 구독 시점에 데이터가 emit되는 영역과 emit된 데이터를 operator로 가공 처리하는 영역을 분리해서 손쉽게 멀티쓰레딩을 가능하게 한다.
구성
- operator 체인에서 Scheduler를 전환하는 역할을 하는 전용 operator
- Scheduler를 통해 생성되는 쓰레드 실행 모델을 지정하는 부분
- Scheduler를 위한 전용 Operator
- publishOn() : Operator 체인에서 Downstream Operator의 실행을 위한 쓰레드를 지정한다.
- subscribeOn() : 최상위 Upstream Publisher의 실행을 위한 쓰레드를 지정한다. 즉, 원본 데이터 소스를 emit 하기 위한 스케줄러를 지정한다.
- parallel() : Downstream에 대한 데이터 처리를 병렬로 분할 처리하기 위한 쓰레드를 지정한다.
② : parallel operator가 return 값으로 반환하는 parallelFlux라는 특별한 타입에서 지원하는 runOn operator를 사용해서 Scheduler를 지정하게 되면 이 시점에 병렬 작업을 시작하게 됨
③ : rail이라는 논리적인 작업 단위에서 분할되는 워크로드 처리
@Slf4j
public class ParallelExample01 {
public static void main(String[] args) {
Flux.fromArray(new Integer[] {1, 3, 5, 7, 9, 11, 13, 15})
.parallel() // 병렬로 처리하겠다는 정의
.subscribe(data -> log.info("# onNext : {}", data));
}
}
병렬로 처리되지 않고 메인 쓰레드에서 전부 처리가 됨
parallel 만 사용할 경우에는 병렬로 작업을 수행하지 않는다.
@Slf4j
public class ParallelExample03 {
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[] {1, 3, 5, 7, 9, 11, 13, 15, 17, 29})
.parallel()
.runOn(Schedulers.parallel())
.subscribe(data -> log.info("{}", data));
Thread.sleep(100L);
}
}
runOn() 을 사용해서 Scheduler 를 할당해주어야 병렬로 작업을 수행한다.
CPU 코어 갯수 내에서 worker thread 를 할당한다.
@Slf4j
public class ParallelExample04 {
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[] {1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
.parallel(4) // Thread 의 갯수 지정
.runOn(Schedulers.parallel())
.subscribe(data -> log.info("{}", data));
Thread.sleep(100L);
}
}
CPU 코어 갯수에 의존하지 않고, worker thread 를 강제 할당한다.
- Operator 체인에서 최초의 thread는 subsribe()가 호출되는 scope에 있는 thread이다.
- Operator 체인에서 publishOn()이 호출되면 publishOn() 호출 이후의 Operator 체인은 다음 publishOn()을 만나기 전까지 publishOn()에서 지정한 Thread에서 실행이 된다.
- Operator 체인에서 publishOn()이 호출되면 publishOn() 호출 이후의 Operator 체인은 다음 publishOn()을 만나기 전까지 publishOn()에서 지정한 Thread에서 실행이 된다.

@Slf4j
public class SchedulerOperatorExample03 {
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[] {1, 3, 5, 7})
.doOnNext(data -> log.info("doOnNext : {}", data))
.publishOn(Schedulers.parallel())
.filter(data -> data > 3)
.doOnNext(data -> log.info("filter : {}", data))
.publishOn(Schedulers.parallel())
.map(data -> data * 10)
.doOnNext(data -> log.info("map : {}", data))
.subscribe(data -> log.info("onNext : {}", data));
Thread.sleep(500L);
}
}
- subscribeOn()은 최상위 Upstream publisher의 실행 쓰레드를 subscribe() 호출 scope의 쓰레드에서 subscribeOn()에서 지정한 쓰레드로 바꾼다.
메인 쓰레드를 다른 쓰레드로 바꿔주는 역할을 한다.

- subscribeOn()과 publishOn()이 같이 있다면, publishOn()을 만나기 전까지의 Upstream Operator 체인은 subscribeOn()에서 지정한 쓰레드에서 실행되고, publishOn()을 만날 때마다 publishOn() 아래의 Operator 체인 downstream은 publishOn()에서 지정한 쓰레드에서 실행된다.

@Slf4j
public class SchedulerOperatorExample05 {
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[] {1, 3, 5, 7,})
.subscribeOn(Schedulers.boundedElastic())
.filter(data -> data > 3)
.doOnNext(data -> log.info("# doOnNext filter : {}", data))
.publishOn(Schedulers.parallel())
.map(data -> data * 10)
.doOnNext(data -> log.info("# doOnNext map : {}", data))
.subscribe(data -> log.info("# doOnNext subscribe : {}", data));
Thread.sleep(500L);
}
}
- subscribeOn()이 publishOn() 뒤에 위치하든 상관없이 publishOn()을 만나기 전까지의 Upstream Operator 체인은 subscribeOn()에서 지정한 쓰레드에서 실행된다.

@Slf4j
public class SchedulerOperatorExample06 {
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[] {1, 3, 5, 7,})
.doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
.publishOn(Schedulers.parallel())
.filter(data -> data > 3)
.doOnNext(data -> log.info("# doOnNext filter: {}", data))
.subscribeOn(Schedulers.boundedElastic())
.map(data -> data * 10)
.doOnNext(data -> log.info("# doOnNext map: {}", data))
.subscribe(data -> log.info("# doOnNext subscribe: {}", data));
Thread.sleep(500L);
}
}
Schedulers 클래스의 정적 메소드로 제공됨
- Schedulers.immediate()
- 별도의 쓰레드를 추가 할당하지 않고, 현재 쓰레드에서 실행된다.
- Schedulers.single()
- 하나의 쓰레드를 재사용한다.
쓰레드를 하나만 생성해서 스케줄러가 제거되기 전까지 재사용, 저지연(low latency) 일회성 실행에 최적화 되어 있다.
- Schedulers.boundedElastic()
- 쓰레드 풀을 생성하여 생성된 쓰레드를 재사용한다.
생성된 Thread pool(executor Service 기반의 Thread Pool) 안에서 정해진 수만큼의 Thread를 사용해서 작업을 처리하고, 작업이 종료되면 해당 Thread를 반납해서 재사용하는 방식
- 생성할 수 있는 쓰레드 수에 제한이 있다. (Default, CPU 코어 갯수 * 10)
- 긴 실행 시간을 가질 수 있는 Blocking I/O 작업에 최적화 되어 있다.
subscribeOn()이라는 Scheduler 전용 Operator에서 주로 사용
실제 데이터베이스나 Http request 같은 blocking I/O 작업을 통해서 대량의 데이터를 데이터 소스로 사용하는 경우가 많음. 이런 경우 대량의 데이터를 입력으로 받아들이고 출력으로 내보는 경우 실행시간이 길어진다.
따라서 다른 Non-Blocking I/O 작업에 영향을 주지 않기 위해 BoundedElastic 같은 Scheduler를 이용해 Blocking I/O를 처리하기 위한 전용 Thread를 할당해서 작업 처리 시간을 효율적으로 사용할 수 있다. - Schedulers.paralle()
- 여러 개의 쓰레드를 할당해서 동시에 작업을 수행할 수 있다.
- Non-Blocking I/O 작업에 최적화 되어 있다.
CPU 코어 갯수 만큼의 Thread 생성
- Schedulers.formExecutorService()
- 기존의 ExecutorService를 사용하여 쓰레드를 생성한다.
- 의미있는 식별자를 제공하기 때문에 Metric에서 주로 사용된다.
- Schedulers.newXXX
- 다양한 유형의 새로운 Scheduler를 생성할 수 있다.
new Single()
,newParallel()
,newboundedElastic()
- Scheduler의 이름을 직접 지정할 수 있다.
- 다양한 유형의 새로운 Scheduler를 생성할 수 있다.
- Reactor Sequence 상에서 상태를 저장할 수 있고, 저장된 상태값을 Operator 체인에서 공유해서 사용할 수 있는 인터페이스이다.
- Context에 저장할 상태값은 key, value 형태로 저장이 된다.
- context에 값을 저장하기 위해서는 contextWrite()을 사용한다.
- Context에서 값을 읽어오기 위해서는 읽기 전용 뷰인 ContextView를 사용한다.
- ContextView는 Reactor Sequence에서 deferContextual() 또는 transformDeferredContextual()을 통해서 제공된다.
Context API : Context에 데이터를 저장
- put(key, value) : key/value 형태로 Context에 값을 쓴다.
- Context.of(key1, value1, key2, value2, ... ) : key/value 형태로 Context에 여러 개의 값을 쓴다.
- putAll(ContextView) : 파라미터로 입력된 ContextView를 merge한다.
- 기존의 Context와 파라미터로 입력받은 ContextView의 데이터를 하나로 합쳐서 새로운 Context를 리턴
- delete(key) : Context에서 key에 해당하는 value를 삭제한다.
ContextView API : Context에 저장된 데이터를 읽어올 때
- get(key) : ContextView에서 key에 해당하는 value를 반환한다.
- getOrEmpty(key) : ContextView에서 key에 해당하는 value를 Optional로 래핑해서 반환한다.
- key에 해당하는 값이 없을 경우 비어있는 optional로 리턴 가능
- getOrDefault(key, default value) : ContextView에서 key에 해당하는 value를 가져온다. key에 해당하는 value가 없으면 default value를 가져온다.
- hasKey(key) : ContextView에서 특정 key가 존재하는지 확인한다.
- isEmpty() : Context가 비어있는지 확인한다.
- size() : Context 내에 있는 key/value의 개수를 반환한다.
- Context는 각각의 Subscriber를 통해 Reactor Sequence에 연결되며 체인에서 각각의 Operator들이 실행 쓰레드가 달라도 연결된 Context에 접근할 수 있다.
- Context는 체인의 맨 아래에서부터 위로 전파된다.
- Context는 Downstream에서 Upstream으로 전파된다.
- Operator 체인에서 Context read 메소드가 Context write 메소드 밑에 있을 경우에는 write 된 값을 read 할 수 없다.
- 일반적으로 Context에 write 할 때(데이터를 저장하는 경우)에는 Operator 체인의 마지막에 둔다.
- 동일한 key에 대해서 write할 경우, 값을 덮어쓴다.
- 메인 Operator 내부에서 sequence를 생성하는 flatMap() 같은 Operator 내에서 write 된 Context의 값은 Inner Sequence 내부에서만 유효하고, 외부 Operator 체인에서는 보이지 않는다.
Spring MVC vs Spring WebFlux
Reactive Microservices With Spring Boot
https://poiemaweb.com/
완화하다
, 완충제
, 임시 저장 공간
하나의 장치에서 다른 장치로 데이터를 전송할 경우에 양자 간의 데이터의 전송 속도나 처리 속도의 차를 보상하여 양호하게 결합할 목적으로 사용하는 기억 영역
- 운영체제에서 Thread는 프로세스 내부에서 실행되는 작은 작업 단위이다.
- Thread는 process의 실행 흐름을 구성하는 단위로, 하나의 프로세스는 내부에 여러 개의 thread가 포함될 수 있다.
- 특징
- 프로세스들이 서로의 데이터에 접근하는 것이 직접적으로 불가능한 반면, 스레드는 프로세스 내부에 존재하기 때문에 프로세스의 데이터 영역에 접근이 가능하다. 또한 Thread끼리의 서로의 데이터에 접근 가능하다.
- Thread 또한 일종의 작업 단위이기 때문에 프로세스처럼 작업이 처리된다. 즉, Thread도 프로세스처럼 작업을 병렬로 처리해서 속도를 높일 수 있다.
- Thread는 일종의 함수로 구현된다. 데이터(지역 변수)를 다루고, 데이터를 관리하기 위한 stack 메모리 영역을 가진다. 이 stack 공간은 프로세스의 stack 메모리 영역과는 별개이며, Thread Stack이라고 부른다.
프로세스는 메모리 영역을 크게 code, data, stack, heap으로 나눌 수 있지만, Thread는 thread stack 메모리 공간만 가진다. (code, data, heap 영역 공유) Thread 간 컨텍스트 스위칭 시 보다 적은 정보를 저장하고 복원하기 때문에 프로세스 간 컨텍스트 스위칭 보다 적은 시간이 소요된다.
- 하나의 process는 내부에 여러 개의 Thread를 가질 수 있다.
장점 | 단점 |
---|---|
응답성(성능) 향상, 스레드 간의 작업 분할과 병렬 처리로 인해 사용자가 응용 프로그램을 원할하게 사용하는 데 빠른 응답성을 제공 | Thread 간의 상호 간섭, 멀티 쓰레드 실행의 경우, 쓰레드가 다른 쓰레드의 작업을 방해하거나 우선 순위 설정에 문제가 있을 경우 서로에게 영향을 미친다 |
자원 공유 효율성 향상, 하나의 프로세스 내에서 실행, 프로세스 내부에 있기 때문에 별도의 메모리 공간을 할당할 필요가 없음. (쓰레드는 프로세스 내부에서 thread stack 공간에 데이터를 관리한다.) | 성능 저하, 많은 쓰레드 생성 시 성능 저하 발생 가능, 각각의 스레드들이 병렬로 실행되기 위해 컨텍스트 스위칭이 빈번하게 발생한다. |
간결성, 작업을 분리할 수 있기 때문에 코드가 간결해질 수 있음 | 자원 소비, 쓰레드는 개별적인 실행 흐름을 가지기 때문에, Thread 마다 stack 및 레지스터 등의 메모리 자원을 소비한다. 따라서 쓰레드의 수가 증가하면 메모리 사용량도 증가하게 되어 시스템 자원이 한계에 도달할 수 있다. |
동기화 이슈, 여러 스레드가 공유 자원에 동시에 접근할 때 동기화 문제 발생 가능, 경쟁 상태(Race Condition) 문제는 Thread 간의 실행 순서나 타이밍에 따라 예측할 수 없는 결과가 발생할 수 있다. |
멀티 스레드는 하나의 프로세스 내부에서 여러 개의 스레드가 동시에 실행되는 것이다. 스레드끼리는 서로의 메모리 공간(thread stack)을 공유하고 접근할 수 있다. IPC 통신이 아닌 메모리 기반 통신을 사용하기에 통신 속도가 빠르다. 각 스레드들은 여러 자원을 공유하기에, 하나의 스레드에 문제가 생기면 나머지 스레드들도 영향을 받을 수 있다. 프로세스 내에서 스레드의 작업을 여러개로 분할(쪼개서) 병렬로 처리할 수 있다.
멀티 프로세스는 여러 개의 독립적인 프로세스가 동시에 실행되는 것이다. 각 프로세스는 독립된 메모리 공간을 가지며, 서로에게 접근하려면 IPC 기법을 사용해야 한다. 각 프로세스는 각각 고유한 자원을 관리하고 있어 서로에게 영향을 미치지 않는다. 하나의 프로세스 작업을 여러개로 분할(쪼개서) 병렬로 처리할 수 있다. 이때, 프로세스는 스레드 단위로 작업을 분할한다.
- 시간이 지남에 따라 변화하는 데이터를 의미한다. 메모리 사용률, CPU 사용률, 스레드 사용률 등등.. 시간에 따른 추이를 추적할 가치가 있는 데이터
참고
- Debug 모드를 활성화하는 방법(Globally)
- checkpoint() Operator를 사용하는 방법(Locally)
- log() operator를 사용해서 Reactor Sequence에서 발생하는 Signal을 확인하는 방법
- Debug 모드를 사용한 Debugging
- Debug 모드 시, Operator의 stacktrace capturing을 통해 디버깅에 필요한 정보를 측정한다.
- Hooks.onOperatorDebug()를 통해서 Debug 모드를 활성화 할 수 있다.
- Hooks.onOperatorDebug()는 Operator 체인이 선언되기 전에 수행되어야 한다.
- Debug 모드를 활성화하면 Operator 체인에서 에러 발생 시, 에러가 발생한 Operator의 위치를 알려준다.
- 사용이 쉽지만 애플리케이션 내 모든 Operator의 assembly(New Flux or Mono)를 캡쳐하기 때문에 비용이 많이 든다.
- checkpoint() Operator를 사용하는 방법
- 특정 Operator 체인 내에서만 assembly stacktrace를 캡쳐한다.
- checkpoint(description)를 사용하면 에러 발생 시, checkpoint(description)를 추가한 지점의 assembly stacktrace를 생략하고 description을 통해 에러 발생 지점을 예상할 수 있다.
- checkpoint(description, true) = checkpoint() + checkpoint("description")
- 에러 발생 시, 고유한 식별자 등의 description과 assembly stack trace(traceback)를 모두 출력한다.
- log() Operator 사용
- Flux 또는 Mono에서 발생하는 signal event를 출력해준다. (onNext, onError, onComplete, subscriptions, cancellations, requests)
- 여러 개의 log()를 사용할 수 있으며, Operator마다 전파되는 signal evnet를 확인할 수 있다.
- Custom Category를 입력해서 Operator마다 출력되는 signal event를 구분할 수 있다.
- 에러 발생 시, stacktrace도 출력해준다.
- stacktrace : 호출된 메서드에 대한 Stack Frame에 대한 리포트
- assembly : 새로운 Flux가 선언된 지점
- traceback : 실패한 operator의 stacktrace를 캡쳐한 정보