
The ShenYu gateway uses the divide plugin to handle HTTP requests. See Quick start with Http in the official documentation to learn how to use this plugin.

This article analyzes the source code of shenyu-2.4.3. For an introduction to the plugin, refer to Http Proxy on the official website.

1. Register Service

1.1 Declaration of registration interface

Use the @ShenyuSpringMvcClient annotation to register a service with the gateway. A simple demo follows.

@RestController
@RequestMapping("/order")
@ShenyuSpringMvcClient(path = "/order") // API
public class OrderController {

    @GetMapping("/findById")
    @ShenyuSpringMvcClient(path = "/findById", desc = "Find by id") // method
    public OrderDTO findById(@RequestParam("id") final String id) {
        return build(id, "hello world findById");
    }
}

The annotation is defined as follows:

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ShenyuSpringMvcClient {

    // path
    String path() default "";

    // rule name
    String ruleName() default "";

    // description
    String desc() default "";

    // whether the registration is enabled
    boolean enabled() default true;

    // whether to register metadata
    boolean registerMetaData() default false;
}

1.2 Scan annotation

Annotation scanning is done by SpringMvcClientBeanPostProcessor, which implements BeanPostProcessor, the post-processor interface provided by Spring.

The constructor does the following:

  • Reads the property configuration.
  • Adds the mapping annotations that are later used to read path information.
  • Starts the registry and registers with shenyu-admin.
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
//...
/**
* Constructor instantiation
*/
public SpringMvcClientBeanPostProcessor(final PropertiesConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
// 1. read Properties
Properties props = clientConfig.getProps();
this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH, "");
if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {
String errorMsg = "http register param must config the appName or contextPath";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
// 2. add annotation
mappingAnnotation.add(ShenyuSpringMvcClient.class);
mappingAnnotation.add(PostMapping.class);
mappingAnnotation.add(GetMapping.class);
mappingAnnotation.add(DeleteMapping.class);
mappingAnnotation.add(PutMapping.class);
mappingAnnotation.add(RequestMapping.class);
// 3. start register center
publisher.start(shenyuClientRegisterRepository);
}

@Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
// override post process

return bean;
}

  • SpringMvcClientBeanPostProcessor#postProcessAfterInitialization()

This method overrides the post-processor logic: it reads the annotation information, builds the metadata objects, and registers them with shenyu-admin. URI data is registered separately by ContextRegisterListener, described in 1.3.

    @Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
// 1. Skip the bean if the whole service is registered or it is not a Controller class
if (Boolean.TRUE.equals(isFull) || !hasAnnotation(bean.getClass(), Controller.class)) {
return bean;
}
// 2. Read the annotations on the class ShenyuSpringMvcClient
final ShenyuSpringMvcClient beanShenyuClient = AnnotationUtils.findAnnotation(bean.getClass(), ShenyuSpringMvcClient.class);
// 2.1 build superPath
final String superPath = buildApiSuperPath(bean.getClass());
// 2.2 whether to register the entire class method
if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {
// build the metadata object and register it with shenyu-admin
publisher.publishEvent(buildMetaDataDTO(beanShenyuClient, pathJoin(contextPath, superPath)));
return bean;
}
// 3. read all methods
final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
for (Method method : methods) {
// 3.1 read the annotations on the method ShenyuSpringMvcClient
ShenyuSpringMvcClient methodShenyuClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
// If there is no annotation on the method, use the annotation on the class
methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;
if (Objects.nonNull(methodShenyuClient)) {
// 3.2 Build path information, build metadata objects, register with shenyu-admin
publisher.publishEvent(buildMetaDataDTO(methodShenyuClient, buildApiPath(method, superPath)));
}
}

return bean;
}
    1. If the whole service is registered, or the bean is not a Controller class, the bean is not processed.
    2. Read the ShenyuSpringMvcClient annotation on the class. If the whole class is registered (the class path contains *), build the metadata object here and register it with shenyu-admin.
    3. Otherwise, read the ShenyuSpringMvcClient annotation on each handler method, build the path for that method, build the metadata object, and register it with shenyu-admin.
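
The three steps above can be condensed into a small decision function. This is only a sketch: RegistrationDecisionSketch and pathsToRegister are hypothetical names, the annotation lookups are replaced by plain boolean and list parameters, and the paths are assumed to be already joined with the context path.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the branching above; not ShenYu code. */
final class RegistrationDecisionSketch {

    /**
     * Returns the paths that would be published as metadata for one bean.
     *
     * @param isFull         true when the whole service is registered
     * @param isController   whether the bean is a Controller
     * @param classAnnotated whether the class carries the client annotation
     * @param superPath      class-level path, assumed already joined with the context path
     * @param methodPaths    full paths of the handler methods that resolve to an annotation
     */
    static List<String> pathsToRegister(final boolean isFull, final boolean isController,
                                        final boolean classAnnotated, final String superPath,
                                        final List<String> methodPaths) {
        List<String> result = new ArrayList<>();
        // Step 1: nothing to do for full registration or non-controller beans.
        if (isFull || !isController) {
            return result;
        }
        // Step 2: a wildcard class path registers the whole class once.
        if (classAnnotated && superPath.contains("*")) {
            result.add(superPath);
            return result;
        }
        // Step 3: otherwise register each method path individually.
        result.addAll(methodPaths);
        return result;
    }
}
```

With a class path such as /http/order/** the function returns a single entry, which mirrors the early return in the excerpt.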

Two methods used here to obtain the path deserve a closer look.

  • buildApiSuperPath()

This method builds the superPath. It first takes the path attribute of the ShenyuSpringMvcClient annotation on the class; if that is blank, it takes the path from the RequestMapping annotation on the class.

    private String buildApiSuperPath(@NonNull final Class<?> method) {
// First take the path property from the annotation ShenyuSpringMvcClient on the class
ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {
return shenyuSpringMvcClient.path();
}
// Take the path information from the RequestMapping annotation of the current class
RequestMapping requestMapping = AnnotationUtils.findAnnotation(method, RequestMapping.class);
if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {
return requestMapping.path()[0];
}
return "";
}
  • buildApiPath()

This method builds the full path. It first reads the ShenyuSpringMvcClient annotation on the method and uses its path if present; otherwise it gets the path from the other mapping annotations on the method. Full path = contextPath (context information) + superPath (class information) + methodPath (method information).

    private String buildApiPath(@NonNull final Method method, @NonNull final String superPath) {
// 1. Read the annotation ShenyuSpringMvcClient on the method
ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
// 1.1 If path exists, build
if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {
//1.2 path = contextPath+superPath+methodPath
return pathJoin(contextPath, superPath, shenyuSpringMvcClient.path());
}
// 2. Get path information from other annotations on the method
final String path = getPathByMethod(method);
if (StringUtils.isNotBlank(path)) {
// 2.1 path = contextPath+superPath+methodPath
return pathJoin(contextPath, superPath, path);
}
return pathJoin(contextPath, superPath);
}
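
The excerpt does not show pathJoin() itself. The following is a minimal sketch of how such a join could behave, assuming the goal is one leading "/" and exactly one "/" between segments. PathJoinSketch is a hypothetical name and the actual ShenYu implementation may differ in its details.

```java
/** Hypothetical stand-in for the pathJoin() helper; behavior is an assumption. */
final class PathJoinSketch {

    private static final String SEPARATOR = "/";

    /** Joins segments with a leading "/" and a single "/" between segments. */
    static String pathJoin(final String... segments) {
        StringBuilder result = new StringBuilder(SEPARATOR);
        for (String segment : segments) {
            // Blank segments (for example an empty contextPath) contribute nothing.
            if (segment == null || segment.isEmpty()) {
                continue;
            }
            if (result.charAt(result.length() - 1) != '/') {
                result.append(SEPARATOR);
            }
            // Drop one leading separator so "/a" + "/b" does not become "/a//b".
            result.append(segment.startsWith(SEPARATOR) ? segment.substring(1) : segment);
        }
        return result.toString();
    }
}
```

For the demo controller, pathJoin("/http", "/order", "/findById") yields /http/order/findById, which is the path the gateway request later in this article uses.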
  • getPathByMethod()

This method gets the path from the mapping annotations on the method. The annotations checked are:

  • ShenyuSpringMvcClient
  • PostMapping
  • GetMapping
  • DeleteMapping
  • PutMapping
  • RequestMapping

private String getPathByMethod(@NonNull final Method method) {
// Iterate through interface annotations to get path information
for (Class<? extends Annotation> mapping : mappingAnnotation) {
final String pathByAnnotation = getPathByAnnotation(AnnotationUtils.findAnnotation(method, mapping), pathAttributeNames);
if (StringUtils.isNotBlank(pathByAnnotation)) {
return pathByAnnotation;
}
}
return null;
}
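
The lookup above can be illustrated without Spring. The sketch below is an assumption-laden stand-in: ClientPath and GetPath are hypothetical annotations playing the roles of ShenyuSpringMvcClient and GetMapping, and the attribute names "path" and "value" are assumed contents of pathAttributeNames, which the excerpt does not show.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of resolving a path from the first matching annotation. */
final class AnnotationPathSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface ClientPath {
        String path() default "";
    }

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface GetPath {
        String[] value() default {};
    }

    // Checked in order, like the mappingAnnotation list in the excerpt.
    private static final List<Class<? extends Annotation>> MAPPINGS =
            Arrays.asList(ClientPath.class, GetPath.class);

    // Assumed attribute names that may carry the path.
    private static final String[] PATH_ATTRIBUTES = {"path", "value"};

    /** Returns the first non-blank path found on the method, or null. */
    static String pathOf(final Method method) {
        for (Class<? extends Annotation> type : MAPPINGS) {
            Annotation annotation = method.getAnnotation(type);
            if (annotation == null) {
                continue;
            }
            for (String attribute : PATH_ATTRIBUTES) {
                String path = readAttribute(annotation, attribute);
                if (path != null && !path.trim().isEmpty()) {
                    return path;
                }
            }
        }
        return null;
    }

    /** Reads a String or String[] attribute reflectively; null if absent or empty. */
    private static String readAttribute(final Annotation annotation, final String attribute) {
        try {
            Object value = annotation.annotationType().getMethod(attribute).invoke(annotation);
            if (value instanceof String) {
                return (String) value;
            }
            if (value instanceof String[] && ((String[]) value).length > 0) {
                return ((String[]) value)[0];
            }
            return null;
        } catch (ReflectiveOperationException e) {
            return null; // the annotation does not declare this attribute
        }
    }

    /** Convenience lookup by method name, used only for the demo. */
    static String pathOf(final Class<?> owner, final String methodName) {
        for (Method method : owner.getDeclaredMethods()) {
            if (method.getName().equals(methodName)) {
                return pathOf(method);
            }
        }
        return null;
    }

    static class Demo {
        @GetPath("/findById")
        public void findById() { }

        @ClientPath(path = "/save")
        public void save() { }

        public void plain() { }
    }
}
```

A method without any of the listed annotations resolves to null, matching the final return in getPathByMethod().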

After annotation scanning is finished, the metadata object is built and sent to shenyu-admin to complete the registration.

  • Metadata

The metadata includes the rule information of the registered method: contextPath, appName, registration path, description, registration type, whether it is enabled, rule name, and whether to register metadata.

 private MetaDataRegisterDTO buildMetaDataDTO(@NonNull final ShenyuSpringMvcClient shenyuSpringMvcClient, final String path) {
return MetaDataRegisterDTO.builder()
.contextPath(contextPath) // contextPath
.appName(appName) // appName
.path(path) // Registered path, used when gateway rules match
.pathDesc(shenyuSpringMvcClient.desc()) // desc info
.rpcType(RpcTypeEnum.HTTP.getName()) // divide plugin, http type when default
.enabled(shenyuSpringMvcClient.enabled()) // is enabled?
.ruleName(StringUtils.defaultIfBlank(shenyuSpringMvcClient.ruleName(), path))//rule name
.registerMetaData(shenyuSpringMvcClient.registerMetaData()) // whether to register metadata information
.build();
}

The registration itself is carried out by the registration center, which was analyzed in previous articles and is not covered in depth here.

1.3 Register URI Data

ContextRegisterListener registers the client's URI information with shenyu-admin. It implements the ApplicationListener interface: when the context refresh event ContextRefreshedEvent occurs, the onApplicationEvent() method runs the registration logic.


public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent>, BeanFactoryAware {
//......

/**
* Constructor instantiation
*/
public ContextRegisterListener(final PropertiesConfig clientConfig) {
// read Properties
final Properties props = clientConfig.getProps();
this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
if (Boolean.TRUE.equals(isFull)) {
if (StringUtils.isBlank(contextPath)) {
final String errorMsg = "http register param must config the contextPath";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
}
this.port = Integer.parseInt(Optional.ofNullable(props.getProperty(ShenyuClientConstants.PORT)).orElseGet(() -> "-1"));
this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);
this.host = props.getProperty(ShenyuClientConstants.HOST);
}

@Override
public void setBeanFactory(final BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}

// Execute application events
@Override
public void onApplicationEvent(@NonNull final ContextRefreshedEvent contextRefreshedEvent) {
// The method is guaranteed to be executed once
if (!registered.compareAndSet(false, true)) {
return;
}
// 1. If you are registering for the entire service
if (Boolean.TRUE.equals(isFull)) {
// Build metadata and register
publisher.publishEvent(buildMetaDataDTO());
}
try {
// get port
final int mergedPort = port <= 0 ? PortUtils.findPort(beanFactory) : port;
// 2. Constructing URI data and registering
publisher.publishEvent(buildURIRegisterDTO(mergedPort));
} catch (ShenyuException e) {
throw new ShenyuException(e.getMessage() + "please config ${shenyu.client.http.props.port} in xml/yml !");
}
}

// build URI data
private URIRegisterDTO buildURIRegisterDTO(final int port) {
return URIRegisterDTO.builder()
.contextPath(this.contextPath) // contextPath
.appName(appName) // appName
.protocol(protocol) // protocol
.host(IpUtils.isCompleteHost(this.host) ? this.host : IpUtils.getHost(this.host)) //host
.port(port) // port
.rpcType(RpcTypeEnum.HTTP.getName()) // divide plugin, default registration http type
.build();
}

// build MetaData
private MetaDataRegisterDTO buildMetaDataDTO() {
return MetaDataRegisterDTO.builder()
.contextPath(contextPath)
.appName(appName)
.path(contextPath)
.rpcType(RpcTypeEnum.HTTP.getName())
.enabled(true)
.ruleName(contextPath)
.build();
}
}
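
The run-once guard in onApplicationEvent() relies on AtomicBoolean.compareAndSet. A minimal sketch of the same idea, with the hypothetical class RegisterOnceSketch and a counter standing in for publisher.publishEvent(...):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the "register only once" guard. */
final class RegisterOnceSketch {

    private final AtomicBoolean registered = new AtomicBoolean(false);

    private final AtomicInteger published = new AtomicInteger();

    /** Simulates onApplicationEvent(): only the first call publishes. */
    void onContextRefreshed() {
        // compareAndSet flips false -> true atomically; later calls see true and return.
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        published.incrementAndGet(); // stands in for publisher.publishEvent(...)
    }

    int publishedCount() {
        return published.get();
    }
}
```

A guard of this kind matters because a context refresh event can be delivered more than once, for example when an application has several contexts, and the URI should not be registered repeatedly.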

1.4 Handle registration information

The metadata and URI data that the client registers through the registry are processed in shenyu-admin, which stores them in the database and synchronizes them to the ShenYu gateway. The client registration logic for the divide plugin is in ShenyuClientRegisterDivideServiceImpl. The inheritance hierarchy is as follows.

  • ShenyuClientRegisterService: client registration service, the top-level interface.
  • FallbackShenyuClientRegisterService: provides a retry operation when registration fails.
  • AbstractShenyuClientRegisterServiceImpl: abstract class that implements part of the common registration logic.
  • AbstractContextPathRegisterService: abstract class responsible for registering the ContextPath.
  • ShenyuClientRegisterDivideServiceImpl: implements registration for the divide plugin.

1.4.1 Register Service

  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()

The MetaDataRegisterDTO metadata object that the client registers through the registry is received and handled in the register() method of shenyu-admin.

   @Override
public String register(final MetaDataRegisterDTO dto) {
//1. register selector
String selectorHandler = selectorHandler(dto);
String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
//2. register rule
String ruleHandler = ruleHandler();
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
ruleService.registerDefault(ruleDTO);
//3. register metadata
registerMetadata(dto);
//4. register ContextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
1.4.1.1 Register Selector
  • org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()

Build the contextPath and check whether the selector already exists. If it does, return its id; if it does not, create the default selector.

    @Override
public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {
// build contextPath
String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());
// Find if selector information exists by name
SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);
if (Objects.isNull(selectorDO)) {
// Create a default selector if it does not exist
return registerSelector(contextPath, pluginName, selectorHandler);
}
return selectorDO.getId();
}
  • Default Selector Information

Construct the default selector information and its conditional properties here.

   //register selector
private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {
// build selector
SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());
selectorDTO.setHandle(selectorHandler);
//register default Selector
return registerDefault(selectorDTO);
}
//build
private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {
//build default
SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);
selectorDTO.setPluginId(pluginId);
//build the conditional properties of the default selector
selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));
return selectorDTO;
}
  • Build Default Selector
private SelectorDTO buildDefaultSelectorDTO(final String name) {
return SelectorDTO.builder()
.name(name) // name
.type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // default CUSTOM_FLOW
.matchMode(MatchModeEnum.AND.getCode()) //default AND
.enabled(Boolean.TRUE) //default TRUE
.loged(Boolean.TRUE) //default TRUE
.continued(Boolean.TRUE) //default TRUE
.sort(1) //default 1
.build();
}


  • Build default selector conditional properties
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {
SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();
selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // default URI
selectorConditionDTO.setParamName("/");
selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // default match
selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // default /contextPath/**
return Collections.singletonList(selectorConditionDTO);
}
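
To make the default condition concrete, the sketch below builds the pattern contextPath + "/**" and tests URIs against it. It assumes AdminConstants.URI_SUFFIX is "/**", as the comment in the excerpt suggests. The matches() method is a deliberately simplified stand-in that understands only a trailing "/**"; it is not the gateway's real matching logic, and SelectorMatchSketch is a hypothetical name.

```java
/** Hypothetical sketch of the default selector condition and a simplified match. */
final class SelectorMatchSketch {

    // Assumed value of AdminConstants.URI_SUFFIX.
    static final String URI_SUFFIX = "/**";

    /** Builds the default condition value, for example "/http/**". */
    static String defaultCondition(final String contextPath) {
        return contextPath + URI_SUFFIX;
    }

    /** Simplified matcher: supports only an exact pattern or a trailing "/**". */
    static boolean matches(final String pattern, final String uri) {
        if (pattern.endsWith(URI_SUFFIX)) {
            String prefix = pattern.substring(0, pattern.length() - URI_SUFFIX.length());
            // Match the prefix itself or anything below it, but not "/httpx/...".
            return uri.equals(prefix) || uri.startsWith(prefix + "/");
        }
        return pattern.equals(uri);
    }
}
```

Under these assumptions, a service with contextPath /http gets the condition /http/**, so a request to /http/order/findById selects it while /other/order/findById does not.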
  • Register default selector
@Override
public String registerDefault(final SelectorDTO selectorDTO) {
//selector info
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
//selector condition info
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
if (StringUtils.isEmpty(selectorDTO.getId())) {
// insert selector information into the database
selectorMapper.insertSelective(selectorDO);
// insert selector condition information into the database
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
}
// Publish a synchronization event to sync the selector and its conditions to the gateway
publishEvent(selectorDO, selectorConditionDTOs);
return selectorDO.getId();
}
1.4.1.2 Register Rule

The second step of registering the service builds the default rule and then registers it.

@Override
public String register(final MetaDataRegisterDTO dto) {
//1. register selector
//......

//2. register rule
// default rule handle
String ruleHandler = ruleHandler();
// build default rule
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
// register rule
ruleService.registerDefault(ruleDTO);

//3. register Metadata
//......

//4. register ContextPath
//......

return ShenyuResultMessage.SUCCESS;
}
  • default rule handle
    @Override
protected String ruleHandler() {
// default rule handle
return new DivideRuleHandle().toJson();
}

Divide plugin default rule handle.


public class DivideRuleHandle implements RuleHandle {

/**
* load balance: default RANDOM
*/
private String loadBalance = LoadBalanceEnum.RANDOM.getName();

/**
* retry strategy: default CURRENT
*/
private String retryStrategy = RetryEnum.CURRENT.getName();

/**
* retry: default 3
*/
private int retry = 3;

/**
* timeout: default 3000
*/
private long timeout = Constants.TIME_OUT;

/**
* header max size: default 10240 bytes
*/
private long headerMaxSize = Constants.HEADER_MAX_SIZE;

/**
* request max size: default 102400 bytes
*/
private long requestMaxSize = Constants.REQUEST_MAX_SIZE;
}
  • build default rule info
  // build default rule info
private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {
return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());
}
// build default rule info
private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {
RuleDTO ruleDTO = RuleDTO.builder()
.selectorId(selectorId) //selector Id
.name(ruleName) //rule Name
.matchMode(MatchModeEnum.AND.getCode()) // default and
.enabled(Boolean.TRUE) // default TRUE
.loged(Boolean.TRUE) //default TRUE
.sort(1) //default 1
.handle(ruleHandler)
.build();
RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()
.paramType(ParamTypeEnum.URI.getName()) // default URI
.paramName("/")
.paramValue(path) // path
.build();
if (path.indexOf("*") > 1) {
ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // if the path contains *, default match
} else {
ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); // default =
}
ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));
return ruleDTO;
}
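
The operator choice in buildRuleDTO() can be isolated in a few lines. In this sketch RuleOperatorSketch is a hypothetical name, and the alias strings "match" and "=" are assumptions taken from the comments in the excerpt rather than from OperatorEnum itself.

```java
/** Hypothetical sketch of how the default rule operator is chosen. */
final class RuleOperatorSketch {

    // Assumed aliases of OperatorEnum.MATCH and OperatorEnum.EQ.
    static final String MATCH = "match";
    static final String EQ = "=";

    /** Mirrors the excerpt: a "*" at index 2 or later selects the match operator. */
    static String operatorFor(final String path) {
        return path.indexOf("*") > 1 ? MATCH : EQ;
    }
}
```

Note the comparison with 1 rather than -1: as written in the excerpt, a path whose first "*" sits at index 0 or 1, such as /*, would get the equality operator. For typical registered paths such as /http/order/** or /http/order/findById the result is match and = respectively.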
  • org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()

Register the rule: insert the records into the database and publish an event to the gateway for data synchronization.


@Override
public String registerDefault(final RuleDTO ruleDTO) {
RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());
if (Objects.nonNull(exist)) {
return "";
}

RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);
List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();
if (StringUtils.isEmpty(ruleDTO.getId())) {
// insert rule into database
ruleMapper.insertSelective(ruleDO);
//insert rule condition into database
ruleConditions.forEach(ruleConditionDTO -> {
ruleConditionDTO.setRuleId(ruleDO.getId());
ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));
});
}
// Publish events to the gateway for data synchronization
publishEvent(ruleDO, ruleConditions);
return ruleDO.getId();
}

1.4.1.3 Register Metadata
   @Override
public String register(final MetaDataRegisterDTO dto) {
//1. register selector
//......

//2. register rule
//......

//3. register metadata
registerMetadata(dto);

//4. register ContextPath
//......

return ShenyuResultMessage.SUCCESS;
}
  • org.apache.shenyu.admin.service.register.ShenyuClientRegisterDivideServiceImpl#registerMetadata()

Insert or update metadata and then publish sync events to the gateway.


@Override
protected void registerMetadata(final MetaDataRegisterDTO dto) {
if (dto.isRegisterMetaData()) {
MetaDataService metaDataService = getMetaDataService();
MetaDataDO exist = metaDataService.findByPath(dto.getPath());
// save or update MetaData
metaDataService.saveOrUpdateMetaData(exist, dto);
}
}

@Override
public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {
DataEventTypeEnum eventType;
// DTO->DO
MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);
// insert
if (Objects.isNull(exist)) {
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());
metaDataDO.setDateCreated(currentTime);
metaDataDO.setDateUpdated(currentTime);
metaDataMapper.insert(metaDataDO);
eventType = DataEventTypeEnum.CREATE;
} else {
// update
metaDataDO.setId(exist.getId());
metaDataMapper.update(metaDataDO);
eventType = DataEventTypeEnum.UPDATE;
}
// publish event to gateway
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,
Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));
}
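
The insert-or-update branch above decides which event type is published. A minimal in-memory sketch of that decision follows. MetaDataUpsertSketch is hypothetical: a HashMap keyed by path replaces the database, and java.util.UUID replaces ShenYu's UUIDUtils.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical in-memory sketch of saveOrUpdateMetaData(). */
final class MetaDataUpsertSketch {

    enum EventType { CREATE, UPDATE }

    static final class MetaData {
        final String id;
        final String path;
        final String desc;

        MetaData(final String id, final String path, final String desc) {
            this.id = id;
            this.path = path;
            this.desc = desc;
        }
    }

    // Stands in for the meta_data table, keyed by path like findByPath().
    private final Map<String, MetaData> byPath = new HashMap<>();

    /** Inserts a new record or updates the existing one; returns the event to publish. */
    EventType saveOrUpdate(final String path, final String desc) {
        MetaData exist = byPath.get(path);
        if (exist == null) {
            byPath.put(path, new MetaData(UUID.randomUUID().toString(), path, desc));
            return EventType.CREATE;
        }
        // Keep the existing id so the record is updated rather than duplicated.
        byPath.put(path, new MetaData(exist.id, path, desc));
        return EventType.UPDATE;
    }

    MetaData find(final String path) {
        return byPath.get(path);
    }
}
```

Registering the same path twice therefore produces one CREATE followed by one UPDATE, and the id stays stable across both.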
1.4.1.4 Register ContextPath
   @Override
public String register(final MetaDataRegisterDTO dto) {
//1. register selector
//......

//2. register rule
//......

//3. register metadata
//......

//4. register ContextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
  • org.apache.shenyu.admin.service.register.AbstractContextPathRegisterService#registerContextPath()
    @Override
public void registerContextPath(final MetaDataRegisterDTO dto) {
// set contextPath for selector
String contextPathSelectorId = getSelectorService().registerDefault(dto, PluginEnum.CONTEXT_PATH.getName(), "");
ContextMappingRuleHandle handle = new ContextMappingRuleHandle();
handle.setContextPath(PathUtils.decoratorContextPath(dto.getContextPath()));
// set contextPath for rule
getRuleService().registerDefault(buildContextPathDefaultRuleDTO(contextPathSelectorId, dto, handle.toJson()));
}
1.4.2 Register URI
  • org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()

The server side receives the URI information registered by the client and processes it.

    @Override
public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {
String result;
String key = key(selectorName);
try {
this.removeFallBack(key);
// register URI
result = this.doRegisterURI(selectorName, uriList);
logger.info("Register success: {},{}", selectorName, uriList);
} catch (Exception ex) {
logger.warn("Register exception: cause:{}", ex.getMessage());
result = "";
// Retry after registration failure
this.addFallback(key, new FallbackHolder(selectorName, uriList));
}
return result;
}
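
The fallback pattern in registerURI() can be sketched as follows. FallbackRegisterSketch is hypothetical, the registration itself is passed in as a function, and retryPending() is an assumed trigger: the excerpt does not show how or when ShenYu replays the stored fallbacks.

```java
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical sketch of "remember failed registrations and retry them later". */
final class FallbackRegisterSketch {

    // Failed registrations waiting for a retry, keyed by selector name.
    private final Map<String, String> fallbacks = new ConcurrentHashMap<>();

    // Stands in for doRegisterURI(); may throw when the selector is not ready yet.
    private final Function<String, String> doRegister;

    FallbackRegisterSketch(final Function<String, String> doRegister) {
        this.doRegister = doRegister;
    }

    /** Tries to register; on failure stores the request and returns an empty result. */
    String register(final String selectorName) {
        String result;
        try {
            fallbacks.remove(selectorName);
            result = doRegister.apply(selectorName);
        } catch (RuntimeException ex) {
            result = "";
            fallbacks.put(selectorName, selectorName);
        }
        return result;
    }

    /** Assumed retry trigger: replays every stored registration once. */
    void retryPending() {
        for (String selectorName : new ArrayList<>(fallbacks.keySet())) {
            register(selectorName);
        }
    }

    int pendingCount() {
        return fallbacks.size();
    }
}
```

This ordering matters for URI registration because doRegisterURI() throws when the selector does not exist yet, which can happen if the URI arrives before the metadata has created the selector.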
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()

Get the valid URIs from those registered by the client, update the handle property of the corresponding selector, and send a selector update event to the gateway.

@Override
public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {
//check
if (CollectionUtils.isEmpty(uriList)) {
return "";
}
//get selector
SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
if (Objects.isNull(selectorDO)) {
throw new ShenyuException("doRegister Failed to execute,wait to retry.");
}
// get valid URIs
List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());
// build handle
String handler = buildHandle(validUriList, selectorDO);
if (handler != null) {
selectorDO.setHandle(handler);
SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
selectorData.setHandle(handler);
// Update the handle property of the selector to the database
selectorService.updateSelective(selectorDO);
// Send selector update events to the gateway
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
}
return ShenyuResultMessage.SUCCESS;
}
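
The filtering step keeps only URIs that have both a port and a non-blank host. A small sketch of that filter is shown below. ValidUriSketch and its Uri class are hypothetical, and rendering each entry as "host:port" is an assumption made for illustration; the excerpt does not show what buildHandle() produces.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/** Hypothetical sketch of filtering registered URIs before building the handle. */
final class ValidUriSketch {

    static final class Uri {
        final String host;
        final Integer port;

        Uri(final String host, final Integer port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Keeps URIs with a port and a non-blank host, rendered as "host:port". */
    static List<String> validUpstreams(final List<Uri> uris) {
        return uris.stream()
                .filter(uri -> Objects.nonNull(uri.port)
                        && uri.host != null && !uri.host.trim().isEmpty())
                .map(uri -> uri.host + ":" + uri.port)
                .collect(Collectors.toList());
    }
}
```

Entries with a blank host or a missing port are dropped silently, so an incomplete client configuration results in no upstream rather than a broken one.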

This completes the source code analysis of service registration. The flow chart of the analysis is as follows.

The next step is to analyze how the divide plugin initiates a call to the http service based on this information.

2. Call Http Service

The divide plugin is the core processing plugin used by the gateway to handle http protocol requests.

Take the example from Quick start with Http on the official website. A direct request to the service is as follows.

GET http://localhost:8189/order/findById?id=100
Accept: application/json

After proxying through the ShenYu gateway, the request is as follows.

GET http://localhost:9195/http/order/findById?id=100
Accept: application/json

Requests sent through the ShenYu gateway still reach the original service, and this is where the divide plugin comes into play. The class inheritance hierarchy is as follows.

  • ShenyuPlugin: top-level interface, defining interface methods.
  • AbstractShenyuPlugin: abstract class that implements the common logic of the plugin.
  • DividePlugin: the divide plugin.

2.1 Accept Request

For requests proxied by the ShenYu gateway, the entry point is ShenyuWebHandler, which implements the org.springframework.web.server.WebHandler interface.

public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {
//......

/**
* handle web request
*/
@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
// execute plugin chain
Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);
if (scheduled) {
return execute.subscribeOn(scheduler);
}
return execute;
}

private static class DefaultShenyuPluginChain implements ShenyuPluginChain {

private int index;

private final List<ShenyuPlugin> plugins;

/**
* Instantiating the default plugin chain
*/
DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {
this.plugins = plugins;
}

/**
* Execute each plugin
*/
@Override
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
// get current plugin
ShenyuPlugin plugin = plugins.get(this.index++);
// is skip ?
boolean skip = plugin.skip(exchange);
if (skip) {
// If skipped, execute the next
return this.execute(exchange);
}
// execute current plugin
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
}
}
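
The index-based chain can be shown in a synchronous form. The sketch below is not the reactive implementation: it drops Mono and ServerWebExchange, uses a plain String as the request, and records which plugins ran. PluginChainSketch and all names inside it are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical synchronous sketch of the plugin chain. */
final class PluginChainSketch {

    interface Plugin {
        boolean skip(String request);

        void execute(String request, Chain chain);
    }

    static final class Chain {
        private int index;

        private final List<Plugin> plugins;

        final List<String> trace = new ArrayList<>();

        Chain(final List<Plugin> plugins) {
            this.plugins = plugins;
        }

        /** Runs the next plugin that does not ask to be skipped. */
        void execute(final String request) {
            if (index < plugins.size()) {
                Plugin plugin = plugins.get(index++);
                if (plugin.skip(request)) {
                    execute(request); // skipped: move straight to the next plugin
                    return;
                }
                plugin.execute(request, this);
            }
        }
    }

    /** Creates a plugin that records its name and then continues the chain. */
    static Plugin plugin(final String name, final boolean skip) {
        return new Plugin() {
            @Override
            public boolean skip(final String request) {
                return skip;
            }

            @Override
            public void execute(final String request, final Chain chain) {
                chain.trace.add(name);
                chain.execute(request);
            }
        };
    }

    static List<String> run(final String request, final Plugin... plugins) {
        Chain chain = new Chain(Arrays.asList(plugins));
        chain.execute(request);
        return chain.trace;
    }
}
```

Each plugin decides whether to continue by calling chain.execute() itself, which is why a plugin such as divide can end the request early by returning an error instead of continuing.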

2.2 Matching rule

  • org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()

The execute() method runs the matching logic for selectors and rules:

  • Match the selector.
  • Match the rule.
  • Execute the plugin.
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
String pluginName = named();
PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
if (pluginData != null && pluginData.getEnabled()) {
// selector
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
if (CollectionUtils.isEmpty(selectors)) {
return handleSelectorIfNull(pluginName, exchange, chain);
}
// match selector
SelectorData selectorData = matchSelector(exchange, selectors);
if (Objects.isNull(selectorData)) {
return handleSelectorIfNull(pluginName, exchange, chain);
}
selectorLog(selectorData, pluginName);
// rule
List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
if (CollectionUtils.isEmpty(rules)) {
return handleRuleIfNull(pluginName, exchange, chain);
}
// match rule
RuleData rule;
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
//get last
rule = rules.get(rules.size() - 1);
} else {
rule = matchRule(exchange, rules);
}
if (Objects.isNull(rule)) {
return handleRuleIfNull(pluginName, exchange, chain);
}
ruleLog(rule, pluginName);
// execute
return doExecute(exchange, chain, selectorData, rule);
}
return chain.execute(exchange);
}

2.3 Execute Divide Plugin

  • org.apache.shenyu.plugin.divide.DividePlugin#doExecute()

The doExecute() method runs the specific logic of the divide plugin:

  • Check the header size.
  • Check the request size.
  • Obtain the list of upstream services.
  • Apply load balancing.
  • Set the request URL, timeout, and retry policy.
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
// shenyu Context
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
// Get the handle property of the rule
DivideRuleHandle ruleHandle = DividePluginDataHandler.CACHED_HANDLE.get().obtainHandle(CacheKeyUtils.INST.getKey(rule));
long headerSize = 0;
// check header size
for (List<String> multiHeader : exchange.getRequest().getHeaders().values()) {
for (String value : multiHeader) {
headerSize += value.getBytes(StandardCharsets.UTF_8).length;
}
}
if (headerSize > ruleHandle.getHeaderMaxSize()) {
LOG.error("request header is too large");
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_HEADER_TOO_LARGE, null);
return WebFluxResultUtils.result(exchange, error);
}

// check request size
if (exchange.getRequest().getHeaders().getContentLength() > ruleHandle.getRequestMaxSize()) {
LOG.error("request entity is too large");
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_ENTITY_TOO_LARGE, null);
return WebFluxResultUtils.result(exchange, error);
}
// upstream list
List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
if (CollectionUtils.isEmpty(upstreamList)) {
LOG.error("divide upstream configuration error: {}", rule);
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null);
return WebFluxResultUtils.result(exchange, error);
}
// request ip
String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
// load balance
Upstream upstream = LoadBalancerFactory.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
if (Objects.isNull(upstream)) {
LOG.error("divide has no upstream");
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null);
return WebFluxResultUtils.result(exchange, error);
}
// set url
String domain = upstream.buildDomain();
exchange.getAttributes().put(Constants.HTTP_DOMAIN, domain);
// set timeout
exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
// set retry
exchange.getAttributes().put(Constants.RETRY_STRATEGY, ruleHandle.getRetryStrategy());
exchange.getAttributes().put(Constants.LOAD_BALANCE, ruleHandle.getLoadBalance());
exchange.getAttributes().put(Constants.DIVIDE_SELECTOR_ID, selector.getId());
return chain.execute(exchange);
}
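
The header size check at the start of doExecute() can be tried in isolation. The following is a minimal sketch in plain Java, assuming headers are given as a simple map; the class HeaderSizeSketch and its methods are hypothetical names used only for illustration and are not part of ShenYu.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not ShenYu code: mirrors the header-size loop in doExecute().
final class HeaderSizeSketch {

    // Sums the UTF-8 byte length of every header value (header names are not counted).
    static long headerSize(final Map<String, List<String>> headers) {
        long size = 0;
        for (List<String> values : headers.values()) {
            for (String value : values) {
                size += value.getBytes(StandardCharsets.UTF_8).length;
            }
        }
        return size;
    }

    // True when the request would be rejected for exceeding the configured limit.
    static boolean tooLarge(final Map<String, List<String>> headers, final long maxSize) {
        return headerSize(headers) > maxSize;
    }

    public static void main(final String[] args) {
        Map<String, List<String>> headers = Map.of(
                "Accept", List.of("application/json"),
                "X-Tag", List.of("a", "bc"));
        System.out.println(headerSize(headers));
        System.out.println(tooLarge(headers, 10));
    }
}
```

As in the plugin code, only header values contribute to the total, so a multi-valued header counts once per value.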

2.4 Do Request

By default, the WebClientPlugin sends the request to the http service. The class inheritance relationship is as follows.

  • ShenyuPlugin: top-level plug-in, defining plug-in methods.
  • AbstractHttpClientPlugin: abstract class that implements the public logic of request invocation.
  • WebClientPlugin: initiating requests through WebClient.
  • NettyHttpClientPlugin: initiating requests through Netty.

Initiate the request call.

  • org.apache.shenyu.plugin.httpclient.AbstractHttpClientPlugin#execute()

Initiate the request call in the execute() method.

  • Get the specified timeout, number of retries
  • Initiate the request
  • Retry after failure according to the specified retry policy

public abstract class AbstractHttpClientPlugin<R> implements ShenyuPlugin {

protected static final Logger LOG = LoggerFactory.getLogger(AbstractHttpClientPlugin.class);

@Override
public final Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
// shenyu Context
final ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
// uri
final URI uri = exchange.getAttribute(Constants.HTTP_URI);
if (Objects.isNull(uri)) {
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_URL, null);
return WebFluxResultUtils.result(exchange, error);
}
// get time out
final long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
final Duration duration = Duration.ofMillis(timeout);
// get retry times
final int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
// get retry strategy
final String retryStrategy = (String) Optional.ofNullable(exchange.getAttribute(Constants.RETRY_STRATEGY)).orElseGet(RetryEnum.CURRENT::getName);
LOG.info("The request urlPath is {}, retryTimes is {}, retryStrategy is {}", uri.toASCIIString(), retryTimes, retryStrategy);
// build header
final HttpHeaders httpHeaders = buildHttpHeaders(exchange);
// do request
final Mono<R> response = doRequest(exchange, exchange.getRequest().getMethodValue(), uri, httpHeaders, exchange.getRequest().getBody())
.timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))
.doOnError(e -> LOG.error(e.getMessage(), e));

// Retry Policy CURRENT, retries the current service.
if (RetryEnum.CURRENT.getName().equals(retryStrategy)) {
//old version of DividePlugin and SpringCloudPlugin will run on this
return response.retryWhen(Retry.anyOf(TimeoutException.class, ConnectTimeoutException.class, ReadTimeoutException.class, IllegalStateException.class)
.retryMax(retryTimes)
.backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
.onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))
.flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));
}

// Retry for other services
// Exclude services that have already been called
final Set<URI> exclude = Sets.newHashSet(uri);
// resend
return resend(response, exchange, duration, httpHeaders, exclude, retryTimes)
.onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))
.flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));
}

private Mono<R> resend(final Mono<R> clientResponse,
final ServerWebExchange exchange,
final Duration duration,
final HttpHeaders httpHeaders,
final Set<URI> exclude,
final int retryTimes) {
Mono<R> result = clientResponse;
// Retry according to the specified number of retries
for (int i = 0; i < retryTimes; i++) {
result = resend(result, exchange, duration, httpHeaders, exclude);
}
return result;
}

private Mono<R> resend(final Mono<R> response,
final ServerWebExchange exchange,
final Duration duration,
final HttpHeaders httpHeaders,
final Set<URI> exclude) {
return response.onErrorResume(th -> {
final String selectorId = exchange.getAttribute(Constants.DIVIDE_SELECTOR_ID);
final String loadBalance = exchange.getAttribute(Constants.LOAD_BALANCE);
//Check available services
final List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId)
.stream().filter(data -> {
final String trimUri = data.getUrl().trim();
for (URI needToExclude : exclude) {
// exclude already called
if ((needToExclude.getHost() + ":" + needToExclude.getPort()).equals(trimUri)) {
return false;
}
}
return true;
}).collect(Collectors.toList());
if (CollectionUtils.isEmpty(upstreamList)) {
// no need to retry anymore
return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));
}
// request ip
final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
// Load Balance
final Upstream upstream = LoadBalancerFactory.selector(upstreamList, loadBalance, ip);
if (Objects.isNull(upstream)) {
// no need to retry anymore
return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));
}
final URI newUri = RequestUrlUtils.buildRequestUri(exchange, upstream.buildDomain());
// Exclude uri that has already been called
exclude.add(newUri);
// Make another call
return doRequest(exchange, exchange.getRequest().getMethodValue(), newUri, httpHeaders, exchange.getRequest().getBody())
.timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))
.doOnError(e -> LOG.error(e.getMessage(), e));
});
}

//......
}
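
The failover idea in resend() (exclude the upstreams that were already called, then pick another one) can be illustrated without the reactive types. This is a simplified, blocking sketch under stated assumptions: upstreams are plain "host:port" strings, the first remaining candidate stands in for the load balancer, and FailoverSketch is a hypothetical class, not ShenYu code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical simplification of the failover selection in resend(); not ShenYu code.
final class FailoverSketch {

    // Returns the upstreams that have not been called yet.
    static List<String> candidates(final List<String> upstreams, final Set<String> exclude) {
        List<String> result = new ArrayList<>();
        for (String upstream : upstreams) {
            if (!exclude.contains(upstream.trim())) {
                result.add(upstream);
            }
        }
        return result;
    }

    // Picks the next upstream and records it as called.
    // An empty result means there is nothing left to retry against.
    static Optional<String> next(final List<String> upstreams, final Set<String> exclude) {
        List<String> remaining = candidates(upstreams, exclude);
        if (remaining.isEmpty()) {
            return Optional.empty();
        }
        String chosen = remaining.get(0); // stand-in for the load balancer
        exclude.add(chosen.trim());
        return Optional.of(chosen);
    }
}
```

Because the exclude set grows with every attempt, each retry targets a different upstream, and the retries stop early once every upstream has been tried.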

  • org.apache.shenyu.plugin.httpclient.WebClientPlugin#doRequest()

Initiate a real request call via webClient in the doRequest() method.


@Override
protected Mono<ClientResponse> doRequest(final ServerWebExchange exchange, final String httpMethod, final URI uri,
final HttpHeaders httpHeaders, final Flux<DataBuffer> body) {
return webClient.method(HttpMethod.valueOf(httpMethod)).uri(uri) // uri
.headers(headers -> headers.addAll(httpHeaders)) // header
.body(BodyInserters.fromDataBuffers(body))
.exchange() // request
.doOnSuccess(res -> {
if (res.statusCode().is2xxSuccessful()) { // success
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
} else { // error
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
}
exchange.getResponse().setStatusCode(res.statusCode());
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);
});
}

2.5 Response Result

  • org.apache.shenyu.plugin.response.ResponsePlugin#execute()

The response results are handled by the ResponsePlugin plugin.

    @Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
// Processing results according to rpc type
return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);
}
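
The lookup writerMap.get(rpcType) is a strategy dispatch keyed by RPC type. A minimal sketch of that pattern follows; the Writer interface, the class WriterDispatchSketch, and the keys "http" and "dubbo" are assumptions chosen for illustration rather than ShenYu's actual types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of dispatching to a writer by RPC type; not ShenYu code.
final class WriterDispatchSketch {

    // Stand-in for a message writer: turns a raw body into a response string.
    interface Writer {
        String write(String body);
    }

    private final Map<String, Writer> writers = new HashMap<>();

    WriterDispatchSketch() {
        // Assumed keys; each RPC type gets its own writer implementation.
        writers.put("http", body -> "web-client:" + body);
        writers.put("dubbo", body -> "rpc:" + body);
    }

    // Selects the writer registered for the RPC type and delegates to it.
    String handle(final String rpcType, final String body) {
        Writer writer = writers.get(rpcType);
        if (writer == null) {
            throw new IllegalArgumentException("no writer for rpc type: " + rpcType);
        }
        return writer.write(body);
    }
}
```

Unlike the snippet above, this sketch fails with an explicit exception when no writer is registered for the type.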

The processing type is determined by MessageWriter and the class inheritance relationship is as follows.

  • MessageWriter: interface, defining message processing methods.
  • NettyClientMessageWriter: processing of Netty call results.
  • RPCMessageWriter: processing the results of RPC calls.
  • WebClientMessageWriter: processing WebClient call results.

The default is to initiate http requests via WebClient.

  • org.apache.shenyu.plugin.response.strategy.WebClientMessageWriter#writeWith()

Process the response results in the writeWith() method.


@Override
public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
return chain.execute(exchange).then(Mono.defer(() -> {
// get response
ServerHttpResponse response = exchange.getResponse();
ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
if (Objects.isNull(clientResponse)) {
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);
return WebFluxResultUtils.result(exchange, error);
}
//cookies and headers
response.getCookies().putAll(clientResponse.cookies());
response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
// image, pdf or stream does not do format processing.
// Handling special response types
if (clientResponse.headers().contentType().isPresent()) {
final String media = clientResponse.headers().contentType().get().toString().toLowerCase();
if (media.matches(COMMON_BIN_MEDIA_TYPE_REGEX)) {
return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()))
.doOnCancel(() -> clean(exchange));
}
}
// Handling general response types
clientResponse = ResponseUtils.buildClientResponse(response, clientResponse.body(BodyExtractors.toDataBuffers()));
return clientResponse.bodyToMono(byte[].class)
.flatMap(originData -> WebFluxResultUtils.result(exchange, originData))
.doOnCancel(() -> clean(exchange));
}));
}

This completes the source code analysis of the divide plugin. The analysis flow chart is as follows.

3. Summary

This article analyzed the source code from http service registration through to the service call made by the divide plugin. The divide plugin is mainly used to handle http requests. Some parts of the source code, such as the load balancing implementation and service health probing, were not covered in depth and will be analyzed in later articles.

· 22 min read

Apache ShenYu is an asynchronous, high-performance, cross-language, responsive API gateway.

The Apache ShenYu gateway uses the dubbo plugin to make calls to the dubbo service. You can see the official documentation Dubbo Quick Start to learn how to use the plugin.

This article is based on shenyu-2.4.3 version for source code analysis, please refer to Dubbo Service Access for the introduction of the official website.

1. Service Registration

Take the example provided on the official website shenyu-examples-dubbo. Suppose your dubbo service is defined as follows (spring-dubbo.xml).

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://code.alibabatech.com/schema/dubbo
https://code.alibabatech.com/schema/dubbo/dubbo.xsd">

<dubbo:application name="test-dubbo-service"/>
<dubbo:registry address="${dubbo.registry.address}"/>
<dubbo:protocol name="dubbo" port="20888"/>

<dubbo:service timeout="10000" interface="org.apache.shenyu.examples.dubbo.api.service.DubboTestService" ref="dubboTestService"/>

</beans>

This declares the application name, the registry address, the dubbo protocol, the service interface, and the corresponding interface implementation class.

/**
* DubboTestServiceImpl.
*/
@Service("dubboTestService")
public class DubboTestServiceImpl implements DubboTestService {

@Override
@ShenyuDubboClient(path = "/findById", desc = "Query by Id")
public DubboTest findById(final String id) {
return new DubboTest(id, "hello world shenyu Apache, findById");
}

//......
}

In the interface implementation class, use the annotation @ShenyuDubboClient to register the service with shenyu-admin. The role of this annotation and its rationale will be analyzed later.

The configuration in the application.yml file is as follows.

server:
  port: 8011
  address: 0.0.0.0
  servlet:
    context-path: /
spring:
  main:
    allow-bean-definition-overriding: true
dubbo:
  registry:
    address: zookeeper://localhost:2181 # dubbo registry

shenyu:
  register:
    registerType: http
    serverLists: http://localhost:9095
    props:
      username: admin
      password: 123456
  client:
    dubbo:
      props:
        contextPath: /dubbo
        appName: dubbo

The configuration file declares the registry address used by dubbo. The dubbo service registers with shenyu-admin over http, and the registration address is http://localhost:9095.

See Application Client Access for more information on the use of the registration method.

1.1 Declaration of registration interface

Use the annotation @ShenyuDubboClient to register the service to the gateway. The simple demo is as follows.

// dubbo service
@Service("dubboTestService")
public class DubboTestServiceImpl implements DubboTestService {

@Override
@ShenyuDubboClient(path = "/findById", desc = "Query by Id") // method to be registered
public DubboTest findById(final String id) {
return new DubboTest(id, "hello world shenyu Apache, findById");
}

//......
}

annotation definition:

/**
* Works on classes and methods
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@Inherited
public @interface ShenyuDubboClient {

//path
String path();

//rule name
String ruleName() default "";

//desc
String desc() default "";

//enabled
boolean enabled() default true;
}

1.2 Scan annotation information

Annotation scanning is done by ApacheDubboServiceBeanListener, which implements the ApplicationListener<ContextRefreshedEvent> interface. When a context refresh event occurs during Spring container startup, the event handler method onApplicationEvent() is executed.

The constructor does the following.

  • Read property configuration
  • Start the thread pool
  • Start the registry for registering with shenyu-admin
public class ApacheDubboServiceBeanListener implements ApplicationListener<ContextRefreshedEvent> {

// ......

//Constructor
public ApacheDubboServiceBeanListener(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
//1.Read property configuration
Properties props = clientConfig.getProps();
String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
String appName = props.getProperty(ShenyuClientConstants.APP_NAME);
if (StringUtils.isBlank(contextPath)) {
throw new ShenyuClientIllegalArgumentException("apache dubbo client must config the contextPath or appName");
}
this.contextPath = contextPath;
this.appName = appName;
this.host = props.getProperty(ShenyuClientConstants.HOST);
this.port = props.getProperty(ShenyuClientConstants.PORT);
//2.Start the thread pool
executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("shenyu-apache-dubbo-client-thread-pool-%d").build());
//3.Start the registry for registering with `shenyu-admin`
publisher.start(shenyuClientRegisterRepository);
}

/**
* Context refresh event, execute method logic
*/
@Override
public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
//......
}
  • ApacheDubboServiceBeanListener#onApplicationEvent()

Overridden method logic: read the Dubbo ServiceBean instances, build the metadata and URI objects, and register them with shenyu-admin.

    @Override
public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
//read ServiceBean
Map<String, ServiceBean> serviceBean = contextRefreshedEvent.getApplicationContext().getBeansOfType(ServiceBean.class);
if (serviceBean.isEmpty()) {
return;
}
//The method is guaranteed to be executed only once
if (!registered.compareAndSet(false, true)) {
return;
}
//handle metadata
for (Map.Entry<String, ServiceBean> entry : serviceBean.entrySet()) {
handler(entry.getValue());
}
//handle URI
serviceBean.values().stream().findFirst().ifPresent(bean -> {
publisher.publishEvent(buildURIRegisterDTO(bean));
});
}
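
The compareAndSet guard is what keeps the registration from running again on later context refresh events. A minimal sketch of the same guard, assuming a hypothetical class RunOnceSketch with a counter in place of the real registration work:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the run-once guard used in onApplicationEvent(); not ShenYu code.
final class RunOnceSketch {

    private final AtomicBoolean registered = new AtomicBoolean(false);

    // Counts how many times the guarded work actually ran.
    private final AtomicInteger runs = new AtomicInteger();

    // Only the first caller flips the flag from false to true and does the work.
    void onEvent() {
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        runs.incrementAndGet();
    }

    int runs() {
        return runs.get();
    }
}
```

Because compareAndSet is atomic, the guard also holds if the event is delivered from several threads at once.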
  • handler()

    In the handler() method, read all methods from the serviceBean, determine if there is a ShenyuDubboClient annotation on the method, build a metadata object if it exists, and register the method with shenyu-admin through the registry.

    private void handler(final ServiceBean<?> serviceBean) {
//get proxy
Object refProxy = serviceBean.getRef();
//get class
Class<?> clazz = refProxy.getClass();
if (AopUtils.isAopProxy(refProxy)) {
clazz = AopUtils.getTargetClass(refProxy);
}
//all methods
Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);
for (Method method : methods) {
//read ShenyuDubboClient annotation
ShenyuDubboClient shenyuDubboClient = method.getAnnotation(ShenyuDubboClient.class);
if (Objects.nonNull(shenyuDubboClient)) {
//build metadata and register
publisher.publishEvent(buildMetaDataDTO(serviceBean, shenyuDubboClient, method));
}
}
}
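
The scan in handler() relies on runtime-retained annotations and reflection. The sketch below shows the same mechanism with a stand-in annotation; DemoClient, DemoService, and AnnotationScanSketch are hypothetical names, and the sketch uses getDeclaredMethods() rather than Spring's ReflectionUtils so that it needs no dependencies.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of annotation scanning; DemoClient stands in for @ShenyuDubboClient.
final class AnnotationScanSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoClient {
        String path();
    }

    static class DemoService {
        @DemoClient(path = "/findById")
        public String findById(final String id) {
            return id;
        }

        // No annotation, so this method is skipped by the scan.
        public String notRegistered() {
            return "";
        }
    }

    // Collects the path of every method that carries the annotation.
    static List<String> scanPaths(final Class<?> clazz) {
        List<String> paths = new ArrayList<>();
        for (Method method : clazz.getDeclaredMethods()) {
            DemoClient annotation = method.getAnnotation(DemoClient.class);
            if (annotation != null) {
                paths.add(annotation.path());
            }
        }
        Collections.sort(paths); // reflection order is unspecified, so sort for stable output
        return paths;
    }
}
```

The annotation must use RetentionPolicy.RUNTIME, as @ShenyuDubboClient does, otherwise getAnnotation() returns null.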
  • buildMetaDataDTO()

    Builds the metadata object, which carries the information needed to register the method and is later used for selector and rule matching.

    private MetaDataRegisterDTO buildMetaDataDTO(final ServiceBean<?> serviceBean, final ShenyuDubboClient shenyuDubboClient, final Method method) {
//app name
String appName = buildAppName(serviceBean);
//path
String path = contextPath + shenyuDubboClient.path();
//desc
String desc = shenyuDubboClient.desc();
//service name
String serviceName = serviceBean.getInterface();
//rule name
String configRuleName = shenyuDubboClient.ruleName();
String ruleName = ("".equals(configRuleName)) ? path : configRuleName;
//method name
String methodName = method.getName();
//parameter Types
Class<?>[] parameterTypesClazz = method.getParameterTypes();
String parameterTypes = Arrays.stream(parameterTypesClazz).map(Class::getName).collect(Collectors.joining(","));
return MetaDataRegisterDTO.builder()
.appName(appName)
.serviceName(serviceName)
.methodName(methodName)
.contextPath(contextPath)
.host(buildHost())
.port(buildPort(serviceBean))
.path(path)
.ruleName(ruleName)
.pathDesc(desc)
.parameterTypes(parameterTypes)
.rpcExt(buildRpcExt(serviceBean)) //dubbo ext
.rpcType(RpcTypeEnum.DUBBO.getName())
.enabled(shenyuDubboClient.enabled())
.build();
}
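
Three of the fields built above are derived rather than copied: the full path, the default rule name, and the comma-separated parameter types. A small sketch of those derivations follows, assuming a hypothetical class MetaDataFieldsSketch; the find() helper exists only to keep the example free of checked exceptions.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical sketch of the derived fields in buildMetaDataDTO(); not ShenYu code.
final class MetaDataFieldsSketch {

    // Full path = context path + path declared on the annotation.
    static String path(final String contextPath, final String annotationPath) {
        return contextPath + annotationPath;
    }

    // The rule name falls back to the path when none is configured.
    static String ruleName(final String configRuleName, final String path) {
        return "".equals(configRuleName) ? path : configRuleName;
    }

    // Joins the fully qualified parameter type names with commas.
    static String parameterTypes(final Method method) {
        return Arrays.stream(method.getParameterTypes())
                .map(Class::getName)
                .collect(Collectors.joining(","));
    }

    // Illustration-only helper: looks up a public method without a checked exception.
    static Method find(final Class<?> type, final String name, final Class<?>... params) {
        try {
            return type.getMethod(name, params);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

For example, with contextPath /dubbo and annotation path /findById, the registered path is /dubbo/findById, and that path is also used as the rule name when ruleName is left empty.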
  • buildRpcExt()

    Builds the dubbo extension information.

   private String buildRpcExt(final ServiceBean serviceBean) {
DubboRpcExt build = DubboRpcExt.builder()
.group(StringUtils.isNotEmpty(serviceBean.getGroup()) ? serviceBean.getGroup() : "")//group
.version(StringUtils.isNotEmpty(serviceBean.getVersion()) ? serviceBean.getVersion() : "")//version
.loadbalance(StringUtils.isNotEmpty(serviceBean.getLoadbalance()) ? serviceBean.getLoadbalance() : Constants.DEFAULT_LOADBALANCE)//load balance
.retries(Objects.isNull(serviceBean.getRetries()) ? Constants.DEFAULT_RETRIES : serviceBean.getRetries())//retry
.timeout(Objects.isNull(serviceBean.getTimeout()) ? Constants.DEFAULT_CONNECT_TIMEOUT : serviceBean.getTimeout())//time
.sent(Objects.isNull(serviceBean.getSent()) ? Constants.DEFAULT_SENT : serviceBean.getSent())//sent
.cluster(StringUtils.isNotEmpty(serviceBean.getCluster()) ? serviceBean.getCluster() : Constants.DEFAULT_CLUSTER)//cluster
.url("")
.build();
return GsonUtils.getInstance().toJson(build);
}
  • buildURIRegisterDTO()

    Builds the URI object that registers information about the service itself, which can later be used for service health probing.

private URIRegisterDTO buildURIRegisterDTO(final ServiceBean serviceBean) {
return URIRegisterDTO.builder()
.contextPath(this.contextPath) //context path
.appName(buildAppName(serviceBean))//app name
.rpcType(RpcTypeEnum.DUBBO.getName())//dubbo
.host(buildHost()) //host
.port(buildPort(serviceBean))//port
.build();
}

The specific registration logic is implemented by the registry. Please refer to Client Access Principles.

// Publish registration events to the registry
publisher.publishEvent();

1.3 Processing registration information

The metadata and URI data registered by the client through the registry are processed by shenyu-admin, which stores them in the database and synchronizes them to the ShenYu gateway. The client registration logic for the Dubbo plugin is in ShenyuClientRegisterDubboServiceImpl. The inheritance relationship is as follows.

  • ShenyuClientRegisterService: client registration service, top-level interface.
  • FallbackShenyuClientRegisterService: provides a retry operation when registration fails.
  • AbstractShenyuClientRegisterServiceImpl: abstract class, implements part of the public registration logic.
  • ShenyuClientRegisterDubboServiceImpl: implementation of the Dubbo plugin registration.
1.3.1 Registration Service
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()

    The MetaDataRegisterDTO metadata object registered by the client through the registry is received and persisted in the register() method of shenyu-admin.

   @Override
public String register(final MetaDataRegisterDTO dto) {
//1. register selector
String selectorHandler = selectorHandler(dto);
String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
//2. register rule
String ruleHandler = ruleHandler();
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
ruleService.registerDefault(ruleDTO);
//3. register metadata
registerMetadata(dto);
//4. register contextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
1.3.1.1 Register Selector
  • org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()

Build the contextPath and check whether the selector already exists. If it does, return its id; if not, create the default selector.

    @Override
public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {
// build contextPath
String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());
// Find if selector information exists by name
SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);
if (Objects.isNull(selectorDO)) {
// Create a default selector if it does not exist
return registerSelector(contextPath, pluginName, selectorHandler);
}
return selectorDO.getId();
}
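
The find-or-create logic makes selector registration idempotent: registering the same contextPath for the same plugin twice returns the same id. A minimal in-memory sketch of that behaviour, assuming a hypothetical class SelectorRegistrySketch with a map in place of the database:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory sketch of find-or-create selector registration; not ShenYu code.
final class SelectorRegistrySketch {

    // Key: plugin name + context path. Value: selector id.
    private final Map<String, String> idsByKey = new HashMap<>();

    private int nextId = 1;

    // Returns the existing selector id, or creates a new one on first registration.
    String registerDefault(final String contextPath, final String pluginName) {
        String key = pluginName + ":" + contextPath;
        String existing = idsByKey.get(key);
        if (existing != null) {
            return existing;
        }
        String id = "selector-" + nextId++;
        idsByKey.put(key, id);
        return id;
    }
}
```

This is why every annotated method of one service ends up under the same selector: they all share one contextPath.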
  • Default selector information

    Construct the default selector information and its conditional properties here.

   //register selector
private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {
//build selector
SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());
selectorDTO.setHandle(selectorHandler);
//register default selector
return registerDefault(selectorDTO);
}
//build selector
private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {
//build default
SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);
selectorDTO.setPluginId(pluginId);
//build the conditional properties of the default selector
selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));
return selectorDTO;
}
  • Build default selector
private SelectorDTO buildDefaultSelectorDTO(final String name) {
return SelectorDTO.builder()
.name(name) // name
.type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // default type custom
.matchMode(MatchModeEnum.AND.getCode()) //default match mode
.enabled(Boolean.TRUE) //enable
.loged(Boolean.TRUE) //log
.continued(Boolean.TRUE)
.sort(1)
.build();
}
  • Build default selector conditional properties
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {
SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();
selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // default URI
selectorConditionDTO.setParamName("/");
selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // default match
selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX);
return Collections.singletonList(selectorConditionDTO);
}
  • Register default selector
@Override
public String registerDefault(final SelectorDTO selectorDTO) {
//selector information
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
//selector conditional properties
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
if (StringUtils.isEmpty(selectorDTO.getId())) {
// insert selector information into the database
selectorMapper.insertSelective(selectorDO);
// inserting selector conditional properties to the database
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
}
// Publish synchronization events to synchronize the selector information and its conditional properties to the gateway
publishEvent(selectorDO, selectorConditionDTOs);
return selectorDO.getId();
}
1.3.1.2 Registration Rules

In the second step of service registration, build the default rule and then register it.

@Override
public String register(final MetaDataRegisterDTO dto) {
//1. handle selector
//......

//2. handle rule

String ruleHandler = ruleHandler();
// build default rule
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
// register rule
ruleService.registerDefault(ruleDTO);

//3. register metadata
//......

//4. register ContextPath
//......

return ShenyuResultMessage.SUCCESS;
}
  • Default rule handle properties
    @Override
protected String ruleHandler() {
// default rule
return new DubboRuleHandle().toJson();
}

Dubbo plugin default rule handling properties.

public class DubboRuleHandle implements RuleHandle {

/**
* dubbo version.
*/
private String version;

/**
* group.
*/
private String group;

/**
* retry.
*/
private Integer retries = 0;

/**
* loadbalance:RANDOM
*/
private String loadbalance = LoadBalanceEnum.RANDOM.getName();

/**
* timeout default 3000
*/
private long timeout = Constants.TIME_OUT;
}
  • build default rule
  // build default rule
private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {
return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());
}
// build default rule
private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {
RuleDTO ruleDTO = RuleDTO.builder()
.selectorId(selectorId)
.name(ruleName)
.matchMode(MatchModeEnum.AND.getCode())
.enabled(Boolean.TRUE)
.loged(Boolean.TRUE)
.sort(1)
.handle(ruleHandler)
.build();
RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()
.paramType(ParamTypeEnum.URI.getName())
.paramName("/")
.paramValue(path)
.build();
if (path.indexOf("*") > 1) {
ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias());
} else {
ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias());
}
ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));
return ruleDTO;
}
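
The operator of the default rule condition depends only on where the first "*" appears in the path. The sketch below isolates that decision; the Operator enum and the class RuleOperatorSketch are hypothetical stand-ins for OperatorEnum and the admin service.

```java
// Hypothetical sketch of the operator choice in buildRuleDTO(); not ShenYu code.
final class RuleOperatorSketch {

    enum Operator { MATCH, EQ }

    // Same condition as the source: a "*" at index 2 or later selects pattern matching,
    // anything else selects exact equality.
    static Operator choose(final String path) {
        return path.indexOf("*") > 1 ? Operator.MATCH : Operator.EQ;
    }

    public static void main(final String[] args) {
        System.out.println(choose("/dubbo/findById"));
        System.out.println(choose("/dubbo/**"));
    }
}
```

A concrete path such as /dubbo/findById gets an exact-match condition, while a wildcard path such as /dubbo/** gets a pattern-match condition. Note that with this condition a path whose first "*" sits at index 0 or 1, for example /*, is treated as an exact match.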
  • org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()

Register the rule: insert records into the database and publish events to the gateway for data synchronization.


@Override
public String registerDefault(final RuleDTO ruleDTO) {
RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());
if (Objects.nonNull(exist)) {
return "";
}

RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);
List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();
if (StringUtils.isEmpty(ruleDTO.getId())) {
// insert rule information into the database
ruleMapper.insertSelective(ruleDO);
//insert rule body conditional attributes into the database
ruleConditions.forEach(ruleConditionDTO -> {
ruleConditionDTO.setRuleId(ruleDO.getId());
ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));
});
}
// Publish events to the gateway for data synchronization
publishEvent(ruleDO, ruleConditions);
return ruleDO.getId();
}

1.3.1.3 Register Metadata

Metadata is mainly used for RPC service calls.

   @Override
public String register(final MetaDataRegisterDTO dto) {
//1. register selector
//......

//2. register rule
//......

//3. register metadata
registerMetadata(dto);

//4. register ContextPath
//......

return ShenyuResultMessage.SUCCESS;
}
  • org.apache.shenyu.admin.service.register.ShenyuClientRegisterDubboServiceImpl#registerMetadata()

    Insert or update metadata and then publish sync events to the gateway.

    @Override
protected void registerMetadata(final MetaDataRegisterDTO dto) {
// get metaDataService
MetaDataService metaDataService = getMetaDataService();
MetaDataDO exist = metaDataService.findByPath(dto.getPath());
//insert or update metadata
metaDataService.saveOrUpdateMetaData(exist, dto);
}

@Override
public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {
DataEventTypeEnum eventType;
// DTO->DO
MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);
// insert data
if (Objects.isNull(exist)) {
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());
metaDataDO.setDateCreated(currentTime);
metaDataDO.setDateUpdated(currentTime);
metaDataMapper.insert(metaDataDO);
eventType = DataEventTypeEnum.CREATE;
} else {
// update
metaDataDO.setId(exist.getId());
metaDataMapper.update(metaDataDO);
eventType = DataEventTypeEnum.UPDATE;
}
// Publish sync events to gateway
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,
Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));
}
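
saveOrUpdateMetaData() is an upsert that also decides which event type to publish. A minimal in-memory sketch of that decision, assuming a hypothetical class MetaDataUpsertSketch that keys metadata by path and uses a map in place of the mapper:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory sketch of the metadata upsert; not ShenYu code.
final class MetaDataUpsertSketch {

    enum EventType { CREATE, UPDATE }

    // Key: path. Value: serialized metadata.
    private final Map<String, String> store = new HashMap<>();

    // Inserts or overwrites the record and reports which event should be published.
    EventType saveOrUpdate(final String path, final String metaData) {
        EventType type = store.containsKey(path) ? EventType.UPDATE : EventType.CREATE;
        store.put(path, metaData);
        return type;
    }

    String get(final String path) {
        return store.get(path);
    }
}
```

Restarting a client therefore re-registers its metadata as an update rather than creating duplicates.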
1.3.2 Register URI
  • org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()

The server side receives the URI information registered by the client and processes it.

    @Override
public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {
String result;
String key = key(selectorName);
try {
this.removeFallBack(key);
// register URI
result = this.doRegisterURI(selectorName, uriList);
logger.info("Register success: {},{}", selectorName, uriList);
} catch (Exception ex) {
logger.warn("Register exception: cause:{}", ex.getMessage());
result = "";
// Retry after registration failure
this.addFallback(key, new FallbackHolder(selectorName, uriList));
}
return result;
}
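
The fallback wrapper follows a simple pattern: clear any pending retry, attempt the registration, and park the request for a later retry if it throws. The sketch below shows that pattern only; FallbackRegisterSketch, its map of pending requests, and the Function parameter are assumptions for illustration, and the scheduling of the actual retry is left out.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of register-with-fallback; not ShenYu code.
final class FallbackRegisterSketch {

    // Requests that failed and are waiting for a retry, keyed by selector name.
    private final Map<String, List<String>> fallbacks = new ConcurrentHashMap<>();

    // Tries the registration; on failure, parks the request and returns an empty result.
    String register(final String selectorName,
                    final List<String> uriList,
                    final Function<List<String>, String> doRegister) {
        try {
            fallbacks.remove(selectorName);
            return doRegister.apply(uriList);
        } catch (RuntimeException ex) {
            fallbacks.put(selectorName, uriList);
            return "";
        }
    }

    int pending() {
        return fallbacks.size();
    }
}
```

A successful call removes the pending entry first, so a request that finally goes through is not retried again.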
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()

Get a valid URI from the URI registered by the client, update the corresponding selector handle property, and send a selector update event to the gateway.

@Override
public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {
//check
if (CollectionUtils.isEmpty(uriList)) {
return "";
}

SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
if (Objects.isNull(selectorDO)) {
throw new ShenyuException("doRegister Failed to execute,wait to retry.");
}
// get valid URI
List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());
// build handle
String handler = buildHandle(validUriList, selectorDO);
if (handler != null) {
selectorDO.setHandle(handler);
SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
selectorData.setHandle(handler);
// Update the handle property of the selector to the database
selectorService.updateSelective(selectorDO);
// Send selector update events to the gateway
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
}
return ShenyuResultMessage.SUCCESS;
}
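The validity filter in doRegisterURI() keeps only entries with a port and a non-blank host. A minimal sketch of that step follows; `UriSketch` is a hypothetical stand-in for URIRegisterDTO, and rendering the result as "host:port" strings is an assumption made for illustration, not the format produced by buildHandle().

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical stand-in for URIRegisterDTO with only the fields used here.
class UriSketch {
    final String host;
    final Integer port;

    UriSketch(final String host, final Integer port) {
        this.host = host;
        this.port = port;
    }

    // Keep only entries that have both a port and a non-blank host.
    static List<UriSketch> valid(final List<UriSketch> uris) {
        return uris.stream()
                .filter(u -> Objects.nonNull(u.port) && u.host != null && !u.host.trim().isEmpty())
                .collect(Collectors.toList());
    }

    // Render the valid entries as "host:port" addresses (illustrative format).
    static List<String> upstreams(final List<UriSketch> uris) {
        return valid(uris).stream()
                .map(u -> u.host + ":" + u.port)
                .collect(Collectors.toList());
    }
}
```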

That completes the source code analysis of service registration; the analysis flow chart is as follows.

The next step is to analyze how the dubbo plugin initiates calls to the dubbo service based on this information.

2. Service Invocation

The dubbo plugin is the core processing plugin used by the ShenYu gateway to convert http requests into the dubbo protocol and invoke the dubbo service.

Take the official Quick Start with Dubbo case as an example: a dubbo service is registered with shenyu-admin through the registry and is then requested through the ShenYu gateway proxy. The request is as follows.

GET http://localhost:9195/dubbo/findById?id=100
Accept: application/json

The class inheritance relationship in the Dubbo plugin is as follows.

  • ShenyuPlugin: top-level interface, defining interface methods.
  • AbstractShenyuPlugin: abstract class that implements plugin common logic.
  • AbstractDubboPlugin: dubbo plugin abstract class, implementing dubbo common logic.
  • ApacheDubboPlugin: ApacheDubbo plugin.

ShenYu Gateway supports both ApacheDubbo and AlibabaDubbo.

2.1 Receive requests

After passing the ShenYu gateway proxy, the request entry is ShenyuWebHandler, which implements the org.springframework.web.server.WebHandler interface.

public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {
//......

/**
* handle request
*/
@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
// execute default plugin chain
Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);
if (scheduled) {
return execute.subscribeOn(scheduler);
}
return execute;
}

private static class DefaultShenyuPluginChain implements ShenyuPluginChain {

private int index;

private final List<ShenyuPlugin> plugins;


DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {
this.plugins = plugins;
}

/**
* execute.
*/
@Override
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
// get plugin
ShenyuPlugin plugin = plugins.get(this.index++);
boolean skip = plugin.skip(exchange);
if (skip) {
// next
return this.execute(exchange);
}
// execute
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
}
}
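The index-driven chain above can be illustrated with a synchronous simplification. The real chain is reactive (it wraps each step in Mono.defer); the sketch below drops that and keeps only the "take the next plugin, skip or execute" logic. `PluginChainSketch`, its nested `Plugin` interface, and the `trace` list are hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, synchronous simplification of a plugin chain.
class PluginChainSketch {
    interface Plugin {
        String name();
        boolean skip(String request);
        void execute(String request, PluginChainSketch chain);
    }

    private final List<Plugin> plugins;
    private final List<String> trace = new ArrayList<>();
    private int index;

    PluginChainSketch(final List<Plugin> plugins) {
        this.plugins = plugins;
    }

    // Advance to the next plugin; skipped plugins are passed over.
    void execute(final String request) {
        if (index < plugins.size()) {
            Plugin plugin = plugins.get(index++);
            if (plugin.skip(request)) {
                execute(request);
                return;
            }
            trace.add(plugin.name());
            plugin.execute(request, this);
        }
    }

    // Names of the plugins that actually ran, in order.
    List<String> trace() {
        return trace;
    }

    // Helper that builds a plugin which simply continues the chain.
    static Plugin plugin(final String name, final boolean skip) {
        return new Plugin() {
            public String name() {
                return name;
            }

            public boolean skip(final String request) {
                return skip;
            }

            public void execute(final String request, final PluginChainSketch chain) {
                chain.execute(request);
            }
        };
    }
}
```

Because each plugin decides whether to call the chain again, a plugin can also stop processing early by not calling it, which is how an error response short-circuits the remaining plugins.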

2.2 Match Rule

  • org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()

Execute the matching logic for selectors and rules in the execute() method.

  • Matching selectors.
  • Matching rules.
  • Execute the plugin.
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
// plugin name
String pluginName = named();
// plugin data
PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
if (pluginData != null && pluginData.getEnabled()) {
// selector data
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
if (CollectionUtils.isEmpty(selectors)) {
return handleSelectorIfNull(pluginName, exchange, chain);
}
// match selector
SelectorData selectorData = matchSelector(exchange, selectors);
if (Objects.isNull(selectorData)) {
return handleSelectorIfNull(pluginName, exchange, chain);
}
selectorLog(selectorData, pluginName);
// rule data
List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
if (CollectionUtils.isEmpty(rules)) {
return handleRuleIfNull(pluginName, exchange, chain);
}
// match rule
RuleData rule;
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
//get last
rule = rules.get(rules.size() - 1);
} else {
rule = matchRule(exchange, rules);
}
if (Objects.isNull(rule)) {
return handleRuleIfNull(pluginName, exchange, chain);
}
ruleLog(rule, pluginName);
// execute
return doExecute(exchange, chain, selectorData, rule);
}
return chain.execute(exchange);
}
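The rule selection at the end of execute() has two branches: a full-flow selector takes the last rule, and any other selector is matched against the request. A minimal sketch, assuming that matching means "first rule whose condition accepts the path" (the excerpt does not show matchRule() itself); `RuleMatchSketch`, `Rule`, and `pick` are hypothetical names.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical sketch of the rule selection step.
class RuleMatchSketch {
    static final class Rule {
        final String name;
        final Predicate<String> condition;

        Rule(final String name, final Predicate<String> condition) {
            this.name = name;
            this.condition = condition;
        }
    }

    // Full-flow selectors take the last rule; otherwise the first matching rule wins.
    static Optional<String> pick(final boolean fullFlow, final List<Rule> rules, final String path) {
        if (rules.isEmpty()) {
            return Optional.empty();
        }
        if (fullFlow) {
            return Optional.of(rules.get(rules.size() - 1).name);
        }
        return rules.stream()
                .filter(rule -> rule.condition.test(path))
                .map(rule -> rule.name)
                .findFirst();
    }
}
```

An empty result corresponds to the handleRuleIfNull() branch in the code above.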

2.3 Execute GlobalPlugin

  • org.apache.shenyu.plugin.global.GlobalPlugin#execute()

GlobalPlugin is a global plugin that constructs contextual information in the execute() method.

public class GlobalPlugin implements ShenyuPlugin {
// shenyu context
private final ShenyuContextBuilder builder;

//......

@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
// build context information to be passed into the exchange
ShenyuContext shenyuContext = builder.build(exchange);
exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
return chain.execute(exchange);
}

//......
}
  • org.apache.shenyu.plugin.global.DefaultShenyuContextBuilder#build()

Build the default context information.

public class DefaultShenyuContextBuilder implements ShenyuContextBuilder {
//......

@Override
public ShenyuContext build(final ServerWebExchange exchange) {
//build data
Pair<String, MetaData> buildData = buildData(exchange);
//wrap ShenyuContext
return decoratorMap.get(buildData.getLeft()).decorator(buildDefaultContext(exchange.getRequest()), buildData.getRight());
}

private Pair<String, MetaData> buildData(final ServerWebExchange exchange) {
//......
//get the metadata according to the requested uri
MetaData metaData = MetaDataCache.getInstance().obtain(request.getURI().getPath());
if (Objects.nonNull(metaData) && Boolean.TRUE.equals(metaData.getEnabled())) {
exchange.getAttributes().put(Constants.META_DATA, metaData);
return Pair.of(metaData.getRpcType(), metaData);
} else {
return Pair.of(RpcTypeEnum.HTTP.getName(), new MetaData());
}
}
//set the default context information
private ShenyuContext buildDefaultContext(final ServerHttpRequest request) {
String appKey = request.getHeaders().getFirst(Constants.APP_KEY);
String sign = request.getHeaders().getFirst(Constants.SIGN);
String timestamp = request.getHeaders().getFirst(Constants.TIMESTAMP);
ShenyuContext shenyuContext = new ShenyuContext();
String path = request.getURI().getPath();
shenyuContext.setPath(path);
shenyuContext.setAppKey(appKey);
shenyuContext.setSign(sign);
shenyuContext.setTimestamp(timestamp);
shenyuContext.setStartDateTime(LocalDateTime.now());
Optional.ofNullable(request.getMethod()).ifPresent(httpMethod -> shenyuContext.setHttpMethod(httpMethod.name()));
return shenyuContext;
}
}
  • org.apache.shenyu.plugin.dubbo.common.context.DubboShenyuContextDecorator#decorator()

Wrap the ShenyuContext:

public class DubboShenyuContextDecorator implements ShenyuContextDecorator {

@Override
public ShenyuContext decorator(final ShenyuContext shenyuContext, final MetaData metaData) {
shenyuContext.setModule(metaData.getAppName());
shenyuContext.setMethod(metaData.getServiceName());
shenyuContext.setContextPath(metaData.getContextPath());
shenyuContext.setRpcType(RpcTypeEnum.DUBBO.getName());
return shenyuContext;
}

@Override
public String rpcType() {
return RpcTypeEnum.DUBBO.getName();
}
}
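The context builder picks a decorator by RPC type and falls back to HTTP when no enabled metadata matches the path. That lookup can be sketched as follows; the context is reduced to a plain map, and `ContextDecoratorSketch`, the map keys, and the lower-case type names are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch of "choose a context decorator by rpc type".
class ContextDecoratorSketch {
    // rpcType -> decorator that stamps the context; keys are illustrative.
    private static final Map<String, UnaryOperator<Map<String, String>>> DECORATORS = new HashMap<>();

    static {
        DECORATORS.put("dubbo", ctx -> {
            ctx.put("rpcType", "dubbo");
            return ctx;
        });
        DECORATORS.put("http", ctx -> {
            ctx.put("rpcType", "http");
            return ctx;
        });
    }

    // metaRpcType is null when no enabled metadata matched the request path.
    static Map<String, String> build(final String path, final String metaRpcType) {
        Map<String, String> ctx = new HashMap<>();
        ctx.put("path", path);
        String rpcType = metaRpcType == null ? "http" : metaRpcType;
        return DECORATORS.get(rpcType).apply(ctx);
    }
}
```

The sketch assumes every RPC type has a registered decorator; an unknown type would fail on the lookup.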

2.4 Execute RpcParamTransformPlugin

The RpcParamTransformPlugin is responsible for reading the parameters from the http request, saving them in the exchange and passing them to the rpc service.

  • org.apache.shenyu.plugin.base.RpcParamTransformPlugin#execute()

In the execute() method, the core logic of the plugin is executed: get the request information from exchange and process the parameters according to the form of content passed in by the request.

public class RpcParamTransformPlugin implements ShenyuPlugin {

@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
//get request information from exchange
ServerHttpRequest request = exchange.getRequest();
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
if (Objects.nonNull(shenyuContext)) {
// APPLICATION_JSON
MediaType mediaType = request.getHeaders().getContentType();
if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
return body(exchange, request, chain);
}
// APPLICATION_FORM_URLENCODED
if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {
return formData(exchange, request, chain);
}
//query
return query(exchange, request, chain);
}
return chain.execute(exchange);
}

//APPLICATION_JSON
private Mono<Void> body(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {
return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())
.flatMap(body -> {
exchange.getAttributes().put(Constants.PARAM_TRANSFORM, resolveBodyFromRequest(body));// parse the body and save it in the exchange
return chain.execute(exchange);
}));
}
// APPLICATION_FORM_URLENCODED
private Mono<Void> formData(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {
return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())
.flatMap(map -> {
String param = resolveBodyFromRequest(map);
LinkedMultiValueMap<String, String> linkedMultiValueMap;
try {
linkedMultiValueMap = BodyParamUtils.buildBodyParams(URLDecoder.decode(param, StandardCharsets.UTF_8.name())); // format the data
} catch (UnsupportedEncodingException e) {
return Mono.error(e);
}
exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.toMap(() -> linkedMultiValueMap));// save it in the exchange
return chain.execute(exchange);
}));
}
//query
private Mono<Void> query(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {
exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.ofString(() -> serverHttpRequest.getURI().getQuery()));// save it in the exchange
return chain.execute(exchange);
}
//......
}
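For the query branch, the essential work is turning a query string such as `id=100` into named parameters that can later be mapped onto the RPC method's arguments. The sketch below shows one way to do that with the standard library only. It is not HttpParamConverter; `QueryParamSketch` and `parse` are hypothetical names, and repeated keys simply overwrite earlier ones here.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of query-string parameter extraction.
class QueryParamSketch {
    // Turn "id=100&name=a%20b" into an ordered map of decoded values.
    static Map<String, String> parse(final String query) {
        Map<String, String> params = new LinkedHashMap<>();
        if (query == null || query.isEmpty()) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            params.put(decode(key), decode(value));
        }
        return params;
    }

    private static String decode(final String s) {
        try {
            return URLDecoder.decode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a mandatory charset, so this is not expected to happen.
            throw new IllegalStateException(e);
        }
    }
}
```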

2.5 Execute DubboPlugin

  • org.apache.shenyu.plugin.dubbo.common.AbstractDubboPlugin#doExecute()

In the doExecute() method, the main purpose is to check the metadata and parameters.

public abstract class AbstractDubboPlugin extends AbstractShenyuPlugin {

@Override
public Mono<Void> doExecute(final ServerWebExchange exchange,
final ShenyuPluginChain chain,
final SelectorData selector,
final RuleData rule) {
//param
String param = exchange.getAttribute(Constants.PARAM_TRANSFORM);
//context
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
//metaData
MetaData metaData = exchange.getAttribute(Constants.META_DATA);
//check metaData
if (!checkMetaData(metaData)) {
LOG.error(" path is : {}, meta data have error : {}", shenyuContext.getPath(), metaData);
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.META_DATA_ERROR, null);
return WebFluxResultUtils.result(exchange, error);
}
//check
if (Objects.nonNull(metaData) && StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(param)) {
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM, null);
return WebFluxResultUtils.result(exchange, error);
}
//set rpcContext
this.rpcContext(exchange);
//dubbo invoke
return this.doDubboInvoker(exchange, chain, selector, rule, metaData, param);
}
}
  • org.apache.shenyu.plugin.apache.dubbo.ApacheDubboPlugin#doDubboInvoker()

Set special context information in the doDubboInvoker() method, and then start the dubbo generalization call.

public class ApacheDubboPlugin extends AbstractDubboPlugin {

@Override
protected Mono<Void> doDubboInvoker(final ServerWebExchange exchange,
final ShenyuPluginChain chain,
final SelectorData selector,
final RuleData rule,
final MetaData metaData,
final String param) {
//set the current selector and rule information, and request address for dubbo graying support
RpcContext.getContext().setAttachment(Constants.DUBBO_SELECTOR_ID, selector.getId());
RpcContext.getContext().setAttachment(Constants.DUBBO_RULE_ID, rule.getId());
RpcContext.getContext().setAttachment(Constants.DUBBO_REMOTE_ADDRESS, Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress());
//dubbo generic invoker
final Mono<Object> result = dubboProxyService.genericInvoker(param, metaData, exchange);
//execute next plugin in chain
return result.then(chain.execute(exchange));
}
}
  • org.apache.shenyu.plugin.apache.dubbo.proxy.ApacheDubboProxyService#genericInvoker()

The genericInvoker() method performs the following steps.

  • Gets the ReferenceConfig object.
  • Gets the generalization service GenericService object.
  • Constructs the request parameter pair object.
  • Initiates an asynchronous generalization invocation.
public class ApacheDubboProxyService {
//......

/**
* Generic invoker object.
*/
public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws ShenyuException {
//1.Get the ReferenceConfig object
ReferenceConfig<GenericService> reference = ApacheDubboConfigCache.getInstance().get(metaData.getPath());

if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {
//Invalidate the current cache entry
ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());
//Reinitialization with metadata
reference = ApacheDubboConfigCache.getInstance().initRef(metaData);
}
//2.Get the GenericService object of the generalization service
GenericService genericService = reference.get();
//3.Constructing the request parameter pair object
Pair<String[], Object[]> pair;
if (StringUtils.isBlank(metaData.getParameterTypes()) || ParamCheckUtils.dubboBodyIsEmpty(body)) {
pair = new ImmutablePair<>(new String[]{}, new Object[]{});
} else {
pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());
}
//4.Initiating asynchronous generalization calls
return Mono.fromFuture(invokeAsync(genericService, metaData.getMethodName(), pair.getLeft(), pair.getRight()).thenApply(ret -> {
//handle result
if (Objects.isNull(ret)) {
ret = Constants.DUBBO_RPC_RESULT_EMPTY;
}
exchange.getAttributes().put(Constants.RPC_RESULT, ret);
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
return ret;
})).onErrorMap(exception -> exception instanceof GenericException ? new ShenyuException(((GenericException) exception).getExceptionMessage()) : new ShenyuException(exception));// handle exceptions
}

//Generalized calls, asynchronous operations
private CompletableFuture<Object> invokeAsync(final GenericService genericService, final String method, final String[] parameterTypes, final Object[] args) throws GenericException {
genericService.$invoke(method, parameterTypes, args);
Object resultFromFuture = RpcContext.getContext().getFuture();
return resultFromFuture instanceof CompletableFuture ? (CompletableFuture<Object>) resultFromFuture : CompletableFuture.completedFuture(resultFromFuture);
}
}
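The result handling in step 4 does two things: it replaces a null return value with a placeholder, and it maps any failure onto a single exception type. The same idea can be sketched with CompletableFuture alone, without Dubbo or Reactor; `AsyncResultSketch`, `EMPTY_RESULT`, and the use of IllegalStateException are assumptions made for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch of post-processing an asynchronous RPC result.
class AsyncResultSketch {
    // Placeholder used when the remote method returns nothing (illustrative value).
    static final String EMPTY_RESULT = "no return value";

    // Replace a null result with a placeholder and wrap failures in one exception type.
    static CompletableFuture<Object> normalize(final CompletableFuture<Object> call) {
        return call.handle((ret, ex) -> {
            if (ex != null) {
                // Unwrap the CompletionException that dependent stages may add.
                Throwable cause = ex instanceof CompletionException && ex.getCause() != null
                        ? ex.getCause() : ex;
                throw new IllegalStateException("rpc call failed: " + cause.getMessage(), cause);
            }
            return ret == null ? EMPTY_RESULT : ret;
        });
    }
}
```

Callers that block with join() see the failure wrapped in a CompletionException whose cause is the mapped exception.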

The gateway calls the dubbo service through a generic (generalized) invocation.

The ReferenceConfig object is the key object that supports generic invocation, and it is initialized during data synchronization. Two kinds of data are involved here: the synchronized plugin handler information and the synchronized plugin metadata.

  • org.apache.shenyu.plugin.dubbo.common.handler.AbstractDubboPluginDataHandler#handlerPlugin()

When the plugin data is updated, the data synchronization module synchronizes the data from shenyu-admin to the gateway. The initialization operation is performed in handlerPlugin().

public abstract class AbstractDubboPluginDataHandler implements PluginDataHandler {
//......

//Initializing the configuration cache
protected abstract void initConfigCache(DubboRegisterConfig dubboRegisterConfig);

@Override
public void handlerPlugin(final PluginData pluginData) {
if (Objects.nonNull(pluginData) && Boolean.TRUE.equals(pluginData.getEnabled())) {
//Data deserialization
DubboRegisterConfig dubboRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class);
DubboRegisterConfig exist = Singleton.INST.get(DubboRegisterConfig.class);
if (Objects.isNull(dubboRegisterConfig)) {
return;
}
if (Objects.isNull(exist) || !dubboRegisterConfig.equals(exist)) {
// Perform initialization operations
this.initConfigCache(dubboRegisterConfig);
}
Singleton.INST.single(DubboRegisterConfig.class, dubboRegisterConfig);
}
}
//......
}
  • org.apache.shenyu.plugin.apache.dubbo.handler.ApacheDubboPluginDataHandler#initConfigCache()

Perform initialization operations.

public class ApacheDubboPluginDataHandler extends AbstractDubboPluginDataHandler {

@Override
protected void initConfigCache(final DubboRegisterConfig dubboRegisterConfig) {
//perform initialization operations
ApacheDubboConfigCache.getInstance().init(dubboRegisterConfig);
//invalidate the previously cached results
ApacheDubboConfigCache.getInstance().invalidateAll();
}
}

  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#init()

In the initialization, set registryConfig and consumerConfig.

public final class ApacheDubboConfigCache extends DubboConfigCache {
//......
/**
* init
*/
public void init(final DubboRegisterConfig dubboRegisterConfig) {
//ApplicationConfig
if (Objects.isNull(applicationConfig)) {
applicationConfig = new ApplicationConfig("shenyu_proxy");
}
//When the protocol or address changes, you need to update the registryConfig
if (needUpdateRegistryConfig(dubboRegisterConfig)) {
RegistryConfig registryConfigTemp = new RegistryConfig();
registryConfigTemp.setProtocol(dubboRegisterConfig.getProtocol());
registryConfigTemp.setId("shenyu_proxy");
registryConfigTemp.setRegister(false);
registryConfigTemp.setAddress(dubboRegisterConfig.getRegister());
Optional.ofNullable(dubboRegisterConfig.getGroup()).ifPresent(registryConfigTemp::setGroup);
registryConfig = registryConfigTemp;
}
//ConsumerConfig
if (Objects.isNull(consumerConfig)) {
consumerConfig = ApplicationModel.getConfigManager().getDefaultConsumer().orElseGet(() -> {
ConsumerConfig consumerConfig = new ConsumerConfig();
consumerConfig.refresh();
return consumerConfig;
});

//ConsumerConfig
Optional.ofNullable(dubboRegisterConfig.getThreadpool()).ifPresent(consumerConfig::setThreadpool);
Optional.ofNullable(dubboRegisterConfig.getCorethreads()).ifPresent(consumerConfig::setCorethreads);
Optional.ofNullable(dubboRegisterConfig.getThreads()).ifPresent(consumerConfig::setThreads);
Optional.ofNullable(dubboRegisterConfig.getQueues()).ifPresent(consumerConfig::setQueues);
}
}

//Does the registration configuration need to be updated
private boolean needUpdateRegistryConfig(final DubboRegisterConfig dubboRegisterConfig) {
if (Objects.isNull(registryConfig)) {
return true;
}
return !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol())
|| !Objects.equals(dubboRegisterConfig.getRegister(), registryConfig.getAddress())
|| !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol());
}

//......
}
  • org.apache.shenyu.plugin.apache.dubbo.subscriber.ApacheDubboMetaDataSubscriber#onSubscribe()

When the metadata is updated, the data synchronization module synchronizes the data from shenyu-admin to the gateway. The metadata update operation is performed in the onSubscribe() method.

public class ApacheDubboMetaDataSubscriber implements MetaDataSubscriber {
//local memory cache
private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap();

//update metaData
public void onSubscribe(final MetaData metaData) {
// dubbo
if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
//Whether the corresponding metadata exists
MetaData exist = META_DATA.get(metaData.getPath());
if (Objects.isNull(exist) || Objects.isNull(ApacheDubboConfigCache.getInstance().get(metaData.getPath()))) {
// initRef
ApacheDubboConfigCache.getInstance().initRef(metaData);
} else {
// The corresponding metadata has undergone an update operation
if (!Objects.equals(metaData.getServiceName(), exist.getServiceName())
|| !Objects.equals(metaData.getRpcExt(), exist.getRpcExt())
|| !Objects.equals(metaData.getParameterTypes(), exist.getParameterTypes())
|| !Objects.equals(metaData.getMethodName(), exist.getMethodName())) {
//Build ReferenceConfig again based on the latest metadata
ApacheDubboConfigCache.getInstance().build(metaData);
}
}
//local memory cache
META_DATA.put(metaData.getPath(), metaData);
}
}

//delete
public void unSubscribe(final MetaData metaData) {
if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
//invalidate the ReferenceConfig
ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());
META_DATA.remove(metaData.getPath());
}
}
}
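The subscriber rebuilds a reference only when the metadata is new or when one of the fields that affect the reference has changed. A minimal sketch of that change detection follows. `MetaCacheSketch` and `Meta` are hypothetical names, only three of the compared fields are modeled, and a counter stands in for the actual ReferenceConfig build.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of "rebuild only when relevant metadata changed".
class MetaCacheSketch {
    static final class Meta {
        final String path;
        final String serviceName;
        final String methodName;
        final String parameterTypes;

        Meta(final String path, final String serviceName, final String methodName, final String parameterTypes) {
            this.path = path;
            this.serviceName = serviceName;
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
        }
    }

    private final Map<String, Meta> cache = new ConcurrentHashMap<>();
    private int builds;

    // Returns true when a reference had to be built or rebuilt.
    boolean onSubscribe(final Meta meta) {
        Meta exist = cache.get(meta.path);
        boolean rebuild = exist == null
                || !Objects.equals(meta.serviceName, exist.serviceName)
                || !Objects.equals(meta.methodName, exist.methodName)
                || !Objects.equals(meta.parameterTypes, exist.parameterTypes);
        if (rebuild) {
            builds++; // stands in for building a new reference
        }
        cache.put(meta.path, meta);
        return rebuild;
    }

    int builds() {
        return builds;
    }
}
```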
  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#initRef()

Build ReferenceConfig objects from metaData.

public final class ApacheDubboConfigCache extends DubboConfigCache {
//......

public ReferenceConfig<GenericService> initRef(final MetaData metaData) {
try {
//First try to get it from the cache, and return it directly if it exists
ReferenceConfig<GenericService> referenceConfig = cache.get(metaData.getPath());
if (StringUtils.isNoneBlank(referenceConfig.getInterface())) {
return referenceConfig;
}
} catch (ExecutionException e) {
LOG.error("init dubbo ref exception", e);
}

//build if not exist
return build(metaData);
}

/**
* Build reference config.
*/
@SuppressWarnings("deprecation")
public ReferenceConfig<GenericService> build(final MetaData metaData) {
if (Objects.isNull(applicationConfig) || Objects.isNull(registryConfig)) {
return new ReferenceConfig<>();
}
ReferenceConfig<GenericService> reference = new ReferenceConfig<>(); //ReferenceConfig
reference.setGeneric("true"); //generic invoke
reference.setAsync(true);//async

reference.setApplication(applicationConfig);//applicationConfig
reference.setRegistry(registryConfig);//registryConfig
reference.setConsumer(consumerConfig);//consumerConfig
reference.setInterface(metaData.getServiceName());//serviceName
reference.setProtocol("dubbo");//dubbo
reference.setCheck(false);
reference.setLoadbalance("gray");//gray

Map<String, String> parameters = new HashMap<>(2);
parameters.put("dispatcher", "direct");
reference.setParameters(parameters);

String rpcExt = metaData.getRpcExt();//rpc ext param
DubboParam dubboParam = parserToDubboParam(rpcExt);
if (Objects.nonNull(dubboParam)) {
if (StringUtils.isNoneBlank(dubboParam.getVersion())) {
reference.setVersion(dubboParam.getVersion());//version
}
if (StringUtils.isNoneBlank(dubboParam.getGroup())) {
reference.setGroup(dubboParam.getGroup());//group
}
if (StringUtils.isNoneBlank(dubboParam.getUrl())) {
reference.setUrl(dubboParam.getUrl());//url
}
if (StringUtils.isNoneBlank(dubboParam.getCluster())) {
reference.setCluster(dubboParam.getCluster());
}
Optional.ofNullable(dubboParam.getTimeout()).ifPresent(reference::setTimeout);//timeout
Optional.ofNullable(dubboParam.getRetries()).ifPresent(reference::setRetries);//retries
Optional.ofNullable(dubboParam.getSent()).ifPresent(reference::setSent);//Whether to ack async-sent
}
try {
//get GenericService
Object obj = reference.get();
if (Objects.nonNull(obj)) {
LOG.info("init apache dubbo reference success there meteData is :{}", metaData);
//cache reference
cache.put(metaData.getPath(), reference);
}
} catch (Exception e) {
LOG.error("init apache dubbo reference exception", e);
}
return reference;
}
//......
}

2.6 Execute ResponsePlugin

  • org.apache.shenyu.plugin.response.ResponsePlugin#execute()

The response results are handled by the ResponsePlugin plugin.

    @Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
// handle results according to rpc type
return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);
}

The processing type is determined by MessageWriter and the class inheritance relationship is as follows.

  • MessageWriter: interface, defining message processing methods.
  • NettyClientMessageWriter: processing of Netty call results.
  • RPCMessageWriter: processing the results of RPC calls.
  • WebClientMessageWriter: processing the results of WebClient calls.

For a Dubbo service call, the result is handled by RPCMessageWriter.

  • org.apache.shenyu.plugin.response.strategy.RPCMessageWriter#writeWith()

Process the response results in the writeWith() method.


public class RPCMessageWriter implements MessageWriter {

@Override
public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
return chain.execute(exchange).then(Mono.defer(() -> {
Object result = exchange.getAttribute(Constants.RPC_RESULT); //result
if (Objects.isNull(result)) {
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);
return WebFluxResultUtils.result(exchange, error);
}
return WebFluxResultUtils.result(exchange, result);
}));
}
}

At this point in the analysis, the source code analysis of the Dubbo plugin is complete, and the analysis flow chart is as follows.

3. Summary

The source code analysis in this article starts from Dubbo service registration to Dubbo plug-in service calls. The Dubbo plugin is mainly used to handle the conversion of http requests to the dubbo protocol, and the main logic is implemented through generalized calls.

· 11 min read

In the Shenyu gateway, when you start this plugin, Shenyu becomes a fully-featured McpServer.
You can easily register a service as a tool within the Shenyu gateway by simple configuration and use the extended functions the gateway offers.

This article is based on version shenyu-2.7.0.2. Here, I will track the Shenyu Mcp plugin chain and analyze the source code of its SSE communication.

Introduction

The Shenyu gateway's Mcp plugin is built on top of the spring-ai-mcp extension. To better understand how the Mcp plugin works, I’ll briefly introduce how some official Mcp Java classes collaborate within the Mcp Java SDK.

I want to start by introducing three key official Mcp Java classes:

  1. McpServer
    This class manages capabilities such as tools, resources, and prompts.
  2. TransportProvider
    Provides corresponding communication methods based on client-server communication protocols.
  3. Session
    Handles request data, response data, and notifications, offers some basic methods and corresponding handlers, and executes tool queries and calls here.

1. Service Registration

In Shenyu Admin, after filling in endpoint and tool information for the McpServer plugin, this info is automatically registered into Shenyu bootstrap.
You can refer to the official websocket data sync source code for details.

Shenyu bootstrap receives the data synced from admin in the handler methods of McpServerPluginDataHandler.

  • handlerSelector() receives URL data and creates McpServer.
  • handlerRule() receives tool info and registers tools.

These two methods together form the service registration part of the Shenyu Mcp plugin. Below, I will analyze these two methods in detail.

1.1 Transport and McpServer Registration

Let’s analyze the handlerSelector() method, which handles McpServer registration.

  • What handlerSelector() does:
public class McpServerPluginDataHandler implements PluginDataHandler {
@Override
public void handlerSelector(final SelectorData selectorData) {
// Get URI
String uri = selectorData.getConditionList().stream()
.filter(condition -> Constants.URI.equals(condition.getParamType()))
.map(ConditionData::getParamValue)
.findFirst()
.orElse(null);

// Build McpServer
ShenyuMcpServer shenyuMcpServer = GsonUtils.getInstance().fromJson(Objects.isNull(selectorData.getHandle()) ? DEFAULT_MESSAGE_ENDPOINT : selectorData.getHandle(), ShenyuMcpServer.class);
shenyuMcpServer.setPath(path);
// Cache shenyuMcpServer
CACHED_SERVER.get().cachedHandle(
selectorData.getId(),
shenyuMcpServer);
String messageEndpoint = shenyuMcpServer.getMessageEndpoint();
// Try to get or register transportProvider
shenyuMcpServerManager.getOrCreateMcpServerTransport(uri, messageEndpoint);
}

}

ShenyuMcpServerManager is the management center of McpServer in Shenyu. It not only stores McpAsyncServer, CompositeTransportProvider, etc., but also contains methods to register Transport and McpServer.

  • The getOrCreateMcpServerTransport() method works as follows:
@Component
public class ShenyuMcpServerManager {
public ShenyuSseServerTransportProvider getOrCreateMcpServerTransport(final String uri, final String messageEndPoint) {
// Remove /streamablehttp and /message suffixes
String normalizedPath = processPath(uri);
return getOrCreateTransport(normalizedPath, SSE_PROTOCOL,
() -> createSseTransport(normalizedPath, messageEndPoint));
}

private <T> T getOrCreateTransport(final String normalizedPath, final String protocol,
final java.util.function.Supplier<T> transportFactory) {
// Get composite Transport instance
CompositeTransportProvider compositeTransport = getOrCreateCompositeTransport(normalizedPath);

T transport = switch (protocol) {
case SSE_PROTOCOL -> (T) compositeTransport.getSseTransport();
case STREAMABLE_HTTP_PROTOCOL -> (T) compositeTransport.getStreamableHttpTransport();
default -> null;
};
// If instance is missing in cache, create a new one
if (Objects.isNull(transport)) {
// Call createSseTransport() to create and store a new transport
transport = transportFactory.get();
// Create McpAsyncServer and register the transport
addTransportToSharedServer(normalizedPath, protocol, transport);
}

return transport;
}
}
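The get-or-create logic above amounts to a two-level lookup: first by normalized path, then by protocol, with a supplier that runs only when nothing is cached. A compact sketch using only the standard library is shown below; `TransportRegistrySketch` is a hypothetical name, transports are plain objects, and the McpServer registration step is omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch of a per-path, per-protocol transport registry.
class TransportRegistrySketch {
    // path -> (protocol -> transport)
    private final Map<String, Map<String, Object>> composite = new ConcurrentHashMap<>();

    // Return the cached transport, or create and cache one with the factory.
    <T> T getOrCreate(final String path, final String protocol, final Supplier<T> factory) {
        Map<String, Object> transports = composite.computeIfAbsent(path, p -> new ConcurrentHashMap<>());
        @SuppressWarnings("unchecked")
        T transport = (T) transports.computeIfAbsent(protocol, p -> factory.get());
        return transport;
    }
}
```

Using computeIfAbsent keeps creation atomic per key, so two concurrent registrations for the same path and protocol end up sharing one instance.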
1.1.1 Transport Registration
  • createSseTransport() method

This method is called within getOrCreateMcpServerTransport() and is used to create a Transport

@Component
public class ShenyuMcpServerManager {
private ShenyuSseServerTransportProvider createSseTransport(final String normalizedPath, final String messageEndPoint) {
String messageEndpoint = normalizedPath + messageEndPoint;
ShenyuSseServerTransportProvider transportProvider = ShenyuSseServerTransportProvider.builder()
.objectMapper(objectMapper)
.sseEndpoint(normalizedPath)
.messageEndpoint(messageEndpoint)
.build();
// Register the two functions of transportProvider to the Manager's routeMap
registerRoutes(normalizedPath, messageEndpoint, transportProvider::handleSseConnection, transportProvider::handleMessage);
return transportProvider;
}
}
1.1.2 McpServer Registration
  • addTransportToSharedServer() method

This method is reached from getOrCreateMcpServerTransport() through getOrCreateTransport(). It creates a new McpServer if none exists for the path, stores it in sharedServerMap, and saves the TransportProvider obtained above into compositeTransportMap.

@Component
public class ShenyuMcpServerManager {
private void addTransportToSharedServer(final String normalizedPath, final String protocol, final Object transportProvider) {
// Get or create and register McpServer
getOrCreateSharedServer(normalizedPath);

// Save the new transport protocol into compositeTransportMap
compositeTransport.addTransport(protocol, transportProvider);

}

private McpAsyncServer getOrCreateSharedServer(final String normalizedPath) {
return sharedServerMap.computeIfAbsent(normalizedPath, path -> {
// Get transport protocols
CompositeTransportProvider compositeTransport = getOrCreateCompositeTransport(path);

// Select server capabilities
var capabilities = McpSchema.ServerCapabilities.builder()
.tools(true)
.logging()
.build();

// Create and store McpServer
McpAsyncServer server = McpServer
.async(compositeTransport)
.serverInfo("MCP Shenyu Server (Multi-Protocol)", "1.0.0")
.capabilities(capabilities)
.tools(Lists.newArrayList())
.build();

return server;
});
}
}

1.2 Tools Registration

  • handlerRule() method works as follows:
  1. Captures the configuration the user filled in for the tool; all of it is used to build the tool
  2. Deserializes the configuration into a ShenyuMcpServerTool and reads the tool info from it

Note: ShenyuMcpServerTool is a Shenyu-side object for storing tool info; it has no inheritance relationship with McpServerTool

  3. Calls addTool() to create the tool from this info and registers it with the McpServer matching the SelectorId
public class McpServerPluginDataHandler implements PluginDataHandler {
@Override
public void handlerRule(final RuleData ruleData) {
Optional.ofNullable(ruleData.getHandle()).ifPresent(s -> {
// Deserialize a new ShenyuMcpServerTool
ShenyuMcpServerTool mcpServerTool = GsonUtils.getInstance().fromJson(s, ShenyuMcpServerTool.class);
// Cache mcpServerTool
CACHED_TOOL.get().cachedHandle(CacheKeyUtils.INST.getKey(ruleData), mcpServerTool);
// Build MCP schema
List<McpServerToolParameter> parameters = mcpServerTool.getParameters();
String inputSchema = JsonSchemaUtil.createParameterSchema(parameters);
ShenyuMcpServer server = CACHED_SERVER.get().obtainHandle(ruleData.getSelectorId());
if (Objects.nonNull(server)) {
// Save tool info into Manager's sharedServerMap
shenyuMcpServerManager.addTool(server.getPath(),
StringUtils.isBlank(mcpServerTool.getName()) ? ruleData.getName()
: mcpServerTool.getName(),
mcpServerTool.getDescription(),
mcpServerTool.getRequestConfig(),
inputSchema);
}
});
}
}
  • addTool() method

This method is called by handlerRule() to add a new tool

This method performs:

  1. Converts the previous tool info into a shenyuToolDefinition object
  2. Creates a ShenyuToolCallback object using the converted shenyuToolDefinition

ShenyuToolCallback overrides the call() method of ToolCallback and registers this overridden method to AsyncToolSpecification, so calling the tool's call() will actually invoke this overridden method

  3. Converts ShenyuToolCallback to AsyncToolSpecification and registers it to the corresponding McpServer
public class ShenyuMcpServerManager {
public void addTool(final String serverPath, final String name, final String description,
final String requestTemplate, final String inputSchema) {
String normalizedPath = normalizeServerPath(serverPath);
// Build Definition object
ToolDefinition shenyuToolDefinition = ShenyuToolDefinition.builder()
.name(name)
.description(description)
.requestConfig(requestTemplate)
.inputSchema(inputSchema)
.build();

ShenyuToolCallback shenyuToolCallback = new ShenyuToolCallback(shenyuToolDefinition);

// Get previously registered McpServer and register the Tool
McpAsyncServer sharedServer = sharedServerMap.get(normalizedPath);
for (AsyncToolSpecification asyncToolSpecification : McpToolUtils.toAsyncToolSpecifications(shenyuToolCallback)) {
sharedServer.addTool(asyncToolSpecification).block();
}
}
}
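
The registration steps above can be sketched without the MCP SDK. In this minimal sketch, ToolRegistrySketch and its methods are hypothetical; a plain Function stands in for the AsyncToolSpecification built from ShenyuToolCallback. The name fallback mirrors the StringUtils.isBlank check shown in handlerRule().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class ToolRegistrySketch {
    // server path -> (tool name -> callback); the callback stands in for AsyncToolSpecification
    private final Map<String, Map<String, Function<String, String>>> toolsByServer = new ConcurrentHashMap<>();

    // Use the rule name when the configured tool name is blank.
    static String resolveToolName(final String toolName, final String ruleName) {
        return toolName == null || toolName.trim().isEmpty() ? ruleName : toolName;
    }

    void addTool(final String serverPath, final String name, final Function<String, String> callback) {
        toolsByServer.computeIfAbsent(serverPath, p -> new ConcurrentHashMap<>()).put(name, callback);
    }

    // Invoking a tool runs whatever callback was registered under that name.
    String callTool(final String serverPath, final String name, final String input) {
        Map<String, Function<String, String>> tools = toolsByServer.get(serverPath);
        if (tools == null || !tools.containsKey(name)) {
            throw new IllegalArgumentException("unknown tool: " + name);
        }
        return tools.get(name).apply(input);
    }

    public static void main(final String[] args) {
        ToolRegistrySketch registry = new ToolRegistrySketch();
        String name = resolveToolName("", "findById");
        registry.addTool("/mcp", name, input -> "called with " + input);
        System.out.println(registry.callTool("/mcp", "findById", "{\"id\":\"1\"}"));
    }
}
```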

With this, service registration analysis is complete.

Service registration overview diagram

2. Plugin Execution

Clients send two types of requests, with /sse and /message suffixes. The Shenyu McpServer plugin captures both and handles them differently. For an /sse request, the plugin creates and saves a session object, then returns a session id for later /message requests. For a /message request, the plugin executes the method named in the message, such as listing tools, invoking a tool, or listing resources.

  • doExecute() method works as follows:
  1. Matches the path and checks if the Mcp plugin registered it
  2. Calls routeByProtocol() to choose the appropriate handling plan based on the request protocol

This article focuses on the SSE request mode, so we enter the handleSseRequest() method

public class McpServerPlugin extends AbstractShenyuPlugin {
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange,
final ShenyuPluginChain chain,
final SelectorData selector,
final RuleData rule) {
final String uri = exchange.getRequest().getURI().getRawPath();
// Check if Mcp plugin registered this route; if not, continue chain without handling
if (!shenyuMcpServerManager.canRoute(uri)) {
return chain.execute(exchange);
}
final ServerRequest request = ServerRequest.create(exchange, messageReaders);
// Choose handling method based on URI protocol
return routeByProtocol(exchange, chain, request, selector, uri);
}

private Mono<Void> routeByProtocol(final ServerWebExchange exchange,
final ShenyuPluginChain chain,
final ServerRequest request,
final SelectorData selector,
final String uri) {

if (isStreamableHttpProtocol(uri)) {
return handleStreamableHttpRequest(exchange, chain, request, uri);
} else if (isSseProtocol(uri)) {
// Handle SSE requests
return handleSseRequest(exchange, chain, request, selector, uri);
}
// Fallback handling for other protocols is omitted here
}
}
  • handleSseRequest() method

Called by routeByProtocol() to determine if the client wants to create a session or call a tool based on URI suffix

public class McpServerPlugin extends AbstractShenyuPlugin {
private Mono<Void> handleSseRequest(final ServerWebExchange exchange,
final ShenyuPluginChain chain,
final ServerRequest request,
final SelectorData selector,
final String uri) {
ShenyuMcpServer server = McpServerPluginDataHandler.CACHED_SERVER.get().obtainHandle(selector.getId());
String messageEndpoint = server.getMessageEndpoint();
// Get the transport provider
ShenyuSseServerTransportProvider transportProvider
= shenyuMcpServerManager.getOrCreateMcpServerTransport(uri, messageEndpoint);
// Determine if the request is an SSE connection or a message call
if (uri.endsWith(messageEndpoint)) {
setupSessionContext(exchange, chain);
return handleMessageEndpoint(exchange, transportProvider, request);
} else {
return handleSseEndpoint(exchange, transportProvider, request);
}
}
}
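
The suffix check in handleSseRequest() boils down to a two-way decision. The sketch below shows just that decision; SseRouteSketch and its Route enum are hypothetical names, and the example paths are assumptions.

```java
class SseRouteSketch {
    enum Route { SSE_CONNECT, MESSAGE }

    // A URI ending with the configured message endpoint is a message call;
    // anything else on this server path is treated as an SSE connection request.
    static Route route(final String uri, final String messageEndpoint) {
        return uri.endsWith(messageEndpoint) ? Route.MESSAGE : Route.SSE_CONNECT;
    }

    public static void main(final String[] args) {
        System.out.println(route("/mcp/sse", "/message"));
        System.out.println(route("/mcp/message", "/message"));
    }
}
```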

2.1 Client Sends SSE Request

If the client sends a request ending with /sse, the handleSseEndpoint() method is executed

  • handleSseEndpoint() mainly does:
  1. Sets SSE request headers
  2. Calls ShenyuSseServerTransportProvider.createSseFlux() to create the SSE stream
public class McpServerPlugin extends AbstractShenyuPlugin {
private Mono<Void> handleSseEndpoint(final ServerWebExchange exchange,
final ShenyuSseServerTransportProvider transportProvider,
final ServerRequest request) {
// Configure SSE request headers
configureSseHeaders(exchange);

// Create SSE stream
return exchange.getResponse()
.writeWith(transportProvider
.createSseFlux(request));
}
}
  • createSseFlux() method

Called by handleSseEndpoint(); mainly used to create and save a session

  1. Creates session; the session factory registers a series of handlers, which are the objects actually executing tool calls
  2. Saves the session for reuse
  3. Sends the session id as a parameter of the endpoint URL back to the client, to be used when calling the message endpoint
public class ShenyuSseServerTransportProvider implements McpServerTransportProvider {
public Flux<ServerSentEvent<?>> createSseFlux(final ServerRequest request) {
return Flux.<ServerSentEvent<?>>create(sink -> {
WebFluxMcpSessionTransport sessionTransport = new WebFluxMcpSessionTransport(sink);
// Create McpServerSession and temporarily store plugin chain info
McpServerSession session = sessionFactory.create(sessionTransport);
String sessionId = session.getId();
sessions.put(sessionId, session);

// Send session id info back to client
String endpointUrl = this.baseUrl + this.messageEndpoint + "?sessionId=" + sessionId;
ServerSentEvent<String> endpointEvent = ServerSentEvent.<String>builder()
.event(ENDPOINT_EVENT_TYPE)
.data(endpointUrl)
.build();
}).doOnSubscribe(subscription -> LOGGER.info("SSE Flux subscribed"))
.doOnRequest(n -> LOGGER.debug("SSE Flux requested {} items", n));
}
}
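
The session handshake can be sketched in plain Java: create an id, remember it, and hand the client a message URL carrying that id. SseSessionSketch is a hypothetical class, the base URL is an assumed example value, and the event name "endpoint" is an assumption about the value of ENDPOINT_EVENT_TYPE. The text returned follows the standard Server-Sent Events wire format.

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

class SseSessionSketch {
    private final Set<String> sessions = ConcurrentHashMap.newKeySet();
    private final String baseUrl;
    private final String messageEndpoint;

    SseSessionSketch(final String baseUrl, final String messageEndpoint) {
        this.baseUrl = baseUrl;
        this.messageEndpoint = messageEndpoint;
    }

    // Creates and stores a session, then returns the first SSE event sent to the client.
    String openSession() {
        String sessionId = UUID.randomUUID().toString();
        sessions.add(sessionId);
        String endpointUrl = baseUrl + messageEndpoint + "?sessionId=" + sessionId;
        // SSE wire format: "event:" and "data:" fields, terminated by a blank line
        return "event: endpoint\ndata: " + endpointUrl + "\n\n";
    }

    boolean hasSession(final String sessionId) {
        return sessions.contains(sessionId);
    }

    public static void main(final String[] args) {
        SseSessionSketch provider = new SseSessionSketch("http://localhost:9195", "/mcp/message");
        System.out.print(provider.openSession());
    }
}
```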

2.2 Client Sends Message Request

If the client sends a request ending with /message, the current ShenyuPluginChain info is saved into the session, and handleMessageEndpoint() is called.
Subsequent tool calls continue executing this plugin chain, so plugins after the Mcp plugin will affect tool requests.

  • handleMessageEndpoint() method, calls ShenyuSseServerTransportProvider.handleMessageEndpoint() to process
if (uri.endsWith(messageEndpoint)) {
setupSessionContext(exchange, chain);
return handleMessageEndpoint(exchange, transportProvider, request);
}
public class McpServerPlugin extends AbstractShenyuPlugin {
private Mono<Void> handleMessageEndpoint(final ServerWebExchange exchange,
final ShenyuSseServerTransportProvider transportProvider,
final ServerRequest request) {
// Handle message requests
return transportProvider.handleMessageEndpoint(request)
.flatMap(result -> {
return exchange.getResponse()
.writeWith(Mono.just(exchange.getResponse().bufferFactory().wrap(responseBody.getBytes())));
});
}
}
  • handleMessageEndpoint() method

Called by McpServerPlugin.handleMessageEndpoint(), hands over the request to the session for processing

The session's handle() method performs different actions depending on the message.
For example, when the method in the message is "tools/call", the tool invocation handler executes the call() method to call the tool.
The related source is omitted here.

public class ShenyuSseServerTransportProvider implements McpServerTransportProvider {
public Mono<MessageHandlingResult> handleMessageEndpoint(final ServerRequest request) {
// Get session
String sessionId = request.queryParam("sessionId").get();
McpServerSession session = sessions.get(sessionId);
return request.bodyToMono(String.class)
.flatMap(body -> {
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body);
return session.handle(message);
});
}
}
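
Since the session source is omitted, here is a rough sketch of what "different actions depending on the message" can look like: a map from method name to handler. MessageDispatchSketch is hypothetical and is not the MCP SDK's McpServerSession. The method names tools/list and tools/call come from the MCP protocol; the handler bodies are placeholders.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class MessageDispatchSketch {
    // method name -> handler taking the raw params and returning a result
    private final Map<String, Function<String, String>> handlers = new ConcurrentHashMap<>();

    void register(final String method, final Function<String, String> handler) {
        handlers.put(method, handler);
    }

    // Look up the handler for the method carried by the message and run it.
    String handle(final String method, final String params) {
        Function<String, String> handler = handlers.get(method);
        if (handler == null) {
            return "error: method not found: " + method;
        }
        return handler.apply(params);
    }

    public static void main(final String[] args) {
        MessageDispatchSketch session = new MessageDispatchSketch();
        session.register("tools/list", params -> "[findById]");
        session.register("tools/call", params -> "result of " + params);
        System.out.println(session.handle("tools/list", ""));
        System.out.println(session.handle("tools/call", "findById"));
    }
}
```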

At this point, the Shenyu Mcp Plugin service invocation source code analysis is complete.

Process flow overview

3. Tool Invocation

If the client sends a message to invoke a tool, the session will use the tool invocation handler to execute the tool’s call() method.
From service registration, we know the tool call actually runs the call() method of ShenyuToolCallback.

Therefore, the tool invocation executes the following:

  • call() method mainly does:
  1. Gets session id
  2. Gets requestTemplate, the extra configuration provided by Shenyu
  3. Gets the previously stored Shenyu plugin chain and passes the tool call info to the chain for continued execution
  4. Asynchronously waits for the tool response

After the plugin chain completes, the tool call request is actually sent to the service hosting the tool.

public class ShenyuToolCallback implements ToolCallback {
@NonNull
@Override
public String call(@NonNull final String input, final ToolContext toolContext) {
// Extract sessionId from MCP request
final McpSyncServerExchange mcpExchange = extractMcpExchange(toolContext);
final String sessionId = extractSessionId(mcpExchange);
// Extract requestTemplate info
final String configStr = extractRequestConfig(shenyuTool);

// Get the previously stored plugin chain by sessionId
final ServerWebExchange originExchange = getOriginExchange(sessionId);
final ShenyuPluginChain chain = getPluginChain(originExchange);

// Execute the tool call
return executeToolCall(originExchange, chain, sessionId, configStr, input);
}

private String executeToolCall(final ServerWebExchange originExchange,
final ShenyuPluginChain chain,
final String sessionId,
final String configStr,
final String input) {

final CompletableFuture<String> responseFuture = new CompletableFuture<>();
final ServerWebExchange decoratedExchange = buildDecoratedExchange(
originExchange, responseFuture, sessionId, configStr, input);
// Execute plugin chain, call the actual tool
chain.execute(decoratedExchange)
.subscribe();

// Wait for response
final String result = responseFuture.get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
return result;
}
}
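
The "start asynchronously, then wait with a timeout" step in executeToolCall() can be isolated as follows. BlockingBridgeSketch is a hypothetical helper, not ShenYu code. Unlike the simplified excerpt, it also handles the checked exceptions that CompletableFuture.get(long, TimeUnit) declares.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

class BlockingBridgeSketch {
    // Starts an asynchronous call that is expected to complete the future,
    // then blocks the caller until a result arrives or the timeout elapses.
    static String callAndWait(final Consumer<CompletableFuture<String>> asyncCall, final long timeoutMillis) {
        CompletableFuture<String> responseFuture = new CompletableFuture<>();
        asyncCall.accept(responseFuture);
        try {
            return responseFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "error: interrupted";
        } catch (ExecutionException e) {
            return "error: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "error: timed out";
        }
    }

    public static void main(final String[] args) {
        String ok = callAndWait(future -> CompletableFuture.runAsync(() -> future.complete("tool response")), 1000);
        String late = callAndWait(future -> { }, 50);
        System.out.println(ok);
        System.out.println(late);
    }
}
```

Blocking like this is simple, but it ties up the calling thread for up to the full timeout, so the timeout value matters.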

This concludes the Shenyu MCP Plugin tool invocation analysis.


4. Summary

This article analyzed the source code from Mcp service registration, through Mcp plugin service invocation, to tool invocation.
The McpServer plugin makes Shenyu a powerful and centralized McpServer.


· 5 min read
Kunshuai Zhu

Before starting, you can refer to this article to start the gateway

Body

Let's take a look at the structure of this plugin first, as shown in the figure below.

param-mapping-structure

Guess: handler is used for data synchronization; strategy may be adapted to various request bodies, which should be the focus of this plugin; ParamMappingPlugin should be the implementation of ShenyuPlugin.

First, take a look at ParamMappingPlugin; the focus is its override of the doExecute method.

public Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
... // judge whether paramMappingHandle is null
// Determine the request body type according to the contentType in the header line
HttpHeaders headers = exchange.getRequest().getHeaders();
MediaType contentType = headers.getContentType();
// *
return match(contentType).apply(exchange, chain, paramMappingHandle);
}
  • The match method returns the corresponding Operator according to contentType

    private Operator match(final MediaType mediaType) {
    if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
    return operatorMap.get(MediaType.APPLICATION_JSON.toString());
    } else if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {
    return operatorMap.get(MediaType.APPLICATION_FORM_URLENCODED.toString());
    } else {
    return operatorMap.get(Constants.DEFAULT);
    }
    }

    As the match method shows, there are currently three Operator implementations, DefaultOperator, FormDataOperator, and JsonOperator, and they support request bodies in two formats: x-www-form-urlencoded and json.
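
The selection logic can be approximated without Spring. In this sketch, OperatorMatchSketch is hypothetical, the operators are represented by their names only, and the "default" key is an assumed stand-in for Constants.DEFAULT. A prefix check replaces MediaType#isCompatibleWith, which is a simplification.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

class OperatorMatchSketch {
    static final String JSON = "application/json";
    static final String FORM = "application/x-www-form-urlencoded";
    static final String DEFAULT = "default"; // assumed key, stands in for Constants.DEFAULT

    private final Map<String, String> operatorMap = new HashMap<>();

    OperatorMatchSketch() {
        operatorMap.put(JSON, "JsonOperator");
        operatorMap.put(FORM, "FormDataOperator");
        operatorMap.put(DEFAULT, "DefaultOperator");
    }

    // Picks an operator by content type; parameters such as charset are ignored by the prefix check.
    String match(final String contentType) {
        String type = contentType == null ? "" : contentType.trim().toLowerCase(Locale.ROOT);
        if (type.startsWith(JSON)) {
            return operatorMap.get(JSON);
        } else if (type.startsWith(FORM)) {
            return operatorMap.get(FORM);
        }
        return operatorMap.get(DEFAULT);
    }

    public static void main(final String[] args) {
        OperatorMatchSketch sketch = new OperatorMatchSketch();
        System.out.println(sketch.match("application/json;charset=UTF-8"));
        System.out.println(sketch.match("text/plain"));
    }
}
```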

So let's take a look at what the above three operators are like.

1. DefaultOperator

DefaultOperator does nothing of its own: its apply method just continues executing the plugin chain. When the request body matches neither of the other Operators, the request falls through to DefaultOperator and passes on unchanged.

2. FormDataOperator

This class is used to process the request body in the format of x-www-form-urlencoded.

The main logic is in the apply method, which looks a bit unusual at first.

public Mono<Void> apply(final ServerWebExchange exchange, final ShenyuPluginChain shenyuPluginChain, final ParamMappingHandle paramMappingHandle) {
return exchange.getFormData()
.switchIfEmpty(Mono.defer(() -> Mono.just(new LinkedMultiValueMap<>())))
.flatMap(multiValueMap -> {
...
});
}

The code in the ellipsis is the processing of the request body, as follows.

// judge whether it is empty
if (Objects.isNull(multiValueMap) || multiValueMap.isEmpty()) {
return shenyuPluginChain.execute(exchange);
}
// convert form-data to json
String original = GsonUtils.getInstance().toJson(multiValueMap);
LOG.info("get from data success data:{}", original);
// *modify request body*
String modify = operation(original, paramMappingHandle);
if (StringUtils.isEmpty(modify)) {
return shenyuPluginChain.execute(exchange);
}
...
// Convert the modified json into LinkedMultiValueMap. Pay attention to this line, it will be mentioned later!
LinkedMultiValueMap<String, String> modifyMap = GsonUtils.getInstance().toLinkedMultiValueMap(modify);
...
final BodyInserter bodyInserter = BodyInserters.fromValue(modifyMap);
...
// modify the request body in the exchange, and then continue to execute the plugin chain
return bodyInserter.insert(cachedBodyOutputMessage, new BodyInserterContext())
.then(Mono.defer(() -> shenyuPluginChain.execute(exchange.mutate()
.request(new ModifyServerHttpRequestDecorator(httpHeaders, exchange.getRequest(), cachedBodyOutputMessage))
.build())
)).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> release(cachedBodyOutputMessage, throwable));

PS: The omitted part sets the request headers and performs other operations.

The most important line above is the one marked with an asterisk (modify request body), that is, the call to the operation method. Because the argument is a String, the default method operation(String, ParamMappingHandle) of the Operator interface is called first; FormDataOperator does not override that overload.

default String operation(final String jsonValue, final ParamMappingHandle paramMappingHandle) {
DocumentContext context = JsonPath.parse(jsonValue);
// call the override operation method and add addParameterKey
operation(context, paramMappingHandle);
// replace the related replacedParameterKey
if (!CollectionUtils.isEmpty(paramMappingHandle.getReplaceParameterKeys())) {
paramMappingHandle.getReplaceParameterKeys().forEach(info -> {
context.renameKey(info.getPath(), info.getKey(), info.getValue());
});
}
// Delete the related removeParameterKey
if (!CollectionUtils.isEmpty(paramMappingHandle.getRemoveParameterKeys())) {
paramMappingHandle.getRemoveParameterKeys().forEach(info -> {
context.delete(info);
});
}
return context.jsonString();
}

After sorting it out, we can find that the json tool JsonPath imported here makes the processing of the request body much simpler and clearer.

In addition, we can notice that the FormDataOperator overrides the operation(DocumentContext, ParamMappingHandle) method.

Why override it? There is a default method for handling addParameterKey in the interface.

// Default method in Operator interface
default void operation(final DocumentContext context, final ParamMappingHandle paramMappingHandle) {
if (!CollectionUtils.isEmpty(paramMappingHandle.getAddParameterKeys())) {
paramMappingHandle.getAddParameterKeys().forEach(info -> {
context.put(info.getPath(), info.getKey(), info.getValue()); // the difference is here
});
}
}

// method overridden by FormDataOperator
@Override
public void operation(final DocumentContext context, final ParamMappingHandle paramMappingHandle) {
if (!CollectionUtils.isEmpty(paramMappingHandle.getAddParameterKeys())) {
paramMappingHandle.getAddParameterKeys().forEach(info -> {
context.put(info.getPath(), info.getKey(), Arrays.asList(info.getValue()));
});
}
}
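
The interplay between the two operation overloads can be reproduced in a few lines of plain Java. MappingOperatorSketch, PlainOperatorSketch, and FormOperatorSketch are hypothetical, and a List stands in for JsonPath's DocumentContext. The point is that the String overload is inherited unchanged, while the call it makes to the second overload is dispatched to the overriding class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

interface MappingOperatorSketch {
    // String overload: not overridden below, so a call with a String always lands here.
    default String operation(final String value) {
        List<Object> context = new ArrayList<>();
        // Dynamic dispatch: this runs the override if the implementing class has one.
        operation(context, value);
        return context.toString();
    }

    // Context overload: the default adds the plain value.
    default void operation(final List<Object> context, final String value) {
        context.add(value);
    }
}

// Keeps both defaults, like an operator for json bodies.
class PlainOperatorSketch implements MappingOperatorSketch {
}

// Overrides only the context overload and wraps the value in a list, like FormDataOperator.
class FormOperatorSketch implements MappingOperatorSketch {
    @Override
    public void operation(final List<Object> context, final String value) {
        context.add(Arrays.asList(value));
    }

    public static void main(final String[] args) {
        System.out.println(new PlainOperatorSketch().operation("a"));
        System.out.println(new FormOperatorSketch().operation("a"));
    }
}
```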

In fact, there is such a line in FormDataOperator#apply (mentioned earlier): LinkedMultiValueMap<String, String> modifyMap = GsonUtils.getInstance().toLinkedMultiValueMap(modify);

This line converts the modified json into LinkedMultiValueMap, GsonUtils#toLinkedMultiValueMap is as follows.

public LinkedMultiValueMap<String, String> toLinkedMultiValueMap(final String json) {
return GSON.fromJson(json, new TypeToken<LinkedMultiValueMap<String, String>>() {
}.getType());
}

The attribute targetMap in the LinkedMultiValueMap class is defined as: private final Map<K, List<V>> targetMap

Therefore, each value in the json string must be a list; otherwise Gson throws a conversion exception. This is why FormDataOperator must override the operation method.

But why use LinkedMultiValueMap?

Go back to the first line of FormDataOperator#apply, exchange.getFormData. In Spring WebFlux, the return type of DefaultServerWebExchange#getFormData is Mono<MultiValueMap<String, String>>, and LinkedMultiValueMap is an implementation of the MultiValueMap interface. The getFormData method is meant for request bodies in x-www-form-urlencoded format.

param-mapping-getFormData
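
To see why form data naturally maps to a multi-value structure, here is a small parser for x-www-form-urlencoded bodies. FormDataSketch is hypothetical and is not Spring's implementation; it only shows that a key may repeat, so each key has to map to a list of values.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FormDataSketch {
    // Parses "k1=v1&k1=v2&k2=v3" into key -> list of values, keeping insertion order.
    static Map<String, List<String>> parse(final String body) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        if (body == null || body.isEmpty()) {
            return result;
        }
        for (String pair : body.split("&")) {
            int idx = pair.indexOf('=');
            String key = decode(idx >= 0 ? pair.substring(0, idx) : pair);
            String value = idx >= 0 ? decode(pair.substring(idx + 1)) : "";
            result.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
        return result;
    }

    private static String decode(final String text) {
        try {
            return URLDecoder.decode(text, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(final String[] args) {
        System.out.println(parse("id=1&tag=a&tag=b%20c"));
    }
}
```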

3. JsonOperator

Obviously, this class is used to process the request body in Json format.

public Mono<Void> apply(final ServerWebExchange exchange, final ShenyuPluginChain shenyuPluginChain, final ParamMappingHandle paramMappingHandle) {
ServerRequest serverRequest = ServerRequest.create(exchange, MESSAGE_READERS);
Mono<String> mono = serverRequest.bodyToMono(String.class).switchIfEmpty(Mono.defer(() -> Mono.just(""))).flatMap(originalBody -> {
LOG.info("get body data success data:{}", originalBody);
// call the default operation method to modify the request body
String modify = operation(originalBody, paramMappingHandle);
return Mono.just(modify);
});
BodyInserter bodyInserter = BodyInserters.fromPublisher(mono, String.class);
... //process the header line
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
// modify the request body in the exchange, and then continue to execute the plugin chain
return bodyInserter.insert(outputMessage, new BodyInserterContext())
.then(Mono.defer(() -> {
ServerHttpRequestDecorator decorator = new ModifyServerHttpRequestDecorator(headers, exchange.getRequest(), outputMessage);
return shenyuPluginChain.execute(exchange.mutate().request(decorator).build());
})).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> release(outputMessage, throwable));
}

The processing flow of JsonOperator is roughly similar to that of FormDataOperator.

Conclusion

Finally, use a picture to briefly summarize.

param-mapping-summary

· 29 min read

Apache ShenYu is an asynchronous, high-performance, cross-language, responsive API gateway.

In ShenYu gateway, the registration center is used to register the client information to shenyu-admin, admin then synchronizes this information to the gateway through data synchronization, and the gateway completes traffic filtering through these data. The client information mainly includes interface information and URI information.

This article is based on shenyu-2.5.0 version for source code analysis, please refer to Client Access Principles for the introduction of the official website.

1. Registration Center Principle

When the client starts, it reads the interface information and uri information, and sends the data to shenyu-admin by the specified registration type.

The registration center in the figure requires the user to specify which registration type to use. ShenYu currently supports Http, Zookeeper, Etcd, Consul and Nacos for registration. Please refer to Client Access Configuration for details on how to configure them.

ShenYu introduces Disruptor in the principle design of the registration center, in which the Disruptor queue plays a role in decoupling data and operations, which is conducive to expansion. If too many registration requests lead to registration exceptions, it also has a data buffering role.

As shown in the figure, the registration center is divided into two parts. One is the registration center client, register-client, which is responsible for reading client data. The other is the registration center server, register-server, which is responsible for writing data on the server side (that is, shenyu-admin). Data is sent and received using the specified registration type.

  • Client: Usually it is a microservice, which can be springmvc, spring-cloud, dubbo, grpc, etc.
  • register-client: the registration center client, which reads the client's interface and uri information.
  • Disruptor: decoupling data from operations, data buffering role.
  • register-server: registry server, here is shenyu-admin, receive data, write to database, send data synchronization events.
  • registration-type: specify the registration type, complete data registration, currently supports Http, Zookeeper, Etcd, Consul and Nacos.

This article analyzes the use of Http for registration, so the specific processing flow is as follows.

On the client side, after the data is out of the queue, the data is transferred via http and on the server side, the corresponding interface is provided to receive the data and then write it to the queue.
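
The decoupling role of the queue can be illustrated with a plain BlockingQueue. Note that this swaps in java.util.concurrent for the Disruptor the article describes; RegisterQueueSketch is hypothetical, and the consumer only records the data where the real client would send it to shenyu-admin over http.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

class RegisterQueueSketch {
    // Bounded queue: buffers bursts of registration data between producer and consumer.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(1024);
    private final List<String> sent = new CopyOnWriteArrayList<>();

    RegisterQueueSketch() {
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    // Stand-in for the http call to shenyu-admin
                    sent.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
    }

    // The producer only enqueues; it never waits for the data to be sent.
    boolean publish(final String data) {
        return queue.offer(data);
    }

    // Polls until the consumer has processed the expected number of items or the timeout elapses.
    boolean awaitSent(final int expected, final long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        try {
            while (sent.size() < expected && System.currentTimeMillis() < deadline) {
                Thread.sleep(5);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sent.size() >= expected;
    }

    List<String> sent() {
        return sent;
    }

    public static void main(final String[] args) {
        RegisterQueueSketch sketch = new RegisterQueueSketch();
        sketch.publish("uri: localhost:8189");
        sketch.publish("metadata: /http/order/findById");
        System.out.println(sketch.awaitSent(2, 1000) + " " + sketch.sent());
    }
}
```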

2. Client Registration Process

When the client starts, it reads the attribute information according to the relevant configuration, and then writes it to the queue. Let's take the official shenyu-examples-http as an example and start the source code analysis. The official example is a microservice built with springboot. For the configuration of the registration center, please refer to the official website's client access configuration.

2.1 Load configuration, read properties

Let's start with a diagram that ties together the initialization process of the registry client.

We are analyzing registration by means of http, so the following configuration is required.

shenyu:
  register:
    registerType: http
    serverLists: http://localhost:9095
    props:
      username: admin
      password: 123456
  client:
    http:
      props:
        contextPath: /http
        appName: http
        port: 8189
        isFull: false

Each attribute indicates the following meaning.

  • registerType: the service registration type, fill in http.
  • serverLists: the address of the Shenyu-Admin project for the http registration type; include the http:// prefix and separate multiple addresses with commas.
  • username: The username of the Shenyu-Admin
  • password: The password of the Shenyu-Admin
  • port: the start port of your project, currently springmvc/tars/grpc needs to be filled in.
  • contextPath: the routing prefix for your mvc project in shenyu gateway, such as /order, /product, etc. The gateway will route according to your prefix.
  • appName: the name of your application, if not configured, it will take the value of spring.application.name by default.
  • isFull: set true to proxy your entire service, false to proxy one of your controllers; currently applies to springmvc/springcloud.

After the project starts, it will first load the configuration file, read the property information and generate the corresponding Bean.

The first configuration class loaded is ShenyuSpringMvcClientConfiguration, the http registration configuration class for the shenyu client. @Configuration marks it as a configuration class, and @ImportAutoConfiguration pulls in other configuration classes. It creates SpringMvcClientEventListener, which mainly handles metadata and URI information.

/**
* Shenyu SpringMvc Client Configuration
*/
@Configuration
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuSpringMvcClientConfiguration {

// create SpringMvcClientEventListener to handle metadata and URI
@Bean
public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);
}
}

ShenyuClientCommonBeanConfiguration is a shenyu client common configuration class that will create the bean common to the registry client.

  • Create ShenyuClientRegisterRepository, which is created by factory class.
  • Create ShenyuRegisterCenterConfig, which reads the shenyu.register property configuration.
  • Create ShenyuClientConfig, read the shenyu.client property configuration.

/**
* Shenyu Client Common Bean Configuration
*/
@Configuration
public class ShenyuClientCommonBeanConfiguration {

// create ShenyuClientRegisterRepository by factory
@Bean
public ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
return ShenyuClientRegisterRepositoryFactory.newInstance(config);
}

// create ShenyuRegisterCenterConfig to read shenyu.register properties
@Bean
@ConfigurationProperties(prefix = "shenyu.register")
public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
return new ShenyuRegisterCenterConfig();
}

// create ShenyuClientConfig to read shenyu.client properties
@Bean
@ConfigurationProperties(prefix = "shenyu")
public ShenyuClientConfig shenyuClientConfig() {
return new ShenyuClientConfig();
}
}

2.2 HttpClientRegisterRepository

The ShenyuClientRegisterRepository created in the configuration class above is the concrete implementation of client registration. It is an interface with the following implementation classes.

  • HttpClientRegisterRepository: registration via http.
  • ConsulClientRegisterRepository: registration via Consul.
  • EtcdClientRegisterRepository: registration via Etcd.
  • NacosClientRegisterRepository: registration via Nacos.
  • ZookeeperClientRegisterRepository: registration through Zookeeper.

The concrete implementation is loaded through SPI; the logic is as follows.


/**
* load ShenyuClientRegisterRepository
*/
public final class ShenyuClientRegisterRepositoryFactory {

private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();

/**
* create ShenyuClientRegisterRepository
*/
public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
// Loading by means of SPI, type determined by registerType
ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
//init ShenyuClientRegisterRepository
result.init(shenyuRegisterCenterConfig);
ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
return result;
}
return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
}
}

The extension to load is determined by registerType, which is the type we specified in the configuration file:

shenyu:
  register:
    registerType: http
    serverLists: http://localhost:9095

We specified http, so it will go to load HttpClientRegisterRepository. After the object is successfully created, the initialization method init() is executed as follows.

@Join
public class HttpClientRegisterRepository implements ShenyuClientRegisterRepository {

@Override
public void init(final ShenyuRegisterCenterConfig config) {
this.username = config.getProps().getProperty(Constants.USER_NAME);
this.password = config.getProps().getProperty(Constants.PASS_WORD);
this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));
this.setAccessToken();
}

// ......
}

Read username, password and serverLists from the configuration file, that is, the username, password and address of shenyu-admin, in preparation for sending data later. The class annotation @Join is used for SPI loading.

SPI, short for Service Provider Interface, is a service provider discovery feature built into the JDK: a mechanism for discovering implementations and swapping them at run time.

shenyu-spi is a custom SPI extension implementation for the Apache ShenYu gateway, designed and implemented with reference to Dubbo SPI extension implementation.
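
The factory's "load by type, initialize once, cache" behaviour can be sketched without shenyu-spi. In this sketch a map of constructors stands in for ExtensionLoader, and RepositoryFactorySketch with its nested types is hypothetical. The real factory resolves the implementation through classes annotated with @Join, not through a hard-coded map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class RepositoryFactorySketch {
    interface Repository {
        void init(String serverLists);

        String describe();
    }

    static class HttpRepository implements Repository {
        private String serverLists;

        @Override
        public void init(final String serverLists) {
            this.serverLists = serverLists;
        }

        @Override
        public String describe() {
            return "http -> " + serverLists;
        }
    }

    // Stand-in for ExtensionLoader: registerType -> constructor of the implementation.
    private static final Map<String, Supplier<Repository>> EXTENSIONS = new ConcurrentHashMap<>();

    // Cache so that each registerType is created and initialized only once.
    private static final Map<String, Repository> REPOSITORY_MAP = new ConcurrentHashMap<>();

    static {
        EXTENSIONS.put("http", HttpRepository::new);
    }

    static Repository newInstance(final String registerType, final String serverLists) {
        return REPOSITORY_MAP.computeIfAbsent(registerType, type -> {
            Supplier<Repository> supplier = EXTENSIONS.get(type);
            if (supplier == null) {
                throw new IllegalArgumentException("no extension for type: " + type);
            }
            Repository repository = supplier.get();
            repository.init(serverLists);
            return repository;
        });
    }

    public static void main(final String[] args) {
        Repository repository = newInstance("http", "http://localhost:9095");
        System.out.println(repository.describe());
        System.out.println(repository == newInstance("http", "http://localhost:9095"));
    }
}
```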

2.3 SpringMvcClientEventListener

SpringMvcClientEventListener is responsible for building and registering client-side metadata and URI data. It is created in the configuration class.

@Configuration
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
public class ShenyuSpringMvcClientConfiguration {
// ......

// create SpringMvcClientEventListener
@Bean
public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);
}
}

SpringMvcClientEventListener extends AbstractContextRefreshedEventListener.

AbstractContextRefreshedEventListener is an abstract class. It implements the ApplicationListener interface and overrides the onApplicationEvent() method, which is executed when a Spring event occurs. It has several implementation classes, which support different kinds of RPC styles.

  • AlibabaDubboServiceBeanListener: handles Alibaba Dubbo protocol.
  • ApacheDubboServiceBeanListener: handles Apache Dubbo protocol.
  • GrpcClientEventListener: handles grpc protocol.
  • MotanServiceEventListener: handles Motan protocol.
  • SofaServiceEventListener: handles Sofa protocol.
  • SpringMvcClientEventListener: handles http protocol.
  • SpringWebSocketClientEventListener: handles Websocket protocol.
  • TarsServiceBeanEventListener: handles Tars protocol.
public abstract class AbstractContextRefreshedEventListener<T, A extends Annotation> implements ApplicationListener<ContextRefreshedEvent> {

//......

// Instantiation is done through the constructor
public AbstractContextRefreshedEventListener(final PropertiesConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
// read shenyu.client.http properties
Properties props = clientConfig.getProps();
// appName
this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
// contextPath
this.contextPath = Optional.ofNullable(props.getProperty(ShenyuClientConstants.CONTEXT_PATH)).map(UriUtils::repairData).orElse("");
if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {
String errorMsg = "client register param must config the appName or contextPath";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
this.ipAndPort = props.getProperty(ShenyuClientConstants.IP_PORT);
// host
this.host = props.getProperty(ShenyuClientConstants.HOST);
// port
this.port = props.getProperty(ShenyuClientConstants.PORT);
// publish event
publisher.start(shenyuClientRegisterRepository);
}

// This method is executed when a context refresh event(ContextRefreshedEvent), occurs
@Override
public void onApplicationEvent(@NonNull final ContextRefreshedEvent event) {
// The contents of the method are guaranteed to be executed only once
if (!registered.compareAndSet(false, true)) {
return;
}
final ApplicationContext context = event.getApplicationContext();
// get the specific beans
Map<String, T> beans = getBeans(context);
if (MapUtils.isEmpty(beans)) {
return;
}
// build URI data and register it
publisher.publishEvent(buildURIRegisterDTO(context, beans));
// build metadata and register it
beans.forEach(this::handle);
}

@SuppressWarnings("all")
protected abstract URIRegisterDTO buildURIRegisterDTO(ApplicationContext context,
Map<String, T> beans);


protected void handle(final String beanName, final T bean) {
Class<?> clazz = getCorrectedClass(bean);
final A beanShenyuClient = AnnotatedElementUtils.findMergedAnnotation(clazz, getAnnotationType());
final String superPath = buildApiSuperPath(clazz, beanShenyuClient);
if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {
handleClass(clazz, bean, beanShenyuClient, superPath);
return;
}
final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);
for (Method method : methods) {
handleMethod(bean, clazz, beanShenyuClient, method, superPath);
}
}

// default implementation. build class-level metadata and register it
protected void handleClass(final Class<?> clazz,
final T bean,
@NonNull final A beanShenyuClient,
final String superPath) {
publisher.publishEvent(buildMetaDataDTO(bean, beanShenyuClient, pathJoin(contextPath, superPath), clazz, null));
}

// default implementation. build metadata and register it
protected void handleMethod(final T bean,
final Class<?> clazz,
@Nullable final A beanShenyuClient,
final Method method,
final String superPath) {
// get the annotation
A methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, getAnnotationType());
if (Objects.nonNull(methodShenyuClient)) {
// build metadata and publish the registration event
publisher.publishEvent(buildMetaDataDTO(bean, methodShenyuClient, buildApiPath(method, superPath, methodShenyuClient), clazz, method));
}
}

protected abstract MetaDataRegisterDTO buildMetaDataDTO(T bean,
@NonNull A shenyuClient,
String path,
Class<?> clazz,
Method method);
}

The main purpose of the constructor is to read the property information and then validate it.

shenyu:
  client:
    http:
      props:
        contextPath: /http
        appName: http
        port: 8189
        isFull: false

Finally, publisher.start() is executed to start event publishing and prepare for registration.
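The validation step in the constructor can be illustrated with a minimal, self-contained sketch. The class ClientPropsCheck and its method are hypothetical names, the property keys are assumed to match the YAML above, and a plain IllegalArgumentException stands in for ShenyuClientIllegalArgumentException.

```java
import java.util.Properties;

// Hypothetical stand-in for the constructor's validation step; not ShenYu code.
final class ClientPropsCheck {

    // Returns contextPath if configured, otherwise appName; fails if both are blank.
    static String requireAppNameOrContextPath(final Properties props) {
        String appName = props.getProperty("appName");
        String contextPath = props.getProperty("contextPath");
        boolean noAppName = appName == null || appName.trim().isEmpty();
        boolean noContextPath = contextPath == null || contextPath.trim().isEmpty();
        if (noAppName && noContextPath) {
            // same message as the listener constructor shown above
            throw new IllegalArgumentException("client register param must config the appName or contextPath");
        }
        return noContextPath ? appName : contextPath;
    }
}
```

Failing fast here means a misconfigured client stops at startup instead of silently registering nothing.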

ShenyuClientRegisterEventPublisher is implemented as a singleton. Its start() method creates the metadata and URI subscribers (used later for publishing data) and then starts the Disruptor queue. The common method publishEvent() publishes events by sending data to the Disruptor queue.


public class ShenyuClientRegisterEventPublisher {

private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();

private DisruptorProviderManage<DataTypeParent> providerManage;

public static ShenyuClientRegisterEventPublisher getInstance() {
return INSTANCE;
}

public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
RegisterClientExecutorFactory factory = new RegisterClientExecutorFactory();
factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));
factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));
providerManage = new DisruptorProviderManage(factory);
providerManage.startup();
}

public <T> void publishEvent(final DataTypeParent data) {
DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
provider.onData(data);
}
}

To summarize the constructor of AbstractContextRefreshedEventListener: it reads the property configuration, creates the metadata and URI subscribers, and starts the Disruptor queue.
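The publish/subscribe shape of the publisher can be sketched without the Disruptor dependency. In the sketch below a single-threaded ExecutorService is swapped in for the Disruptor queue, and SimpleRegisterPublisher, its string-typed events and shutdownAndWait() are hypothetical names chosen for illustration only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical simplification: an executor replaces the Disruptor ring buffer.
final class SimpleRegisterPublisher {

    private static final SimpleRegisterPublisher INSTANCE = new SimpleRegisterPublisher();

    private final Map<String, Consumer<String>> subscribers = new ConcurrentHashMap<>();

    private volatile ExecutorService worker;

    private SimpleRegisterPublisher() {
    }

    static SimpleRegisterPublisher getInstance() {
        return INSTANCE;
    }

    // Register one handler per data type, then start the worker.
    synchronized void start(final Map<String, Consumer<String>> handlers) {
        subscribers.clear();
        subscribers.putAll(handlers);
        worker = Executors.newSingleThreadExecutor();
    }

    // Hand the payload to the subscriber for its type; unknown types are dropped.
    void publishEvent(final String type, final String payload) {
        Consumer<String> handler = subscribers.get(type);
        if (handler != null) {
            worker.execute(() -> handler.accept(payload));
        }
    }

    // Stop accepting events and wait for queued ones to finish.
    boolean shutdownAndWait(final long seconds) {
        worker.shutdown();
        try {
            return worker.awaitTermination(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The caller only sees publishEvent(); how and when the event is consumed is decided by whatever was wired in start(), which is the same separation the real publisher provides.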

The onApplicationEvent() method is executed when a Spring event occurs. The parameter here is ContextRefreshedEvent, the context refresh event.

ContextRefreshedEvent is a Spring built-in event. It is fired when the ApplicationContext is initialized or refreshed. This can also happen in the ConfigurableApplicationContext interface using the refresh() method. Initialization here means that all Beans have been successfully loaded, post-processing Beans have been detected and activated, all Singleton Beans have been pre-instantiated, and the ApplicationContext container is ready to be used.
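Because a context can be refreshed more than once, onApplicationEvent() guards its body with compareAndSet so that registration happens only on the first event. A minimal sketch of that guard, using a hypothetical OnceOnlyListener class with a run counter added purely for demonstration:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical listener showing the "execute only once" guard.
final class OnceOnlyListener {

    private final AtomicBoolean registered = new AtomicBoolean(false);

    private final AtomicInteger runs = new AtomicInteger();

    void onRefresh() {
        // Only the first caller flips false -> true; later calls return immediately.
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        runs.incrementAndGet();
    }

    int runs() {
        return runs.get();
    }
}
```

compareAndSet makes the check and the update a single atomic step, so the guard also holds if two refresh events arrive on different threads.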

  • SpringMvcClientEventListener: the http implementation of AbstractContextRefreshedEventListener:
public class SpringMvcClientEventListener extends AbstractContextRefreshedEventListener<Object, ShenyuSpringMvcClient> {

private final List<Class<? extends Annotation>> mappingAnnotation = new ArrayList<>(3);

private final Boolean isFull;

private final String protocol;

// constructor
public SpringMvcClientEventListener(final PropertiesConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
super(clientConfig, shenyuClientRegisterRepository);
Properties props = clientConfig.getProps();
// get isFull
this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
// http protocol
this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);
mappingAnnotation.add(ShenyuSpringMvcClient.class);
mappingAnnotation.add(RequestMapping.class);
}

@Override
protected Map<String, Object> getBeans(final ApplicationContext context) {
// Configuration attribute, if isFull=true, means register the whole microservice
if (Boolean.TRUE.equals(isFull)) {
getPublisher().publishEvent(MetaDataRegisterDTO.builder()
.contextPath(getContextPath())
.appName(getAppName())
.path(PathUtils.decoratorPathWithSlash(getContextPath()))
.rpcType(RpcTypeEnum.HTTP.getName())
.enabled(true)
.ruleName(getContextPath())
.build());
return null;
}
// get bean with Controller annotation
return context.getBeansWithAnnotation(Controller.class);
}

@Override
protected URIRegisterDTO buildURIRegisterDTO(final ApplicationContext context,
final Map<String, Object> beans) {
// ...
}

@Override
protected String buildApiSuperPath(final Class<?> clazz, @Nullable final ShenyuSpringMvcClient beanShenyuClient) {
if (Objects.nonNull(beanShenyuClient) && StringUtils.isNotBlank(beanShenyuClient.path())) {
return beanShenyuClient.path();
}
RequestMapping requestMapping = AnnotationUtils.findAnnotation(clazz, RequestMapping.class);
// Only the first path is supported temporarily
if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {
return requestMapping.path()[0];
}
return "";
}

@Override
protected Class<ShenyuSpringMvcClient> getAnnotationType() {
return ShenyuSpringMvcClient.class;
}

@Override
protected void handleMethod(final Object bean, final Class<?> clazz,
@Nullable final ShenyuSpringMvcClient beanShenyuClient,
final Method method, final String superPath) {
// get RequestMapping annotation
final RequestMapping requestMapping = AnnotatedElementUtils.findMergedAnnotation(method, RequestMapping.class);
// get ShenyuSpringMvcClient annotation
ShenyuSpringMvcClient methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, ShenyuSpringMvcClient.class);
methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;
// the result of ReflectionUtils#getUniqueDeclaredMethods contains methods such as hashCode, wait, toString
// add Objects.nonNull(requestMapping) to make sure not register wrong method
if (Objects.nonNull(methodShenyuClient) && Objects.nonNull(requestMapping)) {
getPublisher().publishEvent(buildMetaDataDTO(bean, methodShenyuClient, buildApiPath(method, superPath, methodShenyuClient), clazz, method));
}
}

//...

// build metadata
@Override
protected MetaDataRegisterDTO buildMetaDataDTO(final Object bean,
@NonNull final ShenyuSpringMvcClient shenyuClient,
final String path, final Class<?> clazz,
final Method method) {
//...
}
}

The registration logic is done through publisher.publishEvent().

The Controller annotation and the RequestMapping annotation are provided by Spring, which you should be familiar with, so I won't go into details. The ShenyuSpringMvcClient annotation is provided by Apache ShenYu to register the SpringMvc client, which is defined as follows.


/**
* ShenyuSpringMvcClient
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ShenyuSpringMvcClient {

// path
@AliasFor(attribute = "path")
String value() default "";

// path
@AliasFor(attribute = "value")
String path() default "";

// ruleName
String ruleName() default "";

// desc info
String desc() default "";

// enabled
boolean enabled() default true;

// register MetaData
boolean registerMetaData() default false;
}

It is used as follows.

  • register the entire interface
@RestController
@RequestMapping("/test")
@ShenyuSpringMvcClient(path = "/test/**") // register the entire interface
public class HttpTestController {
//......
}
  • register current method
@RestController
@RequestMapping("/order")
@ShenyuSpringMvcClient(path = "/order")
public class OrderController {

/**
* Save order dto.
*
* @param orderDTO the order dto
* @return the order dto
*/
@PostMapping("/save")
@ShenyuSpringMvcClient(path = "/save", desc = "Save order") // register current method
public OrderDTO save(@RequestBody final OrderDTO orderDTO) {
orderDTO.setName("hello world save order");
return orderDTO;
}
}
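In the two usages above, the path that ends up in the metadata is assembled from the configured contextPath, the class-level path and the method-level path. The sketch below only illustrates that composition; ApiPaths.join is a hypothetical helper, not ShenYu's own pathJoin or buildApiPath, and its exact normalization rules are an assumption.

```java
// Hypothetical helper illustrating how the registered path is assembled.
final class ApiPaths {

    // Joins non-empty segments, ensuring one leading slash and no trailing slash per segment.
    static String join(final String... parts) {
        StringBuilder result = new StringBuilder();
        for (String part : parts) {
            if (part == null || part.isEmpty()) {
                continue;
            }
            String segment = part.startsWith("/") ? part : "/" + part;
            if (segment.endsWith("/")) {
                segment = segment.substring(0, segment.length() - 1);
            }
            result.append(segment);
        }
        return result.toString();
    }
}
```

With contextPath /http, the method-level example would yield /http/order/save, and the class-level wildcard example would yield /http/test/**.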
  • publisher.publishEvent()

This method sends the data to the Disruptor queue. The details of the Disruptor queue are not covered here, since they do not affect the analysis of the registration flow.

When the data is sent, the consumers of the Disruptor queue will process the data for consumption.

  • QueueConsumer

QueueConsumer is a consumer that implements the WorkHandler interface, which is created in the providerManage.startup() logic. The WorkHandler interface is the data consumption interface for Disruptor, and the only method is onEvent().

package com.lmax.disruptor;

public interface WorkHandler<T> {
void onEvent(T event) throws Exception;
}

The QueueConsumer overrides the onEvent() method, and the main logic is to generate the consumption task and then go to the thread pool to execute it.


/**
*
* QueueConsumer
*/
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {

// ......

@Override
public void onEvent(final DataEvent<T> t) {
if (t != null) {
// Use different thread pools based on DataEvent type
ThreadPoolExecutor executor = orderly(t);
// create queue consumption tasks via factory
QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
// set data
queueConsumerExecutor.setData(t.getData());
// help gc
t.setData(null);
// put in the thread pool to execute the consumption task
executor.execute(queueConsumerExecutor);
}
}
}

QueueConsumerExecutor is the task executed in the thread pool. It implements the Runnable interface and has two concrete implementation classes.

  • RegisterClientConsumerExecutor: the client-side consumer executor.
  • RegisterServerConsumerExecutor: the server-side consumer executor.

As the name implies, one is responsible for handling client-side tasks, and one is responsible for handling server-side tasks (the server side is admin, which is analyzed below).

  • RegisterClientConsumerExecutor

The logic of the overridden run() method is as follows.


public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {

//......

@Override
public void run() {
// get data
final T data = getData();
// call the appropriate processor for processing according to the data type
subscribers.get(data.getType()).executor(Lists.newArrayList(data));
}

}

Different processors are called to perform the corresponding tasks depending on the data type. There are two types of data: metadata, which records the client registration information, and URI data, which records the client service information.

public enum DataType {

META_DATA,

URI,
}
  • ExecutorSubscriber#executor()

The executor subscribers fall into two categories: one handles metadata and the other handles URIs. There are two on the client side and two on the server side, so four in total.

The data being registered here is metadata, so the execution class is ShenyuClientMetadataExecutorSubscriber.

  • ShenyuClientMetadataExecutorSubscriber#executor()

The metadata processing logic on the client side is: iterate through the metadata information and call the interface method persistInterface() to finish publishing the data.

public class ShenyuClientMetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {

//......

@Override
public DataType getType() {
return DataType.META_DATA;
}

@Override
public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {
// call the interface method persistInterface() to finish publishing the data
shenyuClientRegisterRepository.persistInterface(metaDataRegisterDTO);
}
}
}

The two registration interfaces receive the data and call the publish() method to publish it to the Disruptor queue.

  • ShenyuServerRegisterRepository

The ShenyuServerRegisterRepository interface is a service registration interface, which has five implementation classes, indicating five types of registration.

  • ConsulServerRegisterRepository: registration through Consul.
  • EtcdServerRegisterRepository: registration through Etcd.
  • NacosServerRegisterRepository: registration through Nacos.
  • ShenyuHttpRegistryController: registration via Http.
  • ZookeeperServerRegisterRepository: registration through Zookeeper.

As you can see from the diagram, the loading of the registry is done by means of SPI. This was mentioned earlier, and the specific class loading is done in the client-side generic configuration file by specifying the properties in the configuration file.


/**
* load ShenyuClientRegisterRepository
*/
public final class ShenyuClientRegisterRepositoryFactory {

private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();

/**
* create ShenyuClientRegisterRepository
*/
public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
// loading by means of SPI, type determined by registerType
ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
// perform initialization operations
result.init(shenyuRegisterCenterConfig);
ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
return result;
}
return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
}
}
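The factory above is essentially a cache keyed by registerType: the repository is loaded and initialized once, then reused. A minimal sketch of that idea follows. RepositoryCache is a hypothetical class, a plain Function stands in for the SPI lookup plus init(), and computeIfAbsent is used here instead of the containsKey/put sequence in the original code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache: one repository instance per register type.
final class RepositoryCache<R> {

    private final Map<String, R> cache = new ConcurrentHashMap<>();

    // Stand-in for "load via SPI and initialize".
    private final Function<String, R> loader;

    RepositoryCache(final Function<String, R> loader) {
        this.loader = loader;
    }

    R get(final String registerType) {
        // The loader runs at most once per key, even under concurrent access.
        return cache.computeIfAbsent(registerType, loader);
    }
}
```

Caching matters because initialization has side effects (logging in to admin, installing a shutdown hook) that should not be repeated for every listener that asks for the repository.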

The source code analysis in this article is based on the Http way of registration, so we first analyze the HttpClientRegisterRepository, and the other registration methods will be analyzed afterwards.

Registration via http is simple: a utility class is called to send http requests. Metadata and URI registration both go through the same method, doRegister(), which is given the interface path and the type.

  • Constants.META_PATH = /shenyu-client/register-metadata: the interface provided by the server for registering metadata.
  • Constants.URI_PATH = /shenyu-client/register-uri: the interface provided by the server for registering URIs.
@Join
public class HttpClientRegisterRepository extends FailbackRegistryRepository {

private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientRegisterRepository.class);

private static URIRegisterDTO uriRegisterDTO;

private String username;

private String password;

private List<String> serverList;

private String accessToken;

public HttpClientRegisterRepository() {
}

public HttpClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
init(config);
}

@Override
public void init(final ShenyuRegisterCenterConfig config) {
// admin username
this.username = config.getProps().getProperty(Constants.USER_NAME);
// admin password
this.password = config.getProps().getProperty(Constants.PASS_WORD);
// admin server address
this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));
// set access token
this.setAccessToken();
}

/**
* Persist uri.
*
* @param registerDTO the register dto
*/
@Override
public void doPersistURI(final URIRegisterDTO registerDTO) {
if (RuntimeUtils.listenByOther(registerDTO.getPort())) {
return;
}
doRegister(registerDTO, Constants.URI_PATH, Constants.URI);
uriRegisterDTO = registerDTO;
}

@Override
public void doPersistInterface(final MetaDataRegisterDTO metadata) {
doRegister(metadata, Constants.META_PATH, Constants.META_TYPE);
}

@Override
public void close() {
if (uriRegisterDTO != null) {
uriRegisterDTO.setEventType(EventType.DELETED);
doRegister(uriRegisterDTO, Constants.URI_PATH, Constants.URI);
}
}

private void setAccessToken() {
for (String server : serverList) {
try {
Optional<?> login = RegisterUtils.doLogin(username, password, server.concat(Constants.LOGIN_PATH));
login.ifPresent(v -> this.accessToken = String.valueOf(v));
} catch (Exception e) {
LOGGER.error("Login admin url :{} is fail, will retry. cause: {} ", server, e.getMessage());
}
}
}

private <T> void doRegister(final T t, final String path, final String type) {
int i = 0;
// iterate through the list of admin services (admin may be clustered)
for (String server : serverList) {
i++;
String concat = server.concat(path);
try {
// set the access token
if (StringUtils.isBlank(accessToken)) {
this.setAccessToken();
if (StringUtils.isBlank(accessToken)) {
throw new NullPointerException("accessToken is null");
}
}
// calling the tool class to send http requests
RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), concat, type, accessToken);
return;
} catch (Exception e) {
LOGGER.error("Register admin url :{} is fail, will retry. cause:{}", server, e.getMessage());
if (i == serverList.size()) {
throw new RuntimeException(e);
}
}
}
}
}
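The core of doRegister() is a failover loop over the admin server list: the first server that accepts the request ends the loop, and an exception is raised only after the last server has failed. A minimal sketch under stated assumptions: FailoverRegister and the Sender interface are hypothetical, Sender replaces the real OkHttp call, and the access-token handling is left out.

```java
import java.util.List;

// Hypothetical failover loop; Sender stands in for the HTTP call to shenyu-admin.
final class FailoverRegister {

    interface Sender {
        void send(String url, String json) throws Exception;
    }

    // Returns the URL that accepted the request, or throws once every server has failed.
    static String register(final List<String> servers, final String path,
                           final String json, final Sender sender) {
        Exception last = null;
        for (String server : servers) {
            String url = server.concat(path);
            try {
                sender.send(url, json);
                return url;
            } catch (Exception e) {
                // remember the failure and try the next admin instance
                last = e;
            }
        }
        throw new IllegalStateException("all admin servers failed", last);
    }
}
```

Since admin may be deployed as a cluster, a single unreachable instance should not prevent the client from registering.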

Serialize the data and send it via OkHttp.


public final class RegisterUtils {

//......

// Sending data via OkHttp
public static void doRegister(final String json, final String url, final String type, final String accessToken) throws IOException {
if (!StringUtils.hasLength(accessToken)) {
LOGGER.error("{} client register error accessToken is null, please check the config : {} ", type, json);
return;
}
Headers headers = new Headers.Builder().add(Constants.X_ACCESS_TOKEN, accessToken).build();
String result = OkHttpTools.getInstance().post(url, json, headers);
if (Objects.equals(SUCCESS, result)) {
LOGGER.info("{} client register success: {} ", type, json);
} else {
LOGGER.error("{} client register error: {} ", type, json);
}
}
}

At this point, the logic of the client registering metadata by means of http is finished. To summarize: construct metadata by reading custom annotation information, send the data to the Disruptor queue, then consume the data from the queue, put the consumer into the thread pool to execute, and finally send an http request to the admin.

Similarly, ShenyuClientURIExecutorSubscriber is the execution class of registering URI information.

  • ShenyuClientURIExecutorSubscriber#executor()

The main logic is to iterate through the URI data collection and implement data registration through the persistURI() method.


public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {

//......

@Override
public DataType getType() {
return DataType.URI;
}

// register URI
@Override
public void executor(final Collection<URIRegisterDTO> dataList) {
for (URIRegisterDTO uriRegisterDTO : dataList) {
Stopwatch stopwatch = Stopwatch.createStarted();
while (true) {
try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {
break;
} catch (IOException e) {
long sleepTime = 1000;
// maybe the port is delay exposed
if (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {
LOG.error("host:{}, port:{} connection failed, will retry",
uriRegisterDTO.getHost(), uriRegisterDTO.getPort());
// If the connection fails for a long time, Increase sleep time
if (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {
sleepTime = 10000;
}
}
try {
TimeUnit.MILLISECONDS.sleep(sleepTime);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
ShenyuClientShutdownHook.delayOtherHooks();

shenyuClientRegisterRepository.persistURI(uriRegisterDTO);
}
}
}

The while(true) loop in the code ensures that the client has started successfully and can be reached via its host and port.

The code after the loop adds the hook function for gracefully stopping the client.

Data registration is achieved through the persistURI() method. The whole logic is also analyzed in the previous section, and ultimately it is the OkHttp client that initiates http to shenyu-admin and registers the URI by way of http.
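The port probe that precedes URI registration can be sketched as follows. This is a simplified illustration rather than ShenYu's implementation: PortProbe is a hypothetical class, the number of attempts is bounded instead of looping forever, and the sleep time is fixed instead of growing after 180 seconds.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical bounded version of the "wait until the port is open" loop.
final class PortProbe {

    static boolean waitUntilReachable(final String host, final int port,
                                      final int maxAttempts, final long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try (Socket socket = new Socket()) {
                // a successful TCP connect means the service is listening
                socket.connect(new InetSocketAddress(host, port), 1000);
                return true;
            } catch (IOException e) {
                // the port may be exposed late; wait and retry
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

Probing first avoids registering a URI that the gateway cannot yet route to.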

The analysis of the registration logic of the client is finished here, and the metadata and URI data constructed are sent to the Disruptor queue, from which they are then consumed, read, and sent to admin via http.

The source code analysis of the client-side metadata and URI registration process is complete, with the following flow chart.

3. Server-side registration process

3.1 ShenyuHttpRegistryController

From the previous analysis, we know that the server side provides two interfaces for registration.

  • /shenyu-client/register-metadata: The interface provided by the server side is used to register metadata.
  • /shenyu-client/register-uri: The server-side interface is provided for registering URIs.

These two interfaces are located in ShenyuHttpRegistryController, which implements the ShenyuServerRegisterRepository interface and is the implementation class for server-side registration. It is marked with @Join to indicate loading via SPI.

@RequestMapping("/shenyu-client")
@Join
public class ShenyuHttpRegistryController implements ShenyuServerRegisterRepository {

private ShenyuServerRegisterPublisher publisher;

@Override
public void init(final ShenyuServerRegisterPublisher publisher, final ShenyuRegisterCenterConfig config) {
this.publisher = publisher;
}

// register Metadata
@PostMapping("/register-metadata")
@ResponseBody
public String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {
publisher.publish(metaDataRegisterDTO);
return ShenyuResultMessage.SUCCESS;
}

// register URI
@PostMapping("/register-uri")
@ResponseBody
public String registerURI(@RequestBody final URIRegisterDTO uriRegisterDTO) {
publisher.publish(uriRegisterDTO);
return ShenyuResultMessage.SUCCESS;
}
}

The exact method used is specified by the configuration file and then loaded via SPI.

The registration method is configured in the application.yml file of shenyu-admin: registerType specifies the registration type, and serverLists does not need to be filled in when registering via http. For more configuration details, refer to Client Access Configuration on the official website.

shenyu:
  register:
    registerType: http
    serverLists:
  • RegisterCenterConfiguration

After the relevant dependencies and properties are in place, shenyu-admin loads its configuration at startup. The configuration class related to the registration center is RegisterCenterConfiguration.

@Configuration
public class RegisterCenterConfiguration {
@Bean
@ConfigurationProperties(prefix = "shenyu.register")
public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
return new ShenyuRegisterCenterConfig();
}

//create ShenyuServerRegisterRepository to register in admin
@Bean(destroyMethod = "close")
public ShenyuServerRegisterRepository shenyuServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig, final List<ShenyuClientRegisterService> shenyuClientRegisterService) {
// 1. get the registration type from the configuration property
String registerType = shenyuRegisterCenterConfig.getRegisterType();
// 2. load the implementation class by registering the type with the SPI method
ShenyuServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuServerRegisterRepository.class).getJoin(registerType);
// 3. get the publisher and write data to the Disruptor queue
RegisterClientServerDisruptorPublisher publisher = RegisterClientServerDisruptorPublisher.getInstance();
// 4. ShenyuClientRegisterService, rpcType -> registerService
Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, e -> e));
// 5. start publisher
publisher.start(registerServiceMap);
// 6. init registerRepository
registerRepository.init(publisher, shenyuRegisterCenterConfig);
return registerRepository;
}
}

Two beans are generated in the configuration class.

  • shenyuRegisterCenterConfig: to read the attribute configuration.

  • shenyuServerRegisterRepository: for server-side registration.

In the process of creating shenyuServerRegisterRepository, a series of preparations are also performed.

    1. Get the registration type from the configuration property.
    2. Load the implementation class for that registration type via SPI: for example, if the specified type is http, ShenyuHttpRegistryController is loaded.
    3. Get the publisher, which writes data to the Disruptor queue.
    4. Build the rpcType -> registerService map: each RPC type has a corresponding Service. The client in this article is built with Spring Boot and is of the http type; other client types include dubbo, Spring Cloud, gRPC, etc.
    5. Prepare for event publishing: add the server-side metadata and URI subscribers that process the data, and start the Disruptor queue.
    6. Initialize the registration repository: for the http type, initialization simply saves the publisher.
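The rpcType -> registerService mapping built while the bean is created is a plain Collectors.toMap over the injected services. A minimal sketch, where RegisterServiceIndex and the one-method RegisterService interface are hypothetical stand-ins for ShenyuClientRegisterService:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical index of register services keyed by RPC type.
final class RegisterServiceIndex {

    interface RegisterService {
        String rpcType();
    }

    static Map<String, RegisterService> byRpcType(final List<RegisterService> services) {
        // toMap throws IllegalStateException if two services report the same rpcType
        return services.stream()
                .collect(Collectors.toMap(RegisterService::rpcType, Function.identity()));
    }
}
```

With this map, a subscriber can look up the right service from the rpcType carried by each incoming DTO.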
  • RegisterClientServerDisruptorPublisher#publish()

This is the server-side publisher that writes data to the Disruptor queue. It is built via the singleton pattern.


public class RegisterClientServerDisruptorPublisher implements ShenyuServerRegisterPublisher {
private static final RegisterClientServerDisruptorPublisher INSTANCE = new RegisterClientServerDisruptorPublisher();

public static RegisterClientServerDisruptorPublisher getInstance() {
return INSTANCE;
}

//prepare for event publishing, add server-side metadata and URI subscribers, process data. And start the Disruptor queue.
public void start(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {
RegisterServerExecutorFactory factory = new RegisterServerExecutorFactory();
// add URI data subscriber
factory.addSubscribers(new URIRegisterExecutorSubscriber(shenyuClientRegisterService));
// add Metadata subscriber
factory.addSubscribers(new MetadataExecutorSubscriber(shenyuClientRegisterService));
//start Disruptor
providerManage = new DisruptorProviderManage(factory);
providerManage.startup();
}

// write data to queue
@Override
public <T> void publish(final DataTypeParent data) {
DisruptorProvider<Object> provider = providerManage.getProvider();
provider.onData(Collections.singleton(data));
}

// write data to queue on batch
@Override
public void publish(final Collection<? extends DataTypeParent> dataList) {
DisruptorProvider<Collection<DataTypeParent>> provider = providerManage.getProvider();
provider.onData(dataList.stream().map(DataTypeParent.class::cast).collect(Collectors.toList()));
}

@Override
public void close() {
providerManage.getProvider().shutdown();
}
}

The loading of the configuration file, which can be seen as the initialization process of the registry server, is described in the following diagram.

3.2 QueueConsumer

The client-side analysis above already covered how data is consumed from the Disruptor queue. The server side follows the same logic, except that the executor performing the task changes.

The QueueConsumer is a consumer that implements the WorkHandler interface, which is created in the providerManage.startup() logic. The WorkHandler interface is the data consumption interface for disruptor, and the only method is onEvent().

package com.lmax.disruptor;

public interface WorkHandler<T> {
void onEvent(T var1) throws Exception;
}

The QueueConsumer overrides the onEvent() method, and the main logic is to generate the consumption task and then go to the thread pool to execute it.

/**
*
* QueueConsumer
*/
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {

// ......

@Override
public void onEvent(final DataEvent<T> t) {
if (t != null) {
// Use different thread pools based on DataEvent type
ThreadPoolExecutor executor = orderly(t);
// create queue consumption tasks via factory
QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
// set data
queueConsumerExecutor.setData(t.getData());
// help gc
t.setData(null);
// put in the thread pool to execute the consumption task
executor.execute(queueConsumerExecutor);
}
}
}

QueueConsumerExecutor is the task executed in the thread pool. It implements the Runnable interface and has two concrete implementation classes.

  • RegisterClientConsumerExecutor: the client-side consumer executor.
  • RegisterServerConsumerExecutor: the server-side consumer executor.

As the name implies, one is responsible for handling client-side tasks and one is responsible for handling server-side tasks.

  • RegisterServerConsumerExecutor#run()

RegisterServerConsumerExecutor is a server-side consumer executor that indirectly implements the Runnable interface via QueueConsumerExecutor and overrides the run() method.


public final class RegisterServerConsumerExecutor extends QueueConsumerExecutor<List<DataTypeParent>> {
// ...

@Override
public void run() {
//get the data from the disruptor queue and check data
Collection<DataTypeParent> results = getData()
.stream()
.filter(this::isValidData)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(results)) {
return;
}
//execute operations according to type
selectExecutor(results).executor(results);
}

// get subscribers by type
private ExecutorSubscriber<DataTypeParent> selectExecutor(final Collection<DataTypeParent> list) {
final Optional<DataTypeParent> first = list.stream().findFirst();
return subscribers.get(first.orElseThrow(() -> new RuntimeException("the data type is not found")).getType());
}
}
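
Selecting a subscriber by the type of the first element can be shown with a few simplified types. This is a minimal sketch under stated assumptions: Item, Subscriber and TypeDispatchSketch are hypothetical names, and the payloads are made-up strings rather than real registration DTOs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical, reduced model of type-based dispatch; not ShenYu code.
final class TypeDispatchSketch {

    enum DataType { META_DATA, URI }

    /** Stand-in for an item that knows its own data type. */
    static final class Item {
        private final DataType type;
        private final String payload;

        Item(final DataType type, final String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    /** Stand-in for a subscriber that handles exactly one data type. */
    interface Subscriber {
        DataType getType();

        void executor(Collection<Item> items);
    }

    private final Map<DataType, Subscriber> subscribers = new EnumMap<>(DataType.class);

    TypeDispatchSketch(final Collection<Subscriber> all) {
        all.forEach(subscriber -> subscribers.put(subscriber.getType(), subscriber));
    }

    /** The first element decides which subscriber receives the whole batch. */
    void dispatch(final Collection<Item> batch) {
        if (batch.isEmpty()) {
            return;
        }
        DataType type = batch.iterator().next().type;
        Subscriber subscriber = subscribers.get(type);
        if (subscriber == null) {
            throw new IllegalStateException("no subscriber for " + type);
        }
        subscriber.executor(batch);
    }

    /** Builds a subscriber that records what it received. */
    static Subscriber logging(final DataType type, final List<String> log) {
        return new Subscriber() {
            @Override
            public DataType getType() {
                return type;
            }

            @Override
            public void executor(final Collection<Item> items) {
                items.forEach(item -> log.add(type + ":" + item.payload));
            }
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        TypeDispatchSketch dispatcher = new TypeDispatchSketch(
                Arrays.asList(logging(DataType.META_DATA, log), logging(DataType.URI, log)));
        dispatcher.dispatch(Arrays.asList(new Item(DataType.URI, "localhost:8189")));
        dispatcher.dispatch(Arrays.asList(new Item(DataType.META_DATA, "/order/findById")));
        return log;
    }
}
```

The sketch relies on the same assumption as the listing above: every element of a batch has the same type, so inspecting the first one is enough.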

  • ExecutorSubscriber#executor()

The executor subscribers fall into two categories: one handles metadata and the other handles URIs. There are two on the client side and two on the server side, four in total.

  • MetadataExecutorSubscriber#executor()

Metadata registration is handled by MetadataExecutorSubscriber#executor(): it gets the registration Service according to the rpc type and calls register().

public class MetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {

//......

@Override
public DataType getType() {
return DataType.META_DATA;
}

@Override
public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
// Traversing the metadata list
metaDataRegisterDTOList.forEach(meta -> {
Optional.ofNullable(this.shenyuClientRegisterService.get(meta.getRpcType())) // Get registered Service by type
.ifPresent(shenyuClientRegisterService -> {
// Registration of metadata, locking to ensure sequential execution and prevent concurrent errors
synchronized (shenyuClientRegisterService) {
shenyuClientRegisterService.register(meta);
}
});
});
}
}
  • URIRegisterExecutorSubscriber#executor()

URI registration is handled by URIRegisterExecutorSubscriber#executor(): it builds the URI data, finds the Service according to the registration type, and registers through the registerURI() method.


public class URIRegisterExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {
//......

@Override
public DataType getType() {
return DataType.URI;
}

@Override
public void executor(final Collection<URIRegisterDTO> dataList) {
if (CollectionUtils.isEmpty(dataList)) {
return;
}

findService(dataList).ifPresent(service -> {
Map<String, List<URIRegisterDTO>> listMap = buildData(dataList);
listMap.forEach(service::registerURI);
});
final Map<String, List<URIRegisterDTO>> groupByRpcType = dataList.stream()
.filter(data -> StringUtils.isNotBlank(data.getRpcType()))
.collect(Collectors.groupingBy(URIRegisterDTO::getRpcType));
for (Map.Entry<String, List<URIRegisterDTO>> entry : groupByRpcType.entrySet()) {
final String rpcType = entry.getKey();
// Get registered Service by type
Optional.ofNullable(shenyuClientRegisterService.get(rpcType))
.ifPresent(service -> {
final List<URIRegisterDTO> list = entry.getValue();
// Build URI data types and register them with the registerURI method
Map<String, List<URIRegisterDTO>> listMap = buildData(list);
listMap.forEach(service::registerURI);
});
}
}

// Find Service by type
private Optional<ShenyuClientRegisterService> findService(final Collection<URIRegisterDTO> dataList) {
return dataList.stream().map(dto -> shenyuClientRegisterService.get(dto.getRpcType())).findFirst();
}
}
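
The grouping step in executor() is ordinary stream grouping. Below is a minimal sketch, assuming a hypothetical reduced UriDto with only rpcType and host fields (the real URIRegisterDTO has more); the host values are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, reduced model; not the real URIRegisterDTO.
final class GroupByRpcTypeSketch {

    static final class UriDto {
        private final String rpcType;
        private final String host;

        UriDto(final String rpcType, final String host) {
            this.rpcType = rpcType;
            this.host = host;
        }

        String getRpcType() {
            return rpcType;
        }

        String getHost() {
            return host;
        }
    }

    /** Drops entries with a blank rpcType, then groups the hosts by rpcType. */
    static Map<String, List<String>> hostsByRpcType(final List<UriDto> dtos) {
        return dtos.stream()
                .filter(dto -> dto.getRpcType() != null && !dto.getRpcType().trim().isEmpty())
                .collect(Collectors.groupingBy(UriDto::getRpcType,
                        Collectors.mapping(UriDto::getHost, Collectors.toList())));
    }

    static Map<String, List<String>> demo() {
        return hostsByRpcType(Arrays.asList(
                new UriDto("http", "10.0.0.1:8189"),
                new UriDto("dubbo", "10.0.0.2:20880"),
                new UriDto("http", "10.0.0.3:8189"),
                new UriDto(" ", "10.0.0.4:8189")));
    }
}
```

Each group can then be handed to the Service registered for that rpc type, which is the role the loop over groupByRpcType plays in the listing above.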

  • ShenyuClientRegisterService#register()

ShenyuClientRegisterService is the registration method interface, which has several implementation classes.

  • AbstractContextPathRegisterService: abstract class, handling part of the public logic.
  • AbstractShenyuClientRegisterServiceImpl: abstract class, handles part of the public logic.
  • ShenyuClientRegisterDivideServiceImpl: divide class, handles http registration types.
  • ShenyuClientRegisterDubboServiceImpl: dubbo class, handles dubbo registration types.
  • ShenyuClientRegisterGrpcServiceImpl: gRPC class, handles gRPC registration types.
  • ShenyuClientRegisterMotanServiceImpl: Motan class, handles Motan registration types.
  • ShenyuClientRegisterSofaServiceImpl: Sofa class, handles Sofa registration types.
  • ShenyuClientRegisterSpringCloudServiceImpl: SpringCloud class, handles SpringCloud registration types.
  • ShenyuClientRegisterTarsServiceImpl: Tars class, handles Tars registration types.
  • ShenyuClientRegisterWebSocketServiceImpl: Websocket class, handles Websocket registration types.

From the above, we can see that each type of microservice has a corresponding registration implementation class. The source code analysis in this article uses the official shenyu-examples-http as its example, which is of the http registration type, so the registration implementation class for metadata and URI data is ShenyuClientRegisterDivideServiceImpl.

  • register():
public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {

//......

public String register(final MetaDataRegisterDTO dto) {
// 1.register selector information
String selectorHandler = selectorHandler(dto);
String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
// 2.register rule information
String ruleHandler = ruleHandler();
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
ruleService.registerDefault(ruleDTO);
// 3.register metadata information
registerMetadata(dto);
// 4.register contextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
}

The whole registration logic can be divided into 4 steps.

    1. Register selector information
    2. Register rule information
    3. Register metadata information
    4. Register contextPath

On the admin side, selectors, rules, metadata and contextPath are all built from the metadata information sent by the client. The specific registration process and its details depend on the rpc type. We will not trace the registration logic any further; following it to this point is enough.

The source code of the server-side metadata registration process is analyzed and the flow chart is described as follows.

  • registerURI()
public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {

//......

public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {
if (CollectionUtils.isEmpty(uriList)) {
return "";
}
// Does the corresponding selector exist
SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
if (Objects.isNull(selectorDO)) {
return "";
}
// Handle handler information in the selector
String handler = buildHandle(uriList, selectorDO);
selectorDO.setHandle(handler);
SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
selectorData.setHandle(handler);

// Update records in the database
selectorService.updateSelective(selectorDO);
// publish Event to gateway
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
return ShenyuResultMessage.SUCCESS;
}
}

After admin receives the URI data, it updates the handle information in the selector, writes it to the database, and finally publishes an event to notify the gateway. Notifying the gateway is done by the data synchronization mechanism, which was analyzed in a previous article, so we will not repeat it here.

The source code analysis of the server-side URI registration process is complete and is described in the following diagram.

At this point, the analysis of the server-side registration process is complete. The server accepts registration information from the client through the interface it exposes, writes it to the Disruptor queue, consumes the data from the queue, and then updates the admin's selectors, rules, metadata and selector handle according to the received metadata and URI data.

4. Summary

This article focuses on the http registration module of the Apache ShenYu gateway for source code analysis. The main knowledge points involved are summarized as follows.

  • The register center registers client information with admin to facilitate traffic filtering.
  • http registration registers client metadata and URI information with admin.
  • http service access is identified by the annotation @ShenyuSpringMvcClient.
  • The registration information is built mainly through the application listener ApplicationListener.
  • The registration type is loaded through SPI.
  • The Disruptor queue is introduced to decouple data from operations and to buffer data.
  • The registry is implemented with interface-oriented programming and uses design patterns such as template method, singleton, and observer.

· 14 min read

Gateway applications need to support a variety of load balancing strategies, including Random, Hashing, RoundRobin and so on. The Apache Shenyu gateway not only implements these traditional algorithms, but also handles newly added server nodes more smoothly through details such as traffic warm-up, which gives better overall stability. In this article, let's walk through how Apache Shenyu designs and implements this functionality.

This article is based on the source code of shenyu-2.5.0.


LoadBalancer SPI

The implementation of LoadBalancer is in the shenyu-loadbalancer module and is built on the Apache Shenyu SPI mechanism. The core interface code is shown as follows. The interface explains the concept well: load balancing means selecting the most appropriate node from a list of server nodes. Routing, traffic processing and load balancing are the basic functions of the LoadBalancer SPI.

@SPI
public interface LoadBalancer {

/**
* this is select one for upstream list.
*
* @param upstreamList upstream list
* @param ip ip
* @return upstream
*/
Upstream select(List<Upstream> upstreamList, String ip);
}

Here upstreamList is the list of server nodes available for routing. Upstream is the data structure of a server node; its important fields include protocol, url, weight, timestamp, warmup and healthy.

public class Upstream {
/**
* protocol.
*/
private final String protocol;

/**
* url.
*/
private String url;

/**
* weight.
*/
private final int weight;

/**
* false close, true open.
*/
private boolean status;

/**
* startup time.
*/
private final long timestamp;

/**
* warmup.
*/
private final int warmup;

/**
* healthy.
*/
private boolean healthy;

/**
* lastHealthTimestamp.
*/
private long lastHealthTimestamp;

/**
* lastUnhealthyTimestamp.
*/
private long lastUnhealthyTimestamp;

/**
* group.
*/
private String group;

/**
* version.
*/
private String version;
}


Design of LoadBalancer module

The class diagram of the LoadBalancer module is shown as follows.

loadbalancer-class-diagram

We can draw the outline of LoadBalancer module from the class diagram:

  1. The abstract class AbstractLoadBalancer implements the SPI LoadBalancer interface, supplies the template method select() and the abstract method doSelect() for selection, and provides the weight calculation.

  2. Three implementation classes which inherit AbstractLoadBalancer to realize their own logic:

    • RandomLoadBalancer - Weighted Random
    • HashLoadBalancer - Consistent Hashing
    • RoundRobinLoadBalancer - Weighted Round Robin (per-packet)
  3. The factory class LoadBalancerFactory provides a public static method for callers.

    The implementation classes and algorithms are configurable. According to the SPI specification, a profile is added to the SHENYU_DIRECTORY directory. The data in the profile should be in key=value-class format, where value-class is loaded by the Apache Shenyu SPI class loader and key should be a name defined in LoadBalanceEnum.

random=org.apache.shenyu.loadbalancer.spi.RandomLoadBalancer
roundRobin=org.apache.shenyu.loadbalancer.spi.RoundRobinLoadBalancer
hash=org.apache.shenyu.loadbalancer.spi.HashLoadBalancer

The code of LoadBalanceEnum is as follows:

public enum LoadBalanceEnum {
/**
* Hash load balance enum.
*/
HASH(1, "hash", true),

/**
* Random load balance enum.
*/
RANDOM(2, "random", true),

/**
* Round robin load balance enum.
*/
ROUND_ROBIN(3, "roundRobin", true);

private final int code;
private final String name;
private final boolean support;
}

AbstractLoadBalancer

This abstract class implements the LoadBalancer interface and defines the abstract method doSelect(), which is implemented by the subclasses. The template method select() validates the input first and then calls doSelect().

public abstract class AbstractLoadBalancer implements LoadBalancer {
/**
* Do select divide upstream.
*
* @param upstreamList the upstream list
* @param ip the ip
* @return the divide upstream
*/
protected abstract Upstream doSelect(List<Upstream> upstreamList, String ip);

@Override
public Upstream select(final List<Upstream> upstreamList, final String ip) {
if (CollectionUtils.isEmpty(upstreamList)) {
return null;
}
if (upstreamList.size() == 1) {
return upstreamList.get(0);
}
return doSelect(upstreamList, ip);
}
}

When the timestamp of a server node is set and the interval between the current time and the timestamp is within the traffic warm-up time, the weight is calculated as follows. $$ ww = \max\left(1, \frac{uptime}{warmup / weight}\right) $$ It can be seen from the formula that the final weight (ww) is proportional to the original weight value and never drops below 1. The closer the time interval is to the warmup time, the greater the final ww. That is, the longer the node has been running, the higher its final weight. When there is no timestamp, or the warm-up period has passed, ww is equal to the weight value of the Upstream object.

The core idea of warm-up is to avoid poor performance when a new server is added and its JVM is still starting up.
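
The warm-up rule can be sketched as a small function. This is a minimal sketch under stated assumptions, not a copy of getWeight(): the names WarmupWeightSketch and warmupWeight are hypothetical, times are in milliseconds, and the ramped weight is assumed to be kept between 1 and the configured weight.

```java
// Hypothetical helper illustrating a linear warm-up ramp; not ShenYu code.
final class WarmupWeightSketch {

    /**
     * Returns the effective weight of a node that has been up for uptimeMillis.
     * Outside the warm-up window the configured weight is returned unchanged.
     */
    static int warmupWeight(final long uptimeMillis, final int warmupMillis, final int weight) {
        if (weight <= 0 || uptimeMillis <= 0 || uptimeMillis >= warmupMillis) {
            return weight;
        }
        // uptime / (warmup / weight), computed in integer arithmetic
        long ramped = uptimeMillis * weight / warmupMillis;
        return (int) Math.max(1, Math.min(ramped, weight));
    }

    public static void main(final String[] args) {
        int warmup = 60_000;
        int weight = 50;
        for (long uptime : new long[] {100, 15_000, 30_000, 45_000, 60_000}) {
            System.out.println(uptime + " ms -> " + warmupWeight(uptime, warmup, weight));
        }
    }
}
```

With a 60 second warm-up and a weight of 50, a node that has been up for 30 seconds is treated as having weight 25, so it receives roughly half of its eventual share of traffic.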

Let's see how the load balancing with Random, Hashing and RoundRobin strategy is implemented.

RandomLoadBalancer

The RandomLoadBalancer can handle two situations:

  1. No node has a weight, or every node has the same weight: one node is chosen at random.
  2. Server nodes have different weights: one node is chosen at random in proportion to its weight.

The following are the doSelect() and random() methods of RandomLoadBalancer. A random offset below the total weight is generated, and the weights are subtracted from it while traversing the server node list; the node at which the offset drops below zero is chosen. The first half of the list is skipped when the offset is not less than the total weight of that half. If no server node matches after the traversal, one is chosen uniformly at random. The getWeight(final Upstream upstream) method is defined in the AbstractLoadBalancer class.

    @Override
public Upstream doSelect(final List<Upstream> upstreamList, final String ip) {
int length = upstreamList.size();
// every upstream has the same weight?
boolean sameWeight = true;
// the weight of every upstream
int[] weights = new int[length];
int firstUpstreamWeight = getWeight(upstreamList.get(0));
weights[0] = firstUpstreamWeight;
// init the totalWeight
int totalWeight = firstUpstreamWeight;
int halfLengthTotalWeight = 0;
for (int i = 1; i < length; i++) {
int currentUpstreamWeight = getWeight(upstreamList.get(i));
if (i <= (length + 1) / 2) {
halfLengthTotalWeight = totalWeight;
}
weights[i] = currentUpstreamWeight;
totalWeight += currentUpstreamWeight;
if (sameWeight && currentUpstreamWeight != firstUpstreamWeight) {
// Calculate whether the weight of ownership is the same.
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
return random(totalWeight, halfLengthTotalWeight, weights, upstreamList);
}
return random(upstreamList);
}

private Upstream random(final int totalWeight, final int halfLengthTotalWeight, final int[] weights, final List<Upstream> upstreamList) {
// If the weights are not the same and the weights are greater than 0, then random by the total number of weights.
int offset = RANDOM.nextInt(totalWeight);
int index = 0;
int end = weights.length;
if (offset >= halfLengthTotalWeight) {
index = (weights.length + 1) / 2;
offset -= halfLengthTotalWeight;
} else {
end = (weights.length + 1) / 2;
}
// Determine which segment the random value falls on
for (; index < end; index++) {
offset -= weights[index];
if (offset < 0) {
return upstreamList.get(index);
}
}
return random(upstreamList);
}
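
The core of weighted random selection is mapping an offset to the weight segment it falls into. The following is a minimal sketch that uses a plain linear scan instead of the half-split optimization shown above; WeightedRandomSketch, indexFor and select are hypothetical names.

```java
import java.util.Random;

// Hypothetical, simplified weighted random selection; not the RandomLoadBalancer code.
final class WeightedRandomSketch {

    /** Returns the index of the weight segment that contains offset (0 <= offset < total weight). */
    static int indexFor(final int[] weights, final int offset) {
        int remaining = offset;
        for (int i = 0; i < weights.length; i++) {
            remaining -= weights[i];
            if (remaining < 0) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset must be below the total weight");
    }

    /** Picks an index at random in proportion to its weight; the total weight must be positive. */
    static int select(final int[] weights, final Random random) {
        int total = 0;
        for (int weight : weights) {
            total += weight;
        }
        return indexFor(weights, random.nextInt(total));
    }

    public static void main(final String[] args) {
        int[] weights = {20, 50, 30};
        int[] hits = new int[weights.length];
        Random random = new Random();
        for (int i = 0; i < 100_000; i++) {
            hits[select(weights, random)]++;
        }
        System.out.println(java.util.Arrays.toString(hits));
    }
}
```

For weights [20, 50, 30], offsets 0 to 19 map to the first node, 20 to 69 to the second and 70 to 99 to the third, so each node is hit in proportion to its weight.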

HashLoadBalancer

HashLoadBalancer takes advantage of consistent hashing, which maps both the incoming traffic and the servers onto a unit circle, also called a hash ring. For the requested ip address, its hash value is used to find the closest node in clockwise order, and that node receives the request. Let's see how consistent hashing is implemented in HashLoadBalancer.

As for the hash algorithm, HashLoadBalancer uses MD5, which has the advantage of mixing the input in an unpredictable but deterministic way. The digest is truncated to a 32-bit value and returned as a long. The code is shown as follows:

private static long hash(final String key) {
// md5 byte
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new ShenyuException("MD5 not supported", e);
}
md5.reset();
byte[] keyBytes;
keyBytes = key.getBytes(StandardCharsets.UTF_8);
md5.update(keyBytes);
byte[] digest = md5.digest();
// hash code, Truncate to 32-bits
long hashCode = (long) (digest[3] & 0xFF) << 24
| ((long) (digest[2] & 0xFF) << 16)
| ((long) (digest[1] & 0xFF) << 8)
| (digest[0] & 0xFF);
return hashCode & 0xffffffffL;
}

More importantly, how is the hash ring generated, and how is skew avoided? Let's look at the doSelect() method in HashLoadBalancer:

    private static final int VIRTUAL_NODE_NUM = 5;

@Override
public Upstream doSelect(final List<Upstream> upstreamList, final String ip) {
final ConcurrentSkipListMap<Long, Upstream> treeMap = new ConcurrentSkipListMap<>();
upstreamList.forEach(upstream -> IntStream.range(0, VIRTUAL_NODE_NUM).forEach(i -> {
long addressHash = hash("SHENYU-" + upstream.getUrl() + "-HASH-" + i);
treeMap.put(addressHash, upstream);
}));
long hash = hash(ip);
SortedMap<Long, Upstream> lastRing = treeMap.tailMap(hash);
if (!lastRing.isEmpty()) {
return lastRing.get(lastRing.firstKey());
}
return treeMap.firstEntry().getValue();
}

In this method, replicated labels called "virtual nodes" are used (i.e. 5 virtual nodes point to a single "real" server). This makes the distribution on the hash ring more even and reduces data skew.

To keep the data on the hash ring sorted and quickly accessible, ConcurrentSkipListMap is used to store the server nodes (with their virtual nodes), keyed by hash value. This class is a member of the Java Collections Framework; it provides expected average log(n) time cost for retrieval and access operations, which can safely be executed concurrently by multiple threads.

Furthermore, the method tailMap(K fromKey) of ConcurrentSkipListMap returns a view of the portion of the map whose keys are greater than or equal to fromKey, without navigating the whole map.

In the above code, after the hash ring is generated, tailMap(K fromKey) is used to find the subset of elements whose keys are greater than or equal to the hash value of the requested ip. The first element of this subset is the node to be routed to; if the subset is empty, the lookup wraps around to the first entry of the ring. With a suitable data structure, the code is particularly clear and concise.

Consistent hashing solves the poor scalability of traditional hashing based on the modulo operation.
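
The scalability claim can be checked with a small experiment: when a node is removed from the ring, only the keys that were owned by that node should move. The following is a minimal sketch, assuming a hypothetical HashRingSketch class, a TreeMap instead of ConcurrentSkipListMap, invented node names and the same 32-bit MD5 truncation as above.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-threaded hash ring; not the HashLoadBalancer code.
final class HashRingSketch {

    private static final int VIRTUAL_NODES = 5;

    /** Truncates the MD5 digest of key to an unsigned 32-bit value. */
    static long hash(final String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            return ((long) (d[3] & 0xFF) << 24) | ((long) (d[2] & 0xFF) << 16)
                    | ((long) (d[1] & 0xFF) << 8) | (d[0] & 0xFF);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not supported", e);
        }
    }

    /** Places every node on the ring several times, once per virtual node. */
    static TreeMap<Long, String> ring(final List<String> nodes) {
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String node : nodes) {
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
        return ring;
    }

    /** Walks clockwise from the key's hash and wraps around at the end of the ring. */
    static String route(final TreeMap<Long, String> ring, final String key) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return entry != null ? entry.getValue() : ring.firstEntry().getValue();
    }

    /** Counts keys that change owner although their old owner is still in the cluster. */
    static int needlessMoves(final List<String> before, final List<String> after, final int keys) {
        TreeMap<Long, String> oldRing = ring(before);
        TreeMap<Long, String> newRing = ring(after);
        int moved = 0;
        for (int i = 0; i < keys; i++) {
            String key = "10.0.0." + i;
            String oldOwner = route(oldRing, key);
            if (after.contains(oldOwner) && !oldOwner.equals(route(newRing, key))) {
                moved++;
            }
        }
        return moved;
    }
}
```

With modulo-based hashing, changing the number of nodes remaps most keys; on the ring, keys whose owner survives keep their owner, barring the unlikely case of two virtual nodes colliding on the same hash value.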

RoundRobinLoadBalancer

The original round-robin algorithm selects server nodes one by one from the candidate list. When a node crashes (e.g. it cannot be connected to for 1 minute), it is removed from the candidate list and does not take part in the next round until it recovers and is added to the candidate list again. RoundRobinLoadBalancer implements the weighted round-robin per-packet scheme.

To work in a concurrent system, it provides a static inner class WeightedRoundRobin to store and calculate the rolling selection state of each server node. The following is the main part of this class (comments removed):

protected static class WeightedRoundRobin {

private int weight;

private final AtomicLong current = new AtomicLong(0);

private long lastUpdate;

void setWeight(final int weight) {
this.weight = weight;
current.set(0);
}
long increaseCurrent() {
return current.addAndGet(weight);
}

void sel(final int total) {
current.addAndGet(-1 * total);
}
void setLastUpdate(final long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}

Please focus on these methods:

  • setWeight(final int weight): sets the weight and resets the current value to 0.

  • increaseCurrent(): increments the current value by weight and returns the new value.

  • sel(final int total): decrements the current value by total.

Let's see how the weight factor is used in this round-robin selection.

First, it defines a two-level ConcurrentMap variable named methodWeightMap to cache the server node lists and the rolling selection data of each server node.

private final ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<>(16);

In this map, the key of the first level is the url (upstreamUrl) of the first element in the server node list. The value is of type ConcurrentMap<String, WeightedRoundRobin>; the key of this inner map is the upstreamUrl of each server node in the list, and the value is a WeightedRoundRobin object that tracks the rolling selection data of that node. The Map implementation is ConcurrentHashMap from JUC, a hash table supporting full concurrency of retrievals and high expected concurrency for updates.

In the second level of the map, the static nested class WeightedRoundRobin of each node is thread-safe and implements the weighted round-robin per-packet scheme. The following is the code of the doSelect() method of this class.

@Override
public Upstream doSelect(final List<Upstream> upstreamList, final String ip) {
String key = upstreamList.get(0).getUrl();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (Objects.isNull(map)) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));
map = methodWeightMap.get(key);
}
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Upstream selectedInvoker = null;
WeightedRoundRobin selectedWeightedRoundRobin = null;
for (Upstream upstream : upstreamList) {
String rKey = upstream.getUrl();
WeightedRoundRobin weightedRoundRobin = map.get(rKey);
int weight = getWeight(upstream);
if (Objects.isNull(weightedRoundRobin)) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(rKey, weightedRoundRobin);
}
if (weight != weightedRoundRobin.getWeight()) {
// weight changed.
weightedRoundRobin.setWeight(weight);
}
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = upstream;
selectedWeightedRoundRobin = weightedRoundRobin;
}
totalWeight += weight;
}
...... //erase the section which handles the time-out upstreams.
if (selectedInvoker != null) {
selectedWeightedRoundRobin.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return upstreamList.get(0);
}

For example, assume the upstreamUrl values of three server nodes are LIST = [upstream-20, upstream-50, upstream-30]. After a round of execution, the data in the newly created methodWeightMap is as follows:

methodWeightMap

For the above example LIST, assume the weight array is [20, 50, 30]. The following figure shows how the current values in the WeightedRoundRobin objects change and how nodes are selected.

weighted-roundrobin-demo

In each round, it will choose the server node with max current value.

  • Round 1:
    • Traverse the server node list, initialize the WeightedRoundRobin instance of each server node or update the weight value of the server node object Upstream.
    • In this case, after the traversal, the current array of the node list changes to [20, 50, 30], so according to the rule the node upstream-50 is chosen. The WeightedRoundRobin object of upstream-50 then executes sel(total), and the current array becomes [20, -50, 30].
  • Round 2: after the traversal, the current array is [40, 0, 60], so the node upstream-30 is chosen, and the current array becomes [40, 0, -40].
  • Round 3: after the traversal, the current array is [60, 50, -10], so the node upstream-20 is chosen, and the current array becomes [-40, 50, -10].
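
The three rounds above can be reproduced with a few lines of code. This is a minimal single-threaded sketch of the same idea, without the maps, atomics or timeout handling of the real class; SmoothWeightedRoundRobinSketch and picks are hypothetical names, and nodes are identified by their index in the weight array.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded simulation; not the RoundRobinLoadBalancer code.
final class SmoothWeightedRoundRobinSketch {

    /** Returns the index chosen in each round for the given weights. */
    static List<Integer> picks(final int[] weights, final int rounds) {
        long[] current = new long[weights.length];
        int total = 0;
        for (int weight : weights) {
            total += weight;
        }
        List<Integer> selected = new ArrayList<>();
        for (int round = 0; round < rounds; round++) {
            int best = 0;
            for (int i = 0; i < weights.length; i++) {
                // every node first gains its own weight
                current[i] += weights[i];
                if (current[i] > current[best]) {
                    best = i;
                }
            }
            // the winner pays back the total weight
            current[best] -= total;
            selected.add(best);
        }
        return selected;
    }

    public static void main(final String[] args) {
        // prints [1, 2, 0]: weight 50 first, then 30, then 20
        System.out.println(picks(new int[] {20, 50, 30}, 3));
    }
}
```

Over 10 rounds with weights [20, 50, 30], the three nodes are chosen 2, 5 and 3 times, and the choices are interleaved rather than served in long runs.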

When there is an inconsistency or a server has crashed, for example when the size of the list does not match the number of elements in the map, the map is copied and modified under a lock: timed-out server nodes are removed and the data in the map is updated. The following is the fault tolerance code segment.

    if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {
try {
// copy -> modify -> update reference.
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
if (Objects.nonNull(selectedInvoker)) {
selectedWeightedRoundRobin.sel(totalWeight);
return selectedInvoker;
}
// should not happen here.
return upstreamList.get(0);

LoadBalancerFactory

This class provides a static method for calling LoadBalancer, where ExtensionLoader is the entry point of the Apache Shenyu SPI. That is to say, the LoadBalancer module is configurable and extensible. The algorithm parameter of this static method is the name defined in LoadBalanceEnum.

    /**
* Selector upstream.
*
* @param upstreamList the upstream list
* @param algorithm the loadBalance algorithm
* @param ip the ip
* @return the upstream
*/
public static Upstream selector(final List<Upstream> upstreamList, final String algorithm, final String ip) {
LoadBalancer loadBalance = ExtensionLoader.getExtensionLoader(LoadBalancer.class).getJoin(algorithm);
return loadBalance.select(upstreamList, ip);
}

Using of LoadBalancer module

In the above sections, we described the LoadBalancer SPI and its three implementation classes. Let's take a look at how LoadBalancer is used in Apache Shenyu. DividePlugin is the plugin in Apache Shenyu responsible for routing http requests. When this plugin is enabled, it forwards traffic according to the selector data and rule data, and then passes the request to the next plugin in the chain.

@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
......
}

The second parameter of doExecute() is of type ShenyuPluginChain, which represents the execution chain of plugins. For details, see the mechanism of Apache Shenyu plugins. The third parameter is of type SelectorData, and the fourth is of type RuleData, which carries the rule data.

In doExecute() of DividePlugin, the size of the header, the content length, etc. are verified first, and then load balancing is prepared.

The following is a code fragment using LoadBalancer in the doExecute() method:

    // find the routing server node list
List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
...
// the requested ip
String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();

//calling the Utility class and invoke the LoadBalance processing.
Upstream upstream = LoadBalancerFactory.selector(upstreamList, ruleHandle.getLoadBalance(), ip);

In the above code, the output of ruleHandle.getLoadBalance() is the name defined in LoadBalanceEnum, that is, random, hash, roundRobin, etc. Using LoadBalancer through LoadBalancerFactory is very convenient. When more LoadBalancer implementation classes are added, the interface in the plugin module is not affected at all.

Summary

After reading through the code of LoadBalancer module, from the design perspective, it is concluded that this module has the following characteristics:

  1. Extensibility: with an interface-oriented design built on the Apache Shenyu SPI mechanism, it can easily be extended to other dynamic load balancing algorithms (for example, least connection, fastest mode, etc.), and it supports cluster processing.
  2. Scalability: every load balancing implementation (weighted Random, consistent Hashing and weighted RoundRobin) supports increasing or decreasing the overall capacity of the cluster well.
  3. Detailed design: details such as warm-up bring better performance and better overall stability.

· 5 min read

In most of the plugins (such as Dubbo, gRPC, Spring-cloud, etc.) of Apache Shenyu, the routing parameters are designed to support the combination of multiple conditions. To meet this requirement, the parameters and behaviors are abstracted into three parts according to its SPI mechanism and implemented in the shenyu-plugin-base module.

  • ParameterData-parameters
  • PredicateJudge-predicate
  • MatchStrategy-matching strategy

Relatively speaking, MatchStrategy is the part that needs the fewest extension points. For the combined judgement of multiple conditions, the common selection rules are: all conditions are matched, at least one is matched, at least the first is matched, or most of the conditions are satisfied. It also needs to handle various types of parameters, for example IP, header, uri, etc.

How can MatchStrategy be made simple to use and extensible?

MatchStrategy

The implementation of MatchStrategy is in the shenyu-plugin-base module. It is based on the SPI creation mechanism and uses the factory pattern and the strategy pattern. The class diagram of MatchStrategy is shown as follows.

MatchStrategy-class-diagram

The implementation classes are designed around the MatchStrategy interface; the abstract class AbstractMatchStrategy supplies the common method, while the factory class MatchStrategyFactory provides the creation functions.

MatchStrategy Interface

First, let's look at the MatchStrategy SPI interface:

@SPI
public interface MatchStrategy {

Boolean match(List<ConditionData> conditionDataList, ServerWebExchange exchange);
}

The annotation @SPI means that this is an SPI interface. ServerWebExchange is org.springframework.web.server.ServerWebExchange, which represents the request-response interaction of HTTP. The following is the code of ConditionData; for more detail about this class, refer to the code analysis of PredicateJudge.

public class ConditionData {

private String paramType;
private String operator;

private String paramName;
private String paramValue;
}

AbstractMatchStrategy

Second, let's look at the abstract class AbstractMatchStrategy. It defines a buildRealData() method, which wraps various parameters behind a unified interface through ParameterDataFactory, the factory class of ParameterData. It supports a variety of parameter types, such as Ip, Cookie, Header, uri, etc. Changes to these parameters do not affect how the matching rules of MatchStrategy are called.

public abstract class AbstractMatchStrategy {

public String buildRealData(final ConditionData condition, final ServerWebExchange exchange) {
return ParameterDataFactory.builderData(condition.getParamType(), condition.getParamName(), exchange);
}
}

Implementation class and profile

Now, let's look at the two implementation classes based on the above interface in the shenyu-plugin-base module:

  • AndMatchStrategy - AND - all conditions are matched

  • OrMatchStrategy - OR - at least one is matched

The properties file containing the SPI implementations is shown as follows; it is located in the SHENYU_DIRECTORY directory. On startup, the top-level SPI classes read the key-value pairs, load the classes and cache them.

and=org.apache.shenyu.plugin.base.condition.strategy.AndMatchStrategy
or=org.apache.shenyu.plugin.base.condition.strategy.OrMatchStrategy

These two implementation classes inherit AbstractMatchStrategy class and implement MatchStrategy interface.

AndMatchStrategy - "AND" relation

The PredicateJudge interface has many implementations, for example EqualsPredicateJudge and EndsWithPredicateJudge, so each ConditionData and its real parameter data can be judged by a different predicate. With a stream and a lambda expression, the "AND" logic over multiple conditions (all conditions must match) becomes simple and efficient.

@Join
public class AndMatchStrategy extends AbstractMatchStrategy implements MatchStrategy {

@Override
public Boolean match(final List<ConditionData> conditionDataList, final ServerWebExchange exchange) {
return conditionDataList
.stream()
.allMatch(condition -> PredicateJudgeFactory.judge(condition, buildRealData(condition, exchange)));
}
}

OrMatchStrategy implements the "OR" logic in the same way: at least one condition must match.
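The source of OrMatchStrategy is not reproduced in this article. The following is only a minimal sketch of the difference between the two modes, assuming the "OR" variant mirrors AndMatchStrategy with anyMatch in place of allMatch. The class MatchModeSketch and the use of plain Predicate<String> objects (instead of ConditionData and PredicateJudgeFactory.judge) are stand-ins invented for illustration so that the example runs with the JDK alone.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in: each condition is a Predicate over the real data,
// replacing ConditionData + PredicateJudgeFactory.judge(...) from the article.
final class MatchModeSketch {

    // "and": every condition must accept the data (Stream.allMatch).
    static boolean matchAnd(final List<Predicate<String>> conditions, final String realData) {
        return conditions.stream().allMatch(c -> c.test(realData));
    }

    // "or": at least one condition must accept the data (Stream.anyMatch).
    static boolean matchOr(final List<Predicate<String>> conditions, final String realData) {
        return conditions.stream().anyMatch(c -> c.test(realData));
    }

    public static void main(final String[] args) {
        List<Predicate<String>> conditions = Arrays.asList(
                uri -> uri.startsWith("/http"),
                uri -> uri.endsWith("/findById"));
        // Only the first condition holds for this URI.
        System.out.println(matchAnd(conditions, "/http/order/save")); // false
        System.out.println(matchOr(conditions, "/http/order/save"));  // true
    }
}
```

Note that on an empty list allMatch returns true while anyMatch returns false, so the two modes also differ when no conditions are configured.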

MatchStrategyFactory

This is the factory class of MatchStrategy. It has two methods. The first is newInstance(), which returns the MatchStrategy implementation instance cached by the SPI ExtensionLoader, looked up by its key.

    public static MatchStrategy newInstance(final Integer strategy) {
String matchMode = MatchModeEnum.getMatchModeByCode(strategy);
return ExtensionLoader.getExtensionLoader(MatchStrategy.class).getJoin(matchMode);
}

Here matchMode is the name of the strategy, either "and" or "or". MatchModeEnum defines the code and name of each match strategy as follows.

AND(0, "and"), 
OR(1, "or");

The other method is match(), which invokes the match() method of the implementation class.

    public static boolean match(final Integer strategy, final List<ConditionData> conditionDataList, final ServerWebExchange exchange) {
return newInstance(strategy).match(conditionDataList, exchange);
}
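To make the lookup chain (code, then name, then cached instance, then match) easier to follow, here is a minimal, self-contained sketch. It is not the ShenYu implementation: MatchFactorySketch, its nested Strategy interface and the in-memory CACHE map are hypothetical stand-ins for MatchStrategyFactory, MatchStrategy and the ExtensionLoader cache, and the fallback to "and" for unknown codes is an assumption.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class MatchFactorySketch {

    // Mirrors the codes and names listed for MatchModeEnum.
    enum MatchMode {
        AND(0, "and"), OR(1, "or");

        final int code;
        final String modeName;

        MatchMode(final int code, final String modeName) {
            this.code = code;
            this.modeName = modeName;
        }

        // Assumption: unknown codes fall back to "and"; the real enum may differ.
        static String nameByCode(final int code) {
            return Arrays.stream(values())
                    .filter(m -> m.code == code)
                    .map(m -> m.modeName)
                    .findFirst()
                    .orElse(AND.modeName);
        }
    }

    // Hypothetical simplification of MatchStrategy.
    interface Strategy {
        boolean match(List<Predicate<String>> conditions, String realData);
    }

    // Stand-in for the instances that ExtensionLoader caches from the SPI file.
    private static final Map<String, Strategy> CACHE = new HashMap<>();

    static {
        CACHE.put("and", (cs, data) -> cs.stream().allMatch(c -> c.test(data)));
        CACHE.put("or", (cs, data) -> cs.stream().anyMatch(c -> c.test(data)));
    }

    static Strategy newInstance(final int code) {
        return CACHE.get(MatchMode.nameByCode(code));
    }

    static boolean match(final int code, final List<Predicate<String>> conditions, final String realData) {
        return newInstance(code).match(conditions, realData);
    }
}
```

Callers only ever pass the integer code stored with the selector or rule; which class does the work is decided entirely by the name-to-instance mapping.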

How it works

AbstractShenyuPlugin is the base class of the plugins in the shenyu-plugin module. It defines two selection methods, filterSelector() and filterRule(), and both call the match() method of MatchStrategyFactory. The code of filterSelector() is shown below.

    private Boolean filterSelector(final SelectorData selector, final ServerWebExchange exchange) {
if (selector.getType() == SelectorTypeEnum.CUSTOM_FLOW.getCode()) {
if (CollectionUtils.isEmpty(selector.getConditionList())) {
return false;
}
return MatchStrategyFactory.match(selector.getMatchMode(), selector.getConditionList(), exchange);
}
return true;
}

After validating the SelectorData, filterSelector() calls the match() method of MatchStrategyFactory, and the factory in turn invokes the match() method of the corresponding implementation class.

    private Boolean filterRule(final RuleData ruleData, final ServerWebExchange exchange) {
return ruleData.getEnabled() && MatchStrategyFactory.match(ruleData.getMatchMode(), ruleData.getConditionDataList(), exchange);
}

filterRule() also calls the match() method of MatchStrategyFactory. Doesn't it look particularly concise, even simple? The code analysis of PredicateJudge gives more detail about parameter processing in shenyu-plugin.

Summary

Thanks to the SPI mechanism of Apache Shenyu, the parameter selection module is loosely coupled and extensible. MatchStrategy provides a good design for combining multiple conditions. Although only two implementation classes exist at present, it is easy to develop more complex MatchStrategy rules in the future, such as "firstOf" (the first condition must match) or "mostOf" (most of the conditions must match).

Interested readers can read the source code of shenyu-plugin to learn more.

· 6 min read

Apache Shenyu is a gateway application that supports a variety of protocols and microservice frameworks such as Dubbo, gRPC and Spring Cloud. To do this, the product uses an elegant SPI (Service Provider Interface) as its foundation, which makes the parsing and predicate logic for rule data simple, resilient and secure. The SPI design also makes rule data parsing extensible: when a new plugin is added, the existing module is in most cases sufficient for parsing its rule data, and otherwise the module can be extended with little effort.

Top level design of SPI

In Apache Shenyu, the SPI architecture is defined in the shenyu-spi module and is composed of three parts: the SPI interface, the factory design pattern, and the configuration file. Two annotations are defined: @SPI and @Join. @SPI marks an interface as an SPI extension interface. A class annotated with @Join joins as an SPI extension class; in other words, the annotation is an application or registration.

Fig 1 classes in the shenyu-spi

toplevel-SPI

The SPI configuration directory is META-INF/shenyu/, as specified by:

SHENYU_DIRECTORY = "META-INF/shenyu/";

When the gateway starts, the ExtensionLoader scans the profiles under SHENYU_DIRECTORY and, in turn, loads, validates and initializes each configured class. The configuration file uses the "key=class-file" format. While the system is running, the corresponding SPI implementation class is invoked through the factory mechanism.
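The "key=class-file" format is the standard Java properties format, so the loading step can be illustrated with the JDK alone. The sketch below is not the ExtensionLoader source: SpiProfileSketch is a hypothetical class, it skips the annotation validation, and it creates every instance immediately for simplicity (whether ShenYu instantiates at startup or on first lookup is not modeled here). JDK classes are used as the "implementations" so that the example runs anywhere.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class SpiProfileSketch {

    // Parses "key=class-name" lines and instantiates each class reflectively.
    // The real ExtensionLoader also validates annotations; that is omitted here.
    static Map<String, Object> load(final String profile) throws IOException, ReflectiveOperationException {
        Properties props = new Properties();
        props.load(new StringReader(profile));
        Map<String, Object> cache = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            Class<?> clazz = Class.forName(props.getProperty(key));
            cache.put(key, clazz.getDeclaredConstructor().newInstance());
        }
        return cache;
    }

    public static void main(final String[] args) throws Exception {
        // JDK classes stand in for SPI implementation classes.
        String profile = "list=java.util.ArrayList\nmap=java.util.HashMap\n";
        Map<String, Object> cache = load(profile);
        System.out.println(cache.get("list").getClass().getName()); // java.util.ArrayList
    }
}
```

After loading, a lookup by key is a plain map access, which is why the factories shown in these articles can resolve an implementation by name on every request at negligible cost.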

Implementation of shenyu-plugin SPI

In the shenyu-plugin module, various plugins for HTTP routing are implemented according to the plugin mechanism, including request, redirect, response and rewrite. Plugins for microservice frameworks such as Dubbo, gRPC, Spring Cloud and Tars have also been developed, and the number of plugins is still growing. Without a shared module for parsing and judging routing parameters and data, each plugin would have to implement the parsing functions itself and would need frequent modification to support its matching rules, such as wildcards, regular expressions and SpEL expressions. Therefore, the routing parameter data is abstracted at a high level, following the SPI framework, in the shenyu-plugin module. The rule analysis consists of three parts:

  • ParameterData - the parameter data

  • PredicateJudge - judges whether the actual data matches the rule

  • MatchStrategy - combines multiple conditions into the final matching strategy

These implementation classes are defined in the shenyu-plugin-base module. In each plugin, the resolution and judgment of routing parameters is performed through AbstractShenyuPlugin using the above SPIs. The design is focused and easy to extend, in line with the SOLID principles.

This section analyzes PredicateJudge in detail. The pom.xml of this module declares the dependency on shenyu-spi.

<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-spi</artifactId>
<version>${project.version}</version>
</dependency>

Design of PredicateJudge SPI

The PredicateJudge SPI is used to analyze and judge the various routing rules configured in the Apache Shenyu gateway. The name and function of this SPI are similar to Predicate in Java, but the acceptance behavior is further abstracted for routing. This SPI is implemented through the factory pattern. Let's look at the PredicateJudge SPI interface:

@SPI
@FunctionalInterface
public interface PredicateJudge {

/**
* judge conditionData and realData is match.
*
* @param conditionData {@linkplain ConditionData}
* @param realData realData
* @return true is pass false is not pass.
*/
Boolean judge(ConditionData conditionData, String realData);
}

The class diagram is as follows:

Fig 2-Predicate class diagram

predicate-class-diagram

The important methods of PredicateJudgeFactory are shown below. Whenever routing data needs to be parsed and matched, you can use:

    public static PredicateJudge newInstance(final String operator) {
return ExtensionLoader.getExtensionLoader(PredicateJudge.class).getJoin(processSpecialOperator(operator));
}
    public static Boolean judge(final ConditionData conditionData, final String realData) {
if (Objects.isNull(conditionData) || StringUtils.isBlank(realData)) {
return false;
}
return newInstance(conditionData.getOperator()).judge(conditionData, realData);
}

ConditionData contains four attributes of type String: paramType, operator, paramName and paramValue.

ParamTypeEnum

paramType must be a value of the enumeration ParamTypeEnum. The paramType values supported by default are:

post, uri, query, host, ip, header, cookie, req_method

OperatorEnum

operator must be a value of the enumeration OperatorEnum. The currently supported operators are:

match, =, regex, >, <, contains, SpEL, Groovy, TimeBefore, TimeAfter

Based on the above definitions, the plugin module provides the following eight PredicateJudge implementation classes, which implement the logic of these operators respectively.

| Implementation class | Logic description | Corresponding operator |
|---|---|---|
| ContainsPredicateJudge | "contains" relation: the actual data must contain the specified string | contains |
| EqualsPredicateJudge | equals "=" | = |
| MatchPredicateJudge | used for URI context path matching | match |
| TimerAfterPredicateJudge | whether the local time is after the specified time | TimeAfter |
| TimerBeforePredicateJudge | whether the local time is before the specified time | TimeBefore |
| GroovyPredicateJudge | uses Groovy syntax to set ParamName and value data | Groovy |
| RegexPredicateJudge | uses a regular expression to match | regex |

How to use PredicateJudge

When you want to parse parameters, you only need to call PredicateJudgeFactory as follows.

PredicateJudgeFactory.judge(final ConditionData conditionData, final String realData);
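The dispatch performed by the factory can be pictured as a map from operator name to judge. The sketch below is an illustration, not the ShenYu code: JudgeFactorySketch is a hypothetical class, only two operators are modeled, the mapping of "=" to the key "equals" is an assumption inferred from the SPI profile key, and treating an unknown operator as "no match" is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

final class JudgeFactorySketch {

    // operator key -> judge(paramValue, realData); stand-in for the SPI cache.
    private static final Map<String, BiPredicate<String, String>> JUDGES = new HashMap<>();

    static {
        JUDGES.put("equals", (paramValue, realData) -> realData.equals(paramValue));
        JUDGES.put("contains", (paramValue, realData) -> realData.contains(paramValue));
    }

    // Assumption: "=" cannot be a properties key, so it is mapped to "equals".
    static String processSpecialOperator(final String operator) {
        return "=".equals(operator) ? "equals" : operator;
    }

    // Same guard as in the article: a missing condition or blank data never matches.
    static boolean judge(final String operator, final String paramValue, final String realData) {
        if (operator == null || paramValue == null || realData == null || realData.trim().isEmpty()) {
            return false;
        }
        BiPredicate<String, String> impl = JUDGES.get(processSpecialOperator(operator));
        // Assumption: an unknown operator is treated as "no match" in this sketch.
        return impl != null && impl.test(paramValue, realData);
    }

    public static void main(final String[] args) {
        System.out.println(judge("contains", "/http/**", "/http/**/test")); // true
        System.out.println(judge("contains", "/http/**", "/http1/**"));     // false
    }
}
```

Note that "contains" here is a literal substring test, so "/http/**" is not interpreted as a wildcard pattern; path patterns belong to the match operator.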

SPI profile

The implementation classes are configured in the file under the SHENYU_DIRECTORY directory. They are loaded and cached at startup.

equals=org.apache.shenyu.plugin.base.condition.judge.EqualsPredicateJudge

contains=org.apache.shenyu.plugin.base.condition.judge.ContainsPredicateJudge
Groovy=org.apache.shenyu.plugin.base.condition.judge.GroovyPredicateJudge
match=org.apache.shenyu.plugin.base.condition.judge.MatchPredicateJudge
regex=org.apache.shenyu.plugin.base.condition.judge.RegexPredicateJudge
SpEL=org.apache.shenyu.plugin.base.condition.judge.SpELPredicateJudge
TimeAfter=org.apache.shenyu.plugin.base.condition.judge.TimerAfterPredicateJudge
TimeBefore=org.apache.shenyu.plugin.base.condition.judge.TimerBeforePredicateJudge

The usage of PredicateJudge SPI in Shenyu gateway

Most plugins in Apache Shenyu extend AbstractShenyuPlugin. In this abstract class, the filter functions (selection and matching) are implemented through the MatchStrategy SPI, and MatchStrategy invokes PredicateJudge to judge each condition.

Fig 3- class diagram of plugins with PredicateJudge and MatchStrategy SPI

plugin-SPI-class-diagram

The process from a client request to the call into the routing parsing module is shown in the following chart.

Fig 4- flow chart for Shenyu gateway filter with parameter processing

SPI-flow-diagram

  • At startup, the system loads the SPI classes from the profile and caches them.
  • When the client sends a new request to the Shenyu gateway, the gateway calls the corresponding plugin.
  • When the real data is analyzed against the routing rules, the PredicateJudge implementation class is invoked according to the operator contained in the condition.

Others

Examples of PredicateJudge judgement

ContainsPredicateJudge - "contains" rule

For example, given a ConditionData with paramType="uri" and paramValue="/http/**", the "contains" relation (ContainsPredicateJudge) gives the following matching results.

| ConditionData (operator="contains") | real data | judge result |
|---|---|---|
| paramType="uri", "/http/**" | "/http/**/test" | true |
| | "/test/http/**/other" | true |
| | "/http1/**" | false |

For the other PredicateJudge implementation classes, refer to the code and the test classes.

· 16 min read

The rate limiter is a very important part of a gateway application for dealing with high traffic. The system may be hit by an abnormal burst of traffic in a short time; a large number of lower-priority requests may need to be slowed down so that they do not affect high-priority transactions; or the system may simply be unable to sustain its regular traffic. In these scenarios, the rate limiter component protects the system: through rejection, waiting, load shedding and so on, it limits the requests to an acceptable quantity, or lets only the requests of certain domains (or services) through.

With these scenarios in mind, the following points need to be considered when designing the rate limiter component of a gateway.

  1. It supports a variety of rate limiter algorithms and is easy to extend.
  2. It has flexible resolvers which can distinguish traffic in different ways, such as by IP, URL, or even user group.
  3. It is highly available and returns the allow or reject result quickly.
  4. It is fault tolerant: when the rate limiter is down, the gateway can continue to work.

This article first introduces the overall architecture of the rate limiter module in Apache Shenyu, and then focuses on the code analysis of the rate limiter SPI.

The source code analysis in this article is based on version shenyu-2.4.0.

Overall design of RateLimiter

Spring WebFlux is a reactive, non-blocking web framework, which improves throughput and makes applications more resilient. The plugins of Apache Shenyu are based on WebFlux, and the rate limiter component is implemented in ratelimiter-plugin. Commonly used rate limiting algorithms include token bucket and leaky bucket. To improve performance under concurrency, the counting and calculation logic runs in Redis, and the Java code is only responsible for passing parameters. In Redis, a Lua script can stay resident in memory and is executed as a whole, so it is atomic; it also reduces network overhead. Spring Data Redis provides an abstraction of Redis commands and automatic serialization/deserialization with the Redis store. Because the plugin is built on a reactive framework, ratelimiter-plugin uses the reactive Redis support of Spring Data Redis.

The class diagram of this plugin is as follows, highlighting the two packages related to the RateLimiter SPI: resolver and algorithm.

ratelimiter-package-diagram

Design of RateLimiter SPI

High performance is achieved through the architecture of Spring Data + Redis + Lua. Two SPIs are supplied in ratelimiter-plugin for extending the algorithm and the key resolver.

  • RateLimiterAlgorithm: used to extend the algorithms.
  • RateLimiterKeyResolver: used to extend the resolvers, which distinguish requests by various information such as IP, URL, etc.

The SPI profile is located in the SHENYU_DIRECTORY directory (default: /META-INF/shenyu).

RateLimiterKeyResolver

It obtains the key information of the request that is used to group traffic for rate limiting. The RateLimiterKeyResolver interface is as follows:

@SPI
public interface RateLimiterKeyResolver {

/**
* get Key resolver's name.
*
* @return Key resolver's name
*/
String getKeyResolverName();

/**
* resolve.
*
* @param exchange exchange the current server exchange {@linkplain ServerWebExchange}
* @return rate limiter key
*/
String resolve(ServerWebExchange exchange);
}

@SPI registers the current interface as an Apache Shenyu SPI. The method resolve(ServerWebExchange exchange) provides the resolution logic. Currently the RateLimiterKeyResolver SPI has two key resolvers: WholeKeyResolver and RemoteAddrKeyResolver. The resolve method of RemoteAddrKeyResolver is as follows:

    @Override
public String resolve(final ServerWebExchange exchange) {
return Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
}

Here the resolved key is the IP address of the request. Based on the SPI mechanism and its factory pattern, a new resolver can be developed easily.

RateLimiterAlgorithm SPI

The RateLimiterAlgorithm SPI is used to identify and define different rate limiter algorithms. The class diagram of this module follows.

ratelimiteral-class-diagram

This module uses the factory pattern and provides an interface, an abstract class, a factory class and four implementation classes. The Lua script corresponding to each implementation class is enumerated in RateLimitEnum and located in /META-INF/scripts.

@SPI
public interface RateLimiterAlgorithm<T> {

RedisScript<T> getScript();
List<String> getKeys(String id);

/**
* Callback string.
*
* @param script the script
* @param keys the keys
* @param scriptArgs the script args
*/
default void callback(final RedisScript<?> script, final List<String> keys, final List<String> scriptArgs) {
}
}

@SPI registers the current interface as an Apache Shenyu SPI. There are three methods:

  • getScript() returns a RedisScript object, which will be passed to Redis.
  • getKeys(String id) returns a List containing the keys.
  • callback() is a callback function which is executed asynchronously later on; by default it is an empty method.

AbstractRateLimiterAlgorithm

The template method is implemented in this abstract class, with List<Long> as the concrete type argument. Two abstract methods, getScriptName() and getKeyName(), are left to the implementation classes. The code that loads the Lua script follows.

    public RedisScript<List<Long>> getScript() {
if (!this.initialized.get()) {
DefaultRedisScript redisScript = new DefaultRedisScript<>();
String scriptPath = "/META-INF/scripts/" + getScriptName();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(scriptPath)));
redisScript.setResultType(List.class);
this.script = redisScript;
initialized.compareAndSet(false, true);
return redisScript;
}
return script;
}

initialized is an AtomicBoolean that indicates whether the Lua script has been loaded. If it has not been loaded, the system reads the specified script from META-INF/scripts, sets the result type to List, sets initialized to true, and returns the RedisScript object.

The code of getKeys() in AbstractRateLimiterAlgorithm is as follows:

    @Override
public List<String> getKeys(final String id) {
String prefix = getKeyName() + ".{" + id;
String tokenKey = prefix + "}.tokens";
String timestampKey = prefix + "}.timestamp";
return Arrays.asList(tokenKey, timestampKey);
}

This template method generates two strings; tokenKey serves as the key of a sorted set in Redis.
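To see what the generated keys look like, the string building can be reproduced outside ShenYu. In the sketch below, RateLimiterKeysSketch is a hypothetical class, and both the key name "request_rate_limiter" and the id "rule-1-10.0.0.7" are made-up example values; each real algorithm supplies its own name through getKeyName().

```java
import java.util.Arrays;
import java.util.List;

final class RateLimiterKeysSketch {

    // keyName is a hypothetical value; each algorithm supplies its own via getKeyName().
    static List<String> getKeys(final String keyName, final String id) {
        String prefix = keyName + ".{" + id;
        String tokenKey = prefix + "}.tokens";
        String timestampKey = prefix + "}.timestamp";
        return Arrays.asList(tokenKey, timestampKey);
    }

    public static void main(final String[] args) {
        // prints [request_rate_limiter.{rule-1-10.0.0.7}.tokens, request_rate_limiter.{rule-1-10.0.0.7}.timestamp]
        System.out.println(getKeys("request_rate_limiter", "rule-1-10.0.0.7"));
    }
}
```

The braces around the id look like a Redis Cluster hash tag: in cluster mode only the text between { and } is hashed, so both keys land in the same slot, which a Lua script touching several keys requires. That this is the reason for the braces here is an inference, not something the article states.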

The class diagram above shows that ConcurrentRateLimiterAlgorithm and SlidingWindowRateLimiterAlgorithm override the getKeys(String id) method, while the other two implementation classes use the template method in AbstractRateLimiterAlgorithm. Only ConcurrentRateLimiterAlgorithm overrides the callback() method. Both points are analyzed further below.

RateLimiterAlgorithmFactory

The method in RateLimiterAlgorithmFactory that gets a RateLimiterAlgorithm instance by name is as follows:

public static RateLimiterAlgorithm<?> newInstance(final String name) {
return Optional.ofNullable(ExtensionLoader.getExtensionLoader(RateLimiterAlgorithm.class).getJoin(name)).orElse(new TokenBucketRateLimiterAlgorithm());
}

The SPI ExtensionLoader is responsible for loading SPI classes by name. If the specified algorithm class cannot be found, TokenBucketRateLimiterAlgorithm is returned by default.
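One property of this fallback idiom is worth knowing: Optional.orElse evaluates its argument eagerly, so the default object is constructed on every call, even when the lookup succeeds, whereas orElseGet constructs it only when needed. The sketch below demonstrates the difference with plain strings; DefaultAlgorithmSketch and createDefault() are hypothetical stand-ins for the factory and for new TokenBucketRateLimiterAlgorithm(), and how getJoin behaves for an unknown name is not modeled.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

final class DefaultAlgorithmSketch {

    // Counts how many times the default object has been built.
    static final AtomicInteger CREATED = new AtomicInteger();

    // Hypothetical stand-in for new TokenBucketRateLimiterAlgorithm().
    static String createDefault() {
        CREATED.incrementAndGet();
        return "tokenBucket";
    }

    // orElse: the default is built before the Optional is even inspected.
    static String eager(final String found) {
        return Optional.ofNullable(found).orElse(createDefault());
    }

    // orElseGet: the default is built only when nothing was found.
    static String lazy(final String found) {
        return Optional.ofNullable(found).orElseGet(DefaultAlgorithmSketch::createDefault);
    }

    public static void main(final String[] args) {
        eager("slidingWindow");
        System.out.println(CREATED.get()); // 1: default built although unused
        lazy("slidingWindow");
        System.out.println(CREATED.get()); // still 1
    }
}
```

For a cheap, stateless default this costs little, so the eager form in the factory is a reasonable trade-off; the lazy form matters mainly when the default is expensive to build.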

Data access with Redis

The sections above detailed the extension interfaces of the RateLimiter SPI. Apache Shenyu uses ReactiveRedisTemplate to access Redis reactively, which is implemented in the isAllowed() method of the RedisRateLimiter class.

    public Mono<RateLimiterResponse> isAllowed(final String id, final RateLimiterHandle limiterHandle) {
// get parameters that will pass to redis from RateLimiterHandle Object
double replenishRate = limiterHandle.getReplenishRate();
double burstCapacity = limiterHandle.getBurstCapacity();
double requestCount = limiterHandle.getRequestCount();
// get the current used RateLimiterAlgorithm
RateLimiterAlgorithm<?> rateLimiterAlgorithm = RateLimiterAlgorithmFactory.newInstance(limiterHandle.getAlgorithmName());

........
Flux<List<Long>> resultFlux = Singleton.INST.get(ReactiveRedisTemplate.class).execute(script, keys, scriptArgs);
return resultFlux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
.reduce(new ArrayList<Long>(), (longs, l) -> {
longs.addAll(l);
return longs;
}).map(results -> {
boolean allowed = results.get(0) == 1L;
Long tokensLeft = results.get(1);
return new RateLimiterResponse(allowed, tokensLeft);
})
.doOnError(throwable -> log.error("Error occurred while judging if user is allowed by RedisRateLimiter:{}", throwable.getMessage()))
.doFinally(signalType -> rateLimiterAlgorithm.callback(script, keys, scriptArgs));
}

The POJO class RateLimiterHandle wraps the parameters needed by the rate limiter: algorithmName, replenishRate, burstCapacity, requestCount, etc. The method first gets the parameters that need to be passed to Redis from the RateLimiterHandle object, then obtains the current implementation class from RateLimiterAlgorithmFactory.

For convenience, the following flow chart shows the parameter I/O and the execution procedure in Java and Redis respectively. On the left is the second half of isAllowed(), and on the right is the processing of the Lua script.

The Java code executes as follows.

  1. Get two keys as a List<String> from the getKeys() method; the first element maps to a sorted set in Redis.

  2. Set four parameters: replenishRate, burstCapacity, timestamp (epoch seconds) and requestCount.

  3. Call ReactiveRedisTemplate with the script, keys and parameters; it returns a Flux<List<Long>>.

  4. The return value is converted from Flux<List<Long>> to Mono<ArrayList<Long>> through reduce() of Flux, and then transformed into Mono<RateLimiterResponse> via map(). Two values are returned: allowed (1 - allowed, 0 - not allowed) and tokensLeft, the number of remaining available requests.

  5. For fault tolerance, thanks to the reactor non-blocking model, when an error occurs the fallback function onErrorResume() is executed and Flux.just generates a new stream (1L, -1L), which lets the request through; the error is logged on the side.

  6. Finally, doFinally() runs, which executes the callback() method of the implementation class.
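Steps 4 and 5 come down to decoding a two-element list, which can be shown without Reactor or Redis. The sketch below is a simplification under stated assumptions: LimiterResultSketch and its nested Response class are hypothetical stand-ins for the map() step and for RateLimiterResponse, and the reactive operators themselves are not modeled.

```java
import java.util.Arrays;
import java.util.List;

final class LimiterResultSketch {

    // Minimal stand-in for RateLimiterResponse.
    static final class Response {
        final boolean allowed;
        final long tokensLeft;

        Response(final boolean allowed, final long tokensLeft) {
            this.allowed = allowed;
            this.tokensLeft = tokensLeft;
        }
    }

    // The list substituted when the Redis call fails: allow, tokens unknown.
    static final List<Long> FALLBACK = Arrays.asList(1L, -1L);

    // Same decoding as the map() step: element 0 is the flag, element 1 the remainder.
    static Response toResponse(final List<Long> results) {
        boolean allowed = results.get(0) == 1L;
        return new Response(allowed, results.get(1));
    }

    public static void main(final String[] args) {
        Response onError = toResponse(FALLBACK);
        System.out.println(onError.allowed + " " + onError.tokensLeft); // true -1
    }
}
```

The fallback makes the limiter fail open: if Redis is unreachable, requests pass with tokensLeft = -1 instead of being rejected, which matches the fault tolerance goal stated at the start of the article.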

io-with-lua

Four rate limiter algorithms

The previous section showed how the Java code works with Redis in the gateway. This chapter briefly analyzes some code of the four rate limiter algorithms, to understand how to implement the interfaces of the RateLimiter SPI and work efficiently with Redis.

Four rate limiter algorithms are supplied in the Apache Shenyu RateLimiter SPI:

| Algorithm name | Java class | Lua script file |
|---|---|---|
| Request rate limiter | TokenBucketRateLimiterAlgorithm | request_rate_limiter.lua |
| Slide window rate limiter | SlidingWindowRateLimiterAlgorithm | sliding_window_request_rate_limiter.lua |
| Concurrent rate limiter | ConcurrentRateLimiterAlgorithm | concurrent_request_rate_limiter.lua |
| Leaky bucket algorithm | LeakyBucketRateLimiterAlgorithm | request_leaky_rate_limiter.lua |

  1. Token bucket rate limiter: limits the traffic according to the number of requests. Assuming that N requests may pass per second, requests beyond N are rejected. In the implementation, requests are grouped by bucket and tokens are generated at an even rate. If the number of requests is not greater than the number of tokens in the bucket, the requests are allowed to pass. The time window is 2 * capacity/rate.
  2. Slide window rate limiter: unlike the token bucket algorithm, its window size is capacity/rate, which is smaller than that of the token bucket rate limiter, and the window moves by one window size at a time. The other principles are similar to those of the token bucket.
  3. Concurrent rate limiter: strictly limits the number of concurrent requests to N. For each new request, it checks the current number of concurrent requests. If the number is less than N, the request is allowed to pass and the count is increased by 1. When the request ends, the slot is released (the count is decreased by 1).
  4. Leaky bucket rate limiter: in contrast with the token bucket algorithm, the leaky bucket algorithm smooths bursts of requests and only allows a pre-defined number N of requests. This limiter forces the output flow to a constant rate of N. It is based on a leaky bucket model, where the leaked water quantity is time interval * rate. If the leaked quantity is greater than the quantity already used (represented by key_bucket_count), the bucket is emptied, that is, key_bucket_count is set to 0. Otherwise, the leaked quantity is subtracted from key_bucket_count. If requests + key_bucket_count is less than the capacity, the requests are allowed to pass.
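The leaky bucket script is not walked through in this article, so the prose of item 4 is translated here into a small in-memory Java sketch. It is not the Lua script: LeakyBucketSketch is a hypothetical class, the field water plays the role of key_bucket_count, adding the accepted requests to the bucket is an assumption, and the strict "less than" comparison simply follows the wording above (the script's exact boundary is not shown here).

```java
final class LeakyBucketSketch {

    private final double rate;      // units leaked per second
    private final double capacity;  // bucket size
    private double water;           // plays the role of key_bucket_count
    private long lastSecond;        // time of the previous call, in seconds

    LeakyBucketSketch(final double rate, final double capacity, final long nowSecond) {
        this.rate = rate;
        this.capacity = capacity;
        this.lastSecond = nowSecond;
    }

    synchronized boolean tryAcquire(final long nowSecond, final double requested) {
        double leaked = Math.max(0, nowSecond - lastSecond) * rate;
        // Drain: empty the bucket if more leaked than it held, else subtract.
        water = leaked > water ? 0 : water - leaked;
        lastSecond = nowSecond;
        // Boundary follows the prose ("less than the capacity"); the script may differ.
        if (requested + water < capacity) {
            water += requested; // assumption: accepted requests fill the bucket
            return true;
        }
        return false;
    }

    public static void main(final String[] args) {
        LeakyBucketSketch bucket = new LeakyBucketSketch(1, 3, 0);
        System.out.println(bucket.tryAcquire(0, 1)); // true
        System.out.println(bucket.tryAcquire(0, 1)); // true
        System.out.println(bucket.tryAcquire(0, 1)); // false: 1 + 2 is not below 3
        System.out.println(bucket.tryAcquire(2, 1)); // true: 2 units leaked meanwhile
    }
}
```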

Let's understand the purpose of callback() by reading the concurrent rate limiter code, and the usage of getKeys() by reading the Lua scripts of the token bucket rate limiter and the slide window rate limiter.

callback() used in Concurrent requests limiter

The getKeys() method of ConcurrentRateLimiterAlgorithm overrides the template method in AbstractRateLimiterAlgorithm:

    @Override
public List<String> getKeys(final String id) {
String tokenKey = getKeyName() + ".{" + id + "}.tokens";
String requestKey = UUIDUtils.getInstance().generateShortUuid();
return Arrays.asList(tokenKey, requestKey);
}

The second element, requestKey, is a unique long value (generated by a distributed ID generator; it is incremental and smaller than the current epoch-second value). The corresponding Lua script in concurrent_request_rate_limiter.lua:

local key = KEYS[1]

local capacity = tonumber(ARGV[2])
local timestamp = tonumber(ARGV[3])
local id = KEYS[2]

Here id is the requestKey generated by the getKeys() method; it is a unique value. The subsequent process is as follows:

local count = redis.call("zcard", key)
local allowed = 0

if count < capacity then
redis.call("zadd", key, timestamp, id)
allowed = 1
count = count + 1
end
return { allowed, count }

First, the zcard command obtains the cardinality of the sorted set, and count is set to that cardinality. If the cardinality is less than the capacity, a new member id (the unique value) is added to the sorted set, with the current time (in seconds) as its score. Then count = count + 1, matching the cardinality, which has actually increased by 1.

All of the code above is executed in Redis as an atomic transaction. If there is a large number of concurrent requests from the same key (such as an IP), the cardinality of the sorted set of this key increases sharply; when the capacity limit is reached, the service is denied, that is, allowed = 0.

In the concurrent requests limiter, the semaphore must be released when the request is completed. However, this is not included in the Lua script.

Let's see the callback function of ConcurrentRateLimiterAlgorithm

    @Override
@SuppressWarnings("unchecked")
public void callback(final RedisScript<?> script, final List<String> keys, final List<String> scriptArgs) {
Singleton.INST.get(ReactiveRedisTemplate.class).opsForZSet().remove(keys.get(0), keys.get(1)).subscribe();
}

This is an asynchronous subscription that uses ReactiveRedisTemplate to delete the element (key, id) from the Redis store. That is, once the request operation ends, the semaphore is released. This remove operation cannot be executed in the Lua script, and this is exactly the design intention of callback() in the RateLimiterAlgorithm SPI.
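The acquire and release halves can be modeled together in memory to show how they cooperate. This is a sketch, not the ShenYu code: ConcurrentLimiterSketch is a hypothetical class, a HashMap (request id to timestamp) stands in for the Redis sorted set, tryAcquire corresponds to the Lua script and release corresponds to callback(). Java's synchronized replaces the atomicity that Redis gives the script.

```java
import java.util.HashMap;
import java.util.Map;

final class ConcurrentLimiterSketch {

    private final int capacity;
    // request id -> timestamp; stands in for the Redis sorted set (member -> score).
    private final Map<String, Long> inFlight = new HashMap<>();

    ConcurrentLimiterSketch(final int capacity) {
        this.capacity = capacity;
    }

    // Mirrors the Lua script: ZCARD, then ZADD if below capacity.
    synchronized boolean tryAcquire(final String requestId, final long nowSecond) {
        if (inFlight.size() < capacity) {
            inFlight.put(requestId, nowSecond);
            return true;
        }
        return false;
    }

    // Mirrors callback(): removes the request id, freeing one slot.
    synchronized void release(final String requestId) {
        inFlight.remove(requestId);
    }

    synchronized int inFlightCount() {
        return inFlight.size();
    }

    public static void main(final String[] args) {
        ConcurrentLimiterSketch limiter = new ConcurrentLimiterSketch(2);
        System.out.println(limiter.tryAcquire("a", 1L)); // true
        System.out.println(limiter.tryAcquire("b", 1L)); // true
        System.out.println(limiter.tryAcquire("c", 1L)); // false: both slots taken
        limiter.release("a");
        System.out.println(limiter.tryAcquire("c", 2L)); // true: slot freed
    }
}
```

Because each request id is unique, releasing one request can never remove another request's entry; this is why getKeys() generates a fresh second key per call for this algorithm.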

getKeys() used in token bucket rate limiter

Following is the corresponding Lua code:

local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]

The code that reads the parameters rate, capacity, etc. is omitted here.

local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)

The window size variable (ttl) is approximately twice capacity/rate.

local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
last_tokens = capacity
end

Read last_tokens from Redis under tokens_key; if it does not exist, last_tokens equals capacity.

local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end

Read the last refreshed time from Redis under timestamp_key; it defaults to 0.

local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end

filled_tokens grows evenly by time interval * rate, capped at capacity. If the number of tokens is not less than the number requested, then allowed_num = 1 and new_tokens is reduced by the requested amount.

redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)

return { allowed_num, new_tokens }

Here now is the current time passed in as a parameter. tokens_key is set to hold the value new_tokens and to expire after ttl seconds. timestamp_key is set to hold the value now, and also expires after ttl seconds.
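The arithmetic of the script can be traced more easily in a plain Java translation. The sketch below keeps the two Redis values as fields; it is an illustration, not the ShenYu code. TokenBucketSketch is a hypothetical class, key expiry (ttl) is not modeled, and the behavior on a denied request (keeping the refilled count) is an assumption about the part of the script that initializes new_tokens.

```java
final class TokenBucketSketch {

    private final double rate;      // tokens added per second
    private final double capacity;  // maximum tokens in the bucket
    private double tokens;          // value stored under tokens_key
    private long lastRefreshed;     // value stored under timestamp_key

    TokenBucketSketch(final double rate, final double capacity) {
        this.rate = rate;
        this.capacity = capacity;
        this.tokens = capacity;     // a missing key means a full bucket
        this.lastRefreshed = 0L;    // a missing key means "never refreshed"
    }

    synchronized boolean tryAcquire(final long nowSecond, final double requested) {
        long delta = Math.max(0L, nowSecond - lastRefreshed);
        double filled = Math.min(capacity, tokens + delta * rate);
        boolean allowed = filled >= requested;
        // Assumption: a denied request keeps the refilled count unchanged.
        tokens = allowed ? filled - requested : filled;
        lastRefreshed = nowSecond;
        return allowed;
    }

    synchronized double tokensLeft() {
        return tokens;
    }

    public static void main(final String[] args) {
        TokenBucketSketch bucket = new TokenBucketSketch(1, 2);
        System.out.println(bucket.tryAcquire(100L, 1)); // true, 1 token left
        System.out.println(bucket.tryAcquire(100L, 1)); // true, 0 tokens left
        System.out.println(bucket.tryAcquire(100L, 1)); // false, bucket empty
        System.out.println(bucket.tryAcquire(101L, 1)); // true, one token refilled
    }
}
```

With rate = 1 and capacity = 2, a burst of two requests passes immediately, after which the limiter settles to one request per second; capacity therefore controls the burst size and rate the steady-state throughput.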

getKeys() used in sliding window rate limiter

The getKeys() method of SlidingWindowRateLimiterAlgorithm also overrides the parent class, and its code is the same as the method in ConcurrentRateLimiterAlgorithm.

The Lua code of the slide window rate limiter follows; the code that receives the other parameters is omitted.

local timestamp_key = KEYS[2]
......
local window_size = tonumber(capacity / rate)
local window_time = 1

Here window_size is set to capacity/rate.

local last_requested = 0
local exists_key = redis.call('exists', tokens_key)
if (exists_key == 1) then
last_requested = redis.call('zcard', tokens_key)
end

Obtain the cardinality (last_requested) of the sorted set stored at tokens_key.

local remain_request = capacity - last_requested
local allowed_num = 0
if (last_requested < capacity) then
allowed_num = 1
redis.call('zadd', tokens_key, now, timestamp_key)
end

The remaining quota remain_request equals capacity minus last_requested. If last_requested is less than capacity, the current request is allowed to pass, and an element is added to the sorted set with member = timestamp_key and score = now.

redis.call('zremrangebyscore', tokens_key, 0, now - window_size / window_time)
redis.call('expire', tokens_key, window_size)

return { allowed_num, remain_request }

window_time was set to 1 earlier. The zremrangebyscore command of Redis removes all elements of the sorted set stored at tokens_key whose score is in [0, now - window_size / window_time], that is, it moves the window by one window size. The expire time of tokens_key is set to window_size.
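The order of operations in this script (check, add, then trim) is easier to see in a plain Java translation. The sketch below follows the excerpts shown above and is not the ShenYu code: SlidingWindowSketch is a hypothetical class, a HashMap (member to score) stands in for the sorted set, and the key expiry is not modeled.

```java
import java.util.HashMap;
import java.util.Map;

final class SlidingWindowSketch {

    private final long capacity;
    private final double windowSize;  // capacity / rate, in seconds
    // member -> score (request time); stands in for the Redis sorted set.
    private final Map<String, Long> window = new HashMap<>();

    SlidingWindowSketch(final long capacity, final long rate) {
        this.capacity = capacity;
        this.windowSize = (double) capacity / rate;
    }

    synchronized boolean tryAcquire(final String member, final long nowSecond) {
        boolean allowed = window.size() < capacity;   // ZCARD check
        if (allowed) {
            window.put(member, nowSecond);            // ZADD now member
        }
        // ZREMRANGEBYSCORE 0 .. now - windowSize (inclusive), done after the check.
        window.values().removeIf(score -> score <= nowSecond - windowSize);
        return allowed;
    }

    public static void main(final String[] args) {
        SlidingWindowSketch limiter = new SlidingWindowSketch(2, 1);
        System.out.println(limiter.tryAcquire("a", 10L)); // true
        System.out.println(limiter.tryAcquire("b", 10L)); // true
        System.out.println(limiter.tryAcquire("c", 10L)); // false: window full
        System.out.println(limiter.tryAcquire("d", 12L)); // false: trimmed only after the check
        System.out.println(limiter.tryAcquire("e", 12L)); // true: old entries are gone now
    }
}
```

In this translation the trimming runs after the capacity check, so the first request arriving once the window has passed is still counted against the stale entries and only the next one benefits. Whether the full script behaves the same way depends on the parts omitted in the excerpts.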

In the template method getKeys(final String id) of AbstractRateLimiterAlgorithm, the second key (called secondKey here) is a fixed string built from the input parameter id. The three algorithms above use it differently. In the token bucket algorithm, secondKey is updated to the latest time in the Lua code, so it does not matter what value is passed in. In the concurrent rate limiter, secondKey is used as the key for removing Redis data in the Java callback method. In the sliding window algorithm, secondKey is added to the sorted set as the key of a new element, and is removed when the window slides.

In short, when developing a new rate limiter algorithm, the getKeys(final String id) method should be designed carefully according to the logic of the algorithm.

How to use RateLimiter SPI

The doExecute() method of the RateLimiter plugin takes four parameters: exchange is the web request, chain is the execution chain of the plugins, selector holds the selection parameters, and rule holds the rate limiter policies or rules configured in the system.

protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
//get the `RateLimiterHandle` parameters from cache
RateLimiterHandle limiterHandle = RatelimiterRuleHandleCache.getInstance()
.obtainHandle(CacheKeyUtils.INST.getKey(rule));
//find the resolver name
String resolverKey = Optional.ofNullable(limiterHandle.getKeyResolverName())
.flatMap(name -> Optional.of("-" + RateLimiterKeyResolverFactory.newInstance(name).resolve(exchange)))
.orElse("");
return redisRateLimiter.isAllowed(rule.getId() + resolverKey, limiterHandle)
.flatMap(response -> {
if (!response.isAllowed()) {
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
Object error = ShenyuResultWrap.error(ShenyuResultEnum.TOO_MANY_REQUESTS.getCode(), ShenyuResultEnum.TOO_MANY_REQUESTS.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
return chain.execute(exchange);
});
}
  1. First, get the RateLimiterHandle parameters from the cache.

  2. Obtain the corresponding key resolver from the RateLimiterHandle instance.

  3. Reactively execute the isAllowed() method of RedisRateLimiter.

  4. If the request is not allowed, perform error handling.

  5. If the request is allowed, dispatch it to the next step of the execution chain.
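The first two steps boil down to composing the id that is handed to isAllowed(). The sketch below reproduces that composition with plain Java types so it can run without Spring or ShenYu on the classpath. It is a simplified stand-in: the class LimiterKeySketch, the resolver map, and modelling the request as a String are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stand-in for the id composition in doExecute() (not ShenYu code).
final class LimiterKeySketch {
    private LimiterKeySketch() {
    }

    // resolvers maps a resolver name to a function that extracts a key from the request.
    // The request is modelled as a plain String (for example a remote address) for brevity.
    static String limiterId(final String ruleId, final String resolverName,
                            final Map<String, Function<String, String>> resolvers,
                            final String request) {
        String resolverKey = Optional.ofNullable(resolverName)
                .map(resolvers::get) // in this sketch an unknown name simply yields no suffix
                .map(resolver -> "-" + resolver.apply(request))
                .orElse("");
        return ruleId + resolverKey;
    }
}
```

When no resolver name is configured the id is just the rule id, so all requests matching the rule share one limiter; with a resolver, each resolved value (for example each client address) gets its own limiter state.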

Summary

The RateLimiter plugin is built on Spring WebFlux and the Apache ShenYu SPI, with Redis and Lua scripts responsible for the critical algorithm and logic, which gives it high concurrency and elasticity. As for the RateLimiter SPI:

  1. The RateLimiter SPI provides two SPI interfaces. With its interface-oriented design and various design patterns, it is easy to develop a new rate limiter algorithm or key resolver rule.
  2. The RateLimiterAlgorithm SPI supplies four rate limiter algorithms: token bucket, concurrency rate limiter, leaky bucket and sliding window rate limiter. When designing a rate limiter algorithm, the key generation needs to be designed carefully according to the characteristics of the algorithm. The logic of the algorithm is implemented in a Lua script, and a callback() method is provided for asynchronous processing when needed.
  3. Reactive programming keeps the implementation simple and efficient.

· 6 min read
Yuxuan Zhang

Preface

As a first-time developer in the ShenYu community, I encountered some pitfalls that were not mentioned in the tutorials I followed to start and develop the project. In this blog I have documented the detailed steps I took to start shenyu, shenyu-dashboard and shenyu-website, hoping to help more new contributors in the community.

Environmental Preparation

  • Install JDK 17+ locally
  • Install Git locally
  • Install Maven 3.6.3+
  • Choose a development tool; this article uses IDEA as an example

ShenYu Backend Startup Guide

Install and Configure Maven

Maven is a cross-platform project management tool. As one of the Apache organization's top-level open source projects, it mainly serves project creation, dependency management and project information management on Java-based platforms.

  1. Download Maven and extract it to a path that contains no Chinese characters and no spaces.

  2. Add the bin directory under the maven directory to the environment variables. For Windows, if the download directory is E:\apache-maven-3.9.1, add E:\apache-maven-3.9.1\bin to the Path system variable.

  3. Verify that the installation was successful. Type mvn -v in the cmd window, and if the Maven version and Java version appear, the installation is successful. This is shown below:

    C:\Users\pc>mvn -v
    Apache Maven 3.9.1 (2e178502fcdbffc201671fb2537d0cb4b4cc58f8)
    Maven home: E:\apache-maven-3.9.1
    Java version: 18.0.1.1, vendor: Oracle Corporation, runtime: C:\Program Files\Java\jdk-18.0.1.1
    Default locale: zh_CN, platform encoding: UTF-8
    OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"
  4. To speed up the download of project-related dependencies, change the Maven mirrors; here we add Aliyun and other mirrors. Replace the <mirrors> </mirrors> tag pair in conf/settings.xml with the following:

    <mirrors>
    <mirror>
    <id>alimaven</id>
    <name>aliyun maven</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    <mirrorOf>central</mirrorOf>
    </mirror>

    <mirror>
    <id>alimaven</id>
    <mirrorOf>central</mirrorOf>
    <name>aliyun maven</name>
    <url>http://maven.aliyun.com/nexus/content/repositories/central/</url>
    </mirror>

    <mirror>
    <id>maven</id>
    <mirrorOf>central</mirrorOf>
    <name>name_</name>
    <url>http://repo1.maven.org/maven2</url>
    </mirror>

    <mirror>
    <id>junit</id>
    <mirrorOf>central</mirrorOf>
    <name>junit address/</name>
    <url>http://jcenter.bintray.com/</url>
    </mirror>
    </mirrors>

    Then add <localRepository>E:/maven_local_repository</localRepository> on the line after </mirrors> to set the location of the Maven local repository. You can choose the exact location yourself.

Pull ShenYu Code

  1. Fork the ShenYu repository on GitHub to your own account, where you can develop and submit PRs in the future.

  2. Use Git to download the repository from the previous step locally:

    git clone git@github.com:${YOUR_USERNAME}/${TARGET_REPO}.git

    If prompted for a long file name, execute the following command via the command line:

    git config --global core.longpaths true

    Tips: If you encounter the following error or have network issues preventing you from pulling all the code:

    RPC failed; curl 92 HTTP/2 stream 5 was not closed cleanly: CANCEL (err 8) 2057 bytes of body are still expected fetch-pack: unexpected disconnect while reading sideband packet early EOF fetch-pack: invalid index-pack output

    You can execute the following commands to first make a shallow clone containing only the latest commit, and then fetch the full history:

    git clone https://github.com/apache/shenyu.git --depth 1
    cd ./shenyu
    git fetch --unshallow

ShenYu First Start

Preparation

  1. Compile with Maven in the shenyu directory:

    mvn clean install -Dmaven.javadoc.skip=true -B -Drat.skip=true -Djacoco.skip=true -DskipITs -DskipTests
  2. Configure the IDEA environment. Open the shenyu project with IDEA, click File -> Settings in the top left corner, and configure Maven as shown below. For User settings file, select the location of your settings.xml; Local repository will then automatically load the localRepository path set in settings.xml:

  3. At this point, IDEA will automatically download the project-related dependencies. This takes a while; when it has finished, the project looks as shown in the following figure:

    As you can see, shenyu-e2e, shenyu-examples and shenyu-integrated-test are not marked as Maven projects by IDEA and need to be added manually. Select the pom.xml file in each module, right-click, and choose Add as Maven Project. If the shenyu-e2e build fails, change <relativePath/> in its pom.xml to <relativePath>./pom.xml</relativePath>.

Start Gateway Service

  1. Start the shenyu-admin console (H2 database is used by default)

  2. Start shenyu-bootstrap

At this point, the shenyu gateway has been started.

We can open the browser and access the admin console: http://localhost:9095/

Default account: admin, default password: 123456

Start Application Service

Apache ShenYu provides sample applications for Http, Dubbo, SpringCloud and others that access the shenyu gateway. They are located in the shenyu-examples module; here the Http service is used as an example.

Start shenyu-examples-http

At this point, shenyu-examples-http will automatically register the interface methods annotated with @ShenyuSpringMvcClient and the relevant configuration in application.yml to the gateway. We can open the admin console and see the configuration in Client List -> Proxy -> divide.

Test Http Request

The following uses the IDEA HTTP Client plugin to send mock HTTP requests to the http service.

  • Local access without using shenyu proxy

  • Use shenyu proxy
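The same two requests can also be issued from Java with the JDK's built-in HTTP client. This is a sketch under assumptions: the direct address http://localhost:8189, the gateway address http://localhost:9195 and the /http context path are assumed defaults of the example setup and may differ in your configuration; only the /order/findById endpoint is taken from the example controller.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch only: the ports and the /http context path are assumptions about the example's defaults.
final class GatewayRequestSketch {
    static final String DIRECT_BASE = "http://localhost:8189";       // assumed address of the example service
    static final String GATEWAY_BASE = "http://localhost:9195/http"; // assumed gateway address plus context path

    private GatewayRequestSketch() {
    }

    // Builds a GET request for the findById endpoint under the given base URL.
    static HttpRequest findById(final String base, final String id) {
        return HttpRequest.newBuilder(URI.create(base + "/order/findById?id=" + id)).GET().build();
    }

    public static void main(final String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // First call the service directly, then call it through the gateway.
        for (String base : new String[] {DIRECT_BASE, GATEWAY_BASE}) {
            HttpResponse<String> response =
                    client.send(findById(base, "1"), HttpResponse.BodyHandlers.ofString());
            System.out.println(base + " -> " + response.statusCode() + " " + response.body());
        }
    }
}
```

If both calls return the same body, the request is being proxied correctly; running main requires shenyu-admin, shenyu-bootstrap and shenyu-examples-http to be up.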

Use more plugins

To use other plugins, refer to the Plugins collection in the left-hand navigation of the official documentation.

Shenyu Front End Startup Guide

Install Node.js

Download

  1. Download and install Node.js from official website and select LTS version.

  2. When installing, except for setting the installation path, just keep clicking Next.

  3. After the installation is complete, verify at the command line:

    C:\Users\pc>node -v
    v12.22.12

    C:\Users\pc>npm -v
    6.14.16

Pull ShenYu Dashboard Code

  1. Fork ShenYu Dashboard repository

  2. Use Git to download it locally:

    git clone git@github.com:${YOUR_USERNAME}/${TARGET_REPO}.git

Front and Back End Co-development

  1. Add enablePrintApiLog: true to the shenyu-admin/src/main/resources/application.yml file in the backend repository shenyu as shown below to show the log of frontend interface calls in the backend console.

  2. Start ShenyuAdminBootstrap

  3. Switch to the front-end repository shenyu-dashboard, open the README, and click npm install and npm start (or enter the same commands in cmd). You can then access the front-end interface at http://localhost:8000, and the back-end console displays a log of each interface called by the front end, which enables co-development of the front end and back end.

Package Front-end Code

Execute the npm build command in README and copy all the generated files from the dist folder to the shenyu-admin/src/main/resources/static/ directory in the backend repository.

Contribute to Shenyu Official Website

Just follow the README in shenyu-website.

Tips

  1. I recommend downloading the LTS version from the Node website.
  2. Deployment does not work on Windows systems; if you want to verify your changes, deploy on a Linux virtual machine or server.