
WebSocket Data Synchronization Source Code Analysis

· 22 min read
Apache ShenYu Committer

In the ShenYu gateway, data synchronization refers to how data changed in the admin (background management) system is propagated to the gateway. Apache ShenYu currently supports data synchronization through ZooKeeper, WebSocket, HTTP long polling, Nacos, etcd, and Consul. This article analyzes the source code of the WebSocket-based synchronization.

This article is based on the source code of shenyu-2.4.0. For the official introduction, please refer to the Data Synchronization Design document.

1. About WebSocket Communication

The WebSocket protocol was born in 2008 and became an international standard in 2011. It supports two-way communication: the server can push messages to the client on its own initiative, and the client can likewise send messages to the server. WebSocket runs on top of TCP and is an application-layer protocol with low overhead and high communication efficiency. The protocol identifier is ws.

2. Admin Data Sync

Let's trace the source code through a real case: adding a selector in the admin system.

2.1 Accept Changed Data

  • SelectorController.createSelector()

The createSelector() method of the SelectorController class validates the request data, adds or updates the selector, and returns the result.

@Validated
@RequiredArgsConstructor
@RestController
@RequestMapping("/selector")
public class SelectorController {

    @PostMapping("")
    public ShenyuAdminResult createSelector(@Valid @RequestBody final SelectorDTO selectorDTO) { // @Valid: parameter validation
        // create or update data
        Integer createCount = selectorService.createOrUpdate(selectorDTO);
        // return result
        return ShenyuAdminResult.success(ShenyuResultMessage.CREATE_SUCCESS, createCount);
    }

    // ......
}

2.2 Handle Data

  • SelectorServiceImpl.createOrUpdate()

The createOrUpdate() method of the SelectorServiceImpl class converts the data, saves it to the database, publishes the event, and updates the upstream.

@RequiredArgsConstructor
@Service
public class SelectorServiceImpl implements SelectorService {
    // eventPublisher
    private final ApplicationEventPublisher eventPublisher;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public int createOrUpdate(final SelectorDTO selectorDTO) {
        int selectorCount;
        // build data DTO --> DO
        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
        List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
        // insert or update ?
        if (StringUtils.isEmpty(selectorDTO.getId())) {
            // insert selector data
            selectorCount = selectorMapper.insertSelective(selectorDO);
            // insert condition data
            selectorConditionDTOs.forEach(selectorConditionDTO -> {
                selectorConditionDTO.setSelectorId(selectorDO.getId());
                selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
            });
            // check selector add
            if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {
                DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();
                dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());
                dataPermissionDTO.setDataId(selectorDO.getId());
                dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);
                dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));
            }
        } else {
            // update data, delete and then insert
            selectorCount = selectorMapper.updateSelective(selectorDO);
            // delete rule condition then add
            selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
            selectorConditionDTOs.forEach(selectorConditionDTO -> {
                selectorConditionDTO.setSelectorId(selectorDO.getId());
                SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
                selectorConditionMapper.insertSelective(selectorConditionDO);
            });
        }
        // publish event
        publishEvent(selectorDO, selectorConditionDTOs);
        // update upstream
        updateDivideUpstream(selectorDO);
        return selectorCount;
    }

    // ......
}

Persisting data to the database in the Service class should be familiar, so it is not expanded here. The update upstream operation is analyzed in its corresponding section; the focus here is the publish event operation, which triggers data synchronization.

The logic of the publishEvent() method is to find the plugin corresponding to the selector, build the conditional data, and publish the change data.

    private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
        // find plugin of selector
        PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
        // build condition data
        List<ConditionData> conditionDataList =
                selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
        // publish event
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
                Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
    }

The changed data is published by eventPublisher.publishEvent(). The eventPublisher object is an ApplicationEventPublisher, whose fully qualified class name is org.springframework.context.ApplicationEventPublisher. In other words, publishing the data relies on Spring's event mechanism.

ApplicationEventPublisher

When a state changes, the publisher calls the publishEvent method of ApplicationEventPublisher to publish an event. The Spring container broadcasts the event to all observers and calls each observer's onApplicationEvent method, passing it the event object. There are two ways to call publishEvent: obtain an ApplicationEventPublisher object injected by the container and call the method on it, or call the method on the container directly. The two ways differ little.

  • ApplicationEventPublisher: publish event;
  • ApplicationEvent: Spring event, record the event source, time, and data;
  • ApplicationListener: event listener, observer.

These three objects make up the Spring event publishing mechanism.

The first is the publisher, ApplicationEventPublisher. In ShenYu, an eventPublisher is injected through the constructor.

The second is ApplicationEvent. In ShenYu, DataChangedEvent extends it and represents the event object.

public class DataChangedEvent extends ApplicationEvent {
    //......
}

The last is ApplicationListener. In ShenYu, the DataChangedEventDispatcher class implements this interface and acts as the event listener responsible for handling the event object.

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    //......
}
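To make the roles of the three objects concrete, here is a minimal plain-Java sketch of the same publish/listen flow. It does not use Spring; SketchEvent and SketchEventBus are hypothetical stand-ins for DataChangedEvent and the Spring container, introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class EventSketchDemo {
    public static void main(String[] args) {
        SketchEventBus bus = new SketchEventBus();
        List<String> received = new ArrayList<>();
        // Plays the role of DataChangedEventDispatcher.onApplicationEvent().
        bus.addListener(event -> received.add(event.groupKey + ":" + event.payload));
        // Plays the role of eventPublisher.publishEvent(new DataChangedEvent(...)).
        bus.publishEvent(new SketchEvent("SELECTOR", List.of("selector-1")));
        System.out.println(received); // [SELECTOR:[selector-1]]
    }
}

/** Hypothetical stand-in for DataChangedEvent: carries a group key and a payload. */
final class SketchEvent {
    final String groupKey;
    final List<String> payload;

    SketchEvent(String groupKey, List<String> payload) {
        this.groupKey = groupKey;
        this.payload = payload;
    }
}

/** Hypothetical stand-in for the container: keeps listeners and broadcasts events to them. */
final class SketchEventBus {
    private final List<Consumer<SketchEvent>> listeners = new ArrayList<>();

    void addListener(Consumer<SketchEvent> listener) {
        listeners.add(listener);
    }

    // Every registered listener receives every published event.
    void publishEvent(SketchEvent event) {
        listeners.forEach(listener -> listener.accept(event));
    }
}
```

The publisher never references a concrete listener, which is what lets ShenYu swap synchronization strategies without touching the service layer.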

2.3 Dispatch Data

  • DataChangedEventDispatcher.onApplicationEvent()

Once the event has been published, Spring automatically invokes the onApplicationEvent() method of the DataChangedEventDispatcher class to handle it.

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    /**
     * This method is called when there are data changes
     * @param event
     */
    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        // Iterate through the data change listeners (usually one data synchronization approach is enough)
        for (DataChangedListener listener : listeners) {
            // What kind of data has changed
            switch (event.getGroupKey()) {
                case APP_AUTH: // app auth data
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:  // plugin data
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:    // rule data
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:   // selector data
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:  // metadata
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:  // Other types throw an exception
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }
}

When there is a data change, the onApplicationEvent method is called and all the data change listeners are iterated to determine the data type and handed over to the appropriate data listener for processing.

ShenYu groups all the data into five categories: APP_AUTH, PLUGIN, RULE, SELECTOR and META_DATA.

Here the data change listener (DataChangedListener) is an abstraction of the data synchronization strategy. Its concrete implementation classes are the synchronization strategies currently supported by ShenYu:

  • WebsocketDataChangedListener: data synchronization based on WebSocket;
  • ZookeeperDataChangedListener: data synchronization based on ZooKeeper;
  • ConsulDataChangedListener: data synchronization based on Consul;
  • EtcdDataDataChangedListener: data synchronization based on etcd;
  • HttpLongPollingDataChangedListener: data synchronization based on HTTP long polling;
  • NacosDataChangedListener: data synchronization based on Nacos.
Given that there are so many implementation strategies, how do you decide which to use?

Because this article analyzes WebSocket data synchronization, WebsocketDataChangedListener serves as the example of how a listener is loaded and implemented.

A global search in the source code project shows that it is instantiated in the DataSyncConfiguration class.

/**
 * Data Sync Configuration
 * By springboot conditional assembly
 * The type Data sync configuration.
 */
@Configuration
public class DataSyncConfiguration {

    /**
     * The WebsocketListener(default strategy).
     */
    @Configuration
    @ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
    @EnableConfigurationProperties(WebsocketSyncProperties.class)
    static class WebsocketListener {

        /**
         * Config event listener data changed listener.
         * @return the data changed listener
         */
        @Bean
        @ConditionalOnMissingBean(WebsocketDataChangedListener.class)
        public DataChangedListener websocketDataChangedListener() {
            return new WebsocketDataChangedListener();
        }

        /**
         * Websocket collector.
         * Websocket collector class: establish a connection, send a message, close the connection and other operations
         * @return the websocket collector
         */
        @Bean
        @ConditionalOnMissingBean(WebsocketCollector.class)
        public WebsocketCollector websocketCollector() {
            return new WebsocketCollector();
        }

        /**
         * Server endpoint exporter
         *
         * @return the server endpoint exporter
         */
        @Bean
        @ConditionalOnMissingBean(ServerEndpointExporter.class)
        public ServerEndpointExporter serverEndpointExporter() {
            return new ServerEndpointExporter();
        }
    }

    //......
}

This configuration class relies on Spring Boot conditional assembly. The WebsocketListener class carries several annotations:

  • @Configuration: marks the class as a configuration class that contributes beans to the application context;

  • @ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true): property condition. The configuration class takes effect only when the condition is met, that is, WebSocket is used for data synchronization when the configuration below is present. Note the matchIfMissing = true attribute: the configuration class also takes effect when the property is not configured at all. WebSocket-based data synchronization is the officially recommended strategy and the default.

    shenyu:
      sync:
        websocket:
          enabled: true

  • @EnableConfigurationProperties: enables binding of the configuration properties.
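The effect of havingValue and matchIfMissing can be modeled in a few lines. The sketch below is a simplified model written for this article, not Spring's implementation; the class and method names are hypothetical.

```java
import java.util.Map;

final class ConditionalOnPropertySketch {

    /**
     * Simplified model of the property check: a missing property falls back to
     * matchIfMissing, a present one must equal havingValue (ignoring case).
     */
    static boolean matches(Map<String, String> properties, String name,
                           String havingValue, boolean matchIfMissing) {
        String actual = properties.get(name);
        if (actual == null) {
            return matchIfMissing;
        }
        return havingValue.equalsIgnoreCase(actual);
    }

    public static void main(String[] args) {
        String key = "shenyu.sync.websocket.enabled";
        System.out.println(matches(Map.of(), key, "true", true));             // true
        System.out.println(matches(Map.of(key, "true"), key, "true", true));  // true
        System.out.println(matches(Map.of(key, "false"), key, "true", true)); // false
    }
}
```

Under this model, the only way to switch the WebSocket listener off is to set the property explicitly to a value other than true.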

When WebSocket synchronization is enabled, whether explicitly or by default, the WebsocketDataChangedListener bean is created, so the event handler onApplicationEvent() reaches this listener. In our case a new selector is added and synchronization uses WebSocket, so the code enters the selector-change logic of WebsocketDataChangedListener.

    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        // Iterate through the data change listeners (usually one data synchronization approach is enough)
        for (DataChangedListener listener : listeners) {
            // What kind of data has changed
            switch (event.getGroupKey()) {
                // other logic is omitted
                case SELECTOR:   // selector data
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());   // WebsocketDataChangedListener handles selector data
                    break;
            }
        }
    }

2.4 Websocket Data Changed Listener

  • WebsocketDataChangedListener.onSelectorChanged()

In the onSelectorChanged() method, the data is wrapped into a WebsocketData object and then sent via WebsocketCollector.send().

    // selector data has been updated
    @Override
    public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
        // build WebsocketData
        WebsocketData<SelectorData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
        // websocket send data
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }

2.5 Websocket Send Data

  • WebsocketCollector.send()

In the send() method, the type of synchronization is determined and processed according to the different types.

@Slf4j
@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)
public class WebsocketCollector {

    /**
     * Send.
     *
     * @param message the message
     * @param type    the type
     */
    public static void send(final String message, final DataEventTypeEnum type) {
        if (StringUtils.isNotBlank(message)) {
            // If it's MYSELF (first full synchronization)
            if (DataEventTypeEnum.MYSELF == type) {
                // get the session from ThreadLocal
                Session session = (Session) ThreadLocalUtil.get(SESSION_KEY);
                if (session != null) {
                    // send full data to the session
                    sendMessageBySession(session, message);
                }
            } else {
                // subsequent incremental synchronization
                // synchronize change data to all sessions
                SESSION_SET.forEach(session -> sendMessageBySession(session, message));
            }
        }
    }

    private static void sendMessageBySession(final Session session, final String message) {
        try {
            // The message is sent through the Websocket session
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            log.error("websocket send result is exception: ", e);
        }
    }
}

Our example is an add operation, which is an incremental synchronization, so it takes the branch

SESSION_SET.forEach(session -> sendMessageBySession(session, message));

and the data is then sent out through

session.getBasicRemote().sendText(message);

At this point, when data changes on the admin side, the changed data is sent incrementally to the gateway over WebSocket.
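The two branches of send() can be exercised without a WebSocket server. The following sketch mirrors the routing only; FakeSession, SESSIONS, and CURRENT are hypothetical stand-ins for the real Session, SESSION_SET, and ThreadLocalUtil, and event types are plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

final class SendRoutingSketch {
    // All connected gateways, mirroring the role of SESSION_SET.
    static final Set<FakeSession> SESSIONS = new CopyOnWriteArraySet<>();
    // The session stored for the current thread, mirroring the role of ThreadLocalUtil.
    static final ThreadLocal<FakeSession> CURRENT = new ThreadLocal<>();

    static void send(String message, String eventType) {
        if (message == null || message.trim().isEmpty()) {
            return; // blank messages are dropped
        }
        if ("MYSELF".equals(eventType)) {
            // Full synchronization: only the session bound to this thread receives the data.
            FakeSession session = CURRENT.get();
            if (session != null) {
                session.sendText(message);
            }
        } else {
            // Incremental synchronization: broadcast the change to every session.
            SESSIONS.forEach(session -> session.sendText(message));
        }
    }

    public static void main(String[] args) {
        FakeSession a = new FakeSession();
        FakeSession b = new FakeSession();
        SESSIONS.add(a);
        SESSIONS.add(b);

        CURRENT.set(a);
        send("full-data", "MYSELF");
        CURRENT.remove();
        send("selector-change", "UPDATE");

        System.out.println(a.sent); // [full-data, selector-change]
        System.out.println(b.sent); // [selector-change]
    }
}

/** Hypothetical stand-in for a WebSocket session; it only records what was sent to it. */
final class FakeSession {
    final List<String> sent = new ArrayList<>();

    void sendText(String message) {
        sent.add(message);
    }
}
```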

At this point, do you have any questions? For example, where does session come from? How does the gateway establish a connection with admin?

Don't worry, let's do the synchronization analysis on the gateway side.

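However, before continuing with the source code analysis, let's string together the process analyzed so far: SelectorController.createSelector() -> SelectorServiceImpl.createOrUpdate() -> publishEvent() -> DataChangedEventDispatcher.onApplicationEvent() -> WebsocketDataChangedListener.onSelectorChanged() -> WebsocketCollector.send().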

3. Gateway Data Sync

Assume the ShenYu gateway is already running normally and also uses WebSocket for data synchronization. After admin sends the new selector data over WebSocket, how does the gateway receive and process it? Let's continue the source code analysis to find out.

3.1 WebsocketClient Accept Data

  • ShenyuWebsocketClient.onMessage()

The gateway has a ShenyuWebsocketClient class, which extends WebSocketClient and can establish a WebSocket connection and communicate over it.

public final class ShenyuWebsocketClient extends WebSocketClient {
    // ......
}

After the admin side sends data over WebSocket, ShenyuWebsocketClient receives it in onMessage() and then processes it.

public final class ShenyuWebsocketClient extends WebSocketClient {

    // execute after receiving the message
    @Override
    public void onMessage(final String result) {
        // handle accept data
        handleResult(result);
    }

    private void handleResult(final String result) {
        // data deserialization
        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
        // which data type: plugin, selector, rule...
        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
        // which operation type: update, delete...
        String eventType = websocketData.getEventType();
        String json = GsonUtils.getInstance().toJson(websocketData.getData());

        // handle data
        websocketDataHandler.executor(groupEnum, json, eventType);
    }
}

After receiving the data, the client first deserializes it, reads the data type and operation type, and then hands it to websocketDataHandler.executor() for processing.

3.2 Execute Websocket Data Handler

  • WebsocketDataHandler.executor()

WebsocketDataHandler creates its data handlers in factory style, providing one handler for each data type:

  • plugin --> PluginDataHandler;
  • selector --> SelectorDataHandler;
  • rule --> RuleDataHandler;
  • auth --> AuthDataHandler;
  • metadata --> MetaDataHandler.

/**
 * Create Websocket data handlers through factory mode
 * The type Websocket cache handler.
 */
public class WebsocketDataHandler {

    private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);

    /**
     * Instantiates a new Websocket data handler.
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers  the meta data subscribers
     * @param authDataSubscribers  the auth data subscribers
     */
    public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers,
                                final List<AuthDataSubscriber> authDataSubscribers) {
        // plugin --> PluginDataHandler
        ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
        // selector --> SelectorDataHandler
        ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
        // rule --> RuleDataHandler
        ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
        // auth --> AuthDataHandler
        ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
        // metadata --> MetaDataHandler
        ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
    }

    /**
     * Executor.
     *
     * @param type      the type
     * @param json      the json
     * @param eventType the event type
     */
    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
        // find the corresponding data handler based on the data type
        ENUM_MAP.get(type).handle(json, eventType);
    }
}

Different data types are handled differently, so each has its own implementation class. They also share common processing logic, which makes the template method design pattern a good fit: the common logic lives in the handle() method of the abstract class, and the type-specific logic is left to each implementation class.

In our case, a new selector is added, so it will be passed to the SelectorDataHandler for data processing.
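The lookup-by-enum idea can be reduced to a small sketch. SketchGroup and SketchHandler below are hypothetical, and each handler just records what it was asked to do instead of touching a cache.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

final class HandlerRegistrySketch {
    private final Map<SketchGroup, SketchHandler> handlers = new EnumMap<>(SketchGroup.class);
    // Records every call so the routing is observable in this sketch.
    final List<String> log = new ArrayList<>();

    HandlerRegistrySketch() {
        // Register one handler per data type, as the constructor of WebsocketDataHandler does.
        for (SketchGroup group : SketchGroup.values()) {
            handlers.put(group, (json, eventType) -> log.add(group + " " + eventType + " " + json));
        }
    }

    // Find the handler for the data type and delegate to it.
    void executor(SketchGroup type, String json, String eventType) {
        handlers.get(type).handle(json, eventType);
    }

    public static void main(String[] args) {
        HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.executor(SketchGroup.SELECTOR, "[]", "UPDATE");
        System.out.println(registry.log); // [SELECTOR UPDATE []]
    }
}

/** Hypothetical subset of the data types. */
enum SketchGroup { PLUGIN, SELECTOR, RULE }

/** Hypothetical counterpart of DataHandler. */
interface SketchHandler {
    void handle(String json, String eventType);
}
```

An EnumMap keeps the routing table explicit and avoids a chain of if/else or switch statements on the gateway side.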

3.3 Determine the Event Type

  • AbstractDataHandler.handle()

AbstractDataHandler.handle() implements the logic common to all data changes: it invokes different methods depending on the operation type.

public abstract class AbstractDataHandler<T> implements DataHandler {

    /**
     * Convert list.
     * The different logic is implemented by the respective implementation classes
     * @param json the json
     * @return the list
     */
    protected abstract List<T> convert(String json);

    /**
     * Do refresh.
     * The different logic is implemented by the respective implementation classes
     * @param dataList the data list
     */
    protected abstract void doRefresh(List<T> dataList);

    /**
     * Do update.
     * The different logic is implemented by the respective implementation classes
     * @param dataList the data list
     */
    protected abstract void doUpdate(List<T> dataList);

    /**
     * Do delete.
     * The different logic is implemented by the respective implementation classes
     * @param dataList the data list
     */
    protected abstract void doDelete(List<T> dataList);

    // General purpose logic, abstract class implementation
    @Override
    public void handle(final String json, final String eventType) {
        List<T> dataList = convert(json);
        if (CollectionUtils.isNotEmpty(dataList)) {
            DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
            switch (eventTypeEnum) {
                case REFRESH:
                case MYSELF:
                    doRefresh(dataList);  // Refresh data, full synchronization
                    break;
                case UPDATE:
                case CREATE:
                    doUpdate(dataList); // Update or create data, incremental synchronization
                    break;
                case DELETE:
                    doDelete(dataList);  // delete data
                    break;
                default:
                    break;
            }
        }
    }
}

Adding a selector is a create operation, so the switch-case leads into the doUpdate() method.
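As a toy illustration of the template method structure, the sketch below keeps the event-type switch in a base class and lets a subclass decide what refresh, update, and delete mean. SketchDataHandler and NameListHandler are hypothetical, and the comma-separated input stands in for the JSON payload.

```java
import java.util.ArrayList;
import java.util.List;

final class TemplateMethodDemo {
    public static void main(String[] args) {
        NameListHandler handler = new NameListHandler();
        handler.handle("a,b", "CREATE");
        handler.handle("b", "DELETE");
        System.out.println(handler.names); // [a]
        handler.handle("x,y", "REFRESH");
        System.out.println(handler.names); // [x, y]
    }
}

/** Base class: owns the common flow (convert, then branch on the event type). */
abstract class SketchDataHandler<T> {
    protected abstract List<T> convert(String raw);
    protected abstract void doRefresh(List<T> dataList);
    protected abstract void doUpdate(List<T> dataList);
    protected abstract void doDelete(List<T> dataList);

    final void handle(String raw, String eventType) {
        List<T> dataList = convert(raw);
        if (dataList.isEmpty()) {
            return;
        }
        switch (eventType) {
            case "REFRESH":
            case "MYSELF":
                doRefresh(dataList);
                break;
            case "UPDATE":
            case "CREATE":
                doUpdate(dataList);
                break;
            case "DELETE":
                doDelete(dataList);
                break;
            default:
                break; // unknown event types are ignored
        }
    }
}

/** Toy subclass: keeps a list of names in memory. */
final class NameListHandler extends SketchDataHandler<String> {
    final List<String> names = new ArrayList<>();

    @Override
    protected List<String> convert(String raw) {
        List<String> out = new ArrayList<>();
        for (String part : raw.split(",")) {
            if (!part.trim().isEmpty()) {
                out.add(part.trim());
            }
        }
        return out;
    }

    @Override
    protected void doRefresh(List<String> dataList) {
        // In this toy, a refresh replaces everything.
        names.clear();
        names.addAll(dataList);
    }

    @Override
    protected void doUpdate(List<String> dataList) {
        for (String name : dataList) {
            if (!names.contains(name)) {
                names.add(name);
            }
        }
    }

    @Override
    protected void doDelete(List<String> dataList) {
        names.removeAll(dataList);
    }
}
```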

3.4 Enter the Specific Data Handler

  • SelectorDataHandler.doUpdate()

/**
 * The type Selector data handler.
 */
@RequiredArgsConstructor
public class SelectorDataHandler extends AbstractDataHandler<SelectorData> {

    private final PluginDataSubscriber pluginDataSubscriber;

    //......

    // update data
    @Override
    protected void doUpdate(final List<SelectorData> dataList) {
        dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);
    }
}

Iterate over the data and enter the onSelectorSubscribe() method.

  • PluginDataSubscriber.onSelectorSubscribe()

onSelectorSubscribe() has no additional logic and calls the subscribeDataHandler() method directly. That method branches on the data type (plugin, selector, or rule) and the operation type (update or delete) to perform different logic.

/**
 * The common plugin data subscriber, responsible for handling all plugin, selector, and rule information
 */
public class CommonPluginDataSubscriber implements PluginDataSubscriber {
    //......

    // handle selector data
    @Override
    public void onSelectorSubscribe(final SelectorData selectorData) {
        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
    }

    // A subscription data handler that handles updates or deletions of data
    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
        Optional.ofNullable(classData).ifPresent(data -> {
            // plugin data
            if (data instanceof PluginData) {
                PluginData pluginData = (PluginData) data;
                if (dataType == DataEventTypeEnum.UPDATE) { // update
                    // save the data to gateway memory
                    BaseDataCache.getInstance().cachePluginData(pluginData);
                    // If the plugin has its own processing logic, run it
                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
                } else if (dataType == DataEventTypeEnum.DELETE) {  // delete
                    // delete the data from gateway memory
                    BaseDataCache.getInstance().removePluginData(pluginData);
                    // If the plugin has its own processing logic, run it
                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
                }
            } else if (data instanceof SelectorData) {  // selector data
                SelectorData selectorData = (SelectorData) data;
                if (dataType == DataEventTypeEnum.UPDATE) { // update
                    // save the data to gateway memory
                    BaseDataCache.getInstance().cacheSelectData(selectorData);
                    // If the plugin has its own processing logic, run it
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
                } else if (dataType == DataEventTypeEnum.DELETE) {  // delete
                    // delete the data from gateway memory
                    BaseDataCache.getInstance().removeSelectData(selectorData);
                    // If the plugin has its own processing logic, run it
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
                }
            } else if (data instanceof RuleData) {  // rule data
                RuleData ruleData = (RuleData) data;
                if (dataType == DataEventTypeEnum.UPDATE) { // update
                    // save the data to gateway memory
                    BaseDataCache.getInstance().cacheRuleData(ruleData);
                    // If the plugin has its own processing logic, run it
                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
                } else if (dataType == DataEventTypeEnum.DELETE) { // delete
                    // delete the data from gateway memory
                    BaseDataCache.getInstance().removeRuleData(ruleData);
                    // If the plugin has its own processing logic, run it
                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
                }
            }
        });
    }
}

Adding a selector will enter the following logic:

// save the data to gateway memory
BaseDataCache.getInstance().cacheSelectData(selectorData);
// If the plugin has its own processing logic, run it
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));

The first step saves the data to the gateway's memory. BaseDataCache is the class that ultimately caches the data, implemented as a singleton. Selector data is stored in the SELECTOR_MAP map, and later lookups read from it.

public final class BaseDataCache {
    // private instance
    private static final BaseDataCache INSTANCE = new BaseDataCache();

    // private constructor
    private BaseDataCache() {
    }

    /**
     * Gets instance.
     * public method
     * @return the instance
     */
    public static BaseDataCache getInstance() {
        return INSTANCE;
    }

    /**
     * A Map of the cache selector data
     * pluginName -> SelectorData.
     */
    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();

    public void cacheSelectData(final SelectorData selectorData) {
        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);
    }

    /**
     * cache selector data.
     * @param data the selector data
     */
    private void selectorAccept(final SelectorData data) {
        String key = data.getPluginName();
        if (SELECTOR_MAP.containsKey(key)) { // Update operation, delete before insert
            List<SelectorData> existList = SELECTOR_MAP.get(key);
            final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
            resultList.add(data);
            final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());
            SELECTOR_MAP.put(key, collect);
        } else {  // Add new operations directly to Map
            SELECTOR_MAP.put(key, Lists.newArrayList(data));
        }
    }
}
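The replace-by-id-then-sort behavior of selectorAccept() is easy to try in isolation. The sketch below reproduces it with a hypothetical SketchSelector type; as a variation that is not taken from ShenYu, it performs the update inside ConcurrentMap.compute() so that the read-modify-write for one plugin key happens as a single step.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

final class SelectorCacheSketch {
    // pluginName -> selectors of that plugin, ordered by their sort value
    private final ConcurrentMap<String, List<SketchSelector>> selectorMap = new ConcurrentHashMap<>();

    void cache(SketchSelector data) {
        selectorMap.compute(data.pluginName, (key, existing) -> {
            List<SketchSelector> result = new ArrayList<>();
            if (existing != null) {
                // Drop any older copy of the same selector (update = delete, then insert).
                for (SketchSelector selector : existing) {
                    if (!selector.id.equals(data.id)) {
                        result.add(selector);
                    }
                }
            }
            result.add(data);
            result.sort(Comparator.comparingInt((SketchSelector s) -> s.sort));
            return result;
        });
    }

    List<String> ids(String pluginName) {
        return selectorMap.getOrDefault(pluginName, Collections.emptyList())
                .stream().map(s -> s.id).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        SelectorCacheSketch cache = new SelectorCacheSketch();
        cache.cache(new SketchSelector("s2", "divide", 2));
        cache.cache(new SketchSelector("s1", "divide", 1));
        System.out.println(cache.ids("divide")); // [s1, s2]
        cache.cache(new SketchSelector("s2", "divide", 0)); // same id, new sort value
        System.out.println(cache.ids("divide")); // [s2, s1]
    }
}

/** Hypothetical, trimmed-down counterpart of SelectorData. */
final class SketchSelector {
    final String id;
    final String pluginName;
    final int sort;

    SketchSelector(String id, String pluginName, int sort) {
        this.id = id;
        this.pluginName = pluginName;
        this.sort = sort;
    }
}
```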

The second step runs plugin-specific logic where a plugin defines any. In an IDE such as IDEA, you can look up which plugins implement handlerSelector() and therefore react when a selector is added. We do not expand on them here.

By tracing the source code through a practical case, adding a selector on the admin side, we have now walked through the WebSocket data synchronization process.

The data synchronization process on the gateway side can be strung together as follows: ShenyuWebsocketClient.onMessage() -> WebsocketDataHandler.executor() -> AbstractDataHandler.handle() -> SelectorDataHandler.doUpdate() -> CommonPluginDataSubscriber.onSelectorSubscribe() -> BaseDataCache.cacheSelectData().

The data synchronization process has been analyzed, but one question remains: how does the gateway establish a connection with admin?

4. The Gateway Establishes a Websocket Connection with Admin

  • websocket config

With the following configuration in the gateway configuration file and the dependency below introduced, the WebSocket-related service is started.

shenyu:
  file:
    enabled: true
  cross:
    enabled: true
  dubbo:
    parameter: multi
  sync:
    websocket:  # Use websocket for data synchronization
      urls: ws://localhost:9095/websocket   # websocket address of admin
      allowOrigin: ws://localhost:9195

Add the WebSocket dependency to the gateway.

<!--shenyu data sync start use websocket-->
<dependency>
    <groupId>org.apache.shenyu</groupId>
    <artifactId>shenyu-spring-boot-starter-sync-data-websocket</artifactId>
    <version>${project.version}</version>
</dependency>

  • Websocket Data Sync Config

The related beans are created through Spring Boot conditional assembly. When the gateway starts, if shenyu.sync.websocket.urls is configured, the WebSocket data synchronization configuration is loaded. The dependency itself is pulled in through the Spring Boot starter.


/**
 * WebsocketSyncDataService
 * Conditional injection is implemented through SpringBoot
 * Websocket sync data configuration for spring boot.
 */
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {

    /**
     * Websocket sync data service.
     * @param websocketConfig   the websocket config
     * @param pluginSubscriber the plugin subscriber
     * @param metaSubscribers   the meta subscribers
     * @param authSubscribers   the auth subscribers
     * @return the sync data service
     */
    @Bean
    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        log.info("you use websocket sync shenyu data.......");
        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }

    /**
     * Config websocket config.
     *
     * @return the websocket config
     */
    @Bean
    @ConfigurationProperties(prefix = "shenyu.sync.websocket")
    public WebsocketConfig websocketConfig() {
        return new WebsocketConfig();
    }
}

Create a spring.factories file in the resources/META-INF directory of the project and specify the configuration class in it.

  • WebsocketSyncDataService

'WebsocketSyncDataService' does the following:

  • Reads the configured urls, which are the synchronization addresses of the admin side; multiple addresses are separated by ",";

  • Creates a scheduling thread pool, with one thread per admin to perform scheduled tasks;

  • Creates a ShenyuWebsocketClient for each admin to set up websocket communication with it;

  • Starts the websocket connection with the admin side;

  • Executes a scheduled task every 10 seconds. Its main function is to check whether the websocket connection has been disconnected: if so, it tries to reconnect; if not, a ping-pong test is performed.


/**
 * Websocket sync data service.
 */
@Slf4j
public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {

    private final List<WebSocketClient> clients = new ArrayList<>();

    private final ScheduledThreadPoolExecutor executor;

    /**
     * Instantiates a new Websocket sync cache.
     * @param websocketConfig      the websocket config
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers  the meta data subscribers
     * @param authDataSubscribers  the auth data subscribers
     */
    public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                    final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers,
                                    final List<AuthDataSubscriber> authDataSubscribers) {
        // If there are multiple synchronization addresses on the admin side, use commas (,) to separate them
        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
        // Create a scheduling thread pool, one for each admin
        executor = new ScheduledThreadPoolExecutor(urls.length, ShenyuThreadFactory.create("websocket-connect", true));
        for (String url : urls) {
            try {
                // Create a WebsocketClient and assign one to each admin
                clients.add(new ShenyuWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
            } catch (URISyntaxException e) {
                log.error("websocket url({}) is error", url, e);
            }
        }
        try {
            for (WebSocketClient client : clients) {
                // Establish a connection with the WebSocket Server
                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                if (success) {
                    log.info("websocket connection is successful.....");
                } else {
                    log.error("websocket connection is error.....");
                }

                // Run a scheduled task every 10 seconds
                // The main function is to check whether the WebSocket connection is disconnected. If the connection is disconnected, retry the connection.
                // If it is not disconnected, the ping-pong test is performed
                executor.scheduleAtFixedRate(() -> {
                    try {
                        if (client.isClosed()) {
                            boolean reconnectSuccess = client.reconnectBlocking();
                            if (reconnectSuccess) {
                                log.info("websocket reconnect server[{}] is successful.....", client.getURI().toString());
                            } else {
                                log.error("websocket reconnection server[{}] is error.....", client.getURI().toString());
                            }
                        } else {
                            client.sendPing();
                            log.debug("websocket send to [{}] ping message successful", client.getURI().toString());
                        }
                    } catch (InterruptedException e) {
                        log.error("websocket connect is error :{}", e.getMessage());
                    }
                }, 10, 10, TimeUnit.SECONDS);
            }
            /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
        } catch (InterruptedException e) {
            log.info("websocket connection...exception....", e);
        }
    }

    @Override
    public void close() {
        // close websocket client
        for (WebSocketClient client : clients) {
            if (!client.isClosed()) {
                client.close();
            }
        }
        // close threadpool
        if (Objects.nonNull(executor)) {
            executor.shutdown();
        }
    }
}

  • ShenyuWebsocketClient

ShenyuWebsocketClient is the WebSocket client created in ShenYu to communicate with the admin side. After the connection is successfully established for the first time, the full data is synchronized; afterwards, only incremental data is synchronized.


/**
 * The type shenyu websocket client.
 */
@Slf4j
public final class ShenyuWebsocketClient extends WebSocketClient {

    private volatile boolean alreadySync = Boolean.FALSE;

    private final WebsocketDataHandler websocketDataHandler;

    /**
     * Instantiates a new shenyu websocket client.
     * @param serverUri             the server uri
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers   the meta data subscribers
     * @param authDataSubscribers   the auth data subscribers
     */
    public ShenyuWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,
                                 final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
        super(serverUri);
        this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
    }

    // Execute after the connection is successfully established
    @Override
    public void onOpen(final ServerHandshake serverHandshake) {
        // To prevent re-execution when reconnecting, use alreadySync to determine
        if (!alreadySync) {
            // Synchronize all data, type MYSELF
            send(DataEventTypeEnum.MYSELF.name());
            alreadySync = true;
        }
    }

    // Execute after receiving the message
    @Override
    public void onMessage(final String result) {
        // handle data
        handleResult(result);
    }

    // Execute after shutdown
    @Override
    public void onClose(final int i, final String s, final boolean b) {
        this.close();
    }

    // Execute after error
    @Override
    public void onError(final Exception e) {
        this.close();
    }

    @SuppressWarnings("ALL")
    private void handleResult(final String result) {
        // Data deserialization
        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
        // Which data types, plugins, selectors, rules...
        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
        // Which operation type, update, delete...
        String eventType = websocketData.getEventType();
        String json = GsonUtils.getInstance().toJson(websocketData.getData());

        // handle data
        websocketDataHandler.executor(groupEnum, json, eventType);
    }
}

5. Summary#

Through a practical case, this article analyzed the source code behind websocket data synchronization. The main knowledge points involved are as follows:

  • WebSocket supports bidirectional communication and has good performance. It is recommended.

  • Complete event publishing and listening via Spring;

  • Support multiple synchronization strategies through abstract DataChangedListener interface, interface oriented programming;

  • Use factory mode to create WebsocketDataHandler to handle different data types;

  • Implement AbstractDataHandler using template method design patterns to handle general operation types;

  • Use singleton design pattern to cache data class BaseDataCache;

  • Loading of configuration classes via conditional assembly of SpringBoot and starter loading mechanism.

SPI Source Code Analysis

· 18 min read

Apache ShenYu is an asynchronous, high-performance, cross-language, responsive API gateway.

background#

Recently, while reading the source code of the open source project Apache ShenYu API gateway, I found that many core components of the gateway are loaded through the SPI module. Here I will analyze the source code of the SPI module in the ShenYu gateway.

what is SPI#

SPI means 'Service Provider Interface', which is a dynamic service discovery mechanism. With it, we can dynamically load the implementation classes of an interface at runtime (that is, a development mode combining interface-oriented programming, the strategy pattern, and configuration files). The most common example is the built-in database driver interface 'java.sql.Driver' in the JDK. Different vendors can implement this interface differently, for example 'MySQL' ('com.mysql.jdbc.Driver' in the 'MySQL' driver package), 'PostgreSQL' ('org.postgresql.Driver' in the 'PostgreSQL' driver package), etc.

[Figure: spi-jdk-api-diagram]

The JDK's built-in 'SPI' can be used as follows:

  • In the 'META-INF/services' directory of the classpath, create a file named with the fully qualified name of the interface whose implementation classes you want the SPI loader to load (a plain-text file that lists one implementation class name per line). For example, to load the SQL driver implementation classes mentioned above, create a file named 'java.sql.Driver', since those classes implement the 'java.sql.Driver' interface.

  • In this file, add an entry for each specific implementation of the interface. For the driver scenario above, the file META-INF/services/java.sql.Driver would contain the entries shown in the snippet below.

# content of file META-INF/services/java.sql.Driver
com.mysql.jdbc.Driver
org.postgresql.Driver

  • Finally, load the file with 'java.util.ServiceLoader' to instantiate the corresponding implementation classes of the interface.

ServiceLoader<Driver> loader = ServiceLoader.load(Driver.class);
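The three steps above can be tried end to end without a real JDBC driver. The following is a minimal sketch, not the JDBC setup itself: 'Greeter' and 'EnglishGreeter' are hypothetical names standing in for 'java.sql.Driver' and a vendor driver, and the provider file is written to a temporary directory at runtime only so that the example is self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

class JdkSpiDemo {

    /** Hypothetical service interface, standing in for java.sql.Driver. */
    public interface Greeter {
        String greet();
    }

    /** Hypothetical provider; it needs a public no-argument constructor. */
    public static class EnglishGreeter implements Greeter {
        @Override
        public String greet() {
            return "hello";
        }
    }

    static List<String> loadGreetings() {
        List<String> greetings = new ArrayList<>();
        try {
            // Steps 1 and 2: create META-INF/services/<interface name> listing the provider class.
            Path root = Files.createTempDirectory("jdk-spi-demo");
            Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
            Files.write(services.resolve(Greeter.class.getName()),
                    EnglishGreeter.class.getName().getBytes(StandardCharsets.UTF_8));

            // Step 3: the extra class loader only contributes the provider file;
            // the provider class itself is resolved through the parent loader.
            try (URLClassLoader loader = new URLClassLoader(
                    new URL[] {root.toUri().toURL()}, JdkSpiDemo.class.getClassLoader())) {
                for (Greeter greeter : ServiceLoader.load(Greeter.class, loader)) {
                    greetings.add(greeter.greet());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return greetings;
    }

    public static void main(String[] args) {
        System.out.println(loadGreetings());
    }
}
```

In a real project the provider file would simply be packaged under 'META-INF/services' in the jar, and 'ServiceLoader.load(Greeter.class)' would be enough.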

The underlying implementation involves class loading, the parent delegation model, and so on, which I won't expand on here. Based on this design idea, many mainstream frameworks have implemented their own 'SPI' extensions, such as the 'Dubbo SPI' extension module, which reads files under directories such as 'META-INF/dubbo' on the classpath for class loading. The 'shenyu-spi' module follows a similar design idea.

source code of shenyu-spi#

The 'shenyu-spi' module is very concise, and the code structure is as follows:

- shenyu-spi[module]
  - org.apache.shenyu.spi[package]
    -- ExtensionFactory
    -- ExtensionLoader
    -- Join
    -- SPI
    -- SpiExtensionFactory

These classes function as follows:

  • 'ExtensionFactory' : 'SPI' loader Factory, used to load an 'ExtensionLoader' instance based on the 'SPI' mechanism and to obtain the default 'SPI' identity implementation based on the 'ExtensionLoader' instance
  • 'SpiExtensionFactory' : is an implementation of 'ExtensionFactory'
  • 'SPI' : identification annotation, used to identify 'SPI', used on the interface
  • 'Join' : identification annotation, used on the implementation class, used to identify the class joining the SPI system
  • 'ExtensionLoader' : 'SPI' loader, analogous to 'java.util.ServiceLoader', used to load the implementation class of the interface in 'SPI'

@SPI#

org.apache.shenyu.spi.SPI is an identification annotation used to mark an interface as an 'SPI' interface. That is, only interfaces annotated with '@SPI' can be loaded by 'shenyu-spi'. The class's Javadoc mentions that the design references the SPI implementation of Apache Dubbo (which makes sense, since SPI extension is already a mature scheme and implementations are much the same). This annotation has only one method:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface SPI {

    /**
     * Value string.
     *
     * @return the string
     */
    String value() default "";
}

The only method, 'value()', specifies the default 'SPI' implementation (optional), as will be shown later when analyzing 'ExtensionLoader'.

@Join#

org.apache.shenyu.spi.Join is also an identification annotation. When used on a class, it marks the class as an 'SPI' implementation that joins the SPI system and can be loaded by ExtensionLoader. This annotation has two methods:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Join {

    /**
     * It will be sorted according to the current serial number..
     * @return int.
     */
    int order() default 0;

    /**
     * Indicates that the object joined by @Join is a singleton,
     * otherwise a completely new instance is created each time.
     * @return true or false.
     */
    boolean isSingleton() default true;
}

The 'order()' method specifies the sequence number. If an interface annotated with '@SPI' has multiple implementation classes annotated with '@Join', the sequence number determines the order of the implementation class instances (smaller numbers come first).

The 'isSingleton()' method indicates whether the implementation class is a singleton. If it is, it is instantiated only once; otherwise a new instance is created every time.
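To make the roles of the two annotations concrete, here is a small sketch. It assumes simplified local copies of '@SPI' and '@Join' (re-declared so that the example compiles without the 'shenyu-spi' dependency); 'LoadBalancer', 'RandomBalancer' and 'RoundRobinBalancer' are hypothetical names, not ShenYu classes. It reads the default alias from '@SPI' and sorts the implementations by 'order()', smaller first.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class SpiAnnotationDemo {

    // Assumption: simplified stand-in for org.apache.shenyu.spi.SPI.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface SPI {
        String value() default "";
    }

    // Assumption: simplified stand-in for org.apache.shenyu.spi.Join.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Join {
        int order() default 0;

        boolean isSingleton() default true;
    }

    // Hypothetical SPI interface whose default implementation has the alias "random".
    @SPI("random")
    interface LoadBalancer {
    }

    @Join(order = 2)
    static class RoundRobinBalancer implements LoadBalancer {
    }

    @Join(order = 1, isSingleton = false)
    static class RandomBalancer implements LoadBalancer {
    }

    /** Reads the default alias declared on the SPI interface. */
    static String defaultAlias() {
        return LoadBalancer.class.getAnnotation(SPI.class).value();
    }

    /** Sorts the implementation classes by @Join order, smaller numbers first. */
    static List<String> namesSortedByOrder() {
        List<Class<?>> impls = new ArrayList<>(
                Arrays.asList(RoundRobinBalancer.class, RandomBalancer.class));
        impls.sort(Comparator.comparingInt((Class<?> c) -> c.getAnnotation(Join.class).order()));
        List<String> names = new ArrayList<>();
        for (Class<?> impl : impls) {
            names.add(impl.getSimpleName());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(defaultAlias());
        System.out.println(namesSortedByOrder());
    }
}
```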

ExtensionLoader#

'ExtensionLoader' is the core of the 'SPI' module. Look at its attributes first:

public final class ExtensionLoader<T> {

    // SLF4J logger
    private static final Logger LOG = LoggerFactory.getLogger(ExtensionLoader.class);

    // Directory of the SPI configuration files, relative to the classpath
    private static final String SHENYU_DIRECTORY = "META-INF/shenyu/";

    // Cache of @SPI interface type -> ExtensionLoader instance. Note that this is a global static variable
    private static final Map<Class<?>, ExtensionLoader<?>> LOADERS = new ConcurrentHashMap<>();

    // The @SPI interface type of this loader
    private final Class<T> clazz;

    // Class loader instance
    private final ClassLoader classLoader;

    // Loaded implementation class info cached by this ExtensionLoader, wrapped in a Holder.
    // It is a HashMap with the mapping: alias -> implementation class info
    private final Holder<Map<String, ClassEntity>> cachedClasses = new Holder<>();

    // Loaded implementation instances cached by this ExtensionLoader, each wrapped in a Holder.
    // Mapping: alias -> Holder of the implementation instance
    private final Map<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();

    // Loaded implementation instances cached by this ExtensionLoader.
    // Mapping: implementation class type -> implementation instance
    private final Map<Class<?>, Object> joinInstances = new ConcurrentHashMap<>();

    // Cached default alias, taken from a non-blank value() of the @SPI annotation, used to load the default implementation
    private String cachedDefaultName;

    // Holder comparator: ascending by order, i.e. smaller sequence numbers come first
    private final Comparator<Holder<Object>> holderComparator = Comparator.comparing(Holder::getOrder);

    // ClassEntity comparator: ascending by order, i.e. smaller sequence numbers come first
    private final Comparator<ClassEntity> classEntityComparator = Comparator.comparing(ClassEntity::getOrder);

    // Other code omitted for now

    // Value holder, a simple VO that stores a generic value and its load order
    public static class Holder<T> {

        // The value reference is volatile so that a change made by one thread is immediately visible to other threads
        private volatile T value;

        private Integer order;

        private boolean isSingleton;

        // setters and getters omitted
    }

    // Class entity, mainly stores information about a loaded implementation class
    static final class ClassEntity {

        // Name: the alias of the SPI implementation class, not the class name
        private String name;

        // Load order number
        private Integer order;

        private Boolean isSingleton;

        // SPI implementation class
        private Class<?> clazz;

        private ClassEntity(final String name, final Integer order, final Class<?> clazz, final boolean isSingleton) {
            this.name = name;
            this.order = order;
            this.clazz = clazz;
            this.isSingleton = isSingleton;
        }

        // setters and getters omitted
    }
}

After analyzing the attributes, the following points become clear:

  • 'ExtensionLoader' keeps a global static cache 'LOADERS' of already created 'ExtensionLoader' instances to avoid the performance overhead of repeated creation
  • Each '@SPI' interface loaded through 'ExtensionLoader' gets its own 'ExtensionLoader' instance
  • When an '@SPI' interface has multiple implementations, they are returned in order

Then look at its constructors and static factory methods:

// Private constructor; takes the @SPI interface type and a class loader instance
private ExtensionLoader(final Class<T> clazz, final ClassLoader cl) {
    // Assign the member variable clazz
    this.clazz = clazz;
    // Assign the member variable classLoader
    this.classLoader = cl;
    // For interface types other than ExtensionFactory, lazily load an ExtensionLoader for ExtensionFactory
    if (!Objects.equals(clazz, ExtensionFactory.class)) {
        ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getExtensionClassesEntity();
    }
}

// Static factory method getExtensionLoader; takes the @SPI interface type and a class loader instance
public static <T> ExtensionLoader<T> getExtensionLoader(final Class<T> clazz, final ClassLoader cl) {
    // Precondition check: the type must be non-null, must be an interface and must carry @SPI, otherwise an exception is thrown
    Objects.requireNonNull(clazz, "extension clazz is null");
    if (!clazz.isInterface()) {
        throw new IllegalArgumentException("extension clazz (" + clazz + ") is not interface!");
    }
    if (!clazz.isAnnotationPresent(SPI.class)) {
        throw new IllegalArgumentException("extension clazz (" + clazz + ") without @" + SPI.class + " Annotation");
    }
    // Get the ExtensionLoader instance from the LOADERS cache and create it if absent: a typical lazy loading pattern
    ExtensionLoader<T> extensionLoader = (ExtensionLoader<T>) LOADERS.get(clazz);
    if (Objects.nonNull(extensionLoader)) {
        return extensionLoader;
    }
    LOADERS.putIfAbsent(clazz, new ExtensionLoader<>(clazz, cl));
    return (ExtensionLoader<T>) LOADERS.get(clazz);
}

// Static factory method getExtensionLoader; takes the @SPI interface type and uses the class loader of the ExtensionLoader class
public static <T> ExtensionLoader<T> getExtensionLoader(final Class<T> clazz) {
    return getExtensionLoader(clazz, ExtensionLoader.class.getClassLoader());
}

'ExtensionLoader' uses a private constructor, static factory methods, and lazy loading. Class loading is not triggered when an 'ExtensionLoader' is initialized; the actual scanning and loading are deferred until the 'getJoin' family of methods is called. The method call chain that scans and loads all implementation class information is as follows:

// Load all extension class info; DCL (double-checked locking) is used here to prevent concurrent loading
private Map<String, ClassEntity> getExtensionClassesEntity() {
    // Check whether the cache is empty
    Map<String, ClassEntity> classes = cachedClasses.getValue();
    if (Objects.isNull(classes)) {
        // Check the cache again after acquiring the lock
        synchronized (cachedClasses) {
            classes = cachedClasses.getValue();
            if (Objects.isNull(classes)) {
                // The cache is confirmed to be empty, so load the classes and mark the order as 0
                classes = loadExtensionClass();
                cachedClasses.setValue(classes);
                cachedClasses.setOrder(0);
            }
        }
    }
    return classes;
}

// Load all implementation classes within the SPI system for the clazz of this ExtensionLoader
private Map<String, ClassEntity> loadExtensionClass() {
    SPI annotation = clazz.getAnnotation(SPI.class);
    if (Objects.nonNull(annotation)) {
        // As mentioned earlier, a non-blank value() of @SPI is used as the alias of the default implementation.
        // With a bare @SPI, no default implementation can be obtained.
        // With @SPI("foo"), the default implementation is mapped and obtained through the alias foo.
        String value = annotation.value();
        if (StringUtils.isNotBlank(value)) {
            cachedDefaultName = value;
        }
    }
    // Create a HashMap to store the loaded implementation class info; it is passed down the method chain
    Map<String, ClassEntity> classes = new HashMap<>(16);
    // Load the properties files in the directory
    loadDirectory(classes);
    return classes;
}

// Load the properties files in the directory and the implementation classes listed in them; target directory: META-INF/shenyu/
private void loadDirectory(final Map<String, ClassEntity> classes) {
    // File name => META-INF/shenyu/$className
    String fileName = SHENYU_DIRECTORY + clazz.getName();
    try {
        // Use the class loader to load the file resources; if the given class loader is null, the system class loader is used
        Enumeration<URL> urls = Objects.nonNull(this.classLoader) ? classLoader.getResources(fileName)
                : ClassLoader.getSystemResources(fileName);
        // Iterate over the resolved file URLs
        if (Objects.nonNull(urls)) {
            while (urls.hasMoreElements()) {
                URL url = urls.nextElement();
                // Load the resource through the file URL
                loadResources(classes, url);
            }
        }
    } catch (IOException t) {
        LOG.error("load extension class error {}", fileName, t);
    }
}

// Load a file resource, parse it, load the implementation classes and store them in classes
private void loadResources(final Map<String, ClassEntity> classes, final URL url) throws IOException {
    // Read the URL resource into Properties; each line has the format name=classPath
    try (InputStream inputStream = url.openStream()) {
        Properties properties = new Properties();
        properties.load(inputStream);
        properties.forEach((k, v) -> {
            String name = (String) k;
            String classPath = (String) v;
            if (StringUtils.isNotBlank(name) && StringUtils.isNotBlank(classPath)) {
                try {
                    // Load the class based on name and classPath
                    loadClass(classes, name, classPath);
                } catch (ClassNotFoundException e) {
                    throw new IllegalStateException("load extension resources error", e);
                }
            }
        });
    } catch (IOException e) {
        throw new IllegalStateException("load extension resources error", e);
    }
}

// Load a class based on name (the alias) and classPath (the fully qualified class name)
private void loadClass(final Map<String, ClassEntity> classes,
                       final String name, final String classPath) throws ClassNotFoundException {
    // Initialize the class and make sure the implementation class is a subtype of the current @SPI interface
    Class<?> subClass = Objects.nonNull(this.classLoader) ? Class.forName(classPath, true, this.classLoader) : Class.forName(classPath);
    if (!clazz.isAssignableFrom(subClass)) {
        throw new IllegalStateException("load extension resources error," + subClass + " subtype is not of " + clazz);
    }
    // The implementation class must carry the @Join annotation
    if (!subClass.isAnnotationPresent(Join.class)) {
        throw new IllegalStateException("load extension resources error," + subClass + " without @" + Join.class + " annotation");
    }
    // Cache the implementation class only if no class with the same alias is cached yet;
    // if one exists, check that the old type and the current implementation type are the same
    ClassEntity oldClassEntity = classes.get(name);
    if (Objects.isNull(oldClassEntity)) {
        // Create a class entity holding the alias, order, implementation class and singleton flag, and cache it; mapping: alias -> class entity
        Join joinAnnotation = subClass.getAnnotation(Join.class);
        ClassEntity classEntity = new ClassEntity(name, joinAnnotation.order(), subClass, joinAnnotation.isSingleton());
        classes.put(name, classEntity);
    } else if (!Objects.equals(oldClassEntity.getClazz(), subClass)) {
        throw new IllegalStateException("load extension resources error,Duplicate class " + clazz.getName() + " name "
                + name + " on " + oldClassEntity.getClazz().getName() + " or " + subClass.getName());
    }
}
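The heart of this chain is parsing 'alias=className' lines and validating each class against the SPI interface. The sketch below isolates that step under simplifying assumptions: the file content is passed in as a string instead of being read from 'META-INF/shenyu/', JDK collection classes stand in for real implementations, and the '@Join' check is left out. 'AliasFileDemo' is a hypothetical name, not part of ShenYu.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class AliasFileDemo {

    /**
     * Parses "alias=fully.qualified.ClassName" lines and keeps only classes
     * that are subtypes of the expected interface.
     */
    static Map<String, Class<?>> parse(String fileContent, Class<?> expectedType) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, Class<?>> classes = new HashMap<>();
        for (String alias : properties.stringPropertyNames()) {
            String className = properties.getProperty(alias);
            try {
                Class<?> impl = Class.forName(className);
                // Same kind of check as in loadClass(): the class must implement the interface.
                if (!expectedType.isAssignableFrom(impl)) {
                    throw new IllegalStateException(impl + " is not a subtype of " + expectedType);
                }
                classes.put(alias, impl);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException("cannot load " + className, e);
            }
        }
        return classes;
    }

    public static void main(String[] args) {
        String content = "array=java.util.ArrayList\nlinked=java.util.LinkedList\n";
        System.out.println(parse(content, java.util.List.class));
    }
}
```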

Through the method chain 'getExtensionClassesEntity -> loadExtensionClass -> loadDirectory -> loadResources -> loadClass', a mapping from 'alias' to 'implementation class information' is created for subsequent instantiation, as shown in the 'getJoin()' method:

// Get the implementation class instance by alias
public T getJoin(final String name) {
    // The alias must be a non-blank string
    if (StringUtils.isBlank(name)) {
        throw new NullPointerException("get join name is null");
    }
    // DCL is used here as well: get the Holder for the alias from the cachedInstances cache and create it if absent
    Holder<Object> objectHolder = cachedInstances.get(name);
    if (Objects.isNull(objectHolder)) {
        cachedInstances.putIfAbsent(name, new Holder<>());
        objectHolder = cachedInstances.get(name);
    }
    Object value = objectHolder.getValue();
    if (Objects.isNull(value)) {
        synchronized (cachedInstances) {
            // After acquiring the lock, check again whether the Holder has a value; if not, instantiate the implementation class
            value = objectHolder.getValue();
            if (Objects.isNull(value)) {
                Holder<T> pair = createExtension(name);
                value = pair.getValue();
                int order = pair.getOrder();
                // Update the Holder cache after instantiation
                objectHolder.setValue(value);
                objectHolder.setOrder(order);
            }
        }
    }
    return (T) value;
}

// Look up the loaded implementation class info by alias, instantiate the implementation class and wrap it in a Holder
private Holder<T> createExtension(final String name) {
    // Load all implementation class info of this @SPI interface and get the entry for the alias
    ClassEntity classEntity = getExtensionClassesEntity().get(name);
    if (Objects.isNull(classEntity)) {
        throw new IllegalArgumentException("name is error");
    }
    Class<?> aClass = classEntity.getClazz();
    // If the instance already exists in the instance cache, wrap it in a Holder and return it; otherwise instantiate it
    Object o = joinInstances.get(aClass);
    if (Objects.isNull(o)) {
        try {
            // Instantiate through reflection and cache the implementation instance
            joinInstances.putIfAbsent(aClass, aClass.newInstance());
            o = joinInstances.get(aClass);
        } catch (InstantiationException | IllegalAccessException e) {
            throw new IllegalStateException("Extension instance(name: " + name + ", class: "
                    + aClass + ")  could not be instantiated: " + e.getMessage(), e);
        }
    }
    Holder<T> objectHolder = new Holder<>();
    objectHolder.setOrder(classEntity.getOrder());
    objectHolder.setValue((T) o);
    return objectHolder;
}

As the 'createExtension()' method shows, the implementation class is ultimately instantiated through reflection. Because the reflection method 'newInstance()' requires a no-argument constructor, there is an implicit convention: an 'SPI' implementation class must provide a no-argument constructor, or instantiation will fail. The remaining methods, such as 'getDefaultJoin()' and 'getJoins()', are straightforward extensions of 'getJoin()', so we won't analyze them here. In addition, the 'getJoin()' method uses a multilevel cache:

  • 'cachedInstances' : looks up the implementation class instance by alias
  • 'joinInstances' : if the alias lookup misses, all implementation class information is loaded and the implementation class type is located by the alias; the instance is then either found in 'joinInstances' by that type or created and cached there, and finally the 'cachedInstances' cache is updated
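The double-checked locking used by this cache can be illustrated in isolation. The following is a simplified sketch rather than the ShenYu implementation: 'LazyInstanceCache' and its 'factories' map are hypothetical, instances come from a 'Supplier' instead of reflection, and the lock is taken on the individual holder rather than on the whole 'cachedInstances' map as in the code above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyInstanceCache {

    /** Minimal value holder; volatile makes the published instance visible to other threads. */
    static final class Holder {
        volatile Object value;
    }

    private final Map<String, Holder> cachedInstances = new ConcurrentHashMap<>();

    // Hypothetical alias -> factory mapping, standing in for the loaded class info.
    private final Map<String, Supplier<Object>> factories;

    // Counts how many instances were actually created, for demonstration only.
    final AtomicInteger creations = new AtomicInteger();

    LazyInstanceCache(Map<String, Supplier<Object>> factories) {
        this.factories = factories;
    }

    Object get(String alias) {
        Holder holder = cachedInstances.computeIfAbsent(alias, key -> new Holder());
        Object value = holder.value;          // first check, without locking
        if (value == null) {
            synchronized (holder) {
                value = holder.value;         // second check, under the lock
                if (value == null) {
                    Supplier<Object> factory = factories.get(alias);
                    if (factory == null) {
                        throw new IllegalArgumentException("unknown alias: " + alias);
                    }
                    value = factory.get();
                    creations.incrementAndGet();
                    holder.value = value;
                }
            }
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, Supplier<Object>> factories = new HashMap<>();
        factories.put("demo", Object::new);
        LazyInstanceCache cache = new LazyInstanceCache(factories);
        System.out.println(cache.get("demo") == cache.get("demo"));
    }
}
```

Repeated lookups of the same alias return the same instance, and the factory runs only once even when several threads race on the first call.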

This completes the source code analysis of 'ExtensionLoader'. The following diagram shows the in-memory layout of the member properties of an 'ExtensionLoader' instance to help you understand:

[Figure: spi-attr-memory-debug]

ExtensionFactory#

'ExtensionFactory' is the factory interface inside the factory pattern, which defines a method to get an instance of the 'SPI' implementation (the default implementation, or the only implementation) :

@SPI("spi")
public interface ExtensionFactory {

    /**
     * Gets Extension.
     *
     * @param <T>   the type parameter
     * @param key   this parameter is not used for now; presumably reserved for mapping the value() of @SPI
     * @param clazz the interface type annotated with @SPI
     * @return the extension
     */
    <T> T getExtension(String key, Class<T> clazz);
}

Let's look at the class 'SpiExtensionFactory':

@Join
public class SpiExtensionFactory implements ExtensionFactory {

    @Override
    public <T> T getExtension(final String key, final Class<T> clazz) {
        return Optional.ofNullable(clazz)   // clazz must be non-null
                .filter(Class::isInterface)  // clazz must be an interface
                .filter(cls -> cls.isAnnotationPresent(SPI.class))  // clazz must be annotated with @SPI
                .map(ExtensionLoader::getExtensionLoader)  // get the ExtensionLoader for this interface type
                .map(ExtensionLoader::getDefaultJoin)  // get the default implementation of the @SPI interface, or null if absent
                .orElse(null);
    }
}

It's worth noting here that the 'ExtensionFactory' itself is part of the 'SPI' system. So when using 'ExtensionFactory' you can instantiate it directly:

ExtensionFactory extensionFactory = new SpiExtensionFactory();

It can also be loaded based on an 'ExtensionLoader':

# the content of META-INF/shenyu/org.apache.shenyu.spi.ExtensionFactory
spi=org.apache.shenyu.spi.SpiExtensionFactory

# then load it with ExtensionLoader
ExtensionFactory extensionFactory = ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getDefaultJoin();

Once you have an 'ExtensionFactory' instance, you can load an instance of its default implementation class based on the '@SPI' interface.

Summary#

An 'SPI' extension framework built on the design idea of the native 'Java' 'SPI' offers loose coupling, high usability, and high scalability, and adds features such as an instance cache and concurrency safety that fill some gaps of the native 'JDK' 'SPI'. The Shenyu SPI module is no exception. Based on this powerful 'SPI' module, other modules in 'Shenyu', such as the 'Plugin' module, can be made pluggable and configured quickly, making it easier to load a newly developed plugin instance.