Analysis of problems caused by improper use of resttemplate

background

Problem phenomenon

Analysis process

These three problems are interrelated. The fuse is the third problem, then leads to the second problem, and finally leads to the third problem; Brief description of the reason: the third problem is that the system does not hang system B under the nginx load, resulting in a 502 error when requesting the external system, but a does not correctly handle the exception, resulting in the HTTP request cannot be closed normally. By default, springboot opens opensessioninview. The database connection will be closed only when the request calling a is closed, but the request calling a is not closed at this time, The database connection was not closed.

Here we mainly analyze the first question: why does 504 timeout occur when requesting a connection

AbstractConnPool

Through the log, you can see that a blocks when calling B until timeout. Print out the thread stack. View:

Thread blocking is in getpoolentryblocking method of abstractconnpool class

    private E getPoolEntryBlocking(
            final T route,final Object state,final long timeout,final TimeUnit timeUnit,final Future<E> future) throws IOException,InterruptedException,TimeoutException {

        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
        }
        this.lock.lock();
        try {
           //根据route获取route对应的连接池
            final RouteSpecificPool<T,C,E> pool = getPool(route);
            E entry;
            for (;;) {
                Asserts.check(!this.isShutDown,"Connection pool shut down");
                for (;;) {
                   //获取可用的连接
                    entry = pool.getFree(state);
                    if (entry == null) {
                        break;
                    }
                    // 判断连接是否过期,如过期则关闭并从可用连接集合中删除
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry,false);
                    } else {
                        break;
                    }
                }
               // 如果从连接池中获取到可用连接,更新可用连接和待释放连接集合
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }

                // 如果没有可用连接,则创建新连接
                final int maxPerRoute = getMax(route);
                // 创建新连接之前,检查是否超过每个route连接池大小,如果超过,则删除可用连接集合相应数量的连接(从总的可用连接集合和每个route的可用连接集合中删除)
                final int excess = Math.max(0,pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }

                if (pool.getAllocatedCount() < maxPerRoute) {
                   //比较总的可用连接数量与总的可用连接集合大小,释放多余的连接资源
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed,0);
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T,E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                       // 真正创建连接的地方
                        final C conn = this.connFactory.create(route);
                        entry = pool.add(conn);
                        this.leased.add(entry);
                        return entry;
                    }
                }

               //如果已经超过了每个route的连接池大小,则加入队列等待有可用连接时被唤醒或直到某个终止时间
                boolean success = false;
                try {
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }
                    pool.queue(future);
                    this.pending.add(future);
                    if (deadline != null) {
                        success = this.condition.awaitUntil(deadline);
                    } else {
                        this.condition.await();
                        success = true;
                    }
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }
                } finally {
                    //如果到了终止时间或有被唤醒时,则出队,加入下次循环
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                // 处理异常唤醒和超时情况
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }

The getpoolentryblocking method is used to obtain the connection. There are three main steps: (1) Check whether there are reusable connections in the available connection collection. If so, get the connection and return (2) When creating a new connection, note that it is also necessary to check whether the available connection set (divided into each route and global) has redundant connection resources. If so, it needs to be released. (3) Join the queue and wait;

As can be seen from the thread stack, the first problem is due to step 3. At the beginning, 504 exceptions are sometimes reported. After multiple refreshes, 504 exceptions are always reported. After tracking and debugging, it is found that the connection will be successfully obtained several times, and the subsequent requests will be blocked when the connection pool is full. Under normal circumstances, after the previous connection is released to the connection pool, the subsequent requests will continue to be executed by the connection resources. However, the reality is that the subsequent connection has been waiting. I guess it may be because the connection has not been released.

Let's see when the connection will be released.

RestTemplate

Since the getforobject method of resttemplate is used when calling external system B, start tracking and debugging from here.

	@Override
	public <T> T getForObject(String url,Class<T> responseType,Object... uriVariables) throws RestClientException {
		RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
		HttpMessageConverterExtractor<T> responseExtractor =
				new HttpMessageConverterExtractor<T>(responseType,getMessageConverters(),logger);
		return execute(url,HttpMethod.GET,requestCallback,responseExtractor,uriVariables);
	}

	@Override
	public <T> T getForObject(String url,Map<String,?> uriVariables) throws RestClientException {
		RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
		HttpMessageConverterExtractor<T> responseExtractor =
				new HttpMessageConverterExtractor<T>(responseType,uriVariables);
	}

	@Override
	public <T> T getForObject(URI url,Class<T> responseType) throws RestClientException {
		RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
		HttpMessageConverterExtractor<T> responseExtractor =
				new HttpMessageConverterExtractor<T>(responseType,responseExtractor);
	}

Getforobject calls the execute method (in fact, other HTTP request methods of resttemplate also call the execute method)

	@Override
	public <T> T execute(String url,HttpMethod method,RequestCallback requestCallback,ResponseExtractor<T> responseExtractor,Object... uriVariables) throws RestClientException {

		URI expanded = getUriTemplateHandler().expand(url,uriVariables);
		return doExecute(expanded,method,responseExtractor);
	}

	@Override
	public <T> T execute(String url,?> uriVariables) throws RestClientException {

		URI expanded = getUriTemplateHandler().expand(url,responseExtractor);
	}

	@Override
	public <T> T execute(URI url,ResponseExtractor<T> responseExtractor) throws RestClientException {

		return doExecute(url,responseExtractor);
	}

All execute methods call the same doexecute method

	protected <T> T doExecute(URI url,ResponseExtractor<T> responseExtractor) throws RestClientException {

		Assert.notNull(url,"'url' must not be null");
		Assert.notNull(method,"'method' must not be null");
		ClientHttpResponse response = null;
		try {
			ClientHttpRequest request = createRequest(url,method);
			if (requestCallback != null) {
				requestCallback.doWithRequest(request);
			}
			response = request.execute();
			handleResponse(url,response);
			if (responseExtractor != null) {
				return responseExtractor.extractData(response);
			}
			else {
				return null;
			}
		}
		catch (IOException ex) {
			String resource = url.toString();
			String query = url.getRawQuery();
			resource = (query != null ? resource.substring(0,resource.indexOf('?')) : resource);
			throw new ResourceAccessException("I/O error on " + method.name() +
					" request for \"" + resource + "\": " + ex.getMessage(),ex);
		}
		finally {
			if (response != null) {
				response.close();
			}
		}
	}

The doexecute method creates the request, then executes, handles exceptions, and finally closes. You can see that the close operation is placed in finally and will be executed in any case unless the returned response is null.

InterceptingClientHttpRequest

Enter request In the execute () method, the corresponding abstract class org springframework. http. client. Execute method of abstractclienthttprequest

	@Override
	public final ClientHttpResponse execute() throws IOException {
		assertNotExecuted();
		ClientHttpResponse result = executeInternal(this.headers);
		this.executed = true;
		return result;
	}

Call the internal method executeinternal. The executeinternal method is an abstract method implemented by subclasses (there are many ways to implement HTTP calls within resttemplate). Enter the executeinternal method and reach the abstract class org springframework. http. client. Abstractbufferingclienthttprequest

	protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
		byte[] bytes = this.bufferedOutput.toByteArray();
		if (headers.getContentLength() < 0) {
			headers.setContentLength(bytes.length);
		}
		ClientHttpResponse result = executeInternal(headers,bytes);
		this.bufferedOutput = null;
		return result;
	}

Slow charging requests body data and calls the internal method executeinternal

ClientHttpResponse result = executeInternal(headers,bytes);

Calling another executeInternal method in the executeInternal method is also an abstract method.

Enter the executeinternal method, which is provided by org springframework. http. client. Subclass of abstractbufferingclienthttprequest org springframework. http. client. Interceptingclienthttprequest implementation

	protected final ClientHttpResponse executeInternal(HttpHeaders headers,byte[] bufferedOutput) throws IOException {
		InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
		return requestExecution.execute(this,bufferedOutput);
	}

Instantiate a request execution object with interceptor interceptingrequestexecution

		public ClientHttpResponse execute(HttpRequest request,final byte[] body) throws IOException {
              // 如果有拦截器,则执行拦截器并返回结果
			if (this.iterator.hasNext()) {
				ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
				return nextInterceptor.intercept(request,body,this);
			}
			else {
               // 如果没有拦截器,则通过requestFactory创建request对象并执行
				ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(),request.getmethod());
				for (Map.Entry<String,List<String>> entry : request.getHeaders().entrySet()) {
					List<String> values = entry.getValue();
					for (String value : values) {
						delegate.getHeaders().add(entry.getKey(),value);
					}
				}
				if (body.length > 0) {
					if (delegate instanceof StreamingHttpOutputMessage) {
						StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
						streamingOutputMessage.setBody(new StreamingHttpOutputMessage.Body() {
							@Override
							public void writeTo(final OutputStream outputStream) throws IOException {
								StreamUtils.copy(body,outputStream);
							}
						});
					 }
					else {
						StreamUtils.copy(body,delegate.getBody());
					}
				}
				return delegate.execute();
			}
		}

The execute method of interceptclienthttprequest executes the interceptor first, and then the real request object (what is the real request object? See the design of interceptor later).

Take a look at the configuration of resttemplate:

        RestTemplateBuilder builder = new RestTemplateBuilder();
        return builder
                .setConnectTimeout(customConfig.getRest().getConnectTimeOut())
                .setReadTimeout(customConfig.getRest().getReadTimeout())
                .interceptors(restTemplateLogInterceptor)
                .errorHandler(new ThrowErrorHandler())
                .build();
    }

You can see that connection timeout, read timeout, interceptor, and error handler are configured. Take a look at the implementation of the Interceptor:

    public ClientHttpResponse intercept(HttpRequest httpRequest,byte[] bytes,ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
        // 打印访问前日志
        ClientHttpResponse execute = clientHttpRequestExecution.execute(httpRequest,bytes);
        if (如果返回码不是200) {
            // 抛出自定义运行时异常
        }
        // 打印访问后日志
        return execute;
    }

You can see that an exception is thrown when the return code is not 200. Remember the doexecute method in the resttemplate. If an exception is thrown here, although the finally code in the doexecute method will be executed, because the returned response is null (actually there is a response), the response is not closed, so an exception cannot be thrown here. If you really want to throw an exception, you can throw it in the error handler, This ensures that the response can return and close normally.

Resttemplate source code analysis

How to decide which underlying HTTP framework to use

Knowing the reason, let's take a look at when resttemplate decides what HTTP framework to use. In fact, it is determined when the resttemplate object is instantiated through resttemplatebuilder. Take a look at the build method of resttemplatebuilder

	public RestTemplate build() {
		return build(RestTemplate.class);
	}
	public <T extends RestTemplate> T build(Class<T> restTemplateClass) {
		return configure(BeanUtils.instantiate(restTemplateClass));
	}

You can see that after instantiating the resttemplate object, configure it. Requestfactory can be specified or detected automatically

	public <T extends RestTemplate> T configure(T restTemplate) {
               // 配置requestFactory
		configureRequestFactory(restTemplate);
        .....省略其它无关代码
	}


	private void configureRequestFactory(RestTemplate restTemplate) {
		ClientHttpRequestFactory requestFactory = null;
		if (this.requestFactory != null) {
			requestFactory = this.requestFactory;
		}
		else if (this.detectRequestFactory) {
			requestFactory = detectRequestFactory();
		}
		if (requestFactory != null) {
			ClientHttpRequestFactory unwrappedRequestFactory = unwrapRequestFactoryIfNecessary(
					requestFactory);
			for (RequestFactoryCustomizer customizer : this.requestFactoryCustomizers) {
				customizer.customize(unwrappedRequestFactory);
			}
			restTemplate.setRequestFactory(requestFactory);
		}
	}

Take a look at the detectrequestfactory method

	private ClientHttpRequestFactory detectRequestFactory() {
		for (Map.Entry<String,String> candidate : REQUEST_FACTORY_CANDIDATES
				.entrySet()) {
			ClassLoader classLoader = getClass().getClassLoader();
			if (ClassUtils.isPresent(candidate.getKey(),classLoader)) {
				Class<?> factoryClass = ClassUtils.resolveClassName(candidate.getValue(),classLoader);
				ClientHttpRequestFactory requestFactory = (ClientHttpRequestFactory) BeanUtils
						.instantiate(factoryClass);
				initializeIfNecessary(requestFactory);
				return requestFactory;
			}
		}
		return new SimpleClientHttpRequestFactory();
	}

Circular request_ FACTORY_ Candidates collection, check whether the corresponding jar package exists in the classpath class path, and if so, create the encapsulated class object of the corresponding framework. If none exists, the requestfactory object implemented in JDK mode is returned.

Take a look at request_ FACTORY_ Candidates set

	private static final Map<String,String> REQUEST_FACTORY_CANDIDATES;

	static {
		Map<String,String> candidates = new LinkedHashMap<String,String>();
		candidates.put("org.apache.http.client.HttpClient","org.springframework.http.client.HttpComponentsClientHttpRequestFactory");
		candidates.put("okhttp3.OkHttpClient","org.springframework.http.client.OkHttp3ClientHttpRequestFactory");
		candidates.put("com.squareup.okhttp.OkHttpClient","org.springframework.http.client.OkHttpClientHttpRequestFactory");
		candidates.put("io.netty.channel.EventLoopGroup","org.springframework.http.client.Netty4ClientHttpRequestFactory");
		REQUEST_FACTORY_CANDIDATES = Collections.unmodifiableMap(candidates);
	}

You can see that there are four implementation methods of HTTP call, which can be specified when configuring resttemplate, and the corresponding implementation jar package is provided in the classpath.

Design of request interceptor

Take another look at the execute method of the interceptingrequestexecution class.

  public ClientHttpResponse execute(HttpRequest request,final byte[] body) throws IOException {
        // 如果有拦截器,则执行拦截器并返回结果
	  if (this.iterator.hasNext()) {
			ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
			return nextInterceptor.intercept(request,this);
		}
		else {
         // 如果没有拦截器,则通过requestFactory创建request对象并执行
		    ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(),request.getmethod());
			for (Map.Entry<String,List<String>> entry : request.getHeaders().entrySet()) {
			    List<String> values = entry.getValue();
				for (String value : values) {
					delegate.getHeaders().add(entry.getKey(),value);
				}
			}
			if (body.length > 0) {
				if (delegate instanceof StreamingHttpOutputMessage) {
					StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
					streamingOutputMessage.setBody(new StreamingHttpOutputMessage.Body() {
						@Override
						public void writeTo(final OutputStream outputStream) throws IOException {
							StreamUtils.copy(body,outputStream);
						}
					});
			   }
			   else {
				StreamUtils.copy(body,delegate.getBody());
			   }
			}
			return delegate.execute();
		}
   }

You may have questions. The incoming object is already a request object. Why do you create a request object again when there is no interceptor? In fact, when there is an interceptor, the incoming request object is the interceptingclienthttprequest object. When there is no interceptor, it directly wraps the request of each HTTP call implementation box. Such as httpcomponentsclienthttprequest, okhttp3clienthttpprequest, etc. When there are interceptors, the interceptors will be executed. There can be multiple interceptors, and here this iterator. Hasnext () is not a loop. Why? The secret lies in the interceptor's intercept method.

ClientHttpResponse intercept(HttpRequest request,byte[] body,ClientHttpRequestExecution execution)
      throws IOException;

This method contains request and execution. The execution type is clienthttprequexexecution interface, which is implemented by interceptingrequestexecution above. In this way, when calling the interceptor, the execution object itself is passed in, and then the execute method is called again to judge whether there is still an interceptor. If so, the next interceptor is executed. After all interceptors are executed, the real request object is generated, Perform an HTTP call.

What if there's no interceptor? As we know above, resttemplate instantiates requestfactory when instantiating. When an HTTP request is initiated, the doexecute method of resttemplate will be executed. In this method, the request will be created, and in the createrequest method, the requestfactory will be obtained first

// org.springframework.http.client.support.HttpAccessor
protected ClientHttpRequest createRequest(URI url,HttpMethod method) throws IOException {
   ClientHttpRequest request = getRequestFactory().createRequest(url,method);
   if (logger.isDebugEnabled()) {
      logger.debug("Created " + method.name() + " request for \"" + url + "\"");
   }
   return request;
}


// org.springframework.http.client.support.InterceptingHttpAccessor
public ClientHttpRequestFactory getRequestFactory() {
   ClientHttpRequestFactory delegate = super.getRequestFactory();
   if (!CollectionUtils.isEmpty(getInterceptors())) {
      return new InterceptingClientHttpRequestFactory(delegate,getInterceptors());
   }
   else {
      return delegate;
   }
}

Take a look at the relationship between resttemplate and these two classes to know the calling relationship.

Connection acquisition logic flowchart

Flow chart Description:

Conclusion

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>