十五、微服务进阶之Core模块(feign 和 hystrix)

十五、微服务进阶之Core模块(feign 和 hystrix)

功能介绍

上文主要做了一个全局数据格式转换的功能型配置,此模块介绍 feign 和 hystrix 的核心配置

feign

—— feign 接口的实现底层原理还是 http,调用方会根据 @FeignClient 的[name]获取到对应的服务名,然后在注册服务中心列表中找到对应的服务获取其具体的地址信息(ip:port),然后根据[path]拼接出完整的请求路径,根据设置的各种超时时间设置http请求的超时时间,最后发送 http 请求完成请求;

—— 这里只是简述,feign 的实现原理更加复杂,还涉及到解码和编码器(数据格式转换)、错误解码器、请求拦截器等;这里不做详细介绍,原理图如下

hystrix

—— hystrix 就是熔断器,功能就是对请求超时的接口熔断,对服务降级,防止服务雪崩;这里主要介绍其熔断策略:线程池和信号量;先看如下区别对比:

 线程池隔离信号量隔离
线程与调用线程非相同线程与调用线程相同(jetty线程)
开销排队、调度、上下文开销等无线程切换,开销低
异步可以是异步,也可以是同步。看调用的方法同步调用,不支持异步
并发支持支持(最大线程池大小hystrix.threadpool.default.maximumSize)支持(最大信号量上限maxConcurrentRequests)
是否超时支持,可直接返回不支持,如果阻塞,只能通过调用协议(如:socket超时才能返回)
是否支持熔断支持,当线程池到达maxSize后,再请求会触发fallback接口进行熔断支持,当信号量达到maxConcurrentRequests后。再请求会触发fallback
隔离原理每个服务单独用线程池通过信号量的计数器
资源开销大,大量线程的上下文切换,容易造成机器负载高小,只是个计数器

对以上区别对比中的几点简要说明:
—— 线程:信号量使用的是同一个线程,这样获取到的 HttpServletRequest 就是同一个;所以信号量模式下的调用链任意一个节点都是可以获取到最初的请求头(需要重写 feign 的拦截器);线程模式下不可;
—— 异步:如果接口中的某个远程请求不需要其返回值可以使用异步提高整个接口的响应速度,否则需要使用同步的方式获取到返回值然后执行接口后面的逻辑;

场景说明:
1. 信号量模式:适用于内部环境互相调用(同一工程不同微服务之间的调用),因为可以传递请求头;且服务之间耦合度比较大,不同服务之间的数据存在前后关联,请求需要同步执行;
2. 线程池模式:适用于对外部工程提供接口的情况,外部环境请求量往往少于内部服务的请求量;线程池模式适用于请求耗时比较大的请求,可以保证调用方服务请求不阻塞,容器用于处理远程请求的线程资源占用较少,不至于出现服务雪崩;

配置代码

feign 的编码器和解码器;由于架构中存在全局统一异常捕获和处理,所以 feign 的错误解码器使用默认的即可;下面的代码中又出现了 ObjectMapper,同理仍然指的是全局配置中的 ObjectMapper

/**
 * 注意事项:此编码器和解码器基于 json 格式的处理 
 * 对于路径参数不起作用,但是路径参数 feign 会使用默认的方式封装在 map 中后处理 
 * 所以路径参数中对于 jackson 默认序列化方式不支持的参数会报错,提示转换错误 
 * 目前发现不支持的参数对象有 LocalDateTime、LocalDate、LocalTime(使用字符串时间格式传参时不支持) 
 * 所以建议 feign 接口不要使用非 POST、GET 的请求方式,GET 尽量传递实体对象 
 * 当然,如果参数中不包含(LocalDateTime、LocalDate、LocalTime)等不支持的对象类型,可以随意传参;
 **/
@SpringBootConfiguration
@ConditionalOnBean(ObjectMapper.class)
@AutoConfigureAfter(JacksonConfiguration.class)
public class FeignCoder {
    @Resource(name = "objectMapper")
    private ObjectMapper objectMapper;

    /** * feign 接口的编码器,用于处理请求参数,仅针对json数据 */
    @Bean
    public Encoder encoder() {
        HttpMessageConverter converter = new MappingJackson2HttpMessageConverter(objectMapper);
        ObjectFactory messageConverters = () -> new HttpMessageConverters(converter);
        return new SpringEncoder(messageConverters);
    }

    /** * feign 接口的解码器,用于处理接口响应结果,json结果序列化 */
    @Bean
    public Decoder decoder() {
        HttpMessageConverter converter = new MappingJackson2HttpMessageConverter(objectMapper);
        ObjectFactory messageConverters = () -> new HttpMessageConverters(converter);
        return new SpringDecoder(messageConverters);
    }
}

feign 的请求拦截器(封装原请求的头部到下一个请求)

/**
 * feign 全局请求拦截器,主要目的是保留 feign 接口原有请求的头部信息 
 * 注:头部信息传递仅在 信号量 的模式下生效,线程池模式会新开线程丢失原有的请求头
 **/
@SpringBootConfiguration
public class FeignInterceptor implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate requestTemplate) {
        // 获取请求体 
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        if (ObjectUtil.isNull(requestAttributes)) {
            return;
        }
        HttpServletRequest request = requestAttributes.getRequest();
        if (ObjectUtil.isNull(request)) {
            return;
        }
        Enumeration headerNames = request.getHeaderNames();
        while (ObjectUtil.isNotNull(headerNames) && headerNames.hasMoreElements()) {
            String headerName = headerNames.nextElement();
            requestTemplate.header(headerName, request.getHeader(headerName));
        }
    }
}

hystrix 的配置,由于配置属性譬如 “hystrix.command.default.execution.isolation=SEMAPHORE” 无法被 IDEA 识别,配置参数可能并未生效,所以采用配置类的方式

——1. 自定义配置参数属性,HystrixValueConfiguration

@SpringBootConfiguration
@ConfigurationProperties(prefix = "spring.hystrix", ignoreInvalidFields = true)
@Data
public class HystrixValueConfiguration {
    /** * 熔断策略,自定义默认使用信号量 */
    private HystrixCommandProperties.ExecutionIsolationStrategy strategy = HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE;
    /** * 核心线程池的线程大小,策略为线程时生效 */
    private int threadCoreSize = 100;
    /** * 请求处理的队列最大值,线程和信号量处理最大请求量的数 */
    private int maxRequest = 200;
    /** * hystrix 熔断的超时时间,默认 10000,配置值 > feign.connectTimeout + feign.readTimeout */
    private int timeOut = 60000;
}

——2. 对应的配置参数

"spring.hystrix.max-request":"请求处理的队列最大值,线程和信号量处理最大请求量的数" 
"spring.hystrix.strategy":"熔断策略,自定义默认使用信号量" 
"spring.hystrix.thread-core-size":"核心线程池的线程大小,策略为线程时生效" 
"spring.hystrix.time-out":"hystrix 熔断的超时时间"

——3. 生效上述配置的代码

/**
 * 当前类用来修改 hystrix 的配置,配置文件中针对 hystrix 的配置均是无效 
 * proxyBeanMethods = false不使用代理,每次使用新的bean,即多例模式 * ConditionalOnClass 表示存在参数中的类对象(bean)才会加载当前配置 
 **/
@SpringBootConfiguration(proxyBeanMethods = false)
@EnableConfigurationProperties(HystrixValueConfiguration.class)
@ConditionalOnClass({HystrixCommand.class, HystrixFeign.class})
public class HystrixConfiguration {
    @Autowired
    private HystrixValueConfiguration hystrixValueConfiguration;

    /** * 覆写 Feign.Builder */
    @Bean
    @Scope("prototype")
    @ConditionalOnProperty(name = "feign.hystrix.enabled")
    public Feign.Builder feignHystrixBuilder() {
        HystrixFeign.Builder builder = HystrixFeign.builder();
        SetterFactory setterFactory = (target, method) -> {
            String groupKey = target.name();
            String commandKey = Feign.configKey(target.type(), method);
            // HystrixCommand 熔断器相关属性配置 
            HystrixCommand.Setter setter = HystrixCommand.Setter
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
                    .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
            // HystrixCommandProperties 熔断器相关属性配置 
            HystrixCommandProperties.Setter commandSetter = HystrixCommandProperties.Setter()
                    .withExecutionTimeoutInMilliseconds(hystrixValueConfiguration.getTimeOut());
            if (THREAD.equals(hystrixValueConfiguration.getStrategy())) {
                // HystrixThreadPoolProperties 线程池相关配置 
                HystrixThreadPoolProperties.Setter threadPoolSetter = HystrixThreadPoolProperties.Setter()
                        .withCoreSize(hystrixValueConfiguration.getThreadCoreSize())
                        .withMaxQueueSize(hystrixValueConfiguration.getMaxRequest());
                commandSetter.withExecutionIsolationStrategy(THREAD);
                setter.andThreadPoolPropertiesDefaults(threadPoolSetter);
            } else {
                // 当前架构默认信号量,错误配置值也默认为信号量 
                commandSetter.withExecutionIsolationStrategy(SEMAPHORE)
                        .withExecutionIsolationSemaphoreMaxConcurrentRequests(hystrixValueConfiguration.getMaxRequest());
            }
            return setter.andCommandPropertiesDefaults(commandSetter);
        };
        return builder.setterFactory(setterFactory);
    }
}

核心依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
        <!-- 不传递依赖,只是一个工具依赖 -->
    </dependency>
    <!-- OPEN-FEIGN 依赖 -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>
    <!-- 熔断器 hystrix -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
    </dependency>
    <!-- 拦截请求 -->
    <dependency>
        <groupId>jakarta.servlet</groupId>
        <artifactId>jakarta.servlet-api</artifactId>
    </dependency>
</dependencies>

补充说明

报错 “No qualifying bean of type org.springframework.boot.autoconfigure.http.HttpMessageConvert”, 如何解决?
答:这个错误在调用 feign 接口调用时偶尔会出现,这是因为容器中没有 HttpMessageConvert 的 bean, feign 接口返回数据需要当前数据转换器;如果已经注入了自定义的解码器是不会出现此错误的;所以解决方法就是定义自己的解码器,或者加入下面的代码

@Bean
@ConditionalOnMissingBean
public HttpMessageConverters messageConverters(ObjectProvider<HttpMessageConverter<?>> converters) {
    // 使用默认的数据传输转换器即可 
    return new HttpMessageConverters(converters.orderedStream().collect(Collectors.toList()));
}