
Hadoop MapReduce Programming API Primer Series: Crime Data Analysis (Part 25) (unfinished)


  The data set has 12 columns in total, but we only need to extract the useful ones: column 2 (the crime category), column 4 (the day of the week), column 5 (the date), and column 7 (the district where the crime occurred).

        Based on the project requirements, the work is done in the following steps:

1. First, from the raw data set, count how many crimes of each category occur in each weekly bucket, and how many crimes occur in each district in each weekly bucket.

2. Then, starting from the output of step 1, count for every date how many crimes of each category occurred in each district.

3. Finally, insert the output of the first two steps into the database as required, so the crime data can be analyzed.

        We need to write five files:

The base class: MapReduceJobBase.java

The data-handling helper class: DataFile.java

The first job class: SanFranciscoCrime.java

The second job class: SanFranciscoCrimePrepOlap.java

The third job, which loads the database: LoadStarDB.java


    On the database side (MySQL, which LoadStarDB.java writes into), four tables need to be created first (a creation sketch follows this list):

category(name, cid),

district(name, did),

fact(fid, district_id, category_id, time_id, crimes), and

timeperiod(tpid, year, month, week, day).
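To make the schema concrete, here is a minimal, hypothetical JDBC sketch that creates those four tables. Only the table and column names come from this article; the column types, the CreateStarSchema class name, and the auto-increment keys are assumptions, and the connection URL and credentials simply mirror the ones hard-coded later in LoadStarDB.java.

// Hypothetical schema-creation sketch (not one of the five files above).
// Column types are assumptions; only table and column names come from the article.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateStarSchema {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection db = DriverManager.getConnection(
                "jdbc:mysql://192.168.80.128:3306/test?user=root&password=root");
        Statement s = db.createStatement();
        s.execute("create table if not exists category (cid int primary key auto_increment, name varchar(64))");
        s.execute("create table if not exists district (did int primary key auto_increment, name varchar(64))");
        s.execute("create table if not exists timeperiod (tpid int primary key auto_increment, year int, month int, week int, day int)");
        s.execute("create table if not exists fact (fid int primary key auto_increment, district_id int, category_id int, time_id int, crimes int)");
        s.close();
        db.close();
    }
}

Making cid, did, tpid and fid auto-increment keeps the sketch consistent with the way LoadStarDB.java tracks primary keys after each insert.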

  1. Use MyEclipse to compile and package the project as crime.jar, then upload crime.jar to the /home/hadoop/ directory of the Hadoop machine over SSH.

  2. Run cd /home/hadoop/djt to change into that directory, then launch the jobs from the command line.

         2.1 First run the first job, SanFranciscoCrime.java.

         2.2 Then run the second job, SanFranciscoCrimePrepOlap.java.

         2.3 Finally run the third job, LoadStarDB.java, which inserts the data into the database.

        The final output of the jobs is loaded into the database. The columns of the resulting fact rows are the district key district_id, the category key category_id, the time key time_id, the crime count crimes, and the primary key fid.
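With the star schema loaded, the analysis itself is plain SQL. The following is a small, hypothetical example (not one of the five files above) that totals crimes per district and category by joining the fact table with its dimension tables; the CrimeReport class name is made up, and the connection details again mirror the ones hard-coded in LoadStarDB.java.

// Hypothetical analysis query over the star schema (illustration only).
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CrimeReport {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection db = DriverManager.getConnection(
                "jdbc:mysql://192.168.80.128:3306/test?user=root&password=root");
        Statement s = db.createStatement();
        // total crimes per district and category, using the keys named above
        ResultSet rs = s.executeQuery(
                "select d.name as district, c.name as category, sum(f.crimes) as total "
              + "from fact f "
              + "join district d on d.did = f.district_id "
              + "join category c on c.cid = f.category_id "
              + "group by d.name, c.name "
              + "order by total desc");
        while (rs.next()) {
            System.out.println(rs.getString("district") + "\t"
                    + rs.getString("category") + "\t" + rs.getLong("total"));
        }
        rs.close();
        s.close();
        db.close();
    }
}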


Code

// MapReduceJobBase.java
package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configured;

/**
 * @function Base class for the MapReduce jobs: holds shared constants and helpers so the job classes stay small
 */
public class MapReduceJobBase extends Configured {

    /** The crime category sits at index 1 of the parsed crime record */
    protected static final int CATEGORY_COLUMN_INDEX = 1;

    /** The day of the week sits at index 3 of the parsed crime record */
    protected static final int DAY_OF_WEEK_COLUMN_INDEX = 3;

    /** The date sits at index 4 of the parsed crime record */
    protected static final int DATE_COLUMN_INDEX = 4;

    /** The crime district sits at index 6 of the parsed crime record */
    protected static final int DISTRICT_COLUMN_INDEX = 6;

    /** Date format of the input data */
    protected static final DateFormat df = new SimpleDateFormat("MM/dd/yyyy");

    /** Date format used in the map/reduce job output */
    protected static final DateFormat outputDateFormat = new SimpleDateFormat("yyyy/MM/dd");

    /**
     * @function Convert a date string into a java.util.Date
     * @param value the full date string
     * @return the parsed Date
     * @throws ParseException
     */
    protected static Date getDate(String value) throws ParseException {
        Date retVal = null;
        String[] dp = value.split(" ");
        if (dp.length > 0) {
            retVal = df.parse(dp[0]);
        }
        return retVal;
    }
}

// DataFile.java
package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.opencsv.CSVReader;

/**
 * @function Read and extract data from the map/reduce output
 */
public abstract class DataFile {

    /**
     * @function Extract the set of key values from a map/reduce job's output
     * @param fn path of the file on HDFS
     * @param fs the HDFS file system
     * @return sorted list of key values
     * @throws IOException
     */
    public static List<String> extractKeys(String fn, FileSystem fs) throws IOException {
        FSDataInputStream in = fs.open(new Path(fn)); // open the file
        List<String> retVal = new ArrayList<String>(); // list that collects the key values
        BufferedReader br = new BufferedReader(new InputStreamReader(in));
        String line = br.readLine(); // read line by line
        while (line != null) {
            String[] lp = line.split("\t");
            if (lp.length > 0) {
                retVal.add(lp[0]); // the first field of each line is the key
            }
            line = br.readLine();
        }
        br.close();
        Collections.sort(retVal); // sort the keys
        return retVal;
    }

    /**
     * @function Turn one line of a CSV file into an array of columns
     * @param line one line of data
     * @return array of column values
     */
    public static String[] getColumns(String line) throws IOException {
        CSVReader reader = new CSVReader(new InputStreamReader(new ByteArrayInputStream(line.getBytes())));
        String[] retVal = reader.readNext();
        reader.close();
        return retVal;
    }
}

// SanFranciscoCrime.java
package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.io.IOException;
import java.text.MessageFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * In a bucketed system (material requirements planning, distribution resource planning,
 * or other time-phased systems), all time-phased data is accumulated into periods, called buckets.
 * When the accumulation unit is one week, the buckets are called weekly buckets.
 * A weekly bucket is therefore simply a per-week way of counting.
 *
 * @function Count how many times each event occurs in each weekly bucket
 */

public class SanFranciscoCrime extends MapReduceJobBase implements Tool {

    private static Logger log = Logger
            .getLogger(SanFranciscoCrime.class.getCanonicalName());

    /**
     * CrimeMapper is the shared parent of the two mappers below.
     */
    public static class CrimeMapper extends Mapper<LongWritable, Text, Text, Text> {

        protected int keyID = 0;
        protected int valueID = 0;

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            try {
                String[] col = DataFile.getColumns(line);
                if (col != null) {
                    // guard against rows that are too short
                    if (col.length >= (DISTRICT_COLUMN_INDEX + 1)) {
                        // skip the header row of the file
                        if (!"date".equalsIgnoreCase(col[valueID])) {
                            Text tk = new Text();
                            tk.set(col[keyID]);
                            Text tv = new Text();
                            tv.set(col[valueID]);
                            context.write(tk, tv);
                        }
                    }
                } else {
                    log.warning(MessageFormat.format(
                            "Data {0} did not parse into columns.",
                            new Object[] { line }));
                }
            } catch (NumberFormatException nfe) {
                log.log(Level.WARNING, MessageFormat
                        .format("Expected {0} to be a number.\n",
                                new Object[] { line }), nfe);
            } catch (IOException e) {
                log.log(Level.WARNING, MessageFormat.format(
                        "Cannot parse {0} into columns.\n",
                        new Object[] { line }), e);
            }
        }
    }

    /**
     * Emits the crime category as the key and the date as the value.
     */
    public static class CategoryMapByDate extends CrimeMapper {
        public CategoryMapByDate() {
            keyID = CATEGORY_COLUMN_INDEX; // key is the crime category
            valueID = DATE_COLUMN_INDEX;   // value is the date
        }
    }

    /**
     * Emits the crime district as the key and the date as the value.
     */
    public static class DistrictMapByDate extends CrimeMapper {
        public DistrictMapByDate() {
            keyID = DISTRICT_COLUMN_INDEX; // key is the crime district
            valueID = DATE_COLUMN_INDEX;   // value is the date
        }
    }

    /**
     * Aggregates and parses the mapper output into weekly buckets.
     */
    public static class CrimeReducerByWeek extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            List<String> incidents = new ArrayList<String>();
            // copy the values into the incidents list
            for (Text value : values) {
                incidents.add(value.toString());
            }
            if (incidents.size() > 0) {
                // sort the incidents list
                Collections.sort(incidents);
                java.util.Map<Integer, Integer> weekSummary = new HashMap<Integer, Integer>();
                // the data covers January to March, so at most 16 weekly buckets (0-15) are needed
                for (int i = 0; i < 16; i++) {
                    weekSummary.put(i, 0);
                }
                // count how many incidents fall into each weekly bucket
                for (String incidentDay : incidents) {
                    try {
                        Date d = getDate(incidentDay);
                        Calendar cal = Calendar.getInstance();
                        cal.setTime(d);
                        int week = cal.get(Calendar.WEEK_OF_MONTH); // week within the month
                        int month = cal.get(Calendar.MONTH);        // month, starting at 0
                        // weekly-bucket formula, at most 15; e.g. March (month 2), week 3 -> bucket 13
                        int bucket = (month * 5) + week;
                        // increment this bucket's count
                        if (weekSummary.containsKey(bucket)) {
                            weekSummary.put(bucket, new Integer(weekSummary
                                    .get(bucket).intValue() + 1));
                        } else {
                            weekSummary.put(bucket, new Integer(1));
                        }
                    } catch (ParseException pe) {
                        log.warning(MessageFormat.format("Invalid date {0}",
                                new Object[] { incidentDay }));
                    }
                }
                // build a comma-separated string of the per-bucket counts
                StringBuffer rpt = new StringBuffer();
                boolean first = true;
                for (int week : weekSummary.keySet()) {
                    if (first) {
                        first = false;
                    } else {
                        rpt.append(",");
                    }
                    rpt.append(new Integer(weekSummary.get(week)).toString());
                }
                String list = rpt.toString();
                Text tv = new Text();
                tv.set(list);
                // the value is the number of incidents in weekly buckets 0-15
                context.write(key, tv);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf1 = new Configuration();
        Path out1 = new Path(args[1]);
        FileSystem hdfs1 = out1.getFileSystem(conf1);
        if (hdfs1.isDirectory(out1)) {
            hdfs1.delete(out1, true);
        }
        // job 1: crime category per weekly bucket
        Job job1 = new Job(conf1, "crime");
        job1.setJarByClass(SanFranciscoCrime.class);
        job1.setMapperClass(CategoryMapByDate.class);
        job1.setReducerClass(CrimeReducerByWeek.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        // job 2: crime district per weekly bucket
        Configuration conf2 = new Configuration();
        Path out2 = new Path(args[2]);
        FileSystem hdfs2 = out2.getFileSystem(conf2);
        if (hdfs2.isDirectory(out2)) {
            hdfs2.delete(out2, true);
        }
        Job job2 = new Job(conf2, "crime");
        job2.setJarByClass(SanFranciscoCrime.class);
        job2.setMapperClass(DistrictMapByDate.class);
        job2.setReducerClass(CrimeReducerByWeek.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job2, new Path(args[0]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        // wrap job1 in a ControlledJob
        ControlledJob cJob1 = new ControlledJob(conf1);
        cJob1.setJob(job1);
        // wrap job2 in a ControlledJob
        ControlledJob cJob2 = new ControlledJob(conf2);
        cJob2.setJob(job2);
        //cJob2.addDependingJob(cJob1); // make cJob2 depend on cJob1
        // the job controller
        JobControl jobControl = new JobControl("12");
        // add both controlled jobs to the JobControl
        jobControl.addJob(cJob1);
        jobControl.addJob(cJob2);
        // run the jobs on a separate thread
        Thread t = new Thread(jobControl);
        t.start();
        while (true) {
            if (jobControl.allFinished()) {
                jobControl.stop();
                break;
            }
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = {
                "hdfs://HadoopMaster:9000/middle/crime/crime.csv",
                "hdfs://HadoopMaster:9000/middle/test/out1/",
                "hdfs://HadoopMaster:9000/middle/test/out2/" };
        int ec = ToolRunner.run(new Configuration(), new SanFranciscoCrime(), args0);
        System.exit(ec);
    }
}

// SanFranciscoCrimePrepOlap.java
package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.io.IOException;
import java.net.URI;
import java.text.MessageFormat;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @function Count, for each day, how many crimes of each category occurred in each district
 */
public class SanFranciscoCrimePrepOlap extends MapReduceJobBase implements Tool {

    private static Logger log = Logger.getLogger(SanFranciscoCrimePrepOlap.class.getCanonicalName());
    private static List<String> categories = null;
    private static List<String> districts = null;
    private static final java.util.Map<String, Integer> categoryLookup = new HashMap<String, Integer>();
    private static final java.util.Map<String, Integer> districtLookup = new HashMap<String, Integer>();

    public static abstract class Map extends Mapper<LongWritable, Text, Text, Text> {

        protected int keyID = 0;
        protected int valueID = 0;
        protected int value2ID = 0;

        /**
         * @function Normalize a raw key value
         * @param value the raw key value
         * @return the normalized key value
         */
        protected abstract String formatKey(String value) throws ParseException;

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            try {
                String[] col = DataFile.getColumns(line); // parse the line into columns
                if (col != null) {
                    Text tk = new Text();
                    tk.set(formatKey(col[keyID])); // the date becomes the key
                    StringBuffer sv = new StringBuffer();
                    sv.append("\"");
                    sv.append(col[valueID]);  // crime district
                    sv.append("\",\"");
                    sv.append(col[value2ID]); // crime category
                    sv.append("\"");
                    Text tv = new Text();
                    tv.set(sv.toString());
                    context.write(tk, tv);
                } else {
                    log.warning(MessageFormat.format("Data {0} did not parse into columns.", new Object[]{line}));
                }
            } catch (NumberFormatException nfe) {
                log.log(Level.WARNING, MessageFormat.format("Expected {0} to be a number.\n", new Object[]{line}), nfe);
            } catch (IOException e) {
                log.log(Level.WARNING, MessageFormat.format("Cannot parse {0} into columns.\n", new Object[]{line}), e);
            } catch (ParseException e) {
                log.log(Level.WARNING, MessageFormat.format("Expected {0} to be a date but it was not.\n", new Object[]{line}), e);
            }
        }
    }

    /**
     * @function Use the date of each input record as the key, and the crime district plus crime category as the value
     */
    public static class DateMapByCategoryAndDistrict extends Map {

        public DateMapByCategoryAndDistrict() {
            keyID = DATE_COLUMN_INDEX;        // index of the date
            valueID = DISTRICT_COLUMN_INDEX;  // index of the crime district
            value2ID = CATEGORY_COLUMN_INDEX; // index of the crime category
        }

        protected String formatKey(String value) throws ParseException {
            return outputDateFormat.format(getDate(value));
        }
    }

    /**
     * @function Build the per-day category x district crime counts
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // allocate and initialize the category x district matrix
            int[][] crimes = new int[categories.size()][districts.size()];
            for (int i = 0; i < categories.size(); i++) {
                for (int j = 0; j < districts.size(); j++) {
                    crimes[i][j] = 0;
                }
            }
            // fill the matrix: the number of crimes of each category in each district
            for (Text crime : values) {
                String[] cols = DataFile.getColumns(crime.toString());
                if (cols.length == 2) {
                    if (categoryLookup.containsKey(cols[1])) {
                        if (districtLookup.containsKey(cols[0])) {
                            int cat = categoryLookup.get(cols[1]);
                            int dist = districtLookup.get(cols[0]);
                            crimes[cat][dist]++;
                        } else {
                            log.warning(MessageFormat.format("District {0} not found.", new Object[]{cols[0]}));
                        }
                    } else {
                        log.warning(MessageFormat.format("Category {0} not found.", new Object[]{cols[1]}));
                    }
                } else {
                    log.warning(MessageFormat.format("Input {0} was in unexpected format", new Object[]{crime}));
                }
            }
            // emit every non-zero cell as (category index, district index, crime count)
            for (int i = 0; i < categories.size(); i++) {
                for (int j = 0; j < districts.size(); j++) {
                    if (crimes[i][j] > 0) {
                        StringBuffer sv = new StringBuffer();
                        sv.append(new Integer(i).toString()); // category index
                        sv.append(",");
                        sv.append(new Integer(j).toString()); // district index
                        sv.append(",");
                        sv.append(new Integer(crimes[i][j])); // crime count
                        Text tv = new Text();
                        tv.set(sv.toString());
                        context.write(key, tv);
                    }
                }
            }
        }
    }

    /**
     * @function Load the category and district lists produced by the SanFranciscoCrime job and index them
     * @param categoryReport path of the SanFranciscoCrime category output
     * @param districtReport path of the SanFranciscoCrime district output
     */
    private static void setup(String categoryReport, String districtReport, FileSystem fs) throws IOException {
        categories = DataFile.extractKeys(categoryReport, fs);
        districts = DataFile.extractKeys(districtReport, fs);
        int i = 0;
        for (String category : categories) {
            categoryLookup.put(category, i++);
        }
        i = 0;
        for (String district : districts) {
            districtLookup.put(district, i++);
        }
    }

    public int run(String[] arg0) throws Exception {
        Configuration conf = new Configuration();
        Path out = new Path(arg0[3]);
        FileSystem hdfs = out.getFileSystem(conf);
        if (hdfs.isDirectory(out)) {
            hdfs.delete(out, true);
        }
        Job job = new Job(conf, "SanFranciscoCrimePrepOlap");
        job.setJarByClass(SanFranciscoCrimePrepOlap.class);
        job.setMapperClass(DateMapByCategoryAndDistrict.class); // Mapper
        job.setReducerClass(Reduce.class); // Reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[3]));
        job.waitForCompletion(true); // submit the job and wait
        return 0;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = {
                "hdfs://HadoopMaster:9000/middle/crime/crime.csv",
                "hdfs://HadoopMaster:9000/middle/test/out1/part-r-00000",
                "hdfs://HadoopMaster:9000/middle/test/out2/part-r-00000",
                "hdfs://HadoopMaster:9000/middle/test/out3/"};
        if (args0.length == 4) {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create("hdfs://HadoopMaster:9000"), conf);
            // build the category and district lookup tables
            setup(args0[1], args0[2], fs);
            // run the MapReduce job
            int ec = ToolRunner.run(conf, new SanFranciscoCrimePrepOlap(), args0);
            System.exit(ec);
        } else {
            System.err.println("\nusage: bin/hadoop jar sfcrime.hadoop.mapreduce.jobs-0.0.1-SNAPSHOT.jar SanFranciscoCrimePrepOlap path/to/category/report path/to/district/report path/to/input/data path/to/output/data");
        }
    }
}

// LoadStarDB.java
package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.DateFormat;
import java.text.MessageFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/***
 * @function Pull the data produced by the MapReduce jobs out of HDFS and insert it into the MySQL database
 */
public class LoadStarDB {

    private Connection db = null; // MySQL database connection
    private Map<String, Integer> lastPrimaryKey = new HashMap<String, Integer>();
    private List<String> categories = null; // list of crime categories
    private List<String> districts = null;  // list of crime districts
    // maps a date to its timeperiod primary key
    private final java.util.Map<Date, Integer> timeperiodLookup = new HashMap<Date, Integer>();
    private final DateFormat df = new SimpleDateFormat("MM/dd/yyyy");  // date format written to the database
    private final DateFormat kdf = new SimpleDateFormat("yyyy/MM/dd"); // date format parsed from the map/reduce output

    /**
     * @function Insert one record into a table
     * @param table the table name
     * @param row the fields to insert
     * @return the primary key id of the new record
     * @throws SQLException
     */
    private int insert(String table, DataRecord row) throws SQLException {
        int retVal = 0;
        Statement s = db.createStatement();
        StringBuffer sql = new StringBuffer();
        sql.append("insert into ");
        sql.append(table);
        sql.append(" ");
        sql.append(row.toString());
        s.execute(sql.toString());
        s.close();
        // track the primary key assigned by the insert (ids are assumed to start at 1 and increase by 1)
        if (lastPrimaryKey.containsKey(table)) {
            retVal = lastPrimaryKey.get(table) + 1;
            lastPrimaryKey.put(table, retVal);
        } else {
            lastPrimaryKey.put(table, 1);
            retVal = 1;
        }
        return retVal;
    }

    /**
     * @function Insert one crime category record
     * @param category the value of the name column
     */
    private int insertCategory(String category) throws SQLException {
        DataRecord dr = new DataRecord();
        dr.put("name", category);
        return insert("category", dr);
    }

    /**
     * @function Insert one crime district record
     * @param district the value of the name column
     */
    private int insertDistrict(String district) throws SQLException {
        DataRecord dr = new DataRecord();
        dr.put("name", district);
        return insert("district", dr);
    }

    /**
     * @function Split a date into the year, month, week and day fields
     * @param dr the record that receives the split fields
     * @param d the date to split
     */
    private void setTimePeriod(DataRecord dr, Date d) {
        Calendar cal = Calendar.getInstance();
        cal.setTime(d);
        dr.put("year", cal.get(Calendar.YEAR));
        dr.put("month", cal.get(Calendar.MONTH));
        dr.put("week", cal.get(Calendar.WEEK_OF_MONTH));
        dr.put("day", cal.get(Calendar.DAY_OF_MONTH));
    }

    /**
     * @function Return the primary key id of a date that is already in the timeperiod table,
     *           inserting it first if it is not there yet
     * @param d the date
     * @return the primary key id for this date
     */
    private int insertTimePeriod(Date d) throws SQLException {
        int retVal = 0;
        if (timeperiodLookup.containsKey(d)) {
            retVal = timeperiodLookup.get(d);
        } else {
            DataRecord dr = new DataRecord();
            setTimePeriod(dr, d);
            retVal = insert("timeperiod", dr);
            timeperiodLookup.put(d, retVal);
        }
        return retVal;
    }

    /**
     * @function Insert one record into the fact table
     * @param districtId foreign key of the crime district
     * @param categoryId foreign key of the crime category
     * @param timeId foreign key of the date
     * @param crimes number of crimes of this category committed in this district at this time
     */
    private void insertFact(int districtId, int categoryId, int timeId, int crimes) throws SQLException {
        DataRecord dr = new DataRecord();
        dr.put("district_id", districtId);
        dr.put("category_id", categoryId);
        dr.put("time_id", timeId);
        dr.put("crimes", crimes);
        insert("fact", dr);
    }

    /**
     * @function Read the category and district data from the SanFranciscoCrime map/reduce output
     *           and load it into the dimension tables
     * @param categoryReport path of the category output file
     * @param districtReport path of the district output file
     * @throws IOException
     */
    private void setup(String categoryReport, String districtReport, FileSystem fs) throws IOException, SQLException {
        categories = DataFile.extractKeys(categoryReport, fs);
        districts = DataFile.extractKeys(districtReport, fs);
        for (String category : categories) {
            insertCategory(category);
        }
        for (String district : districts) {
            insertDistrict(district);
        }
    }

    /**
     * @function Remove all records from a table
     * @param name the table name
     */
    private void truncate(String name) throws SQLException {
        Statement s = db.createStatement();
        s.execute("truncate table ".concat(name));
        s.close();
    }

    /**
     * @function Empty all four tables by calling truncate()
     */
    private void reset() throws SQLException {
        truncate("fact");
        truncate("category");
        truncate("district");
        truncate("timeperiod");
    }

    /**
     * @function Connect to the database, clear it, and load the dimension data
     * @param dbhost the database host
     * @param dbname the database name
     * @param dbuser the user name
     * @param dbpassword the password
     * @throws ClassNotFoundException
     * @throws SQLException
     */
    private LoadStarDB(String categoryReport, String districtReport,
            String dbhost, String dbname, String dbuser, String dbpassword, FileSystem fs)
            throws ClassNotFoundException, SQLException, IOException {
        Class.forName("com.mysql.jdbc.Driver");
        // note: the connection string is hard-coded, so the format arguments are effectively ignored
        String cs = MessageFormat
                .format("jdbc:mysql://192.168.80.128:3306/test?user=root&password=root&autoReconnect=true",
                        new Object[] { dbhost, dbname, dbuser, dbpassword });
        db = DriverManager.getConnection(cs);
        reset();
        setup(categoryReport, districtReport, fs);
    }

    /**
     * @function Read the SanFranciscoCrimePrepOlap map/reduce output and fill the timeperiod and fact tables
     * @param dataFile the file path
     */
    private void processData(String dataFile, FileSystem fs) throws IOException, ParseException {
        FSDataInputStream in = fs.open(new Path(dataFile)); // open the data stream
        BufferedReader br = new BufferedReader(new InputStreamReader(in)); // read the data
        String line = br.readLine();
        while (line != null) {
            String[] lp = line.split("\t");
            if (lp.length > 0) {
                Date d = kdf.parse(lp[0]); // the date
                String[] data = DataFile.getColumns(lp[1]);
                if (data.length == 3) {
                    try {
                        int categoryId = Integer.parseInt(data[0]) + 1; // crime category id
                        int districtId = Integer.parseInt(data[1]) + 1; // crime district id
                        int crimes = Integer.parseInt(data[2]);         // number of crimes
                        int timeId = insertTimePeriod(d);               // time id
                        insertFact(districtId, categoryId, timeId, crimes); // insert into the fact table
                    } catch (NumberFormatException nfe) {
                        System.err.println("invalid data: " + line);
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
            }
            line = br.readLine();
        }
        br.close();
    }

    /***
     * @function Run the load job
     * @param args
     * @throws IOException
     * */
    public static void main(String[] args) throws IOException {
        String[] args0 = {
                "hdfs://HadoopMaster:9000/middle/crime/out1/part-r-00000",
                "hdfs://HadoopMaster:9000/middle/crime/out2/part-r-00000",
                "hdfs://HadoopMaster:9000/middle/crime/out3/part-r-00000",
                "192.168.80.128:3306",
                "test",
                "root",
                "root"};
        if (args0.length == 7) {
            try {
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(URI.create("hdfs://HadoopMaster:9000"), conf);
                LoadStarDB m = new LoadStarDB(args0[0], args0[1], args0[3], args0[4], args0[5], args0[6], fs);
                m.processData(args0[2], fs);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            } catch (SQLException e) {
                e.printStackTrace();
            } catch (ParseException e) {
                e.printStackTrace();
            }
        } else {
            System.err
                    .println("\nusage: java -jar sfcrime.hadoop.mapreduce.jobs-0.0.1-SNAPSHOT.jar com.dynamicalsoftware.olap.etl.LoadStarDB path/to/category/report path/to/district/report path/to/star/data dbhost dbname dbuser dbpassword\n");
        }
    }

    /**
     * Represents one database record; toString() renders it as a "(columns) values (values)" SQL fragment
     */
    class DataRecord extends HashMap<String, Object> {

        public String toString() {
            StringBuffer retVal = new StringBuffer();
            // the column names of the record
            retVal.append("(");
            boolean first = true;
            for (String key : keySet()) {
                if (first) {
                    first = false;
                } else {
                    retVal.append(",");
                }
                retVal.append(key);
            }
            // the corresponding column values
            retVal.append(") values (");
            first = true;
            for (String key : keySet()) {
                Object o = get(key);
                if (first) {
                    first = false;
                } else {
                    retVal.append(",");
                }
                if (o instanceof Long) {
                    retVal.append(((Long) o).toString());
                } else if (o instanceof Integer) {
                    retVal.append(((Integer) o).toString());
                } else if (o instanceof Date) {
                    Date d = (Date) o;
                    retVal.append("'");
                    retVal.append(df.format(d));
                    retVal.append("'");
                } else if (o instanceof String) {
                    retVal.append("'");
                    retVal.append(o.toString());
                    retVal.append("'");
                }
            }
            retVal.append(")");
            // return the record as a SQL-style fragment
            return retVal.toString();
        }
    }
}

This article was reposted from the cnblogs blog 大資料躺過的坑. Original link: http://www.cnblogs.com/zlslch/p/6166276.html. Please contact the original author before republishing.