
4 posts tagged with "plugin"



This article is based on the source code analysis of version 'shenyu-2.6.1'

Shenyu provides a mechanism for adding custom plugins or overriding existing ones, implemented internally through the extPlugin configuration. An extension must meet the following two requirements:

  1. Implement the ShenyuPlugin or PluginDataHandler interface.
  2. Package the implementation as a jar and place it in the directory configured by 'shenyu.extPlugin.path'.

Entry

The class that implements this logic is 'ShenyuLoaderService'. Let's look at how this class handles it, starting with its constructor.

    public ShenyuLoaderService(final ShenyuWebHandler webHandler, final CommonPluginDataSubscriber subscriber, final ShenyuConfig shenyuConfig) {
// Information subscription for plugin information
this.subscriber = subscriber;
// The WebHandler encapsulated by Shenyu contains all the plugin logic
this.webHandler = webHandler;
// configuration information
this.shenyuConfig = shenyuConfig;
// The configuration information of the extension plugin, such as path, whether it is enabled, how many threads are enabled to process, and the frequency of loading checks
ExtPlugin config = shenyuConfig.getExtPlugin();
// If enabled, create a scheduled task to check and load
if (config.getEnabled()) {
// Create a scheduled task with a specified thread name
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(config.getThreads(), ShenyuThreadFactory.create("plugin-ext-loader", true));
// Run the task at a fixed rate: by default, the first run happens after 30 seconds and later runs happen every 300 seconds
executor.scheduleAtFixedRate(() -> loadExtOrUploadPlugins(null), config.getScheduleDelay(), config.getScheduleTime(), TimeUnit.SECONDS);
}
}

This class has the following properties:

webHandler: the entry point through which Shenyu processes requests. It references all plugin data and must be updated after extension plugins are loaded.

subscriber: the entry point for plugin data subscription. It references the subscription handlers of all plugins and must also be updated after extension plugins are loaded.

executor: a scheduled task created inside 'ShenyuLoaderService' that periodically scans the configured path and loads jar packages, so that extension plugins are discovered dynamically. By default, the first scan runs 30 seconds after startup and later scans run every 300 seconds.

The extension plugin feature can be switched on or off with shenyu.extPlugin.enabled.

The above configurations can be adjusted in the configuration file:

shenyu:
  extPlugin:
    path: # Storage directory for extension plugins
    enabled: true # Whether the extension feature is enabled
    threads: 1 # Number of threads used for scanning and loading
    scheduleTime: 300 # Interval between task executions, in seconds
    scheduleDelay: 30 # Delay before the first execution, in seconds
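
To make the effect of scheduleDelay and scheduleTime concrete, here is a minimal sketch of the same scheduling pattern using only the JDK. The Settings class and the method names are hypothetical stand-ins, not ShenYu's own types, and the demo runs in milliseconds instead of seconds so that it finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ExtLoaderScheduleSketch {

    /** Hypothetical stand-in for the extPlugin settings shown above. */
    static final class Settings {
        final boolean enabled;
        final int threads;
        final long scheduleDelay;
        final long scheduleTime;

        Settings(final boolean enabled, final int threads, final long scheduleDelay, final long scheduleTime) {
            this.enabled = enabled;
            this.threads = threads;
            this.scheduleDelay = scheduleDelay;
            this.scheduleTime = scheduleTime;
        }
    }

    /** Starts the periodic scan when enabled; returns null when the feature is disabled. */
    static ScheduledThreadPoolExecutor start(final Settings settings, final Runnable scan, final TimeUnit unit) {
        if (!settings.enabled) {
            return null;
        }
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(settings.threads, runnable -> {
            // Daemon thread with a recognizable name, similar in spirit to the thread factory in the article
            Thread thread = new Thread(runnable, "plugin-ext-loader");
            thread.setDaemon(true);
            return thread;
        });
        // First run after scheduleDelay, then one run every scheduleTime
        executor.scheduleAtFixedRate(scan, settings.scheduleDelay, settings.scheduleTime, unit);
        return executor;
    }

    /** Runs a fast demo schedule and waits until at least the requested number of scans has happened. */
    static int runDemo(final int expectedScans) {
        AtomicInteger scans = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(expectedScans);
        ScheduledThreadPoolExecutor executor = start(new Settings(true, 1, 10, 50), () -> {
            scans.incrementAndGet();
            latch.countDown();
        }, TimeUnit.MILLISECONDS);
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return scans.get();
    }

    public static void main(final String[] args) {
        System.out.println("scans executed: " + runDemo(2));
    }
}
```

With the default values from the configuration, the same call would use a delay of 30, a period of 300, and TimeUnit.SECONDS.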

Next, let's take a look at the loading logic:

   public void loadExtOrUploadPlugins(final PluginData uploadedJarResource) {
try {
List<ShenyuLoaderResult> plugins = new ArrayList<>();
// Obtain the holding object of ShenyuPluginClassloader
ShenyuPluginClassloaderHolder singleton = ShenyuPluginClassloaderHolder.getSingleton();
if (Objects.isNull(uploadedJarResource)) {
// If the parameter is empty, load all jar packages from the extended directory
// PluginJar: Data containing the ShenyuPlugin interface and PluginDataHandler interface
List<PluginJarParser.PluginJar> uploadPluginJars = ShenyuExtPathPluginJarLoader.loadExtendPlugins(shenyuConfig.getExtPlugin().getPath());
// Traverse all pending plugins
for (PluginJarParser.PluginJar extPath : uploadPluginJars) {
LOG.info("shenyu extPlugin find new {} to load", extPath.getAbsolutePath());
// Use the loader of the extension plugin to load the specified plugin, facilitating subsequent loading and unloading
ShenyuPluginClassLoader extPathClassLoader = singleton.createPluginClassLoader(extPath);
// Using ShenyuPluginClassLoader for loading
// The main logic checks whether the class implements the ShenyuPlugin or PluginDataHandler interface, or carries an annotation such as @Component or @Service. If so, it is registered as a Spring bean
// Construct ShenyuLoaderResult object
plugins.addAll(extPathClassLoader.loadUploadedJarPlugins());
}
} else {
// Load the specified jar, with the same logic as loading all
PluginJarParser.PluginJar pluginJar = PluginJarParser.parseJar(Base64.getDecoder().decode(uploadedJarResource.getPluginJar()));
LOG.info("shenyu upload plugin jar find new {} to load", pluginJar.getJarKey());
ShenyuPluginClassLoader uploadPluginClassLoader = singleton.createPluginClassLoader(pluginJar);
plugins.addAll(uploadPluginClassLoader.loadUploadedJarPlugins());
}
// Add the extended plugins to the plugin list of ShenyuWebHandler, and subsequent requests will go through the added plugin content
loaderPlugins(plugins);
} catch (Exception e) {
LOG.error("shenyu plugins load has error ", e);
}
}

This method works as follows:

  1. Check whether the parameter uploadedJarResource has a value. If it is null, load every jar package in the extension directory. Otherwise, load only the uploaded jar.

  2. Read the jar packages from shenyu.extPlugin.path and wrap each one in a PluginJar object, which holds the following information about the jar package:

    • version: version information

    • groupId: the groupId of the package

    • artifactId: the artifactId of the package

    • absolutePath: absolute path

    • clazzMap: bytecode of each class

    • resourceMap: byte content of the jar package's resources

  3. Use ShenyuPluginClassloaderHolder to create the corresponding ClassLoader, a 'ShenyuPluginClassLoader', and load the classes with it.

    • Call ShenyuPluginClassLoader.loadUploadedJarPlugins() to load the classes and register them as Spring beans, so that the Spring container manages them.
  4. Call the loaderPlugins method to push the extension plugins to webHandler and subscriber.
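
To illustrate step 2, the following sketch shows one way a jar's bytes could be split into a class map and a resource map using only java.util.jar. It is not PluginJarParser's actual implementation: the class PluginJarIndexSketch, its two fields, and the helper buildJar are hypothetical names chosen for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;
import java.util.jar.JarOutputStream;

final class PluginJarIndexSketch {

    /** Fully qualified class name -> class file bytes. */
    final Map<String, byte[]> clazzMap = new LinkedHashMap<>();

    /** Entry name -> bytes, for every entry that is not a class file. */
    final Map<String, byte[]> resourceMap = new LinkedHashMap<>();

    /** Reads a jar from memory and sorts its entries into the two maps. */
    static PluginJarIndexSketch parse(final byte[] jarBytes) {
        PluginJarIndexSketch index = new PluginJarIndexSketch();
        try (JarInputStream in = new JarInputStream(new ByteArrayInputStream(jarBytes))) {
            JarEntry entry;
            while ((entry = in.getNextJarEntry()) != null) {
                if (entry.isDirectory()) {
                    continue;
                }
                // readAllBytes stops at the end of the current entry
                byte[] content = in.readAllBytes();
                String name = entry.getName();
                if (name.endsWith(".class")) {
                    String className = name.substring(0, name.length() - ".class".length()).replace('/', '.');
                    index.clazzMap.put(className, content);
                } else {
                    index.resourceMap.put(name, content);
                }
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return index;
    }

    /** Test helper: builds a small in-memory jar from the given entries. */
    static byte[] buildJar(final Map<String, byte[]> entries) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (JarOutputStream out = new JarOutputStream(buffer)) {
            for (Map.Entry<String, byte[]> item : entries.entrySet()) {
                out.putNextEntry(new JarEntry(item.getKey()));
                out.write(item.getValue());
                out.closeEntry();
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return buffer.toByteArray();
    }

    public static void main(final String[] args) {
        Map<String, byte[]> entries = new LinkedHashMap<>();
        // Placeholder bytes, not real bytecode
        entries.put("com/example/DemoPlugin.class", new byte[] {1, 2, 3});
        entries.put("demo.properties", new byte[] {'a', '=', '1'});
        PluginJarIndexSketch index = parse(buildJar(entries));
        System.out.println("classes: " + index.clazzMap.keySet());
        System.out.println("resources: " + index.resourceMap.keySet());
    }
}
```

Keeping the class bytes in memory is what lets a dedicated class loader define the classes later without touching the file system again.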

Plugin Registration

For the content in the provided jar package, the loader will only handle classes of the specified interface type, and the implementation logic is in the ShenyuPluginClassLoader.loadUploadedJarPlugins() method.

public List<ShenyuLoaderResult> loadUploadedJarPlugins() {
List<ShenyuLoaderResult> results = new ArrayList<>();
// All class mapping relationships
Set<String> names = pluginJar.getClazzMap().keySet();
// Traverse all classes
names.forEach(className -> {
Object instance;
try {
// Try creating objects and, if possible, add them to the Spring container
instance = getOrCreateSpringBean(className);
if (Objects.nonNull(instance)) {
// Building the ShenyuLoaderResult object
results.add(buildResult(instance));
LOG.info("The class successfully loaded into a upload-Jar-plugin {} is registered as a spring bean", className);
}
} catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
LOG.warn("Registering upload-Jar-plugins succeeds spring bean fails:{}", className, e);
}
});
return results;
}

This method instantiates every eligible class and wraps each instance in a ShenyuLoaderResult object. The wrapping is done in the buildResult() method.

    private ShenyuLoaderResult buildResult(final Object instance) {
ShenyuLoaderResult result = new ShenyuLoaderResult();
// Does the created object implement ShenyuPlugin
if (instance instanceof ShenyuPlugin) {
result.setShenyuPlugin((ShenyuPlugin) instance);
// Does the created object implement PluginDataHandler
} else if (instance instanceof PluginDataHandler) {
result.setPluginDataHandler((PluginDataHandler) instance);
}
return result;
}

Next, let's analyze the getOrCreateSpringBean() method:

    private <T> T getOrCreateSpringBean(final String className) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
// Confirm if it has been registered. If so, do not process it and return directly
if (SpringBeanUtils.getInstance().existBean(className)) {
return SpringBeanUtils.getInstance().getBeanByClassName(className);
}
lock.lock();
try {
// Double check,
T inst = SpringBeanUtils.getInstance().getBeanByClassName(className);
if (Objects.isNull(inst)) {
// Using ShenyuPluginClassLoader to load classes
Class<?> clazz = Class.forName(className, false, this);
//Exclude ShenyuPlugin subclass and PluginDataHandler subclass
// without adding @Component @Service annotation
// Confirm if it is a subclass of ShenyuPlugin or PluginDataHandler
boolean next = ShenyuPlugin.class.isAssignableFrom(clazz)
|| PluginDataHandler.class.isAssignableFrom(clazz);
if (!next) {
// If not, confirm if @ Component and @ Service annotations are identified
Annotation[] annotations = clazz.getAnnotations();
next = Arrays.stream(annotations).anyMatch(e -> e.annotationType().equals(Component.class)
|| e.annotationType().equals(Service.class));
}
if (next) {
// If the above content is met, register the bean
GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
beanDefinition.setBeanClassName(className);
beanDefinition.setAutowireCandidate(true);
beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
// Registering beans
String beanName = SpringBeanUtils.getInstance().registerBean(beanDefinition, this);
// create object
inst = SpringBeanUtils.getInstance().getBeanByClassName(beanName);
}
}
return inst;
} finally {
lock.unlock();
}
}

The logic is roughly as follows:

  1. Check whether the class implements the ShenyuPlugin or PluginDataHandler interface. If not, check whether it is annotated with @Component or @Service.
  2. If either check in step 1 passes, register the class in the Spring container and return the created object.
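
The eligibility check in step 1 can be sketched without Spring or ShenYu on the classpath. In the example below, the nested Plugin and DataHandler interfaces and the Component and Service annotations are local, hypothetical stand-ins for ShenyuPlugin, PluginDataHandler, @Component, and @Service.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Arrays;

final class BeanEligibilitySketch {

    /** Stand-ins for ShenyuPlugin and PluginDataHandler (assumptions for this sketch). */
    interface Plugin { }

    interface DataHandler { }

    /** Stand-ins for Spring's @Component and @Service; runtime retention so reflection can see them. */
    @Retention(RetentionPolicy.RUNTIME)
    @interface Component { }

    @Retention(RetentionPolicy.RUNTIME)
    @interface Service { }

    static final class DemoPlugin implements Plugin { }

    @Service
    static final class DemoService { }

    static final class PlainHelper { }

    /** Returns true when the class should be registered as a bean. */
    static boolean eligible(final Class<?> clazz) {
        // First check: does the class implement one of the plugin interfaces?
        boolean next = Plugin.class.isAssignableFrom(clazz) || DataHandler.class.isAssignableFrom(clazz);
        if (!next) {
            // Second check: is the class marked with one of the stereotype annotations?
            next = Arrays.stream(clazz.getAnnotations())
                    .anyMatch(a -> a.annotationType().equals(Component.class)
                            || a.annotationType().equals(Service.class));
        }
        return next;
    }

    public static void main(final String[] args) {
        System.out.println("DemoPlugin eligible: " + eligible(DemoPlugin.class));
        System.out.println("DemoService eligible: " + eligible(DemoService.class));
        System.out.println("PlainHelper eligible: " + eligible(PlainHelper.class));
    }
}
```

Classes that pass neither check, such as plain helper classes inside the jar, are loaded on demand by the class loader but never become beans.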

Sync Data

After the plugin registration is successful, the plugin is only instantiated, but it will not take effect yet because it has not been added to Shenyu's plugin chain. The synchronization logic is implemented by the loaderPlugins() method.

    private void loaderPlugins(final List<ShenyuLoaderResult> results) {
if (CollectionUtils.isEmpty(results)) {
return;
}
// Get all objects that implement the interface ShenyuPlugin
List<ShenyuPlugin> shenyuExtendPlugins = results.stream().map(ShenyuLoaderResult::getShenyuPlugin).filter(Objects::nonNull).collect(Collectors.toList());
// Synchronize updating plugins in webHandler
webHandler.putExtPlugins(shenyuExtendPlugins);
// Get all objects that implement the interface PluginDataHandler
List<PluginDataHandler> handlers = results.stream().map(ShenyuLoaderResult::getPluginDataHandler).filter(Objects::nonNull).collect(Collectors.toList());
// Synchronize updating handlers in subscriber
subscriber.putExtendPluginDataHandler(handlers);

}

This method synchronizes two kinds of data:

  1. Synchronize the objects that implement the ShenyuPlugin interface to the plugins list of webHandler.
    public void putExtPlugins(final List<ShenyuPlugin> extPlugins) {
if (CollectionUtils.isEmpty(extPlugins)) {
return;
}
// Filter out newly added plugins
final List<ShenyuPlugin> shenyuAddPlugins = extPlugins.stream()
.filter(e -> plugins.stream().noneMatch(plugin -> plugin.named().equals(e.named())))
.collect(Collectors.toList());
// Filter out updated plugins and determine if they have the same name as the old one, then it is an update
final List<ShenyuPlugin> shenyuUpdatePlugins = extPlugins.stream()
.filter(e -> plugins.stream().anyMatch(plugin -> plugin.named().equals(e.named())))
.collect(Collectors.toList());
// If there is no data, skip
if (CollectionUtils.isEmpty(shenyuAddPlugins) && CollectionUtils.isEmpty(shenyuUpdatePlugins)) {
return;
}
// Copy old data
// copy new list
List<ShenyuPlugin> newPluginList = new ArrayList<>(plugins);
// Add new plugin data
// Add extend plugin from pluginData or shenyu ext-lib
this.sourcePlugins.addAll(shenyuAddPlugins);
// Add new data
if (CollectionUtils.isNotEmpty(shenyuAddPlugins)) {
shenyuAddPlugins.forEach(plugin -> LOG.info("shenyu auto add extends plugins:{}", plugin.named()));
newPluginList.addAll(shenyuAddPlugins);
}
// Modify updated data
if (CollectionUtils.isNotEmpty(shenyuUpdatePlugins)) {
shenyuUpdatePlugins.forEach(plugin -> LOG.info("shenyu auto update extends plugins:{}", plugin.named()));
for (ShenyuPlugin updatePlugin : shenyuUpdatePlugins) {
for (int i = 0; i < newPluginList.size(); i++) {
if (newPluginList.get(i).named().equals(updatePlugin.named())) {
newPluginList.set(i, updatePlugin);
}
}
for (int i = 0; i < this.sourcePlugins.size(); i++) {
if (this.sourcePlugins.get(i).named().equals(updatePlugin.named())) {
this.sourcePlugins.set(i, updatePlugin);
}
}
}
}
// REORDER
plugins = sortPlugins(newPluginList);
}
  2. Synchronize the objects that implement the PluginDataHandler interface to the handler map of the subscriber.
    public void putExtendPluginDataHandler(final List<PluginDataHandler> handlers) {
if (CollectionUtils.isEmpty(handlers)) {
return;
}
// Traverse all data
for (PluginDataHandler handler : handlers) {
String pluginNamed = handler.pluginNamed();
// Update existing PluginDataHandler list
MapUtils.computeIfAbsent(handlerMap, pluginNamed, name -> {
LOG.info("shenyu auto add extends plugin data handler name is :{}", pluginNamed);
return handler;
});
}
}
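
The two synchronization steps treat name collisions differently: a plugin whose name already exists replaces the old one, while a handler is only added when no handler with that name is present. The sketch below reproduces both behaviors with plain collections. NamedPlugin, its order and version fields, and the assumption that the reorder step sorts by a numeric order are illustrative, and MapUtils.computeIfAbsent is assumed to behave like Map.computeIfAbsent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ExtPluginMergeSketch {

    /** Hypothetical plugin descriptor; the version field only makes replacements visible. */
    static final class NamedPlugin {
        final String name;
        final int order;
        final String version;

        NamedPlugin(final String name, final int order, final String version) {
            this.name = name;
            this.order = order;
            this.version = version;
        }
    }

    /** Adds new plugins, replaces plugins with the same name, then sorts by order. */
    static List<NamedPlugin> mergePlugins(final List<NamedPlugin> current, final List<NamedPlugin> ext) {
        Map<String, NamedPlugin> byName = new LinkedHashMap<>();
        current.forEach(plugin -> byName.put(plugin.name, plugin));
        // put() overwrites an existing entry, which models the "update" branch
        ext.forEach(plugin -> byName.put(plugin.name, plugin));
        List<NamedPlugin> merged = new ArrayList<>(byName.values());
        merged.sort(Comparator.comparingInt(plugin -> plugin.order));
        return merged;
    }

    /** Adds handlers only for names that have no handler yet. */
    static void mergeHandlers(final Map<String, String> handlerMap, final Map<String, String> ext) {
        ext.forEach((name, handler) -> handlerMap.computeIfAbsent(name, key -> handler));
    }

    public static void main(final String[] args) {
        List<NamedPlugin> merged = mergePlugins(
                Arrays.asList(new NamedPlugin("a", 1, "v1"), new NamedPlugin("b", 3, "v1")),
                Arrays.asList(new NamedPlugin("b", 3, "v2"), new NamedPlugin("c", 2, "v1")));
        merged.forEach(plugin -> System.out.println(plugin.name + " " + plugin.version));

        Map<String, String> handlers = new HashMap<>();
        handlers.put("b", "old");
        Map<String, String> extHandlers = new HashMap<>();
        extHandlers.put("b", "new");
        extHandlers.put("c", "new");
        mergeHandlers(handlers, extHandlers);
        System.out.println(handlers);
    }
}
```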

At this point, the analysis of the loading process of the extension plugin is completed.


The ShenYu gateway uses the divide plugin to handle HTTP requests. See the official documentation, Quick start with Http, to learn how to use this plugin.

This article analyzes the source code of shenyu-2.4.3. For an introduction to the plugin, see Http Proxy on the official website.

1. Register Service

1.1 Declaration of registration interface

Use the annotation @ShenyuSpringMvcClient to register the service to the gateway. The simple demo is as follows.

@RestController
@RequestMapping("/order")
@ShenyuSpringMvcClient(path = "/order") // API
public class OrderController {
@GetMapping("/findById")
@ShenyuSpringMvcClient(path = "/findById", desc = "Find by id") // method
public OrderDTO findById(@RequestParam("id") final String id) {
return build(id, "hello world findById");
}
}

The annotation is defined as follows:


@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ShenyuSpringMvcClient {

//path
String path() default "";

//rule name
String ruleName() default "";

//desc info
String desc() default "";

//is enabled
boolean enabled() default true;

//register MetaData
boolean registerMetaData() default false;
}

1.2 Scan annotation

Annotation scanning is done through SpringMvcClientBeanPostProcessor, which implements the BeanPostProcessor interface and is a post-processor provided by Spring.

The constructor does the following:

  • Read the property configuration
  • Add the annotations from which path information is read
  • Start the registry and register with shenyu-admin
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
//...
/**
* Constructor instantiation
*/
public SpringMvcClientBeanPostProcessor(final PropertiesConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
// 1. read Properties
Properties props = clientConfig.getProps();
this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH, "");
if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {
String errorMsg = "http register param must config the appName or contextPath";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
// 2. add annotation
mappingAnnotation.add(ShenyuSpringMvcClient.class);
mappingAnnotation.add(PostMapping.class);
mappingAnnotation.add(GetMapping.class);
mappingAnnotation.add(DeleteMapping.class);
mappingAnnotation.add(PutMapping.class);
mappingAnnotation.add(RequestMapping.class);
// 3. start register center
publisher.start(shenyuClientRegisterRepository);
}

@Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
// override post process

return bean;
}

  • SpringMvcClientBeanPostProcessor#postProcessAfterInitialization()

Override the post-processor logic: read annotation information, construct metadata objects and URI objects, and register them with shenyu-admin.

    @Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
// 1. If the whole service is registered, or the bean is not a Controller class, it is not handled
if (Boolean.TRUE.equals(isFull) || !hasAnnotation(bean.getClass(), Controller.class)) {
return bean;
}
// 2. Read the annotations on the class ShenyuSpringMvcClient
final ShenyuSpringMvcClient beanShenyuClient = AnnotationUtils.findAnnotation(bean.getClass(), ShenyuSpringMvcClient.class);
// 2.1 build superPath
final String superPath = buildApiSuperPath(bean.getClass());
// 2.2 whether to register the entire class method
if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {
// build the metadata object and register it with shenyu-admin
publisher.publishEvent(buildMetaDataDTO(beanShenyuClient, pathJoin(contextPath, superPath)));
return bean;
}
// 3. read all methods
final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
for (Method method : methods) {
// 3.1 read the annotations on the method ShenyuSpringMvcClient
ShenyuSpringMvcClient methodShenyuClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
// If there is no annotation on the method, use the annotation on the class
methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;
if (Objects.nonNull(methodShenyuClient)) {
// 3.2 Build path information, build metadata objects, register with shenyu-admin
publisher.publishEvent(buildMetaDataDTO(methodShenyuClient, buildApiPath(method, superPath)));
}
}

return bean;
}
    1. If the whole service is being registered, or the bean is not a Controller class, do nothing.
    2. Read the ShenyuSpringMvcClient annotation on the class. If the whole class is to be registered (its path contains *), build the metadata object here and register it with shenyu-admin.
    3. Otherwise, for each handler method, read its ShenyuSpringMvcClient annotation (falling back to the class-level annotation), build the path for that method, build the metadata object, and register it with shenyu-admin.
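
The class-level shortcut and the method-level fallback described in these steps can be sketched with plain reflection. The Client annotation and the two demo classes below are hypothetical stand-ins for ShenyuSpringMvcClient and real controllers. The sketch only records which description would be registered, and it reads the wildcard from the annotation path alone, which is a simplification.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

final class AnnotationScanSketch {

    /** Stand-in for ShenyuSpringMvcClient, reduced to two attributes. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @interface Client {
        String path() default "";

        String desc() default "";
    }

    @Client(path = "/order", desc = "Order API")
    static final class OrderApi {
        @Client(path = "/findById", desc = "Find by id")
        public String findById(final String id) {
            return id;
        }

        // No method-level annotation: the class-level one is used instead
        public String save(final String body) {
            return body;
        }
    }

    @Client(path = "/legacy/**", desc = "Whole class")
    static final class LegacyApi {
        public String anything() {
            return "ok";
        }
    }

    /** Returns registration key -> description, sorted by key for stable output. */
    static Map<String, String> scan(final Class<?> type) {
        Map<String, String> registered = new TreeMap<>();
        Client onClass = type.getAnnotation(Client.class);
        // A wildcard path on the class registers the whole class once
        if (onClass != null && onClass.path().contains("*")) {
            registered.put(onClass.path(), onClass.desc());
            return registered;
        }
        for (Method method : type.getDeclaredMethods()) {
            Client onMethod = method.getAnnotation(Client.class);
            Client effective = onMethod != null ? onMethod : onClass;
            if (effective != null) {
                registered.put(method.getName(), effective.desc());
            }
        }
        return registered;
    }

    public static void main(final String[] args) {
        System.out.println(scan(OrderApi.class));
        System.out.println(scan(LegacyApi.class));
    }
}
```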

Two methods used here to obtain the path deserve a closer look.

  • buildApiSuperPath()

Construct superPath: first take the path property from the ShenyuSpringMvcClient annotation on the class. If it is not set, take the path from the RequestMapping annotation on the class.

    private String buildApiSuperPath(@NonNull final Class<?> method) {
// First take the path property from the annotation ShenyuSpringMvcClient on the class
ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {
return shenyuSpringMvcClient.path();
}
// Take the path information from the RequestMapping annotation of the current class
RequestMapping requestMapping = AnnotationUtils.findAnnotation(method, RequestMapping.class);
if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {
return requestMapping.path()[0];
}
return "";
}
  • buildApiPath()

Build path: first read the annotation ShenyuSpringMvcClient on the method and build it if it exists; otherwise get the path information from other annotations on the method; complete path = contextPath(context information) + superPath(class information) + methodPath(method information).

    private String buildApiPath(@NonNull final Method method, @NonNull final String superPath) {
// 1. Read the annotation ShenyuSpringMvcClient on the method
ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
// 1.1 If path exists, build
if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {
//1.2 path = contextPath+superPath+methodPath
return pathJoin(contextPath, superPath, shenyuSpringMvcClient.path());
}
// 2. Get path information from other annotations on the method
final String path = getPathByMethod(method);
if (StringUtils.isNotBlank(path)) {
// 2.1 path = contextPath+superPath+methodPath
return pathJoin(contextPath, superPath, path);
}
return pathJoin(contextPath, superPath);
}
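
The precedence used when building the path can be tried out in isolation. The sketch below is not ShenYu's pathJoin: joinPath is a hypothetical helper that concatenates segments with single slashes, and buildApiPath takes the two candidate method paths as plain strings instead of reading them from annotations.

```java
final class PathJoinSketch {

    /** Joins non-empty segments with exactly one slash between them (illustrative helper). */
    static String joinPath(final String... parts) {
        StringBuilder result = new StringBuilder();
        for (String part : parts) {
            if (part == null || part.isEmpty()) {
                continue;
            }
            String trimmed = part;
            while (trimmed.startsWith("/")) {
                trimmed = trimmed.substring(1);
            }
            while (trimmed.endsWith("/")) {
                trimmed = trimmed.substring(0, trimmed.length() - 1);
            }
            if (!trimmed.isEmpty()) {
                result.append('/').append(trimmed);
            }
        }
        return result.length() == 0 ? "/" : result.toString();
    }

    /**
     * Picks the method path in the order described above:
     * the client annotation path first, then the mapping annotation path, then no method path.
     */
    static String buildApiPath(final String contextPath, final String superPath,
                               final String clientPath, final String mappingPath) {
        if (clientPath != null && !clientPath.trim().isEmpty()) {
            return joinPath(contextPath, superPath, clientPath);
        }
        if (mappingPath != null && !mappingPath.trim().isEmpty()) {
            return joinPath(contextPath, superPath, mappingPath);
        }
        return joinPath(contextPath, superPath);
    }

    public static void main(final String[] args) {
        System.out.println(buildApiPath("/http", "/order", "/findById", null));
        System.out.println(buildApiPath("/http", "/order", "", "save"));
        System.out.println(buildApiPath("/http", "/order", null, null));
    }
}
```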
  • getPathByMethod()

Get path information from the other annotations on the method. The annotations checked are:

  • ShenyuSpringMvcClient
  • PostMapping
  • GetMapping
  • DeleteMapping
  • PutMapping
  • RequestMapping

private String getPathByMethod(@NonNull final Method method) {
// Iterate through interface annotations to get path information
for (Class<? extends Annotation> mapping : mappingAnnotation) {
final String pathByAnnotation = getPathByAnnotation(AnnotationUtils.findAnnotation(method, mapping), pathAttributeNames);
if (StringUtils.isNotBlank(pathByAnnotation)) {
return pathByAnnotation;
}
}
return null;
}

After annotation scanning is finished, build the metadata object and send it to shenyu-admin to complete the registration.

  • Metadata

Includes the rule information of the currently registered method: contextPath, appName, registration path, description information, registration type, whether it is enabled, rule name and whether to register metadata.

 private MetaDataRegisterDTO buildMetaDataDTO(@NonNull final ShenyuSpringMvcClient shenyuSpringMvcClient, final String path) {
return MetaDataRegisterDTO.builder()
.contextPath(contextPath) // contextPath
.appName(appName) // appName
.path(path) // Registered path, used when gateway rules match
.pathDesc(shenyuSpringMvcClient.desc()) // desc info
.rpcType(RpcTypeEnum.HTTP.getName()) // divide plugin, http type when default
.enabled(shenyuSpringMvcClient.enabled()) // is enabled?
.ruleName(StringUtils.defaultIfBlank(shenyuSpringMvcClient.ruleName(), path))//rule name
.registerMetaData(shenyuSpringMvcClient.registerMetaData()) // whether to register metadata information
.build();
}

The specific registration logic is implemented by the registration center, which has been analyzed in the previous articles and will not be analyzed in depth here.

1.3 Register URI Data

ContextRegisterListener is responsible for registering the client's URI information with shenyu-admin. It implements the ApplicationListener interface: when the context refresh event ContextRefreshedEvent occurs, the onApplicationEvent() method is executed to perform the registration.


public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent>, BeanFactoryAware {
//......

/**
* Constructor instantiation
*/
public ContextRegisterListener(final PropertiesConfig clientConfig) {
// read Properties
final Properties props = clientConfig.getProps();
this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
if (Boolean.TRUE.equals(isFull)) {
if (StringUtils.isBlank(contextPath)) {
final String errorMsg = "http register param must config the contextPath";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
}
this.port = Integer.parseInt(Optional.ofNullable(props.getProperty(ShenyuClientConstants.PORT)).orElseGet(() -> "-1"));
this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);
this.host = props.getProperty(ShenyuClientConstants.HOST);
}

@Override
public void setBeanFactory(final BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}

// Execute application events
@Override
public void onApplicationEvent(@NonNull final ContextRefreshedEvent contextRefreshedEvent) {
// The method is guaranteed to be executed once
if (!registered.compareAndSet(false, true)) {
return;
}
// 1. If you are registering for the entire service
if (Boolean.TRUE.equals(isFull)) {
// Build metadata and register
publisher.publishEvent(buildMetaDataDTO());
}
try {
// get port
final int mergedPort = port <= 0 ? PortUtils.findPort(beanFactory) : port;
// 2. Constructing URI data and registering
publisher.publishEvent(buildURIRegisterDTO(mergedPort));
} catch (ShenyuException e) {
throw new ShenyuException(e.getMessage() + "please config ${shenyu.client.http.props.port} in xml/yml !");
}
}

// build URI data
private URIRegisterDTO buildURIRegisterDTO(final int port) {
return URIRegisterDTO.builder()
.contextPath(this.contextPath) // contextPath
.appName(appName) // appName
.protocol(protocol) // protocol
.host(IpUtils.isCompleteHost(this.host) ? this.host : IpUtils.getHost(this.host)) //host
.port(port) // port
.rpcType(RpcTypeEnum.HTTP.getName()) // divide plugin, default registration http type
.build();
}

// build MetaData
private MetaDataRegisterDTO buildMetaDataDTO() {
return MetaDataRegisterDTO.builder()
.contextPath(contextPath)
.appName(appName)
.path(contextPath)
.rpcType(RpcTypeEnum.HTTP.getName())
.enabled(true)
.ruleName(contextPath)
.build();
}
}
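
Two details of onApplicationEvent() are easy to miss: the AtomicBoolean guard that makes registration run only once, and the fallback to a detected port when no positive port is configured. A minimal sketch of both, with a hypothetical class that counts publications instead of sending them:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class RegisterOnceSketch {

    private final AtomicBoolean registered = new AtomicBoolean(false);
    private final AtomicInteger publishCount = new AtomicInteger();
    private final int configuredPort;
    private volatile int lastPort = -1;

    RegisterOnceSketch(final int configuredPort) {
        this.configuredPort = configuredPort;
    }

    /** Simulates a context refresh; detectedPort stands in for the port found at runtime. */
    void onContextRefreshed(final int detectedPort) {
        // compareAndSet succeeds only for the first caller, so later events are ignored
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        // A configured port <= 0 means "not configured": use the detected one
        lastPort = configuredPort <= 0 ? detectedPort : configuredPort;
        publishCount.incrementAndGet();
    }

    int publishCount() {
        return publishCount.get();
    }

    int lastPort() {
        return lastPort;
    }

    public static void main(final String[] args) {
        RegisterOnceSketch listener = new RegisterOnceSketch(-1);
        listener.onContextRefreshed(8080);
        listener.onContextRefreshed(9090);
        System.out.println("published " + listener.publishCount() + " time(s) on port " + listener.lastPort());
    }
}
```

The guard matters because a Spring application with child contexts can publish ContextRefreshedEvent more than once.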

1.4 Handle registration information

The metadata and URI data registered by the client through the registry are processed in shenyu-admin, which is responsible for storing to the database and synchronizing to the shenyu gateway. The client registration processing logic of Divide plugin is in ShenyuClientRegisterDivideServiceImpl. The inheritance relationship is as follows.

  • ShenyuClientRegisterService: client registration service, top-level interface.
  • FallbackShenyuClientRegisterService: registration failure, provides retry operation.
  • AbstractShenyuClientRegisterServiceImpl: abstract class, implements part of the public registration logic;
  • AbstractContextPathRegisterService: abstract class, responsible for registering ContextPath.
  • ShenyuClientRegisterDivideServiceImpl: implementation of the Divide plug-in registration.
1.4.1 Register Service
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()

The metadata MetaDataRegisterDTO object registered by the client through the registry is received and processed in the register() method of shenyu-admin.

   @Override
public String register(final MetaDataRegisterDTO dto) {
//1. register selector
String selectorHandler = selectorHandler(dto);
String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
//2. register rule
String ruleHandler = ruleHandler();
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
ruleService.registerDefault(ruleDTO);
//3. register metadata
registerMetadata(dto);
//4. register ContextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
1.4.1.1 Register Selector
  • org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()

Build the contextPath and look up the selector by name. If it exists, return its id; otherwise, create the default selector.

    @Override
public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {
// build contextPath
String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());
// Find if selector information exists by name
SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);
if (Objects.isNull(selectorDO)) {
// Create a default selector message if it does not exist
return registerSelector(contextPath, pluginName, selectorHandler);
}
return selectorDO.getId();
}
  • Default Selector Information

Construct the default selector information and its conditional properties here.

   //register selector
private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {
// build selector
SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());
selectorDTO.setHandle(selectorHandler);
//register default Selector
return registerDefault(selectorDTO);
}
//build
private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {
//build default
SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);
selectorDTO.setPluginId(pluginId);
//build the conditional properties of the default selector
selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));
return selectorDTO;
}
  • Build Default Selector
private SelectorDTO buildDefaultSelectorDTO(final String name) {
return SelectorDTO.builder()
.name(name) // name
.type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // default CUSTOM_FLOW
.matchMode(MatchModeEnum.AND.getCode()) //default AND
.enabled(Boolean.TRUE) //default TRUE
.loged(Boolean.TRUE) //default TRUE
.continued(Boolean.TRUE) //default TRUE
.sort(1) //default 1
.build();
}


  • Build default selector conditional properties
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {
SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();
selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // default URI
selectorConditionDTO.setParamName("/");
selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // default match
selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // default /contextPath/**
return Collections.singletonList(selectorConditionDTO);
}
  • Register default selector
@Override
public String registerDefault(final SelectorDTO selectorDTO) {
//selector info
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
//selector condition info
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
if (StringUtils.isEmpty(selectorDTO.getId())) {
// insert selector information into the database
selectorMapper.insertSelective(selectorDO);
// insert selector condition information into the database
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
}
// Publish synchronization events to synchronize selector information and its conditional attributes to the gateway
publishEvent(selectorDO, selectorConditionDTOs);
return selectorDO.getId();
}
1.4.1.2 Register Rule

In the second step of registering the service, start building the default rules and then register the rules.

@Override
public String register(final MetaDataRegisterDTO dto) {
//1. register selector
//......

//2. register rule
// default rule handle
String ruleHandler = ruleHandler();
// build default rule
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
// register rule
ruleService.registerDefault(ruleDTO);

//3. register Metadata
//......

//4. register ContextPath
//......

return ShenyuResultMessage.SUCCESS;
}
  • default rule handle
    @Override
protected String ruleHandler() {
// default rule handle
return new DivideRuleHandle().toJson();
}

The default rule handle of the Divide plugin:


public class DivideRuleHandle implements RuleHandle {

/**
* load balance: default RANDOM
*/
private String loadBalance = LoadBalanceEnum.RANDOM.getName();

/**
* retry strategy: default CURRENT
*/
private String retryStrategy = RetryEnum.CURRENT.getName();

/**
* retry: default 3
*/
private int retry = 3;

/**
* timeout: default 3000
*/
private long timeout = Constants.TIME_OUT;

/**
* headerMaxSize: default 10240 byte
*/
private long headerMaxSize = Constants.HEADER_MAX_SIZE;

/**
* requestMaxSize: default 102400 byte
*/
private long requestMaxSize = Constants.REQUEST_MAX_SIZE;
}
  • build default rule info
  // build default rule info
private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {
return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());
}
// build default rule info
private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {
RuleDTO ruleDTO = RuleDTO.builder()
.selectorId(selectorId) //selector Id
.name(ruleName) //rule Name
.matchMode(MatchModeEnum.AND.getCode()) // default and
.enabled(Boolean.TRUE) // default TRUE
.loged(Boolean.TRUE) //default TRUE
.sort(1) //default 1
.handle(ruleHandler)
.build();
RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()
.paramType(ParamTypeEnum.URI.getName()) // default URI
.paramName("/")
.paramValue(path) // path
.build();
if (path.indexOf("*") > 1) {
ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // if the path contains *, default match
} else {
ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); // default =
}
ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));
return ruleDTO;
}
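The operator choice in buildRuleDTO() can be tried in isolation. The following is a minimal sketch, not ShenYu code: the class name is hypothetical, and the alias string "match" is an assumption (the source comment only confirms "=" for EQ).

```java
final class RuleOperatorSketch {

    // Assumed alias strings; only "=" is confirmed by the comment in the source.
    static final String MATCH = "match";
    static final String EQ = "=";

    // Mirrors the condition in buildRuleDTO(): a '*' at index 2 or later selects MATCH.
    static String chooseOperator(final String path) {
        return path.indexOf("*") > 1 ? MATCH : EQ;
    }

    public static void main(final String[] args) {
        System.out.println(chooseOperator("/http/order/**"));
        System.out.println(chooseOperator("/http/order/findById"));
    }
}
```

Because the check is `> 1` rather than `>= 0`, a path such as "/*" (where '*' sits at index 1) still gets the "=" operator in this sketch.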
  • org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()

Register the rule: insert records into the database and publish an event to the gateway for data synchronization.


@Override
public String registerDefault(final RuleDTO ruleDTO) {
RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());
if (Objects.nonNull(exist)) {
return "";
}

RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);
List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();
if (StringUtils.isEmpty(ruleDTO.getId())) {
// insert rule into database
ruleMapper.insertSelective(ruleDO);
//insert rule condition into database
ruleConditions.forEach(ruleConditionDTO -> {
ruleConditionDTO.setRuleId(ruleDO.getId());
ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));
});
}
// Publish events to the gateway for data synchronization
publishEvent(ruleDO, ruleConditions);
return ruleDO.getId();
}

1.4.1.3 Register Metadata
   @Override
public String register(final MetaDataRegisterDTO dto) {
//1. register selector
//......

//2. register rule
//......

//3. register metadata
registerMetadata(dto);

//4. register ContextPath
//......

return ShenyuResultMessage.SUCCESS;
}
  • org.apache.shenyu.admin.service.register.ShenyuClientRegisterDivideServiceImpl#registerMetadata()

Insert or update metadata and then publish sync events to the gateway.


@Override
protected void registerMetadata(final MetaDataRegisterDTO dto) {
if (dto.isRegisterMetaData()) {
MetaDataService metaDataService = getMetaDataService();
MetaDataDO exist = metaDataService.findByPath(dto.getPath());
// save or update MetaData
metaDataService.saveOrUpdateMetaData(exist, dto);
}
}

@Override
public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {
DataEventTypeEnum eventType;
// DTO->DO
MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);
// insert
if (Objects.isNull(exist)) {
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());
metaDataDO.setDateCreated(currentTime);
metaDataDO.setDateUpdated(currentTime);
metaDataMapper.insert(metaDataDO);
eventType = DataEventTypeEnum.CREATE;
} else {
// update
metaDataDO.setId(exist.getId());
metaDataMapper.update(metaDataDO);
eventType = DataEventTypeEnum.UPDATE;
}
// publish event to gateway
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,
Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));
}
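The insert-or-update decision above boils down to "is there an existing record for this path". A minimal sketch of that pattern, assuming an in-memory map in place of the database (the class, the map, and the enum are hypothetical stand-ins, not ShenYu types):

```java
import java.util.HashMap;
import java.util.Map;

final class MetaDataUpsertSketch {

    // Stand-in for DataEventTypeEnum, reduced to the two values used here.
    enum EventType { CREATE, UPDATE }

    // In-memory stand-in for the metadata table, keyed by path.
    private final Map<String, String> store = new HashMap<>();

    // Saves the record and reports which event would be published to the gateway.
    EventType saveOrUpdate(final String path, final String methodName) {
        // put() returns the previous value, or null when the path was absent.
        String previous = store.put(path, methodName);
        return previous == null ? EventType.CREATE : EventType.UPDATE;
    }

    public static void main(final String[] args) {
        MetaDataUpsertSketch sketch = new MetaDataUpsertSketch();
        System.out.println(sketch.saveOrUpdate("/http/order/findById", "findById"));
        System.out.println(sketch.saveOrUpdate("/http/order/findById", "findById"));
    }
}
```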
1.4.1.4 Register ContextPath
   @Override
public String register(final MetaDataRegisterDTO dto) {
//1. register selector
//......

//2. register rule
//......

//3. register metadata
//......

//4. register ContextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
  • org.apache.shenyu.admin.service.register.AbstractContextPathRegisterService#registerContextPath()
    @Override
public void registerContextPath(final MetaDataRegisterDTO dto) {
// set contextPath for selector
String contextPathSelectorId = getSelectorService().registerDefault(dto, PluginEnum.CONTEXT_PATH.getName(), "");
ContextMappingRuleHandle handle = new ContextMappingRuleHandle();
handle.setContextPath(PathUtils.decoratorContextPath(dto.getContextPath()));
// set contextPath for rule
getRuleService().registerDefault(buildContextPathDefaultRuleDTO(contextPathSelectorId, dto, handle.toJson()));
}
1.4.2 Register URI
  • org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()

The server side receives the URI information registered by the client and processes it.

    @Override
public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {
String result;
String key = key(selectorName);
try {
this.removeFallBack(key);
// register URI
result = this.doRegisterURI(selectorName, uriList);
logger.info("Register success: {},{}", selectorName, uriList);
} catch (Exception ex) {
logger.warn("Register exception: cause:{}", ex.getMessage());
result = "";
// Retry after registration failure
this.addFallback(key, new FallbackHolder(selectorName, uriList));
}
return result;
}
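The fallback idea in registerURI() is: clear any pending retry, try to register, and remember the request if it fails. The sketch below illustrates only that idea. How the real class schedules its retries is not shown in this excerpt, so the recover() method, the map, and the delegate function are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

final class FallbackRegisterSketch {

    // Registrations that failed and are waiting for a retry, keyed by selector name.
    private final Map<String, List<String>> fallbacks = new ConcurrentHashMap<>();

    // Hypothetical stand-in for doRegisterURI(); it may throw while the selector is missing.
    private final BiFunction<String, List<String>, String> delegate;

    FallbackRegisterSketch(final BiFunction<String, List<String>, String> delegate) {
        this.delegate = delegate;
    }

    String registerUri(final String selectorName, final List<String> uriList) {
        try {
            fallbacks.remove(selectorName);
            return delegate.apply(selectorName, uriList);
        } catch (RuntimeException ex) {
            // Keep the request so that it can be retried later.
            fallbacks.put(selectorName, uriList);
            return "";
        }
    }

    // Retries every pending registration once; failures are stored again by registerUri().
    void recover() {
        new HashMap<>(fallbacks).forEach(this::registerUri);
    }

    int pendingCount() {
        return fallbacks.size();
    }
}
```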
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()

Get a valid URI from the URI registered by the client, update the corresponding selector handle property, and send a selector update event to the gateway.

@Override
public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {
//check
if (CollectionUtils.isEmpty(uriList)) {
return "";
}
//get selector
SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
if (Objects.isNull(selectorDO)) {
throw new ShenyuException("doRegister Failed to execute,wait to retry.");
}
// get valid URIs
List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());
// build handle
String handler = buildHandle(validUriList, selectorDO);
if (handler != null) {
selectorDO.setHandle(handler);
SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
selectorData.setHandle(handler);
// Update the handle property of the selector to the database
selectorService.updateSelective(selectorDO);
// Send selector update events to the gateway
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
}
return ShenyuResultMessage.SUCCESS;
}
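The "valid URI" filter in doRegisterURI() keeps only entries with a non-null port and a non-blank host. A self-contained sketch of the same filter, where the Uri class is a hypothetical stand-in for URIRegisterDTO and the blank check is written by hand instead of using StringUtils:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

final class ValidUriFilterSketch {

    // Minimal stand-in for URIRegisterDTO with only the fields the filter reads.
    static final class Uri {
        final String host;
        final Integer port;

        Uri(final String host, final Integer port) {
            this.host = host;
            this.port = port;
        }
    }

    // Keeps entries that have a port and a host containing at least one non-space character.
    static List<Uri> validUris(final List<Uri> uriList) {
        return uriList.stream()
                .filter(uri -> Objects.nonNull(uri.port) && uri.host != null && !uri.host.trim().isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        List<Uri> all = Arrays.asList(new Uri("localhost", 8189), new Uri("localhost", null), new Uri("  ", 8189));
        System.out.println(validUris(all).size());
    }
}
```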

This completes the source code analysis of service registration. The analysis flow chart is as follows.

The next step is to analyze how the divide plugin initiates a call to the http service based on this information.

2. Call Http Service

The divide plugin is the core processing plugin used by the gateway to handle http protocol requests.

Take the Quick start with Http case provided on the official website as an example. A direct connection request is as follows.

GET http://localhost:8189/order/findById?id=100
Accept: application/json

After proxying through the ShenYu gateway, the request is as follows.

GET http://localhost:9195/http/order/findById?id=100
Accept: application/json
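The relationship between the two requests can be sketched as a small URL rewrite. This is an illustration only: it assumes the gateway strips the registered context path (/http here) and forwards the remainder to the selected upstream, and the class and method names are hypothetical.

```java
final class ProxyUrlSketch {

    // Builds the upstream URL from the gateway path by dropping the context path prefix.
    static String toUpstreamUrl(final String upstreamDomain, final String contextPath,
                                final String gatewayPath, final String query) {
        String realPath = gatewayPath.startsWith(contextPath)
                ? gatewayPath.substring(contextPath.length())
                : gatewayPath;
        String url = upstreamDomain + realPath;
        return (query == null || query.isEmpty()) ? url : url + "?" + query;
    }

    public static void main(final String[] args) {
        System.out.println(toUpstreamUrl("http://localhost:8189", "/http", "/http/order/findById", "id=100"));
    }
}
```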

Services proxied by the ShenYu gateway can still be requested as before; this is where the divide plugin comes into play. The class inheritance relationship is as follows.

  • ShenyuPlugin: top-level interface, defining interface methods.
  • AbstractShenyuPlugin: abstract class that implements the common logic of the plugin.
  • DividePlugin: Divide plugin.

2.1 Accept Request

After passing the ShenYu gateway proxy, the request entry is ShenyuWebHandler, which implements the org.springframework.web.server.WebHandler interface.

public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {
//......

/**
* handle web request
*/
@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
// execute plugin chain
Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);
if (scheduled) {
return execute.subscribeOn(scheduler);
}
return execute;
}

private static class DefaultShenyuPluginChain implements ShenyuPluginChain {

private int index;

private final List<ShenyuPlugin> plugins;

/**
* Instantiating the default plugin chain
*/
DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {
this.plugins = plugins;
}

/**
* Execute each plugin
*/
@Override
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
// get current plugin
ShenyuPlugin plugin = plugins.get(this.index++);
// is skip ?
boolean skip = plugin.skip(exchange);
if (skip) {
// If skipped, execute the next
return this.execute(exchange);
}
// execute current plugin
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
}
}
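The index-based chain above is a chain-of-responsibility. The following synchronous sketch shows the same control flow without Reactor: each plugin either is skipped or runs and then hands control back to the chain. The Plugin and Chain types and the plugin names are simplified, hypothetical stand-ins for ShenyuPlugin and DefaultShenyuPluginChain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PluginChainSketch {

    interface Plugin {
        boolean skip(String request);

        void execute(String request, Chain chain);
    }

    static final class Chain {
        private int index;
        private final List<Plugin> plugins;

        Chain(final List<Plugin> plugins) {
            this.plugins = plugins;
        }

        void execute(final String request) {
            if (index < plugins.size()) {
                Plugin plugin = plugins.get(index++);
                if (plugin.skip(request)) {
                    // Skipped plugins are passed over and the next one is tried.
                    execute(request);
                    return;
                }
                plugin.execute(request, this);
            }
        }
    }

    // Builds a plugin that records its name and then continues the chain.
    static Plugin recording(final String name, final boolean skipped, final List<String> trace) {
        return new Plugin() {
            @Override
            public boolean skip(final String request) {
                return skipped;
            }

            @Override
            public void execute(final String request, final Chain chain) {
                trace.add(name);
                chain.execute(request);
            }
        };
    }

    static List<String> run(final String request) {
        List<String> trace = new ArrayList<>();
        List<Plugin> plugins = Arrays.asList(
                recording("first", false, trace),
                recording("skipped", true, trace),
                recording("last", false, trace));
        new Chain(plugins).execute(request);
        return trace;
    }

    public static void main(final String[] args) {
        System.out.println(run("/order/findById")); // prints [first, last]
    }
}
```

Note that a new chain object is created per request, as in the original handle() method, because the chain carries the mutable index.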

2.2 Matching rule

  • org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()

Execute the matching logic for selectors and rules in the execute() method.

  • Matching selectors.
  • Matching rules.
  • Execute the plugin.
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
String pluginName = named();
PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
if (pluginData != null && pluginData.getEnabled()) {
// selector
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
if (CollectionUtils.isEmpty(selectors)) {
return handleSelectorIfNull(pluginName, exchange, chain);
}
// match selector
SelectorData selectorData = matchSelector(exchange, selectors);
if (Objects.isNull(selectorData)) {
return handleSelectorIfNull(pluginName, exchange, chain);
}
selectorLog(selectorData, pluginName);
// rule
List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
if (CollectionUtils.isEmpty(rules)) {
return handleRuleIfNull(pluginName, exchange, chain);
}
// match rule
RuleData rule;
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
//get last
rule = rules.get(rules.size() - 1);
} else {
rule = matchRule(exchange, rules);
}
if (Objects.isNull(rule)) {
return handleRuleIfNull(pluginName, exchange, chain);
}
ruleLog(rule, pluginName);
// execute
return doExecute(exchange, chain, selectorData, rule);
}
return chain.execute(exchange);
}
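The rule-selection branch in execute() can be sketched on its own: a full-flow selector takes the last rule, otherwise the rules are matched against the request. In this sketch the "first rule that matches" behaviour of matchRule() is an assumption, since its body is not shown here, and the generic helper is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class RuleSelectionSketch {

    // Returns the chosen rule, or null when nothing applies (the caller then handles the "rule is null" case).
    static <R> R selectRule(final boolean fullFlow, final List<R> rules, final Predicate<R> matches) {
        if (rules.isEmpty()) {
            return null;
        }
        if (fullFlow) {
            // Full-flow selectors do not evaluate conditions and take the last rule.
            return rules.get(rules.size() - 1);
        }
        return rules.stream().filter(matches).findFirst().orElse(null);
    }

    public static void main(final String[] args) {
        List<String> rules = Arrays.asList("/http/order/save", "/http/order/findById");
        System.out.println(selectRule(false, rules, rule -> rule.endsWith("findById")));
    }
}
```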

2.3 Execute Divide Plugin

  • org.apache.shenyu.plugin.divide.DividePlugin#doExecute()

Execute the specific logic of the divide plugin in the doExecute() method.

  • Check the header size.
  • Check the request size.
  • Obtain the list of services.
  • Perform load balancing.
  • Set the request url, timeout, and retry policy.
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
// shenyu Context
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
// Get the handle property of the rule
DivideRuleHandle ruleHandle = DividePluginDataHandler.CACHED_HANDLE.get().obtainHandle(CacheKeyUtils.INST.getKey(rule));
long headerSize = 0;
// check header size
for (List<String> multiHeader : exchange.getRequest().getHeaders().values()) {
for (String value : multiHeader) {
headerSize += value.getBytes(StandardCharsets.UTF_8).length;
}
}
if (headerSize > ruleHandle.getHeaderMaxSize()) {
LOG.error("request header is too large");
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_HEADER_TOO_LARGE, null);
return WebFluxResultUtils.result(exchange, error);
}

// check request size
if (exchange.getRequest().getHeaders().getContentLength() > ruleHandle.getRequestMaxSize()) {
LOG.error("request entity is too large");
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_ENTITY_TOO_LARGE, null);
return WebFluxResultUtils.result(exchange, error);
}
// upstream list
List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
if (CollectionUtils.isEmpty(upstreamList)) {
LOG.error("divide upstream configuration error: {}", rule);
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null);
return WebFluxResultUtils.result(exchange, error);
}
// request ip
String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
// load balance
Upstream upstream = LoadBalancerFactory.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
if (Objects.isNull(upstream)) {
LOG.error("divide has no upstream");
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null);
return WebFluxResultUtils.result(exchange, error);
}
// set url
String domain = upstream.buildDomain();
exchange.getAttributes().put(Constants.HTTP_DOMAIN, domain);
// set timeout
exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
// set retry
exchange.getAttributes().put(Constants.RETRY_STRATEGY, ruleHandle.getRetryStrategy());
exchange.getAttributes().put(Constants.LOAD_BALANCE, ruleHandle.getLoadBalance());
exchange.getAttributes().put(Constants.DIVIDE_SELECTOR_ID, selector.getId());
return chain.execute(exchange);
}
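The header size check in doExecute() sums the UTF-8 byte length of every header value; header names are not counted. A minimal sketch of that calculation, using a plain map in place of Spring's HttpHeaders (the class and method names are hypothetical):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HeaderSizeSketch {

    // Sums the UTF-8 byte length of all header values.
    static long headerSize(final Map<String, List<String>> headers) {
        long size = 0;
        for (List<String> values : headers.values()) {
            for (String value : values) {
                size += value.getBytes(StandardCharsets.UTF_8).length;
            }
        }
        return size;
    }

    // True when the request would be rejected for exceeding the configured limit.
    static boolean tooLarge(final Map<String, List<String>> headers, final long headerMaxSize) {
        return headerSize(headers) > headerMaxSize;
    }

    public static void main(final String[] args) {
        Map<String, List<String>> headers = new HashMap<>();
        headers.put("Accept", Arrays.asList("application/json"));
        System.out.println(headerSize(headers)); // prints 16
    }
}
```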

2.4 Do Request

By default, the WebClientPlugin initiates a call request to the http service with the following class inheritance relationship.

  • ShenyuPlugin: top-level plug-in, defining plug-in methods.
  • AbstractHttpClientPlugin: abstract class that implements the public logic of request invocation.
  • WebClientPlugin: initiating requests through WebClient.
  • NettyHttpClientPlugin: initiating requests through Netty.

Initiate the request call.

  • org.apache.shenyu.plugin.httpclient.AbstractHttpClientPlugin#execute()

Initiate the request call in the execute() method.

  • Get the specified timeout, number of retries
  • Initiate the request
  • Retry after failure according to the specified retry policy

public abstract class AbstractHttpClientPlugin<R> implements ShenyuPlugin {

protected static final Logger LOG = LoggerFactory.getLogger(AbstractHttpClientPlugin.class);

@Override
public final Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
// shenyu Context
final ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
// uri
final URI uri = exchange.getAttribute(Constants.HTTP_URI);
if (Objects.isNull(uri)) {
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_URL, null);
return WebFluxResultUtils.result(exchange, error);
}
// get time out
final long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
final Duration duration = Duration.ofMillis(timeout);
// get retry times
final int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
// get retry strategy
final String retryStrategy = (String) Optional.ofNullable(exchange.getAttribute(Constants.RETRY_STRATEGY)).orElseGet(RetryEnum.CURRENT::getName);
LOG.info("The request urlPath is {}, retryTimes is {}, retryStrategy is {}", uri.toASCIIString(), retryTimes, retryStrategy);
// build header
final HttpHeaders httpHeaders = buildHttpHeaders(exchange);
// do request
final Mono<R> response = doRequest(exchange, exchange.getRequest().getMethodValue(), uri, httpHeaders, exchange.getRequest().getBody())
.timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))
.doOnError(e -> LOG.error(e.getMessage(), e));

// Retry Policy CURRENT, retries the current service.
if (RetryEnum.CURRENT.getName().equals(retryStrategy)) {
//old version of DividePlugin and SpringCloudPlugin will run on this
return response.retryWhen(Retry.anyOf(TimeoutException.class, ConnectTimeoutException.class, ReadTimeoutException.class, IllegalStateException.class)
.retryMax(retryTimes)
.backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
.onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))
.flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));
}

// Retry for other services
// Exclude services that have already been called
final Set<URI> exclude = Sets.newHashSet(uri);
// resend
return resend(response, exchange, duration, httpHeaders, exclude, retryTimes)
.onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))
.flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));
}

private Mono<R> resend(final Mono<R> clientResponse,
final ServerWebExchange exchange,
final Duration duration,
final HttpHeaders httpHeaders,
final Set<URI> exclude,
final int retryTimes) {
Mono<R> result = clientResponse;
// Retry according to the specified number of retries
for (int i = 0; i < retryTimes; i++) {
result = resend(result, exchange, duration, httpHeaders, exclude);
}
return result;
}

private Mono<R> resend(final Mono<R> response,
final ServerWebExchange exchange,
final Duration duration,
final HttpHeaders httpHeaders,
final Set<URI> exclude) {
return response.onErrorResume(th -> {
final String selectorId = exchange.getAttribute(Constants.DIVIDE_SELECTOR_ID);
final String loadBalance = exchange.getAttribute(Constants.LOAD_BALANCE);
//Check available services
final List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId)
.stream().filter(data -> {
final String trimUri = data.getUrl().trim();
for (URI needToExclude : exclude) {
// exclude already called
if ((needToExclude.getHost() + ":" + needToExclude.getPort()).equals(trimUri)) {
return false;
}
}
return true;
}).collect(Collectors.toList());
if (CollectionUtils.isEmpty(upstreamList)) {
// no need to retry anymore
return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));
}
// request ip
final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
// Load Balance
final Upstream upstream = LoadBalancerFactory.selector(upstreamList, loadBalance, ip);
if (Objects.isNull(upstream)) {
// no need to retry anymore
return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));
}
final URI newUri = RequestUrlUtils.buildRequestUri(exchange, upstream.buildDomain());
// Exclude uri that has already been called
exclude.add(newUri);
// Make another call
return doRequest(exchange, exchange.getRequest().getMethodValue(), newUri, httpHeaders, exchange.getRequest().getBody())
.timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))
.doOnError(e -> LOG.error(e.getMessage(), e));
});
}

//......
}
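The failover branch (resend) can be summarised as: remember every upstream that has been called, and on failure pick another one that is not in that set, up to the configured number of retries. The synchronous sketch below illustrates this under simplifying assumptions: upstreams are plain strings, the load balancer is replaced by "first upstream not yet called", and the class is hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

final class FailoverSketch {

    // Picks the first upstream that has not been called yet (stand-in for the load balancer).
    private static String pickNext(final List<String> upstreams, final Set<String> exclude) {
        for (String candidate : upstreams) {
            if (!exclude.contains(candidate)) {
                return candidate;
            }
        }
        throw new IllegalStateException("cannot find healthy upstream after failover");
    }

    // Calls 'first'; on failure, tries up to retryTimes other upstreams, never repeating one.
    static String callWithFailover(final String first, final List<String> upstreams,
                                   final int retryTimes, final Function<String, String> doRequest) {
        Set<String> exclude = new HashSet<>();
        String target = first;
        for (int attempt = 0;; attempt++) {
            exclude.add(target);
            try {
                return doRequest.apply(target);
            } catch (RuntimeException ex) {
                if (attempt >= retryTimes) {
                    throw ex;
                }
            }
            target = pickNext(upstreams, exclude);
        }
    }
}
```

In contrast, the CURRENT strategy shown earlier retries the same upstream with an exponential backoff instead of switching to another one.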

  • org.apache.shenyu.plugin.httpclient.WebClientPlugin#doRequest()

Initiate a real request call via webClient in the doRequest() method.


@Override
protected Mono<ClientResponse> doRequest(final ServerWebExchange exchange, final String httpMethod, final URI uri,
final HttpHeaders httpHeaders, final Flux<DataBuffer> body) {
return webClient.method(HttpMethod.valueOf(httpMethod)).uri(uri) // uri
.headers(headers -> headers.addAll(httpHeaders)) // header
.body(BodyInserters.fromDataBuffers(body))
.exchange() // request
.doOnSuccess(res -> {
if (res.statusCode().is2xxSuccessful()) { // success
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
} else { // error
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
}
exchange.getResponse().setStatusCode(res.statusCode());
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);
});
}

2.5 Response Result

  • org.apache.shenyu.plugin.response.ResponsePlugin#execute()

The response results are handled by the ResponsePlugin plugin.

    @Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
// Processing results according to rpc type
return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);
}
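The writerMap lookup is a strategy map keyed by rpc type. A minimal sketch of that dispatch follows; the keys "http" and "dubbo" and the string-based writers are hypothetical placeholders for the real MessageWriter implementations.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class WriterDispatchSketch {

    // One writer per rpc type; the real map holds MessageWriter instances.
    private final Map<String, Function<String, String>> writerMap = new HashMap<>();

    WriterDispatchSketch() {
        writerMap.put("http", body -> "web-client:" + body);
        writerMap.put("dubbo", body -> "rpc:" + body);
    }

    // Selects the writer by rpc type and lets it produce the response.
    String write(final String rpcType, final String body) {
        Function<String, String> writer = writerMap.get(rpcType);
        if (writer == null) {
            throw new IllegalArgumentException("no writer for rpc type " + rpcType);
        }
        return writer.apply(body);
    }

    public static void main(final String[] args) {
        System.out.println(new WriterDispatchSketch().write("http", "{}"));
    }
}
```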

The processing type is determined by MessageWriter and the class inheritance relationship is as follows.

  • MessageWriter: interface, defining message processing methods.
  • NettyClientMessageWriter: processing of Netty call results.
  • RPCMessageWriter: processing the results of RPC calls.
  • WebClientMessageWriter: processing WebClient call results.

The default is to initiate http requests via WebClient.

  • org.apache.shenyu.plugin.response.strategy.WebClientMessageWriter#writeWith()

Process the response results in the writeWith() method.


@Override
public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
return chain.execute(exchange).then(Mono.defer(() -> {
// get response
ServerHttpResponse response = exchange.getResponse();
ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
if (Objects.isNull(clientResponse)) {
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);
return WebFluxResultUtils.result(exchange, error);
}
//cookies and headers
response.getCookies().putAll(clientResponse.cookies());
response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
// image, pdf or stream does not do format processing.
// Handling special response types
if (clientResponse.headers().contentType().isPresent()) {
final String media = clientResponse.headers().contentType().get().toString().toLowerCase();
if (media.matches(COMMON_BIN_MEDIA_TYPE_REGEX)) {
return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()))
.doOnCancel(() -> clean(exchange));
}
}
// Handling general response types
clientResponse = ResponseUtils.buildClientResponse(response, clientResponse.body(BodyExtractors.toDataBuffers()));
return clientResponse.bodyToMono(byte[].class)
.flatMap(originData -> WebFluxResultUtils.result(exchange, originData))
.doOnCancel(() -> clean(exchange));
}));
}

At this point, the source code analysis of the divide plugin is complete. The analysis flow chart is as follows.

3. Summary

The source code analysis in this article runs from http service registration to the service call made by the divide plugin. The divide plugin is mainly used to handle http requests. Some parts of the source code, such as the implementation of load balancing and service liveness probing, are not analyzed in depth here and will be covered in later articles.

· 22 min read

Apache ShenYu is an asynchronous, high-performance, cross-language, responsive API gateway.

The Apache ShenYu gateway uses the dubbo plugin to make calls to the dubbo service. You can see the official documentation Dubbo Quick Start to learn how to use the plugin.

This article analyzes the source code of shenyu-2.4.3. For an introduction, please refer to Dubbo Service Access on the official website.

1. Service Registration

Take the official example shenyu-examples-dubbo. Suppose your dubbo service is defined as follows (spring-dubbo.xml).

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://code.alibabatech.com/schema/dubbo
https://code.alibabatech.com/schema/dubbo/dubbo.xsd">

<dubbo:application name="test-dubbo-service"/>
<dubbo:registry address="${dubbo.registry.address}"/>
<dubbo:protocol name="dubbo" port="20888"/>

<dubbo:service timeout="10000" interface="org.apache.shenyu.examples.dubbo.api.service.DubboTestService" ref="dubboTestService"/>

</beans>

The configuration declares the application name, the registry address, the dubbo protocol, and the service interface; the corresponding interface implementation class is as follows.

/**
* DubboTestServiceImpl.
*/
@Service("dubboTestService")
public class DubboTestServiceImpl implements DubboTestService {

@Override
@ShenyuDubboClient(path = "/findById", desc = "Query by Id")
public DubboTest findById(final String id) {
return new DubboTest(id, "hello world shenyu Apache, findById");
}

//......
}

In the interface implementation class, use the annotation @ShenyuDubboClient to register the service with shenyu-admin. The role of this annotation and its rationale will be analyzed later.

The configuration in the file application.yml is as follows.

server:
  port: 8011
  address: 0.0.0.0
  servlet:
    context-path: /
spring:
  main:
    allow-bean-definition-overriding: true
dubbo:
  registry:
    address: zookeeper://localhost:2181 # dubbo registry

shenyu:
  register:
    registerType: http
    serverLists: http://localhost:9095
    props:
      username: admin
      password: 123456
  client:
    dubbo:
      props:
        contextPath: /dubbo
        appName: dubbo

In the configuration file, declare the registry address used by dubbo. The dubbo service registers with shenyu-admin, using the method http, and the registration address is http://localhost:9095.

See Application Client Access for more information on the use of the registration method.

1.1 Declaration of registration interface

Use the annotation @ShenyuDubboClient to register the service to the gateway. The simple demo is as follows.

// dubbo service
@Service("dubboTestService")
public class DubboTestServiceImpl implements DubboTestService {

@Override
@ShenyuDubboClient(path = "/findById", desc = "Query by Id") // need to be registered method
public DubboTest findById(final String id) {
return new DubboTest(id, "hello world shenyu Apache, findById");
}

//......
}

annotation definition:

/**
* Works on classes and methods
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@Inherited
public @interface ShenyuDubboClient {

//path
String path();

//rule name
String ruleName() default "";

//desc
String desc() default "";

//enabled
boolean enabled() default true;
}

1.2 Scan annotation information

Annotation scanning is done by ApacheDubboServiceBeanListener, which implements the ApplicationListener<ContextRefreshedEvent> interface. When a context refresh event occurs during Spring container startup, its event handler method onApplicationEvent() is executed.

The constructor does the following:

  • Read property configuration
  • Start the thread pool
  • Start the registry for registering with shenyu-admin
public class ApacheDubboServiceBeanListener implements ApplicationListener<ContextRefreshedEvent> {

// ......

//Constructor
public ApacheDubboServiceBeanListener(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
//1.Read property configuration
Properties props = clientConfig.getProps();
String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
String appName = props.getProperty(ShenyuClientConstants.APP_NAME);
if (StringUtils.isBlank(contextPath)) {
throw new ShenyuClientIllegalArgumentException("apache dubbo client must config the contextPath or appName");
}
this.contextPath = contextPath;
this.appName = appName;
this.host = props.getProperty(ShenyuClientConstants.HOST);
this.port = props.getProperty(ShenyuClientConstants.PORT);
//2.Start the thread pool
executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("shenyu-apache-dubbo-client-thread-pool-%d").build());
//3.Start the registry for registering with `shenyu-admin`
publisher.start(shenyuClientRegisterRepository);
}

/**
* Context refresh event, execute method logic
*/
@Override
public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
//......
}
  • ApacheDubboServiceBeanListener#onApplicationEvent()

Overridden method logic: read the Dubbo ServiceBean instances, build the metadata and URI objects, and register them with shenyu-admin.

    @Override
public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
//read ServiceBean
Map<String, ServiceBean> serviceBean = contextRefreshedEvent.getApplicationContext().getBeansOfType(ServiceBean.class);
if (serviceBean.isEmpty()) {
return;
}
//The method is guaranteed to be executed only once
if (!registered.compareAndSet(false, true)) {
return;
}
//handle metadata
for (Map.Entry<String, ServiceBean> entry : serviceBean.entrySet()) {
handler(entry.getValue());
}
//handle URI
serviceBean.values().stream().findFirst().ifPresent(bean -> {
publisher.publishEvent(buildURIRegisterDTO(bean));
});
}
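The compareAndSet guard matters because a context refresh event can be delivered more than once. A small sketch of the same guard, with a counter standing in for the registration work (the class and the counter are hypothetical):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class RegisterOnceSketch {

    private final AtomicBoolean registered = new AtomicBoolean(false);

    // Counts how often the registration body actually ran.
    private final AtomicInteger registrations = new AtomicInteger();

    // Stand-in for onApplicationEvent(); it may be invoked once per context refresh.
    void onRefresh() {
        // Only the first caller flips false -> true; every later call returns early.
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        registrations.incrementAndGet();
    }

    int registrationCount() {
        return registrations.get();
    }

    public static void main(final String[] args) {
        RegisterOnceSketch sketch = new RegisterOnceSketch();
        sketch.onRefresh();
        sketch.onRefresh();
        System.out.println(sketch.registrationCount()); // prints 1
    }
}
```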
  • handler()

    In the handler() method, read all methods from the serviceBean, determine if there is a ShenyuDubboClient annotation on the method, build a metadata object if it exists, and register the method with shenyu-admin through the registry.

    private void handler(final ServiceBean<?> serviceBean) {
//get proxy
Object refProxy = serviceBean.getRef();
//get class
Class<?> clazz = refProxy.getClass();
if (AopUtils.isAopProxy(refProxy)) {
clazz = AopUtils.getTargetClass(refProxy);
}
//all methods
Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);
for (Method method : methods) {
//read ShenyuDubboClient annotation
ShenyuDubboClient shenyuDubboClient = method.getAnnotation(ShenyuDubboClient.class);
if (Objects.nonNull(shenyuDubboClient)) {
//build metadata and register
publisher.publishEvent(buildMetaDataDTO(serviceBean, shenyuDubboClient, method));
}
}
}
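The scan in handler() relies on a runtime-retained annotation that is read through reflection. The sketch below reproduces the mechanism with plain JDK reflection; DemoClient and DemoService are hypothetical stand-ins for @ShenyuDubboClient and a Dubbo service bean, and Spring's ReflectionUtils and AopUtils are left out.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class AnnotationScanSketch {

    // RUNTIME retention is what makes the annotation visible to reflection.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoClient {
        String path();
    }

    static class DemoService {
        @DemoClient(path = "/findById")
        public String findById(final String id) {
            return "id=" + id;
        }

        @DemoClient(path = "/findAll")
        public String findAll() {
            return "all";
        }

        public String notRegistered() {
            return "hidden";
        }
    }

    // Collects contextPath + path for every annotated method, sorted for a stable order.
    static List<String> scanPaths(final Class<?> clazz, final String contextPath) {
        List<String> paths = new ArrayList<>();
        for (Method method : clazz.getDeclaredMethods()) {
            DemoClient client = method.getAnnotation(DemoClient.class);
            if (client != null) {
                paths.add(contextPath + client.path());
            }
        }
        Collections.sort(paths);
        return paths;
    }

    public static void main(final String[] args) {
        System.out.println(scanPaths(DemoService.class, "/dubbo"));
    }
}
```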
  • buildMetaDataDTO()

    Constructs a metadata object where the necessary information for method registration is constructed and subsequently used for selector or rule matching.

    private MetaDataRegisterDTO buildMetaDataDTO(final ServiceBean<?> serviceBean, final ShenyuDubboClient shenyuDubboClient, final Method method) {
//app name
String appName = buildAppName(serviceBean);
//path
String path = contextPath + shenyuDubboClient.path();
//desc
String desc = shenyuDubboClient.desc();
//service name
String serviceName = serviceBean.getInterface();
//rule name
String configRuleName = shenyuDubboClient.ruleName();
String ruleName = ("".equals(configRuleName)) ? path : configRuleName;
//method name
String methodName = method.getName();
//parameter Types
Class<?>[] parameterTypesClazz = method.getParameterTypes();
String parameterTypes = Arrays.stream(parameterTypesClazz).map(Class::getName).collect(Collectors.joining(","));
return MetaDataRegisterDTO.builder()
.appName(appName)
.serviceName(serviceName)
.methodName(methodName)
.contextPath(contextPath)
.host(buildHost())
.port(buildPort(serviceBean))
.path(path)
.ruleName(ruleName)
.pathDesc(desc)
.parameterTypes(parameterTypes)
.rpcExt(buildRpcExt(serviceBean)) //dubbo ext
.rpcType(RpcTypeEnum.DUBBO.getName())
.enabled(shenyuDubboClient.enabled())
.build();
}
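
Two of the derived fields above are easy to check in isolation: the comma-joined parameter type names and the rule name that falls back to the path. The following is a minimal sketch; the class name and the parameterTypesOf helper are hypothetical and exist only to make the example runnable.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.stream.Collectors;

class MetaDataFieldsSketch {
    // Join the parameter type names with commas, as in the listing above.
    static String parameterTypes(final Method method) {
        return Arrays.stream(method.getParameterTypes())
                .map(Class::getName)
                .collect(Collectors.joining(","));
    }

    // Hypothetical helper: look a public method up by name and wrap the checked exception.
    static String parameterTypesOf(final Class<?> owner, final String name, final Class<?>... types) {
        try {
            return parameterTypes(owner.getMethod(name, types));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // An empty configured rule name falls back to the path.
    static String ruleName(final String configRuleName, final String path) {
        return "".equals(configRuleName) ? path : configRuleName;
    }
}
```

For example, parameterTypesOf(String.class, "indexOf", String.class, int.class) yields "java.lang.String,int", which is the shape of the string stored in the parameterTypes field.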
  • buildRpcExt()

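    Builds the dubbo extension information (group, version, load balance, retries, timeout, sent and cluster) and serializes it to JSON.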

   private String buildRpcExt(final ServiceBean serviceBean) {
DubboRpcExt build = DubboRpcExt.builder()
.group(StringUtils.isNotEmpty(serviceBean.getGroup()) ? serviceBean.getGroup() : "")//group
.version(StringUtils.isNotEmpty(serviceBean.getVersion()) ? serviceBean.getVersion() : "")//version
.loadbalance(StringUtils.isNotEmpty(serviceBean.getLoadbalance()) ? serviceBean.getLoadbalance() : Constants.DEFAULT_LOADBALANCE)//load balance
.retries(Objects.isNull(serviceBean.getRetries()) ? Constants.DEFAULT_RETRIES : serviceBean.getRetries())//retry
.timeout(Objects.isNull(serviceBean.getTimeout()) ? Constants.DEFAULT_CONNECT_TIMEOUT : serviceBean.getTimeout())//timeout
.sent(Objects.isNull(serviceBean.getSent()) ? Constants.DEFAULT_SENT : serviceBean.getSent())//sent
.cluster(StringUtils.isNotEmpty(serviceBean.getCluster()) ? serviceBean.getCluster() : Constants.DEFAULT_CLUSTER)//cluster
.url("")
.build();
return GsonUtils.getInstance().toJson(build);
}
  • buildURIRegisterDTO()

    Builds the URI object that registers information about the service instance itself, which can later be used for service liveness probing.

private URIRegisterDTO buildURIRegisterDTO(final ServiceBean serviceBean) {
return URIRegisterDTO.builder()
.contextPath(this.contextPath) //context path
.appName(buildAppName(serviceBean))//app name
.rpcType(RpcTypeEnum.DUBBO.getName())//dubbo
.host(buildHost()) //host
.port(buildPort(serviceBean))//port
.build();
}

The specific registration logic is implemented by the registration center; please refer to Client Access Principles.

//publish the registration event to the registration center
publisher.publishEvent();

1.3 Processing registration information

The metadata and URI data registered by the client through the registry are processed by shenyu-admin, which stores them in the database and synchronizes them to the ShenYu gateway. For the Dubbo plugin, the logic that processes client registrations is in ShenyuClientRegisterDubboServiceImpl. The inheritance relationship is as follows.

  • ShenyuClientRegisterService: client registration service, top-level interface.
  • FallbackShenyuClientRegisterService: registration failure, provides retry operation.
  • AbstractShenyuClientRegisterServiceImpl: abstract class, implements part of the public registration logic.
  • ShenyuClientRegisterDubboServiceImpl: implementation of the Dubbo plugin registration.
1.3.1 Registration Service
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()

    The MetaDataRegisterDTO metadata object registered by the client through the registry is received and handled in the register() method of shenyu-admin.

   @Override
public String register(final MetaDataRegisterDTO dto) {
//1. register selector
String selectorHandler = selectorHandler(dto);
String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
//2. register rule
String ruleHandler = ruleHandler();
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
ruleService.registerDefault(ruleDTO);
//3. register metadata
registerMetadata(dto);
//4. register contextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
1.3.1.1 Register Selector
  • org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()

Builds the contextPath and looks up the selector by name: if it exists, its id is returned; otherwise the default selector is created.

    @Override
public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {
// build contextPath
String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());
// Find if selector information exists by name
SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);
if (Objects.isNull(selectorDO)) {
// Create a default selector message if it does not exist
return registerSelector(contextPath, pluginName, selectorHandler);
}
return selectorDO.getId();
}
  • Default selector information

    Construct the default selector information and its conditional properties here.

   //register selector
private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {
//build selector
SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());
selectorDTO.setHandle(selectorHandler);
//register default selector
return registerDefault(selectorDTO);
}
//build selector
private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {
//build default
SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);
selectorDTO.setPluginId(pluginId);
//build the conditional properties of the default selector
selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));
return selectorDTO;
}
  • Build default selector
private SelectorDTO buildDefaultSelectorDTO(final String name) {
return SelectorDTO.builder()
.name(name) // name
.type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // default type custom
.matchMode(MatchModeEnum.AND.getCode()) //default match mode
.enabled(Boolean.TRUE) //enable
.loged(Boolean.TRUE) //log
.continued(Boolean.TRUE)
.sort(1)
.build();
}
  • Build default selector conditional properties
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {
SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();
selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // default URI
selectorConditionDTO.setParamName("/");
selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // default match
selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX);
return Collections.singletonList(selectorConditionDTO);
}
  • Register default selector
@Override
public String registerDefault(final SelectorDTO selectorDTO) {
//selector information
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
//selector conditional properties
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
if (StringUtils.isEmpty(selectorDTO.getId())) {
// insert selector information into the database
selectorMapper.insertSelective(selectorDO);
// inserting selector conditional properties to the database
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
}
// Publish synchronization events to synchronize selector information and its conditional attributes to the gateway
publishEvent(selectorDO, selectorConditionDTOs);
return selectorDO.getId();
}
1.3.1.2 Registration Rules

The second step of service registration builds the default rule and then registers it.

@Override
public String register(final MetaDataRegisterDTO dto) {
//1. handle selector
//......

//2. handle rule

String ruleHandler = ruleHandler();
// build default rule
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
// register rule
ruleService.registerDefault(ruleDTO);

//3. register metadata
//......

//4. register ContextPath
//......

return ShenyuResultMessage.SUCCESS;
}
  • Default rule handling properties
    @Override
protected String ruleHandler() {
// default rule
return new DubboRuleHandle().toJson();
}

Dubbo plugin default rule handling properties.

public class DubboRuleHandle implements RuleHandle {

/**
* dubbo version.
*/
private String version;

/**
* group.
*/
private String group;

/**
* retry.
*/
private Integer retries = 0;

/**
* loadbalance:RANDOM
*/
private String loadbalance = LoadBalanceEnum.RANDOM.getName();

/**
* timeout default 3000
*/
private long timeout = Constants.TIME_OUT;
}
  • build default rule
  // build default rule
private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {
return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());
}
// build default rule
private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {
RuleDTO ruleDTO = RuleDTO.builder()
.selectorId(selectorId)
.name(ruleName)
.matchMode(MatchModeEnum.AND.getCode())
.enabled(Boolean.TRUE)
.loged(Boolean.TRUE)
.sort(1)
.handle(ruleHandler)
.build();
RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()
.paramType(ParamTypeEnum.URI.getName())
.paramName("/")
.paramValue(path)
.build();
if (path.indexOf("*") > 1) {
ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias());
} else {
ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias());
}
ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));
return ruleDTO;
}
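
The operator choice at the end of buildRuleDTO() depends only on where the first "*" appears in the path. A minimal sketch of that branch follows; the class name is hypothetical, and the strings "match" and "=" are placeholders for the values returned by OperatorEnum.MATCH.getAlias() and OperatorEnum.EQ.getAlias().

```java
class RuleOperatorSketch {
    // Placeholder operator aliases (assumed values, used only for illustration).
    static final String MATCH = "match";
    static final String EQ = "=";

    // Same test as in the listing: a wildcard found at index 2 or later selects MATCH.
    static String operatorFor(final String path) {
        return path.indexOf("*") > 1 ? MATCH : EQ;
    }
}
```

With this test, "/dubbo/**" gets the match operator and "/dubbo/findById" gets the equality operator. Note that a wildcard at index 0 or 1, as in "/*", also ends up with the equality operator because the comparison is `> 1`.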
  • org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()

Registers the rule: inserts the records into the database and publishes events to the gateway for data synchronization.


@Override
public String registerDefault(final RuleDTO ruleDTO) {
RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());
if (Objects.nonNull(exist)) {
return "";
}

RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);
List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();
if (StringUtils.isEmpty(ruleDTO.getId())) {
// insert rule information into the database
ruleMapper.insertSelective(ruleDO);
//insert rule body conditional attributes into the database
ruleConditions.forEach(ruleConditionDTO -> {
ruleConditionDTO.setRuleId(ruleDO.getId());
ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));
});
}
// Publish events to the gateway for data synchronization
publishEvent(ruleDO, ruleConditions);
return ruleDO.getId();
}

1.3.1.3 Register Metadata

Metadata is mainly used for RPC service calls.

   @Override
public String register(final MetaDataRegisterDTO dto) {
//1. register selector
//......

//2. register rule
//......

//3. register metadata
registerMetadata(dto);

//4. register ContextPath
//......

return ShenyuResultMessage.SUCCESS;
}
  • org.apache.shenyu.admin.service.register.ShenyuClientRegisterDubboServiceImpl#registerMetadata()

    Insert or update metadata and then publish sync events to the gateway.

    @Override
protected void registerMetadata(final MetaDataRegisterDTO dto) {
// get metaDataService
MetaDataService metaDataService = getMetaDataService();
MetaDataDO exist = metaDataService.findByPath(dto.getPath());
//insert or update metadata
metaDataService.saveOrUpdateMetaData(exist, dto);
}

@Override
public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {
DataEventTypeEnum eventType;
// DTO->DO
MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);
// insert data
if (Objects.isNull(exist)) {
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());
metaDataDO.setDateCreated(currentTime);
metaDataDO.setDateUpdated(currentTime);
metaDataMapper.insert(metaDataDO);
eventType = DataEventTypeEnum.CREATE;
} else {
// update
metaDataDO.setId(exist.getId());
metaDataMapper.update(metaDataDO);
eventType = DataEventTypeEnum.UPDATE;
}
// Publish sync events to gateway
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,
Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));
}
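
The insert-or-update decision in saveOrUpdateMetaData() determines which event type is sent to the gateway. A minimal sketch of that decision, assuming an in-memory map in place of the database mapper and a hypothetical EventType enum in place of DataEventTypeEnum:

```java
import java.util.HashMap;
import java.util.Map;

class MetaDataUpsertSketch {
    // Hypothetical stand-in for DataEventTypeEnum.
    enum EventType { CREATE, UPDATE }

    // In-memory stand-in for the metadata table, keyed by path.
    private final Map<String, String> store = new HashMap<>();

    // Insert when the path is unknown, update otherwise, and report which one happened.
    EventType saveOrUpdate(final String path, final String methodName) {
        EventType type = store.containsKey(path) ? EventType.UPDATE : EventType.CREATE;
        store.put(path, methodName);
        return type;
    }
}
```

Registering the same path twice therefore produces a CREATE event followed by an UPDATE event, so a restarted client refreshes its metadata rather than duplicating it.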
1.3.2 Register URI
  • org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()

The server side receives the URI information registered by the client and processes it.

    @Override
public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {
String result;
String key = key(selectorName);
try {
this.removeFallBack(key);
// register URI
result = this.doRegisterURI(selectorName, uriList);
logger.info("Register success: {},{}", selectorName, uriList);
} catch (Exception ex) {
logger.warn("Register exception: cause:{}", ex.getMessage());
result = "";
// Retry after registration failure
this.addFallback(key, new FallbackHolder(selectorName, uriList));
}
return result;
}
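
The try/catch structure of registerURI() is a small fallback pattern: clear any pending fallback for the key, attempt the registration, and park the request for a later retry if it fails. The sketch below shows only that structure; the class, the String payload and the pending() accessor are hypothetical simplifications, and how the parked entries are retried is not modelled.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class FallbackRegisterSketch {
    // Requests that failed and are waiting to be retried, keyed like key(selectorName).
    private final Map<String, String> fallbacks = new ConcurrentHashMap<>();

    // The real registration step (doRegisterURI in the listing), injected for the example.
    private final Function<String, String> delegate;

    FallbackRegisterSketch(final Function<String, String> delegate) {
        this.delegate = delegate;
    }

    String register(final String key, final String payload) {
        try {
            // A fresh attempt supersedes any earlier parked request for the same key.
            fallbacks.remove(key);
            return delegate.apply(payload);
        } catch (RuntimeException ex) {
            // Park the request so it can be retried later; the caller sees an empty result.
            fallbacks.put(key, payload);
            return "";
        }
    }

    int pending() {
        return fallbacks.size();
    }
}
```

A delegate that throws leaves one pending entry and returns an empty string, while a successful delegate returns its result and leaves nothing pending.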
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()

Get a valid URI from the URI registered by the client, update the corresponding selector handle property, and send a selector update event to the gateway.

@Override
public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {
//check
if (CollectionUtils.isEmpty(uriList)) {
return "";
}

SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
if (Objects.isNull(selectorDO)) {
throw new ShenyuException("doRegister Failed to execute,wait to retry.");
}
// get valid URI
List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());
// build handle
String handler = buildHandle(validUriList, selectorDO);
if (handler != null) {
selectorDO.setHandle(handler);
SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
selectorData.setHandle(handler);
// Update the handle property of the selector to the database
selectorService.updateSelective(selectorDO);
// Send selector update events to the gateway
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
}
return ShenyuResultMessage.SUCCESS;
}
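
The validity filter in doRegisterURI() keeps only the entries that have both a port and a non-blank host. A minimal sketch with a hypothetical UriEntry class in place of URIRegisterDTO and a hand-written blank check in place of StringUtils.isNotBlank:

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical stand-in for URIRegisterDTO with only the two fields the filter reads.
class UriEntry {
    final String host;
    final Integer port;

    UriEntry(final String host, final Integer port) {
        this.host = host;
        this.port = port;
    }
}

class ValidUriSketch {
    // Keep entries that have a port and a non-blank host.
    static List<UriEntry> valid(final List<UriEntry> uris) {
        return uris.stream()
                .filter(uri -> Objects.nonNull(uri.port) && isNotBlank(uri.host))
                .collect(Collectors.toList());
    }

    private static boolean isNotBlank(final String value) {
        return value != null && !value.trim().isEmpty();
    }
}
```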

This completes the source code analysis of service registration. The analysis flow chart is as follows.

The next step is to analyze how the dubbo plugin initiates calls to the dubbo service based on this information.

2. Service Invocation

The dubbo plugin is the core processing plugin used by the ShenYu gateway to convert http requests into the dubbo protocol and invoke the dubbo service.

Take the official Quick Start with Dubbo example: a dubbo service is registered with shenyu-admin through the registry and is then requested through the ShenYu gateway proxy. The request is as follows.

GET http://localhost:9195/dubbo/findById?id=100
Accept: application/json

The class inheritance relationship in the Dubbo plugin is as follows.

  • ShenyuPlugin: top-level interface, defining interface methods.
  • AbstractShenyuPlugin: abstract class that implements plugin common logic.
  • AbstractDubboPlugin: dubbo plugin abstract class, implementing dubbo common logic.
  • ApacheDubboPlugin: ApacheDubbo plugin.

ShenYu Gateway supports both ApacheDubbo and AlibabaDubbo.

2.1 Receive requests

When a request goes through the ShenYu gateway proxy, its entry point is ShenyuWebHandler, which implements the org.springframework.web.server.WebHandler interface.

public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {
//......

/**
* handle request
*/
@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
// execute default plugin chain
Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);
if (scheduled) {
return execute.subscribeOn(scheduler);
}
return execute;
}

private static class DefaultShenyuPluginChain implements ShenyuPluginChain {

private int index;

private final List<ShenyuPlugin> plugins;


DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {
this.plugins = plugins;
}

/**
* execute.
*/
@Override
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
// get plugin
ShenyuPlugin plugin = plugins.get(this.index++);
boolean skip = plugin.skip(exchange);
if (skip) {
// next
return this.execute(exchange);
}
// execute
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
}
}
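
The plugin chain above is a chain of responsibility driven by an index: each plugin either is skipped or runs and then hands control back to the chain. The following sketch reproduces that control flow synchronously, without Reactor. All type names are hypothetical, and a list of strings stands in for the ServerWebExchange so the order of execution can be observed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, synchronous stand-in for ShenyuPlugin.
interface DemoPlugin {
    boolean skip(List<String> trace);

    void execute(List<String> trace, DemoChain chain);
}

// Mirrors DefaultShenyuPluginChain: an index walks through the plugin list.
class DemoChain {
    private int index;

    private final List<DemoPlugin> plugins;

    DemoChain(final List<DemoPlugin> plugins) {
        this.plugins = plugins;
    }

    void execute(final List<String> trace) {
        if (index < plugins.size()) {
            DemoPlugin plugin = plugins.get(index++);
            if (plugin.skip(trace)) {
                // Skipped plugins are passed over and the chain moves on.
                execute(trace);
                return;
            }
            plugin.execute(trace, this);
        }
    }
}

// A plugin that records its name and then continues the chain.
class NamedPlugin implements DemoPlugin {
    private final String name;

    private final boolean skipped;

    NamedPlugin(final String name, final boolean skipped) {
        this.name = name;
        this.skipped = skipped;
    }

    @Override
    public boolean skip(final List<String> trace) {
        return skipped;
    }

    @Override
    public void execute(final List<String> trace, final DemoChain chain) {
        trace.add(name);
        chain.execute(trace);
    }
}

class ChainSketch {
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        List<DemoPlugin> plugins = Arrays.<DemoPlugin>asList(
                new NamedPlugin("global", false),
                new NamedPlugin("sign", true),
                new NamedPlugin("dubbo", false));
        new DemoChain(plugins).execute(trace);
        return trace;
    }
}
```

Because the index is per-chain state, a new chain is created for every request, which matches the `new DefaultShenyuPluginChain(plugins)` call in handle().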

2.2 Match Rule

  • org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()

Execute the matching logic for selectors and rules in the execute() method.

  • Matching selectors.
  • Matching rules.
  • Execute the plugin.
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
// plugin name
String pluginName = named();
// plugin data
PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
if (pluginData != null && pluginData.getEnabled()) {
// selector data
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
if (CollectionUtils.isEmpty(selectors)) {
return handleSelectorIfNull(pluginName, exchange, chain);
}
// match selector
SelectorData selectorData = matchSelector(exchange, selectors);
if (Objects.isNull(selectorData)) {
return handleSelectorIfNull(pluginName, exchange, chain);
}
selectorLog(selectorData, pluginName);
// rule data
List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
if (CollectionUtils.isEmpty(rules)) {
return handleRuleIfNull(pluginName, exchange, chain);
}
// match rule
RuleData rule;
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
//get last
rule = rules.get(rules.size() - 1);
} else {
rule = matchRule(exchange, rules);
}
if (Objects.isNull(rule)) {
return handleRuleIfNull(pluginName, exchange, chain);
}
ruleLog(rule, pluginName);
// execute
return doExecute(exchange, chain, selectorData, rule);
}
return chain.execute(exchange);
}
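
The rule selection branch in execute() can be sketched on its own: a full-flow selector takes the last rule, while any other selector takes the first rule that matches the request. In this sketch the class name is hypothetical, rules are reduced to plain path strings, and exact path equality stands in for the real condition matching, which is an assumption made to keep the example small.

```java
import java.util.List;

class RuleSelectSketch {
    // Returns the selected rule path, or null when nothing applies.
    static String selectRule(final boolean fullFlow, final List<String> rulePaths, final String requestPath) {
        if (rulePaths.isEmpty()) {
            return null;
        }
        if (fullFlow) {
            // Full-flow selectors do not evaluate conditions; the last rule wins.
            return rulePaths.get(rulePaths.size() - 1);
        }
        // Otherwise take the first rule whose (simplified) condition matches.
        return rulePaths.stream().filter(requestPath::equals).findFirst().orElse(null);
    }
}
```

A null result corresponds to the handleRuleIfNull() branch in the listing.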

2.3 Execute GlobalPlugin

  • org.apache.shenyu.plugin.global.GlobalPlugin#execute()

GlobalPlugin is a global plugin that constructs contextual information in the execute() method.

public class GlobalPlugin implements ShenyuPlugin {
// shenyu context
private final ShenyuContextBuilder builder;

//......

@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
// build context information to be passed into the exchange
ShenyuContext shenyuContext = builder.build(exchange);
exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
return chain.execute(exchange);
}

//......
}
  • org.apache.shenyu.plugin.global.DefaultShenyuContextBuilder#build()

Build the default context information.

public class DefaultShenyuContextBuilder implements ShenyuContextBuilder {
//......

@Override
public ShenyuContext build(final ServerWebExchange exchange) {
//build data
Pair<String, MetaData> buildData = buildData(exchange);
//wrap ShenyuContext
return decoratorMap.get(buildData.getLeft()).decorator(buildDefaultContext(exchange.getRequest()), buildData.getRight());
}

private Pair<String, MetaData> buildData(final ServerWebExchange exchange) {
//......
//get the metadata according to the requested uri
MetaData metaData = MetaDataCache.getInstance().obtain(request.getURI().getPath());
if (Objects.nonNull(metaData) && Boolean.TRUE.equals(metaData.getEnabled())) {
exchange.getAttributes().put(Constants.META_DATA, metaData);
return Pair.of(metaData.getRpcType(), metaData);
} else {
return Pair.of(RpcTypeEnum.HTTP.getName(), new MetaData());
}
}
//set the default context information
private ShenyuContext buildDefaultContext(final ServerHttpRequest request) {
String appKey = request.getHeaders().getFirst(Constants.APP_KEY);
String sign = request.getHeaders().getFirst(Constants.SIGN);
String timestamp = request.getHeaders().getFirst(Constants.TIMESTAMP);
ShenyuContext shenyuContext = new ShenyuContext();
String path = request.getURI().getPath();
shenyuContext.setPath(path);
shenyuContext.setAppKey(appKey);
shenyuContext.setSign(sign);
shenyuContext.setTimestamp(timestamp);
shenyuContext.setStartDateTime(LocalDateTime.now());
Optional.ofNullable(request.getMethod()).ifPresent(httpMethod -> shenyuContext.setHttpMethod(httpMethod.name()));
return shenyuContext;
}
}
  • org.apache.shenyu.plugin.dubbo.common.context.DubboShenyuContextDecorator#decorator()

Wraps the ShenyuContext with dubbo-specific information:

public class DubboShenyuContextDecorator implements ShenyuContextDecorator {

@Override
public ShenyuContext decorator(final ShenyuContext shenyuContext, final MetaData metaData) {
shenyuContext.setModule(metaData.getAppName());
shenyuContext.setMethod(metaData.getServiceName());
shenyuContext.setContextPath(metaData.getContextPath());
shenyuContext.setRpcType(RpcTypeEnum.DUBBO.getName());
return shenyuContext;
}

@Override
public String rpcType() {
return RpcTypeEnum.DUBBO.getName();
}
}
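
The RPC type that selects the decorator comes from the metadata lookup in buildData(): enabled metadata for the request path supplies its own type, and everything else falls back to HTTP. A minimal sketch of that lookup follows, with a hypothetical MetaEntry class in place of MetaData, a plain map in place of MetaDataCache, and the strings "dubbo" and "http" as placeholder type names.

```java
import java.util.Map;

// Hypothetical stand-in for MetaData with only the fields the lookup reads.
class MetaEntry {
    final String rpcType;
    final boolean enabled;

    MetaEntry(final String rpcType, final boolean enabled) {
        this.rpcType = rpcType;
        this.enabled = enabled;
    }
}

class RpcTypeLookupSketch {
    // Enabled metadata decides the RPC type; missing or disabled metadata means plain HTTP.
    static String rpcTypeFor(final Map<String, MetaEntry> metaByPath, final String path) {
        MetaEntry entry = metaByPath.get(path);
        if (entry != null && entry.enabled) {
            return entry.rpcType;
        }
        return "http";
    }
}
```

The returned type is the key used to pick a decorator from decoratorMap, which is how a request for a registered dubbo path ends up in DubboShenyuContextDecorator.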

2.4 Execute RpcParamTransformPlugin

The RpcParamTransformPlugin is responsible for reading the parameters from the http request, saving them in the exchange and passing them to the rpc service.

  • org.apache.shenyu.plugin.base.RpcParamTransformPlugin#execute()

In the execute() method, the core logic of the plugin is executed: get the request information from exchange and process the parameters according to the form of content passed in by the request.

public class RpcParamTransformPlugin implements ShenyuPlugin {

@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
//get request information from exchange
ServerHttpRequest request = exchange.getRequest();
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
if (Objects.nonNull(shenyuContext)) {
// APPLICATION_JSON
MediaType mediaType = request.getHeaders().getContentType();
if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
return body(exchange, request, chain);
}
// APPLICATION_FORM_URLENCODED
if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {
return formData(exchange, request, chain);
}
//query
return query(exchange, request, chain);
}
return chain.execute(exchange);
}

//APPLICATION_JSON
private Mono<Void> body(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {
return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())
.flatMap(body -> {
exchange.getAttributes().put(Constants.PARAM_TRANSFORM, resolveBodyFromRequest(body));// parse the body and save it in the exchange
return chain.execute(exchange);
}));
}
// APPLICATION_FORM_URLENCODED
private Mono<Void> formData(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {
return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())
.flatMap(map -> {
String param = resolveBodyFromRequest(map);
LinkedMultiValueMap<String, String> linkedMultiValueMap;
try {
linkedMultiValueMap = BodyParamUtils.buildBodyParams(URLDecoder.decode(param, StandardCharsets.UTF_8.name())); // format the data
} catch (UnsupportedEncodingException e) {
return Mono.error(e);
}
exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.toMap(() -> linkedMultiValueMap));// save it in the exchange
return chain.execute(exchange);
}));
}
//query
private Mono<Void> query(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {
exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.ofString(() -> serverHttpRequest.getURI().getQuery()));// save it in the exchange
return chain.execute(exchange);
}
//......
}
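
The dispatch in execute() chooses one of three parameter sources from the Content-Type header. The sketch below shows the same three-way decision with a hypothetical Source enum. It uses a simple prefix check on the header string, which is an assumption made to avoid the Spring dependency; the real code relies on MediaType.isCompatibleWith().

```java
import java.util.Locale;

class ParamSourceSketch {
    // Hypothetical labels for the three branches: body(), formData() and query().
    enum Source { BODY, FORM, QUERY }

    static Source sourceFor(final String contentType) {
        if (contentType == null) {
            // No Content-Type header: parameters are read from the query string.
            return Source.QUERY;
        }
        String normalized = contentType.trim().toLowerCase(Locale.ROOT);
        if (normalized.startsWith("application/json")) {
            return Source.BODY;
        }
        if (normalized.startsWith("application/x-www-form-urlencoded")) {
            return Source.FORM;
        }
        return Source.QUERY;
    }
}
```

The sample request shown earlier, GET /dubbo/findById?id=100, sends only an Accept header and no Content-Type, so in this sketch it takes the query branch.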

2.5 Execute DubboPlugin

  • org.apache.shenyu.plugin.dubbo.common.AbstractDubboPlugin#doExecute()

The doExecute() method mainly validates the metadata and the request parameters before invoking the dubbo service.

public abstract class AbstractDubboPlugin extends AbstractShenyuPlugin {

@Override
public Mono<Void> doExecute(final ServerWebExchange exchange,
final ShenyuPluginChain chain,
final SelectorData selector,
final RuleData rule) {
//param
String param = exchange.getAttribute(Constants.PARAM_TRANSFORM);
//context
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
//metaData
MetaData metaData = exchange.getAttribute(Constants.META_DATA);
//check metaData
if (!checkMetaData(metaData)) {
LOG.error(" path is : {}, meta data have error : {}", shenyuContext.getPath(), metaData);
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.META_DATA_ERROR, null);
return WebFluxResultUtils.result(exchange, error);
}
//check
if (Objects.nonNull(metaData) && StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(param)) {
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM, null);
return WebFluxResultUtils.result(exchange, error);
}
//set rpcContext
this.rpcContext(exchange);
//dubbo invoke
return this.doDubboInvoker(exchange, chain, selector, rule, metaData, param);
}
}
  • org.apache.shenyu.plugin.apache.dubbo.ApacheDubboPlugin#doDubboInvoker()

Set special context information in the doDubboInvoker() method, and then start the dubbo generalization call.

public class ApacheDubboPlugin extends AbstractDubboPlugin {

@Override
protected Mono<Void> doDubboInvoker(final ServerWebExchange exchange,
final ShenyuPluginChain chain,
final SelectorData selector,
final RuleData rule,
final MetaData metaData,
final String param) {
//set the current selector id, rule id and remote address as attachments, to support dubbo gray release
RpcContext.getContext().setAttachment(Constants.DUBBO_SELECTOR_ID, selector.getId());
RpcContext.getContext().setAttachment(Constants.DUBBO_RULE_ID, rule.getId());
RpcContext.getContext().setAttachment(Constants.DUBBO_REMOTE_ADDRESS, Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress());
//dubbo generic invoker
final Mono<Object> result = dubboProxyService.genericInvoker(param, metaData, exchange);
//execute next plugin in chain
return result.then(chain.execute(exchange));
}
}
  • org.apache.shenyu.plugin.apache.dubbo.proxy.ApacheDubboProxyService#genericInvoker()

The genericInvoker() method performs the following steps:

  • Gets the ReferenceConfig object.
  • Gets the generalization service GenericService object.
  • Constructs the request parameter pair object.
  • Initiates an asynchronous generalization invocation.
public class ApacheDubboProxyService {
//......

/**
* Generic invoker object.
*/
public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws ShenyuException {
//1.Get the ReferenceConfig object
ReferenceConfig<GenericService> reference = ApacheDubboConfigCache.getInstance().get(metaData.getPath());

if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {
//Invalidate the current cache entry
ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());
//Reinitialization with metadata
reference = ApacheDubboConfigCache.getInstance().initRef(metaData);
}
//2.Get the GenericService object of the generalization service
GenericService genericService = reference.get();
//3.Constructing the request parameter pair object
Pair<String[], Object[]> pair;
if (StringUtils.isBlank(metaData.getParameterTypes()) || ParamCheckUtils.dubboBodyIsEmpty(body)) {
pair = new ImmutablePair<>(new String[]{}, new Object[]{});
} else {
pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());
}
//4.Initiating asynchronous generalization calls
return Mono.fromFuture(invokeAsync(genericService, metaData.getMethodName(), pair.getLeft(), pair.getRight()).thenApply(ret -> {
//handle result
if (Objects.isNull(ret)) {
ret = Constants.DUBBO_RPC_RESULT_EMPTY;
}
exchange.getAttributes().put(Constants.RPC_RESULT, ret);
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
return ret;
})).onErrorMap(exception -> exception instanceof GenericException ? new ShenyuException(((GenericException) exception).getExceptionMessage()) : new ShenyuException(exception));// handle exceptions
}

//Generalized calls, asynchronous operations
private CompletableFuture<Object> invokeAsync(final GenericService genericService, final String method, final String[] parameterTypes, final Object[] args) throws GenericException {
genericService.$invoke(method, parameterTypes, args);
Object resultFromFuture = RpcContext.getContext().getFuture();
return resultFromFuture instanceof CompletableFuture ? (CompletableFuture<Object>) resultFromFuture : CompletableFuture.completedFuture(resultFromFuture);
}
}
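
Two small pieces of the result handling above can be tried without Dubbo: adapting whatever the RPC context returns into a CompletableFuture, and replacing a null result with a default value. This is a minimal sketch; the class name is hypothetical and the "<empty>" string is a placeholder for Constants.DUBBO_RPC_RESULT_EMPTY, whose actual value is not shown in the listing.

```java
import java.util.concurrent.CompletableFuture;

class AsyncResultSketch {
    // Placeholder for Constants.DUBBO_RPC_RESULT_EMPTY (assumed value).
    static final String EMPTY_RESULT = "<empty>";

    // Use the object as a future if it already is one, otherwise wrap it as a completed future.
    @SuppressWarnings("unchecked")
    static CompletableFuture<Object> asFuture(final Object resultFromContext) {
        return resultFromContext instanceof CompletableFuture
                ? (CompletableFuture<Object>) resultFromContext
                : CompletableFuture.completedFuture(resultFromContext);
    }

    // Replace a null result with the default so later plugins always find a value.
    static CompletableFuture<Object> withDefault(final CompletableFuture<Object> future) {
        return future.thenApply(ret -> ret == null ? EMPTY_RESULT : ret);
    }
}
```

In the listing, the defaulted value is what gets stored under Constants.RPC_RESULT in the exchange, so later plugins in the chain can read it from there.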

The gateway calls the dubbo service through a generic (generalized) invocation.

The ReferenceConfig object is the key object that supports generalization calls, and it is initialized during data synchronization. Two kinds of data are involved here: the synchronized plugin handler information and the synchronized plugin metadata information.

  • org.apache.shenyu.plugin.dubbo.common.handler.AbstractDubboPluginDataHandler#handlerPlugin()

When the plugin data is updated, the data synchronization module synchronizes the data from shenyu-admin to the gateway. The initialization operation is performed in handlerPlugin().

public abstract class AbstractDubboPluginDataHandler implements PluginDataHandler {
//......

//Initializing the configuration cache
protected abstract void initConfigCache(DubboRegisterConfig dubboRegisterConfig);

@Override
public void handlerPlugin(final PluginData pluginData) {
if (Objects.nonNull(pluginData) && Boolean.TRUE.equals(pluginData.getEnabled())) {
//Data deserialization
DubboRegisterConfig dubboRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class);
DubboRegisterConfig exist = Singleton.INST.get(DubboRegisterConfig.class);
if (Objects.isNull(dubboRegisterConfig)) {
return;
}
if (Objects.isNull(exist) || !dubboRegisterConfig.equals(exist)) {
// Perform initialization operations
this.initConfigCache(dubboRegisterConfig);
}
Singleton.INST.single(DubboRegisterConfig.class, dubboRegisterConfig);
}
}
//......
}
  • org.apache.shenyu.plugin.apache.dubbo.handler.ApacheDubboPluginDataHandler#initConfigCache()

Perform initialization operations.

public class ApacheDubboPluginDataHandler extends AbstractDubboPluginDataHandler {

@Override
protected void initConfigCache(final DubboRegisterConfig dubboRegisterConfig) {
//perform initialization operations
ApacheDubboConfigCache.getInstance().init(dubboRegisterConfig);
//invalidate the previously cached results
ApacheDubboConfigCache.getInstance().invalidateAll();
}
}

  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#init()

In the initialization, set registryConfig and consumerConfig.

public final class ApacheDubboConfigCache extends DubboConfigCache {
//......
/**
* init
*/
public void init(final DubboRegisterConfig dubboRegisterConfig) {
//ApplicationConfig
if (Objects.isNull(applicationConfig)) {
applicationConfig = new ApplicationConfig("shenyu_proxy");
}
//When the protocol or address changes, you need to update the registryConfig
if (needUpdateRegistryConfig(dubboRegisterConfig)) {
RegistryConfig registryConfigTemp = new RegistryConfig();
registryConfigTemp.setProtocol(dubboRegisterConfig.getProtocol());
registryConfigTemp.setId("shenyu_proxy");
registryConfigTemp.setRegister(false);
registryConfigTemp.setAddress(dubboRegisterConfig.getRegister());
Optional.ofNullable(dubboRegisterConfig.getGroup()).ifPresent(registryConfigTemp::setGroup);
registryConfig = registryConfigTemp;
}
//ConsumerConfig
if (Objects.isNull(consumerConfig)) {
consumerConfig = ApplicationModel.getConfigManager().getDefaultConsumer().orElseGet(() -> {
ConsumerConfig consumerConfig = new ConsumerConfig();
consumerConfig.refresh();
return consumerConfig;
});

//ConsumerConfig
Optional.ofNullable(dubboRegisterConfig.getThreadpool()).ifPresent(consumerConfig::setThreadpool);
Optional.ofNullable(dubboRegisterConfig.getCorethreads()).ifPresent(consumerConfig::setCorethreads);
Optional.ofNullable(dubboRegisterConfig.getThreads()).ifPresent(consumerConfig::setThreads);
Optional.ofNullable(dubboRegisterConfig.getQueues()).ifPresent(consumerConfig::setQueues);
}
}

//Does the registration configuration need to be updated
private boolean needUpdateRegistryConfig(final DubboRegisterConfig dubboRegisterConfig) {
if (Objects.isNull(registryConfig)) {
return true;
}
return !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol())
|| !Objects.equals(dubboRegisterConfig.getRegister(), registryConfig.getAddress())
|| !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol());
}

//......
}
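
The change check above boils down to a null-safe, field-by-field comparison. Here is a minimal sketch of that idea, not the plugin's own code: `RegistrySettings` and `needsUpdate` are hypothetical names, and comparing the group as a third field is an assumption of this sketch (the quoted method compares the protocol and the register address).

```java
import java.util.Objects;

// Hypothetical value holder for the registry fields that decide whether a rebuild is needed.
class RegistrySettings {
    final String protocol;
    final String address;
    final String group;

    RegistrySettings(String protocol, String address, String group) {
        this.protocol = protocol;
        this.address = address;
        this.group = group;
    }

    // True when there is no current config yet, or when any tracked field differs.
    // Objects.equals is null-safe, so unset fields compare cleanly.
    static boolean needsUpdate(RegistrySettings current, RegistrySettings incoming) {
        if (current == null) {
            return true;
        }
        return !Objects.equals(current.protocol, incoming.protocol)
                || !Objects.equals(current.address, incoming.address)
                || !Objects.equals(current.group, incoming.group); // assumption: group is tracked too
    }
}
```

Rebuilding only when a tracked field changes keeps repeated plugin-data pushes from replacing a registry config that is still valid.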
  • org.apache.shenyu.plugin.apache.dubbo.subscriber.ApacheDubboMetaDataSubscriber#onSubscribe()

When the metadata is updated, the data synchronization module synchronizes the data from shenyu-admin to the gateway. The metadata update operation is performed in the onSubscribe() method.

public class ApacheDubboMetaDataSubscriber implements MetaDataSubscriber {
//local memory cache
private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap();

//update metaData
public void onSubscribe(final MetaData metaData) {
// dubbo
if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
//Whether the corresponding metadata exists
MetaData exist = META_DATA.get(metaData.getPath());
if (Objects.isNull(exist) || Objects.isNull(ApacheDubboConfigCache.getInstance().get(metaData.getPath()))) {
// initRef
ApacheDubboConfigCache.getInstance().initRef(metaData);
} else {
// The corresponding metadata has undergone an update operation
if (!Objects.equals(metaData.getServiceName(), exist.getServiceName())
|| !Objects.equals(metaData.getRpcExt(), exist.getRpcExt())
|| !Objects.equals(metaData.getParameterTypes(), exist.getParameterTypes())
|| !Objects.equals(metaData.getMethodName(), exist.getMethodName())) {
//Build ReferenceConfig again based on the latest metadata
ApacheDubboConfigCache.getInstance().build(metaData);
}
}
//local memory cache
META_DATA.put(metaData.getPath(), metaData);
}
}

//delete
public void unSubscribe(final MetaData metaData) {
if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
//invalidate the ReferenceConfig
ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());
META_DATA.remove(metaData.getPath());
}
}
}
  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#initRef()

Build ReferenceConfig objects from metaData.

public final class ApacheDubboConfigCache extends DubboConfigCache {
//......

public ReferenceConfig<GenericService> initRef(final MetaData metaData) {
try {
//First try to get it from the cache, and return it directly if it exists
ReferenceConfig<GenericService> referenceConfig = cache.get(metaData.getPath());
if (StringUtils.isNoneBlank(referenceConfig.getInterface())) {
return referenceConfig;
}
} catch (ExecutionException e) {
LOG.error("init dubbo ref exception", e);
}

//build if not exist
return build(metaData);
}

/**
* Build reference config.
*/
@SuppressWarnings("deprecation")
public ReferenceConfig<GenericService> build(final MetaData metaData) {
if (Objects.isNull(applicationConfig) || Objects.isNull(registryConfig)) {
return new ReferenceConfig<>();
}
ReferenceConfig<GenericService> reference = new ReferenceConfig<>(); //ReferenceConfig
reference.setGeneric("true"); //generic invoke
reference.setAsync(true);//async

reference.setApplication(applicationConfig);//applicationConfig
reference.setRegistry(registryConfig);//registryConfig
reference.setConsumer(consumerConfig);//consumerConfig
reference.setInterface(metaData.getServiceName());//serviceName
reference.setProtocol("dubbo");//dubbo
reference.setCheck(false);
reference.setLoadbalance("gray");//gray

Map<String, String> parameters = new HashMap<>(2);
parameters.put("dispatcher", "direct");
reference.setParameters(parameters);

String rpcExt = metaData.getRpcExt();//rpc ext param
DubboParam dubboParam = parserToDubboParam(rpcExt);
if (Objects.nonNull(dubboParam)) {
if (StringUtils.isNoneBlank(dubboParam.getVersion())) {
reference.setVersion(dubboParam.getVersion());//version
}
if (StringUtils.isNoneBlank(dubboParam.getGroup())) {
reference.setGroup(dubboParam.getGroup());//group
}
if (StringUtils.isNoneBlank(dubboParam.getUrl())) {
reference.setUrl(dubboParam.getUrl());//url
}
if (StringUtils.isNoneBlank(dubboParam.getCluster())) {
reference.setCluster(dubboParam.getCluster());
}
Optional.ofNullable(dubboParam.getTimeout()).ifPresent(reference::setTimeout);//timeout
Optional.ofNullable(dubboParam.getRetries()).ifPresent(reference::setRetries);//retries
Optional.ofNullable(dubboParam.getSent()).ifPresent(reference::setSent);//Whether to ack async-sent
}
try {
//get GenericService
Object obj = reference.get();
if (Objects.nonNull(obj)) {
LOG.info("init apache dubbo reference success there meteData is :{}", metaData);
//cache reference
cache.put(metaData.getPath(), reference);
}
} catch (Exception e) {
LOG.error("init apache dubbo reference exception", e);
}
return reference;
}
//......
}

2.6 Execute ResponsePlugin

  • org.apache.shenyu.plugin.response.ResponsePlugin#execute()

The response results are handled by the ResponsePlugin plugin.

    @Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
// handle results according to rpc type
return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);
}

The processing type is determined by MessageWriter and the class inheritance relationship is as follows.

  • MessageWriter: interface, defining message processing methods.
  • NettyClientMessageWriter: processing of Netty call results.
  • RPCMessageWriter: processing the results of RPC calls.
  • WebClientMessageWriter: processing the results of WebClient calls.

Because this is a Dubbo service call, the result is handled by RPCMessageWriter.
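
The selection can be pictured as a lookup table keyed by RPC type. The sketch below is a simplified illustration under stated assumptions, not ShenYu's code: `Writer`, `WriterRegistry`, the type keys, and the output prefixes are hypothetical stand-ins for MessageWriter and its implementations.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for MessageWriter: turns a raw result into a response body.
interface Writer {
    String write(Object result);
}

class WriterRegistry {
    private final Map<String, Writer> writers = new HashMap<>();

    WriterRegistry() {
        // Assumed mapping for illustration: each RPC type is served by one strategy.
        writers.put("http", result -> "web-client:" + result);
        writers.put("dubbo", result -> "rpc:" + result);
        writers.put("grpc", result -> "rpc:" + result);
    }

    // Look up the strategy by RPC type and fail loudly for unknown types.
    String write(String rpcType, Object result) {
        Writer writer = writers.get(rpcType);
        if (writer == null) {
            throw new IllegalArgumentException("no writer for rpc type: " + rpcType);
        }
        return writer.write(result);
    }

    public static void main(String[] args) {
        System.out.println(new WriterRegistry().write("dubbo", "{\"code\":200}"));
    }
}
```

Keeping the mapping in one table means a new RPC type only needs one more entry, while the plugin's execute() logic stays unchanged.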

  • org.apache.shenyu.plugin.response.strategy.RPCMessageWriter#writeWith()

Process the response results in the writeWith() method.


public class RPCMessageWriter implements MessageWriter {

@Override
public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
return chain.execute(exchange).then(Mono.defer(() -> {
Object result = exchange.getAttribute(Constants.RPC_RESULT); //result
if (Objects.isNull(result)) {
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);
return WebFluxResultUtils.result(exchange, error);
}
return WebFluxResultUtils.result(exchange, result);
}));
}
}

At this point in the analysis, the source code analysis of the Dubbo plugin is complete, and the analysis flow chart is as follows.

3. Summary

The source code analysis in this article runs from Dubbo service registration to service invocation through the Dubbo plugin. The Dubbo plugin mainly converts HTTP requests to the Dubbo protocol, and its core logic is implemented through generic invocation.

· 11 min read

In the Shenyu gateway, once you enable this plugin, Shenyu becomes a fully featured McpServer.
You can easily register a service as a tool within the Shenyu gateway by simple configuration and use the extended functions the gateway offers.

This article is based on version shenyu-2.7.0.2. Here, I will track the Shenyu Mcp plugin chain and analyze the source code of its SSE communication.

Introduction

The Shenyu gateway's Mcp plugin is built on top of the spring-ai-mcp extension. To better understand how the Mcp plugin works, I’ll briefly introduce how some official Mcp Java classes collaborate within the SDK.

I want to start by introducing three key official Mcp Java classes:

  1. McpServer
    This class manages tools, resources, prompts, and so on.
  2. TransportProvider
    Provides corresponding communication methods based on client-server communication protocols.
  3. Session
    Handles request data, response data, and notifications, offers some basic methods and corresponding handlers, and executes tool queries and calls here.

1. Service Registration

In Shenyu Admin, after filling in endpoint and tool information for the McpServer plugin, this info is automatically registered into Shenyu bootstrap.
You can refer to the official websocket data sync source code for details.

Shenyu bootstrap receives the data synced from admin in the handler methods of McpServerPluginDataHandler.

  • handlerSelector() receives URL data and creates McpServer.
  • handlerRule() receives tool info and registers tools.

These two methods together form the service registration part of the Shenyu Mcp plugin. Below, I will analyze these two methods in detail.

1.1 Transport and McpServer Registration

Let’s analyze the handlerSelector() method, which handles McpServer registration.

  • What handlerSelector() does:
public class McpServerPluginDataHandler implements PluginDataHandler {
@Override
public void handlerSelector(final SelectorData selectorData) {
// Get URI
String uri = selectorData.getConditionList().stream()
.filter(condition -> Constants.URI.equals(condition.getParamType()))
.map(ConditionData::getParamValue)
.findFirst()
.orElse(null);

// Build McpServer
ShenyuMcpServer shenyuMcpServer = GsonUtils.getInstance().fromJson(Objects.isNull(selectorData.getHandle()) ? DEFAULT_MESSAGE_ENDPOINT : selectorData.getHandle(), ShenyuMcpServer.class);
shenyuMcpServer.setPath(path);
// Cache shenyuMcpServer
CACHED_SERVER.get().cachedHandle(
selectorData.getId(),
shenyuMcpServer);
String messageEndpoint = shenyuMcpServer.getMessageEndpoint();
// Try to get or register transportProvider
shenyuMcpServerManager.getOrCreateMcpServerTransport(uri, messageEndpoint);
}

}

ShenyuMcpServerManager is the management center of McpServer in Shenyu. It not only stores McpAsyncServer, CompositeTransportProvider, etc., but also contains methods to register Transport and McpServer.

  • The getOrCreateMcpServerTransport() method works as follows:
@Component
public class ShenyuMcpServerManager {
public ShenyuSseServerTransportProvider getOrCreateMcpServerTransport(final String uri, final String messageEndPoint) {
// Remove /streamablehttp and /message suffixes
String normalizedPath = processPath(uri);
return getOrCreateTransport(normalizedPath, SSE_PROTOCOL,
() -> createSseTransport(normalizedPath, messageEndPoint));
}

private <T> T getOrCreateTransport(final String normalizedPath, final String protocol,
final java.util.function.Supplier<T> transportFactory) {
// Get composite Transport instance
CompositeTransportProvider compositeTransport = getOrCreateCompositeTransport(normalizedPath);

T transport = switch (protocol) {
case SSE_PROTOCOL -> (T) compositeTransport.getSseTransport();
case STREAMABLE_HTTP_PROTOCOL -> (T) compositeTransport.getStreamableHttpTransport();
default -> null;
};
// If instance is missing in cache, create a new one
if (Objects.isNull(transport)) {
// Call createSseTransport() to create and store a new transport
transport = transportFactory.get();
// Create McpAsyncServer and register the transport
addTransportToSharedServer(normalizedPath, protocol, transport);
}

return transport;
}
}
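
The get-or-create flow above can be sketched with a concurrent map and a factory. This is a minimal illustration under stated assumptions: `TransportRegistry` and `normalize` are hypothetical names, and the suffix handling only mirrors the "Remove /streamablehttp and /message suffixes" comment rather than the real processPath() logic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical registry: one transport per normalized path, created lazily.
class TransportRegistry<T> {
    private final Map<String, T> transports = new ConcurrentHashMap<>();

    // Strip an assumed "/message" or "/streamablehttp" suffix so that
    // every endpoint of one server maps to the same key.
    static String normalize(String uri) {
        for (String suffix : new String[] {"/message", "/streamablehttp"}) {
            if (uri.endsWith(suffix)) {
                return uri.substring(0, uri.length() - suffix.length());
            }
        }
        return uri;
    }

    // Return the cached transport, or run the factory once for a new key.
    T getOrCreate(String uri, Supplier<T> factory) {
        return transports.computeIfAbsent(normalize(uri), key -> factory.get());
    }
}
```

Passing the factory as a Supplier defers the expensive construction until the lookup has actually missed, which is the same reason the method above takes a transportFactory argument.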
1.1.1 Transport Registration
  • createSseTransport() method

This method is invoked from getOrCreateMcpServerTransport(), through the supplier passed to getOrCreateTransport(), and is used to create a Transport.

@Component
public class ShenyuMcpServerManager {
private ShenyuSseServerTransportProvider createSseTransport(final String normalizedPath, final String messageEndPoint) {
String messageEndpoint = normalizedPath + messageEndPoint;
ShenyuSseServerTransportProvider transportProvider = ShenyuSseServerTransportProvider.builder()
.objectMapper(objectMapper)
.sseEndpoint(normalizedPath)
.messageEndpoint(messageEndpoint)
.build();
// Register the two functions of transportProvider to the Manager's routeMap
registerRoutes(normalizedPath, messageEndpoint, transportProvider::handleSseConnection, transportProvider::handleMessage);
return transportProvider;
}
}
1.1.2 McpServer Registration
  • addTransportToSharedServer() method

This method is called on the getOrCreateMcpServerTransport() path, inside getOrCreateTransport(). It gets or creates the McpServer for the path, stores it in sharedServerMap, and saves the TransportProvider obtained above into compositeTransportMap.

@Component
public class ShenyuMcpServerManager {
private void addTransportToSharedServer(final String normalizedPath, final String protocol, final Object transportProvider) {
// Get or create and register McpServer
getOrCreateSharedServer(normalizedPath);

// Save the new transport protocol into compositeTransportMap
CompositeTransportProvider compositeTransport = getOrCreateCompositeTransport(normalizedPath);
compositeTransport.addTransport(protocol, transportProvider);

}

private McpAsyncServer getOrCreateSharedServer(final String normalizedPath) {
return sharedServerMap.computeIfAbsent(normalizedPath, path -> {
// Get transport protocols
CompositeTransportProvider compositeTransport = getOrCreateCompositeTransport(path);

// Select server capabilities
var capabilities = McpSchema.ServerCapabilities.builder()
.tools(true)
.logging()
.build();

// Create and store McpServer
McpAsyncServer server = McpServer
.async(compositeTransport)
.serverInfo("MCP Shenyu Server (Multi-Protocol)", "1.0.0")
.capabilities(capabilities)
.tools(Lists.newArrayList())
.build();

return server;
});
}
}

1.2 Tools Registration

  • The handlerRule() method works as follows:
  1. Captures the configuration the user filled in for the tool; all of it is used to build the tool
  2. Deserializes it into a ShenyuMcpServerTool and obtains the tool info

Note: ShenyuMcpServerTool is a Shenyu-side object for storing tool info; it has no inheritance relationship with McpServerTool

  3. Calls the addTool() method to create the tool from this info and registers it with the matching McpServer, looked up by SelectorId
public class McpServerPluginDataHandler implements PluginDataHandler {
@Override
public void handlerRule(final RuleData ruleData) {
Optional.ofNullable(ruleData.getHandle()).ifPresent(s -> {
// Deserialize a new ShenyuMcpServerTool
ShenyuMcpServerTool mcpServerTool = GsonUtils.getInstance().fromJson(s, ShenyuMcpServerTool.class);
// Cache mcpServerTool
CACHED_TOOL.get().cachedHandle(CacheKeyUtils.INST.getKey(ruleData), mcpServerTool);
// Build MCP schema
List<McpServerToolParameter> parameters = mcpServerTool.getParameters();
String inputSchema = JsonSchemaUtil.createParameterSchema(parameters);
ShenyuMcpServer server = CACHED_SERVER.get().obtainHandle(ruleData.getSelectorId());
if (Objects.nonNull(server)) {
// Save tool info into Manager's sharedServerMap
shenyuMcpServerManager.addTool(server.getPath(),
StringUtils.isBlank(mcpServerTool.getName()) ? ruleData.getName()
: mcpServerTool.getName(),
mcpServerTool.getDescription(),
mcpServerTool.getRequestConfig(),
inputSchema);
}
});
}
}
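
An MCP tool describes its input as a JSON Schema object, and that is the role of the inputSchema string built above. The sketch below shows one way such a schema could be assembled from a parameter list. It is an illustration only: `ToolParam` and `SchemaSketch` are hypothetical, the real McpServerToolParameter and JsonSchemaUtil may carry more fields, and names are assumed not to need JSON escaping.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parameter description; the real McpServerToolParameter may differ.
class ToolParam {
    final String name;
    final String type;
    final boolean required;

    ToolParam(String name, String type, boolean required) {
        this.name = name;
        this.type = type;
        this.required = required;
    }
}

class SchemaSketch {
    // Build a JSON Schema "object" whose properties are the tool parameters.
    // Assumption: names and types contain no characters that need JSON escaping.
    static String toInputSchema(List<ToolParam> params) {
        List<String> props = new ArrayList<>();
        List<String> required = new ArrayList<>();
        for (ToolParam p : params) {
            props.add("\"" + p.name + "\":{\"type\":\"" + p.type + "\"}");
            if (p.required) {
                required.add("\"" + p.name + "\"");
            }
        }
        return "{\"type\":\"object\",\"properties\":{" + String.join(",", props)
                + "},\"required\":[" + String.join(",", required) + "]}";
    }
}
```

For real use, a JSON library would be the safer choice than string concatenation, since it handles escaping and nested types.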
  • addTool() method

This method is called by handlerRule() to add a new tool.

It performs the following steps:

  1. Converts the tool info gathered above into a shenyuToolDefinition object
  2. Creates a ShenyuToolCallback object from that shenyuToolDefinition

ShenyuToolCallback overrides the call() method of ToolCallback, and this override is what gets registered in the AsyncToolSpecification, so invoking the tool's call() actually runs the overridden method

  3. Converts the ShenyuToolCallback to an AsyncToolSpecification and registers it with the corresponding McpServer
public class ShenyuMcpServerManager {
public void addTool(final String serverPath, final String name, final String description,
final String requestTemplate, final String inputSchema) {
String normalizedPath = normalizeServerPath(serverPath);
// Build Definition object
ToolDefinition shenyuToolDefinition = ShenyuToolDefinition.builder()
.name(name)
.description(description)
.requestConfig(requestTemplate)
.inputSchema(inputSchema)
.build();

ShenyuToolCallback shenyuToolCallback = new ShenyuToolCallback(shenyuToolDefinition);

// Get previously registered McpServer and register the Tool
McpAsyncServer sharedServer = sharedServerMap.get(normalizedPath);
for (AsyncToolSpecification asyncToolSpecification : McpToolUtils.toAsyncToolSpecifications(shenyuToolCallback)) {
sharedServer.addTool(asyncToolSpecification).block();
}
}
}

With this, service registration analysis is complete.

Service registration overview diagram

2. Plugin Execution

Clients send two kinds of requests, distinguished by the /sse and /message suffixes. The Shenyu McpServer plugin captures both and handles them differently. For an /sse request, the plugin creates and saves a session object, then returns a session id for later /message requests to use. For a /message request, the plugin executes the method named in the message, such as listing tools, invoking a tool, or listing resources.

  • doExecute() method works as follows:
  1. Matches the path and checks if the Mcp plugin registered it
  2. Calls routeByProtocol() to choose the appropriate handling plan based on the request protocol

This article focuses on the SSE request mode, so we enter the handleSseRequest() method

public class McpServerPlugin extends AbstractShenyuPlugin {
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange,
final ShenyuPluginChain chain,
final SelectorData selector,
final RuleData rule) {
final String uri = exchange.getRequest().getURI().getRawPath();
// Check if Mcp plugin registered this route; if not, continue chain without handling
if (!shenyuMcpServerManager.canRoute(uri)) {
return chain.execute(exchange);
}
final ServerRequest request = ServerRequest.create(exchange, messageReaders);
// Choose handling method based on URI protocol
return routeByProtocol(exchange, chain, request, selector, uri);
}

private Mono<Void> routeByProtocol(final ServerWebExchange exchange,
final ShenyuPluginChain chain,
final ServerRequest request,
final SelectorData selector,
final String uri) {

if (isStreamableHttpProtocol(uri)) {
return handleStreamableHttpRequest(exchange, chain, request, uri);
} else if (isSseProtocol(uri)) {
// Handle SSE requests
return handleSseRequest(exchange, chain, request, selector, uri);
}
}
}
  • handleSseRequest() method

Called by routeByProtocol() to determine if the client wants to create a session or call a tool based on URI suffix

public class McpServerPlugin extends AbstractShenyuPlugin {
private Mono<Void> handleSseRequest(final ServerWebExchange exchange,
final ShenyuPluginChain chain,
final ServerRequest request,
final SelectorData selector,
final String uri) {
ShenyuMcpServer server = McpServerPluginDataHandler.CACHED_SERVER.get().obtainHandle(selector.getId());
String messageEndpoint = server.getMessageEndpoint();
// Get the transport provider
ShenyuSseServerTransportProvider transportProvider
= shenyuMcpServerManager.getOrCreateMcpServerTransport(uri, messageEndpoint);
// Determine if the request is an SSE connection or a message call
if (uri.endsWith(messageEndpoint)) {
setupSessionContext(exchange, chain);
return handleMessageEndpoint(exchange, transportProvider, request);
} else {
return handleSseEndpoint(exchange, transportProvider, request);
}
}
}

2.1 Client Sends SSE Request

If the client sends a request ending with /sse, the handleSseEndpoint() method is executed

  • handleSseEndpoint() mainly does:
  1. Sets SSE request headers
  2. Calls ShenyuSseServerTransportProvider.createSseFlux() to create the SSE stream
public class McpServerPlugin extends AbstractShenyuPlugin {
private Mono<Void> handleSseEndpoint(final ServerWebExchange exchange,
final ShenyuSseServerTransportProvider transportProvider,
final ServerRequest request) {
// Configure SSE request headers
configureSseHeaders(exchange);

// Create SSE stream
return exchange.getResponse()
.writeWith(transportProvider
.createSseFlux(request));
}
}
  • createSseFlux() method

Called by handleSseEndpoint(); mainly used to create and save a session

  1. Creates session; the session factory registers a series of handlers, which are the objects actually executing tool calls
  2. Saves the session for reuse
  3. Sends the session id as a parameter of the endpoint URL back to the client, to be used when calling the message endpoint
public class ShenyuSseServerTransportProvider implements McpServerTransportProvider {
public Flux<ServerSentEvent<?>> createSseFlux(final ServerRequest request) {
return Flux.<ServerSentEvent<?>>create(sink -> {
WebFluxMcpSessionTransport sessionTransport = new WebFluxMcpSessionTransport(sink);
// Create McpServerSession and temporarily store plugin chain info
McpServerSession session = sessionFactory.create(sessionTransport);
String sessionId = session.getId();
sessions.put(sessionId, session);

// Send session id info back to client
String endpointUrl = this.baseUrl + this.messageEndpoint + "?sessionId=" + sessionId;
ServerSentEvent<String> endpointEvent = ServerSentEvent.<String>builder()
.event(ENDPOINT_EVENT_TYPE)
.data(endpointUrl)
.build();
// Emit the endpoint event as the first item of the stream
sink.next(endpointEvent);
}).doOnSubscribe(subscription -> LOGGER.info("SSE Flux subscribed"))
.doOnRequest(n -> LOGGER.debug("SSE Flux requested {} items", n));
}
}
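
To make the handshake concrete, the sketch below stores a session under a fresh id and formats the first event of the stream in the text/event-stream wire format. It is a minimal sketch under stated assumptions: `SessionEndpointSketch` is hypothetical, the session is a placeholder object, and the event name "endpoint" is assumed to be the value of ENDPOINT_EVENT_TYPE.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical session store: creates a session id and tells the client where to post messages.
class SessionEndpointSketch {
    private final Map<String, Object> sessions = new ConcurrentHashMap<>();

    // Register a new session and return its id.
    String openSession() {
        String sessionId = UUID.randomUUID().toString();
        sessions.put(sessionId, new Object()); // placeholder for the real session object
        return sessionId;
    }

    boolean isKnown(String sessionId) {
        return sessions.containsKey(sessionId);
    }

    // Format one server-sent event: an "event:" line, a "data:" line,
    // and a blank line that terminates the event.
    static String endpointEvent(String baseUrl, String messageEndpoint, String sessionId) {
        String url = baseUrl + messageEndpoint + "?sessionId=" + sessionId;
        return "event: endpoint\n" + "data: " + url + "\n\n";
    }
}
```

The client reads the data line and sends every later message to that URL, which is how the server finds the right session again.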

2.2 Client Sends Message Request

If the client sends a request ending with /message, the current ShenyuPluginChain info is saved into the session, and handleMessageEndpoint() is called.
Subsequent tool calls continue executing this plugin chain, so plugins after the Mcp plugin will affect tool requests.

  • The handleMessageEndpoint() method delegates processing to ShenyuSseServerTransportProvider.handleMessageEndpoint():
if (uri.endsWith(messageEndpoint)) {
setupSessionContext(exchange, chain);
return handleMessageEndpoint(exchange, transportProvider, request);
}
public class McpServerPlugin extends AbstractShenyuPlugin {
private Mono<Void> handleMessageEndpoint(final ServerWebExchange exchange,
final ShenyuSseServerTransportProvider transportProvider,
final ServerRequest request) {
// Handle message requests
return transportProvider.handleMessageEndpoint(request)
.flatMap(result -> {
return exchange.getResponse()
.writeWith(Mono.just(exchange.getResponse().bufferFactory().wrap(responseBody.getBytes())));
});
}
}
  • handleMessageEndpoint() method

Called by McpServerPlugin.handleMessageEndpoint(), hands over the request to the session for processing

The session's handle() method performs different actions depending on the message.
For example, when the method in the message is "tools/call", the tool invocation handler executes the call() method to call the tool.
The related source is omitted here.

public class ShenyuSseServerTransportProvider implements McpServerTransportProvider {
public Mono<MessageHandlingResult> handleMessageEndpoint(final ServerRequest request) {
// Get session
String sessionId = request.queryParam("sessionId").get();
McpServerSession session = sessions.get(sessionId);
return request.bodyToMono(String.class)
.flatMap(body -> {
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body);
return session.handle(message);
});
}
}
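
Since the session source is omitted, here is a rough sketch of the dispatch idea only: a table from JSON-RPC method name to handler. It does not reproduce McpServerSession. `MethodDispatcher` is hypothetical, and parameters and results are plain strings to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical dispatcher: maps a JSON-RPC method name to the handler that serves it.
class MethodDispatcher {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    void register(String method, Function<String, String> handler) {
        handlers.put(method, handler);
    }

    // Route by method name; unknown methods yield an error string in this sketch.
    String dispatch(String method, String params) {
        Function<String, String> handler = handlers.get(method);
        if (handler == null) {
            return "error: method not found: " + method;
        }
        return handler.apply(params);
    }
}
```

Registering a handler for "tools/call" and another for "tools/list" is then enough for one entry point to serve both kinds of message.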

At this point, the Shenyu Mcp Plugin service invocation source code analysis is complete.

Process flow overview

3. Tool Invocation

If the client sends a message to invoke a tool, the session will use the tool invocation handler to execute the tool’s call() method.
From service registration, we know the tool call actually runs the call() method of ShenyuToolCallback.

Therefore, the tool invocation executes the following:

  • call() method mainly does:
  1. Gets session id
  2. Gets requestTemplate, the extra configuration provided by Shenyu
  3. Gets the previously stored Shenyu plugin chain and passes the tool call info to the chain for continued execution
  4. Asynchronously waits for the tool response

After the plugin chain completes, the tool call request is actually sent to the service hosting the tool.

public class ShenyuToolCallback implements ToolCallback {
@NonNull
@Override
public String call(@NonNull final String input, final ToolContext toolContext) {
// Extract sessionId from MCP request
final McpSyncServerExchange mcpExchange = extractMcpExchange(toolContext);
final String sessionId = extractSessionId(mcpExchange);
// Extract requestTemplate info
final String configStr = extractRequestConfig(shenyuTool);

// Get the previously stored plugin chain by sessionId
final ServerWebExchange originExchange = getOriginExchange(sessionId);
final ShenyuPluginChain chain = getPluginChain(originExchange);

// Execute the tool call
return executeToolCall(originExchange, chain, sessionId, configStr, input);
}

private String executeToolCall(final ServerWebExchange originExchange,
final ShenyuPluginChain chain,
final String sessionId,
final String configStr,
final String input) {

final CompletableFuture<String> responseFuture = new CompletableFuture<>();
final ServerWebExchange decoratedExchange = buildDecoratedExchange(
originExchange, responseFuture, sessionId, configStr, input);
// Execute plugin chain, call the actual tool
chain.execute(decoratedExchange)
.subscribe();

// Wait for response
final String result = responseFuture.get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
return result;
}
}
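
The key move in executeToolCall() is bridging an asynchronous pipeline back to a blocking return value. A minimal sketch of that pattern with CompletableFuture follows. `BlockingBridge` and its error strings are hypothetical, and the asynchronous work is modeled as a callback that completes the future rather than a real plugin chain.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical bridge from a callback-style async pipeline to a blocking call.
class BlockingBridge {
    // Start the async work, hand it the future to complete, then wait with a timeout.
    static String callAndWait(Consumer<CompletableFuture<String>> asyncWork, long timeoutSeconds) {
        CompletableFuture<String> response = new CompletableFuture<>();
        asyncWork.accept(response);
        try {
            return response.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            return "error: timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for callers
            return "error: interrupted";
        } catch (ExecutionException e) {
            return "error: " + e.getCause().getMessage();
        }
    }
}
```

The timeout matters here: without it, a tool whose response never arrives would block the calling thread indefinitely.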

This concludes the Shenyu MCP Plugin tool invocation analysis.


4. Summary

This article analyzed the source code from Mcp service registration, through Mcp plugin service invocation, to tool invocation.
The McpServer plugin makes Shenyu a powerful and centralized McpServer.