The following examples show how to use
org.springframework.http.client.reactive.ReactorClientHttpConnector
.
You can vote up the ones you like or vote down the ones you don't like,
and go to the original project or source file by following the links above each example. You may check out the related API usage on the sidebar.
public static void main(String[] args) throws JsonProcessingException {
// ObjectMapper objectMapper = new ObjectMapper();
// objectMapper.setSerializationInclusion(NON_NULL);
// InfluxDB influxdb = InfluxDBFactory.connect("http://172.29.64.250:18086");
// QueryResult result = influxdb.query(new Query("select * from health limit 1", "micrometerDb"));
// System.out.println(objectMapper.writeValueAsString(result));
// influxdb.close();
TcpClient tcpClient = TcpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
WebClient webClient = WebClient.builder()
.baseUrl("http://localhost:8086")
.clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient)))
.filter(logRequest())
.build();
System.out.println(webClient.get().uri(uriBuilder -> uriBuilder
.path("query")
.queryParam("db", "micrometerDb")
.queryParam("q", "select * from cpu")
.build()
.exchange()
.flatMap(clientResponse -> clientResponse.toEntity(String.class))
.block().getBody());
private static WebClient.Builder createDefaultWebClient(Duration connectTimeout, Duration readTimeout) {
HttpClient httpClient = HttpClient.create()
.compress(true)
.tcpConfiguration(tcp -> tcp.bootstrap(bootstrap -> bootstrap.option(
ChannelOption.CONNECT_TIMEOUT_MILLIS,
(int) connectTimeout.toMillis()
)).observe((connection, newState) -> {
if (ConnectionObserver.State.CONNECTED.equals(newState)) {
connection.addHandlerLast(new ReadTimeoutHandler(readTimeout.toMillis(),
TimeUnit.MILLISECONDS
ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
return WebClient.builder().clientConnector(connector);
private WebClient getSslIgnoringWebClient() {
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(HttpClient
.create()
.secure(t -> {
try {
t.sslContext(SslContextBuilder
.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build());
catch (SSLException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("problem ignoring SSL in WebClient", e);
.build();
static ClientHttpConnector usingReactorNetty(ClientOptions options, SslConfiguration sslConfiguration) {
HttpClient client = HttpClient.create();
if (hasSslConfiguration(sslConfiguration)) {
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
configureSsl(sslConfiguration, sslContextBuilder);
client = client.secure(builder -> {
builder.sslContext(sslContextBuilder);
client = client.tcpConfiguration(it -> it.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
Math.toIntExact(options.getConnectionTimeout().toMillis())));
return new ReactorClientHttpConnector(client);
@Test
public void givenNettyHttpClientWithWiretap_whenEndpointIsConsumed_thenRequestAndResponseBodyLogged() {
reactor.netty.http.client.HttpClient httpClient = HttpClient
.create()
.wiretap(true);
WebClient
.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build()
.post()
.uri(sampleUrl)
.body(BodyInserters.fromObject(post))
.exchange()
.block();
verify(nettyAppender).doAppend(argThat(argument -> (((LoggingEvent) argument).getFormattedMessage()).contains("00000300")));
@Test
public void givenNettyHttpClientWithCustomLogger_whenEndpointIsConsumed_thenRequestAndResponseBodyLogged() {
reactor.netty.http.client.HttpClient httpClient = HttpClient
.create()
.tcpConfiguration(
tc -> tc.bootstrap(
b -> BootstrapHandlers.updateLogSupport(b, new CustomLogger(HttpClient.class))));
WebClient
.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build()
.post()
.uri(sampleUrl)
.body(BodyInserters.fromObject(post))
.exchange()
.block();
verify(nettyAppender).doAppend(argThat(argument -> (((LoggingEvent) argument).getFormattedMessage()).contains(sampleResponseBody)));
private ReactorClientHttpConnector getConnector() throws SSLException {
SslContext sslContext = SslContextBuilder
.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
TcpClient tcpClient = TcpClient
.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, TIMEOUT_MILLIS)
.doOnConnected(connection -> {
connection.addHandlerLast(new ReadTimeoutHandler(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
connection.addHandlerLast(new WriteTimeoutHandler(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
HttpClient httpClient = HttpClient.from(tcpClient).secure(t -> t.sslContext(sslContext));
return new ReactorClientHttpConnector(httpClient);
public WebClient getWebClientFromCacheOrCreate(BackendNode node) {
WebClient client = webClientCache.get(node.getUrl());
if (client != null) {
return client;
synchronized (webClientCache) {
client = webClientCache.get(node.getUrl());
if (client != null) {
return client;
int queryTimeout=Optional.ofNullable(node.getQueryTimeout()).orElse(DEFAULT_QUERY_TIMEOUT);
int writeTimeout=Optional.ofNullable(node.getWriteTimeout()).orElse(DEFAULT_WRITE_TIMEOUT);
int timeout=Math.max(queryTimeout,writeTimeout);
TcpClient tcpClient = TcpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout)
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(timeout))
.addHandlerLast(new WriteTimeoutHandler(timeout)));
WebClient webClient = WebClient.builder()
.baseUrl(node.getUrl())
.clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient).keepAlive(false)))
.filter(logRequest())
.build();
webClientCache.put(node.getUrl(), webClient);
return webClient;
private ExchangeFunction initExchangeFunction() {
if (this.exchangeFunction != null) {
return this.exchangeFunction;
else if (this.connector != null) {
return ExchangeFunctions.create(this.connector, this.exchangeStrategies);
else {
return ExchangeFunctions.create(new ReactorClientHttpConnector(), this.exchangeStrategies);
@Parameterized.Parameters(name = "webClient [{0}]")
public static Object[][] arguments() {
return new Object[][] {
{new JettyClientHttpConnector()},
{new ReactorClientHttpConnector()}
private ReactorClientHttpConnector initConnector() {
if (bufferFactory instanceof NettyDataBufferFactory) {
ByteBufAllocator allocator = ((NettyDataBufferFactory) bufferFactory).getByteBufAllocator();
return new ReactorClientHttpConnector(this.factory, httpClient ->
httpClient.tcpConfiguration(tcpClient -> tcpClient.option(ChannelOption.ALLOCATOR, allocator)));
else {
return new ReactorClientHttpConnector();
@Parameterized.Parameters(name = "server [{0}] webClient [{1}]")
public static Object[][] arguments() {
File base = new File(System.getProperty("java.io.tmpdir"));
return new Object[][] {
{new JettyHttpServer(), new ReactorClientHttpConnector()},
{new JettyHttpServer(), new JettyClientHttpConnector()},
{new ReactorHttpServer(), new ReactorClientHttpConnector()},
{new ReactorHttpServer(), new JettyClientHttpConnector()},
{new TomcatHttpServer(base.getAbsolutePath()), new ReactorClientHttpConnector()},
{new TomcatHttpServer(base.getAbsolutePath()), new JettyClientHttpConnector()},
{new UndertowHttpServer(), new ReactorClientHttpConnector()},
{new UndertowHttpServer(), new JettyClientHttpConnector()}
@Bean
@ConditionalOnProperty(name = "cf.sslValidationSkipped", havingValue="true")
public WebClient insecureWebClient(WebClient.Builder builder) throws SSLException {
SslContext sslContext =
SslContextBuilder
.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
TcpClient tcpClient = TcpClient.create().secure(sslProviderBuilder -> sslProviderBuilder.sslContext(sslContext));
HttpClient httpClient = HttpClient.from(tcpClient);
return builder
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
private ExchangeFunction initExchangeFunction() {
if (this.exchangeFunction != null) {
return this.exchangeFunction;
else if (this.connector != null) {
return ExchangeFunctions.create(this.connector, this.exchangeStrategies);
else {
return ExchangeFunctions.create(new ReactorClientHttpConnector(), this.exchangeStrategies);
@Parameterized.Parameters(name = "webClient [{0}]")
public static Object[][] arguments() {
return new Object[][] {
{new JettyClientHttpConnector()},
{new ReactorClientHttpConnector()}
private ReactorClientHttpConnector initConnector() {
if (bufferFactory instanceof NettyDataBufferFactory) {
ByteBufAllocator allocator = ((NettyDataBufferFactory) bufferFactory).getByteBufAllocator();
return new ReactorClientHttpConnector(this.factory, httpClient ->
httpClient.tcpConfiguration(tcpClient -> tcpClient.option(ChannelOption.ALLOCATOR, allocator)));
else {
return new ReactorClientHttpConnector();
@Parameterized.Parameters(name = "server [{0}] webClient [{1}]")
public static Object[][] arguments() {
File base = new File(System.getProperty("java.io.tmpdir"));
return new Object[][] {
{new JettyHttpServer(), new ReactorClientHttpConnector()},
{new JettyHttpServer(), new JettyClientHttpConnector()},
{new ReactorHttpServer(), new ReactorClientHttpConnector()},
{new ReactorHttpServer(), new JettyClientHttpConnector()},
{new TomcatHttpServer(base.getAbsolutePath()), new ReactorClientHttpConnector()},
{new TomcatHttpServer(base.getAbsolutePath()), new JettyClientHttpConnector()},
{new UndertowHttpServer(), new ReactorClientHttpConnector()},
{new UndertowHttpServer(), new JettyClientHttpConnector()}
@Bean
public SoulPlugin webClientPlugin(final ObjectProvider<HttpClient> httpClient) {
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(Objects.requireNonNull(httpClient.getIfAvailable())))
.build();
return new WebClientPlugin(webClient);
public static final void main(String[] args) throws IOException {
WebClient client = WebClient.builder()
//see: https://github.com/jetty-project/jetty-reactive-httpclient
//.clientConnector(new JettyClientHttpConnector())
.clientConnector(new ReactorClientHttpConnector())
.codecs(
clientCodecConfigurer ->{
// use defaultCodecs() to apply DefaultCodecs
// clientCodecConfigurer.defaultCodecs();
// alter a registered encoder/decoder based on the default config.
// clientCodecConfigurer.defaultCodecs().jackson2Encoder(...)
// Or
// use customCodecs to register Codecs from scratch.
clientCodecConfigurer.customCodecs().register(new Jackson2JsonDecoder());
clientCodecConfigurer.customCodecs().register(new Jackson2JsonEncoder());
.exchangeStrategies(ExchangeStrategies.withDefaults())
// .exchangeFunction(ExchangeFunctions.create(new ReactorClientHttpConnector())
// .filter(ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {})))
// .filter(ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {clientRequest.}))
//.defaultHeaders(httpHeaders -> httpHeaders.addAll())
.baseUrl("http://localhost:8080")
.build();
client
.get()
.uri("/posts")
.exchange()
.flatMapMany(res -> res.bodyToFlux(Post.class))
.log()
.subscribe(post -> System.out.println("post: " + post));
System.out.println("Client is started!");
System.in.read();
* Create a {@link ClientHttpConnector} for the given {@link ClientOptions}.
* @param options must not be {@literal null}
* @return a new {@link ClientHttpConnector}.
public static ClientHttpConnector create(ClientOptions options) {
HttpClient httpClient = HttpClient.create();
if (usingCustomCerts(options)) {
TrustManagerFactory trustManagerFactory = sslCertificateUtils
.createTrustManagerFactory(options.getCaCertFiles());
httpClient = httpClient.secure((sslContextSpec) -> sslContextSpec.sslContext(
SslContextBuilder.forClient().sslProvider(SslProvider.JDK).trustManager(trustManagerFactory)));
else {
httpClient = httpClient.secure((sslContextSpec) -> {
try {
sslContextSpec.sslContext(new JdkSslContext(SSLContext.getDefault(), true, null,
IdentityCipherSuiteFilter.INSTANCE, null, ClientAuth.REQUIRE, null, false));
catch (NoSuchAlgorithmException ex) {
logger.error("Error configuring HTTP connections", ex);
throw new RuntimeException("Error configuring HTTP connections", ex);
if (options.getConnectionTimeout() != null) {
httpClient = httpClient
.tcpConfiguration((tcpClient) -> tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
Math.toIntExact(options.getConnectionTimeout().toMillis())));
return new ReactorClientHttpConnector(httpClient);
public static WebClient.Builder addTitusDefaults(WebClient.Builder clientBuilder,
HttpClient httpClient,
WebClientMetric webClientMetric) {
HttpClient updatedHttpClient = addMetricCallbacks(
addLoggingCallbacks(httpClient),
webClientMetric
return clientBuilder.clientConnector(new ReactorClientHttpConnector(updatedHttpClient));
@Override
public ClientHttpConnector createConnector(TimeoutConfiguration configuration) {
return new ReactorClientHttpConnector(httpClient.tcpConfiguration(client ->
client.option(CONNECT_TIMEOUT_MILLIS, toMillis(configuration.getConnection()))
.doOnConnected(connection -> connection
.addHandlerLast(new ReadTimeoutHandler(configuration.getRead().toMillis(), MILLISECONDS))
.addHandlerLast(new WriteTimeoutHandler(configuration.getWrite().toMillis(), MILLISECONDS)))));
public void setup() throws Exception {
try {
SslContext sslContext = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
HttpClient httpClient = HttpClient.create()
.secure(ssl -> ssl.sslContext(sslContext));
setup(new ReactorClientHttpConnector(httpClient),
"https://localhost:" + port);
catch (SSLException e) {
throw new RuntimeException(e);
try {
SslContext sslContext = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
HttpClient httpClient = HttpClient.create()
.secure(ssl -> ssl.sslContext(sslContext));
setup(new ReactorClientHttpConnector(httpClient),
"https://localhost:" + port);
catch (SSLException e) {
throw new RuntimeException(e);
* Normally, the HTTP connector would be statically initialized. This ensures the
* {@link HttpClient} is configured for the mock endpoint.
@Bean
@Order(0)
public WebClientCustomizer clientConnectorCustomizer(HttpClient httpClient,
URI baseUrl) {
return (builder) -> builder.baseUrl(baseUrl.toString())
.clientConnector(new ReactorClientHttpConnector(httpClient));
private ReactorClientHttpConnector getConnector() throws SSLException {
SslContext sslContext = SslContextBuilder
.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
HttpClient httpClient = HttpClient.create().secure(t -> t.sslContext(sslContext));
return new ReactorClientHttpConnector(httpClient);
/** Connect to server via Reactor Netty. */
DefaultWebTestClientBuilder() {
this(new ReactorClientHttpConnector());
/** Connect to server via Reactor Netty. */
DefaultWebTestClientBuilder() {
this(new ReactorClientHttpConnector());
@Bean
public WebClient webClientWithNetty(reactor.netty.http.client.HttpClient httpClient) {
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
@Before
public void setup() throws Exception {
setup(new ReactorClientHttpConnector(), "http://localhost:" + port);