十一、微服务进阶之RabbitMq

十一、微服务进阶之RabbitMq

功能介绍

全局配置 RabbitMq,开启全局消息发送成功确认回调以及消息推送失败回调,同时修改 rabbit 序列化方式保证针对对象数据的存储和读取不乱码

代码展示

自定义 RabbitMq 的配置信息,即连接地址等信息

@ConfigurationProperties(prefix = "spring.rabbit", ignoreInvalidFields = true)
@Data
public class RabbitValueConfiguration {
    /** * 主机资源地址(包含通讯方式、用户名、密码、端口) */
    private String uri;
    /** * rabbit 主机地址 */
    private String host;
    /** * rabbit 服务端口 */
    private int port;
    /** * 登录用户名 */
    private String name;
    /** * 登录用户密码 */
    private String password;
    /** * 当前服务使用的分区地址名 */
    private String virtualHost;
}

对应的配置属性名称

"spring.rabbit.uri": "rabbit 资源地址" 
"spring.rabbit.host": "主机地址" 
"spring.rabbit.port": "服务端口" 
"spring.rabbit.name": "用户名" 
"spring.rabbit.password": "密码" 
"spring.rabbit.virtual-host": "分区名"

Rabbit 的主要配置

注:其中注入的 ObjectMapper 是已经定制化之后的 Bean,定制信息不在当前模块,这里仅展示部分代码

@SpringBootConfiguration
@EnableConfigurationProperties(RabbitValueConfiguration.class)
@Slf4j
public class RabbitMqConfiguration {
    /** * 消息的来源服务,全局设置 */
    private final static String APP_ID = "fc";
    /** * 数据传输类型,json格式 */
    private final static String CONTENT_TYPE = MessageProperties.CONTENT_TYPE_JSON;
    /** * 数据编码格式,utf-8 */
    private final static String CONTENT_ENCODING = StandardCharsets.UTF_8.name();
    @Autowired
    private RabbitValueConfiguration rabbitValueConfiguration;
    /** * 注入全局序列化配置 * 如果未生效,请在本地自行设置后注入 * #@see com.fatcat.core.config.JacksonConfiguration#objectMapper() */
    @Autowired
    private ObjectMapper objectMapper;

    @Bean
    public CachingConnectionFactory connectionFactory() {
        log.info("rabbit connection factory begin to set up...");
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setUri(rabbitValueConfiguration.getUri());
        connectionFactory.setVirtualHost(rabbitValueConfiguration.getVirtualHost());
        // 开启发送消息成功回调,对应配置属性
        spring.rabbitmq.publisher - confirm - type = correlated
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        // 开启推送消息至队列失败回调,对应配置属性
        spring.rabbitmq.publisher - returns = true connectionFactory.setPublisherReturns(Boolean.TRUE);
        log.info("rabbit connection factory set successfully!");
        return connectionFactory;
    }

    /** * 全局设置消息的回调函数 * 如果需要单独设置,请将此bean设置成多例模式 @Scope(value = "prototype") * 然后单独设置回调函数 setConfirmCallback(), setReturnCallback() */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        // 重写发送消息的方法,保证每次发送消息都传递消息id 
        RabbitTemplate rabbitTemplate = new RabbitTemplate() {
            @Override
            public void convertAndSend(@NonNull String exchange, @NonNull String routingKey, @NonNull final Object message) throws AmqpException {
                CorrelationData correlationData = new CorrelationData(IdUtil.fastSimpleUUID());
                this.convertAndSend(exchange, routingKey, message, correlationData);
            }
        };
        rabbitTemplate.setConnectionFactory(connectionFactory());
        // 消息转换器 
        MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
        messagePropertiesConverter.fromMessageProperties(messageProperties(), StandardCharsets.UTF_8.name());
        rabbitTemplate.setMessagePropertiesConverter(messagePropertiesConverter);
        // 设置回调 
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            String msgId = ObjectUtil.isNotNull(correlationData) ? correlationData.getId() : "no-id";
            log.warn("消息发送成功:id={}, ack={}, cause={}", msgId, ack, cause);
        });
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.warn("消息被退回:message={}, replyCode={}, replyText={}, exchange={}, routingKey={}", message, replyCode, replyText, exchange, routingKey);
        });
        // 开始强制委托,即回调上述两方法 
        rabbitTemplate.setMandatory(Boolean.TRUE);
        // 设置消息转换(生产者) 
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

    /** * 设置rabbit将消息转换成java对象时的处理参数 */
    @Bean
    public MessageProperties messageProperties() {
        // 个性化参数设置
        MessageProperties messageProperties = new MessageProperties();
        // 设置一个标志,判断消息来源等 
        messageProperties.setAppId(APP_ID);
        // 消息的传输格式,json 格式传输 
        messageProperties.setContentType(CONTENT_TYPE);
        // 消息的id 
        messageProperties.setMessageId(IdUtil.fastUUID());
        // 编码格式 
        messageProperties.setContentEncoding(CONTENT_ENCODING);
        // 消息生成时间
        messageProperties.setTimestamp(DateUtil.date());
        return messageProperties;
    }

    /** * rabbit消息序列化配置 */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    /** * 针对消费者的配置 */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // 设置链接配置 
        factory.setConnectionFactory(connectionFactory());
        // 设置消息转换(消费者) 
        factory.setMessageConverter(messageConverter());
        // 设置确认模式为手工确认
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}

常用依赖

<dependencies>
    <!-- 自定义配置参数提示 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
        <!-- 不传递依赖,只是一个工具依赖 -->
    </dependency>
    <!-- 序列化工具, starter-web 项目自带 -->
    <dependency>
        <groupId>com.fasterxml.jackson.datatype</groupId>
        <artifactId>jackson-datatype-jsr310</artifactId>
    </dependency>
    <!-- rabbitmq 依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>
补充说明

1. rabbit 核心依赖包中已经提供了 spring.rabbitmq 的配置属性为什么还要自己封装自己的配置属性?
答:依赖中的配置仍然可以使用,采用依赖中默认的配属属性后只需要直接注入默认的链接工厂即可;其中大部分配置可以直接使用默认的配置参数即可完成,具体如下:

// 开启发送消息成功回调,对应默认配置属性 spring.rabbitmq.publisher-confirm-type=correlated
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
// 开启推送消息至队列失败回调,对应默认配置属性 spring.rabbitmq.publisher-returns=true
connectionFactory.setPublisherReturns(Boolean.TRUE);
// 设置确认模式为手工确认,对应默认配置属性 spring.rabbitmq.listener.simple.acknowledge-mode=manual
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

本文使用自定义配置属性只是为了更加熟悉 rabbit 初始化的流程,前文中的 es 其实也有默认的配置属性;本人只是比较倾向于使用配置类进行配置而已

2. rabbit 默认序列化方式即 SimpleMessageConverter, 为什么还要给 rabbit 单独配置序列化方式?
答:rabbit 对字符串的存储采用对字符串编码成二进制后的方式保存,针对对象的存储是使用 SimpleMessageConverter 的序列化方式后保存;这样消费者对于字符串的读取不会出现问题,但是对于对象数据的读取会出现乱码,无法进行反序列化,这并不是用户所期望的;所以指定序列化方式后,就可以使用对应的反序列化方式转换成对象;博主采用的仍然是 Jackson 的序列化和反序列化方式,只针对特殊类型字段做了处理;
可以看如下两图,图一是指定序列化方式后的数据展示,图二是未指定序列化方式的数据展示;

序列化
序列化(图一)
未序列化
未序列化(图二)

其中 content_type 字段的值在图一中是常见的 application/json, 而在图二中则是 application/x-java-serialized-object 即 SimpleMessageConverter 的序列化类型

3. 不设置序列化方式保存对象数据存取会出现问题,但直接保存字符串没问题。为什么将消息推送至队列前不转换成 json 后推送,而是直接推送整个对象?
答:无论推送的是什么类型的数据,rabbit 都会序列化一次,如果将对象转换成 json 后推送,消费者取出的 json 数据会在原数据中多一对引号,这个时候使用反序列化工具转换就会报错了;所以切记不要向 rabbit 中推送手动序列化后的数据,尽量保证原始数据格式和类型;如果是自定义对象类型,请务必实现 Serializable 接口并生成唯一 serialVersionUID;这些都是保证可以正确存取数据的必要条件