
3 posts tagged with "data sync"


Http Long Polling Data Synchronization Source Code Analysis

· 30 min read
Apache ShenYu Committer

Apache ShenYu is an asynchronous, high-performance, cross-language, responsive API gateway.

In the ShenYu gateway, data synchronization refers to how changed data is synchronized to the gateway after it is modified in the background management system (shenyu-admin). The Apache ShenYu gateway currently supports data synchronization via ZooKeeper, WebSocket, HTTP long polling, Nacos, etcd and Consul. This article analyzes the source code of HTTP long polling data synchronization.

This article is based on the source code of version shenyu-2.4.0. For an overview, please refer to the official Data Synchronization Design documentation.

1. Http Long Polling#

Here is a direct quote from the official website with the relevant description.

The mechanism of ZooKeeper and WebSocket data synchronization is relatively simple, while HTTP long polling is more complex. Apache ShenYu borrowed the design ideas of Apollo and Nacos, took their essence, and implemented HTTP long polling data synchronization itself. Note that this is not traditional Ajax long polling!

The HTTP long polling mechanism is shown above: the Apache ShenYu gateway actively requests the configuration service of shenyu-admin with a read timeout of 90s, which means the gateway will wait at most 90s for the configuration service to respond. This allows shenyu-admin to respond to changed data in time and achieve quasi-real-time push.

The Http long polling mechanism is initiated by the gateway requesting shenyu-admin, so for this source code analysis, we start from the gateway side.

2. Gateway Data Sync#

2.1 Load Configuration#

The HTTP long polling data synchronization configuration is loaded through the Spring Boot starter mechanism when we introduce the relevant dependency and add the following configuration to the configuration file.

Introduce dependencies in the pom file.

        <!--shenyu data sync start use http-->
        <dependency>
            <groupId>org.apache.shenyu</groupId>
            <artifactId>shenyu-spring-boot-starter-sync-data-http</artifactId>
            <version>${project.version}</version>
        </dependency>

Add the following configuration to the application.yml configuration file.

shenyu:
  sync:
    http:
      url: http://localhost:9095

When the gateway is started, the configuration class HttpSyncDataConfiguration is executed, loading the corresponding Bean.


/**
 * Http sync data configuration for spring boot.
 */
@Configuration
@ConditionalOnClass(HttpSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url")
@Slf4j
public class HttpSyncDataConfiguration {

    /**
     * Http sync data service.
     *
     * @param httpConfig       the http config
     * @param pluginSubscriber the plugin subscriber
     * @param metaSubscribers  the meta data subscribers
     * @param authSubscribers  the auth data subscribers
     * @return the sync data service
     */
    @Bean
    public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                               final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        log.info("you use http long pull sync shenyu data");
        return new HttpSyncDataService(Objects.requireNonNull(httpConfig.getIfAvailable()), Objects.requireNonNull(pluginSubscriber.getIfAvailable()),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }

    /**
     * Http config.
     *
     * @return the http config
     */
    @Bean
    @ConfigurationProperties(prefix = "shenyu.sync.http")
    public HttpConfig httpConfig() {
        return new HttpConfig();
    }
}

HttpSyncDataConfiguration is the configuration class for Http long polling data synchronization, responsible for creating HttpSyncDataService (responsible for the concrete implementation of http data synchronization) and HttpConfig (admin property configuration). It is annotated as follows.

  • @Configuration: indicates that this is a configuration class.
  • @ConditionalOnClass(HttpSyncDataService.class): conditional annotation; the class HttpSyncDataService must be present on the classpath.
  • @ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url"): conditional annotation; the property shenyu.sync.http.url must be configured.

2.2 Property initialization#

  • HttpSyncDataService

In the constructor of HttpSyncDataService, complete the property initialization.

public class HttpSyncDataService implements SyncDataService, AutoCloseable {

    // omitted attribute fields ......

    public HttpSyncDataService(final HttpConfig httpConfig, final PluginDataSubscriber pluginDataSubscriber,
                               final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
        // 1. create data refresh factory
        this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
        // 2. get config of admin
        this.httpConfig = httpConfig;
        // shenyu-admin url, multiple urls are separated by commas
        this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
        // 3. create httpClient, used to initiate requests to admin
        this.httpClient = createRestTemplate();
        // 4. start the long polling task
        this.start();
    }

    //......
}

Other methods and fields are omitted from the code above. The property initialization done in the constructor mainly consists of:

  • creating data processors for subsequent caching of various types of data (plugins, selectors, rules, metadata and authentication data).

  • obtaining the admin property configuration, mainly the admin url; admin may be deployed as a cluster, in which case multiple addresses are separated by commas (,).

  • creating the httpClient (a RestTemplate), used to send requests to admin.

    private RestTemplate createRestTemplate() {
        OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
        // connection establishment timeout of 10s
        factory.setConnectTimeout((int) this.connectionTimeout.toMillis());
        // the gateway actively requests the configuration service of shenyu-admin, and the read timeout is 90s
        factory.setReadTimeout((int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT);
        return new RestTemplate(factory);
    }
  • Start the long polling task.

2.3 Start the long polling task.#

  • HttpSyncDataService#start()

The start() method does two things. First, it fetches the full amount of data, i.e., it requests all the data that needs to be synchronized from the admin side and caches it in the gateway's memory. Second, it starts the long polling tasks on a thread pool.

// Initialized only once, implemented with an atomic class.
private final AtomicBoolean RUNNING = new AtomicBoolean(false);

private void start() {
    // It could be initialized multiple times, so you need to control that.
    if (RUNNING.compareAndSet(false, true)) {
        // fetch all group configs.
        // Initial startup, get full data
        this.fetchGroupConfig(ConfigGroupEnum.values());
        // one admin service, one thread
        int threadSize = serverList.size();
        // custom thread pool
        this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                ShenyuThreadFactory.create("http-long-polling", true));
        // start long polling, each server creates a thread to listen for changes.
        this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
    } else {
        log.info("shenyu http long polling was started, executor=[{}]", executor);
    }
}
2.3.1 Fetch Data#
  • HttpSyncDataService#fetchGroupConfig()

ShenYu groups all the data that needs to be synchronized into 5 data types: plugins, selectors, rules, metadata and authentication data.

public enum ConfigGroupEnum {
    APP_AUTH,  // app auth data
    PLUGIN,    // plugin data
    RULE,      // rule data
    SELECTOR,  // selector data
    META_DATA; // meta data
}

The admin may be a cluster. Here a request is made to each admin in turn; as soon as one succeeds, the operation of fetching the full data from admin and caching it in the gateway is finished. If an exception occurs, the request is sent to the next admin.

private void fetchGroupConfig(final ConfigGroupEnum... groups) throws ShenyuException {
    // It is possible that admin is a cluster, so requests are made to each admin in a loop.
    for (int index = 0; index < this.serverList.size(); index++) {
        String server = serverList.get(index);
        try {
            // do execute
            this.doFetchGroupConfig(server, groups);
            // if one succeeds, the whole operation succeeds and the loop can be exited
            break;
        } catch (ShenyuException e) {
            // an exception occurred, try the next one
            // the last one also failed, throw the exception
            // no available server, throw exception.
            if (index >= serverList.size() - 1) {
                throw e;
            }
            log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
        }
    }
}
  • HttpSyncDataService#doFetchGroupConfig()

In this method, the request parameters are first assembled, then the request is launched through httpClient to admin to get the data, and finally the obtained data is updated to the gateway memory.

// Launch a request to the admin backend management system to get all synchronized data
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
    // 1. build request parameters, all grouped enumeration types
    StringBuilder params = new StringBuilder();
    for (ConfigGroupEnum groupKey : groups) {
        params.append("groupKeys").append("=").append(groupKey.name()).append("&");
    }
    // admin url: /configs/fetch
    String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
    log.info("request configs: [{}]", url);
    String json = null;
    try {
        // 2. send the request to get the data
        json = this.httpClient.getForObject(url, String.class);
    } catch (RestClientException e) {
        String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
        log.warn(message);
        throw new ShenyuException(message, e);
    }
    // update local cache
    // 3. update the data in gateway memory
    boolean updated = this.updateCacheWithJson(json);
    // the update was successful and the method is now complete
    if (updated) {
        log.info("get latest configs: [{}]", json);
        return;
    }
    // not updated. it is likely that the current config server has not been updated yet. wait a moment.
    log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
    // no data update on the server side, just wait 30s
    ThreadUtils.sleep(TimeUnit.SECONDS, 30);
}

From the code we can see that the interface provided by the admin side to get the full data is /configs/fetch. We will not go further into it here; it is analyzed later.

If the data obtained from admin is updated successfully, the method finishes. If nothing was updated, it is likely that there is no data change on the server side, so the gateway waits 30s.

One thing needs to be explained in advance: when the gateway determines whether the update succeeded, it compares the data; this comparison is covered shortly.

  • HttpSyncDataService#updateCacheWithJson()

Update the data in the gateway memory. Use GSON for deserialization, take the real data from the property data and give it to DataRefreshFactory to do the update.

    private boolean updateCacheWithJson(final String json) {
        // use GSON for deserialization
        JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
        JsonObject data = jsonObject.getAsJsonObject("data");
        // will the config cache be updated?
        return factory.executor(data);
    }
  • DataRefreshFactory#executor()

Update the data according to the different data types and return the result. parallelStream() is used to update in parallel, and the specific update logic is delegated to the dataRefresh.refresh() method. If any one of the data types was updated, the overall result counts as an update.

    public boolean executor(final JsonObject data) {
        // update data in a parallelStream
        List<Boolean> result = ENUM_MAP.values().parallelStream()
                .map(dataRefresh -> dataRefresh.refresh(data))
                .collect(Collectors.toList());
        // if any one group was updated, this call counts as an update
        return result.stream().anyMatch(Boolean.TRUE::equals);
    }
  • AbstractDataRefresh#refresh()

The data update logic uses the template method design pattern: the generic operations are implemented in the abstract class, while the differing logic is implemented by subclasses. The 5 data types differ somewhat in their specific update logic, but they also share common update logic; the class diagram relationship is as follows.

In the generic refresh() method, it is responsible for data type conversion, determining whether an update is needed, and the actual data refresh operation.

    @Override
    public Boolean refresh(final JsonObject data) {
        boolean updated = false;
        // convert data
        JsonObject jsonObject = convert(data);
        if (null != jsonObject) {
            // get data
            ConfigData<T> result = fromJson(jsonObject);
            // does it need to be updated?
            if (this.updateCacheIfNeed(result)) {
                updated = true;
                // real update logic, data refresh operation
                refresh(result.getData());
            }
        }
        return updated;
    }
  • AbstractDataRefresh#updateCacheIfNeed()

The data conversion process depends on the data type, so we will not trace it further. Let's look at the logic that decides whether the data needs to be updated. The method is updateCacheIfNeed(), and it is implemented via method overloading.

// result is the data
protected abstract boolean updateCacheIfNeed(ConfigData<T> result);

// newVal is the latest value obtained
// groupEnum is the data type
protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {
    // if this is the first time, put it directly into the cache and return true, indicating that an update was made
    if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {
        return true;
    }
    ResultHolder holder = new ResultHolder(false);
    GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {
        // same md5 value, no need to update
        if (StringUtils.equals(oldVal.getMd5(), newVal.getMd5())) {
            log.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5());
            return oldVal;
        }
        // the cached data was modified later than the new data, no need to update
        // must compare the last update time
        if (oldVal.getLastModifyTime() >= newVal.getLastModifyTime()) {
            log.info("Last update time earlier than the current configuration, the [{}] config cache will not be updated", groupEnum);
            return oldVal;
        }
        log.info("update {} config: {}", groupEnum, newVal);
        holder.result = true;
        return newVal;
    });
    return holder.result;
}

As you can see from the source code above, there are two cases where updates are not required.

  • The md5 values of both data are the same, so no update is needed;
  • The currently cached data was modified later than the new data, so no update is needed.

In other cases, the data needs to be updated.

At this point, we have finished analyzing the logic of the start() method to get the full amount of data for the first time, followed by the long polling operation. For convenience, I will paste the start() method once more.

    private void start() {
        // It could be initialized multiple times, so you need to control that.
        if (RUNNING.compareAndSet(false, true)) {
            // fetch all group configs.
            // Initial startup, get full data
            this.fetchGroupConfig(ConfigGroupEnum.values());
            // one background service, one thread
            int threadSize = serverList.size();
            // custom thread pool
            this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(),
                    ShenyuThreadFactory.create("http-long-polling", true));
            // start long polling, each server creates a thread to listen for changes.
            this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
        } else {
            log.info("shenyu http long polling was started, executor=[{}]", executor);
        }
    }
2.3.2 Execute Long Polling Task#
  • HttpLongPollingTask#run()

The long polling task is HttpLongPollingTask, which implements the Runnable interface; the task logic is in the run() method. The task runs continuously through a while() loop, i.e., long polling. Each polling round is retried up to three times: if one attempt fails, it waits 5s and tries again; if all 3 attempts fail, it waits 5 minutes before trying again.

When long polling starts, one thread is created per admin service for data synchronization.

class HttpLongPollingTask implements Runnable {

        private String server;

        // default retry 3 times
        private final int retryTimes = 3;

        HttpLongPollingTask(final String server) {
            this.server = server;
        }

        @Override
        public void run() {
            // long polling
            while (RUNNING.get()) {
                for (int time = 1; time <= retryTimes; time++) {
                    try {
                        doLongPolling(server);
                    } catch (Exception e) {
                        // print warning log.
                        if (time < retryTimes) {
                            log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
                                    time, retryTimes - time, e.getMessage());
                            // long polling failed, wait 5s and continue
                            ThreadUtils.sleep(TimeUnit.SECONDS, 5);
                            continue;
                        }
                        // print error, then suspend for a while.
                        log.error("Long polling failed, try again after 5 minutes!", e);
                        // all 3 attempts failed, wait 5 minutes and try again
                        ThreadUtils.sleep(TimeUnit.MINUTES, 5);
                    }
                }
            }
            log.warn("Stop http long polling.");
        }
    }
  • HttpSyncDataService#doLongPolling()

Core logic for performing long polling tasks.

  • Assembling request parameters based on data types: md5 and lastModifyTime.
  • Assembling the request header and request body.
  • Launching a request to admin to determine if the group data has changed.
  • For the groups that have changed, fetching the data again.
private void doLongPolling(final String server) {
    // build request params: md5 and lastModifyTime
    MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
    for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
        ConfigData<?> cacheConfig = factory.cacheConfigData(group);
        if (cacheConfig != null) {
            String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
            params.put(group.name(), Lists.newArrayList(value));
        }
    }
    // build request head and body
    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
    HttpEntity httpEntity = new HttpEntity(params, headers);
    String listenerUrl = server + "/configs/listener";
    log.debug("request listener configs: [{}]", listenerUrl);
    JsonArray groupJson = null;
    // initiate a request to admin to determine whether any group data has changed
    // here it only determines whether a group has changed or not
    try {
        String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
        log.debug("listener result: [{}]", json);
        groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
    } catch (RestClientException e) {
        String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
        throw new ShenyuException(message, e);
    }
    // depending on the groups where the change occurred, fetch the data again
    /**
     * The official website explains here:
     * After the gateway receives the response message, it only knows which Group has changed its configuration,
     * and it still needs to request the configuration data of that Group again.
     * There may be a question here: why not write out the changed data directly?
     * We discussed this issue in depth during development. Because the http long polling mechanism can only
     * guarantee quasi-real time, if the gateway layer does not process in time, or the administrator updates
     * the configuration frequently, it is very likely to miss the push of a configuration change.
     * So, for safety, we only notify which group has changed.
     *
     * Personal understanding:
     * If the changed data were written out directly, then when the administrator updates the configuration
     * frequently, the first update would remove the client from the blocking queue and return the response
     * to the gateway. If a second update happened at that moment, the current client would not be in the
     * blocking queue, so that change would be missed. The same applies to untimely processing at the gateway
     * layer: this is long polling, with one synchronization thread per gateway, and there may be time-consuming
     * processing. If admin data changes while the gateway client is not in the blocking queue, that data
     * would not be received.
     */
    if (groupJson != null) {
        // fetch group configuration async.
        ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
        if (ArrayUtils.isNotEmpty(changedGroups)) {
            log.info("Group config changed: {}", Arrays.toString(changedGroups));
            // proactively fetch the changed groups from admin, in full, per group
            this.doFetchGroupConfig(server, changedGroups);
        }
    }
}

One special point needs to be explained here: in the long polling task, why doesn't the gateway get the changed data directly? Instead, it only learns which group's data has changed, and then requests admin again to fetch that group's data.

The official explanation is:

After the gateway receives the response, it only knows which Group has changed its configuration, and it still needs to request the configuration data of that Group again. There may be a question here: why not write out the changed data directly? We discussed this issue in depth during development. Because the http long polling mechanism can only guarantee quasi-real time, if the gateway layer does not process in time, or the administrator updates the configuration frequently, it is very likely to miss the push of a configuration change. So, for safety, we only notify which group has changed.

My personal understanding:

If the changed data were written out directly, then when the administrator updates the configuration frequently, the first update would remove the client from the blocking queue and return the response to the gateway. If a second update happened at that moment, the current client would not be in the blocking queue, so that change would be missed. The same applies to untimely processing at the gateway layer: this is long polling, with one synchronization thread per gateway, and there may be time-consuming processing. If admin data changes while the gateway client is not in the blocking queue, that data will not be received.

We have not yet analyzed the processing logic of the admin side, so roughly speaking: at the admin end, the gateway client is put into a blocking queue; when data changes, the gateway client is taken out of the queue and the change notification is sent. So if the gateway client is not in the blocking queue when a data change happens, that change cannot be delivered to it.

Once we know which group's data has changed, we actively request admin again and fetch that group's data in full. The method called is doFetchGroupConfig(), which was analyzed in the previous section.

At this point, the data synchronization operation on the gateway side is complete. The long polling task keeps sending requests to admin to check whether data has changed; if any group's data has changed, it sends another request to admin to fetch that group's data and then updates the data in the gateway's memory.

Long polling task flow at the gateway side.

3. Admin Data Sync#

From the previous analysis, it can be seen that the gateway side mainly calls two interfaces of admin.

  • /configs/listener: determine whether the group data has changed.
  • /configs/fetch: get the changed group data.
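Before diving into the admin side, here is a minimal sketch (not ShenYu code) of how a client could exercise these two endpoints with Spring's RestTemplate, to make the contract concrete. The admin address comes from the sample configuration shown earlier; the md5 and timestamp values are placeholders.

import org.apache.shenyu.common.enums.ConfigGroupEnum;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

public class ConfigEndpointsSketch {

    public static void main(final String[] args) {
        RestTemplate client = new RestTemplate();
        String admin = "http://localhost:9095";

        // /configs/fetch: full data of the requested groups, passed as repeated groupKeys parameters
        String fetched = client.getForObject(admin + "/configs/fetch?groupKeys=PLUGIN&groupKeys=RULE", String.class);
        System.out.println("fetch result: " + fetched);

        // /configs/listener: form-encoded body, one "md5,lastModifyTime" entry per group;
        // the placeholder md5 and timestamp below should make admin report every group as changed
        MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
            params.add(group.name(), "placeholder-md5,0");
        }
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
        String changed = client.postForEntity(admin + "/configs/listener", new HttpEntity<>(params, headers), String.class).getBody();
        System.out.println("changed groups: " + changed);
    }
}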

If we analyze directly from these two interfaces, some parts may not be well understood, so let's start analyzing the data synchronization process from the admin startup process.

3.1 Load Configuration#

If the following configuration is done in the configuration file application.yml, it means that the data synchronization is done by http long polling.

shenyu:
  sync:
    http:
      enabled: true

When the program starts, the data synchronization configuration classes are loaded through Spring Boot conditional assembly. During this process, HttpLongPollingDataChangedListener is created to handle the long-polling-related logic.

/**
 * Data synchronization configuration class.
 * Conditional assembly via Spring Boot.
 * The type Data sync configuration.
 */
@Configuration
public class DataSyncConfiguration {

    /**
     * http long polling.
     */
    @Configuration
    @ConditionalOnProperty(name = "shenyu.sync.http.enabled", havingValue = "true")
    @EnableConfigurationProperties(HttpSyncProperties.class)
    static class HttpLongPollingListener {

        @Bean
        @ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)
        public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
            return new HttpLongPollingDataChangedListener(httpSyncProperties);
        }
    }
}

3.2 Data change listener instantiation#

  • HttpLongPollingDataChangedListener

The data change listener is instantiated and initialized through its constructor. In the constructor, a blocking queue is created to hold the clients, a thread pool is created to execute delayed and periodic tasks, and the long polling properties are stored.

    public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
        // default number of clients (here, gateways) is 1024
        this.clients = new ArrayBlockingQueue<>(1024);
        // create thread pool
        // ScheduledThreadPoolExecutor can perform delayed tasks, periodic tasks, and normal tasks
        this.scheduler = new ScheduledThreadPoolExecutor(1,
                ShenyuThreadFactory.create("long-polling", true));
        // http sync properties
        this.httpSyncProperties = httpSyncProperties;
    }

In addition, it has the following class diagram relationships.

It implements the InitializingBean interface, so afterPropertiesSet() is executed during bean initialization, which calls afterInitialize(). There, a periodic task is scheduled on the thread pool: updating the data in memory (CACHE) runs every 5 minutes and starts after 5 minutes. Refreshing the local cache means reading data from the database into the local cache (here, memory), done by refreshLocalCache().

    /**
     * Called in the afterPropertiesSet() method of the InitializingBean interface,
     * which is executed during bean initialization.
     */
    @Override
    protected void afterInitialize() {
        long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
        // periodically check the data for changes and update the cache
        // periodic task: updating the data in memory (CACHE) runs every 5 minutes and starts after 5 minutes
        // this prevents the case where admin starts first, data is created later,
        // and the gateway does not get the full data when it first connects
        scheduler.scheduleWithFixedDelay(() -> {
            log.info("http sync strategy refresh config start.");
            try {
                // read data from the database into the local cache (here, memory)
                this.refreshLocalCache();
                log.info("http sync strategy refresh config success.");
            } catch (Exception e) {
                log.error("http sync strategy refresh config error!", e);
            }
        }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
        log.info("http sync strategy refresh interval: {}ms", syncInterval);
    }
  • refreshLocalCache()

Update for each of the 5 data types.

    // read data from the database into the local cache (here, memory)
    private void refreshLocalCache() {
        // update app auth data
        this.updateAppAuthCache();
        // update plugin data
        this.updatePluginCache();
        // update rule data
        this.updateRuleCache();
        // update selector data
        this.updateSelectorCache();
        // update meta data
        this.updateMetaDataCache();
    }

The logic of the 5 update methods is similar: call the service method to get the data and put it into the in-memory CACHE. Take updateRuleCache() as an example: it passes in the rule enumeration type and calls ruleService.listAll() to get all the rule data from the database.

    /**
     * Update rule cache.
     */
    protected void updateRuleCache() {
        this.updateCache(ConfigGroupEnum.RULE, ruleService.listAll());
    }
  • updateCache()

Update the data in memory using the data in the database.

// cache Map
protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();

    /**
     * If the md5 is not the same as the original, then update the local cache.
     * @param group ConfigGroupEnum
     * @param <T> the type of class
     * @param data the new config data
     */
    protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
        // data serialization
        String json = GsonUtils.getInstance().toJson(data);
        // pass in the md5 value and modification time
        ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
        // update group data
        ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
        log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
    }

The initialization process is to start periodic tasks to update the memory data by fetching data from the database at regular intervals.

Next, we start the analysis of two interfaces.

  • /configs/listener: determines if the group data has changed.
  • /configs/fetch: fetching the changed group data.

3.3 Data change polling interface#

  • /configs/listener: determines if the group data has changed.

The interface class is ConfigController, which only takes effect when using http long polling for data synchronization. The interface method listener() has no other logic and calls the doLongPolling() method directly.

/**
 * This Controller only takes effect when HttpLongPollingDataChangedListener exists.
 */
@ConditionalOnBean(HttpLongPollingDataChangedListener.class)
@RestController
@RequestMapping("/configs")
@Slf4j
public class ConfigController {

    @Resource
    private HttpLongPollingDataChangedListener longPollingListener;

    // omit other logic

    /**
     * Listener.
     * Listen for data changes and perform long polling.
     * @param request  the request
     * @param response the response
     */
    @PostMapping(value = "/listener")
    public void listener(final HttpServletRequest request, final HttpServletResponse response) {
        longPollingListener.doLongPolling(request, response);
    }
}
  • HttpLongPollingDataChangedListener#doLongPolling()

Perform the long polling task: if data has changed, it is returned to the client (here, the gateway side) immediately. Otherwise, the client is blocked until there is a data change or a timeout occurs.

    /**
     * Execute long polling: if there is a data change, it is responded to the client (here, the gateway side) immediately.
     * Otherwise, the client remains blocked until there is a data change or a timeout.
     * @param request  the request
     * @param response the response
     */
    public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
        // compare group md5
        // compare md5 to determine whether the gateway's data and the admin side's data are consistent, and get the changed groups
        List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
        String clientIp = getRemoteIp(request);
        // response immediately.
        // respond to the gateway immediately if there is changed data
        if (CollectionUtils.isNotEmpty(changedGroup)) {
            this.generateResponse(response, changedGroup);
            log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
            return;
        }
        // no change, so the client (here, the gateway) is put into the blocking queue
        // listen for configuration changed.
        final AsyncContext asyncContext = request.startAsync();
        // AsyncContext.setTimeout() does not time out properly, so you have to control it yourself
        asyncContext.setTimeout(0L);
        // block client's thread.
        scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
    }
  • HttpLongPollingDataChangedListener#compareChangedGroup()

To determine whether the group data has changed, the judgment logic is to compare the md5 value and lastModifyTime at the gateway side and the admin side.

  • If the md5 value is different, then it needs to be updated.
  • If the lastModifyTime on the admin side is greater than the lastModifyTime on the gateway side, then it needs to be updated.
    /**
     * Determine whether the group data has changed.
     * @param request the request
     * @return the changed groups
     */
    private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request) {
        List<ConfigGroupEnum> changedGroup = new ArrayList<>(ConfigGroupEnum.values().length);
        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
            // the md5 value and lastModifyTime of the data on the gateway side
            String[] params = StringUtils.split(request.getParameter(group.name()), ',');
            if (params == null || params.length != 2) {
                throw new ShenyuException("group param invalid:" + request.getParameter(group.name()));
            }
            String clientMd5 = params[0];
            long clientModifyTime = NumberUtils.toLong(params[1]);
            ConfigDataCache serverCache = CACHE.get(group.name());
            // do check. determine whether the group data has changed
            if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) {
                changedGroup.add(group);
            }
        }
        return changedGroup;
    }
  • LongPollingClient

If no data has changed, the client (here, the gateway) is put into the blocking queue. The blocking time is 60 seconds, i.e., after 60 seconds the client is removed from the queue and a response is sent to it.

class LongPollingClient implements Runnable {

        // omitted other logic

        @Override
        public void run() {
            try {
                // remove the client after 60 seconds and respond to it
                this.asyncTimeoutFuture = scheduler.schedule(() -> {
                    clients.remove(LongPollingClient.this);
                    List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
                    sendResponse(changedGroups);
                }, timeoutTime, TimeUnit.MILLISECONDS);
                // add to the blocking queue
                clients.add(this);
            } catch (Exception ex) {
                log.error("add long polling client error", ex);
            }
        }

        /**
         * Send response.
         *
         * @param changedGroups the changed groups
         */
        void sendResponse(final List<ConfigGroupEnum> changedGroups) {
            // cancel scheduler
            if (null != asyncTimeoutFuture) {
                asyncTimeoutFuture.cancel(false);
            }
            // respond with the changed groups
            generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
            asyncContext.complete();
        }
    }

3.4 Get Change Data Interface#

  • /configs/fetch: get change data;

Get the grouped data and return the result according to the parameters passed in by the gateway. The main implementation method is longPollingListener.fetchConfig().


@ConditionalOnBean(HttpLongPollingDataChangedListener.class)
@RestController
@RequestMapping("/configs")
@Slf4j
public class ConfigController {

    @Resource
    private HttpLongPollingDataChangedListener longPollingListener;

    /**
     * Fetch configs shenyu result.
     * @param groupKeys the group keys
     * @return the shenyu result
     */
    @GetMapping("/fetch")
    public ShenyuAdminResult fetchConfigs(@NotNull final String[] groupKeys) {
        Map<String, ConfigData<?>> result = Maps.newHashMap();
        for (String groupKey : groupKeys) {
            ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey));
            result.put(groupKey, data);
        }
        return ShenyuAdminResult.success(ShenyuResultMessage.SUCCESS, result);
    }

    // other interfaces are omitted
}
  • AbstractDataChangedListener#fetchConfig()

Data is fetched directly from the CACHE, then matched and wrapped according to the group type.


    /**
     * Fetch configuration from cache.
     * @param groupKey the group key
     * @return the configuration data
     */
    public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {
        // get data from CACHE
        ConfigDataCache config = CACHE.get(groupKey.name());
        switch (groupKey) {
            case APP_AUTH: // app auth data
                List<AppAuthData> appAuthList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<AppAuthData>>() {
                }.getType());
                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), appAuthList);
            case PLUGIN: // plugin data
                List<PluginData> pluginList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<PluginData>>() {
                }.getType());
                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), pluginList);
            case RULE: // rule data
                List<RuleData> ruleList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<RuleData>>() {
                }.getType());
                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), ruleList);
            case SELECTOR: // selector data
                List<SelectorData> selectorList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<SelectorData>>() {
                }.getType());
                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), selectorList);
            case META_DATA: // meta data
                List<MetaData> metaList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<MetaData>>() {
                }.getType());
                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), metaList);
            default: // other data type, throw exception
                throw new IllegalStateException("Unexpected groupKey: " + groupKey);
        }
    }

3.5 Data Change#

In the previous websocket data synchronization and zookeeper data synchronization source code analysis article, we know that the admin side data synchronization design structure is as follows.

Various data change listeners are subclasses of DataChangedListener.

When the data is modified on the admin side, event notifications are sent through the Spring event handling mechanism. The sending logic is as follows.


/**
 * Event forwarders, which forward the changed events to each ConfigEventListener.
 * Data change event dispatcher: synchronizes the changed data to the ShenYu gateway when data changes on the admin side.
 * Data changes rely on Spring's event-listener mechanism: ApplicationEventPublisher --> ApplicationEvent --> ApplicationListener
 */
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    // other logic omitted

    /**
     * Called when there are data changes.
     * @param event the data changed event
     */
    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        // iterate through the data change listeners (generally only one data synchronization approach is used)
        for (DataChangedListener listener : listeners) {
            // which kind of data has changed?
            switch (event.getGroupKey()) {
                case APP_AUTH: // app auth data
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN: // plugin data
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE: // rule data
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR: // selector data
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA: // meta data
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default: // other data type, throw exception
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }
}

Suppose the plugin information is modified and data is synchronized by http long polling; then the actual implementation called by listener.onPluginChanged() is org.apache.shenyu.admin.listener.AbstractDataChangedListener#onPluginChanged.

    /**
     * Called when a plugin is updated in the admin.
     * @param changed   the changed plugin data
     * @param eventType the event type
     */
    @Override
    public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
        if (CollectionUtils.isEmpty(changed)) {
            return;
        }
        // update CACHE
        this.updatePluginCache();
        // execute change task
        this.afterPluginChanged(changed, eventType);
    }

There are two processing steps: one is to update the in-memory CACHE, which was analyzed earlier; the other is to execute the change task, which runs in the thread pool.

  • HttpLongPollingDataChangedListener#afterPluginChanged()
    @Override
    protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
        // execute by thread pool
        scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN));
    }
  • DataChangeTask

Data change task: remove the clients in the blocking queue in turn and send a response to notify the gateway that a group of data has changed.

class DataChangeTask implements Runnable {

        // other logic omitted

        @Override
        public void run() {
            // if the number of clients in the blocking queue exceeds the configured value (100), execute in batches
            if (clients.size() > httpSyncProperties.getNotifyBatchSize()) {
                List<LongPollingClient> targetClients = new ArrayList<>(clients.size());
                clients.drainTo(targetClients);
                List<List<LongPollingClient>> partitionClients = Lists.partition(targetClients, httpSyncProperties.getNotifyBatchSize());
                // batch execution
                partitionClients.forEach(item -> scheduler.execute(() -> doRun(item)));
            } else {
                // execute task
                doRun(clients);
            }
        }

        private void doRun(final Collection<LongPollingClient> clients) {
            // notify all clients that a data change has occurred
            for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
                LongPollingClient client = iter.next();
                iter.remove();
                // send response to client
                client.sendResponse(Collections.singletonList(groupKey));
                log.info("send response with the changed group, ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
            }
        }
    }

At this point, the data synchronization logic on the admin side has been analyzed. In http long polling based data synchronization, the admin side has three main functions:

  • providing a data change listening interface.
  • providing the interface to get the changed data.
  • When there is a data change, remove the client in the blocking queue and respond to the result.

Finally, three diagrams describe the long polling task flow on the admin side.

  • /configs/listener data change listener interface.

  • /configs/fetch fetch change data interface.

  • Update data in the admin backend management system for data synchronization.

4. Summary#

This article focuses on the source code analysis of http long polling data synchronization in the ShenYu gateway. The main knowledge points involved are as follows.

  • http long polling is initiated by the gateway side, which constantly requests the admin side.
  • change data at group granularity (authentication information, plugins, selectors, rules, metadata).
  • http long polling results in getting only the change group, and another request needs to be initiated to get the group data.
  • Whether the data is updated or not is determined by the md5 value and the modification time lastModifyTime.

WebSocket Data Synchronization Source Code Analysis

· 22 min read
Apache ShenYu Committer

In the ShenYu gateway, data synchronization refers to how changed data is synchronized to the gateway after it is modified in the background management system (shenyu-admin). The Apache ShenYu gateway currently supports data synchronization via ZooKeeper, WebSocket, HTTP long polling, Nacos, etcd and Consul. This article analyzes the source code of WebSocket data synchronization.

This article is based on the source code of version shenyu-2.4.0. For an overview, please refer to the official Data Synchronization Design documentation.

1. About WebSocket Communication#

The WebSocket protocol was born in 2008 and became an international standard in 2011. It enables two-way communication: the server can actively push information to the client, and the client can also actively send information to the server. The WebSocket protocol is based on TCP and belongs to the application layer, with low performance overhead and high communication efficiency. The protocol identifier is ws.
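As a small illustration of this two-way model, here is a minimal sketch using the standard javax.websocket client API, not ShenYu code; the ws://localhost:8080/ws endpoint is hypothetical, and an implementation such as Tyrus must be on the classpath.

import java.net.URI;
import javax.websocket.ClientEndpoint;
import javax.websocket.ContainerProvider;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;

// Minimal client endpoint: the server can push messages to us at any time,
// and we can actively send messages to the server over the same connection.
@ClientEndpoint
public class SimpleWsClient {

    @OnOpen
    public void onOpen(final Session session) throws Exception {
        // client -> server
        session.getBasicRemote().sendText("hello from client");
    }

    @OnMessage
    public void onMessage(final String message) {
        // server -> client (server-initiated push)
        System.out.println("received: " + message);
    }

    public static void main(final String[] args) throws Exception {
        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        container.connectToServer(SimpleWsClient.class, URI.create("ws://localhost:8080/ws")); // hypothetical endpoint
        Thread.sleep(10_000); // keep the process alive long enough to receive pushes
    }
}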

2. Admin Data Sync#

Let's trace the source code from a real case, such as adding a selector data in the background management system:

2.1 Accept Changed Data#

  • SelectorController.createSelector()

Enter the createSelector() method of the SelectorController class, which validates data, adds or updates data, and returns results.

@Validated
@RequiredArgsConstructor
@RestController
@RequestMapping("/selector")
public class SelectorController {

    @PostMapping("")
    public ShenyuAdminResult createSelector(@Valid @RequestBody final SelectorDTO selectorDTO) { // @Valid data validation
        // create or update data
        Integer createCount = selectorService.createOrUpdate(selectorDTO);
        // return result
        return ShenyuAdminResult.success(ShenyuResultMessage.CREATE_SUCCESS, createCount);
    }

    // ......
}

2.2 Handle Data#

  • SelectorServiceImpl.createOrUpdate()

The createOrUpdate() method in the SelectorServiceImpl class converts the data, saves it to the database, publishes the event, and updates the upstream.

@RequiredArgsConstructor
@Service
public class SelectorServiceImpl implements SelectorService {

    // eventPublisher
    private final ApplicationEventPublisher eventPublisher;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public int createOrUpdate(final SelectorDTO selectorDTO) {
        int selectorCount;
        // build data: DTO --> DO
        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
        List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
        // insert or update?
        if (StringUtils.isEmpty(selectorDTO.getId())) {
            // insert selector data
            selectorCount = selectorMapper.insertSelective(selectorDO);
            // insert condition data
            selectorConditionDTOs.forEach(selectorConditionDTO -> {
                selectorConditionDTO.setSelectorId(selectorDO.getId());
                selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
            });
            // check selector add
            if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {
                DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();
                dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());
                dataPermissionDTO.setDataId(selectorDO.getId());
                dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);
                dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));
            }
        } else {
            // update data: delete the conditions and then insert them again
            selectorCount = selectorMapper.updateSelective(selectorDO);
            // delete rule condition then add
            selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
            selectorConditionDTOs.forEach(selectorConditionDTO -> {
                selectorConditionDTO.setSelectorId(selectorDO.getId());
                SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
                selectorConditionMapper.insertSelective(selectorConditionDO);
            });
        }
        // publish event
        publishEvent(selectorDO, selectorConditionDTOs);
        // update upstream
        updateDivideUpstream(selectorDO);
        return selectorCount;
    }

    // ......
}

Persisting the data to the database in the Service class should be familiar and is not expanded here. The upstream update operation is analyzed in its own section below; here we focus on the event publishing operation, which performs the data synchronization.

The logic of the publishEvent() method is to find the plugin corresponding to the selector, build the conditional data, and publish the change data.

    private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
        // find the plugin of the selector
        PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
        // build condition data
        List<ConditionData> conditionDataList =
                selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
        // publish event
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
                Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
    }

Publishing the changed data is done by eventPublisher.publishEvent(). The eventPublisher object is of type ApplicationEventPublisher, whose fully qualified class name is org.springframework.context.ApplicationEventPublisher. From this we can see that data publication is done through Spring's event mechanism.

ApplicationEventPublisher

When the state changes, the publisher calls the publishEvent() method of ApplicationEventPublisher to publish an event, the Spring container broadcasts the event to all observers, and each observer's onApplicationEvent() method is called with the event object. There are two ways to call publishEvent(): one is to have the container inject an ApplicationEventPublisher object and then call its method; the other is to call the method directly on the application context. There is not much difference between the two ways of publishing events.

  • ApplicationEventPublisher: publish event;
  • ApplicationEvent: Spring event, record the event source, time, and data;
  • ApplicationListener: event listener, observer.

In Spring's event publishing mechanism, there are three objects.

The first is ApplicationEventPublisher, which publishes events; in ShenYu an eventPublisher is injected through the constructor.

The second is ApplicationEvent, the event object; in ShenYu it is extended by DataChangedEvent.

public class DataChangedEvent extends ApplicationEvent {
    //......
}

The last is ApplicationListener; in ShenYu the DataChangedEventDispatcher class implements this interface and, as the event listener, is responsible for handling the event object.

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
    //......
}
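To make these three roles concrete, here is a minimal, generic sketch of the mechanism; the Demo* names are hypothetical and are not ShenYu classes.

import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

// 1. the event: carries the source object (and, in real use, any extra data)
class DemoChangedEvent extends ApplicationEvent {
    DemoChangedEvent(final Object source) {
        super(source);
    }
}

// 2. the publisher: the container injects ApplicationEventPublisher, which publishes events on state changes
@Service
class DemoPublisher {

    private final ApplicationEventPublisher publisher;

    DemoPublisher(final ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    void change(final String newValue) {
        publisher.publishEvent(new DemoChangedEvent(newValue));
    }
}

// 3. the listener (observer): called by the container when a matching event is published
@Component
class DemoListener implements ApplicationListener<DemoChangedEvent> {

    @Override
    public void onApplicationEvent(final DemoChangedEvent event) {
        System.out.println("received change: " + event.getSource());
    }
}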

2.3 Dispatch Data#

  • DataChangedEventDispatcher.onApplicationEvent()

When the event has been published, the onApplicationEvent() method of the DataChangedEventDispatcher class is automatically invoked to handle it.

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    /**
     * This method is called when there are data changes.
     * @param event the data changed event
     */
    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        // iterate through the data change listeners (usually only one data synchronization approach is used)
        for (DataChangedListener listener : listeners) {
            // which kind of data has changed?
            switch (event.getGroupKey()) {
                case APP_AUTH: // app auth data
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN: // plugin data
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE: // rule data
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR: // selector data
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA: // metadata
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default: // other types throw an exception
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }
}

When there is a data change, the onApplicationEvent method is called and all the data change listeners are iterated to determine the data type and handed over to the appropriate data listener for processing.

ShenYu groups all the data into five categories: APP_AUTH, PLUGIN, RULE, SELECTOR and META_DATA.
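These five categories correspond to the values of ConfigGroupEnum used in the switch statement above; a simplified sketch of the enum (the real class may carry additional helper methods) looks like this:

```java
public enum ConfigGroupEnum {
    APP_AUTH,
    PLUGIN,
    RULE,
    SELECTOR,
    META_DATA
}
```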

Here the data change listener (DataChangedListener) is an abstraction of the data synchronization policy. Its concrete implementation is:

These implementation classes are the synchronization strategies currently supported by ShenYu:

  • WebsocketDataChangedListener: data synchronization based on Websocket;
  • ZookeeperDataChangedListener:data synchronization based on Zookeeper;
  • ConsulDataChangedListener: data synchronization based on Consul;
  • EtcdDataDataChangedListener:data synchronization based on etcd;
  • HttpLongPollingDataChangedListener:data synchronization based on http long polling;
  • NacosDataChangedListener:data synchronization based on nacos;

Given that there are so many implementation strategies, how do you decide which to use?

Since this article analyzes data synchronization based on websocket, we take WebsocketDataChangedListener as an example and analyze how it is loaded and implemented.

A global search in the source code project shows that its implementation is done in the DataSyncConfiguration class.

/** * Data Sync Configuration * By springboot conditional assembly * The type Data sync configuration. */@Configurationpublic class DataSyncConfiguration {     /**     * The WebsocketListener(default strategy).     */    @Configuration    @ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)    @EnableConfigurationProperties(WebsocketSyncProperties.class)    static class WebsocketListener {
        /**         * Config event listener data changed listener.         * @return the data changed listener         */        @Bean        @ConditionalOnMissingBean(WebsocketDataChangedListener.class)        public DataChangedListener websocketDataChangedListener() {            return new WebsocketDataChangedListener();        }
        /**         * Websocket collector.         * Websocket collector class: establish a connection, send a message, close the connection and other operations         * @return the websocket collector         */        @Bean        @ConditionalOnMissingBean(WebsocketCollector.class)        public WebsocketCollector websocketCollector() {            return new WebsocketCollector();        }
        /**         * Server endpoint exporter          *         * @return the server endpoint exporter         */        @Bean        @ConditionalOnMissingBean(ServerEndpointExporter.class)        public ServerEndpointExporter serverEndpointExporter() {            return new ServerEndpointExporter();        }    }        //......}

This configuration class takes effect through Spring Boot conditional assembly. The WebsocketListener class has several annotations:

  • @Configuration: Configuration file, application context;

  • @ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true): attribute condition. The configuration class takes effect only when the condition is met, that is, when we have the following configuration, websocket is used for data synchronization. Note, however, the matchIfMissing = true attribute, which means that this configuration class also takes effect when the configuration below is absent. Data synchronization based on websocket is officially recommended and is the default.

    shenyu:    sync:    websocket:      enabled: true
  • @EnableConfigurationProperties:enable configuration properties;

When we explicitly configure websocket data synchronization, the WebsocketDataChangedListener is generated. So in the event handler onApplicationEvent(), execution goes to the corresponding listener. In our case, new selector data is added and the synchronization mode is websocket, so the code enters the selector data change processing of WebsocketDataChangedListener.

    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // Iterate through the data change listener (usually using a data synchronization approach is fine)        for (DataChangedListener listener : listeners) {            // What kind of data has changed             switch (event.getGroupKey()) {                                    // other logic is omitted              case SELECTOR:   // selector data                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());   // WebsocketDataChangedListener handle selector data                    break;         }    }

2.4 Websocket Data Changed Listener#

  • WebsocketDataChangedListener.onSelectorChanged()

In the onSelectorChanged() method, the data is encapsulated into WebsocketData and then sent via webSocketCollector.send().

    // selector data has been updated    @Override    public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {        // build WebsocketData         WebsocketData<SelectorData> websocketData =                new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);        // websocket send data        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);    }
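What actually travels over the wire is the JSON form of the WebsocketData wrapper. The following is a rough sketch of that wrapper, with field names inferred from the constructor call above and the getters used later on the gateway side; the real class may differ in details:

```java
import java.util.List;

public class WebsocketData<T> {

    private String groupType;  // which data group changed, e.g. "SELECTOR"

    private String eventType;  // which operation happened, e.g. "CREATE" or "UPDATE"

    private List<T> data;      // the changed data itself

    public WebsocketData(final String groupType, final String eventType, final List<T> data) {
        this.groupType = groupType;
        this.eventType = eventType;
        this.data = data;
    }

    // getters such as getGroupType(), getEventType() and getData() omitted
}
```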

2.5 Websocket Send Data#

  • WebsocketCollector.send()

In the send() method, the type of synchronization is determined and processed according to the different types.

@Slf4j@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)public class WebsocketCollector {    /**     * Send.     *     * @param message the message     * @param type    the type     */    public static void send(final String message, final DataEventTypeEnum type) {        if (StringUtils.isNotBlank(message)) {            // If it's MYSELF (first full synchronization)          if (DataEventTypeEnum.MYSELF == type) {                // get the session from ThreadLocal            Session session = (Session) ThreadLocalUtil.get(SESSION_KEY);                if (session != null) {                    // send full data to the session                   sendMessageBySession(session, message);                }            } else {                // subsequent incremental synchronization                // synchronize change data to all sessions               SESSION_SET.forEach(session -> sendMessageBySession(session, message));            }        }    }
    private static void sendMessageBySession(final Session session, final String message) {        try {            // The message is sent through the Websocket session           session.getBasicRemote().sendText(message);        } catch (IOException e) {            log.error("websocket send result is exception: ", e);        }    }}

The example we give is a new operation, an incremental synchronization, so it goes

SESSION_SET.forEach(session -> sendMessageBySession(session, message));

then through

session.getBasicRemote().sendText(message);

the data is sent out.

At this point, when data changes occur on the admin side, the changed data is sent incrementally to the gateway through the WebSocket connection.

At this point, do you have any questions? For example, where does session come from? How does the gateway establish a connection with admin?

Don't worry, let's do the synchronization analysis on the gateway side.

However, before continuing with the source code analysis, let's use a diagram to string together the above analysis process.

3. Gateway Data Sync#

Assume that the ShenYu gateway is already running normally and that its data synchronization mode is also websocket. After the admin side adds new selector data and sends it out via WebSocket, how does the gateway receive and process it? Let's continue our source code analysis to find out.

3.1 WebsocketClient Accept Data#

  • ShenyuWebsocketClient.onMessage()

There is a ShenyuWebsocketClient class on the gateway, which extends WebSocketClient and can establish a connection with the admin side and communicate over WebSocket.

public final class ShenyuWebsocketClient extends WebSocketClient {  // ......}

After sending data via websocket on the admin side, ShenyuWebsocketClient can receive data via onMessage() and then process it itself.

public final class ShenyuWebsocketClient extends WebSocketClient {      // execute after receiving the message    @Override    public void onMessage(final String result) {        // handle accept data        handleResult(result);    }        private void handleResult(final String result) {        // data deserialization        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);        // which data types, plug-ins, selectors, rules...        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());        // which operation type, update, delete...              String eventType = websocketData.getEventType();        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        // handle data        websocketDataHandler.executor(groupEnum, json, eventType);    }}

After receiving the data, it is first deserialized; then the data type and operation type are read, and the data is handed over to websocketDataHandler.executor() for processing.

3.2 Execute Websocket Data Handler#

  • WebsocketDataHandler.executor()

A Websocket data handler is created in factory mode, providing one handler for each data type:

plugin --> PluginDataHandler;

selector --> SelectorDataHandler;

rule --> RuleDataHandler;

auth --> AuthDataHandler;

metadata --> MetaDataHandler.


/** * Create Websocket data handlers through factory mode * The type Websocket cache handler. */public class WebsocketDataHandler {
    private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);
    /**     * Instantiates a new Websocket data handler.     * @param pluginDataSubscriber the plugin data subscriber     * @param metaDataSubscribers  the meta data subscribers     * @param authDataSubscribers  the auth data subscribers     */    public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,                                final List<MetaDataSubscriber> metaDataSubscribers,                                final List<AuthDataSubscriber> authDataSubscribers) {        // plugin --> PluginDataHandler        ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));        // selector --> SelectorDataHandler        ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));        // rule --> RuleDataHandler        ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));        // auth --> AuthDataHandler        ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));        // metadata --> MetaDataHandler        ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));    }
    /**     * Executor.     *     * @param type      the type     * @param json      the json     * @param eventType the event type     */    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {        // find the corresponding data handler based on the data type        ENUM_MAP.get(type).handle(json, eventType);    }}

Different data types are handled in different ways, so there are different implementation classes. But they also share common processing logic, which can be expressed with the template method design pattern: the common logic is placed in the handle() method of the abstract class, and the differing logic is left to the respective implementation classes.

In our case, a new selector is added, so it will be passed to the SelectorDataHandler for data processing.

3.3 Determine the Event Type#

  • AbstractDataHandler.handle()

Implement common logical handling of data changes: invoke different methods based on different operation types.


public abstract class AbstractDataHandler<T> implements DataHandler {
    /**     * Convert list.     * The different logic is implemented by the respective implementation classes     * @param json the json     * @return the list     */    protected abstract List<T> convert(String json);
    /**     * Do refresh.     * The different logic is implemented by the respective implementation classes     * @param dataList the data list     */    protected abstract void doRefresh(List<T> dataList);
    /**     * Do update.     * The different logic is implemented by the respective implementation classes     * @param dataList the data list     */    protected abstract void doUpdate(List<T> dataList);
    /**     * Do delete.     * The different logic is implemented by the respective implementation classes     * @param dataList the data list     */    protected abstract void doDelete(List<T> dataList);
    // General purpose logic, abstract class implementation    @Override    public void handle(final String json, final String eventType) {        List<T> dataList = convert(json);        if (CollectionUtils.isNotEmpty(dataList)) {            DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);            switch (eventTypeEnum) {                case REFRESH:                case MYSELF:                    doRefresh(dataList);  //Refreshes data and synchronizes all data                    break;                case UPDATE:                case CREATE:                    doUpdate(dataList); // Update or create data, incremental synchronization                    break;                case DELETE:                    doDelete(dataList);  // delete data                    break;                default:                    break;            }        }    }}

For new selector data the operation type is create, so the switch-case enters the doUpdate() method.

3.4 Enter the Specific Data Handler#

  • SelectorDataHandler.doUpdate()

/** * The type Selector data handler. */@RequiredArgsConstructorpublic class SelectorDataHandler extends AbstractDataHandler<SelectorData> {
    private final PluginDataSubscriber pluginDataSubscriber;
    //......
    // update data    @Override    protected void doUpdate(final List<SelectorData> dataList) {        dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);    }}

Iterate over the data and enter the onSelectorSubscribe() method.

  • PluginDataSubscriber.onSelectorSubscribe()

It has no additional logic and calls the subscribeDataHandler() method directly. Within that method, different logic is executed according to the data type (plugin, selector, or rule) and the operation type (update or delete).

/** * The common plugin data subscriber, responsible for handling all plug-in, selector, and rule information */public class CommonPluginDataSubscriber implements PluginDataSubscriber {    //......     // handle selector data    @Override    public void onSelectorSubscribe(final SelectoData selectorData) {        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);    }            // A subscription data handler that handles updates or deletions of data    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {        Optional.ofNullable(classData).ifPresent(data -> {            // plugin data            if (data instanceof PluginData) {                PluginData pluginData = (PluginData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // update                    // save the data to gateway memory                     BaseDataCache.getInstance().cachePluginData(pluginData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // delete                    // delete the data from gateway memory                    BaseDataCache.getInstance().removePluginData(pluginData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));                }            } else if (data instanceof SelectorData) {  // selector data                SelectorData selectorData = (SelectorData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // update                    // save the data to gateway memory                    BaseDataCache.getInstance().cacheSelectData(selectorData);                    // If each plugin has its own processing logic, then do it                     Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // delete                    // delete the data from gateway memory                    BaseDataCache.getInstance().removeSelectData(selectorData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));                }            } else if (data instanceof RuleData) {  // rule data                RuleData ruleData = (RuleData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // update                    // save the data to gateway memory                    BaseDataCache.getInstance().cacheRuleData(ruleData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));                } else if (dataType == DataEventTypeEnum.DELETE) { // delete                    // delete the data from gateway memory                    BaseDataCache.getInstance().removeRuleData(ruleData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> 
handler.removeRule(ruleData));                }            }        });    }    }

Adding a selector will enter the following logic:

// save the data to gateway memoryBaseDataCache.getInstance().cacheSelectData(selectorData);// If each plugin has its own processing logic, then do itOptional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));

One is to save the data in the gateway's memory. BaseDataCache is the class that ultimately caches the data, implemented as a singleton. The selector data is stored in the SELECTOR_MAP map, and subsequent processing also reads from this data.

public final class BaseDataCache {    // private instance    private static final BaseDataCache INSTANCE = new BaseDataCache();    // private constructor    private BaseDataCache() {    }        /**     * Gets instance.     *  public method     * @return the instance     */    public static BaseDataCache getInstance() {        return INSTANCE;    }        /**      * A Map of the cache selector data     * pluginName -> SelectorData.     */    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();        public void cacheSelectData(final SelectorData selectorData) {        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);    }           /**     * cache selector data.     * @param data the selector data     */    private void selectorAccept(final SelectorData data) {        String key = data.getPluginName();        if (SELECTOR_MAP.containsKey(key)) { // Update operation, delete before insert            List<SelectorData> existList = SELECTOR_MAP.get(key);            final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());            resultList.add(data);            final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());            SELECTOR_MAP.put(key, collect);        } else {  // Add new operations directly to Map            SELECTOR_MAP.put(key, Lists.newArrayList(data));        }    }    }

Second, if the plugin has its own processing logic, execute it. Through the IDEA editor, you can see which plugins provide their own handling after a selector changes. We will not expand on this here.

With the above source tracing, and through the practical case of adding a selector on the admin side, the websocket data synchronization process has been made clear.

Let's use the following figure to string together the data synchronization process on the gateway side:

The data synchronization process has been analyzed, but there are still some problems that have not been analyzed, that is, how does the gateway establish a connection with admin?

4. The Gateway Establishes a Websocket Connection with Admin#

  • websocket config

With the following configuration in the gateway configuration file and the dependency introduced, the websocket related service is started.

shenyu:    file:      enabled: true    cross:      enabled: true    dubbo :      parameter: multi    sync:        websocket :  # Use websocket for data synchronization          urls: ws://localhost:9095/websocket   # websocket address of admin

Add a dependency on websocket in the gateway.

<!--shenyu data sync start use websocket--><dependency>    <groupId>org.apache.shenyu</groupId>    <artifactId>shenyu-spring-boot-starter-sync-data-websocket</artifactId>    <version>${project.version}</version></dependency>
  • Websocket Data Sync Config

The associated beans are created through Spring Boot conditional assembly. When the gateway starts, if shenyu.sync.websocket.urls is configured, the websocket data synchronization configuration will be loaded. The configuration class is discovered through the Spring Boot starter mechanism.


/** * WebsocketSyncDataService * Conditional injection is implemented through SpringBoot * Websocket sync data configuration for spring boot. */@Configuration@ConditionalOnClass(WebsocketSyncDataService.class)@ConditionalOnProperty(prefix = "shenyu.sync.websocket", name = "urls")@Slf4jpublic class WebsocketSyncDataConfiguration {
    /**     * Websocket sync data service.     * @param websocketConfig   the websocket config     * @param pluginSubscriber the plugin subscriber     * @param metaSubscribers   the meta subscribers     * @param authSubscribers   the auth subscribers     * @return the sync data service     */    @Bean    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {        log.info("you use websocket sync shenyu data.......");        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));    }
    /**     * Config websocket config.     *     * @return the websocket config     */    @Bean    @ConfigurationProperties(prefix = "shenyu.sync.websocket")    public WebsocketConfig websocketConfig() {        return new WebsocketConfig();      }}

Create a spring.factories file in the resources/META-INF directory of the module and specify the configuration class in it.
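For example, the file content roughly looks like the following; the fully qualified class name is shown for illustration and may differ slightly between versions:

```properties
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.apache.shenyu.springboot.starter.sync.data.websocket.WebsocketSyncDataConfiguration
```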

  • WebsocketSyncDataService

The following things are done in WebsocketSyncDataService:

  • Read the configured urls, which are the synchronization addresses on the admin side; if there is more than one, they are separated by ",";

  • Create a scheduled thread pool, with one thread assigned to each admin to perform scheduled tasks;

  • Create ShenyuWebsocketClient, assign one to each admin, set up websocket communication with admin;

  • Establish the websocket connection with the admin side;

  • Execute a scheduled task every 10 seconds. Its main function is to check whether the websocket connection has been disconnected; if so, try to reconnect. If not, perform a ping-pong test.


/** * Websocket sync data service. */@Slf4jpublic class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
    private final List<WebSocketClient> clients = new ArrayList<>();
    private final ScheduledThreadPoolExecutor executor;
    /**     * Instantiates a new Websocket sync cache.     * @param websocketConfig      the websocket config     * @param pluginDataSubscriber the plugin data subscriber     * @param metaDataSubscribers  the meta data subscribers     * @param authDataSubscribers  the auth data subscribers     */    public WebsocketSyncDataService(final WebsocketConfig websocketConfig,                                    final PluginDataSubscriber pluginDataSubscriber,                                    final List<MetaDataSubscriber> metaDataSubscribers,                                    final List<AuthDataSubscriber> authDataSubscribers) {        // If there are multiple synchronization addresses on the admin side, use commas (,) to separate them        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");        // Create a scheduling thread pool, one for each admin        executor = new ScheduledThreadPoolExecutor(urls.length, ShenyuThreadFactory.create("websocket-connect", true));        for (String url : urls) {            try {                //Create a WebsocketClient and assign one to each admin                clients.add(new ShenyuWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));            } catch (URISyntaxException e) {                log.error("websocket url({}) is error", url, e);            }        }        try {            for (WebSocketClient client : clients) {                // Establish a connection with the WebSocket Server                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);                if (success) {                    log.info("websocket connection is successful.....");                } else {                    log.error("websocket connection is error.....");                }
                // Run a scheduled task every 10 seconds                // The main function is to check whether the WebSocket connection is disconnected. If the connection is disconnected, retry the connection.                // If it is not disconnected, the ping-pong test is performed                executor.scheduleAtFixedRate(() -> {                    try {                        if (client.isClosed()) {                            boolean reconnectSuccess = client.reconnectBlocking();                            if (reconnectSuccess) {                                log.info("websocket reconnect server[{}] is successful.....", client.getURI().toString());                            } else {                                log.error("websocket reconnection server[{}] is error.....", client.getURI().toString());                            }                        } else {                            client.sendPing();                            log.debug("websocket send to [{}] ping message successful", client.getURI().toString());                        }                    } catch (InterruptedException e) {                        log.error("websocket connect is error :{}", e.getMessage());                    }                }, 10, 10, TimeUnit.SECONDS);            }            /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/        } catch (InterruptedException e) {            log.info("websocket connection...exception....", e);        }
    }
    @Override    public void close() {        // close websocket client        for (WebSocketClient client : clients) {            if (!client.isClosed()) {                client.close();            }        }        // close threadpool        if (Objects.nonNull(executor)) {            executor.shutdown();        }    }}
  • ShenyuWebsocketClient

The WebSocket client created in ShenYu to communicate with the admin side. After the connection is successfully established for the first time, full data is synchronized and incremental data is subsequently synchronized.


/** * The type shenyu websocket client. */@Slf4jpublic final class ShenyuWebsocketClient extends WebSocketClient {        private volatile boolean alreadySync = Boolean.FALSE;        private final WebsocketDataHandler websocketDataHandler;        /**     * Instantiates a new shenyu websocket client.     * @param serverUri             the server uri       * @param pluginDataSubscriber the plugin data subscriber      * @param metaDataSubscribers   the meta data subscribers      * @param authDataSubscribers   the auth data subscribers      */    public ShenyuWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {        super(serverUri);        this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);    }
    // Execute after the connection is successfully established    @Override    public void onOpen(final ServerHandshake serverHandshake) {        // To prevent re-execution when reconnecting, use alreadySync to determine        if (!alreadySync) {            // Synchronize all data, type MYSELF            send(DataEventTypeEnum.MYSELF.name());            alreadySync = true;        }    }
    // Execute after receiving the message    @Override    public void onMessage(final String result) {        // handle data        handleResult(result);    }        // Execute after shutdown    @Override    public void onClose(final int i, final String s, final boolean b) {        this.close();    }        // Execute after error    @Override    public void onError(final Exception e) {        this.close();    }        @SuppressWarnings("ALL")    private void handleResult(final String result) {        // Data deserialization        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);        // Which data types, plugins, selectors, rules...        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());        // Which operation type, update, delete...        String eventType = websocketData.getEventType();        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        // handle data        websocketDataHandler.executor(groupEnum, json, eventType);    }}

5. Summary#

Through a practical case, this article analyzed the source code of websocket data synchronization. The main knowledge points involved are as follows:

  • WebSocket supports bidirectional communication and has good performance. It is recommended.

  • Complete event publishing and listening via Spring;

  • Support multiple synchronization strategies through the abstract DataChangedListener interface, programming to the interface;

  • Use the factory pattern in WebsocketDataHandler to create handlers for different data types;

  • Implement AbstractDataHandler with the template method design pattern to handle the common operation types;

  • Use the singleton design pattern for the data cache class BaseDataCache;

  • Load configuration classes through Spring Boot conditional assembly and the starter loading mechanism.

ZooKeeper Data Synchronization Source Code Analysis

· 18 min read
Apache ShenYu Committer

Apache ShenYu is an asynchronous, high-performance, cross-language, responsive API gateway.

In ShenYu gateway, data synchronization refers to how to synchronize the updated data to the gateway after the data is sent in the background management system. The Apache ShenYu gateway currently supports data synchronization for ZooKeeper, WebSocket, http long poll, Nacos, etcd and Consul. The main content of this article is based on ZooKeeper data synchronization source code analysis.

This paper based on shenyu-2.4.0 version of the source code analysis, the official website of the introduction of please refer to the Data Synchronization Design .

1. About ZooKeeper#

Apache ZooKeeper is a software project of the Apache Software Foundation that provides open source distributed configuration services, synchronization services, and naming registries for large-scale distributed computing. ZooKeeper nodes store their data in a hierarchical namespace, much like a file system or a prefix tree structure. Clients can read and write on nodes and thus have a shared configuration service in this way.
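As a quick illustration, here is a minimal sketch of reading and writing a node with ZkClient, the same client API that ShenYu uses later in this article; the address, timeouts and paths are made up for the example:

```java
import org.I0Itec.zkclient.ZkClient;

public class ZkClientDemo {

    public static void main(final String[] args) {
        // connect to a local ZooKeeper (address and timeouts are illustrative)
        ZkClient zkClient = new ZkClient("localhost:2181", 5000, 2000);

        String path = "/demo/config";
        // create the node if it does not exist (true = create parent nodes as needed)
        if (!zkClient.exists(path)) {
            zkClient.createPersistent(path, true);
        }

        // write data to the node and read it back
        zkClient.writeData(path, "{\"weight\":90}");
        Object data = zkClient.readData(path);
        System.out.println("node data: " + data);

        zkClient.close();
    }
}
```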

2. Admin Data Sync#

Let's trace the source code from a real case, such as updating the weight of a selector in the Divide plugin to 90 in the background administration system:

2.1 Accept Data#

  • SelectorController.updateSelector()

Enter the updateSelector() method of the SelectorController class, which validates data, adds or updates data, and returns the result.

@Validated@RequiredArgsConstructor@RestController@RequestMapping("/selector")public class SelectorController {        @PutMapping("/{id}")    public ShenyuAdminResult updateSelector(@PathVariable("id") final String id, @Valid @RequestBody final SelectorDTO selectorDTO) {        // set the current selector data ID        selectorDTO.setId(id);        // create or update operation        Integer updateCount = selectorService.createOrUpdate(selectorDTO);        // return result         return ShenyuAdminResult.success(ShenyuResultMessage.UPDATE_SUCCESS, updateCount);    }        // ......}

2.2 Handle Data#

  • SelectorServiceImpl.createOrUpdate()

The createOrUpdate() method in the SelectorServiceImpl class converts the data, saves it to the database, publishes the event, and updates the upstream.

@RequiredArgsConstructor@Servicepublic class SelectorServiceImpl implements SelectorService {    // eventPublisher    private final ApplicationEventPublisher eventPublisher;        @Override    @Transactional(rollbackFor = Exception.class)    public int createOrUpdate(final SelectorDTO selectorDTO) {        int selectorCount;        // build data DTO --> DO        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);        List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();        // insert or update ?        if (StringUtils.isEmpty(selectorDTO.getId())) {            //  insert into data            selectorCount = selectorMapper.insertSelective(selectorDO);            // insert into condition data            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));            });            // check selector add            if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {                DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();                dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());                dataPermissionDTO.setDataId(selectorDO.getId());                dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);                dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));            }
        } else {            // update data, delete and then insert            selectorCount = selectorMapper.updateSelective(selectorDO);            //delete rule condition then add            selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);                selectorConditionMapper.insertSelective(selectorConditionDO);            });        }        // publish event        publishEvent(selectorDO, selectorConditionDTOs);
        // update upstream        updateDivideUpstream(selectorDO);        return selectorCount;    }        // ......    }

Persisting the data to the database in the Service class should be familiar and is not expanded here. The update-upstream operation is analyzed in its own section below; here we focus on the publish-event operation, which triggers data synchronization.

The logic of the publishEvent() method is to find the plugin corresponding to the selector, build the condition data, and publish the change data.

       private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {        // find plugin of selector        PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());        // build condition data        List<ConditionData> conditionDataList =                selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());        // publish event        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,                Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));    }

Publishing the changed data is completed by eventPublisher.publishEvent(). The eventPublisher object is of type ApplicationEventPublisher, whose fully qualified class name is org.springframework.context.ApplicationEventPublisher. From this we can see that publishing the data is done through Spring's event mechanism.

ApplicationEventPublisher

When a state changes, the publisher calls the publishEvent() method of ApplicationEventPublisher to publish an event, the Spring container broadcasts the event to all observers, and each observer's onApplicationEvent() method is called with the event object. There are two ways to call publishEvent(): one is to have the container inject an ApplicationEventPublisher object (for example through the ApplicationEventPublisherAware interface or constructor injection) and call the method on it; the other is to call it directly on the ApplicationContext. There is not much difference between the two ways of publishing events.

  • ApplicationEventPublisher: publish event;
  • ApplicationEvent: Spring event, record the event source, time, and data;
  • ApplicationListener: event listener, observer.

In Spring's event publishing mechanism, these three objects are involved.

The first object is the event publisher, ApplicationEventPublisher; in ShenYu, an eventPublisher is injected through the constructor.

The second object is the event itself, ApplicationEvent; in ShenYu, the DataChangedEvent class extends it and represents the event object.

public class DataChangedEvent extends ApplicationEvent {//......}

The last object is ApplicationListener; in ShenYu, the DataChangedEventDispatcher class implements this interface and, as the event listener, is responsible for handling the event object.

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
    //......    }

2.3 Dispatch Data#

  • DataChangedEventDispatcher.onApplicationEvent()

After the event is published, execution automatically enters the onApplicationEvent() method of the DataChangedEventDispatcher class, which handles the event.

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
  /**     * This method is called when there are data changes   * @param event     */    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // Iterate through the data change listener (usually using a data synchronization approach is fine)      for (DataChangedListener listener : listeners) {            // What kind of data has changed        switch (event.getGroupKey()) {                case APP_AUTH: // app auth data                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());                    break;                case PLUGIN:  // plugin data                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());                    break;                case RULE:    // rule data                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());                    break;                case SELECTOR:   // selector data                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());                    break;                case META_DATA:  // metadata                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());                    break;                default:  // other types throw exception                  throw new IllegalStateException("Unexpected value: " + event.getGroupKey());            }        }    }    }

When there is a data change, the onApplicationEvent method is called and all the data change listeners are iterated to determine the data type and handed over to the appropriate data listener for processing.

ShenYu groups all the data into five categories: APP_AUTH, PLUGIN, RULE, SELECTOR and META_DATA.

Here the data change listener (DataChangedListener) is an abstraction of the data synchronization policy. Its concrete implementation is:

These implementation classes are the synchronization strategies currently supported by ShenYu:

  • WebsocketDataChangedListener: data synchronization based on Websocket;
  • ZookeeperDataChangedListener:data synchronization based on Zookeeper;
  • ConsulDataChangedListener: data synchronization based on Consul;
  • EtcdDataDataChangedListener:data synchronization based on etcd;
  • HttpLongPollingDataChangedListener:data synchronization based on http long polling;
  • NacosDataChangedListener:data synchronization based on nacos;

Given that there are so many implementation strategies, how do you decide which to use?

Since this article analyzes data synchronization based on zookeeper, we take ZookeeperDataChangedListener as an example and analyze how it is loaded and implemented.

A global search in the source code project shows that its implementation is done in the DataSyncConfiguration class.

/** * Data Sync Configuration * By springboot conditional assembly * The type Data sync configuration. */@Configurationpublic class DataSyncConfiguration {            /**     * zookeeper data sunc     * The type Zookeeper listener.     */    @Configuration    @ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url")  // The condition property is loaded only when it is met    @Import(ZookeeperConfiguration.class)    static class ZookeeperListener {
        /**         * Config event listener data changed listener.         * @param zkClient the zk client         * @return the data changed listener         */        @Bean        @ConditionalOnMissingBean(ZookeeperDataChangedListener.class)        public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {            return new ZookeeperDataChangedListener(zkClient);        }
        /**         * Zookeeper data init zookeeper data init.         * @param zkClient        the zk client         * @param syncDataService the sync data service         * @return the zookeeper data init         */        @Bean        @ConditionalOnMissingBean(ZookeeperDataInit.class)        public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {            return new ZookeeperDataInit(zkClient, syncDataService);        }    }        // other code is omitted......}

This configuration class takes effect through Spring Boot conditional assembly. The ZookeeperListener class has several annotations:

  • @Configuration: Configuration file, application context;

  • @ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url"): attribute condition. The configuration class takes effect only when the condition is met. That is, when we have the following configuration, ZooKeeper is used for data synchronization.

    shenyu:    sync:     zookeeper:          url: localhost:2181          sessionTimeout: 5000          connectionTimeout: 2000
  • @Import(ZookeeperConfiguration.class):import ZookeeperConfiguration;

  @EnableConfigurationProperties(ZookeeperProperties.class)  // enable zookeeper properties  public class ZookeeperConfiguration {
    /**     * register zkClient in spring ioc.     * @param zookeeperProp the zookeeper configuration     * @return ZkClient {@linkplain ZkClient}        */      @Bean      @ConditionalOnMissingBean(ZkClient.class)      public ZkClient zkClient(final ZookeeperProperties zookeeperProp) {        return new ZkClient(zookeeperProp.getUrl(), zookeeperProp.getSessionTimeout(), zookeeperProp.getConnectionTimeout()); // read the zk configuration and create the zkClient      }  }
@Data@ConfigurationProperties(prefix = "shenyu.sync.zookeeper") // zookeeper propertiespublic class ZookeeperProperties {
    private String url;
    private Integer sessionTimeout;
    private Integer connectionTimeout;
    private String serializer;}

When we explicitly configure zookeeper data synchronization, the ZookeeperDataChangedListener is generated. So in the event handler onApplicationEvent(), execution goes to the corresponding listener. In our case, selector data is updated and the synchronization mode is zookeeper, so the code enters the selector data change processing of ZookeeperDataChangedListener.

    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // Iterate through the data change listener (usually using a data synchronization approach is fine)        for (DataChangedListener listener : listeners) {            // what kind of data has changed         switch (event.getGroupKey()) {                                    // other code logic is omitted                                    case SELECTOR:   // selector data                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());   // In our case, will enter the ZookeeperDataChangedListener selector data change process                    break;         }    }

2.4 Zookeeper Data Changed Listener#

  • ZookeeperDataChangedListener.onSelectorChanged()

In the onSelectorChanged() method, the type of operation is determined: a refresh synchronization, or an update/create/delete synchronization. Whether a node needs to be created in zk is then determined from the current selector data.


/** * use ZooKeeper to publish change data */public class ZookeeperDataChangedListener implements DataChangedListener {        // The selector information changed    @Override    public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {        // refresh        if (eventType == DataEventTypeEnum.REFRESH && !changed.isEmpty()) {            String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(changed.get(0).getPluginName());            deleteZkPathRecursive(selectorParentPath);        }        // changed data        for (SelectorData data : changed) {            // build selector real path            String selectorRealPath = DefaultPathConstants.buildSelectorRealPath(data.getPluginName(), data.getId());            // delete            if (eventType == DataEventTypeEnum.DELETE) {                deleteZkPath(selectorRealPath);                continue;            }            // selector parent path            String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(data.getPluginName());            // create parent node            createZkNode(selectorParentPath);            // insert or update data            insertZkNode(selectorRealPath, data);        }    }
    // create zk node    private void createZkNode(final String path) {        // create only if it does not exist        if (!zkClient.exists(path)) {            zkClient.createPersistent(path, true);        }    }
    // insert zk node    private void insertZkNode(final String path, final Object data) {        // create zk node        createZkNode(path);        // write data by zkClient         zkClient.writeData(path, null == data ? "" : GsonUtils.getInstance().toJson(data));    }    }

As long as the changed data is correctly written to the zk node, the admin-side operation is complete. When ShenYu uses zk for data synchronization, the zk nodes are carefully designed.
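Based on the path-building calls in the listener above, the selector-related nodes are laid out roughly as follows; the node names are illustrative and the exact constants are defined in DefaultPathConstants:

```
/shenyu
└── selector
    └── divide                 # selector parent path, one child node per plugin
        ├── {selectorId-1}     # selector real path, node data is the selector JSON
        └── {selectorId-2}
```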

In our current case, updating the weight of a selector in the Divide plugin to 90 updates the specific nodes shown in the figure.

Let's string together the above update flow with a sequence diagram.

3. Gateway Data Sync#

Assume that the ShenYu gateway is already running properly and that its data synchronization mode is also Zookeeper. After the selector data is updated on the admin side and the changed data is written to ZK, how does the gateway receive and process it? Let's continue our source code analysis to find out.

3.1 ZkClient Accept Data#

  • ZkClient.subscribeDataChanges()

There is a ZookeeperSyncDataService class on the gateway, which subscribes to data nodes through ZkClient and can sense when the data changes.

/** * ZookeeperSyncDataService */public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {    private void subscribeSelectorDataChanges(final String path) {       // zkClient subscribe data         zkClient.subscribeDataChanges(path, new IZkDataListener() {            @Override            public void handleDataChange(final String dataPath, final Object data) {                cacheSelectorData(GsonUtils.getInstance().fromJson(data.toString(), SelectorData.class)); // zk node data changed            }
            @Override            public void handleDataDeleted(final String dataPath) {                unCacheSelectorData(dataPath);  // zk node data deleted            }        });    }     // ...}

ZooKeeper's Watch mechanism notifies subscribing clients of node changes. In our case, updating the selector information triggers the handleDataChange() method, and cacheSelectorData() is then used to process the data.

3.2 Handle Data#

  • ZookeeperSyncDataService.cacheSelectorData()

If the data is not null, the selector data is cached; the processing is again handed over to PluginDataSubscriber.

    private void cacheSelectorData(final SelectorData selectorData) {        Optional.ofNullable(selectorData)                .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSelectorSubscribe(data)));    }

PluginDataSubscriber is an interface; its only implementation class is CommonPluginDataSubscriber, which is responsible for processing plugin, selector, and rule data.

3.3 Common Plugin Data Subscriber#

  • PluginDataSubscriber.onSelectorSubscribe()

It has no additional logic and calls the subscribeDataHandler() method directly. Within that method, different logic is executed according to the data type (plugin, selector, or rule) and the operation type (update or delete).

/** * The common plugin data subscriber, responsible for handling all plug-in, selector, and rule information */public class CommonPluginDataSubscriber implements PluginDataSubscriber {    //......     // handle selector data    @Override    public void onSelectorSubscribe(final SelectoData selectorData) {        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);    }            // A subscription data handler that handles updates or deletions of data    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {        Optional.ofNullable(classData).ifPresent(data -> {            // plugin data            if (data instanceof PluginData) {                PluginData pluginData = (PluginData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // update                    // save the data to gateway memory                     BaseDataCache.getInstance().cachePluginData(pluginData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // delete                    // delete the data from gateway memory                    BaseDataCache.getInstance().removePluginData(pluginData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));                }            } else if (data instanceof SelectorData) {  // selector data                SelectorData selectorData = (SelectorData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // update                    // save the data to gateway memory                    BaseDataCache.getInstance().cacheSelectData(selectorData);                    // If each plugin has its own processing logic, then do it                     Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // delete                    // delete the data from gateway memory                    BaseDataCache.getInstance().removeSelectData(selectorData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));                }            } else if (data instanceof RuleData) {  // rule data                RuleData ruleData = (RuleData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // update                    // save the data to gateway memory                    BaseDataCache.getInstance().cacheRuleData(ruleData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));                } else if (dataType == DataEventTypeEnum.DELETE) { // delete                    // delete the data from gateway memory                    BaseDataCache.getInstance().removeRuleData(ruleData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> 
handler.removeRule(ruleData));                }            }        });    }    }

3.4 Data cached to Memory#

Updating a selector will enter the following logic:

// save the data to gateway memoryBaseDataCache.getInstance().cacheSelectData(selectorData);// If each plugin has its own processing logic, then do itOptional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));

One is to save the data in the gateway's memory. BaseDataCache is the class that ultimately caches the data, implemented as a singleton. The selector data is stored in the SELECTOR_MAP map, and subsequent processing also reads from this data.

public final class BaseDataCache {    // private instance    private static final BaseDataCache INSTANCE = new BaseDataCache();    // private constructor    private BaseDataCache() {    }        /**     * Gets instance.     *  public method     * @return the instance     */    public static BaseDataCache getInstance() {        return INSTANCE;    }        /**      * A Map of the cache selector data     * pluginName -> SelectorData.     */    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();        public void cacheSelectData(final SelectorData selectorData) {        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);    }           /**     * cache selector data.     * @param data the selector data     */    private void selectorAccept(final SelectorData data) {        String key = data.getPluginName();        if (SELECTOR_MAP.containsKey(key)) { // Update operation, delete before insert            List<SelectorData> existList = SELECTOR_MAP.get(key);            final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());            resultList.add(data);            final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());            SELECTOR_MAP.put(key, collect);        } else {  // Add new operations directly to Map            SELECTOR_MAP.put(key, Lists.newArrayList(data));        }    }    }

The second step executes the plugin's own processing logic, if it has any. Browsing the handler implementations in the IDE shows which plugins provide such processing when a selector is added; we will not expand on them here.
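
To make this second step concrete, here is a minimal, illustrative sketch of such a plugin-specific handler. It assumes ShenYu's PluginDataHandler interface (the type stored in handlerMap), whose handlerSelector and removeSelector calls appear in the subscriber code above; the class name and the plugin name used here are only examples, not actual ShenYu source.

```java
/**
 * Illustrative sketch only: a handler registered under a plugin name in handlerMap.
 * CommonPluginDataSubscriber looks it up by plugin name and calls it when that
 * plugin's selector data is updated or deleted.
 */
public class ExamplePluginDataHandler implements PluginDataHandler {

    @Override
    public void handlerSelector(final SelectorData selectorData) {
        // plugin-specific reaction to an updated selector,
        // e.g. parse the selector handle and refresh the plugin's own local cache
    }

    @Override
    public void removeSelector(final SelectorData selectorData) {
        // plugin-specific cleanup when a selector is deleted,
        // e.g. evict the cached upstream list that belongs to this selector
    }

    @Override
    public String pluginNamed() {
        // the key under which this handler is registered in handlerMap
        return "divide";
    }
}
```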

With the source tracking above, driven by a practical case of updating a selector in the admin console, the ZooKeeper data synchronization process is now clear.

Let's tie the data synchronization process on the gateway side together with a sequence diagram:

The data synchronization process has now been analyzed. To keep the analysis focused on the synchronization flow, other unrelated logic was skipped along the way. We still need to analyze how data synchronization is initialized on the admin side and how the synchronization is initialized on the gateway side.

4. Admin Data Sync Initialization#

When admin starts, the current data is fully synchronized to zk. The implementation logic is as follows:


```java
/**
 * Zookeeper data init.
 */
public class ZookeeperDataInit implements CommandLineRunner {

    private final ZkClient zkClient;

    private final SyncDataService syncDataService;

    /**
     * Instantiates a new Zookeeper data init.
     *
     * @param zkClient        the zk client
     * @param syncDataService the sync data service
     */
    public ZookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
        this.zkClient = zkClient;
        this.syncDataService = syncDataService;
    }

    @Override
    public void run(final String... args) {
        String pluginPath = DefaultPathConstants.PLUGIN_PARENT;
        String authPath = DefaultPathConstants.APP_AUTH_PARENT;
        String metaDataPath = DefaultPathConstants.META_DATA;
        // determine whether data already exists in zk
        if (!zkClient.exists(pluginPath) && !zkClient.exists(authPath) && !zkClient.exists(metaDataPath)) {
            syncDataService.syncAll(DataEventTypeEnum.REFRESH);
        }
    }
}
```

It checks whether data already exists in zk; if not, a full synchronization is performed.

ZookeeperDataInit implements the CommandLineRunner interface, which is provided by Spring Boot. Its run() method is executed after all Spring beans have been initialized, so it is often used for initialization work in a project.
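
As a minimal, self-contained sketch (not ShenYu source), a CommandLineRunner bean looks like this; Spring Boot calls run() exactly once after the application context is ready, which is why it suits one-off initialization such as a full data sync:

```java
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

/**
 * Minimal CommandLineRunner sketch: Spring Boot invokes run() once,
 * after all beans have been initialized.
 */
@Component
public class DataInitRunner implements CommandLineRunner {

    @Override
    public void run(final String... args) {
        // one-off initialization work goes here,
        // e.g. check whether the registry already holds data and trigger a full sync if not
    }
}
```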

  • SyncDataService.syncAll()

It queries the data from the database and then performs a full synchronization of all authentication information, plugin information, selector information, rule information and metadata. Synchronization events are published through eventPublisher: after publishEvent() is called, the ApplicationListener performs the change handling, which in ShenYu is the DataChangedEventDispatcher mentioned earlier.

```java
@Service
public class SyncDataServiceImpl implements SyncDataService {
    // event publisher
    private final ApplicationEventPublisher eventPublisher;

    /**
     * sync all data.
     * @param type the type
     * @return true if the synchronization succeeds
     */
    @Override
    public boolean syncAll(final DataEventTypeEnum type) {
        // app auth data
        appAuthService.syncData();
        // plugin data
        List<PluginData> pluginDataList = pluginService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
        // selector data
        List<SelectorData> selectorDataList = selectorService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
        // rule data
        List<RuleData> ruleDataList = ruleService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
        // metadata
        metaDataService.syncData();
        return true;
    }
}
```
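
To see the event mechanism that syncAll() relies on in isolation, the following is a minimal sketch of Spring's publish/listen pattern. The ConfigChangedEvent, ConfigChangePublisher and ConfigChangeListener names are made up for illustration; in ShenYu the event is DataChangedEvent and the listening role is played by DataChangedEventDispatcher.

```java
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

// Hypothetical event type, standing in for ShenYu's DataChangedEvent.
class ConfigChangedEvent extends ApplicationEvent {

    ConfigChangedEvent(final Object source) {
        super(source);
    }
}

// Publisher side: inject ApplicationEventPublisher and call publishEvent().
@Component
class ConfigChangePublisher {

    private final ApplicationEventPublisher eventPublisher;

    ConfigChangePublisher(final ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    void notifyChange(final Object changedData) {
        eventPublisher.publishEvent(new ConfigChangedEvent(changedData));
    }
}

// Listener side: Spring calls onApplicationEvent() for every matching published event;
// this is where the changed data would be dispatched to the data-change listeners.
@Component
class ConfigChangeListener implements ApplicationListener<ConfigChangedEvent> {

    @Override
    public void onApplicationEvent(final ConfigChangedEvent event) {
        // handle the change, e.g. forward it to each registered data-change listener
    }
}
```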

5. Gateway Data Sync Init#

The initialization of data synchronization on the gateway side mainly consists of subscribing to the nodes in zk: when data changes, the gateway receives the changed data. This relies on ZooKeeper's Watch mechanism. In ShenYu, the class responsible for zk data synchronization is ZookeeperSyncDataService, which was also mentioned earlier.

The work of ZookeeperSyncDataService is done during its instantiation: it subscribes to the ShenYu data synchronization nodes in zk. There are two kinds of subscription. One watches updates to the data of an existing node, via the zkClient.subscribeDataChanges() method; the other watches the addition or removal of child nodes under the current node, via the zkClient.subscribeChildChanges() method.
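
As a standalone sketch of these two subscription styles (using the I0Itec ZkClient API that ShenYu builds on; the server address and node paths below are placeholders, not ShenYu's real paths):

```java
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

public class ZkSubscribeSketch {

    public static void main(final String[] args) {
        // placeholder connection string
        ZkClient zkClient = new ZkClient("127.0.0.1:2181");

        // 1. Data change subscription: fires when an existing node's content is updated or the node is deleted.
        zkClient.subscribeDataChanges("/example/plugin/divide", new IZkDataListener() {
            @Override
            public void handleDataChange(final String dataPath, final Object data) {
                // node content updated: deserialize it and refresh the local cache
            }

            @Override
            public void handleDataDeleted(final String dataPath) {
                // node removed: evict the local cache
            }
        });

        // 2. Child change subscription: fires when child nodes are added or removed under the parent node.
        zkClient.subscribeChildChanges("/example/plugin", (parentPath, currentChildren) -> {
            // re-subscribe to every current child so newly added plugins are watched as well
        });
    }
}
```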

ZookeeperSyncDataService contains quite a lot of code, so here we track how plugin data is read and subscribed to; the other data types are handled on the same principle.


```java
/**
 * Zookeeper sync data service.
 */
public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {
    // at instantiation time, the data is read from zk and the nodes are subscribed to
    public ZookeeperSyncDataService(/* constructor arguments omitted */) {
        this.zkClient = zkClient;
        this.pluginDataSubscriber = pluginDataSubscriber;
        this.metaDataSubscribers = metaDataSubscribers;
        this.authDataSubscribers = authDataSubscribers;
        // watch plugin, selector and rule data
        watcherData();
        // watch app auth data
        watchAppAuth();
        // watch metadata
        watchMetaData();
    }

    private void watcherData() {
        // plugin node path
        final String pluginParent = DefaultPathConstants.PLUGIN_PARENT;
        // all plugin nodes
        List<String> pluginZKs = zkClientGetChildren(pluginParent);
        for (String pluginName : pluginZKs) {
            // watch plugin, selector and rule data nodes
            watcherAll(pluginName);
        }
        // subscribe to child node changes (adding or removing a plugin)
        zkClient.subscribeChildChanges(pluginParent, (parentPath, currentChildren) -> {
            if (CollectionUtils.isNotEmpty(currentChildren)) {
                for (String pluginName : currentChildren) {
                    // subscribe to all plugin, selector and rule data of the new child node
                    watcherAll(pluginName);
                }
            }
        });
    }

    private void watcherAll(final String pluginName) {
        // watch plugin
        watcherPlugin(pluginName);
        // watch selector
        watcherSelector(pluginName);
        // watch rule
        watcherRule(pluginName);
    }

    private void watcherPlugin(final String pluginName) {
        // plugin path
        String pluginPath = DefaultPathConstants.buildPluginPath(pluginName);
        // create it if it does not exist
        if (!zkClient.exists(pluginPath)) {
            zkClient.createPersistent(pluginPath, true);
        }
        // read the current node data from zk and deserialize it
        PluginData pluginData = null == zkClient.readData(pluginPath) ? null
                : GsonUtils.getInstance().fromJson((String) zkClient.readData(pluginPath), PluginData.class);
        // cache it into gateway memory
        cachePluginData(pluginData);
        // subscribe to plugin data changes
        subscribePluginDataChanges(pluginPath, pluginName);
    }

    private void cachePluginData(final PluginData pluginData) {
        // implementation omitted; it is in fact the CommonPluginDataSubscriber operation shown earlier
    }

    private void subscribePluginDataChanges(final String pluginPath, final String pluginName) {
        // subscribe to data changes
        zkClient.subscribeDataChanges(pluginPath, new IZkDataListener() {

            @Override
            public void handleDataChange(final String dataPath, final Object data) { // update
                // implementation omitted; it is in fact the CommonPluginDataSubscriber operation shown earlier
            }

            @Override
            public void handleDataDeleted(final String dataPath) { // delete
                // implementation omitted; it is in fact the CommonPluginDataSubscriber operation shown earlier
            }
        });
    }
}
```

Comments have been added to the source code above, so it should be easy to follow. The main logic for subscribing to plugin data is as follows:

  1. Build the current plugin path
  2. Create the path if it does not exist
  3. Read the current node data from zk and deserialize it
  4. Cache the plugin data in gateway memory
  5. Subscribe to the plugin node

6. Summary#

This article has analyzed the source code behind the ZooKeeper data synchronization principle through a practical case. The main knowledge points involved are as follows:

  • Data synchronization based on ZooKeeper is mainly implemented through its watch mechanism;

  • Event publishing and listening are implemented via Spring;

  • Multiple synchronization strategies are supported through the abstract DataChangedListener interface, i.e. interface-oriented programming;

  • The data cache class BaseDataCache uses the singleton design pattern;

  • Configuration classes are loaded via Spring Boot's conditional assembly and the starter loading mechanism.