在微服务架构中,服务与服务之间通过远程调用的方式进行通信,一旦某个被调用的服务发生了故障,其依赖服务也会发生故障,此时就会发生故障的蔓延,最终导致系统瘫痪。Hystrix实现了断路器模式,当某个服务发生故障时,通过断路器的监控,给调用方返回一个错误响应,而不是长时间的等待,这样就不会使得调用方由于长时间得不到响应而占用线程,从而防止故障的蔓延。Hystrix具备服务降级、服务熔断、线程隔离、请求缓存、请求合并及服务监控等强大功能。
这里我们创建一个hystrix-service模块来演示hystrix的常用功能。
主要是配置了端口、注册中心地址及user-service的调用路径。
server: port: 8401 spring: application: name: hystrix-service eureka: client: register-with-eureka: true fetch-registry: true service-url: defaultZone: http://localhost:8001/eureka/ service-url: user-service: http://user-service/此处省略了创建用户类User和统一返回前端的响应类Result,RestTemplate配置,UserService接口的创建,具体的可以参考项目源码
@RestController @RequestMapping("/user") public class UserHystrixController { @Autowired private UserService userService; @GetMapping("/testFallback/{id}") public Result testFallback(@PathVariable Long id) { return userService.getUser(id); } @GetMapping("/testException/{id}") public Result testException(@PathVariable Long id) { return userService.getUserException(id); } @GetMapping("/testCommand/{id}") public Result getUserCommand(@PathVariable Long id) { return userService.getUserCommand(id); } @GetMapping("/testCache/{id}") public Result testCache(@PathVariable Long id) { userService.getUserCache(id); userService.getUserCache(id); userService.getUserCache(id); return new Result("操作成功", 200); } @GetMapping("/testRemoveCache/{id}") public Result testRemoveCache(@PathVariable Long id) { userService.getUserCache(id); userService.removeCache(id); userService.getUserCache(id); return new Result("操作成功", 200); } @GetMapping("/testCollapser") public Result testCollapser() throws ExecutionException, InterruptedException { Future<User> future1 = userService.getUserFuture(1L); Future<User> future2 = userService.getUserFuture(2L); future1.get(); future2.get(); ThreadUtil.safeSleep(200); Future<User> future3 = userService.getUserFuture(3L); future3.get(); return new Result("操作成功", 200); } }在UserHystrixController中添加用于测试服务降级的接口:
@GetMapping("/testFallback/{id}") public Result testFallback(@PathVariable Long id) { return userService.getUser(id); }在UserService中添加调用方法与服务降级方法,方法上需要添加@HystrixCommand注解:
@HystrixCommand(fallbackMethod = "fallbackMethod1") public Result getUser(Long id) { return restTemplate.getForObject(userServiceUrl + "/user/{1}", Result.class, id); } /** * 声明的参数需要包含controller的声明参数 * * @param id * @return */ public Result fallbackMethod1(@PathVariable Long id) { return new Result("服务调用失败", 500); }启动eureka-server、user-service、hystrix-service服务
调用接口进行测试:http://localhost:8401/user/testFallback/1
关闭user-service服务重新测试该接口,发现已经发生了服务降级:
在UserHystrixController中添加测试接口:
@GetMapping("/testCommand/{id}") public Result getUserCommand(@PathVariable Long id) { return userService.getUserCommand(id); }在UserService中添加方式实现功能:
@HystrixCommand(fallbackMethod = "fallbackMethod1", commandKey = "getUserCommand", groupKey = "getUserGroup", threadPoolKey = "getUserThreadPool") public Result getUserCommand(Long id) { return restTemplate.getForObject(userServiceUrl + "/user/{1}", Result.class, id); }在UserHystrixController中添加测试接口:
@GetMapping("/testException/{id}") public Result testException(@PathVariable Long id) { return userService.getUserException(id); }在UserService中添加实现方法,这里忽略了NullPointerException,当id为1时抛出IndexOutOfBoundsException,id为2时抛出NullPointerException:
@HystrixCommand(fallbackMethod = "fallbackMethod2", ignoreExceptions = {NullPointerException.class}) public Result getUserException(Long id) { if (id == 1) { throw new IndexOutOfBoundsException(); } else if (id == 2) { throw new NullPointerException(); } return restTemplate.getForObject(userServiceUrl + "/user/{1}", Result.class, id); } public Result fallbackMethod2(@PathVariable Long id, Throwable e) { LOGGER.error("id {},throwable class:{}", id, e.getClass()); return new Result("服务调用失败", 500); }调用接口进行测试:http://localhost:8401/user/tesException/1
调用接口进行测试:http://localhost:8401/user/tesException/2
当系统并发量越来越大时,我们需要使用缓存来优化系统,达到减轻并发请求线程数,提供响应速度的效果。
在UserHystrixController中添加使用缓存的测试接口,直接调用三次getUserCache方法:
@GetMapping("/testCache/{id}") public Result testCache(@PathVariable Long id) { userService.getUserCache(id); userService.getUserCache(id); userService.getUserCache(id); return new Result("操作成功", 200); }在UserService中添加具有缓存功能的getUserCache方法:
@CacheResult(cacheKeyMethod = "getCacheKey") @HystrixCommand(fallbackMethod = "fallbackMethod1", commandKey = "getUserCache") public Result getUserCache(Long id) { LOGGER.info("getUserCache id:{}", id); return restTemplate.getForObject(userServiceUrl + "/user/{1}", Result.class, id); } /** * 为缓存生成key的方法 * * @return */ public String getCacheKey(Long id) { return String.valueOf(id); }调用接口测试http://localhost:8401/user/testCache/1,这个接口中调用了三次getUserCache方法,但是只打印了一次日志,说明有两次走的是缓存:
在UserHystrixController中添加移除缓存的测试接口,调用一次removeCache方法:
@GetMapping("/testRemoveCache/{id}") public Result testRemoveCache(@PathVariable Long id) { userService.getUserCache(id); userService.removeCache(id); userService.getUserCache(id); return new Result("操作成功", 200); }在UserService中添加具有移除缓存功能的removeCache方法:
@HystrixCommand @CacheRemove(commandKey = "getUserCache", cacheKeyMethod = "getCacheKey") public Result removeCache(Long id) { LOGGER.info("removeCache id:{}", id); return restTemplate.postForObject(userServiceUrl + "/user/delete/{1}", null, Result.class, id); }调用接口测试http://localhost:8401/user/testRemoveCache/1,可以发现有两次查询都走的是接口:
在缓存使用过程中,我们需要在每次使用缓存的请求前后对HystrixRequestContext进行初始化和关闭,否则会出现如下异常:
java.lang.IllegalStateException: Request caching is not available. Maybe you need to initialize the HystrixRequestContext? at com.netflix.hystrix.HystrixRequestCache.get(HystrixRequestCache.java:104) ~[hystrix-core-1.5.18.jar:1.5.18] at com.netflix.hystrix.AbstractCommand$7.call(AbstractCommand.java:478) ~[hystrix-core-1.5.18.jar:1.5.18] at com.netflix.hystrix.AbstractCommand$7.call(AbstractCommand.java:454) ~[hystrix-core-1.5.18.jar:1.5.18]这里我们通过使用过滤器,在每个请求前后初始化和关闭HystrixRequestContext来解决该问题:
@Component @WebFilter(urlPatterns = "/*", asyncSupported = true) public class HystrixRequestContextFilter implements Filter { @Override public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { filterChain.doFilter(servletRequest, servletResponse); } finally { context.close(); } } }微服务系统中的服务间通信,需要通过远程调用来实现,随着调用次数越来越多,占用线程资源也会越来越多。Hystrix中提供了@HystrixCollapser用于合并请求,从而达到减少通信消耗及线程数量的效果。
在UserHystrixController中添加testCollapser方法,这里我们先进行两次服务调用,再间隔200ms以后进行第三次服务调用:
@GetMapping("/testCollapser") public Result testCollapser() throws ExecutionException, InterruptedException { Future<User> future1 = userService.getUserFuture(1L); Future<User> future2 = userService.getUserFuture(2L); future1.get(); future2.get(); ThreadUtil.safeSleep(200); Future<User> future3 = userService.getUserFuture(3L); future3.get(); return new Result("操作成功", 200); }使用@HystrixCollapser实现请求合并,所有对getUserFuture的的多次调用都会转化为对getUserByIds的单次调用:
@HystrixCollapser(batchMethod = "listUsersByIds",collapserProperties = { @HystrixProperty(name = "timerDelayInMilliseconds",value = "100") }) public Future<User> getUserFuture(Long id) { return new AsyncResult<User>() { @Override public User invoke() { Result result = restTemplate.getForObject(userServiceUrl + "/user/{1}", Result.class, id); Map data = (Map) result.getData(); User user = BeanUtil.mapToBean(data, User.class, true); LOGGER.info("getUserById username:{}",user.getUsername()); return user; } }; } @HystrixCommand public List<User> listUsersByIds(List<Long> ids) { LOGGER.info("listUsersByIds:{}",ids); Result result = restTemplate.getForObject(userServiceUrl + "/user/listUsersByIds?ids={1}", Result.class, CollUtil.join(ids, ",")); return (List<User>)result.getData(); }注意:测试之前需要重启user-service服务,因为刚才测试请求缓存把数据删了一个,不然会报错
访问接口测试http://localhost:8401/user/testCollapser,由于我们设置了100毫秒进行一次请求合并,前两次被合并,最后一次自己单独合并了。
实例配置只需要将全局配置中的default换成与之对应的key即可。
hystrix: command: HystrixComandKey: #将default换成HystrixComrnandKey execution: isolation: strategy: THREAD collapser: HystrixCollapserKey: #将default换成HystrixCollapserKey maxRequestsInBatch: 100 threadpool: HystrixThreadPoolKey: #将default换成HystrixThreadPoolKey coreSize: 10