inflearn logo
강의

Course

Instructor

Kevin's Easy-to-Understand Spring Reactive Web Applications: Reactor Part 2

filterWhen의 차이

349

jchttl

3 asked

1

filter와 달리 filterWhen은 비동기로 동작한다고 했는데,

실제 예제를 돌려보면 동일하게 동기로 돌아가는 것 같습니다.

 

FilterWhenExample01 예제에서 종료 sleep을 10초로 늘려주고, 조건을 3_000 변경 후

public static void main(String[] args) {
    Flux
        .fromIterable(SampleData.coronaVaccineNames)
        /** filterWhen : 데이터를 비동기적으로 filtering 하고 싶을때 사용 */
        .filterWhen(vaccine -> isGreaterThan(vaccine, 3_000))
        .subscribe(Logger::onNext);

    TimeUtils.sleep(10000);
}

 

isGreaterThan 메소드에서 비동기 동작 확인을 위해 sleep 1초를 주었습니다.

public static Mono<Boolean> isGreaterThan(SampleData.CoronaVaccine coronaVaccine, int amount)
{
    TimeUtils.sleep(1000);
    return Mono
            .just(vaccineMap.get(coronaVaccine).getT2() > amount)
            .publishOn(Schedulers.parallel());
}

 

예상 결과로 아래 출력 5건이 1초 후 동시에 나올 것이라 생각했는데 동기와 동일하게 1초당 1건씩 출력이 됩니다.

> Task :FilterWhenExample01.main()

14:37:55.393 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework

14:37:56.494 [parallel-1] INFO com.itvillage.utils.Logger - # onNext(): Pfizer

14:37:57.501 [parallel-2] INFO com.itvillage.utils.Logger - # onNext(): AstraZeneca

14:37:58.513 [parallel-3] INFO com.itvillage.utils.Logger - # onNext(): Moderna

14:37:59.527 [parallel-4] INFO com.itvillage.utils.Logger - # onNext(): Janssen

14:38:00.537 [parallel-5] INFO com.itvillage.utils.Logger - # onNext(): Novavax

 

 

 

좀더 간단한 예제로 아래 코드는 동기적으로 1초당 1건씩 출력되어, filter와의 차이점을 모르겠습니다.

public static void main(String[] args) {
    Flux
        .range(1, 20)
        .filterWhen(num -> {
            TimeUtils.sleep(1000); // 예시를 위해 잠시 대기
            return Mono.just(num % 2 == 0);
        })
        .subscribe(Logger::onNext);
}

filterWhen의 특성을 정확하게 나타낼 수 있는 예제와 설명을 부탁드립니다

감사합니다~

 

 

 

java spring project-reactor

Answer 1

1

Kevin

안녕하세요?

filterWhen() Operator는 사실 처음 Reactor에 입문하는 분들이 학습할 때 혼란스러울 수도 있을 것 같아서 filter() Operator만 설명드리려다가 그래도 간단한 예제로라도 설명을 해보자 라는 생각으로 강의에 추가한 Operator이긴 한데요.

 

아무튼 filterWhen() Operator를 어떤 경우에 사용할 수 있는지에 대해서만 간단하게 설명드리겠습니다.

 


Flux<String> data = Flux.just("data1", "data2", "data3", "data4");

data
    .filterWhen(data -> callExternalService(data)) // 외부 서비스 호출
    .subscribe(filteredData -> System.out.println("Filtered data: " + filteredData));

Mono<Boolean> callExternalService(String data) {
    // 외부 서비스 호출을 Non-Blocking 호출로 시뮬레이션하는 코드
    return WebClient.create()
                    .get()
                    .uri("https://aaa.com/evaluate?data=" + data)
                    .retrieve()
                    .bodyToMono(Boolean.class);
}

 

위 코드를 보시면, filterWhen() 내부에서 callExternalService() 메서드를 호출하고 있는데요.

callExternalService()가 호출되면 WebClient를 이용해 외부 서비스에서 각각의 data의 조건을 평가할 것입니다.

여기서 WebClient로 호출하는 외부 API(aaa.com/evaluate?data=xxx)가 Spring WebFlux 기반이라면 data1부터 data4까지 각각 Non-Blocking 형태로 호출 되어서 동기적으로 조건을 평가하는 것보다 효율성이 향상 될 겁니다.

 

Reactor 만으로 설명하기가 사실상 한계가 있긴한데 그래도 기본 개념은 알려드리고 싶어서 강의에서 filterWhen()에 대한 예제를 추가했었습니다. ^^;

 

강의에서 설명이 부족했다면 양해 부탁드리겠습니다.

나중에 Spring WebFlux 기반의 애플리케이션을 여러 개 돌려 놓고, 테스트 해보시고 결과를 눈으로 확인해 보시면 좋을 것 같아요.

 

감사합니다.

패키지 구분에 대해 궁금한게 있습니다

0

8

1

안녕하세요. 바뀐 채점사이트 관련해서 문의드립니다.

0

19

1

갑자기 채점 사이트가 바뀌었어요

0

19

1

코드 자료

0

25

2

문제 리스트 페이지

0

22

1

part8 Notion 링크

0

23

1

채점 사이트 관련 질문드립니다

0

20

1

인텔리제이 MCP 서버 설정 관련

0

26

1

조회속도 개선에서 더 개선하는 방법이 궁금합니다.

0

28

2

필기자료 사라졌나요?(실기 일주일만에 안돼서 재도전-_-)

0

37

2

servlet과 container에 대한 질문입니다

0

24

1

질문있습니다

0

25

1

1번 문제 질문입니다.

0

31

1

26년 1회 실기 해설 강의

0

51

2

음악플레이어 문제 중 코드질문

0

26

1

잠겨버린 사물함 시간초과 관련 질문입니다.

0

25

1

RepositoryTest의 패키지 위치가 domain인 이유

0

30

2

도메인 모델에서 관계와 규칙을 구분하는 방법

0

37

2

프로젝트 질문 문의

0

45

1

UserService, CertificationService 책임 분리 기준 질문

0

26

1

문제와 풀이4 문제점

0

41

2

window 예제 1번 request(n) * maxSize 부분을 잘 모르겠습니다.

0

58

2

onErrorResume을 사용하지 않는 모든 경우 예외 발생 시, 시퀀스는 종료되나요?

0

232

2

혹시 다음강의부터는 ppt 한번에 묶어서 올려주실수있나요?

0

354

1