天天看点

Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)

Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)
Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)

  一共12列,我们只需提取有用的列:第二列(犯罪类型)、第四列(一周的哪一天)、第五列(具体时间)和第七列(犯罪场所)。

        基于项目的需求,我们通过以下几步完成:

1、首先根据数据集,分别统计出不同犯罪类别在周时段内发生犯罪次数和不同区域在周时段内发生犯罪的次数。

2、然后根据第一步的输出结果,再按日期统计出每天每种犯罪类别在每个区域发生的犯罪次数。

3、将前两步的输出结果,按需求插入数据库,便于对犯罪数据的分析。

        我们要编写5个文件:

编写基类,MapReduceJobBase.java

数据处理类,DataFile.java

编写第一个任务类,SanFranciscoCrime.java

编写第二个任务类,SanFranciscoCrimePrepOlap.java

编写第三个任务,插入数据库类,LoadStarDB.java

Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)
Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)
Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)
Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)
Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)

    MySQL 数据库中首先需要创建4个表(下文的 LoadStarDB 通过 JDBC 写入的即是这些表),

分别为:category(name,cid)、

district(name,did)、

fact(fid,district_id,category_id,time_id,crimes)和

timeperiod(tpid,year,month,week,day)。

  1、使用 MyEclipse 将项目编译并打包为 crime.jar,再使用 SSH 将 crime.jar 上传至 hadoop 的 /home/hadoop/ 目录下。

  2、使用cd /home/hadoop/djt 切换到当前目录,通过命令行执行任务。

         2.1 首先执行第一个作业 SanFranciscoCrime.java。

     2.2    然后执行第二个作业SanFranciscoCrimePrepOlap.java。

       2.3      最后执行第三个作业LoadStarDB.java,将数据插入数据库。

        任务的最终结果插入数据库,数据结果如下图所示。字段分别为:区域主键district_id、类别主键category_id、时间主键time_id、犯罪次数crimes和主键fid。

Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)

代码

package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.text.DateFormat;

import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.Date;

import org.apache.hadoop.conf.Configured;

/**

* @function 在 MapReduce 基类中,定义基础成员变量,减少 MapReduce 主类的工作量

*/

public class MapReduceJobBase extends Configured

{

* 犯罪类型在犯罪数据数组的下标为1的位置

protected static final int CATEGORY_COLUMN_INDEX = 1;

* 礼拜几在犯罪数据数组的下标为3的位置

protected static final int DAY_OF_WEEK_COLUMN_INDEX = 3;

* 日期在犯罪数据数组的下标为4的位置

protected static final int DATE_COLUMN_INDEX = 4;

* 犯罪区域在犯罪数据数组的下标为6的位置

protected static final int DISTRICT_COLUMN_INDEX = 6;

* 定义日期的数据格式

protected static final DateFormat df = new SimpleDateFormat("MM/dd/yyyy");

* 定义 map/reduce job结果中,日期的输出格式

protected static final DateFormat outputDateFormat = new SimpleDateFormat("yyyy/MM/dd");

* @function 将字符串格式的日期转换为自定义Date类型的日期

* @param value 包含完整的日期字符串

* @return Date类型的日期

* @throws ParseException

protected static Date getDate(String value) throws ParseException 

Date retVal = null;

String[] dp = value.split(" ");

if (dp.length > 0) {

retVal = df.parse(dp[0]);

}

return retVal;

import java.io.BufferedReader;

import java.io.ByteArrayInputStream;

import java.io.IOException;

import java.io.InputStreamReader;

import java.util.ArrayList;

import java.util.Collections;

import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import com.opencsv.CSVReader;

* @function 从 map/reduce的输出结果中读取并提取数据

public abstract class DataFile 

* @function 从 map/reduce job 的输出结果,提取key值集合

* @param fn HDFS上的文件路径

* @return list key值的集合

* @throws IOException

public static List<String> extractKeys(String fn,FileSystem fs) throws IOException 

FSDataInputStream in = fs.open(new Path(fn));//打开文件

List<String> retVal = new ArrayList<String>();//新建存储key值的集合list

BufferedReader br = new BufferedReader(new InputStreamReader(in));

String line = br.readLine();//按行读取数据

while (line != null) 

String[] lp = line.split("\t");

if (lp.length > 0) 

retVal.add(lp[0]);//提取每行的第一个字段key

line = br.readLine();

br.close();

Collections.sort(retVal);//对key值进行排序

* @function 将 csv文件格式的每行内容转换为数组返回

* @param 读取的一行数据

* @return array 数组

public static String[] getColumns(String line) throws IOException

CSVReader reader = new CSVReader(new InputStreamReader(new ByteArrayInputStream(line.getBytes())));

String[] retVal = reader.readNext();

reader.close();

import java.text.MessageFormat;

import java.util.Calendar;

import java.util.HashMap;

import java.util.logging.Level;

import java.util.logging.Logger;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;

import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

* 时段系统(bucketed system),在物料需求计划(MRP)、配销资源规划(DRP)或其他时程化(time-phased)的系统里,

* 所有时程化的资料都累积在同一时期,或称时段(buchet)。如果累积的时间是以周为时间单位,此系统就称为周时段(weekly buckets)。

* 周时段(weekly buckets)即是一种以周为单位的统计方式

* @function 统计每个事件在每个周时段内发生的次数

*

public class SanFranciscoCrime extends MapReduceJobBase implements Tool 

private static Logger log = Logger

.getLogger(SanFranciscoCrime.class.getCanonicalName());

* CrimeMapper是一个公共的父类

public static class CrimeMapper extends Mapper<LongWritable, Text, Text, Text>

protected int keyID = 0;

protected int valueID = 0;

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 

String line = value.toString();

try {

String[] col = DataFile.getColumns(line);

if (col != null)

// 防止数组超界

if (col.length >= (DISTRICT_COLUMN_INDEX + 1))

//过滤文件第一行头部名称

if (!"date".equalsIgnoreCase(col[valueID])) 

Text tk = new Text();

tk.set(col[keyID]);

Text tv = new Text();

tv.set(col[valueID]);

context.write(tk, tv);

} else 

log.warning(MessageFormat.format(

"Data {0} did not parse into columns.",

new Object[] { line }));

} catch (NumberFormatException nfe)

log.log(Level.WARNING, MessageFormat

.format("Expected {0} to be a number.\n",

new Object[] { line }), nfe);

} catch (IOException e) {

log.log(Level.WARNING, MessageFormat.format(

"Cannot parse {0} into columns.\n",

new Object[] { line }), e);

* 输出key为犯罪类别,value为日期

public static class CategoryMapByDate extends CrimeMapper 

public CategoryMapByDate() 

keyID = CATEGORY_COLUMN_INDEX;//key为犯罪类别

valueID = DATE_COLUMN_INDEX;//value为日期

* 输出key为犯罪区域,value为日期

public static class DistrictMapByDate extends CrimeMapper

public DistrictMapByDate()

keyID = DISTRICT_COLUMN_INDEX;//key为犯罪区域

* 统计并解析 Mapper 端的输出结果

public static class CrimeReducerByWeek extends Reducer<Text, Text, Text, Text>

public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException

List<String> incidents = new ArrayList<String>();

// 将values放入incidents列表中

for (Text value : values)

incidents.add(value.toString());

if (incidents.size() > 0)

//对incidents列表排序

Collections.sort(incidents);

java.util.Map<Integer, Integer> weekSummary = new HashMap<Integer, Integer>();

//因为是对1-3月数据分析,周时段(weekly buckets)最大为15,所以weekSummary长度为15即可

for (int i = 0; i < 16; i++)

weekSummary.put(i, 0);

//统计每个周时段(weekly buckets)内,该事件发生的次数

for (String incidentDay : incidents) 

try 

Date d = getDate(incidentDay);

Calendar cal = Calendar.getInstance();

cal.setTime(d);

int week = cal.get(Calendar.WEEK_OF_MONTH);//这个月的第几周

int month = cal.get(Calendar.MONTH);//第几个月,从0开始

//如果累积的时间是以周为时间单位,此系统就称为周时段(weekly buckets)。

//周时段的计算公式,最大为15,它只是一种统计方式,不必深究

int bucket = (month * 5) + week;

//统计每个周时段内,该事件发生的次数

if (weekSummary.containsKey(bucket)) 

weekSummary.put(bucket, new Integer(weekSummary

.get(bucket).intValue() + 1));

} else

weekSummary.put(bucket, new Integer(1));

} catch (ParseException pe)

log.warning(MessageFormat.format("Invalid date {0}",

new Object[] { incidentDay }));

// 将该事件在每个周时段内发生的次数生成字符串输出

StringBuffer rpt = new StringBuffer();

boolean first = true;

for (int week : weekSummary.keySet())

if (first) 

first = false;

rpt.append(",");

rpt.append(new Integer(weekSummary.get(week)).toString());

String list = rpt.toString();

tv.set(list);

//value为0-15周时段内,该事件发生的次数

context.write(key, tv);

@Override

public int run(String[] args) throws Exception 

Configuration conf1 = new Configuration();

Path out1 = new Path(args[1]);

FileSystem hdfs1 = out1.getFileSystem(conf1);

if (hdfs1.isDirectory(out1)) 

hdfs1.delete(out1, true);

// 任务1

Job job1 = new Job(conf1, "crime");

job1.setJarByClass(SanFranciscoCrime.class);

job1.setMapperClass(CategoryMapByDate.class);

job1.setReducerClass(CrimeReducerByWeek.class);

job1.setOutputKeyClass(Text.class);

job1.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job1, new Path(args[0]));

FileOutputFormat.setOutputPath(job1, new Path(args[1]));

// 任务2

Configuration conf2 = new Configuration();

Path out2 = new Path(args[2]);

FileSystem hdfs2 = out2.getFileSystem(conf2);

if (hdfs2.isDirectory(out2))

hdfs2.delete(out2, true);

Job job2 = new Job(conf2, "crime");

job2.setJarByClass(SanFranciscoCrime.class);

job2.setMapperClass(DistrictMapByDate.class);

job2.setReducerClass(CrimeReducerByWeek.class);

job2.setOutputKeyClass(Text.class);

job2.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job2, new Path(args[0]));

FileOutputFormat.setOutputPath(job2, new Path(args[2]));

// 构造一个 cJob1

ControlledJob cJob1 = new ControlledJob(conf1);

//设置 MapReduce job1

cJob1.setJob(job1);

// 构造一个 cJob2

ControlledJob cJob2 = new ControlledJob(conf2);

//设置 MapReduce job2

cJob2.setJob(job2);

//cJob2.addDependingJob(cJob1);// cjob2依赖cjob1

// 定义job管理对象

JobControl jobControl = new JobControl("12");

//把两个构造的job加入到JobControl中

jobControl.addJob(cJob1);

jobControl.addJob(cJob2);

//启动线程运行任务

Thread t = new Thread(jobControl);

t.start();

while (true)

if (jobControl.allFinished())

jobControl.stop();

break;

return 0;

public static void main(String[] args) throws Exception 

String[] args0 = 

"hdfs://HadoopMaster:9000/middle/crime/crime.csv",

"hdfs://HadoopMaster:9000/middle/test/out1/",

"hdfs://HadoopMaster:9000/middle/test/out2/" };

int ec = ToolRunner.run(new Configuration(), new SanFranciscoCrime(), args0);

System.exit(ec);

import java.net.URI;

* @function 统计每天每种犯罪类型在每个区域发生的次数

public class SanFranciscoCrimePrepOlap extends MapReduceJobBase implements Tool 

private static Logger log = Logger.getLogger(SanFranciscoCrimePrepOlap.class.getCanonicalName());

private static List<String> categories = null;

private static List<String> districts = null;

private static final java.util.Map<String, Integer> categoryLookup = new HashMap<String, Integer>();

private static final java.util.Map<String, Integer> districtLookup = new HashMap<String, Integer>();

public static abstract class Map extends Mapper<LongWritable, Text, Text, Text> 

protected int value2ID = 0;

* @function 将key值转换为规范的数据格式

* @param value 包含不规范的 key值

* @return 返回规范的key值

protected abstract String formatKey(String value) throws ParseException;

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException

String[] col = DataFile.getColumns(line);//将读取的每行数据转换为数组

if (col != null) 

tk.set(formatKey(col[keyID]));//将日期作为key值

StringBuffer sv = new StringBuffer();

sv.append("\"");

sv.append(col[valueID]);//犯罪区域

sv.append(",");

sv.append(col[value2ID]);//犯罪类型

tv.set(sv.toString());

log.warning(MessageFormat.format("Data {0} did not parse into columns.", new Object[]{line}));

} catch (NumberFormatException nfe) 

log.log(Level.WARNING, MessageFormat.format("Expected {0} to be a number.\n", new Object[]{line}), nfe);

} catch (IOException e) 

log.log(Level.WARNING, MessageFormat.format("Cannot parse {0} into columns.\n", new Object[]{line}), e);

} catch (ParseException e)

log.log(Level.WARNING, MessageFormat.format("Expected {0} to be a date but it was not.\n", new Object[]{line}), e);

* @function 将 map 输入数据的日期作为key,犯罪区域和犯罪类型作为value,然后输出

public static class DateMapByCategoryAndDistrict extends Map 

public DateMapByCategoryAndDistrict() 

keyID = DATE_COLUMN_INDEX;//代表日期下标

valueID = DISTRICT_COLUMN_INDEX;//代表犯罪区域下标

value2ID = CATEGORY_COLUMN_INDEX;//代表犯罪类型下标

protected String formatKey(String value) throws ParseException 

return outputDateFormat.format(getDate(value));

public static class Reduce extends Reducer<Text, Text, Text, Text> 

// 分配和初始化犯罪类型所在区域的二维数组

int[][] crimes = new int[categories.size()][districts.size()];

for (int i = 0; i < categories.size(); i++) 

for (int j = 0; j < districts.size(); j++) 

crimes[i][j] = 0;

//统计犯罪类型/区域二维数组的值(即每种犯罪类型在每个区域发生的次数)

for (Text crime:values)

String[] cols = DataFile.getColumns(crime.toString());

if (cols.length == 2)

if (categoryLookup.containsKey(cols[1]))

if (districtLookup.containsKey(cols[0]))

int cat = categoryLookup.get(cols[1]);

int dist = districtLookup.get(cols[0]);

crimes[cat][dist]++;

log.warning(MessageFormat.format("District {0} not found.", new Object[]{cols[0]}));

log.warning(MessageFormat.format("Category {0} not found.", new Object[]{cols[1]}));

log.warning(MessageFormat.format("Input {0} was in unexpected format", new Object[]{crime}));

//将非0二维数组的犯罪类别下标,犯罪区域下标,犯罪次数作为value输出

for (int j = 0; j < districts.size(); j++)

if (crimes[i][j] > 0) 

sv.append(new Integer(i).toString());//犯罪类别下标

sv.append(new Integer(j).toString());//犯罪区域下标

sv.append(new Integer(crimes[i][j]));//犯罪次数

* @function 加载已经生成的 犯罪类别数据和犯罪区域数据,并将这些数据排序后存入Map

* @param categoryReport SanFranciscoCrime job任务输出犯罪类别的文件路径

* @param districtReport SanFranciscoCrime job任务输出犯罪区域的文件路径

private static void setup(String categoryReport, String districtReport,FileSystem fs) throws IOException 

categories = DataFile.extractKeys(categoryReport,fs);

districts = DataFile.extractKeys(districtReport,fs);

int i = 0;

for (String category : categories) 

categoryLookup.put(category, i++);

i = 0;

for (String district : districts) 

districtLookup.put(district, i++);

public int run(String[] arg0) throws Exception 

// TODO Auto-generated method stub

Configuration conf = new Configuration();

Path out = new Path(arg0[3]);

FileSystem hdfs = out.getFileSystem(conf);

if (hdfs.isDirectory(out)) 

hdfs.delete(out, true);

Job job = new Job(conf, "SanFranciscoCrimePrepOlap");

job.setJarByClass(SanFranciscoCrimePrepOlap.class);

job.setMapperClass(DateMapByCategoryAndDistrict.class);//Mapper

job.setReducerClass(Reduce.class);//Reducer

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, new Path(arg0[0]));

FileOutputFormat.setOutputPath(job, new Path(arg0[3]));

job.waitForCompletion(true);//提交任务

public static void main(String[] args) throws Exception

String[] args0 = {

"hdfs://HadoopMaster:9000/middle/test/out1/part-r-00000",

"hdfs://HadoopMaster:9000/middle/test/out2/part-r-00000",

"hdfs://HadoopMaster:9000/middle/test/out3/"};

if (args0.length == 4) 

FileSystem fs = FileSystem.get(URI.create("hdfs://HadoopMaster:9000"), conf);

//调用setup

setup(args0[1], args0[2],fs);

//执行MapReduce任务

int ec = ToolRunner.run(conf, new SanFranciscoCrimePrepOlap(), args0);

System.err.println("\nusage: bin/hadoop jar sfcrime.hadoop.mapreduce.jobs-0.0.1-SNAPSHOT.jar SanFranciscoCrimePrepOlap path/to/category/report path/to/district/report path/to/input/data path/to/output/data");

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.SQLException;

import java.sql.Statement;

import java.util.Map;

/***

* @function 从 MapReduce 任务中,提取数据,插入到mysql数据库

public class LoadStarDB

private Connection db = null;//mysql数据库连接

private Map<String, Integer> lastPrimaryKey = new HashMap<String, Integer>();

private List<String> categories = null;//犯罪类别list

private List<String> districts = null;//犯罪区域list

//映射date主键的关系

private final java.util.Map<Date, Integer> timeperiodLookup = new HashMap<Date, Integer>();

private final DateFormat df = new SimpleDateFormat("MM/dd/yyyy");//插入数据库的日期格式

private final DateFormat kdf = new SimpleDateFormat("yyyy/MM/dd");//从map/reduce任务输出文件中,解析出此日期

* @function 向数据库表中插入一条记录

* @param table 表名称

* @param row 包含插入字段的数据

* @return 返回此记录的主键id

* @throws SQLException

private int insert(String table, DataRecord row) throws SQLException 

int retVal = 0;

Statement s = db.createStatement();

StringBuffer sql = new StringBuffer();

sql.append("insert into ");

sql.append(table);

sql.append(" ");

sql.append(row.toString());

s.execute(sql.toString());

if (lastPrimaryKey.containsKey(table)) 

retVal = lastPrimaryKey.get(table) + 1;

lastPrimaryKey.put(table, retVal);

lastPrimaryKey.put(table, 1);

retVal = 1;

* @function 向数据库中插入一条犯罪类别记录

* @param category name字段对应的值

private int insertCategory(String category) throws SQLException

DataRecord dr = new DataRecord();

dr.put("name", category);

return insert("category", dr);

* @function 向数据库中插入一条犯罪区域记录

* @param district name字段对应的值 

private int insertDistrict(String district) throws SQLException 

dr.put("name", district);

return insert("district", dr);

* @function 将日期date拆分为字段 year, month, week, 和 day

* @param dr 包含date被拆分的字段

* @param d 需要拆分的date日期

private void setTimePeriod(DataRecord dr, Date d) 

dr.put("year", cal.get(Calendar.YEAR));

dr.put("month", cal.get(Calendar.MONTH));

dr.put("week", cal.get(Calendar.WEEK_OF_MONTH));

dr.put("day", cal.get(Calendar.DAY_OF_MONTH));

* @function 如果日期date已经存在表中,返回主键id,如果不存在,则插入数据库并返回主键id

* @param d 日期date

* @return 返回此日期对应的主键id

private int insertTimePeriod(Date d) throws SQLException

if (timeperiodLookup.containsKey(d)) 

retVal = timeperiodLookup.get(d);

setTimePeriod(dr, d);

retVal = insert("timeperiod", dr);

timeperiodLookup.put(d, retVal);

* @function 将数据记录插入fact表中

* @param districtId 犯罪区域外键id

* @param categoryId 犯罪类别外键id

* @param timeId 日期外键id

* @param crimes 在某一日期 某一区域 发生某一犯罪类别的总犯罪次数

* committed in this district of this category at his time* 

private void insertFact(int districtId, int categoryId, int timeId, int crimes) throws SQLException 

dr.put("district_id", districtId);

dr.put("category_id", categoryId);

dr.put("time_id", timeId);

dr.put("crimes", crimes);

insert("fact", dr);

* @function 从SanFrancisco Crime map/reduce job输出结果中,读取数据

* @param categoryReport 犯罪类别文件路径

* @param districtReport 犯罪区域文件路径

* @throws IOException* 

private void setup(String categoryReport, String districtReport,FileSystem fs) throws IOException, SQLException 

for (String category : categories)

insertCategory(category);

for (String district : districts)

insertDistrict(district);

* @function 清空name表中的所有记录

* @param name 表名称

private void truncate(String name) throws SQLException

s.execute("truncate table ".concat(name));

s.close();

* @function 调用truncate()方法,清空表记录

private void reset() throws SQLException 

truncate("fact");

truncate("category");

truncate("district");

truncate("timeperiod");

* @function 解析加载的数据

* @param dbhost 数据库地址

* @param dbname 数据库名称

* @param dbuser 用户名

* @param dbpassword 密码

* @throws ClassNotFoundException* 

* @throws SQLException* 

private LoadStarDB(String categoryReport, String districtReport,

String dbhost, String dbname, String dbuser, String dbpassword,FileSystem fs)

throws ClassNotFoundException, SQLException, IOException

Class.forName("com.mysql.jdbc.Driver");

String cs = MessageFormat

.format("jdbc:mysql://192.168.80.128:3306/test?user=root&password=root&autoReconnect=true",

new Object[] { dbhost, dbname, dbuser, dbpassword });

db = DriverManager.getConnection(cs);

reset();

setup(categoryReport, districtReport,fs);

* @function 处理 SanFranciscoCrimPrepOlap map/reduce job任务输出结果,填充 timeperiod表和fact表 

* @param dataFile 文件路径

private void processData(String dataFile,FileSystem fs) throws IOException,ParseException 

FSDataInputStream in = fs.open(new Path(dataFile));//打开数据流

BufferedReader br = new BufferedReader(new InputStreamReader(in));//读取数据

String line = br.readLine();

if (lp.length > 0)

Date d = kdf.parse(lp[0]);//日期

String[] data = DataFile.getColumns(lp[1]);

if (data.length == 3) 

try

int categoryId = Integer.parseInt(data[0]) + 1;//犯罪类别id

int districtId = Integer.parseInt(data[1]) + 1;//犯罪区域id

int crimes = Integer.parseInt(data[2]);//犯罪次数

int timeId = insertTimePeriod(d);//时间id

insertFact(districtId, categoryId, timeId, crimes);//插入fact表

System.err.println("invalid data: " + line);

} catch (SQLException e)

e.printStackTrace();

/*** 

* @function 运行job任务

* @param args 

* @throws IOException 

* */

public static void main(String[] args) throws IOException

"hdfs://HadoopMaster:9000/middle/crime/out1/part-r-00000",

"hdfs://HadoopMaster:9000/middle/crime/out2/part-r-00000",

"hdfs://HadoopMaster:9000/middle/crime/out3/part-r-00000",

"192.168.80.128:3306",

"test",

"root",

"root"};

if (args0.length == 7) 

LoadStarDB m = new LoadStarDB(args0[0], args0[1], args0[3],args0[4], args0[5], args0[6],fs);

m.processData(args0[2],fs);

} catch (ClassNotFoundException e)

} else {

System.err

.println("\nusage: java -jar sfcrime.hadoop.mapreduce.jobs-0.0.1-SNAPSHOT.jar com.dynamicalsoftware.olap.etl.LoadStarDB path/to/category/report path/to/district/report path/to/star/data dbhost dbname dbuser dbpassword\n");

* 生成一条数据记录

class DataRecord extends HashMap<String, Object> 

public String toString()

StringBuffer retVal = new StringBuffer();

// 生成表的数据字段

retVal.append("(");

for (String key : keySet()) 

retVal.append(",");

retVal.append(key);

//生成表字段对应的值

retVal.append(") values (");

first = true;

Object o = get(key);

if (first)

if (o instanceof Long) 

retVal.append(((Long) o).toString());

} else if (o instanceof Integer)

retVal.append(((Integer) o).toString());

} else if (o instanceof Date)

Date d = (Date) o;

retVal.append("'");

retVal.append(df.format(d));

} else if (o instanceof String) 

retVal.append(o.toString());

retVal.append(")");

//返回一条sql格式的数据记录

return retVal.toString();

本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6166276.html,如需转载请自行联系原作者