This article is based on the source code analysis of version 'shenyu-2.6.1'
Content
Shenyu provides a mechanism to customize its own plugins or modify existing plugins, which is implemented internally through the configuration of extPlugin. It needs to meet the following two points:
Implement interface ShenyuPlugin or PluginDataHandler.
After packaging the implemented package, place it in the corresponding path of 'shenyu. extPlugin. path'.
The class that truly implements this logic is' ShenyuLoaderService '. Now let's take a look at how this class handles it.
public ShenyuLoaderService(final ShenyuWebHandler webHandler, final CommonPluginDataSubscriber subscriber, final ShenyuConfig shenyuConfig) { // Information subscription for plugin information this.subscriber = subscriber; // The WebHandler encapsulated by Shenyu contains all the plugin logic this.webHandler = webHandler; // configuration information this.shenyuConfig = shenyuConfig; // The configuration information of the extension plugin, such as path, whether it is enabled, how many threads are enabled to process, and the frequency of loading checks ExtPlugin config = shenyuConfig.getExtPlugin(); // If enabled, create a scheduled task to check and load if (config.getEnabled()) { // Create a scheduled task with a specified thread name ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(config.getThreads(), ShenyuThreadFactory.create("plugin-ext-loader", true)); // Create a task to be executed at a fixed frequency, with a default time of 30 seconds and execution every 300 seconds executor.scheduleAtFixedRate(() -> loadExtOrUploadPlugins(null), config.getScheduleDelay(), config.getScheduleTime(), TimeUnit.SECONDS); } }
This class has the following properties:
WebHandler: This class is the entry point for shenyu to process requests, referencing all plugin data. After the extension plugin is loaded, it needs to be updated.
Subscriber: This class is the entry point for the subscription of plugins, referencing the subscription processing classes of all plugins. After the extension configuration is loaded, synchronous updates are also required.
Executor: A scheduled task will be created inside' ShenyuLoaderService 'to periodically scan and load jar packages under the specified path, facilitating the loading of extended plugins and achieving dynamic discovery
By default, it will scan every 300 seconds after 30 seconds of startup.
Meanwhile, the decision to enable extension plugin functionality can be made through the configuration of shenyu. extPlugin. enabled.
The above configurations can be adjusted in the configuration file:
shenyu:extPlugin:path:# Storage directory for extension pluginsenabled:true# Is the extension function enabledthreads:1# Number of threads loaded by scanningscheduleTime:300# The frequency of task executionscheduleDelay:30# How long after the task starts to execute
Next, let's take a look at the loading logic:
public void loadExtOrUploadPlugins(final PluginData uploadedJarResource) { try { List<ShenyuLoaderResult> plugins = new ArrayList<>(); // Obtain the holding object of ShenyuPluginClassloader ShenyuPluginClassloaderHolder singleton = ShenyuPluginClassloaderHolder.getSingleton(); if (Objects.isNull(uploadedJarResource)) { // If the parameter is empty, load all jar packages from the extended directory // PluginJar: Data containing the ShenyuPlugin interface and PluginDataHandler interface List<PluginJarParser.PluginJar> uploadPluginJars = ShenyuExtPathPluginJarLoader.loadExtendPlugins(shenyuConfig.getExtPlugin().getPath()); // Traverse all pending plugins for (PluginJarParser.PluginJar extPath : uploadPluginJars) { LOG.info("shenyu extPlugin find new {} to load", extPath.getAbsolutePath()); // Use the loader of the extension plugin to load the specified plugin, facilitating subsequent loading and unloading ShenyuPluginClassLoader extPathClassLoader = singleton.createPluginClassLoader(extPath); // Using ShenyuPluginClassLoader for loading // The main logic is to determine whether to implement ShenyuPlugin interface, PluginDataHandler interface, or identify annotations such as @ Component \ @ Service. If so, register as SpringBean // Construct ShenyuLoaderResult object plugins.addAll(extPathClassLoader.loadUploadedJarPlugins()); } } else { // Load the specified jar, with the same logic as loading all PluginJarParser.PluginJar pluginJar = PluginJarParser.parseJar(Base64.getDecoder().decode(uploadedJarResource.getPluginJar())); LOG.info("shenyu upload plugin jar find new {} to load", pluginJar.getJarKey()); ShenyuPluginClassLoader uploadPluginClassLoader = singleton.createPluginClassLoader(pluginJar); plugins.addAll(uploadPluginClassLoader.loadUploadedJarPlugins()); } // Add the extended plugins to the plugin list of ShenyuWebHandler, and subsequent requests will go through the added plugin content loaderPlugins(plugins); } catch (Exception e) { LOG.error("shenyu plugins load has error ", e); } }
The logic processed by this method:
Check if the parameter uploadedJarResource has a value. If not, all will be loaded. Otherwise, load the specified resource jar package for processing.
Retrieve the specified jar package from shenyu. extPlugin. path and encapsulate it as a PluginJar object, which contains the following information about the jar package:
version: version information
groupId: The groupId of the package
artifactId: The artifactId of the package
absolutePath: Absolute path
clazzMap: Bytecode corresponding to class
resourceMap: Bytecode of jar package
Create a corresponding ClassLoader using ShenyuPluginClassloaderHolder, with the corresponding class being 'ShenyuPluginClassLoader', and load the corresponding class accordingly.
Call ShenyuPluginClassLoader. loadUploadedJarPlugins to load the corresponding class and register it as a Spring Bean, which can be managed using the Spring container
Call the loaderPlugins method to update the extended plugin to'webHandler and subscriber.
For the content in the provided jar package, the loader will only handle classes of the specified interface type, and the implementation logic is in the ShenyuPluginClassLoader.loadUploadedJarPlugins() method.
public List<ShenyuLoaderResult> loadUploadedJarPlugins() { List<ShenyuLoaderResult> results = new ArrayList<>(); // All class mapping relationships Set<String> names = pluginJar.getClazzMap().keySet(); // Traverse all classes names.forEach(className -> { Object instance; try { // Try creating objects and, if possible, add them to the Spring container instance = getOrCreateSpringBean(className); if (Objects.nonNull(instance)) { // Building the ShenyuLoaderResult object results.add(buildResult(instance)); LOG.info("The class successfully loaded into a upload-Jar-plugin {} is registered as a spring bean", className); } } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) { LOG.warn("Registering upload-Jar-plugins succeeds spring bean fails:{}", className, e); } }); return results; }
This method is responsible for building all eligible objects and encapsulating them into a ShenyuLoaderResult object. This object is encapsulated for the created object and will be processed in the method buildResult().
private ShenyuLoaderResult buildResult(final Object instance) { ShenyuLoaderResult result = new ShenyuLoaderResult(); // Does the created object implement ShenyuPlugin if (instance instanceof ShenyuPlugin) { result.setShenyuPlugin((ShenyuPlugin) instance); // Does the created object implement PluginDataHandler } else if (instance instanceof PluginDataHandler) { result.setPluginDataHandler((PluginDataHandler) instance); } return result; }
Simultaneously enter the method getOrCreatSpringBean() for further analysis:
private <T> T getOrCreateSpringBean(final String className) throws ClassNotFoundException, IllegalAccessException, InstantiationException { // Confirm if it has been registered. If so, do not process it and return directly if (SpringBeanUtils.getInstance().existBean(className)) { return SpringBeanUtils.getInstance().getBeanByClassName(className); } lock.lock(); try { // Double check, T inst = SpringBeanUtils.getInstance().getBeanByClassName(className); if (Objects.isNull(inst)) { // Using ShenyuPluginClassLoader to load classes Class<?> clazz = Class.forName(className, false, this); //Exclude ShenyuPlugin subclass and PluginDataHandler subclass // without adding @Component @Service annotation // Confirm if it is a subclass of ShenyuPlugin or PluginDataHandler boolean next = ShenyuPlugin.class.isAssignableFrom(clazz) || PluginDataHandler.class.isAssignableFrom(clazz); if (!next) { // If not, confirm if @ Component and @ Service annotations are identified Annotation[] annotations = clazz.getAnnotations(); next = Arrays.stream(annotations).anyMatch(e -> e.annotationType().equals(Component.class) || e.annotationType().equals(Service.class)); } if (next) { // If the above content is met, register the bean GenericBeanDefinition beanDefinition = new GenericBeanDefinition(); beanDefinition.setBeanClassName(className); beanDefinition.setAutowireCandidate(true); beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); // Registering beans String beanName = SpringBeanUtils.getInstance().registerBean(beanDefinition, this); // create object inst = SpringBeanUtils.getInstance().getBeanByClassName(beanName); } } return inst; } finally { lock.unlock(); } }
The logic is roughly as follows:
Check if the interface ShenyuPlugin or PluginDataHandler has been implemented. If not, check if @Component or @Service has been identified`.
If the condition of 1 is met, register the object in the Spring container and return the created object.
After the plugin registration is successful, the plugin is only instantiated, but it will not take effect yet because it has not been added to Shenyu's plugin chain. The synchronization logic is implemented by the loaderPlugins() method.
private void loaderPlugins(final List<ShenyuLoaderResult> results) { if (CollectionUtils.isEmpty(results)) { return; } // Get all objects that implement the interface ShenyuPlugin List<ShenyuPlugin> shenyuExtendPlugins = results.stream().map(ShenyuLoaderResult::getShenyuPlugin).filter(Objects::nonNull).collect(Collectors.toList()); // Synchronize updating plugins in webHandler webHandler.putExtPlugins(shenyuExtendPlugins); // Get all objects that implement the interface PluginDataHandler List<PluginDataHandler> handlers = results.stream().map(ShenyuLoaderResult::getPluginDataHandler).filter(Objects::nonNull).collect(Collectors.toList()); // Synchronize updating handlers in subscriber subscriber.putExtendPluginDataHandler(handlers); }
The logic of this method processes two data points:
Synchronize the data that implements the ShenyuPlugin interface to the plugins list of webHandler.
public void putExtPlugins(final List<ShenyuPlugin> extPlugins) { if (CollectionUtils.isEmpty(extPlugins)) { return; } // Filter out newly added plugins final List<ShenyuPlugin> shenyuAddPlugins = extPlugins.stream() .filter(e -> plugins.stream().noneMatch(plugin -> plugin.named().equals(e.named()))) .collect(Collectors.toList()); // Filter out updated plugins and determine if they have the same name as the old one, then it is an update final List<ShenyuPlugin> shenyuUpdatePlugins = extPlugins.stream() .filter(e -> plugins.stream().anyMatch(plugin -> plugin.named().equals(e.named()))) .collect(Collectors.toList()); // If there is no data, skip if (CollectionUtils.isEmpty(shenyuAddPlugins) && CollectionUtils.isEmpty(shenyuUpdatePlugins)) { return; } // Copy old data // copy new list List<ShenyuPlugin> newPluginList = new ArrayList<>(plugins); // Add new plugin data // Add extend plugin from pluginData or shenyu ext-lib this.sourcePlugins.addAll(shenyuAddPlugins); // Add new data if (CollectionUtils.isNotEmpty(shenyuAddPlugins)) { shenyuAddPlugins.forEach(plugin -> LOG.info("shenyu auto add extends plugins:{}", plugin.named())); newPluginList.addAll(shenyuAddPlugins); } // Modify updated data if (CollectionUtils.isNotEmpty(shenyuUpdatePlugins)) { shenyuUpdatePlugins.forEach(plugin -> LOG.info("shenyu auto update extends plugins:{}", plugin.named())); for (ShenyuPlugin updatePlugin : shenyuUpdatePlugins) { for (int i = 0; i < newPluginList.size(); i++) { if (newPluginList.get(i).named().equals(updatePlugin.named())) { newPluginList.set(i, updatePlugin); } } for (int i = 0; i < this.sourcePlugins.size(); i++) { if (this.sourcePlugins.get(i).named().equals(updatePlugin.named())) { this.sourcePlugins.set(i, updatePlugin); } } } } // REORDER plugins = sortPlugins(newPluginList); }
Synchronize the data that implements the PluginDataHandler interface to the handlers list of the subscriber.
public void putExtendPluginDataHandler(final List<PluginDataHandler> handlers) { if (CollectionUtils.isEmpty(handlers)) { return; } // Traverse all data for (PluginDataHandler handler : handlers) { String pluginNamed = handler.pluginNamed(); // Update existing PluginDataHandler list MapUtils.computeIfAbsent(handlerMap, pluginNamed, name -> { LOG.info("shenyu auto add extends plugin data handler name is :{}", pluginNamed); return handler; }); } }
At this point, the analysis of the loading process of the extension plugin is completed.
Integration testing is also called E2E (End To End) testing in some projects. It is mainly used to test whether each module can meet expectations after being assembled into a system.
Apache ShenYu puts integration tests in continuous integration, using GitHub Actions to trigger each time a Pull Request or Merge is submitted to the main branch. This can greatly reduce the maintenance cost of the project and improve the stability of Apache ShenYu.
In Apache ShenYu, the main steps of integration testing are embodied in the script of the GitHub Action workflow, as shown below, which is located at ~/.github/workflows directory.
Since we specified pull_request and push.branch: master in on, this workflow will be triggered when we submit pull_request or merge branch to master (push).
For more usage of GitHub Action, you can refer to the documentation of GitHub Action, which will not be introduced in detail here.
In the above command, -P is followed by release,docker, which means that the relevant profile configuration in the pom file will be activated.
The two profiles, release and docker, currently only exist in several submodules under shenyu-dist. The following will take the shenyu-dist-admin module as an example to introduce profiles as release and docker The specific content of the configuration. Also, integration tests only use the shenyu-admin image built in this step.
When -P is followed by release, the above maven-assembly-plugin plugin is activated. In executions, the execution timing of the plugin is bound to the maven life cycle package, which means that it will be triggered when we execute mvn install.
The binary.xml we wrote is specified in the configuration, and the maven-assembly-plugin plugin will copy the required files and package them according to this file. You can click the link to view the file: [shenyu-dist/shenyu-admin-dist/src/main/assembly/binary.xml](https://github.com/apache/shenyu/blob/master/shenyu- dist/shenyu-admin-dist/src/main/assembly/binary.xml)
According to this file, the plugin will "copy" the packaged jar packages, configuration files, startup scripts, etc. under other modules, and finally make them into a compressed package in tar.gz format.
Similar to the release above, here is the activation of the dockerfile-maven-plugin plugin. When mvn install -Pdocker, the plugin will use the dockerfile we wrote to build the docker image.
It should be noted that the dockerfile-maven-plugin currently has limited support for aarch64 architecture devices, and the following error will occur when running the plugin on aarch64 architecture machines. And when I wrote this article, it has not been maintained for a long time, which means that the problem of aarch64 architecture devices using this plugin will not be solved in the short term.
[ERROR] Failed to execute goal com.spotify:dockerfile-maven-plugin:1.4.6:build (tag-latest) on project shenyu-admin-dist: Could not build image: java.util.concurrent.ExecutionException: com.spotify.docker.client.shaded.javax.ws.rs.ProcessingException: java.lang.UnsatisfiedLinkError: could not load FFI provider jnr.ffi.provider.jffi.Provider: ExceptionInInitializerError: Can't overwrite cause with java.lang.UnsatisfiedLinkError: java.lang.UnsatisfiedLinkError: /private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib: dlopen(/private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib, 1): no suitable image found. Did find:[ERROR] /private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib: no matching architecture in universal wrapper[ERROR] /private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib: no matching architecture in universal wrapper...
Here is a temporary solution:
Open a new shell, enter the following command, and use socat to route the unix socket to the tcp port
Considering the need for release, the current pom file in the project root directory does not contain the example submodule, so the examples module is additionally built in the above step.
Similar to the above, this line of command will also use the maven plugin to build an image for our subsequent docker orchestration.
In order to subdivide the integration tests of different functions of Apache ShenYu, we will build a gateway customized by the integration test module in this step. The so-called "customization" is to introduce the minimum required dependencies in the pom file, and then replace the default shenyu-bootstrap. Similar to the above two steps, this step will also build the docker image.
It is worth noting that the way of packaging and building here is slightly different from that of the shenyu-dist module, which you can find by comparing the pom file.
For example, the docker-compose.yml under the shenyu-integrated-test-http module starts zookeeper, redis, example, admin, gateway and other services in sequence. Among them, the mirrors of example, admin, and gateway are built by us before.
Among them, docker-compose uses depends_on to determine the topological relationship between services, and most services have corresponding health checks, and the next service will not be started until the health check passes.
Run the health check and wait for docker-compose to start#
-name: Wait for docker compose start up completelyif: env.SKIP_CI != 'true'run: bash ./shenyu-integrated-test/${{ matrix.case }}/script/healthcheck.sh
In this step, the host will run the healthcheck.sh script, and then use the curl command to access the health status interface /actuator/health of each service list (in the services.list file), until the service status is normal. will continue.
-name: Check test resultif: env.SKIP_CI != 'true'run:| docker-compose -f ./shenyu-integrated-test/${{ matrix.case }}/docker-compose.yml logs --tail="all" if [[ ${{steps.test.outcome}} == "failure" ]]; then echo "Test Failed" exit 1 else echo "Test Successful" exit 0 fi
When there is an error in the workflow, the log of docker compose can help us to better troubleshoot the problem, so in this step, we will print the log of docker compose.
First, look at the ContextPathPlugin#doExecute method, which is the core of this plugin.
protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) { ... // 1. get the contextMappingHandle from the JVM cache ContextMappingHandle contextMappingHandle = ContextPathPluginDataHandler.CACHED_HANDLE.get().obtainHandle(CacheKeyUtils.INST.getKey(rule)); ... // 2. set shenyu context according to contextMappingHandle buildContextPath(shenyuContext, contextMappingHandle); return chain.execute(exchange);}
Get the contextMappingHandle from the JVM cache
The contextMappingHandle here is an instance of the ContextMappingHandle class, which has two member variables: contextPath and addPrefix
These two variables have appeared in the Rules form in the Admin before, and they are updated when the data is synchronized.
Set shenyu context according to contextMappingHandle
Below is the source code of the ContextPathPlugin#buildContextPath method
private void buildContextPath(final ShenyuContext context, final ContextMappingHandle handle) { String realURI = ""; // 1. set the context path of shenyu, remove the prefix of the real URI according to the length of the contextPath if (StringUtils.isNoneBlank(handle.getContextPath())) { context.setContextPath(handle.getContextPath()); context.setModule(handle.getContextPath()); realURI = context.getPath().substring(handle.getContextPath().length()); } // add prefix if (StringUtils.isNoneBlank(handle.getAddPrefix())) { if (StringUtils.isNotBlank(realURI)) { realURI = handle.getAddPrefix() + realURI; } else { realURI = handle.getAddPrefix() + context.getPath(); } } context.setRealUrl(realURI);}
Set the context path of shenyu, remove the prefix of the real URI according to the length of the contextPath
You may be wondering whether there is a problem with the so-called "according to the length of the contextPath" here?
In fact, such a judgment is not a problem, because the request will be processed by the plugin only after it is matched by the Selector and Rules. Therefore, under the premise of setting up Selector and Rules, it is completely possible to meet the needs of converting a specific contextPath.
Then, the ContextPathPlugin class has a more important method skip, part of the code is shown below. We can find: If it is a call to the RPC service, the context_path plugin will be skipped directly.
Finally, the context-path plugin has another class ContextPathPluginDataHandler. The function of this class is to subscribe to the data of the plug-in. When the plugin configuration is modified, deleted, or added, the data is modified, deleted, or added to the JVM cache.
In the Shenyu gateway, when you start this plugin, Shenyu becomes a fully-featured McpServer.
You can easily register a service as a tool within the Shenyu gateway by simple configuration and use the extended functions the gateway offers.
This article is based on version shenyu-2.7.0.2. Here, I will track the Shenyu Mcp plugin chain and analyze the source code of its SSE communication.
The Shenyu gateway's Mcp plugin is built on top of the spring-ai-mcp extension. To better understand how the Mcp plugin works, I’ll briefly introduce how some official Mcp Java classes collaborate within its JDK.
I want to start by introducing three key official Mcp Java classes:
McpServer This class manages resources like tools, Resource, promote, etc.
TransportProvider Provides corresponding communication methods based on client-server communication protocols.
Session Handles request data, response data, and notifications, offers some basic methods and corresponding handlers, and executes tool queries and calls here.
In Shenyu Admin, after filling in endpoint and tool information for the McpServer plugin, this info is automatically registered into Shenyu bootstrap.
You can refer to the official websocket data sync source code for details.
Shenyu bootstrap receives the data synced from admin in the handler() method of McpServerPluginDataHandler.
handlerSelector() receives URL data and creates McpServer.
handlerRule() receives tool info and registers tools.
These two methods together form the service registration part of the Shenyu Mcp plugin. Below, I will analyze these two methods in detail.
Let’s analyze the handlerSelector() method, which handles McpServer registration.
What handlerSelector() does:
public class McpServerPluginDataHandler implements PluginDataHandler { @Override public void handlerSelector(final SelectorData selectorData) { // Get URI String uri = selectorData.getConditionList().stream() .filter(condition -> Constants.URI.equals(condition.getParamType())) .map(ConditionData::getParamValue) .findFirst() .orElse(null); // Build McpServer ShenyuMcpServer shenyuMcpServer = GsonUtils.getInstance().fromJson(Objects.isNull(selectorData.getHandle()) ? DEFAULT_MESSAGE_ENDPOINT : selectorData.getHandle(), ShenyuMcpServer.class); shenyuMcpServer.setPath(path); // Cache shenyuMcpServer CACHED_SERVER.get().cachedHandle( selectorData.getId(), shenyuMcpServer); String messageEndpoint = shenyuMcpServer.getMessageEndpoint(); // Try to get or register transportProvider shenyuMcpServerManager.getOrCreateMcpServerTransport(uri, messageEndpoint); }}
ShenyuMcpServerManager is the management center of McpServer in Shenyu. It not only stores McpAsyncServer, CompositeTransportProvider, etc., but also contains methods to register Transport and McpServer.
The getOrCreateMcpServerTransport() method works as follows:
@Componentpublic class ShenyuMcpServerManager { public ShenyuSseServerTransportProvider getOrCreateMcpServerTransport(final String uri, final String messageEndPoint) { // Remove /streamablehttp and /message suffixes String normalizedPath = processPath(uri); return getOrCreateTransport(normalizedPath, SSE_PROTOCOL, () -> createSseTransport(normalizedPath, messageEndPoint)); } private <T> T getOrCreateTransport(final String normalizedPath, final String protocol, final java.util.function.Supplier<T> transportFactory) { // Get composite Transport instance CompositeTransportProvider compositeTransport = getOrCreateCompositeTransport(normalizedPath); T transport = switch (protocol) { case SSE_PROTOCOL -> (T) compositeTransport.getSseTransport(); case STREAMABLE_HTTP_PROTOCOL -> (T) compositeTransport.getStreamableHttpTransport(); default -> null; }; // If instance is missing in cache, create a new one if (Objects.isNull(transport)) { // Call createSseTransport() to create and store a new transport transport = transportFactory.get(); // Create McpAsyncServer and register the transport addTransportToSharedServer(normalizedPath, protocol, transport); } return transport; }}
This method is called within getOrCreateMcpServerTransport() and is used to create and save McpServer
This method creates a new McpServer, stores it in sharedServerMap, and saves the TransportProvider obtained above into compositeTransportMap.
@Componentpublic class ShenyuMcpServerManager { private void addTransportToSharedServer(final String normalizedPath, final String protocol, final Object transportProvider) { // Get or create and register McpServer getOrCreateSharedServer(normalizedPath); // Save the new transport protocol into compositeTransportMap compositeTransport.addTransport(protocol, transportProvider); } private McpAsyncServer getOrCreateSharedServer(final String normalizedPath) { return sharedServerMap.computeIfAbsent(normalizedPath, path -> { // Get transport protocols CompositeTransportProvider compositeTransport = getOrCreateCompositeTransport(path); // Select server capabilities var capabilities = McpSchema.ServerCapabilities.builder() .tools(true) .logging() .build(); // Create and store McpServer McpAsyncServer server = McpServer .async(compositeTransport) .serverInfo("MCP Shenyu Server (Multi-Protocol)", "1.0.0") .capabilities(capabilities) .tools(Lists.newArrayList()) .build(); return server; }); }}
Captures the tool configuration info users fill in for the Tool, all used to build the tool
Deserializes to create ShenyuMcpServerTool and obtains tool info
Note: ShenyuMcpServerTool is also a Shenyu-side object for storing tool info, unrelated by inheritance to McpServerTool
Calls addTool() method to create the tool using this info and registers the tool to the matching McpServer based on SelectorId
public class McpServerPluginDataHandler implements PluginDataHandler { @Override public void handlerRule(final RuleData ruleData) { Optional.ofNullable(ruleData.getHandle()).ifPresent(s -> { // Deserialize a new ShenyuMcpServerTool ShenyuMcpServerTool mcpServerTool = GsonUtils.getInstance().fromJson(s, ShenyuMcpServerTool.class); // Cache mcpServerTool CACHED_TOOL.get().cachedHandle(CacheKeyUtils.INST.getKey(ruleData), mcpServerTool); // Build MCP schema List<McpServerToolParameter> parameters = mcpServerTool.getParameters(); String inputSchema = JsonSchemaUtil.createParameterSchema(parameters); ShenyuMcpServer server = CACHED_SERVER.get().obtainHandle(ruleData.getSelectorId()); if (Objects.nonNull(server)) { // Save tool info into Manager's sharedServerMap shenyuMcpServerManager.addTool(server.getPath(), StringUtils.isBlank(mcpServerTool.getName()) ? ruleData.getName() : mcpServerTool.getName(), mcpServerTool.getDescription(), mcpServerTool.getRequestConfig(), inputSchema); } }); }}
addTool() method
This method is called by handlerRule() to add a new tool
This method performs:
Converts the previous tool info into a shenyuToolDefinition object
Creates a ShenyuToolCallback object using the converted shenyuToolDefinition
ShenyuToolCallback overrides the call() method of ToolCallBack and registers this overridden method to AsyncToolSpecification, so calling the tool's call() will actually invoke this overridden method
Converts ShenyuToolCallback to AsyncToolSpecification and registers it to the corresponding McpServer
public class McpServerPluginDataHandler implements PluginDataHandler { public void addTool(final String serverPath, final String name, final String description, final String requestTemplate, final String inputSchema) { String normalizedPath = normalizeServerPath(serverPath); // Build Definition object ToolDefinition shenyuToolDefinition = ShenyuToolDefinition.builder() .name(name) .description(description) .requestConfig(requestTemplate) .inputSchema(inputSchema) .build(); ShenyuToolCallback shenyuToolCallback = new ShenyuToolCallback(shenyuToolDefinition); // Get previously registered McpServer and register the Tool McpAsyncServer sharedServer = sharedServerMap.get(normalizedPath); for (AsyncToolSpecification asyncToolSpecification : McpToolUtils.toAsyncToolSpecifications(shenyuToolCallback)) { sharedServer.addTool(asyncToolSpecification).block(); } }}
With this, service registration analysis is complete.
Clients will send two types of messages with /sse and /message suffixes. These messages are captured by the Shenyu McpServer plugin, which handles them differently. When receiving /sse messages, the plugin creates and saves a session object, then returns a session id for /message usage. When receiving /message messages, the plugin executes methods based on the method info carried by the /message message, such as fetching work lists, tool invocation, and resource lists.
doExecute() method works as follows:
Matches the path and checks if the Mcp plugin registered it
Calls routeByProtocol() to choose the appropriate handling plan based on the request protocol
This article focuses on the SSE request mode, so we enter the handleSseRequest() method
public class McpServerPlugin extends AbstractShenyuPlugin { @Override protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) { final String uri = exchange.getRequest().getURI().getRawPath(); // Check if Mcp plugin registered this route; if not, continue chain without handling if (!shenyuMcpServerManager.canRoute(uri)) { return chain.execute(exchange); } final ServerRequest request = ServerRequest.create(exchange, messageReaders); // Choose handling method based on URI protocol return routeByProtocol(exchange, chain, request, selector, uri); } private Mono<Void> routeByProtocol(final ServerWebExchange exchange, final ShenyuPluginChain chain, final ServerRequest request, final SelectorData selector, final String uri) { if (isStreamableHttpProtocol(uri)) { return handleStreamableHttpRequest(exchange, chain, request, uri); } else if (isSseProtocol(uri)) { // Handle SSE requests return handleSseRequest(exchange, chain, request, selector, uri); } }}
handleSseRequest() method
Called by routeByProtocol() to determine if the client wants to create a session or call a tool based on URI suffix
public class McpServerPlugin extends AbstractShenyuPlugin { private Mono<Void> handleSseRequest(final ServerWebExchange exchange, final ShenyuPluginChain chain, final ServerRequest request, final SelectorData selector, final String uri) { ShenyuMcpServer server = McpServerPluginDataHandler.CACHED_SERVER.get().obtainHandle(selector.getId()); String messageEndpoint = server.getMessageEndpoint(); // Get the transport provider ShenyuSseServerTransportProvider transportProvider = shenyuMcpServerManager.getOrCreateMcpServerTransport(uri, messageEndpoint); // Determine if the request is an SSE connection or a message call if (uri.endsWith(messageEndpoint)) { setupSessionContext(exchange, chain); return handleMessageEndpoint(exchange, transportProvider, request); } else { return handleSseEndpoint(exchange, transportProvider, request); } }}
If the client sends a request ending with /message, the current ShenyuPluginChain info is saved into the session, and handleMessageEndpoint() is called.
Subsequent tool calls continue executing this plugin chain, so plugins after the Mcp plugin will affect tool requests.
handleMessageEndpoint() method, calls ShenyuSseServerTransportProvider.handleMessageEndpoint() to process
if (uri.endsWith(messageEndpoint)) { setupSessionContext(exchange, chain); return handleMessageEndpoint(exchange, transportProvider, request);}
public class McpServerPlugin extends AbstractShenyuPlugin { private Mono<Void> handleMessageEndpoint(final ServerWebExchange exchange, final ShenyuSseServerTransportProvider transportProvider, final ServerRequest request) { // Handle message requests return transportProvider.handleMessageEndpoint(request) .flatMap(result -> { return exchange.getResponse() .writeWith(Mono.just(exchange.getResponse().bufferFactory().wrap(responseBody.getBytes()))); }); }}
handleMessageEndpoint() method
Called by McpServerPlugin.handleMessageEndpoint(), hands over the request to the session for processing
The session's handler() method performs different actions depending on the message.
For example, when the method in the message is "tools/call", the tool invocation handler executes the call() method to call the tool.
The related source is omitted here.
public class ShenyuSseServerTransportProvider implements McpServerTransportProvider { public Mono<MessageHandlingResult> handleMessageEndpoint(final ServerRequest request) { // Get session String sessionId = request.queryParam("sessionId").get(); McpServerSession session = sessions.get(sessionId); return request.bodyToMono(String.class) .flatMap(body -> { McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body); return session.handle(message); }); }}
At this point, the Shenyu Mcp Plugin service invocation source code analysis is complete.
If the client sends a message to invoke a tool, the session will use the tool invocation handler to execute the tool’s call() method.
From service registration, we know the tool call actually runs the call() method of ShenyuToolCallback.
Therefore, the tool invocation executes the following:
call() method mainly does:
Gets session id
Gets requestTemplate, the extra configuration provided by Shenyu
Gets the previously stored Shenyu plugin chain and passes the tool call info to the chain for continued execution
Asynchronously waits for the tool response
After the plugin chain completes, the tool call request is actually sent to the service hosting the tool.
public class ShenyuToolCallback implements ToolCallback { @NonNull @Override public String call(@NonNull final String input, final ToolContext toolContext) { // Extract sessionId from MCP request final McpSyncServerExchange mcpExchange = extractMcpExchange(toolContext); final String sessionId = extractSessionId(mcpExchange); // Extract requestTemplate info final String configStr = extractRequestConfig(shenyuTool); // Get the previously stored plugin chain by sessionId final ServerWebExchange originExchange = getOriginExchange(sessionId); final ShenyuPluginChain chain = getPluginChain(originExchange); // Execute the tool call return executeToolCall(originExchange, chain, sessionId, configStr, input); } private String executeToolCall(final ServerWebExchange originExchange, final ShenyuPluginChain chain, final String sessionId, final String configStr, final String input) { final CompletableFuture<String> responseFuture = new CompletableFuture<>(); final ServerWebExchange decoratedExchange = buildDecoratedExchange( originExchange, responseFuture, sessionId, configStr, input); // Execute plugin chain, call the actual tool chain.execute(decoratedExchange) .subscribe(); // Wait for response final String result = responseFuture.get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); return result; }}
This concludes the Shenyu MCP Plugin tool invocation analysis.
This article analyzed the source code from Mcp service registration, through Mcp plugin service invocation, to tool invocation.
The McpServer plugin makes Shenyu a powerful and centralized McpServer.
The ShenYu gateway uses the divide plugin to handle http requests. You can see the official documentation Quick start with Http to learn how to use this plugin.
This article is based on shenyu-2.4.3 version for source code analysis, please refer to Http Proxy for the introduction of the official website.
Annotation scanning is done through SpringMvcClientBeanPostProcessor, which implements the BeanPostProcessor interface and is a post-processor provided by Spring.
During constructor instantiation.
Read the property configuration
Add annotations, read path information
Start the registry and register with shenyu-admin
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor { //... /** * Constructor instantiation */ public SpringMvcClientBeanPostProcessor(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) { // 1. read Properties Properties props = clientConfig.getProps(); this.appName = props.getProperty(ShenyuClientConstants.APP_NAME); this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH, ""); if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) { String errorMsg = "http register param must config the appName or contextPath"; LOG.error(errorMsg); throw new ShenyuClientIllegalArgumentException(errorMsg); } this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString())); // 2. add annotation mappingAnnotation.add(ShenyuSpringMvcClient.class); mappingAnnotation.add(PostMapping.class); mappingAnnotation.add(GetMapping.class); mappingAnnotation.add(DeleteMapping.class); mappingAnnotation.add(PutMapping.class); mappingAnnotation.add(RequestMapping.class); // 3. start register cneter publisher.start(shenyuClientRegisterRepository); } @Override public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException { // override post process return bean; }
Rewrite post-processor logic: read annotation information, construct metadata objects and URI objects, and register them with shenyu-admin.
@Override public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException { // 1. If the all service is registered or is not a Controller class, it is not handled if (Boolean.TRUE.equals(isFull) || !hasAnnotation(bean.getClass(), Controller.class)) { return bean; } // 2. Read the annotations on the class ShenyuSpringMvcClient final ShenyuSpringMvcClient beanShenyuClient = AnnotationUtils.findAnnotation(bean.getClass(), ShenyuSpringMvcClient.class); // 2.1 build superPath final String superPath = buildApiSuperPath(bean.getClass()); // 2.2 whether to register the entire class method if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) { // build the metadata object and register it with shenyu-admin publisher.publishEvent(buildMetaDataDTO(beanShenyuClient, pathJoin(contextPath, superPath))); return bean; } // 3. read all methods final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass()); for (Method method : methods) { // 3.1 read the annotations on the method ShenyuSpringMvcClient ShenyuSpringMvcClient methodShenyuClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class); // If there is no annotation on the method, use the annotation on the class methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient; if (Objects.nonNull(methodShenyuClient)) { // 3.2 Build path information, build metadata objects, register with shenyu-admin publisher.publishEvent(buildMetaDataDTO(methodShenyuClient, buildApiPath(method, superPath))); } } return bean; }
If you are registering the whole service or not Controller class, do not handle it
read the annotation on the class ShenyuSpringMvcClient, if the whole class is registered, build the metadata object here and register it with shenyu-admin.
Annotation on the handler method ShenyuSpringMvcClient, build path information for the specific method, build the metadata object and then register it with shenyu-admin
There are two methods here that take path and need special instructions.
buildApiSuperPath()
Construct SuperPath: first take the path property from the annotation ShenyuSpringMvcClient on the class, if not, take the path information from the RequestMapping annotation on the current class.
private String buildApiSuperPath(@NonNull final Class<?> method) { // First take the path property from the annotation ShenyuSpringMvcClient on the class ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class); if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) { return shenyuSpringMvcClient.path(); } // Take the path information from the RequestMapping annotation of the current class RequestMapping requestMapping = AnnotationUtils.findAnnotation(method, RequestMapping.class); if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) { return requestMapping.path()[0]; } return ""; }
buildApiPath()
Build path: first read the annotation ShenyuSpringMvcClient on the method and build it if it exists; otherwise get the path information from other annotations on the method; complete path = contextPath(context information) + superPath(class information) + methodPath(method information).
private String buildApiPath(@NonNull final Method method, @NonNull final String superPath) { // 1. Read the annotation ShenyuSpringMvcClient on the method ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class); // 1.1 If path exists, build if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) { //1.2 path = contextPath+superPath+methodPath return pathJoin(contextPath, superPath, shenyuSpringMvcClient.path()); } // 2. Get path information from other annotations on the method final String path = getPathByMethod(method); if (StringUtils.isNotBlank(path)) { // 2.1 path = contextPath+superPath+methodPath return pathJoin(contextPath, superPath, path); } return pathJoin(contextPath, superPath); }
getPathByMethod()
Get path information from other annotations on the method, other annotations include.
ShenyuSpringMvcClient
PostMapping
GetMapping
DeleteMapping
PutMapping
RequestMapping
private String getPathByMethod(@NonNull final Method method) { // Iterate through interface annotations to get path information for (Class<? extends Annotation> mapping : mappingAnnotation) { final String pathByAnnotation = getPathByAnnotation(AnnotationUtils.findAnnotation(method, mapping), pathAttributeNames); if (StringUtils.isNotBlank(pathByAnnotation)) { return pathByAnnotation; } } return null; }
After the scanning annotation is finished, construct the metadata object and send the object to shenyu-admin to complete the registration.
Metadata
Includes the rule information of the currently registered method: contextPath, appName, registration path, description information, registration type, whether it is enabled, rule name and whether to register metadata.
private MetaDataRegisterDTO buildMetaDataDTO(@NonNull final ShenyuSpringMvcClient shenyuSpringMvcClient, final String path) { return MetaDataRegisterDTO.builder() .contextPath(contextPath) // contextPath .appName(appName) // appName .path(path) // Registered path, used when gateway rules match .pathDesc(shenyuSpringMvcClient.desc()) // desc info .rpcType(RpcTypeEnum.HTTP.getName()) // divide plugin, http type when default .enabled(shenyuSpringMvcClient.enabled()) // is enabled? .ruleName(StringUtils.defaultIfBlank(shenyuSpringMvcClient.ruleName(), path))//rule name .registerMetaData(shenyuSpringMvcClient.registerMetaData()) // whether to register metadata information .build(); }
The specific registration logic is implemented by the registration center, which has been analyzed in the previous articles and will not be analyzed in depth here.
ContextRegisterListener is responsible for registering the client's URI information to shenyu-admin, it implements the ApplicationListener interface, when the context refresh event ContextRefreshedEvent occurs, the onApplicationEvent() method is executed to implement the registration logic.
public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent>, BeanFactoryAware { //...... /** * Constructor instantiation */ public ContextRegisterListener(final PropertiesConfig clientConfig) { // read Properties final Properties props = clientConfig.getProps(); this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString())); this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH); if (Boolean.TRUE.equals(isFull)) { if (StringUtils.isBlank(contextPath)) { final String errorMsg = "http register param must config the contextPath"; LOG.error(errorMsg); throw new ShenyuClientIllegalArgumentException(errorMsg); } } this.port = Integer.parseInt(Optional.ofNullable(props.getProperty(ShenyuClientConstants.PORT)).orElseGet(() -> "-1")); this.appName = props.getProperty(ShenyuClientConstants.APP_NAME); this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP); this.host = props.getProperty(ShenyuClientConstants.HOST); } @Override public void setBeanFactory(final BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; } // Execute application events @Override public void onApplicationEvent(@NonNull final ContextRefreshedEvent contextRefreshedEvent) { // The method is guaranteed to be executed once if (!registered.compareAndSet(false, true)) { return; } // 1. If you are registering for the entire service if (Boolean.TRUE.equals(isFull)) { // Build metadata and register publisher.publishEvent(buildMetaDataDTO()); } try { // get port final int mergedPort = port <= 0 ? PortUtils.findPort(beanFactory) : port; // 2. Constructing URI data and registering publisher.publishEvent(buildURIRegisterDTO(mergedPort)); } catch (ShenyuException e) { throw new ShenyuException(e.getMessage() + "please config ${shenyu.client.http.props.port} in xml/yml !"); } } // build URI data private URIRegisterDTO buildURIRegisterDTO(final int port) { return URIRegisterDTO.builder() .contextPath(this.contextPath) // contextPath .appName(appName) // appName .protocol(protocol) // protocol .host(IpUtils.isCompleteHost(this.host) ? this.host : IpUtils.getHost(this.host)) //host .port(port) // port .rpcType(RpcTypeEnum.HTTP.getName()) // divide plugin, default registration http type .build(); } // build MetaData private MetaDataRegisterDTO buildMetaDataDTO() { return MetaDataRegisterDTO.builder() .contextPath(contextPath) .appName(appName) .path(contextPath) .rpcType(RpcTypeEnum.HTTP.getName()) .enabled(true) .ruleName(contextPath) .build(); }}
The metadata and URI data registered by the client through the registry are processed in shenyu-admin, which is responsible for storing to the database and synchronizing to the shenyu gateway. The client registration processing logic of Divide plugin is in ShenyuClientRegisterDivideServiceImpl. The inheritance relationship is as follows.
The metadata MetaDataRegisterDTO object registered by the client through the registry is picked up and dropped in the register() method of shenyu-admin.
Build contextPath, find if the selector information exists, if it does, return id; if it doesn't, create the default selector information.
@Override public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) { // build contextPath String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName()); // Find if selector information exists by name SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName); if (Objects.isNull(selectorDO)) { // Create a default selector message if it does not exist return registerSelector(contextPath, pluginName, selectorHandler); } return selectorDO.getId(); }
Default Selector Information
Construct the default selector information and its conditional properties here.
//register selector private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) { // build selector SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId()); selectorDTO.setHandle(selectorHandler); //register default Selector return registerDefault(selectorDTO); } //build private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) { //build default SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath); selectorDTO.setPluginId(pluginId); //build the conditional properties of the default selector selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath)); return selectorDTO; }
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) { SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO(); selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // default URI selectorConditionDTO.setParamName("/"); selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // default match selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // default /contextPath/** return Collections.singletonList(selectorConditionDTO);}
Register default selector
@Overridepublic String registerDefault(final SelectorDTO selectorDTO) { //selector info SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO); //selector condition info List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions(); if (StringUtils.isEmpty(selectorDTO.getId())) { // insert selector information into the database selectorMapper.insertSelective(selectorDO); // insert selector condition information into the database selectorConditionDTOs.forEach(selectorConditionDTO -> { selectorConditionDTO.setSelectorId(selectorDO.getId()); selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO)); }); } // Publish synchronization events to synchronize selection information and its conditional attributes to the gateway publishEvent(selectorDO, selectorConditionDTOs); return selectorDO.getId();}
Get a valid URI from the URI registered by the client, update the corresponding selector handle property, and send a selector update event to the gateway.
@Override public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) { //check if (CollectionUtils.isEmpty(uriList)) { return ""; } //get selector SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType())); if (Objects.isNull(selectorDO)) { throw new ShenyuException("doRegister Failed to execute,wait to retry."); } // gte valid URI List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList()); // build handle String handler = buildHandle(validUriList, selectorDO); if (handler != null) { selectorDO.setHandle(handler); SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType())); selectorData.setHandle(handler); // Update the handle property of the selector to the database selectorService.updateSelective(selectorDO); // Send selector update events to the gateway eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData))); } return ShenyuResultMessage.SUCCESS; }
The source code analysis on service registration is completed as well as the analysis flow chart is as follows.
The next step is to analyze how the divide plugin initiates a call to the http service based on this information.
The divide plugin is the core processing plugin used by the gateway to handle http protocol requests.
Take the case provided on the official website Quick start with Http as an example, a direct connection request is as follows.
GET http://localhost:8189/order/findById?id=100Accept: application/json
After proxying through the ShenYu gateway, the request is as follows.
GET http://localhost:9195/http/order/findById?id=100Accept: application/json
The services proxied by the ShenYu gateway are still able to request the previous services, where the divide plugin comes into play. The class inheritance relationship is as follows.
After passing the ShenYu gateway proxy, the request entry is ShenyuWebHandler, which implements the org.springframework.web.server.WebHandler interface.
public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> { //...... /** * hanlde web reuest */ @Override public Mono<Void> handle(@NonNull final ServerWebExchange exchange) { // execute plugin chain Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange); if (scheduled) { return execute.subscribeOn(scheduler); } return execute; } private static class DefaultShenyuPluginChain implements ShenyuPluginChain { private int index; private final List<ShenyuPlugin> plugins; /** * Instantiating the default plugin chain */ DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) { this.plugins = plugins; } /** * Execute each plugin */ @Override public Mono<Void> execute(final ServerWebExchange exchange) { return Mono.defer(() -> { if (this.index < plugins.size()) { // get current plugin ShenyuPlugin plugin = plugins.get(this.index++); // is skip ? boolean skip = plugin.skip(exchange); if (skip) { // If skipped, execute the next return this.execute(exchange); } // execute current plugin return plugin.execute(exchange, this); } return Mono.empty(); }); } }}
Initiate the request call in the execute() method.
Get the specified timeout, number of retries
Initiate the request
Retry after failure according to the specified retry policy
public abstract class AbstractHttpClientPlugin<R> implements ShenyuPlugin { protected static final Logger LOG = LoggerFactory.getLogger(AbstractHttpClientPlugin.class); @Override public final Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) { // shenyu Context final ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT); assert shenyuContext != null; // uri final URI uri = exchange.getAttribute(Constants.HTTP_URI); if (Objects.isNull(uri)) { Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_URL, null); return WebFluxResultUtils.result(exchange, error); } // get time out final long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L); final Duration duration = Duration.ofMillis(timeout); // get retry times final int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0); // get retry strategy final String retryStrategy = (String) Optional.ofNullable(exchange.getAttribute(Constants.RETRY_STRATEGY)).orElseGet(RetryEnum.CURRENT::getName); LOG.info("The request urlPath is {}, retryTimes is {}, retryStrategy is {}", uri.toASCIIString(), retryTimes, retryStrategy); // build header final HttpHeaders httpHeaders = buildHttpHeaders(exchange); // do request final Mono<R> response = doRequest(exchange, exchange.getRequest().getMethodValue(), uri, httpHeaders, exchange.getRequest().getBody()) .timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration))) .doOnError(e -> LOG.error(e.getMessage(), e)); // Retry Policy CURRENT, retries the current service. if (RetryEnum.CURRENT.getName().equals(retryStrategy)) { //old version of DividePlugin and SpringCloudPlugin will run on this return response.retryWhen(Retry.anyOf(TimeoutException.class, ConnectTimeoutException.class, ReadTimeoutException.class, IllegalStateException.class) .retryMax(retryTimes) .backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true))) .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)) .flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange)); } // Retry for other services // Exclude services that have already been called final Set<URI> exclude = Sets.newHashSet(uri); // resend return resend(response, exchange, duration, httpHeaders, exclude, retryTimes) .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)) .flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange)); } private Mono<R> resend(final Mono<R> clientResponse, final ServerWebExchange exchange, final Duration duration, final HttpHeaders httpHeaders, final Set<URI> exclude, final int retryTimes) { Mono<R> result = clientResponse; // Retry according to the specified number of retries for (int i = 0; i < retryTimes; i++) { result = resend(result, exchange, duration, httpHeaders, exclude); } return result; } private Mono<R> resend(final Mono<R> response, final ServerWebExchange exchange, final Duration duration, final HttpHeaders httpHeaders, final Set<URI> exclude) { return response.onErrorResume(th -> { final String selectorId = exchange.getAttribute(Constants.DIVIDE_SELECTOR_ID); final String loadBalance = exchange.getAttribute(Constants.LOAD_BALANCE); //Check available services final List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId) .stream().filter(data -> { final String trimUri = data.getUrl().trim(); for (URI needToExclude : exclude) { // exclude already called if ((needToExclude.getHost() + ":" + needToExclude.getPort()).equals(trimUri)) { return false; } } return true; }).collect(Collectors.toList()); if (CollectionUtils.isEmpty(upstreamList)) { // no need to retry anymore return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg())); } // requets ip final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress(); // Load Balance final Upstream upstream = LoadBalancerFactory.selector(upstreamList, loadBalance, ip); if (Objects.isNull(upstream)) { // no need to retry anymore return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg())); } final URI newUri = RequestUrlUtils.buildRequestUri(exchange, upstream.buildDomain()); // Exclude uri that has already been called exclude.add(newUri); // Make another call return doRequest(exchange, exchange.getRequest().getMethodValue(), newUri, httpHeaders, exchange.getRequest().getBody()) .timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration))) .doOnError(e -> LOG.error(e.getMessage(), e)); }); } //......}
The source code analysis in this article starts from the http service registration to the divide plugin service calls. The divide plugin is mainly used to handle http requests. Some of the source code does not enter the in-depth analysis, such as the implementation of load balancing, service probe live, will continue to analyze in the following.
Apache ShenYu is an asynchronous, high-performance, cross-language, responsive API gateway.
The Apache ShenYu gateway uses the dubbo plugin to make calls to the dubbo service. You can see the official documentation Dubbo Quick Start to learn how to use the plugin.
This article is based on shenyu-2.4.3 version for source code analysis, please refer to Dubbo Service Access for the introduction of the official website.
Declare the application service name, register the center address, use the dubbo protocol, declare the service interface, and the corresponding interface implementation class.
/** * DubboTestServiceImpl. */@Service("dubboTestService")public class DubboTestServiceImpl implements DubboTestService { @Override @ShenyuDubboClient(path = "/findById", desc = "Query by Id") public DubboTest findById(final String id) { return new DubboTest(id, "hello world shenyu Apache, findById"); } //......}
In the interface implementation class, use the annotation @ShenyuDubboClient to register the service with shenyu-admin. The role of this annotation and its rationale will be analyzed later.
The configuration information in the configuration file application.yml.
In the configuration file, declare the registry address used by dubbo. The dubbo service registers with shenyu-admin, using the method http, and the registration address is http://localhost:9095.
Use the annotation @ShenyuDubboClient to register the service to the gateway. The simple demo is as follows.
// dubbo sevice@Service("dubboTestService")public class DubboTestServiceImpl implements DubboTestService { @Override @ShenyuDubboClient(path = "/findById", desc = "Query by Id") // need to be registered method public DubboTest findById(final String id) { return new DubboTest(id, "hello world shenyu Apache, findById"); } //......}
annotation definition:
/** * Works on classes and methods */@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE, ElementType.METHOD})@Inheritedpublic @interface ShenyuDubboClient { //path String path(); //rule name String ruleName() default ""; //desc String desc() default ""; //enabled boolean enabled() default true;}
Annotation scanning is done through the ApacheDubboServiceBeanListener, which implements the ApplicationListener<ContextRefreshedEvent> interface and starts executing the event handler method when a context refresh event occurs during the Spring container startup onApplicationEvent().
During constructor instantiation.
Read property configuration
Start the thread pool
Start the registry for registering with shenyu-admin
public class ApacheDubboServiceBeanListener implements ApplicationListener<ContextRefreshedEvent> { // ...... //Constructor public ApacheDubboServiceBeanListener(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) { //1.Read property configuration Properties props = clientConfig.getProps(); String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH); String appName = props.getProperty(ShenyuClientConstants.APP_NAME); if (StringUtils.isBlank(contextPath)) { throw new ShenyuClientIllegalArgumentException("apache dubbo client must config the contextPath or appName"); } this.contextPath = contextPath; this.appName = appName; this.host = props.getProperty(ShenyuClientConstants.HOST); this.port = props.getProperty(ShenyuClientConstants.PORT); //2.Start the thread pool executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("shenyu-apache-dubbo-client-thread-pool-%d").build()); //3.Start the registry for registering with `shenyu-admin` publisher.start(shenyuClientRegisterRepository); } /** * Context refresh event, execute method logic */ @Override public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) { //...... }
Rewritten method logic: read Dubbo service ServiceBean, build metadata object and URI object, and register it with shenyu-admin.
@Override public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) { //read ServiceBean Map<String, ServiceBean> serviceBean = contextRefreshedEvent.getApplicationContext().getBeansOfType(ServiceBean.class); if (serviceBean.isEmpty()) { return; } //The method is guaranteed to be executed only once if (!registered.compareAndSet(false, true)) { return; } //handle metadata for (Map.Entry<String, ServiceBean> entry : serviceBean.entrySet()) { handler(entry.getValue()); } //handle URI serviceBean.values().stream().findFirst().ifPresent(bean -> { publisher.publishEvent(buildURIRegisterDTO(bean)); }); }
handler()
In the handler() method, read all methods from the serviceBean, determine if there is a ShenyuDubboClient annotation on the method, build a metadata object if it exists, and register the method with shenyu-admin through the registry.
Constructs a metadata object where the necessary information for method registration is constructed and subsequently used for selector or rule matching.
The metadata and URI data registered by the client through the registry are processed at the shenyu-admin end, which is responsible for storing to the database and synchronizing to the shenyu gateway. The client-side registration processing logic of the Dubbo plugin is in the ShenyuClientRegisterDubboServiceImpl. The inheritance relationship is as follows.
The metadata MetaDataRegisterDTO object registered by the client through the registry is picked up and dropped in the register() method of shenyu-admin.
Construct contextPath, find if the selector information exists, if it does, return id; if it doesn't, create the default selector information.
@Override public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) { // build contextPath String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName()); // Find if selector information exists by name SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName); if (Objects.isNull(selectorDO)) { // Create a default selector message if it does not exist return registerSelector(contextPath, pluginName, selectorHandler); } return selectorDO.getId(); }
Default selector information
Construct the default selector information and its conditional properties here.
//register selector private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) { //build selector SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId()); selectorDTO.setHandle(selectorHandler); //register default selector return registerDefault(selectorDTO); } //build selector private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) { //build default SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath); selectorDTO.setPluginId(pluginId); //build the conditional properties of the default selector selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath)); return selectorDTO; }
Build default selector
private SelectorDTO buildDefaultSelectorDTO(final String name) { return SelectorDTO.builder() .name(name) // name .type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // default type cutom .matchMode(MatchModeEnum.AND.getCode()) //default match mode .enabled(Boolean.TRUE) //enable .loged(Boolean.TRUE) //log .continued(Boolean.TRUE) .sort(1) .build();}
Build default selector conditional properties
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) { SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO(); selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // default URI selectorConditionDTO.setParamName("/"); selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // default match selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); return Collections.singletonList(selectorConditionDTO);}
Register default selector
@Overridepublic String registerDefault(final SelectorDTO selectorDTO) { //selector information SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO); //selector conditional properties List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions(); if (StringUtils.isEmpty(selectorDTO.getId())) { // insert selector information into the database selectorMapper.insertSelective(selectorDO); // inserting selector conditional properties to the database selectorConditionDTOs.forEach(selectorConditionDTO -> { selectorConditionDTO.setSelectorId(selectorDO.getId()); selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO)); }); } // Publish synchronization events to synchronize selection information and its conditional attributes to the gateway publishEvent(selectorDO, selectorConditionDTOs); return selectorDO.getId();}
Get a valid URI from the URI registered by the client, update the corresponding selector handle property, and send a selector update event to the gateway.
@Override public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) { //check if (CollectionUtils.isEmpty(uriList)) { return ""; } SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType())); if (Objects.isNull(selectorDO)) { throw new ShenyuException("doRegister Failed to execute,wait to retry."); } // gte valid URI List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList()); // build handle String handler = buildHandle(validUriList, selectorDO); if (handler != null) { selectorDO.setHandle(handler); SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType())); selectorData.setHandle(handler); // Update the handle property of the selector to the database selectorService.updateSelective(selectorDO); // Send selector update events to the gateway eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData))); } return ShenyuResultMessage.SUCCESS; }
The source code analysis on service registration is completed as well as the analysis flow chart is as follows.
The next step is to analyze how the dubbo plugin initiates calls to the http service based on this information.
The dubbo plugin is the core processing plugin used by the ShenYu gateway to convert http requests into the dubbo protocol and invoke the dubbo service.
Take the case provided by the official website Quick Start with Dubbo as an example, a dubbo service is registered with shenyu-admin through the registry, and then requested through the ShenYu gateway proxy, the request is as follows.
GET http://localhost:9195/dubbo/findById?id=100Accept: application/json
The class inheritance relationship in the Dubbo plugin is as follows.
After passing the ShenYu gateway proxy, the request entry is ShenyuWebHandler, which implements the org.springframework.web.server.WebHandler interface.
public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> { //...... /** * hanlde request */ @Override public Mono<Void> handle(@NonNull final ServerWebExchange exchange) { // execute default plugin chain Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange); if (scheduled) { return execute.subscribeOn(scheduler); } return execute; } private static class DefaultShenyuPluginChain implements ShenyuPluginChain { private int index; private final List<ShenyuPlugin> plugins; DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) { this.plugins = plugins; } /** * execute. */ @Override public Mono<Void> execute(final ServerWebExchange exchange) { return Mono.defer(() -> { if (this.index < plugins.size()) { // get plugin ShenyuPlugin plugin = plugins.get(this.index++); boolean skip = plugin.skip(exchange); if (skip) { // next return this.execute(exchange); } // execute return plugin.execute(exchange, this); } return Mono.empty(); }); } }}
GlobalPlugin is a global plugin that constructs contextual information in the execute() method.
public class GlobalPlugin implements ShenyuPlugin { // shenyu context private final ShenyuContextBuilder builder; //...... @Override public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) { // build context information to be passed into the exchange ShenyuContext shenyuContext = builder.build(exchange); exchange.getAttributes().put(Constants.CONTEXT, shenyuContext); return chain.execute(exchange); } //......}
The RpcParamTransformPlugin is responsible for reading the parameters from the http request, saving them in the exchange and passing them to the rpc service.
In the execute() method, the core logic of the plugin is executed: get the request information from exchange and process the parameters according to the form of content passed in by the request.
Set special context information in the doDubboInvoker() method, and then start the dubbo generalization call.
public class ApacheDubboPlugin extends AbstractDubboPlugin { @Override protected Mono<Void> doDubboInvoker(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule, final MetaData metaData, final String param) { //set the current selector and rule information, and request address for dubbo graying support RpcContext.getContext().setAttachment(Constants.DUBBO_SELECTOR_ID, selector.getId()); RpcContext.getContext().setAttachment(Constants.DUBBO_RULE_ID, rule.getId()); RpcContext.getContext().setAttachment(Constants.DUBBO_REMOTE_ADDRESS, Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress()); //dubbo generic invoker final Mono<Object> result = dubboProxyService.genericInvoker(param, metaData, exchange); //execute next plugin in chain return result.then(chain.execute(exchange)); }}
Gets the generalization service GenericService object.
Constructs the request parameter pair object.
Initiates an asynchronous generalization invocation.
public class ApacheDubboProxyService { //...... /** * Generic invoker object. */ public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws ShenyuException { //1.Get the ReferenceConfig object ReferenceConfig<GenericService> reference = ApacheDubboConfigCache.getInstance().get(metaData.getPath()); if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) { //Failure of the current cache information ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath()); //Reinitialization with metadata reference = ApacheDubboConfigCache.getInstance().initRef(metaData); } //2.Get the GenericService object of the generalization service GenericService genericService = reference.get(); //3.Constructing the request parameter pair object Pair<String[], Object[]> pair; if (StringUtils.isBlank(metaData.getParameterTypes()) || ParamCheckUtils.dubboBodyIsEmpty(body)) { pair = new ImmutablePair<>(new String[]{}, new Object[]{}); } else { pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes()); } //4.Initiating asynchronous generalization calls return Mono.fromFuture(invokeAsync(genericService, metaData.getMethodName(), pair.getLeft(), pair.getRight()).thenApply(ret -> { //handle result if (Objects.isNull(ret)) { ret = Constants.DUBBO_RPC_RESULT_EMPTY; } exchange.getAttributes().put(Constants.RPC_RESULT, ret); exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName()); return ret; })).onErrorMap(exception -> exception instanceof GenericException ? new ShenyuException(((GenericException) exception).getExceptionMessage()) : new ShenyuException(exception));//处理异常 } //Generalized calls, asynchronous operations private CompletableFuture<Object> invokeAsync(final GenericService genericService, final String method, final String[] parameterTypes, final Object[] args) throws GenericException { genericService.$invoke(method, parameterTypes, args); Object resultFromFuture = RpcContext.getContext().getFuture(); return resultFromFuture instanceof CompletableFuture ? (CompletableFuture<Object>) resultFromFuture : CompletableFuture.completedFuture(resultFromFuture); }}
Calling the dubbo service at the gateway can be achieved by generalizing the call.
The ReferenceConfig object is the key object to support generalization calls , and its initialization operation is done during data synchronization. There are two parts of data involved here, one is the synchronized plugin handler information and the other is the synchronized plugin metadata information.
When the plugin data is updated, the data synchronization module synchronizes the data from shenyu-admin to the gateway. The initialization operation is performed in handlerPlugin().
public abstract class AbstractDubboPluginDataHandler implements PluginDataHandler { //...... //Initializing the configuration cache protected abstract void initConfigCache(DubboRegisterConfig dubboRegisterConfig); @Override public void handlerPlugin(final PluginData pluginData) { if (Objects.nonNull(pluginData) && Boolean.TRUE.equals(pluginData.getEnabled())) { //Data deserialization DubboRegisterConfig dubboRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class); DubboRegisterConfig exist = Singleton.INST.get(DubboRegisterConfig.class); if (Objects.isNull(dubboRegisterConfig)) { return; } if (Objects.isNull(exist) || !dubboRegisterConfig.equals(exist)) { // Perform initialization operations this.initConfigCache(dubboRegisterConfig); } Singleton.INST.single(DubboRegisterConfig.class, dubboRegisterConfig); } } //......}
When the metadata is updated, the data synchronization module synchronizes the data from shenyu-admin to the gateway. The metadata update operation is performed in the onSubscribe() method.
public class ApacheDubboMetaDataSubscriber implements MetaDataSubscriber { //local memory cache private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap(); //update metaData public void onSubscribe(final MetaData metaData) { // dubbo if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) { //Whether the corresponding metadata exists MetaData exist = META_DATA.get(metaData.getPath()); if (Objects.isNull(exist) || Objects.isNull(ApacheDubboConfigCache.getInstance().get(metaData.getPath()))) { // initRef ApacheDubboConfigCache.getInstance().initRef(metaData); } else { // The corresponding metadata has undergone an update operation if (!Objects.equals(metaData.getServiceName(), exist.getServiceName()) || !Objects.equals(metaData.getRpcExt(), exist.getRpcExt()) || !Objects.equals(metaData.getParameterTypes(), exist.getParameterTypes()) || !Objects.equals(metaData.getMethodName(), exist.getMethodName())) { //Build ReferenceConfig again based on the latest metadata ApacheDubboConfigCache.getInstance().build(metaData); } } //local memory cache META_DATA.put(metaData.getPath(), metaData); } } //dalete public void unSubscribe(final MetaData metaData) { if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) { //使ReferenceConfig失效 ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath()); META_DATA.remove(metaData.getPath()); } }}
The source code analysis in this article starts from Dubbo service registration to Dubbo plug-in service calls. The Dubbo plugin is mainly used to handle the conversion of http requests to the dubbo protocol, and the main logic is implemented through generalized calls.
This article is based on the source code analysis of version 'shenyu-2.6.1'. Please refer to the official website for an introduction Data Synchronization Design.
Enter the createPlugin() method in the PluginController class, which is responsible for data validation, adding or updating data, and returning result information.
@Validated@RequiredArgsConstructor@RestController@RequestMapping("/plugin")public class PluginController { @PostMapping("") @RequiresPermissions("system:plugin:add") public ShenyuAdminResult createPlugin(@Valid @ModelAttribute final PluginDTO pluginDTO) { // Call pluginService.createOrUpdate for processing logic return ShenyuAdminResult.success(pluginService.createOrUpdate(pluginDTO)); } // ......}
Use the create() method in the PluginServiceImpl class to convert data, save it to the database, and publish events.
@RequiredArgsConstructor@Servicepublic class PluginServiceImpl implements SelectorService { // Event publishing object pluginEventPublisher private final PluginEventPublisher pluginEventPublisher; private String create(final PluginDTO pluginDTO) { // Check if there is a corresponding plugin Assert.isNull(pluginMapper.nameExisted(pluginDTO.getName()), AdminConstants.PLUGIN_NAME_IS_EXIST); // check if Customized plugin jar if (!Objects.isNull(pluginDTO.getFile())) { Assert.isTrue(checkFile(Base64.decode(pluginDTO.getFile())), AdminConstants.THE_PLUGIN_JAR_FILE_IS_NOT_CORRECT_OR_EXCEEDS_16_MB); } // Create plugin object PluginDO pluginDO = PluginDO.buildPluginDO(pluginDTO); // Insert object into database if (pluginMapper.insertSelective(pluginDO) > 0) { // publish create event. init plugin data pluginEventPublisher.onCreated(pluginDO); } return ShenyuResultMessage.CREATE_SUCCESS; } // ......}
Complete the data persistence operation in the PluginServiceImpl class, that is, save the data to the database and publish events through pluginEventPublisher.
The logic of the pluginEventPublisher.onCreated method is to publish the changed event:
Publishing change data is completed through publisher.publishEvent(), which is an 'Application EventPublisher' object with the fully qualified name of 'org. springframework. contentxt.' Application EventPublisher `. From here, we know that publishing data is accomplished through the Spring related features.
About ApplicationEventPublisher:
When there is a state change, the publisher calls the publishEvent method of ApplicationEventPublisher to publish an event, the Spring container broadcasts the event to all observers, and calls the observer's onApplicationEvent method to pass the event object to the observer. There are two ways to call the publishEvent method. One is to implement the interface, inject the ApplicationEventPublisher object into the container, and then call its method. The other is to call the container directly. There is not much difference between the two methods to publish events.
ApplicationEventPublisher:Publish events;
ApplicationEvent:Spring events,Record the source, time, and data of the event;
ApplicationListener:Event listeners, observers;
In the event publishing mechanism of Spring, there are three objects,
One is the ApplicationEventPublisher that publishes events, injecting an publisher through a constructor in ShenYu.
The other object is ApplicationEvent, which is inherited from ShenYu through DataChangedEvent, representing the event object
public class DataChangedEvent extends ApplicationEvent {//......}
The last one is ApplicationListener, which is implemented in ShenYu through the DataChangedEventDispatcher class as a listener for events, responsible for handling event objects.
@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean { //......}
After the event is published, it will automatically enter the onApplicationEvent() method in the DataChangedEventDispatcher class for event processing.
@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean { /** * When there is a data change, call this method * @param event */ @Override @SuppressWarnings("unchecked") public void onApplicationEvent(final DataChangedEvent event) { // Traverse data change listeners (only ApolloDataChangedListener will be registered here) for (DataChangedListener listener : listeners) { // Forward according to different grouping types switch (event.getGroupKey()) { case APP_AUTH: // authentication information listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType()); break; case PLUGIN: // Plugin events // Calling the registered listener object listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType()); break; case RULE: // Rule events listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType()); break; case SELECTOR: // Selector event listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); break; case META_DATA: // Metadata events listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType()); break; case PROXY_SELECTOR: // Proxy selector event listener.onProxySelectorChanged((List<ProxySelectorData>) event.getSource(), event.getEventType()); break; case DISCOVER_UPSTREAM: // Registration discovery of downstream list events listener.onDiscoveryUpstreamChanged((List<DiscoverySyncData>) event.getSource(), event.getEventType()); applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnUpstreamChanged((List<DiscoverySyncData>) event.getSource(), event.getEventType()); break; default: throw new IllegalStateException("Unexpected value: " + event.getGroupKey()); } } }}
When there is a data change, call the onApplicationEvent method, then traverse all data change listeners, determine which data type it is, and hand it over to the corresponding data listeners for processing.
ShenYu has grouped all data into the following types: authentication information, plugin information, rule information, selector information, metadata, proxy selector, and downstream event discovery.
The Data Change Listener here is an abstraction of the data synchronization strategy, processed by specific implementations, and different listeners are processed by different implementations. Currently, Apollo is being analyzed
Listening, so here we only focus on ApolloDataChangedListener.
// Inheriting AbstractNodeDataChangedListenerpublic class ApolloDataChangedListener extends AbstractNodeDataChangedListener {}
ApolloDataChangedListener inherits the AbstractNodeDataChangedListener class, which mainly uses key as the base class for storage, such as Apollo, Nacos, etc., while others such as Zookeeper
Consul, etc. are searched in a hierarchical manner using a path.
// Using key as the base class for finding storage methodspublic abstract class AbstractNodeDataChangedListener implements DataChangedListener { protected AbstractNodeDataChangedListener(final ChangeData changeData) { this.changeData = changeData; }}
AbstractNodeDataChangedListener receives ChangeData as a parameter, which defines the key names for each data stored in Apollo. The data stored in Apollo includes the following data:
Plugin(plugin)
Selector(selector)
Rules(rule)
Authorization(auth)
Metadata(meta)
Proxy selector(proxy.selector)
Downstream List (discovery)
These information are specified by the ApolloDataChangedListener constructor:
public class ApolloDataChangedListener extends AbstractNodeDataChangedListener { public ApolloDataChangedListener(final ApolloClient apolloClient) { // Configure prefixes for several types of grouped data super(new ChangeData(ApolloPathConstants.PLUGIN_DATA_ID, ApolloPathConstants.SELECTOR_DATA_ID, ApolloPathConstants.RULE_DATA_ID, ApolloPathConstants.AUTH_DATA_ID, ApolloPathConstants.META_DATA_ID, ApolloPathConstants.PROXY_SELECTOR_DATA_ID, ApolloPathConstants.DISCOVERY_DATA_ID)); // Manipulating objects of Apollo this.apolloClient = apolloClient; }}
DataChangedListener defines the following methods:
// Data Change Listenerpublic interface DataChangedListener { // Call when authorization information changes default void onAppAuthChanged(List<AppAuthData> changed, DataEventTypeEnum eventType) { } // Called when plugin information changes default void onPluginChanged(List<PluginData> changed, DataEventTypeEnum eventType) { } // Called when selector information changes default void onSelectorChanged(List<SelectorData> changed, DataEventTypeEnum eventType) { } // Called when metadata information changes default void onMetaDataChanged(List<MetaData> changed, DataEventTypeEnum eventType) { } // Call when rule information changes default void onRuleChanged(List<RuleData> changed, DataEventTypeEnum eventType) { } // Called when proxy selector changes default void onProxySelectorChanged(List<ProxySelectorData> changed, DataEventTypeEnum eventType) { } // Called when downstream information changes are discovered default void onDiscoveryUpstreamChanged(List<DiscoverySyncData> changed, DataEventTypeEnum eventType) { }}
When the plugin is processed by DataChangedEventDispatcher, the method listener.onPluginChanged is called. Next, analyze the logic of the object and implement the processing by AbstractNodeDataChangedListener:
public abstract class AbstractNodeDataChangedListener implements DataChangedListener { @Override public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) { //Configure prefix as plugin. final String configKeyPrefix = changeData.getPluginDataId() + DefaultNodeConstants.JOIN_POINT; this.onCommonChanged(configKeyPrefix, changed, eventType, PluginData::getName, PluginData.class); LOG.debug("[DataChangedListener] PluginChanged {}", configKeyPrefix); }}
Firstly, the key prefix for constructing configuration data is: plugin., Call onCommonChanged again for unified processing:
private <T> void onCommonChanged(final String configKeyPrefix, final List<T> changedList, final DataEventTypeEnum eventType, final Function<? super T, ? extends String> mapperToKey, final Class<T> tClass) { // Avoiding concurrent operations on list nodes final ReentrantLock reentrantLock = listSaveLockMap.computeIfAbsent(configKeyPrefix, key -> new ReentrantLock()); try { reentrantLock.lock(); // Current incoming plugin list final List<String> changeNames = changedList.stream().map(mapperToKey).collect(Collectors.toList()); switch (eventType) { // Delete Operation case DELETE: // delete plugin.${pluginName} changedList.stream().map(mapperToKey).forEach(removeKey -> { delConfig(configKeyPrefix + removeKey); }); // Remove the corresponding plugin name from plugin. list // The plugin.list records the currently enabled list delChangedData(configKeyPrefix, changeNames); break; case REFRESH: case MYSELF: // Overload logic // Get a list of all plugins in plugin.list final List<String> configDataNames = this.getConfigDataNames(configKeyPrefix); // Update each currently adjusted plug-in in turn changedList.forEach(changedData -> { // Publish Configuration publishConfig(configKeyPrefix + mapperToKey.apply(changedData), changedData); }); // If there is more data in the currently stored list than what is currently being passed in, delete the excess data if (configDataNames != null && configDataNames.size() > changedList.size()) { // Kick out the currently loaded data configDataNames.removeAll(changeNames); // Delete cancelled data one by one configDataNames.forEach(this::delConfig); } // Update list data again publishConfig(configKeyPrefix + DefaultNodeConstants.LIST_STR, changeNames); break; default: // Add or update changedList.forEach(changedData -> { publishConfig(configKeyPrefix + mapperToKey.apply(changedData), changedData); }); // Update the newly added plugin putChangeData(configKeyPrefix, changeNames); break; } } catch (Exception e) { LOG.error("AbstractNodeDataChangedListener onCommonMultiChanged error ", e); } finally { reentrantLock.unlock(); } }
In the above logic, it actually includes the handling of full overloading (REFRESH, MYSELF) and increment (Delete, UPDATE, CREATE)
The plugin mainly includes two nodes:
plugin.list List of currently effective plugins
plugin.${plugin.name} Detailed information on specific plugins
Finally, write the data corresponding to these two nodes into Apollo.
After starting admin, the current data information will be fully synchronized to Apollo, which is implemented by ApolloDataChangedInit:
// Inheriting AbstractDataChangedInitpublic class ApolloDataChangedInit extends AbstractDataChangedInit { // Apollo operation object private final ApolloClient apolloClient; public ApolloDataChangedInit(final ApolloClient apolloClient) { this.apolloClient = apolloClient; } @Override protected boolean notExist() { // Check if nodes such as plugin, auth, meta, proxy.selector exist // As long as one does not exist, it enters reload (these nodes will not be created, why check once?) return Stream.of(ApolloPathConstants.PLUGIN_DATA_ID, ApolloPathConstants.AUTH_DATA_ID, ApolloPathConstants.META_DATA_ID, ApolloPathConstants.PROXY_SELECTOR_DATA_ID).allMatch( this::dataIdNotExist); } /** * Data id not exist boolean. * * @param pluginDataId the plugin data id * @return the boolean */ private boolean dataIdNotExist(final String pluginDataId) { return Objects.isNull(apolloClient.getItemValue(pluginDataId)); }}
Check if there is data in apollo, and if it does not exist, synchronize it.
There is a bug here because the key determined here will not be created during synchronization, which will cause data to be reloaded every time it is restarted. PR#5435
ApolloDataChangedInit implements the CommandLineRunner interface. It is an interface provided by springboot that executes the run() method after all Spring Beans are initialized. It is commonly used for initialization operations in projects.
SyncDataService.syncAll()
Query data from the database, then perform full data synchronization, including all authentication information, plugin information, rule information, selector information, metadata, proxy selector, and discover downstream events. Mainly, synchronization events are published through eventPublisher. After publishing events through publishEvent(), ApplicationListener performs event change operations, which is referred to as DataChangedEventDispatcher in ShenYu.
@Servicepublic class SyncDataServiceImpl implements SyncDataService { // Event Publishing private final ApplicationEventPublisher eventPublisher; /*** * Full data synchronization * @param type the type * @return */ @Override public boolean syncAll(final DataEventTypeEnum type) { // Synchronize auth data appAuthService.syncData(); // Synchronize plugin data List<PluginData> pluginDataList = pluginService.listAll(); //Notify subscribers through the Spring publish/subscribe mechanism (publishing DataChangedEvent) //Unified monitoring by DataChangedEventDispatcher //DataChangedEvent comes with configuration grouping type, current operation type, and data eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList)); // synchronizing selector List<SelectorData> selectorDataList = selectorService.listAll(); eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList)); // Synchronization rules List<RuleData> ruleDataList = ruleService.listAll(); eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList)); // Synchronization metadata metaDataService.syncData(); // Synchronization Downstream List discoveryService.syncData(); return true; }}
The data synchronization initialization operation on the gateway side mainly involves subscribing to nodes in apollo, and receiving changed data when there are changes. This depends on the listener mechanism of apollo. In ShenYu, the person responsible for Apollo data synchronization is ApolloDataService.
The functional logic of Apollo DataService is completed during the instantiation process: subscribe to the shenyu data synchronization node in Apollo. Implement through the configService.addChangeListener() method;
public class ApolloDataService extends AbstractNodeDataSyncService implements SyncDataService { public ApolloDataService(final Config configService, final PluginDataSubscriber pluginDataSubscriber, final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers, final List<ProxySelectorDataSubscriber> proxySelectorDataSubscribers, final List<DiscoveryUpstreamDataSubscriber> discoveryUpstreamDataSubscribers) { // Configure the prefix for listening super(new ChangeData(ApolloPathConstants.PLUGIN_DATA_ID, ApolloPathConstants.SELECTOR_DATA_ID, ApolloPathConstants.RULE_DATA_ID, ApolloPathConstants.AUTH_DATA_ID, ApolloPathConstants.META_DATA_ID, ApolloPathConstants.PROXY_SELECTOR_DATA_ID, ApolloPathConstants.DISCOVERY_DATA_ID), pluginDataSubscriber, metaDataSubscribers, authDataSubscribers, proxySelectorDataSubscribers, discoveryUpstreamDataSubscribers); this.configService = configService; // Start listening // Note: The Apollo method is only responsible for obtaining data from Apollo and adding it to the local cache, and does not handle listening startWatch(); // Configure listening apolloWatchPrefixes(); }}
Firstly, configure the key information that needs to be processed and synchronize it with the admin's key. Next, call the startWatch() method to process data acquisition and listening. But in the implementation of Apollo, this method is only responsible for handling data retrieval and setting it to the local cache.
Listening is handled by the apolloWatchPrefixes method
private void apolloWatchPrefixes() { // Defining Listeners final ConfigChangeListener listener = changeEvent -> { changeEvent.changedKeys().forEach(changeKey -> { try { final ConfigChange configChange = changeEvent.getChange(changeKey); // Skip if not changed if (configChange == null) { LOG.error("apollo watchPrefixes error configChange is null {}", changeKey); return; } final String newValue = configChange.getNewValue(); // skip last is "list" // If it is a Key at the end of the list, such as plugin.list, skip it because it is only a list that records the effectiveness and will not be cached locally final int lastListStrIndex = changeKey.length() - DefaultNodeConstants.LIST_STR.length(); if (changeKey.lastIndexOf(DefaultNodeConstants.LIST_STR) == lastListStrIndex) { return; } // If it starts with plugin. => Process plugin data if (changeKey.indexOf(ApolloPathConstants.PLUGIN_DATA_ID) == 0) { // delete if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) { // clear cache unCachePluginData(changeKey); } else { // update cache cachePluginData(newValue); } // If it starts with selector. => Process selector data } else if (changeKey.indexOf(ApolloPathConstants.SELECTOR_DATA_ID) == 0) { if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) { unCacheSelectorData(changeKey); } else { cacheSelectorData(newValue); } // If it starts with rule. => Process rule data } else if (changeKey.indexOf(ApolloPathConstants.RULE_DATA_ID) == 0) { if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) { unCacheRuleData(changeKey); } else { cacheRuleData(newValue); } // If it starts with auth. => Process auth data } else if (changeKey.indexOf(ApolloPathConstants.AUTH_DATA_ID) == 0) { if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) { unCacheAuthData(changeKey); } else { cacheAuthData(newValue); } // If it starts with meta. => Process meta data } else if (changeKey.indexOf(ApolloPathConstants.META_DATA_ID) == 0) { if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) { unCacheMetaData(changeKey); } else { cacheMetaData(newValue); } // If it starts with proxy.selector. => Process proxy.selector meta } else if (changeKey.indexOf(ApolloPathConstants.PROXY_SELECTOR_DATA_ID) == 0) { if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) { unCacheProxySelectorData(changeKey); } else { cacheProxySelectorData(newValue); } // If it starts with discovery. => Process discovery meta } else if (changeKey.indexOf(ApolloPathConstants.DISCOVERY_DATA_ID) == 0) { if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) { unCacheDiscoveryUpstreamData(changeKey); } else { cacheDiscoveryUpstreamData(newValue); } } } catch (Exception e) { LOG.error("apollo sync listener change key handler error", e); } }); }; watchConfigChangeListener = listener; // Add listening configService.addChangeListener(listener, Collections.emptySet(), ApolloPathConstants.pathKeySet()); }
The logic of loading data from the previous admin will only add two keys to the plugin: plugin.list and plugin.${plugin.name}, while plugin.list is a list of all enabled plugins, and the data for this key is in the
There is no data in the local cache, only `plugin${plugin.name} will be focus.
At this point, the synchronization logic of bootstrap in apollo has been analyzed.
Apache ShenYu is an asynchronous, high-performance, cross-language, responsive API gateway.
In ShenYu gateway, data synchronization refers to how to synchronize the updated data to the gateway after the data is sent in the background management system. The Apache ShenYu gateway currently supports data synchronization for ZooKeeper, WebSocket, http long poll, Nacos, Etcd and Consul. The main content of this article is based on Etcd data synchronization source code analysis.
This paper based on shenyu-2.4.0 version of the source code analysis, the official website of the introduction of please refer to the Data Synchronization Design .
Etcd is a strongly consistent, distributed key-value store that provides a reliable way to store data that needs to be accessed by a distributed system or cluster of machines.
We traced the source code from a real case, such as updating a selector data in the Divide plugin to a weight of 90 in a background administration system:
Enter the createSelector() method of the SelectorController class, which validates data, adds or updates data, and returns results.
@Validated@RequiredArgsConstructor@RestController@RequestMapping("/selector")public class SelectorController { @PutMapping("/{id}") public ShenyuAdminResult updateSelector(@PathVariable("id") final String id, @Valid @RequestBody final SelectorDTO selectorDTO) { // set the current selector data ID selectorDTO.setId(id); // create or update operation Integer updateCount = selectorService.createOrUpdate(selectorDTO); // return result return ShenyuAdminResult.success(ShenyuResultMessage.UPDATE_SUCCESS, updateCount); } // ......}
Convert data in the SelectorServiceImpl class using the createOrUpdate() method, save it to the database, publish the event, update upstream.
@RequiredArgsConstructor@Servicepublic class SelectorServiceImpl implements SelectorService { // eventPublisher private final ApplicationEventPublisher eventPublisher; @Override @Transactional(rollbackFor = Exception.class) public int createOrUpdate(final SelectorDTO selectorDTO) { int selectorCount; // build data DTO --> DO SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO); List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions(); // insert or update ? if (StringUtils.isEmpty(selectorDTO.getId())) { // insert into data selectorCount = selectorMapper.insertSelective(selectorDO); // insert into condition data selectorConditionDTOs.forEach(selectorConditionDTO -> { selectorConditionDTO.setSelectorId(selectorDO.getId()); selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO)); }); // check selector add if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) { DataPermissionDTO dataPermissionDTO = new DataPermissionDTO(); dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId()); dataPermissionDTO.setDataId(selectorDO.getId()); dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE); dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO)); } } else { // update data, delete and then insert selectorCount = selectorMapper.updateSelective(selectorDO); //delete rule condition then add selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId())); selectorConditionDTOs.forEach(selectorConditionDTO -> { selectorConditionDTO.setSelectorId(selectorDO.getId()); SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO); selectorConditionMapper.insertSelective(selectorConditionDO); }); } // publish event publishEvent(selectorDO, selectorConditionDTOs); // update upstream updateDivideUpstream(selectorDO); return selectorCount; } // ......}
In the Service class to persist data, i.e. to the database, this should be familiar, not expand. The update upstream operation is analyzed in the corresponding section below, focusing on the publish event operation, which performs data synchronization.
The logic of the publishEvent() method is to find the plugin corresponding to the selector, build the conditional data, and publish the change data.
Change data released by eventPublisher.PublishEvent() is complete, the eventPublisher object is a ApplicationEventPublisher class, The fully qualified class name is org.springframework.context.ApplicationEventPublisher. Here we see that publishing data is done through Spring related functionality.
ApplicationEventPublisher:
When a state change, the publisher calls ApplicationEventPublisher of publishEvent method to release an event, Spring container broadcast event for all observers, The observer's onApplicationEvent method is called to pass the event object to the observer. There are two ways to call publishEvent method, one is to implement the interface by the container injection ApplicationEventPublisher object and then call the method, the other is a direct call container, the method of two methods of publishing events not too big difference.
ApplicationEventPublisher: publish event;
ApplicationEvent: Spring event, record the event source, time, and data;
ApplicationListener: event listener, observer.
In Spring event publishing mechanism, there are three objects,
An object is a publish event ApplicationEventPublisher, in ShenYu through the constructor in the injected a eventPublisher.
The other object is ApplicationEvent , inherited from ShenYu through DataChangedEvent, representing the event object.
public class DataChangedEvent extends ApplicationEvent {//......}
The last object is ApplicationListener in ShenYu in through DataChangedEventDispatcher class implements this interface, as the event listener, responsible for handling the event object.
@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean { //......}
Released when the event is completed, will automatically enter the DataChangedEventDispatcher class onApplicationEvent() method of handling events.
@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean { /** * This method is called when there are data changes * @param event */ @Override @SuppressWarnings("unchecked") public void onApplicationEvent(final DataChangedEvent event) { // Iterate through the data change listener (usually using a data synchronization approach is fine) for (DataChangedListener listener : listeners) { // What kind of data has changed switch (event.getGroupKey()) { case APP_AUTH: // app auth data listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType()); break; case PLUGIN: // plugin data listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType()); break; case RULE: // rule data listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType()); break; case SELECTOR: // selector data listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); break; case META_DATA: // metadata listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType()); break; default: // other types throw exception throw new IllegalStateException("Unexpected value: " + event.getGroupKey()); } } }}
When there is a data change, the onApplicationEvent method is called and all the data change listeners are iterated to determine the data type and handed over to the appropriate data listener for processing.
ShenYu groups all the data into five categories: APP_AUTH, PLUGIN, RULE, SELECTOR and META_DATA.
Here the data change listener (DataChangedListener) is an abstraction of the data synchronization policy. Its concrete implementation is:
These implementation classes are the synchronization strategies currently supported by ShenYu:
WebsocketDataChangedListener: data synchronization based on Websocket;
ZookeeperDataChangedListener:data synchronization based on Zookeeper;
ConsulDataChangedListener: data synchronization based on Consul;
EtcdDataDataChangedListener:data synchronization based on etcd;
HttpLongPollingDataChangedListener:data synchronization based on http long polling;
NacosDataChangedListener:data synchronization based on nacos;
Given that there are so many implementation strategies, how do you decide which to use?
Because this paper is based on Etcd data synchronization source code analysis, so here to EtcdDataDataChangedListener as an example, the analysis of how it is loaded and implemented.
A global search in the source code project shows that its implementation is done in the DataSyncConfiguration class.
/** * Data Sync Configuration * By springboot conditional assembly * The type Data sync configuration. */@Configurationpublic class DataSyncConfiguration { /** * The type Etcd listener. */ @Configuration @ConditionalOnProperty(prefix = "shenyu.sync.etcd", name = "url") @EnableConfigurationProperties(EtcdProperties.class) static class EtcdListener { @Bean public EtcdClient etcdClient(final EtcdProperties etcdProperties) { Client client = Client.builder() .endpoints(etcdProperties.getUrl()) .build(); return new EtcdClient(client); } /** * Config event listener data changed listener. * * @param etcdClient the etcd client * @return the data changed listener */ @Bean @ConditionalOnMissingBean(EtcdDataDataChangedListener.class) public DataChangedListener etcdDataChangedListener(final EtcdClient etcdClient) { return new EtcdDataDataChangedListener(etcdClient); } /** * data init. * * @param etcdClient the etcd client * @param syncDataService the sync data service * @return the etcd data init */ @Bean @ConditionalOnMissingBean(EtcdDataInit.class) public EtcdDataInit etcdDataInit(final EtcdClient etcdClient, final SyncDataService syncDataService) { return new EtcdDataInit(etcdClient, syncDataService); } } // other code is omitted......}
This configuration class is implemented through the SpringBoot conditional assembly class. The EtcdListener class has several annotations:
@ConditionalOnProperty(prefix = "shenyu.sync.etcd", name = "url"): attribute condition. The configuration class takes effect only when the condition is met. That is, when we have the following configuration, etcd is used for data synchronization.
shenyu: sync: etcd: url: localhost:2181
@EnableConfigurationProperties(EtcdProperties.class):import EtcdProperties; The properties in the class EtcdProperties is relative to the properties which is with shenyu.sync.etcd as prefix in the configuration file.
When the shenyu.sync.etcd.url property is set in the configuration file, Admin would use the etcd data synchronization, EtcdListener is generated and the beans with type EtcdClient, EtcdDataDataChangedListener and EtcdDataInit would also be generated.
The bean with the type EtcdClient would be generated, named etcdClient. This bean configues the connection properties of the etcd server based on the configuration file and can operate the etcdnodes directly.
The bean with the type EtcdDataDataChangedListener would be generated, named etcdDataDataChangedListener. This bean use the bean etcdClient as a member variable and so when the event is listened, etcdDataDataChangedListener would call the callback method and use the etcdClient to operate the etcd nodes.
The bean with the type EtcdDataInit would be generated, named etcdDataInit. This bean use the bean etcdClient and syncDataService as member variables, and use etcdClient to judge whether the data are initialized, if not, would use syncDataService to refresh data. We would dive into the details later.
So in the event handler onApplicationEvent(), it goes to the corresponding listener. In our case, it is a selector data update, data synchronization is etcd, so, the code will enter the EtcdDataDataChangedListener selector data change process.
@Override @SuppressWarnings("unchecked") public void onApplicationEvent(final DataChangedEvent event) { // Iterate through the data change listener (usually using a data synchronization approach is fine) for (DataChangedListener listener : listeners) { // what kind of data has changed switch (event.getGroupKey()) { // other code logic is omitted case SELECTOR: // selector data listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); // In our case, will enter the EtcdDataDataChangedListener selector data change process break; } }
In the onSelectorChanged() method, determine the type of action, whether to refresh synchronization or update or create synchronization. Determine whether the node is in etcd based on the current selector data.
/** * EtcdDataDataChangedListener. */@Slf4jpublic class EtcdDataDataChangedListener implements DataChangedListener { @Override public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) { if (eventType == DataEventTypeEnum.REFRESH && !changed.isEmpty()) { String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(changed.get(0).getPluginName()); etcdClient.deleteEtcdPathRecursive(selectorParentPath); } for (SelectorData data : changed) { String selectorRealPath = DefaultPathConstants.buildSelectorRealPath(data.getPluginName(), data.getId()); if (eventType == DataEventTypeEnum.DELETE) { etcdClient.delete(selectorRealPath); continue; } //create or update updateNode(selectorRealPath, data); } }}
This part is very important. The variable changed represents the SelectorData list, the variable eventType reprents the event type. When the event type is REFRESH and the SelectorData has changed, all the selector nodes under this plugin would be deleted in etcd. We should notice that the condition that the SelectorData has changed is necessary, otherwise a bug would appear that all the selector nodes would be deleted when no SelectorData data has changed.
As long as the changed data is correctly written to the etcd node, the admin side of the operation is complete.
In our current case, updating one of the selector data in the Divide plugin with a weight of 90 updates specific nodes in the graph.
We series the above update flow with a sequence diagram.
Assume that the ShenYu gateway is already running properly, and the data synchronization mode is also etcd. How does the gateway receive and process the selector data after updating it on the admin side and sending the changed data to etcd? Let's continue our source code analysis to find out.
There is a EtcdSyncDataService class on the gateway, which subscribing to the data node through etcdClient and can sense when the data changes.
/** * Data synchronize of etcd. */@Slf4jpublic class EtcdSyncDataService implements SyncDataService, AutoCloseable { private void subscribeSelectorDataChanges(final String path) { etcdClient.watchDataChange(path, (updateNode, updateValue) -> cacheSelectorData(updateValue), this::unCacheSelectorData); } //other codes omitted}
Etcd's Watch mechanism notifies subscribing clients of node changes. In our case, updating the selector information goes to the watchDataChange() method. cacheSelectorData() is used to process data.
PluginDataSubscriber is an interface, it is only a CommonPluginDataSubscriber implementation class, responsible for data processing plugin, selector and rules.
It has no additional logic and calls the subscribeDataHandler() method directly. Within methods, there are data types (plugins, selectors, or rules) and action types (update or delete) to perform different logic.
/** * The common plugin data subscriber, responsible for handling all plug-in, selector, and rule information */public class CommonPluginDataSubscriber implements PluginDataSubscriber { //...... // handle selector data @Override public void onSelectorSubscribe(final SelectoData selectorData) { subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE); } // A subscription data handler that handles updates or deletions of data private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) { Optional.ofNullable(classData).ifPresent(data -> { // plugin data if (data instanceof PluginData) { PluginData pluginData = (PluginData) data; if (dataType == DataEventTypeEnum.UPDATE) { // update // save the data to gateway memory BaseDataCache.getInstance().cachePluginData(pluginData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData)); } else if (dataType == DataEventTypeEnum.DELETE) { // delete // delete the data from gateway memory BaseDataCache.getInstance().removePluginData(pluginData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData)); } } else if (data instanceof SelectorData) { // selector data SelectorData selectorData = (SelectorData) data; if (dataType == DataEventTypeEnum.UPDATE) { // update // save the data to gateway memory BaseDataCache.getInstance().cacheSelectData(selectorData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData)); } else if (dataType == DataEventTypeEnum.DELETE) { // delete // delete the data from gateway memory BaseDataCache.getInstance().removeSelectData(selectorData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData)); } } else if (data instanceof RuleData) { // rule data RuleData ruleData = (RuleData) data; if (dataType == DataEventTypeEnum.UPDATE) { // update // save the data to gateway memory BaseDataCache.getInstance().cacheRuleData(ruleData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData)); } else if (dataType == DataEventTypeEnum.DELETE) { // delete // delete the data from gateway memory BaseDataCache.getInstance().removeRuleData(ruleData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData)); } } }); }}
// save the data to gateway memoryBaseDataCache.getInstance().cacheSelectData(selectorData);// If each plugin has its own processing logic, then do itOptional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
One is to save the data to the gateway's memory. BaseDataCache is the class that ultimately caches data, implemented in a singleton pattern. The selector data is stored in the SELECTOR_MAP Map. In the subsequent use, also from this data.
public final class BaseDataCache { // private instance private static final BaseDataCache INSTANCE = new BaseDataCache(); // private constructor private BaseDataCache() { } /** * Gets instance. * public method * @return the instance */ public static BaseDataCache getInstance() { return INSTANCE; } /** * A Map of the cache selector data * pluginName -> SelectorData. */ private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap(); public void cacheSelectData(final SelectorData selectorData) { Optional.ofNullable(selectorData).ifPresent(this::selectorAccept); } /** * cache selector data. * @param data the selector data */ private void selectorAccept(final SelectorData data) { String key = data.getPluginName(); if (SELECTOR_MAP.containsKey(key)) { // Update operation, delete before insert List<SelectorData> existList = SELECTOR_MAP.get(key); final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList()); resultList.add(data); final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList()); SELECTOR_MAP.put(key, collect); } else { // Add new operations directly to Map SELECTOR_MAP.put(key, Lists.newArrayList(data)); } }}
Second, if each plugin has its own processing logic, then do it. Through the IDEA editor, you can see that after adding a selector, there are the following plugins and processing. We're not going to expand it here.
After the above source tracking, and through a practical case, in the admin end to update a selector data, the ZooKeeper data synchronization process analysis is clear.
Let's series the data synchronization process on the gateway side through the sequence diagram:
The data synchronization process has been analyzed. In order to prevent the synchronization process from being interrupted, other logic is ignored during the analysis. We also need to analyze the process of Admin synchronization data initialization and gateway synchronization operation initialization.
When admin starts, the current data will be fully synchronized to etcd, the implementation logic is as follows:
/** * EtcdDataInit. */@Slf4jpublic class EtcdDataInit implements CommandLineRunner { private final EtcdClient etcdClient; private final SyncDataService syncDataService; public EtcdDataInit(final EtcdClient client, final SyncDataService syncDataService) { this.etcdClient = client; this.syncDataService = syncDataService; } @Override public void run(final String... args) throws Exception { final String pluginPath = DefaultPathConstants.PLUGIN_PARENT; final String authPath = DefaultPathConstants.APP_AUTH_PARENT; final String metaDataPath = DefaultPathConstants.META_DATA; if (!etcdClient.exists(pluginPath) && !etcdClient.exists(authPath) && !etcdClient.exists(metaDataPath)) { log.info("Init all data from database"); syncDataService.syncAll(DataEventTypeEnum.REFRESH); } }}
Check whether there is data in etcd, if not, then synchronize.
EtcdDataInit implements the CommandLineRunner interface. It is an interface provided by SpringBoot that executes the run() method after all Spring Beans initializations and is often used for initialization operations in a project.
SyncDataService.syncAll()
Query data from the database, and then perform full data synchronization, all authentication information, plugin information, selector information, rule information, and metadata information. Synchronous events are published primarily through eventPublisher. After publishing the event via publishEvent(), the ApplicationListener performs the event change operation. In ShenYu is mentioned in DataChangedEventDispatcher.
@Servicepublic class SyncDataServiceImpl implements SyncDataService { // eventPublisher private final ApplicationEventPublisher eventPublisher; /*** * sync all data * @param type the type * @return */ @Override public boolean syncAll(final DataEventTypeEnum type) { // app auth data appAuthService.syncData(); // plugin data List<PluginData> pluginDataList = pluginService.listAll(); eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList)); // selector data List<SelectorData> selectorDataList = selectorService.listAll(); eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList)); // rule data List<RuleData> ruleDataList = ruleService.listAll(); eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList)); // metadata metaDataService.syncData(); return true; }}
The initial operation of data synchronization on the gateway side is mainly the node in the subscription etcd. When there is a data change, the changed data will be received. This relies on the Watch mechanism of etcd. In ShenYu, the one responsible for etcd data synchronization is EtcdSyncDataService, also mentioned earlier.
The function logic of EtcdSyncDataService is completed in the process of instantiation: the subscription to Shenyu data synchronization node in etcd is completed. Subscription here is divided into two kinds, one kind is existing node data updated above, through this etcdClient.subscribeDataChanges() method; Another kind is under the current node, add or delete nodes change namely child nodes, it through etcdClient.subscribeChildChanges() method.
EtcdSyncDataService code is a bit too much, here we use plugin data read and subscribe to track, other types of data operation principle is the same.
/** * Data synchronize of etcd. */@Slf4jpublic class EtcdSyncDataService implements SyncDataService, AutoCloseable { /** * Instantiates a new Zookeeper cache manager. * * @param etcdClient the etcd client * @param pluginDataSubscriber the plugin data subscriber * @param metaDataSubscribers the meta data subscribers * @param authDataSubscribers the auth data subscribers */ public EtcdSyncDataService(final EtcdClient etcdClient, final PluginDataSubscriber pluginDataSubscriber, final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) { this.etcdClient = etcdClient; this.pluginDataSubscriber = pluginDataSubscriber; this.metaDataSubscribers = metaDataSubscribers; this.authDataSubscribers = authDataSubscribers; watcherData(); watchAppAuth(); watchMetaData(); } private void watcherData() { final String pluginParent = DefaultPathConstants.PLUGIN_PARENT; List<String> pluginZKs = etcdClientGetChildren(pluginParent); for (String pluginName : pluginZKs) { watcherAll(pluginName); } etcdClient.watchChildChange(pluginParent, (updateNode, updateValue) -> { if (!updateNode.isEmpty()) { watcherAll(updateNode); } }, null); } private void watcherAll(final String pluginName) { watcherPlugin(pluginName); watcherSelector(pluginName); watcherRule(pluginName); } private void watcherPlugin(final String pluginName) { String pluginPath = DefaultPathConstants.buildPluginPath(pluginName); cachePluginData(etcdClient.get(pluginPath)); subscribePluginDataChanges(pluginPath, pluginName); } private void cachePluginData(final String dataString) { final PluginData pluginData = GsonUtils.getInstance().fromJson(dataString, PluginData.class); Optional.ofNullable(pluginData) .flatMap(data -> Optional.ofNullable(pluginDataSubscriber)).ifPresent(e -> e.onSubscribe(pluginData)); } private void subscribePluginDataChanges(final String pluginPath, final String pluginName) { etcdClient.watchDataChange(pluginPath, (updatePath, updateValue) -> { final String dataPath = buildRealPath(pluginPath, updatePath); final String dataStr = etcdClient.get(dataPath); final PluginData data = GsonUtils.getInstance().fromJson(dataStr, PluginData.class); Optional.ofNullable(data) .ifPresent(d -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSubscribe(d))); }, deleteNode -> deletePlugin(pluginName)); }}
The above source code is given comments, I believe you can understand. The main logic for subscribing to plug-in data is as follows:
Create the current plugin path
Read the current node data on etcd and deserialize it
Apache ShenYu is an asynchronous, high-performance, cross-language, responsive API gateway.
In ShenYu gateway, data synchronization refers to how to synchronize the updated data to the gateway after the data is sent in the background management system. The Apache ShenYu gateway currently supports data synchronization for ZooKeeper, WebSocket, http long poll, Nacos, etcd and Consul. The main content of this article is based on Nacos data synchronization source code analysis.
This paper based on shenyu-2.4.0 version of the source code analysis, the official website of the introduction of please refer to the Data Synchronization Design .
We traced the source code from a real case, such as updating a selector data in the Divide plugin to a weight of 90 in a background administration system:
Enter the createSelector() method of the SelectorController class, which validates data, adds or updates data, and returns results.
@Validated@RequiredArgsConstructor@RestController@RequestMapping("/selector")public class SelectorController { @PutMapping("/{id}") public ShenyuAdminResult updateSelector(@PathVariable("id") final String id, @Valid @RequestBody final SelectorDTO selectorDTO) { // set the current selector data ID selectorDTO.setId(id); // create or update operation Integer updateCount = selectorService.createOrUpdate(selectorDTO); // return result return ShenyuAdminResult.success(ShenyuResultMessage.UPDATE_SUCCESS, updateCount); } // ......}
Convert data in the SelectorServiceImpl class using the createOrUpdate() method, save it to the database, publish the event, update upstream.
@RequiredArgsConstructor@Servicepublic class SelectorServiceImpl implements SelectorService { // eventPublisher private final ApplicationEventPublisher eventPublisher; @Override @Transactional(rollbackFor = Exception.class) public int createOrUpdate(final SelectorDTO selectorDTO) { int selectorCount; // build data DTO --> DO SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO); List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions(); // insert or update ? if (StringUtils.isEmpty(selectorDTO.getId())) { // insert into data selectorCount = selectorMapper.insertSelective(selectorDO); // insert into condition data selectorConditionDTOs.forEach(selectorConditionDTO -> { selectorConditionDTO.setSelectorId(selectorDO.getId()); selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO)); }); // check selector add if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) { DataPermissionDTO dataPermissionDTO = new DataPermissionDTO(); dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId()); dataPermissionDTO.setDataId(selectorDO.getId()); dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE); dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO)); } } else { // update data, delete and then insert selectorCount = selectorMapper.updateSelective(selectorDO); //delete rule condition then add selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId())); selectorConditionDTOs.forEach(selectorConditionDTO -> { selectorConditionDTO.setSelectorId(selectorDO.getId()); SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO); selectorConditionMapper.insertSelective(selectorConditionDO); }); } // publish event publishEvent(selectorDO, selectorConditionDTOs); // update upstream updateDivideUpstream(selectorDO); return selectorCount; } // ......}
In the Service class to persist data, i.e. to the database, this should be familiar, not expand. The update upstream operation is analyzed in the corresponding section below, focusing on the publish event operation, which performs data synchronization.
The logic of the publishEvent() method is to find the plugin corresponding to the selector, build the conditional data, and publish the change data.
Change data released by eventPublisher.PublishEvent() is complete, the eventPublisher object is a ApplicationEventPublisher class, The fully qualified class name is org.springframework.context.ApplicationEventPublisher. Here we see that publishing data is done through Spring related functionality.
ApplicationEventPublisher:
When a state change, the publisher calls ApplicationEventPublisher of publishEvent method to release an event, Spring container broadcast event for all observers, The observer's onApplicationEvent method is called to pass the event object to the observer. There are two ways to call publishEvent method, one is to implement the interface by the container injection ApplicationEventPublisher object and then call the method, the other is a direct call container, the method of two methods of publishing events not too big difference.
ApplicationEventPublisher: publish event;
ApplicationEvent: Spring event, record the event source, time, and data;
ApplicationListener: event listener, observer.
In Spring event publishing mechanism, there are three objects,
An object is a publish event ApplicationEventPublisher, in ShenYu through the constructor in the injected a eventPublisher.
The other object is ApplicationEvent , inherited from ShenYu through DataChangedEvent, representing the event object.
public class DataChangedEvent extends ApplicationEvent {//......}
The last object is ApplicationListener in ShenYu in through DataChangedEventDispatcher class implements this interface, as the event listener, responsible for handling the event object.
@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean { //......}
Released when the event is completed, will automatically enter the DataChangedEventDispatcher class onApplicationEvent() method of handling events.
@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean { /** * This method is called when there are data changes * @param event */ @Override @SuppressWarnings("unchecked") public void onApplicationEvent(final DataChangedEvent event) { // Iterate through the data change listener (usually using a data synchronization approach is fine) for (DataChangedListener listener : listeners) { // What kind of data has changed switch (event.getGroupKey()) { case APP_AUTH: // app auth data listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType()); break; case PLUGIN: // plugin data listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType()); break; case RULE: // rule data listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType()); break; case SELECTOR: // selector data listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); break; case META_DATA: // metadata listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType()); break; default: // other types throw exception throw new IllegalStateException("Unexpected value: " + event.getGroupKey()); } } }}
When there is a data change, the onApplicationEvent method is called and all the data change listeners are iterated to determine the data type and handed over to the appropriate data listener for processing.
ShenYu groups all the data into five categories: APP_AUTH, PLUGIN, RULE, SELECTOR and META_DATA.
Here the data change listener (DataChangedListener) is an abstraction of the data synchronization policy. Its concrete implementation is:
These implementation classes are the synchronization strategies currently supported by ShenYu:
WebsocketDataChangedListener: data synchronization based on Websocket;
ZookeeperDataChangedListener:data synchronization based on Zookeeper;
ConsulDataChangedListener: data synchronization based on Consul;
EtcdDataDataChangedListener:data synchronization based on etcd;
HttpLongPollingDataChangedListener:data synchronization based on http long polling;
NacosDataChangedListener:data synchronization based on nacos;
Given that there are so many implementation strategies, how do you decide which to use?
Because this paper is based on nacos data synchronization source code analysis, so here to NacosDataChangedListener as an example, the analysis of how it is loaded and implemented.
A global search in the source code project shows that its implementation is done in the DataSyncConfiguration class.
/** * The type Data sync configuration. */@Configurationpublic class DataSyncConfiguration { // some codes omitted here /** * The type Nacos listener. */ @Configuration @ConditionalOnProperty(prefix = "shenyu.sync.nacos", name = "url") @Import(NacosConfiguration.class) static class NacosListener { /** * Data changed listener data changed listener. * * @param configService the config service * @return the data changed listener */ @Bean @ConditionalOnMissingBean(NacosDataChangedListener.class) public DataChangedListener nacosDataChangedListener(final ConfigService configService) { return new NacosDataChangedListener(configService); } /** * Nacos data init zookeeper data init. * * @param configService the config service * @param syncDataService the sync data service * @return the nacos data init */ @Bean @ConditionalOnMissingBean(NacosDataInit.class) public NacosDataInit nacosDataInit(final ConfigService configService, final SyncDataService syncDataService) { return new NacosDataInit(configService, syncDataService); } } // some codes omitted here}
This configuration class is implemented through the SpringBoot conditional assembly class. The NacosListener class has several annotations:
@ConditionalOnProperty(prefix = "shenyu.sync.nacos", name = "url"): attribute condition. The configuration class takes effect only when the condition is met. That is, when we have the following configuration, nacos is used for data synchronization.
shenyu: sync: nacos: url: localhost:8848
@Import(NacosConfiguration.class):import a configration class NacosConfiguration, which provides a method ConfigService nacosConfigService(final NacosProperties nacosProp) to convert the nacos properties to a bean with the ConfigService type. We would take a look at how to generate the bean and then analyze the property configuration class and the property configuration file.
/** * Nacos configuration. */@EnableConfigurationProperties(NacosProperties.class)public class NacosConfiguration { /** * register configService in spring ioc. * * @param nacosProp the nacos configuration * @return ConfigService {@linkplain ConfigService} * @throws Exception the exception */ @Bean @ConditionalOnMissingBean(ConfigService.class) public ConfigService nacosConfigService(final NacosProperties nacosProp) throws Exception { Properties properties = new Properties(); if (nacosProp.getAcm() != null && nacosProp.getAcm().isEnabled()) { // Use aliyun ACM service properties.put(PropertyKeyConst.ENDPOINT, nacosProp.getAcm().getEndpoint()); properties.put(PropertyKeyConst.NAMESPACE, nacosProp.getAcm().getNamespace()); // Use subaccount ACM administrative authority properties.put(PropertyKeyConst.ACCESS_KEY, nacosProp.getAcm().getAccessKey()); properties.put(PropertyKeyConst.SECRET_KEY, nacosProp.getAcm().getSecretKey()); } else { properties.put(PropertyKeyConst.SERVER_ADDR, nacosProp.getUrl()); if (StringUtils.isNotBlank(nacosProp.getNamespace())) { properties.put(PropertyKeyConst.NAMESPACE, nacosProp.getNamespace()); } if (StringUtils.isNotBlank(nacosProp.getUsername())) { properties.put(PropertyKeyConst.USERNAME, nacosProp.getUsername()); } if (StringUtils.isNotBlank(nacosProp.getPassword())) { properties.put(PropertyKeyConst.PASSWORD, nacosProp.getPassword()); } } return NacosFactory.createConfigService(properties); }}
There are two steps in this method. Firstly, Properties object is generated and populated with the specified nacos path value and authority values on whether the alyun ACM service is used. Secondly, the nacos factory class would use its static factory method to create a configService object via reflect methods and then populate the object with the Properties object generated in the first step.
Now, let's analyze the NacosProperties class and its counterpart property file.
/** * The type Nacos config. */@ConfigurationProperties(prefix = "shenyu.sync.nacos")public class NacosProperties { private String url; private String namespace; private String username; private String password; private NacosACMProperties acm; /** * Gets the value of url. * * @return the value of url */ public String getUrl() { return url; } /** * Sets the url. * * @param url url */ public void setUrl(final String url) { this.url = url; } /** * Gets the value of namespace. * * @return the value of namespace */ public String getNamespace() { return namespace; } /** * Sets the namespace. * * @param namespace namespace */ public void setNamespace(final String namespace) { this.namespace = namespace; } /** * Gets the value of username. * * @return the value of username */ public String getUsername() { return username; } /** * Sets the username. * * @param username username */ public void setUsername(final String username) { this.username = username; } /** * Gets the value of password. * * @return the value of password */ public String getPassword() { return password; } /** * Sets the password. * * @param password password */ public void setPassword(final String password) { this.password = password; } /** * Gets the value of acm. * * @return the value of acm */ public NacosACMProperties getAcm() { return acm; } /** * Sets the acm. * * @param acm acm */ public void setAcm(final NacosACMProperties acm) { this.acm = acm; } public static class NacosACMProperties { private boolean enabled; private String endpoint; private String namespace; private String accessKey; private String secretKey; /** * Gets the value of enabled. * * @return the value of enabled */ public boolean isEnabled() { return enabled; } /** * Sets the enabled. * * @param enabled enabled */ public void setEnabled(final boolean enabled) { this.enabled = enabled; } /** * Gets the value of endpoint. * * @return the value of endpoint */ public String getEndpoint() { return endpoint; } /** * Sets the endpoint. * * @param endpoint endpoint */ public void setEndpoint(final String endpoint) { this.endpoint = endpoint; } /** * Gets the value of namespace. * * @return the value of namespace */ public String getNamespace() { return namespace; } /** * Sets the namespace. * * @param namespace namespace */ public void setNamespace(final String namespace) { this.namespace = namespace; } /** * Gets the value of accessKey. * * @return the value of accessKey */ public String getAccessKey() { return accessKey; } /** * Sets the accessKey. * * @param accessKey accessKey */ public void setAccessKey(final String accessKey) { this.accessKey = accessKey; } /** * Gets the value of secretKey. * * @return the value of secretKey */ public String getSecretKey() { return secretKey; } /** * Sets the secretKey. * * @param secretKey secretKey */ public void setSecretKey(final String secretKey) { this.secretKey = secretKey; } }}
When the property shenyu.sync.nacos.url is set in the property file, the shenyu admin would choose the nacos to sync data. At this time, the configuration class NacosListener would take effect and a bean with the type NacosDataChangedListener and another bean with the type NacosDataInit would both be generated.
nacosDataChangedListener, the bean with the type NacosDataChangedListener , takes the bean with the type ConfigService as a member variable. ConfigService is an api provided by nacos and can be used to send request to nacos server to modify configurations once the nacosDataChangedListener has accepted an event and trigger the callback method.
nacosDataInit, the bean with the type NacosDataInit, takes the bean configService and the bean syncDataService as memeber variables. It use configService to call the Nacos api to judge whether the configurations have been initialized, and would use syncDataService to refresh them if the answer is no.
As mentioned above, some operations of the listener would be triggered in the event handle method onApplicationEvent(). In this example, we update selector data and choose nacos to sync data, so the code about logic of the selector data changes in the NacosDataChangedListener class would be called.
//DataChangedEventDispatcher.java @Override @SuppressWarnings("unchecked") public void onApplicationEvent(final DataChangedEvent event) { // Iterate through the data change listener (usually using a data synchronization approach is fine) for (DataChangedListener listener : listeners) { // What kind of data has changed switch (event.getGroupKey()) { // some codes omitted case SELECTOR: // selector data listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); break; } } }
In the onSelectorChanged() method, determine the type of action, whether to refresh synchronization or update or create synchronization. Determine whether the node is in etcd based on the current selector data.
/** * Use nacos to push data changes. */public class NacosDataChangedListener implements DataChangedListener { @Override public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) { updateSelectorMap(getConfig(NacosPathConstants.SELECTOR_DATA_ID)); switch (eventType) { case DELETE: changed.forEach(selector -> { List<SelectorData> ls = SELECTOR_MAP .getOrDefault(selector.getPluginName(), new ArrayList<>()) .stream() .filter(s -> !s.getId().equals(selector.getId())) .sorted(SELECTOR_DATA_COMPARATOR) .collect(Collectors.toList()); SELECTOR_MAP.put(selector.getPluginName(), ls); }); break; case REFRESH: case MYSELF: SELECTOR_MAP.keySet().removeAll(SELECTOR_MAP.keySet()); changed.forEach(selector -> { List<SelectorData> ls = SELECTOR_MAP .getOrDefault(selector.getPluginName(), new ArrayList<>()) .stream() .sorted(SELECTOR_DATA_COMPARATOR) .collect(Collectors.toList()); ls.add(selector); SELECTOR_MAP.put(selector.getPluginName(), ls); }); break; default: changed.forEach(selector -> { List<SelectorData> ls = SELECTOR_MAP .getOrDefault(selector.getPluginName(), new ArrayList<>()) .stream() .filter(s -> !s.getId().equals(selector.getId())) .sorted(SELECTOR_DATA_COMPARATOR) .collect(Collectors.toList()); ls.add(selector); SELECTOR_MAP.put(selector.getPluginName(), ls); }); break; } publishConfig(NacosPathConstants.SELECTOR_DATA_ID, SELECTOR_MAP); } }
This is the core part. The variable changed represents the list , which needs to be updated, with the elements of the SelectorData type. The variable eventType represents the event type. The variable SELECTOR_MAP is with the type ConcurrentMap<String, List<SelectorData>>, so the key of the map is with the String type and the value is the selector list of this plugin. The value of the constant NacosPathConstants.SELECTOR_DATA_ID is shenyu.selector.json. The steps are as follows, firstly, use the method getConfig to call the api of Nacos to fetch the config with the group value of shenyu.selector.json from Nacos and call the updateSelectorMap method to use the config fetched above to update the SELECTOR_MAP so that the we refresh the selector config from Nacos. Secondly, we can update SELECTOR_MAP according to the event type and then use the publishConfig method to call the Nacos api to update all the config with the group value of shenyu.selector.json.
As long as the changed data is correctly written to the Nacos node, the admin side of the operation is complete.
In our current case, updating one of the selector data in the Divide plugin with a weight of 90 updates specific nodes in the graph.
We series the above update flow with a sequence diagram.
Assume that the ShenYu gateway is already running properly, and the data synchronization mode is also nacos. How does the gateway receive and process the selector data after updating it on the admin side and sending the changed data to nacos? Let's continue our source code analysis to find out.
The gateway side use NacosSyncDataService to watch nacos and fetch the data update, but before we dive into this part, let us take a look on how the bean with the type NacosSyncDataService is generated. The answer is it's defined in the Spring config class NacosSyncDataConfiguration. Let's focus on the annotation @ConditionalOnProperty(prefix = "shenyu.sync.nacos", name = "url") on the class NacosSyncDataConfiguration again. We have met this annotation when we analyzed the NacosListener class on the Admin side before, this config class would take effect only and if only the condition on this annotation is matched. In other words, when we have the config as below on the gateway side, the gateway would use nacos to sync data and the config class NacosSyncDataConfiguration would take effect.
shenyu: sync: nacos: url: localhost:8848
/** * Nacos sync data configuration for spring boot. */@Configuration@ConditionalOnClass(NacosSyncDataService.class)@ConditionalOnProperty(prefix = "shenyu.sync.nacos", name = "url")public class NacosSyncDataConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(NacosSyncDataConfiguration.class); /** * Nacos sync data service. * * @param configService the config service * @param pluginSubscriber the plugin subscriber * @param metaSubscribers the meta subscribers * @param authSubscribers the auth subscribers * @return the sync data service */ @Bean public SyncDataService nacosSyncDataService(final ObjectProvider<ConfigService> configService, final ObjectProvider<PluginDataSubscriber> pluginSubscriber, final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) { LOGGER.info("you use nacos sync shenyu data......."); return new NacosSyncDataService(configService.getIfAvailable(), pluginSubscriber.getIfAvailable(), metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList)); } /** * Nacos config service config service. * * @param nacosConfig the nacos config * @return the config service * @throws Exception the exception */ @Bean public ConfigService nacosConfigService(final NacosConfig nacosConfig) throws Exception { Properties properties = new Properties(); if (nacosConfig.getAcm() != null && nacosConfig.getAcm().isEnabled()) { properties.put(PropertyKeyConst.ENDPOINT, nacosConfig.getAcm().getEndpoint()); properties.put(PropertyKeyConst.NAMESPACE, nacosConfig.getAcm().getNamespace()); properties.put(PropertyKeyConst.ACCESS_KEY, nacosConfig.getAcm().getAccessKey()); properties.put(PropertyKeyConst.SECRET_KEY, nacosConfig.getAcm().getSecretKey()); } else { properties.put(PropertyKeyConst.SERVER_ADDR, nacosConfig.getUrl()); if (StringUtils.isNotBlank(nacosConfig.getNamespace())) { properties.put(PropertyKeyConst.NAMESPACE, nacosConfig.getNamespace()); } if (nacosConfig.getUsername() != null) { properties.put(PropertyKeyConst.USERNAME, nacosConfig.getUsername()); } if (nacosConfig.getPassword() != null) { properties.put(PropertyKeyConst.PASSWORD, nacosConfig.getPassword()); } } return NacosFactory.createConfigService(properties); } /** * Http config http config. * * @return the http config */ @Bean @ConfigurationProperties(prefix = "shenyu.sync.nacos") public NacosConfig nacosConfig() { return new NacosConfig(); }}
Let's focus on the part of code above which is about the generation of the bean nacosSyncDataService:
@Beanpublic SyncDataService nacosSyncDataService(final ObjectProvider<ConfigService> configService, final ObjectProvider<PluginDataSubscriber> pluginSubscriber, final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) { LOGGER.info("you use nacos sync shenyu data......."); return new NacosSyncDataService(configService.getIfAvailable(), pluginSubscriber.getIfAvailable(), metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));}
As we can see, the bean is generated by the construction method of the Class NacosSyncDataService. Let's dive into the construction method.
public NacosSyncDataService(final ConfigService configService, final PluginDataSubscriber pluginDataSubscriber, final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) { super(configService, pluginDataSubscriber, metaDataSubscribers, authDataSubscribers); start();}
protected void watcherData(final String dataId, final OnChange oc) { Listener listener = new Listener() { @Override public void receiveConfigInfo(final String configInfo) { oc.change(configInfo); } @Override public Executor getExecutor() { return null; } }; oc.change(getConfigAndSignListener(dataId, listener)); LISTENERS.computeIfAbsent(dataId, key -> new ArrayList<>()).add(listener); }
As we can see, the construction method calls the start method and calls the watcherData method to create a listener which relates itself to a callback method oc, since we're analyzing the changes on the component with the selector type, the relative callback method is updateSelectorMap. This callback method is used to handle data.
PluginDataSubscriber is an interface, it is only a CommonPluginDataSubscriber implementation class, responsible for data processing plugin, selector and rules.
It has no additional logic and calls the unSelectorSubscribe()andsubscribeDataHandler() method directly. Within methods, there are data types (plugins, selectors, or rules) and action types (update or delete) to perform different logic.
/** * The common plugin data subscriber, responsible for handling all plug-in, selector, and rule information */public class CommonPluginDataSubscriber implements PluginDataSubscriber { //...... // handle selector data @Override public void onSelectorSubscribe(final SelectoData selectorData) { subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE); } @Override public void unSelectorSubscribe(final SelectorData selectorData) { subscribeDataHandler(selectorData, DataEventTypeEnum.DELETE); } // A subscription data handler that handles updates or deletions of data private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) { Optional.ofNullable(classData).ifPresent(data -> { // plugin data if (data instanceof PluginData) { PluginData pluginData = (PluginData) data; if (dataType == DataEventTypeEnum.UPDATE) { // update // save the data to gateway memory BaseDataCache.getInstance().cachePluginData(pluginData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData)); } else if (dataType == DataEventTypeEnum.DELETE) { // delete // delete the data from gateway memory BaseDataCache.getInstance().removePluginData(pluginData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData)); } } else if (data instanceof SelectorData) { // selector data SelectorData selectorData = (SelectorData) data; if (dataType == DataEventTypeEnum.UPDATE) { // update // save the data to gateway memory BaseDataCache.getInstance().cacheSelectData(selectorData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData)); } else if (dataType == DataEventTypeEnum.DELETE) { // delete // delete the data from gateway memory BaseDataCache.getInstance().removeSelectData(selectorData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData)); } } else if (data instanceof RuleData) { // rule data RuleData ruleData = (RuleData) data; if (dataType == DataEventTypeEnum.UPDATE) { // update // save the data to gateway memory BaseDataCache.getInstance().cacheRuleData(ruleData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData)); } else if (dataType == DataEventTypeEnum.DELETE) { // delete // delete the data from gateway memory BaseDataCache.getInstance().removeRuleData(ruleData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData)); } } }); }}
// save the data to gateway memoryBaseDataCache.getInstance().cacheSelectData(selectorData);// If each plugin has its own processing logic, then do itOptional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
One is to save the data to the gateway's memory. BaseDataCache is the class that ultimately caches data, implemented in a singleton pattern. The selector data is stored in the SELECTOR_MAP Map. In the subsequent use, also from this data.
public final class BaseDataCache { // private instance private static final BaseDataCache INSTANCE = new BaseDataCache(); // private constructor private BaseDataCache() { } /** * Gets instance. * public method * @return the instance */ public static BaseDataCache getInstance() { return INSTANCE; } /** * A Map of the cache selector data * pluginName -> SelectorData. */ private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap(); public void cacheSelectData(final SelectorData selectorData) { Optional.ofNullable(selectorData).ifPresent(this::selectorAccept); } /** * cache selector data. * @param data the selector data */ private void selectorAccept(final SelectorData data) { String key = data.getPluginName(); if (SELECTOR_MAP.containsKey(key)) { // Update operation, delete before insert List<SelectorData> existList = SELECTOR_MAP.get(key); final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList()); resultList.add(data); final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList()); SELECTOR_MAP.put(key, collect); } else { // Add new operations directly to Map SELECTOR_MAP.put(key, Lists.newArrayList(data)); } }}
Second, if each plugin has its own processing logic, then do it. Through the IDEA editor, you can see that after adding a selector, there are the following plugins and processing. We're not going to expand it here.
After the above source tracking, and through a practical case, in the admin end to update a selector data, the ZooKeeper data synchronization process analysis is clear.
Let's series the data synchronization process on the gateway side through the sequence diagram:
The data synchronization process has been analyzed. In order to prevent the synchronization process from being interrupted, other logic is ignored during the analysis. We have analyzed the process of gateway synchronization operation initialization in the start method of NacosSyncDataService class. We also need to analyze the process of Admin synchronization data initialization.
On the admin side, the bean with the type NacosDataInit, is defined and generated in the NacosListener, if the configuration of the admin side decides to use nacos to sync data, when admin starts, the current data will be fully synchronized to nacos, the implementation logic is as follows:
/** * The type Nacos data init. */public class NacosDataInit implements CommandLineRunner { private static final Logger LOG = LoggerFactory.getLogger(NacosDataInit.class); private final ConfigService configService; private final SyncDataService syncDataService; /** * Instantiates a new Nacos data init. * @param configService the nacos config service * @param syncDataService the sync data service */ public NacosDataInit(final ConfigService configService, final SyncDataService syncDataService) { this.configService = configService; this.syncDataService = syncDataService; } @Override public void run(final String... args) { String pluginDataId = NacosPathConstants.PLUGIN_DATA_ID; String authDataId = NacosPathConstants.AUTH_DATA_ID; String metaDataId = NacosPathConstants.META_DATA_ID; if (dataIdNotExist(pluginDataId) && dataIdNotExist(authDataId) && dataIdNotExist(metaDataId)) { syncDataService.syncAll(DataEventTypeEnum.REFRESH); } } private boolean dataIdNotExist(final String pluginDataId) { try { String group = NacosPathConstants.GROUP; long timeout = NacosPathConstants.DEFAULT_TIME_OUT; return configService.getConfig(pluginDataId, group, timeout) == null; } catch (NacosException e) { LOG.error("Get data from nacos error.", e); throw new ShenyuException(e.getMessage()); } }}
Check whether there is data in nacos, if not, then synchronize.
NacosDataInit implements the CommandLineRunner interface. It is an interface provided by SpringBoot that executes the run() method after all Spring Beans initializations and is often used for initialization operations in a project.
SyncDataService.syncAll()
Query data from the database, and then perform full data synchronization, all authentication information, plugin information, selector information, rule information, and metadata information. Synchronous events are published primarily through eventPublisher. After publishing the event via publishEvent(), the ApplicationListener performs the event change operation. In ShenYu is mentioned in DataChangedEventDispatcher.
@Servicepublic class SyncDataServiceImpl implements SyncDataService { // eventPublisher private final ApplicationEventPublisher eventPublisher; /*** * sync all data * @param type the type * @return */ @Override public boolean syncAll(final DataEventTypeEnum type) { // app auth data appAuthService.syncData(); // plugin data List<PluginData> pluginDataList = pluginService.listAll(); eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList)); // selector data List<SelectorData> selectorDataList = selectorService.listAll(); eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList)); // rule data List<RuleData> ruleDataList = ruleService.listAll(); eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList)); // metadata metaDataService.syncData(); return true; }}
Apache ShenYu is an asynchronous, high-performance, cross-language, responsive API gateway.
In ShenYu gateway, data synchronization refers to how to synchronize the updated data to the gateway after the data is sent in the background management system. The Apache ShenYu gateway currently supports data synchronization for ZooKeeper, WebSocket, http long poll, Nacos, etcd and Consul. The main content of this article is based on http long poll data synchronization source code analysis.
This paper based on shenyu-2.5.0 version of the source code analysis, the official website of the introduction of please refer to the Data Synchronization Design .
Here is a direct quote from the official website with the relevant description.
The mechanism of Zookeeper and WebSocket data synchronization is relatively simple, while Http long polling is more complex. Apache ShenYu borrowed the design ideas of Apollo and Nacos, took their essence, and implemented Http long polling data synchronization function by itself. Note that this is not the traditional ajax long polling!
Http Long Polling mechanism as shown above, Apache ShenYu gateway active request shenyu-admin configuration service, read timeout time is 90s, means that the gateway layer request configuration service will wait at most 90s, so as to facilitate shenyu-admin configuration service timely response to change data, so as to achieve quasi real-time push.
The Http long polling mechanism is initiated by the gateway requesting shenyu-admin, so for this source code analysis, we start from the gateway side.
The Http long polling data synchronization configuration is loaded through spring boot starter mechanism when we introduce the relevant dependencies and have the following configuration in the configuration file.
Introduce dependencies in the pom file.
<!--shenyu data sync start use http--><dependency><groupId>org.apache.shenyu</groupId><artifactId>shenyu-spring-boot-starter-sync-data-http</artifactId><version>${project.version}</version></dependency>
Add the following configuration to the application.yml configuration file.
shenyu:sync:http:url: http://localhost:9095
When the gateway is started, the configuration class HttpSyncDataConfiguration is executed, loading the corresponding Bean.
/** * Http sync data configuration for spring boot. */@Configuration@ConditionalOnClass(HttpSyncDataService.class)@ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url")@EnableConfigurationProperties(value = HttpConfig.class)public class HttpSyncDataConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(HttpSyncDataConfiguration.class); /** * Rest template. * * @param httpConfig the http config * @return the rest template */ @Bean public RestTemplate restTemplate(final HttpConfig httpConfig) { OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory(); factory.setConnectTimeout(Objects.isNull(httpConfig.getConnectionTimeout()) ? (int) HttpConstants.CLIENT_POLLING_CONNECT_TIMEOUT : httpConfig.getConnectionTimeout()); factory.setReadTimeout(Objects.isNull(httpConfig.getReadTimeout()) ? (int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT : httpConfig.getReadTimeout()); factory.setWriteTimeout(Objects.isNull(httpConfig.getWriteTimeout()) ? (int) HttpConstants.CLIENT_POLLING_WRITE_TIMEOUT : httpConfig.getWriteTimeout()); return new RestTemplate(factory); } /** * AccessTokenManager. * * @param httpConfig the http config. * @param restTemplate the rest template. * @return the access token manager. */ @Bean public AccessTokenManager accessTokenManager(final HttpConfig httpConfig, final RestTemplate restTemplate) { return new AccessTokenManager(restTemplate, httpConfig); } /** * Http sync data service. * * @param httpConfig the http config * @param pluginSubscriber the plugin subscriber * @param restTemplate the rest template * @param metaSubscribers the meta subscribers * @param authSubscribers the auth subscribers * @param accessTokenManager the access token manager * @return the sync data service */ @Bean public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber, final ObjectProvider<RestTemplate> restTemplate, final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers, final ObjectProvider<AccessTokenManager> accessTokenManager) { LOGGER.info("you use http long pull sync shenyu data"); return new HttpSyncDataService( Objects.requireNonNull(httpConfig.getIfAvailable()), Objects.requireNonNull(pluginSubscriber.getIfAvailable()), Objects.requireNonNull(restTemplate.getIfAvailable()), metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList), Objects.requireNonNull(accessTokenManager.getIfAvailable()) ); }}
HttpSyncDataConfiguration is the configuration class for Http long polling data synchronization, responsible for creating HttpSyncDataService (responsible for the concrete implementation of http data synchronization) 、 RestTemplate and AccessTokenManager (responsible for the access token processing). It is annotated as follows.
@Configuration: indicates that this is a configuration class.
@ConditionalOnClass(HttpSyncDataService.class): conditional annotation indicating that the class HttpSyncDataService is to be present.
@ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url"): conditional annotation to have the property shenyu.sync.http.url configured.
@EnableConfigurationProperties(value = HttpConfig.class): indicates that the annotation @ConfigurationProperties(prefix = "shenyu.sync.http") on HttpConfig will take effect, and the configuration class HttpConfig will be injected into the Ioc container.
In the start() method, two things are done, one is to get the full amount of data, that is, to request the admin side to get all the data that needs to be synchronized, and then cache the acquired data into the gateway memory. The other is to open a multi-threaded execution of a long polling task.
public class HttpSyncDataService implements SyncDataService { // ...... private void start() { // It could be initialized multiple times, so you need to control that. if (RUNNING.compareAndSet(false, true)) { // fetch all group configs. // Initial startup, get full data this.fetchGroupConfig(ConfigGroupEnum.values()); // one backend service, one thread int threadSize = serverList.size(); // ThreadPoolExecutor this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), ShenyuThreadFactory.create("http-long-polling", true)); // start long polling, each server creates a thread to listen for changes. this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server))); } else { LOG.info("shenyu http long polling was started, executor=[{}]", executor); } } // ......}
ShenYu groups all the data that needs to be synchronized, there are 5 data types, namely plugins, selectors, rules, metadata and authentication data.
public enum ConfigGroupEnum { APP_AUTH, // app auth data PLUGIN, // plugin data RULE, // rule data SELECTOR, // selector data META_DATA; // meta data}
The admin may be a cluster, and here a request is made to each admin in a round-robin fashion, and if one succeeds, then the operation to get the full amount of data from the admin and cache it to the gateway is executed successfully. If there is an exception, the request is launched to the next admin.
public class HttpSyncDataService implements SyncDataService { // ...... private void fetchGroupConfig(final ConfigGroupEnum... groups) throws ShenyuException { // It is possible that admins are clustered, and here requests are made to each admin by means of a loop. for (int index = 0; index < this.serverList.size(); index++) { String server = serverList.get(index); try { // do execute this.doFetchGroupConfig(server, groups); // If you have a success, you are successful and can exit the loop break; } catch (ShenyuException e) { // An exception occurs, try executing the next // The last one also failed to execute, throwing an exception // no available server, throw exception. if (index >= serverList.size() - 1) { throw e; } LOG.warn("fetch config fail, try another one: {}", serverList.get(index + 1)); } } } // ......}
HttpSyncDataService#doFetchGroupConfig()
In this method, the request parameters are first assembled, then the request is launched through httpClient to admin to get the data, and finally the obtained data is updated to the gateway memory.
public class HttpSyncDataService implements SyncDataService { // ...... // Launch a request to the admin backend management system to get all synchronized data private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) { // 1. build request parameters, all grouped enumeration types StringBuilder params = new StringBuilder(); for (ConfigGroupEnum groupKey : groups) { params.append("groupKeys").append("=").append(groupKey.name()).append("&"); } // admin url: /configs/fetch String url = server + Constants.SHENYU_ADMIN_PATH_CONFIGS_FETCH + "?" + StringUtils.removeEnd(params.toString(), "&"); LOG.info("request configs: [{}]", url); String json; try { HttpHeaders headers = new HttpHeaders(); // set accessToken headers.set(Constants.X_ACCESS_TOKEN, this.accessTokenManager.getAccessToken()); HttpEntity<String> httpEntity = new HttpEntity<>(headers); // 2. get a request for change data json = this.restTemplate.exchange(url, HttpMethod.GET, httpEntity, String.class).getBody(); } catch (RestClientException e) { String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage()); LOG.warn(message); throw new ShenyuException(message, e); } // update local cache // 3. Update data in gateway memory boolean updated = this.updateCacheWithJson(json); if (updated) { LOG.debug("get latest configs: [{}]", json); return; } // not updated. it is likely that the current config server has not been updated yet. wait a moment. LOG.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server); // No data update on the server side, just wait 30s ThreadUtils.sleep(TimeUnit.SECONDS, 30); } // ......}
From the code, we can see that the admin side provides the interface to get the full amount of data is /configs/fetch, so we will not go further here and put it in the later analysis.
If you get the result data from admin and update it successfully, then this method is finished. If there is no successful update, then it is possible that there is no data update on the server side, so wait 30s.
Here you need to explain in advance, the gateway in determining whether the update is successful, there is a comparison of the data operation, immediately mentioned.
HttpSyncDataService#updateCacheWithJson()
Update the data in the gateway memory. Use GSON for deserialization, take the real data from the property data and give it to DataRefreshFactory to do the update.
public class HttpSyncDataService implements SyncDataService { // ...... private boolean updateCacheWithJson(final String json) { // Using GSON for deserialization JsonObject jsonObject = GSON.fromJson(json, JsonObject.class); // if the config cache will be updated? return factory.executor(jsonObject.getAsJsonObject("data")); } // ......}
DataRefreshFactory#executor()
Update the data according to different data types and return the updated result. The specific update logic is given to the dataRefresh.refresh() method. In the update result, one of the data types is updated, which means that the operation has been updated.
public final class DataRefreshFactory { // ...... public boolean executor(final JsonObject data) { // update data List<Boolean> result = ENUM_MAP.values().parallelStream() .map(dataRefresh -> dataRefresh.refresh(data)) .collect(Collectors.toList()); // one of the data types is updated, which means that the operation has been updated. return result.stream().anyMatch(Boolean.TRUE::equals); } // ......}
AbstractDataRefresh#refresh()
The data update logic uses the template method design pattern, where the generic operation is done in the abstract method and the different implementation logic is done by subclasses. 5 data types have some differences in the specific update logic, but there is also a common update logic, and the class diagram relationship is as follows.
In the generic refresh() method, it is responsible for data type conversion, determining whether an update is needed, and the actual data refresh operation.
public abstract class AbstractDataRefresh<T> implements DataRefresh { // ...... @Override public Boolean refresh(final JsonObject data) { // convert data JsonObject jsonObject = convert(data); if (Objects.isNull(jsonObject)) { return false; } boolean updated = false; // get data ConfigData<T> result = fromJson(jsonObject); // does it need to be updated if (this.updateCacheIfNeed(result)) { updated = true; // real update logic, data refresh operation refresh(result.getData()); } return updated; } // ......}
AbstractDataRefresh#updateCacheIfNeed()
The process of data conversion, which is based on different data types, we will not trace further to see if the data needs to be updated logically. The method name is updateCacheIfNeed(), which is implemented by method overloading.
public abstract class AbstractDataRefresh<T> implements DataRefresh { // ...... // result is data protected abstract boolean updateCacheIfNeed(ConfigData<T> result); // newVal is the latest value obtained // What kind of data type is groupEnum protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) { // If it is the first time, then it is put directly into the cache and returns true, indicating that the update was made this time if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) { return true; } ResultHolder holder = new ResultHolder(false); GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> { // md5 value is the same, no need to update if (StringUtils.equals(oldVal.getMd5(), newVal.getMd5())) { LOG.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5()); return oldVal; } // The current cached data has been modified for a longer period than the new data and does not need to be updated. // must compare the last update time if (oldVal.getLastModifyTime() >= newVal.getLastModifyTime()) { LOG.info("Last update time earlier than the current configuration, the [{}] config cache will not be updated", groupEnum); return oldVal; } LOG.info("update {} config: {}", groupEnum, newVal); holder.result = true; return newVal; }); return holder.result; } // ......}
As you can see from the source code above, there are two cases where updates are not required.
The md5 values of both data are the same, so no update is needed;
The current cached data has been modified longer than the new data, so no update is needed.
In other cases, the data needs to be updated.
At this point, we have finished analyzing the logic of the start() method to get the full amount of data for the first time, followed by the long polling operation. For convenience, I will paste the start() method once more.
public class HttpSyncDataService implements SyncDataService { // ...... private void start() { // It could be initialized multiple times, so you need to control that. if (RUNNING.compareAndSet(false, true)) { // fetch all group configs. // Initial startup, get full data this.fetchGroupConfig(ConfigGroupEnum.values()); // one backend service, one thread int threadSize = serverList.size(); // ThreadPoolExecutor this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), ShenyuThreadFactory.create("http-long-polling", true)); // start long polling, each server creates a thread to listen for changes. this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server))); } else { LOG.info("shenyu http long polling was started, executor=[{}]", executor); } } // ......}
The long polling task is HttpLongPollingTask, which implements the Runnable interface and the task logic is in the run() method. The task is executed continuously through a while() loop, i.e., long polling. There are three retries in each polling logic, one polling task fails, wait 5s and continue, 3 times all fail, wait 5 minutes and try again.
Start long polling, an admin service, and create a thread for data synchronization.
class HttpLongPollingTask implements Runnable { private final String server; HttpLongPollingTask(final String server) { this.server = server; } @Override public void run() { // long polling while (RUNNING.get()) { // Default retry 3 times int retryTimes = 3; for (int time = 1; time <= retryTimes; time++) { try { doLongPolling(server); } catch (Exception e) { if (time < retryTimes) { LOG.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}", time, retryTimes - time, e.getMessage()); // long polling failed, wait 5s and continue ThreadUtils.sleep(TimeUnit.SECONDS, 5); continue; } // print error, then suspended for a while. LOG.error("Long polling failed, try again after 5 minutes!", e); // 3 次都失败了,等 5 分钟再试 ThreadUtils.sleep(TimeUnit.MINUTES, 5); } } } LOG.warn("Stop http long polling."); }}
HttpSyncDataService#doLongPolling()
Core logic for performing long polling tasks.
Assembling request parameters based on data types: md5 and lastModifyTime.
Assembling the request header and request body.
Launching a request to admin to determine if the group data has changed.
Based on the group that has changed, go back and get the data.
public class HttpSyncDataService implements SyncDataService { private void doLongPolling(final String server) { // build request params: md5 and lastModifyTime MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8); for (ConfigGroupEnum group : ConfigGroupEnum.values()) { ConfigData<?> cacheConfig = factory.cacheConfigData(group); if (cacheConfig != null) { String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime())); params.put(group.name(), Lists.newArrayList(value)); } } // build request head and body HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED); // set accessToken headers.set(Constants.X_ACCESS_TOKEN, this.accessTokenManager.getAccessToken()); HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(params, headers); String listenerUrl = server + Constants.SHENYU_ADMIN_PATH_CONFIGS_LISTENER; JsonArray groupJson; //Initiate a request to admin to determine if the group data has changed //Here it just determines whether a group has changed or not try { String json = this.restTemplate.postForEntity(listenerUrl, httpEntity, String.class).getBody(); LOG.info("listener result: [{}]", json); JsonObject responseFromServer = GsonUtils.getGson().fromJson(json, JsonObject.class); groupJson = responseFromServer.getAsJsonArray("data"); } catch (RestClientException e) { String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage()); throw new ShenyuException(message, e); } // Depending on the group where the change occurred, go back and get the data /** * The official website explains here. * After the gateway receives the response message, it only knows which Group has made the configuration change, and it still needs to request the configuration data of that Group again. * There may be a question here: why not write out the changed data directly? * We also discussed this issue in depth during development, because the http long polling mechanism can only guarantee quasi-real time, if the processing at the gateway layer is not timely, * or the administrator frequently updates the configuration, it is very difficult to get the information from the gateway layer. * If it is not processed in time at the gateway level, or if the administrator updates the configuration frequently, it is very likely to miss the push of a configuration change, so for security reasons, we only inform a group that the information has changed. *For security reasons, we only notify a group of changes. * Personal understanding. * If the change data is written out directly, when the administrator frequently updates the configuration, the first update will remove the client from the blocking queue and return the response information to the gateway. * If a second update is made at this time, the current client is not in the blocking queue, so this time the change is missed. * The same is true for untimely processing by the gateway layer. * This is a long polling, one gateway one synchronization thread, there may be time consuming process. * If the admin has data changes, the current gateway client is not in the blocking queue and will not get the data. */ if (groupJson != null) { // fetch group configuration async. ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class); if (ArrayUtils.isNotEmpty(changedGroups)) { log.info("Group config changed: {}", Arrays.toString(changedGroups)); // Proactively get the changed data from admin, depending on the grouping, and take the data in full this.doFetchGroupConfig(server, changedGroups); } } if (Objects.nonNull(groupJson) && groupJson.size() > 0) { // fetch group configuration async. ConfigGroupEnum[] changedGroups = GsonUtils.getGson().fromJson(groupJson, ConfigGroupEnum[].class); LOG.info("Group config changed: {}", Arrays.toString(changedGroups)); // Proactively get the changed data from admin, depending on the grouping, and take the data in full this.doFetchGroupConfig(server, changedGroups); } }}
One special point needs to be explained here: In the long polling task, why don't you get the changed data directly? Instead, we determine which group data has been changed, and then request admin again to get the changed data?
The official explanation here is.
After the gateway receives the response information, it only knows which Group has changed its configuration, and it needs to request the configuration data of that Group again.
There may be a question here: Why not write out the changed data directly?
We have discussed this issue in depth during development, because the http long polling mechanism can only guarantee quasi-real time, and if it is not processed in time at the gateway layer, it will be very difficult to update the configuration data.
If the gateway layer is not processed in time, or the administrator updates the configuration frequently, it is likely to miss the push of a configuration change, so for security reasons, we only inform a group that the information has changed.
My personal understanding is that.
If the change data is written out directly, when the administrator updates the configuration frequently, the first update will remove the client from blocking queue and return the response information to the gateway. If a second update is made at this time, then the current client is not in the blocking queue, so this time the change is missed. The same is true for the gateway layer's untimely processing. This is a long polling, one gateway one synchronization thread, there may be a time-consuming process. If admin has data changes, the current gateway client is not in the blocking queue and will not get the data.
We have not yet analyzed the processing logic of the admin side, so let's talk about it roughly. At the admin end, the gateway client will be put into the blocking queue, and when there is a data change, the gateway client will come out of the queue and send the change data. So, if the gateway client is not in the blocking queue when there is a data change, then the current changed data is not available.
When we know which grouping data has changed, we actively get the changed data from admin again, and get the data in full depending on the grouping. The call method is doFetchGroupConfig(), which has been analyzed in the previous section.
At this point of analysis, the data synchronization operation on the gateway side is complete. The long polling task is to keep making requests to admin to see if the data has changed, and if any group data has changed, then initiate another request to admin to get the changed data, and then update the data in the gateway's memory.
From the previous analysis, it can be seen that the gateway side mainly calls two interfaces of admin.
/configs/listener: determine whether the group data has changed.
/configs/fetch: get the changed group data.
If we analyze directly from these two interfaces, some parts may not be well understood, so let's start analyzing the data synchronization process from the admin startup process.
If the following configuration is done in the configuration file application.yml, it means that the data synchronization is done by http long polling.
shenyu:sync:http:enabled:true
When the program starts, the configuration of the data synchronization class is loaded through springboot conditional assembly. In this process, HttpLongPollingDataChangedListener is created to handle the implementation logic related to long polling.
/** * Data synchronization configuration class * Conditional assembly via springboot * The type Data sync configuration. */@Configurationpublic class DataSyncConfiguration { /** * http long polling. */ @Configuration @ConditionalOnProperty(name = "shenyu.sync.http.enabled", havingValue = "true") @EnableConfigurationProperties(HttpSyncProperties.class) static class HttpLongPollingListener { @Bean @ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class) public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) { return new HttpLongPollingDataChangedListener(httpSyncProperties); } }}
The data change listener is instantiated and initialized by means of a constructor. In the constructor, a blocking queue is created to hold clients, a thread pool is created to execute deferred tasks and periodic tasks, and information about the properties of long polling is stored.
public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) { // default client (here is the gateway) 1024 this.clients = new ArrayBlockingQueue<>(1024); // create thread pool // ScheduledThreadPoolExecutor can perform delayed tasks, periodic tasks, and normal tasks this.scheduler = new ScheduledThreadPoolExecutor(1, ShenyuThreadFactory.create("long-polling", true)); // http sync properties this.httpSyncProperties = httpSyncProperties; }
In addition, it has the following class diagram relationships.
The InitializingBean interface is implemented, so the afterInitialize() method is executed during the initialization of the bean. Execute periodic tasks via thread pool: updating the data in memory (CACHE) is executed every 5 minutes and starts after 5 minutes. Refreshing the local cache is reading data from the database to the local cache (in this case the memory), done by refreshLocalCache().
public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener { // ...... /** * is called in the afterPropertiesSet() method of the InitializingBean interface, which is executed during the initialization of the bean */ @Override protected void afterInitialize() { long syncInterval = httpSyncProperties.getRefreshInterval().toMillis(); // Periodically check the data for changes and update the cache // Execution cycle task: Update data in memory (CACHE) is executed every 5 minutes and starts after 5 minutes // Prevent the admin from starting up first for a while and then generating data; then the gateway doesn't get the full amount of data when it first connects scheduler.scheduleWithFixedDelay(() -> { LOG.info("http sync strategy refresh config start."); try { // Read data from database to local cache (in this case, memory) this.refreshLocalCache(); LOG.info("http sync strategy refresh config success."); } catch (Exception e) { LOG.error("http sync strategy refresh config error!", e); } }, syncInterval, syncInterval, TimeUnit.MILLISECONDS); LOG.info("http sync strategy refresh interval: {}ms", syncInterval); } // ......}
refreshLocalCache()
Update for each of the 5 data types.
public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean { // ...... // Read data from database to local cache (in this case, memory) private void refreshLocalCache() { //update app auth data this.updateAppAuthCache(); //update plugin data this.updatePluginCache(); //update rule data this.updateRuleCache(); //update selector data this.updateSelectorCache(); //update meta data this.updateMetaDataCache(); } // ......}
The logic of the 5 update methods is similar, call the service method to get the data and put it into the memory CACHE. Take the updateRuleData method updateRuleCache() for example, pass in the rule enumeration type and call ruleService.listAll() to get all the rule data from the database.
Update the data in memory using the data in the database.
public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean { // ...... // cache Map protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>(); /** * if md5 is not the same as the original, then update lcoal cache. * @param group ConfigGroupEnum * @param <T> the type of class * @param data the new config data */ protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) { // data serialization String json = GsonUtils.getInstance().toJson(data); // pass in md5 value and modification time ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis()); // update group data ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal); log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal); } // ......}
The initialization process is to start periodic tasks to update the memory data by fetching data from the database at regular intervals.
Next, we start the analysis of two interfaces.
/configs/listener: determines if the group data has changed.
/configs/listener: determines if the group data has changed.
The interface class is ConfigController, which only takes effect when using http long polling for data synchronization. The interface method listener() has no other logic and calls the doLongPolling() method directly.
/** * This Controller only when HttpLongPollingDataChangedListener exist, will take effect. */@ConditionalOnBean(HttpLongPollingDataChangedListener.class)@RestController@RequestMapping("/configs")public class ConfigController { private final HttpLongPollingDataChangedListener longPollingListener; public ConfigController(final HttpLongPollingDataChangedListener longPollingListener) { this.longPollingListener = longPollingListener; } // Omit other logic /** * Listener. * Listen for data changes and perform long polling * @param request the request * @param response the response */ @PostMapping(value = "/listener") public void listener(final HttpServletRequest request, final HttpServletResponse response) { longPollingListener.doLongPolling(request, response); }}
Perform long polling tasks: If there are data changes, they will be responded to the client (in this case, the gateway side) immediately. Otherwise, the client will be blocked until there is a data change or a timeout.
public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener { // ...... /** * Execute long polling: If there is a data change, it will be responded to the client (here is the gateway side) immediately. * Otherwise, the client will otherwise remain blocked until there is a data change or a timeout. * @param request * @param response */ public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) { // compare group md5 // Compare the md5, determine whether the data of the gateway and the data of the admin side are consistent, and get the data group that has changed List<ConfigGroupEnum> changedGroup = compareChangedGroup(request); String clientIp = getRemoteIp(request); // response immediately. // Immediate response to the gateway if there is changed data if (CollectionUtils.isNotEmpty(changedGroup)) { this.generateResponse(response, changedGroup); Log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup); return; } // No change, then the client (in this case the gateway) is put into the blocking queue // listen for configuration changed. final AsyncContext asyncContext = request.startAsync(); // AsyncContext.settimeout() does not timeout properly, so you have to control it yourself asyncContext.setTimeout(0L); // block client's thread. scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT)); }}
To determine whether the group data has changed, the judgment logic is to compare the md5 value and lastModifyTime at the gateway side and the admin side.
If the md5 value is different, then it needs to be updated.
If the lastModifyTime on the admin side is greater than the lastModifyTime on the gateway side, then it needs to be updated.
/** * Determine if the group data has changed * @param request * @return */ private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request) { List<ConfigGroupEnum> changedGroup = new ArrayList<>(ConfigGroupEnum.values().length); for (ConfigGroupEnum group : ConfigGroupEnum.values()) { // The md5 value and lastModifyTime of the data on the gateway side String[] params = StringUtils.split(request.getParameter(group.name()), ','); if (params == null || params.length != 2) { throw new ShenyuException("group param invalid:" + request.getParameter(group.name())); } String clientMd5 = params[0]; long clientModifyTime = NumberUtils.toLong(params[1]); ConfigDataCache serverCache = CACHE.get(group.name()); // do check. determine if the group data has changed if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) { changedGroup.add(group); } } return changedGroup; }
LongPollingClient
No change data, then the client (in this case the gateway) is put into the blocking queue. The blocking time is 60 seconds, i.e. after 60 seconds remove and respond to the client.
class LongPollingClient implements Runnable { // omitted other logic @Override public void run() { try { // Removal after 60 seconds and response to the client this.asyncTimeoutFuture = scheduler.schedule(() -> { clients.remove(LongPollingClient.this); List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest()); sendResponse(changedGroups); }, timeoutTime, TimeUnit.MILLISECONDS); // Add to blocking queue clients.add(this); } catch (Exception ex) { log.error("add long polling client error", ex); } } /** * Send response. * * @param changedGroups the changed groups */ void sendResponse(final List<ConfigGroupEnum> changedGroups) { // cancel scheduler if (null != asyncTimeoutFuture) { asyncTimeoutFuture.cancel(false); } // Groups responding to changes generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups); asyncContext.complete(); } }
Get the grouped data and return the result according to the parameters passed in by the gateway. The main implementation method is longPollingListener.fetchConfig().
@ConditionalOnBean(HttpLongPollingDataChangedListener.class)@RestController@RequestMapping("/configs")public class ConfigController { private final HttpLongPollingDataChangedListener longPollingListener; public ConfigController(final HttpLongPollingDataChangedListener longPollingListener) { this.longPollingListener = longPollingListener; } /** * Fetch configs shenyu result. * @param groupKeys the group keys * @return the shenyu result */ @GetMapping("/fetch") public ShenyuAdminResult fetchConfigs(@NotNull final String[] groupKeys) { Map<String, ConfigData<?>> result = Maps.newHashMap(); for (String groupKey : groupKeys) { ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey)); result.put(groupKey, data); } return ShenyuAdminResult.success(ShenyuResultMessage.SUCCESS, result); } // Other interfaces are omitted}
AbstractDataChangedListener#fetchConfig()
Data fetching is taken directly from CACHE, and then matched and encapsulated according to different grouping types.
public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean { // ...... /** * fetch configuration from cache. * @param groupKey the group key * @return the configuration data */ public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) { // get data from CACHE ConfigDataCache config = CACHE.get(groupKey.name()); switch (groupKey) { case APP_AUTH: // app auth data return buildConfigData(config, AppAuthData.class); case PLUGIN: // plugin data return buildConfigData(config, PluginData.class); case RULE: // rule data return buildConfigData(config, RuleData.class); case SELECTOR: // selector data return buildConfigData(config, SelectorData.class); case META_DATA: // meta data return buildConfigData(config, MetaData.class); default: // other data type, throw exception throw new IllegalStateException("Unexpected groupKey: " + groupKey); } } // ......}
In the previous websocket data synchronization and zookeeper data synchronization source code analysis article, we know that the admin side data synchronization design structure is as follows.
Various data change listeners are subclasses of DataChangedListener.
When the data is modified on the admin side, event notifications are sent through the Spring event handling mechanism. The sending logic is as follows.
/** * Event forwarders, which forward the changed events to each ConfigEventListener. * Data change event distributor: synchronize the change data to ShenYu gateway when there is a data change in admin side * Data changes rely on Spring's event-listening mechanism: ApplicationEventPublisher --> ApplicationEvent --> ApplicationListener * */@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean { // other logic omitted /** * Call this method when there are data changes * @param event */ @Override @SuppressWarnings("unchecked") public void onApplicationEvent(final DataChangedEvent event) { // Iterate through the data change listeners (it's generally good to use a kind of data synchronization) for (DataChangedListener listener : listeners) { // What kind of data has changed switch (event.getGroupKey()) { case APP_AUTH: // app auth data listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType()); break; case PLUGIN: // plugin data listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType()); break; case RULE: // rule data listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType()); break; case SELECTOR: // selector data listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); // pull and save API document on seletor changed applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); break; case META_DATA: // meta data listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType()); break; default: // other data type, throw exception throw new IllegalStateException("Unexpected value: " + event.getGroupKey()); } } }}
Suppose, the plugin information is modified and the data is synchronized by http long polling, then the actual call to listener.onPluginChanged() is org.apache.shenyu.admin.listener. AbstractDataChangedListener#onPluginChanged.
/** * In the operation of the admin, there is an update of the plugin occurred * @param changed the changed * @param eventType the event type */ @Override public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) { if (CollectionUtils.isEmpty(changed)) { return; } // update CACHE this.updatePluginCache(); // execute change task this.afterPluginChanged(changed, eventType); }
There are two processing operations, one is to update the memory CACHE, which was analyzed earlier, and the other is to execute the change task, which is executed in the thread pool.
@Override protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) { // execute by thread pool scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN)); }
DataChangeTask
Data change task: remove the clients in the blocking queue in turn and send a response to notify the gateway that a group of data has changed.
class DataChangeTask implements Runnable { //other logic omitted @Override public void run() { // If the client in the blocking queue exceeds the given value of 100, it is executed in batches if (clients.size() > httpSyncProperties.getNotifyBatchSize()) { List<LongPollingClient> targetClients = new ArrayList<>(clients.size()); clients.drainTo(targetClients); List<List<LongPollingClient>> partitionClients = Lists.partition(targetClients, httpSyncProperties.getNotifyBatchSize()); // batch execution partitionClients.forEach(item -> scheduler.execute(() -> doRun(item))); } else { // execute task doRun(clients); } } private void doRun(final Collection<LongPollingClient> clients) { // Notify all clients that a data change has occurred for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) { LongPollingClient client = iter.next(); iter.remove(); // send response to client client.sendResponse(Collections.singletonList(groupKey)); Log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime); } } }
At this point, the data synchronization logic on the admin side is analyzed. In the http long polling based data synchronization is, it has three main functions.
providing a data change listening interface.
providing the interface to get the changed data.
When there is a data change, remove the client in the blocking queue and respond to the result.
Finally, three diagrams describe the long polling task flow on the admin side.
/configs/listener data change listener interface.
/configs/fetch fetch change data interface.
Update data in the admin backend management system for data synchronization.
This article focuses on the source code analysis of http long polling data synchronization in the ShenYu gateway. The main knowledge points involved are as follows.
http long polling is initiated by the gateway side, which constantly requests the admin side.
change data at group granularity (authentication information, plugins, selectors, rules, metadata).
http long polling results in getting only the change group, and another request needs to be initiated to get the group data.
Whether the data is updated or not is determined by the md5 value and the modification time lastModifyTime.