해결된 질문
작성
·
10
0
안녕하십니까 코드빌런님.
이번 추석 연휴동안 레빗 엠큐 강의 잘 들었습니다.
다름이 아니라 강의에서 알려주신 여러 내용을 바탕으로 기존에 구현하였던 redis pub/sub 기반의 알림 기능에 레빗 엠큐를 적용해보고 있습니다.
그리고 구현 중 두가지 질문이 있어 질문을 작성하게 되었습니다.
우선 알림을 전송하는 과정에 대해 말씀드리면 다음과 같습니다.
알림 객체 저장
성공 시 알림 발송
sse 연결들을 ConcurrentMap으로 관리하여 대상 userId의 sse연결을 찾아 해당 연결로 알림 객체 전송
현재 메시지 큐 구조는 다음과 같습니다.
알림 저장 메시지 생성 (direct exchange, saveNotificationQueue) -> 메시지 저장 성공 시 알림 전달 메시지 생성 (fanout exchange, publishNotificationQueue), 메시지 저장 실패 시 데드레터 큐로 전달
현재 서비스는 3개의 인스턴스로 동작하고 있습니다. 이때 알림 저장 큐는 1개라서 복수 저장될 일이 없지만, 그 후에 진행되는 알림 전달의 경우 단일 큐로 작동하면 대상 sse 연결이 없는 인스턴스에서 해당 메시지를 소비하게 되면 전송이 실패합니다. 그래서 각 인스턴스마다 큐를 만들어주고 fanout exchange에 모두 바인딩하여 사용하는 방식으로 만들어야 할 것 같다고 생각하였습니다.
그래서 찾아보니 SpEL 기반 동적 큐 이름 지정 방식이 있다고 하여 해당 방식으로 구현해보았습니다.
// RabbitMQConfig.java
// 알림 발송 큐
@Bean
public String dynamicPublishNotificationQueueName() {
String randomString = UUID.randomUUID().toString();
return PUBLISH_NOTIFICATION_QUEUE + " : " + randomString;
}
@Bean
public Queue publishNotificationQueue() {
return new Queue(dynamicPublishNotificationQueueName(), false);
}
@Bean
public FanoutExchange publishNotificationExchange() {
return new FanoutExchange(PUBLISH_NOTIFICATION_EXCHANGE);
}
@Bean
public Binding publishNotificationBinding() {
return BindingBuilder.bind(publishNotificationQueue()).to(publishNotificationExchange());
}
// NotificationSubscriber.java
@RabbitListener(queues = "#{@dynamicPublishNotificationQueueName}")
public void consumePublishNotificationMessage(Notification notification) {
notificationService.publishNotification(notification);
}
해당 방식으로 정상 작동은 확인하였는데, 혹시 해당 방식 외에 더 나은 방식이 있는지 궁금합니다.
강의 18강에서 application.yml에 retry 관련 프로퍼티를 설정하는 것만으로 자동으로 retry가 적용된다고 하여 해당 방식을 프로젝트에 적용해보았습니다.
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=1000
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.max-interval=1000
spring.rabbitmq.listener.simple.default-requeue-rejected=false
하지만 어떤 이유인지는 몰라도 retry가 작동하지 않았습니다. 실제로 실행되는 코드에 로그를 찍어봐도 한번만 시도하고 설정한 예외가 발생 후 바로 DLQ로 이동하였습니다.
그래서 원인을 찾던 도중
에서
@RabbitListener를 사용하면 내부적으로 SimpleMessageListenerContainer가자동으로 생성되기 때문에 retry 설정을 읽어서 exception 이 발생할 경우 RetryTemplate을 사용해서 자동으로 설정된 속성에 해당하는 작업을 수행하게 됩니다.
라고 코드빌런님이 말씀하신 것을 보았습니다.
확인해보니 메시지큐에서 객체 자동 역직렬화를 위해
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(messageConverter());
return factory;
}
이렇게 SimpleRabbitListenerContainerFactory를 정의하여 빈으로 등록해놓았는데, SimpleRabbitListenerContainerFactory를 살펴보니
public class SimpleRabbitListenerContainerFactory
extends AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer> {
...
말씀하신 SimpleMessageListenerContainer를 타입파라미터로 받아 상속받고 있는 형태였습니다.
이에 말씀하신 SimpleMessageListenerContainer가 자동으로 생성되어 retry 설정이 적용안되는것인가? 라고 예상하여 application.properties에 정의하는 대신
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(messageConverter());
factory.setDefaultRequeueRejected(false);
factory.setAdviceChain(RetryInterceptorBuilder.stateless()
.maxAttempts(3)
.backOffOptions(1000, 2.0, 10000)
.build());
return factory;
}
이렇게 직접 retry 설정을 넣어주니 그제야 재시도가 정상적으로 작동하였습니다.
해당 원인이 제가 생각한 직접 팩토리를 Bean으로 등록하면 application.properties의 retry 설정이 무시되는 것이 맞는지 궁금합니다.
코드는 아래 url에서 보실 수 있습니다.
https://github.com/Dockerel/4th-SC-TEAM1-BE/pull/15/files
강의 정말 잘 들었습니다! 이렇게 프로젝트에 바로 적용해볼 수 있어서 기분이 좋네요.
나중에 코드빌런님의 다른 기술 스택 강의도 들어보고 싶습니다.
감사합니다.
답변 1
0
안녕하세요. 수강자님
동적 큐 네임 바인딩과 관련해서는 SpEL도 동작은 하겠으나 (저도 테스트는 안해봤습니다) 제가 이해한 질문은 동적 큐 바인딩을 통해 브로드 캐스트 하는 방식이 맞는지 물어보신거 같습니다.
@Configuration
public class RabbitMQConfig {
// Bean으로 큐 이름 저장 (한 번만 생성)
@Bean
public String dynamicPublishNotificationQueueName() {
return PUBLISH_NOTIFICATION_QUEUE + ":" + UUID.randomUUID().toString();
}
@Bean
public Queue publishNotificationQueue(
@Qualifier("dynamicPublishNotificationQueueName") String queueName) {
// 주입받아서 사용
return new Queue(queueName, false, true, true);
// durable=false, exclusive=true, autoDelete=true
}
@Bean
public FanoutExchange publishNotificationExchange() {
return new FanoutExchange(PUBLISH_NOTIFICATION_EXCHANGE);
}
@Bean
public Binding publishNotificationBinding(
Queue publishNotificationQueue,
FanoutExchange publishNotificationExchange) {
return BindingBuilder
.bind(publishNotificationQueue)
.to(publishNotificationExchange);
}
}
@Component
public class NotificationSubscriber {
@RabbitListener(queues = "#{@dynamicPublishNotificationQueueName}")
public void consumePublishNotificationMessage(Notification notification) {
notificationService.publishNotification(notification);
}
}
이런식으로 작성해서 3개의 인스턴스를 바탕으로 테스트 해보시고, 로깅을 추가해서 테스트 해보시면 좋을거 같습니다.
두번째 질문의 retry 관련해서는,
SimpleRabbitListenerContainerFactory 를 수동으로 정의하고 RabbitListenerContainerFactoryConfigurer.configure 를 별도로 핸들링하지 않으면 Spring boot의 property가 작동하지 않는다고 않는다고 알고 있습니다. 아래의 코드를 보시고 테스트를 한번 해보시기 바랍니다.
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// configurer.configure()를 호출하지 않음!
// YAML 설정이 전혀 적용되지 않음
return factory;
}
이 코드는 아래와 같이 정의할 수 있습니다.
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
RabbitListenerContainerFactoryConfigurer configurer) { // ⭐ 주입
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
// Boot 설정 먼저 적용
configurer.configure(factory, connectionFactory);
// 추가 커스터마이징 (Boot 설정 덮어쓰기)
factory.setDefaultRequeueRejected(false);
factory.setMessageConverter(myCustomConverter());
return factory;
}
configurer의 configure 에 래빗팩토리와 커넥션 팩토리를 통해 application.yml의 설정을 반영해야 합니다.
요건이나 환경이 정확하지 않아서 다소 상세하게 답변 드리기 어려운 점 참고하시기 바랍니다.