資料去重、多表查詢、反向索引、單元測試等案例程式設計
- 1 資料去重
- 2 多表查詢
- 2.1 笛卡爾積
- 2.2 等值連接配接
- 2.3 自連接配接
- 3 反向索引
- 4 單元測試
作者:Be_melting
1 資料去重
相當于實作SQL裡面的distinct的功能。廢話不說多,直接進行代碼程式設計,建立一個demo.distinct的package,然後進行架構的搭建(架構裡面包含Mapper、Reducer和執行的主程式三個檔案),如下
首先開發Mapper程式,具體的代碼如下(基本上之前都見過了,沒啥差別)
package demo.distinct;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// k1 v1 k2:job v2:null
public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
// 資料:7369,SMITH,CLERK,7902,1980/12/17,800,0,20
String data = v1.toString();
//分詞
String[] words = data.split(",");
//輸出
context.write(new Text(words[2]), NullWritable.get());
}
}
其次就是開發Reduce程式,隻需要指定一下資料類型然後寫入資料,不需要進行其它的操作
package demo.distinct;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override
protected void reduce(Text k3, Iterable<NullWritable> v3,Context context) throws IOException, InterruptedException {
context.write(k3, NullWritable.get());
}
}
接着就是執行的主程式,将原來的程式直接拿過來進行改寫(還是修改之前圈出來的三個紅框部分,分别對應下面的(1)(2)(3)中的内容)
package demo.distinct;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DistinctMain {
public static void main(String[] args) throws Exception {
// (1)建立任務Job,并且制定任務的入口
Job job = Job.getInstance(new Configuration());
job.setJarByClass(DistinctMain.class); //指定為目前程式
//(2)指定任務的Map,Map的輸出類型
job.setMapperClass(DistinctMapper.class);
job.setMapOutputKeyClass(Text.class);//k2
job.setMapOutputValueClass(NullWritable.class);//v2
//(3)指定任務的Reduce,Reduce的輸出類型
job.setReducerClass(DistinctReducer.class);
job.setOutputKeyClass(Text.class);//k4
job.setOutputValueClass(NullWritable.class);//v4
//(4)指定任務的輸入和輸出路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//(5)執行任務
job.waitForCompletion(true); //表示執行的時候列印日志
}
}
程式開發完成後打包為p12.jar,上傳到hadoop上運作
核實生成的檔案資料資訊是否去除了重複的資訊(核實無誤,啦啦啦~)
2 多表查詢
補充一些關于SQL中的表連接配接的知識:
- 笛卡爾積
- 連接配接條件:至少有N-1(N代表表的數量),根據連接配接條件的不同,多表查詢的類型
- 等值連接配接
- 不等值連接配接
- 外連接配接:左、右、全(在Flink DataSet API實作批處理)
- 自連接配接:隻需要一張表
2.1 笛卡爾積
關于笛卡爾積,比如部門号有2條記錄,然後員工表有4條記錄,如果兩表進行笛卡爾積,最後就是2x4=8條記錄,如下
但是可以發現笛卡爾積的結果有些内容根本就是錯的,不是我們所需要的,是以我們真正需要的是要有一定依據的連接配接,比如等值連接配接和自連接配接,還有外連接配接等
2.2 等值連接配接
查詢員工資訊,顯示:員工号、姓名,薪水,部門名稱(下面是Sql語句實作)
select e.ename,d.dname
from emp e,dept d
where e.deptno=d.deptno;
在MapReduce中實作,首先要分析一些等值連接配接的一個過程,理清楚裡面每一步的資料類型和步驟,寫起來就很友善了。MapReduce:分析等值連接配接資料處理的流程
- (1)遇到的第一個問題就是如何判斷讀取的資料是來自員工表還是部門表?(方式很多,比如最簡單的擷取檔案名)
- (2)還有就是Map的輸出階段,k2如何進行設定?(部門表和員工表分開即可,都是以部門号作為k2)
- (3)v3中如何識别哪一個是部門名稱,哪一個是員工姓名?(這裡就是在v2的時候進行部門表資訊的标記,比如部門資訊前面加個*号)
程式設計實作等值連接配接,建立一個名為demo.equal的package,然後搭建架構,還是三個檔案(Mapper程式、Reducer程式和運作主程式)
首先開發Mapper程式,就是先按照之前分析的流程中指定一下資料類型,接着解決上面問題,就是判斷是員工表資料還是部門表資料,然後對于部門表中的Text資料進行*号标記,用于差別部門和員工名稱
package demo.equaljoin;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// k1 v1 k2 部門号 v2
public class EqualJoinMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
//部門表:10,,ACCOUNTING,NEW YORK
//員工表:7369,SMITH,CLERK,7902,1980/12/17,800,0,20
String data = v1.toString();
//分詞
String[] words = data.split(",");
//判斷
if(words.length == 3) {
//部門表:部門号和部門名稱
context.write(new IntWritable(Integer.parseInt(words[0])), new Text("*"+words[1]));
}else {
//員工表:部門号,員工姓名
context.write(new IntWritable(Integer.parseInt(words[7])), new Text(words[1]));
}
}
}
Reducer程式開發的代碼稍微有點複雜,就是需要設定兩個空的字元,然後對Mapper輸出的資料,進行周遊,然後根據是否有*号進行分類,重新寫到部門号和員工姓名字段中
package demo.equaljoin;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class EqualJoinReducer extends Reducer<IntWritable, Text, Text, Text> {
@Override
protected void reduce(IntWritable k3, Iterable<Text> v3, Context context)
throws IOException, InterruptedException {
// 定義變量,分别儲存部門名稱和員工姓名
String dname = "";
String empNameList = "";
for(Text v:v3) {
String str = v.toString();
//判斷是否包含*号
int index = str.indexOf("*");
if(index >= 0) {
//是部門名稱
dname = str.substring(1);
}else {
//是員工姓名
empNameList = str + ";" + empNameList;
}
}
//輸出
context.write(new Text(dname), new Text(empNameList));
}
}
接着就是運作的主程式,還是修改(1)(2)(3)中的内容,其餘的保持不變
package demo.equaljoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class EqualJoinMain {
public static void main(String[] args) throws Exception {
// (1)建立任務Job,并且制定任務的入口
Job job = Job.getInstance(new Configuration());
job.setJarByClass(EqualJoinMain.class); //指定為目前程式
//(2)指定任務的Map,Map的輸出類型
job.setMapperClass(EqualJoinMapper.class);
job.setMapOutputKeyClass(IntWritable.class);//k2
job.setMapOutputValueClass(Text.class);//v2
//(3)指定任務的Reduce,Reduce的輸出類型
job.setReducerClass(EqualJoinReducer.class);
job.setOutputKeyClass(Text.class);//k4
job.setOutputValueClass(Text.class);//v4
//(4)指定任務的輸入和輸出路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//(5)執行任務
job.waitForCompletion(true); //表示執行的時候列印日志
}
}
程式開發完成後,打包為p13.jar檔案,上傳至hadoop上運作
核實生成檔案中的資料資訊(左側為部門表,右側為員工表,成功實作了)
2.3 自連接配接
通過表的别名,将同一張表視為多張表,查詢員工資訊,顯示:老闆名稱、員工姓名(Sql語句查詢如下)
select b.ename,e.ename
from emp e,emp b
where e.mgr=b.empno;
在MapReduce中實作自連接配接,首先梳理一下這個過程,理清資料類型和步驟,圖示如下
程式設計實作等值連接配接,建立一個名為demo.selfjoin的package,然後搭建架構,還是三個檔案(Mapper程式、Reducer程式和運作主程式)
首先開發Mapper程式,就是先按照之前分析的流程中指定一下資料類型,對于資料的寫入要進行兩次,表格同時作為員工表和老闆表,這裡還有有防錯的處理,因為資料中存在一個大boss,他上面是沒有老闆的,這個資料是空,是以如果遇到這條資料,就把他的老闆标記為-1,這樣就識别除了大boss
package demo.selfjoin;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SelfJoinMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
// 員工表:7369,SMITH,CLERK,7902,1980/12/17,800,0,20
String data = v1.toString();
//分詞
String[] words = data.split(",");
//輸出
try {
//作為老闆表:員工号 姓名
context.write(new IntWritable(Integer.parseInt(words[0])), new Text("*"+words[1]));
//作為員工表: 老闆号 姓名
context.write(new IntWritable(Integer.parseInt(words[3])), new Text(words[1]));
}catch (Exception e) {
// 表示大老闆
context.write(new IntWritable(-1), new Text(words[1]));
}
}
}
Reducer程式開發的代碼這裡比Mapper程式相較簡單一點了,對比一下發現和前面的等值連接配接中的代碼幾乎一模一樣,隻是變量之間存在着差異
package demo.selfjoin;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SelfJoinReducer extends Reducer<IntWritable, Text, Text, Text> {
@Override
protected void reduce(IntWritable k3, Iterable<Text> v3, Context context)
throws IOException, InterruptedException {
//定義兩個變量:老闆姓名 員工姓名
String bossName = "";
String empNameList = "";
for(Text v:v3) {
String str = v.toString();
//判斷是否有*号
int index = str.indexOf("*");
if(index >=0) {
//表示老闆姓名
bossName = str.substring(1);
}else {
empNameList = str + ";" + empNameList;
}
}
context.write(new Text(bossName), new Text(empNameList));
}
}
運作主程式的代碼設計,将(1)(2)(3)中的類名稱修改一下即可
package demo.selfjoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SelfJoinMain {
public static void main(String[] args) throws Exception {
// (1)建立任務Job,并且制定任務的入口
Job job = Job.getInstance(new Configuration());
job.setJarByClass(SelfJoinMain.class); //指定為目前程式
//(2)指定任務的Map,Map的輸出類型
job.setMapperClass(SelfJoinMapper.class);
job.setMapOutputKeyClass(IntWritable.class);//k2
job.setMapOutputValueClass(Text.class);//v2
//(3)指定任務的Reduce,Reduce的輸出類型
job.setReducerClass(SelfJoinReducer.class);
job.setOutputKeyClass(Text.class);//k4
job.setOutputValueClass(Text.class);//v4
//(4)指定任務的輸入和輸出路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//(5)執行任務
job.waitForCompletion(true); //表示執行的時候列印日志
}
}
程式設計完成後打包生成p14.jar檔案,上傳hadoop上進行
檢視一下輸出的檔案中是夠有什麼問題不。直接輸出的結果中并不是我們想要的結果,主要是因為,公司的人員架構中大boss上面沒有人了,最底層的員工下面也沒有人了,是以就造成輸出的樣式
對代碼進行改進,判斷如果存在老闆和員工才進行輸出(在Reducer程式中修改)
重新打包一下生成p15.jar檔案,然後上傳hadoop上運作
接下來就是見證奇迹的時候了,啦啦啦~(很完美,和想象中的輸出一毛一樣)
3 反向索引
之前在介紹WordCount計數的時候就已經介紹過反向索引的過程,如下
接下來就是用程式設計的方式自己寫代碼實作一下反向索引。準備測試資料,在temp檔案夾下建立三個檔案,内容分别如下
vi data01.txt
I love Beijing and love Shanghai
vi data02.txt
I love China
vi
檢驗建立的資料,核實無誤
然後将建立的資料上傳到hdfs上,代碼指令:
hdfs dfs -put data0*.txt /indexdata
那麼就是用MapReduce實作反向索引,首先要分析一下這個過程的資料類型和步驟,如下
- 一個檔案中出現重複的内容,為了提高性能可以引入Combiner
- Combiner的加入不影響結果和處理的邏輯(這裡特别注意v2和v3,保證邏輯的統一v2和最後的v2’都應該為Text資料類型)
- 流程分析完畢後就是建立一個demo.revertedindex的package,然後搭建架構(Mapper程式、Reducer程式和運作主程式,注意這次還有個Combiner程式是繼承Reducer)
先開始設計Mapper程式,裡面的關鍵就是擷取檔案名稱,然後在進行字元串的切割求解得到(注意仿照預設定的格式進行擷取/存取資料)
package demo.revertedindex;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit; /這個包别導錯了
import org.apache.hadoop.mapreduce.Mapper;
public class RevertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
// 資料:data01.txt I love Beijing and love Shanghai
//擷取輸入資料的路徑: /indexdata/data01.txt
String path = ((FileSplit)context.getInputSplit()).getPath().toString();
//查詢最後一個斜線
int index = path.indexOf("/");
//得到檔案名
String fileName = path.substring(index+1);
String data = v1.toString();
//分詞
String[] words = data.split(",");
//輸出
for(String w:words) {
context.write(new Text(w+":"+fileName), new Text("1"));
}
}
}
接着處理Combiner程式,跟着分析的流程一步步進行就可以了
package demo.revertedindex;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class RevertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text k21, Iterable<Text> v21, Context context)
throws IOException, InterruptedException {
// 對v21求和,得到某個單詞在某個檔案中頻率
int total = 0;
for(Text v:v21) {
total = total + Integer.parseInt(v.toString());
}
//k21的資料是:love:data01.txt
String data = k21.toString();
int index = data.indexOf(":");
String word = data.substring(0,index);
String fileName = data.substring(index+1);
// love data01.txt:2
context.write(new Text(word), new Text(fileName+":"+total));
}
}
然後就是處理Reducer程式中的内容,對于Combiner傳輸的資料,進行周遊循壞,依次轉化為目标的格式,最後再輸出
package demo.revertedindex;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class RecertedIndexReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text k3, Iterable<Text> v3, Context context)
throws IOException, InterruptedException {
//對combiner的輸出結果進行拼加
String str = "";
for(Text v:v3) {
str = "(" + v.toString()+")" + str;
}
context.write(k3, new Text(str));
}
}
最後就是編寫運作主程式的代碼,把之前的代碼拿過來修改一下即可(還需要添加中間的Combiner)
package demo.revertedindex;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class RevertedIndexMain {
public static void main(String[] args) throws Exception {
// (1)建立任務Job,并且制定任務的入口
Job job = Job.getInstance(new Configuration());
job.setJarByClass(RevertedIndexMain.class); //指定為目前程式
//(2)指定任務的Map,Map的輸出類型
job.setMapperClass(RevertedIndexMapper.class);
job.setMapOutputKeyClass(Text.class);//k2
job.setMapOutputValueClass(Text.class);//v2
//引入Combiner
job.setCombinerClass(RevertedIndexCombiner.class);
//(3)指定任務的Reduce,Reduce的輸出類型
job.setReducerClass(RevertedIndexReducer.class);
job.setOutputKeyClass(Text.class);//k4
job.setOutputValueClass(Text.class);//v4
//(4)指定任務的輸入和輸出路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//(5)執行任務
job.waitForCompletion(true); //表示執行的時候列印日志
}
}
程式全部開發完成後,打包為p16.jar,上傳至hadoop上運作(這次是在map階段有了三步)
核實一下生成的檔案中的資訊(最後的内容輸出格式上貌似和原來設想的很不一樣,是以肯定是中間的資料處理出問題了)
接着就是找問題,發現在Mapper程式中有兩行代碼大意了,習慣性的敲出來了(一個是最後的索引少了last,第二個就是分割的時候這次是空格不是逗号)
修改完畢後重新打包上傳運作結果如下(很完美,這次和預期一樣,啦啦啦~)
4 單元測試
之前運作MapReduce程式都是程式開發完成之後,通過WinSCP軟體将本地的jar包上傳至hadoop上,然後再進行操作,但是這種方式并不是很友善,我們希望可以直接就在開發工具中運作測試(可以直接下載下傳hadoop插件,不過這裡還有一個MRUNIT架構就可以實作對MapReduce進行測試)
在工程項目中建立一個新的檔案夾,命名為mrunit,将提供的資料中的相關jar都放置在該檔案夾下,并添加環境,這樣MRUNIT環境就配置完成了,接下來就可以直接進行測試
特别注意一下,添加環境的jar包有一個是和别的jar包有沖突,需要進行剔除(mockito-all-1.8.5.jar)
接着就是見證奇迹的時候啦,以之前寫過的WordCount程式為例,測試一下能不能直接調試運作,建立一個新的package命名為demo.mrunit,然後将wc中的Mapper和Reducer程式直接拷貝過來,分别進行測試建立一個新的Java Class命名為MRUnitWordCount,設定好測試的架構,就是Mapper程式,Reducer程式和Job運作程式
首先處理Mapper的測試,注意導入的MapDriver是在org.apache.hadoop.mrunit.mapreduce下
@Test
public void testMapper() throws Exception{
//建立一個WordCountMapper的測試對象
WordCountMapper mapper = new WordCountMapper();
//建立一個Driver進行單元測試
MapDriver<LongWritable,Text, Text, IntWritable> driver = new MapDriver(mapper);
//指定Map輸入的資料
driver.withInput(new LongWritable(1),new Text("I love Beijing"));
//指定Map的輸出
driver.withOutput(new Text("I"),new IntWritable(1))
.withOutput(new Text("love"),new IntWritable(1))
.withOutput(new Text("Beijing"),new IntWritable(1));
//執行單元測試,對比:我們希望得到的結果和實際運作的結果
driver.runTest();
}
點選滑鼠右鍵進行運作,結果顯示為綠色,說明實際輸出和我們期望輸出一緻
不妨将上面的紅框的内容進行修改一下,比如love單詞的次數修改為2,然後再次運作,檢視一下運作結果(左側的狀态條為紅色,輸出報錯中有提醒,最終的love單詞出現的次數為1,不是期望的2)
Mappper程式測試成功,接着就是測試一下Reduce程式
@Test
public void testReducer() throws Exception{
WordCountReducer reducer = new WordCountReducer();
ReduceDriver<Text, IntWritable, Text, IntWritable>
driver = new ReduceDriver<Text, IntWritable, Text, IntWritable>(reducer);
//構造Reducer輸入 List
ArrayList<IntWritable> value3 = new ArrayList<IntWritable>();
value3.add(new IntWritable(1));
value3.add(new IntWritable(1));
value3.add(new IntWritable(1));
driver.withInput(new Text("Beijing"),value3);
//指定Reducer的輸出,是我們希望得到的結果
driver.withOutput(new Text("Beijing"),new IntWritable(3));
driver.runTest();
輸出的結果為:(運作狀态條為綠色,測試通過)
如果将期望的輸出結果調成4,運作的結果如下(證明程式可以來測試啦)
最後就是來測試運作的Job,代碼如下
@Test
public void testJob() throws Exception{
//建立對象
WordCountMapper mapper = new WordCountMapper();
WordCountReducer reducer = new WordCountReducer();
//建立Driver
MapReduceDriver<LongWritable,Text, Text, IntWritable,Text, IntWritable>
driver = new MapReduceDriver(mapper,reducer);
//指定Mapper輸入的資料
driver.withInput(new LongWritable(1),new Text("I love Beijing"))
.withInput(new LongWritable(2),new Text("I love China"))
.withInput(new LongWritable(3),new Text("Beijing is the capital of China"));
//指定Reducer的輸出
driver.withOutput(new Text("I"),new IntWritable(2))
.withOutput(new Text("love"),new IntWritable(2))
.withOutput(new Text("Beijing"),new IntWritable(2))
.withOutput(new Text("China"),new IntWritable(2))
.withOutput(new Text("is"),new IntWritable(1))
.withOutput(new Text("the"),new IntWritable(1))
.withOutput(new Text("capital"),new IntWritable(1))
.withOutput(new Text("of"),new IntWritable(1));
driver.runTest();
}
輸出的結果為:(可以發現最終的計數是正常的,但是順序不對)
因為MapReduce會有一個預設的排序規則,我們調整一下最後的輸出的内容,然後再運作,可以發現按照字典的順序進行輸出後,狀态條顯示綠色,測試正确