05.Mapreduce執行個體——Map端join
實驗原理
MapReduce提供了表連接配接操作其中包括Map端join、Reduce端join還有單表連接配接,現在我們要讨論的是Map端join,Map端join是指資料到達map處理函數之前進行合并的,效率要遠遠高于Reduce端join,因為Reduce端join是把所有的資料都經過Shuffle,非常消耗資源。
1.Map端join的使用場景:一張表資料十分小、一張表資料很大。
Map端join是針對以上場景進行的優化:将小表中的資料全部加載到記憶體,按關鍵字建立索引。大表中的資料作為map的輸入,對map()函數每一對<key,value>輸入,都能夠友善地和已加載到記憶體的小資料進行連接配接。把連接配接結果按key輸出,經過shuffle階段,reduce端得到的就是已經按key分組并且連接配接好了的資料。
為了支援檔案的複制,Hadoop提供了一個類DistributedCache,使用該類的方法如下:
(1)使用者使用靜态方法DistributedCache.addCacheFile()指定要複制的檔案,它的參數是檔案的URI(如果是HDFS上的檔案,可以這樣:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作業啟動之前會擷取這個URI清單,并将相應的檔案拷貝到各個TaskTracker的本地磁盤上。
(2)使用者使用DistributedCache.getLocalCacheFiles()方法擷取檔案目錄,并使用标準的檔案讀寫API讀取相應的檔案。
2.本實驗Map端Join的執行流程
(1)首先在送出作業的時候先将小表檔案放到該作業的DistributedCache中,然後從DistributeCache中取出該小表進行join連接配接的 <key ,value>鍵值對,将其解釋分割放到記憶體中(可以放大Hash Map等等容器中)。
(2)要重寫MyMapper類下面的setup()方法,因為這個方法是先于map方法執行的,将較小表先讀入到一個HashMap中。
(3)重寫map函數,一行行讀入大表的内容,逐一的與HashMap中的内容進行比較,若Key相同,則對資料進行格式化處理,然後直接輸出。
(4)map函數輸出的<key,value >鍵值對首先經過一個suffle把key值相同的所有value放到一個疊代器中形成values,然後将<key,values>鍵值對傳遞給reduce函數,reduce函數輸入的key直接複制給輸出的key,輸入的values通過增強版for循環周遊逐一輸出,循環的次數決定了<key,value>輸出的次數。
實驗步驟:
- 建兩個文本文檔,用逗号分隔開,資料如下
orders1表
訂單ID 訂單号 使用者ID 下單日期
52304 111215052630 176474 2011-12-15 04:58:21
52303 111215052629 178350 2011-12-15 04:45:31
52302 111215052628 172296 2011-12-15 03:12:23
52301 111215052627 178348 2011-12-15 02:37:32
52300 111215052626 174893 2011-12-15 02:18:56
52299 111215052625 169471 2011-12-15 01:33:46
52298 111215052624 178345 2011-12-15 01:04:41
52297 111215052623 176369 2011-12-15 01:02:20
52296 111215052622 178343 2011-12-15 00:38:02
52295 111215052621 178342 2011-12-15 00:18:43
52294 111215052620 178341 2011-12-15 00:14:37
52293 111215052619 178338 2011-12-15 00:13:07
order_items1表
明細ID 訂單ID 商品ID
252578 52293 1016840
252579 52293 1014040
252580 52294 1014200
252581 52294 1001012
252582 52294 1022245
252583 52294 1014724
252584 52294 1010731
252586 52295 1023399
252587 52295 1016840
252592 52296 1021134
252593 52296 1021133
252585 52295 1021840
252588 52295 1014040
252589 52296 1014040
252590 52296 1019043
- 虛拟機中啟動Hadoop
- 建立/data/mapreduce5目錄
mkdir -p /data/mapreduce5
- 将兩個表上傳到虛拟機中
- 上傳并解壓hadoop2lib檔案
- 在HDFS上建立/mymapreduce5/in目錄,然後将Linux本地/data/mapreduce5目錄下的orders1和order_items1檔案導入到HDFS的/mymapreduce5/in目錄中。
hadoop fs -mkdir -p /mymapreduce5/in
hadoop fs -put /data/mapreduce5/orders1 /mymapreduce5/in
hadoop fs -put /data/mapreduce5/order_items1 /mymapreduce5/in
- IDEA中編寫Java代碼
-
package mapreduce5;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MapJoin {
public static class MyMapper extends Mapper<Object, Text, Text, Text>{
private Map<String, String> dict = new HashMap<>();
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
String fileName = context.getLocalCacheFiles()[0].getName();
//System.out.println(fileName);
BufferedReader reader = new BufferedReader(new FileReader(fileName));
String codeandname = null;
while (null != ( codeandname = reader.readLine() ) ) {
String
str[]=codeandname.split("\t");
dict.put(str[0], str[2]+"\t"+str[3]);
}
reader.close();
}
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] kv =
value.toString().split("\t");
if (dict.containsKey(kv[1])) {
context.write(new Text(kv[1]), new Text(dict.get(kv[1])+"\t"+kv[2]));
}
public static class MyReducer extends Reducer<Text, Text, Text, Text>{
protected void reduce(Text key, Iterable<Text> values, Context context)
for (Text text : values) {
context.write(key, text);
public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
Job job = Job.getInstance();
job.setJobName("mapjoin");
job.setJarByClass(MapJoin.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Path in = new Path("hdfs://192.168.149.10:9000/mymapreduce5/in/order_items1");
Path out = new Path("hdfs://192.168.149.10:9000/mymapreduce5/out");
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
URI uri = new URI("hdfs://192.168.149.10:9000/mymapreduce5/in/orders1");
job.addCacheFile(uri);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
- 将hadoop2lib目錄中的jar包,拷貝到hadoop2lib目錄下。
- 拷貝log4j.properties檔案
- 運作結果
運作失敗,顯示找不到檔案orders1,報錯filename
在filename前加上”Windows中存放orders1檔案的位址”
再次運作,運作成功
