Skip to main content

One post tagged with "http"

View All Tags

· One min read

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu 网关当前支持ZooKeeperWebSocketHttp长轮询NacosEtcdConsul 进行数据同步。本文的主要内容是基于Http长轮询的数据同步源码分析。

本文基于shenyu-2.5.0版本进行源码分析,官网的介绍请参考 数据同步原理

1. Http长轮询

这里直接引用官网的相关描述:

ZookeeperWebSocket 数据同步的机制比较简单,而 Http长轮询则比较复杂。 Apache ShenYu 借鉴了 ApolloNacos 的设计思想,取其精华,自己实现了 Http长轮询数据同步功能。注意,这里并非传统的 ajax 长轮询!

Http长轮询 机制如上所示,Apache ShenYu网关主动请求 shenyu-admin 的配置服务,读取超时时间为 90s,意味着网关层请求配置服务最多会等待 90s,这样便于 shenyu-admin 配置服务及时响应变更数据,从而实现准实时推送。

Http长轮询 机制是由网关主动请求 shenyu-admin ,所以这次的源码分析,我们从网关这一侧开始。

2. 网关数据同步

2.1 加载配置

Http长轮询 数据同步配置的加载是通过spring bootstarter机制,当我们引入相关依赖和在配置文件中有如下配置时,就会加载。

pom文件中引入依赖:

<!--shenyu data sync start use http-->
<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-spring-boot-starter-sync-data-http</artifactId>
<version>${project.version}</version>
</dependency>

application.yml配置文件中添加配置:

shenyu:
sync:
http:
url : http://localhost:9095

当网关启动时,配置类HttpSyncDataConfiguration就会执行,加载相应的Bean

/**
* Http sync data configuration for spring boot.
*/
@Configuration
@ConditionalOnClass(HttpSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url")
@EnableConfigurationProperties(value = HttpConfig.class)
public class HttpSyncDataConfiguration {

private static final Logger LOGGER = LoggerFactory.getLogger(HttpSyncDataConfiguration.class);

/**
* Rest template.
* 创建RestTemplate
* @param httpConfig the http config http配置
* @return the rest template
*/
@Bean
public RestTemplate restTemplate(final HttpConfig httpConfig) {
OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
factory.setConnectTimeout(Objects.isNull(httpConfig.getConnectionTimeout()) ? (int) HttpConstants.CLIENT_POLLING_CONNECT_TIMEOUT : httpConfig.getConnectionTimeout());
factory.setReadTimeout(Objects.isNull(httpConfig.getReadTimeout()) ? (int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT : httpConfig.getReadTimeout());
factory.setWriteTimeout(Objects.isNull(httpConfig.getWriteTimeout()) ? (int) HttpConstants.CLIENT_POLLING_WRITE_TIMEOUT : httpConfig.getWriteTimeout());
return new RestTemplate(factory);
}

/**
* AccessTokenManager.
* 创建AccessTokenManager,专门用户对admin进行http请求时access token的处理
* @param httpConfig the http config.
* @param restTemplate the rest template.
* @return the access token manager.
*/
@Bean
public AccessTokenManager accessTokenManager(final HttpConfig httpConfig, final RestTemplate restTemplate) {
return new AccessTokenManager(restTemplate, httpConfig);
}

/**
* Http sync data service.
* 创建 HttpSyncDataService
* @param httpConfig the http config
* @param pluginSubscriber the plugin subscriber
* @param restTemplate the rest template
* @param metaSubscribers the meta subscribers
* @param authSubscribers the auth subscribers
* @param accessTokenManager the access token manager
* @return the sync data service
*/
@Bean
public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig,
final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<RestTemplate> restTemplate,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
final ObjectProvider<List<AuthDataSubscriber>> authSubscribers,
final ObjectProvider<AccessTokenManager> accessTokenManager) {
LOGGER.info("you use http long pull sync shenyu data");
return new HttpSyncDataService(
Objects.requireNonNull(httpConfig.getIfAvailable()),
Objects.requireNonNull(pluginSubscriber.getIfAvailable()),
Objects.requireNonNull(restTemplate.getIfAvailable()),
metaSubscribers.getIfAvailable(Collections::emptyList),
authSubscribers.getIfAvailable(Collections::emptyList),
Objects.requireNonNull(accessTokenManager.getIfAvailable())
);
}
}

HttpSyncDataConfigurationHttp长轮询数据同步的配置类,负责创建HttpSyncDataService(负责http数据同步的具体实现)、RestTemplateAccessTokenManager (负责与adminhttp调用时access token的处理)。它的注解如下:

  • @Configuration:表示这是一个配置类;
  • @ConditionalOnClass(HttpSyncDataService.class):条件注解,表示要有HttpSyncDataService这个类;
  • @ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url"):条件注解,要有shenyu.sync.http.url这个属性配置。
  • @EnableConfigurationProperties(value = HttpConfig.class):表示让HttpConfig上的注解@ConfigurationProperties(prefix = "shenyu.sync.http")生效,将HttpConfig这个配置类注入Ioc容器中。

2.2 属性初始化

  • HttpSyncDataService

HttpSyncDataService的构造函数中,完成属性初始化。

public class HttpSyncDataService implements SyncDataService {

// 省略了属性字段......

public HttpSyncDataService(final HttpConfig httpConfig,
final PluginDataSubscriber pluginDataSubscriber,
final RestTemplate restTemplate,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers,
final AccessTokenManager accessTokenManager) {
// 1.设置accessTokenManager
this.accessTokenManager = accessTokenManager;
// 2.创建数据处理器
this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
// 3.shenyu-admin的url, 多个用逗号(,)分割
this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
// 4.只用于http长轮询的restTemplate
this.restTemplate = restTemplate;
// 5.开始执行长轮询任务
this.start();
}

//......
}

上面代码中省略了其他函数和相关字段,在构造函数中完成属性的初始化,主要是:

  • 设置accessTokenManager,定时向admin请求更新accessToken的值。然后每次向admin发起请求时都必须将headerX-Access-Token属性设置成accessToken对应的值;

  • 创建数据处理器,用于后续缓存各种类型的数据(插件、选择器、规则、元数据和认证数据);

  • 获取admin属性配置,主要是获取adminurladmin有可能是集群,多个用逗号(,)分割;

  • 设置RestTemplate,用于向admin发起请求;

  • 开始执行长轮询任务。

2.3 开始长轮询

  • HttpSyncDataService#start()

start()方法中,干了两件事情,一个是获取全量数据,即请求admin端获取所有需要同步的数据,然后将获取到的数据缓存到网关内存中。另一个是开启多线程执行长轮询任务。

public class HttpSyncDataService implements SyncDataService {

// ......

private void start() {
// // 只初始化一次,通过原子类实现。
if (RUNNING.compareAndSet(false, true)) {
// 初次启动,获取全量数据
this.fetchGroupConfig(ConfigGroupEnum.values());
// 一个后台服务,一个线程
int threadSize = serverList.size();
// 自定义线程池
this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
ShenyuThreadFactory.create("http-long-polling", true));
// 开始长轮询,一个admin服务,创建一个线程用于数据同步
this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
} else {
LOG.info("shenyu http long polling was started, executor=[{}]", executor);
}
}

// ......
}
2.3.1 获取全量数据
  • HttpSyncDataService#fetchGroupConfig()

ShenYu将所有需要同步的数据进行了分组,一共有5种数据类型,分别是插件、选择器、规则、元数据和认证数据。

public enum ConfigGroupEnum {
APP_AUTH, // 认证数据
PLUGIN, //插件
RULE, // 规则
SELECTOR, // 选择器
META_DATA; // 元数据
}

admin有可能是集群,这里通过循环的方式向每个admin发起请求,有一个执行成功了,那么向admin获取全量数据并缓存到网关的操作就执行成功。如果出现了异常,就向下一个admin发起请求。

public class HttpSyncDataService implements SyncDataService {

// ......

private void fetchGroupConfig(final ConfigGroupEnum... groups) throws ShenyuException {
// admin有可能是集群,这里通过循环的方式向每个admin发起请求
for (int index = 0; index < this.serverList.size(); index++) {
String server = serverList.get(index);
try {
// 真正去执行
this.doFetchGroupConfig(server, groups);
// 有一个成功,就成功了,可以退出循环
break;
} catch (ShenyuException e) {
// 出现异常,尝试执行下一个
// 最后一个也执行失败了,抛出异常
if (index >= serverList.size() - 1) {
throw e;
}
LOG.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
}
}
}

// ......
}
  • HttpSyncDataService#doFetchGroupConfig()

在此方法中,首先拼装请求参数,然后通过httpClient发起请求,到admin中获取数据,最后将获取到的数据更新到网关内存中。

public class HttpSyncDataService implements SyncDataService {
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
// 1. 拼请求参数,所有分组枚举类型
StringBuilder params = new StringBuilder();
for (ConfigGroupEnum groupKey : groups) {
params.append("groupKeys").append("=").append(groupKey.name()).append("&");
}
// admin端提供的接口 /configs/fetch
String url = server + Constants.SHENYU_ADMIN_PATH_CONFIGS_FETCH + "?" + StringUtils.removeEnd(params.toString(), "&");
LOG.info("request configs: [{}]", url);
String json;
try {
HttpHeaders headers = new HttpHeaders();
// 设置accessToken
headers.set(Constants.X_ACCESS_TOKEN, this.accessTokenManager.getAccessToken());
HttpEntity<String> httpEntity = new HttpEntity<>(headers);
// 2. 发起请求,获取变更数据
json = this.restTemplate.exchange(url, HttpMethod.GET, httpEntity, String.class).getBody();
} catch (RestClientException e) {
String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
LOG.warn(message);
throw new ShenyuException(message, e);
}
// 3. 更新网关内存中数据
boolean updated = this.updateCacheWithJson(json);
if (updated) {
LOG.debug("get latest configs: [{}]", json);
return;
}
// 更新成功,此方法就执行完成了
LOG.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
// 服务端没有数据更新,就等30s
ThreadUtils.sleep(TimeUnit.SECONDS, 30);
}
}

从代码中,可以看到 admin端提供的获取全量数据接口是 /configs/fetch,这里先不进一步深入,放在后文再分析。

获取到admin返回结果数据,并成功更新,那么此方法就执行结束了。如果没有更新成功,那么有可能是服务端没有数据更新,就等待30s

这里需要提前说明一下,网关在判断是否更新成功时,有比对数据的操作,马上就会提到。

  • HttpSyncDataService#updateCacheWithJson()

更新网关内存中的数据。使用GSON进行反序列化,从属性data中拿真正的数据,然后交给DataRefreshFactory去做更新。

    private boolean updateCacheWithJson(final String json) {
// 使用GSON进行反序列化
JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
// if the config cache will be updated?
return factory.executor(jsonObject.getAsJsonObject("data"));
}
  • DataRefreshFactory#executor()

根据不同数据类型去更新数据,返回更新结果。具体更新逻辑交给了dataRefresh.refresh()方法。在更新结果中,有一种数据类型进行了更新,就表示此次操作发生了更新。

    public boolean executor(final JsonObject data) {
//并行更新数据
List<Boolean> result = ENUM_MAP.values().parallelStream()
.map(dataRefresh -> dataRefresh.refresh(data))
.collect(Collectors.toList());
//有一个更新就表示此次发生了更新操作
return result.stream().anyMatch(Boolean.TRUE::equals);
}
  • AbstractDataRefresh#refresh()

数据更新逻辑采用的是模板方法设计模式,通用操作在抽象方法中完成,不同的实现逻辑由子类完成。5种数据类型具体的更新逻辑有些差异,但是也存在通用的更新逻辑,类图关系如下:

在通用的refresh()方法中,负责数据类型转换,判断是否需要更新,和实际的数据刷新操作。

public abstract class AbstractDataRefresh<T> implements DataRefresh {

// ......

@Override
public Boolean refresh(final JsonObject data) {
// 数据类型转换
JsonObject jsonObject = convert(data);
if (Objects.isNull(jsonObject)) {
return false;
}

boolean updated = false;
// 得到数据类型
ConfigData<T> result = fromJson(jsonObject);
// 是否需要更新
if (this.updateCacheIfNeed(result)) {
updated = true;
// 真正的更新逻辑,数据刷新操作
refresh(result.getData());
}

return updated;
}

// ......
}
  • AbstractDataRefresh#updateCacheIfNeed()

数据转换的过程,就是根据不同的数据类型进行转换,我们就不再进一步追踪了,看看数据是否需要更新的逻辑。方法名是updateCacheIfNeed(),通过方法重载实现。

public abstract class AbstractDataRefresh<T> implements DataRefresh {

// ......

// result是数据
protected abstract boolean updateCacheIfNeed(ConfigData<T> result);

// newVal是获取到的最新的值
// groupEnum 是哪种数据类型
protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {
// 如果是第一次,那么直接放到cache中,返回 true,表示此次进行了更新
if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {
return true;
}
ResultHolder holder = new ResultHolder(false);
GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {
// md5 值相同,不需要更新
if (StringUtils.equals(oldVal.getMd5(), newVal.getMd5())) {
LOG.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5());
return oldVal;
}

// 当前缓存的数据修改时间大于 新来的数据,不需要更新
// must compare the last update time
if (oldVal.getLastModifyTime() >= newVal.getLastModifyTime()) {
LOG.info("Last update time earlier than the current configuration, the [{}] config cache will not be updated", groupEnum);
return oldVal;
}
LOG.info("update {} config: {}", groupEnum, newVal);
holder.result = true;
return newVal;
});
return holder.result;
}

// ......
}

从上面的源码中可以看到,有两种情况不需要更新:

  • 两个的数据的md5 值相同,不需要更新;
  • 当前缓存的数据修改时间大于 新来的数据,不需要更新。

其他情况需要更新数据。

分析到这里,就将start() 方法中初次启动,获取全量数据的逻辑分析完了,接下来是长轮询的操作。为了方便,我将start()方法再粘贴一次:

public class HttpSyncDataService implements SyncDataService {

// ......

private void start() {
// // 只初始化一次,通过原子类实现。
if (RUNNING.compareAndSet(false, true)) {
// 初次启动,获取全量数据
this.fetchGroupConfig(ConfigGroupEnum.values());
// 一个后台服务,一个线程
int threadSize = serverList.size();
// 自定义线程池
this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
ShenyuThreadFactory.create("http-long-polling", true));
// 开始长轮询,一个admin服务,创建一个线程用于数据同步
this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
} else {
LOG.info("shenyu http long polling was started, executor=[{}]", executor);
}
}

// ......
}
2.3.2 执行长轮询任务
  • HttpLongPollingTask#run()

长轮询任务是HttpLongPollingTask,它实现了Runnable接口,任务逻辑在run()方法中。通过while()循环实现不断执行任务,即长轮询。在每一次的轮询中有三次重试逻辑,一次轮询任务失败了,等 5s 再继续,3 次都失败了,等5 分钟再试。

开始长轮询,一个admin服务,创建一个线程用于数据同步。

class HttpLongPollingTask implements Runnable {

private final String server;

HttpLongPollingTask(final String server) {
this.server = server;
}

@Override
public void run() {
// 一直轮询
while (RUNNING.get()) {
// 默认重试 3 次
int retryTimes = 3;
for (int time = 1; time <= retryTimes; time++) {
try {
doLongPolling(server);
} catch (Exception e) {
if (time < retryTimes) {
LOG.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
time, retryTimes - time, e.getMessage());
// 长轮询失败了,等 5s 再继续
ThreadUtils.sleep(TimeUnit.SECONDS, 5);
continue;
}
LOG.error("Long polling failed, try again after 5 minutes!", e);
// 3 次都失败了,等 5 分钟再试
ThreadUtils.sleep(TimeUnit.MINUTES, 5);
}
}
}
LOG.warn("Stop http long polling.");
}
}
  • HttpSyncDataService#doLongPolling()

执行长轮询任务的核心逻辑:

  • 根据数据类型组装请求参数:md5lastModifyTime
  • 组装请求头和请求体;
  • admin发起请求,判断组数据是否发生变更;
  • 根据发生变更的组,再去获取数据。
public class HttpSyncDataService implements SyncDataService {
private void doLongPolling(final String server) {
// 组装请求参数:md5 和 lastModifyTime
MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
ConfigData<?> cacheConfig = factory.cacheConfigData(group);
if (cacheConfig != null) {
String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
params.put(group.name(), Lists.newArrayList(value));
}
}
// 组装请求头和请求体
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
// 设置accessToken
headers.set(Constants.X_ACCESS_TOKEN, this.accessTokenManager.getAccessToken());
HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(params, headers);
String listenerUrl = server + Constants.SHENYU_ADMIN_PATH_CONFIGS_LISTENER;

JsonArray groupJson;
//向admin发起请求,判断组数据是否发生变更
//这里只是判断了某个组是否发生变更
try {
String json = this.restTemplate.postForEntity(listenerUrl, httpEntity, String.class).getBody();
LOG.info("listener result: [{}]", json);
JsonObject responseFromServer = GsonUtils.getGson().fromJson(json, JsonObject.class);
groupJson = responseFromServer.getAsJsonArray("data");
} catch (RestClientException e) {
String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
throw new ShenyuException(message, e);
}
// 根据发生变更的组,再去获取数据
/**
* 官网对此处的解释:
* 网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。
* 这里可能会存在一个疑问:为什么不是直接将变更的数据写出?
* 我们在开发的时候,也深入讨论过该问题,因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时,
* 或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。
*
* 个人理解:
* 如果将变更数据直接写出,当管理员频繁更新配置时,第一次更新了,将client移除阻塞队列,返回响应信息给网关。
* 如果这个时候进行了第二次更新,那么当前的client是不在阻塞队列中,所以这一次的变更就会错过。
* 网关层处理不及时,也是同理。
* 这是一个长轮询,一个网关一个同步线程,可能存在耗时的过程。
* 如果admin有数据变更,当前网关client是没有在阻塞队列中,就不到数据。
*/
if (Objects.nonNull(groupJson) && groupJson.size() > 0) {
// fetch group configuration async.
ConfigGroupEnum[] changedGroups = GsonUtils.getGson().fromJson(groupJson, ConfigGroupEnum[].class);
LOG.info("Group config changed: {}", Arrays.toString(changedGroups));
this.doFetchGroupConfig(server, changedGroups);
}
}
}

这里需要特别解释一点的是:在长轮询任务中,为什么不直接拿到变更的数据?而是先判断哪个分组数据发生了变更,然后再次请求admin,获取变更数据?

官网对此处的解释是:

网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。 这里可能会存在一个疑问:为什么不是直接将变更的数据写出? 我们在开发的时候,也深入讨论过该问题,因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时, 或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。

个人理解是:

如果将变更数据直接写出,管理员频繁更新配置时,第一次更新了,将client移除阻塞队列,返回响应信息给网关。如果这个时候进行了第二次更新,那么当前的client是不在阻塞队列中,所以这一次的变更就会错过。网关层处理不及时,也是同理。 这是一个长轮询,一个网关一个同步线程,可能存在耗时的过程。如果admin有数据变更,当前网关client是没有在阻塞队列中,就会更新不到数据。

我们还没有分析到admin端的处理逻辑,先大概说一下。在admin端,会将网关client放到阻塞队列,有数据变更,网关client就会出队列,发送变更数据。所以,如果有数据变更时,网关client不在阻塞队列,那么就无法得到当前变更的数据。

知道哪个分组数据发生变更时,主动再向admin获取变更的数据,根据分组不同,全量拿数据。调用方法是doFetchGroupConfig(),这个在前面已经分析过了。

分析到这里,网关端的数据同步操作就完成了。长轮询任务就是不断向admin发起请求,看看数据是否发生变更,如果有分组数据发生变更,那么就再主动向admin发起请求,获取变更数据,然后更新网关内存中的数据。

网关端长轮询任务流程:

3. admin数据同步

从前面分析的过程中,可以看到,网关端主要调用admin的两个接口:

  • /configs/listener:判断组数据是否发生变更;
  • /configs/fetch:获取变更组数据。

直接从这两个接口分析的话,可能有的地方不好理解,所以我们还是从admin启动流程开始分析数据同步过程。

3.1 加载配置

如果在配置文件application.yml中,进行了如下配置,就表示通过http长轮询的方式进行数据同步。

shenyu:
sync:
http:
enabled: true

程序启动时,通过springboot条件装配实现数据同步类的配置加载。在这个过程中,会创建HttpLongPollingDataChangedListener,负责处理长轮询的相关实现逻辑。

/**
* 数据同步配置类
* 通过springboot条件装配实现
* The type Data sync configuration.
*/
@Configuration
public class DataSyncConfiguration {

/**
* http长轮询
* http long polling.
*/
@Configuration
@ConditionalOnProperty(name = "shenyu.sync.http.enabled", havingValue = "true")
@EnableConfigurationProperties(HttpSyncProperties.class)
static class HttpLongPollingListener {

@Bean
@ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)
public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
return new HttpLongPollingDataChangedListener(httpSyncProperties);
}
}
}

3.2 数据变更监听器实例化

  • HttpLongPollingDataChangedListener

数据变更监听器通过构造函数的方式完成实例化和初始化操作。在构造函数中会创建阻塞队列,用于存放客户端;创建线程池,用于执行延迟任务,周期任务;保存长轮询相关属性信息。

    public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
// 默认客户端(这里是网关)1024个
this.clients = new ArrayBlockingQueue<>(1024);
// 创建线程池
// ScheduledThreadPoolExecutor 可以执行延迟任务,周期任务,普通任务
this.scheduler = new ScheduledThreadPoolExecutor(1,
ShenyuThreadFactory.create("long-polling", true));
// 长轮询的属性信息
this.httpSyncProperties = httpSyncProperties;
}

另外,它的类图关系如下:

实现了InitializingBean接口,所以在bean的初始化过程中执行afterInitialize()方法。通过线程池执行周期任务:更新内存中(CACHE)的数据每隔5分钟执行一次,5分钟后开始执行。刷新本地缓存就是从数据库读取数据到本地缓存(这里就是内存),通过refreshLocalCache()完成。

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {

// ......

/**
* 在 InitializingBean接口中的afterPropertiesSet()方法中被调用,即在bean的初始化过程中执行
*/
@Override
protected void afterInitialize() {
long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
// 执行周期任务:更新内存中(CACHE)的数据每隔5分钟执行一次,5分钟后开始执行
// 防止admin先启动一段时间后,产生了数据;然后网关初次连接时,没有拿到全量数据
scheduler.scheduleWithFixedDelay(() -> {
LOG.info("http sync strategy refresh config start.");
try {
// 从数据库读取数据到本地缓存(这里就是内存)
this.refreshLocalCache();
LOG.info("http sync strategy refresh config success.");
} catch (Exception e) {
LOG.error("http sync strategy refresh config error!", e);
}
}, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
LOG.info("http sync strategy refresh interval: {}ms", syncInterval);
}

// ......
}
  • refreshLocalCache()

分别对5种数据类型进行更新。

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {

// ......

// 从数据库读取数据到本地缓存(这里就是内存)
private void refreshLocalCache() {
//更新认证数据
this.updateAppAuthCache();
//更新插件数据
this.updatePluginCache();
//更新规则数据
this.updateRuleCache();
//更新选择器数据
this.updateSelectorCache();
//更新元数据
this.updateMetaDataCache();
}

// ......
}

5个更新方法的逻辑是类似的,调用service方法获取数据,然后放到内存CACHE中。以更新规则数据方法updateRuleCache()为例,传入规则枚举类型,调用ruleService.listAll()从数据库获取所有规则数据。

    /**
* Update rule cache.
*/
protected void updateRuleCache() {
this.updateCache(ConfigGroupEnum.RULE, ruleService.listAll());
}
  • updateCache()

使用数据库中的数据更新内存中的数据。

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {

// ......

// 缓存数据的 Map
protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();

/**
* if md5 is not the same as the original, then update lcoal cache.
* 更新缓存中的数据
* @param group ConfigGroupEnum
* @param <T> the type of class
* @param data the new config data
*/
protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
//数据序列化
String json = GsonUtils.getInstance().toJson(data);
//传入md5值和修改时间
ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
//更新分组数据
ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
LOG.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
}

// ......
}

初始化的过程就是启动周期性任务,定时从数据库获取数据更新内存数据。

接下来开始对两个接口开始分析:

  • /configs/listener:判断组数据是否发生变更;
  • /configs/fetch:获取变更组数据。

3.3 数据变更轮询接口

  • /configs/listener:判断组数据是否发生变更;

接口类是ConfigController,只有使用http长轮询进行数据同步时才会生效。接口方法listener()没有其他逻辑,直接调用doLongPolling()方法。

   
/**
* This Controller only when HttpLongPollingDataChangedListener exist, will take effect.
*/
@ConditionalOnBean(HttpLongPollingDataChangedListener.class)
@RestController
@RequestMapping("/configs")
public class ConfigController {

private final HttpLongPollingDataChangedListener longPollingListener;

public ConfigController(final HttpLongPollingDataChangedListener longPollingListener) {
this.longPollingListener = longPollingListener;
}

// 省略其他逻辑

/**
* Listener.
* 监听数据变更,执行长轮询
* @param request the request
* @param response the response
*/
@PostMapping(value = "/listener")
public void listener(final HttpServletRequest request, final HttpServletResponse response) {
longPollingListener.doLongPolling(request, response);
}

}
  • HttpLongPollingDataChangedListener#doLongPolling()

执行长轮询任务:如果有数据变更,将会立即响应给客户端(这里就是网关端)。否则,客户端会一直被阻塞,直到有数据变更或者超时。

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {

// ......

/**
* 执行长轮询:如果有数据变更,会立即响应给客户端(这里就是网关端)。
* 否则,否则客户端会一直被阻塞,直到有数据变更或者超时。
* @param request
* @param response
*/
public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
// compare group md5
// 比较md5,判断网关的数据和admin端的数据是否一致,得到发生变更的数据组
List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
String clientIp = getRemoteIp(request);
// response immediately.
// 有变更的数据,则立即向网关响应
if (CollectionUtils.isNotEmpty(changedGroup)) {
this.generateResponse(response, changedGroup);
Log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
return;
}

// 没有变更,则将客户端(这里就是网关)放进阻塞队列
final AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0L);
scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}

// ......
}
  • HttpLongPollingDataChangedListener#compareChangedGroup()

判断组数据是否发生变更,判断逻辑是比较网关端和admin端的md5值和lastModifyTime

  • 如果md5值不一样,那么需要更新;
  • 如果admin端的lastModifyTime大于网关端的lastModifyTime,那么需要更新。
 /**
* 判断组数据是否发生变更
* @param request
* @return
*/
private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request) {
List<ConfigGroupEnum> changedGroup = new ArrayList<>(ConfigGroupEnum.values().length);
for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
// 网关端数据的md5值和lastModifyTime
String[] params = StringUtils.split(request.getParameter(group.name()), ',');
if (params == null || params.length != 2) {
throw new ShenyuException("group param invalid:" + request.getParameter(group.name()));
}
String clientMd5 = params[0];
long clientModifyTime = NumberUtils.toLong(params[1]);
ConfigDataCache serverCache = CACHE.get(group.name());
// do check. 判断组数据是否发生变更
if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) {
changedGroup.add(group);
}
}
return changedGroup;
}
  • LongPollingClient

没有变更数据,则将客户端(这里就是网关)放进阻塞队列。阻塞时间是60秒,即60秒后移除,并响应客户端。

class LongPollingClient implements Runnable {
// 省略了其他逻辑

@Override
public void run() {
try {
// 先设置定时任务:60秒后移除,并响应客户端
this.asyncTimeoutFuture = scheduler.schedule(() -> {
clients.remove(LongPollingClient.this);
List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
sendResponse(changedGroups);
}, timeoutTime, TimeUnit.MILLISECONDS);

// 添加到阻塞队列
clients.add(this);

} catch (Exception ex) {
log.error("add long polling client error", ex);
}
}

/**
* Send response.
*
* @param changedGroups the changed groups
*/
void sendResponse(final List<ConfigGroupEnum> changedGroups) {
// cancel scheduler
if (null != asyncTimeoutFuture) {
asyncTimeoutFuture.cancel(false);
}
// 响应变更的组
generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
asyncContext.complete();
}
}

3.4 获取变更数据接口

  • /configs/fetch:获取变更数据;

根据网关传入的参数,获取分组数据,返回结果。主要实现方法是longPollingListener.fetchConfig()


@ConditionalOnBean(HttpLongPollingDataChangedListener.class)
@RestController
@RequestMapping("/configs")
public class ConfigController {

private final HttpLongPollingDataChangedListener longPollingListener;

public ConfigController(final HttpLongPollingDataChangedListener longPollingListener) {
this.longPollingListener = longPollingListener;
}

/**
* Fetch configs shenyu result.
* 全量获取分组数据
* @param groupKeys the group keys
* @return the shenyu result
*/
@GetMapping("/fetch")
public ShenyuAdminResult fetchConfigs(@NotNull final String[] groupKeys) {
Map<String, ConfigData<?>> result = Maps.newHashMap();
for (String groupKey : groupKeys) {
ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey));
result.put(groupKey, data);
}
return ShenyuAdminResult.success(ShenyuResultMessage.SUCCESS, result);
}

// 省略了其他接口

}
  • AbstractDataChangedListener#fetchConfig()

数据获取直接从CACHE中拿,然后根据不同分组类型进行匹配,封装。

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
/**
* fetch configuration from cache.
* 获取分组下的全量数据
* @param groupKey the group key
* @return the configuration data
*/
public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {
// 直接从 CACHE 中拿数据
ConfigDataCache config = CACHE.get(groupKey.name());
switch (groupKey) {
case APP_AUTH: // 认证数据
return buildConfigData(config, AppAuthData.class);
case PLUGIN: // 插件数据
return buildConfigData(config, PluginData.class);
case RULE: // 规则数据
return buildConfigData(config, RuleData.class);
case SELECTOR: // 选择器数据
return buildConfigData(config, SelectorData.class);
case META_DATA: // 元数据
return buildConfigData(config, MetaData.class);
default: // 其他类型,抛出异常
throw new IllegalStateException("Unexpected groupKey: " + groupKey);
}
}
}

3.5 数据变更

在之前的websocket数据同步和zookeeper数据同步源码分析文章中,我们知道admin端数据同步设计结构如下:

各种数据变更监听器都是DataChangedListener的子类。

当在admin端修改数据后,通过Spring的事件处理机制,发送事件通知。发送逻辑如下:


/**
* Event forwarders, which forward the changed events to each ConfigEventListener.
* 数据变更事件分发器:当admin端有数据发生变更时,将变更数据同步到 ShenYu 网关
* 数据变更依赖于Spring的事件监听机制:ApplicationEventPublisher --> ApplicationEvent --> ApplicationListener
*
*/
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

//省略了其他逻辑

/**
* 有数据变更时,调用此方法
* @param event
*/
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)
for (DataChangedListener listener : listeners) {
// 哪种数据发生变更
switch (event.getGroupKey()) {
case APP_AUTH: // 认证信息
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN: // 插件信息
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE: // 规则信息
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR: // 选择器信息
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
// 当选择器数据更新时,更新API文档信息
applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA: // 元数据
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default: // 其他类型,抛出异常
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
}

假设,对插件信息进行了修改,通过http长轮询的方式进行数据同步,那么listener.onPluginChanged()的实际调用的是org.apache.shenyu.admin.listener.AbstractDataChangedListener#onPluginChanged

    /**
* 在admin的操作,有插件发生了更新
* @param changed the changed
* @param eventType the event type
*/
@Override
public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
if (CollectionUtils.isEmpty(changed)) {
return;
}
// 更新内存CACHE
this.updatePluginCache();
// 执行变更任务
this.afterPluginChanged(changed, eventType);
}

有两个处理操作,一是更新内存CACHE,这个在前面分析过了;另一个是执行变更任务,在线程池中执行。

  • HttpLongPollingDataChangedListener#afterPluginChanged()
    @Override
protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
// 在线程池中执行
scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN));
}
  • DataChangeTask

数据变更任务:将阻塞队列中的客户端依次移除,并发送响应,通知网关有组数据发生变更。

class DataChangeTask implements Runnable {
//省略了其他逻辑
@Override
public void run() {
// 阻塞队列中的客户端超过了给定的值100,则分批执行
if (clients.size() > httpSyncProperties.getNotifyBatchSize()) {
List<LongPollingClient> targetClients = new ArrayList<>(clients.size());
clients.drainTo(targetClients);
List<List<LongPollingClient>> partitionClients = Lists.partition(targetClients, httpSyncProperties.getNotifyBatchSize());
// 分批执行
partitionClients.forEach(item -> scheduler.execute(() -> doRun(item)));
} else {
// 执行任务
doRun(clients);
}
}

private void doRun(final Collection<LongPollingClient> clients) {
// 通知所有客户端发生了数据变更
for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
LongPollingClient client = iter.next();
iter.remove();
// 发送响应
client.sendResponse(Collections.singletonList(groupKey));
LOG.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
}
}
}

至此,admin端数据同步逻辑就分析完了。在基于http长轮询数据同步是,它主要有三个功能:

  • 提供数据变更监听接口;
  • 提供获取变更数据接口;
  • 有数据变更时,移除阻塞队列中的客户端,并响应结果。

最后,用三张图描述下admin端长轮询任务流程:

  • /configs/listener数据变更监听接口:

  • /configs/fetch获取变更数据接口:

  • 在admin后台管理系统更新数据,进行数据同步:

4. 总结

本文主要对ShenYu网关中的http长轮询数据同步进行了源码分析。涉及到的主要知识点如下:

  • http长轮询由网关端主动发起请求,不断请求admin端;
  • 变更数据以组为粒度(认证信息、插件、选择器、规则、元数据);
  • http长轮询结果只拿到了变更组,还需要再次发起请求获取组数据;
  • 数据是否更新由md5值和修改时间lastModifyTime决定。

· One min read

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,注册中心是用于将客户端信息注册到shenyu-adminadmin再通过数据同步将这些信息同步到网关,网关通过这些数据完成流量筛选。客户端信息主要包括接口信息URI信息

本文基于shenyu-2.5.0版本进行源码分析,官网的介绍请参考 客户端接入原理

1. 注册中心原理

当客户端启动时,读取接口信息和uri信息,通过指定的注册类型,将数据发送到shenyu-admin

图中的注册中心需要用户指定使用哪种注册类型,ShenYu当前支持HttpZookeeperEtcdConsulNacos进行注册。具体如何配置请参考 客户端接入配置

ShenYu在注册中心的原理设计上引入了DisruptorDisruptor队列在其中起到数据与操作解耦,利于扩展。如果注册请求过多,导致注册异常,也有数据缓冲作用。

如图所示,注册中心分为两个部分,一是注册中心客户端register-client,负责处理客户端数据读取。另一个是注册中心服务端register-server,负责处理服务端(就是shenyu-admin)数据写入。通过指定注册类型进行数据发送和接收。

  • 客户端:通常来说就是一个微服务,可以是springmvcspring-clouddubbogrpc等。
  • register-client:注册中心客户端,读取客户接口和uri信息。
  • Disruptor:数据与操作解耦,数据缓冲作用。
  • register-server:注册中心服务端,这里就是shenyu-admin,接收数据,写入数据库,发数据同步事件。
  • 注册类型:指定注册类型,完成数据注册,当前支持HttpZookeeperEtcdConsulNacos

本文分析的是使用Http的方式进行注册,所以具体的处理流程如下:

在客户端,数据出队列后,通过http传输数据,在服务端,提供相应的接口,接收数据,然后写入队列。

2. 客户端注册流程

当客户端启动后,根据相关配置,读取属性信息,然后写入队列。以官方提供的 shenyu-examples-http 为例,开始源码分析。官方提供的例子是一个由springboot构建的微服务。注册中心的相关配置可以参考官网 客户端接入配置

2.1 加载配置,读取属性

先用一张图串联下注册中心客户端初始化流程:

我们分析的是通过http的方式进行注册,所以需要进行如下配置:

shenyu:
register:
registerType: http
serverLists: http://localhost:9095
props:
username: admin
password: 123456
client:
http:
props:
contextPath: /http
appName: http
port: 8189
isFull: false

每个属性表示的含义如下:

  • registerType: 服务注册类型,填写 http
  • serverList: 为http注册类型时,填写Shenyu-Admin项目的地址,注意加上http://,多个地址用英文逗号分隔。
  • username: Shenyu-Admin用户名
  • password: Shenyu-Admin用户对应的密码
  • port: 你本项目的启动端口,目前springmvc/tars/grpc需要进行填写。
  • contextPath: 为你的这个mvc项目在shenyu网关的路由前缀, 比如/order/product 等等,网关会根据你的这个前缀来进行路由。
  • appName:你的应用名称,不配置的话,会默认取 spring.application.name 的值。
  • isFull: 设置 true 代表代理你的整个服务,false表示代理你其中某几个controller;目前适用于springmvc/springcloud

项目启动后,会先加载配置文件,读取属性信息,生成相应的Bean

首先读取到的配置文件是 ShenyuSpringMvcClientConfiguration,它是shenyu 客户端http注册配置类,通过@Configuration表示这是一个配置类,通过@ImportAutoConfiguration引入其他配置类。创建SpringMvcClientEventListener,主要处理元数据和 URI 信息。

/**
* shenyu 客户端http注册配置类
*/
@Configuration
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuSpringMvcClientConfiguration {

// 创建SpringMvcClientEventListener,主要处理元数据和URI信息
@Bean
public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);
}
}

ShenyuClientCommonBeanConfigurationshenyu客户端通用配置类,会创建注册中心客户端通用的bean

  • 创建ShenyuClientRegisterRepository,通过工厂类创建而成。
  • 创建ShenyuRegisterCenterConfig,读取shenyu.register属性配置。
  • 创建ShenyuClientConfig,读取shenyu.client属性配置。

/**
* shenyu客户端通用配置类
*/
@Configuration
public class ShenyuClientCommonBeanConfiguration {

// 创建ShenyuClientRegisterRepository,通过工厂类创建而成。
@Bean
public ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
return ShenyuClientRegisterRepositoryFactory.newInstance(config);
}

// 创建ShenyuRegisterCenterConfig,读取shenyu.register属性配置
@Bean
@ConfigurationProperties(prefix = "shenyu.register")
public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
return new ShenyuRegisterCenterConfig();
}

// 创建ShenyuClientConfig,读取shenyu.client属性配置
@Bean
@ConfigurationProperties(prefix = "shenyu")
public ShenyuClientConfig shenyuClientConfig() {
return new ShenyuClientConfig();
}
}

2.2 用于注册的 HttpClientRegisterRepository

上面的配置文件中生成的ShenyuClientRegisterRepository是客户端注册的具体实现,它是一个接口,它的实现类如下。

  • HttpClientRegisterRepository:通过http进行注册;
  • ConsulClientRegisterRepository:通过Consul进行注册;
  • EtcdClientRegisterRepository:通过Etcd进行注册;
  • NacosClientRegisterRepository:通过nacos进行注册;
  • ZookeeperClientRegisterRepository通过Zookeeper进行注册。

具体是哪一种方式,是通过SPI进行加载实现的,实现逻辑如下:


/**
* 加载 ShenyuClientRegisterRepository
*/
public final class ShenyuClientRegisterRepositoryFactory {

private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();

/**
* 创建 ShenyuClientRegisterRepository
*/
public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
// 通过SPI的方式进行加载,类型由registerType决定
ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
//执行初始化操作
result.init(shenyuRegisterCenterConfig);
ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
return result;
}
return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
}
}

加载类型通过registerType指定,也就是我们在配置文件中指定的类型:

shenyu:
register:
registerType: http
serverLists: http://localhost:9095

我们指定的是http,所以会去加载HttpClientRegisterRepository。对象创建成功后,执行的初始化方法init()如下:

@Join
public class HttpClientRegisterRepository implements ShenyuClientRegisterRepository {

@Override
public void init(final ShenyuRegisterCenterConfig config) {
this.username = config.getProps().getProperty(Constants.USER_NAME);
this.password = config.getProps().getProperty(Constants.PASS_WORD);
this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));
this.setAccessToken();
}

// 暂时省略其他逻辑
}

读取配置文件中的usernamepasswordserverLists,即sheenyu-admin的访问账号、密码和地址信息,为后续数据发送做准备。类注解@Join用于SPI的加载。

SPI 全称为 Service Provider Interface, 是 JDK 内置的一种服务提供发现功能, 一种动态替换发现的机制。

shenyu-spiApache ShenYu网关自定义的SPI扩展实现,设计和实现原理参考了DubboSPI扩展实现

2.3 构建 元数据 和 URI信息 的 SpringMvcClientEventListener

创建 SpringMvcClientEventListener,负责客户端 元数据URI 数据的构建和注册,它的创建是在配置文件中完成。

@Configuration
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuSpringMvcClientConfiguration {
// ......

// 创建 SpringMvcClientEventListener
@Bean
public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);
}
}
  • SpringMvcClientEventListener继承了AbstractContextRefreshedEventListener

AbstractContextRefreshedEventListener是一个抽象类,它实现了ApplicationListener接口,并重写了onApplicationEvent()方法,当有Spring事件发生后,该方法会执行。它的实现目前有八种,每一种表示对应的RPC调用协议的 元数据URI 信息的注册。

  • AlibabaDubboServiceBeanListener:处理使用Alibaba Dubbo协议;
  • ApacheDubboServiceBeanListener:处理使用Apacge Dubbo协议;
  • GrpcClientEventListener:处理使用grpc协议;
  • MotanServiceEventListener:处理使用Mortan协议;
  • SofaServiceEventListener:处理使用Sofa协议;
  • SpringMvcClientEventListener:处理使用http协议;
  • SpringWebSocketClientEventListener:处理使用websocket协议;
  • TarsServiceBeanEventListener:处理使用Tars注册类型;
// 实现了ApplicationListener接口
public abstract class AbstractContextRefreshedEventListener<T, A extends Annotation> implements ApplicationListener<ContextRefreshedEvent> {

//......

//构造函数
public AbstractContextRefreshedEventListener(final PropertiesConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
// 读取 shenyu.client.http 配置信息
Properties props = clientConfig.getProps();
// appName 应用名称
this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
// contextPath上下文路径
this.contextPath = Optional.ofNullable(props.getProperty(ShenyuClientConstants.CONTEXT_PATH)).map(UriUtils::repairData).orElse("");
if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {
String errorMsg = "client register param must config the appName or contextPath";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
this.ipAndPort = props.getProperty(ShenyuClientConstants.IP_PORT);
// host信息
this.host = props.getProperty(ShenyuClientConstants.HOST);
// port 客户端端口信息
this.port = props.getProperty(ShenyuClientConstants.PORT);
// 开始事件发布
publisher.start(shenyuClientRegisterRepository);
}

// 当有上下文刷新事件ContextRefreshedEvent发生时,该方法会执行
@Override
public void onApplicationEvent(@NonNull final ContextRefreshedEvent event) {
//保证该方法的内容只执行一次
if (!registered.compareAndSet(false, true)) {
return;
}
final ApplicationContext context = event.getApplicationContext();
// 获取声明RPC调用的类
Map<String, T> beans = getBeans(context);
if (MapUtils.isEmpty(beans)) {
return;
}
// 构建URI数据并注册
publisher.publishEvent(buildURIRegisterDTO(context, beans));
// 构建元数据并注册
beans.forEach(this::handle);
}

// 交给不同的子类实现
@SuppressWarnings("all")
protected abstract URIRegisterDTO buildURIRegisterDTO(ApplicationContext context,
Map<String, T> beans);


protected void handle(final String beanName, final T bean) {
Class<?> clazz = getCorrectedClass(bean);
// 获取当前bean的对应shenyu客户端的注解(对应不同的RPC调用注解不一样,像http的就是@ShenyuSpringMvcClient,而像SpringCloud的则是@ShenyuSpringCloudClient)
final A beanShenyuClient = AnnotatedElementUtils.findMergedAnnotation(clazz, getAnnotationType());
// 根据bean获取对应的path(不同子类实现不一样)
final String superPath = buildApiSuperPath(clazz, beanShenyuClient);
// 如果包含Shenyu客户端注解或者path中包括'*',表示注册整个类的接口
if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {
// 构建类的元数据,发送注册事件
handleClass(clazz, bean, beanShenyuClient, superPath);
return;
}
// 获取当前bean的所有方法
final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);
// 遍历方法
for (Method method : methods) {
// 注册符合条件的方法
handleMethod(bean, clazz, beanShenyuClient, method, superPath);
}
}

// 构建类元数据并注册的默认实现
protected void handleClass(final Class<?> clazz,
final T bean,
@NonNull final A beanShenyuClient,
final String superPath) {
publisher.publishEvent(buildMetaDataDTO(bean, beanShenyuClient, pathJoin(contextPath, superPath), clazz, null));
}

// 构建方法元数据并注册的默认实现
protected void handleMethod(final T bean,
final Class<?> clazz,
@Nullable final A beanShenyuClient,
final Method method,
final String superPath) {
// 如果方法上有Shenyu客户端注解,就表示该方法需要注册
A methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, getAnnotationType());
if (Objects.nonNull(methodShenyuClient)) {
// 构建元数据,发送注册事件
publisher.publishEvent(buildMetaDataDTO(bean, methodShenyuClient, buildApiPath(method, superPath, methodShenyuClient), clazz, method));
}
}

// 交给不同子类实现
protected abstract MetaDataRegisterDTO buildMetaDataDTO(T bean,
@NonNull A shenyuClient,
String path,
Class<?> clazz,
Method method);
}

在构造函数中主要是读取属性配置。

shenyu:
client:
http:
props:
contextPath: /http
appName: http
port: 8189
isFull: false

最后,执行了publisher.start(),开始事件发布,为注册做准备。

  • ShenyuClientRegisterEventPublisher

ShenyuClientRegisterEventPublisher通过单例模式实现,主要是生成元数据和URI订阅器(后续用于数据发布),然后启动Disruptor队列。提供了一个共有方法publishEvent(),发布事件,向Disruptor队列发数据。


public class ShenyuClientRegisterEventPublisher {
// 私有变量
private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();

private DisruptorProviderManage<DataTypeParent> providerManage;

/**
* 公开静态方法
*
* @return ShenyuClientRegisterEventPublisher instance
*/
public static ShenyuClientRegisterEventPublisher getInstance() {
return INSTANCE;
}

/**
* Start方法执行
*
* @param shenyuClientRegisterRepository shenyuClientRegisterRepository
*/
public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
// 创建客户端注册工厂类
RegisterClientExecutorFactory factory = new RegisterClientExecutorFactory();
// 添加元数据订阅器
factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));
// 添加URI订阅器
factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));
// 启动Disruptor队列
providerManage = new DisruptorProviderManage(factory);
providerManage.startup();
}

/**
* 发布事件,向Disruptor队列发数据
*
* @param data the data
*/
public <T> void publishEvent(final DataTypeParent data) {
DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
provider.onData(data);
}
}

AbstractContextRefreshedEventListener的构造函数逻辑分析完成了,主要是读取属性配置,创建元数据URI订阅器,启动Disruptor队列。

onApplicationEvent()方法是有Spring事件发生时会执行,这里的参数是ContextRefreshedEvent,表示上下文刷新事件。当Spring容器就绪后执行此处逻辑:先构建URI数据并注册,再构建元数据并注册,

ContextRefreshedEventSpring内置事件。ApplicationContext被初始化或刷新时,该事件被触发。这也可以在 ConfigurableApplicationContext接口中使用 refresh() 方法来发生。此处的初始化是指:所有的Bean被成功装载,后处理Bean被检测并激活,所有Singleton Bean 被预实例化,ApplicationContext容器已就绪可用。

再来看AbstractContextRefreshedEventListener的http实现SpringMvcClientEventListener

public class SpringMvcClientEventListener extends AbstractContextRefreshedEventListener<Object, ShenyuSpringMvcClient> {

private final List<Class<? extends Annotation>> mappingAnnotation = new ArrayList<>(3);

private final Boolean isFull;

private final String protocol;

// 构造函数
public SpringMvcClientEventListener(final PropertiesConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
super(clientConfig, shenyuClientRegisterRepository);
Properties props = clientConfig.getProps();
// 获取 isFull
this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
// 表示是http协议的实现
this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);
mappingAnnotation.add(ShenyuSpringMvcClient.class);
mappingAnnotation.add(RequestMapping.class);
}

@Override
protected Map<String, Object> getBeans(final ApplicationContext context) {
// 配置属性,如果 isFull=true 的话,表示注册整个微服务
if (Boolean.TRUE.equals(isFull)) {
getPublisher().publishEvent(MetaDataRegisterDTO.builder()
.contextPath(getContextPath())
.appName(getAppName())
.path(PathUtils.decoratorPathWithSlash(getContextPath()))
.rpcType(RpcTypeEnum.HTTP.getName())
.enabled(true)
.ruleName(getContextPath())
.build());
return null;
}
// 否则获取带Controller注解的bean
return context.getBeansWithAnnotation(Controller.class);
}

// 构造URI数据
@Override
protected URIRegisterDTO buildURIRegisterDTO(final ApplicationContext context,
final Map<String, Object> beans) {
// ...
}

@Override
protected String buildApiSuperPath(final Class<?> clazz, @Nullable final ShenyuSpringMvcClient beanShenyuClient) {
// 如果有带上Shenyu客户端注解,则优先取注解中的不为空的path属性
if (Objects.nonNull(beanShenyuClient) && StringUtils.isNotBlank(beanShenyuClient.path())) {
return beanShenyuClient.path();
}
// 如果有带上RequestMapping注解,且path属性不为空,则返回path数组的第一个值
RequestMapping requestMapping = AnnotationUtils.findAnnotation(clazz, RequestMapping.class);
if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {
return requestMapping.path()[0];
}
return "";
}

// 声明http实现的客户端注解是ShenyuSpringMvcClient
@Override
protected Class<ShenyuSpringMvcClient> getAnnotationType() {
return ShenyuSpringMvcClient.class;
}

@Override
protected void handleMethod(final Object bean, final Class<?> clazz,
@Nullable final ShenyuSpringMvcClient beanShenyuClient,
final Method method, final String superPath) {
// 获取当前bean的RequestMapping注解
final RequestMapping requestMapping = AnnotatedElementUtils.findMergedAnnotation(method, RequestMapping.class);
// 获取当前bean的 ShenyuSpringMvcClient 注解
ShenyuSpringMvcClient methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, ShenyuSpringMvcClient.class);
methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;
//如果有 ShenyuSpringMvcClient 注解并且包含RequestMapping注解(表示是一个接口),则进行注册
if (Objects.nonNull(methodShenyuClient) && Objects.nonNull(requestMapping)) {
getPublisher().publishEvent(buildMetaDataDTO(bean, methodShenyuClient, buildApiPath(method, superPath, methodShenyuClient), clazz, method));
}
}

//...

// 构造元数据
@Override
protected MetaDataRegisterDTO buildMetaDataDTO(final Object bean,
@NonNull final ShenyuSpringMvcClient shenyuClient,
final String path, final Class<?> clazz,
final Method method) {
//...
}
}

注册逻辑都是通过 publisher.publishEvent()完成。

Controller注解和RequestMapping注解是由Spring提供的,这个大家应该很熟悉,不过多赘述。ShenyuSpringMvcClient 注解是由Apache ShenYu提供的,用于注册SpringMvc客户端,它的定义如下:


/**
* shenyu 客户端接口,用于方法上或类上
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ShenyuSpringMvcClient {

// path 注册路径
@AliasFor(attribute = "path")
String value() default "";

// path 注册路径
@AliasFor(attribute = "value")
String path();

// ruleName 规则名称
String ruleName() default "";

// desc 描述信息
String desc() default "";

// enabled是否启用
boolean enabled() default true;

// registerMetaData 注册元数据
boolean registerMetaData() default false;
}

它的使用如下:

  • 注册整个接口
@RestController
@RequestMapping("/test")
@ShenyuSpringMvcClient(path = "/test/**") // 表示整个接口注册
public class HttpTestController {
//......
}
  • 注册当前方法
@RestController
@RequestMapping("/order")
@ShenyuSpringMvcClient(path = "/order")
public class OrderController {

/**
* Save order dto.
*
* @param orderDTO the order dto
* @return the order dto
*/
@PostMapping("/save")
@ShenyuSpringMvcClient(path = "/save", desc = "Save order") // 注册当前方法
public OrderDTO save(@RequestBody final OrderDTO orderDTO) {
orderDTO.setName("hello world save order");
return orderDTO;
}
}
  • publisher.publishEvent() 发布注册事件

该方法会将数据发送到Disruptor队列中,关于Disruptor队列更多细节这里不做更多介绍,这不影响分析注册的流程。

当数据发送后,Disruptor队列的消费者会处理数据,进行消费。

  • QueueConsumer 消费数据

QueueConsumer是一个消费者,它实现了WorkHandler接口,它的创建过程在providerManage.startup()逻辑中。WorkHandler接口是disruptor的数据消费接口,只有一个方法是onEvent()

package com.lmax.disruptor;

public interface WorkHandler<T> {
void onEvent(T event) throws Exception;
}

QueueConsumer重写了onEvent()方法,主要逻辑是生成消费任务,然后在线程池中去执行。


/**
*
* 队列消费者
*/
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {

// 省略了其他逻辑

@Override
public void onEvent(final DataEvent<T> t) {
if (t != null) {
// 根据事件类型使用不同的线程池
ThreadPoolExecutor executor = orderly(t);
// 通过工厂创建队列消费任务
QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
// 保存数据
queueConsumerExecutor.setData(t.getData());
// help gc
t.setData(null);
// 放在线程池中执行 消费任务
executor.execute(queueConsumerExecutor);
}
}
}

QueueConsumerExecutor是在线程池中被执行的任务,它实现了Runnable接口,具体的实现类有两个:

  • RegisterClientConsumerExecutor:客户端消费者执行器;
  • RegisterServerConsumerExecutor:服务端消费者执行器。

顾名思义,一个负责处理客户端任务,一个负责处理服务端任务(服务端就是admin,在下文进行分析)。

  • RegisterClientConsumerExecutor 消费者执行器

重写的run()逻辑如下:


public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {

//......

@Override
public void run() {
// 获取数据
final T data = getData();
// 根据数据类型调用相应的处理器进行处理
subscribers.get(data.getType()).executor(Lists.newArrayList(data));
}

}

根据不同的数据类型调用不同的处理器去执行相应的任务。数据类型有两种,一个是元数据,记录客户端注册信息。一个是URI数据,记录客户端服务信息。

//数据类型
public enum DataType {
// 元数据
META_DATA,

// URI数据
URI,
}
  • ExecutorSubscriber#executor() 执行器订阅者

执行器订阅者也分为两类,一个是处理元数据,一个是处理URI。在客户端和服务端分别有两个,所以一共是四个。

先看元数据处理

  • ShenyuClientMetadataExecutorSubscriber#executor()

客户端这边对元数据处理逻辑是:遍历元数据信息,调用接口方法persistInterface()完成数据的发布。

public class ShenyuClientMetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {

//......

@Override
public DataType getType() {
return DataType.META_DATA; // 元数据
}

@Override
public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {
// 调用接口方法persistInterface()完成数据的发布
shenyuClientRegisterRepository.persistInterface(metaDataRegisterDTO);
}
}
}
  • ShenyuClientRegisterRepository#persistInterface()

ShenyuClientRegisterRepository是一个接口,用于表示客户端数据注册,它的实现类目前有五种,每一种就表示一种注册方法。

  • ConsulClientRegisterRepository:通过Consul实现客户端注册;
  • EtcdClientRegisterRepository:通过Etcd实现客户端注册;
  • HttpClientRegisterRepository:通过Http实现客户端注册;
  • NacosClientRegisterRepository:通过Nacos实现客户端注册;
  • ZookeeperClientRegisterRepository:通过Zookeeper实现客户端注册;

从图中可以看出,注册中心的加载是通过SPI的方式完成的。这个在前面提到过了,在客户端通用配置文件中,通过指定配置文件中的属性完成具体的类加载。


/**
* 加载 ShenyuClientRegisterRepository
*/
public final class ShenyuClientRegisterRepositoryFactory {

private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();

/**
* 创建 ShenyuClientRegisterRepository
*/
public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
// 通过SPI的方式进行加载,类型由registerType决定
ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
//执行初始化操作
result.init(shenyuRegisterCenterConfig);
ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
return result;
}
return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
}
}

本文的源码分析是基于Http的方式进行注册,所以我们先分析HttpClientRegisterRepository,其他的注册方式后续再分析。HttpClientRegisterRepository继承了FailbackRegistryRepository,而FailbackRegistryRepository本身主要用于对Http注册过程中的失败异常的处理,这里就省略了。

通过http的方式注册很简单,就是调用工具类发送http请求。注册元数据和URI都是调用的同一个方法doRegister(),指定接口和类型就好。

  • Constants.URI_PATH的值/shenyu-client/register-metadata:服务端提供的接口用于注册元数据。
  • Constants.META_PATH的值/shenyu-client/register-uri: 服务端提供的接口用于注册URI。
@Join
public class HttpClientRegisterRepository extends FailbackRegistryRepository {

private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientRegisterRepository.class);

private static URIRegisterDTO uriRegisterDTO;

private String username;

private String password;

private List<String> serverList;

private String accessToken;

public HttpClientRegisterRepository() {
}

public HttpClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
init(config);
}

@Override
public void init(final ShenyuRegisterCenterConfig config) {
// admin的用户名
this.username = config.getProps().getProperty(Constants.USER_NAME);
// admin的用户名对应的密码
this.password = config.getProps().getProperty(Constants.PASS_WORD);
// admin服务列表
this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));
// 设置访问的token
this.setAccessToken();
}

/**
* Persist uri.
*
* @param registerDTO the register dto
*/
@Override
public void doPersistURI(final URIRegisterDTO registerDTO) {
if (RuntimeUtils.listenByOther(registerDTO.getPort())) {
return;
}
doRegister(registerDTO, Constants.URI_PATH, Constants.URI);
uriRegisterDTO = registerDTO;
}

@Override
public void doPersistInterface(final MetaDataRegisterDTO metadata) {
doRegister(metadata, Constants.META_PATH, Constants.META_TYPE);
}

@Override
public void close() {
if (uriRegisterDTO != null) {
uriRegisterDTO.setEventType(EventType.DELETED);
doRegister(uriRegisterDTO, Constants.URI_PATH, Constants.URI);
}
}

private void setAccessToken() {
for (String server : serverList) {
try {
Optional<?> login = RegisterUtils.doLogin(username, password, server.concat(Constants.LOGIN_PATH));
login.ifPresent(v -> this.accessToken = String.valueOf(v));
} catch (Exception e) {
LOGGER.error("Login admin url :{} is fail, will retry. cause: {} ", server, e.getMessage());
}
}
}

private <T> void doRegister(final T t, final String path, final String type) {
int i = 0;
// 遍历admin服务列表(admin可能是集群)
for (String server : serverList) {
i++;
String concat = server.concat(path);
try {
// 设置访问token
if (StringUtils.isBlank(accessToken)) {
this.setAccessToken();
if (StringUtils.isBlank(accessToken)) {
throw new NullPointerException("accessToken is null");
}
}
// 调用工具类发送 http 请求
RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), concat, type, accessToken);
return;
} catch (Exception e) {
LOGGER.error("Register admin url :{} is fail, will retry. cause:{}", server, e.getMessage());
if (i == serverList.size()) {
throw new RuntimeException(e);
}
}
}
}
}

将数据序列化后,通过OkHttp发送数据。


public final class RegisterUtils {

//......

// 通过OkHttp发送数据
public static void doRegister(final String json, final String url, final String type) throws IOException {
if (!StringUtils.hasLength(accessToken)) {
LOGGER.error("{} client register error accessToken is null, please check the config : {} ", type, json);
return;
}
Headers headers = new Headers.Builder().add(Constants.X_ACCESS_TOKEN, accessToken).build();
String result = OkHttpTools.getInstance().post(url, json, headers);
if (Objects.equals(SUCCESS, result)) {
LOGGER.info("{} client register success: {} ", type, json);
} else {
LOGGER.error("{} client register error: {} ", type, json);
}
}
}

至此,客户端通过http的方式注册元数据的逻辑就分析完了。小结一下:通过读取自定义的注解信息构造元数据,将数据发到Disruptor队列,然后从队列中消费数据,将消费者放到线程池中去执行,最终通过发送http请求到admin

再来看看 URI 数据的处理

  • ShenyuClientURIExecutorSubscriber#executor()

主要逻辑是遍历URI数据集合,通过persistURI()方法实现数据注册。


public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {

//......

@Override
public DataType getType() {
return DataType.URI; //数据类型是URI
}

// 注册URI数据
@Override
public void executor(final Collection<URIRegisterDTO> dataList) {
for (URIRegisterDTO uriRegisterDTO : dataList) {
Stopwatch stopwatch = Stopwatch.createStarted();
while (true) {
try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {
break;
} catch (IOException e) {
long sleepTime = 1000;
// maybe the port is delay exposed
if (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {
LOG.error("host:{}, port:{} connection failed, will retry",
uriRegisterDTO.getHost(), uriRegisterDTO.getPort());
// If the connection fails for a long time, Increase sleep time
if (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {
sleepTime = 10000;
}
}
try {
TimeUnit.MILLISECONDS.sleep(sleepTime);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
//添加hook,优雅停止客户端
ShenyuClientShutdownHook.delayOtherHooks();

// 注册URI
shenyuClientRegisterRepository.persistURI(uriRegisterDTO);
}
}
}

代码中的while(true)循环是为了保证客户端已经成功启动了,通过hostport可以连接上。

后面的逻辑是:添加hook函数,用于优雅停止客户端 。

通过persistURI()方法实现数据注册。整个逻辑也在前面分析过了,最终就是通过OkHttp客户端向shenyu-admin发起http,通过http的方式注册URI

分析到这里就将客户端的注册逻辑分析完了,将构建的元数据和URI数据发送到Disruptor队列,再从中消费,读取数据,通过httpadmin发送数据。

客户端元数据URI注册流程的源码分析完成了,流程图如下:

3. 服务端注册流程

3.1 注册接口ShenyuClientHttpRegistryController

从前面的分析可以知道,服务端提供了注册的两个接口:

  • /shenyu-client/register-metadata:服务端提供的接口用于注册元数据。
  • /shenyu-client/register-uri: 服务端提供的接口用于注册URI。

这两个接口位于ShenyuClientHttpRegistryController中,它实现了ShenyuClientServerRegisterRepository接口,是服务端注册的实现类。它用@Join标记,表示通过SPI进行加载。

// shenuyu客户端接口
@RequestMapping("/shenyu-client")
@Join
public class ShenyuClientHttpRegistryController implements ShenyuClientServerRegisterRepository {

private ShenyuClientServerRegisterPublisher publisher;

@Override
public void init(final ShenyuClientServerRegisterPublisher publisher, final ShenyuRegisterCenterConfig config) {
this.publisher = publisher;
}

@Override
public void close() {
publisher.close();
}

// 注册元数据
@PostMapping("/register-metadata")
@ResponseBody
public String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {
publisher.publish(metaDataRegisterDTO);
return ShenyuResultMessage.SUCCESS;
}

// 注册URI
@PostMapping("/register-uri")
@ResponseBody
public String registerURI(@RequestBody final URIRegisterDTO uriRegisterDTO) {
publisher.publish(uriRegisterDTO);
return ShenyuResultMessage.SUCCESS;
}
}

两个注册接口获取到数据好,就调用了publisher.publish()方法,把数据发布到Disruptor队列中。

  • ShenyuClientServerRegisterRepository接口

ShenyuClientServerRegisterRepository接口是服务注册接口,它有五个实现类,表示有五种注册方式:

  • ConsulClientServerRegisterRepository:通过Consul实现注册;
  • EtcdClientServerRegisterRepository:通过Etcd实现注册;
  • NacosClientServerRegisterRepository:通过Nacos实现注册;
  • ShenyuClientHttpRegistryController:通过Http实现注册;
  • ZookeeperClientServerRegisterRepository:通过Zookeeper实现注册。

具体用哪一种方式,是通过配置文件指定的,然后通过SPI进行加载。

shenyu-admin中的application.yml文件中配置注册方式,registerType指定注册类型,当用http进行注册时,serverLists不需要填写,更多配置说明可以参考官网 客户端接入配置

shenyu:
register:
registerType: http
serverLists:
  • RegisterCenterConfiguration 加载配置

在引入相关依赖和属性配置后,启动shenyu-admin时,会先加载配置文件,和注册中心相关的配置文件类是RegisterCenterConfiguration

// 注册中心配置类
@Configuration
public class RegisterCenterConfiguration {
// 读取配置属性
@Bean
@ConfigurationProperties(prefix = "shenyu.register")
public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
return new ShenyuRegisterCenterConfig();
}

//创建ShenyuServerRegisterRepository,用于服务端注册
@Bean(destroyMethod = "close")
public ShenyuServerRegisterRepository shenyuServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig, final List<ShenyuClientRegisterService> shenyuClientRegisterService) {
// 1.从配置属性中获取注册类型
String registerType = shenyuRegisterCenterConfig.getRegisterType();
// 2.通过注册类型,以SPI的方法加载实现类
ShenyuClientServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuClientServerRegisterRepository.class).getJoin(registerType);
// 3.获取publisher,向Disruptor队列中写数据
RegisterClientServerDisruptorPublisher publisher = RegisterClientServerDisruptorPublisher.getInstance();
// 4.注册Service, rpcType -> registerService
Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, e -> e));
// 5.事件发布的准备工作
publisher.start(registerServiceMap);
// 6.注册的初始化操作
registerRepository.init(publisher, shenyuRegisterCenterConfig);
return registerRepository;
}
}

在配置类中生成了两个bean

  • shenyuRegisterCenterConfig:读取属性配置;

  • shenyuClientServerRegisterRepository:用于服务端注册。

在创建shenyuClientServerRegisterRepository的过程中,也进行了一系列的准备工作:

  • 1.从配置属性中获取注册类型。

  • 2.通过注册类型,以SPI的方法加载实现类:比如指定的类型是http,就会加载ShenyuClientHttpRegistryController

  • 3.获取publisher,向Disruptor队列中写数据。

  • 4.注册ServicerpcType -> registerService:获取注册的Service,每种rpc都有对应的Service。本文的客户端构建是通过springboot,属于http类型,还有其他客户端类型:dubboSpring CloudgRPC等。

  • 5.事件发布的准备工作:添加服务端元数据和URI订阅器,处理数据。并且启动Disruptor队列。

  • 6.注册的初始化操作:http类型的注册初始化操作就是保存publisher

  • RegisterServerDisruptorPublisher#publish()

服务端向Disruptor队列写入数据的发布者 ,通过单例模式构建。


public class RegisterClientServerDisruptorPublisher implements ShenyuClientServerRegisterPublisher {
//私有属性
private static final RegisterClientServerDisruptorPublisher INSTANCE = new RegisterClientServerDisruptorPublisher();

private DisruptorProviderManage<Collection<DataTypeParent>> providerManage;

//公开静态方法获取实例
public static RegisterServerDisruptorPublisher getInstance() {
return INSTANCE;
}

//事件发布的准备工作,添加服务端元数据和URI订阅器,处理数据。并且启动Disruptor队列。
public void start(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {
//服务端注册工厂
RegisterServerExecutorFactory factory = new RegisterServerExecutorFactory();
//添加URI数据订阅器
factory.addSubscribers(new URIRegisterExecutorSubscriber(shenyuClientRegisterService));
//添加元数据订阅器
factory.addSubscribers(new MetadataExecutorSubscriber(shenyuClientRegisterService));
//启动Disruptor队列
providerManage = new DisruptorProviderManage(factory);
providerManage.startup();
}

// 向队列中写入数据
@Override
public void publish(final DataTypeParent data) {
DisruptorProvider<Collection<DataTypeParent>> provider = providerManage.getProvider();
provider.onData(Collections.singleton(data));
}

// 批量向队列中写入数据
@Override
public void publish(final Collection<? extends DataTypeParent> dataList) {
DisruptorProvider<Collection<DataTypeParent>> provider = providerManage.getProvider();
provider.onData(dataList.stream().map(DataTypeParent.class::cast).collect(Collectors.toList()));
}

@Override
public void close() {
providerManage.getProvider().shutdown();
}
}

配置文件的加载,可看作是注册中心服务端初始化流程,用图描述如下:

3.2 消费数据QueueConsumer

在前面分析了客户端disruptor队列消费数据的过。服务端也是一样的逻辑,只是其中执行任务的执行者变了。

QueueConsumer是一个消费者,它实现了WorkHandler接口,它的创建过程在providerManage.startup()逻辑中。WorkHandler接口是disruptor的数据消费接口,只有一个方法是onEvent()

package com.lmax.disruptor;

public interface WorkHandler<T> {
void onEvent(T event) throws Exception;
}

QueueConsumer重写了onEvent()方法,主要逻辑是生成消费任务,然后在线程池中去执行。

/**
*
* 队列消费者
*/
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {

// 省略了其他逻辑

@Override
public void onEvent(final DataEvent<T> t) {
if (t != null) {
// 根据事件类型获取相应的线程池
ThreadPoolExecutor executor = orderly(t);
// 通过工厂创建队列消费任务
QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
// 保存数据
queueConsumerExecutor.setData(t.getData());
// help gc
t.setData(null);
// 放在线程池中执行 消费任务
executor.execute(queueConsumerExecutor);
}
}
}

QueueConsumerExecutor是在线程池中被执行的任务,它实现了Runnable接口,具体的实现类有两个:

  • RegisterClientConsumerExecutor:客户端消费者执行器;
  • RegisterServerConsumerExecutor:服务端消费者执行器。

顾名思义,一个负责处理客户端任务,一个负责处理服务端任务。

  • RegisterServerConsumerExecutor#run()

RegisterServerConsumerExecutor是服务端消费者执行器,它通过QueueConsumerExecutor间接实现了Runnable接口,并重写了run()方法。


public final class RegisterServerConsumerExecutor extends QueueConsumerExecutor<Collection<DataTypeParent>> {
// ...

@Override
public void run() {
//获取从disruptor队列中拿到的数据
Collection<DataTypeParent> results = getData()
.stream()
.filter(this::isValidData)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(results)) {
return;
}
//根据类型执行操作
selectExecutor(results).executor(results);
}

// 根据类型获取订阅者
private ExecutorSubscriber<DataTypeParent> selectExecutor(final Collection<DataTypeParent> list) {
final Optional<DataTypeParent> first = list.stream().findFirst();
return subscribers.get(first.orElseThrow(() -> new RuntimeException("the data type is not found")).getType());
}
}

  • ExecutorSubscriber#executor()

执行器订阅者分为两类,一个是处理元数据,一个是处理URI。在客户端和服务端分别有两个,所以一共是四个。

  • MetadataExecutorSubscriber#executor()

如果是注册元数据,则通过MetadataExecutorSubscriber#executor()实现:根据类型获取注册Service,调用register()

public class MetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {

//......

@Override
public DataType getType() {
return DataType.META_DATA; // 元数据类型
}

@Override
public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
// 遍历元数据列表
metaDataRegisterDTOList.forEach(meta -> {
Optional.ofNullable(this.shenyuClientRegisterService.get(meta.getRpcType())) // 根据类型获取注册Service
.ifPresent(shenyuClientRegisterService -> {
// 对元数据进行注册,加锁确保顺序执行,防止并发错误
synchronized (shenyuClientRegisterService) {
shenyuClientRegisterService.register(meta);
}
});
});
}
}
  • URIRegisterExecutorSubscriber#executor()

如果是注册元数据,则通过URIRegisterExecutorSubscriber#executor()实现:构建URI数据,根据注册类型查找Service,通过registerURI方法实现注册。


public class URIRegisterExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {
//......

@Override
public DataType getType() {
return DataType.URI; // URI数据类型
}

@Override
public void executor(final Collection<URIRegisterDTO> dataList) {
if (CollectionUtils.isEmpty(dataList)) {
return;
}
// 根据rpc调用类型聚集数据
final Map<String, List<URIRegisterDTO>> groupByRpcType = dataList.stream()
.filter(data -> StringUtils.isNotBlank(data.getRpcType()))
.collect(Collectors.groupingBy(URIRegisterDTO::getRpcType));
for (Map.Entry<String, List<URIRegisterDTO>> entry : groupByRpcType.entrySet()) {
final String rpcType = entry.getKey();
// 根据类型查找Service
Optional.ofNullable(shenyuClientRegisterService.get(rpcType))
.ifPresent(service -> {
final List<URIRegisterDTO> list = entry.getValue();
// 构建URI数据类型,通过registerURI方法实现注册
Map<String, List<URIRegisterDTO>> listMap = buildData(list);
listMap.forEach(service::registerURI);
});
}
}
}

  • ShenyuClientRegisterService#register()

ShenyuClientRegisterService是注册方法接口,它有多个实现类:

  • AbstractContextPathRegisterService:抽象类,处理部分公共逻辑;
  • AbstractShenyuClientRegisterServiceImpl::抽象类,处理部分公共逻辑;
  • ShenyuClientRegisterDivideServiceImpldivide类,处理http注册类型;
  • ShenyuClientRegisterDubboServiceImpldubbo类,处理dubbo注册类型;
  • ShenyuClientRegisterGrpcServiceImplgRPC类,处理gRPC注册类型;
  • ShenyuClientRegisterMotanServiceImplMotan类,处理Motan注册类型;
  • ShenyuClientRegisterSofaServiceImplSofa类,处理Sofa注册类型;
  • ShenyuClientRegisterSpringCloudServiceImplSpringCloud类,处理SpringCloud注册类型;
  • ShenyuClientRegisterTarsServiceImplTars类,处理Tars注册类型;
  • ShenyuClientRegisterWebSocketServiceImplWebsocket类,处理Websocket注册类型;

从上面可以看出每种微服务都有对应的注册实现类,本文的源码分析是 以官方提供的 shenyu-examples-http 为例,是属http注册类型,所以元数据和URI数据的注册实现类是 ShenyuClientRegisterDivideServiceImpl

  • register(): 注册元数据
public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {

//......

public String register(final MetaDataRegisterDTO dto) {
// 1.注册选择器信息
String selectorHandler = selectorHandler(dto);
String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
// 2.注册规则信息
String ruleHandler = ruleHandler();
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
ruleService.registerDefault(ruleDTO);
// 3.注册元数据信息
registerMetadata(dto);
// 4.注册contextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
}

整个注册逻辑可以分为4个步骤:

  • 1.注册选择器信息
  • 2.注册规则信息
  • 3.注册元数据信息
  • 4.注册contextPath

admin这一侧通过客户端的元数据信息需要构建选择器、规则、元数据和ContextPath。具体的注册过程和细节处理跟rpc类型有关。我们就不再继续向下追踪了,对于注册中心的逻辑分析,跟踪到这里就够了。

服务端元数据注册流程的源码分析完了,流程图描述如下:

  • registerURI(): 注册URI数据
public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {

//......

public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {
if (CollectionUtils.isEmpty(uriList)) {
return "";
}
// 对应的选择器是否存在
SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
if (Objects.isNull(selectorDO)) {
throw new ShenyuException("doRegister Failed to execute,wait to retry.");
}
List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());
// 处理选择器中的handler信息
String handler = buildHandle(validUriList, selectorDO);
if (handler != null) {
selectorDO.setHandle(handler);
SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
selectorData.setHandle(handler);
// 更新数据库中的记录
selectorService.updateSelective(selectorDO);
// 发布事件
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
}
return ShenyuResultMessage.SUCCESS;
}
}

admin拿到URI数据后,主要是更新选择器中的handler信息,然后写入到数据库,最后发布事件通知网关。通知网关的逻辑是由数据同步操作完成,这在之前的文章中已经分析过了,就不再赘述。

服务端URI注册流程的源码分析完成了,用图描述如下:

至此,服务端注册流程也就分析完了,主要通过对外提供的接口,接受客户端的注册信息,然后写入到Disruptor队列,再从中消费数据,根据接收到的元数据和URI数据更新admin的选择器、规则、元数据和选择器的handler

4. 总结

本文主要对Apache ShenYu网关中的http注册模块进行了源码分析。涉及到的主要知识点,归纳如下:

  • 注册中心是为了将客户端信息注册到admin,方便流量筛选;
  • http注册是将客户端元数据信息和URI信息注册到admin
  • http服务的接入通过注解@ShenyuSpringMvcClient标识;
  • 注册信息的构建主要通过Spring应用监听器ApplicationListener
  • 注册类型的加载通过SPI完成;
  • 引入Disruptor队列是为了数据与操作解耦,以及数据缓冲。
  • 注册中心的实现采用了面向接口编程,使用模板方法、单例、观察者等设计模式。