Skip to main content

4 posts tagged with "spi"

View All Tags

LoadBalance SPI Source Code Analysis

· 13 min read
Apache ShenYu Contributor

Gateway applications need to support a variety of load balancing strategies, including random,Hashing, RoundRobin and so on. In Apache Shenyu gateway, it not only realizes such traditional algorithms, but also makes smoother traffic processing for the entry of server nodes through detailed processing such as traffic warm-up, so as to obtain better overall stability. In this article, let's walk through how Apache Shenyu is designed and implemented this part of the function.

This article based on shenyu-2.4.0 version of the source code analysis.


LoadBalance SPI#

The implementation of LoadBalance is in shenyu-plugin-divide module. It has based on its SPI creation mechanism. The core interface code is shown as follows. This interface well explains the concept: load balancing is to select the most appropriate node from a series of server nodes. Routing, traffic processing and load balancing is the basic function of LoadBalance SPI.

@SPIpublic interface LoadBalance {
    /**     * @param upstreamList upstream list     * @param ip ip     * @return divide upstream     */    DivideUpstream select(List<DivideUpstream> upstreamList, String ip);}

Where upstreamList represents the server nodes list available for routing. DivideUpstream is the data structure of server node, the important elements including protocol, upstreamUrl , weight, timestamp, warmup.

public class DivideUpstream implements Serializable {    private String upstreamHost;    /**     * this is http protocol.     */    private String protocol;    private String upstreamUrl;    private int weight;    /**     * false close/ true open.     */    @Builder.Default    private boolean status = true;    private long timestamp;    private int warmup;}

Design of LoadBalance module#

The class diagram of LoadBalance moduleisshown as follows.


We can draw the outline of LoadBalance module from the class diagram:

  1. The abstract class AbstractLoadBalance implements the SPI LoadBalance interface,and supplies the template methods for selection related, such as select(), selector(),and gives the calculation of weight.

  2. Three implementation classes which inherit AbstractLoadBalance to realize their own logic:

    • RandomLoadBalance - Weight Random
    • HashLoadBalance - Consistent Hashing
    • RoundRobinLoadBalance -Weight Round Robin per-packet
  3. The utility class LoadBalanceUtil provides public static method to be called.

    The implementation classes and algorithms are configurable. According to its specification, by adding profile in SHENYU_DIERECTORY directory, the data in profile should be key=value-class format, where the value-class will be load by the Apache Shenyu SPI class loader, and key value should be an name defined in LoadBalanceEnum.


The code of LoadBalanceEnum is as follows:

public enum LoadBalanceEnum {    /**     * Hash load balance enum.     */    HASH(1, "hash", true),
    /**     * Random load balance enum.     */    RANDOM(2, "random", true),
    /**     * Round robin load balance enum.     */    ROUND_ROBIN(3, "roundRobin", true);
    private final int code;    private final String name;    private final boolean support;}


This abstract class implements the LoadBalance interface and define the abstract method doSelect() to be processed by the implementation classes. In the template method select(), It will do validation first then call the doSelect() method.

   /**      * Do select divide upstream.     *     * @param upstreamList the upstream list     * @param ip           the ip     * @return the divide upstream     */    protected abstract DivideUpstream doSelect(List<DivideUpstream> upstreamList, String ip);
    @Override    public DivideUpstream select(final List<DivideUpstream> upstreamList, final String ip) {        if (CollectionUtils.isEmpty(upstreamList)) {            return null;        }        if (upstreamList.size() == 1) {            return upstreamList.get(0);        }        return doSelect(upstreamList, ip);    }

When the timestamp of server node is not null, and the interval between current time and timestamp is within the traffic warm-up time, the formula for weight calculation is. $$ {1-1} ww = min(1,uptime/(warmup/weight)) $$ It can be seen from the formula that the final weight(ww) is proportional to the original-weight value. The closer the time interval is to the warmup time, the greater the final ww. That is, the longer the waiting time of the request, the higher the final weight. When there is no timestamp or other conditions, the ww is equal to the weight value of DivideUpstream object.

The central of thinking about warm-upis to avoid bad performance when adding new server and the new JVMs starting up.

Let's see how the load balancing with Random, Hashing and RoundRobin strategy is implemented.


The RandomLoadBalance can handle two situations:

  1. Each node without weight, or every node has the same weight, randomly choose one.
  2. Server Nodes with different weight, choose one randomly by weight.

Following is the random() method of RandomLoadBalance. When traversing server node list, if the randomly generated value is less than the weight of node, then the current node will be chosen. If after one round traversing, there's is no server node match, then it will return the first item of the list. The getWeight(DivideUpstream upstream) is defined in AbstractLoadBalance class.

    private DivideUpstream random(final int totalWeight, final List<DivideUpstream> upstreamList) {        // If the weights are not the same and the weights are greater than 0, then random by the total number of weights        int offset = RANDOM.nextInt(totalWeight);        // Determine which segment the random value falls on        for (DivideUpstream divideUpstream : upstreamList) {            offset -= getWeight(divideUpstream);            if (offset < 0) {                return divideUpstream;            }        }        return upstreamList.get(0);    }


In HashLoadBalance, it takes the advantages of consistent hashing , that maps both the input traffic and the servers to a unit circle, or name as hash ring. For the requestedip address, with its hash value to find the node closest in clockwise order as the node to be routed. Let's see how consistent hashing is implemented in HashLoadBalance.

As to the hash algorithms, HashLoadBalance uses MD5 hash, which has the advantage of mixing the input in an unpredictable but deterministic way. The output is a 32-bit integer. the code is shown as follows:

private static long hash(final String key) {    // md5 byte    MessageDigest md5;    try {        md5 = MessageDigest.getInstance("MD5");    } catch (NoSuchAlgorithmException e) {        throw new ShenyuException("MD5 not supported", e);    }    md5.reset();    byte[] keyBytes;    keyBytes = key.getBytes(StandardCharsets.UTF_8);    md5.update(keyBytes);    byte[] digest = md5.digest();    // hash code, Truncate to 32-bits    long hashCode = (long) (digest[3] & 0xFF) << 24            | ((long) (digest[2] & 0xFF) << 16)            | ((long) (digest[1] & 0xFF) << 8)            | (digest[0] & 0xFF);    return hashCode & 0xffffffffL;}

Importantly, how to generate the hash ring and avoid skewness? Let's thedoSelect() method inHashLoadBalance as follows:

    private static final int VIRTUAL_NODE_NUM = 5;
    @Override    public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {        final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();        for (DivideUpstream address : upstreamList) {            for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {                long addressHash = hash("SOUL-" + address.getUpstreamUrl() + "-HASH-" + i);                treeMap.put(addressHash, address);            }        }        long hash = hash(String.valueOf(ip));        SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);        if (!lastRing.isEmpty()) {            return lastRing.get(lastRing.firstKey());        }        return treeMap.firstEntry().getValue();    }

In this method, duplicated labels are used which are called "virtual nodes" (i.e. 5 virtual nodes point to a single "real" server). It will make the distribution in hash ring more evenly, and reduce the occurrence of data skewness.

In order to rescue the data sorted in the hash ring, and can be accessed quickly, we use ConcurrentSkipListMap of Java to store the server node lists ( with virtual nodes) and its hash value as key. This class a member of Java Collections Framework, providing expected average log(n) time cost for retrieve and access operations safely execute concurrent by multiple threads.

Furthermore, the method tailMap(K fromKey) of ConcurrentSkipListMap can return a view of portion of the map whose keys are greater or equal to the fromKey, and not need to navigate the whole map.

In the above code section, after the hash ring is generated, it uses tailMap(K fromKey) of ConcurrentSkipListMap to find the subset that the elements greater, or equal to the hash value of the requested ip, its first element is just the node to be routed. With the suitable data structure, the code looks particularly clear and concise.

Consistent hashing resolved the poor scalability of the traditional hashing by modular operation.


The original Round-robin selection is to select server nodes one by one from the candidate list. Whenever some nodes has crash ( ex, cannot be connected after 1 minute), it will be removed from the candidate list, and do not attend the next round, until the server node is recovered and it will be add to the candidate list again. In RoundRobinLoadBalance,the weight Round Robin per-packet schema is implemented.

In order to work in concurrent system, it provides an inner static class WeigthRoundRobin to store and calculate the rolling selections of each server node. Following is the main section of this class( removed remark )

protected static class WeightedRoundRobin {
    private int weight;    private final AtomicLong current = new AtomicLong(0);
    private long lastUpdate;
    void setWeight(final int weight) {        this.weight = weight;        current.set(0);    }    long increaseCurrent() {        return current.addAndGet(weight);    }
    void sel(final int total) {        current.addAndGet(-1 * total);    }    void setLastUpdate(final long lastUpdate) {        this.lastUpdate = lastUpdate;    }}

Please focus on the these method:

  • setWeight(final int weight) : set the current value by weight

  • increaseCurrent(): Increment the current value by weight, and current set to 0.

  • sel(final int total): decrement the current value by total

    Let's see how the weight factor being used in this round-robin selection?

    First it defines a two-level ConcurrentMap type variable named as methodWeightMap , to cache the server node lists and the rolling selection data about each server node.

private final ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<>(16);

In this map, the key of first level is set to upstreamUrl of first element in server node list. The type of second object is ConcurrentMap<String, WeightedRoundRobin>, the key of this inner Map is the value upstreamUrlvariable of each server node in this server list, the value object is WeightedRoundRobin, used to trace the rolling selection data about each server node. As to the implementation class for the Map object, we use ConcurrentHashMap of JUC, a hash table supporting full concurrency of retrievals and high expected concurrency for updates.

In the second level of the map, the embedded static class - WeighedRoundRobin of each node is thread-safe, implementing the weighted RoundRobin per bucket. The following is the code of the doselect() method of this class.

@Overridepublic DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {    String key = upstreamList.get(0).getUpstreamUrl();    ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);    if (map == null) {        methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));        map = methodWeightMap.get(key);    }    int totalWeight = 0;    long maxCurrent = Long.MIN_VALUE;    long now = System.currentTimeMillis();    DivideUpstream selectedInvoker = null;    WeightedRoundRobin selectedWRR = null;    for (DivideUpstream upstream : upstreamList) {        String rKey = upstream.getUpstreamUrl();        WeightedRoundRobin weightedRoundRobin = map.get(rKey);        int weight = getWeight(upstream);        if (weightedRoundRobin == null) {            weightedRoundRobin = new WeightedRoundRobin();            weightedRoundRobin.setWeight(weight);            map.putIfAbsent(rKey, weightedRoundRobin);        }        if (weight != weightedRoundRobin.getWeight()) {            //weight changed            weightedRoundRobin.setWeight(weight);        }        long cur = weightedRoundRobin.increaseCurrent();        weightedRoundRobin.setLastUpdate(now);        if (cur > maxCurrent) {            maxCurrent = cur;            selectedInvoker = upstream;            selectedWRR = weightedRoundRobin;        }        totalWeight += weight;    }    ......  //erase the section which handles the time-out upstreams.     if (selectedInvoker != null) {        selectedWRR.sel(totalWeight);        return selectedInvoker;    }    // should not happen here    return upstreamList.get(0);}

For example we assume upstreamUrl values of three server nodes is: LIST = [upstream-20, upstream-50, upstream-30]. After a round of execution, the data in newly created methodWeightMap is as follows:


For the above example LIST, assumes the weight array is [20,50,30]. the following figure shows the value change and polling selection process of the current array in WeighedRoundRobin object.


In each round, it will choose the server node with max current value.

  • Round1:
    • Traverse the server node list, initialize the weightedRoundRobin instance of each server node or update the weight value of server nodes object DivideUpstream
    • say, in this case, after traverse, the current array of the node list changes to [20, 50,30],so according to rule, the node Stream-50 would be chosen, and then the static object WeightedRoundRobin of Stream-50 executes sel(-total) , the current array is now [20,-50, 30].
  • Round 2: after traverse, the current array should be [40,0,60], so the Stream-30 node would be chosen, current array is now [40,0,-40].
  • Round 3: after traverse, current array changes to [60,50,-10], Stream-20 would be chosen,and current array is now [-40,50,-10].

When there is any inconsistence or some server crashed, for example, the lists size does not match with the elements in map, it would copy and modify the element with lock mechanism, and remove the timeout server node, the data in Map updated. Following is the fault tolerance code segment.

    if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {        try {            // copy -> modify -> update reference            ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);            newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);            methodWeightMap.put(key, newMap);        } finally {            updateLock.set(false);        }    }
    if (selectedInvoker != null) {        selectedWRR.sel(totalWeight);        return selectedInvoker;    }    // should not happen here    return upstreamList.get(0);


In this class, a static method calling LoadBalance is provided, whereExtensionLoader is the entry point of Apache Shenyu SPI. That is to say, LoadBalance module is configurable and extensible. The algorithm variable in this static method is the name enumeration type defined in LoadBalanceEnum.

    /**     * Selector divide upstream.     *     * @param upstreamList the upstream list     * @param algorithm    the loadBalance algorithm     * @param ip           the ip     * @return the divide upstream     */    public static DivideUpstream selector(final List<DivideUpstream> upstreamList, final String algorithm, final String ip) {        LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getJoin(algorithm);        return, ip);    }

Using of LoadBalance module#

In the above section, we describe the LoadBalance SPI and three implementation classes. Let's take a look at how the LoadBalance to be used in Apache Shenyu. DividePlugin is a plugin in Apache Shenyu responsible for routing http request. when enable to use this plugin, it will transfer traffic according to selection data and rule data, and deliver to next plugin downstream.

@SneakyThrows@Overrideprotected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {   ......}

The type of second parameter of doExecute() is ShenyuPluginChain, which represents the execution chain of plugins. For details, see the mechanism of Apache Shenyu Plugins. The third one is SelectorData type, and the fourth is RuleData type working as the rule data.

In doExecute() of DividePlugin, first verify the size of header, content length, etc, then preparing for load balancing.

Following is a code fragment usingLoadBalance in the doExecute() method:

   // find the routing server node list   List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());    ...     // the requested ip    String ip =   Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
    //calling the Utility class and invoke the LoadBalance processing.    DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);

In the above code, the output ofruleHandle.getLoadBalance() is the name variable defined in LoadBalanceEnum, that is random, hash, roundRobin, etc. It is very convenient to use LoadBalance by LoadBalanceUtils. When adding more LoadBalance implementing classes, the interface in plugin module will not be affect at all.


After reading through the code of LoadBalance module, from the design perspective, it is concluded that this module has the following characteristics:

  1. Extensibility: Interface oriented design and implemented on Apache Shenyu SPI mechanism, it can be easily extended to other dynamic load balancing algorithms (for example, least connection, fastest mode, etc), and supports cluster processing.
  2. Scalability: Every load balancing implementation, weighted Random, consistency Hashing and weighted RoundRobin can well support increase or decrease cluster overall capacity.
  3. More detailed design such as warm-up can bring better performance and obtain better overall stability.

MatchStrategy -- analyze the design based on SPI

· 5 min read
Apache ShenYu Contributor

In most of the plugins ( such as Dubbo, gRPC,Spring-cloud, etc) of Apache Shenyu, the routingparameters are designed to support the combination of multiple conditions. In order to realize such requirements, the parameters and behaviors are abstracted to three parts according to its SPI mechanism, and implemented in shenyu-plugin-base module.

  • ParameterData-parameters
  • PredictJudge-predicate
  • MatchStrategy-matching strategy

Relatively speaking, the MatchStrategy is the part that needs the least extension points. For the combination judgement of multiple conditions, the common select rules are: All conditions are matched, at least one is match, at least the first is met, or most of conditions satisfied. As well need to handle various type of parameters, for example: IP, header, uri, etc. How to make the MatchStrategy to be simple to use and extensible?


The implementation of MatchStrategy is in shenyu-plugin-base module. It has based on the SPI creation mechanism, and used factory pattern and strategy design pattern. The class diagram of MatchStrategy is showed as follows.


Base on the interface MatchStrategy to design the implementation classes, and the abstract class AbstractMatchStrategy supplies common method, while the factory class MatchStrategyFactory provides creation functions.

MatchStrategy Interface#

First, let's look at the MatchStrategy SPI interface

@SPIpublic interface MatchStrategy {
    Boolean match(List<ConditionData> conditionDataList, ServerWebExchange exchange);}

The annotation @SPI means that this is an SPI interface. Where ServerWebExchange is org.springframework.web.server.ServerWebExchange, represents the request-response interactive content of of HTTP. Following is the code of ConditionData, the more detail about this class can refer to code analysis of PredicteJudge

public class ConditionData {
    private String paramType;    private String operator;
    private String paramName;    private String paramValue;}


Second, let's look at the abstract class AbstractMatchStrategy,it has defined a buildRealData method,in this method wrapped various parameters to a unified interface through the functionality of ParameterDataFactory, which is the factory class of ParameterData. There supports a variety types of parameters , such as Ip, Cookie, Header,uri, etc. Modifications of such parameters will not impact the calling of matching rules of MatchStrategy.

public abstract class AbstractMatchStrategy {
    public String buildRealData(final ConditionData condition, final ServerWebExchange exchange) {        return ParameterDataFactory.builderData(condition.getParamType(), condition.getParamName(), exchange);    }}

Implementation class and profile#

Then, let's look at the two implementation class based on the above interface in shenyu-plugin-base module , that is:

  • AndMatchStrategy- AND -All conditions are matched

  • OrMatchStrategy- OR -at least one is match

    The profile is shown as follows, which locates at the SHENYU_DIRECTORYdirectory. When starting up, the top-level SPI classes will read the key-value and load the classes and cache them.


These two implementation classes inherit AbstractMatchStrategy and implement MatchStrategy.

AndMatchStrategy- “AND” relation#

Because PredicateJudge encapsulates the diversity of Predicate , and ConditionData and ParameData can present variety of parameters, for treating of multiple conditions, usingstream and lambda expression, It can be very simple and efficient to process "AND" logic- all conditions must be matched.

@Joinpublic class AndMatchStrategy extends AbstractMatchStrategy implements MatchStrategy {
    @Override    public Boolean match(final List<ConditionData> conditionDataList, final ServerWebExchange exchange) {        return conditionDataList                .stream()                .allMatch(condition -> PredicateJudgeFactory.judge(condition, buildRealData(condition, exchange)));    }}

The OrMatchStrategy similarly implements the "OR" logic- at least one is match.


This is the factory class of MatchStrategy,there are two methods, one is newInstance(), which will return the MatchStrategy implementation class instance cached by the SPI ExtensionLoader indexed by the key-value.

    public static MatchStrategy newInstance(final Integer strategy) {        String matchMode = MatchModeEnum.getMatchModeByCode(strategy);        return ExtensionLoader.getExtensionLoader(MatchStrategy.class).getJoin(matchMode);    }

the matchMode will be the name of strategy, the value will be "and" or "or". The MatchModeEnum defines the code and name of match strategy as follows.

AND(0, "and"), OR(1, "or");

Another method is match() method, which will invoke the match() method of implementation class.

    public static boolean match(final Integer strategy, final List<ConditionData> conditionDataList, final ServerWebExchange exchange) {        return newInstance(strategy).match(conditionDataList, exchange);    }

How it works#

AbstractShenyuPlugin is the base class of plugins in shenyu-plugin module. In this class two selection method are defined: filterSelector() and filterRule() , Both of them call the match() method of MatchStrategyFactory. The code of filterSelector() is shown as follows.

    private Boolean filterSelector(final SelectorData selector, final ServerWebExchange exchange) {        if (selector.getType() == SelectorTypeEnum.CUSTOM_FLOW.getCode()) {            if (CollectionUtils.isEmpty(selector.getConditionList())) {                return false;            }            return MatchStrategyFactory.match(selector.getMatchMode(), selector.getConditionList(), exchange);        }        return true;    }

In filterSelector() method, after validation of the SelectorData, calls the match method of MatchStrategyFactory, and then this factory class will invokes the match method of corresponding implementation class.

    private Boolean filterRule(final RuleData ruleData, final ServerWebExchange exchange) {        return ruleData.getEnabled() && MatchStrategyFactory.match(ruleData.getMatchMode(), ruleData.getConditionDataList(), exchange);    }

In filterRule() it is also calls the match() method of MatchStrategyFactory. Does it look particularly concise or even simple? In the code analysis of PredicteJudge , you can see more detail about parameter processing in shenyu-plugin.


Due to the use of SPI mechanism of Apache Shen, the parameter selection module has the characteristic of loose coupling and extensibility. In terms of the combination of multiple conditions, MatchStrategy provides a good design. Although currently only two implementation classes are present, it can be easily develop more complex MatchStrategy rules in the future, such as "firstOf"-first condition must matched, or "mostOf"- most of the conditions must be matched, etc.

Interested readers can read the source code of 'shenyu-plugin' to learn more.

PredicateJudge -- analyze the design based on SPI

· 6 min read
Apache ShenYu Contributor

Apache Shenyu has been identified as a gateway application which supports a variety of protocols and microservice frameworks such as Dubbo, gRPC, Spring-Cloud, etc. To do this, the product has accomplished an elegant SPI (Service Provider Interface) as its foundation, and make the Rule data parsing and predicting program very simple , resiliency and security. As to rule data parsing processing, the SPI design increases the product's scalability. When appending new plugin, in most cases, the existing module is enough for rule data parsing , otherwise it can be rapidly carry out with tiny effort.

Top level design of SPI#

In Apache Shenyu, the SPI archtecure is defined in shenyu-spi module and composed of three parts: SPI interface, factory design pattern, and configuration file. There is two interface defined as annotation: @SPI and @Join. When class file with @Join annotation, it means that it will join as an SPI extension class, in other words, it is an application or registration. The @SPI denotes that the class is an SPI extension class.

Fig 1 classes in the shenyu-spi


The SPI configuration directory is META-INF/shenyu/. that is specified:


When starting the gateway system , the ExtensionLoader will scan the profiles under SHENYU_DIRECTORY, in turn, load and validate and then initialize each configed class. The configuration file uses "Key = class-file" format. During operation of the system, the corresponding SPI implementation class will be invoked through the factory mechanism.

Implementation of shenyu-plugin SPI#

In shenyu-plugin module, various plugins for HTTP routing are implemented according to the plugin mechanism, including request, redirect, response and rewrite, etc. Plugins for microservice frameworks such as Dubbo, gRPC , Spring-Cloud and Tars have been developed in the gateway product. And plugins are still increasing. If no such dependent module fo parsing and judge routing parameters and data, each plugin is necessary to implement the parsing functions, and has to frequently modify to support their matching rules, such as wildcard, regular expression, SpEL expression, etc. Therefore, they made a high level abstraction for routing parameter data following the SPI framework in shenyu-plugin module. The rule analysis consists of three parts:

  • ParameterData- parameter data

  • PredicatJudge- predicate whether the actural data match the rule

  • MatchStrategy- combine multiple conditions, the final used strategy

These implementation classes are defined in shenyu-plugin-base module. In each plugin, resolution and predication of the routing parameter can be realized through AbstractShenyuPlugin using the above SPIs. That is dedicated and easy to extend, in line with SOLID principle.

​ This section analyzes the PredictJudge in detail. You can find the dependency to shenyu-spi in the pom.xml of this module.

<dependency>    <groupId>org.apache.shenyu</groupId>    <artifactId>shenyu-spi</artifactId>    <version>${project.version}</version></dependency>

Design of PredicateJudge SPI#

PredicateJudge SPI is used to analyze and judge various routing rules configed in Apache Shenyu gateway. The name and functions of this SPI are similar to Predicate in Java, but the acceptance behavior is further abstracted applying for routing aspect. This SPI is implemented through the Factory pattern. Let's look at the Predictejudge SPI interface:

@SPI@FunctionalInterfacepublic interface PredicateJudge {
    /**     * judge conditionData and realData is match.     *     * @param conditionData {@linkplain ConditionData}     * @param realData       realData     * @return true is pass  false is not pass.     */    Boolean judge(ConditionData conditionData, String realData);}

The class diagram is as follows:

Fig 2-Predicate class diagram


The important methods of PredicateJudgeFactory are shown as follows:

Whenever need to parsing and matching routing data, you can use

    public static PredicateJudge newInstance(final String operator) {        return ExtensionLoader.getExtensionLoader(PredicateJudge.class).getJoin(processSpecialOperator(operator));    }
    public static Boolean judge(final ConditionData conditionData, final String realData) {        if (Objects.isNull(conditionData) || StringUtils.isBlank(realData)) {            return false;        }        return newInstance(conditionData.getOperator()).judge(conditionData, realData);    }

ConditionData contains of four attributes of String type: paramType, operator,paramName,paramValue


Where paramType must be the enumeration type ParamTypeEnum. The default supported paramType are:

post, uri,query, host, ip,header, cookie,req_method


operator must be the enumeration type OperatorEnum, currently supported operators are:

   match, =,regex, >,<, contains, SpEL,  Groovy, TimeBefore,TimeAfter

Base on the above defination , the plugin module provides the following eight PredicateJudge implemetation classes to realize the logic of these operators respectively.

Implementation classLogic descriptioncorespondece operator
ContainsPredicateJudge"contain" relation, the actual data needs contain the specified stringcontains
EqualsPredicateJudgeequals "="=
MatchPredicateJudgeused for URI context path matchingmatch
TimerAfterPredicateJudgeWhether the local time is after the specified timeTimeBefore
TimerBeforePredicateJudgeWhether the local time is before the specified timeTimeAfter
GroovyPredicateJudgeused Groovy syntax toe set ParamName and value dataGroovy
RegexPredicateJudgeused Regex to matchregex

How to use PredicateJudge#

When you want to parse parameters, you only need to call PredicateJudgeFactory as follows.

PredicateJudgeFactory.judge(final ConditionData conditionData, final String realData);

SPI profile#

The implementation class is configed in the file under directory SHENYU_DIRECTORY . It will be loaded and cached at startup.


The usage of PredicateJudge SPI in Shenyu gateway#

Most plugins in Apache Shenyu are inherited from AbstractShenyuPlugin. In this abstract class, the filter functions (selection and matching) are achieved through MatchStrategy SPI, and PredicateJudge will be invoked from MatchStrategy to predicate each condition data.

Fig 3- class diagram of plugins with PredicateJudge and MatchStrategy SPI


The process from client request calling the routing parsing moodule is showed as following chart.

Fig 4- flow chart for Shenyu gateway filter with parameter processing


  • When startup, the system will load SPI classes from profile and cache them.
  • When the client sends a new request to the Shenyu gateway, will call the corresponding plugin within the gateway.
  • When analyzing real data with routing rules, the PredicateJudge implementation class will be invoked according to the contained operator.


Examples of PredicateJudge judgement#

ContainsPredicateJudge- " contains“ rule#

For example, giving a ConditionData with: paramType="uri", paramValue 是 "/http/**", when using the "contains" relation: ContainsPredicateJudge, the matching result is as follows.

ConditionData (operator="contains")real datajudge result
paramType="uri", "/http/**""/http/**/test"true

About other PredicateJudge implemetantion classes, you can refer to the code and test classes.

RateLimiter SPI code analysis

· 16 min read
Apache ShenYu Contributor

Rate limiter is a very important integral of gateway application, to deal with high traffic. When the system is attacked abnormally by a large number of traffic gathered in a short time; When there are a large number of lower priority request need to be slow down or else it will effect your high priority transactions; Or sometimes your system can not afford the regular traffic; in these scenarios, we need to start rate limiter component to protect our system, through rejection, wait, load shedding,etc, limit the requests to an acceptable quantities, or only certain domains (or services) requests can get through.

Facing above scenarios, following need to be considered when designing the rate limiter component of an gateway.

  1. Supports a variety of rate limiter algorithms and easy to extends.
  2. Resilient resolvers which can distinguish traffic by different way, such as ip, url, even user group etc.
  3. High availability, can quickly get allow or reject result from rate limiter
  4. With fault tolerance against when rate limiter is down, the gateway can continue work.

This article will first introduce the overall architecture of the rate limiter module in Apache Shenyu, and then focus on the code analysis of rate limiter SPI.

This article based on shenyu-2.4.0 version of the source code analysis.

Overall design of RateLimiter#

Spring WebFlux is reactive and non-blocking web framework, which can benefit throughput and make applications more resilient. The plugin of Apache Shenyu is based on WebFlux,its rate limiter component is implemented in ratelimiter-plugin. In rate limiter process, the commonly used algorithms are token bucket, leaky bucket, etc. To speed up concurrency performance, the counting and calculation logic is treated in Redis, and Java code is responsible for the transmission of parameters. When applying Redis, the Lua script can be resident memory, and be executed as a whole, so it is atomic. Let alone the reducing of network overhead. Redis commands abstraction and automatic serialization/deserialization with Redis store is provided in Spring Data Redis. Because of based on reactive framework, the Spring Redis Reactive is used in ratelimiter-plugin.

The class diagram of this plugin is as follows, highlighting two packages related to RateLimiter SPI: resolver 和algorithm.


Design of RateLimiter SPI#

High performance issue is achieved through the architecture of Spring data+ Redis+Lua , two SPI are supplied in ratelimiter-plugin for the extension of algorithm and key resolver。

  • RateLimiterAlgorithm:used for algorithms expansion.
  • RateLimiterKeyResolver: used for resolver expansion, to distinguish requests by various information, including ip, url, ect.

The profile of SPI is located at directory of SHENYU_DIRECTORY (default/META-INF/shenyu).


Obtain the critical info of the request used for packet rate limiter,the interface of RateLimiterKeyResolver is follows:

@SPIpublic interface RateLimiterKeyResolver {
    /**     * get Key resolver's name.     *     * @return Key resolver's name     */    String getKeyResolverName();
    /**     * resolve.     *     * @param exchange exchange the current server exchange {@linkplain ServerWebExchange}     * @return rate limiter key     */    String resolve(ServerWebExchange exchange);}

@SPI registers the current interface as Apache Shenyu SPI. Method resolve(ServerWebExchange exchange) is used to provide the resolution way. Currently there are two key resolvers in RateLimiterKeyResolver SPI:WholeKeyResolve and RemoteAddrKeyResolver. The resolve method of RemoteAddrKeyResolveris as follows:

    @Override    public String resolve(final ServerWebExchange exchange) {        return Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();    }

Where the resolved key is ip of request. Based on SPI mechanism and its factory pattern, new resolver can be easily developed.

RateLimiterAlgorithm SPI#

RateLimiterAlgorithm SPI is used to identify and define different rate limiter algorithms, following is the class diagram of this module.


In this module, factory pattern is used , providing interface, abstract class and factory class, and four implementation classes. The lua script corresponding to the implementation class is enumerated in RateLimitEnum and located in /META-INF/scripts.

@SPIpublic interface RateLimiterAlgorithm<T> {        RedisScript<T> getScript();    List<String> getKeys(String id);        /**     * Callback string.     *     * @param script the script     * @param keys the keys     * @param scriptArgs the script args     */    default void callback(final RedisScript<?> script, final List<String> keys, final List<String> scriptArgs) {    }}

@SPI registers the current interface as Apache Shenyu SPI. There are three methods:

  • getScript() returns a RedisScript object, which will be passed to Redis.
  • getKeys(String id) returns a List contains with keys.
  • callback() the callback function which will be executed asynchronously later on, and default is an empty method.


The template method is implemented in this abstract class, and the reified generics used is List<Long>. Two abstract methods getScriptName() and getKeyName() are left for the implementation class. Following is the code to load lua script.

    public RedisScript<List<Long>> getScript() {        if (!this.initialized.get()) {            DefaultRedisScript redisScript = new DefaultRedisScript<>();            String scriptPath = "/META-INF/scripts/" + getScriptName();            redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(scriptPath)));            redisScript.setResultType(List.class);            this.script = redisScript;            initialized.compareAndSet(false, true);            return redisScript;        }        return script;    }

initialized is an AtomicBoolean type variable used to indicate whether the lua script is loaded. If has not been loaded, the system will read specified scripts form META-INF/scripts; After reading, specify the result with List type, and set initialized=true, then returning RedisScriptobject.

The code of getKeys() in AbstractRateLimiterAlgorithm is as follows:

    @Override    public List<String> getKeys(final String id) {        String prefix = getKeyName() + ".{" + id;        String tokenKey = prefix + "}.tokens";        String timestampKey = prefix + "}.timestamp";        return Arrays.asList(tokenKey, timestampKey);    }

Two strings are generated in this template method, where the tokenKey will work as Key to a Sorted map in Redis.

We can observe from above class diagram that ConcurrentRateLimiterAlgorithm and SlidingWindowRateLimiterAlgorithm override getKeys(String id) method but another two implementation classes not, and use template method in AbstractRateLimiterAlgorithm. Only in ConcurrentRateLimiterAlgorithm has override callback() method, the others not. We will do further analysis in the following.


The method getsRateLimiterAlgorithm instance by name in RateLimiterAlgorithmFactory is as follows:

public static RateLimiterAlgorithm<?> newInstance(final String name) {    return Optional.ofNullable(ExtensionLoader.getExtensionLoader(RateLimiterAlgorithm.class).getJoin(name)).orElse(new TokenBucketRateLimiterAlgorithm());}

ExtensionLoader of SPI is responsible for loading SPI classes by "name", if cannot find the specified algorithm class, it will return TokenBucketRateLimiterAlgorithm by default.

Data access with Redis#

Above detailed the extension interface in RateLimiter SPI. In Apache Shenyu, we use ReactiveRedisTemplate to perform Redis processing reactively, which is implemented inisAllowed() method of RedisRateLimiter class.

    public Mono<RateLimiterResponse> isAllowed(final String id, final RateLimiterHandle limiterHandle) {        // get parameters that will pass to redis from RateLimiterHandle Object        double replenishRate = limiterHandle.getReplenishRate();        double burstCapacity = limiterHandle.getBurstCapacity();        double requestCount = limiterHandle.getRequestCount();        // get the current used RateLimiterAlgorithm        RateLimiterAlgorithm<?> rateLimiterAlgorithm = RateLimiterAlgorithmFactory.newInstance(limiterHandle.getAlgorithmName());                ........        Flux<List<Long>> resultFlux = Singleton.INST.get(ReactiveRedisTemplate.class).execute(script, keys, scriptArgs);        return resultFlux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))                .reduce(new ArrayList<Long>(), (longs, l) -> {                    longs.addAll(l);                    return longs;                }).map(results -> {                    boolean allowed = results.get(0) == 1L;                    Long tokensLeft = results.get(1);                    return new RateLimiterResponse(allowed, tokensLeft);                })                .doOnError(throwable -> log.error("Error occurred while judging if user is allowed by RedisRateLimiter:{}", throwable.getMessage()))                .doFinally(signalType -> rateLimiterAlgorithm.callback(script, keys, scriptArgs));    }

The POJO class RateLimiterHandle wraps the parameters needed in rate limiter, they are algorithName, replenishRate, burstCapacity, requestCount, etc. First, gets the parameters that need to be passed into Redis from RateLimiterHandle class. Then obtain the current implementation class from RateLimiterAlgorithmFactory.

For convenience, we give an flow image to show the parameters I/O and execution procedure in Java and Redis respectively. On the left is the second half of isAllowed() , and on the right is the processing of Lua script.

Following is the execution process of the JAVA code.

  1. Get two keys value in List<String> type from the getKeys() method, the first element will map to a sorted set in Redis.

  2. Set four parameters, replenishRate , burstCapacity, timestamp (EpochSecond) and requestcount.

  3. Calling ReactiveRedisTemplate with the scripts, keys and parameters, the return a Flux<List<Long>>

  4. The return value is converted from Flux<ArrayList<Long>> to Mono<ArrayList<Long>> the through reduce() of Flux ,and then transform it to Mono<RateLimiterResponse> via map() function. Returned two data, one is allowed (1-allow, 0- not allowed), the other is tokensLeft, the number of available remaining request.

  5. As for the fault tolerance, due to using of reactor non-blocking model, when an error occurs, the fallback function onErrorResume() will be executed and a new stream (1L, -1L) will generated by Flux.just, which means allow the request getting through, and log the error on the side.

  6. After that, performs the doFinally() method, that is to execute the callback() method of the implementation class.


Four rate limiter algorithms#

From above we know that how the java code works with Redis in the gateway. In this chapter we briefly analysis some code of the four rate limiter algorithms, to understand how to develop the interface of RateLimiter SPI and work efficiently with Redis.

Four rate limiter algorithms are supplied in Apache Shenyu Ratelimit SPI:

Algorithm nameJava classLua script file
Request rate limiterTokenBucketRateLimiterAlgorithmrequest_rate_limiter.lua
Slide window rate limiterSlidingWindowRateLimiterAlgorithmliding_window_request_rate_limiter.lua
Concurrent rate limiterConcurrentRateLimiterAlgorithmconcurrent_request_rate_limiter.lua
Leaky bucket algorithmLeakyBucketRateLimiterAlgorithmrequest_leaky_rate_limiter.lua
  1. Token bucket rate limiter: Limiting the traffic according to the number of requests. Assuming that N requests can be passed per second, when requests exceeding N will be rejected. In implementing of the algorithm, the requests will be grouped by bucket, the tokens will be generated at an evenly rate. If the number of requests is less than the tokens in the bucket, then it is allowed to pass. The time window is 2* capacity/rate.
  2. Slide window rate limiter: Different from token bucket algorithm, its window size is smaller than that of token bucket rate limiter, which is a capacity/rate. And move backward one time window at a time. Other rate limiter principles are similar to token bucket.
  3. Concurrent rate limiter: Strictly limit the concurrent requests to N. Each time when there is a new request, it will check whether the number of concurrent requests is greater than N. If it is less than N, it is allowed to pass through, and the count is increased by 1. When the requests call ends, the signal is released (count minus 1).
  4. Leaky bucket rate limiter: In contrast with token bucket algorithm, the leaky bucket algorithm can help to smooths the burst of requests and only allows a pre-defined N number of requests. This limiter can force the output flow at a constant rate of N. It is based on a leaky bucket model, the leaky water quantity is time interval*rate. if the leaky water quantity is greater than the number of has used (represented by key_bucket_count), then clear the bucket, that is, set the key_bucket_count to 0. Otherwise, set key_bucket_count minus the leaky water quantity. If the number (requests + key_bucket_count ) is less than the capacity, then allow the requests passing through.

Let's understand the functionality of callback() by reading concurrent rate limiter code, and understand the usage of getKeys() through reading the Lua script of token rate limiter and slide window rate limiter.

callback() used in Concurrent requests limiter#

The getKeys() method of ConcurrentRateLimiterAlgorithm overrides the template method in AbstractRateLimiterAlgorithm

    @Override    public List<String> getKeys(final String id) {        String tokenKey = getKeyName() + ".{" + id + "}.tokens";        String requestKey = UUIDUtils.getInstance().generateShortUuid();        return Arrays.asList(tokenKey, requestKey);    }

The second element, requestKey is a long type and non-duplicate value (generated by a distributed ID generator,it is incremented and smaller than the current time Epochsecond value). The corresponding Lua script in concurrent_request_rate_limiter.lua:

local key = KEYS[1]
local capacity = tonumber(ARGV[2])local timestamp = tonumber(ARGV[3])local id = KEYS[2]

Here id is requestKey generated by getKeys() method, it is an uuid(unique value). Subsequent process is as follows:

local count ="zcard", key)local allowed = 0
if count < capacity then"zadd", key, timestamp, id)  allowed = 1  count = count + 1endreturn { allowed, count }

First, using zcard command to obtain the cardinality of the sorted set, and set count equals the cardinality , if the cardinality is less than the capacity, we will add a new member id (it is an uuid) to the sorted set, with the score of current time(in seconds) . then count =count+1, the cardinality is also incremented by 1 in reality.

All of the code above is executed in Redis as an atomic transaction. If there are a large number of concurrent requests from the same key( such as ip) , the cardinality of the sorted set of this key will increasing sharply, when then capacity limit is exceeded, the service will be denied, that is allowed =0

In concurrent requests limiter, It is required to release the semaphore when the request is completed. However, it is not included in Lua script.

Let's see the callback function of ConcurrentRateLimiterAlgorithm

    @Override    @SuppressWarnings("unchecked")    public void callback(final RedisScript<?> script, final List<String> keys, final List<String> scriptArgs) {        Singleton.INST.get(ReactiveRedisTemplate.class).opsForZSet().remove(keys.get(0), keys.get(1)).subscribe();    }

Here gives asynchronous subscription, using ReactiveRedisTemplate to delete the elements (key,id) in Redis store. That is once the request operation ends, the semaphore will be released. This remove operation cannot be executed in Lua script. This is just what design intention of callback in RateLimiterAlgorithm SPI .

getKeys() used in token bucket rate limiter#

Following is the corresponding Lua code:

local tokens_key = KEYS[1]local timestamp_key = KEYS[2]

Here we omit the code that get the parameters of rate ,capacity, etc.

local fill_time = capacity/ratelocal ttl = math.floor(fill_time*2)

The window size variable(ttl) is approximately two times of capacity/rate.

local last_tokens = tonumber("get", tokens_key))if last_tokens == nil then  last_tokens = capacityend

Get last_tokens from the sorted set, if it not exist, then last_tokens equals capacity.

local last_refreshed = tonumber("get", timestamp_key))if last_refreshed == nil then  last_refreshed = 0end

Get the last refreshed time by the key =timestamp_key from the sorted set, and default 0.

local delta = math.max(0, now-last_refreshed)local filled_tokens = math.min(capacity, last_tokens+(delta*rate))local allowed = filled_tokens >= requestedlocal allowed_num = 0if allowed then  new_tokens = filled_tokens - requested  allowed_num = 1end

The filled_tokens is produced evenly by time interval * rate,if the number of tokens greater than requests, then allowed=1, and update new_tokens."setex", tokens_key, ttl, new_tokens)"setex", timestamp_key, ttl, now)
return { allowed_num, new_tokens }

Here now is current time parameters passed in, set tokens_key to hold the string new_tokens and settokens_key to timeout after ttl of seconds. Set timestamp_key to hold the string value now, and expires after ttl seconds.

getKeys() used in sliding window rate limiter#

The getKeys() in SlidingWindowRateLimiterAlgorithm also overrides the parent class, and the code is consistent with the method in ConcurrentRateLimiterAlgorithm

Following is the Lua code of slide window rate limiter, the receiving of other parameters is omitted.

local timestamp_key = KEYS[2]...... local window_size = tonumber(capacity / rate)local window_time = 1

Here set the window_size to capacity/rate.

local last_requested = 0local exists_key ='exists', tokens_key)if (exists_key == 1) then    last_requested ='zcard', tokens_key)end

Obtain the cardinality(last_requested) of the tokens_key in the sorted set.

local remain_request = capacity - last_requestedlocal allowed_num = 0if (last_requested < capacity) then    allowed_num = 1'zadd', tokens_key, now, timestamp_key)end

Calculate remaining available remain_request equals capacity minus last_requested . If last_requested less than capacity ,then allow current requests passing through,add element in the sorted set with (key=timestamp_key, value=now) .'zremrangebyscore', tokens_key, 0, now - window_size / window_time)'expire', tokens_key, window_size)
return { allowed_num, remain_request }

Previously has set window_time=1, using zremrangebyscore command of Redis to remove all the elements in the sorted set stored at tokens_key with a score in [0,now - window_size / window_time] , that is, move the window a window size. Set the expire time of tokens_key to window_size.

In the template method getKeys(final String id) of AbstractRateLimiterAlgorithm,the second key ( represented y secondKey) is a fixed string which concat the input parameter{id}. As we can see from the above three algorithm codes, in the token bucket algorithm, secondKey will be updated to the latest time in the Lua code, so it doesn't matter what value is passed in. In the concurrent rate limiter, secondKey will be used as the key to remove Redis data in the java callback method. In the sliding window algorithm, the secondKey will be added to the sorted set as the key of a new element, and will be removed during window sliding.

That's all, when in a new rate limiter algorithm, the getKeys(final String id)method should be carefully designed according to the logic of the algorithm.

How to use RateLimiter SPI#

The three parameters in doExecute() method of RateLimiter plugin, exchange is an web request, chain is the execution chain of the plugins,selector is the selection parameters,rule is the policies or rules of rate limiter setting in the system.

protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {    //get  the `RateLimiterHandle` parameters from cache     RateLimiterHandle limiterHandle = RatelimiterRuleHandleCache.getInstance()        .obtainHandle(CacheKeyUtils.INST.getKey(rule));    //find the resolver name     String resolverKey = Optional.ofNullable(limiterHandle.getKeyResolverName())        .flatMap(name -> Optional.of("-" + RateLimiterKeyResolverFactory.newInstance(name).resolve(exchange)))        .orElse("");    return redisRateLimiter.isAllowed(rule.getId() + resolverKey, limiterHandle)        .flatMap(response -> {            if (!response.isAllowed()) {                exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);                Object error = ShenyuResultWrap.error(ShenyuResultEnum.TOO_MANY_REQUESTS.getCode(), ShenyuResultEnum.TOO_MANY_REQUESTS.getMsg(), null);                return WebFluxResultUtils.result(exchange, error);            }            return chain.execute(exchange);        });}
  1. Firstly get the RateLimiterHandle parameters from cache.

  2. Obtains the corresponding Key resolver by RateLimiterHandle instance.

  3. Reactively executes isAllowed() method of RedisRateLimiter.

  4. If not allowed, error handling is performed.

  5. If the request is allowed, dispatch it to the next process of execution chain.


RateLimiter plugin is based on Spring WebFlux,and with Apache Shen SPI, with Redis and Lua script to responsible for the critical algorithm and logic process, make it with characteristic of high concurrency and elastic. As for the RateLimiter SPI.

  1. RateLimiter SPI provides two SPI interface, with interface oriented design and various design patterns, it's easy to develop new rate limiter algorithm and key resolver rule.
  2. RateLimiterAlgorithm SPI supplies four rate limiter algorithms, token bucket,concurrency rate limiter, leaky bucket and sliding window rate limiter. When designing rate limiter algorithm, the KEY generation need to be carefully designed according to the algorithm characteristic. Using Lua script to realize the logic of the algorithm, and design callback() method for asynchronous processing when needed.
  3. Reactive programming, simple and efficient implementation.