RabbitMq高级
一、集群配置
这里就简单说一下思路:
rabbitmq 本身是支持集群的,这点类似redis,rabbitmq主从节点共享的时元数据信息。有一点需要注意,在主节点挂掉时,从节点就只能等待主节点重新启动,不然从节点就处于不可用状态。
那更好的解决办法就是,加入物理集群利用keepalived提供的虚拟IP多主多从热备。
二、分布式事务
分布式事务有多种解决方案,比如2PC/XA,TCC等,也可以通过本地消息表的方式。
2.1 基于MQ的分布式事务整体设计思路
2.2 基于MQ的分布式事务消息的可靠生产问题
2.3 基于MQ的分布式事务消息的可靠生产问题-定时重发
2.4 基于MQ的分布式事务消息的可靠消费
2.5 基于MQ的分布式事务消息的消息重发
2.6 基于MQ的分布式事务消息的死信队列消息转移 + 人工处理
如果死信队列报错就进行人工处理
2.7 分布式事务总结
基于MQ的分布式事务解决方案优点:
1、通用性强 2、拓展方便 3、耦合度低,方案也比较成熟
基于MQ的分布式事务解决方案缺点:
1、基于消息中间件,只适合异步场景 2、消息会延迟处理,需要业务上能够容忍
建议
1、尽量去避免分布式事务 2、尽量将非核心业务做成异步
三、RabbitMq总结
3.1 总结
RabbitMq重点理解什么通道,交换机,队列,以及路由等概念以及各种模式之间区别,从使用角度来通俗简单总结一下:
消息生成者:
配置好connection连接,通过连接获得通道channel,当然如果是在SpringBoot中,通过rabbitTemplate操作就行
使用channel创建交换机Exchange,交换机包含fanout,direct,topic和headers四种类型,利用不同类型的交换机衍生出RabbitMq的各种模式,如发布订阅使用fanout类型的交换机,路由模式使用direct类型的交换机,主题模式使用topic类型的交换机。另外还有DLX死信交换机概念需要注意,当队列绑定了死信交换机,那这个队列就是死信队列,死信队列中存在死信消息时,消息不会像其他队列一样立即删除,而是被转发到死信交换机。
使用channel创建队列queue,队列需要绑定绑定交换机,同时路由模式下需要设置对应的路由。队列也可以设置对应参数,包括队列消息长度,队列消息过期时间,指定死信交换机等等。可以通过RabbitMq的WEB管理界面看到对应的队列Features,包括Lim,DLX,TTL等等。
通过channel发送消息,需要指定对应的交换机,队列或路由,以及对应的消息。如果需要确认消息是否发送成功,可以通过开启ConfirmCallback模式(
spring.rabbitmq.publisher-confirm-type: correlated
),无论成功或失败都会触发,ConfirmCallback回调。
消息消费者:
配置好connection连接配置,通过连接获得通道channel,当然如果是在SpringBoot中,通过rabbitTemplate操作就行
使用channel消费消息,只需要指定队列名。消息消费确认时可以指定是手动ack(
`spring.rabbitmq.listener.simple.acknowledge-mode: manual
),还是自动ack,手动ack就需要每次通过channel.basicAck()去确认,或者channel.basicNack()拒绝,这里有很多操作空间,多数情况可以通过死信队列+手动ack的方式优化消息消费异常的情况
3.2 附录
配置文件
rabbitmq:
addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
# port:
##集群配置 addresses之间用逗号隔开
# addresses: ip:port,ip:port
password: admin
username: 123456
virtual-host: / # 连接到rabbitMQ的vhost
requested-heartbeat: #指定心跳超时,单位秒,0为不指定;默认60s
publisher-confirms: #是否启用 发布确认
publisher-reurns: # 是否启用发布返回
connection-timeout: #连接超时,单位毫秒,0表示无穷大,不超时
cache:
channel.size: # 缓存中保持的channel数量
channel.checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
connection.size: # 缓存的连接数,只有是CONNECTION模式时生效
connection.mode: # 连接工厂缓存模式:CHANNEL 和 CONNECTION
listener:
simple.auto-startup: # 是否启动时自动启动容器
simple.acknowledge-mode: # 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
simple.concurrency: # 最小的消费者数量
simple.max-concurrency: # 最大的消费者数量
simple.prefetch: # 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
simple.transaction-size: # 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
simple.default-requeue-rejected: # 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
simple.idle-event-interval: # 多少长时间发布空闲容器时间,单位毫秒
simple.retry.enabled: # 监听重试是否可用
simple.retry.max-attempts: # 最大重试次数
simple.retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔
simple.retry.multiplier: # 应用于上一重试间隔的乘数
simple.retry.max-interval: # 最大重试时间间隔
simple.retry.stateless: # 重试是有状态or无状态
template:
mandatory: # 启用强制信息;默认false
receive-timeout: # receive() 操作的超时时间
reply-timeout: # sendAndReceive() 操作的超时时间
retry.enabled: # 发送重试是否可用
retry.max-attempts: # 最大重试次数
retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔
retry.multiplier: # 应用于上一重试间隔的乘数
retry.max-interval: #最大重试时间间隔
常见配置
Spring AMQP的主要对象
通过配置类加载的方式
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
public static final String RECEIVEDLXEXCHANGE="spring-ex";
public static final String RECEIVEDLXQUEUE="spring-qu1";
public static final String RECEIVEDLXROUTINGKEY="aa";
public static final String DIRECTEXCHANGE="spring-ex";
public static final String MDMQUEUE="mdmQueue";
public static final String TOPICEXCHANGE="spring-top";
@Value("${spring.rabbitmq.addresses}")
private String hosts;
@Value("${spring.rabbitmq.username}")
private String userName;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
/* @Value("${rabbit.channelCacheSize}")
private int channelCacheSize;*/
// @Value("${rabbit.port}")
// private int port;
/* @Autowired
private ConfirmCallBackListener confirmCallBackListener;
@Autowired
private ReturnCallBackListener returnCallBackListener;*/
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setAddresses(hosts);
cachingConnectionFactory.setUsername(userName);
cachingConnectionFactory.setPassword(password);
// cachingConnectionFactory.setChannelCacheSize(channelCacheSize);
//cachingConnectionFactory.setPort(port);
cachingConnectionFactory.setVirtualHost(virtualHost);
//设置连接工厂缓存模式:
cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
//缓存连接数
cachingConnectionFactory.setConnectionCacheSize(3);
//设置连接限制
cachingConnectionFactory.setConnectionLimit(6);
logger.info("连接工厂设置完成,连接地址{}"+hosts);
logger.info("连接工厂设置完成,连接用户{}"+userName);
return cachingConnectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
rabbitAdmin.setAutoStartup(true);
rabbitAdmin.setIgnoreDeclarationExceptions(true);
rabbitAdmin.declareBinding(bindingMdmQueue());
//声明topic交换器
rabbitAdmin.declareExchange(directExchange());
logger.info("管理员设置完成");
return rabbitAdmin;
}
@Bean
public RabbitListenerContainerFactory listenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//最小消费者数量
factory.setConcurrentConsumers(10);
//最大消费者数量
factory.setMaxConcurrentConsumers(10);
//一个请求最大处理的消息数量
factory.setPrefetchCount(10);
//
factory.setChannelTransacted(true);
//默认不排队
factory.setDefaultRequeueRejected(true);
//手动确认接收到了消息
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
logger.info("监听者设置完成");
return factory;
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECTEXCHANGE,true,false);
}
@Bean
public Queue mdmQueue(){
Map arguments = new HashMap<>();
// 绑定该队列到私信交换机
arguments.put("x-dead-letter-exchange",RECEIVEDLXEXCHANGE);
arguments.put("x-dead-letter-routing-key",RECEIVEDLXROUTINGKEY);
logger.info("队列交换机绑定完成");
return new Queue(RECEIVEDLXQUEUE,true,false,false,arguments);
}
@Bean
Binding bindingMdmQueue() {
return BindingBuilder.bind(mdmQueue()).to(directExchange()).with("");
}
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMandatory(true);
//发布确认
// rabbitTemplate.setConfirmCallback(confirmCallBackListener);
// 启用发布返回
// rabbitTemplate.setReturnCallback(returnCallBackListener);
logger.info("连接模板设置完成");
return rabbitTemplate;
}
/* @Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPICEXCHANGE,true,false);
}*/
/*
*//**
* @return DirectExchange
*//*
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(RECEIVEDLXEXCHANGE,true,false);
}
*//*
*
* @return Queue
*//*
@Bean
public Queue dlxQueue() {
return new Queue(RECEIVEDLXQUEUE,true);
}
*//*
* @return Binding
*//*
@Bean
public Binding binding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(RECEIVEDLXROUTINGKEY);
}*/
}