Skip to main content

PredicateJudge -- analyze the design based on SPI

· 6 min read
Apache ShenYu Contributor

Apache Shenyu has been identified as a gateway application which supports a variety of protocols and microservice frameworks such as Dubbo, gRPC, Spring-Cloud, etc. To do this, the product has accomplished an elegant SPI (Service Provider Interface) as its foundation, and make the Rule data parsing and predicting program very simple , resiliency and security. As to rule data parsing processing, the SPI design increases the product's scalability. When appending new plugin, in most cases, the existing module is enough for rule data parsing , otherwise it can be rapidly carry out with tiny effort.

Top level design of SPI#

In Apache Shenyu, the SPI archtecure is defined in shenyu-spi module and composed of three parts: SPI interface, factory design pattern, and configuration file. There is two interface defined as annotation: @SPI and @Join. When class file with @Join annotation, it means that it will join as an SPI extension class, in other words, it is an application or registration. The @SPI denotes that the class is an SPI extension class.

Fig 1 classes in the shenyu-spi

toplevel-SPI

The SPI configuration directory is META-INF/shenyu/. that is specified:

SHENYU_DIRECTORY = "META-INF/shenyu/";

When starting the gateway system , the ExtensionLoader will scan the profiles under SHENYU_DIRECTORY, in turn, load and validate and then initialize each configed class. The configuration file uses "Key = class-file" format. During operation of the system, the corresponding SPI implementation class will be invoked through the factory mechanism.

Implementation of shenyu-plugin SPI#

In shenyu-plugin module, various plugins for HTTP routing are implemented according to the plugin mechanism, including request, redirect, response and rewrite, etc. Plugins for microservice frameworks such as Dubbo, gRPC , Spring-Cloud and Tars have been developed in the gateway product. And plugins are still increasing. If no such dependent module fo parsing and judge routing parameters and data, each plugin is necessary to implement the parsing functions, and has to frequently modify to support their matching rules, such as wildcard, regular expression, SpEL expression, etc. Therefore, they made a high level abstraction for routing parameter data following the SPI framework in shenyu-plugin module. The rule analysis consists of three parts:

  • ParameterData- parameter data

  • PredicatJudge- predicate whether the actural data match the rule

  • MatchStrategy- combine multiple conditions, the final used strategy

These implementation classes are defined in shenyu-plugin-base module. In each plugin, resolution and predication of the routing parameter can be realized through AbstractShenyuPlugin using the above SPIs. That is dedicated and easy to extend, in line with SOLID principle.

​ This section analyzes the PredictJudge in detail. You can find the dependency to shenyu-spi in the pom.xml of this module.

<dependency>    <groupId>org.apache.shenyu</groupId>    <artifactId>shenyu-spi</artifactId>    <version>${project.version}</version></dependency>

Design of PredicateJudge SPI#

PredicateJudge SPI is used to analyze and judge various routing rules configed in Apache Shenyu gateway. The name and functions of this SPI are similar to Predicate in Java, but the acceptance behavior is further abstracted applying for routing aspect. This SPI is implemented through the Factory pattern. Let's look at the Predictejudge SPI interface:

@SPI@FunctionalInterfacepublic interface PredicateJudge {
    /**     * judge conditionData and realData is match.     *     * @param conditionData {@linkplain ConditionData}     * @param realData       realData     * @return true is pass  false is not pass.     */    Boolean judge(ConditionData conditionData, String realData);}

The class diagram is as follows:

Fig 2-Predicate class diagram

predicate-class-diagram

The important methods of PredicateJudgeFactory are shown as follows:

Whenever need to parsing and matching routing data, you can use

    public static PredicateJudge newInstance(final String operator) {        return ExtensionLoader.getExtensionLoader(PredicateJudge.class).getJoin(processSpecialOperator(operator));    }
    public static Boolean judge(final ConditionData conditionData, final String realData) {        if (Objects.isNull(conditionData) || StringUtils.isBlank(realData)) {            return false;        }        return newInstance(conditionData.getOperator()).judge(conditionData, realData);    }

ConditionData contains of four attributes of String type: paramType, operator,paramName,paramValue

ParamTypeEnum#

Where paramType must be the enumeration type ParamTypeEnum. The default supported paramType are:

post, uri,query, host, ip,header, cookie,req_method

OperatorEnum#

operator must be the enumeration type OperatorEnum, currently supported operators are:

   match, =,regex, >,<, contains, SpEL,  Groovy, TimeBefore,TimeAfter

Base on the above defination , the plugin module provides the following eight PredicateJudge implemetation classes to realize the logic of these operators respectively.

Implementation classLogic descriptioncorespondece operator
ContainsPredicateJudge"contain" relation, the actual data needs contain the specified stringcontains
EqualsPredicateJudgeequals "="=
MatchPredicateJudgeused for URI context path matchingmatch
TimerAfterPredicateJudgeWhether the local time is after the specified timeTimeBefore
TimerBeforePredicateJudgeWhether the local time is before the specified timeTimeAfter
GroovyPredicateJudgeused Groovy syntax toe set ParamName and value dataGroovy
RegexPredicateJudgeused Regex to matchregex

How to use PredicateJudge#

When you want to parse parameters, you only need to call PredicateJudgeFactory as follows.

PredicateJudgeFactory.judge(final ConditionData conditionData, final String realData);

SPI profile#

The implementation class is configed in the file under directory SHENYU_DIRECTORY . It will be loaded and cached at startup.

equals=org.apache.shenyu.plugin.base.condition.judge.EqualsPredicateJudge
contains=org.apache.shenyu.plugin.base.condition.judge.ContainsPredicateJudgeGroovy=org.apache.shenyu.plugin.base.condition.judge.GroovyPredicateJudgematch=org.apache.shenyu.plugin.base.condition.judge.MatchPredicateJudgeregex=org.apache.shenyu.plugin.base.condition.judge.RegexPredicateJudgeSpEL=org.apache.shenyu.plugin.base.condition.judge.SpELPredicateJudgeTimeAfter=org.apache.shenyu.plugin.base.condition.judge.TimerAfterPredicateJudgeTimeBefore=org.apache.shenyu.plugin.base.condition.judge.TimerBeforePredicateJudge

The usage of PredicateJudge SPI in Shenyu gateway#

Most plugins in Apache Shenyu are inherited from AbstractShenyuPlugin. In this abstract class, the filter functions (selection and matching) are achieved through MatchStrategy SPI, and PredicateJudge will be invoked from MatchStrategy to predicate each condition data.

Fig 3- class diagram of plugins with PredicateJudge and MatchStrategy SPI

plugin-SPI-class-diagram

The process from client request calling the routing parsing moodule is showed as following chart.

Fig 4- flow chart for Shenyu gateway filter with parameter processing

SPI-flow-diagram

  • When startup, the system will load SPI classes from profile and cache them.
  • When the client sends a new request to the Shenyu gateway, will call the corresponding plugin within the gateway.
  • When analyzing real data with routing rules, the PredicateJudge implementation class will be invoked according to the contained operator.

Others#

Examples of PredicateJudge judgement#

ContainsPredicateJudge- " contains“ rule#

For example, giving a ConditionData with: paramType="uri", paramValue 是 "/http/**", when using the "contains" relation: ContainsPredicateJudge, the matching result is as follows.

ConditionData (operator="contains")real datajudge result
paramType="uri", "/http/**""/http/**/test"true
"/test/http/**/other"true
"/http1/**"false

About other PredicateJudge implemetantion classes, you can refer to the code and test classes.

Apache ShenYu Start Demo

· 2 min read
Kunshuai Zhu
Apache ShenYu Contributor

Environmental preparation#

  • Install JDK1.8+ locally
  • Install Git locally
  • Install Maven locally
  • Choose a development tool, such as IDEA

Pull ShenYu code#

Use Git to clone code

> git clone https://github.com/apache/incubator-shenyu.git

Compile code#

Compile with Maven

> cd incubator-shenyu> mvn clean install -Dmaven.javadoc.skip=true -B -Drat.skip=true -Djacoco.skip=true -DskipITs -DskipTests

Start the gateway service#

Use development tools, take IDEA as an example.

Start shenyu-admin (use H2 database by default)

start-demo-admin

Start shenyu-bootstrap

start-demo-bootstrap

At this point, shenyu gateway has been activated.

We can open the browser and access the admin console: http://localhost:9095/

Start application service#

Apache ShenYu provides examples for Http, Dubbo, SpringCloud and other applications to access the shenyu gateway, located in the shenyu-example module. Here we take the Http service as an example.

If shenyu-example is not marked as a Maven project by IDEA, you can right-click the pom.xml file in the shenyu-example directory to add it as a Maven project.

start-demo-maven

Start shenyu-examples-http

start-demo-examples-http

At this time, shenyu-examples-http will automatically register the interface method annotated with @ShenyuSpringMvcClient and the related configuration in application.yml to the gateway. When we open the admin console, you can see the relevant configuration in divide and context-path.

Test Http request#

Now use postman to simulate http to request your http service:

start-demo-post-http

Use more plugins#

We can refer to Official Document to use other plugins.

Here is an example of using the param-mapping plugin.

Edit the param-mapping plugin in BasicConfig -> Plugin and set status.

start-demo-plugin

Configure selectors and rules in PluginList -> http process.

start-demo-selector

start-demo-rules

Then use postman to make an http request to /http/test/payment.

start-demo-post-param-mapping

WebSocket Data Synchronization Source Code Analysis

· 22 min read
Apache ShenYu Committer

In ShenYu gateway, data synchronization refers to how to synchronize the updated data to the gateway after the data is sent in the background management system. The Apache ShenYu gateway currently supports data synchronization for ZooKeeper, WebSocket, http long poll, Nacos, etcd and Consul. The main content of this article is based on WebSocket data synchronization source code analysis.

This paper based on shenyu-2.4.0 version of the source code analysis, the official website of the introduction of please refer to the Data Synchronization Design .

1. About WebSocket Communication#

The WebSocket protocol was born in 2008 and became an international standard in 2011. It can be two-way communication, the server can take the initiative to push information to the client, the client can also take the initiative to send information to the server. The WebSocket protocol is based on the TCP protocol and belongs to the application layer, with low performance overhead and high communication efficiency. The protocol identifier is ws.

2. Admin Data Sync#

Let's trace the source code from a real case, such as adding a selector data in the background management system:

2.1 Accept Changed Data#

  • SelectorController.createSelector()

Enter the createSelector() method of the SelectorController class, which validates data, adds or updates data, and returns results.

@Validated@RequiredArgsConstructor@RestController@RequestMapping("/selector")public class SelectorController {        @PostMapping("")    public ShenyuAdminResult createSelector(@Valid @RequestBody final SelectorDTO selectorDTO) { // @Valid 数校验        // create or update data        Integer createCount = selectorService.createOrUpdate(selectorDTO);        // return result        return ShenyuAdminResult.success(ShenyuResultMessage.CREATE_SUCCESS, createCount);    }        // ......}

2.2 Handle Data#

  • SelectorServiceImpl.createOrUpdate()

Convert data in the SelectorServiceImpl class using the createOrUpdate() method, save it to the database, publish the event, update upstream.

@RequiredArgsConstructor@Servicepublic class SelectorServiceImpl implements SelectorService {    // eventPublisher    private final ApplicationEventPublisher eventPublisher;        @Override    @Transactional(rollbackFor = Exception.class)    public int createOrUpdate(final SelectorDTO selectorDTO) {        int selectorCount;        // build data DTO --> DO        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);        List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();        // insert or update ?        if (StringUtils.isEmpty(selectorDTO.getId())) {            //  insert into data            selectorCount = selectorMapper.insertSelective(selectorDO);            // insert into condition data            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));            });            // check selector add            if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {                DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();                dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());                dataPermissionDTO.setDataId(selectorDO.getId());                dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);                dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));            }
        } else {            // update data, delete and then insert            selectorCount = selectorMapper.updateSelective(selectorDO);            //delete rule condition then add            selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);                selectorConditionMapper.insertSelective(selectorConditionDO);            });        }        // publish event        publishEvent(selectorDO, selectorConditionDTOs);
        // update upstream        updateDivideUpstream(selectorDO);        return selectorCount;    }            // ......    }

In the Service class to persist data, i.e. to the database, this should be familiar, not expand. The update upstream operation is analyzed in the corresponding section below, focusing on the publish event operation, which performs data synchronization.

The logic of the publishEvent() method is to find the plugin corresponding to the selector, build the conditional data, and publish the change data.

       private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {        // find plugin of selector        PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());        // build condition data        List<ConditionData> conditionDataList =                selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());        // publish event        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,                Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));    }

Change data released by eventPublisher.PublishEvent() is complete, the eventPublisher object is a ApplicationEventPublisher class, The fully qualified class name is org.springframework.context.ApplicationEventPublisher. Here we see that publishing data is done through Spring related functionality.

ApplicationEventPublisher

When a state change, the publisher calls ApplicationEventPublisher of publishEvent method to release an event, Spring container broadcast event for all observers, The observer's onApplicationEvent method is called to pass the event object to the observer. There are two ways to call publishEvent method, one is to implement the interface by the container injection ApplicationEventPublisher object and then call the method, the other is a direct call container, the method of two methods of publishing events not too big difference.

  • ApplicationEventPublisher: publish event;
  • ApplicationEvent: Spring event, record the event source, time, and data;
  • ApplicationListener: event listener, observer.

In Spring event publishing mechanism, there are three objects,

An object is a publish event ApplicationEventPublisher, in ShenYu through the constructor in the injected a eventPublisher.

The other object is ApplicationEvent , inherited from ShenYu through DataChangedEvent, representing the event object.

public class DataChangedEvent extends ApplicationEvent {//......}

The last object is ApplicationListener in ShenYu in through DataChangedEventDispatcher class implements this interface, as the event listener, responsible for handling the event object.

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
    //......    }

2.3 Dispatch Data#

  • DataChangedEventDispatcher.onApplicationEvent()

Released when the event is completed, will automatically enter the DataChangedEventDispatcher class onApplicationEvent() method of handling events.

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
  /**     * This method is called when there are data changes   * @param event     */    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // Iterate through the data change listener (usually using a data synchronization approach is fine)      for (DataChangedListener listener : listeners) {            // What kind of data has changed        switch (event.getGroupKey()) {                case APP_AUTH: // app auth data                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());                    break;                case PLUGIN:  // plugin data                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());                    break;                case RULE:    // rule data                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());                    break;                case SELECTOR:   // selector data                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());                    break;                case META_DATA:  // metadata                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());                    break;                default:  // Other types throw exception                  throw new IllegalStateException("Unexpected value: " + event.getGroupKey());            }        }    }    }

When there is a data change, the onApplicationEvent method is called and all the data change listeners are iterated to determine the data type and handed over to the appropriate data listener for processing.

ShenYu groups all the data into five categories: APP_AUTH, PLUGIN, RULE, SELECTOR and META_DATA.

Here the data change listener (DataChangedListener) is an abstraction of the data synchronization policy. Its concrete implementation is:

These implementation classes are the synchronization strategies currently supported by ShenYu:

  • WebsocketDataChangedListener: data synchronization based on Websocket;
  • ZookeeperDataChangedListener:data synchronization based on Zookeeper;
  • ConsulDataChangedListener: data synchronization based on Consul;
  • EtcdDataDataChangedListener:data synchronization based on etcd;
  • HttpLongPollingDataChangedListener:data synchronization based on http long polling;
  • NacosDataChangedListener:data synchronization based on nacos;

Given that there are so many implementation strategies, how do you decide which to use?

Because this paper is based on websocket data synchronization source code analysis, so here to WebsocketDataChangedListener as an example, the analysis of how it is loaded and implemented.

A global search in the source code project shows that its implementation is done in the DataSyncConfiguration class.

/** * Data Sync Configuration * By springboot conditional assembly * The type Data sync configuration. */@Configurationpublic class DataSyncConfiguration {     /**     * The WebsocketListener(default strategy).     */    @Configuration    @ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)    @EnableConfigurationProperties(WebsocketSyncProperties.class)    static class WebsocketListener {
        /**         * Config event listener data changed listener.         * @return the data changed listener         */        @Bean        @ConditionalOnMissingBean(WebsocketDataChangedListener.class)        public DataChangedListener websocketDataChangedListener() {            return new WebsocketDataChangedListener();        }
        /**         * Websocket collector.         * Websocket collector class: establish a connection, send a message, close the connection and other operations         * @return the websocket collector         */        @Bean        @ConditionalOnMissingBean(WebsocketCollector.class)        public WebsocketCollector websocketCollector() {            return new WebsocketCollector();        }
        /**         * Server endpoint exporter          *         * @return the server endpoint exporter         */        @Bean        @ConditionalOnMissingBean(ServerEndpointExporter.class)        public ServerEndpointExporter serverEndpointExporter() {            return new ServerEndpointExporter();        }    }        //......}

This configuration class is implemented through the SpringBoot conditional assembly class. The WebsocketListener class has several annotations:

  • @Configuration: Configuration file, application context;

  • @ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true): attribute condition. The configuration class takes effect only when the condition is met. That is, when we have the following configuration, websocket is used for data synchronization. Note, however, the matchIfMissing = true attribute, which means that this configuration class will work if you don't have the following configuration. Data synchronization based on webSocket is officially recommended and the default.

    shenyu:    sync:    websocket:      enabled: true
  • @EnableConfigurationProperties:enable configuration properties;

When we take the initiative to configuration, use the websocket data synchronization, WebsocketDataChangedListener is generated. So in the event handler onApplicationEvent(), it goes to the corresponding listener. In our case, a selector is to increase the new data, the data by adopting the websocket, so, the code will enter the WebsocketDataChangedListener selector data change process.

    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // Iterate through the data change listener (usually using a data synchronization approach is fine)        for (DataChangedListener listener : listeners) {            // What kind of data has changed             switch (event.getGroupKey()) {                                    // other logic is omitted              case SELECTOR:   // selector data                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());   // WebsocketDataChangedListener handle selector data                    break;         }    }

2.4 Websocket Data Changed Listener#

  • WebsocketDataChangedListener.onSelectorChanged()

In the onSelectorChanged() method, the data is encapsulated into WebsocketData and then sent via webSocketCollector.send().

    // selector data has been updated    @Override    public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {        // build WebsocketData         WebsocketData<SelectorData> websocketData =                new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);        // websocket send data        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);    }

2.5 Websocket Send Data#

  • WebsocketCollector.send()

In the send() method, the type of synchronization is determined and processed according to the different types.

@Slf4j@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)public class WebsocketCollector {    /**     * Send.     *     * @param message the message     * @param type    the type     */    public static void send(final String message, final DataEventTypeEnum type) {        if (StringUtils.isNotBlank(message)) {            // If it's MYSELF (first full synchronization)          if (DataEventTypeEnum.MYSELF == type) {                // get the session from ThreadLocal            Session session = (Session) ThreadLocalUtil.get(SESSION_KEY);                if (session != null) {                    // send full data to the session                   sendMessageBySession(session, message);                }            } else {                // subsequent incremental synchronization                // synchronize change data to all sessions               SESSION_SET.forEach(session -> sendMessageBySession(session, message));            }        }    }
    private static void sendMessageBySession(final Session session, final String message) {        try {            // The message is sent through the Websocket session           session.getBasicRemote().sendText(message);        } catch (IOException e) {            log.error("websocket send result is exception: ", e);        }    }}

The example we give is a new operation, an incremental synchronization, so it goes

SESSION_SET.forEach(session -> sendMessageBySession(session, message));

then through

session.getBasicRemote().sendText(message);

the data was sent out.

At this point, when data changes occur on the admin side, the changed data is increments sent to the gateway through the WebSocket.

At this point, do you have any questions? For example, where does session come from? How does the gateway establish a connection with admin?

Don't worry, let's do the synchronization analysis on the gateway side.

However, before continuing with the source code analysis, let's use a diagram to string together the above analysis process.

3. Gateway Data Sync#

Assume ShenYu gateway is already in normal operation, using the data synchronization mode is also websocket. How does the gateway receive and process new selector data from admin and send it to the gateway via WebSocket? Let's continue our source code analysis to find out.

3.1 WebsocketClient Accept Data#

  • ShenyuWebsocketClient.onMessage()

There is a ShenyuWebsocketClient class on the gateway, which inherits from WebSocketClient and can establish a connection and communicate with WebSocket.

public final class ShenyuWebsocketClient extends WebSocketClient {  // ......}

After sending data via websocket on the admin side, ShenyuWebsocketClient can receive data via onMessage() and then process it itself.

public final class ShenyuWebsocketClient extends WebSocketClient {      // execute after receiving the message    @Override    public void onMessage(final String result) {        // handle accept data        handleResult(result);    }        private void handleResult(final String result) {        // data deserialization        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);        // which data types, plug-ins, selectors, rules...        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());        // which operation type, update, delete...              String eventType = websocketData.getEventType();        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        // handle data        websocketDataHandler.executor(groupEnum, json, eventType);    }}

After receiving the data, first has carried on the deserialization operation, read the data type and operation type, then hand over to websocketDataHandler.executor() for processing.

3.2 Execute Websocket Data Handler#

  • WebsocketDataHandler.executor()

A Websocket data handler is created in factory mode, providing one handler for each data type:

plugin --> PluginDataHandler;

selector --> SelectorDataHandler;

rule --> RuleDataHandler;

auth --> AuthDataHandler;

metadata --> MetaDataHandler.


/** * Create Websocket data handlers through factory mode * The type Websocket cache handler. */public class WebsocketDataHandler {
    private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);
    /**     * Instantiates a new Websocket data handler.     * @param pluginDataSubscriber the plugin data subscriber     * @param metaDataSubscribers  the meta data subscribers     * @param authDataSubscribers  the auth data subscribers     */    public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,                                final List<MetaDataSubscriber> metaDataSubscribers,                                final List<AuthDataSubscriber> authDataSubscribers) {        // plugin --> PluginDataHandler        ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));        // selector --> SelectorDataHandler        ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));        // rule --> RuleDataHandler        ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));        // auth --> AuthDataHandler        ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));        // metadata --> MetaDataHandler        ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));    }
    /**     * Executor.     *     * @param type      the type     * @param json      the json     * @param eventType the event type     */    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {        // find the corresponding data handler based on the data type        ENUM_MAP.get(type).handle(json, eventType);    }}

Different data types have different ways of handling data, so there are different implementation classes. But they also have the same processing logic between them, so they can be implemented through the template approach to design patterns. The same logic is placed in the handle() method of the abstract class, and the different logic is handed over to the respective implementation class.

In our case, a new selector is added, so it will be passed to the SelectorDataHandler for data processing.

3.3 Determine the Event Type#

  • AbstractDataHandler.handle()

Implement common logical handling of data changes: invoke different methods based on different operation types.


public abstract class AbstractDataHandler<T> implements DataHandler {
    /**     * Convert list.     * The different logic is implemented by the respective implementation classes     * @param json the json     * @return the list     */    protected abstract List<T> convert(String json);
    /**     * Do refresh.     * The different logic is implemented by the respective implementation classes     * @param dataList the data list     */    protected abstract void doRefresh(List<T> dataList);
    /**     * Do update.     * The different logic is implemented by the respective implementation classes     * @param dataList the data list     */    protected abstract void doUpdate(List<T> dataList);
    /**     * Do delete.     * The different logic is implemented by the respective implementation classes     * @param dataList the data list     */    protected abstract void doDelete(List<T> dataList);
    // General purpose logic, abstract class implementation    @Override    public void handle(final String json, final String eventType) {        List<T> dataList = convert(json);        if (CollectionUtils.isNotEmpty(dataList)) {            DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);            switch (eventTypeEnum) {                case REFRESH:                case MYSELF:                    doRefresh(dataList);  //Refreshes data and synchronizes all data                    break;                case UPDATE:                case CREATE:                    doUpdate(dataList); // Update or create data, incremental synchronization                    break;                case DELETE:                    doDelete(dataList);  // delete data                    break;                default:                    break;            }        }    }}

New selector data, new operation, through switch-case into doUpdate() method.

3.4 Enter the Specific Data Handler#

  • SelectorDataHandler.doUpdate()

/** * The type Selector data handler. */@RequiredArgsConstructorpublic class SelectorDataHandler extends AbstractDataHandler<SelectorData> {
    private final PluginDataSubscriber pluginDataSubscriber;
    //......
    // update data    @Override    protected void doUpdate(final List<SelectorData> dataList) {        dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);    }}

Iterate over the data and enter the onSelectorSubscribe() method.

  • PluginDataSubscriber.onSelectorSubscribe()

It has no additional logic and calls the subscribeDataHandler() method directly. Within methods, there are data types (plugins, selectors, or rules) and action types (update or delete) to perform different logic.

/** * The common plugin data subscriber, responsible for handling all plug-in, selector, and rule information */public class CommonPluginDataSubscriber implements PluginDataSubscriber {    //......     // handle selector data    @Override    public void onSelectorSubscribe(final SelectoData selectorData) {        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);    }            // A subscription data handler that handles updates or deletions of data    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {        Optional.ofNullable(classData).ifPresent(data -> {            // plugin data            if (data instanceof PluginData) {                PluginData pluginData = (PluginData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // update                    // save the data to gateway memory                     BaseDataCache.getInstance().cachePluginData(pluginData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // delete                    // delete the data from gateway memory                    BaseDataCache.getInstance().removePluginData(pluginData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));                }            } else if (data instanceof SelectorData) {  // selector data                SelectorData selectorData = (SelectorData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // update                    // save the data to gateway memory                    BaseDataCache.getInstance().cacheSelectData(selectorData);                    // If each plugin has its own processing logic, then do it                     Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // delete                    // delete the data from gateway memory                    BaseDataCache.getInstance().removeSelectData(selectorData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));                }            } else if (data instanceof RuleData) {  // rule data                RuleData ruleData = (RuleData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // update                    // save the data to gateway memory                    BaseDataCache.getInstance().cacheRuleData(ruleData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));                } else if (dataType == DataEventTypeEnum.DELETE) { // delete                    // delete the data from gateway memory                    BaseDataCache.getInstance().removeRuleData(ruleData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));                }            }        });    }    }

Adding a selector will enter the following logic:

// save the data to gateway memoryBaseDataCache.getInstance().cacheSelectData(selectorData);// If each plugin has its own processing logic, then do itOptional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));

One is to save the data to the gateway's memory. BaseDataCache is the class that ultimately caches data, implemented in a singleton pattern. The selector data is stored in the SELECTOR_MAP Map. In the subsequent use, also from this data.

public final class BaseDataCache {    // private instance    private static final BaseDataCache INSTANCE = new BaseDataCache();    // private constructor    private BaseDataCache() {    }        /**     * Gets instance.     *  public method     * @return the instance     */    public static BaseDataCache getInstance() {        return INSTANCE;    }        /**      * A Map of the cache selector data     * pluginName -> SelectorData.     */    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();        public void cacheSelectData(final SelectorData selectorData) {        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);    }           /**     * cache selector data.     * @param data the selector data     */    private void selectorAccept(final SelectorData data) {        String key = data.getPluginName();        if (SELECTOR_MAP.containsKey(key)) { // Update operation, delete before insert            List<SelectorData> existList = SELECTOR_MAP.get(key);            final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());            resultList.add(data);            final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());            SELECTOR_MAP.put(key, collect);        } else {  // Add new operations directly to Map            SELECTOR_MAP.put(key, Lists.newArrayList(data));        }    }    }

Second, if each plugin has its own processing logic, then do it. Through the IDEA editor, you can see that after adding a selector, there are the following plugins and processing. We're not going to expand it here.

After the above source tracing, and through a practical case, in the admin side to add a selector data, will websocket data synchronization process analysis cleared.

Let's use the following figure to concatenate the data synchronization process on the gateway side:

The data synchronization process has been analyzed, but there are still some problems that have not been analyzed, that is, how does the gateway establish a connection with admin?

4. The Gateway Establishes a Websocket Connection with Admin#

  • websocket config

With the following configuration in the gateway configuration file and the dependency introduced, the websocket related service is started.

shenyu:    file:      enabled: true    cross:      enabled: true    dubbo :      parameter: multi    sync:        websocket :  # Use websocket for data synchronization          urls: ws://localhost:9095/websocket   # websocket address of admin

Add a dependency on websocket in the gateway.

<!--shenyu data sync start use websocket--><dependency>    <groupId>org.apache.shenyu</groupId>    <artifactId>shenyu-spring-boot-starter-sync-data-websocket</artifactId>    <version>${project.version}</version></dependency>
  • Websocket Data Sync Config

The associated bean is created by conditional assembly of springboot. In the gateway started, if we configure the shenyu.sync.websocket.urls, then websocket data synchronization configuration will be loaded. The dependency loading is done through the springboot starter.


/** * WebsocketSyncDataService * Conditional injection is implemented through SpringBoot * Websocket sync data configuration for spring boot. */@Configuration@ConditionalOnClass(WebsocketSyncDataService.class)@ConditionalOnProperty(prefix = "shenyu.sync.websocket", name = "urls")@Slf4jpublic class WebsocketSyncDataConfiguration {
    /**     * Websocket sync data service.     * @param websocketConfig   the websocket config     * @param pluginSubscriber the plugin subscriber     * @param metaSubscribers   the meta subscribers     * @param authSubscribers   the auth subscribers     * @return the sync data service     */    @Bean    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {        log.info("you use websocket sync shenyu data.......");        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));    }
    /**     * Config websocket config.     *     * @return the websocket config     */    @Bean    @ConfigurationProperties(prefix = "shenyu.sync.websocket")    public WebsocketConfig websocketConfig() {        return new WebsocketConfig();      }}

Start a new spring.factories file in the resources/META-INF directory of your project and specify the configuration classes in the file.

  • WebsocketSyncDataService

The following things are done in 'WebsocketSyncDataService' :

  • Read configuration urls, which represent the admin side of the synchronization address, if there are more than one, use "," split;

  • Create a scheduling thread pool, with each admin assigned one to perform scheduled tasks;

  • Create ShenyuWebsocketClient, assign one to each admin, set up websocket communication with admin;

  • Start connection with admin end websocket;

  • Executes a scheduled task every 10 seconds. The main function is to determine whether the websocket connection has been disconnected, if so, try to reconnect. If not, a ping-pong test is performed.


/** * Websocket sync data service. */@Slf4jpublic class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
    private final List<WebSocketClient> clients = new ArrayList<>();
    private final ScheduledThreadPoolExecutor executor;
    /**     * Instantiates a new Websocket sync cache.     * @param websocketConfig      the websocket config     * @param pluginDataSubscriber the plugin data subscriber     * @param metaDataSubscribers  the meta data subscribers     * @param authDataSubscribers  the auth data subscribers     */    public WebsocketSyncDataService(final WebsocketConfig websocketConfig,                                    final PluginDataSubscriber pluginDataSubscriber,                                    final List<MetaDataSubscriber> metaDataSubscribers,                                    final List<AuthDataSubscriber> authDataSubscribers) {        // If there are multiple synchronization addresses on the admin side, use commas (,) to separate them        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");        // Create a scheduling thread pool, one for each admin        executor = new ScheduledThreadPoolExecutor(urls.length, ShenyuThreadFactory.create("websocket-connect", true));        for (String url : urls) {            try {                //Create a WebsocketClient and assign one to each admin                clients.add(new ShenyuWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));            } catch (URISyntaxException e) {                log.error("websocket url({}) is error", url, e);            }        }        try {            for (WebSocketClient client : clients) {                // Establish a connection with the WebSocket Server                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);                if (success) {                    log.info("websocket connection is successful.....");                } else {                    log.error("websocket connection is error.....");                }
                // Run a scheduled task every 10 seconds                // The main function is to check whether the WebSocket connection is disconnected. If the connection is disconnected, retry the connection.                // If it is not disconnected, the ping-pong test is performed                executor.scheduleAtFixedRate(() -> {                    try {                        if (client.isClosed()) {                            boolean reconnectSuccess = client.reconnectBlocking();                            if (reconnectSuccess) {                                log.info("websocket reconnect server[{}] is successful.....", client.getURI().toString());                            } else {                                log.error("websocket reconnection server[{}] is error.....", client.getURI().toString());                            }                        } else {                            client.sendPing();                            log.debug("websocket send to [{}] ping message successful", client.getURI().toString());                        }                    } catch (InterruptedException e) {                        log.error("websocket connect is error :{}", e.getMessage());                    }                }, 10, 10, TimeUnit.SECONDS);            }            /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/        } catch (InterruptedException e) {            log.info("websocket connection...exception....", e);        }
    }
    @Override    public void close() {        // close websocket client        for (WebSocketClient client : clients) {            if (!client.isClosed()) {                client.close();            }        }        // close threadpool        if (Objects.nonNull(executor)) {            executor.shutdown();        }    }}
  • ShenyuWebsocketClient

The WebSocket client created in ShenYu to communicate with the admin side. After the connection is successfully established for the first time, full data is synchronized and incremental data is subsequently synchronized.


/** * The type shenyu websocket client. */@Slf4jpublic final class ShenyuWebsocketClient extends WebSocketClient {        private volatile boolean alreadySync = Boolean.FALSE;        private final WebsocketDataHandler websocketDataHandler;        /**     * Instantiates a new shenyu websocket client.     * @param serverUri             the server uri       * @param pluginDataSubscriber the plugin data subscriber      * @param metaDataSubscribers   the meta data subscribers      * @param authDataSubscribers   the auth data subscribers      */    public ShenyuWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {        super(serverUri);        this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);    }
    // Execute after the connection is successfully established    @Override    public void onOpen(final ServerHandshake serverHandshake) {        // To prevent re-execution when reconnecting, use alreadySync to determine        if (!alreadySync) {            // Synchronize all data, type MYSELF            send(DataEventTypeEnum.MYSELF.name());            alreadySync = true;        }    }
    // Execute after receiving the message    @Override    public void onMessage(final String result) {        // handle data        handleResult(result);    }        // Execute after shutdown    @Override    public void onClose(final int i, final String s, final boolean b) {        this.close();    }        // Execute after error    @Override    public void onError(final Exception e) {        this.close();    }        @SuppressWarnings("ALL")    private void handleResult(final String result) {        // Data deserialization        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);        // Which data types, plugins, selectors, rules...        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());        // Which operation type, update, delete...        String eventType = websocketData.getEventType();        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        // handle data        websocketDataHandler.executor(groupEnum, json, eventType);    }}

5. Summary#

This paper through a practical case, the data synchronization principle of websocket source code analysis. The main knowledge points involved are as follows:

  • WebSocket supports bidirectional communication and has good performance. It is recommended.

  • Complete event publishing and listening via Spring;

  • Support multiple synchronization strategies through abstract DataChangedListener interface, interface oriented programming;

  • Use factory mode to create WebsocketDataHandler to handle different data types;

  • Implement AbstractDataHandler using template method design patterns to handle general operation types;

  • Use singleton design pattern to cache data class BaseDataCache;

  • Loading of configuration classes via conditional assembly of SpringBoot and starter loading mechanism.

ZooKeeper Data Synchronization Source Code Analysis

· 18 min read
Apache ShenYu Committer

Apache ShenYu is an asynchronous, high-performance, cross-language, responsive API gateway.

In ShenYu gateway, data synchronization refers to how to synchronize the updated data to the gateway after the data is sent in the background management system. The Apache ShenYu gateway currently supports data synchronization for ZooKeeper, WebSocket, http long poll, Nacos, etcd and Consul. The main content of this article is based on WebSocket data synchronization source code analysis.

This paper based on shenyu-2.4.0 version of the source code analysis, the official website of the introduction of please refer to the Data Synchronization Design .

1. About ZooKeeper#

Apache ZooKeeper is a software project of the Apache Software Foundation that provides open source distributed configuration services, synchronization services, and naming registries for large-scale distributed computing. ZooKeeper nodes store their data in a hierarchical namespace, much like a file system or a prefix tree structure. Clients can read and write on nodes and thus have a shared configuration service in this way.

2. Admin Data Sync#

We traced the source code from a real case, such as updating a selector data in the Divide plugin to a weight of 90 in a background administration system:

2.1 Accept Data#

  • SelectorController.createSelector()

Enter the createSelector() method of the SelectorController class, which validates data, adds or updates data, and returns results.

@Validated@RequiredArgsConstructor@RestController@RequestMapping("/selector")public class SelectorController {        @PutMapping("/{id}")    public ShenyuAdminResult updateSelector(@PathVariable("id") final String id, @Valid @RequestBody final SelectorDTO selectorDTO) {        // set the current selector data ID        selectorDTO.setId(id);        // create or update operation        Integer updateCount = selectorService.createOrUpdate(selectorDTO);        // return result         return ShenyuAdminResult.success(ShenyuResultMessage.UPDATE_SUCCESS, updateCount);    }        // ......}

2.2 Handle Data#

  • SelectorServiceImpl.createOrUpdate()

Convert data in the SelectorServiceImpl class using the createOrUpdate() method, save it to the database, publish the event, update upstream.

@RequiredArgsConstructor@Servicepublic class SelectorServiceImpl implements SelectorService {    // eventPublisher    private final ApplicationEventPublisher eventPublisher;        @Override    @Transactional(rollbackFor = Exception.class)    public int createOrUpdate(final SelectorDTO selectorDTO) {        int selectorCount;        // build data DTO --> DO        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);        List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();        // insert or update ?        if (StringUtils.isEmpty(selectorDTO.getId())) {            //  insert into data            selectorCount = selectorMapper.insertSelective(selectorDO);            // insert into condition data            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));            });            // check selector add            if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {                DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();                dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());                dataPermissionDTO.setDataId(selectorDO.getId());                dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);                dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));            }
        } else {            // update data, delete and then insert            selectorCount = selectorMapper.updateSelective(selectorDO);            //delete rule condition then add            selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);                selectorConditionMapper.insertSelective(selectorConditionDO);            });        }        // publish event        publishEvent(selectorDO, selectorConditionDTOs);
        // update upstream        updateDivideUpstream(selectorDO);        return selectorCount;    }        // ......    }

In the Service class to persist data, i.e. to the database, this should be familiar, not expand. The update upstream operation is analyzed in the corresponding section below, focusing on the publish event operation, which performs data synchronization.

The logic of the publishEvent() method is to find the plugin corresponding to the selector, build the conditional data, and publish the change data.

       private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {        // find plugin of selector        PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());        // build condition data        List<ConditionData> conditionDataList =                selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());        // publish event        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,                Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));    }

Change data released by eventPublisher.PublishEvent() is complete, the eventPublisher object is a ApplicationEventPublisher class, The fully qualified class name is org.springframework.context.ApplicationEventPublisher. Here we see that publishing data is done through Spring related functionality.

ApplicationEventPublisher

When a state change, the publisher calls ApplicationEventPublisher of publishEvent method to release an event, Spring container broadcast event for all observers, The observer's onApplicationEvent method is called to pass the event object to the observer. There are two ways to call publishEvent method, one is to implement the interface by the container injection ApplicationEventPublisher object and then call the method, the other is a direct call container, the method of two methods of publishing events not too big difference.

  • ApplicationEventPublisher: publish event;
  • ApplicationEvent: Spring event, record the event source, time, and data;
  • ApplicationListener: event listener, observer.

In Spring event publishing mechanism, there are three objects,

An object is a publish event ApplicationEventPublisher, in ShenYu through the constructor in the injected a eventPublisher.

The other object is ApplicationEvent , inherited from ShenYu through DataChangedEvent, representing the event object.

public class DataChangedEvent extends ApplicationEvent {//......}

The last object is ApplicationListener in ShenYu in through DataChangedEventDispatcher class implements this interface, as the event listener, responsible for handling the event object.

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
    //......    }

2.3 Dispatch Data#

  • DataChangedEventDispatcher.onApplicationEvent()

Released when the event is completed, will automatically enter the DataChangedEventDispatcher class onApplicationEvent() method of handling events.

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
  /**     * This method is called when there are data changes   * @param event     */    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // Iterate through the data change listener (usually using a data synchronization approach is fine)      for (DataChangedListener listener : listeners) {            // What kind of data has changed        switch (event.getGroupKey()) {                case APP_AUTH: // app auth data                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());                    break;                case PLUGIN:  // plugin data                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());                    break;                case RULE:    // rule data                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());                    break;                case SELECTOR:   // selector data                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());                    break;                case META_DATA:  // metadata                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());                    break;                default:  // other types throw exception                  throw new IllegalStateException("Unexpected value: " + event.getGroupKey());            }        }    }    }

When there is a data change, the onApplicationEvent method is called and all the data change listeners are iterated to determine the data type and handed over to the appropriate data listener for processing.

ShenYu groups all the data into five categories: APP_AUTH, PLUGIN, RULE, SELECTOR and META_DATA.

Here the data change listener (DataChangedListener) is an abstraction of the data synchronization policy. Its concrete implementation is:

These implementation classes are the synchronization strategies currently supported by ShenYu:

  • WebsocketDataChangedListener: data synchronization based on Websocket;
  • ZookeeperDataChangedListener:data synchronization based on Zookeeper;
  • ConsulDataChangedListener: data synchronization based on Consul;
  • EtcdDataDataChangedListener:data synchronization based on etcd;
  • HttpLongPollingDataChangedListener:data synchronization based on http long polling;
  • NacosDataChangedListener:data synchronization based on nacos;

Given that there are so many implementation strategies, how do you decide which to use?

Because this paper is based on zookeeper data synchronization source code analysis, so here to ZookeeperDataChangedListener as an example, the analysis of how it is loaded and implemented.

A global search in the source code project shows that its implementation is done in the DataSyncConfiguration class.

/** * Data Sync Configuration * By springboot conditional assembly * The type Data sync configuration. */@Configurationpublic class DataSyncConfiguration {            /**     * zookeeper data sunc     * The type Zookeeper listener.     */    @Configuration    @ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url")  // The condition property is loaded only when it is met    @Import(ZookeeperConfiguration.class)    static class ZookeeperListener {
        /**         * Config event listener data changed listener.         * @param zkClient the zk client         * @return the data changed listener         */        @Bean        @ConditionalOnMissingBean(ZookeeperDataChangedListener.class)        public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {            return new ZookeeperDataChangedListener(zkClient);        }
        /**         * Zookeeper data init zookeeper data init.         * @param zkClient        the zk client         * @param syncDataService the sync data service         * @return the zookeeper data init         */        @Bean        @ConditionalOnMissingBean(ZookeeperDataInit.class)        public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {            return new ZookeeperDataInit(zkClient, syncDataService);        }    }        // other code is omitted......}

This configuration class is implemented through the SpringBoot conditional assembly class. The ZookeeperListener class has several annotations:

  • @Configuration: Configuration file, application context;

  • @ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url"): attribute condition. The configuration class takes effect only when the condition is met. That is, when we have the following configuration, ZooKeeper is used for data synchronization.

    shenyu:    sync:     zookeeper:          url: localhost:2181          sessionTimeout: 5000          connectionTimeout: 2000
  • @Import(ZookeeperConfiguration.class):import ZookeeperConfiguration;

  @EnableConfigurationProperties(ZookeeperProperties.class)  // enable zookeeper properties  public class ZookeeperConfiguration {
    /**     * register zkClient in spring ioc.     * @param zookeeperProp the zookeeper configuration     * @return ZkClient {@linkplain ZkClient}        */      @Bean      @ConditionalOnMissingBean(ZkClient.class)      public ZkClient zkClient(final ZookeeperProperties zookeeperProp) {        return new ZkClient(zookeeperProp.getUrl(), zookeeperProp.getSessionTimeout(), zookeeperProp.getConnectionTimeout()); // 读取zk配置信息,并创建zkClient      }  }
@Data@ConfigurationProperties(prefix = "shenyu.sync.zookeeper") // zookeeper propertiespublic class ZookeeperProperties {
    private String url;
    private Integer sessionTimeout;
    private Integer connectionTimeout;
    private String serializer;}

When we take the initiative to configuration, use the zookeeper data synchronization, zookeeperDataChangedListener is generated. So in the event handler onApplicationEvent(), it goes to the corresponding listener. In our case, it is a selector data update, data synchronization is zookeeper, so, the code will enter the ZookeeperDataChangedListener selector data change process.

    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // Iterate through the data change listener (usually using a data synchronization approach is fine)        for (DataChangedListener listener : listeners) {            // what kind of data has changed         switch (event.getGroupKey()) {                                    // other code logic is omitted                                    case SELECTOR:   // selector data                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());   // In our case, will enter the ZookeeperDataChangedListener selector data change process                    break;         }    }

2.4 Zookeeper Data Changed Listener#

  • ZookeeperDataChangedListener.onSelectorChanged()

In the onSelectorChanged() method, determine the type of action, whether to refresh synchronization or update or create synchronization. Determine whether the node is in zk based on the current selector data.


/** * use ZooKeeper to publish change data */public class ZookeeperDataChangedListener implements DataChangedListener {        // The selector information changed    @Override    public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {        // refresh        if (eventType == DataEventTypeEnum.REFRESH && !changed.isEmpty()) {            String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(changed.get(0).getPluginName());            deleteZkPathRecursive(selectorParentPath);        }        // changed data        for (SelectorData data : changed) {            // build selector real path            String selectorRealPath = DefaultPathConstants.buildSelectorRealPath(data.getPluginName(), data.getId());            // delete            if (eventType == DataEventTypeEnum.DELETE) {                deleteZkPath(selectorRealPath);                continue;            }            // selector parent path            String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(data.getPluginName());            // create parent node            createZkNode(selectorParentPath);            // insert or update data            insertZkNode(selectorRealPath, data);        }    }
    // create zk node    private void createZkNode(final String path) {        // create only if it does not exist        if (!zkClient.exists(path)) {            zkClient.createPersistent(path, true);        }    }
    // insert zk node    private void insertZkNode(final String path, final Object data) {        // create zk node        createZkNode(path);        // write data by zkClient         zkClient.writeData(path, null == data ? "" : GsonUtils.getInstance().toJson(data));    }    }

As long as the changed data is correctly written to the zk node, the admin side of the operation is complete. ShenYu uses zk for data synchronization, zk nodes are carefully designed.

In our current case, updating one of the selector data in the Divide plugin with a weight of 90 updates specific nodes in the graph.

We series the above update flow with a sequence diagram.

3. Gateway Data Sync#

Assume that the ShenYu gateway is already running properly, and the data synchronization mode is also Zookeeper. How does the gateway receive and process the selector data after updating it on the admin side and sending the changed data to ZK? Let's continue our source code analysis to find out.

3.1 ZkClient Accept Data#

  • ZkClient.subscribeDataChanges()

There is a ZookeeperSyncDataService class on the gateway, which subscribing to the data node through ZkClient and can sense when the data changes.

/** * ZookeeperSyncDataService */public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {    private void subscribeSelectorDataChanges(final String path) {       // zkClient subscribe data         zkClient.subscribeDataChanges(path, new IZkDataListener() {            @Override            public void handleDataChange(final String dataPath, final Object data) {                cacheSelectorData(GsonUtils.getInstance().fromJson(data.toString(), SelectorData.class)); // zk node data changed            }
            @Override            public void handleDataDeleted(final String dataPath) {                unCacheSelectorData(dataPath);  // zk node data deleted            }        });    }     // ...}

ZooKeeper's Watch mechanism notifies subscribing clients of node changes. In our case, updating the selector information goes to the handleDataChange() method. cacheSelectorData() is used to process data.

3.2 Handle Data#

  • ZookeeperSyncDataService.cacheSelectorData()

The data is not null, and caching the selector data is again handled by PluginDataSubscriber.

    private void cacheSelectorData(final SelectorData selectorData) {        Optional.ofNullable(selectorData)                .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSelectorSubscribe(data)));    }

PluginDataSubscriber is an interface, it is only a CommonPluginDataSubscriber implementation class, responsible for data processing plugin, selector and rules.

3.3 Common Plugin Data Subscriber#

  • PluginDataSubscriber.onSelectorSubscribe()

It has no additional logic and calls the subscribeDataHandler() method directly. Within methods, there are data types (plugins, selectors, or rules) and action types (update or delete) to perform different logic.

/** * The common plugin data subscriber, responsible for handling all plug-in, selector, and rule information */public class CommonPluginDataSubscriber implements PluginDataSubscriber {    //......     // handle selector data    @Override    public void onSelectorSubscribe(final SelectoData selectorData) {        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);    }            // A subscription data handler that handles updates or deletions of data    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {        Optional.ofNullable(classData).ifPresent(data -> {            // plugin data            if (data instanceof PluginData) {                PluginData pluginData = (PluginData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // update                    // save the data to gateway memory                     BaseDataCache.getInstance().cachePluginData(pluginData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // delete                    // delete the data from gateway memory                    BaseDataCache.getInstance().removePluginData(pluginData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));                }            } else if (data instanceof SelectorData) {  // selector data                SelectorData selectorData = (SelectorData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // update                    // save the data to gateway memory                    BaseDataCache.getInstance().cacheSelectData(selectorData);                    // If each plugin has its own processing logic, then do it                     Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // delete                    // delete the data from gateway memory                    BaseDataCache.getInstance().removeSelectData(selectorData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));                }            } else if (data instanceof RuleData) {  // rule data                RuleData ruleData = (RuleData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // update                    // save the data to gateway memory                    BaseDataCache.getInstance().cacheRuleData(ruleData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));                } else if (dataType == DataEventTypeEnum.DELETE) { // delete                    // delete the data from gateway memory                    BaseDataCache.getInstance().removeRuleData(ruleData);                    // If each plugin has its own processing logic, then do it                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));                }            }        });    }    }

3.4 Data cached to Memory#

Adding a selector will enter the following logic:

// save the data to gateway memoryBaseDataCache.getInstance().cacheSelectData(selectorData);// If each plugin has its own processing logic, then do itOptional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));

One is to save the data to the gateway's memory. BaseDataCache is the class that ultimately caches data, implemented in a singleton pattern. The selector data is stored in the SELECTOR_MAP Map. In the subsequent use, also from this data.

public final class BaseDataCache {    // private instance    private static final BaseDataCache INSTANCE = new BaseDataCache();    // private constructor    private BaseDataCache() {    }        /**     * Gets instance.     *  public method     * @return the instance     */    public static BaseDataCache getInstance() {        return INSTANCE;    }        /**      * A Map of the cache selector data     * pluginName -> SelectorData.     */    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();        public void cacheSelectData(final SelectorData selectorData) {        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);    }           /**     * cache selector data.     * @param data the selector data     */    private void selectorAccept(final SelectorData data) {        String key = data.getPluginName();        if (SELECTOR_MAP.containsKey(key)) { // Update operation, delete before insert            List<SelectorData> existList = SELECTOR_MAP.get(key);            final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());            resultList.add(data);            final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());            SELECTOR_MAP.put(key, collect);        } else {  // Add new operations directly to Map            SELECTOR_MAP.put(key, Lists.newArrayList(data));        }    }    }

Second, if each plugin has its own processing logic, then do it. Through the IDEA editor, you can see that after adding a selector, there are the following plugins and processing. We're not going to expand it here.

After the above source tracking, and through a practical case, in the admin end to update a selector data, the ZooKeeper data synchronization process analysis is clear.

Let's series the data synchronization process on the gateway side through the sequence diagram:

The data synchronization process has been analyzed. In order to prevent the synchronization process from being interrupted, other logic is ignored during the analysis. We also need to analyze the process of Admin synchronization data initialization and gateway synchronization operation initialization.

4. Admin Data Sync initialization#

When admin starts, the current data will be fully synchronized to zk, the implementation logic is as follows:


/** * Zookeeper data init */public class ZookeeperDataInit implements CommandLineRunner {
    private final ZkClient zkClient;
    private final SyncDataService syncDataService;
    /**     * Instantiates a new Zookeeper data init.     *     * @param zkClient        the zk client     * @param syncDataService the sync data service     */    public ZookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {        this.zkClient = zkClient;        this.syncDataService = syncDataService;    }
    @Override    public void run(final String... args) {        String pluginPath = DefaultPathConstants.PLUGIN_PARENT;        String authPath = DefaultPathConstants.APP_AUTH_PARENT;        String metaDataPath = DefaultPathConstants.META_DATA;        // Determine whether data exists in zk        if (!zkClient.exists(pluginPath) && !zkClient.exists(authPath) && !zkClient.exists(metaDataPath)) {            syncDataService.syncAll(DataEventTypeEnum.REFRESH);        }    }}

Check whether there is data in zk, if not, then synchronize.

ZookeeperDataInit implements the CommandLineRunner interface. It is an interface provided by SpringBoot that executes the run() method after all Spring Beans initializations and is often used for initialization operations in a project.

  • SyncDataService.syncAll()

Query data from the database, and then perform full data synchronization, all authentication information, plugin information, selector information, rule information, and metadata information. Synchronous events are published primarily through eventPublisher. After publishing the event via publishEvent(), the ApplicationListener performs the event change operation. In ShenYu is mentioned in DataChangedEventDispatcher.

@Servicepublic class SyncDataServiceImpl implements SyncDataService {    // eventPublisher    private final ApplicationEventPublisher eventPublisher;         /***     * sync all data     * @param type the type     * @return     */    @Override    public boolean syncAll(final DataEventTypeEnum type) {        // app auth data        appAuthService.syncData();        // plugin data        List<PluginData> pluginDataList = pluginService.listAll();        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));        // selector data        List<SelectorData> selectorDataList = selectorService.listAll();        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));        // rule data        List<RuleData> ruleDataList = ruleService.listAll();        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));        // metadata        metaDataService.syncData();        return true;    }    }

5. Gateway Data Sync Init#

The initial operation of data synchronization on the gateway side is mainly the node in the subscription zk. When there is a data change, the changed data will be received. This relies on the Watch mechanism of ZooKeeper. In ShenYu, the one responsible for zk data synchronization is ZookeeperSyncDataService, also mentioned earlier.

The function logic of ZookeeperSyncDataService is completed in the process of instantiation: the subscription to Shenyu data synchronization node in zk is completed. Subscription here is divided into two kinds, one kind is existing node data updated above, through this zkClient.subscribeDataChanges() method; Another kind is under the current node, add or delete nodes change namely child nodes, it through zkClient.subscribeChildChanges() method.

ZookeeperSyncDataService code is a bit too much, here we use plugin data read and subscribe to track, other types of data operation principle is the same.


/** *  zookeeper sync data service */public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {    // At instantiation time, the data is read from the ZK and the node is subscribed    public ZookeeperSyncDataService(/* omit the construction argument */ ) {        this.zkClient = zkClient;        this.pluginDataSubscriber = pluginDataSubscriber;        this.metaDataSubscribers = metaDataSubscribers;        this.authDataSubscribers = authDataSubscribers;        // watch plugin, selector and rule data        watcherData();        // watch app auth data        watchAppAuth();        // watch metadata        watchMetaData();    }        private void watcherData() {        // plugin node path        final String pluginParent = DefaultPathConstants.PLUGIN_PARENT;        // all plugin nodes        List<String> pluginZKs = zkClientGetChildren(pluginParent);        for (String pluginName : pluginZKs) {            // watch plugin, selector, rule data node            watcherAll(pluginName);        }        //subscribing to child nodes (adding or removing a plugin)        zkClient.subscribeChildChanges(pluginParent, (parentPath, currentChildren) -> {            if (CollectionUtils.isNotEmpty(currentChildren)) {                for (String pluginName : currentChildren) {                    // you need to subscribe to all plugin, selector, and rule data for the child node                      watcherAll(pluginName);                }            }        });    }        private void watcherAll(final String pluginName) {        // watch plugin        watcherPlugin(pluginName);        // watch selector        watcherSelector(pluginName);        // watch rule        watcherRule(pluginName);    }
    private void watcherPlugin(final String pluginName) {        // plugin path        String pluginPath = DefaultPathConstants.buildPluginPath(pluginName);        // create if not exist        if (!zkClient.exists(pluginPath)) {            zkClient.createPersistent(pluginPath, true);        }        // read the current node data on zk and deserialize it        PluginData pluginData = null == zkClient.readData(pluginPath) ? null                : GsonUtils.getInstance().fromJson((String) zkClient.readData(pluginPath), PluginData.class);        // cached into gateway memory        cachePluginData(pluginData);        // subscribe plugin data        subscribePluginDataChanges(pluginPath, pluginName);    }       private void cachePluginData(final PluginData pluginData) {    //omit implementation logic, is actually the CommonPluginDataSubscriber operation, can connect with the front    }        private void subscribePluginDataChanges(final String pluginPath, final String pluginName) {        // subscribe data changes        zkClient.subscribeDataChanges(pluginPath, new IZkDataListener() {
            @Override            public void handleDataChange(final String dataPath, final Object data) {  // update                 //omit implementation logic, is actually the CommonPluginDataSubscriber operation, can connect with the front            }
            @Override            public void handleDataDeleted(final String dataPath) {   // delete                  // Omit implementation logic, is actually the CommonPluginDataSubscriber operation, can connect with the front
            }        });    }    }    

The above source code is given comments, I believe you can understand. The main logic for subscribing to plug-in data is as follows:

  1. Create the current plugin path
  2. Create a path if it does not exist
  3. Read the current node data on zK and deserialize it
  4. The plugin data is cached in the gateway memory
  5. Subscribe to the plug-in node

6. Summary#

This paper through a practical case, Zookeeper data synchronization principle source code analysis. The main knowledge points involved are as follows:

  • Data synchronization based on ZooKeeper is mainly implemented through watch mechanism;

  • Complete event publishing and listening via Spring;

  • Support multiple synchronization strategies through abstract DataChangedListener interface, interface oriented programming;

  • Use singleton design pattern to cache data class BaseDataCache;

  • Loading of configuration classes via conditional assembly of SpringBoot and starter loading mechanism.

Code Analysis For Context-Path Plugin

· 3 min read
Kunshuai Zhu
Apache ShenYu Contributor

Before starting, you can refer to this article to start the gateway

Body#

First, look at the ContextPathPlugin#doExecute method, which is the core of this plugin.

protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {    ...    // 1. get the contextMappingHandle from the JVM cache    ContextMappingHandle contextMappingHandle = ContextPathPluginDataHandler.CACHED_HANDLE.get().obtainHandle(CacheKeyUtils.INST.getKey(rule));    ...    // 2. set shenyu context according to contextMappingHandle    buildContextPath(shenyuContext, contextMappingHandle);    return chain.execute(exchange);}
  1. Get the contextMappingHandle from the JVM cache

    The contextMappingHandle here is an instance of the ContextMappingHandle class, which has two member variables: contextPath and addPrefix

    These two variables have appeared in the Rules form in the Admin before, and they are updated when the data is synchronized.

  2. Set shenyu context according to contextMappingHandle

    Below is the source code of the ContextPathPlugin#buildContextPath method

    private void buildContextPath(final ShenyuContext context, final ContextMappingHandle handle) {    String realURI = "";    // 1. set the context path of shenyu, remove the prefix of the real URI according to the length of the contextPath    if (StringUtils.isNoneBlank(handle.getContextPath())) {        context.setContextPath(handle.getContextPath());        context.setModule(handle.getContextPath());        realURI = context.getPath().substring(handle.getContextPath().length());    }    // add prefix    if (StringUtils.isNoneBlank(handle.getAddPrefix())) {        if (StringUtils.isNotBlank(realURI)) {            realURI = handle.getAddPrefix() + realURI;        } else {            realURI = handle.getAddPrefix() + context.getPath();        }    }    context.setRealUrl(realURI);}
    • Set the context path of shenyu, remove the prefix of the real URI according to the length of the contextPath

      You may be wondering whether there is a problem with the so-called "according to the length of the contextPath" here?

      In fact, such a judgment is not a problem, because the request will be processed by the plugin only after it is matched by the Selector and Rules. Therefore, under the premise of setting up Selector and Rules, it is completely possible to meet the needs of converting a specific contextPath.

Then, the ContextPathPlugin class has a more important method skip, part of the code is shown below. We can find: If it is a call to the RPC service, the context_path plugin will be skipped directly.

public Boolean skip(final ServerWebExchange exchange) {    ...    return Objects.equals(rpcType, RpcTypeEnum.DUBBO.getName())            || Objects.equals(rpcType, RpcTypeEnum.GRPC.getName())            || Objects.equals(rpcType, RpcTypeEnum.TARS.getName())            || Objects.equals(rpcType, RpcTypeEnum.MOTAN.getName())            || Objects.equals(rpcType, RpcTypeEnum.SOFA.getName());}

Finally, the context-path plugin has another class ContextPathPluginDataHandler. The function of this class is to subscribe to the data of the plug-in. When the plugin configuration is modified, deleted, or added, the data is modified, deleted, or added to the JVM cache.

LoadBalance SPI Source Code Analysis

· 13 min read
Apache ShenYu Contributor

Gateway applications need to support a variety of load balancing strategies, including random,Hashing, RoundRobin and so on. In Apache Shenyu gateway, it not only realizes such traditional algorithms, but also makes smoother traffic processing for the entry of server nodes through detailed processing such as traffic warm-up, so as to obtain better overall stability. In this article, let's walk through how Apache Shenyu is designed and implemented this part of the function.

This article based on shenyu-2.4.0 version of the source code analysis.

[TOC]

LoadBalance SPI#

The implementation of LoadBalance is in shenyu-plugin-divide module. It has based on its SPI creation mechanism. The core interface code is shown as follows. This interface well explains the concept: load balancing is to select the most appropriate node from a series of server nodes. Routing, traffic processing and load balancing is the basic function of LoadBalance SPI.

@SPIpublic interface LoadBalance {
    /**     * @param upstreamList upstream list     * @param ip ip     * @return divide upstream     */    DivideUpstream select(List<DivideUpstream> upstreamList, String ip);}

Where upstreamList represents the server nodes list available for routing. DivideUpstream is the data structure of server node, the important elements including protocol, upstreamUrl , weight, timestamp, warmup.

public class DivideUpstream implements Serializable {    private String upstreamHost;    /**     * this is http protocol.     */    private String protocol;    private String upstreamUrl;    private int weight;    /**     * false close/ true open.     */    @Builder.Default    private boolean status = true;    private long timestamp;    private int warmup;}

Design of LoadBalance module#

The class diagram of LoadBalance moduleisshown as follows.

loadbalance-class-diagram

We can draw the outline of LoadBalance module from the class diagram:

  1. The abstract class AbstractLoadBalance implements the SPI LoadBalance interface,and supplies the template methods for selection related, such as select(), selector(),and gives the calculation of weight.

  2. Three implementation classes which inherit AbstractLoadBalance to realize their own logic:

    • RandomLoadBalance - Weight Random
    • HashLoadBalance - Consistent Hashing
    • RoundRobinLoadBalance -Weight Round Robin per-packet
  3. The utility class LoadBalanceUtil provides public static method to be called.

    The implementation classes and algorithms are configurable. According to its specification, by adding profile in SHENYU_DIERECTORY directory, the data in profile should be key=value-class format, where the value-class will be load by the Apache Shenyu SPI class loader, and key value should be an name defined in LoadBalanceEnum.

random=org.apache.shenyu.plugin.divide.balance.spi.RandomLoadBalanceroundRobin=org.apache.shenyu.plugin.divide.balance.spi.RoundRobinLoadBalancehash=org.apache.shenyu.plugin.divide.balance.spi.HashLoadBalance

The code of LoadBalanceEnum is as follows:

public enum LoadBalanceEnum {    /**     * Hash load balance enum.     */    HASH(1, "hash", true),
    /**     * Random load balance enum.     */    RANDOM(2, "random", true),
    /**     * Round robin load balance enum.     */    ROUND_ROBIN(3, "roundRobin", true);
    private final int code;    private final String name;    private final boolean support;}

AbstractLoadBalance#

This abstract class implements the LoadBalance interface and define the abstract method doSelect() to be processed by the implementation classes. In the template method select(), It will do validation first then call the doSelect() method.

   /**      * Do select divide upstream.     *     * @param upstreamList the upstream list     * @param ip           the ip     * @return the divide upstream     */    protected abstract DivideUpstream doSelect(List<DivideUpstream> upstreamList, String ip);
    @Override    public DivideUpstream select(final List<DivideUpstream> upstreamList, final String ip) {        if (CollectionUtils.isEmpty(upstreamList)) {            return null;        }        if (upstreamList.size() == 1) {            return upstreamList.get(0);        }        return doSelect(upstreamList, ip);    }

When the timestamp of server node is not null, and the interval between current time and timestamp is within the traffic warm-up time, the formula for weight calculation is. $$ {1-1} ww = min(1,uptime/(warmup/weight)) $$ It can be seen from the formula that the final weight(ww) is proportional to the original-weight value. The closer the time interval is to the warmup time, the greater the final ww. That is, the longer the waiting time of the request, the higher the final weight. When there is no timestamp or other conditions, the ww is equal to the weight value of DivideUpstream object.

The central of thinking about warm-upis to avoid bad performance when adding new server and the new JVMs starting up.

Let's see how the load balancing with Random, Hashing and RoundRobin strategy is implemented.

RandomLoadBalance#

The RandomLoadBalance can handle two situations:

  1. Each node without weight, or every node has the same weight, randomly choose one.
  2. Server Nodes with different weight, choose one randomly by weight.

Following is the random() method of RandomLoadBalance. When traversing server node list, if the randomly generated value is less than the weight of node, then the current node will be chosen. If after one round traversing, there's is no server node match, then it will return the first item of the list. The getWeight(DivideUpstream upstream) is defined in AbstractLoadBalance class.

    private DivideUpstream random(final int totalWeight, final List<DivideUpstream> upstreamList) {        // If the weights are not the same and the weights are greater than 0, then random by the total number of weights        int offset = RANDOM.nextInt(totalWeight);        // Determine which segment the random value falls on        for (DivideUpstream divideUpstream : upstreamList) {            offset -= getWeight(divideUpstream);            if (offset < 0) {                return divideUpstream;            }        }        return upstreamList.get(0);    }

HashLoadBalance#

In HashLoadBalance, it takes the advantages of consistent hashing , that maps both the input traffic and the servers to a unit circle, or name as hash ring. For the requestedip address, with its hash value to find the node closest in clockwise order as the node to be routed. Let's see how consistent hashing is implemented in HashLoadBalance.

As to the hash algorithms, HashLoadBalance uses MD5 hash, which has the advantage of mixing the input in an unpredictable but deterministic way. The output is a 32-bit integer. the code is shown as follows:

private static long hash(final String key) {    // md5 byte    MessageDigest md5;    try {        md5 = MessageDigest.getInstance("MD5");    } catch (NoSuchAlgorithmException e) {        throw new ShenyuException("MD5 not supported", e);    }    md5.reset();    byte[] keyBytes;    keyBytes = key.getBytes(StandardCharsets.UTF_8);    md5.update(keyBytes);    byte[] digest = md5.digest();    // hash code, Truncate to 32-bits    long hashCode = (long) (digest[3] & 0xFF) << 24            | ((long) (digest[2] & 0xFF) << 16)            | ((long) (digest[1] & 0xFF) << 8)            | (digest[0] & 0xFF);    return hashCode & 0xffffffffL;}

Importantly, how to generate the hash ring and avoid skewness? Let's thedoSelect() method inHashLoadBalance as follows:

    private static final int VIRTUAL_NODE_NUM = 5;
    @Override    public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {        final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();        for (DivideUpstream address : upstreamList) {            for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {                long addressHash = hash("SOUL-" + address.getUpstreamUrl() + "-HASH-" + i);                treeMap.put(addressHash, address);            }        }        long hash = hash(String.valueOf(ip));        SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);        if (!lastRing.isEmpty()) {            return lastRing.get(lastRing.firstKey());        }        return treeMap.firstEntry().getValue();    }

In this method, duplicated labels are used which are called "virtual nodes" (i.e. 5 virtual nodes point to a single "real" server). It will make the distribution in hash ring more evenly, and reduce the occurrence of data skewness.

In order to rescue the data sorted in the hash ring, and can be accessed quickly, we use ConcurrentSkipListMap of Java to store the server node lists ( with virtual nodes) and its hash value as key. This class a member of Java Collections Framework, providing expected average log(n) time cost for retrieve and access operations safely execute concurrent by multiple threads.

Furthermore, the method tailMap(K fromKey) of ConcurrentSkipListMap can return a view of portion of the map whose keys are greater or equal to the fromKey, and not need to navigate the whole map.

In the above code section, after the hash ring is generated, it uses tailMap(K fromKey) of ConcurrentSkipListMap to find the subset that the elements greater, or equal to the hash value of the requested ip, its first element is just the node to be routed. With the suitable data structure, the code looks particularly clear and concise.

Consistent hashing resolved the poor scalability of the traditional hashing by modular operation.

RoundRobinLoadBalance#

The original Round-robin selection is to select server nodes one by one from the candidate list. Whenever some nodes has crash ( ex, cannot be connected after 1 minute), it will be removed from the candidate list, and do not attend the next round, until the server node is recovered and it will be add to the candidate list again. In RoundRobinLoadBalance,the weight Round Robin per-packet schema is implemented.

In order to work in concurrent system, it provides an inner static class WeigthRoundRobin to store and calculate the rolling selections of each server node. Following is the main section of this class( removed remark )

protected static class WeightedRoundRobin {
    private int weight;    private final AtomicLong current = new AtomicLong(0);
    private long lastUpdate;
    void setWeight(final int weight) {        this.weight = weight;        current.set(0);    }    long increaseCurrent() {        return current.addAndGet(weight);    }
    void sel(final int total) {        current.addAndGet(-1 * total);    }    void setLastUpdate(final long lastUpdate) {        this.lastUpdate = lastUpdate;    }}

Please focus on the these method:

  • setWeight(final int weight) : set the current value by weight

  • increaseCurrent(): Increment the current value by weight, and current set to 0.

  • sel(final int total): decrement the current value by total

    Let's see how the weight factor being used in this round-robin selection?

    First it defines a two-level ConcurrentMap type variable named as methodWeightMap , to cache the server node lists and the rolling selection data about each server node.

private final ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<>(16);

In this map, the key of first level is set to upstreamUrl of first element in server node list. The type of second object is ConcurrentMap<String, WeightedRoundRobin>, the key of this inner Map is the value upstreamUrlvariable of each server node in this server list, the value object is WeightedRoundRobin, used to trace the rolling selection data about each server node. As to the implementation class for the Map object, we use ConcurrentHashMap of JUC, a hash table supporting full concurrency of retrievals and high expected concurrency for updates.

In the second level of the map, the embedded static class - WeighedRoundRobin of each node is thread-safe, implementing the weighted RoundRobin per bucket. The following is the code of the doselect() method of this class.

@Overridepublic DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {    String key = upstreamList.get(0).getUpstreamUrl();    ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);    if (map == null) {        methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));        map = methodWeightMap.get(key);    }    int totalWeight = 0;    long maxCurrent = Long.MIN_VALUE;    long now = System.currentTimeMillis();    DivideUpstream selectedInvoker = null;    WeightedRoundRobin selectedWRR = null;    for (DivideUpstream upstream : upstreamList) {        String rKey = upstream.getUpstreamUrl();        WeightedRoundRobin weightedRoundRobin = map.get(rKey);        int weight = getWeight(upstream);        if (weightedRoundRobin == null) {            weightedRoundRobin = new WeightedRoundRobin();            weightedRoundRobin.setWeight(weight);            map.putIfAbsent(rKey, weightedRoundRobin);        }        if (weight != weightedRoundRobin.getWeight()) {            //weight changed            weightedRoundRobin.setWeight(weight);        }        long cur = weightedRoundRobin.increaseCurrent();        weightedRoundRobin.setLastUpdate(now);        if (cur > maxCurrent) {            maxCurrent = cur;            selectedInvoker = upstream;            selectedWRR = weightedRoundRobin;        }        totalWeight += weight;    }    ......  //erase the section which handles the time-out upstreams.     if (selectedInvoker != null) {        selectedWRR.sel(totalWeight);        return selectedInvoker;    }    // should not happen here    return upstreamList.get(0);}

For example we assume upstreamUrl values of three server nodes is: LIST = [upstream-20, upstream-50, upstream-30]. After a round of execution, the data in newly created methodWeightMap is as follows:

methodWeightMap

For the above example LIST, assumes the weight array is [20,50,30]. the following figure shows the value change and polling selection process of the current array in WeighedRoundRobin object.

weighted-roundrobin-demo

In each round, it will choose the server node with max current value.

  • Round1:
    • Traverse the server node list, initialize the weightedRoundRobin instance of each server node or update the weight value of server nodes object DivideUpstream
    • say, in this case, after traverse, the current array of the node list changes to [20, 50,30],so according to rule, the node Stream-50 would be chosen, and then the static object WeightedRoundRobin of Stream-50 executes sel(-total) , the current array is now [20,-50, 30].
  • Round 2: after traverse, the current array should be [40,0,60], so the Stream-30 node would be chosen, current array is now [40,0,-40].
  • Round 3: after traverse, current array changes to [60,50,-10], Stream-20 would be chosen,and current array is now [-40,50,-10].

When there is any inconsistence or some server crashed, for example, the lists size does not match with the elements in map, it would copy and modify the element with lock mechanism, and remove the timeout server node, the data in Map updated. Following is the fault tolerance code segment.

    if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {        try {            // copy -> modify -> update reference            ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);            newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);            methodWeightMap.put(key, newMap);        } finally {            updateLock.set(false);        }    }
    if (selectedInvoker != null) {        selectedWRR.sel(totalWeight);        return selectedInvoker;    }    // should not happen here    return upstreamList.get(0);

LoadBalanceUtils#

In this class, a static method calling LoadBalance is provided, whereExtensionLoader is the entry point of Apache Shenyu SPI. That is to say, LoadBalance module is configurable and extensible. The algorithm variable in this static method is the name enumeration type defined in LoadBalanceEnum.

    /**     * Selector divide upstream.     *     * @param upstreamList the upstream list     * @param algorithm    the loadBalance algorithm     * @param ip           the ip     * @return the divide upstream     */    public static DivideUpstream selector(final List<DivideUpstream> upstreamList, final String algorithm, final String ip) {        LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getJoin(algorithm);        return loadBalance.select(upstreamList, ip);    }

Using of LoadBalance module#

In the above section, we describe the LoadBalance SPI and three implementation classes. Let's take a look at how the LoadBalance to be used in Apache Shenyu. DividePlugin is a plugin in Apache Shenyu responsible for routing http request. when enable to use this plugin, it will transfer traffic according to selection data and rule data, and deliver to next plugin downstream.

@SneakyThrows@Overrideprotected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {   ......}

The type of second parameter of doExecute() is ShenyuPluginChain, which represents the execution chain of plugins. For details, see the mechanism of Apache Shenyu Plugins. The third one is SelectorData type, and the fourth is RuleData type working as the rule data.

In doExecute() of DividePlugin, first verify the size of header, content length, etc, then preparing for load balancing.

Following is a code fragment usingLoadBalance in the doExecute() method:

   // find the routing server node list   List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());    ...     // the requested ip    String ip =   Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
    //calling the Utility class and invoke the LoadBalance processing.    DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);

In the above code, the output ofruleHandle.getLoadBalance() is the name variable defined in LoadBalanceEnum, that is random, hash, roundRobin, etc. It is very convenient to use LoadBalance by LoadBalanceUtils. When adding more LoadBalance implementing classes, the interface in plugin module will not be affect at all.

Summary#

After reading through the code of LoadBalance module, from the design perspective, it is concluded that this module has the following characteristics:

  1. Extensibility: Interface oriented design and implemented on Apache Shenyu SPI mechanism, it can be easily extended to other dynamic load balancing algorithms (for example, least connection, fastest mode, etc), and supports cluster processing.
  2. Scalability: Every load balancing implementation, weighted Random, consistency Hashing and weighted RoundRobin can well support increase or decrease cluster overall capacity.
  3. More detailed design such as warm-up can bring better performance and obtain better overall stability.

MatchStrategy -- analyze the design based on SPI

· 5 min read
Apache ShenYu Contributor

In most of the plugins ( such as Dubbo, gRPC,Spring-cloud, etc) of Apache Shenyu, the routingparameters are designed to support the combination of multiple conditions. In order to realize such requirements, the parameters and behaviors are abstracted to three parts according to its SPI mechanism, and implemented in shenyu-plugin-base module.

  • ParameterData-parameters
  • PredictJudge-predicate
  • MatchStrategy-matching strategy

Relatively speaking, the MatchStrategy is the part that needs the least extension points. For the combination judgement of multiple conditions, the common select rules are: All conditions are matched, at least one is match, at least the first is met, or most of conditions satisfied. As well need to handle various type of parameters, for example: IP, header, uri, etc. How to make the MatchStrategy to be simple to use and extensible?

MatchStrategy#

The implementation of MatchStrategy is in shenyu-plugin-base module. It has based on the SPI creation mechanism, and used factory pattern and strategy design pattern. The class diagram of MatchStrategy is showed as follows.

MatchStrategy-class-diagram

Base on the interface MatchStrategy to design the implementation classes, and the abstract class AbstractMatchStrategy supplies common method, while the factory class MatchStrategyFactory provides creation functions.

MatchStrategy Interface#

First, let's look at the MatchStrategy SPI interface

@SPIpublic interface MatchStrategy {
    Boolean match(List<ConditionData> conditionDataList, ServerWebExchange exchange);}

The annotation @SPI means that this is an SPI interface. Where ServerWebExchange is org.springframework.web.server.ServerWebExchange, represents the request-response interactive content of of HTTP. Following is the code of ConditionData, the more detail about this class can refer to code analysis of PredicteJudge

public class ConditionData {
    private String paramType;    private String operator;
    private String paramName;    private String paramValue;}

AbstractMatchStrategy#

Second, let's look at the abstract class AbstractMatchStrategy,it has defined a buildRealData method,in this method wrapped various parameters to a unified interface through the functionality of ParameterDataFactory, which is the factory class of ParameterData. There supports a variety types of parameters , such as Ip, Cookie, Header,uri, etc. Modifications of such parameters will not impact the calling of matching rules of MatchStrategy.

public abstract class AbstractMatchStrategy {
    public String buildRealData(final ConditionData condition, final ServerWebExchange exchange) {        return ParameterDataFactory.builderData(condition.getParamType(), condition.getParamName(), exchange);    }}

Implementation class and profile#

Then, let's look at the two implementation class based on the above interface in shenyu-plugin-base module , that is:

  • AndMatchStrategy- AND -All conditions are matched

  • OrMatchStrategy- OR -at least one is match

    The profile is shown as follows, which locates at the SHENYU_DIRECTORYdirectory. When starting up, the top-level SPI classes will read the key-value and load the classes and cache them.

and=org.apache.shenyu.plugin.base.condition.strategy.AndMatchStrategyor=org.apache.shenyu.plugin.base.condition.strategy.OrMatchStrategy

These two implementation classes inherit AbstractMatchStrategy and implement MatchStrategy.

AndMatchStrategy- “AND” relation#

Because PredicateJudge encapsulates the diversity of Predicate , and ConditionData and ParameData can present variety of parameters, for treating of multiple conditions, usingstream and lambda expression, It can be very simple and efficient to process "AND" logic- all conditions must be matched.

@Joinpublic class AndMatchStrategy extends AbstractMatchStrategy implements MatchStrategy {
    @Override    public Boolean match(final List<ConditionData> conditionDataList, final ServerWebExchange exchange) {        return conditionDataList                .stream()                .allMatch(condition -> PredicateJudgeFactory.judge(condition, buildRealData(condition, exchange)));    }}

The OrMatchStrategy similarly implements the "OR" logic- at least one is match.

MatchStrategyFactory#

This is the factory class of MatchStrategy,there are two methods, one is newInstance(), which will return the MatchStrategy implementation class instance cached by the SPI ExtensionLoader indexed by the key-value.

    public static MatchStrategy newInstance(final Integer strategy) {        String matchMode = MatchModeEnum.getMatchModeByCode(strategy);        return ExtensionLoader.getExtensionLoader(MatchStrategy.class).getJoin(matchMode);    }

the matchMode will be the name of strategy, the value will be "and" or "or". The MatchModeEnum defines the code and name of match strategy as follows.

AND(0, "and"), OR(1, "or");

Another method is match() method, which will invoke the match() method of implementation class.

    public static boolean match(final Integer strategy, final List<ConditionData> conditionDataList, final ServerWebExchange exchange) {        return newInstance(strategy).match(conditionDataList, exchange);    }

How it works#

AbstractShenyuPlugin is the base class of plugins in shenyu-plugin module. In this class two selection method are defined: filterSelector() and filterRule() , Both of them call the match() method of MatchStrategyFactory. The code of filterSelector() is shown as follows.

    private Boolean filterSelector(final SelectorData selector, final ServerWebExchange exchange) {        if (selector.getType() == SelectorTypeEnum.CUSTOM_FLOW.getCode()) {            if (CollectionUtils.isEmpty(selector.getConditionList())) {                return false;            }            return MatchStrategyFactory.match(selector.getMatchMode(), selector.getConditionList(), exchange);        }        return true;    }

In filterSelector() method, after validation of the SelectorData, calls the match method of MatchStrategyFactory, and then this factory class will invokes the match method of corresponding implementation class.

    private Boolean filterRule(final RuleData ruleData, final ServerWebExchange exchange) {        return ruleData.getEnabled() && MatchStrategyFactory.match(ruleData.getMatchMode(), ruleData.getConditionDataList(), exchange);    }

In filterRule() it is also calls the match() method of MatchStrategyFactory. Does it look particularly concise or even simple? In the code analysis of PredicteJudge , you can see more detail about parameter processing in shenyu-plugin.

Summary#

Due to the use of SPI mechanism of Apache Shen, the parameter selection module has the characteristic of loose coupling and extensibility. In terms of the combination of multiple conditions, MatchStrategy provides a good design. Although currently only two implementation classes are present, it can be easily develop more complex MatchStrategy rules in the future, such as "firstOf"-first condition must matched, or "mostOf"- most of the conditions must be matched, etc.

Interested readers can read the source code of 'shenyu-plugin' to learn more.

Code Analysis For Param-Mapping Plugin

· 5 min read
Kunshuai Zhu
Apache ShenYu Contributor

Before starting, you can refer to this article to start the gateway

Body#

Let's take a look at the structure of this plugin first, as shown in the figure below.

param-mapping-structure

Guess: handler is used for data synchronization; strategy may be adapted to various request bodies, which should be the focus of this plugin; ParamMappingPlugin should be the implementation of ShenyuPlugin.

First, take a look at the ParamMappingPlugin, the focus is on the override of the doExecute method.

public Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {    ... // judge whether paramMappingHandle is null    // Determine the request body type according to the contentType in the header line    HttpHeaders headers = exchange.getRequest().getHeaders();    MediaType contentType = headers.getContentType();    // *    return match(contentType).apply(exchange, chain, paramMappingHandle);}
  • The match method returns the corresponding Operator according to contentType

    private Operator match(final MediaType mediaType) {    if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {        return operatorMap.get(MediaType.APPLICATION_JSON.toString());    } else if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {        return operatorMap.get(MediaType.APPLICATION_FORM_URLENCODED.toString());    } else {        return operatorMap.get(Constants.DEFAULT);    }}

    As can be seen from the code of the match method, there are currently three types of DefaultOperator, FormDataOperator, and JsonOperator, which support the request body in two formats: x-www-form-urlencoded and json.

So let's take a look at what the above three operators are like.

1. DefaultOperator#

Nothing happens, its apply method just continues to execute the plug-in chain, and has no real function. When the request body does not match the Operator, it will be skipped by DefaultOperator.

2. FormDataOperator#

This class is used to process the request body in the format of x-www-form-urlencoded.

Mainly depends on the apply method, but it looks a bit strange.

public Mono<Void> apply(final ServerWebExchange exchange, final ShenyuPluginChain shenyuPluginChain, final ParamMappingHandle paramMappingHandle) {    return exchange.getFormData()            .switchIfEmpty(Mono.defer(() -> Mono.just(new LinkedMultiValueMap<>())))            .flatMap(multiValueMap -> {                ...            });}

The code in the ellipsis is the processing of the request body, as follows.

// judge whether it is emptyif (Objects.isNull(multiValueMap) || multiValueMap.isEmpty()) {    return shenyuPluginChain.execute(exchange);}// convert form-data to jsonString original = GsonUtils.getInstance().toJson(multiValueMap);LOG.info("get from data success data:{}", original);// *modify request body*String modify = operation(original, paramMappingHandle);if (StringUtils.isEmpty(modify)) {    return shenyuPluginChain.execute(exchange);}...// Convert the modified json into LinkedMultiValueMap. Pay attention to this line, it will be mentioned later!LinkedMultiValueMap<String, String> modifyMap = GsonUtils.getInstance().toLinkedMultiValueMap(modify);...final BodyInserter bodyInserter = BodyInserters.fromValue(modifyMap);...// modify the request body in the exchange, and then continue to execute the plugin chainreturn bodyInserter.insert(cachedBodyOutputMessage, new BodyInserterContext())        .then(Mono.defer(() -> shenyuPluginChain.execute(exchange.mutate()                .request(new ModifyServerHttpRequestDecorator(httpHeaders, exchange.getRequest(), cachedBodyOutputMessage))                .build())        )).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> release(cachedBodyOutputMessage, throwable));

PS: The omitted part is to set the request first and other operations.

The more important thing above should be the modification request body of the star, that is, the call of the operation method. Here, because of the parameter type, the default method of the Operator interface will be called first (instead of being overridden by the FormDataOperator).

default String operation(final String jsonValue, final ParamMappingHandle paramMappingHandle) {    DocumentContext context = JsonPath.parse(jsonValue);    // call the override operation method and add addParameterKey    operation(context, paramMappingHandle);    // replace the related replacedParameterKey    if (!CollectionUtils.isEmpty(paramMappingHandle.getReplaceParameterKeys())) {        paramMappingHandle.getReplaceParameterKeys().forEach(info -> {            context.renameKey(info.getPath(), info.getKey(), info.getValue());        });    }    // Delete the related removeParameterKey    if (!CollectionUtils.isEmpty(paramMappingHandle.getRemoveParameterKeys())) {        paramMappingHandle.getRemoveParameterKeys().forEach(info -> {            context.delete(info);        });    }    return context.jsonString();}

After sorting it out, we can find that the json tool JsonPath imported here makes the processing of the request body much simpler and clearer.

In addition, we can notice that the FormDataOperator overrides the operation(DocumentContext, ParamMappingHandle) method.

Why override it? There is a default method for handling addParameterKey in the interface.

// Default method in Operator interfacedefault void operation(final DocumentContext context, final ParamMappingHandle paramMappingHandle) {    if (!CollectionUtils.isEmpty(paramMappingHandle.getAddParameterKeys())) {        paramMappingHandle.getAddParameterKeys().forEach(info -> {            context.put(info.getPath(), info.getKey(), info.getValue()); //不同之处        });    }}
// method overridden by FormDataOperator@Overridepublic void operation(final DocumentContext context, final ParamMappingHandle paramMappingHandle) {    if (!CollectionUtils.isEmpty(paramMappingHandle.getAddParameterKeys())) {        paramMappingHandle.getAddParameterKeys().forEach(info -> {            context.put(info.getPath(), info.getKey(), Arrays.asList(info.getValue()));        });    }}

In fact, there is such a line in FormDataOperator#apply (mentioned earlier): LinkedMultiValueMap<String, String> modifyMap = GsonUtils.getInstance().toLinkedMultiValueMap(modify);

This line converts the modified json into LinkedMultiValueMap, GsonUtils#toLinkedMultiValueMap is as follows.

public LinkedMultiValueMap<String, String> toLinkedMultiValueMap(final String json) {    return GSON.fromJson(json, new TypeToken<LinkedMultiValueMap<String, String>>() {    }.getType());}

The attribute targetMap in the LinkedMultiValueMap class is defined as: private final Map<K, List<V>> targetMap

Therefore, the value in the json string must be in the form of a list, otherwise Gson will throw a conversion error exception, which is why the FormDataOperator must override the operator method.

But why use LinkedMultiValueMap?

Go back to the first line exchange.getFormData of the FormDataOperator#apply method. In SpringMVC, the return value type of DefaultServerWebExchange#getFormData is Mono<MultiValueMap<String, String>>, and LinkedMultiValueMap is a subclass of MultiValueMap. And, the getFormData method is for the request body in the format of x-www-form-urlencoded.

param-mapping-getFormData

三、JsonOperator#

Obviously, this class is used to process the request body in Json format.

public Mono<Void> apply(final ServerWebExchange exchange, final ShenyuPluginChain shenyuPluginChain, final ParamMappingHandle paramMappingHandle) {    ServerRequest serverRequest = ServerRequest.create(exchange, MESSAGE_READERS);    Mono<String> mono = serverRequest.bodyToMono(String.class).switchIfEmpty(Mono.defer(() -> Mono.just(""))).flatMap(originalBody -> {        LOG.info("get body data success data:{}", originalBody);        // call the default operation method to modify the request body        String modify = operation(originalBody, paramMappingHandle);        return Mono.just(modify);    });    BodyInserter bodyInserter = BodyInserters.fromPublisher(mono, String.class);    ... //process the header line    CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);    // modify the request body in the exchange, and then continue to execute the plugin chain    return bodyInserter.insert(outputMessage, new BodyInserterContext())            .then(Mono.defer(() -> {                ServerHttpRequestDecorator decorator = new ModifyServerHttpRequestDecorator(headers, exchange.getRequest(), outputMessage);                return shenyuPluginChain.execute(exchange.mutate().request(decorator).build());            })).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> release(outputMessage, throwable));}

The processing flow of JsonOperator is roughly similar to that of FormDataOperator.

Conclusion#

Finally, use a picture to briefly summarize.

param-mapping-summary