DoOnDisposeExample 예제에서의 스레드 관련 질문 (자문자답)
안녕하세요,
해당 강의 이후에 있는 강의 중(RxJava : doOnEach, doOnCancel/doOnDispose, 그 밖의 doXXXXX 함수) 예제 DoOnDisposeExample.java를 다루고 있는데요
subscribe( )에서 오버라이딩된 onSubscribe( ) 내에 로그 출력 코드를 추가해보았습니다
- DoOnDisposeExample.java
public class DoOnDisposeExample {
public static void main(String[] args) {
Observable.fromArray(SampleData.carMakers)
.zipWith(
Observable.interval(300L, TimeUnit.MILLISECONDS),
(carMaker, num) -> carMaker
)
.doOnDispose(() -> Logger.log(LogType.DO_ON_DISPOSE, "# 생산자: 구독 해지 완료"))
.subscribe(new Observer<CarMaker>() {
private Disposable disposable;
private long startTime;
@Override
public void onSubscribe(Disposable disposable) {
this.disposable = disposable;
this.startTime = TimeUtil.start();
SimpleDateFormat date = new SimpleDateFormat("HH:mm:ss.SSS");
Logger.log(LogType.PRINT, "시작 시간 확인 : " + date.format(new Date(startTime)));
}
@Override
public void onNext(CarMaker carMaker) {
Logger.log(LogType.ON_NEXT, carMaker);
if(TimeUtil.getCurrentTime() - startTime > 1000L){
Logger.log(LogType.PRINT, "# 소비자: 구독 해지 , 1000L 초과");
disposable.dispose();
}
}
@Override
public void onError(Throwable error) {
Logger.log(LogType.ON_ERROR, error);
}
@Override
public void onComplete() {
Logger.log(LogType.ON_COMPLETE);
}
});
TimeUtil.sleep(2000L);
}
}
- result
구독자가 Observable을 구독할 때 스케줄러를 별도로 지정하지 않았으니 main 스레드에서 동작을 하는거라고 이해가 가는데,
나머지 onNext( ) 메소드를 처음 호출하는 부분부터 어째서 'Schedulers.computation()'을 지정하지 않았음에도 RxComputationThreadPool-1이 사용하는지 이해가 되지 않습니다.
* 자문자답 :
Observable의 interval( ) 메소드에서 Schedulers.computation()을 지정하는 코드를 확인했습니다!
- Observable.java
/**
* Returns an Observable that emits a sequential number every specified interval of time.
* <p>
* <img width="640" height="195" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/interval.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code interval} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param period
* the period size in time units (see below)
* @param unit
* time units to use for the interval size
* @return an Observable that emits a sequential number each time interval
* @see <a href="http://reactivex.io/documentation/operators/interval.html">ReactiveX operators documentation: Interval</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public static Observable<Long> interval(long period, TimeUnit unit) {
return interval(period, period, unit, Schedulers.computation());
}
Answer 1
1
네, 제가 답변을 늦게 드리다보니 먼저 답을 찾으셨군요.ㅎ
말씀하신대로 Interval 연산자는 Default Scheduler가 Comptutation입니다.
이는 소스 코드 내부에서도 확인 가능하고, 또한 아래처럼 API Docs 에서도 확인 가능합니다.

오늘은 제가 말씀 드릴게 별로 없군요.ㅎ 다른 궁금한점 있으시면 또 질문주세요.
감사합니다!
Single과 관련해 여쭤보고 싶은 부분이 있습니다!
0
264
2
cold/hot publisher 예제 코드와 관련해 질문 드립니다.
0
363
1
CompletableObserver 클래스의 람다식 표현관련
0
297
1
1강에 예시로 보여주신 ToDoSample 코드에 관해 질문 드립니다!
0
356
1
[질문] cascading operator 설명하실 때
2
614
1
_get 함수 부분이 너무 이해가 안갑니다 ㅠ
0
656
2
초급자 질문
0
501
1
CachedNetworkImage 로 인한 memory leak 문제..
0
943
2
병렬평가 질문 있습니다!
0
428
1
함수를 분리하지 않고 실행하게 되면 순환참조 오류가 발생합니다...
0
621
1
rxjs 와 차이점
0
567
1
거르기 함수 중 compact 질문드립니다.
0
475
1
return 있고 없고 차이 질문드립니다...!
0
1062
3
강의 잘봤습니당
0
452
1
이것이 왜 실행이 안되는지 여쭙고 싶네요.
0
2792
1
초급강의를 수강할때 질문입니다.
0
469
1
실무에서 SSE 서비스 구성 시 컴포넌트와 아키텍처 문의
0
421
1
doOnComplete, doOnError 는 왜 필요한가요?
0
807
1
예제에서 Runnable 용도
0
230
1
스프링 웹플럭스 강의는 안하시나요?
0
1232
1
Schedulers.computation()에 관하여
0
471
5
조금 더 자세한 설명부탁드립니다
0
316
2
SSE 실습 소스코드 위치 문의
1
486
1
blockingXXXX 함수들은 테스트용도로만 사용하는건가요?
2
224
1

