功能介绍
全局配置 RabbitMq,开启全局消息发送成功确认回调以及消息推送失败回调,同时修改 rabbit 序列化方式保证针对对象数据的存储和读取不乱码
代码展示
自定义 RabbitMq 的配置信息,即连接地址等信息
@ConfigurationProperties(prefix = "spring.rabbit", ignoreInvalidFields = true)
@Data
public class RabbitValueConfiguration {
/** * 主机资源地址(包含通讯方式、用户名、密码、端口) */
private String uri;
/** * rabbit 主机地址 */
private String host;
/** * rabbit 服务端口 */
private int port;
/** * 登录用户名 */
private String name;
/** * 登录用户密码 */
private String password;
/** * 当前服务使用的分区地址名 */
private String virtualHost;
}
对应的配置属性名称
"spring.rabbit.uri": "rabbit 资源地址"
"spring.rabbit.host": "主机地址"
"spring.rabbit.port": "服务端口"
"spring.rabbit.name": "用户名"
"spring.rabbit.password": "密码"
"spring.rabbit.virtual-host": "分区名"
Rabbit 的主要配置
注:其中注入的 ObjectMapper 是已经定制化之后的 Bean,定制信息不在当前模块,这里仅展示部分代码
@SpringBootConfiguration
@EnableConfigurationProperties(RabbitValueConfiguration.class)
@Slf4j
public class RabbitMqConfiguration {
/** * 消息的来源服务,全局设置 */
private final static String APP_ID = "fc";
/** * 数据传输类型,json格式 */
private final static String CONTENT_TYPE = MessageProperties.CONTENT_TYPE_JSON;
/** * 数据编码格式,utf-8 */
private final static String CONTENT_ENCODING = StandardCharsets.UTF_8.name();
@Autowired
private RabbitValueConfiguration rabbitValueConfiguration;
/** * 注入全局序列化配置 * 如果未生效,请在本地自行设置后注入 * #@see com.fatcat.core.config.JacksonConfiguration#objectMapper() */
@Autowired
private ObjectMapper objectMapper;
@Bean
public CachingConnectionFactory connectionFactory() {
log.info("rabbit connection factory begin to set up...");
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setUri(rabbitValueConfiguration.getUri());
connectionFactory.setVirtualHost(rabbitValueConfiguration.getVirtualHost());
// 开启发送消息成功回调,对应配置属性
spring.rabbitmq.publisher - confirm - type = correlated
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
// 开启推送消息至队列失败回调,对应配置属性
spring.rabbitmq.publisher - returns = true connectionFactory.setPublisherReturns(Boolean.TRUE);
log.info("rabbit connection factory set successfully!");
return connectionFactory;
}
/** * 全局设置消息的回调函数 * 如果需要单独设置,请将此bean设置成多例模式 @Scope(value = "prototype") * 然后单独设置回调函数 setConfirmCallback(), setReturnCallback() */
@Bean
public RabbitTemplate rabbitTemplate() {
// 重写发送消息的方法,保证每次发送消息都传递消息id
RabbitTemplate rabbitTemplate = new RabbitTemplate() {
@Override
public void convertAndSend(@NonNull String exchange, @NonNull String routingKey, @NonNull final Object message) throws AmqpException {
CorrelationData correlationData = new CorrelationData(IdUtil.fastSimpleUUID());
this.convertAndSend(exchange, routingKey, message, correlationData);
}
};
rabbitTemplate.setConnectionFactory(connectionFactory());
// 消息转换器
MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
messagePropertiesConverter.fromMessageProperties(messageProperties(), StandardCharsets.UTF_8.name());
rabbitTemplate.setMessagePropertiesConverter(messagePropertiesConverter);
// 设置回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
String msgId = ObjectUtil.isNotNull(correlationData) ? correlationData.getId() : "no-id";
log.warn("消息发送成功:id={}, ack={}, cause={}", msgId, ack, cause);
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.warn("消息被退回:message={}, replyCode={}, replyText={}, exchange={}, routingKey={}", message, replyCode, replyText, exchange, routingKey);
});
// 开始强制委托,即回调上述两方法
rabbitTemplate.setMandatory(Boolean.TRUE);
// 设置消息转换(生产者)
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
/** * 设置rabbit将消息转换成java对象时的处理参数 */
@Bean
public MessageProperties messageProperties() {
// 个性化参数设置
MessageProperties messageProperties = new MessageProperties();
// 设置一个标志,判断消息来源等
messageProperties.setAppId(APP_ID);
// 消息的传输格式,json 格式传输
messageProperties.setContentType(CONTENT_TYPE);
// 消息的id
messageProperties.setMessageId(IdUtil.fastUUID());
// 编码格式
messageProperties.setContentEncoding(CONTENT_ENCODING);
// 消息生成时间
messageProperties.setTimestamp(DateUtil.date());
return messageProperties;
}
/** * rabbit消息序列化配置 */
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter(objectMapper);
}
/** * 针对消费者的配置 */
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 设置链接配置
factory.setConnectionFactory(connectionFactory());
// 设置消息转换(消费者)
factory.setMessageConverter(messageConverter());
// 设置确认模式为手工确认
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
常用依赖
<dependencies>
<!-- 自定义配置参数提示 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
<!-- 不传递依赖,只是一个工具依赖 -->
</dependency>
<!-- 序列化工具, starter-web 项目自带 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<!-- rabbitmq 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
补充说明
1. rabbit 核心依赖包中已经提供了 spring.rabbitmq 的配置属性为什么还要自己封装自己的配置属性?
答:依赖中的配置仍然可以使用,采用依赖中默认的配属属性后只需要直接注入默认的链接工厂即可;其中大部分配置可以直接使用默认的配置参数即可完成,具体如下:
// 开启发送消息成功回调,对应默认配置属性 spring.rabbitmq.publisher-confirm-type=correlated
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
// 开启推送消息至队列失败回调,对应默认配置属性 spring.rabbitmq.publisher-returns=true
connectionFactory.setPublisherReturns(Boolean.TRUE);
// 设置确认模式为手工确认,对应默认配置属性 spring.rabbitmq.listener.simple.acknowledge-mode=manual
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
本文使用自定义配置属性只是为了更加熟悉 rabbit 初始化的流程,前文中的 es 其实也有默认的配置属性;本人只是比较倾向于使用配置类进行配置而已
2. rabbit 默认序列化方式即 SimpleMessageConverter, 为什么还要给 rabbit 单独配置序列化方式?
答:rabbit 对字符串的存储采用对字符串编码成二进制后的方式保存,针对对象的存储是使用 SimpleMessageConverter 的序列化方式后保存;这样消费者对于字符串的读取不会出现问题,但是对于对象数据的读取会出现乱码,无法进行反序列化,这并不是用户所期望的;所以指定序列化方式后,就可以使用对应的反序列化方式转换成对象;博主采用的仍然是 Jackson 的序列化和反序列化方式,只针对特殊类型字段做了处理;
可以看如下两图,图一是指定序列化方式后的数据展示,图二是未指定序列化方式的数据展示;
其中 content_type 字段的值在图一中是常见的 application/json, 而在图二中则是 application/x-java-serialized-object 即 SimpleMessageConverter 的序列化类型
3. 不设置序列化方式保存对象数据存取会出现问题,但直接保存字符串没问题。为什么将消息推送至队列前不转换成 json 后推送,而是直接推送整个对象?
答:无论推送的是什么类型的数据,rabbit 都会序列化一次,如果将对象转换成 json 后推送,消费者取出的 json 数据会在原数据中多一对引号,这个时候使用反序列化工具转换就会报错了;所以切记不要向 rabbit 中推送手动序列化后的数据,尽量保证原始数据格式和类型;如果是自定义对象类型,请务必实现 Serializable 接口并生成唯一 serialVersionUID;这些都是保证可以正确存取数据的必要条件