Skip to main content

One post tagged with "plugin"

View All Tags

扩展插件加载逻辑

· One min read
hql0312 Coder

本文基于shenyu-2.6.1版本进行源码分析.

正文

Shenyu 提供了一种机制来定制自己的插件或是修改已有的插件,在其内部通过extPlugin的配置实现,其需要满足以下两点:

  1. 实现接口 ShenyuPlugin 或是 PluginDataHandler
  2. 将实现的包打包后,放置于shenyu.extPlugin.path对应的路径下

入口#

真正实现该逻辑的类是ShenyuLoaderService,接下来看下该类是如何处理

    public ShenyuLoaderService(final ShenyuWebHandler webHandler, final CommonPluginDataSubscriber subscriber, final ShenyuConfig shenyuConfig) {        // 插件信息的信息订阅        this.subscriber = subscriber;        // Shenyu封装的WebHandler,包含了所有的插件逻辑        this.webHandler = webHandler;        // 配置信息        this.shenyuConfig = shenyuConfig;        // 扩展插件的配置信息,如路径,是否启用、开启多少线程来处理、检查加载的频率等信息        ExtPlugin config = shenyuConfig.getExtPlugin();        // 如果启用的,则创建定时任务来检查并加载        if (config.getEnabled()) {            // 创建一个指定线程名称的定时任务            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(config.getThreads(), ShenyuThreadFactory.create("plugin-ext-loader", true));            // 创建固定频率执行的任务,默认在30s,每300s,执行一次            executor.scheduleAtFixedRate(() -> loadExtOrUploadPlugins(null), config.getScheduleDelay(), config.getScheduleTime(), TimeUnit.SECONDS);        }    }    

该类有以下几个属性:

webHandler: 该类是shenyu 处理请求的入口,引用了所有的插件数据,在扩展插件加载后,需要进行更新

subscriber: 该类是插件的订阅的入口,引用了所有插件的订阅处理类,在扩展配置加载后,也需要进行同步更新

executor: 在ShenyuLoaderService内部会创建一个定时任务,来定时扫描加载指定路径下的jar包,便于加载扩展的插件,实现动态发现 默认会在启动30秒后,每300秒扫描一次

同时这里可以通过 shenyu.extPlugin.enabled配置来决定是否要开启扩展插件功能的启用

以上的配置可以在配置文件中进行调整:

shenyu:  extPlugin:    path:   # 扩展插件的存储目录    enabled: true # 是否启用扩展功能    threads: 1 # 扫描加载的线程数    scheduleTime: 300 # 任务执行的频率    scheduleDelay: 30 # 任务启动后多久开始执行

接下来看下加载的逻辑:

   public void loadExtOrUploadPlugins(final PluginData uploadedJarResource) {        try {            List<ShenyuLoaderResult> plugins = new ArrayList<>();            // 获取ShenyuPluginClassloader的持有对象            ShenyuPluginClassloaderHolder singleton = ShenyuPluginClassloaderHolder.getSingleton();            if (Objects.isNull(uploadedJarResource)) {                // 参数为空,则从扩展的目录,加载所有的jar包                // PluginJar:包含ShenyuPlugin接口、PluginDataHandler接口的数据                List<PluginJarParser.PluginJar> uploadPluginJars = ShenyuExtPathPluginJarLoader.loadExtendPlugins(shenyuConfig.getExtPlugin().getPath());                // 遍历所有的待加载插件                for (PluginJarParser.PluginJar extPath : uploadPluginJars) {                    LOG.info("shenyu extPlugin find new {} to load", extPath.getAbsolutePath());                    // 使用扩展插件的加载器来加载指定的插件,便于后续的加载和卸载                    ShenyuPluginClassLoader extPathClassLoader = singleton.createPluginClassLoader(extPath);                    // 使用ShenyuPluginClassLoader 进行加载                    // 主要逻辑是:判断是否实现ShenyuPlugin接口、PluginDataHandler接口 或是否标识 @Component\@Service等注解,如果有,则注册为SpringBean                    // 构造 ShenyuLoaderResult对象                    plugins.addAll(extPathClassLoader.loadUploadedJarPlugins());                }            } else {                // 加载指定jar,逻辑同加载全部                PluginJarParser.PluginJar pluginJar = PluginJarParser.parseJar(Base64.getDecoder().decode(uploadedJarResource.getPluginJar()));                LOG.info("shenyu upload plugin jar find new {} to load", pluginJar.getJarKey());                ShenyuPluginClassLoader uploadPluginClassLoader = singleton.createPluginClassLoader(pluginJar);                plugins.addAll(uploadPluginClassLoader.loadUploadedJarPlugins());            }            // 将扩展的插件,加入到ShenyuWebHandler的插件列表,后续的请求则会经过加入的插件内容            loaderPlugins(plugins);        } catch (Exception e) {            LOG.error("shenyu plugins load has error ", e);        }    }

该方法处理的逻辑:

  1. 判断参数uploadedJarResource是否有值,如果没有,则会加载全部,否则加载指定资源jar包进行处理
  2. shenyu.extPlugin.path 中获取到指定jar包,并封装成 PluginJar对象,该对象包含了jar包以下信息
    • version: 版本信息
    • groupId:包的groupId
    • artifactId: 包的 artifactId
    • absolutePath: 绝对路径
    • clazzMap:class对应的字节码
    • resourceMap:jar包的字节码
  3. 通过ShenyuPluginClassloaderHolder创建对应的ClassLoader,对应的类是ShenyuPluginClassLoader, 并进行加载对应的类
    • 调用ShenyuPluginClassLoader.loadUploadedJarPlugins 加载对应的类并注册成Spring Bean,这样可以使用Spring容器来管理
  4. 调用loaderPlugins方法,将扩展的插件更新到 webHandler 以及 subscriber

插件注册#

对于提供的jar包里的内容,加载器只会处理指定接口类型的类,实现逻辑在 ShenyuPluginClassLoader.loadUploadedJarPlugins() 方法

public List<ShenyuLoaderResult> loadUploadedJarPlugins() {        List<ShenyuLoaderResult> results = new ArrayList<>();        // 所有的类映射关系        Set<String> names = pluginJar.getClazzMap().keySet();        // 遍历所有的类        names.forEach(className -> {            Object instance;            try {                // 尝试创建对象,如果可以,则加入到Spring容器中                instance = getOrCreateSpringBean(className);                if (Objects.nonNull(instance)) {                    // 构建ShenyuLoaderResult对象                    results.add(buildResult(instance));                    LOG.info("The class successfully loaded into a upload-Jar-plugin {} is registered as a spring bean", className);                }            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {                LOG.warn("Registering upload-Jar-plugins succeeds spring bean fails:{}", className, e);            }        });        return results;    }

该方法就是负责构建所有符合条件的对象,并封装成 ShenyuLoaderResult对象,该对象对于创建后对象,进行了封装,会在方法 buildResult()中进行处理

    private ShenyuLoaderResult buildResult(final Object instance) {        ShenyuLoaderResult result = new ShenyuLoaderResult();        // 创建的对象是否实现了ShenyuPlugin        if (instance instanceof ShenyuPlugin) {            result.setShenyuPlugin((ShenyuPlugin) instance);            // 创建的对象是否实现了PluginDataHandler        } else if (instance instanceof PluginDataHandler) {            result.setPluginDataHandler((PluginDataHandler) instance);        }        return result;    }

同时进入方法 getOrCreateSpringBean() 进一步分析

    private <T> T getOrCreateSpringBean(final String className) throws ClassNotFoundException, IllegalAccessException, InstantiationException {        // 确认是否已经注册过了,如果有则不处理,直接返回        if (SpringBeanUtils.getInstance().existBean(className)) {            return SpringBeanUtils.getInstance().getBeanByClassName(className);        }        lock.lock();        try {            // Double check,            T inst = SpringBeanUtils.getInstance().getBeanByClassName(className);            if (Objects.isNull(inst)) {                // 使用 ShenyuPluginClassLoader 进行加载类                Class<?> clazz = Class.forName(className, false, this);                //Exclude ShenyuPlugin subclass and PluginDataHandler subclass                // without adding @Component @Service annotation                // 确认是否是 ShenyuPlugin 或是 PluginDataHandler的子类                boolean next = ShenyuPlugin.class.isAssignableFrom(clazz)                        || PluginDataHandler.class.isAssignableFrom(clazz);                if (!next) {                    // 如果不是,确认是否标识了 @Component 与 @Service 注解                    Annotation[] annotations = clazz.getAnnotations();                    next = Arrays.stream(annotations).anyMatch(e -> e.annotationType().equals(Component.class)                            || e.annotationType().equals(Service.class));                }                if (next) {                    // 如果符合以上内容,则注册Bean                    GenericBeanDefinition beanDefinition = new GenericBeanDefinition();                    beanDefinition.setBeanClassName(className);                    beanDefinition.setAutowireCandidate(true);                    beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);                    // 注册bean                    String beanName = SpringBeanUtils.getInstance().registerBean(beanDefinition, this);                    // 创建对象                    inst = SpringBeanUtils.getInstance().getBeanByClassName(beanName);                }            }            return inst;        } finally {            lock.unlock();        }    }

逻辑大致如下:

  1. 判断是否实现了接口 ShenyuPluginPluginDataHandler, 如果没有,则是否标识了 @Component 或是 @Service
  2. 如果符合1的条件,则将该对象注册到Spring 容器,并返回创建的对象

同步#

在插件注册成功后,这时只是实例化了插件,但它还不会生效,因为它还未添加到 Shenyu的插件链中,同步逻辑由 loaderPlugins()方法实现

    private void loaderPlugins(final List<ShenyuLoaderResult> results) {        if (CollectionUtils.isEmpty(results)) {            return;        }        // 获取所有实现了接口ShenyuPlugin的对象        List<ShenyuPlugin> shenyuExtendPlugins = results.stream().map(ShenyuLoaderResult::getShenyuPlugin).filter(Objects::nonNull).collect(Collectors.toList());        // 同步更新webHandler中plugins        webHandler.putExtPlugins(shenyuExtendPlugins);        // 获取所有实现了接口PluginDataHandler的对象        List<PluginDataHandler> handlers = results.stream().map(ShenyuLoaderResult::getPluginDataHandler).filter(Objects::nonNull).collect(Collectors.toList());        // 同步扩展的PluginDataHandler        subscriber.putExtendPluginDataHandler(handlers);
    }

该方法的逻辑处理了两个数据:

  1. 将实现了 ShenyuPlugin 接口的数据,同步至 webHandler的plugins 列表
    public void putExtPlugins(final List<ShenyuPlugin> extPlugins) {        if (CollectionUtils.isEmpty(extPlugins)) {            return;        }        // 过滤出新增的插件        final List<ShenyuPlugin> shenyuAddPlugins = extPlugins.stream()                .filter(e -> plugins.stream().noneMatch(plugin -> plugin.named().equals(e.named())))                .collect(Collectors.toList());        // 过滤出更新的插件,以名称和旧的相同来判断,则为更新        final List<ShenyuPlugin> shenyuUpdatePlugins = extPlugins.stream()                .filter(e -> plugins.stream().anyMatch(plugin -> plugin.named().equals(e.named())))                .collect(Collectors.toList());        // 如果没有数据,则跳过        if (CollectionUtils.isEmpty(shenyuAddPlugins) && CollectionUtils.isEmpty(shenyuUpdatePlugins)) {            return;        }        // 复制旧的数据        // copy new list        List<ShenyuPlugin> newPluginList = new ArrayList<>(plugins);        // 添加新的插件数据        // Add extend plugin from pluginData or shenyu ext-lib        this.sourcePlugins.addAll(shenyuAddPlugins);        // 添加新数据        if (CollectionUtils.isNotEmpty(shenyuAddPlugins)) {            shenyuAddPlugins.forEach(plugin -> LOG.info("shenyu auto add extends plugins:{}", plugin.named()));            newPluginList.addAll(shenyuAddPlugins);        }        // 修改更新的数据        if (CollectionUtils.isNotEmpty(shenyuUpdatePlugins)) {            shenyuUpdatePlugins.forEach(plugin -> LOG.info("shenyu auto update extends plugins:{}", plugin.named()));            for (ShenyuPlugin updatePlugin : shenyuUpdatePlugins) {                for (int i = 0; i < newPluginList.size(); i++) {                    if (newPluginList.get(i).named().equals(updatePlugin.named())) {                        newPluginList.set(i, updatePlugin);                    }                }                for (int i = 0; i < this.sourcePlugins.size(); i++) {                    if (this.sourcePlugins.get(i).named().equals(updatePlugin.named())) {                        this.sourcePlugins.set(i, updatePlugin);                    }                }            }        }        // 重新排序        plugins = sortPlugins(newPluginList);    }
  1. 将实现了 PluginDataHandler 接口的数据,同步至 subscriber 的handlers 列表
    public void putExtendPluginDataHandler(final List<PluginDataHandler> handlers) {        if (CollectionUtils.isEmpty(handlers)) {            return;        }        // 遍历所有数据        for (PluginDataHandler handler : handlers) {            String pluginNamed = handler.pluginNamed();            // 更新现有的PluginDataHandler列表            MapUtils.computeIfAbsent(handlerMap, pluginNamed, name -> {                LOG.info("shenyu auto add extends plugin data handler name is :{}", pluginNamed);                return handler;            });        }    }

至此,扩展插件的加载过程分析结束。

Dubbo插件源码分析

· One min read
Apache ShenYu Committer

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

Apache ShenYu 网关使用 dubbo 插件完成对 dubbo服务的调用。你可以查看官方文档 Dubbo快速开始 了解如何使用该插件。

本文基于shenyu-2.4.3版本进行源码分析,官网的介绍请参考 Dubbo服务接入

1. 服务注册#

以官网提供的例子为例 shenyu-examples-dubbo 。 假如你的dubbo服务定义如下(spring-dubbo.xml):

<beans xmlns="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"       xsi:schemaLocation="http://www.springframework.org/schema/beans       http://www.springframework.org/schema/beans/spring-beans.xsd       http://code.alibabatech.com/schema/dubbo       https://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    <dubbo:application name="test-dubbo-service"/>    <dubbo:registry address="${dubbo.registry.address}"/>    <dubbo:protocol name="dubbo" port="20888"/>
    <dubbo:service timeout="10000" interface="org.apache.shenyu.examples.dubbo.api.service.DubboTestService" ref="dubboTestService"/>
</beans>

声明应用服务名称,注册中心地址,使用dubbo协议,声明服务接口,对应接口实现类:

/** * DubboTestServiceImpl. */@Service("dubboTestService")public class DubboTestServiceImpl implements DubboTestService {        @Override    @ShenyuDubboClient(path = "/findById", desc = "Query by Id")    public DubboTest findById(final String id) {        return new DubboTest(id, "hello world shenyu Apache, findById");    }
    //......}

在接口实现类中,使用注解@ShenyuDubboClientshenyu-admin注册服务。该注解的作用及原理,稍后再进行分析。

在配置文件application.yml中的配置信息:

server:  port: 8011  address: 0.0.0.0  servlet:    context-path: /spring:  main:    allow-bean-definition-overriding: truedubbo:  registry:    address: zookeeper://localhost:2181  # dubbo使用的注册中心    shenyu:  register:    registerType: http #注册方式    serverLists: http://localhost:9095 #注册地址    props:      username: admin       password: 123456  client:    dubbo:      props:        contextPath: /dubbo          appName: dubbo

在配置文件中,声明dubbo使用的注册中心地址,dubbo服务向shenyu-admin注册,使用的方式是http,注册地址是http://localhost:9095

关于注册方式的使用,请参考 应用客户端接入

1.1 声明注册接口#

使用注解@ShenyuDubboClient将服务注册到网关。简单demo如下:

// dubbo服务@Service("dubboTestService")public class DubboTestServiceImpl implements DubboTestService {        @Override    @ShenyuDubboClient(path = "/findById", desc = "Query by Id") // 需要注册的方法    public DubboTest findById(final String id) {        return new DubboTest(id, "hello world shenyu Apache, findById");    }
    //......}

注解定义:

/** * 作用于类和方法上 */@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE, ElementType.METHOD})@Inheritedpublic @interface ShenyuDubboClient {        //注册路径    String path();        //规则名称    String ruleName() default "";       //描述信息    String desc() default "";
    //是否启用    boolean enabled() default true;}

1.2 扫描注解信息#

注解扫描通过ApacheDubboServiceBeanListener完成,它实现了ApplicationListener<ContextRefreshedEvent>接口,在Spring容器启动过程中,发生上下文刷新事件时,开始执行事件处理方法onApplicationEvent()

在构造器实例化的过程中:

  • 读取属性配置
  • 开启线程池
  • 启动注册中心,用于向shenyu-admin注册
public class ApacheDubboServiceBeanListener implements ApplicationListener<ContextRefreshedEvent> {
    // ......
    //构造器    public ApacheDubboServiceBeanListener(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {        //1.读取属性配置        Properties props = clientConfig.getProps();        String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);        String appName = props.getProperty(ShenyuClientConstants.APP_NAME);        if (StringUtils.isBlank(contextPath)) {            throw new ShenyuClientIllegalArgumentException("apache dubbo client must config the contextPath or appName");        }        this.contextPath = contextPath;        this.appName = appName;        this.host = props.getProperty(ShenyuClientConstants.HOST);        this.port = props.getProperty(ShenyuClientConstants.PORT);        //2.开启线程池        executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("shenyu-apache-dubbo-client-thread-pool-%d").build());        //3.启动注册中心        publisher.start(shenyuClientRegisterRepository);    }
    /**     * 上下文刷新事件,执行方法逻辑     */    @Override    public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {        //......    }
  • ApacheDubboServiceBeanListener#onApplicationEvent()

重写的方法逻辑:读取Dubbo服务ServiceBean,构建元数据对象和URI对象,并向shenyu-admin注册。

    @Override    public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {        //读取ServiceBean        Map<String, ServiceBean> serviceBean = contextRefreshedEvent.getApplicationContext().getBeansOfType(ServiceBean.class);        if (serviceBean.isEmpty()) {            return;        }        //保证该方法只执行一次        if (!registered.compareAndSet(false, true)) {            return;        }        //处理元数据对象        for (Map.Entry<String, ServiceBean> entry : serviceBean.entrySet()) {            handler(entry.getValue());        }        //处理URI对象        serviceBean.values().stream().findFirst().ifPresent(bean -> {            publisher.publishEvent(buildURIRegisterDTO(bean));        });    }
  • handler()

    handler()方法中,从serviceBean中读取所有方法,判断方法上是否有ShenyuDubboClient注解,如果存在就构建元数据对象,并通过注册中心,向shenyu-admin注册该方法。

    private void handler(final ServiceBean<?> serviceBean) {        //获取代理对象        Object refProxy = serviceBean.getRef();        //获取class信息        Class<?> clazz = refProxy.getClass();        if (AopUtils.isAopProxy(refProxy)) {            clazz = AopUtils.getTargetClass(refProxy);        }        //获取所有方法        Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);        for (Method method : methods) {            //读取ShenyuDubboClient注解信息            ShenyuDubboClient shenyuDubboClient = method.getAnnotation(ShenyuDubboClient.class);            if (Objects.nonNull(shenyuDubboClient)) {                //构建元数据对象,并注册                publisher.publishEvent(buildMetaDataDTO(serviceBean, shenyuDubboClient, method));            }        }    }
  • buildMetaDataDTO()

    构建元数据对象,在这里构建方法注册的必要信息,后续用于选择器或规则匹配。

    private MetaDataRegisterDTO buildMetaDataDTO(final ServiceBean<?> serviceBean, final ShenyuDubboClient shenyuDubboClient, final Method method) {        //应用名称        String appName = buildAppName(serviceBean);        //方法路径        String path = contextPath + shenyuDubboClient.path();        //描述信息        String desc = shenyuDubboClient.desc();        //服务名称        String serviceName = serviceBean.getInterface();        //规则名称        String configRuleName = shenyuDubboClient.ruleName();        String ruleName = ("".equals(configRuleName)) ? path : configRuleName;        //方法名称        String methodName = method.getName();        //参数类型        Class<?>[] parameterTypesClazz = method.getParameterTypes();        String parameterTypes = Arrays.stream(parameterTypesClazz).map(Class::getName).collect(Collectors.joining(","));        return MetaDataRegisterDTO.builder()                .appName(appName)                .serviceName(serviceName)                .methodName(methodName)                .contextPath(contextPath)                .host(buildHost())                .port(buildPort(serviceBean))                .path(path)                .ruleName(ruleName)                .pathDesc(desc)                .parameterTypes(parameterTypes)                .rpcExt(buildRpcExt(serviceBean)) //dubbo服务的扩展信息                .rpcType(RpcTypeEnum.DUBBO.getName())                .enabled(shenyuDubboClient.enabled())                .build();    }
  • buildRpcExt()

    dubbo服务的扩展信息

       private String buildRpcExt(final ServiceBean serviceBean) {       DubboRpcExt build = DubboRpcExt.builder()               .group(StringUtils.isNotEmpty(serviceBean.getGroup()) ? serviceBean.getGroup() : "")//分组               .version(StringUtils.isNotEmpty(serviceBean.getVersion()) ? serviceBean.getVersion() : "")//版本               .loadbalance(StringUtils.isNotEmpty(serviceBean.getLoadbalance()) ? serviceBean.getLoadbalance() : Constants.DEFAULT_LOADBALANCE)//负载均衡策略,默认随机               .retries(Objects.isNull(serviceBean.getRetries()) ? Constants.DEFAULT_RETRIES : serviceBean.getRetries())//重试次数,默认2               .timeout(Objects.isNull(serviceBean.getTimeout()) ? Constants.DEFAULT_CONNECT_TIMEOUT : serviceBean.getTimeout())//超时,默认3000               .sent(Objects.isNull(serviceBean.getSent()) ? Constants.DEFAULT_SENT : serviceBean.getSent())//sent,默认false               .cluster(StringUtils.isNotEmpty(serviceBean.getCluster()) ? serviceBean.getCluster() : Constants.DEFAULT_CLUSTER)//集群策略,默认failover               .url("")               .build();       return GsonUtils.getInstance().toJson(build);   }
  • buildURIRegisterDTO()

    构建URI对象,注册服务本身的信息,后续可用于服务探活。

private URIRegisterDTO buildURIRegisterDTO(final ServiceBean serviceBean) {        return URIRegisterDTO.builder()                .contextPath(this.contextPath) //上下文路径                .appName(buildAppName(serviceBean))//应用名称                .rpcType(RpcTypeEnum.DUBBO.getName())//rpc类型:dubbo                .host(buildHost()) //host                .port(buildPort(serviceBean))//port                .build(); }

具体的注册逻辑由注册中心实现,请参考 客户端接入原理

//向注册中心,发布注册事件   publisher.publishEvent();

1.3 处理注册信息#

客户端通过注册中心注册的元数据和URI数据,在shenyu-admin端进行处理,负责存储到数据库和同步给shenyu网关。Dubbo插件的客户端注册处理逻辑在ShenyuClientRegisterDubboServiceImpl中。继承关系如下:

  • ShenyuClientRegisterService:客户端注册服务,顶层接口;
  • FallbackShenyuClientRegisterService:注册失败,提供重试操作;
  • AbstractShenyuClientRegisterServiceImpl:抽象类,实现部分公共注册逻辑;
  • ShenyuClientRegisterDubboServiceImpl:实现Dubbo插件的注册;
1.3.1 注册服务#
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()

    客户端通过注册中心注册的元数据MetaDataRegisterDTO对象在shenyu-adminregister()方法被接送到。

   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. 注册选择器        String selectorHandler = selectorHandler(dto);        String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);        //2. 注册规则        String ruleHandler = ruleHandler();        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        ruleService.registerDefault(ruleDTO);        //3. 注册元数据        registerMetadata(dto);        //4. 注册ContextPath        String contextPath = dto.getContextPath();        if (StringUtils.isNotEmpty(contextPath)) {            registerContextPath(dto);        }        return ShenyuResultMessage.SUCCESS;    }
1.3.1.1 注册选择器#
  • org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()

构建contextPath,查找选择器信息是否存在,如果存在就返回id;不存在就创建默认的选择器信息。

    @Override    public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {        // 构建contextPath        String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());        // 通过名称查找选择器信息是否存在        SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);        if (Objects.isNull(selectorDO)) {            // 不存在就创建默认的选择器信息            return registerSelector(contextPath, pluginName, selectorHandler);        }        return selectorDO.getId();    }
  • 默认选择器信息

    在这里构建默认选择器信息及其条件属性。

   //注册选择器   private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {        //构建选择器        SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());        selectorDTO.setHandle(selectorHandler);        //注册默认选择器        return registerDefault(selectorDTO);    }     //构建选择器    private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {        //构建默认选择器        SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);        selectorDTO.setPluginId(pluginId);         //构建默认选择器的条件属性        selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));        return selectorDTO;    }
  • 构建默认选择器
private SelectorDTO buildDefaultSelectorDTO(final String name) {    return SelectorDTO.builder()            .name(name) // 名称            .type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // 默认类型自定义            .matchMode(MatchModeEnum.AND.getCode()) //默认匹配方式 and            .enabled(Boolean.TRUE)  //默认启开启            .loged(Boolean.TRUE)  //默认记录日志            .continued(Boolean.TRUE) //默认继续后续选择器            .sort(1) //默认顺序1            .build();}
  • 构建默认选择器条件属性
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {    SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();    selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // 默认参数类型URI    selectorConditionDTO.setParamName("/");    selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // 默认匹配策略 match    selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // 默认值 /contextPath/**    return Collections.singletonList(selectorConditionDTO);}
  • 注册默认选择器
@Overridepublic String registerDefault(final SelectorDTO selectorDTO) {    //选择器信息    SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);    //选择器条件属性    List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();    if (StringUtils.isEmpty(selectorDTO.getId())) {        // 向数据库插入选择器信息        selectorMapper.insertSelective(selectorDO);          // 向数据库插入选择器条件属性        selectorConditionDTOs.forEach(selectorConditionDTO -> {            selectorConditionDTO.setSelectorId(selectorDO.getId());            selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));        });    }    // 发布同步事件,向网关同步选择信息及其条件属性    publishEvent(selectorDO, selectorConditionDTOs);    return selectorDO.getId();}
1.3.1.2 注册规则#

在注册服务的第二步中,开始构建默认规则,然后注册规则。

@Override    public String register(final MetaDataRegisterDTO dto) {        //1. 注册选择器        //......                //2. 注册规则        // 默认规则处理属性        String ruleHandler = ruleHandler();        // 构建默认规则信息        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        // 注册规则        ruleService.registerDefault(ruleDTO);                //3. 注册元数据        //......                //4. 注册ContextPath        //......                return ShenyuResultMessage.SUCCESS;    }
  • 默认规则处理属性
    @Override    protected String ruleHandler() {        // 默认规则处理属性        return new DubboRuleHandle().toJson();    }

Dubbo插件默认规则处理属性

public class DubboRuleHandle implements RuleHandle {
    /**     * dubbo服务版本信息.     */    private String version;
    /**     * 分组.     */    private String group;
    /**     * 重试次数.     */    private Integer retries = 0;
    /**     * 负载均衡策略:默认随机     */    private String loadbalance = LoadBalanceEnum.RANDOM.getName();
    /**     * 超时,默认3000     */    private long timeout = Constants.TIME_OUT;}
  • 构建默认规则信息
  // 构建默认规则信息    private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {        return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());    }   //  构建默认规则信息    private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {        RuleDTO ruleDTO = RuleDTO.builder()                .selectorId(selectorId) //关联的选择器id                .name(ruleName) //规则名称                .matchMode(MatchModeEnum.AND.getCode()) // 默认匹配模式 and                .enabled(Boolean.TRUE) // 默认开启                .loged(Boolean.TRUE) //默认记录日志                .sort(1) //默认顺序 1                .handle(ruleHandler)                .build();        RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()                .paramType(ParamTypeEnum.URI.getName()) // 默认参数类型URI                .paramName("/")                .paramValue(path) //参数值path                .build();        if (path.indexOf("*") > 1) {            ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); //如果path中有*,操作类型则默认为 match        } else {            ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); // 否则,默认操作类型 =         }        ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));        return ruleDTO;    }
  • org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()

注册规则:向数据库插入记录,并向网关发布事件,进行数据同步。


    @Override    public String registerDefault(final RuleDTO ruleDTO) {        RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());        if (Objects.nonNull(exist)) {            return "";        }
        RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);        List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();        if (StringUtils.isEmpty(ruleDTO.getId())) {            // 向数据库插入规则信息            ruleMapper.insertSelective(ruleDO);            //向数据库插入规则体条件属性            ruleConditions.forEach(ruleConditionDTO -> {                ruleConditionDTO.setRuleId(ruleDO.getId());                ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));            });        }        // 向网关发布事件,进行数据同步        publishEvent(ruleDO, ruleConditions);        return ruleDO.getId();    }
1.3.1.3 注册元数据#

元数据主要用于RPC服务的调用。

   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. 注册选择器        //......                //2. 注册规则        //......                //3. 注册元数据        registerMetadata(dto);                //4. 注册ContextPath        //......                return ShenyuResultMessage.SUCCESS;    }
  • org.apache.shenyu.admin.service.register.ShenyuClientRegisterDubboServiceImpl#registerMetadata()

    插入或更新元数据,然后发布同步事件到网关。

    @Override    protected void registerMetadata(final MetaDataRegisterDTO dto) {            // 获取metaDataService            MetaDataService metaDataService = getMetaDataService();            // 元数据是否存在            MetaDataDO exist = metaDataService.findByPath(dto.getPath());            // 插入或更新元数据            metaDataService.saveOrUpdateMetaData(exist, dto);    }
    @Override    public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {        DataEventTypeEnum eventType;        // 数据类型转换 DTO->DO        MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);        // 插入数据        if (Objects.isNull(exist)) {            Timestamp currentTime = new Timestamp(System.currentTimeMillis());            metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());            metaDataDO.setDateCreated(currentTime);            metaDataDO.setDateUpdated(currentTime);            metaDataMapper.insert(metaDataDO);            eventType = DataEventTypeEnum.CREATE;        } else {            // 更新数据            metaDataDO.setId(exist.getId());            metaDataMapper.update(metaDataDO);            eventType = DataEventTypeEnum.UPDATE;        }        // 发布同步事件到网关        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,                Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));    }
1.3.2 注册URI#
  • org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()

服务端收到客户端注册的URI信息后,进行处理。

    @Override    public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {        String result;        String key = key(selectorName);        try {            this.removeFallBack(key);            // 注册URI            result = this.doRegisterURI(selectorName, uriList);            logger.info("Register success: {},{}", selectorName, uriList);        } catch (Exception ex) {            logger.warn("Register exception: cause:{}", ex.getMessage());            result = "";            // 注册失败后,进行重试            this.addFallback(key, new FallbackHolder(selectorName, uriList));        }        return result;    }
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()

从客户端注册的URI中获取有效的URI,更新对应的选择器handle属性,向网关发送选择器更新事件。

@Override    public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {        //参数检查        if (CollectionUtils.isEmpty(uriList)) {            return "";        }        //获取选择器信息        SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));        if (Objects.isNull(selectorDO)) {            throw new ShenyuException("doRegister Failed to execute,wait to retry.");        }        // 获取有效的URI        List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());        // 构建选择器的handle属性        String handler = buildHandle(validUriList, selectorDO);        if (handler != null) {            selectorDO.setHandle(handler);            SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));            selectorData.setHandle(handler);            // 向数据库更新选择器的handle属性            selectorService.updateSelective(selectorDO);            // 向网关发送选择器更新事件            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));        }        return ShenyuResultMessage.SUCCESS;    }

关于服务注册的源码分析就以及完成了,分析流程图如下:

接下来就分析dubbo插件是如何根据这些信息向http服务发起调用。

2. 服务调用#

dubbo插件是ShenYu网关用于将http请求转成 dubbo协议,调用dubbo服务的核心处理插件。

以官网提供的案例 Dubbo快速开始 为例,一个dubbo服务通过注册中心向shenyu-admin注册后,通过ShenYu网关代理,请求如下:

GET http://localhost:9195/dubbo/findById?id=100Accept: application/json

Dubbo插件中,类继承关系如下:

  • ShenyuPlugin:顶层接口,定义接口方法;
  • AbstractShenyuPlugin:抽象类,实现插件共有逻辑;
  • AbstractDubboPlugin:dubbo插件抽象类,实现dubbo共有逻辑;
  • ApacheDubboPlugin:ApacheDubbo插件。

ShenYu网关支持ApacheDubbo和AlibabaDubbo

2.1 接收请求#

通过ShenYu网关代理后,请求入口是ShenyuWebHandler,它实现了org.springframework.web.server.WebHandler接口。

public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {    //......        /**     * 处理web请求     */    @Override    public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {       // 执行默认插件链        Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);        if (scheduled) {            return execute.subscribeOn(scheduler);        }        return execute;    }        private static class DefaultShenyuPluginChain implements ShenyuPluginChain {
        private int index;
        private final List<ShenyuPlugin> plugins;
        /**         * 实例化默认插件链         */        DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {            this.plugins = plugins;        }
        /**         * 执行每个插件.         */        @Override        public Mono<Void> execute(final ServerWebExchange exchange) {            return Mono.defer(() -> {                if (this.index < plugins.size()) {                    // 获取当前执行插件                    ShenyuPlugin plugin = plugins.get(this.index++);                    // 是否跳过当前插件                    boolean skip = plugin.skip(exchange);                    if (skip) {                        // 如果跳过就执行下一个                        return this.execute(exchange);                    }                    // 执行当前插件                    return plugin.execute(exchange, this);                }                return Mono.empty();            });        }    }}

2.2 匹配规则#

  • org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()

execute()方法中执行选择器和规则的匹配逻辑。

  • 匹配选择器;
  • 匹配规则;
  • 执行插件。
@Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        // 插件名称        String pluginName = named();        // 插件信息        PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);        if (pluginData != null && pluginData.getEnabled()) {            // 选择器信息            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);            if (CollectionUtils.isEmpty(selectors)) {                return handleSelectorIfNull(pluginName, exchange, chain);            }            // 匹配选择器            SelectorData selectorData = matchSelector(exchange, selectors);            if (Objects.isNull(selectorData)) {                return handleSelectorIfNull(pluginName, exchange, chain);            }            selectorLog(selectorData, pluginName);            // 规则信息            List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());            if (CollectionUtils.isEmpty(rules)) {                return handleRuleIfNull(pluginName, exchange, chain);            }            // 匹配规则            RuleData rule;            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {                //get last                rule = rules.get(rules.size() - 1);            } else {                rule = matchRule(exchange, rules);            }            if (Objects.isNull(rule)) {                return handleRuleIfNull(pluginName, exchange, chain);            }            ruleLog(rule, pluginName);            // 执行插件            return doExecute(exchange, chain, selectorData, rule);        }        return chain.execute(exchange);    }

2.3 执行GlobalPlugin#

  • org.apache.shenyu.plugin.global.GlobalPlugin#execute()

GlobalPlugin是一个全局插件,在execute()方法中构建上下文信息。

public class GlobalPlugin implements ShenyuPlugin {    // 构建上下文信息    private final ShenyuContextBuilder builder;        //......        @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {       // 构建上下文信息,传入到 exchange 中        ShenyuContext shenyuContext = builder.build(exchange);        exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);        return chain.execute(exchange);    }        //......}
  • org.apache.shenyu.plugin.global.DefaultShenyuContextBuilder#build()

构建默认的上下文信息。

public class DefaultShenyuContextBuilder implements ShenyuContextBuilder {    //......        @Override    public ShenyuContext build(final ServerWebExchange exchange) {        //构建参数        Pair<String, MetaData> buildData = buildData(exchange);        //包装ShenyuContext        return decoratorMap.get(buildData.getLeft()).decorator(buildDefaultContext(exchange.getRequest()), buildData.getRight());    }        private Pair<String, MetaData> buildData(final ServerWebExchange exchange) {        //......        //根据请求的uri获取元数据        MetaData metaData = MetaDataCache.getInstance().obtain(request.getURI().getPath());        if (Objects.nonNull(metaData) && Boolean.TRUE.equals(metaData.getEnabled())) {            exchange.getAttributes().put(Constants.META_DATA, metaData);            return Pair.of(metaData.getRpcType(), metaData);        } else {            return Pair.of(RpcTypeEnum.HTTP.getName(), new MetaData());        }    }    //设置默认的上下文信息    private ShenyuContext buildDefaultContext(final ServerHttpRequest request) {        String appKey = request.getHeaders().getFirst(Constants.APP_KEY);        String sign = request.getHeaders().getFirst(Constants.SIGN);        String timestamp = request.getHeaders().getFirst(Constants.TIMESTAMP);        ShenyuContext shenyuContext = new ShenyuContext();        String path = request.getURI().getPath();        shenyuContext.setPath(path); //请求路径        shenyuContext.setAppKey(appKey);        shenyuContext.setSign(sign);        shenyuContext.setTimestamp(timestamp);        shenyuContext.setStartDateTime(LocalDateTime.now());        Optional.ofNullable(request.getMethod()).ifPresent(httpMethod -> shenyuContext.setHttpMethod(httpMethod.name()));//请求方法        return shenyuContext;    } }
  • org.apache.shenyu.plugin.dubbo.common.context.DubboShenyuContextDecorator#decorator()

包装ShenyuContext

public class DubboShenyuContextDecorator implements ShenyuContextDecorator {        @Override    public ShenyuContext decorator(final ShenyuContext shenyuContext, final MetaData metaData) {        shenyuContext.setModule(metaData.getAppName());//获取AppName        shenyuContext.setMethod(metaData.getServiceName()); //获取ServiceName        shenyuContext.setContextPath(metaData.getContextPath()); //获取contextPath        shenyuContext.setRpcType(RpcTypeEnum.DUBBO.getName()); // dubbo服务        return shenyuContext;    }        @Override    public String rpcType() {        return RpcTypeEnum.DUBBO.getName();    }}

2.4 执行RpcParamTransformPlugin#

RpcParamTransformPlugin负责从http请求中读取参数,保存到exchange中,传递给rpc服务。

  • org.apache.shenyu.plugin.base.RpcParamTransformPlugin#execute()

execute()方法中,执行该插件的核心逻辑:从exchange中获取请求信息,根据请求传入的内容形式处理参数。

public class RpcParamTransformPlugin implements ShenyuPlugin {
    @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        //从exchange中获取请求信息        ServerHttpRequest request = exchange.getRequest();        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        if (Objects.nonNull(shenyuContext)) {           // 如果请求参数格式是APPLICATION_JSON            MediaType mediaType = request.getHeaders().getContentType();            if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {                return body(exchange, request, chain);            }            // 如果请求参数格式是APPLICATION_FORM_URLENCODED            if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {                return formData(exchange, request, chain);            }            //一般查询请求            return query(exchange, request, chain);        }        return chain.execute(exchange);    }        // 如果请求参数格式是APPLICATION_JSON    private Mono<Void> body(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {        return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())                .flatMap(body -> {                    exchange.getAttributes().put(Constants.PARAM_TRANSFORM, resolveBodyFromRequest(body));//解析body,保存到exchange中                    return chain.execute(exchange);                }));    }   // 如果请求参数格式是APPLICATION_FORM_URLENCODED    private Mono<Void> formData(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {        return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())                .flatMap(map -> {                    String param = resolveBodyFromRequest(map);                    LinkedMultiValueMap<String, String> linkedMultiValueMap;                    try {                        linkedMultiValueMap = BodyParamUtils.buildBodyParams(URLDecoder.decode(param, StandardCharsets.UTF_8.name())); //格式化数据                    } catch (UnsupportedEncodingException e) {                        return Mono.error(e);                    }                    exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.toMap(() -> linkedMultiValueMap));// 保存到exchange中                    return chain.execute(exchange);                }));    }    //一般查询请求    private Mono<Void> query(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {        exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.ofString(() -> serverHttpRequest.getURI().getQuery()));//保存到exchange中        return chain.execute(exchange);    }    //...... }

2.5 执行DubboPlugin#

  • org.apache.shenyu.plugin.dubbo.common.AbstractDubboPlugin#doExecute()

doExecute()方法中,主要是检查元数据和参数。

public abstract class AbstractDubboPlugin extends AbstractShenyuPlugin {        @Override    public Mono<Void> doExecute(final ServerWebExchange exchange,                                   final ShenyuPluginChain chain,                                   final SelectorData selector,                                   final RuleData rule) {        //获取参数        String param = exchange.getAttribute(Constants.PARAM_TRANSFORM);        //获取上下文信息        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        //获取元数据        MetaData metaData = exchange.getAttribute(Constants.META_DATA);        //检查元数据        if (!checkMetaData(metaData)) {            LOG.error(" path is : {}, meta data have error : {}", shenyuContext.getPath(), metaData);            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.META_DATA_ERROR, null);            return WebFluxResultUtils.result(exchange, error);        }        //检查元数据和参数        if (Objects.nonNull(metaData) && StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(param)) {            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM, null);            return WebFluxResultUtils.result(exchange, error);        }        //设置rpcContext        this.rpcContext(exchange);        //进行dubbo服务调用        return this.doDubboInvoker(exchange, chain, selector, rule, metaData, param);    }}
  • org.apache.shenyu.plugin.apache.dubbo.ApacheDubboPlugin#doDubboInvoker()

doDubboInvoker()方法中设置特殊的上下文信息,然后开始dubbo的泛化调用。

public class ApacheDubboPlugin extends AbstractDubboPlugin {        @Override    protected Mono<Void> doDubboInvoker(final ServerWebExchange exchange,                                        final ShenyuPluginChain chain,                                        final SelectorData selector,                                        final RuleData rule,                                        final MetaData metaData,                                        final String param) {        //设置当前的选择器和规则信息,以及请求地址,用于支持dubbo的灰度        RpcContext.getContext().setAttachment(Constants.DUBBO_SELECTOR_ID, selector.getId());        RpcContext.getContext().setAttachment(Constants.DUBBO_RULE_ID, rule.getId());        RpcContext.getContext().setAttachment(Constants.DUBBO_REMOTE_ADDRESS, Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress());        //dubbo的泛化调用        final Mono<Object> result = dubboProxyService.genericInvoker(param, metaData, exchange);        //执行下一个插件        return result.then(chain.execute(exchange));    }}
  • org.apache.shenyu.plugin.apache.dubbo.proxy.ApacheDubboProxyService#genericInvoker()

genericInvoker()方法:

  • 获取ReferenceConfig对象;
  • 获取泛化服务GenericService对象;
  • 构造请求参数pair对象;
  • 发起异步的泛化调用。
public class ApacheDubboProxyService {    //...... 
    /**     * Generic invoker object.     */    public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws ShenyuException {        //1.获取ReferenceConfig对象        ReferenceConfig<GenericService> reference = ApacheDubboConfigCache.getInstance().get(metaData.getPath());        //如果没有获取到        if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {            //失效当前缓存的信息            ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());            //使用元数据进行再次初始化            reference = ApacheDubboConfigCache.getInstance().initRef(metaData);        }        //2.获取泛化服务GenericService对象        GenericService genericService = reference.get();        //3.构造请求参数pair对象        Pair<String[], Object[]> pair;        if (StringUtils.isBlank(metaData.getParameterTypes()) || ParamCheckUtils.dubboBodyIsEmpty(body)) {            pair = new ImmutablePair<>(new String[]{}, new Object[]{});        } else {            pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());        }        //4.发起异步的泛化调用        return Mono.fromFuture(invokeAsync(genericService, metaData.getMethodName(), pair.getLeft(), pair.getRight()).thenApply(ret -> {            //处理结果            if (Objects.isNull(ret)) {                ret = Constants.DUBBO_RPC_RESULT_EMPTY;            }            exchange.getAttributes().put(Constants.RPC_RESULT, ret);            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());            return ret;        })).onErrorMap(exception -> exception instanceof GenericException ? new ShenyuException(((GenericException) exception).getExceptionMessage()) : new ShenyuException(exception));//处理异常    }        //泛化调用,异步操作    private CompletableFuture<Object> invokeAsync(final GenericService genericService, final String method, final String[] parameterTypes, final Object[] args) throws GenericException {        genericService.$invoke(method, parameterTypes, args);        Object resultFromFuture = RpcContext.getContext().getFuture();        return resultFromFuture instanceof CompletableFuture ? (CompletableFuture<Object>) resultFromFuture : CompletableFuture.completedFuture(resultFromFuture);    }}

通过泛化调用就可以实现在网关调用dubbo服务了。

ReferenceConfig对象是支持泛化调用的关键对象 ,它的初始化操作是在数据同步的时候完成的。这里涉及两部分数据,一是同步的插件handler信息,二是同步的插件元数据信息。

  • org.apache.shenyu.plugin.dubbo.common.handler.AbstractDubboPluginDataHandler#handlerPlugin()

当插件数据更新时,数据同步模块会将数据从shenyu-admin同步到网关。在handlerPlugin()中执行初始化操作。

public abstract class AbstractDubboPluginDataHandler implements PluginDataHandler {    //......        //初始化配置缓存   protected abstract void initConfigCache(DubboRegisterConfig dubboRegisterConfig);
    @Override    public void handlerPlugin(final PluginData pluginData) {        if (Objects.nonNull(pluginData) && Boolean.TRUE.equals(pluginData.getEnabled())) {            //数据反序列化            DubboRegisterConfig dubboRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class);            DubboRegisterConfig exist = Singleton.INST.get(DubboRegisterConfig.class);            if (Objects.isNull(dubboRegisterConfig)) {                return;            }            if (Objects.isNull(exist) || !dubboRegisterConfig.equals(exist)) {                // 执行初始化操作                this.initConfigCache(dubboRegisterConfig);            }            Singleton.INST.single(DubboRegisterConfig.class, dubboRegisterConfig);        }    }    //......}
  • org.apache.shenyu.plugin.apache.dubbo.handler.ApacheDubboPluginDataHandler#initConfigCache()

执行初始化操作。

public class ApacheDubboPluginDataHandler extends AbstractDubboPluginDataHandler {
    @Override    protected void initConfigCache(final DubboRegisterConfig dubboRegisterConfig) {        //执行初始化操作        ApacheDubboConfigCache.getInstance().init(dubboRegisterConfig);        //失效之前缓存的结果        ApacheDubboConfigCache.getInstance().invalidateAll();    }}
  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#init()

在初始化中,设置registryConfigconsumerConfig

public final class ApacheDubboConfigCache extends DubboConfigCache {    //......     /**     * 初始化     */    public void init(final DubboRegisterConfig dubboRegisterConfig) {        //创建ApplicationConfig        if (Objects.isNull(applicationConfig)) {            applicationConfig = new ApplicationConfig("shenyu_proxy");        }        //协议或者地址发生改变时,需要更新registryConfig        if (needUpdateRegistryConfig(dubboRegisterConfig)) {            RegistryConfig registryConfigTemp = new RegistryConfig();            registryConfigTemp.setProtocol(dubboRegisterConfig.getProtocol());            registryConfigTemp.setId("shenyu_proxy");            registryConfigTemp.setRegister(false);            registryConfigTemp.setAddress(dubboRegisterConfig.getRegister());            Optional.ofNullable(dubboRegisterConfig.getGroup()).ifPresent(registryConfigTemp::setGroup);            registryConfig = registryConfigTemp;        }        //创建ConsumerConfig        if (Objects.isNull(consumerConfig)) {            consumerConfig = ApplicationModel.getConfigManager().getDefaultConsumer().orElseGet(() -> {                ConsumerConfig consumerConfig = new ConsumerConfig();                consumerConfig.refresh();                return consumerConfig;            });            //设置ConsumerConfig            Optional.ofNullable(dubboRegisterConfig.getThreadpool()).ifPresent(consumerConfig::setThreadpool);             Optional.ofNullable(dubboRegisterConfig.getCorethreads()).ifPresent(consumerConfig::setCorethreads);             Optional.ofNullable(dubboRegisterConfig.getThreads()).ifPresent(consumerConfig::setThreads);             Optional.ofNullable(dubboRegisterConfig.getQueues()).ifPresent(consumerConfig::setQueues);        }    }        //是否需要更新注册配置    private boolean needUpdateRegistryConfig(final DubboRegisterConfig dubboRegisterConfig) {        if (Objects.isNull(registryConfig)) {            return true;        }        return !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol())                || !Objects.equals(dubboRegisterConfig.getRegister(), registryConfig.getAddress())                || !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol());    }
    //......}
  • org.apache.shenyu.plugin.apache.dubbo.subscriber.ApacheDubboMetaDataSubscriber#onSubscribe()

当元数据更新时,数据同步模块会将数据从shenyu-admin同步到网关。在onSubscribe()方法中执行元数据更新操作。

public class ApacheDubboMetaDataSubscriber implements MetaDataSubscriber {    //本地内存缓存    private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap();
    //元数据发生更新    public void onSubscribe(final MetaData metaData) {        // dubbo服务的元数据更新        if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {            //对应的元数据是否存在            MetaData exist = META_DATA.get(metaData.getPath());            if (Objects.isNull(exist) || Objects.isNull(ApacheDubboConfigCache.getInstance().get(metaData.getPath()))) {                // 首次初始化                ApacheDubboConfigCache.getInstance().initRef(metaData);            } else {                // 对应的元数据发生了更新操作                if (!Objects.equals(metaData.getServiceName(), exist.getServiceName())                        || !Objects.equals(metaData.getRpcExt(), exist.getRpcExt())                        || !Objects.equals(metaData.getParameterTypes(), exist.getParameterTypes())                        || !Objects.equals(metaData.getMethodName(), exist.getMethodName())) {                    //根据最新的元数据再次构建ReferenceConfig                    ApacheDubboConfigCache.getInstance().build(metaData);                }            }            //本地内存缓存            META_DATA.put(metaData.getPath(), metaData);        }    }
    //删除元数据    public void unSubscribe(final MetaData metaData) {        if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {            //使ReferenceConfig失效            ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());            META_DATA.remove(metaData.getPath());        }    }}
  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#initRef()

通过metaData构建ReferenceConfig对象。

public final class ApacheDubboConfigCache extends DubboConfigCache {    //......        public ReferenceConfig<GenericService> initRef(final MetaData metaData) {            try {                //先尝试从缓存中获取,存在就直接返回                ReferenceConfig<GenericService> referenceConfig = cache.get(metaData.getPath());                if (StringUtils.isNoneBlank(referenceConfig.getInterface())) {                    return referenceConfig;                }            } catch (ExecutionException e) {                LOG.error("init dubbo ref exception", e);            }           //不存在,就构建            return build(metaData);        }
        /**         * Build reference config.         */        @SuppressWarnings("deprecation")        public ReferenceConfig<GenericService> build(final MetaData metaData) {            if (Objects.isNull(applicationConfig) || Objects.isNull(registryConfig)) {                return new ReferenceConfig<>();            }            ReferenceConfig<GenericService> reference = new ReferenceConfig<>(); //新建ReferenceConfig            reference.setGeneric("true"); //泛化调用            reference.setAsync(true);//支持异步
            reference.setApplication(applicationConfig);//设置应用配置            reference.setRegistry(registryConfig);//设置注册中心配置            reference.setConsumer(consumerConfig);//设置消费者配置            reference.setInterface(metaData.getServiceName());//设置服务接口            reference.setProtocol("dubbo");//设置dubbo协议            reference.setCheck(false); //不检查 service provider            reference.setLoadbalance("gray");//支持灰度
            Map<String, String> parameters = new HashMap<>(2);            parameters.put("dispatcher", "direct");            reference.setParameters(parameters);//自定义参数
            String rpcExt = metaData.getRpcExt();//rpc扩展参数            DubboParam dubboParam = parserToDubboParam(rpcExt);//反序列化            if (Objects.nonNull(dubboParam)) {                if (StringUtils.isNoneBlank(dubboParam.getVersion())) {                    reference.setVersion(dubboParam.getVersion());//设置版本                }                if (StringUtils.isNoneBlank(dubboParam.getGroup())) {                    reference.setGroup(dubboParam.getGroup());//设置分组                }                if (StringUtils.isNoneBlank(dubboParam.getUrl())) {                    reference.setUrl(dubboParam.getUrl());//设置url                }                if (StringUtils.isNoneBlank(dubboParam.getCluster())) {                    reference.setCluster(dubboParam.getCluster());//设置Cluster type                }                Optional.ofNullable(dubboParam.getTimeout()).ifPresent(reference::setTimeout);//timeout                Optional.ofNullable(dubboParam.getRetries()).ifPresent(reference::setRetries);//retires                Optional.ofNullable(dubboParam.getSent()).ifPresent(reference::setSent);//Whether to ack async-sent            }            try {                //获取GenericService                Object obj = reference.get();                if (Objects.nonNull(obj)) {                    LOG.info("init apache dubbo reference success there meteData is :{}", metaData);                    //缓存当前的reference                    cache.put(metaData.getPath(), reference);                }            } catch (Exception e) {                LOG.error("init apache dubbo reference exception", e);            }            return reference;        }    //......    }

2.6 执行ResponsePlugin#

  • org.apache.shenyu.plugin.response.ResponsePlugin#execute()

响应结果由ResponsePlugin插件处理。

    @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        // 根据rpc类型处理结果        return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);    }

处理类型由MessageWriter决定,类继承关系如下:

  • MessageWriter:接口,定义消息处理方法;
  • NettyClientMessageWriter:处理Netty调用结果;
  • RPCMessageWriter:处理RPC调用结果;
  • WebClientMessageWriter:处理WebClient调用结果;

Dubbo服务调用,处理结果当然是RPCMessageWriter了。

  • org.apache.shenyu.plugin.response.strategy.RPCMessageWriter#writeWith()

writeWith()方法中处理响应结果。


public class RPCMessageWriter implements MessageWriter {
    @Override    public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        return chain.execute(exchange).then(Mono.defer(() -> {            Object result = exchange.getAttribute(Constants.RPC_RESULT); //获取结果            if (Objects.isNull(result)) { //处理异常                Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);                return WebFluxResultUtils.result(exchange, error);            }            return WebFluxResultUtils.result(exchange, result);//返回结果        }));    }}

分析至此,关于Dubbo插件的源码分析就完成了,分析流程图如下:

3. 小结#

本文源码分析从Dubbo服务注册开始,到Dubbo插件的服务调用。Dubbo插件主要用来处理将http请求转成dubbo协议,主要逻辑是通过泛化调用实现。

Divide插件源码分析

· One min read
Apache ShenYu Committer

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu 网关使用 divide 插件来处理 http 请求。你可以查看官方文档 Http快速开始 了解如何使用该插件。

本文基于shenyu-2.4.3版本进行源码分析,官网的介绍请参考 Http服务接入

1. 服务注册#

1.1 声明注册接口#

使用注解@ShenyuSpringMvcClient将服务注册到网关。简单demo如下:

@RestController@RequestMapping("/order")@ShenyuSpringMvcClient(path = "/order")  // API注册public class OrderController {    @GetMapping("/findById")    @ShenyuSpringMvcClient(path = "/findById", desc = "Find by id") // 方法注册    public OrderDTO findById(@RequestParam("id") final String id) {        return build(id, "hello world findById");    }}

注解定义:


/** * 作用于类和方法上 */@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE, ElementType.METHOD})public @interface ShenyuSpringMvcClient {        //注册路径    String path() default "";        //规则名称    String ruleName() default "";       //描述信息    String desc() default "";
    //是否启用    boolean enabled() default true;        //注册元数据    boolean registerMetaData() default false;}

1.2 扫描注解信息#

注解扫描通过SpringMvcClientBeanPostProcessor完成,它实现了BeanPostProcessor接口,是Spring提供的后置处理器。

在构造器实例化的过程中:

  • 读取属性配置
  • 添加注解,读取path信息
  • 启动注册中心,向shenyu-admin注册
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {    //...    /**     * 构造器实例化     */    public SpringMvcClientBeanPostProcessor(final PropertiesConfig clientConfig,                                            final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {        // 1. 读取属性配置        Properties props = clientConfig.getProps();        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);        this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH, "");        if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {            String errorMsg = "http register param must config the appName or contextPath";            LOG.error(errorMsg);            throw new ShenyuClientIllegalArgumentException(errorMsg);        }        this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));        // 2. 添加注解        mappingAnnotation.add(ShenyuSpringMvcClient.class);        mappingAnnotation.add(PostMapping.class);        mappingAnnotation.add(GetMapping.class);        mappingAnnotation.add(DeleteMapping.class);        mappingAnnotation.add(PutMapping.class);        mappingAnnotation.add(RequestMapping.class);        // 3. 启动注册中心        publisher.start(shenyuClientRegisterRepository);    }        @Override    public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {       // 重写后置处理器逻辑                return bean;    }    
  • SpringMvcClientBeanPostProcessor#postProcessAfterInitialization()

重写后置处理器逻辑:读取注解信息,构建元数据对象和URI对象,并向shenyu-admin注册。

    @Override    public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {        // 1. 如果是注册整个服务或者不是Controller类,就不处理        if (Boolean.TRUE.equals(isFull) || !hasAnnotation(bean.getClass(), Controller.class)) {            return bean;        }        // 2. 读取类上的注解 ShenyuSpringMvcClient        final ShenyuSpringMvcClient beanShenyuClient = AnnotationUtils.findAnnotation(bean.getClass(), ShenyuSpringMvcClient.class);        // 2.1构建superPath        final String superPath = buildApiSuperPath(bean.getClass());        // 2.2 是否注册整个类方法        if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {            // 构建元数据对象,然后向shenyu-admin注册            publisher.publishEvent(buildMetaDataDTO(beanShenyuClient, pathJoin(contextPath, superPath)));            return bean;        }        // 3. 读取所有方法        final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());        for (Method method : methods) {            // 3.1 读取方法上的注解 ShenyuSpringMvcClient            ShenyuSpringMvcClient methodShenyuClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);            // 如果方法上面没有注解,就用类上面的注解            methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;            if (Objects.nonNull(methodShenyuClient)) {               // 3.2 构建path信息,构建元数据对象,向shenyu-admin注册                publisher.publishEvent(buildMetaDataDTO(methodShenyuClient, buildApiPath(method, superPath)));            }        }                return bean;    }
  • 1.如果是注册整个服务或者不是Controller类,就不处理
  • 2.读取类上的注解 ShenyuSpringMvcClient,如果是注册整个类,就在这里构建元数据对象,然后向shenyu-admin注册
  • 3.处理方法上的注解 ShenyuSpringMvcClient,针对特定方法构建path信息,构建元数据对象,然后向shenyu-admin注册

这里有两个取path的方法,需要特别说明一下:

  • buildApiSuperPath()

    构造SuperPath:先从类上的注解ShenyuSpringMvcClientpath属性,如果没有,就从当前类的RequestMapping注解中取path信息。

    private String buildApiSuperPath(@NonNull final Class<?> method) {        // 先从类上的注解ShenyuSpringMvcClient取path属性        ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);        if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {            return shenyuSpringMvcClient.path();        }        // 从当前类的RequestMapping注解中取path信息        RequestMapping requestMapping = AnnotationUtils.findAnnotation(method, RequestMapping.class);        if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {            return requestMapping.path()[0];        }        return "";    }
  • buildApiPath()

    构建path:先读取方法上的注解ShenyuSpringMvcClient,如果存在就构建;否则从方法的其他注解上获取path信息;完整的path = contextPath(上下文信息)+superPath(类信息)+methodPath(方法信息)

    private String buildApiPath(@NonNull final Method method, @NonNull final String superPath) {        // 1. 读取方法上的注解ShenyuSpringMvcClient        ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);        // 1.1如果存在path,就构建        if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {            //1.2完整 path = contextPath+superPath+methodPath            return pathJoin(contextPath, superPath, shenyuSpringMvcClient.path());        }        // 2.从方法的其他注解上获取path信息        final String path = getPathByMethod(method);        if (StringUtils.isNotBlank(path)) {             // 2.1 完整的path = contextPath+superPath+methodPath            return pathJoin(contextPath, superPath, path);        }        return pathJoin(contextPath, superPath);    }
  • getPathByMethod()

    从方法的其他注解上获取path信息,其他注解包括:

    • ShenyuSpringMvcClient
    • PostMapping
    • GetMapping
    • DeleteMapping
    • PutMapping
    • RequestMapping

    private String getPathByMethod(@NonNull final Method method) {        // 遍历接口注解获取path信息        for (Class<? extends Annotation> mapping : mappingAnnotation) {            final String pathByAnnotation = getPathByAnnotation(AnnotationUtils.findAnnotation(method, mapping), pathAttributeNames);            if (StringUtils.isNotBlank(pathByAnnotation)) {                return pathByAnnotation;            }        }        return null;    }

扫描注解完成后,构建元数据对象,然后将该对象发送到shenyu-admin,即可完成注册。

  • 元数据对象

    包括当前注册方法的规则信息:contextPath,appName,注册路径,描述信息,注册类型,是否启用,规则名称和是否注册元数据。

 private MetaDataRegisterDTO buildMetaDataDTO(@NonNull final ShenyuSpringMvcClient shenyuSpringMvcClient, final String path) {        return MetaDataRegisterDTO.builder()                .contextPath(contextPath) // contextPath                .appName(appName) // appName                .path(path) // 注册路径,在网关规则匹配时使用                .pathDesc(shenyuSpringMvcClient.desc()) // 描述信息                .rpcType(RpcTypeEnum.HTTP.getName()) // divide插件,默认时http类型                .enabled(shenyuSpringMvcClient.enabled()) // 是否启用规则                .ruleName(StringUtils.defaultIfBlank(shenyuSpringMvcClient.ruleName(), path))//规则名称                .registerMetaData(shenyuSpringMvcClient.registerMetaData()) //是否注册元数据信息                .build();    }

具体的注册逻辑由注册中心实现,在之前的文章中已经分析过了,这里就不再深入分析。

1.3 注册URI信息#

ContextRegisterListener负责将客户端的URI信息注册到shenyu-admin,它实现了ApplicationListener接口,发生上下文刷新事件ContextRefreshedEvent时,执行onApplicationEvent()方法,实现注册逻辑。


public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent>, BeanFactoryAware {    //......        /**     * 构造器实例化     */    public ContextRegisterListener(final PropertiesConfig clientConfig) {        // 读取属性配置        final Properties props = clientConfig.getProps();        this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));        this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);        if (Boolean.TRUE.equals(isFull)) {            if (StringUtils.isBlank(contextPath)) {                final String errorMsg = "http register param must config the contextPath";                LOG.error(errorMsg);                throw new ShenyuClientIllegalArgumentException(errorMsg);            }        }        this.port = Integer.parseInt(Optional.ofNullable(props.getProperty(ShenyuClientConstants.PORT)).orElseGet(() -> "-1"));        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);        this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);        this.host = props.getProperty(ShenyuClientConstants.HOST);    }
    @Override    public void setBeanFactory(final BeanFactory beanFactory) throws BeansException {        this.beanFactory = beanFactory;    }
    // 执行应用事件    @Override    public void onApplicationEvent(@NonNull final ContextRefreshedEvent contextRefreshedEvent) {        // 保证该方法执行一次        if (!registered.compareAndSet(false, true)) {            return;        }        // 1. 如果是注册整个服务        if (Boolean.TRUE.equals(isFull)) {            // 构建元数据,并注册            publisher.publishEvent(buildMetaDataDTO());        }        try {            // 获取端口信息            final int mergedPort = port <= 0 ? PortUtils.findPort(beanFactory) : port;            // 2. 构建URI数据,并注册            publisher.publishEvent(buildURIRegisterDTO(mergedPort));        } catch (ShenyuException e) {            throw new ShenyuException(e.getMessage() + "please config ${shenyu.client.http.props.port} in xml/yml !");        }    }
    // 构建URI数据    private URIRegisterDTO buildURIRegisterDTO(final int port) {        return URIRegisterDTO.builder()            .contextPath(this.contextPath) // contextPath            .appName(appName) // appName            .protocol(protocol) // 服务使用的协议            .host(IpUtils.isCompleteHost(this.host) ? this.host : IpUtils.getHost(this.host)) //主机            .port(port) // 端口            .rpcType(RpcTypeEnum.HTTP.getName()) // divide插件,默认注册http类型            .build();    }
    // 构建元数据    private MetaDataRegisterDTO buildMetaDataDTO() {        return MetaDataRegisterDTO.builder()            .contextPath(contextPath)            .appName(appName)            .path(contextPath)            .rpcType(RpcTypeEnum.HTTP.getName())            .enabled(true)            .ruleName(contextPath)            .build();    }}

1.4 处理注册信息#

客户端通过注册中心注册的元数据和URI数据,在shenyu-admin进行处理,负责存储到数据库和同步给shenyu网关。Divide插件的客户端注册处理逻辑在ShenyuClientRegisterDivideServiceImpl中。继承关系如下:

  • ShenyuClientRegisterService:客户端注册服务,顶层接口;
  • FallbackShenyuClientRegisterService:注册失败,提供重试操作;
  • AbstractShenyuClientRegisterServiceImpl:抽象类,实现部分公共注册逻辑;
  • AbstractContextPathRegisterService:抽象类,负责注册ContextPath
  • ShenyuClientRegisterDivideServiceImpl:实现Divide插件的注册;
1.4.1 注册服务#
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()

    客户端通过注册中心注册的元数据MetaDataRegisterDTO对象在shenyu-adminregister()方法被接送到。

   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. 注册选择器        String selectorHandler = selectorHandler(dto);        String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);        //2. 注册规则        String ruleHandler = ruleHandler();        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        ruleService.registerDefault(ruleDTO);        //3. 注册元数据        registerMetadata(dto);        //4. 注册ContextPath        String contextPath = dto.getContextPath();        if (StringUtils.isNotEmpty(contextPath)) {            registerContextPath(dto);        }        return ShenyuResultMessage.SUCCESS;    }
1.4.1.1 注册选择器#
  • org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()

构建contextPath,查找选择器信息是否存在,如果存在就返回id;不存在就创建默认的选择器信息。

    @Override    public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {        // 构建contextPath        String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());        // 通过名称查找选择器信息是否存在        SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);        if (Objects.isNull(selectorDO)) {            // 不存在就创建默认的选择器信息            return registerSelector(contextPath, pluginName, selectorHandler);        }        return selectorDO.getId();    }
  • 默认选择器信息

    在这里构建默认选择器信息及其条件属性。

   //注册选择器   private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {        //构建选择器        SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());        selectorDTO.setHandle(selectorHandler);        //注册默认选择器        return registerDefault(selectorDTO);    }     //构建选择器    private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {        //构建默认选择器        SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);        selectorDTO.setPluginId(pluginId);         //构建默认选择器的条件属性        selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));        return selectorDTO;    }
  • 构建默认选择器
private SelectorDTO buildDefaultSelectorDTO(final String name) {    return SelectorDTO.builder()            .name(name) // 名称            .type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // 默认类型自定义            .matchMode(MatchModeEnum.AND.getCode()) //默认匹配方式 and            .enabled(Boolean.TRUE)  //默认启开启            .loged(Boolean.TRUE)  //默认记录日志            .continued(Boolean.TRUE) //默认继续后续选择器            .sort(1) //默认顺序1            .build();}

  • 构建默认选择器条件属性
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {    SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();    selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // 默认参数类型URI    selectorConditionDTO.setParamName("/");    selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // 默认匹配策略 match    selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // 默认值 /contextPath/**    return Collections.singletonList(selectorConditionDTO);}
  • 注册默认选择器
@Overridepublic String registerDefault(final SelectorDTO selectorDTO) {    //选择器信息    SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);    //选择器条件属性    List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();    if (StringUtils.isEmpty(selectorDTO.getId())) {        // 向数据库插入选择器信息        selectorMapper.insertSelective(selectorDO);          // 向数据库插入选择器条件属性        selectorConditionDTOs.forEach(selectorConditionDTO -> {            selectorConditionDTO.setSelectorId(selectorDO.getId());                        selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));        });    }    // 发布同步事件,向网关同步选择信息及其条件属性    publishEvent(selectorDO, selectorConditionDTOs);    return selectorDO.getId();}
1.4.1.2 注册规则#

在注册服务的第二步中,开始构建默认规则,然后注册规则。

@Override    public String register(final MetaDataRegisterDTO dto) {        //1. 注册选择器        //......                //2. 注册规则        // 默认规则处理属性        String ruleHandler = ruleHandler();        // 构建默认规则信息        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        // 注册规则        ruleService.registerDefault(ruleDTO);                //3. 注册元数据        //......                //4. 注册ContextPath        //......                return ShenyuResultMessage.SUCCESS;    }
  • 默认规则处理属性
    @Override    protected String ruleHandler() {        // 默认规则处理属性        return new DivideRuleHandle().toJson();    }

Divide插件默认规则处理属性


public class DivideRuleHandle implements RuleHandle {
    /**     * 负载均衡:默认随机     */    private String loadBalance = LoadBalanceEnum.RANDOM.getName();
    /**     * 重试策略:默认重试当前服务     */    private String retryStrategy = RetryEnum.CURRENT.getName();
    /**     * 重试次数:默认3次     */    private int retry = 3;
    /**     * 调用超时:默认 3000     */    private long timeout = Constants.TIME_OUT;
    /**     * header最大值:10240 byte     */    private long headerMaxSize = Constants.HEADER_MAX_SIZE;
    /**     * request最大值:102400 byte     */    private long requestMaxSize = Constants.REQUEST_MAX_SIZE;}
  • 构建默认规则信息
  // 构建默认规则信息    private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {        return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());    }   //  构建默认规则信息    private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {        RuleDTO ruleDTO = RuleDTO.builder()                .selectorId(selectorId) //关联的选择器id                .name(ruleName) //规则名称                .matchMode(MatchModeEnum.AND.getCode()) // 默认匹配模式 and                .enabled(Boolean.TRUE) // 默认开启                .loged(Boolean.TRUE) //默认记录日志                .sort(1) //默认顺序 1                .handle(ruleHandler)                .build();        RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()                .paramType(ParamTypeEnum.URI.getName()) // 默认参数类型URI                .paramName("/")                .paramValue(path) //参数值path                .build();        if (path.indexOf("*") > 1) {            ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); //如果path中有*,操作类型则默认为 match        } else {            ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); // 否则,默认操作类型 =         }        ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));        return ruleDTO;    }
  • org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()

注册规则:向数据库插入记录,并向网关发布事件,进行数据同步。


    @Override    public String registerDefault(final RuleDTO ruleDTO) {        RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());        if (Objects.nonNull(exist)) {            return "";        }
        RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);        List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();        if (StringUtils.isEmpty(ruleDTO.getId())) {            // 向数据库插入规则信息            ruleMapper.insertSelective(ruleDO);            //向数据库插入规则体条件属性            ruleConditions.forEach(ruleConditionDTO -> {                ruleConditionDTO.setRuleId(ruleDO.getId());                            ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));            });        }        // 向网关发布事件,进行数据同步        publishEvent(ruleDO, ruleConditions);        return ruleDO.getId();    }
1.4.1.3 注册元数据#
   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. 注册选择器        //......                //2. 注册规则        //......                //3. 注册元数据        registerMetadata(dto);                //4. 注册ContextPath        //......                return ShenyuResultMessage.SUCCESS;    }
  • org.apache.shenyu.admin.service.register.ShenyuClientRegisterDivideServiceImpl#registerMetadata()

    插入或更新元数据,然后发布同步事件到网关。


    @Override    protected void registerMetadata(final MetaDataRegisterDTO dto) {        if (dto.isRegisterMetaData()) { // 如果注册元数据            // 获取metaDataService            MetaDataService metaDataService = getMetaDataService();            // 元数据是否存在            MetaDataDO exist = metaDataService.findByPath(dto.getPath());            // 插入或更新元数据            metaDataService.saveOrUpdateMetaData(exist, dto);        }    }
    @Override    public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {        DataEventTypeEnum eventType;        // 数据类型转换 DTO->DO        MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);        // 插入数据        if (Objects.isNull(exist)) {            Timestamp currentTime = new Timestamp(System.currentTimeMillis());            metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());            metaDataDO.setDateCreated(currentTime);            metaDataDO.setDateUpdated(currentTime);            metaDataMapper.insert(metaDataDO);            eventType = DataEventTypeEnum.CREATE;        } else {            // 更新数据            metaDataDO.setId(exist.getId());            metaDataMapper.update(metaDataDO);            eventType = DataEventTypeEnum.UPDATE;        }        // 发布同步事件到网关        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,                Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));    }
1.4.1.4 注册ContextPath#
   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. 注册选择器        //......                //2. 注册规则        //......                //3. 注册元数据        //......                //4. 注册ContextPath        String contextPath = dto.getContextPath();        if (StringUtils.isNotEmpty(contextPath)) {            registerContextPath(dto);        }        return ShenyuResultMessage.SUCCESS;    }
  • org.apache.shenyu.admin.service.register.AbstractContextPathRegisterService#registerContextPath()
    @Override    public void registerContextPath(final MetaDataRegisterDTO dto) {        // 设置选择器的contextPath        String contextPathSelectorId = getSelectorService().registerDefault(dto, PluginEnum.CONTEXT_PATH.getName(), "");        ContextMappingRuleHandle handle = new ContextMappingRuleHandle();        handle.setContextPath(PathUtils.decoratorContextPath(dto.getContextPath()));        // 设置规则的contextPath        getRuleService().registerDefault(buildContextPathDefaultRuleDTO(contextPathSelectorId, dto, handle.toJson()));    }
1.4.2 注册URI#
  • org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()

服务端收到客户端注册的URI信息后,进行处理。

    @Override    public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {        String result;        String key = key(selectorName);        try {            this.removeFallBack(key);            // 注册URI            result = this.doRegisterURI(selectorName, uriList);            logger.info("Register success: {},{}", selectorName, uriList);        } catch (Exception ex) {            logger.warn("Register exception: cause:{}", ex.getMessage());            result = "";            // 注册失败后,进行重试            this.addFallback(key, new FallbackHolder(selectorName, uriList));        }        return result;    }
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()

从客户端注册的URI中获取有效的URI,更新对应的选择器handle属性,向网关发送选择器更新事件。

@Override    public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {        //参数检查        if (CollectionUtils.isEmpty(uriList)) {            return "";        }        //获取选择器信息        SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));        if (Objects.isNull(selectorDO)) {            throw new ShenyuException("doRegister Failed to execute,wait to retry.");        }        // 获取有效的URI        List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());        // 构建选择器的handle属性        String handler = buildHandle(validUriList, selectorDO);        if (handler != null) {            selectorDO.setHandle(handler);            SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));            selectorData.setHandle(handler);            // 向数据库更新选择器的handle属性            selectorService.updateSelective(selectorDO);            // 向网关发送选择器更新事件            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));        }        return ShenyuResultMessage.SUCCESS;    }

关于服务注册的源码分析就以及完成了,分析流程图如下:

接下来就分析divide插件是如何根据这些信息向http服务发起调用。

2. 服务调用#

divide插件是网关用于处理 http协议请求的核心处理插件。

以官网提供的案例 Http快速开始 为例,一个直连请求如下:

GET http://localhost:8189/order/findById?id=100Accept: application/json

通过ShenYu网关代理后,请求如下:

GET http://localhost:9195/http/order/findById?id=100Accept: application/json

通过ShenYu网关代理后的服务仍然能够请求到之前的服务,在这里起作用的就是divide插件。类继承关系如下:

  • ShenyuPlugin:顶层接口,定义接口方法;
  • AbstractShenyuPlugin:抽象类,实现插件共有逻辑;
  • DividePlugin:Divide插件。

2.1 接收请求#

通过ShenYu网关代理后,请求入口是ShenyuWebHandler,它实现了org.springframework.web.server.WebHandler接口。

public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {    //......        /**     * 处理web请求     */    @Override    public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {       // 执行默认插件链        Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);        if (scheduled) {            return execute.subscribeOn(scheduler);        }        return execute;    }        private static class DefaultShenyuPluginChain implements ShenyuPluginChain {
        private int index;
        private final List<ShenyuPlugin> plugins;
        /**         * 实例化默认插件链         */        DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {            this.plugins = plugins;        }
        /**         * 执行每个插件.         */        @Override        public Mono<Void> execute(final ServerWebExchange exchange) {            return Mono.defer(() -> {                if (this.index < plugins.size()) {                    // 获取当前执行插件                    ShenyuPlugin plugin = plugins.get(this.index++);                    // 是否跳过当前插件                    boolean skip = plugin.skip(exchange);                    if (skip) {                        // 如果跳过就执行下一个                        return this.execute(exchange);                    }                    // 执行当前插件                    return plugin.execute(exchange, this);                }                return Mono.empty();            });        }    }}

2.2 匹配规则#

  • org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()

execute()方法中执行选择器和规则的匹配逻辑。

  • 匹配选择器;
  • 匹配规则;
  • 执行插件。
@Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        // 插件名称        String pluginName = named();        // 插件信息        PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);        if (pluginData != null && pluginData.getEnabled()) {            // 选择器信息            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);            if (CollectionUtils.isEmpty(selectors)) {                return handleSelectorIfNull(pluginName, exchange, chain);            }            // 匹配选择器            SelectorData selectorData = matchSelector(exchange, selectors);            if (Objects.isNull(selectorData)) {                return handleSelectorIfNull(pluginName, exchange, chain);            }            selectorLog(selectorData, pluginName);            // 规则信息            List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());            if (CollectionUtils.isEmpty(rules)) {                return handleRuleIfNull(pluginName, exchange, chain);            }            // 匹配规则            RuleData rule;            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {                //get last                rule = rules.get(rules.size() - 1);            } else {                rule = matchRule(exchange, rules);            }            if (Objects.isNull(rule)) {                return handleRuleIfNull(pluginName, exchange, chain);            }            ruleLog(rule, pluginName);            // 执行插件            return doExecute(exchange, chain, selectorData, rule);        }        return chain.execute(exchange);    }

2.3 执行divide插件#

  • org.apache.shenyu.plugin.divide.DividePlugin#doExecute()

doExecute()方法中执行divide插件的具体逻辑:

  • 校验header大小;
  • 校验request大小;
  • 获取服务列表;
  • 实现负载均衡;
  • 设置请求url,超时时间,重试策略。
@Override    protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {        // 获取上下文信息        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        // 获取规则的handle属性        DivideRuleHandle ruleHandle = DividePluginDataHandler.CACHED_HANDLE.get().obtainHandle(CacheKeyUtils.INST.getKey(rule));        long headerSize = 0;        // 校验header大小        for (List<String> multiHeader : exchange.getRequest().getHeaders().values()) {            for (String value : multiHeader) {                headerSize += value.getBytes(StandardCharsets.UTF_8).length;            }        }        if (headerSize > ruleHandle.getHeaderMaxSize()) {            LOG.error("request header is too large");            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_HEADER_TOO_LARGE, null);            return WebFluxResultUtils.result(exchange, error);        }                // 校验request大小        if (exchange.getRequest().getHeaders().getContentLength() > ruleHandle.getRequestMaxSize()) {            LOG.error("request entity is too large");            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_ENTITY_TOO_LARGE, null);            return WebFluxResultUtils.result(exchange, error);        }        // 获取服务列表upstreamList        List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());        if (CollectionUtils.isEmpty(upstreamList)) {            LOG.error("divide upstream configuration error: {}", rule);            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null);            return WebFluxResultUtils.result(exchange, error);        }        // 请求ip        String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();        // 实现负载均衡        Upstream upstream = LoadBalancerFactory.selector(upstreamList, ruleHandle.getLoadBalance(), ip);        if (Objects.isNull(upstream)) {            LOG.error("divide has no upstream");            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null);            return WebFluxResultUtils.result(exchange, error);        }        // 设置url        String domain = upstream.buildDomain();        exchange.getAttributes().put(Constants.HTTP_DOMAIN, domain);        // 设置超时时间        exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());        exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());        // 设置重试策略        exchange.getAttributes().put(Constants.RETRY_STRATEGY, ruleHandle.getRetryStrategy());        exchange.getAttributes().put(Constants.LOAD_BALANCE, ruleHandle.getLoadBalance());        exchange.getAttributes().put(Constants.DIVIDE_SELECTOR_ID, selector.getId());        return chain.execute(exchange);    }

2.4 发起请求#

默认由WebClientPluginhttp服务发起调用请求,类继承关系如下:

  • ShenyuPlugin:顶层插件,定义插件方法;
  • AbstractHttpClientPlugin:抽象类,实现请求调用的公共逻辑;
  • WebClientPlugin:通过WebClient发起请求;
  • NettyHttpClientPlugin:通过Netty发起请求。

发起请求调用:

  • org.apache.shenyu.plugin.httpclient.AbstractHttpClientPlugin#execute()

execute()方法中发起请求调用:

  • 获取指定的超时时间,重试次数
  • 发起请求
  • 根据指定的重试策略进行失败后重试操作

public abstract class AbstractHttpClientPlugin<R> implements ShenyuPlugin {
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractHttpClientPlugin.class);
    @Override    public final Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        // 获取上下文信息        final ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        // 获取uri        final URI uri = exchange.getAttribute(Constants.HTTP_URI);        if (Objects.isNull(uri)) {            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_URL, null);            return WebFluxResultUtils.result(exchange, error);        }        // 获取指定的超时时间        final long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);        final Duration duration = Duration.ofMillis(timeout);        // 获取指定重试次数        final int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);        // 获取指定的重试策略        final String retryStrategy = (String) Optional.ofNullable(exchange.getAttribute(Constants.RETRY_STRATEGY)).orElseGet(RetryEnum.CURRENT::getName);        LOG.info("The request urlPath is {}, retryTimes is {}, retryStrategy is {}", uri.toASCIIString(), retryTimes, retryStrategy);        // 构建header        final HttpHeaders httpHeaders = buildHttpHeaders(exchange);        // 发起请求        final Mono<R> response = doRequest(exchange, exchange.getRequest().getMethodValue(), uri, httpHeaders, exchange.getRequest().getBody())                .timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))                .doOnError(e -> LOG.error(e.getMessage(), e));                // 重试策略CURRENT,对当前服务进行重试        if (RetryEnum.CURRENT.getName().equals(retryStrategy)) {            //old version of DividePlugin and SpringCloudPlugin will run on this            return response.retryWhen(Retry.anyOf(TimeoutException.class, ConnectTimeoutException.class, ReadTimeoutException.class, IllegalStateException.class)                    .retryMax(retryTimes)                    .backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))                    .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))                    .flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));        }                // 对其他服务进行重试        // 排除已经调用过的服务        final Set<URI> exclude = Sets.newHashSet(uri);        // 请求重试        return resend(response, exchange, duration, httpHeaders, exclude, retryTimes)                .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))                .flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));    }
    private Mono<R> resend(final Mono<R> clientResponse,                           final ServerWebExchange exchange,                           final Duration duration,                           final HttpHeaders httpHeaders,                           final Set<URI> exclude,                           final int retryTimes) {        Mono<R> result = clientResponse;        // 根据指定的重试次数进行重试        for (int i = 0; i < retryTimes; i++) {            result = resend(result, exchange, duration, httpHeaders, exclude);        }        return result;    }
    private Mono<R> resend(final Mono<R> response,                           final ServerWebExchange exchange,                           final Duration duration,                           final HttpHeaders httpHeaders,                           final Set<URI> exclude) {        return response.onErrorResume(th -> {            final String selectorId = exchange.getAttribute(Constants.DIVIDE_SELECTOR_ID);            final String loadBalance = exchange.getAttribute(Constants.LOAD_BALANCE);            //查询可用服务            final List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId)                    .stream().filter(data -> {                        final String trimUri = data.getUrl().trim();                        for (URI needToExclude : exclude) {                            // exclude already called                            if ((needToExclude.getHost() + ":" + needToExclude.getPort()).equals(trimUri)) {                                return false;                            }                        }                        return true;                    }).collect(Collectors.toList());            if (CollectionUtils.isEmpty(upstreamList)) {                // no need to retry anymore                return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));            }            // 请求ip            final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();            // 实现负载均衡            final Upstream upstream = LoadBalancerFactory.selector(upstreamList, loadBalance, ip);            if (Objects.isNull(upstream)) {                // no need to retry anymore                return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));            }            final URI newUri = RequestUrlUtils.buildRequestUri(exchange, upstream.buildDomain());            // 排除已经调用的uri            exclude.add(newUri);             // 进行再次调用            return doRequest(exchange, exchange.getRequest().getMethodValue(), newUri, httpHeaders, exchange.getRequest().getBody())                    .timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))                    .doOnError(e -> LOG.error(e.getMessage(), e));        });    }
    //......}
  • org.apache.shenyu.plugin.httpclient.WebClientPlugin#doRequest()

doRequest()方法中通过webClient发起真正的请求调用。


@Override    protected Mono<ClientResponse> doRequest(final ServerWebExchange exchange, final String httpMethod, final URI uri,                                             final HttpHeaders httpHeaders, final Flux<DataBuffer> body) {        return webClient.method(HttpMethod.valueOf(httpMethod)).uri(uri) //请求uri                .headers(headers -> headers.addAll(httpHeaders)) // 请求header                .body(BodyInserters.fromDataBuffers(body))                .exchange() // 发起请求                .doOnSuccess(res -> {                    if (res.statusCode().is2xxSuccessful()) { // 成功                        exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());                    } else { // 失败                        exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());                    }                    exchange.getResponse().setStatusCode(res.statusCode());                    exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);                });    }

2.5 处理响应结果#

  • org.apache.shenyu.plugin.response.ResponsePlugin#execute()

响应结果由ResponsePlugin插件处理。

    @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        // 根据rpc类型处理结果        return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);    }

处理类型由MessageWriter决定,类继承关系如下:

  • MessageWriter:接口,定义消息处理方法;
  • NettyClientMessageWriter:处理Netty调用结果;
  • RPCMessageWriter:处理RPC调用结果;
  • WebClientMessageWriter:处理WebClient调用结果;

默认是通过WebCient发起http请求。

  • org.apache.shenyu.plugin.response.strategy.WebClientMessageWriter#writeWith()

writeWith()方法中处理响应结果。


    @Override    public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        return chain.execute(exchange).then(Mono.defer(() -> {            // 获取响应            ServerHttpResponse response = exchange.getResponse();            ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);            if (Objects.isNull(clientResponse)) {                Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);                return WebFluxResultUtils.result(exchange, error);            }            //获取cookies和headers            response.getCookies().putAll(clientResponse.cookies());            response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());            // image, pdf or stream does not do format processing.            // 处理特殊响应类型            if (clientResponse.headers().contentType().isPresent()) {                final String media = clientResponse.headers().contentType().get().toString().toLowerCase();                if (media.matches(COMMON_BIN_MEDIA_TYPE_REGEX)) {                    return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()))                            .doOnCancel(() -> clean(exchange));                }            }            // 处理一般响应类型            clientResponse = ResponseUtils.buildClientResponse(response, clientResponse.body(BodyExtractors.toDataBuffers()));            return clientResponse.bodyToMono(byte[].class)                    .flatMap(originData -> WebFluxResultUtils.result(exchange, originData))                    .doOnCancel(() -> clean(exchange));        }));    }

分析至此,关于Divide插件的源码分析就完成了,分析流程图如下:

3. 小结#

本文源码分析从http服务注册开始,到divide插件的服务调用。divide插件主要用来处理http请求。有些源码没有进入深入分析,比如负载均衡的实现,服务探活,将在后续继续分析。