This article is based on the source code analysis of version 'shenyu-2.6.1'
Content
Shenyu provides a mechanism for adding custom plugins or modifying existing ones, which is driven internally by the extPlugin configuration. A custom plugin must meet the following two requirements:
Implement interface ShenyuPlugin or PluginDataHandler.
Package the implementation as a jar and place it in the path configured by shenyu.extPlugin.path.
The class that actually implements this logic is ShenyuLoaderService. Let's look at how it works.
public ShenyuLoaderService(final ShenyuWebHandler webHandler, final CommonPluginDataSubscriber subscriber, final ShenyuConfig shenyuConfig) {
    // Subscriber for plugin data
    this.subscriber = subscriber;
    // The WebHandler encapsulated by Shenyu, which holds all the plugin logic
    this.webHandler = webHandler;
    // Configuration information
    this.shenyuConfig = shenyuConfig;
    // Extension plugin configuration: path, whether it is enabled, number of loader threads, and scan frequency
    ExtPlugin config = shenyuConfig.getExtPlugin();
    // If enabled, create a scheduled task to check and load
    if (config.getEnabled()) {
        // Create a scheduled executor with a named thread factory
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(config.getThreads(), ShenyuThreadFactory.create("plugin-ext-loader", true));
        // Run at a fixed rate: by default the first run starts after 30 seconds, then every 300 seconds
        executor.scheduleAtFixedRate(() -> loadExtOrUploadPlugins(null), config.getScheduleDelay(), config.getScheduleTime(), TimeUnit.SECONDS);
    }
}
This class has the following properties:
webHandler: the entry point through which Shenyu processes requests. It references all plugin instances and must be updated after an extension plugin is loaded.
subscriber: the entry point for plugin data subscription. It references the data handlers of all plugins and must also be updated after an extension plugin is loaded.
executor: a scheduled task created inside ShenyuLoaderService that periodically scans the configured path and loads jar packages, so that extension plugins are discovered dynamically.
By default, the first scan runs 30 seconds after startup, and later scans run every 300 seconds.
The extension plugin feature is switched on or off with shenyu.extPlugin.enabled.
The above configurations can be adjusted in the configuration file:
shenyu:
  extPlugin:
    path:              # Storage directory for extension plugins
    enabled: true      # Whether the extension function is enabled
    threads: 1         # Number of threads used for scanning and loading
    scheduleTime: 300  # Interval between task executions, in seconds
    scheduleDelay: 30  # Delay before the first execution, in seconds
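To make the roles of scheduleDelay and scheduleTime concrete, here is a minimal JDK-only sketch of the same fixed-rate scheduling pattern. The class name ExtLoaderScheduleSketch, the method runScans, and the millisecond values are assumptions made for illustration; the real service schedules loadExtOrUploadPlugins(null) in seconds.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the scheduling done in the ShenyuLoaderService constructor.
final class ExtLoaderScheduleSketch {

    /**
     * Schedules a stand-in "scan" task at a fixed rate and waits (up to 5 seconds)
     * until it has fired expectedRuns times. Returns the number of runs observed.
     */
    static int runScans(final long initialDelayMs, final long periodMs, final int expectedRuns) {
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(expectedRuns);
        // One daemon worker thread, comparable to threads: 1 in the configuration.
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> {
            Thread t = new Thread(r, "plugin-ext-loader-sketch");
            t.setDaemon(true);
            return t;
        });
        // initialDelayMs plays the role of scheduleDelay, periodMs the role of scheduleTime.
        executor.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            latch.countDown();
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }

    public static void main(final String[] args) {
        System.out.println("scans observed: " + runScans(30, 300, 3));
    }
}
```

Because the task runs at a fixed rate, a slow scan does not push later scans back by its own duration; if a scan takes longer than the period, the next one starts as soon as the previous one finishes.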
Next, let's take a look at the loading logic:
public void loadExtOrUploadPlugins(final PluginData uploadedJarResource) {
    try {
        List<ShenyuLoaderResult> plugins = new ArrayList<>();
        // Obtain the holder of the ShenyuPluginClassLoader instances
        ShenyuPluginClassloaderHolder singleton = ShenyuPluginClassloaderHolder.getSingleton();
        if (Objects.isNull(uploadedJarResource)) {
            // If the parameter is null, load all jar packages from the extension directory
            // PluginJar: data containing the ShenyuPlugin and PluginDataHandler implementations
            List<PluginJarParser.PluginJar> uploadPluginJars = ShenyuExtPathPluginJarLoader.loadExtendPlugins(shenyuConfig.getExtPlugin().getPath());
            // Traverse all pending plugins
            for (PluginJarParser.PluginJar extPath : uploadPluginJars) {
                LOG.info("shenyu extPlugin find new {} to load", extPath.getAbsolutePath());
                // Create a dedicated class loader for this plugin jar, which makes later loading and unloading easier
                ShenyuPluginClassLoader extPathClassLoader = singleton.createPluginClassLoader(extPath);
                // Load with ShenyuPluginClassLoader.
                // The main logic checks whether a class implements ShenyuPlugin or PluginDataHandler,
                // or carries @Component or @Service. If so, it is registered as a Spring bean.
                // Construct the ShenyuLoaderResult objects
                plugins.addAll(extPathClassLoader.loadUploadedJarPlugins());
            }
        } else {
            // Load the specified jar, with the same logic as loading all
            PluginJarParser.PluginJar pluginJar = PluginJarParser.parseJar(Base64.getDecoder().decode(uploadedJarResource.getPluginJar()));
            LOG.info("shenyu upload plugin jar find new {} to load", pluginJar.getJarKey());
            ShenyuPluginClassLoader uploadPluginClassLoader = singleton.createPluginClassLoader(pluginJar);
            plugins.addAll(uploadPluginClassLoader.loadUploadedJarPlugins());
        }
        // Add the extension plugins to the plugin list of ShenyuWebHandler, so that subsequent requests pass through them
        loaderPlugins(plugins);
    } catch (Exception e) {
        LOG.error("shenyu plugins load has error ", e);
    }
}
This method works as follows:
Check whether the parameter uploadedJarResource has a value. If it is null, load all jar packages from the extension directory; otherwise, load only the specified jar package.
Retrieve the jar packages from shenyu.extPlugin.path and wrap each one in a PluginJar object, which contains the following information about the jar package:
version: version information
groupId: The groupId of the package
artifactId: The artifactId of the package
absolutePath: Absolute path
clazzMap: Bytecode of each class, keyed by class name
resourceMap: Raw bytes of the other resources in the jar package
Create a ShenyuPluginClassLoader for the jar through ShenyuPluginClassloaderHolder and use it to load the classes in the jar.
Call ShenyuPluginClassLoader.loadUploadedJarPlugins() to load the eligible classes and register them as Spring beans, so that they are managed by the Spring container.
Call the loaderPlugins() method to synchronize the extension plugins to webHandler and subscriber.
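The split between clazzMap and resourceMap in the steps above can be sketched with the JDK's jar streams alone. This is only an illustration under stated assumptions: PluginJarSketch, parse, and buildDemoJar are hypothetical names, the demo jar holds placeholder bytes rather than real bytecode, and the actual PluginJarParser also extracts version, groupId, and artifactId, which this sketch leaves out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;
import java.util.jar.JarOutputStream;

// Hypothetical, simplified counterpart of a PluginJar: class bytes and resource bytes only.
final class PluginJarSketch {

    final Map<String, byte[]> clazzMap = new HashMap<>();
    final Map<String, byte[]> resourceMap = new HashMap<>();

    /** Splits the entries of an in-memory jar into class entries and resource entries. */
    static PluginJarSketch parse(final byte[] jarBytes) {
        PluginJarSketch jar = new PluginJarSketch();
        try (JarInputStream in = new JarInputStream(new ByteArrayInputStream(jarBytes))) {
            JarEntry entry;
            while ((entry = in.getNextJarEntry()) != null) {
                if (entry.isDirectory()) {
                    continue;
                }
                String name = entry.getName();
                byte[] body = in.readAllBytes(); // reads to the end of the current entry
                if (name.endsWith(".class")) {
                    // com/example/Foo.class -> com.example.Foo
                    String className = name.substring(0, name.length() - ".class".length()).replace('/', '.');
                    jar.clazzMap.put(className, body);
                } else {
                    jar.resourceMap.put(name, body);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return jar;
    }

    /** Builds a tiny demo jar in memory; the ".class" payload is a placeholder, not real bytecode. */
    static byte[] buildDemoJar() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (JarOutputStream out = new JarOutputStream(bytes)) {
            out.putNextEntry(new JarEntry("com/example/DemoPlugin.class"));
            out.write(new byte[] {(byte) 0xCA, (byte) 0xFE});
            out.closeEntry();
            out.putNextEntry(new JarEntry("demo.properties"));
            out.write("k=v".getBytes(StandardCharsets.UTF_8));
            out.closeEntry();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(final String[] args) {
        PluginJarSketch jar = parse(buildDemoJar());
        System.out.println("classes: " + jar.clazzMap.keySet() + ", resources: " + jar.resourceMap.keySet());
    }
}
```

Keeping the class bytes in memory, keyed by class name, is what lets a dedicated class loader define the classes later without touching the file system again.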
For the content in the provided jar package, the loader will only handle classes of the specified interface type, and the implementation logic is in the ShenyuPluginClassLoader.loadUploadedJarPlugins() method.
public List<ShenyuLoaderResult> loadUploadedJarPlugins() {
    List<ShenyuLoaderResult> results = new ArrayList<>();
    // All class mapping relationships
    Set<String> names = pluginJar.getClazzMap().keySet();
    // Traverse all classes
    names.forEach(className -> {
        Object instance;
        try {
            // Try creating objects and, if possible, add them to the Spring container
            instance = getOrCreateSpringBean(className);
            if (Objects.nonNull(instance)) {
                // Building the ShenyuLoaderResult object
                results.add(buildResult(instance));
                LOG.info("The class successfully loaded into a upload-Jar-plugin {} is registered as a spring bean", className);
            }
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            LOG.warn("Registering upload-Jar-plugins succeeds spring bean fails:{}", className, e);
        }
    });
    return results;
}
This method instantiates every eligible class in the jar and wraps each instance in a ShenyuLoaderResult object. The wrapping itself is done in the buildResult() method.
private ShenyuLoaderResult buildResult(final Object instance) {
    ShenyuLoaderResult result = new ShenyuLoaderResult();
    // Does the created object implement ShenyuPlugin
    if (instance instanceof ShenyuPlugin) {
        result.setShenyuPlugin((ShenyuPlugin) instance);
    // Does the created object implement PluginDataHandler
    } else if (instance instanceof PluginDataHandler) {
        result.setPluginDataHandler((PluginDataHandler) instance);
    }
    return result;
}
Next, let's look at the getOrCreateSpringBean() method:
private <T> T getOrCreateSpringBean(final String className) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
    // Confirm if it has been registered. If so, do not process it and return directly
    if (SpringBeanUtils.getInstance().existBean(className)) {
        return SpringBeanUtils.getInstance().getBeanByClassName(className);
    }
    lock.lock();
    try {
        // Double check
        T inst = SpringBeanUtils.getInstance().getBeanByClassName(className);
        if (Objects.isNull(inst)) {
            // Using ShenyuPluginClassLoader to load classes
            Class<?> clazz = Class.forName(className, false, this);
            //Exclude ShenyuPlugin subclass and PluginDataHandler subclass
            // without adding @Component @Service annotation
            // Confirm if it is a subclass of ShenyuPlugin or PluginDataHandler
            boolean next = ShenyuPlugin.class.isAssignableFrom(clazz)
                    || PluginDataHandler.class.isAssignableFrom(clazz);
            if (!next) {
                // If not, confirm if it is annotated with @Component or @Service
                Annotation[] annotations = clazz.getAnnotations();
                next = Arrays.stream(annotations).anyMatch(e -> e.annotationType().equals(Component.class)
                        || e.annotationType().equals(Service.class));
            }
            if (next) {
                // If the above content is met, register the bean
                GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
                beanDefinition.setBeanClassName(className);
                beanDefinition.setAutowireCandidate(true);
                beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
                // Registering beans
                String beanName = SpringBeanUtils.getInstance().registerBean(beanDefinition, this);
                // create object
                inst = SpringBeanUtils.getInstance().getBeanByClassName(beanName);
            }
        }
        return inst;
    } finally {
        lock.unlock();
    }
}
The logic is roughly as follows:
Check whether the class implements the ShenyuPlugin or PluginDataHandler interface. If not, check whether it is annotated with @Component or @Service.
If either check in step 1 passes, register the class in the Spring container and return the created object.
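The eligibility check and the lock-protected double check can be reproduced without Spring. In the sketch below, every name is a hypothetical stand-in: Plugin and DataHandler replace ShenyuPlugin and PluginDataHandler, @Managed replaces @Component and @Service, and a plain map replaces the Spring container.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, Spring-free illustration of the get-or-create flow.
final class BeanRegistrySketch {

    interface Plugin { }        // stand-in for ShenyuPlugin

    interface DataHandler { }   // stand-in for PluginDataHandler

    @Retention(RetentionPolicy.RUNTIME)
    @interface Managed { }      // stand-in for @Component / @Service

    static class DemoPlugin implements Plugin { }

    @Managed
    static class DemoHelper { }

    static class PlainClass { }

    private final Map<String, Object> beans = new ConcurrentHashMap<>();
    private final ReentrantLock lock = new ReentrantLock();

    /** Returns the registered instance, creates it if the class is eligible, or returns null. */
    Object getOrCreate(final String className) {
        Object existing = beans.get(className);
        if (existing != null) {
            return existing; // fast path: already registered
        }
        lock.lock();
        try {
            // Double check under the lock so that only one thread creates the instance.
            Object inst = beans.get(className);
            if (inst == null) {
                Class<?> clazz = Class.forName(className, false, BeanRegistrySketch.class.getClassLoader());
                boolean eligible = Plugin.class.isAssignableFrom(clazz)
                        || DataHandler.class.isAssignableFrom(clazz)
                        || clazz.isAnnotationPresent(Managed.class);
                if (eligible) {
                    inst = clazz.getDeclaredConstructor().newInstance();
                    beans.put(className, inst);
                }
            }
            return inst;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot create " + className, e);
        } finally {
            lock.unlock();
        }
    }

    public static void main(final String[] args) {
        BeanRegistrySketch registry = new BeanRegistrySketch();
        System.out.println(registry.getOrCreate(DemoPlugin.class.getName()) != null);
        System.out.println(registry.getOrCreate(PlainClass.class.getName()) == null);
    }
}
```

A class that is neither a plugin, a data handler, nor annotated is simply skipped, which is why helper classes inside a plugin jar do not end up as beans unless they opt in.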
After the plugin registration is successful, the plugin is only instantiated, but it will not take effect yet because it has not been added to Shenyu's plugin chain. The synchronization logic is implemented by the loaderPlugins() method.
private void loaderPlugins(final List<ShenyuLoaderResult> results) {
    if (CollectionUtils.isEmpty(results)) {
        return;
    }
    // Get all objects that implement the interface ShenyuPlugin
    List<ShenyuPlugin> shenyuExtendPlugins = results.stream()
            .map(ShenyuLoaderResult::getShenyuPlugin)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    // Synchronize updating plugins in webHandler
    webHandler.putExtPlugins(shenyuExtendPlugins);
    // Get all objects that implement the interface PluginDataHandler
    List<PluginDataHandler> handlers = results.stream()
            .map(ShenyuLoaderResult::getPluginDataHandler)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    // Synchronize updating handlers in subscriber
    subscriber.putExtendPluginDataHandler(handlers);
}
This method synchronizes two kinds of objects:
Synchronize the data that implements the ShenyuPlugin interface to the plugins list of webHandler.
public void putExtPlugins(final List<ShenyuPlugin> extPlugins) {
    if (CollectionUtils.isEmpty(extPlugins)) {
        return;
    }
    // Filter out newly added plugins
    final List<ShenyuPlugin> shenyuAddPlugins = extPlugins.stream()
            .filter(e -> plugins.stream().noneMatch(plugin -> plugin.named().equals(e.named())))
            .collect(Collectors.toList());
    // Filter out updated plugins: a plugin with the same name as an existing one is an update
    final List<ShenyuPlugin> shenyuUpdatePlugins = extPlugins.stream()
            .filter(e -> plugins.stream().anyMatch(plugin -> plugin.named().equals(e.named())))
            .collect(Collectors.toList());
    // If there is no data, skip
    if (CollectionUtils.isEmpty(shenyuAddPlugins) && CollectionUtils.isEmpty(shenyuUpdatePlugins)) {
        return;
    }
    // Copy old data
    // copy new list
    List<ShenyuPlugin> newPluginList = new ArrayList<>(plugins);
    // Add new plugin data
    // Add extend plugin from pluginData or shenyu ext-lib
    this.sourcePlugins.addAll(shenyuAddPlugins);
    // Add new data
    if (CollectionUtils.isNotEmpty(shenyuAddPlugins)) {
        shenyuAddPlugins.forEach(plugin -> LOG.info("shenyu auto add extends plugins:{}", plugin.named()));
        newPluginList.addAll(shenyuAddPlugins);
    }
    // Modify updated data
    if (CollectionUtils.isNotEmpty(shenyuUpdatePlugins)) {
        shenyuUpdatePlugins.forEach(plugin -> LOG.info("shenyu auto update extends plugins:{}", plugin.named()));
        for (ShenyuPlugin updatePlugin : shenyuUpdatePlugins) {
            for (int i = 0; i < newPluginList.size(); i++) {
                if (newPluginList.get(i).named().equals(updatePlugin.named())) {
                    newPluginList.set(i, updatePlugin);
                }
            }
            for (int i = 0; i < this.sourcePlugins.size(); i++) {
                if (this.sourcePlugins.get(i).named().equals(updatePlugin.named())) {
                    this.sourcePlugins.set(i, updatePlugin);
                }
            }
        }
    }
    // REORDER
    plugins = sortPlugins(newPluginList);
}
Synchronize the data that implements the PluginDataHandler interface to the handlers list of the subscriber.
public void putExtendPluginDataHandler(final List<PluginDataHandler> handlers) {
    if (CollectionUtils.isEmpty(handlers)) {
        return;
    }
    // Traverse all data
    for (PluginDataHandler handler : handlers) {
        String pluginNamed = handler.pluginNamed();
        // Update existing PluginDataHandler list
        MapUtils.computeIfAbsent(handlerMap, pluginNamed, name -> {
            LOG.info("shenyu auto add extends plugin data handler name is :{}", pluginNamed);
            return handler;
        });
    }
}
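The two synchronization steps behave differently when a name already exists: a plugin with the same name replaces the old one, while a handler is only added when no handler is registered under that name. The sketch below shows that difference with plain collections. NamedPlugin, mergePlugins, and mergeHandlers are hypothetical names, and sorting by an integer order field is an assumption about what the re-ordering step does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of "replace by name" versus "add only if absent".
final class ExtPluginMergeSketch {

    static final class NamedPlugin {
        final String name;
        final int order;
        final String version;

        NamedPlugin(final String name, final int order, final String version) {
            this.name = name;
            this.order = order;
            this.version = version;
        }
    }

    /** Plugins: same name means update in place, new name means append; then re-sort. */
    static List<NamedPlugin> mergePlugins(final List<NamedPlugin> current, final List<NamedPlugin> ext) {
        List<NamedPlugin> merged = new ArrayList<>(current); // work on a copy of the live list
        for (NamedPlugin candidate : ext) {
            boolean replaced = false;
            for (int i = 0; i < merged.size(); i++) {
                if (merged.get(i).name.equals(candidate.name)) {
                    merged.set(i, candidate);
                    replaced = true;
                }
            }
            if (!replaced) {
                merged.add(candidate);
            }
        }
        merged.sort(Comparator.comparingInt(p -> p.order)); // assumed ordering rule
        return merged;
    }

    /** Handlers: an existing entry wins, a missing one is added. */
    static void mergeHandlers(final Map<String, String> handlerMap, final Map<String, String> ext) {
        ext.forEach(handlerMap::putIfAbsent);
    }

    public static void main(final String[] args) {
        List<NamedPlugin> merged = mergePlugins(
                Arrays.asList(new NamedPlugin("a", 1, "v1"), new NamedPlugin("b", 2, "v1")),
                Arrays.asList(new NamedPlugin("b", 2, "v2"), new NamedPlugin("c", 0, "v1")));
        merged.forEach(p -> System.out.println(p.name + " " + p.version));

        Map<String, String> handlers = new HashMap<>();
        handlers.put("divide", "builtin");
        Map<String, String> ext = new HashMap<>();
        ext.put("divide", "ext");
        ext.put("custom", "ext");
        mergeHandlers(handlers, ext);
        System.out.println(handlers);
    }
}
```

Building the merged list on a copy and swapping it in at the end means requests that are already running keep iterating over the old list.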
At this point, the analysis of the loading process of the extension plugin is completed.
The ShenYu gateway uses the divide plugin to handle http requests. You can see the official documentation Quick start with Http to learn how to use this plugin.
This article analyzes the source code of shenyu-2.4.3. For an introduction to the plugin, please refer to Http Proxy on the official website.
Annotation scanning is done through SpringMvcClientBeanPostProcessor, which implements the BeanPostProcessor interface and is a post-processor provided by Spring.
During constructor instantiation, it does the following:
Read the property configuration
Add annotations, read path information
Start the registry and register with shenyu-admin
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
    //...

    /**
     * Constructor instantiation
     */
    public SpringMvcClientBeanPostProcessor(final PropertiesConfig clientConfig,
                                            final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        // 1. read Properties
        Properties props = clientConfig.getProps();
        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
        this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH, "");
        if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {
            String errorMsg = "http register param must config the appName or contextPath";
            LOG.error(errorMsg);
            throw new ShenyuClientIllegalArgumentException(errorMsg);
        }
        this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
        // 2. add annotation
        mappingAnnotation.add(ShenyuSpringMvcClient.class);
        mappingAnnotation.add(PostMapping.class);
        mappingAnnotation.add(GetMapping.class);
        mappingAnnotation.add(DeleteMapping.class);
        mappingAnnotation.add(PutMapping.class);
        mappingAnnotation.add(RequestMapping.class);
        // 3. start register center
        publisher.start(shenyuClientRegisterRepository);
    }

    @Override
    public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
        // override post process
        return bean;
    }
    //...
}
Override the post-processor logic: read the annotation information, construct the metadata objects and URI objects, and register them with shenyu-admin.
@Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
    // 1. If the whole service is registered, or the bean is not a Controller class, it is not handled
    if (Boolean.TRUE.equals(isFull) || !hasAnnotation(bean.getClass(), Controller.class)) {
        return bean;
    }
    // 2. Read the ShenyuSpringMvcClient annotation on the class
    final ShenyuSpringMvcClient beanShenyuClient = AnnotationUtils.findAnnotation(bean.getClass(), ShenyuSpringMvcClient.class);
    // 2.1 build superPath
    final String superPath = buildApiSuperPath(bean.getClass());
    // 2.2 whether to register all methods of the class
    if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {
        // build the metadata object and register it with shenyu-admin
        publisher.publishEvent(buildMetaDataDTO(beanShenyuClient, pathJoin(contextPath, superPath)));
        return bean;
    }
    // 3. read all methods
    final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
    for (Method method : methods) {
        // 3.1 read the ShenyuSpringMvcClient annotation on the method
        ShenyuSpringMvcClient methodShenyuClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
        // If there is no annotation on the method, use the annotation on the class
        methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;
        if (Objects.nonNull(methodShenyuClient)) {
            // 3.2 Build path information, build metadata objects, register with shenyu-admin
            publisher.publishEvent(buildMetaDataDTO(methodShenyuClient, buildApiPath(method, superPath)));
        }
    }
    return bean;
}
If the whole service is being registered, or the bean is not a Controller class, it is not handled.
Read the ShenyuSpringMvcClient annotation on the class. If the whole class is registered, build the metadata object here and register it with shenyu-admin.
Read the ShenyuSpringMvcClient annotation on each handler method, build the path information for that method, build the metadata object, and register it with shenyu-admin.
Two methods here build the path and deserve a closer look.
buildApiSuperPath()
Construct SuperPath: first take the path property from the annotation ShenyuSpringMvcClient on the class, if not, take the path information from the RequestMapping annotation on the current class.
private String buildApiSuperPath(@NonNull final Class<?> method) {
    // First take the path property from the annotation ShenyuSpringMvcClient on the class
    ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
    if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {
        return shenyuSpringMvcClient.path();
    }
    // Take the path information from the RequestMapping annotation of the current class
    RequestMapping requestMapping = AnnotationUtils.findAnnotation(method, RequestMapping.class);
    if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {
        return requestMapping.path()[0];
    }
    return "";
}
buildApiPath()
Build path: first read the annotation ShenyuSpringMvcClient on the method and build it if it exists; otherwise get the path information from other annotations on the method; complete path = contextPath(context information) + superPath(class information) + methodPath(method information).
private String buildApiPath(@NonNull final Method method, @NonNull final String superPath) {
    // 1. Read the annotation ShenyuSpringMvcClient on the method
    ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
    // 1.1 If path exists, build
    if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {
        //1.2 path = contextPath+superPath+methodPath
        return pathJoin(contextPath, superPath, shenyuSpringMvcClient.path());
    }
    // 2. Get path information from other annotations on the method
    final String path = getPathByMethod(method);
    if (StringUtils.isNotBlank(path)) {
        // 2.1 path = contextPath+superPath+methodPath
        return pathJoin(contextPath, superPath, path);
    }
    return pathJoin(contextPath, superPath);
}
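The formula complete path = contextPath + superPath + methodPath is easier to follow with concrete values. The pathJoin helper is not shown in this article, so the join method below is a hypothetical stand-in written for illustration: it drops empty segments and normalizes the slashes between segments, and the real helper may treat edge cases differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a path-joining helper; not the ShenYu implementation.
final class PathJoinSketch {

    /** Joins path segments with single slashes, skipping null or empty segments. */
    static String join(final String... segments) {
        List<String> parts = new ArrayList<>();
        for (String segment : segments) {
            if (segment == null) {
                continue;
            }
            // Strip leading and trailing slashes so that segments can be joined uniformly.
            String trimmed = segment.replaceAll("^/+", "").replaceAll("/+$", "");
            if (!trimmed.isEmpty()) {
                parts.add(trimmed);
            }
        }
        return "/" + String.join("/", parts);
    }

    public static void main(final String[] args) {
        // contextPath + superPath + methodPath
        System.out.println(join("/http", "/order", "/findById"));
        // No class-level path: only contextPath and methodPath contribute.
        System.out.println(join("/http", "", "save"));
    }
}
```

With contextPath /http, a class-level path /order, and a method-level path /findById, the registered path becomes /http/order/findById, which is the path the gateway later matches against.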
getPathByMethod()
Get the path information from the other annotations on the method. The annotations checked are:
ShenyuSpringMvcClient
PostMapping
GetMapping
DeleteMapping
PutMapping
RequestMapping
private String getPathByMethod(@NonNull final Method method) {
    // Iterate through interface annotations to get path information
    for (Class<? extends Annotation> mapping : mappingAnnotation) {
        final String pathByAnnotation = getPathByAnnotation(AnnotationUtils.findAnnotation(method, mapping), pathAttributeNames);
        if (StringUtils.isNotBlank(pathByAnnotation)) {
            return pathByAnnotation;
        }
    }
    return null;
}
After the annotations have been scanned, construct the metadata object and send it to shenyu-admin to complete the registration.
Metadata
Includes the rule information of the currently registered method: contextPath, appName, registration path, description information, registration type, whether it is enabled, rule name and whether to register metadata.
private MetaDataRegisterDTO buildMetaDataDTO(@NonNull final ShenyuSpringMvcClient shenyuSpringMvcClient, final String path) {
    return MetaDataRegisterDTO.builder()
            .contextPath(contextPath) // contextPath
            .appName(appName) // appName
            .path(path) // Registered path, used when gateway rules match
            .pathDesc(shenyuSpringMvcClient.desc()) // desc info
            .rpcType(RpcTypeEnum.HTTP.getName()) // divide plugin, http type when default
            .enabled(shenyuSpringMvcClient.enabled()) // is enabled?
            .ruleName(StringUtils.defaultIfBlank(shenyuSpringMvcClient.ruleName(), path)) // rule name
            .registerMetaData(shenyuSpringMvcClient.registerMetaData()) // whether to register metadata information
            .build();
}
The specific registration logic is implemented by the registration center, which has been analyzed in the previous articles and will not be analyzed in depth here.
ContextRegisterListener is responsible for registering the client's URI information with shenyu-admin. It implements the ApplicationListener interface: when the context refresh event ContextRefreshedEvent occurs, the onApplicationEvent() method is executed to perform the registration.
public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent>, BeanFactoryAware {
    //......

    /**
     * Constructor instantiation
     */
    public ContextRegisterListener(final PropertiesConfig clientConfig) {
        // read Properties
        final Properties props = clientConfig.getProps();
        this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
        this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
        if (Boolean.TRUE.equals(isFull)) {
            if (StringUtils.isBlank(contextPath)) {
                final String errorMsg = "http register param must config the contextPath";
                LOG.error(errorMsg);
                throw new ShenyuClientIllegalArgumentException(errorMsg);
            }
        }
        this.port = Integer.parseInt(Optional.ofNullable(props.getProperty(ShenyuClientConstants.PORT)).orElseGet(() -> "-1"));
        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
        this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);
        this.host = props.getProperty(ShenyuClientConstants.HOST);
    }

    @Override
    public void setBeanFactory(final BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    // Execute application events
    @Override
    public void onApplicationEvent(@NonNull final ContextRefreshedEvent contextRefreshedEvent) {
        // The method is guaranteed to be executed once
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        // 1. If you are registering for the entire service
        if (Boolean.TRUE.equals(isFull)) {
            // Build metadata and register
            publisher.publishEvent(buildMetaDataDTO());
        }
        try {
            // get port
            final int mergedPort = port <= 0 ? PortUtils.findPort(beanFactory) : port;
            // 2. Constructing URI data and registering
            publisher.publishEvent(buildURIRegisterDTO(mergedPort));
        } catch (ShenyuException e) {
            throw new ShenyuException(e.getMessage() + "please config ${shenyu.client.http.props.port} in xml/yml !");
        }
    }

    // build URI data
    private URIRegisterDTO buildURIRegisterDTO(final int port) {
        return URIRegisterDTO.builder()
                .contextPath(this.contextPath) // contextPath
                .appName(appName) // appName
                .protocol(protocol) // protocol
                .host(IpUtils.isCompleteHost(this.host) ? this.host : IpUtils.getHost(this.host)) // host
                .port(port) // port
                .rpcType(RpcTypeEnum.HTTP.getName()) // divide plugin, default registration http type
                .build();
    }

    // build MetaData
    private MetaDataRegisterDTO buildMetaDataDTO() {
        return MetaDataRegisterDTO.builder()
                .contextPath(contextPath)
                .appName(appName)
                .path(contextPath)
                .rpcType(RpcTypeEnum.HTTP.getName())
                .enabled(true)
                .ruleName(contextPath)
                .build();
    }
}
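Two small details in this listener are worth isolating: the compareAndSet guard that makes the registration run only once even if the context is refreshed several times, and the fallback to a discovered port when no positive port is configured. The sketch below shows both with JDK types only. RegisterOnceSketch, mergePort, and the counter are hypothetical names used for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the run-once guard and the port fallback.
final class RegisterOnceSketch {

    private final AtomicBoolean registered = new AtomicBoolean(false);
    private final AtomicInteger published = new AtomicInteger();

    /** Simulates a context refresh: only the first call publishes the registration. */
    void onContextRefreshed() {
        // compareAndSet flips false -> true exactly once, even under concurrent calls.
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        published.incrementAndGet();
    }

    int publishedCount() {
        return published.get();
    }

    /** Uses the configured port when it is positive, otherwise the discovered one. */
    static int mergePort(final int configuredPort, final int discoveredPort) {
        return configuredPort <= 0 ? discoveredPort : configuredPort;
    }

    public static void main(final String[] args) {
        RegisterOnceSketch listener = new RegisterOnceSketch();
        listener.onContextRefreshed();
        listener.onContextRefreshed();
        System.out.println("published " + listener.publishedCount() + " time(s)");
        System.out.println("port " + mergePort(-1, 8189));
    }
}
```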
The metadata and URI data registered by the client through the registry are processed in shenyu-admin, which is responsible for storing to the database and synchronizing to the shenyu gateway. The client registration processing logic of Divide plugin is in ShenyuClientRegisterDivideServiceImpl. The inheritance relationship is as follows.
The MetaDataRegisterDTO metadata object that the client registers through the registry is received and persisted in the register() method of shenyu-admin.
Build the contextPath and look up the selector information. If the selector exists, return its id; otherwise, create the default selector information.
@Override
public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {
    // build contextPath
    String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());
    // Find if selector information exists by name
    SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);
    if (Objects.isNull(selectorDO)) {
        // Create a default selector message if it does not exist
        return registerSelector(contextPath, pluginName, selectorHandler);
    }
    return selectorDO.getId();
}
Default Selector Information
Construct the default selector information and its conditional properties here.
//register selector
private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {
    // build selector
    SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());
    selectorDTO.setHandle(selectorHandler);
    //register default Selector
    return registerDefault(selectorDTO);
}

//build
private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {
    //build default
    SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);
    selectorDTO.setPluginId(pluginId);
    //build the conditional properties of the default selector
    selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));
    return selectorDTO;
}

private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {
    SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();
    selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // default URI
    selectorConditionDTO.setParamName("/");
    selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // default match
    selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // default /contextPath/**
    return Collections.singletonList(selectorConditionDTO);
}
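The default condition therefore says: match any URI under the contextPath. As a rough illustration of what a /contextPath/** condition accepts, the sketch below implements only that one pattern shape. It is a deliberate simplification and not the gateway's matcher: the class name, the matches method, and the value of URI_SUFFIX (taken from the "/contextPath/**" comment in the code above) are assumptions.

```java
// Hypothetical, simplified matcher for patterns of the form "/contextPath/**".
final class DefaultSelectorConditionSketch {

    // Assumed value, inferred from the "default /contextPath/**" comment.
    static final String URI_SUFFIX = "/**";

    static String defaultPattern(final String contextPath) {
        return contextPath + URI_SUFFIX;
    }

    /** Returns true when uri equals the prefix or lies below it; other patterns must match exactly. */
    static boolean matches(final String pattern, final String uri) {
        if (!pattern.endsWith(URI_SUFFIX)) {
            return pattern.equals(uri);
        }
        String prefix = pattern.substring(0, pattern.length() - URI_SUFFIX.length());
        return uri.equals(prefix) || uri.startsWith(prefix + "/");
    }

    public static void main(final String[] args) {
        String pattern = defaultPattern("/http");
        System.out.println(pattern);
        System.out.println(matches(pattern, "/http/order/findById"));
        System.out.println(matches(pattern, "/httpx/order/findById"));
    }
}
```

Requiring a slash after the prefix is what keeps /httpx/... from being captured by the selector for /http.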
Register default selector
@Override
public String registerDefault(final SelectorDTO selectorDTO) {
    //selector info
    SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
    //selector condition info
    List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
    if (StringUtils.isEmpty(selectorDTO.getId())) {
        // insert selector information into the database
        selectorMapper.insertSelective(selectorDO);
        // insert selector condition information into the database
        selectorConditionDTOs.forEach(selectorConditionDTO -> {
            selectorConditionDTO.setSelectorId(selectorDO.getId());
            selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
        });
    }
    // Publish synchronization events to synchronize selection information and its conditional attributes to the gateway
    publishEvent(selectorDO, selectorConditionDTOs);
    return selectorDO.getId();
}
Get a valid URI from the URI registered by the client, update the corresponding selector handle property, and send a selector update event to the gateway.
@Override
public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {
    //check
    if (CollectionUtils.isEmpty(uriList)) {
        return "";
    }
    //get selector
    SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
    if (Objects.isNull(selectorDO)) {
        throw new ShenyuException("doRegister Failed to execute,wait to retry.");
    }
    // get valid URI
    List<URIRegisterDTO> validUriList = uriList.stream()
            .filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost()))
            .collect(Collectors.toList());
    // build handle
    String handler = buildHandle(validUriList, selectorDO);
    if (handler != null) {
        selectorDO.setHandle(handler);
        SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
        selectorData.setHandle(handler);
        // Update the handle property of the selector to the database
        selectorService.updateSelective(selectorDO);
        // Send selector update events to the gateway
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
    }
    return ShenyuResultMessage.SUCCESS;
}
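The filtering step keeps only URIs that carry both a port and a non-blank host. A small JDK-only sketch of that filter follows. The Uri class, the upstreamUrls method, and the host:port output format are assumptions for illustration; how the real handle string is built by buildHandle() is not shown in this article.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of the "valid URI" filter.
final class ValidUriSketch {

    static final class Uri {
        final String host;
        final Integer port;

        Uri(final String host, final Integer port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Keeps URIs with a port and a non-blank host and renders them as host:port. */
    static List<String> upstreamUrls(final List<Uri> uris) {
        return uris.stream()
                .filter(u -> u.port != null && u.host != null && !u.host.trim().isEmpty())
                .map(u -> u.host + ":" + u.port)
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        List<Uri> registered = Arrays.asList(
                new Uri("192.168.0.10", 8189),
                new Uri("", 8189),
                new Uri("192.168.0.11", null));
        System.out.println(upstreamUrls(registered));
    }
}
```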
This completes the source code analysis of service registration. The analysis flow chart is as follows.
The next step is to analyze how the divide plugin initiates a call to the http service based on this information.
The divide plugin is the core processing plugin used by the gateway to handle http protocol requests.
Take the case provided on the official website Quick start with Http as an example, a direct connection request is as follows.
GET http://localhost:8189/order/findById?id=100
Accept: application/json
After proxying through the ShenYu gateway, the request is as follows.
GET http://localhost:9195/http/order/findById?id=100
Accept: application/json
The services proxied by the ShenYu gateway are still able to request the previous services, where the divide plugin comes into play. The class inheritance relationship is as follows.
After passing the ShenYu gateway proxy, the request entry is ShenyuWebHandler, which implements the org.springframework.web.server.WebHandler interface.
public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {
    //......

    /**
     * handle web request
     */
    @Override
    public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
        // execute plugin chain
        Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);
        if (scheduled) {
            return execute.subscribeOn(scheduler);
        }
        return execute;
    }

    private static class DefaultShenyuPluginChain implements ShenyuPluginChain {

        private int index;

        private final List<ShenyuPlugin> plugins;

        /**
         * Instantiating the default plugin chain
         */
        DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {
            this.plugins = plugins;
        }

        /**
         * Execute each plugin
         */
        @Override
        public Mono<Void> execute(final ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < plugins.size()) {
                    // get current plugin
                    ShenyuPlugin plugin = plugins.get(this.index++);
                    // is skip ?
                    boolean skip = plugin.skip(exchange);
                    if (skip) {
                        // If skipped, execute the next
                        return this.execute(exchange);
                    }
                    // execute current plugin
                    return plugin.execute(exchange, this);
                }
                return Mono.empty();
            });
        }
    }
}
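The chain is a classic chain-of-responsibility with an index that advances on every call. To see the control flow without the reactive types, here is a synchronous sketch. It is a simplification under stated assumptions: Plugin, Chain, tracing, and run are hypothetical names, the request is a plain String, and the Mono-based deferral of the real chain is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, synchronous illustration of an index-based plugin chain.
final class PluginChainSketch {

    interface Plugin {
        boolean skip(String request);

        void execute(String request, Chain chain);
    }

    static final class Chain {
        private final List<Plugin> plugins;
        private int index;

        Chain(final List<Plugin> plugins) {
            this.plugins = plugins;
        }

        void execute(final String request) {
            if (index >= plugins.size()) {
                return; // end of the chain
            }
            Plugin plugin = plugins.get(index++);
            if (plugin.skip(request)) {
                execute(request); // skipped: move on to the next plugin
                return;
            }
            plugin.execute(request, this); // the plugin decides whether to continue
        }
    }

    /** A plugin that records its name and then hands control back to the chain. */
    static Plugin tracing(final String name, final boolean skipped, final List<String> trace) {
        return new Plugin() {
            @Override
            public boolean skip(final String request) {
                return skipped;
            }

            @Override
            public void execute(final String request, final Chain chain) {
                trace.add(name);
                chain.execute(request);
            }
        };
    }

    static List<String> run(final String request) {
        List<String> trace = new ArrayList<>();
        List<Plugin> plugins = Arrays.asList(
                tracing("global", false, trace),
                tracing("sign", true, trace),
                tracing("divide", false, trace));
        new Chain(plugins).execute(request);
        return trace;
    }

    public static void main(final String[] args) {
        System.out.println(run("/http/order/findById"));
    }
}
```

Because each plugin receives the chain itself, a plugin can stop the request simply by not calling the chain again, which is how a plugin can short-circuit with an error response.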
Initiate the request call in the execute() method.
Get the specified timeout, number of retries
Initiate the request
Retry after failure according to the specified retry policy
public abstract class AbstractHttpClientPlugin<R> implements ShenyuPlugin {

    protected static final Logger LOG = LoggerFactory.getLogger(AbstractHttpClientPlugin.class);

    @Override
    public final Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
        // shenyu Context
        final ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
        assert shenyuContext != null;
        // uri
        final URI uri = exchange.getAttribute(Constants.HTTP_URI);
        if (Objects.isNull(uri)) {
            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_URL, null);
            return WebFluxResultUtils.result(exchange, error);
        }
        // get time out
        final long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
        final Duration duration = Duration.ofMillis(timeout);
        // get retry times
        final int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
        // get retry strategy
        final String retryStrategy = (String) Optional.ofNullable(exchange.getAttribute(Constants.RETRY_STRATEGY)).orElseGet(RetryEnum.CURRENT::getName);
        LOG.info("The request urlPath is {}, retryTimes is {}, retryStrategy is {}", uri.toASCIIString(), retryTimes, retryStrategy);
        // build header
        final HttpHeaders httpHeaders = buildHttpHeaders(exchange);
        // do request
        final Mono<R> response = doRequest(exchange, exchange.getRequest().getMethodValue(), uri, httpHeaders, exchange.getRequest().getBody())
                .timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))
                .doOnError(e -> LOG.error(e.getMessage(), e));

        // Retry policy CURRENT: retry against the current service.
        if (RetryEnum.CURRENT.getName().equals(retryStrategy)) {
            //old version of DividePlugin and SpringCloudPlugin will run on this
            return response.retryWhen(Retry.anyOf(TimeoutException.class, ConnectTimeoutException.class, ReadTimeoutException.class, IllegalStateException.class)
                    .retryMax(retryTimes)
                    .backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
                    .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))
                    .flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));
        }

        // Retry against other services
        // Exclude services that have already been called
        final Set<URI> exclude = Sets.newHashSet(uri);
        // resend
        return resend(response, exchange, duration, httpHeaders, exclude, retryTimes)
                .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))
                .flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));
    }

    private Mono<R> resend(final Mono<R> clientResponse, final ServerWebExchange exchange, final Duration duration,
                           final HttpHeaders httpHeaders, final Set<URI> exclude, final int retryTimes) {
        Mono<R> result = clientResponse;
        // Retry according to the specified number of retries
        for (int i = 0; i < retryTimes; i++) {
            result = resend(result, exchange, duration, httpHeaders, exclude);
        }
        return result;
    }

    private Mono<R> resend(final Mono<R> response, final ServerWebExchange exchange, final Duration duration,
                           final HttpHeaders httpHeaders, final Set<URI> exclude) {
        return response.onErrorResume(th -> {
            final String selectorId = exchange.getAttribute(Constants.DIVIDE_SELECTOR_ID);
            final String loadBalance = exchange.getAttribute(Constants.LOAD_BALANCE);
            // Check available services
            final List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId)
                    .stream().filter(data -> {
                        final String trimUri = data.getUrl().trim();
                        for (URI needToExclude : exclude) {
                            // exclude already called
                            if ((needToExclude.getHost() + ":" + needToExclude.getPort()).equals(trimUri)) {
                                return false;
                            }
                        }
                        return true;
                    }).collect(Collectors.toList());
            if (CollectionUtils.isEmpty(upstreamList)) {
                // no need to retry anymore
                return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));
            }
            // request ip
            final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
            // Load Balance
            final Upstream upstream = LoadBalancerFactory.selector(upstreamList, loadBalance, ip);
            if (Objects.isNull(upstream)) {
                // no need to retry anymore
                return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));
            }
            final URI newUri = RequestUrlUtils.buildRequestUri(exchange, upstream.buildDomain());
            // Exclude uri that has already been called
            exclude.add(newUri);
            // Make another call
            return doRequest(exchange, exchange.getRequest().getMethodValue(), newUri, httpHeaders, exchange.getRequest().getBody())
                    .timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))
                    .doOnError(e -> LOG.error(e.getMessage(), e));
        });
    }

    //......
}
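The "retry on another service" branch can be hard to follow in its reactive form, so here is a minimal blocking sketch of the same idea: remember which upstreams have been tried, and on failure pick one that has not. This is not ShenYu code. The class FailoverSketch, the method callWithFailover, and the use of plain strings as upstream addresses are all hypothetical, and the first untried upstream is chosen instead of going through a load balancer.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

// Hypothetical, simplified model of "retry on a different upstream".
final class FailoverSketch {

    /**
     * Calls the first upstream; on failure, switches to an upstream that has
     * not been tried yet, for at most retryTimes additional attempts.
     */
    static String callWithFailover(final List<String> upstreams, final String first,
                                   final int retryTimes, final Function<String, String> call) {
        // Upstreams that have already been called are excluded from later picks.
        final Set<String> exclude = new HashSet<>();
        exclude.add(first);
        String target = first;
        for (int attempt = 0; ; attempt++) {
            try {
                return call.apply(target);
            } catch (RuntimeException e) {
                if (attempt >= retryTimes) {
                    throw new IllegalStateException("retries exhausted", e);
                }
            }
            // Assumption: take the first untried upstream (no load balancing here).
            Optional<String> next = upstreams.stream().filter(u -> !exclude.contains(u)).findFirst();
            if (!next.isPresent()) {
                throw new IllegalStateException("no healthy upstream left after failover");
            }
            target = next.get();
            exclude.add(target);
        }
    }

    public static void main(final String[] args) {
        List<String> upstreams = Arrays.asList("a:8080", "b:8080", "c:8080");
        // Simulated call: only the upstream "c:8080" answers.
        String result = callWithFailover(upstreams, "a:8080", 2, u -> {
            if (!u.startsWith("c")) {
                throw new RuntimeException(u + " is down");
            }
            return "ok:" + u;
        });
        System.out.println(result);
    }
}
```

As in the plugin code, the exclude set grows with every attempt, so a failed upstream is never selected twice within one request.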
The source code analysis in this article covers the path from http service registration to service calls through the divide plugin. The divide plugin is mainly used to handle http requests. Some parts of the source code, such as the implementation of load balancing and service health probing, are not analyzed in depth here and will be covered in later articles.
Apache ShenYu is an asynchronous, high-performance, cross-language, responsive API gateway.
The Apache ShenYu gateway uses the dubbo plugin to make calls to the dubbo service. You can see the official documentation Dubbo Quick Start to learn how to use the plugin.
This article analyzes the source code of shenyu-2.4.3. For an introduction, please refer to Dubbo Service Access on the official website.
Declare the application service name, the registry center address, the dubbo protocol, the service interface, and the corresponding interface implementation class.
/**
 * DubboTestServiceImpl.
 */
@Service("dubboTestService")
public class DubboTestServiceImpl implements DubboTestService {

    @Override
    @ShenyuDubboClient(path = "/findById", desc = "Query by Id")
    public DubboTest findById(final String id) {
        return new DubboTest(id, "hello world shenyu Apache, findById");
    }

    //......
}
In the interface implementation class, use the annotation @ShenyuDubboClient to register the service with shenyu-admin. The role of this annotation and its rationale will be analyzed later.
The configuration information in the configuration file application.yml.
In the configuration file, declare the registry address used by dubbo. The dubbo service registers with shenyu-admin over http, and the registration address is http://localhost:9095.
Use the annotation @ShenyuDubboClient to register the service to the gateway. The simple demo is as follows.
// dubbo service
@Service("dubboTestService")
public class DubboTestServiceImpl implements DubboTestService {

    @Override
    @ShenyuDubboClient(path = "/findById", desc = "Query by Id") // method to be registered
    public DubboTest findById(final String id) {
        return new DubboTest(id, "hello world shenyu Apache, findById");
    }

    //......
}
Annotation definition:
/**
 * Works on classes and methods
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@Inherited
public @interface ShenyuDubboClient {

    //path
    String path();

    //rule name
    String ruleName() default "";

    //desc
    String desc() default "";

    //enabled
    boolean enabled() default true;
}
Annotation scanning is done by ApacheDubboServiceBeanListener, which implements the ApplicationListener<ContextRefreshedEvent> interface. When a context refresh event occurs during Spring container startup, the event handler method onApplicationEvent() is executed.
During constructor instantiation:
Read property configuration
Start the thread pool
Start the registry for registering with shenyu-admin
public class ApacheDubboServiceBeanListener implements ApplicationListener<ContextRefreshedEvent> {

    // ......

    //Constructor
    public ApacheDubboServiceBeanListener(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        //1.Read property configuration
        Properties props = clientConfig.getProps();
        String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
        String appName = props.getProperty(ShenyuClientConstants.APP_NAME);
        if (StringUtils.isBlank(contextPath)) {
            throw new ShenyuClientIllegalArgumentException("apache dubbo client must config the contextPath or appName");
        }
        this.contextPath = contextPath;
        this.appName = appName;
        this.host = props.getProperty(ShenyuClientConstants.HOST);
        this.port = props.getProperty(ShenyuClientConstants.PORT);
        //2.Start the thread pool
        executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("shenyu-apache-dubbo-client-thread-pool-%d").build());
        //3.Start the registry for registering with `shenyu-admin`
        publisher.start(shenyuClientRegisterRepository);
    }

    /**
     * Context refresh event, execute method logic
     */
    @Override
    public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
        //......
    }
}
The overridden method reads the Dubbo ServiceBean instances, builds the metadata and URI objects, and registers them with shenyu-admin.
@Override
public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
    //read ServiceBean
    Map<String, ServiceBean> serviceBean = contextRefreshedEvent.getApplicationContext().getBeansOfType(ServiceBean.class);
    if (serviceBean.isEmpty()) {
        return;
    }
    //The method is guaranteed to be executed only once
    if (!registered.compareAndSet(false, true)) {
        return;
    }
    //handle metadata
    for (Map.Entry<String, ServiceBean> entry : serviceBean.entrySet()) {
        handler(entry.getValue());
    }
    //handle URI
    serviceBean.values().stream().findFirst().ifPresent(bean -> {
        publisher.publishEvent(buildURIRegisterDTO(bean));
    });
}
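A Spring application can publish more than one context refresh event (for example, when child contexts exist), which is why the listener guards its body with compareAndSet. The following minimal sketch isolates that guard. The class RegisterOnceSketch and its methods are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a run-once guard built on AtomicBoolean.
final class RegisterOnceSketch {

    private final AtomicBoolean registered = new AtomicBoolean(false);

    private final AtomicInteger runs = new AtomicInteger();

    /** Returns true only for the single call that performed the registration. */
    boolean onRefresh() {
        // Only the first caller flips false -> true; all later callers return early.
        if (!registered.compareAndSet(false, true)) {
            return false;
        }
        runs.incrementAndGet(); // stands in for the real registration work
        return true;
    }

    int runs() {
        return runs.get();
    }

    public static void main(final String[] args) {
        RegisterOnceSketch listener = new RegisterOnceSketch();
        System.out.println(listener.onRefresh());
        System.out.println(listener.onRefresh());
        System.out.println(listener.runs());
    }
}
```

Because compareAndSet is atomic, the guard also holds if two refresh events are handled on different threads at the same time.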
handler()
In the handler() method, read all methods from the serviceBean, determine if there is a ShenyuDubboClient annotation on the method, build a metadata object if it exists, and register the method with shenyu-admin through the registry.
The metadata object carries the information needed to register the method, which is later used for selector and rule matching.
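The scan described above comes down to reflection: iterate over the methods of the service class, keep those that carry the annotation, and derive a registration record from each. Below is a minimal sketch under stated assumptions. The annotation DemoClient, the class DemoService, and the string-based "metadata" are hypothetical stand-ins and do not reproduce ShenyuDubboClient or MetaDataRegisterDTO.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical sketch of annotation scanning; none of these types are ShenYu classes.
final class AnnotationScanSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoClient {
        String path();

        String desc() default "";
    }

    static class DemoService {
        @DemoClient(path = "/findById", desc = "Query by Id")
        public String findById(final String id) {
            return "id=" + id;
        }

        // No annotation: this method is not registered.
        public String notExposed() {
            return "hidden";
        }
    }

    /** Maps contextPath + annotation path to "methodName(parameterTypes)". */
    static Map<String, String> scan(final Class<?> clazz, final String contextPath) {
        Map<String, String> metadata = new TreeMap<>();
        for (Method method : clazz.getDeclaredMethods()) {
            DemoClient client = method.getAnnotation(DemoClient.class);
            if (client == null) {
                continue;
            }
            // Parameter types are joined with commas, one fully qualified name each.
            String parameterTypes = Arrays.stream(method.getParameterTypes())
                    .map(Class::getName)
                    .collect(Collectors.joining(","));
            metadata.put(contextPath + client.path(), method.getName() + "(" + parameterTypes + ")");
        }
        return metadata;
    }

    public static void main(final String[] args) {
        System.out.println(scan(DemoService.class, "/dubbo"));
    }
}
```

The annotation must have RUNTIME retention, as ShenyuDubboClient does, otherwise getAnnotation() would not see it.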
The metadata and URI data registered by the client through the registry are processed at the shenyu-admin end, which is responsible for storing to the database and synchronizing to the shenyu gateway. The client-side registration processing logic of the Dubbo plugin is in the ShenyuClientRegisterDubboServiceImpl. The inheritance relationship is as follows.
The MetaDataRegisterDTO metadata object registered by the client through the registry is received and persisted to the database in the register() method of shenyu-admin.
Build the contextPath and look up the selector information. If it exists, return its id; otherwise, create the default selector information.
@Override
public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {
    // build contextPath
    String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());
    // Find if selector information exists by name
    SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);
    if (Objects.isNull(selectorDO)) {
        // Create the default selector information if it does not exist
        return registerSelector(contextPath, pluginName, selectorHandler);
    }
    return selectorDO.getId();
}
Default selector information
Construct the default selector information and its conditional properties here.
//register selector
private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {
    //build selector
    SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());
    selectorDTO.setHandle(selectorHandler);
    //register default selector
    return registerDefault(selectorDTO);
}

//build selector
private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {
    //build default
    SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);
    selectorDTO.setPluginId(pluginId);
    //build the conditional properties of the default selector
    selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));
    return selectorDTO;
}
Build default selector
private SelectorDTO buildDefaultSelectorDTO(final String name) {
    return SelectorDTO.builder()
            .name(name) // name
            .type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // default type: custom
            .matchMode(MatchModeEnum.AND.getCode()) //default match mode
            .enabled(Boolean.TRUE) //enable
            .loged(Boolean.TRUE) //log
            .continued(Boolean.TRUE)
            .sort(1)
            .build();
}
Build default selector conditional properties
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {
    SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();
    selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // default URI
    selectorConditionDTO.setParamName("/");
    selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // default match
    selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX);
    return Collections.singletonList(selectorConditionDTO);
}
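To see what the default condition means for incoming requests, the sketch below builds the condition value and checks a few URIs against it. It rests on two assumptions that the code above does not confirm: that AdminConstants.URI_SUFFIX is "/**", and that matching behaves like a prefix match on the context path. The class DefaultSelectorMatchSketch and its simplified matches() method are hypothetical and are not the gateway's matcher.

```java
// Hypothetical sketch: how a default selector condition could route requests.
final class DefaultSelectorMatchSketch {

    // Assumption: the suffix appended to the context path is "/**".
    static final String URI_SUFFIX = "/**";

    static String defaultCondition(final String contextPath) {
        return contextPath + URI_SUFFIX;
    }

    /** Simplified stand-in for path matching; only a trailing "/**" is understood. */
    static boolean matches(final String pattern, final String uri) {
        if (!pattern.endsWith(URI_SUFFIX)) {
            return pattern.equals(uri);
        }
        String prefix = pattern.substring(0, pattern.length() - URI_SUFFIX.length());
        // Match the context path itself or anything below it.
        return uri.equals(prefix) || uri.startsWith(prefix + "/");
    }

    public static void main(final String[] args) {
        String condition = defaultCondition("/dubbo");
        System.out.println(condition);
        System.out.println(matches(condition, "/dubbo/findById"));
        System.out.println(matches(condition, "/http/order/findById"));
    }
}
```

Under these assumptions, every request below the service's context path lands in the same default selector, and finer-grained routing is left to the rules.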
Register default selector
@Override
public String registerDefault(final SelectorDTO selectorDTO) {
    //selector information
    SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
    //selector conditional properties
    List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
    if (StringUtils.isEmpty(selectorDTO.getId())) {
        // insert selector information into the database
        selectorMapper.insertSelective(selectorDO);
        // insert selector conditional properties into the database
        selectorConditionDTOs.forEach(selectorConditionDTO -> {
            selectorConditionDTO.setSelectorId(selectorDO.getId());
            selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
        });
    }
    // Publish synchronization events to synchronize selector information and its conditional properties to the gateway
    publishEvent(selectorDO, selectorConditionDTOs);
    return selectorDO.getId();
}
Get a valid URI from the URI registered by the client, update the corresponding selector handle property, and send a selector update event to the gateway.
@Override
public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {
    //check
    if (CollectionUtils.isEmpty(uriList)) {
        return "";
    }
    SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
    if (Objects.isNull(selectorDO)) {
        throw new ShenyuException("doRegister Failed to execute,wait to retry.");
    }
    // get valid URI
    List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());
    // build handle
    String handler = buildHandle(validUriList, selectorDO);
    if (handler != null) {
        selectorDO.setHandle(handler);
        SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
        selectorData.setHandle(handler);
        // Update the handle property of the selector to the database
        selectorService.updateSelective(selectorDO);
        // Send selector update events to the gateway
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
    }
    return ShenyuResultMessage.SUCCESS;
}
This completes the source code analysis of service registration. The flow chart of the analysis is as follows.
The next step is to analyze how the dubbo plugin initiates calls to the dubbo service based on this information.
The dubbo plugin is the core processing plugin used by the ShenYu gateway to convert http requests into the dubbo protocol and invoke the dubbo service.
Take the example from Quick Start with Dubbo on the official website: a dubbo service is registered with shenyu-admin through the registry and is then requested through the ShenYu gateway proxy. The request is as follows.
GET http://localhost:9195/dubbo/findById?id=100
Accept: application/json
The class inheritance relationship in the Dubbo plugin is as follows.
After passing the ShenYu gateway proxy, the request entry is ShenyuWebHandler, which implements the org.springframework.web.server.WebHandler interface.
public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {

    //......

    /**
     * handle request
     */
    @Override
    public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
        // execute default plugin chain
        Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);
        if (scheduled) {
            return execute.subscribeOn(scheduler);
        }
        return execute;
    }

    private static class DefaultShenyuPluginChain implements ShenyuPluginChain {

        private int index;

        private final List<ShenyuPlugin> plugins;

        DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {
            this.plugins = plugins;
        }

        /**
         * execute.
         */
        @Override
        public Mono<Void> execute(final ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < plugins.size()) {
                    // get plugin
                    ShenyuPlugin plugin = plugins.get(this.index++);
                    boolean skip = plugin.skip(exchange);
                    if (skip) {
                        // next
                        return this.execute(exchange);
                    }
                    // execute
                    return plugin.execute(exchange, this);
                }
                return Mono.empty();
            });
        }
    }
}
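The index-based chain above can be illustrated without Reactor. The following is a minimal synchronous sketch, not ShenYu's implementation: the Plugin and Chain types and the plugin names are hypothetical, and a list of strings stands in for the ServerWebExchange so that the order of execution can be observed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical synchronous model of an index-based plugin chain.
final class PluginChainSketch {

    interface Plugin {
        boolean skip(List<String> trace);

        void execute(List<String> trace, Chain chain);
    }

    static final class Chain {
        private int index;

        private final List<Plugin> plugins;

        Chain(final List<Plugin> plugins) {
            this.plugins = plugins;
        }

        void execute(final List<String> trace) {
            if (index < plugins.size()) {
                // Advance the cursor before running, so the plugin's own call
                // to chain.execute() moves on to the next plugin.
                Plugin plugin = plugins.get(index++);
                if (plugin.skip(trace)) {
                    execute(trace);
                    return;
                }
                plugin.execute(trace, this);
            }
        }
    }

    /** Creates a plugin that records its name and then hands over to the chain. */
    static Plugin plugin(final String name, final boolean skipped) {
        return new Plugin() {
            @Override
            public boolean skip(final List<String> trace) {
                return skipped;
            }

            @Override
            public void execute(final List<String> trace, final Chain chain) {
                trace.add(name);
                chain.execute(trace);
            }
        };
    }

    static List<String> run(final List<Plugin> plugins) {
        List<String> trace = new ArrayList<>();
        // A fresh chain per request, mirroring "new DefaultShenyuPluginChain(plugins)".
        new Chain(plugins).execute(trace);
        return trace;
    }

    public static void main(final String[] args) {
        System.out.println(run(Arrays.asList(
                plugin("global", false), plugin("divide", true), plugin("dubbo", false))));
    }
}
```

A new chain object is created for every request because the index is mutable per-request state. A plugin that does not call chain.execute() ends the chain early.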
GlobalPlugin is a global plugin that constructs contextual information in the execute() method.
public class GlobalPlugin implements ShenyuPlugin {

    // shenyu context
    private final ShenyuContextBuilder builder;

    //......

    @Override
    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
        // build context information to be passed into the exchange
        ShenyuContext shenyuContext = builder.build(exchange);
        exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
        return chain.execute(exchange);
    }

    //......
}
The RpcParamTransformPlugin is responsible for reading the parameters from the http request, saving them in the exchange and passing them to the rpc service.
In the execute() method, the core logic of the plugin is executed: get the request information from exchange and process the parameters according to the form of content passed in by the request.
Set special context information in the doDubboInvoker() method, and then start the dubbo generalization call.
public class ApacheDubboPlugin extends AbstractDubboPlugin {

    @Override
    protected Mono<Void> doDubboInvoker(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector,
                                        final RuleData rule, final MetaData metaData, final String param) {
        //set the current selector and rule information, and request address for dubbo graying support
        RpcContext.getContext().setAttachment(Constants.DUBBO_SELECTOR_ID, selector.getId());
        RpcContext.getContext().setAttachment(Constants.DUBBO_RULE_ID, rule.getId());
        RpcContext.getContext().setAttachment(Constants.DUBBO_REMOTE_ADDRESS, Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress());
        //dubbo generic invoker
        final Mono<Object> result = dubboProxyService.genericInvoker(param, metaData, exchange);
        //execute next plugin in chain
        return result.then(chain.execute(exchange));
    }
}
Gets the generalization service GenericService object.
Constructs the request parameter pair object.
Initiates an asynchronous generalization invocation.
public class ApacheDubboProxyService {

    //......

    /**
     * Generic invoker object.
     */
    public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws ShenyuException {
        //1.Get the ReferenceConfig object
        ReferenceConfig<GenericService> reference = ApacheDubboConfigCache.getInstance().get(metaData.getPath());
        if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {
            //Invalidate the current cache entry
            ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());
            //Reinitialization with metadata
            reference = ApacheDubboConfigCache.getInstance().initRef(metaData);
        }
        //2.Get the GenericService object of the generalization service
        GenericService genericService = reference.get();
        //3.Constructing the request parameter pair object
        Pair<String[], Object[]> pair;
        if (StringUtils.isBlank(metaData.getParameterTypes()) || ParamCheckUtils.dubboBodyIsEmpty(body)) {
            pair = new ImmutablePair<>(new String[]{}, new Object[]{});
        } else {
            pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());
        }
        //4.Initiating asynchronous generalization calls
        return Mono.fromFuture(invokeAsync(genericService, metaData.getMethodName(), pair.getLeft(), pair.getRight()).thenApply(ret -> {
            //handle result
            if (Objects.isNull(ret)) {
                ret = Constants.DUBBO_RPC_RESULT_EMPTY;
            }
            exchange.getAttributes().put(Constants.RPC_RESULT, ret);
            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
            return ret;
        })).onErrorMap(exception -> exception instanceof GenericException
                ? new ShenyuException(((GenericException) exception).getExceptionMessage())
                : new ShenyuException(exception)); //handle exceptions
    }

    //Generalized calls, asynchronous operations
    private CompletableFuture<Object> invokeAsync(final GenericService genericService, final String method, final String[] parameterTypes, final Object[] args) throws GenericException {
        genericService.$invoke(method, parameterTypes, args);
        Object resultFromFuture = RpcContext.getContext().getFuture();
        return resultFromFuture instanceof CompletableFuture ? (CompletableFuture<Object>) resultFromFuture : CompletableFuture.completedFuture(resultFromFuture);
    }
}
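The last two lines of invokeAsync() normalize whatever the RPC context returns into a CompletableFuture, and thenApply() then substitutes a marker for a null result. The sketch below isolates those two steps using only the JDK. The class FutureWrapSketch and its method names are hypothetical, and the string "EMPTY" merely stands in for Constants.DUBBO_RPC_RESULT_EMPTY, whose actual value is not shown in this article.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of normalizing a result into a CompletableFuture.
final class FutureWrapSketch {

    /** Returns the argument itself if it is already a future, else a completed future holding it. */
    @SuppressWarnings("unchecked")
    static CompletableFuture<Object> toFuture(final Object resultFromContext) {
        return resultFromContext instanceof CompletableFuture
                ? (CompletableFuture<Object>) resultFromContext
                : CompletableFuture.completedFuture(resultFromContext);
    }

    /** Replaces a null result with the given marker once the future completes. */
    static CompletableFuture<Object> withEmptyDefault(final CompletableFuture<Object> future, final Object emptyMarker) {
        return future.thenApply(ret -> ret == null ? emptyMarker : ret);
    }

    public static void main(final String[] args) {
        // An asynchronous result is passed through unchanged.
        CompletableFuture<Object> async = CompletableFuture.supplyAsync(() -> "hello");
        System.out.println(withEmptyDefault(toFuture(async), "EMPTY").join());
        // A missing result is wrapped and replaced by the marker.
        System.out.println(withEmptyDefault(toFuture(null), "EMPTY").join());
    }
}
```

Wrapping both cases in one type lets the caller hand the result to Mono.fromFuture() without caring whether the underlying invocation was asynchronous.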
The gateway calls the dubbo service through a generalization call (Dubbo's generic invocation), which does not require the service interface classes on the gateway side.
The ReferenceConfig object is the key object supporting generalization calls, and it is initialized during data synchronization. Two kinds of data are involved here: the synchronized plugin handler information and the synchronized plugin metadata.
When the plugin data is updated, the data synchronization module synchronizes the data from shenyu-admin to the gateway. The initialization operation is performed in handlerPlugin().
public abstract class AbstractDubboPluginDataHandler implements PluginDataHandler {

    //......

    //Initializing the configuration cache
    protected abstract void initConfigCache(DubboRegisterConfig dubboRegisterConfig);

    @Override
    public void handlerPlugin(final PluginData pluginData) {
        if (Objects.nonNull(pluginData) && Boolean.TRUE.equals(pluginData.getEnabled())) {
            //Data deserialization
            DubboRegisterConfig dubboRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class);
            DubboRegisterConfig exist = Singleton.INST.get(DubboRegisterConfig.class);
            if (Objects.isNull(dubboRegisterConfig)) {
                return;
            }
            if (Objects.isNull(exist) || !dubboRegisterConfig.equals(exist)) {
                // Perform initialization operations
                this.initConfigCache(dubboRegisterConfig);
            }
            Singleton.INST.single(DubboRegisterConfig.class, dubboRegisterConfig);
        }
    }

    //......
}
When the metadata is updated, the data synchronization module synchronizes the data from shenyu-admin to the gateway. The metadata update operation is performed in the onSubscribe() method.
public class ApacheDubboMetaDataSubscriber implements MetaDataSubscriber {

    //local memory cache
    private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap();

    //update metaData
    public void onSubscribe(final MetaData metaData) {
        // dubbo
        if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
            //Whether the corresponding metadata exists
            MetaData exist = META_DATA.get(metaData.getPath());
            if (Objects.isNull(exist) || Objects.isNull(ApacheDubboConfigCache.getInstance().get(metaData.getPath()))) {
                // initRef
                ApacheDubboConfigCache.getInstance().initRef(metaData);
            } else {
                // The corresponding metadata has undergone an update operation
                if (!Objects.equals(metaData.getServiceName(), exist.getServiceName())
                        || !Objects.equals(metaData.getRpcExt(), exist.getRpcExt())
                        || !Objects.equals(metaData.getParameterTypes(), exist.getParameterTypes())
                        || !Objects.equals(metaData.getMethodName(), exist.getMethodName())) {
                    //Build ReferenceConfig again based on the latest metadata
                    ApacheDubboConfigCache.getInstance().build(metaData);
                }
            }
            //local memory cache
            META_DATA.put(metaData.getPath(), metaData);
        }
    }

    //delete
    public void unSubscribe(final MetaData metaData) {
        if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
            //invalidate the ReferenceConfig
            ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());
            META_DATA.remove(metaData.getPath());
        }
    }
}
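The subscriber's decision logic (initialize on first sight, rebuild only when a relevant field changed, otherwise do nothing) can be sketched on its own. The example below is a simplified model under stated assumptions: the Meta class, the Action enum, and MetaDataCacheSketch are hypothetical, the rpcExt field is omitted, and the sketch only reports which action would be taken instead of building a ReferenceConfig.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of "init, rebuild, or leave alone" for cached metadata.
final class MetaDataCacheSketch {

    enum Action { INIT, REBUILD, NONE }

    static final class Meta {
        final String path;
        final String serviceName;
        final String methodName;
        final String parameterTypes;

        Meta(final String path, final String serviceName, final String methodName, final String parameterTypes) {
            this.path = path;
            this.serviceName = serviceName;
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
        }
    }

    // Local memory cache keyed by request path.
    private final ConcurrentMap<String, Meta> cache = new ConcurrentHashMap<>();

    /** Decides what to do with incoming metadata and then stores it. */
    Action onSubscribe(final Meta meta) {
        Meta exist = cache.get(meta.path);
        Action action;
        if (exist == null) {
            action = Action.INIT;
        } else if (!Objects.equals(meta.serviceName, exist.serviceName)
                || !Objects.equals(meta.methodName, exist.methodName)
                || !Objects.equals(meta.parameterTypes, exist.parameterTypes)) {
            action = Action.REBUILD;
        } else {
            action = Action.NONE;
        }
        cache.put(meta.path, meta);
        return action;
    }

    void unSubscribe(final Meta meta) {
        cache.remove(meta.path);
    }

    public static void main(final String[] args) {
        MetaDataCacheSketch subscriber = new MetaDataCacheSketch();
        Meta v1 = new Meta("/dubbo/findById", "DubboTestService", "findById", "java.lang.String");
        Meta v2 = new Meta("/dubbo/findById", "DubboTestService", "findById", "java.lang.Long");
        System.out.println(subscriber.onSubscribe(v1));
        System.out.println(subscriber.onSubscribe(v1));
        System.out.println(subscriber.onSubscribe(v2));
    }
}
```

Comparing only the fields that affect the reference avoids rebuilding it on every synchronization push, which matters because building a ReferenceConfig is comparatively expensive.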
The source code analysis in this article covers the path from Dubbo service registration to service calls through the Dubbo plugin. The Dubbo plugin is mainly used to convert http requests to the dubbo protocol, and the main logic is implemented through generalization calls.