天天看點

大資料與機器學習:實踐方法與行業案例3.4 自動加載程式的多線程實作

<b>3.4 自動加載程式的多線程實作</b>

<b></b>

本書中的自動加載程式使用java+hibernate實作,需要首先準備項目所需要的hibernate和annotation相應版本的jar包。

程式需要初始化檔案狀态表,即根據file_status表中的配置,一次性産生當日需要處理的檔案的初始記錄,這些記錄會插入file_status表中,供随後的處理子產品使用。

為了便于并行處理,我們将自動加載程式的整個流程劃分為多個子項目:

1)掃描檔案子項目,取名為scanfiles。

2)将下載下傳檔案和解壓檔案歸為一個java項目,取名為downloadandunzip。

3)加載檔案為另一個子項目,例如針對oracle,可以建立一個loadtooracle的項目,針對加載至hive表中的場景,可以建立一個loadtohive的項目,當然也可以放在一起用參數區分。

這些java子項目之間通過file_status配置表進行資料通信,圖3-10展示了這些子項目的資訊互動。

圖3-10 項目之間的關聯關系

3.4.1 scanfiles

scanfiles是常駐記憶體程序,包含初始化檔案狀态和掃描檔案兩個功能。初始化檔案狀态子產品在每天淩晨根據資料檔案資訊表(file_settings),将當日需要處理的資料檔案資訊初始化至資料檔案狀态表(file_status)中,初始化字段包含檔案全名、批次日期、檔案日期、檔案狀态(初始化為file_not_exists)等,更多内容可參考前面對資料檔案狀态表的介紹。

掃描檔案子產品根據資料檔案狀态表中當日的初始化資訊,在對應的目錄中掃描資料檔案,更新對應的狀态資訊。

代碼清單3-3和代碼清單3-4分别給出了初始化檔案狀态表的代碼和掃描檔案的代碼。

代碼清單 3-3

private static void generatethelist() {

if (filestatusdao.gettodaystatuscnt().longvalue() &gt; 0l) {

// 設定taskday為today

taskdaystr = datecalc.formateddate(dateformatstr, datecalc.today());

return;

}

filesettingdao filesettingdao = new filesettingdaoimpl();

list&lt;filesetting&gt; filesettings = filesettingdao.getallfilesettings();

set&lt;long&gt; fileidset = new hashset&lt;long&gt;();

for (filesetting filesetting : filesettings) {

if (fileidset.contains(filesetting.getid()))

continue;

else

fileidset.add(filesetting.getid());

// 檔案名中的日期

string filedate = datecalc.formateddate(filesetting.getfiledateformat(),

                 datecalc.dateadd(datecalc.today(),

                 filesetting.getdatedif()));

// 當天需要下載下傳的檔案全名,不含路徑

string fullname = filesetting.getfileprefix() + filedate +

    filesetting.getfilemid() + filesetting.getfilesuffix();

filestatus filestatus = new filestatus();

filestatus.setfilesetting(filesetting);

filestatus.setfullname(fullname);

filestatus.setstatus(status.file_not_exists); // 初始狀态為file_not_exists

filestatus.setfilesize(0l);

filestatus.setfilesizecheckcnt(0);

filestatus.setfiledate(filedate);

filestatus.setbatchdate(datecalc.formateddate(dateformatstr, datecalc.today()));

filestatus.setpriority(filesetting.getpriority());

for (targetserver targetserver : filesetting.gettargetserverset()) {

filestatus.settargetserver(targetserver);

filesettingdao.save(filestatus);

try {

thread.sleep(300);// 暫停0.3秒

} catch (interruptedexception e) {

logger.error(e.tostring());

// 設定taskday為今天

generatethelist()方法通過filestatusdao.gettodaystatuscnt(). longvalue() &gt; 0l判斷當天的檔案狀态是否已經初始化,如果已經初始化,則傳回;否則通過filesettingdao周遊所有的file_settings記錄,并通過filestatus實體對象将每條file_settings記錄初始化為一個filestatus對象,最後通過filesettingdao.save(filestatus)将對應的初始化檔案狀态記錄儲存至file_status表中。

generatethelist()方法僅在每天淩晨成功調用一次即可,調用成功之後,file_status表中将會插入當日需要處理的所有資料檔案資訊,并且檔案的初始狀态為file_not_exists。

代碼清單 3-4

public class scanfilethread extends thread {

private static logger logger = logger.getlogger(scanfilethread.class);

private static filestatusdao filestatusdao = new filestatusdaoimpl();

public void run() {

synchronized (scanfilethread.class) {

scanfile();

private void scanfile() {

while (true) {

list&lt;filestatus&gt; filestatuslist = filestatusdao.getfilebystatus(

new status[] { status.file_not_exists, status.checking_ftp_file });

for (filestatus filestatus : filestatuslist) {

filesetting filesetting = filestatus.getfilesetting();

string remotefilename = filestatus.getfullname(); // 檔案全名

long filesizeold = filestatus.getfilesize();

integer filecheckcnt = filestatus.getfilesizecheckcnt();

ftpclient ftpclient = ftputil.getftpclient(filesetting);

if (ftpclient == null)

ftpfile[] ftpfiles = ftpclient.listfiles(remotefilename);

if (ftpfiles.length == 0) {

ftpclient.logout();

ftpclient.disconnect();

if (filestatus.getstatus().equals(

status.file_not_exists.name())) {

filestatus.setstatus(status.checking_ftp_file);

} else if (filestatus.getstatus().equals(status.

checking_ftp_file.name())) {

long filesizenew = ftpfiles[0].getsize();

if (filesizenew.longvalue() != filesizeold.

longvalue()) {

filestatus.setfilesize(filesizenew);

} else {

if (filecheckcnt &lt; 5)

filestatus.setfilesizecheckcnt(filecheckcnt + 1);

filestatus.setstatus(status.ftp_file_ok);

filestatusdao.update(filestatus);

} catch (ioexception e) {

logger.error("掃描ftp檔案時出錯:" + e.tostring());

} finally {

if (ftpclient.isconnected()) {

logger.error("斷開ftp連接配接時出錯:" + e.

tostring());

thread.sleep(15000); // sleep 15秒

logger.error("thread.sleep(15000)時出錯:" + e.tostring());

  }

scanfilethread類使用了filestatusdao類用于資料庫的讀/寫功能,該類的父類basedao提供了hibernate的sessionfactory以及事務上的支援。

scanfilethread類的主要方法scanfile()中用于讀取檔案類型為status.file_not_exists以及status.checking_ftp_file的全部記錄,通過ftpclient.listfiles(remotefilename)檢視指定檔案是否存在于ftp伺服器中,一旦發現檔案存在,就将檔案的狀态由status.file_not_exists更新為status.checking_ftp_file。

随後程式通過多次(每次間隔15秒)比較檔案大小來确定ftp伺服器上的檔案是否已經上傳完成,當連續5次檢測到的檔案大小都相同時,則認為檔案已經完整上傳至ftp伺服器,程式将該檔案的狀态由status.checking_ftp_file更新為status.ftp_file_ok,标明該檔案可以開始下載下傳了。

3.4.2 downloadandunzip

在downloadandunzip項目中,包含兩個重要的java類:getfilethread類和unzipthread類,它們分别負責下載下傳檔案和解壓檔案。

getfilethread類的主要作用是采用多線程的方式下載下傳檔案,其核心方法是代碼清單3-5中所示的ftpgetfile方法。

代碼清單 3-5

private static void ftpgetfile(filestatus filestatus) {

ftpclient ftpclient = ftputil.getftpclient(filestatus.getfilesetting());

string remotefilename = filestatus.getfullname(); // 檔案全名,帶路徑

if (filestatus.getstatus().equalsignorecase("ftp_file_ok")) {

filestatus.setstatus(status.download_start); // 更新下載下傳狀态

string localbasedir = globalconfig.get("localbasedir").getconfigcontent();

string tablename = filesetting.getloadconfig().gettablename();

string ftppath = filesetting.getftppath();

date filedate = datecalc.dateadd(datecalc.today(), filesetting.getdatedif());

string filedatestr = datecalc.formateddate(filesetting.

getfiledateformat(), filedate);

string localdir = null; // 檔案下載下傳後,存放的本地目錄

if (ftppath.equals(""))

localdir = localbasedir + "/" + tablename + "/" + filedatestr;

localdir = localbasedir + "/" + tablename + "/" + ftppath +

"/" + filedatestr;

// 建立指定路徑

file ldir = new file(localdir);

if (!ldir.exists())

ldir.mkdirs();

fileoutputstream fos = null;

fos = new fileoutputstream(ldir.getabsolutepath() + "/" +

remotefilename);

} catch (filenotfoundexception e) {

logger.error("fileoutputstream:" + e.tostring());

ftpclient.setfiletype(ftpclient.binary_file_type);

ftpclient.setbuffersize(1024000);

string downloadfilepath = null;

if (ftpclient.retrievefile(remotefilename, fos)) {// 下載下傳完成

downloadfilepath = localdir + "/" + filestatus.getfullname();

fos.close();

filestatus.setfilepath(downloadfilepath);

filestatus.setstatus(status.download_finished);

filestatus.setstatus(status.download_failed); // 更新下載下傳狀态

filestatus.setloginfo(e.tostring());

logger.error("下載下傳檔案出錯:" + e.tostring());

if (ftpclient != null)

logger.error("ftpclient.disconnect()出錯:" + e.

該方法用于檢測狀态為ftp_file_ok的資料檔案,根據配置資訊自動建立以檔案對應表名、ftp遠端路徑和檔案日期拼接的本地檔案存放路徑(localdir = localbasedir + "/" + tablename + "/" + ftppath + "/" + filedatestr;),然後将檔案下載下傳至上述建立的目錄中。在下載下傳過程中,同時更新檔案的下載下傳狀态,并記錄檔案下載下傳完成後的儲存路徑。

unzipthread類的作用是将檔案狀态為download_finished且需要解壓縮的資料檔案(通過檔案字尾區分)進行解壓,并記錄解壓後的檔案名稱和路徑。代碼清單3-6展示了unzipthread類的主要方法unzip()。

代碼清單 3-6

public static void unzip(filestatus filestatus) {

if (!filestatus.getstatus().equals("download_finished")){

logger.error("file status not yet in download_finished!");

string filesuffix = filesetting.getfilesuffix().tolowercase();

filestatus.setstatus(status.unzip_start);

filestatus.setupdatetime(new timestamp(new date().gettime()));

if (filesuffix.equals(".zip"))

zip(filestatus);

else if (filesuffix.equals(".gz"))

gz(filestatus);

unzip()方法支援.zip和.gz兩種壓縮方式,如果壓縮檔案是其他壓縮方式,比如.gzip,那麼需要自行編寫針對.gzip格式檔案的解壓代碼。

unzip()方法調用成功後,會根據實際解壓結果更新檔案狀态為unzip_failed或者unzip_finished,如果解壓成功,則記錄解壓後的檔案名稱及檔案路徑。

3.4.3 loadtohive

loadtohive用于将load_config中db_type='hive'的資料檔案導入對應的hive表中,并且根據load_type字段進行不同的加載處理。代碼清單3-7展示了loadtohive的核心代碼。

代碼清單 3-7

private void loadtotable(targetserver targetserver,filestatus filestatus) {

filestatus.setstatus(status.load_start);

string loadtype = filestatus.getfilesetting().

getloadconfig().getloadtype();

string hdfsbasedir = targetserver.gethdfsbasedir();

string hdfsdir = null;

if (loadtype.equalsignorecase("added"))

hdfsdir = hdfsbasedir + "/" +

ilestatus.getfilesetting().getloadconfig().gettablename() + "/" +

filestatus.getfiledate();

else if (loadtype.equalsignorecase("covered"))

filestatus.getfilesetting().getloadconfig().gettablename();

if (!addtohive.createhdfsdir(filestatus, hdfsdir)) //建立hdfs目錄

if (loadtype.equalsignorecase("added")) { // 增量,将新的partition加入hive表中

if (!addtohive.addpartitiontohive(filestatus, hdfsdir))

} else if (loadtype.equalsignorecase("covered")) { // 全量,将hive表中的曆史資料删除

if (!addtohive.delolddataofhive(filestatus, hdfsdir))

addtohive.copydatatohdfs(filestatus, hdfsdir);// 将linux本地檔案複制到hdfs目錄

loadtohive首先建立hdfs目錄,然後判斷加載方式,如果加載方式是“added”(增量),則在hive表中增加一個partition(該partition指向之前建立的hdfs目錄),再将資料檔案複制到hdfs目錄,完成hive表增量更新的過程;如果是“covered”(全量)加載方式,則先将曆史資料删除,然後将新的資料檔案複制到hdfs目錄即可。

3.4.4 loadtooracle

loadtooracle是自動加載程式在oracle資料庫上的實作。與hive資料倉庫的不同之處在于,oracle資料庫(以及其他關系型資料庫)有其自身的dbms體系,是以其資料加載方式一般采用其自身提供的實用工具,loadtooracle調用oracle的sqlldr進行資料批量加載(sqlldr請參考第2章關于oracel:sqlldr的内容)。

loadtooracle與loadtohive的另一個主要不同點在于,資料檔案在加載到目标資料庫表中之前,會先建立一個中間表(在原表名後加上temp),資料檔案先加載至temp表中,然後再分别根據增量或者全量的加載方式分别處理(參考1.2.2節中關于資料更新規則的内容)。

代碼清單3-8給出了loadtooracle的核心代碼。

代碼清單 3-8

private void loadtotable(targetserver targetserver, filestatus filestatus) {

string loadtype = filestatus.getfilesetting().getloadconfig().getloadtype();

string tablename = filestatus.getfilesetting().getloadconfig().gettablename();

string createtemptableres = oracleutil.createtemptable(tablename);// 建立temp表

if (!createtemptableres.equals("")) {

filestatus.setstatus(status.load_failed);

filestatus.setloginfo(createtemptableres);

if (!loadintotemptable(filestatus)) { // 加載資料至temp表

if (loadtype.equalsignorecase("added")) { // 增量更新

// temp表中資料insert到原表,并将temp表drop

if (oracleutil.insertanddroptable(tablename))

filestatus.setstatus(status.load_finished);

} else if (loadtype.equalsignorecase("covered")) { // 全量更新

oracleutil.dropandrenametable(tablename); //drop原表并将temp表重命名為原表

當資料檔案較大時,批量加載耗時較長,展現在代碼上,loadintotemptable(filestatus)将花費大量時間等待sqlldr批量加載指令傳回(可能達1個小時以上,取決于資料檔案的大小以及oracle資料庫伺服器的硬體資源),這個過程中,file_status表中的檔案狀态一直不發生改變,這會讓人有些擔心。

一個改進的辦法是,在等待sqlldr批量加載指令傳回的過程中,通過count對應temp表中資料的條數來回報加載程序,可以每隔5分鐘count一次temp表,并将對應的行數記錄下來,這樣就可以直覺感受到加載程式是否正在正常工作。

3.4.5 自動加載程式的部署架構

至此,我們已經完成了自動加載程式的設計和實作過程,現在看一下自動加載程式的部署架構。

将自動加載程式的主要組成項目scanfiles、downloadandunzip和加載(loadtooracle、loadtohive)導出為可執行jar包,得到scanfiles.jar、downloadandunzip.jar、loadtooracle. jar、loadtohive.jar。這些jar包各自專注于完成自己的功能,要完成自動加載程式的整個過程,需要這些可執行jar包之間互相協調工作,圖3-11展示了這些jar包的部署架構。

圖3-11 自動加載程式的部署架構圖

如圖3-11所示,将可執行jar檔案複制到相應的伺服器上,然後通過java-jar指令啟動這些可執行jar檔案即完成部署。例如在linux伺服器上,通過執行指令“java-jar/home/queziyang/jars/scanfiles-1.0.jar &amp;”,即在背景啟動scanfiles程序,通過jps指令可以檢視對應的程序資訊。

資料緩沖區上需要部署scanfiles.jar,所有後續程序均依賴于scanfiles.jar的産出。

每個目标伺服器(或伺服器叢集)上都需要部署downloadandunzip.jar,它将對應的資料檔案從ftp伺服器上下載下傳至目标伺服器,并在目标伺服器上完成檔案解壓。注意到,檔案的解壓縮并沒有在ftp伺服器上進行,而是下載下傳至本地後解壓,這樣做是很有必要的,因為檔案解壓需要占用大量cpu和i/o資源,是以在ftp伺服器上執行這項任務會嚴重影響資料緩沖區的正常功能。

3.4.6 程式的維護和優化

資料檔案在目标伺服器上解壓後,自動加載程式并未将檔案删除。為了節省伺服器的存儲空間,定期删除本地伺服器上保留的資料檔案是非常必要的。可以通過批處理的方式定期删除曆史資料檔案,也可以在loadtooracle或者loadtohive中加入删除資料檔案的功能,即當資料檔案成功加載後,将原始資料檔案以及解壓後的資料檔案删除。

另一個優化點是可以将上述配置資料庫的過程可視化,即在上述java項目的基礎上包裝成java web項目,這樣第一部分

資料與平台篇

(data &amp; infrastructures)

遲序之數,非出神怪,有形可檢,有數可推。

—祖沖之

數學是知識的工具,亦是其他知識工具的泉源。所有研究順序和度量的科學均和數學有關。

—笛卡兒

對于大部分非計算機專業出身的分析人員和業務人員來說,資料庫領域的專業術語簡直讓人抓狂,非要搞得那麼高深嗎?大可不必。

資料科學家是資料的應用者,以最大限度來提煉資料價值為目的,不必像資料倉庫開發者那樣對資料的存儲、結構以及資料倉庫的内生技術一清二楚,但應該站在找到資料、拼接資料、使用資料的角度,大體了解資料的分布、處理邏輯,以便為分析快速地準備素材。