Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@

package org.springframework.ai.mcp;

import java.net.HttpURLConnection;
import java.net.URL;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import io.modelcontextprotocol.AbstractMcpClientServerIntegrationTests;
Expand All @@ -28,11 +31,13 @@
import io.modelcontextprotocol.server.McpServer.AsyncSpecification;
import io.modelcontextprotocol.server.McpServer.SingleSessionSyncSpecification;
import io.modelcontextprotocol.server.McpTransportContextExtractor;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.provider.Arguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

Expand All @@ -44,10 +49,11 @@
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerRequest;

@Disabled("Flaky test - needs investigation")
@Timeout(45)
class WebFluxSseIT extends AbstractMcpClientServerIntegrationTests {

private static final Logger log = LoggerFactory.getLogger(WebFluxSseIT.class);

private static final String CUSTOM_SSE_ENDPOINT = "/somePath/sse";

private static final String CUSTOM_MESSAGE_ENDPOINT = "/otherPath/mcp/message";
Expand Down Expand Up @@ -102,9 +108,35 @@ public void before() {

HttpHandler httpHandler = RouterFunctions.toHttpHandler(this.mcpServerTransportProvider.getRouterFunction());
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
this.httpServer = HttpServer.create().port(0).handle(adapter).bindNow();

prepareClients(this.httpServer.port(), null);
this.httpServer = HttpServer.create().port(0).host("0.0.0.0").handle(adapter).bindNow();

int port = this.httpServer.port();
log.info("Reactor Netty server bound to host='{}' port={}", this.httpServer.host(), port);

String probeUrl = "http://127.0.0.1:" + port + CUSTOM_SSE_ENDPOINT;
Awaitility.await()
.alias("MCP server reachable at " + probeUrl)
.atMost(10, TimeUnit.SECONDS)
.pollInterval(200, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
HttpURLConnection conn = (HttpURLConnection) new URL(probeUrl).openConnection();
conn.setConnectTimeout(1000);
conn.setReadTimeout(1000);
conn.setRequestMethod("GET");
try {
int status = conn.getResponseCode();
log.info("Sanity probe {} -> HTTP {}", probeUrl, status);
// SSE endpoint returns 200 (it streams); message endpoint returns 4xx
// for a bare GET. Either way, any HTTP response means the server is
// up.
org.assertj.core.api.Assertions.assertThat(status).isGreaterThan(0);
}
finally {
conn.disconnect();
}
});

prepareClients(port, null);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void before() {

HttpHandler httpHandler = RouterFunctions.toHttpHandler(this.mcpStreamableServerTransport.getRouterFunction());
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
this.httpServer = HttpServer.create().port(0).handle(adapter).bindNow();
this.httpServer = HttpServer.create().port(0).host("0.0.0.0").handle(adapter).bindNow();

prepareClients(this.httpServer.port(), null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
Expand All @@ -50,7 +49,6 @@

import static org.assertj.core.api.Assertions.assertThat;

@Disabled("Flaky in CI, needs investigation")
class WebFluxStreamableHttpVersionNegotiationIT {

private DisposableServer httpServer;
Expand Down Expand Up @@ -90,7 +88,7 @@ void setUp() {

ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);

this.httpServer = HttpServer.create().port(0).handle(adapter).bindNow();
this.httpServer = HttpServer.create().port(0).host("0.0.0.0").handle(adapter).bindNow();
this.port = this.httpServer.port();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void before() {
HttpHandler httpHandler = RouterFunctions
.toHttpHandler(this.mcpStreamableServerTransportProvider.getRouterFunction());
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
this.httpServer = HttpServer.create().port(0).handle(adapter).bindNow();
this.httpServer = HttpServer.create().port(0).host("0.0.0.0").handle(adapter).bindNow();

prepareClients(this.httpServer.port(), null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ private void startHttpServer(RouterFunction<?> routerFunction) {

HttpHandler httpHandler = RouterFunctions.toHttpHandler(routerFunction);
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
this.httpServer = HttpServer.create().port(0).handle(adapter).bindNow();
this.httpServer = HttpServer.create().port(0).host("0.0.0.0").handle(adapter).bindNow();
int port = this.httpServer.port();
this.asyncStreamableClient = McpClient
.async(WebClientStreamableHttpTransport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private void startHttpServer(RouterFunction<?> routerFunction) {

HttpHandler httpHandler = RouterFunctions.toHttpHandler(routerFunction);
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
this.httpServer = HttpServer.create().port(0).handle(adapter).bindNow();
this.httpServer = HttpServer.create().port(0).host("0.0.0.0").handle(adapter).bindNow();
int port = this.httpServer.port();
this.streamableClient = McpClient.sync(WebClientStreamableHttpTransport.builder(WebClient.builder()
.baseUrl("http://127.0.0.1:" + port)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ void messageHostNotAllowed() {
private static void startServer(RouterFunction<?> routerFunction) {
HttpHandler httpHandler = RouterFunctions.toHttpHandler(routerFunction);
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
httpServer = HttpServer.create().port(0).handle(adapter).bindNow();
httpServer = HttpServer.create().port(0).host("0.0.0.0").handle(adapter).bindNow();
baseUrl = "http://localhost:" + httpServer.port();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private McpServerTransportProvider createMcpTransportProvider() {

HttpHandler httpHandler = RouterFunctions.toHttpHandler(transportProvider.getRouterFunction());
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
this.httpServer = HttpServer.create().port(0).handle(adapter).bindNow();
this.httpServer = HttpServer.create().port(0).host("0.0.0.0").handle(adapter).bindNow();
return transportProvider;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private McpServerTransportProvider createMcpTransportProvider() {
protected void onStart() {
HttpHandler httpHandler = RouterFunctions.toHttpHandler(this.transportProvider.getRouterFunction());
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
this.httpServer = HttpServer.create().port(0).handle(adapter).bindNow();
this.httpServer = HttpServer.create().port(0).host("0.0.0.0").handle(adapter).bindNow();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private McpStreamableServerTransportProvider createMcpTransportProvider() {

HttpHandler httpHandler = RouterFunctions.toHttpHandler(transportProvider.getRouterFunction());
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
this.httpServer = HttpServer.create().port(0).handle(adapter).bindNow();
this.httpServer = HttpServer.create().port(0).host("0.0.0.0").handle(adapter).bindNow();
return transportProvider;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private McpStreamableServerTransportProvider createMcpTransportProvider() {

HttpHandler httpHandler = RouterFunctions.toHttpHandler(transportProvider.getRouterFunction());
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
this.httpServer = HttpServer.create().port(0).handle(adapter).bindNow();
this.httpServer = HttpServer.create().port(0).host("0.0.0.0").handle(adapter).bindNow();
return transportProvider;
}

Expand Down
11 changes: 2 additions & 9 deletions mcp/transport/mcp-spring-webflux/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,8 @@
</encoder>
</appender>

<!-- Main MCP package -->
<logger name="io.modelcontextprotocol" level="INFO"/>

<!-- Client packages -->
<logger name="io.modelcontextprotocol.client" level="INFO"/>

<!-- Spec package -->
<logger name="io.modelcontextprotocol.spec" level="INFO"/>

<logger name="io.modelcontextprotocol" level="DEBUG"/>
<logger name="org.springframewor.ai.mcp" level="DEBUG"/>

<!-- Root logger -->
<root level="INFO">
Expand Down
Loading