게시글
질문&답변
2024.04.12
Backpressure Example 코드 질문드립니다
안녕하세요? 답변이 조금 늦었습니다. 육아를 병행하고 있어서 답변이 늦어진 점 양해 부탁드릴게요. Backpressure Error 전략은 Downstream 쪽에서 Upstream 쪽의 속도에 대응하지 못할 경우 에러를 발생시키는 전략인데요. 에러가 발생할 때 아래와 같은 에러 로그가 출력됩니다. reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...) 이 문장에서 bounded queue 가 일종의 버퍼를 의미하는데요. Downstream 쪽에서 내부적으로 데이터를 처리할 때 사용하는 버퍼라고 생각하시면 될 것 같습니다. 그런데 여기서의 버퍼는 Backpressure Buffer 전략에서의 Buffer와 조금 다른 동작을 합니다. Downstream 쪽에서 지연 시간을 짧게 주면 버퍼에 그만큼 더 빠르게 데이터가 쌓이게 되고, 지연 시간을 길게 주었을 때 데이터가 그만큼 천천히 쌓인다고 생각하시면 될 것 같습니다.(내부적으로 버퍼에 쌓이는 동작을 조절한다고 보시면 될 것 같습니다.) Backpressure의 내부 메커니즘을 100 퍼센트 다 이해하고 있다고 한다면 솔직히 거짓말일텐데 아무튼 쉽게 생각해서 다음 데이터를 처리하기 위해 넉넉한 시간을 가질 수 있다면 여유를 가지고 처리를 할 수 있는거라고 생각해주시면 좋을 것 같아요. 데이터를 처리할 준비가 안되어 있는데 무작정 데이터가 들어오면 과부하가 걸릴테니까요. 다만, 이 경우에는 여유는 생기겠지만 처리할 수 있는 데이터의 건 수는 그만큼 줄어들테니 적절한 조건을 찾아야 될테구요. 적절한 답변이 되셨으면 좋겠습니다. 감사합니다.
- 0
- 1
- 38
질문&답변
2024.04.05
Single과 관련해 여쭤보고 싶은 부분이 있습니다!
안녕하세요. 아침에 질문을 확인했는데 일정이 바빠서 답변이 조금 늦어졌네요. ^^ 1. Single은 전통적인 client - server 방식의 요청을 처리하는 데 사용한다고 말씀해주셨습니다. 실제로 회사에서 single.blocking() 이런 방식으로 처리하는 코드를 본 적이 있는데, single을 해당 의도처럼 사용할 경우, 동기처리 방식에 비해 얻는 이점이 있을까요?? 제가 아직 생각하기로는 코드는 리액티브이지만 비동기 방식으로 처리하는 것이 아닌 거 같아서요! --> Single을 전통적인 client - server 방식의 요청을 처리하는데 사용한다 라는 의미는 우리가 일반적으로 사용하는 HTTP request와 response가 하나의 HTTP message로 이루어져 있기 때문에 이걸 처리하기 적합하다는 의미이구요. single.blocking() 방식으로 코드를 본 적이 있다고 말씀해 주셨는데, 여러 개의 비동기 작업이 있어서 이 작업이 다 끝날 때 까지 동기적으로 기다려야 되는 상황이 아닌데 습관적으로 blocking()을 호출하는 방식이라면 바람직하지는 않다고 생각합니다. 즉, 비동기 방식을 습관적으로 동기 방식으로 전환한다면 RxJava 같은 리액티브 라이브러리를 사용하는 의미가 퇴색될 수 있는거라고 생각하시면 될 것 같아요. 또한 강의 영상 3분 정도에서 보여주신 SingleCreateExample 코드에서, 발행자인 Single의 create 메소드의 파라미터인 SingleOnSubscribe 가 구현하고 있는 subscribe가 아래 구독자가 구독하는 subscribe() 와 같은 함수가 맞을까요? 코드적으로는 둘이 연관이 있어 보여서, SingleObserver 와 SingleEmitter 사이의 같은 부모가 있는지 확인 해보았는데 그것두 아니더라구요 ㅠ... 둘이 아예 다른 메소드 인가요?? --> 일단 결론은 다른 메서드인데 서로 연관이 있습니다. (사진): 캡쳐한 이미지의 코드를 보시면 SingleCreate은 Single의 하위 클래스인데요. Single.create()을 호출하면 리턴됩니다. (1)의 SingleOnSubscribe는 (4)에서 subscribe()를 호출하는데 이 때의 subscribe()는 구독자가 구독 즉, subscribe()를 호출하면 데이터를 구독자에게 통지하기 위한 emitter를 전달 받는 역할을 한다고 보시면 될 것 같구요. (2)의 SingleObserver를 포함하고 있는 parent 객체가 (3)과 (4)에 서로 연결이 되는걸 볼 수 있는데요. 이 말은 구독자가 구독하면 발행자가 이 구독을 인지하고 데이터를 구독자에게 전달한다라는 의미와 같다고 보시면 될 것 같습니다. 좀 어렵죠? 코드 내부를 다 이해하실 필요는 없고 리액티브 프로그래밍의 Pub/Sub 모델은 서로 독립적인 관계가 아니라 서로 밀접하게 관계를 맺고 있다라는 사실만 일단 기억하시면 좋을 것 같아요. 4. 코드를 살펴보다가 RxJavaHooks 라는 유틸성 클래스를 발견하게 되었는데, 함수형 인터페이스들이 막 정의되어 있는 것은 알겠는데 어떻게 해석해야 할 지가 감이 안 잡히더라구요 ㅠㅠ... 혹시 코드를 해석하는데 조언을 주실 수 잇으실까요?.. --> Hook이라는 의미는 프로그래밍에서 종종 사용하는 용어인데요. 코드 상의 흐름 중간에 끼어 들어서 추가적인 작업을 한다는 의미입니다. RxJavaHooks를 이용해서 Observable이 생성될 때 로깅, 예외 처리, 디버깅 등의 추가적인 작업을 할 수 있도록 해줍니다. 학습하시는데 도움이 되셨길 바랍니다~
- 0
- 2
- 63
질문&답변
2024.03.29
Backpressure 전략
안녕하세요? 제가 바빠서 답변이 좀 늦었는데, AI 인턴이 먼저 답을 달아주었네요. ^^ 결론부터 말씀 드리자면 drop 되는 데이터는 폐기됩니다. Backpressure의 여러 전략 중 하나라고 생각해 주시면 되구요. 데이터를 drop 시키느냐 그렇지 않느냐는 요구 사항에 따라서 달라 질수 있다고 생각합니다. 일반적으로 데이터 신뢰성이 중요하기 때문에 데이터가 유실되지 않는 쪽이 바람직하겠지만 실시간 데이터 스트리밍 등의 예에서 빠른 응답을 위해 일부 데이터 유실을 허용할 수도 있으며, 시스템의 가용 리소스를 초과하는 데이터의 흐름이 발생했을 때, 과부하가 걸려서 시스템에 문제가 발생하는 것보다는 일부 데이터를 폐기하지만 시스템의 안정성을 유지하는데 도움이 될거라 생각합니다. 답변이 되셨으면 좋겠네요. 감사합니다.
- 0
- 2
- 62
질문&답변
2024.03.03
Flux 와 Mono
안녕하세요? WebFlux라는 이름의 의미에 대해서 질문을 주셨는데요. Reactor라는 리액티브 프로그래밍 기술에서 사용되는 데이터 타입은 크게 Mono와 Flux로 나뉘는데요. Flux는 Mono를 포함하고 있는 개념이기도 해서 아마도 Flux라는 이름을 채용하지 않았나 싶습니다. 단순히 WebFlux라는 이름때문에 Mono를 사용하지 않는다는 의미는 아닙니다. 실제로 일반적인 HTTP 요청/응답에서 가장 많이 사용되는건 Mono입니다. Mono는 데이터를 0건 또는 1건 포함하고 있다고 볼 수 있는데 REST API 기반 애플리케이션을 예로 들면, HTTP 요청 데이터는 한 건의 JSON 데이터이고, 응답 데이터도 한 건의 JSON 데이터인 것이 대부분입니다. Streaming 데이터는 한 건 이상일 수 있기 때문에 Flux를 사용하지만 대부분의 HTTP 요청과 응답은 Mono로 처리됩니다. 제가 드린 답변이 도움 되셨으면 좋겠네요. 감사합니다.
- 0
- 1
- 148
질문&답변
2024.03.03
Spring WebFlux 를 사용하기 적합한 시스템
안녕하세요? Spring MVC 기반의 애플리케이션에 Spring MVC 대신 Spring WebFlux를 사용해도 되는지에 대해서 질문 주셨는데요. 결론부터 말씀드리면 Spring MVC 대신에 Spring WebFlux를 사용해도 상관은 없습니다. 다만, Spring MVC는 기본적으로 Blockin I/O 기반이기 때문에 단순히 Spring MVC 대신에 Spring WebFlux를 사용하더라도 네트워크 요청이나 DB 요청까지 Non-Blocking I/O 기반이 아니라면 완벽하게 Non-Blocking I/O 방식으로 요청이 처리되지는 않을것입니다. Spring MVC로 감당이 되는 서비스인데 굳이 Spring WebFlux로 변경할 필요는 없을 것 같구요. 미래를 위해서 Spring WebFlux를 점진적으로 도입을 하는건 나쁘지 않다고 생각하지만 처음부터 아예 아무런 분석없이 Spring WebFlux를 바로 도입하는건 조금 고민을 해보아야하지 않을까 하는 생각이 듭니다. 실제로 스타트업에서 Spring WebFlux로 Spring MVC를 대체하는 경우를 보기는 했었지만 그런 케이스가 많을지는 잘 모르겠습니다. 답변이 조금 도움되시길 바래볼게요. 감사합니다.
- 0
- 1
- 202
질문&답변
2024.02.29
cold/hot publisher 예제 코드와 관련해 질문 드립니다.
안녕하세요? RxJava 학습하시다가 질문 주셨는데요. 아래에 답변 드리도록 하겠습니다. 그런데 해당 강의 마지막 부분에 cold/hot publisher 예제 코드에서 콘솔에 데이터가 출려되는 형태를 보면 구독자1이 데이터를 모두 소비하고 구독자2가 그 다음으로 데이터를 소비하는, 동기적으로 처리하는 것처럼 보이더라구요. 혹시 제 생각처럼 동기적으로 처리되고 있는 것이 맞을까요? --> 동기적으로 출력되는게 맞습니다. subscribe()에서 콘솔에 출력하는 곳에서 Thread.currentThread().getName() 으로 Thread 이름을 출력해보면 모두다 main 쓰레드에서 실행이 되는 걸 확인하실 수 있을 거에요. 그렇다면 사유가 궁금합니다 ㅠ --> RxJava나 Reactor 같은 리액티브 프로그래밍 라이브러리는 이벤트 기반의 비동기 프로그래밍 방식의 기술인데요. 비동기 라는 용어는 좁은 의미로는 Java의 동시성 즉, 멀티쓰레딩을 의미하고 넓은 의미로는 동기적으로 순차적으로 실행되지 않는 모든걸로 이해할 수 있습니다. 단순히 Java의 main() 메서드에서 RxJava로 데이터를 통지하고, 데이터를 구독하는것은 main 쓰레드에서만 실행되기 때문에 동기적으로 실행되는 것이구요. 2부에서 스케쥴러 를 학습하실텐데요. 스케쥴러를 통해서 비동기적으로 데이터를 통지하거나 데이터를 처리하는 방법에 대해서 학습 하시게 된다는 점 참고해주시면 좋을 것 같습니다. RxJava는 사실 단순 pub/sub 형태의 인터페이스를 구현한 구현체에 불과해서 RxJava를 이용해 소스코드를 작성하여도 이걸 비동기적으로 처리하기 위해서는 더 아랫단의 도움을 받아야 하는건지 그렇다면 이 코드를 실행하는 was의 영향을 받는건지 만약 그냥 java로 실행한다면 비동기적으로 실행할 수 없는 건지 아니면 RxJava는 동기적 비동기적으로 소스코드를 작성할 수 있는데, 그냥 현재 예제 코드가 동기적으로 코드를 작성한 것인지 아니면 메인 스레드가 구독자1과 2의 역할을 동시에 하고 있기 때문에 순차처리가 되는것인지 --> 1, 2, 3번 질문에는 한꺼번에 답변을 드리는게 좋을 것 같아요. 앞에서도 말씀드렸지만 비동기 처리라는 것은 일반적으로 Java에서 멀티 쓰레딩 기법 즉, 쓰레드를 여러개 사용해서 처리하는 것을 의미하는데요. Java에서 비동기 멀티 쓰레딩 이외에 NIO(Non-Blocking I/O)라는 기술이 있습니다. Non-Blocking I/O라는 것은 데이터 입출력 시, 쓰레드가 차단이 되지 않기 때문에 하나의 쓰레드에서 동시 다발적으로 발생하는 입출력을 처리할 수 있는데 이 Non-Blocking I/O 역시도 동기적으로 실행되는 것은 아니기 때문에 비동기적이다라는 표현을 많이 사용합니다. Non-Blocking I/O는 일반적으로 Non-Blocking I/O를 지원하는 WAS를 사용하구요. 단순히 Java 만으로 Non-Blocking I/O 기능을 사용하려면 Java의 NIO 기술을 사용하셔야 합니다. 그게 아니라면 멀티쓰레딩 기법을 이용하시면 됩니다. 또한 비동기에 대한 정의에 대해 여쭤보고 싶습니다. 비동기 처리라는 것을 스레드의 동작 방식을 제어하는 것이라고 이해하면 될까요?? 비동기 처리를 구현하기 위해서는 스레드 제어만 하면 되니까 OS단의 도움을 받지 않고 어플리케이션 단에서 구현을 할 수 있다고 이해해도 괜찮을까요?? --> 위에서 설명 드렸다시피 비동기에 대한 정의는 광범위한 의미까지 포함이 되어 있긴한데 Java에서 일반적으로 비동기 처리라고 하면 멀티 쓰레딩이라고 생각하셔도 무방할 것 같아요. RxJava 2부에서 스케쥴러를 학습하시면 RxJava에서 멀티 쓰레딩을 손쉽게 적용하실 수 있다는 걸 배우게 되실거에요. 강의에서는 RxJava의 기본 개념이나 연산자의 기본 동작에 대한 이해를 돕기 위해서 무조건적으로 스케쥴러를 사용해서 비동기적으로 처리하지는 않는다고 생각해 주시면 감사드리겠습니다.
- 0
- 1
- 150
질문&답변
2024.02.01
Sinks 와 thread-safe
안녕하세요? Sinks의 tread-safe 하다는 부분에 대해서 질문을 주셨네요. 먼저 아래 코드는 공식 문서에 나오는 Sinks 내용 중 일부인데요. (사진)쓰레드가 동시 접근하는 상황이 발생했을 때 쓰레드 중에서 동시 접근하는 쓰레드 중 하나를 빠르게 실패하게 함으로써 쓰레드 안전성을 보장한다라고 나와있습니다. 그리고 아래는 Sinks 내부 코드 중 일부인데요. (사진)- 먼저 (1)에서 쓰레드 관련된 문제 등을 포함해서 데이터를 emit할 때 발생할 수 있는 문제에 대해서 체크한 후, 데이터를 emit합니다. 만약에 어떤 문제가 감지되면, - (2)에서 EmitFailureHandler가 빠르게 emit을 실패시킵니다. 디폴트는 즉시 실패를 시키지만 EmiFaiureHandler 함수형 인터페이스를 구현할 때 더 시도할지 말지 여부를 지정할 수 있습니다. - 그리고 (3)에서 emit이 실패한 부분에 대해서 후처리를 하고 있습니다. 질문에 대한 답변이 되셨길 바랄게요.
- 0
- 1
- 93
질문&답변
2024.01.30
Mono 의 정의를 잘 모르겠습니다..
안녕하세요? 두 가지 질문을 주신걸로 여겨지는데요. 주신 질문에 간단하게나마 답변을 드리도록 하겠습니다. 저는 User를 리턴한게 아닌 Mono 를 리턴한건데 어떻게 포스트맨이 별도의 처리 없이 Mono 안의 User 값을 받을 수 있나요? --> Postman 뿐만 아니라 Spring에서 지원하는 RestTemplate 같은 Rest Client를 사용해서 Spring WebFlux의 엔드포인트를 호출해도 응답 데이터를 전달 받을 수 있습니다. 즉, 말씀하신대로 Spring WebFlux의 엔드포인트에서는 Mono 를 리턴하지만 클라이언트 쪽에서는 json으로 전달 받을 수 있습니다. 이렇게 전달 받을 수 있는 이유는 Postman이나 RestTemplate 같은 클라이언트 툴이 Blocking I/O 방식으로 동작하기 때문인데요. 클라이언트 쪽에서 Non-Blocking I/O를 지원하지 않는다면 서버 쪽에서는 내부적으로 Blocking I/O 방식으로 동작한다고 보시면 될 것 같습니다. 어떻게 Mono 를 리턴 타입으로 지정했는데, json을 전달 받을 수 있는지에 관해서는 두 번째 질문과도 연관이 있는데요. Spring WebFlux 프레임워크 내부에서 subscribe()가 호출이 되는게 맞습니다. 다만, subscribe()가 언제 호출되는지 시점의 차이가 있는데 RestTemplate 같이 Non-Blocking I/O를 지원하지 않는 Rest Client를 사용할 경우에는 서버 쪽 Spring WebFlux 내부에서부터 subscribe() 호출이 시작된다고 보시면 될 것 같구요. Non-Blocking I/O를 지원하는 WebClient를 이용하면 WebClient 쪽 즉, 클라이언트 쪽에서 subscribe()를 호출해야지만 서버 쪽 엔드포인트를 호출합니다. 이 말은 subscribe() 호출 시작이 클라이언트에서부터 시작한다는 의미와 같습니다. 추가로 스프링웹 플럭스가 컨트롤러쪽으로 Mono 나 Flux 타입이 리턴되면 subscribe() 를 자동으로 해준다는데 이게 맞는말인지 궁금합니다 --> 위에서 말씀 드렸지만 Spring WebFlux 프레임워크 내부에서 subscribe()를 해준다고 보시면 되는데, Rest Client가 Non-Blocking을 지원하면 클라이언트 쪽에서 subscribe()를 호출 해야지만 서버 쪽에서도 subscribe()가 호출되어서 Reactor Sequence가 동작을 합니다. 테스트 해 보시면 아시겠지만 WebClient로 서버쪽 엔드 포인트를 호출해서 response body 응답을 mono로 변환하는 부분(bodyToMono()) 다음 코드 체인으로 subscribe()를 호출하지 않으면 서버쪽 엔드 포인트를 호출하지 않습니다. 답변 드린 내용이 도움이 되셨으면 좋겠습니다. 감사합니다.
- 0
- 1
- 151
질문&답변
2024.01.11
혹시 다음강의부터는 ppt 한번에 묶어서 올려주실수있나요?
안녕하세요? 학습하시는데 번거롭게 해드려서 죄송합니다. 좋은 의견 주셔서 요청 주신대로 강의 자료를 섹션 당 1개의 PPT 파일로 합쳤습니다. 아래 Github Repository에서 받으실 수 있으세요. https://github.com/ITVillage-Kevin/spring-reactive-02-resources 감사합니다!
- 0
- 1
- 136
질문&답변
2024.01.06
CompletableObserver 클래스의 람다식 표현관련
안녕하세요? 아래 캡쳐 이미지에 있는 코드는 Completable 추상 클래스의 내부 코드 중 일부인데요. 파라미터로 CompletableObserver를 전달 받습니다. (사진) 아래 캡쳐 이미지 역시 Completable 추상 클래스의 내부 코드 중 일부이고, 파라미터로 두 개의 람다 표현식을 전달 받습니다. (사진) 즉, 파라미터가 다른 subscribe() 메서드가 Complatable 클래스 내부에 구현되어 있습니다. 첫번째 캡처 이미지에서는 파라미터로 전달 받은 CompletableObserver를 subscribeActual(observer)을 호출할 때 전달합니다. 두번째 캡쳐 이미지에서는 CallbackCompletableObserver의 생성자 파라미터로 람다를 전달한 후에 subscribe(observer)로 호출하는데 이렇게 호출하면 결국 내부에서 subscribeActual(observer)을 다시 호출합니다. 즉, CompletableObserver 인터페이스를 구현하는 클래스 내부에서 람다 표현식을 전달 받아서 사용할 수도 있고, 그렇지 않을 수도 있다라고 생각해 주시면 될 것 같습니다.
- 0
- 1
- 111