Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的
API网关。
在ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu 网关当前支持ZooKeeper、WebSocket、Http长轮询、Nacos 、Etcd 和 Consul 进行数据同步。本文的主要内容是基于Http长轮询的数据同步源码分析。
本文基于
shenyu-2.5.0版本进行源码分析,官网的介绍请参考 数据同步原理 。
1. Http长轮询
这里直接引用官网的相关描述:
Zookeeper和WebSocket数据同步的机制比较简单,而Http长轮询则比较复杂。Apache ShenYu借鉴了Apollo、Nacos的设计思想,取其精华,自己实现了Http长轮询数据同步功能。注意,这里并非传统的ajax长轮询!

Http长轮询 机制如上所示,Apache ShenYu网关主动请求 shenyu-admin 的配置服务,读取超时时间为 90s,意味着网关层请求配置服务最多会等待 90s,这样便于 shenyu-admin 配置服务及时响应变更数据,从而实现准实时推送。
Http长轮询 机制是由网关主动请求 shenyu-admin ,所以这次的源码分析,我们从网关这一侧开始。
2. 网关数据同步
2.1 加载配置
Http长轮询 数据同步配置的加载是通过spring boot的starter机制,当我们引入相关依赖和在配置文件中有如下配置时,就会加载。
在pom文件中引入依赖:
<!--shenyu data sync start use http-->
<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-spring-boot-starter-sync-data-http</artifactId>
<version>${project.version}</version>
</dependency>
在application.yml配置文件中添加配置:
shenyu:
sync:
http:
url : http://localhost:9095
当网关启动时,配置类HttpSyncDataConfiguration就会执行,加载相应的Bean。
/**
* Http sync data configuration for spring boot.
*/
@Configuration
@ConditionalOnClass(HttpSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url")
@EnableConfigurationProperties(value = HttpConfig.class)
public class HttpSyncDataConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpSyncDataConfiguration.class);
/**
* Rest template.
* 创建RestTemplate
* @param httpConfig the http config http配置
* @return the rest template
*/
@Bean
public RestTemplate restTemplate(final HttpConfig httpConfig) {
OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
factory.setConnectTimeout(Objects.isNull(httpConfig.getConnectionTimeout()) ? (int) HttpConstants.CLIENT_POLLING_CONNECT_TIMEOUT : httpConfig.getConnectionTimeout());
factory.setReadTimeout(Objects.isNull(httpConfig.getReadTimeout()) ? (int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT : httpConfig.getReadTimeout());
factory.setWriteTimeout(Objects.isNull(httpConfig.getWriteTimeout()) ? (int) HttpConstants.CLIENT_POLLING_WRITE_TIMEOUT : httpConfig.getWriteTimeout());
return new RestTemplate(factory);
}
/**
* AccessTokenManager.
* 创建AccessTokenManager,专门用户对admin进行http请求时access token的处理
* @param httpConfig the http config.
* @param restTemplate the rest template.
* @return the access token manager.
*/
@Bean
public AccessTokenManager accessTokenManager(final HttpConfig httpConfig, final RestTemplate restTemplate) {
return new AccessTokenManager(restTemplate, httpConfig);
}
/**
* Http sync data service.
* 创建 HttpSyncDataService
* @param httpConfig the http config
* @param pluginSubscriber the plugin subscriber
* @param restTemplate the rest template
* @param metaSubscribers the meta subscribers
* @param authSubscribers the auth subscribers
* @param accessTokenManager the access token manager
* @return the sync data service
*/
@Bean
public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig,
final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<RestTemplate> restTemplate,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
final ObjectProvider<List<AuthDataSubscriber>> authSubscribers,
final ObjectProvider<AccessTokenManager> accessTokenManager) {
LOGGER.info("you use http long pull sync shenyu data");
return new HttpSyncDataService(
Objects.requireNonNull(httpConfig.getIfAvailable()),
Objects.requireNonNull(pluginSubscriber.getIfAvailable()),
Objects.requireNonNull(restTemplate.getIfAvailable()),
metaSubscribers.getIfAvailable(Collections::emptyList),
authSubscribers.getIfAvailable(Collections::emptyList),
Objects.requireNonNull(accessTokenManager.getIfAvailable())
);
}
}
HttpSyncDataConfiguration是Http长轮询数据同步的配置类,负责创建HttpSyncDataService(负责http数据同步的具体实现)、RestTemplate和AccessTokenManager (负责与adminhttp调用时access token的处理)。它的注解如下:
@Configuration:表示这是一个配置类;@ConditionalOnClass(HttpSyncDataService.class):条件注解,表示要有HttpSyncDataService这个类;@ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url"):条件注解,要有shenyu.sync.http.url这个属性配置。@EnableConfigurationProperties(value = HttpConfig.class):表示让HttpConfig上的注解@ConfigurationProperties(prefix = "shenyu.sync.http")生效,将HttpConfig这个配置类注入Ioc容器中。
2.2 属性初始化
- HttpSyncDataService
在HttpSyncDataService的构造函数中,完成属性初始化。
public class HttpSyncDataService implements SyncDataService {
// 省略了属性字段......
public HttpSyncDataService(final HttpConfig httpConfig,
final PluginDataSubscriber pluginDataSubscriber,
final RestTemplate restTemplate,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers,
final AccessTokenManager accessTokenManager) {
// 1.设置accessTokenManager
this.accessTokenManager = accessTokenManager;
// 2.创建数据处理器
this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
// 3.shenyu-admin的url, 多个用逗号(,)分割
this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
// 4.只用于http长轮询的restTemplate
this.restTemplate = restTemplate;
// 5.开始执行长轮询任务
this.start();
}
//......
}
上面代码中省略了其他函数和相关字段,在构造函数中完成属性的初始化,主要是:
-
设置
accessTokenManager,定时向admin请求更新accessToken的值。然后每次向admin发起请求时都必须将header的X-Access-Token属性设置成accessToken对应的值; -
创建数据处理器,用于后续缓存各种类型的数据(插件、选择器、规则、元数据和认证数据);
-
获取
admin属性配置,主要是获取admin的url,admin有可能是集群,多个用逗号(,)分割; -
设置
RestTemplate,用于向admin发起请求; -
开始执行长轮询任务。
2.3 开始长轮询
- HttpSyncDataService#start()
在start()方法中,干了两件事情,一个是获取全量数据,即请求admin