Skip to main content

One post tagged with "Apache ShenYu"

View All Tags

· One min read

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu 网关使用 divide 插件来处理 http 请求。你可以查看官方文档 Http快速开始 了解如何使用该插件。

本文基于shenyu-2.4.3版本进行源码分析,官网的介绍请参考 Http服务接入

1. 服务注册

1.1 声明注册接口

使用注解@ShenyuSpringMvcClient将服务注册到网关。简单demo如下:

@RestController
@RequestMapping("/order")
@ShenyuSpringMvcClient(path = "/order") // API注册
public class OrderController {
@GetMapping("/findById")
@ShenyuSpringMvcClient(path = "/findById", desc = "Find by id") // 方法注册
public OrderDTO findById(@RequestParam("id") final String id) {
return build(id, "hello world findById");
}
}

注解定义:


/**
* 作用于类和方法上
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ShenyuSpringMvcClient {

//注册路径
String path() default "";

//规则名称
String ruleName() default "";

//描述信息
String desc() default "";

//是否启用
boolean enabled() default true;

//注册元数据
boolean registerMetaData() default false;
}

1.2 扫描注解信息

注解扫描通过SpringMvcClientBeanPostProcessor完成,它实现了BeanPostProcessor接口,是Spring提供的后置处理器。

在构造器实例化的过程中:

  • 读取属性配置
  • 添加注解,读取path信息
  • 启动注册中心,向shenyu-admin注册
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
//...
/**
* 构造器实例化
*/
public SpringMvcClientBeanPostProcessor(final PropertiesConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
// 1. 读取属性配置
Properties props = clientConfig.getProps();
this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH, "");
if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {
String errorMsg = "http register param must config the appName or contextPath";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
// 2. 添加注解
mappingAnnotation.add(ShenyuSpringMvcClient.class);
mappingAnnotation.add(PostMapping.class);
mappingAnnotation.add(GetMapping.class);
mappingAnnotation.add(DeleteMapping.class);
mappingAnnotation.add(PutMapping.class);
mappingAnnotation.add(RequestMapping.class);
// 3. 启动注册中心
publisher.start(shenyuClientRegisterRepository);
}

@Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
// 重写后置处理器逻辑

return bean;
}

  • SpringMvcClientBeanPostProcessor#postProcessAfterInitialization()

重写后置处理器逻辑:读取注解信息,构建元数据对象和URI对象,并向shenyu-admin注册。

    @Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
// 1. 如果是注册整个服务或者不是Controller类,就不处理
if (Boolean.TRUE.equals(isFull) || !hasAnnotation(bean.getClass(), Controller.class)) {
return bean;
}
// 2. 读取类上的注解 ShenyuSpringMvcClient
final ShenyuSpringMvcClient beanShenyuClient = AnnotationUtils.findAnnotation(bean.getClass(), ShenyuSpringMvcClient.class);
// 2.1构建superPath
final String superPath = buildApiSuperPath(bean.getClass());
// 2.2 是否注册整个类方法
if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {
// 构建元数据对象,然后向shenyu-admin注册
publisher.publishEvent(buildMetaDataDTO(beanShenyuClient, pathJoin(contextPath, superPath)));
return bean;
}
// 3. 读取所有方法
final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
for (Method method : methods) {
// 3.1 读取方法上的注解 ShenyuSpringMvcClient
ShenyuSpringMvcClient methodShenyuClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
// 如果方法上面没有注解,就用类上面的注解
methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;
if (Objects.nonNull(methodShenyuClient)) {
// 3.2 构建path信息,构建元数据对象,向shenyu-admin注册
publisher.publishEvent(buildMetaDataDTO(methodShenyuClient, buildApiPath(method, superPath)));
}
}

return bean;
}
  • 1.如果是注册整个服务或者不是Controller类,就不处理
  • 2.读取类上的注解 ShenyuSpringMvcClient,如果是注册整个类,就在这里构建元数据对象,然后向shenyu-admin注册
  • 3.处理方法上的注解 ShenyuSpringMvcClient,针对特定方法构建path信息,构建元数据对象,然后向shenyu-admin注册

这里有两个取path的方法,需要特别说明一下:

  • buildApiSuperPath()

    构造SuperPath:先从类上的注解ShenyuSpringMvcClientpath属性,如果没有,就从当前类的RequestMapping注解中取path信息。

    private String buildApiSuperPath(@NonNull final Class<?> method) {
// 先从类上的注解ShenyuSpringMvcClient取path属性
ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {
return shenyuSpringMvcClient.path();
}
// 从当前类的RequestMapping注解中取path信息
RequestMapping requestMapping = AnnotationUtils.findAnnotation(method, RequestMapping.class);
if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {
return requestMapping.path()[0];
}
return "";
}
  • buildApiPath()

    构建path:先读取方法上的注解ShenyuSpringMvcClient,如果存在就构建;否则从方法的其他注解上获取path信息;完整的path = contextPath(上下文信息)+superPath(类信息)+methodPath(方法信息)

    private String buildApiPath(@NonNull final Method method, @NonNull final String superPath) {
// 1. 读取方法上的注解ShenyuSpringMvcClient
ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
// 1.1如果存在path,就构建
if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {
//1.2完整 path = contextPath+superPath+methodPath
return pathJoin(contextPath, superPath, shenyuSpringMvcClient.path());
}
// 2.从方法的其他注解上获取path信息
final String path = getPathByMethod(method);
if (StringUtils.isNotBlank(path)) {
// 2.1 完整的path = contextPath+superPath+methodPath
return pathJoin(contextPath, superPath, path);
}
return pathJoin(contextPath, superPath);
}
  • getPathByMethod()

    从方法的其他注解上获取path信息,其他注解包括:

    • ShenyuSpringMvcClient
    • PostMapping
    • GetMapping
    • DeleteMapping
    • PutMapping
    • RequestMapping

private String getPathByMethod(@NonNull final Method method) {
// 遍历接口注解获取path信息
for (Class<? extends Annotation> mapping : mappingAnnotation) {
final String pathByAnnotation = getPathByAnnotation(AnnotationUtils.findAnnotation(method, mapping), pathAttributeNames);
if (StringUtils.isNotBlank(pathByAnnotation)) {
return pathByAnnotation;
}
}
return null;
}

扫描注解完成后,构建元数据对象,然后将该对象发送到shenyu-admin,即可完成注册。

  • 元数据对象

    包括当前注册方法的规则信息:contextPath,appName,注册路径,描述信息,注册类型,是否启用,规则名称和是否注册元数据。

 private MetaDataRegisterDTO buildMetaDataDTO(@NonNull final ShenyuSpringMvcClient shenyuSpringMvcClient, final String path) {
return MetaDataRegisterDTO.builder()
.contextPath(contextPath) // contextPath
.appName(appName) // appName
.path(path) // 注册路径,在网关规则匹配时使用
.pathDesc(shenyuSpringMvcClient.desc()) // 描述信息
.rpcType(RpcTypeEnum.HTTP.getName()) // divide插件,默认时http类型
.enabled(shenyuSpringMvcClient.enabled()) // 是否启用规则
.ruleName(StringUtils.defaultIfBlank(shenyuSpringMvcClient.ruleName(), path))//规则名称
.registerMetaData(shenyuSpringMvcClient.registerMetaData()) //是否注册元数据信息
.build();
}

具体的注册逻辑由注册中心实现,在之前的文章中已经分析过了,这里就不再深入分析。

1.3 注册URI信息

ContextRegisterListener负责将客户端的URI信息注册到shenyu-admin,它实现了ApplicationListener接口,发生上下文刷新事件ContextRefreshedEvent时,执行onApplicationEvent()方法,实现注册逻辑。


public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent>, BeanFactoryAware {
//......

/**
* 构造器实例化
*/
public ContextRegisterListener(final PropertiesConfig clientConfig) {
// 读取属性配置
final Properties props = clientConfig.getProps();
this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
if (Boolean.TRUE.equals(isFull)) {
if (StringUtils.isBlank(contextPath)) {
final String errorMsg = "http register param must config the contextPath";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
}
this.port = Integer.parseInt(Optional.ofNullable(props.getProperty(ShenyuClientConstants.PORT)).orElseGet(() -> "-1"));
this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);
this.host = props.getProperty(ShenyuClientConstants.HOST);
}

@Override
public void setBeanFactory(final BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}

// 执行应用事件
@Override
public void onApplicationEvent(@NonNull final ContextRefreshedEvent contextRefreshedEvent) {
// 保证该方法执行一次
if (!registered.compareAndSet(false, true)) {
return;
}
// 1. 如果是注册整个服务
if (Boolean.TRUE.equals(isFull)) {
// 构建元数据,并注册
publisher.publishEvent(buildMetaDataDTO());
}
try {
// 获取端口信息
final int mergedPort = port <= 0 ? PortUtils.findPort(beanFactory) : port;
// 2. 构建URI数据,并注册
publisher.publishEvent(buildURIRegisterDTO(mergedPort));
} catch (ShenyuException e) {
throw new ShenyuException(e.getMessage() + "please config ${shenyu.client.http.props.port} in xml/yml !");
}
}

// 构建URI数据
private URIRegisterDTO buildURIRegisterDTO(final int port) {
return URIRegisterDTO.builder()
.contextPath(this.contextPath) // contextPath
.appName(appName) // appName
.protocol(protocol) // 服务使用的协议
.host(IpUtils.isCompleteHost(this.host) ? this.host : IpUtils.getHost(this.host)) //主机
.port(port) // 端口
.rpcType(RpcTypeEnum.HTTP.getName()) // divide插件,默认注册http类型
.build();
}

// 构建元数据
private MetaDataRegisterDTO buildMetaDataDTO() {
return MetaDataRegisterDTO.builder()
.contextPath(contextPath)
.appName(appName)
.path(contextPath)
.rpcType(RpcTypeEnum.HTTP.getName())
.enabled(true)
.ruleName(contextPath)
.build();
}
}

1.4 处理注册信息

客户端通过注册中心注册的元数据和URI数据,在shenyu-admin进行处理,负责存储到数据库和同步给shenyu网关。Divide插件的客户端注册处理逻辑在ShenyuClientRegisterDivideServiceImpl中。继承关系如下:

  • ShenyuClientRegisterService:客户端注册服务,顶层接口;
  • FallbackShenyuClientRegisterService:注册失败,提供重试操作;
  • AbstractShenyuClientRegisterServiceImpl:抽象类,实现部分公共注册逻辑;
  • AbstractContextPathRegisterService:抽象类,负责注册ContextPath
  • ShenyuClientRegisterDivideServiceImpl:实现Divide插件的注册;
1.4.1 注册服务
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()

    客户端通过注册中心注册的元数据MetaDataRegisterDTO对象在shenyu-adminregister()方法被接送到。

   @Override
public String register(final MetaDataRegisterDTO dto) {
//1. 注册选择器
String selectorHandler = selectorHandler(dto);
String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
//2. 注册规则
String ruleHandler = ruleHandler();
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
ruleService.registerDefault(ruleDTO);
//3. 注册元数据
registerMetadata(dto);
//4. 注册ContextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
1.4.1.1 注册选择器
  • org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()

构建contextPath,查找选择器信息是否存在,如果存在就返回id;不存在就创建默认的选择器信息。

    @Override
public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {
// 构建contextPath
String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());
// 通过名称查找选择器信息是否存在
SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);
if (Objects.isNull(selectorDO)) {
// 不存在就创建默认的选择器信息
return registerSelector(contextPath, pluginName, selectorHandler);
}
return selectorDO.getId();
}
  • 默认选择器信息

    在这里构建默认选择器信息及其条件属性。

   //注册选择器
private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {
//构建选择器
SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());
selectorDTO.setHandle(selectorHandler);
//注册默认选择器
return registerDefault(selectorDTO);
}
//构建选择器
private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {
//构建默认选择器
SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);
selectorDTO.setPluginId(pluginId);
//构建默认选择器的条件属性
selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));
return selectorDTO;
}
  • 构建默认选择器
private SelectorDTO buildDefaultSelectorDTO(final String name) {
return SelectorDTO.builder()
.name(name) // 名称
.type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // 默认类型自定义
.matchMode(MatchModeEnum.AND.getCode()) //默认匹配方式 and
.enabled(Boolean.TRUE) //默认启开启
.loged(Boolean.TRUE) //默认记录日志
.continued(Boolean.TRUE) //默认继续后续选择器
.sort(1) //默认顺序1
.build();
}


  • 构建默认选择器条件属性
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {
SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();
selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // 默认参数类型URI
selectorConditionDTO.setParamName("/");
selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // 默认匹配策略 match
selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // 默认值 /contextPath/**
return Collections.singletonList(selectorConditionDTO);
}
  • 注册默认选择器
@Override
public String registerDefault(final SelectorDTO selectorDTO) {
//选择器信息
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
//选择器条件属性
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
if (StringUtils.isEmpty(selectorDTO.getId())) {
// 向数据库插入选择器信息
selectorMapper.insertSelective(selectorDO);
// 向数据库插入选择器条件属性
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
}
// 发布同步事件,向网关同步选择信息及其条件属性
publishEvent(selectorDO, selectorConditionDTOs);
return selectorDO.getId();
}
1.4.1.2 注册规则

在注册服务的第二步中,开始构建默认规则,然后注册规则。

@Override
public String register(final MetaDataRegisterDTO dto) {
//1. 注册选择器
//......

//2. 注册规则
// 默认规则处理属性
String ruleHandler = ruleHandler();
// 构建默认规则信息
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
// 注册规则
ruleService.registerDefault(ruleDTO);

//3. 注册元数据
//......

//4. 注册ContextPath
//......

return ShenyuResultMessage.SUCCESS;
}
  • 默认规则处理属性
    @Override
protected String ruleHandler() {
// 默认规则处理属性
return new DivideRuleHandle().toJson();
}

Divide插件默认规则处理属性


public class DivideRuleHandle implements RuleHandle {

/**
* 负载均衡:默认随机
*/
private String loadBalance = LoadBalanceEnum.RANDOM.getName();

/**
* 重试策略:默认重试当前服务
*/
private String retryStrategy = RetryEnum.CURRENT.getName();

/**
* 重试次数:默认3次
*/
private int retry = 3;

/**
* 调用超时:默认 3000
*/
private long timeout = Constants.TIME_OUT;

/**
* header最大值:10240 byte
*/
private long headerMaxSize = Constants.HEADER_MAX_SIZE;

/**
* request最大值:102400 byte
*/
private long requestMaxSize = Constants.REQUEST_MAX_SIZE;
}
  • 构建默认规则信息
  // 构建默认规则信息
private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {
return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());
}
// 构建默认规则信息
private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {
RuleDTO ruleDTO = RuleDTO.builder()
.selectorId(selectorId) //关联的选择器id
.name(ruleName) //规则名称
.matchMode(MatchModeEnum.AND.getCode()) // 默认匹配模式 and
.enabled(Boolean.TRUE) // 默认开启
.loged(Boolean.TRUE) //默认记录日志
.sort(1) //默认顺序 1
.handle(ruleHandler)
.build();
RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()
.paramType(ParamTypeEnum.URI.getName()) // 默认参数类型URI
.paramName("/")
.paramValue(path) //参数值path
.build();
if (path.indexOf("*") > 1) {
ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); //如果path中有*,操作类型则默认为 match
} else {
ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); // 否则,默认操作类型 =
}
ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));
return ruleDTO;
}
  • org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()

注册规则:向数据库插入记录,并向网关发布事件,进行数据同步。


@Override
public String registerDefault(final RuleDTO ruleDTO) {
RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());
if (Objects.nonNull(exist)) {
return "";
}

RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);
List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();
if (StringUtils.isEmpty(ruleDTO.getId())) {
// 向数据库插入规则信息
ruleMapper.insertSelective(ruleDO);
//向数据库插入规则体条件属性
ruleConditions.forEach(ruleConditionDTO -> {
ruleConditionDTO.setRuleId(ruleDO.getId());
ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));
});
}
// 向网关发布事件,进行数据同步
publishEvent(ruleDO, ruleConditions);
return ruleDO.getId();
}

1.4.1.3 注册元数据
   @Override
public String register(final MetaDataRegisterDTO dto) {
//1. 注册选择器
//......

//2. 注册规则
//......

//3. 注册元数据
registerMetadata(dto);

//4. 注册ContextPath
//......

return ShenyuResultMessage.SUCCESS;
}
  • org.apache.shenyu.admin.service.register.ShenyuClientRegisterDivideServiceImpl#registerMetadata()

    插入或更新元数据,然后发布同步事件到网关。


@Override
protected void registerMetadata(final MetaDataRegisterDTO dto) {
if (dto.isRegisterMetaData()) { // 如果注册元数据
// 获取metaDataService
MetaDataService metaDataService = getMetaDataService();
// 元数据是否存在
MetaDataDO exist = metaDataService.findByPath(dto.getPath());
// 插入或更新元数据
metaDataService.saveOrUpdateMetaData(exist, dto);
}
}

@Override
public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {
DataEventTypeEnum eventType;
// 数据类型转换 DTO->DO
MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);
// 插入数据
if (Objects.isNull(exist)) {
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());
metaDataDO.setDateCreated(currentTime);
metaDataDO.setDateUpdated(currentTime);
metaDataMapper.insert(metaDataDO);
eventType = DataEventTypeEnum.CREATE;
} else {
// 更新数据
metaDataDO.setId(exist.getId());
metaDataMapper.update(metaDataDO);
eventType = DataEventTypeEnum.UPDATE;
}
// 发布同步事件到网关
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,
Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));
}
1.4.1.4 注册ContextPath
   @Override
public String register(final MetaDataRegisterDTO dto) {
//1. 注册选择器
//......

//2. 注册规则
//......

//3. 注册元数据
//......

//4. 注册ContextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
  • org.apache.shenyu.admin.service.register.AbstractContextPathRegisterService#registerContextPath()
    @Override
public void registerContextPath(final MetaDataRegisterDTO dto) {
// 设置选择器的contextPath
String contextPathSelectorId = getSelectorService().registerDefault(dto, PluginEnum.CONTEXT_PATH.getName(), "");
ContextMappingRuleHandle handle = new ContextMappingRuleHandle();
handle.setContextPath(PathUtils.decoratorContextPath(dto.getContextPath()));
// 设置规则的contextPath
getRuleService().registerDefault(buildContextPathDefaultRuleDTO(contextPathSelectorId, dto, handle.toJson()));
}
1.4.2 注册URI
  • org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()

服务端收到客户端注册的URI信息后,进行处理。

    @Override
public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {
String result;
String key = key(selectorName);
try {
this.removeFallBack(key);
// 注册URI
result = this.doRegisterURI(selectorName, uriList);
logger.info("Register success: {},{}", selectorName, uriList);
} catch (Exception ex) {
logger.warn("Register exception: cause:{}", ex.getMessage());
result = "";
// 注册失败后,进行重试
this.addFallback(key, new FallbackHolder(selectorName, uriList));
}
return result;
}
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()

从客户端注册的URI中获取有效的URI,更新对应的选择器handle属性,向网关发送选择器更新事件。

@Override
public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {
//参数检查
if (CollectionUtils.isEmpty(uriList)) {
return "";
}
//获取选择器信息
SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
if (Objects.isNull(selectorDO)) {
throw new ShenyuException("doRegister Failed to execute,wait to retry.");
}
// 获取有效的URI
List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());
// 构建选择器的handle属性
String handler = buildHandle(validUriList, selectorDO);
if (handler != null) {
selectorDO.setHandle(handler);
SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
selectorData.setHandle(handler);
// 向数据库更新选择器的handle属性
selectorService.updateSelective(selectorDO);
// 向网关发送选择器更新事件
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
}
return ShenyuResultMessage.SUCCESS;
}

关于服务注册的源码分析就以及完成了,分析流程图如下:

接下来就分析divide插件是如何根据这些信息向http服务发起调用。

2. 服务调用

divide插件是网关用于处理 http协议请求的核心处理插件。

以官网提供的案例 Http快速开始 为例,一个直连请求如下:

GET http://localhost:8189/order/findById?id=100
Accept: application/json

通过ShenYu网关代理后,请求如下:

GET http://localhost:9195/http/order/findById?id=100
Accept: application/json

通过ShenYu网关代理后的服务仍然能够请求到之前的服务,在这里起作用的就是divide插件。类继承关系如下:

  • ShenyuPlugin:顶层接口,定义接口方法;
  • AbstractShenyuPlugin:抽象类,实现插件共有逻辑;
  • DividePlugin:Divide插件。

2.1 接收请求

通过ShenYu网关代理后,请求入口是ShenyuWebHandler,它实现了org.springframework.web.server.WebHandler接口。

public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {
//......

/**
* 处理web请求
*/
@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
// 执行默认插件链
Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);
if (scheduled) {
return execute.subscribeOn(scheduler);
}
return execute;
}

private static class DefaultShenyuPluginChain implements ShenyuPluginChain {

private int index;

private final List<ShenyuPlugin> plugins;

/**
* 实例化默认插件链
*/
DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {
this.plugins = plugins;
}

/**
* 执行每个插件.
*/
@Override
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
// 获取当前执行插件
ShenyuPlugin plugin = plugins.get(this.index++);
// 是否跳过当前插件
boolean skip = plugin.skip(exchange);
if (skip) {
// 如果跳过就执行下一个
return this.execute(exchange);
}
// 执行当前插件
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
}
}

2.2 匹配规则

  • org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()

execute()方法中执行选择器和规则的匹配逻辑。

  • 匹配选择器;
  • 匹配规则;
  • 执行插件。
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
// 插件名称
String pluginName = named();
// 插件信息
PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
if (pluginData != null && pluginData.getEnabled()) {
// 选择器信息
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
if (CollectionUtils.isEmpty(selectors)) {
return handleSelectorIfNull(pluginName, exchange, chain);
}
// 匹配选择器
SelectorData selectorData = matchSelector(exchange, selectors);
if (Objects.isNull(selectorData)) {
return handleSelectorIfNull(pluginName, exchange, chain);
}
selectorLog(selectorData, pluginName);
// 规则信息
List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
if (CollectionUtils.isEmpty(rules)) {
return handleRuleIfNull(pluginName, exchange, chain);
}
// 匹配规则
RuleData rule;
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
//get last
rule = rules.get(rules.size() - 1);
} else {
rule = matchRule(exchange, rules);
}
if (Objects.isNull(rule)) {
return handleRuleIfNull(pluginName, exchange, chain);
}
ruleLog(rule, pluginName);
// 执行插件
return doExecute(exchange, chain, selectorData, rule);
}
return chain.execute(exchange);
}

2.3 执行divide插件

  • org.apache.shenyu.plugin.divide.DividePlugin#doExecute()

doExecute()方法中执行divide插件的具体逻辑:

  • 校验header大小;
  • 校验request大小;
  • 获取服务列表;
  • 实现负载均衡;
  • 设置请求url,超时时间,重试策略。
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
// 获取上下文信息
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
// 获取规则的handle属性
DivideRuleHandle ruleHandle = DividePluginDataHandler.CACHED_HANDLE.get().obtainHandle(CacheKeyUtils.INST.getKey(rule));
long headerSize = 0;
// 校验header大小
for (List<String> multiHeader : exchange.getRequest().getHeaders().values()) {
for (String value : multiHeader) {
headerSize += value.getBytes(StandardCharsets.UTF_8).length;
}
}
if (headerSize > ruleHandle.getHeaderMaxSize()) {
LOG.error("request header is too large");
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_HEADER_TOO_LARGE, null);
return WebFluxResultUtils.result(exchange, error);
}

// 校验request大小
if (exchange.getRequest().getHeaders().getContentLength() > ruleHandle.getRequestMaxSize()) {
LOG.error("request entity is too large");
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_ENTITY_TOO_LARGE, null);
return WebFluxResultUtils.result(exchange, error);
}
// 获取服务列表upstreamList
List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
if (CollectionUtils.isEmpty(upstreamList)) {
LOG.error("divide upstream configuration error: {}", rule);
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null);
return WebFluxResultUtils.result(exchange, error);
}
// 请求ip
String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
// 实现负载均衡
Upstream upstream = LoadBalancerFactory.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
if (Objects.isNull(upstream)) {
LOG.error("divide has no upstream");
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null);
return WebFluxResultUtils.result(exchange, error);
}
// 设置url
String domain = upstream.buildDomain();
exchange.getAttributes().put(Constants.HTTP_DOMAIN, domain);
// 设置超时时间
exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
// 设置重试策略
exchange.getAttributes().put(Constants.RETRY_STRATEGY, ruleHandle.getRetryStrategy());
exchange.getAttributes().put(Constants.LOAD_BALANCE, ruleHandle.getLoadBalance());
exchange.getAttributes().put(Constants.DIVIDE_SELECTOR_ID, selector.getId());
return chain.execute(exchange);
}

2.4 发起请求

默认由WebClientPluginhttp服务发起调用请求,类继承关系如下:

  • ShenyuPlugin:顶层插件,定义插件方法;
  • AbstractHttpClientPlugin:抽象类,实现请求调用的公共逻辑;
  • WebClientPlugin:通过WebClient发起请求;
  • NettyHttpClientPlugin:通过Netty发起请求。

发起请求调用:

  • org.apache.shenyu.plugin.httpclient.AbstractHttpClientPlugin#execute()

execute()方法中发起请求调用:

  • 获取指定的超时时间,重试次数
  • 发起请求
  • 根据指定的重试策略进行失败后重试操作

public abstract class AbstractHttpClientPlugin<R> implements ShenyuPlugin {

protected static final Logger LOG = LoggerFactory.getLogger(AbstractHttpClientPlugin.class);

@Override
public final Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
// 获取上下文信息
final ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
// 获取uri
final URI uri = exchange.getAttribute(Constants.HTTP_URI);
if (Objects.isNull(uri)) {
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_URL, null);
return WebFluxResultUtils.result(exchange, error);
}
// 获取指定的超时时间
final long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
final Duration duration = Duration.ofMillis(timeout);
// 获取指定重试次数
final int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
// 获取指定的重试策略
final String retryStrategy = (String) Optional.ofNullable(exchange.getAttribute(Constants.RETRY_STRATEGY)).orElseGet(RetryEnum.CURRENT::getName);
LOG.info("The request urlPath is {}, retryTimes is {}, retryStrategy is {}", uri.toASCIIString(), retryTimes, retryStrategy);
// 构建header
final HttpHeaders httpHeaders = buildHttpHeaders(exchange);
// 发起请求
final Mono<R> response = doRequest(exchange, exchange.getRequest().getMethodValue(), uri, httpHeaders, exchange.getRequest().getBody())
.timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))
.doOnError(e -> LOG.error(e.getMessage(), e));

// 重试策略CURRENT,对当前服务进行重试
if (RetryEnum.CURRENT.getName().equals(retryStrategy)) {
//old version of DividePlugin and SpringCloudPlugin will run on this
return response.retryWhen(Retry.anyOf(TimeoutException.class, ConnectTimeoutException.class, ReadTimeoutException.class, IllegalStateException.class)
.retryMax(retryTimes)
.backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
.onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))
.flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));
}

// 对其他服务进行重试
// 排除已经调用过的服务
final Set<URI> exclude = Sets.newHashSet(uri);
// 请求重试
return resend(response, exchange, duration, httpHeaders, exclude, retryTimes)
.onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))
.flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));
}

private Mono<R> resend(final Mono<R> clientResponse,
final ServerWebExchange exchange,
final Duration duration,
final HttpHeaders httpHeaders,
final Set<URI> exclude,
final int retryTimes) {
Mono<R> result = clientResponse;
// 根据指定的重试次数进行重试
for (int i = 0; i < retryTimes; i++) {
result = resend(result, exchange, duration, httpHeaders, exclude);
}
return result;
}

private Mono<R> resend(final Mono<R> response,
final ServerWebExchange exchange,
final Duration duration,
final HttpHeaders httpHeaders,
final Set<URI> exclude) {
return response.onErrorResume(th -> {
final String selectorId = exchange.getAttribute(Constants.DIVIDE_SELECTOR_ID);
final String loadBalance = exchange.getAttribute(Constants.LOAD_BALANCE);
//查询可用服务
final List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId)
.stream().filter(data -> {
final String trimUri = data.getUrl().trim();
for (URI needToExclude : exclude) {
// exclude already called
if ((needToExclude.getHost() + ":" + needToExclude.getPort()).equals(trimUri)) {
return false;
}
}
return true;
}).collect(Collectors.toList());
if (CollectionUtils.isEmpty(upstreamList)) {
// no need to retry anymore
return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));
}
// 请求ip
final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
// 实现负载均衡
final Upstream upstream = LoadBalancerFactory.selector(upstreamList, loadBalance, ip);
if (Objects.isNull(upstream)) {
// no need to retry anymore
return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));
}
final URI newUri = RequestUrlUtils.buildRequestUri(exchange, upstream.buildDomain());
// 排除已经调用的uri
exclude.add(newUri);
// 进行再次调用
return doRequest(exchange, exchange.getRequest().getMethodValue(), newUri, httpHeaders, exchange.getRequest().getBody())
.timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))
.doOnError(e -> LOG.error(e.getMessage(), e));
});
}

//......
}

  • org.apache.shenyu.plugin.httpclient.WebClientPlugin#doRequest()

doRequest()方法中通过webClient发起真正的请求调用。


@Override
protected Mono<ClientResponse> doRequest(final ServerWebExchange exchange, final String httpMethod, final URI uri,
final HttpHeaders httpHeaders, final Flux<DataBuffer> body) {
return webClient.method(HttpMethod.valueOf(httpMethod)).uri(uri) //请求uri
.headers(headers -> headers.addAll(httpHeaders)) // 请求header
.body(BodyInserters.fromDataBuffers(body))
.exchange() // 发起请求
.doOnSuccess(res -> {
if (res.statusCode().is2xxSuccessful()) { // 成功
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
} else { // 失败
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
}
exchange.getResponse().setStatusCode(res.statusCode());
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);
});
}

2.5 处理响应结果

  • org.apache.shenyu.plugin.response.ResponsePlugin#execute()

响应结果由ResponsePlugin插件处理。

    @Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
// 根据rpc类型处理结果
return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);
}

处理类型由MessageWriter决定,类继承关系如下:

  • MessageWriter:接口,定义消息处理方法;
  • NettyClientMessageWriter:处理Netty调用结果;
  • RPCMessageWriter:处理RPC调用结果;
  • WebClientMessageWriter:处理WebClient调用结果;

默认是通过WebCient发起http请求。

  • org.apache.shenyu.plugin.response.strategy.WebClientMessageWriter#writeWith()

writeWith()方法中处理响应结果。


@Override
public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
return chain.execute(exchange).then(Mono.defer(() -> {
// 获取响应
ServerHttpResponse response = exchange.getResponse();
ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
if (Objects.isNull(clientResponse)) {
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);
return WebFluxResultUtils.result(exchange, error);
}
//获取cookies和headers
response.getCookies().putAll(clientResponse.cookies());
response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
// image, pdf or stream does not do format processing.
// 处理特殊响应类型
if (clientResponse.headers().contentType().isPresent()) {
final String media = clientResponse.headers().contentType().get().toString().toLowerCase();
if (media.matches(COMMON_BIN_MEDIA_TYPE_REGEX)) {
return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()))
.doOnCancel(() -> clean(exchange));
}
}
// 处理一般响应类型
clientResponse = ResponseUtils.buildClientResponse(response, clientResponse.body(BodyExtractors.toDataBuffers()));
return clientResponse.bodyToMono(byte[].class)
.flatMap(originData -> WebFluxResultUtils.result(exchange, originData))
.doOnCancel(() -> clean(exchange));
}));
}

分析至此,关于Divide插件的源码分析就完成了,分析流程图如下:

3. 小结

本文源码分析从http服务注册开始,到divide插件的服务调用。divide插件主要用来处理http请求。有些源码没有进入深入分析,比如负载均衡的实现,服务探活,将在后续继续分析。

· One min read

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

Apache ShenYu 网关使用 dubbo 插件完成对 dubbo服务的调用。你可以查看官方文档 Dubbo快速开始 了解如何使用该插件。

本文基于shenyu-2.4.3版本进行源码分析,官网的介绍请参考 Dubbo服务接入

1. 服务注册

以官网提供的例子为例 shenyu-examples-dubbo 。 假如你的dubbo服务定义如下(spring-dubbo.xml):

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://code.alibabatech.com/schema/dubbo
https://code.alibabatech.com/schema/dubbo/dubbo.xsd">

<dubbo:application name="test-dubbo-service"/>
<dubbo:registry address="${dubbo.registry.address}"/>
<dubbo:protocol name="dubbo" port="20888"/>

<dubbo:service timeout="10000" interface="org.apache.shenyu.examples.dubbo.api.service.DubboTestService" ref="dubboTestService"/>

</beans>

声明应用服务名称,注册中心地址,使用dubbo协议,声明服务接口,对应接口实现类:

/**
* DubboTestServiceImpl.
*/
@Service("dubboTestService")
public class DubboTestServiceImpl implements DubboTestService {

@Override
@ShenyuDubboClient(path = "/findById", desc = "Query by Id")
public DubboTest findById(final String id) {
return new DubboTest(id, "hello world shenyu Apache, findById");
}

//......
}

在接口实现类中,使用注解@ShenyuDubboClientshenyu-admin注册服务。该注解的作用及原理,稍后再进行分析。

在配置文件application.yml中的配置信息:

server:
port: 8011
address: 0.0.0.0
servlet:
context-path: /
spring:
main:
allow-bean-definition-overriding: true
dubbo:
registry:
address: zookeeper://localhost:2181 # dubbo使用的注册中心

shenyu:
register:
registerType: http #注册方式
serverLists: http://localhost:9095 #注册地址
props:
username: admin
password: 123456
client:
dubbo:
props:
contextPath: /dubbo
appName: dubbo

在配置文件中,声明dubbo使用的注册中心地址,dubbo服务向shenyu-admin注册,使用的方式是http,注册地址是http://localhost:9095

关于注册方式的使用,请参考 应用客户端接入

1.1 声明注册接口

使用注解@ShenyuDubboClient将服务注册到网关。简单demo如下:

// dubbo服务
@Service("dubboTestService")
public class DubboTestServiceImpl implements DubboTestService {

@Override
@ShenyuDubboClient(path = "/findById", desc = "Query by Id") // 需要注册的方法
public DubboTest findById(final String id) {
return new DubboTest(id, "hello world shenyu Apache, findById");
}

//......
}

注解定义:

/**
* 作用于类和方法上
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@Inherited
public @interface ShenyuDubboClient {

//注册路径
String path();

//规则名称
String ruleName() default "";

//描述信息
String desc() default "";

//是否启用
boolean enabled() default true;
}

1.2 扫描注解信息

注解扫描通过ApacheDubboServiceBeanListener完成,它实现了ApplicationListener<ContextRefreshedEvent>接口,在Spring容器启动过程中,发生上下文刷新事件时,开始执行事件处理方法onApplicationEvent()

在构造器实例化的过程中:

  • 读取属性配置
  • 开启线程池
  • 启动注册中心,用于向shenyu-admin注册
public class ApacheDubboServiceBeanListener implements ApplicationListener<ContextRefreshedEvent> {

// ......

//构造器
public ApacheDubboServiceBeanListener(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
//1.读取属性配置
Properties props = clientConfig.getProps();
String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
String appName = props.getProperty(ShenyuClientConstants.APP_NAME);
if (StringUtils.isBlank(contextPath)) {
throw new ShenyuClientIllegalArgumentException("apache dubbo client must config the contextPath or appName");
}
this.contextPath = contextPath;
this.appName = appName;
this.host = props.getProperty(ShenyuClientConstants.HOST);
this.port = props.getProperty(ShenyuClientConstants.PORT);
//2.开启线程池
executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("shenyu-apache-dubbo-client-thread-pool-%d").build());
//3.启动注册中心
publisher.start(shenyuClientRegisterRepository);
}

/**
* 上下文刷新事件,执行方法逻辑
*/
@Override
public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
//......
}
  • ApacheDubboServiceBeanListener#onApplicationEvent()

重写的方法逻辑:读取Dubbo服务ServiceBean,构建元数据对象和URI对象,并向shenyu-admin注册。

    @Override
public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
//读取ServiceBean
Map<String, ServiceBean> serviceBean = contextRefreshedEvent.getApplicationContext().getBeansOfType(ServiceBean.class);
if (serviceBean.isEmpty()) {
return;
}
//保证该方法只执行一次
if (!registered.compareAndSet(false, true)) {
return;
}
//处理元数据对象
for (Map.Entry<String, ServiceBean> entry : serviceBean.entrySet()) {
handler(entry.getValue());
}
//处理URI对象
serviceBean.values().stream().findFirst().ifPresent(bean -> {
publisher.publishEvent(buildURIRegisterDTO(bean));
});
}
  • handler()

    handler()方法中,从serviceBean中读取所有方法,判断方法上是否有ShenyuDubboClient注解,如果存在就构建元数据对象,并通过注册中心,向shenyu-admin注册该方法。

    private void handler(final ServiceBean<?> serviceBean) {
//获取代理对象
Object refProxy = serviceBean.getRef();
//获取class信息
Class<?> clazz = refProxy.getClass();
if (AopUtils.isAopProxy(refProxy)) {
clazz = AopUtils.getTargetClass(refProxy);
}
//获取所有方法
Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);
for (Method method : methods) {
//读取ShenyuDubboClient注解信息
ShenyuDubboClient shenyuDubboClient = method.getAnnotation(ShenyuDubboClient.class);
if (Objects.nonNull(shenyuDubboClient)) {
//构建元数据对象,并注册
publisher.publishEvent(buildMetaDataDTO(serviceBean, shenyuDubboClient, method));
}
}
}
  • buildMetaDataDTO()

    构建元数据对象,在这里构建方法注册的必要信息,后续用于选择器或规则匹配。

    private MetaDataRegisterDTO buildMetaDataDTO(final ServiceBean<?> serviceBean, final ShenyuDubboClient shenyuDubboClient, final Method method) {
//应用名称
String appName = buildAppName(serviceBean);
//方法路径
String path = contextPath + shenyuDubboClient.path();
//描述信息
String desc = shenyuDubboClient.desc();
//服务名称
String serviceName = serviceBean.getInterface();
//规则名称
String configRuleName = shenyuDubboClient.ruleName();
String ruleName = ("".equals(configRuleName)) ? path : configRuleName;
//方法名称
String methodName = method.getName();
//参数类型
Class<?>[] parameterTypesClazz = method.getParameterTypes();
String parameterTypes = Arrays.stream(parameterTypesClazz).map(Class::getName).collect(Collectors.joining(","));
return MetaDataRegisterDTO.builder()
.appName(appName)
.serviceName(serviceName)
.methodName(methodName)
.contextPath(contextPath)
.host(buildHost())
.port(buildPort(serviceBean))
.path(path)
.ruleName(ruleName)
.pathDesc(desc)
.parameterTypes(parameterTypes)
.rpcExt(buildRpcExt(serviceBean)) //dubbo服务的扩展信息
.rpcType(RpcTypeEnum.DUBBO.getName())
.enabled(shenyuDubboClient.enabled())
.build();
}
  • buildRpcExt()

    dubbo服务的扩展信息

   private String buildRpcExt(final ServiceBean serviceBean) {
DubboRpcExt build = DubboRpcExt.builder()
.group(StringUtils.isNotEmpty(serviceBean.getGroup()) ? serviceBean.getGroup() : "")//分组
.version(StringUtils.isNotEmpty(serviceBean.getVersion()) ? serviceBean.getVersion() : "")//版本
.loadbalance(StringUtils.isNotEmpty(serviceBean.getLoadbalance()) ? serviceBean.getLoadbalance() : Constants.DEFAULT_LOADBALANCE)//负载均衡策略,默认随机
.retries(Objects.isNull(serviceBean.getRetries()) ? Constants.DEFAULT_RETRIES : serviceBean.getRetries())//重试次数,默认2
.timeout(Objects.isNull(serviceBean.getTimeout()) ? Constants.DEFAULT_CONNECT_TIMEOUT : serviceBean.getTimeout())//超时,默认3000
.sent(Objects.isNull(serviceBean.getSent()) ? Constants.DEFAULT_SENT : serviceBean.getSent())//sent,默认false
.cluster(StringUtils.isNotEmpty(serviceBean.getCluster()) ? serviceBean.getCluster() : Constants.DEFAULT_CLUSTER)//集群策略,默认failover
.url("")
.build();
return GsonUtils.getInstance().toJson(build);
}
  • buildURIRegisterDTO()

    构建URI对象,注册服务本身的信息,后续可用于服务探活。

private URIRegisterDTO buildURIRegisterDTO(final ServiceBean serviceBean) {
return URIRegisterDTO.builder()
.contextPath(this.contextPath) //上下文路径
.appName(buildAppName(serviceBean))//应用名称
.rpcType(RpcTypeEnum.DUBBO.getName())//rpc类型:dubbo
.host(buildHost()) //host
.port(buildPort(serviceBean))//port
.build();
}

具体的注册逻辑由注册中心实现,请参考 客户端接入原理

//向注册中心,发布注册事件   
publisher.publishEvent();

1.3 处理注册信息

客户端通过注册中心注册的元数据和URI数据,在shenyu-admin端进行处理,负责存储到数据库和同步给shenyu网关。Dubbo插件的客户端注册处理逻辑在ShenyuClientRegisterDubboServiceImpl中。继承关系如下:

  • ShenyuClientRegisterService:客户端注册服务,顶层接口;
  • FallbackShenyuClientRegisterService:注册失败,提供重试操作;
  • AbstractShenyuClientRegisterServiceImpl:抽象类,实现部分公共注册逻辑;
  • ShenyuClientRegisterDubboServiceImpl:实现Dubbo插件的注册;
1.3.1 注册服务
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()

    客户端通过注册中心注册的元数据MetaDataRegisterDTO对象在shenyu-adminregister()方法被接送到。

   @Override
public String register(final MetaDataRegisterDTO dto) {
//1. 注册选择器
String selectorHandler = selectorHandler(dto);
String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
//2. 注册规则
String ruleHandler = ruleHandler();
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
ruleService.registerDefault(ruleDTO);
//3. 注册元数据
registerMetadata(dto);
//4. 注册ContextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
1.3.1.1 注册选择器
  • org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()

构建contextPath,查找选择器信息是否存在,如果存在就返回id;不存在就创建默认的选择器信息。

    @Override
public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {
// 构建contextPath
String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());
// 通过名称查找选择器信息是否存在
SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);
if (Objects.isNull(selectorDO)) {
// 不存在就创建默认的选择器信息
return registerSelector(contextPath, pluginName, selectorHandler);
}
return selectorDO.getId();
}
  • 默认选择器信息

    在这里构建默认选择器信息及其条件属性。

   //注册选择器
private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {
//构建选择器
SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());
selectorDTO.setHandle(selectorHandler);
//注册默认选择器
return registerDefault(selectorDTO);
}
//构建选择器
private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {
//构建默认选择器
SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);
selectorDTO.setPluginId(pluginId);
//构建默认选择器的条件属性
selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));
return selectorDTO;
}
  • 构建默认选择器
private SelectorDTO buildDefaultSelectorDTO(final String name) {
return SelectorDTO.builder()
.name(name) // 名称
.type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // 默认类型自定义
.matchMode(MatchModeEnum.AND.getCode()) //默认匹配方式 and
.enabled(Boolean.TRUE) //默认启开启
.loged(Boolean.TRUE) //默认记录日志
.continued(Boolean.TRUE) //默认继续后续选择器
.sort(1) //默认顺序1
.build();
}
  • 构建默认选择器条件属性
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {
SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();
selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // 默认参数类型URI
selectorConditionDTO.setParamName("/");
selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // 默认匹配策略 match
selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // 默认值 /contextPath/**
return Collections.singletonList(selectorConditionDTO);
}
  • 注册默认选择器
@Override
public String registerDefault(final SelectorDTO selectorDTO) {
//选择器信息
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
//选择器条件属性
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
if (StringUtils.isEmpty(selectorDTO.getId())) {
// 向数据库插入选择器信息
selectorMapper.insertSelective(selectorDO);
// 向数据库插入选择器条件属性
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId()); selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
}
// 发布同步事件,向网关同步选择信息及其条件属性
publishEvent(selectorDO, selectorConditionDTOs);
return selectorDO.getId();
}
1.3.1.2 注册规则

在注册服务的第二步中,开始构建默认规则,然后注册规则。

@Override
public String register(final MetaDataRegisterDTO dto) {
//1. 注册选择器
//......

//2. 注册规则
// 默认规则处理属性
String ruleHandler = ruleHandler();
// 构建默认规则信息
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
// 注册规则
ruleService.registerDefault(ruleDTO);

//3. 注册元数据
//......

//4. 注册ContextPath
//......

return ShenyuResultMessage.SUCCESS;
}
  • 默认规则处理属性
    @Override
protected String ruleHandler() {
// 默认规则处理属性
return new DubboRuleHandle().toJson();
}

Dubbo插件默认规则处理属性

public class DubboRuleHandle implements RuleHandle {

/**
* dubbo服务版本信息.
*/
private String version;

/**
* 分组.
*/
private String group;

/**
* 重试次数.
*/
private Integer retries = 0;

/**
* 负载均衡策略:默认随机
*/
private String loadbalance = LoadBalanceEnum.RANDOM.getName();

/**
* 超时,默认3000
*/
private long timeout = Constants.TIME_OUT;
}
  • 构建默认规则信息
  // 构建默认规则信息
private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {
return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());
}
// 构建默认规则信息
private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {
RuleDTO ruleDTO = RuleDTO.builder()
.selectorId(selectorId) //关联的选择器id
.name(ruleName) //规则名称
.matchMode(MatchModeEnum.AND.getCode()) // 默认匹配模式 and
.enabled(Boolean.TRUE) // 默认开启
.loged(Boolean.TRUE) //默认记录日志
.sort(1) //默认顺序 1
.handle(ruleHandler)
.build();
RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()
.paramType(ParamTypeEnum.URI.getName()) // 默认参数类型URI
.paramName("/")
.paramValue(path) //参数值path
.build();
if (path.indexOf("*") > 1) {
ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); //如果path中有*,操作类型则默认为 match
} else {
ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); // 否则,默认操作类型 =
}
ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));
return ruleDTO;
}
  • org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()

注册规则:向数据库插入记录,并向网关发布事件,进行数据同步。


@Override
public String registerDefault(final RuleDTO ruleDTO) {
RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());
if (Objects.nonNull(exist)) {
return "";
}

RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);
List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();
if (StringUtils.isEmpty(ruleDTO.getId())) {
// 向数据库插入规则信息
ruleMapper.insertSelective(ruleDO);
//向数据库插入规则体条件属性
ruleConditions.forEach(ruleConditionDTO -> {
ruleConditionDTO.setRuleId(ruleDO.getId()); ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));
});
}
// 向网关发布事件,进行数据同步
publishEvent(ruleDO, ruleConditions);
return ruleDO.getId();
}

1.3.1.3 注册元数据

元数据主要用于RPC服务的调用。

   @Override
public String register(final MetaDataRegisterDTO dto) {
//1. 注册选择器
//......

//2. 注册规则
//......

//3. 注册元数据
registerMetadata(dto);

//4. 注册ContextPath
//......

return ShenyuResultMessage.SUCCESS;
}
  • org.apache.shenyu.admin.service.register.ShenyuClientRegisterDubboServiceImpl#registerMetadata()

    插入或更新元数据,然后发布同步事件到网关。

    @Override
protected void registerMetadata(final MetaDataRegisterDTO dto) {
// 获取metaDataService
MetaDataService metaDataService = getMetaDataService();
// 元数据是否存在
MetaDataDO exist = metaDataService.findByPath(dto.getPath());
// 插入或更新元数据
metaDataService.saveOrUpdateMetaData(exist, dto);
}

@Override
public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {
DataEventTypeEnum eventType;
// 数据类型转换 DTO->DO
MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);
// 插入数据
if (Objects.isNull(exist)) {
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());
metaDataDO.setDateCreated(currentTime);
metaDataDO.setDateUpdated(currentTime);
metaDataMapper.insert(metaDataDO);
eventType = DataEventTypeEnum.CREATE;
} else {
// 更新数据
metaDataDO.setId(exist.getId());
metaDataMapper.update(metaDataDO);
eventType = DataEventTypeEnum.UPDATE;
}
// 发布同步事件到网关
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,
Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));
}
1.3.2 注册URI
  • org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()

服务端收到客户端注册的URI信息后,进行处理。

    @Override
public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {
String result;
String key = key(selectorName);
try {
this.removeFallBack(key);
// 注册URI
result = this.doRegisterURI(selectorName, uriList);
logger.info("Register success: {},{}", selectorName, uriList);
} catch (Exception ex) {
logger.warn("Register exception: cause:{}", ex.getMessage());
result = "";
// 注册失败后,进行重试
this.addFallback(key, new FallbackHolder(selectorName, uriList));
}
return result;
}
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()

从客户端注册的URI中获取有效的URI,更新对应的选择器handle属性,向网关发送选择器更新事件。

@Override
public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {
//参数检查
if (CollectionUtils.isEmpty(uriList)) {
return "";
}
//获取选择器信息
SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
if (Objects.isNull(selectorDO)) {
throw new ShenyuException("doRegister Failed to execute,wait to retry.");
}
// 获取有效的URI
List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());
// 构建选择器的handle属性
String handler = buildHandle(validUriList, selectorDO);
if (handler != null) {
selectorDO.setHandle(handler);
SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
selectorData.setHandle(handler);
// 向数据库更新选择器的handle属性
selectorService.updateSelective(selectorDO);
// 向网关发送选择器更新事件
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
}
return ShenyuResultMessage.SUCCESS;
}

关于服务注册的源码分析就以及完成了,分析流程图如下:

接下来就分析dubbo插件是如何根据这些信息向http服务发起调用。

2. 服务调用

dubbo插件是ShenYu网关用于将http请求转成 dubbo协议,调用dubbo服务的核心处理插件。

以官网提供的案例 Dubbo快速开始 为例,一个dubbo服务通过注册中心向shenyu-admin注册后,通过ShenYu网关代理,请求如下:

GET http://localhost:9195/dubbo/findById?id=100
Accept: application/json

Dubbo插件中,类继承关系如下:

  • ShenyuPlugin:顶层接口,定义接口方法;
  • AbstractShenyuPlugin:抽象类,实现插件共有逻辑;
  • AbstractDubboPlugin:dubbo插件抽象类,实现dubbo共有逻辑;
  • ApacheDubboPlugin:ApacheDubbo插件。

ShenYu网关支持ApacheDubbo和AlibabaDubbo

2.1 接收请求

通过ShenYu网关代理后,请求入口是ShenyuWebHandler,它实现了org.springframework.web.server.WebHandler接口。

public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {
//......

/**
* 处理web请求
*/
@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
// 执行默认插件链
Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);
if (scheduled) {
return execute.subscribeOn(scheduler);
}
return execute;
}

private static class DefaultShenyuPluginChain implements ShenyuPluginChain {

private int index;

private final List<ShenyuPlugin> plugins;

/**
* 实例化默认插件链
*/
DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {
this.plugins = plugins;
}

/**
* 执行每个插件.
*/
@Override
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
// 获取当前执行插件
ShenyuPlugin plugin = plugins.get(this.index++);
// 是否跳过当前插件
boolean skip = plugin.skip(exchange);
if (skip) {
// 如果跳过就执行下一个
return this.execute(exchange);
}
// 执行当前插件
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
}
}

2.2 匹配规则

  • org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()

execute()方法中执行选择器和规则的匹配逻辑。

  • 匹配选择器;
  • 匹配规则;
  • 执行插件。
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
// 插件名称
String pluginName = named();
// 插件信息
PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
if (pluginData != null && pluginData.getEnabled()) {
// 选择器信息
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
if (CollectionUtils.isEmpty(selectors)) {
return handleSelectorIfNull(pluginName, exchange, chain);
}
// 匹配选择器
SelectorData selectorData = matchSelector(exchange, selectors);
if (Objects.isNull(selectorData)) {
return handleSelectorIfNull(pluginName, exchange, chain);
}
selectorLog(selectorData, pluginName);
// 规则信息
List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
if (CollectionUtils.isEmpty(rules)) {
return handleRuleIfNull(pluginName, exchange, chain);
}
// 匹配规则
RuleData rule;
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
//get last
rule = rules.get(rules.size() - 1);
} else {
rule = matchRule(exchange, rules);
}
if (Objects.isNull(rule)) {
return handleRuleIfNull(pluginName, exchange, chain);
}
ruleLog(rule, pluginName);
// 执行插件
return doExecute(exchange, chain, selectorData, rule);
}
return chain.execute(exchange);
}

2.3 执行GlobalPlugin

  • org.apache.shenyu.plugin.global.GlobalPlugin#execute()

GlobalPlugin是一个全局插件,在execute()方法中构建上下文信息。

public class GlobalPlugin implements ShenyuPlugin {
// 构建上下文信息
private final ShenyuContextBuilder builder;

//......

@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
// 构建上下文信息,传入到 exchange 中
ShenyuContext shenyuContext = builder.build(exchange);
exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
return chain.execute(exchange);
}

//......
}
  • org.apache.shenyu.plugin.global.DefaultShenyuContextBuilder#build()

构建默认的上下文信息。

public class DefaultShenyuContextBuilder implements ShenyuContextBuilder {
//......

@Override
public ShenyuContext build(final ServerWebExchange exchange) {
//构建参数
Pair<String, MetaData> buildData = buildData(exchange);
//包装ShenyuContext
return decoratorMap.get(buildData.getLeft()).decorator(buildDefaultContext(exchange.getRequest()), buildData.getRight());
}

private Pair<String, MetaData> buildData(final ServerWebExchange exchange) {
//......
//根据请求的uri获取元数据
MetaData metaData = MetaDataCache.getInstance().obtain(request.getURI().getPath());
if (Objects.nonNull(metaData) && Boolean.TRUE.equals(metaData.getEnabled())) {
exchange.getAttributes().put(Constants.META_DATA, metaData);
return Pair.of(metaData.getRpcType(), metaData);
} else {
return Pair.of(RpcTypeEnum.HTTP.getName(), new MetaData());
}
}
//设置默认的上下文信息
private ShenyuContext buildDefaultContext(final ServerHttpRequest request) {
String appKey = request.getHeaders().getFirst(Constants.APP_KEY);
String sign = request.getHeaders().getFirst(Constants.SIGN);
String timestamp = request.getHeaders().getFirst(Constants.TIMESTAMP);
ShenyuContext shenyuContext = new ShenyuContext();
String path = request.getURI().getPath();
shenyuContext.setPath(path); //请求路径
shenyuContext.setAppKey(appKey);
shenyuContext.setSign(sign);
shenyuContext.setTimestamp(timestamp);
shenyuContext.setStartDateTime(LocalDateTime.now());
Optional.ofNullable(request.getMethod()).ifPresent(httpMethod -> shenyuContext.setHttpMethod(httpMethod.name()));//请求方法
return shenyuContext;
}
}
  • org.apache.shenyu.plugin.dubbo.common.context.DubboShenyuContextDecorator#decorator()

包装ShenyuContext

public class DubboShenyuContextDecorator implements ShenyuContextDecorator {

@Override
public ShenyuContext decorator(final ShenyuContext shenyuContext, final MetaData metaData) {
shenyuContext.setModule(metaData.getAppName());//获取AppName
shenyuContext.setMethod(metaData.getServiceName()); //获取ServiceName
shenyuContext.setContextPath(metaData.getContextPath()); //获取contextPath
shenyuContext.setRpcType(RpcTypeEnum.DUBBO.getName()); // dubbo服务
return shenyuContext;
}

@Override
public String rpcType() {
return RpcTypeEnum.DUBBO.getName();
}
}

2.4 执行RpcParamTransformPlugin

RpcParamTransformPlugin负责从http请求中读取参数,保存到exchange中,传递给rpc服务。

  • org.apache.shenyu.plugin.base.RpcParamTransformPlugin#execute()

execute()方法中,执行该插件的核心逻辑:从exchange中获取请求信息,根据请求传入的内容形式处理参数。

public class RpcParamTransformPlugin implements ShenyuPlugin {

@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
//从exchange中获取请求信息
ServerHttpRequest request = exchange.getRequest();
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
if (Objects.nonNull(shenyuContext)) {
// 如果请求参数格式是APPLICATION_JSON
MediaType mediaType = request.getHeaders().getContentType();
if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
return body(exchange, request, chain);
}
// 如果请求参数格式是APPLICATION_FORM_URLENCODED
if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {
return formData(exchange, request, chain);
}
//一般查询请求
return query(exchange, request, chain);
}
return chain.execute(exchange);
}

// 如果请求参数格式是APPLICATION_JSON
private Mono<Void> body(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {
return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())
.flatMap(body -> {
exchange.getAttributes().put(Constants.PARAM_TRANSFORM, resolveBodyFromRequest(body));//解析body,保存到exchange中
return chain.execute(exchange);
}));
}
// 如果请求参数格式是APPLICATION_FORM_URLENCODED
private Mono<Void> formData(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {
return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())
.flatMap(map -> {
String param = resolveBodyFromRequest(map);
LinkedMultiValueMap<String, String> linkedMultiValueMap;
try {
linkedMultiValueMap = BodyParamUtils.buildBodyParams(URLDecoder.decode(param, StandardCharsets.UTF_8.name())); //格式化数据
} catch (UnsupportedEncodingException e) {
return Mono.error(e);
}
exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.toMap(() -> linkedMultiValueMap));// 保存到exchange中
return chain.execute(exchange);
}));
}
//一般查询请求
private Mono<Void> query(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {
exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.ofString(() -> serverHttpRequest.getURI().getQuery()));//保存到exchange中
return chain.execute(exchange);
}
//......
}

2.5 执行DubboPlugin

  • org.apache.shenyu.plugin.dubbo.common.AbstractDubboPlugin#doExecute()

doExecute()方法中,主要是检查元数据和参数。

public abstract class AbstractDubboPlugin extends AbstractShenyuPlugin {

@Override
public Mono<Void> doExecute(final ServerWebExchange exchange,
final ShenyuPluginChain chain,
final SelectorData selector,
final RuleData rule) {
//获取参数
String param = exchange.getAttribute(Constants.PARAM_TRANSFORM);
//获取上下文信息
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
//获取元数据
MetaData metaData = exchange.getAttribute(Constants.META_DATA);
//检查元数据
if (!checkMetaData(metaData)) {
LOG.error(" path is : {}, meta data have error : {}", shenyuContext.getPath(), metaData);
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.META_DATA_ERROR, null);
return WebFluxResultUtils.result(exchange, error);
}
//检查元数据和参数
if (Objects.nonNull(metaData) && StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(param)) {
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM, null);
return WebFluxResultUtils.result(exchange, error);
}
//设置rpcContext
this.rpcContext(exchange);
//进行dubbo服务调用
return this.doDubboInvoker(exchange, chain, selector, rule, metaData, param);
}
}
  • org.apache.shenyu.plugin.apache.dubbo.ApacheDubboPlugin#doDubboInvoker()

doDubboInvoker()方法中设置特殊的上下文信息,然后开始dubbo的泛化调用。

public class ApacheDubboPlugin extends AbstractDubboPlugin {

@Override
protected Mono<Void> doDubboInvoker(final ServerWebExchange exchange,
final ShenyuPluginChain chain,
final SelectorData selector,
final RuleData rule,
final MetaData metaData,
final String param) {
//设置当前的选择器和规则信息,以及请求地址,用于支持dubbo的灰度
RpcContext.getContext().setAttachment(Constants.DUBBO_SELECTOR_ID, selector.getId());
RpcContext.getContext().setAttachment(Constants.DUBBO_RULE_ID, rule.getId());
RpcContext.getContext().setAttachment(Constants.DUBBO_REMOTE_ADDRESS, Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress());
//dubbo的泛化调用
final Mono<Object> result = dubboProxyService.genericInvoker(param, metaData, exchange);
//执行下一个插件
return result.then(chain.execute(exchange));
}
}
  • org.apache.shenyu.plugin.apache.dubbo.proxy.ApacheDubboProxyService#genericInvoker()

genericInvoker()方法:

  • 获取ReferenceConfig对象;
  • 获取泛化服务GenericService对象;
  • 构造请求参数pair对象;
  • 发起异步的泛化调用。
public class ApacheDubboProxyService {
//......

/**
* Generic invoker object.
*/
public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws ShenyuException {
//1.获取ReferenceConfig对象
ReferenceConfig<GenericService> reference = ApacheDubboConfigCache.getInstance().get(metaData.getPath());
//如果没有获取到
if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {
//失效当前缓存的信息
ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());
//使用元数据进行再次初始化
reference = ApacheDubboConfigCache.getInstance().initRef(metaData);
}
//2.获取泛化服务GenericService对象
GenericService genericService = reference.get();
//3.构造请求参数pair对象
Pair<String[], Object[]> pair;
if (StringUtils.isBlank(metaData.getParameterTypes()) || ParamCheckUtils.dubboBodyIsEmpty(body)) {
pair = new ImmutablePair<>(new String[]{}, new Object[]{});
} else {
pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());
}
//4.发起异步的泛化调用
return Mono.fromFuture(invokeAsync(genericService, metaData.getMethodName(), pair.getLeft(), pair.getRight()).thenApply(ret -> {
//处理结果
if (Objects.isNull(ret)) {
ret = Constants.DUBBO_RPC_RESULT_EMPTY;
}
exchange.getAttributes().put(Constants.RPC_RESULT, ret);
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
return ret;
})).onErrorMap(exception -> exception instanceof GenericException ? new ShenyuException(((GenericException) exception).getExceptionMessage()) : new ShenyuException(exception));//处理异常
}

//泛化调用,异步操作
private CompletableFuture<Object> invokeAsync(final GenericService genericService, final String method, final String[] parameterTypes, final Object[] args) throws GenericException {
genericService.$invoke(method, parameterTypes, args);
Object resultFromFuture = RpcContext.getContext().getFuture();
return resultFromFuture instanceof CompletableFuture ? (CompletableFuture<Object>) resultFromFuture : CompletableFuture.completedFuture(resultFromFuture);
}
}

通过泛化调用就可以实现在网关调用dubbo服务了。

ReferenceConfig对象是支持泛化调用的关键对象 ,它的初始化操作是在数据同步的时候完成的。这里涉及两部分数据,一是同步的插件handler信息,二是同步的插件元数据信息。

  • org.apache.shenyu.plugin.dubbo.common.handler.AbstractDubboPluginDataHandler#handlerPlugin()

当插件数据更新时,数据同步模块会将数据从shenyu-admin同步到网关。在handlerPlugin()中执行初始化操作。

public abstract class AbstractDubboPluginDataHandler implements PluginDataHandler {
//......

//初始化配置缓存
protected abstract void initConfigCache(DubboRegisterConfig dubboRegisterConfig);

@Override
public void handlerPlugin(final PluginData pluginData) {
if (Objects.nonNull(pluginData) && Boolean.TRUE.equals(pluginData.getEnabled())) {
//数据反序列化
DubboRegisterConfig dubboRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class);
DubboRegisterConfig exist = Singleton.INST.get(DubboRegisterConfig.class);
if (Objects.isNull(dubboRegisterConfig)) {
return;
}
if (Objects.isNull(exist) || !dubboRegisterConfig.equals(exist)) {
// 执行初始化操作
this.initConfigCache(dubboRegisterConfig);
}
Singleton.INST.single(DubboRegisterConfig.class, dubboRegisterConfig);
}
}
//......
}
  • org.apache.shenyu.plugin.apache.dubbo.handler.ApacheDubboPluginDataHandler#initConfigCache()

执行初始化操作。

public class ApacheDubboPluginDataHandler extends AbstractDubboPluginDataHandler {

@Override
protected void initConfigCache(final DubboRegisterConfig dubboRegisterConfig) {
//执行初始化操作
ApacheDubboConfigCache.getInstance().init(dubboRegisterConfig);
//失效之前缓存的结果
ApacheDubboConfigCache.getInstance().invalidateAll();
}
}

  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#init()

在初始化中,设置registryConfigconsumerConfig

public final class ApacheDubboConfigCache extends DubboConfigCache {
//......
/**
* 初始化
*/
public void init(final DubboRegisterConfig dubboRegisterConfig) {
//创建ApplicationConfig
if (Objects.isNull(applicationConfig)) {
applicationConfig = new ApplicationConfig("shenyu_proxy");
}
//协议或者地址发生改变时,需要更新registryConfig
if (needUpdateRegistryConfig(dubboRegisterConfig)) {
RegistryConfig registryConfigTemp = new RegistryConfig();
registryConfigTemp.setProtocol(dubboRegisterConfig.getProtocol());
registryConfigTemp.setId("shenyu_proxy");
registryConfigTemp.setRegister(false);
registryConfigTemp.setAddress(dubboRegisterConfig.getRegister()); Optional.ofNullable(dubboRegisterConfig.getGroup()).ifPresent(registryConfigTemp::setGroup);
registryConfig = registryConfigTemp;
}
//创建ConsumerConfig
if (Objects.isNull(consumerConfig)) {
consumerConfig = ApplicationModel.getConfigManager().getDefaultConsumer().orElseGet(() -> {
ConsumerConfig consumerConfig = new ConsumerConfig();
consumerConfig.refresh();
return consumerConfig;
});

//设置ConsumerConfig
Optional.ofNullable(dubboRegisterConfig.getThreadpool()).ifPresent(consumerConfig::setThreadpool);
Optional.ofNullable(dubboRegisterConfig.getCorethreads()).ifPresent(consumerConfig::setCorethreads);

Optional.ofNullable(dubboRegisterConfig.getThreads()).ifPresent(consumerConfig::setThreads);

Optional.ofNullable(dubboRegisterConfig.getQueues()).ifPresent(consumerConfig::setQueues);
}
}

//是否需要更新注册配置
private boolean needUpdateRegistryConfig(final DubboRegisterConfig dubboRegisterConfig) {
if (Objects.isNull(registryConfig)) {
return true;
}
return !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol())
|| !Objects.equals(dubboRegisterConfig.getRegister(), registryConfig.getAddress())
|| !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol());
}

//......
}
  • org.apache.shenyu.plugin.apache.dubbo.subscriber.ApacheDubboMetaDataSubscriber#onSubscribe()

当元数据更新时,数据同步模块会将数据从shenyu-admin同步到网关。在onSubscribe()方法中执行元数据更新操作。

public class ApacheDubboMetaDataSubscriber implements MetaDataSubscriber {
//本地内存缓存
private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap();

//元数据发生更新
public void onSubscribe(final MetaData metaData) {
// dubbo服务的元数据更新
if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
//对应的元数据是否存在
MetaData exist = META_DATA.get(metaData.getPath());
if (Objects.isNull(exist) || Objects.isNull(ApacheDubboConfigCache.getInstance().get(metaData.getPath()))) {
// 首次初始化
ApacheDubboConfigCache.getInstance().initRef(metaData);
} else {
// 对应的元数据发生了更新操作
if (!Objects.equals(metaData.getServiceName(), exist.getServiceName())
|| !Objects.equals(metaData.getRpcExt(), exist.getRpcExt())
|| !Objects.equals(metaData.getParameterTypes(), exist.getParameterTypes())
|| !Objects.equals(metaData.getMethodName(), exist.getMethodName())) {
//根据最新的元数据再次构建ReferenceConfig
ApacheDubboConfigCache.getInstance().build(metaData);
}
}
//本地内存缓存
META_DATA.put(metaData.getPath(), metaData);
}
}

//删除元数据
public void unSubscribe(final MetaData metaData) {
if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
//使ReferenceConfig失效
ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());
META_DATA.remove(metaData.getPath());
}
}
}
  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#initRef()

通过metaData构建ReferenceConfig对象。

public final class ApacheDubboConfigCache extends DubboConfigCache {
//......

public ReferenceConfig<GenericService> initRef(final MetaData metaData) {
try {
//先尝试从缓存中获取,存在就直接返回
ReferenceConfig<GenericService> referenceConfig = cache.get(metaData.getPath());
if (StringUtils.isNoneBlank(referenceConfig.getInterface())) {
return referenceConfig;
}
} catch (ExecutionException e) {
LOG.error("init dubbo ref exception", e);
}
//不存在,就构建
return build(metaData);
}

/**
* Build reference config.
*/
@SuppressWarnings("deprecation")
public ReferenceConfig<GenericService> build(final MetaData metaData) {
if (Objects.isNull(applicationConfig) || Objects.isNull(registryConfig)) {
return new ReferenceConfig<>();
}
ReferenceConfig<GenericService> reference = new ReferenceConfig<>(); //新建ReferenceConfig
reference.setGeneric("true"); //泛化调用
reference.setAsync(true);//支持异步

reference.setApplication(applicationConfig);//设置应用配置
reference.setRegistry(registryConfig);//设置注册中心配置
reference.setConsumer(consumerConfig);//设置消费者配置
reference.setInterface(metaData.getServiceName());//设置服务接口
reference.setProtocol("dubbo");//设置dubbo协议
reference.setCheck(false); //不检查 service provider
reference.setLoadbalance("gray");//支持灰度

Map<String, String> parameters = new HashMap<>(2);
parameters.put("dispatcher", "direct");
reference.setParameters(parameters);//自定义参数

String rpcExt = metaData.getRpcExt();//rpc扩展参数
DubboParam dubboParam = parserToDubboParam(rpcExt);//反序列化
if (Objects.nonNull(dubboParam)) {
if (StringUtils.isNoneBlank(dubboParam.getVersion())) {
reference.setVersion(dubboParam.getVersion());//设置版本
}
if (StringUtils.isNoneBlank(dubboParam.getGroup())) {
reference.setGroup(dubboParam.getGroup());//设置分组
}
if (StringUtils.isNoneBlank(dubboParam.getUrl())) {
reference.setUrl(dubboParam.getUrl());//设置url
}
if (StringUtils.isNoneBlank(dubboParam.getCluster())) {
reference.setCluster(dubboParam.getCluster());//设置Cluster type
}
Optional.ofNullable(dubboParam.getTimeout()).ifPresent(reference::setTimeout);//timeout
Optional.ofNullable(dubboParam.getRetries()).ifPresent(reference::setRetries);//retires
Optional.ofNullable(dubboParam.getSent()).ifPresent(reference::setSent);//Whether to ack async-sent
}
try {
//获取GenericService
Object obj = reference.get();
if (Objects.nonNull(obj)) {
LOG.info("init apache dubbo reference success there meteData is :{}", metaData);
//缓存当前的reference
cache.put(metaData.getPath(), reference);
}
} catch (Exception e) {
LOG.error("init apache dubbo reference exception", e);
}
return reference;
}
//......
}

2.6 执行ResponsePlugin

  • org.apache.shenyu.plugin.response.ResponsePlugin#execute()

响应结果由ResponsePlugin插件处理。

    @Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
// 根据rpc类型处理结果
return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);
}

处理类型由MessageWriter决定,类继承关系如下:

  • MessageWriter:接口,定义消息处理方法;
  • NettyClientMessageWriter:处理Netty调用结果;
  • RPCMessageWriter:处理RPC调用结果;
  • WebClientMessageWriter:处理WebClient调用结果;

Dubbo服务调用,处理结果当然是RPCMessageWriter了。

  • org.apache.shenyu.plugin.response.strategy.RPCMessageWriter#writeWith()

writeWith()方法中处理响应结果。


public class RPCMessageWriter implements MessageWriter {

@Override
public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
return chain.execute(exchange).then(Mono.defer(() -> {
Object result = exchange.getAttribute(Constants.RPC_RESULT); //获取结果
if (Objects.isNull(result)) { //处理异常
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);
return WebFluxResultUtils.result(exchange, error);
}
return WebFluxResultUtils.result(exchange, result);//返回结果
}));
}
}

分析至此,关于Dubbo插件的源码分析就完成了,分析流程图如下:

3. 小结

本文源码分析从Dubbo服务注册开始,到Dubbo插件的服务调用。Dubbo插件主要用来处理将http请求转成dubbo协议,主要逻辑是通过泛化调用实现。

· One min read

在 shenyu 网关中,启动该插件,shenyu 将成为一个功能丰富的 mcpServer, 你可以通过简单配置来将一个服务作为 tool 注册到 shenyu 网关中,并使用网关提供的扩展功能。

本文基于shenyu-2.7.0.2版本进行源码分析, 在本篇中我将追踪 Shenyu Mcp 插件链路,对 Mcp 插件的 sse 通信方式进行源码分析

前言

shenyu 网关的 mcp 插件基于 spring-ai-mcp 扩展而来,为了更好的了解 mcp 插件的工作原理 ,我将简单介绍 mcp 官方提供的 jdk 中各个 java 类是如何协同运作的

我想先简单介绍三个 Mcp 官方提供的 java 类

  1. McpServer

该类负责管理,tool,Resource,promote 等资源

  1. TransportProvider

根据客户端和服务端之间通信协议,提供之相对应的通讯方法

  1. Session

处理请求数据、响应数据和通知数据,提供一些基本方法和其对应的处理器,查询工具,调用工具都在此处执行

1. 服务注册

在 shenyu admin 的 McpServer 中插件填写 endpoint 和 tool 信息后,这些信息将自动注册到 shenyu bootstrap 中, 数据同步源码可以参考官网websocket数据同步

shenyu bootstrap 将在 McpServerPluginDataHandlerhandler() 方法中接收到 admin 同步来的数据。

handlerSelector() 方法接收 url 数据创建 McpServer

handlerRule() 方法接收 tool 信息,注册 tool

这两个方法共同组成了 Shenyu Mcp 插件的服务注册部分,下面我将对这个两个方法,详细展开分析

1.1 Transport,McpServer注册

我们先来分析 handlerSelector() 方法,也就是 McpServer 的注册

  • handlerSelector() 方法 工作内容如下
  1. 捕捉用户在 Selector 上的填写的 url,这个 url 将作为一个 key 存储 McpServer TransportProvider 等信息
  2. 序列化创建 ShenyuMcpServerShenyuMcpServer 将 SelectorId 和这些 url 也就是这些 key 绑定,以此来实现 selectorId 和 key 的绑定。

注意 ShenyuMcpServer 是 Shenyu 用于绑定 SelectorId 和 url 的对象,和 McpServer 没有继承关系,功能也完全不同

  1. 调用 ShenyuMcpServerManagergetOrCreateMcpServerTransport() 方法注册 McpServer TransportProvider
public class McpServerPluginDataHandler implements PluginDataHandler {
@Override
public void handlerSelector(final SelectorData selectorData) {
// 获取URI
String uri = selectorData.getConditionList().stream()
.filter(condition -> Constants.URI.equals(condition.getParamType()))
.map(ConditionData::getParamValue)
.findFirst()
.orElse(null);

// 构建McpServer
ShenyuMcpServer shenyuMcpServer = GsonUtils.getInstance().fromJson(Objects.isNull(selectorData.getHandle()) ? DEFAULT_MESSAGE_ENDPOINT : selectorData.getHandle(), ShenyuMcpServer.class);
shenyuMcpServer.setPath(path);
// 缓存shenyuMcpServer
CACHED_SERVER.get().cachedHandle(
selectorData.getId(),
shenyuMcpServer);
String messageEndpoint = shenyuMcpServer.getMessageEndpoint();
// 尝试获取或者注册transportProvider
shenyuMcpServerManager.getOrCreateMcpServerTransport(uri, messageEndpoint);
}

}

ShenyuMcpServerManager 该类是 ShenYu 中 McpServer 的管理中心,不仅储存了 McpAsyncServer CompositeTransportProvider 等内容,注册 Transport 和 McpServer 的方法也在其中

  • getOrCreateMcpServerTransport() 方法工作内容具体如下
  1. 处理传递来的 url 去除/streamablehttp 以及 /message后缀 使其恢复为原始的 url
  2. 尝试获取 CompositeTransportProvider 对象,该对象是 Transport 的复合对象,包含了多种协议对应的 Transport
  3. 如果没有获取到,则调用 createSseTransport() 方法创建 CompositeTransportProvider 对象
  4. 创建 McpAsyncServer 对象,保存 Transport 对象到 Map 中,将 Transport 注册到 McpAsyncServer
@Component
public class ShenyuMcpServerManager {
public ShenyuSseServerTransportProvider getOrCreateMcpServerTransport(final String uri, final String messageEndPoint) {
// 去除/streamablehttp 以及 /message后缀
String normalizedPath = processPath(uri);
return getOrCreateTransport(normalizedPath, SSE_PROTOCOL,
() -> createSseTransport(normalizedPath, messageEndPoint));
}

private <T> T getOrCreateTransport(final String normalizedPath, final String protocol,
final java.util.function.Supplier<T> transportFactory) {
// 获取复合Transport实例
CompositeTransportProvider compositeTransport = getOrCreateCompositeTransport(normalizedPath);

T transport = switch (protocol) {
case SSE_PROTOCOL -> (T) compositeTransport.getSseTransport();
case STREAMABLE_HTTP_PROTOCOL -> (T) compositeTransport.getStreamableHttpTransport();
default -> null;
};
// 如果缓存中没有该实例,则需要重新创建
if (Objects.isNull(transport)) {
// 调用createSseTransport()方法,创建一个新的transport并存储
transport = transportFactory.get();
// 创建McpAsyncServer,并注册transport
addTransportToSharedServer(normalizedPath, protocol, transport);
}

return transport;
}
}
1.1.1 Transport注册
  • createSseTransport() 方法

该方法在 getOrCreateMcpServerTransport() 方法被调用,用于创建 Transport


@Component
public class ShenyuMcpServerManager {
private ShenyuSseServerTransportProvider createSseTransport(final String normalizedPath, final String messageEndPoint) {
String messageEndpoint = normalizedPath + messageEndPoint;
ShenyuSseServerTransportProvider transportProvider = ShenyuSseServerTransportProvider.builder()
.objectMapper(objectMapper)
.sseEndpoint(normalizedPath)
.messageEndpoint(messageEndpoint)
.build();
// 向Manager的routeMap中注册transportProvider的两个函数
registerRoutes(normalizedPath, messageEndpoint, transportProvider::handleSseConnection, transportProvider::handleMessage);
return transportProvider;
}
}
1.1.2 mcpServer注册
  • addTransportToSharedServer() 方法

该方法在 getOrCreateMcpServerTransport() 方法被调用,用于创建 McpServer 并保存

该方法创建了一个新的 McpServer并存储到 sharedServerMap 中,并将上一步得到的 TransportProvider 存入 compositeTransportMap

@Component
public class ShenyuMcpServerManager {
private void addTransportToSharedServer(final String normalizedPath, final String protocol, final Object transportProvider) {
// 获取或者创建并注册 McpServer
getOrCreateSharedServer(normalizedPath);

// 将新增的传输协议存进compositeTransportMap中
compositeTransport.addTransport(protocol, transportProvider);

}

private McpAsyncServer getOrCreateSharedServer(final String normalizedPath) {
return sharedServerMap.computeIfAbsent(normalizedPath, path -> {
// 获取传输协议
CompositeTransportProvider compositeTransport = getOrCreateCompositeTransport(path);

// 选择Server拥有的能力
var capabilities = McpSchema.ServerCapabilities.builder()
.tools(true)
.logging()
.build();

// 创建McpServer并存储
McpAsyncServer server = McpServer
.async(compositeTransport)
.serverInfo("MCP Shenyu Server (Multi-Protocol)", "1.0.0")
.capabilities(capabilities)
.tools(Lists.newArrayList())
.build();

return server;
});
}
}

1.2 Tools注册

  • handlerRule() 方法 工作内容如下
  1. 捕捉用户在 Tool 上的填写的 tool 配置信息,这些信息将全部用于 tool 的构建
  2. 序列化创建 ShenyuMcpServerTool 获取 tool 信息

注意 ShenyuMcpServerTool 也是 Shenyu 存储 tool 信息的工具,和 McpServerTool 没有继承关系

  1. 调用 addTool() 方法, 利用该 tool 信息创建 tool,并根据 SelectorId 将 tool 注册到与之匹配的 McpServer 中
public class McpServerPluginDataHandler implements PluginDataHandler {
@Override
public void handlerRule(final RuleData ruleData) {
Optional.ofNullable(ruleData.getHandle()).ifPresent(s -> {
// 序列化一个新的 ShenyuMcpServerTool
ShenyuMcpServerTool mcpServerTool = GsonUtils.getInstance().fromJson(s, ShenyuMcpServerTool.class);
// 缓存mcpServerTool
CACHED_TOOL.get().cachedHandle(CacheKeyUtils.INST.getKey(ruleData), mcpServerTool);
// 获取并构建 mcp schema
List<McpServerToolParameter> parameters = mcpServerTool.getParameters();
String inputSchema = JsonSchemaUtil.createParameterSchema(parameters);
ShenyuMcpServer server = CACHED_SERVER.get().obtainHandle(ruleData.getSelectorId());
if (Objects.nonNull(server)) {
// 向Manager的sharedServerMap中存储Tool信息
shenyuMcpServerManager.addTool(server.getPath(),
StringUtils.isBlank(mcpServerTool.getName()) ? ruleData.getName()
: mcpServerTool.getName(),
mcpServerTool.getDescription(),
mcpServerTool.getRequestConfig(),
inputSchema);
}
});
}
}
  • addTool()方法

该方法被 handlerRule() 方法调用,用于新增工具

该方法做了下述工作

  1. 将上一步传来的 tool 信息转换为 shenyuToolDefinition 对象
  2. 利用转换来的 shenyuToolDefinition 对象创建 ShenyuToolCallback 对象

ShenyuToolCallback 重写了 ToolCallBackcall() 方法,并将该 call() 方法注册到了 AsyncToolSpecification 中, 此后调用 tool 的 call() 方法,则实际会调用这个重写的 call() 方法

  1. ShenyuToolCallback 转换为 AsyncToolSpecification 并注册到相关的 mcpServer 中
public class McpServerPluginDataHandler implements PluginDataHandler {
public void addTool(final String serverPath, final String name, final String description,
final String requestTemplate, final String inputSchema) {
String normalizedPath = normalizeServerPath(serverPath);
// 构建Definition对象
ToolDefinition shenyuToolDefinition = ShenyuToolDefinition.builder()
.name(name)
.description(description)
.requestConfig(requestTemplate)
.inputSchema(inputSchema)
.build();

ShenyuToolCallback shenyuToolCallback = new ShenyuToolCallback(shenyuToolDefinition);

// 获取到先前注册的 McpServer, 并向其中注册Tool
McpAsyncServer sharedServer = sharedServerMap.get(normalizedPath);
for (AsyncToolSpecification asyncToolSpecification : McpToolUtils.toAsyncToolSpecifications(shenyuToolCallback)) {
sharedServer.addTool(asyncToolSpecification).block();
}
}
}

到此为止,服务注册分析完毕

服务注册一图览

2. 插件调用

客户端先后会发送后缀为 /sse/message 的两种消息,这两种消息都会被 Shenyu McpServer plugin 捕捉,Shenyu McpServer plugin 会对 /sse 消息和 /message 消息做不同处理。收到 /sse 消息时 plugin 会创建 session 对象并保存,最后返回 session id 供 message 消息使用。收到 /message 消息时,会根据 /message 消息携带的 method 信息,选择执行的方法 如:获取工作列表,工具调用,获取资源列表等等

  • doExecute() 方法 工作内容如下
  1. 路径匹配,判断 mcp plugin 是否注册该路径
  2. 调用 routeByProtocol() 方法,根据请求协议选择合适的处理方案

本篇是对 sse 请求方式的解析,因此接着进入 handleSseRequest() 方法

public class McpServerPlugin extends AbstractShenyuPlugin {
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange,
final ShenyuPluginChain chain,
final SelectorData selector,
final RuleData rule) {
final String uri = exchange.getRequest().getURI().getRawPath();
// 判断 Mcp 插件是否注册了该路由规则,没有则不执行
if (!shenyuMcpServerManager.canRoute(uri)) {
return chain.execute(exchange);
}
final ServerRequest request = ServerRequest.create(exchange, messageReaders);
// 根据 uri 判断路由协议,选择对应的处理方案
return routeByProtocol(exchange, chain, request, selector, uri);
}

private Mono<Void> routeByProtocol(final ServerWebExchange exchange,
final ShenyuPluginChain chain,
final ServerRequest request,
final SelectorData selector,
final String uri) {

if (isStreamableHttpProtocol(uri)) {
return handleStreamableHttpRequest(exchange, chain, request, uri);
} else if (isSseProtocol(uri)) {
// 处理sse请求
return handleSseRequest(exchange, chain, request, selector, uri);
}
}
}

handlerSseRequest() 方法

该方法由 routeByProtocol() 方法调用,根据请求后缀判断客户端是要创建 session 还是调用工具

public class McpServerPlugin extends AbstractShenyuPlugin {
private Mono<Void> handleSseRequest(final ServerWebExchange exchange,
final ShenyuPluginChain chain,
final ServerRequest request,
final SelectorData selector,
final String uri) {
ShenyuMcpServer server = McpServerPluginDataHandler.CACHED_SERVER.get().obtainHandle(selector.getId());
String messageEndpoint = server.getMessageEndpoint();
// 获取传输者
ShenyuSseServerTransportProvider transportProvider
= shenyuMcpServerManager.getOrCreateMcpServerTransport(uri, messageEndpoint);
// 根据请求的后缀判断是 sse 连接请求还是 message 调用请求
if (uri.endsWith(messageEndpoint)) {
setupSessionContext(exchange, chain);
return handleMessageEndpoint(exchange, transportProvider, request);
} else {
return handleSseEndpoint(exchange, transportProvider, request);
}
}
}

2.1 客户端发送 sse 请求

如果我们客户端发送的是后缀为 /sse 的请求,那么将会执行 handleSseEndpoint() 方法

  • handleSseEndpoint() 方法主要做了如下工作
  1. 配置 sse 请求头
  2. 调用 ShenyuSseServerTransportProvidercreateSseFlux() 创建 sse 流
public class McpServerPlugin extends AbstractShenyuPlugin {
private Mono<Void> handleSseEndpoint(final ServerWebExchange exchange,
final ShenyuSseServerTransportProvider transportProvider,
final ServerRequest request) {
// 配置 sse 请求头
configureSseHeaders(exchange);

// 创建 sse 流
return exchange.getResponse()
.writeWith(transportProvider
.createSseFlux(request));
}
}
  • createSseFlux() 方法

该方法被 handleSseEndpoint()调用 主要用于创建并保存 session

  1. 创建 session ,创建 session 的工厂在创建 session 时会将一系列 handler 注册到 session 中,这些 handler 是真正执行 callTool 的对象
  2. 将 session 保存,session复用
  3. 将 session id 作为 endpoint 的请求参数返回给客户端,在调用 message 方法时会使用该 endpoint
public class ShenyuSseServerTransportProvider implements McpServerTransportProvider {
public Flux<ServerSentEvent<?>> createSseFlux(final ServerRequest request) {
return Flux.<ServerSentEvent<?>>create(sink -> {
WebFluxMcpSessionTransport sessionTransport = new WebFluxMcpSessionTransport(sink);
// 创建 McpServerSession 并暂存插件链信息
McpServerSession session = sessionFactory.create(sessionTransport);
String sessionId = session.getId();
sessions.put(sessionId, session);

// 将 session id等信息传递回客户端
String endpointUrl = this.baseUrl + this.messageEndpoint + "?sessionId=" + sessionId;
ServerSentEvent<String> endpointEvent = ServerSentEvent.<String>builder()
.event(ENDPOINT_EVENT_TYPE)
.data(endpointUrl)
.build();
}).doOnSubscribe(subscription -> LOGGER.info("SSE Flux subscribed"))
.doOnRequest(n -> LOGGER.debug("SSE Flux requested {} items", n));
}
}

2.2 客户端发送 message 请求

如果我们客户端发送的是后缀为 /message 的请求,那么将会执行 把当前 ShenyuPluginChain 信息存入 session 中,并调用 handleMessageEndpoint() 方法, 后续工具调用时会继续执行该插件链,因此 mcp plugin 后的插件会对进入 tool 的请求造成影响

  • handleMessageEndpoint() 方法,调用 ShenyuSseServerTransportProviderhandleMessageEndpoint() 方法
if (uri.endsWith(messageEndpoint)) {
setupSessionContext(exchange, chain);
return handleMessageEndpoint(exchange, transportProvider, request);
}
public class McpServerPlugin extends AbstractShenyuPlugin {
private Mono<Void> handleMessageEndpoint(final ServerWebExchange exchange,
final ShenyuSseServerTransportProvider transportProvider,
final ServerRequest request) {
// 处理message请求
return transportProvider.handleMessageEndpoint(request)
.flatMap(result -> {
return exchange.getResponse()
.writeWith(Mono.just(exchange.getResponse().bufferFactory().wrap(responseBody.getBytes())));
});
}
}
  • handleMessageEndpoint() 方法

该方法由 McpServerPlugin.handleMessageEndpoint() 调用,将请求交给 session 处理

session 的 handler() 方法会对 message 的不同,而进行对应的操作 例如 : 当 message 中 method 是 "tools/call" 时,则会使用工具调用的 handler() 执行 call() 方法调用工具 相关源码在此不过多赘述

public class ShenyuSseServerTransportProvider implements McpServerTransportProvider {
public Mono<MessageHandlingResult> handleMessageEndpoint(final ServerRequest request) {
// 获取到session
String sessionId = request.queryParam("sessionId").get();
McpServerSession session = sessions.get(sessionId);
return request.bodyToMono(String.class)
.flatMap(body -> {
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body);
return session.handle(message);
});
}
}

至此 Shenyu Mcp Plugin 服务调用源码分析完毕

流程图一览

3. 工具调用

如果客户端传递的消息是调用工具的消息,那么 session 将使用工具调用的 handler() 并执行 tool 的 call() 方法, 在服务注册中,我们说明了 tool 在被调用时,实际执行的是 ShenyuToolCallback()call() 方法

因此执行工具调用时会执行以下方法

  • call() 主要工作内容如下
  1. 获取 session id
  2. 获取 requestTemplate 即 shenyu 提供的额外功能的配置信息
  3. 获取上一步暂存的 shenyu 插件链,并将工具调用的信息交给插件链继续执行
  4. 异步等待工具响应

插件链执行完成后,会将调用 tool 请求真正的发送到 tool 所在的服务之中

public class ShenyuToolCallback implements ToolCallback {
@NonNull
@Override
public String call(@NonNull final String input, final ToolContext toolContext) {
// 从 mcp 请求中提取 sessionId
final McpSyncServerExchange mcpExchange = extractMcpExchange(toolContext);
final String sessionId = extractSessionId(mcpExchange);
// 提取requestTemplate信息
final String configStr = extractRequestConfig(shenyuTool);

// 利用sessionId 获取到先前暂存的插件执行链
final ServerWebExchange originExchange = getOriginExchange(sessionId);
final ShenyuPluginChain chain = getPluginChain(originExchange);

// 执行工具调用
return executeToolCall(originExchange, chain, sessionId, configStr, input);

}

private String executeToolCall(final ServerWebExchange originExchange,
final ShenyuPluginChain chain,
final String sessionId,
final String configStr,
final String input) {

final CompletableFuture<String> responseFuture = new CompletableFuture<>();
final ServerWebExchange decoratedExchange = buildDecoratedExchange(
originExchange, responseFuture, sessionId, configStr, input);
// 执行插件链,调用实际工具
chain.execute(decoratedExchange)
.subscribe();

// 等待响应
final String result = responseFuture.get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
return result;

}
}

至此Shenyu MCP Plugin 工具调用分析完毕


4. 小结

本文源码分析从 mcp 服务注册开始,到 mcp 插件的服务调用,再到 tool 的调用。 mcpServer 插件让 shenyu 成为一个功能强大,集中管理的 mcpServer。


· One min read
Kunshuai Zhu

开始前,可以参考 这篇文章 运行shenyu网关

正文

先看一下这个插件的结构,如下图。

param-mapping-structure

猜测:handler是用来做数据同步的;strategy中文意思是策略,可能是对各种请求体做了适配,应该是这个插件的重点;ParamMappingPlugin 应该是 ShenyuPlugin 的实现。

首先,看一下 ParamMappingPlugin ,里面主要是对 doExecute 方法的重写。

public Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
... // paramMappingHandle判断是否为空
// 根据首部行中的contentType确定请求体类型
HttpHeaders headers = exchange.getRequest().getHeaders();
MediaType contentType = headers.getContentType();
// *
return match(contentType).apply(exchange, chain, paramMappingHandle);
}
  • match方法是根据contentType返回对应的 Operator

    private Operator match(final MediaType mediaType) {
    if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
    return operatorMap.get(MediaType.APPLICATION_JSON.toString());
    } else if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {
    return operatorMap.get(MediaType.APPLICATION_FORM_URLENCODED.toString());
    } else {
    return operatorMap.get(Constants.DEFAULT);
    }
    }

    从match方法的代码可以看出,目前有 DefaultOperatorFormDataOperatorJsonOperator三种,支持 x-www-form-urlencodedjson 两种格式的请求体。

那么我们就来看一下上面三种Operator究竟是怎么样的吧。

一、DefaultOperator

虚晃一枪,它的apply方法只是继续执行插件链,并没有实质功能。当请求体没有匹配到Operator时,就会通过 DefaultOperator 跳过。

二、FormDataOperator

这个类是用来处理 x-www-form-urlencoded 格式的请求体的。

主要是看apply方法,但是这个apply方法长得有点奇怪。

public Mono<Void> apply(final ServerWebExchange exchange, final ShenyuPluginChain shenyuPluginChain, final ParamMappingHandle paramMappingHandle) {
return exchange.getFormData()
.switchIfEmpty(Mono.defer(() -> Mono.just(new LinkedMultiValueMap<>())))
.flatMap(multiValueMap -> {
...
});
}

省略号中的代码是对请求体的处理,如下。

// 判空
if (Objects.isNull(multiValueMap) || multiValueMap.isEmpty()) {
return shenyuPluginChain.execute(exchange);
}
// 将form-data转化成json
String original = GsonUtils.getInstance().toJson(multiValueMap);
LOG.info("get from data success data:{}", original);
// *修改请求体*
String modify = operation(original, paramMappingHandle);
if (StringUtils.isEmpty(modify)) {
return shenyuPluginChain.execute(exchange);
}
...
// 将修改后的json,转换成LinkedMultiValueMap。注意一下这一行,后面会提到!
LinkedMultiValueMap<String, String> modifyMap = GsonUtils.getInstance().toLinkedMultiValueMap(modify);
...
final BodyInserter bodyInserter = BodyInserters.fromValue(modifyMap);
...
// 修改exchange中的请求体,然后继续执行插件链
return bodyInserter.insert(cachedBodyOutputMessage, new BodyInserterContext())
.then(Mono.defer(() -> shenyuPluginChain.execute(exchange.mutate()
.request(new ModifyServerHttpRequestDecorator(httpHeaders, exchange.getRequest(), cachedBodyOutputMessage))
.build())
)).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> release(cachedBodyOutputMessage, throwable));

PS: 省略的部分是设置请求头等操作。

上面比较重要的应该是打星的修改请求体,也就是 operation 方法的调用。这里因为参数类型的原因,会先调用 Operator 接口的默认方法(而不是 FormDataOperator 重写的)。

default String operation(final String jsonValue, final ParamMappingHandle paramMappingHandle) {
DocumentContext context = JsonPath.parse(jsonValue);
// 调用重写的operation方法,添加addParameterKey
operation(context, paramMappingHandle);
// 对设置的replacedParameterKey进行替换
if (!CollectionUtils.isEmpty(paramMappingHandle.getReplaceParameterKeys())) {
paramMappingHandle.getReplaceParameterKeys().forEach(info -> {
context.renameKey(info.getPath(), info.getKey(), info.getValue());
});
}
// 对设置的removeParameterKey进行删除
if (!CollectionUtils.isEmpty(paramMappingHandle.getRemoveParameterKeys())) {
paramMappingHandle.getRemoveParameterKeys().forEach(info -> {
context.delete(info);
});
}
return context.jsonString();
}

梳理下来可以发现,这里引入的json工具JsonPath使得请求体的加工变得简单、清晰很多。

另外,我们可以注意到 FormDataOperator 重写了 operation(DocumentContext, ParamMappingHandle) 方法。

为什么要重写呢? 接口中有对应处理addParameterKey的默认方法啊。

// Operator接口中的默认方法
default void operation(final DocumentContext context, final ParamMappingHandle paramMappingHandle) {
if (!CollectionUtils.isEmpty(paramMappingHandle.getAddParameterKeys())) {
paramMappingHandle.getAddParameterKeys().forEach(info -> {
context.put(info.getPath(), info.getKey(), info.getValue()); //不同之处
});
}
}

// FormDataOperator重写的方法
@Override
public void operation(final DocumentContext context, final ParamMappingHandle paramMappingHandle) {
if (!CollectionUtils.isEmpty(paramMappingHandle.getAddParameterKeys())) {
paramMappingHandle.getAddParameterKeys().forEach(info -> {
context.put(info.getPath(), info.getKey(), Arrays.asList(info.getValue()));
});
}
}

实际上,在 FormDataOperator#apply 中有这么一行(前面有提到):LinkedMultiValueMap<String, String> modifyMap = GsonUtils.getInstance().toLinkedMultiValueMap(modify);

这一行是将修改后的json转换成 LinkedMultiValueMapGsonUtils#toLinkedMultiValueMap 如下。

public LinkedMultiValueMap<String, String> toLinkedMultiValueMap(final String json) {
return GSON.fromJson(json, new TypeToken<LinkedMultiValueMap<String, String>>() {
}.getType());
}

LinkedMultiValueMap 类中的属性 targetMap 定义为:private final Map<K, List<V>> targetMap

因此,json字符串中的value必须是列表形式的,不然Gson就会抛出转换错误的异常,这也就是为什么 FormDataOperator 要重写operator方法。

那么为什么要用 LinkedMultiValueMap 呢?

回到 FormDataOperator#apply 方法的第一行 exchange.getFormData 。而SpringMVC中,DefaultServerWebExchange#getFormData 的返回值类型就是 Mono<MultiValueMap<String, String>> ,而 LinkedMultiValueMapMultiValueMap 的子类。并且,getFormData 方法就是针对 x-www-form-urlencoded 格式的请求体的。

param-mapping-getFormData

三、JsonOperator

显然,这个类是用来处理Json格式的请求体的。

public Mono<Void> apply(final ServerWebExchange exchange, final ShenyuPluginChain shenyuPluginChain, final ParamMappingHandle paramMappingHandle) {
ServerRequest serverRequest = ServerRequest.create(exchange, MESSAGE_READERS);
Mono<String> mono = serverRequest.bodyToMono(String.class).switchIfEmpty(Mono.defer(() -> Mono.just(""))).flatMap(originalBody -> {
LOG.info("get body data success data:{}", originalBody);
// 调用默认的operation方法修改请求体
String modify = operation(originalBody, paramMappingHandle);
return Mono.just(modify);
});
BodyInserter bodyInserter = BodyInserters.fromPublisher(mono, String.class);
... //处理首部行
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
// 修改exchange中的请求体,然后继续执行插件链
return bodyInserter.insert(outputMessage, new BodyInserterContext())
.then(Mono.defer(() -> {
ServerHttpRequestDecorator decorator = new ModifyServerHttpRequestDecorator(headers, exchange.getRequest(), outputMessage);
return shenyuPluginChain.execute(exchange.mutate().request(decorator).build());
})).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> release(outputMessage, throwable));
}

JsonOperator 的处理流程与 FormDataOperator 大致类似。

总结

最后,用一张图来简单总结一下。

param-mapping-summary

· One min read

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,注册中心是用于将客户端信息注册到shenyu-adminadmin再通过数据同步将这些信息同步到网关,网关通过这些数据完成流量筛选。客户端信息主要包括接口信息URI信息

本文基于shenyu-2.5.0版本进行源码分析,官网的介绍请参考 客户端接入原理

1. 注册中心原理

当客户端启动时,读取接口信息和uri信息,通过指定的注册类型,将数据发送到shenyu-admin

图中的注册中心需要用户指定使用哪种注册类型,ShenYu当前支持HttpZookeeperEtcdConsulNacos进行注册。具体如何配置请参考 客户端接入配置

ShenYu在注册中心的原理设计上引入了DisruptorDisruptor队列在其中起到数据与操作解耦,利于扩展。如果注册请求过多,导致注册异常,也有数据缓冲作用。

如图所示,注册中心分为两个部分,一是注册中心客户端register-client,负责处理客户端数据读取。另一个是注册中心服务端register-server,负责处理服务端(就是shenyu-admin)数据写入。通过指定注册类型进行数据发送和接收。

  • 客户端:通常来说就是一个微服务,可以是springmvcspring-clouddubbogrpc等。
  • register-client:注册中心客户端,读取客户接口和uri信息。
  • Disruptor:数据与操作解耦,数据缓冲作用。
  • register-server:注册中心服务端,这里就是shenyu-admin,接收数据,写入数据库,发数据同步事件。
  • 注册类型:指定注册类型,完成数据注册,当前支持HttpZookeeperEtcdConsulNacos

本文分析的是使用Http的方式进行注册,所以具体的处理流程如下:

在客户端,数据出队列后,通过http传输数据,在服务端,提供相应的接口,接收数据,然后写入队列。

2. 客户端注册流程

当客户端启动后,根据相关配置,读取属性信息,然后写入队列。以官方提供的 shenyu-examples-http 为例,开始源码分析。官方提供的例子是一个由springboot构建的微服务。注册中心的相关配置可以参考官网 客户端接入配置

2.1 加载配置,读取属性

先用一张图串联下注册中心客户端初始化流程:

我们分析的是通过http的方式进行注册,所以需要进行如下配置:

shenyu:
register:
registerType: http
serverLists: http://localhost:9095
props:
username: admin
password: 123456
client:
http:
props:
contextPath: /http
appName: http
port: 8189
isFull: false

每个属性表示的含义如下:

  • registerType: 服务注册类型,填写 http
  • serverList: 为http注册类型时,填写Shenyu-Admin项目的地址,注意加上http://,多个地址用英文逗号分隔。
  • username: Shenyu-Admin用户名
  • password: Shenyu-Admin用户对应的密码
  • port: 你本项目的启动端口,目前springmvc/tars/grpc需要进行填写。
  • contextPath: 为你的这个mvc项目在shenyu网关的路由前缀, 比如/order/product 等等,网关会根据你的这个前缀来进行路由。
  • appName:你的应用名称,不配置的话,会默认取 spring.application.name 的值。
  • isFull: 设置 true 代表代理你的整个服务,false表示代理你其中某几个controller;目前适用于springmvc/springcloud

项目启动后,会先加载配置文件,读取属性信息,生成相应的Bean

首先读取到的配置文件是 ShenyuSpringMvcClientConfiguration,它是shenyu 客户端http注册配置类,通过@Configuration表示这是一个配置类,通过@ImportAutoConfiguration引入其他配置类。创建SpringMvcClientEventListener,主要处理元数据和 URI 信息。

/**
* shenyu 客户端http注册配置类
*/
@Configuration
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuSpringMvcClientConfiguration {

// 创建SpringMvcClientEventListener,主要处理元数据和URI信息
@Bean
public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);
}
}

ShenyuClientCommonBeanConfigurationshenyu客户端通用配置类,会创建注册中心客户端通用的bean

  • 创建ShenyuClientRegisterRepository,通过工厂类创建而成。
  • 创建ShenyuRegisterCenterConfig,读取shenyu.register属性配置。
  • 创建ShenyuClientConfig,读取shenyu.client属性配置。

/**
* shenyu客户端通用配置类
*/
@Configuration
public class ShenyuClientCommonBeanConfiguration {

// 创建ShenyuClientRegisterRepository,通过工厂类创建而成。
@Bean
public ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
return ShenyuClientRegisterRepositoryFactory.newInstance(config);
}

// 创建ShenyuRegisterCenterConfig,读取shenyu.register属性配置
@Bean
@ConfigurationProperties(prefix = "shenyu.register")
public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
return new ShenyuRegisterCenterConfig();
}

// 创建ShenyuClientConfig,读取shenyu.client属性配置
@Bean
@ConfigurationProperties(prefix = "shenyu")
public ShenyuClientConfig shenyuClientConfig() {
return new ShenyuClientConfig();
}
}

2.2 用于注册的 HttpClientRegisterRepository

上面的配置文件中生成的ShenyuClientRegisterRepository是客户端注册的具体实现,它是一个接口,它的实现类如下。

  • HttpClientRegisterRepository:通过http进行注册;
  • ConsulClientRegisterRepository:通过Consul进行注册;
  • EtcdClientRegisterRepository:通过Etcd进行注册;
  • NacosClientRegisterRepository:通过nacos进行注册;
  • ZookeeperClientRegisterRepository通过Zookeeper进行注册。

具体是哪一种方式,是通过SPI进行加载实现的,实现逻辑如下:


/**
* 加载 ShenyuClientRegisterRepository
*/
public final class ShenyuClientRegisterRepositoryFactory {

private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();

/**
* 创建 ShenyuClientRegisterRepository
*/
public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
// 通过SPI的方式进行加载,类型由registerType决定
ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
//执行初始化操作
result.init(shenyuRegisterCenterConfig);
ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
return result;
}
return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
}
}

加载类型通过registerType指定,也就是我们在配置文件中指定的类型:

shenyu:
register:
registerType: http
serverLists: http://localhost:9095

我们指定的是http,所以会去加载HttpClientRegisterRepository。对象创建成功后,执行的初始化方法init()如下:

@Join
public class HttpClientRegisterRepository implements ShenyuClientRegisterRepository {

@Override
public void init(final ShenyuRegisterCenterConfig config) {
this.username = config.getProps().getProperty(Constants.USER_NAME);
this.password = config.getProps().getProperty(Constants.PASS_WORD);
this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));
this.setAccessToken();
}

// 暂时省略其他逻辑
}

读取配置文件中的usernamepasswordserverLists,即sheenyu-admin的访问账号、密码和地址信息,为后续数据发送做准备。类注解@Join用于SPI的加载。

SPI 全称为 Service Provider Interface, 是 JDK 内置的一种服务提供发现功能, 一种动态替换发现的机制。

shenyu-spiApache ShenYu网关自定义的SPI扩展实现,设计和实现原理参考了DubboSPI扩展实现

2.3 构建 元数据 和 URI信息 的 SpringMvcClientEventListener

创建 SpringMvcClientEventListener,负责客户端 元数据URI 数据的构建和注册,它的创建是在配置文件中完成。

@Configuration
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuSpringMvcClientConfiguration {
// ......

// 创建 SpringMvcClientEventListener
@Bean
public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);
}
}
  • SpringMvcClientEventListener继承了AbstractContextRefreshedEventListener

AbstractContextRefreshedEventListener是一个抽象类,它实现了ApplicationListener接口,并重写了onApplicationEvent()方法,当有Spring事件发生后,该方法会执行。它的实现目前有八种,每一种表示对应的RPC调用协议的 元数据URI 信息的注册。

  • AlibabaDubboServiceBeanListener:处理使用Alibaba Dubbo协议;
  • ApacheDubboServiceBeanListener:处理使用Apacge Dubbo协议;
  • GrpcClientEventListener:处理使用grpc协议;
  • MotanServiceEventListener:处理使用Mortan协议;
  • SofaServiceEventListener:处理使用Sofa协议;
  • SpringMvcClientEventListener:处理使用http协议;
  • SpringWebSocketClientEventListener:处理使用websocket协议;
  • TarsServiceBeanEventListener:处理使用Tars注册类型;
// 实现了ApplicationListener接口
public abstract class AbstractContextRefreshedEventListener<T, A extends Annotation> implements ApplicationListener<ContextRefreshedEvent> {

//......

//构造函数
public AbstractContextRefreshedEventListener(final PropertiesConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
// 读取 shenyu.client.http 配置信息
Properties props = clientConfig.getProps();
// appName 应用名称
this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
// contextPath上下文路径
this.contextPath = Optional.ofNullable(props.getProperty(ShenyuClientConstants.CONTEXT_PATH)).map(UriUtils::repairData).orElse("");
if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {
String errorMsg = "client register param must config the appName or contextPath";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
this.ipAndPort = props.getProperty(ShenyuClientConstants.IP_PORT);
// host信息
this.host = props.getProperty(ShenyuClientConstants.HOST);
// port 客户端端口信息
this.port = props.getProperty(ShenyuClientConstants.PORT);
// 开始事件发布
publisher.start(shenyuClientRegisterRepository);
}

// 当有上下文刷新事件ContextRefreshedEvent发生时,该方法会执行
@Override
public void onApplicationEvent(@NonNull final ContextRefreshedEvent event) {
//保证该方法的内容只执行一次
if (!registered.compareAndSet(false, true)) {
return;
}
final ApplicationContext context = event.getApplicationContext();
// 获取声明RPC调用的类
Map<String, T> beans = getBeans(context);
if (MapUtils.isEmpty(beans)) {
return;
}
// 构建URI数据并注册
publisher.publishEvent(buildURIRegisterDTO(context, beans));
// 构建元数据并注册
beans.forEach(this::handle);
}

// 交给不同的子类实现
@SuppressWarnings("all")
protected abstract URIRegisterDTO buildURIRegisterDTO(ApplicationContext context,
Map<String, T> beans);


protected void handle(final String beanName, final T bean) {
Class<?> clazz = getCorrectedClass(bean);
// 获取当前bean的对应shenyu客户端的注解(对应不同的RPC调用注解不一样,像http的就是@ShenyuSpringMvcClient,而像SpringCloud的则是@ShenyuSpringCloudClient)
final A beanShenyuClient = AnnotatedElementUtils.findMergedAnnotation(clazz, getAnnotationType());
// 根据bean获取对应的path(不同子类实现不一样)
final String superPath = buildApiSuperPath(clazz, beanShenyuClient);
// 如果包含Shenyu客户端注解或者path中包括'*',表示注册整个类的接口
if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {
// 构建类的元数据,发送注册事件
handleClass(clazz, bean, beanShenyuClient, superPath);
return;
}
// 获取当前bean的所有方法
final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);
// 遍历方法
for (Method method : methods) {
// 注册符合条件的方法
handleMethod(bean, clazz, beanShenyuClient, method, superPath);
}
}

// 构建类元数据并注册的默认实现
protected void handleClass(final Class<?> clazz,
final T bean,
@NonNull final A beanShenyuClient,
final String superPath) {
publisher.publishEvent(buildMetaDataDTO(bean, beanShenyuClient, pathJoin(contextPath, superPath), clazz, null));
}

// 构建方法元数据并注册的默认实现
protected void handleMethod(final T bean,
final Class<?> clazz,
@Nullable final A beanShenyuClient,
final Method method,
final String superPath) {
// 如果方法上有Shenyu客户端注解,就表示该方法需要注册
A methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, getAnnotationType());
if (Objects.nonNull(methodShenyuClient)) {
// 构建元数据,发送注册事件
publisher.publishEvent(buildMetaDataDTO(bean, methodShenyuClient, buildApiPath(method, superPath, methodShenyuClient), clazz, method));
}
}

// 交给不同子类实现
protected abstract MetaDataRegisterDTO buildMetaDataDTO(T bean,
@NonNull A shenyuClient,
String path,
Class<?> clazz,
Method method);
}

在构造函数中主要是读取属性配置。

shenyu:
client:
http:
props:
contextPath: /http
appName: http
port: 8189
isFull: false

最后,执行了publisher.start(),开始事件发布,为注册做准备。

  • ShenyuClientRegisterEventPublisher

ShenyuClientRegisterEventPublisher通过单例模式实现,主要是生成元数据和URI订阅器(后续用于数据发布),然后启动Disruptor队列。提供了一个共有方法publishEvent(),发布事件,向Disruptor队列发数据。


public class ShenyuClientRegisterEventPublisher {
// 私有变量
private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();

private DisruptorProviderManage<DataTypeParent> providerManage;

/**
* 公开静态方法
*
* @return ShenyuClientRegisterEventPublisher instance
*/
public static ShenyuClientRegisterEventPublisher getInstance() {
return INSTANCE;
}

/**
* Start方法执行
*
* @param shenyuClientRegisterRepository shenyuClientRegisterRepository
*/
public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
// 创建客户端注册工厂类
RegisterClientExecutorFactory factory = new RegisterClientExecutorFactory();
// 添加元数据订阅器
factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));
// 添加URI订阅器
factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));
// 启动Disruptor队列
providerManage = new DisruptorProviderManage(factory);
providerManage.startup();
}

/**
* 发布事件,向Disruptor队列发数据
*
* @param data the data
*/
public <T> void publishEvent(final DataTypeParent data) {
DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
provider.onData(data);
}
}

AbstractContextRefreshedEventListener的构造函数逻辑分析完成了,主要是读取属性配置,创建元数据URI订阅器,启动Disruptor队列。

onApplicationEvent()方法是有Spring事件发生时会执行,这里的参数是ContextRefreshedEvent,表示上下文刷新事件。当Spring容器就绪后执行此处逻辑:先构建URI数据并注册,再构建元数据并注册,

ContextRefreshedEventSpring内置事件。ApplicationContext被初始化或刷新时,该事件被触发。这也可以在 ConfigurableApplicationContext接口中使用 refresh() 方法来发生。此处的初始化是指:所有的Bean被成功装载,后处理Bean被检测并激活,所有Singleton Bean 被预实例化,ApplicationContext容器已就绪可用。

再来看AbstractContextRefreshedEventListener的http实现SpringMvcClientEventListener

public class SpringMvcClientEventListener extends AbstractContextRefreshedEventListener<Object, ShenyuSpringMvcClient> {

private final List<Class<? extends Annotation>> mappingAnnotation = new ArrayList<>(3);

private final Boolean isFull;

private final String protocol;

// 构造函数
public SpringMvcClientEventListener(final PropertiesConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
super(clientConfig, shenyuClientRegisterRepository);
Properties props = clientConfig.getProps();
// 获取 isFull
this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
// 表示是http协议的实现
this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);
mappingAnnotation.add(ShenyuSpringMvcClient.class);
mappingAnnotation.add(RequestMapping.class);
}

@Override
protected Map<String, Object> getBeans(final ApplicationContext context) {
// 配置属性,如果 isFull=true 的话,表示注册整个微服务
if (Boolean.TRUE.equals(isFull)) {
getPublisher().publishEvent(MetaDataRegisterDTO.builder()
.contextPath(getContextPath())
.appName(getAppName())
.path(PathUtils.decoratorPathWithSlash(getContextPath()))
.rpcType(RpcTypeEnum.HTTP.getName())
.enabled(true)
.ruleName(getContextPath())
.build());
return null;
}
// 否则获取带Controller注解的bean
return context.getBeansWithAnnotation(Controller.class);
}

// 构造URI数据
@Override
protected URIRegisterDTO buildURIRegisterDTO(final ApplicationContext context,
final Map<String, Object> beans) {
// ...
}

@Override
protected String buildApiSuperPath(final Class<?> clazz, @Nullable final ShenyuSpringMvcClient beanShenyuClient) {
// 如果有带上Shenyu客户端注解,则优先取注解中的不为空的path属性
if (Objects.nonNull(beanShenyuClient) && StringUtils.isNotBlank(beanShenyuClient.path())) {
return beanShenyuClient.path();
}
// 如果有带上RequestMapping注解,且path属性不为空,则返回path数组的第一个值
RequestMapping requestMapping = AnnotationUtils.findAnnotation(clazz, RequestMapping.class);
if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {
return requestMapping.path()[0];
}
return "";
}

// 声明http实现的客户端注解是ShenyuSpringMvcClient
@Override
protected Class<ShenyuSpringMvcClient> getAnnotationType() {
return ShenyuSpringMvcClient.class;
}

@Override
protected void handleMethod(final Object bean, final Class<?> clazz,
@Nullable final ShenyuSpringMvcClient beanShenyuClient,
final Method method, final String superPath) {
// 获取当前bean的RequestMapping注解
final RequestMapping requestMapping = AnnotatedElementUtils.findMergedAnnotation(method, RequestMapping.class);
// 获取当前bean的 ShenyuSpringMvcClient 注解
ShenyuSpringMvcClient methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, ShenyuSpringMvcClient.class);
methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;
//如果有 ShenyuSpringMvcClient 注解并且包含RequestMapping注解(表示是一个接口),则进行注册
if (Objects.nonNull(methodShenyuClient) && Objects.nonNull(requestMapping)) {
getPublisher().publishEvent(buildMetaDataDTO(bean, methodShenyuClient, buildApiPath(method, superPath, methodShenyuClient), clazz, method));
}
}

//...

// 构造元数据
@Override
protected MetaDataRegisterDTO buildMetaDataDTO(final Object bean,
@NonNull final ShenyuSpringMvcClient shenyuClient,
final String path, final Class<?> clazz,
final Method method) {
//...
}
}

注册逻辑都是通过 publisher.publishEvent()完成。

Controller注解和RequestMapping注解是由Spring提供的,这个大家应该很熟悉,不过多赘述。ShenyuSpringMvcClient 注解是由Apache ShenYu提供的,用于注册SpringMvc客户端,它的定义如下:


/**
* shenyu 客户端接口,用于方法上或类上
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ShenyuSpringMvcClient {

// path 注册路径
@AliasFor(attribute = "path")
String value() default "";

// path 注册路径
@AliasFor(attribute = "value")
String path();

// ruleName 规则名称
String ruleName() default "";

// desc 描述信息
String desc() default "";

// enabled是否启用
boolean enabled() default true;

// registerMetaData 注册元数据
boolean registerMetaData() default false;
}

它的使用如下:

  • 注册整个接口
@RestController
@RequestMapping("/test")
@ShenyuSpringMvcClient(path = "/test/**") // 表示整个接口注册
public class HttpTestController {
//......
}
  • 注册当前方法
@RestController
@RequestMapping("/order")
@ShenyuSpringMvcClient(path = "/order")
public class OrderController {

/**
* Save order dto.
*
* @param orderDTO the order dto
* @return the order dto
*/
@PostMapping("/save")
@ShenyuSpringMvcClient(path = "/save", desc = "Save order") // 注册当前方法
public OrderDTO save(@RequestBody final OrderDTO orderDTO) {
orderDTO.setName("hello world save order");
return orderDTO;
}
}
  • publisher.publishEvent() 发布注册事件

该方法会将数据发送到Disruptor队列中,关于Disruptor队列更多细节这里不做更多介绍,这不影响分析注册的流程。

当数据发送后,Disruptor队列的消费者会处理数据,进行消费。

  • QueueConsumer 消费数据

QueueConsumer是一个消费者,它实现了WorkHandler接口,它的创建过程在providerManage.startup()逻辑中。WorkHandler接口是disruptor的数据消费接口,只有一个方法是onEvent()

package com.lmax.disruptor;

public interface WorkHandler<T> {
void onEvent(T event) throws Exception;
}

QueueConsumer重写了onEvent()方法,主要逻辑是生成消费任务,然后在线程池中去执行。


/**
*
* 队列消费者
*/
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {

// 省略了其他逻辑

@Override
public void onEvent(final DataEvent<T> t) {
if (t != null) {
// 根据事件类型使用不同的线程池
ThreadPoolExecutor executor = orderly(t);
// 通过工厂创建队列消费任务
QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
// 保存数据
queueConsumerExecutor.setData(t.getData());
// help gc
t.setData(null);
// 放在线程池中执行 消费任务
executor.execute(queueConsumerExecutor);
}
}
}

QueueConsumerExecutor是在线程池中被执行的任务,它实现了Runnable接口,具体的实现类有两个:

  • RegisterClientConsumerExecutor:客户端消费者执行器;
  • RegisterServerConsumerExecutor:服务端消费者执行器。

顾名思义,一个负责处理客户端任务,一个负责处理服务端任务(服务端就是admin,在下文进行分析)。

  • RegisterClientConsumerExecutor 消费者执行器

重写的run()逻辑如下:


public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {

//......

@Override
public void run() {
// 获取数据
final T data = getData();
// 根据数据类型调用相应的处理器进行处理
subscribers.get(data.getType()).executor(Lists.newArrayList(data));
}

}

根据不同的数据类型调用不同的处理器去执行相应的任务。数据类型有两种,一个是元数据,记录客户端注册信息。一个是URI数据,记录客户端服务信息。

//数据类型
public enum DataType {
// 元数据
META_DATA,

// URI数据
URI,
}
  • ExecutorSubscriber#executor() 执行器订阅者

执行器订阅者也分为两类,一个是处理元数据,一个是处理URI。在客户端和服务端分别有两个,所以一共是四个。

先看元数据处理

  • ShenyuClientMetadataExecutorSubscriber#executor()

客户端这边对元数据处理逻辑是:遍历元数据信息,调用接口方法persistInterface()完成数据的发布。

public class ShenyuClientMetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {

//......

@Override
public DataType getType() {
return DataType.META_DATA; // 元数据
}

@Override
public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {
// 调用接口方法persistInterface()完成数据的发布
shenyuClientRegisterRepository.persistInterface(metaDataRegisterDTO);
}
}
}
  • ShenyuClientRegisterRepository#persistInterface()

ShenyuClientRegisterRepository是一个接口,用于表示客户端数据注册,它的实现类目前有五种,每一种就表示一种注册方法。

  • ConsulClientRegisterRepository:通过Consul实现客户端注册;
  • EtcdClientRegisterRepository:通过Etcd实现客户端注册;
  • HttpClientRegisterRepository:通过Http实现客户端注册;
  • NacosClientRegisterRepository:通过Nacos实现客户端注册;
  • ZookeeperClientRegisterRepository:通过Zookeeper实现客户端注册;

从图中可以看出,注册中心的加载是通过SPI的方式完成的。这个在前面提到过了,在客户端通用配置文件中,通过指定配置文件中的属性完成具体的类加载。


/**
* 加载 ShenyuClientRegisterRepository
*/
public final class ShenyuClientRegisterRepositoryFactory {

private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();

/**
* 创建 ShenyuClientRegisterRepository
*/
public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
// 通过SPI的方式进行加载,类型由registerType决定
ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
//执行初始化操作
result.init(shenyuRegisterCenterConfig);
ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
return result;
}
return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
}
}

本文的源码分析是基于Http的方式进行注册,所以我们先分析HttpClientRegisterRepository,其他的注册方式后续再分析。HttpClientRegisterRepository继承了FailbackRegistryRepository,而FailbackRegistryRepository本身主要用于对Http注册过程中的失败异常的处理,这里就省略了。

通过http的方式注册很简单,就是调用工具类发送http请求。注册元数据和URI都是调用的同一个方法doRegister(),指定接口和类型就好。

  • Constants.URI_PATH的值/shenyu-client/register-metadata:服务端提供的接口用于注册元数据。
  • Constants.META_PATH的值/shenyu-client/register-uri: 服务端提供的接口用于注册URI。
@Join
public class HttpClientRegisterRepository extends FailbackRegistryRepository {

private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientRegisterRepository.class);

private static URIRegisterDTO uriRegisterDTO;

private String username;

private String password;

private List<String> serverList;

private String accessToken;

public HttpClientRegisterRepository() {
}

public HttpClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
init(config);
}

@Override
public void init(final ShenyuRegisterCenterConfig config) {
// admin的用户名
this.username = config.getProps().getProperty(Constants.USER_NAME);
// admin的用户名对应的密码
this.password = config.getProps().getProperty(Constants.PASS_WORD);
// admin服务列表
this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));
// 设置访问的token
this.setAccessToken();
}

/**
* Persist uri.
*
* @param registerDTO the register dto
*/
@Override
public void doPersistURI(final URIRegisterDTO registerDTO) {
if (RuntimeUtils.listenByOther(registerDTO.getPort())) {
return;
}
doRegister(registerDTO, Constants.URI_PATH, Constants.URI);
uriRegisterDTO = registerDTO;
}

@Override
public void doPersistInterface(final MetaDataRegisterDTO metadata) {
doRegister(metadata, Constants.META_PATH, Constants.META_TYPE);
}

@Override
public void close() {
if (uriRegisterDTO != null) {
uriRegisterDTO.setEventType(EventType.DELETED);
doRegister(uriRegisterDTO, Constants.URI_PATH, Constants.URI);
}
}

private void setAccessToken() {
for (String server : serverList) {
try {
Optional<?> login = RegisterUtils.doLogin(username, password, server.concat(Constants.LOGIN_PATH));
login.ifPresent(v -> this.accessToken = String.valueOf(v));
} catch (Exception e) {
LOGGER.error("Login admin url :{} is fail, will retry. cause: {} ", server, e.getMessage());
}
}
}

private <T> void doRegister(final T t, final String path, final String type) {
int i = 0;
// 遍历admin服务列表(admin可能是集群)
for (String server : serverList) {
i++;
String concat = server.concat(path);
try {
// 设置访问token
if (StringUtils.isBlank(accessToken)) {
this.setAccessToken();
if (StringUtils.isBlank(accessToken)) {
throw new NullPointerException("accessToken is null");
}
}
// 调用工具类发送 http 请求
RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), concat, type, accessToken);
return;
} catch (Exception e) {
LOGGER.error("Register admin url :{} is fail, will retry. cause:{}", server, e.getMessage());
if (i == serverList.size()) {
throw new RuntimeException(e);
}
}
}
}
}

将数据序列化后,通过OkHttp发送数据。


public final class RegisterUtils {

//......

// 通过OkHttp发送数据
public static void doRegister(final String json, final String url, final String type) throws IOException {
if (!StringUtils.hasLength(accessToken)) {
LOGGER.error("{} client register error accessToken is null, please check the config : {} ", type, json);
return;
}
Headers headers = new Headers.Builder().add(Constants.X_ACCESS_TOKEN, accessToken).build();
String result = OkHttpTools.getInstance().post(url, json, headers);
if (Objects.equals(SUCCESS, result)) {
LOGGER.info("{} client register success: {} ", type, json);
} else {
LOGGER.error("{} client register error: {} ", type, json);
}
}
}

至此,客户端通过http的方式注册元数据的逻辑就分析完了。小结一下:通过读取自定义的注解信息构造元数据,将数据发到Disruptor队列,然后从队列中消费数据,将消费者放到线程池中去执行,最终通过发送http请求到admin

再来看看 URI 数据的处理

  • ShenyuClientURIExecutorSubscriber#executor()

主要逻辑是遍历URI数据集合,通过persistURI()方法实现数据注册。


public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {

//......

@Override
public DataType getType() {
return DataType.URI; //数据类型是URI
}

// 注册URI数据
@Override
public void executor(final Collection<URIRegisterDTO> dataList) {
for (URIRegisterDTO uriRegisterDTO : dataList) {
Stopwatch stopwatch = Stopwatch.createStarted();
while (true) {
try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {
break;
} catch (IOException e) {
long sleepTime = 1000;
// maybe the port is delay exposed
if (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {
LOG.error("host:{}, port:{} connection failed, will retry",
uriRegisterDTO.getHost(), uriRegisterDTO.getPort());
// If the connection fails for a long time, Increase sleep time
if (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {
sleepTime = 10000;
}
}
try {
TimeUnit.MILLISECONDS.sleep(sleepTime);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
//添加hook,优雅停止客户端
ShenyuClientShutdownHook.delayOtherHooks();

// 注册URI
shenyuClientRegisterRepository.persistURI(uriRegisterDTO);
}
}
}

代码中的while(true)循环是为了保证客户端已经成功启动了,通过hostport可以连接上。

后面的逻辑是:添加hook函数,用于优雅停止客户端 。

通过persistURI()方法实现数据注册。整个逻辑也在前面分析过了,最终就是通过OkHttp客户端向shenyu-admin发起http,通过http的方式注册URI

分析到这里就将客户端的注册逻辑分析完了,将构建的元数据和URI数据发送到Disruptor队列,再从中消费,读取数据,通过httpadmin发送数据。

客户端元数据URI注册流程的源码分析完成了,流程图如下:

3. 服务端注册流程

3.1 注册接口ShenyuClientHttpRegistryController

从前面的分析可以知道,服务端提供了注册的两个接口:

  • /shenyu-client/register-metadata:服务端提供的接口用于注册元数据。
  • /shenyu-client/register-uri: 服务端提供的接口用于注册URI。

这两个接口位于ShenyuClientHttpRegistryController中,它实现了ShenyuClientServerRegisterRepository接口,是服务端注册的实现类。它用@Join标记,表示通过SPI进行加载。

// shenuyu客户端接口
@RequestMapping("/shenyu-client")
@Join
public class ShenyuClientHttpRegistryController implements ShenyuClientServerRegisterRepository {

private ShenyuClientServerRegisterPublisher publisher;

@Override
public void init(final ShenyuClientServerRegisterPublisher publisher, final ShenyuRegisterCenterConfig config) {
this.publisher = publisher;
}

@Override
public void close() {
publisher.close();
}

// 注册元数据
@PostMapping("/register-metadata")
@ResponseBody
public String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {
publisher.publish(metaDataRegisterDTO);
return ShenyuResultMessage.SUCCESS;
}

// 注册URI
@PostMapping("/register-uri")
@ResponseBody
public String registerURI(@RequestBody final URIRegisterDTO uriRegisterDTO) {
publisher.publish(uriRegisterDTO);
return ShenyuResultMessage.SUCCESS;
}
}

两个注册接口获取到数据好,就调用了publisher.publish()方法,把数据发布到Disruptor队列中。

  • ShenyuClientServerRegisterRepository接口

ShenyuClientServerRegisterRepository接口是服务注册接口,它有五个实现类,表示有五种注册方式:

  • ConsulClientServerRegisterRepository:通过Consul实现注册;
  • EtcdClientServerRegisterRepository:通过Etcd实现注册;
  • NacosClientServerRegisterRepository:通过Nacos实现注册;
  • ShenyuClientHttpRegistryController:通过Http实现注册;
  • ZookeeperClientServerRegisterRepository:通过Zookeeper实现注册。

具体用哪一种方式,是通过配置文件指定的,然后通过SPI进行加载。

shenyu-admin中的application.yml文件中配置注册方式,registerType指定注册类型,当用http进行注册时,serverLists不需要填写,更多配置说明可以参考官网 客户端接入配置

shenyu:
register:
registerType: http
serverLists:
  • RegisterCenterConfiguration 加载配置

在引入相关依赖和属性配置后,启动shenyu-admin时,会先加载配置文件,和注册中心相关的配置文件类是RegisterCenterConfiguration

// 注册中心配置类
@Configuration
public class RegisterCenterConfiguration {
// 读取配置属性
@Bean
@ConfigurationProperties(prefix = "shenyu.register")
public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
return new ShenyuRegisterCenterConfig();
}

//创建ShenyuServerRegisterRepository,用于服务端注册
@Bean(destroyMethod = "close")
public ShenyuServerRegisterRepository shenyuServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig, final List<ShenyuClientRegisterService> shenyuClientRegisterService) {
// 1.从配置属性中获取注册类型
String registerType = shenyuRegisterCenterConfig.getRegisterType();
// 2.通过注册类型,以SPI的方法加载实现类
ShenyuClientServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuClientServerRegisterRepository.class).getJoin(registerType);
// 3.获取publisher,向Disruptor队列中写数据
RegisterClientServerDisruptorPublisher publisher = RegisterClientServerDisruptorPublisher.getInstance();
// 4.注册Service, rpcType -> registerService
Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, e -> e));
// 5.事件发布的准备工作
publisher.start(registerServiceMap);
// 6.注册的初始化操作
registerRepository.init(publisher, shenyuRegisterCenterConfig);
return registerRepository;
}
}

在配置类中生成了两个bean

  • shenyuRegisterCenterConfig:读取属性配置;

  • shenyuClientServerRegisterRepository:用于服务端注册。

在创建shenyuClientServerRegisterRepository的过程中,也进行了一系列的准备工作:

  • 1.从配置属性中获取注册类型。

  • 2.通过注册类型,以SPI的方法加载实现类:比如指定的类型是http,就会加载ShenyuClientHttpRegistryController

  • 3.获取publisher,向Disruptor队列中写数据。

  • 4.注册ServicerpcType -> registerService:获取注册的Service,每种rpc都有对应的Service。本文的客户端构建是通过springboot,属于http类型,还有其他客户端类型:dubboSpring CloudgRPC等。

  • 5.事件发布的准备工作:添加服务端元数据和URI订阅器,处理数据。并且启动Disruptor队列。

  • 6.注册的初始化操作:http类型的注册初始化操作就是保存publisher

  • RegisterServerDisruptorPublisher#publish()

服务端向Disruptor队列写入数据的发布者 ,通过单例模式构建。


public class RegisterClientServerDisruptorPublisher implements ShenyuClientServerRegisterPublisher {
//私有属性
private static final RegisterClientServerDisruptorPublisher INSTANCE = new RegisterClientServerDisruptorPublisher();

private DisruptorProviderManage<Collection<DataTypeParent>> providerManage;

//公开静态方法获取实例
public static RegisterServerDisruptorPublisher getInstance() {
return INSTANCE;
}

//事件发布的准备工作,添加服务端元数据和URI订阅器,处理数据。并且启动Disruptor队列。
public void start(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {
//服务端注册工厂
RegisterServerExecutorFactory factory = new RegisterServerExecutorFactory();
//添加URI数据订阅器
factory.addSubscribers(new URIRegisterExecutorSubscriber(shenyuClientRegisterService));
//添加元数据订阅器
factory.addSubscribers(new MetadataExecutorSubscriber(shenyuClientRegisterService));
//启动Disruptor队列
providerManage = new DisruptorProviderManage(factory);
providerManage.startup();
}

// 向队列中写入数据
@Override
public void publish(final DataTypeParent data) {
DisruptorProvider<Collection<DataTypeParent>> provider = providerManage.getProvider();
provider.onData(Collections.singleton(data));
}

// 批量向队列中写入数据
@Override
public void publish(final Collection<? extends DataTypeParent> dataList) {
DisruptorProvider<Collection<DataTypeParent>> provider = providerManage.getProvider();
provider.onData(dataList.stream().map(DataTypeParent.class::cast).collect(Collectors.toList()));
}

@Override
public void close() {
providerManage.getProvider().shutdown();
}
}

配置文件的加载,可看作是注册中心服务端初始化流程,用图描述如下:

3.2 消费数据QueueConsumer

在前面分析了客户端disruptor队列消费数据的过。服务端也是一样的逻辑,只是其中执行任务的执行者变了。

QueueConsumer是一个消费者,它实现了WorkHandler接口,它的创建过程在providerManage.startup()逻辑中。WorkHandler接口是disruptor的数据消费接口,只有一个方法是onEvent()

package com.lmax.disruptor;

public interface WorkHandler<T> {
void onEvent(T event) throws Exception;
}

QueueConsumer重写了onEvent()方法,主要逻辑是生成消费任务,然后在线程池中去执行。

/**
*
* 队列消费者
*/
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {

// 省略了其他逻辑

@Override
public void onEvent(final DataEvent<T> t) {
if (t != null) {
// 根据事件类型获取相应的线程池
ThreadPoolExecutor executor = orderly(t);
// 通过工厂创建队列消费任务
QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
// 保存数据
queueConsumerExecutor.setData(t.getData());
// help gc
t.setData(null);
// 放在线程池中执行 消费任务
executor.execute(queueConsumerExecutor);
}
}
}

QueueConsumerExecutor是在线程池中被执行的任务,它实现了Runnable接口,具体的实现类有两个:

  • RegisterClientConsumerExecutor:客户端消费者执行器;
  • RegisterServerConsumerExecutor:服务端消费者执行器。

顾名思义,一个负责处理客户端任务,一个负责处理服务端任务。

  • RegisterServerConsumerExecutor#run()

RegisterServerConsumerExecutor是服务端消费者执行器,它通过QueueConsumerExecutor间接实现了Runnable接口,并重写了run()方法。


public final class RegisterServerConsumerExecutor extends QueueConsumerExecutor<Collection<DataTypeParent>> {
// ...

@Override
public void run() {
//获取从disruptor队列中拿到的数据
Collection<DataTypeParent> results = getData()
.stream()
.filter(this::isValidData)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(results)) {
return;
}
//根据类型执行操作
selectExecutor(results).executor(results);
}

// 根据类型获取订阅者
private ExecutorSubscriber<DataTypeParent> selectExecutor(final Collection<DataTypeParent> list) {
final Optional<DataTypeParent> first = list.stream().findFirst();
return subscribers.get(first.orElseThrow(() -> new RuntimeException("the data type is not found")).getType());
}
}

  • ExecutorSubscriber#executor()

执行器订阅者分为两类,一个是处理元数据,一个是处理URI。在客户端和服务端分别有两个,所以一共是四个。

  • MetadataExecutorSubscriber#executor()

如果是注册元数据,则通过MetadataExecutorSubscriber#executor()实现:根据类型获取注册Service,调用register()

public class MetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {

//......

@Override
public DataType getType() {
return DataType.META_DATA; // 元数据类型
}

@Override
public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
// 遍历元数据列表
metaDataRegisterDTOList.forEach(meta -> {
Optional.ofNullable(this.shenyuClientRegisterService.get(meta.getRpcType())) // 根据类型获取注册Service
.ifPresent(shenyuClientRegisterService -> {
// 对元数据进行注册,加锁确保顺序执行,防止并发错误
synchronized (shenyuClientRegisterService) {
shenyuClientRegisterService.register(meta);
}
});
});
}
}
  • URIRegisterExecutorSubscriber#executor()

如果是注册元数据,则通过URIRegisterExecutorSubscriber#executor()实现:构建URI数据,根据注册类型查找Service,通过registerURI方法实现注册。


public class URIRegisterExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {
//......

@Override
public DataType getType() {
return DataType.URI; // URI数据类型
}

@Override
public void executor(final Collection<URIRegisterDTO> dataList) {
if (CollectionUtils.isEmpty(dataList)) {
return;
}
// 根据rpc调用类型聚集数据
final Map<String, List<URIRegisterDTO>> groupByRpcType = dataList.stream()
.filter(data -> StringUtils.isNotBlank(data.getRpcType()))
.collect(Collectors.groupingBy(URIRegisterDTO::getRpcType));
for (Map.Entry<String, List<URIRegisterDTO>> entry : groupByRpcType.entrySet()) {
final String rpcType = entry.getKey();
// 根据类型查找Service
Optional.ofNullable(shenyuClientRegisterService.get(rpcType))
.ifPresent(service -> {
final List<URIRegisterDTO> list = entry.getValue();
// 构建URI数据类型,通过registerURI方法实现注册
Map<String, List<URIRegisterDTO>> listMap = buildData(list);
listMap.forEach(service::registerURI);
});
}
}
}

  • ShenyuClientRegisterService#register()

ShenyuClientRegisterService是注册方法接口,它有多个实现类:

  • AbstractContextPathRegisterService:抽象类,处理部分公共逻辑;
  • AbstractShenyuClientRegisterServiceImpl::抽象类,处理部分公共逻辑;
  • ShenyuClientRegisterDivideServiceImpldivide类,处理http注册类型;
  • ShenyuClientRegisterDubboServiceImpldubbo类,处理dubbo注册类型;
  • ShenyuClientRegisterGrpcServiceImplgRPC类,处理gRPC注册类型;
  • ShenyuClientRegisterMotanServiceImplMotan类,处理Motan注册类型;
  • ShenyuClientRegisterSofaServiceImplSofa类,处理Sofa注册类型;
  • ShenyuClientRegisterSpringCloudServiceImplSpringCloud类,处理SpringCloud注册类型;
  • ShenyuClientRegisterTarsServiceImplTars类,处理Tars注册类型;
  • ShenyuClientRegisterWebSocketServiceImplWebsocket类,处理Websocket注册类型;

从上面可以看出每种微服务都有对应的注册实现类,本文的源码分析是 以官方提供的 shenyu-examples-http 为例,是属http注册类型,所以元数据和URI数据的注册实现类是 ShenyuClientRegisterDivideServiceImpl

  • register(): 注册元数据
public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {

//......

public String register(final MetaDataRegisterDTO dto) {
// 1.注册选择器信息
String selectorHandler = selectorHandler(dto);
String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
// 2.注册规则信息
String ruleHandler = ruleHandler();
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
ruleService.registerDefault(ruleDTO);
// 3.注册元数据信息
registerMetadata(dto);
// 4.注册contextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
}

整个注册逻辑可以分为4个步骤:

  • 1.注册选择器信息
  • 2.注册规则信息
  • 3.注册元数据信息
  • 4.注册contextPath

admin这一侧通过客户端的元数据信息需要构建选择器、规则、元数据和ContextPath。具体的注册过程和细节处理跟rpc类型有关。我们就不再继续向下追踪了,对于注册中心的逻辑分析,跟踪到这里就够了。

服务端元数据注册流程的源码分析完了,流程图描述如下:

  • registerURI(): 注册URI数据
public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {

//......

public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {
if (CollectionUtils.isEmpty(uriList)) {
return "";
}
// 对应的选择器是否存在
SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
if (Objects.isNull(selectorDO)) {
throw new ShenyuException("doRegister Failed to execute,wait to retry.");
}
List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());
// 处理选择器中的handler信息
String handler = buildHandle(validUriList, selectorDO);
if (handler != null) {
selectorDO.setHandle(handler);
SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
selectorData.setHandle(handler);
// 更新数据库中的记录
selectorService.updateSelective(selectorDO);
// 发布事件
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
}
return ShenyuResultMessage.SUCCESS;
}
}

admin拿到URI数据后,主要是更新选择器中的handler信息,然后写入到数据库,最后发布事件通知网关。通知网关的逻辑是由数据同步操作完成,这在之前的文章中已经分析过了,就不再赘述。

服务端URI注册流程的源码分析完成了,用图描述如下:

至此,服务端注册流程也就分析完了,主要通过对外提供的接口,接受客户端的注册信息,然后写入到Disruptor队列,再从中消费数据,根据接收到的元数据和URI数据更新admin的选择器、规则、元数据和选择器的handler

4. 总结

本文主要对Apache ShenYu网关中的http注册模块进行了源码分析。涉及到的主要知识点,归纳如下:

  • 注册中心是为了将客户端信息注册到admin,方便流量筛选;
  • http注册是将客户端元数据信息和URI信息注册到admin
  • http服务的接入通过注解@ShenyuSpringMvcClient标识;
  • 注册信息的构建主要通过Spring应用监听器ApplicationListener
  • 注册类型的加载通过SPI完成;
  • 引入Disruptor队列是为了数据与操作解耦,以及数据缓冲。
  • 注册中心的实现采用了面向接口编程,使用模板方法、单例、观察者等设计模式。

· One min read

​ 网关应用需要支持多种负载均衡的方案,包括随机选择、Hash、轮询等方式。Apache Shenyu网关中不仅实现了传统网关的这些均衡策略,还通过流量预热(warmup)等细节处理,对服务器节点的加入,做了更平滑的流量处理,获得了更好的整体稳定性。让我们来看看Shenyu是是如何设计和实现这部分功能的。

本文基于shenyu-2.5.0版本进行源码分析.

[TOC]

LoadBalancer SPI

LoadBalancer SPI 定义在shenyu-loadbalancer模组中,以下是这个核心接口的代码,这个接口很好的诠释了这样一个理念:负载均衡是在一系列服务器节点中选出最合适的节点,也就是选择策略。做流量转发、路由和负载均衡是LoadBalance SPI的基本功能

@SPI
public interface LoadBalancer {

/**
* this is select one for upstream list.
*
* @param upstreamList upstream list
* @param ip ip
* @return upstream
*/
Upstream select(List<Upstream> upstreamList, String ip);
}

接口中,upstreamList是可选路由的一组服务器节点,Upstream 是服务器节点的数据结构,它包括的重要元素有:协议、url 、权重、时间戳,warmup,健康状态等。

public class Upstream {
/**
* protocol.
*/
private final String protocol;

/**
* url.
*/
private String url;

/**
* weight.
*/
private final int weight;

/**
* false close, true open.
*/
private boolean status;

/**
* startup time.
*/
private final long timestamp;

/**
* warmup.
*/
private final int warmup;

/**
* healthy.
*/
private boolean healthy;

/**
* lastHealthTimestamp.
*/
private long lastHealthTimestamp;

/**
* lastUnhealthyTimestamp.
*/
private long lastUnhealthyTimestamp;

/**
* group.
*/
private String group;

/**
* version.
*/
private String version;
}

Design of LoadBalance module`

图1是LoadBalancer模组的类图:

loadbalancer-class-diagram

从类图上可以看出LoadBalance的设计概要:

  1. 抽象类AbstractLoadBalancer继承自LoadBalancer SPI接口,并提供选择的模板方法,及权重计算。

  2. 三个实做类继承AbstractLoadBalancer, 实现各自的逻辑处理。

    • RandomLoadBalancer -加权随机选择 Weight Random
    • HashLoadBalancer - 一致性Hash
    • RoundRobinLoadBalancer -加权轮询(Weight Round Robin per-packet)
  3. 由工厂类LoadBalancerFactory 实现对外的静态调用方法。

    另外根据Apache Sheny SPI规范,在SHENYU_DIERECTORY中的添加profile,配置LoadBalance的实现类,配置key=class形式,左边的operator要和LoadBalanceEnum中的定义一致。

random=org.apache.shenyu.loadbalancer.spi.RandomLoadBalancer
roundRobin=org.apache.shenyu.loadbalancer.spi.RoundRobinLoadBalancer
hash=org.apache.shenyu.loadbalancer.spi.HashLoadBalancer

LoadBalanceEnum的定义如下:

public enum LoadBalanceEnum {
/**
* Hash load balance enum.
*/
HASH(1, "hash", true),

/**
* Random load balance enum.
*/
RANDOM(2, "random", true),

/**
* Round robin load balance enum.
*/
ROUND_ROBIN(3, "roundRobin", true);

private final int code;
private final String name;
private final boolean support;
}

AbstractLoadBalancer

这个抽象类实做了LoadBalancer接口, 定义了抽象方法doSelect()留给实作类处理,在模板方法select() 中先进行校验,之后调用由实作类实现的doSelect()方法。

public abstract class AbstractLoadBalancer implements LoadBalancer {
/**
* Do select divide upstream.
*
* @param upstreamList the upstream list
* @param ip the ip
* @return the divide upstream
*/
protected abstract Upstream doSelect(List<Upstream> upstreamList, String ip);

@Override
public Upstream select(final List<Upstream> upstreamList, final String ip) {
if (CollectionUtils.isEmpty(upstreamList)) {
return null;
}
if (upstreamList.size() == 1) {
return upstreamList.get(0);
}
return doSelect(upstreamList, ip);
}
}

权重的处理方法getWeight()的逻辑是:当有时间戳,并且当前时间与时间戳间隔在流量预热warmup时间内,权重计算的公式为: $$ 0 ww = min(1,uptime/(warmup/weight)) $$ 从公式可以看出,最终的权值,与设置的weight成正比,时间间隔越接近warmup时间,权重就越大。也就是说等待的时间越长,被分派的权重越高。没有时间戳时等其他情况下,返回Upstream设置的weight值。

考虑流量预热(warmup)的核心思想是避免在添加新服务器和启动新JVM时网关性能不佳。

下面我们看一下三个实做类的实现。

RandomLoadBalancer

这里随机LoadBalancer 可以处理两种情况:

  1. 没有权重:所有服务器都没有设定权重,或者权重都一样, 会随机选择一个。
  2. 有权重:服务器设定有不同的权重,会根据权重,进行随机选择。

下面是有权重时的随机选择代码random(): 遍历全部服务器列表,当随机值小于某个服务器权重时,这个服务器被选中(这里提前计算了前一半服务器的权重和,如果随机值大于halfLengthTotalWeight,则遍历从(weights.length + 1) / 2开始,提高了小效率)。 若遍历后没有满足条件,就在全部服务器列表中随机选择一个返回。这里getWeight(final Upstream upstream) 方法是在AbstractLoadBalancer 中定义的,按公式计算权重。

@Override
public Upstream doSelect(final List<Upstream> upstreamList, final String ip) {
int length = upstreamList.size();
// every upstream has the same weight?
boolean sameWeight = true;
// the weight of every upstream
int[] weights = new int[length];
int firstUpstreamWeight = getWeight(upstreamList.get(0));
weights[0] = firstUpstreamWeight;
// init the totalWeight
int totalWeight = firstUpstreamWeight;
int halfLengthTotalWeight = 0;
for (int i = 1; i < length; i++) {
int currentUpstreamWeight = getWeight(upstreamList.get(i));
if (i <= (length + 1) / 2) {
halfLengthTotalWeight = totalWeight;
}
weights[i] = currentUpstreamWeight;
totalWeight += currentUpstreamWeight;
if (sameWeight && currentUpstreamWeight != firstUpstreamWeight) {
// Calculate whether the weight of ownership is the same.
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
return random(totalWeight, halfLengthTotalWeight, weights, upstreamList);
}
return random(upstreamList);
}

private Upstream random(final int totalWeight, final int halfLengthTotalWeight, final int[] weights, final List<Upstream> upstreamList) {
// If the weights are not the same and the weights are greater than 0, then random by the total number of weights.
int offset = RANDOM.nextInt(totalWeight);
int index = 0;
int end = weights.length;
if (offset >= halfLengthTotalWeight) {
index = (weights.length + 1) / 2;
offset -= halfLengthTotalWeight;
} else {
end = (weights.length + 1) / 2;
}
// Determine which segment the random value falls on
for (; index < end; index++) {
offset -= weights[index];
if (offset < 0) {
return upstreamList.get(index);
}
}
return random(upstreamList);
}

因此,当采用RandomLoadBalancer时,是按权重随机分派服务器的。

HashLoadBalancer

Apache ShenyuHashLoadBalancer 中采用了一致性hash算法,使用有序hash环,将key与服务器节点的hash映射缓存起来。对于请求的ip地址,计算出其hash值, 在hash环上顺时针查找距离这个key的hash值最近的节点,将其作为要路由的节点。一致性hash解决了传统取余hash算法的可伸缩性差的问题。

HashLoadBalancer中的采用的是加密的单向MD5散列函数,这个hash函数会hash后产生不可预期但确定性的()的结果,输出为32-bit的长整数。hash代码如下:

private static long hash(final String key) {
// md5 byte
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new ShenyuException("MD5 not supported", e);
}
md5.reset();
byte[] keyBytes;
keyBytes = key.getBytes(StandardCharsets.UTF_8);
md5.update(keyBytes);
byte[] digest = md5.digest();
// hash code, Truncate to 32-bits
long hashCode = (long) (digest[3] & 0xFF) << 24
| ((long) (digest[2] & 0xFF) << 16)
| ((long) (digest[1] & 0xFF) << 8)
| (digest[0] & 0xFF);
return hashCode & 0xffffffffL;
}

再看一下HashLoadBalancer的选择函数doSelect()的实现:

    private static final int VIRTUAL_NODE_NUM = 5;

@Override
public Upstream doSelect(final List<Upstream> upstreamList, final String ip) {
final ConcurrentSkipListMap<Long, Upstream> treeMap = new ConcurrentSkipListMap<>();
upstreamList.forEach(upstream -> IntStream.range(0, VIRTUAL_NODE_NUM).forEach(i -> {
long addressHash = hash("SHENYU-" + upstream.getUrl() + "-HASH-" + i);
treeMap.put(addressHash, upstream);
}));
long hash = hash(ip);
SortedMap<Long, Upstream> lastRing = treeMap.tailMap(hash);
if (!lastRing.isEmpty()) {
return lastRing.get(lastRing.firstKey());
}
return treeMap.firstEntry().getValue();
}

这个方法中,生成带虚拟服务器节点的hash环, 一个实际节点会生成5个虚拟节点,因此整个hash环的均匀性大大增加,降低数据倾斜的发生。

为了实现hash环的有序性及顺时针查找功能,代码中使用Java 的ConcurrentSkipListMap 来存储带虚拟节点的服务器节点及其hash值, 它既能保证线程安全,又能保证数据的有序性,支持高并发。 另外,ConcurrentSkipListMap提供了一个tailMap(K fromKey)方法,可从map中查找比fromKey大的值的集合,但并不需要遍历整个数据结构。

上述代码中,生成hash环之后,就是调用ConcurrentSkipListMaptailMap()方法,找到大于等于请求的ip的hash值的子集,这个子集的第一个就是要路由的服务器节点。采用了合适的数据结构,这里的代码看上去是不是特别的简洁流畅?

RoundRobinLoadBalancer

Round-robin轮询方法的原始定义是顺序循环将请求依次循环地连接到每个服务器。当某个服务器发生故障(例如:一分钟连接不上的服务器),从候选队列中取出,不参与下一次的轮询,直到其恢复正常。在 RoundRobinLoadBalancer中实现的是组内加权轮询(Weight Round Robin per-packet)方法:

为了计算和存储每个服务器节点的轮询次数,在这个类中定义了一个静态内部类WeigthRoundRobin,我们先看一下它的主要代码(去掉了注释):

protected static class WeightedRoundRobin {

private int weight;

private final AtomicLong current = new AtomicLong(0);

private long lastUpdate;

void setWeight(final int weight) {
this.weight = weight;
current.set(0);
}
long increaseCurrent() {
return current.addAndGet(weight);
}

void sel(final int total) {
current.addAndGet(-1 * total);
}
void setLastUpdate(final long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}

请重点关注这几个方法:

  • setWeight(final int weight) ,为对象设定权重,并将current重置为0.

  • increaseCurrent() : 对AtomicLong类型的对象current,累加其权重值。

  • sel(final int total): current减去传入的 total值。

下面我们看一下带权重的轮询过程是如何实现的。 首先定义了一个ConcurrentMap类型对象methodWeightMap 两层对象来存储服务器列表与其各个明细节点的轮询资料。

private final ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<>(16);

这个map对象第一层的key为当前服务器列表的第一个节点的upstreamUrl, 第二个对象ConcurrentMap<String, WeightedRoundRobin>存储了组内各个服务器节点的轮询情况,内层Map的key为组内每个服务器的upstreamUrlMap对象使用JUCConcurrentHashMap,不仅存取高效,而且线程安全,支持高并发。

内层map的每个节点对应的WeighedRoundRobin作为静态内部类能确保线程安全,并实现组内的加权轮询选择功能。下面是这个类的doSelect()方法的代码。

@Override
public Upstream doSelect(final List<Upstream> upstreamList, final String ip) {
String key = upstreamList.get(0).getUrl();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (Objects.isNull(map)) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));
map = methodWeightMap.get(key);
}
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Upstream selectedInvoker = null;
WeightedRoundRobin selectedWeightedRoundRobin = null;
for (Upstream upstream : upstreamList) {
String rKey = upstream.getUrl();
WeightedRoundRobin weightedRoundRobin = map.get(rKey);
int weight = getWeight(upstream);
if (Objects.isNull(weightedRoundRobin)) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(rKey, weightedRoundRobin);
}
if (weight != weightedRoundRobin.getWeight()) {
// weight changed.
weightedRoundRobin.setWeight(weight);
}
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = upstream;
selectedWeightedRoundRobin = weightedRoundRobin;
}
totalWeight += weight;
}
...... //erase the section which handles the time-out upstreams.
if (selectedInvoker != null) {
selectedWeightedRoundRobin.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return upstreamList.get(0);
}

举例,若服务器组upstreamUrl 分别为: LIST = [upstream-20, upstream-50, upstream-30]时,经过一轮执行后,建立的methodWeightMap 资料如下:

methodWeightMap

假设上述的LIST中,各个服务器节点的权重数组为: [20,50,30], 下图是内部类current 值变化和轮询选择过程:

weighted-roundrobin-demo

每一轮,选择值current最大的服务器节点:

  • Round1:
    • 对当前服务器LIST做遍历,当服务器节点的weightedRoundRobin 为null时,current被置为各自的权重; 不为null时,累加各自的权重。
    • 即:遍历后current 分别为 [20, 50,30] , 会选择Stream-50, Stream-50对应的WeightRoundRobin静态类做 sel(-total)处理,current 更新为[20,-50, 30].
  • Round 2 遍历后的current是[40,0,60], 会选择Stream-30, current分别更新为[40,0,-40].
  • Round 3 遍历后的current是[60,50,-10], 会选择Stream-20,current分别更新为[-40,50,-10].

中间进行了容错处理, 当服务器的个数与map个数不一样,就对methodWeightMap 加锁做处理。 用先copy 后modify的方式, 把超时的服务器remove掉,即移除掉发生故障的服务器,并更新Map资料。如下是异常时的处理代码:

    if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {
try {
// copy -> modify -> update reference.
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
if (Objects.nonNull(selectedInvoker)) {
selectedWeightedRoundRobin.sel(totalWeight);
return selectedInvoker;
}
// should not happen here.
return upstreamList.get(0);

LoadBalancerFactory

在这个工厂类中,提供了调用LoadBalancer的静态方法, 其中ExtensionLoaderApache ShenyuSPI执行入口。也就是说,LoadBalancer模组是可配置、可扩展的。这个静态方法中的algorithm变量是LoadBalanceEnum中定义name枚举类型。

/**
* Selector upstream.
*
* @param upstreamList the upstream list
* @param algorithm the loadBalance algorithm
* @param ip the ip
* @return the upstream
*/
public static Upstream selector(final List<Upstream> upstreamList, final String algorithm, final String ip) {
LoadBalancer loadBalance = ExtensionLoader.getExtensionLoader(LoadBalancer.class).getJoin(algorithm);
return loadBalance.select(upstreamList, ip);
}

Using of LoadBalancer module

上面说明了LoadBalancer SPI接口及三个实作类。下面看一下LoadBalancerApache Shenyu中是如何被调用的。DividePlugin是路由选择插件,所有的Http请求都由该插件进行负载均衡处理。当请求头rpcType = http, 且开启该插件时,它将根据请求参数匹配规则,最终交由下游插件进行响应式代理调用。

DividePlugindoExecute方法中,先对要转发的请求的Header大小、content长度等做校验,

@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
......
}

接口方法的第二个参数是ShenyuPluginChain 类型,代表plugin的调用链,具体可参见Apache Sheyuplugin的调用机制。第三个SelectorData类型的参数是选择器, 第四个是RuldData类型,代表规则。分别请查看对应的代码。

下面给出了doExecute()方法中,有关LoadBalancer调用的代码片段:

   //取到要路由的服务器节点列表。
List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
...
//取到请求的ip
String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();

//调用Util方法,执行LoadBalancer处理
Upstream upstream = LoadBalancerFactory.selector(upstreamList, ruleHandle.getLoadBalance(), ip);

这里UpstreamCacheManager 是缓存的要路由的服务器节点 , ruleHandle.getLoadBalance()取到的是LoadBalanceEnum定义的枚举name, 如random, hash, roundRobin等.

经过封装,调用负载均衡功能非常的方便。 未来增加新的LoadBalancer类,这些调用的Plugin代码完全不需要变更。

Summary

经过上面的代码解读,从设计角度总结LoadBalancer 模组具有如下的特点:

  1. 可扩展性:面向接口的设计,及基于Apache Shenyu SPI的实现,使得系统具有良好的可扩展性。可以方便的扩展为其他的动态的负载均衡算法,如最少连接方式(least connection)、最快模式( fastest)。并支持集群处理,具有良好的可扩展性。

  2. 可伸缩性:采用的一致性hash、权重随机和权重轮询算法,都可以无缝支持集群扩容或缩容。

  3. 流量预热等更细致的设计,能带来整体上更为平滑的负载均衡。

· One min read

Apache Shenyu 网关的各个Plugin(包括Dubbo, gRPC,Spring-cloud等) 中,routing参数均设计为可以接受多个条件的组合。 为了实现这样的目的,遵循其SPI的机制进行将参数及行为抽象为如下三部分,这些SPIshenyu-plugin-base模组中实现

  • ParameterData-参数资料
  • PredictJudge-断言
  • MatchStrategy-匹配策略

相对而言,匹配策略是需要扩展点最少的部分。想象一下,对多个条件的组合判断,最常见的几种规则是:全部都满足、至少满足一个条件、至少满足第一个,或者大部分满足等等。 并且要做到对各种plugin的不同类型的参数,如IP, header, uri等。针对这些需求,如何将MatchStrategy设计得简单易用且容易扩展?

MatchStrategy

MatchStrategy的实现代码在shenyu-plugin-base模组中,基于Apache ShenyuSPI创建机制, 设计上结合了工厂模式和策略模式,整体MatchStrategy的设计类图如下下:

MatchStrategy-class-diagram

以接口MatchStrategy为基础,设计实现类,并由抽象类AbstractMatchStrategy实现公共方法,由工厂类MatchStrategyFactory提供创建和外部调用功能。

MatchStrategy Interface

首先来看MatchStrategy SPI接口的定义:

@SPI
public interface MatchStrategy {

Boolean match(List<ConditionData> conditionDataList, ServerWebExchange exchange);
}

@SPI annotation代表这是一个SPI接口。ServerWebExchangeorg.springframework.web.server.ServerWebExchange ,代表HTTPrequest-response 的交互内容。ConditionData的代码如下,更多说明可以参考PredicateJudge代码分析中的说明,

public class ConditionData {

private String paramType;
private String operator;

private String paramName;
private String paramValue;
}

AbstractMatchStrategy

在抽象类AbstractMatchStrategy中,定义MatchStrategy的公共方法, 用buildRealData方法中,用ParameterData工厂类ParameterDataFactory,将多种参数如 Ip, Cookie, Header,uri等资料都以统一的接口方法来呈现。这些参数格式及规则的修改,不会影响到对参数规则匹配MatchStrategy的调用。

public abstract class AbstractMatchStrategy {

public String buildRealData(final ConditionData condition, final ServerWebExchange exchange) {
return ParameterDataFactory.builderData(condition.getParamType(), condition.getParamName(), exchange);
}
}

实现类及Profile

基于上述接口定义, shenyu-plugin-base 模组提供了两个MatchStrategy实现类

  • AndMatchStrategy-多个条件 AND

  • OrMatchStrategy- 多个条件 OR

    并在SHENYU_DIRECTORY目录下的配置文件中,对实作类做了配置。在系统启动时会由顶层SPIkey-value形式加载并cache起来。

and=org.apache.shenyu.plugin.base.condition.strategy.AndMatchStrategy
or=org.apache.shenyu.plugin.base.condition.strategy.OrMatchStrategy

两个实现类AndMatchStrategy 继承AbstractMatchStrategy 并实做了MatchStrategy

AndMatchStrategy- “与”的关系

由于PredicateJudge封装了条件判断的多样性,ConditionDataParameData封装了多种参数。那么对于多个条件的匹配来说,采用Stream流处理及lamda表达式,非常简洁高效达成了:全部条件都满足,即"AND"的逻辑。

@Join
public class AndMatchStrategy extends AbstractMatchStrategy implements MatchStrategy {

@Override
public Boolean match(final List<ConditionData> conditionDataList, final ServerWebExchange exchange) {
return conditionDataList
.stream()
.allMatch(condition -> PredicateJudgeFactory.judge(condition, buildRealData(condition, exchange)));
}
}

OrMatchStrategy是同样的实现方式,实现: 至少满足一个条件"OR"的规则,在此不做赘述。

MatchStrategyFactory

这是MatchStrategy的工厂类,实现了两个方法,一个是newInstance()方法根据策略代码和名称,返回由SPI ExtensionLoader按key来加载对应的MatchStrategy实现类。

    public static MatchStrategy newInstance(final Integer strategy) {
String matchMode = MatchModeEnum.getMatchModeByCode(strategy);
return ExtensionLoader.getExtensionLoader(MatchStrategy.class).getJoin(matchMode);
}

MatchModeEnum 中定义了match策略的code和name。 调用时由策略名称,如"and","or",根据启动时SPI加载的key-value资料,找到对应的实现类:

AND(0, "and"),  
OR(1, "or");

另一个是match()方法,调用实作类的match方法。

    public static boolean match(final Integer strategy, final List<ConditionData> conditionDataList, final ServerWebExchange exchange) {
return newInstance(strategy).match(conditionDataList, exchange);
}

调用方式

shenyu-plugin模组的各个plugin的基类AbstractShenyuPlugin 中,定义了两个选择的方法:filterSelectorfilterRule 它们都调用了MatchStrategyFactory 方法,下面是AbstractShenyuPluginfilterSelector方法的代码:

    private Boolean filterSelector(final SelectorData selector, final ServerWebExchange exchange) {        if (selector.getType() == SelectorTypeEnum.CUSTOM_FLOW.getCode()) {            if (CollectionUtils.isEmpty(selector.getConditionList())) {                return false;            }            return MatchStrategyFactory.match(selector.getMatchMode(), selector.getConditionList(), exchange);        }        return true;    }

这段代码中,先检测参数匹配条件SelectorData是否为空,之后调用MatchStrategyFactorymatch方法,工厂方法将调用对应的实作类的match方法。同理,如下是AbstractShenyuPluginfilterRule 方法

    private Boolean filterRule(final RuleData ruleData, final ServerWebExchange exchange) {        return ruleData.getEnabled() && MatchStrategyFactory.match(ruleData.getMatchMode(), ruleData.getConditionDataList(), exchange);    }

也同样是调用MatchStrategyFactorymatch方法,看上去是不是特别的简洁甚至是简单? 在PredicteJudge代码分析文中,对shenyu-plugin如何做参数调用方面做了更进一步的描述。

Summary

由于应用了Apache shenyuSPI框架,使得整体上具有松耦合、易于扩展的特点。在多个参数规则策略方面,MatchStrategy提供了良好的设计,虽然目前只提供了两个AND 和OR的实现类,但未来可以很轻松地扩展为更多MatchStrategy规则,例如 firstOf:即必须满足第一个条件,或mostOf-满足大部分条件等更多复杂策略,而其他调用部分的代码完全不受影响。

有兴趣的读者可以去阅读Shenyu plugin的源码了解更多内容。

· One min read

灵活的插件和规则定义,是Shenyu网关的一大特色。它以插件形式支持多种网络协议和多种流行的微服务框架,如Dubbo, gRPC和 Spring-Cloud 等。 为了实现对各种协议及插件的配置规则的解析,网关在规则策略解析方面,采用了优雅的SPI(Service Provider Interface)实现,当添加新的插件时,规则解析部分可以沿用现有实现或采用SPI机制快速实现,具有良好的可扩展性。

SPI 的顶层设计

Shenyu的SPI采用接口+ 工厂模式+配置文件的方式,来实现模组的动态加载。在其shen-SPI-模组,做了SPI的顶层设计。定义了@ Join ,@SPI 两个annotation。 其中@Join 代表此类会加入扩展机制,相当于是做申请注册。 @SPI 标明当前类为SPI功能扩展类。

Fig 1 classes in the shenyu-spi

toplevel-SPI

配置文件方面,定义SPI加载的目录为 META-INF/shenyu/

SHENYU_DIRECTORY = "META-INF/shenyu/";

系统启动时,会扫描 SHENYU_DIRECTORY 下的配置文件,并由 ExtensionLoader 类来加载所配置的SPI扩展类,并cache到内存中。 配置文件内容为 key=class的形式。 在系统执行期间, 由ExtensionFactory的实现类,返回key所对应的SPI实现类。

shenyu-plugin的SPI 实现

shenyu-plugin模组中,按照插件机制,实现了各种请求转发功能,包括支持request, redirect, response, rewrite等http协议功能,及 gRPC, dubbo, hystrix等微服务框架, 并且插件功能还在不断增加中。如果在各自的功能插件实做类中,还要做对routing 参数的解析等处理,不仅会造成程序的冗余,而且当要支持各自匹配规则,如通配符、正则表达式、SpEL解析等,会造成频繁对插件核心代码的修改。因此,在shenyu-plugin模组中,将routing参数解析做了更高一层的抽象,并按照SPI机制做了规则解析的实现。解析由三个部分组成:

  • ParameterData-参数资料,

  • PredictJudge-断言

  • MatchStrategy-匹配策略三个SPI实现。

    这些扩展类定义在 shenyu-plugin-base module中,经过这样抽象后,每个插件实现中,routing 参数解析的功能全部由AbstractShenyuPlugin 来调用上述三个SPI工厂类来定义和实现。做到了功能的专一,并易于扩展,符合SOLID原则。

本节就其中的PredictJudge-断言做详细解析。可以看到这个module中的pom文件中,添加了对***shenyu-SPI***的依赖

<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-spi</artifactId>
<version>${project.version}</version>
</dependency>

PredicateJudge SPI 设计

PredicateJudge SPI 实现用来解析判断各类规则,当网关中配置的。这个类命名和功能都类似于java 的Predicate ,但对接受行为做了更进一步的抽象。这个SPI通过一个工厂和策略模式实现,首先来看PredicateJudge SPI接口的定义:

@SPI
@FunctionalInterface
public interface PredicateJudge {

/**
* judge conditionData and realData is match.
*
* @param conditionData {@linkplain ConditionData}
* @param realData realData
* @return true is pass false is not pass.
*/
Boolean judge(ConditionData conditionData, String realData);
}

这部分的类图如下:

Fig 2-Predicate class diagram

predicate-class-diagram

PredicateJudgeFactory的重要方法如下:

    public static PredicateJudge newInstance(final String operator) {
return ExtensionLoader.getExtensionLoader(PredicateJudge.class).getJoin(processSpecialOperator(operator));
}
    public static Boolean judge(final ConditionData conditionData, final String realData) {
if (Objects.isNull(conditionData) || StringUtils.isBlank(realData)) {
return false;
}
return newInstance(conditionData.getOperator()).judge(conditionData, realData);
}

这里ConditionData定义如下包含属性四个String类型的属性: paramType, operator,paramName,paramValue

ParamTypeEnum

参数 paramType必须为系统中枚举类型 ParamTypeEnum,默认支持的paramType有:

post, uri,query, host, ip,header, cookie,req_method

OperatorEnum

operator 必须为枚举类型 OperatorEnum ,目前支持的操作符有:(注意,严格区分大小写)

   match, =,regex, >,<, contains, SpEL,  Groovy, TimeBefore,TimeAfter

基于以上的规则, plugin 模组实现了如下8个 PredicateJudge 实现类,分别实现上述operator的逻辑匹配规则.

Implementation classRule denotes 规则说明corespondece operator
ContainsPredicateJudge包含关系 "contains", 实际结果,需要包含所定规则的值contains
EqualsPredicateJudge相等"=",=
MatchPredicateJudge用于URI 路径匹配的处理match
TimerAfterPredicateJudge当前local时间是否晚于设定的时间TimeAfter
TimerBeforePredicateJudge当前local时间是否早于设定的时间TimeBefore
GroovyPredicateJudgeGroovy,设定ParamName的值,与设定ParamValue相同Groovy
RegexPredicateJudge正则表达式匹配资料regex

调用方法

当要做一组参数的解析时,只需要调用PredicateJudgeFactory的judge方法即可:

PredicateJudgeFactory.judge(final ConditionData conditionData, final String realData);

SPI配置文件

这些PredicateJudge实现类在 SHENYU_DIRECTORY 中的config文件中做了配置,在启动时会加加载并cache到内存中。

PredicateJudge文件的内容如下,为key=class形式,左边的operator要和ParamEnum中的定义一致。

equals=org.apache.shenyu.plugin.base.condition.judge.EqualsPredicateJudge

contains=org.apache.shenyu.plugin.base.condition.judge.ContainsPredicateJudge
Groovy=org.apache.shenyu.plugin.base.condition.judge.GroovyPredicateJudge
match=org.apache.shenyu.plugin.base.condition.judge.MatchPredicateJudge
regex=org.apache.shenyu.plugin.base.condition.judge.RegexPredicateJudge
SpEL=org.apache.shenyu.plugin.base.condition.judge.SpELPredicateJudge
TimeAfter=org.apache.shenyu.plugin.base.condition.judge.TimerAfterPredicateJudge
TimeBefore=org.apache.shenyu.plugin.base.condition.judge.TimerBeforePredicateJudge

PredicateJudge SPI在网关Plugin中的使用

网关系统中,大部分的Plugin 都继承自AbstractShenyuPlugin,这个抽象类中,在做选择和规则解析时,调用了上述SPI中的MatchStrategy,继而在策略判断时调用PredicateJudge 的各个断言类来处理。

Plugin与SPI 的类图如下:

Fig 3- class diagram of plugins with PredicateJudge and MatchStrategy SPI

plugin-SPI-class-diagram

从客户端发来的请求,在系统中调用规则部分的SPI的流程如下:

Fig 4- flow chart for Shenyu gateway filter with parameter processing

SPI-flow-diagram

  • 系统启动时,会加载目录下配置的SPI资料到内存中
  • 当client有新的请求发到Apache shenyu 网关系统时,在网关内部,会调用对应的plugin
  • 对实际请求资料做规则匹配时,会根据所包含的operator,调用的对应的PredicateJudge实现类

其他

PredicateJudge 判断结果举例

ContainsPredicateJudge- " contains“ rule

举例:给定一组参数(ConditionData ), paramType="uri", paramValue 是 "/http/**"

当应用 ContainsPredicateJudge包含关系时,判断结果如下表:

ConditionData (operator="contains")real datajudge result
paramType="uri", "/http/**""/http/**/test"true
"/test/http/**/other"true
"/http1/**"false

其他的几个PredicateJudge的具体功能可参考其代码和测试类.

· One min read

限流是网关必备的功能,用来应对高并发请求的场景。当系统受到异常攻击,短期内聚集了大量的流量;当有大量低级别的请求,处理这些请求会影响关键业务的处理,需要限制这些请求的访问速度; 或者系统内部出现一些异常,不能满负荷的服务整个应用请求等等。这些情况下,都需要启用限流来保护系统。可以拒绝服务、等待或降级处理,将流量限制到系统可接受的量,或者只允许某些域名(或某些业务)的请求优先处理。

针对以上的场景需求,在设计一个API网关的限流功能时,就需要考虑如下的扩展点:

  1. 可以支持多种限流的算法,并易于扩展。
  2. 要可以支持多种限流的方式,能区分用户群、高低优先级的请求。
  3. 要支持高并发,能快速的做出限制或通过的决策。
  4. 要有容错处理,如果限流程序出错,网关系统能继续执行。

本文会先介绍shenyu网关限流部分的总体技术架构,之后重点分析RateLimiter SPI扩展实现的代码。

This article based on shenyu-2.4.0 version of the source code analysis.

RateLimiter 总体设计说明

​ WebFlux是Spring 提供的基于Reactor模型的异步非阻塞框架,能提升吞吐量,使系统有更好的可伸缩性。Apache Shenyu网关的插件功能基于WebFlux框架实现的。RateLimiter功能是在ratelimiter-plugin中实现。在限流过程中,常用的算法有令牌桶、漏桶等算法,这些算法执行中,需要检核请求的来源,对已使用的流量做计数及逻辑计算,判定是否允许通过。为了提高并发及性能, 将计数和算法逻辑处理,都放到redis中。Java代码负责做数据参数的传递。在调用redis时,lua脚本可以常驻在redis内存中,能减少网络开销,并可以作为一个整体执行,具有原子性。Spring Data Redis 提供了对redis命令执行的抽象,执行序列化,及自动使用redis 脚本缓存。在这个plugin中,由于采用了reactor 非阻塞框架,所以采用Spring Redis Reactive类库实现对redis的功能调用。

​ 这个plugin中的类包图如下,重点标出了与RateLimiter SPI相关的两个package: resolver 和algorithm.

ratelimiter-package-diagram

RateLimiter SPI的设计

由于采用了Spring data+ Redis +lua架构实现了高并发的需求。 如何做到对算法和限流方式的扩展呢? Shenyu ratelimiter plugin中设计了两个SPI来实现这两个需求:

  • RateLimiterAlgorithm:用来扩展不同的限流算法。
  • RateLimiterKeyResolver: 用于扩展获取请求的关键信息,用于区分流量,例如按IP 地址、按某一段域名等来区分访问的请求。

SPI的具体实作类与配置信息位于:SHENYU_DIRECTORY目录下 (默认在/META-INF/shenyu)下。

RateLimiterKeyResolver

获取请求的关键信息,用于分组限流,例如按URL/ 用户 / IP 等, RateLimiterKeyResolver 接口定义如下:

@SPI
public interface RateLimiterKeyResolver {

/**
* get Key resolver's name.
*
* @return Key resolver's name
*/
String getKeyResolverName();

/**
* resolve.
*
* @param exchange exchange the current server exchange {@linkplain ServerWebExchange}
* @return rate limiter key
*/
String resolve(ServerWebExchange exchange);
}

@SPI将当前interface 注册为Shenyu SPI 接口。resolve(ServerWebExchange exchange)方法用来提供解析方式。

RateLimiterKeyResolver SPI 提供了两种key resolver, WholeKeyResolveRemoteAddrKeyResolver,其中RemoteAddrKeyResolver中的resolve方法代码如下:

    @Override
public String resolve(final ServerWebExchange exchange) {
return Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
}

其key值为请求的IP地址。 基于SPI及工厂类的实现,可以非常方便的扩展实现新的key resolver,如URL,用户等等。

RateLimiterAlgorithm SPI

RateLimiterAlgorithm SPI 用来实现对不同限流算法的识别、加载和定义,其类图如下:

ratelimiteral-class-diagram

本模组使用了工厂模式,提供了接口类、抽象类和工厂类,提供了4个实现类,其中实现类对应的Lua脚本在 RateLimitEnum 中做了定义,放置在 /META-INF/scripts 目录下。接口RateLimiterAlgorithm的代码如下:

@SPI
public interface RateLimiterAlgorithm<T> {

RedisScript<T> getScript();
List<String> getKeys(String id);

/**
* Callback string.
*
* @param script the script
* @param keys the keys
* @param scriptArgs the script args
*/
default void callback(final RedisScript<?> script, final List<String> keys, final List<String> scriptArgs) {
}
}

@SPI 将这个接口注册为shenyu SPI, 其中定义了三个方法:

  • getScript() 方法返回一个 RedisScript对象,这个对象将传递给Redis。
  • getKeys(String id) 返回一个键值的List.
  • callback()回调函数用于异步处理一些需要在返回后做的处理,缺省是空方法。

抽象类 AbstractRateLimiterAlgorithm

在这个类中,实现了接口的模板方法,使用参数类型为List<Long>, 抽象方法getScriptName() 和getKeyName() 留给各个实作类来实现。如下的getScript() 是这个类中读取lua脚本的处理代码。

    public RedisScript<List<Long>> getScript() {
if (!this.initialized.get()) {
DefaultRedisScript redisScript = new DefaultRedisScript<>();
String scriptPath = "/META-INF/scripts/" + getScriptName();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(scriptPath)));
redisScript.setResultType(List.class);
this.script = redisScript;
initialized.compareAndSet(false, true);
return redisScript;
}
return script;
}

AtomicBoolean类型的变量initialized 用来标记lua脚本是否有被加载。 如果还没有加载,就从/META-INF/scripts/目录下,读取scriptName指定的Lua文件,加载成RedisScript对象。指定结果为List类型, 设定量initialized为true,避免重复加载。 返回 RedisScript对象。

AbstractRateLimiterAlgorithmgetKeys()的代码如下,

    @Override
public List<String> getKeys(final String id) {
String prefix = getKeyName() + ".{" + id;
String tokenKey = prefix + "}.tokens";
String timestampKey = prefix + "}.timestamp";
return Arrays.asList(tokenKey, timestampKey);
}

这个模板方法中,产生了两个字符串,其中,tokenKey会作为Key传递给redis, 指向一个有序集合。 timestampKey是一个以传入id 为识别的字符串。

可以从上面的类图中看到,ConcurrentRateLimiterAlgorithmSlidingWindowRateLimiterAlgorithm 有覆写getKeys(String id)方法,而两外两个算法程序,则采用的是抽象类中的实现。也只有 ConcurrentRateLimiterAlgorithm 重写了callback()方法。下文中我们会对此做进一步的分析。

工厂类RateLimiterAlgorithmFactory

RateLimiterAlgorithmFactory 中依据算法名称,获取RateLimiterAlgorithm实例的方法代码如下:

public static RateLimiterAlgorithm<?> newInstance(final String name) {
return Optional.ofNullable(ExtensionLoader.getExtensionLoader(RateLimiterAlgorithm.class).getJoin(name)).orElse(new TokenBucketRateLimiterAlgorithm());
}

按照Apache shenyu SPI的规则,由加载器ExtensionLoader获得实作类,当找不到算法时,默认返回令牌桶算法实现类。

与Redis做资料交互

从上面代码我们了解到Apache Shenyu网关中,RateLimiter SPI 的基本扩展点,在Shenyu网关运行中,应用ReactiveRedisTemplate 来异步执行对redis的调用处理。实现代码在RedisRateLimiter类的isAllowed()方法中,其部分代码如下:

    public Mono<RateLimiterResponse> isAllowed(final String id, final RateLimiterHandle limiterHandle) {
// get parameters that will pass to redis from RateLimiterHandle Object
double replenishRate = limiterHandle.getReplenishRate();
double burstCapacity = limiterHandle.getBurstCapacity();
double requestCount = limiterHandle.getRequestCount();
// get the current used RateLimiterAlgorithm
RateLimiterAlgorithm<?> rateLimiterAlgorithm = RateLimiterAlgorithmFactory.newInstance(limiterHandle.getAlgorithmName());

........
Flux<List<Long>> resultFlux = Singleton.INST.get(ReactiveRedisTemplate.class).execute(script, keys, scriptArgs);
return resultFlux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
.reduce(new ArrayList<Long>(), (longs, l) -> {
longs.addAll(l);
return longs;
}).map(results -> {
boolean allowed = results.get(0) == 1L;
Long tokensLeft = results.get(1);
return new RateLimiterResponse(allowed, tokensLeft);
})
.doOnError(throwable -> log.error("Error occurred while judging if user is allowed by RedisRateLimiter:{}", throwable.getMessage()))
.doFinally(signalType -> rateLimiterAlgorithm.callback(script, keys, scriptArgs));
}

POJO 对象RateLimiterHandle 中,定义了限流所需的属性算法名称, 速录,容量,请求的数量 。首先 从limiterHandle 包装类中取得需要传入redis的几个参数。之后从RateLimiterAlgorithmFactory 从工厂类取得当前配置的限流算法。 之后做Key值和参数的传递。

为了更方便阅读,下图给出了java代码与redis执行参数输入、输出的传递过程。左边是isAllowed() 函数的后半部分代码,右边是一个Lua脚本的输入输出代码。

下面说明Java代码的执行过程:

  1. getKeys()方法获得两个键值List<String>. 其中第一个Key会映射为Redis中的有序集合。

  2. 设定4个参数:速率 replenishRate ,容量 burstCapacity, 时间戳, 返回当前java 纪元秒数(长整数)EpochSecond, 请求的数量 requestcount.

  3. 按所设定的脚本、Key值、参数调用ReactiveRedisTemplate功能,执行redis处理。返回参数是Flux<List<Long>>类型

  4. 通过reduce方法将其返回值从Flux<ArrayList<Long>> 类型转换为Mono<ArrayList<Long>>,再经过map方法,转换为Mono<RateLimiterResponse>返回。

    返回结果有两个资料,allowed =1, 代表允许通过,0-不通过;而第二个返回参数tokensLeft,是可用的剩余请求数量。

5.容错性方面,由于使用的是reactor 的非阻塞通讯模型,当发生错误时,会执行onErrorResume()语句,Flux.just产生返回资料, 默认为allowed=1, 代表允许通过, 并丢出错误日志。

6.之后执行doFinally()方法,执行算法实现类的callback方法。

io-with-lua

4种限流算法

上面我们了解了网关中如何通过Java代码如何与Redis 做通讯,这一节我们通过简要分析网关中提供的4种限流算法中的一些代码,来理解如何开发使用RateLimiter SPI的接口方法,并与Redis有效协作。

Ratelimiter SPI目前提供了4种限流算法:

Algorithm nameJava classLua script file
Request rate limiterTokenBucketRateLimiterAlgorithmrequest_rate_limiter.lua
Slide window rate limiterSlidingWindowRateLimiterAlgorithmliding_window_request_rate_limiter.lua
Concurrent rate limiterConcurrentRateLimiterAlgorithmconcurrent_request_rate_limiter.lua
Leaky bucket algorithmLeakyBucketRateLimiterAlgorithmrequest_leaky_rate_limiter.lua
  1. 令牌桶限流:按请求数量限流,设置每秒N个请求,超过N的请求会拒绝服务。算法实现时,以时间间隔计算匀速产生令牌的数量。若每次请求的数量,小于桶内令牌的数量,则允许通过。 时间窗口为 2*容积/速率。
  2. 滑动窗口限流:与令牌桶限流不同在于,其窗口大小比令牌桶的窗口小,为一个容积/速率。并且每次移动向后一个时间时间窗口。其他限流原理与令牌桶类似。
  3. 并发的请求速率限流:严格限制并发访问量为N个请求,大于N的请求会被拒绝。每次当有新请求,查看计数是否大于N, 若小于N则允许通过,计数加1。 当这个请求调用结束时,会释放这个信号(计数减1)
  4. 漏桶算法: 相对于令牌桶算法,漏桶算法有助于减少流量聚集,实现更为平滑的限流处理。 漏桶算法强制以常数N的速率输出流量,其以漏桶为模型,可漏水的量为时间间隔 *速率。若可漏水量>已使用量,则已使用量设为0( 清空漏桶),否则已使用量要减去可漏水量。 若请求数量+ 已使用量< 总容量,则允许请求通过。

下面以 并行限流算法为例,解读Lua和Java代码,查看callback 方法的使用。 通过解读令牌桶和滑动窗口算法代码,了解getKey()方法的使用。

并发请求数限流中使用callback方法

首先ConcurrentRateLimiterAlgorithmgetKeys() 方法覆写了抽象类中的模板方法:

    @Override
public List<String> getKeys(final String id) {
String tokenKey = getKeyName() + ".{" + id + "}.tokens";
String requestKey = UUIDUtils.getInstance().generateShortUuid();
return Arrays.asList(tokenKey, requestKey);
}

第二个元素 requestKey 是一个long型不重复值(由一个分布式ID产生器产生的,递增,比当前时间EpochSecond小), 相应的concurrent_request_rate_limiter.lua的代码:

local key = KEYS[1]

local capacity = tonumber(ARGV[2])
local timestamp = tonumber(ARGV[3])
local id = KEYS[2]

这里id 即是取得上面的getKeys()方法产生的requestKey, 一个uuid. 后续的处理如下:

local count = redis.call("zcard", key)
local allowed = 0

if count < capacity then
redis.call("zadd", key, timestamp, id)
allowed = 1
count = count + 1
end
return { allowed, count }

先用zcard命令统计redis中key值所对应的有序集合中的元素个数,若元素总数count小于容量,则允许通过,并用zadd key score member方法,向key所在的有序集合中,添加一个元素id, 其score为timestamp. 则此时元素的总个数count实际为count+1.

以上的代码都是在redis中作为一个原子操作来执行的。当同一个key (例如Ip下)有大量并发请求时,redis记录的该ip的有序集合的数量count也在不断累加中。当超过容量限制,则会拒绝服务。

并发请求数限流算法中,要求当请求调用结束时,要释放这个信号量,lua代码中并没有做这个处理。

我们来看看 ConcurrentRateLimiterAlgorithm类中的回调函数:

    @Override
@SuppressWarnings("unchecked")
public void callback(final RedisScript<?> script, final List<String> keys, final List<String> scriptArgs) {
Singleton.INST.get(ReactiveRedisTemplate.class).opsForZSet().remove(keys.get(0), keys.get(1)).subscribe();
}

这里做了一个异步的订阅处理,通过ReactiveRedisTemplate删除redis中(key, id)的元素,等待调用结束后,释放这个信号。这个remove的处理不能放到lua脚本中执行,否则逻辑就是错误的。这也正是RateLimiterAlgorithm SPI 设计callback方法的用意。

令牌桶算法中使用getKeys()

对应的Lua 代码如下:

local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]

省略获取参数的代码

local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)

时间窗口ttl 大概是 2* 容量/速率.

local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
last_tokens = capacity
end

从有序集合中取得上次使用的token,如果没有则last_tokens = 容量。

local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end

以timestamp_key为key,从有序集合中取得上次刷新时间,默认为0.

local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local allowed_num = 0
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end

时间间隔*速率匀速产生令牌,若令牌数量>请求数量,则allowed=1, 并且更新令牌数量。

redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)

return { allowed_num, new_tokens }

这里now是传入的当前时间(EpochSecond),设置tokens_key所对应的有序集合的值为 new_tokens(即新令牌数量) , 过期时间为ttl。 更新集合中,timestamp_key的值为当前时间,过期时间为ttl.

滑动窗口算法中使用getKeys方法

SlidingWindowRateLimiterAlgorithmgetKeys()同样覆写了父类,代码与ConcurrentRateLimiterAlgorithm 方法代码一致。

如下为滑动窗口算法的Lua代码,省略了其他参数的接收代码。

local timestamp_key = KEYS[2]
......
local window_size = tonumber(capacity / rate)
local window_time = 1

设定窗口大小为容积/速率。

local last_requested = 0
local exists_key = redis.call('exists', tokens_key)
if (exists_key == 1) then
last_requested = redis.call('zcard', tokens_key)
end

获取当前key 的基数

local remain_request = capacity - last_requested
local allowed_num = 0
if (last_requested < capacity) then
allowed_num = 1
redis.call('zadd', tokens_key, now, timestamp_key)
end

计算剩余可用量 = 容量 减去已使用量,若last_requested < capacity ,则允许通过,并且在tokens_key为key的有序集合中,增加一个 元素(key =timestam_key,value= now)

redis.call('zremrangebyscore', tokens_key, 0, now - window_size / window_time)
redis.call('expire', tokens_key, window_size)

return { allowed_num, remain_request }

前面已经设定window_time=1, 用Redis的 zremrangebyscore命令,移除有序集合中,score为[0- 当前时间-窗口大小]的元素,即移动一个窗口大小。设定tokens_key的过期时间为窗口大小。

AbstractRateLimiterAlgorithm的模板方法中,getKeys(final String id) 给出的第二个值(以secondKey指代),是拼接了{id} (即resolve key)的一个固定字符串。从上面三个算法代码可以看到,在令牌桶算法中,secondKey在Lua代码执行中会更新为最新的时间,所以无所谓传入的值。而在并发限流算法中,会以此secondKey为条件,在java callback方法中移除对应的元素。而在滑动窗口算法中,这个secondKey的值,会作为一个新元素的key, 增加到当前有序集合中,并在做窗口滑动中,过期的资料会被删除掉。

总之,当设计新的限流算法时,要根据算法需要仔细设计getKey()方法。

如何调用 RateLimiter SPI

RateLimiter Plug中的doExecute()方法中,传入的三个参数 exchange 为请求的连接, chain 为shenyu插件的调用链,selector 是选择器,rule是系统中配置的规则参数资料。

protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
RateLimiterHandle limiterHandle = RatelimiterRuleHandleCache.getInstance()
.obtainHandle(CacheKeyUtils.INST.getKey(rule));
String resolverKey = Optional.ofNullable(limiterHandle.getKeyResolverName())
.flatMap(name -> Optional.of("-" + RateLimiterKeyResolverFactory.newInstance(name).resolve(exchange)))
.orElse("");
return redisRateLimiter.isAllowed(rule.getId() + resolverKey, limiterHandle)
.flatMap(response -> {
if (!response.isAllowed()) {
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
Object error = ShenyuResultWrap.error(ShenyuResultEnum.TOO_MANY_REQUESTS.getCode(), ShenyuResultEnum.TOO_MANY_REQUESTS.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
return chain.execute(exchange);
});
}

1.首先,从缓存中,取得系统设定的限流参数RateLimiterHandle实例 limiterHandle. 2.根据name指定的Resolver 获得请求的连接Key信息(如地址等). 3.调用 RedisRateLimiter的 isAllowed方法, 获取返回值后, 4.若isAllowd=false,做错误处理 5.如果 isAllowed=true,return chain.execute(exchange), 对该请求做后续处理,传递到调用链的下一关。

Summary

整个RateLimiter plugin框架基于Spring WebFlux开发,用redis 和lua脚本做限流计数及核心逻辑处理,支持高并发及弹性扩展。

  1. RateLimiter SPI 提供了两个SPI 接口,通过应用面向接口设计及各种设计模式,可以方便的增加新的限流算法,以及各种流量解析规则。

  2. 提供了令牌桶、并发速率限流、滑动窗口、漏桶4种限流算法。在设计算法实现时,需要根据算法特征设计KEY值,用Lua脚本实现在redis中要处理的逻辑,设计callback()方法做后续的数据处理。

  3. 响应式编程,实现过程简洁高效。

· One min read
Yuxuan Zhang

前言

作为 Shenyu 社区初来乍到的开发者,我在按照相关教程进行项目启动及开发的过程中,遇到了一些教程中并未提及到的 “坑” , 我将我启动shenyu , shenyu-dashboard, shenyu-website 的详细步骤记录在这篇博客中,希望可以帮到社区中更多的新人开发者。

环境准备

  • 本地正确安装 JDK17 或更高版本
  • 本地正确安装 Git
  • 本地正确安装Maven3.63 或更高版本
  • 选择一款开发工具,本文使用 IDEA 为例

ShenYu 后端启动指南

安装并配置Maven

Maven是一个跨平台的项目管理工具。作为Apache组织顶级开源项目,其主要服务于基于Java平台的项目创建,依赖管理和项目信息管理。

  1. 下载 maven,并解压到一个没有中文没有空格的路径下。

  2. maven 目录下的 bin 目录添加至环境变量中。以 Windows 为例,若下载目录为 E:\apache-maven-3.9.1 ,则将E:\apache-maven-3.9.1\bin 添加至 Path 系统变量中。

  3. 验证是否安装成功。在命令行窗口中输入 mvn -v ,若出现 Maven 版本及 Java 版本即为安装成功。如下所示:

    C:\Users\pc>mvn -v
    Apache Maven 3.9.1 (2e178502fcdbffc201671fb2537d0cb4b4cc58f8)
    Maven home: E:\apache-maven-3.9.1
    Java version: 18.0.1.1, vendor: Oracle Corporation, runtime: C:\Program Files\Java\jdk-18.0.1.1
    Default locale: zh_CN, platform encoding: UTF-8
    OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"
  4. 为了加快项目相关依赖的下载速度,需要更改 Maven 镜像,此处添加阿里云等镜像。将 conf/settings.xml<mirrors> </mirrors> 标签对更改为以下内容:

    <mirrors>
    <mirror>
    <id>alimaven</id>
    <name>aliyun maven</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    <mirrorOf>central</mirrorOf>
    </mirror>

    <mirror>
    <id>alimaven</id>
    <mirrorOf>central</mirrorOf>
    <name>aliyun maven</name>
    <url>http://maven.aliyun.com/nexus/content/repositories/central/</url>
    </mirror>

    <mirror>
    <id>maven</id>
    <mirrorOf>central</mirrorOf>
    <name>name_</name>
    <url>http://repo1.maven.org/maven2</url>
    </mirror>

    <mirror>
    <id>junit</id>
    <mirrorOf>central</mirrorOf>
    <name>junit address/</name>
    <url>http://jcenter.bintray.com/</url>
    </mirror>
    </mirrors>

    并在 </mirrors> 下一行添加 <localRepository>E:/maven_local_repository</localRepository>设置 Maven 本地仓库位置。具体位置可自行指定。

拉取 ShenYu 代码

  1. 在 Github 上 Fork ShenYu 仓库到自己的存储库中,以后可在此仓库中进行开发并提交 PR

  2. 使用 Git 将上一步 Fork 的仓库下载到本地:

    git clone git@github.com:${YOUR_USERNAME}/${TARGET_REPO}.git

    若提示文件名过长,则通过命令行执行下面的命令:

    git config --global core.longpaths true

    Tips: 如果提示如下错误或者网络不好无法拉取全部代码:

    RPC failed; curl 92 HTTP/2 stream 5 was not closed cleanly: CANCEL (err 8) 2057 bytes of body are still expected fetch-pack: unexpected disconnect while reading sideband packet early EOF fetch-pack: invalid index-pack output

    可以执行以下命令先拉取一个版本的代码,然后在获取全量代码.

    git clone https://github.com/apache/shenyu.git --depth 1
    cd ./shenyu
    git fetch --unshallow

ShenYu 初启动

准备工作

  1. shenyu 目录下使用 Maven 进行编译:

    mvn clean install -Dmaven.javadoc.skip=true -B -Drat.skip=true -Djacoco.skip=true -DskipITs -DskipTests
  2. 配置 IDEA 环境。使用 IDEA 打开 shenyu 项目,点击左上角 File -> Settings ,按照下图配置 Maven 。其中 User settings file 选择你的 settings.xml 所在目录, Local repository 会自动加载 settings.xml 中设置的 localRepository 路径:

  3. 此时,IDEA 会自动下载项目相关依赖,需等待一会,完成后如下图所示:

    可以发现, shenyu-e2e, shenyu-examples, shenyu-integrated-test 没有被 IDEA 标记为 Maven 项目,需手动添加。分别选中包中的 pom.xml 文件,右键点击 Add as Maven Project 。 若 shenyu-e2e 构建失败,则将其 pom.xml<relativePath>./pom.xml</relativePath> 改为 <relativePath/>

启动网关服务

  1. 启动 shenyu-admin 控制台(默认使用H2数据库)

  2. 启动 shenyu-bootstrap

到这一步,shenyu网关已经启动。

我们可以打开浏览器,访问admin控制台:http://localhost:9095/

默认账号:admin ,默认密码:123456

启动应用服务

Apache ShenYu提供了Http、Dubbo、SpringCloud等应用接入shenyu网关的样例,位于 shenyu-example 模块,这里以Http服务为例。

启动 shenyu-examples-http

这时,shenyu-examples-http 会自动把加 @ShenyuSpringMvcClient 注解的接口方法,以及application.yml中的相关配置注册到网关。我们打开 admin控制台,即可在插件列表 -> Proxy -> divide 中看到相关配置。

测试Http请求

下面使用 IDEA HTTP Client Plugin 模拟 http 的方式来访问 http 服务。

  • 本地访问,不使用 shenyu 代理

  • 使用 shenyu 代理

使用更多插件

我们可以参考 官方文档左侧插件集合,来使用所需要插件。

Shenyu 前端启动指南

安装 Node.js

下载

  1. 官网下载并安装Node.js ,选择 LTS 版本即可

  2. 安装时,除了设置安装路径,其他一直点 Next 即可

  3. 安装完成后,在命令行中进行验证:

    C:\Users\pc>node -v
    v12.22.12

    C:\Users\pc>npm -v
    6.14.16

换源

为了加快 npm 下载速度,需要进行换源:

# 查看当前源
npm config get registry
# 换为中国 npmmirror 镜像源
npm config set registry https://registry.npmmirror.com
# 查看是否换源成功
npm config get registry

拉取 ShenYu Dashboard 代码

  1. Fork ShenYu Dashboard 仓库

  2. 使用 Git 下载到本地:

    git clone git@github.com:${YOUR_USERNAME}/${TARGET_REPO}.git

前后端联合开发

  1. 在后端仓库 shenyushenyu-admin/src/main/resources/application.yml 文件中按下图所示添加 enablePrintApiLog: true ,以在后端控制台显示前端接口被调用的日志。

  2. 启动 ShenyuAdminBootstrap

  3. 切换至前端仓库 shenyu-dashboard ,打开 README ,依次点击 npm install, npm start 或通过命令行输入上述命令即可通过 http://localhost:8000 访问前端界面,并可在后端控制台中显示前端接口被调用的日志,实现前后端联合开发。

打包前端代码

执行 READMEnpm build 命令,并将 dist 文件夹下生成的所有文件复制到后端仓库中 shenyu-admin/src/main/resources/static/ 目录下。

为 Shenyu 官网做贡献

按照 shenyu-websiteREADME 进行操作即可。

小贴士

  1. 可以为 yarn 进行换源,流程同 npm
  2. 建议下载 Node 官网LTS 版本
  3. Windows 系统无法进行部署,如需对你的更改进行验证,可以在Linux 虚拟机或服务器上进行部署