博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
熔断器 Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑
阅读量:6083 次
发布时间:2019-06-20

本文共 23021 字,大约阅读时间需要 76 分钟。

摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-first-run/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本


???关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。
  6. 掘金 Java 群 :217878901

1. 概述

本文主要分享 Hystrix 命令执行(一)之正常执行逻辑

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

Hystrix 执行命令整体流程如下图:

FROM

  • 框 :Hystrix 命令执行的过程。
  • 圈 :本文分享的部分 —— 正常执行逻辑。

推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG
  • 程序猿DD ——
  • 周立 ——
  • 两书齐买,京东包邮。

2. #applyHystrixSemantics(...)

在 里,我们看到 #toObservable() 方法里的第 11 至 19 行,当缓存特性未开启,或者缓存未命中时,使用 applyHystrixSemantics 传入 Observable#defer(...) 方法,声明执行命令的 Observable。

创建 applyHystrixSemantics 变量,代码如下 :

// `AbstractCommand#toObservable()` 方法  1: final Func0
> applyHystrixSemantics = new Func0
>() { 2: @Override 3: public Observable
call() { 4: // commandState 处于 UNSUBSCRIBED 时,不执行命令 5: if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { 6: return Observable.never(); 7: } 8: // 获得 执行Observable 9: return applyHystrixSemantics(_cmd); 10: } 11: };复制代码
  • 第 5 至 7 行 :当 commandState 处于 UNSUBSCRIBED 时,不执行命令。
  • 第 9 行 :调用 #applyHystrixSemantics(...) 方法,获得执行 Observable 。

#applyHystrixSemantics(...) 方法,代码如下 :

1: private Observable
applyHystrixSemantics(final AbstractCommand
_cmd) { 2: // TODO 【2003】【HOOK】 3: // mark that we're starting execution on the ExecutionHook 4: // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent 5: executionHook.onStart(_cmd); 6: 7: /* determine if we're allowed to execute */ 8: if (circuitBreaker.attemptExecution()) { 9: // 获得 信号量 10: final TryableSemaphore executionSemaphore = getExecutionSemaphore(); 11: 12: // 信号量释放Action 13: final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); 14: final Action0 singleSemaphoreRelease = new Action0() { 15: @Override 16: public void call() { 17: if (semaphoreHasBeenReleased.compareAndSet(false, true)) { 18: executionSemaphore.release(); 19: } 20: } 21: }; 22: 23: // TODO 【2011】【Hystrix 事件机制】 24: final Action1
markExceptionThrown = new Action1
() { 25: @Override 26: public void call(Throwable t) { 27: eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); 28: } 29: }; 30: 31: // 信号量 获得 32: if (executionSemaphore.tryAcquire()) { 33: try { 34: // 标记 executionResult 调用开始时间 35: /* used to track userThreadExecutionTime */ 36: executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); 37: 38: // 获得 执行Observable 39: return executeCommandAndObserve(_cmd) 40: .doOnError(markExceptionThrown) 41: .doOnTerminate(singleSemaphoreRelease) 42: .doOnUnsubscribe(singleSemaphoreRelease); 43: } catch (RuntimeException e) { 44: return Observable.error(e); 45: } 46: } else { 47: return handleSemaphoreRejectionViaFallback(); 48: } 49: } else { 50: return handleShortCircuitViaFallback(); 51: } 52: }复制代码
  • 第 5 行 :TODO 【2003】【HOOK】
  • 第 8 行 :TODO 【2012】【链路健康度】
  • 第 10 行 :调用 #getExecutionSemaphore() 方法,获得信号量( TryableSemaphore )对象,在 详细解析。
  • 第 13 至 21 行 :信号量释放 Action ,用于下面【执行命令 Observable】的 #doOnTerminate(Action)#doOnUnsubscribe(Action) 方法( 见第 41 至 42 行 )。
  • 第 24 至 29 行 :TODO 【2011】【Hystrix 事件机制】
  • 第 32 行 :调用 TryableSemaphore#tryAcquire() 方法,信号量( TryableSemaphore )使用成功,在 详细解析。
  • 第 36 行 :标记 executionResult调用开始时间。
  • 第 39 行 :调用 #executeCommandAndObserve() 方法,获得【执行命令 Observable】。在 详细解析。
  • 第 43 至 45 行 :若发生异常,调用 Observable#error(Exception) 方法返回 Observable 。
  • 第 46 至 48 行 :信号量( TryableSemaphore )使用失败,调用 #handleSemaphoreRejectionViaFallback() 方法,处理信号量拒绝的失败回退逻辑,在 详细解析。
  • 第 49 至 51 行 :链路处于熔断状态,调用 #handleShortCircuitViaFallback() 方法,处理链路熔断的失败回退逻辑,在 详细解析。

3. TryableSemaphore

com.netflix.hystrix.AbstractCommand.TryableSemaphore ,Hystrix 定义的信号量接口。代码如下 :

interface TryableSemaphore {        boolean tryAcquire();        void release();        int getNumberOfPermitsUsed();}复制代码
  • 从 API 上,Java 自带的 java.util.concurrent.Semaphore 都能满足,为什么不使用它呢?继续一起往下看。

TryableSemaphore 共有两个子类实现 :

  • TryableSemaphoreNoOp
  • TryableSemaphoreActual

3.1 TryableSemaphoreNoOp

com.netflix.hystrix.AbstractCommand.TryableSemaphoreNoOp无操作的信号量。代码如下 :

/* package */static class TryableSemaphoreNoOp implements TryableSemaphore {    public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();    @Override    public boolean tryAcquire() {        return true;    }    @Override    public void release() {     }    @Override    public int getNumberOfPermitsUsed() {        return 0;    }}复制代码
  • 从实现上看,#tryAcquire() 方法,每次都返回的是 true#release() 方法,无任何操作。这个是为什么?在 Hystrix 里提供了两种执行隔离策略
    • Thread ,该方式不使用信号量,因此使用 TryableSemaphoreNoOp ,这样每次调用 #tryAcquire() 都能返回 true 。在 详细解析该方式。
    • Semaphore ,该方式使用信号量,因此使用 TryableSemaphoreActual ,这样每次调用 #tryAcquire() 根据情况返回 true / false 。在 详细解析。

3.2 TryableSemaphoreActual

com.netflix.hystrix.AbstractCommand.TryableSemaphoreActual真正的的信号量实现。不过实际上,TryableSemaphoreActual 更加像一个计数器。代码如下 :

/* package */static class TryableSemaphoreActual implements TryableSemaphore {    protected final HystrixProperty
numberOfPermits; private final AtomicInteger count = new AtomicInteger(0); public TryableSemaphoreActual(HystrixProperty
numberOfPermits) { this.numberOfPermits = numberOfPermits; } @Override public boolean tryAcquire() { int currentCount = count.incrementAndGet(); if (currentCount > numberOfPermits.get()) { count.decrementAndGet(); return false; } else { return true; } } @Override public void release() { count.decrementAndGet(); } @Override public int getNumberOfPermitsUsed() { return count.get(); }}复制代码
  • numberOfPermits 属性,信号量上限com.netflix.hystrix.strategy.properties.HystrixProperty 是一个接口,当其使用类似 com.netflix.hystrix.strategy.properties.archaius.IntegerDynamicProperty 动态属性的实现时,可以实现动态调整信号量的上限,这就是上文提到的为什么不使用 java.util.concurrent.Semaphore 的原因之一。
  • count 属性,信号量使用数量。?,这是为什么说 TryableSemaphoreActual 更加像一个计数器 的原因。
  • 另一个不使用 java.util.concurrent.Semaphore 的原因,TryableSemaphoreActual 无阻塞获取信号量的需求,使用 AtomicInteger 可以达到更轻量级的实现。

3.3 #getExecutionSemaphore()

调用 #getExecutionSemaphore() 方法,获得信号量对象,代码如下 :

/*** 执行命令(正常执行)信号量映射* KEY :命令名 {
@link #commandKey}*//* each circuit has a semaphore to restrict concurrent fallback execution */protected static final ConcurrentHashMap
executionSemaphorePerCircuit = new ConcurrentHashMap
(); protected TryableSemaphore getExecutionSemaphore() { if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) { if (executionSemaphoreOverride == null) { TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name()); if (_s == null) { // 不存在时,创建 TryableSemaphoreActual // we didn't find one cache so setup executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests())); // assign whatever got set (this or another thread) return executionSemaphorePerCircuit.get(commandKey.name()); } else { return _s; } } else { return executionSemaphoreOverride; } } else { // return NoOp implementation since we're not using SEMAPHORE isolation return TryableSemaphoreNoOp.DEFAULT; }}复制代码
  • 根据执行隔离策略不同获取不同的信号量实现 :
    • Thread ,该方式不使用信号量,因此使用 TryableSemaphoreNoOp 。
    • Semaphore ,该方式使用信号量,因此使用 TryableSemaphoreActual 。
      • 相同的 commandKey ,使用相同的 TryableSemaphoreActual 。

4. #executeCommandAndObserve(...)

调用 #executeCommandAndObserve(...) 方法,获得【执行命令 Observable】。代码如下 :

1: private Observable
executeCommandAndObserve(final AbstractCommand
_cmd) { 2: // TODO 【】 3: final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); 4: 5: // TODO 【2007】【executionResult】用途 6: final Action1
markEmits = new Action1
() { 7: @Override 8: public void call(R r) { 9: if (shouldOutputOnNextEvents()) { 10: executionResult = executionResult.addEvent(HystrixEventType.EMIT); 11: eventNotifier.markEvent(HystrixEventType.EMIT, commandKey); 12: } 13: if (commandIsScalar()) { 14: long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); 15: eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey); 16: executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); 17: eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList()); 18: circuitBreaker.markSuccess(); 19: } 20: } 21: }; 22: 23: // TODO 【2007】【executionResult】用途 24: final Action0 markOnCompleted = new Action0() { 25: @Override 26: public void call() { 27: if (!commandIsScalar()) { 28: long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); 29: eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey); 30: executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); 31: eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList()); 32: circuitBreaker.markSuccess(); 33: } 34: } 35: }; 36: 37: // 失败回退逻辑 Func1 38: final Func1
> handleFallback = new Func1
>() { 39: @Override 40: public Observable
call(Throwable t) { 41: circuitBreaker.markNonSuccess(); 42: Exception e = getExceptionFromThrowable(t); 43: executionResult = executionResult.setExecutionException(e); 44: if (e instanceof RejectedExecutionException) { 45: return handleThreadPoolRejectionViaFallback(e); 46: } else if (t instanceof HystrixTimeoutException) { 47: return handleTimeoutViaFallback(); 48: } else if (t instanceof HystrixBadRequestException) { 49: return handleBadRequestByEmittingError(e); 50: } else { 51: /* 52: * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException. 53: */ 54: if (e instanceof HystrixBadRequestException) { 55: eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); 56: return Observable.error(e); 57: } 58: 59: return handleFailureViaFallback(e); 60: } 61: } 62: }; 63: 64: // TODO 【2008】【请求缓存】 65: final Action1
> setRequestContext = new Action1
>() { 66: @Override 67: public void call(Notification
rNotification) { 68: setRequestContextIfNeeded(currentRequestContext); 69: } 70: }; 71: 72: Observable
execution; 73: if (properties.executionTimeoutEnabled().get()) { 74: execution = executeCommandWithSpecifiedIsolation(_cmd) 75: .lift(new HystrixObservableTimeoutOperator
(_cmd)); // 超时 76: } else { 77: execution = executeCommandWithSpecifiedIsolation(_cmd); 78: } 79: 80: return execution.doOnNext(markEmits) 81: .doOnCompleted(markOnCompleted) 82: .onErrorResumeNext(handleFallback) 83: .doOnEach(setRequestContext); 84: }复制代码
  • 第 3 行 :TODO 【2012】【请求上下文】
  • 第 6 至 21 行 :TODO 【2007】【executionResult】用途
  • 第 24 至 35 行 :TODO 【2007】【executionResult】用途
  • 第 38 至 62 行 :失败回退逻辑 Func1 ,在 详细解析。
  • 第 65 至 70 行 :TODO 【2012】【请求上下文】
  • 第 72 至 78 行 :调用 #executeCommandWithSpecifiedIsolation(...) 方法,获得【执行命令 Observable】,在 详细解析。
    • 若执行命令超时特性开启,调用 Observable#lift(HystrixObservableTimeoutOperator) 方法,实现执行命令超时功能。在 详细解析。
  • 第 80 至 83 行 :返回【执行命令 Observable】。

5. #executeCommandWithSpecifiedIsolation(...)

调用 #executeCommandWithSpecifiedIsolation(...) 方法,获得【执行命令 Observable】。代码如下 :

1: private Observable
executeCommandWithSpecifiedIsolation(final AbstractCommand
_cmd) { 2: if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { 3: // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE) 4: return Observable.defer(new Func0
>() { 5: @Override 6: public Observable
call() { 7: 8: // 标记 executionResult 执行已发生 9: executionResult = executionResult.setExecutionOccurred(); 10: 11: // 设置 commandState 为 USER_CODE_EXECUTED 12: if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { 13: return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); 14: } 15: 16: // TODO 【2002】【metrics】 17: metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD); 18: 19: // TODO 【2009】【执行超时】 20: if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { 21: // the command timed out in the wrapping thread so we will return immediately 22: // and not increment any of the counters below or other such logic 23: return Observable.error(new RuntimeException("timed out before executing run()")); 24: } 25: 26: // 设置 线程状态 为 ThreadState.STARTED 27: if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { 28: // TODO 【2002】【metrics】 29: //we have not been unsubscribed, so should proceed 30: HystrixCounters.incrementGlobalConcurrentThreads(); 31: threadPool.markThreadExecution(); 32: 33: // TODO 【2010】【endCurrentThreadExecutingCommand】 34: // store the command that is being run 35: endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); 36: 37: // 标记 executionResult 使用线程执行 38: executionResult = executionResult.setExecutedInThread(); 39: /** 40: * If any of these hooks throw an exception, then it appears as if the actual execution threw an error 41: */ 42: try { 43: // TODO 【2003】【HOOK】 44: executionHook.onThreadStart(_cmd); 45: executionHook.onRunStart(_cmd); 46: executionHook.onExecutionStart(_cmd); 47: 48: // 获得 执行Observable 49: return getUserExecutionObservable(_cmd); 50: } catch (Throwable ex) { 51: return Observable.error(ex); 52: } 53: } else { 54: //command has already been unsubscribed, so return immediately 55: return Observable.empty(); 56: } 57: } 58: }).doOnTerminate(new Action0() { 59: @Override 60: public void call() { 61: if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) { 62: handleThreadEnd(_cmd); 63: } 64: if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) { 65: //if it was never started and received terminal, then no need to clean up (I don't think this is possible) 66: } 67: //if it was unsubscribed, then other cleanup handled it 68: } 69: }).doOnUnsubscribe(new Action0() { 70: @Override 71: public void call() { 72: if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) { 73: handleThreadEnd(_cmd); 74: } 75: if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) { 76: //if it was never started and was cancelled, then no need to clean up 77: } 78: //if it was terminal, then other cleanup handled it 79: } 80: }).subscribeOn(threadPool.getScheduler(new Func0
() { // TODO 芋艿:Scheduler 81: @Override 82: public Boolean call() { 83: return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; 84: } 85: })); 86: } else { 87: return Observable.defer(new Func0
>() { 88: @Override 89: public Observable
call() { 90: // 标记 executionResult 执行已发生 91: executionResult = executionResult.setExecutionOccurred(); 92: 93: // 设置 commandState 为 USER_CODE_EXECUTED 94: if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { 95: return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); 96: } 97: 98: // TODO 【2002】【metrics】 99: metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);100: 101: // TODO 【2010】【endCurrentThreadExecutingCommand】102: // semaphore isolated103: // store the command that is being run104: endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());105: try {106: // TODO 【2003】【HOOK】107: executionHook.onRunStart(_cmd);108: executionHook.onExecutionStart(_cmd);109: 110: // 获得 执行Observable111: return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw112: } catch (Throwable ex) {113: //If the above hooks throw, then use that as the result of the run method114: return Observable.error(ex);115: }116: }117: });118: }119: }复制代码
  • 根据执行隔离策略不同,创建不同的【执行命令 Observable】。仔细对比下,大体逻辑都是相同的,差别在于执行隔离策略Thread 时,使用 RxJava Scheduler 以及对线程的处理。

  • 第 2 至 85 行 :执行隔离策略Thread

    • 第 9 行 :标记 executionResult 执行已发生。
    • 第 12 至 14 行 :设置 commandStateUSER_CODE_EXECUTED 。若设置失败,调用 Observable#error(Exception) 方法返回 Observable 。
    • 第 17 行 :TODO 【2002】【metrics】
    • 第 20 至 24 行 :TODO 【2009】【执行超时】
    • 第 27 行 :设置 threadStateThreadState.STARTED 成功。
      • 第 30 至 31 行 :TODO 【2002】【metrics】
      • 第 35 行 :TODO 【2010】【endCurrentThreadExecutingCommand】
      • 第 38 行 :标记 executionResult 使用线程执行。
      • 第 44 至 46 行 :TODO 【2003】【HOOK】
      • 第 49 行 :调用 #getUserExecutionObservable(...) 方法,创建【执行命令 Observable】。
      • 第 50 至 52 行 :若发生异常,调用 Observable#error(Exception) 方法返回 Observable 。
    • 第 53 至 56 行 :设置 threadStateThreadState.STARTED 失败,执行命令此时已经被取消,调用 Observable#empty() 方法返回 Observable 。
    • 第 58 至 68 行 :调用 Observable#doOnTerminate(...) 方法,添加 Action0 。#handleThreadEnd(...) 方法,点击 查看。
    • 第 69 至 79 行 :调用 Observable#doOnUnsubscribe(...) 方法,添加 Action0 。
    • 第 80 至 85 行 :调用 Observable#subscribeOn(Scheduler) 方法,指定 Observable 自身在哪个调度器上执行。
      • RxJava Scheduler ,在 有详细解析。
      • Observable#subscribeOn(Scheduler) ,在 有详细解析。
      • 调用 ThreadPool#getScheduler(Func0<Boolean>) 方法,获得 Hystrix 自定义实现的 RxJava Scheduler ,在 详细解析。
  • 第 86 至 118 行 :执行隔离策略SEMAPHORE

    • 第 91 行 :[ 与第 9 行相同 ]。
    • 第 94 至 96 行 :[ 与第 12 至 14行相同 ]。
    • 第 99 行 :[ 与第 17 行类似 ]。
    • 第 104 行 :[ 与第 35 行相同 ]。
    • 第 107 至 108 行 :[ 与第 45 至 46 行相同 ]。
    • 第 111 行 :[ 与第 49 行相同 ]。
    • 第 112 至 115 行 :[ 与第 50 至 52 行相同 ]。

6. #getUserExecutionObservable(...)

调用 #getUserExecutionObservable(...) 方法,创建【执行命令 Observable】。代码如下 :

1: private Observable
getUserExecutionObservable(final AbstractCommand
_cmd) { 2: Observable
userObservable; 3: 4: try { 5: userObservable = getExecutionObservable(); 6: } catch (Throwable ex) { 7: // the run() method is a user provided implementation so can throw instead of using Observable.onError 8: // so we catch it here and turn it into Observable.error 9: userObservable = Observable.error(ex); 10: } 11: 12: return userObservable 13: .lift(new ExecutionHookApplication(_cmd)) // TODO 【2003】【HOOK】 14: .lift(new DeprecatedOnRunHookApplication(_cmd)); // 已废弃 15: }复制代码
  • 第 5 行 :调用 #getExecutionObservable() 方法,创建【执行命令 Observable】。#getExecutionObservable() 是个抽象方法,代码如下 :

    protected abstract Observable
    getExecutionObservable();复制代码
    • HystrixCommand 实现了该方法,在 详细解析。
  • 第 6 至 10 行 :若发生异常,调用 Observable#error(Exception) 方法返回 Observable 。

  • 第 12 至 14 行 :返回【执行命令 Observable】。

    • 第 13 行 :TODO 【2003】【HOOK】

7. #getExecutionObservable()

调用 HystrixCommand#getExecutionObservable() 方法,创建【执行命令 Observable】。代码如下 :

1: @Override  2: final protected Observable
getExecutionObservable() { 3: return Observable.defer(new Func0
>() { 4: @Override 5: public Observable
call() { 6: try { 7: return Observable.just(run()); 8: } catch (Throwable ex) { 9: return Observable.error(ex); 10: } 11: } 12: }).doOnSubscribe(new Action0() { 13: @Override 14: public void call() { 15: // 记录 执行线程 16: // Save thread on which we get subscribed so that we can interrupt it later if needed 17: executionThread.set(Thread.currentThread()); 18: } 19: }); 20: } 21: 22: protected abstract R run() throws Exception;复制代码
  • 第 3 至 11 行 :调用 Observable#defer(Func0<Observable<R>) 方法,创建【执行命令 Observable】。
    • 第 7 行 :调用 #run() 方法,运行正常执逻辑。通过 Observable#just(...) 方法,返回创建【执行命令 Observable】。
  • 第 12 至 19 行 :调用 #doOnSubscribe(...) 方法,添加 Action 。该操作记录执行线程( executionThread ) 。executionThread 用于 HystrixCommand#queue() 方法,返回的 Future 结果,可以调用 Future#cancel(Boolean) 方法,点击 查看该方法。
  • 第 22 行 :#run() 抽象方法,实现该方法,运行正常执逻辑

8. CommandState

com.netflix.hystrix.AbstractCommand.CommandState ,命令状态,代码如下 :

protected enum CommandState {    NOT_STARTED, OBSERVABLE_CHAIN_CREATED, USER_CODE_EXECUTED, UNSUBSCRIBED, TERMINAL}复制代码

状态变迁如下图 :

9. ThreadState

com.netflix.hystrix.AbstractCommand.ThreadState ,线程状态,代码如下 :

protected enum ThreadState {   NOT_USING_THREAD, STARTED, UNSUBSCRIBED, TERMINAL}复制代码

状态变迁如下图 :

666. 彩蛋

对 Hystrix 和 RxJava 慢慢更有感觉了。

柳暗花明又一村。

继续加油!

胖友,分享一波朋友圈可好!

你可能感兴趣的文章
home.php
查看>>
neo4j---删除关系和节点
查看>>
redis分布式锁redisson
查看>>
什么样的企业可以称之为初创企业?
查看>>
Python爬虫之BeautifulSoup
查看>>
《HTML 5与CSS 3权威指南(第3版·下册)》——第20章 使用选择器在页面中插入内容...
查看>>
如何判断自己适不适合做程序员?这几个特点了解一下
查看>>
newinstance()和new有什么区别
查看>>
android下载封装类
查看>>
[node] 用 node-webkit 开发桌面应用
查看>>
Nginx访问控制和虚拟主机
查看>>
report widget not working for external users
查看>>
windows phone 摄像头得到图片是旋转90°
查看>>
Linux--sed使用
查看>>
没有显示器的情况下安装和使用树莓派
查看>>
【android】使用handler更新UI
查看>>
mochiweb 源码阅读(十五)
查看>>
前端面试中的常见的算法问题
查看>>
计算机语言的基本理论
查看>>
nodejs流之行读取器例子
查看>>