McpServer 插件源码分析
在 shenyu 网关中,启动该插件,shenyu 将成为一个功能丰富的 mcpServer, 你可以通过简单配置来将一个服务作为 tool 注册到 shenyu 网关中,并使用网关提供的扩展功能。
本文基于
shenyu-2.7.0.2
版本进行源码分析, 在本篇中我将追踪 Shenyu Mcp 插件链路,对 Mcp 插件的 sse 通信方式进行源码分析
#
前言shenyu 网关的 mcp 插件基于 spring-ai-mcp 扩展而来,为了更好的了解 mcp 插件的工作原理 ,我将简单介绍 mcp 官方提供的 jdk 中各个 java 类是如何协同运作的
我想先简单介绍三个 Mcp 官方提供的 java 类
McpServer
该类负责管理,tool,Resource,promote 等资源
TransportProvider
根据客户端和服务端之间通信协议,提供之相对应的通讯方法
Session
处理请求数据、响应数据和通知数据,提供一些基本方法和其对应的处理器,查询工具,调用工具都在此处执行
#
1. 服务注册在 shenyu admin 的 McpServer 中插件填写 endpoint 和 tool 信息后,这些信息将自动注册到 shenyu bootstrap 中, 数据同步源码可以参考官网websocket数据同步
shenyu bootstrap 将在 McpServerPluginDataHandler
的 handler()
方法中接收到 admin 同步来的数据。
handlerSelector()
方法接收 url 数据创建 McpServer
handlerRule()
方法接收 tool 信息,注册 tool
这两个方法共同组成了 Shenyu Mcp 插件的服务注册部分,下面我将对这个两个方法,详细展开分析
#
1.1 Transport,McpServer注册我们先来分析 handlerSelector()
方法,也就是 McpServer 的注册
handlerSelector()
方法 工作内容如下
- 捕捉用户在 Selector 上的填写的 url,这个 url 将作为一个 key 存储 McpServer TransportProvider 等信息
- 序列化创建
ShenyuMcpServer
,ShenyuMcpServer
将 SelectorId 和这些 url 也就是这些 key 绑定,以此来实现 selectorId 和 key 的绑定。
注意
ShenyuMcpServer
是 Shenyu 用于绑定 SelectorId 和 url 的对象,和McpServer
没有继承关系,功能也完全不同
- 调用
ShenyuMcpServerManager
的getOrCreateMcpServerTransport()
方法注册 McpServer TransportProvider
public class McpServerPluginDataHandler implements PluginDataHandler { @Override public void handlerSelector(final SelectorData selectorData) { // 获取URI String uri = selectorData.getConditionList().stream() .filter(condition -> Constants.URI.equals(condition.getParamType())) .map(ConditionData::getParamValue) .findFirst() .orElse(null); // 构建McpServer ShenyuMcpServer shenyuMcpServer = GsonUtils.getInstance().fromJson(Objects.isNull(selectorData.getHandle()) ? DEFAULT_MESSAGE_ENDPOINT : selectorData.getHandle(), ShenyuMcpServer.class); shenyuMcpServer.setPath(path); // 缓存shenyuMcpServer CACHED_SERVER.get().cachedHandle( selectorData.getId(), shenyuMcpServer); String messageEndpoint = shenyuMcpServer.getMessageEndpoint(); // 尝试获取或者注册transportProvider shenyuMcpServerManager.getOrCreateMcpServerTransport(uri, messageEndpoint); } }
ShenyuMcpServerManager
该类是 ShenYu 中 McpServer 的管理中心,不仅储存了McpAsyncServer
CompositeTransportProvider
等内容,注册 Transport 和 McpServer 的方法也在其中
getOrCreateMcpServerTransport()
方法工作内容具体如下
- 处理传递来的 url 去除/streamablehttp 以及 /message后缀 使其恢复为原始的 url
- 尝试获取
CompositeTransportProvider
对象,该对象是 Transport 的复合对象,包含了多种协议对应的 Transport - 如果没有获取到,则调用
createSseTransport()
方法创建CompositeTransportProvider
对象 - 创建
McpAsyncServer
对象,保存 Transport 对象到 Map 中,将 Transport 注册到McpAsyncServer
中
@Componentpublic class ShenyuMcpServerManager { public ShenyuSseServerTransportProvider getOrCreateMcpServerTransport(final String uri, final String messageEndPoint) { // 去除/streamablehttp 以及 /message后缀 String normalizedPath = processPath(uri); return getOrCreateTransport(normalizedPath, SSE_PROTOCOL, () -> createSseTransport(normalizedPath, messageEndPoint)); } private <T> T getOrCreateTransport(final String normalizedPath, final String protocol, final java.util.function.Supplier<T> transportFactory) { // 获取复合Transport实例 CompositeTransportProvider compositeTransport = getOrCreateCompositeTransport(normalizedPath);
T transport = switch (protocol) { case SSE_PROTOCOL -> (T) compositeTransport.getSseTransport(); case STREAMABLE_HTTP_PROTOCOL -> (T) compositeTransport.getStreamableHttpTransport(); default -> null; }; // 如果缓存中没有该实例,则需要重新创建 if (Objects.isNull(transport)) { // 调用createSseTransport()方法,创建一个新的transport并存储 transport = transportFactory.get(); // 创建McpAsyncServer,并注册transport addTransportToSharedServer(normalizedPath, protocol, transport); }
return transport; }}
#
1.1.1 Transport注册createSseTransport()
方法该方法在
getOrCreateMcpServerTransport()
方法被调用,用于创建 Transport
@Componentpublic class ShenyuMcpServerManager { private ShenyuSseServerTransportProvider createSseTransport(final String normalizedPath, final String messageEndPoint) { String messageEndpoint = normalizedPath + messageEndPoint; ShenyuSseServerTransportProvider transportProvider = ShenyuSseServerTransportProvider.builder() .objectMapper(objectMapper) .sseEndpoint(normalizedPath) .messageEndpoint(messageEndpoint) .build(); // 向Manager的routeMap中注册transportProvider的两个函数 registerRoutes(normalizedPath, messageEndpoint, transportProvider::handleSseConnection, transportProvider::handleMessage); return transportProvider; }}
#
1.1.2 mcpServer注册addTransportToSharedServer()
方法该方法在
getOrCreateMcpServerTransport()
方法被调用,用于创建 McpServer 并保存
该方法创建了一个新的 McpServer并存储到 sharedServerMap
中,并将上一步得到的 TransportProvider 存入 compositeTransportMap
中
@Componentpublic class ShenyuMcpServerManager { private void addTransportToSharedServer(final String normalizedPath, final String protocol, final Object transportProvider) { // 获取或者创建并注册 McpServer getOrCreateSharedServer(normalizedPath);
// 将新增的传输协议存进compositeTransportMap中 compositeTransport.addTransport(protocol, transportProvider); }
private McpAsyncServer getOrCreateSharedServer(final String normalizedPath) { return sharedServerMap.computeIfAbsent(normalizedPath, path -> { // 获取传输协议 CompositeTransportProvider compositeTransport = getOrCreateCompositeTransport(path);
// 选择Server拥有的能力 var capabilities = McpSchema.ServerCapabilities.builder() .tools(true) .logging() .build();
// 创建McpServer并存储 McpAsyncServer server = McpServer .async(compositeTransport) .serverInfo("MCP Shenyu Server (Multi-Protocol)", "1.0.0") .capabilities(capabilities) .tools(Lists.newArrayList()) .build(); return server; }); }}
#
1.2 Tools注册handlerRule()
方法 工作内容如下
- 捕捉用户在 Tool 上的填写的 tool 配置信息,这些信息将全部用于 tool 的构建
- 序列化创建
ShenyuMcpServerTool
获取 tool 信息
注意
ShenyuMcpServerTool
也是 Shenyu 存储 tool 信息的工具,和McpServerTool
没有继承关系
- 调用
addTool()
方法, 利用该 tool 信息创建 tool,并根据 SelectorId 将 tool 注册到与之匹配的 McpServer 中
public class McpServerPluginDataHandler implements PluginDataHandler { @Override public void handlerRule(final RuleData ruleData) { Optional.ofNullable(ruleData.getHandle()).ifPresent(s -> { // 序列化一个新的 ShenyuMcpServerTool ShenyuMcpServerTool mcpServerTool = GsonUtils.getInstance().fromJson(s, ShenyuMcpServerTool.class); // 缓存mcpServerTool CACHED_TOOL.get().cachedHandle(CacheKeyUtils.INST.getKey(ruleData), mcpServerTool); // 获取并构建 mcp schema List<McpServerToolParameter> parameters = mcpServerTool.getParameters(); String inputSchema = JsonSchemaUtil.createParameterSchema(parameters); ShenyuMcpServer server = CACHED_SERVER.get().obtainHandle(ruleData.getSelectorId()); if (Objects.nonNull(server)) { // 向Manager的sharedServerMap中存储Tool信息 shenyuMcpServerManager.addTool(server.getPath(), StringUtils.isBlank(mcpServerTool.getName()) ? ruleData.getName() : mcpServerTool.getName(), mcpServerTool.getDescription(), mcpServerTool.getRequestConfig(), inputSchema); } }); }}
addTool()
方法该方法被
handlerRule()
方法调用,用于新增工具
该方法做了下述工作
将上一步传来的 tool 信息转换为
shenyuToolDefinition
对象利用转换来的
shenyuToolDefinition
对象创建ShenyuToolCallback
对象ShenyuToolCallback
重写了ToolCallBack
的call()
方法,并将该call()
方法注册到了AsyncToolSpecification
中, 此后调用 tool 的call()
方法,则实际会调用这个重写的call()
方法将
ShenyuToolCallback
转换为AsyncToolSpecification
并注册到相关的 mcpServer 中
public class McpServerPluginDataHandler implements PluginDataHandler { public void addTool(final String serverPath, final String name, final String description, final String requestTemplate, final String inputSchema) { String normalizedPath = normalizeServerPath(serverPath); // 构建Definition对象 ToolDefinition shenyuToolDefinition = ShenyuToolDefinition.builder() .name(name) .description(description) .requestConfig(requestTemplate) .inputSchema(inputSchema) .build(); ShenyuToolCallback shenyuToolCallback = new ShenyuToolCallback(shenyuToolDefinition);
// 获取到先前注册的 McpServer, 并向其中注册Tool McpAsyncServer sharedServer = sharedServerMap.get(normalizedPath); for (AsyncToolSpecification asyncToolSpecification : McpToolUtils.toAsyncToolSpecifications(shenyuToolCallback)) { sharedServer.addTool(asyncToolSpecification).block(); } }}
到此为止,服务注册分析完毕
服务注册一图览
#
2. 插件调用客户端先后会发送后缀为 /sse
和 /message
的两种消息,这两种消息都会被 Shenyu McpServer plugin
捕捉,Shenyu McpServer plugin
会对 /sse
消息和 /message
消息做不同处理。收到 /sse
消息时 plugin 会创建 session 对象并保存,最后返回 session id
供 message 消息使用。收到 /message
消息时,会根据 /message
消息携带的 method 信息,选择执行的方法
如:获取工作列表,工具调用,获取资源列表等等
doExecute()
方法 工作内容如下
- 路径匹配,判断 mcp plugin 是否注册该路径
- 调用
routeByProtocol()
方法,根据请求协议选择合适的处理方案
本篇是对 sse 请求方式的解析,因此接着进入
handleSseRequest()
方法
public class McpServerPlugin extends AbstractShenyuPlugin { @Override protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) { final String uri = exchange.getRequest().getURI().getRawPath(); // 判断 Mcp 插件是否注册了该路由规则,没有则不执行 if (!shenyuMcpServerManager.canRoute(uri)) { return chain.execute(exchange); } final ServerRequest request = ServerRequest.create(exchange, messageReaders); // 根据 uri 判断路由协议,选择对应的处理方案 return routeByProtocol(exchange, chain, request, selector, uri); }
private Mono<Void> routeByProtocol(final ServerWebExchange exchange, final ShenyuPluginChain chain, final ServerRequest request, final SelectorData selector, final String uri) {
if (isStreamableHttpProtocol(uri)) { return handleStreamableHttpRequest(exchange, chain, request, uri); } else if (isSseProtocol(uri)) { // 处理sse请求 return handleSseRequest(exchange, chain, request, selector, uri); } }}
handlerSseRequest()
方法
该方法由
routeByProtocol()
方法调用,根据请求后缀判断客户端是要创建 session 还是调用工具
public class McpServerPlugin extends AbstractShenyuPlugin { private Mono<Void> handleSseRequest(final ServerWebExchange exchange, final ShenyuPluginChain chain, final ServerRequest request, final SelectorData selector, final String uri) { ShenyuMcpServer server = McpServerPluginDataHandler.CACHED_SERVER.get().obtainHandle(selector.getId()); String messageEndpoint = server.getMessageEndpoint(); // 获取传输者 ShenyuSseServerTransportProvider transportProvider = shenyuMcpServerManager.getOrCreateMcpServerTransport(uri, messageEndpoint); // 根据请求的后缀判断是 sse 连接请求还是 message 调用请求 if (uri.endsWith(messageEndpoint)) { setupSessionContext(exchange, chain); return handleMessageEndpoint(exchange, transportProvider, request); } else { return handleSseEndpoint(exchange, transportProvider, request); } }}
#
2.1 客户端发送 sse 请求如果我们客户端发送的是后缀为
/sse
的请求,那么将会执行handleSseEndpoint()
方法
handleSseEndpoint()
方法主要做了如下工作
- 配置 sse 请求头
- 调用
ShenyuSseServerTransportProvider
的createSseFlux()
创建 sse 流
public class McpServerPlugin extends AbstractShenyuPlugin { private Mono<Void> handleSseEndpoint(final ServerWebExchange exchange, final ShenyuSseServerTransportProvider transportProvider, final ServerRequest request) { // 配置 sse 请求头 configureSseHeaders(exchange);
// 创建 sse 流 return exchange.getResponse() .writeWith(transportProvider .createSseFlux(request)); }}
createSseFlux()
方法该方法被
handleSseEndpoint()
调用 主要用于创建并保存 session
- 创建 session ,创建 session 的工厂在创建 session 时会将一系列 handler 注册到 session 中,这些 handler 是真正执行 callTool 的对象
- 将 session 保存,session复用
- 将 session id 作为 endpoint 的请求参数返回给客户端,在调用 message 方法时会使用该 endpoint
public class ShenyuSseServerTransportProvider implements McpServerTransportProvider { public Flux<ServerSentEvent<?>> createSseFlux(final ServerRequest request) { return Flux.<ServerSentEvent<?>>create(sink -> { WebFluxMcpSessionTransport sessionTransport = new WebFluxMcpSessionTransport(sink); // 创建 McpServerSession 并暂存插件链信息 McpServerSession session = sessionFactory.create(sessionTransport); String sessionId = session.getId(); sessions.put(sessionId, session);
// 将 session id等信息传递回客户端 String endpointUrl = this.baseUrl + this.messageEndpoint + "?sessionId=" + sessionId; ServerSentEvent<String> endpointEvent = ServerSentEvent.<String>builder() .event(ENDPOINT_EVENT_TYPE) .data(endpointUrl) .build(); }).doOnSubscribe(subscription -> LOGGER.info("SSE Flux subscribed")) .doOnRequest(n -> LOGGER.debug("SSE Flux requested {} items", n)); }}
#
2.2 客户端发送 message 请求如果我们客户端发送的是后缀为
/message
的请求,那么将会执行 把当前ShenyuPluginChain
信息存入 session 中,并调用handleMessageEndpoint()
方法, 后续工具调用时会继续执行该插件链,因此 mcp plugin 后的插件会对进入 tool 的请求造成影响
handleMessageEndpoint()
方法,调用ShenyuSseServerTransportProvider
的handleMessageEndpoint()
方法
if (uri.endsWith(messageEndpoint)) { setupSessionContext(exchange, chain); return handleMessageEndpoint(exchange, transportProvider, request);}
public class McpServerPlugin extends AbstractShenyuPlugin { private Mono<Void> handleMessageEndpoint(final ServerWebExchange exchange, final ShenyuSseServerTransportProvider transportProvider, final ServerRequest request) { // 处理message请求 return transportProvider.handleMessageEndpoint(request) .flatMap(result -> { return exchange.getResponse() .writeWith(Mono.just(exchange.getResponse().bufferFactory().wrap(responseBody.getBytes()))); }); }}
handleMessageEndpoint()
方法该方法由
McpServerPlugin.handleMessageEndpoint()
调用,将请求交给 session 处理
session 的 handler()
方法会对 message 的不同,而进行对应的操作
例如 : 当 message 中 method 是 "tools/call" 时,则会使用工具调用的 handler() 执行 call()
方法调用工具
相关源码在此不过多赘述
public class ShenyuSseServerTransportProvider implements McpServerTransportProvider { public Mono<MessageHandlingResult> handleMessageEndpoint(final ServerRequest request) { // 获取到session String sessionId = request.queryParam("sessionId").get(); McpServerSession session = sessions.get(sessionId); return request.bodyToMono(String.class) .flatMap(body -> { McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body); return session.handle(message); }); }}
至此 Shenyu Mcp Plugin
服务调用源码分析完毕
流程图一览
#
3. 工具调用如果客户端传递的消息是调用工具的消息,那么 session 将使用工具调用的 handler() 并执行 tool 的
call()
方法, 在服务注册中,我们说明了 tool 在被调用时,实际执行的是ShenyuToolCallback()
的call()
方法
因此执行工具调用时会执行以下方法
call()
主要工作内容如下
- 获取 session id
- 获取
requestTemplate
即 shenyu 提供的额外功能的配置信息 - 获取上一步暂存的 shenyu 插件链,并将工具调用的信息交给插件链继续执行
- 异步等待工具响应
插件链执行完成后,会将调用 tool 请求真正的发送到 tool 所在的服务之中
public class ShenyuToolCallback implements ToolCallback { @NonNull @Override public String call(@NonNull final String input, final ToolContext toolContext) { // 从 mcp 请求中提取 sessionId final McpSyncServerExchange mcpExchange = extractMcpExchange(toolContext); final String sessionId = extractSessionId(mcpExchange); // 提取requestTemplate信息 final String configStr = extractRequestConfig(shenyuTool);
// 利用sessionId 获取到先前暂存的插件执行链 final ServerWebExchange originExchange = getOriginExchange(sessionId); final ShenyuPluginChain chain = getPluginChain(originExchange);
// 执行工具调用 return executeToolCall(originExchange, chain, sessionId, configStr, input);
}
private String executeToolCall(final ServerWebExchange originExchange, final ShenyuPluginChain chain, final String sessionId, final String configStr, final String input) {
final CompletableFuture<String> responseFuture = new CompletableFuture<>(); final ServerWebExchange decoratedExchange = buildDecoratedExchange( originExchange, responseFuture, sessionId, configStr, input); // 执行插件链,调用实际工具 chain.execute(decoratedExchange) .subscribe();
// 等待响应 final String result = responseFuture.get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); return result;
}}
至此Shenyu MCP Plugin 工具调用分析完毕
#
4. 小结本文源码分析从 mcp 服务注册开始,到 mcp 插件的服务调用,再到 tool 的调用。 mcpServer 插件让 shenyu 成为一个功能强大,集中管理的 mcpServer。