Skip to main content

One post tagged with "plugin"

View All Tags

扩展插件加载逻辑

· One min read
hql0312 Coder

本文基于shenyu-2.6.1版本进行源码分析.

正文

Shenyu 提供了一种机制来定制自己的插件或是修改已有的插件,在其内部通过extPlugin的配置实现,其需要满足以下两点:

  1. 实现接口 ShenyuPlugin 或是 PluginDataHandler
  2. 将实现的包打包后,放置于shenyu.extPlugin.path对应的路径下

入口#

真正实现该逻辑的类是ShenyuLoaderService,接下来看下该类是如何处理

    public ShenyuLoaderService(final ShenyuWebHandler webHandler, final CommonPluginDataSubscriber subscriber, final ShenyuConfig shenyuConfig) {        // 插件信息的信息订阅        this.subscriber = subscriber;        // Shenyu封装的WebHandler,包含了所有的插件逻辑        this.webHandler = webHandler;        // 配置信息        this.shenyuConfig = shenyuConfig;        // 扩展插件的配置信息,如路径,是否启用、开启多少线程来处理、检查加载的频率等信息        ExtPlugin config = shenyuConfig.getExtPlugin();        // 如果启用的,则创建定时任务来检查并加载        if (config.getEnabled()) {            // 创建一个指定线程名称的定时任务            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(config.getThreads(), ShenyuThreadFactory.create("plugin-ext-loader", true));            // 创建固定频率执行的任务,默认在30s,每300s,执行一次            executor.scheduleAtFixedRate(() -> loadExtOrUploadPlugins(null), config.getScheduleDelay(), config.getScheduleTime(), TimeUnit.SECONDS);        }    }    

该类有以下几个属性:

webHandler: 该类是shenyu 处理请求的入口,引用了所有的插件数据,在扩展插件加载后,需要进行更新

subscriber: 该类是插件的订阅的入口,引用了所有插件的订阅处理类,在扩展配置加载后,也需要进行同步更新

executor: 在ShenyuLoaderService内部会创建一个定时任务,来定时扫描加载指定路径下的jar包,便于加载扩展的插件,实现动态发现 默认会在启动30秒后,每300秒扫描一次

同时这里可以通过 shenyu.extPlugin.enabled配置来决定是否要开启扩展插件功能的启用

以上的配置可以在配置文件中进行调整:

shenyu:  extPlugin:    path:   # 扩展插件的存储目录    enabled: true # 是否启用扩展功能    threads: 1 # 扫描加载的线程数    scheduleTime: 300 # 任务执行的频率    scheduleDelay: 30 # 任务启动后多久开始执行

接下来看下加载的逻辑:

   public void loadExtOrUploadPlugins(final PluginData uploadedJarResource) {        try {            List<ShenyuLoaderResult> plugins = new ArrayList<>();            // 获取ShenyuPluginClassloader的持有对象            ShenyuPluginClassloaderHolder singleton = ShenyuPluginClassloaderHolder.getSingleton();            if (Objects.isNull(uploadedJarResource)) {                // 参数为空,则从扩展的目录,加载所有的jar包                // PluginJar:包含ShenyuPlugin接口、PluginDataHandler接口的数据                List<PluginJarParser.PluginJar> uploadPluginJars = ShenyuExtPathPluginJarLoader.loadExtendPlugins(shenyuConfig.getExtPlugin().getPath());                // 遍历所有的待加载插件                for (PluginJarParser.PluginJar extPath : uploadPluginJars) {                    LOG.info("shenyu extPlugin find new {} to load", extPath.getAbsolutePath());                    // 使用扩展插件的加载器来加载指定的插件,便于后续的加载和卸载                    ShenyuPluginClassLoader extPathClassLoader = singleton.createPluginClassLoader(extPath);                    // 使用ShenyuPluginClassLoader 进行加载                    // 主要逻辑是:判断是否实现ShenyuPlugin接口、PluginDataHandler接口 或是否标识 @Component\@Service等注解,如果有,则注册为SpringBean                    // 构造 ShenyuLoaderResult对象                    plugins.addAll(extPathClassLoader.loadUploadedJarPlugins());                }            } else {                // 加载指定jar,逻辑同加载全部                PluginJarParser.PluginJar pluginJar = PluginJarParser.parseJar(Base64.getDecoder().decode(uploadedJarResource.getPluginJar()));                LOG.info("shenyu upload plugin jar find new {} to load", pluginJar.getJarKey());                ShenyuPluginClassLoader uploadPluginClassLoader = singleton.createPluginClassLoader(pluginJar);                plugins.addAll(uploadPluginClassLoader.loadUploadedJarPlugins());            }            // 将扩展的插件,加入到ShenyuWebHandler的插件列表,后续的请求则会经过加入的插件内容            loaderPlugins(plugins);        } catch (Exception e) {            LOG.error("shenyu plugins load has error ", e);        }    }

该方法处理的逻辑:

  1. 判断参数uploadedJarResource是否有值,如果没有,则会加载全部,否则加载指定资源jar包进行处理
  2. shenyu.extPlugin.path 中获取到指定jar包,并封装成 PluginJar对象,该对象包含了jar包以下信息
    • version: 版本信息
    • groupId:包的groupId
    • artifactId: 包的 artifactId
    • absolutePath: 绝对路径
    • clazzMap:class对应的字节码
    • resourceMap:jar包的字节码
  3. 通过ShenyuPluginClassloaderHolder创建对应的ClassLoader,对应的类是ShenyuPluginClassLoader, 并进行加载对应的类
    • 调用ShenyuPluginClassLoader.loadUploadedJarPlugins 加载对应的类并注册成Spring Bean,这样可以使用Spring容器来管理
  4. 调用loaderPlugins方法,将扩展的插件更新到 webHandler 以及 subscriber

插件注册#

对于提供的jar包里的内容,加载器只会处理指定接口类型的类,实现逻辑在 ShenyuPluginClassLoader.loadUploadedJarPlugins() 方法

public List<ShenyuLoaderResult> loadUploadedJarPlugins() {        List<ShenyuLoaderResult> results = new ArrayList<>();        // 所有的类映射关系        Set<String> names = pluginJar.getClazzMap().keySet();        // 遍历所有的类        names.forEach(className -> {            Object instance;            try {                // 尝试创建对象,如果可以,则加入到Spring容器中                instance = getOrCreateSpringBean(className);                if (Objects.nonNull(instance)) {                    // 构建ShenyuLoaderResult对象                    results.add(buildResult(instance));                    LOG.info("The class successfully loaded into a upload-Jar-plugin {} is registered as a spring bean", className);                }            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {                LOG.warn("Registering upload-Jar-plugins succeeds spring bean fails:{}", className, e);            }        });        return results;    }

该方法就是负责构建所有符合条件的对象,并封装成 ShenyuLoaderResult对象,该对象对于创建后对象,进行了封装,会在方法 buildResult()中进行处理

    private ShenyuLoaderResult buildResult(final Object instance) {        ShenyuLoaderResult result = new ShenyuLoaderResult();        // 创建的对象是否实现了ShenyuPlugin        if (instance instanceof ShenyuPlugin) {            result.setShenyuPlugin((ShenyuPlugin) instance);            // 创建的对象是否实现了PluginDataHandler        } else if (instance instanceof PluginDataHandler) {            result.setPluginDataHandler((PluginDataHandler) instance);        }        return result;    }

同时进入方法 getOrCreateSpringBean() 进一步分析

    private <T> T getOrCreateSpringBean(final String className) throws ClassNotFoundException, IllegalAccessException, InstantiationException {        // 确认是否已经注册过了,如果有则不处理,直接返回        if (SpringBeanUtils.getInstance().existBean(className)) {            return SpringBeanUtils.getInstance().getBeanByClassName(className);        }        lock.lock();        try {            // Double check,            T inst = SpringBeanUtils.getInstance().getBeanByClassName(className);            if (Objects.isNull(inst)) {                // 使用 ShenyuPluginClassLoader 进行加载类                Class<?> clazz = Class.forName(className, false, this);                //Exclude ShenyuPlugin subclass and PluginDataHandler subclass                // without adding @Component @Service annotation                // 确认是否是 ShenyuPlugin 或是 PluginDataHandler的子类                boolean next = ShenyuPlugin.class.isAssignableFrom(clazz)                        || PluginDataHandler.class.isAssignableFrom(clazz);                if (!next) {                    // 如果不是,确认是否标识了 @Component 与 @Service 注解                    Annotation[] annotations = clazz.getAnnotations();                    next = Arrays.stream(annotations).anyMatch(e -> e.annotationType().equals(Component.class)                            || e.annotationType().equals(Service.class));                }                if (next) {                    // 如果符合以上内容,则注册Bean                    GenericBeanDefinition beanDefinition = new GenericBeanDefinition();                    beanDefinition.setBeanClassName(className);                    beanDefinition.setAutowireCandidate(true);                    beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);                    // 注册bean                    String beanName = SpringBeanUtils.getInstance().registerBean(beanDefinition, this);                    // 创建对象                    inst = SpringBeanUtils.getInstance().getBeanByClassName(beanName);                }            }            return inst;        } finally {            lock.unlock();        }    }

逻辑大致如下:

  1. 判断是否实现了接口 ShenyuPluginPluginDataHandler, 如果没有,则是否标识了 @Component 或是 @Service
  2. 如果符合1的条件,则将该对象注册到Spring 容器,并返回创建的对象

同步#

在插件注册成功后,这时只是实例化了插件,但它还不会生效,因为它还未添加到 Shenyu的插件链中,同步逻辑由 loaderPlugins()方法实现

    private void loaderPlugins(final List<ShenyuLoaderResult> results) {        if (CollectionUtils.isEmpty(results)) {            return;        }        // 获取所有实现了接口ShenyuPlugin的对象        List<ShenyuPlugin> shenyuExtendPlugins = results.stream().map(ShenyuLoaderResult::getShenyuPlugin).filter(Objects::nonNull).collect(Collectors.toList());        // 同步更新webHandler中plugins        webHandler.putExtPlugins(shenyuExtendPlugins);        // 获取所有实现了接口PluginDataHandler的对象        List<PluginDataHandler> handlers = results.stream().map(ShenyuLoaderResult::getPluginDataHandler).filter(Objects::nonNull).collect(Collectors.toList());        // 同步扩展的PluginDataHandler        subscriber.putExtendPluginDataHandler(handlers);
    }

该方法的逻辑处理了两个数据:

  1. 将实现了 ShenyuPlugin 接口的数据,同步至 webHandler的plugins 列表
    public void putExtPlugins(final List<ShenyuPlugin> extPlugins) {        if (CollectionUtils.isEmpty(extPlugins)) {            return;        }        // 过滤出新增的插件        final List<ShenyuPlugin> shenyuAddPlugins = extPlugins.stream()                .filter(e -> plugins.stream().noneMatch(plugin -> plugin.named().equals(e.named())))                .collect(Collectors.toList());        // 过滤出更新的插件,以名称和旧的相同来判断,则为更新        final List<ShenyuPlugin> shenyuUpdatePlugins = extPlugins.stream()                .filter(e -> plugins.stream().anyMatch(plugin -> plugin.named().equals(e.named())))                .collect(Collectors.toList());        // 如果没有数据,则跳过        if (CollectionUtils.isEmpty(shenyuAddPlugins) && CollectionUtils.isEmpty(shenyuUpdatePlugins)) {            return;        }        // 复制旧的数据        // copy new list        List<ShenyuPlugin> newPluginList = new ArrayList<>(plugins);        // 添加新的插件数据        // Add extend plugin from pluginData or shenyu ext-lib        this.sourcePlugins.addAll(shenyuAddPlugins);        // 添加新数据        if (CollectionUtils.isNotEmpty(shenyuAddPlugins)) {            shenyuAddPlugins.forEach(plugin -> LOG.info("shenyu auto add extends plugins:{}", plugin.named()));            newPluginList.addAll(shenyuAddPlugins);        }        // 修改更新的数据        if (CollectionUtils.isNotEmpty(shenyuUpdatePlugins)) {            shenyuUpdatePlugins.forEach(plugin -> LOG.info("shenyu auto update extends plugins:{}", plugin.named()));            for (ShenyuPlugin updatePlugin : shenyuUpdatePlugins) {                for (int i = 0; i < newPluginList.size(); i++) {                    if (newPluginList.get(i).named().equals(updatePlugin.named())) {                        newPluginList.set(i, updatePlugin);                    }                }                for (int i = 0; i < this.sourcePlugins.size(); i++) {                    if (this.sourcePlugins.get(i).named().equals(updatePlugin.named())) {                        this.sourcePlugins.set(i, updatePlugin);                    }                }            }        }        // 重新排序        plugins = sortPlugins(newPluginList);    }
  1. 将实现了 PluginDataHandler 接口的数据,同步至 subscriber 的handlers 列表
    public void putExtendPluginDataHandler(final List<PluginDataHandler> handlers) {        if (CollectionUtils.isEmpty(handlers)) {            return;        }        // 遍历所有数据        for (PluginDataHandler handler : handlers) {            String pluginNamed = handler.pluginNamed();            // 更新现有的PluginDataHandler列表            MapUtils.computeIfAbsent(handlerMap, pluginNamed, name -> {                LOG.info("shenyu auto add extends plugin data handler name is :{}", pluginNamed);                return handler;            });        }    }

至此,扩展插件的加载过程分析结束。

Dubbo插件源码分析

· One min read
Apache ShenYu Committer

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

Apache ShenYu 网关使用 dubbo 插件完成对 dubbo服务的调用。你可以查看官方文档 Dubbo快速开始 了解如何使用该插件。

本文基于shenyu-2.4.3版本进行源码分析,官网的介绍请参考 Dubbo服务接入

1. 服务注册#

以官网提供的例子为例 shenyu-examples-dubbo 。 假如你的dubbo服务定义如下(spring-dubbo.xml):

<beans xmlns="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"       xsi:schemaLocation="http://www.springframework.org/schema/beans       http://www.springframework.org/schema/beans/spring-beans.xsd       http://code.alibabatech.com/schema/dubbo       https://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    <dubbo:application name="test-dubbo-service"/>    <dubbo:registry address="${dubbo.registry.address}"/>    <dubbo:protocol name="dubbo" port="20888"/>
    <dubbo:service timeout="10000" interface="org.apache.shenyu.examples.dubbo.api.service.DubboTestService" ref="dubboTestService"/>
</beans>

声明应用服务名称,注册中心地址,使用dubbo协议,声明服务接口,对应接口实现类:

/** * DubboTestServiceImpl. */@Service("dubboTestService")public class DubboTestServiceImpl implements DubboTestService {        @Override    @ShenyuDubboClient(path = "/findById", desc = "Query by Id")    public DubboTest findById(final String id) {        return new DubboTest(id, "hello world shenyu Apache, findById");    }
    //......}

在接口实现类中,使用注解@ShenyuDubboClientshenyu-admin注册服务。该注解的作用及原理,稍后再进行分析。

在配置文件application.yml中的配置信息:

server:  port: 8011  address: 0.0.0.0  servlet:    context-path: /spring:  main:    allow-bean-definition-overriding: truedubbo:  registry:    address: zookeeper://localhost:2181  # dubbo使用的注册中心    shenyu:  register:    registerType: http #注册方式    serverLists: http://localhost:9095 #注册地址    props:      username: admin       password: 123456  client:    dubbo:      props:        contextPath: /dubbo          appName: dubbo

在配置文件中,声明dubbo使用的注册中心地址,dubbo服务向shenyu-admin注册,使用的方式是http,注册地址是http://localhost:9095

关于注册方式的使用,请参考 应用客户端接入

1.1 声明注册接口#

使用注解@ShenyuDubboClient将服务注册到网关。简单demo如下:

// dubbo服务@Service("dubboTestService")public class DubboTestServiceImpl implements DubboTestService {        @Override    @ShenyuDubboClient(path = "/findById", desc = "Query by Id") // 需要注册的方法    public DubboTest findById(final String id) {        return new DubboTest(id, "hello world shenyu Apache, findById");    }
    //......}

注解定义:

/** * 作用于类和方法上 */@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE, ElementType.METHOD})@Inheritedpublic @interface ShenyuDubboClient {        //注册路径    String path();        //规则名称    String ruleName() default "";       //描述信息    String desc() default "";
    //是否启用    boolean enabled() default true;}

1.2 扫描注解信息#

注解扫描通过ApacheDubboServiceBeanListener完成,它实现了ApplicationListener<ContextRefreshedEvent>接口,在Spring容器启动过程中,发生上下文刷新事件时,开始执行事件处理方法onApplicationEvent()

在构造器实例化的过程中:

  • 读取属性配置
  • 开启线程池
  • 启动注册中心,用于向shenyu-admin注册
public class ApacheDubboServiceBeanListener implements ApplicationListener<ContextRefreshedEvent> {
    // ......
    //构造器    public ApacheDubboServiceBeanListener(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {        //1.读取属性配置        Properties props = clientConfig.getProps();        String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);        String appName = props.getProperty(ShenyuClientConstants.APP_NAME);        if (StringUtils.isBlank(contextPath)) {            throw new ShenyuClientIllegalArgumentException("apache dubbo client must config the contextPath or appName");        }        this.contextPath = contextPath;        this.appName = appName;        this.host = props.getProperty(ShenyuClientConstants.HOST);        this.port = props.getProperty(ShenyuClientConstants.PORT);        //2.开启线程池        executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("shenyu-apache-dubbo-client-thread-pool-%d").build());        //3.启动注册中心        publisher.start(shenyuClientRegisterRepository);    }
    /**     * 上下文刷新事件,执行方法逻辑     */    @Override    public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {        //......    }
  • ApacheDubboServiceBeanListener#onApplicationEvent()

重写的方法逻辑:读取Dubbo服务ServiceBean,构建元数据对象和URI对象,并向shenyu-admin注册。

    @Override    public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {        //读取ServiceBean        Map<String, ServiceBean> serviceBean = contextRefreshedEvent.getApplicationContext().getBeansOfType(ServiceBean.class);        if (serviceBean.isEmpty()) {            return;        }        //保证该方法只执行一次        if (!registered.compareAndSet(false, true)) {            return;        }        //处理元数据对象        for (Map.Entry<String, ServiceBean> entry : serviceBean.entrySet()) {            handler(entry.getValue());        }        //处理URI对象        serviceBean.values().stream().findFirst().ifPresent(bean -> {            publisher.publishEvent(buildURIRegisterDTO(bean));        });    }
  • handler()

    handler()方法中,从serviceBean中读取所有方法,判断方法上是否有ShenyuDubboClient注解,如果存在就构建元数据对象,并通过注册中心,向shenyu-admin注册该方法。

    private void handler(final ServiceBean<?> serviceBean) {        //获取代理对象        Object refProxy = serviceBean.getRef();        //获取class信息        Class<?> clazz = refProxy.getClass();        if (AopUtils.isAopProxy(refProxy)) {            clazz = AopUtils.getTargetClass(refProxy);        }        //获取所有方法        Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);        for (Method method : methods) {            //读取ShenyuDubboClient注解信息            ShenyuDubboClient shenyuDubboClient = method.getAnnotation(ShenyuDubboClient.class);            if (Objects.nonNull(shenyuDubboClient)) {                //构建元数据对象,并注册                publisher.publishEvent(buildMetaDataDTO(serviceBean, shenyuDubboClient, method));            }        }    }
  • buildMetaDataDTO()

    构建元数据对象,在这里构建方法注册的必要信息,后续用于选择器或规则匹配。

    private MetaDataRegisterDTO buildMetaDataDTO(final ServiceBean<?> serviceBean, final ShenyuDubboClient shenyuDubboClient, final Method method) {        //应用名称        String appName = buildAppName(serviceBean);        //方法路径        String path = contextPath + shenyuDubboClient.path();        //描述信息        String desc = shenyuDubboClient.desc();        //服务名称        String serviceName = serviceBean.getInterface();        //规则名称        String configRuleName = shenyuDubboClient.ruleName();        String ruleName = ("".equals(configRuleName)) ? path : configRuleName;        //方法名称        String methodName = method.getName();        //参数类型        Class<?>[] parameterTypesClazz = method.getParameterTypes();        String parameterTypes = Arrays.stream(parameterTypesClazz).map(Class::getName).collect(Collectors.joining(","));        return MetaDataRegisterDTO.builder()                .appName(appName)                .serviceName(serviceName)                .methodName(methodName)                .contextPath(contextPath)                .host(buildHost())                .port(buildPort(serviceBean))                .path(path)                .ruleName(ruleName)                .pathDesc(desc)                .parameterTypes(parameterTypes)                .rpcExt(buildRpcExt(serviceBean)) //dubbo服务的扩展信息                .rpcType(RpcTypeEnum.DUBBO.getName())                .enabled(shenyuDubboClient.enabled())                .build();    }
  • buildRpcExt()

    dubbo服务的扩展信息

       private String buildRpcExt(final ServiceBean serviceBean) {       DubboRpcExt build = DubboRpcExt.builder()               .group(StringUtils.isNotEmpty(serviceBean.getGroup()) ? serviceBean.getGroup() : "")//分组               .version(StringUtils.isNotEmpty(serviceBean.getVersion()) ? serviceBean.getVersion() : "")//版本               .loadbalance(StringUtils.isNotEmpty(serviceBean.getLoadbalance()) ? serviceBean.getLoadbalance() : Constants.DEFAULT_LOADBALANCE)//负载均衡策略,默认随机               .retries(Objects.isNull(serviceBean.getRetries()) ? Constants.DEFAULT_RETRIES : serviceBean.getRetries())//重试次数,默认2               .timeout(Objects.isNull(serviceBean.getTimeout()) ? Constants.DEFAULT_CONNECT_TIMEOUT : serviceBean.getTimeout())//超时,默认3000               .sent(Objects.isNull(serviceBean.getSent()) ? Constants.DEFAULT_SENT : serviceBean.getSent())//sent,默认false               .cluster(StringUtils.isNotEmpty(serviceBean.getCluster()) ? serviceBean.getCluster() : Constants.DEFAULT_CLUSTER)//集群策略,默认failover               .url("")               .build();       return GsonUtils.getInstance().toJson(build);   }
  • buildURIRegisterDTO()

    构建URI对象,注册服务本身的信息,后续可用于服务探活。

private URIRegisterDTO buildURIRegisterDTO(final ServiceBean serviceBean) {        return URIRegisterDTO.builder()                .contextPath(this.contextPath) //上下文路径                .appName(buildAppName(serviceBean))//应用名称                .rpcType(RpcTypeEnum.DUBBO.getName())//rpc类型:dubbo                .host(buildHost()) //host                .port(buildPort(serviceBean))//port                .build(); }

具体的注册逻辑由注册中心实现,请参考 客户端接入原理

//向注册中心,发布注册事件   publisher.publishEvent();

1.3 处理注册信息#

客户端通过注册中心注册的元数据和URI数据,在shenyu-admin端进行处理,负责存储到数据库和同步给shenyu网关。Dubbo插件的客户端注册处理逻辑在ShenyuClientRegisterDubboServiceImpl中。继承关系如下:

  • ShenyuClientRegisterService:客户端注册服务,顶层接口;
  • FallbackShenyuClientRegisterService:注册失败,提供重试操作;
  • AbstractShenyuClientRegisterServiceImpl:抽象类,实现部分公共注册逻辑;
  • ShenyuClientRegisterDubboServiceImpl:实现Dubbo插件的注册;
1.3.1 注册服务#
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()

    客户端通过注册中心注册的元数据MetaDataRegisterDTO对象在shenyu-adminregister()方法被接送到。

   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. 注册选择器        String selectorHandler = selectorHandler(dto);        String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);        //2. 注册规则        String ruleHandler = ruleHandler();        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        ruleService.registerDefault(ruleDTO);        //3. 注册元数据        registerMetadata(dto);        //4. 注册ContextPath        String contextPath = dto.getContextPath();        if (StringUtils.isNotEmpty(contextPath)) {            registerContextPath(dto);        }        return ShenyuResultMessage.SUCCESS;    }
1.3.1.1 注册选择器#
  • org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()

构建contextPath,查找选择器信息是否存在,如果存在就返回id;不存在就创建默认的选择器信息。

    @Override    public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {        // 构建contextPath        String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());        // 通过名称查找选择器信息是否存在        SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);        if (Objects.isNull(selectorDO)) {            // 不存在就创建默认的选择器信息            return registerSelector(contextPath, pluginName, selectorHandler);        }        return selectorDO.getId();    }
  • 默认选择器信息

    在这里构建默认选择器信息及其条件属性。

   //注册选择器   private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {        //构建选择器        SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());        selectorDTO.setHandle(selectorHandler);        //注册默认选择器        return registerDefault(selectorDTO);    }     //构建选择器    private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {        //构建默认选择器        SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);        selectorDTO.setPluginId(pluginId);         //构建默认选择器的条件属性        selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));        return selectorDTO;    }
  • 构建默认选择器
private SelectorDTO buildDefaultSelectorDTO(final String name) {    return SelectorDTO.builder()            .name(name) // 名称            .type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // 默认类型自定义            .matchMode(MatchModeEnum.AND.getCode()) //默认匹配方式 and            .enabled(Boolean.TRUE)  //默认启开启            .loged(Boolean.TRUE)  //默认记录日志            .continued(Boolean.TRUE) //默认继续后续选择器            .sort(1) //默认顺序1            .build();}
  • 构建默认选择器条件属性
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {    SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();    selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // 默认参数类型URI    selectorConditionDTO.setParamName("/");    selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // 默认匹配策略 match    selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // 默认值 /contextPath/**    return Collections.singletonList(selectorConditionDTO);}
  • 注册默认选择器
@Overridepublic String registerDefault(final SelectorDTO selectorDTO) {    //选择器信息    SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);    //选择器条件属性    List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();    if (StringUtils.isEmpty(selectorDTO.getId())) {        // 向数据库插入选择器信息        selectorMapper.insertSelective(selectorDO);          // 向数据库插入选择器条件属性        selectorConditionDTOs.forEach(selectorConditionDTO -> {            selectorConditionDTO.setSelectorId(selectorDO.getId());            selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));        });    }    // 发布同步事件,向网关同步选择信息及其条件属性    publishEvent(selectorDO, selectorConditionDTOs);    return selectorDO.getId();}
1.3.1.2 注册规则#

在注册服务的第二步中,开始构建默认规则,然后注册规则。

@Override    public String register(final MetaDataRegisterDTO dto) {        //1. 注册选择器        //......                //2. 注册规则        // 默认规则处理属性        String ruleHandler = ruleHandler();        // 构建默认规则信息        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        // 注册规则        ruleService.registerDefault(ruleDTO);                //3. 注册元数据        //......                //4. 注册ContextPath        //......                return ShenyuResultMessage.SUCCESS;    }
  • 默认规则处理属性
    @Override    protected String ruleHandler() {        // 默认规则处理属性        return new DubboRuleHandle().toJson();    }

Dubbo插件默认规则处理属性

public class DubboRuleHandle implements RuleHandle {
    /**     * dubbo服务版本信息.     */    private String version;
    /**     * 分组.     */    private String group;
    /**     * 重试次数.     */    private Integer retries = 0;
    /**     * 负载均衡策略:默认随机     */    private String loadbalance = LoadBalanceEnum.RANDOM.getName();
    /**     * 超时,默认3000     */    private long timeout = Constants.TIME_OUT;}
  • 构建默认规则信息
  // 构建默认规则信息    private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {        return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());    }   //  构建默认规则信息    private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {        RuleDTO ruleDTO = RuleDTO.builder()                .selectorId(selectorId) //关联的选择器id                .name(ruleName) //规则名称                .matchMode(MatchModeEnum.AND.getCode()) // 默认匹配模式 and                .enabled(Boolean.TRUE) // 默认开启                .loged(Boolean.TRUE) //默认记录日志                .sort(1) //默认顺序 1                .handle(ruleHandler)                .build();        RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()                .paramType(ParamTypeEnum.URI.getName()) // 默认参数类型URI                .paramName("/")                .paramValue(path) //参数值path                .build();        if (path.indexOf("*") > 1) {            ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); //如果path中有*,操作类型则默认为 match        } else {            ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); // 否则,默认操作类型 =         }        ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));        return ruleDTO;    }
  • org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()

注册规则:向数据库插入记录,并向网关发布事件,进行数据同步。


    @Override    public String registerDefault(final RuleDTO ruleDTO) {        RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());        if (Objects.nonNull(exist)) {            return "";        }
        RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);        List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();        if (StringUtils.isEmpty(ruleDTO.getId())) {            // 向数据库插入规则信息            ruleMapper.insertSelective(ruleDO);            //向数据库插入规则体条件属性            ruleConditions.forEach(ruleConditionDTO -> {                ruleConditionDTO.setRuleId(ruleDO.getId());                ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));            });        }        // 向网关发布事件,进行数据同步        publishEvent(ruleDO, ruleConditions);        return ruleDO.getId();    }
1.3.1.3 注册元数据#

元数据主要用于RPC服务的调用。

   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. 注册选择器        //......                //2. 注册规则        //......                //3. 注册元数据        registerMetadata(dto);                //4. 注册ContextPath        //......                return ShenyuResultMessage.SUCCESS;    }
  • org.apache.shenyu.admin.service.register.ShenyuClientRegisterDubboServiceImpl#registerMetadata()

    插入或更新元数据,然后发布同步事件到网关。

    @Override    protected void registerMetadata(final MetaDataRegisterDTO dto) {            // 获取metaDataService            MetaDataService metaDataService = getMetaDataService();            // 元数据是否存在            MetaDataDO exist = metaDataService.findByPath(dto.getPath());            // 插入或更新元数据            metaDataService.saveOrUpdateMetaData(exist, dto);    }
    @Override    public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {        DataEventTypeEnum eventType;        // 数据类型转换 DTO->DO        MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);        // 插入数据        if (Objects.isNull(exist)) {            Timestamp currentTime = new Timestamp(System.currentTimeMillis());            metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());            metaDataDO.setDateCreated(currentTime);            metaDataDO.setDateUpdated(currentTime);            metaDataMapper.insert(metaDataDO);            eventType = DataEventTypeEnum.CREATE;        } else {            // 更新数据            metaDataDO.setId(exist.getId());            metaDataMapper.update(metaDataDO);            eventType = DataEventTypeEnum.UPDATE;        }        // 发布同步事件到网关        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,                Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));    }
1.3.2 注册URI#
  • org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()

服务端收到客户端注册的URI信息后,进行处理。

    @Override    public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {        String result;        String key = key(selectorName);        try {            this.removeFallBack(key);            // 注册URI            result = this.doRegisterURI(selectorName, uriList);            logger.info("Register success: {},{}", selectorName, uriList);        } catch (Exception ex) {            logger.warn("Register exception: cause:{}", ex.getMessage());            result = "";            // 注册失败后,进行重试            this.addFallback(key, new FallbackHolder(selectorName, uriList));        }        return result;    }
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()

从客户端注册的URI中获取有效的URI,更新对应的选择器handle属性,向网关发送选择器更新事件。

@Override    public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {        //参数检查        if (CollectionUtils.isEmpty(uriList)) {            return "";        }        //获取选择器信息        SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));        if (Objects.isNull(selectorDO)) {            throw new ShenyuException("doRegister Failed to execute,wait to retry.");        }        // 获取有效的URI        List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());        // 构建选择器的handle属性        String handler = buildHandle(validUriList, selectorDO);        if (handler != null) {            selectorDO.setHandle(handler);            SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));            selectorData.setHandle(handler);            // 向数据库更新选择器的handle属性            selectorService.updateSelective(selectorDO);            // 向网关发送选择器更新事件            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));        }        return ShenyuResultMessage.SUCCESS;    }

关于服务注册的源码分析就以及完成了,分析流程图如下:

接下来就分析dubbo插件是如何根据这些信息向http服务发起调用。

2. 服务调用#

dubbo插件是ShenYu网关用于将http请求转成 dubbo协议,调用dubbo服务的核心处理插件。

以官网提供的案例 Dubbo快速开始 为例,一个dubbo服务通过注册中心向shenyu-admin注册后,通过ShenYu网关代理,请求如下:

GET http://localhost:9195/dubbo/findById?id=100Accept: application/json

Dubbo插件中,类继承关系如下:

  • ShenyuPlugin:顶层接口,定义接口方法;
  • AbstractShenyuPlugin:抽象类,实现插件共有逻辑;
  • AbstractDubboPlugin:dubbo插件抽象类,实现dubbo共有逻辑;
  • ApacheDubboPlugin:ApacheDubbo插件。

ShenYu网关支持ApacheDubbo和AlibabaDubbo

2.1 接收请求#

通过ShenYu网关代理后,请求入口是ShenyuWebHandler,它实现了org.springframework.web.server.WebHandler接口。

public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {    //......        /**     * 处理web请求     */    @Override    public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {       // 执行默认插件链        Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);        if (scheduled) {            return execute.subscribeOn(scheduler);        }        return execute;    }        private static class DefaultShenyuPluginChain implements ShenyuPluginChain {
        private int index;
        private final List<ShenyuPlugin> plugins;
        /**         * 实例化默认插件链         */        DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {            this.plugins = plugins;        }
        /**         * 执行每个插件.         */        @Override        public Mono<Void> execute(final ServerWebExchange exchange) {            return Mono.defer(() -> {                if (this.index < plugins.size()) {                    // 获取当前执行插件                    ShenyuPlugin plugin = plugins.get(this.index++);                    // 是否跳过当前插件                    boolean skip = plugin.skip(exchange);                    if (skip) {                        // 如果跳过就执行下一个                        return this.execute(exchange);                    }                    // 执行当前插件                    return plugin.execute(exchange, this);                }                return Mono.empty();            });        }    }}

2.2 匹配规则#

  • org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()

execute()方法中执行选择器和规则的匹配逻辑。

  • 匹配选择器;
  • 匹配规则;
  • 执行插件。
@Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        // 插件名称        String pluginName = named();        // 插件信息        PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);        if (pluginData != null && pluginData.getEnabled()) {            // 选择器信息            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);            if (CollectionUtils.isEmpty(selectors)) {                return handleSelectorIfNull(pluginName, exchange, chain);            }            // 匹配选择器            SelectorData selectorData = matchSelector(exchange, selectors);            if (Objects.isNull(selectorData)) {                return handleSelectorIfNull(pluginName, exchange, chain);            }            selectorLog(selectorData, pluginName);            // 规则信息            List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());            if (CollectionUtils.isEmpty(rules)) {                return handleRuleIfNull(pluginName, exchange, chain);            }            // 匹配规则            RuleData rule;            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {                //get last                rule = rules.get(rules.size() - 1);            } else {                rule = matchRule(exchange, rules);            }            if (Objects.isNull(rule)) {                return handleRuleIfNull(pluginName, exchange, chain);            }            ruleLog(rule, pluginName);            // 执行插件            return doExecute(exchange, chain, selectorData, rule);        }        return chain.execute(exchange);    }

2.3 执行GlobalPlugin#

  • org.apache.shenyu.plugin.global.GlobalPlugin#execute()

GlobalPlugin是一个全局插件,在execute()方法中构建上下文信息。

public class GlobalPlugin implements ShenyuPlugin {    // 构建上下文信息    private final ShenyuContextBuilder builder;        //......        @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {       // 构建上下文信息,传入到 exchange 中        ShenyuContext shenyuContext = builder.build(exchange);        exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);        return chain.execute(exchange);    }        //......}
  • org.apache.shenyu.plugin.global.DefaultShenyuContextBuilder#build()

构建默认的上下文信息。

public class DefaultShenyuContextBuilder implements ShenyuContextBuilder {    //......        @Override    public ShenyuContext build(final ServerWebExchange exchange) {        //构建参数        Pair<String, MetaData> buildData = buildData(exchange);        //包装ShenyuContext        return decoratorMap.get(buildData.getLeft()).decorator(buildDefaultContext(exchange.getRequest()), buildData.getRight());    }        private Pair<String, MetaData> buildData(final ServerWebExchange exchange) {        //......        //根据请求的uri获取元数据        MetaData metaData = MetaDataCache.getInstance().obtain(request.getURI().getPath());        if (Objects.nonNull(metaData) && Boolean.TRUE.equals(metaData.getEnabled())) {            exchange.getAttributes().put(Constants.META_DATA, metaData);            return Pair.of(metaData.getRpcType(), metaData);        } else {            return Pair.of(RpcTypeEnum.HTTP.getName(), new MetaData());        }    }    //设置默认的上下文信息    private ShenyuContext buildDefaultContext(final ServerHttpRequest request) {        String appKey = request.getHeaders().getFirst(Constants.APP_KEY);        String sign = request.getHeaders().getFirst(Constants.SIGN);        String timestamp = request.getHeaders().getFirst(Constants.TIMESTAMP);        ShenyuContext shenyuContext = new ShenyuContext();        String path = request.getURI().getPath();        shenyuContext.setPath(path); //请求路径        shenyuContext.setAppKey(appKey);        shenyuContext.setSign(sign);        shenyuContext.setTimestamp(timestamp);        shenyuContext.setStartDateTime(LocalDateTime.now());        Optional.ofNullable(request.getMethod()).ifPresent(httpMethod -> shenyuContext.setHttpMethod(httpMethod.name()));//请求方法        return shenyuContext;    } }
  • org.apache.shenyu.plugin.dubbo.common.context.DubboShenyuContextDecorator#decorator()

包装ShenyuContext

public class DubboShenyuContextDecorator implements ShenyuContextDecorator {        @Override    public ShenyuContext decorator(final ShenyuContext shenyuContext, final MetaData metaData) {        shenyuContext.setModule(metaData.getAppName());//获取AppName        shenyuContext.setMethod(metaData.getServiceName()); //获取ServiceName        shenyuContext.setContextPath(metaData.getContextPath()); //获取contextPath        shenyuContext.setRpcType(RpcTypeEnum.DUBBO.getName()); // dubbo服务        return shenyuContext;    }        @Override    public String rpcType() {        return RpcTypeEnum.DUBBO.getName();    }}

2.4 执行RpcParamTransformPlugin#

RpcParamTransformPlugin负责从http请求中读取参数,保存到exchange中,传递给rpc服务。

  • org.apache.shenyu.plugin.base.RpcParamTransformPlugin#execute()

execute()方法中,执行该插件的核心逻辑:从exchange中获取请求信息,根据请求传入的内容形式处理参数。

public class RpcParamTransformPlugin implements ShenyuPlugin {
    @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        //从exchange中获取请求信息        ServerHttpRequest request = exchange.getRequest();        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        if (Objects.nonNull(shenyuContext)) {           // 如果请求参数格式是APPLICATION_JSON            MediaType mediaType = request.getHeaders().getContentType();            if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {                return body(exchange, request, chain);            }            // 如果请求参数格式是APPLICATION_FORM_URLENCODED            if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {                return formData(exchange, request, chain);            }            //一般查询请求            return query(exchange, request, chain);        }        return chain.execute(exchange);    }        // 如果请求参数格式是APPLICATION_JSON    private Mono<Void> body(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {        return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())                .flatMap(body -> {                    exchange.getAttributes().put(Constants.PARAM_TRANSFORM, resolveBodyFromRequest(body));//解析body,保存到exchange中                    return chain.execute(exchange);                }));    }   // 如果请求参数格式是APPLICATION_FORM_URLENCODED    private Mono<Void> formData(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {        return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())                .flatMap(map -> {                    String param = resolveBodyFromRequest(map);                    LinkedMultiValueMap<String, String> linkedMultiValueMap;                    try {                        linkedMultiValueMap = BodyParamUtils.buildBodyParams(URLDecoder.decode(param, StandardCharsets.UTF_8.name())); //格式化数据                    } catch (UnsupportedEncodingException e) {                        return Mono.error(e);                    }                    exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.toMap(() -> linkedMultiValueMap));// 保存到exchange中                    return chain.execute(exchange);                }));    }    //一般查询请求    private Mono<Void> query(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {        exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.ofString(() -> serverHttpRequest.getURI().getQuery()));//保存到exchange中        return chain.execute(exchange);    }    //...... }

2.5 执行DubboPlugin#

  • org.apache.shenyu.plugin.dubbo.common.AbstractDubboPlugin#doExecute()

doExecute()方法中,主要是检查元数据和参数。

public abstract class AbstractDubboPlugin extends AbstractShenyuPlugin {        @Override    public Mono<Void> doExecute(final ServerWebExchange exchange,                                   final ShenyuPluginChain chain,                                   final SelectorData selector,                                   final RuleData rule) {        //获取参数        String param = exchange.getAttribute(Constants.PARAM_TRANSFORM);        //获取上下文信息        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        //获取元数据        MetaData metaData = exchange.getAttribute(Constants.META_DATA);        //检查元数据        if (!checkMetaData(metaData)) {            LOG.error(" path is : {}, meta data have error : {}", shenyuContext.getPath(), metaData);            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.META_DATA_ERROR, null);            return WebFluxResultUtils.result(exchange, error);        }        //检查元数据和参数        if (Objects.nonNull(metaData) && StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(param)) {            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM, null);            return WebFluxResultUtils.result(exchange, error);        }        //设置rpcContext        this.rpcContext(exchange);        //进行dubbo服务调用        return this.doDubboInvoker(exchange, chain, selector, rule, metaData, param);    }}
  • org.apache.shenyu.plugin.apache.dubbo.ApacheDubboPlugin#doDubboInvoker()

doDubboInvoker()方法中设置特殊的上下文信息,然后开始dubbo的泛化调用。

public class ApacheDubboPlugin extends AbstractDubboPlugin {        @Override    protected Mono<Void> doDubboInvoker(final ServerWebExchange exchange,                                        final ShenyuPluginChain chain,                                        final SelectorData selector,                                        final RuleData rule,                                        final MetaData metaData,                                        final String param) {        //设置当前的选择器和规则信息,以及请求地址,用于支持dubbo的灰度        RpcContext.getContext().setAttachment(Constants.DUBBO_SELECTOR_ID, selector.getId());        RpcContext.getContext().setAttachment(Constants.DUBBO_RULE_ID, rule.getId());        RpcContext.getContext().setAttachment(Constants.DUBBO_REMOTE_ADDRESS, Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress());        //dubbo的泛化调用        final Mono<Object> result = dubboProxyService.genericInvoker(param, metaData, exchange);        //执行下一个插件        return result.then(chain.execute(exchange));    }}
  • org.apache.shenyu.plugin.apache.dubbo.proxy.ApacheDubboProxyService#genericInvoker()

genericInvoker()方法:

  • 获取ReferenceConfig对象;
  • 获取泛化服务GenericService对象;
  • 构造请求参数pair对象;
  • 发起异步的泛化调用。
public class ApacheDubboProxyService {    //...... 
    /**     * Generic invoker object.     */    public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws ShenyuException {        //1.获取ReferenceConfig对象        ReferenceConfig<GenericService> reference = ApacheDubboConfigCache.getInstance().get(metaData.getPath());        //如果没有获取到        if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {            //失效当前缓存的信息            ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());            //使用元数据进行再次初始化            reference = ApacheDubboConfigCache.getInstance().initRef(metaData);        }        //2.获取泛化服务GenericService对象        GenericService genericService = reference.get();        //3.构造请求参数pair对象        Pair<String[], Object[]> pair;        if (StringUtils.isBlank(metaData.getParameterTypes()) || ParamCheckUtils.dubboBodyIsEmpty(body)) {            pair = new ImmutablePair<>(new String[]{}, new Object[]{});        } else {            pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());        }        //4.发起异步的泛化调用        return Mono.fromFuture(invokeAsync(genericService, metaData.getMethodName(), pair.getLeft(), pair.getRight()).thenApply(ret -> {            //处理结果            if (Objects.isNull(ret)) {                ret = Constants.DUBBO_RPC_RESULT_EMPTY;            }            exchange.getAttributes().put(Constants.RPC_RESULT, ret);            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());            return ret;        })).onErrorMap(exception -> exception instanceof GenericException ? new ShenyuException(((GenericException) exception).getExceptionMessage()) : new ShenyuException(exception));//处理异常    }        //泛化调用,异步操作    private CompletableFuture<Object> invokeAsync(final GenericService genericService, final String method, final String[] parameterTypes, final Object[] args) throws GenericException {        genericService.$invoke(method, parameterTypes, args);        Object resultFromFuture = RpcContext.getContext().getFuture();        return resultFromFuture instanceof CompletableFuture ? (CompletableFuture<Object>) resultFromFuture : CompletableFuture.completedFuture(resultFromFuture);    }}

通过泛化调用就可以实现在网关调用dubbo服务了。

ReferenceConfig对象是支持泛化调用的关键对象 ,它的初始化操作是在数据同步的时候完成的。这里涉及两部分数据,一是同步的插件handler信息,二是同步的插件元数据信息。

  • org.apache.shenyu.plugin.dubbo.common.handler.AbstractDubboPluginDataHandler#handlerPlugin()

当插件数据更新时,数据同步模块会将数据从shenyu-admin同步到网关。在handlerPlugin()中执行初始化操作。

public abstract class AbstractDubboPluginDataHandler implements PluginDataHandler {    //......        //初始化配置缓存   protected abstract void initConfigCache(DubboRegisterConfig dubboRegisterConfig);
    @Override    public void handlerPlugin(final PluginData pluginData) {        if (Objects.nonNull(pluginData) && Boolean.TRUE.equals(pluginData.getEnabled())) {            //数据反序列化            DubboRegisterConfig dubboRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class);            DubboRegisterConfig exist = Singleton.INST.get(DubboRegisterConfig.class);            if (Objects.isNull(dubboRegisterConfig)) {                return;            }            if (Objects.isNull(exist) || !dubboRegisterConfig.equals(exist)) {                // 执行初始化操作                this.initConfigCache(dubboRegisterConfig);            }            Singleton.INST.single(DubboRegisterConfig.class, dubboRegisterConfig);        }    }    //......}
  • org.apache.shenyu.plugin.apache.dubbo.handler.ApacheDubboPluginDataHandler#initConfigCache()

执行初始化操作。

public class ApacheDubboPluginDataHandler extends AbstractDubboPluginDataHandler {
    @Override    protected void initConfigCache(final DubboRegisterConfig dubboRegisterConfig) {        //执行初始化操作        ApacheDubboConfigCache.getInstance().init(dubboRegisterConfig);        //失效之前缓存的结果        ApacheDubboConfigCache.getInstance().invalidateAll();    }}
  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#init()

在初始化中,设置registryConfigconsumerConfig

public final class ApacheDubboConfigCache extends DubboConfigCache {    //......     /**     * 初始化     */    public void init(final DubboRegisterConfig dubboRegisterConfig) {        //创建ApplicationConfig        if (Objects.isNull(applicationConfig)) {            applicationConfig = new ApplicationConfig("shenyu_proxy");        }        //协议或者地址发生改变时,需要更新registryConfig        if (needUpdateRegistryConfig(dubboRegisterConfig)) {            RegistryConfig registryConfigTemp = new RegistryConfig();            registryConfigTemp.setProtocol(dubboRegisterConfig.getProtocol());            registryConfigTemp.setId("shenyu_proxy");            registryConfigTemp.setRegister(false);            registryConfigTemp.setAddress(dubboRegisterConfig.getRegister());            Optional.ofNullable(dubboRegisterConfig.getGroup()).ifPresent(registryConfigTemp::setGroup);            registryConfig = registryConfigTemp;        }        //创建ConsumerConfig        if (Objects.isNull(consumerConfig)) {            consumerConfig = ApplicationModel.getConfigManager().getDefaultConsumer().orElseGet(() -> {                ConsumerConfig consumerConfig = new ConsumerConfig();                consumerConfig.refresh();                return consumerConfig;            });            //设置ConsumerConfig            Optional.ofNullable(dubboRegisterConfig.getThreadpool()).ifPresent(consumerConfig::setThreadpool);             Optional.ofNullable(dubboRegisterConfig.getCorethreads()).ifPresent(consumerConfig::setCorethreads);             Optional.ofNullable(dubboRegisterConfig.getThreads()).ifPresent(consumerConfig::setThreads);             Optional.ofNullable(dubboRegisterConfig.getQueues()).ifPresent(consumerConfig::setQueues);        }    }        //是否需要更新注册配置    private boolean needUpdateRegistryConfig(final DubboRegisterConfig dubboRegisterConfig) {        if (Objects.isNull(registryConfig)) {            return true;        }        return !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol())                || !Objects.equals(dubboRegisterConfig.getRegister(), registryConfig.getAddress())                || !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol());    }
    //......}
  • org.apache.shenyu.plugin.apache.dubbo.subscriber.ApacheDubboMetaDataSubscriber#onSubscribe()

当元数据更新时,数据同步模块会将数据从shenyu-admin同步到网关。在onSubscribe()方法中执行元数据更新操作。

public class ApacheDubboMetaDataSubscriber implements MetaDataSubscriber {    //本地内存缓存    private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap();
    //元数据发生更新    public void onSubscribe(final MetaData metaData) {        // dubbo服务的元数据更新        if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {            //对应的元数据是否存在            MetaData exist = META_DATA.get(metaData.getPath());            if (Objects.isNull(exist) || Objects.isNull(ApacheDubboConfigCache.getInstance().get(metaData.getPath()))) {                // 首次初始化                ApacheDubboConfigCache.getInstance().initRef(metaData);            } else {                // 对应的元数据发生了更新操作                if (!Objects.equals(metaData.getServiceName(), exist.getServiceName())                        || !Objects.equals(metaData.getRpcExt(), exist.getRpcExt())                        || !Objects.equals(metaData.getParameterTypes(), exist.getParameterTypes())                        || !Objects.equals(metaData.getMethodName(), exist.getMethodName())) {                    //根据最新的元数据再次构建ReferenceConfig                    ApacheDubboConfigCache.getInstance().build(metaData);                }            }            //本地内存缓存            META_DATA.put(metaData.getPath(), metaData);        }    }
    //删除元数据    public void unSubscribe(final MetaData metaData) {        if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {            //使ReferenceConfig失效            ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());            META_DATA.remove(metaData.getPath());        }    }}
  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#initRef()

通过metaData构建ReferenceConfig对象。

public final class ApacheDubboConfigCache extends DubboConfigCache {    //......        public ReferenceConfig<GenericService> initRef(final MetaData metaData) {            try {                //先尝试从缓存中获取,存在就直接返回                ReferenceConfig<GenericService> referenceConfig = cache.get(metaData.getPath());                if (StringUtils.isNoneBlank(referenceConfig.getInterface())) {                    return referenceConfig;                }            } catch (ExecutionException e) {                LOG.error("init dubbo ref exception", e);            }           //不存在,就构建            return build(metaData);        }
        /**         * Build reference config.         */        @SuppressWarnings("deprecation")        public ReferenceConfig<GenericService> build(final MetaData metaData) {            if (Objects.isNull(applicationConfig) || Objects.isNull(registryConfig)) {                return new ReferenceConfig<>();            }            ReferenceConfig<GenericService> reference = new ReferenceConfig<>(); //新建ReferenceConfig            reference.setGeneric("true"); //泛化调用            reference.setAsync(true);//支持异步
            reference.setApplication(applicationConfig);//设置应用配置            reference.setRegistry(registryConfig);//设置注册中心配置            reference.setConsumer(consumerConfig);//设置消费者配置            reference.setInterface(metaData.getServiceName());//设置服务接口            reference.setProtocol("dubbo");//设置dubbo协议            reference.setCheck(false); //不检查 service provider            reference.setLoadbalance("gray");//支持灰度
            Map<String, String> parameters = new HashMap<>(2);            parameters.put("dispatcher", "direct");            reference.setParameters(parameters);//自定义参数
            String rpcExt = metaData.getRpcExt();//rpc扩展参数            DubboParam dubboParam = parserToDubboParam(rpcExt);//反序列化            if (Objects.nonNull(dubboParam)) {                if (StringUtils.isNoneBlank(dubboParam.getVersion())) {                    reference.setVersion(dubboParam.getVersion());//设置版本                }                if (StringUtils.isNoneBlank(dubboParam.getGroup())) {                    reference.setGroup(dubboParam.getGroup());//设置分组                }                if (StringUtils.isNoneBlank(dubboParam.getUrl())) {                    reference.setUrl(dubboParam.getUrl());//设置url                }                if (StringUtils.isNoneBlank(dubboParam.getCluster())) {                    reference.setCluster(dubboParam.getCluster());//设置Cluster type                }                Optional.ofNullable(dubboParam.getTimeout()).ifPresent(reference::setTimeout);//timeout                Optional.ofNullable(dubboParam.getRetries()).ifPresent(reference::setRetries);//retires                Optional.ofNullable(dubboParam.getSent()).ifPresent(reference::setSent);//Whether to ack async-sent            }            try {                //获取GenericService                Object obj = reference.get();                if (Objects.nonNull(obj)) {                    LOG.info("init apache dubbo reference success there meteData is :{}", metaData);                    //缓存当前的reference                    cache.put(metaData.getPath(), reference);                }            } catch (Exception e) {                LOG.error("init apache dubbo reference exception", e);            }            return reference;        }    //......    }

2.6 执行ResponsePlugin#

  • org.apache.shenyu.plugin.response.ResponsePlugin#execute()

响应结果由ResponsePlugin插件处理。

    @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        // 根据rpc类型处理结果        return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);    }

处理类型由MessageWriter决定,类继承关系如下:

  • MessageWriter:接口,定义消息处理方法;
  • NettyClientMessageWriter:处理Netty调用结果;
  • RPCMessageWriter:处理RPC调用结果;
  • WebClientMessageWriter:处理WebClient调用结果;

Dubbo服务调用,处理结果当然是RPCMessageWriter了。

  • org.apache.shenyu.plugin.response.strategy.RPCMessageWriter#writeWith()

writeWith()方法中处理响应结果。


public class RPCMessageWriter implements MessageWriter {
    @Override    public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        return chain.execute(exchange).then(Mono.defer(() -> {            Object result = exchange.getAttribute(Constants.RPC_RESULT); //获取结果            if (Objects.isNull(result)) { //处理异常                Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);                return WebFluxResultUtils.result(exchange, error);            }            return WebFluxResultUtils.result(exchange, result);//返回结果        }));    }}

分析至此,关于Dubbo插件的源码分析就完成了,分析流程图如下:

3. 小结#

本文源码分析从Dubbo服务注册开始,到Dubbo插件的服务调用。Dubbo插件主要用来处理将http请求转成dubbo协议,主要逻辑是通过泛化调用实现。

Divide插件源码分析

· One min read
Apache ShenYu Committer

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu 网关使用 divide 插件来处理 http 请求。你可以查看官方文档 Http快速开始 了解如何使用该插件。

本文基于shenyu-2.4.3版本进行源码分析,官网的介绍请参考 Http服务接入

1. 服务注册#

1.1 声明注册接口#

使用注解@ShenyuSpringMvcClient将服务注册到网关。简单demo如下:

@RestController@RequestMapping("/order")@ShenyuSpringMvcClient(path = "/order")  // API注册public class OrderController {    @GetMapping("/findById")    @ShenyuSpringMvcClient(path = "/findById", desc = "Find by id") // 方法注册    public OrderDTO findById(@RequestParam("id") final String id) {        return build(id, "hello world findById");    }}

注解定义:


/** * 作用于类和方法上 */@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE, ElementType.METHOD})public @interface ShenyuSpringMvcClient {        //注册路径    String path() default "";        //规则名称    String ruleName() default "";       //描述信息    String desc() default "";
    //是否启用    boolean enabled() default true;        //注册元数据    boolean registerMetaData() default false;}

1.2 扫描注解信息#

注解扫描通过SpringMvcClientBeanPostProcessor完成,它实现了BeanPostProcessor接口,是Spring提供的后置处理器。

在构造器实例化的过程中:

  • 读取属性配置
  • 添加注解,读取path信息
  • 启动注册中心,向shenyu-admin注册
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {    //...    /**     * 构造器实例化     */    public SpringMvcClientBeanPostProcessor(final PropertiesConfig clientConfig,                                            final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {        // 1. 读取属性配置        Properties props = clientConfig.getProps();        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);        this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH, "");        if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {            String errorMsg = "http register param must config the appName or contextPath";            LOG.error(errorMsg);            throw new ShenyuClientIllegalArgumentException(errorMsg);        }        this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));        // 2. 添加注解        mappingAnnotation.add(ShenyuSpringMvcClient.class);        mappingAnnotation.add(PostMapping.class);        mappingAnnotation.add(GetMapping.class);        mappingAnnotation.add(DeleteMapping.class);        mappingAnnotation.add(PutMapping.class);        mappingAnnotation.add(RequestMapping.class);        // 3. 启动注册中心        publisher.start(shenyuClientRegisterRepository);    }        @Override    public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {       // 重写后置处理器逻辑                return bean;    }    
  • SpringMvcClientBeanPostProcessor#postProcessAfterInitialization()

重写后置处理器逻辑:读取注解信息,构建元数据对象和URI对象,并向shenyu-admin注册。

    @Override    public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {        // 1. 如果是注册整个服务或者不是Controller类,就不处理        if (Boolean.TRUE.equals(isFull) || !hasAnnotation(bean.getClass(), Controller.class)) {            return bean;        }        // 2. 读取类上的注解 ShenyuSpringMvcClient        final ShenyuSpringMvcClient beanShenyuClient = AnnotationUtils.findAnnotation(bean.getClass(), ShenyuSpringMvcClient.class);        // 2.1构建superPath        final String superPath = buildApiSuperPath(bean.getClass());        // 2.2 是否注册整个类方法        if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {            // 构建元数据对象,然后向shenyu-admin注册            publisher.publishEvent(buildMetaDataDTO(beanShenyuClient, pathJoin(contextPath, superPath)));            return bean;        }        // 3. 读取所有方法        final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());        for (Method method : methods) {            // 3.1 读取方法上的注解 ShenyuSpringMvcClient            ShenyuSpringMvcClient methodShenyuClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);            // 如果方法上面没有注解,就用类上面的注解            methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;            if (Objects.nonNull(methodShenyuClient)) {               // 3.2 构建path信息,构建元数据对象,向shenyu-admin注册                publisher.publishEvent(buildMetaDataDTO(methodShenyuClient, buildApiPath(method, superPath)));            }        }                return bean;    }
  • 1.如果是注册整个服务或者不是Controller类,就不处理
  • 2.读取类上的注解 ShenyuSpringMvcClient,如果是注册整个类,就在这里构建元数据对象,然后向shenyu-admin注册
  • 3.处理方法上的注解 ShenyuSpringMvcClient,针对特定方法构建path信息,构建元数据对象,然后向shenyu-admin注册

这里有两个取path的方法,需要特别说明一下:

  • buildApiSuperPath()

    构造SuperPath:先从类上的注解ShenyuSpringMvcClientpath属性,如果没有,就从当前类的RequestMapping注解中取path信息。

    private String buildApiSuperPath(@NonNull final Class<?> method) {        // 先从类上的注解ShenyuSpringMvcClient取path属性        ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);        if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {            return shenyuSpringMvcClient.path();        }        // 从当前类的RequestMapping注解中取path信息        RequestMapping requestMapping = AnnotationUtils.findAnnotation(method, RequestMapping.class);        if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {            return requestMapping.path()[0];        }        return "";    }
  • buildApiPath()

    构建path:先读取方法上的注解ShenyuSpringMvcClient,如果存在就构建;否则从方法的其他注解上获取path信息;完整的path = contextPath(上下文信息)+superPath(类信息)+methodPath(方法信息)

    private String buildApiPath(@NonNull final Method method, @NonNull final String superPath) {        // 1. 读取方法上的注解ShenyuSpringMvcClient        ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);        // 1.1如果存在path,就构建        if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {            //1.2完整 path = contextPath+superPath+methodPath            return pathJoin(contextPath, superPath, shenyuSpringMvcClient.path());        }        // 2.从方法的其他注解上获取path信息        final String path = getPathByMethod(method);        if (StringUtils.isNotBlank(path)) {             // 2.1 完整的path = contextPath+superPath+methodPath            return pathJoin(contextPath, superPath, path);        }        return pathJoin(contextPath, superPath);    }
  • getPathByMethod()

    从方法的其他注解上获取path信息,其他注解包括:

    • ShenyuSpringMvcClient
    • PostMapping
    • GetMapping
    • DeleteMapping
    • PutMapping
    • RequestMapping

    private String getPathByMethod(@NonNull final Method method) {        // 遍历接口注解获取path信息        for (Class<? extends Annotation> mapping : mappingAnnotation) {            final String pathByAnnotation = getPathByAnnotation(AnnotationUtils.findAnnotation(method, mapping), pathAttributeNames);            if (StringUtils.isNotBlank(pathByAnnotation)) {                return pathByAnnotation;            }        }        return null;    }

扫描注解完成后,构建元数据对象,然后将该对象发送到shenyu-admin,即可完成注册。

  • 元数据对象

    包括当前注册方法的规则信息:contextPath,appName,注册路径,描述信息,注册类型,是否启用,规则名称和是否注册元数据。

 private MetaDataRegisterDTO buildMetaDataDTO(@NonNull final ShenyuSpringMvcClient shenyuSpringMvcClient, final String path) {        return MetaDataRegisterDTO.builder()                .contextPath(contextPath) // contextPath                .appName(appName) // appName                .path(path) // 注册路径,在网关规则匹配时使用                .pathDesc(shenyuSpringMvcClient.desc()) // 描述信息                .rpcType(RpcTypeEnum.HTTP.getName()) // divide插件,默认时http类型                .enabled(shenyuSpringMvcClient.enabled()) // 是否启用规则                .ruleName(StringUtils.defaultIfBlank(shenyuSpringMvcClient.ruleName(), path))//规则名称                .registerMetaData(shenyuSpringMvcClient.registerMetaData()) //是否注册元数据信息                .build();    }

具体的注册逻辑由注册中心实现,在之前的文章中已经分析过了,这里就不再深入分析。

1.3 注册URI信息#

ContextRegisterListener负责将客户端的URI信息注册到shenyu-admin,它实现了ApplicationListener接口,发生上下文刷新事件ContextRefreshedEvent时,执行onApplicationEvent()方法,实现注册逻辑。


public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent>, BeanFactoryAware {    //......        /**     * 构造器实例化     */    public ContextRegisterListener(final PropertiesConfig clientConfig) {        // 读取属性配置        final Properties props = clientConfig.getProps();        this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));        this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);        if (Boolean.TRUE.equals(isFull)) {            if (StringUtils.isBlank(contextPath)) {                final String errorMsg = "http register param must config the contextPath";                LOG.error(errorMsg);                throw new ShenyuClientIllegalArgumentException(errorMsg);            }        }        this.port = Integer.parseInt(Optional.ofNullable(props.getProperty(ShenyuClientConstants.PORT)).orElseGet(() -> "-1"));        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);        this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);        this.host = props.getProperty(ShenyuClientConstants.HOST);    }
    @Override    public void setBeanFactory(final BeanFactory beanFactory) throws BeansException {        this.beanFactory = beanFactory;    }
    // 执行应用事件    @Override    public void onApplicationEvent(@NonNull final ContextRefreshedEvent contextRefreshedEvent) {        // 保证该方法执行一次        if (!registered.compareAndSet(false, true)) {            return;        }        // 1. 如果是注册整个服务        if (Boolean.TRUE.equals(isFull)) {            // 构建元数据,并注册            publisher.publishEvent(buildMetaDataDTO());        }        try {            // 获取端口信息            final int mergedPort = port <= 0 ? PortUtils.findPort(beanFactory) : port;            // 2. 构建URI数据,并注册            publisher.publishEvent(buildURIRegisterDTO(mergedPort));        } catch (ShenyuException e) {            throw new ShenyuException(e.getMessage() + "please config ${shenyu.client.http.props.port} in xml/yml !");        }    }
    // 构建URI数据    private URIRegisterDTO buildURIRegisterDTO(final int port) {        return URIRegisterDTO.builder()            .contextPath(this.contextPath) // contextPath            .appName(appName) // appName            .protocol(protocol) // 服务使用的协议            .host(IpUtils.isCompleteHost(this.host) ? this.host : IpUtils.getHost(this.host)) //主机            .port(port) // 端口            .rpcType(RpcTypeEnum.HTTP.getName()) // divide插件,默认注册http类型            .build();    }
    // 构建元数据    private MetaDataRegisterDTO buildMetaDataDTO() {        return MetaDataRegisterDTO.builder()            .contextPath(contextPath)            .appName(appName)            .path(contextPath)            .rpcType(RpcTypeEnum.HTTP.getName())            .enabled(true)            .ruleName(contextPath)            .build();    }}

1.4 处理注册信息#

客户端通过注册中心注册的元数据和URI数据,在shenyu-admin进行处理,负责存储到数据库和同步给shenyu网关。Divide插件的客户端注册处理逻辑在ShenyuClientRegisterDivideServiceImpl中。继承关系如下:

  • ShenyuClientRegisterService:客户端注册服务,顶层接口;
  • FallbackShenyuClientRegisterService:注册失败,提供重试操作;
  • AbstractShenyuClientRegisterServiceImpl:抽象类,实现部分公共注册逻辑;
  • AbstractContextPathRegisterService:抽象类,负责注册ContextPath
  • ShenyuClientRegisterDivideServiceImpl:实现Divide插件的注册;
1.4.1 注册服务#
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()

    客户端通过注册中心注册的元数据MetaDataRegisterDTO对象在shenyu-adminregister()方法被接送到。

   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. 注册选择器        String selectorHandler = selectorHandler(dto);        String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);        //2. 注册规则        String ruleHandler = ruleHandler();        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        ruleService.registerDefault(ruleDTO);        //3. 注册元数据        registerMetadata(dto);        //4. 注册ContextPath        String contextPath = dto.getContextPath();        if (StringUtils.isNotEmpty(contextPath)) {            registerContextPath(dto);        }        return ShenyuResultMessage.SUCCESS;    }
1.4.1.1 注册选择器#
  • org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()

构建contextPath,查找选择器信息是否存在,如果存在就返回id;不存在就创建默认的选择器信息。

    @Override    public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {        // 构建contextPath        String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());        // 通过名称查找选择器信息是否存在        SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);        if (Objects.isNull(selectorDO)) {            // 不存在就创建默认的选择器信息            return registerSelector(contextPath, pluginName, selectorHandler);        }        return selectorDO.getId();    }
  • 默认选择器信息

    在这里构建默认选择器信息及其条件属性。

   //注册选择器   private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {        //构建选择器        SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());        selectorDTO.setHandle(selectorHandler);        //注册默认选择器        return registerDefault(selectorDTO);    }     //构建选择器    private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {        //构建默认选择器        SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);        selectorDTO.setPluginId(pluginId);         //构建默认选择器的条件属性        selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));        return selectorDTO;    }
  • 构建默认选择器
private SelectorDTO buildDefaultSelectorDTO(final String name) {    return SelectorDTO.builder()            .name(name) // 名称            .type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // 默认类型自定义            .matchMode(MatchModeEnum.AND.getCode()) //默认匹配方式 and            .enabled(Boolean.TRUE)  //默认启开启            .loged(Boolean.TRUE)  //默认记录日志            .continued(Boolean.TRUE) //默认继续后续选择器            .sort(1) //默认顺序1            .build();}

  • 构建默认选择器条件属性
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {    SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();    selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // 默认参数类型URI    selectorConditionDTO.setParamName("/");    selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // 默认匹配策略 match    selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // 默认值 /contextPath/**    return Collections.singletonList(selectorConditionDTO);}
  • 注册默认选择器
@Overridepublic String registerDefault(final SelectorDTO selectorDTO) {    //选择器信息    SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);    //选择器条件属性    List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();    if (StringUtils.isEmpty(selectorDTO.getId())) {        // 向数据库插入选择器信息        selectorMapper.insertSelective(selectorDO);          // 向数据库插入选择器条件属性        selectorConditionDTOs.forEach(selectorConditionDTO -> {            selectorConditionDTO.setSelectorId(selectorDO.getId());                        selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));        });    }    // 发布同步事件,向网关同步选择信息及其条件属性    publishEvent(selectorDO, selectorConditionDTOs);    return selectorDO.getId();}
1.4.1.2 注册规则#

在注册服务的第二步中,开始构建默认规则,然后注册规则。

@Override    public String register(final MetaDataRegisterDTO dto) {        //1. 注册选择器        //......                //2. 注册规则        // 默认规则处理属性        String ruleHandler = ruleHandler();        // 构建默认规则信息        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        // 注册规则        ruleService.registerDefault(ruleDTO);                //3. 注册元数据        //......                //4. 注册ContextPath        //......                return ShenyuResultMessage.SUCCESS;    }
  • 默认规则处理属性
    @Override    protected String ruleHandler() {        // 默认规则处理属性        return new DivideRuleHandle().toJson();    }

Divide插件默认规则处理属性


public class DivideRuleHandle implements RuleHandle {
    /**     * 负载均衡:默认随机     */    private String loadBalance = LoadBalanceEnum.RANDOM.getName();
    /**     * 重试策略:默认重试当前服务     */    private String retryStrategy = RetryEnum.CURRENT.getName();
    /**     * 重试次数:默认3次     */    private int retry = 3;
    /**     * 调用超时:默认 3000     */    private long timeout = Constants.TIME_OUT;
    /**     * header最大值:10240 byte     */    private long headerMaxSize = Constants.HEADER_MAX_SIZE;
    /**     * request最大值:102400 byte     */    private long requestMaxSize = Constants.REQUEST_MAX_SIZE;}
  • 构建默认规则信息
  // 构建默认规则信息    private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {        return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());    }   //  构建默认规则信息    private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {        RuleDTO ruleDTO = RuleDTO.builder()                .selectorId(selectorId) //关联的选择器id                .name(ruleName) //规则名称                .matchMode(MatchModeEnum.AND.getCode()) // 默认匹配模式 and                .enabled(Boolean.TRUE) // 默认开启                .loged(Boolean.TRUE) //默认记录日志                .sort(1) //默认顺序 1                .handle(ruleHandler)                .build();        RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()                .paramType(ParamTypeEnum.URI.getName()) // 默认参数类型URI                .paramName("/")                .paramValue(path) //参数值path                .build();        if (path.indexOf("*") > 1) {            ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); //如果path中有*,操作类型则默认为 match        } else {            ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); // 否则,默认操作类型 =         }        ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));        return ruleDTO;    }
  • org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()

注册规则:向数据库插入记录,并向网关发布事件,进行数据同步。


    @Override    public String registerDefault(final RuleDTO ruleDTO) {        RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());        if (Objects.nonNull(exist)) {            return "";        }
        RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);        List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();        if (StringUtils.isEmpty(ruleDTO.getId())) {            // 向数据库插入规则信息            ruleMapper.insertSelective(ruleDO);            //向数据库插入规则体条件属性            ruleConditions.forEach(ruleConditionDTO -> {                ruleConditionDTO.setRuleId(ruleDO.getId());                            ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));            });        }        // 向网关发布事件,进行数据同步        publishEvent(ruleDO, ruleConditions);        return ruleDO.getId();    }
1.4.1.3 注册元数据#
   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. 注册选择器        //......                //2. 注册规则        //......                //3. 注册元数据        registerMetadata(dto);                //4. 注册ContextPath        //......                return ShenyuResultMessage.SUCCESS;    }
  • org.apache.shenyu.admin.service.register.ShenyuClientRegisterDivideServiceImpl#registerMetadata()

    插入或更新元数据,然后发布同步事件到网关。


    @Override    protected void registerMetadata(final MetaDataRegisterDTO dto) {        if (dto.isRegisterMetaData()) { // 如果注册元数据            // 获取metaDataService            MetaDataService metaDataService = getMetaDataService();            // 元数据是否存在            MetaDataDO exist = metaDataService.findByPath(dto.getPath());            // 插入或更新元数据            metaDataService.saveOrUpdateMetaData(exist, dto);        }    }
    @Override    public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {        DataEventTypeEnum eventType;        // 数据类型转换 DTO->DO        MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);        // 插入数据        if (Objects.isNull(exist)) {            Timestamp currentTime = new Timestamp(System.currentTimeMillis());            metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());            metaDataDO.setDateCreated(currentTime);            metaDataDO.setDateUpdated(currentTime);            metaDataMapper.insert(metaDataDO);            eventType = DataEventTypeEnum.CREATE;        } else {            // 更新数据            metaDataDO.setId(exist.getId());            metaDataMapper.update(metaDataDO);            eventType = DataEventTypeEnum.UPDATE;        }        // 发布同步事件到网关        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,                Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));    }
1.4.1.4 注册ContextPath#
   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. 注册选择器        //......                //2. 注册规则        //......                //3. 注册元数据        //......                //4. 注册ContextPath        String contextPath = dto.getContextPath();        if (StringUtils.isNotEmpty(contextPath)) {            registerContextPath(dto);        }        return ShenyuResultMessage.SUCCESS;    }
  • org.apache.shenyu.admin.service.register.AbstractContextPathRegisterService#registerContextPath()
    @Override    public void registerContextPath(final MetaDataRegisterDTO dto) {        // 设置选择器的contextPath        String contextPathSelectorId = getSelectorService().registerDefault(dto, PluginEnum.CONTEXT_PATH.getName(), "");        ContextMappingRuleHandle handle = new ContextMappingRuleHandle();        handle.setContextPath(PathUtils.decoratorContextPath(dto.getContextPath()));        // 设置规则的contextPath        getRuleService().registerDefault(buildContextPathDefaultRuleDTO(contextPathSelectorId, dto, handle.toJson()));    }
1.4.2 注册URI#
  • org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()

服务端收到客户端注册的URI信息后,进行处理。

    @Override    public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {        String result;        String key = key(selectorName);        try {            this.removeFallBack(key);            // 注册URI            result = this.doRegisterURI(selectorName, uriList);            logger.info("Register success: {},{}", selectorName, uriList);        } catch (Exception ex) {            logger.warn("Register exception: cause:{}", ex.getMessage());            result = "";            // 注册失败后,进行重试            this.addFallback(key, new FallbackHolder(selectorName, uriList));        }        return result;    }
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()

从客户端注册的URI中获取有效的URI,更新对应的选择器handle属性,向网关发送选择器更新事件。

@Override    public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {        //参数检查        if (CollectionUtils.isEmpty(uriList)) {            return "";        }        //获取选择器信息        SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));        if (Objects.isNull(selectorDO)) {            throw new ShenyuException("doRegister Failed to execute,wait to retry.");        }        // 获取有效的URI        List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());        // 构建选择器的handle属性        String handler = buildHandle(validUriList, selectorDO);        if (handler != null) {            selectorDO.setHandle(handler);            SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));            selectorData.setHandle(handler);            // 向数据库更新选择器的handle属性            selectorService.updateSelective(selectorDO);            // 向网关发送选择器更新事件            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));        }        return ShenyuResultMessage.SUCCESS;    }

关于服务注册的源码分析就以及完成了,分析流程图如下:

接下来就分析divide插件是如何根据这些信息向http服务发起调用。

2. 服务调用#

divide插件是网关用于处理 http协议请求的核心处理插件。

以官网提供的案例 Http快速开始 为例,一个直连请求如下:

GET http://localhost:8189/order/findById?id=100Accept: application/json

通过ShenYu网关代理后,请求如下:

GET http://localhost:9195/http/order/findById?id=100Accept: application/json

通过ShenYu网关代理后的服务仍然能够请求到之前的服务,在这里起作用的就是divide插件。类继承关系如下:

  • ShenyuPlugin:顶层接口,定义接口方法;
  • AbstractShenyuPlugin:抽象类,实现插件共有逻辑;
  • DividePlugin:Divide插件。

2.1 接收请求#

通过ShenYu网关代理后,请求入口是ShenyuWebHandler,它实现了org.springframework.web.server.WebHandler接口。

public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {    //......        /**     * 处理web请求     */    @Override    public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {       // 执行默认插件链        Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);        if (scheduled) {            return execute.subscribeOn(scheduler);        }        return execute;    }        private static class DefaultShenyuPluginChain implements ShenyuPluginChain {
        private int index;
        private final List<ShenyuPlugin> plugins;
        /**         * 实例化默认插件链         */        DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {            this.plugins = plugins;        }
        /**         * 执行每个插件.         */        @Override        public Mono<Void> execute(final ServerWebExchange exchange) {            return Mono.defer(() -> {                if (this.index < plugins.size()) {                    // 获取当前执行插件                    ShenyuPlugin plugin = plugins.get(this.index++);                    // 是否跳过当前插件                    boolean skip = plugin.skip(exchange);                    if (skip) {                        // 如果跳过就执行下一个                        return this.execute(exchange);                    }                    // 执行当前插件                    return plugin.execute(exchange, this);                }                return Mono.empty();            });        }    }}

2.2 匹配规则#

  • org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()

execute()方法中执行选择器和规则的匹配逻辑。

  • 匹配选择器;
  • 匹配规则;
  • 执行插件。
@Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        // 插件名称        String pluginName = named();        // 插件信息        PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);        if (pluginData != null && pluginData.getEnabled()) {            // 选择器信息            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);            if (CollectionUtils.isEmpty(selectors)) {                return handleSelectorIfNull(pluginName, exchange, chain);            }            // 匹配选择器            SelectorData selectorData = matchSelector(exchange, selectors);            if (Objects.isNull(selectorData)) {                return handleSelectorIfNull(pluginName, exchange, chain);            }            selectorLog(selectorData, pluginName);            // 规则信息            List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());            if (CollectionUtils.isEmpty(rules)) {                return handleRuleIfNull(pluginName, exchange, chain);            }            // 匹配规则            RuleData rule;            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {                //get last                rule = rules.get(rules.size() - 1);            } else {                rule = matchRule(exchange, rules);            }            if (Objects.isNull(rule)) {                return handleRuleIfNull(pluginName, exchange, chain);            }            ruleLog(rule, pluginName);            // 执行插件            return doExecute(exchange, chain, selectorData, rule);        }        return chain.execute(exchange);    }

2.3 执行divide插件#

  • org.apache.shenyu.plugin.divide.DividePlugin#doExecute()

doExecute()方法中执行divide插件的具体逻辑:

  • 校验header大小;
  • 校验request大小;
  • 获取服务列表;
  • 实现负载均衡;
  • 设置请求url,超时时间,重试策略。
@Override    protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {        // 获取上下文信息        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        // 获取规则的handle属性        DivideRuleHandle ruleHandle = DividePluginDataHandler.CACHED_HANDLE.get().obtainHandle(CacheKeyUtils.INST.getKey(rule));        long headerSize = 0;        // 校验header大小        for (List<String> multiHeader : exchange.getRequest().getHeaders().values()) {            for (String value : multiHeader) {                headerSize += value.getBytes(StandardCharsets.UTF_8).length;            }        }        if (headerSize > ruleHandle.getHeaderMaxSize()) {            LOG.error("request header is too large");            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_HEADER_TOO_LARGE, null);            return WebFluxResultUtils.result(exchange, error);        }                // 校验request大小        if (exchange.getRequest().getHeaders().getContentLength() > ruleHandle.getRequestMaxSize()) {            LOG.error("request entity is too large");            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_ENTITY_TOO_LARGE, null);            return WebFluxResultUtils.result(exchange, error);        }        // 获取服务列表upstreamList        List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());        if (CollectionUtils.isEmpty(upstreamList)) {            LOG.error("divide upstream configuration error: {}", rule);            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null);            return WebFluxResultUtils.result(exchange, error);        }        // 请求ip        String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();        // 实现负载均衡        Upstream upstream = LoadBalancerFactory.selector(upstreamList, ruleHandle.getLoadBalance(), ip);        if (Objects.isNull(upstream)) {            LOG.error("divide has no upstream");            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null);            return WebFluxResultUtils.result(exchange, error);        }        // 设置url        String domain = upstream.buildDomain();        exchange.getAttributes().put(Constants.HTTP_DOMAIN, domain);        // 设置超时时间        exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());        exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());        // 设置重试策略        exchange.getAttributes().put(Constants.RETRY_STRATEGY, ruleHandle.getRetryStrategy());        exchange.getAttributes().put(Constants.LOAD_BALANCE, ruleHandle.getLoadBalance());        exchange.getAttributes().put(Constants.DIVIDE_SELECTOR_ID, selector.getId());        return chain.execute(exchange);    }

2.4 发起请求#

默认由WebClientPluginhttp服务发起调用请求,类继承关系如下:

  • ShenyuPlugin:顶层插件,定义插件方法;
  • AbstractHttpClientPlugin:抽象类,实现请求调用的公共逻辑;
  • WebClientPlugin:通过WebClient发起请求;
  • NettyHttpClientPlugin:通过Netty发起请求。

发起请求调用:

  • org.apache.shenyu.plugin.httpclient.AbstractHttpClientPlugin#execute()

execute()方法中发起请求调用:

  • 获取指定的超时时间,重试次数
  • 发起请求
  • 根据指定的重试策略进行失败后重试操作

public abstract class AbstractHttpClientPlugin<R> implements ShenyuPlugin {
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractHttpClientPlugin.class);
    @Override    public final Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        // 获取上下文信息        final ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        // 获取uri        final URI uri = exchange.getAttribute(Constants.HTTP_URI);        if (Objects.isNull(uri)) {            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_URL, null);            return WebFluxResultUtils.result(exchange, error);        }        // 获取指定的超时时间        final long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);        final Duration duration = Duration.ofMillis(timeout);        // 获取指定重试次数        final int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);        // 获取指定的重试策略        final String retryStrategy = (String) Optional.ofNullable(exchange.getAttribute(Constants.RETRY_STRATEGY)).orElseGet(RetryEnum.CURRENT::getName);        LOG.info("The request urlPath is {}, retryTimes is {}, retryStrategy is {}", uri.toASCIIString(), retryTimes, retryStrategy);        // 构建header        final HttpHeaders httpHeaders = buildHttpHeaders(exchange);        // 发起请求        final Mono<R> response = doRequest(exchange, exchange.getRequest().getMethodValue(), uri, httpHeaders, exchange.getRequest().getBody())                .timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))                .doOnError(e -> LOG.error(e.getMessage(), e));                // 重试策略CURRENT,对当前服务进行重试        if (RetryEnum.CURRENT.getName().equals(retryStrategy)) {            //old version of DividePlugin and SpringCloudPlugin will run on this            return response.retryWhen(Retry.anyOf(TimeoutException.class, ConnectTimeoutException.class, ReadTimeoutException.class, IllegalStateException.class)                    .retryMax(retryTimes)                    .backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))                    .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))                    .flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));        }                // 对其他服务进行重试        // 排除已经调用过的服务        final Set<URI> exclude = Sets.newHashSet(uri);        // 请求重试        return resend(response, exchange, duration, httpHeaders, exclude, retryTimes)                .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))                .flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));    }
    private Mono<R> resend(final Mono<R> clientResponse,                           final ServerWebExchange exchange,                           final Duration duration,                           final HttpHeaders httpHeaders,                           final Set<URI> exclude,                           final int retryTimes) {        Mono<R> result = clientResponse;        // 根据指定的重试次数进行重试        for (int i = 0; i < retryTimes; i++) {            result = resend(result, exchange, duration, httpHeaders, exclude);        }        return result;    }
    private Mono<R> resend(final Mono<R> response,                           final ServerWebExchange exchange,                           final Duration duration,                           final HttpHeaders httpHeaders,                           final Set<URI> exclude) {        return response.onErrorResume(th -> {            final String selectorId = exchange.getAttribute(Constants.DIVIDE_SELECTOR_ID);            final String loadBalance = exchange.getAttribute(Constants.LOAD_BALANCE);            //查询可用服务            final List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId)                    .stream().filter(data -> {                        final String trimUri = data.getUrl().trim();                        for (URI needToExclude : exclude) {                            // exclude already called                            if ((needToExclude.getHost() + ":" + needToExclude.getPort()).equals(trimUri)) {                                return false;                            }                        }                        return true;                    }).collect(Collectors.toList());            if (CollectionUtils.isEmpty(upstreamList)) {                // no need to retry anymore                return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));            }            // 请求ip            final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();            // 实现负载均衡            final Upstream upstream = LoadBalancerFactory.selector(upstreamList, loadBalance, ip);            if (Objects.isNull(upstream)) {                // no need to retry anymore                return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));            }            final URI newUri = RequestUrlUtils.buildRequestUri(exchange, upstream.buildDomain());            // 排除已经调用的uri            exclude.add(newUri);             // 进行再次调用            return doRequest(exchange, exchange.getRequest().getMethodValue(), newUri, httpHeaders, exchange.getRequest().getBody())                    .timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))                    .doOnError(e -> LOG.error(e.getMessage(), e));        });    }
    //......}
  • org.apache.shenyu.plugin.httpclient.WebClientPlugin#doRequest()

doRequest()方法中通过webClient发起真正的请求调用。


@Override    protected Mono<ClientResponse> doRequest(final ServerWebExchange exchange, final String httpMethod, final URI uri,                                             final HttpHeaders httpHeaders, final Flux<DataBuffer> body) {        return webClient.method(HttpMethod.valueOf(httpMethod)).uri(uri) //请求uri                .headers(headers -> headers.addAll(httpHeaders)) // 请求header                .body(BodyInserters.fromDataBuffers(body))                .exchange() // 发起请求                .doOnSuccess(res -> {                    if (res.statusCode().is2xxSuccessful()) { // 成功                        exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());                    } else { // 失败                        exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());                    }                    exchange.getResponse().setStatusCode(res.statusCode());                    exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);                });    }

2.5 处理响应结果#

  • org.apache.shenyu.plugin.response.ResponsePlugin#execute()

响应结果由ResponsePlugin插件处理。

    @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        // 根据rpc类型处理结果        return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);    }

处理类型由MessageWriter决定,类继承关系如下:

  • MessageWriter:接口,定义消息处理方法;
  • NettyClientMessageWriter:处理Netty调用结果;
  • RPCMessageWriter:处理RPC调用结果;
  • WebClientMessageWriter:处理WebClient调用结果;

默认是通过WebCient发起http请求。

  • org.apache.shenyu.plugin.response.strategy.WebClientMessageWriter#writeWith()

writeWith()方法中处理响应结果。


    @Override    public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        return chain.execute(exchange).then(Mono.defer(() -> {            // 获取响应            ServerHttpResponse response = exchange.getResponse();            ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);            if (Objects.isNull(clientResponse)) {                Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);                return WebFluxResultUtils.result(exchange, error);            }            //获取cookies和headers            response.getCookies().putAll(clientResponse.cookies());            response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());            // image, pdf or stream does not do format processing.            // 处理特殊响应类型            if (clientResponse.headers().contentType().isPresent()) {                final String media = clientResponse.headers().contentType().get().toString().toLowerCase();                if (media.matches(COMMON_BIN_MEDIA_TYPE_REGEX)) {                    return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()))                            .doOnCancel(() -> clean(exchange));                }            }            // 处理一般响应类型            clientResponse = ResponseUtils.buildClientResponse(response, clientResponse.body(BodyExtractors.toDataBuffers()));            return clientResponse.bodyToMono(byte[].class)                    .flatMap(originData -> WebFluxResultUtils.result(exchange, originData))                    .doOnCancel(() -> clean(exchange));        }));    }

分析至此,关于Divide插件的源码分析就完成了,分析流程图如下:

3. 小结#

本文源码分析从http服务注册开始,到divide插件的服务调用。divide插件主要用来处理http请求。有些源码没有进入深入分析,比如负载均衡的实现,服务探活,将在后续继续分析。

McpServer 插件源码分析

· One min read
Apache ShenYu Contributor

在 shenyu 网关中,启动该插件,shenyu 将成为一个功能丰富的 mcpServer, 你可以通过简单配置来将一个服务作为 tool 注册到 shenyu 网关中,并使用网关提供的扩展功能。

本文基于shenyu-2.7.0.2版本进行源码分析, 在本篇中我将追踪 Shenyu Mcp 插件链路,对 Mcp 插件的 sse 通信方式进行源码分析

前言#

shenyu 网关的 mcp 插件基于 spring-ai-mcp 扩展而来,为了更好的了解 mcp 插件的工作原理 ,我将简单介绍 mcp 官方提供的 jdk 中各个 java 类是如何协同运作的

我想先简单介绍三个 Mcp 官方提供的 java 类

  1. McpServer

该类负责管理,tool,Resource,promote 等资源

  1. TransportProvider

根据客户端和服务端之间通信协议,提供之相对应的通讯方法

  1. Session

处理请求数据、响应数据和通知数据,提供一些基本方法和其对应的处理器,查询工具,调用工具都在此处执行

1. 服务注册#

在 shenyu admin 的 McpServer 中插件填写 endpoint 和 tool 信息后,这些信息将自动注册到 shenyu bootstrap 中, 数据同步源码可以参考官网websocket数据同步

shenyu bootstrap 将在 McpServerPluginDataHandlerhandler() 方法中接收到 admin 同步来的数据。

handlerSelector() 方法接收 url 数据创建 McpServer

handlerRule() 方法接收 tool 信息,注册 tool

这两个方法共同组成了 Shenyu Mcp 插件的服务注册部分,下面我将对这个两个方法,详细展开分析

1.1 Transport,McpServer注册#

我们先来分析 handlerSelector() 方法,也就是 McpServer 的注册

  • handlerSelector() 方法 工作内容如下
  1. 捕捉用户在 Selector 上的填写的 url,这个 url 将作为一个 key 存储 McpServer TransportProvider 等信息
  2. 序列化创建 ShenyuMcpServerShenyuMcpServer 将 SelectorId 和这些 url 也就是这些 key 绑定,以此来实现 selectorId 和 key 的绑定。

注意 ShenyuMcpServer 是 Shenyu 用于绑定 SelectorId 和 url 的对象,和 McpServer 没有继承关系,功能也完全不同

  1. 调用 ShenyuMcpServerManagergetOrCreateMcpServerTransport() 方法注册 McpServer TransportProvider
public class McpServerPluginDataHandler implements PluginDataHandler {    @Override    public void handlerSelector(final SelectorData selectorData) {        // 获取URI        String uri = selectorData.getConditionList().stream()                .filter(condition -> Constants.URI.equals(condition.getParamType()))                .map(ConditionData::getParamValue)                .findFirst()                .orElse(null);                // 构建McpServer        ShenyuMcpServer shenyuMcpServer = GsonUtils.getInstance().fromJson(Objects.isNull(selectorData.getHandle()) ? DEFAULT_MESSAGE_ENDPOINT : selectorData.getHandle(), ShenyuMcpServer.class);        shenyuMcpServer.setPath(path);        // 缓存shenyuMcpServer        CACHED_SERVER.get().cachedHandle(                selectorData.getId(),                shenyuMcpServer);        String messageEndpoint = shenyuMcpServer.getMessageEndpoint();        // 尝试获取或者注册transportProvider        shenyuMcpServerManager.getOrCreateMcpServerTransport(uri, messageEndpoint);    }    }

ShenyuMcpServerManager 该类是 ShenYu 中 McpServer 的管理中心,不仅储存了 McpAsyncServer CompositeTransportProvider 等内容,注册 Transport 和 McpServer 的方法也在其中

  • getOrCreateMcpServerTransport() 方法工作内容具体如下
  1. 处理传递来的 url 去除/streamablehttp 以及 /message后缀 使其恢复为原始的 url
  2. 尝试获取 CompositeTransportProvider 对象,该对象是 Transport 的复合对象,包含了多种协议对应的 Transport
  3. 如果没有获取到,则调用 createSseTransport() 方法创建 CompositeTransportProvider 对象
  4. 创建 McpAsyncServer 对象,保存 Transport 对象到 Map 中,将 Transport 注册到 McpAsyncServer
@Componentpublic class ShenyuMcpServerManager {    public ShenyuSseServerTransportProvider getOrCreateMcpServerTransport(final String uri, final String messageEndPoint) {        // 去除/streamablehttp 以及 /message后缀        String normalizedPath = processPath(uri);        return getOrCreateTransport(normalizedPath, SSE_PROTOCOL,                () -> createSseTransport(normalizedPath, messageEndPoint));    }        private <T> T getOrCreateTransport(final String normalizedPath, final String protocol,                                       final java.util.function.Supplier<T> transportFactory) {        // 获取复合Transport实例        CompositeTransportProvider compositeTransport = getOrCreateCompositeTransport(normalizedPath);
        T transport = switch (protocol) {            case SSE_PROTOCOL -> (T) compositeTransport.getSseTransport();            case STREAMABLE_HTTP_PROTOCOL -> (T) compositeTransport.getStreamableHttpTransport();            default -> null;        };        // 如果缓存中没有该实例,则需要重新创建        if (Objects.isNull(transport)) {            // 调用createSseTransport()方法,创建一个新的transport并存储            transport = transportFactory.get();            // 创建McpAsyncServer,并注册transport            addTransportToSharedServer(normalizedPath, protocol, transport);        }
        return transport;    }}
1.1.1 Transport注册#
  • createSseTransport() 方法

    该方法在 getOrCreateMcpServerTransport() 方法被调用,用于创建 Transport


@Componentpublic class ShenyuMcpServerManager {    private ShenyuSseServerTransportProvider createSseTransport(final String normalizedPath, final String messageEndPoint) {        String messageEndpoint = normalizedPath + messageEndPoint;        ShenyuSseServerTransportProvider transportProvider = ShenyuSseServerTransportProvider.builder()                .objectMapper(objectMapper)                .sseEndpoint(normalizedPath)                .messageEndpoint(messageEndpoint)                .build();        // 向Manager的routeMap中注册transportProvider的两个函数        registerRoutes(normalizedPath, messageEndpoint, transportProvider::handleSseConnection, transportProvider::handleMessage);        return transportProvider;    }}
1.1.2 mcpServer注册#
  • addTransportToSharedServer() 方法

    该方法在 getOrCreateMcpServerTransport() 方法被调用,用于创建 McpServer 并保存

该方法创建了一个新的 McpServer并存储到 sharedServerMap 中,并将上一步得到的 TransportProvider 存入 compositeTransportMap

@Componentpublic class ShenyuMcpServerManager {    private void addTransportToSharedServer(final String normalizedPath, final String protocol, final Object transportProvider) {        // 获取或者创建并注册 McpServer        getOrCreateSharedServer(normalizedPath);
        // 将新增的传输协议存进compositeTransportMap中        compositeTransport.addTransport(protocol, transportProvider);            }
    private McpAsyncServer getOrCreateSharedServer(final String normalizedPath) {        return sharedServerMap.computeIfAbsent(normalizedPath, path -> {            // 获取传输协议            CompositeTransportProvider compositeTransport = getOrCreateCompositeTransport(path);
            // 选择Server拥有的能力            var capabilities = McpSchema.ServerCapabilities.builder()                    .tools(true)                    .logging()                    .build();
            // 创建McpServer并存储            McpAsyncServer server = McpServer                    .async(compositeTransport)                    .serverInfo("MCP Shenyu Server (Multi-Protocol)", "1.0.0")                    .capabilities(capabilities)                    .tools(Lists.newArrayList())                    .build();                        return server;        });    }}

1.2 Tools注册#

  • handlerRule() 方法 工作内容如下
  1. 捕捉用户在 Tool 上的填写的 tool 配置信息,这些信息将全部用于 tool 的构建
  2. 序列化创建 ShenyuMcpServerTool 获取 tool 信息

注意 ShenyuMcpServerTool 也是 Shenyu 存储 tool 信息的工具,和 McpServerTool 没有继承关系

  1. 调用 addTool() 方法, 利用该 tool 信息创建 tool,并根据 SelectorId 将 tool 注册到与之匹配的 McpServer 中
public class McpServerPluginDataHandler implements PluginDataHandler {    @Override    public void handlerRule(final RuleData ruleData) {        Optional.ofNullable(ruleData.getHandle()).ifPresent(s -> {            // 序列化一个新的 ShenyuMcpServerTool            ShenyuMcpServerTool mcpServerTool = GsonUtils.getInstance().fromJson(s, ShenyuMcpServerTool.class);            // 缓存mcpServerTool            CACHED_TOOL.get().cachedHandle(CacheKeyUtils.INST.getKey(ruleData), mcpServerTool);            // 获取并构建 mcp schema            List<McpServerToolParameter> parameters = mcpServerTool.getParameters();            String inputSchema = JsonSchemaUtil.createParameterSchema(parameters);            ShenyuMcpServer server = CACHED_SERVER.get().obtainHandle(ruleData.getSelectorId());            if (Objects.nonNull(server)) {                // 向Manager的sharedServerMap中存储Tool信息                shenyuMcpServerManager.addTool(server.getPath(),                        StringUtils.isBlank(mcpServerTool.getName()) ? ruleData.getName()                                : mcpServerTool.getName(),                        mcpServerTool.getDescription(),                        mcpServerTool.getRequestConfig(),                        inputSchema);            }        });    }}
  • addTool()方法

    该方法被 handlerRule() 方法调用,用于新增工具

该方法做了下述工作

  1. 将上一步传来的 tool 信息转换为 shenyuToolDefinition 对象

  2. 利用转换来的 shenyuToolDefinition 对象创建 ShenyuToolCallback 对象

    ShenyuToolCallback 重写了 ToolCallBackcall() 方法,并将该 call() 方法注册到了 AsyncToolSpecification 中, 此后调用 tool 的 call() 方法,则实际会调用这个重写的 call() 方法

  3. ShenyuToolCallback 转换为 AsyncToolSpecification 并注册到相关的 mcpServer 中

public class McpServerPluginDataHandler implements PluginDataHandler {    public void addTool(final String serverPath, final String name, final String description,                        final String requestTemplate, final String inputSchema) {        String normalizedPath = normalizeServerPath(serverPath);        // 构建Definition对象        ToolDefinition shenyuToolDefinition = ShenyuToolDefinition.builder()                .name(name)                .description(description)                .requestConfig(requestTemplate)                .inputSchema(inputSchema)                .build();                ShenyuToolCallback shenyuToolCallback = new ShenyuToolCallback(shenyuToolDefinition);
        // 获取到先前注册的 McpServer, 并向其中注册Tool        McpAsyncServer sharedServer = sharedServerMap.get(normalizedPath);        for (AsyncToolSpecification asyncToolSpecification : McpToolUtils.toAsyncToolSpecifications(shenyuToolCallback)) {            sharedServer.addTool(asyncToolSpecification).block();        }            }}

到此为止,服务注册分析完毕

服务注册一图览

2. 插件调用#

客户端先后会发送后缀为 /sse/message 的两种消息,这两种消息都会被 Shenyu McpServer plugin 捕捉,Shenyu McpServer plugin 会对 /sse 消息和 /message 消息做不同处理。收到 /sse 消息时 plugin 会创建 session 对象并保存,最后返回 session id 供 message 消息使用。收到 /message 消息时,会根据 /message 消息携带的 method 信息,选择执行的方法 如:获取工作列表,工具调用,获取资源列表等等

  • doExecute() 方法 工作内容如下
  1. 路径匹配,判断 mcp plugin 是否注册该路径
  2. 调用 routeByProtocol() 方法,根据请求协议选择合适的处理方案

本篇是对 sse 请求方式的解析,因此接着进入 handleSseRequest() 方法

public class McpServerPlugin extends AbstractShenyuPlugin {    @Override    protected Mono<Void> doExecute(final ServerWebExchange exchange,                                   final ShenyuPluginChain chain,                                   final SelectorData selector,                                   final RuleData rule) {        final String uri = exchange.getRequest().getURI().getRawPath();        // 判断 Mcp 插件是否注册了该路由规则,没有则不执行        if (!shenyuMcpServerManager.canRoute(uri)) {            return chain.execute(exchange);        }        final ServerRequest request = ServerRequest.create(exchange, messageReaders);        // 根据 uri 判断路由协议,选择对应的处理方案        return routeByProtocol(exchange, chain, request, selector, uri);    }
    private Mono<Void> routeByProtocol(final ServerWebExchange exchange,                                       final ShenyuPluginChain chain,                                       final ServerRequest request,                                       final SelectorData selector,                                       final String uri) {
        if (isStreamableHttpProtocol(uri)) {            return handleStreamableHttpRequest(exchange, chain, request, uri);        } else if (isSseProtocol(uri)) {            // 处理sse请求            return handleSseRequest(exchange, chain, request, selector, uri);        }     }}

handlerSseRequest() 方法

该方法由 routeByProtocol() 方法调用,根据请求后缀判断客户端是要创建 session 还是调用工具

public class McpServerPlugin extends AbstractShenyuPlugin {    private Mono<Void> handleSseRequest(final ServerWebExchange exchange,                                        final ShenyuPluginChain chain,                                        final ServerRequest request,                                        final SelectorData selector,                                        final String uri) {        ShenyuMcpServer server = McpServerPluginDataHandler.CACHED_SERVER.get().obtainHandle(selector.getId());        String messageEndpoint = server.getMessageEndpoint();        // 获取传输者        ShenyuSseServerTransportProvider transportProvider                = shenyuMcpServerManager.getOrCreateMcpServerTransport(uri, messageEndpoint);        // 根据请求的后缀判断是 sse 连接请求还是 message 调用请求        if (uri.endsWith(messageEndpoint)) {            setupSessionContext(exchange, chain);            return handleMessageEndpoint(exchange, transportProvider, request);        } else {            return handleSseEndpoint(exchange, transportProvider, request);        }    }}

2.1 客户端发送 sse 请求#

如果我们客户端发送的是后缀为 /sse 的请求,那么将会执行 handleSseEndpoint() 方法

  • handleSseEndpoint() 方法主要做了如下工作
  1. 配置 sse 请求头
  2. 调用 ShenyuSseServerTransportProvidercreateSseFlux() 创建 sse 流
public class McpServerPlugin extends AbstractShenyuPlugin {    private Mono<Void> handleSseEndpoint(final ServerWebExchange exchange,                                         final ShenyuSseServerTransportProvider transportProvider,                                         final ServerRequest request) {        // 配置 sse 请求头        configureSseHeaders(exchange);
        // 创建 sse 流        return exchange.getResponse()                .writeWith(transportProvider                        .createSseFlux(request));    }}
  • createSseFlux() 方法

    该方法被 handleSseEndpoint()调用 主要用于创建并保存 session

  1. 创建 session ,创建 session 的工厂在创建 session 时会将一系列 handler 注册到 session 中,这些 handler 是真正执行 callTool 的对象
  2. 将 session 保存,session复用
  3. 将 session id 作为 endpoint 的请求参数返回给客户端,在调用 message 方法时会使用该 endpoint
public class ShenyuSseServerTransportProvider implements McpServerTransportProvider {    public Flux<ServerSentEvent<?>> createSseFlux(final ServerRequest request) {        return Flux.<ServerSentEvent<?>>create(sink -> {                    WebFluxMcpSessionTransport sessionTransport = new WebFluxMcpSessionTransport(sink);                    // 创建 McpServerSession 并暂存插件链信息                    McpServerSession session = sessionFactory.create(sessionTransport);                    String sessionId = session.getId();                    sessions.put(sessionId, session);
                    // 将 session id等信息传递回客户端                    String endpointUrl = this.baseUrl + this.messageEndpoint + "?sessionId=" + sessionId;                    ServerSentEvent<String> endpointEvent = ServerSentEvent.<String>builder()                            .event(ENDPOINT_EVENT_TYPE)                            .data(endpointUrl)                            .build();                }).doOnSubscribe(subscription -> LOGGER.info("SSE Flux subscribed"))                .doOnRequest(n -> LOGGER.debug("SSE Flux requested {} items", n));    }}

2.2 客户端发送 message 请求#

如果我们客户端发送的是后缀为 /message 的请求,那么将会执行 把当前 ShenyuPluginChain 信息存入 session 中,并调用 handleMessageEndpoint() 方法, 后续工具调用时会继续执行该插件链,因此 mcp plugin 后的插件会对进入 tool 的请求造成影响

  • handleMessageEndpoint() 方法,调用 ShenyuSseServerTransportProviderhandleMessageEndpoint() 方法
if (uri.endsWith(messageEndpoint)) {       setupSessionContext(exchange, chain);       return handleMessageEndpoint(exchange, transportProvider, request);} 
public class McpServerPlugin extends AbstractShenyuPlugin {    private Mono<Void> handleMessageEndpoint(final ServerWebExchange exchange,                                             final ShenyuSseServerTransportProvider transportProvider,                                             final ServerRequest request) {        // 处理message请求        return transportProvider.handleMessageEndpoint(request)                .flatMap(result -> {                    return exchange.getResponse()                            .writeWith(Mono.just(exchange.getResponse().bufferFactory().wrap(responseBody.getBytes())));                });    }}
  • handleMessageEndpoint() 方法

    该方法由 McpServerPlugin.handleMessageEndpoint() 调用,将请求交给 session 处理

session 的 handler() 方法会对 message 的不同,而进行对应的操作 例如 : 当 message 中 method 是 "tools/call" 时,则会使用工具调用的 handler() 执行 call() 方法调用工具 相关源码在此不过多赘述

public class ShenyuSseServerTransportProvider implements McpServerTransportProvider {    public Mono<MessageHandlingResult> handleMessageEndpoint(final ServerRequest request) {        // 获取到session        String sessionId = request.queryParam("sessionId").get();        McpServerSession session = sessions.get(sessionId);        return request.bodyToMono(String.class)                .flatMap(body -> {                    McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body);                    return session.handle(message);                });    }}

至此 Shenyu Mcp Plugin 服务调用源码分析完毕

流程图一览

3. 工具调用#

如果客户端传递的消息是调用工具的消息,那么 session 将使用工具调用的 handler() 并执行 tool 的 call() 方法, 在服务注册中,我们说明了 tool 在被调用时,实际执行的是 ShenyuToolCallback()call() 方法

因此执行工具调用时会执行以下方法

  • call() 主要工作内容如下
  1. 获取 session id
  2. 获取 requestTemplate 即 shenyu 提供的额外功能的配置信息
  3. 获取上一步暂存的 shenyu 插件链,并将工具调用的信息交给插件链继续执行
  4. 异步等待工具响应

插件链执行完成后,会将调用 tool 请求真正的发送到 tool 所在的服务之中

public class ShenyuToolCallback implements ToolCallback {    @NonNull    @Override    public String call(@NonNull final String input, final ToolContext toolContext) {        // 从 mcp 请求中提取 sessionId        final McpSyncServerExchange mcpExchange = extractMcpExchange(toolContext);        final String sessionId = extractSessionId(mcpExchange);        // 提取requestTemplate信息        final String configStr = extractRequestConfig(shenyuTool);
        // 利用sessionId 获取到先前暂存的插件执行链        final ServerWebExchange originExchange = getOriginExchange(sessionId);        final ShenyuPluginChain chain = getPluginChain(originExchange);
        // 执行工具调用        return executeToolCall(originExchange, chain, sessionId, configStr, input);
    }
    private String executeToolCall(final ServerWebExchange originExchange,                                   final ShenyuPluginChain chain,                                   final String sessionId,                                   final String configStr,                                   final String input) {
        final CompletableFuture<String> responseFuture = new CompletableFuture<>();        final ServerWebExchange decoratedExchange = buildDecoratedExchange(                originExchange, responseFuture, sessionId, configStr, input);        // 执行插件链,调用实际工具        chain.execute(decoratedExchange)                .subscribe();
        // 等待响应        final String result = responseFuture.get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);        return result;
    }}

至此Shenyu MCP Plugin 工具调用分析完毕


4. 小结#

本文源码分析从 mcp 服务注册开始,到 mcp 插件的服务调用,再到 tool 的调用。 mcpServer 插件让 shenyu 成为一个功能强大,集中管理的 mcpServer。