Skip to main content

集成测试剖析

· One min read
Kunshuai Zhu
Apache ShenYu Committer

这篇文章将会对Apache ShenYu的集成测试进行深入剖析。

什么是集成测试?#

集成测试在一些项目里也叫E2E (End To End)测试,主要用于测试各个模块组装成一个系统后是否能符合预期。

Apache ShenYu将集成测试放在了持续集成中,利用GitHub Action,在每次向主分支提交Pull Request或是Merge时触发。这样可以大大降低项目的维护成本,提升Apache ShenYu的稳定性。

自动化的集成测试如何实现?#

Apache ShenYu中,集成测试的主要步骤体现在GitHub Action工作流的脚本中,如下所示,该脚本位于 ~/.github/workflows目录下。

name: iton:  pull_request:  push:    branches:      - masterjobs:  build:    strategy:      matrix:        case:          - shenyu-integrated-test-alibaba-dubbo          ...    runs-on: ubuntu-latest    steps:      - uses: actions/checkout@v2        with:          submodules: true      ...

下面我将从这个yaml文件出发,带你剖析整个自动化集成测试的流程。

工作流的触发#

由于我们在 on 中指定了 pull_requestpush.branch: master,那么当我们提交pull_request或是merge分支到master(push)的时候,就会触发这个工作流。

关于更多GitHub Action的用法,可以参考 GitHub Action 的文档,这里不会做详细的介绍。

初始化环境#

  • 拉取代码
- uses: actions/checkout@v2  with:        submodules: true
  • 设置跳过标志
- name: Set Skip Env Var      uses: ./.github/actions/skip-ci

当发生的是一些对功能无关的改动(如改动文档)时,会跳过集成测试,以节约资源。

  • 缓存maven依赖、安装Java
- name: Cache Maven Repos...- uses: actions/setup-java@v1

构建整个项目,同时构建docker镜像#

./mvnw -B clean install -Prelease,docker -Dmaven.javadoc.skip=true -Dmaven.test.skip=true

上面这行命令中,-P后面跟着release,docker,表示会激活pom文件中相关的profile配置。

而release和docker这两个profile,目前只在 shenyu-dist 下的几个子模块中存在。下面将以 shenyu-dist-admin 模块为例,介绍profile为release和docker的配置的具体内容。另外,集成测试只使用了这一步构建的 shenyu-admin 镜像。

  • 首先是release

    <profile>    <id>release</id>    <activation>        <activeByDefault>false</activeByDefault>    </activation>    <build>        <finalName>apache-shenyu-incubating-${project.version}</finalName>        <plugins>            <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-assembly-plugin</artifactId>                <executions>                    <execution>                        <id>admin-bin</id>                        <phase>package</phase>                        <goals>                            <goal>single</goal>                        </goals>                    </execution>                </executions>                <configuration>                    <descriptors>                        <descriptor>${project.basedir}/src/main/assembly/binary.xml</descriptor>                    </descriptors>                    <tarLongFileMode>posix</tarLongFileMode>                </configuration>            </plugin>        </plugins>    </build></profile>

    当-P后面跟着release时,就会激活上面的 maven-assembly-plugin 插件。而executions中将插件的执行时机绑定在了maven生命周期package中,这也就意味着,当我们执行 mvn install 的时候就会触发。

    configuration中指定了我们编写好的 binary.xmlmaven-assembly-plugin 插件将会按照这个文件,将需要的文件复制进来,并打包。你可以点击链接查看该文件:shenyu-dist/shenyu-admin-dist/src/main/assembly/binary.xml

    根据这个文件,插件会将其他模块下打包好的jar包、配置文件、启动脚本等“复制”过来,最终打成 tar.gz 格式的压缩包。

  • 然后是docker

    <profile>    <id>docker</id>    <activation>        <activeByDefault>false</activeByDefault>    </activation>    <build>        <plugins>            <plugin>                <groupId>com.spotify</groupId>                <artifactId>dockerfile-maven-plugin</artifactId>                <version>${dockerfile-maven-plugin.version}</version>                <executions>                    <execution>                        <id>tag-latest</id>                        <goals>                            <goal>build</goal>                        </goals>                        <configuration>                            <tag>latest</tag>                        </configuration>                    </execution>                    <execution>                        <id>tag-version</id>                        <goals>                            <goal>build</goal>                        </goals>                        <configuration>                            <tag>${project.version}</tag>                        </configuration>                    </execution>                </executions>                <configuration>                    <repository>apache/shenyu-admin</repository>                    <buildArgs>                        <APP_NAME>apache-shenyu-incubating-${project.version}-admin-bin</APP_NAME>                    </buildArgs>                </configuration>            </plugin>        </plugins>    </build></profile>

    类比上面的release,这里是激活 dockerfile-maven-plugin 插件。当 mvn install -Pdocker 时,插件就会利用我们编写好的dockerfile构建docker镜像。

需要注意的是,dockerfile-maven-plugin目前对aarch64架构的设备支持有限,在aarch64架构的机器上运行该插件时会出现如下错误。且在本人写这篇文章的时候已经很久没有维护,这意味着aarch64架构的设备使用这个插件的问题在短期内不会解决。

[ERROR] Failed to execute goal com.spotify:dockerfile-maven-plugin:1.4.6:build (tag-latest) on project shenyu-admin-dist: Could not build image: java.util.concurrent.ExecutionException: com.spotify.docker.client.shaded.javax.ws.rs.ProcessingException: java.lang.UnsatisfiedLinkError: could not load FFI provider jnr.ffi.provider.jffi.Provider: ExceptionInInitializerError: Can't overwrite cause with java.lang.UnsatisfiedLinkError: java.lang.UnsatisfiedLinkError: /private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib: dlopen(/private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib, 1): no suitable image found.  Did find:[ERROR]         /private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib: no matching architecture in universal wrapper[ERROR]         /private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib: no matching architecture in universal wrapper...

这里有个临时的解决方案:

  1. 打开一个新的shell,输入如下命令,利用 socat 将 unix socket 路由到 tcp 端口

    socat TCP-LISTEN:2375,range=127.0.0.1/32,reuseaddr,fork UNIX-CLIENT:/var/run/docker.sock
  2. 设置环境变量

    export DOCKER_HOST=tcp://127.0.0.1:2375

构建examples模块#

- name: Build examples  if: env.SKIP_CI != 'true'  run: ./mvnw -B clean install -Pexample -Dmaven.javadoc.skip=true -Dmaven.test.skip=true -f ./shenyu-examples/pom.xml

因为考虑到release的需要,目前项目根目录下的pom文件中不饱含example子模块,所以上面这个步骤另外构建了examples模块。

与上面类似,这行命令也会利用maven的插件构建镜像,以供我们后续docker编排使用。

构建定制化网关#

- name: Build integrated tests  if: env.SKIP_CI != 'true'  run: ./mvnw -B clean install -Pit -DskipTests -f ./shenyu-integrated-test/pom.xml

为了细分Apache ShenYu的不同功能的集成测试,我们在这一步将构建集成测试模块定制的网关。所谓的“定制”就是在pom文件中引入需要的最少依赖,然后代替默认的 shenyu-bootstrap。与上面两个步骤类似,这一步也会构建出docker镜像。

值得注意的是,这里的打包构建的方式与 shenyu-dist 模块的有一些不同,你可以通过对比pom文件发现。

运行docker compose#

- name: Start docker compose  if: env.SKIP_CI != 'true'  run: docker-compose -f ./shenyu-integrated-test/${{ matrix.case }}/docker-compose.yml up -d

这一步将会根据集成测试模块下编写好的不同的 docker-compose.yml 文件,进行docker编排。

version: "3.9"services:  shenyu-zk:    container_name: shenyu-zk    image: zookeeper:3.5    ...  shenyu-redis:    image: redis:6.0-alpine    container_name: shenyu-redis    ...
  shenyu-examples-http:    deploy:      resources:        limits:          memory: 2048M    container_name: shenyu-examples-http    image: shenyu-examples-http:latest    ...
  shenyu-admin:    image: apache/shenyu-admin:latest    container_name: shenyu-admin    ...
  shenyu-integrated-test-http:    container_name: shenyu-integrated-test-http    image: apache/shenyu-integrated-test-http:latest    ...    depends_on:      shenyu-admin:        condition: service_healthy    healthcheck:      test: [ "CMD", "wget", "http://shenyu-integrated-test-http:9195/actuator/health" ]      timeout: 2s      retries: 30
networks:  shenyu:    name: shenyu

例如 shenyu-integrated-test-http 模块下的 docker-compose.yml,按顺序启动了zookeeper、redis、example、admin、网关等服务。其中,example、admin、网关的镜像是我们之前构建的。

其中,docker-compose利用 depends_on 确定了服务之间的拓扑关系,并且大部分服务都有相应的健康检查,待健康检查通过后才会启动下一个服务。

运行健康检查,等待docker-compose启动完毕#

- name: Wait for docker compose start up completely  if: env.SKIP_CI != 'true'  run: bash ./shenyu-integrated-test/${{ matrix.case }}/script/healthcheck.sh

在这一步,宿主机会运行 healthcheck.sh 这个脚本,然后利用 curl 命令访问各个服务列表(在services.list文件中)的健康状态接口 /actuator/health,一直到服务状态都为正常才会继续。

运行测试#

- name: Run test  id: test  if: env.SKIP_CI != 'true'  run: ./mvnw test -Pit -f ./shenyu-integrated-test/${{ matrix.case }}/pom.xml  continue-on-error: true

这一步就是利用maven test命令,逐个执行 /src/test/ 目录下的测试类。

查看Docker Compose日志#

- name: Check test result  if: env.SKIP_CI != 'true'  run: |    docker-compose -f ./shenyu-integrated-test/${{ matrix.case }}/docker-compose.yml logs --tail="all"    if [[ ${{steps.test.outcome}} == "failure" ]]; then      echo "Test Failed"      exit 1    else      echo "Test Successful"      exit 0    fi

当工作流出现错误时,docker compose的日志可以帮助我们更好的排查问题,所以在这一步我们将docker compose的日志打印出来。

ZooKeeper数据同步源码分析

· One min read
Apache ShenYu Committer

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu 网关当前支持ZooKeeperWebSocketHttp长轮询NacosEtcdConsul 进行数据同步。本文的主要内容是基于ZooKeeper的数据同步源码分析。

本文基于shenyu-2.4.0版本进行源码分析,官网的介绍请参考 数据同步原理

1. 关于ZooKeeper#

Apache ZooKeeperApache软件基金会的一个软件项目,它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。ZooKeeper节点将它们的数据存储于一个分层的名字空间,非常类似于一个文件系统或一个前缀树结构。客户端可以在节点读写,从而以这种方式拥有一个共享的配置服务。

2. Admin数据同步#

我们从一个实际案例进行源码追踪,比如在后台管理系统中,对Divide插件中的一条选择器数据进行更新,将权重更新为90:

2.1 接收数据#

  • SelectorController.createSelector()

进入SelectorController类中的updateSelector()方法,它负责数据的校验,添加或更新数据,返回结果信息。

@Validated@RequiredArgsConstructor@RestController@RequestMapping("/selector")public class SelectorController {        @PutMapping("/{id}")    public ShenyuAdminResult updateSelector(@PathVariable("id") final String id, @Valid @RequestBody final SelectorDTO selectorDTO) {        // 设置当前选择器数据id        selectorDTO.setId(id);        // 创建或更新操作        Integer updateCount = selectorService.createOrUpdate(selectorDTO);        // 返回结果信息        return ShenyuAdminResult.success(ShenyuResultMessage.UPDATE_SUCCESS, updateCount);    }        // ......}

2.2 处理数据#

  • SelectorServiceImpl.createOrUpdate()

SelectorServiceImpl类中通过createOrUpdate()方法完成数据的转换,保存到数据库,发布事件,更新upstream

@RequiredArgsConstructor@Servicepublic class SelectorServiceImpl implements SelectorService {    // 负责事件发布的eventPublisher    private final ApplicationEventPublisher eventPublisher;        @Override    @Transactional(rollbackFor = Exception.class)    public int createOrUpdate(final SelectorDTO selectorDTO) {        int selectorCount;        // 构建数据 DTO --> DO        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);        List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();        // 判断是添加还是更新        if (StringUtils.isEmpty(selectorDTO.getId())) {            // 插入选择器数据            selectorCount = selectorMapper.insertSelective(selectorDO);            // 插入选择器中的条件数据            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));            });            // check selector add            // 权限检查            if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {                DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();                dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());                dataPermissionDTO.setDataId(selectorDO.getId());                dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);                dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));            }
        } else {            // 更新数据,先删除再新增            selectorCount = selectorMapper.updateSelective(selectorDO);            //delete rule condition then add            selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);                selectorConditionMapper.insertSelective(selectorConditionDO);            });        }        // 发布事件        publishEvent(selectorDO, selectorConditionDTOs);
        // 更新upstream        updateDivideUpstream(selectorDO);        return selectorCount;    }            // ......    }

Serrvice类完成数据的持久化操作,即保存数据到数据库,这个比较简单,就不深入追踪了。关于更新upstream操作,放到后面对应的章节中进行分析,重点关注发布事件的操作,它会执行数据同步。

publishEvent()方法的逻辑是:找到选择器对应的插件,构建条件数据,发布变更数据。

       private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {        // 找到选择器对应的插件        PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());        // 构建条件数据        List<ConditionData> conditionDataList =                selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());        // 发布变更数据        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,                Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));    }

发布变更数据通过eventPublisher.publishEvent()完成,这个eventPublisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。

关于ApplicationEventPublisher

当有状态发生变化时,发布者调用 ApplicationEventPublisherpublishEvent 方法发布一个事件,Spring容器广播事件给所有观察者,调用观察者的 onApplicationEvent 方法把事件对象传递给观察者。调用 publishEvent方法有两种途径,一种是实现接口由容器注入 ApplicationEventPublisher 对象然后调用其方法,另一种是直接调用容器的方法,两种方法发布事件没有太大区别。

  • ApplicationEventPublisher:发布事件;
  • ApplicationEventSpring 事件,记录事件源、时间和数据;
  • ApplicationListener:事件监听者,观察者;

Spring的事件发布机制中,有三个对象,

一个是发布事件的ApplicationEventPublisher,在ShenYu中通过构造器注入了一个eventPublisher

另一个对象是ApplicationEvent,在ShenYu中通过DataChangedEvent继承了它,表示事件对象。

public class DataChangedEvent extends ApplicationEvent {//......}

最后一个是 ApplicationListener,在ShenYu中通过DataChangedEventDispatcher类实现了该接口,作为事件的监听者,负责处理事件对象。

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
    //......    }

2.3 分发数据#

  • DataChangedEventDispatcher.onApplicationEvent()

当事件发布完成后,会自动进入到DataChangedEventDispatcher类中的onApplicationEvent()方法,进行事件处理。

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
  /**     * 有数据变更时,调用此方法     * @param event     */    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // 遍历数据变更监听器(一般使用一种数据同步的方式就好了)        for (DataChangedListener listener : listeners) {            // 哪种数据发生变更            switch (event.getGroupKey()) {                case APP_AUTH: // 认证信息                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());                    break;                case PLUGIN:  // 插件信息                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());                    break;                case RULE:    // 规则信息                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());                    break;                case SELECTOR:   // 选择器信息                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());                    break;                case META_DATA:  // 元数据                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());                    break;                default:  // 其他类型,抛出异常                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());            }        }    }    }

当有数据变更时,调用onApplicationEvent方法,然后遍历所有数据变更监听器,判断是哪种数据类型,交给相应的数据监听器进行处理。

ShenYu将所有数据进行了分组,一共是五种:认证信息、插件信息、规则信息、选择器信息和元数据。

这里的数据变更监听器(DataChangedListener),就是数据同步策略的抽象,它的具体实现有:

这几个实现类就是当前ShenYu支持的同步策略:

  • WebsocketDataChangedListener:基于websocket的数据同步;
  • ZookeeperDataChangedListener:基于zookeeper的数据同步;
  • ConsulDataChangedListener:基于consul的数据同步;
  • EtcdDataDataChangedListener:基于etcd的数据同步;
  • HttpLongPollingDataChangedListener:基于http长轮询的数据同步;
  • NacosDataChangedListener:基于nacos的数据同步;

既然有这么多种实现策略,那么如何确定使用哪一种呢?

因为本文是基于Zookeeper的数据同步源码分析,所以这里以ZookeeperDataChangedListener为例,分析它是如何被加载并实现的。

通过在源码工程中进行全局搜索,可以看到,它的实现是在DataSyncConfiguration类完成的。

/** * 数据同步配置类 * 通过springboot条件装配实现 * The type Data sync configuration. */@Configurationpublic class DataSyncConfiguration {            /**     * zookeeper数据同步     * The type Zookeeper listener.     */    @Configuration    @ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url")  // 条件属性,满足才会被加载    @Import(ZookeeperConfiguration.class)    static class ZookeeperListener {
        /**         * Config event listener data changed listener.         * 创建Zookeeper数据变更监听器         * @param zkClient the zk client         * @return the data changed listener         */        @Bean        @ConditionalOnMissingBean(ZookeeperDataChangedListener.class)        public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {            return new ZookeeperDataChangedListener(zkClient);        }
        /**         * Zookeeper data init zookeeper data init.         *  创建 Zookeeper 数据初始化类         * @param zkClient        the zk client         * @param syncDataService the sync data service         * @return the zookeeper data init         */        @Bean        @ConditionalOnMissingBean(ZookeeperDataInit.class)        public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {            return new ZookeeperDataInit(zkClient, syncDataService);        }    }        //省略了其他代码......}

这个配置类是通过SpringBoot条件装配类实现的。在ZookeeperListener类上面有几个注解:

  • @Configuration:配置文件,应用上下文;

  • @ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url"):属性条件判断,满足条件,该配置类才会生效。也就是说,当我们有如下配置时,就会采用zookeeper进行数据同步。

    shenyu:    sync:     zookeeper:          url: localhost:2181          sessionTimeout: 5000          connectionTimeout: 2000
  • @Import(ZookeeperConfiguration.class):导入另一个类ZookeeperConfiguration

  @EnableConfigurationProperties(ZookeeperProperties.class)  // 启用zk属性配置类  public class ZookeeperConfiguration {
    /**     * register zkClient in spring ioc.     * 向 Spring IOC 容器注册 zkClient     * @param zookeeperProp the zookeeper configuration     * @return ZkClient {@linkplain ZkClient}        */      @Bean      @ConditionalOnMissingBean(ZkClient.class)      public ZkClient zkClient(final ZookeeperProperties zookeeperProp) {        return new ZkClient(zookeeperProp.getUrl(), zookeeperProp.getSessionTimeout(), zookeeperProp.getConnectionTimeout()); // 读取zk配置信息,并创建zkClient      }  }
@Data@ConfigurationProperties(prefix = "shenyu.sync.zookeeper") // zk属性配置public class ZookeeperProperties {
    private String url;
    private Integer sessionTimeout;
    private Integer connectionTimeout;
    private String serializer;}

当我们主动配置,采用zookeeper进行数据同步时,zookeeperDataChangedListener就会生成。所以在事件处理方法onApplicationEvent()中,就会到相应的listener中。在我们的案例中,是对一条选择器数据进行更新,数据同步采用的是zookeeper,所以,代码会进入到ZookeeperDataChangedListener进行选择器数据变更处理。

    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // 遍历数据变更监听器(一般使用一种数据同步的方式就好了)        for (DataChangedListener listener : listeners) {            // 哪种数据发生变更            switch (event.getGroupKey()) {                                    // 省略了其他逻辑                                    case SELECTOR:   // 选择器信息                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());   // 在我们的案例中,会进入到ZookeeperDataChangedListener进行选择器数据变更处理                    break;         }    }

2.4 Zookeeper数据变更监听器#

  • ZookeeperDataChangedListener.onSelectorChanged()

    onSelectorChanged()方法中,判断操作类型,是刷新同步还是更新或创建同步。根据当前选择器数据信息判断节点是否在zk中。


/** * 使用 zookeeper 发布变更数据 */public class ZookeeperDataChangedListener implements DataChangedListener {        // 选择器信息发生改变    @Override    public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {        // 刷新操作        if (eventType == DataEventTypeEnum.REFRESH && !changed.isEmpty()) {            String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(changed.get(0).getPluginName());            deleteZkPathRecursive(selectorParentPath);        }        // 发生变更的数据        for (SelectorData data : changed) {            // 构建选择器数据的真实路径            String selectorRealPath = DefaultPathConstants.buildSelectorRealPath(data.getPluginName(), data.getId());            // 如果是删除操作            if (eventType == DataEventTypeEnum.DELETE) {                // 删除当前数据                deleteZkPath(selectorRealPath);                continue;            }            // 父节点路径            String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(data.getPluginName());            // 创建父节点            createZkNode(selectorParentPath);            // 插入或更新数据            insertZkNode(selectorRealPath, data);        }    }
    // 创建 zk 节点    private void createZkNode(final String path) {        // 不存在才创建        if (!zkClient.exists(path)) {            zkClient.createPersistent(path, true);        }    }
    // 插入zk节点    private void insertZkNode(final String path, final Object data) {        // 创建节点        createZkNode(path);        // 通过 zkClient 写入数据        zkClient.writeData(path, null == data ? "" : GsonUtils.getInstance().toJson(data));    }    }

只要将变动的数据正确写入到zk的节点上,admin这边的操作就执行完成了。ShenYu在使用zk进行数据同步时,zk的节点是通过精心设计的。

在我们当前的案例中,对Divide插件中的一条选择器数据进行更新,将权重更新为90,就会对图中的特定节点更新。

我们用时序图将上面的更新流程串联起来。

3. 网关数据同步#

假设ShenYu网关已经在正常运行,使用的数据同步方式也是zookeeper。那么当在admin端更新选择器数据后,并且向zk发送了变更的数据,那网关是如何接收并处理数据的呢?接下来我们就继续进行源码分析,一探究竟。

3.1 ZkClient接收数据#

  • ZkClient.subscribeDataChanges()

在网关端有一个ZookeeperSyncDataService类,它通过ZkClient订阅了数据节点,当数据发生变更时,可以感知到。

/** * 使用 zookeeper 缓存数据 */public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {    private void subscribeSelectorDataChanges(final String path) {       // zkClient订阅数据节点        zkClient.subscribeDataChanges(path, new IZkDataListener() {            @Override            public void handleDataChange(final String dataPath, final Object data) {                cacheSelectorData(GsonUtils.getInstance().fromJson(data.toString(), SelectorData.class)); // 节点数据被更新            }
            @Override            public void handleDataDeleted(final String dataPath) {                unCacheSelectorData(dataPath);  // 节点数据被删除            }        });    }     // 省略了其他逻辑}

ZooKeeperWatch机制,会给订阅的客户端发送节点变更通知。在我们的案例中,更新选择器信息,就会进入到handleDataChange()方法。通过cacheSelectorData()去处理数据。

3.2 处理数据#

  • ZookeeperSyncDataService.cacheSelectorData()

经过判空逻辑之后,缓存选择器数据的操作又交给了PluginDataSubscriber处理。

    private void cacheSelectorData(final SelectorData selectorData) {        Optional.ofNullable(selectorData)                .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSelectorSubscribe(data)));    }

PluginDataSubscriber是一个接口,它只有一个CommonPluginDataSubscriber实现类,负责处理插件、选择器和规则数据。

3.3 通用插件数据订阅者#

  • PluginDataSubscriber.onSelectorSubscribe()

它没有其他逻辑,直接调用subscribeDataHandler()方法。在方法中,更具数据类型(插件、选择器或规则),操作类型(更新或删除),去执行不同逻辑。

/** * 通用插件数据订阅者,负责处理所有插件、选择器和规则信息 * The type Common plugin data subscriber. */public class CommonPluginDataSubscriber implements PluginDataSubscriber {    //......     // 处理选择器数据    @Override    public void onSelectorSubscribe(final SelectorData selectorData) {        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);    }            // 订阅数据处理器,处理数据的更新或删除    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {        Optional.ofNullable(classData).ifPresent(data -> {            // 插件数据            if (data instanceof PluginData) {                PluginData pluginData = (PluginData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cachePluginData(pluginData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removePluginData(pluginData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));                }            } else if (data instanceof SelectorData) {  // 选择器数据                SelectorData selectorData = (SelectorData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cacheSelectData(selectorData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removeSelectData(selectorData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));                }            } else if (data instanceof RuleData) {  // 规则数据                RuleData ruleData = (RuleData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cacheRuleData(ruleData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));                } else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removeRuleData(ruleData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));                }            }        });    }    }

3.4 数据缓存到内存#

那么更新一条选择器数据,会进入下面的逻辑:

// 将数据保存到网关内存BaseDataCache.getInstance().cacheSelectData(selectorData);// 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));

一是将数据保存到网关的内存中。BaseDataCache是最终缓存数据的类,通过单例模式实现。选择器数据就存到了SELECTOR_MAP这个Map中。在后续使用的时候,也是从这里拿数据。

public final class BaseDataCache {    // 私有变量    private static final BaseDataCache INSTANCE = new BaseDataCache();    // 私有构造器    private BaseDataCache() {    }        /**     * Gets instance.     *  公开方法     * @return the instance     */    public static BaseDataCache getInstance() {        return INSTANCE;    }        /**    *  缓存选择器数据的Map     * pluginName -> SelectorData.     */    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();        public void cacheSelectData(final SelectorData selectorData) {        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);    }           /**     * cache selector data.     * 缓存选择器数据     * @param data the selector data     */    private void selectorAccept(final SelectorData data) {        String key = data.getPluginName();        if (SELECTOR_MAP.containsKey(key)) { // 更新操作,先删除再插入            List<SelectorData> existList = SELECTOR_MAP.get(key);            final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());            resultList.add(data);            final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());            SELECTOR_MAP.put(key, collect);        } else {  // 新增操作,直接放到Map中            SELECTOR_MAP.put(key, Lists.newArrayList(data));        }    }    }

二是如果每个插件还有自己的处理逻辑,那么就去处理。 通过idea编辑器可以看到,当新增一条选择器后,有如下的插件还有处理。这里我们就不再展开了。

经过以上的源码追踪,并通过一个实际的案例,在admin端新增更新一条选择器数据,就将zookeeper数据同步的流程分析清楚了。

我们还是通过时序图将网关端的数据同步流程串联一下:

数据同步的流程已经分析完了,为了不让同步流程被打断,在分析过程中就忽略了其他逻辑。我们还需要分析Admin同步数据初始化和网关同步操作初始化的流程。

4. Admin同步数据初始化#

admin启动后,会将当前的数据信息全量同步到zk中,实现逻辑如下:


/** * Zookeeper 数据初始化 */public class ZookeeperDataInit implements CommandLineRunner {
    private final ZkClient zkClient;
    private final SyncDataService syncDataService;
    /**     * Instantiates a new Zookeeper data init.     *     * @param zkClient        the zk client     * @param syncDataService the sync data service     */    public ZookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {        this.zkClient = zkClient;        this.syncDataService = syncDataService;    }
    @Override    public void run(final String... args) {        String pluginPath = DefaultPathConstants.PLUGIN_PARENT;        String authPath = DefaultPathConstants.APP_AUTH_PARENT;        String metaDataPath = DefaultPathConstants.META_DATA;        // 判断zk中是否存在数据        if (!zkClient.exists(pluginPath) && !zkClient.exists(authPath) && !zkClient.exists(metaDataPath)) {            syncDataService.syncAll(DataEventTypeEnum.REFRESH);        }    }}

判断zk中是否存在数据,如果不存在,则进行同步。

ZookeeperDataInit实现了CommandLineRunner接口。它是springboot提供的接口,会在所有 Spring Beans初始化之后执行run()方法,常用于项目中初始化的操作。

  • SyncDataService.syncAll()

从数据库查询数据,然后进行全量数据同步,所有的认证信息、插件信息、选择器信息、规则信息和元数据信息。主要是通过eventPublisher发布同步事件。这里就跟前面提到的同步逻辑就又联系起来了,eventPublisher通过publishEvent()发布完事件后,有ApplicationListener执行事件变更操作,在ShenYu中就是前面提到的DataChangedEventDispatcher

@Servicepublic class SyncDataServiceImpl implements SyncDataService {    // 事件发布    private final ApplicationEventPublisher eventPublisher;         /***     * 全量数据同步     * @param type the type     * @return     */    @Override    public boolean syncAll(final DataEventTypeEnum type) {        // 同步认证信息        appAuthService.syncData();        // 同步插件信息        List<PluginData> pluginDataList = pluginService.listAll();        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));        // 同步选择器信息        List<SelectorData> selectorDataList = selectorService.listAll();        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));        // 同步规则信息        List<RuleData> ruleDataList = ruleService.listAll();        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));        // 同步元数据信息        metaDataService.syncData();        return true;    }    }

5. 网关同步操作初始化#

网关这边的数据同步初始化操作主要是订阅zk中的节点,当有数据变更时,收到变更数据。这依赖于ZooKeeperWatch机制。在ShenYu中,负责zk数据同步的是ZookeeperSyncDataService,也在前面提到过。

ZookeeperSyncDataService的功能逻辑是在实例化的过程中完成的:对zk中的shenyu数据同步节点完成订阅。这里的订阅分两类,一类是已经存在的节点上面数据发生更新,这通过zkClient.subscribeDataChanges()方法实现;另一类是当前节点下有新增或删除节点,即子节点发生变化,这通过zkClient.subscribeChildChanges()方法实现。

ZookeeperSyncDataService的代码有点多,这里我们以插件数据的读取和订阅进行追踪,其他类型的数据操作原理是一样的。


/** *  zookeeper 数据同步服务 */public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {    // 在实例化的时候,完成从zk中读取数据的操作,并订阅节点    public ZookeeperSyncDataService( /*省略构造参数参数*/ ) {        this.zkClient = zkClient;        this.pluginDataSubscriber = pluginDataSubscriber;        this.metaDataSubscribers = metaDataSubscribers;        this.authDataSubscribers = authDataSubscribers;        // 订阅插件、选择器和规则数据        watcherData();        // 订阅认证数据        watchAppAuth();        // 订阅元数据        watchMetaData();    }        private void watcherData() {        // 插件节点路径        final String pluginParent = DefaultPathConstants.PLUGIN_PARENT;        // 所有插件节点        List<String> pluginZKs = zkClientGetChildren(pluginParent);        for (String pluginName : pluginZKs) {            // 订阅当前所有插件、选择器和规则数据            watcherAll(pluginName);        }        // 订阅子节点(新增或删除一个插件)        zkClient.subscribeChildChanges(pluginParent, (parentPath, currentChildren) -> {            if (CollectionUtils.isNotEmpty(currentChildren)) {                for (String pluginName : currentChildren) {                    // 需要订阅子节点的所有插件、选择器和规则数据                    watcherAll(pluginName);                }            }        });    }        private void watcherAll(final String pluginName) {        // 订阅插件数据        watcherPlugin(pluginName);        // 订阅选择器数据        watcherSelector(pluginName);        // 订阅规则数据        watcherRule(pluginName);    }
    private void watcherPlugin(final String pluginName) {        // 当前插件路径        String pluginPath = DefaultPathConstants.buildPluginPath(pluginName);        // 是否存在,不存在就创建        if (!zkClient.exists(pluginPath)) {            zkClient.createPersistent(pluginPath, true);        }        // 读取zk上当前节点数据,并反序列化        PluginData pluginData = null == zkClient.readData(pluginPath) ? null                : GsonUtils.getInstance().fromJson((String) zkClient.readData(pluginPath), PluginData.class);        // 缓存到网关内存中        cachePluginData(pluginData);        // 订阅插件节点        subscribePluginDataChanges(pluginPath, pluginName);    }       private void cachePluginData(final PluginData pluginData) {       // 省略实现逻辑,其实就是 CommonPluginDataSubscriber 中的操作,跟前面都能联系起来    }        private void subscribePluginDataChanges(final String pluginPath, final String pluginName) {        // 订阅数据变更:更新或删除        zkClient.subscribeDataChanges(pluginPath, new IZkDataListener() {
            @Override            public void handleDataChange(final String dataPath, final Object data) {  // 更新操作                 // 省略实现逻辑,其实就是 CommonPluginDataSubscriber 中的操作,跟前面都能联系起来            }
            @Override            public void handleDataDeleted(final String dataPath) {   // 删除操作                  // 省略实现逻辑,其实就是 CommonPluginDataSubscriber 中的操作,跟前面都能联系起来
            }        });    }    }    

上面的源代码中都给出了注释,相信大家可以看明白。订阅插件数据的主要逻辑如下:

  1. 构造当前插件路径
  2. 路径是否存在,不存在就创建
  3. 读取zk上当前节点数据,并反序列化
  4. 插件数据缓存到网关内存中
  5. 订阅插件节点

6. 总结#

本文通过一个实际案例,对zookeeper的数据同步原理进行了源码分析。涉及到的主要知识点如下:

  • 基于zookeeper的数据同步,主要是通过watch机制实现;
  • 通过Spring完成事件发布和监听;
  • 通过抽象DataChangedListener接口,支持多种同步策略,面向接口编程;
  • 使用单例设计模式实现缓存数据类BaseDataCache
  • 通过SpringBoot的条件装配和starter加载机制实现配置类的加载。

Context-Path插件源码分析

· One min read
Kunshuai Zhu
Apache ShenYu Contributor

开始前,可以参考 这篇文章 运行shenyu网关

正文#

首先,看ContextPathPlugin#doExecute方法,这是这个插件的核心。

protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {    ...    // 1. 从JVM缓存中取得contextMappingHandle    ContextMappingHandle contextMappingHandle = ContextPathPluginDataHandler.CACHED_HANDLE.get().obtainHandle(CacheKeyUtils.INST.getKey(rule));    ...    // 2. 根据contextMappingHandle设置shenyu上下文    buildContextPath(shenyuContext, contextMappingHandle);    return chain.execute(exchange);}
  1. 从JVM缓存中取得contextMappingHandle

    这里的contextMappingHandleContextMappingHandle类的实例,里面有两个成员变量:contextPathaddPrefix

    这两个变量在之前Admin里面的Rules表单里有出现过,是在数据同步的时候更新的。

  2. 根据contextMappingHandle设置shenyu上下文

    下面是ContextPathPlugin#buildContextPath方法的源代码

    private void buildContextPath(final ShenyuContext context, final ContextMappingHandle handle) {    String realURI = "";    // 1. 设置shenyu的context path,根据contextPath的长度将真实URI的前缀去掉    if (StringUtils.isNoneBlank(handle.getContextPath())) {        context.setContextPath(handle.getContextPath());        context.setModule(handle.getContextPath());        realURI = context.getPath().substring(handle.getContextPath().length());    }    // 加上前缀    if (StringUtils.isNoneBlank(handle.getAddPrefix())) {        if (StringUtils.isNotBlank(realURI)) {            realURI = handle.getAddPrefix() + realURI;        } else {            realURI = handle.getAddPrefix() + context.getPath();        }    }    context.setRealUrl(realURI);}
    • 设置shenyu的context path,根据contextPath的长度将真实URI的前缀去掉

      你可能会有疑问,这里所谓的「根据contextPath的长度」会不会有问题呢?

      实际上这样的判断是没有问题的,因为请求在被Selector和Rules匹配到之后,才会被插件处理。所以在设置好Selector和Rules的前提下,是完全可以满足转换特定contextPath的需求的。

然后,ContextPathPlugin类还有一个比较重要的方法skip,下面展示了部分代码。我们可以发现:如果是对RPC服务的调用,就会直接跳过context_path插件。

public Boolean skip(final ServerWebExchange exchange) {    ...    return Objects.equals(rpcType, RpcTypeEnum.DUBBO.getName())            || Objects.equals(rpcType, RpcTypeEnum.GRPC.getName())            || Objects.equals(rpcType, RpcTypeEnum.TARS.getName())            || Objects.equals(rpcType, RpcTypeEnum.MOTAN.getName())            || Objects.equals(rpcType, RpcTypeEnum.SOFA.getName());}

最后,context_path插件还有另一个类ContextPathPluginDataHandler。这个类的作用是订阅插件的数据,当插件配置被修改、删除、增加时,就往JVM缓存里面修改、删除、新增数据。

Param-Mapping插件源码分析

· One min read
Kunshuai Zhu
Apache ShenYu Contributor

开始前,可以参考 这篇文章 运行shenyu网关

正文#

先看一下这个插件的结构,如下图。

param-mapping-structure

猜测:handler是用来做数据同步的;strategy中文意思是策略,可能是对各种请求体做了适配,应该是这个插件的重点;ParamMappingPlugin 应该是 ShenyuPlugin 的实现。

首先,看一下 ParamMappingPlugin ,里面主要是对 doExecute 方法的重写。

public Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {    ... // paramMappingHandle判断是否为空    // 根据首部行中的contentType确定请求体类型    HttpHeaders headers = exchange.getRequest().getHeaders();    MediaType contentType = headers.getContentType();    // *    return match(contentType).apply(exchange, chain, paramMappingHandle);}
  • match方法是根据contentType返回对应的 Operator

    private Operator match(final MediaType mediaType) {    if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {        return operatorMap.get(MediaType.APPLICATION_JSON.toString());    } else if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {        return operatorMap.get(MediaType.APPLICATION_FORM_URLENCODED.toString());    } else {        return operatorMap.get(Constants.DEFAULT);    }}

    从match方法的代码可以看出,目前有 DefaultOperatorFormDataOperatorJsonOperator三种,支持 x-www-form-urlencodedjson 两种格式的请求体。

那么我们就来看一下上面三种Operator究竟是怎么样的吧。

一、DefaultOperator#

虚晃一枪,它的apply方法只是继续执行插件链,并没有实质功能。当请求体没有匹配到Operator时,就会通过 DefaultOperator 跳过。

二、FormDataOperator#

这个类是用来处理 x-www-form-urlencoded 格式的请求体的。

主要是看apply方法,但是这个apply方法长得有点奇怪。

public Mono<Void> apply(final ServerWebExchange exchange, final ShenyuPluginChain shenyuPluginChain, final ParamMappingHandle paramMappingHandle) {    return exchange.getFormData()            .switchIfEmpty(Mono.defer(() -> Mono.just(new LinkedMultiValueMap<>())))            .flatMap(multiValueMap -> {                ...            });}

省略号中的代码是对请求体的处理,如下。

// 判空if (Objects.isNull(multiValueMap) || multiValueMap.isEmpty()) {    return shenyuPluginChain.execute(exchange);}// 将form-data转化成jsonString original = GsonUtils.getInstance().toJson(multiValueMap);LOG.info("get from data success data:{}", original);// *修改请求体*String modify = operation(original, paramMappingHandle);if (StringUtils.isEmpty(modify)) {    return shenyuPluginChain.execute(exchange);}...// 将修改后的json,转换成LinkedMultiValueMap。注意一下这一行,后面会提到!LinkedMultiValueMap<String, String> modifyMap = GsonUtils.getInstance().toLinkedMultiValueMap(modify);...final BodyInserter bodyInserter = BodyInserters.fromValue(modifyMap);...// 修改exchange中的请求体,然后继续执行插件链return bodyInserter.insert(cachedBodyOutputMessage, new BodyInserterContext())        .then(Mono.defer(() -> shenyuPluginChain.execute(exchange.mutate()                .request(new ModifyServerHttpRequestDecorator(httpHeaders, exchange.getRequest(), cachedBodyOutputMessage))                .build())        )).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> release(cachedBodyOutputMessage, throwable));

PS: 省略的部分是设置请求头等操作。

上面比较重要的应该是打星的修改请求体,也就是 operation 方法的调用。这里因为参数类型的原因,会先调用 Operator 接口的默认方法(而不是 FormDataOperator 重写的)。

default String operation(final String jsonValue, final ParamMappingHandle paramMappingHandle) {    DocumentContext context = JsonPath.parse(jsonValue);    // 调用重写的operation方法,添加addParameterKey    operation(context, paramMappingHandle);    // 对设置的replacedParameterKey进行替换    if (!CollectionUtils.isEmpty(paramMappingHandle.getReplaceParameterKeys())) {        paramMappingHandle.getReplaceParameterKeys().forEach(info -> {            context.renameKey(info.getPath(), info.getKey(), info.getValue());        });    }    // 对设置的removeParameterKey进行删除    if (!CollectionUtils.isEmpty(paramMappingHandle.getRemoveParameterKeys())) {        paramMappingHandle.getRemoveParameterKeys().forEach(info -> {            context.delete(info);        });    }    return context.jsonString();}

梳理下来可以发现,这里引入的json工具JsonPath使得请求体的加工变得简单、清晰很多。

另外,我们可以注意到 FormDataOperator 重写了 operation(DocumentContext, ParamMappingHandle) 方法。

为什么要重写呢? 接口中有对应处理addParameterKey的默认方法啊。

// Operator接口中的默认方法default void operation(final DocumentContext context, final ParamMappingHandle paramMappingHandle) {    if (!CollectionUtils.isEmpty(paramMappingHandle.getAddParameterKeys())) {        paramMappingHandle.getAddParameterKeys().forEach(info -> {            context.put(info.getPath(), info.getKey(), info.getValue()); //不同之处        });    }}
// FormDataOperator重写的方法@Overridepublic void operation(final DocumentContext context, final ParamMappingHandle paramMappingHandle) {    if (!CollectionUtils.isEmpty(paramMappingHandle.getAddParameterKeys())) {        paramMappingHandle.getAddParameterKeys().forEach(info -> {            context.put(info.getPath(), info.getKey(), Arrays.asList(info.getValue()));        });    }}

实际上,在 FormDataOperator#apply 中有这么一行(前面有提到):LinkedMultiValueMap<String, String> modifyMap = GsonUtils.getInstance().toLinkedMultiValueMap(modify);

这一行是将修改后的json转换成 LinkedMultiValueMapGsonUtils#toLinkedMultiValueMap 如下。

public LinkedMultiValueMap<String, String> toLinkedMultiValueMap(final String json) {    return GSON.fromJson(json, new TypeToken<LinkedMultiValueMap<String, String>>() {    }.getType());}

LinkedMultiValueMap 类中的属性 targetMap 定义为:private final Map<K, List<V>> targetMap

因此,json字符串中的value必须是列表形式的,不然Gson就会抛出转换错误的异常,这也就是为什么 FormDataOperator 要重写operator方法。

那么为什么要用 LinkedMultiValueMap 呢?

回到 FormDataOperator#apply 方法的第一行 exchange.getFormData 。而SpringMVC中,DefaultServerWebExchange#getFormData 的返回值类型就是 Mono<MultiValueMap<String, String>> ,而 LinkedMultiValueMapMultiValueMap 的子类。并且,getFormData 方法就是针对 x-www-form-urlencoded 格式的请求体的。

param-mapping-getFormData

三、JsonOperator#

显然,这个类是用来处理Json格式的请求体的。

public Mono<Void> apply(final ServerWebExchange exchange, final ShenyuPluginChain shenyuPluginChain, final ParamMappingHandle paramMappingHandle) {    ServerRequest serverRequest = ServerRequest.create(exchange, MESSAGE_READERS);    Mono<String> mono = serverRequest.bodyToMono(String.class).switchIfEmpty(Mono.defer(() -> Mono.just(""))).flatMap(originalBody -> {        LOG.info("get body data success data:{}", originalBody);        // 调用默认的operation方法修改请求体        String modify = operation(originalBody, paramMappingHandle);        return Mono.just(modify);    });    BodyInserter bodyInserter = BodyInserters.fromPublisher(mono, String.class);    ... //处理首部行    CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);    // 修改exchange中的请求体,然后继续执行插件链    return bodyInserter.insert(outputMessage, new BodyInserterContext())            .then(Mono.defer(() -> {                ServerHttpRequestDecorator decorator = new ModifyServerHttpRequestDecorator(headers, exchange.getRequest(), outputMessage);                return shenyuPluginChain.execute(exchange.mutate().request(decorator).build());            })).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> release(outputMessage, throwable));}

JsonOperator 的处理流程与 FormDataOperator 大致类似。

总结#

最后,用一张图来简单总结一下。

param-mapping-summary

LoadBalance SPI 代码分析

· One min read
Apache ShenYu Contributor

​ 网关应用需要支持多种负载均衡的方案,包括随机选择、Hash、轮询等方式。Apache Shenyu网关中不仅实现了传统网关的这些均衡策略,还通过流量预热(warmup)等细节处理,对服务器节点的加入,做了更平滑的流量处理,获得了更好的整体稳定性。让我们来看看Shenyu是是如何设计和实现这部分功能的。

本文基于shenyu-2.4.0版本进行源码分析.

[TOC]

LoadBalance SPI#

LoadBalance SPI 定义在shenyu-plugin-divide模组中,以下是这个核心接口的代码,这个接口很好的诠释了这样一个理念:负载均衡是在一系列服务器节点中选出最合适的节点,也就是选择策略。做流量转发、路由和负载均衡是LoadBalance SPI的基本功能

@SPIpublic interface LoadBalance {
    /**     * @param upstreamList upstream list     * @param ip ip     * @return divide upstream     */    DivideUpstream select(List<DivideUpstream> upstreamList, String ip);}

接口中,upstreamList是可选路由的一组服务器节点,DivideUpstream 是服务器节点的数据结构,它包括的重要元素有:协议、upstreamUrl 、权重、时间戳,warmup等。

public class DivideUpstream implements Serializable {    private String upstreamHost;    /**     * this is http protocol.     */    private String protocol;    private String upstreamUrl;    private int weight;    /**     * false close/ true open.     */    @Builder.Default    private boolean status = true;    private long timestamp;    private int warmup;}

Design of LoadBalance module#

图1是LoadBalance模组的类图:

loadbalance-class-diagram

从类图上可以看出LoadBalance的设计概要:

  1. 抽象类AbstractLoadBalance继承自LoadBalance SPI接口,并提供选择的模板方法,及权重计算。

  2. 三个实做类继承AbstractLoadBalance, 实现各自的逻辑处理。

    • RandomLoadBalance -加权随机选择 Weight Random
    • HashLoadBalance - 一致性Hash
    • RoundRobinLoadBalance -加权轮询(Weight Round Robin per-packet)
  3. 由Util类LoadBalanceUtil 实现对外的静态调用方法。

    另外根据Apache Sheny SPI规范,在SHENYU_DIERECTORY中的添加profile,配置LoadBalance的实现类,配置key=class形式,左边的operator要和LoadBalanceEnum中的定义一致。

random=org.apache.shenyu.plugin.divide.balance.spi.RandomLoadBalanceroundRobin=org.apache.shenyu.plugin.divide.balance.spi.RoundRobinLoadBalancehash=org.apache.shenyu.plugin.divide.balance.spi.HashLoadBalance

LoadBalanceEnum的定义如下:

public enum LoadBalanceEnum {    /**     * Hash load balance enum.     */    HASH(1, "hash", true),
    /**     * Random load balance enum.     */    RANDOM(2, "random", true),
    /**     * Round robin load balance enum.     */    ROUND_ROBIN(3, "roundRobin", true);
    private final int code;    private final String name;    private final boolean support;}

AbstractLoadBalance#

这个抽象类实做了LoadBalance接口, 定义了抽象方法doSelect()留给实作类处理,在模板方法select() 中先进行校验,之后调用由实作类实现的doSelect()方法。

   /**     * Do select divide upstream.     *     * @param upstreamList the upstream list     * @param ip           the ip     * @return the divide upstream     */    protected abstract DivideUpstream doSelect(List<DivideUpstream> upstreamList, String ip);
    @Override    public DivideUpstream select(final List<DivideUpstream> upstreamList, final String ip) {        if (CollectionUtils.isEmpty(upstreamList)) {            return null;        }        if (upstreamList.size() == 1) {            return upstreamList.get(0);        }        return doSelect(upstreamList, ip);    }

权重的处理方法getWeight()的逻辑是:当有时间戳,并且当前时间与时间戳间隔在流量预热warmup时间内,权重计算的公式为: $$ {1-1} ww = min(1,uptime/(warmup/weight)) $$ 从公式可以看出,最终的权值,与设置的weigth成正比,时间间隔越接近warmup时间,权重就越大。也就是说等待的时间越长,被分派的权重越高。没有时间戳时等其他情况下,返回DivideUpstream设置的weight值。

考虑流量预热(warmup)的核心思想是避免在添加新服务器和启动新JVM时网关性能不佳。

下面我们看一下三个实做类的实现。

RandomLoadBalance#

这里随机LoadBalance 可以处理两种情况:

  1. 没有权重:所有服务器都没有设定权重,或者权重都一样, 会随机选择一个。
  2. 有权重:服务器设定有不同的权重,会根据权重,进行随机选择。

下面是有权重时的随机选择代码random(): 遍历服务器列表,当产生的随机值小于某个服务器权重时,这个服务器被选中。 若遍历后没有满足条件,就返回服务器列表的第一个。这里getWeight(DivideUpstream upstream) 方法是在AbstractLoadBalance 中定义的,按公式计算权重。

    private DivideUpstream random(final int totalWeight, final List<DivideUpstream> upstreamList) {        // If the weights are not the same and the weights are greater than 0, then random by the total number of weights        int offset = RANDOM.nextInt(totalWeight);        // Determine which segment the random value falls on        for (DivideUpstream divideUpstream : upstreamList) {            offset -= getWeight(divideUpstream);            if (offset < 0) {                return divideUpstream;            }        }        return upstreamList.get(0);    }

因此,当采用RandomLoadBalance时,是按权重随机分派服务器的。

HashLoadBalance#

Apache ShenyuHashLoadBalance 中采用了一致性hash算法,使用有序hash环,将key与服务器节点的hash映射缓存起来。对于请求的ip地址,计算出其hash值, 在hash环上顺时针查找距离这个key的hash值最近的节点,将其作为要路由的节点。一致性hash解决了传统取余hash算法的可伸缩性差的问题。

HashLoadBalance中的采用的是加密的单向MD5散列函数,这个hash函数会hash后产生不可预期但确定性的()的结果,输出为32-bit的长整数。hash代码如下:

private static long hash(final String key) {    // md5 byte    MessageDigest md5;    try {        md5 = MessageDigest.getInstance("MD5");    } catch (NoSuchAlgorithmException e) {        throw new ShenyuException("MD5 not supported", e);    }    md5.reset();    byte[] keyBytes;    keyBytes = key.getBytes(StandardCharsets.UTF_8);    md5.update(keyBytes);    byte[] digest = md5.digest();    // hash code, Truncate to 32-bits    long hashCode = (long) (digest[3] & 0xFF) << 24            | ((long) (digest[2] & 0xFF) << 16)            | ((long) (digest[1] & 0xFF) << 8)            | (digest[0] & 0xFF);    return hashCode & 0xffffffffL;}

再看一下HashLoadBalance的选择函数doSelect()的实现:

    private static final int VIRTUAL_NODE_NUM = 5;
    @Override    public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {        final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();        for (DivideUpstream address : upstreamList) {            for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {                long addressHash = hash("SOUL-" + address.getUpstreamUrl() + "-HASH-" + i);                treeMap.put(addressHash, address);            }        }        long hash = hash(String.valueOf(ip));        SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);        if (!lastRing.isEmpty()) {            return lastRing.get(lastRing.firstKey());        }        return treeMap.firstEntry().getValue();    }

这个方法中,生成带虚拟服务器节点的hash环, 一个实际节点会生成5个虚拟节点,因此整个hash环的均匀性大大增加,降低数据倾斜的发生。

为了实现hash环的有序性及顺时针查找功能,代码中使用Java 的ConcurrentSkipListMap 来存储带虚拟节点的服务器节点及其hash值, 它既能保证线程安全,又能保证数据的有序性,支持高并发。 另外,ConcurrentSkipListMap提供了一个tailMap(K fromKey)方法,可从map中查找比fromKey大的值的集合,但并不需要遍历整个数据结构。

上述代码中,生成hash环之后,就是调用ConcurrentSkipListMaptailMap()方法,找到大于等于请求的ip的hash值的子集,这个子集的第一个就是要路由的服务器节点。采用了合适的数据结构,这里的代码看上去是不是特别的简洁流畅?

RoundRobinLoadBalance#

Round-robin轮询方法的原始定义是顺序循环将请求依次循环地连接到每个服务器。当某个服务器发生故障(例如:一分钟连接不上的服务器),从候选队列中取出,不参与下一次的轮询,直到其恢复正常。在 RoundRobinLoadBalance中实现的是组内加权轮询(Weight Round Robin per-packet)方法:

为了计算和存储每个服务器节点的轮询次数,在这个类中定义了一个静态内部类WeigthRoundRobin,我们先看一下它的主要代码(去掉了注释):

protected static class WeightedRoundRobin {
    private int weight;    private final AtomicLong current = new AtomicLong(0);
    private long lastUpdate;
    void setWeight(final int weight) {        this.weight = weight;        current.set(0);    }    long increaseCurrent() {        return current.addAndGet(weight);    }
    void sel(final int total) {        current.addAndGet(-1 * total);    }    void setLastUpdate(final long lastUpdate) {        this.lastUpdate = lastUpdate;    }}

请重点关注这几个方法:

  • setWeight(final int weight) ,为对象设定权重,并将current重置为0.

  • increaseCurrent() : 对AtomicLong类型的对象current,累加其权重值。

  • sel(final int total): current减去传入的 total值。

下面我们看一下带权重的轮询过程是如何实现的。 首先定义了一个ConcurrentMap类型对象methodWeightMap 两层对象来存储服务器列表与其各个明细节点的轮询资料。

private final ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<>(16);

这个map对象第一层的key为当前服务器列表的第一个节点的upstreamUrl, 第二个对象ConcurrentMap<String, WeightedRoundRobin>存储了组内各个服务器节点的轮询情况,内层Map的key为组内每个服务器的upstreamUrlMap对象使用JUCConcurrentHashMap,不仅存取高效,而且线程安全,支持高并发。

内层map的每个节点对应的WeighedRoundRobin作为静态内部类能确保线程安全,并实现组内的加权轮询选择功能。下面是这个类的doSelect()方法的代码。

@Overridepublic DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {    String key = upstreamList.get(0).getUpstreamUrl();    ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);    if (map == null) {        methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));        map = methodWeightMap.get(key);    }    int totalWeight = 0;    long maxCurrent = Long.MIN_VALUE;    long now = System.currentTimeMillis();    DivideUpstream selectedInvoker = null;    WeightedRoundRobin selectedWRR = null;    for (DivideUpstream upstream : upstreamList) {        String rKey = upstream.getUpstreamUrl();        WeightedRoundRobin weightedRoundRobin = map.get(rKey);        int weight = getWeight(upstream);        if (weightedRoundRobin == null) {            weightedRoundRobin = new WeightedRoundRobin();            weightedRoundRobin.setWeight(weight);            map.putIfAbsent(rKey, weightedRoundRobin);        }        if (weight != weightedRoundRobin.getWeight()) {            //weight changed            weightedRoundRobin.setWeight(weight);        }        long cur = weightedRoundRobin.increaseCurrent();        weightedRoundRobin.setLastUpdate(now);        if (cur > maxCurrent) {            maxCurrent = cur;            selectedInvoker = upstream;            selectedWRR = weightedRoundRobin;        }        totalWeight += weight;    }    ......  //erase the section which handles the time-out upstreams.     if (selectedInvoker != null) {        selectedWRR.sel(totalWeight);        return selectedInvoker;    }    // should not happen here    return upstreamList.get(0);}

举例,若服务器组upstreamUrl 分别为: LIST = [upstream-20, upstream-50, upstream-30]时,经过一轮执行后,建立的methodWeightMap 资料如下:

methodWeightMap

假设上述的LIST中,各个服务器节点的权重数组为: [20,50,30], 下图是内部类current 值变化和轮询选择过程:

weighted-roundrobin-demo

每一轮,选择值current最大的服务器节点:

  • Round1:
    • 对当前服务器LIST做遍历,当服务器节点的weightedRoundRobin 为null时,current被置为各自的权重; 不为null时,累加各自的权重。
    • 即:遍历后current 分别为 [20, 50,30] , 会选择Stream-50, Stream-50对应的WeightRoundRobin静态类做 sel(-total)处理,current 更新为[20,-50, 30].
  • Round 2 遍历后的current是[40,0,60], 会选择Stream-30, current分别更新为[40,0,-40].
  • Round 3 遍历后的current是[60,50,-10], 会选择Stream-20,current分别更新为[-40,50,-10].

中间进行了容错处理, 当服务器的个数与map个数不一样,就对methodWeightMap 加锁做处理。 用先copy 后modify的方式, 把超时的服务器remove掉,即移除掉发生故障的服务器,并更新Map资料。如下是异常时的处理代码:

    if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {        try {            // copy -> modify -> update reference            ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);            newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);            methodWeightMap.put(key, newMap);        } finally {            updateLock.set(false);        }    }
    if (selectedInvoker != null) {        selectedWRR.sel(totalWeight);        return selectedInvoker;    }    // should not happen here    return upstreamList.get(0);

LoadBalanceUtils#

在这个类中,提供了调用LoadBalance的静态方法, 其中ExtensionLoaderApache ShenyuSPI执行入口。也就是说,LoadBalance模组是可配置、可扩展的。这个静态方法中的algorithm变量是LoadBalanceEnum中定义name枚举类型。

    /**     * Selector divide upstream.     *     * @param upstreamList the upstream list     * @param algorithm    the loadBalance algorithm     * @param ip           the ip     * @return the divide upstream     */    public static DivideUpstream selector(final List<DivideUpstream> upstreamList, final String algorithm, final String ip) {        LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getJoin(algorithm);        return loadBalance.select(upstreamList, ip);    }

Using of LoadBalance module#

上面说明了LoadBalance SPI接口及三个实作类。下面看一下LoadBalanceApache Shenyu中是如何被调用的。DividePlugin是路由选择插件,所有的Http请求都由该插件进行负载均衡处理。当请求头rpcType = http, 且开启该插件时,它将根据请求参数匹配规则,最终交由下游插件进行响应式代理调用。

DividePlugindoExecute方法中,先对要转发的请求的Header大小、content长度等做校验,

@SneakyThrows@Overrideprotected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {   ......}

接口方法的第二个参数是ShenyuPluginChain 类型,代表plugin的调用链,具体可参见Apache Sheyuplugin的调用机制。第三个SelectorData类型的参数是选择器, 第四个是RuldData类型,代表规则。分别请查看对应的代码。

下面给出了doExecute()方法中,有关LoadBalance调用的代码片段:

   //取到要路由的服务器节点列表。   List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());    ...     //取到请求的ip    String ip =   Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
    //调用Util方法,执行LoadBalance处理    DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);

这里UpstreamCacheManager 是缓存的要路由的服务器节点 , ruleHandle.getLoadBalance()取到的是LoadBalanceEnum定义的枚举name, 如random, hash, roundRobin等.

经过封装,调用负载均衡功能非常的方便。 未来增加新的LoadBalance类,这些调用的Plugin代码完全不需要变更。

Summary#

经过上面的代码解读,从设计角度总结LoadBalance 模组具有如下的特点:

  1. 可扩展性:面向接口的设计,及基于Apache Shenyu SPI的实现,使得系统具有良好的可扩展性。可以方便的扩展为其他的动态的负载均衡算法,如最少连接方式(least connection)、最快模式( fastest)。并支持集群处理,具有良好的可扩展性。

  2. 可伸缩性: 采用的一致性hash LoadBalance、权重随机和权重轮询,都可以无缝支持集群扩容或缩容。

  3. 流量预热等更细致的设计,能带来整体上更为平滑的负载均衡。

MatchStrategy--基于SPI的代码分析

· One min read
Apache ShenYu Contributor

Apache Shenyu 网关的各个Plugin(包括Dubbo, gRPC,Spring-cloud等) 中,routing参数均设计为可以接受多个条件的组合。 为了实现这样的目的,遵循其SPI的机制进行将参数及行为抽象为如下三部分,这些SPIshenyu-plugin-base模组中实现

  • ParameterData-参数资料
  • PredictJudge-断言
  • MatchStrategy-匹配策略

相对而言,匹配策略是需要扩展点最少的部分。想象一下,对多个条件的组合判断,最常见的几种规则是:全部都满足、至少满足一个条件、至少满足第一个,或者大部分满足等等。 并且要做到对各种plugin的不同类型的参数,如IP, header, uri等。针对这些需求,如何将MatchStrategy设计得简单易用且容易扩展?

MatchStrategy#

MatchStrategy的实现代码在shenyu-plugin-base模组中,基于Apache ShenyuSPI创建机制, 设计上结合了工厂模式和策略模式,整体MatchStrategy的设计类图如下下:

MatchStrategy-class-diagram

以接口MatchStrategy为基础,设计实现类,并由抽象类AbstractMatchStrategy实现公共方法,由工厂类MatchStrategyFactory提供创建和外部调用功能。

MatchStrategy Interface#

首先来看MatchStrategy SPI接口的定义:

@SPIpublic interface MatchStrategy {
    Boolean match(List<ConditionData> conditionDataList, ServerWebExchange exchange);}

@SPI annotation代表这是一个SPI接口。ServerWebExchangeorg.springframework.web.server.ServerWebExchange ,代表HTTPrequest-response 的交互内容。ConditionData的代码如下,更多说明可以参考PredicateJudge代码分析中的说明,

public class ConditionData {
    private String paramType;    private String operator;
    private String paramName;    private String paramValue;}

AbstractMatchStrategy#

在抽象类AbstractMatchStrategy中,定义MatchStrategy的公共方法, 用buildRealData方法中,用ParameterData工厂类ParameterDataFactory,将多种参数如 Ip, Cookie, Header,uri等资料都以统一的接口方法来呈现。这些参数格式及规则的修改,不会影响到对参数规则匹配MatchStrategy的调用。

public abstract class AbstractMatchStrategy {
    public String buildRealData(final ConditionData condition, final ServerWebExchange exchange) {        return ParameterDataFactory.builderData(condition.getParamType(), condition.getParamName(), exchange);    }}

实现类及Profile#

基于上述接口定义, shenyu-plugin-base 模组提供了两个MatchStrategy实现类

  • AndMatchStrategy-多个条件 AND

  • OrMatchStrategy- 多个条件 OR

    并在SHENYU_DIRECTORY目录下的配置文件中,对实作类做了配置。在系统启动时会由顶层SPIkey-value形式加载并cache起来。

and=org.apache.shenyu.plugin.base.condition.strategy.AndMatchStrategyor=org.apache.shenyu.plugin.base.condition.strategy.OrMatchStrategy

两个实现类AndMatchStrategy 继承AbstractMatchStrategy 并实做了MatchStrategy

AndMatchStrategy- “与”的关系#

由于PredicateJudge封装了条件判断的多样性,ConditionDataParameData封装了多种参数。那么对于多个条件的匹配来说,采用Stream流处理及lamda表达式,非常简洁高效达成了:全部条件都满足,即"AND"的逻辑。

@Joinpublic class AndMatchStrategy extends AbstractMatchStrategy implements MatchStrategy {
    @Override    public Boolean match(final List<ConditionData> conditionDataList, final ServerWebExchange exchange) {        return conditionDataList                .stream()                .allMatch(condition -> PredicateJudgeFactory.judge(condition, buildRealData(condition, exchange)));    }}

OrMatchStrategy是同样的实现方式,实现: 至少满足一个条件"OR"的规则,在此不做赘述。

MatchStrategyFactory#

这是MatchStrategy的工厂类,实现了两个方法,一个是newInstance()方法根据策略代码和名称,返回由SPI ExtensionLoader按key来加载对应的MatchStrategy实现类。

    public static MatchStrategy newInstance(final Integer strategy) {        String matchMode = MatchModeEnum.getMatchModeByCode(strategy);        return ExtensionLoader.getExtensionLoader(MatchStrategy.class).getJoin(matchMode);    }

MatchModeEnum 中定义了match策略的code和name。 调用时由策略名称,如"and","or",根据启动时SPI加载的key-value资料,找到对应的实现类:

AND(0, "and"),  OR(1, "or");

另一个是match()方法,调用实作类的match方法。

    public static boolean match(final Integer strategy, final List<ConditionData> conditionDataList, final ServerWebExchange exchange) {        return newInstance(strategy).match(conditionDataList, exchange);    }

调用方式#

shenyu-plugin模组的各个plugin的基类AbstractShenyuPlugin 中,定义了两个选择的方法:filterSelectorfilterRule 它们都调用了MatchStrategyFactory 方法,下面是AbstractShenyuPluginfilterSelector方法的代码:

    private Boolean filterSelector(final SelectorData selector, final ServerWebExchange exchange) {        if (selector.getType() == SelectorTypeEnum.CUSTOM_FLOW.getCode()) {            if (CollectionUtils.isEmpty(selector.getConditionList())) {                return false;            }            return MatchStrategyFactory.match(selector.getMatchMode(), selector.getConditionList(), exchange);        }        return true;    }

这段代码中,先检测参数匹配条件SelectorData是否为空,之后调用MatchStrategyFactorymatch方法,工厂方法将调用对应的实作类的match方法。同理,如下是AbstractShenyuPluginfilterRule 方法

    private Boolean filterRule(final RuleData ruleData, final ServerWebExchange exchange) {        return ruleData.getEnabled() && MatchStrategyFactory.match(ruleData.getMatchMode(), ruleData.getConditionDataList(), exchange);    }

也同样是调用MatchStrategyFactorymatch方法,看上去是不是特别的简洁甚至是简单? 在PredicteJudge代码分析文中,对shenyu-plugin如何做参数调用方面做了更进一步的描述。

Summary#

由于应用了Apache shenyuSPI框架,使得整体上具有松耦合、易于扩展的特点。在多个参数规则策略方面,MatchStrategy提供了良好的设计,虽然目前只提供了两个AND 和OR的实现类,但未来可以很轻松地扩展为更多MatchStrategy规则,例如 firstOf:即必须满足第一个条件,或mostOf-满足大部分条件等更多复杂策略,而其他调用部分的代码完全不受影响。

有兴趣的读者可以去阅读Shenyu plugin的源码了解更多内容。

PredicateJudge-- 基于SPI的设计实现分析

· One min read
Apache ShenYu Contributor

灵活的插件和规则定义,是Shenyu网关的一大特色。它以插件形式支持多种网络协议和多种流行的微服务框架,如Dubbo, gRPC和 Spring-Cloud 等。 为了实现对各种协议及插件的配置规则的解析,网关在规则策略解析方面,采用了优雅的SPI(Service Provider Interface)实现,当添加新的插件时,规则解析部分可以沿用现有实现或采用SPI机制快速实现,具有良好的可扩展性。

SPI 的顶层设计#

Shenyu的SPI采用接口+ 工厂模式+配置文件的方式,来实现模组的动态加载。在其shen-SPI-模组,做了SPI的顶层设计。定义了@ Join ,@SPI 两个annotation。 其中@Join 代表此类会加入扩展机制,相当于是做申请注册。 @SPI 标明当前类为SPI功能扩展类。

Fig 1 classes in the shenyu-spi

toplevel-SPI

配置文件方面,定义SPI加载的目录为 META-INF/shenyu/

SHENYU_DIRECTORY = "META-INF/shenyu/";

系统启动时,会扫描 SHENYU_DIRECTORY 下的配置文件,并由 ExtensionLoader 类来加载所配置的SPI扩展类,并cache到内存中。 配置文件内容为 key=class的形式。 在系统执行期间, 由ExtensionFactory的实现类,返回key所对应的SPI实现类。

shenyu-plugin的SPI 实现#

shenyu-plugin模组中,按照插件机制,实现了各种请求转发功能,包括支持request, redirect, response, rewrite等http协议功能,及 gRPC, dubbo, hystrix等微服务框架, 并且插件功能还在不断增加中。如果在各自的功能插件实做类中,还要做对routing 参数的解析等处理,不仅会造成程序的冗余,而且当要支持各自匹配规则,如通配符、正则表达式、SpEL解析等,会造成频繁对插件核心代码的修改。因此,在shenyu-plugin模组中,将routing参数解析做了更高一层的抽象,并按照SPI机制做了规则解析的实现。解析由三个部分组成:

  • ParameterData-参数资料,

  • PredictJudge-断言

  • MatchStrategy-匹配策略三个SPI实现。

    这些扩展类定义在 shenyu-plugin-base module中,经过这样抽象后,每个插件实现中,routing 参数解析的功能全部由AbstractShenyuPlugin 来调用上述三个SPI工厂类来定义和实现。做到了功能的专一,并易于扩展,符合SOLID原则。

本节就其中的PredictJudge-断言做详细解析。可以看到这个module中的pom文件中,添加了对shenyu-SPI的依赖

<dependency>    <groupId>org.apache.shenyu</groupId>    <artifactId>shenyu-spi</artifactId>    <version>${project.version}</version></dependency>

PredicateJudge SPI 设计#

PredicateJudge SPI 实现用来解析判断各类规则,当网关中配置的。这个类命名和功能都类似于java 的Predicate ,但对接受行为做了更进一步的抽象。这个SPI通过一个工厂和策略模式实现,首先来看PredicateJudge SPI接口的定义:

@SPI@FunctionalInterfacepublic interface PredicateJudge {
    /**     * judge conditionData and realData is match.     *     * @param conditionData {@linkplain ConditionData}     * @param realData       realData     * @return true is pass  false is not pass.     */    Boolean judge(ConditionData conditionData, String realData);}

这部分的类图如下:

Fig 2-Predicate class diagram

predicate-class-diagram

PredicateJudgeFactory的重要方法如下:

    public static PredicateJudge newInstance(final String operator) {        return ExtensionLoader.getExtensionLoader(PredicateJudge.class).getJoin(processSpecialOperator(operator));    }
    public static Boolean judge(final ConditionData conditionData, final String realData) {        if (Objects.isNull(conditionData) || StringUtils.isBlank(realData)) {            return false;        }        return newInstance(conditionData.getOperator()).judge(conditionData, realData);    }

这里ConditionData定义如下包含属性四个String类型的属性: paramType, operator,paramName,paramValue

ParamTypeEnum#

参数 paramType必须为系统中枚举类型 ParamTypeEnum,默认支持的paramType有:

post, uri,query, host, ip,header, cookie,req_method

OperatorEnum#

operator 必须为枚举类型 OperatorEnum ,目前支持的操作符有:(注意,严格区分大小写)

   match, =,regex, >,<, contains, SpEL,  Groovy, TimeBefore,TimeAfter

基于以上的规则, plugin 模组实现了如下8个 PredicateJudge 实现类,分别实现上述operator的逻辑匹配规则.

Implementation classRule denotes 规则说明corespondece operator
ContainsPredicateJudge包含关系 "contains", 实际结果,需要包含所定规则的值contains
EqualsPredicateJudge相等"=",=
MatchPredicateJudge用于URI 路径匹配的处理match
TimerAfterPredicateJudge当前local时间是否晚于设定的时间TimeBefore
TimerBeforePredicateJudge当前local时间是否早于设定的时间TimeAfter
GroovyPredicateJudgeGroovy,设定ParamName的值,与设定ParamValue相同Groovy
RegexPredicateJudge正则表达式匹配资料regex

调用方法#

当要做一组参数的解析时,只需要调用PredicateJudgeFactory的judge方法即可:

PredicateJudgeFactory.judge(final ConditionData conditionData, final String realData);

SPI配置文件#

这些PredicateJudge实现类在 SHENYU_DIRECTORY 中的config文件中做了配置,在启动时会加加载并cache到内存中。

PredicateJudge文件的内容如下,为key=class形式,左边的operator要和ParamEnum中的定义一致。

equals=org.apache.shenyu.plugin.base.condition.judge.EqualsPredicateJudge
contains=org.apache.shenyu.plugin.base.condition.judge.ContainsPredicateJudgeGroovy=org.apache.shenyu.plugin.base.condition.judge.GroovyPredicateJudgematch=org.apache.shenyu.plugin.base.condition.judge.MatchPredicateJudgeregex=org.apache.shenyu.plugin.base.condition.judge.RegexPredicateJudgeSpEL=org.apache.shenyu.plugin.base.condition.judge.SpELPredicateJudgeTimeAfter=org.apache.shenyu.plugin.base.condition.judge.TimerAfterPredicateJudgeTimeBefore=org.apache.shenyu.plugin.base.condition.judge.TimerBeforePredicateJudge

PredicateJudge SPI在网关Plugin中的使用#

网关系统中,大部分的Plugin 都继承自AbstractShenyuPlugin,这个抽象类中,在做选择和规则解析时,调用了上述SPI中的MatchStrategy,继而在策略判断时调用PredicateJudge 的各个断言类来处理。

Plugin与SPI 的类图如下:

Fig 3- class diagram of plugins with PredicateJudge and MatchStrategy SPI

plugin-SPI-class-diagram

从客户端发来的请求,在系统中调用规则部分的SPI的流程如下:

Fig 4- flow chart for Shenyu gateway filter with parameter processing

SPI-flow-diagram

  • 系统启动时,会加载目录下配置的SPI资料到内存中
  • 当client有新的请求发到Apache shenyu 网关系统时,在网关内部,会调用对应的plugin
  • 对实际请求资料做规则匹配时,会根据所包含的operator,调用的对应的PredicateJudge实现类

其他#

PredicateJudge 判断结果举例#

ContainsPredicateJudge- " contains“ rule#

举例:给定一组参数(ConditionData ), paramType="uri", paramValue 是 "/http/**"

当应用 ContainsPredicateJudge包含关系时,判断结果如下表:

ConditionData (operator="contains")real datajudge result
paramType="uri", "/http/**""/http/**/test"true
"/test/http/**/other"true
"/http1/**"false

其他的几个PredicateJudge的具体功能可参考其代码和测试类.

RateLimiter SPI 代码分析

· One min read
Apache ShenYu Contributor

限流是网关必备的功能,用来应对高并发请求的场景。当系统受到异常攻击,短期内聚集了大量的流量;当有大量低级别的请求,处理这些请求会影响关键业务的处理,需要限制这些请求的访问速度; 或者系统内部出现一些异常,不能满负荷的服务整个应用请求等等。这些情况下,都需要启用限流来保护系统。可以拒绝服务、等待或降级处理,将流量限制到系统可接受的量,或者只允许某些域名(或某些业务)的请求优先处理。

针对以上的场景需求,在设计一个API网关的限流功能时,就需要考虑如下的扩展点:

  1. 可以支持多种限流的算法,并易于扩展。

  2. 要可以支持多种限流的方式,能区分用户群、高低优先级的请求。

  3. 要支持高并发,能快速的做出限制或通过的决策。

  4. 要有容错处理,如果限流程序出错,网关系统能继续执行。

    本文会先介绍shenyu网关限流部分的总体技术架构,之后重点分析RateLimiter SPI扩展实现的代码。

This article based on shenyu-2.4.0 version of the source code analysis.

RateLimiter 总体设计说明#

​ WebFlux是Spring 提供的基于Reactor模型的异步非阻塞框架,能提升吞吐量,使系统有更好的可伸缩性。Apache Shenyu网关的插件功能基于WebFlux框架实现的。RateLimiter功能是在ratelimiter-plugin中实现。在限流过程中,常用的算法有令牌桶、漏桶等算法,这些算法执行中,需要检核请求的来源,对已使用的流量做计数及逻辑计算,判定是否允许通过。为了提高并发及性能, 将计数和算法逻辑处理,都放到redis中。Java代码负责做数据参数的传递。在调用redis时,lua脚本可以常驻在redis内存中,能减少网络开销,并可以作为一个整体执行,具有原子性。Spring Data Redis 提供了对redis命令执行的抽象,执行序列化,及自动使用redis 脚本缓存。在这个plugin中,由于采用了reactor 非阻塞框架,所以采用Spring Redis Reactive类库实现对redis的功能调用。

​ 这个plugin中的类包图如下,重点标出了与RateLimiter SPI相关的两个package: resolver 和algorithm.

ratelimiter-package-diagram

RateLimiter SPI的设计#

由于采用了Spring data+ Redis +lua架构实现了高并发的需求。 如何做到对算法和限流方式的扩展呢? Shenyu ratelimiter plugin中设计了两个SPI来实现这两个需求:

  • RateLimiterAlgorithm:用来扩展不同的限流算法。
  • RateLimiterKeyResolver: 用于扩展获取请求的关键信息,用于区分流量,例如按IP 地址、按某一段域名等来区分访问的请求。

SPI的具体实作类与配置信息位于:SHENYU_DIRECTORY目录下 (默认在/META-INF/shenyu)下。

RateLimiterKeyResolver#

获取请求的关键信息,用于分组限流,例如按URL/ 用户 / IP 等, RateLimiterKeyResolver 接口定义如下:

@SPIpublic interface RateLimiterKeyResolver {
    /**     * get Key resolver's name.     *     * @return Key resolver's name     */    String getKeyResolverName();
    /**     * resolve.     *     * @param exchange exchange the current server exchange {@linkplain ServerWebExchange}     * @return rate limiter key     */    String resolve(ServerWebExchange exchange);}

@SPI将当前interface 注册为Shenyu SPI 接口。resolve(ServerWebExchange exchange)方法用来提供解析方式。

RateLimiterKeyResolver SPI 提供了两种key resolver, WholeKeyResolveRemoteAddrKeyResolver,其中RemoteAddrKeyResolver中的resolve方法代码如下:

    @Override    public String resolve(final ServerWebExchange exchange) {        return Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();    }

其key值为请求的IP地址。 基于SPI及工厂类的实现,可以非常方便的扩展实现新的key resolver,如URL,用户等等。

RateLimiterAlgorithm SPI#

RateLimiterAlgorithm SPI 用来实现对不同限流算法的识别、加载和定义,其类图如下:

ratelimiteral-class-diagram

本模组使用了工厂模式,提供了接口类、抽象类和工厂类,提供了4个实现类,其中实现类对应的Lua脚本在 RateLimitEnum 中做了定义,放置在 /META-INF/scripts 目录下。接口RateLimiterAlgorithm的代码如下:

@SPIpublic interface RateLimiterAlgorithm<T> {        RedisScript<T> getScript();    List<String> getKeys(String id);        /**     * Callback string.     *     * @param script the script     * @param keys the keys     * @param scriptArgs the script args     */    default void callback(final RedisScript<?> script, final List<String> keys, final List<String> scriptArgs) {    }}

@SPI 将这个接口注册为shenyu SPI, 其中定义了三个方法:

  • getScript() 方法返回一个 RedisScript对象,这个对象将传递给Redis。
  • getKeys(String id) 返回一个键值的List.
  • callback()回调函数用于异步处理一些需要在返回后做的处理,缺省是空方法。

抽象类 AbstractRateLimiterAlgorithm#

在这个类中,实现了接口的模板方法,使用参数类型为List<Long>, 抽象方法getScriptName() 和getKeyName() 留给各个实作类来实现。如下的getScript() 是这个类中读取lua脚本的处理代码。

    public RedisScript<List<Long>> getScript() {        if (!this.initialized.get()) {            DefaultRedisScript redisScript = new DefaultRedisScript<>();            String scriptPath = "/META-INF/scripts/" + getScriptName();            redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(scriptPath)));            redisScript.setResultType(List.class);            this.script = redisScript;            initialized.compareAndSet(false, true);            return redisScript;        }        return script;    }

AtomicBoolean类型的变量initialized 用来标记lua脚本是否有被加载。 如果还没有加载,就从/META-INF/scripts/目录下,读取scriptName指定的Lua文件,加载成RedisScript对象。指定结果为List类型, 设定量initialized为true,避免重复加载。 返回 RedisScript对象。

AbstractRateLimiterAlgorithmgetKeys()的代码如下,

    @Override    public List<String> getKeys(final String id) {        String prefix = getKeyName() + ".{" + id;        String tokenKey = prefix + "}.tokens";        String timestampKey = prefix + "}.timestamp";        return Arrays.asList(tokenKey, timestampKey);    }

这个模板方法中,产生了两个字符串,其中,tokenKey会作为Key传递给redis, 指向一个有序集合。 timestampKey是一个以传入id 为识别的字符串。

可以从上面的类图中看到,ConcurrentRateLimiterAlgorithmSlidingWindowRateLimiterAlgorithm 有覆写getKeys(String id)方法,而两外两个算法程序,则采用的是抽象类中的实现。也只有 ConcurrentRateLimiterAlgorithm 重写了callback()方法。下文中我们会对此做进一步的分析。

工厂类RateLimiterAlgorithmFactory#

RateLimiterAlgorithmFactory 中依据算法名称,获取RateLimiterAlgorithm实例的方法代码如下:

public static RateLimiterAlgorithm<?> newInstance(final String name) {    return Optional.ofNullable(ExtensionLoader.getExtensionLoader(RateLimiterAlgorithm.class).getJoin(name)).orElse(new TokenBucketRateLimiterAlgorithm());}

按照Apache shenyu SPI的规则,由加载器ExtensionLoader获得实作类,当找不到算法时,默认返回令牌桶算法实现类。

与Redis做资料交互#

从上面代码我们了解到Apache Shenyu网关中,RateLimiter SPI 的基本扩展点,在Shenyu网关运行中,应用ReactiveRedisTemplate 来异步执行对redis的调用处理。实现代码在RedisRateLimiter类的isAllowed()方法中,其部分代码如下:

    public Mono<RateLimiterResponse> isAllowed(final String id, final RateLimiterHandle limiterHandle) {        // get parameters that will pass to redis from RateLimiterHandle Object        double replenishRate = limiterHandle.getReplenishRate();        double burstCapacity = limiterHandle.getBurstCapacity();        double requestCount = limiterHandle.getRequestCount();        // get the current used RateLimiterAlgorithm        RateLimiterAlgorithm<?> rateLimiterAlgorithm = RateLimiterAlgorithmFactory.newInstance(limiterHandle.getAlgorithmName());                ........        Flux<List<Long>> resultFlux = Singleton.INST.get(ReactiveRedisTemplate.class).execute(script, keys, scriptArgs);        return resultFlux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))                .reduce(new ArrayList<Long>(), (longs, l) -> {                    longs.addAll(l);                    return longs;                }).map(results -> {                    boolean allowed = results.get(0) == 1L;                    Long tokensLeft = results.get(1);                    return new RateLimiterResponse(allowed, tokensLeft);                })                .doOnError(throwable -> log.error("Error occurred while judging if user is allowed by RedisRateLimiter:{}", throwable.getMessage()))                .doFinally(signalType -> rateLimiterAlgorithm.callback(script, keys, scriptArgs));    }

POJO 对象RateLimiterHandle 中,定义了限流所需的属性算法名称, 速录,容量,请求的数量 。首先 从limiterHandle 包装类中取得需要传入redis的几个参数。之后从RateLimiterAlgorithmFactory 从工厂类取得当前配置的限流算法。 之后做Key值和参数的传递。

为了更方便阅读,下图给出了java代码与redis执行参数输入、输出的传递过程。左边是isAllowed() 函数的后半部分代码,右边是一个Lua脚本的输入输出代码。

下面说明Java代码的执行过程:

  1. getKeys()方法获得两个键值List<String>. 其中第一个Key会映射为Redis中的有序集合。

  2. 设定4个参数:速率 replenishRate ,容量 burstCapacity, 时间戳, 返回当前java 纪元秒数(长整数)EpochSecond, 请求的数量 requestcount.

  3. 按所设定的脚本、Key值、参数调用ReactiveRedisTemplate功能,执行redis处理。返回参数是Flux<List<Long>>类型

  4. 通过reduce方法将其返回值从Flux<ArrayList<Long>> 类型转换为Mono<ArrayList<Long>>,再经过map方法,转换为Mono<RateLimiterResponse>返回。

    返回结果有两个资料,allowed =1, 代表允许通过,0-不通过;而第二个返回参数tokensLeft,是可用的剩余请求数量。

5.容错性方面,由于使用的是reactor 的非阻塞通讯模型,当发生错误时,会执行onErrorResume()语句,Flux.just产生返回资料, 默认为allowed=1, 代表允许通过, 并丢出错误日志。

6.之后执行doFinally()方法,执行算法实现类的callback方法。

io-with-lua

4种限流算法#

上面我们了解了网关中如何通过Java代码如何与Redis 做通讯,这一节我们通过简要分析网关中提供的4种限流算法中的一些代码,来理解如何开发使用RateLimiter SPI的接口方法,并与Redis有效协作。

Ratelimiter SPI目前提供了4种限流算法:

Algorithm nameJava classLua script file
Request rate limiterTokenBucketRateLimiterAlgorithmrequest_rate_limiter.lua
Slide window rate limiterSlidingWindowRateLimiterAlgorithmliding_window_request_rate_limiter.lua
Concurrent rate limiterConcurrentRateLimiterAlgorithmconcurrent_request_rate_limiter.lua
Leaky bucket algorithmLeakyBucketRateLimiterAlgorithmrequest_leaky_rate_limiter.lua
  1. 令牌桶限流:按请求数量限流,设置每秒N个请求,超过N的请求会拒绝服务。算法实现时,以时间间隔计算匀速产生令牌的数量。若每次请求的数量,小于桶内令牌的数量,则允许通过。 时间窗口为 2*容积/速率。
  2. 滑动窗口限流:与令牌桶限流不同在于,其窗口大小比令牌桶的窗口小,为一个容积/速率。并且每次移动向后一个时间时间窗口。其他限流原理与令牌桶类似。
  3. 并发的请求速率限流:严格限制并发访问量为N个请求,大于N的请求会被拒绝。每次当有新请求,查看计数是否大于N, 若小于N则允许通过,计数加1。 当这个请求调用结束时,会释放这个信号(计数减1)
  4. 漏桶算法: 相对于令牌桶算法,漏桶算法有助于减少流量聚集,实现更为平滑的限流处理。 漏桶算法强制以常数N的速率输出流量,其以漏桶为模型,可漏水的量为时间间隔 *速率。若可漏水量>已使用量,则已使用量设为0( 清空漏桶),否则已使用量要减去可漏水量。 若请求数量+ 已使用量< 总容量,则允许请求通过。

下面以 并行限流算法为例,解读Lua和Java代码,查看callback 方法的使用。 通过解读令牌桶和滑动窗口算法代码,了解getKey()方法的使用。

并发请求数限流中使用callback方法#

首先ConcurrentRateLimiterAlgorithmgetKeys() 方法覆写了抽象类中的模板方法:

    @Override    public List<String> getKeys(final String id) {        String tokenKey = getKeyName() + ".{" + id + "}.tokens";        String requestKey = UUIDUtils.getInstance().generateShortUuid();        return Arrays.asList(tokenKey, requestKey);    }

第二个元素 requestKey 是一个long型不重复值(由一个分布式ID产生器产生的,递增,比当前时间EpochSecond小), 相应的concurrent_request_rate_limiter.lua的代码:

local key = KEYS[1]
local capacity = tonumber(ARGV[2])local timestamp = tonumber(ARGV[3])local id = KEYS[2]

这里id 即是取得上面的getKeys()方法产生的requestKey, 一个uuid. 后续的处理如下:

local count = redis.call("zcard", key)local allowed = 0
if count < capacity then  redis.call("zadd", key, timestamp, id)  allowed = 1  count = count + 1endreturn { allowed, count }

先用zcard命令统计redis中key值所对应的有序集合中的元素个数,若元素总数count小于容量,则允许通过,并用zadd key score member方法,向key所在的有序集合中,添加一个元素id, 其score为timestamp. 则此时元素的总个数count实际为count+1.

以上的代码都是在redis中作为一个原子操作来执行的。当同一个key (例如Ip下)有大量并发请求时,redis记录的该ip的有序集合的数量count也在不断累加中。当超过容量限制,则会拒绝服务。

并发请求数限流算法中,要求当请求调用结束时,要释放这个信号量,lua代码中并没有做这个处理。

我们来看看 ConcurrentRateLimiterAlgorithm类中的回调函数:

    @Override    @SuppressWarnings("unchecked")    public void callback(final RedisScript<?> script, final List<String> keys, final List<String> scriptArgs) {        Singleton.INST.get(ReactiveRedisTemplate.class).opsForZSet().remove(keys.get(0), keys.get(1)).subscribe();    }

这里做了一个异步的订阅处理,通过ReactiveRedisTemplate删除redis中(key, id)的元素,等待调用结束后,释放这个信号。这个remove的处理不能放到lua脚本中执行,否则逻辑就是错误的。这也正是RateLimiterAlgorithm SPI 设计callback方法的用意。

令牌桶算法中使用getKeys()#

对应的Lua 代码如下:

local tokens_key = KEYS[1]local timestamp_key = KEYS[2]

省略获取参数的代码

local fill_time = capacity/ratelocal ttl = math.floor(fill_time*2)

时间窗口ttl 大概是 2* 容量/速率.

local last_tokens = tonumber(redis.call("get", tokens_key))if last_tokens == nil then  last_tokens = capacityend

从有序集合中取得上次使用的token,如果没有则last_tokens = 容量。

local last_refreshed = tonumber(redis.call("get", timestamp_key))if last_refreshed == nil then  last_refreshed = 0end

以timestamp_key为key,从有序集合中取得上次刷新时间,默认为0.

local delta = math.max(0, now-last_refreshed)local filled_tokens = math.min(capacity, last_tokens+(delta*rate))local allowed = filled_tokens >= requestedlocal allowed_num = 0if allowed then  new_tokens = filled_tokens - requested  allowed_num = 1end

时间间隔*速率匀速产生令牌,若令牌数量>请求数量,则allowed=1, 并且更新令牌数量。

redis.call("setex", tokens_key, ttl, new_tokens)redis.call("setex", timestamp_key, ttl, now)
return { allowed_num, new_tokens }

这里now是传入的当前时间(EpochSecond),设置tokens_key所对应的有序集合的值为 new_tokens(即新令牌数量) , 过期时间为ttl。 更新集合中,timestamp_key的值为当前时间,过期时间为ttl.

滑动窗口算法中使用getKeys方法#

SlidingWindowRateLimiterAlgorithmgetKeys()同样覆写了父类,代码与ConcurrentRateLimiterAlgorithm 方法代码一致。

如下为滑动窗口算法的Lua代码,省略了其他参数的接收代码。

local timestamp_key = KEYS[2]...... local window_size = tonumber(capacity / rate)local window_time = 1

设定窗口大小为容积/速率。

local last_requested = 0local exists_key = redis.call('exists', tokens_key)if (exists_key == 1) then    last_requested = redis.call('zcard', tokens_key)end

获取当前key 的基数

local remain_request = capacity - last_requestedlocal allowed_num = 0if (last_requested < capacity) then    allowed_num = 1    redis.call('zadd', tokens_key, now, timestamp_key)end

计算剩余可用量 = 容量 减去已使用量,若last_requested < capacity ,则允许通过,并且在tokens_key为key的有序集合中,增加一个 元素(key =timestam_key,value= now)

redis.call('zremrangebyscore', tokens_key, 0, now - window_size / window_time)redis.call('expire', tokens_key, window_size)
return { allowed_num, remain_request }

前面已经设定window_time=1, 用Redis的 zremrangebyscore命令,移除有序集合中,score为[0- 当前时间-窗口大小]的元素,即移动一个窗口大小。设定tokens_key的过期时间为窗口大小。

AbstractRateLimiterAlgorithm的模板方法中,getKeys(final String id) 给出的第二个值(以secondKey指代),是拼接了{id} (即resolve key)的一个固定字符串。从上面三个算法代码可以看到,在令牌桶算法中,secondKey在Lua代码执行中会更新为最新的时间,所以无所谓传入的值。而在并发限流算法中,会以此secondKey为条件,在java callback方法中移除对应的元素。而在滑动窗口算法中,这个secondKey的值,会作为一个新元素的key, 增加到当前有序集合中,并在做窗口滑动中,过期的资料会被删除掉。

总之,当设计新的限流算法时,要根据算法需要仔细设计getKey()方法。

如何调用 RateLimiter SPI#

RateLimiter Plug中的doExecute()方法中,传入的三个参数 exchange 为请求的连接, chain 为shenyu插件的调用链,selector 是选择器,rule是系统中配置的规则参数资料。

protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {    RateLimiterHandle limiterHandle = RatelimiterRuleHandleCache.getInstance()        .obtainHandle(CacheKeyUtils.INST.getKey(rule));    String resolverKey = Optional.ofNullable(limiterHandle.getKeyResolverName())        .flatMap(name -> Optional.of("-" + RateLimiterKeyResolverFactory.newInstance(name).resolve(exchange)))        .orElse("");    return redisRateLimiter.isAllowed(rule.getId() + resolverKey, limiterHandle)        .flatMap(response -> {            if (!response.isAllowed()) {                exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);                Object error = ShenyuResultWrap.error(ShenyuResultEnum.TOO_MANY_REQUESTS.getCode(), ShenyuResultEnum.TOO_MANY_REQUESTS.getMsg(), null);                return WebFluxResultUtils.result(exchange, error);            }            return chain.execute(exchange);        });}

1.首先,从缓存中,取得系统设定的限流参数RateLimiterHandle实例 limiterHandle. 2.根据name指定的Resolver 获得请求的连接Key信息(如地址等). 3.调用 RedisRateLimiter的 isAllowed方法, 获取返回值后, 4.若isAllowd=false,做错误处理 5.如果 isAllowed=true,return chain.execute(exchange), 对该请求做后续处理,传递到调用链的下一关。

Summary#

整个RateLimiter plugin框架基于Spring WebFlux开发,用redis 和lua脚本做限流计数及核心逻辑处理,支持高并发及弹性扩展。

  1. RateLimiter SPI 提供了两个SPI 接口,通过应用面向接口设计及各种设计模式,可以方便的增加新的限流算法,以及各种流量解析规则。

  2. 提供了令牌桶、并发速率限流、滑动窗口、漏桶4种限流算法。在设计算法实现时,需要根据算法特征设计KEY值,用Lua脚本实现在redis中要处理的逻辑,设计callback()方法做后续的数据处理。

  3. 响应式编程,实现过程简洁高效。

Apache ShenYu 启动示例

· One min read
Kunshuai Zhu
Apache ShenYu Contributor

环境准备#

  • 本地正确安装JDK1.8+
  • 本地正确安装Git
  • 本地正确安装Maven
  • 选择一款开发工具,比如IDEA

拉取ShenYu代码#

使用Git拉取代码

> git clone https://github.com/apache/incubator-shenyu.git

编译代码#

使用Maven进行编译

> cd incubator-shenyu> mvn clean install -Dmaven.javadoc.skip=true -B -Drat.skip=true -Djacoco.skip=true -DskipITs -DskipTests

启动网关服务#

使用开发工具,以IDEA为例。

启动 shenyu-admin 控制台(默认使用H2数据库)

start-demo-admin

启动 shenyu-bootstrap

start-demo-bootstrap

到这一步,shenyu网关已经启动。

我们可以打开浏览器,访问admin控制台:http://localhost:9095/

启动应用服务#

Apache ShenYu提供了Http、Dubbo、SpringCloud等应用接入shenyu网关的样例,位于 shenyu-example 模块,这里以Http服务为例。

shenyu-example 未被IDEA标记为Maven项目,可以右键点击 shenyu-example 目录下的 pom.xml 文件,将其添加为Maven项目。

start-demo-maven

启动 shenyu-examples-http

start-demo-examples-http

这时,shenyu-examples-http 会自动把加 @ShenyuSpringMvcClient 注解的接口方法,以及application.yml中的相关配置注册到网关。我们打开admin控制台,即可在divide、context_path中看到相关配置。

测试Http请求#

下面使用postman模拟http的方式来请求你的http服务:

start-demo-post-http

使用更多插件#

我们可以参考 官方文档,来使用其他的插件。

这里以使用 param-mapping 插件为例。

BasicConfig -> Plugin 编辑 param-mapping 插件,设置 status

start-demo-plugin

PluginList -> http process 配置选择器和规则。

start-demo-selector

start-demo-rules

然后使用 postman/http/test/payment 发起http请求。

start-demo-post-param-mapping

WebSocket数据同步源码分析

· One min read
Apache ShenYu Committer

ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu 网关当前支持ZooKeeperWebSocketHttp长轮询NacosetcdConsul 进行数据同步。本文的主要内容是基于WebSocket的数据同步源码分析。

本文基于shenyu-2.4.0版本进行源码分析,官网的介绍请参考 数据同步原理

1. 关于WebSocket通信#

WebSocket协议诞生于2008年,在2011年成为国际标准。它可以双向通信,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息。WebSocket协议建立在 TCP 协议之上,属于应用层,性能开销小,通信高效,协议标识符是ws

2. Admin数据同步#

我们从一个实际案例进行源码追踪,比如在后台管理系统中,新增一条选择器数据:

2.1 接收数据#

  • SelectorController.createSelector()

进入SelectorController类中的createSelector()方法,它负责数据的校验,添加或更新数据,返回结果信息。

@Validated@RequiredArgsConstructor@RestController@RequestMapping("/selector")public class SelectorController {        @PostMapping("")    public ShenyuAdminResult createSelector(@Valid @RequestBody final SelectorDTO selectorDTO) { // @Valid 数校验        // 添加或更新数据        Integer createCount = selectorService.createOrUpdate(selectorDTO);        // 返回结果信息        return ShenyuAdminResult.success(ShenyuResultMessage.CREATE_SUCCESS, createCount);    }        // ......}

2.2 处理数据#

  • SelectorServiceImpl.createOrUpdate()

SelectorServiceImpl类中通过createOrUpdate()方法完成数据的转换,保存到数据库,发布事件,更新upstream

@RequiredArgsConstructor@Servicepublic class SelectorServiceImpl implements SelectorService {    // 负责事件发布的eventPublisher    private final ApplicationEventPublisher eventPublisher;        @Override    @Transactional(rollbackFor = Exception.class)    public int createOrUpdate(final SelectorDTO selectorDTO) {        int selectorCount;        // 构建数据 DTO --> DO        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);        List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();        // 判断是添加还是更新        if (StringUtils.isEmpty(selectorDTO.getId())) {            // 插入选择器数据            selectorCount = selectorMapper.insertSelective(selectorDO);            // 插入选择器中的条件数据            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));            });            // check selector add            // 权限检查            if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {                DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();                dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());                dataPermissionDTO.setDataId(selectorDO.getId());                dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);                dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));            }
        } else {            // 更新数据,先删除再新增            selectorCount = selectorMapper.updateSelective(selectorDO);            //delete rule condition then add            selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));            selectorConditionDTOs.forEach(selectorConditionDTO -> {                selectorConditionDTO.setSelectorId(selectorDO.getId());                SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);                selectorConditionMapper.insertSelective(selectorConditionDO);            });        }        // 发布事件        publishEvent(selectorDO, selectorConditionDTOs);
        // 更新upstream        updateDivideUpstream(selectorDO);        return selectorCount;    }            // ......    }

Service类完成数据的持久化操作,即保存数据到数据库,这个大家应该很熟悉了,就不展开。关于更新upstream操作,放到后面对应的章节中进行分析,重点关注发布事件的操作,它会进行数据同步。

publishEvent()方法的逻辑是:找到选择器对应的插件,构建条件数据,发布变更数据。

       private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {        // 找到选择器对应的插件        PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());        // 构建条件数据        List<ConditionData> conditionDataList =                selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());        // 发布变更数据        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,                Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));    }

发布变更数据通过eventPublisher.publishEvent()完成,这个eventPublisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。

关于ApplicationEventPublisher

当有状态发生变化时,发布者调用 ApplicationEventPublisherpublishEvent 方法发布一个事件,Spring容器广播事件给所有观察者,调用观察者的 onApplicationEvent 方法把事件对象传递给观察者。调用 publishEvent方法有两种途径,一种是实现接口由容器注入 ApplicationEventPublisher 对象然后调用其方法,另一种是直接调用容器的方法,两种方法发布事件没有太大区别。

  • ApplicationEventPublisher:发布事件;
  • ApplicationEventSpring 事件,记录事件源、时间和数据;
  • ApplicationListener:事件监听者,观察者。

Spring的事件发布机制中,有三个对象,

一个是发布事件的ApplicationEventPublisher,在ShenYu中通过构造器注入了一个eventPublisher

另一个对象是ApplicationEvent,在ShenYu中通过DataChangedEvent继承了它,表示事件对象。

public class DataChangedEvent extends ApplicationEvent {//......}

最后一个是 ApplicationListener,在ShenYu中通过DataChangedEventDispatcher类实现了该接口,作为事件的监听者,负责处理事件对象。

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
    //......    }

2.3 分发数据#

  • DataChangedEventDispatcher.onApplicationEvent()

当事件发布完成后,会自动进入到DataChangedEventDispatcher类中的onApplicationEvent()方法,进行事件处理。

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
  /**     * 有数据变更时,调用此方法     * @param event     */    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // 遍历数据变更监听器(一般使用一种数据同步的方式就好了)        for (DataChangedListener listener : listeners) {            // 哪种数据发生变更            switch (event.getGroupKey()) {                case APP_AUTH: // 认证信息                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());                    break;                case PLUGIN:  // 插件信息                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());                    break;                case RULE:    // 规则信息                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());                    break;                case SELECTOR:   // 选择器信息                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());                    break;                case META_DATA:  // 元数据                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());                    break;                default:  // 其他类型,抛出异常                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());            }        }    }    }

当有数据变更时,调用onApplicationEvent方法,然后遍历所有数据变更监听器,判断是哪种数据类型,交给相应的数据监听器进行处理。

ShenYu将所有数据进行了分组,一共是五种:认证信息、插件信息、规则信息、选择器信息和元数据。

这里的数据变更监听器(DataChangedListener),就是数据同步策略的抽象,它的具体实现有:

这几个实现类就是当前ShenYu支持的同步策略:

  • WebsocketDataChangedListener:基于websocket的数据同步;
  • ZookeeperDataChangedListener:基于zookeeper的数据同步;
  • ConsulDataChangedListener:基于consul的数据同步;
  • EtcdDataDataChangedListener:基于etcd的数据同步;
  • HttpLongPollingDataChangedListener:基于http长轮询的数据同步;
  • NacosDataChangedListener:基于nacos的数据同步;

既然有这么多种实现策略,那么如何确定使用哪一种呢?

因为本文是基于websocket的数据同步源码分析,所以这里以WebsocketDataChangedListener为例,分析它是如何被加载并实现的。

通过在源码工程中进行全局搜索,可以看到,它的实现是在DataSyncConfiguration类完成的。

/** * 数据同步配置类 * 通过springboot条件装配实现 * The type Data sync configuration. */@Configurationpublic class DataSyncConfiguration {     /**     * websocket数据同步(默认策略)     * The WebsocketListener(default strategy).     */    @Configuration    @ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)    @EnableConfigurationProperties(WebsocketSyncProperties.class)    static class WebsocketListener {
        /**         * Config event listener data changed listener.         * 配置websocket数据变更监听器         * @return the data changed listener         */        @Bean        @ConditionalOnMissingBean(WebsocketDataChangedListener.class)        public DataChangedListener websocketDataChangedListener() {            return new WebsocketDataChangedListener();        }
        /**         * Websocket collector.         * Websocket处理类:建立连接,发送消息,关闭连接等操作         * @return the websocket collector         */        @Bean        @ConditionalOnMissingBean(WebsocketCollector.class)        public WebsocketCollector websocketCollector() {            return new WebsocketCollector();        }
        /**         * Server endpoint exporter          *         * @return the server endpoint exporter         */        @Bean        @ConditionalOnMissingBean(ServerEndpointExporter.class)        public ServerEndpointExporter serverEndpointExporter() {            return new ServerEndpointExporter();        }    }        //......}

这个配置类是通过SpringBoot条件装配类实现的。在WebsocketListener类上面有几个注解:

  • @Configuration:配置文件,应用上下文;

  • @ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true):属性条件判断,满足条件,该配置类才会生效。也就是说,当我们有如下配置时,就会采用websocket进行数据同步。不过,这里需要注意下matchIfMissing = true这个属性,它表示,如果你没有如下的配置,该配置类也会生效。基于websocket的数据同步时官方推荐的方式,也是默认采用的方式。

    shenyu:    sync:    websocket:      enabled: true
  • @EnableConfigurationProperties:启用配置属性;

当我们主动配置,采用websocket进行数据同步时,WebsocketDataChangedListener就会生成。所以在事件处理方法onApplicationEvent()中,就会到相应的listener中。在我们的案例中,是新增加了一条选择器数据,数据通过采用的是websocket,所以,代码会进入到WebsocketDataChangedListener进行选择器数据变更处理。

    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        // 遍历数据变更监听器(一般使用一种数据同步的方式就好了)        for (DataChangedListener listener : listeners) {            // 哪种数据发生变更            switch (event.getGroupKey()) {                                    // 省略了其他逻辑                                    case SELECTOR:   // 选择器信息                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());   // WebsocketDataChangedListener进行选择器数据变更处理                    break;         }    }

2.4 Websocket数据变更监听器#

  • WebsocketDataChangedListener.onSelectorChanged()

    onSelectorChanged()方法中,将数据进行了封装,转成WebsocketData,然后通过WebsocketCollector.send()发送数据。

    // 选择器数据有更新    @Override    public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {        // 构造 WebsocketData 数据        WebsocketData<SelectorData> websocketData =                new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);        // 通过websocket发送数据        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);    }

2.5 Websocket发送数据#

  • WebsocketCollector.send()

send()方法中,判断了一下同步的类型,根据不同的类型,进行处理。

@Slf4j@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)public class WebsocketCollector {    /**     * Send.     *     * @param message the message     * @param type    the type     */    public static void send(final String message, final DataEventTypeEnum type) {        if (StringUtils.isNotBlank(message)) {            // 如果是MYSELF(第一次的全量同步)            if (DataEventTypeEnum.MYSELF == type) {                // 从threadlocal中获取session                Session session = (Session) ThreadLocalUtil.get(SESSION_KEY);                if (session != null) {                    // 向该session发送全量数据                    sendMessageBySession(session, message);                }            } else {                // 后续的增量同步                // 向所有的session中同步变更数据                SESSION_SET.forEach(session -> sendMessageBySession(session, message));            }        }    }
    private static void sendMessageBySession(final Session session, final String message) {        try {            // 通过websocket的session把消息发送出去            session.getBasicRemote().sendText(message);        } catch (IOException e) {            log.error("websocket send result is exception: ", e);        }    }}

我们给的案例是一个新增操作 ,是一个增量同步,所以会走

SESSION_SET.forEach(session -> sendMessageBySession(session, message));

这个逻辑。

再通过

session.getBasicRemote().sendText(message);

将数据发送了出去。

至此,当admin端发生数据变更时,就将变更的数据以增量形式通过WebSocket发给了网关。

分析到这里,不知道大家有没有疑问呢?比如session是怎么来的?网关如何和admin建立连接的?

不要着急,我们接下来就进行网关端的同步分析。

不过,在继续源码分析前,我们用一张图将上面的分析过程串联起来。

3. 网关数据同步#

假设ShenYu网关已经在正常运行了,使用的数据同步方式也是websocket。那么当在admin端新增一条选择器数据后,并且通过WebSocket发送到网关,那网关是如何接收并处理数据的呢?接下来我们就继续进行源码分析,一探究竟。

3.1 WebsocketClient接收数据#

  • ShenyuWebsocketClient.onMessage()

在网关端有一个ShenyuWebsocketClient类,它继承了WebSocketClient,可以和WebSocket建立连接并通信。

public final class ShenyuWebsocketClient extends WebSocketClient {  // ......}

当在admin端通过websocket发送数据后,ShenyuWebsocketClient就可以通过onMessage()接收到数据,然后就可以自己进行处理。

public final class ShenyuWebsocketClient extends WebSocketClient {      // 接受到消息后执行    @Override    public void onMessage(final String result) {        // 处理接收到的数据        handleResult(result);    }        private void handleResult(final String result) {        // 数据反序列化        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);        // 哪种数据类型,插件、选择器、规则...        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());        // 哪种操作类型,更新、删除...        String eventType = websocketData.getEventType();        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        // 处理数据        websocketDataHandler.executor(groupEnum, json, eventType);    }}

接收到数据后,首先进行了反序列化操作,读取数据类型和操作类型,紧接着,就交给websocketDataHandler.executor()进行处理。

3.2 执行Websocket事件处理器#

  • WebsocketDataHandler.executor()

通过工厂模式创建了Websocket数据处理器,每种数据类型,都提供了一个处理器:

插件 --> 插件数据处理器;

选择器 --> 选择器数据处理器;

规则 --> 规则数据处理器;

认证信息 --> 认证数据处理器;

元数据 --> 元数据处理器。


/** * 通过工厂模式创建 Websocket数据处理器 * The type Websocket cache handler. */public class WebsocketDataHandler {
    private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);
    /**     * Instantiates a new Websocket data handler.     * 每种数据类型,提供一个处理器     * @param pluginDataSubscriber the plugin data subscriber     * @param metaDataSubscribers  the meta data subscribers     * @param authDataSubscribers  the auth data subscribers     */    public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,                                final List<MetaDataSubscriber> metaDataSubscribers,                                final List<AuthDataSubscriber> authDataSubscribers) {        // 插件 --> 插件数据处理器        ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));        // 选择器 --> 选择器数据处理器        ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));        // 规则 --> 规则数据处理器        ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));        // 认证信息 --> 认证数据处理器        ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));        // 元数据 --> 元数据处理器        ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));    }
    /**     * Executor.     *     * @param type      the type     * @param json      the json     * @param eventType the event type     */    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {        // 根据数据类型,找到对应的数据处理器        ENUM_MAP.get(type).handle(json, eventType);    }}

不同的数据类型,有不同的数据处理方式,所以有不同的实现类。但是它们之间也有相同的处理逻辑,所以可以通过模板方法设计模式来实现。相同的逻辑放在抽象类中的handle()方法中,不同逻辑就交给各自的实现类。

我们的案例是新增了一条选择器数据,所以会交给SelectorDataHandler( 选择器 --> 选择器数据处理器)进行数据处理。

3.3 判断事件类型#

  • AbstractDataHandler.handle()

实现数据变更的通用逻辑处理:根据不同的操作类型调用不同方法。


public abstract class AbstractDataHandler<T> implements DataHandler {
    /**     * Convert list.     * 不同的逻辑由各自实现类去实现     * @param json the json     * @return the list     */    protected abstract List<T> convert(String json);
    /**     * Do refresh.     * 不同的逻辑由各自实现类去实现     * @param dataList the data list     */    protected abstract void doRefresh(List<T> dataList);
    /**     * Do update.     * 不同的逻辑由各自实现类去实现     * @param dataList the data list     */    protected abstract void doUpdate(List<T> dataList);
    /**     * Do delete.     * 不同的逻辑由各自实现类去实现     * @param dataList the data list     */    protected abstract void doDelete(List<T> dataList);
    // 通用逻辑,抽象类实现    @Override    public void handle(final String json, final String eventType) {        List<T> dataList = convert(json);        if (CollectionUtils.isNotEmpty(dataList)) {            DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);            switch (eventTypeEnum) {                case REFRESH:                case MYSELF:                    doRefresh(dataList);  //刷新数据,全量同步                    break;                case UPDATE:                case CREATE:                    doUpdate(dataList); // 更新或创建数据,增量同步                    break;                case DELETE:                    doDelete(dataList);  // 删除数据                    break;                default:                    break;            }        }    }}

新增一条选择器数据,是新增操作,通过switch-case进入到doUpdate()方法中。

3.4 进入具体的数据处理器#

  • SelectorDataHandler.doUpdate()

/** * 选择器数据处理器 * The type Selector data handler. */@RequiredArgsConstructorpublic class SelectorDataHandler extends AbstractDataHandler<SelectorData> {
    private final PluginDataSubscriber pluginDataSubscriber;
    //......
    // 更新操作    @Override    protected void doUpdate(final List<SelectorData> dataList) {        dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);    }}

遍历数据,进入onSelectorSubscribe()方法。

  • PluginDataSubscriber.onSelectorSubscribe()

它没有其他逻辑,直接调用subscribeDataHandler()方法。在方法中,更具数据类型(插件、选择器或规则),操作类型(更新或删除),去执行不同逻辑。

/** * 通用插件数据订阅者,负责处理所有插件、选择器和规则信息 * The type Common plugin data subscriber. */public class CommonPluginDataSubscriber implements PluginDataSubscriber {    //......     // 处理选择器数据    @Override    public void onSelectorSubscribe(final SelectorData selectorData) {        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);    }            // 订阅数据处理器,处理数据的更新或删除    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {        Optional.ofNullable(classData).ifPresent(data -> {            // 插件数据            if (data instanceof PluginData) {                PluginData pluginData = (PluginData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cachePluginData(pluginData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removePluginData(pluginData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));                }            } else if (data instanceof SelectorData) {  // 选择器数据                SelectorData selectorData = (SelectorData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cacheSelectData(selectorData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                     Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));                } else if (dataType == DataEventTypeEnum.DELETE) {  // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removeSelectData(selectorData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));                }            } else if (data instanceof RuleData) {  // 规则数据                RuleData ruleData = (RuleData) data;                if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作                    // 将数据保存到网关内存                    BaseDataCache.getInstance().cacheRuleData(ruleData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));                } else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作                    // 从网关内存移除数据                    BaseDataCache.getInstance().removeRuleData(ruleData);                    // 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));                }            }        });    }    }

那么新增一条选择器数据,会进入下面的逻辑:

// 将数据保存到网关内存BaseDataCache.getInstance().cacheSelectData(selectorData);// 如果每个插件还有自己的处理逻辑,那么就去处理                  Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));

一是将数据保存到网关的内存中。BaseDataCache是最终缓存数据的类,通过单例模式实现。选择器数据就存到了SELECTOR_MAP这个Map中。在后续使用的时候,也是从这里拿数据。

public final class BaseDataCache {    // 私有变量    private static final BaseDataCache INSTANCE = new BaseDataCache();    // 私有构造器    private BaseDataCache() {    }        /**     * Gets instance.     *  公开方法     * @return the instance     */    public static BaseDataCache getInstance() {        return INSTANCE;    }        /**    *  缓存选择器数据的Map     * pluginName -> SelectorData.     */    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();        public void cacheSelectData(final SelectorData selectorData) {        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);    }           /**     * cache selector data.     * 缓存选择器数据     * @param data the selector data     */    private void selectorAccept(final SelectorData data) {        String key = data.getPluginName();        if (SELECTOR_MAP.containsKey(key)) { // 更新操作,先删除再插入            List<SelectorData> existList = SELECTOR_MAP.get(key);            final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());            resultList.add(data);            final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());            SELECTOR_MAP.put(key, collect);        } else {  // 新增操作,直接放到Map中            SELECTOR_MAP.put(key, Lists.newArrayList(data));        }    }    }

二是如果每个插件还有自己的处理逻辑,那么就去处理。 通过idea编辑器可以看到,当新增一条选择器后,有如下的插件还有处理。这里我们就不再展开了。

经过以上的源码追踪,并通过一个实际的案例,在admin端新增一条选择器数据,就将websocket数据同步的流程分析清除了。

我们还是用下面的一张图将网关端的数据同步流程串联一下:

数据同步的流程已经分析完了,但是还有一些问题没有分析到,就是网关是如何跟admin建立连接的?

4. 网关和admin建立websocket连接#

  • websocket配置

在网关的配置文件中有如下配置,并且引入了相关依赖,就会启动websocket相关服务。

shenyu:    file:      enabled: true    cross:      enabled: true    dubbo :      parameter: multi    sync:        websocket :  # 使用websocket进行数据同步             urls: ws://localhost:9095/websocket   # admin端的websocket地址

在网关中引入websocket的依赖。

<!--shenyu data sync start use websocket--><dependency>    <groupId>org.apache.shenyu</groupId>    <artifactId>shenyu-spring-boot-starter-sync-data-websocket</artifactId>    <version>${project.version}</version></dependency>
  • Websocket数据同步配置

通过springboot的条件装配,创建相关的bean。在网关启动的时候,如果我们配置了shenyu.sync.websocket.urls,那么Websocket数据同步配置就会被加载。这里通过spring boot starter完成依赖的加载。


/** * Websocket数据同步配置 * 通过springboot实现条件注入 * Websocket sync data configuration for spring boot. */@Configuration@ConditionalOnClass(WebsocketSyncDataService.class)@ConditionalOnProperty(prefix = "shenyu.sync.websocket", name = "urls")@Slf4jpublic class WebsocketSyncDataConfiguration {
    /**     * Websocket sync data service.     * Websocket数据同步服务     * @param websocketConfig   the websocket config     * @param pluginSubscriber the plugin subscriber     * @param metaSubscribers   the meta subscribers     * @param authSubscribers   the auth subscribers     * @return the sync data service     */    // 创建websocketSyncDataService    @Bean    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {        log.info("you use websocket sync shenyu data.......");        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));    }
    /**     * Config websocket config.     *     * @return the websocket config     */    @Bean    @ConfigurationProperties(prefix = "shenyu.sync.websocket")    public WebsocketConfig websocketConfig() {        return new WebsocketConfig();  // 创建WebsocketConfig    }}

在项目的resources/META-INF目录先新建spring.factories文件,在文件中指明配置类。

  • Websocket数据同步服务

WebsocketSyncDataService中做了如下几件事情:

  • 读取配置中的urls,这个表示admin端的同步地址,有多个的话,使用","分割;
  • 创建调度线程池,一个admin分配一个,用于执行定时任务;
  • 创建ShenyuWebsocketClient,一个admin分配一个,用于和admin建立websocket通信;
  • 开始和admin端的websocket 建立连接;
  • 执行定时任务,每隔10秒执行一次。主要作用是判断websocket连接是否已经断开,如果已经断开,则尝试重连。如果没有断开,就进行 ping-pong 检测。

/** * Websocket数据同步服务 * Websocket sync data service. */@Slf4jpublic class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
    private final List<WebSocketClient> clients = new ArrayList<>();
    private final ScheduledThreadPoolExecutor executor;
    /**     * Instantiates a new Websocket sync cache.     * 创建Websocket数据同步服务     * @param websocketConfig      the websocket config     * @param pluginDataSubscriber the plugin data subscriber     * @param metaDataSubscribers  the meta data subscribers     * @param authDataSubscribers  the auth data subscribers     */    public WebsocketSyncDataService(final WebsocketConfig websocketConfig,                                    final PluginDataSubscriber pluginDataSubscriber,                                    final List<MetaDataSubscriber> metaDataSubscribers,                                    final List<AuthDataSubscriber> authDataSubscribers) {        // admin端的同步地址,有多个的话,使用","分割        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");        // 创建调度线程池,一个admin分配一个        executor = new ScheduledThreadPoolExecutor(urls.length, ShenyuThreadFactory.create("websocket-connect", true));        for (String url : urls) {            try {                //创建WebsocketClient,一个admin分配一个                clients.add(new ShenyuWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));            } catch (URISyntaxException e) {                log.error("websocket url({}) is error", url, e);            }        }        try {            for (WebSocketClient client : clients) {                // 和websocket server建立连接                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);                if (success) {                    log.info("websocket connection is successful.....");                } else {                    log.error("websocket connection is error.....");                }
                // 执行定时任务,每隔10秒执行一次                // 主要作用是判断websocket连接是否已经断开,如果已经断开,则尝试重连。                // 如果没有断开,就进行 ping-pong 检测                executor.scheduleAtFixedRate(() -> {                    try {                        if (client.isClosed()) {                            boolean reconnectSuccess = client.reconnectBlocking();                            if (reconnectSuccess) {                                log.info("websocket reconnect server[{}] is successful.....", client.getURI().toString());                            } else {                                log.error("websocket reconnection server[{}] is error.....", client.getURI().toString());                            }                        } else {                            client.sendPing();                            log.debug("websocket send to [{}] ping message successful", client.getURI().toString());                        }                    } catch (InterruptedException e) {                        log.error("websocket connect is error :{}", e.getMessage());                    }                }, 10, 10, TimeUnit.SECONDS);            }            /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/        } catch (InterruptedException e) {            log.info("websocket connection...exception....", e);        }
    }
    @Override    public void close() {        // 关闭 websocket client        for (WebSocketClient client : clients) {            if (!client.isClosed()) {                client.close();            }        }        // 关闭线程池        if (Objects.nonNull(executor)) {            executor.shutdown();        }    }}
  • ShenyuWebsocketClient

ShenYu中创建的WebSocket客户端,用于和admin端通信。第一次成功建立连接后,同步全量数据,后续进行增量同步。


/** * 在ShenYu中自定义的WebSocket客户端 * The type shenyu websocket client. */@Slf4jpublic final class ShenyuWebsocketClient extends WebSocketClient {        private volatile boolean alreadySync = Boolean.FALSE;        private final WebsocketDataHandler websocketDataHandler;        /**     * Instantiates a new shenyu websocket client.     * 创建ShenyuWebsocketClient     * @param serverUri             the server uri  服务端uri     * @param pluginDataSubscriber the plugin data subscriber 插件数据订阅器     * @param metaDataSubscribers   the meta data subscribers 元数据订阅器     * @param authDataSubscribers   the auth data subscribers 认证数据订阅器     */    public ShenyuWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {        super(serverUri);        this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);    }
    // 成功建立连接后执行    @Override    public void onOpen(final ServerHandshake serverHandshake) {        // 防止重新建立连接时,再次执行,所以用alreadySync进行判断        if (!alreadySync) {            // 同步所有数据,MYSELF 类型            send(DataEventTypeEnum.MYSELF.name());            alreadySync = true;        }    }
    // 接收到消息后执行    @Override    public void onMessage(final String result) {        // 处理接收到的数据        handleResult(result);    }        // 关闭后执行    @Override    public void onClose(final int i, final String s, final boolean b) {        this.close();    }        // 失败后执行    @Override    public void onError(final Exception e) {        this.close();    }        @SuppressWarnings("ALL")    private void handleResult(final String result) {        // 数据反序列化        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);        // 哪种数据类型,插件、选择器、规则...        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());        // 哪种操作类型,更新、删除...        String eventType = websocketData.getEventType();        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        // 处理数据        websocketDataHandler.executor(groupEnum, json, eventType);    }}

5. 总结#

本文通过一个实际案例,对websocket的数据同步原理进行了源码分析。涉及到的主要知识点如下:

  • websocket支持双向通信,性能好,推荐使用;
  • 通过Spring完成事件发布和监听;
  • 通过抽象DataChangedListener接口,支持多种同步策略,面向接口编程;
  • 使用工厂模式创建 WebsocketDataHandler,实现不同数据类型的处理;
  • 使用模板方法设计模式实现AbstractDataHandler,处理通用的操作类型;
  • 使用单例设计模式实现缓存数据类BaseDataCache
  • 通过SpringBoot的条件装配和starter加载机制实现配置类的加载。