介紹各種分布式檔案系統:http://elf8848.iteye.com/blog/1724382
架構研究參考:http://elf8848.iteye.com/blog/1739596
安裝配置位址:http://www.open-open.com/lib/view/open1435468300700.html
FastDFS 工作原理參考網址:http://blog.chinaunix.net/uid-20196318-id-4058561.html
FastDFS分布檔案系統介紹:
1.FastDFS是一個開源的輕量級分布式檔案系統,它對檔案進行管理,功能包括:檔案存儲、檔案同步、檔案通路(檔案上傳、檔案下載下傳)等,解決了大容量存儲和負載均衡的問題。特别适合以檔案為載體的線上服務,如相冊網站、視訊網站等等,不适合做資料處理.
2.該分布式檔案系統組成:
跟蹤伺服器(tracker server)、存儲伺服器(storage server)和用戶端(client)
存儲伺服器:檔案存儲,檔案同步,提供檔案通路接口,同時以key value的方式管理檔案的中繼資料。其中 檔案通路采用了nginx,用于通路檔案。
跟蹤伺服器:排程檔案以負載均衡的方式通路,它起到了一個協調的作用,負責管理storage。是以上傳檔案的監聽端口是歸跟蹤伺服器管的,不直接和存儲伺服器挂鈎。
它們倆的關系好比是,存儲伺服器是搬運的員工,而跟蹤伺服器是協調搬運的主管.
3.簡單工作原理:
1. 用戶端向檔案系統發送請求
2.檔案系統的tracker 查詢可用的storage,傳回給用戶端可用的storge ip和端口号
3.用戶端上傳檔案(檔案内容和檔案名)
4.檔案系統端存儲檔案,生成檔案id,傳回給用戶端(檔案名和檔案路徑)
5.用戶端做相應處理,結束.
用戶端上傳至檔案系統spring內建實作:
1.配置檔案準備:fdfs_client.conf,參數如下:
connect_timeout = 2
network_timeout = 30
charset = utf-8
http.tracker_http_port = 追蹤器通路檔案端口号
http.anti_steal_token = 是否啟用tokem(預設no)
http.secret_key = (上傳檔案密碼,和伺服器端的http.conf檔案内設的密碼保持一緻)
tracker_server=追蹤器ip:上傳檔案端口号
minPoolSize = (連接配接池最小數)
maxPoolSize = (連接配接池最大數)
waitTimes = 等待時間
2.web.xml建立bean
<!-- 初始化配置檔案伺服器配置 -->
<bean id="propertyConfigurer"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:/fdfs_client.conf</value>
</list>
</property>
</bean>
<bean name="fileFastDFSUtil" class="com.csx.util.aliFile.FastDFSUtil"
init-method="init">
<property name="minPoolSize" value="${minPoolSize}" />
<property name="maxPoolSize" value="${maxPoolSize}" />
<property name="waitTimes" value="${waitTimes}" />
</bean>
3.重要工具類,可直接複制用
1.FastDFSUtil工具類
import java.net.SocketTimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.csource.common.NameValuePair;
import org.csource.fastdfs.StorageClient1;
import org.csource.fastdfs.StorageServer;
import org.csource.fastdfs.TrackerServer;
import com.csx.util.ReadProperties;
public class FastDFSUtil {
private final Log log = LogFactory.getLog("FastDfsUtil.class");
/** 連接配接池 */
private FastDFSFastDFSConnectionPool connectionPool = null;
/** 連接配接池預設最小連接配接數 */
private long minPoolSize = ;
/** 連接配接池預設最大連接配接數 */
private long maxPoolSize = ;
/** 目前建立的連接配接數 */
private volatile long nowPoolSize = ;
/** 預設等待時間(機關:秒) */
private long waitTimes = ;
/**
* 初始化線程池
*
* @Description:
*
*/
public void init() {
log.info("[初始化線程池(Init)[預設參數:minPoolSize=" + minPoolSize + ",maxPoolSize=" + maxPoolSize + ",waitTimes="
+ waitTimes + "]");
connectionPool = new FastDFSFastDFSConnectionPool(minPoolSize, maxPoolSize, waitTimes);
}
/**
*
* @Description:
* @param groupName
* 組名如group0
* @param fileBytes
* 檔案位元組數組
* @param extName
* 檔案擴充名:如png
* @param linkUrl
* 通路位址:http://image.xxx.com
* @return 圖檔上傳成功後位址
* @throws FastException
*
*/
public String upload(FastDFSFile file, NameValuePair[] valuePairs) throws FastException {
/** 封裝檔案資訊參數 */
TrackerServer trackerServer = null;
try {
/** 擷取fastdfs伺服器連接配接 */
trackerServer = connectionPool.checkout();
StorageServer storageServer = null;
StorageClient1 client = new StorageClient1(trackerServer, storageServer);
String[] results = client.upload_file(file.getContent(), file.getExt(), valuePairs);
/** 上傳完畢及時釋放連接配接 */
connectionPool.checkin(trackerServer);
log.info("[上傳檔案(upload)-fastdfs伺服器相應結果][result:results=" + results + "]");
/** results[0]:組名,results[1]:遠端檔案名 */
if (results != null && results.length == ) {
return ReadProperties.getFile_ip() + "/" + results[] + "/" + results[];
} else {
/** 檔案系統上傳傳回結果錯誤 */
throw FastDFSERROR.UPLOAD_RESULT_ERROR.ERROR();
}
} catch (FastException e) {
log.error("[上傳檔案(upload)][異常:" + e + "]");
throw e;
} catch (SocketTimeoutException e) {
log.error("[上傳檔案(upload)][異常:" + e + "]");
//将連接配接數減一
connectionPool.drop(trackerServer);
throw FastDFSERROR.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
} catch (Exception e) {
log.error("[上傳檔案(upload)][異常:" + e + "]");
connectionPool.drop(trackerServer);
throw FastDFSERROR.SYS_ERROR.ERROR();
}
}
/**
*
* @Description: 删除fastdfs伺服器中檔案
* @param group_name
* 組名
* @param remote_filename
* 遠端檔案名稱
* @throws FastException
*
*/
public void deleteFile(String group_name, String remote_filename) throws FastException {
log.info("[ 删除檔案(deleteFile)][parms:group_name=" + group_name + ",remote_filename=" + remote_filename + "]");
TrackerServer trackerServer = null;
try {
/** 擷取可用的tracker,并建立存儲server */
trackerServer = connectionPool.checkout();
StorageServer storageServer = null;
StorageClient1 client1 = new StorageClient1(trackerServer, storageServer);
/** 删除檔案,并釋放 trackerServer */
int result = client1.delete_file(group_name, remote_filename);
/** 上傳完畢及時釋放連接配接 */
connectionPool.checkin(trackerServer);
log.info("[ 删除檔案(deleteFile)--調用fastdfs用戶端傳回結果][results:result=" + result + "]");
/** 0:檔案删除成功,2:檔案不存在 ,其它:檔案删除出錯 */
if (result == ) {
throw FastDFSERROR.NOT_EXIST_FILE.ERROR();
} else if (result != ) {
throw FastDFSERROR.DELETE_RESULT_ERROR.ERROR();
}
} catch (FastException e) {
log.error("[ 删除檔案(deleteFile)][異常:" + e + "]");
throw e;
} catch (SocketTimeoutException e) {
log.error("[ 删除檔案(deleteFile)][異常:" + e + "]");
throw FastDFSERROR.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
} catch (Exception e) {
log.error("[ 删除檔案(deleteFile)][異常:" + e + "]");
connectionPool.drop(trackerServer);
throw FastDFSERROR.SYS_ERROR.ERROR();
}
}
public FastDFSFastDFSConnectionPool getConnectionPool() {
return connectionPool;
}
public void setConnectionPool(FastDFSFastDFSConnectionPool connectionPool) {
this.connectionPool = connectionPool;
}
public long getMinPoolSize() {
return minPoolSize;
}
public void setMinPoolSize(long minPoolSize) {
this.minPoolSize = minPoolSize;
}
public long getMaxPoolSize() {
return maxPoolSize;
}
public void setMaxPoolSize(long maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}
public long getNowPoolSize() {
return nowPoolSize;
}
public void setNowPoolSize(long nowPoolSize) {
this.nowPoolSize = nowPoolSize;
}
public long getWaitTimes() {
return waitTimes;
}
public void setWaitTimes(long waitTimes) {
this.waitTimes = waitTimes;
}
}
2.自定義異常類:
public class FastException extends Exception {
private static final long serialVersionUID = -L;
private String code;
private String description;
public FastException(String code, String message) {
super(message);
this.code = code;
}
public FastException(String code, String message, String description) {
super(message);
this.code = code;
this.description = description;
}
/**
* 錯誤碼
*
* @return
*/
public String getCode() {
return code;
}
/**
* 使用者可讀描述資訊
*
* @return
*/
public String getDescription() {
return description;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(getClass().getName());
sb.append(": [");
sb.append("] - ");
sb.append(code);
sb.append(" - ");
sb.append(getMessage());
if (getDescription() != null) {
sb.append(" - ");
sb.append(getDescription());
}
return sb.toString();
}
}
3.報錯資訊枚舉類FastDFSERROR
public enum FastDFSERROR {
PARAMETER_IS_NULL("21001", "必填參數為空", "必填參數為空"),
FASTDFS_CONNECTION_FAIL("21002", "連接配接fastdfs伺服器失敗", "檔案上傳異常,請重試"),
WAIT_IDLECONNECTION_TIMEOUT("21003", "等待空閑連接配接逾時", "連接配接逾時,請重試"),
NOT_EXIST_GROUP("21004", "檔案組不存在", "檔案組不存在"),
UPLOAD_RESULT_ERROR("21005", "fastdfs檔案系統上傳傳回結果錯誤", "檔案上傳異常,請重試"),
NOT_EXIST_PORTURL("21006", "未找到對應的端口和通路位址", "檔案上傳異常,請重試"),
SYS_ERROR("21007", "系統錯誤", "系統錯誤"),
FILE_PATH_ERROR("21008", "檔案通路位址格式不對", "檔案通路位址格式不對"),
DELETE_RESULT_ERROR("21009", "fastdfs檔案系統删除檔案傳回結果錯誤", "檔案删除異常,請重試"),
NOT_EXIST_FILE("21010", "檔案不存在", "檔案不存在");
/** 錯誤碼 */
String code;
/** 錯誤資訊,用于日志輸出,便于問題定位 */
String message;
/** 錯誤提示,用于用戶端提示 */
String descreption;
FastDFSERROR(String code, String message) {
this.message = message;
this.code = code;
}
FastDFSERROR(String code, String message, String descreption) {
this.message = message;
this.code = code;
this.descreption = descreption;
}
public FastException ERROR() {
return new FastException(this.code, this.message, this.descreption);
}
public FastException ERROR(String descreption) {
return new FastException(this.code, this.message, descreption);
}
}
4.檔案上傳類FastDFSFile
/**
* 檔案上傳屬性,調用fastdfs上傳方法必需
*
*/
public class FastDFSFile {
private byte[] content;// 檔案内容
private String name;// 檔案名
private String ext;// 字尾
private String fileId;// 檔案id(唯一id)
public FastDFSFile(byte[] content, String ext,String fileId) {
this.content = content;
this.ext = ext;
this.fileId = fileId;
}
public FastDFSFile(byte[] content, String name, String ext,String fileId) {
this.content = content;
this.name = name;
this.ext = ext;
this.fileId = fileId;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getExt() {
return ext;
}
public void setExt(String ext) {
this.ext = ext;
}
public String getFileId() {
return fileId;
}
public void setFileId(String fileId) {
this.fileId = fileId;
}
}
5.連接配接池實作類(由于每次上傳檔案都需要建立連接配接去上傳檔案,當并發數較多的情況下,會出現連接配接數過多的情況,為此我實作了連接配接池去實作,并每隔五秒去定時掃描看連接配接是否失效)
1. FastDFSFastDFSConnectionPool 連接配接池類
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.csource.fastdfs.ClientGlobal;
import org.csource.fastdfs.TrackerClient;
import org.csource.fastdfs.TrackerServer;
/**
*
* @ClassName: FastDFSConnectionPool
* @Description: fastdfs連接配接池
* @author william_zhong
* @date 2017-4-13
*
*/
public class FastDFSFastDFSConnectionPool {
private final Log log = LogFactory.getLog("FastDFSFastDFSConnectionPool.class");
/** 空閑的連接配接池 */
private LinkedBlockingQueue<TrackerServer> idleFastDFSConnectionPool = null;
/** 連接配接池預設最小連接配接數 */
private long minPoolSize = ;
/** 連接配接池預設最大連接配接數 */
private long maxPoolSize = ;
/** 目前建立的連接配接數 */
private volatile long nowPoolSize = ;
/** 預設等待時間(機關:秒) */
private long waitTimes = ;
/** fastdfs用戶端建立連接配接預設1次 */
private static final int COUNT = ;
public static final String CLIENT_CONFIG_FILE = "fdfs_client.conf";
/**
* 預設構造方法
*/
public FastDFSFastDFSConnectionPool(long minPoolSize, long maxPoolSize, long waitTimes) {
log.info("[線程池構造方法(FastDFSConnectionPool)][" + "][預設參數:minPoolSize=" + minPoolSize + ",maxPoolSize="
+ maxPoolSize + ",waitTimes=" + waitTimes + "]");
this.minPoolSize = minPoolSize;
this.maxPoolSize = maxPoolSize;
this.waitTimes = waitTimes;
/** 初始化連接配接池 */
poolInit();
/** 注冊心跳 */
FastDFSHeartBeat beat = new FastDFSHeartBeat(this);
beat.beat();
}
/**
*
* @Description: 連接配接池初始化 (在加載目前FastDFSConnectionPool時執行) 1).加載配置檔案
* 2).空閑連接配接池初始化; 3).建立最小連接配接數的連接配接,并放入到空閑連接配接池;
*
*/
private void poolInit() {
try {
/** 加載配置檔案 */
initClientGlobal();
/** 初始化空閑連接配接池 */
idleFastDFSConnectionPool = new LinkedBlockingQueue<TrackerServer>();
/** 往線程池中添加預設大小的線程 */
for (int i = ; i < minPoolSize; i++) {
createTrackerServer(COUNT);
}
} catch (Exception e) {
}
}
/**
*
* @Description: 建立TrackerServer,并放入空閑連接配接池
*
*/
public void createTrackerServer(int flag) {
TrackerServer trackerServer = null;
try {
TrackerClient trackerClient = new TrackerClient();
trackerServer = trackerClient.getConnection();
while (trackerServer == null && flag < ) {
log.info("[建立TrackerServer(createTrackerServer)][第" + flag + "次重建]");
flag++;
initClientGlobal();
trackerServer = trackerClient.getConnection();
}
org.csource.fastdfs.ProtoCommon.activeTest(trackerServer.getSocket());
idleFastDFSConnectionPool.add(trackerServer);
/** 同一時間隻允許一個線程對nowPoolSize操作 **/
synchronized (this) {
nowPoolSize++;
}
} catch (Exception e) {
log.error("[建立TrackerServer(createTrackerServer)][異常:{}]", e);
} finally {
if (trackerServer != null) {
try {
trackerServer.close();
} catch (Exception e) {
log.error("[建立TrackerServer(createTrackerServer)--關閉trackerServer異常][異常:{}]", e);
}
}
}
}
/**
*
* @Description: 擷取空閑連接配接 1).在空閑池(idleFastDFSConnectionPool)中彈出一個連接配接;
* 2).把該連接配接放入忙碌池(busyFastDFSConnectionPool)中; 3).傳回 connection
* 4).如果沒有idle connection, 等待 wait_time秒, and check again
*
* @throws AppException
*
*/
public TrackerServer checkout() throws FastException {
log.info("[擷取空閑連接配接(checkout)]");
TrackerServer trackerServer = idleFastDFSConnectionPool.poll();
if (trackerServer == null) {
if (nowPoolSize < maxPoolSize) {
createTrackerServer(COUNT);
try {
trackerServer = idleFastDFSConnectionPool.poll(waitTimes, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("[擷取空閑連接配接(checkout)-error][error:擷取連接配接逾時:{}]", e);
throw FastDFSERROR.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
}
}
if (trackerServer == null) {
log.error("[擷取空閑連接配接(checkout)-error][error:擷取連接配接逾時(" + waitTimes + "s)]");
throw FastDFSERROR.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
}
}
log.info("[擷取空閑連接配接(checkout)][擷取空閑連接配接成功]");
return trackerServer;
}
/**
*
* @Description: 釋放繁忙連接配接 1.如果空閑池的連接配接小于最小連接配接值,就把目前連接配接放入idleFastDFSConnectionPool;
* 2.如果空閑池的連接配接等于或大于最小連接配接值,就把目前釋放連接配接丢棄;
*
* @param client1
* 需釋放的連接配接對象
*
*/
public void checkin(TrackerServer trackerServer) {
log.info("[釋放目前連接配接(checkin)][prams:" + trackerServer + "] ");
if (trackerServer != null) {
if (idleFastDFSConnectionPool.size() < minPoolSize) {
idleFastDFSConnectionPool.add(trackerServer);
} else {
synchronized (this) {
if (nowPoolSize != ) {
nowPoolSize--;
}
}
}
}
}
/**
*
* @Description: 删除不可用的連接配接,并把目前連接配接數減一(調用過程中trackerServer報異常,調用一般在finally中)
* @param trackerServer
*
*/
public void drop(TrackerServer trackerServer) {
log.info("[删除不可用連接配接方法(drop)][parms:" + trackerServer + "] ");
if (trackerServer != null) {
try {
synchronized (this) {
if (nowPoolSize != ) {
nowPoolSize--;
}
}
trackerServer.close();
} catch (IOException e) {
log.info("[删除不可用連接配接方法(drop)--關閉trackerServer異常][異常:{}]", e);
}
}
}
private void initClientGlobal() throws Exception {
String classPath = FastDFSFastDFSConnectionPool.class.getResource("/").getPath().replaceAll("%20"," ");// 項目真實路徑
String fdfsClientConfigFilePath = classPath + CLIENT_CONFIG_FILE;// FastDFS用戶端配置檔案
ClientGlobal.init(fdfsClientConfigFilePath);
}
public LinkedBlockingQueue<TrackerServer> getIdleFastDFSConnectionPool() {
return idleFastDFSConnectionPool;
}
public long getMinPoolSize() {
return minPoolSize;
}
public void setMinPoolSize(long minPoolSize) {
if (minPoolSize != ) {
this.minPoolSize = minPoolSize;
}
}
public long getMaxPoolSize() {
return maxPoolSize;
}
public void setMaxPoolSize(long maxPoolSize) {
if (maxPoolSize != ) {
this.maxPoolSize = maxPoolSize;
}
}
public long getWaitTimes() {
return waitTimes;
}
public void setWaitTimes(int waitTimes) {
if (waitTimes != ) {
this.waitTimes = waitTimes;
}
}
}
2. FastDFSHeartBeat 定時檢測連接配接類
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.csource.fastdfs.TrackerServer;
public class FastDFSHeartBeat {
private final Log log = LogFactory.getLog("FastDFSHeartBeat.class");
/** fastdfs連接配接池 */
private FastDFSFastDFSConnectionPool pool = null;
/** 小時毫秒數 */
public static int ahour = * * * ;
/** 等待時間 */
public static int waitTimes = ;
public FastDFSHeartBeat(FastDFSFastDFSConnectionPool pool) {
this.pool = pool;
}
/**
*
* @Description: 定時執行任務,檢測目前的空閑連接配接是否可用,如果不可用将從連接配接池中移除
*
*/
public void beat() {
log.info("執行心跳方法");
TimerTask task = new TimerTask() {
@Override
public void run() {
log.info("執行檢測連接配接方法");
LinkedBlockingQueue<TrackerServer> idleConnectionPool = pool.getIdleFastDFSConnectionPool();
TrackerServer ts = null;
for (int i = ; i < idleConnectionPool.size(); i++) {
try {
ts = idleConnectionPool.poll(waitTimes, TimeUnit.SECONDS);
if (ts != null) {
org.csource.fastdfs.ProtoCommon.activeTest(ts.getSocket());
idleConnectionPool.add(ts);
} else {
/** 代表已經沒有空閑長連接配接 */
break;
}
} catch (Exception e) {
/** 發生異常,要删除,進行重建 */
log.error("重新設定檔案連接配接");
pool.drop(ts);
}
}
}
};
Timer timer = new Timer();
timer.schedule(task, ahour, ahour);
}
}
備注:常用的指令
檔案存儲目錄
/opt/fastdfs_storage_data
啟動服務
service fdfs_trackerd start
service fdfs_storaged start
/nginx/sbin/nginx
檢視服務
netstat -unltp|grep fdfs