Code Analysis For Divide Plugin
The ShenYu
gateway uses the divide
plugin to handle http
requests. You can see the official documentation Quick start with Http to learn how to use this plugin.
This article is based on
shenyu-2.4.3
version for source code analysis, please refer to Http Proxy for the introduction of the official website.
#
1. Register Service#
1.1 Declaration of registration interfaceUse the annotation @ShenyuSpringMvcClient
to register the service to the gateway. The simple demo
is as follows.
@RestController@RequestMapping("/order")@ShenyuSpringMvcClient(path = "/order") // APIpublic class OrderController { @GetMapping("/findById") @ShenyuSpringMvcClient(path = "/findById", desc = "Find by id") // method public OrderDTO findById(@RequestParam("id") final String id) { return build(id, "hello world findById"); }}
define annotation:
@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE, ElementType.METHOD})public @interface ShenyuSpringMvcClient { //path String path() default ""; //rule name String ruleName() default ""; //desc info String desc() default "";
//is enabled boolean enabled() default true; //register MetaData boolean registerMetaData() default false;}
#
1.2 Scan annotationAnnotation scanning is done through SpringMvcClientBeanPostProcessor
, which implements the BeanPostProcessor
interface and is a post-processor provided by Spring
.
During constructor instantiation.
- Read the property configuration
- Add annotations, read
path
information - Start the registry and register with
shenyu-admin
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor { //... /** * Constructor instantiation */ public SpringMvcClientBeanPostProcessor(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) { // 1. read Properties Properties props = clientConfig.getProps(); this.appName = props.getProperty(ShenyuClientConstants.APP_NAME); this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH, ""); if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) { String errorMsg = "http register param must config the appName or contextPath"; LOG.error(errorMsg); throw new ShenyuClientIllegalArgumentException(errorMsg); } this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString())); // 2. add annotation mappingAnnotation.add(ShenyuSpringMvcClient.class); mappingAnnotation.add(PostMapping.class); mappingAnnotation.add(GetMapping.class); mappingAnnotation.add(DeleteMapping.class); mappingAnnotation.add(PutMapping.class); mappingAnnotation.add(RequestMapping.class); // 3. start register cneter publisher.start(shenyuClientRegisterRepository); } @Override public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException { // override post process return bean; }
- SpringMvcClientBeanPostProcessor#postProcessAfterInitialization()
Rewrite post-processor logic: read annotation information, construct metadata objects and URI
objects, and register them with shenyu-admin
.
@Override public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException { // 1. If the all service is registered or is not a Controller class, it is not handled if (Boolean.TRUE.equals(isFull) || !hasAnnotation(bean.getClass(), Controller.class)) { return bean; } // 2. Read the annotations on the class ShenyuSpringMvcClient final ShenyuSpringMvcClient beanShenyuClient = AnnotationUtils.findAnnotation(bean.getClass(), ShenyuSpringMvcClient.class); // 2.1 build superPath final String superPath = buildApiSuperPath(bean.getClass()); // 2.2 whether to register the entire class method if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) { // build the metadata object and register it with shenyu-admin publisher.publishEvent(buildMetaDataDTO(beanShenyuClient, pathJoin(contextPath, superPath))); return bean; } // 3. read all methods final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass()); for (Method method : methods) { // 3.1 read the annotations on the method ShenyuSpringMvcClient ShenyuSpringMvcClient methodShenyuClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class); // If there is no annotation on the method, use the annotation on the class methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient; if (Objects.nonNull(methodShenyuClient)) { // 3.2 Build path information, build metadata objects, register with shenyu-admin publisher.publishEvent(buildMetaDataDTO(methodShenyuClient, buildApiPath(method, superPath))); } } return bean; }
- If you are registering the whole service or not
Controller
class, do not handle it
- If you are registering the whole service or not
- read the annotation on the class
ShenyuSpringMvcClient
, if the whole class is registered, build the metadata object here and register it withshenyu-admin
.
- read the annotation on the class
- Annotation on the handler method
ShenyuSpringMvcClient
, buildpath
information for the specific method, build the metadata object and then register it withshenyu-admin
- Annotation on the handler method
There are two methods here that take path
and need special instructions.
- buildApiSuperPath()
Construct SuperPath
: first take the path
property from the annotation ShenyuSpringMvcClient
on the class, if not, take the path
information from the RequestMapping
annotation on the current class.
private String buildApiSuperPath(@NonNull final Class<?> method) { // First take the path property from the annotation ShenyuSpringMvcClient on the class ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class); if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) { return shenyuSpringMvcClient.path(); } // Take the path information from the RequestMapping annotation of the current class RequestMapping requestMapping = AnnotationUtils.findAnnotation(method, RequestMapping.class); if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) { return requestMapping.path()[0]; } return ""; }
- buildApiPath()
Build path
: first read the annotation ShenyuSpringMvcClient
on the method and build it if it exists; otherwise get the path
information from other annotations on the method; complete path = contextPath(context information) + superPath(class information) + methodPath(method information)
.
private String buildApiPath(@NonNull final Method method, @NonNull final String superPath) { // 1. Read the annotation ShenyuSpringMvcClient on the method ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class); // 1.1 If path exists, build if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) { //1.2 path = contextPath+superPath+methodPath return pathJoin(contextPath, superPath, shenyuSpringMvcClient.path()); } // 2. Get path information from other annotations on the method final String path = getPathByMethod(method); if (StringUtils.isNotBlank(path)) { // 2.1 path = contextPath+superPath+methodPath return pathJoin(contextPath, superPath, path); } return pathJoin(contextPath, superPath); }
- getPathByMethod()
Get path
information from other annotations on the method, other annotations include.
- ShenyuSpringMvcClient
- PostMapping
- GetMapping
- DeleteMapping
- PutMapping
- RequestMapping
private String getPathByMethod(@NonNull final Method method) { // Iterate through interface annotations to get path information for (Class<? extends Annotation> mapping : mappingAnnotation) { final String pathByAnnotation = getPathByAnnotation(AnnotationUtils.findAnnotation(method, mapping), pathAttributeNames); if (StringUtils.isNotBlank(pathByAnnotation)) { return pathByAnnotation; } } return null; }
After the scanning annotation is finished, construct the metadata object and send the object to shenyu-admin
to complete the registration.
- Metadata
Includes the rule information of the currently registered method: contextPath, appName, registration path, description information, registration type, whether it is enabled, rule name and whether to register metadata.
private MetaDataRegisterDTO buildMetaDataDTO(@NonNull final ShenyuSpringMvcClient shenyuSpringMvcClient, final String path) { return MetaDataRegisterDTO.builder() .contextPath(contextPath) // contextPath .appName(appName) // appName .path(path) // Registered path, used when gateway rules match .pathDesc(shenyuSpringMvcClient.desc()) // desc info .rpcType(RpcTypeEnum.HTTP.getName()) // divide plugin, http type when default .enabled(shenyuSpringMvcClient.enabled()) // is enabled? .ruleName(StringUtils.defaultIfBlank(shenyuSpringMvcClient.ruleName(), path))//rule name .registerMetaData(shenyuSpringMvcClient.registerMetaData()) // whether to register metadata information .build(); }
The specific registration logic is implemented by the registration center, which has been analyzed in the previous articles and will not be analyzed in depth here.
#
1.3 Register URI DataContextRegisterListener
is responsible for registering the client's URI
information to shenyu-admin
, it implements the ApplicationListener
interface, when the context refresh event ContextRefreshedEvent
occurs, the onApplicationEvent()
method is executed to implement the registration logic.
public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent>, BeanFactoryAware { //...... /** * Constructor instantiation */ public ContextRegisterListener(final PropertiesConfig clientConfig) { // read Properties final Properties props = clientConfig.getProps(); this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString())); this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH); if (Boolean.TRUE.equals(isFull)) { if (StringUtils.isBlank(contextPath)) { final String errorMsg = "http register param must config the contextPath"; LOG.error(errorMsg); throw new ShenyuClientIllegalArgumentException(errorMsg); } } this.port = Integer.parseInt(Optional.ofNullable(props.getProperty(ShenyuClientConstants.PORT)).orElseGet(() -> "-1")); this.appName = props.getProperty(ShenyuClientConstants.APP_NAME); this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP); this.host = props.getProperty(ShenyuClientConstants.HOST); }
@Override public void setBeanFactory(final BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; }
// Execute application events @Override public void onApplicationEvent(@NonNull final ContextRefreshedEvent contextRefreshedEvent) { // The method is guaranteed to be executed once if (!registered.compareAndSet(false, true)) { return; } // 1. If you are registering for the entire service if (Boolean.TRUE.equals(isFull)) { // Build metadata and register publisher.publishEvent(buildMetaDataDTO()); } try { // get port final int mergedPort = port <= 0 ? PortUtils.findPort(beanFactory) : port; // 2. Constructing URI data and registering publisher.publishEvent(buildURIRegisterDTO(mergedPort)); } catch (ShenyuException e) { throw new ShenyuException(e.getMessage() + "please config ${shenyu.client.http.props.port} in xml/yml !"); } }
// build URI data private URIRegisterDTO buildURIRegisterDTO(final int port) { return URIRegisterDTO.builder() .contextPath(this.contextPath) // contextPath .appName(appName) // appName .protocol(protocol) // protocol .host(IpUtils.isCompleteHost(this.host) ? this.host : IpUtils.getHost(this.host)) //host .port(port) // port .rpcType(RpcTypeEnum.HTTP.getName()) // divide plugin, default registration http type .build(); }
// build MetaData private MetaDataRegisterDTO buildMetaDataDTO() { return MetaDataRegisterDTO.builder() .contextPath(contextPath) .appName(appName) .path(contextPath) .rpcType(RpcTypeEnum.HTTP.getName()) .enabled(true) .ruleName(contextPath) .build(); }}
#
1.4 Handle registration informationThe metadata and URI
data registered by the client through the registry are processed in shenyu-admin
, which is responsible for storing to the database and synchronizing to the shenyu
gateway. The client registration processing logic of Divide
plugin is in ShenyuClientRegisterDivideServiceImpl
. The inheritance relationship is as follows.
- ShenyuClientRegisterService: client registration service, top-level interface.
- FallbackShenyuClientRegisterService: registration failure, provides retry operation.
- AbstractShenyuClientRegisterServiceImpl: abstract class, implements part of the public registration logic;
- AbstractContextPathRegisterService: abstract class, responsible for registering
ContextPath
. - ShenyuClientRegisterDivideServiceImpl: implementation of the
Divide
plug-in registration.
#
1.4.1 Register Service- org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()
The metadata MetaDataRegisterDTO
object registered by the client through the registry is picked up and dropped in the register()
method of shenyu-admin
.
@Override public String register(final MetaDataRegisterDTO dto) { //1. register selector String selectorHandler = selectorHandler(dto); String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler); //2. register rule String ruleHandler = ruleHandler(); RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler); ruleService.registerDefault(ruleDTO); //3. register metadat registerMetadata(dto); //4. register ContextPath String contextPath = dto.getContextPath(); if (StringUtils.isNotEmpty(contextPath)) { registerContextPath(dto); } return ShenyuResultMessage.SUCCESS; }
#
1.4.1.1 Register Selector- org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()
Build contextPath
, find if the selector information exists, if it does, return id
; if it doesn't, create the default selector information.
@Override public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) { // build contextPath String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName()); // Find if selector information exists by name SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName); if (Objects.isNull(selectorDO)) { // Create a default selector message if it does not exist return registerSelector(contextPath, pluginName, selectorHandler); } return selectorDO.getId(); }
- Default Selector Information
Construct the default selector information and its conditional properties here.
//register selector private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) { // build selector SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId()); selectorDTO.setHandle(selectorHandler); //register default Selector return registerDefault(selectorDTO); } //build private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) { //build default SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath); selectorDTO.setPluginId(pluginId); //build the conditional properties of the default selector selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath)); return selectorDTO; }
- Build Default Selector
private SelectorDTO buildDefaultSelectorDTO(final String name) { return SelectorDTO.builder() .name(name) // name .type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // default CUSTOM_FLOW .matchMode(MatchModeEnum.AND.getCode()) //default AND .enabled(Boolean.TRUE) //default TRUE .loged(Boolean.TRUE) //default TRUE .continued(Boolean.TRUE) //default TRUE .sort(1) //default 1 .build();}
- Build default selector conditional properties
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) { SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO(); selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // default URI selectorConditionDTO.setParamName("/"); selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // default match selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // default /contextPath/** return Collections.singletonList(selectorConditionDTO);}
- Register default selector
@Overridepublic String registerDefault(final SelectorDTO selectorDTO) { //selector info SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO); //selector condition info List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions(); if (StringUtils.isEmpty(selectorDTO.getId())) { // insert selector information into the database selectorMapper.insertSelective(selectorDO); // insert selector condition information into the database selectorConditionDTOs.forEach(selectorConditionDTO -> { selectorConditionDTO.setSelectorId(selectorDO.getId()); selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO)); }); } // Publish synchronization events to synchronize selection information and its conditional attributes to the gateway publishEvent(selectorDO, selectorConditionDTOs); return selectorDO.getId();}
#
1.4.1.2 Register RuleIn the second step of registering the service, start building the default rules and then register the rules.
@Override public String register(final MetaDataRegisterDTO dto) { //1. register selector //...... //2. register rule // default rule handle String ruleHandler = ruleHandler(); // build default rule RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler); // register rule ruleService.registerDefault(ruleDTO); //3. register Metadata //...... //4. register ContextPath //...... return ShenyuResultMessage.SUCCESS; }
- default rule handle
@Override protected String ruleHandler() { // default rule handle return new DivideRuleHandle().toJson(); }
Divide
plugin default rule handle.
public class DivideRuleHandle implements RuleHandle {
/** * load balance: default RANDOM */ private String loadBalance = LoadBalanceEnum.RANDOM.getName();
/** * retry strategy: default CURRENT */ private String retryStrategy = RetryEnum.CURRENT.getName();
/** * retry: default 3 */ private int retry = 3;
/** * retry: default 3000 */ private long timeout = Constants.TIME_OUT;
/** * retry: default 10240 byte */ private long headerMaxSize = Constants.HEADER_MAX_SIZE;
/** * retry: default 102400 byte */ private long requestMaxSize = Constants.REQUEST_MAX_SIZE;}
- build default rule info
// build default rule info private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) { return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath()); } // build default rule info private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) { RuleDTO ruleDTO = RuleDTO.builder() .selectorId(selectorId) //selector Id .name(ruleName) //rule Name .matchMode(MatchModeEnum.AND.getCode()) // default and .enabled(Boolean.TRUE) // default TRUE .loged(Boolean.TRUE) //default TRUE .sort(1) //default 1 .handle(ruleHandler) .build(); RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder() .paramType(ParamTypeEnum.URI.getName()) // default URI .paramName("/") .paramValue(path) // path .build(); if (path.indexOf("*") > 1) { ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); //if the path conatins *, default match } else { ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); // default = } ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO)); return ruleDTO; }
- org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()
Registration rules: insert records to the database and publish events to the gateway for data synchronization.
@Override public String registerDefault(final RuleDTO ruleDTO) { RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName()); if (Objects.nonNull(exist)) { return ""; }
RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO); List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions(); if (StringUtils.isEmpty(ruleDTO.getId())) { // insert rule into database ruleMapper.insertSelective(ruleDO); //insert rule condition into database ruleConditions.forEach(ruleConditionDTO -> { ruleConditionDTO.setRuleId(ruleDO.getId()); ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO)); }); } // Publish events to the gateway for data synchronization publishEvent(ruleDO, ruleConditions); return ruleDO.getId(); }
#
1.4.1.3 Register Metadata @Override public String register(final MetaDataRegisterDTO dto) { //1. register selector //...... //2. register rule //...... //3. register metadata registerMetadata(dto); //4. register ContextPath //...... return ShenyuResultMessage.SUCCESS; }
- org.apache.shenyu.admin.service.register.ShenyuClientRegisterDivideServiceImpl#registerMetadata()
Insert or update metadata and then publish sync events to the gateway.
@Override protected void registerMetadata(final MetaDataRegisterDTO dto) { if (dto.isRegisterMetaData()) { MetaDataService metaDataService = getMetaDataService(); MetaDataDO exist = metaDataService.findByPath(dto.getPath()); // save or update MetaData metaDataService.saveOrUpdateMetaData(exist, dto); } }
@Override public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) { DataEventTypeEnum eventType; // DTO->DO MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO); // insert if (Objects.isNull(exist)) { Timestamp currentTime = new Timestamp(System.currentTimeMillis()); metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid()); metaDataDO.setDateCreated(currentTime); metaDataDO.setDateUpdated(currentTime); metaDataMapper.insert(metaDataDO); eventType = DataEventTypeEnum.CREATE; } else { // update metaDataDO.setId(exist.getId()); metaDataMapper.update(metaDataDO); eventType = DataEventTypeEnum.UPDATE; } // publish event to gateway eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType, Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO)))); }
#
1.4.1.4 Register ContextPath @Override public String register(final MetaDataRegisterDTO dto) { //1. register selector //...... //2. register rule //...... //3. register metadata //...... //4. register ContextPath String contextPath = dto.getContextPath(); if (StringUtils.isNotEmpty(contextPath)) { registerContextPath(dto); } return ShenyuResultMessage.SUCCESS; }
- org.apache.shenyu.admin.service.register.AbstractContextPathRegisterService#registerContextPath()
@Override public void registerContextPath(final MetaDataRegisterDTO dto) { // set contextPath for selector String contextPathSelectorId = getSelectorService().registerDefault(dto, PluginEnum.CONTEXT_PATH.getName(), ""); ContextMappingRuleHandle handle = new ContextMappingRuleHandle(); handle.setContextPath(PathUtils.decoratorContextPath(dto.getContextPath())); // set contextPath for rule getRuleService().registerDefault(buildContextPathDefaultRuleDTO(contextPathSelectorId, dto, handle.toJson())); }
#
1.4.2 Register URI- org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()
The server side receives the URI
information registered by the client and processes it.
@Override public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) { String result; String key = key(selectorName); try { this.removeFallBack(key); // register URI result = this.doRegisterURI(selectorName, uriList); logger.info("Register success: {},{}", selectorName, uriList); } catch (Exception ex) { logger.warn("Register exception: cause:{}", ex.getMessage()); result = ""; // Retry after registration failure this.addFallback(key, new FallbackHolder(selectorName, uriList)); } return result; }
- org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()
Get a valid URI
from the URI
registered by the client, update the corresponding selector handle
property, and send a selector update event to the gateway.
@Override public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) { //check if (CollectionUtils.isEmpty(uriList)) { return ""; } //get selector SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType())); if (Objects.isNull(selectorDO)) { throw new ShenyuException("doRegister Failed to execute,wait to retry."); } // gte valid URI List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList()); // build handle String handler = buildHandle(validUriList, selectorDO); if (handler != null) { selectorDO.setHandle(handler); SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType())); selectorData.setHandle(handler); // Update the handle property of the selector to the database selectorService.updateSelective(selectorDO); // Send selector update events to the gateway eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData))); } return ShenyuResultMessage.SUCCESS; }
The source code analysis on service registration is completed as well as the analysis flow chart is as follows.
The next step is to analyze how the divide
plugin initiates a call to the http
service based on this information.
#
2. Call Http ServiceThe divide
plugin is the core processing plugin used by the gateway to handle http protocol
requests.
Take the case provided on the official website Quick start with Http as an example, a direct connection request is as follows.
GET http://localhost:8189/order/findById?id=100Accept: application/json
After proxying through the ShenYu
gateway, the request is as follows.
GET http://localhost:9195/http/order/findById?id=100Accept: application/json
The services proxied by the ShenYu
gateway are still able to request the previous services, where the divide
plugin comes into play. The class inheritance relationship is as follows.
- ShenyuPlugin: top-level interface, defining interface methods.
- AbstractShenyuPlugin: abstract class that implements the common logic of the pluin.
- DividePlugin: Divide pluin.
#
2.1 Accept RequestAfter passing the ShenYu
gateway proxy, the request entry is ShenyuWebHandler
, which implements the org.springframework.web.server.WebHandler
interface.
public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> { //...... /** * hanlde web reuest */ @Override public Mono<Void> handle(@NonNull final ServerWebExchange exchange) { // execute plugin chain Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange); if (scheduled) { return execute.subscribeOn(scheduler); } return execute; } private static class DefaultShenyuPluginChain implements ShenyuPluginChain {
private int index;
private final List<ShenyuPlugin> plugins;
/** * Instantiating the default plugin chain */ DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) { this.plugins = plugins; }
/** * Execute each plugin */ @Override public Mono<Void> execute(final ServerWebExchange exchange) { return Mono.defer(() -> { if (this.index < plugins.size()) { // get current plugin ShenyuPlugin plugin = plugins.get(this.index++); // is skip ? boolean skip = plugin.skip(exchange); if (skip) { // If skipped, execute the next return this.execute(exchange); } // execute current plugin return plugin.execute(exchange, this); } return Mono.empty(); }); } }}
#
2.2 Matching rule- org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()
Execute the matching logic for selectors and rules in the execute()
method.
- Matching selectors.
- Matching rules.
- Execute the plugin.
@Override public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) { String pluginName = named(); PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName); if (pluginData != null && pluginData.getEnabled()) { // selector final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName); if (CollectionUtils.isEmpty(selectors)) { return handleSelectorIfNull(pluginName, exchange, chain); } // match selector SelectorData selectorData = matchSelector(exchange, selectors); if (Objects.isNull(selectorData)) { return handleSelectorIfNull(pluginName, exchange, chain); } selectorLog(selectorData, pluginName); // rule List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId()); if (CollectionUtils.isEmpty(rules)) { return handleRuleIfNull(pluginName, exchange, chain); } // match rule RuleData rule; if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) { //get last rule = rules.get(rules.size() - 1); } else { rule = matchRule(exchange, rules); } if (Objects.isNull(rule)) { return handleRuleIfNull(pluginName, exchange, chain); } ruleLog(rule, pluginName); // execute return doExecute(exchange, chain, selectorData, rule); } return chain.execute(exchange); }
#
2.3 Execute Divide Plugin- org.apache.shenyu.plugin.divide.DividePlugin#doExecute()
Execute the specific logic of the divide
plugin in the doExecute()
method.
- Checks the
header
size. - Checking the
request
size. - Obtaining the list of services.
- implementing load balancing.
- Set request
url
, timeout time, retry policy.
@Override protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) { // shenyu Context ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT); assert shenyuContext != null; // Get the handle property of the rule DivideRuleHandle ruleHandle = DividePluginDataHandler.CACHED_HANDLE.get().obtainHandle(CacheKeyUtils.INST.getKey(rule)); long headerSize = 0; // check header size for (List<String> multiHeader : exchange.getRequest().getHeaders().values()) { for (String value : multiHeader) { headerSize += value.getBytes(StandardCharsets.UTF_8).length; } } if (headerSize > ruleHandle.getHeaderMaxSize()) { LOG.error("request header is too large"); Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_HEADER_TOO_LARGE, null); return WebFluxResultUtils.result(exchange, error); } // check request size if (exchange.getRequest().getHeaders().getContentLength() > ruleHandle.getRequestMaxSize()) { LOG.error("request entity is too large"); Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_ENTITY_TOO_LARGE, null); return WebFluxResultUtils.result(exchange, error); } // upstream list List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId()); if (CollectionUtils.isEmpty(upstreamList)) { LOG.error("divide upstream configuration error: {}", rule); Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null); return WebFluxResultUtils.result(exchange, error); } // request ip String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress(); // load balance Upstream upstream = LoadBalancerFactory.selector(upstreamList, ruleHandle.getLoadBalance(), ip); if (Objects.isNull(upstream)) { LOG.error("divide has no upstream"); Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null); return WebFluxResultUtils.result(exchange, error); } // set url String domain = upstream.buildDomain(); exchange.getAttributes().put(Constants.HTTP_DOMAIN, domain); // set timeout exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout()); exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry()); // set retry exchange.getAttributes().put(Constants.RETRY_STRATEGY, ruleHandle.getRetryStrategy()); exchange.getAttributes().put(Constants.LOAD_BALANCE, ruleHandle.getLoadBalance()); exchange.getAttributes().put(Constants.DIVIDE_SELECTOR_ID, selector.getId()); return chain.execute(exchange); }
#
2.4 Do RequestBy default, the WebClientPlugin
initiates a call request to the http
service with the following class inheritance relationship.
- ShenyuPlugin: top-level plug-in, defining plug-in methods.
- AbstractHttpClientPlugin: abstract class that implements the public logic of request invocation.
- WebClientPlugin: initiating requests through
WebClient
. - NettyHttpClientPlugin: initiating requests through
Netty
.
Initiate the request call.
- org.apache.shenyu.plugin.httpclient.AbstractHttpClientPlugin#execute()
Initiate the request call in the execute()
method.
- Get the specified timeout, number of retries
- Initiate the request
- Retry after failure according to the specified retry policy
public abstract class AbstractHttpClientPlugin<R> implements ShenyuPlugin {
protected static final Logger LOG = LoggerFactory.getLogger(AbstractHttpClientPlugin.class);
@Override public final Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) { // shenyu Context final ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT); assert shenyuContext != null; // uri final URI uri = exchange.getAttribute(Constants.HTTP_URI); if (Objects.isNull(uri)) { Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_URL, null); return WebFluxResultUtils.result(exchange, error); } // get time out final long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L); final Duration duration = Duration.ofMillis(timeout); // get retry times final int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0); // get retry strategy final String retryStrategy = (String) Optional.ofNullable(exchange.getAttribute(Constants.RETRY_STRATEGY)).orElseGet(RetryEnum.CURRENT::getName); LOG.info("The request urlPath is {}, retryTimes is {}, retryStrategy is {}", uri.toASCIIString(), retryTimes, retryStrategy); // build header final HttpHeaders httpHeaders = buildHttpHeaders(exchange); // do request final Mono<R> response = doRequest(exchange, exchange.getRequest().getMethodValue(), uri, httpHeaders, exchange.getRequest().getBody()) .timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration))) .doOnError(e -> LOG.error(e.getMessage(), e)); // Retry Policy CURRENT, retries the current service. if (RetryEnum.CURRENT.getName().equals(retryStrategy)) { //old version of DividePlugin and SpringCloudPlugin will run on this return response.retryWhen(Retry.anyOf(TimeoutException.class, ConnectTimeoutException.class, ReadTimeoutException.class, IllegalStateException.class) .retryMax(retryTimes) .backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true))) .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)) .flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange)); } // Retry for other services // Exclude services that have already been called final Set<URI> exclude = Sets.newHashSet(uri); // resend return resend(response, exchange, duration, httpHeaders, exclude, retryTimes) .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)) .flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange)); }
private Mono<R> resend(final Mono<R> clientResponse, final ServerWebExchange exchange, final Duration duration, final HttpHeaders httpHeaders, final Set<URI> exclude, final int retryTimes) { Mono<R> result = clientResponse; // Retry according to the specified number of retries for (int i = 0; i < retryTimes; i++) { result = resend(result, exchange, duration, httpHeaders, exclude); } return result; }
private Mono<R> resend(final Mono<R> response, final ServerWebExchange exchange, final Duration duration, final HttpHeaders httpHeaders, final Set<URI> exclude) { return response.onErrorResume(th -> { final String selectorId = exchange.getAttribute(Constants.DIVIDE_SELECTOR_ID); final String loadBalance = exchange.getAttribute(Constants.LOAD_BALANCE); //Check available services final List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId) .stream().filter(data -> { final String trimUri = data.getUrl().trim(); for (URI needToExclude : exclude) { // exclude already called if ((needToExclude.getHost() + ":" + needToExclude.getPort()).equals(trimUri)) { return false; } } return true; }).collect(Collectors.toList()); if (CollectionUtils.isEmpty(upstreamList)) { // no need to retry anymore return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg())); } // requets ip final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress(); // Load Balance final Upstream upstream = LoadBalancerFactory.selector(upstreamList, loadBalance, ip); if (Objects.isNull(upstream)) { // no need to retry anymore return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg())); } final URI newUri = RequestUrlUtils.buildRequestUri(exchange, upstream.buildDomain()); // Exclude uri that has already been called exclude.add(newUri); // Make another call return doRequest(exchange, exchange.getRequest().getMethodValue(), newUri, httpHeaders, exchange.getRequest().getBody()) .timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration))) .doOnError(e -> LOG.error(e.getMessage(), e)); }); }
//......}
- org.apache.shenyu.plugin.httpclient.WebClientPlugin#doRequest()
Initiate a real request call via webClient
in the doRequest()
method.
@Override protected Mono<ClientResponse> doRequest(final ServerWebExchange exchange, final String httpMethod, final URI uri, final HttpHeaders httpHeaders, final Flux<DataBuffer> body) { return webClient.method(HttpMethod.valueOf(httpMethod)).uri(uri) // uri .headers(headers -> headers.addAll(httpHeaders)) // header .body(BodyInserters.fromDataBuffers(body)) .exchange() // request .doOnSuccess(res -> { if (res.statusCode().is2xxSuccessful()) { // success exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName()); } else { // error exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName()); } exchange.getResponse().setStatusCode(res.statusCode()); exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res); }); }
#
2.5 Response Result- org.apache.shenyu.plugin.response.ResponsePlugin#execute()
The response results are handled by the ResponsePlugin
plugin.
@Override public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) { ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT); assert shenyuContext != null; // Processing results according to rpc type return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain); }
The processing type is determined by MessageWriter
and the class inheritance relationship is as follows.
- MessageWriter: interface, defining message processing methods.
- NettyClientMessageWriter: processing of
Netty
call results. - RPCMessageWriter: processing the results of
RPC
calls. - WebClientMessageWriter: processing
WebClient
call results.
The default is to initiate http
requests via WebCient
.
- org.apache.shenyu.plugin.response.strategy.WebClientMessageWriter#writeWith()
Process the response results in the writeWith()
method.
@Override public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) { return chain.execute(exchange).then(Mono.defer(() -> { // get response ServerHttpResponse response = exchange.getResponse(); ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR); if (Objects.isNull(clientResponse)) { Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null); return WebFluxResultUtils.result(exchange, error); } //cookies and headers response.getCookies().putAll(clientResponse.cookies()); response.getHeaders().putAll(clientResponse.headers().asHttpHeaders()); // image, pdf or stream does not do format processing. // Handling special response types if (clientResponse.headers().contentType().isPresent()) { final String media = clientResponse.headers().contentType().get().toString().toLowerCase(); if (media.matches(COMMON_BIN_MEDIA_TYPE_REGEX)) { return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers())) .doOnCancel(() -> clean(exchange)); } } // Handling general response types clientResponse = ResponseUtils.buildClientResponse(response, clientResponse.body(BodyExtractors.toDataBuffers())); return clientResponse.bodyToMono(byte[].class) .flatMap(originData -> WebFluxResultUtils.result(exchange, originData)) .doOnCancel(() -> clean(exchange)); })); }
Analysis to this point, the source code analysis on Divide
plugin is complete, the analysis flow chart is as follows.
#
3. SummaryThe source code analysis in this article starts from the http
service registration to the divide
plugin service calls. The divide
plugin is mainly used to handle http
requests. Some of the source code does not enter the in-depth analysis, such as the implementation of load balancing, service probe live, will continue to analyze in the following.