인프런 커뮤니티 질문&답변

덩더러러쿨님의 프로필 이미지
덩더러러쿨

작성한 질문수

Kevin의 알기 쉬운 RxJava 1부

리액티브 스트림즈(Reactive Streams)란?

Publisher와 Subscriber 간의 프로세스 흐름에 대한 질문

해결된 질문

작성

·

282

1

안녕하세요,

먼저, 5:50부터 그림으로 설명하는 Subscriber(소비자)가 subscribe 메소드를 호출하여 '데이터를 구독한다'라고 하셨습니다.

앞서 reactive stream 표준사양의 Publisher 인터페이스 내용에 subscribe 메소드 선언 부분이 있는데,

마치 Subscriber 객체가 subscribe() 메소드를 호출하는 식으로 들려서 잘 이해가 되지 않습니다;

(이후 설명해주신 흐름에 대해서도 Publisher 객체가 onSubscribe(), onNext()를 호출한다라는 것처럼 들립니다;

조금만 더 자세하게 설명해 주세요ㅜ

답변 3

3

Kevin님의 프로필 이미지
Kevin
지식공유자

안녕하세요? 낮에는 업무에, 저녁에는 아기를 재우느라 답변이 좀 늦었네요.

우선 질문하신 부분에 대해서 먼저 답을 드리도록 하겠습니다.

'Subscriber가 데이터를 구독한다' 라고 표현한것은 겉으로 보이는 코드만으로는 틀린 말이라고 볼 수 있습니다.

왜냐면 말씀하신대로 Observable.just("Hello").subscribe(data -> System.out.println(data)) 라는 이 코드상에서는 Publisher의 역할을 하는 Observable이 subscribe()를 호출하고 있으니까요.

그런데 소스 코드 안으로 조금 더 들어가 보면 Publisher인 Observable이 subscribe() 메서드를 호출 한다고 해서 바로 구독이 이루어지는 것은 아닙니다. 아래 코드는 Observable의 subscribe() 메서드의 내부인데요.

==== Observable.java ====

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer)
;
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);

RxJavaPlugins.onError(e);


throw npe;
}
}

위 코드에서 빨간색으로 된 두 라인 중에 첫번째 라인을 보시면 RxJavaPlugins 라는 유틸리티 클래스가 this 즉, Publisher인 Observable과 Subscriber인 Observer를 커플링 시켜주고 있는데요. 실제로 RxJavaPlugins.onSubscribe() 메서드 내부를 보면, 아래 처럼 로직이 진행되는 것을 볼 수 있습니다.

==== RxJavaPlugins.java ====

public static <T> Observer<? super T> onSubscribe(
Observable<T> source,
@NonNull Observer<? super T> observer) {
BiFunction<? super Observable,
? super Observer,
? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
return observer;
}

여기서 보시면 apply(f, source, observer) 가 보이시죠? 여기서 f 는 BiFunction 함수형 인터페이스인데요. 이 f 라는 함수형 인터페이스의 내부에서 source 즉, Observable과 Observer가 연결이 될거에요.

실제로 연결 되는 부분은 아래 소스에서 볼 수 있습니다.

==== ObservableJust.java ====

@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd =
new ScalarDisposable<T>(observer, value);
observer.onSubscribe(sd);
sd.run();
}

윗 쪽에서 아직 말씀드리지 않은 Observable.java 코드의 두번째 빨간색 라인을 보시면 subscribeActual() 메서드를 호출하고 있는데요. Observable은 추상 클래스이면서 ObservableSource라는 인터페이스를 구현을 하고 있습니다. subscribeActual()은 Observable을 상속하는 하위 클래스인 ObservableJust 클래스에서 구현을 하고 있는데 그게 바로 위에 보이는 코드입니다.

빨간색의 observer.onSubscribe(sd)라는 코드가 보이시죠? Publisher인 자신과 연결되어 있는(dependant) Subscriber의 onSubscribe() 메서드를 호출함으로 인해서 구독 준비가 되었음을 알리는겁니다.

설명이 좀 장황할지도 모르겠지만 결론적으로 Publisher 내부에서 전달받은 Subscriber의 메서드를 호출하기 때문에 결과적으로는 Subscriber가 구독을 한다라는 말은 틀린말이 되지 않는것입니다.

"(이후 설명해주신 흐름에 대해서도 Publisher 객체가 onSubscribe(), onNext()를 호출한다라는 것처럼 들립니다;" 

--> 이 말의 의미는 정확하게는 Publiser객체 내부에서 전달 받은 Subscriber 객체로 onSubscribe(), onNext()를 호출한다가 맞는것입니다.

제가 강의에서 이런식으로 설명을 하지 않은 이유는 강의 제목을 보시면 아시겠지만 알기 쉬운 RxJava가 되어야 되는데 이런 RxJava의 안쪽까지 깊게 들어가버리면 입문하시는 분들이 쉽게 지치고 포기를 할 가능성이 높거든요. ㅡ.,ㅡ

그래서 개념적인 부분을 최대한 쉽게 이해를 시켜드리고자 Publiser와 Subscriber 고유의 역할을 있는 그대로 말씀드리려는 경향이 있었는데 그게 오히려 질문자님을 혼란스럽게 만들어버렸나봐요.

사실 Kafka 같은 Pub/Sub 모델의 경우에는 Publisher와 Subscriber의 역할이 명확하고 독립적이어서 서로가 서로를 구체적으로 알지 못하고 알 필요도 없는데 리액티브 프로그래밍의 경우에는 Publisher와 Subscriber가 어느 정도 커플링이 되어 있다보니 충분히 헷갈릴 소지가 있을거라는 생각이 듭니다.

그리고 제 설명 역시 혼선을 주기에 충분한거 같구요.

아무튼 제 답변이 질문자님의 혼선을 조금이라도 해소할 수 있었으면 하는 바램 가져볼게요.

다른 궁금한 사항이 있으시면 언제든지 편하게 질문 주시구요. 

그럼 좋은 주말 보내시길 바랄게요. 감사합니다.

1

저도 혼란스러웠던 부분인데 이 글을 보고 정리가 되네요. 질문/답변 모두 감사합니다.

0

육아를 병행하시느라고 힘드실텐데 상세한 답변 너무 감사합니다ㅜㅜ!

아직 RxJava가 생소한지라 어렵게 느껴지네요ㅎㅎ;; 그래도 이렇게 좋은 강의와 강사님 덕분에 열심히 공부하도록하겠습니다!

Kevin님의 프로필 이미지
Kevin
지식공유자

아, 급하게 적느라 오타가 났네요. subscribeActual() 맞습니다!

그럼, 좋은 주말 보내세요~

덩더러러쿨님의 프로필 이미지
덩더러러쿨

작성한 질문수

질문하기