WebSocket Data Synchronization Source Code Analysis
In ShenYu
gateway, data synchronization refers to how to synchronize the updated data to the gateway after the data is sent in the background management system. The Apache ShenYu gateway currently supports data synchronization for ZooKeeper
, WebSocket
, http long poll
, Nacos
, etcd
and Consul
. The main content of this article is based on WebSocket
data synchronization source code analysis.
This paper based on
shenyu-2.4.0
version of the source code analysis, the official website of the introduction of please refer to the Data Synchronization Design .
#
1. About WebSocket CommunicationThe WebSocket protocol was born in 2008 and became an international standard in 2011. It can be two-way communication, the server can take the initiative to push information to the client, the client can also take the initiative to send information to the server. The WebSocket protocol is based on the TCP protocol and belongs to the application layer, with low performance overhead and high communication efficiency. The protocol identifier is ws
.
#
2. Admin Data SyncLet's trace the source code from a real case, such as adding a selector data in the background management system:
#
2.1 Accept Changed Data- SelectorController.createSelector()
Enter the createSelector() method of the SelectorController
class, which validates data, adds or updates data, and returns results.
@Validated@RequiredArgsConstructor@RestController@RequestMapping("/selector")public class SelectorController { @PostMapping("") public ShenyuAdminResult createSelector(@Valid @RequestBody final SelectorDTO selectorDTO) { // @Valid 数校验 // create or update data Integer createCount = selectorService.createOrUpdate(selectorDTO); // return result return ShenyuAdminResult.success(ShenyuResultMessage.CREATE_SUCCESS, createCount); } // ......}
#
2.2 Handle Data- SelectorServiceImpl.createOrUpdate()
Convert data in the SelectorServiceImpl
class using the createOrUpdate()
method, save it to the database, publish the event, update upstream
.
@RequiredArgsConstructor@Servicepublic class SelectorServiceImpl implements SelectorService { // eventPublisher private final ApplicationEventPublisher eventPublisher; @Override @Transactional(rollbackFor = Exception.class) public int createOrUpdate(final SelectorDTO selectorDTO) { int selectorCount; // build data DTO --> DO SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO); List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions(); // insert or update ? if (StringUtils.isEmpty(selectorDTO.getId())) { // insert into data selectorCount = selectorMapper.insertSelective(selectorDO); // insert into condition data selectorConditionDTOs.forEach(selectorConditionDTO -> { selectorConditionDTO.setSelectorId(selectorDO.getId()); selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO)); }); // check selector add if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) { DataPermissionDTO dataPermissionDTO = new DataPermissionDTO(); dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId()); dataPermissionDTO.setDataId(selectorDO.getId()); dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE); dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO)); }
} else { // update data, delete and then insert selectorCount = selectorMapper.updateSelective(selectorDO); //delete rule condition then add selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId())); selectorConditionDTOs.forEach(selectorConditionDTO -> { selectorConditionDTO.setSelectorId(selectorDO.getId()); SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO); selectorConditionMapper.insertSelective(selectorConditionDO); }); } // publish event publishEvent(selectorDO, selectorConditionDTOs);
// update upstream updateDivideUpstream(selectorDO); return selectorCount; } // ...... }
In the Service
class to persist data, i.e. to the database, this should be familiar, not expand. The update upstream operation is analyzed in the corresponding section below, focusing on the publish event operation, which performs data synchronization.
The logic of the publishEvent()
method is to find the plugin corresponding to the selector, build the conditional data, and publish the change data.
private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) { // find plugin of selector PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId()); // build condition data List<ConditionData> conditionDataList = selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList()); // publish event eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList)))); }
Change data released by eventPublisher.PublishEvent()
is complete, the eventPublisher
object is a ApplicationEventPublisher
class, The fully qualified class name is org.springframework.context.ApplicationEventPublisher
. Here we see that publishing data is done through Spring
related functionality.
ApplicationEventPublisher
:When a state change, the publisher calls
ApplicationEventPublisher
ofpublishEvent
method to release an event,Spring
container broadcast event for all observers, The observer'sonApplicationEvent
method is called to pass the event object to the observer. There are two ways to callpublishEvent
method, one is to implement the interface by the container injectionApplicationEventPublisher
object and then call the method, the other is a direct call container, the method of two methods of publishing events not too big difference.
ApplicationEventPublisher
: publish event;ApplicationEvent
:Spring
event, record the event source, time, and data;ApplicationListener
: event listener, observer.
In Spring event publishing mechanism, there are three objects,
An object is a publish event ApplicationEventPublisher
, in ShenYu
through the constructor in the injected a eventPublisher
.
The other object is ApplicationEvent
, inherited from ShenYu
through DataChangedEvent
, representing the event object.
public class DataChangedEvent extends ApplicationEvent {//......}
The last object is ApplicationListener
in ShenYu
in through DataChangedEventDispatcher
class implements this interface, as the event listener, responsible for handling the event object.
@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
//...... }
#
2.3 Dispatch Data- DataChangedEventDispatcher.onApplicationEvent()
Released when the event is completed, will automatically enter the DataChangedEventDispatcher
class onApplicationEvent()
method of handling events.
@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
/** * This method is called when there are data changes * @param event */ @Override @SuppressWarnings("unchecked") public void onApplicationEvent(final DataChangedEvent event) { // Iterate through the data change listener (usually using a data synchronization approach is fine) for (DataChangedListener listener : listeners) { // What kind of data has changed switch (event.getGroupKey()) { case APP_AUTH: // app auth data listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType()); break; case PLUGIN: // plugin data listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType()); break; case RULE: // rule data listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType()); break; case SELECTOR: // selector data listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); break; case META_DATA: // metadata listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType()); break; default: // Other types throw exception throw new IllegalStateException("Unexpected value: " + event.getGroupKey()); } } } }
When there is a data change, the onApplicationEvent
method is called and all the data change listeners are iterated to determine the data type and handed over to the appropriate data listener for processing.
ShenYu groups all the data into five categories: APP_AUTH, PLUGIN, RULE, SELECTOR and META_DATA.
Here the data change listener (DataChangedListener
) is an abstraction of the data synchronization policy. Its concrete implementation is:
These implementation classes are the synchronization strategies currently supported by ShenYu:
WebsocketDataChangedListener
: data synchronization based on Websocket;ZookeeperDataChangedListener
:data synchronization based on Zookeeper;ConsulDataChangedListener
: data synchronization based on Consul;EtcdDataDataChangedListener
:data synchronization based on etcd;HttpLongPollingDataChangedListener
:data synchronization based on http long polling;NacosDataChangedListener
:data synchronization based on nacos;
Given that there are so many implementation strategies, how do you decide which to use?
Because this paper is based on websocket
data synchronization source code analysis, so here to WebsocketDataChangedListener
as an example, the analysis of how it is loaded and implemented.
A global search in the source code project shows that its implementation is done in the DataSyncConfiguration
class.
/** * Data Sync Configuration * By springboot conditional assembly * The type Data sync configuration. */@Configurationpublic class DataSyncConfiguration { /** * The WebsocketListener(default strategy). */ @Configuration @ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true) @EnableConfigurationProperties(WebsocketSyncProperties.class) static class WebsocketListener {
/** * Config event listener data changed listener. * @return the data changed listener */ @Bean @ConditionalOnMissingBean(WebsocketDataChangedListener.class) public DataChangedListener websocketDataChangedListener() { return new WebsocketDataChangedListener(); }
/** * Websocket collector. * Websocket collector class: establish a connection, send a message, close the connection and other operations * @return the websocket collector */ @Bean @ConditionalOnMissingBean(WebsocketCollector.class) public WebsocketCollector websocketCollector() { return new WebsocketCollector(); }
/** * Server endpoint exporter * * @return the server endpoint exporter */ @Bean @ConditionalOnMissingBean(ServerEndpointExporter.class) public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } } //......}
This configuration class is implemented through the SpringBoot
conditional assembly class. The WebsocketListener
class has several annotations:
@Configuration
: Configuration file, application context;@ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
: attribute condition. The configuration class takes effect only when the condition is met. That is, when we have the following configuration,websocket
is used for data synchronization. Note, however, thematchIfMissing = true
attribute, which means that this configuration class will work if you don't have the following configuration. Data synchronization based onwebSocket
is officially recommended and the default.shenyu: sync: websocket: enabled: true
@EnableConfigurationProperties
:enable configuration properties;
When we take the initiative to configuration, use the websocket
data synchronization, WebsocketDataChangedListener
is generated. So in the event handler onApplicationEvent()
, it goes to the corresponding listener
. In our case, a selector is to increase the new data, the data by adopting the websocket
, so, the code will enter the WebsocketDataChangedListener
selector data change process.
@Override @SuppressWarnings("unchecked") public void onApplicationEvent(final DataChangedEvent event) { // Iterate through the data change listener (usually using a data synchronization approach is fine) for (DataChangedListener listener : listeners) { // What kind of data has changed switch (event.getGroupKey()) { // other logic is omitted case SELECTOR: // selector data listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); // WebsocketDataChangedListener handle selector data break; } }
#
2.4 Websocket Data Changed Listener- WebsocketDataChangedListener.onSelectorChanged()
In the onSelectorChanged()
method, the data is encapsulated into WebsocketData
and then sent via webSocketCollector.send()
.
// selector data has been updated @Override public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) { // build WebsocketData WebsocketData<SelectorData> websocketData = new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList); // websocket send data WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType); }
#
2.5 Websocket Send Data- WebsocketCollector.send()
In the send()
method, the type of synchronization is determined and processed according to the different types.
@Slf4j@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)public class WebsocketCollector { /** * Send. * * @param message the message * @param type the type */ public static void send(final String message, final DataEventTypeEnum type) { if (StringUtils.isNotBlank(message)) { // If it's MYSELF (first full synchronization) if (DataEventTypeEnum.MYSELF == type) { // get the session from ThreadLocal Session session = (Session) ThreadLocalUtil.get(SESSION_KEY); if (session != null) { // send full data to the session sendMessageBySession(session, message); } } else { // subsequent incremental synchronization // synchronize change data to all sessions SESSION_SET.forEach(session -> sendMessageBySession(session, message)); } } }
private static void sendMessageBySession(final Session session, final String message) { try { // The message is sent through the Websocket session session.getBasicRemote().sendText(message); } catch (IOException e) { log.error("websocket send result is exception: ", e); } }}
The example we give is a new operation, an incremental synchronization, so it goes
SESSION_SET.forEach(session -> sendMessageBySession(session, message));
then through
session.getBasicRemote().sendText(message);
the data was sent out.
At this point, when data changes occur on the admin side, the changed data is increments sent to the gateway through the WebSocket
.
At this point, do you have any questions? For example, where does session come from? How does the gateway establish a connection with admin?
Don't worry, let's do the synchronization analysis on the gateway side.
However, before continuing with the source code analysis, let's use a diagram to string together the above analysis process.
#
3. Gateway Data SyncAssume ShenYu
gateway is already in normal operation, using the data synchronization mode is also websocket
. How does the gateway receive and process new selector data from admin and send it to the gateway via WebSocket? Let's continue our source code analysis to find out.
#
3.1 WebsocketClient Accept Data- ShenyuWebsocketClient.onMessage()
There is a ShenyuWebsocketClient
class on the gateway, which inherits from WebSocketClient
and can establish a connection and communicate with WebSocket
.
public final class ShenyuWebsocketClient extends WebSocketClient { // ......}
After sending data via websocket
on the admin side, ShenyuWebsocketClient
can receive data via onMessage()
and then process it itself.
public final class ShenyuWebsocketClient extends WebSocketClient { // execute after receiving the message @Override public void onMessage(final String result) { // handle accept data handleResult(result); } private void handleResult(final String result) { // data deserialization WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class); // which data types, plug-ins, selectors, rules... ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType()); // which operation type, update, delete... String eventType = websocketData.getEventType(); String json = GsonUtils.getInstance().toJson(websocketData.getData());
// handle data websocketDataHandler.executor(groupEnum, json, eventType); }}
After receiving the data, first has carried on the deserialization operation, read the data type and operation type, then hand over to websocketDataHandler.executor()
for processing.
#
3.2 Execute Websocket Data Handler- WebsocketDataHandler.executor()
A Websocket
data handler is created in factory mode, providing one handler for each data type:
plugin --> PluginDataHandler;
selector --> SelectorDataHandler;
rule --> RuleDataHandler;
auth --> AuthDataHandler;
metadata --> MetaDataHandler.
/** * Create Websocket data handlers through factory mode * The type Websocket cache handler. */public class WebsocketDataHandler {
private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);
/** * Instantiates a new Websocket data handler. * @param pluginDataSubscriber the plugin data subscriber * @param metaDataSubscribers the meta data subscribers * @param authDataSubscribers the auth data subscribers */ public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber, final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) { // plugin --> PluginDataHandler ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber)); // selector --> SelectorDataHandler ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber)); // rule --> RuleDataHandler ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber)); // auth --> AuthDataHandler ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers)); // metadata --> MetaDataHandler ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers)); }
/** * Executor. * * @param type the type * @param json the json * @param eventType the event type */ public void executor(final ConfigGroupEnum type, final String json, final String eventType) { // find the corresponding data handler based on the data type ENUM_MAP.get(type).handle(json, eventType); }}
Different data types have different ways of handling data, so there are different implementation classes. But they also have the same processing logic between them, so they can be implemented through the template approach to design patterns. The same logic is placed in the handle()
method of the abstract class, and the different logic is handed over to the respective implementation class.
In our case, a new selector is added, so it will be passed to the SelectorDataHandler
for data processing.
#
3.3 Determine the Event Type- AbstractDataHandler.handle()
Implement common logical handling of data changes: invoke different methods based on different operation types.
public abstract class AbstractDataHandler<T> implements DataHandler {
/** * Convert list. * The different logic is implemented by the respective implementation classes * @param json the json * @return the list */ protected abstract List<T> convert(String json);
/** * Do refresh. * The different logic is implemented by the respective implementation classes * @param dataList the data list */ protected abstract void doRefresh(List<T> dataList);
/** * Do update. * The different logic is implemented by the respective implementation classes * @param dataList the data list */ protected abstract void doUpdate(List<T> dataList);
/** * Do delete. * The different logic is implemented by the respective implementation classes * @param dataList the data list */ protected abstract void doDelete(List<T> dataList);
// General purpose logic, abstract class implementation @Override public void handle(final String json, final String eventType) { List<T> dataList = convert(json); if (CollectionUtils.isNotEmpty(dataList)) { DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType); switch (eventTypeEnum) { case REFRESH: case MYSELF: doRefresh(dataList); //Refreshes data and synchronizes all data break; case UPDATE: case CREATE: doUpdate(dataList); // Update or create data, incremental synchronization break; case DELETE: doDelete(dataList); // delete data break; default: break; } } }}
New selector data, new operation, through switch-case
into doUpdate()
method.
#
3.4 Enter the Specific Data Handler- SelectorDataHandler.doUpdate()
/** * The type Selector data handler. */@RequiredArgsConstructorpublic class SelectorDataHandler extends AbstractDataHandler<SelectorData> {
private final PluginDataSubscriber pluginDataSubscriber;
//......
// update data @Override protected void doUpdate(final List<SelectorData> dataList) { dataList.forEach(pluginDataSubscriber::onSelectorSubscribe); }}
Iterate over the data and enter the onSelectorSubscribe()
method.
- PluginDataSubscriber.onSelectorSubscribe()
It has no additional logic and calls the subscribeDataHandler()
method directly. Within methods, there are data types (plugins, selectors, or rules) and action types (update or delete) to perform different logic.
/** * The common plugin data subscriber, responsible for handling all plug-in, selector, and rule information */public class CommonPluginDataSubscriber implements PluginDataSubscriber { //...... // handle selector data @Override public void onSelectorSubscribe(final SelectoData selectorData) { subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE); } // A subscription data handler that handles updates or deletions of data private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) { Optional.ofNullable(classData).ifPresent(data -> { // plugin data if (data instanceof PluginData) { PluginData pluginData = (PluginData) data; if (dataType == DataEventTypeEnum.UPDATE) { // update // save the data to gateway memory BaseDataCache.getInstance().cachePluginData(pluginData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData)); } else if (dataType == DataEventTypeEnum.DELETE) { // delete // delete the data from gateway memory BaseDataCache.getInstance().removePluginData(pluginData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData)); } } else if (data instanceof SelectorData) { // selector data SelectorData selectorData = (SelectorData) data; if (dataType == DataEventTypeEnum.UPDATE) { // update // save the data to gateway memory BaseDataCache.getInstance().cacheSelectData(selectorData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData)); } else if (dataType == DataEventTypeEnum.DELETE) { // delete // delete the data from gateway memory BaseDataCache.getInstance().removeSelectData(selectorData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData)); } } else if (data instanceof RuleData) { // rule data RuleData ruleData = (RuleData) data; if (dataType == DataEventTypeEnum.UPDATE) { // update // save the data to gateway memory BaseDataCache.getInstance().cacheRuleData(ruleData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData)); } else if (dataType == DataEventTypeEnum.DELETE) { // delete // delete the data from gateway memory BaseDataCache.getInstance().removeRuleData(ruleData); // If each plugin has its own processing logic, then do it Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData)); } } }); } }
Adding a selector will enter the following logic:
// save the data to gateway memoryBaseDataCache.getInstance().cacheSelectData(selectorData);// If each plugin has its own processing logic, then do itOptional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
One is to save the data to the gateway's memory. BaseDataCache is the class that ultimately caches data, implemented in a singleton pattern. The selector data is stored in the SELECTOR_MAP
Map. In the subsequent use, also from this data.
public final class BaseDataCache { // private instance private static final BaseDataCache INSTANCE = new BaseDataCache(); // private constructor private BaseDataCache() { } /** * Gets instance. * public method * @return the instance */ public static BaseDataCache getInstance() { return INSTANCE; } /** * A Map of the cache selector data * pluginName -> SelectorData. */ private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap(); public void cacheSelectData(final SelectorData selectorData) { Optional.ofNullable(selectorData).ifPresent(this::selectorAccept); } /** * cache selector data. * @param data the selector data */ private void selectorAccept(final SelectorData data) { String key = data.getPluginName(); if (SELECTOR_MAP.containsKey(key)) { // Update operation, delete before insert List<SelectorData> existList = SELECTOR_MAP.get(key); final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList()); resultList.add(data); final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList()); SELECTOR_MAP.put(key, collect); } else { // Add new operations directly to Map SELECTOR_MAP.put(key, Lists.newArrayList(data)); } } }
Second, if each plugin has its own processing logic, then do it. Through the IDEA
editor, you can see that after adding a selector, there are the following plugins and processing. We're not going to expand it here.
After the above source tracing, and through a practical case, in the admin
side to add a selector data, will websocket
data synchronization process analysis cleared.
Let's use the following figure to concatenate the data synchronization process on the gateway side:
The data synchronization process has been analyzed, but there are still some problems that have not been analyzed, that is, how does the gateway establish a connection with admin?
#
4. The Gateway Establishes a Websocket Connection with Admin- websocket config
With the following configuration in the gateway configuration file and the dependency introduced, the websocket
related service is started.
shenyu: file: enabled: true cross: enabled: true dubbo : parameter: multi sync: websocket : # Use websocket for data synchronization urls: ws://localhost:9095/websocket # websocket address of admin allowOrigin: ws://localhost:9195
Add a dependency on websocket in the gateway.
<!--shenyu data sync start use websocket--><dependency> <groupId>org.apache.shenyu</groupId> <artifactId>shenyu-spring-boot-starter-sync-data-websocket</artifactId> <version>${project.version}</version></dependency>
- Websocket Data Sync Config
The associated bean is created by conditional assembly of springboot. In the gateway started, if we configure the shenyu.sync.websocket.urls
, then websocket
data synchronization configuration will be loaded. The dependency loading is done through the springboot starter
.
/** * WebsocketSyncDataService * Conditional injection is implemented through SpringBoot * Websocket sync data configuration for spring boot. */@Configuration@ConditionalOnClass(WebsocketSyncDataService.class)@ConditionalOnProperty(prefix = "shenyu.sync.websocket", name = "urls")@Slf4jpublic class WebsocketSyncDataConfiguration {
/** * Websocket sync data service. * @param websocketConfig the websocket config * @param pluginSubscriber the plugin subscriber * @param metaSubscribers the meta subscribers * @param authSubscribers the auth subscribers * @return the sync data service */ @Bean public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber, final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) { log.info("you use websocket sync shenyu data......."); return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(), metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList)); }
/** * Config websocket config. * * @return the websocket config */ @Bean @ConfigurationProperties(prefix = "shenyu.sync.websocket") public WebsocketConfig websocketConfig() { return new WebsocketConfig(); }}
Start a new spring.factories
file in the resources/META-INF
directory of your project and specify the configuration classes in the file.
- WebsocketSyncDataService
The following things are done in 'WebsocketSyncDataService' :
Read configuration
urls
, which represent the admin side of the synchronization address, if there are more than one, use "," split;Create a scheduling thread pool, with each
admin
assigned one to perform scheduled tasks;Create
ShenyuWebsocketClient
, assign one to eachadmin
, set upwebsocket
communication withadmin
;Start connection with admin end
websocket
;Executes a scheduled task every 10 seconds. The main function is to determine whether the
websocket
connection has been disconnected, if so, try to reconnect. If not, aping-pong
test is performed.
/** * Websocket sync data service. */@Slf4jpublic class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
private final List<WebSocketClient> clients = new ArrayList<>();
private final ScheduledThreadPoolExecutor executor;
/** * Instantiates a new Websocket sync cache. * @param websocketConfig the websocket config * @param pluginDataSubscriber the plugin data subscriber * @param metaDataSubscribers the meta data subscribers * @param authDataSubscribers the auth data subscribers */ public WebsocketSyncDataService(final WebsocketConfig websocketConfig, final PluginDataSubscriber pluginDataSubscriber, final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) { // If there are multiple synchronization addresses on the admin side, use commas (,) to separate them String[] urls = StringUtils.split(websocketConfig.getUrls(), ","); // Create a scheduling thread pool, one for each admin executor = new ScheduledThreadPoolExecutor(urls.length, ShenyuThreadFactory.create("websocket-connect", true)); for (String url : urls) { try { //Create a WebsocketClient and assign one to each admin clients.add(new ShenyuWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers)); } catch (URISyntaxException e) { log.error("websocket url({}) is error", url, e); } } try { for (WebSocketClient client : clients) { // Establish a connection with the WebSocket Server boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS); if (success) { log.info("websocket connection is successful....."); } else { log.error("websocket connection is error....."); }
// Run a scheduled task every 10 seconds // The main function is to check whether the WebSocket connection is disconnected. If the connection is disconnected, retry the connection. // If it is not disconnected, the ping-pong test is performed executor.scheduleAtFixedRate(() -> { try { if (client.isClosed()) { boolean reconnectSuccess = client.reconnectBlocking(); if (reconnectSuccess) { log.info("websocket reconnect server[{}] is successful.....", client.getURI().toString()); } else { log.error("websocket reconnection server[{}] is error.....", client.getURI().toString()); } } else { client.sendPing(); log.debug("websocket send to [{}] ping message successful", client.getURI().toString()); } } catch (InterruptedException e) { log.error("websocket connect is error :{}", e.getMessage()); } }, 10, 10, TimeUnit.SECONDS); } /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/ } catch (InterruptedException e) { log.info("websocket connection...exception....", e); }
}
@Override public void close() { // close websocket client for (WebSocketClient client : clients) { if (!client.isClosed()) { client.close(); } } // close threadpool if (Objects.nonNull(executor)) { executor.shutdown(); } }}
- ShenyuWebsocketClient
The WebSocket
client created in ShenYu
to communicate with the admin
side. After the connection is successfully established for the first time, full data is synchronized and incremental data is subsequently synchronized.
/** * The type shenyu websocket client. */@Slf4jpublic final class ShenyuWebsocketClient extends WebSocketClient { private volatile boolean alreadySync = Boolean.FALSE; private final WebsocketDataHandler websocketDataHandler; /** * Instantiates a new shenyu websocket client. * @param serverUri the server uri * @param pluginDataSubscriber the plugin data subscriber * @param metaDataSubscribers the meta data subscribers * @param authDataSubscribers the auth data subscribers */ public ShenyuWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) { super(serverUri); this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers); }
// Execute after the connection is successfully established @Override public void onOpen(final ServerHandshake serverHandshake) { // To prevent re-execution when reconnecting, use alreadySync to determine if (!alreadySync) { // Synchronize all data, type MYSELF send(DataEventTypeEnum.MYSELF.name()); alreadySync = true; } }
// Execute after receiving the message @Override public void onMessage(final String result) { // handle data handleResult(result); } // Execute after shutdown @Override public void onClose(final int i, final String s, final boolean b) { this.close(); } // Execute after error @Override public void onError(final Exception e) { this.close(); } @SuppressWarnings("ALL") private void handleResult(final String result) { // Data deserialization WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class); // Which data types, plugins, selectors, rules... ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType()); // Which operation type, update, delete... String eventType = websocketData.getEventType(); String json = GsonUtils.getInstance().toJson(websocketData.getData());
// handle data websocketDataHandler.executor(groupEnum, json, eventType); }}
#
5. SummaryThis paper through a practical case, the data synchronization principle of websocket source code analysis. The main knowledge points involved are as follows:
WebSocket
supports bidirectional communication and has good performance. It is recommended.Complete event publishing and listening via
Spring
;Support multiple synchronization strategies through abstract
DataChangedListener
interface, interface oriented programming;Use factory mode to create
WebsocketDataHandler
to handle different data types;Implement
AbstractDataHandler
using template method design patterns to handle general operation types;Use singleton design pattern to cache data class
BaseDataCache
;Loading of configuration classes via conditional assembly of SpringBoot and starter loading mechanism.