Skip to main content

3 posts tagged with "plugin"

View All Tags

Extension plugin loading logic

· 8 min read
hql0312 Coder

This article is based on the source code analysis of version 'shenyu-2.6.1'

Content

Shenyu provides a mechanism to customize its own plugins or modify existing plugins, which is implemented internally through the configuration of extPlugin. It needs to meet the following two points:

  1. Implement interface ShenyuPlugin or PluginDataHandler.
  2. After packaging the implemented package, place it in the corresponding path of 'shenyu. extPlugin. path'.

Entry#

The class that truly implements this logic is' ShenyuLoaderService '. Now let's take a look at how this class handles it.

    public ShenyuLoaderService(final ShenyuWebHandler webHandler, final CommonPluginDataSubscriber subscriber, final ShenyuConfig shenyuConfig) {        // Information subscription for plugin information        this.subscriber = subscriber;        // The WebHandler encapsulated by Shenyu contains all the plugin logic        this.webHandler = webHandler;        // configuration information        this.shenyuConfig = shenyuConfig;        // The configuration information of the extension plugin, such as path, whether it is enabled, how many threads are enabled to process, and the frequency of loading checks        ExtPlugin config = shenyuConfig.getExtPlugin();        // If enabled, create a scheduled task to check and load        if (config.getEnabled()) {            // Create a scheduled task with a specified thread name            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(config.getThreads(), ShenyuThreadFactory.create("plugin-ext-loader", true));            // Create a task to be executed at a fixed frequency, with a default time of 30 seconds and execution every 300 seconds            executor.scheduleAtFixedRate(() -> loadExtOrUploadPlugins(null), config.getScheduleDelay(), config.getScheduleTime(), TimeUnit.SECONDS);        }    }    

This class has the following properties:

WebHandler: This class is the entry point for shenyu to process requests, referencing all plugin data. After the extension plugin is loaded, it needs to be updated.

Subscriber: This class is the entry point for the subscription of plugins, referencing the subscription processing classes of all plugins. After the extension configuration is loaded, synchronous updates are also required.

Executor: A scheduled task will be created inside' ShenyuLoaderService 'to periodically scan and load jar packages under the specified path, facilitating the loading of extended plugins and achieving dynamic discovery By default, it will scan every 300 seconds after 30 seconds of startup.

Meanwhile, the decision to enable extension plugin functionality can be made through the configuration of shenyu. extPlugin. enabled.

The above configurations can be adjusted in the configuration file:

shenyu:  extPlugin:    path:   # Storage directory for extension plugins    enabled: true # Is the extension function enabled    threads: 1 # Number of threads loaded by scanning    scheduleTime: 300 # The frequency of task execution    scheduleDelay: 30 # How long after the task starts to execute

Next, let's take a look at the loading logic:

   public void loadExtOrUploadPlugins(final PluginData uploadedJarResource) {        try {            List<ShenyuLoaderResult> plugins = new ArrayList<>();            // Obtain the holding object of ShenyuPluginClassloader            ShenyuPluginClassloaderHolder singleton = ShenyuPluginClassloaderHolder.getSingleton();            if (Objects.isNull(uploadedJarResource)) {                // If the parameter is empty, load all jar packages from the extended directory                // PluginJar: Data containing the ShenyuPlugin interface and PluginDataHandler interface                List<PluginJarParser.PluginJar> uploadPluginJars = ShenyuExtPathPluginJarLoader.loadExtendPlugins(shenyuConfig.getExtPlugin().getPath());                // Traverse all pending plugins                for (PluginJarParser.PluginJar extPath : uploadPluginJars) {                    LOG.info("shenyu extPlugin find new {} to load", extPath.getAbsolutePath());                    // Use the loader of the extension plugin to load the specified plugin, facilitating subsequent loading and unloading                    ShenyuPluginClassLoader extPathClassLoader = singleton.createPluginClassLoader(extPath);                    // Using ShenyuPluginClassLoader for loading                    // The main logic is to determine whether to implement ShenyuPlugin interface, PluginDataHandler interface, or identify annotations such as @ Component \ @ Service. If so, register as SpringBean                    // Construct ShenyuLoaderResult object                    plugins.addAll(extPathClassLoader.loadUploadedJarPlugins());                }            } else {                // Load the specified jar, with the same logic as loading all                PluginJarParser.PluginJar pluginJar = PluginJarParser.parseJar(Base64.getDecoder().decode(uploadedJarResource.getPluginJar()));                LOG.info("shenyu upload plugin jar find new {} to load", pluginJar.getJarKey());                ShenyuPluginClassLoader uploadPluginClassLoader = singleton.createPluginClassLoader(pluginJar);                plugins.addAll(uploadPluginClassLoader.loadUploadedJarPlugins());            }            // Add the extended plugins to the plugin list of ShenyuWebHandler, and subsequent requests will go through the added plugin content            loaderPlugins(plugins);        } catch (Exception e) {            LOG.error("shenyu plugins load has error ", e);        }    }

The logic processed by this method:

  1. Check if the parameter uploadedJarResource has a value. If not, all will be loaded. Otherwise, load the specified resource jar package for processing.

  2. Retrieve the specified jar package from shenyu. extPlugin. path and encapsulate it as a PluginJar object, which contains the following information about the jar package:

    • version: version information

    • groupId: The groupId of the package

    • artifactId: The artifactId of the package

    • absolutePath: Absolute path

    • clazzMap: Bytecode corresponding to class

    • resourceMap: Bytecode of jar package

  3. Create a corresponding ClassLoader using ShenyuPluginClassloaderHolder, with the corresponding class being 'ShenyuPluginClassLoader', and load the corresponding class accordingly.

    • Call ShenyuPluginClassLoader. loadUploadedJarPlugins to load the corresponding class and register it as a Spring Bean, which can be managed using the Spring container
  4. Call the loaderPlugins method to update the extended plugin to'webHandler and subscriber.

Plugin Registration#

For the content in the provided jar package, the loader will only handle classes of the specified interface type, and the implementation logic is in the ShenyuPluginClassLoader.loadUploadedJarPlugins() method.

public List<ShenyuLoaderResult> loadUploadedJarPlugins() {        List<ShenyuLoaderResult> results = new ArrayList<>();        // All class mapping relationships        Set<String> names = pluginJar.getClazzMap().keySet();        // Traverse all classes        names.forEach(className -> {            Object instance;            try {                // Try creating objects and, if possible, add them to the Spring container                instance = getOrCreateSpringBean(className);                if (Objects.nonNull(instance)) {                    // Building the ShenyuLoaderResult object                    results.add(buildResult(instance));                    LOG.info("The class successfully loaded into a upload-Jar-plugin {} is registered as a spring bean", className);                }            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {                LOG.warn("Registering upload-Jar-plugins succeeds spring bean fails:{}", className, e);            }        });        return results;    }

This method is responsible for building all eligible objects and encapsulating them into a ShenyuLoaderResult object. This object is encapsulated for the created object and will be processed in the method buildResult().

    private ShenyuLoaderResult buildResult(final Object instance) {        ShenyuLoaderResult result = new ShenyuLoaderResult();        // Does the created object implement ShenyuPlugin        if (instance instanceof ShenyuPlugin) {            result.setShenyuPlugin((ShenyuPlugin) instance);            // Does the created object implement PluginDataHandler        } else if (instance instanceof PluginDataHandler) {            result.setPluginDataHandler((PluginDataHandler) instance);        }        return result;    }

Simultaneously enter the method getOrCreatSpringBean() for further analysis:

    private <T> T getOrCreateSpringBean(final String className) throws ClassNotFoundException, IllegalAccessException, InstantiationException {        // Confirm if it has been registered. If so, do not process it and return directly        if (SpringBeanUtils.getInstance().existBean(className)) {            return SpringBeanUtils.getInstance().getBeanByClassName(className);        }        lock.lock();        try {            // Double check,            T inst = SpringBeanUtils.getInstance().getBeanByClassName(className);            if (Objects.isNull(inst)) {                // Using ShenyuPluginClassLoader to load classes                Class<?> clazz = Class.forName(className, false, this);                //Exclude ShenyuPlugin subclass and PluginDataHandler subclass                // without adding @Component @Service annotation                // Confirm if it is a subclass of ShenyuPlugin or PluginDataHandler                boolean next = ShenyuPlugin.class.isAssignableFrom(clazz)                        || PluginDataHandler.class.isAssignableFrom(clazz);                if (!next) {                    // If not, confirm if @ Component and @ Service annotations are identified                    Annotation[] annotations = clazz.getAnnotations();                    next = Arrays.stream(annotations).anyMatch(e -> e.annotationType().equals(Component.class)                            || e.annotationType().equals(Service.class));                }                if (next) {                    // If the above content is met, register the bean                    GenericBeanDefinition beanDefinition = new GenericBeanDefinition();                    beanDefinition.setBeanClassName(className);                    beanDefinition.setAutowireCandidate(true);                    beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);                    // Registering beans                    String beanName = SpringBeanUtils.getInstance().registerBean(beanDefinition, this);                    // create object                    inst = SpringBeanUtils.getInstance().getBeanByClassName(beanName);                }            }            return inst;        } finally {            lock.unlock();        }    }

The logic is roughly as follows:

  1. Check if the interface ShenyuPlugin or PluginDataHandler has been implemented. If not, check if @Component or @Service has been identified`.
  2. If the condition of 1 is met, register the object in the Spring container and return the created object.

Sync Data#

After the plugin registration is successful, the plugin is only instantiated, but it will not take effect yet because it has not been added to Shenyu's plugin chain. The synchronization logic is implemented by the loaderPlugins() method.

    private void loaderPlugins(final List<ShenyuLoaderResult> results) {        if (CollectionUtils.isEmpty(results)) {            return;        }        // Get all objects that implement the interface ShenyuPlugin        List<ShenyuPlugin> shenyuExtendPlugins = results.stream().map(ShenyuLoaderResult::getShenyuPlugin).filter(Objects::nonNull).collect(Collectors.toList());        // Synchronize updating plugins in webHandler        webHandler.putExtPlugins(shenyuExtendPlugins);        // Get all objects that implement the interface PluginDataHandler        List<PluginDataHandler> handlers = results.stream().map(ShenyuLoaderResult::getPluginDataHandler).filter(Objects::nonNull).collect(Collectors.toList());        // Synchronize updating handlers in subscriber        subscriber.putExtendPluginDataHandler(handlers);
    }

The logic of this method processes two data points:

  1. Synchronize the data that implements the ShenyuPlugin interface to the plugins list of webHandler.
    public void putExtPlugins(final List<ShenyuPlugin> extPlugins) {        if (CollectionUtils.isEmpty(extPlugins)) {            return;        }        // Filter out newly added plugins        final List<ShenyuPlugin> shenyuAddPlugins = extPlugins.stream()                .filter(e -> plugins.stream().noneMatch(plugin -> plugin.named().equals(e.named())))                .collect(Collectors.toList());        // Filter out updated plugins and determine if they have the same name as the old one, then it is an update        final List<ShenyuPlugin> shenyuUpdatePlugins = extPlugins.stream()                .filter(e -> plugins.stream().anyMatch(plugin -> plugin.named().equals(e.named())))                .collect(Collectors.toList());        // If there is no data, skip        if (CollectionUtils.isEmpty(shenyuAddPlugins) && CollectionUtils.isEmpty(shenyuUpdatePlugins)) {            return;        }        // Copy old data        // copy new list        List<ShenyuPlugin> newPluginList = new ArrayList<>(plugins);        // Add new plugin data        // Add extend plugin from pluginData or shenyu ext-lib        this.sourcePlugins.addAll(shenyuAddPlugins);        // Add new data        if (CollectionUtils.isNotEmpty(shenyuAddPlugins)) {            shenyuAddPlugins.forEach(plugin -> LOG.info("shenyu auto add extends plugins:{}", plugin.named()));            newPluginList.addAll(shenyuAddPlugins);        }        // Modify updated data        if (CollectionUtils.isNotEmpty(shenyuUpdatePlugins)) {            shenyuUpdatePlugins.forEach(plugin -> LOG.info("shenyu auto update extends plugins:{}", plugin.named()));            for (ShenyuPlugin updatePlugin : shenyuUpdatePlugins) {                for (int i = 0; i < newPluginList.size(); i++) {                    if (newPluginList.get(i).named().equals(updatePlugin.named())) {                        newPluginList.set(i, updatePlugin);                    }                }                for (int i = 0; i < this.sourcePlugins.size(); i++) {                    if (this.sourcePlugins.get(i).named().equals(updatePlugin.named())) {                        this.sourcePlugins.set(i, updatePlugin);                    }                }            }        }        // REORDER        plugins = sortPlugins(newPluginList);    }
  1. Synchronize the data that implements the PluginDataHandler interface to the handlers list of the subscriber.
    public void putExtendPluginDataHandler(final List<PluginDataHandler> handlers) {        if (CollectionUtils.isEmpty(handlers)) {            return;        }        // Traverse all data        for (PluginDataHandler handler : handlers) {            String pluginNamed = handler.pluginNamed();            // Update existing PluginDataHandler list            MapUtils.computeIfAbsent(handlerMap, pluginNamed, name -> {                LOG.info("shenyu auto add extends plugin data handler name is :{}", pluginNamed);                return handler;            });        }    }

At this point, the analysis of the loading process of the extension plugin is completed.

Code Analysis For Divide Plugin

· 21 min read
Apache ShenYu Committer

The ShenYu gateway uses the divide plugin to handle http requests. You can see the official documentation Quick start with Http to learn how to use this plugin.

This article is based on shenyu-2.4.3 version for source code analysis, please refer to Http Proxy for the introduction of the official website.

1. Register Service#

1.1 Declaration of registration interface#

Use the annotation @ShenyuSpringMvcClient to register the service to the gateway. The simple demo is as follows.

@RestController@RequestMapping("/order")@ShenyuSpringMvcClient(path = "/order")  // APIpublic class OrderController {    @GetMapping("/findById")    @ShenyuSpringMvcClient(path = "/findById", desc = "Find by id") // method    public OrderDTO findById(@RequestParam("id") final String id) {        return build(id, "hello world findById");    }}

define annotation:


@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE, ElementType.METHOD})public @interface ShenyuSpringMvcClient {        //path    String path() default "";        //rule name    String ruleName() default "";       //desc info    String desc() default "";
    //is enabled    boolean enabled() default true;        //register MetaData    boolean registerMetaData() default false;}

1.2 Scan annotation#

Annotation scanning is done through SpringMvcClientBeanPostProcessor, which implements the BeanPostProcessor interface and is a post-processor provided by Spring.

During constructor instantiation.

  • Read the property configuration
  • Add annotations, read path information
  • Start the registry and register with shenyu-admin
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {    //...    /**     * Constructor instantiation     */    public SpringMvcClientBeanPostProcessor(final PropertiesConfig clientConfig,                                            final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {        // 1. read Properties        Properties props = clientConfig.getProps();        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);        this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH, "");        if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {            String errorMsg = "http register param must config the appName or contextPath";            LOG.error(errorMsg);            throw new ShenyuClientIllegalArgumentException(errorMsg);        }        this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));        // 2. add annotation        mappingAnnotation.add(ShenyuSpringMvcClient.class);        mappingAnnotation.add(PostMapping.class);        mappingAnnotation.add(GetMapping.class);        mappingAnnotation.add(DeleteMapping.class);        mappingAnnotation.add(PutMapping.class);        mappingAnnotation.add(RequestMapping.class);        // 3. start register cneter        publisher.start(shenyuClientRegisterRepository);    }        @Override    public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {       // override post process                return bean;    }    
  • SpringMvcClientBeanPostProcessor#postProcessAfterInitialization()

Rewrite post-processor logic: read annotation information, construct metadata objects and URI objects, and register them with shenyu-admin.

    @Override    public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {        // 1. If the all service is registered or is not a Controller class, it is not handled        if (Boolean.TRUE.equals(isFull) || !hasAnnotation(bean.getClass(), Controller.class)) {            return bean;        }        // 2. Read the annotations on the class ShenyuSpringMvcClient        final ShenyuSpringMvcClient beanShenyuClient = AnnotationUtils.findAnnotation(bean.getClass(), ShenyuSpringMvcClient.class);        // 2.1 build  superPath        final String superPath = buildApiSuperPath(bean.getClass());        // 2.2 whether to register the entire class method        if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {            // build the metadata object and register it with shenyu-admin            publisher.publishEvent(buildMetaDataDTO(beanShenyuClient, pathJoin(contextPath, superPath)));            return bean;        }        // 3. read all methods        final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());        for (Method method : methods) {            // 3.1 read the annotations on the method ShenyuSpringMvcClient            ShenyuSpringMvcClient methodShenyuClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);            // If there is no annotation on the method, use the annotation on the class            methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;            if (Objects.nonNull(methodShenyuClient)) {               // 3.2 Build path information, build metadata objects, register with shenyu-admin                publisher.publishEvent(buildMetaDataDTO(methodShenyuClient, buildApiPath(method, superPath)));            }        }                return bean;    }
    1. If you are registering the whole service or not Controller class, do not handle it
    1. read the annotation on the class ShenyuSpringMvcClient, if the whole class is registered, build the metadata object here and register it with shenyu-admin.
    1. Annotation on the handler method ShenyuSpringMvcClient, build path information for the specific method, build the metadata object and then register it with shenyu-admin

There are two methods here that take path and need special instructions.

  • buildApiSuperPath()

Construct SuperPath: first take the path property from the annotation ShenyuSpringMvcClient on the class, if not, take the path information from the RequestMapping annotation on the current class.

    private String buildApiSuperPath(@NonNull final Class<?> method) {        // First take the path property from the annotation ShenyuSpringMvcClient on the class        ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);        if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {            return shenyuSpringMvcClient.path();        }        // Take the path information from the RequestMapping annotation of the current class        RequestMapping requestMapping = AnnotationUtils.findAnnotation(method, RequestMapping.class);        if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {            return requestMapping.path()[0];        }        return "";    }
  • buildApiPath()

Build path: first read the annotation ShenyuSpringMvcClient on the method and build it if it exists; otherwise get the path information from other annotations on the method; complete path = contextPath(context information) + superPath(class information) + methodPath(method information).

    private String buildApiPath(@NonNull final Method method, @NonNull final String superPath) {        // 1. Read the annotation ShenyuSpringMvcClient on the method        ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);        // 1.1 If path exists, build        if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {            //1.2  path = contextPath+superPath+methodPath            return pathJoin(contextPath, superPath, shenyuSpringMvcClient.path());        }        // 2. Get path information from other annotations on the method        final String path = getPathByMethod(method);        if (StringUtils.isNotBlank(path)) {             // 2.1 path = contextPath+superPath+methodPath            return pathJoin(contextPath, superPath, path);        }        return pathJoin(contextPath, superPath);    }
  • getPathByMethod()

Get path information from other annotations on the method, other annotations include.

  • ShenyuSpringMvcClient
  • PostMapping
  • GetMapping
  • DeleteMapping
  • PutMapping
  • RequestMapping

    private String getPathByMethod(@NonNull final Method method) {        // Iterate through interface annotations to get path information        for (Class<? extends Annotation> mapping : mappingAnnotation) {            final String pathByAnnotation = getPathByAnnotation(AnnotationUtils.findAnnotation(method, mapping), pathAttributeNames);            if (StringUtils.isNotBlank(pathByAnnotation)) {                return pathByAnnotation;            }        }        return null;    }

After the scanning annotation is finished, construct the metadata object and send the object to shenyu-admin to complete the registration.

  • Metadata

Includes the rule information of the currently registered method: contextPath, appName, registration path, description information, registration type, whether it is enabled, rule name and whether to register metadata.

 private MetaDataRegisterDTO buildMetaDataDTO(@NonNull final ShenyuSpringMvcClient shenyuSpringMvcClient, final String path) {        return MetaDataRegisterDTO.builder()                .contextPath(contextPath) // contextPath                .appName(appName) // appName                .path(path) // Registered path, used when gateway rules match                .pathDesc(shenyuSpringMvcClient.desc()) // desc info                .rpcType(RpcTypeEnum.HTTP.getName()) // divide plugin, http type when default                .enabled(shenyuSpringMvcClient.enabled()) // is enabled?                .ruleName(StringUtils.defaultIfBlank(shenyuSpringMvcClient.ruleName(), path))//rule name                .registerMetaData(shenyuSpringMvcClient.registerMetaData()) // whether to register metadata information                .build();    }

The specific registration logic is implemented by the registration center, which has been analyzed in the previous articles and will not be analyzed in depth here.

1.3 Register URI Data#

ContextRegisterListener is responsible for registering the client's URI information to shenyu-admin, it implements the ApplicationListener interface, when the context refresh event ContextRefreshedEvent occurs, the onApplicationEvent() method is executed to implement the registration logic.


public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent>, BeanFactoryAware {    //......        /**     * Constructor instantiation     */    public ContextRegisterListener(final PropertiesConfig clientConfig) {        // read Properties        final Properties props = clientConfig.getProps();        this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));        this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);        if (Boolean.TRUE.equals(isFull)) {            if (StringUtils.isBlank(contextPath)) {                final String errorMsg = "http register param must config the contextPath";                LOG.error(errorMsg);                throw new ShenyuClientIllegalArgumentException(errorMsg);            }        }        this.port = Integer.parseInt(Optional.ofNullable(props.getProperty(ShenyuClientConstants.PORT)).orElseGet(() -> "-1"));        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);        this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);        this.host = props.getProperty(ShenyuClientConstants.HOST);    }
    @Override    public void setBeanFactory(final BeanFactory beanFactory) throws BeansException {        this.beanFactory = beanFactory;    }
    // Execute application events    @Override    public void onApplicationEvent(@NonNull final ContextRefreshedEvent contextRefreshedEvent) {          // The method is guaranteed to be executed once        if (!registered.compareAndSet(false, true)) {            return;        }        // 1. If you are registering for the entire service        if (Boolean.TRUE.equals(isFull)) {            // Build metadata and register            publisher.publishEvent(buildMetaDataDTO());        }        try {            // get port            final int mergedPort = port <= 0 ? PortUtils.findPort(beanFactory) : port;            // 2. Constructing URI data and registering            publisher.publishEvent(buildURIRegisterDTO(mergedPort));        } catch (ShenyuException e) {            throw new ShenyuException(e.getMessage() + "please config ${shenyu.client.http.props.port} in xml/yml !");        }    }
    // build URI data    private URIRegisterDTO buildURIRegisterDTO(final int port) {        return URIRegisterDTO.builder()            .contextPath(this.contextPath) // contextPath            .appName(appName) // appName            .protocol(protocol) // protocol            .host(IpUtils.isCompleteHost(this.host) ? this.host : IpUtils.getHost(this.host)) //host            .port(port) // port            .rpcType(RpcTypeEnum.HTTP.getName()) // divide plugin, default registration http type            .build();    }
    // build MetaData    private MetaDataRegisterDTO buildMetaDataDTO() {        return MetaDataRegisterDTO.builder()            .contextPath(contextPath)            .appName(appName)            .path(contextPath)            .rpcType(RpcTypeEnum.HTTP.getName())            .enabled(true)            .ruleName(contextPath)            .build();    }}

1.4 Handle registration information#

The metadata and URI data registered by the client through the registry are processed in shenyu-admin, which is responsible for storing to the database and synchronizing to the shenyu gateway. The client registration processing logic of Divide plugin is in ShenyuClientRegisterDivideServiceImpl. The inheritance relationship is as follows.

  • ShenyuClientRegisterService: client registration service, top-level interface.
  • FallbackShenyuClientRegisterService: registration failure, provides retry operation.
  • AbstractShenyuClientRegisterServiceImpl: abstract class, implements part of the public registration logic;
  • AbstractContextPathRegisterService: abstract class, responsible for registering ContextPath.
  • ShenyuClientRegisterDivideServiceImpl: implementation of the Divide plug-in registration.
1.4.1 Register Service#
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()

The metadata MetaDataRegisterDTO object registered by the client through the registry is picked up and dropped in the register() method of shenyu-admin.

   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. register selector        String selectorHandler = selectorHandler(dto);        String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);        //2. register rule        String ruleHandler = ruleHandler();        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        ruleService.registerDefault(ruleDTO);        //3. register metadat        registerMetadata(dto);        //4. register ContextPath        String contextPath = dto.getContextPath();        if (StringUtils.isNotEmpty(contextPath)) {            registerContextPath(dto);        }        return ShenyuResultMessage.SUCCESS;    }
1.4.1.1 Register Selector#
  • org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()

Build contextPath, find if the selector information exists, if it does, return id; if it doesn't, create the default selector information.

    @Override    public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {        // build contextPath        String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());        // Find if selector information exists by name        SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);        if (Objects.isNull(selectorDO)) {            // Create a default selector message if it does not exist            return registerSelector(contextPath, pluginName, selectorHandler);        }        return selectorDO.getId();    }
  • Default Selector Information

Construct the default selector information and its conditional properties here.

   //register selector   private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {        // build selector         SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());        selectorDTO.setHandle(selectorHandler);        //register default Selector        return registerDefault(selectorDTO);    }     //build    private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {        //build default        SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);        selectorDTO.setPluginId(pluginId);         //build the conditional properties of the default selector        selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));        return selectorDTO;    }
  • Build Default Selector
private SelectorDTO buildDefaultSelectorDTO(final String name) {    return SelectorDTO.builder()            .name(name) // name            .type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // default CUSTOM_FLOW            .matchMode(MatchModeEnum.AND.getCode()) //default  AND            .enabled(Boolean.TRUE)  //default TRUE            .loged(Boolean.TRUE)  //default TRUE            .continued(Boolean.TRUE) //default TRUE            .sort(1) //default 1            .build();}

  • Build default selector conditional properties
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {    SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();    selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // default URI    selectorConditionDTO.setParamName("/");    selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // default match    selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // default /contextPath/**    return Collections.singletonList(selectorConditionDTO);}
  • Register default selector
@Overridepublic String registerDefault(final SelectorDTO selectorDTO) {    //selector info    SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);    //selector condition  info    List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();    if (StringUtils.isEmpty(selectorDTO.getId())) {        // insert selector information into the database        selectorMapper.insertSelective(selectorDO);          // insert selector condition information into the database        selectorConditionDTOs.forEach(selectorConditionDTO -> {            selectorConditionDTO.setSelectorId(selectorDO.getId());                        selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));        });    }    // Publish synchronization events to synchronize selection information and its conditional attributes to the gateway    publishEvent(selectorDO, selectorConditionDTOs);    return selectorDO.getId();}
1.4.1.2 Register Rule#

In the second step of registering the service, start building the default rules and then register the rules.

@Override    public String register(final MetaDataRegisterDTO dto) {        //1. register selector        //......                //2. register rule        // default rule handle        String ruleHandler = ruleHandler();        // build default rule        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        // register rule        ruleService.registerDefault(ruleDTO);                //3. register Metadata        //......                //4. register ContextPath        //......                return ShenyuResultMessage.SUCCESS;    }
  • default rule handle
    @Override    protected String ruleHandler() {        // default rule handle        return new DivideRuleHandle().toJson();    }

Divide plugin default rule handle.


public class DivideRuleHandle implements RuleHandle {
    /**     * load balance: default RANDOM     */    private String loadBalance = LoadBalanceEnum.RANDOM.getName();
    /**     * retry strategy: default CURRENT     */    private String retryStrategy = RetryEnum.CURRENT.getName();
    /**     * retry: default 3     */    private int retry = 3;
    /**     *  retry: default 3000     */    private long timeout = Constants.TIME_OUT;
    /**     *  retry: default  10240 byte     */    private long headerMaxSize = Constants.HEADER_MAX_SIZE;
    /**     *  retry: default 102400 byte     */    private long requestMaxSize = Constants.REQUEST_MAX_SIZE;}
  • build default rule info
  // build default rule info    private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {        return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());    }   //  build default rule info    private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {        RuleDTO ruleDTO = RuleDTO.builder()                .selectorId(selectorId) //selector Id                .name(ruleName) //rule Name                .matchMode(MatchModeEnum.AND.getCode()) // default and                .enabled(Boolean.TRUE) // default TRUE                .loged(Boolean.TRUE) //default TRUE                .sort(1) //default 1                .handle(ruleHandler)                .build();        RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()                .paramType(ParamTypeEnum.URI.getName()) // default URI                .paramName("/")                .paramValue(path) // path                .build();        if (path.indexOf("*") > 1) {            ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); //if the path conatins *, default match        } else {            ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); // default =         }        ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));        return ruleDTO;    }
  • org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()

Registration rules: insert records to the database and publish events to the gateway for data synchronization.


    @Override    public String registerDefault(final RuleDTO ruleDTO) {        RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());        if (Objects.nonNull(exist)) {            return "";        }
        RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);        List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();        if (StringUtils.isEmpty(ruleDTO.getId())) {            // insert rule into database             ruleMapper.insertSelective(ruleDO);            //insert rule condition into database             ruleConditions.forEach(ruleConditionDTO -> {                ruleConditionDTO.setRuleId(ruleDO.getId());                           ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));            });        }        // Publish events to the gateway for data synchronization        publishEvent(ruleDO, ruleConditions);        return ruleDO.getId();    }
1.4.1.3 Register Metadata#
   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. register selector        //......                //2. register rule        //......                //3. register metadata        registerMetadata(dto);                //4. register ContextPath        //......                return ShenyuResultMessage.SUCCESS;    }
  • org.apache.shenyu.admin.service.register.ShenyuClientRegisterDivideServiceImpl#registerMetadata()

Insert or update metadata and then publish sync events to the gateway.


    @Override    protected void registerMetadata(final MetaDataRegisterDTO dto) {        if (dto.isRegisterMetaData()) {             MetaDataService metaDataService = getMetaDataService();            MetaDataDO exist = metaDataService.findByPath(dto.getPath());            // save or update MetaData            metaDataService.saveOrUpdateMetaData(exist, dto);        }    }
    @Override    public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {        DataEventTypeEnum eventType;        //  DTO->DO        MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);        // insert        if (Objects.isNull(exist)) {            Timestamp currentTime = new Timestamp(System.currentTimeMillis());            metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());            metaDataDO.setDateCreated(currentTime);            metaDataDO.setDateUpdated(currentTime);            metaDataMapper.insert(metaDataDO);            eventType = DataEventTypeEnum.CREATE;        } else {            // update            metaDataDO.setId(exist.getId());            metaDataMapper.update(metaDataDO);            eventType = DataEventTypeEnum.UPDATE;        }        // publish event to  gateway        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,                Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));    }
1.4.1.4 Register ContextPath#
   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. register selector        //......                //2. register rule        //......                //3. register metadata        //......                //4. register ContextPath        String contextPath = dto.getContextPath();        if (StringUtils.isNotEmpty(contextPath)) {            registerContextPath(dto);        }        return ShenyuResultMessage.SUCCESS;    }
  • org.apache.shenyu.admin.service.register.AbstractContextPathRegisterService#registerContextPath()
    @Override    public void registerContextPath(final MetaDataRegisterDTO dto) {        // set contextPath for selector        String contextPathSelectorId = getSelectorService().registerDefault(dto, PluginEnum.CONTEXT_PATH.getName(), "");        ContextMappingRuleHandle handle = new ContextMappingRuleHandle();        handle.setContextPath(PathUtils.decoratorContextPath(dto.getContextPath()));        // set contextPath for rule        getRuleService().registerDefault(buildContextPathDefaultRuleDTO(contextPathSelectorId, dto, handle.toJson()));    }
1.4.2 Register URI#
  • org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()

The server side receives the URI information registered by the client and processes it.

    @Override    public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {        String result;        String key = key(selectorName);        try {            this.removeFallBack(key);            // register URI            result = this.doRegisterURI(selectorName, uriList);            logger.info("Register success: {},{}", selectorName, uriList);        } catch (Exception ex) {            logger.warn("Register exception: cause:{}", ex.getMessage());            result = "";            // Retry after registration failure            this.addFallback(key, new FallbackHolder(selectorName, uriList));        }        return result;    }
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()

Get a valid URI from the URI registered by the client, update the corresponding selector handle property, and send a selector update event to the gateway.

@Override    public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {        //check        if (CollectionUtils.isEmpty(uriList)) {            return "";        }        //get selector         SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));        if (Objects.isNull(selectorDO)) {            throw new ShenyuException("doRegister Failed to execute,wait to retry.");        }        // gte valid URI        List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());        // build handle        String handler = buildHandle(validUriList, selectorDO);        if (handler != null) {            selectorDO.setHandle(handler);            SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));            selectorData.setHandle(handler);            // Update the handle property of the selector to the database            selectorService.updateSelective(selectorDO);            // Send selector update events to the gateway            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));        }        return ShenyuResultMessage.SUCCESS;    }

The source code analysis on service registration is completed as well as the analysis flow chart is as follows.

The next step is to analyze how the divide plugin initiates a call to the http service based on this information.

2. Call Http Service#

The divide plugin is the core processing plugin used by the gateway to handle http protocol requests.

Take the case provided on the official website Quick start with Http as an example, a direct connection request is as follows.

GET http://localhost:8189/order/findById?id=100Accept: application/json

After proxying through the ShenYu gateway, the request is as follows.

GET http://localhost:9195/http/order/findById?id=100Accept: application/json

The services proxied by the ShenYu gateway are still able to request the previous services, where the divide plugin comes into play. The class inheritance relationship is as follows.

  • ShenyuPlugin: top-level interface, defining interface methods.
  • AbstractShenyuPlugin: abstract class that implements the common logic of the pluin.
  • DividePlugin: Divide pluin.

2.1 Accept Request#

After passing the ShenYu gateway proxy, the request entry is ShenyuWebHandler, which implements the org.springframework.web.server.WebHandler interface.

public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {    //......        /**     * hanlde web reuest     */    @Override    public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {       // execute plugin chain        Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);        if (scheduled) {            return execute.subscribeOn(scheduler);        }        return execute;    }        private static class DefaultShenyuPluginChain implements ShenyuPluginChain {
        private int index;
        private final List<ShenyuPlugin> plugins;
        /**         * Instantiating the default plugin chain         */        DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {            this.plugins = plugins;        }
        /**         * Execute each plugin         */        @Override        public Mono<Void> execute(final ServerWebExchange exchange) {            return Mono.defer(() -> {                if (this.index < plugins.size()) {                    // get current plugin                     ShenyuPlugin plugin = plugins.get(this.index++);                    // is skip ?                    boolean skip = plugin.skip(exchange);                    if (skip) {                        // If skipped, execute the next                        return this.execute(exchange);                    }                    // execute current plugin                     return plugin.execute(exchange, this);                }                return Mono.empty();            });        }    }}

2.2 Matching rule#

  • org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()

Execute the matching logic for selectors and rules in the execute() method.

  • Matching selectors.
  • Matching rules.
  • Execute the plugin.
@Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        String pluginName = named();        PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);        if (pluginData != null && pluginData.getEnabled()) {            // selector             final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);            if (CollectionUtils.isEmpty(selectors)) {                return handleSelectorIfNull(pluginName, exchange, chain);            }            // match selector            SelectorData selectorData = matchSelector(exchange, selectors);            if (Objects.isNull(selectorData)) {                return handleSelectorIfNull(pluginName, exchange, chain);            }            selectorLog(selectorData, pluginName);            // rule             List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());            if (CollectionUtils.isEmpty(rules)) {                return handleRuleIfNull(pluginName, exchange, chain);            }            // match rule             RuleData rule;            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {                //get last                rule = rules.get(rules.size() - 1);            } else {                rule = matchRule(exchange, rules);            }            if (Objects.isNull(rule)) {                return handleRuleIfNull(pluginName, exchange, chain);            }            ruleLog(rule, pluginName);            // execute             return doExecute(exchange, chain, selectorData, rule);        }        return chain.execute(exchange);    }

2.3 Execute Divide Plugin#

  • org.apache.shenyu.plugin.divide.DividePlugin#doExecute()

Execute the specific logic of the divide plugin in the doExecute() method.

  • Checks the header size.
  • Checking the request size.
  • Obtaining the list of services.
  • implementing load balancing.
  • Set request url, timeout time, retry policy.
@Override    protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {        // shenyu Context        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        // Get the handle property of the rule        DivideRuleHandle ruleHandle = DividePluginDataHandler.CACHED_HANDLE.get().obtainHandle(CacheKeyUtils.INST.getKey(rule));        long headerSize = 0;        // check header size        for (List<String> multiHeader : exchange.getRequest().getHeaders().values()) {            for (String value : multiHeader) {                headerSize += value.getBytes(StandardCharsets.UTF_8).length;            }        }        if (headerSize > ruleHandle.getHeaderMaxSize()) {            LOG.error("request header is too large");            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_HEADER_TOO_LARGE, null);            return WebFluxResultUtils.result(exchange, error);        }                // check request size        if (exchange.getRequest().getHeaders().getContentLength() > ruleHandle.getRequestMaxSize()) {            LOG.error("request entity is too large");            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_ENTITY_TOO_LARGE, null);            return WebFluxResultUtils.result(exchange, error);        }        // upstream list        List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());        if (CollectionUtils.isEmpty(upstreamList)) {            LOG.error("divide upstream configuration error: {}", rule);            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null);            return WebFluxResultUtils.result(exchange, error);        }        // request ip        String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();        // load balance        Upstream upstream = LoadBalancerFactory.selector(upstreamList, ruleHandle.getLoadBalance(), ip);        if (Objects.isNull(upstream)) {            LOG.error("divide has no upstream");            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null);            return WebFluxResultUtils.result(exchange, error);        }        // set url        String domain = upstream.buildDomain();        exchange.getAttributes().put(Constants.HTTP_DOMAIN, domain);        // set timeout        exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());        exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());        // set retry         exchange.getAttributes().put(Constants.RETRY_STRATEGY, ruleHandle.getRetryStrategy());        exchange.getAttributes().put(Constants.LOAD_BALANCE, ruleHandle.getLoadBalance());        exchange.getAttributes().put(Constants.DIVIDE_SELECTOR_ID, selector.getId());        return chain.execute(exchange);    }

2.4 Do Request#

By default, the WebClientPlugin initiates a call request to the http service with the following class inheritance relationship.

  • ShenyuPlugin: top-level plug-in, defining plug-in methods.
  • AbstractHttpClientPlugin: abstract class that implements the public logic of request invocation.
  • WebClientPlugin: initiating requests through WebClient.
  • NettyHttpClientPlugin: initiating requests through Netty.

Initiate the request call.

  • org.apache.shenyu.plugin.httpclient.AbstractHttpClientPlugin#execute()

Initiate the request call in the execute() method.

  • Get the specified timeout, number of retries
  • Initiate the request
  • Retry after failure according to the specified retry policy

public abstract class AbstractHttpClientPlugin<R> implements ShenyuPlugin {
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractHttpClientPlugin.class);
    @Override    public final Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        // shenyu Context        final ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        // uri        final URI uri = exchange.getAttribute(Constants.HTTP_URI);        if (Objects.isNull(uri)) {            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_URL, null);            return WebFluxResultUtils.result(exchange, error);        }        // get time out        final long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);        final Duration duration = Duration.ofMillis(timeout);        // get retry times        final int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);        // get retry strategy        final String retryStrategy = (String) Optional.ofNullable(exchange.getAttribute(Constants.RETRY_STRATEGY)).orElseGet(RetryEnum.CURRENT::getName);        LOG.info("The request urlPath is {}, retryTimes is {}, retryStrategy is {}", uri.toASCIIString(), retryTimes, retryStrategy);        // build header        final HttpHeaders httpHeaders = buildHttpHeaders(exchange);        // do request        final Mono<R> response = doRequest(exchange, exchange.getRequest().getMethodValue(), uri, httpHeaders, exchange.getRequest().getBody())                .timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))                .doOnError(e -> LOG.error(e.getMessage(), e));                // Retry Policy CURRENT, retries the current service.        if (RetryEnum.CURRENT.getName().equals(retryStrategy)) {            //old version of DividePlugin and SpringCloudPlugin will run on this            return response.retryWhen(Retry.anyOf(TimeoutException.class, ConnectTimeoutException.class, ReadTimeoutException.class, IllegalStateException.class)                    .retryMax(retryTimes)                    .backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))                    .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))                    .flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));        }                // Retry for other services        // Exclude services that have already been called        final Set<URI> exclude = Sets.newHashSet(uri);        // resend        return resend(response, exchange, duration, httpHeaders, exclude, retryTimes)                .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))                .flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));    }
    private Mono<R> resend(final Mono<R> clientResponse,                           final ServerWebExchange exchange,                           final Duration duration,                           final HttpHeaders httpHeaders,                           final Set<URI> exclude,                           final int retryTimes) {        Mono<R> result = clientResponse;        // Retry according to the specified number of retries        for (int i = 0; i < retryTimes; i++) {            result = resend(result, exchange, duration, httpHeaders, exclude);        }        return result;    }
    private Mono<R> resend(final Mono<R> response,                           final ServerWebExchange exchange,                           final Duration duration,                           final HttpHeaders httpHeaders,                           final Set<URI> exclude) {        return response.onErrorResume(th -> {            final String selectorId = exchange.getAttribute(Constants.DIVIDE_SELECTOR_ID);            final String loadBalance = exchange.getAttribute(Constants.LOAD_BALANCE);            //Check available services            final List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId)                    .stream().filter(data -> {                        final String trimUri = data.getUrl().trim();                        for (URI needToExclude : exclude) {                            // exclude already called                            if ((needToExclude.getHost() + ":" + needToExclude.getPort()).equals(trimUri)) {                                return false;                            }                        }                        return true;                    }).collect(Collectors.toList());            if (CollectionUtils.isEmpty(upstreamList)) {                // no need to retry anymore                return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));            }            // requets ip            final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();            // Load Balance            final Upstream upstream = LoadBalancerFactory.selector(upstreamList, loadBalance, ip);            if (Objects.isNull(upstream)) {                // no need to retry anymore                return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));            }            final URI newUri = RequestUrlUtils.buildRequestUri(exchange, upstream.buildDomain());            // Exclude uri that has already been called            exclude.add(newUri);             // Make another call            return doRequest(exchange, exchange.getRequest().getMethodValue(), newUri, httpHeaders, exchange.getRequest().getBody())                    .timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))                    .doOnError(e -> LOG.error(e.getMessage(), e));        });    }
    //......}
  • org.apache.shenyu.plugin.httpclient.WebClientPlugin#doRequest()

Initiate a real request call via webClient in the doRequest() method.


@Override    protected Mono<ClientResponse> doRequest(final ServerWebExchange exchange, final String httpMethod, final URI uri,                                             final HttpHeaders httpHeaders, final Flux<DataBuffer> body) {        return webClient.method(HttpMethod.valueOf(httpMethod)).uri(uri) // uri                .headers(headers -> headers.addAll(httpHeaders)) // header                .body(BodyInserters.fromDataBuffers(body))                .exchange() // request                .doOnSuccess(res -> {                    if (res.statusCode().is2xxSuccessful()) { // success                        exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());                    } else { // error                        exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());                    }                    exchange.getResponse().setStatusCode(res.statusCode());                    exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);                });    }

2.5 Response Result#

  • org.apache.shenyu.plugin.response.ResponsePlugin#execute()

The response results are handled by the ResponsePlugin plugin.

    @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        // Processing results according to rpc type        return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);    }

The processing type is determined by MessageWriter and the class inheritance relationship is as follows.

  • MessageWriter: interface, defining message processing methods.
  • NettyClientMessageWriter: processing of Netty call results.
  • RPCMessageWriter: processing the results of RPC calls.
  • WebClientMessageWriter: processing WebClient call results.

The default is to initiate http requests via WebCient.

  • org.apache.shenyu.plugin.response.strategy.WebClientMessageWriter#writeWith()

Process the response results in the writeWith() method.


    @Override    public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        return chain.execute(exchange).then(Mono.defer(() -> {            // get response            ServerHttpResponse response = exchange.getResponse();            ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);            if (Objects.isNull(clientResponse)) {                Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);                return WebFluxResultUtils.result(exchange, error);            }            //cookies and headers            response.getCookies().putAll(clientResponse.cookies());            response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());            // image, pdf or stream does not do format processing.            // Handling special response types            if (clientResponse.headers().contentType().isPresent()) {                final String media = clientResponse.headers().contentType().get().toString().toLowerCase();                if (media.matches(COMMON_BIN_MEDIA_TYPE_REGEX)) {                    return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()))                            .doOnCancel(() -> clean(exchange));                }            }            // Handling general response types            clientResponse = ResponseUtils.buildClientResponse(response, clientResponse.body(BodyExtractors.toDataBuffers()));            return clientResponse.bodyToMono(byte[].class)                    .flatMap(originData -> WebFluxResultUtils.result(exchange, originData))                    .doOnCancel(() -> clean(exchange));        }));    }

Analysis to this point, the source code analysis on Divide plugin is complete, the analysis flow chart is as follows.

3. Summary#

The source code analysis in this article starts from the http service registration to the divide plugin service calls. The divide plugin is mainly used to handle http requests. Some of the source code does not enter the in-depth analysis, such as the implementation of load balancing, service probe live, will continue to analyze in the following.

Code Analysis For Dubbo Plugin

· 22 min read
Apache ShenYu Committer

Apache ShenYu is an asynchronous, high-performance, cross-language, responsive API gateway.

The Apache ShenYu gateway uses the dubbo plugin to make calls to the dubbo service. You can see the official documentation Dubbo Quick Start to learn how to use the plugin.

This article is based on shenyu-2.4.3 version for source code analysis, please refer to Dubbo Service Access for the introduction of the official website.

1. Service Registration#

Take the example provided on the official website shenyu-examples-dubbo. Suppose your dubbo service is defined as follows (spring-dubbo.xml).

<beans xmlns="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"       xsi:schemaLocation="http://www.springframework.org/schema/beans       http://www.springframework.org/schema/beans/spring-beans.xsd       http://code.alibabatech.com/schema/dubbo       https://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    <dubbo:application name="test-dubbo-service"/>    <dubbo:registry address="${dubbo.registry.address}"/>    <dubbo:protocol name="dubbo" port="20888"/>
    <dubbo:service timeout="10000" interface="org.apache.shenyu.examples.dubbo.api.service.DubboTestService" ref="dubboTestService"/>
</beans>

Declare the application service name, register the center address, use the dubbo protocol, declare the service interface, and the corresponding interface implementation class.

/** * DubboTestServiceImpl. */@Service("dubboTestService")public class DubboTestServiceImpl implements DubboTestService {        @Override    @ShenyuDubboClient(path = "/findById", desc = "Query by Id")    public DubboTest findById(final String id) {        return new DubboTest(id, "hello world shenyu Apache, findById");    }
    //......}

In the interface implementation class, use the annotation @ShenyuDubboClient to register the service with shenyu-admin. The role of this annotation and its rationale will be analyzed later.

The configuration information in the configuration file application.yml.

server:  port: 8011  address: 0.0.0.0  servlet:    context-path: /spring:  main:    allow-bean-definition-overriding: truedubbo:  registry:    address: zookeeper://localhost:2181  # dubbo registry    shenyu:  register:    registerType: http     serverLists: http://localhost:9095     props:      username: admin       password: 123456  client:    dubbo:      props:        contextPath: /dubbo          appName: dubbo

In the configuration file, declare the registry address used by dubbo. The dubbo service registers with shenyu-admin, using the method http, and the registration address is http://localhost:9095.

See Application Client Access for more information on the use of the registration method.

1.1 Declaration of registration interface#

Use the annotation @ShenyuDubboClient to register the service to the gateway. The simple demo is as follows.

// dubbo sevice@Service("dubboTestService")public class DubboTestServiceImpl implements DubboTestService {        @Override    @ShenyuDubboClient(path = "/findById", desc = "Query by Id") // need to be registered method    public DubboTest findById(final String id) {        return new DubboTest(id, "hello world shenyu Apache, findById");    }
    //......}

annotation definition:

/** * Works on classes and methods */@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE, ElementType.METHOD})@Inheritedpublic @interface ShenyuDubboClient {        //path    String path();        //rule name    String ruleName() default "";       //desc    String desc() default "";
    //enabled    boolean enabled() default true;}

1.2 Scan annotation information#

Annotation scanning is done through the ApacheDubboServiceBeanListener, which implements the ApplicationListener<ContextRefreshedEvent> interface and starts executing the event handler method when a context refresh event occurs during the Spring container startup onApplicationEvent().

During constructor instantiation.

  • Read property configuration
  • Start the thread pool
  • Start the registry for registering with shenyu-admin
public class ApacheDubboServiceBeanListener implements ApplicationListener<ContextRefreshedEvent> {
    // ......
    //Constructor    public ApacheDubboServiceBeanListener(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {        //1.Read property configuration        Properties props = clientConfig.getProps();        String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);        String appName = props.getProperty(ShenyuClientConstants.APP_NAME);        if (StringUtils.isBlank(contextPath)) {            throw new ShenyuClientIllegalArgumentException("apache dubbo client must config the contextPath or appName");        }        this.contextPath = contextPath;        this.appName = appName;        this.host = props.getProperty(ShenyuClientConstants.HOST);        this.port = props.getProperty(ShenyuClientConstants.PORT);        //2.Start the thread pool        executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("shenyu-apache-dubbo-client-thread-pool-%d").build());        //3.Start the registry for registering with `shenyu-admin`        publisher.start(shenyuClientRegisterRepository);    }
    /**     * Context refresh event, execute method logic     */    @Override    public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {        //......    }
  • ApacheDubboServiceBeanListener#onApplicationEvent()

Rewritten method logic: read Dubbo service ServiceBean, build metadata object and URI object, and register it with shenyu-admin.

    @Override    public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {        //read ServiceBean        Map<String, ServiceBean> serviceBean = contextRefreshedEvent.getApplicationContext().getBeansOfType(ServiceBean.class);        if (serviceBean.isEmpty()) {            return;        }        //The method is guaranteed to be executed only once        if (!registered.compareAndSet(false, true)) {            return;        }        //handle metadata         for (Map.Entry<String, ServiceBean> entry : serviceBean.entrySet()) {            handler(entry.getValue());        }        //handle URI        serviceBean.values().stream().findFirst().ifPresent(bean -> {            publisher.publishEvent(buildURIRegisterDTO(bean));        });    }
  • handler()

    In the handler() method, read all methods from the serviceBean, determine if there is a ShenyuDubboClient annotation on the method, build a metadata object if it exists, and register the method with shenyu-admin through the registry.

    private void handler(final ServiceBean<?> serviceBean) {        //get proxy        Object refProxy = serviceBean.getRef();        //get class        Class<?> clazz = refProxy.getClass();        if (AopUtils.isAopProxy(refProxy)) {            clazz = AopUtils.getTargetClass(refProxy);        }        //all methods        Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);        for (Method method : methods) {            //read ShenyuDubboClient annotation            ShenyuDubboClient shenyuDubboClient = method.getAnnotation(ShenyuDubboClient.class);            if (Objects.nonNull(shenyuDubboClient)) {                //build meatdata and registry                publisher.publishEvent(buildMetaDataDTO(serviceBean, shenyuDubboClient, method));            }        }    }
  • buildMetaDataDTO()

    Constructs a metadata object where the necessary information for method registration is constructed and subsequently used for selector or rule matching.

    private MetaDataRegisterDTO buildMetaDataDTO(final ServiceBean<?> serviceBean, final ShenyuDubboClient shenyuDubboClient, final Method method) {        //app name        String appName = buildAppName(serviceBean);        //path        String path = contextPath + shenyuDubboClient.path();        //desc        String desc = shenyuDubboClient.desc();        //service name        String serviceName = serviceBean.getInterface();        //rule name        String configRuleName = shenyuDubboClient.ruleName();        String ruleName = ("".equals(configRuleName)) ? path : configRuleName;        //method name         String methodName = method.getName();        //parameter Types        Class<?>[] parameterTypesClazz = method.getParameterTypes();        String parameterTypes = Arrays.stream(parameterTypesClazz).map(Class::getName).collect(Collectors.joining(","));        return MetaDataRegisterDTO.builder()                .appName(appName)                .serviceName(serviceName)                .methodName(methodName)                .contextPath(contextPath)                .host(buildHost())                .port(buildPort(serviceBean))                .path(path)                .ruleName(ruleName)                .pathDesc(desc)                .parameterTypes(parameterTypes)                .rpcExt(buildRpcExt(serviceBean)) //dubbo ext                .rpcType(RpcTypeEnum.DUBBO.getName())                .enabled(shenyuDubboClient.enabled())                .build();    }
  • buildRpcExt()

    dubbo ext information.

       private String buildRpcExt(final ServiceBean serviceBean) {       DubboRpcExt build = DubboRpcExt.builder()               .group(StringUtils.isNotEmpty(serviceBean.getGroup()) ? serviceBean.getGroup() : "")//group               .version(StringUtils.isNotEmpty(serviceBean.getVersion()) ? serviceBean.getVersion() : "")//version               .loadbalance(StringUtils.isNotEmpty(serviceBean.getLoadbalance()) ? serviceBean.getLoadbalance() : Constants.DEFAULT_LOADBALANCE)//load balance               .retries(Objects.isNull(serviceBean.getRetries()) ? Constants.DEFAULT_RETRIES : serviceBean.getRetries())//retry               .timeout(Objects.isNull(serviceBean.getTimeout()) ? Constants.DEFAULT_CONNECT_TIMEOUT : serviceBean.getTimeout())//time               .sent(Objects.isNull(serviceBean.getSent()) ? Constants.DEFAULT_SENT : serviceBean.getSent())//sent               .cluster(StringUtils.isNotEmpty(serviceBean.getCluster()) ? serviceBean.getCluster() : Constants.DEFAULT_CLUSTER)//cluster               .url("")               .build();       return GsonUtils.getInstance().toJson(build);   }
  • buildURIRegisterDTO()

    Construct URI objects to register information about the service itself, which can be subsequently used for service probing live.

private URIRegisterDTO buildURIRegisterDTO(final ServiceBean serviceBean) {        return URIRegisterDTO.builder()                .contextPath(this.contextPath) //context path                .appName(buildAppName(serviceBean))//app name                .rpcType(RpcTypeEnum.DUBBO.getName())//dubbo                .host(buildHost()) //host                .port(buildPort(serviceBean))//port                .build(); }

The specific registration logic is implemented by the registration center, please refer to Client Access Principles .

//To the registration center, post registration events   publisher.publishEvent();

1.3 Processing registration information#

The metadata and URI data registered by the client through the registry are processed at the shenyu-admin end, which is responsible for storing to the database and synchronizing to the shenyu gateway. The client-side registration processing logic of the Dubbo plugin is in the ShenyuClientRegisterDubboServiceImpl. The inheritance relationship is as follows.

  • ShenyuClientRegisterService: client registration service, top-level interface.
  • FallbackShenyuClientRegisterService: registration failure, provides retry operation.
  • AbstractShenyuClientRegisterServiceImpl: abstract class, implements part of the public registration logic.
  • ShenyuClientRegisterDubboServiceImpl: implementation of the Dubbo plugin registration.
1.3.1 Registration Service#
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()

    The metadata MetaDataRegisterDTO object registered by the client through the registry is picked up and dropped in the register() method of shenyu-admin.

   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. register selector        String selectorHandler = selectorHandler(dto);        String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);        //2. register rule        String ruleHandler = ruleHandler();        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        ruleService.registerDefault(ruleDTO);        //3. register metadata        registerMetadata(dto);        //4. register contextPath        String contextPath = dto.getContextPath();        if (StringUtils.isNotEmpty(contextPath)) {            registerContextPath(dto);        }        return ShenyuResultMessage.SUCCESS;    }
1.3.1.1 Register Selector#
  • org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()

Construct contextPath, find if the selector information exists, if it does, return id; if it doesn't, create the default selector information.

    @Override    public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {        // build contextPath        String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());        // Find if selector information exists by name        SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);        if (Objects.isNull(selectorDO)) {            // Create a default selector message if it does not exist            return registerSelector(contextPath, pluginName, selectorHandler);        }        return selectorDO.getId();    }
  • Default selector information

    Construct the default selector information and its conditional properties here.

   //register selector   private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {        //build selector        SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());        selectorDTO.setHandle(selectorHandler);        //register default selector        return registerDefault(selectorDTO);    }     //build selector    private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {        //build default        SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);        selectorDTO.setPluginId(pluginId);         //build the conditional properties of the default selector        selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));        return selectorDTO;    }
  • Build default selector
private SelectorDTO buildDefaultSelectorDTO(final String name) {    return SelectorDTO.builder()            .name(name) // name            .type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // default type cutom            .matchMode(MatchModeEnum.AND.getCode()) //default match mode            .enabled(Boolean.TRUE)  //enable            .loged(Boolean.TRUE)  //log            .continued(Boolean.TRUE)             .sort(1)             .build();}
  • Build default selector conditional properties
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {    SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();    selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // default URI    selectorConditionDTO.setParamName("/");    selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // default  match    selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX);     return Collections.singletonList(selectorConditionDTO);}
  • Register default selector
@Overridepublic String registerDefault(final SelectorDTO selectorDTO) {    //selector information    SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);    //selector conditional properties    List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();    if (StringUtils.isEmpty(selectorDTO.getId())) {        // insert selector information into the database        selectorMapper.insertSelective(selectorDO);          // inserting selector conditional properties to the database        selectorConditionDTOs.forEach(selectorConditionDTO -> {            selectorConditionDTO.setSelectorId(selectorDO.getId());                        selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));        });    }    // Publish synchronization events to synchronize selection information and its conditional attributes to the gateway    publishEvent(selectorDO, selectorConditionDTOs);    return selectorDO.getId();}
1.3.1.2 Registration Rules#

In the second step of registering the service, start building the default rules and then register the rules.

@Override    public String register(final MetaDataRegisterDTO dto) {        //1. handle selector        //......                //2. handle rule                String ruleHandler = ruleHandler();        // build default rule        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        // register rule        ruleService.registerDefault(ruleDTO);                //3. reigster metadata        //......                //4. register ContextPath        //......                return ShenyuResultMessage.SUCCESS;    }
  • 默认规则处理属性
    @Override    protected String ruleHandler() {        // default rule        return new DubboRuleHandle().toJson();    }

Dubbo plugin default rule handling properties.

public class DubboRuleHandle implements RuleHandle {
    /**     * dubbo version.     */    private String version;
    /**     * group.     */    private String group;
    /**     * retry.     */    private Integer retries = 0;
    /**     * loadbalance:RANDOM     */    private String loadbalance = LoadBalanceEnum.RANDOM.getName();
    /**     * timeout default 3000     */    private long timeout = Constants.TIME_OUT;}
  • build default rule
  // build default rule    private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {        return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());    }   //  build default rule    private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {        RuleDTO ruleDTO = RuleDTO.builder()                .selectorId(selectorId)                .name(ruleName)                 .matchMode(MatchModeEnum.AND.getCode())                 .enabled(Boolean.TRUE)                 .loged(Boolean.TRUE)                 .sort(1)                .handle(ruleHandler)                .build();        RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()                .paramType(ParamTypeEnum.URI.getName())                 .paramName("/")                .paramValue(path)                 .build();        if (path.indexOf("*") > 1) {            ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias());         } else {            ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias());         }        ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));        return ruleDTO;    }
  • org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()

Registration rules: insert records to the database and publish events to the gateway for data synchronization.


    @Override    public String registerDefault(final RuleDTO ruleDTO) {        RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());        if (Objects.nonNull(exist)) {            return "";        }
        RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);        List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();        if (StringUtils.isEmpty(ruleDTO.getId())) {            // insert rule information into the database            ruleMapper.insertSelective(ruleDO);            //insert  rule body conditional attributes into the database            ruleConditions.forEach(ruleConditionDTO -> {                ruleConditionDTO.setRuleId(ruleDO.getId());                     ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));            });        }        // Publish events to the gateway for data synchronization        publishEvent(ruleDO, ruleConditions);        return ruleDO.getId();    }
1.3.1.3 Register Metadata#

Metadata is mainly used for RPC service calls.

   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. register selector        //......                //2. register rule        //......                //3. register metadata        registerMetadata(dto);                //4. register ContextPath        //......                return ShenyuResultMessage.SUCCESS;    }
  • org.apache.shenyu.admin.service.register.ShenyuClientRegisterDubboServiceImpl#registerMetadata()

    Insert or update metadata and then publish sync events to the gateway.

    @Override    protected void registerMetadata(final MetaDataRegisterDTO dto) {            // get metaDataService            MetaDataService metaDataService = getMetaDataService();            MetaDataDO exist = metaDataService.findByPath(dto.getPath());            //insert or update metadata            metaDataService.saveOrUpdateMetaData(exist, dto);    }
    @Override    public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {        DataEventTypeEnum eventType;        // DTO->DO        MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);        // insert data        if (Objects.isNull(exist)) {            Timestamp currentTime = new Timestamp(System.currentTimeMillis());            metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());            metaDataDO.setDateCreated(currentTime);            metaDataDO.setDateUpdated(currentTime);            metaDataMapper.insert(metaDataDO);            eventType = DataEventTypeEnum.CREATE;        } else {            // update            metaDataDO.setId(exist.getId());            metaDataMapper.update(metaDataDO);            eventType = DataEventTypeEnum.UPDATE;        }        // Publish sync events to gateway        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,                Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));    }
1.3.2 Register URI#
  • org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()

The server side receives the URI information registered by the client and processes it.

    @Override    public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {        String result;        String key = key(selectorName);        try {            this.removeFallBack(key);            // register URI            result = this.doRegisterURI(selectorName, uriList);            logger.info("Register success: {},{}", selectorName, uriList);        } catch (Exception ex) {            logger.warn("Register exception: cause:{}", ex.getMessage());            result = "";            // Retry after registration failure            this.addFallback(key, new FallbackHolder(selectorName, uriList));        }        return result;    }
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()

Get a valid URI from the URI registered by the client, update the corresponding selector handle property, and send a selector update event to the gateway.

@Override    public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {        //check        if (CollectionUtils.isEmpty(uriList)) {            return "";        }                SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));        if (Objects.isNull(selectorDO)) {            throw new ShenyuException("doRegister Failed to execute,wait to retry.");        }        // gte valid URI        List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());        // build handle        String handler = buildHandle(validUriList, selectorDO);        if (handler != null) {            selectorDO.setHandle(handler);            SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));            selectorData.setHandle(handler);            // Update the handle property of the selector to the database            selectorService.updateSelective(selectorDO);            // Send selector update events to the gateway            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));        }        return ShenyuResultMessage.SUCCESS;    }

The source code analysis on service registration is completed as well as the analysis flow chart is as follows.

The next step is to analyze how the dubbo plugin initiates calls to the http service based on this information.

2. Service Invocation#

The dubbo plugin is the core processing plugin used by the ShenYu gateway to convert http requests into the dubbo protocol and invoke the dubbo service.

Take the case provided by the official website Quick Start with Dubbo as an example, a dubbo service is registered with shenyu-admin through the registry, and then requested through the ShenYu gateway proxy, the request is as follows.

GET http://localhost:9195/dubbo/findById?id=100Accept: application/json

The class inheritance relationship in the Dubbo plugin is as follows.

  • ShenyuPlugin: top-level interface, defining interface methods.
  • AbstractShenyuPlugin: abstract class that implements plugin common logic.
  • AbstractDubboPlugin: dubbo plugin abstract class, implementing dubbo common logic.
  • ApacheDubboPlugin: ApacheDubbo plugin.

ShenYu Gateway supports ApacheDubbo and AlibabaDubbo\

2.1 Receive requests#

After passing the ShenYu gateway proxy, the request entry is ShenyuWebHandler, which implements the org.springframework.web.server.WebHandler interface.

public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {    //......        /**     * hanlde request     */    @Override    public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {       // execute default plugin chain        Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);        if (scheduled) {            return execute.subscribeOn(scheduler);        }        return execute;    }        private static class DefaultShenyuPluginChain implements ShenyuPluginChain {
        private int index;
        private final List<ShenyuPlugin> plugins;
          DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {            this.plugins = plugins;        }
        /**         * execute.         */        @Override        public Mono<Void> execute(final ServerWebExchange exchange) {            return Mono.defer(() -> {                if (this.index < plugins.size()) {                    // get plugin                     ShenyuPlugin plugin = plugins.get(this.index++);                    boolean skip = plugin.skip(exchange);                    if (skip) {                        // next                        return this.execute(exchange);                    }                    // execute                    return plugin.execute(exchange, this);                }                return Mono.empty();            });        }    }}

2.2 Match Rule#

  • org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()

Execute the matching logic for selectors and rules in the execute() method.

  • Matching selectors.
  • Matching rules.
  • Execute the plugin.
@Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        // plugin name        String pluginName = named();        // plugin data        PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);        if (pluginData != null && pluginData.getEnabled()) {            // selector data            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);            if (CollectionUtils.isEmpty(selectors)) {                return handleSelectorIfNull(pluginName, exchange, chain);            }            // match selector            SelectorData selectorData = matchSelector(exchange, selectors);            if (Objects.isNull(selectorData)) {                return handleSelectorIfNull(pluginName, exchange, chain);            }            selectorLog(selectorData, pluginName);            // rule data            List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());            if (CollectionUtils.isEmpty(rules)) {                return handleRuleIfNull(pluginName, exchange, chain);            }            // match rule            RuleData rule;            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {                //get last                rule = rules.get(rules.size() - 1);            } else {                rule = matchRule(exchange, rules);            }            if (Objects.isNull(rule)) {                return handleRuleIfNull(pluginName, exchange, chain);            }            ruleLog(rule, pluginName);            // execute            return doExecute(exchange, chain, selectorData, rule);        }        return chain.execute(exchange);    }

2.3 Execute GlobalPlugin#

  • org.apache.shenyu.plugin.global.GlobalPlugin#execute()

GlobalPlugin is a global plugin that constructs contextual information in the execute() method.

public class GlobalPlugin implements ShenyuPlugin {    // shenyu context    private final ShenyuContextBuilder builder;        //......        @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {       // build context information to be passed into the exchange        ShenyuContext shenyuContext = builder.build(exchange);        exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);        return chain.execute(exchange);    }        //......}
  • org.apache.shenyu.plugin.global.DefaultShenyuContextBuilder#build()

Build the default context information.

public class DefaultShenyuContextBuilder implements ShenyuContextBuilder {    //......        @Override    public ShenyuContext build(final ServerWebExchange exchange) {        //build data        Pair<String, MetaData> buildData = buildData(exchange);        //wrap ShenyuContext        return decoratorMap.get(buildData.getLeft()).decorator(buildDefaultContext(exchange.getRequest()), buildData.getRight());    }        private Pair<String, MetaData> buildData(final ServerWebExchange exchange) {        //......        //get the metadata according to the requested uri        MetaData metaData = MetaDataCache.getInstance().obtain(request.getURI().getPath());        if (Objects.nonNull(metaData) && Boolean.TRUE.equals(metaData.getEnabled())) {            exchange.getAttributes().put(Constants.META_DATA, metaData);            return Pair.of(metaData.getRpcType(), metaData);        } else {            return Pair.of(RpcTypeEnum.HTTP.getName(), new MetaData());        }    }    //set the default context information    private ShenyuContext buildDefaultContext(final ServerHttpRequest request) {        String appKey = request.getHeaders().getFirst(Constants.APP_KEY);        String sign = request.getHeaders().getFirst(Constants.SIGN);        String timestamp = request.getHeaders().getFirst(Constants.TIMESTAMP);        ShenyuContext shenyuContext = new ShenyuContext();        String path = request.getURI().getPath();        shenyuContext.setPath(path);         shenyuContext.setAppKey(appKey);        shenyuContext.setSign(sign);        shenyuContext.setTimestamp(timestamp);        shenyuContext.setStartDateTime(LocalDateTime.now());        Optional.ofNullable(request.getMethod()).ifPresent(httpMethod -> shenyuContext.setHttpMethod(httpMethod.name()));        return shenyuContext;    } }
  • org.apache.shenyu.plugin.dubbo.common.context.DubboShenyuContextDecorator#decorator()

wrap ShenyuContext:

public class DubboShenyuContextDecorator implements ShenyuContextDecorator {        @Override    public ShenyuContext decorator(final ShenyuContext shenyuContext, final MetaData metaData) {        shenyuContext.setModule(metaData.getAppName());        shenyuContext.setMethod(metaData.getServiceName());         shenyuContext.setContextPath(metaData.getContextPath());         shenyuContext.setRpcType(RpcTypeEnum.DUBBO.getName());         return shenyuContext;    }        @Override    public String rpcType() {        return RpcTypeEnum.DUBBO.getName();    }}

2.4 Execute RpcParamTransformPlugin#

The RpcParamTransformPlugin is responsible for reading the parameters from the http request, saving them in the exchange and passing them to the rpc service.

  • org.apache.shenyu.plugin.base.RpcParamTransformPlugin#execute()

In the execute() method, the core logic of the plugin is executed: get the request information from exchange and process the parameters according to the form of content passed in by the request.

public class RpcParamTransformPlugin implements ShenyuPlugin {
    @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        //get request information from exchange        ServerHttpRequest request = exchange.getRequest();        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        if (Objects.nonNull(shenyuContext)) {           // APPLICATION_JSON            MediaType mediaType = request.getHeaders().getContentType();            if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {                return body(exchange, request, chain);            }            // APPLICATION_FORM_URLENCODED            if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {                return formData(exchange, request, chain);            }            //query            return query(exchange, request, chain);        }        return chain.execute(exchange);    }        //APPLICATION_JSON    private Mono<Void> body(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {        return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())                .flatMap(body -> {                    exchange.getAttributes().put(Constants.PARAM_TRANSFORM, resolveBodyFromRequest(body));//解析body,保存到exchange中                    return chain.execute(exchange);                }));    }   // APPLICATION_FORM_URLENCODED    private Mono<Void> formData(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {        return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())                .flatMap(map -> {                    String param = resolveBodyFromRequest(map);                    LinkedMultiValueMap<String, String> linkedMultiValueMap;                    try {                        linkedMultiValueMap = BodyParamUtils.buildBodyParams(URLDecoder.decode(param, StandardCharsets.UTF_8.name())); //格式化数据                    } catch (UnsupportedEncodingException e) {                        return Mono.error(e);                    }                    exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.toMap(() -> linkedMultiValueMap));// 保存到exchange中                    return chain.execute(exchange);                }));    }    //query    private Mono<Void> query(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {        exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.ofString(() -> serverHttpRequest.getURI().getQuery()));//保存到exchange中        return chain.execute(exchange);    }    //...... }

2.5 Execute DubboPlugin#

  • org.apache.shenyu.plugin.dubbo.common.AbstractDubboPlugin#doExecute()

In the doExecute() method, the main purpose is to check the metadata and parameters.

public abstract class AbstractDubboPlugin extends AbstractShenyuPlugin {        @Override    public Mono<Void> doExecute(final ServerWebExchange exchange,                                   final ShenyuPluginChain chain,                                   final SelectorData selector,                                   final RuleData rule) {        //param        String param = exchange.getAttribute(Constants.PARAM_TRANSFORM);        //context        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        //metaData        MetaData metaData = exchange.getAttribute(Constants.META_DATA);        //check metaData        if (!checkMetaData(metaData)) {            LOG.error(" path is : {}, meta data have error : {}", shenyuContext.getPath(), metaData);            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.META_DATA_ERROR, null);            return WebFluxResultUtils.result(exchange, error);        }        //check        if (Objects.nonNull(metaData) && StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(param)) {            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM, null);            return WebFluxResultUtils.result(exchange, error);        }        //set rpcContext        this.rpcContext(exchange);        //dubbo invoke        return this.doDubboInvoker(exchange, chain, selector, rule, metaData, param);    }}
  • org.apache.shenyu.plugin.apache.dubbo.ApacheDubboPlugin#doDubboInvoker()

Set special context information in the doDubboInvoker() method, and then start the dubbo generalization call.

public class ApacheDubboPlugin extends AbstractDubboPlugin {        @Override    protected Mono<Void> doDubboInvoker(final ServerWebExchange exchange,                                        final ShenyuPluginChain chain,                                        final SelectorData selector,                                        final RuleData rule,                                        final MetaData metaData,                                        final String param) {        //set the current selector and rule information, and request address for dubbo graying support        RpcContext.getContext().setAttachment(Constants.DUBBO_SELECTOR_ID, selector.getId());        RpcContext.getContext().setAttachment(Constants.DUBBO_RULE_ID, rule.getId());        RpcContext.getContext().setAttachment(Constants.DUBBO_REMOTE_ADDRESS, Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress());        //dubbo generic invoker        final Mono<Object> result = dubboProxyService.genericInvoker(param, metaData, exchange);        //execute next plugin in chain        return result.then(chain.execute(exchange));    }}
  • org.apache.shenyu.plugin.apache.dubbo.proxy.ApacheDubboProxyService#genericInvoker()

genericInvoker() method.

  • Gets the ReferenceConfig object.
  • Gets the generalization service GenericService object.
  • Constructs the request parameter pair object.
  • Initiates an asynchronous generalization invocation.
public class ApacheDubboProxyService {    //...... 
    /**     * Generic invoker object.     */    public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws ShenyuException {        //1.Get the ReferenceConfig object        ReferenceConfig<GenericService> reference = ApacheDubboConfigCache.getInstance().get(metaData.getPath());
        if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {            //Failure of the current cache information            ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());            //Reinitialization with metadata            reference = ApacheDubboConfigCache.getInstance().initRef(metaData);        }        //2.Get the GenericService object of the generalization service        GenericService genericService = reference.get();        //3.Constructing the request parameter pair object        Pair<String[], Object[]> pair;        if (StringUtils.isBlank(metaData.getParameterTypes()) || ParamCheckUtils.dubboBodyIsEmpty(body)) {            pair = new ImmutablePair<>(new String[]{}, new Object[]{});        } else {            pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());        }        //4.Initiating asynchronous generalization calls        return Mono.fromFuture(invokeAsync(genericService, metaData.getMethodName(), pair.getLeft(), pair.getRight()).thenApply(ret -> {            //handle result            if (Objects.isNull(ret)) {                ret = Constants.DUBBO_RPC_RESULT_EMPTY;            }            exchange.getAttributes().put(Constants.RPC_RESULT, ret);            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());            return ret;        })).onErrorMap(exception -> exception instanceof GenericException ? new ShenyuException(((GenericException) exception).getExceptionMessage()) : new ShenyuException(exception));//处理异常    }        //Generalized calls, asynchronous operations    private CompletableFuture<Object> invokeAsync(final GenericService genericService, final String method, final String[] parameterTypes, final Object[] args) throws GenericException {        genericService.$invoke(method, parameterTypes, args);        Object resultFromFuture = RpcContext.getContext().getFuture();        return resultFromFuture instanceof CompletableFuture ? (CompletableFuture<Object>) resultFromFuture : CompletableFuture.completedFuture(resultFromFuture);    }}

Calling the dubbo service at the gateway can be achieved by generalizing the call.

The ReferenceConfig object is the key object to support generalization calls , and its initialization operation is done during data synchronization. There are two parts of data involved here, one is the synchronized plugin handler information and the other is the synchronized plugin metadata information.

  • org.apache.shenyu.plugin.dubbo.common.handler.AbstractDubboPluginDataHandler#handlerPlugin()

When the plugin data is updated, the data synchronization module synchronizes the data from shenyu-admin to the gateway. The initialization operation is performed in handlerPlugin().

public abstract class AbstractDubboPluginDataHandler implements PluginDataHandler {    //......        //Initializing the configuration cache   protected abstract void initConfigCache(DubboRegisterConfig dubboRegisterConfig);
    @Override    public void handlerPlugin(final PluginData pluginData) {        if (Objects.nonNull(pluginData) && Boolean.TRUE.equals(pluginData.getEnabled())) {            //Data deserialization            DubboRegisterConfig dubboRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class);            DubboRegisterConfig exist = Singleton.INST.get(DubboRegisterConfig.class);            if (Objects.isNull(dubboRegisterConfig)) {                return;            }            if (Objects.isNull(exist) || !dubboRegisterConfig.equals(exist)) {                // Perform initialization operations                this.initConfigCache(dubboRegisterConfig);            }            Singleton.INST.single(DubboRegisterConfig.class, dubboRegisterConfig);        }    }    //......}
  • org.apache.shenyu.plugin.apache.dubbo.handler.ApacheDubboPluginDataHandler#initConfigCache()

Perform initialization operations.

public class ApacheDubboPluginDataHandler extends AbstractDubboPluginDataHandler {
    @Override    protected void initConfigCache(final DubboRegisterConfig dubboRegisterConfig) {        //perform initialization operations        ApacheDubboConfigCache.getInstance().init(dubboRegisterConfig);        //cached results before failure        ApacheDubboConfigCache.getInstance().invalidateAll();    }}
  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#init()

In the initialization, set registryConfig and consumerConfig.

public final class ApacheDubboConfigCache extends DubboConfigCache {    //......     /**     * init     */    public void init(final DubboRegisterConfig dubboRegisterConfig) {        //ApplicationConfig        if (Objects.isNull(applicationConfig)) {            applicationConfig = new ApplicationConfig("shenyu_proxy");        }        //When the protocol or address changes, you need to update the registryConfig        if (needUpdateRegistryConfig(dubboRegisterConfig)) {            RegistryConfig registryConfigTemp = new RegistryConfig();            registryConfigTemp.setProtocol(dubboRegisterConfig.getProtocol());            registryConfigTemp.setId("shenyu_proxy");            registryConfigTemp.setRegister(false);            registryConfigTemp.setAddress(dubboRegisterConfig.getRegister());            Optional.ofNullable(dubboRegisterConfig.getGroup()).ifPresent(registryConfigTemp::setGroup);            registryConfig = registryConfigTemp;        }        //ConsumerConfig        if (Objects.isNull(consumerConfig)) {            consumerConfig = ApplicationModel.getConfigManager().getDefaultConsumer().orElseGet(() -> {                ConsumerConfig consumerConfig = new ConsumerConfig();                consumerConfig.refresh();                return consumerConfig;            });                       //ConsumerConfig            Optional.ofNullable(dubboRegisterConfig.getThreadpool()).ifPresent(consumerConfig::setThreadpool);             Optional.ofNullable(dubboRegisterConfig.getCorethreads()).ifPresent(consumerConfig::setCorethreads);            Optional.ofNullable(dubboRegisterConfig.getThreads()).ifPresent(consumerConfig::setThreads);            Optional.ofNullable(dubboRegisterConfig.getQueues()).ifPresent(consumerConfig::setQueues);        }    }        //Does the registration configuration need to be updated    private boolean needUpdateRegistryConfig(final DubboRegisterConfig dubboRegisterConfig) {        if (Objects.isNull(registryConfig)) {            return true;        }        return !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol())                || !Objects.equals(dubboRegisterConfig.getRegister(), registryConfig.getAddress())                || !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol());    }
    //......}
  • org.apache.shenyu.plugin.apache.dubbo.subscriber.ApacheDubboMetaDataSubscriber#onSubscribe()

When the metadata is updated, the data synchronization module synchronizes the data from shenyu-admin to the gateway. The metadata update operation is performed in the onSubscribe() method.

public class ApacheDubboMetaDataSubscriber implements MetaDataSubscriber {    //local memory cache    private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap();
    //update metaData    public void onSubscribe(final MetaData metaData) {        // dubbo        if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {            //Whether the corresponding metadata exists            MetaData exist = META_DATA.get(metaData.getPath());            if (Objects.isNull(exist) || Objects.isNull(ApacheDubboConfigCache.getInstance().get(metaData.getPath()))) {                // initRef                ApacheDubboConfigCache.getInstance().initRef(metaData);            } else {                // The corresponding metadata has undergone an update operation                if (!Objects.equals(metaData.getServiceName(), exist.getServiceName())                        || !Objects.equals(metaData.getRpcExt(), exist.getRpcExt())                        || !Objects.equals(metaData.getParameterTypes(), exist.getParameterTypes())                        || !Objects.equals(metaData.getMethodName(), exist.getMethodName())) {                    //Build ReferenceConfig again based on the latest metadata                    ApacheDubboConfigCache.getInstance().build(metaData);                }            }            //local memory cache            META_DATA.put(metaData.getPath(), metaData);        }    }
    //dalete    public void unSubscribe(final MetaData metaData) {        if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {            //使ReferenceConfig失效            ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());            META_DATA.remove(metaData.getPath());        }    }}
  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#initRef()

Build ReferenceConfig objects from metaData.

public final class ApacheDubboConfigCache extends DubboConfigCache {    //......        public ReferenceConfig<GenericService> initRef(final MetaData metaData) {            try {                //First try to get it from the cache, and return it directly if it exists                ReferenceConfig<GenericService> referenceConfig = cache.get(metaData.getPath());                if (StringUtils.isNoneBlank(referenceConfig.getInterface())) {                    return referenceConfig;                }            } catch (ExecutionException e) {                LOG.error("init dubbo ref exception", e);            }                      //build if not exist            return build(metaData);        }
        /**         * Build reference config.         */        @SuppressWarnings("deprecation")        public ReferenceConfig<GenericService> build(final MetaData metaData) {            if (Objects.isNull(applicationConfig) || Objects.isNull(registryConfig)) {                return new ReferenceConfig<>();            }            ReferenceConfig<GenericService> reference = new ReferenceConfig<>(); //ReferenceConfig            reference.setGeneric("true"); //generic invoke            reference.setAsync(true);//async
            reference.setApplication(applicationConfig);//applicationConfig            reference.setRegistry(registryConfig);//registryConfig            reference.setConsumer(consumerConfig);//consumerConfig            reference.setInterface(metaData.getServiceName());//serviceName            reference.setProtocol("dubbo");//dubbo            reference.setCheck(false);             reference.setLoadbalance("gray");//gray
            Map<String, String> parameters = new HashMap<>(2);            parameters.put("dispatcher", "direct");            reference.setParameters(parameters);
            String rpcExt = metaData.getRpcExt();//rpc ext param            DubboParam dubboParam = parserToDubboParam(rpcExt);            if (Objects.nonNull(dubboParam)) {                if (StringUtils.isNoneBlank(dubboParam.getVersion())) {                    reference.setVersion(dubboParam.getVersion());//version                }                if (StringUtils.isNoneBlank(dubboParam.getGroup())) {                    reference.setGroup(dubboParam.getGroup());//group                }                if (StringUtils.isNoneBlank(dubboParam.getUrl())) {                    reference.setUrl(dubboParam.getUrl());//url                }                if (StringUtils.isNoneBlank(dubboParam.getCluster())) {                    reference.setCluster(dubboParam.getCluster());                }                Optional.ofNullable(dubboParam.getTimeout()).ifPresent(reference::setTimeout);//timeout                Optional.ofNullable(dubboParam.getRetries()).ifPresent(reference::setRetries);//retires                Optional.ofNullable(dubboParam.getSent()).ifPresent(reference::setSent);//Whether to ack async-sent            }            try {                //get GenericService                Object obj = reference.get();                if (Objects.nonNull(obj)) {                    LOG.info("init apache dubbo reference success there meteData is :{}", metaData);                    //cache reference                    cache.put(metaData.getPath(), reference);                }            } catch (Exception e) {                LOG.error("init apache dubbo reference exception", e);            }            return reference;        }    //......    }

2.6 Execute ResponsePlugin#

  • org.apache.shenyu.plugin.response.ResponsePlugin#execute()

The response results are handled by the ResponsePlugin plugin.

    @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        // handle results according to rpc type        return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);    }

The processing type is determined by MessageWriter and the class inheritance relationship is as follows.

  • MessageWriter: interface, defining message processing methods.
  • NettyClientMessageWriter: processing of Netty call results.
  • RPCMessageWriter: processing the results of RPC calls.
  • WebClientMessageWriter: processing the results of WebClient calls.

Dubbo service call, the processing result is RPCMessageWriter of course.

  • org.apache.shenyu.plugin.response.strategy.RPCMessageWriter#writeWith()

Process the response results in the writeWith() method.


public class RPCMessageWriter implements MessageWriter {
    @Override    public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        return chain.execute(exchange).then(Mono.defer(() -> {            Object result = exchange.getAttribute(Constants.RPC_RESULT); //result            if (Objects.isNull(result)) {                 Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);                return WebFluxResultUtils.result(exchange, error);            }            return WebFluxResultUtils.result(exchange, result);        }));    }}

At this point in the analysis, the source code analysis of the Dubbo plugin is complete, and the analysis flow chart is as follows.

3. Summary#

The source code analysis in this article starts from Dubbo service registration to Dubbo plug-in service calls. The Dubbo plugin is mainly used to handle the conversion of http requests to the dubbo protocol, and the main logic is implemented through generalized calls.