Skip to main content

One post tagged with "dubbo"

View All Tags

Code Analysis For Dubbo Plugin

· 22 min read
Apache ShenYu Committer

Apache ShenYu is an asynchronous, high-performance, cross-language, responsive API gateway.

The Apache ShenYu gateway uses the dubbo plugin to make calls to the dubbo service. You can see the official documentation Dubbo Quick Start to learn how to use the plugin.

This article is based on shenyu-2.4.3 version for source code analysis, please refer to Dubbo Service Access for the introduction of the official website.

1. Service Registration#

Take the example provided on the official website shenyu-examples-dubbo. Suppose your dubbo service is defined as follows (spring-dubbo.xml).

<beans xmlns="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"       xsi:schemaLocation="http://www.springframework.org/schema/beans       http://www.springframework.org/schema/beans/spring-beans.xsd       http://code.alibabatech.com/schema/dubbo       https://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    <dubbo:application name="test-dubbo-service"/>    <dubbo:registry address="${dubbo.registry.address}"/>    <dubbo:protocol name="dubbo" port="20888"/>
    <dubbo:service timeout="10000" interface="org.apache.shenyu.examples.dubbo.api.service.DubboTestService" ref="dubboTestService"/>
</beans>

Declare the application service name, register the center address, use the dubbo protocol, declare the service interface, and the corresponding interface implementation class.

/** * DubboTestServiceImpl. */@Service("dubboTestService")public class DubboTestServiceImpl implements DubboTestService {        @Override    @ShenyuDubboClient(path = "/findById", desc = "Query by Id")    public DubboTest findById(final String id) {        return new DubboTest(id, "hello world shenyu Apache, findById");    }
    //......}

In the interface implementation class, use the annotation @ShenyuDubboClient to register the service with shenyu-admin. The role of this annotation and its rationale will be analyzed later.

The configuration information in the configuration file application.yml.

server:  port: 8011  address: 0.0.0.0  servlet:    context-path: /spring:  main:    allow-bean-definition-overriding: truedubbo:  registry:    address: zookeeper://localhost:2181  # dubbo registry    shenyu:  register:    registerType: http     serverLists: http://localhost:9095     props:      username: admin       password: 123456  client:    dubbo:      props:        contextPath: /dubbo          appName: dubbo

In the configuration file, declare the registry address used by dubbo. The dubbo service registers with shenyu-admin, using the method http, and the registration address is http://localhost:9095.

See Application Client Access for more information on the use of the registration method.

1.1 Declaration of registration interface#

Use the annotation @ShenyuDubboClient to register the service to the gateway. The simple demo is as follows.

// dubbo sevice@Service("dubboTestService")public class DubboTestServiceImpl implements DubboTestService {        @Override    @ShenyuDubboClient(path = "/findById", desc = "Query by Id") // need to be registered method    public DubboTest findById(final String id) {        return new DubboTest(id, "hello world shenyu Apache, findById");    }
    //......}

annotation definition:

/** * Works on classes and methods */@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE, ElementType.METHOD})@Inheritedpublic @interface ShenyuDubboClient {        //path    String path();        //rule name    String ruleName() default "";       //desc    String desc() default "";
    //enabled    boolean enabled() default true;}

1.2 Scan annotation information#

Annotation scanning is done through the ApacheDubboServiceBeanListener, which implements the ApplicationListener<ContextRefreshedEvent> interface and starts executing the event handler method when a context refresh event occurs during the Spring container startup onApplicationEvent().

During constructor instantiation.

  • Read property configuration
  • Start the thread pool
  • Start the registry for registering with shenyu-admin
public class ApacheDubboServiceBeanListener implements ApplicationListener<ContextRefreshedEvent> {
    // ......
    //Constructor    public ApacheDubboServiceBeanListener(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {        //1.Read property configuration        Properties props = clientConfig.getProps();        String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);        String appName = props.getProperty(ShenyuClientConstants.APP_NAME);        if (StringUtils.isBlank(contextPath)) {            throw new ShenyuClientIllegalArgumentException("apache dubbo client must config the contextPath or appName");        }        this.contextPath = contextPath;        this.appName = appName;        this.host = props.getProperty(ShenyuClientConstants.HOST);        this.port = props.getProperty(ShenyuClientConstants.PORT);        //2.Start the thread pool        executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("shenyu-apache-dubbo-client-thread-pool-%d").build());        //3.Start the registry for registering with `shenyu-admin`        publisher.start(shenyuClientRegisterRepository);    }
    /**     * Context refresh event, execute method logic     */    @Override    public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {        //......    }
  • ApacheDubboServiceBeanListener#onApplicationEvent()

Rewritten method logic: read Dubbo service ServiceBean, build metadata object and URI object, and register it with shenyu-admin.

    @Override    public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {        //read ServiceBean        Map<String, ServiceBean> serviceBean = contextRefreshedEvent.getApplicationContext().getBeansOfType(ServiceBean.class);        if (serviceBean.isEmpty()) {            return;        }        //The method is guaranteed to be executed only once        if (!registered.compareAndSet(false, true)) {            return;        }        //handle metadata         for (Map.Entry<String, ServiceBean> entry : serviceBean.entrySet()) {            handler(entry.getValue());        }        //handle URI        serviceBean.values().stream().findFirst().ifPresent(bean -> {            publisher.publishEvent(buildURIRegisterDTO(bean));        });    }
  • handler()

    In the handler() method, read all methods from the serviceBean, determine if there is a ShenyuDubboClient annotation on the method, build a metadata object if it exists, and register the method with shenyu-admin through the registry.

    private void handler(final ServiceBean<?> serviceBean) {        //get proxy        Object refProxy = serviceBean.getRef();        //get class        Class<?> clazz = refProxy.getClass();        if (AopUtils.isAopProxy(refProxy)) {            clazz = AopUtils.getTargetClass(refProxy);        }        //all methods        Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);        for (Method method : methods) {            //read ShenyuDubboClient annotation            ShenyuDubboClient shenyuDubboClient = method.getAnnotation(ShenyuDubboClient.class);            if (Objects.nonNull(shenyuDubboClient)) {                //build meatdata and registry                publisher.publishEvent(buildMetaDataDTO(serviceBean, shenyuDubboClient, method));            }        }    }
  • buildMetaDataDTO()

    Constructs a metadata object where the necessary information for method registration is constructed and subsequently used for selector or rule matching.

    private MetaDataRegisterDTO buildMetaDataDTO(final ServiceBean<?> serviceBean, final ShenyuDubboClient shenyuDubboClient, final Method method) {        //app name        String appName = buildAppName(serviceBean);        //path        String path = contextPath + shenyuDubboClient.path();        //desc        String desc = shenyuDubboClient.desc();        //service name        String serviceName = serviceBean.getInterface();        //rule name        String configRuleName = shenyuDubboClient.ruleName();        String ruleName = ("".equals(configRuleName)) ? path : configRuleName;        //method name         String methodName = method.getName();        //parameter Types        Class<?>[] parameterTypesClazz = method.getParameterTypes();        String parameterTypes = Arrays.stream(parameterTypesClazz).map(Class::getName).collect(Collectors.joining(","));        return MetaDataRegisterDTO.builder()                .appName(appName)                .serviceName(serviceName)                .methodName(methodName)                .contextPath(contextPath)                .host(buildHost())                .port(buildPort(serviceBean))                .path(path)                .ruleName(ruleName)                .pathDesc(desc)                .parameterTypes(parameterTypes)                .rpcExt(buildRpcExt(serviceBean)) //dubbo ext                .rpcType(RpcTypeEnum.DUBBO.getName())                .enabled(shenyuDubboClient.enabled())                .build();    }
  • buildRpcExt()

    dubbo ext information.

       private String buildRpcExt(final ServiceBean serviceBean) {       DubboRpcExt build = DubboRpcExt.builder()               .group(StringUtils.isNotEmpty(serviceBean.getGroup()) ? serviceBean.getGroup() : "")//group               .version(StringUtils.isNotEmpty(serviceBean.getVersion()) ? serviceBean.getVersion() : "")//version               .loadbalance(StringUtils.isNotEmpty(serviceBean.getLoadbalance()) ? serviceBean.getLoadbalance() : Constants.DEFAULT_LOADBALANCE)//load balance               .retries(Objects.isNull(serviceBean.getRetries()) ? Constants.DEFAULT_RETRIES : serviceBean.getRetries())//retry               .timeout(Objects.isNull(serviceBean.getTimeout()) ? Constants.DEFAULT_CONNECT_TIMEOUT : serviceBean.getTimeout())//time               .sent(Objects.isNull(serviceBean.getSent()) ? Constants.DEFAULT_SENT : serviceBean.getSent())//sent               .cluster(StringUtils.isNotEmpty(serviceBean.getCluster()) ? serviceBean.getCluster() : Constants.DEFAULT_CLUSTER)//cluster               .url("")               .build();       return GsonUtils.getInstance().toJson(build);   }
  • buildURIRegisterDTO()

    Construct URI objects to register information about the service itself, which can be subsequently used for service probing live.

private URIRegisterDTO buildURIRegisterDTO(final ServiceBean serviceBean) {        return URIRegisterDTO.builder()                .contextPath(this.contextPath) //context path                .appName(buildAppName(serviceBean))//app name                .rpcType(RpcTypeEnum.DUBBO.getName())//dubbo                .host(buildHost()) //host                .port(buildPort(serviceBean))//port                .build(); }

The specific registration logic is implemented by the registration center, please refer to Client Access Principles .

//To the registration center, post registration events   publisher.publishEvent();

1.3 Processing registration information#

The metadata and URI data registered by the client through the registry are processed at the shenyu-admin end, which is responsible for storing to the database and synchronizing to the shenyu gateway. The client-side registration processing logic of the Dubbo plugin is in the ShenyuClientRegisterDubboServiceImpl. The inheritance relationship is as follows.

  • ShenyuClientRegisterService: client registration service, top-level interface.
  • FallbackShenyuClientRegisterService: registration failure, provides retry operation.
  • AbstractShenyuClientRegisterServiceImpl: abstract class, implements part of the public registration logic.
  • ShenyuClientRegisterDubboServiceImpl: implementation of the Dubbo plugin registration.
1.3.1 Registration Service#
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()

    The metadata MetaDataRegisterDTO object registered by the client through the registry is picked up and dropped in the register() method of shenyu-admin.

   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. register selector        String selectorHandler = selectorHandler(dto);        String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);        //2. register rule        String ruleHandler = ruleHandler();        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        ruleService.registerDefault(ruleDTO);        //3. register metadata        registerMetadata(dto);        //4. register contextPath        String contextPath = dto.getContextPath();        if (StringUtils.isNotEmpty(contextPath)) {            registerContextPath(dto);        }        return ShenyuResultMessage.SUCCESS;    }
1.3.1.1 Register Selector#
  • org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()

Construct contextPath, find if the selector information exists, if it does, return id; if it doesn't, create the default selector information.

    @Override    public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {        // build contextPath        String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());        // Find if selector information exists by name        SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);        if (Objects.isNull(selectorDO)) {            // Create a default selector message if it does not exist            return registerSelector(contextPath, pluginName, selectorHandler);        }        return selectorDO.getId();    }
  • Default selector information

    Construct the default selector information and its conditional properties here.

   //register selector   private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {        //build selector        SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());        selectorDTO.setHandle(selectorHandler);        //register default selector        return registerDefault(selectorDTO);    }     //build selector    private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {        //build default        SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);        selectorDTO.setPluginId(pluginId);         //build the conditional properties of the default selector        selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));        return selectorDTO;    }
  • Build default selector
private SelectorDTO buildDefaultSelectorDTO(final String name) {    return SelectorDTO.builder()            .name(name) // name            .type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // default type cutom            .matchMode(MatchModeEnum.AND.getCode()) //default match mode            .enabled(Boolean.TRUE)  //enable            .loged(Boolean.TRUE)  //log            .continued(Boolean.TRUE)             .sort(1)             .build();}
  • Build default selector conditional properties
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {    SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();    selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // default URI    selectorConditionDTO.setParamName("/");    selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // default  match    selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX);     return Collections.singletonList(selectorConditionDTO);}
  • Register default selector
@Overridepublic String registerDefault(final SelectorDTO selectorDTO) {    //selector information    SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);    //selector conditional properties    List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();    if (StringUtils.isEmpty(selectorDTO.getId())) {        // insert selector information into the database        selectorMapper.insertSelective(selectorDO);          // inserting selector conditional properties to the database        selectorConditionDTOs.forEach(selectorConditionDTO -> {            selectorConditionDTO.setSelectorId(selectorDO.getId());                        selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));        });    }    // Publish synchronization events to synchronize selection information and its conditional attributes to the gateway    publishEvent(selectorDO, selectorConditionDTOs);    return selectorDO.getId();}
1.3.1.2 Registration Rules#

In the second step of registering the service, start building the default rules and then register the rules.

@Override    public String register(final MetaDataRegisterDTO dto) {        //1. handle selector        //......                //2. handle rule                String ruleHandler = ruleHandler();        // build default rule        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        // register rule        ruleService.registerDefault(ruleDTO);                //3. reigster metadata        //......                //4. register ContextPath        //......                return ShenyuResultMessage.SUCCESS;    }
  • 默认规则处理属性
    @Override    protected String ruleHandler() {        // default rule        return new DubboRuleHandle().toJson();    }

Dubbo plugin default rule handling properties.

public class DubboRuleHandle implements RuleHandle {
    /**     * dubbo version.     */    private String version;
    /**     * group.     */    private String group;
    /**     * retry.     */    private Integer retries = 0;
    /**     * loadbalance:RANDOM     */    private String loadbalance = LoadBalanceEnum.RANDOM.getName();
    /**     * timeout default 3000     */    private long timeout = Constants.TIME_OUT;}
  • build default rule
  // build default rule    private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {        return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());    }   //  build default rule    private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {        RuleDTO ruleDTO = RuleDTO.builder()                .selectorId(selectorId)                .name(ruleName)                 .matchMode(MatchModeEnum.AND.getCode())                 .enabled(Boolean.TRUE)                 .loged(Boolean.TRUE)                 .sort(1)                .handle(ruleHandler)                .build();        RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()                .paramType(ParamTypeEnum.URI.getName())                 .paramName("/")                .paramValue(path)                 .build();        if (path.indexOf("*") > 1) {            ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias());         } else {            ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias());         }        ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));        return ruleDTO;    }
  • org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()

Registration rules: insert records to the database and publish events to the gateway for data synchronization.


    @Override    public String registerDefault(final RuleDTO ruleDTO) {        RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());        if (Objects.nonNull(exist)) {            return "";        }
        RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);        List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();        if (StringUtils.isEmpty(ruleDTO.getId())) {            // insert rule information into the database            ruleMapper.insertSelective(ruleDO);            //insert  rule body conditional attributes into the database            ruleConditions.forEach(ruleConditionDTO -> {                ruleConditionDTO.setRuleId(ruleDO.getId());                     ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));            });        }        // Publish events to the gateway for data synchronization        publishEvent(ruleDO, ruleConditions);        return ruleDO.getId();    }
1.3.1.3 Register Metadata#

Metadata is mainly used for RPC service calls.

   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. register selector        //......                //2. register rule        //......                //3. register metadata        registerMetadata(dto);                //4. register ContextPath        //......                return ShenyuResultMessage.SUCCESS;    }
  • org.apache.shenyu.admin.service.register.ShenyuClientRegisterDubboServiceImpl#registerMetadata()

    Insert or update metadata and then publish sync events to the gateway.

    @Override    protected void registerMetadata(final MetaDataRegisterDTO dto) {            // get metaDataService            MetaDataService metaDataService = getMetaDataService();            MetaDataDO exist = metaDataService.findByPath(dto.getPath());            //insert or update metadata            metaDataService.saveOrUpdateMetaData(exist, dto);    }
    @Override    public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {        DataEventTypeEnum eventType;        // DTO->DO        MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);        // insert data        if (Objects.isNull(exist)) {            Timestamp currentTime = new Timestamp(System.currentTimeMillis());            metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());            metaDataDO.setDateCreated(currentTime);            metaDataDO.setDateUpdated(currentTime);            metaDataMapper.insert(metaDataDO);            eventType = DataEventTypeEnum.CREATE;        } else {            // update            metaDataDO.setId(exist.getId());            metaDataMapper.update(metaDataDO);            eventType = DataEventTypeEnum.UPDATE;        }        // Publish sync events to gateway        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,                Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));    }
1.3.2 Register URI#
  • org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()

The server side receives the URI information registered by the client and processes it.

    @Override    public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {        String result;        String key = key(selectorName);        try {            this.removeFallBack(key);            // register URI            result = this.doRegisterURI(selectorName, uriList);            logger.info("Register success: {},{}", selectorName, uriList);        } catch (Exception ex) {            logger.warn("Register exception: cause:{}", ex.getMessage());            result = "";            // Retry after registration failure            this.addFallback(key, new FallbackHolder(selectorName, uriList));        }        return result;    }
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()

Get a valid URI from the URI registered by the client, update the corresponding selector handle property, and send a selector update event to the gateway.

@Override    public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {        //check        if (CollectionUtils.isEmpty(uriList)) {            return "";        }                SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));        if (Objects.isNull(selectorDO)) {            throw new ShenyuException("doRegister Failed to execute,wait to retry.");        }        // gte valid URI        List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());        // build handle        String handler = buildHandle(validUriList, selectorDO);        if (handler != null) {            selectorDO.setHandle(handler);            SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));            selectorData.setHandle(handler);            // Update the handle property of the selector to the database            selectorService.updateSelective(selectorDO);            // Send selector update events to the gateway            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));        }        return ShenyuResultMessage.SUCCESS;    }

The source code analysis on service registration is completed as well as the analysis flow chart is as follows.

The next step is to analyze how the dubbo plugin initiates calls to the http service based on this information.

2. Service Invocation#

The dubbo plugin is the core processing plugin used by the ShenYu gateway to convert http requests into the dubbo protocol and invoke the dubbo service.

Take the case provided by the official website Quick Start with Dubbo as an example, a dubbo service is registered with shenyu-admin through the registry, and then requested through the ShenYu gateway proxy, the request is as follows.

GET http://localhost:9195/dubbo/findById?id=100Accept: application/json

The class inheritance relationship in the Dubbo plugin is as follows.

  • ShenyuPlugin: top-level interface, defining interface methods.
  • AbstractShenyuPlugin: abstract class that implements plugin common logic.
  • AbstractDubboPlugin: dubbo plugin abstract class, implementing dubbo common logic.
  • ApacheDubboPlugin: ApacheDubbo plugin.

ShenYu Gateway supports ApacheDubbo and AlibabaDubbo\

2.1 Receive requests#

After passing the ShenYu gateway proxy, the request entry is ShenyuWebHandler, which implements the org.springframework.web.server.WebHandler interface.

public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {    //......        /**     * hanlde request     */    @Override    public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {       // execute default plugin chain        Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);        if (scheduled) {            return execute.subscribeOn(scheduler);        }        return execute;    }        private static class DefaultShenyuPluginChain implements ShenyuPluginChain {
        private int index;
        private final List<ShenyuPlugin> plugins;
          DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {            this.plugins = plugins;        }
        /**         * execute.         */        @Override        public Mono<Void> execute(final ServerWebExchange exchange) {            return Mono.defer(() -> {                if (this.index < plugins.size()) {                    // get plugin                     ShenyuPlugin plugin = plugins.get(this.index++);                    boolean skip = plugin.skip(exchange);                    if (skip) {                        // next                        return this.execute(exchange);                    }                    // execute                    return plugin.execute(exchange, this);                }                return Mono.empty();            });        }    }}

2.2 Match Rule#

  • org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()

Execute the matching logic for selectors and rules in the execute() method.

  • Matching selectors.
  • Matching rules.
  • Execute the plugin.
@Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        // plugin name        String pluginName = named();        // plugin data        PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);        if (pluginData != null && pluginData.getEnabled()) {            // selector data            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);            if (CollectionUtils.isEmpty(selectors)) {                return handleSelectorIfNull(pluginName, exchange, chain);            }            // match selector            SelectorData selectorData = matchSelector(exchange, selectors);            if (Objects.isNull(selectorData)) {                return handleSelectorIfNull(pluginName, exchange, chain);            }            selectorLog(selectorData, pluginName);            // rule data            List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());            if (CollectionUtils.isEmpty(rules)) {                return handleRuleIfNull(pluginName, exchange, chain);            }            // match rule            RuleData rule;            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {                //get last                rule = rules.get(rules.size() - 1);            } else {                rule = matchRule(exchange, rules);            }            if (Objects.isNull(rule)) {                return handleRuleIfNull(pluginName, exchange, chain);            }            ruleLog(rule, pluginName);            // execute            return doExecute(exchange, chain, selectorData, rule);        }        return chain.execute(exchange);    }

2.3 Execute GlobalPlugin#

  • org.apache.shenyu.plugin.global.GlobalPlugin#execute()

GlobalPlugin is a global plugin that constructs contextual information in the execute() method.

public class GlobalPlugin implements ShenyuPlugin {    // shenyu context    private final ShenyuContextBuilder builder;        //......        @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {       // build context information to be passed into the exchange        ShenyuContext shenyuContext = builder.build(exchange);        exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);        return chain.execute(exchange);    }        //......}
  • org.apache.shenyu.plugin.global.DefaultShenyuContextBuilder#build()

Build the default context information.

public class DefaultShenyuContextBuilder implements ShenyuContextBuilder {    //......        @Override    public ShenyuContext build(final ServerWebExchange exchange) {        //build data        Pair<String, MetaData> buildData = buildData(exchange);        //wrap ShenyuContext        return decoratorMap.get(buildData.getLeft()).decorator(buildDefaultContext(exchange.getRequest()), buildData.getRight());    }        private Pair<String, MetaData> buildData(final ServerWebExchange exchange) {        //......        //get the metadata according to the requested uri        MetaData metaData = MetaDataCache.getInstance().obtain(request.getURI().getPath());        if (Objects.nonNull(metaData) && Boolean.TRUE.equals(metaData.getEnabled())) {            exchange.getAttributes().put(Constants.META_DATA, metaData);            return Pair.of(metaData.getRpcType(), metaData);        } else {            return Pair.of(RpcTypeEnum.HTTP.getName(), new MetaData());        }    }    //set the default context information    private ShenyuContext buildDefaultContext(final ServerHttpRequest request) {        String appKey = request.getHeaders().getFirst(Constants.APP_KEY);        String sign = request.getHeaders().getFirst(Constants.SIGN);        String timestamp = request.getHeaders().getFirst(Constants.TIMESTAMP);        ShenyuContext shenyuContext = new ShenyuContext();        String path = request.getURI().getPath();        shenyuContext.setPath(path);         shenyuContext.setAppKey(appKey);        shenyuContext.setSign(sign);        shenyuContext.setTimestamp(timestamp);        shenyuContext.setStartDateTime(LocalDateTime.now());        Optional.ofNullable(request.getMethod()).ifPresent(httpMethod -> shenyuContext.setHttpMethod(httpMethod.name()));        return shenyuContext;    } }
  • org.apache.shenyu.plugin.dubbo.common.context.DubboShenyuContextDecorator#decorator()

wrap ShenyuContext:

public class DubboShenyuContextDecorator implements ShenyuContextDecorator {        @Override    public ShenyuContext decorator(final ShenyuContext shenyuContext, final MetaData metaData) {        shenyuContext.setModule(metaData.getAppName());        shenyuContext.setMethod(metaData.getServiceName());         shenyuContext.setContextPath(metaData.getContextPath());         shenyuContext.setRpcType(RpcTypeEnum.DUBBO.getName());         return shenyuContext;    }        @Override    public String rpcType() {        return RpcTypeEnum.DUBBO.getName();    }}

2.4 Execute RpcParamTransformPlugin#

The RpcParamTransformPlugin is responsible for reading the parameters from the http request, saving them in the exchange and passing them to the rpc service.

  • org.apache.shenyu.plugin.base.RpcParamTransformPlugin#execute()

In the execute() method, the core logic of the plugin is executed: get the request information from exchange and process the parameters according to the form of content passed in by the request.

public class RpcParamTransformPlugin implements ShenyuPlugin {
    @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        //get request information from exchange        ServerHttpRequest request = exchange.getRequest();        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        if (Objects.nonNull(shenyuContext)) {           // APPLICATION_JSON            MediaType mediaType = request.getHeaders().getContentType();            if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {                return body(exchange, request, chain);            }            // APPLICATION_FORM_URLENCODED            if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {                return formData(exchange, request, chain);            }            //query            return query(exchange, request, chain);        }        return chain.execute(exchange);    }        //APPLICATION_JSON    private Mono<Void> body(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {        return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())                .flatMap(body -> {                    exchange.getAttributes().put(Constants.PARAM_TRANSFORM, resolveBodyFromRequest(body));//解析body,保存到exchange中                    return chain.execute(exchange);                }));    }   // APPLICATION_FORM_URLENCODED    private Mono<Void> formData(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {        return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())                .flatMap(map -> {                    String param = resolveBodyFromRequest(map);                    LinkedMultiValueMap<String, String> linkedMultiValueMap;                    try {                        linkedMultiValueMap = BodyParamUtils.buildBodyParams(URLDecoder.decode(param, StandardCharsets.UTF_8.name())); //格式化数据                    } catch (UnsupportedEncodingException e) {                        return Mono.error(e);                    }                    exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.toMap(() -> linkedMultiValueMap));// 保存到exchange中                    return chain.execute(exchange);                }));    }    //query    private Mono<Void> query(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {        exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.ofString(() -> serverHttpRequest.getURI().getQuery()));//保存到exchange中        return chain.execute(exchange);    }    //...... }

2.5 Execute DubboPlugin#

  • org.apache.shenyu.plugin.dubbo.common.AbstractDubboPlugin#doExecute()

In the doExecute() method, the main purpose is to check the metadata and parameters.

public abstract class AbstractDubboPlugin extends AbstractShenyuPlugin {        @Override    public Mono<Void> doExecute(final ServerWebExchange exchange,                                   final ShenyuPluginChain chain,                                   final SelectorData selector,                                   final RuleData rule) {        //param        String param = exchange.getAttribute(Constants.PARAM_TRANSFORM);        //context        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        //metaData        MetaData metaData = exchange.getAttribute(Constants.META_DATA);        //check metaData        if (!checkMetaData(metaData)) {            LOG.error(" path is : {}, meta data have error : {}", shenyuContext.getPath(), metaData);            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.META_DATA_ERROR, null);            return WebFluxResultUtils.result(exchange, error);        }        //check        if (Objects.nonNull(metaData) && StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(param)) {            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM, null);            return WebFluxResultUtils.result(exchange, error);        }        //set rpcContext        this.rpcContext(exchange);        //dubbo invoke        return this.doDubboInvoker(exchange, chain, selector, rule, metaData, param);    }}
  • org.apache.shenyu.plugin.apache.dubbo.ApacheDubboPlugin#doDubboInvoker()

Set special context information in the doDubboInvoker() method, and then start the dubbo generalization call.

public class ApacheDubboPlugin extends AbstractDubboPlugin {        @Override    protected Mono<Void> doDubboInvoker(final ServerWebExchange exchange,                                        final ShenyuPluginChain chain,                                        final SelectorData selector,                                        final RuleData rule,                                        final MetaData metaData,                                        final String param) {        //set the current selector and rule information, and request address for dubbo graying support        RpcContext.getContext().setAttachment(Constants.DUBBO_SELECTOR_ID, selector.getId());        RpcContext.getContext().setAttachment(Constants.DUBBO_RULE_ID, rule.getId());        RpcContext.getContext().setAttachment(Constants.DUBBO_REMOTE_ADDRESS, Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress());        //dubbo generic invoker        final Mono<Object> result = dubboProxyService.genericInvoker(param, metaData, exchange);        //execute next plugin in chain        return result.then(chain.execute(exchange));    }}
  • org.apache.shenyu.plugin.apache.dubbo.proxy.ApacheDubboProxyService#genericInvoker()

genericInvoker() method.

  • Gets the ReferenceConfig object.
  • Gets the generalization service GenericService object.
  • Constructs the request parameter pair object.
  • Initiates an asynchronous generalization invocation.
public class ApacheDubboProxyService {    //...... 
    /**     * Generic invoker object.     */    public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws ShenyuException {        //1.Get the ReferenceConfig object        ReferenceConfig<GenericService> reference = ApacheDubboConfigCache.getInstance().get(metaData.getPath());
        if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {            //Failure of the current cache information            ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());            //Reinitialization with metadata            reference = ApacheDubboConfigCache.getInstance().initRef(metaData);        }        //2.Get the GenericService object of the generalization service        GenericService genericService = reference.get();        //3.Constructing the request parameter pair object        Pair<String[], Object[]> pair;        if (StringUtils.isBlank(metaData.getParameterTypes()) || ParamCheckUtils.dubboBodyIsEmpty(body)) {            pair = new ImmutablePair<>(new String[]{}, new Object[]{});        } else {            pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());        }        //4.Initiating asynchronous generalization calls        return Mono.fromFuture(invokeAsync(genericService, metaData.getMethodName(), pair.getLeft(), pair.getRight()).thenApply(ret -> {            //handle result            if (Objects.isNull(ret)) {                ret = Constants.DUBBO_RPC_RESULT_EMPTY;            }            exchange.getAttributes().put(Constants.RPC_RESULT, ret);            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());            return ret;        })).onErrorMap(exception -> exception instanceof GenericException ? new ShenyuException(((GenericException) exception).getExceptionMessage()) : new ShenyuException(exception));//处理异常    }        //Generalized calls, asynchronous operations    private CompletableFuture<Object> invokeAsync(final GenericService genericService, final String method, final String[] parameterTypes, final Object[] args) throws GenericException {        genericService.$invoke(method, parameterTypes, args);        Object resultFromFuture = RpcContext.getContext().getFuture();        return resultFromFuture instanceof CompletableFuture ? (CompletableFuture<Object>) resultFromFuture : CompletableFuture.completedFuture(resultFromFuture);    }}

Calling the dubbo service at the gateway can be achieved by generalizing the call.

The ReferenceConfig object is the key object to support generalization calls , and its initialization operation is done during data synchronization. There are two parts of data involved here, one is the synchronized plugin handler information and the other is the synchronized plugin metadata information.

  • org.apache.shenyu.plugin.dubbo.common.handler.AbstractDubboPluginDataHandler#handlerPlugin()

When the plugin data is updated, the data synchronization module synchronizes the data from shenyu-admin to the gateway. The initialization operation is performed in handlerPlugin().

public abstract class AbstractDubboPluginDataHandler implements PluginDataHandler {    //......        //Initializing the configuration cache   protected abstract void initConfigCache(DubboRegisterConfig dubboRegisterConfig);
    @Override    public void handlerPlugin(final PluginData pluginData) {        if (Objects.nonNull(pluginData) && Boolean.TRUE.equals(pluginData.getEnabled())) {            //Data deserialization            DubboRegisterConfig dubboRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class);            DubboRegisterConfig exist = Singleton.INST.get(DubboRegisterConfig.class);            if (Objects.isNull(dubboRegisterConfig)) {                return;            }            if (Objects.isNull(exist) || !dubboRegisterConfig.equals(exist)) {                // Perform initialization operations                this.initConfigCache(dubboRegisterConfig);            }            Singleton.INST.single(DubboRegisterConfig.class, dubboRegisterConfig);        }    }    //......}
  • org.apache.shenyu.plugin.apache.dubbo.handler.ApacheDubboPluginDataHandler#initConfigCache()

Perform initialization operations.

public class ApacheDubboPluginDataHandler extends AbstractDubboPluginDataHandler {
    @Override    protected void initConfigCache(final DubboRegisterConfig dubboRegisterConfig) {        //perform initialization operations        ApacheDubboConfigCache.getInstance().init(dubboRegisterConfig);        //cached results before failure        ApacheDubboConfigCache.getInstance().invalidateAll();    }}
  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#init()

In the initialization, set registryConfig and consumerConfig.

public final class ApacheDubboConfigCache extends DubboConfigCache {    //......     /**     * init     */    public void init(final DubboRegisterConfig dubboRegisterConfig) {        //ApplicationConfig        if (Objects.isNull(applicationConfig)) {            applicationConfig = new ApplicationConfig("shenyu_proxy");        }        //When the protocol or address changes, you need to update the registryConfig        if (needUpdateRegistryConfig(dubboRegisterConfig)) {            RegistryConfig registryConfigTemp = new RegistryConfig();            registryConfigTemp.setProtocol(dubboRegisterConfig.getProtocol());            registryConfigTemp.setId("shenyu_proxy");            registryConfigTemp.setRegister(false);            registryConfigTemp.setAddress(dubboRegisterConfig.getRegister());            Optional.ofNullable(dubboRegisterConfig.getGroup()).ifPresent(registryConfigTemp::setGroup);            registryConfig = registryConfigTemp;        }        //ConsumerConfig        if (Objects.isNull(consumerConfig)) {            consumerConfig = ApplicationModel.getConfigManager().getDefaultConsumer().orElseGet(() -> {                ConsumerConfig consumerConfig = new ConsumerConfig();                consumerConfig.refresh();                return consumerConfig;            });                       //ConsumerConfig            Optional.ofNullable(dubboRegisterConfig.getThreadpool()).ifPresent(consumerConfig::setThreadpool);             Optional.ofNullable(dubboRegisterConfig.getCorethreads()).ifPresent(consumerConfig::setCorethreads);            Optional.ofNullable(dubboRegisterConfig.getThreads()).ifPresent(consumerConfig::setThreads);            Optional.ofNullable(dubboRegisterConfig.getQueues()).ifPresent(consumerConfig::setQueues);        }    }        //Does the registration configuration need to be updated    private boolean needUpdateRegistryConfig(final DubboRegisterConfig dubboRegisterConfig) {        if (Objects.isNull(registryConfig)) {            return true;        }        return !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol())                || !Objects.equals(dubboRegisterConfig.getRegister(), registryConfig.getAddress())                || !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol());    }
    //......}
  • org.apache.shenyu.plugin.apache.dubbo.subscriber.ApacheDubboMetaDataSubscriber#onSubscribe()

When the metadata is updated, the data synchronization module synchronizes the data from shenyu-admin to the gateway. The metadata update operation is performed in the onSubscribe() method.

public class ApacheDubboMetaDataSubscriber implements MetaDataSubscriber {    //local memory cache    private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap();
    //update metaData    public void onSubscribe(final MetaData metaData) {        // dubbo        if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {            //Whether the corresponding metadata exists            MetaData exist = META_DATA.get(metaData.getPath());            if (Objects.isNull(exist) || Objects.isNull(ApacheDubboConfigCache.getInstance().get(metaData.getPath()))) {                // initRef                ApacheDubboConfigCache.getInstance().initRef(metaData);            } else {                // The corresponding metadata has undergone an update operation                if (!Objects.equals(metaData.getServiceName(), exist.getServiceName())                        || !Objects.equals(metaData.getRpcExt(), exist.getRpcExt())                        || !Objects.equals(metaData.getParameterTypes(), exist.getParameterTypes())                        || !Objects.equals(metaData.getMethodName(), exist.getMethodName())) {                    //Build ReferenceConfig again based on the latest metadata                    ApacheDubboConfigCache.getInstance().build(metaData);                }            }            //local memory cache            META_DATA.put(metaData.getPath(), metaData);        }    }
    //dalete    public void unSubscribe(final MetaData metaData) {        if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {            //使ReferenceConfig失效            ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());            META_DATA.remove(metaData.getPath());        }    }}
  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#initRef()

Build ReferenceConfig objects from metaData.

public final class ApacheDubboConfigCache extends DubboConfigCache {    //......        public ReferenceConfig<GenericService> initRef(final MetaData metaData) {            try {                //First try to get it from the cache, and return it directly if it exists                ReferenceConfig<GenericService> referenceConfig = cache.get(metaData.getPath());                if (StringUtils.isNoneBlank(referenceConfig.getInterface())) {                    return referenceConfig;                }            } catch (ExecutionException e) {                LOG.error("init dubbo ref exception", e);            }                      //build if not exist            return build(metaData);        }
        /**         * Build reference config.         */        @SuppressWarnings("deprecation")        public ReferenceConfig<GenericService> build(final MetaData metaData) {            if (Objects.isNull(applicationConfig) || Objects.isNull(registryConfig)) {                return new ReferenceConfig<>();            }            ReferenceConfig<GenericService> reference = new ReferenceConfig<>(); //ReferenceConfig            reference.setGeneric("true"); //generic invoke            reference.setAsync(true);//async
            reference.setApplication(applicationConfig);//applicationConfig            reference.setRegistry(registryConfig);//registryConfig            reference.setConsumer(consumerConfig);//consumerConfig            reference.setInterface(metaData.getServiceName());//serviceName            reference.setProtocol("dubbo");//dubbo            reference.setCheck(false);             reference.setLoadbalance("gray");//gray
            Map<String, String> parameters = new HashMap<>(2);            parameters.put("dispatcher", "direct");            reference.setParameters(parameters);
            String rpcExt = metaData.getRpcExt();//rpc ext param            DubboParam dubboParam = parserToDubboParam(rpcExt);            if (Objects.nonNull(dubboParam)) {                if (StringUtils.isNoneBlank(dubboParam.getVersion())) {                    reference.setVersion(dubboParam.getVersion());//version                }                if (StringUtils.isNoneBlank(dubboParam.getGroup())) {                    reference.setGroup(dubboParam.getGroup());//group                }                if (StringUtils.isNoneBlank(dubboParam.getUrl())) {                    reference.setUrl(dubboParam.getUrl());//url                }                if (StringUtils.isNoneBlank(dubboParam.getCluster())) {                    reference.setCluster(dubboParam.getCluster());                }                Optional.ofNullable(dubboParam.getTimeout()).ifPresent(reference::setTimeout);//timeout                Optional.ofNullable(dubboParam.getRetries()).ifPresent(reference::setRetries);//retires                Optional.ofNullable(dubboParam.getSent()).ifPresent(reference::setSent);//Whether to ack async-sent            }            try {                //get GenericService                Object obj = reference.get();                if (Objects.nonNull(obj)) {                    LOG.info("init apache dubbo reference success there meteData is :{}", metaData);                    //cache reference                    cache.put(metaData.getPath(), reference);                }            } catch (Exception e) {                LOG.error("init apache dubbo reference exception", e);            }            return reference;        }    //......    }

2.6 Execute ResponsePlugin#

  • org.apache.shenyu.plugin.response.ResponsePlugin#execute()

The response results are handled by the ResponsePlugin plugin.

    @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        // handle results according to rpc type        return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);    }

The processing type is determined by MessageWriter and the class inheritance relationship is as follows.

  • MessageWriter: interface, defining message processing methods.
  • NettyClientMessageWriter: processing of Netty call results.
  • RPCMessageWriter: processing the results of RPC calls.
  • WebClientMessageWriter: processing the results of WebClient calls.

Dubbo service call, the processing result is RPCMessageWriter of course.

  • org.apache.shenyu.plugin.response.strategy.RPCMessageWriter#writeWith()

Process the response results in the writeWith() method.


public class RPCMessageWriter implements MessageWriter {
    @Override    public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        return chain.execute(exchange).then(Mono.defer(() -> {            Object result = exchange.getAttribute(Constants.RPC_RESULT); //result            if (Objects.isNull(result)) {                 Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);                return WebFluxResultUtils.result(exchange, error);            }            return WebFluxResultUtils.result(exchange, result);        }));    }}

At this point in the analysis, the source code analysis of the Dubbo plugin is complete, and the analysis flow chart is as follows.

3. Summary#

The source code analysis in this article starts from Dubbo service registration to Dubbo plug-in service calls. The Dubbo plugin is mainly used to handle the conversion of http requests to the dubbo protocol, and the main logic is implemented through generalized calls.