Skip to main content

Dubbo插件源码分析

· One min read
Apache ShenYu Committer

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

Apache ShenYu 网关使用 dubbo 插件完成对 dubbo服务的调用。你可以查看官方文档 Dubbo快速开始 了解如何使用该插件。

本文基于shenyu-2.4.3版本进行源码分析,官网的介绍请参考 Dubbo服务接入

1. 服务注册#

以官网提供的例子为例 shenyu-examples-dubbo 。 假如你的dubbo服务定义如下(spring-dubbo.xml):

<beans xmlns="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"       xsi:schemaLocation="http://www.springframework.org/schema/beans       http://www.springframework.org/schema/beans/spring-beans.xsd       http://code.alibabatech.com/schema/dubbo       https://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    <dubbo:application name="test-dubbo-service"/>    <dubbo:registry address="${dubbo.registry.address}"/>    <dubbo:protocol name="dubbo" port="20888"/>
    <dubbo:service timeout="10000" interface="org.apache.shenyu.examples.dubbo.api.service.DubboTestService" ref="dubboTestService"/>
</beans>

声明应用服务名称,注册中心地址,使用dubbo协议,声明服务接口,对应接口实现类:

/** * DubboTestServiceImpl. */@Service("dubboTestService")public class DubboTestServiceImpl implements DubboTestService {        @Override    @ShenyuDubboClient(path = "/findById", desc = "Query by Id")    public DubboTest findById(final String id) {        return new DubboTest(id, "hello world shenyu Apache, findById");    }
    //......}

在接口实现类中,使用注解@ShenyuDubboClientshenyu-admin注册服务。该注解的作用及原理,稍后再进行分析。

在配置文件application.yml中的配置信息:

server:  port: 8011  address: 0.0.0.0  servlet:    context-path: /spring:  main:    allow-bean-definition-overriding: truedubbo:  registry:    address: zookeeper://localhost:2181  # dubbo使用的注册中心    shenyu:  register:    registerType: http #注册方式    serverLists: http://localhost:9095 #注册地址    props:      username: admin       password: 123456  client:    dubbo:      props:        contextPath: /dubbo          appName: dubbo

在配置文件中,声明dubbo使用的注册中心地址,dubbo服务向shenyu-admin注册,使用的方式是http,注册地址是http://localhost:9095

关于注册方式的使用,请参考 应用客户端接入

1.1 声明注册接口#

使用注解@ShenyuDubboClient将服务注册到网关。简单demo如下:

// dubbo服务@Service("dubboTestService")public class DubboTestServiceImpl implements DubboTestService {        @Override    @ShenyuDubboClient(path = "/findById", desc = "Query by Id") // 需要注册的方法    public DubboTest findById(final String id) {        return new DubboTest(id, "hello world shenyu Apache, findById");    }
    //......}

注解定义:

/** * 作用于类和方法上 */@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE, ElementType.METHOD})@Inheritedpublic @interface ShenyuDubboClient {        //注册路径    String path();        //规则名称    String ruleName() default "";       //描述信息    String desc() default "";
    //是否启用    boolean enabled() default true;}

1.2 扫描注解信息#

注解扫描通过ApacheDubboServiceBeanListener完成,它实现了ApplicationListener<ContextRefreshedEvent>接口,在Spring容器启动过程中,发生上下文刷新事件时,开始执行事件处理方法onApplicationEvent()

在构造器实例化的过程中:

  • 读取属性配置
  • 开启线程池
  • 启动注册中心,用于向shenyu-admin注册
public class ApacheDubboServiceBeanListener implements ApplicationListener<ContextRefreshedEvent> {
    // ......
    //构造器    public ApacheDubboServiceBeanListener(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {        //1.读取属性配置        Properties props = clientConfig.getProps();        String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);        String appName = props.getProperty(ShenyuClientConstants.APP_NAME);        if (StringUtils.isBlank(contextPath)) {            throw new ShenyuClientIllegalArgumentException("apache dubbo client must config the contextPath or appName");        }        this.contextPath = contextPath;        this.appName = appName;        this.host = props.getProperty(ShenyuClientConstants.HOST);        this.port = props.getProperty(ShenyuClientConstants.PORT);        //2.开启线程池        executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("shenyu-apache-dubbo-client-thread-pool-%d").build());        //3.启动注册中心        publisher.start(shenyuClientRegisterRepository);    }
    /**     * 上下文刷新事件,执行方法逻辑     */    @Override    public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {        //......    }
  • ApacheDubboServiceBeanListener#onApplicationEvent()

重写的方法逻辑:读取Dubbo服务ServiceBean,构建元数据对象和URI对象,并向shenyu-admin注册。

    @Override    public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {        //读取ServiceBean        Map<String, ServiceBean> serviceBean = contextRefreshedEvent.getApplicationContext().getBeansOfType(ServiceBean.class);        if (serviceBean.isEmpty()) {            return;        }        //保证该方法只执行一次        if (!registered.compareAndSet(false, true)) {            return;        }        //处理元数据对象        for (Map.Entry<String, ServiceBean> entry : serviceBean.entrySet()) {            handler(entry.getValue());        }        //处理URI对象        serviceBean.values().stream().findFirst().ifPresent(bean -> {            publisher.publishEvent(buildURIRegisterDTO(bean));        });    }
  • handler()

    handler()方法中,从serviceBean中读取所有方法,判断方法上是否有ShenyuDubboClient注解,如果存在就构建元数据对象,并通过注册中心,向shenyu-admin注册该方法。

    private void handler(final ServiceBean<?> serviceBean) {        //获取代理对象        Object refProxy = serviceBean.getRef();        //获取class信息        Class<?> clazz = refProxy.getClass();        if (AopUtils.isAopProxy(refProxy)) {            clazz = AopUtils.getTargetClass(refProxy);        }        //获取所有方法        Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);        for (Method method : methods) {            //读取ShenyuDubboClient注解信息            ShenyuDubboClient shenyuDubboClient = method.getAnnotation(ShenyuDubboClient.class);            if (Objects.nonNull(shenyuDubboClient)) {                //构建元数据对象,并注册                publisher.publishEvent(buildMetaDataDTO(serviceBean, shenyuDubboClient, method));            }        }    }
  • buildMetaDataDTO()

    构建元数据对象,在这里构建方法注册的必要信息,后续用于选择器或规则匹配。

    private MetaDataRegisterDTO buildMetaDataDTO(final ServiceBean<?> serviceBean, final ShenyuDubboClient shenyuDubboClient, final Method method) {        //应用名称        String appName = buildAppName(serviceBean);        //方法路径        String path = contextPath + shenyuDubboClient.path();        //描述信息        String desc = shenyuDubboClient.desc();        //服务名称        String serviceName = serviceBean.getInterface();        //规则名称        String configRuleName = shenyuDubboClient.ruleName();        String ruleName = ("".equals(configRuleName)) ? path : configRuleName;        //方法名称        String methodName = method.getName();        //参数类型        Class<?>[] parameterTypesClazz = method.getParameterTypes();        String parameterTypes = Arrays.stream(parameterTypesClazz).map(Class::getName).collect(Collectors.joining(","));        return MetaDataRegisterDTO.builder()                .appName(appName)                .serviceName(serviceName)                .methodName(methodName)                .contextPath(contextPath)                .host(buildHost())                .port(buildPort(serviceBean))                .path(path)                .ruleName(ruleName)                .pathDesc(desc)                .parameterTypes(parameterTypes)                .rpcExt(buildRpcExt(serviceBean)) //dubbo服务的扩展信息                .rpcType(RpcTypeEnum.DUBBO.getName())                .enabled(shenyuDubboClient.enabled())                .build();    }
  • buildRpcExt()

    dubbo服务的扩展信息

       private String buildRpcExt(final ServiceBean serviceBean) {       DubboRpcExt build = DubboRpcExt.builder()               .group(StringUtils.isNotEmpty(serviceBean.getGroup()) ? serviceBean.getGroup() : "")//分组               .version(StringUtils.isNotEmpty(serviceBean.getVersion()) ? serviceBean.getVersion() : "")//版本               .loadbalance(StringUtils.isNotEmpty(serviceBean.getLoadbalance()) ? serviceBean.getLoadbalance() : Constants.DEFAULT_LOADBALANCE)//负载均衡策略,默认随机               .retries(Objects.isNull(serviceBean.getRetries()) ? Constants.DEFAULT_RETRIES : serviceBean.getRetries())//重试次数,默认2               .timeout(Objects.isNull(serviceBean.getTimeout()) ? Constants.DEFAULT_CONNECT_TIMEOUT : serviceBean.getTimeout())//超时,默认3000               .sent(Objects.isNull(serviceBean.getSent()) ? Constants.DEFAULT_SENT : serviceBean.getSent())//sent,默认false               .cluster(StringUtils.isNotEmpty(serviceBean.getCluster()) ? serviceBean.getCluster() : Constants.DEFAULT_CLUSTER)//集群策略,默认failover               .url("")               .build();       return GsonUtils.getInstance().toJson(build);   }
  • buildURIRegisterDTO()

    构建URI对象,注册服务本身的信息,后续可用于服务探活。

private URIRegisterDTO buildURIRegisterDTO(final ServiceBean serviceBean) {        return URIRegisterDTO.builder()                .contextPath(this.contextPath) //上下文路径                .appName(buildAppName(serviceBean))//应用名称                .rpcType(RpcTypeEnum.DUBBO.getName())//rpc类型:dubbo                .host(buildHost()) //host                .port(buildPort(serviceBean))//port                .build(); }

具体的注册逻辑由注册中心实现,请参考 客户端接入原理

//向注册中心,发布注册事件   publisher.publishEvent();

1.3 处理注册信息#

客户端通过注册中心注册的元数据和URI数据,在shenyu-admin端进行处理,负责存储到数据库和同步给shenyu网关。Dubbo插件的客户端注册处理逻辑在ShenyuClientRegisterDubboServiceImpl中。继承关系如下:

  • ShenyuClientRegisterService:客户端注册服务,顶层接口;
  • FallbackShenyuClientRegisterService:注册失败,提供重试操作;
  • AbstractShenyuClientRegisterServiceImpl:抽象类,实现部分公共注册逻辑;
  • ShenyuClientRegisterDubboServiceImpl:实现Dubbo插件的注册;
1.3.1 注册服务#
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()

    客户端通过注册中心注册的元数据MetaDataRegisterDTO对象在shenyu-adminregister()方法被接送到。

   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. 注册选择器        String selectorHandler = selectorHandler(dto);        String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);        //2. 注册规则        String ruleHandler = ruleHandler();        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        ruleService.registerDefault(ruleDTO);        //3. 注册元数据        registerMetadata(dto);        //4. 注册ContextPath        String contextPath = dto.getContextPath();        if (StringUtils.isNotEmpty(contextPath)) {            registerContextPath(dto);        }        return ShenyuResultMessage.SUCCESS;    }
1.3.1.1 注册选择器#
  • org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()

构建contextPath,查找选择器信息是否存在,如果存在就返回id;不存在就创建默认的选择器信息。

    @Override    public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {        // 构建contextPath        String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());        // 通过名称查找选择器信息是否存在        SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);        if (Objects.isNull(selectorDO)) {            // 不存在就创建默认的选择器信息            return registerSelector(contextPath, pluginName, selectorHandler);        }        return selectorDO.getId();    }
  • 默认选择器信息

    在这里构建默认选择器信息及其条件属性。

   //注册选择器   private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {        //构建选择器        SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());        selectorDTO.setHandle(selectorHandler);        //注册默认选择器        return registerDefault(selectorDTO);    }     //构建选择器    private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {        //构建默认选择器        SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);        selectorDTO.setPluginId(pluginId);         //构建默认选择器的条件属性        selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));        return selectorDTO;    }
  • 构建默认选择器
private SelectorDTO buildDefaultSelectorDTO(final String name) {    return SelectorDTO.builder()            .name(name) // 名称            .type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // 默认类型自定义            .matchMode(MatchModeEnum.AND.getCode()) //默认匹配方式 and            .enabled(Boolean.TRUE)  //默认启开启            .loged(Boolean.TRUE)  //默认记录日志            .continued(Boolean.TRUE) //默认继续后续选择器            .sort(1) //默认顺序1            .build();}
  • 构建默认选择器条件属性
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {    SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();    selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // 默认参数类型URI    selectorConditionDTO.setParamName("/");    selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // 默认匹配策略 match    selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // 默认值 /contextPath/**    return Collections.singletonList(selectorConditionDTO);}
  • 注册默认选择器
@Overridepublic String registerDefault(final SelectorDTO selectorDTO) {    //选择器信息    SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);    //选择器条件属性    List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();    if (StringUtils.isEmpty(selectorDTO.getId())) {        // 向数据库插入选择器信息        selectorMapper.insertSelective(selectorDO);          // 向数据库插入选择器条件属性        selectorConditionDTOs.forEach(selectorConditionDTO -> {            selectorConditionDTO.setSelectorId(selectorDO.getId());            selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));        });    }    // 发布同步事件,向网关同步选择信息及其条件属性    publishEvent(selectorDO, selectorConditionDTOs);    return selectorDO.getId();}
1.3.1.2 注册规则#

在注册服务的第二步中,开始构建默认规则,然后注册规则。

@Override    public String register(final MetaDataRegisterDTO dto) {        //1. 注册选择器        //......                //2. 注册规则        // 默认规则处理属性        String ruleHandler = ruleHandler();        // 构建默认规则信息        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        // 注册规则        ruleService.registerDefault(ruleDTO);                //3. 注册元数据        //......                //4. 注册ContextPath        //......                return ShenyuResultMessage.SUCCESS;    }
  • 默认规则处理属性
    @Override    protected String ruleHandler() {        // 默认规则处理属性        return new DubboRuleHandle().toJson();    }

Dubbo插件默认规则处理属性

public class DubboRuleHandle implements RuleHandle {
    /**     * dubbo服务版本信息.     */    private String version;
    /**     * 分组.     */    private String group;
    /**     * 重试次数.     */    private Integer retries = 0;
    /**     * 负载均衡策略:默认随机     */    private String loadbalance = LoadBalanceEnum.RANDOM.getName();
    /**     * 超时,默认3000     */    private long timeout = Constants.TIME_OUT;}
  • 构建默认规则信息
  // 构建默认规则信息    private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {        return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());    }   //  构建默认规则信息    private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {        RuleDTO ruleDTO = RuleDTO.builder()                .selectorId(selectorId) //关联的选择器id                .name(ruleName) //规则名称                .matchMode(MatchModeEnum.AND.getCode()) // 默认匹配模式 and                .enabled(Boolean.TRUE) // 默认开启                .loged(Boolean.TRUE) //默认记录日志                .sort(1) //默认顺序 1                .handle(ruleHandler)                .build();        RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()                .paramType(ParamTypeEnum.URI.getName()) // 默认参数类型URI                .paramName("/")                .paramValue(path) //参数值path                .build();        if (path.indexOf("*") > 1) {            ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); //如果path中有*,操作类型则默认为 match        } else {            ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); // 否则,默认操作类型 =         }        ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));        return ruleDTO;    }
  • org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()

注册规则:向数据库插入记录,并向网关发布事件,进行数据同步。


    @Override    public String registerDefault(final RuleDTO ruleDTO) {        RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());        if (Objects.nonNull(exist)) {            return "";        }
        RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);        List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();        if (StringUtils.isEmpty(ruleDTO.getId())) {            // 向数据库插入规则信息            ruleMapper.insertSelective(ruleDO);            //向数据库插入规则体条件属性            ruleConditions.forEach(ruleConditionDTO -> {                ruleConditionDTO.setRuleId(ruleDO.getId());                ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));            });        }        // 向网关发布事件,进行数据同步        publishEvent(ruleDO, ruleConditions);        return ruleDO.getId();    }
1.3.1.3 注册元数据#

元数据主要用于RPC服务的调用。

   @Override    public String register(final MetaDataRegisterDTO dto) {        //1. 注册选择器        //......                //2. 注册规则        //......                //3. 注册元数据        registerMetadata(dto);                //4. 注册ContextPath        //......                return ShenyuResultMessage.SUCCESS;    }
  • org.apache.shenyu.admin.service.register.ShenyuClientRegisterDubboServiceImpl#registerMetadata()

    插入或更新元数据,然后发布同步事件到网关。

    @Override    protected void registerMetadata(final MetaDataRegisterDTO dto) {            // 获取metaDataService            MetaDataService metaDataService = getMetaDataService();            // 元数据是否存在            MetaDataDO exist = metaDataService.findByPath(dto.getPath());            // 插入或更新元数据            metaDataService.saveOrUpdateMetaData(exist, dto);    }
    @Override    public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {        DataEventTypeEnum eventType;        // 数据类型转换 DTO->DO        MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);        // 插入数据        if (Objects.isNull(exist)) {            Timestamp currentTime = new Timestamp(System.currentTimeMillis());            metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());            metaDataDO.setDateCreated(currentTime);            metaDataDO.setDateUpdated(currentTime);            metaDataMapper.insert(metaDataDO);            eventType = DataEventTypeEnum.CREATE;        } else {            // 更新数据            metaDataDO.setId(exist.getId());            metaDataMapper.update(metaDataDO);            eventType = DataEventTypeEnum.UPDATE;        }        // 发布同步事件到网关        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,                Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));    }
1.3.2 注册URI#
  • org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()

服务端收到客户端注册的URI信息后,进行处理。

    @Override    public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {        String result;        String key = key(selectorName);        try {            this.removeFallBack(key);            // 注册URI            result = this.doRegisterURI(selectorName, uriList);            logger.info("Register success: {},{}", selectorName, uriList);        } catch (Exception ex) {            logger.warn("Register exception: cause:{}", ex.getMessage());            result = "";            // 注册失败后,进行重试            this.addFallback(key, new FallbackHolder(selectorName, uriList));        }        return result;    }
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()

从客户端注册的URI中获取有效的URI,更新对应的选择器handle属性,向网关发送选择器更新事件。

@Override    public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {        //参数检查        if (CollectionUtils.isEmpty(uriList)) {            return "";        }        //获取选择器信息        SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));        if (Objects.isNull(selectorDO)) {            throw new ShenyuException("doRegister Failed to execute,wait to retry.");        }        // 获取有效的URI        List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());        // 构建选择器的handle属性        String handler = buildHandle(validUriList, selectorDO);        if (handler != null) {            selectorDO.setHandle(handler);            SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));            selectorData.setHandle(handler);            // 向数据库更新选择器的handle属性            selectorService.updateSelective(selectorDO);            // 向网关发送选择器更新事件            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));        }        return ShenyuResultMessage.SUCCESS;    }

关于服务注册的源码分析就以及完成了,分析流程图如下:

接下来就分析dubbo插件是如何根据这些信息向http服务发起调用。

2. 服务调用#

dubbo插件是ShenYu网关用于将http请求转成 dubbo协议,调用dubbo服务的核心处理插件。

以官网提供的案例 Dubbo快速开始 为例,一个dubbo服务通过注册中心向shenyu-admin注册后,通过ShenYu网关代理,请求如下:

GET http://localhost:9195/dubbo/findById?id=100Accept: application/json

Dubbo插件中,类继承关系如下:

  • ShenyuPlugin:顶层接口,定义接口方法;
  • AbstractShenyuPlugin:抽象类,实现插件共有逻辑;
  • AbstractDubboPlugin:dubbo插件抽象类,实现dubbo共有逻辑;
  • ApacheDubboPlugin:ApacheDubbo插件。

ShenYu网关支持ApacheDubbo和AlibabaDubbo

2.1 接收请求#

通过ShenYu网关代理后,请求入口是ShenyuWebHandler,它实现了org.springframework.web.server.WebHandler接口。

public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {    //......        /**     * 处理web请求     */    @Override    public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {       // 执行默认插件链        Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);        if (scheduled) {            return execute.subscribeOn(scheduler);        }        return execute;    }        private static class DefaultShenyuPluginChain implements ShenyuPluginChain {
        private int index;
        private final List<ShenyuPlugin> plugins;
        /**         * 实例化默认插件链         */        DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {            this.plugins = plugins;        }
        /**         * 执行每个插件.         */        @Override        public Mono<Void> execute(final ServerWebExchange exchange) {            return Mono.defer(() -> {                if (this.index < plugins.size()) {                    // 获取当前执行插件                    ShenyuPlugin plugin = plugins.get(this.index++);                    // 是否跳过当前插件                    boolean skip = plugin.skip(exchange);                    if (skip) {                        // 如果跳过就执行下一个                        return this.execute(exchange);                    }                    // 执行当前插件                    return plugin.execute(exchange, this);                }                return Mono.empty();            });        }    }}

2.2 匹配规则#

  • org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()

execute()方法中执行选择器和规则的匹配逻辑。

  • 匹配选择器;
  • 匹配规则;
  • 执行插件。
@Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        // 插件名称        String pluginName = named();        // 插件信息        PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);        if (pluginData != null && pluginData.getEnabled()) {            // 选择器信息            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);            if (CollectionUtils.isEmpty(selectors)) {                return handleSelectorIfNull(pluginName, exchange, chain);            }            // 匹配选择器            SelectorData selectorData = matchSelector(exchange, selectors);            if (Objects.isNull(selectorData)) {                return handleSelectorIfNull(pluginName, exchange, chain);            }            selectorLog(selectorData, pluginName);            // 规则信息            List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());            if (CollectionUtils.isEmpty(rules)) {                return handleRuleIfNull(pluginName, exchange, chain);            }            // 匹配规则            RuleData rule;            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {                //get last                rule = rules.get(rules.size() - 1);            } else {                rule = matchRule(exchange, rules);            }            if (Objects.isNull(rule)) {                return handleRuleIfNull(pluginName, exchange, chain);            }            ruleLog(rule, pluginName);            // 执行插件            return doExecute(exchange, chain, selectorData, rule);        }        return chain.execute(exchange);    }

2.3 执行GlobalPlugin#

  • org.apache.shenyu.plugin.global.GlobalPlugin#execute()

GlobalPlugin是一个全局插件,在execute()方法中构建上下文信息。

public class GlobalPlugin implements ShenyuPlugin {    // 构建上下文信息    private final ShenyuContextBuilder builder;        //......        @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {       // 构建上下文信息,传入到 exchange 中        ShenyuContext shenyuContext = builder.build(exchange);        exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);        return chain.execute(exchange);    }        //......}
  • org.apache.shenyu.plugin.global.DefaultShenyuContextBuilder#build()

构建默认的上下文信息。

public class DefaultShenyuContextBuilder implements ShenyuContextBuilder {    //......        @Override    public ShenyuContext build(final ServerWebExchange exchange) {        //构建参数        Pair<String, MetaData> buildData = buildData(exchange);        //包装ShenyuContext        return decoratorMap.get(buildData.getLeft()).decorator(buildDefaultContext(exchange.getRequest()), buildData.getRight());    }        private Pair<String, MetaData> buildData(final ServerWebExchange exchange) {        //......        //根据请求的uri获取元数据        MetaData metaData = MetaDataCache.getInstance().obtain(request.getURI().getPath());        if (Objects.nonNull(metaData) && Boolean.TRUE.equals(metaData.getEnabled())) {            exchange.getAttributes().put(Constants.META_DATA, metaData);            return Pair.of(metaData.getRpcType(), metaData);        } else {            return Pair.of(RpcTypeEnum.HTTP.getName(), new MetaData());        }    }    //设置默认的上下文信息    private ShenyuContext buildDefaultContext(final ServerHttpRequest request) {        String appKey = request.getHeaders().getFirst(Constants.APP_KEY);        String sign = request.getHeaders().getFirst(Constants.SIGN);        String timestamp = request.getHeaders().getFirst(Constants.TIMESTAMP);        ShenyuContext shenyuContext = new ShenyuContext();        String path = request.getURI().getPath();        shenyuContext.setPath(path); //请求路径        shenyuContext.setAppKey(appKey);        shenyuContext.setSign(sign);        shenyuContext.setTimestamp(timestamp);        shenyuContext.setStartDateTime(LocalDateTime.now());        Optional.ofNullable(request.getMethod()).ifPresent(httpMethod -> shenyuContext.setHttpMethod(httpMethod.name()));//请求方法        return shenyuContext;    } }
  • org.apache.shenyu.plugin.dubbo.common.context.DubboShenyuContextDecorator#decorator()

包装ShenyuContext

public class DubboShenyuContextDecorator implements ShenyuContextDecorator {        @Override    public ShenyuContext decorator(final ShenyuContext shenyuContext, final MetaData metaData) {        shenyuContext.setModule(metaData.getAppName());//获取AppName        shenyuContext.setMethod(metaData.getServiceName()); //获取ServiceName        shenyuContext.setContextPath(metaData.getContextPath()); //获取contextPath        shenyuContext.setRpcType(RpcTypeEnum.DUBBO.getName()); // dubbo服务        return shenyuContext;    }        @Override    public String rpcType() {        return RpcTypeEnum.DUBBO.getName();    }}

2.4 执行RpcParamTransformPlugin#

RpcParamTransformPlugin负责从http请求中读取参数,保存到exchange中,传递给rpc服务。

  • org.apache.shenyu.plugin.base.RpcParamTransformPlugin#execute()

execute()方法中,执行该插件的核心逻辑:从exchange中获取请求信息,根据请求传入的内容形式处理参数。

public class RpcParamTransformPlugin implements ShenyuPlugin {
    @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        //从exchange中获取请求信息        ServerHttpRequest request = exchange.getRequest();        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        if (Objects.nonNull(shenyuContext)) {           // 如果请求参数格式是APPLICATION_JSON            MediaType mediaType = request.getHeaders().getContentType();            if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {                return body(exchange, request, chain);            }            // 如果请求参数格式是APPLICATION_FORM_URLENCODED            if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {                return formData(exchange, request, chain);            }            //一般查询请求            return query(exchange, request, chain);        }        return chain.execute(exchange);    }        // 如果请求参数格式是APPLICATION_JSON    private Mono<Void> body(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {        return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())                .flatMap(body -> {                    exchange.getAttributes().put(Constants.PARAM_TRANSFORM, resolveBodyFromRequest(body));//解析body,保存到exchange中                    return chain.execute(exchange);                }));    }   // 如果请求参数格式是APPLICATION_FORM_URLENCODED    private Mono<Void> formData(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {        return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())                .flatMap(map -> {                    String param = resolveBodyFromRequest(map);                    LinkedMultiValueMap<String, String> linkedMultiValueMap;                    try {                        linkedMultiValueMap = BodyParamUtils.buildBodyParams(URLDecoder.decode(param, StandardCharsets.UTF_8.name())); //格式化数据                    } catch (UnsupportedEncodingException e) {                        return Mono.error(e);                    }                    exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.toMap(() -> linkedMultiValueMap));// 保存到exchange中                    return chain.execute(exchange);                }));    }    //一般查询请求    private Mono<Void> query(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {        exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.ofString(() -> serverHttpRequest.getURI().getQuery()));//保存到exchange中        return chain.execute(exchange);    }    //...... }

2.5 执行DubboPlugin#

  • org.apache.shenyu.plugin.dubbo.common.AbstractDubboPlugin#doExecute()

doExecute()方法中,主要是检查元数据和参数。

public abstract class AbstractDubboPlugin extends AbstractShenyuPlugin {        @Override    public Mono<Void> doExecute(final ServerWebExchange exchange,                                   final ShenyuPluginChain chain,                                   final SelectorData selector,                                   final RuleData rule) {        //获取参数        String param = exchange.getAttribute(Constants.PARAM_TRANSFORM);        //获取上下文信息        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        //获取元数据        MetaData metaData = exchange.getAttribute(Constants.META_DATA);        //检查元数据        if (!checkMetaData(metaData)) {            LOG.error(" path is : {}, meta data have error : {}", shenyuContext.getPath(), metaData);            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.META_DATA_ERROR, null);            return WebFluxResultUtils.result(exchange, error);        }        //检查元数据和参数        if (Objects.nonNull(metaData) && StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(param)) {            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM, null);            return WebFluxResultUtils.result(exchange, error);        }        //设置rpcContext        this.rpcContext(exchange);        //进行dubbo服务调用        return this.doDubboInvoker(exchange, chain, selector, rule, metaData, param);    }}
  • org.apache.shenyu.plugin.apache.dubbo.ApacheDubboPlugin#doDubboInvoker()

doDubboInvoker()方法中设置特殊的上下文信息,然后开始dubbo的泛化调用。

public class ApacheDubboPlugin extends AbstractDubboPlugin {        @Override    protected Mono<Void> doDubboInvoker(final ServerWebExchange exchange,                                        final ShenyuPluginChain chain,                                        final SelectorData selector,                                        final RuleData rule,                                        final MetaData metaData,                                        final String param) {        //设置当前的选择器和规则信息,以及请求地址,用于支持dubbo的灰度        RpcContext.getContext().setAttachment(Constants.DUBBO_SELECTOR_ID, selector.getId());        RpcContext.getContext().setAttachment(Constants.DUBBO_RULE_ID, rule.getId());        RpcContext.getContext().setAttachment(Constants.DUBBO_REMOTE_ADDRESS, Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress());        //dubbo的泛化调用        final Mono<Object> result = dubboProxyService.genericInvoker(param, metaData, exchange);        //执行下一个插件        return result.then(chain.execute(exchange));    }}
  • org.apache.shenyu.plugin.apache.dubbo.proxy.ApacheDubboProxyService#genericInvoker()

genericInvoker()方法:

  • 获取ReferenceConfig对象;
  • 获取泛化服务GenericService对象;
  • 构造请求参数pair对象;
  • 发起异步的泛化调用。
public class ApacheDubboProxyService {    //...... 
    /**     * Generic invoker object.     */    public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws ShenyuException {        //1.获取ReferenceConfig对象        ReferenceConfig<GenericService> reference = ApacheDubboConfigCache.getInstance().get(metaData.getPath());        //如果没有获取到        if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {            //失效当前缓存的信息            ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());            //使用元数据进行再次初始化            reference = ApacheDubboConfigCache.getInstance().initRef(metaData);        }        //2.获取泛化服务GenericService对象        GenericService genericService = reference.get();        //3.构造请求参数pair对象        Pair<String[], Object[]> pair;        if (StringUtils.isBlank(metaData.getParameterTypes()) || ParamCheckUtils.dubboBodyIsEmpty(body)) {            pair = new ImmutablePair<>(new String[]{}, new Object[]{});        } else {            pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());        }        //4.发起异步的泛化调用        return Mono.fromFuture(invokeAsync(genericService, metaData.getMethodName(), pair.getLeft(), pair.getRight()).thenApply(ret -> {            //处理结果            if (Objects.isNull(ret)) {                ret = Constants.DUBBO_RPC_RESULT_EMPTY;            }            exchange.getAttributes().put(Constants.RPC_RESULT, ret);            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());            return ret;        })).onErrorMap(exception -> exception instanceof GenericException ? new ShenyuException(((GenericException) exception).getExceptionMessage()) : new ShenyuException(exception));//处理异常    }        //泛化调用,异步操作    private CompletableFuture<Object> invokeAsync(final GenericService genericService, final String method, final String[] parameterTypes, final Object[] args) throws GenericException {        genericService.$invoke(method, parameterTypes, args);        Object resultFromFuture = RpcContext.getContext().getFuture();        return resultFromFuture instanceof CompletableFuture ? (CompletableFuture<Object>) resultFromFuture : CompletableFuture.completedFuture(resultFromFuture);    }}

通过泛化调用就可以实现在网关调用dubbo服务了。

ReferenceConfig对象是支持泛化调用的关键对象 ,它的初始化操作是在数据同步的时候完成的。这里涉及两部分数据,一是同步的插件handler信息,二是同步的插件元数据信息。

  • org.apache.shenyu.plugin.dubbo.common.handler.AbstractDubboPluginDataHandler#handlerPlugin()

当插件数据更新时,数据同步模块会将数据从shenyu-admin同步到网关。在handlerPlugin()中执行初始化操作。

public abstract class AbstractDubboPluginDataHandler implements PluginDataHandler {    //......        //初始化配置缓存   protected abstract void initConfigCache(DubboRegisterConfig dubboRegisterConfig);
    @Override    public void handlerPlugin(final PluginData pluginData) {        if (Objects.nonNull(pluginData) && Boolean.TRUE.equals(pluginData.getEnabled())) {            //数据反序列化            DubboRegisterConfig dubboRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class);            DubboRegisterConfig exist = Singleton.INST.get(DubboRegisterConfig.class);            if (Objects.isNull(dubboRegisterConfig)) {                return;            }            if (Objects.isNull(exist) || !dubboRegisterConfig.equals(exist)) {                // 执行初始化操作                this.initConfigCache(dubboRegisterConfig);            }            Singleton.INST.single(DubboRegisterConfig.class, dubboRegisterConfig);        }    }    //......}
  • org.apache.shenyu.plugin.apache.dubbo.handler.ApacheDubboPluginDataHandler#initConfigCache()

执行初始化操作。

public class ApacheDubboPluginDataHandler extends AbstractDubboPluginDataHandler {
    @Override    protected void initConfigCache(final DubboRegisterConfig dubboRegisterConfig) {        //执行初始化操作        ApacheDubboConfigCache.getInstance().init(dubboRegisterConfig);        //失效之前缓存的结果        ApacheDubboConfigCache.getInstance().invalidateAll();    }}
  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#init()

在初始化中,设置registryConfigconsumerConfig

public final class ApacheDubboConfigCache extends DubboConfigCache {    //......     /**     * 初始化     */    public void init(final DubboRegisterConfig dubboRegisterConfig) {        //创建ApplicationConfig        if (Objects.isNull(applicationConfig)) {            applicationConfig = new ApplicationConfig("shenyu_proxy");        }        //协议或者地址发生改变时,需要更新registryConfig        if (needUpdateRegistryConfig(dubboRegisterConfig)) {            RegistryConfig registryConfigTemp = new RegistryConfig();            registryConfigTemp.setProtocol(dubboRegisterConfig.getProtocol());            registryConfigTemp.setId("shenyu_proxy");            registryConfigTemp.setRegister(false);            registryConfigTemp.setAddress(dubboRegisterConfig.getRegister());            Optional.ofNullable(dubboRegisterConfig.getGroup()).ifPresent(registryConfigTemp::setGroup);            registryConfig = registryConfigTemp;        }        //创建ConsumerConfig        if (Objects.isNull(consumerConfig)) {            consumerConfig = ApplicationModel.getConfigManager().getDefaultConsumer().orElseGet(() -> {                ConsumerConfig consumerConfig = new ConsumerConfig();                consumerConfig.refresh();                return consumerConfig;            });            //设置ConsumerConfig            Optional.ofNullable(dubboRegisterConfig.getThreadpool()).ifPresent(consumerConfig::setThreadpool);             Optional.ofNullable(dubboRegisterConfig.getCorethreads()).ifPresent(consumerConfig::setCorethreads);             Optional.ofNullable(dubboRegisterConfig.getThreads()).ifPresent(consumerConfig::setThreads);             Optional.ofNullable(dubboRegisterConfig.getQueues()).ifPresent(consumerConfig::setQueues);        }    }        //是否需要更新注册配置    private boolean needUpdateRegistryConfig(final DubboRegisterConfig dubboRegisterConfig) {        if (Objects.isNull(registryConfig)) {            return true;        }        return !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol())                || !Objects.equals(dubboRegisterConfig.getRegister(), registryConfig.getAddress())                || !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol());    }
    //......}
  • org.apache.shenyu.plugin.apache.dubbo.subscriber.ApacheDubboMetaDataSubscriber#onSubscribe()

当元数据更新时,数据同步模块会将数据从shenyu-admin同步到网关。在onSubscribe()方法中执行元数据更新操作。

public class ApacheDubboMetaDataSubscriber implements MetaDataSubscriber {    //本地内存缓存    private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap();
    //元数据发生更新    public void onSubscribe(final MetaData metaData) {        // dubbo服务的元数据更新        if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {            //对应的元数据是否存在            MetaData exist = META_DATA.get(metaData.getPath());            if (Objects.isNull(exist) || Objects.isNull(ApacheDubboConfigCache.getInstance().get(metaData.getPath()))) {                // 首次初始化                ApacheDubboConfigCache.getInstance().initRef(metaData);            } else {                // 对应的元数据发生了更新操作                if (!Objects.equals(metaData.getServiceName(), exist.getServiceName())                        || !Objects.equals(metaData.getRpcExt(), exist.getRpcExt())                        || !Objects.equals(metaData.getParameterTypes(), exist.getParameterTypes())                        || !Objects.equals(metaData.getMethodName(), exist.getMethodName())) {                    //根据最新的元数据再次构建ReferenceConfig                    ApacheDubboConfigCache.getInstance().build(metaData);                }            }            //本地内存缓存            META_DATA.put(metaData.getPath(), metaData);        }    }
    //删除元数据    public void unSubscribe(final MetaData metaData) {        if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {            //使ReferenceConfig失效            ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());            META_DATA.remove(metaData.getPath());        }    }}
  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#initRef()

通过metaData构建ReferenceConfig对象。

public final class ApacheDubboConfigCache extends DubboConfigCache {    //......        public ReferenceConfig<GenericService> initRef(final MetaData metaData) {            try {                //先尝试从缓存中获取,存在就直接返回                ReferenceConfig<GenericService> referenceConfig = cache.get(metaData.getPath());                if (StringUtils.isNoneBlank(referenceConfig.getInterface())) {                    return referenceConfig;                }            } catch (ExecutionException e) {                LOG.error("init dubbo ref exception", e);            }           //不存在,就构建            return build(metaData);        }
        /**         * Build reference config.         */        @SuppressWarnings("deprecation")        public ReferenceConfig<GenericService> build(final MetaData metaData) {            if (Objects.isNull(applicationConfig) || Objects.isNull(registryConfig)) {                return new ReferenceConfig<>();            }            ReferenceConfig<GenericService> reference = new ReferenceConfig<>(); //新建ReferenceConfig            reference.setGeneric("true"); //泛化调用            reference.setAsync(true);//支持异步
            reference.setApplication(applicationConfig);//设置应用配置            reference.setRegistry(registryConfig);//设置注册中心配置            reference.setConsumer(consumerConfig);//设置消费者配置            reference.setInterface(metaData.getServiceName());//设置服务接口            reference.setProtocol("dubbo");//设置dubbo协议            reference.setCheck(false); //不检查 service provider            reference.setLoadbalance("gray");//支持灰度
            Map<String, String> parameters = new HashMap<>(2);            parameters.put("dispatcher", "direct");            reference.setParameters(parameters);//自定义参数
            String rpcExt = metaData.getRpcExt();//rpc扩展参数            DubboParam dubboParam = parserToDubboParam(rpcExt);//反序列化            if (Objects.nonNull(dubboParam)) {                if (StringUtils.isNoneBlank(dubboParam.getVersion())) {                    reference.setVersion(dubboParam.getVersion());//设置版本                }                if (StringUtils.isNoneBlank(dubboParam.getGroup())) {                    reference.setGroup(dubboParam.getGroup());//设置分组                }                if (StringUtils.isNoneBlank(dubboParam.getUrl())) {                    reference.setUrl(dubboParam.getUrl());//设置url                }                if (StringUtils.isNoneBlank(dubboParam.getCluster())) {                    reference.setCluster(dubboParam.getCluster());//设置Cluster type                }                Optional.ofNullable(dubboParam.getTimeout()).ifPresent(reference::setTimeout);//timeout                Optional.ofNullable(dubboParam.getRetries()).ifPresent(reference::setRetries);//retires                Optional.ofNullable(dubboParam.getSent()).ifPresent(reference::setSent);//Whether to ack async-sent            }            try {                //获取GenericService                Object obj = reference.get();                if (Objects.nonNull(obj)) {                    LOG.info("init apache dubbo reference success there meteData is :{}", metaData);                    //缓存当前的reference                    cache.put(metaData.getPath(), reference);                }            } catch (Exception e) {                LOG.error("init apache dubbo reference exception", e);            }            return reference;        }    //......    }

2.6 执行ResponsePlugin#

  • org.apache.shenyu.plugin.response.ResponsePlugin#execute()

响应结果由ResponsePlugin插件处理。

    @Override    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);        assert shenyuContext != null;        // 根据rpc类型处理结果        return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);    }

处理类型由MessageWriter决定,类继承关系如下:

  • MessageWriter:接口,定义消息处理方法;
  • NettyClientMessageWriter:处理Netty调用结果;
  • RPCMessageWriter:处理RPC调用结果;
  • WebClientMessageWriter:处理WebClient调用结果;

Dubbo服务调用,处理结果当然是RPCMessageWriter了。

  • org.apache.shenyu.plugin.response.strategy.RPCMessageWriter#writeWith()

writeWith()方法中处理响应结果。


public class RPCMessageWriter implements MessageWriter {
    @Override    public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {        return chain.execute(exchange).then(Mono.defer(() -> {            Object result = exchange.getAttribute(Constants.RPC_RESULT); //获取结果            if (Objects.isNull(result)) { //处理异常                Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);                return WebFluxResultUtils.result(exchange, error);            }            return WebFluxResultUtils.result(exchange, result);//返回结果        }));    }}

分析至此,关于Dubbo插件的源码分析就完成了,分析流程图如下:

3. 小结#

本文源码分析从Dubbo服务注册开始,到Dubbo插件的服务调用。Dubbo插件主要用来处理将http请求转成dubbo协议,主要逻辑是通过泛化调用实现。

注册中心实现原理之Http注册

· One min read
Apache ShenYu Committer

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,注册中心是用于将客户端信息注册到shenyu-adminadmin再通过数据同步将这些信息同步到网关,网关通过这些数据完成流量筛选。客户端信息主要包括接口信息URI信息

本文基于shenyu-2.5.0版本进行源码分析,官网的介绍请参考 客户端接入原理

1. 注册中心原理#

当客户端启动时,读取接口信息和uri信息,通过指定的注册类型,将数据发送到shenyu-admin

图中的注册中心需要用户指定使用哪种注册类型,ShenYu当前支持HttpZookeeperEtcdConsulNacos进行注册。具体如何配置请参考 客户端接入配置

ShenYu在注册中心的原理设计上引入了DisruptorDisruptor队列在其中起到数据与操作解耦,利于扩展。如果注册请求过多,导致注册异常,也有数据缓冲作用。

如图所示,注册中心分为两个部分,一是注册中心客户端register-client,负责处理客户端数据读取。另一个是注册中心服务端register-server,负责处理服务端(就是shenyu-admin)数据写入。通过指定注册类型进行数据发送和接收。

  • 客户端:通常来说就是一个微服务,可以是springmvcspring-clouddubbogrpc等。
  • register-client:注册中心客户端,读取客户接口和uri信息。
  • Disruptor:数据与操作解耦,数据缓冲作用。
  • register-server:注册中心服务端,这里就是shenyu-admin,接收数据,写入数据库,发数据同步事件。
  • 注册类型:指定注册类型,完成数据注册,当前支持HttpZookeeperEtcdConsulNacos

本文分析的是使用Http的方式进行注册,所以具体的处理流程如下:

在客户端,数据出队列后,通过http传输数据,在服务端,提供相应的接口,接收数据,然后写入队列。

2. 客户端注册流程#

当客户端启动后,根据相关配置,读取属性信息,然后写入队列。以官方提供的 shenyu-examples-http 为例,开始源码分析。官方提供的例子是一个由springboot构建的微服务。注册中心的相关配置可以参考官网 客户端接入配置

2.1 加载配置,读取属性#

先用一张图串联下注册中心客户端初始化流程:

我们分析的是通过http的方式进行注册,所以需要进行如下配置:

shenyu:  register:    registerType: http    serverLists: http://localhost:9095  props:    username: admin    password: 123456  client:    http:        props:          contextPath: /http          appName: http          port: 8189            isFull: false

每个属性表示的含义如下:

  • registerType: 服务注册类型,填写 http
  • serverList: 为http注册类型时,填写Shenyu-Admin项目的地址,注意加上http://,多个地址用英文逗号分隔。
  • username: Shenyu-Admin用户名
  • password: Shenyu-Admin用户对应的密码
  • port: 你本项目的启动端口,目前springmvc/tars/grpc需要进行填写。
  • contextPath: 为你的这个mvc项目在shenyu网关的路由前缀, 比如/order/product 等等,网关会根据你的这个前缀来进行路由。
  • appName:你的应用名称,不配置的话,会默认取 spring.application.name 的值。
  • isFull: 设置 true 代表代理你的整个服务,false表示代理你其中某几个controller;目前适用于springmvc/springcloud

项目启动后,会先加载配置文件,读取属性信息,生成相应的Bean

首先读取到的配置文件是 ShenyuSpringMvcClientConfiguration,它是shenyu 客户端http注册配置类,通过@Configuration表示这是一个配置类,通过@ImportAutoConfiguration引入其他配置类。创建SpringMvcClientEventListener,主要处理元数据和 URI 信息。

/** * shenyu 客户端http注册配置类 */@Configuration@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")public class ShenyuSpringMvcClientConfiguration {       // 创建SpringMvcClientEventListener,主要处理元数据和URI信息   @Bean   public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,                                                                     final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {       return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);   }}

ShenyuClientCommonBeanConfigurationshenyu客户端通用配置类,会创建注册中心客户端通用的bean

  • 创建ShenyuClientRegisterRepository,通过工厂类创建而成。
  • 创建ShenyuRegisterCenterConfig,读取shenyu.register属性配置。
  • 创建ShenyuClientConfig,读取shenyu.client属性配置。

/** * shenyu客户端通用配置类 */@Configurationpublic class ShenyuClientCommonBeanConfiguration {       // 创建ShenyuClientRegisterRepository,通过工厂类创建而成。    @Bean    public ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {        return ShenyuClientRegisterRepositoryFactory.newInstance(config);    }        // 创建ShenyuRegisterCenterConfig,读取shenyu.register属性配置    @Bean    @ConfigurationProperties(prefix = "shenyu.register")    public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {        return new ShenyuRegisterCenterConfig();    }      // 创建ShenyuClientConfig,读取shenyu.client属性配置    @Bean    @ConfigurationProperties(prefix = "shenyu")    public ShenyuClientConfig shenyuClientConfig() {        return new ShenyuClientConfig();    }}

2.2 用于注册的 HttpClientRegisterRepository#

上面的配置文件中生成的ShenyuClientRegisterRepository是客户端注册的具体实现,它是一个接口,它的实现类如下。

  • HttpClientRegisterRepository:通过http进行注册;
  • ConsulClientRegisterRepository:通过Consul进行注册;
  • EtcdClientRegisterRepository:通过Etcd进行注册;
  • NacosClientRegisterRepository:通过nacos进行注册;
  • ZookeeperClientRegisterRepository通过Zookeeper进行注册。

具体是哪一种方式,是通过SPI进行加载实现的,实现逻辑如下:


/** * 加载 ShenyuClientRegisterRepository */public final class ShenyuClientRegisterRepositoryFactory {        private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();        /**     * 创建 ShenyuClientRegisterRepository     */    public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {        if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {            // 通过SPI的方式进行加载,类型由registerType决定            ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());            //执行初始化操作            result.init(shenyuRegisterCenterConfig);            ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());            REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);            return result;        }        return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());    }}

加载类型通过registerType指定,也就是我们在配置文件中指定的类型:

shenyu:  register:    registerType: http    serverLists: http://localhost:9095

我们指定的是http,所以会去加载HttpClientRegisterRepository。对象创建成功后,执行的初始化方法init()如下:

@Joinpublic class HttpClientRegisterRepository implements ShenyuClientRegisterRepository {        @Override    public void init(final ShenyuRegisterCenterConfig config) {        this.username = config.getProps().getProperty(Constants.USER_NAME);        this.password = config.getProps().getProperty(Constants.PASS_WORD);        this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));        this.setAccessToken();    }    // 暂时省略其他逻辑}

读取配置文件中的usernamepasswordserverLists,即sheenyu-admin的访问账号、密码和地址信息,为后续数据发送做准备。类注解@Join用于SPI的加载。

SPI 全称为 Service Provider Interface, 是 JDK 内置的一种服务提供发现功能, 一种动态替换发现的机制。

shenyu-spiApache ShenYu网关自定义的SPI扩展实现,设计和实现原理参考了DubboSPI扩展实现

2.3 构建 元数据 和 URI信息 的 SpringMvcClientEventListener#

创建 SpringMvcClientEventListener,负责客户端 元数据URI 数据的构建和注册,它的创建是在配置文件中完成。

@Configuration@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")public class ShenyuSpringMvcClientConfiguration {     // ......        //  创建 SpringMvcClientEventListener    @Bean    public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,                                                                      final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {        return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);    }}
  • SpringMvcClientEventListener继承了AbstractContextRefreshedEventListener

AbstractContextRefreshedEventListener是一个抽象类,它实现了ApplicationListener接口,并重写了onApplicationEvent()方法,当有Spring事件发生后,该方法会执行。它的实现目前有八种,每一种表示对应的RPC调用协议的 元数据URI 信息的注册。

  • AlibabaDubboServiceBeanListener:处理使用Alibaba Dubbo协议;
  • ApacheDubboServiceBeanListener:处理使用Apacge Dubbo协议;
  • GrpcClientEventListener:处理使用grpc协议;
  • MotanServiceEventListener:处理使用Mortan协议;
  • SofaServiceEventListener:处理使用Sofa协议;
  • SpringMvcClientEventListener:处理使用http协议;
  • SpringWebSocketClientEventListener:处理使用websocket协议;
  • TarsServiceBeanEventListener:处理使用Tars注册类型;
// 实现了ApplicationListener接口public abstract class AbstractContextRefreshedEventListener<T, A extends Annotation> implements ApplicationListener<ContextRefreshedEvent> {
     //......
    //构造函数    public AbstractContextRefreshedEventListener(final PropertiesConfig clientConfig,                                                 final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {        // 读取 shenyu.client.http 配置信息        Properties props = clientConfig.getProps();        // appName 应用名称        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);        // contextPath上下文路径        this.contextPath = Optional.ofNullable(props.getProperty(ShenyuClientConstants.CONTEXT_PATH)).map(UriUtils::repairData).orElse("");        if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {            String errorMsg = "client register param must config the appName or contextPath";            LOG.error(errorMsg);            throw new ShenyuClientIllegalArgumentException(errorMsg);        }        this.ipAndPort = props.getProperty(ShenyuClientConstants.IP_PORT);        // host信息        this.host = props.getProperty(ShenyuClientConstants.HOST);        // port 客户端端口信息        this.port = props.getProperty(ShenyuClientConstants.PORT);        // 开始事件发布        publisher.start(shenyuClientRegisterRepository);    }
    // 当有上下文刷新事件ContextRefreshedEvent发生时,该方法会执行    @Override    public void onApplicationEvent(@NonNull final ContextRefreshedEvent event) {        //保证该方法的内容只执行一次        if (!registered.compareAndSet(false, true)) {            return;        }        final ApplicationContext context = event.getApplicationContext();        // 获取声明RPC调用的类        Map<String, T> beans = getBeans(context);        if (MapUtils.isEmpty(beans)) {            return;        }        // 构建URI数据并注册        publisher.publishEvent(buildURIRegisterDTO(context, beans));        // 构建元数据并注册        beans.forEach(this::handle);    }
    // 交给不同的子类实现    @SuppressWarnings("all")    protected abstract URIRegisterDTO buildURIRegisterDTO(ApplicationContext context,                                                          Map<String, T> beans);            protected void handle(final String beanName, final T bean) {        Class<?> clazz = getCorrectedClass(bean);        // 获取当前bean的对应shenyu客户端的注解(对应不同的RPC调用注解不一样,像http的就是@ShenyuSpringMvcClient,而像SpringCloud的则是@ShenyuSpringCloudClient)        final A beanShenyuClient = AnnotatedElementUtils.findMergedAnnotation(clazz, getAnnotationType());        // 根据bean获取对应的path(不同子类实现不一样)        final String superPath = buildApiSuperPath(clazz, beanShenyuClient);        // 如果包含Shenyu客户端注解或者path中包括'*',表示注册整个类的接口        if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {            // 构建类的元数据,发送注册事件            handleClass(clazz, bean, beanShenyuClient, superPath);            return;        }        // 获取当前bean的所有方法        final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);        // 遍历方法        for (Method method : methods) {            // 注册符合条件的方法            handleMethod(bean, clazz, beanShenyuClient, method, superPath);        }    }
    // 构建类元数据并注册的默认实现    protected void handleClass(final Class<?> clazz,                               final T bean,                               @NonNull final A beanShenyuClient,                               final String superPath) {        publisher.publishEvent(buildMetaDataDTO(bean, beanShenyuClient, pathJoin(contextPath, superPath), clazz, null));    }
    // 构建方法元数据并注册的默认实现    protected void handleMethod(final T bean,                                final Class<?> clazz,                                @Nullable final A beanShenyuClient,                                final Method method,                                final String superPath) {        // 如果方法上有Shenyu客户端注解,就表示该方法需要注册        A methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, getAnnotationType());        if (Objects.nonNull(methodShenyuClient)) {            // 构建元数据,发送注册事件            publisher.publishEvent(buildMetaDataDTO(bean, methodShenyuClient, buildApiPath(method, superPath, methodShenyuClient), clazz, method));        }    }
    // 交给不同子类实现    protected abstract MetaDataRegisterDTO buildMetaDataDTO(T bean,                                                            @NonNull A shenyuClient,                                                            String path,                                                            Class<?> clazz,                                                            Method method);}

在构造函数中主要是读取属性配置。

shenyu:  client:    http:      props:        contextPath: /http        appName: http        port: 8189        isFull: false

最后,执行了publisher.start(),开始事件发布,为注册做准备。

  • ShenyuClientRegisterEventPublisher

ShenyuClientRegisterEventPublisher通过单例模式实现,主要是生成元数据和URI订阅器(后续用于数据发布),然后启动Disruptor队列。提供了一个共有方法publishEvent(),发布事件,向Disruptor队列发数据。


public class ShenyuClientRegisterEventPublisher {    // 私有变量    private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();
    private DisruptorProviderManage<DataTypeParent> providerManage;        /**     * 公开静态方法     *     * @return ShenyuClientRegisterEventPublisher instance     */    public static ShenyuClientRegisterEventPublisher getInstance() {        return INSTANCE;    }        /**     * Start方法执行     *     * @param shenyuClientRegisterRepository shenyuClientRegisterRepository     */    public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {        // 创建客户端注册工厂类        RegisterClientExecutorFactory factory = new RegisterClientExecutorFactory();        // 添加元数据订阅器        factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));        //  添加URI订阅器        factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));        // 启动Disruptor队列        providerManage = new DisruptorProviderManage(factory);        providerManage.startup();    }        /**     * 发布事件,向Disruptor队列发数据     *     * @param data the data     */    public <T> void publishEvent(final DataTypeParent data) {        DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();        provider.onData(data);    }}

AbstractContextRefreshedEventListener的构造函数逻辑分析完成了,主要是读取属性配置,创建元数据URI订阅器,启动Disruptor队列。

onApplicationEvent()方法是有Spring事件发生时会执行,这里的参数是ContextRefreshedEvent,表示上下文刷新事件。当Spring容器就绪后执行此处逻辑:先构建URI数据并注册,再构建元数据并注册,

ContextRefreshedEventSpring内置事件。ApplicationContext被初始化或刷新时,该事件被触发。这也可以在 ConfigurableApplicationContext接口中使用 refresh() 方法来发生。此处的初始化是指:所有的Bean被成功装载,后处理Bean被检测并激活,所有Singleton Bean 被预实例化,ApplicationContext容器已就绪可用。

再来看AbstractContextRefreshedEventListener的http实现SpringMvcClientEventListener

public class SpringMvcClientEventListener extends AbstractContextRefreshedEventListener<Object, ShenyuSpringMvcClient> {        private final List<Class<? extends Annotation>> mappingAnnotation = new ArrayList<>(3);        private final Boolean isFull;        private final String protocol;        // 构造函数    public SpringMvcClientEventListener(final PropertiesConfig clientConfig,                                        final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {        super(clientConfig, shenyuClientRegisterRepository);        Properties props = clientConfig.getProps();        // 获取 isFull        this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));        // 表示是http协议的实现        this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);        mappingAnnotation.add(ShenyuSpringMvcClient.class);        mappingAnnotation.add(RequestMapping.class);    }        @Override    protected Map<String, Object> getBeans(final ApplicationContext context) {        // 配置属性,如果 isFull=true 的话,表示注册整个微服务        if (Boolean.TRUE.equals(isFull)) {            getPublisher().publishEvent(MetaDataRegisterDTO.builder()                    .contextPath(getContextPath())                    .appName(getAppName())                    .path(PathUtils.decoratorPathWithSlash(getContextPath()))                    .rpcType(RpcTypeEnum.HTTP.getName())                    .enabled(true)                    .ruleName(getContextPath())                    .build());            return null;        }        // 否则获取带Controller注解的bean        return context.getBeansWithAnnotation(Controller.class);    }        // 构造URI数据    @Override    protected URIRegisterDTO buildURIRegisterDTO(final ApplicationContext context,                                                 final Map<String, Object> beans) {        // ...    }        @Override    protected String buildApiSuperPath(final Class<?> clazz, @Nullable final ShenyuSpringMvcClient beanShenyuClient) {        // 如果有带上Shenyu客户端注解,则优先取注解中的不为空的path属性        if (Objects.nonNull(beanShenyuClient) && StringUtils.isNotBlank(beanShenyuClient.path())) {            return beanShenyuClient.path();        }        // 如果有带上RequestMapping注解,且path属性不为空,则返回path数组的第一个值        RequestMapping requestMapping = AnnotationUtils.findAnnotation(clazz, RequestMapping.class);        if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {            return requestMapping.path()[0];        }        return "";    }        // 声明http实现的客户端注解是ShenyuSpringMvcClient    @Override    protected Class<ShenyuSpringMvcClient> getAnnotationType() {        return ShenyuSpringMvcClient.class;    }        @Override    protected void handleMethod(final Object bean, final Class<?> clazz,                                @Nullable final ShenyuSpringMvcClient beanShenyuClient,                                final Method method, final String superPath) {        // 获取当前bean的RequestMapping注解        final RequestMapping requestMapping = AnnotatedElementUtils.findMergedAnnotation(method, RequestMapping.class);        // 获取当前bean的 ShenyuSpringMvcClient 注解        ShenyuSpringMvcClient methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, ShenyuSpringMvcClient.class);        methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;        //如果有 ShenyuSpringMvcClient 注解并且包含RequestMapping注解(表示是一个接口),则进行注册        if (Objects.nonNull(methodShenyuClient) && Objects.nonNull(requestMapping)) {            getPublisher().publishEvent(buildMetaDataDTO(bean, methodShenyuClient, buildApiPath(method, superPath, methodShenyuClient), clazz, method));        }    }        //...        // 构造元数据    @Override    protected MetaDataRegisterDTO buildMetaDataDTO(final Object bean,                                                   @NonNull final ShenyuSpringMvcClient shenyuClient,                                                   final String path, final Class<?> clazz,                                                   final Method method) {        //...    }}

注册逻辑都是通过 publisher.publishEvent()完成。

Controller注解和RequestMapping注解是由Spring提供的,这个大家应该很熟悉,不过多赘述。ShenyuSpringMvcClient 注解是由Apache ShenYu提供的,用于注册SpringMvc客户端,它的定义如下:


/** * shenyu 客户端接口,用于方法上或类上 */@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE, ElementType.METHOD})public @interface ShenyuSpringMvcClient {
    // path 注册路径    @AliasFor(attribute = "path")    String value() default "";        // path 注册路径    @AliasFor(attribute = "value")    String path();        // ruleName 规则名称    String ruleName() default "";        // desc 描述信息    String desc() default "";
    // enabled是否启用    boolean enabled() default true;        // registerMetaData 注册元数据    boolean  registerMetaData() default false;}

它的使用如下:

  • 注册整个接口
@RestController@RequestMapping("/test")@ShenyuSpringMvcClient(path = "/test/**")  // 表示整个接口注册public class HttpTestController { //......}
  • 注册当前方法
@RestController@RequestMapping("/order")@ShenyuSpringMvcClient(path = "/order")public class OrderController {
    /**     * Save order dto.     *     * @param orderDTO the order dto     * @return the order dto     */    @PostMapping("/save")    @ShenyuSpringMvcClient(path = "/save", desc = "Save order") // 注册当前方法    public OrderDTO save(@RequestBody final OrderDTO orderDTO) {        orderDTO.setName("hello world save order");        return orderDTO;    }}
  • publisher.publishEvent() 发布注册事件

该方法会将数据发送到Disruptor队列中,关于Disruptor队列更多细节这里不做更多介绍,这不影响分析注册的流程。

当数据发送后,Disruptor队列的消费者会处理数据,进行消费。

  • QueueConsumer 消费数据

QueueConsumer是一个消费者,它实现了WorkHandler接口,它的创建过程在providerManage.startup()逻辑中。WorkHandler接口是disruptor的数据消费接口,只有一个方法是onEvent()

package com.lmax.disruptor;
public interface WorkHandler<T> {    void onEvent(T event) throws Exception;}

QueueConsumer重写了onEvent()方法,主要逻辑是生成消费任务,然后在线程池中去执行。


/** *  * 队列消费者 */public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {        // 省略了其他逻辑
    @Override    public void onEvent(final DataEvent<T> t) {        if (t != null) {            // 根据事件类型使用不同的线程池            ThreadPoolExecutor executor = orderly(t);            // 通过工厂创建队列消费任务            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();            // 保存数据            queueConsumerExecutor.setData(t.getData());            // help gc            t.setData(null);            // 放在线程池中执行 消费任务            executor.execute(queueConsumerExecutor);        }    }}

QueueConsumerExecutor是在线程池中被执行的任务,它实现了Runnable接口,具体的实现类有两个:

  • RegisterClientConsumerExecutor:客户端消费者执行器;
  • RegisterServerConsumerExecutor:服务端消费者执行器。

顾名思义,一个负责处理客户端任务,一个负责处理服务端任务(服务端就是admin,在下文进行分析)。

  • RegisterClientConsumerExecutor 消费者执行器

重写的run()逻辑如下:


public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {        //...... 
    @Override    public void run() {        // 获取数据        final T data = getData();        // 根据数据类型调用相应的处理器进行处理        subscribers.get(data.getType()).executor(Lists.newArrayList(data));    }    }

根据不同的数据类型调用不同的处理器去执行相应的任务。数据类型有两种,一个是元数据,记录客户端注册信息。一个是URI数据,记录客户端服务信息。

//数据类型public enum DataType {   // 元数据    META_DATA,       // URI数据    URI,}
  • ExecutorSubscriber#executor() 执行器订阅者

执行器订阅者也分为两类,一个是处理元数据,一个是处理URI。在客户端和服务端分别有两个,所以一共是四个。

先看元数据处理

  • ShenyuClientMetadataExecutorSubscriber#executor()

客户端这边对元数据处理逻辑是:遍历元数据信息,调用接口方法persistInterface()完成数据的发布。

public class ShenyuClientMetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {       //......        @Override    public DataType getType() {        return DataType.META_DATA; // 元数据    }        @Override    public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {        for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {            // 调用接口方法persistInterface()完成数据的发布            shenyuClientRegisterRepository.persistInterface(metaDataRegisterDTO);        }    }}
  • ShenyuClientRegisterRepository#persistInterface()

ShenyuClientRegisterRepository是一个接口,用于表示客户端数据注册,它的实现类目前有五种,每一种就表示一种注册方法。

  • ConsulClientRegisterRepository:通过Consul实现客户端注册;
  • EtcdClientRegisterRepository:通过Etcd实现客户端注册;
  • HttpClientRegisterRepository:通过Http实现客户端注册;
  • NacosClientRegisterRepository:通过Nacos实现客户端注册;
  • ZookeeperClientRegisterRepository:通过Zookeeper实现客户端注册;

从图中可以看出,注册中心的加载是通过SPI的方式完成的。这个在前面提到过了,在客户端通用配置文件中,通过指定配置文件中的属性完成具体的类加载。


/** * 加载 ShenyuClientRegisterRepository */public final class ShenyuClientRegisterRepositoryFactory {        private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();        /**     * 创建 ShenyuClientRegisterRepository     */    public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {        if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {            // 通过SPI的方式进行加载,类型由registerType决定            ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());            //执行初始化操作            result.init(shenyuRegisterCenterConfig);            ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());            REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);            return result;        }        return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());    }}

本文的源码分析是基于Http的方式进行注册,所以我们先分析HttpClientRegisterRepository,其他的注册方式后续再分析。HttpClientRegisterRepository继承了FailbackRegistryRepository,而FailbackRegistryRepository本身主要用于对Http注册过程中的失败异常的处理,这里就省略了。

通过http的方式注册很简单,就是调用工具类发送http请求。注册元数据和URI都是调用的同一个方法doRegister(),指定接口和类型就好。

  • Constants.URI_PATH的值/shenyu-client/register-metadata:服务端提供的接口用于注册元数据。
  • Constants.META_PATH的值/shenyu-client/register-uri: 服务端提供的接口用于注册URI。
@Joinpublic class HttpClientRegisterRepository extends FailbackRegistryRepository {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientRegisterRepository.class);
    private static URIRegisterDTO uriRegisterDTO;
    private String username;
    private String password;
    private List<String> serverList;
    private String accessToken;        public HttpClientRegisterRepository() {    }        public HttpClientRegisterRepository(final ShenyuRegisterCenterConfig config) {        init(config);    }
    @Override    public void init(final ShenyuRegisterCenterConfig config) {        // admin的用户名        this.username = config.getProps().getProperty(Constants.USER_NAME);        // admin的用户名对应的密码        this.password = config.getProps().getProperty(Constants.PASS_WORD);        // admin服务列表        this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));        // 设置访问的token        this.setAccessToken();    }
    /**     * Persist uri.     *     * @param registerDTO the register dto     */    @Override    public void doPersistURI(final URIRegisterDTO registerDTO) {        if (RuntimeUtils.listenByOther(registerDTO.getPort())) {            return;        }        doRegister(registerDTO, Constants.URI_PATH, Constants.URI);        uriRegisterDTO = registerDTO;    }
    @Override    public void doPersistInterface(final MetaDataRegisterDTO metadata) {        doRegister(metadata, Constants.META_PATH, Constants.META_TYPE);    }
    @Override    public void close() {        if (uriRegisterDTO != null) {            uriRegisterDTO.setEventType(EventType.DELETED);            doRegister(uriRegisterDTO, Constants.URI_PATH, Constants.URI);        }    }
    private void setAccessToken() {        for (String server : serverList) {            try {                Optional<?> login = RegisterUtils.doLogin(username, password, server.concat(Constants.LOGIN_PATH));                login.ifPresent(v -> this.accessToken = String.valueOf(v));            } catch (Exception e) {                LOGGER.error("Login admin url :{} is fail, will retry. cause: {} ", server, e.getMessage());            }        }    }
    private <T> void doRegister(final T t, final String path, final String type) {        int i = 0;        // 遍历admin服务列表(admin可能是集群)        for (String server : serverList) {            i++;            String concat = server.concat(path);            try {                // 设置访问token                if (StringUtils.isBlank(accessToken)) {                    this.setAccessToken();                    if (StringUtils.isBlank(accessToken)) {                        throw new NullPointerException("accessToken is null");                    }                }                // 调用工具类发送 http 请求                RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), concat, type, accessToken);                return;            } catch (Exception e) {                LOGGER.error("Register admin url :{} is fail, will retry. cause:{}", server, e.getMessage());                if (i == serverList.size()) {                    throw new RuntimeException(e);                }            }        }    }}

将数据序列化后,通过OkHttp发送数据。


public final class RegisterUtils {      //...... 
    // 通过OkHttp发送数据    public static void doRegister(final String json, final String url, final String type) throws IOException {        if (!StringUtils.hasLength(accessToken)) {            LOGGER.error("{} client register error accessToken is null, please check the config : {} ", type, json);            return;        }        Headers headers = new Headers.Builder().add(Constants.X_ACCESS_TOKEN, accessToken).build();        String result = OkHttpTools.getInstance().post(url, json, headers);        if (Objects.equals(SUCCESS, result)) {            LOGGER.info("{} client register success: {} ", type, json);        } else {            LOGGER.error("{} client register error: {} ", type, json);        }    }}

至此,客户端通过http的方式注册元数据的逻辑就分析完了。小结一下:通过读取自定义的注解信息构造元数据,将数据发到Disruptor队列,然后从队列中消费数据,将消费者放到线程池中去执行,最终通过发送http请求到admin

再来看看 URI 数据的处理

  • ShenyuClientURIExecutorSubscriber#executor()

主要逻辑是遍历URI数据集合,通过persistURI()方法实现数据注册。


public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {        //......        @Override    public DataType getType() {        return DataType.URI; //数据类型是URI    }        // 注册URI数据    @Override    public void executor(final Collection<URIRegisterDTO> dataList) {        for (URIRegisterDTO uriRegisterDTO : dataList) {            Stopwatch stopwatch = Stopwatch.createStarted();            while (true) {                try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {                    break;                } catch (IOException e) {                    long sleepTime = 1000;                    // maybe the port is delay exposed                    if (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {                        LOG.error("host:{}, port:{} connection failed, will retry",                                uriRegisterDTO.getHost(), uriRegisterDTO.getPort());                        // If the connection fails for a long time, Increase sleep time                        if (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {                            sleepTime = 10000;                        }                    }                    try {                        TimeUnit.MILLISECONDS.sleep(sleepTime);                    } catch (InterruptedException ex) {                        ex.printStackTrace();                    }                }            }            //添加hook,优雅停止客户端             ShenyuClientShutdownHook.delayOtherHooks();                        // 注册URI            shenyuClientRegisterRepository.persistURI(uriRegisterDTO);        }    }}

代码中的while(true)循环是为了保证客户端已经成功启动了,通过hostport可以连接上。

后面的逻辑是:添加hook函数,用于优雅停止客户端 。

通过persistURI()方法实现数据注册。整个逻辑也在前面分析过了,最终就是通过OkHttp客户端向shenyu-admin发起http,通过http的方式注册URI

分析到这里就将客户端的注册逻辑分析完了,将构建的元数据和URI数据发送到Disruptor队列,再从中消费,读取数据,通过httpadmin发送数据。

客户端元数据URI注册流程的源码分析完成了,流程图如下:

3. 服务端注册流程#

3.1 注册接口ShenyuClientHttpRegistryController#

从前面的分析可以知道,服务端提供了注册的两个接口:

  • /shenyu-client/register-metadata:服务端提供的接口用于注册元数据。
  • /shenyu-client/register-uri: 服务端提供的接口用于注册URI。

这两个接口位于ShenyuClientHttpRegistryController中,它实现了ShenyuClientServerRegisterRepository接口,是服务端注册的实现类。它用@Join标记,表示通过SPI进行加载。

// shenuyu客户端接口@RequestMapping("/shenyu-client")@Joinpublic class ShenyuClientHttpRegistryController implements ShenyuClientServerRegisterRepository {
    private ShenyuClientServerRegisterPublisher publisher;
    @Override    public void init(final ShenyuClientServerRegisterPublisher publisher, final ShenyuRegisterCenterConfig config) {        this.publisher = publisher;    }
    @Override    public void close() {        publisher.close();    }        // 注册元数据    @PostMapping("/register-metadata")    @ResponseBody    public String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {        publisher.publish(metaDataRegisterDTO);        return ShenyuResultMessage.SUCCESS;    }           // 注册URI    @PostMapping("/register-uri")    @ResponseBody    public String registerURI(@RequestBody final URIRegisterDTO uriRegisterDTO) {        publisher.publish(uriRegisterDTO);        return ShenyuResultMessage.SUCCESS;    }}

两个注册接口获取到数据好,就调用了publisher.publish()方法,把数据发布到Disruptor队列中。

  • ShenyuClientServerRegisterRepository接口

ShenyuClientServerRegisterRepository接口是服务注册接口,它有五个实现类,表示有五种注册方式:

  • ConsulClientServerRegisterRepository:通过Consul实现注册;
  • EtcdClientServerRegisterRepository:通过Etcd实现注册;
  • NacosClientServerRegisterRepository:通过Nacos实现注册;
  • ShenyuClientHttpRegistryController:通过Http实现注册;
  • ZookeeperClientServerRegisterRepository:通过Zookeeper实现注册。

具体用哪一种方式,是通过配置文件指定的,然后通过SPI进行加载。

shenyu-admin中的application.yml文件中配置注册方式,registerType指定注册类型,当用http进行注册时,serverLists不需要填写,更多配置说明可以参考官网 客户端接入配置

shenyu:  register:    registerType: http     serverLists:
  • RegisterCenterConfiguration 加载配置

在引入相关依赖和属性配置后,启动shenyu-admin时,会先加载配置文件,和注册中心相关的配置文件类是RegisterCenterConfiguration

// 注册中心配置类@Configurationpublic class RegisterCenterConfiguration {    // 读取配置属性    @Bean    @ConfigurationProperties(prefix = "shenyu.register")    public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {        return new ShenyuRegisterCenterConfig();    }        //创建ShenyuServerRegisterRepository,用于服务端注册    @Bean(destroyMethod = "close")    public ShenyuServerRegisterRepository shenyuServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig, final List<ShenyuClientRegisterService> shenyuClientRegisterService) {        // 1.从配置属性中获取注册类型        String registerType = shenyuRegisterCenterConfig.getRegisterType();        // 2.通过注册类型,以SPI的方法加载实现类        ShenyuClientServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuClientServerRegisterRepository.class).getJoin(registerType);        // 3.获取publisher,向Disruptor队列中写数据        RegisterClientServerDisruptorPublisher publisher = RegisterClientServerDisruptorPublisher.getInstance();        // 4.注册Service, rpcType -> registerService        Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, e -> e));        // 5.事件发布的准备工作        publisher.start(registerServiceMap);        // 6.注册的初始化操作        registerRepository.init(publisher, shenyuRegisterCenterConfig);        return registerRepository;    }}

在配置类中生成了两个bean

  • shenyuRegisterCenterConfig:读取属性配置;

  • shenyuClientServerRegisterRepository:用于服务端注册。

在创建shenyuClientServerRegisterRepository的过程中,也进行了一系列的准备工作:

  • 1.从配置属性中获取注册类型。
  • 2.通过注册类型,以SPI的方法加载实现类:比如指定的类型是http,就会加载ShenyuClientHttpRegistryController
  • 3.获取publisher,向Disruptor队列中写数据。
  • 4.注册ServicerpcType -> registerService:获取注册的Service,每种rpc都有对应的Service。本文的客户端构建是通过springboot,属于http类型,还有其他客户端类型:dubboSpring CloudgRPC等。
  • 5.事件发布的准备工作:添加服务端元数据和URI订阅器,处理数据。并且启动Disruptor队列。
  • 6.注册的初始化操作:http类型的注册初始化操作就是保存publisher
  • RegisterServerDisruptorPublisher#publish()

服务端向Disruptor队列写入数据的发布者 ,通过单例模式构建。


public class RegisterClientServerDisruptorPublisher implements ShenyuClientServerRegisterPublisher {    //私有属性    private static final RegisterClientServerDisruptorPublisher INSTANCE = new RegisterClientServerDisruptorPublisher();
    private DisruptorProviderManage<Collection<DataTypeParent>> providerManage;
    //公开静态方法获取实例    public static RegisterServerDisruptorPublisher getInstance() {        return INSTANCE;    }       //事件发布的准备工作,添加服务端元数据和URI订阅器,处理数据。并且启动Disruptor队列。    public void start(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {        //服务端注册工厂        RegisterServerExecutorFactory factory = new RegisterServerExecutorFactory();        //添加URI数据订阅器        factory.addSubscribers(new URIRegisterExecutorSubscriber(shenyuClientRegisterService));        //添加元数据订阅器        factory.addSubscribers(new MetadataExecutorSubscriber(shenyuClientRegisterService));        //启动Disruptor队列        providerManage = new DisruptorProviderManage(factory);        providerManage.startup();    }        // 向队列中写入数据    @Override    public void publish(final DataTypeParent data) {        DisruptorProvider<Collection<DataTypeParent>> provider = providerManage.getProvider();        provider.onData(Collections.singleton(data));    }
    // 批量向队列中写入数据    @Override    public void publish(final Collection<? extends DataTypeParent> dataList) {        DisruptorProvider<Collection<DataTypeParent>> provider = providerManage.getProvider();        provider.onData(dataList.stream().map(DataTypeParent.class::cast).collect(Collectors.toList()));    }        @Override    public void close() {        providerManage.getProvider().shutdown();    }}

配置文件的加载,可看作是注册中心服务端初始化流程,用图描述如下:

3.2 消费数据QueueConsumer#

在前面分析了客户端disruptor队列消费数据的过。服务端也是一样的逻辑,只是其中执行任务的执行者变了。

QueueConsumer是一个消费者,它实现了WorkHandler接口,它的创建过程在providerManage.startup()逻辑中。WorkHandler接口是disruptor的数据消费接口,只有一个方法是onEvent()

package com.lmax.disruptor;
public interface WorkHandler<T> {    void onEvent(T event) throws Exception;}

QueueConsumer重写了onEvent()方法,主要逻辑是生成消费任务,然后在线程池中去执行。

/** *  * 队列消费者 */public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {        // 省略了其他逻辑
    @Override    public void onEvent(final DataEvent<T> t) {        if (t != null) {            // 根据事件类型获取相应的线程池            ThreadPoolExecutor executor = orderly(t);            // 通过工厂创建队列消费任务            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();            // 保存数据            queueConsumerExecutor.setData(t.getData());            // help gc            t.setData(null);            // 放在线程池中执行 消费任务            executor.execute(queueConsumerExecutor);        }    }}

QueueConsumerExecutor是在线程池中被执行的任务,它实现了Runnable接口,具体的实现类有两个:

  • RegisterClientConsumerExecutor:客户端消费者执行器;
  • RegisterServerConsumerExecutor:服务端消费者执行器。

顾名思义,一个负责处理客户端任务,一个负责处理服务端任务。

  • RegisterServerConsumerExecutor#run()

RegisterServerConsumerExecutor是服务端消费者执行器,它通过QueueConsumerExecutor间接实现了Runnable接口,并重写了run()方法。


public final class RegisterServerConsumerExecutor extends QueueConsumerExecutor<Collection<DataTypeParent>> {   // ...
    @Override    public void run() {        //获取从disruptor队列中拿到的数据        Collection<DataTypeParent> results = getData()                .stream()                .filter(this::isValidData)                .collect(Collectors.toList());        if (CollectionUtils.isEmpty(results)) {            return;        }        //根据类型执行操作        selectExecutor(results).executor(results);    }
    // 根据类型获取订阅者    private ExecutorSubscriber<DataTypeParent> selectExecutor(final Collection<DataTypeParent> list) {        final Optional<DataTypeParent> first = list.stream().findFirst();        return subscribers.get(first.orElseThrow(() -> new RuntimeException("the data type is not found")).getType());    }}
  • ExecutorSubscriber#executor()

执行器订阅者分为两类,一个是处理元数据,一个是处理URI。在客户端和服务端分别有两个,所以一共是四个。

  • MetadataExecutorSubscriber#executor()

如果是注册元数据,则通过MetadataExecutorSubscriber#executor()实现:根据类型获取注册Service,调用register()

public class MetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {     //......
    @Override    public DataType getType() {        return DataType.META_DATA;  // 元数据类型    }
    @Override    public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {        // 遍历元数据列表        metaDataRegisterDTOList.forEach(meta -> {            Optional.ofNullable(this.shenyuClientRegisterService.get(meta.getRpcType())) // 根据类型获取注册Service                    .ifPresent(shenyuClientRegisterService -> {                        // 对元数据进行注册,加锁确保顺序执行,防止并发错误                        synchronized (shenyuClientRegisterService) {                            shenyuClientRegisterService.register(meta);                        }                    });        });    }}
  • URIRegisterExecutorSubscriber#executor()

如果是注册元数据,则通过URIRegisterExecutorSubscriber#executor()实现:构建URI数据,根据注册类型查找Service,通过registerURI方法实现注册。


public class URIRegisterExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {    //......        @Override    public DataType getType() {        return DataType.URI; // URI数据类型    }        @Override    public void executor(final Collection<URIRegisterDTO> dataList) {        if (CollectionUtils.isEmpty(dataList)) {            return;        }        // 根据rpc调用类型聚集数据        final Map<String, List<URIRegisterDTO>> groupByRpcType = dataList.stream()                .filter(data -> StringUtils.isNotBlank(data.getRpcType()))                .collect(Collectors.groupingBy(URIRegisterDTO::getRpcType));        for (Map.Entry<String, List<URIRegisterDTO>> entry : groupByRpcType.entrySet()) {            final String rpcType = entry.getKey();            // 根据类型查找Service            Optional.ofNullable(shenyuClientRegisterService.get(rpcType))                    .ifPresent(service -> {                        final List<URIRegisterDTO> list = entry.getValue();                        // 构建URI数据类型,通过registerURI方法实现注册                        Map<String, List<URIRegisterDTO>> listMap = buildData(list);                        listMap.forEach(service::registerURI);                    });        }    }}
  • ShenyuClientRegisterService#register()

ShenyuClientRegisterService是注册方法接口,它有多个实现类:

  • AbstractContextPathRegisterService:抽象类,处理部分公共逻辑;
  • AbstractShenyuClientRegisterServiceImpl::抽象类,处理部分公共逻辑;
  • ShenyuClientRegisterDivideServiceImpldivide类,处理http注册类型;
  • ShenyuClientRegisterDubboServiceImpldubbo类,处理dubbo注册类型;
  • ShenyuClientRegisterGrpcServiceImplgRPC类,处理gRPC注册类型;
  • ShenyuClientRegisterMotanServiceImplMotan类,处理Motan注册类型;
  • ShenyuClientRegisterSofaServiceImplSofa类,处理Sofa注册类型;
  • ShenyuClientRegisterSpringCloudServiceImplSpringCloud类,处理SpringCloud注册类型;
  • ShenyuClientRegisterTarsServiceImplTars类,处理Tars注册类型;
  • ShenyuClientRegisterWebSocketServiceImplWebsocket类,处理Websocket注册类型;

从上面可以看出每种微服务都有对应的注册实现类,本文的源码分析是 以官方提供的 shenyu-examples-http 为例,是属http注册类型,所以元数据和URI数据的注册实现类是 ShenyuClientRegisterDivideServiceImpl

  • register(): 注册元数据
public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {        //......        public String register(final MetaDataRegisterDTO dto) {        // 1.注册选择器信息        String selectorHandler = selectorHandler(dto);        String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);        // 2.注册规则信息        String ruleHandler = ruleHandler();        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        ruleService.registerDefault(ruleDTO);        // 3.注册元数据信息        registerMetadata(dto);        // 4.注册contextPath        String contextPath = dto.getContextPath();        if (StringUtils.isNotEmpty(contextPath)) {            registerContextPath(dto);        }        return ShenyuResultMessage.SUCCESS;    }}

整个注册逻辑可以分为4个步骤:

  • 1.注册选择器信息
  • 2.注册规则信息
  • 3.注册元数据信息
  • 4.注册contextPath

admin这一侧通过客户端的元数据信息需要构建选择器、规则、元数据和ContextPath。具体的注册过程和细节处理跟rpc类型有关。我们就不再继续向下追踪了,对于注册中心的逻辑分析,跟踪到这里就够了。

服务端元数据注册流程的源码分析完了,流程图描述如下:

  • registerURI(): 注册URI数据
public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {
    //......        public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {        if (CollectionUtils.isEmpty(uriList)) {            return "";        }        // 对应的选择器是否存在        SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));        if (Objects.isNull(selectorDO)) {            throw new ShenyuException("doRegister Failed to execute,wait to retry.");        }        List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());        // 处理选择器中的handler信息        String handler = buildHandle(validUriList, selectorDO);        if (handler != null) {            selectorDO.setHandle(handler);            SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));            selectorData.setHandle(handler);            // 更新数据库中的记录            selectorService.updateSelective(selectorDO);            // 发布事件            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));        }        return ShenyuResultMessage.SUCCESS;    }}

admin拿到URI数据后,主要是更新选择器中的handler信息,然后写入到数据库,最后发布事件通知网关。通知网关的逻辑是由数据同步操作完成,这在之前的文章中已经分析过了,就不再赘述。

服务端URI注册流程的源码分析完成了,用图描述如下:

至此,服务端注册流程也就分析完了,主要通过对外提供的接口,接受客户端的注册信息,然后写入到Disruptor队列,再从中消费数据,根据接收到的元数据和URI数据更新admin的选择器、规则、元数据和选择器的handler

4. 总结#

本文主要对Apache ShenYu网关中的http注册模块进行了源码分析。涉及到的主要知识点,归纳如下:

  • 注册中心是为了将客户端信息注册到admin,方便流量筛选;
  • http注册是将客户端元数据信息和URI信息注册到admin
  • http服务的接入通过注解@ShenyuSpringMvcClient标识;
  • 注册信息的构建主要通过Spring应用监听器ApplicationListener
  • 注册类型的加载通过SPI完成;
  • 引入Disruptor队列是为了数据与操作解耦,以及数据缓冲。
  • 注册中心的实现采用了面向接口编程,使用模板方法、单例、观察者等设计模式。

Nacos数据同步源码分析

· One min read
Apache ShenYu Contributor

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu 网关当前支持ZooKeeperWebSocketHttp长轮询NacosEtcdConsul 进行数据同步。本文的主要内容是基于Nacos的数据同步源码分析。

本文基于shenyu-2.4.0版本进行源码分析,官网的介绍请参考 数据同步原理

1. 关于Nacos#

Nacos 平台用于动态服务发现,以及配置和服务管理。 Shenyu网关可选择使用Nacos进行数据同步。

2. Admin数据同步#

我们从一个实际案例进行源码追踪,比如在后台管理系统中,对Divide插件中的一条选择器数据进行更新,将权重更新为90:

2.1 接收数据#

  • SelectorController.updateSelector()

进入SelectorController类中的updateSelector()方法,它负责数据的校验,添加或更新数据,返回结果信息。

@Validated@RequiredArgsConstructor@RestController@RequestMapping("/selector")public class SelectorController {        @PutMapping("/{id}")    public ShenyuAdminResult updateSelector(@PathVariable("id") final String id, @Valid @RequestBody final SelectorDTO selectorDTO) {        // 设置当前选择器数据id        selectorDTO.setId(id);        // 创建或更新操作        Integer updateCount = selectorService.createOrUpdate(selectorDTO);        // 返回结果信息        return ShenyuAdminResult.success(ShenyuResultMessage.UPDATE_SUCCESS, updateCount);    }        // ......}

2.2 处理数据#

  • SelectorServiceImpl.createOrUpdate()

SelectorServiceImpl类中通过createOrUpdate()方法完成数据的转换,保存到数据库,发布事件,更新upstream

@RequiredArgsConstructor@Servicepublic class SelectorServiceImpl implements SelectorService {    // 负责事件发布的eventPublisher    private final ApplicationEventPublisher eventPublisher;        @Override    @Transactional(rollbackFor = Exception.class)    public int createOrUpdate(final SelectorDTO selectorDTO) {        int selectorCount;        // 构建数据 DTO --> DO        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);        List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();        // 判断是添加还是更新        if (StringUtils.isEmpty(selectorDTO.getId())) {            // 插入选择器数据            selectorCount = selectorMapper.insertSelective(selectorDO);            // 插入选择器中的条件数据            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));            });            // check selector add            // 权限检查            if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {                DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();                dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());                dataPermissionDTO.setDataId(selectorDO.getId());                dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);                dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));            }
        } else {            // 更新数据,先删除再新增            selectorCount = selectorMapper.updateSelective(selectorDO);            //delete rule condition then add            selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);                selectorConditionMapper.insertSelective(selectorConditionDO);            });        }        // 发布事件        publishEvent(selectorDO, selectorConditionDTOs);
        // 更新upstream        updateDivideUpstream(selectorDO);        return selectorCount;    }            // ......    }

Service类完成数据的持久化操作,即保存数据到数据库,这个比较简单,就不深入追踪了。关于更新upstream操作,放到后面对应的章节中进行分析,重点关注发布事件的操作,它会执行数据同步。

publishEvent()方法的逻辑是:找到选择器对应的插件,构建条件数据,发布变更数据。

     private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {        // 找到选择器对应的插件        PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());        // 构建条件数据        List<ConditionData> conditionDataList =                selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());        // 发布变更数据        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,                Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));    }

发布变更数据通过eventPublisher.publishEvent()完成,这个eventPublisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。

关于ApplicationEventPublisher

当有状态发生变化时,发布者调用 ApplicationEventPublisherpublishEvent 方法发布一个事件,Spring容器广播事件给所有观察者,调用观察者的 onApplicationEvent 方法把事件对象传递给观察者。调用 publishEvent方法有两种途径,一种是实现接口由容器注入 ApplicationEventPublisher 对象然后调用其方法,另一种是直接调用容器的方法,两种方法发布事件没有太大区别。

  • ApplicationEventPublisher:发布事件;
  • ApplicationEventSpring 事件,记录事件源、时间和数据;
  • ApplicationListener:事件监听者,观察者;

Spring的事件发布机制中,有三个对象,

一个是发布事件的ApplicationEventPublisher,在ShenYu中通过构造器注入了一个eventPublisher

另一个对象是ApplicationEvent,在ShenYu中通过DataChangedEvent继承了它,表示事件对象。

public class DataChangedEvent extends ApplicationEvent {//......}

最后一个是 ApplicationListener,在ShenYu中通过DataChangedEventDispatcher类实现了该接口,作为事件的监听者,负责处理事件对象。

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
    //......    }

2.3 分发数据#

  • DataChangedEventDispatcher.onApplicationEvent()

当事件发布完成后,会自动进入到DataChangedEventDispatcher类中的onApplicationEvent()方法,进行事件处理。

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
  /**     * 有数据变更时,调用此方法     * @param event     */    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // 遍历数据变更监听器(一般使用一种数据同步的方式就好了)        for (DataChangedListener listener : listeners) {            // 哪种数据发生变更            switch (event.getGroupKey()) {                case APP_AUTH: // 认证信息                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());                    break;                case PLUGIN:  // 插件信息                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());                    break;                case RULE:    // 规则信息                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());                    break;                case SELECTOR:   // 选择器信息                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());                    break;                case META_DATA:  // 元数据                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());                    break;                default:  // 其他类型,抛出异常                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());            }        }    }    }

当有数据变更时,调用onApplicationEvent方法,然后遍历所有数据变更监听器,判断是哪种数据类型,交给相应的数据监听器进行处理。

ShenYu将所有数据进行了分组,一共是五种:认证信息、插件信息、规则信息、选择器信息和元数据。

这里的数据变更监听器(DataChangedListener),就是数据同步策略的抽象,它的具体实现有:

这几个实现类就是当前ShenYu支持的同步策略:

  • WebsocketDataChangedListener:基于websocket的数据同步;
  • ZookeeperDataChangedListener:基于zookeeper的数据同步;
  • ConsulDataChangedListener:基于consul的数据同步;
  • EtcdDataDataChangedListener:基于etcd的数据同步;
  • HttpLongPollingDataChangedListener:基于http长轮询的数据同步;
  • NacosDataChangedListener:基于nacos的数据同步;

既然有这么多种实现策略,那么如何确定使用哪一种呢?

因为本文是基于Nacos的数据同步源码分析,所以这里以NacosDataChangedListener为例,分析它是如何被加载并实现的。

通过查看对NacosDataChangedListener类的调用,可以发现,它是在DataSyncConfiguration类进行配置的。

/** * 数据同步配置类 * 通过springboot条件装配实现 * The type Data sync configuration. */@Configurationpublic class DataSyncConfiguration {
   //省略了其他代码......      /**     * The type Nacos listener.     */    @Configuration    @ConditionalOnProperty(prefix = "shenyu.sync.nacos", name = "url")    @Import(NacosConfiguration.class)    static class NacosListener {
        /**         * Data changed listener data changed listener.         *         * @param configService the config service         * @return the data changed listener         */        @Bean        @ConditionalOnMissingBean(NacosDataChangedListener.class)        public DataChangedListener nacosDataChangedListener(final ConfigService configService) {            return new NacosDataChangedListener(configService);        }
        /**         * Nacos data init zookeeper data init.         *         * @param configService the config service         * @param syncDataService the sync data service         * @return the nacos data init         */        @Bean        @ConditionalOnMissingBean(NacosDataInit.class)        public NacosDataInit nacosDataInit(final ConfigService configService, final SyncDataService syncDataService) {            return new NacosDataInit(configService, syncDataService);        }    }         //省略了其他代码......}

这个配置类是通过SpringBoot条件装配类实现的。在NacosListener类上面有几个注解:

  • @Configuration:配置文件,应用上下文;

  • @ConditionalOnProperty(prefix = "shenyu.sync.nacos", name = "url"):属性条件判断,满足条件,该配置类才会生效。也就是说,当我们有如下配置时,就会采用nacos进行数据同步。

    shenyu:    sync:     nacos:          url: localhost:8848
  • @Import(NacosConfiguration.class):导入另一个配置类NacosConfigurationNacosConfiguration提供了一个方法ConfigService nacosConfigService(final NacosProperties nacosProp),将Nacos属性转换为ConfigService类型的bean,而Nacos属性是通过@EnableConfigurationProperties(NacosProperties.class) 导入的。我们先看ConfigService类型的bean定义。再分析属性配置类和对应的属性配置文件。

/** * Nacos configuration. */@EnableConfigurationProperties(NacosProperties.class)public class NacosConfiguration {
    /**     * register configService in spring ioc.     *     * @param nacosProp the nacos configuration     * @return ConfigService {@linkplain ConfigService}     * @throws Exception the exception     */    @Bean    @ConditionalOnMissingBean(ConfigService.class)    public ConfigService nacosConfigService(final NacosProperties nacosProp) throws Exception {        Properties properties = new Properties();        if (nacosProp.getAcm() != null && nacosProp.getAcm().isEnabled()) {            // Use aliyun ACM service            properties.put(PropertyKeyConst.ENDPOINT, nacosProp.getAcm().getEndpoint());            properties.put(PropertyKeyConst.NAMESPACE, nacosProp.getAcm().getNamespace());            // Use subaccount ACM administrative authority            properties.put(PropertyKeyConst.ACCESS_KEY, nacosProp.getAcm().getAccessKey());            properties.put(PropertyKeyConst.SECRET_KEY, nacosProp.getAcm().getSecretKey());        } else {            properties.put(PropertyKeyConst.SERVER_ADDR, nacosProp.getUrl());            if (StringUtils.isNotBlank(nacosProp.getNamespace())) {                properties.put(PropertyKeyConst.NAMESPACE, nacosProp.getNamespace());            }            if (StringUtils.isNotBlank(nacosProp.getUsername())) {                properties.put(PropertyKeyConst.USERNAME, nacosProp.getUsername());            }            if (StringUtils.isNotBlank(nacosProp.getPassword())) {                properties.put(PropertyKeyConst.PASSWORD, nacosProp.getPassword());            }        }        return NacosFactory.createConfigService(properties);    }}

这个方法主要分成两步,第一步根据是否使用了aliyun的ACM服务,从NacosProperties中获取不同的nacos路径和鉴权信息,第二步根据获取到的这些属性,使用Nacos官方的工厂方法,使用反射的方式,创建configService。

接下来,让我们分析一下Nacos的属性配置和对应的配置文件。

/** * The type Nacos config. */@ConfigurationProperties(prefix = "shenyu.sync.nacos")public class NacosProperties {
    private String url;
    private String namespace;
    private String username;
    private String password;
    private NacosACMProperties acm;
    /**     * Gets the value of url.     *     * @return the value of url     */    public String getUrl() {        return url;    }
    /**     * Sets the url.     *     * @param url url     */    public void setUrl(final String url) {        this.url = url;    }
    /**     * Gets the value of namespace.     *     * @return the value of namespace     */    public String getNamespace() {        return namespace;    }
    /**     * Sets the namespace.     *     * @param namespace namespace     */    public void setNamespace(final String namespace) {        this.namespace = namespace;    }
    /**     * Gets the value of username.     *     * @return the value of username     */    public String getUsername() {        return username;    }
    /**     * Sets the username.     *     * @param username username     */    public void setUsername(final String username) {        this.username = username;    }
    /**     * Gets the value of password.     *     * @return the value of password     */    public String getPassword() {        return password;    }
    /**     * Sets the password.     *     * @param password password     */    public void setPassword(final String password) {        this.password = password;    }
    /**     * Gets the value of acm.     *     * @return the value of acm     */    public NacosACMProperties getAcm() {        return acm;    }
    /**     * Sets the acm.     *     * @param acm acm     */    public void setAcm(final NacosACMProperties acm) {        this.acm = acm;    }
    public static class NacosACMProperties {
        private boolean enabled;
        private String endpoint;
        private String namespace;
        private String accessKey;
        private String secretKey;
        /**         * Gets the value of enabled.         *         * @return the value of enabled         */        public boolean isEnabled() {            return enabled;        }
        /**         * Sets the enabled.         *         * @param enabled enabled         */        public void setEnabled(final boolean enabled) {            this.enabled = enabled;        }
        /**         * Gets the value of endpoint.         *         * @return the value of endpoint         */        public String getEndpoint() {            return endpoint;        }
        /**         * Sets the endpoint.         *         * @param endpoint endpoint         */        public void setEndpoint(final String endpoint) {            this.endpoint = endpoint;        }
        /**         * Gets the value of namespace.         *         * @return the value of namespace         */        public String getNamespace() {            return namespace;        }
        /**         * Sets the namespace.         *         * @param namespace namespace         */        public void setNamespace(final String namespace) {            this.namespace = namespace;        }
        /**         * Gets the value of accessKey.         *         * @return the value of accessKey         */        public String getAccessKey() {            return accessKey;        }
        /**         * Sets the accessKey.         *         * @param accessKey accessKey         */        public void setAccessKey(final String accessKey) {            this.accessKey = accessKey;        }
        /**         * Gets the value of secretKey.         *         * @return the value of secretKey         */        public String getSecretKey() {            return secretKey;        }
        /**         * Sets the secretKey.         *         * @param secretKey secretKey         */        public void setSecretKey(final String secretKey) {            this.secretKey = secretKey;        }    }
}

当我们在配置文件中配置了shenyu.sync.nacos.url属性时,将采用nacos进行数据同步,此时配置类NacosListener会生效,并生成NacosDataChangedListenerNacosDataInit类型的bean。

  • 生成NacosDataChangedListener类型的bean,nacosDataChangedListener,这个bean将ConfigService类型的bean作为成员变量,ConfigService是nacos官方提供的api,当nacosDataChangedListener监听到事件时,进行回调操作,可以通过该api直接与nacos服务器交互,修改配置。
  • 生成NacosDataInit类型的bean,nacosDataInit,这个bean将beanconfigService和beansyncDataService作为成员变量,调用Nacos的api configService判断配置是否未初始化,未初始化则调用syncDataService进行刷新操作,将在下文详述。 根据上文所述,在事件处理方法onApplicationEvent()中,会触发相应的listener的操作。在我们的案例中,是对一条选择器数据进行更新,数据同步采用的是nacos,所以,代码会进入到NacosDataChangedListener进行选择器数据变更处理。
    //DataChangedEventDispatcher.java        @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // 遍历数据变更监听器(一般使用一种数据同步的方式就好了)        for (DataChangedListener listener : listeners) {            // 哪种数据发生变更            switch (event.getGroupKey()) {                                    // 省略了其他逻辑                                    case SELECTOR:   // 选择器信息                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());   // 在我们的案例中,会进入到NacosDataChangedListener进行选择器数据变更处理                    break;         }    }

2.4 Nacos数据变更监听器#

  • NacosDataChangedListener.onSelectorChanged()

    onSelectorChanged()方法中,判断操作类型,是刷新同步还是更新或创建同步。根据当前选择器数据信息判断节点是否在nacos中。

/** * Use nacos to push data changes. */public class NacosDataChangedListener implements DataChangedListener {    // 选择器信息发生改变    @Override    public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {        updateSelectorMap(getConfig(NacosPathConstants.SELECTOR_DATA_ID));        switch (eventType) {            case DELETE:                changed.forEach(selector -> {                    List<SelectorData> ls = SELECTOR_MAP                            .getOrDefault(selector.getPluginName(), new ArrayList<>())                            .stream()                            .filter(s -> !s.getId().equals(selector.getId()))                            .sorted(SELECTOR_DATA_COMPARATOR)                            .collect(Collectors.toList());                    SELECTOR_MAP.put(selector.getPluginName(), ls);                });                break;            case REFRESH:            case MYSELF:                SELECTOR_MAP.keySet().removeAll(SELECTOR_MAP.keySet());                changed.forEach(selector -> {                    List<SelectorData> ls = SELECTOR_MAP                            .getOrDefault(selector.getPluginName(), new ArrayList<>())                            .stream()                            .sorted(SELECTOR_DATA_COMPARATOR)                            .collect(Collectors.toList());                    ls.add(selector);                    SELECTOR_MAP.put(selector.getPluginName(), ls);                });                break;            default:                changed.forEach(selector -> {                    List<SelectorData> ls = SELECTOR_MAP                            .getOrDefault(selector.getPluginName(), new ArrayList<>())                            .stream()                            .filter(s -> !s.getId().equals(selector.getId()))                            .sorted(SELECTOR_DATA_COMPARATOR)                            .collect(Collectors.toList());                    ls.add(selector);                    SELECTOR_MAP.put(selector.getPluginName(), ls);                });                break;        }        publishConfig(NacosPathConstants.SELECTOR_DATA_ID, SELECTOR_MAP);    }}

这部分是核心。changed表示需更新的SelectorData列表,eventType表示事件类型。SELECTOR_MAP的类型是ConcurrentMap<String, List<SelectorData>>,该map的key为selector所属的plugin的名称,value为该plugin下的selector列表。NacosPathConstants.SELECTOR_DATA_ID的值为shenyu.selector.json。操作步骤如下,第一步,使用getConfig方法调用Nacos的api,从Nacos获取groupshenyu.selector.json的配置信息,updateSelectorMap方法使用这些配置信息更新SELECTOR_MAP,这样就同步到了Nacos上最新的selector信息。第二步,再根据事件类型来更新SELECTOR_MAP,最后使用publishConfig方法,调用Nacos的api,将Nacos上,groupshenyu.selector.json的配置进行全量替换。

只要将变动的数据正确写入到Nacos上,admin这边的操作就执行完成了。

在我们当前的案例中,对Divide插件中的一条选择器数据进行更新,将权重更新为90,就会对图中的特定节点更新。

我们用时序图将上面的更新流程串联起来。

3. 网关数据同步#

假设ShenYu网关已经在正常运行,使用的数据同步方式也是nacos。那么当在admin端更新选择器数据后,并且向nacos发送了变更的数据,那网关是如何接收并处理数据的呢?接下来我们就继续进行源码分析,一探究竟。

3.1 NacosSyncDataService接收数据#

网关是通过NacosSyncDataServicenacos进行监听并获取数据更新的,但是在这部分内容之前,我们先看一下NacosSyncDataService类型的bean是如何生成的。答案是在Spring配置类NacosSyncDataConfiguration中定义的。我们看到NacosSyncDataConfiguration类上的注解,@ConditionalOnProperty(prefix = "shenyu.sync.nacos", name = "url"),这个注解我们在上文对ShenYu的Admin端中的NacosListener类进行分析时看到过,是一个属性条件判断,满足条件,该配置类才会生效。也就是说,当我们在Shenyu网关端有如下配置时,就表示Shenyu网关端采用nacos进行数据同步,NacosSyncDataConfiguration这个配置类生效。

shenyu:    sync:     nacos:          url: localhost:8848
/** * Nacos sync data configuration for spring boot. */@Configuration@ConditionalOnClass(NacosSyncDataService.class)@ConditionalOnProperty(prefix = "shenyu.sync.nacos", name = "url")public class NacosSyncDataConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(NacosSyncDataConfiguration.class);
    /**     * Nacos sync data service.     *     * @param configService     the config service     * @param pluginSubscriber the plugin subscriber     * @param metaSubscribers   the meta subscribers     * @param authSubscribers   the auth subscribers     * @return the sync data service     */    @Bean    public SyncDataService nacosSyncDataService(final ObjectProvider<ConfigService> configService, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {        LOGGER.info("you use nacos sync shenyu data.......");        return new NacosSyncDataService(configService.getIfAvailable(), pluginSubscriber.getIfAvailable(),                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));    }
    /**     * Nacos config service config service.     *     * @param nacosConfig the nacos config     * @return the config service     * @throws Exception the exception     */    @Bean    public ConfigService nacosConfigService(final NacosConfig nacosConfig) throws Exception {        Properties properties = new Properties();        if (nacosConfig.getAcm() != null && nacosConfig.getAcm().isEnabled()) {            properties.put(PropertyKeyConst.ENDPOINT, nacosConfig.getAcm().getEndpoint());            properties.put(PropertyKeyConst.NAMESPACE, nacosConfig.getAcm().getNamespace());            properties.put(PropertyKeyConst.ACCESS_KEY, nacosConfig.getAcm().getAccessKey());            properties.put(PropertyKeyConst.SECRET_KEY, nacosConfig.getAcm().getSecretKey());        } else {            properties.put(PropertyKeyConst.SERVER_ADDR, nacosConfig.getUrl());            if (StringUtils.isNotBlank(nacosConfig.getNamespace())) {                properties.put(PropertyKeyConst.NAMESPACE, nacosConfig.getNamespace());            }            if (nacosConfig.getUsername() != null) {                properties.put(PropertyKeyConst.USERNAME, nacosConfig.getUsername());            }            if (nacosConfig.getPassword() != null) {                properties.put(PropertyKeyConst.PASSWORD, nacosConfig.getPassword());            }        }        return NacosFactory.createConfigService(properties);    }
    /**     * Http config http config.     *     * @return the http config     */    @Bean    @ConfigurationProperties(prefix = "shenyu.sync.nacos")    public NacosConfig nacosConfig() {        return new NacosConfig();    }}

我们重点关注一下上面代码中nacosSyncDataService这个bean的生成:

@Beanpublic SyncDataService nacosSyncDataService(final ObjectProvider<ConfigService> configService, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {        LOGGER.info("you use nacos sync shenyu data.......");        return new NacosSyncDataService(configService.getIfAvailable(), pluginSubscriber.getIfAvailable(),                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));}

是直接调用NacosSyncDataService的构造方法new了一个该类型的对象。我们继续看构造方法:

public NacosSyncDataService(final ConfigService configService, final PluginDataSubscriber pluginDataSubscriber,                                final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
        super(configService, pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);        start();}
    public void start() {        watcherData(NacosPathConstants.PLUGIN_DATA_ID, this::updatePluginMap);        watcherData(NacosPathConstants.SELECTOR_DATA_ID, this::updateSelectorMap);        watcherData(NacosPathConstants.RULE_DATA_ID, this::updateRuleMap);        watcherData(NacosPathConstants.META_DATA_ID, this::updateMetaDataMap);        watcherData(NacosPathConstants.AUTH_DATA_ID, this::updateAuthMap);    }
    protected void watcherData(final String dataId, final OnChange oc) {        Listener listener = new Listener() {            @Override            public void receiveConfigInfo(final String configInfo) {                oc.change(configInfo);            }
            @Override            public Executor getExecutor() {                return null;            }        };        oc.change(getConfigAndSignListener(dataId, listener));        LISTENERS.computeIfAbsent(dataId, key -> new ArrayList<>()).add(listener);    }

可以看到,在构造方法中调用了start方法,并且通过watcherData方法创建了监听器,并且关联了回调函数oc,由于我们正在分析selector类型组件的变化,对应的回调函数是updateSelectorMap。这个回调函数用于处理数据。

3.2 处理数据#

  • NacosCacheHandler.updateSelectorMap()

经过判空逻辑之后,缓存选择器数据的操作又交给了PluginDataSubscriber处理。

    protected void updateSelectorMap(final String configInfo) {        try {            List<SelectorData> selectorDataList = GsonUtils.getInstance().toObjectMapList(configInfo, SelectorData.class).values().stream().flatMap(Collection::stream).collect(Collectors.toList());            selectorDataList.forEach(selectorData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {                subscriber.unSelectorSubscribe(selectorData);                subscriber.onSelectorSubscribe(selectorData);            }));        } catch (JsonParseException e) {            LOG.error("sync selector data have error:", e);        }    }

PluginDataSubscriber是一个接口,它只有一个CommonPluginDataSubscriber实现类,负责处理插件、选择器和规则数据。

3.3 通用插件数据订阅者#

  • PluginDataSubscriber.onSelectorSubscribe()

它没有其他逻辑,直接调用subscribeDataHandler()方法。在方法中,更具数据类型(插件、选择器或规则),操作类型(更新或删除),去执行不同逻辑。

/** * 通用插件数据订阅者,负责处理所有插件、选择器和规则信息 * The type Common plugin data subscriber. */public class CommonPluginDataSubscriber implements PluginDataSubscriber {    //......     // 处理选择器数据    @Override    public void onSelectorSubscribe(final SelectorData selectorData) {        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);    }            // 订阅数据处理器,处理数据的更新或删除    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {        Optional.ofNullable(classData).ifPresent(data -> {            // 插件数据            if (data instanceof PluginData) {                PluginData pluginData = (PluginData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cachePluginData(pluginData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removePluginData(pluginData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));                }            } else if (data instanceof SelectorData) {  // 选择器数据                SelectorData selectorData = (SelectorData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cacheSelectData(selectorData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removeSelectData(selectorData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));                }            } else if (data instanceof RuleData) {  // 规则数据                RuleData ruleData = (RuleData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cacheRuleData(ruleData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));                } else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removeRuleData(ruleData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));                }            }        });    }    }

3.4 数据缓存到内存#

那么更新一条选择器数据,会进入下面的逻辑:

// 将数据保存到网关内存BaseDataCache.getInstance().cacheSelectData(selectorData);// 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));

一是将数据保存到网关的内存中。BaseDataCache是最终缓存数据的类,通过单例模式实现。选择器数据就存到了SELECTOR_MAP这个Map中。在后续使用的时候,也是从这里拿数据。

public final class BaseDataCache {    // 私有变量    private static final BaseDataCache INSTANCE = new BaseDataCache();    // 私有构造器    private BaseDataCache() {    }        /**     * Gets instance.     *  公开方法     * @return the instance     */    public static BaseDataCache getInstance() {        return INSTANCE;    }        /**    *  缓存选择器数据的Map     * pluginName -> SelectorData.     */    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();        public void cacheSelectData(final SelectorData selectorData) {        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);    }           /**     * cache selector data.     * 缓存选择器数据     * @param data the selector data     */    private void selectorAccept(final SelectorData data) {        String key = data.getPluginName();        if (SELECTOR_MAP.containsKey(key)) { // 更新操作,先删除再插入            List<SelectorData> existList = SELECTOR_MAP.get(key);            final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());            resultList.add(data);            final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());            SELECTOR_MAP.put(key, collect);        } else {  // 新增操作,直接放到Map中            SELECTOR_MAP.put(key, Lists.newArrayList(data));        }    }    }

二是如果每个插件还有自己的处理逻辑,那么就去处理。 通过idea编辑器可以看到,当新增一条选择器后,有如下的插件还有处理。这里我们就不再展开了。

经过以上的源码追踪,并通过一个实际的案例,在admin端新增更新一条选择器数据,就将nacos数据同步的流程分析清楚了。

我们还是通过时序图将网关端的数据同步流程串联一下:

数据同步的流程已经分析完了,为了不让同步流程被打断,在分析过程中就忽略了其他逻辑。网关同步操作初始化的流程在NacosSyncDataServicestart方法中,我们在上文分析网关数据同步时分析过了,下面分析Admin的同步数据初始化。

4. Admin同步数据初始化#

admin端,NacosDataInit类型的bean,在NacosListener中进行定义和生成,如果admin的配置中指定了使用nacos进行数据同步,当admin启动后,会将当前的数据信息全量同步到nacos中,实现逻辑如下:


/** * The type Nacos data init. */public class NacosDataInit implements CommandLineRunner {
    private static final Logger LOG = LoggerFactory.getLogger(NacosDataInit.class);
    private final ConfigService configService;
    private final SyncDataService syncDataService;
    /**     * Instantiates a new Nacos data init.     * @param configService the nacos config service     * @param syncDataService the sync data service     */    public NacosDataInit(final ConfigService configService, final SyncDataService syncDataService) {        this.configService = configService;        this.syncDataService = syncDataService;    }
    @Override    public void run(final String... args) {        String pluginDataId = NacosPathConstants.PLUGIN_DATA_ID;        String authDataId = NacosPathConstants.AUTH_DATA_ID;        String metaDataId = NacosPathConstants.META_DATA_ID;        if (dataIdNotExist(pluginDataId) && dataIdNotExist(authDataId) && dataIdNotExist(metaDataId)) {            syncDataService.syncAll(DataEventTypeEnum.REFRESH);        }    }
    private boolean dataIdNotExist(final String pluginDataId) {        try {            String group = NacosPathConstants.GROUP;            long timeout = NacosPathConstants.DEFAULT_TIME_OUT;            return configService.getConfig(pluginDataId, group, timeout) == null;        } catch (NacosException e) {            LOG.error("Get data from nacos error.", e);            throw new ShenyuException(e.getMessage());        }    }}

判断nacos中是否存在数据,如果不存在,则进行同步。

NacosDataInit实现了CommandLineRunner接口。它是springboot提供的接口,会在所有 Spring Beans初始化之后执行run()方法,常用于项目中初始化的操作。

  • SyncDataService.syncAll()

从数据库查询数据,然后进行全量数据同步,所有的认证信息、插件信息、选择器信息、规则信息和元数据信息。主要是通过eventPublisher发布同步事件。这里就跟前面提到的同步逻辑就又联系起来了,eventPublisher通过publishEvent()发布完事件后,有ApplicationListener执行事件变更操作,在ShenYu中就是前面提到的DataChangedEventDispatcher

@Servicepublic class SyncDataServiceImpl implements SyncDataService {    // 事件发布    private final ApplicationEventPublisher eventPublisher;         /***     * 全量数据同步     * @param type the type     * @return     */    @Override    public boolean syncAll(final DataEventTypeEnum type) {        // 同步认证信息        appAuthService.syncData();        // 同步插件信息        List<PluginData> pluginDataList = pluginService.listAll();        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));        // 同步选择器信息        List<SelectorData> selectorDataList = selectorService.listAll();        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));        // 同步规则信息        List<RuleData> ruleDataList = ruleService.listAll();        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));        // 同步元数据信息        metaDataService.syncData();        return true;    }    }

5. 总结#

本文通过一个实际案例,对nacos的数据同步原理进行了源码分析。涉及到的主要知识点如下:

  • 基于nacos的数据同步,主要是通过watch机制实现;
  • 通过Spring完成事件发布和监听;
  • 通过抽象DataChangedListener接口,支持多种同步策略,面向接口编程;
  • 使用单例设计模式实现缓存数据类BaseDataCache
  • 通过SpringBoot的条件装配和starter加载机制实现配置类的加载。

WebSocket数据同步源码分析

· One min read
Apache ShenYu Committer

ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu 网关当前支持ZooKeeperWebSocketHttp长轮询NacosetcdConsul 进行数据同步。本文的主要内容是基于WebSocket的数据同步源码分析。

本文基于shenyu-2.4.0版本进行源码分析,官网的介绍请参考 数据同步原理

1. 关于WebSocket通信#

WebSocket协议诞生于2008年,在2011年成为国际标准。它可以双向通信,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息。WebSocket协议建立在 TCP 协议之上,属于应用层,性能开销小,通信高效,协议标识符是ws

2. Admin数据同步#

我们从一个实际案例进行源码追踪,比如在后台管理系统中,新增一条选择器数据:

2.1 接收数据#

  • SelectorController.createSelector()

进入SelectorController类中的createSelector()方法,它负责数据的校验,添加或更新数据,返回结果信息。

@Validated@RequiredArgsConstructor@RestController@RequestMapping("/selector")public class SelectorController {        @PostMapping("")    public ShenyuAdminResult createSelector(@Valid @RequestBody final SelectorDTO selectorDTO) { // @Valid 数校验        // 添加或更新数据        Integer createCount = selectorService.createOrUpdate(selectorDTO);        // 返回结果信息        return ShenyuAdminResult.success(ShenyuResultMessage.CREATE_SUCCESS, createCount);    }        // ......}

2.2 处理数据#

  • SelectorServiceImpl.createOrUpdate()

SelectorServiceImpl类中通过createOrUpdate()方法完成数据的转换,保存到数据库,发布事件,更新upstream

@RequiredArgsConstructor@Servicepublic class SelectorServiceImpl implements SelectorService {    // 负责事件发布的eventPublisher    private final ApplicationEventPublisher eventPublisher;        @Override    @Transactional(rollbackFor = Exception.class)    public int createOrUpdate(final SelectorDTO selectorDTO) {        int selectorCount;        // 构建数据 DTO --> DO        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);        List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();        // 判断是添加还是更新        if (StringUtils.isEmpty(selectorDTO.getId())) {            // 插入选择器数据            selectorCount = selectorMapper.insertSelective(selectorDO);            // 插入选择器中的条件数据            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));            });            // check selector add            // 权限检查            if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {                DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();                dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());                dataPermissionDTO.setDataId(selectorDO.getId());                dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);                dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));            }
        } else {            // 更新数据,先删除再新增            selectorCount = selectorMapper.updateSelective(selectorDO);            //delete rule condition then add            selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);                selectorConditionMapper.insertSelective(selectorConditionDO);            });        }        // 发布事件        publishEvent(selectorDO, selectorConditionDTOs);
        // 更新upstream        updateDivideUpstream(selectorDO);        return selectorCount;    }            // ......    }

Service类完成数据的持久化操作,即保存数据到数据库,这个大家应该很熟悉了,就不展开。关于更新upstream操作,放到后面对应的章节中进行分析,重点关注发布事件的操作,它会进行数据同步。

publishEvent()方法的逻辑是:找到选择器对应的插件,构建条件数据,发布变更数据。

       private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {        // 找到选择器对应的插件        PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());        // 构建条件数据        List<ConditionData> conditionDataList =                selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());        // 发布变更数据        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,                Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));    }

发布变更数据通过eventPublisher.publishEvent()完成,这个eventPublisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。

关于ApplicationEventPublisher

当有状态发生变化时,发布者调用 ApplicationEventPublisherpublishEvent 方法发布一个事件,Spring容器广播事件给所有观察者,调用观察者的 onApplicationEvent 方法把事件对象传递给观察者。调用 publishEvent方法有两种途径,一种是实现接口由容器注入 ApplicationEventPublisher 对象然后调用其方法,另一种是直接调用容器的方法,两种方法发布事件没有太大区别。

  • ApplicationEventPublisher:发布事件;
  • ApplicationEventSpring 事件,记录事件源、时间和数据;
  • ApplicationListener:事件监听者,观察者。

Spring的事件发布机制中,有三个对象,

一个是发布事件的ApplicationEventPublisher,在ShenYu中通过构造器注入了一个eventPublisher

另一个对象是ApplicationEvent,在ShenYu中通过DataChangedEvent继承了它,表示事件对象。

public class DataChangedEvent extends ApplicationEvent {//......}

最后一个是 ApplicationListener,在ShenYu中通过DataChangedEventDispatcher类实现了该接口,作为事件的监听者,负责处理事件对象。

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
    //......    }

2.3 分发数据#

  • DataChangedEventDispatcher.onApplicationEvent()

当事件发布完成后,会自动进入到DataChangedEventDispatcher类中的onApplicationEvent()方法,进行事件处理。

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
  /**     * 有数据变更时,调用此方法     * @param event     */    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // 遍历数据变更监听器(一般使用一种数据同步的方式就好了)        for (DataChangedListener listener : listeners) {            // 哪种数据发生变更            switch (event.getGroupKey()) {                case APP_AUTH: // 认证信息                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());                    break;                case PLUGIN:  // 插件信息                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());                    break;                case RULE:    // 规则信息                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());                    break;                case SELECTOR:   // 选择器信息                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());                    break;                case META_DATA:  // 元数据                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());                    break;                default:  // 其他类型,抛出异常                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());            }        }    }    }

当有数据变更时,调用onApplicationEvent方法,然后遍历所有数据变更监听器,判断是哪种数据类型,交给相应的数据监听器进行处理。

ShenYu将所有数据进行了分组,一共是五种:认证信息、插件信息、规则信息、选择器信息和元数据。

这里的数据变更监听器(DataChangedListener),就是数据同步策略的抽象,它的具体实现有:

这几个实现类就是当前ShenYu支持的同步策略:

  • WebsocketDataChangedListener:基于websocket的数据同步;
  • ZookeeperDataChangedListener:基于zookeeper的数据同步;
  • ConsulDataChangedListener:基于consul的数据同步;
  • EtcdDataDataChangedListener:基于etcd的数据同步;
  • HttpLongPollingDataChangedListener:基于http长轮询的数据同步;
  • NacosDataChangedListener:基于nacos的数据同步;

既然有这么多种实现策略,那么如何确定使用哪一种呢?

因为本文是基于websocket的数据同步源码分析,所以这里以WebsocketDataChangedListener为例,分析它是如何被加载并实现的。

通过在源码工程中进行全局搜索,可以看到,它的实现是在DataSyncConfiguration类完成的。

/** * 数据同步配置类 * 通过springboot条件装配实现 * The type Data sync configuration. */@Configurationpublic class DataSyncConfiguration {     /**     * websocket数据同步(默认策略)     * The WebsocketListener(default strategy).     */    @Configuration    @ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)    @EnableConfigurationProperties(WebsocketSyncProperties.class)    static class WebsocketListener {
        /**         * Config event listener data changed listener.         * 配置websocket数据变更监听器         * @return the data changed listener         */        @Bean        @ConditionalOnMissingBean(WebsocketDataChangedListener.class)        public DataChangedListener websocketDataChangedListener() {            return new WebsocketDataChangedListener();        }
        /**         * Websocket collector.         * Websocket处理类:建立连接,发送消息,关闭连接等操作         * @return the websocket collector         */        @Bean        @ConditionalOnMissingBean(WebsocketCollector.class)        public WebsocketCollector websocketCollector() {            return new WebsocketCollector();        }
        /**         * Server endpoint exporter          *         * @return the server endpoint exporter         */        @Bean        @ConditionalOnMissingBean(ServerEndpointExporter.class)        public ServerEndpointExporter serverEndpointExporter() {            return new ServerEndpointExporter();        }    }        //......}

这个配置类是通过SpringBoot条件装配类实现的。在WebsocketListener类上面有几个注解:

  • @Configuration:配置文件,应用上下文;

  • @ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true):属性条件判断,满足条件,该配置类才会生效。也就是说,当我们有如下配置时,就会采用websocket进行数据同步。不过,这里需要注意下matchIfMissing = true这个属性,它表示,如果你没有如下的配置,该配置类也会生效。基于websocket的数据同步时官方推荐的方式,也是默认采用的方式。

    shenyu:    sync:    websocket:      enabled: true
  • @EnableConfigurationProperties:启用配置属性;

当我们主动配置,采用websocket进行数据同步时,WebsocketDataChangedListener就会生成。所以在事件处理方法onApplicationEvent()中,就会到相应的listener中。在我们的案例中,是新增加了一条选择器数据,数据通过采用的是websocket,所以,代码会进入到WebsocketDataChangedListener进行选择器数据变更处理。

    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // 遍历数据变更监听器(一般使用一种数据同步的方式就好了)        for (DataChangedListener listener : listeners) {            // 哪种数据发生变更            switch (event.getGroupKey()) {                                    // 省略了其他逻辑                                    case SELECTOR:   // 选择器信息                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());   // WebsocketDataChangedListener进行选择器数据变更处理                    break;         }    }

2.4 Websocket数据变更监听器#

  • WebsocketDataChangedListener.onSelectorChanged()

    onSelectorChanged()方法中,将数据进行了封装,转成WebsocketData,然后通过WebsocketCollector.send()发送数据。

    // 选择器数据有更新    @Override    public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {        // 构造 WebsocketData 数据        WebsocketData<SelectorData> websocketData =                new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);        // 通过websocket发送数据        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);    }

2.5 Websocket发送数据#

  • WebsocketCollector.send()

send()方法中,判断了一下同步的类型,根据不同的类型,进行处理。

@Slf4j@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)public class WebsocketCollector {    /**     * Send.     *     * @param message the message     * @param type    the type     */    public static void send(final String message, final DataEventTypeEnum type) {        if (StringUtils.isNotBlank(message)) {            // 如果是MYSELF(第一次的全量同步)            if (DataEventTypeEnum.MYSELF == type) {                // 从threadlocal中获取session                Session session = (Session) ThreadLocalUtil.get(SESSION_KEY);                if (session != null) {                    // 向该session发送全量数据                    sendMessageBySession(session, message);                }            } else {                // 后续的增量同步                // 向所有的session中同步变更数据                SESSION_SET.forEach(session -> sendMessageBySession(session, message));            }        }    }
    private static void sendMessageBySession(final Session session, final String message) {        try {            // 通过websocket的session把消息发送出去            session.getBasicRemote().sendText(message);        } catch (IOException e) {            log.error("websocket send result is exception: ", e);        }    }}

我们给的案例是一个新增操作 ,是一个增量同步,所以会走

SESSION_SET.forEach(session -> sendMessageBySession(session, message));

这个逻辑。

再通过

session.getBasicRemote().sendText(message);

将数据发送了出去。

至此,当admin端发生数据变更时,就将变更的数据以增量形式通过WebSocket发给了网关。

分析到这里,不知道大家有没有疑问呢?比如session是怎么来的?网关如何和admin建立连接的?

不要着急,我们接下来就进行网关端的同步分析。

不过,在继续源码分析前,我们用一张图将上面的分析过程串联起来。

3. 网关数据同步#

假设ShenYu网关已经在正常运行了,使用的数据同步方式也是websocket。那么当在admin端新增一条选择器数据后,并且通过WebSocket发送到网关,那网关是如何接收并处理数据的呢?接下来我们就继续进行源码分析,一探究竟。

3.1 WebsocketClient接收数据#

  • ShenyuWebsocketClient.onMessage()

在网关端有一个ShenyuWebsocketClient类,它继承了WebSocketClient,可以和WebSocket建立连接并通信。

public final class ShenyuWebsocketClient extends WebSocketClient {  // ......}

当在admin端通过websocket发送数据后,ShenyuWebsocketClient就可以通过onMessage()接收到数据,然后就可以自己进行处理。

public final class ShenyuWebsocketClient extends WebSocketClient {      // 接受到消息后执行    @Override    public void onMessage(final String result) {        // 处理接收到的数据        handleResult(result);    }        private void handleResult(final String result) {        // 数据反序列化        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);        // 哪种数据类型,插件、选择器、规则...        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());        // 哪种操作类型,更新、删除...        String eventType = websocketData.getEventType();        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        // 处理数据        websocketDataHandler.executor(groupEnum, json, eventType);    }}

接收到数据后,首先进行了反序列化操作,读取数据类型和操作类型,紧接着,就交给websocketDataHandler.executor()进行处理。

3.2 执行Websocket事件处理器#

  • WebsocketDataHandler.executor()

通过工厂模式创建了Websocket数据处理器,每种数据类型,都提供了一个处理器:

插件 --> 插件数据处理器;

选择器 --> 选择器数据处理器;

规则 --> 规则数据处理器;

认证信息 --> 认证数据处理器;

元数据 --> 元数据处理器。


/** * 通过工厂模式创建 Websocket数据处理器 * The type Websocket cache handler. */public class WebsocketDataHandler {
    private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);
    /**     * Instantiates a new Websocket data handler.     * 每种数据类型,提供一个处理器     * @param pluginDataSubscriber the plugin data subscriber     * @param metaDataSubscribers  the meta data subscribers     * @param authDataSubscribers  the auth data subscribers     */    public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,                                final List<MetaDataSubscriber> metaDataSubscribers,                                final List<AuthDataSubscriber> authDataSubscribers) {        // 插件 --> 插件数据处理器        ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));        // 选择器 --> 选择器数据处理器        ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));        // 规则 --> 规则数据处理器        ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));        // 认证信息 --> 认证数据处理器        ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));        // 元数据 --> 元数据处理器        ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));    }
    /**     * Executor.     *     * @param type      the type     * @param json      the json     * @param eventType the event type     */    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {        // 根据数据类型,找到对应的数据处理器        ENUM_MAP.get(type).handle(json, eventType);    }}

不同的数据类型,有不同的数据处理方式,所以有不同的实现类。但是它们之间也有相同的处理逻辑,所以可以通过模板方法设计模式来实现。相同的逻辑放在抽象类中的handle()方法中,不同逻辑就交给各自的实现类。

我们的案例是新增了一条选择器数据,所以会交给SelectorDataHandler( 选择器 --> 选择器数据处理器)进行数据处理。

3.3 判断事件类型#

  • AbstractDataHandler.handle()

实现数据变更的通用逻辑处理:根据不同的操作类型调用不同方法。


public abstract class AbstractDataHandler<T> implements DataHandler {
    /**     * Convert list.     * 不同的逻辑由各自实现类去实现     * @param json the json     * @return the list     */    protected abstract List<T> convert(String json);
    /**     * Do refresh.     * 不同的逻辑由各自实现类去实现     * @param dataList the data list     */    protected abstract void doRefresh(List<T> dataList);
    /**     * Do update.     * 不同的逻辑由各自实现类去实现     * @param dataList the data list     */    protected abstract void doUpdate(List<T> dataList);
    /**     * Do delete.     * 不同的逻辑由各自实现类去实现     * @param dataList the data list     */    protected abstract void doDelete(List<T> dataList);
    // 通用逻辑,抽象类实现    @Override    public void handle(final String json, final String eventType) {        List<T> dataList = convert(json);        if (CollectionUtils.isNotEmpty(dataList)) {            DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);            switch (eventTypeEnum) {                case REFRESH:                case MYSELF:                    doRefresh(dataList);  //刷新数据,全量同步                    break;                case UPDATE:                case CREATE:                    doUpdate(dataList); // 更新或创建数据,增量同步                    break;                case DELETE:                    doDelete(dataList);  // 删除数据                    break;                default:                    break;            }        }    }}

新增一条选择器数据,是新增操作,通过switch-case进入到doUpdate()方法中。

3.4 进入具体的数据处理器#

  • SelectorDataHandler.doUpdate()

/** * 选择器数据处理器 * The type Selector data handler. */@RequiredArgsConstructorpublic class SelectorDataHandler extends AbstractDataHandler<SelectorData> {
    private final PluginDataSubscriber pluginDataSubscriber;
    //......
    // 更新操作    @Override    protected void doUpdate(final List<SelectorData> dataList) {        dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);    }}

遍历数据,进入onSelectorSubscribe()方法。

  • PluginDataSubscriber.onSelectorSubscribe()

它没有其他逻辑,直接调用subscribeDataHandler()方法。在方法中,更具数据类型(插件、选择器或规则),操作类型(更新或删除),去执行不同逻辑。

/** * 通用插件数据订阅者,负责处理所有插件、选择器和规则信息 * The type Common plugin data subscriber. */public class CommonPluginDataSubscriber implements PluginDataSubscriber {    //......     // 处理选择器数据    @Override    public void onSelectorSubscribe(final SelectorData selectorData) {        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);    }            // 订阅数据处理器,处理数据的更新或删除    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {        Optional.ofNullable(classData).ifPresent(data -> {            // 插件数据            if (data instanceof PluginData) {                PluginData pluginData = (PluginData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cachePluginData(pluginData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removePluginData(pluginData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));                }            } else if (data instanceof SelectorData) {  // 选择器数据                SelectorData selectorData = (SelectorData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cacheSelectData(selectorData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                     Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removeSelectData(selectorData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));                }            } else if (data instanceof RuleData) {  // 规则数据                RuleData ruleData = (RuleData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cacheRuleData(ruleData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));                } else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removeRuleData(ruleData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));                }            }        });    }    }

那么新增一条选择器数据,会进入下面的逻辑:

// 将数据保存到网关内存BaseDataCache.getInstance().cacheSelectData(selectorData);// 如果每个插件还有自己的处理逻辑,那么就去处理                  Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));

一是将数据保存到网关的内存中。BaseDataCache是最终缓存数据的类,通过单例模式实现。选择器数据就存到了SELECTOR_MAP这个Map中。在后续使用的时候,也是从这里拿数据。

public final class BaseDataCache {    // 私有变量    private static final BaseDataCache INSTANCE = new BaseDataCache();    // 私有构造器    private BaseDataCache() {    }        /**     * Gets instance.     *  公开方法     * @return the instance     */    public static BaseDataCache getInstance() {        return INSTANCE;    }        /**    *  缓存选择器数据的Map     * pluginName -> SelectorData.     */    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();        public void cacheSelectData(final SelectorData selectorData) {        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);    }           /**     * cache selector data.     * 缓存选择器数据     * @param data the selector data     */    private void selectorAccept(final SelectorData data) {        String key = data.getPluginName();        if (SELECTOR_MAP.containsKey(key)) { // 更新操作,先删除再插入            List<SelectorData> existList = SELECTOR_MAP.get(key);            final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());            resultList.add(data);            final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());            SELECTOR_MAP.put(key, collect);        } else {  // 新增操作,直接放到Map中            SELECTOR_MAP.put(key, Lists.newArrayList(data));        }    }    }

二是如果每个插件还有自己的处理逻辑,那么就去处理。 通过idea编辑器可以看到,当新增一条选择器后,有如下的插件还有处理。这里我们就不再展开了。

经过以上的源码追踪,并通过一个实际的案例,在admin端新增一条选择器数据,就将websocket数据同步的流程分析清楚了。

我们还是用下面的一张图将网关端的数据同步流程串联一下:

数据同步的流程已经分析完了,但是还有一些问题没有分析到,就是网关是如何跟admin建立连接的?

4. 网关和admin建立websocket连接#

  • websocket配置

在网关的配置文件中有如下配置,并且引入了相关依赖,就会启动websocket相关服务。

shenyu:    file:      enabled: true    cross:      enabled: true    dubbo :      parameter: multi    sync:      websocket :  # 使用websocket进行数据同步        urls: ws://localhost:9095/websocket   # admin端的websocket地址        allowOrigin: ws://localhost:9195

在网关中引入websocket的依赖。

<!--shenyu data sync start use websocket--><dependency>    <groupId>org.apache.shenyu</groupId>    <artifactId>shenyu-spring-boot-starter-sync-data-websocket</artifactId>    <version>${project.version}</version></dependency>
  • Websocket数据同步配置

通过springboot的条件装配,创建相关的bean。在网关启动的时候,如果我们配置了shenyu.sync.websocket.urls,那么Websocket数据同步配置就会被加载。这里通过spring boot starter完成依赖的加载。


/** * Websocket数据同步配置 * 通过springboot实现条件注入 * Websocket sync data configuration for spring boot. */@Configuration@ConditionalOnClass(WebsocketSyncDataService.class)@ConditionalOnProperty(prefix = "shenyu.sync.websocket", name = "urls")@Slf4jpublic class WebsocketSyncDataConfiguration {
    /**     * Websocket sync data service.     * Websocket数据同步服务     * @param websocketConfig   the websocket config     * @param pluginSubscriber the plugin subscriber     * @param metaSubscribers   the meta subscribers     * @param authSubscribers   the auth subscribers     * @return the sync data service     */    // 创建websocketSyncDataService    @Bean    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {        log.info("you use websocket sync shenyu data.......");        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));    }
    /**     * Config websocket config.     *     * @return the websocket config     */    @Bean    @ConfigurationProperties(prefix = "shenyu.sync.websocket")    public WebsocketConfig websocketConfig() {        return new WebsocketConfig();  // 创建WebsocketConfig    }}

在项目的resources/META-INF目录先新建spring.factories文件,在文件中指明配置类。

  • Websocket数据同步服务

WebsocketSyncDataService中做了如下几件事情:

  • 读取配置中的urls,这个表示admin端的同步地址,有多个的话,使用","分割;
  • 创建调度线程池,一个admin分配一个,用于执行定时任务;
  • 创建ShenyuWebsocketClient,一个admin分配一个,用于和admin建立websocket通信;
  • 开始和admin端的websocket 建立连接;
  • 执行定时任务,每隔10秒执行一次。主要作用是判断websocket连接是否已经断开,如果已经断开,则尝试重连。如果没有断开,就进行 ping-pong 检测。

/** * Websocket数据同步服务 * Websocket sync data service. */@Slf4jpublic class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
    private final List<WebSocketClient> clients = new ArrayList<>();
    private final ScheduledThreadPoolExecutor executor;
    /**     * Instantiates a new Websocket sync cache.     * 创建Websocket数据同步服务     * @param websocketConfig      the websocket config     * @param pluginDataSubscriber the plugin data subscriber     * @param metaDataSubscribers  the meta data subscribers     * @param authDataSubscribers  the auth data subscribers     */    public WebsocketSyncDataService(final WebsocketConfig websocketConfig,                                    final PluginDataSubscriber pluginDataSubscriber,                                    final List<MetaDataSubscriber> metaDataSubscribers,                                    final List<AuthDataSubscriber> authDataSubscribers) {        // admin端的同步地址,有多个的话,使用","分割        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");        // 创建调度线程池,一个admin分配一个        executor = new ScheduledThreadPoolExecutor(urls.length, ShenyuThreadFactory.create("websocket-connect", true));        for (String url : urls) {            try {                //创建WebsocketClient,一个admin分配一个                clients.add(new ShenyuWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));            } catch (URISyntaxException e) {                log.error("websocket url({}) is error", url, e);            }        }        try {            for (WebSocketClient client : clients) {                // 和websocket server建立连接                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);                if (success) {                    log.info("websocket connection is successful.....");                } else {                    log.error("websocket connection is error.....");                }
                // 执行定时任务,每隔10秒执行一次                // 主要作用是判断websocket连接是否已经断开,如果已经断开,则尝试重连。                // 如果没有断开,就进行 ping-pong 检测                executor.scheduleAtFixedRate(() -> {                    try {                        if (client.isClosed()) {                            boolean reconnectSuccess = client.reconnectBlocking();                            if (reconnectSuccess) {                                log.info("websocket reconnect server[{}] is successful.....", client.getURI().toString());                            } else {                                log.error("websocket reconnection server[{}] is error.....", client.getURI().toString());                            }                        } else {                            client.sendPing();                            log.debug("websocket send to [{}] ping message successful", client.getURI().toString());                        }                    } catch (InterruptedException e) {                        log.error("websocket connect is error :{}", e.getMessage());                    }                }, 10, 10, TimeUnit.SECONDS);            }            /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/        } catch (InterruptedException e) {            log.info("websocket connection...exception....", e);        }
    }
    @Override    public void close() {        // 关闭 websocket client        for (WebSocketClient client : clients) {            if (!client.isClosed()) {                client.close();            }        }        // 关闭线程池        if (Objects.nonNull(executor)) {            executor.shutdown();        }    }}
  • ShenyuWebsocketClient

ShenYu中创建的WebSocket客户端,用于和admin端通信。第一次成功建立连接后,同步全量数据,后续进行增量同步。


/** * 在ShenYu中自定义的WebSocket客户端 * The type shenyu websocket client. */@Slf4jpublic final class ShenyuWebsocketClient extends WebSocketClient {        private volatile boolean alreadySync = Boolean.FALSE;        private final WebsocketDataHandler websocketDataHandler;        /**     * Instantiates a new shenyu websocket client.     * 创建ShenyuWebsocketClient     * @param serverUri             the server uri  服务端uri     * @param pluginDataSubscriber the plugin data subscriber 插件数据订阅器     * @param metaDataSubscribers   the meta data subscribers 元数据订阅器     * @param authDataSubscribers   the auth data subscribers 认证数据订阅器     */    public ShenyuWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {        super(serverUri);        this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);    }
    // 成功建立连接后执行    @Override    public void onOpen(final ServerHandshake serverHandshake) {        // 防止重新建立连接时,再次执行,所以用alreadySync进行判断        if (!alreadySync) {            // 同步所有数据,MYSELF 类型            send(DataEventTypeEnum.MYSELF.name());            alreadySync = true;        }    }
    // 接收到消息后执行    @Override    public void onMessage(final String result) {        // 处理接收到的数据        handleResult(result);    }        // 关闭后执行    @Override    public void onClose(final int i, final String s, final boolean b) {        this.close();    }        // 失败后执行    @Override    public void onError(final Exception e) {        this.close();    }        @SuppressWarnings("ALL")    private void handleResult(final String result) {        // 数据反序列化        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);        // 哪种数据类型,插件、选择器、规则...        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());        // 哪种操作类型,更新、删除...        String eventType = websocketData.getEventType();        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        // 处理数据        websocketDataHandler.executor(groupEnum, json, eventType);    }}

5. 总结#

本文通过一个实际案例,对websocket的数据同步原理进行了源码分析。涉及到的主要知识点如下:

  • websocket支持双向通信,性能好,推荐使用;
  • 通过Spring完成事件发布和监听;
  • 通过抽象DataChangedListener接口,支持多种同步策略,面向接口编程;
  • 使用工厂模式创建 WebsocketDataHandler,实现不同数据类型的处理;
  • 使用模板方法设计模式实现AbstractDataHandler,处理通用的操作类型;
  • 使用单例设计模式实现缓存数据类BaseDataCache
  • 通过SpringBoot的条件装配和starter加载机制实现配置类的加载。

集成测试剖析

· One min read
Kunshuai Zhu
Apache ShenYu Committer

这篇文章将会对Apache ShenYu的集成测试进行深入剖析。

什么是集成测试?#

集成测试在一些项目里也叫E2E (End To End)测试,主要用于测试各个模块组装成一个系统后是否能符合预期。

Apache ShenYu将集成测试放在了持续集成中,利用GitHub Action,在每次向主分支提交Pull Request或是Merge时触发。这样可以大大降低项目的维护成本,提升Apache ShenYu的稳定性。

自动化的集成测试如何实现?#

Apache ShenYu中,集成测试的主要步骤体现在GitHub Action工作流的脚本中,如下所示,该脚本位于 ~/.github/workflows目录下。

name: iton:  pull_request:  push:    branches:      - masterjobs:  build:    strategy:      matrix:        case:          - shenyu-integrated-test-alibaba-dubbo          ...    runs-on: ubuntu-latest    steps:      - uses: actions/checkout@v2        with:          submodules: true      ...

下面我将从这个yaml文件出发,带你剖析整个自动化集成测试的流程。

工作流的触发#

由于我们在 on 中指定了 pull_requestpush.branch: master,那么当我们提交pull_request或是merge分支到master(push)的时候,就会触发这个工作流。

关于更多GitHub Action的用法,可以参考 GitHub Action 的文档,这里不会做详细的介绍。

初始化环境#

  • 拉取代码
- uses: actions/checkout@v2  with:        submodules: true
  • 设置跳过标志
- name: Set Skip Env Var      uses: ./.github/actions/skip-ci

当发生的是一些对功能无关的改动(如改动文档)时,会跳过集成测试,以节约资源。

  • 缓存maven依赖、安装Java
- name: Cache Maven Repos...- uses: actions/setup-java@v1

构建整个项目,同时构建docker镜像#

./mvnw -B clean install -Prelease,docker -Dmaven.javadoc.skip=true -Dmaven.test.skip=true

上面这行命令中,-P后面跟着release,docker,表示会激活pom文件中相关的profile配置。

而release和docker这两个profile,目前只在 shenyu-dist 下的几个子模块中存在。下面将以 shenyu-dist-admin 模块为例,介绍profile为release和docker的配置的具体内容。另外,集成测试只使用了这一步构建的 shenyu-admin 镜像。

  • 首先是release

    <profile>    <id>release</id>    <activation>        <activeByDefault>false</activeByDefault>    </activation>    <build>        <finalName>apache-shenyu-incubating-${project.version}</finalName>        <plugins>            <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-assembly-plugin</artifactId>                <executions>                    <execution>                        <id>admin-bin</id>                        <phase>package</phase>                        <goals>                            <goal>single</goal>                        </goals>                    </execution>                </executions>                <configuration>                    <descriptors>                        <descriptor>${project.basedir}/src/main/assembly/binary.xml</descriptor>                    </descriptors>                    <tarLongFileMode>posix</tarLongFileMode>                </configuration>            </plugin>        </plugins>    </build></profile>

    当-P后面跟着release时,就会激活上面的 maven-assembly-plugin 插件。而executions中将插件的执行时机绑定在了maven生命周期package中,这也就意味着,当我们执行 mvn install 的时候就会触发。

    configuration中指定了我们编写好的 binary.xmlmaven-assembly-plugin 插件将会按照这个文件,将需要的文件复制进来,并打包。你可以点击链接查看该文件:shenyu-dist/shenyu-admin-dist/src/main/assembly/binary.xml

    根据这个文件,插件会将其他模块下打包好的jar包、配置文件、启动脚本等“复制”过来,最终打成 tar.gz 格式的压缩包。

  • 然后是docker

    <profile>    <id>docker</id>    <activation>        <activeByDefault>false</activeByDefault>    </activation>    <build>        <plugins>            <plugin>                <groupId>com.spotify</groupId>                <artifactId>dockerfile-maven-plugin</artifactId>                <version>${dockerfile-maven-plugin.version}</version>                <executions>                    <execution>                        <id>tag-latest</id>                        <goals>                            <goal>build</goal>                        </goals>                        <configuration>                            <tag>latest</tag>                        </configuration>                    </execution>                    <execution>                        <id>tag-version</id>                        <goals>                            <goal>build</goal>                        </goals>                        <configuration>                            <tag>${project.version}</tag>                        </configuration>                    </execution>                </executions>                <configuration>                    <repository>apache/shenyu-admin</repository>                    <buildArgs>                        <APP_NAME>apache-shenyu-incubating-${project.version}-admin-bin</APP_NAME>                    </buildArgs>                </configuration>            </plugin>        </plugins>    </build></profile>

    类比上面的release,这里是激活 dockerfile-maven-plugin 插件。当 mvn install -Pdocker 时,插件就会利用我们编写好的dockerfile构建docker镜像。

需要注意的是,dockerfile-maven-plugin目前对aarch64架构的设备支持有限,在aarch64架构的机器上运行该插件时会出现如下错误。且在本人写这篇文章的时候已经很久没有维护,这意味着aarch64架构的设备使用这个插件的问题在短期内不会解决。

[ERROR] Failed to execute goal com.spotify:dockerfile-maven-plugin:1.4.6:build (tag-latest) on project shenyu-admin-dist: Could not build image: java.util.concurrent.ExecutionException: com.spotify.docker.client.shaded.javax.ws.rs.ProcessingException: java.lang.UnsatisfiedLinkError: could not load FFI provider jnr.ffi.provider.jffi.Provider: ExceptionInInitializerError: Can't overwrite cause with java.lang.UnsatisfiedLinkError: java.lang.UnsatisfiedLinkError: /private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib: dlopen(/private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib, 1): no suitable image found.  Did find:[ERROR]         /private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib: no matching architecture in universal wrapper[ERROR]         /private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib: no matching architecture in universal wrapper...

这里有个临时的解决方案:

  1. 打开一个新的shell,输入如下命令,利用 socat 将 unix socket 路由到 tcp 端口

    socat TCP-LISTEN:2375,range=127.0.0.1/32,reuseaddr,fork UNIX-CLIENT:/var/run/docker.sock
  2. 设置环境变量

    export DOCKER_HOST=tcp://127.0.0.1:2375

构建examples模块#

- name: Build examples  if: env.SKIP_CI != 'true'  run: ./mvnw -B clean install -Pexample -Dmaven.javadoc.skip=true -Dmaven.test.skip=true -f ./shenyu-examples/pom.xml

因为考虑到release的需要,目前项目根目录下的pom文件中不饱含example子模块,所以上面这个步骤另外构建了examples模块。

与上面类似,这行命令也会利用maven的插件构建镜像,以供我们后续docker编排使用。

构建定制化网关#

- name: Build integrated tests  if: env.SKIP_CI != 'true'  run: ./mvnw -B clean install -Pit -DskipTests -f ./shenyu-integrated-test/pom.xml

为了细分Apache ShenYu的不同功能的集成测试,我们在这一步将构建集成测试模块定制的网关。所谓的“定制”就是在pom文件中引入需要的最少依赖,然后代替默认的 shenyu-bootstrap。与上面两个步骤类似,这一步也会构建出docker镜像。

值得注意的是,这里的打包构建的方式与 shenyu-dist 模块的有一些不同,你可以通过对比pom文件发现。

运行docker compose#

- name: Start docker compose  if: env.SKIP_CI != 'true'  run: docker-compose -f ./shenyu-integrated-test/${{ matrix.case }}/docker-compose.yml up -d

这一步将会根据集成测试模块下编写好的不同的 docker-compose.yml 文件,进行docker编排。

version: "3.9"services:  shenyu-zk:    container_name: shenyu-zk    image: zookeeper:3.5    ...  shenyu-redis:    image: redis:6.0-alpine    container_name: shenyu-redis    ...
  shenyu-examples-http:    deploy:      resources:        limits:          memory: 2048M    container_name: shenyu-examples-http    image: shenyu-examples-http:latest    ...
  shenyu-admin:    image: apache/shenyu-admin:latest    container_name: shenyu-admin    ...
  shenyu-integrated-test-http:    container_name: shenyu-integrated-test-http    image: apache/shenyu-integrated-test-http:latest    ...    depends_on:      shenyu-admin:        condition: service_healthy    healthcheck:      test: [ "CMD", "wget", "http://shenyu-integrated-test-http:9195/actuator/health" ]      timeout: 2s      retries: 30
networks:  shenyu:    name: shenyu

例如 shenyu-integrated-test-http 模块下的 docker-compose.yml,按顺序启动了zookeeper、redis、example、admin、网关等服务。其中,example、admin、网关的镜像是我们之前构建的。

其中,docker-compose利用 depends_on 确定了服务之间的拓扑关系,并且大部分服务都有相应的健康检查,待健康检查通过后才会启动下一个服务。

运行健康检查,等待docker-compose启动完毕#

- name: Wait for docker compose start up completely  if: env.SKIP_CI != 'true'  run: bash ./shenyu-integrated-test/${{ matrix.case }}/script/healthcheck.sh

在这一步,宿主机会运行 healthcheck.sh 这个脚本,然后利用 curl 命令访问各个服务列表(在services.list文件中)的健康状态接口 /actuator/health,一直到服务状态都为正常才会继续。

运行测试#

- name: Run test  id: test  if: env.SKIP_CI != 'true'  run: ./mvnw test -Pit -f ./shenyu-integrated-test/${{ matrix.case }}/pom.xml  continue-on-error: true

这一步就是利用maven test命令,逐个执行 /src/test/ 目录下的测试类。

查看Docker Compose日志#

- name: Check test result  if: env.SKIP_CI != 'true'  run: |    docker-compose -f ./shenyu-integrated-test/${{ matrix.case }}/docker-compose.yml logs --tail="all"    if [[ ${{steps.test.outcome}} == "failure" ]]; then      echo "Test Failed"      exit 1    else      echo "Test Successful"      exit 0    fi

当工作流出现错误时,docker compose的日志可以帮助我们更好的排查问题,所以在这一步我们将docker compose的日志打印出来。

e2e测试详解

· One min read
Haiqi Qin
Apache ShenYu Committer

这篇文章将会对Apache ShenYu的e2e模块进行深入剖析。

什么是e2e#

e2e(end to end),也叫端到端测试,是一种用于测试应用程序流是否从头到尾按设计执行的方法。 执行端到端测试的目的是识别系统依赖关系,并确保在各种系统组件和系统之间传递正确的信息。端到端测试的目的是测试 整个软件的依赖性、数据完整性以及与其他系统、接口和数据库的通信,以模拟完整的生产场景。

e2e的优势#

e2e测试能够模拟真实用户场景下测试软件系统的完整性和准确性,能够验证整个系统是否按照预期工作,以及不同组件是否能够协同工作。 e2e测试有以下几个好处:

  1. 帮助保证系统功能的正确性:e2e测试能够模拟真实用户场景下的交互和操作,验证整个系统是否能够按照预期工作,帮助发现系统中的潜在问题和缺陷。
  2. 提高测试覆盖率:e2e测试能够覆盖整个系统,包括前端、后端、数据库等不同层面和组件,从而提高测试覆盖率,保证测试的全面性和准确性。
  3. 保证系统的稳定性:E2E测试可以检查系统在各种情况下的稳定性和健壮性,包括系统的响应时间、错误处理能力、并发性等方面,帮助确保系统在面对高负载和异常情况时仍然能够保持稳定运行。
  4. 减少测试成本:e2e测试能够提高测试效率和准确性,减少测试成本和时间,从而帮助企业更快速地发布和交付高质量的软件产品。

总之,e2e测试是一种全面的测试方式,能够验证整个系统是否按照预期工作,提高测试覆盖率和测试效率,从而保证系统的稳定性和正确性,减少测试成本和时间,是一种非常重要和有效的测试方法,所以我们需要完善 e2e相关代码。

自动化e2e测试如何实现#

在Apache ShenYu中,e2e测试的主要步骤体现在GitHub Action工作流的脚本中,如下所示,该脚本位于 ~/.github/workflows目录下的e2e文件中。

name: e2e
on:  pull_request:  push:    branches:      - masterjobs:  changes:    ...  build-docker-images:    ...  e2e-http:    ...  e2e-case:    runs-on: ubuntu-latest    needs:      - changes      - build-docker-images    if: ${{ needs.changes.outputs.e2e == 'true' }}    strategy:      matrix:        case: [ "shenyu-e2e-case-spring-cloud", "shenyu-e2e-case-apache-dubbo", "shenyu-e2e-case-sofa" ]    steps:      - uses: actions/checkout@v3        with:          submodules: true      - name: Load ShenYu Docker Images        run: |          docker load --input /tmp/apache-shenyu-admin.tar          docker load --input /tmp/apache-shenyu-bootstrap.tar          docker image ls -a      - name: Build examples with Maven        run: ./mvnw -B clean install -Pexample -Dmaven.javadoc.skip=true -Dmaven.test.skip=true -f ./shenyu-examples/pom.xml      - name: Run ShenYu E2E Tests        env:          storage: mysql        run: |          bash ./shenyu-e2e/script/storage_init.sh          ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/${{ matrix.case }} -Dstorage=mysql test

当工作流触发时,使用shenyu-dist模块下的dockerfile文件构建admin与bootstrap项目的镜像并上传,当e2e测试模块运行时可以加载admin与bootstrap镜像。紧接着构建examples中的模块,最后执行对应测试模块的测试方法。

本地如何运行e2e测试#

如果需要编写e2e测试用例,首先需要在本地编码并调试。目前e2e支持两种启动方式,一个是docker启动,另一个是host启动。这两种模式可以通过在测试类中的@ShenYuTest注解中切换。host启动方式直接在本地将需要启动的服务直接启动即可运行测试代码。采用docker进行启动前,需要在先构建出相应镜像。因为ShenYu目前需要支持在github工作流进行e2e测试,建议采用docker启动方式。

e2e启动流程剖析#

目前e2e模块主要分为四个部分,分别为:case、client、common以及engine。

e2e-modules

case模块存放插件的测试用例,client模块编写了admin与gateway的客户端,以便请求对应接口。common存放一些公共类,engine模块是框架的核心,依托testcontainer框架利用java代码启动docker容器并完成对admin以及gatewat的配置操作。

接下来我将依托源码对e2e启动流程进行剖析。

当我们执行case中的测试方法时,@ShenYuTest注解将会生效,对测试类进行扩展。通过@ShenYuTest,我们可以选择启动方法、对admin以及gateway配置相关参数,以及选择将要执行的docker-compose文件。对于admin以及gateway,可以配置登陆所需的用户名、密码、数据同步方式以及修改yaml的内容。

@ShenYuTest(        mode = ShenYuEngineConfigure.Mode.DOCKER,        services = {                @ShenYuTest.ServiceConfigure(                        serviceName = "admin",                        port = 9095,                        baseUrl = "http://{hostname:localhost}:9095",                        parameters = {                                @ShenYuTest.Parameter(key = "username", value = "admin"),                                @ShenYuTest.Parameter(key = "password", value = "123456"),                                @ShenYuTest.Parameter(key = "dataSyn", value = "admin_websocket")                        }                ),                @ShenYuTest.ServiceConfigure(                        serviceName = "gateway",                        port = 9195,                        baseUrl = "http://{hostname:localhost}:9195",                         type = ShenYuEngineConfigure.ServiceType.SHENYU_GATEWAY,                        parameters = {                          @ShenYuTest.Parameter(key = "application", value =  "spring.cloud.discovery.enabled:true,eureka.client.enabled:true"),                           @ShenYuTest.Parameter(key = "dataSyn", value = "gateway_websocket")})},           dockerComposeFile = "classpath:./docker-compose.mysql.yml")

@ShenYuTest通过ShenYuExtension类进行扩展,对admin与gateway的配置在ShenYuExtension中的beforeAll中生效。具体的生效逻辑在DockerServiceCompose类中实现。

e2e-shenyutest

e2e-beforeall

@ShenYuTest配置项在docker启动前生效,主要通过修改测试模块中resource目录下的yaml文件。目前e2e支持对不同数据同步方式进行测试,其原理就是通过DockerServiceCompose类中的chooseDataSyn方法。在DataSyncHandler中对各种数据同步方式需要修改的内容进行初始化,最后启动container。

e2e-docer-service-compose

e2e-datahandle-syn

当docker启动完后,开始对插件功能进行测试。在PluginsTest类中,有针对测试进行的前置以及后置操作。

    @BeforeAll    static void setup(final AdminClient adminClient, final GatewayClient gatewayClient) throws InterruptedException, JsonProcessingException {        adminClient.login();        Thread.sleep(10000);        List<SelectorDTO> selectorDTOList = adminClient.listAllSelectors();        List<MetaDataDTO> metaDataDTOList = adminClient.listAllMetaData();        List<RuleDTO> ruleDTOList = adminClient.listAllRules();        Assertions.assertEquals(2, selectorDTOList.size());        Assertions.assertEquals(13, metaDataDTOList.size());        Assertions.assertEquals(14, ruleDTOList.size());                for (SelectorDTO selectorDTO : selectorDTOList) {            if (selectorDTO.getHandle() != null && !"".equals(selectorDTO.getHandle())) {                SpringCloudPluginCases.verifierUri(selectorDTO.getHandle());            }        }                List<MetaData> metaDataCacheList = gatewayClient.getMetaDataCache();        List<SelectorCacheData> selectorCacheList = gatewayClient.getSelectorCache();        List<RuleCacheData> ruleCacheList = gatewayClient.getRuleCache();        Assertions.assertEquals(2, selectorCacheList.size());        Assertions.assertEquals(13, metaDataCacheList.size());        Assertions.assertEquals(14, ruleCacheList.size());
        MultiValueMap<String, String> formData = new LinkedMultiValueMap<>();        formData.add("id", "8");        formData.add("name", "springCloud");        formData.add("enabled", "true");        formData.add("role", "Proxy");        formData.add("sort", "200");        adminClient.changePluginStatus("8", formData);        String id = "";        for (SelectorDTO selectorDTO : selectorDTOList) {            if (!"".equals(selectorDTO.getHandle())) {                id = selectorDTO.getId();            }        }        adminClient.deleteSelectors(id);        selectorDTOList = adminClient.listAllSelectors();        Assertions.assertEquals(1, selectorDTOList.size());    }

以springcloud插件为例,首先需要测试注册中心以及数据同步能否正常工作,接着启动插件并删除已存在的选择器。测试数据是否成功注册进注册中心,可以调用admin客户端的接口进行测试,测试数据同步是否成功,可以获取gateway的缓存进行测试。

接着运行case文件中的测试用例,通过@ShenYuScenario获取用例。

    @ShenYuScenario(provider = SpringCloudPluginCases.class)    void testSpringCloud(GatewayClient gateway, CaseSpec spec) {        spec.getVerifiers().forEach(verifier -> verifier.verify(gateway.getHttpRequesterSupplier().get()));    }

针对不同的插件,我们可以构建Case类,存放要测试的规则。所有的测试规则存放进list中,按顺序进行测试。beforeEachSpec中进行构建选择器与规则,caseSpec存放测试实体,如果符合uri规则的应存在,否则不存在。我们需要模拟用户对选择器和规则进行新增,因为各个插件的选择器的handler规则不一定相同,所以我们需要根据插件需求去编写其handle类。并通过请求验证其符合规则。具体测试用例主要分为两大类,一类是对uri规则进行匹配,比如euqal、path_pattern、start_with、end_with,一类是请求类型,比如get、put、post、delete。

当八种匹配情况都测试通过后,可以判断该插件功能正常,我们在测试结束后需要恢复环境,将所有的选择器删除,将该插件设置为不可用,最后关闭所有容器。

    @Override    public List<ScenarioSpec> get() {        return Lists.newArrayList(                testWithUriEquals(),                testWithUriPathPattern(),                testWithUriStartWith(),                testWithEndWith(),                testWithMethodGet(),                testWithMethodPost(),                testWithMethodPut(),                testWithMethodDelete()        );    }
    private ShenYuScenarioSpec testWithUriEquals() {        return ShenYuScenarioSpec.builder()                .name("single-spring-cloud uri =]")                .beforeEachSpec(                        ShenYuBeforeEachSpec.builder()                                .addSelectorAndRule(                                        newSelectorBuilder("selector", Plugin.SPRING_CLOUD)                                               .handle(SpringCloudSelectorHandle.builder().serviceId("springCloud-test")                                                        .gray(true)                                                        .divideUpstreams(DIVIDE_UPSTREAMS)                                                        .build())                                                .conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.EQUAL, TEST))                                                .build(),                                        newRuleBuilder("rule")                               .handle(SpringCloudRuleHandle.builder().loadBalance("hash").timeout(3000).build())                                                .conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.EQUAL, TEST))                                                .build()                                )                                .checker(notExists(TEST))                                .waiting(exists(TEST))                                .build()                )                .caseSpec(                        ShenYuCaseSpec.builder()                                .addExists(TEST)                                .addNotExists("/springcloud/te")                                .addNotExists("/put")                                .addNotExists("/get")                                .build()                )                .afterEachSpec(ShenYuAfterEachSpec.DEFAULT)                .build();    }

ZooKeeper数据同步源码分析

· One min read
Apache ShenYu Committer

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu 网关当前支持ZooKeeperWebSocketHttp长轮询NacosEtcdConsul 进行数据同步。本文的主要内容是基于ZooKeeper的数据同步源码分析。

本文基于shenyu-2.4.0版本进行源码分析,官网的介绍请参考 数据同步原理

1. 关于ZooKeeper#

Apache ZooKeeperApache软件基金会的一个软件项目,它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。ZooKeeper节点将它们的数据存储于一个分层的名字空间,非常类似于一个文件系统或一个前缀树结构。客户端可以在节点读写,从而以这种方式拥有一个共享的配置服务。

2. Admin数据同步#

我们从一个实际案例进行源码追踪,比如在后台管理系统中,对Divide插件中的一条选择器数据进行更新,将权重更新为90:

2.1 接收数据#

  • SelectorController.createSelector()

进入SelectorController类中的updateSelector()方法,它负责数据的校验,添加或更新数据,返回结果信息。

@Validated@RequiredArgsConstructor@RestController@RequestMapping("/selector")public class SelectorController {        @PutMapping("/{id}")    public ShenyuAdminResult updateSelector(@PathVariable("id") final String id, @Valid @RequestBody final SelectorDTO selectorDTO) {        // 设置当前选择器数据id        selectorDTO.setId(id);        // 创建或更新操作        Integer updateCount = selectorService.createOrUpdate(selectorDTO);        // 返回结果信息        return ShenyuAdminResult.success(ShenyuResultMessage.UPDATE_SUCCESS, updateCount);    }        // ......}

2.2 处理数据#

  • SelectorServiceImpl.createOrUpdate()

SelectorServiceImpl类中通过createOrUpdate()方法完成数据的转换,保存到数据库,发布事件,更新upstream

@RequiredArgsConstructor@Servicepublic class SelectorServiceImpl implements SelectorService {    // 负责事件发布的eventPublisher    private final ApplicationEventPublisher eventPublisher;        @Override    @Transactional(rollbackFor = Exception.class)    public int createOrUpdate(final SelectorDTO selectorDTO) {        int selectorCount;        // 构建数据 DTO --> DO        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);        List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();        // 判断是添加还是更新        if (StringUtils.isEmpty(selectorDTO.getId())) {            // 插入选择器数据            selectorCount = selectorMapper.insertSelective(selectorDO);            // 插入选择器中的条件数据            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));            });            // check selector add            // 权限检查            if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {                DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();                dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());                dataPermissionDTO.setDataId(selectorDO.getId());                dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);                dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));            }
        } else {            // 更新数据,先删除再新增            selectorCount = selectorMapper.updateSelective(selectorDO);            //delete rule condition then add            selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);                selectorConditionMapper.insertSelective(selectorConditionDO);            });        }        // 发布事件        publishEvent(selectorDO, selectorConditionDTOs);
        // 更新upstream        updateDivideUpstream(selectorDO);        return selectorCount;    }            // ......    }

Serrvice类完成数据的持久化操作,即保存数据到数据库,这个比较简单,就不深入追踪了。关于更新upstream操作,放到后面对应的章节中进行分析,重点关注发布事件的操作,它会执行数据同步。

publishEvent()方法的逻辑是:找到选择器对应的插件,构建条件数据,发布变更数据。

       private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {        // 找到选择器对应的插件        PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());        // 构建条件数据        List<ConditionData> conditionDataList =                selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());        // 发布变更数据        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,                Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));    }

发布变更数据通过eventPublisher.publishEvent()完成,这个eventPublisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。

关于ApplicationEventPublisher

当有状态发生变化时,发布者调用 ApplicationEventPublisherpublishEvent 方法发布一个事件,Spring容器广播事件给所有观察者,调用观察者的 onApplicationEvent 方法把事件对象传递给观察者。调用 publishEvent方法有两种途径,一种是实现接口由容器注入 ApplicationEventPublisher 对象然后调用其方法,另一种是直接调用容器的方法,两种方法发布事件没有太大区别。

  • ApplicationEventPublisher:发布事件;
  • ApplicationEventSpring 事件,记录事件源、时间和数据;
  • ApplicationListener:事件监听者,观察者;

Spring的事件发布机制中,有三个对象,

一个是发布事件的ApplicationEventPublisher,在ShenYu中通过构造器注入了一个eventPublisher

另一个对象是ApplicationEvent,在ShenYu中通过DataChangedEvent继承了它,表示事件对象。

public class DataChangedEvent extends ApplicationEvent {//......}

最后一个是 ApplicationListener,在ShenYu中通过DataChangedEventDispatcher类实现了该接口,作为事件的监听者,负责处理事件对象。

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
    //......    }

2.3 分发数据#

  • DataChangedEventDispatcher.onApplicationEvent()

当事件发布完成后,会自动进入到DataChangedEventDispatcher类中的onApplicationEvent()方法,进行事件处理。

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
  /**     * 有数据变更时,调用此方法     * @param event     */    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // 遍历数据变更监听器(一般使用一种数据同步的方式就好了)        for (DataChangedListener listener : listeners) {            // 哪种数据发生变更            switch (event.getGroupKey()) {                case APP_AUTH: // 认证信息                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());                    break;                case PLUGIN:  // 插件信息                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());                    break;                case RULE:    // 规则信息                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());                    break;                case SELECTOR:   // 选择器信息                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());                    break;                case META_DATA:  // 元数据                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());                    break;                default:  // 其他类型,抛出异常                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());            }        }    }    }

当有数据变更时,调用onApplicationEvent方法,然后遍历所有数据变更监听器,判断是哪种数据类型,交给相应的数据监听器进行处理。

ShenYu将所有数据进行了分组,一共是五种:认证信息、插件信息、规则信息、选择器信息和元数据。

这里的数据变更监听器(DataChangedListener),就是数据同步策略的抽象,它的具体实现有:

这几个实现类就是当前ShenYu支持的同步策略:

  • WebsocketDataChangedListener:基于websocket的数据同步;
  • ZookeeperDataChangedListener:基于zookeeper的数据同步;
  • ConsulDataChangedListener:基于consul的数据同步;
  • EtcdDataDataChangedListener:基于etcd的数据同步;
  • HttpLongPollingDataChangedListener:基于http长轮询的数据同步;
  • NacosDataChangedListener:基于nacos的数据同步;

既然有这么多种实现策略,那么如何确定使用哪一种呢?

因为本文是基于Zookeeper的数据同步源码分析,所以这里以ZookeeperDataChangedListener为例,分析它是如何被加载并实现的。

通过在源码工程中进行全局搜索,可以看到,它的实现是在DataSyncConfiguration类完成的。

/** * 数据同步配置类 * 通过springboot条件装配实现 * The type Data sync configuration. */@Configurationpublic class DataSyncConfiguration {            /**     * zookeeper数据同步     * The type Zookeeper listener.     */    @Configuration    @ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url")  // 条件属性,满足才会被加载    @Import(ZookeeperConfiguration.class)    static class ZookeeperListener {
        /**         * Config event listener data changed listener.         * 创建Zookeeper数据变更监听器         * @param zkClient the zk client         * @return the data changed listener         */        @Bean        @ConditionalOnMissingBean(ZookeeperDataChangedListener.class)        public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {            return new ZookeeperDataChangedListener(zkClient);        }
        /**         * Zookeeper data init zookeeper data init.         *  创建 Zookeeper 数据初始化类         * @param zkClient        the zk client         * @param syncDataService the sync data service         * @return the zookeeper data init         */        @Bean        @ConditionalOnMissingBean(ZookeeperDataInit.class)        public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {            return new ZookeeperDataInit(zkClient, syncDataService);        }    }        //省略了其他代码......}

这个配置类是通过SpringBoot条件装配类实现的。在ZookeeperListener类上面有几个注解:

  • @Configuration:配置文件,应用上下文;

  • @ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url"):属性条件判断,满足条件,该配置类才会生效。也就是说,当我们有如下配置时,就会采用zookeeper进行数据同步。

    shenyu:    sync:     zookeeper:          url: localhost:2181          sessionTimeout: 5000          connectionTimeout: 2000
  • @Import(ZookeeperConfiguration.class):导入另一个类ZookeeperConfiguration

  @EnableConfigurationProperties(ZookeeperProperties.class)  // 启用zk属性配置类  public class ZookeeperConfiguration {
    /**     * register zkClient in spring ioc.     * 向 Spring IOC 容器注册 zkClient     * @param zookeeperProp the zookeeper configuration     * @return ZkClient {@linkplain ZkClient}        */      @Bean      @ConditionalOnMissingBean(ZkClient.class)      public ZkClient zkClient(final ZookeeperProperties zookeeperProp) {        return new ZkClient(zookeeperProp.getUrl(), zookeeperProp.getSessionTimeout(), zookeeperProp.getConnectionTimeout()); // 读取zk配置信息,并创建zkClient      }  }
@Data@ConfigurationProperties(prefix = "shenyu.sync.zookeeper") // zk属性配置public class ZookeeperProperties {
    private String url;
    private Integer sessionTimeout;
    private Integer connectionTimeout;
    private String serializer;}

当我们主动配置,采用zookeeper进行数据同步时,zookeeperDataChangedListener就会生成。所以在事件处理方法onApplicationEvent()中,就会到相应的listener中。在我们的案例中,是对一条选择器数据进行更新,数据同步采用的是zookeeper,所以,代码会进入到ZookeeperDataChangedListener进行选择器数据变更处理。

    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // 遍历数据变更监听器(一般使用一种数据同步的方式就好了)        for (DataChangedListener listener : listeners) {            // 哪种数据发生变更            switch (event.getGroupKey()) {                                    // 省略了其他逻辑                                    case SELECTOR:   // 选择器信息                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());   // 在我们的案例中,会进入到ZookeeperDataChangedListener进行选择器数据变更处理                    break;         }    }

2.4 Zookeeper数据变更监听器#

  • ZookeeperDataChangedListener.onSelectorChanged()

    onSelectorChanged()方法中,判断操作类型,是刷新同步还是更新或创建同步。根据当前选择器数据信息判断节点是否在zk中。


/** * 使用 zookeeper 发布变更数据 */public class ZookeeperDataChangedListener implements DataChangedListener {        // 选择器信息发生改变    @Override    public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {        // 刷新操作        if (eventType == DataEventTypeEnum.REFRESH && !changed.isEmpty()) {            String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(changed.get(0).getPluginName());            deleteZkPathRecursive(selectorParentPath);        }        // 发生变更的数据        for (SelectorData data : changed) {            // 构建选择器数据的真实路径            String selectorRealPath = DefaultPathConstants.buildSelectorRealPath(data.getPluginName(), data.getId());            // 如果是删除操作            if (eventType == DataEventTypeEnum.DELETE) {                // 删除当前数据                deleteZkPath(selectorRealPath);                continue;            }            // 父节点路径            String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(data.getPluginName());            // 创建父节点            createZkNode(selectorParentPath);            // 插入或更新数据            insertZkNode(selectorRealPath, data);        }    }
    // 创建 zk 节点    private void createZkNode(final String path) {        // 不存在才创建        if (!zkClient.exists(path)) {            zkClient.createPersistent(path, true);        }    }
    // 插入zk节点    private void insertZkNode(final String path, final Object data) {        // 创建节点        createZkNode(path);        // 通过 zkClient 写入数据        zkClient.writeData(path, null == data ? "" : GsonUtils.getInstance().toJson(data));    }    }

只要将变动的数据正确写入到zk的节点上,admin这边的操作就执行完成了。ShenYu在使用zk进行数据同步时,zk的节点是通过精心设计的。

在我们当前的案例中,对Divide插件中的一条选择器数据进行更新,将权重更新为90,就会对图中的特定节点更新。

我们用时序图将上面的更新流程串联起来。

3. 网关数据同步#

假设ShenYu网关已经在正常运行,使用的数据同步方式也是zookeeper。那么当在admin端更新选择器数据后,并且向zk发送了变更的数据,那网关是如何接收并处理数据的呢?接下来我们就继续进行源码分析,一探究竟。

3.1 ZkClient接收数据#

  • ZkClient.subscribeDataChanges()

在网关端有一个ZookeeperSyncDataService类,它通过ZkClient订阅了数据节点,当数据发生变更时,可以感知到。

/** * 使用 zookeeper 缓存数据 */public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {    private void subscribeSelectorDataChanges(final String path) {       // zkClient订阅数据节点        zkClient.subscribeDataChanges(path, new IZkDataListener() {            @Override            public void handleDataChange(final String dataPath, final Object data) {                cacheSelectorData(GsonUtils.getInstance().fromJson(data.toString(), SelectorData.class)); // 节点数据被更新            }
            @Override            public void handleDataDeleted(final String dataPath) {                unCacheSelectorData(dataPath);  // 节点数据被删除            }        });    }     // 省略了其他逻辑}

ZooKeeperWatch机制,会给订阅的客户端发送节点变更通知。在我们的案例中,更新选择器信息,就会进入到handleDataChange()方法。通过cacheSelectorData()去处理数据。

3.2 处理数据#

  • ZookeeperSyncDataService.cacheSelectorData()

经过判空逻辑之后,缓存选择器数据的操作又交给了PluginDataSubscriber处理。

    private void cacheSelectorData(final SelectorData selectorData) {        Optional.ofNullable(selectorData)                .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSelectorSubscribe(data)));    }

PluginDataSubscriber是一个接口,它只有一个CommonPluginDataSubscriber实现类,负责处理插件、选择器和规则数据。

3.3 通用插件数据订阅者#

  • PluginDataSubscriber.onSelectorSubscribe()

它没有其他逻辑,直接调用subscribeDataHandler()方法。在方法中,更具数据类型(插件、选择器或规则),操作类型(更新或删除),去执行不同逻辑。

/** * 通用插件数据订阅者,负责处理所有插件、选择器和规则信息 * The type Common plugin data subscriber. */public class CommonPluginDataSubscriber implements PluginDataSubscriber {    //......     // 处理选择器数据    @Override    public void onSelectorSubscribe(final SelectorData selectorData) {        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);    }            // 订阅数据处理器,处理数据的更新或删除    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {        Optional.ofNullable(classData).ifPresent(data -> {            // 插件数据            if (data instanceof PluginData) {                PluginData pluginData = (PluginData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cachePluginData(pluginData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removePluginData(pluginData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));                }            } else if (data instanceof SelectorData) {  // 选择器数据                SelectorData selectorData = (SelectorData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cacheSelectData(selectorData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removeSelectData(selectorData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));                }            } else if (data instanceof RuleData) {  // 规则数据                RuleData ruleData = (RuleData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cacheRuleData(ruleData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));                } else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removeRuleData(ruleData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));                }            }        });    }    }

3.4 数据缓存到内存#

那么更新一条选择器数据,会进入下面的逻辑:

// 将数据保存到网关内存BaseDataCache.getInstance().cacheSelectData(selectorData);// 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));

一是将数据保存到网关的内存中。BaseDataCache是最终缓存数据的类,通过单例模式实现。选择器数据就存到了SELECTOR_MAP这个Map中。在后续使用的时候,也是从这里拿数据。

public final class BaseDataCache {    // 私有变量    private static final BaseDataCache INSTANCE = new BaseDataCache();    // 私有构造器    private BaseDataCache() {    }        /**     * Gets instance.     *  公开方法     * @return the instance     */    public static BaseDataCache getInstance() {        return INSTANCE;    }        /**    *  缓存选择器数据的Map     * pluginName -> SelectorData.     */    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();        public void cacheSelectData(final SelectorData selectorData) {        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);    }           /**     * cache selector data.     * 缓存选择器数据     * @param data the selector data     */    private void selectorAccept(final SelectorData data) {        String key = data.getPluginName();        if (SELECTOR_MAP.containsKey(key)) { // 更新操作,先删除再插入            List<SelectorData> existList = SELECTOR_MAP.get(key);            final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());            resultList.add(data);            final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());            SELECTOR_MAP.put(key, collect);        } else {  // 新增操作,直接放到Map中            SELECTOR_MAP.put(key, Lists.newArrayList(data));        }    }    }

二是如果每个插件还有自己的处理逻辑,那么就去处理。 通过idea编辑器可以看到,当新增一条选择器后,有如下的插件还有处理。这里我们就不再展开了。

经过以上的源码追踪,并通过一个实际的案例,在admin端新增更新一条选择器数据,就将zookeeper数据同步的流程分析清楚了。

我们还是通过时序图将网关端的数据同步流程串联一下:

数据同步的流程已经分析完了,为了不让同步流程被打断,在分析过程中就忽略了其他逻辑。我们还需要分析Admin同步数据初始化和网关同步操作初始化的流程。

4. Admin同步数据初始化#

admin启动后,会将当前的数据信息全量同步到zk中,实现逻辑如下:


/** * Zookeeper 数据初始化 */public class ZookeeperDataInit implements CommandLineRunner {
    private final ZkClient zkClient;
    private final SyncDataService syncDataService;
    /**     * Instantiates a new Zookeeper data init.     *     * @param zkClient        the zk client     * @param syncDataService the sync data service     */    public ZookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {        this.zkClient = zkClient;        this.syncDataService = syncDataService;    }
    @Override    public void run(final String... args) {        String pluginPath = DefaultPathConstants.PLUGIN_PARENT;        String authPath = DefaultPathConstants.APP_AUTH_PARENT;        String metaDataPath = DefaultPathConstants.META_DATA;        // 判断zk中是否存在数据        if (!zkClient.exists(pluginPath) && !zkClient.exists(authPath) && !zkClient.exists(metaDataPath)) {            syncDataService.syncAll(DataEventTypeEnum.REFRESH);        }    }}

判断zk中是否存在数据,如果不存在,则进行同步。

ZookeeperDataInit实现了CommandLineRunner接口。它是springboot提供的接口,会在所有 Spring Beans初始化之后执行run()方法,常用于项目中初始化的操作。

  • SyncDataService.syncAll()

从数据库查询数据,然后进行全量数据同步,所有的认证信息、插件信息、选择器信息、规则信息和元数据信息。主要是通过eventPublisher发布同步事件。这里就跟前面提到的同步逻辑就又联系起来了,eventPublisher通过publishEvent()发布完事件后,有ApplicationListener执行事件变更操作,在ShenYu中就是前面提到的DataChangedEventDispatcher

@Servicepublic class SyncDataServiceImpl implements SyncDataService {    // 事件发布    private final ApplicationEventPublisher eventPublisher;         /***     * 全量数据同步     * @param type the type     * @return     */    @Override    public boolean syncAll(final DataEventTypeEnum type) {        // 同步认证信息        appAuthService.syncData();        // 同步插件信息        List<PluginData> pluginDataList = pluginService.listAll();        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));        // 同步选择器信息        List<SelectorData> selectorDataList = selectorService.listAll();        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));        // 同步规则信息        List<RuleData> ruleDataList = ruleService.listAll();        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));        // 同步元数据信息        metaDataService.syncData();        return true;    }    }

5. 网关同步操作初始化#

网关这边的数据同步初始化操作主要是订阅zk中的节点,当有数据变更时,收到变更数据。这依赖于ZooKeeperWatch机制。在ShenYu中,负责zk数据同步的是ZookeeperSyncDataService,也在前面提到过。

ZookeeperSyncDataService的功能逻辑是在实例化的过程中完成的:对zk中的shenyu数据同步节点完成订阅。这里的订阅分两类,一类是已经存在的节点上面数据发生更新,这通过zkClient.subscribeDataChanges()方法实现;另一类是当前节点下有新增或删除节点,即子节点发生变化,这通过zkClient.subscribeChildChanges()方法实现。

ZookeeperSyncDataService的代码有点多,这里我们以插件数据的读取和订阅进行追踪,其他类型的数据操作原理是一样的。


/** *  zookeeper 数据同步服务 */public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {    // 在实例化的时候,完成从zk中读取数据的操作,并订阅节点    public ZookeeperSyncDataService( /*省略构造参数参数*/ ) {        this.zkClient = zkClient;        this.pluginDataSubscriber = pluginDataSubscriber;        this.metaDataSubscribers = metaDataSubscribers;        this.authDataSubscribers = authDataSubscribers;        // 订阅插件、选择器和规则数据        watcherData();        // 订阅认证数据        watchAppAuth();        // 订阅元数据        watchMetaData();    }        private void watcherData() {        // 插件节点路径        final String pluginParent = DefaultPathConstants.PLUGIN_PARENT;        // 所有插件节点        List<String> pluginZKs = zkClientGetChildren(pluginParent);        for (String pluginName : pluginZKs) {            // 订阅当前所有插件、选择器和规则数据            watcherAll(pluginName);        }        // 订阅子节点(新增或删除一个插件)        zkClient.subscribeChildChanges(pluginParent, (parentPath, currentChildren) -> {            if (CollectionUtils.isNotEmpty(currentChildren)) {                for (String pluginName : currentChildren) {                    // 需要订阅子节点的所有插件、选择器和规则数据                    watcherAll(pluginName);                }            }        });    }        private void watcherAll(final String pluginName) {        // 订阅插件数据        watcherPlugin(pluginName);        // 订阅选择器数据        watcherSelector(pluginName);        // 订阅规则数据        watcherRule(pluginName);    }
    private void watcherPlugin(final String pluginName) {        // 当前插件路径        String pluginPath = DefaultPathConstants.buildPluginPath(pluginName);        // 是否存在,不存在就创建        if (!zkClient.exists(pluginPath)) {            zkClient.createPersistent(pluginPath, true);        }        // 读取zk上当前节点数据,并反序列化        PluginData pluginData = null == zkClient.readData(pluginPath) ? null                : GsonUtils.getInstance().fromJson((String) zkClient.readData(pluginPath), PluginData.class);        // 缓存到网关内存中        cachePluginData(pluginData);        // 订阅插件节点        subscribePluginDataChanges(pluginPath, pluginName);    }       private void cachePluginData(final PluginData pluginData) {       // 省略实现逻辑,其实就是 CommonPluginDataSubscriber 中的操作,跟前面都能联系起来    }        private void subscribePluginDataChanges(final String pluginPath, final String pluginName) {        // 订阅数据变更:更新或删除        zkClient.subscribeDataChanges(pluginPath, new IZkDataListener() {
            @Override            public void handleDataChange(final String dataPath, final Object data) {  // 更新操作                 // 省略实现逻辑,其实就是 CommonPluginDataSubscriber 中的操作,跟前面都能联系起来            }
            @Override            public void handleDataDeleted(final String dataPath) {   // 删除操作                  // 省略实现逻辑,其实就是 CommonPluginDataSubscriber 中的操作,跟前面都能联系起来
            }        });    }    }    

上面的源代码中都给出了注释,相信大家可以看明白。订阅插件数据的主要逻辑如下:

  1. 构造当前插件路径
  2. 路径是否存在,不存在就创建
  3. 读取zk上当前节点数据,并反序列化
  4. 插件数据缓存到网关内存中
  5. 订阅插件节点

6. 总结#

本文通过一个实际案例,对zookeeper的数据同步原理进行了源码分析。涉及到的主要知识点如下:

  • 基于zookeeper的数据同步,主要是通过watch机制实现;
  • 通过Spring完成事件发布和监听;
  • 通过抽象DataChangedListener接口,支持多种同步策略,面向接口编程;
  • 使用单例设计模式实现缓存数据类BaseDataCache
  • 通过SpringBoot的条件装配和starter加载机制实现配置类的加载。

扩展插件加载逻辑

· One min read
hql0312 Coder

本文基于shenyu-2.6.1版本进行源码分析.

正文

Shenyu 提供了一种机制来定制自己的插件或是修改已有的插件,在其内部通过extPlugin的配置实现,其需要满足以下两点:

  1. 实现接口 ShenyuPlugin 或是 PluginDataHandler
  2. 将实现的包打包后,放置于shenyu.extPlugin.path对应的路径下

入口#

真正实现该逻辑的类是ShenyuLoaderService,接下来看下该类是如何处理

    public ShenyuLoaderService(final ShenyuWebHandler webHandler, final CommonPluginDataSubscriber subscriber, final ShenyuConfig shenyuConfig) {        // 插件信息的信息订阅        this.subscriber = subscriber;        // Shenyu封装的WebHandler,包含了所有的插件逻辑        this.webHandler = webHandler;        // 配置信息        this.shenyuConfig = shenyuConfig;        // 扩展插件的配置信息,如路径,是否启用、开启多少线程来处理、检查加载的频率等信息        ExtPlugin config = shenyuConfig.getExtPlugin();        // 如果启用的,则创建定时任务来检查并加载        if (config.getEnabled()) {            // 创建一个指定线程名称的定时任务            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(config.getThreads(), ShenyuThreadFactory.create("plugin-ext-loader", true));            // 创建固定频率执行的任务,默认在30s,每300s,执行一次            executor.scheduleAtFixedRate(() -> loadExtOrUploadPlugins(null), config.getScheduleDelay(), config.getScheduleTime(), TimeUnit.SECONDS);        }    }    

该类有以下几个属性:

webHandler: 该类是shenyu 处理请求的入口,引用了所有的插件数据,在扩展插件加载后,需要进行更新

subscriber: 该类是插件的订阅的入口,引用了所有插件的订阅处理类,在扩展配置加载后,也需要进行同步更新

executor: 在ShenyuLoaderService内部会创建一个定时任务,来定时扫描加载指定路径下的jar包,便于加载扩展的插件,实现动态发现 默认会在启动30秒后,每300秒扫描一次

同时这里可以通过 shenyu.extPlugin.enabled配置来决定是否要开启扩展插件功能的启用

以上的配置可以在配置文件中进行调整:

shenyu:  extPlugin:    path:   # 扩展插件的存储目录    enabled: true # 是否启用扩展功能    threads: 1 # 扫描加载的线程数    scheduleTime: 300 # 任务执行的频率    scheduleDelay: 30 # 任务启动后多久开始执行

接下来看下加载的逻辑:

   public void loadExtOrUploadPlugins(final PluginData uploadedJarResource) {        try {            List<ShenyuLoaderResult> plugins = new ArrayList<>();            // 获取ShenyuPluginClassloader的持有对象            ShenyuPluginClassloaderHolder singleton = ShenyuPluginClassloaderHolder.getSingleton();            if (Objects.isNull(uploadedJarResource)) {                // 参数为空,则从扩展的目录,加载所有的jar包                // PluginJar:包含ShenyuPlugin接口、PluginDataHandler接口的数据                List<PluginJarParser.PluginJar> uploadPluginJars = ShenyuExtPathPluginJarLoader.loadExtendPlugins(shenyuConfig.getExtPlugin().getPath());                // 遍历所有的待加载插件                for (PluginJarParser.PluginJar extPath : uploadPluginJars) {                    LOG.info("shenyu extPlugin find new {} to load", extPath.getAbsolutePath());                    // 使用扩展插件的加载器来加载指定的插件,便于后续的加载和卸载                    ShenyuPluginClassLoader extPathClassLoader = singleton.createPluginClassLoader(extPath);                    // 使用ShenyuPluginClassLoader 进行加载                    // 主要逻辑是:判断是否实现ShenyuPlugin接口、PluginDataHandler接口 或是否标识 @Component\@Service等注解,如果有,则注册为SpringBean                    // 构造 ShenyuLoaderResult对象                    plugins.addAll(extPathClassLoader.loadUploadedJarPlugins());                }            } else {                // 加载指定jar,逻辑同加载全部                PluginJarParser.PluginJar pluginJar = PluginJarParser.parseJar(Base64.getDecoder().decode(uploadedJarResource.getPluginJar()));                LOG.info("shenyu upload plugin jar find new {} to load", pluginJar.getJarKey());                ShenyuPluginClassLoader uploadPluginClassLoader = singleton.createPluginClassLoader(pluginJar);                plugins.addAll(uploadPluginClassLoader.loadUploadedJarPlugins());            }            // 将扩展的插件,加入到ShenyuWebHandler的插件列表,后续的请求则会经过加入的插件内容            loaderPlugins(plugins);        } catch (Exception e) {            LOG.error("shenyu plugins load has error ", e);        }    }

该方法处理的逻辑:

  1. 判断参数uploadedJarResource是否有值,如果没有,则会加载全部,否则加载指定资源jar包进行处理
  2. shenyu.extPlugin.path 中获取到指定jar包,并封装成 PluginJar对象,该对象包含了jar包以下信息
    • version: 版本信息
    • groupId:包的groupId
    • artifactId: 包的 artifactId
    • absolutePath: 绝对路径
    • clazzMap:class对应的字节码
    • resourceMap:jar包的字节码
  3. 通过ShenyuPluginClassloaderHolder创建对应的ClassLoader,对应的类是ShenyuPluginClassLoader, 并进行加载对应的类
    • 调用ShenyuPluginClassLoader.loadUploadedJarPlugins 加载对应的类并注册成Spring Bean,这样可以使用Spring容器来管理
  4. 调用loaderPlugins方法,将扩展的插件更新到 webHandler 以及 subscriber

插件注册#

对于提供的jar包里的内容,加载器只会处理指定接口类型的类,实现逻辑在 ShenyuPluginClassLoader.loadUploadedJarPlugins() 方法

public List<ShenyuLoaderResult> loadUploadedJarPlugins() {        List<ShenyuLoaderResult> results = new ArrayList<>();        // 所有的类映射关系        Set<String> names = pluginJar.getClazzMap().keySet();        // 遍历所有的类        names.forEach(className -> {            Object instance;            try {                // 尝试创建对象,如果可以,则加入到Spring容器中                instance = getOrCreateSpringBean(className);                if (Objects.nonNull(instance)) {                    // 构建ShenyuLoaderResult对象                    results.add(buildResult(instance));                    LOG.info("The class successfully loaded into a upload-Jar-plugin {} is registered as a spring bean", className);                }            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {                LOG.warn("Registering upload-Jar-plugins succeeds spring bean fails:{}", className, e);            }        });        return results;    }

该方法就是负责构建所有符合条件的对象,并封装成 ShenyuLoaderResult对象,该对象对于创建后对象,进行了封装,会在方法 buildResult()中进行处理

    private ShenyuLoaderResult buildResult(final Object instance) {        ShenyuLoaderResult result = new ShenyuLoaderResult();        // 创建的对象是否实现了ShenyuPlugin        if (instance instanceof ShenyuPlugin) {            result.setShenyuPlugin((ShenyuPlugin) instance);            // 创建的对象是否实现了PluginDataHandler        } else if (instance instanceof PluginDataHandler) {            result.setPluginDataHandler((PluginDataHandler) instance);        }        return result;    }

同时进入方法 getOrCreateSpringBean() 进一步分析

    private <T> T getOrCreateSpringBean(final String className) throws ClassNotFoundException, IllegalAccessException, InstantiationException {        // 确认是否已经注册过了,如果有则不处理,直接返回        if (SpringBeanUtils.getInstance().existBean(className)) {            return SpringBeanUtils.getInstance().getBeanByClassName(className);        }        lock.lock();        try {            // Double check,            T inst = SpringBeanUtils.getInstance().getBeanByClassName(className);            if (Objects.isNull(inst)) {                // 使用 ShenyuPluginClassLoader 进行加载类                Class<?> clazz = Class.forName(className, false, this);                //Exclude ShenyuPlugin subclass and PluginDataHandler subclass                // without adding @Component @Service annotation                // 确认是否是 ShenyuPlugin 或是 PluginDataHandler的子类                boolean next = ShenyuPlugin.class.isAssignableFrom(clazz)                        || PluginDataHandler.class.isAssignableFrom(clazz);                if (!next) {                    // 如果不是,确认是否标识了 @Component 与 @Service 注解                    Annotation[] annotations = clazz.getAnnotations();                    next = Arrays.stream(annotations).anyMatch(e -> e.annotationType().equals(Component.class)                            || e.annotationType().equals(Service.class));                }                if (next) {                    // 如果符合以上内容,则注册Bean                    GenericBeanDefinition beanDefinition = new GenericBeanDefinition();                    beanDefinition.setBeanClassName(className);                    beanDefinition.setAutowireCandidate(true);                    beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);                    // 注册bean                    String beanName = SpringBeanUtils.getInstance().registerBean(beanDefinition, this);                    // 创建对象                    inst = SpringBeanUtils.getInstance().getBeanByClassName(beanName);                }            }            return inst;        } finally {            lock.unlock();        }    }

逻辑大致如下:

  1. 判断是否实现了接口 ShenyuPluginPluginDataHandler, 如果没有,则是否标识了 @Component 或是 @Service
  2. 如果符合1的条件,则将该对象注册到Spring 容器,并返回创建的对象

同步#

在插件注册成功后,这时只是实例化了插件,但它还不会生效,因为它还未添加到 Shenyu的插件链中,同步逻辑由 loaderPlugins()方法实现

    private void loaderPlugins(final List<ShenyuLoaderResult> results) {        if (CollectionUtils.isEmpty(results)) {            return;        }        // 获取所有实现了接口ShenyuPlugin的对象        List<ShenyuPlugin> shenyuExtendPlugins = results.stream().map(ShenyuLoaderResult::getShenyuPlugin).filter(Objects::nonNull).collect(Collectors.toList());        // 同步更新webHandler中plugins        webHandler.putExtPlugins(shenyuExtendPlugins);        // 获取所有实现了接口PluginDataHandler的对象        List<PluginDataHandler> handlers = results.stream().map(ShenyuLoaderResult::getPluginDataHandler).filter(Objects::nonNull).collect(Collectors.toList());        // 同步扩展的PluginDataHandler        subscriber.putExtendPluginDataHandler(handlers);
    }

该方法的逻辑处理了两个数据:

  1. 将实现了 ShenyuPlugin 接口的数据,同步至 webHandler的plugins 列表
    public void putExtPlugins(final List<ShenyuPlugin> extPlugins) {        if (CollectionUtils.isEmpty(extPlugins)) {            return;        }        // 过滤出新增的插件        final List<ShenyuPlugin> shenyuAddPlugins = extPlugins.stream()                .filter(e -> plugins.stream().noneMatch(plugin -> plugin.named().equals(e.named())))                .collect(Collectors.toList());        // 过滤出更新的插件,以名称和旧的相同来判断,则为更新        final List<ShenyuPlugin> shenyuUpdatePlugins = extPlugins.stream()                .filter(e -> plugins.stream().anyMatch(plugin -> plugin.named().equals(e.named())))                .collect(Collectors.toList());        // 如果没有数据,则跳过        if (CollectionUtils.isEmpty(shenyuAddPlugins) && CollectionUtils.isEmpty(shenyuUpdatePlugins)) {            return;        }        // 复制旧的数据        // copy new list        List<ShenyuPlugin> newPluginList = new ArrayList<>(plugins);        // 添加新的插件数据        // Add extend plugin from pluginData or shenyu ext-lib        this.sourcePlugins.addAll(shenyuAddPlugins);        // 添加新数据        if (CollectionUtils.isNotEmpty(shenyuAddPlugins)) {            shenyuAddPlugins.forEach(plugin -> LOG.info("shenyu auto add extends plugins:{}", plugin.named()));            newPluginList.addAll(shenyuAddPlugins);        }        // 修改更新的数据        if (CollectionUtils.isNotEmpty(shenyuUpdatePlugins)) {            shenyuUpdatePlugins.forEach(plugin -> LOG.info("shenyu auto update extends plugins:{}", plugin.named()));            for (ShenyuPlugin updatePlugin : shenyuUpdatePlugins) {                for (int i = 0; i < newPluginList.size(); i++) {                    if (newPluginList.get(i).named().equals(updatePlugin.named())) {                        newPluginList.set(i, updatePlugin);                    }                }                for (int i = 0; i < this.sourcePlugins.size(); i++) {                    if (this.sourcePlugins.get(i).named().equals(updatePlugin.named())) {                        this.sourcePlugins.set(i, updatePlugin);                    }                }            }        }        // 重新排序        plugins = sortPlugins(newPluginList);    }
  1. 将实现了 PluginDataHandler 接口的数据,同步至 subscriber 的handlers 列表
    public void putExtendPluginDataHandler(final List<PluginDataHandler> handlers) {        if (CollectionUtils.isEmpty(handlers)) {            return;        }        // 遍历所有数据        for (PluginDataHandler handler : handlers) {            String pluginNamed = handler.pluginNamed();            // 更新现有的PluginDataHandler列表            MapUtils.computeIfAbsent(handlerMap, pluginNamed, name -> {                LOG.info("shenyu auto add extends plugin data handler name is :{}", pluginNamed);                return handler;            });        }    }

至此,扩展插件的加载过程分析结束。

Apollo数据同步源码分析

· One min read
Apache ShenYu Contributor

本文基于shenyu-2.6.1版本进行源码分析,官网的介绍请参考 数据同步原理

Admin管理端#

以新增插件的流程来理解下整体的流程

接收数据#

  • PluginController.createPlugin()

进入PluginController类中的createPlugin()方法,它负责数据的校验,添加或更新数据,返回结果信息。

@Validated@RequiredArgsConstructor@RestController@RequestMapping("/plugin")public class PluginController {
  @PostMapping("")  @RequiresPermissions("system:plugin:add")  public ShenyuAdminResult createPlugin(@Valid @ModelAttribute final PluginDTO pluginDTO) {      // 调用pluginService.createOrUpdate 进行处理逻辑      return ShenyuAdminResult.success(pluginService.createOrUpdate(pluginDTO));  }        // ......}

处理数据#

  • PluginServiceImpl.createOrUpdate() -> PluginServiceImpl.create()

PluginServiceImpl类中通过create()方法完成数据的转换,保存到数据库,发布事件。

@RequiredArgsConstructor@Servicepublic class PluginServiceImpl implements SelectorService {    // 事件发布对象 pluginEventPublisher    private final PluginEventPublisher pluginEventPublisher;
   private String create(final PluginDTO pluginDTO) {      // 判断有没有对应的插件      Assert.isNull(pluginMapper.nameExisted(pluginDTO.getName()), AdminConstants.PLUGIN_NAME_IS_EXIST);      // 自定义的插件jar      if (!Objects.isNull(pluginDTO.getFile())) {        Assert.isTrue(checkFile(Base64.decode(pluginDTO.getFile())), AdminConstants.THE_PLUGIN_JAR_FILE_IS_NOT_CORRECT_OR_EXCEEDS_16_MB);      }      // 创建plugin对象      PluginDO pluginDO = PluginDO.buildPluginDO(pluginDTO);      // 插入对象到数据库      if (pluginMapper.insertSelective(pluginDO) > 0) {        // 插件新增成功,则发布创建事件        // publish create event. init plugin data        pluginEventPublisher.onCreated(pluginDO);      }      return ShenyuResultMessage.CREATE_SUCCESS;  }            // ......    }

PluginServiceImpl类完成数据的持久化操作,即保存数据到数据库,并通过 pluginEventPublisher 进行发布事件。

pluginEventPublisher.onCreateed方法的逻辑是:发布变更的事件。

    @Overridepublic void onCreated(final PluginDO plugin) {        // 发布DataChangeEvent事件:事件分组(插件、选择器、规则)、事件类型(创建、删除、更新)、变更的数据        publisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, DataEventTypeEnum.CREATE,        Collections.singletonList(PluginTransfer.INSTANCE.mapToData(plugin))));        // 发布PluginCreatedEvent        publish(new PluginCreatedEvent(plugin, SessionUtil.visitorName()));}

发布变更数据通过publisher.publishEvent()完成,这个publisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。

关于ApplicationEventPublisher

当有状态发生变化时,发布者调用 ApplicationEventPublisherpublishEvent 方法发布一个事件,Spring容器广播事件给所有观察者,调用观察者的 onApplicationEvent 方法把事件对象传递给观察者。调用 publishEvent方法有两种途径,一种是实现接口由容器注入 ApplicationEventPublisher 对象然后调用其方法,另一种是直接调用容器的方法,两种方法发布事件没有太大区别。

  • ApplicationEventPublisher:发布事件;
  • ApplicationEventSpring 事件,记录事件源、时间和数据;
  • ApplicationListener:事件监听者,观察者;

Spring的事件发布机制中,有三个对象,

一个是发布事件的ApplicationEventPublisher,在ShenYu中通过构造器注入了一个eventPublisher

另一个对象是ApplicationEvent,在ShenYu中通过DataChangedEvent继承了它,表示事件对象。

public class DataChangedEvent extends ApplicationEvent {//......}

最后一个是 ApplicationListener,在ShenYu中通过DataChangedEventDispatcher类实现了该接口,作为事件的监听者,负责处理事件对象。

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
    //......    }

分发数据#

  • DataChangedEventDispatcher.onApplicationEvent()

当事件发布完成后,会自动进入到DataChangedEventDispatcher类中的onApplicationEvent()方法,进行事件处理。

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
  /**     * 有数据变更时,调用此方法     * @param event     */  @Override  @SuppressWarnings("unchecked")  public void onApplicationEvent(final DataChangedEvent event) {    // 遍历数据变更监听器(这里只会注册ApolloDataChangedListener)    for (DataChangedListener listener : listeners) {      // 依据不同的分组类型进行转发      switch (event.getGroupKey()) {        case APP_AUTH: // 认证信息          listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());          break;        case PLUGIN: // 插件事件          // 调用注册的listener对象          listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());          break;        case RULE: // 规则事件          listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());          break;        case SELECTOR: // 选择器事件          listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());          break;        case META_DATA: // 元数据事件          listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());          break;        case PROXY_SELECTOR: // 代理选择器事件          listener.onProxySelectorChanged((List<ProxySelectorData>) event.getSource(), event.getEventType());          break;        case DISCOVER_UPSTREAM: // 注册发现下游列表事件          listener.onDiscoveryUpstreamChanged((List<DiscoverySyncData>) event.getSource(), event.getEventType());          applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnUpstreamChanged((List<DiscoverySyncData>) event.getSource(), event.getEventType());          break;        default:          throw new IllegalStateException("Unexpected value: " + event.getGroupKey());      }    }  }    }

当有数据变更时,调用onApplicationEvent方法,然后遍历所有数据变更监听器,判断是哪种数据类型,交给相应的数据监听器进行处理。

ShenYu将所有数据进行了分组,一共会有以下种:认证信息、插件信息、规则信息、选择器信息、元数据、代理选择器、发现下游事件。

这里的数据变更监听器(DataChangedListener),就是数据同步策略的抽象,由特定的实现来处理,而不同的监听由不同的实现来处理,当前分析的是Apollo来 监听,所以这里只关注 ApolloDataChangedListener

// 继承AbstractNodeDataChangedListenerpublic class ApolloDataChangedListener extends AbstractNodeDataChangedListener {    }

ApolloDataChangedListener 继承了 AbstractNodeDataChangedListener 类,该类主要是以key作为存储方式的基类,如apollo、nacos等,其他的如zookeeper、 consul、etcd 等是以path的方式进行分层级来查找的。

// 以key作为查找存储方式的基类public abstract class AbstractNodeDataChangedListener implements DataChangedListener {         protected AbstractNodeDataChangedListener(final ChangeData changeData) {      this.changeData = changeData;    }}

AbstractNodeDataChangedListener 接收 ChangeData作为参数,该对象定义了存储于Apollo中的各个数据的key命名,存储于Apollo中的数据包括以下数据:

  • 插件(plugin)
  • 选择器(selector)
  • 规则(rule)
  • 授权(auth)
  • 元数据(meta)
  • 代理选择器(proxy.selector)
  • 下游列表(discovery)

这些信息由ApolloDataChangedListener构造器指定:

public class ApolloDataChangedListener extends AbstractNodeDataChangedListener {  public ApolloDataChangedListener(final ApolloClient apolloClient) {    // 配置几类分组数据的前缀    super(new ChangeData(ApolloPathConstants.PLUGIN_DATA_ID,            ApolloPathConstants.SELECTOR_DATA_ID,            ApolloPathConstants.RULE_DATA_ID,            ApolloPathConstants.AUTH_DATA_ID,            ApolloPathConstants.META_DATA_ID,            ApolloPathConstants.PROXY_SELECTOR_DATA_ID,            ApolloPathConstants.DISCOVERY_DATA_ID));    // 操作apollo的对象    this.apolloClient = apolloClient;  }}

DataChangedListener 定义了以下几个方法:

// 数据变更监听器public interface DataChangedListener {
    // 授权信息变更时调用    default void onAppAuthChanged(List<AppAuthData> changed, DataEventTypeEnum eventType) {    }
    // 插件信息变更时调用    default void onPluginChanged(List<PluginData> changed, DataEventTypeEnum eventType) {    }
    // 选择器信息变更时调用    default void onSelectorChanged(List<SelectorData> changed, DataEventTypeEnum eventType) {    }         // 元数据信息变更时调用    default void onMetaDataChanged(List<MetaData> changed, DataEventTypeEnum eventType) {
    }
    // 规则信息变更时调用    default void onRuleChanged(List<RuleData> changed, DataEventTypeEnum eventType) {    }
    // 代理选择器变更时调用    default void onProxySelectorChanged(List<ProxySelectorData> changed, DataEventTypeEnum eventType) {    }    // 发现下游信息变更时调用    default void onDiscoveryUpstreamChanged(List<DiscoverySyncData> changed, DataEventTypeEnum eventType) {    }
}

DataChangedEventDispatcher处理插件时,调用方法 listener.onPluginChanged, 接下来分析下对象的逻辑,实现由AbstractNodeDataChangedListener处理:

public abstract class AbstractNodeDataChangedListener implements DataChangedListener {  @Override  public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {    // 配置前缀为plugin.    final String configKeyPrefix = changeData.getPluginDataId() + DefaultNodeConstants.JOIN_POINT;    this.onCommonChanged(configKeyPrefix, changed, eventType, PluginData::getName, PluginData.class);    LOG.debug("[DataChangedListener] PluginChanged {}", configKeyPrefix);  }}

首先构建配置数据的key前缀为:plugin., 再调用onCommonChanged统一处理:

private <T> void onCommonChanged(final String configKeyPrefix, final List<T> changedList,                                     final DataEventTypeEnum eventType, final Function<? super T, ? extends String> mapperToKey,                                     final Class<T> tClass) {        // Avoiding concurrent operations on list nodes        final ReentrantLock reentrantLock = listSaveLockMap.computeIfAbsent(configKeyPrefix, key -> new ReentrantLock());        try {            reentrantLock.lock();            // 当前传入的插件列表            final List<String> changeNames = changedList.stream().map(mapperToKey).collect(Collectors.toList());            switch (eventType) {                // 删除操作                case DELETE:                    // 按 plugin.${pluginName} 进行删除                    changedList.stream().map(mapperToKey).forEach(removeKey -> {                        delConfig(configKeyPrefix + removeKey);                    });                    // 从plugin.list中移除对应的插件名称                    // plugin.list 记录下了目前启用的列表                    delChangedData(configKeyPrefix, changeNames);                    break;                case REFRESH:                case MYSELF:                    // 重载逻辑                    // 获取plugin.list中的所有插件列表                    final List<String> configDataNames = this.getConfigDataNames(configKeyPrefix);                    // 依次更新当前调整的每个插件                    changedList.forEach(changedData -> {                        // 发布配置                        publishConfig(configKeyPrefix + mapperToKey.apply(changedData), changedData);                    });                    // 目前存储的列表中,如果数据比当前传入的多,则删除多余的数据                    if (configDataNames != null && configDataNames.size() > changedList.size()) {                        // 踢除当前加载的数据                        configDataNames.removeAll(changeNames);                        // 逐个删除已经取消的数据                        configDataNames.forEach(this::delConfig);                    }                    // 重新更新列表数据                    publishConfig(configKeyPrefix + DefaultNodeConstants.LIST_STR, changeNames);                    break;                default:                    // 新增或是更新                    changedList.forEach(changedData -> {                        publishConfig(configKeyPrefix + mapperToKey.apply(changedData), changedData);                    });                    // 将新加的插件更新                    putChangeData(configKeyPrefix, changeNames);                    break;            }        } catch (Exception e) {            LOG.error("AbstractNodeDataChangedListener onCommonMultiChanged error ", e);        } finally {            reentrantLock.unlock();        }    }

在以上逻辑,其实包含全量重载(REFRESH、MYSELF)与增量(DELETE、UPDATE、CREATE)的处理

在插件中主要包含两个节点:

  • plugin.list 当前生效的插件列表
  • plugin.${plugin.name} 具体插件的详细信息 最后,将这两个节点对应的数据写入Apollo。

数据初始化#

admin启动后,会将当前的数据信息全量同步到apollo中,由ApolloDataChangedInit实现:

// 继承AbstractDataChangedInitpublic class ApolloDataChangedInit extends AbstractDataChangedInit {    // apollo操作对象    private final ApolloClient apolloClient;        public ApolloDataChangedInit(final ApolloClient apolloClient) {        this.apolloClient = apolloClient;    }        @Override    protected boolean notExist() {        // 判断 plugin、auth、meta、proxy.selector等节点是否存在        // 只要有一个不存在,则进入重新加载(这些节点不会创建,为什么要判断一次呢?)        return Stream.of(ApolloPathConstants.PLUGIN_DATA_ID, ApolloPathConstants.AUTH_DATA_ID, ApolloPathConstants.META_DATA_ID, ApolloPathConstants.PROXY_SELECTOR_DATA_ID).allMatch(                this::dataIdNotExist);    }
    /**     * Data id not exist boolean.     *     * @param pluginDataId the plugin data id     * @return the boolean     */    private boolean dataIdNotExist(final String pluginDataId) {        return Objects.isNull(apolloClient.getItemValue(pluginDataId));    }}

判断apollo中是否存在数据,如果不存在,则进行同步。 这里有一个bug, 因为这里判断的key,在同步时,并不会创建,则会导致每次重启时都重新加载数据,已提PR#5435

ApolloDataChangedInit实现了CommandLineRunner接口。它是springboot提供的接口,会在所有 Spring Beans初始化之后执行run()方法,常用于项目中初始化的操作。

  • SyncDataService.syncAll()

从数据库查询数据,然后进行全量数据同步,所有的认证信息、插件信息、规则信息、选择器信息、元数据、代理选择器、发现下游事件。主要是通过eventPublisher发布同步事件,eventPublisher通过publishEvent()发布完事件后,有ApplicationListener执行事件变更操作,在ShenYu中就是前面提到的DataChangedEventDispatcher

@Servicepublic class SyncDataServiceImpl implements SyncDataService {    // 事件发布    private final ApplicationEventPublisher eventPublisher;         /***     * 全量数据同步     * @param type the type     * @return     */     @Override     public boolean syncAll(final DataEventTypeEnum type) {         // 同步auth数据         appAuthService.syncData();         // 同步插件数据         List<PluginData> pluginDataList = pluginService.listAll();         // 通过spring发布/订阅机制进行通知订阅者(发布DataChangedEvent)         // 统一由DataChangedEventDispatcher进行监听         // DataChangedEvent带上了配置分组类型、当前操作类型、数据         eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));         // 同步选择器         List<SelectorData> selectorDataList = selectorService.listAll();         eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));         // 同步规则         List<RuleData> ruleDataList = ruleService.listAll();         eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));         //元数据         metaDataService.syncData();         // 下游列表         discoveryService.syncData();         return true;     }    }

bootstrap同步操作初始化#

网关这边的数据同步初始化操作主要是订阅apollo中的节点,当有数据变更时,收到变更数据。这依赖于apollolistener机制。在ShenYu中,负责apollo数据同步的是ApolloDataService

ApolloDataService的功能逻辑是在实例化的过程中完成的:对apollo中的shenyu数据同步节点完成订阅。通过configService.addChangeListener()方法实现;

public class ApolloDataService extends AbstractNodeDataSyncService implements SyncDataService {    public ApolloDataService(final Config configService, final PluginDataSubscriber pluginDataSubscriber,                             final List<MetaDataSubscriber> metaDataSubscribers,                             final List<AuthDataSubscriber> authDataSubscribers,                             final List<ProxySelectorDataSubscriber> proxySelectorDataSubscribers,                             final List<DiscoveryUpstreamDataSubscriber> discoveryUpstreamDataSubscribers) {        // 配置监听的前缀        super(new ChangeData(ApolloPathConstants.PLUGIN_DATA_ID,                        ApolloPathConstants.SELECTOR_DATA_ID,                        ApolloPathConstants.RULE_DATA_ID,                        ApolloPathConstants.AUTH_DATA_ID,                        ApolloPathConstants.META_DATA_ID,                        ApolloPathConstants.PROXY_SELECTOR_DATA_ID,                        ApolloPathConstants.DISCOVERY_DATA_ID),                pluginDataSubscriber, metaDataSubscribers, authDataSubscribers, proxySelectorDataSubscribers, discoveryUpstreamDataSubscribers);        this.configService = configService;        // 开始监听        // 注:Apollo该方法,只负责获取apollo的数据获取,并添加到本地缓存中,不处理监听        startWatch();        // 配置监听        apolloWatchPrefixes();    }}

首先配置需要处理的key信息,同admin同步的key。接着调用startWatch() 方法进行处理数据获取与监听。但对于Apollo的实现中,该方法只负责处理数据的获取并设置到本地缓存中。 监听由apolloWatchPrefixes方法来处理

private void apolloWatchPrefixes() {        // 定义监听器        final ConfigChangeListener listener = changeEvent -> {            changeEvent.changedKeys().forEach(changeKey -> {                try {                    final ConfigChange configChange = changeEvent.getChange(changeKey);                    // 未变更则跳过                    if (configChange == null) {                        LOG.error("apollo watchPrefixes error configChange is null {}", changeKey);                        return;                    }                    final String newValue = configChange.getNewValue();                    // skip last is "list"                    // 如果是list结尾的Key,如plugin.list则跳过,因为这里只是记录生效的一个列表,不会在本地缓存中                    final int lastListStrIndex = changeKey.length() - DefaultNodeConstants.LIST_STR.length();                    if (changeKey.lastIndexOf(DefaultNodeConstants.LIST_STR) == lastListStrIndex) {                        return;                    }                    // 如果是plugin.开头 => 处理插件数据                    if (changeKey.indexOf(ApolloPathConstants.PLUGIN_DATA_ID) == 0) {                        // 删除                        if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {                            // 清除缓存                            unCachePluginData(changeKey);                        } else {                            // 更新缓存                            cachePluginData(newValue);                        }                        // 如果是selector.开头 => 处理选择器数据                    } else if (changeKey.indexOf(ApolloPathConstants.SELECTOR_DATA_ID) == 0) {                        if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {                            unCacheSelectorData(changeKey);                        } else {                            cacheSelectorData(newValue);                        }                        // 如果是rule.开头 => 处理规则数据                    } else if (changeKey.indexOf(ApolloPathConstants.RULE_DATA_ID) == 0) {                        if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {                            unCacheRuleData(changeKey);                        } else {                            cacheRuleData(newValue);                        }                        // 如果是auth.开头 => 处理授权数据                    } else if (changeKey.indexOf(ApolloPathConstants.AUTH_DATA_ID) == 0) {                        if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {                            unCacheAuthData(changeKey);                        } else {                            cacheAuthData(newValue);                        }                        // 如果是meta.开头 => 处理元数据                    } else if (changeKey.indexOf(ApolloPathConstants.META_DATA_ID) == 0) {                        if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {                            unCacheMetaData(changeKey);                        } else {                            cacheMetaData(newValue);                        }                        // 如果是proxy.selector.开头 => 处理代理选择器数据                    } else if (changeKey.indexOf(ApolloPathConstants.PROXY_SELECTOR_DATA_ID) == 0) {                        if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {                            unCacheProxySelectorData(changeKey);                        } else {                            cacheProxySelectorData(newValue);                        }                        // 如果是discovery.开头 => 处理下游列表数据                    } else if (changeKey.indexOf(ApolloPathConstants.DISCOVERY_DATA_ID) == 0) {                        if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {                            unCacheDiscoveryUpstreamData(changeKey);                        } else {                            cacheDiscoveryUpstreamData(newValue);                        }                    }                } catch (Exception e) {                    LOG.error("apollo sync listener change key handler error", e);                }            });        };        watchConfigChangeListener = listener;        // 添加监听        configService.addChangeListener(listener, Collections.emptySet(), ApolloPathConstants.pathKeySet());
    }

由前面admin加载数据的逻辑,插件只会增加两个Key:plugin.listplugin.${plugin.name},而 plugin.list 是所有启用的插件列表,该key的数据在 本地缓存中没有数据,只会关注plugin.${plugin.name} key对应的数据,这是对应的插件的详细信息。

至此,bootstrap在apollo中的同步逻辑就分析完成。

Etcd数据同步源码分析

· One min read
Apache ShenYu Contributor

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu 网关当前支持ZooKeeperWebSocketHttp长轮询NacosEtcdConsul 进行数据同步。本文的主要内容是基于Etcd的数据同步源码分析。

本文基于shenyu-2.4.0版本进行源码分析,官网的介绍请参考 数据同步原理

1. 关于Etcd#

Etcd是一个分布式的键值对存储系统,它为大型分布式计算提供分布式配置服务、同步服务和命名注册。

2. Admin数据同步#

我们从一个实际案例进行源码追踪,比如在后台管理系统中,对Divide插件中的一条选择器数据进行更新,将权重更新为90:

2.1 接收数据#

  • SelectorController.createSelector()

进入SelectorController类中的updateSelector()方法,它负责数据的校验,添加或更新数据,返回结果信息。

@Validated@RequiredArgsConstructor@RestController@RequestMapping("/selector")public class SelectorController {        @PutMapping("/{id}")    public ShenyuAdminResult updateSelector(@PathVariable("id") final String id, @Valid @RequestBody final SelectorDTO selectorDTO) {        // 设置当前选择器数据id        selectorDTO.setId(id);        // 创建或更新操作        Integer updateCount = selectorService.createOrUpdate(selectorDTO);        // 返回结果信息        return ShenyuAdminResult.success(ShenyuResultMessage.UPDATE_SUCCESS, updateCount);    }        // ......}

2.2 处理数据#

  • SelectorServiceImpl.createOrUpdate()

SelectorServiceImpl类中通过createOrUpdate()方法完成数据的转换,保存到数据库,发布事件,更新upstream

@RequiredArgsConstructor@Servicepublic class SelectorServiceImpl implements SelectorService {    // 负责事件发布的eventPublisher    private final ApplicationEventPublisher eventPublisher;        @Override    @Transactional(rollbackFor = Exception.class)    public int createOrUpdate(final SelectorDTO selectorDTO) {        int selectorCount;        // 构建数据 DTO --> DO        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);        List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();        // 判断是添加还是更新        if (StringUtils.isEmpty(selectorDTO.getId())) {            // 插入选择器数据            selectorCount = selectorMapper.insertSelective(selectorDO);            // 插入选择器中的条件数据            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));            });            // check selector add            // 权限检查            if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {                DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();                dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());                dataPermissionDTO.setDataId(selectorDO.getId());                dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);                dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));            }
        } else {            // 更新数据,先删除再新增            selectorCount = selectorMapper.updateSelective(selectorDO);            //delete rule condition then add            selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);                selectorConditionMapper.insertSelective(selectorConditionDO);            });        }        // 发布事件        publishEvent(selectorDO, selectorConditionDTOs);
        // 更新upstream        updateDivideUpstream(selectorDO);        return selectorCount;    }            // ......    }

Service类完成数据的持久化操作,即保存数据到数据库,这个比较简单,就不深入追踪了。关于更新upstream操作,放到后面对应的章节中进行分析,重点关注发布事件的操作,它会执行数据同步。

publishEvent()方法的逻辑是:找到选择器对应的插件,构建条件数据,发布变更数据。

     private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {        // 找到选择器对应的插件        PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());        // 构建条件数据        List<ConditionData> conditionDataList =                selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());        // 发布变更数据        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,                Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));    }

发布变更数据通过eventPublisher.publishEvent()完成,这个eventPublisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。

关于ApplicationEventPublisher

当有状态发生变化时,发布者调用 ApplicationEventPublisherpublishEvent 方法发布一个事件,Spring容器广播事件给所有观察者,调用观察者的 onApplicationEvent 方法把事件对象传递给观察者。调用 publishEvent方法有两种途径,一种是实现接口由容器注入 ApplicationEventPublisher 对象然后调用其方法,另一种是直接调用容器的方法,两种方法发布事件没有太大区别。

  • ApplicationEventPublisher:发布事件;
  • ApplicationEventSpring 事件,记录事件源、时间和数据;
  • ApplicationListener:事件监听者,观察者;

Spring的事件发布机制中,有三个对象,

一个是发布事件的ApplicationEventPublisher,在ShenYu中通过构造器注入了一个eventPublisher

另一个对象是ApplicationEvent,在ShenYu中通过DataChangedEvent继承了它,表示事件对象。

public class DataChangedEvent extends ApplicationEvent {//......}

最后一个是 ApplicationListener,在ShenYu中通过DataChangedEventDispatcher类实现了该接口,作为事件的监听者,负责处理事件对象。

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
    //......    }

2.3 分发数据#

  • DataChangedEventDispatcher.onApplicationEvent()

当事件发布完成后,会自动进入到DataChangedEventDispatcher类中的onApplicationEvent()方法,进行事件处理。

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
  /**     * 有数据变更时,调用此方法     * @param event     */    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // 遍历数据变更监听器(一般使用一种数据同步的方式就好了)        for (DataChangedListener listener : listeners) {            // 哪种数据发生变更            switch (event.getGroupKey()) {                case APP_AUTH: // 认证信息                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());                    break;                case PLUGIN:  // 插件信息                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());                    break;                case RULE:    // 规则信息                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());                    break;                case SELECTOR:   // 选择器信息                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());                    break;                case META_DATA:  // 元数据                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());                    break;                default:  // 其他类型,抛出异常                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());            }        }    }    }

当有数据变更时,调用onApplicationEvent方法,然后遍历所有数据变更监听器,判断是哪种数据类型,交给相应的数据监听器进行处理。

ShenYu将所有数据进行了分组,一共是五种:认证信息、插件信息、规则信息、选择器信息和元数据。

这里的数据变更监听器(DataChangedListener),就是数据同步策略的抽象,它的具体实现有:

这几个实现类就是当前ShenYu支持的同步策略:

  • WebsocketDataChangedListener:基于websocket的数据同步;
  • ZookeeperDataChangedListener:基于zookeeper的数据同步;
  • ConsulDataChangedListener:基于consul的数据同步;
  • EtcdDataDataChangedListener:基于etcd的数据同步;
  • HttpLongPollingDataChangedListener:基于http长轮询的数据同步;
  • NacosDataChangedListener:基于nacos的数据同步;

既然有这么多种实现策略,那么如何确定使用哪一种呢?

因为本文是基于Etcd的数据同步源码分析,所以这里以EtcdDataDataChangedListener为例,分析它是如何被加载并实现的。

通过查看对EtcdDataDataChangedListener类的调用,可以发现,它是在DataSyncConfiguration类进行配置的。

/** * 数据同步配置类 * 通过springboot条件装配实现 * The type Data sync configuration. */@Configurationpublic class DataSyncConfiguration {
   //省略了其他代码......    /**   * The type Etcd listener.   */  @Configuration  @ConditionalOnProperty(prefix = "shenyu.sync.etcd", name = "url")  @EnableConfigurationProperties(EtcdProperties.class)  static class EtcdListener {
    @Bean    public EtcdClient etcdClient(final EtcdProperties etcdProperties) {      Client client = Client.builder()              .endpoints(etcdProperties.getUrl())              .build();      return new EtcdClient(client);    }
    /**     * Config event listener data changed listener.     * 创建Etcd数据变更监听器     * @param etcdClient the etcd client     * @return the data changed listener     */    @Bean    @ConditionalOnMissingBean(EtcdDataDataChangedListener.class)    public DataChangedListener etcdDataChangedListener(final EtcdClient etcdClient) {      return new EtcdDataDataChangedListener(etcdClient);    }
    /**     * data init.     * 创建Etcd数据初始化类     * @param etcdClient        the etcd client     * @param syncDataService the sync data service     * @return the etcd data init     */    @Bean    @ConditionalOnMissingBean(EtcdDataInit.class)    public EtcdDataInit etcdDataInit(final EtcdClient etcdClient, final SyncDataService syncDataService) {      return new EtcdDataInit(etcdClient, syncDataService);    }  }        //省略了其他代码......}

这个配置类是通过SpringBoot条件装配类实现的。在EtcdListener类上面有几个注解:

  • @Configuration:配置文件,应用上下文;

  • @ConditionalOnProperty(prefix = "shenyu.sync.etcd", name = "url"):属性条件判断,满足条件,该配置类才会生效。也就是说,当我们有如下配置时,就会采用etcd进行数据同步。

    shenyu:    sync:     etcd:          url: localhost:2181
  • @EnableConfigurationProperties(EtcdProperties.class):导入另一个属性类EtcdPropertiesEtcdProperties中各属性对应配置文件中以shenyu.sync.etcd作为前缀的各属性。

@Data@ConfigurationProperties(prefix = "shenyu.sync.etcd")public class EtcdProperties {
  private String url;
  private Integer sessionTimeout;
  private Integer connectionTimeout;
  private String serializer;}

当我们在配置文件中配置了shenyu.sync.etcd.url属性时,Admin将采用etcd进行数据同步,此时配置类EtcdListener会生效,并生成EtcdClient, EtcdDataDataChangedListenerEtcdDataInit类型的bean。

  • 生成EtcdClient类型的bean,etcdClient,这个bean根据配置文件,配置了与etcd服务器的连接信息,可以直接操作etcd节点。
  • 生成EtcdDataDataChangedListener类型的bean,etcdDataDataChangedListener,这个bean将beanetcdClient作为成员变量,当监听到事件时,进行回调操作,可以直接使用该bean操作etcd节点。
  • 生成EtcdDataInit类型的bean,etcdDataInit,这个bean将beanetcdClient和beansyncDataService作为成员变量,使用etcdClient根据etcd路径,判断数据是否未初始化,当未初始化时,将调用syncDataService进行刷新操作,将在下文详述。 根据上文所述,在事件处理方法onApplicationEvent()中,就会到相应的listener中。在我们的案例中,是对一条选择器数据进行更新,数据同步采用的是etcd,所以,代码会进入到EtcdDataDataChangedListener进行选择器数据变更处理。
    //DataChangedEventDispatcher.java        @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // 遍历数据变更监听器(一般使用一种数据同步的方式就好了)        for (DataChangedListener listener : listeners) {            // 哪种数据发生变更            switch (event.getGroupKey()) {                                    // 省略了其他逻辑                                    case SELECTOR:   // 选择器信息                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());   // 在我们的案例中,会进入到EtcdDataDataChangedListener进行选择器数据变更处理                    break;         }    }

2.4 Etcd数据变更监听器#

  • EtcdDataDataChangedListener.onSelectorChanged()

    onSelectorChanged()方法中,判断操作类型,是刷新同步还是更新或创建同步。根据当前选择器数据信息判断节点是否在etcd中。

/** * EtcdDataDataChangedListener. */@Slf4jpublic class EtcdDataDataChangedListener implements DataChangedListener {
    private final EtcdClient etcdClient;
    public EtcdDataDataChangedListener(final EtcdClient client) {        this.etcdClient = client;    }
    // 选择器信息发生改变    @Override    public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {      // 刷新操作      if (eventType == DataEventTypeEnum.REFRESH && !changed.isEmpty()) {        String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(changed.get(0).getPluginName());        etcdClient.deleteEtcdPathRecursive(selectorParentPath);      }      // 发生变更的数据      for (SelectorData data : changed) {        // 构建选择器数据的真实路径        String selectorRealPath = DefaultPathConstants.buildSelectorRealPath(data.getPluginName(), data.getId());        //删除操作        if (eventType == DataEventTypeEnum.DELETE) {          etcdClient.delete(selectorRealPath);          continue;        }        //create or update,创建或更新操作        updateNode(selectorRealPath, data);      }    }    }

这部分是核心。changed表示需更新的SelectorData列表,eventType表示事件类型。当事件类型为刷新REFRESH,并且SelectorData有改动时,会先将etcd中该plugin下的selector节点都先删除。注意这里的条件SelectorData有改动是必须的,否则会出现没有改动时进行刷新,将所有selector节点都删除的bug。 获取到selector对应路径后,会对节点进行删除、创建或更新。

只要将变动的数据正确写入到etcd的节点上,admin这边的操作就执行完成了。

在我们当前的案例中,对Divide插件中的一条选择器数据进行更新,将权重更新为90,就会对图中的特定节点更新。

我们用时序图将上面的更新流程串联起来。

3. 网关数据同步#

假设ShenYu网关已经在正常运行,使用的数据同步方式也是etcd。那么当在admin端更新选择器数据后,并且向etcd发送了变更的数据,那网关是如何接收并处理数据的呢?接下来我们就继续进行源码分析,一探究竟。

3.1 EtcdClient接收数据#

  • EtcdClient.watchDataChange()

在网关端有一个EtcdSyncDataService类,它通过etcdClient订阅了数据节点,当数据发生变更时,可以感知到。

/** * Data synchronize of etcd. */@Slf4jpublic class EtcdSyncDataService implements SyncDataService, AutoCloseable {    //省略其它代码    private void subscribeSelectorDataChanges(final String path) {      etcdClient.watchDataChange(path, (updateNode, updateValue) -> cacheSelectorData(updateValue),              this::unCacheSelectorData);    }  //省略其它代码}

EtcdWatch机制,会给订阅的客户端发送节点变更通知。在我们的案例中,更新选择器信息,就会进入到watchDataChange()方法。通过cacheSelectorData()去处理数据。

3.2 处理数据#

  • EtcdSyncDataService.cacheSelectorData()

经过判空逻辑之后,缓存选择器数据的操作又交给了PluginDataSubscriber处理。

    private void cacheSelectorData(final String dataString) {        final SelectorData selectorData = GsonUtils.getInstance().fromJson(dataString, SelectorData.class);        Optional.ofNullable(selectorData)            .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSelectorSubscribe(data)));        }

PluginDataSubscriber是一个接口,它只有一个CommonPluginDataSubscriber实现类,负责处理插件、选择器和规则数据。

3.3 通用插件数据订阅者#

  • PluginDataSubscriber.onSelectorSubscribe()

它没有其他逻辑,直接调用subscribeDataHandler()方法。在方法中,更具数据类型(插件、选择器或规则),操作类型(更新或删除),去执行不同逻辑。

/** * 通用插件数据订阅者,负责处理所有插件、选择器和规则信息 * The type Common plugin data subscriber. */public class CommonPluginDataSubscriber implements PluginDataSubscriber {    //......     // 处理选择器数据    @Override    public void onSelectorSubscribe(final SelectorData selectorData) {        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);    }            // 订阅数据处理器,处理数据的更新或删除    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {        Optional.ofNullable(classData).ifPresent(data -> {            // 插件数据            if (data instanceof PluginData) {                PluginData pluginData = (PluginData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cachePluginData(pluginData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removePluginData(pluginData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));                }            } else if (data instanceof SelectorData) {  // 选择器数据                SelectorData selectorData = (SelectorData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cacheSelectData(selectorData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removeSelectData(selectorData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));                }            } else if (data instanceof RuleData) {  // 规则数据                RuleData ruleData = (RuleData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cacheRuleData(ruleData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));                } else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removeRuleData(ruleData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));                }            }        });    }    }

3.4 数据缓存到内存#

那么更新一条选择器数据,会进入下面的逻辑:

// 将数据保存到网关内存BaseDataCache.getInstance().cacheSelectData(selectorData);// 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));

一是将数据保存到网关的内存中。BaseDataCache是最终缓存数据的类,通过单例模式实现。选择器数据就存到了SELECTOR_MAP这个Map中。在后续使用的时候,也是从这里拿数据。

public final class BaseDataCache {    // 私有变量    private static final BaseDataCache INSTANCE = new BaseDataCache();    // 私有构造器    private BaseDataCache() {    }        /**     * Gets instance.     *  公开方法     * @return the instance     */    public static BaseDataCache getInstance() {        return INSTANCE;    }        /**    *  缓存选择器数据的Map     * pluginName -> SelectorData.     */    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();        public void cacheSelectData(final SelectorData selectorData) {        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);    }           /**     * cache selector data.     * 缓存选择器数据     * @param data the selector data     */    private void selectorAccept(final SelectorData data) {        String key = data.getPluginName();        if (SELECTOR_MAP.containsKey(key)) { // 更新操作,先删除再插入            List<SelectorData> existList = SELECTOR_MAP.get(key);            final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());            resultList.add(data);            final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());            SELECTOR_MAP.put(key, collect);        } else {  // 新增操作,直接放到Map中            SELECTOR_MAP.put(key, Lists.newArrayList(data));        }    }    }

二是如果每个插件还有自己的处理逻辑,那么就去处理。 通过idea编辑器可以看到,当新增一条选择器后,有如下的插件还有处理。这里我们就不再展开了。

经过以上的源码追踪,并通过一个实际的案例,在admin端新增更新一条选择器数据,就将etcd数据同步的流程分析清楚了。

我们还是通过时序图将网关端的数据同步流程串联一下:

数据同步的流程已经分析完了,为了不让同步流程被打断,在分析过程中就忽略了其他逻辑。我们还需要分析Admin同步数据初始化和网关同步操作初始化的流程。

4. Admin同步数据初始化#

admin启动后,会将当前的数据信息全量同步到etcd中,实现逻辑如下:


/** * EtcdDataInit. */@Slf4jpublic class EtcdDataInit implements CommandLineRunner {
  private final EtcdClient etcdClient;
  private final SyncDataService syncDataService;
  public EtcdDataInit(final EtcdClient client, final SyncDataService syncDataService) {    this.etcdClient = client;    this.syncDataService = syncDataService;  }
  @Override  public void run(final String... args) throws Exception {    final String pluginPath = DefaultPathConstants.PLUGIN_PARENT;    final String authPath = DefaultPathConstants.APP_AUTH_PARENT;    final String metaDataPath = DefaultPathConstants.META_DATA;    if (!etcdClient.exists(pluginPath) && !etcdClient.exists(authPath) && !etcdClient.exists(metaDataPath)) {      log.info("Init all data from database");      syncDataService.syncAll(DataEventTypeEnum.REFRESH);    }  }}

判断etcd中是否存在数据,如果不存在,则进行同步。

EtcdDataInit实现了CommandLineRunner接口。它是springboot提供的接口,会在所有 Spring Beans初始化之后执行run()方法,常用于项目中初始化的操作。

  • SyncDataService.syncAll()

从数据库查询数据,然后进行全量数据同步,所有的认证信息、插件信息、选择器信息、规则信息和元数据信息。主要是通过eventPublisher发布同步事件。这里就跟前面提到的同步逻辑就又联系起来了,eventPublisher通过publishEvent()发布完事件后,有ApplicationListener执行事件变更操作,在ShenYu中就是前面提到的DataChangedEventDispatcher

@Servicepublic class SyncDataServiceImpl implements SyncDataService {    // 事件发布    private final ApplicationEventPublisher eventPublisher;         /***     * 全量数据同步     * @param type the type     * @return     */    @Override    public boolean syncAll(final DataEventTypeEnum type) {        // 同步认证信息        appAuthService.syncData();        // 同步插件信息        List<PluginData> pluginDataList = pluginService.listAll();        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));        // 同步选择器信息        List<SelectorData> selectorDataList = selectorService.listAll();        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));        // 同步规则信息        List<RuleData> ruleDataList = ruleService.listAll();        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));        // 同步元数据信息        metaDataService.syncData();        return true;    }    }

5. 网关同步操作初始化#

网关这边的数据同步初始化操作主要是订阅etcd中的节点,当有数据变更时,收到变更数据。这依赖于EtcdWatch机制。在ShenYu中,负责etcd数据同步的是EtcdSyncDataService,也在前面提到过。

EtcdSyncDataService的功能逻辑是在实例化的过程中完成的:对etcd中的shenyu数据同步节点完成订阅。这里的订阅分两类,一类是已经存在的节点上面数据发生更新,这通过etcdClient.watchDataChange()方法实现;另一类是当前节点下有新增或删除节点,即子节点发生变化,这通过etcdClient.watchChildChange()方法实现。

EtcdSyncDataService的代码有点多,这里我们以插件数据的读取和订阅进行追踪,其他类型的数据操作原理是一样的。

/** * etcd 数据同步服务 */@Slf4jpublic class EtcdSyncDataService implements SyncDataService, AutoCloseable {  // 在实例化的时候,完成从etcd中读取数据的操作,并订阅节点  public EtcdSyncDataService(/*省略构造参数*/) {    this.etcdClient = etcdClient;    this.pluginDataSubscriber = pluginDataSubscriber;    this.metaDataSubscribers = metaDataSubscribers;    this.authDataSubscribers = authDataSubscribers;    // 订阅插件、选择器和规则数据    watcherData();    // 订阅认证数据    watchAppAuth();    // 订阅元数据    watchMetaData();  }
  private void watcherData() {    // 插件节点路径    final String pluginParent = DefaultPathConstants.PLUGIN_PARENT;    // 所有插件节点    List<String> pluginZKs = etcdClientGetChildren(pluginParent);    for (String pluginName : pluginZKs) {      // 订阅当前所有插件、选择器和规则数据      watcherAll(pluginName);    }    // 订阅子节点(新增或删除一个插件)    etcdClient.watchChildChange(pluginParent, (updateNode, updateValue) -> {      if (!updateNode.isEmpty()) {        // 需要订阅子节点的所有插件、选择器和规则数据        watcherAll(updateNode);      }    }, null);  }
  private void watcherAll(final String pluginName) {    // 订阅插件数据    watcherPlugin(pluginName);    // 订阅选择器数据    watcherSelector(pluginName);    // 订阅规则数据    watcherRule(pluginName);  }
  private void watcherPlugin(final String pluginName) {    // 当前插件路径    String pluginPath = DefaultPathConstants.buildPluginPath(pluginName);    // 缓存到网关内存中    cachePluginData(etcdClient.get(pluginPath));    // 订阅插件节点    subscribePluginDataChanges(pluginPath, pluginName);  }    private void cachePluginData(final String dataString) {    final PluginData pluginData = GsonUtils.getInstance().fromJson(dataString, PluginData.class);    Optional.ofNullable(pluginData)      .flatMap(data -> Optional.ofNullable(pluginDataSubscriber)).ifPresent(e -> e.onSubscribe(pluginData));  }  
  private void subscribePluginDataChanges(final String pluginPath, final String pluginName) {    // 订阅数据变更:更新或删除,两个lambda表达式分别为更新和删除操作    etcdClient.watchDataChange(pluginPath, (updatePath, updateValue) -> {      final String dataPath = buildRealPath(pluginPath, updatePath);      final String dataStr = etcdClient.get(dataPath);      final PluginData data = GsonUtils.getInstance().fromJson(dataStr, PluginData.class);      Optional.ofNullable(data)              .ifPresent(d -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSubscribe(d)));    }, deleteNode -> deletePlugin(pluginName));  }
}

上面的源代码中都给出了注释,相信大家可以看明白。订阅插件数据的主要逻辑如下:

  1. 构造当前插件路径
  2. 读取etcd上当前节点数据,并反序列化
  3. 插件数据缓存到网关内存中
  4. 订阅插件节点

6. 总结#

本文通过一个实际案例,对etcd的数据同步原理进行了源码分析。涉及到的主要知识点如下:

  • 基于etcd的数据同步,主要是通过watch机制实现;
  • 通过Spring完成事件发布和监听;
  • 通过抽象DataChangedListener接口,支持多种同步策略,面向接口编程;
  • 使用单例设计模式实现缓存数据类BaseDataCache
  • 通过SpringBoot的条件装配和starter加载机制实现配置类的加载。