Skip to main content

One post tagged with "http"

View All Tags

Http长轮询数据同步源码分析

· One min read
Apache ShenYu Committer

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu 网关当前支持ZooKeeperWebSocketHttp长轮询NacosEtcdConsul 进行数据同步。本文的主要内容是基于Http长轮询的数据同步源码分析。

本文基于shenyu-2.5.0版本进行源码分析,官网的介绍请参考 数据同步原理

1. Http长轮询#

这里直接引用官网的相关描述:

ZookeeperWebSocket 数据同步的机制比较简单,而 Http长轮询则比较复杂。 Apache ShenYu 借鉴了 ApolloNacos 的设计思想,取其精华,自己实现了 Http长轮询数据同步功能。注意,这里并非传统的 ajax 长轮询!

Http长轮询 机制如上所示,Apache ShenYu网关主动请求 shenyu-admin 的配置服务,读取超时时间为 90s,意味着网关层请求配置服务最多会等待 90s,这样便于 shenyu-admin 配置服务及时响应变更数据,从而实现准实时推送。

Http长轮询 机制是由网关主动请求 shenyu-admin ,所以这次的源码分析,我们从网关这一侧开始。

2. 网关数据同步#

2.1 加载配置#

Http长轮询 数据同步配置的加载是通过spring bootstarter机制,当我们引入相关依赖和在配置文件中有如下配置时,就会加载。

pom文件中引入依赖:

<!--shenyu data sync start use http--><dependency>    <groupId>org.apache.shenyu</groupId>    <artifactId>shenyu-spring-boot-starter-sync-data-http</artifactId>    <version>${project.version}</version></dependency>

application.yml配置文件中添加配置:

shenyu:    sync:       http:          url : http://localhost:9095

当网关启动时,配置类HttpSyncDataConfiguration就会执行,加载相应的Bean

/** * Http sync data configuration for spring boot. */@Configuration@ConditionalOnClass(HttpSyncDataService.class)@ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url")@EnableConfigurationProperties(value = HttpConfig.class)public class HttpSyncDataConfiguration {
  private static final Logger LOGGER = LoggerFactory.getLogger(HttpSyncDataConfiguration.class);
  /**   * Rest template.   * 创建RestTemplate   * @param httpConfig the http config       http配置   * @return the rest template   */  @Bean  public RestTemplate restTemplate(final HttpConfig httpConfig) {    OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();    factory.setConnectTimeout(Objects.isNull(httpConfig.getConnectionTimeout()) ? (int) HttpConstants.CLIENT_POLLING_CONNECT_TIMEOUT : httpConfig.getConnectionTimeout());    factory.setReadTimeout(Objects.isNull(httpConfig.getReadTimeout()) ? (int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT : httpConfig.getReadTimeout());    factory.setWriteTimeout(Objects.isNull(httpConfig.getWriteTimeout()) ? (int) HttpConstants.CLIENT_POLLING_WRITE_TIMEOUT : httpConfig.getWriteTimeout());    return new RestTemplate(factory);  }
  /**   * AccessTokenManager.   * 创建AccessTokenManager,专门用户对admin进行http请求时access token的处理   * @param httpConfig   the http config.         * @param restTemplate the rest template.   * @return the access token manager.   */  @Bean  public AccessTokenManager accessTokenManager(final HttpConfig httpConfig, final RestTemplate restTemplate) {    return new AccessTokenManager(restTemplate, httpConfig);  }
  /**   * Http sync data service.   * 创建 HttpSyncDataService    * @param httpConfig         the http config   * @param pluginSubscriber   the plugin subscriber   * @param restTemplate       the rest template   * @param metaSubscribers    the meta subscribers   * @param authSubscribers    the auth subscribers   * @param accessTokenManager the access token manager   * @return the sync data service   */  @Bean  public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig,                                             final ObjectProvider<PluginDataSubscriber> pluginSubscriber,                                             final ObjectProvider<RestTemplate> restTemplate,                                             final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,                                             final ObjectProvider<List<AuthDataSubscriber>> authSubscribers,                                             final ObjectProvider<AccessTokenManager> accessTokenManager) {    LOGGER.info("you use http long pull sync shenyu data");    return new HttpSyncDataService(            Objects.requireNonNull(httpConfig.getIfAvailable()),            Objects.requireNonNull(pluginSubscriber.getIfAvailable()),            Objects.requireNonNull(restTemplate.getIfAvailable()),            metaSubscribers.getIfAvailable(Collections::emptyList),            authSubscribers.getIfAvailable(Collections::emptyList),            Objects.requireNonNull(accessTokenManager.getIfAvailable())    );  }}

HttpSyncDataConfigurationHttp长轮询数据同步的配置类,负责创建HttpSyncDataService(负责http数据同步的具体实现)、RestTemplateAccessTokenManager (负责与adminhttp调用时access token的处理)。它的注解如下:

  • @Configuration:表示这是一个配置类;
  • @ConditionalOnClass(HttpSyncDataService.class):条件注解,表示要有HttpSyncDataService这个类;
  • @ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url"):条件注解,要有shenyu.sync.http.url这个属性配置。
  • @EnableConfigurationProperties(value = HttpConfig.class):表示让HttpConfig上的注解@ConfigurationProperties(prefix = "shenyu.sync.http")生效,将HttpConfig这个配置类注入Ioc容器中。

2.2 属性初始化#

  • HttpSyncDataService

HttpSyncDataService的构造函数中,完成属性初始化。

public class HttpSyncDataService implements SyncDataService {
    // 省略了属性字段......
    public HttpSyncDataService(final HttpConfig httpConfig,                               final PluginDataSubscriber pluginDataSubscriber,                               final RestTemplate restTemplate,                               final List<MetaDataSubscriber> metaDataSubscribers,                               final List<AuthDataSubscriber> authDataSubscribers,                               final AccessTokenManager accessTokenManager) {          // 1.设置accessTokenManager          this.accessTokenManager = accessTokenManager;          // 2.创建数据处理器          this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);          // 3.shenyu-admin的url, 多个用逗号(,)分割          this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));          // 4.只用于http长轮询的restTemplate          this.restTemplate = restTemplate;          // 5.开始执行长轮询任务          this.start();    }
    //......}

上面代码中省略了其他函数和相关字段,在构造函数中完成属性的初始化,主要是:

  • 设置accessTokenManager,定时向admin请求更新accessToken的值。然后每次向admin发起请求时都必须将headerX-Access-Token属性设置成accessToken对应的值;

  • 创建数据处理器,用于后续缓存各种类型的数据(插件、选择器、规则、元数据和认证数据);

  • 获取admin属性配置,主要是获取adminurladmin有可能是集群,多个用逗号(,)分割;

  • 设置RestTemplate,用于向admin发起请求;

  • 开始执行长轮询任务。

2.3 开始长轮询#

  • HttpSyncDataService#start()

start()方法中,干了两件事情,一个是获取全量数据,即请求admin端获取所有需要同步的数据,然后将获取到的数据缓存到网关内存中。另一个是开启多线程执行长轮询任务。

public class HttpSyncDataService implements SyncDataService {        // ......
    private void start() {      // // 只初始化一次,通过原子类实现。       if (RUNNING.compareAndSet(false, true)) {        // 初次启动,获取全量数据        this.fetchGroupConfig(ConfigGroupEnum.values());        // 一个后台服务,一个线程        int threadSize = serverList.size();        // 自定义线程池        this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,                new LinkedBlockingQueue<>(),                ShenyuThreadFactory.create("http-long-polling", true));        // 开始长轮询,一个admin服务,创建一个线程用于数据同步        this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));      } else {        LOG.info("shenyu http long polling was started, executor=[{}]", executor);      }    }      // ......}
2.3.1 获取全量数据#
  • HttpSyncDataService#fetchGroupConfig()

ShenYu将所有需要同步的数据进行了分组,一共有5种数据类型,分别是插件、选择器、规则、元数据和认证数据。

public enum ConfigGroupEnum {    APP_AUTH, // 认证数据    PLUGIN, //插件    RULE, // 规则    SELECTOR, // 选择器    META_DATA; // 元数据}

admin有可能是集群,这里通过循环的方式向每个admin发起请求,有一个执行成功了,那么向admin获取全量数据并缓存到网关的操作就执行成功。如果出现了异常,就向下一个admin发起请求。

public class HttpSyncDataService implements SyncDataService {
  // ......
  private void fetchGroupConfig(final ConfigGroupEnum... groups) throws ShenyuException {    // admin有可能是集群,这里通过循环的方式向每个admin发起请求    for (int index = 0; index < this.serverList.size(); index++) {      String server = serverList.get(index);      try {        // 真正去执行        this.doFetchGroupConfig(server, groups);        // 有一个成功,就成功了,可以退出循环        break;      } catch (ShenyuException e) {        // 出现异常,尝试执行下一个        // 最后一个也执行失败了,抛出异常        if (index >= serverList.size() - 1) {          throw e;        }        LOG.warn("fetch config fail, try another one: {}", serverList.get(index + 1));      }    }  }
  // ......}
  • HttpSyncDataService#doFetchGroupConfig()

在此方法中,首先拼装请求参数,然后通过httpClient发起请求,到admin中获取数据,最后将获取到的数据更新到网关内存中。

public class HttpSyncDataService implements SyncDataService {  private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {    // 1. 拼请求参数,所有分组枚举类型    StringBuilder params = new StringBuilder();    for (ConfigGroupEnum groupKey : groups) {      params.append("groupKeys").append("=").append(groupKey.name()).append("&");    }    // admin端提供的接口  /configs/fetch    String url = server + Constants.SHENYU_ADMIN_PATH_CONFIGS_FETCH + "?" + StringUtils.removeEnd(params.toString(), "&");    LOG.info("request configs: [{}]", url);    String json;    try {      HttpHeaders headers = new HttpHeaders();      // 设置accessToken      headers.set(Constants.X_ACCESS_TOKEN, this.accessTokenManager.getAccessToken());      HttpEntity<String> httpEntity = new HttpEntity<>(headers);      // 2. 发起请求,获取变更数据      json = this.restTemplate.exchange(url, HttpMethod.GET, httpEntity, String.class).getBody();    } catch (RestClientException e) {      String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());      LOG.warn(message);      throw new ShenyuException(message, e);    }    // 3. 更新网关内存中数据    boolean updated = this.updateCacheWithJson(json);    if (updated) {      LOG.debug("get latest configs: [{}]", json);      return;    }    // 更新成功,此方法就执行完成了    LOG.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);    // 服务端没有数据更新,就等30s    ThreadUtils.sleep(TimeUnit.SECONDS, 30);  }}

从代码中,可以看到 admin端提供的获取全量数据接口是 /configs/fetch,这里先不进一步深入,放在后文再分析。

获取到admin返回结果数据,并成功更新,那么此方法就执行结束了。如果没有更新成功,那么有可能是服务端没有数据更新,就等待30s

这里需要提前说明一下,网关在判断是否更新成功时,有比对数据的操作,马上就会提到。

  • HttpSyncDataService#updateCacheWithJson()

更新网关内存中的数据。使用GSON进行反序列化,从属性data中拿真正的数据,然后交给DataRefreshFactory去做更新。

    private boolean updateCacheWithJson(final String json) {        // 使用GSON进行反序列化        JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);        // if the config cache will be updated?        return factory.executor(jsonObject.getAsJsonObject("data"));    }
  • DataRefreshFactory#executor()

根据不同数据类型去更新数据,返回更新结果。具体更新逻辑交给了dataRefresh.refresh()方法。在更新结果中,有一种数据类型进行了更新,就表示此次操作发生了更新。

    public boolean executor(final JsonObject data) {        //并行更新数据        List<Boolean> result = ENUM_MAP.values().parallelStream()                .map(dataRefresh -> dataRefresh.refresh(data))                .collect(Collectors.toList());        //有一个更新就表示此次发生了更新操作        return result.stream().anyMatch(Boolean.TRUE::equals);    }
  • AbstractDataRefresh#refresh()

数据更新逻辑采用的是模板方法设计模式,通用操作在抽象方法中完成,不同的实现逻辑由子类完成。5种数据类型具体的更新逻辑有些差异,但是也存在通用的更新逻辑,类图关系如下:

在通用的refresh()方法中,负责数据类型转换,判断是否需要更新,和实际的数据刷新操作。

public abstract class AbstractDataRefresh<T> implements DataRefresh {
  // ......
  @Override  public Boolean refresh(final JsonObject data) {    // 数据类型转换    JsonObject jsonObject = convert(data);    if (Objects.isNull(jsonObject)) {      return false;    }
    boolean updated = false;    // 得到数据类型    ConfigData<T> result = fromJson(jsonObject);    // 是否需要更新    if (this.updateCacheIfNeed(result)) {      updated = true;      // 真正的更新逻辑,数据刷新操作      refresh(result.getData());    }
    return updated;  }
  // ......}
  • AbstractDataRefresh#updateCacheIfNeed()

数据转换的过程,就是根据不同的数据类型进行转换,我们就不再进一步追踪了,看看数据是否需要更新的逻辑。方法名是updateCacheIfNeed(),通过方法重载实现。

public abstract class AbstractDataRefresh<T> implements DataRefresh {
  // ......
  // result是数据  protected abstract boolean updateCacheIfNeed(ConfigData<T> result);
  // newVal是获取到的最新的值  // groupEnum 是哪种数据类型  protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {    // 如果是第一次,那么直接放到cache中,返回 true,表示此次进行了更新    if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {      return true;    }    ResultHolder holder = new ResultHolder(false);    GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {      // md5 值相同,不需要更新      if (StringUtils.equals(oldVal.getMd5(), newVal.getMd5())) {        LOG.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5());        return oldVal;      }
      // 当前缓存的数据修改时间大于 新来的数据,不需要更新      // must compare the last update time      if (oldVal.getLastModifyTime() >= newVal.getLastModifyTime()) {        LOG.info("Last update time earlier than the current configuration, the [{}] config cache will not be updated", groupEnum);        return oldVal;      }      LOG.info("update {} config: {}", groupEnum, newVal);      holder.result = true;      return newVal;    });    return holder.result;  }
  // ......}

从上面的源码中可以看到,有两种情况不需要更新:

  • 两个的数据的md5 值相同,不需要更新;
  • 当前缓存的数据修改时间大于 新来的数据,不需要更新。

其他情况需要更新数据。

分析到这里,就将start() 方法中初次启动,获取全量数据的逻辑分析完了,接下来是长轮询的操作。为了方便,我将start()方法再粘贴一次:

public class HttpSyncDataService implements SyncDataService {
  // ......
  private void start() {    // // 只初始化一次,通过原子类实现。     if (RUNNING.compareAndSet(false, true)) {      // 初次启动,获取全量数据      this.fetchGroupConfig(ConfigGroupEnum.values());      // 一个后台服务,一个线程      int threadSize = serverList.size();      // 自定义线程池      this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,              new LinkedBlockingQueue<>(),              ShenyuThreadFactory.create("http-long-polling", true));      // 开始长轮询,一个admin服务,创建一个线程用于数据同步      this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));    } else {      LOG.info("shenyu http long polling was started, executor=[{}]", executor);    }  }
  // ......}
2.3.2 执行长轮询任务#
  • HttpLongPollingTask#run()

长轮询任务是HttpLongPollingTask,它实现了Runnable接口,任务逻辑在run()方法中。通过while()循环实现不断执行任务,即长轮询。在每一次的轮询中有三次重试逻辑,一次轮询任务失败了,等 5s 再继续,3 次都失败了,等5 分钟再试。

开始长轮询,一个admin服务,创建一个线程用于数据同步。

class HttpLongPollingTask implements Runnable {
  private final String server;
  HttpLongPollingTask(final String server) {    this.server = server;  }
  @Override  public void run() {    // 一直轮询    while (RUNNING.get()) {      // 默认重试 3 次      int retryTimes = 3;      for (int time = 1; time <= retryTimes; time++) {        try {          doLongPolling(server);        } catch (Exception e) {          if (time < retryTimes) {            LOG.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",                    time, retryTimes - time, e.getMessage());            // 长轮询失败了,等 5s 再继续            ThreadUtils.sleep(TimeUnit.SECONDS, 5);            continue;          }          LOG.error("Long polling failed, try again after 5 minutes!", e);          // 3 次都失败了,等 5 分钟再试          ThreadUtils.sleep(TimeUnit.MINUTES, 5);        }      }    }    LOG.warn("Stop http long polling.");  }}
  • HttpSyncDataService#doLongPolling()

执行长轮询任务的核心逻辑:

  • 根据数据类型组装请求参数:md5lastModifyTime
  • 组装请求头和请求体;
  • admin发起请求,判断组数据是否发生变更;
  • 根据发生变更的组,再去获取数据。
public class HttpSyncDataService implements SyncDataService {  private void doLongPolling(final String server) {    // 组装请求参数:md5 和 lastModifyTime    MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);    for (ConfigGroupEnum group : ConfigGroupEnum.values()) {      ConfigData<?> cacheConfig = factory.cacheConfigData(group);      if (cacheConfig != null) {        String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));        params.put(group.name(), Lists.newArrayList(value));      }    }    // 组装请求头和请求体    HttpHeaders headers = new HttpHeaders();    headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);    // 设置accessToken    headers.set(Constants.X_ACCESS_TOKEN, this.accessTokenManager.getAccessToken());    HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(params, headers);    String listenerUrl = server + Constants.SHENYU_ADMIN_PATH_CONFIGS_LISTENER;
    JsonArray groupJson;    //向admin发起请求,判断组数据是否发生变更    //这里只是判断了某个组是否发生变更    try {      String json = this.restTemplate.postForEntity(listenerUrl, httpEntity, String.class).getBody();      LOG.info("listener result: [{}]", json);      JsonObject responseFromServer = GsonUtils.getGson().fromJson(json, JsonObject.class);      groupJson = responseFromServer.getAsJsonArray("data");    } catch (RestClientException e) {      String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());      throw new ShenyuException(message, e);    }    // 根据发生变更的组,再去获取数据    /**     * 官网对此处的解释:     * 网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。     * 这里可能会存在一个疑问:为什么不是直接将变更的数据写出?     * 我们在开发的时候,也深入讨论过该问题,因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时,     * 或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。     *     * 个人理解:     * 如果将变更数据直接写出,当管理员频繁更新配置时,第一次更新了,将client移除阻塞队列,返回响应信息给网关。     * 如果这个时候进行了第二次更新,那么当前的client是不在阻塞队列中,所以这一次的变更就会错过。     * 网关层处理不及时,也是同理。     * 这是一个长轮询,一个网关一个同步线程,可能存在耗时的过程。     * 如果admin有数据变更,当前网关client是没有在阻塞队列中,就不到数据。     */    if (Objects.nonNull(groupJson) && groupJson.size() > 0) {      // fetch group configuration async.      ConfigGroupEnum[] changedGroups = GsonUtils.getGson().fromJson(groupJson, ConfigGroupEnum[].class);      LOG.info("Group config changed: {}", Arrays.toString(changedGroups));      this.doFetchGroupConfig(server, changedGroups);    }  }}

这里需要特别解释一点的是:在长轮询任务中,为什么不直接拿到变更的数据?而是先判断哪个分组数据发生了变更,然后再次请求admin,获取变更数据?

官网对此处的解释是:

网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。 这里可能会存在一个疑问:为什么不是直接将变更的数据写出? 我们在开发的时候,也深入讨论过该问题,因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时, 或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。

个人理解是:

如果将变更数据直接写出,管理员频繁更新配置时,第一次更新了,将client移除阻塞队列,返回响应信息给网关。如果这个时候进行了第二次更新,那么当前的client是不在阻塞队列中,所以这一次的变更就会错过。网关层处理不及时,也是同理。 这是一个长轮询,一个网关一个同步线程,可能存在耗时的过程。如果admin有数据变更,当前网关client是没有在阻塞队列中,就会更新不到数据。

我们还没有分析到admin端的处理逻辑,先大概说一下。在admin端,会将网关client放到阻塞队列,有数据变更,网关client就会出队列,发送变更数据。所以,如果有数据变更时,网关client不在阻塞队列,那么就无法得到当前变更的数据。

知道哪个分组数据发生变更时,主动再向admin获取变更的数据,根据分组不同,全量拿数据。调用方法是doFetchGroupConfig(),这个在前面已经分析过了。

分析到这里,网关端的数据同步操作就完成了。长轮询任务就是不断向admin发起请求,看看数据是否发生变更,如果有分组数据发生变更,那么就再主动向admin发起请求,获取变更数据,然后更新网关内存中的数据。

网关端长轮询任务流程:

3. admin数据同步#

从前面分析的过程中,可以看到,网关端主要调用admin的两个接口:

  • /configs/listener:判断组数据是否发生变更;
  • /configs/fetch:获取变更组数据。

直接从这两个接口分析的话,可能有的地方不好理解,所以我们还是从admin启动流程开始分析数据同步过程。

3.1 加载配置#

如果在配置文件application.yml中,进行了如下配置,就表示通过http长轮询的方式进行数据同步。

shenyu:  sync:      http:        enabled: true

程序启动时,通过springboot条件装配实现数据同步类的配置加载。在这个过程中,会创建HttpLongPollingDataChangedListener,负责处理长轮询的相关实现逻辑。

/** * 数据同步配置类 * 通过springboot条件装配实现 * The type Data sync configuration. */@Configurationpublic class DataSyncConfiguration {
    /**     * http长轮询     * http long polling.     */    @Configuration    @ConditionalOnProperty(name = "shenyu.sync.http.enabled", havingValue = "true")    @EnableConfigurationProperties(HttpSyncProperties.class)    static class HttpLongPollingListener {
        @Bean        @ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)        public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {            return new HttpLongPollingDataChangedListener(httpSyncProperties);        }    }}

3.2 数据变更监听器实例化#

  • HttpLongPollingDataChangedListener

数据变更监听器通过构造函数的方式完成实例化和初始化操作。在构造函数中会创建阻塞队列,用于存放客户端;创建线程池,用于执行延迟任务,周期任务;保存长轮询相关属性信息。

    public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {        // 默认客户端(这里是网关)1024个        this.clients = new ArrayBlockingQueue<>(1024);        // 创建线程池        // ScheduledThreadPoolExecutor 可以执行延迟任务,周期任务,普通任务        this.scheduler = new ScheduledThreadPoolExecutor(1,                ShenyuThreadFactory.create("long-polling", true));        // 长轮询的属性信息        this.httpSyncProperties = httpSyncProperties;    }

另外,它的类图关系如下:

实现了InitializingBean接口,所以在bean的初始化过程中执行afterInitialize()方法。通过线程池执行周期任务:更新内存中(CACHE)的数据每隔5分钟执行一次,5分钟后开始执行。刷新本地缓存就是从数据库读取数据到本地缓存(这里就是内存),通过refreshLocalCache()完成。

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
  // ......
  /**   * 在 InitializingBean接口中的afterPropertiesSet()方法中被调用,即在bean的初始化过程中执行   */  @Override  protected void afterInitialize() {    long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();    // 执行周期任务:更新内存中(CACHE)的数据每隔5分钟执行一次,5分钟后开始执行    // 防止admin先启动一段时间后,产生了数据;然后网关初次连接时,没有拿到全量数据    scheduler.scheduleWithFixedDelay(() -> {      LOG.info("http sync strategy refresh config start.");      try {        // 从数据库读取数据到本地缓存(这里就是内存)        this.refreshLocalCache();        LOG.info("http sync strategy refresh config success.");      } catch (Exception e) {        LOG.error("http sync strategy refresh config error!", e);      }    }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);    LOG.info("http sync strategy refresh interval: {}ms", syncInterval);  }
  // ......}
  • refreshLocalCache()

分别对5种数据类型进行更新。

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
  // ......
  // 从数据库读取数据到本地缓存(这里就是内存)  private void refreshLocalCache() {    //更新认证数据    this.updateAppAuthCache();    //更新插件数据    this.updatePluginCache();    //更新规则数据    this.updateRuleCache();    //更新选择器数据    this.updateSelectorCache();    //更新元数据    this.updateMetaDataCache();  }
  // ......}

5个更新方法的逻辑是类似的,调用service方法获取数据,然后放到内存CACHE中。以更新规则数据方法updateRuleCache()为例,传入规则枚举类型,调用ruleService.listAll()从数据库获取所有规则数据。

    /**     * Update rule cache.     */    protected void updateRuleCache() {        this.updateCache(ConfigGroupEnum.RULE, ruleService.listAll());    }
  • updateCache()

使用数据库中的数据更新内存中的数据。

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
  // ......
  // 缓存数据的 Map  protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();
  /**   * if md5 is not the same as the original, then update lcoal cache.   * 更新缓存中的数据   * @param group ConfigGroupEnum   * @param <T> the type of class   * @param data the new config data   */  protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {    //数据序列化    String json = GsonUtils.getInstance().toJson(data);    //传入md5值和修改时间    ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());    //更新分组数据    ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);    LOG.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);  }
  // ......}

初始化的过程就是启动周期性任务,定时从数据库获取数据更新内存数据。

接下来开始对两个接口开始分析:

  • /configs/listener:判断组数据是否发生变更;
  • /configs/fetch:获取变更组数据。

3.3 数据变更轮询接口#

  • /configs/listener:判断组数据是否发生变更;

接口类是ConfigController,只有使用http长轮询进行数据同步时才会生效。接口方法listener()没有其他逻辑,直接调用doLongPolling()方法。

   /** * This Controller only when HttpLongPollingDataChangedListener exist, will take effect. */@ConditionalOnBean(HttpLongPollingDataChangedListener.class)@RestController@RequestMapping("/configs")public class ConfigController {
    private final HttpLongPollingDataChangedListener longPollingListener;
    public ConfigController(final HttpLongPollingDataChangedListener longPollingListener) {      this.longPollingListener = longPollingListener;    }        // 省略其他逻辑
    /**     * Listener.     * 监听数据变更,执行长轮询     * @param request  the request     * @param response the response     */    @PostMapping(value = "/listener")    public void listener(final HttpServletRequest request, final HttpServletResponse response) {        longPollingListener.doLongPolling(request, response);    }
}
  • HttpLongPollingDataChangedListener#doLongPolling()

执行长轮询任务:如果有数据变更,将会立即响应给客户端(这里就是网关端)。否则,客户端会一直被阻塞,直到有数据变更或者超时。

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
  // ......
  /**   * 执行长轮询:如果有数据变更,会立即响应给客户端(这里就是网关端)。   * 否则,否则客户端会一直被阻塞,直到有数据变更或者超时。   * @param request   * @param response   */  public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {    // compare group md5    // 比较md5,判断网关的数据和admin端的数据是否一致,得到发生变更的数据组    List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);    String clientIp = getRemoteIp(request);    // response immediately.    // 有变更的数据,则立即向网关响应    if (CollectionUtils.isNotEmpty(changedGroup)) {      this.generateResponse(response, changedGroup);      Log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);      return;    }
    // 没有变更,则将客户端(这里就是网关)放进阻塞队列    final AsyncContext asyncContext = request.startAsync();    asyncContext.setTimeout(0L);    scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));  }
  // ......}
  • HttpLongPollingDataChangedListener#compareChangedGroup()

判断组数据是否发生变更,判断逻辑是比较网关端和admin端的md5值和lastModifyTime

  • 如果md5值不一样,那么需要更新;
  • 如果admin端的lastModifyTime大于网关端的lastModifyTime,那么需要更新。
 /**     * 判断组数据是否发生变更     * @param request     * @return     */    private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request) {        List<ConfigGroupEnum> changedGroup = new ArrayList<>(ConfigGroupEnum.values().length);        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {            // 网关端数据的md5值和lastModifyTime            String[] params = StringUtils.split(request.getParameter(group.name()), ',');            if (params == null || params.length != 2) {                throw new ShenyuException("group param invalid:" + request.getParameter(group.name()));            }            String clientMd5 = params[0];            long clientModifyTime = NumberUtils.toLong(params[1]);            ConfigDataCache serverCache = CACHE.get(group.name());            // do check. 判断组数据是否发生变更            if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) {                changedGroup.add(group);            }        }        return changedGroup;    }
  • LongPollingClient

没有变更数据,则将客户端(这里就是网关)放进阻塞队列。阻塞时间是60秒,即60秒后移除,并响应客户端。

class LongPollingClient implements Runnable {      // 省略了其他逻辑            @Override        public void run() {            try {                // 先设置定时任务:60秒后移除,并响应客户端                this.asyncTimeoutFuture = scheduler.schedule(() -> {                    clients.remove(LongPollingClient.this);                    List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());                    sendResponse(changedGroups);                }, timeoutTime, TimeUnit.MILLISECONDS);
                // 添加到阻塞队列                clients.add(this);
            } catch (Exception ex) {                log.error("add long polling client error", ex);            }        }
        /**         * Send response.         *         * @param changedGroups the changed groups         */        void sendResponse(final List<ConfigGroupEnum> changedGroups) {            // cancel scheduler            if (null != asyncTimeoutFuture) {                asyncTimeoutFuture.cancel(false);            }            // 响应变更的组            generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);            asyncContext.complete();        }    }

3.4 获取变更数据接口#

  • /configs/fetch:获取变更数据;

根据网关传入的参数,获取分组数据,返回结果。主要实现方法是longPollingListener.fetchConfig()


@ConditionalOnBean(HttpLongPollingDataChangedListener.class)@RestController@RequestMapping("/configs")public class ConfigController {
    private final HttpLongPollingDataChangedListener longPollingListener;      public ConfigController(final HttpLongPollingDataChangedListener longPollingListener) {      this.longPollingListener = longPollingListener;    }
    /**     * Fetch configs shenyu result.     * 全量获取分组数据     * @param groupKeys the group keys     * @return the shenyu result     */    @GetMapping("/fetch")    public ShenyuAdminResult fetchConfigs(@NotNull final String[] groupKeys) {        Map<String, ConfigData<?>> result = Maps.newHashMap();        for (String groupKey : groupKeys) {            ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey));            result.put(groupKey, data);        }        return ShenyuAdminResult.success(ShenyuResultMessage.SUCCESS, result);    }      // 省略了其他接口
}
  • AbstractDataChangedListener#fetchConfig()

数据获取直接从CACHE中拿,然后根据不同分组类型进行匹配,封装。

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {  /**   * fetch configuration from cache.   * 获取分组下的全量数据   * @param groupKey the group key   * @return the configuration data   */  public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {    // 直接从 CACHE 中拿数据    ConfigDataCache config = CACHE.get(groupKey.name());    switch (groupKey) {      case APP_AUTH: // 认证数据        return buildConfigData(config, AppAuthData.class);      case PLUGIN: // 插件数据        return buildConfigData(config, PluginData.class);      case RULE:   // 规则数据        return buildConfigData(config, RuleData.class);      case SELECTOR:  // 选择器数据        return buildConfigData(config, SelectorData.class);      case META_DATA: // 元数据        return buildConfigData(config, MetaData.class);      default:  // 其他类型,抛出异常        throw new IllegalStateException("Unexpected groupKey: " + groupKey);    }  }}

3.5 数据变更#

在之前的websocket数据同步和zookeeper数据同步源码分析文章中,我们知道admin端数据同步设计结构如下:

各种数据变更监听器都是DataChangedListener的子类。

当在admin端修改数据后,通过Spring的事件处理机制,发送事件通知。发送逻辑如下:


/** * Event forwarders, which forward the changed events to each ConfigEventListener. * 数据变更事件分发器:当admin端有数据发生变更时,将变更数据同步到 ShenYu 网关 * 数据变更依赖于Spring的事件监听机制:ApplicationEventPublisher --> ApplicationEvent --> ApplicationListener * */@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
   //省略了其他逻辑
    /**     * 有数据变更时,调用此方法     * @param event     */    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // 遍历数据变更监听器(一般使用一种数据同步的方式就好了)        for (DataChangedListener listener : listeners) {            // 哪种数据发生变更            switch (event.getGroupKey()) {                case APP_AUTH: // 认证信息                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());                    break;                case PLUGIN:  // 插件信息                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());                    break;                case RULE:    // 规则信息                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());                    break;                case SELECTOR:   // 选择器信息                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());                    // 当选择器数据更新时,更新API文档信息                    applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());                    break;                case META_DATA:  // 元数据                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());                    break;                default:  // 其他类型,抛出异常                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());            }        }    }}

假设,对插件信息进行了修改,通过http长轮询的方式进行数据同步,那么listener.onPluginChanged()的实际调用的是org.apache.shenyu.admin.listener.AbstractDataChangedListener#onPluginChanged

    /**     * 在admin的操作,有插件发生了更新     * @param changed   the changed     * @param eventType the event type     */    @Override    public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {        if (CollectionUtils.isEmpty(changed)) {            return;        }        // 更新内存CACHE        this.updatePluginCache();        // 执行变更任务        this.afterPluginChanged(changed, eventType);    }

有两个处理操作,一是更新内存CACHE,这个在前面分析过了;另一个是执行变更任务,在线程池中执行。

  • HttpLongPollingDataChangedListener#afterPluginChanged()
    @Override    protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {        // 在线程池中执行        scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN));    }
  • DataChangeTask

数据变更任务:将阻塞队列中的客户端依次移除,并发送响应,通知网关有组数据发生变更。

class DataChangeTask implements Runnable {        //省略了其他逻辑         @Override        public void run() {            // 阻塞队列中的客户端超过了给定的值100,则分批执行            if (clients.size() > httpSyncProperties.getNotifyBatchSize()) {                List<LongPollingClient> targetClients = new ArrayList<>(clients.size());                clients.drainTo(targetClients);                List<List<LongPollingClient>> partitionClients = Lists.partition(targetClients, httpSyncProperties.getNotifyBatchSize());               // 分批执行                partitionClients.forEach(item -> scheduler.execute(() -> doRun(item)));            } else {                // 执行任务                doRun(clients);            }        }
        private void doRun(final Collection<LongPollingClient> clients) {            // 通知所有客户端发生了数据变更            for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {                LongPollingClient client = iter.next();                iter.remove();                // 发送响应                client.sendResponse(Collections.singletonList(groupKey));                LOG.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);            }        }    }

至此,admin端数据同步逻辑就分析完了。在基于http长轮询数据同步是,它主要有三个功能:

  • 提供数据变更监听接口;
  • 提供获取变更数据接口;
  • 有数据变更时,移除阻塞队列中的客户端,并响应结果。

最后,用三张图描述下admin端长轮询任务流程:

  • /configs/listener数据变更监听接口:

  • /configs/fetch获取变更数据接口:

  • 在admin后台管理系统更新数据,进行数据同步:

4. 总结#

本文主要对ShenYu网关中的http长轮询数据同步进行了源码分析。涉及到的主要知识点如下:

  • http长轮询由网关端主动发起请求,不断请求admin端;
  • 变更数据以组为粒度(认证信息、插件、选择器、规则、元数据);
  • http长轮询结果只拿到了变更组,还需要再次发起请求获取组数据;
  • 数据是否更新由md5值和修改时间lastModifyTime决定。

注册中心实现原理之Http注册

· One min read
Apache ShenYu Committer

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,注册中心是用于将客户端信息注册到shenyu-adminadmin再通过数据同步将这些信息同步到网关,网关通过这些数据完成流量筛选。客户端信息主要包括接口信息URI信息

本文基于shenyu-2.5.0版本进行源码分析,官网的介绍请参考 客户端接入原理

1. 注册中心原理#

当客户端启动时,读取接口信息和uri信息,通过指定的注册类型,将数据发送到shenyu-admin

图中的注册中心需要用户指定使用哪种注册类型,ShenYu当前支持HttpZookeeperEtcdConsulNacos进行注册。具体如何配置请参考 客户端接入配置

ShenYu在注册中心的原理设计上引入了DisruptorDisruptor队列在其中起到数据与操作解耦,利于扩展。如果注册请求过多,导致注册异常,也有数据缓冲作用。

如图所示,注册中心分为两个部分,一是注册中心客户端register-client,负责处理客户端数据读取。另一个是注册中心服务端register-server,负责处理服务端(就是shenyu-admin)数据写入。通过指定注册类型进行数据发送和接收。

  • 客户端:通常来说就是一个微服务,可以是springmvcspring-clouddubbogrpc等。
  • register-client:注册中心客户端,读取客户接口和uri信息。
  • Disruptor:数据与操作解耦,数据缓冲作用。
  • register-server:注册中心服务端,这里就是shenyu-admin,接收数据,写入数据库,发数据同步事件。
  • 注册类型:指定注册类型,完成数据注册,当前支持HttpZookeeperEtcdConsulNacos

本文分析的是使用Http的方式进行注册,所以具体的处理流程如下:

在客户端,数据出队列后,通过http传输数据,在服务端,提供相应的接口,接收数据,然后写入队列。

2. 客户端注册流程#

当客户端启动后,根据相关配置,读取属性信息,然后写入队列。以官方提供的 shenyu-examples-http 为例,开始源码分析。官方提供的例子是一个由springboot构建的微服务。注册中心的相关配置可以参考官网 客户端接入配置

2.1 加载配置,读取属性#

先用一张图串联下注册中心客户端初始化流程:

我们分析的是通过http的方式进行注册,所以需要进行如下配置:

shenyu:  register:    registerType: http    serverLists: http://localhost:9095  props:    username: admin    password: 123456  client:    http:        props:          contextPath: /http          appName: http          port: 8189            isFull: false

每个属性表示的含义如下:

  • registerType: 服务注册类型,填写 http
  • serverList: 为http注册类型时,填写Shenyu-Admin项目的地址,注意加上http://,多个地址用英文逗号分隔。
  • username: Shenyu-Admin用户名
  • password: Shenyu-Admin用户对应的密码
  • port: 你本项目的启动端口,目前springmvc/tars/grpc需要进行填写。
  • contextPath: 为你的这个mvc项目在shenyu网关的路由前缀, 比如/order/product 等等,网关会根据你的这个前缀来进行路由。
  • appName:你的应用名称,不配置的话,会默认取 spring.application.name 的值。
  • isFull: 设置 true 代表代理你的整个服务,false表示代理你其中某几个controller;目前适用于springmvc/springcloud

项目启动后,会先加载配置文件,读取属性信息,生成相应的Bean

首先读取到的配置文件是 ShenyuSpringMvcClientConfiguration,它是shenyu 客户端http注册配置类,通过@Configuration表示这是一个配置类,通过@ImportAutoConfiguration引入其他配置类。创建SpringMvcClientEventListener,主要处理元数据和 URI 信息。

/** * shenyu 客户端http注册配置类 */@Configuration@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")public class ShenyuSpringMvcClientConfiguration {       // 创建SpringMvcClientEventListener,主要处理元数据和URI信息   @Bean   public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,                                                                     final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {       return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);   }}

ShenyuClientCommonBeanConfigurationshenyu客户端通用配置类,会创建注册中心客户端通用的bean

  • 创建ShenyuClientRegisterRepository,通过工厂类创建而成。
  • 创建ShenyuRegisterCenterConfig,读取shenyu.register属性配置。
  • 创建ShenyuClientConfig,读取shenyu.client属性配置。

/** * shenyu客户端通用配置类 */@Configurationpublic class ShenyuClientCommonBeanConfiguration {       // 创建ShenyuClientRegisterRepository,通过工厂类创建而成。    @Bean    public ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {        return ShenyuClientRegisterRepositoryFactory.newInstance(config);    }        // 创建ShenyuRegisterCenterConfig,读取shenyu.register属性配置    @Bean    @ConfigurationProperties(prefix = "shenyu.register")    public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {        return new ShenyuRegisterCenterConfig();    }      // 创建ShenyuClientConfig,读取shenyu.client属性配置    @Bean    @ConfigurationProperties(prefix = "shenyu")    public ShenyuClientConfig shenyuClientConfig() {        return new ShenyuClientConfig();    }}

2.2 用于注册的 HttpClientRegisterRepository#

上面的配置文件中生成的ShenyuClientRegisterRepository是客户端注册的具体实现,它是一个接口,它的实现类如下。

  • HttpClientRegisterRepository:通过http进行注册;
  • ConsulClientRegisterRepository:通过Consul进行注册;
  • EtcdClientRegisterRepository:通过Etcd进行注册;
  • NacosClientRegisterRepository:通过nacos进行注册;
  • ZookeeperClientRegisterRepository通过Zookeeper进行注册。

具体是哪一种方式,是通过SPI进行加载实现的,实现逻辑如下:


/** * 加载 ShenyuClientRegisterRepository */public final class ShenyuClientRegisterRepositoryFactory {        private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();        /**     * 创建 ShenyuClientRegisterRepository     */    public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {        if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {            // 通过SPI的方式进行加载,类型由registerType决定            ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());            //执行初始化操作            result.init(shenyuRegisterCenterConfig);            ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());            REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);            return result;        }        return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());    }}

加载类型通过registerType指定,也就是我们在配置文件中指定的类型:

shenyu:  register:    registerType: http    serverLists: http://localhost:9095

我们指定的是http,所以会去加载HttpClientRegisterRepository。对象创建成功后,执行的初始化方法init()如下:

@Joinpublic class HttpClientRegisterRepository implements ShenyuClientRegisterRepository {        @Override    public void init(final ShenyuRegisterCenterConfig config) {        this.username = config.getProps().getProperty(Constants.USER_NAME);        this.password = config.getProps().getProperty(Constants.PASS_WORD);        this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));        this.setAccessToken();    }    // 暂时省略其他逻辑}

读取配置文件中的usernamepasswordserverLists,即sheenyu-admin的访问账号、密码和地址信息,为后续数据发送做准备。类注解@Join用于SPI的加载。

SPI 全称为 Service Provider Interface, 是 JDK 内置的一种服务提供发现功能, 一种动态替换发现的机制。

shenyu-spiApache ShenYu网关自定义的SPI扩展实现,设计和实现原理参考了DubboSPI扩展实现

2.3 构建 元数据 和 URI信息 的 SpringMvcClientEventListener#

创建 SpringMvcClientEventListener,负责客户端 元数据URI 数据的构建和注册,它的创建是在配置文件中完成。

@Configuration@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")public class ShenyuSpringMvcClientConfiguration {     // ......        //  创建 SpringMvcClientEventListener    @Bean    public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,                                                                      final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {        return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);    }}
  • SpringMvcClientEventListener继承了AbstractContextRefreshedEventListener

AbstractContextRefreshedEventListener是一个抽象类,它实现了ApplicationListener接口,并重写了onApplicationEvent()方法,当有Spring事件发生后,该方法会执行。它的实现目前有八种,每一种表示对应的RPC调用协议的 元数据URI 信息的注册。

  • AlibabaDubboServiceBeanListener:处理使用Alibaba Dubbo协议;
  • ApacheDubboServiceBeanListener:处理使用Apacge Dubbo协议;
  • GrpcClientEventListener:处理使用grpc协议;
  • MotanServiceEventListener:处理使用Mortan协议;
  • SofaServiceEventListener:处理使用Sofa协议;
  • SpringMvcClientEventListener:处理使用http协议;
  • SpringWebSocketClientEventListener:处理使用websocket协议;
  • TarsServiceBeanEventListener:处理使用Tars注册类型;
// 实现了ApplicationListener接口public abstract class AbstractContextRefreshedEventListener<T, A extends Annotation> implements ApplicationListener<ContextRefreshedEvent> {
     //......
    //构造函数    public AbstractContextRefreshedEventListener(final PropertiesConfig clientConfig,                                                 final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {        // 读取 shenyu.client.http 配置信息        Properties props = clientConfig.getProps();        // appName 应用名称        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);        // contextPath上下文路径        this.contextPath = Optional.ofNullable(props.getProperty(ShenyuClientConstants.CONTEXT_PATH)).map(UriUtils::repairData).orElse("");        if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {            String errorMsg = "client register param must config the appName or contextPath";            LOG.error(errorMsg);            throw new ShenyuClientIllegalArgumentException(errorMsg);        }        this.ipAndPort = props.getProperty(ShenyuClientConstants.IP_PORT);        // host信息        this.host = props.getProperty(ShenyuClientConstants.HOST);        // port 客户端端口信息        this.port = props.getProperty(ShenyuClientConstants.PORT);        // 开始事件发布        publisher.start(shenyuClientRegisterRepository);    }
    // 当有上下文刷新事件ContextRefreshedEvent发生时,该方法会执行    @Override    public void onApplicationEvent(@NonNull final ContextRefreshedEvent event) {        //保证该方法的内容只执行一次        if (!registered.compareAndSet(false, true)) {            return;        }        final ApplicationContext context = event.getApplicationContext();        // 获取声明RPC调用的类        Map<String, T> beans = getBeans(context);        if (MapUtils.isEmpty(beans)) {            return;        }        // 构建URI数据并注册        publisher.publishEvent(buildURIRegisterDTO(context, beans));        // 构建元数据并注册        beans.forEach(this::handle);    }
    // 交给不同的子类实现    @SuppressWarnings("all")    protected abstract URIRegisterDTO buildURIRegisterDTO(ApplicationContext context,                                                          Map<String, T> beans);            protected void handle(final String beanName, final T bean) {        Class<?> clazz = getCorrectedClass(bean);        // 获取当前bean的对应shenyu客户端的注解(对应不同的RPC调用注解不一样,像http的就是@ShenyuSpringMvcClient,而像SpringCloud的则是@ShenyuSpringCloudClient)        final A beanShenyuClient = AnnotatedElementUtils.findMergedAnnotation(clazz, getAnnotationType());        // 根据bean获取对应的path(不同子类实现不一样)        final String superPath = buildApiSuperPath(clazz, beanShenyuClient);        // 如果包含Shenyu客户端注解或者path中包括'*',表示注册整个类的接口        if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {            // 构建类的元数据,发送注册事件            handleClass(clazz, bean, beanShenyuClient, superPath);            return;        }        // 获取当前bean的所有方法        final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);        // 遍历方法        for (Method method : methods) {            // 注册符合条件的方法            handleMethod(bean, clazz, beanShenyuClient, method, superPath);        }    }
    // 构建类元数据并注册的默认实现    protected void handleClass(final Class<?> clazz,                               final T bean,                               @NonNull final A beanShenyuClient,                               final String superPath) {        publisher.publishEvent(buildMetaDataDTO(bean, beanShenyuClient, pathJoin(contextPath, superPath), clazz, null));    }
    // 构建方法元数据并注册的默认实现    protected void handleMethod(final T bean,                                final Class<?> clazz,                                @Nullable final A beanShenyuClient,                                final Method method,                                final String superPath) {        // 如果方法上有Shenyu客户端注解,就表示该方法需要注册        A methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, getAnnotationType());        if (Objects.nonNull(methodShenyuClient)) {            // 构建元数据,发送注册事件            publisher.publishEvent(buildMetaDataDTO(bean, methodShenyuClient, buildApiPath(method, superPath, methodShenyuClient), clazz, method));        }    }
    // 交给不同子类实现    protected abstract MetaDataRegisterDTO buildMetaDataDTO(T bean,                                                            @NonNull A shenyuClient,                                                            String path,                                                            Class<?> clazz,                                                            Method method);}

在构造函数中主要是读取属性配置。

shenyu:  client:    http:      props:        contextPath: /http        appName: http        port: 8189        isFull: false

最后,执行了publisher.start(),开始事件发布,为注册做准备。

  • ShenyuClientRegisterEventPublisher

ShenyuClientRegisterEventPublisher通过单例模式实现,主要是生成元数据和URI订阅器(后续用于数据发布),然后启动Disruptor队列。提供了一个共有方法publishEvent(),发布事件,向Disruptor队列发数据。


public class ShenyuClientRegisterEventPublisher {    // 私有变量    private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();
    private DisruptorProviderManage<DataTypeParent> providerManage;        /**     * 公开静态方法     *     * @return ShenyuClientRegisterEventPublisher instance     */    public static ShenyuClientRegisterEventPublisher getInstance() {        return INSTANCE;    }        /**     * Start方法执行     *     * @param shenyuClientRegisterRepository shenyuClientRegisterRepository     */    public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {        // 创建客户端注册工厂类        RegisterClientExecutorFactory factory = new RegisterClientExecutorFactory();        // 添加元数据订阅器        factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));        //  添加URI订阅器        factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));        // 启动Disruptor队列        providerManage = new DisruptorProviderManage(factory);        providerManage.startup();    }        /**     * 发布事件,向Disruptor队列发数据     *     * @param data the data     */    public <T> void publishEvent(final DataTypeParent data) {        DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();        provider.onData(data);    }}

AbstractContextRefreshedEventListener的构造函数逻辑分析完成了,主要是读取属性配置,创建元数据URI订阅器,启动Disruptor队列。

onApplicationEvent()方法是有Spring事件发生时会执行,这里的参数是ContextRefreshedEvent,表示上下文刷新事件。当Spring容器就绪后执行此处逻辑:先构建URI数据并注册,再构建元数据并注册,

ContextRefreshedEventSpring内置事件。ApplicationContext被初始化或刷新时,该事件被触发。这也可以在 ConfigurableApplicationContext接口中使用 refresh() 方法来发生。此处的初始化是指:所有的Bean被成功装载,后处理Bean被检测并激活,所有Singleton Bean 被预实例化,ApplicationContext容器已就绪可用。

再来看AbstractContextRefreshedEventListener的http实现SpringMvcClientEventListener

public class SpringMvcClientEventListener extends AbstractContextRefreshedEventListener<Object, ShenyuSpringMvcClient> {        private final List<Class<? extends Annotation>> mappingAnnotation = new ArrayList<>(3);        private final Boolean isFull;        private final String protocol;        // 构造函数    public SpringMvcClientEventListener(final PropertiesConfig clientConfig,                                        final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {        super(clientConfig, shenyuClientRegisterRepository);        Properties props = clientConfig.getProps();        // 获取 isFull        this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));        // 表示是http协议的实现        this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);        mappingAnnotation.add(ShenyuSpringMvcClient.class);        mappingAnnotation.add(RequestMapping.class);    }        @Override    protected Map<String, Object> getBeans(final ApplicationContext context) {        // 配置属性,如果 isFull=true 的话,表示注册整个微服务        if (Boolean.TRUE.equals(isFull)) {            getPublisher().publishEvent(MetaDataRegisterDTO.builder()                    .contextPath(getContextPath())                    .appName(getAppName())                    .path(PathUtils.decoratorPathWithSlash(getContextPath()))                    .rpcType(RpcTypeEnum.HTTP.getName())                    .enabled(true)                    .ruleName(getContextPath())                    .build());            return null;        }        // 否则获取带Controller注解的bean        return context.getBeansWithAnnotation(Controller.class);    }        // 构造URI数据    @Override    protected URIRegisterDTO buildURIRegisterDTO(final ApplicationContext context,                                                 final Map<String, Object> beans) {        // ...    }        @Override    protected String buildApiSuperPath(final Class<?> clazz, @Nullable final ShenyuSpringMvcClient beanShenyuClient) {        // 如果有带上Shenyu客户端注解,则优先取注解中的不为空的path属性        if (Objects.nonNull(beanShenyuClient) && StringUtils.isNotBlank(beanShenyuClient.path())) {            return beanShenyuClient.path();        }        // 如果有带上RequestMapping注解,且path属性不为空,则返回path数组的第一个值        RequestMapping requestMapping = AnnotationUtils.findAnnotation(clazz, RequestMapping.class);        if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {            return requestMapping.path()[0];        }        return "";    }        // 声明http实现的客户端注解是ShenyuSpringMvcClient    @Override    protected Class<ShenyuSpringMvcClient> getAnnotationType() {        return ShenyuSpringMvcClient.class;    }        @Override    protected void handleMethod(final Object bean, final Class<?> clazz,                                @Nullable final ShenyuSpringMvcClient beanShenyuClient,                                final Method method, final String superPath) {        // 获取当前bean的RequestMapping注解        final RequestMapping requestMapping = AnnotatedElementUtils.findMergedAnnotation(method, RequestMapping.class);        // 获取当前bean的 ShenyuSpringMvcClient 注解        ShenyuSpringMvcClient methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, ShenyuSpringMvcClient.class);        methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;        //如果有 ShenyuSpringMvcClient 注解并且包含RequestMapping注解(表示是一个接口),则进行注册        if (Objects.nonNull(methodShenyuClient) && Objects.nonNull(requestMapping)) {            getPublisher().publishEvent(buildMetaDataDTO(bean, methodShenyuClient, buildApiPath(method, superPath, methodShenyuClient), clazz, method));        }    }        //...        // 构造元数据    @Override    protected MetaDataRegisterDTO buildMetaDataDTO(final Object bean,                                                   @NonNull final ShenyuSpringMvcClient shenyuClient,                                                   final String path, final Class<?> clazz,                                                   final Method method) {        //...    }}

注册逻辑都是通过 publisher.publishEvent()完成。

Controller注解和RequestMapping注解是由Spring提供的,这个大家应该很熟悉,不过多赘述。ShenyuSpringMvcClient 注解是由Apache ShenYu提供的,用于注册SpringMvc客户端,它的定义如下:


/** * shenyu 客户端接口,用于方法上或类上 */@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE, ElementType.METHOD})public @interface ShenyuSpringMvcClient {
    // path 注册路径    @AliasFor(attribute = "path")    String value() default "";        // path 注册路径    @AliasFor(attribute = "value")    String path();        // ruleName 规则名称    String ruleName() default "";        // desc 描述信息    String desc() default "";
    // enabled是否启用    boolean enabled() default true;        // registerMetaData 注册元数据    boolean  registerMetaData() default false;}

它的使用如下:

  • 注册整个接口
@RestController@RequestMapping("/test")@ShenyuSpringMvcClient(path = "/test/**")  // 表示整个接口注册public class HttpTestController { //......}
  • 注册当前方法
@RestController@RequestMapping("/order")@ShenyuSpringMvcClient(path = "/order")public class OrderController {
    /**     * Save order dto.     *     * @param orderDTO the order dto     * @return the order dto     */    @PostMapping("/save")    @ShenyuSpringMvcClient(path = "/save", desc = "Save order") // 注册当前方法    public OrderDTO save(@RequestBody final OrderDTO orderDTO) {        orderDTO.setName("hello world save order");        return orderDTO;    }}
  • publisher.publishEvent() 发布注册事件

该方法会将数据发送到Disruptor队列中,关于Disruptor队列更多细节这里不做更多介绍,这不影响分析注册的流程。

当数据发送后,Disruptor队列的消费者会处理数据,进行消费。

  • QueueConsumer 消费数据

QueueConsumer是一个消费者,它实现了WorkHandler接口,它的创建过程在providerManage.startup()逻辑中。WorkHandler接口是disruptor的数据消费接口,只有一个方法是onEvent()

package com.lmax.disruptor;
public interface WorkHandler<T> {    void onEvent(T event) throws Exception;}

QueueConsumer重写了onEvent()方法,主要逻辑是生成消费任务,然后在线程池中去执行。


/** *  * 队列消费者 */public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {        // 省略了其他逻辑
    @Override    public void onEvent(final DataEvent<T> t) {        if (t != null) {            // 根据事件类型使用不同的线程池            ThreadPoolExecutor executor = orderly(t);            // 通过工厂创建队列消费任务            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();            // 保存数据            queueConsumerExecutor.setData(t.getData());            // help gc            t.setData(null);            // 放在线程池中执行 消费任务            executor.execute(queueConsumerExecutor);        }    }}

QueueConsumerExecutor是在线程池中被执行的任务,它实现了Runnable接口,具体的实现类有两个:

  • RegisterClientConsumerExecutor:客户端消费者执行器;
  • RegisterServerConsumerExecutor:服务端消费者执行器。

顾名思义,一个负责处理客户端任务,一个负责处理服务端任务(服务端就是admin,在下文进行分析)。

  • RegisterClientConsumerExecutor 消费者执行器

重写的run()逻辑如下:


public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {        //...... 
    @Override    public void run() {        // 获取数据        final T data = getData();        // 根据数据类型调用相应的处理器进行处理        subscribers.get(data.getType()).executor(Lists.newArrayList(data));    }    }

根据不同的数据类型调用不同的处理器去执行相应的任务。数据类型有两种,一个是元数据,记录客户端注册信息。一个是URI数据,记录客户端服务信息。

//数据类型public enum DataType {   // 元数据    META_DATA,       // URI数据    URI,}
  • ExecutorSubscriber#executor() 执行器订阅者

执行器订阅者也分为两类,一个是处理元数据,一个是处理URI。在客户端和服务端分别有两个,所以一共是四个。

先看元数据处理

  • ShenyuClientMetadataExecutorSubscriber#executor()

客户端这边对元数据处理逻辑是:遍历元数据信息,调用接口方法persistInterface()完成数据的发布。

public class ShenyuClientMetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {       //......        @Override    public DataType getType() {        return DataType.META_DATA; // 元数据    }        @Override    public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {        for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {            // 调用接口方法persistInterface()完成数据的发布            shenyuClientRegisterRepository.persistInterface(metaDataRegisterDTO);        }    }}
  • ShenyuClientRegisterRepository#persistInterface()

ShenyuClientRegisterRepository是一个接口,用于表示客户端数据注册,它的实现类目前有五种,每一种就表示一种注册方法。

  • ConsulClientRegisterRepository:通过Consul实现客户端注册;
  • EtcdClientRegisterRepository:通过Etcd实现客户端注册;
  • HttpClientRegisterRepository:通过Http实现客户端注册;
  • NacosClientRegisterRepository:通过Nacos实现客户端注册;
  • ZookeeperClientRegisterRepository:通过Zookeeper实现客户端注册;

从图中可以看出,注册中心的加载是通过SPI的方式完成的。这个在前面提到过了,在客户端通用配置文件中,通过指定配置文件中的属性完成具体的类加载。


/** * 加载 ShenyuClientRegisterRepository */public final class ShenyuClientRegisterRepositoryFactory {        private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();        /**     * 创建 ShenyuClientRegisterRepository     */    public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {        if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {            // 通过SPI的方式进行加载,类型由registerType决定            ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());            //执行初始化操作            result.init(shenyuRegisterCenterConfig);            ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());            REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);            return result;        }        return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());    }}

本文的源码分析是基于Http的方式进行注册,所以我们先分析HttpClientRegisterRepository,其他的注册方式后续再分析。HttpClientRegisterRepository继承了FailbackRegistryRepository,而FailbackRegistryRepository本身主要用于对Http注册过程中的失败异常的处理,这里就省略了。

通过http的方式注册很简单,就是调用工具类发送http请求。注册元数据和URI都是调用的同一个方法doRegister(),指定接口和类型就好。

  • Constants.URI_PATH的值/shenyu-client/register-metadata:服务端提供的接口用于注册元数据。
  • Constants.META_PATH的值/shenyu-client/register-uri: 服务端提供的接口用于注册URI。
@Joinpublic class HttpClientRegisterRepository extends FailbackRegistryRepository {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientRegisterRepository.class);
    private static URIRegisterDTO uriRegisterDTO;
    private String username;
    private String password;
    private List<String> serverList;
    private String accessToken;        public HttpClientRegisterRepository() {    }        public HttpClientRegisterRepository(final ShenyuRegisterCenterConfig config) {        init(config);    }
    @Override    public void init(final ShenyuRegisterCenterConfig config) {        // admin的用户名        this.username = config.getProps().getProperty(Constants.USER_NAME);        // admin的用户名对应的密码        this.password = config.getProps().getProperty(Constants.PASS_WORD);        // admin服务列表        this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));        // 设置访问的token        this.setAccessToken();    }
    /**     * Persist uri.     *     * @param registerDTO the register dto     */    @Override    public void doPersistURI(final URIRegisterDTO registerDTO) {        if (RuntimeUtils.listenByOther(registerDTO.getPort())) {            return;        }        doRegister(registerDTO, Constants.URI_PATH, Constants.URI);        uriRegisterDTO = registerDTO;    }
    @Override    public void doPersistInterface(final MetaDataRegisterDTO metadata) {        doRegister(metadata, Constants.META_PATH, Constants.META_TYPE);    }
    @Override    public void close() {        if (uriRegisterDTO != null) {            uriRegisterDTO.setEventType(EventType.DELETED);            doRegister(uriRegisterDTO, Constants.URI_PATH, Constants.URI);        }    }
    private void setAccessToken() {        for (String server : serverList) {            try {                Optional<?> login = RegisterUtils.doLogin(username, password, server.concat(Constants.LOGIN_PATH));                login.ifPresent(v -> this.accessToken = String.valueOf(v));            } catch (Exception e) {                LOGGER.error("Login admin url :{} is fail, will retry. cause: {} ", server, e.getMessage());            }        }    }
    private <T> void doRegister(final T t, final String path, final String type) {        int i = 0;        // 遍历admin服务列表(admin可能是集群)        for (String server : serverList) {            i++;            String concat = server.concat(path);            try {                // 设置访问token                if (StringUtils.isBlank(accessToken)) {                    this.setAccessToken();                    if (StringUtils.isBlank(accessToken)) {                        throw new NullPointerException("accessToken is null");                    }                }                // 调用工具类发送 http 请求                RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), concat, type, accessToken);                return;            } catch (Exception e) {                LOGGER.error("Register admin url :{} is fail, will retry. cause:{}", server, e.getMessage());                if (i == serverList.size()) {                    throw new RuntimeException(e);                }            }        }    }}

将数据序列化后,通过OkHttp发送数据。


public final class RegisterUtils {      //...... 
    // 通过OkHttp发送数据    public static void doRegister(final String json, final String url, final String type) throws IOException {        if (!StringUtils.hasLength(accessToken)) {            LOGGER.error("{} client register error accessToken is null, please check the config : {} ", type, json);            return;        }        Headers headers = new Headers.Builder().add(Constants.X_ACCESS_TOKEN, accessToken).build();        String result = OkHttpTools.getInstance().post(url, json, headers);        if (Objects.equals(SUCCESS, result)) {            LOGGER.info("{} client register success: {} ", type, json);        } else {            LOGGER.error("{} client register error: {} ", type, json);        }    }}

至此,客户端通过http的方式注册元数据的逻辑就分析完了。小结一下:通过读取自定义的注解信息构造元数据,将数据发到Disruptor队列,然后从队列中消费数据,将消费者放到线程池中去执行,最终通过发送http请求到admin

再来看看 URI 数据的处理

  • ShenyuClientURIExecutorSubscriber#executor()

主要逻辑是遍历URI数据集合,通过persistURI()方法实现数据注册。


public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {        //......        @Override    public DataType getType() {        return DataType.URI; //数据类型是URI    }        // 注册URI数据    @Override    public void executor(final Collection<URIRegisterDTO> dataList) {        for (URIRegisterDTO uriRegisterDTO : dataList) {            Stopwatch stopwatch = Stopwatch.createStarted();            while (true) {                try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {                    break;                } catch (IOException e) {                    long sleepTime = 1000;                    // maybe the port is delay exposed                    if (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {                        LOG.error("host:{}, port:{} connection failed, will retry",                                uriRegisterDTO.getHost(), uriRegisterDTO.getPort());                        // If the connection fails for a long time, Increase sleep time                        if (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {                            sleepTime = 10000;                        }                    }                    try {                        TimeUnit.MILLISECONDS.sleep(sleepTime);                    } catch (InterruptedException ex) {                        ex.printStackTrace();                    }                }            }            //添加hook,优雅停止客户端             ShenyuClientShutdownHook.delayOtherHooks();                        // 注册URI            shenyuClientRegisterRepository.persistURI(uriRegisterDTO);        }    }}

代码中的while(true)循环是为了保证客户端已经成功启动了,通过hostport可以连接上。

后面的逻辑是:添加hook函数,用于优雅停止客户端 。

通过persistURI()方法实现数据注册。整个逻辑也在前面分析过了,最终就是通过OkHttp客户端向shenyu-admin发起http,通过http的方式注册URI

分析到这里就将客户端的注册逻辑分析完了,将构建的元数据和URI数据发送到Disruptor队列,再从中消费,读取数据,通过httpadmin发送数据。

客户端元数据URI注册流程的源码分析完成了,流程图如下:

3. 服务端注册流程#

3.1 注册接口ShenyuClientHttpRegistryController#

从前面的分析可以知道,服务端提供了注册的两个接口:

  • /shenyu-client/register-metadata:服务端提供的接口用于注册元数据。
  • /shenyu-client/register-uri: 服务端提供的接口用于注册URI。

这两个接口位于ShenyuClientHttpRegistryController中,它实现了ShenyuClientServerRegisterRepository接口,是服务端注册的实现类。它用@Join标记,表示通过SPI进行加载。

// shenuyu客户端接口@RequestMapping("/shenyu-client")@Joinpublic class ShenyuClientHttpRegistryController implements ShenyuClientServerRegisterRepository {
    private ShenyuClientServerRegisterPublisher publisher;
    @Override    public void init(final ShenyuClientServerRegisterPublisher publisher, final ShenyuRegisterCenterConfig config) {        this.publisher = publisher;    }
    @Override    public void close() {        publisher.close();    }        // 注册元数据    @PostMapping("/register-metadata")    @ResponseBody    public String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {        publisher.publish(metaDataRegisterDTO);        return ShenyuResultMessage.SUCCESS;    }           // 注册URI    @PostMapping("/register-uri")    @ResponseBody    public String registerURI(@RequestBody final URIRegisterDTO uriRegisterDTO) {        publisher.publish(uriRegisterDTO);        return ShenyuResultMessage.SUCCESS;    }}

两个注册接口获取到数据好,就调用了publisher.publish()方法,把数据发布到Disruptor队列中。

  • ShenyuClientServerRegisterRepository接口

ShenyuClientServerRegisterRepository接口是服务注册接口,它有五个实现类,表示有五种注册方式:

  • ConsulClientServerRegisterRepository:通过Consul实现注册;
  • EtcdClientServerRegisterRepository:通过Etcd实现注册;
  • NacosClientServerRegisterRepository:通过Nacos实现注册;
  • ShenyuClientHttpRegistryController:通过Http实现注册;
  • ZookeeperClientServerRegisterRepository:通过Zookeeper实现注册。

具体用哪一种方式,是通过配置文件指定的,然后通过SPI进行加载。

shenyu-admin中的application.yml文件中配置注册方式,registerType指定注册类型,当用http进行注册时,serverLists不需要填写,更多配置说明可以参考官网 客户端接入配置

shenyu:  register:    registerType: http     serverLists:
  • RegisterCenterConfiguration 加载配置

在引入相关依赖和属性配置后,启动shenyu-admin时,会先加载配置文件,和注册中心相关的配置文件类是RegisterCenterConfiguration

// 注册中心配置类@Configurationpublic class RegisterCenterConfiguration {    // 读取配置属性    @Bean    @ConfigurationProperties(prefix = "shenyu.register")    public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {        return new ShenyuRegisterCenterConfig();    }        //创建ShenyuServerRegisterRepository,用于服务端注册    @Bean(destroyMethod = "close")    public ShenyuServerRegisterRepository shenyuServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig, final List<ShenyuClientRegisterService> shenyuClientRegisterService) {        // 1.从配置属性中获取注册类型        String registerType = shenyuRegisterCenterConfig.getRegisterType();        // 2.通过注册类型,以SPI的方法加载实现类        ShenyuClientServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuClientServerRegisterRepository.class).getJoin(registerType);        // 3.获取publisher,向Disruptor队列中写数据        RegisterClientServerDisruptorPublisher publisher = RegisterClientServerDisruptorPublisher.getInstance();        // 4.注册Service, rpcType -> registerService        Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, e -> e));        // 5.事件发布的准备工作        publisher.start(registerServiceMap);        // 6.注册的初始化操作        registerRepository.init(publisher, shenyuRegisterCenterConfig);        return registerRepository;    }}

在配置类中生成了两个bean

  • shenyuRegisterCenterConfig:读取属性配置;

  • shenyuClientServerRegisterRepository:用于服务端注册。

在创建shenyuClientServerRegisterRepository的过程中,也进行了一系列的准备工作:

  • 1.从配置属性中获取注册类型。
  • 2.通过注册类型,以SPI的方法加载实现类:比如指定的类型是http,就会加载ShenyuClientHttpRegistryController
  • 3.获取publisher,向Disruptor队列中写数据。
  • 4.注册ServicerpcType -> registerService:获取注册的Service,每种rpc都有对应的Service。本文的客户端构建是通过springboot,属于http类型,还有其他客户端类型:dubboSpring CloudgRPC等。
  • 5.事件发布的准备工作:添加服务端元数据和URI订阅器,处理数据。并且启动Disruptor队列。
  • 6.注册的初始化操作:http类型的注册初始化操作就是保存publisher
  • RegisterServerDisruptorPublisher#publish()

服务端向Disruptor队列写入数据的发布者 ,通过单例模式构建。


public class RegisterClientServerDisruptorPublisher implements ShenyuClientServerRegisterPublisher {    //私有属性    private static final RegisterClientServerDisruptorPublisher INSTANCE = new RegisterClientServerDisruptorPublisher();
    private DisruptorProviderManage<Collection<DataTypeParent>> providerManage;
    //公开静态方法获取实例    public static RegisterServerDisruptorPublisher getInstance() {        return INSTANCE;    }       //事件发布的准备工作,添加服务端元数据和URI订阅器,处理数据。并且启动Disruptor队列。    public void start(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {        //服务端注册工厂        RegisterServerExecutorFactory factory = new RegisterServerExecutorFactory();        //添加URI数据订阅器        factory.addSubscribers(new URIRegisterExecutorSubscriber(shenyuClientRegisterService));        //添加元数据订阅器        factory.addSubscribers(new MetadataExecutorSubscriber(shenyuClientRegisterService));        //启动Disruptor队列        providerManage = new DisruptorProviderManage(factory);        providerManage.startup();    }        // 向队列中写入数据    @Override    public void publish(final DataTypeParent data) {        DisruptorProvider<Collection<DataTypeParent>> provider = providerManage.getProvider();        provider.onData(Collections.singleton(data));    }
    // 批量向队列中写入数据    @Override    public void publish(final Collection<? extends DataTypeParent> dataList) {        DisruptorProvider<Collection<DataTypeParent>> provider = providerManage.getProvider();        provider.onData(dataList.stream().map(DataTypeParent.class::cast).collect(Collectors.toList()));    }        @Override    public void close() {        providerManage.getProvider().shutdown();    }}

配置文件的加载,可看作是注册中心服务端初始化流程,用图描述如下:

3.2 消费数据QueueConsumer#

在前面分析了客户端disruptor队列消费数据的过。服务端也是一样的逻辑,只是其中执行任务的执行者变了。

QueueConsumer是一个消费者,它实现了WorkHandler接口,它的创建过程在providerManage.startup()逻辑中。WorkHandler接口是disruptor的数据消费接口,只有一个方法是onEvent()

package com.lmax.disruptor;
public interface WorkHandler<T> {    void onEvent(T event) throws Exception;}

QueueConsumer重写了onEvent()方法,主要逻辑是生成消费任务,然后在线程池中去执行。

/** *  * 队列消费者 */public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {        // 省略了其他逻辑
    @Override    public void onEvent(final DataEvent<T> t) {        if (t != null) {            // 根据事件类型获取相应的线程池            ThreadPoolExecutor executor = orderly(t);            // 通过工厂创建队列消费任务            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();            // 保存数据            queueConsumerExecutor.setData(t.getData());            // help gc            t.setData(null);            // 放在线程池中执行 消费任务            executor.execute(queueConsumerExecutor);        }    }}

QueueConsumerExecutor是在线程池中被执行的任务,它实现了Runnable接口,具体的实现类有两个:

  • RegisterClientConsumerExecutor:客户端消费者执行器;
  • RegisterServerConsumerExecutor:服务端消费者执行器。

顾名思义,一个负责处理客户端任务,一个负责处理服务端任务。

  • RegisterServerConsumerExecutor#run()

RegisterServerConsumerExecutor是服务端消费者执行器,它通过QueueConsumerExecutor间接实现了Runnable接口,并重写了run()方法。


public final class RegisterServerConsumerExecutor extends QueueConsumerExecutor<Collection<DataTypeParent>> {   // ...
    @Override    public void run() {        //获取从disruptor队列中拿到的数据        Collection<DataTypeParent> results = getData()                .stream()                .filter(this::isValidData)                .collect(Collectors.toList());        if (CollectionUtils.isEmpty(results)) {            return;        }        //根据类型执行操作        selectExecutor(results).executor(results);    }
    // 根据类型获取订阅者    private ExecutorSubscriber<DataTypeParent> selectExecutor(final Collection<DataTypeParent> list) {        final Optional<DataTypeParent> first = list.stream().findFirst();        return subscribers.get(first.orElseThrow(() -> new RuntimeException("the data type is not found")).getType());    }}
  • ExecutorSubscriber#executor()

执行器订阅者分为两类,一个是处理元数据,一个是处理URI。在客户端和服务端分别有两个,所以一共是四个。

  • MetadataExecutorSubscriber#executor()

如果是注册元数据,则通过MetadataExecutorSubscriber#executor()实现:根据类型获取注册Service,调用register()

public class MetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {     //......
    @Override    public DataType getType() {        return DataType.META_DATA;  // 元数据类型    }
    @Override    public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {        // 遍历元数据列表        metaDataRegisterDTOList.forEach(meta -> {            Optional.ofNullable(this.shenyuClientRegisterService.get(meta.getRpcType())) // 根据类型获取注册Service                    .ifPresent(shenyuClientRegisterService -> {                        // 对元数据进行注册,加锁确保顺序执行,防止并发错误                        synchronized (shenyuClientRegisterService) {                            shenyuClientRegisterService.register(meta);                        }                    });        });    }}
  • URIRegisterExecutorSubscriber#executor()

如果是注册元数据,则通过URIRegisterExecutorSubscriber#executor()实现:构建URI数据,根据注册类型查找Service,通过registerURI方法实现注册。


public class URIRegisterExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {    //......        @Override    public DataType getType() {        return DataType.URI; // URI数据类型    }        @Override    public void executor(final Collection<URIRegisterDTO> dataList) {        if (CollectionUtils.isEmpty(dataList)) {            return;        }        // 根据rpc调用类型聚集数据        final Map<String, List<URIRegisterDTO>> groupByRpcType = dataList.stream()                .filter(data -> StringUtils.isNotBlank(data.getRpcType()))                .collect(Collectors.groupingBy(URIRegisterDTO::getRpcType));        for (Map.Entry<String, List<URIRegisterDTO>> entry : groupByRpcType.entrySet()) {            final String rpcType = entry.getKey();            // 根据类型查找Service            Optional.ofNullable(shenyuClientRegisterService.get(rpcType))                    .ifPresent(service -> {                        final List<URIRegisterDTO> list = entry.getValue();                        // 构建URI数据类型,通过registerURI方法实现注册                        Map<String, List<URIRegisterDTO>> listMap = buildData(list);                        listMap.forEach(service::registerURI);                    });        }    }}
  • ShenyuClientRegisterService#register()

ShenyuClientRegisterService是注册方法接口,它有多个实现类:

  • AbstractContextPathRegisterService:抽象类,处理部分公共逻辑;
  • AbstractShenyuClientRegisterServiceImpl::抽象类,处理部分公共逻辑;
  • ShenyuClientRegisterDivideServiceImpldivide类,处理http注册类型;
  • ShenyuClientRegisterDubboServiceImpldubbo类,处理dubbo注册类型;
  • ShenyuClientRegisterGrpcServiceImplgRPC类,处理gRPC注册类型;
  • ShenyuClientRegisterMotanServiceImplMotan类,处理Motan注册类型;
  • ShenyuClientRegisterSofaServiceImplSofa类,处理Sofa注册类型;
  • ShenyuClientRegisterSpringCloudServiceImplSpringCloud类,处理SpringCloud注册类型;
  • ShenyuClientRegisterTarsServiceImplTars类,处理Tars注册类型;
  • ShenyuClientRegisterWebSocketServiceImplWebsocket类,处理Websocket注册类型;

从上面可以看出每种微服务都有对应的注册实现类,本文的源码分析是 以官方提供的 shenyu-examples-http 为例,是属http注册类型,所以元数据和URI数据的注册实现类是 ShenyuClientRegisterDivideServiceImpl

  • register(): 注册元数据
public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {        //......        public String register(final MetaDataRegisterDTO dto) {        // 1.注册选择器信息        String selectorHandler = selectorHandler(dto);        String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);        // 2.注册规则信息        String ruleHandler = ruleHandler();        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        ruleService.registerDefault(ruleDTO);        // 3.注册元数据信息        registerMetadata(dto);        // 4.注册contextPath        String contextPath = dto.getContextPath();        if (StringUtils.isNotEmpty(contextPath)) {            registerContextPath(dto);        }        return ShenyuResultMessage.SUCCESS;    }}

整个注册逻辑可以分为4个步骤:

  • 1.注册选择器信息
  • 2.注册规则信息
  • 3.注册元数据信息
  • 4.注册contextPath

admin这一侧通过客户端的元数据信息需要构建选择器、规则、元数据和ContextPath。具体的注册过程和细节处理跟rpc类型有关。我们就不再继续向下追踪了,对于注册中心的逻辑分析,跟踪到这里就够了。

服务端元数据注册流程的源码分析完了,流程图描述如下:

  • registerURI(): 注册URI数据
public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {
    //......        public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {        if (CollectionUtils.isEmpty(uriList)) {            return "";        }        // 对应的选择器是否存在        SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));        if (Objects.isNull(selectorDO)) {            throw new ShenyuException("doRegister Failed to execute,wait to retry.");        }        List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());        // 处理选择器中的handler信息        String handler = buildHandle(validUriList, selectorDO);        if (handler != null) {            selectorDO.setHandle(handler);            SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));            selectorData.setHandle(handler);            // 更新数据库中的记录            selectorService.updateSelective(selectorDO);            // 发布事件            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));        }        return ShenyuResultMessage.SUCCESS;    }}

admin拿到URI数据后,主要是更新选择器中的handler信息,然后写入到数据库,最后发布事件通知网关。通知网关的逻辑是由数据同步操作完成,这在之前的文章中已经分析过了,就不再赘述。

服务端URI注册流程的源码分析完成了,用图描述如下:

至此,服务端注册流程也就分析完了,主要通过对外提供的接口,接受客户端的注册信息,然后写入到Disruptor队列,再从中消费数据,根据接收到的元数据和URI数据更新admin的选择器、规则、元数据和选择器的handler

4. 总结#

本文主要对Apache ShenYu网关中的http注册模块进行了源码分析。涉及到的主要知识点,归纳如下:

  • 注册中心是为了将客户端信息注册到admin,方便流量筛选;
  • http注册是将客户端元数据信息和URI信息注册到admin
  • http服务的接入通过注解@ShenyuSpringMvcClient标识;
  • 注册信息的构建主要通过Spring应用监听器ApplicationListener
  • 注册类型的加载通过SPI完成;
  • 引入Disruptor队列是为了数据与操作解耦,以及数据缓冲。
  • 注册中心的实现采用了面向接口编程,使用模板方法、单例、观察者等设计模式。