springboot rabbitmq config와 DLQ 예제

2024. 1. 18. 09:17·개발/java

rabbitmq 를 설정하고 사용해보자

 

환경은 다음과 같다

  • springboot 2.7.18
  • jdk 11.0.21

개발에 앞서 로컬 rabbitmq 구성 방법은 이전 게시물을 참고한다

 

https://fullmooney.tistory.com/29

 

rabbitmq 로컬구성과 DLQ 설정

도커데스크탑으로 rabbitmq DLQ 설정을 해보자 환경은 다음과 같다. Windows 10 docker desktop 4.15.0 먼저 dockerhub에서 rabbitmq를 검색해서 오피셜 이미지를 받는다. 테스트는 3.12.12-management 태그로 진행했다.

fullmooney.tistory.com

 

dependency를 추가한다.

<!--pom.xml -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    <version>3.2.10</version>
</dependency>

 

application.yml에 configuration에 사용할 properties를 추가한다.

mq:
  host: localhost
  port: 5672
  username: user01
  password: pass01
  exchange: x.domain.dev
  reply: 
    timeout: 60000
  concurrent:
    consumers: 4
  max:
    concurrent:
      consumers: 10
    attempts: 5

 

이제 MqConfig 클래스를 생성한다.

@Slf4j
@EnableRabbit
@Configuration
public class MqConfig {
    @Value("${mq.host}")
    private String host;
    @Value("${mq.port}")
    private int port;
    @Value("${mq.username}")
    private String username;
    @Value("${mq.password}")
    private String password;
    @Value("${mq.exchange}")
    private String exchange;
    @Value("${mq.reply.timeout}")
    private Integer replyTimeout;
    @Value("${mq.concurrent.consumers}")
    private Integer concurrentConsumers;
    @Value("${mq.max.concurrent.consumers}")
    private Integer maxConcurrentConsumers;
    @Value("${mq.max.attempts}")
    private Integer maxAttempts;

    @Bean
    public DirectExchange exchange() {
	return new DirectExchange(exchange);
    }

    @Bean
    ConnectionFactory connectionFactory() {
	CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
	connectionFactory.setHost(host);
	connectionFactory.setPort(port);
	connectionFactory.setVirtualHost("/");
	connectionFactory.setUsername(username);
	connectionFactory.setPassword(password);
	return connectionFactory;
    }

    @Bean
    MessageConverter jsonMessageConverter() {
	ObjectMapper objectMapper = new ObjectMapper();
	return new Jackson2JsonMessageConverter(objectMapper);
    }

    @Bean("mqTemplate")
    RabbitTemplate mqTemplate(ConnectionFactory connectionFactory) {
	final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
	rabbitTemplate.setMessageConverter(jsonMessageConverter());
	rabbitTemplate.setReplyTimeout(replyTimeout);
	rabbitTemplate.setUseDirectReplyToContainer(false);
	return rabbitTemplate;
    }

    @Bean
    AmqpAdmin amqpAdmin() {
	return new RabbitAdmin(connectionFactory());
    }

    @Bean
    SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
	final SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
	factory.setConnectionFactory(connectionFactory());
	factory.setMessageConverter(jsonMessageConverter());
	factory.setConcurrentConsumers(concurrentConsumers);
	factory.setMaxConcurrentConsumers(maxConcurrentConsumers);

	MessageRecoverer messageRecoverer = new RejectAndDontRequeueRecoverer();
	factory.setAdviceChain(
		RetryInterceptorBuilder.stateless().maxAttempts(maxAttempts).backOffOptions(3000, 2, 10000)
			.recoverer(messageRecoverer).build()); // 3초 간격 2번 10초까지 최대 maxAttempts(3회)까지 retry 이후 exhausted
							       // 로 전환
	factory.setErrorHandler(errorHandler());
	return factory;
    }

    @Bean
    ErrorHandler errorHandler() {
	return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
    }

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
	@Override
	public boolean isFatal(Throwable t) {
	    if (t instanceof ListenerExecutionFailedException) {
		ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
		if (log.isErrorEnabled()) {

		    log.error("Failed to process inbound message from queue {}: failed message: {}, {}",
			    lefe.getFailedMessage().getMessageProperties().getConsumerQueue(), lefe.getFailedMessage(),
			    t);
		}
	    }
	    return super.isFatal(t);
	}
    }
}

 

MQClient 생성을 위해 exchange는 Bean 생성하고, Queue와 Binding은 스킵한다.

DLQ 사용을 위해 rabbitListenerContainerFactory 에서 setAdviceChain을 통해 RetryInterceptor를 추가해준다. 만약 추가하지 않으면 무한반복되는 listening에러를 볼 수 있다. -_-

 

이제 publish에 사용할 MQClient를 생성해보자. Template을 사용해도 되지만 공통화 차원에서 Client 클래스로 구성한다.

@Component
public class MQClient {

    private final RabbitTemplate mqTemplate;

    public MQClient(@Qualifier("mqTemplate") RabbitTemplate mqTemplate) {
	this.mqTemplate = mqTemplate;
    }

    public <T> void publish(final String exchange, final String routingKey, T event) {

	try {

	    if (StringUtils.isEmpty(exchange)) {
		throw new NoSuchMethodException("[ERROR] exchange is mandatory. ");
	    }

	    if (StringUtils.isEmpty(routingKey)) {
		throw new NoSuchMethodException("[ERROR] routing-key is mandatory. ");
	    }

	    mqTemplate.convertAndSend(exchange, routingKey, event);

	} catch (AmqpException | NoSuchMethodException | SecurityException | IllegalArgumentException e) {
	    throw new RuntimeException(e.getMessage());
	}

    }
}

 

테스트를 위해 MQTestController와 EventVO를 생성한다.

//MQTestController.java
@Slf4j
@RestController
@RequestMapping("/api/async")
public class MQTestController {
    
    private final MQClient mqClient;

    public MQTestController(MQClient mqClient) {
	this.mqClient = mqClient;
    }

    @PostMapping("/publish-msg")
    public ResponseEntity<Void> publishMsg(@RequestBody EventVO vo) {
        
	mqClient.publish("x.domain.dev", "r.domain.001.dev", vo);

	return ResponseEntity.ok().build();
    }

}


//EventVO.java
@Getter
@Setter
@NoArgsConstructor
@ToString
public class EventVO {
    
    @Schema(example = "Ligue1")
    private String league;
    @Schema(example = "PSG")
    private String team;
    @Schema(example = "Lee Gang-in")
    private String name;
    private int number;
    private int wage;
}

 

eventVO 를 받아서 처리할.. 처리하지 않고 exception 던지다가 DLQ로 보낼 리스너 클래스도 생성한다.

@Slf4j
@Component
public class MQTestListener {

    @RabbitListener(queues = { "q.domain.001.dev" }, group = "g.domain.001.dev")
    public void listenMsg(final EventVO event) {
	log.debug("", event.toString());
	// DLQ 테스트를 위해 일부러 아무 Exception throw
	throw new ReadTimeoutException("DLQ를 테스트하자");
    }

    // DLQ를 통해 publish된 event를 받는다.
    @RabbitListener(queues = { "q.domain.001d.dev" }, group = "g.domain.001d.dev")
    public void listenDLQMsg(final EventVO event) {
	log.debug("welcome to DLQ {}", event.toString());
	// TODO DB나 스토리지에 저장하여 후처리 해보자.
    }
}

 

swagger에서 api를 테스트해보자.

 

비동기 통신으로 publish이후 곧바로 200 응답을 받는다.

 

MQ 어드민 페이지에서 Message rates를 보면 17:18 즈음 q.domain.001.dev queue에서 먼저 메시지가 들어오고, 이후 (retry 가 되다가) 17:19 즈음 DLQ인 q.domain.001d.dev로 이동함을 확인할 수 있다.

 

그리고 콘솔 로그에서도 의도한 대로, 찍혔다.

 

이제 잘 사용해 보자

 

Git: async/mq at dev · FullMooney/async (github.com)

 

728x90

'개발 > java' 카테고리의 다른 글

springboot threadLocal 테스트  (1) 2024.01.22
springboot resttemplate config 와 restClient 생성  (1) 2024.01.19
springboot + redis @Cacheable 사용  (3) 2024.01.11
springboot redis client 만들기  (1) 2024.01.08
Logbook 으로 access log 남기기  (2) 2024.01.04
'개발/java' 카테고리의 다른 글
  • springboot threadLocal 테스트
  • springboot resttemplate config 와 restClient 생성
  • springboot + redis @Cacheable 사용
  • springboot redis client 만들기
yunapapa
yunapapa
working on the cloud
    250x250
  • yunapapa
    supermoon
    yunapapa
  • 전체
    오늘
    어제
    • 분류 전체보기 (94)
      • 개발 (20)
        • java (17)
        • web (2)
        • MSX (1)
        • Go (0)
      • CloudNative (50)
        • App Definition & Developeme.. (17)
        • Orchestration & Management (4)
        • Runtime (3)
        • Provisioning (7)
        • Observability & Analysis (14)
        • event review (5)
      • AWS (7)
      • 환경관련 (17)
      • 취미생활 (0)
        • 맛집 (0)
        • 게임 (0)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

    • CNCF Past Events
    • Kubernetes Korea Group
  • 공지사항

  • 인기 글

  • 태그

    gitlab
    Pinpoint
    OpenShift
    k8s
    Java
    helm
    티스토리챌린지
    kubernetes
    springboot
    AWS
    istio
    dop-c02
    devops
    오블완
    APM
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
yunapapa
springboot rabbitmq config와 DLQ 예제
상단으로

티스토리툴바