SpringBoot 是在 Spring AMQP 上面再次封装了一层,不需要再像 Spring AMQP 一样注入各个组件 Bean, 只需要在配置文件上配置好 RabbitMQ 属性,SpringBoot 就可以自动注入了。
而使用 @RabbitListener 注解可以轻松实现消费端事件监听处理。
一、项目配置
1.1 添加 Spring AMQP 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.1.RELEASE</version>
</dependency>
1.2 配置属性
在 /resources/application.properties 文件中申明属性:
# rabbitmq 配置
## 主机地址
spring.rabbitmq.host=111.231.83.100
## 端口号
spring.rabbitmq.port=5672
## 虚拟主机路径
spring.rabbitmq.virtual-host=/
## 连接超时时间
spring.rabbitmq.connection-timeout=15000
## 消费者设置手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
## 消费者每次消费数量
spring.rabbitmq.listener.simple.concurrency=1
## 最大消费者数量
spring.rabbitmq.listener.simple.max-concurrency=5
# 自定义属性
## 队列名称
spring.rabbitmq.listener.order.queue.name=orderQueue
## 队列是否持久化
spring.rabbitmq.listener.order.queue.durable=true
## 交换机名称
spring.rabbitmq.listener.order.exchange.name=orderExchange
## 交换机是否持久化
spring.rabbitmq.listener.order.exchange.durable=true
## 交换机类型
spring.rabbitmq.listener.order.exchange.type=topic
## 是否忽略异常
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
## 路由 key
spring.rabbitmq.listener.order.routingKey=order#
相关属性表:
属性名 | 说明 | 默认值 |
---|---|---|
spring.rabbitmq.address | 客户端连接的地址,有多个的时候使用逗号分隔,该地址可以是IP与Port的结合 | |
spring.rabbitmq.cache.channel.checkout-timeout | 当缓存已满时,获取Channel的等待时间,单位为毫秒 | |
spring.rabbitmq.cache.channel.size | 缓存中保持的Channel数量 | |
spring.rabbitmq.cache.connection.mode | 连接缓存的模式 | CHANNEL |
spring.rabbitmq.cache.connection.size | 缓存的连接数 | |
spring.rabbitmq.connnection-timeout | 连接超时参数单位为毫秒:设置为“0”代表无穷大 | |
spring.rabbitmq.dynamic | 默认创建一个AmqpAdmin的Bean | true |
spring.rabbitmq.host | RabbitMQ的主机地址 | localhost |
spring.rabbitmq.listener.acknowledge-mode | 容器的acknowledge模式 | |
spring.rabbitmq.listener.auto-startup | 启动时自动启动容器 | true |
spring.rabbitmq.listener.concurrency | 消费者的最小数量 | |
spring.rabbitmq.listener.default-requeue-rejected | 投递失败时是否重新排队 | true |
spring.rabbitmq.listener.max-concurrency | 消费者的最大数量 | |
spring.rabbitmq.listener.prefetch | 在单个请求中处理的消息个数,他应该大于等于事务数量 | |
spring.rabbitmq.listener.retry.enabled | 是否开启重试机制,默认不开启 | false |
spring.rabbitmq.listener.retry.initial-interval | 第一次与第二次投递尝试的时间间隔 | 1000 |
spring.rabbitmq.listener.retry.max-attempts | 尝试投递消息的最大数量 | 3 |
spring.rabbitmq.listener.retry.max-interval | 两次尝试的最大时间间隔 | 10000 |
spring.rabbitmq.listener.retry.multiplier | 上一次尝试时间间隔的乘数 | 1.0 |
spring.rabbitmq.listener.retry.stateless | 不论重试是有状态的还是无状态的 | true |
spring.rabbitmq.listener.transaction-size | 在一个事务中处理的消息数量。为了获得最佳效果,该值应设置为小于等于每个请求中处理的消息个数,即spring.rabbitmq.listener.prefetch的值 | |
spring.rabbitmq.password | 登录到RabbitMQ的密码 | |
spring.rabbitmq.port | RabbitMQ的端口号 | 5672 |
spring.rabbitmq.publisher-confirms | 开启Publisher Confirm机制 | false |
spring.rabbitmq.publisher-returns | 开启publisher Return机制 | false |
spring.rabbitmq.requested-heartbeat | 请求心跳超时时间,单位为秒 | |
spring.rabbitmq.ssl.enabled | 启用SSL支持 | false |
spring.rabbitmq.ssl.key-store | 保存Sspring.rabbitmq.listener.simple.acknowledge-modeSL证书的地址 | |
spring.rabbitmq.ssl.key-store-password | 访问SSL证书的地址使用的密码 | |
spring.rabbitmq.ssl.trust-store | SL的可信地址 | |
spring.rabbitmq.ssl.trust-store-password | 访问SSL的可信地址的密码 | |
spring.rabbitmq.ssl.algorithm | SSL算法,默认使用Rabbit的客户端算法库 | |
spring.rabbitmq.template.mandatory | 启用强制信息 | false |
spring.rabbitmq.template.receive-timeout | receive()方法的超时时间 | 0 |
spring.rabbitmq.template.reply-timeout | sendAndReceive()方法的超时时间 | 5000 |
spring.rabbitmq.template.retry.enabled | 设置为true的时候RabbitTemplate能够实现重试 | false |
spring.rabbitmq.template.retry.initial-interval | 第一次与第二次发布消息的时间间隔 | 1000 |
spring.rabbitmq.template.retry.max-attempts | 尝试发布消息的最大数量 | 3 |
spring.rabbitmq.template.retry.max-interval | 尝试发布消息的最大时间间隔 | 10000 |
spring.rabbitmq.template.retry.multiplier | 上一次尝试时间间隔的乘数 | 1.0 |
spring.rabbitmq.username | 登录到RabbitMQ的用户名 | |
spring.rabbitmq.virtual-host | 连接到RabbitMQ的虚拟主机 |
1.3 开启 @RabbitListener 注解
-
通过 @EnableRabbit 注解来启用 @RabbitListener。
-
@RabbitListener 注解指定了目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者 Binding 。使用@RabbitListener 可以设置一个自己明确默认值的 RabbitListenerContainerFactory 对象。
@EnableRabbit
@SpringBootApplication
public class SpringbootApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootApplication.class, args);
}
}
二、简单示例
2.1 配置交换机、队列、绑定关系
@Configuration
public class RabbitMQConfig {
@Bean
public TopicExchange orderExchange(){
return new TopicExchange("simpleExchange", true, false);
}
@Bean
public Queue orderQueue(){
return new Queue("simpleQueue", true);
}
@Bean
public Binding beanBinding(TopicExchange orderExchange, Queue orderQueue) {
return BindingBuilder
// 创建队列
.bind(orderQueue)
// 创建交换机
.to(orderExchange)
// 指定路由 Key
.with("simple.#");
}
}
2.2 创建消费者对象
@Component
public class Consume {
@RabbitListener(queues = "simpleQueue")
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消费端Payload: " + message.getPayload());
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
}
2.3 创建生产者对象
@SpringBootTest
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendSimpleMessage() {
String exchange = "simpleExchange";
String routingKey = "simple.message";
rabbitTemplate.convertAndSend(exchange, routingKey, "hello simpleExchange");
}
}
2.4 测试
先启动应用,然后执行 sendSimpleMessage() 方法,观察控制台输出:
--------------------------------------
消费端Payload: hello simpleExchange
说明 @RabbitListener 生效了。
三、@RabbitListener 高级使用
3.1 在 @RabbitListener 中申明绑定关系
消费者:
public class BindingConsume {
@RabbitListener(
queues = "simpleQueue")
@RabbitListener(
bindings =
@QueueBinding(
exchange = @Exchange(value = "bindingExchange",
type = "topic"),
value = @Queue(value = "bindingQueue",
durable = "true"),
key = "binding.*"
)
)
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消费端Payload: " + message.getPayload());
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
}
生产者:
@SpringBootTest
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendBindingMessage() {
String exchange = "bindingExchange";
String routingKey = "binding.message";
rabbitTemplate.convertAndSend(exchange, routingKey, "hello bindingExchange");
}
}
先启动应用,然后执行 sendBindingMessage() 方法,观察控制台输出:
--------------------------------------
消费端Payload: hello bindingExchange
说明 @RabbitListener 生效了。
3.2 自定义消息参数类型
创建订单对象
// 记得需要实现序列化接口
public class Order implements Serializable{
private String orderId;
private BigDecimal amount;
public Order() {
}
public Order(String orderId, BigDecimal amount) {
this.orderId = orderId;
this.amount = amount;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public BigDecimal getAmount() {
return amount;
}
public void setAmount(BigDecimal amount) {
this.amount = amount;
}
@Override
public String toString() {
return "Order{" +
"orderId='" + orderId + '\'' +
", amount=" + amount +
'}';
}
}
创建消费者
@Component
public class OrderConsume {
@RabbitListener(
bindings =
@QueueBinding(
value = @Queue(value = "orderQueue",
durable = "true"),
exchange = @Exchange(value = "orderExchange",
type = "topic"),
key = "order.*"
)
)
public void onMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消费端order: " + order.toString());
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手工ACK
channel.basicAck(deliveryTag, false);
}
}
创建生产者
@SpringBootTest
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendOrderMessage() {
String exchange = "orderExchange";
String routingKey = "order.message";
Order newOrder = new Order("10001", BigDecimal.valueOf(300));
rabbitTemplate.convertAndSend(exchange, routingKey, newOrder);
}
}
先启动应用,然后执行 sendOrderMessage() 方法,观察控制台输出:
--------------------------------------
消费端order: Order{orderId='10001', amount=300}
3.3 @RabbitListener 和@RabbitHandler 的使用
@RabbitListener 除了在方法上面声明,也可以在类上面申明 。当声明在类上之后,就需要和@RabbitHandler 搭配使用:
- @RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用;
- @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型;
- 当找不到对应的参数类型方法时将会抛出 ListenerExecutionFailedException 异常,可以通过设置 @RabbitHandler 的 isDefault 属性为 true 为它声明一个默认处理方法。
创建 Consume:
@Component
@RabbitListener(
bindings =
@QueueBinding(
value = @Queue(value = "handlerQueue",
durable = "true"),
exchange = @Exchange(value = "handlerExchange",
type = "topic"),
key = "handler.*"
)
)
public class HandlerConsume {
@RabbitHandler(isDefault = true)
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消费端默认处理: " + message.getPayload());
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
@RabbitHandler
public void onMessage(@Payload String msg, @Headers Map<String, Object> headers, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消费端String参数: " + msg);
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
@RabbitHandler
public void onMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消费端Order参数: " + order.toString());
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
}
创建生产者:
@SpringBootTest
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendHandlerStringMessage() {
String exchange = "handlerExchange";
String routingKey = "handler.message";
// 发送字符串消息
rabbitTemplate.convertAndSend(exchange, routingKey, "hello handlerExchange");
}
@Test
public void sendHandlerOrderMessage() {
String exchange = "handlerExchange";
String routingKey = "handler.message";
// 发送对象消息
Order newOrder = new Order("10001", BigDecimal.valueOf(300));
rabbitTemplate.convertAndSend(exchange, routingKey, newOrder);
}
@Test
public void sendHandlerArrayMessage() {
String exchange = "handlerExchange";
String routingKey = "handler.message";
// 发送字符数组消息
String[] array = {"123","456","789"};
rabbitTemplate.convertAndSend(exchange, routingKey, array);
}
}
先启动应用,然后分别执行 sendHandlerStringMessage() 、sendHandlerOrderMessage() 和 sendHandlerArrayMessage方法,观察控制台输出:
--------------------------------------
消费端String参数: hello handlerExchange
--------------------------------------
消费端Order参数: Order{orderId='10001', amount=300}
--------------------------------------
消费端默认处理: [Ljava.lang.String;@7c655a9a
说明 @RabbitListener 和@RabbitHandler 生效了。
注意:本文归作者所有,未经作者允许,不得转载