天天看點

分布式檔案系統之-FastDFS

介紹各種分布式檔案系統:http://elf8848.iteye.com/blog/1724382

架構研究參考:http://elf8848.iteye.com/blog/1739596

安裝配置位址:http://www.open-open.com/lib/view/open1435468300700.html

FastDFS 工作原理參考網址:http://blog.chinaunix.net/uid-20196318-id-4058561.html

FastDFS分布檔案系統介紹:

1.FastDFS是一個開源的輕量級分布式檔案系統,它對檔案進行管理,功能包括:檔案存儲、檔案同步、檔案通路(檔案上傳、檔案下載下傳)等,解決了大容量存儲和負載均衡的問題。特别适合以檔案為載體的線上服務,如相冊網站、視訊網站等等,不适合做資料處理.

 2.該分布式檔案系統組成:
     跟蹤伺服器(tracker server)、存儲伺服器(storage server)和用戶端(client)
     存儲伺服器:檔案存儲,檔案同步,提供檔案通路接口,同時以key value的方式管理檔案的中繼資料。其中      檔案通路采用了nginx,用于通路檔案。
     跟蹤伺服器:排程檔案以負載均衡的方式通路,它起到了一個協調的作用,負責管理storage。是以上傳檔案的監聽端口是歸跟蹤伺服器管的,不直接和存儲伺服器挂鈎。
     它們倆的關系好比是,存儲伺服器是搬運的員工,而跟蹤伺服器是協調搬運的主管.
3.簡單工作原理:
     1. 用戶端向檔案系統發送請求
     2.檔案系統的tracker 查詢可用的storage,傳回給用戶端可用的storge ip和端口号
     3.用戶端上傳檔案(檔案内容和檔案名)
     4.檔案系統端存儲檔案,生成檔案id,傳回給用戶端(檔案名和檔案路徑)
     5.用戶端做相應處理,結束.
           

用戶端上傳至檔案系統spring內建實作:

1.配置檔案準備:fdfs_client.conf,參數如下:

connect_timeout = 2

network_timeout = 30

charset = utf-8

http.tracker_http_port = 追蹤器通路檔案端口号

http.anti_steal_token = 是否啟用tokem(預設no)

http.secret_key = (上傳檔案密碼,和伺服器端的http.conf檔案内設的密碼保持一緻)

tracker_server=追蹤器ip:上傳檔案端口号

minPoolSize = (連接配接池最小數)

maxPoolSize = (連接配接池最大數)

waitTimes = 等待時間

2.web.xml建立bean

<!-- 初始化配置檔案伺服器配置 -->
    <bean id="propertyConfigurer"
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:/fdfs_client.conf</value>
            </list>
        </property>
    </bean>
    <bean name="fileFastDFSUtil" class="com.csx.util.aliFile.FastDFSUtil"
        init-method="init">
        <property name="minPoolSize" value="${minPoolSize}" />
        <property name="maxPoolSize" value="${maxPoolSize}" />
        <property name="waitTimes" value="${waitTimes}" />
    </bean>
           

3.重要工具類,可直接複制用

1.FastDFSUtil工具類

import java.net.SocketTimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.csource.common.NameValuePair;
import org.csource.fastdfs.StorageClient1;
import org.csource.fastdfs.StorageServer;
import org.csource.fastdfs.TrackerServer;

import com.csx.util.ReadProperties;

public class FastDFSUtil {

    private final Log log = LogFactory.getLog("FastDfsUtil.class");
    /** 連接配接池 */
    private FastDFSFastDFSConnectionPool connectionPool = null;
    /** 連接配接池預設最小連接配接數 */
    private long minPoolSize = ;
    /** 連接配接池預設最大連接配接數 */
    private long maxPoolSize = ;
    /** 目前建立的連接配接數 */
    private volatile long nowPoolSize = ;
    /** 預設等待時間(機關:秒) */
    private long waitTimes = ;

    /**
     * 初始化線程池
     * 
     * @Description:
     * 
     */
    public void init() {

        log.info("[初始化線程池(Init)[預設參數:minPoolSize=" + minPoolSize + ",maxPoolSize=" + maxPoolSize + ",waitTimes="
                + waitTimes + "]");
        connectionPool = new FastDFSFastDFSConnectionPool(minPoolSize, maxPoolSize, waitTimes);
    }

    /**
     * 
     * @Description:
     * @param groupName
     *            組名如group0
     * @param fileBytes
     *            檔案位元組數組
     * @param extName
     *            檔案擴充名:如png
     * @param linkUrl
     *            通路位址:http://image.xxx.com
     * @return 圖檔上傳成功後位址
     * @throws FastException
     * 
     */
    public String upload(FastDFSFile file, NameValuePair[] valuePairs) throws FastException {
        /** 封裝檔案資訊參數 */
        TrackerServer trackerServer = null;
        try {

            /** 擷取fastdfs伺服器連接配接 */
            trackerServer = connectionPool.checkout();
            StorageServer storageServer = null;
            StorageClient1 client = new StorageClient1(trackerServer, storageServer);

            String[] results = client.upload_file(file.getContent(), file.getExt(), valuePairs);

            /** 上傳完畢及時釋放連接配接 */
            connectionPool.checkin(trackerServer);

            log.info("[上傳檔案(upload)-fastdfs伺服器相應結果][result:results=" + results + "]");

            /** results[0]:組名,results[1]:遠端檔案名 */
            if (results != null && results.length == ) {
                return ReadProperties.getFile_ip() + "/" + results[] + "/" + results[];
            } else {
                /** 檔案系統上傳傳回結果錯誤 */
                throw FastDFSERROR.UPLOAD_RESULT_ERROR.ERROR();
            }
        } catch (FastException e) {

            log.error("[上傳檔案(upload)][異常:" + e + "]");
            throw e;

        } catch (SocketTimeoutException e) {
            log.error("[上傳檔案(upload)][異常:" + e + "]");

            //将連接配接數減一
            connectionPool.drop(trackerServer);

            throw FastDFSERROR.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
        } catch (Exception e) {

            log.error("[上傳檔案(upload)][異常:" + e + "]");
            connectionPool.drop(trackerServer);
            throw FastDFSERROR.SYS_ERROR.ERROR();

        }

    }

    /**
     * 
     * @Description: 删除fastdfs伺服器中檔案
     * @param group_name
     *            組名
     * @param remote_filename
     *            遠端檔案名稱
     * @throws FastException
     * 
     */
    public void deleteFile(String group_name, String remote_filename) throws FastException {
        log.info("[ 删除檔案(deleteFile)][parms:group_name=" + group_name + ",remote_filename=" + remote_filename + "]");
        TrackerServer trackerServer = null;

        try {
            /** 擷取可用的tracker,并建立存儲server */
            trackerServer = connectionPool.checkout();
            StorageServer storageServer = null;
            StorageClient1 client1 = new StorageClient1(trackerServer, storageServer);
            /** 删除檔案,并釋放 trackerServer */
            int result = client1.delete_file(group_name, remote_filename);

            /** 上傳完畢及時釋放連接配接 */
            connectionPool.checkin(trackerServer);

            log.info("[ 删除檔案(deleteFile)--調用fastdfs用戶端傳回結果][results:result=" + result + "]");

            /** 0:檔案删除成功,2:檔案不存在 ,其它:檔案删除出錯 */
            if (result == ) {

                throw FastDFSERROR.NOT_EXIST_FILE.ERROR();

            } else if (result != ) {

                throw FastDFSERROR.DELETE_RESULT_ERROR.ERROR();

            }

        } catch (FastException e) {

            log.error("[ 删除檔案(deleteFile)][異常:" + e + "]");
            throw e;

        } catch (SocketTimeoutException e) {
            log.error("[ 删除檔案(deleteFile)][異常:" + e + "]");
            throw FastDFSERROR.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
        } catch (Exception e) {

            log.error("[ 删除檔案(deleteFile)][異常:" + e + "]");
            connectionPool.drop(trackerServer);
            throw FastDFSERROR.SYS_ERROR.ERROR();

        }
    }

    public FastDFSFastDFSConnectionPool getConnectionPool() {
        return connectionPool;
    }

    public void setConnectionPool(FastDFSFastDFSConnectionPool connectionPool) {
        this.connectionPool = connectionPool;
    }

    public long getMinPoolSize() {
        return minPoolSize;
    }

    public void setMinPoolSize(long minPoolSize) {
        this.minPoolSize = minPoolSize;
    }

    public long getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(long maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    public long getNowPoolSize() {
        return nowPoolSize;
    }

    public void setNowPoolSize(long nowPoolSize) {
        this.nowPoolSize = nowPoolSize;
    }

    public long getWaitTimes() {
        return waitTimes;
    }

    public void setWaitTimes(long waitTimes) {
        this.waitTimes = waitTimes;
    }
}
           

2.自定義異常類:

public class FastException extends Exception {
    private static final long serialVersionUID = -L;
    private String code;
    private String description;

    public FastException(String code, String message) {
        super(message);
        this.code = code;
    }

    public FastException(String code, String message, String description) {
        super(message);
        this.code = code;
        this.description = description;
    }

    /**
     * 錯誤碼
     * 
     * @return
     */
    public String getCode() {
        return code;
    }

    /**
     * 使用者可讀描述資訊
     * 
     * @return
     */
    public String getDescription() {
        return description;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(getClass().getName());
        sb.append(": [");
        sb.append("] - ");
        sb.append(code);
        sb.append(" - ");
        sb.append(getMessage());
        if (getDescription() != null) {
            sb.append(" - ");
            sb.append(getDescription());
        }
        return sb.toString();
    }
}
           

3.報錯資訊枚舉類FastDFSERROR

public enum FastDFSERROR {

    PARAMETER_IS_NULL("21001", "必填參數為空", "必填參數為空"),

    FASTDFS_CONNECTION_FAIL("21002", "連接配接fastdfs伺服器失敗", "檔案上傳異常,請重試"),

    WAIT_IDLECONNECTION_TIMEOUT("21003", "等待空閑連接配接逾時", "連接配接逾時,請重試"),

    NOT_EXIST_GROUP("21004", "檔案組不存在", "檔案組不存在"),

    UPLOAD_RESULT_ERROR("21005", "fastdfs檔案系統上傳傳回結果錯誤", "檔案上傳異常,請重試"),

    NOT_EXIST_PORTURL("21006", "未找到對應的端口和通路位址", "檔案上傳異常,請重試"),

    SYS_ERROR("21007", "系統錯誤", "系統錯誤"),

    FILE_PATH_ERROR("21008", "檔案通路位址格式不對", "檔案通路位址格式不對"),

    DELETE_RESULT_ERROR("21009", "fastdfs檔案系統删除檔案傳回結果錯誤", "檔案删除異常,請重試"),

    NOT_EXIST_FILE("21010", "檔案不存在", "檔案不存在");

    /** 錯誤碼 */
    String code;

    /** 錯誤資訊,用于日志輸出,便于問題定位 */
    String message;

    /** 錯誤提示,用于用戶端提示 */
    String descreption;

    FastDFSERROR(String code, String message) {
        this.message = message;
        this.code = code;
    }

    FastDFSERROR(String code, String message, String descreption) {
        this.message = message;
        this.code = code;
        this.descreption = descreption;
    }

    public FastException ERROR() {
        return new FastException(this.code, this.message, this.descreption);
    }

    public FastException ERROR(String descreption) {
        return new FastException(this.code, this.message, descreption);
    }

}
           

4.檔案上傳類FastDFSFile

/**
 * 檔案上傳屬性,調用fastdfs上傳方法必需
 *
 */
public class FastDFSFile {
    private byte[] content;// 檔案内容
    private String name;// 檔案名
    private String ext;// 字尾
    private String fileId;// 檔案id(唯一id)

    public FastDFSFile(byte[] content, String ext,String fileId) {
        this.content = content;
        this.ext = ext;
        this.fileId = fileId;
    }

    public FastDFSFile(byte[] content, String name, String ext,String fileId) {
        this.content = content;
        this.name = name;
        this.ext = ext;
        this.fileId = fileId;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getExt() {
        return ext;
    }

    public void setExt(String ext) {
        this.ext = ext;
    }

    public String getFileId() {
        return fileId;
    }

    public void setFileId(String fileId) {
        this.fileId = fileId;
    }

}
           

5.連接配接池實作類(由于每次上傳檔案都需要建立連接配接去上傳檔案,當并發數較多的情況下,會出現連接配接數過多的情況,為此我實作了連接配接池去實作,并每隔五秒去定時掃描看連接配接是否失效)

1. FastDFSFastDFSConnectionPool 連接配接池類

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.csource.fastdfs.ClientGlobal;
import org.csource.fastdfs.TrackerClient;
import org.csource.fastdfs.TrackerServer;

/**
 * 
 * @ClassName: FastDFSConnectionPool
 * @Description: fastdfs連接配接池
 * @author william_zhong
 * @date 2017-4-13
 * 
 */
public class FastDFSFastDFSConnectionPool {
    private final Log log = LogFactory.getLog("FastDFSFastDFSConnectionPool.class");
    /** 空閑的連接配接池 */
    private LinkedBlockingQueue<TrackerServer> idleFastDFSConnectionPool = null;
    /** 連接配接池預設最小連接配接數 */
    private long minPoolSize = ;
    /** 連接配接池預設最大連接配接數 */
    private long maxPoolSize = ;
    /** 目前建立的連接配接數 */
    private volatile long nowPoolSize = ;
    /** 預設等待時間(機關:秒) */
    private long waitTimes = ;
    /** fastdfs用戶端建立連接配接預設1次 */
    private static final int COUNT = ;

    public static final String CLIENT_CONFIG_FILE = "fdfs_client.conf";

    /**
     * 預設構造方法
     */
    public FastDFSFastDFSConnectionPool(long minPoolSize, long maxPoolSize, long waitTimes) {

        log.info("[線程池構造方法(FastDFSConnectionPool)][" + "][預設參數:minPoolSize=" + minPoolSize + ",maxPoolSize="
                + maxPoolSize + ",waitTimes=" + waitTimes + "]");
        this.minPoolSize = minPoolSize;
        this.maxPoolSize = maxPoolSize;
        this.waitTimes = waitTimes;
        /** 初始化連接配接池 */
        poolInit();
        /** 注冊心跳 */
        FastDFSHeartBeat beat = new FastDFSHeartBeat(this);
        beat.beat();
    }

    /**
     * 
     * @Description: 連接配接池初始化 (在加載目前FastDFSConnectionPool時執行) 1).加載配置檔案
     *               2).空閑連接配接池初始化; 3).建立最小連接配接數的連接配接,并放入到空閑連接配接池;
     * 
     */
    private void poolInit() {
        try {
            /** 加載配置檔案 */
            initClientGlobal();
            /** 初始化空閑連接配接池 */
            idleFastDFSConnectionPool = new LinkedBlockingQueue<TrackerServer>();
            /** 往線程池中添加預設大小的線程 */
            for (int i = ; i < minPoolSize; i++) {
                createTrackerServer(COUNT);
            }
        } catch (Exception e) {

        }
    }

    /**
     * 
     * @Description: 建立TrackerServer,并放入空閑連接配接池
     * 
     */
    public void createTrackerServer(int flag) {

        TrackerServer trackerServer = null;

        try {

            TrackerClient trackerClient = new TrackerClient();
            trackerServer = trackerClient.getConnection();
            while (trackerServer == null && flag < ) {
                log.info("[建立TrackerServer(createTrackerServer)][第" + flag + "次重建]");
                flag++;
                initClientGlobal();
                trackerServer = trackerClient.getConnection();
            }
            org.csource.fastdfs.ProtoCommon.activeTest(trackerServer.getSocket());
            idleFastDFSConnectionPool.add(trackerServer);
            /** 同一時間隻允許一個線程對nowPoolSize操作 **/
            synchronized (this) {
                nowPoolSize++;
            }

        } catch (Exception e) {

            log.error("[建立TrackerServer(createTrackerServer)][異常:{}]", e);

        } finally {

            if (trackerServer != null) {
                try {
                    trackerServer.close();
                } catch (Exception e) {
                    log.error("[建立TrackerServer(createTrackerServer)--關閉trackerServer異常][異常:{}]", e);
                }
            }

        }
    }

    /**
     * 
     * @Description: 擷取空閑連接配接 1).在空閑池(idleFastDFSConnectionPool)中彈出一個連接配接;
     *               2).把該連接配接放入忙碌池(busyFastDFSConnectionPool)中; 3).傳回 connection
     *               4).如果沒有idle connection, 等待 wait_time秒, and check again
     * 
     * @throws AppException
     * 
     */
    public TrackerServer checkout() throws FastException {

        log.info("[擷取空閑連接配接(checkout)]");
        TrackerServer trackerServer = idleFastDFSConnectionPool.poll();

        if (trackerServer == null) {

            if (nowPoolSize < maxPoolSize) {
                createTrackerServer(COUNT);
                try {
                    trackerServer = idleFastDFSConnectionPool.poll(waitTimes, TimeUnit.SECONDS);
                } catch (Exception e) {
                    log.error("[擷取空閑連接配接(checkout)-error][error:擷取連接配接逾時:{}]", e);
                    throw FastDFSERROR.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
                }
            }
            if (trackerServer == null) {
                log.error("[擷取空閑連接配接(checkout)-error][error:擷取連接配接逾時(" + waitTimes + "s)]");
                throw FastDFSERROR.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
            }

        }
        log.info("[擷取空閑連接配接(checkout)][擷取空閑連接配接成功]");
        return trackerServer;

    }

    /**
     * 
     * @Description: 釋放繁忙連接配接 1.如果空閑池的連接配接小于最小連接配接值,就把目前連接配接放入idleFastDFSConnectionPool;
     *               2.如果空閑池的連接配接等于或大于最小連接配接值,就把目前釋放連接配接丢棄;
     * 
     * @param client1
     *            需釋放的連接配接對象
     * 
     */

    public void checkin(TrackerServer trackerServer) {

        log.info("[釋放目前連接配接(checkin)][prams:" + trackerServer + "] ");
        if (trackerServer != null) {
            if (idleFastDFSConnectionPool.size() < minPoolSize) {
                idleFastDFSConnectionPool.add(trackerServer);
            } else {
                synchronized (this) {
                    if (nowPoolSize != ) {
                        nowPoolSize--;
                    }
                }
            }
        }

    }

    /**
     * 
     * @Description: 删除不可用的連接配接,并把目前連接配接數減一(調用過程中trackerServer報異常,調用一般在finally中)
     * @param trackerServer
     * 
     */
    public void drop(TrackerServer trackerServer) {
        log.info("[删除不可用連接配接方法(drop)][parms:" + trackerServer + "] ");
        if (trackerServer != null) {
            try {
                synchronized (this) {
                    if (nowPoolSize != ) {
                        nowPoolSize--;
                    }
                }
                trackerServer.close();
            } catch (IOException e) {
                log.info("[删除不可用連接配接方法(drop)--關閉trackerServer異常][異常:{}]", e);
            }
        }
    }

    private void initClientGlobal() throws Exception {
        String classPath = FastDFSFastDFSConnectionPool.class.getResource("/").getPath().replaceAll("%20"," ");// 項目真實路徑
        String fdfsClientConfigFilePath = classPath + CLIENT_CONFIG_FILE;// FastDFS用戶端配置檔案
        ClientGlobal.init(fdfsClientConfigFilePath);
    }

    public LinkedBlockingQueue<TrackerServer> getIdleFastDFSConnectionPool() {
        return idleFastDFSConnectionPool;
    }

    public long getMinPoolSize() {
        return minPoolSize;
    }

    public void setMinPoolSize(long minPoolSize) {
        if (minPoolSize != ) {
            this.minPoolSize = minPoolSize;
        }
    }

    public long getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(long maxPoolSize) {
        if (maxPoolSize != ) {
            this.maxPoolSize = maxPoolSize;
        }
    }

    public long getWaitTimes() {
        return waitTimes;
    }

    public void setWaitTimes(int waitTimes) {
        if (waitTimes != ) {
            this.waitTimes = waitTimes;
        }
    }

}
           
2. FastDFSHeartBeat 定時檢測連接配接類
           
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.csource.fastdfs.TrackerServer;

public class FastDFSHeartBeat {

    private final Log log = LogFactory.getLog("FastDFSHeartBeat.class");
    /** fastdfs連接配接池 */
    private FastDFSFastDFSConnectionPool pool = null;
    /** 小時毫秒數 */
    public static int ahour =  *  *  * ;
    /** 等待時間 */
    public static int waitTimes = ;

    public FastDFSHeartBeat(FastDFSFastDFSConnectionPool pool) {
        this.pool = pool;
    }

    /**
     * 
     * @Description: 定時執行任務,檢測目前的空閑連接配接是否可用,如果不可用将從連接配接池中移除
     * 
     */
    public void beat() {
        log.info("執行心跳方法");
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                log.info("執行檢測連接配接方法");
                LinkedBlockingQueue<TrackerServer> idleConnectionPool = pool.getIdleFastDFSConnectionPool();
                TrackerServer ts = null;
                for (int i = ; i < idleConnectionPool.size(); i++) {
                    try {
                        ts = idleConnectionPool.poll(waitTimes, TimeUnit.SECONDS);
                        if (ts != null) {
                            org.csource.fastdfs.ProtoCommon.activeTest(ts.getSocket());
                            idleConnectionPool.add(ts);
                        } else {
                            /** 代表已經沒有空閑長連接配接 */
                            break;
                        }
                    } catch (Exception e) {
                        /** 發生異常,要删除,進行重建 */
                        log.error("重新設定檔案連接配接");
                        pool.drop(ts);
                    }
                }
            }
        };
        Timer timer = new Timer();
        timer.schedule(task, ahour, ahour);
    }

}
           

備注:常用的指令

檔案存儲目錄

/opt/fastdfs_storage_data

啟動服務

service fdfs_trackerd start

service fdfs_storaged start

/nginx/sbin/nginx

檢視服務

netstat -unltp|grep fdfs

繼續閱讀