Skip to main content

Http长轮询数据同步源码分析

· One min read
Apache ShenYu Committer

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu 网关当前支持ZooKeeperWebSocketHttp长轮询NacosEtcdConsul 进行数据同步。本文的主要内容是基于Http长轮询的数据同步源码分析。

本文基于shenyu-2.5.0版本进行源码分析,官网的介绍请参考 数据同步原理

1. Http长轮询#

这里直接引用官网的相关描述:

ZookeeperWebSocket 数据同步的机制比较简单,而 Http长轮询则比较复杂。 Apache ShenYu 借鉴了 ApolloNacos 的设计思想,取其精华,自己实现了 Http长轮询数据同步功能。注意,这里并非传统的 ajax 长轮询!

Http长轮询 机制如上所示,Apache ShenYu网关主动请求 shenyu-admin 的配置服务,读取超时时间为 90s,意味着网关层请求配置服务最多会等待 90s,这样便于 shenyu-admin 配置服务及时响应变更数据,从而实现准实时推送。

Http长轮询 机制是由网关主动请求 shenyu-admin ,所以这次的源码分析,我们从网关这一侧开始。

2. 网关数据同步#

2.1 加载配置#

Http长轮询 数据同步配置的加载是通过spring bootstarter机制,当我们引入相关依赖和在配置文件中有如下配置时,就会加载。

pom文件中引入依赖:

<!--shenyu data sync start use http--><dependency>    <groupId>org.apache.shenyu</groupId>    <artifactId>shenyu-spring-boot-starter-sync-data-http</artifactId>    <version>${project.version}</version></dependency>

application.yml配置文件中添加配置:

shenyu:    sync:       http:          url : http://localhost:9095

当网关启动时,配置类HttpSyncDataConfiguration就会执行,加载相应的Bean

/** * Http sync data configuration for spring boot. */@Configuration@ConditionalOnClass(HttpSyncDataService.class)@ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url")@EnableConfigurationProperties(value = HttpConfig.class)public class HttpSyncDataConfiguration {
  private static final Logger LOGGER = LoggerFactory.getLogger(HttpSyncDataConfiguration.class);
  /**   * Rest template.   * 创建RestTemplate   * @param httpConfig the http config       http配置   * @return the rest template   */  @Bean  public RestTemplate restTemplate(final HttpConfig httpConfig) {    OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();    factory.setConnectTimeout(Objects.isNull(httpConfig.getConnectionTimeout()) ? (int) HttpConstants.CLIENT_POLLING_CONNECT_TIMEOUT : httpConfig.getConnectionTimeout());    factory.setReadTimeout(Objects.isNull(httpConfig.getReadTimeout()) ? (int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT : httpConfig.getReadTimeout());    factory.setWriteTimeout(Objects.isNull(httpConfig.getWriteTimeout()) ? (int) HttpConstants.CLIENT_POLLING_WRITE_TIMEOUT : httpConfig.getWriteTimeout());    return new RestTemplate(factory);  }
  /**   * AccessTokenManager.   * 创建AccessTokenManager,专门用户对admin进行http请求时access token的处理   * @param httpConfig   the http config.         * @param restTemplate the rest template.   * @return the access token manager.   */  @Bean  public AccessTokenManager accessTokenManager(final HttpConfig httpConfig, final RestTemplate restTemplate) {    return new AccessTokenManager(restTemplate, httpConfig);  }
  /**   * Http sync data service.   * 创建 HttpSyncDataService    * @param httpConfig         the http config   * @param pluginSubscriber   the plugin subscriber   * @param restTemplate       the rest template   * @param metaSubscribers    the meta subscribers   * @param authSubscribers    the auth subscribers   * @param accessTokenManager the access token manager   * @return the sync data service   */  @Bean  public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig,                                             final ObjectProvider<PluginDataSubscriber> pluginSubscriber,                                             final ObjectProvider<RestTemplate> restTemplate,                                             final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,                                             final ObjectProvider<List<AuthDataSubscriber>> authSubscribers,                                             final ObjectProvider<AccessTokenManager> accessTokenManager) {    LOGGER.info("you use http long pull sync shenyu data");    return new HttpSyncDataService(            Objects.requireNonNull(httpConfig.getIfAvailable()),            Objects.requireNonNull(pluginSubscriber.getIfAvailable()),            Objects.requireNonNull(restTemplate.getIfAvailable()),            metaSubscribers.getIfAvailable(Collections::emptyList),            authSubscribers.getIfAvailable(Collections::emptyList),            Objects.requireNonNull(accessTokenManager.getIfAvailable())    );  }}

HttpSyncDataConfigurationHttp长轮询数据同步的配置类,负责创建HttpSyncDataService(负责http数据同步的具体实现)、RestTemplateAccessTokenManager (负责与adminhttp调用时access token的处理)。它的注解如下:

  • @Configuration:表示这是一个配置类;
  • @ConditionalOnClass(HttpSyncDataService.class):条件注解,表示要有HttpSyncDataService这个类;
  • @ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url"):条件注解,要有shenyu.sync.http.url这个属性配置。
  • @EnableConfigurationProperties(value = HttpConfig.class):表示让HttpConfig上的注解@ConfigurationProperties(prefix = "shenyu.sync.http")生效,将HttpConfig这个配置类注入Ioc容器中。

2.2 属性初始化#

  • HttpSyncDataService

HttpSyncDataService的构造函数中,完成属性初始化。

public class HttpSyncDataService implements SyncDataService {
    // 省略了属性字段......
    public HttpSyncDataService(final HttpConfig httpConfig,                               final PluginDataSubscriber pluginDataSubscriber,                               final RestTemplate restTemplate,                               final List<MetaDataSubscriber> metaDataSubscribers,                               final List<AuthDataSubscriber> authDataSubscribers,                               final AccessTokenManager accessTokenManager) {          // 1.设置accessTokenManager          this.accessTokenManager = accessTokenManager;          // 2.创建数据处理器          this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);          // 3.shenyu-admin的url, 多个用逗号(,)分割          this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));          // 4.只用于http长轮询的restTemplate          this.restTemplate = restTemplate;          // 5.开始执行长轮询任务          this.start();    }
    //......}

上面代码中省略了其他函数和相关字段,在构造函数中完成属性的初始化,主要是:

  • 设置accessTokenManager,定时向admin请求更新accessToken的值。然后每次向admin发起请求时都必须将headerX-Access-Token属性设置成accessToken对应的值;

  • 创建数据处理器,用于后续缓存各种类型的数据(插件、选择器、规则、元数据和认证数据);

  • 获取admin属性配置,主要是获取adminurladmin有可能是集群,多个用逗号(,)分割;

  • 设置RestTemplate,用于向admin发起请求;

  • 开始执行长轮询任务。

2.3 开始长轮询#

  • HttpSyncDataService#start()

start()方法中,干了两件事情,一个是获取全量数据,即请求admin端获取所有需要同步的数据,然后将获取到的数据缓存到网关内存中。另一个是开启多线程执行长轮询任务。

public class HttpSyncDataService implements SyncDataService {        // ......
    private void start() {      // // 只初始化一次,通过原子类实现。       if (RUNNING.compareAndSet(false, true)) {        // 初次启动,获取全量数据        this.fetchGroupConfig(ConfigGroupEnum.values());        // 一个后台服务,一个线程        int threadSize = serverList.size();        // 自定义线程池        this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,                new LinkedBlockingQueue<>(),                ShenyuThreadFactory.create("http-long-polling", true));        // 开始长轮询,一个admin服务,创建一个线程用于数据同步        this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));      } else {        LOG.info("shenyu http long polling was started, executor=[{}]", executor);      }    }      // ......}
2.3.1 获取全量数据#
  • HttpSyncDataService#fetchGroupConfig()

ShenYu将所有需要同步的数据进行了分组,一共有5种数据类型,分别是插件、选择器、规则、元数据和认证数据。

public enum ConfigGroupEnum {    APP_AUTH, // 认证数据    PLUGIN, //插件    RULE, // 规则    SELECTOR, // 选择器    META_DATA; // 元数据}

admin有可能是集群,这里通过循环的方式向每个admin发起请求,有一个执行成功了,那么向admin获取全量数据并缓存到网关的操作就执行成功。如果出现了异常,就向下一个admin发起请求。

public class HttpSyncDataService implements SyncDataService {
  // ......
  private void fetchGroupConfig(final ConfigGroupEnum... groups) throws ShenyuException {    // admin有可能是集群,这里通过循环的方式向每个admin发起请求    for (int index = 0; index < this.serverList.size(); index++) {      String server = serverList.get(index);      try {        // 真正去执行        this.doFetchGroupConfig(server, groups);        // 有一个成功,就成功了,可以退出循环        break;      } catch (ShenyuException e) {        // 出现异常,尝试执行下一个        // 最后一个也执行失败了,抛出异常        if (index >= serverList.size() - 1) {          throw e;        }        LOG.warn("fetch config fail, try another one: {}", serverList.get(index + 1));      }    }  }
  // ......}
  • HttpSyncDataService#doFetchGroupConfig()

在此方法中,首先拼装请求参数,然后通过httpClient发起请求,到admin中获取数据,最后将获取到的数据更新到网关内存中。

public class HttpSyncDataService implements SyncDataService {  private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {    // 1. 拼请求参数,所有分组枚举类型    StringBuilder params = new StringBuilder();    for (ConfigGroupEnum groupKey : groups) {      params.append("groupKeys").append("=").append(groupKey.name()).append("&");    }    // admin端提供的接口  /configs/fetch    String url = server + Constants.SHENYU_ADMIN_PATH_CONFIGS_FETCH + "?" + StringUtils.removeEnd(params.toString(), "&");    LOG.info("request configs: [{}]", url);    String json;    try {      HttpHeaders headers = new HttpHeaders();      // 设置accessToken      headers.set(Constants.X_ACCESS_TOKEN, this.accessTokenManager.getAccessToken());      HttpEntity<String> httpEntity = new HttpEntity<>(headers);      // 2. 发起请求,获取变更数据      json = this.restTemplate.exchange(url, HttpMethod.GET, httpEntity, String.class).getBody();    } catch (RestClientException e) {      String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());      LOG.warn(message);      throw new ShenyuException(message, e);    }    // 3. 更新网关内存中数据    boolean updated = this.updateCacheWithJson(json);    if (updated) {      LOG.debug("get latest configs: [{}]", json);      return;    }    // 更新成功,此方法就执行完成了    LOG.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);    // 服务端没有数据更新,就等30s    ThreadUtils.sleep(TimeUnit.SECONDS, 30);  }}

从代码中,可以看到 admin端提供的获取全量数据接口是 /configs/fetch,这里先不进一步深入,放在后文再分析。

获取到admin返回结果数据,并成功更新,那么此方法就执行结束了。如果没有更新成功,那么有可能是服务端没有数据更新,就等待30s

这里需要提前说明一下,网关在判断是否更新成功时,有比对数据的操作,马上就会提到。

  • HttpSyncDataService#updateCacheWithJson()

更新网关内存中的数据。使用GSON进行反序列化,从属性data中拿真正的数据,然后交给DataRefreshFactory去做更新。

    private boolean updateCacheWithJson(final String json) {        // 使用GSON进行反序列化        JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);        // if the config cache will be updated?        return factory.executor(jsonObject.getAsJsonObject("data"));    }
  • DataRefreshFactory#executor()

根据不同数据类型去更新数据,返回更新结果。具体更新逻辑交给了dataRefresh.refresh()方法。在更新结果中,有一种数据类型进行了更新,就表示此次操作发生了更新。

    public boolean executor(final JsonObject data) {        //并行更新数据        List<Boolean> result = ENUM_MAP.values().parallelStream()                .map(dataRefresh -> dataRefresh.refresh(data))                .collect(Collectors.toList());        //有一个更新就表示此次发生了更新操作        return result.stream().anyMatch(Boolean.TRUE::equals);    }
  • AbstractDataRefresh#refresh()

数据更新逻辑采用的是模板方法设计模式,通用操作在抽象方法中完成,不同的实现逻辑由子类完成。5种数据类型具体的更新逻辑有些差异,但是也存在通用的更新逻辑,类图关系如下:

在通用的refresh()方法中,负责数据类型转换,判断是否需要更新,和实际的数据刷新操作。

public abstract class AbstractDataRefresh<T> implements DataRefresh {
  // ......
  @Override  public Boolean refresh(final JsonObject data) {    // 数据类型转换    JsonObject jsonObject = convert(data);    if (Objects.isNull(jsonObject)) {      return false;    }
    boolean updated = false;    // 得到数据类型    ConfigData<T> result = fromJson(jsonObject);    // 是否需要更新    if (this.updateCacheIfNeed(result)) {      updated = true;      // 真正的更新逻辑,数据刷新操作      refresh(result.getData());    }
    return updated;  }
  // ......}
  • AbstractDataRefresh#updateCacheIfNeed()

数据转换的过程,就是根据不同的数据类型进行转换,我们就不再进一步追踪了,看看数据是否需要更新的逻辑。方法名是updateCacheIfNeed(),通过方法重载实现。

public abstract class AbstractDataRefresh<T> implements DataRefresh {
  // ......
  // result是数据  protected abstract boolean updateCacheIfNeed(ConfigData<T> result);
  // newVal是获取到的最新的值  // groupEnum 是哪种数据类型  protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {    // 如果是第一次,那么直接放到cache中,返回 true,表示此次进行了更新    if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {      return true;    }    ResultHolder holder = new ResultHolder(false);    GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {      // md5 值相同,不需要更新      if (StringUtils.equals(oldVal.getMd5(), newVal.getMd5())) {        LOG.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5());        return oldVal;      }
      // 当前缓存的数据修改时间大于 新来的数据,不需要更新      // must compare the last update time      if (oldVal.getLastModifyTime() >= newVal.getLastModifyTime()) {        LOG.info("Last update time earlier than the current configuration, the [{}] config cache will not be updated", groupEnum);        return oldVal;      }      LOG.info("update {} config: {}", groupEnum, newVal);      holder.result = true;      return newVal;    });    return holder.result;  }
  // ......}

从上面的源码中可以看到,有两种情况不需要更新:

  • 两个的数据的md5 值相同,不需要更新;
  • 当前缓存的数据修改时间大于 新来的数据,不需要更新。

其他情况需要更新数据。

分析到这里,就将start() 方法中初次启动,获取全量数据的逻辑分析完了,接下来是长轮询的操作。为了方便,我将start()方法再粘贴一次:

public class HttpSyncDataService implements SyncDataService {
  // ......
  private void start() {    // // 只初始化一次,通过原子类实现。     if (RUNNING.compareAndSet(false, true)) {      // 初次启动,获取全量数据      this.fetchGroupConfig(ConfigGroupEnum.values());      // 一个后台服务,一个线程      int threadSize = serverList.size();      // 自定义线程池      this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,              new LinkedBlockingQueue<>(),              ShenyuThreadFactory.create("http-long-polling", true));      // 开始长轮询,一个admin服务,创建一个线程用于数据同步      this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));    } else {      LOG.info("shenyu http long polling was started, executor=[{}]", executor);    }  }
  // ......}
2.3.2 执行长轮询任务#
  • HttpLongPollingTask#run()

长轮询任务是HttpLongPollingTask,它实现了Runnable接口,任务逻辑在run()方法中。通过while()循环实现不断执行任务,即长轮询。在每一次的轮询中有三次重试逻辑,一次轮询任务失败了,等 5s 再继续,3 次都失败了,等5 分钟再试。

开始长轮询,一个admin服务,创建一个线程用于数据同步。

class HttpLongPollingTask implements Runnable {
  private final String server;
  HttpLongPollingTask(final String server) {    this.server = server;  }
  @Override  public void run() {    // 一直轮询    while (RUNNING.get()) {      // 默认重试 3 次      int retryTimes = 3;      for (int time = 1; time <= retryTimes; time++) {        try {          doLongPolling(server);        } catch (Exception e) {          if (time < retryTimes) {            LOG.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",                    time, retryTimes - time, e.getMessage());            // 长轮询失败了,等 5s 再继续            ThreadUtils.sleep(TimeUnit.SECONDS, 5);            continue;          }          LOG.error("Long polling failed, try again after 5 minutes!", e);          // 3 次都失败了,等 5 分钟再试          ThreadUtils.sleep(TimeUnit.MINUTES, 5);        }      }    }    LOG.warn("Stop http long polling.");  }}
  • HttpSyncDataService#doLongPolling()

执行长轮询任务的核心逻辑:

  • 根据数据类型组装请求参数:md5lastModifyTime
  • 组装请求头和请求体;
  • admin发起请求,判断组数据是否发生变更;
  • 根据发生变更的组,再去获取数据。
public class HttpSyncDataService implements SyncDataService {  private void doLongPolling(final String server) {    // 组装请求参数:md5 和 lastModifyTime    MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);    for (ConfigGroupEnum group : ConfigGroupEnum.values()) {      ConfigData<?> cacheConfig = factory.cacheConfigData(group);      if (cacheConfig != null) {        String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));        params.put(group.name(), Lists.newArrayList(value));      }    }    // 组装请求头和请求体    HttpHeaders headers = new HttpHeaders();    headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);    // 设置accessToken    headers.set(Constants.X_ACCESS_TOKEN, this.accessTokenManager.getAccessToken());    HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(params, headers);    String listenerUrl = server + Constants.SHENYU_ADMIN_PATH_CONFIGS_LISTENER;
    JsonArray groupJson;    //向admin发起请求,判断组数据是否发生变更    //这里只是判断了某个组是否发生变更    try {      String json = this.restTemplate.postForEntity(listenerUrl, httpEntity, String.class).getBody();      LOG.info("listener result: [{}]", json);      JsonObject responseFromServer = GsonUtils.getGson().fromJson(json, JsonObject.class);      groupJson = responseFromServer.getAsJsonArray("data");    } catch (RestClientException e) {      String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());      throw new ShenyuException(message, e);    }    // 根据发生变更的组,再去获取数据    /**     * 官网对此处的解释:     * 网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。     * 这里可能会存在一个疑问:为什么不是直接将变更的数据写出?     * 我们在开发的时候,也深入讨论过该问题,因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时,     * 或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。     *     * 个人理解:     * 如果将变更数据直接写出,当管理员频繁更新配置时,第一次更新了,将client移除阻塞队列,返回响应信息给网关。     * 如果这个时候进行了第二次更新,那么当前的client是不在阻塞队列中,所以这一次的变更就会错过。     * 网关层处理不及时,也是同理。     * 这是一个长轮询,一个网关一个同步线程,可能存在耗时的过程。     * 如果admin有数据变更,当前网关client是没有在阻塞队列中,就不到数据。     */    if (Objects.nonNull(groupJson) && groupJson.size() > 0) {      // fetch group configuration async.      ConfigGroupEnum[] changedGroups = GsonUtils.getGson().fromJson(groupJson, ConfigGroupEnum[].class);      LOG.info("Group config changed: {}", Arrays.toString(changedGroups));      this.doFetchGroupConfig(server, changedGroups);    }  }}

这里需要特别解释一点的是:在长轮询任务中,为什么不直接拿到变更的数据?而是先判断哪个分组数据发生了变更,然后再次请求admin,获取变更数据?

官网对此处的解释是:

网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。 这里可能会存在一个疑问:为什么不是直接将变更的数据写出? 我们在开发的时候,也深入讨论过该问题,因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时, 或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。

个人理解是:

如果将变更数据直接写出,管理员频繁更新配置时,第一次更新了,将client移除阻塞队列,返回响应信息给网关。如果这个时候进行了第二次更新,那么当前的client是不在阻塞队列中,所以这一次的变更就会错过。网关层处理不及时,也是同理。 这是一个长轮询,一个网关一个同步线程,可能存在耗时的过程。如果admin有数据变更,当前网关client是没有在阻塞队列中,就会更新不到数据。

我们还没有分析到admin端的处理逻辑,先大概说一下。在admin端,会将网关client放到阻塞队列,有数据变更,网关client就会出队列,发送变更数据。所以,如果有数据变更时,网关client不在阻塞队列,那么就无法得到当前变更的数据。

知道哪个分组数据发生变更时,主动再向admin获取变更的数据,根据分组不同,全量拿数据。调用方法是doFetchGroupConfig(),这个在前面已经分析过了。

分析到这里,网关端的数据同步操作就完成了。长轮询任务就是不断向admin发起请求,看看数据是否发生变更,如果有分组数据发生变更,那么就再主动向admin发起请求,获取变更数据,然后更新网关内存中的数据。

网关端长轮询任务流程:

3. admin数据同步#

从前面分析的过程中,可以看到,网关端主要调用admin的两个接口:

  • /configs/listener:判断组数据是否发生变更;
  • /configs/fetch:获取变更组数据。

直接从这两个接口分析的话,可能有的地方不好理解,所以我们还是从admin启动流程开始分析数据同步过程。

3.1 加载配置#

如果在配置文件application.yml中,进行了如下配置,就表示通过http长轮询的方式进行数据同步。

shenyu:  sync:      http:        enabled: true

程序启动时,通过springboot条件装配实现数据同步类的配置加载。在这个过程中,会创建HttpLongPollingDataChangedListener,负责处理长轮询的相关实现逻辑。

/** * 数据同步配置类 * 通过springboot条件装配实现 * The type Data sync configuration. */@Configurationpublic class DataSyncConfiguration {
    /**     * http长轮询     * http long polling.     */    @Configuration    @ConditionalOnProperty(name = "shenyu.sync.http.enabled", havingValue = "true")    @EnableConfigurationProperties(HttpSyncProperties.class)    static class HttpLongPollingListener {
        @Bean        @ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)        public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {            return new HttpLongPollingDataChangedListener(httpSyncProperties);        }    }}

3.2 数据变更监听器实例化#

  • HttpLongPollingDataChangedListener

数据变更监听器通过构造函数的方式完成实例化和初始化操作。在构造函数中会创建阻塞队列,用于存放客户端;创建线程池,用于执行延迟任务,周期任务;保存长轮询相关属性信息。

    public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {        // 默认客户端(这里是网关)1024个        this.clients = new ArrayBlockingQueue<>(1024);        // 创建线程池        // ScheduledThreadPoolExecutor 可以执行延迟任务,周期任务,普通任务        this.scheduler = new ScheduledThreadPoolExecutor(1,                ShenyuThreadFactory.create("long-polling", true));        // 长轮询的属性信息        this.httpSyncProperties = httpSyncProperties;    }

另外,它的类图关系如下:

实现了InitializingBean接口,所以在bean的初始化过程中执行afterInitialize()方法。通过线程池执行周期任务:更新内存中(CACHE)的数据每隔5分钟执行一次,5分钟后开始执行。刷新本地缓存就是从数据库读取数据到本地缓存(这里就是内存),通过refreshLocalCache()完成。

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
  // ......
  /**   * 在 InitializingBean接口中的afterPropertiesSet()方法中被调用,即在bean的初始化过程中执行   */  @Override  protected void afterInitialize() {    long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();    // 执行周期任务:更新内存中(CACHE)的数据每隔5分钟执行一次,5分钟后开始执行    // 防止admin先启动一段时间后,产生了数据;然后网关初次连接时,没有拿到全量数据    scheduler.scheduleWithFixedDelay(() -> {      LOG.info("http sync strategy refresh config start.");      try {        // 从数据库读取数据到本地缓存(这里就是内存)        this.refreshLocalCache();        LOG.info("http sync strategy refresh config success.");      } catch (Exception e) {        LOG.error("http sync strategy refresh config error!", e);      }    }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);    LOG.info("http sync strategy refresh interval: {}ms", syncInterval);  }
  // ......}
  • refreshLocalCache()

分别对5种数据类型进行更新。

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
  // ......
  // 从数据库读取数据到本地缓存(这里就是内存)  private void refreshLocalCache() {    //更新认证数据    this.updateAppAuthCache();    //更新插件数据    this.updatePluginCache();    //更新规则数据    this.updateRuleCache();    //更新选择器数据    this.updateSelectorCache();    //更新元数据    this.updateMetaDataCache();  }
  // ......}

5个更新方法的逻辑是类似的,调用service方法获取数据,然后放到内存CACHE中。以更新规则数据方法updateRuleCache()为例,传入规则枚举类型,调用ruleService.listAll()从数据库获取所有规则数据。

    /**     * Update rule cache.     */    protected void updateRuleCache() {        this.updateCache(ConfigGroupEnum.RULE, ruleService.listAll());    }
  • updateCache()

使用数据库中的数据更新内存中的数据。

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
  // ......
  // 缓存数据的 Map  protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();
  /**   * if md5 is not the same as the original, then update lcoal cache.   * 更新缓存中的数据   * @param group ConfigGroupEnum   * @param <T> the type of class   * @param data the new config data   */  protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {    //数据序列化    String json = GsonUtils.getInstance().toJson(data);    //传入md5值和修改时间    ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());    //更新分组数据    ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);    LOG.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);  }
  // ......}

初始化的过程就是启动周期性任务,定时从数据库获取数据更新内存数据。

接下来开始对两个接口开始分析:

  • /configs/listener:判断组数据是否发生变更;
  • /configs/fetch:获取变更组数据。

3.3 数据变更轮询接口#

  • /configs/listener:判断组数据是否发生变更;

接口类是ConfigController,只有使用http长轮询进行数据同步时才会生效。接口方法listener()没有其他逻辑,直接调用doLongPolling()方法。

   /** * This Controller only when HttpLongPollingDataChangedListener exist, will take effect. */@ConditionalOnBean(HttpLongPollingDataChangedListener.class)@RestController@RequestMapping("/configs")public class ConfigController {
    private final HttpLongPollingDataChangedListener longPollingListener;
    public ConfigController(final HttpLongPollingDataChangedListener longPollingListener) {      this.longPollingListener = longPollingListener;    }        // 省略其他逻辑
    /**     * Listener.     * 监听数据变更,执行长轮询     * @param request  the request     * @param response the response     */    @PostMapping(value = "/listener")    public void listener(final HttpServletRequest request, final HttpServletResponse response) {        longPollingListener.doLongPolling(request, response);    }
}
  • HttpLongPollingDataChangedListener#doLongPolling()

执行长轮询任务:如果有数据变更,将会立即响应给客户端(这里就是网关端)。否则,客户端会一直被阻塞,直到有数据变更或者超时。

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
  // ......
  /**   * 执行长轮询:如果有数据变更,会立即响应给客户端(这里就是网关端)。   * 否则,否则客户端会一直被阻塞,直到有数据变更或者超时。   * @param request   * @param response   */  public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {    // compare group md5    // 比较md5,判断网关的数据和admin端的数据是否一致,得到发生变更的数据组    List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);    String clientIp = getRemoteIp(request);    // response immediately.    // 有变更的数据,则立即向网关响应    if (CollectionUtils.isNotEmpty(changedGroup)) {      this.generateResponse(response, changedGroup);      Log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);      return;    }
    // 没有变更,则将客户端(这里就是网关)放进阻塞队列    final AsyncContext asyncContext = request.startAsync();    asyncContext.setTimeout(0L);    scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));  }
  // ......}
  • HttpLongPollingDataChangedListener#compareChangedGroup()

判断组数据是否发生变更,判断逻辑是比较网关端和admin端的md5值和lastModifyTime

  • 如果md5值不一样,那么需要更新;
  • 如果admin端的lastModifyTime大于网关端的lastModifyTime,那么需要更新。
 /**     * 判断组数据是否发生变更     * @param request     * @return     */    private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request) {        List<ConfigGroupEnum> changedGroup = new ArrayList<>(ConfigGroupEnum.values().length);        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {            // 网关端数据的md5值和lastModifyTime            String[] params = StringUtils.split(request.getParameter(group.name()), ',');            if (params == null || params.length != 2) {                throw new ShenyuException("group param invalid:" + request.getParameter(group.name()));            }            String clientMd5 = params[0];            long clientModifyTime = NumberUtils.toLong(params[1]);            ConfigDataCache serverCache = CACHE.get(group.name());            // do check. 判断组数据是否发生变更            if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) {                changedGroup.add(group);            }        }        return changedGroup;    }
  • LongPollingClient

没有变更数据,则将客户端(这里就是网关)放进阻塞队列。阻塞时间是60秒,即60秒后移除,并响应客户端。

class LongPollingClient implements Runnable {      // 省略了其他逻辑            @Override        public void run() {            try {                // 先设置定时任务:60秒后移除,并响应客户端                this.asyncTimeoutFuture = scheduler.schedule(() -> {                    clients.remove(LongPollingClient.this);                    List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());                    sendResponse(changedGroups);                }, timeoutTime, TimeUnit.MILLISECONDS);
                // 添加到阻塞队列                clients.add(this);
            } catch (Exception ex) {                log.error("add long polling client error", ex);            }        }
        /**         * Send response.         *         * @param changedGroups the changed groups         */        void sendResponse(final List<ConfigGroupEnum> changedGroups) {            // cancel scheduler            if (null != asyncTimeoutFuture) {                asyncTimeoutFuture.cancel(false);            }            // 响应变更的组            generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);            asyncContext.complete();        }    }

3.4 获取变更数据接口#

  • /configs/fetch:获取变更数据;

根据网关传入的参数,获取分组数据,返回结果。主要实现方法是longPollingListener.fetchConfig()


@ConditionalOnBean(HttpLongPollingDataChangedListener.class)@RestController@RequestMapping("/configs")public class ConfigController {
    private final HttpLongPollingDataChangedListener longPollingListener;      public ConfigController(final HttpLongPollingDataChangedListener longPollingListener) {      this.longPollingListener = longPollingListener;    }
    /**     * Fetch configs shenyu result.     * 全量获取分组数据     * @param groupKeys the group keys     * @return the shenyu result     */    @GetMapping("/fetch")    public ShenyuAdminResult fetchConfigs(@NotNull final String[] groupKeys) {        Map<String, ConfigData<?>> result = Maps.newHashMap();        for (String groupKey : groupKeys) {            ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey));            result.put(groupKey, data);        }        return ShenyuAdminResult.success(ShenyuResultMessage.SUCCESS, result);    }      // 省略了其他接口
}
  • AbstractDataChangedListener#fetchConfig()

数据获取直接从CACHE中拿,然后根据不同分组类型进行匹配,封装。

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {  /**   * fetch configuration from cache.   * 获取分组下的全量数据   * @param groupKey the group key   * @return the configuration data   */  public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {    // 直接从 CACHE 中拿数据    ConfigDataCache config = CACHE.get(groupKey.name());    switch (groupKey) {      case APP_AUTH: // 认证数据        return buildConfigData(config, AppAuthData.class);      case PLUGIN: // 插件数据        return buildConfigData(config, PluginData.class);      case RULE:   // 规则数据        return buildConfigData(config, RuleData.class);      case SELECTOR:  // 选择器数据        return buildConfigData(config, SelectorData.class);      case META_DATA: // 元数据        return buildConfigData(config, MetaData.class);      default:  // 其他类型,抛出异常        throw new IllegalStateException("Unexpected groupKey: " + groupKey);    }  }}

3.5 数据变更#

在之前的websocket数据同步和zookeeper数据同步源码分析文章中,我们知道admin端数据同步设计结构如下:

各种数据变更监听器都是DataChangedListener的子类。

当在admin端修改数据后,通过Spring的事件处理机制,发送事件通知。发送逻辑如下:


/** * Event forwarders, which forward the changed events to each ConfigEventListener. * 数据变更事件分发器:当admin端有数据发生变更时,将变更数据同步到 ShenYu 网关 * 数据变更依赖于Spring的事件监听机制:ApplicationEventPublisher --> ApplicationEvent --> ApplicationListener * */@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
   //省略了其他逻辑
    /**     * 有数据变更时,调用此方法     * @param event     */    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // 遍历数据变更监听器(一般使用一种数据同步的方式就好了)        for (DataChangedListener listener : listeners) {            // 哪种数据发生变更            switch (event.getGroupKey()) {                case APP_AUTH: // 认证信息                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());                    break;                case PLUGIN:  // 插件信息                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());                    break;                case RULE:    // 规则信息                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());                    break;                case SELECTOR:   // 选择器信息                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());                    // 当选择器数据更新时,更新API文档信息                    applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());                    break;                case META_DATA:  // 元数据                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());                    break;                default:  // 其他类型,抛出异常                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());            }        }    }}

假设,对插件信息进行了修改,通过http长轮询的方式进行数据同步,那么listener.onPluginChanged()的实际调用的是org.apache.shenyu.admin.listener.AbstractDataChangedListener#onPluginChanged

    /**     * 在admin的操作,有插件发生了更新     * @param changed   the changed     * @param eventType the event type     */    @Override    public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {        if (CollectionUtils.isEmpty(changed)) {            return;        }        // 更新内存CACHE        this.updatePluginCache();        // 执行变更任务        this.afterPluginChanged(changed, eventType);    }

有两个处理操作,一是更新内存CACHE,这个在前面分析过了;另一个是执行变更任务,在线程池中执行。

  • HttpLongPollingDataChangedListener#afterPluginChanged()
    @Override    protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {        // 在线程池中执行        scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN));    }
  • DataChangeTask

数据变更任务:将阻塞队列中的客户端依次移除,并发送响应,通知网关有组数据发生变更。

class DataChangeTask implements Runnable {        //省略了其他逻辑         @Override        public void run() {            // 阻塞队列中的客户端超过了给定的值100,则分批执行            if (clients.size() > httpSyncProperties.getNotifyBatchSize()) {                List<LongPollingClient> targetClients = new ArrayList<>(clients.size());                clients.drainTo(targetClients);                List<List<LongPollingClient>> partitionClients = Lists.partition(targetClients, httpSyncProperties.getNotifyBatchSize());               // 分批执行                partitionClients.forEach(item -> scheduler.execute(() -> doRun(item)));            } else {                // 执行任务                doRun(clients);            }        }
        private void doRun(final Collection<LongPollingClient> clients) {            // 通知所有客户端发生了数据变更            for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {                LongPollingClient client = iter.next();                iter.remove();                // 发送响应                client.sendResponse(Collections.singletonList(groupKey));                LOG.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);            }        }    }

至此,admin端数据同步逻辑就分析完了。在基于http长轮询数据同步是,它主要有三个功能:

  • 提供数据变更监听接口;
  • 提供获取变更数据接口;
  • 有数据变更时,移除阻塞队列中的客户端,并响应结果。

最后,用三张图描述下admin端长轮询任务流程:

  • /configs/listener数据变更监听接口:

  • /configs/fetch获取变更数据接口:

  • 在admin后台管理系统更新数据,进行数据同步:

4. 总结#

本文主要对ShenYu网关中的http长轮询数据同步进行了源码分析。涉及到的主要知识点如下:

  • http长轮询由网关端主动发起请求,不断请求admin端;
  • 变更数据以组为粒度(认证信息、插件、选择器、规则、元数据);
  • http长轮询结果只拿到了变更组,还需要再次发起请求获取组数据;
  • 数据是否更新由md5值和修改时间lastModifyTime决定。

SPI设计实现源码分析

· One min read

背景#

最近研读Apache开源项目Shenyu网关的源码,网关的多个核心组件加载都用到了SPI模块。本文就Shenyu中的SPI设计和源码实现进行分析。

什么是SPI#

SPI就是Service Provider Interface,直译"服务提供方接口",是一种动态的服务发现机制,可以基于接口运行时动态加载接口的实现类(也就是接口编程 + 策略模式 + 配置文件的一种开发模式)。最常见的就是JDK内置的数据库驱动接口java.sql.Driver,不同的厂商可以对该接口完成不同的实现,例如MySQLMySQL驱动包中的com.mysql.jdbc.Driver)、PostgreSQLPostgreSQL驱动包中的org.postgresql.Driver)等等。

spi-jdk-api-diagram

JDK内置的SPI使用方式如下:

  • 在类路径的META-INF/services目录创建一个以接口全限定名称命名的文件(本质是一个properties)文件,例如命名为java.sql.Driver
  • 该文件中可以指定具体的实现类,也就是每个实现类的全类型限定名为单独一行,例如META-INF/services/java.sql.Driver中:
# META-INF/services/java.sql.Driver文件内容com.mysql.jdbc.Driverorg.postgresql.Driver
  • 最后通过java.util.ServiceLoader对该文件进行加载,实例化接口的对应实现类(这里隐含了一个约定,所有实现类必须提供无参构造函数

底层的实现涉及到类加载、双亲委托模型等内容,这里就不展开。基于这种设计思路,很多主流框架了自实现了一套SPI扩展,例如DubboSPI扩展模块,就是读取类路径下META-INF/services/dubbo目录的文件内容进行类加载。Shenyu-SPI模块也是沿用类似的设计思路。

shenyu-spi源码分析#

shenyu-spi模块十分精炼,代码结构如下:

- shenyu-spi[module]  - org.apache.shenyu.spi[package]    -- ExtensionFactory    -- ExtensionLoader    -- Join    -- SPI    -- SpiExtensionFactory

这些类功能如下:

  • ExtensionFactorySPI加载器工厂,本身也是一个SPI,用于基于SPI机制加载ExtensionLoader实例同时基于ExtensionLoader实例获取默认的SPI标识接口实现
  • SpiExtensionFactory:其实就是ExtensionFactory的一个实现类
  • SPI:标识注解,用于标识SPI,用于接口上
  • Join:标识注解,用于实现类上,用于标识该类加入SPI系统
  • ExtensionLoaderSPI加载器,类比java.util.ServiceLoader,用于加载SPI中接口的实现类

接下来细看每个类的源码实现。

@SPI#

org.apache.shenyu.spi.SPI作为一个标识注解,主要用于接口上,也就是只有使用了@SPI的接口才能被shenyu-spi加载。这个类的注释中描述到:所有SPI系统相关参考Apache Dubbo的实现(这一点比较合情理,其实SPI扩展已经是一种成熟的方案,实现上大同小异)。该注解只有一个方法:

@Documented@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.TYPE)public @interface SPI {
    /**     * Value string.     *     * @return the string     */    String value() default "";}

唯一的value()方法用于指定默认的SPI实现(可选的),后文在展开ExtensionLoader时候会说明。

@Join#

org.apache.shenyu.spi.Join也是一个标识注解,主要用在使用了@SPI注解的接口的实现类上,用于标识该类加入SPI系统中而后可以被ExtensionLoader加载。该注解也只有一个方法:

@Documented@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.TYPE)public @interface Join {        /**     * It will be sorted according to the current serial number..     * @return int.     */    int order() default 0;}

唯一的order()方法用于指定具体的顺序号,单个使用了@SPI的接口存在多个使用了@Join的实现类的时候,这个顺序号就确定了这些实现类实例的排序(顺序号小的排在前面)。

ExtensionLoader#

ExtensionLoader就是"类型扩展加载器",就是整个SPI模块的核心。先看其成员属性:

public final class ExtensionLoader<T> {        // SLF4J日志句柄    private static final Logger LOG = LoggerFactory.getLogger(ExtensionLoader.class);        // SPI配置文件基于类路径下的相对目录    private static final String SHENYU_DIRECTORY = "META-INF/shenyu/";        // @SPI标识接口类型 -> ExtensionLoader实例的缓存 => 注意这个是一个全局的静态变量    private static final Map<Class<?>, ExtensionLoader<?>> LOADERS = new ConcurrentHashMap<>();        // 当前@SPI标识接口类型    private final Class<T> clazz;        // 类加载器实例    private final ClassLoader classLoader;        // 当前ExtensionLoader缓存的已加载的实现类信息,使用值持有器包装,是一个HashMap,映射关系:实现类别名 -> 实现类信息    private final Holder<Map<String, ClassEntity>> cachedClasses = new Holder<>();        // 当前ExtensionLoader缓存的已加载的实现类实例的值包装器,使用值持有器包装,映射关系:实现类别名 -> 值持有器包装的实现类实体    private final Map<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();        // 当前ExtensionLoader缓存的已加载的实现类实例,使用值持有器包装,映射关系:实现类类型 -> 实现类实体    private final Map<Class<?>, Object> joinInstances = new ConcurrentHashMap<>();        // 缓存默认名称,来源于@SPI注解的value()方法非空白返回值,用于加载默认的接口实现    private String cachedDefaultName;        // Holder比较器,按照Holder的order降序,也就是顺序号小的排在前面    private final Comparator<Holder<Object>> holderComparator = (o1, o2) -> {        if (o1.getOrder() > o2.getOrder()) {            return 1;        } else if (o1.getOrder() < o2.getOrder()) {            return -1;        } else {            return 0;        }    };        // ClassEntity比较器,按照ClassEntity的order降序,也就是顺序号小的排在前面    private final Comparator<ClassEntity> classEntityComparator = (o1, o2) -> {        if (o1.getOrder() > o2.getOrder()) {            return 1;        } else if (o1.getOrder() < o2.getOrder()) {            return -1;        } else {            return 0;        }    };        // 暂时省略其他代码
    // 值持有器,简单VO,用来存储泛型值和值加载顺序    public static class Holder<T> {                // 这里的值引用是volatile修饰,便于某线程更变另一线程马上读到最新的值        private volatile T value;                private Integer order;        // 省略setter和getter代码    }        // 类实体,主要存放加载的实现类的信息    static final class ClassEntity {                // 名称,这里是指SPI实现类的别名,不是类名        private String name;                // 加载顺序号        private Integer order;                // SPI实现类        private Class<?> clazz;                private ClassEntity(final String name, final Integer order, final Class<?> clazz) {            this.name = name;            this.order = order;            this.clazz = clazz;        }        // 省略setter和getter代码    }}

分析完成员属性,不难发现下面几点:

  • ExtensionLoader会存在一个全局的静态缓存LOADERS,缓存已经创建的ExtensionLoader实例以防止重复创建的性能开销
  • 每个@SPI标记的接口如果使用ExtensionLoader进行加载,都会生成一个全新的ExtensionLoader实例
  • @SPI标记的接口如果有多个实现,那么最终获取到这些实现实例的时候是有序的

接着看其构造函数和静态工厂方法:

// 私有构造函数,需要入参为@SPI标识的接口类型和类加载器实例private ExtensionLoader(final Class<T> clazz, final ClassLoader cl) {    // 成员变量clazz赋值    this.clazz = clazz;    // 成员变量classLoader赋值    this.classLoader = cl;    // 这里对于非ExtensionFactory接口类型会懒加载一个用于加载ExtensionFactory的ExtensionLoader    if (!Objects.equals(clazz, ExtensionFactory.class)) {        ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getExtensionClassesEntity();    }}
// 实例化getExtensionLoader,静态工厂方法,需要入参为@SPI标识的接口类型和类加载器实例public static <T> ExtensionLoader<T> getExtensionLoader(final Class<T> clazz, final ClassLoader cl) {    // 前缀校验,接口类型必须非空并且必须存在@SPI注解,否则抛出异常中断    Objects.requireNonNull(clazz, "extension clazz is null");    if (!clazz.isInterface()) {        throw new IllegalArgumentException("extension clazz (" + clazz + ") is not interface!");    }    if (!clazz.isAnnotationPresent(SPI.class)) {        throw new IllegalArgumentException("extension clazz (" + clazz + ") without @" + SPI.class + " Annotation");    }    // 从缓存LOADERS中加载ExtensionLoader实例,不存在则创建,典型的懒加载模式    ExtensionLoader<T> extensionLoader = (ExtensionLoader<T>) LOADERS.get(clazz);    if (Objects.nonNull(extensionLoader)) {        return extensionLoader;    }    LOADERS.putIfAbsent(clazz, new ExtensionLoader<>(clazz, cl));    return (ExtensionLoader<T>) LOADERS.get(clazz);}
// 实例化getExtensionLoader,静态工厂方法,需要入参为@SPI标识的接口类型,使用ExtensionLoader类的类加载器public static <T> ExtensionLoader<T> getExtensionLoader(final Class<T> clazz) {    return getExtensionLoader(clazz, ExtensionLoader.class.getClassLoader());}

ExtensionLoader使用了私有构造器,使用了静态工厂方法和懒加载模式。初始化ExtensionLoader完成后并不会触发类加载工作,真正的扫描和加载行为延迟到调用getJoin系列方法执行,这里扫码和加载所有实现类信息的方法调用链:

// 加载所有扩展类信息,这里采用了DCL(双重锁校验)防止并发加载private Map<String, ClassEntity> getExtensionClassesEntity() {    // 缓存不存在    Map<String, ClassEntity> classes = cachedClasses.getValue();    if (Objects.isNull(classes)) {        // 加锁后再检查一次缓存        synchronized (cachedClasses) {            classes = cachedClasses.getValue();            if (Objects.isNull(classes)) {                // 最终确认缓存不存在,则进行加载,并且标记顺序号为0                classes = loadExtensionClass();                cachedClasses.setValue(classes);                cachedClasses.setOrder(0);            }        }    }    return classes;}
// 加载当前ExtensionLoader中clazz的所有SPI系统内的实现类private Map<String, ClassEntity> loadExtensionClass() {    SPI annotation = clazz.getAnnotation(SPI.class);    if (Objects.nonNull(annotation)) {        // 这里就是前面提到,如果@SPI注解的value()方法非空白返回值会作为默认实现的别名        // 也就是如果只使用了@SPI,那么就无法获取默认实现        // 如果使用了@SPI("foo"),可以通过别名foo去映射和获取默认实现        String value = annotation.value();        if (StringUtils.isNotBlank(value)) {            cachedDefaultName = value;        }    }    // 初始化一个Hashmap容器用于存储加载的实现类信息,这个变量会透传到下一个方法链    Map<String, ClassEntity> classes = new HashMap<>(16);    // 加载目录中的属性文件    loadDirectory(classes);    return classes;}
// 加载目录中的属性文件,并且加载文件中的实现类,目标目录:META-INF/shenyu/private void loadDirectory(final Map<String, ClassEntity> classes) {    // 文件名 => META-INF/shenyu/$className    String fileName = SHENYU_DIRECTORY + clazz.getName();    try {        // 这里使用类加载器加载文件资源,如果传入的类加载器为空会使用系统类加载器        Enumeration<URL> urls = Objects.nonNull(this.classLoader) ? classLoader.getResources(fileName)                : ClassLoader.getSystemResources(fileName);        // 遍历解析的文件URL集合        if (Objects.nonNull(urls)) {            while (urls.hasMoreElements()) {                URL url = urls.nextElement();                // 通过文件URL加载资源                loadResources(classes, url);            }        }    } catch (IOException t) {        LOG.error("load extension class error {}", fileName, t);    }}
// 加载文件资源,解析文件并且加载实现类存储到classes中private void loadResources(final Map<String, ClassEntity> classes, final URL url) throws IOException {    // 读取URL文件资源,加载到Properties中,每行格式为name=classPath    try (InputStream inputStream = url.openStream()) {        Properties properties = new Properties();        properties.load(inputStream);        properties.forEach((k, v) -> {            String name = (String) k;            String classPath = (String) v;            if (StringUtils.isNotBlank(name) && StringUtils.isNotBlank(classPath)) {                try {                    // 基于name和classPath进行类加载                    loadClass(classes, name, classPath);                } catch (ClassNotFoundException e) {                    throw new IllegalStateException("load extension resources error", e);                }            }        });    } catch (IOException e) {        throw new IllegalStateException("load extension resources error", e);    }}
// 基于name(别名)和classPath(类全限定名称)进行类加载private void loadClass(final Map<String, ClassEntity> classes,                        final String name, final String classPath) throws ClassNotFoundException {    // 类初始化,并且确定实现类必须是当前@SPI注解标识接口的子类    Class<?> subClass = Objects.nonNull(this.classLoader) ? Class.forName(classPath, true, this.classLoader) : Class.forName(classPath);    if (!clazz.isAssignableFrom(subClass)) {        throw new IllegalStateException("load extension resources error," + subClass + " subtype is not of " + clazz);    }    // 实现类必须存在注解@Join    if (!subClass.isAnnotationPresent(Join.class)) {        throw new IllegalStateException("load extension resources error," + subClass + " without @" + Join.class + " annotation");    }    // 如果缓存中不存在同样别名的实现类才进行缓存,已经存在则校验旧的类型和当前实现类型是否一致    ClassEntity oldClassEntity = classes.get(name);    if (Objects.isNull(oldClassEntity)) {        // 创建类信息实体保存别名、顺序号和实现类并且缓存,映射关系:别名 -> 类信息实体        Join joinAnnotation = subClass.getAnnotation(Join.class);        ClassEntity classEntity = new ClassEntity(name, joinAnnotation.order(), subClass);        classes.put(name, classEntity);    } else if (!Objects.equals(oldClassEntity.getClazz(), subClass)) {        throw new IllegalStateException("load extension resources error,Duplicate class " + clazz.getName() + " name "                + name + " on " + oldClassEntity.getClazz().getName() + " or " + subClass.getName());    }}

通过方法链getExtensionClassesEntity -> loadExtensionClass -> loadDirectory -> loadResources -> loadClass最终得到一个别名->实现类信息的映射,用于后续的实例化,见getJoin()方法:

// 基于别名获取实现类实例public T getJoin(final String name) {    // 别名必须为非空白字符串    if (StringUtils.isBlank(name)) {        throw new NullPointerException("get join name is null");    }    // 这里也使用DCL去cachedInstances缓存中取别名对应的值持有器,值持有器为空则创建    Holder<Object> objectHolder = cachedInstances.get(name);    if (Objects.isNull(objectHolder)) {        cachedInstances.putIfAbsent(name, new Holder<>());        objectHolder = cachedInstances.get(name);    }    Object value = objectHolder.getValue();    if (Objects.isNull(value)) {        synchronized (cachedInstances) {            // 加锁后再次判断值持有器中的值是否存在,不存在的时候则进行实现类实例化            value = objectHolder.getValue();            if (Objects.isNull(value)) {                Holder<T> pair = createExtension(name);                value = pair.getValue();                int order = pair.getOrder();                // 实例化完成后更新值持有器缓存                objectHolder.setValue(value);                objectHolder.setOrder(order);            }        }    }    return (T) value;}
// 基于别名搜索已经加载的实现类信息,并且实例化对应的实现类进行值包装private Holder<T> createExtension(final String name) {    // 加载该@SPI标识接口的所有实现类信息并且获取对应别名的实现类信息    ClassEntity classEntity = getExtensionClassesEntity().get(name);    if (Objects.isNull(classEntity)) {        throw new IllegalArgumentException("name is error");    }    Class<?> aClass = classEntity.getClazz();    // 如果实现类实例缓存中已经存在,则直接封装为值包装器返回,否则进行实例化    Object o = joinInstances.get(aClass);    if (Objects.isNull(o)) {        try {            // 反射实例化并且缓存该实现类实例            joinInstances.putIfAbsent(aClass, aClass.newInstance());            o = joinInstances.get(aClass);        } catch (InstantiationException | IllegalAccessException e) {            throw new IllegalStateException("Extension instance(name: " + name + ", class: "                    + aClass + ")  could not be instantiated: " + e.getMessage(), e);                    }    }    Holder<T> objectHolder = new Holder<>();    objectHolder.setOrder(classEntity.getOrder());    objectHolder.setValue((T) o);    return objectHolder;}

createExtension()方法中可以看到最终是使用反射方式实例化实现类,反射方法newInstance()要求该类必须提供无参构造函数,因为这里有一点隐含的约定:SPI实现类必须提供无参构造函数,否则会实例化失败。剩余的getDefaultJoin()getJoins()是基于getJoin()方法进行扩展,功能并不复杂,这里就不展开分析了。另外,在getJoin()方法用到了多级缓存:

  • cachedInstances:通过别名就可以搜索到对应的实现类实例
  • joinInstances:别名查找失败,则加载所有实现类信息,然后通过别名定位实现类类型,再通过实现类类型查找或者创建并缓存实现类实例后更新cachedInstances缓存

到此,ExtensionLoader的源码分析完毕。这里再通过一个ExtensionLoader实例成员属性内存布局图可以加深理解:

spi-attr-memory-debug

ExtensionFactory#

ExtensionFactory是工厂模式里面的工厂接口,该接口定义了一个获取SPI实现(默认实现,唯一)实例的方法:

@SPI("spi")public interface ExtensionFactory {
    /**     * Gets Extension.     *     * @param <T>   the type parameter     * @param key   此参数暂时没有使用,猜测是预留用于映射@SPI的value()     * @param clazz @SPI标识的接口类型     * @return the extension     */    <T> T getExtension(String key, Class<T> clazz);}

接着看其实现类SpiExtensionFactory的代码:

@Joinpublic class SpiExtensionFactory implements ExtensionFactory {
    @Override    public <T> T getExtension(final String key, final Class<T> clazz) {        return Optional.ofNullable(clazz)   // 入参clazz非空                .filter(Class::isInterface)  // 入参clazz必须是接口                .filter(cls -> cls.isAnnotationPresent(SPI.class))  // 入参clazz必须被@SPI标识                .map(ExtensionLoader::getExtensionLoader)  // 基于clazz这个接口类型实例化ExtensionLoader                .map(ExtensionLoader::getDefaultJoin)  // 获取该@SPI标识接口的默认实现,不存在则返回NULL                .orElse(null);    }}

这里值得注意的是:ExtensionFactory本身也是SPI系统的一部分。因此使用ExtensionFactory的时候可以直接实例化:

ExtensionFactory extensionFactory = new SpiExtensionFactory();

也可以基于ExtensionLoader进行加载:

# 在类路径META-INF/services/shenyu目录下添加一个属性文件org.apache.shenyu.spi.ExtensionFactory,内容是spi=org.apache.shenyu.spi.SpiExtensionFactory
# 然后基于ExtensionLoader进行加载ExtensionFactory extensionFactory = ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getDefaultJoin();

得到ExtensionFactory实例后就可以基于@SPI接口加载其默认实现类的实例。

扩展和建议#

下面是个人的一些见解。目前来看,Shenyu中的SPI模块功能是完备的,建议考虑引入两个常用的功能:

  • 可以在Join注解中添加属性标记SPI接口实现类生成的实例是单例还是全新实例,类似于Spring中的Scope声明(singleton或者prototype)那样,例如:
@Documented@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.TYPE)public @interface Join {        /**     * It will be sorted according to the current serial number..     * @return int.     */    int order() default 0;
    /**     * The join class instance should be singleton or not     * @return true or false     */    boolean isSingleton() default true;}
  • 可选地SPI的实现类实现一个初始化器接口,在该实现类实例化后回调初始化器接口方法,例如:
public interface ExtensionInitializer {        void init();  }
/** * demo */@SPIpublic interface JdbcSPI {
    String getClassName();}
@Joinpublic class MysqlSPI implements JdbcSPI, ExtensionInitializer {
    @Override    public void init() {        // callback when MysqlSPI instance init    }        @Override    public String getClassName() {        return "mysql";    }}

如果添加了这两点,能够满足很多现实场景的需求。另外,ExtensionLoader中的两处比较器成员变量可以进行代码精简,例如对classEntityComparator而言:

private final Comparator<ClassEntity> classEntityComparator = (o1, o2) -> {    if (o1.getOrder() > o2.getOrder()) {        return 1;    } else if (o1.getOrder() < o2.getOrder()) {        return -1;    } else {        return 0;    }};
// 可以精简为Comparator提供的静态工厂方法和方法引用private final Comparator<ClassEntity> classEntityComparator = Comparator.comparing(ClassEntity::getOrder);

小结#

基于Java原生SPI设计思路上设计出来的SPI框架具备了松耦合、高易用性和高扩展性的特点,并且添加了加载实例缓存、并发安全等特性,填补了原生JDKSPI的一些缺陷,ShenyuSPI模块也是如此。正是由于此强大的SPI模块的存在,Shenyu中的其他模块如Plugin模块可以实现快速插拔式配置,让加载一个全新开发的插件实例变得更加容易。