场景
Rabbitmq延迟队列
死信交换机
- 
一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。 
- 
上面的消息的TTL到了,消息过期了。 
- 
队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。 
消息TTL(消息存活时间)
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);当上面的消息扔到队列中后,过了60秒,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去 

创建交换机(Exchanges)和队列(Queues)
创建死信交换机

创建自动过期消息队列

x-dead-letter-exchange代表消息过期后,消息要进入的交换机,这里配置的是delay,也就是死信交换机,x-dead-letter-routing-key是配置消息过期后,进入死信交换机的routing-key,跟发送消息的routing-key一个道理,根据这个key将消息放入不同的队列创建消息处理队列

消息队列绑定到交换机



String msg = "hello word";
MessageProperties messageProperties = new MessageProperties();
  messageProperties.setExpiration("6000");
  messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());
  Message message = new Message(msg.getBytes(), messageProperties);
  rabbitTemplate.convertAndSend("delay", "delay",message);主要的代码就是
messageProperties.setExpiration("6000");注意:因为要让消息自动过期,所以一定不能设置delay_queue1的监听,不能让这个队列里面的消息被接受到,否则消息一旦被消费,就不存在过期了 
package wang.raye.rabbitmq.demo1;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DelayQueue {
 /** 消息交换机的名字*/
 public static final String EXCHANGE = "delay";
 /** 队列key1*/
 public static final String ROUTINGKEY1 = "delay";
 /** 队列key2*/
 public static final String ROUTINGKEY2 = "delay_key";
 /**
  * 配置链接信息
  * @return
  */
 @Bean
 public ConnectionFactory connectionFactory() {
  CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);
  
  connectionFactory.setUsername("kberp");
  connectionFactory.setPassword("kberp");
  connectionFactory.setVirtualHost("/");
  connectionFactory.setPublisherConfirms(true); // 必须要设置
  return connectionFactory;
 }
 
 /**  
  * 配置消息交换机
     * 针对消费者配置  
        FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念  
        HeadersExchange :通过添加属性key-value匹配  
        DirectExchange:按照routingkey分发到指定队列  
        TopicExchange:多关键字匹配  
     */  
    @Bean  
    public DirectExchange defaultExchange() {  
     return new DirectExchange(EXCHANGE, true, false);
    } 
   
    /**
     * 配置消息队列2
     * 针对消费者配置  
     * @return
     */
    @Bean
    public Queue queue() {  
       return new Queue("delay_queue2", true); //队列持久  
  
    }
    /**
     * 将消息队列2与交换机绑定
     * 针对消费者配置  
     * @return
     */
    @Bean  
    @Autowired
    public Binding binding() {  
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);  
    } 
    /**
     * 接受消息的监听,这个监听会接受消息队列1的消息
     * 针对消费者配置  
     * @return
     */
    @Bean  
    @Autowired
    public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {  
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
        container.setQueues(queue());  
        container.setExposeListenerChannel(true);  
        container.setMaxConcurrentConsumers(1);  
        container.setConcurrentConsumers(1);  
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认  
        container.setMessageListener(new ChannelAwareMessageListener() {
   public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
    byte[] body = message.getBody();  
                System.out.println("delay_queue2 收到消息 : " + new String(body));  
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费  
    
   }  
  
        });  
        return container;  
    }  
    
   
}总结
注意:本文归作者所有,未经作者允许,不得转载
