Skip to main content

One post tagged with "Apache ShenYu"

View All Tags

· One min read

本文基于shenyu-2.6.1版本进行源码分析,官网的介绍请参考 数据同步原理

Admin管理端

以新增插件的流程来理解下整体的流程

接收数据

  • PluginController.createPlugin()

进入PluginController类中的createPlugin()方法,它负责数据的校验,添加或更新数据,返回结果信息。

@Validated
@RequiredArgsConstructor
@RestController
@RequestMapping("/plugin")
public class PluginController {

@PostMapping("")
@RequiresPermissions("system:plugin:add")
public ShenyuAdminResult createPlugin(@Valid @ModelAttribute final PluginDTO pluginDTO) {
// 调用pluginService.createOrUpdate 进行处理逻辑
return ShenyuAdminResult.success(pluginService.createOrUpdate(pluginDTO));
}

// ......
}

处理数据

  • PluginServiceImpl.createOrUpdate() -> PluginServiceImpl.create()

PluginServiceImpl类中通过create()方法完成数据的转换,保存到数据库,发布事件。

@RequiredArgsConstructor
@Service
public class PluginServiceImpl implements SelectorService {
// 事件发布对象 pluginEventPublisher
private final PluginEventPublisher pluginEventPublisher;

private String create(final PluginDTO pluginDTO) {
// 判断有没有对应的插件
Assert.isNull(pluginMapper.nameExisted(pluginDTO.getName()), AdminConstants.PLUGIN_NAME_IS_EXIST);
// 自定义的插件jar
if (!Objects.isNull(pluginDTO.getFile())) {
Assert.isTrue(checkFile(Base64.decode(pluginDTO.getFile())), AdminConstants.THE_PLUGIN_JAR_FILE_IS_NOT_CORRECT_OR_EXCEEDS_16_MB);
}
// 创建plugin对象
PluginDO pluginDO = PluginDO.buildPluginDO(pluginDTO);
// 插入对象到数据库
if (pluginMapper.insertSelective(pluginDO) > 0) {
// 插件新增成功,则发布创建事件
// publish create event. init plugin data
pluginEventPublisher.onCreated(pluginDO);
}
return ShenyuResultMessage.CREATE_SUCCESS;
}


// ......

}

PluginServiceImpl类完成数据的持久化操作,即保存数据到数据库,并通过 pluginEventPublisher 进行发布事件。

pluginEventPublisher.onCreateed方法的逻辑是:发布变更的事件。

    @Override
public void onCreated(final PluginDO plugin) {
// 发布DataChangeEvent事件:事件分组(插件、选择器、规则)、事件类型(创建、删除、更新)、变更的数据
publisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, DataEventTypeEnum.CREATE,
Collections.singletonList(PluginTransfer.INSTANCE.mapToData(plugin))));
// 发布PluginCreatedEvent
publish(new PluginCreatedEvent(plugin, SessionUtil.visitorName()));
}

发布变更数据通过publisher.publishEvent()完成,这个publisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。

关于ApplicationEventPublisher

当有状态发生变化时,发布者调用 ApplicationEventPublisherpublishEvent 方法发布一个事件,Spring容器广播事件给所有观察者,调用观察者的 onApplicationEvent 方法把事件对象传递给观察者。调用 publishEvent方法有两种途径,一种是实现接口由容器注入 ApplicationEventPublisher 对象然后调用其方法,另一种是直接调用容器的方法,两种方法发布事件没有太大区别。

  • ApplicationEventPublisher:发布事件;
  • ApplicationEventSpring 事件,记录事件源、时间和数据;
  • ApplicationListener:事件监听者,观察者;

Spring的事件发布机制中,有三个对象,

一个是发布事件的ApplicationEventPublisher,在ShenYu中通过构造器注入了一个eventPublisher

另一个对象是ApplicationEvent,在ShenYu中通过DataChangedEvent继承了它,表示事件对象。

public class DataChangedEvent extends ApplicationEvent {
//......
}

最后一个是 ApplicationListener,在ShenYu中通过DataChangedEventDispatcher类实现了该接口,作为事件的监听者,负责处理事件对象。

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

//......

}

分发数据

  • DataChangedEventDispatcher.onApplicationEvent()

当事件发布完成后,会自动进入到DataChangedEventDispatcher类中的onApplicationEvent()方法,进行事件处理。

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

/**
* 有数据变更时,调用此方法
* @param event
*/
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(这里只会注册ApolloDataChangedListener)
for (DataChangedListener listener : listeners) {
// 依据不同的分组类型进行转发
switch (event.getGroupKey()) {
case APP_AUTH: // 认证信息
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN: // 插件事件
// 调用注册的listener对象
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE: // 规则事件
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR: // 选择器事件
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA: // 元数据事件
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
case PROXY_SELECTOR: // 代理选择器事件
listener.onProxySelectorChanged((List<ProxySelectorData>) event.getSource(), event.getEventType());
break;
case DISCOVER_UPSTREAM: // 注册发现下游列表事件
listener.onDiscoveryUpstreamChanged((List<DiscoverySyncData>) event.getSource(), event.getEventType());
applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnUpstreamChanged((List<DiscoverySyncData>) event.getSource(), event.getEventType());
break;
default:
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}

}

当有数据变更时,调用onApplicationEvent方法,然后遍历所有数据变更监听器,判断是哪种数据类型,交给相应的数据监听器进行处理。

ShenYu将所有数据进行了分组,一共会有以下种:认证信息、插件信息、规则信息、选择器信息、元数据、代理选择器、发现下游事件。

这里的数据变更监听器(DataChangedListener),就是数据同步策略的抽象,由特定的实现来处理,而不同的监听由不同的实现来处理,当前分析的是Apollo来 监听,所以这里只关注 ApolloDataChangedListener

// 继承AbstractNodeDataChangedListener
public class ApolloDataChangedListener extends AbstractNodeDataChangedListener {

}

ApolloDataChangedListener 继承了 AbstractNodeDataChangedListener 类,该类主要是以key作为存储方式的基类,如apollo、nacos等,其他的如zookeeper、 consul、etcd 等是以path的方式进行分层级来查找的。

// 以key作为查找存储方式的基类
public abstract class AbstractNodeDataChangedListener implements DataChangedListener {

protected AbstractNodeDataChangedListener(final ChangeData changeData) {
this.changeData = changeData;
}
}

AbstractNodeDataChangedListener 接收 ChangeData作为参数,该对象定义了存储于Apollo中的各个数据的key命名,存储于Apollo中的数据包括以下数据:

  • 插件(plugin)
  • 选择器(selector)
  • 规则(rule)
  • 授权(auth)
  • 元数据(meta)
  • 代理选择器(proxy.selector)
  • 下游列表(discovery)

这些信息由ApolloDataChangedListener构造器指定:

public class ApolloDataChangedListener extends AbstractNodeDataChangedListener {
public ApolloDataChangedListener(final ApolloClient apolloClient) {
// 配置几类分组数据的前缀
super(new ChangeData(ApolloPathConstants.PLUGIN_DATA_ID,
ApolloPathConstants.SELECTOR_DATA_ID,
ApolloPathConstants.RULE_DATA_ID,
ApolloPathConstants.AUTH_DATA_ID,
ApolloPathConstants.META_DATA_ID,
ApolloPathConstants.PROXY_SELECTOR_DATA_ID,
ApolloPathConstants.DISCOVERY_DATA_ID));
// 操作apollo的对象
this.apolloClient = apolloClient;
}
}

DataChangedListener 定义了以下几个方法:

// 数据变更监听器
public interface DataChangedListener {

// 授权信息变更时调用
default void onAppAuthChanged(List<AppAuthData> changed, DataEventTypeEnum eventType) {
}

// 插件信息变更时调用
default void onPluginChanged(List<PluginData> changed, DataEventTypeEnum eventType) {
}

// 选择器信息变更时调用
default void onSelectorChanged(List<SelectorData> changed, DataEventTypeEnum eventType) {
}

// 元数据信息变更时调用
default void onMetaDataChanged(List<MetaData> changed, DataEventTypeEnum eventType) {

}

// 规则信息变更时调用
default void onRuleChanged(List<RuleData> changed, DataEventTypeEnum eventType) {
}

// 代理选择器变更时调用
default void onProxySelectorChanged(List<ProxySelectorData> changed, DataEventTypeEnum eventType) {
}
// 发现下游信息变更时调用
default void onDiscoveryUpstreamChanged(List<DiscoverySyncData> changed, DataEventTypeEnum eventType) {
}

}

DataChangedEventDispatcher处理插件时,调用方法 listener.onPluginChanged, 接下来分析下对象的逻辑,实现由AbstractNodeDataChangedListener处理:

public abstract class AbstractNodeDataChangedListener implements DataChangedListener {
@Override
public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
// 配置前缀为plugin.
final String configKeyPrefix = changeData.getPluginDataId() + DefaultNodeConstants.JOIN_POINT;
this.onCommonChanged(configKeyPrefix, changed, eventType, PluginData::getName, PluginData.class);
LOG.debug("[DataChangedListener] PluginChanged {}", configKeyPrefix);
}
}

首先构建配置数据的key前缀为:plugin., 再调用onCommonChanged统一处理:

private <T> void onCommonChanged(final String configKeyPrefix, final List<T> changedList,
final DataEventTypeEnum eventType, final Function<? super T, ? extends String> mapperToKey,
final Class<T> tClass) {
// Avoiding concurrent operations on list nodes
final ReentrantLock reentrantLock = listSaveLockMap.computeIfAbsent(configKeyPrefix, key -> new ReentrantLock());
try {
reentrantLock.lock();
// 当前传入的插件列表
final List<String> changeNames = changedList.stream().map(mapperToKey).collect(Collectors.toList());
switch (eventType) {
// 删除操作
case DELETE:
// 按 plugin.${pluginName} 进行删除
changedList.stream().map(mapperToKey).forEach(removeKey -> {
delConfig(configKeyPrefix + removeKey);
});
// 从plugin.list中移除对应的插件名称
// plugin.list 记录下了目前启用的列表
delChangedData(configKeyPrefix, changeNames);
break;
case REFRESH:
case MYSELF:
// 重载逻辑
// 获取plugin.list中的所有插件列表
final List<String> configDataNames = this.getConfigDataNames(configKeyPrefix);
// 依次更新当前调整的每个插件
changedList.forEach(changedData -> {
// 发布配置
publishConfig(configKeyPrefix + mapperToKey.apply(changedData), changedData);
});
// 目前存储的列表中,如果数据比当前传入的多,则删除多余的数据
if (configDataNames != null && configDataNames.size() > changedList.size()) {
// 踢除当前加载的数据
configDataNames.removeAll(changeNames);
// 逐个删除已经取消的数据
configDataNames.forEach(this::delConfig);
}
// 重新更新列表数据
publishConfig(configKeyPrefix + DefaultNodeConstants.LIST_STR, changeNames);
break;
default:
// 新增或是更新
changedList.forEach(changedData -> {
publishConfig(configKeyPrefix + mapperToKey.apply(changedData), changedData);
});
// 将新加的插件更新
putChangeData(configKeyPrefix, changeNames);
break;
}
} catch (Exception e) {
LOG.error("AbstractNodeDataChangedListener onCommonMultiChanged error ", e);
} finally {
reentrantLock.unlock();
}
}

在以上逻辑,其实包含全量重载(REFRESH、MYSELF)与增量(DELETE、UPDATE、CREATE)的处理

在插件中主要包含两个节点:

  • plugin.list 当前生效的插件列表
  • plugin.${plugin.name} 具体插件的详细信息 最后,将这两个节点对应的数据写入Apollo。

数据初始化

admin启动后,会将当前的数据信息全量同步到apollo中,由ApolloDataChangedInit实现:

// 继承AbstractDataChangedInit
public class ApolloDataChangedInit extends AbstractDataChangedInit {
// apollo操作对象
private final ApolloClient apolloClient;

public ApolloDataChangedInit(final ApolloClient apolloClient) {
this.apolloClient = apolloClient;
}

@Override
protected boolean notExist() {
// 判断 plugin、auth、meta、proxy.selector等节点是否存在
// 只要有一个不存在,则进入重新加载(这些节点不会创建,为什么要判断一次呢?)
return Stream.of(ApolloPathConstants.PLUGIN_DATA_ID, ApolloPathConstants.AUTH_DATA_ID, ApolloPathConstants.META_DATA_ID, ApolloPathConstants.PROXY_SELECTOR_DATA_ID).allMatch(
this::dataIdNotExist);
}

/**
* Data id not exist boolean.
*
* @param pluginDataId the plugin data id
* @return the boolean
*/
private boolean dataIdNotExist(final String pluginDataId) {
return Objects.isNull(apolloClient.getItemValue(pluginDataId));
}
}

判断apollo中是否存在数据,如果不存在,则进行同步。 这里有一个bug, 因为这里判断的key,在同步时,并不会创建,则会导致每次重启时都重新加载数据,已提PR#5435

ApolloDataChangedInit实现了CommandLineRunner接口。它是springboot提供的接口,会在所有 Spring Beans初始化之后执行run()方法,常用于项目中初始化的操作。

  • SyncDataService.syncAll()

从数据库查询数据,然后进行全量数据同步,所有的认证信息、插件信息、规则信息、选择器信息、元数据、代理选择器、发现下游事件。主要是通过eventPublisher发布同步事件,eventPublisher通过publishEvent()发布完事件后,有ApplicationListener执行事件变更操作,在ShenYu中就是前面提到的DataChangedEventDispatcher

@Service
public class SyncDataServiceImpl implements SyncDataService {
// 事件发布
private final ApplicationEventPublisher eventPublisher;

/***
* 全量数据同步
* @param type the type
* @return
*/
@Override
public boolean syncAll(final DataEventTypeEnum type) {
// 同步auth数据
appAuthService.syncData();
// 同步插件数据
List<PluginData> pluginDataList = pluginService.listAll();
// 通过spring发布/订阅机制进行通知订阅者(发布DataChangedEvent)
// 统一由DataChangedEventDispatcher进行监听
// DataChangedEvent带上了配置分组类型、当前操作类型、数据
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
// 同步选择器
List<SelectorData> selectorDataList = selectorService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
// 同步规则
List<RuleData> ruleDataList = ruleService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
//元数据
metaDataService.syncData();
// 下游列表
discoveryService.syncData();
return true;
}

}

bootstrap同步操作初始化

网关这边的数据同步初始化操作主要是订阅apollo中的节点,当有数据变更时,收到变更数据。这依赖于apollolistener机制。在ShenYu中,负责apollo数据同步的是ApolloDataService

ApolloDataService的功能逻辑是在实例化的过程中完成的:对apollo中的shenyu数据同步节点完成订阅。通过configService.addChangeListener()方法实现;

public class ApolloDataService extends AbstractNodeDataSyncService implements SyncDataService {
public ApolloDataService(final Config configService, final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers,
final List<ProxySelectorDataSubscriber> proxySelectorDataSubscribers,
final List<DiscoveryUpstreamDataSubscriber> discoveryUpstreamDataSubscribers) {
// 配置监听的前缀
super(new ChangeData(ApolloPathConstants.PLUGIN_DATA_ID,
ApolloPathConstants.SELECTOR_DATA_ID,
ApolloPathConstants.RULE_DATA_ID,
ApolloPathConstants.AUTH_DATA_ID,
ApolloPathConstants.META_DATA_ID,
ApolloPathConstants.PROXY_SELECTOR_DATA_ID,
ApolloPathConstants.DISCOVERY_DATA_ID),
pluginDataSubscriber, metaDataSubscribers, authDataSubscribers, proxySelectorDataSubscribers, discoveryUpstreamDataSubscribers);
this.configService = configService;
// 开始监听
// 注:Apollo该方法,只负责获取apollo的数据获取,并添加到本地缓存中,不处理监听
startWatch();
// 配置监听
apolloWatchPrefixes();
}
}

首先配置需要处理的key信息,同admin同步的key。接着调用startWatch() 方法进行处理数据获取与监听。但对于Apollo的实现中,该方法只负责处理数据的获取并设置到本地缓存中。 监听由apolloWatchPrefixes方法来处理

private void apolloWatchPrefixes() {
// 定义监听器
final ConfigChangeListener listener = changeEvent -> {
changeEvent.changedKeys().forEach(changeKey -> {
try {
final ConfigChange configChange = changeEvent.getChange(changeKey);
// 未变更则跳过
if (configChange == null) {
LOG.error("apollo watchPrefixes error configChange is null {}", changeKey);
return;
}
final String newValue = configChange.getNewValue();
// skip last is "list"
// 如果是list结尾的Key,如plugin.list则跳过,因为这里只是记录生效的一个列表,不会在本地缓存中
final int lastListStrIndex = changeKey.length() - DefaultNodeConstants.LIST_STR.length();
if (changeKey.lastIndexOf(DefaultNodeConstants.LIST_STR) == lastListStrIndex) {
return;
}
// 如果是plugin.开头 => 处理插件数据
if (changeKey.indexOf(ApolloPathConstants.PLUGIN_DATA_ID) == 0) {
// 删除
if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {
// 清除缓存
unCachePluginData(changeKey);
} else {
// 更新缓存
cachePluginData(newValue);
}
// 如果是selector.开头 => 处理选择器数据
} else if (changeKey.indexOf(ApolloPathConstants.SELECTOR_DATA_ID) == 0) {
if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {
unCacheSelectorData(changeKey);
} else {
cacheSelectorData(newValue);
}
// 如果是rule.开头 => 处理规则数据
} else if (changeKey.indexOf(ApolloPathConstants.RULE_DATA_ID) == 0) {
if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {
unCacheRuleData(changeKey);
} else {
cacheRuleData(newValue);
}
// 如果是auth.开头 => 处理授权数据
} else if (changeKey.indexOf(ApolloPathConstants.AUTH_DATA_ID) == 0) {
if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {
unCacheAuthData(changeKey);
} else {
cacheAuthData(newValue);
}
// 如果是meta.开头 => 处理元数据
} else if (changeKey.indexOf(ApolloPathConstants.META_DATA_ID) == 0) {
if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {
unCacheMetaData(changeKey);
} else {
cacheMetaData(newValue);
}
// 如果是proxy.selector.开头 => 处理代理选择器数据
} else if (changeKey.indexOf(ApolloPathConstants.PROXY_SELECTOR_DATA_ID) == 0) {
if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {
unCacheProxySelectorData(changeKey);
} else {
cacheProxySelectorData(newValue);
}
// 如果是discovery.开头 => 处理下游列表数据
} else if (changeKey.indexOf(ApolloPathConstants.DISCOVERY_DATA_ID) == 0) {
if (PropertyChangeType.DELETED.equals(configChange.getChangeType())) {
unCacheDiscoveryUpstreamData(changeKey);
} else {
cacheDiscoveryUpstreamData(newValue);
}
}
} catch (Exception e) {
LOG.error("apollo sync listener change key handler error", e);
}
});
};
watchConfigChangeListener = listener;
// 添加监听
configService.addChangeListener(listener, Collections.emptySet(), ApolloPathConstants.pathKeySet());

}

由前面admin加载数据的逻辑,插件只会增加两个Key:plugin.listplugin.${plugin.name},而 plugin.list 是所有启用的插件列表,该key的数据在 本地缓存中没有数据,只会关注plugin.${plugin.name} key对应的数据,这是对应的插件的详细信息。

至此,bootstrap在apollo中的同步逻辑就分析完成。

· One min read

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu 网关当前支持ZooKeeperWebSocketHttp长轮询NacosEtcdConsul 进行数据同步。本文的主要内容是基于Etcd的数据同步源码分析。

本文基于shenyu-2.4.0版本进行源码分析,官网的介绍请参考 数据同步原理

1. 关于Etcd

Etcd是一个分布式的键值对存储系统,它为大型分布式计算提供分布式配置服务、同步服务和命名注册。

2. Admin数据同步

我们从一个实际案例进行源码追踪,比如在后台管理系统中,对Divide插件中的一条选择器数据进行更新,将权重更新为90:

2.1 接收数据

  • SelectorController.createSelector()

进入SelectorController类中的updateSelector()方法,它负责数据的校验,添加或更新数据,返回结果信息。

@Validated
@RequiredArgsConstructor
@RestController
@RequestMapping("/selector")
public class SelectorController {

@PutMapping("/{id}")
public ShenyuAdminResult updateSelector(@PathVariable("id") final String id, @Valid @RequestBody final SelectorDTO selectorDTO) {
// 设置当前选择器数据id
selectorDTO.setId(id);
// 创建或更新操作
Integer updateCount = selectorService.createOrUpdate(selectorDTO);
// 返回结果信息
return ShenyuAdminResult.success(ShenyuResultMessage.UPDATE_SUCCESS, updateCount);
}

// ......
}

2.2 处理数据

  • SelectorServiceImpl.createOrUpdate()

SelectorServiceImpl类中通过createOrUpdate()方法完成数据的转换,保存到数据库,发布事件,更新upstream

@RequiredArgsConstructor
@Service
public class SelectorServiceImpl implements SelectorService {
// 负责事件发布的eventPublisher
private final ApplicationEventPublisher eventPublisher;

@Override
@Transactional(rollbackFor = Exception.class)
public int createOrUpdate(final SelectorDTO selectorDTO) {
int selectorCount;
// 构建数据 DTO --> DO
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
// 判断是添加还是更新
if (StringUtils.isEmpty(selectorDTO.getId())) {
// 插入选择器数据
selectorCount = selectorMapper.insertSelective(selectorDO);
// 插入选择器中的条件数据
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
// check selector add
// 权限检查
if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {
DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();
dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());
dataPermissionDTO.setDataId(selectorDO.getId());
dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);
dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));
}

} else {
// 更新数据,先删除再新增
selectorCount = selectorMapper.updateSelective(selectorDO);
//delete rule condition then add
selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
selectorConditionMapper.insertSelective(selectorConditionDO);
});
}
// 发布事件
publishEvent(selectorDO, selectorConditionDTOs);

// 更新upstream
updateDivideUpstream(selectorDO);
return selectorCount;
}


// ......

}

Service类完成数据的持久化操作,即保存数据到数据库,这个比较简单,就不深入追踪了。关于更新upstream操作,放到后面对应的章节中进行分析,重点关注发布事件的操作,它会执行数据同步。

publishEvent()方法的逻辑是:找到选择器对应的插件,构建条件数据,发布变更数据。

     private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
// 找到选择器对应的插件
PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
// 构建条件数据
List<ConditionData> conditionDataList = selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
// 发布变更数据
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
}

发布变更数据通过eventPublisher.publishEvent()完成,这个eventPublisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。

关于ApplicationEventPublisher

当有状态发生变化时,发布者调用 ApplicationEventPublisherpublishEvent 方法发布一个事件,Spring容器广播事件给所有观察者,调用观察者的 onApplicationEvent 方法把事件对象传递给观察者。调用 publishEvent方法有两种途径,一种是实现接口由容器注入 ApplicationEventPublisher 对象然后调用其方法,另一种是直接调用容器的方法,两种方法发布事件没有太大区别。

  • ApplicationEventPublisher:发布事件;
  • ApplicationEventSpring 事件,记录事件源、时间和数据;
  • ApplicationListener:事件监听者,观察者;

Spring的事件发布机制中,有三个对象,

一个是发布事件的ApplicationEventPublisher,在ShenYu中通过构造器注入了一个eventPublisher

另一个对象是ApplicationEvent,在ShenYu中通过DataChangedEvent继承了它,表示事件对象。

public class DataChangedEvent extends ApplicationEvent {
//......
}

最后一个是 ApplicationListener,在ShenYu中通过DataChangedEventDispatcher类实现了该接口,作为事件的监听者,负责处理事件对象。

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

//......

}

2.3 分发数据

  • DataChangedEventDispatcher.onApplicationEvent()

当事件发布完成后,会自动进入到DataChangedEventDispatcher类中的onApplicationEvent()方法,进行事件处理。

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

/**
* 有数据变更时,调用此方法
* @param event
*/
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)
for (DataChangedListener listener : listeners) {
// 哪种数据发生变更
switch (event.getGroupKey()) {
case APP_AUTH: // 认证信息
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN: // 插件信息
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE: // 规则信息
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR: // 选择器信息
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA: // 元数据
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default: // 其他类型,抛出异常
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}

}

当有数据变更时,调用onApplicationEvent方法,然后遍历所有数据变更监听器,判断是哪种数据类型,交给相应的数据监听器进行处理。

ShenYu将所有数据进行了分组,一共是五种:认证信息、插件信息、规则信息、选择器信息和元数据。

这里的数据变更监听器(DataChangedListener),就是数据同步策略的抽象,它的具体实现有:

这几个实现类就是当前ShenYu支持的同步策略:

  • WebsocketDataChangedListener:基于websocket的数据同步;
  • ZookeeperDataChangedListener:基于zookeeper的数据同步;
  • ConsulDataChangedListener:基于consul的数据同步;
  • EtcdDataDataChangedListener:基于etcd的数据同步;
  • HttpLongPollingDataChangedListener:基于http长轮询的数据同步;
  • NacosDataChangedListener:基于nacos的数据同步;

既然有这么多种实现策略,那么如何确定使用哪一种呢?

因为本文是基于Etcd的数据同步源码分析,所以这里以EtcdDataDataChangedListener为例,分析它是如何被加载并实现的。

通过查看对EtcdDataDataChangedListener类的调用,可以发现,它是在DataSyncConfiguration类进行配置的。

/**
* 数据同步配置类
* 通过springboot条件装配实现
* The type Data sync configuration.
*/
@Configuration
public class DataSyncConfiguration {

//省略了其他代码......

/**
* The type Etcd listener.
*/
@Configuration
@ConditionalOnProperty(prefix = "shenyu.sync.etcd", name = "url")
@EnableConfigurationProperties(EtcdProperties.class)
static class EtcdListener {

@Bean
public EtcdClient etcdClient(final EtcdProperties etcdProperties) {
Client client = Client.builder()
.endpoints(etcdProperties.getUrl())
.build();
return new EtcdClient(client);
}

/**
* Config event listener data changed listener.
* 创建Etcd数据变更监听器
* @param etcdClient the etcd client
* @return the data changed listener
*/
@Bean
@ConditionalOnMissingBean(EtcdDataDataChangedListener.class)
public DataChangedListener etcdDataChangedListener(final EtcdClient etcdClient) {
return new EtcdDataDataChangedListener(etcdClient);
}

/**
* data init.
* 创建Etcd数据初始化类
* @param etcdClient the etcd client
* @param syncDataService the sync data service
* @return the etcd data init
*/
@Bean
@ConditionalOnMissingBean(EtcdDataInit.class)
public EtcdDataInit etcdDataInit(final EtcdClient etcdClient, final SyncDataService syncDataService) {
return new EtcdDataInit(etcdClient, syncDataService);
}
}

//省略了其他代码......
}

这个配置类是通过SpringBoot条件装配类实现的。在EtcdListener类上面有几个注解:

  • @Configuration:配置文件,应用上下文;

  • @ConditionalOnProperty(prefix = "shenyu.sync.etcd", name = "url"):属性条件判断,满足条件,该配置类才会生效。也就是说,当我们有如下配置时,就会采用etcd进行数据同步。

    shenyu:  
    sync:
    etcd:
    url: localhost:2181
  • @EnableConfigurationProperties(EtcdProperties.class):导入另一个属性类EtcdPropertiesEtcdProperties中各属性对应配置文件中以shenyu.sync.etcd作为前缀的各属性。

@Data
@ConfigurationProperties(prefix = "shenyu.sync.etcd")
public class EtcdProperties {

private String url;

private Integer sessionTimeout;

private Integer connectionTimeout;

private String serializer;
}

当我们在配置文件中配置了shenyu.sync.etcd.url属性时,Admin将采用etcd进行数据同步,此时配置类EtcdListener会生效,并生成EtcdClient, EtcdDataDataChangedListenerEtcdDataInit类型的bean。

  • 生成EtcdClient类型的bean,etcdClient,这个bean根据配置文件,配置了与etcd服务器的连接信息,可以直接操作etcd节点。
  • 生成EtcdDataDataChangedListener类型的bean,etcdDataDataChangedListener,这个bean将beanetcdClient作为成员变量,当监听到事件时,进行回调操作,可以直接使用该bean操作etcd节点。
  • 生成EtcdDataInit类型的bean,etcdDataInit,这个bean将beanetcdClient和beansyncDataService作为成员变量,使用etcdClient根据etcd路径,判断数据是否未初始化,当未初始化时,将调用syncDataService进行刷新操作,将在下文详述。 根据上文所述,在事件处理方法onApplicationEvent()中,就会到相应的listener中。在我们的案例中,是对一条选择器数据进行更新,数据同步采用的是etcd,所以,代码会进入到EtcdDataDataChangedListener进行选择器数据变更处理。
    //DataChangedEventDispatcher.java
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)
for (DataChangedListener listener : listeners) {
// 哪种数据发生变更
switch (event.getGroupKey()) {

// 省略了其他逻辑

case SELECTOR: // 选择器信息
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); // 在我们的案例中,会进入到EtcdDataDataChangedListener进行选择器数据变更处理
break;
}
}

2.4 Etcd数据变更监听器

  • EtcdDataDataChangedListener.onSelectorChanged()

    onSelectorChanged()方法中,判断操作类型,是刷新同步还是更新或创建同步。根据当前选择器数据信息判断节点是否在etcd中。

/**
* EtcdDataDataChangedListener.
*/
@Slf4j
public class EtcdDataDataChangedListener implements DataChangedListener {

private final EtcdClient etcdClient;

public EtcdDataDataChangedListener(final EtcdClient client) {
this.etcdClient = client;
}

// 选择器信息发生改变
@Override
public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
// 刷新操作
if (eventType == DataEventTypeEnum.REFRESH && !changed.isEmpty()) {
String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(changed.get(0).getPluginName());
etcdClient.deleteEtcdPathRecursive(selectorParentPath);
}
// 发生变更的数据
for (SelectorData data : changed) {
// 构建选择器数据的真实路径
String selectorRealPath = DefaultPathConstants.buildSelectorRealPath(data.getPluginName(), data.getId());
//删除操作
if (eventType == DataEventTypeEnum.DELETE) {
etcdClient.delete(selectorRealPath);
continue;
}
//create or update,创建或更新操作
updateNode(selectorRealPath, data);
}
}

}

这部分是核心。changed表示需更新的SelectorData列表,eventType表示事件类型。当事件类型为刷新REFRESH,并且SelectorData有改动时,会先将etcd中该plugin下的selector节点都先删除。注意这里的条件SelectorData有改动是必须的,否则会出现没有改动时进行刷新,将所有selector节点都删除的bug。 获取到selector对应路径后,会对节点进行删除、创建或更新。

只要将变动的数据正确写入到etcd的节点上,admin这边的操作就执行完成了。

在我们当前的案例中,对Divide插件中的一条选择器数据进行更新,将权重更新为90,就会对图中的特定节点更新。

我们用时序图将上面的更新流程串联起来。

3. 网关数据同步

假设ShenYu网关已经在正常运行,使用的数据同步方式也是etcd。那么当在admin端更新选择器数据后,并且向etcd发送了变更的数据,那网关是如何接收并处理数据的呢?接下来我们就继续进行源码分析,一探究竟。

3.1 EtcdClient接收数据

  • EtcdClient.watchDataChange()

在网关端有一个EtcdSyncDataService类,它通过etcdClient订阅了数据节点,当数据发生变更时,可以感知到。

/**
* Data synchronize of etcd.
*/
@Slf4j
public class EtcdSyncDataService implements SyncDataService, AutoCloseable {
//省略其它代码
private void subscribeSelectorDataChanges(final String path) {
etcdClient.watchDataChange(path, (updateNode, updateValue) -> cacheSelectorData(updateValue),
this::unCacheSelectorData);
}
//省略其它代码
}

EtcdWatch机制,会给订阅的客户端发送节点变更通知。在我们的案例中,更新选择器信息,就会进入到watchDataChange()方法。通过cacheSelectorData()去处理数据。

3.2 处理数据

  • EtcdSyncDataService.cacheSelectorData()

经过判空逻辑之后,缓存选择器数据的操作又交给了PluginDataSubscriber处理。

    private void cacheSelectorData(final String dataString) {
final SelectorData selectorData = GsonUtils.getInstance().fromJson(dataString, SelectorData.class);
Optional.ofNullable(selectorData)
.ifPresent(data -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSelectorSubscribe(data)));
}

PluginDataSubscriber是一个接口,它只有一个CommonPluginDataSubscriber实现类,负责处理插件、选择器和规则数据。

3.3 通用插件数据订阅者

  • PluginDataSubscriber.onSelectorSubscribe()

它没有其他逻辑,直接调用subscribeDataHandler()方法。在方法中,更具数据类型(插件、选择器或规则),操作类型(更新或删除),去执行不同逻辑。

/**
* 通用插件数据订阅者,负责处理所有插件、选择器和规则信息
* The type Common plugin data subscriber.
*/
public class CommonPluginDataSubscriber implements PluginDataSubscriber {
//......
// 处理选择器数据
@Override
public void onSelectorSubscribe(final SelectorData selectorData) {
subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
}

// 订阅数据处理器,处理数据的更新或删除
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
Optional.ofNullable(classData).ifPresent(data -> {
// 插件数据
if (data instanceof PluginData) {
PluginData pluginData = (PluginData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cachePluginData(pluginData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removePluginData(pluginData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
}
} else if (data instanceof SelectorData) { // 选择器数据
SelectorData selectorData = (SelectorData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cacheSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理 Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removeSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
}
} else if (data instanceof RuleData) { // 规则数据
RuleData ruleData = (RuleData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cacheRuleData(ruleData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removeRuleData(ruleData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
}
}
});
}

}

3.4 数据缓存到内存

那么更新一条选择器数据,会进入下面的逻辑:

// 将数据保存到网关内存
BaseDataCache.getInstance().cacheSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));

一是将数据保存到网关的内存中。BaseDataCache是最终缓存数据的类,通过单例模式实现。选择器数据就存到了SELECTOR_MAP这个Map中。在后续使用的时候,也是从这里拿数据。

public final class BaseDataCache {
// 私有变量
private static final BaseDataCache INSTANCE = new BaseDataCache();
// 私有构造器
private BaseDataCache() {
}

/**
* Gets instance.
* 公开方法
* @return the instance
*/
public static BaseDataCache getInstance() {
return INSTANCE;
}

/**
* 缓存选择器数据的Map
* pluginName -> SelectorData.
*/
private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();

public void cacheSelectData(final SelectorData selectorData) {
Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);
}

/**
* cache selector data.
* 缓存选择器数据
* @param data the selector data
*/
private void selectorAccept(final SelectorData data) {
String key = data.getPluginName();
if (SELECTOR_MAP.containsKey(key)) { // 更新操作,先删除再插入
List<SelectorData> existList = SELECTOR_MAP.get(key);
final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
resultList.add(data);
final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());
SELECTOR_MAP.put(key, collect);
} else { // 新增操作,直接放到Map中
SELECTOR_MAP.put(key, Lists.newArrayList(data));
}
}

}

二是如果每个插件还有自己的处理逻辑,那么就去处理。 通过idea编辑器可以看到,当新增一条选择器后,有如下的插件还有处理。这里我们就不再展开了。

经过以上的源码追踪,并通过一个实际的案例,在admin端新增更新一条选择器数据,就将etcd数据同步的流程分析清楚了。

我们还是通过时序图将网关端的数据同步流程串联一下:

数据同步的流程已经分析完了,为了不让同步流程被打断,在分析过程中就忽略了其他逻辑。我们还需要分析Admin同步数据初始化和网关同步操作初始化的流程。

4. Admin同步数据初始化

admin启动后,会将当前的数据信息全量同步到etcd中,实现逻辑如下:


/**
* EtcdDataInit.
*/
@Slf4j
public class EtcdDataInit implements CommandLineRunner {

private final EtcdClient etcdClient;

private final SyncDataService syncDataService;

public EtcdDataInit(final EtcdClient client, final SyncDataService syncDataService) {
this.etcdClient = client;
this.syncDataService = syncDataService;
}

@Override
public void run(final String... args) throws Exception {
final String pluginPath = DefaultPathConstants.PLUGIN_PARENT;
final String authPath = DefaultPathConstants.APP_AUTH_PARENT;
final String metaDataPath = DefaultPathConstants.META_DATA;
if (!etcdClient.exists(pluginPath) && !etcdClient.exists(authPath) && !etcdClient.exists(metaDataPath)) {
log.info("Init all data from database");
syncDataService.syncAll(DataEventTypeEnum.REFRESH);
}
}
}

判断etcd中是否存在数据,如果不存在,则进行同步。

EtcdDataInit实现了CommandLineRunner接口。它是springboot提供的接口,会在所有 Spring Beans初始化之后执行run()方法,常用于项目中初始化的操作。

  • SyncDataService.syncAll()

从数据库查询数据,然后进行全量数据同步,所有的认证信息、插件信息、选择器信息、规则信息和元数据信息。主要是通过eventPublisher发布同步事件。这里就跟前面提到的同步逻辑就又联系起来了,eventPublisher通过publishEvent()发布完事件后,有ApplicationListener执行事件变更操作,在ShenYu中就是前面提到的DataChangedEventDispatcher

@Service
public class SyncDataServiceImpl implements SyncDataService {
// 事件发布
private final ApplicationEventPublisher eventPublisher;

/***
* 全量数据同步
* @param type the type
* @return
*/
@Override
public boolean syncAll(final DataEventTypeEnum type) {
// 同步认证信息
appAuthService.syncData();
// 同步插件信息
List<PluginData> pluginDataList = pluginService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
// 同步选择器信息
List<SelectorData> selectorDataList = selectorService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
// 同步规则信息
List<RuleData> ruleDataList = ruleService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
// 同步元数据信息
metaDataService.syncData();
return true;
}

}

5. 网关同步操作初始化

网关这边的数据同步初始化操作主要是订阅etcd中的节点,当有数据变更时,收到变更数据。这依赖于EtcdWatch机制。在ShenYu中,负责etcd数据同步的是EtcdSyncDataService,也在前面提到过。

EtcdSyncDataService的功能逻辑是在实例化的过程中完成的:对etcd中的shenyu数据同步节点完成订阅。这里的订阅分两类,一类是已经存在的节点上面数据发生更新,这通过etcdClient.watchDataChange()方法实现;另一类是当前节点下有新增或删除节点,即子节点发生变化,这通过etcdClient.watchChildChange()方法实现。

EtcdSyncDataService的代码有点多,这里我们以插件数据的读取和订阅进行追踪,其他类型的数据操作原理是一样的。

/**
* etcd 数据同步服务
*/
@Slf4j
public class EtcdSyncDataService implements SyncDataService, AutoCloseable {
// 在实例化的时候,完成从etcd中读取数据的操作,并订阅节点
public EtcdSyncDataService(/*省略构造参数*/) {
this.etcdClient = etcdClient;
this.pluginDataSubscriber = pluginDataSubscriber;
this.metaDataSubscribers = metaDataSubscribers;
this.authDataSubscribers = authDataSubscribers;
// 订阅插件、选择器和规则数据
watcherData();
// 订阅认证数据
watchAppAuth();
// 订阅元数据
watchMetaData();
}

private void watcherData() {
// 插件节点路径
final String pluginParent = DefaultPathConstants.PLUGIN_PARENT;
// 所有插件节点
List<String> pluginZKs = etcdClientGetChildren(pluginParent);
for (String pluginName : pluginZKs) {
// 订阅当前所有插件、选择器和规则数据
watcherAll(pluginName);
}
// 订阅子节点(新增或删除一个插件)
etcdClient.watchChildChange(pluginParent, (updateNode, updateValue) -> {
if (!updateNode.isEmpty()) {
// 需要订阅子节点的所有插件、选择器和规则数据
watcherAll(updateNode);
}
}, null);
}

private void watcherAll(final String pluginName) {
// 订阅插件数据
watcherPlugin(pluginName);
// 订阅选择器数据
watcherSelector(pluginName);
// 订阅规则数据
watcherRule(pluginName);
}

private void watcherPlugin(final String pluginName) {
// 当前插件路径
String pluginPath = DefaultPathConstants.buildPluginPath(pluginName);
// 缓存到网关内存中
cachePluginData(etcdClient.get(pluginPath));
// 订阅插件节点
subscribePluginDataChanges(pluginPath, pluginName);
}

private void cachePluginData(final String dataString) {
final PluginData pluginData = GsonUtils.getInstance().fromJson(dataString, PluginData.class);
Optional.ofNullable(pluginData)
.flatMap(data -> Optional.ofNullable(pluginDataSubscriber)).ifPresent(e -> e.onSubscribe(pluginData));
}

private void subscribePluginDataChanges(final String pluginPath, final String pluginName) {
// 订阅数据变更:更新或删除,两个lambda表达式分别为更新和删除操作
etcdClient.watchDataChange(pluginPath, (updatePath, updateValue) -> {
final String dataPath = buildRealPath(pluginPath, updatePath);
final String dataStr = etcdClient.get(dataPath);
final PluginData data = GsonUtils.getInstance().fromJson(dataStr, PluginData.class);
Optional.ofNullable(data)
.ifPresent(d -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSubscribe(d)));
}, deleteNode -> deletePlugin(pluginName));
}

}

上面的源代码中都给出了注释,相信大家可以看明白。订阅插件数据的主要逻辑如下:

  1. 构造当前插件路径
  2. 读取etcd上当前节点数据,并反序列化
  3. 插件数据缓存到网关内存中
  4. 订阅插件节点

6. 总结

本文通过一个实际案例,对etcd的数据同步原理进行了源码分析。涉及到的主要知识点如下:

  • 基于etcd的数据同步,主要是通过watch机制实现;
  • 通过Spring完成事件发布和监听;
  • 通过抽象DataChangedListener接口,支持多种同步策略,面向接口编程;
  • 使用单例设计模式实现缓存数据类BaseDataCache
  • 通过SpringBoot的条件装配和starter加载机制实现配置类的加载。

· One min read

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu 网关当前支持ZooKeeperWebSocketHttp长轮询NacosEtcdConsul 进行数据同步。本文的主要内容是基于Http长轮询的数据同步源码分析。

本文基于shenyu-2.5.0版本进行源码分析,官网的介绍请参考 数据同步原理

1. Http长轮询

这里直接引用官网的相关描述:

ZookeeperWebSocket 数据同步的机制比较简单,而 Http长轮询则比较复杂。 Apache ShenYu 借鉴了 ApolloNacos 的设计思想,取其精华,自己实现了 Http长轮询数据同步功能。注意,这里并非传统的 ajax 长轮询!

Http长轮询 机制如上所示,Apache ShenYu网关主动请求 shenyu-admin 的配置服务,读取超时时间为 90s,意味着网关层请求配置服务最多会等待 90s,这样便于 shenyu-admin 配置服务及时响应变更数据,从而实现准实时推送。

Http长轮询 机制是由网关主动请求 shenyu-admin ,所以这次的源码分析,我们从网关这一侧开始。

2. 网关数据同步

2.1 加载配置

Http长轮询 数据同步配置的加载是通过spring bootstarter机制,当我们引入相关依赖和在配置文件中有如下配置时,就会加载。

pom文件中引入依赖:

<!--shenyu data sync start use http-->
<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-spring-boot-starter-sync-data-http</artifactId>
<version>${project.version}</version>
</dependency>

application.yml配置文件中添加配置:

shenyu:
sync:
http:
url : http://localhost:9095

当网关启动时,配置类HttpSyncDataConfiguration就会执行,加载相应的Bean

/**
* Http sync data configuration for spring boot.
*/
@Configuration
@ConditionalOnClass(HttpSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url")
@EnableConfigurationProperties(value = HttpConfig.class)
public class HttpSyncDataConfiguration {

private static final Logger LOGGER = LoggerFactory.getLogger(HttpSyncDataConfiguration.class);

/**
* Rest template.
* 创建RestTemplate
* @param httpConfig the http config http配置
* @return the rest template
*/
@Bean
public RestTemplate restTemplate(final HttpConfig httpConfig) {
OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
factory.setConnectTimeout(Objects.isNull(httpConfig.getConnectionTimeout()) ? (int) HttpConstants.CLIENT_POLLING_CONNECT_TIMEOUT : httpConfig.getConnectionTimeout());
factory.setReadTimeout(Objects.isNull(httpConfig.getReadTimeout()) ? (int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT : httpConfig.getReadTimeout());
factory.setWriteTimeout(Objects.isNull(httpConfig.getWriteTimeout()) ? (int) HttpConstants.CLIENT_POLLING_WRITE_TIMEOUT : httpConfig.getWriteTimeout());
return new RestTemplate(factory);
}

/**
* AccessTokenManager.
* 创建AccessTokenManager,专门用户对admin进行http请求时access token的处理
* @param httpConfig the http config.
* @param restTemplate the rest template.
* @return the access token manager.
*/
@Bean
public AccessTokenManager accessTokenManager(final HttpConfig httpConfig, final RestTemplate restTemplate) {
return new AccessTokenManager(restTemplate, httpConfig);
}

/**
* Http sync data service.
* 创建 HttpSyncDataService
* @param httpConfig the http config
* @param pluginSubscriber the plugin subscriber
* @param restTemplate the rest template
* @param metaSubscribers the meta subscribers
* @param authSubscribers the auth subscribers
* @param accessTokenManager the access token manager
* @return the sync data service
*/
@Bean
public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig,
final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<RestTemplate> restTemplate,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
final ObjectProvider<List<AuthDataSubscriber>> authSubscribers,
final ObjectProvider<AccessTokenManager> accessTokenManager) {
LOGGER.info("you use http long pull sync shenyu data");
return new HttpSyncDataService(
Objects.requireNonNull(httpConfig.getIfAvailable()),
Objects.requireNonNull(pluginSubscriber.getIfAvailable()),
Objects.requireNonNull(restTemplate.getIfAvailable()),
metaSubscribers.getIfAvailable(Collections::emptyList),
authSubscribers.getIfAvailable(Collections::emptyList),
Objects.requireNonNull(accessTokenManager.getIfAvailable())
);
}
}

HttpSyncDataConfigurationHttp长轮询数据同步的配置类,负责创建HttpSyncDataService(负责http数据同步的具体实现)、RestTemplateAccessTokenManager (负责与adminhttp调用时access token的处理)。它的注解如下:

  • @Configuration:表示这是一个配置类;
  • @ConditionalOnClass(HttpSyncDataService.class):条件注解,表示要有HttpSyncDataService这个类;
  • @ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url"):条件注解,要有shenyu.sync.http.url这个属性配置。
  • @EnableConfigurationProperties(value = HttpConfig.class):表示让HttpConfig上的注解@ConfigurationProperties(prefix = "shenyu.sync.http")生效,将HttpConfig这个配置类注入Ioc容器中。

2.2 属性初始化

  • HttpSyncDataService

HttpSyncDataService的构造函数中,完成属性初始化。

public class HttpSyncDataService implements SyncDataService {

// 省略了属性字段......

public HttpSyncDataService(final HttpConfig httpConfig,
final PluginDataSubscriber pluginDataSubscriber,
final RestTemplate restTemplate,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers,
final AccessTokenManager accessTokenManager) {
// 1.设置accessTokenManager
this.accessTokenManager = accessTokenManager;
// 2.创建数据处理器
this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
// 3.shenyu-admin的url, 多个用逗号(,)分割
this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
// 4.只用于http长轮询的restTemplate
this.restTemplate = restTemplate;
// 5.开始执行长轮询任务
this.start();
}

//......
}

上面代码中省略了其他函数和相关字段,在构造函数中完成属性的初始化,主要是:

  • 设置accessTokenManager,定时向admin请求更新accessToken的值。然后每次向admin发起请求时都必须将headerX-Access-Token属性设置成accessToken对应的值;

  • 创建数据处理器,用于后续缓存各种类型的数据(插件、选择器、规则、元数据和认证数据);

  • 获取admin属性配置,主要是获取adminurladmin有可能是集群,多个用逗号(,)分割;

  • 设置RestTemplate,用于向admin发起请求;

  • 开始执行长轮询任务。

2.3 开始长轮询

  • HttpSyncDataService#start()

start()方法中,干了两件事情,一个是获取全量数据,即请求admin端获取所有需要同步的数据,然后将获取到的数据缓存到网关内存中。另一个是开启多线程执行长轮询任务。

public class HttpSyncDataService implements SyncDataService {

// ......

private void start() {
// // 只初始化一次,通过原子类实现。
if (RUNNING.compareAndSet(false, true)) {
// 初次启动,获取全量数据
this.fetchGroupConfig(ConfigGroupEnum.values());
// 一个后台服务,一个线程
int threadSize = serverList.size();
// 自定义线程池
this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
ShenyuThreadFactory.create("http-long-polling", true));
// 开始长轮询,一个admin服务,创建一个线程用于数据同步
this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
} else {
LOG.info("shenyu http long polling was started, executor=[{}]", executor);
}
}

// ......
}
2.3.1 获取全量数据
  • HttpSyncDataService#fetchGroupConfig()

ShenYu将所有需要同步的数据进行了分组,一共有5种数据类型,分别是插件、选择器、规则、元数据和认证数据。

public enum ConfigGroupEnum {
APP_AUTH, // 认证数据
PLUGIN, //插件
RULE, // 规则
SELECTOR, // 选择器
META_DATA; // 元数据
}

admin有可能是集群,这里通过循环的方式向每个admin发起请求,有一个执行成功了,那么向admin获取全量数据并缓存到网关的操作就执行成功。如果出现了异常,就向下一个admin发起请求。

public class HttpSyncDataService implements SyncDataService {

// ......

private void fetchGroupConfig(final ConfigGroupEnum... groups) throws ShenyuException {
// admin有可能是集群,这里通过循环的方式向每个admin发起请求
for (int index = 0; index < this.serverList.size(); index++) {
String server = serverList.get(index);
try {
// 真正去执行
this.doFetchGroupConfig(server, groups);
// 有一个成功,就成功了,可以退出循环
break;
} catch (ShenyuException e) {
// 出现异常,尝试执行下一个
// 最后一个也执行失败了,抛出异常
if (index >= serverList.size() - 1) {
throw e;
}
LOG.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
}
}
}

// ......
}
  • HttpSyncDataService#doFetchGroupConfig()

在此方法中,首先拼装请求参数,然后通过httpClient发起请求,到admin中获取数据,最后将获取到的数据更新到网关内存中。

public class HttpSyncDataService implements SyncDataService {
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
// 1. 拼请求参数,所有分组枚举类型
StringBuilder params = new StringBuilder();
for (ConfigGroupEnum groupKey : groups) {
params.append("groupKeys").append("=").append(groupKey.name()).append("&");
}
// admin端提供的接口 /configs/fetch
String url = server + Constants.SHENYU_ADMIN_PATH_CONFIGS_FETCH + "?" + StringUtils.removeEnd(params.toString(), "&");
LOG.info("request configs: [{}]", url);
String json;
try {
HttpHeaders headers = new HttpHeaders();
// 设置accessToken
headers.set(Constants.X_ACCESS_TOKEN, this.accessTokenManager.getAccessToken());
HttpEntity<String> httpEntity = new HttpEntity<>(headers);
// 2. 发起请求,获取变更数据
json = this.restTemplate.exchange(url, HttpMethod.GET, httpEntity, String.class).getBody();
} catch (RestClientException e) {
String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
LOG.warn(message);
throw new ShenyuException(message, e);
}
// 3. 更新网关内存中数据
boolean updated = this.updateCacheWithJson(json);
if (updated) {
LOG.debug("get latest configs: [{}]", json);
return;
}
// 更新成功,此方法就执行完成了
LOG.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
// 服务端没有数据更新,就等30s
ThreadUtils.sleep(TimeUnit.SECONDS, 30);
}
}

从代码中,可以看到 admin端提供的获取全量数据接口是 /configs/fetch,这里先不进一步深入,放在后文再分析。

获取到admin返回结果数据,并成功更新,那么此方法就执行结束了。如果没有更新成功,那么有可能是服务端没有数据更新,就等待30s

这里需要提前说明一下,网关在判断是否更新成功时,有比对数据的操作,马上就会提到。

  • HttpSyncDataService#updateCacheWithJson()

更新网关内存中的数据。使用GSON进行反序列化,从属性data中拿真正的数据,然后交给DataRefreshFactory去做更新。

    private boolean updateCacheWithJson(final String json) {
// 使用GSON进行反序列化
JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
// if the config cache will be updated?
return factory.executor(jsonObject.getAsJsonObject("data"));
}
  • DataRefreshFactory#executor()

根据不同数据类型去更新数据,返回更新结果。具体更新逻辑交给了dataRefresh.refresh()方法。在更新结果中,有一种数据类型进行了更新,就表示此次操作发生了更新。

    public boolean executor(final JsonObject data) {
//并行更新数据
List<Boolean> result = ENUM_MAP.values().parallelStream()
.map(dataRefresh -> dataRefresh.refresh(data))
.collect(Collectors.toList());
//有一个更新就表示此次发生了更新操作
return result.stream().anyMatch(Boolean.TRUE::equals);
}
  • AbstractDataRefresh#refresh()

数据更新逻辑采用的是模板方法设计模式,通用操作在抽象方法中完成,不同的实现逻辑由子类完成。5种数据类型具体的更新逻辑有些差异,但是也存在通用的更新逻辑,类图关系如下:

在通用的refresh()方法中,负责数据类型转换,判断是否需要更新,和实际的数据刷新操作。

public abstract class AbstractDataRefresh<T> implements DataRefresh {

// ......

@Override
public Boolean refresh(final JsonObject data) {
// 数据类型转换
JsonObject jsonObject = convert(data);
if (Objects.isNull(jsonObject)) {
return false;
}

boolean updated = false;
// 得到数据类型
ConfigData<T> result = fromJson(jsonObject);
// 是否需要更新
if (this.updateCacheIfNeed(result)) {
updated = true;
// 真正的更新逻辑,数据刷新操作
refresh(result.getData());
}

return updated;
}

// ......
}
  • AbstractDataRefresh#updateCacheIfNeed()

数据转换的过程,就是根据不同的数据类型进行转换,我们就不再进一步追踪了,看看数据是否需要更新的逻辑。方法名是updateCacheIfNeed(),通过方法重载实现。

public abstract class AbstractDataRefresh<T> implements DataRefresh {

// ......

// result是数据
protected abstract boolean updateCacheIfNeed(ConfigData<T> result);

// newVal是获取到的最新的值
// groupEnum 是哪种数据类型
protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {
// 如果是第一次,那么直接放到cache中,返回 true,表示此次进行了更新
if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {
return true;
}
ResultHolder holder = new ResultHolder(false);
GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {
// md5 值相同,不需要更新
if (StringUtils.equals(oldVal.getMd5(), newVal.getMd5())) {
LOG.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5());
return oldVal;
}

// 当前缓存的数据修改时间大于 新来的数据,不需要更新
// must compare the last update time
if (oldVal.getLastModifyTime() >= newVal.getLastModifyTime()) {
LOG.info("Last update time earlier than the current configuration, the [{}] config cache will not be updated", groupEnum);
return oldVal;
}
LOG.info("update {} config: {}", groupEnum, newVal);
holder.result = true;
return newVal;
});
return holder.result;
}

// ......
}

从上面的源码中可以看到,有两种情况不需要更新:

  • 两个的数据的md5 值相同,不需要更新;
  • 当前缓存的数据修改时间大于 新来的数据,不需要更新。

其他情况需要更新数据。

分析到这里,就将start() 方法中初次启动,获取全量数据的逻辑分析完了,接下来是长轮询的操作。为了方便,我将start()方法再粘贴一次:

public class HttpSyncDataService implements SyncDataService {

// ......

private void start() {
// // 只初始化一次,通过原子类实现。
if (RUNNING.compareAndSet(false, true)) {
// 初次启动,获取全量数据
this.fetchGroupConfig(ConfigGroupEnum.values());
// 一个后台服务,一个线程
int threadSize = serverList.size();
// 自定义线程池
this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
ShenyuThreadFactory.create("http-long-polling", true));
// 开始长轮询,一个admin服务,创建一个线程用于数据同步
this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
} else {
LOG.info("shenyu http long polling was started, executor=[{}]", executor);
}
}

// ......
}
2.3.2 执行长轮询任务
  • HttpLongPollingTask#run()

长轮询任务是HttpLongPollingTask,它实现了Runnable接口,任务逻辑在run()方法中。通过while()循环实现不断执行任务,即长轮询。在每一次的轮询中有三次重试逻辑,一次轮询任务失败了,等 5s 再继续,3 次都失败了,等5 分钟再试。

开始长轮询,一个admin服务,创建一个线程用于数据同步。

class HttpLongPollingTask implements Runnable {

private final String server;

HttpLongPollingTask(final String server) {
this.server = server;
}

@Override
public void run() {
// 一直轮询
while (RUNNING.get()) {
// 默认重试 3 次
int retryTimes = 3;
for (int time = 1; time <= retryTimes; time++) {
try {
doLongPolling(server);
} catch (Exception e) {
if (time < retryTimes) {
LOG.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
time, retryTimes - time, e.getMessage());
// 长轮询失败了,等 5s 再继续
ThreadUtils.sleep(TimeUnit.SECONDS, 5);
continue;
}
LOG.error("Long polling failed, try again after 5 minutes!", e);
// 3 次都失败了,等 5 分钟再试
ThreadUtils.sleep(TimeUnit.MINUTES, 5);
}
}
}
LOG.warn("Stop http long polling.");
}
}
  • HttpSyncDataService#doLongPolling()

执行长轮询任务的核心逻辑:

  • 根据数据类型组装请求参数:md5lastModifyTime
  • 组装请求头和请求体;
  • admin发起请求,判断组数据是否发生变更;
  • 根据发生变更的组,再去获取数据。
public class HttpSyncDataService implements SyncDataService {
private void doLongPolling(final String server) {
// 组装请求参数:md5 和 lastModifyTime
MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
ConfigData<?> cacheConfig = factory.cacheConfigData(group);
if (cacheConfig != null) {
String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
params.put(group.name(), Lists.newArrayList(value));
}
}
// 组装请求头和请求体
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
// 设置accessToken
headers.set(Constants.X_ACCESS_TOKEN, this.accessTokenManager.getAccessToken());
HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(params, headers);
String listenerUrl = server + Constants.SHENYU_ADMIN_PATH_CONFIGS_LISTENER;

JsonArray groupJson;
//向admin发起请求,判断组数据是否发生变更
//这里只是判断了某个组是否发生变更
try {
String json = this.restTemplate.postForEntity(listenerUrl, httpEntity, String.class).getBody();
LOG.info("listener result: [{}]", json);
JsonObject responseFromServer = GsonUtils.getGson().fromJson(json, JsonObject.class);
groupJson = responseFromServer.getAsJsonArray("data");
} catch (RestClientException e) {
String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
throw new ShenyuException(message, e);
}
// 根据发生变更的组,再去获取数据
/**
* 官网对此处的解释:
* 网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。
* 这里可能会存在一个疑问:为什么不是直接将变更的数据写出?
* 我们在开发的时候,也深入讨论过该问题,因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时,
* 或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。
*
* 个人理解:
* 如果将变更数据直接写出,当管理员频繁更新配置时,第一次更新了,将client移除阻塞队列,返回响应信息给网关。
* 如果这个时候进行了第二次更新,那么当前的client是不在阻塞队列中,所以这一次的变更就会错过。
* 网关层处理不及时,也是同理。
* 这是一个长轮询,一个网关一个同步线程,可能存在耗时的过程。
* 如果admin有数据变更,当前网关client是没有在阻塞队列中,就不到数据。
*/
if (Objects.nonNull(groupJson) && groupJson.size() > 0) {
// fetch group configuration async.
ConfigGroupEnum[] changedGroups = GsonUtils.getGson().fromJson(groupJson, ConfigGroupEnum[].class);
LOG.info("Group config changed: {}", Arrays.toString(changedGroups));
this.doFetchGroupConfig(server, changedGroups);
}
}
}

这里需要特别解释一点的是:在长轮询任务中,为什么不直接拿到变更的数据?而是先判断哪个分组数据发生了变更,然后再次请求admin,获取变更数据?

官网对此处的解释是:

网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。 这里可能会存在一个疑问:为什么不是直接将变更的数据写出? 我们在开发的时候,也深入讨论过该问题,因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时, 或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。

个人理解是:

如果将变更数据直接写出,管理员频繁更新配置时,第一次更新了,将client移除阻塞队列,返回响应信息给网关。如果这个时候进行了第二次更新,那么当前的client是不在阻塞队列中,所以这一次的变更就会错过。网关层处理不及时,也是同理。 这是一个长轮询,一个网关一个同步线程,可能存在耗时的过程。如果admin有数据变更,当前网关client是没有在阻塞队列中,就会更新不到数据。

我们还没有分析到admin端的处理逻辑,先大概说一下。在admin端,会将网关client放到阻塞队列,有数据变更,网关client就会出队列,发送变更数据。所以,如果有数据变更时,网关client不在阻塞队列,那么就无法得到当前变更的数据。

知道哪个分组数据发生变更时,主动再向admin获取变更的数据,根据分组不同,全量拿数据。调用方法是doFetchGroupConfig(),这个在前面已经分析过了。

分析到这里,网关端的数据同步操作就完成了。长轮询任务就是不断向admin发起请求,看看数据是否发生变更,如果有分组数据发生变更,那么就再主动向admin发起请求,获取变更数据,然后更新网关内存中的数据。

网关端长轮询任务流程:

3. admin数据同步

从前面分析的过程中,可以看到,网关端主要调用admin的两个接口:

  • /configs/listener:判断组数据是否发生变更;
  • /configs/fetch:获取变更组数据。

直接从这两个接口分析的话,可能有的地方不好理解,所以我们还是从admin启动流程开始分析数据同步过程。

3.1 加载配置

如果在配置文件application.yml中,进行了如下配置,就表示通过http长轮询的方式进行数据同步。

shenyu:
sync:
http:
enabled: true

程序启动时,通过springboot条件装配实现数据同步类的配置加载。在这个过程中,会创建HttpLongPollingDataChangedListener,负责处理长轮询的相关实现逻辑。

/**
* 数据同步配置类
* 通过springboot条件装配实现
* The type Data sync configuration.
*/
@Configuration
public class DataSyncConfiguration {

/**
* http长轮询
* http long polling.
*/
@Configuration
@ConditionalOnProperty(name = "shenyu.sync.http.enabled", havingValue = "true")
@EnableConfigurationProperties(HttpSyncProperties.class)
static class HttpLongPollingListener {

@Bean
@ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)
public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
return new HttpLongPollingDataChangedListener(httpSyncProperties);
}
}
}

3.2 数据变更监听器实例化

  • HttpLongPollingDataChangedListener

数据变更监听器通过构造函数的方式完成实例化和初始化操作。在构造函数中会创建阻塞队列,用于存放客户端;创建线程池,用于执行延迟任务,周期任务;保存长轮询相关属性信息。

    public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
// 默认客户端(这里是网关)1024个
this.clients = new ArrayBlockingQueue<>(1024);
// 创建线程池
// ScheduledThreadPoolExecutor 可以执行延迟任务,周期任务,普通任务
this.scheduler = new ScheduledThreadPoolExecutor(1,
ShenyuThreadFactory.create("long-polling", true));
// 长轮询的属性信息
this.httpSyncProperties = httpSyncProperties;
}

另外,它的类图关系如下:

实现了InitializingBean接口,所以在bean的初始化过程中执行afterInitialize()方法。通过线程池执行周期任务:更新内存中(CACHE)的数据每隔5分钟执行一次,5分钟后开始执行。刷新本地缓存就是从数据库读取数据到本地缓存(这里就是内存),通过refreshLocalCache()完成。

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {

// ......

/**
* 在 InitializingBean接口中的afterPropertiesSet()方法中被调用,即在bean的初始化过程中执行
*/
@Override
protected void afterInitialize() {
long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
// 执行周期任务:更新内存中(CACHE)的数据每隔5分钟执行一次,5分钟后开始执行
// 防止admin先启动一段时间后,产生了数据;然后网关初次连接时,没有拿到全量数据
scheduler.scheduleWithFixedDelay(() -> {
LOG.info("http sync strategy refresh config start.");
try {
// 从数据库读取数据到本地缓存(这里就是内存)
this.refreshLocalCache();
LOG.info("http sync strategy refresh config success.");
} catch (Exception e) {
LOG.error("http sync strategy refresh config error!", e);
}
}, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
LOG.info("http sync strategy refresh interval: {}ms", syncInterval);
}

// ......
}
  • refreshLocalCache()

分别对5种数据类型进行更新。

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {

// ......

// 从数据库读取数据到本地缓存(这里就是内存)
private void refreshLocalCache() {
//更新认证数据
this.updateAppAuthCache();
//更新插件数据
this.updatePluginCache();
//更新规则数据
this.updateRuleCache();
//更新选择器数据
this.updateSelectorCache();
//更新元数据
this.updateMetaDataCache();
}

// ......
}

5个更新方法的逻辑是类似的,调用service方法获取数据,然后放到内存CACHE中。以更新规则数据方法updateRuleCache()为例,传入规则枚举类型,调用ruleService.listAll()从数据库获取所有规则数据。

    /**
* Update rule cache.
*/
protected void updateRuleCache() {
this.updateCache(ConfigGroupEnum.RULE, ruleService.listAll());
}
  • updateCache()

使用数据库中的数据更新内存中的数据。

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {

// ......

// 缓存数据的 Map
protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();

/**
* if md5 is not the same as the original, then update lcoal cache.
* 更新缓存中的数据
* @param group ConfigGroupEnum
* @param <T> the type of class
* @param data the new config data
*/
protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
//数据序列化
String json = GsonUtils.getInstance().toJson(data);
//传入md5值和修改时间
ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
//更新分组数据
ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
LOG.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
}

// ......
}

初始化的过程就是启动周期性任务,定时从数据库获取数据更新内存数据。

接下来开始对两个接口开始分析:

  • /configs/listener:判断组数据是否发生变更;
  • /configs/fetch:获取变更组数据。

3.3 数据变更轮询接口

  • /configs/listener:判断组数据是否发生变更;

接口类是ConfigController,只有使用http长轮询进行数据同步时才会生效。接口方法listener()没有其他逻辑,直接调用doLongPolling()方法。

   
/**
* This Controller only when HttpLongPollingDataChangedListener exist, will take effect.
*/
@ConditionalOnBean(HttpLongPollingDataChangedListener.class)
@RestController
@RequestMapping("/configs")
public class ConfigController {

private final HttpLongPollingDataChangedListener longPollingListener;

public ConfigController(final HttpLongPollingDataChangedListener longPollingListener) {
this.longPollingListener = longPollingListener;
}

// 省略其他逻辑

/**
* Listener.
* 监听数据变更,执行长轮询
* @param request the request
* @param response the response
*/
@PostMapping(value = "/listener")
public void listener(final HttpServletRequest request, final HttpServletResponse response) {
longPollingListener.doLongPolling(request, response);
}

}
  • HttpLongPollingDataChangedListener#doLongPolling()

执行长轮询任务:如果有数据变更,将会立即响应给客户端(这里就是网关端)。否则,客户端会一直被阻塞,直到有数据变更或者超时。

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {

// ......

/**
* 执行长轮询:如果有数据变更,会立即响应给客户端(这里就是网关端)。
* 否则,否则客户端会一直被阻塞,直到有数据变更或者超时。
* @param request
* @param response
*/
public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
// compare group md5
// 比较md5,判断网关的数据和admin端的数据是否一致,得到发生变更的数据组
List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
String clientIp = getRemoteIp(request);
// response immediately.
// 有变更的数据,则立即向网关响应
if (CollectionUtils.isNotEmpty(changedGroup)) {
this.generateResponse(response, changedGroup);
Log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
return;
}

// 没有变更,则将客户端(这里就是网关)放进阻塞队列
final AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0L);
scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}

// ......
}
  • HttpLongPollingDataChangedListener#compareChangedGroup()

判断组数据是否发生变更,判断逻辑是比较网关端和admin端的md5值和lastModifyTime

  • 如果md5值不一样,那么需要更新;
  • 如果admin端的lastModifyTime大于网关端的lastModifyTime,那么需要更新。
 /**
* 判断组数据是否发生变更
* @param request
* @return
*/
private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request) {
List<ConfigGroupEnum> changedGroup = new ArrayList<>(ConfigGroupEnum.values().length);
for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
// 网关端数据的md5值和lastModifyTime
String[] params = StringUtils.split(request.getParameter(group.name()), ',');
if (params == null || params.length != 2) {
throw new ShenyuException("group param invalid:" + request.getParameter(group.name()));
}
String clientMd5 = params[0];
long clientModifyTime = NumberUtils.toLong(params[1]);
ConfigDataCache serverCache = CACHE.get(group.name());
// do check. 判断组数据是否发生变更
if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) {
changedGroup.add(group);
}
}
return changedGroup;
}
  • LongPollingClient

没有变更数据,则将客户端(这里就是网关)放进阻塞队列。阻塞时间是60秒,即60秒后移除,并响应客户端。

class LongPollingClient implements Runnable {
// 省略了其他逻辑

@Override
public void run() {
try {
// 先设置定时任务:60秒后移除,并响应客户端
this.asyncTimeoutFuture = scheduler.schedule(() -> {
clients.remove(LongPollingClient.this);
List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
sendResponse(changedGroups);
}, timeoutTime, TimeUnit.MILLISECONDS);

// 添加到阻塞队列
clients.add(this);

} catch (Exception ex) {
log.error("add long polling client error", ex);
}
}

/**
* Send response.
*
* @param changedGroups the changed groups
*/
void sendResponse(final List<ConfigGroupEnum> changedGroups) {
// cancel scheduler
if (null != asyncTimeoutFuture) {
asyncTimeoutFuture.cancel(false);
}
// 响应变更的组
generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
asyncContext.complete();
}
}

3.4 获取变更数据接口

  • /configs/fetch:获取变更数据;

根据网关传入的参数,获取分组数据,返回结果。主要实现方法是longPollingListener.fetchConfig()


@ConditionalOnBean(HttpLongPollingDataChangedListener.class)
@RestController
@RequestMapping("/configs")
public class ConfigController {

private final HttpLongPollingDataChangedListener longPollingListener;

public ConfigController(final HttpLongPollingDataChangedListener longPollingListener) {
this.longPollingListener = longPollingListener;
}

/**
* Fetch configs shenyu result.
* 全量获取分组数据
* @param groupKeys the group keys
* @return the shenyu result
*/
@GetMapping("/fetch")
public ShenyuAdminResult fetchConfigs(@NotNull final String[] groupKeys) {
Map<String, ConfigData<?>> result = Maps.newHashMap();
for (String groupKey : groupKeys) {
ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey));
result.put(groupKey, data);
}
return ShenyuAdminResult.success(ShenyuResultMessage.SUCCESS, result);
}

// 省略了其他接口

}
  • AbstractDataChangedListener#fetchConfig()

数据获取直接从CACHE中拿,然后根据不同分组类型进行匹配,封装。

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
/**
* fetch configuration from cache.
* 获取分组下的全量数据
* @param groupKey the group key
* @return the configuration data
*/
public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {
// 直接从 CACHE 中拿数据
ConfigDataCache config = CACHE.get(groupKey.name());
switch (groupKey) {
case APP_AUTH: // 认证数据
return buildConfigData(config, AppAuthData.class);
case PLUGIN: // 插件数据
return buildConfigData(config, PluginData.class);
case RULE: // 规则数据
return buildConfigData(config, RuleData.class);
case SELECTOR: // 选择器数据
return buildConfigData(config, SelectorData.class);
case META_DATA: // 元数据
return buildConfigData(config, MetaData.class);
default: // 其他类型,抛出异常
throw new IllegalStateException("Unexpected groupKey: " + groupKey);
}
}
}

3.5 数据变更

在之前的websocket数据同步和zookeeper数据同步源码分析文章中,我们知道admin端数据同步设计结构如下:

各种数据变更监听器都是DataChangedListener的子类。

当在admin端修改数据后,通过Spring的事件处理机制,发送事件通知。发送逻辑如下:


/**
* Event forwarders, which forward the changed events to each ConfigEventListener.
* 数据变更事件分发器:当admin端有数据发生变更时,将变更数据同步到 ShenYu 网关
* 数据变更依赖于Spring的事件监听机制:ApplicationEventPublisher --> ApplicationEvent --> ApplicationListener
*
*/
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

//省略了其他逻辑

/**
* 有数据变更时,调用此方法
* @param event
*/
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)
for (DataChangedListener listener : listeners) {
// 哪种数据发生变更
switch (event.getGroupKey()) {
case APP_AUTH: // 认证信息
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN: // 插件信息
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE: // 规则信息
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR: // 选择器信息
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
// 当选择器数据更新时,更新API文档信息
applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA: // 元数据
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default: // 其他类型,抛出异常
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
}

假设,对插件信息进行了修改,通过http长轮询的方式进行数据同步,那么listener.onPluginChanged()的实际调用的是org.apache.shenyu.admin.listener.AbstractDataChangedListener#onPluginChanged

    /**
* 在admin的操作,有插件发生了更新
* @param changed the changed
* @param eventType the event type
*/
@Override
public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
if (CollectionUtils.isEmpty(changed)) {
return;
}
// 更新内存CACHE
this.updatePluginCache();
// 执行变更任务
this.afterPluginChanged(changed, eventType);
}

有两个处理操作,一是更新内存CACHE,这个在前面分析过了;另一个是执行变更任务,在线程池中执行。

  • HttpLongPollingDataChangedListener#afterPluginChanged()
    @Override
protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
// 在线程池中执行
scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN));
}
  • DataChangeTask

数据变更任务:将阻塞队列中的客户端依次移除,并发送响应,通知网关有组数据发生变更。

class DataChangeTask implements Runnable {
//省略了其他逻辑
@Override
public void run() {
// 阻塞队列中的客户端超过了给定的值100,则分批执行
if (clients.size() > httpSyncProperties.getNotifyBatchSize()) {
List<LongPollingClient> targetClients = new ArrayList<>(clients.size());
clients.drainTo(targetClients);
List<List<LongPollingClient>> partitionClients = Lists.partition(targetClients, httpSyncProperties.getNotifyBatchSize());
// 分批执行
partitionClients.forEach(item -> scheduler.execute(() -> doRun(item)));
} else {
// 执行任务
doRun(clients);
}
}

private void doRun(final Collection<LongPollingClient> clients) {
// 通知所有客户端发生了数据变更
for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
LongPollingClient client = iter.next();
iter.remove();
// 发送响应
client.sendResponse(Collections.singletonList(groupKey));
LOG.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
}
}
}

至此,admin端数据同步逻辑就分析完了。在基于http长轮询数据同步是,它主要有三个功能:

  • 提供数据变更监听接口;
  • 提供获取变更数据接口;
  • 有数据变更时,移除阻塞队列中的客户端,并响应结果。

最后,用三张图描述下admin端长轮询任务流程:

  • /configs/listener数据变更监听接口:

  • /configs/fetch获取变更数据接口:

  • 在admin后台管理系统更新数据,进行数据同步:

4. 总结

本文主要对ShenYu网关中的http长轮询数据同步进行了源码分析。涉及到的主要知识点如下:

  • http长轮询由网关端主动发起请求,不断请求admin端;
  • 变更数据以组为粒度(认证信息、插件、选择器、规则、元数据);
  • http长轮询结果只拿到了变更组,还需要再次发起请求获取组数据;
  • 数据是否更新由md5值和修改时间lastModifyTime决定。

· One min read

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu 网关当前支持ZooKeeperWebSocketHttp长轮询NacosEtcdConsul 进行数据同步。本文的主要内容是基于Nacos的数据同步源码分析。

本文基于shenyu-2.4.0版本进行源码分析,官网的介绍请参考 数据同步原理

1. 关于Nacos

Nacos 平台用于动态服务发现,以及配置和服务管理。 Shenyu网关可选择使用Nacos进行数据同步。

2. Admin数据同步

我们从一个实际案例进行源码追踪,比如在后台管理系统中,对Divide插件中的一条选择器数据进行更新,将权重更新为90:

2.1 接收数据

  • SelectorController.updateSelector()

进入SelectorController类中的updateSelector()方法,它负责数据的校验,添加或更新数据,返回结果信息。

@Validated
@RequiredArgsConstructor
@RestController
@RequestMapping("/selector")
public class SelectorController {

@PutMapping("/{id}")
public ShenyuAdminResult updateSelector(@PathVariable("id") final String id, @Valid @RequestBody final SelectorDTO selectorDTO) {
// 设置当前选择器数据id
selectorDTO.setId(id);
// 创建或更新操作
Integer updateCount = selectorService.createOrUpdate(selectorDTO);
// 返回结果信息
return ShenyuAdminResult.success(ShenyuResultMessage.UPDATE_SUCCESS, updateCount);
}

// ......
}

2.2 处理数据

  • SelectorServiceImpl.createOrUpdate()

SelectorServiceImpl类中通过createOrUpdate()方法完成数据的转换,保存到数据库,发布事件,更新upstream

@RequiredArgsConstructor
@Service
public class SelectorServiceImpl implements SelectorService {
// 负责事件发布的eventPublisher
private final ApplicationEventPublisher eventPublisher;

@Override
@Transactional(rollbackFor = Exception.class)
public int createOrUpdate(final SelectorDTO selectorDTO) {
int selectorCount;
// 构建数据 DTO --> DO
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
// 判断是添加还是更新
if (StringUtils.isEmpty(selectorDTO.getId())) {
// 插入选择器数据
selectorCount = selectorMapper.insertSelective(selectorDO);
// 插入选择器中的条件数据
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
// check selector add
// 权限检查
if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {
DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();
dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());
dataPermissionDTO.setDataId(selectorDO.getId());
dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);
dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));
}

} else {
// 更新数据,先删除再新增
selectorCount = selectorMapper.updateSelective(selectorDO);
//delete rule condition then add
selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
selectorConditionMapper.insertSelective(selectorConditionDO);
});
}
// 发布事件
publishEvent(selectorDO, selectorConditionDTOs);

// 更新upstream
updateDivideUpstream(selectorDO);
return selectorCount;
}


// ......

}

Service类完成数据的持久化操作,即保存数据到数据库,这个比较简单,就不深入追踪了。关于更新upstream操作,放到后面对应的章节中进行分析,重点关注发布事件的操作,它会执行数据同步。

publishEvent()方法的逻辑是:找到选择器对应的插件,构建条件数据,发布变更数据。

     private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
// 找到选择器对应的插件
PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
// 构建条件数据
List<ConditionData> conditionDataList = selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
// 发布变更数据
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
}

发布变更数据通过eventPublisher.publishEvent()完成,这个eventPublisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。

关于ApplicationEventPublisher

当有状态发生变化时,发布者调用 ApplicationEventPublisherpublishEvent 方法发布一个事件,Spring容器广播事件给所有观察者,调用观察者的 onApplicationEvent 方法把事件对象传递给观察者。调用 publishEvent方法有两种途径,一种是实现接口由容器注入 ApplicationEventPublisher 对象然后调用其方法,另一种是直接调用容器的方法,两种方法发布事件没有太大区别。

  • ApplicationEventPublisher:发布事件;
  • ApplicationEventSpring 事件,记录事件源、时间和数据;
  • ApplicationListener:事件监听者,观察者;

Spring的事件发布机制中,有三个对象,

一个是发布事件的ApplicationEventPublisher,在ShenYu中通过构造器注入了一个eventPublisher

另一个对象是ApplicationEvent,在ShenYu中通过DataChangedEvent继承了它,表示事件对象。

public class DataChangedEvent extends ApplicationEvent {
//......
}

最后一个是 ApplicationListener,在ShenYu中通过DataChangedEventDispatcher类实现了该接口,作为事件的监听者,负责处理事件对象。

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

//......

}

2.3 分发数据

  • DataChangedEventDispatcher.onApplicationEvent()

当事件发布完成后,会自动进入到DataChangedEventDispatcher类中的onApplicationEvent()方法,进行事件处理。

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

/**
* 有数据变更时,调用此方法
* @param event
*/
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)
for (DataChangedListener listener : listeners) {
// 哪种数据发生变更
switch (event.getGroupKey()) {
case APP_AUTH: // 认证信息
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN: // 插件信息
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE: // 规则信息
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR: // 选择器信息
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA: // 元数据
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default: // 其他类型,抛出异常
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}

}

当有数据变更时,调用onApplicationEvent方法,然后遍历所有数据变更监听器,判断是哪种数据类型,交给相应的数据监听器进行处理。

ShenYu将所有数据进行了分组,一共是五种:认证信息、插件信息、规则信息、选择器信息和元数据。

这里的数据变更监听器(DataChangedListener),就是数据同步策略的抽象,它的具体实现有:

这几个实现类就是当前ShenYu支持的同步策略:

  • WebsocketDataChangedListener:基于websocket的数据同步;
  • ZookeeperDataChangedListener:基于zookeeper的数据同步;
  • ConsulDataChangedListener:基于consul的数据同步;
  • EtcdDataDataChangedListener:基于etcd的数据同步;
  • HttpLongPollingDataChangedListener:基于http长轮询的数据同步;
  • NacosDataChangedListener:基于nacos的数据同步;

既然有这么多种实现策略,那么如何确定使用哪一种呢?

因为本文是基于Nacos的数据同步源码分析,所以这里以NacosDataChangedListener为例,分析它是如何被加载并实现的。

通过查看对NacosDataChangedListener类的调用,可以发现,它是在DataSyncConfiguration类进行配置的。

/**
* 数据同步配置类
* 通过springboot条件装配实现
* The type Data sync configuration.
*/
@Configuration
public class DataSyncConfiguration {

//省略了其他代码......

/**
* The type Nacos listener.
*/
@Configuration
@ConditionalOnProperty(prefix = "shenyu.sync.nacos", name = "url")
@Import(NacosConfiguration.class)
static class NacosListener {

/**
* Data changed listener data changed listener.
*
* @param configService the config service
* @return the data changed listener
*/
@Bean
@ConditionalOnMissingBean(NacosDataChangedListener.class)
public DataChangedListener nacosDataChangedListener(final ConfigService configService) {
return new NacosDataChangedListener(configService);
}

/**
* Nacos data init zookeeper data init.
*
* @param configService the config service
* @param syncDataService the sync data service
* @return the nacos data init
*/
@Bean
@ConditionalOnMissingBean(NacosDataInit.class)
public NacosDataInit nacosDataInit(final ConfigService configService, final SyncDataService syncDataService) {
return new NacosDataInit(configService, syncDataService);
}
}

//省略了其他代码......
}

这个配置类是通过SpringBoot条件装配类实现的。在NacosListener类上面有几个注解:

  • @Configuration:配置文件,应用上下文;

  • @ConditionalOnProperty(prefix = "shenyu.sync.nacos", name = "url"):属性条件判断,满足条件,该配置类才会生效。也就是说,当我们有如下配置时,就会采用nacos进行数据同步。

    shenyu:  
    sync:
    nacos:
    url: localhost:8848
  • @Import(NacosConfiguration.class):导入另一个配置类NacosConfigurationNacosConfiguration提供了一个方法ConfigService nacosConfigService(final NacosProperties nacosProp),将Nacos属性转换为ConfigService类型的bean,而Nacos属性是通过@EnableConfigurationProperties(NacosProperties.class) 导入的。我们先看ConfigService类型的bean定义。再分析属性配置类和对应的属性配置文件。

/**
* Nacos configuration.
*/
@EnableConfigurationProperties(NacosProperties.class)
public class NacosConfiguration {

/**
* register configService in spring ioc.
*
* @param nacosProp the nacos configuration
* @return ConfigService {@linkplain ConfigService}
* @throws Exception the exception
*/
@Bean
@ConditionalOnMissingBean(ConfigService.class)
public ConfigService nacosConfigService(final NacosProperties nacosProp) throws Exception {
Properties properties = new Properties();
if (nacosProp.getAcm() != null && nacosProp.getAcm().isEnabled()) {
// Use aliyun ACM service
properties.put(PropertyKeyConst.ENDPOINT, nacosProp.getAcm().getEndpoint());
properties.put(PropertyKeyConst.NAMESPACE, nacosProp.getAcm().getNamespace());
// Use subaccount ACM administrative authority
properties.put(PropertyKeyConst.ACCESS_KEY, nacosProp.getAcm().getAccessKey());
properties.put(PropertyKeyConst.SECRET_KEY, nacosProp.getAcm().getSecretKey());
} else {
properties.put(PropertyKeyConst.SERVER_ADDR, nacosProp.getUrl());
if (StringUtils.isNotBlank(nacosProp.getNamespace())) {
properties.put(PropertyKeyConst.NAMESPACE, nacosProp.getNamespace());
}
if (StringUtils.isNotBlank(nacosProp.getUsername())) {
properties.put(PropertyKeyConst.USERNAME, nacosProp.getUsername());
}
if (StringUtils.isNotBlank(nacosProp.getPassword())) {
properties.put(PropertyKeyConst.PASSWORD, nacosProp.getPassword());
}
}
return NacosFactory.createConfigService(properties);
}
}

这个方法主要分成两步,第一步根据是否使用了aliyun的ACM服务,从NacosProperties中获取不同的nacos路径和鉴权信息,第二步根据获取到的这些属性,使用Nacos官方的工厂方法,使用反射的方式,创建configService。

接下来,让我们分析一下Nacos的属性配置和对应的配置文件。

/**
* The type Nacos config.
*/
@ConfigurationProperties(prefix = "shenyu.sync.nacos")
public class NacosProperties {

private String url;

private String namespace;

private String username;

private String password;

private NacosACMProperties acm;

/**
* Gets the value of url.
*
* @return the value of url
*/
public String getUrl() {
return url;
}

/**
* Sets the url.
*
* @param url url
*/
public void setUrl(final String url) {
this.url = url;
}

/**
* Gets the value of namespace.
*
* @return the value of namespace
*/
public String getNamespace() {
return namespace;
}

/**
* Sets the namespace.
*
* @param namespace namespace
*/
public void setNamespace(final String namespace) {
this.namespace = namespace;
}

/**
* Gets the value of username.
*
* @return the value of username
*/
public String getUsername() {
return username;
}

/**
* Sets the username.
*
* @param username username
*/
public void setUsername(final String username) {
this.username = username;
}

/**
* Gets the value of password.
*
* @return the value of password
*/
public String getPassword() {
return password;
}

/**
* Sets the password.
*
* @param password password
*/
public void setPassword(final String password) {
this.password = password;
}

/**
* Gets the value of acm.
*
* @return the value of acm
*/
public NacosACMProperties getAcm() {
return acm;
}

/**
* Sets the acm.
*
* @param acm acm
*/
public void setAcm(final NacosACMProperties acm) {
this.acm = acm;
}

public static class NacosACMProperties {

private boolean enabled;

private String endpoint;

private String namespace;

private String accessKey;

private String secretKey;

/**
* Gets the value of enabled.
*
* @return the value of enabled
*/
public boolean isEnabled() {
return enabled;
}

/**
* Sets the enabled.
*
* @param enabled enabled
*/
public void setEnabled(final boolean enabled) {
this.enabled = enabled;
}

/**
* Gets the value of endpoint.
*
* @return the value of endpoint
*/
public String getEndpoint() {
return endpoint;
}

/**
* Sets the endpoint.
*
* @param endpoint endpoint
*/
public void setEndpoint(final String endpoint) {
this.endpoint = endpoint;
}

/**
* Gets the value of namespace.
*
* @return the value of namespace
*/
public String getNamespace() {
return namespace;
}

/**
* Sets the namespace.
*
* @param namespace namespace
*/
public void setNamespace(final String namespace) {
this.namespace = namespace;
}

/**
* Gets the value of accessKey.
*
* @return the value of accessKey
*/
public String getAccessKey() {
return accessKey;
}

/**
* Sets the accessKey.
*
* @param accessKey accessKey
*/
public void setAccessKey(final String accessKey) {
this.accessKey = accessKey;
}

/**
* Gets the value of secretKey.
*
* @return the value of secretKey
*/
public String getSecretKey() {
return secretKey;
}

/**
* Sets the secretKey.
*
* @param secretKey secretKey
*/
public void setSecretKey(final String secretKey) {
this.secretKey = secretKey;
}
}

}

当我们在配置文件中配置了shenyu.sync.nacos.url属性时,将采用nacos进行数据同步,此时配置类NacosListener会生效,并生成NacosDataChangedListenerNacosDataInit类型的bean。

  • 生成NacosDataChangedListener类型的bean,nacosDataChangedListener,这个bean将ConfigService类型的bean作为成员变量,ConfigService是nacos官方提供的api,当nacosDataChangedListener监听到事件时,进行回调操作,可以通过该api直接与nacos服务器交互,修改配置。
  • 生成NacosDataInit类型的bean,nacosDataInit,这个bean将beanconfigService和beansyncDataService作为成员变量,调用Nacos的api configService判断配置是否未初始化,未初始化则调用syncDataService进行刷新操作,将在下文详述。 根据上文所述,在事件处理方法onApplicationEvent()中,会触发相应的listener的操作。在我们的案例中,是对一条选择器数据进行更新,数据同步采用的是nacos,所以,代码会进入到NacosDataChangedListener进行选择器数据变更处理。
    //DataChangedEventDispatcher.java
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)
for (DataChangedListener listener : listeners) {
// 哪种数据发生变更
switch (event.getGroupKey()) {

// 省略了其他逻辑

case SELECTOR: // 选择器信息
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); // 在我们的案例中,会进入到NacosDataChangedListener进行选择器数据变更处理
break;
}
}

2.4 Nacos数据变更监听器

  • NacosDataChangedListener.onSelectorChanged()

    onSelectorChanged()方法中,判断操作类型,是刷新同步还是更新或创建同步。根据当前选择器数据信息判断节点是否在nacos中。

/**
* Use nacos to push data changes.
*/
public class NacosDataChangedListener implements DataChangedListener {
// 选择器信息发生改变
@Override
public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
updateSelectorMap(getConfig(NacosPathConstants.SELECTOR_DATA_ID));
switch (eventType) {
case DELETE:
changed.forEach(selector -> {
List<SelectorData> ls = SELECTOR_MAP
.getOrDefault(selector.getPluginName(), new ArrayList<>())
.stream()
.filter(s -> !s.getId().equals(selector.getId()))
.sorted(SELECTOR_DATA_COMPARATOR)
.collect(Collectors.toList());
SELECTOR_MAP.put(selector.getPluginName(), ls);
});
break;
case REFRESH:
case MYSELF:
SELECTOR_MAP.keySet().removeAll(SELECTOR_MAP.keySet());
changed.forEach(selector -> {
List<SelectorData> ls = SELECTOR_MAP
.getOrDefault(selector.getPluginName(), new ArrayList<>())
.stream()
.sorted(SELECTOR_DATA_COMPARATOR)
.collect(Collectors.toList());
ls.add(selector);
SELECTOR_MAP.put(selector.getPluginName(), ls);
});
break;
default:
changed.forEach(selector -> {
List<SelectorData> ls = SELECTOR_MAP
.getOrDefault(selector.getPluginName(), new ArrayList<>())
.stream()
.filter(s -> !s.getId().equals(selector.getId()))
.sorted(SELECTOR_DATA_COMPARATOR)
.collect(Collectors.toList());
ls.add(selector);
SELECTOR_MAP.put(selector.getPluginName(), ls);
});
break;
}
publishConfig(NacosPathConstants.SELECTOR_DATA_ID, SELECTOR_MAP);
}
}

这部分是核心。changed表示需更新的SelectorData列表,eventType表示事件类型。SELECTOR_MAP的类型是ConcurrentMap<String, List<SelectorData>>,该map的key为selector所属的plugin的名称,value为该plugin下的selector列表。NacosPathConstants.SELECTOR_DATA_ID的值为shenyu.selector.json。操作步骤如下,第一步,使用getConfig方法调用Nacos的api,从Nacos获取groupshenyu.selector.json的配置信息,updateSelectorMap方法使用这些配置信息更新SELECTOR_MAP,这样就同步到了Nacos上最新的selector信息。第二步,再根据事件类型来更新SELECTOR_MAP,最后使用publishConfig方法,调用Nacos的api,将Nacos上,groupshenyu.selector.json的配置进行全量替换。

只要将变动的数据正确写入到Nacos上,admin这边的操作就执行完成了。

在我们当前的案例中,对Divide插件中的一条选择器数据进行更新,将权重更新为90,就会对图中的特定节点更新。

我们用时序图将上面的更新流程串联起来。

3. 网关数据同步

假设ShenYu网关已经在正常运行,使用的数据同步方式也是nacos。那么当在admin端更新选择器数据后,并且向nacos发送了变更的数据,那网关是如何接收并处理数据的呢?接下来我们就继续进行源码分析,一探究竟。

3.1 NacosSyncDataService接收数据

网关是通过NacosSyncDataServicenacos进行监听并获取数据更新的,但是在这部分内容之前,我们先看一下NacosSyncDataService类型的bean是如何生成的。答案是在Spring配置类NacosSyncDataConfiguration中定义的。我们看到NacosSyncDataConfiguration类上的注解,@ConditionalOnProperty(prefix = "shenyu.sync.nacos", name = "url"),这个注解我们在上文对ShenYu的Admin端中的NacosListener类进行分析时看到过,是一个属性条件判断,满足条件,该配置类才会生效。也就是说,当我们在Shenyu网关端有如下配置时,就表示Shenyu网关端采用nacos进行数据同步,NacosSyncDataConfiguration这个配置类生效。

shenyu:  
sync:
nacos:
url: localhost:8848
/**
* Nacos sync data configuration for spring boot.
*/
@Configuration
@ConditionalOnClass(NacosSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.nacos", name = "url")
public class NacosSyncDataConfiguration {

private static final Logger LOGGER = LoggerFactory.getLogger(NacosSyncDataConfiguration.class);

/**
* Nacos sync data service.
*
* @param configService the config service
* @param pluginSubscriber the plugin subscriber
* @param metaSubscribers the meta subscribers
* @param authSubscribers the auth subscribers
* @return the sync data service
*/
@Bean
public SyncDataService nacosSyncDataService(final ObjectProvider<ConfigService> configService, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
LOGGER.info("you use nacos sync shenyu data.......");
return new NacosSyncDataService(configService.getIfAvailable(), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}

/**
* Nacos config service config service.
*
* @param nacosConfig the nacos config
* @return the config service
* @throws Exception the exception
*/
@Bean
public ConfigService nacosConfigService(final NacosConfig nacosConfig) throws Exception {
Properties properties = new Properties();
if (nacosConfig.getAcm() != null && nacosConfig.getAcm().isEnabled()) {
properties.put(PropertyKeyConst.ENDPOINT, nacosConfig.getAcm().getEndpoint());
properties.put(PropertyKeyConst.NAMESPACE, nacosConfig.getAcm().getNamespace());
properties.put(PropertyKeyConst.ACCESS_KEY, nacosConfig.getAcm().getAccessKey());
properties.put(PropertyKeyConst.SECRET_KEY, nacosConfig.getAcm().getSecretKey());
} else {
properties.put(PropertyKeyConst.SERVER_ADDR, nacosConfig.getUrl());
if (StringUtils.isNotBlank(nacosConfig.getNamespace())) {
properties.put(PropertyKeyConst.NAMESPACE, nacosConfig.getNamespace());
}
if (nacosConfig.getUsername() != null) {
properties.put(PropertyKeyConst.USERNAME, nacosConfig.getUsername());
}
if (nacosConfig.getPassword() != null) {
properties.put(PropertyKeyConst.PASSWORD, nacosConfig.getPassword());
}
}
return NacosFactory.createConfigService(properties);
}

/**
* Http config http config.
*
* @return the http config
*/
@Bean
@ConfigurationProperties(prefix = "shenyu.sync.nacos")
public NacosConfig nacosConfig() {
return new NacosConfig();
}
}

我们重点关注一下上面代码中nacosSyncDataService这个bean的生成:

@Bean
public SyncDataService nacosSyncDataService(final ObjectProvider<ConfigService> configService, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
LOGGER.info("you use nacos sync shenyu data.......");
return new NacosSyncDataService(configService.getIfAvailable(), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}

是直接调用NacosSyncDataService的构造方法new了一个该类型的对象。我们继续看构造方法:

public NacosSyncDataService(final ConfigService configService, final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {

super(configService, pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
start();
}
    public void start() {
watcherData(NacosPathConstants.PLUGIN_DATA_ID, this::updatePluginMap);
watcherData(NacosPathConstants.SELECTOR_DATA_ID, this::updateSelectorMap);
watcherData(NacosPathConstants.RULE_DATA_ID, this::updateRuleMap);
watcherData(NacosPathConstants.META_DATA_ID, this::updateMetaDataMap);
watcherData(NacosPathConstants.AUTH_DATA_ID, this::updateAuthMap);
}
    protected void watcherData(final String dataId, final OnChange oc) {
Listener listener = new Listener() {
@Override
public void receiveConfigInfo(final String configInfo) {
oc.change(configInfo);
}

@Override
public Executor getExecutor() {
return null;
}
};
oc.change(getConfigAndSignListener(dataId, listener));
LISTENERS.computeIfAbsent(dataId, key -> new ArrayList<>()).add(listener);
}

可以看到,在构造方法中调用了start方法,并且通过watcherData方法创建了监听器,并且关联了回调函数oc,由于我们正在分析selector类型组件的变化,对应的回调函数是updateSelectorMap。这个回调函数用于处理数据。

3.2 处理数据

  • NacosCacheHandler.updateSelectorMap()

经过判空逻辑之后,缓存选择器数据的操作又交给了PluginDataSubscriber处理。

    protected void updateSelectorMap(final String configInfo) {
try {
List<SelectorData> selectorDataList = GsonUtils.getInstance().toObjectMapList(configInfo, SelectorData.class).values().stream().flatMap(Collection::stream).collect(Collectors.toList());
selectorDataList.forEach(selectorData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
subscriber.unSelectorSubscribe(selectorData);
subscriber.onSelectorSubscribe(selectorData);
}));
} catch (JsonParseException e) {
LOG.error("sync selector data have error:", e);
}
}

PluginDataSubscriber是一个接口,它只有一个CommonPluginDataSubscriber实现类,负责处理插件、选择器和规则数据。

3.3 通用插件数据订阅者

  • PluginDataSubscriber.onSelectorSubscribe()

它没有其他逻辑,直接调用subscribeDataHandler()方法。在方法中,更具数据类型(插件、选择器或规则),操作类型(更新或删除),去执行不同逻辑。

/**
* 通用插件数据订阅者,负责处理所有插件、选择器和规则信息
* The type Common plugin data subscriber.
*/
public class CommonPluginDataSubscriber implements PluginDataSubscriber {
//......
// 处理选择器数据
@Override
public void onSelectorSubscribe(final SelectorData selectorData) {
subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
}

// 订阅数据处理器,处理数据的更新或删除
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
Optional.ofNullable(classData).ifPresent(data -> {
// 插件数据
if (data instanceof PluginData) {
PluginData pluginData = (PluginData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cachePluginData(pluginData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removePluginData(pluginData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
}
} else if (data instanceof SelectorData) { // 选择器数据
SelectorData selectorData = (SelectorData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cacheSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理 Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removeSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
}
} else if (data instanceof RuleData) { // 规则数据
RuleData ruleData = (RuleData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cacheRuleData(ruleData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removeRuleData(ruleData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
}
}
});
}

}

3.4 数据缓存到内存

那么更新一条选择器数据,会进入下面的逻辑:

// 将数据保存到网关内存
BaseDataCache.getInstance().cacheSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));

一是将数据保存到网关的内存中。BaseDataCache是最终缓存数据的类,通过单例模式实现。选择器数据就存到了SELECTOR_MAP这个Map中。在后续使用的时候,也是从这里拿数据。

public final class BaseDataCache {
// 私有变量
private static final BaseDataCache INSTANCE = new BaseDataCache();
// 私有构造器
private BaseDataCache() {
}

/**
* Gets instance.
* 公开方法
* @return the instance
*/
public static BaseDataCache getInstance() {
return INSTANCE;
}

/**
* 缓存选择器数据的Map
* pluginName -> SelectorData.
*/
private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();

public void cacheSelectData(final SelectorData selectorData) {
Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);
}

/**
* cache selector data.
* 缓存选择器数据
* @param data the selector data
*/
private void selectorAccept(final SelectorData data) {
String key = data.getPluginName();
if (SELECTOR_MAP.containsKey(key)) { // 更新操作,先删除再插入
List<SelectorData> existList = SELECTOR_MAP.get(key);
final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
resultList.add(data);
final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());
SELECTOR_MAP.put(key, collect);
} else { // 新增操作,直接放到Map中
SELECTOR_MAP.put(key, Lists.newArrayList(data));
}
}

}

二是如果每个插件还有自己的处理逻辑,那么就去处理。 通过idea编辑器可以看到,当新增一条选择器后,有如下的插件还有处理。这里我们就不再展开了。

经过以上的源码追踪,并通过一个实际的案例,在admin端新增更新一条选择器数据,就将nacos数据同步的流程分析清楚了。

我们还是通过时序图将网关端的数据同步流程串联一下:

数据同步的流程已经分析完了,为了不让同步流程被打断,在分析过程中就忽略了其他逻辑。网关同步操作初始化的流程在NacosSyncDataServicestart方法中,我们在上文分析网关数据同步时分析过了,下面分析Admin的同步数据初始化。

4. Admin同步数据初始化

admin端,NacosDataInit类型的bean,在NacosListener中进行定义和生成,如果admin的配置中指定了使用nacos进行数据同步,当admin启动后,会将当前的数据信息全量同步到nacos中,实现逻辑如下:


/**
* The type Nacos data init.
*/
public class NacosDataInit implements CommandLineRunner {

private static final Logger LOG = LoggerFactory.getLogger(NacosDataInit.class);

private final ConfigService configService;

private final SyncDataService syncDataService;

/**
* Instantiates a new Nacos data init.
* @param configService the nacos config service
* @param syncDataService the sync data service
*/
public NacosDataInit(final ConfigService configService, final SyncDataService syncDataService) {
this.configService = configService;
this.syncDataService = syncDataService;
}

@Override
public void run(final String... args) {
String pluginDataId = NacosPathConstants.PLUGIN_DATA_ID;
String authDataId = NacosPathConstants.AUTH_DATA_ID;
String metaDataId = NacosPathConstants.META_DATA_ID;
if (dataIdNotExist(pluginDataId) && dataIdNotExist(authDataId) && dataIdNotExist(metaDataId)) {
syncDataService.syncAll(DataEventTypeEnum.REFRESH);
}
}

private boolean dataIdNotExist(final String pluginDataId) {
try {
String group = NacosPathConstants.GROUP;
long timeout = NacosPathConstants.DEFAULT_TIME_OUT;
return configService.getConfig(pluginDataId, group, timeout) == null;
} catch (NacosException e) {
LOG.error("Get data from nacos error.", e);
throw new ShenyuException(e.getMessage());
}
}
}

判断nacos中是否存在数据,如果不存在,则进行同步。

NacosDataInit实现了CommandLineRunner接口。它是springboot提供的接口,会在所有 Spring Beans初始化之后执行run()方法,常用于项目中初始化的操作。

  • SyncDataService.syncAll()

从数据库查询数据,然后进行全量数据同步,所有的认证信息、插件信息、选择器信息、规则信息和元数据信息。主要是通过eventPublisher发布同步事件。这里就跟前面提到的同步逻辑就又联系起来了,eventPublisher通过publishEvent()发布完事件后,有ApplicationListener执行事件变更操作,在ShenYu中就是前面提到的DataChangedEventDispatcher

@Service
public class SyncDataServiceImpl implements SyncDataService {
// 事件发布
private final ApplicationEventPublisher eventPublisher;

/***
* 全量数据同步
* @param type the type
* @return
*/
@Override
public boolean syncAll(final DataEventTypeEnum type) {
// 同步认证信息
appAuthService.syncData();
// 同步插件信息
List<PluginData> pluginDataList = pluginService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
// 同步选择器信息
List<SelectorData> selectorDataList = selectorService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
// 同步规则信息
List<RuleData> ruleDataList = ruleService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
// 同步元数据信息
metaDataService.syncData();
return true;
}

}

5. 总结

本文通过一个实际案例,对nacos的数据同步原理进行了源码分析。涉及到的主要知识点如下:

  • 基于nacos的数据同步,主要是通过watch机制实现;
  • 通过Spring完成事件发布和监听;
  • 通过抽象DataChangedListener接口,支持多种同步策略,面向接口编程;
  • 使用单例设计模式实现缓存数据类BaseDataCache
  • 通过SpringBoot的条件装配和starter加载机制实现配置类的加载。

· One min read

ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu 网关当前支持ZooKeeperWebSocketHttp长轮询NacosetcdConsul 进行数据同步。本文的主要内容是基于WebSocket的数据同步源码分析。

本文基于shenyu-2.4.0版本进行源码分析,官网的介绍请参考 数据同步原理

1. 关于WebSocket通信

WebSocket协议诞生于2008年,在2011年成为国际标准。它可以双向通信,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息。WebSocket协议建立在 TCP 协议之上,属于应用层,性能开销小,通信高效,协议标识符是ws

2. Admin数据同步

我们从一个实际案例进行源码追踪,比如在后台管理系统中,新增一条选择器数据:

2.1 接收数据

  • SelectorController.createSelector()

进入SelectorController类中的createSelector()方法,它负责数据的校验,添加或更新数据,返回结果信息。

@Validated
@RequiredArgsConstructor
@RestController
@RequestMapping("/selector")
public class SelectorController {

@PostMapping("")
public ShenyuAdminResult createSelector(@Valid @RequestBody final SelectorDTO selectorDTO) { // @Valid 数校验
// 添加或更新数据
Integer createCount = selectorService.createOrUpdate(selectorDTO);
// 返回结果信息
return ShenyuAdminResult.success(ShenyuResultMessage.CREATE_SUCCESS, createCount);
}

// ......
}

2.2 处理数据

  • SelectorServiceImpl.createOrUpdate()

SelectorServiceImpl类中通过createOrUpdate()方法完成数据的转换,保存到数据库,发布事件,更新upstream

@RequiredArgsConstructor
@Service
public class SelectorServiceImpl implements SelectorService {
// 负责事件发布的eventPublisher
private final ApplicationEventPublisher eventPublisher;

@Override
@Transactional(rollbackFor = Exception.class)
public int createOrUpdate(final SelectorDTO selectorDTO) {
int selectorCount;
// 构建数据 DTO --> DO
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
// 判断是添加还是更新
if (StringUtils.isEmpty(selectorDTO.getId())) {
// 插入选择器数据
selectorCount = selectorMapper.insertSelective(selectorDO);
// 插入选择器中的条件数据
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
// check selector add
// 权限检查
if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {
DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();
dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());
dataPermissionDTO.setDataId(selectorDO.getId());
dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);
dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));
}

} else {
// 更新数据,先删除再新增
selectorCount = selectorMapper.updateSelective(selectorDO);
//delete rule condition then add
selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
selectorConditionMapper.insertSelective(selectorConditionDO);
});
}
// 发布事件
publishEvent(selectorDO, selectorConditionDTOs);

// 更新upstream
updateDivideUpstream(selectorDO);
return selectorCount;
}


// ......

}

Service类完成数据的持久化操作,即保存数据到数据库,这个大家应该很熟悉了,就不展开。关于更新upstream操作,放到后面对应的章节中进行分析,重点关注发布事件的操作,它会进行数据同步。

publishEvent()方法的逻辑是:找到选择器对应的插件,构建条件数据,发布变更数据。

       private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
// 找到选择器对应的插件
PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
// 构建条件数据
List<ConditionData> conditionDataList = selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
// 发布变更数据
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
}

发布变更数据通过eventPublisher.publishEvent()完成,这个eventPublisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。

关于ApplicationEventPublisher

当有状态发生变化时,发布者调用 ApplicationEventPublisherpublishEvent 方法发布一个事件,Spring容器广播事件给所有观察者,调用观察者的 onApplicationEvent 方法把事件对象传递给观察者。调用 publishEvent方法有两种途径,一种是实现接口由容器注入 ApplicationEventPublisher 对象然后调用其方法,另一种是直接调用容器的方法,两种方法发布事件没有太大区别。

  • ApplicationEventPublisher:发布事件;
  • ApplicationEventSpring 事件,记录事件源、时间和数据;
  • ApplicationListener:事件监听者,观察者。

Spring的事件发布机制中,有三个对象,

一个是发布事件的ApplicationEventPublisher,在ShenYu中通过构造器注入了一个eventPublisher

另一个对象是ApplicationEvent,在ShenYu中通过DataChangedEvent继承了它,表示事件对象。

public class DataChangedEvent extends ApplicationEvent {
//......
}

最后一个是 ApplicationListener,在ShenYu中通过DataChangedEventDispatcher类实现了该接口,作为事件的监听者,负责处理事件对象。

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

//......

}

2.3 分发数据

  • DataChangedEventDispatcher.onApplicationEvent()

当事件发布完成后,会自动进入到DataChangedEventDispatcher类中的onApplicationEvent()方法,进行事件处理。

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

/**
* 有数据变更时,调用此方法
* @param event
*/
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)
for (DataChangedListener listener : listeners) {
// 哪种数据发生变更
switch (event.getGroupKey()) {
case APP_AUTH: // 认证信息
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN: // 插件信息
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE: // 规则信息
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR: // 选择器信息
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA: // 元数据
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default: // 其他类型,抛出异常
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}

}

当有数据变更时,调用onApplicationEvent方法,然后遍历所有数据变更监听器,判断是哪种数据类型,交给相应的数据监听器进行处理。

ShenYu将所有数据进行了分组,一共是五种:认证信息、插件信息、规则信息、选择器信息和元数据。

这里的数据变更监听器(DataChangedListener),就是数据同步策略的抽象,它的具体实现有:

这几个实现类就是当前ShenYu支持的同步策略:

  • WebsocketDataChangedListener:基于websocket的数据同步;
  • ZookeeperDataChangedListener:基于zookeeper的数据同步;
  • ConsulDataChangedListener:基于consul的数据同步;
  • EtcdDataDataChangedListener:基于etcd的数据同步;
  • HttpLongPollingDataChangedListener:基于http长轮询的数据同步;
  • NacosDataChangedListener:基于nacos的数据同步;

既然有这么多种实现策略,那么如何确定使用哪一种呢?

因为本文是基于websocket的数据同步源码分析,所以这里以WebsocketDataChangedListener为例,分析它是如何被加载并实现的。

通过在源码工程中进行全局搜索,可以看到,它的实现是在DataSyncConfiguration类完成的。

/**
* 数据同步配置类
* 通过springboot条件装配实现
* The type Data sync configuration.
*/
@Configuration
public class DataSyncConfiguration {

/**
* websocket数据同步(默认策略)
* The WebsocketListener(default strategy).
*/
@Configuration
@ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(WebsocketSyncProperties.class)
static class WebsocketListener {

/**
* Config event listener data changed listener.
* 配置websocket数据变更监听器
* @return the data changed listener
*/
@Bean
@ConditionalOnMissingBean(WebsocketDataChangedListener.class)
public DataChangedListener websocketDataChangedListener() {
return new WebsocketDataChangedListener();
}

/**
* Websocket collector.
* Websocket处理类:建立连接,发送消息,关闭连接等操作
* @return the websocket collector
*/
@Bean
@ConditionalOnMissingBean(WebsocketCollector.class)
public WebsocketCollector websocketCollector() {
return new WebsocketCollector();
}

/**
* Server endpoint exporter
*
* @return the server endpoint exporter
*/
@Bean
@ConditionalOnMissingBean(ServerEndpointExporter.class)
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

//......
}

这个配置类是通过SpringBoot条件装配类实现的。在WebsocketListener类上面有几个注解:

  • @Configuration:配置文件,应用上下文;

  • @ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true):属性条件判断,满足条件,该配置类才会生效。也就是说,当我们有如下配置时,就会采用websocket进行数据同步。不过,这里需要注意下matchIfMissing = true这个属性,它表示,如果你没有如下的配置,该配置类也会生效。基于websocket的数据同步时官方推荐的方式,也是默认采用的方式。

    shenyu:  
    sync:
    websocket:
    enabled: true
  • @EnableConfigurationProperties:启用配置属性;

当我们主动配置,采用websocket进行数据同步时,WebsocketDataChangedListener就会生成。所以在事件处理方法onApplicationEvent()中,就会到相应的listener中。在我们的案例中,是新增加了一条选择器数据,数据通过采用的是websocket,所以,代码会进入到WebsocketDataChangedListener进行选择器数据变更处理。

    @Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)
for (DataChangedListener listener : listeners) {
// 哪种数据发生变更
switch (event.getGroupKey()) {

// 省略了其他逻辑

case SELECTOR: // 选择器信息
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); // WebsocketDataChangedListener进行选择器数据变更处理
break;
}
}

2.4 Websocket数据变更监听器

  • WebsocketDataChangedListener.onSelectorChanged()

    onSelectorChanged()方法中,将数据进行了封装,转成WebsocketData,然后通过WebsocketCollector.send()发送数据。

    // 选择器数据有更新
@Override
public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
// 构造 WebsocketData 数据
WebsocketData<SelectorData> websocketData =
new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
// 通过websocket发送数据
WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
}

2.5 Websocket发送数据

  • WebsocketCollector.send()

send()方法中,判断了一下同步的类型,根据不同的类型,进行处理。

@Slf4j
@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)
public class WebsocketCollector {

/**
* Send.
*
* @param message the message
* @param type the type
*/
public static void send(final String message, final DataEventTypeEnum type) {
if (StringUtils.isNotBlank(message)) {
// 如果是MYSELF(第一次的全量同步)
if (DataEventTypeEnum.MYSELF == type) {
// 从threadlocal中获取session
Session session = (Session) ThreadLocalUtil.get(SESSION_KEY);
if (session != null) {
// 向该session发送全量数据
sendMessageBySession(session, message);
}
} else {
// 后续的增量同步
// 向所有的session中同步变更数据
SESSION_SET.forEach(session -> sendMessageBySession(session, message));
}
}
}

private static void sendMessageBySession(final Session session, final String message) {
try {
// 通过websocket的session把消息发送出去
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
}
}

我们给的案例是一个新增操作 ,是一个增量同步,所以会走

SESSION_SET.forEach(session -> sendMessageBySession(session, message));

这个逻辑。

再通过

session.getBasicRemote().sendText(message);

将数据发送了出去。

至此,当admin端发生数据变更时,就将变更的数据以增量形式通过WebSocket发给了网关。

分析到这里,不知道大家有没有疑问呢?比如session是怎么来的?网关如何和admin建立连接的?

不要着急,我们接下来就进行网关端的同步分析。

不过,在继续源码分析前,我们用一张图将上面的分析过程串联起来。

3. 网关数据同步

假设ShenYu网关已经在正常运行了,使用的数据同步方式也是websocket。那么当在admin端新增一条选择器数据后,并且通过WebSocket发送到网关,那网关是如何接收并处理数据的呢?接下来我们就继续进行源码分析,一探究竟。

3.1 WebsocketClient接收数据

  • ShenyuWebsocketClient.onMessage()

在网关端有一个ShenyuWebsocketClient类,它继承了WebSocketClient,可以和WebSocket建立连接并通信。

public final class ShenyuWebsocketClient extends WebSocketClient {
// ......
}

当在admin端通过websocket发送数据后,ShenyuWebsocketClient就可以通过onMessage()接收到数据,然后就可以自己进行处理。

public final class ShenyuWebsocketClient extends WebSocketClient {
// 接受到消息后执行
@Override
public void onMessage(final String result) {
// 处理接收到的数据
handleResult(result);
}

private void handleResult(final String result) {
// 数据反序列化
WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
// 哪种数据类型,插件、选择器、规则...
ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
// 哪种操作类型,更新、删除...
String eventType = websocketData.getEventType();
String json = GsonUtils.getInstance().toJson(websocketData.getData());

// 处理数据
websocketDataHandler.executor(groupEnum, json, eventType);
}
}

接收到数据后,首先进行了反序列化操作,读取数据类型和操作类型,紧接着,就交给websocketDataHandler.executor()进行处理。

3.2 执行Websocket事件处理器

  • WebsocketDataHandler.executor()

通过工厂模式创建了Websocket数据处理器,每种数据类型,都提供了一个处理器:

插件 --> 插件数据处理器;

选择器 --> 选择器数据处理器;

规则 --> 规则数据处理器;

认证信息 --> 认证数据处理器;

元数据 --> 元数据处理器。


/**
* 通过工厂模式创建 Websocket数据处理器
* The type Websocket cache handler.
*/
public class WebsocketDataHandler {

private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);

/**
* Instantiates a new Websocket data handler.
* 每种数据类型,提供一个处理器
* @param pluginDataSubscriber the plugin data subscriber
* @param metaDataSubscribers the meta data subscribers
* @param authDataSubscribers the auth data subscribers
*/
public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
// 插件 --> 插件数据处理器
ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
// 选择器 --> 选择器数据处理器
ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
// 规则 --> 规则数据处理器
ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
// 认证信息 --> 认证数据处理器
ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
// 元数据 --> 元数据处理器
ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
}

/**
* Executor.
*
* @param type the type
* @param json the json
* @param eventType the event type
*/
public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
// 根据数据类型,找到对应的数据处理器
ENUM_MAP.get(type).handle(json, eventType);
}
}

不同的数据类型,有不同的数据处理方式,所以有不同的实现类。但是它们之间也有相同的处理逻辑,所以可以通过模板方法设计模式来实现。相同的逻辑放在抽象类中的handle()方法中,不同逻辑就交给各自的实现类。

我们的案例是新增了一条选择器数据,所以会交给SelectorDataHandler( 选择器 --> 选择器数据处理器)进行数据处理。

3.3 判断事件类型

  • AbstractDataHandler.handle()

实现数据变更的通用逻辑处理:根据不同的操作类型调用不同方法。


public abstract class AbstractDataHandler<T> implements DataHandler {

/**
* Convert list.
* 不同的逻辑由各自实现类去实现
* @param json the json
* @return the list
*/
protected abstract List<T> convert(String json);

/**
* Do refresh.
* 不同的逻辑由各自实现类去实现
* @param dataList the data list
*/
protected abstract void doRefresh(List<T> dataList);

/**
* Do update.
* 不同的逻辑由各自实现类去实现
* @param dataList the data list
*/
protected abstract void doUpdate(List<T> dataList);

/**
* Do delete.
* 不同的逻辑由各自实现类去实现
* @param dataList the data list
*/
protected abstract void doDelete(List<T> dataList);

// 通用逻辑,抽象类实现
@Override
public void handle(final String json, final String eventType) {
List<T> dataList = convert(json);
if (CollectionUtils.isNotEmpty(dataList)) {
DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
switch (eventTypeEnum) {
case REFRESH:
case MYSELF:
doRefresh(dataList); //刷新数据,全量同步
break;
case UPDATE:
case CREATE:
doUpdate(dataList); // 更新或创建数据,增量同步
break;
case DELETE:
doDelete(dataList); // 删除数据
break;
default:
break;
}
}
}
}

新增一条选择器数据,是新增操作,通过switch-case进入到doUpdate()方法中。

3.4 进入具体的数据处理器

  • SelectorDataHandler.doUpdate()

/**
* 选择器数据处理器
* The type Selector data handler.
*/
@RequiredArgsConstructor
public class SelectorDataHandler extends AbstractDataHandler<SelectorData> {

private final PluginDataSubscriber pluginDataSubscriber;

//......

// 更新操作
@Override
protected void doUpdate(final List<SelectorData> dataList) {
dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);
}
}

遍历数据,进入onSelectorSubscribe()方法。

  • PluginDataSubscriber.onSelectorSubscribe()

它没有其他逻辑,直接调用subscribeDataHandler()方法。在方法中,更具数据类型(插件、选择器或规则),操作类型(更新或删除),去执行不同逻辑。

/**
* 通用插件数据订阅者,负责处理所有插件、选择器和规则信息
* The type Common plugin data subscriber.
*/
public class CommonPluginDataSubscriber implements PluginDataSubscriber {
//......
// 处理选择器数据
@Override
public void onSelectorSubscribe(final SelectorData selectorData) {
subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
}

// 订阅数据处理器,处理数据的更新或删除
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
Optional.ofNullable(classData).ifPresent(data -> {
// 插件数据
if (data instanceof PluginData) {
PluginData pluginData = (PluginData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cachePluginData(pluginData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removePluginData(pluginData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
}
} else if (data instanceof SelectorData) { // 选择器数据
SelectorData selectorData = (SelectorData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cacheSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removeSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
}
} else if (data instanceof RuleData) { // 规则数据
RuleData ruleData = (RuleData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cacheRuleData(ruleData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removeRuleData(ruleData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
}
}
});
}

}

那么新增一条选择器数据,会进入下面的逻辑:

// 将数据保存到网关内存
BaseDataCache.getInstance().cacheSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));

一是将数据保存到网关的内存中。BaseDataCache是最终缓存数据的类,通过单例模式实现。选择器数据就存到了SELECTOR_MAP这个Map中。在后续使用的时候,也是从这里拿数据。

public final class BaseDataCache {
// 私有变量
private static final BaseDataCache INSTANCE = new BaseDataCache();
// 私有构造器
private BaseDataCache() {
}

/**
* Gets instance.
* 公开方法
* @return the instance
*/
public static BaseDataCache getInstance() {
return INSTANCE;
}

/**
* 缓存选择器数据的Map
* pluginName -> SelectorData.
*/
private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();

public void cacheSelectData(final SelectorData selectorData) {
Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);
}

/**
* cache selector data.
* 缓存选择器数据
* @param data the selector data
*/
private void selectorAccept(final SelectorData data) {
String key = data.getPluginName();
if (SELECTOR_MAP.containsKey(key)) { // 更新操作,先删除再插入
List<SelectorData> existList = SELECTOR_MAP.get(key);
final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
resultList.add(data);
final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());
SELECTOR_MAP.put(key, collect);
} else { // 新增操作,直接放到Map中
SELECTOR_MAP.put(key, Lists.newArrayList(data));
}
}

}

二是如果每个插件还有自己的处理逻辑,那么就去处理。 通过idea编辑器可以看到,当新增一条选择器后,有如下的插件还有处理。这里我们就不再展开了。

经过以上的源码追踪,并通过一个实际的案例,在admin端新增一条选择器数据,就将websocket数据同步的流程分析清楚了。

我们还是用下面的一张图将网关端的数据同步流程串联一下:

数据同步的流程已经分析完了,但是还有一些问题没有分析到,就是网关是如何跟admin建立连接的?

4. 网关和admin建立websocket连接

  • websocket配置

在网关的配置文件中有如下配置,并且引入了相关依赖,就会启动websocket相关服务。

shenyu:
file:
enabled: true
cross:
enabled: true
dubbo :
parameter: multi
sync:
websocket : # 使用websocket进行数据同步
urls: ws://localhost:9095/websocket # admin端的websocket地址
allowOrigin: ws://localhost:9195

在网关中引入websocket的依赖。

<!--shenyu data sync start use websocket-->
<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-spring-boot-starter-sync-data-websocket</artifactId>
<version>${project.version}</version>
</dependency>
  • Websocket数据同步配置

通过springboot的条件装配,创建相关的bean。在网关启动的时候,如果我们配置了shenyu.sync.websocket.urls,那么Websocket数据同步配置就会被加载。这里通过spring boot starter完成依赖的加载。


/**
* Websocket数据同步配置
* 通过springboot实现条件注入
* Websocket sync data configuration for spring boot.
*/
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {

/**
* Websocket sync data service.
* Websocket数据同步服务
* @param websocketConfig the websocket config
* @param pluginSubscriber the plugin subscriber
* @param metaSubscribers the meta subscribers
* @param authSubscribers the auth subscribers
* @return the sync data service
*/
// 创建websocketSyncDataService
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use websocket sync shenyu data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}

/**
* Config websocket config.
*
* @return the websocket config
*/
@Bean
@ConfigurationProperties(prefix = "shenyu.sync.websocket")
public WebsocketConfig websocketConfig() {
return new WebsocketConfig(); // 创建WebsocketConfig
}
}

在项目的resources/META-INF目录先新建spring.factories文件,在文件中指明配置类。

  • Websocket数据同步服务

WebsocketSyncDataService中做了如下几件事情:

  • 读取配置中的urls,这个表示admin端的同步地址,有多个的话,使用","分割;
  • 创建调度线程池,一个admin分配一个,用于执行定时任务;
  • 创建ShenyuWebsocketClient,一个admin分配一个,用于和admin建立websocket通信;
  • 开始和admin端的websocket 建立连接;
  • 执行定时任务,每隔10秒执行一次。主要作用是判断websocket连接是否已经断开,如果已经断开,则尝试重连。如果没有断开,就进行 ping-pong 检测。

/**
* Websocket数据同步服务
* Websocket sync data service.
*/
@Slf4j
public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {

private final List<WebSocketClient> clients = new ArrayList<>();

private final ScheduledThreadPoolExecutor executor;

/**
* Instantiates a new Websocket sync cache.
* 创建Websocket数据同步服务
* @param websocketConfig the websocket config
* @param pluginDataSubscriber the plugin data subscriber
* @param metaDataSubscribers the meta data subscribers
* @param authDataSubscribers the auth data subscribers
*/
public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
// admin端的同步地址,有多个的话,使用","分割
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
// 创建调度线程池,一个admin分配一个
executor = new ScheduledThreadPoolExecutor(urls.length, ShenyuThreadFactory.create("websocket-connect", true));
for (String url : urls) {
try {
//创建WebsocketClient,一个admin分配一个
clients.add(new ShenyuWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
} catch (URISyntaxException e) {
log.error("websocket url({}) is error", url, e);
}
}
try {
for (WebSocketClient client : clients) {
// 和websocket server建立连接
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
if (success) {
log.info("websocket connection is successful.....");
} else {
log.error("websocket connection is error.....");
}

// 执行定时任务,每隔10秒执行一次
// 主要作用是判断websocket连接是否已经断开,如果已经断开,则尝试重连。
// 如果没有断开,就进行 ping-pong 检测
executor.scheduleAtFixedRate(() -> {
try {
if (client.isClosed()) {
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
log.info("websocket reconnect server[{}] is successful.....", client.getURI().toString());
} else {
log.error("websocket reconnection server[{}] is error.....", client.getURI().toString());
}
} else {
client.sendPing();
log.debug("websocket send to [{}] ping message successful", client.getURI().toString());
}
} catch (InterruptedException e) {
log.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 10, TimeUnit.SECONDS);
}
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
} catch (InterruptedException e) {
log.info("websocket connection...exception....", e);
}

}

@Override
public void close() {
// 关闭 websocket client
for (WebSocketClient client : clients) {
if (!client.isClosed()) {
client.close();
}
}
// 关闭线程池
if (Objects.nonNull(executor)) {
executor.shutdown();
}
}
}
  • ShenyuWebsocketClient

ShenYu中创建的WebSocket客户端,用于和admin端通信。第一次成功建立连接后,同步全量数据,后续进行增量同步。


/**
* 在ShenYu中自定义的WebSocket客户端
* The type shenyu websocket client.
*/
@Slf4j
public final class ShenyuWebsocketClient extends WebSocketClient {

private volatile boolean alreadySync = Boolean.FALSE;

private final WebsocketDataHandler websocketDataHandler;

/**
* Instantiates a new shenyu websocket client.
* 创建ShenyuWebsocketClient
* @param serverUri the server uri 服务端uri
* @param pluginDataSubscriber the plugin data subscriber 插件数据订阅器
* @param metaDataSubscribers the meta data subscribers 元数据订阅器
* @param authDataSubscribers the auth data subscribers 认证数据订阅器
*/
public ShenyuWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
super(serverUri);
this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
}

// 成功建立连接后执行
@Override
public void onOpen(final ServerHandshake serverHandshake) {
// 防止重新建立连接时,再次执行,所以用alreadySync进行判断
if (!alreadySync) {
// 同步所有数据,MYSELF 类型
send(DataEventTypeEnum.MYSELF.name());
alreadySync = true;
}
}

// 接收到消息后执行
@Override
public void onMessage(final String result) {
// 处理接收到的数据
handleResult(result);
}

// 关闭后执行
@Override
public void onClose(final int i, final String s, final boolean b) {
this.close();
}

// 失败后执行
@Override
public void onError(final Exception e) {
this.close();
}

@SuppressWarnings("ALL")
private void handleResult(final String result) {
// 数据反序列化
WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
// 哪种数据类型,插件、选择器、规则...
ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
// 哪种操作类型,更新、删除...
String eventType = websocketData.getEventType();
String json = GsonUtils.getInstance().toJson(websocketData.getData());

// 处理数据
websocketDataHandler.executor(groupEnum, json, eventType);
}
}

5. 总结

本文通过一个实际案例,对websocket的数据同步原理进行了源码分析。涉及到的主要知识点如下:

  • websocket支持双向通信,性能好,推荐使用;
  • 通过Spring完成事件发布和监听;
  • 通过抽象DataChangedListener接口,支持多种同步策略,面向接口编程;
  • 使用工厂模式创建 WebsocketDataHandler,实现不同数据类型的处理;
  • 使用模板方法设计模式实现AbstractDataHandler,处理通用的操作类型;
  • 使用单例设计模式实现缓存数据类BaseDataCache
  • 通过SpringBoot的条件装配和starter加载机制实现配置类的加载。

· One min read

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu 网关当前支持ZooKeeperWebSocketHttp长轮询NacosEtcdConsul 进行数据同步。本文的主要内容是基于ZooKeeper的数据同步源码分析。

本文基于shenyu-2.4.0版本进行源码分析,官网的介绍请参考 数据同步原理

1. 关于ZooKeeper

Apache ZooKeeperApache软件基金会的一个软件项目,它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。ZooKeeper节点将它们的数据存储于一个分层的名字空间,非常类似于一个文件系统或一个前缀树结构。客户端可以在节点读写,从而以这种方式拥有一个共享的配置服务。

2. Admin数据同步

我们从一个实际案例进行源码追踪,比如在后台管理系统中,对Divide插件中的一条选择器数据进行更新,将权重更新为90:

2.1 接收数据

  • SelectorController.createSelector()

进入SelectorController类中的updateSelector()方法,它负责数据的校验,添加或更新数据,返回结果信息。

@Validated
@RequiredArgsConstructor
@RestController
@RequestMapping("/selector")
public class SelectorController {

@PutMapping("/{id}")
public ShenyuAdminResult updateSelector(@PathVariable("id") final String id, @Valid @RequestBody final SelectorDTO selectorDTO) {
// 设置当前选择器数据id
selectorDTO.setId(id);
// 创建或更新操作
Integer updateCount = selectorService.createOrUpdate(selectorDTO);
// 返回结果信息
return ShenyuAdminResult.success(ShenyuResultMessage.UPDATE_SUCCESS, updateCount);
}

// ......
}

2.2 处理数据

  • SelectorServiceImpl.createOrUpdate()

SelectorServiceImpl类中通过createOrUpdate()方法完成数据的转换,保存到数据库,发布事件,更新upstream

@RequiredArgsConstructor
@Service
public class SelectorServiceImpl implements SelectorService {
// 负责事件发布的eventPublisher
private final ApplicationEventPublisher eventPublisher;

@Override
@Transactional(rollbackFor = Exception.class)
public int createOrUpdate(final SelectorDTO selectorDTO) {
int selectorCount;
// 构建数据 DTO --> DO
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
// 判断是添加还是更新
if (StringUtils.isEmpty(selectorDTO.getId())) {
// 插入选择器数据
selectorCount = selectorMapper.insertSelective(selectorDO);
// 插入选择器中的条件数据
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
// check selector add
// 权限检查
if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {
DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();
dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());
dataPermissionDTO.setDataId(selectorDO.getId());
dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);
dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));
}

} else {
// 更新数据,先删除再新增
selectorCount = selectorMapper.updateSelective(selectorDO);
//delete rule condition then add
selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
selectorConditionMapper.insertSelective(selectorConditionDO);
});
}
// 发布事件
publishEvent(selectorDO, selectorConditionDTOs);

// 更新upstream
updateDivideUpstream(selectorDO);
return selectorCount;
}


// ......

}

Serrvice类完成数据的持久化操作,即保存数据到数据库,这个比较简单,就不深入追踪了。关于更新upstream操作,放到后面对应的章节中进行分析,重点关注发布事件的操作,它会执行数据同步。

publishEvent()方法的逻辑是:找到选择器对应的插件,构建条件数据,发布变更数据。

       private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
// 找到选择器对应的插件
PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
// 构建条件数据
List<ConditionData> conditionDataList = selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
// 发布变更数据
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
}

发布变更数据通过eventPublisher.publishEvent()完成,这个eventPublisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。

关于ApplicationEventPublisher

当有状态发生变化时,发布者调用 ApplicationEventPublisherpublishEvent 方法发布一个事件,Spring容器广播事件给所有观察者,调用观察者的 onApplicationEvent 方法把事件对象传递给观察者。调用 publishEvent方法有两种途径,一种是实现接口由容器注入 ApplicationEventPublisher 对象然后调用其方法,另一种是直接调用容器的方法,两种方法发布事件没有太大区别。

  • ApplicationEventPublisher:发布事件;
  • ApplicationEventSpring 事件,记录事件源、时间和数据;
  • ApplicationListener:事件监听者,观察者;

Spring的事件发布机制中,有三个对象,

一个是发布事件的ApplicationEventPublisher,在ShenYu中通过构造器注入了一个eventPublisher

另一个对象是ApplicationEvent,在ShenYu中通过DataChangedEvent继承了它,表示事件对象。

public class DataChangedEvent extends ApplicationEvent {
//......
}

最后一个是 ApplicationListener,在ShenYu中通过DataChangedEventDispatcher类实现了该接口,作为事件的监听者,负责处理事件对象。

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

//......

}

2.3 分发数据

  • DataChangedEventDispatcher.onApplicationEvent()

当事件发布完成后,会自动进入到DataChangedEventDispatcher类中的onApplicationEvent()方法,进行事件处理。

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

/**
* 有数据变更时,调用此方法
* @param event
*/
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)
for (DataChangedListener listener : listeners) {
// 哪种数据发生变更
switch (event.getGroupKey()) {
case APP_AUTH: // 认证信息
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN: // 插件信息
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE: // 规则信息
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR: // 选择器信息
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA: // 元数据
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default: // 其他类型,抛出异常
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}

}

当有数据变更时,调用onApplicationEvent方法,然后遍历所有数据变更监听器,判断是哪种数据类型,交给相应的数据监听器进行处理。

ShenYu将所有数据进行了分组,一共是五种:认证信息、插件信息、规则信息、选择器信息和元数据。

这里的数据变更监听器(DataChangedListener),就是数据同步策略的抽象,它的具体实现有:

这几个实现类就是当前ShenYu支持的同步策略:

  • WebsocketDataChangedListener:基于websocket的数据同步;
  • ZookeeperDataChangedListener:基于zookeeper的数据同步;
  • ConsulDataChangedListener:基于consul的数据同步;
  • EtcdDataDataChangedListener:基于etcd的数据同步;
  • HttpLongPollingDataChangedListener:基于http长轮询的数据同步;
  • NacosDataChangedListener:基于nacos的数据同步;

既然有这么多种实现策略,那么如何确定使用哪一种呢?

因为本文是基于Zookeeper的数据同步源码分析,所以这里以ZookeeperDataChangedListener为例,分析它是如何被加载并实现的。

通过在源码工程中进行全局搜索,可以看到,它的实现是在DataSyncConfiguration类完成的。

/**
* 数据同步配置类
* 通过springboot条件装配实现
* The type Data sync configuration.
*/
@Configuration
public class DataSyncConfiguration {


/**
* zookeeper数据同步
* The type Zookeeper listener.
*/
@Configuration
@ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url") // 条件属性,满足才会被加载
@Import(ZookeeperConfiguration.class)
static class ZookeeperListener {

/**
* Config event listener data changed listener.
* 创建Zookeeper数据变更监听器
* @param zkClient the zk client
* @return the data changed listener
*/
@Bean
@ConditionalOnMissingBean(ZookeeperDataChangedListener.class)
public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {
return new ZookeeperDataChangedListener(zkClient);
}

/**
* Zookeeper data init zookeeper data init.
* 创建 Zookeeper 数据初始化类
* @param zkClient the zk client
* @param syncDataService the sync data service
* @return the zookeeper data init
*/
@Bean
@ConditionalOnMissingBean(ZookeeperDataInit.class)
public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
return new ZookeeperDataInit(zkClient, syncDataService);
}
}

//省略了其他代码......
}

这个配置类是通过SpringBoot条件装配类实现的。在ZookeeperListener类上面有几个注解:

  • @Configuration:配置文件,应用上下文;

  • @ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url"):属性条件判断,满足条件,该配置类才会生效。也就是说,当我们有如下配置时,就会采用zookeeper进行数据同步。

    shenyu:  
    sync:
    zookeeper:
    url: localhost:2181
    sessionTimeout: 5000
    connectionTimeout: 2000
  • @Import(ZookeeperConfiguration.class):导入另一个类ZookeeperConfiguration

  @EnableConfigurationProperties(ZookeeperProperties.class)  // 启用zk属性配置类
public class ZookeeperConfiguration {

/**
* register zkClient in spring ioc.
* 向 Spring IOC 容器注册 zkClient
* @param zookeeperProp the zookeeper configuration
* @return ZkClient {@linkplain ZkClient}
*/
@Bean
@ConditionalOnMissingBean(ZkClient.class)
public ZkClient zkClient(final ZookeeperProperties zookeeperProp) {
return new ZkClient(zookeeperProp.getUrl(), zookeeperProp.getSessionTimeout(), zookeeperProp.getConnectionTimeout()); // 读取zk配置信息,并创建zkClient
}
}
@Data
@ConfigurationProperties(prefix = "shenyu.sync.zookeeper") // zk属性配置
public class ZookeeperProperties {

private String url;

private Integer sessionTimeout;

private Integer connectionTimeout;

private String serializer;
}

当我们主动配置,采用zookeeper进行数据同步时,zookeeperDataChangedListener就会生成。所以在事件处理方法onApplicationEvent()中,就会到相应的listener中。在我们的案例中,是对一条选择器数据进行更新,数据同步采用的是zookeeper,所以,代码会进入到ZookeeperDataChangedListener进行选择器数据变更处理。

    @Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)
for (DataChangedListener listener : listeners) {
// 哪种数据发生变更
switch (event.getGroupKey()) {

// 省略了其他逻辑

case SELECTOR: // 选择器信息
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); // 在我们的案例中,会进入到ZookeeperDataChangedListener进行选择器数据变更处理
break;
}
}

2.4 Zookeeper数据变更监听器

  • ZookeeperDataChangedListener.onSelectorChanged()

    onSelectorChanged()方法中,判断操作类型,是刷新同步还是更新或创建同步。根据当前选择器数据信息判断节点是否在zk中。


/**
* 使用 zookeeper 发布变更数据
*/
public class ZookeeperDataChangedListener implements DataChangedListener {

// 选择器信息发生改变
@Override
public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
// 刷新操作
if (eventType == DataEventTypeEnum.REFRESH && !changed.isEmpty()) {
String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(changed.get(0).getPluginName());
deleteZkPathRecursive(selectorParentPath);
}
// 发生变更的数据
for (SelectorData data : changed) {
// 构建选择器数据的真实路径
String selectorRealPath = DefaultPathConstants.buildSelectorRealPath(data.getPluginName(), data.getId());
// 如果是删除操作
if (eventType == DataEventTypeEnum.DELETE) {
// 删除当前数据
deleteZkPath(selectorRealPath);
continue;
}
// 父节点路径
String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(data.getPluginName());
// 创建父节点
createZkNode(selectorParentPath);
// 插入或更新数据
insertZkNode(selectorRealPath, data);
}
}

// 创建 zk 节点
private void createZkNode(final String path) {
// 不存在才创建
if (!zkClient.exists(path)) {
zkClient.createPersistent(path, true);
}
}

// 插入zk节点
private void insertZkNode(final String path, final Object data) {
// 创建节点
createZkNode(path);
// 通过 zkClient 写入数据
zkClient.writeData(path, null == data ? "" : GsonUtils.getInstance().toJson(data));
}

}

只要将变动的数据正确写入到zk的节点上,admin这边的操作就执行完成了。ShenYu在使用zk进行数据同步时,zk的节点是通过精心设计的。

在我们当前的案例中,对Divide插件中的一条选择器数据进行更新,将权重更新为90,就会对图中的特定节点更新。

我们用时序图将上面的更新流程串联起来。

3. 网关数据同步

假设ShenYu网关已经在正常运行,使用的数据同步方式也是zookeeper。那么当在admin端更新选择器数据后,并且向zk发送了变更的数据,那网关是如何接收并处理数据的呢?接下来我们就继续进行源码分析,一探究竟。

3.1 ZkClient接收数据

  • ZkClient.subscribeDataChanges()

在网关端有一个ZookeeperSyncDataService类,它通过ZkClient订阅了数据节点,当数据发生变更时,可以感知到。

/**
* 使用 zookeeper 缓存数据
*/
public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {

private void subscribeSelectorDataChanges(final String path) {
// zkClient订阅数据节点
zkClient.subscribeDataChanges(path, new IZkDataListener() {
@Override
public void handleDataChange(final String dataPath, final Object data) {
cacheSelectorData(GsonUtils.getInstance().fromJson(data.toString(), SelectorData.class)); // 节点数据被更新
}

@Override
public void handleDataDeleted(final String dataPath) {
unCacheSelectorData(dataPath); // 节点数据被删除
}
});
}

// 省略了其他逻辑
}

ZooKeeperWatch机制,会给订阅的客户端发送节点变更通知。在我们的案例中,更新选择器信息,就会进入到handleDataChange()方法。通过cacheSelectorData()去处理数据。

3.2 处理数据

  • ZookeeperSyncDataService.cacheSelectorData()

经过判空逻辑之后,缓存选择器数据的操作又交给了PluginDataSubscriber处理。

    private void cacheSelectorData(final SelectorData selectorData) {
Optional.ofNullable(selectorData)
.ifPresent(data -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSelectorSubscribe(data)));
}

PluginDataSubscriber是一个接口,它只有一个CommonPluginDataSubscriber实现类,负责处理插件、选择器和规则数据。

3.3 通用插件数据订阅者

  • PluginDataSubscriber.onSelectorSubscribe()

它没有其他逻辑,直接调用subscribeDataHandler()方法。在方法中,更具数据类型(插件、选择器或规则),操作类型(更新或删除),去执行不同逻辑。

/**
* 通用插件数据订阅者,负责处理所有插件、选择器和规则信息
* The type Common plugin data subscriber.
*/
public class CommonPluginDataSubscriber implements PluginDataSubscriber {
//......
// 处理选择器数据
@Override
public void onSelectorSubscribe(final SelectorData selectorData) {
subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
}

// 订阅数据处理器,处理数据的更新或删除
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
Optional.ofNullable(classData).ifPresent(data -> {
// 插件数据
if (data instanceof PluginData) {
PluginData pluginData = (PluginData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cachePluginData(pluginData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removePluginData(pluginData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
}
} else if (data instanceof SelectorData) { // 选择器数据
SelectorData selectorData = (SelectorData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cacheSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理 Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removeSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
}
} else if (data instanceof RuleData) { // 规则数据
RuleData ruleData = (RuleData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cacheRuleData(ruleData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removeRuleData(ruleData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
}
}
});
}

}

3.4 数据缓存到内存

那么更新一条选择器数据,会进入下面的逻辑:

// 将数据保存到网关内存
BaseDataCache.getInstance().cacheSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));

一是将数据保存到网关的内存中。BaseDataCache是最终缓存数据的类,通过单例模式实现。选择器数据就存到了SELECTOR_MAP这个Map中。在后续使用的时候,也是从这里拿数据。

public final class BaseDataCache {
// 私有变量
private static final BaseDataCache INSTANCE = new BaseDataCache();
// 私有构造器
private BaseDataCache() {
}

/**
* Gets instance.
* 公开方法
* @return the instance
*/
public static BaseDataCache getInstance() {
return INSTANCE;
}

/**
* 缓存选择器数据的Map
* pluginName -> SelectorData.
*/
private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();

public void cacheSelectData(final SelectorData selectorData) {
Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);
}

/**
* cache selector data.
* 缓存选择器数据
* @param data the selector data
*/
private void selectorAccept(final SelectorData data) {
String key = data.getPluginName();
if (SELECTOR_MAP.containsKey(key)) { // 更新操作,先删除再插入
List<SelectorData> existList = SELECTOR_MAP.get(key);
final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
resultList.add(data);
final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());
SELECTOR_MAP.put(key, collect);
} else { // 新增操作,直接放到Map中
SELECTOR_MAP.put(key, Lists.newArrayList(data));
}
}

}

二是如果每个插件还有自己的处理逻辑,那么就去处理。 通过idea编辑器可以看到,当新增一条选择器后,有如下的插件还有处理。这里我们就不再展开了。

经过以上的源码追踪,并通过一个实际的案例,在admin端新增更新一条选择器数据,就将zookeeper数据同步的流程分析清楚了。

我们还是通过时序图将网关端的数据同步流程串联一下:

数据同步的流程已经分析完了,为了不让同步流程被打断,在分析过程中就忽略了其他逻辑。我们还需要分析Admin同步数据初始化和网关同步操作初始化的流程。

4. Admin同步数据初始化

admin启动后,会将当前的数据信息全量同步到zk中,实现逻辑如下:


/**
* Zookeeper 数据初始化
*/
public class ZookeeperDataInit implements CommandLineRunner {

private final ZkClient zkClient;

private final SyncDataService syncDataService;

/**
* Instantiates a new Zookeeper data init.
*
* @param zkClient the zk client
* @param syncDataService the sync data service
*/
public ZookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
this.zkClient = zkClient;
this.syncDataService = syncDataService;
}

@Override
public void run(final String... args) {
String pluginPath = DefaultPathConstants.PLUGIN_PARENT;
String authPath = DefaultPathConstants.APP_AUTH_PARENT;
String metaDataPath = DefaultPathConstants.META_DATA;
// 判断zk中是否存在数据
if (!zkClient.exists(pluginPath) && !zkClient.exists(authPath) && !zkClient.exists(metaDataPath)) {
syncDataService.syncAll(DataEventTypeEnum.REFRESH);
}
}
}

判断zk中是否存在数据,如果不存在,则进行同步。

ZookeeperDataInit实现了CommandLineRunner接口。它是springboot提供的接口,会在所有 Spring Beans初始化之后执行run()方法,常用于项目中初始化的操作。

  • SyncDataService.syncAll()

从数据库查询数据,然后进行全量数据同步,所有的认证信息、插件信息、选择器信息、规则信息和元数据信息。主要是通过eventPublisher发布同步事件。这里就跟前面提到的同步逻辑就又联系起来了,eventPublisher通过publishEvent()发布完事件后,有ApplicationListener执行事件变更操作,在ShenYu中就是前面提到的DataChangedEventDispatcher

@Service
public class SyncDataServiceImpl implements SyncDataService {
// 事件发布
private final ApplicationEventPublisher eventPublisher;

/***
* 全量数据同步
* @param type the type
* @return
*/
@Override
public boolean syncAll(final DataEventTypeEnum type) {
// 同步认证信息
appAuthService.syncData();
// 同步插件信息
List<PluginData> pluginDataList = pluginService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
// 同步选择器信息
List<SelectorData> selectorDataList = selectorService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
// 同步规则信息
List<RuleData> ruleDataList = ruleService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
// 同步元数据信息
metaDataService.syncData();
return true;
}

}

5. 网关同步操作初始化

网关这边的数据同步初始化操作主要是订阅zk中的节点,当有数据变更时,收到变更数据。这依赖于ZooKeeperWatch机制。在ShenYu中,负责zk数据同步的是ZookeeperSyncDataService,也在前面提到过。

ZookeeperSyncDataService的功能逻辑是在实例化的过程中完成的:对zk中的shenyu数据同步节点完成订阅。这里的订阅分两类,一类是已经存在的节点上面数据发生更新,这通过zkClient.subscribeDataChanges()方法实现;另一类是当前节点下有新增或删除节点,即子节点发生变化,这通过zkClient.subscribeChildChanges()方法实现。

ZookeeperSyncDataService的代码有点多,这里我们以插件数据的读取和订阅进行追踪,其他类型的数据操作原理是一样的。


/**
* zookeeper 数据同步服务
*/
public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {
// 在实例化的时候,完成从zk中读取数据的操作,并订阅节点
public ZookeeperSyncDataService( /*省略构造参数参数*/ ) {
this.zkClient = zkClient;
this.pluginDataSubscriber = pluginDataSubscriber;
this.metaDataSubscribers = metaDataSubscribers;
this.authDataSubscribers = authDataSubscribers;
// 订阅插件、选择器和规则数据
watcherData();
// 订阅认证数据
watchAppAuth();
// 订阅元数据
watchMetaData();
}

private void watcherData() {
// 插件节点路径
final String pluginParent = DefaultPathConstants.PLUGIN_PARENT;
// 所有插件节点
List<String> pluginZKs = zkClientGetChildren(pluginParent);
for (String pluginName : pluginZKs) {
// 订阅当前所有插件、选择器和规则数据
watcherAll(pluginName);
}
// 订阅子节点(新增或删除一个插件)
zkClient.subscribeChildChanges(pluginParent, (parentPath, currentChildren) -> {
if (CollectionUtils.isNotEmpty(currentChildren)) {
for (String pluginName : currentChildren) {
// 需要订阅子节点的所有插件、选择器和规则数据
watcherAll(pluginName);
}
}
});
}

private void watcherAll(final String pluginName) {
// 订阅插件数据
watcherPlugin(pluginName);
// 订阅选择器数据
watcherSelector(pluginName);
// 订阅规则数据
watcherRule(pluginName);
}

private void watcherPlugin(final String pluginName) {
// 当前插件路径
String pluginPath = DefaultPathConstants.buildPluginPath(pluginName);
// 是否存在,不存在就创建
if (!zkClient.exists(pluginPath)) {
zkClient.createPersistent(pluginPath, true);
}
// 读取zk上当前节点数据,并反序列化
PluginData pluginData = null == zkClient.readData(pluginPath) ? null
: GsonUtils.getInstance().fromJson((String) zkClient.readData(pluginPath), PluginData.class);
// 缓存到网关内存中
cachePluginData(pluginData);
// 订阅插件节点
subscribePluginDataChanges(pluginPath, pluginName);
}

private void cachePluginData(final PluginData pluginData) {
// 省略实现逻辑,其实就是 CommonPluginDataSubscriber 中的操作,跟前面都能联系起来
}

private void subscribePluginDataChanges(final String pluginPath, final String pluginName) {
// 订阅数据变更:更新或删除
zkClient.subscribeDataChanges(pluginPath, new IZkDataListener() {

@Override
public void handleDataChange(final String dataPath, final Object data) { // 更新操作
// 省略实现逻辑,其实就是 CommonPluginDataSubscriber 中的操作,跟前面都能联系起来
}

@Override
public void handleDataDeleted(final String dataPath) { // 删除操作
// 省略实现逻辑,其实就是 CommonPluginDataSubscriber 中的操作,跟前面都能联系起来

}
});
}

}

上面的源代码中都给出了注释,相信大家可以看明白。订阅插件数据的主要逻辑如下:

  1. 构造当前插件路径
  2. 路径是否存在,不存在就创建
  3. 读取zk上当前节点数据,并反序列化
  4. 插件数据缓存到网关内存中
  5. 订阅插件节点

6. 总结

本文通过一个实际案例,对zookeeper的数据同步原理进行了源码分析。涉及到的主要知识点如下:

  • 基于zookeeper的数据同步,主要是通过watch机制实现;
  • 通过Spring完成事件发布和监听;
  • 通过抽象DataChangedListener接口,支持多种同步策略,面向接口编程;
  • 使用单例设计模式实现缓存数据类BaseDataCache
  • 通过SpringBoot的条件装配和starter加载机制实现配置类的加载。

· One min read
Haiqi Qin

这篇文章将会对Apache ShenYu的e2e模块进行深入剖析。

什么是e2e

e2e(end to end),也叫端到端测试,是一种用于测试应用程序流是否从头到尾按设计执行的方法。 执行端到端测试的目的是识别系统依赖关系,并确保在各种系统组件和系统之间传递正确的信息。端到端测试的目的是测试 整个软件的依赖性、数据完整性以及与其他系统、接口和数据库的通信,以模拟完整的生产场景。

e2e的优势

e2e测试能够模拟真实用户场景下测试软件系统的完整性和准确性,能够验证整个系统是否按照预期工作,以及不同组件是否能够协同工作。 e2e测试有以下几个好处:

  1. 帮助保证系统功能的正确性:e2e测试能够模拟真实用户场景下的交互和操作,验证整个系统是否能够按照预期工作,帮助发现系统中的潜在问题和缺陷。
  2. 提高测试覆盖率:e2e测试能够覆盖整个系统,包括前端、后端、数据库等不同层面和组件,从而提高测试覆盖率,保证测试的全面性和准确性。
  3. 保证系统的稳定性:E2E测试可以检查系统在各种情况下的稳定性和健壮性,包括系统的响应时间、错误处理能力、并发性等方面,帮助确保系统在面对高负载和异常情况时仍然能够保持稳定运行。
  4. 减少测试成本:e2e测试能够提高测试效率和准确性,减少测试成本和时间,从而帮助企业更快速地发布和交付高质量的软件产品。

总之,e2e测试是一种全面的测试方式,能够验证整个系统是否按照预期工作,提高测试覆盖率和测试效率,从而保证系统的稳定性和正确性,减少测试成本和时间,是一种非常重要和有效的测试方法,所以我们需要完善 e2e相关代码。

自动化e2e测试如何实现

在Apache ShenYu中,e2e测试的主要步骤体现在GitHub Action工作流的脚本中,如下所示,该脚本位于 ~/.github/workflows目录下的e2e文件中。

name: e2e

on:
pull_request:
push:
branches:
- master
jobs:
changes:
...
build-docker-images:
...
e2e-http:
...
e2e-case:
runs-on: ubuntu-latest
needs:
- changes
- build-docker-images
if: ${{ needs.changes.outputs.e2e == 'true' }}
strategy:
matrix:
case: [ "shenyu-e2e-case-spring-cloud", "shenyu-e2e-case-apache-dubbo", "shenyu-e2e-case-sofa" ]
steps:
- uses: actions/checkout@v3
with:
submodules: true
- name: Load ShenYu Docker Images
run: |
docker load --input /tmp/apache-shenyu-admin.tar
docker load --input /tmp/apache-shenyu-bootstrap.tar
docker image ls -a
- name: Build examples with Maven
run: ./mvnw -B clean install -Pexample -Dmaven.javadoc.skip=true -Dmaven.test.skip=true -f ./shenyu-examples/pom.xml
- name: Run ShenYu E2E Tests
env:
storage: mysql
run: |
bash ./shenyu-e2e/script/storage_init.sh
./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/${{ matrix.case }} -Dstorage=mysql test

当工作流触发时,使用shenyu-dist模块下的dockerfile文件构建admin与bootstrap项目的镜像并上传,当e2e测试模块运行时可以加载admin与bootstrap镜像。紧接着构建examples中的模块,最后执行对应测试模块的测试方法。

本地如何运行e2e测试

如果需要编写e2e测试用例,首先需要在本地编码并调试。目前e2e支持两种启动方式,一个是docker启动,另一个是host启动。这两种模式可以通过在测试类中的@ShenYuTest注解中切换。host启动方式直接在本地将需要启动的服务直接启动即可运行测试代码。采用docker进行启动前,需要在先构建出相应镜像。因为ShenYu目前需要支持在github工作流进行e2e测试,建议采用docker启动方式。

e2e启动流程剖析

目前e2e模块主要分为四个部分,分别为:case、client、common以及engine。

e2e-modules

case模块存放插件的测试用例,client模块编写了admin与gateway的客户端,以便请求对应接口。common存放一些公共类,engine模块是框架的核心,依托testcontainer框架利用java代码启动docker容器并完成对admin以及gatewat的配置操作。

接下来我将依托源码对e2e启动流程进行剖析。

当我们执行case中的测试方法时,@ShenYuTest注解将会生效,对测试类进行扩展。通过@ShenYuTest,我们可以选择启动方法、对admin以及gateway配置相关参数,以及选择将要执行的docker-compose文件。对于admin以及gateway,可以配置登陆所需的用户名、密码、数据同步方式以及修改yaml的内容。

@ShenYuTest(
mode = ShenYuEngineConfigure.Mode.DOCKER,
services = {
@ShenYuTest.ServiceConfigure(
serviceName = "admin",
port = 9095,
baseUrl = "http://{hostname:localhost}:9095",
parameters = {
@ShenYuTest.Parameter(key = "username", value = "admin"),
@ShenYuTest.Parameter(key = "password", value = "123456"),
@ShenYuTest.Parameter(key = "dataSyn", value = "admin_websocket")
}
),
@ShenYuTest.ServiceConfigure(
serviceName = "gateway",
port = 9195,
baseUrl = "http://{hostname:localhost}:9195",
type = ShenYuEngineConfigure.ServiceType.SHENYU_GATEWAY,
parameters = {
@ShenYuTest.Parameter(key = "application", value = "spring.cloud.discovery.enabled:true,eureka.client.enabled:true"),
@ShenYuTest.Parameter(key = "dataSyn", value = "gateway_websocket")})}, dockerComposeFile = "classpath:./docker-compose.mysql.yml")

@ShenYuTest通过ShenYuExtension类进行扩展,对admin与gateway的配置在ShenYuExtension中的beforeAll中生效。具体的生效逻辑在DockerServiceCompose类中实现。

e2e-shenyutest

e2e-beforeall

@ShenYuTest配置项在docker启动前生效,主要通过修改测试模块中resource目录下的yaml文件。目前e2e支持对不同数据同步方式进行测试,其原理就是通过DockerServiceCompose类中的chooseDataSyn方法。在DataSyncHandler中对各种数据同步方式需要修改的内容进行初始化,最后启动container。

e2e-docer-service-compose

e2e-datahandle-syn

当docker启动完后,开始对插件功能进行测试。在PluginsTest类中,有针对测试进行的前置以及后置操作。

    @BeforeAll
static void setup(final AdminClient adminClient, final GatewayClient gatewayClient) throws InterruptedException, JsonProcessingException {
adminClient.login();
Thread.sleep(10000);
List<SelectorDTO> selectorDTOList = adminClient.listAllSelectors();
List<MetaDataDTO> metaDataDTOList = adminClient.listAllMetaData();
List<RuleDTO> ruleDTOList = adminClient.listAllRules();
Assertions.assertEquals(2, selectorDTOList.size());
Assertions.assertEquals(13, metaDataDTOList.size());
Assertions.assertEquals(14, ruleDTOList.size());

for (SelectorDTO selectorDTO : selectorDTOList) {
if (selectorDTO.getHandle() != null && !"".equals(selectorDTO.getHandle())) {
SpringCloudPluginCases.verifierUri(selectorDTO.getHandle());
}
}

List<MetaData> metaDataCacheList = gatewayClient.getMetaDataCache();
List<SelectorCacheData> selectorCacheList = gatewayClient.getSelectorCache();
List<RuleCacheData> ruleCacheList = gatewayClient.getRuleCache();
Assertions.assertEquals(2, selectorCacheList.size());
Assertions.assertEquals(13, metaDataCacheList.size());
Assertions.assertEquals(14, ruleCacheList.size());

MultiValueMap<String, String> formData = new LinkedMultiValueMap<>();
formData.add("id", "8");
formData.add("name", "springCloud");
formData.add("enabled", "true");
formData.add("role", "Proxy");
formData.add("sort", "200");
adminClient.changePluginStatus("8", formData);
String id = "";
for (SelectorDTO selectorDTO : selectorDTOList) {
if (!"".equals(selectorDTO.getHandle())) {
id = selectorDTO.getId();
}
}
adminClient.deleteSelectors(id);
selectorDTOList = adminClient.listAllSelectors();
Assertions.assertEquals(1, selectorDTOList.size());
}

以springcloud插件为例,首先需要测试注册中心以及数据同步能否正常工作,接着启动插件并删除已存在的选择器。测试数据是否成功注册进注册中心,可以调用admin客户端的接口进行测试,测试数据同步是否成功,可以获取gateway的缓存进行测试。

接着运行case文件中的测试用例,通过@ShenYuScenario获取用例。

    @ShenYuScenario(provider = SpringCloudPluginCases.class)
void testSpringCloud(GatewayClient gateway, CaseSpec spec) {
spec.getVerifiers().forEach(verifier -> verifier.verify(gateway.getHttpRequesterSupplier().get()));
}

针对不同的插件,我们可以构建Case类,存放要测试的规则。所有的测试规则存放进list中,按顺序进行测试。beforeEachSpec中进行构建选择器与规则,caseSpec存放测试实体,如果符合uri规则的应存在,否则不存在。我们需要模拟用户对选择器和规则进行新增,因为各个插件的选择器的handler规则不一定相同,所以我们需要根据插件需求去编写其handle类。并通过请求验证其符合规则。具体测试用例主要分为两大类,一类是对uri规则进行匹配,比如euqal、path_pattern、start_with、end_with,一类是请求类型,比如get、put、post、delete。

当八种匹配情况都测试通过后,可以判断该插件功能正常,我们在测试结束后需要恢复环境,将所有的选择器删除,将该插件设置为不可用,最后关闭所有容器。

    @Override
public List<ScenarioSpec> get() {
return Lists.newArrayList(
testWithUriEquals(),
testWithUriPathPattern(),
testWithUriStartWith(),
testWithEndWith(),
testWithMethodGet(),
testWithMethodPost(),
testWithMethodPut(),
testWithMethodDelete()
);
}

private ShenYuScenarioSpec testWithUriEquals() {
return ShenYuScenarioSpec.builder()
.name("single-spring-cloud uri =]")
.beforeEachSpec(
ShenYuBeforeEachSpec.builder()
.addSelectorAndRule(
newSelectorBuilder("selector", Plugin.SPRING_CLOUD) .handle(SpringCloudSelectorHandle.builder().serviceId("springCloud-test")
.gray(true)
.divideUpstreams(DIVIDE_UPSTREAMS)
.build())
.conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.EQUAL, TEST))
.build(),
newRuleBuilder("rule") .handle(SpringCloudRuleHandle.builder().loadBalance("hash").timeout(3000).build())
.conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.EQUAL, TEST))
.build()
)
.checker(notExists(TEST))
.waiting(exists(TEST))
.build()
)
.caseSpec(
ShenYuCaseSpec.builder()
.addExists(TEST)
.addNotExists("/springcloud/te")
.addNotExists("/put")
.addNotExists("/get")
.build()
)
.afterEachSpec(ShenYuAfterEachSpec.DEFAULT)
.build();
}

· One min read
Kunshuai Zhu

这篇文章将会对Apache ShenYu的集成测试进行深入剖析。

什么是集成测试?

集成测试在一些项目里也叫E2E (End To End)测试,主要用于测试各个模块组装成一个系统后是否能符合预期。

Apache ShenYu将集成测试放在了持续集成中,利用GitHub Action,在每次向主分支提交Pull Request或是Merge时触发。这样可以大大降低项目的维护成本,提升Apache ShenYu的稳定性。

自动化的集成测试如何实现?

Apache ShenYu中,集成测试的主要步骤体现在GitHub Action工作流的脚本中,如下所示,该脚本位于 ~/.github/workflows目录下。

name: it
on:
pull_request:
push:
branches:
- master
jobs:
build:
strategy:
matrix:
case:
- shenyu-integrated-test-alibaba-dubbo
...
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
submodules: true
...

下面我将从这个yaml文件出发,带你剖析整个自动化集成测试的流程。

工作流的触发

由于我们在 on 中指定了 pull_requestpush.branch: master,那么当我们提交pull_request或是merge分支到master(push)的时候,就会触发这个工作流。

关于更多GitHub Action的用法,可以参考 GitHub Action 的文档,这里不会做详细的介绍。

初始化环境

  • 拉取代码
- uses: actions/checkout@v2
with:
submodules: true
  • 设置跳过标志
- name: Set Skip Env Var
uses: ./.github/actions/skip-ci

当发生的是一些对功能无关的改动(如改动文档)时,会跳过集成测试,以节约资源。

  • 缓存maven依赖、安装Java
- name: Cache Maven Repos
...
- uses: actions/setup-java@v1

构建整个项目,同时构建docker镜像

./mvnw -B clean install -Prelease,docker -Dmaven.javadoc.skip=true -Dmaven.test.skip=true

上面这行命令中,-P后面跟着release,docker,表示会激活pom文件中相关的profile配置。

而release和docker这两个profile,目前只在 shenyu-dist 下的几个子模块中存在。下面将以 shenyu-dist-admin 模块为例,介绍profile为release和docker的配置的具体内容。另外,集成测试只使用了这一步构建的 shenyu-admin 镜像。

  • 首先是release

    <profile>
    <id>release</id>
    <activation>
    <activeByDefault>false</activeByDefault>
    </activation>
    <build>
    <finalName>apache-shenyu-incubating-${project.version}</finalName>
    <plugins>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <executions>
    <execution>
    <id>admin-bin</id>
    <phase>package</phase>
    <goals>
    <goal>single</goal>
    </goals>
    </execution>
    </executions>
    <configuration>
    <descriptors>
    <descriptor>${project.basedir}/src/main/assembly/binary.xml</descriptor>
    </descriptors>
    <tarLongFileMode>posix</tarLongFileMode>
    </configuration>
    </plugin>
    </plugins>
    </build>
    </profile>

    当-P后面跟着release时,就会激活上面的 maven-assembly-plugin 插件。而executions中将插件的执行时机绑定在了maven生命周期package中,这也就意味着,当我们执行 mvn install 的时候就会触发。

    configuration中指定了我们编写好的 binary.xmlmaven-assembly-plugin 插件将会按照这个文件,将需要的文件复制进来,并打包。你可以点击链接查看该文件:shenyu-dist/shenyu-admin-dist/src/main/assembly/binary.xml

    根据这个文件,插件会将其他模块下打包好的jar包、配置文件、启动脚本等“复制”过来,最终打成 tar.gz 格式的压缩包。

  • 然后是docker

    <profile>
    <id>docker</id>
    <activation>
    <activeByDefault>false</activeByDefault>
    </activation>
    <build>
    <plugins>
    <plugin>
    <groupId>com.spotify</groupId>
    <artifactId>dockerfile-maven-plugin</artifactId>
    <version>${dockerfile-maven-plugin.version}</version>
    <executions>
    <execution>
    <id>tag-latest</id>
    <goals>
    <goal>build</goal>
    </goals>
    <configuration>
    <tag>latest</tag>
    </configuration>
    </execution>
    <execution>
    <id>tag-version</id>
    <goals>
    <goal>build</goal>
    </goals>
    <configuration>
    <tag>${project.version}</tag>
    </configuration>
    </execution>
    </executions>
    <configuration>
    <repository>apache/shenyu-admin</repository>
    <buildArgs>
    <APP_NAME>apache-shenyu-incubating-${project.version}-admin-bin</APP_NAME>
    </buildArgs>
    </configuration>
    </plugin>
    </plugins>
    </build>
    </profile>

    类比上面的release,这里是激活 dockerfile-maven-plugin 插件。当 mvn install -Pdocker 时,插件就会利用我们编写好的dockerfile构建docker镜像。

需要注意的是,dockerfile-maven-plugin目前对aarch64架构的设备支持有限,在aarch64架构的机器上运行该插件时会出现如下错误。且在本人写这篇文章的时候已经很久没有维护,这意味着aarch64架构的设备使用这个插件的问题在短期内不会解决。

[ERROR] Failed to execute goal com.spotify:dockerfile-maven-plugin:1.4.6:build (tag-latest) on project shenyu-admin-dist: Could not build image: java.util.concurrent.ExecutionException: com.spotify.docker.client.shaded.javax.ws.rs.ProcessingException: java.lang.UnsatisfiedLinkError: could not load FFI provider jnr.ffi.provider.jffi.Provider: ExceptionInInitializerError: Can't overwrite cause with java.lang.UnsatisfiedLinkError: java.lang.UnsatisfiedLinkError: /private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib: dlopen(/private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib, 1): no suitable image found.  Did find:
[ERROR] /private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib: no matching architecture in universal wrapper
[ERROR] /private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib: no matching architecture in universal wrapper
...

这里有个临时的解决方案:

  1. 打开一个新的shell,输入如下命令,利用 socat 将 unix socket 路由到 tcp 端口

    socat TCP-LISTEN:2375,range=127.0.0.1/32,reuseaddr,fork UNIX-CLIENT:/var/run/docker.sock
  2. 设置环境变量

    export DOCKER_HOST=tcp://127.0.0.1:2375

构建examples模块

- name: Build examples
if: env.SKIP_CI != 'true'
run: ./mvnw -B clean install -Pexample -Dmaven.javadoc.skip=true -Dmaven.test.skip=true -f ./shenyu-examples/pom.xml

因为考虑到release的需要,目前项目根目录下的pom文件中不饱含example子模块,所以上面这个步骤另外构建了examples模块。

与上面类似,这行命令也会利用maven的插件构建镜像,以供我们后续docker编排使用。

构建定制化网关

- name: Build integrated tests
if: env.SKIP_CI != 'true'
run: ./mvnw -B clean install -Pit -DskipTests -f ./shenyu-integrated-test/pom.xml

为了细分Apache ShenYu的不同功能的集成测试,我们在这一步将构建集成测试模块定制的网关。所谓的“定制”就是在pom文件中引入需要的最少依赖,然后代替默认的 shenyu-bootstrap。与上面两个步骤类似,这一步也会构建出docker镜像。

值得注意的是,这里的打包构建的方式与 shenyu-dist 模块的有一些不同,你可以通过对比pom文件发现。

运行docker compose

- name: Start docker compose
if: env.SKIP_CI != 'true'
run: docker-compose -f ./shenyu-integrated-test/${{ matrix.case }}/docker-compose.yml up -d

这一步将会根据集成测试模块下编写好的不同的 docker-compose.yml 文件,进行docker编排。

version: "3.9"
services:
shenyu-zk:
container_name: shenyu-zk
image: zookeeper:3.5
...
shenyu-redis:
image: redis:6.0-alpine
container_name: shenyu-redis
...

shenyu-examples-http:
deploy:
resources:
limits:
memory: 2048M
container_name: shenyu-examples-http
image: shenyu-examples-http:latest
...

shenyu-admin:
image: apache/shenyu-admin:latest
container_name: shenyu-admin
...

shenyu-integrated-test-http:
container_name: shenyu-integrated-test-http
image: apache/shenyu-integrated-test-http:latest
...
depends_on:
shenyu-admin:
condition: service_healthy
healthcheck:
test: [ "CMD", "wget", "http://shenyu-integrated-test-http:9195/actuator/health" ]
timeout: 2s
retries: 30

networks:
shenyu:
name: shenyu

例如 shenyu-integrated-test-http 模块下的 docker-compose.yml,按顺序启动了zookeeper、redis、example、admin、网关等服务。其中,example、admin、网关的镜像是我们之前构建的。

其中,docker-compose利用 depends_on 确定了服务之间的拓扑关系,并且大部分服务都有相应的健康检查,待健康检查通过后才会启动下一个服务。

运行健康检查,等待docker-compose启动完毕

- name: Wait for docker compose start up completely
if: env.SKIP_CI != 'true'
run: bash ./shenyu-integrated-test/${{ matrix.case }}/script/healthcheck.sh

在这一步,宿主机会运行 healthcheck.sh 这个脚本,然后利用 curl 命令访问各个服务列表(在services.list文件中)的健康状态接口 /actuator/health,一直到服务状态都为正常才会继续。

运行测试

- name: Run test
id: test
if: env.SKIP_CI != 'true'
run: ./mvnw test -Pit -f ./shenyu-integrated-test/${{ matrix.case }}/pom.xml
continue-on-error: true

这一步就是利用maven test命令,逐个执行 /src/test/ 目录下的测试类。

查看Docker Compose日志

- name: Check test result
if: env.SKIP_CI != 'true'
run: |
docker-compose -f ./shenyu-integrated-test/${{ matrix.case }}/docker-compose.yml logs --tail="all"
if [[ ${{steps.test.outcome}} == "failure" ]]; then
echo "Test Failed"
exit 1
else
echo "Test Successful"
exit 0
fi

当工作流出现错误时,docker compose的日志可以帮助我们更好的排查问题,所以在这一步我们将docker compose的日志打印出来。

· One min read

本文基于shenyu-2.6.1版本进行源码分析.

Shenyu 提供了一种机制来定制自己的插件或是修改已有的插件,在其内部通过extPlugin的配置实现,其需要满足以下两点:

  1. 实现接口 ShenyuPlugin 或是 PluginDataHandler
  2. 将实现的包打包后,放置于shenyu.extPlugin.path对应的路径下

入口

真正实现该逻辑的类是ShenyuLoaderService,接下来看下该类是如何处理

    public ShenyuLoaderService(final ShenyuWebHandler webHandler, final CommonPluginDataSubscriber subscriber, final ShenyuConfig shenyuConfig) {
// 插件信息的信息订阅
this.subscriber = subscriber;
// Shenyu封装的WebHandler,包含了所有的插件逻辑
this.webHandler = webHandler;
// 配置信息
this.shenyuConfig = shenyuConfig;
// 扩展插件的配置信息,如路径,是否启用、开启多少线程来处理、检查加载的频率等信息
ExtPlugin config = shenyuConfig.getExtPlugin();
// 如果启用的,则创建定时任务来检查并加载
if (config.getEnabled()) {
// 创建一个指定线程名称的定时任务
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(config.getThreads(), ShenyuThreadFactory.create("plugin-ext-loader", true));
// 创建固定频率执行的任务,默认在30s,每300s,执行一次
executor.scheduleAtFixedRate(() -> loadExtOrUploadPlugins(null), config.getScheduleDelay(), config.getScheduleTime(), TimeUnit.SECONDS);
}
}

该类有以下几个属性:

webHandler: 该类是shenyu 处理请求的入口,引用了所有的插件数据,在扩展插件加载后,需要进行更新

subscriber: 该类是插件的订阅的入口,引用了所有插件的订阅处理类,在扩展配置加载后,也需要进行同步更新

executor: 在ShenyuLoaderService内部会创建一个定时任务,来定时扫描加载指定路径下的jar包,便于加载扩展的插件,实现动态发现 默认会在启动30秒后,每300秒扫描一次

同时这里可以通过 shenyu.extPlugin.enabled配置来决定是否要开启扩展插件功能的启用

以上的配置可以在配置文件中进行调整:

shenyu:
extPlugin:
path: # 扩展插件的存储目录
enabled: true # 是否启用扩展功能
threads: 1 # 扫描加载的线程数
scheduleTime: 300 # 任务执行的频率
scheduleDelay: 30 # 任务启动后多久开始执行

接下来看下加载的逻辑:

   public void loadExtOrUploadPlugins(final PluginData uploadedJarResource) {
try {
List<ShenyuLoaderResult> plugins = new ArrayList<>();
// 获取ShenyuPluginClassloader的持有对象
ShenyuPluginClassloaderHolder singleton = ShenyuPluginClassloaderHolder.getSingleton();
if (Objects.isNull(uploadedJarResource)) {
// 参数为空,则从扩展的目录,加载所有的jar包
// PluginJar:包含ShenyuPlugin接口、PluginDataHandler接口的数据
List<PluginJarParser.PluginJar> uploadPluginJars = ShenyuExtPathPluginJarLoader.loadExtendPlugins(shenyuConfig.getExtPlugin().getPath());
// 遍历所有的待加载插件
for (PluginJarParser.PluginJar extPath : uploadPluginJars) {
LOG.info("shenyu extPlugin find new {} to load", extPath.getAbsolutePath());
// 使用扩展插件的加载器来加载指定的插件,便于后续的加载和卸载
ShenyuPluginClassLoader extPathClassLoader = singleton.createPluginClassLoader(extPath);
// 使用ShenyuPluginClassLoader 进行加载
// 主要逻辑是:判断是否实现ShenyuPlugin接口、PluginDataHandler接口 或是否标识 @Component\@Service等注解,如果有,则注册为SpringBean
// 构造 ShenyuLoaderResult对象
plugins.addAll(extPathClassLoader.loadUploadedJarPlugins());
}
} else {
// 加载指定jar,逻辑同加载全部
PluginJarParser.PluginJar pluginJar = PluginJarParser.parseJar(Base64.getDecoder().decode(uploadedJarResource.getPluginJar()));
LOG.info("shenyu upload plugin jar find new {} to load", pluginJar.getJarKey());
ShenyuPluginClassLoader uploadPluginClassLoader = singleton.createPluginClassLoader(pluginJar);
plugins.addAll(uploadPluginClassLoader.loadUploadedJarPlugins());
}
// 将扩展的插件,加入到ShenyuWebHandler的插件列表,后续的请求则会经过加入的插件内容
loaderPlugins(plugins);
} catch (Exception e) {
LOG.error("shenyu plugins load has error ", e);
}
}

该方法处理的逻辑:

  1. 判断参数uploadedJarResource是否有值,如果没有,则会加载全部,否则加载指定资源jar包进行处理
  2. shenyu.extPlugin.path 中获取到指定jar包,并封装成 PluginJar对象,该对象包含了jar包以下信息
    • version: 版本信息
    • groupId:包的groupId
    • artifactId: 包的 artifactId
    • absolutePath: 绝对路径
    • clazzMap:class对应的字节码
    • resourceMap:jar包的字节码
  3. 通过ShenyuPluginClassloaderHolder创建对应的ClassLoader,对应的类是ShenyuPluginClassLoader, 并进行加载对应的类
    • 调用ShenyuPluginClassLoader.loadUploadedJarPlugins 加载对应的类并注册成Spring Bean,这样可以使用Spring容器来管理
  4. 调用loaderPlugins方法,将扩展的插件更新到 webHandler 以及 subscriber

插件注册

对于提供的jar包里的内容,加载器只会处理指定接口类型的类,实现逻辑在 ShenyuPluginClassLoader.loadUploadedJarPlugins() 方法

public List<ShenyuLoaderResult> loadUploadedJarPlugins() {
List<ShenyuLoaderResult> results = new ArrayList<>();
// 所有的类映射关系
Set<String> names = pluginJar.getClazzMap().keySet();
// 遍历所有的类
names.forEach(className -> {
Object instance;
try {
// 尝试创建对象,如果可以,则加入到Spring容器中
instance = getOrCreateSpringBean(className);
if (Objects.nonNull(instance)) {
// 构建ShenyuLoaderResult对象
results.add(buildResult(instance));
LOG.info("The class successfully loaded into a upload-Jar-plugin {} is registered as a spring bean", className);
}
} catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
LOG.warn("Registering upload-Jar-plugins succeeds spring bean fails:{}", className, e);
}
});
return results;
}

该方法就是负责构建所有符合条件的对象,并封装成 ShenyuLoaderResult对象,该对象对于创建后对象,进行了封装,会在方法 buildResult()中进行处理

    private ShenyuLoaderResult buildResult(final Object instance) {
ShenyuLoaderResult result = new ShenyuLoaderResult();
// 创建的对象是否实现了ShenyuPlugin
if (instance instanceof ShenyuPlugin) {
result.setShenyuPlugin((ShenyuPlugin) instance);
// 创建的对象是否实现了PluginDataHandler
} else if (instance instanceof PluginDataHandler) {
result.setPluginDataHandler((PluginDataHandler) instance);
}
return result;
}

同时进入方法 getOrCreateSpringBean() 进一步分析

    private <T> T getOrCreateSpringBean(final String className) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
// 确认是否已经注册过了,如果有则不处理,直接返回
if (SpringBeanUtils.getInstance().existBean(className)) {
return SpringBeanUtils.getInstance().getBeanByClassName(className);
}
lock.lock();
try {
// Double check,
T inst = SpringBeanUtils.getInstance().getBeanByClassName(className);
if (Objects.isNull(inst)) {
// 使用 ShenyuPluginClassLoader 进行加载类
Class<?> clazz = Class.forName(className, false, this);
//Exclude ShenyuPlugin subclass and PluginDataHandler subclass
// without adding @Component @Service annotation
// 确认是否是 ShenyuPlugin 或是 PluginDataHandler的子类
boolean next = ShenyuPlugin.class.isAssignableFrom(clazz)
|| PluginDataHandler.class.isAssignableFrom(clazz);
if (!next) {
// 如果不是,确认是否标识了 @Component 与 @Service 注解
Annotation[] annotations = clazz.getAnnotations();
next = Arrays.stream(annotations).anyMatch(e -> e.annotationType().equals(Component.class)
|| e.annotationType().equals(Service.class));
}
if (next) {
// 如果符合以上内容,则注册Bean
GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
beanDefinition.setBeanClassName(className);
beanDefinition.setAutowireCandidate(true);
beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
// 注册bean
String beanName = SpringBeanUtils.getInstance().registerBean(beanDefinition, this);
// 创建对象
inst = SpringBeanUtils.getInstance().getBeanByClassName(beanName);
}
}
return inst;
} finally {
lock.unlock();
}
}

逻辑大致如下:

  1. 判断是否实现了接口 ShenyuPluginPluginDataHandler, 如果没有,则是否标识了 @Component 或是 @Service
  2. 如果符合1的条件,则将该对象注册到Spring 容器,并返回创建的对象

同步

在插件注册成功后,这时只是实例化了插件,但它还不会生效,因为它还未添加到 Shenyu的插件链中,同步逻辑由 loaderPlugins()方法实现

    private void loaderPlugins(final List<ShenyuLoaderResult> results) {
if (CollectionUtils.isEmpty(results)) {
return;
}
// 获取所有实现了接口ShenyuPlugin的对象
List<ShenyuPlugin> shenyuExtendPlugins = results.stream().map(ShenyuLoaderResult::getShenyuPlugin).filter(Objects::nonNull).collect(Collectors.toList());
// 同步更新webHandler中plugins
webHandler.putExtPlugins(shenyuExtendPlugins);
// 获取所有实现了接口PluginDataHandler的对象
List<PluginDataHandler> handlers = results.stream().map(ShenyuLoaderResult::getPluginDataHandler).filter(Objects::nonNull).collect(Collectors.toList());
// 同步扩展的PluginDataHandler
subscriber.putExtendPluginDataHandler(handlers);

}

该方法的逻辑处理了两个数据:

  1. 将实现了 ShenyuPlugin 接口的数据,同步至 webHandler的plugins 列表
    public void putExtPlugins(final List<ShenyuPlugin> extPlugins) {
if (CollectionUtils.isEmpty(extPlugins)) {
return;
}
// 过滤出新增的插件
final List<ShenyuPlugin> shenyuAddPlugins = extPlugins.stream()
.filter(e -> plugins.stream().noneMatch(plugin -> plugin.named().equals(e.named())))
.collect(Collectors.toList());
// 过滤出更新的插件,以名称和旧的相同来判断,则为更新
final List<ShenyuPlugin> shenyuUpdatePlugins = extPlugins.stream()
.filter(e -> plugins.stream().anyMatch(plugin -> plugin.named().equals(e.named())))
.collect(Collectors.toList());
// 如果没有数据,则跳过
if (CollectionUtils.isEmpty(shenyuAddPlugins) && CollectionUtils.isEmpty(shenyuUpdatePlugins)) {
return;
}
// 复制旧的数据
// copy new list
List<ShenyuPlugin> newPluginList = new ArrayList<>(plugins);
// 添加新的插件数据
// Add extend plugin from pluginData or shenyu ext-lib
this.sourcePlugins.addAll(shenyuAddPlugins);
// 添加新数据
if (CollectionUtils.isNotEmpty(shenyuAddPlugins)) {
shenyuAddPlugins.forEach(plugin -> LOG.info("shenyu auto add extends plugins:{}", plugin.named()));
newPluginList.addAll(shenyuAddPlugins);
}
// 修改更新的数据
if (CollectionUtils.isNotEmpty(shenyuUpdatePlugins)) {
shenyuUpdatePlugins.forEach(plugin -> LOG.info("shenyu auto update extends plugins:{}", plugin.named()));
for (ShenyuPlugin updatePlugin : shenyuUpdatePlugins) {
for (int i = 0; i < newPluginList.size(); i++) {
if (newPluginList.get(i).named().equals(updatePlugin.named())) {
newPluginList.set(i, updatePlugin);
}
}
for (int i = 0; i < this.sourcePlugins.size(); i++) {
if (this.sourcePlugins.get(i).named().equals(updatePlugin.named())) {
this.sourcePlugins.set(i, updatePlugin);
}
}
}
}
// 重新排序
plugins = sortPlugins(newPluginList);
}
  1. 将实现了 PluginDataHandler 接口的数据,同步至 subscriber 的handlers 列表
    public void putExtendPluginDataHandler(final List<PluginDataHandler> handlers) {
if (CollectionUtils.isEmpty(handlers)) {
return;
}
// 遍历所有数据
for (PluginDataHandler handler : handlers) {
String pluginNamed = handler.pluginNamed();
// 更新现有的PluginDataHandler列表
MapUtils.computeIfAbsent(handlerMap, pluginNamed, name -> {
LOG.info("shenyu auto add extends plugin data handler name is :{}", pluginNamed);
return handler;
});
}
}

至此,扩展插件的加载过程分析结束。

· One min read
Kunshuai Zhu

开始前,可以参考 这篇文章 运行shenyu网关

正文

首先,看ContextPathPlugin#doExecute方法,这是这个插件的核心。

protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
...
// 1. 从JVM缓存中取得contextMappingHandle
ContextMappingHandle contextMappingHandle = ContextPathPluginDataHandler.CACHED_HANDLE.get().obtainHandle(CacheKeyUtils.INST.getKey(rule));
...
// 2. 根据contextMappingHandle设置shenyu上下文
buildContextPath(shenyuContext, contextMappingHandle);
return chain.execute(exchange);
}
  1. 从JVM缓存中取得contextMappingHandle

    这里的contextMappingHandleContextMappingHandle类的实例,里面有两个成员变量:contextPathaddPrefix

    这两个变量在之前Admin里面的Rules表单里有出现过,是在数据同步的时候更新的。

  2. 根据contextMappingHandle设置shenyu上下文

    下面是ContextPathPlugin#buildContextPath方法的源代码

    private void buildContextPath(final ShenyuContext context, final ContextMappingHandle handle) {
    String realURI = "";
    // 1. 设置shenyu的context path,根据contextPath的长度将真实URI的前缀去掉
    if (StringUtils.isNoneBlank(handle.getContextPath())) {
    context.setContextPath(handle.getContextPath());
    context.setModule(handle.getContextPath());
    realURI = context.getPath().substring(handle.getContextPath().length());
    }
    // 加上前缀
    if (StringUtils.isNoneBlank(handle.getAddPrefix())) {
    if (StringUtils.isNotBlank(realURI)) {
    realURI = handle.getAddPrefix() + realURI;
    } else {
    realURI = handle.getAddPrefix() + context.getPath();
    }
    }
    context.setRealUrl(realURI);
    }
    • 设置shenyu的context path,根据contextPath的长度将真实URI的前缀去掉

      你可能会有疑问,这里所谓的「根据contextPath的长度」会不会有问题呢?

      实际上这样的判断是没有问题的,因为请求在被Selector和Rules匹配到之后,才会被插件处理。所以在设置好Selector和Rules的前提下,是完全可以满足转换特定contextPath的需求的。

然后,ContextPathPlugin类还有一个比较重要的方法skip,下面展示了部分代码。我们可以发现:如果是对RPC服务的调用,就会直接跳过context_path插件。

public Boolean skip(final ServerWebExchange exchange) {
...
return Objects.equals(rpcType, RpcTypeEnum.DUBBO.getName())
|| Objects.equals(rpcType, RpcTypeEnum.GRPC.getName())
|| Objects.equals(rpcType, RpcTypeEnum.TARS.getName())
|| Objects.equals(rpcType, RpcTypeEnum.MOTAN.getName())
|| Objects.equals(rpcType, RpcTypeEnum.SOFA.getName());
}

最后,context_path插件还有另一个类ContextPathPluginDataHandler。这个类的作用是订阅插件的数据,当插件配置被修改、删除、增加时,就往JVM缓存里面修改、删除、新增数据。