天天看點

FTPClient自定義連接配接池實作

最新在工作當中使用到了

ftp

伺服器,我是使用

org.apache.commons.net.ftp.FTPClient

做的連接配接,但是在測試調用的時候經常報

斷開的管道

錯誤,就此問題我做了一些分析和總結,是因為在調用的時候我隻

new

了一個對象,而使用了多線程進行調用,也就是同一個對象進行了多次調用,而每次調用都有調

disconnect()

方法,所有再調用的時候就報了

斷開的管道

錯誤。
  • 針對上面的問題,我首先想到了使用連接配接池來避免這種底層的錯誤,而

    FTPClient

    沒有提供連接配接池,就此我就手工寫了一個連接配接池。
  • 下面是實作代碼,總共用三個類,在實際使用的時候需要根據業務場景做一些輕微的修改。
    • FTPClientConnectPool

      :FTPClinet 連接配接池抽象類
    • FTPClientConnectPoolImpl

      :FTPClient 連接配接池具體實作類
    • ConnectTest

      :連接配接測試類
package com.jin.demo.lock;

import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;

import java.io.IOException;
import java.util.LinkedList;

/**
 * FTPClinet 連接配接池抽象類
 *
 * @auther jinsx
 * @date 2019-01-02 10:24
 */
public abstract class FTPClientConnectPool {
    /**
     * 預設最小連接配接數
     */
    protected int initCount = 3;
    /**
     * 預設最大連接配接數
     */
    protected int initMaxCount = 5;
    /**
     * 目前正在使用的連接配接數
     */
    protected int connectCount = 0;
    /**
     * 擷取連接配接預設等待逾時時間
     */
    protected Long waitTime = 10000L;
    /**
     * 擷取鎖預設等待逾時時間,防止死鎖
     */
    protected Long waitTimeLock = 30000L;
    /**
     * hostname
     */
    protected String hostname = "192.168.0.100";
    /**
     * port
     */
    protected int port = 21;
    /**
     * username
     */
    protected String username = "jin01";
    /**
     * password
     */
    protected String password = "123456";
    /**
     * ftp預設工作目錄
     */
    protected String path = "/";
    /**
     * 定義存放FTPClient的連接配接池
     */
    protected LinkedList<FTPClient> pool = new LinkedList<>();


    /**
     * 擷取FTPClient連接配接對象
     *
     * @return
     */
    protected abstract FTPClient getFTPClient();

    /**
     * 釋放FTPClient連接配接對象
     *
     * @param ftp
     */
    protected abstract void releaseFTPClient(FTPClient ftp);

    /**
     * 初始化FTPClient連接配接對象
     *
     * @return
     */
    protected FTPClient initFTPClient() {
        FTPClient ftp = new FTPClient();
        try {
            ftp.connect(hostname, port);
            if(!FTPReply.isPositiveCompletion(ftp.getReplyCode())){
                ftp.disconnect();
                System.out.println("FTPServer refused connection");
            }
            boolean login = ftp.login(username, password);
            if(!login){
                ftp.disconnect();
                System.out.println("FTPServer login failed");
            }
            ftp.setControlEncoding("UTF-8");
            ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
            ftp.changeWorkingDirectory(path);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("initFTPClient result : " + ftp.getReplyString());
        return ftp;
    }

    /**
     * 銷毀FTPClient連接配接對象
     *
     * @param ftp
     */
    protected void destroyFTPClient(FTPClient ftp) {
        try {
            if (ftp != null && ftp.isConnected()) {
                ftp.logout();
            }
        } catch (IOException io) {
            io.printStackTrace();
        } finally {
            // 注意,一定要在finally代碼中斷開連接配接,否則會導緻占用ftp連接配接情況
            try {
                if (ftp != null) {
                    ftp.disconnect();
                }
            } catch (IOException io) {
                io.printStackTrace();
            }
        }
    }

    /**
     * 驗證FTPClient是否可用
     * @param ftp
     * @return
     */
    protected boolean validateFTPClient(FTPClient ftp) {
        try {
            return ftp.sendNoOp();
        } catch (IOException e) {
            throw new RuntimeException("Failed to validate client: " + e, e);
        }
    }
    public int getInitCount() {
        return initCount;
    }
    public void setInitCount(int initCount) {
        this.initCount = initCount;
    }
    public int getInitMaxCount() {
        return initMaxCount;
    }
    public void setInitMaxCount(int initMaxCount) {
        this.initMaxCount = initMaxCount;
    }
    public int getConnectCount() {
        return connectCount;
    }
    public void setConnectCount(int connectCount) {
        this.connectCount = connectCount;
    }
    public Long getWaitTime() {
        return waitTime;
    }
    public void setWaitTime(Long waitTime) {
        this.waitTime = waitTime;
    }
    public Long getWaitTimeLock() {
        return waitTimeLock;
    }
    public void setWaitTimeLock(Long waitTimeLock) {
        this.waitTimeLock = waitTimeLock;
    }
    public String getHostname() {
        return hostname;
    }
    public void setHostname(String hostname) {
        this.hostname = hostname;
    }
    public int getPort() {
        return port;
    }
    public void setPort(int port) {
        this.port = port;
    }
    public String getUsername() {
        return username;
    }
    public void setUsername(String username) {
        this.username = username;
    }
    public String getPassword() {
        return password;
    }
    public void setPassword(String password) {
        this.password = password;
    }
    public LinkedList<FTPClient> getPool() {
        return pool;
    }
    public void setPool(LinkedList<FTPClient> pool) {
        this.pool = pool;
    }
}
           
package com.jin.demo.lock;

import org.apache.commons.net.ftp.FTPClient;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * FTPClient 連接配接池具體實作類
 *
 * @auther jinsx
 * @date 2019-01-02 10:46
 */
public class FTPClientConnectPoolImpl extends FTPClientConnectPool {

    private static final FTPClientConnectPoolImpl FTP_CLIENT_CONNECT_POOL = new FTPClientConnectPoolImpl();
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    /***
     * 私有構造器,并初始化FTPClient連接配接池
     */
    private FTPClientConnectPoolImpl() {
        for (int i = 0; i < super.initCount; i++) {
            FTPClient ftp = super.initFTPClient();
            super.pool.addLast(ftp);
        }
    }

    /**
     * 提供對外擷取執行個體的靜态方法
     *
     * @return
     */
    public static FTPClientConnectPoolImpl getInstance() {
        return FTP_CLIENT_CONNECT_POOL;
    }

    @Override
    protected FTPClient getFTPClient() {
        lock.lock();
        try {
            // 1.計算逾時結束時間
            Long endTime = System.currentTimeMillis() + waitTime;
            while (endTime >= System.currentTimeMillis()) {
                if (super.pool.isEmpty()) {
                    // 2.連接配接池中沒有連接配接對象,進入等待
                    System.out.println(Thread.currentThread().getName() + "進入等待");
                    condition.await(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                }
                if (!super.pool.isEmpty()) {
                    // 3.從連接配接池中取出一個連接配接對象
                    FTPClient ftp = pool.removeFirst();
                    // 4.驗證是否可用
                    if (super.validateFTPClient(ftp)) {
                        return ftp;
                    } else {
                        // 5.銷毀不可用連接配接對象
                        super.destroyFTPClient(ftp);
                        // 6.重新建立一個連接配接對象
                        return super.initFTPClient();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        // 4.傳回null或抛出異常
        System.out.println(Thread.currentThread().getName() + "擷取逾時傳回null");
        return null;
    }

    @Override
    protected void releaseFTPClient(FTPClient ftp) {
        lock.lock();
        try {
            // 1.如果連接配接池沒有滿,并且ftp可用,則放到連接配接池中
            if (super.pool.size() < super.initCount && ftp != null && super.validateFTPClient(ftp)) {
                ftp.changeWorkingDirectory(super.path);
                super.pool.addLast(ftp);
            } else {
                super.destroyFTPClient(ftp);
            }
            // 2.如果連接配接池有連接配接,則喚醒一個等待
            if (!super.pool.isEmpty()) {
                condition.signal();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
           
package com.jin.demo.lock;

import org.apache.commons.net.ftp.FTPClient;

/**
 * 連接配接測試類
 * @auther jinsx
 * @date 2019-01-02 09:10
 */
public class ConnectTest {

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "前" + System.currentTimeMillis());
                    FTPClientConnectPoolImpl pool = FTPClientConnectPoolImpl.getInstance();
                    FTPClient ftp = pool.getFTPClient();
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "後" + System.currentTimeMillis() + ftp);
                    pool.releaseFTPClient(ftp);
                }
            }).start();
        }
    }

}