Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API
网关。
Apache ShenYu
网关使用 dubbo
插件完成对 dubbo
服务的调用。你可以查看官方文档 Dubbo快速开始 了解如何使用该插件。
本文基于shenyu-2.4.3
版本进行源码分析,官网的介绍请参考 Dubbo服务接入 。
1. 服务注册#
以官网提供的例子为例 shenyu-examples-dubbo 。 假如你的dubbo
服务定义如下(spring-dubbo.xml
):
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo https://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<dubbo:application name="test-dubbo-service"/> <dubbo:registry address="${dubbo.registry.address}"/> <dubbo:protocol name="dubbo" port="20888"/>
<dubbo:service timeout="10000" interface="org.apache.shenyu.examples.dubbo.api.service.DubboTestService" ref="dubboTestService"/>
</beans>
声明应用服务名称,注册中心地址,使用dubbo
协议,声明服务接口,对应接口实现类:
/** * DubboTestServiceImpl. */@Service("dubboTestService")public class DubboTestServiceImpl implements DubboTestService { @Override @ShenyuDubboClient(path = "/findById", desc = "Query by Id") public DubboTest findById(final String id) { return new DubboTest(id, "hello world shenyu Apache, findById"); }
//......}
在接口实现类中,使用注解@ShenyuDubboClient
向shenyu-admin
注册服务。该注解的作用及原理,稍后再进行分析。
在配置文件application.yml
中的配置信息:
server: port: 8011 address: 0.0.0.0 servlet: context-path: /spring: main: allow-bean-definition-overriding: truedubbo: registry: address: zookeeper://localhost:2181 shenyu: register: registerType: http serverLists: http://localhost:9095 props: username: admin password: 123456 client: dubbo: props: contextPath: /dubbo appName: dubbo
在配置文件中,声明dubbo
使用的注册中心地址,dubbo
服务向shenyu-admin
注册,使用的方式是http
,注册地址是http://localhost:9095
。
关于注册方式的使用,请参考 应用客户端接入 。
1.1 声明注册接口#
使用注解@ShenyuDubboClient
将服务注册到网关。简单demo
如下:
// dubbo服务@Service("dubboTestService")public class DubboTestServiceImpl implements DubboTestService { @Override @ShenyuDubboClient(path = "/findById", desc = "Query by Id") // 需要注册的方法 public DubboTest findById(final String id) { return new DubboTest(id, "hello world shenyu Apache, findById"); }
//......}
注解定义:
/** * 作用于类和方法上 */@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE, ElementType.METHOD})@Inheritedpublic @interface ShenyuDubboClient { //注册路径 String path(); //规则名称 String ruleName() default ""; //描述信息 String desc() default "";
//是否启用 boolean enabled() default true;}
1.2 扫描注解信息#
注解扫描通过ApacheDubboServiceBeanListener
完成,它实现了ApplicationListener<ContextRefreshedEvent>
接口,在Spring
容器启动过程中,发生上下文刷新事件时,开始执行事件处理方法onApplicationEvent()
。
在构造器实例化的过程中:
- 读取属性配置
- 开启线程池
- 启动注册中心,用于向
shenyu-admin
注册
public class ApacheDubboServiceBeanListener implements ApplicationListener<ContextRefreshedEvent> {
// ......
//构造器 public ApacheDubboServiceBeanListener(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) { //1.读取属性配置 Properties props = clientConfig.getProps(); String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH); String appName = props.getProperty(ShenyuClientConstants.APP_NAME); if (StringUtils.isBlank(contextPath)) { throw new ShenyuClientIllegalArgumentException("apache dubbo client must config the contextPath or appName"); } this.contextPath = contextPath; this.appName = appName; this.host = props.getProperty(ShenyuClientConstants.HOST); this.port = props.getProperty(ShenyuClientConstants.PORT); //2.开启线程池 executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("shenyu-apache-dubbo-client-thread-pool-%d").build()); //3.启动注册中心 publisher.start(shenyuClientRegisterRepository); }
/** * 上下文刷新事件,执行方法逻辑 */ @Override public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) { //...... }
- ApacheDubboServiceBeanListener#onApplicationEvent()
重写的方法逻辑:读取Dubbo
服务ServiceBean
,构建元数据对象和URI
对象,并向shenyu-admin
注册。
@Override public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) { //读取ServiceBean Map<String, ServiceBean> serviceBean = contextRefreshedEvent.getApplicationContext().getBeansOfType(ServiceBean.class); if (serviceBean.isEmpty()) { return; } //保证该方法只执行一次 if (!registered.compareAndSet(false, true)) { return; } //处理元数据对象 for (Map.Entry<String, ServiceBean> entry : serviceBean.entrySet()) { handler(entry.getValue()); } //处理URI对象 serviceBean.values().stream().findFirst().ifPresent(bean -> { publisher.publishEvent(buildURIRegisterDTO(bean)); }); }
private void handler(final ServiceBean<?> serviceBean) { //获取代理对象 Object refProxy = serviceBean.getRef(); //获取class信息 Class<?> clazz = refProxy.getClass(); if (AopUtils.isAopProxy(refProxy)) { clazz = AopUtils.getTargetClass(refProxy); } //获取所有方法 Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz); for (Method method : methods) { //读取ShenyuDubboClient注解信息 ShenyuDubboClient shenyuDubboClient = method.getAnnotation(ShenyuDubboClient.class); if (Objects.nonNull(shenyuDubboClient)) { //构建元数据对象,并注册 publisher.publishEvent(buildMetaDataDTO(serviceBean, shenyuDubboClient, method)); } } }
private MetaDataRegisterDTO buildMetaDataDTO(final ServiceBean<?> serviceBean, final ShenyuDubboClient shenyuDubboClient, final Method method) { //应用名称 String appName = buildAppName(serviceBean); //方法路径 String path = contextPath + shenyuDubboClient.path(); //描述信息 String desc = shenyuDubboClient.desc(); //服务名称 String serviceName = serviceBean.getInterface(); //规则名称 String configRuleName = shenyuDubboClient.ruleName(); String ruleName = ("".equals(configRuleName)) ? path : configRuleName; //方法名称 String methodName = method.getName(); //参数类型 Class<?>[] parameterTypesClazz = method.getParameterTypes(); String parameterTypes = Arrays.stream(parameterTypesClazz).map(Class::getName).collect(Collectors.joining(",")); return MetaDataRegisterDTO.builder() .appName(appName) .serviceName(serviceName) .methodName(methodName) .contextPath(contextPath) .host(buildHost()) .port(buildPort(serviceBean)) .path(path) .ruleName(ruleName) .pathDesc(desc) .parameterTypes(parameterTypes) .rpcExt(buildRpcExt(serviceBean)) //dubbo服务的扩展信息 .rpcType(RpcTypeEnum.DUBBO.getName()) .enabled(shenyuDubboClient.enabled()) .build(); }
buildRpcExt()
dubbo
服务的扩展信息
private String buildRpcExt(final ServiceBean serviceBean) { DubboRpcExt build = DubboRpcExt.builder() .group(StringUtils.isNotEmpty(serviceBean.getGroup()) ? serviceBean.getGroup() : "")//分组 .version(StringUtils.isNotEmpty(serviceBean.getVersion()) ? serviceBean.getVersion() : "")//版本 .loadbalance(StringUtils.isNotEmpty(serviceBean.getLoadbalance()) ? serviceBean.getLoadbalance() : Constants.DEFAULT_LOADBALANCE)//负载均衡策略,默认随机 .retries(Objects.isNull(serviceBean.getRetries()) ? Constants.DEFAULT_RETRIES : serviceBean.getRetries())//重试次数,默认2 .timeout(Objects.isNull(serviceBean.getTimeout()) ? Constants.DEFAULT_CONNECT_TIMEOUT : serviceBean.getTimeout())//超时,默认3000 .sent(Objects.isNull(serviceBean.getSent()) ? Constants.DEFAULT_SENT : serviceBean.getSent())//sent,默认false .cluster(StringUtils.isNotEmpty(serviceBean.getCluster()) ? serviceBean.getCluster() : Constants.DEFAULT_CLUSTER)//集群策略,默认failover .url("") .build(); return GsonUtils.getInstance().toJson(build); }
private URIRegisterDTO buildURIRegisterDTO(final ServiceBean serviceBean) { return URIRegisterDTO.builder() .contextPath(this.contextPath) //上下文路径 .appName(buildAppName(serviceBean))//应用名称 .rpcType(RpcTypeEnum.DUBBO.getName())//rpc类型:dubbo .host(buildHost()) //host .port(buildPort(serviceBean))//port .build(); }
具体的注册逻辑由注册中心实现,请参考 客户端接入原理 。
//向注册中心,发布注册事件 publisher.publishEvent();
1.3 处理注册信息#
客户端通过注册中心注册的元数据和URI
数据,在shenyu-admin
端进行处理,负责存储到数据库和同步给shenyu
网关。Dubbo
插件的客户端注册处理逻辑在ShenyuClientRegisterDubboServiceImpl
中。继承关系如下:
- ShenyuClientRegisterService:客户端注册服务,顶层接口;
- FallbackShenyuClientRegisterService:注册失败,提供重试操作;
- AbstractShenyuClientRegisterServiceImpl:抽象类,实现部分公共注册逻辑;
- ShenyuClientRegisterDubboServiceImpl:实现
Dubbo
插件的注册;
1.3.1 注册服务#
@Override public String register(final MetaDataRegisterDTO dto) { //1. 注册选择器 String selectorHandler = selectorHandler(dto); String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler); //2. 注册规则 String ruleHandler = ruleHandler(); RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler); ruleService.registerDefault(ruleDTO); //3. 注册元数据 registerMetadata(dto); //4. 注册ContextPath String contextPath = dto.getContextPath(); if (StringUtils.isNotEmpty(contextPath)) { registerContextPath(dto); } return ShenyuResultMessage.SUCCESS; }
1.3.1.1 注册选择器#
- org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()
构建contextPath
,查找选择器信息是否存在,如果存在就返回id
;不存在就创建默认的选择器信息。
@Override public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) { // 构建contextPath String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName()); // 通过名称查找选择器信息是否存在 SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName); if (Objects.isNull(selectorDO)) { // 不存在就创建默认的选择器信息 return registerSelector(contextPath, pluginName, selectorHandler); } return selectorDO.getId(); }
默认选择器信息
在这里构建默认选择器信息及其条件属性。
//注册选择器 private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) { //构建选择器 SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId()); selectorDTO.setHandle(selectorHandler); //注册默认选择器 return registerDefault(selectorDTO); } //构建选择器 private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) { //构建默认选择器 SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath); selectorDTO.setPluginId(pluginId); //构建默认选择器的条件属性 selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath)); return selectorDTO; }
private SelectorDTO buildDefaultSelectorDTO(final String name) { return SelectorDTO.builder() .name(name) // 名称 .type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // 默认类型自定义 .matchMode(MatchModeEnum.AND.getCode()) //默认匹配方式 and .enabled(Boolean.TRUE) //默认启开启 .loged(Boolean.TRUE) //默认记录日志 .continued(Boolean.TRUE) //默认继续后续选择器 .sort(1) //默认顺序1 .build();}
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) { SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO(); selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // 默认参数类型URI selectorConditionDTO.setParamName("/"); selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // 默认匹配策略 match selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // 默认值 /contextPath/** return Collections.singletonList(selectorConditionDTO);}
@Overridepublic String registerDefault(final SelectorDTO selectorDTO) { //选择器信息 SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO); //选择器条件属性 List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions(); if (StringUtils.isEmpty(selectorDTO.getId())) { // 向数据库插入选择器信息 selectorMapper.insertSelective(selectorDO); // 向数据库插入选择器条件属性 selectorConditionDTOs.forEach(selectorConditionDTO -> { selectorConditionDTO.setSelectorId(selectorDO.getId()); selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO)); }); } // 发布同步事件,向网关同步选择信息及其条件属性 publishEvent(selectorDO, selectorConditionDTOs); return selectorDO.getId();}
1.3.1.2 注册规则#
在注册服务的第二步中,开始构建默认规则,然后注册规则。
@Override public String register(final MetaDataRegisterDTO dto) { //1. 注册选择器 //...... //2. 注册规则 // 默认规则处理属性 String ruleHandler = ruleHandler(); // 构建默认规则信息 RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler); // 注册规则 ruleService.registerDefault(ruleDTO); //3. 注册元数据 //...... //4. 注册ContextPath //...... return ShenyuResultMessage.SUCCESS; }
@Override protected String ruleHandler() { // 默认规则处理属性 return new DubboRuleHandle().toJson(); }
Dubbo
插件默认规则处理属性
public class DubboRuleHandle implements RuleHandle {
/** * dubbo服务版本信息. */ private String version;
/** * 分组. */ private String group;
/** * 重试次数. */ private Integer retries = 0;
/** * 负载均衡策略:默认随机 */ private String loadbalance = LoadBalanceEnum.RANDOM.getName();
/** * 超时,默认3000 */ private long timeout = Constants.TIME_OUT;}
// 构建默认规则信息 private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) { return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath()); } // 构建默认规则信息 private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) { RuleDTO ruleDTO = RuleDTO.builder() .selectorId(selectorId) //关联的选择器id .name(ruleName) //规则名称 .matchMode(MatchModeEnum.AND.getCode()) // 默认匹配模式 and .enabled(Boolean.TRUE) // 默认开启 .loged(Boolean.TRUE) //默认记录日志 .sort(1) //默认顺序 1 .handle(ruleHandler) .build(); RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder() .paramType(ParamTypeEnum.URI.getName()) // 默认参数类型URI .paramName("/") .paramValue(path) //参数值path .build(); if (path.indexOf("*") > 1) { ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); //如果path中有*,操作类型则默认为 match } else { ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); // 否则,默认操作类型 = } ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO)); return ruleDTO; }
- org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()
注册规则:向数据库插入记录,并向网关发布事件,进行数据同步。
@Override public String registerDefault(final RuleDTO ruleDTO) { RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName()); if (Objects.nonNull(exist)) { return ""; }
RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO); List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions(); if (StringUtils.isEmpty(ruleDTO.getId())) { // 向数据库插入规则信息 ruleMapper.insertSelective(ruleDO); //向数据库插入规则体条件属性 ruleConditions.forEach(ruleConditionDTO -> { ruleConditionDTO.setRuleId(ruleDO.getId()); ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO)); }); } // 向网关发布事件,进行数据同步 publishEvent(ruleDO, ruleConditions); return ruleDO.getId(); }
1.3.1.3 注册元数据#
元数据主要用于RPC
服务的调用。
@Override public String register(final MetaDataRegisterDTO dto) { //1. 注册选择器 //...... //2. 注册规则 //...... //3. 注册元数据 registerMetadata(dto); //4. 注册ContextPath //...... return ShenyuResultMessage.SUCCESS; }
@Override protected void registerMetadata(final MetaDataRegisterDTO dto) { // 获取metaDataService MetaDataService metaDataService = getMetaDataService(); // 元数据是否存在 MetaDataDO exist = metaDataService.findByPath(dto.getPath()); // 插入或更新元数据 metaDataService.saveOrUpdateMetaData(exist, dto); }
@Override public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) { DataEventTypeEnum eventType; // 数据类型转换 DTO->DO MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO); // 插入数据 if (Objects.isNull(exist)) { Timestamp currentTime = new Timestamp(System.currentTimeMillis()); metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid()); metaDataDO.setDateCreated(currentTime); metaDataDO.setDateUpdated(currentTime); metaDataMapper.insert(metaDataDO); eventType = DataEventTypeEnum.CREATE; } else { // 更新数据 metaDataDO.setId(exist.getId()); metaDataMapper.update(metaDataDO); eventType = DataEventTypeEnum.UPDATE; } // 发布同步事件到网关 eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType, Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO)))); }
1.3.2 注册URI#
- org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()
服务端收到客户端注册的URI
信息后,进行处理。
@Override public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) { String result; String key = key(selectorName); try { this.removeFallBack(key); // 注册URI result = this.doRegisterURI(selectorName, uriList); logger.info("Register success: {},{}", selectorName, uriList); } catch (Exception ex) { logger.warn("Register exception: cause:{}", ex.getMessage()); result = ""; // 注册失败后,进行重试 this.addFallback(key, new FallbackHolder(selectorName, uriList)); } return result; }
- org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()
从客户端注册的URI
中获取有效的URI
,更新对应的选择器handle
属性,向网关发送选择器更新事件。
@Override public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) { //参数检查 if (CollectionUtils.isEmpty(uriList)) { return ""; } //获取选择器信息 SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType())); if (Objects.isNull(selectorDO)) { throw new ShenyuException("doRegister Failed to execute,wait to retry."); } // 获取有效的URI List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList()); // 构建选择器的handle属性 String handler = buildHandle(validUriList, selectorDO); if (handler != null) { selectorDO.setHandle(handler); SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType())); selectorData.setHandle(handler); // 向数据库更新选择器的handle属性 selectorService.updateSelective(selectorDO); // 向网关发送选择器更新事件 eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData))); } return ShenyuResultMessage.SUCCESS; }
关于服务注册的源码分析就以及完成了,分析流程图如下:
接下来就分析dubbo
插件是如何根据这些信息向http
服务发起调用。
2. 服务调用#
dubbo
插件是ShenYu
网关用于将http
请求转成 dubbo协议
,调用dubbo
服务的核心处理插件。
以官网提供的案例 Dubbo快速开始 为例,一个dubbo
服务通过注册中心向shenyu-admin
注册后,通过ShenYu
网关代理,请求如下:
GET http://localhost:9195/dubbo/findById?id=100Accept: application/json
Dubbo
插件中,类继承关系如下:
- ShenyuPlugin:顶层接口,定义接口方法;
- AbstractShenyuPlugin:抽象类,实现插件共有逻辑;
- AbstractDubboPlugin:dubbo插件抽象类,实现
dubbo
共有逻辑; - ApacheDubboPlugin:ApacheDubbo插件。
ShenYu网关支持ApacheDubbo和AlibabaDubbo
2.1 接收请求#
通过ShenYu
网关代理后,请求入口是ShenyuWebHandler
,它实现了org.springframework.web.server.WebHandler
接口。
public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> { //...... /** * 处理web请求 */ @Override public Mono<Void> handle(@NonNull final ServerWebExchange exchange) { // 执行默认插件链 Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange); if (scheduled) { return execute.subscribeOn(scheduler); } return execute; } private static class DefaultShenyuPluginChain implements ShenyuPluginChain {
private int index;
private final List<ShenyuPlugin> plugins;
/** * 实例化默认插件链 */ DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) { this.plugins = plugins; }
/** * 执行每个插件. */ @Override public Mono<Void> execute(final ServerWebExchange exchange) { return Mono.defer(() -> { if (this.index < plugins.size()) { // 获取当前执行插件 ShenyuPlugin plugin = plugins.get(this.index++); // 是否跳过当前插件 boolean skip = plugin.skip(exchange); if (skip) { // 如果跳过就执行下一个 return this.execute(exchange); } // 执行当前插件 return plugin.execute(exchange, this); } return Mono.empty(); }); } }}
2.2 匹配规则#
- org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()
在execute()
方法中执行选择器和规则的匹配逻辑。
@Override public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) { // 插件名称 String pluginName = named(); // 插件信息 PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName); if (pluginData != null && pluginData.getEnabled()) { // 选择器信息 final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName); if (CollectionUtils.isEmpty(selectors)) { return handleSelectorIfNull(pluginName, exchange, chain); } // 匹配选择器 SelectorData selectorData = matchSelector(exchange, selectors); if (Objects.isNull(selectorData)) { return handleSelectorIfNull(pluginName, exchange, chain); } selectorLog(selectorData, pluginName); // 规则信息 List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId()); if (CollectionUtils.isEmpty(rules)) { return handleRuleIfNull(pluginName, exchange, chain); } // 匹配规则 RuleData rule; if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) { //get last rule = rules.get(rules.size() - 1); } else { rule = matchRule(exchange, rules); } if (Objects.isNull(rule)) { return handleRuleIfNull(pluginName, exchange, chain); } ruleLog(rule, pluginName); // 执行插件 return doExecute(exchange, chain, selectorData, rule); } return chain.execute(exchange); }
2.3 执行GlobalPlugin#
- org.apache.shenyu.plugin.global.GlobalPlugin#execute()
GlobalPlugin
是一个全局插件,在execute()
方法中构建上下文信息。
public class GlobalPlugin implements ShenyuPlugin { // 构建上下文信息 private final ShenyuContextBuilder builder; //...... @Override public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) { // 构建上下文信息,传入到 exchange 中 ShenyuContext shenyuContext = builder.build(exchange); exchange.getAttributes().put(Constants.CONTEXT, shenyuContext); return chain.execute(exchange); } //......}
- org.apache.shenyu.plugin.global.DefaultShenyuContextBuilder#build()
构建默认的上下文信息。
public class DefaultShenyuContextBuilder implements ShenyuContextBuilder { //...... @Override public ShenyuContext build(final ServerWebExchange exchange) { //构建参数 Pair<String, MetaData> buildData = buildData(exchange); //包装ShenyuContext return decoratorMap.get(buildData.getLeft()).decorator(buildDefaultContext(exchange.getRequest()), buildData.getRight()); } private Pair<String, MetaData> buildData(final ServerWebExchange exchange) { //...... //根据请求的uri获取元数据 MetaData metaData = MetaDataCache.getInstance().obtain(request.getURI().getPath()); if (Objects.nonNull(metaData) && Boolean.TRUE.equals(metaData.getEnabled())) { exchange.getAttributes().put(Constants.META_DATA, metaData); return Pair.of(metaData.getRpcType(), metaData); } else { return Pair.of(RpcTypeEnum.HTTP.getName(), new MetaData()); } } //设置默认的上下文信息 private ShenyuContext buildDefaultContext(final ServerHttpRequest request) { String appKey = request.getHeaders().getFirst(Constants.APP_KEY); String sign = request.getHeaders().getFirst(Constants.SIGN); String timestamp = request.getHeaders().getFirst(Constants.TIMESTAMP); ShenyuContext shenyuContext = new ShenyuContext(); String path = request.getURI().getPath(); shenyuContext.setPath(path); //请求路径 shenyuContext.setAppKey(appKey); shenyuContext.setSign(sign); shenyuContext.setTimestamp(timestamp); shenyuContext.setStartDateTime(LocalDateTime.now()); Optional.ofNullable(request.getMethod()).ifPresent(httpMethod -> shenyuContext.setHttpMethod(httpMethod.name()));//请求方法 return shenyuContext; } }
- org.apache.shenyu.plugin.dubbo.common.context.DubboShenyuContextDecorator#decorator()
包装ShenyuContext
:
public class DubboShenyuContextDecorator implements ShenyuContextDecorator { @Override public ShenyuContext decorator(final ShenyuContext shenyuContext, final MetaData metaData) { shenyuContext.setModule(metaData.getAppName());//获取AppName shenyuContext.setMethod(metaData.getServiceName()); //获取ServiceName shenyuContext.setContextPath(metaData.getContextPath()); //获取contextPath shenyuContext.setRpcType(RpcTypeEnum.DUBBO.getName()); // dubbo服务 return shenyuContext; } @Override public String rpcType() { return RpcTypeEnum.DUBBO.getName(); }}
2.4 执行RpcParamTransformPlugin#
RpcParamTransformPlugin
负责从http
请求中读取参数,保存到exchange
中,传递给rpc
服务。
- org.apache.shenyu.plugin.base.RpcParamTransformPlugin#execute()
在execute()
方法中,执行该插件的核心逻辑:从exchange
中获取请求信息,根据请求传入的内容形式处理参数。
public class RpcParamTransformPlugin implements ShenyuPlugin {
@Override public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) { //从exchange中获取请求信息 ServerHttpRequest request = exchange.getRequest(); ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT); if (Objects.nonNull(shenyuContext)) { // 如果请求参数格式是APPLICATION_JSON MediaType mediaType = request.getHeaders().getContentType(); if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { return body(exchange, request, chain); } // 如果请求参数格式是APPLICATION_FORM_URLENCODED if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) { return formData(exchange, request, chain); } //一般查询请求 return query(exchange, request, chain); } return chain.execute(exchange); } // 如果请求参数格式是APPLICATION_JSON private Mono<Void> body(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) { return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody()) .flatMap(body -> { exchange.getAttributes().put(Constants.PARAM_TRANSFORM, resolveBodyFromRequest(body));//解析body,保存到exchange中 return chain.execute(exchange); })); } // 如果请求参数格式是APPLICATION_FORM_URLENCODED private Mono<Void> formData(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) { return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody()) .flatMap(map -> { String param = resolveBodyFromRequest(map); LinkedMultiValueMap<String, String> linkedMultiValueMap; try { linkedMultiValueMap = BodyParamUtils.buildBodyParams(URLDecoder.decode(param, StandardCharsets.UTF_8.name())); //格式化数据 } catch (UnsupportedEncodingException e) { return Mono.error(e); } exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.toMap(() -> linkedMultiValueMap));// 保存到exchange中 return chain.execute(exchange); })); } //一般查询请求 private Mono<Void> query(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) { exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.ofString(() -> serverHttpRequest.getURI().getQuery()));//保存到exchange中 return chain.execute(exchange); } //...... }
2.5 执行DubboPlugin#
- org.apache.shenyu.plugin.dubbo.common.AbstractDubboPlugin#doExecute()
在doExecute()
方法中,主要是检查元数据和参数。
public abstract class AbstractDubboPlugin extends AbstractShenyuPlugin { @Override public Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) { //获取参数 String param = exchange.getAttribute(Constants.PARAM_TRANSFORM); //获取上下文信息 ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT); assert shenyuContext != null; //获取元数据 MetaData metaData = exchange.getAttribute(Constants.META_DATA); //检查元数据 if (!checkMetaData(metaData)) { LOG.error(" path is : {}, meta data have error : {}", shenyuContext.getPath(), metaData); exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR); Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.META_DATA_ERROR, null); return WebFluxResultUtils.result(exchange, error); } //检查元数据和参数 if (Objects.nonNull(metaData) && StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(param)) { exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR); Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM, null); return WebFluxResultUtils.result(exchange, error); } //设置rpcContext this.rpcContext(exchange); //进行dubbo服务调用 return this.doDubboInvoker(exchange, chain, selector, rule, metaData, param); }}
- org.apache.shenyu.plugin.apache.dubbo.ApacheDubboPlugin#doDubboInvoker()
在doDubboInvoker()
方法中设置特殊的上下文信息,然后开始dubbo的泛化调用。
public class ApacheDubboPlugin extends AbstractDubboPlugin { @Override protected Mono<Void> doDubboInvoker(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule, final MetaData metaData, final String param) { //设置当前的选择器和规则信息,以及请求地址,用于支持dubbo的灰度 RpcContext.getContext().setAttachment(Constants.DUBBO_SELECTOR_ID, selector.getId()); RpcContext.getContext().setAttachment(Constants.DUBBO_RULE_ID, rule.getId()); RpcContext.getContext().setAttachment(Constants.DUBBO_REMOTE_ADDRESS, Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress()); //dubbo的泛化调用 final Mono<Object> result = dubboProxyService.genericInvoker(param, metaData, exchange); //执行下一个插件 return result.then(chain.execute(exchange)); }}
- org.apache.shenyu.plugin.apache.dubbo.proxy.ApacheDubboProxyService#genericInvoker()
genericInvoker()
方法:
- 获取
ReferenceConfig
对象; - 获取泛化服务
GenericService
对象; - 构造请求参数
pair
对象; - 发起异步的泛化调用。
public class ApacheDubboProxyService { //......
/** * Generic invoker object. */ public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws ShenyuException { //1.获取ReferenceConfig对象 ReferenceConfig<GenericService> reference = ApacheDubboConfigCache.getInstance().get(metaData.getPath()); //如果没有获取到 if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) { //失效当前缓存的信息 ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath()); //使用元数据进行再次初始化 reference = ApacheDubboConfigCache.getInstance().initRef(metaData); } //2.获取泛化服务GenericService对象 GenericService genericService = reference.get(); //3.构造请求参数pair对象 Pair<String[], Object[]> pair; if (StringUtils.isBlank(metaData.getParameterTypes()) || ParamCheckUtils.dubboBodyIsEmpty(body)) { pair = new ImmutablePair<>(new String[]{}, new Object[]{}); } else { pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes()); } //4.发起异步的泛化调用 return Mono.fromFuture(invokeAsync(genericService, metaData.getMethodName(), pair.getLeft(), pair.getRight()).thenApply(ret -> { //处理结果 if (Objects.isNull(ret)) { ret = Constants.DUBBO_RPC_RESULT_EMPTY; } exchange.getAttributes().put(Constants.RPC_RESULT, ret); exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName()); return ret; })).onErrorMap(exception -> exception instanceof GenericException ? new ShenyuException(((GenericException) exception).getExceptionMessage()) : new ShenyuException(exception));//处理异常 } //泛化调用,异步操作 private CompletableFuture<Object> invokeAsync(final GenericService genericService, final String method, final String[] parameterTypes, final Object[] args) throws GenericException { genericService.$invoke(method, parameterTypes, args); Object resultFromFuture = RpcContext.getContext().getFuture(); return resultFromFuture instanceof CompletableFuture ? (CompletableFuture<Object>) resultFromFuture : CompletableFuture.completedFuture(resultFromFuture); }}
通过泛化调用就可以实现在网关调用dubbo
服务了。
ReferenceConfig
对象是支持泛化调用的关键对象 ,它的初始化操作是在数据同步的时候完成的。这里涉及两部分数据,一是同步的插件handler
信息,二是同步的插件元数据信息。
- org.apache.shenyu.plugin.dubbo.common.handler.AbstractDubboPluginDataHandler#handlerPlugin()
当插件数据更新时,数据同步模块会将数据从shenyu-admin
同步到网关。在handlerPlugin()
中执行初始化操作。
public abstract class AbstractDubboPluginDataHandler implements PluginDataHandler { //...... //初始化配置缓存 protected abstract void initConfigCache(DubboRegisterConfig dubboRegisterConfig);
@Override public void handlerPlugin(final PluginData pluginData) { if (Objects.nonNull(pluginData) && Boolean.TRUE.equals(pluginData.getEnabled())) { //数据反序列化 DubboRegisterConfig dubboRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class); DubboRegisterConfig exist = Singleton.INST.get(DubboRegisterConfig.class); if (Objects.isNull(dubboRegisterConfig)) { return; } if (Objects.isNull(exist) || !dubboRegisterConfig.equals(exist)) { // 执行初始化操作 this.initConfigCache(dubboRegisterConfig); } Singleton.INST.single(DubboRegisterConfig.class, dubboRegisterConfig); } } //......}
- org.apache.shenyu.plugin.apache.dubbo.handler.ApacheDubboPluginDataHandler#initConfigCache()
执行初始化操作。
public class ApacheDubboPluginDataHandler extends AbstractDubboPluginDataHandler {
@Override protected void initConfigCache(final DubboRegisterConfig dubboRegisterConfig) { //执行初始化操作 ApacheDubboConfigCache.getInstance().init(dubboRegisterConfig); //失效之前缓存的结果 ApacheDubboConfigCache.getInstance().invalidateAll(); }}
- org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#init()
在初始化中,设置registryConfig
和consumerConfig
。
public final class ApacheDubboConfigCache extends DubboConfigCache { //...... /** * 初始化 */ public void init(final DubboRegisterConfig dubboRegisterConfig) { //创建ApplicationConfig if (Objects.isNull(applicationConfig)) { applicationConfig = new ApplicationConfig("shenyu_proxy"); } //协议或者地址发生改变时,需要更新registryConfig if (needUpdateRegistryConfig(dubboRegisterConfig)) { RegistryConfig registryConfigTemp = new RegistryConfig(); registryConfigTemp.setProtocol(dubboRegisterConfig.getProtocol()); registryConfigTemp.setId("shenyu_proxy"); registryConfigTemp.setRegister(false); registryConfigTemp.setAddress(dubboRegisterConfig.getRegister()); Optional.ofNullable(dubboRegisterConfig.getGroup()).ifPresent(registryConfigTemp::setGroup); registryConfig = registryConfigTemp; } //创建ConsumerConfig if (Objects.isNull(consumerConfig)) { consumerConfig = ApplicationModel.getConfigManager().getDefaultConsumer().orElseGet(() -> { ConsumerConfig consumerConfig = new ConsumerConfig(); consumerConfig.refresh(); return consumerConfig; }); //设置ConsumerConfig Optional.ofNullable(dubboRegisterConfig.getThreadpool()).ifPresent(consumerConfig::setThreadpool); Optional.ofNullable(dubboRegisterConfig.getCorethreads()).ifPresent(consumerConfig::setCorethreads); Optional.ofNullable(dubboRegisterConfig.getThreads()).ifPresent(consumerConfig::setThreads); Optional.ofNullable(dubboRegisterConfig.getQueues()).ifPresent(consumerConfig::setQueues); } } //是否需要更新注册配置 private boolean needUpdateRegistryConfig(final DubboRegisterConfig dubboRegisterConfig) { if (Objects.isNull(registryConfig)) { return true; } return !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol()) || !Objects.equals(dubboRegisterConfig.getRegister(), registryConfig.getAddress()) || !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol()); }
//......}
- org.apache.shenyu.plugin.apache.dubbo.subscriber.ApacheDubboMetaDataSubscriber#onSubscribe()
当元数据更新时,数据同步模块会将数据从shenyu-admin
同步到网关。在onSubscribe()
方法中执行元数据更新操作。
public class ApacheDubboMetaDataSubscriber implements MetaDataSubscriber { //本地内存缓存 private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap();
//元数据发生更新 public void onSubscribe(final MetaData metaData) { // dubbo服务的元数据更新 if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) { //对应的元数据是否存在 MetaData exist = META_DATA.get(metaData.getPath()); if (Objects.isNull(exist) || Objects.isNull(ApacheDubboConfigCache.getInstance().get(metaData.getPath()))) { // 首次初始化 ApacheDubboConfigCache.getInstance().initRef(metaData); } else { // 对应的元数据发生了更新操作 if (!Objects.equals(metaData.getServiceName(), exist.getServiceName()) || !Objects.equals(metaData.getRpcExt(), exist.getRpcExt()) || !Objects.equals(metaData.getParameterTypes(), exist.getParameterTypes()) || !Objects.equals(metaData.getMethodName(), exist.getMethodName())) { //根据最新的元数据再次构建ReferenceConfig ApacheDubboConfigCache.getInstance().build(metaData); } } //本地内存缓存 META_DATA.put(metaData.getPath(), metaData); } }
//删除元数据 public void unSubscribe(final MetaData metaData) { if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) { //使ReferenceConfig失效 ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath()); META_DATA.remove(metaData.getPath()); } }}
- org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#initRef()
通过metaData
构建ReferenceConfig
对象。
public final class ApacheDubboConfigCache extends DubboConfigCache { //...... public ReferenceConfig<GenericService> initRef(final MetaData metaData) { try { //先尝试从缓存中获取,存在就直接返回 ReferenceConfig<GenericService> referenceConfig = cache.get(metaData.getPath()); if (StringUtils.isNoneBlank(referenceConfig.getInterface())) { return referenceConfig; } } catch (ExecutionException e) { LOG.error("init dubbo ref exception", e); } //不存在,就构建 return build(metaData); }
/** * Build reference config. */ @SuppressWarnings("deprecation") public ReferenceConfig<GenericService> build(final MetaData metaData) { if (Objects.isNull(applicationConfig) || Objects.isNull(registryConfig)) { return new ReferenceConfig<>(); } ReferenceConfig<GenericService> reference = new ReferenceConfig<>(); //新建ReferenceConfig reference.setGeneric("true"); //泛化调用 reference.setAsync(true);//支持异步
reference.setApplication(applicationConfig);//设置应用配置 reference.setRegistry(registryConfig);//设置注册中心配置 reference.setConsumer(consumerConfig);//设置消费者配置 reference.setInterface(metaData.getServiceName());//设置服务接口 reference.setProtocol("dubbo");//设置dubbo协议 reference.setCheck(false); //不检查 service provider reference.setLoadbalance("gray");//支持灰度
Map<String, String> parameters = new HashMap<>(2); parameters.put("dispatcher", "direct"); reference.setParameters(parameters);//自定义参数
String rpcExt = metaData.getRpcExt();//rpc扩展参数 DubboParam dubboParam = parserToDubboParam(rpcExt);//反序列化 if (Objects.nonNull(dubboParam)) { if (StringUtils.isNoneBlank(dubboParam.getVersion())) { reference.setVersion(dubboParam.getVersion());//设置版本 } if (StringUtils.isNoneBlank(dubboParam.getGroup())) { reference.setGroup(dubboParam.getGroup());//设置分组 } if (StringUtils.isNoneBlank(dubboParam.getUrl())) { reference.setUrl(dubboParam.getUrl());//设置url } if (StringUtils.isNoneBlank(dubboParam.getCluster())) { reference.setCluster(dubboParam.getCluster());//设置Cluster type } Optional.ofNullable(dubboParam.getTimeout()).ifPresent(reference::setTimeout);//timeout Optional.ofNullable(dubboParam.getRetries()).ifPresent(reference::setRetries);//retires Optional.ofNullable(dubboParam.getSent()).ifPresent(reference::setSent);//Whether to ack async-sent } try { //获取GenericService Object obj = reference.get(); if (Objects.nonNull(obj)) { LOG.info("init apache dubbo reference success there meteData is :{}", metaData); //缓存当前的reference cache.put(metaData.getPath(), reference); } } catch (Exception e) { LOG.error("init apache dubbo reference exception", e); } return reference; } //...... }
2.6 执行ResponsePlugin#
- org.apache.shenyu.plugin.response.ResponsePlugin#execute()
响应结果由ResponsePlugin
插件处理。
@Override public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) { ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT); assert shenyuContext != null; // 根据rpc类型处理结果 return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain); }
处理类型由MessageWriter
决定,类继承关系如下:
- MessageWriter:接口,定义消息处理方法;
- NettyClientMessageWriter:处理
Netty
调用结果; - RPCMessageWriter:处理
RPC
调用结果; - WebClientMessageWriter:处理
WebClient
调用结果;
Dubbo
服务调用,处理结果当然是RPCMessageWriter
了。
- org.apache.shenyu.plugin.response.strategy.RPCMessageWriter#writeWith()
在writeWith()
方法中处理响应结果。
public class RPCMessageWriter implements MessageWriter {
@Override public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) { return chain.execute(exchange).then(Mono.defer(() -> { Object result = exchange.getAttribute(Constants.RPC_RESULT); //获取结果 if (Objects.isNull(result)) { //处理异常 Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null); return WebFluxResultUtils.result(exchange, error); } return WebFluxResultUtils.result(exchange, result);//返回结果 })); }}
分析至此,关于Dubbo
插件的源码分析就完成了,分析流程图如下:
3. 小结#
本文源码分析从Dubbo
服务注册开始,到Dubbo
插件的服务调用。Dubbo
插件主要用来处理将http
请求转成dubbo
协议,主要逻辑是通过泛化调用实现。