RabbitMQ整合SpringBoot

求知探索 1年前 ⋅ 779 阅读

SpringBoot 是在 Spring AMQP 上面再次封装了一层,不需要再像 Spring AMQP 一样注入各个组件 Bean, 只需要在配置文件上配置好 RabbitMQ 属性,SpringBoot 就可以自动注入了。

而使用 @RabbitListener 注解可以轻松实现消费端事件监听处理。

一、项目配置

1.1 添加 Spring AMQP 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.3.1.RELEASE</version>
</dependency>

1.2 配置属性

在 /resources/application.properties 文件中申明属性:

# rabbitmq 配置
## 主机地址
spring.rabbitmq.host=111.231.83.100
## 端口号
spring.rabbitmq.port=5672
## 虚拟主机路径
spring.rabbitmq.virtual-host=/
## 连接超时时间
spring.rabbitmq.connection-timeout=15000
## 消费者设置手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
## 消费者每次消费数量
spring.rabbitmq.listener.simple.concurrency=1
## 最大消费者数量
spring.rabbitmq.listener.simple.max-concurrency=5

# 自定义属性
## 队列名称
spring.rabbitmq.listener.order.queue.name=orderQueue
## 队列是否持久化
spring.rabbitmq.listener.order.queue.durable=true
## 交换机名称
spring.rabbitmq.listener.order.exchange.name=orderExchange
## 交换机是否持久化
spring.rabbitmq.listener.order.exchange.durable=true
## 交换机类型
spring.rabbitmq.listener.order.exchange.type=topic
## 是否忽略异常
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
## 路由 key
spring.rabbitmq.listener.order.routingKey=order#

相关属性表:

属性名 说明 默认值
spring.rabbitmq.address 客户端连接的地址,有多个的时候使用逗号分隔,该地址可以是IP与Port的结合  
spring.rabbitmq.cache.channel.checkout-timeout 当缓存已满时,获取Channel的等待时间,单位为毫秒  
spring.rabbitmq.cache.channel.size 缓存中保持的Channel数量  
spring.rabbitmq.cache.connection.mode 连接缓存的模式 CHANNEL
spring.rabbitmq.cache.connection.size 缓存的连接数  
spring.rabbitmq.connnection-timeout 连接超时参数单位为毫秒:设置为“0”代表无穷大  
spring.rabbitmq.dynamic 默认创建一个AmqpAdmin的Bean true
spring.rabbitmq.host RabbitMQ的主机地址 localhost
spring.rabbitmq.listener.acknowledge-mode 容器的acknowledge模式  
spring.rabbitmq.listener.auto-startup 启动时自动启动容器 true
spring.rabbitmq.listener.concurrency 消费者的最小数量  
spring.rabbitmq.listener.default-requeue-rejected 投递失败时是否重新排队 true
spring.rabbitmq.listener.max-concurrency 消费者的最大数量  
spring.rabbitmq.listener.prefetch 在单个请求中处理的消息个数,他应该大于等于事务数量  
spring.rabbitmq.listener.retry.enabled 是否开启重试机制,默认不开启 false
spring.rabbitmq.listener.retry.initial-interval 第一次与第二次投递尝试的时间间隔 1000
spring.rabbitmq.listener.retry.max-attempts 尝试投递消息的最大数量 3
spring.rabbitmq.listener.retry.max-interval 两次尝试的最大时间间隔 10000
spring.rabbitmq.listener.retry.multiplier 上一次尝试时间间隔的乘数 1.0
spring.rabbitmq.listener.retry.stateless 不论重试是有状态的还是无状态的 true
spring.rabbitmq.listener.transaction-size 在一个事务中处理的消息数量。为了获得最佳效果,该值应设置为小于等于每个请求中处理的消息个数,即spring.rabbitmq.listener.prefetch的值  
spring.rabbitmq.password 登录到RabbitMQ的密码  
spring.rabbitmq.port RabbitMQ的端口号 5672
spring.rabbitmq.publisher-confirms 开启Publisher Confirm机制 false
spring.rabbitmq.publisher-returns 开启publisher Return机制 false
spring.rabbitmq.requested-heartbeat 请求心跳超时时间,单位为秒  
spring.rabbitmq.ssl.enabled 启用SSL支持 false
spring.rabbitmq.ssl.key-store 保存Sspring.rabbitmq.listener.simple.acknowledge-modeSL证书的地址  
spring.rabbitmq.ssl.key-store-password 访问SSL证书的地址使用的密码  
spring.rabbitmq.ssl.trust-store SL的可信地址  
spring.rabbitmq.ssl.trust-store-password 访问SSL的可信地址的密码  
spring.rabbitmq.ssl.algorithm SSL算法,默认使用Rabbit的客户端算法库  
spring.rabbitmq.template.mandatory 启用强制信息 false
spring.rabbitmq.template.receive-timeout receive()方法的超时时间 0
spring.rabbitmq.template.reply-timeout sendAndReceive()方法的超时时间 5000
spring.rabbitmq.template.retry.enabled 设置为true的时候RabbitTemplate能够实现重试 false
spring.rabbitmq.template.retry.initial-interval 第一次与第二次发布消息的时间间隔 1000
spring.rabbitmq.template.retry.max-attempts 尝试发布消息的最大数量 3
spring.rabbitmq.template.retry.max-interval 尝试发布消息的最大时间间隔 10000
spring.rabbitmq.template.retry.multiplier 上一次尝试时间间隔的乘数 1.0
spring.rabbitmq.username 登录到RabbitMQ的用户名  
spring.rabbitmq.virtual-host 连接到RabbitMQ的虚拟主机  

1.3 开启 @RabbitListener 注解

  • 通过 @EnableRabbit 注解来启用 @RabbitListener

  • @RabbitListener 注解指定了目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者 Binding 。使用@RabbitListener 可以设置一个自己明确默认值的 RabbitListenerContainerFactory 对象。

@EnableRabbit
@SpringBootApplication
public class SpringbootApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringbootApplication.class, args);
    }
}

二、简单示例

2.1 配置交换机、队列、绑定关系

@Configuration
public class RabbitMQConfig {
    @Bean
    public TopicExchange orderExchange(){
        return new TopicExchange("simpleExchange", true, false);
    }

    @Bean
    public Queue orderQueue(){
        return new Queue("simpleQueue", true);
    }
    @Bean
    public Binding beanBinding(TopicExchange orderExchange, Queue orderQueue) {
        return BindingBuilder
                // 创建队列
                .bind(orderQueue)
                // 创建交换机
                .to(orderExchange)
                // 指定路由 Key
                .with("simple.#");
    }
}

2.2 创建消费者对象

@Component
public class Consume {

    @RabbitListener(queues = "simpleQueue")
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Payload: " + message.getPayload());
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }

}

2.3 创建生产者对象

@SpringBootTest
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendSimpleMessage() {
        String exchange = "simpleExchange";
        String routingKey = "simple.message";
        rabbitTemplate.convertAndSend(exchange, routingKey, "hello simpleExchange");
    }
}

2.4 测试

先启动应用,然后执行 sendSimpleMessage() 方法,观察控制台输出:

--------------------------------------
消费端Payload: hello simpleExchange

说明 @RabbitListener 生效了。

三、@RabbitListener 高级使用

3.1 在 @RabbitListener 中申明绑定关系

消费者:

public class BindingConsume {


    @RabbitListener(
            queues = "simpleQueue")
    @RabbitListener(
            bindings =
            @QueueBinding(
                    exchange = @Exchange(value = "bindingExchange",
                            type = "topic"),
                    value = @Queue(value = "bindingQueue",
                            durable = "true"),
                    key = "binding.*"
            )
    )
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Payload: " + message.getPayload());
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }

}

生产者:

@SpringBootTest
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendBindingMessage() {
        String exchange = "bindingExchange";
        String routingKey = "binding.message";
        rabbitTemplate.convertAndSend(exchange, routingKey, "hello bindingExchange");
    }
}    

先启动应用,然后执行 sendBindingMessage() 方法,观察控制台输出:

--------------------------------------
消费端Payload: hello bindingExchange

说明 @RabbitListener 生效了。

3.2 自定义消息参数类型

创建订单对象

// 记得需要实现序列化接口
public class Order implements Serializable{
    private String orderId;
    private BigDecimal amount;

    public Order() {

    }

    public Order(String orderId, BigDecimal amount) {
        this.orderId = orderId;
        this.amount = amount;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public BigDecimal getAmount() {
        return amount;
    }

    public void setAmount(BigDecimal amount) {
        this.amount = amount;
    }
    
    @Override
    public String toString() {
        return "Order{" +
                "orderId='" + orderId + '\'' +
                ", amount=" + amount +
                '}';
    }
}

创建消费者

@Component
public class OrderConsume {

    @RabbitListener(
            bindings =
            @QueueBinding(
                    value = @Queue(value = "orderQueue",
                            durable = "true"),
                    exchange = @Exchange(value = "orderExchange",
                            type = "topic"),
                    key = "order.*"
            )
    )
    public void onMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端order: " + order.toString());
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 手工ACK
        channel.basicAck(deliveryTag, false);
    }

}

创建生产者

@SpringBootTest
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendOrderMessage() {
        String exchange = "orderExchange";
        String routingKey = "order.message";
        Order newOrder = new Order("10001", BigDecimal.valueOf(300));
        rabbitTemplate.convertAndSend(exchange, routingKey, newOrder);
    }
}    

先启动应用,然后执行 sendOrderMessage() 方法,观察控制台输出:

--------------------------------------
消费端order: Order{orderId='10001', amount=300}

3.3 @RabbitListener 和@RabbitHandler 的使用

@RabbitListener 除了在方法上面声明,也可以在类上面申明 。当声明在类上之后,就需要和@RabbitHandler 搭配使用:

  • @RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用;
  • @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型;
  • 当找不到对应的参数类型方法时将会抛出 ListenerExecutionFailedException 异常,可以通过设置 @RabbitHandler 的 isDefault 属性为 true 为它声明一个默认处理方法。

创建 Consume:

@Component
@RabbitListener(
        bindings =
        @QueueBinding(
                value = @Queue(value = "handlerQueue",
                        durable = "true"),
                exchange = @Exchange(value = "handlerExchange",
                        type = "topic"),
                key = "handler.*"
        )
)
public class HandlerConsume {

    @RabbitHandler(isDefault = true)
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端默认处理: " + message.getPayload());
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }

    @RabbitHandler
    public void onMessage(@Payload String msg, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端String参数: " + msg);
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }


    @RabbitHandler
    public void onMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Order参数: " + order.toString());
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }

}

创建生产者:

@SpringBootTest
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendHandlerStringMessage() {
        String exchange = "handlerExchange";
        String routingKey = "handler.message";
        // 发送字符串消息
        rabbitTemplate.convertAndSend(exchange, routingKey, "hello handlerExchange");
    }

    @Test
    public void sendHandlerOrderMessage() {
        String exchange = "handlerExchange";
        String routingKey = "handler.message";
        // 发送对象消息
        Order newOrder = new Order("10001", BigDecimal.valueOf(300));
        rabbitTemplate.convertAndSend(exchange, routingKey, newOrder);
    }
    
    @Test
    public void sendHandlerArrayMessage() {
        String exchange = "handlerExchange";
        String routingKey = "handler.message";
        // 发送字符数组消息
        String[] array = {"123","456","789"};
        rabbitTemplate.convertAndSend(exchange, routingKey, array);
    }
}    

先启动应用,然后分别执行 sendHandlerStringMessage() 、sendHandlerOrderMessage() 和 sendHandlerArrayMessage方法,观察控制台输出:

--------------------------------------
消费端String参数: hello handlerExchange
--------------------------------------
消费端Order参数: Order{orderId='10001', amount=300}
--------------------------------------
消费端默认处理: [Ljava.lang.String;@7c655a9a

说明 @RabbitListener 和@RabbitHandler 生效了。


全部评论: 0

    我有话说: