Appearance
14重试机制是网络操作的基本保证
在真实的微服务系统中, ZooKeeper、etcd 等服务发现组件一般会独立部署成一个集群,业务服务通过网络连接这些服务发现节点,完成注册和订阅操作。但即使是机房内部的稳定网络,也无法保证两个节点之间的请求一定成功,因此 Dubbo 这类 RPC 框架在稳定性和容错性方面,就受到了比较大的挑战。为了保证服务的可靠性,重试机制就变得必不可少了。
所谓的 "重试机制"就是在请求失败时,客户端重新发起一个一模一样的请求,尝试调用相同或不同的服务端,完成相应的业务操作。能够使用重试机制的业务接口得是"幂等"的,也就是无论请求发送多少次,得到的结果都是一样的,例如查询操作。
核心设计
在上一课时中,我们介绍了 AbstractRegistry 中的 register()/unregister()、subscribe()/unsubscribe() 以及 notify() 等核心操作,详细分析了通过本地缓存 实现的容错功能。其实,这几个核心方法同样也是重试机制的关注点。
dubbo-registry 将重试机制的相关实现放到了 AbstractRegistry 的子类------ FailbackRegistry 中。如下图所示,接入 ZooKeeper、etcd 等开源服务发现组件的 Registry 实现,都继承了 FailbackRegistry,也就都拥有了失败重试的能力。
FailbackRegistry 设计核心是:覆盖了 AbstractRegistry 中 register()/unregister()、subscribe()/unsubscribe() 以及 notify() 这五个核心方法,结合前面介绍的时间轮,实现失败重试的能力;真正与服务发现组件的交互能力则是放到了 doRegister()/doUnregister()、doSubscribe()/doUnsubscribe() 以及 doNotify() 这五个抽象方法中,由具体子类实现。这是典型的模板方法模式的应用。
核心字段介绍
分析一个实现类的第一步就是了解其核心字段,那 FailbackRegistry 的核心字段有哪些呢?
retryTimer(HashedWheelTimer 类型):用于定时执行失败重试操作的时间轮。
retryPeriod(int 类型):重试操作的时间间隔。
failedRegistered(ConcurrentMap<URL, FailedRegisteredTask>类型):注册失败的 URL 集合,其中 Key 是注册失败的 URL,Value 是对应的重试任务。
failedUnregistered(ConcurrentMap<URL, FailedUnregisteredTask>类型):取消注册失败的 URL 集合,其中 Key 是取消注册失败的 URL,Value 是对应的重试任务。
failedSubscribed(ConcurrentMap<Holder, FailedSubscribedTask>类型):订阅失败 URL 集合,其中 Key 是订阅失败的 URL + Listener 集合,Value 是相应的重试任务。
failedUnsubscribed(ConcurrentMap<URL, Set >类型):取消订阅失败的 URL 集合,其中 Key 是取消订阅失败的 URL + Listener 集合,Value 是相应的重试任务。
failedNotified(ConcurrentMap<Holder, FailedNotifiedTask>类型):通知失败的 URL 集合,其中 Key 是通知失败的 URL + Listener 集合,Value 是相应的重试任务。
在 FailbackRegistry 的构造方法中,首先会调用父类 AbstractRegistry 的构造方法完成本地缓存相关的初始化操作,然后从传入的 URL 参数中获取重试操作的时间间隔(即retry.period 参数)来初始化 retryPeriod 字段,最后初始化 retryTimer****时间轮。整个代码比较简单,这里就不展示了。
核心方法实现分析
FailbackRegistry 对 register()/unregister() 方法和 subscribe()/unsubscribe() 方法的具体实现非常类似,所以这里我们就只介绍其中register() 方法的具体实现流程。
根据 registryUrl 中 accepts 参数指定的匹配模式,决定是否接受当前要注册的 Provider URL。
调用父类 AbstractRegistry 的 register() 方法,将 Provider URL 写入 registered 集合中。
调用 removeFailedRegistered() 方法和 removeFailedUnregistered() 方法,将该 Provider URL 从 failedRegistered 集合和 failedUnregistered 集合中删除,并停止相关的重试任务。
调用 doRegister() 方法,与服务发现组件进行交互。该方法由子类实现,每个子类只负责接入一个特定的服务发现组件。
在 doRegister() 方法出现异常的时候,会根据 URL 参数以及异常的类型,进行分类处理:待注册 URL 的 check 参数为 true(默认值为 true);待注册的 URL 不是 consumer 协议;registryUrl 的 check 参数也为 true(默认值为 true)。若满足这三个条件或者抛出的异常为 SkipFailbackWrapperException,则直接抛出异常。否则,就会创建重试任务并添加到 failedRegistered 集合中。
明确 register() 方法的核心流程之后,我们再来看 register() 方法的具体代码实现:
java
public void register(URL url) {
if (!acceptable(url)) {
logger.info("..."); // 打印相关的提示日志
return;
}
super.register(url); // 完成本地文件缓存的初始化
// 清理failedRegistered集合和failedUnregistered集合,并取消相关任务
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
doRegister(url); // 与服务发现组件进行交互,具体由子类实现
} catch (Exception e) {
Throwable t = e;
// 检测check参数,决定是否直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY,
true) && url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof
SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register");
}
// 如果不抛出异常,则创建失败重试的任务,并添加到failedRegistered集合中
addFailedRegistered(url);
}
}
从以上代码可以看出,当 Provider 向 Registry 注册 URL 的时候,如果注册失败,且未设置 check 属性,则创建一个定时任务,添加到时间轮中。
下面我们再来看看创建并添加这个重试任务的相关方法------addFailedRegistered() 方法,具体实现如下:
java
private void addFailedRegistered(URL url) {
FailedRegisteredTask oldOne = failedRegistered.get(url);
if (oldOne != null) { // 已经存在重试任务,则无须创建,直接返回
return;
}
FailedRegisteredTask newTask = new FailedRegisteredTask(url,
this);
oldOne = failedRegistered.putIfAbsent(url, newTask);
if (oldOne == null) {
// 如果是新建的重试任务,则提交到时间轮中,等待retryPeriod毫秒后执行
retryTimer.newTimeout(newTask, retryPeriod,
TimeUnit.MILLISECONDS);
}
}
重试任务
FailbackRegistry.addFailedRegistered() 方法中创建的 FailedRegisteredTask 任务以及其他的重试任务,都继承了 AbstractRetryTask 抽象类,如下图所示:
在 AbstractRetryTask 中维护了当前任务关联的 URL、当前重试的次数等信息,在其 run() 方法中,会根据重试 URL 中指定的重试次数(retry.times 参数,默认值为 3)、任务是否被取消以及时间轮的状态,决定此次任务的 doRetry() 方法是否正常执行。
java
public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) { // 检测定时任务状态和时间轮状态
return;
}
if (times > retryTimes) { // 检查重试次数
logger.warn("...");
return;
}
try {
doRetry(url, registry, timeout); // 执行重试
} catch (Throwable t) {
reput(timeout, retryPeriod); // 重新添加定时任务,等待重试
}
}
如果任务的 doRetry() 方法执行出现异常,AbstractRetryTask 会通过 reput() 方法将当前任务重新放入时间轮中,并递增当前任务的执行次数。
java
protected void reput(Timeout timeout, long tick) {
if (timeout == null) { // 边界检查
throw new IllegalArgumentException();
}
Timer timer = timeout.timer(); // 检查定时任务
if (timer.isStop() || timeout.isCancelled() || isCancel()) {
return;
}
times++; // 递增times
// 添加定时任务
timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS);
}
AbstractRetryTask 将 doRetry() 方法作为抽象方法,留给子类实现具体的重试逻辑,这也是模板方法的使用。
在子类 FailedRegisteredTask 的 doRetry() 方法实现中,会再次执行关联 Registry 的 doRegister() 方法,完成与服务发现组件交互。如果注册成功,则会调用 removeFailedRegisteredTask() 方法将当前关联的 URL 以及当前重试任务从 failedRegistered 集合中删除。如果注册失败,则会抛出异常,执行上文介绍的 reput ()方法重试。
java
protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
registry.doRegister(url); // 重新注册
registry.removeFailedRegisteredTask(url); // 删除重试任务
}
public void removeFailedRegisteredTask(URL url) {
failedRegistered.remove(url);
}
另外,在 register() 方法入口处,会主动调用 removeFailedRegistered() 方法和 removeFailedUnregistered() 方法来清理指定 URL 关联的定时任务:
java
public void register(URL url) {
super.register(url);
removeFailedRegistered(url); // 清理FailedRegisteredTask定时任务
removeFailedUnregistered(url); // 清理FailedUnregisteredTask定时任务
try {
doRegister(url);
} catch (Exception e) {
addFailedRegistered(url);
}
}
其他核心方法
unregister() 方法以及 unsubscribe() 方法的实现方式与 register() 方法类似,只是调用的 do*() 抽象方法、依赖的 AbstractRetryTask 有所不同而已,这里就不再展开细讲。
你还记得上一课时我们介绍的 AbstractRegistry 通过本地文件缓存实现的容错机制吗?FailbackRegistry.subscribe() 方法在处理异常的时候,会先获取缓存的订阅数据并调用 notify() 方法,如果没有缓存相应的订阅数据,才会检查 check 参数决定是否抛出异常。
通过上一课时对 AbstractRegistry.notify() 方法的介绍,我们知道其核心逻辑之一就是回调 NotifyListener。下面我们就来看一下 FailbackRegistry 对 notify() 方法的覆盖:
java
protected void notify(URL url, NotifyListener listener,
List<URL> urls) {
... // 检查url和listener不为空(略)
try {
// FailbackRegistry.doNotify()方法实际上就是调用父类
// AbstractRegistry.notify()方法,没有其他逻辑
doNotify(url, listener, urls);
} catch (Exception t) {
// doNotify()方法出现异常,则会添加一个定时任务
addFailedNotified(url, listener, urls);
}
}
addFailedNotified() 方法会创建相应的 FailedNotifiedTask 任务,添加到 failedNotified 集合中,同时也会添加到时间轮中等待执行。如果已存在相应的 FailedNotifiedTask 重试任务,则会更新任务需要处理的 URL 集合。
在 FailedNotifiedTask 中维护了一个 URL 集合,用来记录当前任务一次运行需要通知的 URL,每执行完一次任务,就会清空该集合,具体实现如下:
java
protected void doRetry(URL url, FailbackRegistry registry,
Timeout timeout) {
// 如果urls集合为空,则会通知所有Listener,该任务也就啥都不做了
if (CollectionUtils.isNotEmpty(urls)) {
listener.notify(urls);
urls.clear();
}
reput(timeout, retryPeriod); // 将任务重新添加到时间轮中等待执行
}
从上面的代码可以看出,FailedNotifiedTask 重试任务一旦被添加,就会一直运行下去,但真的是这样吗?在 FailbackRegistry 的 subscribe()、unsubscribe() 方法中,可以看到 removeFailedNotified() 方法的调用,这里就是清理 FailedNotifiedTask 任务的地方。我们以 FailbackRegistry.subscribe() 方法为例进行介绍:
java
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener); // 关注这个方法
try {
doSubscribe(url, listener);
} catch (Exception e) {
addFailedSubscribed(url, listener);
}
}
// removeFailedSubscribed()方法中会清理FailedSubscribedTask、FailedUnsubscribedTask、FailedNotifiedTask三类定时任务
private void removeFailedSubscribed(URL url, NotifyListener listener) {
Holder h = new Holder(url, listener); // 清理FailedSubscribedTask
FailedSubscribedTask f = failedSubscribed.remove(h);
if (f != null) {
f.cancel();
}
removeFailedUnsubscribed(url, listener);// 清理FailedUnsubscribedTask
removeFailedNotified(url, listener); // 清理FailedNotifiedTask
}
介绍完 FailbackRegistry 中最核心的注册/订阅实现之后,我们再来关注其实现的恢复功能,也就是 recover() 方法。该方法会直接通过 FailedRegisteredTask 任务处理 registered 集合中的全部 URL,通过 FailedSubscribedTask 任务处理 subscribed 集合中的 URL 以及关联的 NotifyListener。
FailbackRegistry 在生命周期结束时,会调用自身的 destroy() 方法,其中除了调用父类的 destroy() 方法之外,还会调用时间轮(即 retryTimer 字段)的 stop() 方法,释放时间轮相关的资源。
总结
本课时重点介绍了 AbstractRegistry 的实现类------FailbackRegistry 的核心实现,它主要是在 AbstractRegistry 的基础上,提供了重试机制。具体方法就是通过之前课时介绍的时间轮,在 register()/ unregister()、subscribe()/ unsubscribe() 等核心方法失败时,添加重试定时任务,实现重试机制,同时也添加了相应的定时任务清理逻辑。