開篇先介紹業務場景:将電信手機上網日志中的資料導入到Hbase資料庫中,将部分資料以及相應字段描述列出:

圖檔格式描述:
先介紹一個日期格式的轉換:
<a href="http://blog.51cto.com/2226894115/1896517#">?</a>
1
2
3
4
5
6
7
8
9
10
11
<code>public</code> <code>class</code> <code>TestDate</code>
<code>{</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args)</code>
<code> </code><code>{</code>
<code> </code><code>Date d = </code><code>new</code> <code>Date();</code>
<code> </code><code>SimpleDateFormat df = </code><code>new</code> <code>SimpleDateFormat(</code><code>"yyyy-MM-dd HH:mm:ss"</code><code>); </code>
<code> </code><code>String time = df.format(d);</code>
<code> </code><code>System.out.println(time);</code>
<code> </code><code>}</code>
<code>}</code>
<code>/*2016-05-14 13:32:24*/</code>
<code> </code><code>在Java當中,我們經常利用SimpledateFormat這個類将給定的日期轉化成指定的格式。</code>
<code> </code><code>接下來在歸納一下Hbase結合MapReduce批量導入資料的時候,在代碼當中應該注意的事項:</code>
<code> </code><code>①MyReducer類繼承的是TableReduce類,而不在是MapReduce中常用的Reducer類</code>
<code> </code><code>②的數值類型沒有什麼用,通常将k3的數值類型設定為NullWritable即可</code>
<code> </code><code>③隻設定map函數的輸出類型,不在設定reduce函數的輸出類型,因為②的原因</code>
<code> </code><code>④指定對輸出檔案格式化處理的類改為TableOutputFormat,而不在是TextOutputFormat</code>
<code> </code><code>⑤輸出檔案的路徑改為指定的表名,在Configuration中進行設定,而不在是path的方式</code>
<code> </code><code>⑥如果想過jar包的方式運作程式,貌似還需要加入什麼jar包,我沒有整出來。</code>
<code> </code><code>接下來将貼出我在程式設計的時候第一次寫出的業務代碼:當然遇到了很多的問題。</code>
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
<code>package</code> <code>IT01;</code>
<code>import</code> <code>java.io.IOException;</code>
<code>import</code> <code>java.text.SimpleDateFormat;</code>
<code>import</code> <code>java.util.Date;</code>
<code>import</code> <code>org.apache.hadoop.conf.Configuration;</code>
<code>import</code> <code>org.apache.hadoop.fs.Path;</code>
<code>import</code> <code>org.apache.hadoop.hbase.client.Put;</code>
<code>import</code> <code>org.apache.hadoop.hbase.mapreduce.TableOutputFormat;</code>
<code>import</code> <code>org.apache.hadoop.hbase.mapreduce.TableReducer;</code>
<code>import</code> <code>org.apache.hadoop.io.LongWritable;</code>
<code>import</code> <code>org.apache.hadoop.io.NullWritable;</code>
<code>import</code> <code>org.apache.hadoop.io.Text;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.Job;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.Mapper;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.lib.input.FileInputFormat;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.lib.input.TextInputFormat;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;</code>
<code>public</code> <code>class</code> <code>HbaseApp</code>
<code> </code><code>public</code> <code>static</code> <code>String path1 = </code><code>"hdfs://hadoop80:9000/FlowData.txt"</code><code>;</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) </code><code>throws</code> <code>Exception</code>
<code> </code><code>Configuration conf = </code><code>new</code> <code>Configuration();</code>
<code> </code><code>conf.set(</code><code>"hbaser.rootdir"</code><code>,</code><code>"hdfs://hadoop80:9000/hbase"</code><code>);</code>
<code> </code><code>conf.set(</code><code>"hbase.zookeeper.quorum"</code><code>,</code><code>"hadoop80"</code><code>);</code>
<code> </code><code>conf.set(TableOutputFormat.OUTPUT_TABLE,</code><code>"wlan_log"</code><code>);</code><code>//在這裡需要指定表的名字:相當于輸出檔案的路徑</code>
<code> </code><code>conf.set(</code><code>"dfs.socket.timeout"</code><code>,</code><code>"2000"</code><code>);</code>
<code> </code><code>Job job = </code><code>new</code> <code>Job(conf,</code><code>"HbaseApp"</code><code>);</code>
<code> </code><code>FileInputFormat.setInputPaths(job, </code><code>new</code> <code>Path(path1));</code>
<code> </code><code>job.setInputFormatClass(TextInputFormat.</code><code>class</code><code>);</code>
<code> </code><code>job.setMapperClass(MyMapper.</code><code>class</code><code>);</code>
<code> </code><code>job.setMapOutputKeyClass(Text.</code><code>class</code><code>);</code>
<code> </code><code>job.setMapOutputValueClass(Text.</code><code>class</code><code>);</code>
<code> </code><code>job.setNumReduceTasks(</code><code>1</code><code>);</code>
<code> </code><code>job.setPartitionerClass(HashPartitioner.</code><code>class</code><code>);</code>
<code> </code><code>job.setReducerClass(MyReducer.</code><code>class</code><code>);</code>
<code>// job.setOutputKeyClass(Text.class);</code>
<code>// job.setOutputValueClass(NullWritable.class);</code>
<code> </code><code>job.setOutputFormatClass(TableOutputFormat.</code><code>class</code><code>);</code>
<code>// FileOutputFormat.setOutputPath(job, new Path(path2));</code>
<code> </code><code>job.waitForCompletion(</code><code>true</code><code>);</code>
<code> </code><code>public</code> <code>static</code> <code>class</code> <code>MyMapper </code><code>extends</code> <code>Mapper{</code>
<code> </code><code>protected</code> <code>void</code> <code>map(LongWritable k1, Text v1,Context context)</code><code>throws</code> <code>IOException, InterruptedException</code>
<code> </code><code>{</code>
<code> </code><code>String[] splited = v1.toString().split(</code><code>"\t"</code><code>);</code>
<code> </code><code>String reportTime = splited[</code><code>0</code><code>];</code>
<code> </code><code>String msisdn = splited[</code><code>1</code><code>];</code>
<code> </code><code>Date date = </code><code>new</code> <code>Date(Long.parseLong(reportTime));</code>
<code> </code><code>String time = DateConvert.dateParse(date);</code>
<code> </code><code>String rowkey = msisdn+</code><code>":"</code><code>+time;</code><code>//擷取到行健</code>
<code> </code><code>context.write(</code><code>new</code> <code>Text(rowkey),</code><code>new</code> <code>Text(v1.toString())); </code>
<code> </code><code>}</code>
<code> </code><code>public</code> <code>static</code> <code>class</code> <code>MyReducer </code><code>extends</code> <code>TableReducer{</code>
<code> </code><code>protected</code> <code>void</code> <code>reduce(Text k2, Iterablev2s,Context context)</code><code>throws</code> <code>IOException, InterruptedException</code>
<code> </code><code>for</code> <code>(Text v2 : v2s)</code>
<code> </code><code>{</code>
<code> </code><code>String[] splited = v2.toString().split(</code><code>"\t"</code><code>);</code>
<code> </code><code>/**添加記錄的時候需要指定行健、列族、列名、數值***/</code>
<code> </code><code>Put put = </code><code>new</code> <code>Put(k2.toString().getBytes());</code>
<code> </code><code>put.add(</code><code>"cf"</code><code>.getBytes(),</code><code>"reportTime"</code><code>.getBytes(), splited[</code><code>0</code><code>].getBytes());</code>
<code> </code><code>put.add(</code><code>"cf"</code><code>.getBytes(),</code><code>"msisdn"</code><code>.getBytes(), splited[</code><code>1</code><code>].getBytes());</code>
<code> </code><code>put.add(</code><code>"cf"</code><code>.getBytes(),</code><code>"apmac1"</code><code>.getBytes(), splited[</code><code>2</code><code>].getBytes());</code>
<code> </code><code>put.add(</code><code>"cf"</code><code>.getBytes(),</code><code>"apmac2"</code><code>.getBytes(), splited[</code><code>3</code><code>].getBytes());</code>
<code> </code><code>put.add(</code><code>"cf"</code><code>.getBytes(),</code><code>"host"</code><code>.getBytes(), splited[</code><code>4</code><code>].getBytes());</code>
<code> </code><code>put.add(</code><code>"cf"</code><code>.getBytes(),</code><code>"sitetype"</code><code>.getBytes(), splited[</code><code>5</code><code>].getBytes());</code>
<code> </code><code>put.add(</code><code>"cf"</code><code>.getBytes(),</code><code>"upPackNum"</code><code>.getBytes(), splited[</code><code>6</code><code>].getBytes());</code>
<code> </code><code>put.add(</code><code>"cf"</code><code>.getBytes(),</code><code>"downPackNum"</code><code>.getBytes(), splited[</code><code>7</code><code>].getBytes());</code>
<code> </code><code>put.add(</code><code>"cf"</code><code>.getBytes(),</code><code>"upPayLoad"</code><code>.getBytes(), splited[</code><code>8</code><code>].getBytes());</code>
<code> </code><code>put.add(</code><code>"cf"</code><code>.getBytes(),</code><code>"downPayLoad"</code><code>.getBytes(), splited[</code><code>9</code><code>].getBytes());</code>
<code> </code><code>put.add(</code><code>"cf"</code><code>.getBytes(),</code><code>"httpstatus"</code><code>.getBytes(), splited[</code><code>10</code><code>].getBytes());</code>
<code> </code><code>context.write(NullWritable.get(),put);</code>
<code> </code><code>} </code>
<code>class</code> <code>DateConvert</code>
<code> </code><code>public</code> <code>static</code> <code>String dateParse(Date date)</code>
<code> </code><code>{</code>
<code> </code><code>SimpleDateFormat df = </code><code>new</code> <code>SimpleDateFormat(</code><code>"yyyyMMddhhmmss"</code><code>);</code><code>//構造一個日期解析器</code>
<code> </code><code>return</code> <code>df.format(date); </code>
<code> </code><code>}</code>
程式運作完之後:顯示如下異常NumberFormatException
顯示的是數字格式異常, 于是我在map函數當中又加了一個throws NumberFormatException
<code> </code><code>protected</code> <code>void</code> <code>map(LongWritable k1, Text v1,Context context)</code><code>throws</code> <code>IOException, InterruptedException,NumberFormatException</code>
但是這樣我發現也不對,因為當我追蹤Mapp這個類的源代碼時,我發現父類的map方法并沒有抛出NumberFormatException這個異常,根據子類重寫方法抛出異常的範圍不能大于父類被重寫方法抛出異常的範圍,我又将上面這段代碼用try——catch這種異常處理方式進行處理:
<code>protected</code> <code>void</code> <code>map(LongWritable k1, Text v1,Context context)</code><code>throws</code> <code>IOException, InterruptedException</code>
<code> </code><code>try</code>
<code> </code><code>{</code>
<code> </code><code>String[] splited = v1.toString().split(</code><code>"\t"</code><code>);</code>
<code> </code><code>String reportTime = splited[</code><code>0</code><code>];</code>
<code> </code><code>String msisdn = splited[</code><code>1</code><code>];</code>
<code> </code><code>Date date = </code><code>new</code> <code>Date(Long.parseLong(reportTime));</code>
<code> </code><code>String time = DateConvert.dateParse(date);</code>
<code> </code><code>String rowkey = msisdn+</code><code>":"</code><code>+time;</code><code>//擷取到行健</code>
<code> </code><code>context.write(</code><code>new</code> <code>Text(rowkey),</code><code>new</code> <code>Text(v1.toString())); </code>
<code> </code><code>}</code><code>catch</code><code>(Exception e)</code>
<code> </code><code>Counter counter = context.getCounter(</code><code>"NumberExceptionNum"</code><code>, </code><code>"num"</code><code>);</code>
<code> </code><code>counter.increment(1L);</code>
<code> </code><code>}</code>
當我将代碼改成這樣的時候,此時程式并沒有顯示抛出NumberFormatException這個異常,說明異常得到了處理,但是當我去檢視Hbase資料的時候,發現HDFS中的日志資料并沒有導入到Hbase資料庫中,于是我又檢視了一下MapReduce的運作日志:
也就是我的22行資料在map函數中當中并沒有輸出,這個問題就匪夷所思了,為什麼22行資料都會抛出數字格式異常呢,而且都沒有輸出,于是我想到可能是SimpleDateFZ喎"/kf/ware/vc/" target="_blank" class="keylink">vcm1hdNXiuPbA4LXEzsrM4qOs09rKx87S09a/qsq8uPfW1rDZtsijrLeiz9bN+MnPuty24M7E1cK2vMrHxfrF0NXiuPbA4LXEo6zX7tbV1tXT2tXStb3By87KzOK1xL3ivva3vbC4o6zTw3RyaW0oKdXiuPa3vbeoyKWz/dfWt/u0rsewuvO1xL/VuPG8tL/JoaM8L3A+DQo8cHJlIGNsYXNzPQ=="brush:java;"><code>protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException { try { String[] splited = v1.toString().split("\t"); String reportTime = splited[0].trim(); String msisdn = splited[1].trim(); Date date = new Date(Long.parseLong(reportTime)); String time = DateConvert.dateParse(date); String rowkey = msisdn+":"+time;//擷取到行健 context.write(new Text(rowkey),new Text(v1.toString())); }catch(Exception e) { Counter counter = context.getCounter("NumberExceptionNum", "num"); counter.increment(1L); } }</code>
于是我又開始運作程式,但是當我運作完之後,從MapReduce的計數器當中,我發現第一條資料文本并沒有導入:因為數字格式異常的這個原因估計在運作過程中被終止了。下面是計數器的顯示:
于是我又想到了一個解決方案,将第一條資料多複制一條即可,然後重寫将資料上傳到HDFS中。
此時在一次 運作程式,顯示正确,此時資料也全部導入到Hbase資料庫中。
Hbase中資料檢視核實:
将HDFS中的資料通過MapReduce導入到Hbase資料庫時,總結如下:
核心步驟:先将資料檔案上傳到HDFS,然後用MapReduce進行處理,将處理後的資料插入到 hbase中
注意事項:
1>子類重寫方法抛出異常的範圍不能大于父類被重寫方法抛出異常的範圍
2>用trim()這個方法可以去除字元串前後的空格,換行符。
3>既然第一條資料總是顯示數字格式異常,将第一條資料複制為2份即可。
本文轉自 SimplePoint 51CTO部落格,原文連結:http://blog.51cto.com/2226894115/1896517,如需轉載請自行聯系原作者