天天看點

hadoop 分布式緩存

Hadoop 分布式緩存實作目的是在所有的MapReduce調用一個統一的配置檔案,首先将緩存檔案放置在HDFS中,然後程式在執行的過程中會可以通過設定将檔案下載下傳到本地具體設定如下:

public static void main(String[] arge) throws IOException, ClassNotFoundException, InterruptedException{

        Configuration conf=new Configuration();

        conf.set("fs.default.name", "hdfs://192.168.1.45:9000");

        FileSystem fs=FileSystem.get(conf);

        fs.delete(new Path("CASICJNJP/gongda/Test_gd20140104"));

        conf.set("mapred.job.tracker", "192.168.1.45:9001");

        conf.set("mapred.jar", "/home/hadoop/workspace/jar/OBDDataSelectWithImeiTxt.jar");

        Job job=new Job(conf,"myTaxiAnalyze");

        DistributedCache.createSymlink(job.getConfiguration());//

        try {

            DistributedCache.addCacheFile(new URI("/user/hadoop/CASICJNJP/DistributeFiles/imei.txt"), job.getConfiguration());

        } catch (URISyntaxException e1) {

            // TODO Auto-generated catch block

            e1.printStackTrace();

        }            

        job.setMapperClass(OBDDataSelectMaper.class);

        job.setReducerClass(OBDDataSelectReducer.class);

        //job.setNumReduceTasks(10);

        //job.setCombinerClass(IntSumReducer.class);

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path("/user/hadoop/CASICJNJP/SortedData/20140104"));

        FileOutputFormat.setOutputPath(job, new Path("CASICJNJP/gongda/SelectedData"));

        System.exit(job.waitForCompletion(true)?0:1);

    }

    代碼中标紅的為将HDFS中的/user/hadoop/CASICJNJP/DistributeFiles/imei.txt作為分布式緩存

public class OBDDataSelectMaper extends Mapper<Object, Text, Text, Text> {

    String[] strs;

    String[] ImeiTimes;

    String timei;

    String time;

    private java.util.List<Integer> ImeiList = new java.util.ArrayList<Integer>();

    protected void setup(Context context) throws IOException,

            InterruptedException {

        try {

            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context

                    .getConfiguration());

            if (cacheFiles != null && cacheFiles.length > 0) {

                String line;

                BufferedReader br = new BufferedReader(new FileReader(

                        cacheFiles[0].toString()));

                try {

                    line = br.readLine();

                    while ((line = br.readLine()) != null) {

                        ImeiList.add(Integer.parseInt(line));

                    }

                } finally {

                    br.close();

                }

            }

        } catch (IOException e) {

            System.err.println("Exception reading DistributedCache: " + e);

        }

    public void map(Object key, Text value, Context context)

            throws IOException, InterruptedException {

            strs = value.toString().split("\t");

            ImeiTimes = strs[0].split("_");

            timei = ImeiTimes[0];

            if (ImeiList.contains(Integer.parseInt(timei))) {

                context.write(new Text(strs[0]), value);

        } catch (Exception ex) {

}

上述标紅代碼中在Map的setup函數中加載分布式緩存。