RabbitMQ中级

一、SpringBoot中使用RabbitMq

1. fanout模式

img

导入jar包

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

配置连接

# 服务端口
server:
  port: 3001
# 配置rabbitmq服务
spring:
  rabbitmq:
    username: 账户
    password: 密码
    virtual-host: /
    host: IP
    port: 5672

创建配置类

package com.wz.sdkservspringrq.config;
​
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
/**
 * @DESCRIPTION:
 * @USER: wangzhen
 * @DATE: 2022/9/6 18:59
 */
@Configuration
public class FanoutRabbitMqConfig {
    @Bean
    public Queue emailQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        // return new Queue("TestDirectQueue",true,true,false);
        // 一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("email.fanout.queue", true);
    }
​
    @Bean
    public Queue smsQueue() {
        return new Queue("sms.fanout.queue", true);
    }
​
    @Bean
    public Queue weixinQueue() {
        return new Queue("weixin.fanout.queue", true);
    }
​
    /**
     * 创建交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutProducerExchange() {
        return new FanoutExchange("fanout_producer_exchange", true, false);
    }
​
​
    /**
     * @despriction 队列绑定交换机
     * @return
     */
    @Bean
    public Binding bindingFanoutWeixin() {
        return BindingBuilder.bind(weixinQueue()).to(fanoutProducerExchange());
    }
    @Bean
    public Binding bindingFanoutSms() {
        return BindingBuilder.bind(smsQueue()).to(fanoutProducerExchange());
    }
    @Bean
    public Binding bindingFanoutEmail() {
        return BindingBuilder.bind(emailQueue()).to(fanoutProducerExchange());
    }
​
}
​

创建生产者

@Component
public class ProducerService {
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 1: 定义交换机
    private String fanoutExchangeName = "fanout_producer_exchange";
    // 2: 路由key
    private String routeKey = "";
​
    public void makeOrder(Long userId, Long productId, int num) {
        // 1: 模拟用户下单
        String orderNumber = UUID.randomUUID().toString();
        // 2: 根据商品id productId 去查询商品的库存
        // int numstore = productSerivce.getProductNum(productId);
        // 3:判断库存是否充足
        // if(num >  numstore ){ return  "商品库存不足..."; }
        // 4: 下单逻辑
        // orderService.saveOrder(order);
        // 5: 下单成功要扣减库存
        // 6: 下单完成以后
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumber);
        // 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend(fanoutExchangeName, routeKey, orderNumber);
    }
}

构建消费者Email,短信,微信服务

需要注意@RabbitListener@RabbitHandler用法

  • @RabbitListener可以修饰类和方法,用于监听队列的消息,指定队列名即可,其中包含bindings等参数,可以完成交换机队列绑定。

  • @RabbitListener修饰方法时,可以直接监听队列消息,此时接收的参数需要与发送的类型一致,注意这里如果类型,如果监听是

  • @RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用,@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,根据接受的参数类型进入具体的方法中。

package com.wz.sdkservspringrq.consumer;
​
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
​
/**
 * @DESCRIPTION: Email服务
 * @USER: wangzhen
 * @DATE: 2022/9/6 21:13
 */
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
        // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value = @Queue(value = "email.fanout.queue",autoDelete = "false"),
        // fanout_producer_exchange 交换机的名字 必须和生产者保持一致
        exchange = @Exchange(value = "fanout_producer_exchange",
                // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type = ExchangeTypes.FANOUT)
))
@Component
public class EmailService {
    // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
    @RabbitHandler
    public void messageService(String message){
        // 此处省略发邮件的逻辑
        System.out.println("email-------------->" + message);
    }
}
​
package com.wz.sdkservspringrq.consumer;
​
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
​
/**
 * @DESCRIPTION:短信服务
 * @USER: wangzhen
 * @DATE: 2022/9/6 21:14
 */
@RabbitListener(queues = "sms.fanout.queue")
@Component
public class SMSService {
    // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
    @RabbitHandler
    public void messageService(String message){
        System.out.println("sms-------------->" + message.toString());
    }
​
    @RabbitHandler
    public void messageService(byte[] message){
        System.out.println("sms-byte[]-------------->" + message);
    }
}
​
package com.wz.sdkservspringrq.consumer;
​
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
 * @DESCRIPTION:微信服务
 * @USER: wangzhen
 * @DATE: 2022/9/6 21:15
 */
@Component
public class WeixinService {
    @RabbitListener(queues = "weixin.fanout.queue")
    public void messageService(String message){
        System.out.println("weixin-------------->" + message);
    }
}

启动项目和测试类

@SpringBootTest
public class SdkServSpringRqTests {
    @Autowired
    ProducerService producerService;
​
    @Test
    public void makeOrder() throws Exception {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            Long userId = 100L + i;
            Long productId = 10001L + i;
            int num = 10;
            producerService.makeOrder(userId, productId, num);
        }
    }
}

可以看到消息都被消费了

2. direct模式

和fanout模式类似

@Configuration
public class DirectRabbitMqConfig {

    @Bean
    public DirectExchange directProducerExchange() {
        return new DirectExchange("direct_producer_exchange", true, false);
    }

    @Bean
    public Queue smsDirectQueue() {
        return new Queue("sms.direct.queue", true, false, false);
    }

    @Bean
    public Binding bindingDirectSms(){
        return BindingBuilder.bind(smsDirectQueue()).to(directProducerExchange()).with("smsMessage");
    }

}

3. topic模式

和fanout模式类似

@Configuration
public class TopicRabbitMqConfig {

    @Bean
    public TopicExchange topicProducerExchange() {
        return new TopicExchange("topic_producer_exchange", true, false);
    }

    @Bean
    public Queue emailTopicQueue() {
        return new Queue("email.topic.queue", true);
    }

    @Bean
    public Binding bindingTopicEmail() {
        return BindingBuilder.bind(emailTopicQueue()).to(topicProducerExchange()).with("*.message.#");
    }

}

4. 扩展问题

关于RabbitMq在SpringBoot中使用的一些问题:https://www.jianshu.com/p/090ed51006d5

img

content_type

handler的可接收形参

text/plain

String

application/json

/

application/x-java-serialized-object

实体类

application/octet-stream

byte[]

未指定

byte[]

未指定

org.springframework.amqp.core.Message

在执行过程中发现,@RabbitListener监听的队列,如果其中的消息类型与handler形参的类型不一致,会导致死循环,一般会报如下错误

Caused by: org.springframework.amqp.AmqpException: No method found for class [B

有三种解决方法

  • 第一种,@RabbitHandler使用可选参数isDefault,添加一个兜底方案

        @RabbitHandler(isDefault = true)
        public void messageDefaultService(Object message){
            // 此处省略发邮件的逻辑
            System.out.println("email-------------->" + message);
        }
  • 第二种,如果消息是auto自动ack,可以设置重试次数,这也适合消息消费过程中出现异常

    spring:
      rabbitmq:
        listener:
          simple:
            acknowledge-mode: auto #开启自动Ack
            retry:
              enabled: true    # 开启消费重试机制
              max-attempts: 5  # 最大重试机制,默认为3
              initial-interval: 1000  # 重试间隔,单位毫秒,默认1000
  • 第三种就是发送和接收统一规定String接收

//发送是将Bean转成Json,然后json.toString()发送出去
JSONObject json = (JSONObject)JSONObject.toJSON(order);
//接收将json格式的String类型转成Bean
Order order = JSONObject.parseObject(message, Order.class);

//注意使用时需要在配置类中添加上Jackson2JsonMessageConverter覆盖原先的MessageConverter
@Bean
public MessageConverter messageConverter(){
    return new Jackson2JsonMessageConverter();
}

二、RabbitMq中一些特性

1. 队列过期时间

RabbitMQ的队列(Queue)的参数及其含义

参数介绍:
1、name: 队列的名称;
2、actualName: 队列的真实名称,默认用name参数,如果name为空,则根据规则生成一个;
3、durable: 是否持久化;
4、exclusive: 是否独享、排外的;
5、autoDelete: 是否自动删除;
6、arguments:队列的其他属性参数,有如下可选项,可参看图2的arguments:
(1)x-message-ttl:队列中消息的过期时间,单位:毫秒;
(2)x-expires:队列本身过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
(3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
(4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
(5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
(6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
(7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
(8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
(9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
(10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
(11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。

给队列中的消息设置过期时间有两种方式,

  • 第一种设置队列过期时间

@Configuration
public class TTLRabbitMqConfig {
    @Bean
    public Queue ttlQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 15000);//队列中所有消息设置过期时间
        return new Queue("ttl.msg.queue", true, false, false, args);
    }

    @Bean
    public FanoutExchange ttlProducerExchange() {
        return new FanoutExchange("ttl_producer_exchange", true, false);
    }

    @Bean
    public Binding bindingTTL() {
        return BindingBuilder.bind(ttlQueue()).to(ttlProducerExchange());
    }

}
  • 第二种直接给消息设置过期时间

    public void makeOrderOfTTLMsg() {
        String orderNumber = UUID.randomUUID().toString();
        String ttlExchangeName = "ttl_producer_exchange";

        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("10000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };

        rabbitTemplate.convertAndSend(ttlExchangeName, "", orderNumber, messagePostProcessor);
    }

这两种方式的区别:

  1. 如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。

  2. 给队列设置过期时间,一旦消息过期,消息将被加入到死信队列中,而单独给消息设置过期时间,一旦过期就不存在了。

2. 消息确认机制

RabbitMQ的消息确认有两种: 1、对生产端发送消息的确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。 2、对消费端消费消息的确认。这种是确认消费者是否成功消费了队列中的消息。

2.1 RabbitMQ对生产端发送消息的确认

rabbitmq对生产端发送消息的确认分为事务和实现confirm机制。不过一般不使用事务,性能消耗太大(后面学到分布式事务会提到)

对生产端的confirm机制参见:https://www.cnblogs.com/alan6/p/11483419.html

2.2 消费端消费消息后对RabbitMQ的确认

为了保证消息能可靠到达消费端,RabbitMQ也提供了消费端的消息确认机制。消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。 采用消息确认机制后,只要令noAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。

消费端消息的确认分为:自动确认(默认)、手动确认、不确认

  • AcknowledgeMode.NONE:不确认

  • AcknowledgeMode.AUTO:自动确认

  • AcknowledgeMode.MANUAL:手动确认

手动确认在spring-boot中配置方法:

spring.rabbitmq.listener.simple.acknowledge-mode = manual

1、消费成功手动确认方法:

void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:该消息的index
multiple:是否批量确认。true:将一次性ack所有小于deliveryTag的消息。
消费者成功处理消息后,手动调用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法对消息进行消费确认。

2、消费失败手动确认方法:

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
deliveryTag:该消息的index。
multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列。

void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:该消息的index。
requeue:被拒绝的是否重新入队列。

channel.basicNack 方法与 channel.basicReject 方法区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息。

2.3 消费者手动确认可能出现的问题

1、 消息无法ack

消费端在消费消息过程中出现异常,不能回复ack应答,消息将变成unacked状态,并且一直处于队列中。如果积压的过多将会导致程序无法继续消费数据。 消费端服务重启,断开rabbitmq的连接后,unacked的消息状态会重新变为ready等待消费。但是如果不重启消费端服务,消息将一直驻留在MQ中。

所以,可以捕获异常,然后调用Nack确认,然后消息进入队列重新消费。

2、无效消息循环重入队列

在上一个问题中,如果消费端捕获异常,并进行basicNack应答,并将消息重新放入队列中,可能会出现另一个问题:

如果消息或者代码本身有bug,每次处理这个消息都会报异常,那消息将一直处于消费——>报异常——>重入队列——>继续消费——>报异常。。。的死循环过程。

以上两个问题其实属于同一类问题,都需要我们确保代码在消费消息后,一定要通知MQ,不然消息将一直驻留在MQ中。如果消息成功消费,则调用channel.basicAck正常通知mq;如果消费失败,则调用channel.basicNack或者channel.basicReject确认消费失败。

3、防止死循环有两种处理办法

  • 根据处理过程中报的不同异常类型,选择消息要不要重入队列。

enum Action {
  ACCEPT,  // 处理成功
  RETRY,   // 可以重试的错误
  REJECT,  // 无需重试的错误
}

Action action = Action.RETRY; 
try {
    // 如果成功完成则action=Action.ACCEPT
}
catch (Exception e) {
   // 根据异常种类决定是ACCEPT、RETRY还是 REJECT
}
finally {
  // 通过finally块来保证Ack/Nack会且只会执行一次
  if (action == Action.ACCEPT) {
    channel.basicAck(tag);
  } else if (action == Action.RETRY) {
     channel.basicNack(tag, false, true);
  } else {
     channel.basicNack(tag, false, false);
  }  
} 
  • 将处理失败的消息放入死信队列中,手动取出处理。

3. 死信队列

当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。 消息变成死信,可能是由于以下的原因:

  • 消息被拒绝

  • 队列消息过期

  • 队列长度达到最大后

当正常队列中存在死信消息时,队列如果设置了参数 x-dead-letter-exchange 指定对应的交换机,则死信消息将被指定的交换机接收,进而进入到交换机绑定的队列中。

在实例中需要注意一些事项,以springboot项目为例

1. 开始手动ACK

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual #开启手动Ack

2. 设置DLX参数

设置死信队列,通过参数x-dead-letter-exchange,如果是direct模式还需要设置上x-dead-letter-routing-key参数

当死信队列出现死信消息时,及上面提到的三种情况时,设置的x-dead-letter-exchange交换机发挥了作用,消息会被转移到设置的新的队列中,将会做进一步处理。

package com.wz.sdkservspringrq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @DESCRIPTION:
 * @USER: wangzhen
 * @DATE: 2022/9/7 18:44
 */
@Configuration
public class DLXRabbitMqConfig {
    @Autowired
    DirectExchange directProducerExchange;

//    @Bean
//    public DirectExchange directProducerExchange() {
//        return new DirectExchange("direct_producer_exchange", true, false);
//    }
    
    @Bean
    public DirectExchange dlxProducerExchange() {
        return new DirectExchange("dlx_producer_exchange", true, false);
    }

    @Bean
    public Queue emailDirectQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "dlx_producer_exchange");
        args.put("x-dead-letter-routing-key", "dlxMessage");
        return new Queue("email.direct.queue", true, false, false, args);
    }

    @Bean
    public Queue dlxDirectQueue() {
        return new Queue("dlx.direct.queue", true);
    }

    @Bean
    public Binding bindingDirectEmail(){
        return BindingBuilder.bind(emailDirectQueue()).to(directProducerExchange).with("message");
    }

    @Bean
    public Binding bindingDlxDirect(){
        return BindingBuilder.bind(dlxDirectQueue()).to(dlxProducerExchange()).with("dlxMessage");
    }

}

3. 手动ACK

package com.wz.sdkservspringrq.consumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @DESCRIPTION:
 * @USER: wangzhen
 * @DATE: 2022/9/6 21:13
 */
@RabbitListener(queues = "email.direct.queue")
@Component
public class EmailDirectService {
    @RabbitHandler
    public void messageService(byte[] message){
        System.out.println("email-------------->" + message);
    }

    @RabbitHandler
    public void messageDefaultService(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long tag, Channel channel) throws IOException {
        try {
            int i = 1 / 0;
            //告诉MQ删除这一条消息,若是true,则是删除所有小于tags的消息
            channel.basicAck(tag,false);
        } catch (Exception e) {
            //不requeue,消息会被打入到死信队列中
            channel.basicNack(tag, false, false);
        }
    }

    @RabbitHandler(isDefault = true)
    public void messageDefaultService(Object message){
        //如果参数类型不对应,则默认执行这个handler
        System.out.println("email-------------->" + message);
    }
}

4. DLX处理消息

package com.wz.sdkservspringrq.consumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @DESCRIPTION: 将被打入到死信队列的消息,重新处理
 * @USER: wangzhen
 * @DATE: 2022/9/6 21:14
 */
@RabbitListener(queues = "dlx.direct.queue")
@Component
public class DeadQueueService {
    @RabbitHandler
    public void messageService(byte[] message){
        System.out.println("email-------------->" + message);
    }

    @RabbitHandler
    public void messageDefaultService(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long tag, Channel channel) throws IOException {
        try {
            Thread.sleep(30000);
            int i = 1 / 0;
            channel.basicAck(tag,false);
        } catch (Exception e) {
            System.out.println("这条消息已经没救了--------------------->");
            channel.basicNack(tag, false, false);
        }
    }

    @RabbitHandler(isDefault = true)
    public void messageDefaultService(Object message){
        //如果参数类型不对应,则默认执行这个handler
        System.out.println("email-------------->" + message);
    }
}

4. 内存磁盘监控

4.1 RabbitMQ的内存警告

当内存使用超过配置的阈值或者磁盘空间剩余空间对于配置的阈值时,RabbitMQ会暂时阻塞客户端的连接,并且停止接收从客户端发来的消息,以此避免服务器的崩溃,客户端与服务端的心态检测机制也会失效。

img

4.2 RabbitMQ的内存控制

参考帮助文档:https://www.rabbitmq.com/configure.html,当出现警告的时候,可以通过配置去修改和调整

  • 通过命令

rabbitmqctl set_vm_memory_high_watermark <fraction>
或
rabbitmqctl set_vm_memory_high_watermark absolute 50MB

fraction/value 为内存阈值。默认情况是:0.4/2GB,代表的含义是:当RabbitMQ的内存超过40%时,就会产生警告并且阻塞所有生产者的连接。通过此命令修改阈值在Broker重启以后将会失效,通过修改配置文件方式设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启broker才会生效。

  • 配置文件方式 rabbitmq.conf

#默认
#vm_memory_high_watermark.relative = 0.4
# 使用relative相对值进行设置fraction,建议取值在04~0.7之间,不建议超过0.7.
vm_memory_high_watermark.relative = 0.6
# 使用absolute的绝对值的方式,但是是KB,MB,GB对应的命令如下
vm_memory_high_watermark.absolute = 2GB

4.3 RabbitMQ的内存换页

在某个Broker节点及内存阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在磁盘中有一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。

默认情况下,内存到达的阈值是50%时就会换页处理。也就是说,在默认情况下该内存的阈值是0.4的情况下,当内存超过0.4*0.5=0.2时,会进行换页动作。

比如有1000MB内存,当内存的使用率达到了400MB,已经达到了极限,但是因为配置的换页内存0.5,这个时候会在达到极限400mb之前,会把内存中的200MB进行转移到磁盘中。从而达到稳健的运行。

可以通过设置 vm_memory_high_watermark_paging_ratio 来进行调整

vm_memory_high_watermark.relative = 0.4
vm_memory_high_watermark_paging_ratio = 0.7(设置小于1的值)

为什么设置小于1,以为你如果你设置为1的阈值。内存都已经达到了极限了。你在去换页意义不是很大了。

4.4 RabbitMQ的磁盘预警

当磁盘的剩余空间低于确定的阈值时,RabbitMQ同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务器崩溃。

默认情况下:磁盘预警为50MB的时候会进行预警。表示当前磁盘空间第50MB的时候会阻塞生产者并且停止内存消息换页到磁盘的过程。 这个阈值可以减小,但是不能完全的消除因磁盘耗尽而导致崩溃的可能性。比如在两次磁盘空间的检查空隙内,第一次检查是:60MB ,第二检查可能就是1MB,就会出现警告。

通过命令方式修改如下:

rabbitmqctl set_disk_free_limit  <disk_limit>
rabbitmqctl set_disk_free_limit memory_limit  <fraction>
disk_limit:固定单位 KB MB GB
fraction :是相对阈值,建议范围在:1.0~2.0之间。(相对于内存)

通过配置文件配置如下:

disk_free_limit.relative = 3.0
disk_free_limit.absolute = 50mb