Apollo Data Synchronization Source Code Analysis

Apache ShenYu Contributor

This article analyzes the source code of version shenyu-2.6.1. For an introduction, see Data Synchronization Design on the official website.

Admin management

We follow the process of adding a plugin to understand the overall flow.

Receive Data

  • PluginController.createPlugin()

Enter the createPlugin() method in the PluginController class, which is responsible for data validation, adding or updating data, and returning result information.

@Validated
@RequiredArgsConstructor
@RestController
@RequestMapping("/plugin")
public class PluginController {
    @PostMapping("")
    @RequiresPermissions("system:plugin:add")
    public ShenyuAdminResult createPlugin(@Valid @ModelAttribute final PluginDTO pluginDTO) {
        // Call pluginService.createOrUpdate for processing logic
        return ShenyuAdminResult.success(pluginService.createOrUpdate(pluginDTO));
    }
    // ......
}

Processing data

  • PluginServiceImpl.createOrUpdate() -> PluginServiceImpl.create()

Use the create() method in the PluginServiceImpl class to convert data, save it to the database, and publish events.

@RequiredArgsConstructor
@Service
public class PluginServiceImpl implements PluginService {
    // Event publishing object pluginEventPublisher
    private final PluginEventPublisher pluginEventPublisher;
    private String create(final PluginDTO pluginDTO) {
        // Check whether a plugin with the same name already exists
        Assert.isNull(pluginMapper.nameExisted(pluginDTO.getName()), AdminConstants.PLUGIN_NAME_IS_EXIST);
        // Check whether a custom plugin jar was uploaded
        if (!Objects.isNull(pluginDTO.getFile())) {
            Assert.isTrue(checkFile(Base64.decode(pluginDTO.getFile())), AdminConstants.THE_PLUGIN_JAR_FILE_IS_NOT_CORRECT_OR_EXCEEDS_16_MB);
        }
        // Create plugin object
        PluginDO pluginDO = PluginDO.buildPluginDO(pluginDTO);
        // Insert object into database
        if (pluginMapper.insertSelective(pluginDO) > 0) {
            // Publish create event, init plugin data
            pluginEventPublisher.onCreated(pluginDO);
        }
        return ShenyuResultMessage.CREATE_SUCCESS;
    }
    // ......
}

Complete the data persistence operation in the PluginServiceImpl class, that is, save the data to the database and publish events through pluginEventPublisher.

The pluginEventPublisher.onCreated method publishes the data change event:

@Override
public void onCreated(final PluginDO plugin) {
    // Publish a DataChangedEvent: event group (plugin, selector, rule), event type (create, delete, update), changed data
    publisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, DataEventTypeEnum.CREATE,
            Collections.singletonList(PluginTransfer.INSTANCE.mapToData(plugin))));
    // Publish PluginCreatedEvent
    publish(new PluginCreatedEvent(plugin, SessionUtil.visitorName()));
}

The changed data is published through publisher.publishEvent(). The publisher is an ApplicationEventPublisher object, whose fully qualified name is org.springframework.context.ApplicationEventPublisher. From this we know that data publishing relies on Spring's event mechanism.

About ApplicationEventPublisher

When state changes, the publisher calls the publishEvent method of ApplicationEventPublisher to publish an event. The Spring container broadcasts the event to all observers and calls each observer's onApplicationEvent method, passing it the event object. There are two ways to call publishEvent. One is to have the container inject an ApplicationEventPublisher object (for example by implementing the ApplicationEventPublisherAware interface) and then call its method. The other is to call the container directly, since the application context is itself an ApplicationEventPublisher. The two ways of publishing events differ little.

  • ApplicationEventPublisher: publishes events;
  • ApplicationEvent: the Spring event, which records the source, time, and data of the event;
  • ApplicationListener: the event listener, that is, the observer.

Spring's event publishing mechanism involves three objects.

The first is the ApplicationEventPublisher that publishes events. ShenYu injects a publisher through the constructor.

The second is ApplicationEvent, which ShenYu extends with DataChangedEvent to represent the event object.

public class DataChangedEvent extends ApplicationEvent {
    //......
}

The last one is ApplicationListener. ShenYu implements it with the DataChangedEventDispatcher class, which listens for events and handles the event objects.

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
    //......
}
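The three roles can be sketched in plain Java without Spring. This is only an illustration of the observer pattern described above: MiniEventBus, MiniDataChangedEvent and MiniEventDemo are hypothetical names, not Spring or ShenYu classes, and the bus broadcasts synchronously in registration order as an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Spring's event machinery; this is not the Spring API.
final class MiniEventBus {
    private final List<Consumer<Object>> listeners = new ArrayList<>();

    // Plays the role of registering an ApplicationListener.
    void addListener(Consumer<Object> listener) {
        listeners.add(listener);
    }

    // Plays the role of ApplicationEventPublisher.publishEvent: broadcast to every listener.
    void publishEvent(Object event) {
        for (Consumer<Object> listener : listeners) {
            listener.accept(event);
        }
    }
}

// Plays the role of DataChangedEvent: group, event type and changed data.
final class MiniDataChangedEvent {
    final String group;
    final String type;
    final List<String> data;

    MiniDataChangedEvent(String group, String type, List<String> data) {
        this.group = group;
        this.type = type;
        this.data = data;
    }
}

final class MiniEventDemo {
    // Publishes one event and returns what the single listener observed.
    static List<String> run() {
        List<String> received = new ArrayList<>();
        MiniEventBus bus = new MiniEventBus();
        bus.addListener(event -> {
            if (event instanceof MiniDataChangedEvent) {
                MiniDataChangedEvent e = (MiniDataChangedEvent) event;
                received.add(e.group + ":" + e.type + ":" + e.data);
            }
        });
        bus.publishEvent(new MiniDataChangedEvent("PLUGIN", "CREATE", List.of("divide")));
        return received;
    }
}
```

Calling MiniEventDemo.run() yields a single entry describing the PLUGIN/CREATE event, which mirrors how one published DataChangedEvent reaches the dispatcher.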

Distribute data

  • DataChangedEventDispatcher.onApplicationEvent()

After the event is published, Spring automatically invokes the onApplicationEvent() method of the DataChangedEventDispatcher class to process it.

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
    /**
     * When there is a data change, call this method
     * @param event
     */
    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        // Traverse data change listeners (only ApolloDataChangedListener will be registered here)
        for (DataChangedListener listener : listeners) {
            // Forward according to different grouping types
            switch (event.getGroupKey()) {
                case APP_AUTH: // Authentication information
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN: // Plugin events
                    // Calling the registered listener object
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE: // Rule events
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR: // Selector events
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA: // Metadata events
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                case PROXY_SELECTOR: // Proxy selector events
                    listener.onProxySelectorChanged((List<ProxySelectorData>) event.getSource(), event.getEventType());
                    break;
                case DISCOVER_UPSTREAM: // Discovery upstream list events
                    listener.onDiscoveryUpstreamChanged((List<DiscoverySyncData>) event.getSource(), event.getEventType());
                    applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnUpstreamChanged((List<DiscoverySyncData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }
}

When data changes, onApplicationEvent is called. It iterates over all data change listeners, determines the data type, and hands the event to the matching listener method.

ShenYu groups all data into the following types: authentication information, plugin information, rule information, selector information, metadata, proxy selector, and discovery upstream.
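The dispatch step can be illustrated with a reduced, Spring-free sketch. Group, ChangeListener, Dispatcher and DispatchDemo are hypothetical names, only three of the groups are modelled, and the payload is simplified to a list of strings. The point is how default no-op methods let a listener override only the groups it cares about.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced version of the group enum (only three groups shown).
enum Group { PLUGIN, SELECTOR, RULE }

// Default no-op methods let an implementation override only what it needs.
interface ChangeListener {
    default void onPluginChanged(List<String> changed, String eventType) { }
    default void onSelectorChanged(List<String> changed, String eventType) { }
    default void onRuleChanged(List<String> changed, String eventType) { }
}

final class Dispatcher {
    private final List<ChangeListener> listeners;

    Dispatcher(List<ChangeListener> listeners) {
        this.listeners = listeners;
    }

    // Forwards the change to every listener, choosing the callback by group.
    void dispatch(Group group, List<String> data, String eventType) {
        for (ChangeListener listener : listeners) {
            switch (group) {
                case PLUGIN:
                    listener.onPluginChanged(data, eventType);
                    break;
                case SELECTOR:
                    listener.onSelectorChanged(data, eventType);
                    break;
                case RULE:
                    listener.onRuleChanged(data, eventType);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + group);
            }
        }
    }
}

final class DispatchDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // This listener only reacts to plugin changes.
        ChangeListener pluginOnly = new ChangeListener() {
            @Override
            public void onPluginChanged(List<String> changed, String eventType) {
                log.add("plugin " + eventType + " " + changed);
            }
        };
        Dispatcher dispatcher = new Dispatcher(List.of(pluginOnly));
        dispatcher.dispatch(Group.PLUGIN, List.of("divide"), "CREATE");
        dispatcher.dispatch(Group.RULE, List.of("r1"), "UPDATE"); // falls through to the no-op default
        return log;
    }
}
```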

DataChangedListener is an abstraction of the data synchronization strategy, and each strategy provides its own implementation. Since this article analyzes the Apollo listener, we focus only on ApolloDataChangedListener.

// Inheriting AbstractNodeDataChangedListener
public class ApolloDataChangedListener extends AbstractNodeDataChangedListener {
}

ApolloDataChangedListener extends AbstractNodeDataChangedListener, the base class for stores that look data up by key, such as Apollo and Nacos. Other stores, such as Zookeeper and Consul, look data up hierarchically by path.

// Base class for storage that is looked up by key
public abstract class AbstractNodeDataChangedListener implements DataChangedListener {
    protected AbstractNodeDataChangedListener(final ChangeData changeData) {
        this.changeData = changeData;
    }
}
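The difference between key-based and path-based lookup can be shown with two small helpers. This is an illustrative sketch only: KeyStyles is a hypothetical class, and the "/shenyu" root in the usage below is a made-up example rather than a value taken from the ShenYu source.

```java
final class KeyStyles {
    // Flat key, as a key/value store such as Apollo would use, for example plugin.divide
    static String flatKey(String group, String name) {
        return group + "." + name;
    }

    // Hierarchical path, as a path-based store might use; the root is supplied by the caller.
    static String pathKey(String root, String group, String name) {
        return root + "/" + group + "/" + name;
    }
}
```

For instance, KeyStyles.flatKey("plugin", "divide") gives plugin.divide, while KeyStyles.pathKey("/shenyu", "plugin", "divide") gives /shenyu/plugin/divide.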

AbstractNodeDataChangedListener takes a ChangeData parameter, which defines the key name for each kind of data stored in Apollo. The following data is stored in Apollo:

  • Plugin (plugin)
  • Selector (selector)
  • Rule (rule)
  • Authorization (auth)
  • Metadata (meta)
  • Proxy selector (proxy.selector)
  • Discovery upstream list (discovery)

These key names are specified in the ApolloDataChangedListener constructor:

public class ApolloDataChangedListener extends AbstractNodeDataChangedListener {
    public ApolloDataChangedListener(final ApolloClient apolloClient) {
        // Configure prefixes for several types of grouped data
        super(new ChangeData(ApolloPathConstants.PLUGIN_DATA_ID,
                ApolloPathConstants.SELECTOR_DATA_ID,
                ApolloPathConstants.RULE_DATA_ID,
                ApolloPathConstants.AUTH_DATA_ID,
                ApolloPathConstants.META_DATA_ID,
                ApolloPathConstants.PROXY_SELECTOR_DATA_ID,
                ApolloPathConstants.DISCOVERY_DATA_ID));
        // Client object used to operate on Apollo
        this.apolloClient = apolloClient;
    }
}

DataChangedListener defines the following methods:

// Data change listener
public interface DataChangedListener {
    // Called when authorization information changes
    default void onAppAuthChanged(List<AppAuthData> changed, DataEventTypeEnum eventType) {
    }
    // Called when plugin information changes
    default void onPluginChanged(List<PluginData> changed, DataEventTypeEnum eventType) {
    }
    // Called when selector information changes
    default void onSelectorChanged(List<SelectorData> changed, DataEventTypeEnum eventType) {
    }
    // Called when metadata information changes
    default void onMetaDataChanged(List<MetaData> changed, DataEventTypeEnum eventType) {
    }
    // Called when rule information changes
    default void onRuleChanged(List<RuleData> changed, DataEventTypeEnum eventType) {
    }
    // Called when proxy selector information changes
    default void onProxySelectorChanged(List<ProxySelectorData> changed, DataEventTypeEnum eventType) {
    }
    // Called when discovery upstream information changes
    default void onDiscoveryUpstreamChanged(List<DiscoverySyncData> changed, DataEventTypeEnum eventType) {
    }
}

When DataChangedEventDispatcher processes a plugin event, it calls listener.onPluginChanged. The implementation lives in AbstractNodeDataChangedListener:

public abstract class AbstractNodeDataChangedListener implements DataChangedListener {
    @Override
    public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
        // Configure the prefix as "plugin."
        final String configKeyPrefix = changeData.getPluginDataId() + DefaultNodeConstants.JOIN_POINT;
        this.onCommonChanged(configKeyPrefix, changed, eventType, PluginData::getName, PluginData.class);
        LOG.debug("[DataChangedListener] PluginChanged {}", configKeyPrefix);
    }
}

First, the key prefix for the configuration data is built, which is "plugin." here. Then onCommonChanged is called for unified processing:

private <T> void onCommonChanged(final String configKeyPrefix, final List<T> changedList,
                                 final DataEventTypeEnum eventType, final Function<? super T, ? extends String> mapperToKey,
                                 final Class<T> tClass) {
    // Avoid concurrent operations on list nodes
    final ReentrantLock reentrantLock = listSaveLockMap.computeIfAbsent(configKeyPrefix, key -> new ReentrantLock());
    try {
        reentrantLock.lock();
        // Names in the incoming list
        final List<String> changeNames = changedList.stream().map(mapperToKey).collect(Collectors.toList());
        switch (eventType) {
            // Delete operation
            case DELETE:
                // Delete plugin.${pluginName}
                changedList.stream().map(mapperToKey).forEach(removeKey -> {
                    delConfig(configKeyPrefix + removeKey);
                });
                // Remove the corresponding plugin names from plugin.list
                // plugin.list records the currently enabled list
                delChangedData(configKeyPrefix, changeNames);
                break;
            case REFRESH:
            case MYSELF:
                // Full reload logic
                // Get a list of all plugins in plugin.list
                final List<String> configDataNames = this.getConfigDataNames(configKeyPrefix);
                // Update each incoming plugin in turn
                changedList.forEach(changedData -> {
                    // Publish configuration
                    publishConfig(configKeyPrefix + mapperToKey.apply(changedData), changedData);
                });
                // If the stored list holds more entries than the incoming one, delete the excess data
                if (configDataNames != null && configDataNames.size() > changedList.size()) {
                    // Remove the names that were just loaded
                    configDataNames.removeAll(changeNames);
                    // Delete the remaining stale data one by one
                    configDataNames.forEach(this::delConfig);
                }
                // Update list data again
                publishConfig(configKeyPrefix + DefaultNodeConstants.LIST_STR, changeNames);
                break;
            default:
                // Add or update
                changedList.forEach(changedData -> {
                    publishConfig(configKeyPrefix + mapperToKey.apply(changedData), changedData);
                });
                // Add the new plugin names to the list
                putChangeData(configKeyPrefix, changeNames);
                break;
        }
    } catch (Exception e) {
        LOG.error("AbstractNodeDataChangedListener onCommonMultiChanged error ", e);
    } finally {
        reentrantLock.unlock();
    }
}

The logic above handles both full reloading (REFRESH, MYSELF) and incremental changes (DELETE, UPDATE, CREATE).

A plugin mainly involves two nodes:

  • plugin.list: the list of currently effective plugins
  • plugin.${plugin.name}: the details of a specific plugin

Finally, the data for these two nodes is written into Apollo.
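How the two kinds of nodes evolve under incremental and full updates can be sketched with an in-memory map standing in for Apollo. Everything here is an assumption made for illustration: InMemoryNodeStore is a hypothetical class, the name list is stored as a comma-joined string, and refresh removes stale entries unconditionally by their full key, which simplifies the conditional cleanup in onCommonChanged rather than reproducing it.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class InMemoryNodeStore {
    // Stand-in for the Apollo namespace: flat key -> value.
    final Map<String, String> store = new TreeMap<>();

    private static final String LIST = "list";

    // Reads the name list of a group, e.g. the value stored under plugin.list.
    private Set<String> names(String prefix) {
        String raw = store.get(prefix + LIST);
        Set<String> result = new LinkedHashSet<>();
        if (raw != null && !raw.isEmpty()) {
            for (String name : raw.split(",")) {
                result.add(name);
            }
        }
        return result;
    }

    private void saveNames(String prefix, Set<String> names) {
        store.put(prefix + LIST, String.join(",", names));
    }

    // Incremental path: CREATE and UPDATE write the item and add its name to the list.
    void upsert(String prefix, Map<String, String> changed) {
        Set<String> names = names(prefix);
        changed.forEach((name, value) -> {
            store.put(prefix + name, value);
            names.add(name);
        });
        saveNames(prefix, names);
    }

    // Incremental path: DELETE removes the item and drops its name from the list.
    void delete(String prefix, List<String> removed) {
        Set<String> names = names(prefix);
        for (String name : removed) {
            store.remove(prefix + name);
            names.remove(name);
        }
        saveNames(prefix, names);
    }

    // Full path: REFRESH rewrites every item, removes stale ones, then replaces the list.
    void refresh(String prefix, Map<String, String> all) {
        Set<String> stale = names(prefix);
        stale.removeAll(all.keySet());
        stale.forEach(name -> store.remove(prefix + name));
        all.forEach((name, value) -> store.put(prefix + name, value));
        saveNames(prefix, new LinkedHashSet<>(all.keySet()));
    }
}
```

With the prefix "plugin.", upserting divide and then waf leaves plugin.list holding both names. Deleting divide removes plugin.divide and its list entry, and a refresh with only sign leaves just plugin.sign and a one-element list.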

Data initialization

After starting admin, the current data information will be fully synchronized to Apollo, which is implemented by ApolloDataChangedInit:

// Inheriting AbstractDataChangedInit
public class ApolloDataChangedInit extends AbstractDataChangedInit {
    // Apollo operation object
    private final ApolloClient apolloClient;
    public ApolloDataChangedInit(final ApolloClient apolloClient) {
        this.apolloClient = apolloClient;
    }
    @Override
    protected boolean notExist() {
        // Check whether the keys plugin, auth, meta and proxy.selector exist
        // allMatch: a reload is triggered only when none of them exists
        // (these keys are never created during synchronization, so why check them?)
        return Stream.of(ApolloPathConstants.PLUGIN_DATA_ID, ApolloPathConstants.AUTH_DATA_ID, ApolloPathConstants.META_DATA_ID, ApolloPathConstants.PROXY_SELECTOR_DATA_ID).allMatch(
                this::dataIdNotExist);
    }
    /**
     * Data id not exist boolean.
     *
     * @param pluginDataId the plugin data id
     * @return the boolean
     */
    private boolean dataIdNotExist(final String pluginDataId) {
        return Objects.isNull(apolloClient.getItemValue(pluginDataId));
    }
}

This checks whether the data exists in Apollo and synchronizes it if it does not. There is a bug here: the keys being checked are never created during synchronization, so the data is reloaded on every restart. See PR#5435.
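The effect of checking keys that are never written can be reproduced with a small simulation. InitCheck is a hypothetical class, the map stands in for the Apollo namespace, and onStartup is an assumed startup hook that writes only "<group>.list" and "<group>.<name>" style keys, as the listener logic earlier in this article does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

final class InitCheck {
    // Stand-in for the Apollo namespace.
    final Map<String, String> store = new HashMap<>();
    // Counts how many times a full synchronization was triggered.
    int fullSyncCount = 0;

    boolean dataIdNotExist(String dataId) {
        return store.get(dataId) == null;
    }

    // Same shape as the check above: true only when every bare prefix key is missing.
    boolean notExist() {
        return Stream.of("plugin", "auth", "meta", "proxy.selector").allMatch(this::dataIdNotExist);
    }

    // Hypothetical startup hook: run a full sync when the check reports that nothing exists.
    void onStartup() {
        if (notExist()) {
            fullSyncCount++;
            // The sync writes prefixed keys, never the bare "plugin" key that notExist() looks for.
            store.put("plugin.list", "divide");
            store.put("plugin.divide", "{}");
        }
    }
}
```

Calling onStartup() twice on the same instance increments fullSyncCount both times, because the bare keys are still absent after the first synchronization.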

ApolloDataChangedInit implements the CommandLineRunner interface. This interface is provided by Spring Boot, and its run() method is executed after all Spring beans are initialized. It is commonly used for initialization work in projects.

  • SyncDataService.syncAll()

This method queries data from the database and then performs a full synchronization, covering authentication information, plugin information, rule information, selector information, metadata, proxy selectors, and discovery upstreams. Synchronization events are published mainly through eventPublisher. After publishEvent() is called, the ApplicationListener handles the change, which in ShenYu is the DataChangedEventDispatcher.

@Service
public class SyncDataServiceImpl implements SyncDataService {
    // Event publishing
    private final ApplicationEventPublisher eventPublisher;
    /***
     * Full data synchronization
     * @param type the type
     * @return
     */
    @Override
    public boolean syncAll(final DataEventTypeEnum type) {
        // Synchronize auth data
        appAuthService.syncData();
        // Synchronize plugin data
        List<PluginData> pluginDataList = pluginService.listAll();
        // Notify subscribers through the Spring publish/subscribe mechanism (publishing DataChangedEvent)
        // Unified monitoring by DataChangedEventDispatcher
        // DataChangedEvent carries the configuration group type, the current operation type, and the data
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
        // Synchronize selectors
        List<SelectorData> selectorDataList = selectorService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
        // Synchronize rules
        List<RuleData> ruleDataList = ruleService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
        // Synchronize metadata
        metaDataService.syncData();
        // Synchronize discovery upstream list
        discoveryService.syncData();
        return true;
    }
}

Bootstrap synchronization operation initialization

Data synchronization initialization on the gateway side mainly consists of subscribing to nodes in Apollo and receiving the changed data when something changes. This relies on Apollo's listener mechanism. In ShenYu, ApolloDataService is responsible for Apollo data synchronization. Its logic is completed during instantiation: it subscribes to the ShenYu data synchronization nodes in Apollo through the configService.addChangeListener() method.

public class ApolloDataService extends AbstractNodeDataSyncService implements SyncDataService {
    public ApolloDataService(final Config configService, final PluginDataSubscriber pluginDataSubscriber,
                             final List<MetaDataSubscriber> metaDataSubscribers,
                             final List<AuthDataSubscriber> authDataSubscribers,
                             final List<ProxySelectorDataSubscriber> proxySelectorDataSubscribers,
                             final List<DiscoveryUpstreamDataSubscriber> discoveryUpstreamDataSubscribers) {
        // Configure the prefixes to listen for
        super(new ChangeData(ApolloPathConstants.PLUGIN_DATA_ID,
                        ApolloPathConstants.SELECTOR_DATA_ID,
                        ApolloPathConstants.RULE_DATA_ID,
                        ApolloPathConstants.AUTH_DATA_ID,
                        ApolloPathConstants.META_DATA_ID,
                        ApolloPathConstants.PROXY_SELECTOR_DATA_ID,
                        ApolloPathConstants.DISCOVERY_DATA_ID),
                pluginDataSubscriber, metaDataSubscribers, authDataSubscribers, proxySelectorDataSubscribers, discoveryUpstreamDataSubscribers);
        this.configService = configService;
        // Start watching
        // Note: in the Apollo implementation this method only fetches data from Apollo and adds it to the local cache; it does not handle listening
        startWatch();
        // Configure listening
        apolloWatchPrefixes();
    }
}

First, the keys to be handled are configured, matching the keys used by admin. Next, the startWatch() method is called to handle data acquisition and listening. In the Apollo implementation, however, this method only retrieves the data and puts it into the local cache. Listening is handled by the apolloWatchPrefixes method:

private void apolloWatchPrefixes() {
    // Define the listener
    final ConfigChangeListener listener = changeEvent -> {
        changeEvent.changedKeys().forEach(changeKey -> {
            try {
                final ConfigChange configChange = changeEvent.getChange(changeKey);
                // Skip if there is no change for this key
                if (configChange == null) {
                    LOG.error("apollo watchPrefixes error configChange is null {}", changeKey);
                    return;
                }
                final String newValue = configChange.getNewValue();
                // skip last is "list"
                // If the key ends with "list", such as plugin.list, skip it: it only records the list of effective names and is not cached locally
                final int lastListStrIndex = changeKey.length() - DefaultNodeConstants.LIST_STR.length();
                if (changeKey.lastIndexOf(DefaultNodeConstants.LIST_STR) == lastListStrIndex) {
                    return;
                }
                // If it starts with plugin. => process plugin data
                if (changeKey.indexOf(ApolloPathConstants.PLUGIN_DATA_ID) == 0) {
                    // delete
                    if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {
                        // clear cache
                        unCachePluginData(changeKey);
                    } else {
                        // update cache
                        cachePluginData(newValue);
                    }
                    // If it starts with selector. => process selector data
                } else if (changeKey.indexOf(ApolloPathConstants.SELECTOR_DATA_ID) == 0) {
                    if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {
                        unCacheSelectorData(changeKey);
                    } else {
                        cacheSelectorData(newValue);
                    }
                    // If it starts with rule. => process rule data
                } else if (changeKey.indexOf(ApolloPathConstants.RULE_DATA_ID) == 0) {
                    if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {
                        unCacheRuleData(changeKey);
                    } else {
                        cacheRuleData(newValue);
                    }
                    // If it starts with auth. => process auth data
                } else if (changeKey.indexOf(ApolloPathConstants.AUTH_DATA_ID) == 0) {
                    if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {
                        unCacheAuthData(changeKey);
                    } else {
                        cacheAuthData(newValue);
                    }
                    // If it starts with meta. => process metadata
                } else if (changeKey.indexOf(ApolloPathConstants.META_DATA_ID) == 0) {
                    if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {
                        unCacheMetaData(changeKey);
                    } else {
                        cacheMetaData(newValue);
                    }
                    // If it starts with proxy.selector. => process proxy selector data
                } else if (changeKey.indexOf(ApolloPathConstants.PROXY_SELECTOR_DATA_ID) == 0) {
                    if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {
                        unCacheProxySelectorData(changeKey);
                    } else {
                        cacheProxySelectorData(newValue);
                    }
                    // If it starts with discovery. => process discovery upstream data
                } else if (changeKey.indexOf(ApolloPathConstants.DISCOVERY_DATA_ID) == 0) {
                    if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {
                        unCacheDiscoveryUpstreamData(changeKey);
                    } else {
                        cacheDiscoveryUpstreamData(newValue);
                    }
                }
            } catch (Exception e) {
                LOG.error("apollo sync listener change key handler error", e);
            }
        });
    };
    watchConfigChangeListener = listener;
    // Add the listener
    configService.addChangeListener(listener, Collections.emptySet(), ApolloPathConstants.pathKeySet());
}

As seen in the admin logic earlier, a plugin adds two kinds of keys: plugin.list and plugin.${plugin.name}. plugin.list is the list of all enabled plugins and is not stored in the local cache, so only plugin.${plugin.name} needs attention here.
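The routing decision made by the listener can be condensed into a small pure function. KeyRouter is a hypothetical helper written for illustration: it returns a description of the action instead of touching a cache, and it uses endsWith and startsWith, which behave like the lastIndexOf and indexOf comparisons above for keys of at least four characters.

```java
final class KeyRouter {
    static final String LIST_STR = "list";
    // Key prefixes named in this article.
    static final String[] PREFIXES = {"plugin", "selector", "rule", "auth", "meta", "proxy.selector", "discovery"};

    // Returns the action the listener would take for a changed key, or "skip".
    static String route(String changeKey, boolean deleted) {
        // Keys ending in "list" (for example plugin.list) are not cached locally.
        if (changeKey.endsWith(LIST_STR)) {
            return "skip";
        }
        for (String prefix : PREFIXES) {
            if (changeKey.startsWith(prefix)) {
                return (deleted ? "uncache " : "cache ") + prefix;
            }
        }
        // Keys outside the known prefixes are ignored.
        return "skip";
    }
}
```

Under this sketch, route("plugin.divide", false) returns "cache plugin" and route("selector.abc", true) returns "uncache selector". One consequence of the suffix test is that any key whose name merely ends in "list", such as a hypothetical plugin.whitelist, is skipped as well.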

At this point, the synchronization logic of bootstrap in apollo has been analyzed.