天天看點

05.Mapreduce執行個體——Map端join

05.Mapreduce執行個體——Map端join

實驗原理

MapReduce提供了表連接配接操作其中包括Map端join、Reduce端join還有單表連接配接,現在我們要讨論的是Map端join,Map端join是指資料到達map處理函數之前進行合并的,效率要遠遠高于Reduce端join,因為Reduce端join是把所有的資料都經過Shuffle,非常消耗資源。

1.Map端join的使用場景:一張表資料十分小、一張表資料很大。

Map端join是針對以上場景進行的優化:将小表中的資料全部加載到記憶體,按關鍵字建立索引。大表中的資料作為map的輸入,對map()函數每一對<key,value>輸入,都能夠友善地和已加載到記憶體的小資料進行連接配接。把連接配接結果按key輸出,經過shuffle階段,reduce端得到的就是已經按key分組并且連接配接好了的資料。

為了支援檔案的複制,Hadoop提供了一個類DistributedCache,使用該類的方法如下:

(1)使用者使用靜态方法DistributedCache.addCacheFile()指定要複制的檔案,它的參數是檔案的URI(如果是HDFS上的檔案,可以這樣:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作業啟動之前會擷取這個URI清單,并将相應的檔案拷貝到各個TaskTracker的本地磁盤上。

(2)使用者使用DistributedCache.getLocalCacheFiles()方法擷取檔案目錄,并使用标準的檔案讀寫API讀取相應的檔案。

2.本實驗Map端Join的執行流程

(1)首先在送出作業的時候先将小表檔案放到該作業的DistributedCache中,然後從DistributeCache中取出該小表進行join連接配接的 <key ,value>鍵值對,将其解釋分割放到記憶體中(可以放大Hash Map等等容器中)。

(2)要重寫MyMapper類下面的setup()方法,因為這個方法是先于map方法執行的,将較小表先讀入到一個HashMap中。

(3)重寫map函數,一行行讀入大表的内容,逐一的與HashMap中的内容進行比較,若Key相同,則對資料進行格式化處理,然後直接輸出。

(4)map函數輸出的<key,value >鍵值對首先經過一個suffle把key值相同的所有value放到一個疊代器中形成values,然後将<key,values>鍵值對傳遞給reduce函數,reduce函數輸入的key直接複制給輸出的key,輸入的values通過增強版for循環周遊逐一輸出,循環的次數決定了<key,value>輸出的次數。

實驗步驟:

  1. 建兩個文本文檔,用逗号分隔開,資料如下

orders1表

訂單ID   訂單号          使用者ID    下單日期  

52304   111215052630    176474  2011-12-15 04:58:21  

52303   111215052629    178350  2011-12-15 04:45:31  

52302   111215052628    172296  2011-12-15 03:12:23  

52301   111215052627    178348  2011-12-15 02:37:32  

52300   111215052626    174893  2011-12-15 02:18:56  

52299   111215052625    169471  2011-12-15 01:33:46  

52298   111215052624    178345  2011-12-15 01:04:41  

52297   111215052623    176369  2011-12-15 01:02:20  

52296   111215052622    178343  2011-12-15 00:38:02  

52295   111215052621    178342  2011-12-15 00:18:43  

52294   111215052620    178341  2011-12-15 00:14:37  

52293   111215052619    178338  2011-12-15 00:13:07  

order_items1表

明細ID  訂單ID   商品ID  

252578  52293   1016840  

252579  52293   1014040  

252580  52294   1014200  

252581  52294   1001012  

252582  52294   1022245  

252583  52294   1014724  

252584  52294   1010731  

252586  52295   1023399  

252587  52295   1016840  

252592  52296   1021134  

252593  52296   1021133  

252585  52295   1021840  

252588  52295   1014040  

252589  52296   1014040  

252590  52296   1019043  

  1. 虛拟機中啟動Hadoop
  2. 建立/data/mapreduce5目錄

       mkdir -p /data/mapreduce5 

  1. 将兩個表上傳到虛拟機中
  2. 上傳并解壓hadoop2lib檔案
  3. 在HDFS上建立/mymapreduce5/in目錄,然後将Linux本地/data/mapreduce5目錄下的orders1和order_items1檔案導入到HDFS的/mymapreduce5/in目錄中。

         hadoop fs -mkdir -p /mymapreduce5/in 

         hadoop fs -put /data/mapreduce5/orders1 /mymapreduce5/in 

         hadoop fs -put /data/mapreduce5/order_items1 /mymapreduce5/in 

  1. IDEA中編寫Java代碼
  2. package mapreduce5;

    import java.io.BufferedReader;

    import java.io.FileReader;

    import java.io.IOException;

    import java.net.URI;

    import java.net.URISyntaxException;

    import java.util.HashMap;

    import java.util.Map;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MapJoin {

        public static class MyMapper extends Mapper<Object, Text, Text, Text>{

            private Map<String, String> dict = new HashMap<>();

            @Override

            protected void setup(Context context) throws IOException,

                    InterruptedException {

                String fileName = context.getLocalCacheFiles()[0].getName();

                //System.out.println(fileName);

                BufferedReader reader = new BufferedReader(new FileReader(fileName));

                String codeandname = null;

                while (null != ( codeandname = reader.readLine() ) ) {

                    String

    str[]=codeandname.split("\t");

                    dict.put(str[0], str[2]+"\t"+str[3]);

                }

                reader.close();

            }

            protected void map(Object key, Text value, Context context)

                    throws IOException, InterruptedException {

                String[] kv =

    value.toString().split("\t");

                if (dict.containsKey(kv[1])) {

                    context.write(new Text(kv[1]), new Text(dict.get(kv[1])+"\t"+kv[2]));

        }

        public static class MyReducer extends Reducer<Text, Text, Text, Text>{

            protected void reduce(Text key, Iterable<Text> values, Context context)

                for (Text text : values) {

                    context.write(key, text);

        public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException {

            Job job = Job.getInstance();

            job.setJobName("mapjoin");

            job.setJarByClass(MapJoin.class);

            job.setMapperClass(MyMapper.class);

            job.setReducerClass(MyReducer.class);

            job.setOutputKeyClass(Text.class);

            job.setOutputValueClass(Text.class);

            Path in = new Path("hdfs://192.168.149.10:9000/mymapreduce5/in/order_items1");

            Path out = new Path("hdfs://192.168.149.10:9000/mymapreduce5/out");

            FileInputFormat.addInputPath(job, in);

            FileOutputFormat.setOutputPath(job, out);

            URI uri = new URI("hdfs://192.168.149.10:9000/mymapreduce5/in/orders1");

            job.addCacheFile(uri);

            System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

  3. 将hadoop2lib目錄中的jar包,拷貝到hadoop2lib目錄下。
  4. 拷貝log4j.properties檔案
  5. 運作結果

運作失敗,顯示找不到檔案orders1,報錯filename

在filename前加上”Windows中存放orders1檔案的位址”

再次運作,運作成功

05.Mapreduce執行個體——Map端join
05.Mapreduce執行個體——Map端join
05.Mapreduce執行個體——Map端join
05.Mapreduce執行個體——Map端join