Dubbo插件源码分析
Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的
API网关。
Apache ShenYu 网关使用 dubbo 插件完成对 dubbo服务的调用。你可以查看官方文档 Dubbo快速开始 了解如何使用该插件。
本文基于
shenyu-2.4.3版本进行源码分析,官网的介绍请参考 Dubbo服务接入 。
1. 服务注册
以官网提供的例子为例 shenyu-examples-dubbo 。 假如你的dubbo服务定义如下(spring-dubbo.xml):
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://code.alibabatech.com/schema/dubbo
https://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<dubbo:application name="test-dubbo-service"/>
<dubbo:registry address="${dubbo.registry.address}"/>
<dubbo:protocol name="dubbo" port="20888"/>
<dubbo:service timeout="10000" interface="org.apache.shenyu.examples.dubbo.api.service.DubboTestService" ref="dubboTestService"/>
</beans>
声明应用服务名称,注册中心地址,使用dubbo协议,声明服务接口,对应接口实现类:
/**
* DubboTestServiceImpl.
*/
@Service("dubboTestService")
public class DubboTestServiceImpl implements DubboTestService {
@Override
@ShenyuDubboClient(path = "/findById", desc = "Query by Id")
public DubboTest findById(final String id) {
return new DubboTest(id, "hello world shenyu Apache, findById");
}
//......
}
在接口实现类中,使用注解@ShenyuDubboClient向shenyu-admin注册服务。该注解的作用及原理,稍后再进行分析。
在配置文件application.yml中的配置信息:
server:
port: 8011
address: 0.0.0.0
servlet:
context-path: /
spring:
main:
allow-bean-definition-overriding: true
dubbo:
registry:
address: zookeeper://localhost:2181 # dubbo使用的注册中心
shenyu:
register:
registerType: http #注册方式
serverLists: http://localhost:9095 #注册地址
props:
username: admin
password: 123456
client:
dubbo:
props:
contextPath: /dubbo
appName: dubbo
在配置文件中,声明dubbo使用的注册中心地址,dubbo服务向shenyu-admin注册,使用的方式是http,注册地址是http://localhost:9095。
关于注册方式的使用,请参考 应用客户端接入 。
1.1 声明注册接口
使用注解@ShenyuDubboClient将服务注册到网关。简单demo如下:
// dubbo服务
@Service("dubboTestService")
public class DubboTestServiceImpl implements DubboTestService {
@Override
@ShenyuDubboClient(path = "/findById", desc = "Query by Id") // 需要注册的方法
public DubboTest findById(final String id) {
return new DubboTest(id, "hello world shenyu Apache, findById");
}
//......
}
注解定义:
/**
* 作用于类和方法上
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@Inherited
public @interface ShenyuDubboClient {
//注册路径
String path();
//规则名称
String ruleName() default "";
//描述信息
String desc() default "";
//是否启用
boolean enabled() default true;
}
1.2 扫描注解信息
注解扫描通过ApacheDubboServiceBeanListener完成,它实现了ApplicationListener<ContextRefreshedEvent>接口,在Spring容器启动过程中,发生上下文刷新事件时,开始执行事件处理方法onApplicationEvent()。
在构造器实例化的过程中:
- 读取属性配置
- 开启线程池
- 启动注册中心,用于向
shenyu-admin注册
public class ApacheDubboServiceBeanListener implements ApplicationListener<ContextRefreshedEvent> {
// ......
//构造器
public ApacheDubboServiceBeanListener(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
//1.读取属性配置
Properties props = clientConfig.getProps();
String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
String appName = props.getProperty(ShenyuClientConstants.APP_NAME);
if (StringUtils.isBlank(contextPath)) {
throw new ShenyuClientIllegalArgumentException("apache dubbo client must config the contextPath or appName");
}
this.contextPath = contextPath;
this.appName = appName;
this.host = props.getProperty(ShenyuClientConstants.HOST);
this.port = props.getProperty(ShenyuClientConstants.PORT);
//2.开启线程池
executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("shenyu-apache-dubbo-client-thread-pool-%d").build());
//3.启动注册中心
publisher.start(shenyuClientRegisterRepository);
}
/**
* 上下文刷新事件,执行方法逻辑
*/
@Override
public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
//......
}
- ApacheDubboServiceBeanListener#onApplicationEvent()
重写的方法逻辑:读取Dubbo服务ServiceBean,构建元数据对象和URI对象,并向shenyu-admin注册。
@Override
public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
//读取ServiceBean
Map<String, ServiceBean> serviceBean = contextRefreshedEvent.getApplicationContext().getBeansOfType(ServiceBean.class);
if (serviceBean.isEmpty()) {
return;
}
//保证该方法只执行一次
if (!registered.compareAndSet(false, true)) {
return;
}
//处理元数据对象
for (Map.Entry<String, ServiceBean> entry : serviceBean.entrySet()) {
handler(entry.getValue());
}
//处理URI对象
serviceBean.values().stream().findFirst().ifPresent(bean -> {
publisher.publishEvent(buildURIRegisterDTO(bean));
});
}
-
handler()
在
handler()方法中,从serviceBean中读取所有方法,判断方法上是否有ShenyuDubboClient注解,如果存在就构建元数据对象,并通过注册中心,向shenyu-admin注册该方法。
private void handler(final ServiceBean<?> serviceBean) {
//获取代理对象
Object refProxy = serviceBean.getRef();
//获取class信息
Class<?> clazz = refProxy.getClass();
if (AopUtils.isAopProxy(refProxy)) {
clazz = AopUtils.getTargetClass(refProxy);
}
//获取所有方法
Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);
for (Method method : methods) {
//读取ShenyuDubboClient注解信息
ShenyuDubboClient shenyuDubboClient = method.getAnnotation(ShenyuDubboClient.class);
if (Objects.nonNull(shenyuDubboClient)) {
//构建元数据对象,并注册
publisher.publishEvent(buildMetaDataDTO(serviceBean, shenyuDubboClient, method));
}
}
}
-
buildMetaDataDTO()
构建元数据对象,在这里构建方法注册的必要信息,后续用于选择器或规则匹配。
private MetaDataRegisterDTO buildMetaDataDTO(final ServiceBean<?> serviceBean, final ShenyuDubboClient shenyuDubboClient, final Method method) {
//应用名称
String appName = buildAppName(serviceBean);
//方法路径
String path = contextPath + shenyuDubboClient.path();
//描述信息
String desc = shenyuDubboClient.desc();
//服务名称
String serviceName = serviceBean.getInterface();
//规则名称
String configRuleName = shenyuDubboClient.ruleName();
String ruleName = ("".equals(configRuleName)) ? path : configRuleName;
//方法名称
String methodName = method.getName();
//参数类型
Class<?>[] parameterTypesClazz = method.getParameterTypes();
String parameterTypes = Arrays.stream(parameterTypesClazz).map(Class::getName).collect(Collectors.joining(","));
return MetaDataRegisterDTO.builder()
.appName(appName)
.serviceName(serviceName)
.methodName(methodName)
.contextPath(contextPath)
.host(buildHost())
.port(buildPort(serviceBean))
.path(path)
.ruleName(ruleName)
.pathDesc(desc)
.parameterTypes(parameterTypes)
.rpcExt(buildRpcExt(serviceBean)) //dubbo服务的扩展信息
.rpcType(RpcTypeEnum.DUBBO.getName())
.enabled(shenyuDubboClient.enabled())
.build();
}
-
buildRpcExt()
dubbo服务的扩展信息
private String buildRpcExt(final ServiceBean serviceBean) {
DubboRpcExt build = DubboRpcExt.builder()
.group(StringUtils.isNotEmpty(serviceBean.getGroup()) ? serviceBean.getGroup() : "")//分组
.version(StringUtils.isNotEmpty(serviceBean.getVersion()) ? serviceBean.getVersion() : "")//版本
.loadbalance(StringUtils.isNotEmpty(serviceBean.getLoadbalance()) ? serviceBean.getLoadbalance() : Constants.DEFAULT_LOADBALANCE)//负载均衡策略,默认随机
.retries(Objects.isNull(serviceBean.getRetries()) ? Constants.DEFAULT_RETRIES : serviceBean.getRetries())//重试次数,默认2
.timeout(Objects.isNull(serviceBean.getTimeout()) ? Constants.DEFAULT_CONNECT_TIMEOUT : serviceBean.getTimeout())//超时,默认3000
.sent(Objects.isNull(serviceBean.getSent()) ? Constants.DEFAULT_SENT : serviceBean.getSent())//sent,默认false
.cluster(StringUtils.isNotEmpty(serviceBean.getCluster()) ? serviceBean.getCluster() : Constants.DEFAULT_CLUSTER)//集群策略,默认failover
.url("")
.build();
return GsonUtils.getInstance().toJson(build);
}
-
buildURIRegisterDTO()
构建
URI对象,注册服务本身的信息,后续可用于服务探活。
private URIRegisterDTO buildURIRegisterDTO(final ServiceBean serviceBean) {
return URIRegisterDTO.builder()
.contextPath(this.contextPath) //上下文路径
.appName(buildAppName(serviceBean))//应用名称
.rpcType(RpcTypeEnum.DUBBO.getName())//rpc类型:dubbo
.host(buildHost()) //host
.port(buildPort(serviceBean))//port
.build();
}
具体的注册逻辑由注册中心实现,请参考 客户端接入原理 。
//向注册中心,发布注册事件
publisher.publishEvent();
1.3 处理注册信息
客户端通过注册中心注册的元数据和URI数据,在shenyu-admin端进行处理,负责存储到数据库和同步给shenyu网关。Dubbo插件的客户端注册处理逻辑在ShenyuClientRegisterDubboServiceImpl中。继承关系如下:

- ShenyuClientRegisterService:客户端注册服务,顶层接口;
- FallbackShenyuClientRegisterService:注册失败,提供重试操作;
- AbstractShenyuClientRegisterServiceImpl:抽象类,实现部分公共注册逻辑;
- ShenyuClientRegisterDubboServiceImpl:实现
Dubbo插件 的注册;
1.3.1 注册服务
-
org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()
客户端通过注册中心注册的元数据
MetaDataRegisterDTO对象在shenyu-admin的register()方法被接送到。
@Override
public String register(final MetaDataRegisterDTO dto) {
//1. 注册选择器
String selectorHandler = selectorHandler(dto);
String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
//2. 注册规则
String ruleHandler = ruleHandler();
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
ruleService.registerDefault(ruleDTO);
//3. 注册元数据
registerMetadata(dto);
//4. 注册ContextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
1.3.1.1 注册选择器
- org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()
构建contextPath,查找选择器信息是否存在,如果存在就返回id;不存在就创建默认的选择器信息。
@Override
public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {
// 构建contextPath
String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());
// 通过名称查找选择器信息是否存在
SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);
if (Objects.isNull(selectorDO)) {
// 不存在就创建默认的选择器信息
return registerSelector(contextPath, pluginName, selectorHandler);
}
return selectorDO.getId();
}
-
默认选择器信息
在这里构建默认选择器信息及其条件属性。
//注册选择器
private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {
//构建选择器
SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());
selectorDTO.setHandle(selectorHandler);
//注册默认选择器
return registerDefault(selectorDTO);
}
//构建选择器
private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {
//构建默认选择器
SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);
selectorDTO.setPluginId(pluginId);
//构建默认选择器的条件属性
selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));
return selectorDTO;
}
- 构建默认选择器
private SelectorDTO buildDefaultSelectorDTO(final String name) {
return SelectorDTO.builder()
.name(name) // 名称
.type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // 默认类型自定义
.matchMode(MatchModeEnum.AND.getCode()) //默认匹配方式 and
.enabled(Boolean.TRUE) //默认启开启
.loged(Boolean.TRUE) //默认记录日志
.continued(Boolean.TRUE) //默认继续后续选择器
.sort(1) //默认顺序1
.build();
}
- 构建默认选择器条件属性
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {
SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();
selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // 默认参数类型URI
selectorConditionDTO.setParamName("/");
selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // 默认匹配策略 match
selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // 默认值 /contextPath/**
return Collections.singletonList(selectorConditionDTO);
}
- 注册默认选择器
@Override
public String registerDefault(final SelectorDTO selectorDTO) {
//选择器信息
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
//选择器条件属性
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
if (StringUtils.isEmpty(selectorDTO.getId())) {
// 向数据库插入选择器信息
selectorMapper.insertSelective(selectorDO);
// 向数据库插入选择器条件属性
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId()); selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
}
// 发布同步事件,向网关同步选择信息及其条件属性
publishEvent(selectorDO, selectorConditionDTOs);
return selectorDO.getId();
}
1.3.1.2 注册规则
在注册服务的第二步中,开始构建默认规则,然后注册规则。
@Override
public String register(final MetaDataRegisterDTO dto) {
//1. 注册选择器
//......
//2. 注册规则
// 默认规则处理属性
String ruleHandler = ruleHandler();
// 构建默认规则信息
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
// 注册规则
ruleService.registerDefault(ruleDTO);
//3. 注册元数据
//......
//4. 注册ContextPath
//......
return ShenyuResultMessage.SUCCESS;
}
- 默认规则处理属性
@Override
protected String ruleHandler() {
// 默认规则处理属性
return new DubboRuleHandle().toJson();
}
Dubbo插件默认规则处理属性
public class DubboRuleHandle implements RuleHandle {
/**
* dubbo服务版本信息.
*/
private String version;
/**
* 分组.
*/
private String group;
/**
* 重试次数.
*/
private Integer retries = 0;
/**
* 负载均衡策略:默认随机
*/
private String loadbalance = LoadBalanceEnum.RANDOM.getName();
/**
* 超时,默认3000
*/
private long timeout = Constants.TIME_OUT;
}
- 构建默认规则信息
// 构建默认规则信息
private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {
return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());
}
// 构建默认规则信息
private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {
RuleDTO ruleDTO = RuleDTO.builder()
.selectorId(selectorId) //关联的选择器id
.name(ruleName) //规则名称
.matchMode(MatchModeEnum.AND.getCode()) // 默认匹配模式 and
.enabled(Boolean.TRUE) // 默认开启
.loged(Boolean.TRUE) //默认记录日志
.sort(1) //默认顺序 1
.handle(ruleHandler)
.build();
RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()
.paramType(ParamTypeEnum.URI.getName()) // 默认参数类型URI
.paramName("/")
.paramValue(path) //参数值path
.build();
if (path.indexOf("*") > 1) {
ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); //如果path中有*,操作类型则默认为 match
} else {
ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); // 否则,默认操作类型 =
}
ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));
return ruleDTO;
}
- org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()
注册规则:向数据库插入记录,并向网关发布事件,进行数据同步。
@Override
public String registerDefault(final RuleDTO ruleDTO) {
RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());
if (Objects.nonNull(exist)) {
return "";
}
RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);
List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();
if (StringUtils.isEmpty(ruleDTO.getId())) {
// 向数据库插入规则信息
ruleMapper.insertSelective(ruleDO);
//向数据库插入规则体条件属性
ruleConditions.forEach(ruleConditionDTO -> {
ruleConditionDTO.setRuleId(ruleDO.getId()); ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));
});
}
// 向网关发布事件,进行数据同步
publishEvent(ruleDO, ruleConditions);
return ruleDO.getId();
}
1.3.1.3 注册元数据
元数据主要用于RPC服务的调用。
@Override
public String register(final MetaDataRegisterDTO dto) {
//1. 注册选择器
//......
//2. 注册规则
//......
//3. 注册元数据
registerMetadata(dto);
//4. 注册ContextPath
//......
return ShenyuResultMessage.SUCCESS;
}
-
org.apache.shenyu.admin.service.register.ShenyuClientRegisterDubboServiceImpl#registerMetadata()
插入或更新元数据,然后发布同步事件到网关。
@Override
protected void registerMetadata(final MetaDataRegisterDTO dto) {
// 获取metaDataService
MetaDataService metaDataService = getMetaDataService();
// 元数据是否存在
MetaDataDO exist = metaDataService.findByPath(dto.getPath());
// 插入或更 新元数据
metaDataService.saveOrUpdateMetaData(exist, dto);
}
@Override
public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {
DataEventTypeEnum eventType;
// 数据类型转换 DTO->DO
MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);
// 插入数据
if (Objects.isNull(exist)) {
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());
metaDataDO.setDateCreated(currentTime);
metaDataDO.setDateUpdated(currentTime);
metaDataMapper.insert(metaDataDO);
eventType = DataEventTypeEnum.CREATE;
} else {
// 更新数据
metaDataDO.setId(exist.getId());
metaDataMapper.update(metaDataDO);
eventType = DataEventTypeEnum.UPDATE;
}
// 发布同步事件到网关
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,
Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));
}