SpringCloud gateway自定义请求的 httpClient


SpringCloud gateway自定义请求的 httpClient

文章插图
 
SpringCloud gateway 在实现服务路由并请求的具体过程是在 org.springframework.cloud.gateway.filter.NETtyRoutingFilter 的过滤器中,该过滤器封装了具体的请求参数,以及根据路由规则请求的对应服务,然后根据 HttpClient 进行微服务之间的请求; 该 httpClient 类是 用netty 封装的 客户端,其包路径为 : reactor.netty.http.client.HttpClient ;
查看 NettyRoutingFilter 中的 filter 实现过程:
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {URI requestUrl = (URI)exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);String scheme = requestUrl.getScheme();if (!ServerWebExchangeUtils.isAlreadyRouted(exchange) && ("http".equals(scheme) || "https".equals(scheme))) {ServerWebExchangeUtils.setAlreadyRouted(exchange);ServerHttpRequest request = exchange.getRequest();HttpMethod method = HttpMethod.valueOf(request.getMethodValue());String url = requestUrl.toASCIIString();HttpHeaders filtered = HttpHeadersFilter.filterRequest(this.getHeadersFilters(), exchange);DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();filtered.forEach(httpHeaders::set);boolean preserveHost = (Boolean)exchange.getAttributeOrDefault(ServerWebExchangeUtils.PRESERVE_HOST_HEADER_ATTRIBUTE, false);Route route = (Route)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);Flux<HttpClientResponse> responseFlux = ((RequestSender)this.getHttpClient(route, exchange).headers((headers) -> {headers.add(httpHeaders);headers.remove("Host");if (preserveHost) {String host = request.getHeaders().getFirst("Host");headers.add("Host", host);}}).request(method).uri(url)).send((req, nettyOutbound) -> {if (log.isTraceEnabled()) {nettyOutbound.withConnection((connection) -> {log.trace("outbound route: " + connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix());});}return nettyOutbound.send(request.getBody().map(this::getByteBuf));}).responseConnection((res, connection) -> {exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR, res);exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_CONN_ATTR, connection);ServerHttpResponse response = exchange.getResponse();HttpHeaders headers = new HttpHeaders();res.responseHeaders().forEach((entry) -> {headers.add((String)entry.getKey(), (String)entry.getValue());});String contentTypeValue = https://www.isolves.com/it/cxkf/jiagou/2022-07-30/headers.getFirst("Content-Type");if (StringUtils.hasLength(contentTypeValue)) {exchange.getAttributes().put("original_response_content_type", contentTypeValue);}this.setResponseStatus(res, response);HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(this.getHeadersFilters(), headers, exchange, Type.RESPONSE);if (!filteredResponseHeaders.containsKey("Transfer-Encoding") && filteredResponseHeaders.containsKey("Content-Length")) {response.getHeaders().remove("Transfer-Encoding");}exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());response.getHeaders().putAll(filteredResponseHeaders);return Mono.just(res);});Duration responseTimeout = this.getResponseTimeout(route);if (responseTimeout != null) {responseFlux = responseFlux.timeout(responseTimeout, Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout))).onErrorMap(TimeoutException.class, (th) -> {return new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th);});}return responseFlux.then(chain.filter(exchange));} else {return chain.filter(exchange);}} 
该方法中 有一个 getHttpClient 方法获取 httpClient 客户端实例的过程,由于在 GatewayAutoConfiguration 中 定义了 springCloud gateway 使用的 httpclient 实例,其声明并自动加载的代码如下:
@Configuration(proxyBeanMethods = false)@ConditionalOnClass({HttpClient.class})protected static class NettyConfiguration {protected final Log logger = LogFactory.getLog(this.getClass());protected NettyConfiguration() {}@Bean@ConditionalOnProperty(name = {"spring.cloud.gateway.httpserver.wiretap"})public NettyWebServerFactoryCustomizer nettyServerWiretapCustomizer(Environment environment, ServerProperties serverProperties) {return new NettyWebServerFactoryCustomizer(environment, serverProperties) {public void customize(NettyReactiveWebServerFactory factory) {factory.addServerCustomizers(new NettyServerCustomizer[]{(httpServer) -> {return httpServer.wiretap(true);}});super.customize(factory);}};}@Bean@ConditionalOnMissingBeanpublic HttpClient gatewayHttpClient(HttpClientProperties properties, List<HttpClientCustomizer> customizers) {Pool pool = properties.getPool();ConnectionProvider connectionProvider;if (pool.getType() == PoolType.DISABLED) {connectionProvider = ConnectionProvider.newConnection();} else if (pool.getType() == PoolType.FIXED) {connectionProvider = ConnectionProvider.fixed(pool.getName(), pool.getMaxConnections(), pool.getAcquireTimeout(), pool.getMaxIdleTime(), pool.getMaxLifeTime());} else {connectionProvider = ConnectionProvider.elastic(pool.getName(), pool.getMaxIdleTime(), pool.getMaxLifeTime());}HttpClient httpClient = HttpClient.create(connectionProvider).httpResponseDecoder((spec) -> {if (properties.getMaxHeaderSize() != null) {spec.maxHeaderSize((int)properties.getMaxHeaderSize().toBytes());}if (properties.getMaxInitialLineLength() != null) {spec.maxInitialLineLength((int)properties.getMaxInitialLineLength().toBytes());}return spec;}).tcpConfiguration((tcpClient) -> {if (properties.getConnectTimeout() != null) {tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, properties.getConnectTimeout());}Proxy proxy = properties.getProxy();if (StringUtils.hasText(proxy.getHost())) {tcpClient = tcpClient.proxy((proxySpec) -> {Builder builder = proxySpec.type(reactor.netty.tcp.ProxyProvider.Proxy.HTTP).host(proxy.getHost());PropertyMApper map = PropertyMapper.get();proxy.getClass();map.from(proxy::getPort).whenNonNull().to(builder::port);proxy.getClass();map.from(proxy::getUsername).whenHasText().to(builder::username);proxy.getClass();map.from(proxy::getPassword).whenHasText().to((password) -> {builder.password((s) -> {return password;});});proxy.getClass();map.from(proxy::getNonProxyHostsPattern).whenHasText().to(builder::nonProxyHosts);});}return tcpClient;});Ssl ssl = properties.getSsl();if (ssl.getKeyStore() != null && ssl.getKeyStore().length() > 0 || ssl.getTrustedX509CertificatesForTrustManager().length > 0 || ssl.isUseInsecureTrustManager()) {httpClient = httpClient.secure((sslContextSpec) -> {SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();X509Certificate[] trustedX509Certificates = ssl.getTrustedX509CertificatesForTrustManager();if (trustedX509Certificates.length > 0) {sslContextBuilder = sslContextBuilder.trustManager(trustedX509Certificates);} else if (ssl.isUseInsecureTrustManager()) {sslContextBuilder = sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);}try {sslContextBuilder = sslContextBuilder.keyManager(ssl.getKeyManagerFactory());} catch (Exception var6) {this.logger.error(var6);}sslContextSpec.sslContext(sslContextBuilder).defaultConfiguration(ssl.getDefaultConfigurationType()).handshakeTimeout(ssl.getHandshakeTimeout()).closeNotifyFlushTimeout(ssl.getCloseNotifyFlushTimeout()).closeNotifyReadTimeout(ssl.getCloseNotifyReadTimeout());});}if (properties.isWiretap()) {httpClient = httpClient.wiretap(true);}if (!CollectionUtils.isEmpty(customizers)) {customizers.sort(AnnotationAwareOrderComparator.INSTANCE);HttpClientCustomizer customizer;for(Iterator var7 = customizers.iterator(); var7.hasNext(); httpClient = customizer.customize(httpClient)) {customizer = (HttpClientCustomizer)var7.next();}}return httpClient;}@Beanpublic HttpClientProperties httpClientProperties() {return new HttpClientProperties();}@Beanpublic NettyRoutingFilter routingFilter(HttpClient httpClient, ObjectProvider<List<HttpHeadersFilter>> headersFilters, HttpClientProperties properties) {return new NettyRoutingFilter(httpClient, headersFilters, properties);}@Beanpublic NettyWriteResponseFilter nettyWriteResponseFilter(GatewayProperties properties) {return new NettyWriteResponseFilter(properties.getStreamingMediaTypes());}@Beanpublic ReactorNettyWebSocketClient reactorNettyWebSocketClient(HttpClientProperties properties, HttpClient httpClient) {ReactorNettyWebSocketClient webSocketClient = new ReactorNettyWebSocketClient(httpClient);if (properties.getWebsocket().getMaxFramePayloadLength() != null) {webSocketClient.setMaxFramePayloadLength(properties.getWebsocket().getMaxFramePayloadLength());}webSocketClient.setHandlePing(properties.getWebsocket().isProxyPing());return webSocketClient;}@Beanpublic ReactorNettyRequestUpgradeStrategy reactorNettyRequestUpgradeStrategy(HttpClientProperties httpClientProperties) {ReactorNettyRequestUpgradeStrategy requestUpgradeStrategy = new ReactorNettyRequestUpgradeStrategy();Websocket websocket = httpClientProperties.getWebsocket();PropertyMapper map = PropertyMapper.get();websocket.getClass();map.from(websocket::getMaxFramePayloadLength).whenNonNull().to(requestUpgradeStrategy::setMaxFramePayloadLength);websocket.getClass();map.from(websocket::isProxyPing).to(requestUpgradeStrategy::setHandlePing);return requestUpgradeStrategy;}}


推荐阅读