• 카테고리

    질문 & 답변
  • 세부 분야

    풀스택

  • 해결 여부

    해결됨

DoOnDisposeExample 예제에서의 스레드 관련 질문 (자문자답)

21.07.05 14:24 작성 조회수 112

0

안녕하세요,

해당 강의 이후에 있는 강의 중(RxJava : doOnEach, doOnCancel/doOnDispose, 그 밖의 doXXXXX 함수) 예제 DoOnDisposeExample.java를 다루고 있는데요

subscribe( )에서 오버라이딩된 onSubscribe( ) 내에 로그 출력 코드를 추가해보았습니다

- DoOnDisposeExample.java

public class DoOnDisposeExample {
public static void main(String[] args) {
Observable.fromArray(SampleData.carMakers)
.zipWith(
Observable.interval(300L, TimeUnit.MILLISECONDS),
(carMaker, num) -> carMaker
)
.doOnDispose(() -> Logger.log(LogType.DO_ON_DISPOSE, "# 생산자: 구독 해지 완료"))
.subscribe(new Observer<CarMaker>() {
private Disposable disposable;
private long startTime;
@Override
public void onSubscribe(Disposable disposable) {
this.disposable = disposable;
this.startTime = TimeUtil.start();
SimpleDateFormat date = new SimpleDateFormat("HH:mm:ss.SSS");
Logger.log(LogType.PRINT, "시작 시간 확인 : " + date.format(new Date(startTime)));
}

@Override
public void onNext(CarMaker carMaker) {
Logger.log(LogType.ON_NEXT, carMaker);
if(TimeUtil.getCurrentTime() - startTime > 1000L){
Logger.log(LogType.PRINT, "# 소비자: 구독 해지 , 1000L 초과");
disposable.dispose();
}
}

@Override
public void onError(Throwable error) {
Logger.log(LogType.ON_ERROR, error);
}

@Override
public void onComplete() {
Logger.log(LogType.ON_COMPLETE);
}
});


TimeUtil.sleep(2000L);
}
}

- result

구독자가 Observable을 구독할 때 스케줄러를 별도로 지정하지 않았으니 main 스레드에서 동작을 하는거라고 이해가 가는데,

나머지 onNext( ) 메소드를 처음 호출하는 부분부터 어째서 'Schedulers.computation()'을 지정하지 않았음에도 RxComputationThreadPool-1이 사용하는지 이해가 되지 않습니다.


* 자문자답 :
Observable의 interval( ) 메소드에서 Schedulers.computation()을 지정하는 코드를 확인했습니다!

- Observable.java

/**
* Returns an Observable that emits a sequential number every specified interval of time.
* <p>
* <img width="640" height="195" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/interval.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code interval} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param period
* the period size in time units (see below)
* @param unit
* time units to use for the interval size
* @return an Observable that emits a sequential number each time interval
* @see <a href="http://reactivex.io/documentation/operators/interval.html">ReactiveX operators documentation: Interval</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public static Observable<Long> interval(long period, TimeUnit unit) {
return interval(period, period, unit, Schedulers.computation());
}

답변 1

답변을 작성해보세요.

1

네, 제가 답변을 늦게 드리다보니 먼저 답을 찾으셨군요.ㅎ

말씀하신대로 Interval 연산자는 Default Scheduler가 Comptutation입니다.

이는 소스 코드 내부에서도 확인 가능하고, 또한 아래처럼 API Docs 에서도 확인 가능합니다.

오늘은 제가 말씀 드릴게 별로 없군요.ㅎ 다른 궁금한점 있으시면 또 질문주세요.

감사합니다!