天天看点

Apache HttpClient链接如何实现复用,如何使用连接池

Apache HttpClient链接如何实现复用,如何使用连接池

注:为节省篇幅,下文代码中所有 getter/setter 方法均已省略。

/**
 * HTTP request configuration: the three timeout settings used when building
 * a per-request configuration. All values are in milliseconds.
 * <p>
 * NOTE(review): getters/setters are omitted from this listing; other classes
 * in this file call them (e.g. {@code getConnectTimeout()}).
 *
 * @date 2019/12/18 11:06
 */
public class HttpRequestConfig {
    /**
     * Timeout for obtaining a connection from the connection pool, in milliseconds.
     * @date 2019/12/17 18:42
     **/
    private int connectionRequestTimeout = 500;
    /**
     * Timeout for establishing the connection between client and server, in milliseconds.
     * @date 2019/12/17 18:46
     **/
    private int connectTimeout = 500;
    /**
     * Timeout for the client reading data from the server, in milliseconds.
     * @date 2019/12/17 18:46
     **/
    private int socketTimeout = 1000;
}
           
/**
 * Base class for HTTP request handlers built on Apache HttpClient.
 * <p>
 * Subclasses supply the {@link CloseableHttpClient} to execute with and decide
 * how it is released in {@link #close()}. This class builds the request
 * (JSON POST, form POST, or GET with query parameters), applies the timeouts
 * from {@link HttpRequestConfig} plus any caller-supplied headers, executes it
 * and returns the response body as a string.
 * <p>
 * Text is encoded and decoded as UTF-8 unless the response declares its own charset.
 *
 * @date 2019/12/18 10:54
 */
public abstract class AbstractHttpRequestHandler {

    private static final Logger logger = LoggerFactory.getLogger(AbstractHttpRequestHandler.class);

    /**
     * Charset for form bodies, query-string encoding, and as the fallback when a
     * response does not declare one. Written with fully qualified names so this
     * class needs no additional imports.
     */
    private static final java.nio.charset.Charset DEFAULT_CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    private HttpRequestConfig httpRequestConfig;

    /**
     * @param httpRequestConfig timeout settings applied to every request issued by this handler
     */
    public AbstractHttpRequestHandler(HttpRequestConfig httpRequestConfig) {
        this.httpRequestConfig = httpRequestConfig;
    }

    /**
     * Releases the resources held by this handler.
     * @date 2019/12/18 11:01
     * @throws IOException if releasing the resources fails
     **/
    protected abstract void close() throws IOException;

    /**
     * Returns the HTTP client used to execute requests.
     * @date 2019/12/18 14:19
     * @return org.apache.http.impl.client.CloseableHttpClient
     **/
    protected abstract CloseableHttpClient getCloseableHttpClient();

    /**
     * Sends a POST request whose body is a JSON document.
     * <p>
     * Note: passing a literal {@code null} as the third argument is ambiguous with
     * the form-data overload; cast it to {@code String} or {@code Map}.
     *
     * @date 2019/12/18 10:55
     * @param url        target URL
     * @param headers    request headers to set, or {@code null} for none
     * @param jsonParams JSON body; when blank or {@code null} no entity is attached
     * @return the response body, or an empty string when the response carries no entity
     * @throws IOException if the request cannot be executed or the response cannot be read
     **/
    public String post(String url, @Nullable Map<String, String> headers, @Nullable String jsonParams) throws IOException {
        HttpPost httpPost = new HttpPost(url);
        if (StringUtils.isNotBlank(jsonParams)) {
            httpPost.setEntity(new StringEntity(jsonParams, ContentType.APPLICATION_JSON));
        }
        return request(httpPost, headers);
    }

    /**
     * Sends a POST request whose body is URL-encoded key/value form data.
     *
     * @date 2019/12/18 10:56
     * @param url      target URL
     * @param headers  request headers to set, or {@code null} for none
     * @param formData form fields; values are converted with {@code String.valueOf};
     *                 when {@code null} no entity is attached
     * @return the response body, or an empty string when the response carries no entity
     * @throws IOException if the request cannot be executed or the response cannot be read
     **/
    public String post(String url, @Nullable Map<String, String> headers, @Nullable Map<String, Object> formData) throws IOException {
        HttpPost httpPost = new HttpPost(url);
        if (formData != null) {
            List<NameValuePair> nvps = new ArrayList<>(formData.size());
            for (Map.Entry<String, Object> field : formData.entrySet()) {
                nvps.add(new BasicNameValuePair(field.getKey(), String.valueOf(field.getValue())));
            }
            // Without an explicit charset the entity falls back to ISO-8859-1,
            // which corrupts non-Latin field values.
            httpPost.setEntity(new UrlEncodedFormEntity(nvps, DEFAULT_CHARSET));
        }
        return request(httpPost, headers);
    }

    /**
     * Sends a GET request; the parameters are URL-encoded and appended to the URL
     * as a query string.
     *
     * @date 2019/12/18 10:57
     * @param url     target URL; may already contain a query string
     * @param headers request headers to set, or {@code null} for none
     * @param params  query parameters (raw, not pre-encoded); values are converted
     *                with {@code String.valueOf}; {@code null} or empty leaves the URL unchanged
     * @return the response body, or an empty string when the response carries no entity
     * @throws IOException if the request cannot be executed or the response cannot be read
     **/
    public String get(String url, @Nullable Map<String, String> headers, @Nullable Map<String, Object> params) throws IOException {
        if (params != null && !params.isEmpty()) {
            url = appendQuery(url, params);
        }
        return request(new HttpGet(url), headers);
    }

    /**
     * Appends {@code params} to {@code url} as an encoded query string, continuing an
     * existing query with {@code &} instead of starting a second one with {@code ?}.
     */
    private static String appendQuery(String url, Map<String, Object> params) throws IOException {
        StringBuilder sb = new StringBuilder(url);
        // A URL that already ends in '?' or '&' is ready for the first pair as-is.
        boolean needsSeparator = !(url.endsWith("?") || url.endsWith("&"));
        char separator = url.indexOf('?') >= 0 ? '&' : '?';
        String charsetName = DEFAULT_CHARSET.name();
        for (Map.Entry<String, Object> param : params.entrySet()) {
            if (needsSeparator) {
                sb.append(separator);
            }
            sb.append(java.net.URLEncoder.encode(String.valueOf(param.getKey()), charsetName))
                    .append('=')
                    .append(java.net.URLEncoder.encode(String.valueOf(param.getValue()), charsetName));
            needsSeparator = true;
            separator = '&';
        }
        return sb.toString();
    }

    /**
     * Applies configuration and headers, executes the request and reads the body.
     * The response is always closed; a failure while closing is logged, not thrown,
     * so it cannot mask the result or the original exception.
     */
    private String request(HttpRequestBase httpRequestBase, Map<String, String> headers) throws IOException {
        config(httpRequestBase, headers);
        CloseableHttpResponse response = null;
        try {
            response = this.getCloseableHttpClient().execute(httpRequestBase);
            HttpEntity entity = response.getEntity();
            if (entity == null) {
                // Responses such as 204 No Content have no entity; EntityUtils.toString
                // would reject a null entity with an IllegalArgumentException.
                return "";
            }
            // UTF-8 is only the fallback; a charset declared by the response wins.
            String result = EntityUtils.toString(entity, DEFAULT_CHARSET);
            // Make sure the content stream is fully consumed and closed.
            EntityUtils.consume(entity);
            return result;
        } finally {
            if (response != null) {
                try {
                    response.close();
                } catch (IOException e) {
                    logger.error("", e);
                }
            }
        }
    }

    /**
     * Sets the timeouts from {@link HttpRequestConfig} and the caller's headers on the request.
     * The configuration is rebuilt on every call, so it reflects the current config values.
     */
    private void config(HttpRequestBase httpRequestBase, Map<String, String> headers) {
        RequestConfig config = RequestConfig.custom()
                .setConnectionRequestTimeout(this.httpRequestConfig.getConnectionRequestTimeout())
                .setConnectTimeout(this.httpRequestConfig.getConnectTimeout())
                .setSocketTimeout(this.httpRequestConfig.getSocketTimeout())
                .build();
        if (headers != null) {
            for (Map.Entry<String, String> header : headers.entrySet()) {
                httpRequestBase.setHeader(header.getKey(), header.getValue());
            }
        }
        httpRequestBase.setConfig(config);
    }
}
           
/**
 * HTTP connection pool configuration. Extends {@link HttpRequestConfig}, so it
 * also carries the per-request timeout settings.
 * <p>
 * NOTE(review): getters/setters are omitted from this listing.
 *
 * @date 2019/12/17 18:22
 */
public class HttpClientPoolConfig extends HttpRequestConfig {
    /**
     * Default maximum number of connections per host (route).
     * @date 2019/12/17 18:23
     **/
    private int defaultMaxPerRoute = 500;
    /**
     * Maximum total number of connections in the pool.
     * @date 2019/12/17 18:33
     **/
    private int maxTotal = 1000;
    /**
     * Idle time after which a pooled connection is reclaimed, in milliseconds.
     * @date 2019/12/17 18:36
     **/
    private int idleTimeOut = 5000;
    /**
     * Number of retries for a failed request.
     * @date 2019/12/17 19:03
     **/
    private int retryCnt = 3;
    /**
     * Interval between connection pool monitor runs, in milliseconds.
     * @date 2019/12/18 8:59
     **/
    private int monitorInterval = 2000;
}
           
/**
 * HTTP connection pool.
 * <p>
 * Owns a {@link PoolingHttpClientConnectionManager}, a {@link CloseableHttpClient}
 * built on top of it with a custom retry policy, and a background monitor thread
 * that periodically evicts expired and idle connections. Call {@link #close()} to
 * release all three.
 *
 * @date 2019/12/17 18:52
 */
public class HttpClientPool {
    private static final Logger logger = LoggerFactory.getLogger(HttpClientPool.class);
    private HttpClientPoolConfig httpClientConfig;
    private PoolingHttpClientConnectionManager poolingHttpClientConnectionManager;
    private CloseableHttpClient closeableHttpClient;
    private HttpClientPoolMonitorThread httpClientPoolMonitorThread;

    /**
     * Creates the pool, the client and starts the monitor thread.
     *
     * @param httpClientConfig pool settings; must not be {@code null}
     */
    public HttpClientPool(HttpClientPoolConfig httpClientConfig) {
        Assert.notNull(httpClientConfig, "httpClientConfig argument must not be null");
        this.httpClientConfig = httpClientConfig;
        this.init(httpClientConfig);
    }

    /** Builds the connection manager with the configured limits, then the client. */
    private void init(HttpClientPoolConfig httpClientConfig) {
        this.poolingHttpClientConnectionManager = new PoolingHttpClientConnectionManager();
        this.poolingHttpClientConnectionManager.setDefaultMaxPerRoute(httpClientConfig.getDefaultMaxPerRoute());
        this.poolingHttpClientConnectionManager.setMaxTotal(httpClientConfig.getMaxTotal());
        this.createHttpClient();
    }

    /** Builds the client with the retry policy and starts the eviction monitor. */
    private void createHttpClient() {
        HttpRequestRetryHandler httpRequestRetryHandler = (exception, executionCount, context) -> {
            // executionCount counts failed executions including the first attempt, so the
            // configured number of retries is exhausted only once it EXCEEDS retryCnt
            // (same comparison as DefaultHttpRequestRetryHandler). Using ">=" would
            // allow one retry fewer than configured.
            if (executionCount > this.httpClientConfig.getRetryCnt()
                    || exception instanceof SSLHandshakeException // never retry a failed SSL handshake
                    || exception instanceof InterruptedIOException // timeout or interrupted I/O
                    || exception instanceof UnknownHostException // host name cannot be resolved
                    || exception instanceof ConnectTimeoutException // connection could not be established in time
                    || exception instanceof SSLException // any other SSL failure
            ) {
                return false;
            } else if (exception instanceof NoHttpResponseException) {
                // The server dropped the connection without responding: retry.
                return true;
            }

            HttpClientContext clientContext = HttpClientContext.adapt(context);
            HttpRequest request = clientContext.getRequest();
            // Requests without a body are treated as idempotent and retried;
            // entity-enclosing requests (e.g. POST) are not.
            return !(request instanceof HttpEntityEnclosingRequest);
        };

        this.closeableHttpClient = HttpClients.custom().setConnectionManager(this.poolingHttpClientConnectionManager).setRetryHandler(httpRequestRetryHandler).build();
        this.httpClientPoolMonitorThread = new HttpClientPoolMonitorThread(this.poolingHttpClientConnectionManager, this.httpClientConfig);
        this.httpClientPoolMonitorThread.start();
    }

    /**
     * @return the shared client backed by this pool
     */
    public CloseableHttpClient getCloseableHttpClient() {
        return closeableHttpClient;
    }

    /**
     * Stops the monitor thread, then closes the client and the connection manager.
     *
     * @throws IOException if closing the client fails; the monitor is already stopped
     *                     and the connection manager is still closed in that case
     */
    public void close() throws IOException {
        // Stop the monitor first so it no longer touches the manager during shutdown,
        // and so a failing client close cannot leave the thread running.
        this.httpClientPoolMonitorThread.shutdown();
        try {
            this.closeableHttpClient.close();
        } finally {
            this.poolingHttpClientConnectionManager.close();
        }
    }

    /**
     * Thread that monitors the pool and evicts expired and idle connections
     * every {@code monitorInterval} milliseconds until {@link #shutdown()} is called.
     * @date 2019/12/18 9:09
     **/
    private final static class HttpClientPoolMonitorThread extends Thread {
        private final HttpClientConnectionManager httpClientConnectionManager;
        private final HttpClientPoolConfig httpConfig;
        private volatile boolean shutdown;
        private final Object lock = new Object();

        public HttpClientPoolMonitorThread(HttpClientConnectionManager httpClientConnectionManager, HttpClientPoolConfig httpConfig) {
            super("http-client-pool-monitor");
            // Housekeeping only: a pool that is never closed must not keep the JVM alive.
            this.setDaemon(true);
            this.httpClientConnectionManager = httpClientConnectionManager;
            this.httpConfig = httpConfig;
            this.shutdown = false;
        }

        @Override
        public void run() {
            try {
                while (!this.shutdown) {
                    synchronized (lock) {
                        // Re-check under the lock so a shutdown signalled just before
                        // we got here is not slept through for a whole interval.
                        if (this.shutdown) {
                            break;
                        }
                        lock.wait(this.httpConfig.getMonitorInterval());
                    }
                    if (this.shutdown) {
                        // Woken for shutdown: do not touch the manager again.
                        break;
                    }
                    // Close connections that have expired.
                    this.httpClientConnectionManager.closeExpiredConnections();
                    // Close connections idle for longer than the configured idle timeout.
                    this.httpClientConnectionManager.closeIdleConnections(this.httpConfig.getIdleTimeOut(), TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException e) {
                logger.warn("", e);
                // Preserve the interrupt status for whoever inspects this thread.
                Thread.currentThread().interrupt();
            }
        }

        /** Signals the monitor loop to stop and wakes it if it is waiting. */
        public void shutdown() {
            this.shutdown = true;
            synchronized (lock) {
                lock.notifyAll();
            }
        }
    }
}

           
/**
 * Request handler backed by a connection pool, so connections are reused across
 * requests. Calling {@link #close()} closes the underlying pool.
 * @date 2019/12/18 9:16
 */
public class PoolingHttpRequestHandler extends AbstractHttpRequestHandler {
    private static final Logger logger = LoggerFactory.getLogger(PoolingHttpRequestHandler.class);
    private HttpClientPoolConfig httpClientConfig;
    private HttpClientPool httpClientPool;

    /**
     * Creates a handler whose pool uses the given configuration.
     *
     * @param httpClientConfig pool and timeout settings
     */
    public PoolingHttpRequestHandler(HttpClientPoolConfig httpClientConfig) {
        super(httpClientConfig);
        this.httpClientConfig = httpClientConfig;
        HttpClientPool pool = new HttpClientPool(httpClientConfig);
        this.httpClientPool = pool;
    }

    /** Creates a handler whose pool uses the default {@link HttpClientPoolConfig}. */
    public PoolingHttpRequestHandler() {
        this(new HttpClientPoolConfig());
    }

    /** Hands out the client owned by the pool. */
    @Override
    protected CloseableHttpClient getCloseableHttpClient() {
        HttpClientPool pool = this.httpClientPool;
        return pool.getCloseableHttpClient();
    }

    /** Closes the connection pool. */
    @Override
    public void close() throws IOException {
        HttpClientPool pool = this.httpClientPool;
        pool.close();
    }
}
           
/**
 * Singleton holder for a pooled HTTP request handler.
 * The handler is created with the default connection pool configuration.
 * @date 2019/12/18 10:02
 */
public class PoolingHttpRequestHandlerSingleton {
    // Created once, together with the singleton instance itself.
    private PoolingHttpRequestHandler httpHandler = new PoolingHttpRequestHandler();

    /** Not instantiable from outside; use {@link #getInstance()}. */
    private PoolingHttpRequestHandlerSingleton() {
    }

    /**
     * @return the single instance of this class
     */
    public static PoolingHttpRequestHandlerSingleton getInstance() {
        return Holder.INSTANCE;
    }

    /**
     * @return the pooled request handler owned by the singleton
     */
    public PoolingHttpRequestHandler getHttpHandler() {
        return this.httpHandler;
    }

    /** Nested class whose static field holds the singleton instance. */
    private static final class Holder {
        private static final PoolingHttpRequestHandlerSingleton INSTANCE = new PoolingHttpRequestHandlerSingleton();
    }
}