SpringBoot 是在 Spring AMQP 上面再次封装了一层,不需要再像 Spring AMQP 一样注入各个组件 Bean, 只需要在配置文件上配置好 RabbitMQ 属性,SpringBoot 就可以自动注入了。
而使用 @RabbitListener 注解可以轻松实现消费端事件监听处理。
一、项目配置
1.1 添加 Spring AMQP 依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.3.1.RELEASE</version>
</dependency>
1.2 配置属性
在 /resources/application.properties 文件中申明属性:
# rabbitmq 配置
## 主机地址
spring.rabbitmq.host=111.231.83.100
## 端口号
spring.rabbitmq.port=5672
## 虚拟主机路径
spring.rabbitmq.virtual-host=/
## 连接超时时间
spring.rabbitmq.connection-timeout=15000
## 消费者设置手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
## 消费者每次消费数量
spring.rabbitmq.listener.simple.concurrency=1
## 最大消费者数量
spring.rabbitmq.listener.simple.max-concurrency=5
# 自定义属性
## 队列名称
spring.rabbitmq.listener.order.queue.name=orderQueue
## 队列是否持久化
spring.rabbitmq.listener.order.queue.durable=true
## 交换机名称
spring.rabbitmq.listener.order.exchange.name=orderExchange
## 交换机是否持久化
spring.rabbitmq.listener.order.exchange.durable=true
## 交换机类型
spring.rabbitmq.listener.order.exchange.type=topic
## 是否忽略异常
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
## 路由 key
spring.rabbitmq.listener.order.routingKey=order#
相关属性表:
| 属性名 | 说明 | 默认值 | 
|---|---|---|
| spring.rabbitmq.address | 客户端连接的地址,有多个的时候使用逗号分隔,该地址可以是IP与Port的结合 | |
| spring.rabbitmq.cache.channel.checkout-timeout | 当缓存已满时,获取Channel的等待时间,单位为毫秒 | |
| spring.rabbitmq.cache.channel.size | 缓存中保持的Channel数量 | |
| spring.rabbitmq.cache.connection.mode | 连接缓存的模式 | CHANNEL | 
| spring.rabbitmq.cache.connection.size | 缓存的连接数 | |
| spring.rabbitmq.connnection-timeout | 连接超时参数单位为毫秒:设置为“0”代表无穷大 | |
| spring.rabbitmq.dynamic | 默认创建一个AmqpAdmin的Bean | true | 
| spring.rabbitmq.host | RabbitMQ的主机地址 | localhost | 
| spring.rabbitmq.listener.acknowledge-mode | 容器的acknowledge模式 | |
| spring.rabbitmq.listener.auto-startup | 启动时自动启动容器 | true | 
| spring.rabbitmq.listener.concurrency | 消费者的最小数量 | |
| spring.rabbitmq.listener.default-requeue-rejected | 投递失败时是否重新排队 | true | 
| spring.rabbitmq.listener.max-concurrency | 消费者的最大数量 | |
| spring.rabbitmq.listener.prefetch | 在单个请求中处理的消息个数,他应该大于等于事务数量 | |
| spring.rabbitmq.listener.retry.enabled | 是否开启重试机制,默认不开启 | false | 
| spring.rabbitmq.listener.retry.initial-interval | 第一次与第二次投递尝试的时间间隔 | 1000 | 
| spring.rabbitmq.listener.retry.max-attempts | 尝试投递消息的最大数量 | 3 | 
| spring.rabbitmq.listener.retry.max-interval | 两次尝试的最大时间间隔 | 10000 | 
| spring.rabbitmq.listener.retry.multiplier | 上一次尝试时间间隔的乘数 | 1.0 | 
| spring.rabbitmq.listener.retry.stateless | 不论重试是有状态的还是无状态的 | true | 
| spring.rabbitmq.listener.transaction-size | 在一个事务中处理的消息数量。为了获得最佳效果,该值应设置为小于等于每个请求中处理的消息个数,即spring.rabbitmq.listener.prefetch的值 | |
| spring.rabbitmq.password | 登录到RabbitMQ的密码 | |
| spring.rabbitmq.port | RabbitMQ的端口号 | 5672 | 
| spring.rabbitmq.publisher-confirms | 开启Publisher Confirm机制 | false | 
| spring.rabbitmq.publisher-returns | 开启publisher Return机制 | false | 
| spring.rabbitmq.requested-heartbeat | 请求心跳超时时间,单位为秒 | |
| spring.rabbitmq.ssl.enabled | 启用SSL支持 | false | 
| spring.rabbitmq.ssl.key-store | 保存Sspring.rabbitmq.listener.simple.acknowledge-modeSL证书的地址 | |
| spring.rabbitmq.ssl.key-store-password | 访问SSL证书的地址使用的密码 | |
| spring.rabbitmq.ssl.trust-store | SL的可信地址 | |
| spring.rabbitmq.ssl.trust-store-password | 访问SSL的可信地址的密码 | |
| spring.rabbitmq.ssl.algorithm | SSL算法,默认使用Rabbit的客户端算法库 | |
| spring.rabbitmq.template.mandatory | 启用强制信息 | false | 
| spring.rabbitmq.template.receive-timeout | receive()方法的超时时间 | 0 | 
| spring.rabbitmq.template.reply-timeout | sendAndReceive()方法的超时时间 | 5000 | 
| spring.rabbitmq.template.retry.enabled | 设置为true的时候RabbitTemplate能够实现重试 | false | 
| spring.rabbitmq.template.retry.initial-interval | 第一次与第二次发布消息的时间间隔 | 1000 | 
| spring.rabbitmq.template.retry.max-attempts | 尝试发布消息的最大数量 | 3 | 
| spring.rabbitmq.template.retry.max-interval | 尝试发布消息的最大时间间隔 | 10000 | 
| spring.rabbitmq.template.retry.multiplier | 上一次尝试时间间隔的乘数 | 1.0 | 
| spring.rabbitmq.username | 登录到RabbitMQ的用户名 | |
| spring.rabbitmq.virtual-host | 连接到RabbitMQ的虚拟主机 | 
1.3 开启 @RabbitListener 注解
- 
通过 @EnableRabbit 注解来启用 @RabbitListener。 
- 
@RabbitListener 注解指定了目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者 Binding 。使用@RabbitListener 可以设置一个自己明确默认值的 RabbitListenerContainerFactory 对象。 
@EnableRabbit
@SpringBootApplication
public class SpringbootApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringbootApplication.class, args);
    }
}
二、简单示例
2.1 配置交换机、队列、绑定关系
@Configuration
public class RabbitMQConfig {
    @Bean
    public TopicExchange orderExchange(){
        return new TopicExchange("simpleExchange", true, false);
    }
    @Bean
    public Queue orderQueue(){
        return new Queue("simpleQueue", true);
    }
    @Bean
    public Binding beanBinding(TopicExchange orderExchange, Queue orderQueue) {
        return BindingBuilder
                // 创建队列
                .bind(orderQueue)
                // 创建交换机
                .to(orderExchange)
                // 指定路由 Key
                .with("simple.#");
    }
}
2.2 创建消费者对象
@Component
public class Consume {
    @RabbitListener(queues = "simpleQueue")
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Payload: " + message.getPayload());
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }
}
2.3 创建生产者对象
@SpringBootTest
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void sendSimpleMessage() {
        String exchange = "simpleExchange";
        String routingKey = "simple.message";
        rabbitTemplate.convertAndSend(exchange, routingKey, "hello simpleExchange");
    }
}
2.4 测试
先启动应用,然后执行 sendSimpleMessage() 方法,观察控制台输出:
--------------------------------------
消费端Payload: hello simpleExchange
说明 @RabbitListener 生效了。
三、@RabbitListener 高级使用
3.1 在 @RabbitListener 中申明绑定关系
消费者:
public class BindingConsume {
    @RabbitListener(
            queues = "simpleQueue")
    @RabbitListener(
            bindings =
            @QueueBinding(
                    exchange = @Exchange(value = "bindingExchange",
                            type = "topic"),
                    value = @Queue(value = "bindingQueue",
                            durable = "true"),
                    key = "binding.*"
            )
    )
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Payload: " + message.getPayload());
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }
}
生产者:
@SpringBootTest
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void sendBindingMessage() {
        String exchange = "bindingExchange";
        String routingKey = "binding.message";
        rabbitTemplate.convertAndSend(exchange, routingKey, "hello bindingExchange");
    }
}    
先启动应用,然后执行 sendBindingMessage() 方法,观察控制台输出:
--------------------------------------
消费端Payload: hello bindingExchange
说明 @RabbitListener 生效了。
3.2 自定义消息参数类型
创建订单对象
// 记得需要实现序列化接口
public class Order implements Serializable{
    private String orderId;
    private BigDecimal amount;
    public Order() {
    }
    public Order(String orderId, BigDecimal amount) {
        this.orderId = orderId;
        this.amount = amount;
    }
    public String getOrderId() {
        return orderId;
    }
    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }
    public BigDecimal getAmount() {
        return amount;
    }
    public void setAmount(BigDecimal amount) {
        this.amount = amount;
    }
    
    @Override
    public String toString() {
        return "Order{" +
                "orderId='" + orderId + '\'' +
                ", amount=" + amount +
                '}';
    }
}
创建消费者
@Component
public class OrderConsume {
    @RabbitListener(
            bindings =
            @QueueBinding(
                    value = @Queue(value = "orderQueue",
                            durable = "true"),
                    exchange = @Exchange(value = "orderExchange",
                            type = "topic"),
                    key = "order.*"
            )
    )
    public void onMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端order: " + order.toString());
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 手工ACK
        channel.basicAck(deliveryTag, false);
    }
}
创建生产者
@SpringBootTest
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void sendOrderMessage() {
        String exchange = "orderExchange";
        String routingKey = "order.message";
        Order newOrder = new Order("10001", BigDecimal.valueOf(300));
        rabbitTemplate.convertAndSend(exchange, routingKey, newOrder);
    }
}    
先启动应用,然后执行 sendOrderMessage() 方法,观察控制台输出:
--------------------------------------
消费端order: Order{orderId='10001', amount=300}
3.3 @RabbitListener 和@RabbitHandler 的使用
@RabbitListener 除了在方法上面声明,也可以在类上面申明 。当声明在类上之后,就需要和@RabbitHandler 搭配使用:
- @RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用;
- @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型;
- 当找不到对应的参数类型方法时将会抛出 ListenerExecutionFailedException 异常,可以通过设置 @RabbitHandler 的 isDefault 属性为 true 为它声明一个默认处理方法。
创建 Consume:
@Component
@RabbitListener(
        bindings =
        @QueueBinding(
                value = @Queue(value = "handlerQueue",
                        durable = "true"),
                exchange = @Exchange(value = "handlerExchange",
                        type = "topic"),
                key = "handler.*"
        )
)
public class HandlerConsume {
    @RabbitHandler(isDefault = true)
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端默认处理: " + message.getPayload());
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }
    @RabbitHandler
    public void onMessage(@Payload String msg, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端String参数: " + msg);
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }
    @RabbitHandler
    public void onMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Order参数: " + order.toString());
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }
}
创建生产者:
@SpringBootTest
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void sendHandlerStringMessage() {
        String exchange = "handlerExchange";
        String routingKey = "handler.message";
        // 发送字符串消息
        rabbitTemplate.convertAndSend(exchange, routingKey, "hello handlerExchange");
    }
    @Test
    public void sendHandlerOrderMessage() {
        String exchange = "handlerExchange";
        String routingKey = "handler.message";
        // 发送对象消息
        Order newOrder = new Order("10001", BigDecimal.valueOf(300));
        rabbitTemplate.convertAndSend(exchange, routingKey, newOrder);
    }
    
    @Test
    public void sendHandlerArrayMessage() {
        String exchange = "handlerExchange";
        String routingKey = "handler.message";
        // 发送字符数组消息
        String[] array = {"123","456","789"};
        rabbitTemplate.convertAndSend(exchange, routingKey, array);
    }
}    
先启动应用,然后分别执行 sendHandlerStringMessage() 、sendHandlerOrderMessage() 和 sendHandlerArrayMessage方法,观察控制台输出:
--------------------------------------
消费端String参数: hello handlerExchange
--------------------------------------
消费端Order参数: Order{orderId='10001', amount=300}
--------------------------------------
消费端默认处理: [Ljava.lang.String;@7c655a9a
说明 @RabbitListener 和@RabbitHandler 生效了。
注意:本文归作者所有,未经作者允许,不得转载
