天天看点

大数据与机器学习:实践方法与行业案例3.4 自动加载程序的多线程实现

<b>3.4 自动加载程序的多线程实现</b>

<b></b>

本书中的自动加载程序使用java+hibernate实现,需要首先准备项目所需要的hibernate和annotation相应版本的jar包。

程序需要初始化文件状态表,即根据file_status表中的配置,一次性产生当日需要处理的文件的初始记录,这些记录会插入file_status表中,供随后的处理模块使用。

为了便于并行处理,我们将自动加载程序的整个流程划分为多个子项目:

1)扫描文件子项目,取名为scanfiles。

2)将下载文件和解压文件归为一个java项目,取名为downloadandunzip。

3)加载文件为另一个子项目,例如针对oracle,可以创建一个loadtooracle的项目,针对加载至hive表中的场景,可以创建一个loadtohive的项目,当然也可以放在一起用参数区分。

这些java子项目之间通过file_status配置表进行数据通信,图3-10展示了这些子项目的信息交互。

图3-10 项目之间的关联关系

3.4.1 scanfiles

scanfiles是常驻内存进程,包含初始化文件状态和扫描文件两个功能。初始化文件状态模块在每天凌晨根据数据文件信息表(file_settings),将当日需要处理的数据文件信息初始化至数据文件状态表(file_status)中,初始化字段包含文件全名、批次日期、文件日期、文件状态(初始化为file_not_exists)等,更多内容可参考前面对数据文件状态表的介绍。

扫描文件模块根据数据文件状态表中当日的初始化信息,在对应的目录中扫描数据文件,更新对应的状态信息。

代码清单3-3和代码清单3-4分别给出了初始化文件状态表的代码和扫描文件的代码。

代码清单 3-3

private static void generatethelist() {

if (filestatusdao.gettodaystatuscnt().longvalue() &gt; 0l) {

// 设置taskday为today

taskdaystr = datecalc.formateddate(dateformatstr, datecalc.today());

return;

}

filesettingdao filesettingdao = new filesettingdaoimpl();

list&lt;filesetting&gt; filesettings = filesettingdao.getallfilesettings();

set&lt;long&gt; fileidset = new hashset&lt;long&gt;();

for (filesetting filesetting : filesettings) {

if (fileidset.contains(filesetting.getid()))

continue;

else

fileidset.add(filesetting.getid());

// 文件名中的日期

string filedate = datecalc.formateddate(filesetting.getfiledateformat(),

                 datecalc.dateadd(datecalc.today(),

                 filesetting.getdatedif()));

// 当天需要下载的文件全名,不含路径

string fullname = filesetting.getfileprefix() + filedate +

    filesetting.getfilemid() + filesetting.getfilesuffix();

filestatus filestatus = new filestatus();

filestatus.setfilesetting(filesetting);

filestatus.setfullname(fullname);

filestatus.setstatus(status.file_not_exists); // 初始状态为file_not_exists

filestatus.setfilesize(0l);

filestatus.setfilesizecheckcnt(0);

filestatus.setfiledate(filedate);

filestatus.setbatchdate(datecalc.formateddate(dateformatstr, datecalc.today()));

filestatus.setpriority(filesetting.getpriority());

for (targetserver targetserver : filesetting.gettargetserverset()) {

filestatus.settargetserver(targetserver);

filesettingdao.save(filestatus);

try {

thread.sleep(300);// 暂停0.3秒

} catch (interruptedexception e) {

logger.error(e.tostring());

// 设置taskday为今天

generatethelist()方法通过filestatusdao.gettodaystatuscnt(). longvalue() &gt; 0l判断当天的文件状态是否已经初始化,如果已经初始化,则返回;否则通过filesettingdao遍历所有的file_settings记录,并通过filestatus实体对象将每条file_settings记录初始化为一个filestatus对象,最后通过filesettingdao.save(filestatus)将对应的初始化文件状态记录保存至file_status表中。

generatethelist()方法仅在每天凌晨成功调用一次即可,调用成功之后,file_status表中将会插入当日需要处理的所有数据文件信息,并且文件的初始状态为file_not_exists。

代码清单 3-4

public class scanfilethread extends thread {

private static logger logger = logger.getlogger(scanfilethread.class);

private static filestatusdao filestatusdao = new filestatusdaoimpl();

public void run() {

synchronized (scanfilethread.class) {

scanfile();

private void scanfile() {

while (true) {

list&lt;filestatus&gt; filestatuslist = filestatusdao.getfilebystatus(

new status[] { status.file_not_exists, status.checking_ftp_file });

for (filestatus filestatus : filestatuslist) {

filesetting filesetting = filestatus.getfilesetting();

string remotefilename = filestatus.getfullname(); // 文件全名

long filesizeold = filestatus.getfilesize();

integer filecheckcnt = filestatus.getfilesizecheckcnt();

ftpclient ftpclient = ftputil.getftpclient(filesetting);

if (ftpclient == null)

ftpfile[] ftpfiles = ftpclient.listfiles(remotefilename);

if (ftpfiles.length == 0) {

ftpclient.logout();

ftpclient.disconnect();

if (filestatus.getstatus().equals(

status.file_not_exists.name())) {

filestatus.setstatus(status.checking_ftp_file);

} else if (filestatus.getstatus().equals(status.

checking_ftp_file.name())) {

long filesizenew = ftpfiles[0].getsize();

if (filesizenew.longvalue() != filesizeold.

longvalue()) {

filestatus.setfilesize(filesizenew);

} else {

if (filecheckcnt &lt; 5)

filestatus.setfilesizecheckcnt(filecheckcnt + 1);

filestatus.setstatus(status.ftp_file_ok);

filestatusdao.update(filestatus);

} catch (ioexception e) {

logger.error("扫描ftp文件时出错:" + e.tostring());

} finally {

if (ftpclient.isconnected()) {

logger.error("断开ftp连接时出错:" + e.

tostring());

thread.sleep(15000); // sleep 15秒

logger.error("thread.sleep(15000)时出错:" + e.tostring());

  }

scanfilethread类使用了filestatusdao类用于数据库的读/写功能,该类的父类basedao提供了hibernate的sessionfactory以及事务上的支持。

scanfilethread类的主要方法scanfile()中用于读取文件类型为status.file_not_exists以及status.checking_ftp_file的全部记录,通过ftpclient.listfiles(remotefilename)查看指定文件是否存在于ftp服务器中,一旦发现文件存在,就将文件的状态由status.file_not_exists更新为status.checking_ftp_file。

随后程序通过多次(每次间隔15秒)比较文件大小来确定ftp服务器上的文件是否已经上传完成,当连续5次检测到的文件大小都相同时,则认为文件已经完整上传至ftp服务器,程序将该文件的状态由status.checking_ftp_file更新为status.ftp_file_ok,标明该文件可以开始下载了。

3.4.2 downloadandunzip

在downloadandunzip项目中,包含两个重要的java类:getfilethread类和unzipthread类,它们分别负责下载文件和解压文件。

getfilethread类的主要作用是采用多线程的方式下载文件,其核心方法是代码清单3-5中所示的ftpgetfile方法。

代码清单 3-5

private static void ftpgetfile(filestatus filestatus) {

ftpclient ftpclient = ftputil.getftpclient(filestatus.getfilesetting());

string remotefilename = filestatus.getfullname(); // 文件全名,带路径

if (filestatus.getstatus().equalsignorecase("ftp_file_ok")) {

filestatus.setstatus(status.download_start); // 更新下载状态

string localbasedir = globalconfig.get("localbasedir").getconfigcontent();

string tablename = filesetting.getloadconfig().gettablename();

string ftppath = filesetting.getftppath();

date filedate = datecalc.dateadd(datecalc.today(), filesetting.getdatedif());

string filedatestr = datecalc.formateddate(filesetting.

getfiledateformat(), filedate);

string localdir = null; // 文件下载后,存放的本地目录

if (ftppath.equals(""))

localdir = localbasedir + "/" + tablename + "/" + filedatestr;

localdir = localbasedir + "/" + tablename + "/" + ftppath +

"/" + filedatestr;

// 创建指定路径

file ldir = new file(localdir);

if (!ldir.exists())

ldir.mkdirs();

fileoutputstream fos = null;

fos = new fileoutputstream(ldir.getabsolutepath() + "/" +

remotefilename);

} catch (filenotfoundexception e) {

logger.error("fileoutputstream:" + e.tostring());

ftpclient.setfiletype(ftpclient.binary_file_type);

ftpclient.setbuffersize(1024000);

string downloadfilepath = null;

if (ftpclient.retrievefile(remotefilename, fos)) {// 下载完成

downloadfilepath = localdir + "/" + filestatus.getfullname();

fos.close();

filestatus.setfilepath(downloadfilepath);

filestatus.setstatus(status.download_finished);

filestatus.setstatus(status.download_failed); // 更新下载状态

filestatus.setloginfo(e.tostring());

logger.error("下载文件出错:" + e.tostring());

if (ftpclient != null)

logger.error("ftpclient.disconnect()出错:" + e.

该方法用于检测状态为ftp_file_ok的数据文件,根据配置信息自动创建以文件对应表名、ftp远程路径和文件日期拼接的本地文件存放路径(localdir = localbasedir + "/" + tablename + "/" + ftppath + "/" + filedatestr;),然后将文件下载至上述创建的目录中。在下载过程中,同时更新文件的下载状态,并记录文件下载完成后的保存路径。

unzipthread类的作用是将文件状态为download_finished且需要解压缩的数据文件(通过文件后缀区分)进行解压,并记录解压后的文件名称和路径。代码清单3-6展示了unzipthread类的主要方法unzip()。

代码清单 3-6

public static void unzip(filestatus filestatus) {

if (!filestatus.getstatus().equals("download_finished")){

logger.error("file status not yet in download_finished!");

string filesuffix = filesetting.getfilesuffix().tolowercase();

filestatus.setstatus(status.unzip_start);

filestatus.setupdatetime(new timestamp(new date().gettime()));

if (filesuffix.equals(".zip"))

zip(filestatus);

else if (filesuffix.equals(".gz"))

gz(filestatus);

unzip()方法支持.zip和.gz两种压缩方式,如果压缩文件是其他压缩方式,比如.gzip,那么需要自行编写针对.gzip格式文件的解压代码。

unzip()方法调用成功后,会根据实际解压结果更新文件状态为unzip_failed或者unzip_finished,如果解压成功,则记录解压后的文件名称及文件路径。

3.4.3 loadtohive

loadtohive用于将load_config中db_type='hive'的数据文件导入对应的hive表中,并且根据load_type字段进行不同的加载处理。代码清单3-7展示了loadtohive的核心代码。

代码清单 3-7

private void loadtotable(targetserver targetserver,filestatus filestatus) {

filestatus.setstatus(status.load_start);

string loadtype = filestatus.getfilesetting().

getloadconfig().getloadtype();

string hdfsbasedir = targetserver.gethdfsbasedir();

string hdfsdir = null;

if (loadtype.equalsignorecase("added"))

hdfsdir = hdfsbasedir + "/" +

ilestatus.getfilesetting().getloadconfig().gettablename() + "/" +

filestatus.getfiledate();

else if (loadtype.equalsignorecase("covered"))

filestatus.getfilesetting().getloadconfig().gettablename();

if (!addtohive.createhdfsdir(filestatus, hdfsdir)) //创建hdfs目录

if (loadtype.equalsignorecase("added")) { // 增量,将新的partition加入hive表中

if (!addtohive.addpartitiontohive(filestatus, hdfsdir))

} else if (loadtype.equalsignorecase("covered")) { // 全量,将hive表中的历史数据删除

if (!addtohive.delolddataofhive(filestatus, hdfsdir))

addtohive.copydatatohdfs(filestatus, hdfsdir);// 将linux本地文件复制到hdfs目录

loadtohive首先创建hdfs目录,然后判断加载方式,如果加载方式是“added”(增量),则在hive表中增加一个partition(该partition指向之前创建的hdfs目录),再将数据文件复制到hdfs目录,完成hive表增量更新的过程;如果是“covered”(全量)加载方式,则先将历史数据删除,然后将新的数据文件复制到hdfs目录即可。

3.4.4 loadtooracle

loadtooracle是自动加载程序在oracle数据库上的实现。与hive数据仓库的不同之处在于,oracle数据库(以及其他关系型数据库)有其自身的dbms体系,因此其数据加载方式一般采用其自身提供的实用工具,loadtooracle调用oracle的sqlldr进行数据批量加载(sqlldr请参考第2章关于oracel:sqlldr的内容)。

loadtooracle与loadtohive的另一个主要不同点在于,数据文件在加载到目标数据库表中之前,会先创建一个中间表(在原表名后加上temp),数据文件先加载至temp表中,然后再分别根据增量或者全量的加载方式分别处理(参考1.2.2节中关于数据更新规则的内容)。

代码清单3-8给出了loadtooracle的核心代码。

代码清单 3-8

private void loadtotable(targetserver targetserver, filestatus filestatus) {

string loadtype = filestatus.getfilesetting().getloadconfig().getloadtype();

string tablename = filestatus.getfilesetting().getloadconfig().gettablename();

string createtemptableres = oracleutil.createtemptable(tablename);// 创建temp表

if (!createtemptableres.equals("")) {

filestatus.setstatus(status.load_failed);

filestatus.setloginfo(createtemptableres);

if (!loadintotemptable(filestatus)) { // 加载数据至temp表

if (loadtype.equalsignorecase("added")) { // 增量更新

// temp表中数据insert到原表,并将temp表drop

if (oracleutil.insertanddroptable(tablename))

filestatus.setstatus(status.load_finished);

} else if (loadtype.equalsignorecase("covered")) { // 全量更新

oracleutil.dropandrenametable(tablename); //drop原表并将temp表重命名为原表

当数据文件较大时,批量加载耗时较长,体现在代码上,loadintotemptable(filestatus)将花费大量时间等待sqlldr批量加载命令返回(可能达1个小时以上,取决于数据文件的大小以及oracle数据库服务器的硬件资源),这个过程中,file_status表中的文件状态一直不发生改变,这会让人有些担心。

一个改进的办法是,在等待sqlldr批量加载命令返回的过程中,通过count对应temp表中数据的条数来反馈加载进程,可以每隔5分钟count一次temp表,并将对应的行数记录下来,这样就可以直观感受到加载程序是否正在正常工作。

3.4.5 自动加载程序的部署架构

至此,我们已经完成了自动加载程序的设计和实现过程,现在看一下自动加载程序的部署框架。

将自动加载程序的主要组成项目scanfiles、downloadandunzip和加载(loadtooracle、loadtohive)导出为可执行jar包,得到scanfiles.jar、downloadandunzip.jar、loadtooracle. jar、loadtohive.jar。这些jar包各自专注于完成自己的功能,要完成自动加载程序的整个过程,需要这些可执行jar包之间相互协调工作,图3-11展示了这些jar包的部署架构。

图3-11 自动加载程序的部署架构图

如图3-11所示,将可执行jar文件复制到相应的服务器上,然后通过java-jar命令启动这些可执行jar文件即完成部署。例如在linux服务器上,通过执行命令“java-jar/home/queziyang/jars/scanfiles-1.0.jar &amp;”,即在后台启动scanfiles进程,通过jps命令可以查看对应的进程信息。

数据缓冲区上需要部署scanfiles.jar,所有后续进程均依赖于scanfiles.jar的产出。

每个目标服务器(或服务器集群)上都需要部署downloadandunzip.jar,它将对应的数据文件从ftp服务器上下载至目标服务器,并在目标服务器上完成文件解压。注意到,文件的解压缩并没有在ftp服务器上进行,而是下载至本地后解压,这样做是很有必要的,因为文件解压需要占用大量cpu和i/o资源,因此在ftp服务器上执行这项任务会严重影响数据缓冲区的正常功能。

3.4.6 程序的维护和优化

数据文件在目标服务器上解压后,自动加载程序并未将文件删除。为了节省服务器的存储空间,定期删除本地服务器上保留的数据文件是非常必要的。可以通过批处理的方式定期删除历史数据文件,也可以在loadtooracle或者loadtohive中加入删除数据文件的功能,即当数据文件成功加载后,将原始数据文件以及解压后的数据文件删除。

另一个优化点是可以将上述配置数据库的过程可视化,即在上述java项目的基础上包装成java web项目,这样第一部分

数据与平台篇

(data &amp; infrastructures)

迟序之数,非出神怪,有形可检,有数可推。

—祖冲之

数学是知识的工具,亦是其他知识工具的泉源。所有研究顺序和度量的科学均和数学有关。

—笛卡儿

对于大部分非计算机专业出身的分析人员和业务人员来说,数据库领域的专业术语简直让人抓狂,非要搞得那么高深吗?大可不必。

数据科学家是数据的应用者,以最大限度来提炼数据价值为目的,不必像数据仓库开发者那样对数据的存储、结构以及数据仓库的内生技术一清二楚,但应该站在找到数据、拼接数据、使用数据的角度,大体了解数据的分布、处理逻辑,以便为分析快速地准备素材。