강의

멘토링

로드맵

인프런 커뮤니티 질문&답변

도등어님의 프로필 이미지
도등어

작성한 질문수

RabbitMQ를 이용한 비동기 아키텍처 한방에 해결하기

동적 큐 이름 설정 방법 및 SimpleRabbitListenerContainerFactory의 재정의에 따른 Retry 설정 미적용 관련 질문입니다.

해결된 질문

작성

·

10

0

안녕하십니까 코드빌런님.

이번 추석 연휴동안 레빗 엠큐 강의 잘 들었습니다.

 

다름이 아니라 강의에서 알려주신 여러 내용을 바탕으로 기존에 구현하였던 redis pub/sub 기반의 알림 기능에 레빗 엠큐를 적용해보고 있습니다.

 

그리고 구현 중 두가지 질문이 있어 질문을 작성하게 되었습니다.

 

1. 동적 큐 이름 설정 방식

우선 알림을 전송하는 과정에 대해 말씀드리면 다음과 같습니다.

  1. 알림 객체 저장

  2. 성공 시 알림 발송

  3. sse 연결들을 ConcurrentMap으로 관리하여 대상 userId의 sse연결을 찾아 해당 연결로 알림 객체 전송

 

현재 메시지 큐 구조는 다음과 같습니다.

 

알림 저장 메시지 생성 (direct exchange, saveNotificationQueue) -> 메시지 저장 성공 시 알림 전달 메시지 생성 (fanout exchange, publishNotificationQueue), 메시지 저장 실패 시 데드레터 큐로 전달

 

현재 서비스는 3개의 인스턴스로 동작하고 있습니다. 이때 알림 저장 큐는 1개라서 복수 저장될 일이 없지만, 그 후에 진행되는 알림 전달의 경우 단일 큐로 작동하면 대상 sse 연결이 없는 인스턴스에서 해당 메시지를 소비하게 되면 전송이 실패합니다. 그래서 각 인스턴스마다 큐를 만들어주고 fanout exchange에 모두 바인딩하여 사용하는 방식으로 만들어야 할 것 같다고 생각하였습니다.

 

그래서 찾아보니 SpEL 기반 동적 큐 이름 지정 방식이 있다고 하여 해당 방식으로 구현해보았습니다.

// RabbitMQConfig.java
// 알림 발송 큐
@Bean
public String dynamicPublishNotificationQueueName() {
    String randomString = UUID.randomUUID().toString();
    return PUBLISH_NOTIFICATION_QUEUE + " : " + randomString;
}

@Bean
public Queue publishNotificationQueue() {
    return new Queue(dynamicPublishNotificationQueueName(), false);
}

@Bean
public FanoutExchange publishNotificationExchange() {
    return new FanoutExchange(PUBLISH_NOTIFICATION_EXCHANGE);
}

@Bean
public Binding publishNotificationBinding() {
    return BindingBuilder.bind(publishNotificationQueue()).to(publishNotificationExchange());
}

// NotificationSubscriber.java
@RabbitListener(queues = "#{@dynamicPublishNotificationQueueName}")
public void consumePublishNotificationMessage(Notification notification) {
    notificationService.publishNotification(notification);
}

해당 방식으로 정상 작동은 확인하였는데, 혹시 해당 방식 외에 더 나은 방식이 있는지 궁금합니다.

2. SimpleRabbitListenerContainerFactory의 재정의에 따른 Retry 설정 미적용

강의 18강에서 application.yml에 retry 관련 프로퍼티를 설정하는 것만으로 자동으로 retry가 적용된다고 하여 해당 방식을 프로젝트에 적용해보았습니다.


spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=1000
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.max-interval=1000

spring.rabbitmq.listener.simple.default-requeue-rejected=false

하지만 어떤 이유인지는 몰라도 retry가 작동하지 않았습니다. 실제로 실행되는 코드에 로그를 찍어봐도 한번만 시도하고 설정한 예외가 발생 후 바로 DLQ로 이동하였습니다.

 

그래서 원인을 찾던 도중

https://inf.run/bsxxr

에서

@RabbitListener를 사용하면 내부적으로 SimpleMessageListenerContainer자동으로 생성되기 때문에 retry 설정을 읽어서 exception 이 발생할 경우 RetryTemplate을 사용해서 자동으로 설정된 속성에 해당하는 작업을 수행하게 됩니다.

라고 코드빌런님이 말씀하신 것을 보았습니다.

확인해보니 메시지큐에서 객체 자동 역직렬화를 위해

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(messageConverter());
    return factory;
}

이렇게 SimpleRabbitListenerContainerFactory를 정의하여 빈으로 등록해놓았는데, SimpleRabbitListenerContainerFactory를 살펴보니

public class SimpleRabbitListenerContainerFactory
		extends AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer> {
...

말씀하신 SimpleMessageListenerContainer를 타입파라미터로 받아 상속받고 있는 형태였습니다.

 

이에 말씀하신 SimpleMessageListenerContainer가 자동으로 생성되어 retry 설정이 적용안되는것인가? 라고 예상하여 application.properties에 정의하는 대신

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(messageConverter());
    factory.setDefaultRequeueRejected(false);
    factory.setAdviceChain(RetryInterceptorBuilder.stateless()
            .maxAttempts(3)
            .backOffOptions(1000, 2.0, 10000)
            .build());
    return factory;
}

이렇게 직접 retry 설정을 넣어주니 그제야 재시도가 정상적으로 작동하였습니다.

 

해당 원인이 제가 생각한 직접 팩토리를 Bean으로 등록하면 application.properties의 retry 설정이 무시되는 것이 맞는지 궁금합니다.

 

코드는 아래 url에서 보실 수 있습니다.

https://github.com/Dockerel/4th-SC-TEAM1-BE/pull/15/files

 

강의 정말 잘 들었습니다! 이렇게 프로젝트에 바로 적용해볼 수 있어서 기분이 좋네요.

나중에 코드빌런님의 다른 기술 스택 강의도 들어보고 싶습니다.

감사합니다.

답변 1

0

코드빌런님의 프로필 이미지
코드빌런
지식공유자

안녕하세요. 수강자님

동적 큐 네임 바인딩과 관련해서는 SpEL도 동작은 하겠으나 (저도 테스트는 안해봤습니다) 제가 이해한 질문은 동적 큐 바인딩을 통해 브로드 캐스트 하는 방식이 맞는지 물어보신거 같습니다.

 


@Configuration
public class RabbitMQConfig {
    
    // Bean으로 큐 이름 저장 (한 번만 생성)
    @Bean
    public String dynamicPublishNotificationQueueName() {
        return PUBLISH_NOTIFICATION_QUEUE + ":" + UUID.randomUUID().toString();
    }
    
    @Bean
    public Queue publishNotificationQueue(
            @Qualifier("dynamicPublishNotificationQueueName") String queueName) {
        //  주입받아서 사용
        return new Queue(queueName, false, true, true);
        //               durable=false, exclusive=true, autoDelete=true
    }
    
    @Bean
    public FanoutExchange publishNotificationExchange() {
        return new FanoutExchange(PUBLISH_NOTIFICATION_EXCHANGE);
    }
    
    @Bean
    public Binding publishNotificationBinding(
            Queue publishNotificationQueue,
            FanoutExchange publishNotificationExchange) {
        return BindingBuilder
            .bind(publishNotificationQueue)
            .to(publishNotificationExchange);
    }
}

@Component
public class NotificationSubscriber {
    
    @RabbitListener(queues = "#{@dynamicPublishNotificationQueueName}")
    public void consumePublishNotificationMessage(Notification notification) {
        notificationService.publishNotification(notification);
    }
}

이런식으로 작성해서 3개의 인스턴스를 바탕으로 테스트 해보시고, 로깅을 추가해서 테스트 해보시면 좋을거 같습니다.

 

두번째 질문의 retry 관련해서는,

SimpleRabbitListenerContainerFactory 를 수동으로 정의하고 RabbitListenerContainerFactoryConfigurer.configure 를 별도로 핸들링하지 않으면 Spring boot의 property가 작동하지 않는다고 않는다고 알고 있습니다. 아래의 코드를 보시고 테스트를 한번 해보시기 바랍니다.

 

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        ConnectionFactory connectionFactory) {
    
    SimpleRabbitListenerContainerFactory factory = 
        new SimpleRabbitListenerContainerFactory();
    
    factory.setConnectionFactory(connectionFactory);
    // configurer.configure()를 호출하지 않음!
    // YAML 설정이 전혀 적용되지 않음
    
    return factory;
}

이 코드는 아래와 같이 정의할 수 있습니다.

@Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory,
            RabbitListenerContainerFactoryConfigurer configurer) { // ⭐ 주입
        
        SimpleRabbitListenerContainerFactory factory = 
            new SimpleRabbitListenerContainerFactory();
        
        // Boot 설정 먼저 적용
        configurer.configure(factory, connectionFactory);
        
        // 추가 커스터마이징 (Boot 설정 덮어쓰기)
        factory.setDefaultRequeueRejected(false);
        factory.setMessageConverter(myCustomConverter());
        
        return factory;
    }

configurer의 configure 에 래빗팩토리와 커넥션 팩토리를 통해 application.yml의 설정을 반영해야 합니다.

 

요건이나 환경이 정확하지 않아서 다소 상세하게 답변 드리기 어려운 점 참고하시기 바랍니다.

 

 

도등어님의 프로필 이미지
도등어

작성한 질문수

질문하기