Skip to content

第21讲:Cluter插件剖析,你想要的集群模式它都有

在上一课时中介绍的 ConfigurationModule 是一个非常基础的 Module,在后续介绍 CoreModule、TraceModule 时,都会看到它们的 requireModule 集合都包含了 ConfigurationModule。

本课时将介绍 Cluster 模块,该模块也是非常基础的模块,被很多其他模块依赖。我们重点介绍支持单机模式 cluster-standalone-plugin 模块,以及依赖 Zookeeper 的 cluster-zookeeper-plugin 模块。

ClusterModule

在 application.yml 配置文件中,我们可以看到 ClusterModule 相关的配置,如下图所示,其中包含了多个 Cluster 实现模块的配置。

全部 Cluster 模块使用的 ModuleDefine 实现类 ------ ClusterModule 位于在 server-core 这个模块中,如下图所示,在 server-core 模块的 SPI 文件中指定了多个 ModuleDefine 实现,其中就包含 ClusterModule。

毫无疑问,ClusterModule 的 name() 方法固定返回 "cluster" 字符串,与上图展示的 application.yml 配置文件相对应。其 services() 方法中返回了两个 Service 接口的子接口类型 ------ ClusterRegister、ClusterNodesQuery,所以,ClusterModule 底层的 ModuleProvider 实现需要提供它们的实现类。如上图所示,这两个接口与 ClusterModule 在同一个包下。

ClusterRegister 接口中定义了注册集群中一个节点地址的方法:

java
public interface ClusterRegister extends Service {
    void registerRemote(RemoteInstance remoteInstance);
}

这里的参数 RemoteInstance 表示了 OAP 集群中的一个节点,有三个关键信息:host 地址、port 端口以及 isSelf 标识,其中 isSelf 标识该 RemoteInstance 对象是否表示当前 OAP 节点本身。

在 ClusterNodesQuery 接口中定义了查询集群中所有远端节点地址的方法:

java
public interface ClusterNodesQuery extends Service {
    List<RemoteInstance> queryRemoteNodes(); // 查询集群中全部节点
}

cluster-standalone-plugin 模块

在 server-cluster-plugin 模块下有多个子模块实现了 ClusterModule 的功能,如下图所示:

这里的 cluster-standalone-plugin 模块实现了单机模式,在其 ModuleProvider SPI 文件中指定的实现类是 ClusterModuleStandaloneProvider,其 name() 方法固定返回 "standalone" 字符串,与 application.yml 配置文件对应。requireModules() 方法返回空数组,表示不依赖其他任何 Module。

在 prepare() 方法中,ClusterModuleStandaloneProvider 会创建 StandaloneManager 实例。StandaloneManager 同时实现了 ClusterNodesQuery 接口和 ClusterRegister 接口,如下图所示:

StandaloneManager 中只有一个 RemoteInstance 类型字段(isSelf 标识始终为 true,表示当前 OAP 服务本身),其 registerRemote() 方法和 queryRemoteNodes() 方法都是操作这一个 RemoteInstance 字段,这样就实现了简单的单机模式。

curator-x-discovery 扩展库

在开始介绍 cluster-zookeeper-plugin 模块之前,需要先了解其中使用到的 curator-x-discovery 依赖的功能。

为了避免 curator-recipes 包过于膨胀,Curator 将很多其他解决方案都拆出来了,作为单独的一个包,命名方式就是 curator-x-*,例如:curator-x-discovery、curator-x-rpc。

在 SkyWalking 中的 cluster-zookeeper-plugin 模块就使用了 curator-x-discovery 这个包。curator-x-discovery 扩展包是一个服务发现的解决方案。在 ZooKeeper 中,我们可以使用临时节点(Ephemeral Node)实现一个服务注册机制。当服务启动后在 ZooKeeper 的指定 Path 下创建临时节点,服务断掉与 ZooKeeper 的会话之后,其相应的临时节点就会被删除。这个 curator-x-discovery 扩展包抽象了这种功能,并提供了一套简单的 API 来实现服务发现机制。curator-x-discovery 扩展包的核心概念如下:

  • ServiceInstance:ServiceInstance 是 curator-x-discovery 扩展包对服务实例的抽象,ServiceInstance 由 name、id、address、port 以及一个可选的 payload 属性构成。ServiceInstance 序列化并存储在 ZooKeeper 中的方式如下:
  • ServiceProvider:ServiceProvider 是 curator-x-discovery 扩展包的核心,它提供了多种不同策略的服务发现方式,具体策略有:轮询调度、随机和黏性(总是选择相同的一个)。得到 ServiceProvider 对象之后,我们可以调用其 getInstance() 方法,按照指定策略获取 ServiceInstance 对象(即发现可用服务实例);还可以调用 getAllInstances() 方法,获取所有 ServiceInstance 对象(即获取全部可用服务实例)。
  • ServiceDiscovery:ServiceDiscovery 是 curator-x-discovery 扩展包的入口类。开始必须调用 start() 方法,当使用完成应该调用 close() 方法进行销毁。
  • ServiceCache:如果程序中会频繁地查询 ServiceInstance 对象、添加 ServiceCache 缓存,ServiceCache 会在内存中缓存 ServiceInstance 实例的列表,并且添加相应的 Watcher 来同步更新缓存。查询 ServiceCache 的方式也是 getInstances() 方法。另外,ServiceCache 上还可以添加 Listener 来监听缓存变化。

cluster-zookeeper-plugin 模块

在前面课时介绍 OAP 启动流程时看到,OAP 会先从 application.yml 配置文件中读取配置信息,存入 ApplicationConfiguration、ModuleConfiguration、ProviderConfiguration 之中,最后将这些配置信息转换成 Java Bean(即 ModuleProvider 对应的 ModuleConfig 实现类)。

cluster-zookeeper-plugin 模块中的 ModuleProvider SPI 文件中指定的实现类是 ClusterModuleZookeeperProvider,其对应的 ModuleConfig 实现类是

ClusterModuleZookeeperConfig 类,该类的字段如下,与 application.yml 配置文件中的字段一一对应:

java
private String nameSpace; // 命名空间,即Zk节点的路径,默认值为"/skywalking"
private String hostPort; //  Zookeeper集群地址
private int baseSleepTimeMs; // 两次重试之间的初始间隔时间,后面间隔会指数增长
private int maxRetries; // 最大重试次数

ClusterModuleZookeeperProvider 的 name() 方法固定返回 "zookeeper" 字符串,与 application.yml 配置文件对应。requireModules() 方法返回空数组,表示不依赖任何其他 Module。

在 prepare() 方法中,ClusterModuleZookeeperProvider 会初始化前文介绍的 curator-x-discovery 扩展库,下面是具体实现,其中同时展示了 curator-x-discovery 扩展库的基础使用:

java
public void prepare(){
    RetryPolicy retryPolicy = // 重试策略
        new ExponentialBackoffRetry(config.getBaseSleepTimeMs(),       
            config.getMaxRetries());
    // 创建Curator客户端
    client = CuratorFrameworkFactory.newClient(config.getHostPort(), 
        retryPolicy);
    // 存储ServiceInstance实例的节点路径
    String path = BASE_PATH + (StringUtil.isEmpty(
        config.getNameSpace()) ? "" : "/" + config.getNameSpace());
    // 创建ServiceDiscovery
    serviceDiscovery = ServiceDiscoveryBuilder.builder(
            RemoteInstance.class).client(client) // 依赖Curator客户端
        .basePath(path) // 管理的Zk路径
        .watchInstances(true) // 当ServiceInstance加载
         // 这里的SWInstanceSerializer是将RemoteInstance序列化成Json
        .serializer(new SWInstanceSerializer()).build();
    client.start(); // 启动Curator客户端
    client.blockUntilConnected(); // 阻塞当前线程,等待连接成功
    serviceDiscovery.start(); // 启动ServiceDiscovery
    // 创建ZookeeperCoordinator对象
    ZookeeperCoordinator coordinator = 
        new ZookeeperCoordinator(config, serviceDiscovery);
    // 注册ClusterRegister、ClusterNodesQuery实现
    this.registerServiceImplementation(ClusterRegister.class, 
        coordinator);
    this.registerServiceImplementation(ClusterNodesQuery.class, 
        coordinator);
}

ZookeeperCoordinator 同时实现了 ClusterRegister、ClusterNodesQuery 两个接口,如下图所示:

在 registerRemote() 方法中会将 RemoteInstance 实例转换成 curator-x-discovery 扩展库中的 ServiceInstance 对象,并注册到 ZooKeeper。注意,这里传入的 RemoteInstance 实例中的 isSelf 标识都是 true,因为每个 OAP 服务只会暴露自身的地址(也只知道自身的地址)。

java
synchronized void registerRemote(RemoteInstance remoteInstance){
    String remoteNamePath = "remote";
    // 将RemoteInstance对象转换成ServiceInstance对象
    ServiceInstance<RemoteInstance> thisInstance = ServiceInstance.
         <RemoteInstance>builder().name(remoteNamePath)
        .id(UUID.randomUUID().toString()) // id是随机生成的UUID
        .address(remoteInstance.getAddress().getHost())
        .port(remoteInstance.getAddress().getPort())
        .payload(remoteInstance).build();
    // 将ServiceInstance写入到Zookeeper中
    serviceDiscovery.registerService(thisInstance);
    // 创建ServiceCache,监Zookeeper相应节点的变化,也方便后续的读取
    serviceCache = serviceDiscovery.serviceCacheBuilder()
        .name(remoteNamePath).build();
    serviceCache.start(); // 启动ServiceCache
}

所有 OAP 实例在启动过程中都会通过 registerRemote() 方法将自身地址注册到 ZooKeeper 集群。在需要获取所有 OAP 实例地址的时候,就可以通过 ZookeeperCoordinator 的queryRemoteNodes() 方法进行查询。

在 queryRemoteNodes() 方法中会查询 ServiceCache 获取全部的 ServiceInstance 对象,然后从每个 ServiceInstance 对象的 playload 字段中反序列化得到 RemoteInstance 实例(如果其中地址与当前 OAP 服务一致会将其 isSelf 字段设置为 true,标识本地 OAP 服务的地址)返回。

总结

本课时重点介绍了 SkyWalking 中提供的 Cluster 相关模块。首先介绍了ClusterModule 的相关内容,然后介绍了 cluster-standalone-plugin 模块对单机模式的支持,最后介绍了 cluster-zookeeper-plugin 模块如何依赖 Apache Curator 扩展包 curator-x-discovery 以及 ZooKeeper 集群实现 OAP 集群的功能。