rabbitmq 를 설정하고 사용해보자
환경은 다음과 같다
- springboot 2.7.18
- jdk 11.0.21
개발에 앞서 로컬 rabbitmq 구성 방법은 이전 게시물을 참고한다
https://fullmooney.tistory.com/29
rabbitmq 로컬구성과 DLQ 설정
도커데스크탑으로 rabbitmq DLQ 설정을 해보자 환경은 다음과 같다. Windows 10 docker desktop 4.15.0 먼저 dockerhub에서 rabbitmq를 검색해서 오피셜 이미지를 받는다. 테스트는 3.12.12-management 태그로 진행했다.
fullmooney.tistory.com
dependency를 추가한다.
<!--pom.xml -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>3.2.10</version>
</dependency>
application.yml에 configuration에 사용할 properties를 추가한다.
mq:
host: localhost
port: 5672
username: user01
password: pass01
exchange: x.domain.dev
reply:
timeout: 60000
concurrent:
consumers: 4
max:
concurrent:
consumers: 10
attempts: 5
이제 MqConfig 클래스를 생성한다.
@Slf4j
@EnableRabbit
@Configuration
public class MqConfig {
@Value("${mq.host}")
private String host;
@Value("${mq.port}")
private int port;
@Value("${mq.username}")
private String username;
@Value("${mq.password}")
private String password;
@Value("${mq.exchange}")
private String exchange;
@Value("${mq.reply.timeout}")
private Integer replyTimeout;
@Value("${mq.concurrent.consumers}")
private Integer concurrentConsumers;
@Value("${mq.max.concurrent.consumers}")
private Integer maxConcurrentConsumers;
@Value("${mq.max.attempts}")
private Integer maxAttempts;
@Bean
public DirectExchange exchange() {
return new DirectExchange(exchange);
}
@Bean
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean
MessageConverter jsonMessageConverter() {
ObjectMapper objectMapper = new ObjectMapper();
return new Jackson2JsonMessageConverter(objectMapper);
}
@Bean("mqTemplate")
RabbitTemplate mqTemplate(ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jsonMessageConverter());
rabbitTemplate.setReplyTimeout(replyTimeout);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
final SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMessageConverter(jsonMessageConverter());
factory.setConcurrentConsumers(concurrentConsumers);
factory.setMaxConcurrentConsumers(maxConcurrentConsumers);
MessageRecoverer messageRecoverer = new RejectAndDontRequeueRecoverer();
factory.setAdviceChain(
RetryInterceptorBuilder.stateless().maxAttempts(maxAttempts).backOffOptions(3000, 2, 10000)
.recoverer(messageRecoverer).build()); // 3초 간격 2번 10초까지 최대 maxAttempts(3회)까지 retry 이후 exhausted
// 로 전환
factory.setErrorHandler(errorHandler());
return factory;
}
@Bean
ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
}
public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
@Override
public boolean isFatal(Throwable t) {
if (t instanceof ListenerExecutionFailedException) {
ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
if (log.isErrorEnabled()) {
log.error("Failed to process inbound message from queue {}: failed message: {}, {}",
lefe.getFailedMessage().getMessageProperties().getConsumerQueue(), lefe.getFailedMessage(),
t);
}
}
return super.isFatal(t);
}
}
}
MQClient 생성을 위해 exchange는 Bean 생성하고, Queue와 Binding은 스킵한다.
DLQ 사용을 위해 rabbitListenerContainerFactory 에서 setAdviceChain을 통해 RetryInterceptor를 추가해준다. 만약 추가하지 않으면 무한반복되는 listening에러를 볼 수 있다. -_-
이제 publish에 사용할 MQClient를 생성해보자. Template을 사용해도 되지만 공통화 차원에서 Client 클래스로 구성한다.
@Component
public class MQClient {
private final RabbitTemplate mqTemplate;
public MQClient(@Qualifier("mqTemplate") RabbitTemplate mqTemplate) {
this.mqTemplate = mqTemplate;
}
public <T> void publish(final String exchange, final String routingKey, T event) {
try {
if (StringUtils.isEmpty(exchange)) {
throw new NoSuchMethodException("[ERROR] exchange is mandatory. ");
}
if (StringUtils.isEmpty(routingKey)) {
throw new NoSuchMethodException("[ERROR] routing-key is mandatory. ");
}
mqTemplate.convertAndSend(exchange, routingKey, event);
} catch (AmqpException | NoSuchMethodException | SecurityException | IllegalArgumentException e) {
throw new RuntimeException(e.getMessage());
}
}
}
테스트를 위해 MQTestController와 EventVO를 생성한다.
//MQTestController.java
@Slf4j
@RestController
@RequestMapping("/api/async")
public class MQTestController {
private final MQClient mqClient;
public MQTestController(MQClient mqClient) {
this.mqClient = mqClient;
}
@PostMapping("/publish-msg")
public ResponseEntity<Void> publishMsg(@RequestBody EventVO vo) {
mqClient.publish("x.domain.dev", "r.domain.001.dev", vo);
return ResponseEntity.ok().build();
}
}
//EventVO.java
@Getter
@Setter
@NoArgsConstructor
@ToString
public class EventVO {
@Schema(example = "Ligue1")
private String league;
@Schema(example = "PSG")
private String team;
@Schema(example = "Lee Gang-in")
private String name;
private int number;
private int wage;
}
eventVO 를 받아서 처리할.. 처리하지 않고 exception 던지다가 DLQ로 보낼 리스너 클래스도 생성한다.
@Slf4j
@Component
public class MQTestListener {
@RabbitListener(queues = { "q.domain.001.dev" }, group = "g.domain.001.dev")
public void listenMsg(final EventVO event) {
log.debug("", event.toString());
// DLQ 테스트를 위해 일부러 아무 Exception throw
throw new ReadTimeoutException("DLQ를 테스트하자");
}
// DLQ를 통해 publish된 event를 받는다.
@RabbitListener(queues = { "q.domain.001d.dev" }, group = "g.domain.001d.dev")
public void listenDLQMsg(final EventVO event) {
log.debug("welcome to DLQ {}", event.toString());
// TODO DB나 스토리지에 저장하여 후처리 해보자.
}
}
swagger에서 api를 테스트해보자.

비동기 통신으로 publish이후 곧바로 200 응답을 받는다.
MQ 어드민 페이지에서 Message rates를 보면 17:18 즈음 q.domain.001.dev queue에서 먼저 메시지가 들어오고, 이후 (retry 가 되다가) 17:19 즈음 DLQ인 q.domain.001d.dev로 이동함을 확인할 수 있다.


그리고 콘솔 로그에서도 의도한 대로, 찍혔다.

이제 잘 사용해 보자
Git: async/mq at dev · FullMooney/async (github.com)
'개발 > java' 카테고리의 다른 글
| springboot threadLocal 테스트 (1) | 2024.01.22 |
|---|---|
| springboot resttemplate config 와 restClient 생성 (1) | 2024.01.19 |
| springboot + redis @Cacheable 사용 (3) | 2024.01.11 |
| springboot redis client 만들기 (1) | 2024.01.08 |
| Logbook 으로 access log 남기기 (2) | 2024.01.04 |