Skip to main content

2 posts tagged with "http"

View All Tags

· 31 min read

Apache ShenYu is an asynchronous, high-performance, cross-language, responsive API gateway.

In ShenYu gateway, data synchronization refers to how to synchronize the updated data to the gateway after the data is sent in the background management system. The Apache ShenYu gateway currently supports data synchronization for ZooKeeper, WebSocket, http long poll, Nacos, etcd and Consul. The main content of this article is based on http long poll data synchronization source code analysis.

This paper based on shenyu-2.5.0 version of the source code analysis, the official website of the introduction of please refer to the Data Synchronization Design .

1. Http Long Polling

Here is a direct quote from the official website with the relevant description.

The mechanism of Zookeeper and WebSocket data synchronization is relatively simple, while Http long polling is more complex. Apache ShenYu borrowed the design ideas of Apollo and Nacos, took their essence, and implemented Http long polling data synchronization function by itself. Note that this is not the traditional ajax long polling!

Http Long Polling mechanism as shown above, Apache ShenYu gateway active request shenyu-admin configuration service, read timeout time is 90s, means that the gateway layer request configuration service will wait at most 90s, so as to facilitate shenyu-admin configuration service timely response to change data, so as to achieve quasi real-time push.

The Http long polling mechanism is initiated by the gateway requesting shenyu-admin, so for this source code analysis, we start from the gateway side.

2. Gateway Data Sync

2.1 Load Configuration

The Http long polling data synchronization configuration is loaded through spring boot starter mechanism when we introduce the relevant dependencies and have the following configuration in the configuration file.

Introduce dependencies in the pom file.

<!--shenyu data sync start use http-->
<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-spring-boot-starter-sync-data-http</artifactId>
<version>${project.version}</version>
</dependency>

Add the following configuration to the application.yml configuration file.

shenyu:
sync:
http:
url : http://localhost:9095

When the gateway is started, the configuration class HttpSyncDataConfiguration is executed, loading the corresponding Bean.


/**
* Http sync data configuration for spring boot.
*/
@Configuration
@ConditionalOnClass(HttpSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url")
@EnableConfigurationProperties(value = HttpConfig.class)
public class HttpSyncDataConfiguration {

private static final Logger LOGGER = LoggerFactory.getLogger(HttpSyncDataConfiguration.class);

/**
* Rest template.
*
* @param httpConfig the http config
* @return the rest template
*/
@Bean
public RestTemplate restTemplate(final HttpConfig httpConfig) {
OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
factory.setConnectTimeout(Objects.isNull(httpConfig.getConnectionTimeout()) ? (int) HttpConstants.CLIENT_POLLING_CONNECT_TIMEOUT : httpConfig.getConnectionTimeout());
factory.setReadTimeout(Objects.isNull(httpConfig.getReadTimeout()) ? (int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT : httpConfig.getReadTimeout());
factory.setWriteTimeout(Objects.isNull(httpConfig.getWriteTimeout()) ? (int) HttpConstants.CLIENT_POLLING_WRITE_TIMEOUT : httpConfig.getWriteTimeout());
return new RestTemplate(factory);
}

/**
* AccessTokenManager.
*
* @param httpConfig the http config.
* @param restTemplate the rest template.
* @return the access token manager.
*/
@Bean
public AccessTokenManager accessTokenManager(final HttpConfig httpConfig, final RestTemplate restTemplate) {
return new AccessTokenManager(restTemplate, httpConfig);
}

/**
* Http sync data service.
*
* @param httpConfig the http config
* @param pluginSubscriber the plugin subscriber
* @param restTemplate the rest template
* @param metaSubscribers the meta subscribers
* @param authSubscribers the auth subscribers
* @param accessTokenManager the access token manager
* @return the sync data service
*/
@Bean
public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig,
final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<RestTemplate> restTemplate,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
final ObjectProvider<List<AuthDataSubscriber>> authSubscribers,
final ObjectProvider<AccessTokenManager> accessTokenManager) {
LOGGER.info("you use http long pull sync shenyu data");
return new HttpSyncDataService(
Objects.requireNonNull(httpConfig.getIfAvailable()),
Objects.requireNonNull(pluginSubscriber.getIfAvailable()),
Objects.requireNonNull(restTemplate.getIfAvailable()),
metaSubscribers.getIfAvailable(Collections::emptyList),
authSubscribers.getIfAvailable(Collections::emptyList),
Objects.requireNonNull(accessTokenManager.getIfAvailable())
);
}
}

HttpSyncDataConfiguration is the configuration class for Http long polling data synchronization, responsible for creating HttpSyncDataService (responsible for the concrete implementation of http data synchronization) 、 RestTemplate and AccessTokenManager (responsible for the access token processing). It is annotated as follows.

  • @Configuration: indicates that this is a configuration class.
  • @ConditionalOnClass(HttpSyncDataService.class): conditional annotation indicating that the class HttpSyncDataService is to be present.
  • @ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url"): conditional annotation to have the property shenyu.sync.http.url configured.
  • @EnableConfigurationProperties(value = HttpConfig.class): indicates that the annotation @ConfigurationProperties(prefix = "shenyu.sync.http") on HttpConfig will take effect, and the configuration class HttpConfig will be injected into the Ioc container.

2.2 Property initialization

  • HttpSyncDataService

In the constructor of HttpSyncDataService, complete the property initialization.

public class HttpSyncDataService implements SyncDataService {

// omitted attribute field ......

public HttpSyncDataService(final HttpConfig httpConfig,
final PluginDataSubscriber pluginDataSubscriber,
final RestTemplate restTemplate,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers,
final AccessTokenManager accessTokenManager) {
// 1. accessTokenManager
this.accessTokenManager = accessTokenManager;
// 2. create data refresh factory
this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
// 3. shenyu-admin url
this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
// 4. restTemplate
this.restTemplate = restTemplate;
// 5. start a long polling task
this.start();
}

//......
}

Other functions and related fields are omitted from the above code, and the initialization of the properties is done in the constructor, mainly.

  • the role of accessTokenManager is to request admin and update the access token regularly.

  • creating data processors for subsequent caching of various types of data (plugins, selectors, rules, metadata and authentication data).

  • obtaining the admin property configuration, mainly to obtain the url of the admin, admin with possible clusters, multiple split by a comma (,).

  • using RestTemplate, for launching requests to admin.

  • Start the long polling task.

2.3 Start the long polling task.

  • HttpSyncDataService#start()

In the start() method, two things are done, one is to get the full amount of data, that is, to request the admin side to get all the data that needs to be synchronized, and then cache the acquired data into the gateway memory. The other is to open a multi-threaded execution of a long polling task.

public class HttpSyncDataService implements SyncDataService {

// ......

private void start() {
// It could be initialized multiple times, so you need to control that.
if (RUNNING.compareAndSet(false, true)) {
// fetch all group configs.
// Initial startup, get full data
this.fetchGroupConfig(ConfigGroupEnum.values());
// one backend service, one thread
int threadSize = serverList.size();
// ThreadPoolExecutor
this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
ShenyuThreadFactory.create("http-long-polling", true));
// start long polling, each server creates a thread to listen for changes.
this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
} else {
LOG.info("shenyu http long polling was started, executor=[{}]", executor);
}
}

// ......
}
2.3.1 Fetch Data
  • HttpSyncDataService#fetchGroupConfig()

ShenYu groups all the data that needs to be synchronized, there are 5 data types, namely plugins, selectors, rules, metadata and authentication data.

public enum ConfigGroupEnum {
APP_AUTH, // app auth data
PLUGIN, // plugin data
RULE, // rule data
SELECTOR, // selector data
META_DATA; // meta data
}

The admin may be a cluster, and here a request is made to each admin in a round-robin fashion, and if one succeeds, then the operation to get the full amount of data from the admin and cache it to the gateway is executed successfully. If there is an exception, the request is launched to the next admin.

public class HttpSyncDataService implements SyncDataService {

// ......

private void fetchGroupConfig(final ConfigGroupEnum... groups) throws ShenyuException {
// It is possible that admins are clustered, and here requests are made to each admin by means of a loop.
for (int index = 0; index < this.serverList.size(); index++) {
String server = serverList.get(index);
try {
// do execute
this.doFetchGroupConfig(server, groups);
// If you have a success, you are successful and can exit the loop
break;
} catch (ShenyuException e) {
// An exception occurs, try executing the next
// The last one also failed to execute, throwing an exception
// no available server, throw exception.
if (index >= serverList.size() - 1) {
throw e;
}
LOG.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
}
}
}

// ......
}
  • HttpSyncDataService#doFetchGroupConfig()

In this method, the request parameters are first assembled, then the request is launched through httpClient to admin to get the data, and finally the obtained data is updated to the gateway memory.

public class HttpSyncDataService implements SyncDataService {

// ......

// Launch a request to the admin backend management system to get all synchronized data
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
// 1. build request parameters, all grouped enumeration types
StringBuilder params = new StringBuilder();
for (ConfigGroupEnum groupKey : groups) {
params.append("groupKeys").append("=").append(groupKey.name()).append("&");
}
// admin url: /configs/fetch
String url = server + Constants.SHENYU_ADMIN_PATH_CONFIGS_FETCH + "?" + StringUtils.removeEnd(params.toString(), "&");
LOG.info("request configs: [{}]", url);
String json;
try {
HttpHeaders headers = new HttpHeaders();
// set accessToken
headers.set(Constants.X_ACCESS_TOKEN, this.accessTokenManager.getAccessToken());
HttpEntity<String> httpEntity = new HttpEntity<>(headers);
// 2. get a request for change data
json = this.restTemplate.exchange(url, HttpMethod.GET, httpEntity, String.class).getBody();
} catch (RestClientException e) {
String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
LOG.warn(message);
throw new ShenyuException(message, e);
}
// update local cache
// 3. Update data in gateway memory
boolean updated = this.updateCacheWithJson(json);
if (updated) {
LOG.debug("get latest configs: [{}]", json);
return;
}
// not updated. it is likely that the current config server has not been updated yet. wait a moment.
LOG.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
// No data update on the server side, just wait 30s
ThreadUtils.sleep(TimeUnit.SECONDS, 30);
}

// ......
}

From the code, we can see that the admin side provides the interface to get the full amount of data is /configs/fetch, so we will not go further here and put it in the later analysis.

If you get the result data from admin and update it successfully, then this method is finished. If there is no successful update, then it is possible that there is no data update on the server side, so wait 30s.

Here you need to explain in advance, the gateway in determining whether the update is successful, there is a comparison of the data operation, immediately mentioned.

  • HttpSyncDataService#updateCacheWithJson()

Update the data in the gateway memory. Use GSON for deserialization, take the real data from the property data and give it to DataRefreshFactory to do the update.

public class HttpSyncDataService implements SyncDataService {

// ......

private boolean updateCacheWithJson(final String json) {
// Using GSON for deserialization
JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
// if the config cache will be updated?
return factory.executor(jsonObject.getAsJsonObject("data"));
}

// ......
}
  • DataRefreshFactory#executor()

Update the data according to different data types and return the updated result. The specific update logic is given to the dataRefresh.refresh() method. In the update result, one of the data types is updated, which means that the operation has been updated.

public final class DataRefreshFactory {

// ......

public boolean executor(final JsonObject data) {
// update data
List<Boolean> result = ENUM_MAP.values().parallelStream()
.map(dataRefresh -> dataRefresh.refresh(data))
.collect(Collectors.toList());
// one of the data types is updated, which means that the operation has been updated.
return result.stream().anyMatch(Boolean.TRUE::equals);
}

// ......
}
  • AbstractDataRefresh#refresh()

The data update logic uses the template method design pattern, where the generic operation is done in the abstract method and the different implementation logic is done by subclasses. 5 data types have some differences in the specific update logic, but there is also a common update logic, and the class diagram relationship is as follows.

In the generic refresh() method, it is responsible for data type conversion, determining whether an update is needed, and the actual data refresh operation.

public abstract class AbstractDataRefresh<T> implements DataRefresh {

// ......

@Override
public Boolean refresh(final JsonObject data) {
// convert data
JsonObject jsonObject = convert(data);
if (Objects.isNull(jsonObject)) {
return false;
}

boolean updated = false;
// get data
ConfigData<T> result = fromJson(jsonObject);
// does it need to be updated
if (this.updateCacheIfNeed(result)) {
updated = true;
// real update logic, data refresh operation
refresh(result.getData());
}

return updated;
}

// ......
}
  • AbstractDataRefresh#updateCacheIfNeed()

The process of data conversion, which is based on different data types, we will not trace further to see if the data needs to be updated logically. The method name is updateCacheIfNeed(), which is implemented by method overloading.

public abstract class AbstractDataRefresh<T> implements DataRefresh {

// ......

// result is data
protected abstract boolean updateCacheIfNeed(ConfigData<T> result);

// newVal is the latest value obtained
// What kind of data type is groupEnum
protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {
// If it is the first time, then it is put directly into the cache and returns true, indicating that the update was made this time
if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {
return true;
}
ResultHolder holder = new ResultHolder(false);
GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {
// md5 value is the same, no need to update
if (StringUtils.equals(oldVal.getMd5(), newVal.getMd5())) {
LOG.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5());
return oldVal;
}

// The current cached data has been modified for a longer period than the new data and does not need to be updated.
// must compare the last update time
if (oldVal.getLastModifyTime() >= newVal.getLastModifyTime()) {
LOG.info("Last update time earlier than the current configuration, the [{}] config cache will not be updated", groupEnum);
return oldVal;
}
LOG.info("update {} config: {}", groupEnum, newVal);
holder.result = true;
return newVal;
});
return holder.result;
}

// ......
}

As you can see from the source code above, there are two cases where updates are not required.

  • The md5 values of both data are the same, so no update is needed;
  • The current cached data has been modified longer than the new data, so no update is needed.

In other cases, the data needs to be updated.

At this point, we have finished analyzing the logic of the start() method to get the full amount of data for the first time, followed by the long polling operation. For convenience, I will paste the start() method once more.

public class HttpSyncDataService implements SyncDataService {

// ......

private void start() {
// It could be initialized multiple times, so you need to control that.
if (RUNNING.compareAndSet(false, true)) {
// fetch all group configs.
// Initial startup, get full data
this.fetchGroupConfig(ConfigGroupEnum.values());
// one backend service, one thread
int threadSize = serverList.size();
// ThreadPoolExecutor
this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
ShenyuThreadFactory.create("http-long-polling", true));
// start long polling, each server creates a thread to listen for changes.
this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
} else {
LOG.info("shenyu http long polling was started, executor=[{}]", executor);
}
}

// ......
}
2.3.2 Execute Long Polling Task
  • HttpLongPollingTask#run()

The long polling task is HttpLongPollingTask, which implements the Runnable interface and the task logic is in the run() method. The task is executed continuously through a while() loop, i.e., long polling. There are three retries in each polling logic, one polling task fails, wait 5s and continue, 3 times all fail, wait 5 minutes and try again.

Start long polling, an admin service, and create a thread for data synchronization.

class HttpLongPollingTask implements Runnable {

private final String server;

HttpLongPollingTask(final String server) {
this.server = server;
}

@Override
public void run() {
// long polling
while (RUNNING.get()) {
// Default retry 3 times
int retryTimes = 3;
for (int time = 1; time <= retryTimes; time++) {
try {
doLongPolling(server);
} catch (Exception e) {
if (time < retryTimes) {
LOG.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
time, retryTimes - time, e.getMessage());
// long polling failed, wait 5s and continue
ThreadUtils.sleep(TimeUnit.SECONDS, 5);
continue;
}
// print error, then suspended for a while.
LOG.error("Long polling failed, try again after 5 minutes!", e);
// 3 次都失败了,等 5 分钟再试
ThreadUtils.sleep(TimeUnit.MINUTES, 5);
}
}
}
LOG.warn("Stop http long polling.");
}
}
  • HttpSyncDataService#doLongPolling()

Core logic for performing long polling tasks.

  • Assembling request parameters based on data types: md5 and lastModifyTime.
  • Assembling the request header and request body.
  • Launching a request to admin to determine if the group data has changed.
  • Based on the group that has changed, go back and get the data.
public class HttpSyncDataService implements SyncDataService {
private void doLongPolling(final String server) {
// build request params: md5 and lastModifyTime
MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
ConfigData<?> cacheConfig = factory.cacheConfigData(group);
if (cacheConfig != null) {
String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
params.put(group.name(), Lists.newArrayList(value));
}
}
// build request head and body
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
// set accessToken
headers.set(Constants.X_ACCESS_TOKEN, this.accessTokenManager.getAccessToken());
HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(params, headers);
String listenerUrl = server + Constants.SHENYU_ADMIN_PATH_CONFIGS_LISTENER;

JsonArray groupJson;
//Initiate a request to admin to determine if the group data has changed
//Here it just determines whether a group has changed or not
try {
String json = this.restTemplate.postForEntity(listenerUrl, httpEntity, String.class).getBody();
LOG.info("listener result: [{}]", json);
JsonObject responseFromServer = GsonUtils.getGson().fromJson(json, JsonObject.class);
groupJson = responseFromServer.getAsJsonArray("data");
} catch (RestClientException e) {
String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
throw new ShenyuException(message, e);
}
// Depending on the group where the change occurred, go back and get the data
/**
* The official website explains here.
* After the gateway receives the response message, it only knows which Group has made the configuration change, and it still needs to request the configuration data of that Group again.
* There may be a question here: why not write out the changed data directly?
* We also discussed this issue in depth during development, because the http long polling mechanism can only guarantee quasi-real time, if the processing at the gateway layer is not timely, * or the administrator frequently updates the configuration, it is very difficult to get the information from the gateway layer.
* If it is not processed in time at the gateway level, or if the administrator updates the configuration frequently, it is very likely to miss the push of a configuration change, so for security reasons, we only inform a group that the information has changed.
*For security reasons, we only notify a group of changes.
* Personal understanding.
* If the change data is written out directly, when the administrator frequently updates the configuration, the first update will remove the client from the blocking queue and return the response information to the gateway.
* If a second update is made at this time, the current client is not in the blocking queue, so this time the change is missed.
* The same is true for untimely processing by the gateway layer.
* This is a long polling, one gateway one synchronization thread, there may be time consuming process.
* If the admin has data changes, the current gateway client is not in the blocking queue and will not get the data.
*/
if (groupJson != null) {
// fetch group configuration async.
ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
if (ArrayUtils.isNotEmpty(changedGroups)) {
log.info("Group config changed: {}", Arrays.toString(changedGroups));
// Proactively get the changed data from admin, depending on the grouping, and take the data in full
this.doFetchGroupConfig(server, changedGroups);
}
}
if (Objects.nonNull(groupJson) && groupJson.size() > 0) {
// fetch group configuration async.
ConfigGroupEnum[] changedGroups = GsonUtils.getGson().fromJson(groupJson, ConfigGroupEnum[].class);
LOG.info("Group config changed: {}", Arrays.toString(changedGroups));
// Proactively get the changed data from admin, depending on the grouping, and take the data in full
this.doFetchGroupConfig(server, changedGroups);
}
}
}

One special point needs to be explained here: In the long polling task, why don't you get the changed data directly? Instead, we determine which group data has been changed, and then request admin again to get the changed data?

The official explanation here is.

After the gateway receives the response information, it only knows which Group has changed its configuration, and it needs to request the configuration data of that Group again. There may be a question here: Why not write out the changed data directly? We have discussed this issue in depth during development, because the http long polling mechanism can only guarantee quasi-real time, and if it is not processed in time at the gateway layer, it will be very difficult to update the configuration data. If the gateway layer is not processed in time, or the administrator updates the configuration frequently, it is likely to miss the push of a configuration change, so for security reasons, we only inform a group that the information has changed.

My personal understanding is that.

If the change data is written out directly, when the administrator updates the configuration frequently, the first update will remove the client from blocking queue and return the response information to the gateway. If a second update is made at this time, then the current client is not in the blocking queue, so this time the change is missed. The same is true for the gateway layer's untimely processing. This is a long polling, one gateway one synchronization thread, there may be a time-consuming process. If admin has data changes, the current gateway client is not in the blocking queue and will not get the data.

We have not yet analyzed the processing logic of the admin side, so let's talk about it roughly. At the admin end, the gateway client will be put into the blocking queue, and when there is a data change, the gateway client will come out of the queue and send the change data. So, if the gateway client is not in the blocking queue when there is a data change, then the current changed data is not available.

When we know which grouping data has changed, we actively get the changed data from admin again, and get the data in full depending on the grouping. The call method is doFetchGroupConfig(), which has been analyzed in the previous section.

At this point of analysis, the data synchronization operation on the gateway side is complete. The long polling task is to keep making requests to admin to see if the data has changed, and if any group data has changed, then initiate another request to admin to get the changed data, and then update the data in the gateway's memory.

Long polling task flow at the gateway side.

3. Admin Data Sync

From the previous analysis, it can be seen that the gateway side mainly calls two interfaces of admin.

  • /configs/listener: determine whether the group data has changed.
  • /configs/fetch: get the changed group data.

If we analyze directly from these two interfaces, some parts may not be well understood, so let's start analyzing the data synchronization process from the admin startup process.

3.1 Load Configuration

If the following configuration is done in the configuration file application.yml, it means that the data synchronization is done by http long polling.

shenyu:
sync:
http:
enabled: true

When the program starts, the configuration of the data synchronization class is loaded through springboot conditional assembly. In this process, HttpLongPollingDataChangedListener is created to handle the implementation logic related to long polling.

/**
* Data synchronization configuration class
* Conditional assembly via springboot
* The type Data sync configuration.
*/
@Configuration
public class DataSyncConfiguration {

/**
* http long polling.
*/
@Configuration
@ConditionalOnProperty(name = "shenyu.sync.http.enabled", havingValue = "true")
@EnableConfigurationProperties(HttpSyncProperties.class)
static class HttpLongPollingListener {

@Bean
@ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)
public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
return new HttpLongPollingDataChangedListener(httpSyncProperties);
}
}
}

3.2 Data change listener instantiation

  • HttpLongPollingDataChangedListener

The data change listener is instantiated and initialized by means of a constructor. In the constructor, a blocking queue is created to hold clients, a thread pool is created to execute deferred tasks and periodic tasks, and information about the properties of long polling is stored.

    public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
// default client (here is the gateway) 1024
this.clients = new ArrayBlockingQueue<>(1024);
// create thread pool
// ScheduledThreadPoolExecutor can perform delayed tasks, periodic tasks, and normal tasks
this.scheduler = new ScheduledThreadPoolExecutor(1,
ShenyuThreadFactory.create("long-polling", true));
// http sync properties
this.httpSyncProperties = httpSyncProperties;
}

In addition, it has the following class diagram relationships.

The InitializingBean interface is implemented, so the afterInitialize() method is executed during the initialization of the bean. Execute periodic tasks via thread pool: updating the data in memory (CACHE) is executed every 5 minutes and starts after 5 minutes. Refreshing the local cache is reading data from the database to the local cache (in this case the memory), done by refreshLocalCache().

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {

// ......

/**
* is called in the afterPropertiesSet() method of the InitializingBean interface, which is executed during the initialization of the bean
*/
@Override
protected void afterInitialize() {
long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
// Periodically check the data for changes and update the cache

// Execution cycle task: Update data in memory (CACHE) is executed every 5 minutes and starts after 5 minutes
// Prevent the admin from starting up first for a while and then generating data; then the gateway doesn't get the full amount of data when it first connects
scheduler.scheduleWithFixedDelay(() -> {
LOG.info("http sync strategy refresh config start.");
try {
// Read data from database to local cache (in this case, memory)
this.refreshLocalCache();
LOG.info("http sync strategy refresh config success.");
} catch (Exception e) {
LOG.error("http sync strategy refresh config error!", e);
}
}, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
LOG.info("http sync strategy refresh interval: {}ms", syncInterval);
}

// ......
}
  • refreshLocalCache()

Update for each of the 5 data types.

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {

// ......

// Read data from database to local cache (in this case, memory)
private void refreshLocalCache() {
//update app auth data
this.updateAppAuthCache();
//update plugin data
this.updatePluginCache();
//update rule data
this.updateRuleCache();
//update selector data
this.updateSelectorCache();
//update meta data
this.updateMetaDataCache();
}

// ......
}

The logic of the 5 update methods is similar, call the service method to get the data and put it into the memory CACHE. Take the updateRuleData method updateRuleCache() for example, pass in the rule enumeration type and call ruleService.listAll() to get all the rule data from the database.

    /**
* Update rule cache.
*/
protected void updateRuleCache() {
this.updateCache(ConfigGroupEnum.RULE, ruleService.listAll());
}
  • updateCache()

Update the data in memory using the data in the database.

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {

// ......

// cache Map
protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();

/**
* if md5 is not the same as the original, then update lcoal cache.
* @param group ConfigGroupEnum
* @param <T> the type of class
* @param data the new config data
*/
protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
// data serialization
String json = GsonUtils.getInstance().toJson(data);
// pass in md5 value and modification time
ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
// update group data
ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
}

// ......
}

The initialization process is to start periodic tasks to update the memory data by fetching data from the database at regular intervals.

Next, we start the analysis of two interfaces.

  • /configs/listener: determines if the group data has changed.
  • /configs/fetch: fetching the changed group data.

3.3 Data change polling interface

  • /configs/listener: determines if the group data has changed.

The interface class is ConfigController, which only takes effect when using http long polling for data synchronization. The interface method listener() has no other logic and calls the doLongPolling() method directly.

   
/**
* This Controller only when HttpLongPollingDataChangedListener exist, will take effect.
*/
@ConditionalOnBean(HttpLongPollingDataChangedListener.class)
@RestController
@RequestMapping("/configs")
public class ConfigController {

private final HttpLongPollingDataChangedListener longPollingListener;

public ConfigController(final HttpLongPollingDataChangedListener longPollingListener) {
this.longPollingListener = longPollingListener;
}

// Omit other logic

/**
* Listener.
* Listen for data changes and perform long polling
* @param request the request
* @param response the response
*/
@PostMapping(value = "/listener")
public void listener(final HttpServletRequest request, final HttpServletResponse response) {
longPollingListener.doLongPolling(request, response);
}

}
  • HttpLongPollingDataChangedListener#doLongPolling()

Perform long polling tasks: If there are data changes, they will be responded to the client (in this case, the gateway side) immediately. Otherwise, the client will be blocked until there is a data change or a timeout.

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {

// ......

/**
* Execute long polling: If there is a data change, it will be responded to the client (here is the gateway side) immediately.
* Otherwise, the client will otherwise remain blocked until there is a data change or a timeout.
* @param request
* @param response
*/
public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
// compare group md5
// Compare the md5, determine whether the data of the gateway and the data of the admin side are consistent, and get the data group that has changed
List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
String clientIp = getRemoteIp(request);
// response immediately.
// Immediate response to the gateway if there is changed data
if (CollectionUtils.isNotEmpty(changedGroup)) {
this.generateResponse(response, changedGroup);
Log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
return;
}

// No change, then the client (in this case the gateway) is put into the blocking queue
// listen for configuration changed.
final AsyncContext asyncContext = request.startAsync();
// AsyncContext.settimeout() does not timeout properly, so you have to control it yourself
asyncContext.setTimeout(0L);
// block client's thread.
scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}
}
  • HttpLongPollingDataChangedListener#compareChangedGroup()

To determine whether the group data has changed, the judgment logic is to compare the md5 value and lastModifyTime at the gateway side and the admin side.

  • If the md5 value is different, then it needs to be updated.
  • If the lastModifyTime on the admin side is greater than the lastModifyTime on the gateway side, then it needs to be updated.
 /**
* Determine if the group data has changed
* @param request
* @return
*/
private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request) {
List<ConfigGroupEnum> changedGroup = new ArrayList<>(ConfigGroupEnum.values().length);
for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
// The md5 value and lastModifyTime of the data on the gateway side
String[] params = StringUtils.split(request.getParameter(group.name()), ',');
if (params == null || params.length != 2) {
throw new ShenyuException("group param invalid:" + request.getParameter(group.name()));
}
String clientMd5 = params[0];
long clientModifyTime = NumberUtils.toLong(params[1]);
ConfigDataCache serverCache = CACHE.get(group.name());
// do check. determine if the group data has changed
if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) {
changedGroup.add(group);
}
}
return changedGroup;
}
  • LongPollingClient

No change data, then the client (in this case the gateway) is put into the blocking queue. The blocking time is 60 seconds, i.e. after 60 seconds remove and respond to the client.

class LongPollingClient implements Runnable {
// omitted other logic

@Override
public void run() {
try {
// Removal after 60 seconds and response to the client
this.asyncTimeoutFuture = scheduler.schedule(() -> {
clients.remove(LongPollingClient.this);
List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
sendResponse(changedGroups);
}, timeoutTime, TimeUnit.MILLISECONDS);

// Add to blocking queue
clients.add(this);

} catch (Exception ex) {
log.error("add long polling client error", ex);
}
}

/**
* Send response.
*
* @param changedGroups the changed groups
*/
void sendResponse(final List<ConfigGroupEnum> changedGroups) {
// cancel scheduler
if (null != asyncTimeoutFuture) {
asyncTimeoutFuture.cancel(false);
}
// Groups responding to changes
generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
asyncContext.complete();
}
}

3.4 Get Change Data Interface

  • /configs/fetch: get change data;

Get the grouped data and return the result according to the parameters passed in by the gateway. The main implementation method is longPollingListener.fetchConfig().


@ConditionalOnBean(HttpLongPollingDataChangedListener.class)
@RestController
@RequestMapping("/configs")
public class ConfigController {

private final HttpLongPollingDataChangedListener longPollingListener;

public ConfigController(final HttpLongPollingDataChangedListener longPollingListener) {
this.longPollingListener = longPollingListener;
}

/**
* Fetch configs shenyu result.
* @param groupKeys the group keys
* @return the shenyu result
*/
@GetMapping("/fetch")
public ShenyuAdminResult fetchConfigs(@NotNull final String[] groupKeys) {
Map<String, ConfigData<?>> result = Maps.newHashMap();
for (String groupKey : groupKeys) {
ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey));
result.put(groupKey, data);
}
return ShenyuAdminResult.success(ShenyuResultMessage.SUCCESS, result);
}

// Other interfaces are omitted

}
  • AbstractDataChangedListener#fetchConfig()

Data fetching is taken directly from CACHE, and then matched and encapsulated according to different grouping types.

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {

// ......

/**
* fetch configuration from cache.
* @param groupKey the group key
* @return the configuration data
*/
public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {
// get data from CACHE
ConfigDataCache config = CACHE.get(groupKey.name());
switch (groupKey) {
case APP_AUTH: // app auth data
return buildConfigData(config, AppAuthData.class);
case PLUGIN: // plugin data
return buildConfigData(config, PluginData.class);
case RULE: // rule data
return buildConfigData(config, RuleData.class);
case SELECTOR: // selector data
return buildConfigData(config, SelectorData.class);
case META_DATA: // meta data
return buildConfigData(config, MetaData.class);
default: // other data type, throw exception
throw new IllegalStateException("Unexpected groupKey: " + groupKey);
}
}

// ......
}

3.5 Data Change

In the previous websocket data synchronization and zookeeper data synchronization source code analysis article, we know that the admin side data synchronization design structure is as follows.

Various data change listeners are subclasses of DataChangedListener.

When the data is modified on the admin side, event notifications are sent through the Spring event handling mechanism. The sending logic is as follows.


/**
* Event forwarders, which forward the changed events to each ConfigEventListener.
* Data change event distributor: synchronize the change data to ShenYu gateway when there is a data change in admin side
* Data changes rely on Spring's event-listening mechanism: ApplicationEventPublisher --> ApplicationEvent --> ApplicationListener
*
*/
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

// other logic omitted

/**
* Call this method when there are data changes
* @param event
*/
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// Iterate through the data change listeners (it's generally good to use a kind of data synchronization)
for (DataChangedListener listener : listeners) {
// What kind of data has changed
switch (event.getGroupKey()) {
case APP_AUTH: // app auth data
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN: // plugin data
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE: // rule data
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR: // selector data
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
// pull and save API document on seletor changed
applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA: // meta data
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default: // other data type, throw exception
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
}

Suppose, the plugin information is modified and the data is synchronized by http long polling, then the actual call to listener.onPluginChanged() is org.apache.shenyu.admin.listener. AbstractDataChangedListener#onPluginChanged.

    /**
* In the operation of the admin, there is an update of the plugin occurred
* @param changed the changed
* @param eventType the event type
*/
@Override
public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
if (CollectionUtils.isEmpty(changed)) {
return;
}
// update CACHE
this.updatePluginCache();
// execute change task
this.afterPluginChanged(changed, eventType);
}

There are two processing operations, one is to update the memory CACHE, which was analyzed earlier, and the other is to execute the change task, which is executed in the thread pool.

  • HttpLongPollingDataChangedListener#afterPluginChanged()
    @Override
protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
// execute by thread pool
scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN));
}
  • DataChangeTask

Data change task: remove the clients in the blocking queue in turn and send a response to notify the gateway that a group of data has changed.

class DataChangeTask implements Runnable {
//other logic omitted

@Override
public void run() {
// If the client in the blocking queue exceeds the given value of 100, it is executed in batches
if (clients.size() > httpSyncProperties.getNotifyBatchSize()) {
List<LongPollingClient> targetClients = new ArrayList<>(clients.size());
clients.drainTo(targetClients);
List<List<LongPollingClient>> partitionClients = Lists.partition(targetClients, httpSyncProperties.getNotifyBatchSize());
// batch execution
partitionClients.forEach(item -> scheduler.execute(() -> doRun(item)));
} else {
// execute task
doRun(clients);
}
}

private void doRun(final Collection<LongPollingClient> clients) {
// Notify all clients that a data change has occurred
for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
LongPollingClient client = iter.next();
iter.remove();
// send response to client
client.sendResponse(Collections.singletonList(groupKey));
Log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
}
}
}

At this point, the data synchronization logic on the admin side is analyzed. In the http long polling based data synchronization is, it has three main functions.

  • providing a data change listening interface.
  • providing the interface to get the changed data.
  • When there is a data change, remove the client in the blocking queue and respond to the result.

Finally, three diagrams describe the long polling task flow on the admin side.

  • /configs/listener data change listener interface.

  • /configs/fetch fetch change data interface.

  • Update data in the admin backend management system for data synchronization.

4. Summary

This article focuses on the source code analysis of http long polling data synchronization in the ShenYu gateway. The main knowledge points involved are as follows.

  • http long polling is initiated by the gateway side, which constantly requests the admin side.
  • change data at group granularity (authentication information, plugins, selectors, rules, metadata).
  • http long polling results in getting only the change group, and another request needs to be initiated to get the group data.
  • Whether the data is updated or not is determined by the md5 value and the modification time lastModifyTime.

· 29 min read

Apache ShenYu is an asynchronous, high-performance, cross-language, responsive API gateway.

In ShenYu gateway, the registration center is used to register the client information to shenyu-admin, admin then synchronizes this information to the gateway through data synchronization, and the gateway completes traffic filtering through these data. The client information mainly includes interface information and URI information.

This article is based on shenyu-2.5.0 version for source code analysis, please refer to Client Access Principles for the introduction of the official website.

1. Registration Center Principle

When the client starts, it reads the interface information and uri information, and sends the data to shenyu-admin by the specified registration type.

The registration center in the figure requires the user to specify which registration type to use. ShenYu currently supports Http, Zookeeper, Etcd, Consul and Nacos for registration. Please refer to Client Access Configuration for details on how to configure them.

ShenYu introduces Disruptor in the principle design of the registration center, in which the Disruptor queue plays a role in decoupling data and operations, which is conducive to expansion. If too many registration requests lead to registration exceptions, it also has a data buffering role.

As shown in the figure, the registration center is divided into two parts, one is the registration center client register-client, the load processing client data reading. The other is the registration center server register-server, which is loaded to handle the server side (that is shenyu-admin) data writing. Data is sent and received by specifying the registration type.

  • Client: Usually it is a microservice, which can be springmvc, spring-cloud, dubbo, grpc, etc.
  • register-client: register the central client, read the client interface and uri information.
  • Disruptor: decoupling data from operations, data buffering role.
  • register-server: registry server, here is shenyu-admin, receive data, write to database, send data synchronization events.
  • registration-type: specify the registration type, complete data registration, currently supports Http, Zookeeper, Etcd, Consul and Nacos.

This article analyzes the use of Http for registration, so the specific processing flow is as follows.

On the client side, after the data is out of the queue, the data is transferred via http and on the server side, the corresponding interface is provided to receive the data and then write it to the queue.

2. Client Registration Process

When the client starts, it reads the attribute information according to the relevant configuration, and then writes it to the queue. Let's take the official shenyu-examples-http as an example and start the source code analysis . The official example is a microservice built by springboot. For the configuration of the registration center, please refer to the official website client access configuration .

2.1 Load configuration, read properties

Let's start with a diagram that ties together the initialization process of the registry client.

We are analyzing registration by means of http, so the following configuration is required.

shenyu:
register:
registerType: http
serverLists: http://localhost:9095
props:
username: admin
password: 123456
client:
http:
props:
contextPath: /http
appName: http
port: 8189
isFull: false

Each attribute indicates the following meaning.

  • registerType: the service registration type, fill in http.
  • serverList: The address of the Shenyu-Admin project to fill in for the http registration type, note the addition of http:// and separate multiple addresses with English commas.
  • username: The username of the Shenyu-Admin
  • password: The password of the Shenyu-Admin
  • port: the start port of your project, currently springmvc/tars/grpc needs to be filled in.
  • contextPath: the routing prefix for your mvc project in shenyu gateway, such as /order, /product, etc. The gateway will route according to your prefix.
  • appName: the name of your application, if not configured, it will take the value of spring.application.name by default.
  • isFull: set true to proxy your entire service, false to proxy one of your controllers; currently applies to springmvc/springcloud.

After the project starts, it will first load the configuration file, read the property information and generate the corresponding Bean.

The first configuration file read is ShenyuSpringMvcClientConfiguration, which is the http registration configuration class for the shenyu client, indicated by @Configuration which is a configuration class, and by @ImportAutoConfiguration which is a configuration class. to introduce other configuration classes. Create SpringMvcClientEventListener, which mainly handles metadata and URI information.

/**
* Shenyu SpringMvc Client Configuration
*/
@Configuration
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuSpringMvcClientConfiguration {

// create SpringMvcClientEventListener to handle metadata and URI
@Bean
public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);
}
}

ShenyuClientCommonBeanConfiguration is a shenyu client common configuration class that will create the bean common to the registry client.

  • Create ShenyuClientRegisterRepository, which is created by factory class.
  • Create ShenyuRegisterCenterConfig, which reads the shenyu.register property configuration.
  • Create ShenyuClientConfig, read the shenyu.client property configuration.

/**
* Shenyu Client Common Bean Configuration
*/
@Configuration
public class ShenyuClientCommonBeanConfiguration {

// create ShenyuClientRegisterRepository by factory
@Bean
public ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
return ShenyuClientRegisterRepositoryFactory.newInstance(config);
}

// create ShenyuRegisterCenterConfig to read shenyu.register properties
@Bean
@ConfigurationProperties(prefix = "shenyu.register")
public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
return new ShenyuRegisterCenterConfig();
}

// create ShenyuClientConfig to read shenyu.client properties
@Bean
@ConfigurationProperties(prefix = "shenyu")
public ShenyuClientConfig shenyuClientConfig() {
return new ShenyuClientConfig();
}
}

2.2 HttpClientRegisterRepository

The ShenyuClientRegisterRepository generated in the configuration file above is a concrete implementation of the client registration, which is an interface with the following implementation class.

  • HttpClientRegisterRepository: registration via http.
  • ConsulClientRegisterRepository: registration via Consul.
  • EtcdClientRegisterRepository: registration via Etcd; EtcdClientRegisterRepository: registration via Etcd.
  • NacosClientRegisterRepository: registration via nacos; NacosClientRegisterRepository: registration via nacos.
  • ZookeeperClientRegisterRepository: registration through Zookeeper.

The specific way which is achieved by loading through SPI, the implementation logic is as follows.


/**
* load ShenyuClientRegisterRepository
*/
public final class ShenyuClientRegisterRepositoryFactory {

private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();

/**
* create ShenyuClientRegisterRepository
*/
public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
// Loading by means of SPI, type determined by registerType
ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
//init ShenyuClientRegisterRepository
result.init(shenyuRegisterCenterConfig);
ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
return result;
}
return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
}
}

The load type is specified by registerType, which is the type we specify in the configuration file at

shenyu:
register:
registerType: http
serverLists: http://localhost:9095

We specified http, so it will go to load HttpClientRegisterRepository. After the object is successfully created, the initialization method init() is executed as follows.

@Join
public class HttpClientRegisterRepository implements ShenyuClientRegisterRepository {

@Override
public void init(final ShenyuRegisterCenterConfig config) {
this.username = config.getProps().getProperty(Constants.USER_NAME);
this.password = config.getProps().getProperty(Constants.PASS_WORD);
this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));
this.setAccessToken();
}

// ......
}

Read username, password and serverLists from the configuration file, the username, password and address of sheenyu-admin, in preparation for subsequent data sending. The class annotation @Join is used for SPI loading.

SPI, known as Service Provider Interface, is a service provider discovery feature built into the JDK, a mechanism for dynamic replacement discovery.

shenyu-spi is a custom SPI extension implementation for the Apache ShenYu gateway, designed and implemented with reference to Dubbo SPI extension implementation.

2.3 SpringMvcClientEventListener

Create SpringMvcClientEventListener, which is responsible for the construction and registration of client-side metadata and URI data, and its creation is done in the configuration file.

@Configuration
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
public class ShenyuSpringMvcClientConfiguration {
// ......

// create SpringMvcClientEventListener
@Bean
public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);
}
}

SpringMvcClientEventListener implements the AbstractContextRefreshedEventListener

The AbstractContextRefreshedEventListener is an abstract class. it implements the ApplicationListener interface and overrides the onApplicationEvent() method, which is executed when a Spring event occurs. It has several implementation classes, which support different kind of RPC styles.

  • AlibabaDubboServiceBeanListener:handles Alibaba Dubbo protocol.
  • ApacheDubboServiceBeanListener:handles Apache Dubbo protocol.
  • GrpcClientEventListener:handles grpc protocol.
  • MotanServiceEventListener:handles Motan protocol.
  • SofaServiceEventListener:handles Sofa protocol.
  • SpringMvcClientEventListener:handles http protocol.
  • SpringWebSocketClientEventListener:handles Websocket protocol.
  • TarsServiceBeanEventListener:handles Tars protocol.
public abstract class AbstractContextRefreshedEventListener<T, A extends Annotation> implements ApplicationListener<ContextRefreshedEvent> {

//......

// Instantiation is done through the constructor
public AbstractContextRefreshedEventListener(final PropertiesConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
// read shenyu.client.http properties
Properties props = clientConfig.getProps();
// appName
this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
// contextPath
this.contextPath = Optional.ofNullable(props.getProperty(ShenyuClientConstants.CONTEXT_PATH)).map(UriUtils::repairData).orElse("");
if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {
String errorMsg = "client register param must config the appName or contextPath";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
this.ipAndPort = props.getProperty(ShenyuClientConstants.IP_PORT);
// host
this.host = props.getProperty(ShenyuClientConstants.HOST);
// port
this.port = props.getProperty(ShenyuClientConstants.PORT);
// publish event
publisher.start(shenyuClientRegisterRepository);
}

// This method is executed when a context refresh event(ContextRefreshedEvent), occurs
@Override
public void onApplicationEvent(@NonNull final ContextRefreshedEvent event) {
// The contents of the method are guaranteed to be executed only once
if (!registered.compareAndSet(false, true)) {
return;
}
final ApplicationContext context = event.getApplicationContext();
// get the specific beans
Map<String, T> beans = getBeans(context);
if (MapUtils.isEmpty(beans)) {
return;
}
// build URI data and register it
publisher.publishEvent(buildURIRegisterDTO(context, beans));
// build metadata and register it
beans.forEach(this::handle);
}

@SuppressWarnings("all")
protected abstract URIRegisterDTO buildURIRegisterDTO(ApplicationContext context,
Map<String, T> beans);


protected void handle(final String beanName, final T bean) {
Class<?> clazz = getCorrectedClass(bean);
final A beanShenyuClient = AnnotatedElementUtils.findMergedAnnotation(clazz, getAnnotationType());
final String superPath = buildApiSuperPath(clazz, beanShenyuClient);
if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {
handleClass(clazz, bean, beanShenyuClient, superPath);
return;
}
final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);
for (Method method : methods) {
handleMethod(bean, clazz, beanShenyuClient, method, superPath);
}
}

// default implementation. build URI data and register it
protected void handleClass(final Class<?> clazz,
final T bean,
@NonNull final A beanShenyuClient,
final String superPath) {
publisher.publishEvent(buildMetaDataDTO(bean, beanShenyuClient, pathJoin(contextPath, superPath), clazz, null));
}

// default implementation. build metadata and register it
protected void handleMethod(final T bean,
final Class<?> clazz,
@Nullable final A beanShenyuClient,
final Method method,
final String superPath) {
// get the annotation
A methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, getAnnotationType());
if (Objects.nonNull(methodShenyuClient)) {
// 构建元数据,发送注册事件
publisher.publishEvent(buildMetaDataDTO(bean, methodShenyuClient, buildApiPath(method, superPath, methodShenyuClient), clazz, method));
}
}

protected abstract MetaDataRegisterDTO buildMetaDataDTO(T bean,
@NonNull A shenyuClient,
String path,
Class<?> clazz,
Method method);
}

In the constructor, the main purpose is to read the property information and then perform the checksum.

shenyu:
client:
http:
props:
contextPath: /http
appName: http
port: 8189
isFull: false

Finally, publisher.start() is executed to start event publishing and prepare for registration.

ShenyuClientRegisterEventPublisher is implemented via singleton pattern, mainly generating metadata and URI subscribers (subsequently used for data publishing), and then starting the Disruptor queue. A common method publishEvent() is provided to publish events and send data to the Disruptor queue.


public class ShenyuClientRegisterEventPublisher {

private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();

private DisruptorProviderManage<DataTypeParent> providerManage;

public static ShenyuClientRegisterEventPublisher getInstance() {
return INSTANCE;
}

public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
RegisterClientExecutorFactory factory = new RegisterClientExecutorFactory();
factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));
factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));
providerManage = new DisruptorProviderManage(factory);
providerManage.startup();
}

public <T> void publishEvent(final DataTypeParent data) {
DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
provider.onData(data);
}
}

The logic of the constructor of AbstractContextRefreshedEventListener is analyzed, it mainly reads the property configuration, creates metadata and URI subscribers, and starts the Disruptor queue.

The onApplicationEvent() method is executed when a Spring event occurs, the parameter here is ContextRefreshedEvent, which means the context refresh event.

ContextRefreshedEvent is a Spring built-in event. It is fired when the ApplicationContext is initialized or refreshed. This can also happen in the ConfigurableApplicationContext interface using the refresh() method. Initialization here means that all Beans have been successfully loaded, post-processing Beans have been detected and activated, all Singleton Beans have been pre-instantiated, and the ApplicationContext container is ready to be used.

  • SpringMvcClientEventListener: the http implementation of AbstractContextRefreshedEventListener:
public class SpringMvcClientEventListener extends AbstractContextRefreshedEventListener<Object, ShenyuSpringMvcClient> {

private final List<Class<? extends Annotation>> mappingAnnotation = new ArrayList<>(3);

private final Boolean isFull;

private final String protocol;

// 构造函数
public SpringMvcClientEventListener(final PropertiesConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
super(clientConfig, shenyuClientRegisterRepository);
Properties props = clientConfig.getProps();
// get isFull
this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
// http protocol
this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);
mappingAnnotation.add(ShenyuSpringMvcClient.class);
mappingAnnotation.add(RequestMapping.class);
}

@Override
protected Map<String, Object> getBeans(final ApplicationContext context) {
// Configuration attribute, if isFull=true, means register the whole microservice
if (Boolean.TRUE.equals(isFull)) {
getPublisher().publishEvent(MetaDataRegisterDTO.builder()
.contextPath(getContextPath())
.appName(getAppName())
.path(PathUtils.decoratorPathWithSlash(getContextPath()))
.rpcType(RpcTypeEnum.HTTP.getName())
.enabled(true)
.ruleName(getContextPath())
.build());
return null;
}
// get bean with Controller annotation
return context.getBeansWithAnnotation(Controller.class);
}

@Override
protected URIRegisterDTO buildURIRegisterDTO(final ApplicationContext context,
final Map<String, Object> beans) {
// ...
}

@Override
protected String buildApiSuperPath(final Class<?> clazz, @Nullable final ShenyuSpringMvcClient beanShenyuClient) {
if (Objects.nonNull(beanShenyuClient) && StringUtils.isNotBlank(beanShenyuClient.path())) {
return beanShenyuClient.path();
}
RequestMapping requestMapping = AnnotationUtils.findAnnotation(clazz, RequestMapping.class);
// Only the first path is supported temporarily
if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {
return requestMapping.path()[0];
}
return "";
}

@Override
protected Class<ShenyuSpringMvcClient> getAnnotationType() {
return ShenyuSpringMvcClient.class;
}

@Override
protected void handleMethod(final Object bean, final Class<?> clazz,
@Nullable final ShenyuSpringMvcClient beanShenyuClient,
final Method method, final String superPath) {
// get RequestMapping annotation
final RequestMapping requestMapping = AnnotatedElementUtils.findMergedAnnotation(method, RequestMapping.class);
// get ShenyuSpringMvcClient annotation
ShenyuSpringMvcClient methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, ShenyuSpringMvcClient.class);
methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;
// the result of ReflectionUtils#getUniqueDeclaredMethods contains method such as hashCode, wait, toSting
// add Objects.nonNull(requestMapping) to make sure not register wrong method
if (Objects.nonNull(methodShenyuClient) && Objects.nonNull(requestMapping)) {
getPublisher().publishEvent(buildMetaDataDTO(bean, methodShenyuClient, buildApiPath(method, superPath, methodShenyuClient), clazz, method));
}
}

//...

// 构造元数据
@Override
protected MetaDataRegisterDTO buildMetaDataDTO(final Object bean,
@NonNull final ShenyuSpringMvcClient shenyuClient,
final String path, final Class<?> clazz,
final Method method) {
//...
}
}

The registration logic is done through publisher.publishEvent().

The Controller annotation and the RequestMapping annotation are provided by Spring, which you should be familiar with, so I won't go into details. The ShenyuSpringMvcClient annotation is provided by Apache ShenYu to register the SpringMvc client, which is defined as follows.


/**
* ShenyuSpringMvcClient
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ShenyuSpringMvcClient {

// path
@AliasFor(attribute = "path")
String value() default "";

// path
@AliasFor(attribute = "value")
String path();

// ruleName
String ruleName() default "";

// desc info
String desc() default "";

// enabled
boolean enabled() default true;

// register MetaData
boolean registerMetaData() default false;
}

It is used as follows.

  • register the entire interface
@RestController
@RequestMapping("/test")
@ShenyuSpringMvcClient(path = "/test/**") // register the entire interface
public class HttpTestController {
//......
}
  • register current method
@RestController
@RequestMapping("/order")
@ShenyuSpringMvcClient(path = "/order")
public class OrderController {

/**
* Save order dto.
*
* @param orderDTO the order dto
* @return the order dto
*/
@PostMapping("/save")
@ShenyuSpringMvcClient(path = "/save", desc = "Save order") // register current method
public OrderDTO save(@RequestBody final OrderDTO orderDTO) {
orderDTO.setName("hello world save order");
return orderDTO;
}
  • publisher.publishEvent()

This method sends the data to the Disruptor queue. More details about the Disruptor queue are not described here, which does not affect the flow of analyzing the registration.

When the data is sent, the consumers of the Disruptor queue will process the data for consumption.

This method sends the data to the Disruptor queue. More details about the Disruptor queue are not described here, which does not affect the flow of analyzing the registration.

  • QueueConsumer

QueueConsumer is a consumer that implements the WorkHandler interface, which is created in the providerManage.startup() logic. The WorkHandler interface is the data consumption interface for Disruptor, and the only method is onEvent().

package com.lmax.disruptor;

public interface WorkHandler<T> {
void onEvent(T event) throws Exception;
}

The QueueConsumer overrides the onEvent() method, and the main logic is to generate the consumption task and then go to the thread pool to execute it.


/**
*
* QueueConsumer
*/
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {

// ......

@Override
public void onEvent(final DataEvent<T> t) {
if (t != null) {
// Use different thread pools based on DataEvent type
ThreadPoolExecutor executor = orderly(t);
// create queue consumption tasks via factory
QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
// set data
queueConsumerExecutor.setData(t.getData());
// help gc
t.setData(null);
// put in the thread pool to execute the consumption task
executor.execute(queueConsumerExecutor);
}
}
}

QueueConsumerExecutor is the task that is executed in the thread pool, it implements the Runnable interface, and there are two specific implementation classes.

  • RegisterClientConsumerExecutor:the client-side consumer executor.
  • RegisterServerConsumerExecutor:server-side consumer executor.

As the name implies, one is responsible for handling client-side tasks, and one is responsible for handling server-side tasks (the server side is admin, which is analyzed below).

  • RegisterClientConsumerExecutor

The logic of the rewritten run() is as follows.


public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {

//......

@Override
public void run() {
// get data
final T data = getData();
// call the appropriate processor for processing according to the data type
subscribers.get(data.getType()).executor(Lists.newArrayList(data));
}

}

Different processors are called to perform the corresponding tasks based on different data types. There are two types of data, one is metadata, which records the client registration information. One is the URI data, which records the client service information.

public enum DataType {

META_DATA,

URI,
}
  • ExecutorSubscriber#executor()

The actuator subscribers are divided into two categories, one that handles metadata and one that handles URIs. There are two on the client side and two on the server side, so there are four in total.

Here is the registration metadata information, so the execution class is ShenyuClientMetadataExecutorSubscriber.

  • ShenyuClientMetadataExecutorSubscriber#executor()

The metadata processing logic on the client side is: iterate through the metadata information and call the interface method persistInterface() to finish publishing the data.

public class ShenyuClientMetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {

//......

@Override
public DataType getType() {
return DataType.META_DATA;
}

@Override
public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {
// call the interface method persistInterface() to finish publishing the data
shenyuClientRegisterRepository.persistInterface(metaDataRegisterDTO);
}
}
}

The two registration interfaces get the data well and call the publish() method to publish the data to the Disruptor queue.

  • ShenyuServerRegisterRepository

The ShenyuServerRegisterRepository interface is a service registration interface, which has five implementation classes, indicating five types of registration.

  • ConsulServerRegisterRepository: registration is achieved through Consul;
  • EtcdServerRegisterRepository: registration through Etcd.
  • NacosServerRegisterRepository: registration through Nacos.
  • ShenyuHttpRegistryController: registration via Http; ShenyuHttpRegistryController: registration via Http.
  • ZookeeperServerRegisterRepository: registration through Zookeeper.

As you can see from the diagram, the loading of the registry is done by means of SPI. This was mentioned earlier, and the specific class loading is done in the client-side generic configuration file by specifying the properties in the configuration file.


/**
* load ShenyuClientRegisterRepository
*/
public final class ShenyuClientRegisterRepositoryFactory {

private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();

/**
* create ShenyuClientRegisterRepository
*/
public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
// loading by means of SPI, type determined by registerType
ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
// perform initialization operations
result.init(shenyuRegisterCenterConfig);
ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
return result;
}
return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
}
}

The source code analysis in this article is based on the Http way of registration, so we first analyze the HttpClientRegisterRepository, and the other registration methods will be analyzed afterwards.

Registration by way of http is very simple, it is to call the tool class to send http requests. The registration metadata and URI are both called by the same method doRegister(), specifying the interface and type.

  • Constants.URI_PATH = /shenyu-client/register-metadata: the interface provided by the server for registering metadata.
  • Constants.META_PATH = /shenyu-client/register-uri: Server-side interface for registering URIs.
@Join
public class HttpClientRegisterRepository extends FailbackRegistryRepository {

private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientRegisterRepository.class);

private static URIRegisterDTO uriRegisterDTO;

private String username;

private String password;

private List<String> serverList;

private String accessToken;

public HttpClientRegisterRepository() {
}

public HttpClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
init(config);
}

@Override
public void init(final ShenyuRegisterCenterConfig config) {
// admin username
this.username = config.getProps().getProperty(Constants.USER_NAME);
// admin paaword
this.password = config.getProps().getProperty(Constants.PASS_WORD);
// admin server address
this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));
// set access token
this.setAccessToken();
}

/**
* Persist uri.
*
* @param registerDTO the register dto
*/
@Override
public void doPersistURI(final URIRegisterDTO registerDTO) {
if (RuntimeUtils.listenByOther(registerDTO.getPort())) {
return;
}
doRegister(registerDTO, Constants.URI_PATH, Constants.URI);
uriRegisterDTO = registerDTO;
}

@Override
public void doPersistInterface(final MetaDataRegisterDTO metadata) {
doRegister(metadata, Constants.META_PATH, Constants.META_TYPE);
}

@Override
public void close() {
if (uriRegisterDTO != null) {
uriRegisterDTO.setEventType(EventType.DELETED);
doRegister(uriRegisterDTO, Constants.URI_PATH, Constants.URI);
}
}

private void setAccessToken() {
for (String server : serverList) {
try {
Optional<?> login = RegisterUtils.doLogin(username, password, server.concat(Constants.LOGIN_PATH));
login.ifPresent(v -> this.accessToken = String.valueOf(v));
} catch (Exception e) {
LOGGER.error("Login admin url :{} is fail, will retry. cause: {} ", server, e.getMessage());
}
}
}

private <T> void doRegister(final T t, final String path, final String type) {
int i = 0;
// iterate through the list of admin services (admin may be clustered)
for (String server : serverList) {
i++;
String concat = server.concat(path);
try {
// 设置访问token
if (StringUtils.isBlank(accessToken)) {
this.setAccessToken();
if (StringUtils.isBlank(accessToken)) {
throw new NullPointerException("accessToken is null");
}
}
// calling the tool class to send http requests
RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), concat, type, accessToken);
return;
} catch (Exception e) {
LOGGER.error("Register admin url :{} is fail, will retry. cause:{}", server, e.getMessage());
if (i == serverList.size()) {
throw new RuntimeException(e);
}
}
}
}
}

Serialize the data and send it via OkHttp.


public final class RegisterUtils {

//......

// Sending data via OkHttp
public static void doRegister(final String json, final String url, final String type) throws IOException {
if (!StringUtils.hasLength(accessToken)) {
LOGGER.error("{} client register error accessToken is null, please check the config : {} ", type, json);
return;
}
Headers headers = new Headers.Builder().add(Constants.X_ACCESS_TOKEN, accessToken).build();
String result = OkHttpTools.getInstance().post(url, json, headers);
if (Objects.equals(SUCCESS, result)) {
LOGGER.info("{} client register success: {} ", type, json);
} else {
LOGGER.error("{} client register error: {} ", type, json);
}
}
}

At this point, the logic of the client registering metadata by means of http is finished. To summarize: construct metadata by reading custom annotation information, send the data to the Disruptor queue, then consume the data from the queue, put the consumer into the thread pool to execute, and finally send an http request to the admin.

Similarly, ShenyuClientURIExecutorSubscriber is the execution class of registering URI information.

  • ShenyuClientURIExecutorSubscriber#executor()

The main logic is to iterate through the URI data collection and implement data registration through the persistURI() method.


public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {

//......

@Override
public DataType getType() {
return DataType.URI;
}

// register URI
@Override
public void executor(final Collection<URIRegisterDTO> dataList) {
for (URIRegisterDTO uriRegisterDTO : dataList) {
Stopwatch stopwatch = Stopwatch.createStarted();
while (true) {
try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {
break;
} catch (IOException e) {
long sleepTime = 1000;
// maybe the port is delay exposed
if (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {
LOG.error("host:{}, port:{} connection failed, will retry",
uriRegisterDTO.getHost(), uriRegisterDTO.getPort());
// If the connection fails for a long time, Increase sleep time
if (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {
sleepTime = 10000;
}
}
try {
TimeUnit.MILLISECONDS.sleep(sleepTime);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
ShenyuClientShutdownHook.delayOtherHooks();

shenyuClientRegisterRepository.persistURI(uriRegisterDTO);
}
}
}

The while(true) loop in the code is to ensure that the client has been successfully started and can connect via host and port.

The logic behind it is: add the hook function for gracefully stopping the client .

Data registration is achieved through the persistURI() method. The whole logic is also analyzed in the previous section, and ultimately it is the OkHttp client that initiates http to shenyu-admin and registers the URI by way of http.

The analysis of the registration logic of the client is finished here, and the metadata and URI data constructed are sent to the Disruptor queue, from which they are then consumed, read, and sent to admin via http.

The source code analysis of the client-side metadata and URI registration process is complete, with the following flow chart.

3. Server-side registration process

3.1 ShenyuHttpRegistryController

From the previous analysis, we know that the server side provides two interfaces for registration.

  • /shenyu-client/register-metadata: The interface provided by the server side is used to register metadata.
  • /shenyu-client/register-uri: The server-side interface is provided for registering URIs.

These two interfaces are located in ShenyuHttpRegistryController, which implements the ShenyuServerRegisterRepository interface and is the implementation class for server-side registration. It is marked with @Join to indicate loading via SPI.

@RequestMapping("/shenyu-client")
@Join
public class ShenyuHttpRegistryController implements ShenyuServerRegisterRepository {

private ShenyuServerRegisterPublisher publisher;

@Override
public void init(final ShenyuServerRegisterPublisher publisher, final ShenyuRegisterCenterConfig config) {
this.publisher = publisher;
}

// register Metadata
@PostMapping("/register-metadata")
@ResponseBody
public String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {
publisher.publish(metaDataRegisterDTO);
return ShenyuResultMessage.SUCCESS;
}

// register URI
@PostMapping("/register-uri")
@ResponseBody
public String registerURI(@RequestBody final URIRegisterDTO uriRegisterDTO) {
publisher.publish(uriRegisterDTO);
return ShenyuResultMessage.SUCCESS;
}
}

The exact method used is specified by the configuration file and then loaded via SPI.

In the application.yml file in shenyu-admin configure the registration method, registerType specify the registration type, when registering with http, serverLists do not need to be filled in, for more configuration instructions you can refer to the official website Client Access Configuration.

shenyu:
register:
registerType: http
serverLists:
  • RegisterCenterConfiguration

After introducing the relevant dependencies and properties configuration, when starting shenyu-admin, the configuration file will be loaded first, and the configuration file class related to the registration center is RegisterCenterConfiguration.

@Configuration
public class RegisterCenterConfiguration {
@Bean
@ConfigurationProperties(prefix = "shenyu.register")
public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
return new ShenyuRegisterCenterConfig();
}

//create ShenyuServerRegisterRepository to register in admin
@Bean(destroyMethod = "close")
public ShenyuServerRegisterRepository shenyuServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig, final List<ShenyuClientRegisterService> shenyuClientRegisterService) {
// 1. get the registration type from the configuration property
String registerType = shenyuRegisterCenterConfig.getRegisterType();
// 2. load the implementation class by registering the type with the SPI method
ShenyuServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuServerRegisterRepository.class).getJoin(registerType);
// 3. get the publisher and write data to the Disruptor queue
RegisterServerDisruptorPublisher publisher = RegisterServerDisruptorPublisher.getInstance();
// 4. ShenyuClientRegisterService, rpcType -> registerService
Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, e -> e));
// 5. start publisher
publisher.start(registerServiceMap);
// 6. init registerRepository
registerRepository.init(publisher, shenyuRegisterCenterConfig);
return registerRepository;
}
}

Two beans are generated in the configuration class.

  • shenyuRegisterCenterConfig: to read the attribute configuration.

  • shenyuServerRegisterRepository: for server-side registration.

In the process of creating shenyuServerRegisterRepository, a series of preparations are also performed.

    1. get the registration type from the configuration property.
    1. Load the implementation class by the registration type with the SPI method: for example, if the specified type is http, ShenyuHttpRegistryController will be loaded.
    1. Get publisher and write data to the Disruptor queue.
    1. Register Service, rpcType -> registerService: get the registered Service, each rpc has a corresponding Service. The client for this article is built through springboot, which belongs to the http type, and other client types: dubbo, Spring Cloud, gRPC, etc.
    1. Preparation for event publishing: add server-side metadata and URI subscribers, process the data. And start the Disruptor queue.
    1. Initialization operation for registration: http type registration initialization operation is to save publisher.
  • RegisterClientServerDisruptorPublisher#publish()

The server-side publisher that writes data to the Disruptor queue , built via the singleton pattern.


public class RegisterClientServerDisruptorPublisher implements ShenyuServerRegisterPublisher {
private static final RegisterClientServerDisruptorPublisher INSTANCE = new private static final RegisterClientServerDisruptorPublisher INSTANCE = new RegisterServerDisruptorPublisher();
();

public static RegisterClientServerDisruptorPublisher getInstance() {
return INSTANCE;
}

//prepare for event publishing, add server-side metadata and URI subscribers, process data. And start the Disruptor queue.
public void start(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {
RegisterServerExecutorFactory factory = new RegisterServerExecutorFactory();
// add URI data subscriber
factory.addSubscribers(new URIRegisterExecutorSubscriber(shenyuClientRegisterService));
// add Metadata subscriber
factory.addSubscribers(new MetadataExecutorSubscriber(shenyuClientRegisterService));
//start Disruptor
providerManage = new DisruptorProviderManage(factory);
providerManage.startup();
}

// write data to queue
@Override
public <T> void publish(final DataTypeParent data) {
DisruptorProvider<Object> provider = providerManage.getProvider();
provider.onData(Collections.singleton(data));
}

// write data to queue on batch
@Override
public void publish(final Collection<? extends DataTypeParent> dataList) {
DisruptorProvider<Collection<DataTypeParent>> provider = providerManage.getProvider();
provider.onData(dataList.stream().map(DataTypeParent.class::cast).collect(Collectors.toList()));
}

@Override
public void close() {
providerManage.getProvider().shutdown();
}
}

The loading of the configuration file, which can be seen as the initialization process of the registry server, is described in the following diagram.

3.2 QueueConsumer

In the previous analysis of the client-side disruptor queue consumption of data over. The server side has the same logic, except that the executor performing the task changes.

The QueueConsumer is a consumer that implements the WorkHandler interface, which is created in the providerManage.startup() logic. The WorkHandler interface is the data consumption interface for disruptor, and the only method is onEvent().

package com.lmax.disruptor;

public interface WorkHandler<T> {
void onEvent(T var1) throws Exception;
}

The QueueConsumer overrides the onEvent() method, and the main logic is to generate the consumption task and then go to the thread pool to execute it.

/**
*
* QueueConsumer
*/
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {

// ......

@Override
public void onEvent(final DataEvent<T> t) {
if (t != null) {
// Use different thread pools based on DataEvent type
ThreadPoolExecutor executor = orderly(t);
// create queue consumption tasks via factory
QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
// set data
queueConsumerExecutor.setData(t.getData());
// help gc
t.setData(null);
// put in the thread pool to execute the consumption task
executor.execute(queueConsumerExecutor);
}
}
}

QueueConsumerExecutor is the task that is executed in the thread pool, it implements the Runnable interface, and there are two specific implementation classes.

  • RegisterClientConsumerExecutor: the client-side consumer executor.
  • RegisterServerConsumerExecutor: server-side consumer executor.

As the name implies, one is responsible for handling client-side tasks and one is responsible for handling server-side tasks.

  • RegisterServerConsumerExecutor#run()

RegisterServerConsumerExecutor is a server-side consumer executor that indirectly implements the Runnable interface via QueueConsumerExecutor and overrides the run() method.


public final class RegisterServerConsumerExecutor extends QueueConsumerExecutor<List<DataTypeParent>> {
// ...

@Override
public void run() {
//get the data from the disruptor queue and check data
Collection<DataTypeParent> results = getData()
.stream()
.filter(this::isValidData)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(results)) {
return;
}
//execute operations according to type
getType(results).executor(results);
}

// get subscribers by type
private ExecutorSubscriber<DataTypeParent> selectExecutor(final Collection<DataTypeParent> list) {
final Optional<DataTypeParent> first = list.stream().findFirst();
return subscribers.get(first.orElseThrow(() -> new RuntimeException("the data type is not found")).getType());
}
}

  • ExecutorSubscriber#executor()

The actuator subscribers are divided into two categories, one that handles metadata and one that handles URIs. There are two on the client side and two on the server side, so there are four in total.

  • MetadataExecutorSubscriber#executor()

In case of registering metadata, this is achieved by MetadataExecutorSubscriber#executor(): get the registered Service according to the type and call register().

public class MetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {

//......

@Override
public DataType getType() {
return DataType.META_DATA;
}

@Override
public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
// Traversing the metadata list
metaDataRegisterDTOList.forEach(meta -> {
Optional.ofNullable(this.shenyuClientRegisterService.get(meta.getRpcType())) // Get registered Service by type
.ifPresent(shenyuClientRegisterService -> {
// Registration of metadata, locking to ensure sequential execution and prevent concurrent errors
synchronized (shenyuClientRegisterService) {
shenyuClientRegisterService.register(meta);
}
});
});
}
}
  • URIRegisterExecutorSubscriber#executor()

In case of registration metadata, this is achieved by URIRegisterExecutorSubscriber#executor(): construct URI data, find Service according to the registration type, and achieve registration by the registerURI method.


public class URIRegisterExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {
//......

@Override
public DataType getType() {
return DataType.URI;
}

@Override
public void executor(final Collection<URIRegisterDTO> dataList) {
if (CollectionUtils.isEmpty(dataList)) {
return;
}

findService(dataList).ifPresent(service -> {
Map<String, List<URIRegisterDTO>> listMap = buildData(dataList);
listMap.forEach(service::registerURI);
});
final Map<String, List<URIRegisterDTO>> groupByRpcType = dataList.stream()
.filter(data -> StringUtils.isNotBlank(data.getRpcType()))
.collect(Collectors.groupingBy(URIRegisterDTO::getRpcType));
for (Map.Entry<String, List<URIRegisterDTO>> entry : groupByRpcType.entrySet()) {
final String rpcType = entry.getKey();
// Get registered Service by type
Optional.ofNullable(shenyuClientRegisterService.get(rpcType))
.ifPresent(service -> {
final List<URIRegisterDTO> list = entry.getValue();
// Build URI data types and register them with the registerURI method
Map<String, List<URIRegisterDTO>> listMap = buildData(list);
listMap.forEach(service::registerURI);
});
}
}

// Find Service by type
private Optional<ShenyuClientRegisterService> findService(final Collection<URIRegisterDTO> dataList) {
return dataList.stream().map(dto -> shenyuClientRegisterService.get(dto.getRpcType())).findFirst();
}
}

  • ShenyuClientRegisterService#register()

ShenyuClientRegisterService is the registration method interface, which has several implementation classes.

  • AbstractContextPathRegisterService: abstract class, handling part of the public logic.
  • AbstractShenyuClientRegisterServiceImpl: : abstract class, handles part of the public logic.
  • ShenyuClientRegisterDivideServiceImpl: divide class, handles http registration types.
  • ShenyuClientRegisterDubboServiceImpl: dubbo class, handles dubbo registration types.
  • ShenyuClientRegisterGrpcServiceImpl: gRPC class, handles gRPC registration types.
  • ShenyuClientRegisterMotanServiceImpl: Motan class, handles Motan registration types.
  • ShenyuClientRegisterSofaServiceImpl: Sofa class, handles Sofa registration types.
  • ShenyuClientRegisterSpringCloudServiceImpl: SpringCloud class, handles SpringCloud registration types.
  • ShenyuClientRegisterTarsServiceImpl: Tars class, handles Tars registration types.
  • ShenyuClientRegisterWebSocketServiceImplWebsocket class,handles Websocket registration types.

From the above, we can see that each microservice has a corresponding registration implementation class. The source code analysis in this article is based on the official shenyu-examples-http as an example, it is of http registration type, so the registration implementation class for metadata and URI data is ShenyuClientRegisterDivideServiceImpl: ShenyuClientRegisterDivideServiceImpl.

  • register():
public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {

//......

public String register(final MetaDataRegisterDTO dto) {
// 1.register selector information
String selectorHandler = selectorHandler(dto);
String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
// 2.register rule information
String ruleHandler = ruleHandler();
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
ruleService.registerDefault(ruleDTO);
// 3.register metadata information
registerMetadata(dto);
// 4.register contextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
}

The whole registration logic can be divided into 4 steps.

    1. Register selector information
    1. Register rule information
    1. Register metadata information
    1. Register `contextPath

This side of admin requires the construction of selectors, rules, metadata and ContextPath through the metadata information of the client. The specific registration process and details of processing are related to the rpc type. We will not continue to track down the logical analysis of the registration center, tracking to this point is enough.

The source code of the server-side metadata registration process is analyzed and the flow chart is described as follows.

  • registerURI()
public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {

//......

public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {
if (CollectionUtils.isEmpty(uriList)) {
return "";
}
// Does the corresponding selector exist
SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
if (Objects.isNull(selectorDO)) {
return "";
}
// Handle handler information in the selector
String handler = buildHandle(uriList, selectorDO);
selectorDO.setHandle(handler);
SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
selectorData.setHandle(handler);

// Update records in the database
selectorService.updateSelective(selectorDO);
// publish Event to gateway
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
return ShenyuResultMessage.SUCCESS;
}
}

After admin gets the URI data, it mainly updates the handler information in the selector, then writes it to the database, and finally publishes the event notification gateway. The logic of notifying the gateway is done by the data synchronization operation, which has been analyzed in the previous article, so we will not repeat it.

The source code analysis of the server-side URI registration process is complete and is described in the following diagram.

At this point, the server-side registration process is also analyzed, mainly through the interface provided externally, accept the registration information from the client, and then write to the Disruptor queue, and then consume data from it, and update the admin selector, rules, metadata and selector handler according to the received metadata and URI data.

4. Summary

This article focuses on the http registration module of the Apache ShenYu gateway for source code analysis. The main knowledge points involved are summarized as follows.

  • The register center is for registering client information to admin to facilitate traffic filtering.
  • http registration is to register client metadata information and URI information to admin.
  • http service access is identified by the annotation @ShenyuSpringMvcClient.
  • construction of the registration information mainly through the application listener ApplicationListener.
  • loading of the registration type is done through SPI.
  • The Disruptor queue was introduced to decouple data from operations, and data buffering.
  • The implementation of the registry uses interface-oriented programming, using design patterns such as template methods, singleton, and observer.