Hystrix熔断

    科技2022-07-21  89

    首先查看EnableCircuitBreaker注解,通过Import注解导入了EnableCircuitBreakerImportSelector(SpringFactoryImportSelector的子类,实现了ImportSelector接口),当Spring容器启动时,会调用SpringFactoryImportSelector.selectImports方法将需要注册的Bean解析出来(当spring.cloud.circuit.breaker.enabled配置的值为false时会放弃解析)。EnableCircuitBreakerImportSelector的作用主要是导入HystrixCircuitBreakerConfiguration配置类,从而向Spring容器中注册HystrixCommandAspect及其他的Hystrix需要的类。

    public String[] selectImports(AnnotationMetadata metadata) {     if(!this.isEnabled()) {         return new String[0];     } else {         AnnotationAttributes attributes = AnnotationAttributes.fromMap(metadata.getAnnotationAttributes(this.annotationClass.getName(), true));         /*通过Springboot的SPI机制从CLASSPATH的META-INF/spring.factories文件中加载               org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker对应的value值          org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration*/         ArrayList factories = new ArrayList(new LinkedHashSet(SpringFactoriesLoader.loadFactoryNames(this.annotationClass, this.beanClassLoader)));         if(factories.isEmpty() && !this.hasDefaultFactory()) {         } else {             if(factories.size() > 1) {             }             return (String[])factories.toArray(new String[factories.size()]);         }     } }

    HystrixCommandAspect是一个切面配置类 ,其首先定义了两个切点,所有标注有HystrixCommand或者HystrixCollapser注解的方法都会被代理,然后定义了环绕方法methodsAnnotatedWithHystrixCommand, @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)") public void hystrixCommandAnnotationPointcut() { } @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)") public void hystrixCollapserAnnotationPointcut() { } @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()") public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {   //获取aop代理的方法 Method method = getMethodFromTarget(joinPoint); Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);   //方法上同时配置了HystrixCommand和HystrixCollapser会报错 if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) { throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " + "annotations at the same time"); }   //根据方法上的注解是HystrixCommand还是HystrixCollapser获取不同的MetaHolderFactory ,为创建MetaHolder做准备 MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));   //封装方法的信息,如调用的方法、方法的参数、熔断后调用的方法、HystrixCommand注解的配置等 MetaHolder metaHolder = metaHolderFactory.create(joinPoint); //创建调用者,持有一个命令对象,在合适的时候通过这个命令对象完成具体的业务逻辑 HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType(); Object result; try { if (!metaHolder.isObservable()) {//执行命令,不是响应式走这个逻辑,默认 result = CommandExecutor.execute(invokable, executionType, metaHolder); } else { result = executeObservable(invokable, executionType, metaHolder); } } catch (HystrixBadRequestException e) { throw e.getCause(); } catch (HystrixRuntimeException e) { throw hystrixRuntimeExceptionToThrowable(metaHolder, e); } return result; } private static class CommandMetaHolderFactory extends MetaHolderFactory { @Override public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) { HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);   //根据返回值的类型,设置执行类型,SYNCHRONOUS(同步,默认),ASYNCHRONOUS(异步),OBSERVABLE(响应式) ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType()); MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint); if (isCompileWeaving()) { builder.ajcMethod(getAjcMethodFromTarget(joinPoint)); } return builder.defaultCommandKey(method.getName()) .hystrixCommand(hystrixCommand) .observableExecutionMode(hystrixCommand.observableExecutionMode()) .executionType(executionType) .observable(ExecutionType.OBSERVABLE == executionType) .build(); } } CommandExecutor.execute public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException { Validate.notNull(invokable); Validate.notNull(metaHolder); switch (executionType) { case SYNCHRONOUS: {//默认同步执行 return castToExecutable(invokable, executionType).execute(); } case ASYNCHRONOUS: { HystrixExecutable executable = castToExecutable(invokable, executionType); if (metaHolder.hasFallbackMethodCommand() && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) { return new FutureDecorator(executable.queue()); } return executable.queue(); } case OBSERVABLE: { HystrixObservable observable = castToObservable(invokable); return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable(); } default: throw new RuntimeException("unsupported execution type: " + executionType); } }

    CommandExecutor.execute方法会根据executionType类型选择执行不同的逻辑,例如castToExecutable(invokable, executionType).execute()最终会执行HystrixCommand.execute(同步执行)方法。execute方法调用了调用queue(异步执行)方法,返回一个 Future 对象,包含着执行结束后返回的单一结果,最后调用Future的get方法获得同步返回结果。在HystrixCommand.queue方法中,其首先调用了AbstractCommand.toObservable方法,创建一个被观察者。

    public R execute() { try { return this.queue().get(); } catch (Exception var2) { throw Exceptions.sneakyThrow(this.decomposeException(var2)); } } public Future<R> queue() { final Future delegate = this.toObservable().toBlocking().toFuture(); Future f = new Future() { public boolean cancel(boolean mayInterruptIfRunning) { if(delegate.isCancelled()) { return false; } else { if(((Boolean)HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()).booleanValue()) { HystrixCommand.this.interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning); } boolean res = delegate.cancel(HystrixCommand.this.interruptOnFutureCancel.get()); if(!HystrixCommand.this.isExecutionComplete() && HystrixCommand.this.interruptOnFutureCancel.get()) { Thread t = (Thread)HystrixCommand.this.executionThread.get(); if(t != null && !t.equals(Thread.currentThread())) { t.interrupt(); } } return res; } } public boolean isCancelled() { return delegate.isCancelled(); } public boolean isDone() { return delegate.isDone(); } public R get() throws InterruptedException, ExecutionException { return delegate.get(); } public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return delegate.get(timeout, unit); } }; if(f.isDone()) { try { f.get(); return f; } catch (Exception var6) { Throwable t = this.decomposeException(var6); if(t instanceof HystrixBadRequestException) { return f; } else if(t instanceof HystrixRuntimeException) { HystrixRuntimeException hre = (HystrixRuntimeException)t; } else { throw Exceptions.sneakyThrow(t); } } } else { return f; } } HystrixCommand.toObservable

     

    public Observable<R> toObservable() {   //订阅即将被终止时的监听,无论是正常终止还是异常终止 final Action0 terminateCommandCleanup = new Action0() { public void call() { if(AbstractCommand.this.commandState.compareAndSet(AbstractCommand.CommandState.OBSERVABLE_CHAIN_CREATED, AbstractCommand.CommandState.TERMINAL)) { AbstractCommand.this.handleCommandEnd(false); } else if(AbstractCommand.this.commandState.compareAndSet(AbstractCommand.CommandState.USER_CODE_EXECUTED, AbstractCommand.CommandState.TERMINAL)) { AbstractCommand.this.handleCommandEnd(true); } } };   //取消订阅时的监听 final Action0 unsubscribeCommandCleanup = new Action0() { public void call() { if(AbstractCommand.this.commandState.compareAndSet(AbstractCommand.CommandState.OBSERVABLE_CHAIN_CREATED, AbstractCommand.CommandState.UNSUBSCRIBED)) { if(!AbstractCommand.this.executionResult.containsTerminalEvent()) { AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.CANCELLED, AbstractCommand.this.commandKey); try { AbstractCommand.this.executionHook.onUnsubscribe(AbstractCommand.this); } catch (Throwable var3) { AbstractCommand.logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", var3); } AbstractCommand.this.executionResultAtTimeOfCancellation = AbstractCommand.this.executionResult.addEvent((int)(System.currentTimeMillis() - AbstractCommand.this.commandStartTimestamp), HystrixEventType.CANCELLED); } AbstractCommand.this.handleCommandEnd(false); } else if(AbstractCommand.this.commandState.compareAndSet(AbstractCommand.CommandState.USER_CODE_EXECUTED, AbstractCommand.CommandState.UNSUBSCRIBED)) { if(!AbstractCommand.this.executionResult.containsTerminalEvent()) { AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.CANCELLED, AbstractCommand.this.commandKey); try { AbstractCommand.this.executionHook.onUnsubscribe(AbstractCommand.this); } catch (Throwable var2) { AbstractCommand.logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", var2); } AbstractCommand.this.executionResultAtTimeOfCancellation = AbstractCommand.this.executionResult.addEvent((int)(System.currentTimeMillis() - AbstractCommand.this.commandStartTimestamp), HystrixEventType.CANCELLED); } AbstractCommand.this.handleCommandEnd(true); } } }; final Func0 applyHystrixSemantics = new Func0() { public Observable<R> call() {   // 当commandState处于UNSUBSCRIBED时,不执行命令,否则调用applyHystrixSemantics,返回执行命令的Observable, return ((AbstractCommand.CommandState)AbstractCommand.this.commandState.get()).equals(AbstractCommand.CommandState.UNSUBSCRIBED)?Observable.never():AbstractCommand.this.applyHystrixSemantics(AbstractCommand.this); } }; final Func1 wrapWithAllOnNextHooks = new Func1() { public R call(R r) { Object afterFirstApplication = r; try { afterFirstApplication = AbstractCommand.this.executionHook.onComplete(AbstractCommand.this, r); } catch (Throwable var5) { AbstractCommand.logger.warn("Error calling HystrixCommandExecutionHook.onComplete", var5); } try { return AbstractCommand.this.executionHook.onEmit(AbstractCommand.this, afterFirstApplication); } catch (Throwable var4) { AbstractCommand.logger.warn("Error calling HystrixCommandExecutionHook.onEmit", var4); return afterFirstApplication; } } };   //正常终止时的监听 final Action0 fireOnCompletedHook = new Action0() { public void call() { try { AbstractCommand.this.executionHook.onSuccess(AbstractCommand.this); } catch (Throwable var2) { AbstractCommand.logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", var2); } } };   // return Observable.defer(new Func0() { public Observable<R> call() {   //CAS操作,保证命令只执行一次 if(!AbstractCommand.this.commandState.compareAndSet(AbstractCommand.CommandState.NOT_STARTED, AbstractCommand.CommandState.OBSERVABLE_CHAIN_CREATED)) { } else { AbstractCommand.this.commandStartTimestamp = System.currentTimeMillis(); //是否可以缓存请求结果 boolean requestCacheEnabled = AbstractCommand.this.isRequestCachingEnabled(); String cacheKey = AbstractCommand.this.getCacheKey(); if(requestCacheEnabled) {   //从缓存中获取结果 HystrixCommandResponseFromCache hystrixObservable = (HystrixCommandResponseFromCache)AbstractCommand.this.requestCache.get(cacheKey); if(hystrixObservable != null) { AbstractCommand.this.isResponseFromCache = true; return AbstractCommand.this.handleRequestCacheHitAndEmitValues(hystrixObservable, AbstractCommand.this); } } //申明执行命令的Observable Observable hystrixObservable1 = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks); Observable afterCache; if(requestCacheEnabled && cacheKey != null) { HystrixCachedObservable toCache = HystrixCachedObservable.from(hystrixObservable1, AbstractCommand.this);   //保存缓存的结果 HystrixCommandResponseFromCache fromCache = (HystrixCommandResponseFromCache)AbstractCommand.this.requestCache.putIfAbsent(cacheKey, toCache); if(fromCache != null) { toCache.unsubscribe(); AbstractCommand.this.isResponseFromCache = true; return AbstractCommand.this.handleRequestCacheHitAndEmitValues(fromCache, AbstractCommand.this); } afterCache = toCache.toObservable(); } else { afterCache = hystrixObservable1; } return afterCache.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook); } } }); }

     在toObservable方法中,通过Observable.defer方法生成了Observable对象,并定义了Observable对象的各种动作的回调。其中返回的Observable对象又是通过Observable.defer(applyHystrixSemantics)生成的,所以重点看applyHystrixSemantics方法。如果已经熔断,或者获取信号量失败(仅对信号量限流起作用),执行Fallback方法,获取成功调用 executeCommandAndObserve 获取命令执行的Observable

    AbstractCommand.applyHystrixSemantics private Observable<R> applyHystrixSemantics(AbstractCommand<R> _cmd) { this.executionHook.onStart(_cmd); //如果断路器没有打开或者可以处于半开状态(即没有熔断) if(this.circuitBreaker.allowRequest()) { /*获得信号量,根据不同的隔离策略,获取不同的信号量,Thread隔离选择TryableSemaphoreNoOp(其tryAcquire方法永远返回true); Semaphore隔离策略选择TryableSemaphoreActual(tryAcquire方法根据实际情况返回true/false)*/ final AbstractCommand.TryableSemaphore executionSemaphore = this.getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);   //设置命令执行完毕的行为方法,即释放信号量 Action0 singleSemaphoreRelease = new Action0() { public void call() { if(semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } };   //设置命令执行出错的行为方法 Action1 markExceptionThrown = new Action1() { public void call(Throwable t) { AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, AbstractCommand.this.commandKey); } };   //调用tryAcquire获取信号量 if(executionSemaphore.tryAcquire()) { try { this.executionResult = this.executionResult.setInvocationStartTime(System.currentTimeMillis()); return this.executeCommandAndObserve(_cmd).doOnError(markExceptionThrown).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException var7) { return Observable.error(var7); } } else {//如果信号量满了,调用降级方法 return this.handleSemaphoreRejectionViaFallback(); } } else {//熔断直接服务降级,如果配置了fallback方法,则执行fallback方法 return this.handleShortCircuitViaFallback(); } }

    AbstractCommand.executeCommandAndObserve

    private Observable<R> executeCommandAndObserve(AbstractCommand<R> _cmd) { final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); // doOnNext中的回调。即命令执行之前执行的操作 Action1 markEmits = new Action1() { public void call(R r) { if(AbstractCommand.this.shouldOutputOnNextEvents()) { AbstractCommand.this.executionResult = AbstractCommand.this.executionResult.addEvent(HystrixEventType.EMIT); AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.EMIT, AbstractCommand.this.commandKey); } if(AbstractCommand.this.commandIsScalar()) { long latency = System.currentTimeMillis() - AbstractCommand.this.executionResult.getStartTimestamp(); AbstractCommand.this.eventNotifier.markCommandExecution(AbstractCommand.this.getCommandKey(), (ExecutionIsolationStrategy)AbstractCommand.this.properties.executionIsolationStrategy().get(), (int)latency, AbstractCommand.this.executionResult.getOrderedList()); AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.SUCCESS, AbstractCommand.this.commandKey); AbstractCommand.this.executionResult = AbstractCommand.this.executionResult.addEvent((int)latency, HystrixEventType.SUCCESS); AbstractCommand.this.circuitBreaker.markSuccess(); } } }; // doOnCompleted中的回调。命令执行完毕后执行的操作 Action0 markOnCompleted = new Action0() { public void call() { if(!AbstractCommand.this.commandIsScalar()) { long latency = System.currentTimeMillis() - AbstractCommand.this.executionResult.getStartTimestamp(); AbstractCommand.this.eventNotifier.markCommandExecution(AbstractCommand.this.getCommandKey(), (ExecutionIsolationStrategy)AbstractCommand.this.properties.executionIsolationStrategy().get(), (int)latency, AbstractCommand.this.executionResult.getOrderedList()); AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.SUCCESS, AbstractCommand.this.commandKey); AbstractCommand.this.executionResult = AbstractCommand.this.executionResult.addEvent((int)latency, HystrixEventType.SUCCESS); AbstractCommand.this.circuitBreaker.markSuccess(); } } }; // onErrorResumeNext中的回调。命令执行失败后的回退逻辑 Func1 handleFallback = new Func1() { public Observable<R> call(Throwable t) { Exception e = AbstractCommand.this.getExceptionFromThrowable(t); AbstractCommand.this.executionResult = AbstractCommand.this.executionResult.setExecutionException(e); if(e instanceof RejectedExecutionException) { return AbstractCommand.this.handleThreadPoolRejectionViaFallback(e); } else if(t instanceof HystrixTimeoutException) { return AbstractCommand.this.handleTimeoutViaFallback(); } else if(t instanceof HystrixBadRequestException) { return AbstractCommand.this.handleBadRequestByEmittingError(e); } else if(e instanceof HystrixBadRequestException) { AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, AbstractCommand.this.commandKey); return Observable.error(e); } else { return AbstractCommand.this.handleFailureViaFallback(e); } } }; // doOnEach中的回调。`Observable`每发射一个数据都会执行这个回调,设置请求上下文 Action1 setRequestContext = new Action1() { public void call(Notification<? super R> rNotification) { AbstractCommand.setRequestContextIfNeeded(currentRequestContext); } }; Observable execution; //若执行命令超时特性开启,调用 Observable.lift 方法实现执行命令超时功能。 if(((Boolean)this.properties.executionTimeoutEnabled().get()).booleanValue()) { execution = this.executeCommandWithSpecifiedIsolation(_cmd).lift(new AbstractCommand.HystrixObservableTimeoutOperator(_cmd)); } else { execution = this.executeCommandWithSpecifiedIsolation(_cmd); } return execution.doOnNext(markEmits).doOnCompleted(markOnCompleted).onErrorResumeNext(handleFallback).doOnEach(setRequestContext); } private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {   //根据隔离策略,执行不同的逻辑 return this.properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD?Observable.defer(new Func0() { public Observable<R> call() { AbstractCommand.this.executionResult = AbstractCommand.this.executionResult.setExecutionOccurred(); if(!AbstractCommand.this.commandState.compareAndSet(AbstractCommand.CommandState.OBSERVABLE_CHAIN_CREATED, AbstractCommand.CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + ((AbstractCommand.CommandState)AbstractCommand.this.commandState.get()).name())); } else { AbstractCommand.this.metrics.markCommandStart(AbstractCommand.this.commandKey, AbstractCommand.this.threadPoolKey, ExecutionIsolationStrategy.THREAD); if(AbstractCommand.this.isCommandTimedOut.get() == AbstractCommand.TimedOutStatus.TIMED_OUT) { return Observable.error(new RuntimeException("timed out before executing run()")); } else if(AbstractCommand.this.threadState.compareAndSet(AbstractCommand.ThreadState.NOT_USING_THREAD, AbstractCommand.ThreadState.STARTED)) { HystrixCounters.incrementGlobalConcurrentThreads(); AbstractCommand.this.threadPool.markThreadExecution(); AbstractCommand.this.endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(AbstractCommand.this.getCommandKey()); AbstractCommand.this.executionResult = AbstractCommand.this.executionResult.setExecutedInThread(); try { AbstractCommand.this.executionHook.onThreadStart(_cmd); AbstractCommand.this.executionHook.onRunStart(_cmd); AbstractCommand.this.executionHook.onExecutionStart(_cmd); return AbstractCommand.this.getUserExecutionObservable(_cmd); } catch (Throwable var2) { return Observable.error(var2); } } else { return Observable.error(new RuntimeException("unsubscribed before executing run()")); } } } }).doOnTerminate(new Action0() { public void call() { if(AbstractCommand.this.threadState.compareAndSet(AbstractCommand.ThreadState.STARTED, AbstractCommand.ThreadState.TERMINAL)) { AbstractCommand.this.handleThreadEnd(_cmd); } if(AbstractCommand.this.threadState.compareAndSet(AbstractCommand.ThreadState.NOT_USING_THREAD, AbstractCommand.ThreadState.TERMINAL)) { ; } } }).doOnUnsubscribe(new Action0() { public void call() { if(AbstractCommand.this.threadState.compareAndSet(AbstractCommand.ThreadState.STARTED, AbstractCommand.ThreadState.UNSUBSCRIBED)) { AbstractCommand.this.handleThreadEnd(_cmd); } if(AbstractCommand.this.threadState.compareAndSet(AbstractCommand.ThreadState.NOT_USING_THREAD, AbstractCommand.ThreadState.UNSUBSCRIBED)) { ; } } }).subscribeOn(this.threadPool.getScheduler(new Func0() {//设置命令执行的线程池 public Boolean call() { return Boolean.valueOf(((Boolean)AbstractCommand.this.properties.executionIsolationThreadInterruptOnTimeout().get()).booleanValue() && _cmd.isCommandTimedOut.get() == AbstractCommand.TimedOutStatus.TIMED_OUT); } })):Observable.defer(new Func0() { public Observable<R> call() { AbstractCommand.this.executionResult = AbstractCommand.this.executionResult.setExecutionOccurred(); if(!AbstractCommand.this.commandState.compareAndSet(AbstractCommand.CommandState.OBSERVABLE_CHAIN_CREATED, AbstractCommand.CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + ((AbstractCommand.CommandState)AbstractCommand.this.commandState.get()).name())); } else { AbstractCommand.this.metrics.markCommandStart(AbstractCommand.this.commandKey, AbstractCommand.this.threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE); AbstractCommand.this.endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(AbstractCommand.this.getCommandKey()); try { AbstractCommand.this.executionHook.onRunStart(_cmd); AbstractCommand.this.executionHook.onExecutionStart(_cmd); return AbstractCommand.this.getUserExecutionObservable(_cmd); } catch (Throwable var2) { return Observable.error(var2); } } } }); }

     在AbstractCommand.executeCommandAndObserve中首先定义了不同的回调(doOnNext、doOnCompleted、onErrorResumeNext、doOnEach等),然后继续调用executeCommandWithSpecififiedIsolation获得执行命令的Observable。executeCommandWithSpecififiedIsolation方法根据当前不同的资源隔离策略执行不同的逻辑(THREAD:线程池、SEMAPHORE:信号量),如果是THREAD隔离策略,会调用Observable.subscribeOn设置执行任务的线程池,最后都会调用getExecutionObservable创建执行命令的Observable,getExecutionObservable的实现方法在HystrixCommand(AbstractCommand的继承类之一),主要逻辑就是调用GenericCommand(HystrixCommand的子类)的run方法。

     

    protected final Observable<R> getExecutionObservable() { return Observable.defer(new Func0() { public Observable<R> call() { try { // return Observable.just(HystrixCommand.this.run()); } catch (Throwable var2) { return Observable.error(var2); } } }).doOnSubscribe(new Action0() { public void call() { HystrixCommand.this.executionThread.set(Thread.currentThread()); } }); } @Override protected Object run() throws Exception { LOGGER.debug("execute command: {}", getCommandKey().name()); return process(new Action() { @Override Object execute() { return getCommandAction().execute(getExecutionType()); } }); }

    在GenericCommand.run方法中,调用getCommandAction方法获取了命令对应的CommandAction,并执行execute方法。执行的是MethodExecutionAction.executeWithArgs方法,即通过反射调用目标方法

    @Override public Object executeWithArgs(ExecutionType executionType, Object[] args) throws CommandActionExecutionException { if(ExecutionType.ASYNCHRONOUS == executionType){ Closure closure = AsyncClosureFactory.getInstance().createClosure(metaHolder, method, object, args); return executeClj(closure.getClosureObj(), closure.getClosureMethod()); } return execute(object, method, args); } private Object execute(Object o, Method m, Object... args) throws CommandActionExecutionException { Object result = null; try { m.setAccessible(true); // suppress Java language access if (isCompileWeaving() && metaHolder.getAjcMethod() != null) { result = invokeAjcMethod(metaHolder.getAjcMethod(), o, metaHolder, args); } else { result = m.invoke(o, args); } } catch (IllegalAccessException e) { propagateCause(e); } catch (InvocationTargetException e) { propagateCause(e); } return result; }
    Processed: 0.012, SQL: 8