38,500원
다른 수강생들이 자주 물어보는 질문이 궁금하신가요?
- 미해결Kevin의 알기 쉬운 RxJava 2부
실무에서 SSE 서비스 구성 시 컴포넌트와 아키텍처 문의
안녕하세요. 예제 코드에서는 Observable만을 사용해 서버에 데이터를 공급해주었는데 실무에서는 어떤 컴포넌트와 아키텍처로 이러한 서비스를 구성하는지 문의드립니다. 메시지 큐를 사용할 것 같기는 한데 이를 Observable과 연동해서 사용하는 건지 아니면 사용하는 메시지 큐의 클라이언트 라이브러리를 사용하는지 알려주시면 감사하겠습니다.
- 해결됨Kevin의 알기 쉬운 RxJava 2부
doOnComplete, doOnError 는 왜 필요한가요?
질문이 있습니다. doOncomplete, doOnError 메서드라는 게 있다는 건 알겠지만, "이게 필요한가?"라는 의문점이 계속 생깁니다. 생산자의 doOnComplete 대신에 소비자의 onComplete 를 써도 될거 같고... 에러도 마찬가지로 생산자의 doOnError 대신에 소비자의 onError 를 쓰면 되는게 아닌가요??
- 미해결Kevin의 알기 쉬운 RxJava 2부
예제에서 Runnable 용도
안녕하세요. dispose 함수에 Runnable 객체가 전달되는데, emitter 완료와 타임아웃 때 처리되는 로직이 runnable로 만 처리해야 하는 건가요? 아직 rx 프로그래밍에 대한 경험이 없다보니 예제 코드가 관례적인 것인지 아닌지 판단하기 어렵습니다.
- 미해결Kevin의 알기 쉬운 RxJava 2부
스프링 웹플럭스 강의는 안하시나요?
스프링 웹플럭스 강의는 안하시나요?
- 해결됨Kevin의 알기 쉬운 RxJava 2부
Schedulers.computation()에 관하여
RxJava 1부 강의 예제 ObservableFromFutureExample.java에서 스케줄러 computation을 적용해보았습니다. (코드 일부를 추가해보았습니다.) - ObservableFromFutureExample.java public class ObservableFromFutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException { Logger.log(LogType.PRINT, "# start time : "); TimeUtil.start(); // 긴 처리 시간이 걸리는 작업 Future<Double> future = longTimeWork(); // 짧은 처리 시간이 걸리는 작업 shortTimeWork(); Observable.fromFuture(future) .subscribeOn(Schedulers.computation()) .subscribe(data -> Logger.log(LogType.PRINT, "# 긴 처리 시간 작업 결과 : " + data)); TimeUtil.end(); TimeUtil.takeTime(); Logger.log(LogType.PRINT, "# end time" ); } public static CompletableFuture<Double> longTimeWork(){ return CompletableFuture.supplyAsync(() -> calculate()); } private static Double calculate() { Logger.log(LogType.PRINT, "# 긴 처리 시간이 걸리는 작업 중........."); TimeUtil.sleep(6000L); return 100000000000000000.0; } private static void shortTimeWork() { TimeUtil.sleep(3000L); Logger.log(LogType.PRINT, "# 짧은 처리 시간 작업 완료!"); }} - Result print() | main | 02:35:32.127 | # start time : print() | ForkJoinPool.commonPool-worker-3 | 02:35:32.136 | # 긴 처리 시간이 걸리는 작업 중......... print() | main | 02:35:35.151 | # 짧은 처리 시간 작업 완료! # 실행시간: 3120 ms print() | main | 02:35:35.250 | # end time 1. Schedulers.computation()를 적용하면 subscribe( ) 메소드에서 구독자에게 발행되는 데이터 결과 값이 출력되지 않는 이유를 알고 싶습니다. 2. Schedules.computation()을 적용했음에도 불구하고 위 결과처럼 'ForkJoinPool.commonPool-worker-3'라는 스레드에서 실행되는 이유를 알고 싶습니다! * 2. 자문자답CompletableFuture.java를 살펴보았더니, supplyAsync( ) 메소드의 기본 실행자가 ForkJoinPool.commonPool()이라는 설명을 찾았습니다. - CompletableFuture.java /** * Default executor -- ForkJoinPool.commonPool() unless it cannot * support parallelism. */private static final Executor ASYNC_POOL = USE_COMMON_POOL ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 그렇다면, fromFuture( ) 및 CompletableFuture를 이용하는 경우 subscribeOn( ) 함수로 굳이 스케줄러를 지정하지 않아도 되는건가요?
- 해결됨Kevin의 알기 쉬운 RxJava 2부
조금 더 자세한 설명부탁드립니다
해당 강의에서 추가 설명에 대해 이해가 되지 않아서 조금 더 자세한 설명 부탁드립니다. 우선, 제가 임의로 아래 코드처럼 main 스레드를 0.5초 동안 일시 정지를 적용 시켜보았더니 정상적으로 동작했습니다. - UnitTestNotByRxJava.java (Test) public class UnitTestNotByRxJava { @Test public void getCarMakerStreamSyncTest(){ List<CarMaker> carMakerList = new ArrayList<>(); SampleObservable.getCarMakerStream() .subscribe(data -> carMakerList.add(data)); TimeUtil.sleep(500L); assertThat(carMakerList.size(), is(5)); }} - SampleObservable.java public class SampleObservable { ... public static Observable<CarMaker> getCarMakerStream() { Observable<CarMaker> observable = Observable.fromArray(SampleData.carMakers) .subscribeOn(Schedulers.computation()); return observable; } ...} 즉, 기존 예제에서 오류가 나는 이유가, main 스레드와 호출 함수 getCarMakerStream 내 'RxComputationThreadPool' 스레드가 동시에 실행이 되어 그 짧은 시간 사이에 main 스레드에서 결과 값을 리턴받지 못하기 때문이다라고 이해하면 될까요?
- 해결됨Kevin의 알기 쉬운 RxJava 2부
DoOnDisposeExample 예제에서의 스레드 관련 질문 (자문자답)
안녕하세요, 해당 강의 이후에 있는 강의 중(RxJava : doOnEach, doOnCancel/doOnDispose, 그 밖의 doXXXXX 함수) 예제 DoOnDisposeExample.java를 다루고 있는데요 subscribe( )에서 오버라이딩된 onSubscribe( ) 내에 로그 출력 코드를 추가해보았습니다 - DoOnDisposeExample.java public class DoOnDisposeExample { public static void main(String[] args) { Observable.fromArray(SampleData.carMakers) .zipWith( Observable.interval(300L, TimeUnit.MILLISECONDS), (carMaker, num) -> carMaker ) .doOnDispose(() -> Logger.log(LogType.DO_ON_DISPOSE, "# 생산자: 구독 해지 완료")) .subscribe(new Observer<CarMaker>() { private Disposable disposable; private long startTime; @Override public void onSubscribe(Disposable disposable) { this.disposable = disposable; this.startTime = TimeUtil.start(); SimpleDateFormat date = new SimpleDateFormat("HH:mm:ss.SSS"); Logger.log(LogType.PRINT, "시작 시간 확인 : " + date.format(new Date(startTime))); } @Override public void onNext(CarMaker carMaker) { Logger.log(LogType.ON_NEXT, carMaker); if(TimeUtil.getCurrentTime() - startTime > 1000L){ Logger.log(LogType.PRINT, "# 소비자: 구독 해지 , 1000L 초과"); disposable.dispose(); } } @Override public void onError(Throwable error) { Logger.log(LogType.ON_ERROR, error); } @Override public void onComplete() { Logger.log(LogType.ON_COMPLETE); } }); TimeUtil.sleep(2000L); }} - result 구독자가 Observable을 구독할 때 스케줄러를 별도로 지정하지 않았으니 main 스레드에서 동작을 하는거라고 이해가 가는데, 나머지 onNext( ) 메소드를 처음 호출하는 부분부터 어째서 'Schedulers.computation()'을 지정하지 않았음에도 RxComputationThreadPool-1이 사용하는지 이해가 되지 않습니다. * 자문자답 :Observable의 interval( ) 메소드에서 Schedulers.computation()을 지정하는 코드를 확인했습니다! - Observable.java /** * Returns an Observable that emits a sequential number every specified interval of time. * <p> * <img width="640" height="195" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/interval.png" alt=""> * <dl> * <dt><b>Scheduler:</b></dt> * <dd>{@code interval} operates by default on the {@code computation} {@link Scheduler}.</dd> * </dl> * * @param period * the period size in time units (see below) * @param unit * time units to use for the interval size * @return an Observable that emits a sequential number each time interval * @see <a href="http://reactivex.io/documentation/operators/interval.html">ReactiveX operators documentation: Interval</a> */@CheckReturnValue@SchedulerSupport(SchedulerSupport.COMPUTATION)public static Observable<Long> interval(long period, TimeUnit unit) { return interval(period, period, unit, Schedulers.computation());}
- 미해결Kevin의 알기 쉬운 RxJava 2부
SSE 실습 소스코드 위치 문의
안녕하세요. 좋은 강의 감사드립니다. 마지막 SSE 실습 소스코드 위치 문의드립니다. 알려주신 Github에서는 보이지 않습니다. https://github.com/ITVillage-Kevin/rxjava-episode2 * 실습 파일명 예시: RxJavaPracticeApplication.java 감사합니다.
- 미해결Kevin의 알기 쉬운 RxJava 2부
blockingXXXX 함수들은 테스트용도로만 사용하는건가요?
안녕하세요! 강의를 보다가 고민없이 생겨난 궁금증인데... blockingXXXX 함수들은 다 테스트 용도로만 사용하는 건가요? 아니면 실제 서비스에서도 사용될 수 있는건가요?