Hadoop DistributedCache usage

Main idea:

DistributedCache distributes files to each node's local disk. The files are not deleted as soon as a job is done with them; instead, a dedicated thread cleans them up periodically (every 60 s by default, controlled by mapreduce.tasktracker.distributedcache.checkperiod), enforcing an upper limit on total size (local.cache.size, 10 GB by default) and on the number of files/directories (mapreduce.tasktracker.local.cache.numberdirectories, 10000 by default).

For the locally cached copies, a public file (one whose permission bits grant the execute bit x to the last group, i.e. other users) is shared by all users: it is downloaded only once and every user's jobs can then use it. If other users do not have that permission, the file is private and only jobs belonging to the owning user can reuse it.

Public files are kept in the shared directory ${mapred.local.dir}/taskTracker/distcache

Private files are kept in the per-user directory ${mapred.local.dir}/taskTracker/${user}
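
As an aside (a minimal sketch, not from the original post): local.cache.size is an ordinary long-valued property measured in bytes, so it can be overridden like any other Configuration setting; in practice it is a TaskTracker-side setting, normally placed in mapred-site.xml. The 20 GB value below is arbitrary.

import org.apache.hadoop.conf.Configuration;

public class CacheLimitSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // default limit is 10 GB; raise the local cache limit to 20 GB (value in bytes)
        conf.setLong("local.cache.size", 20L * 1024 * 1024 * 1024);
        System.out.println("local.cache.size = " + conf.getLong("local.cache.size", -1));
    }
}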

Two things to note:

1. The mapreduce.tasktracker.local.cache.numberdirectories and mapreduce.tasktracker.distributedcache.checkperiod parameters are not exposed through the corresponding XML configuration files; they are set in the source code, in the class TrackerDistributedCacheManager.

2. The cleanup mechanism in the old hadoop-0.20.2 DistributedCache is much simpler: each time a cache file is downloaded, if the total size is found to exceed 10 GB a cleanup pass is run that deletes the local cache files not currently in use by any job. It also seems to have no public/private distinction; I did not find one in its source code.

The relevant code:

public static Path getLocalCache(URI cache, Configuration conf,
    Path baseDir, FileStatus fileStatus,
    boolean isArchive, long confFileStamp,
    Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
  ......
  // setting the cache size to a default of 10GB
  long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
  if (allowedSize < size) {
    // try some cache deletions
    deleteCache(conf);
  }
  ......
}

private static void deleteCache(Configuration conf) throws IOException {
  // try deleting cache Status with refcount of zero
  synchronized (cachedArchives) {
    for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
      String cacheId = (String) it.next();
      CacheStatus lcacheStatus = cachedArchives.get(cacheId);
      synchronized (lcacheStatus) {
        if (lcacheStatus.refcount == 0) {
          // only delete cache entries that no running task is still using
          ......
          it.remove();
        }
      }
    }
  }
}

A DistributedCache usage example:

import org.apache.log4j.Logger;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;

/* Scan the source data set and count the occurrences of each frequent-2 candidate
 * itemset obtained in the previous step.
 *
 * command:
 * ./hadoop jar test71.jar com.hadoop.relationrule.Rule_Freq_2_Last hdfs://hadoopmaster:8085/user/hadoop/relationrule/input01/ hdfs://hadoopmaster:8085/user/hadoop/relationrule/freq_2_last/
 */
@SuppressWarnings("deprecation")
public class Rule_Freq_2_Last extends Configured implements Tool {
	

  public static class Freq_1_Mapper 
  extends Mapper<Object, Text, Text, IntWritable>  {
	  static Logger logger = Logger.getLogger(Freq_1_Mapper.class.getName());
	  
    // candidate frequent-2 itemsets loaded from the distributed cache file
    private ArrayList<ArrayList<String>> al = new ArrayList<ArrayList<String>>();
    @Override
    public void setup(Context context) throws IOException, InterruptedException {
      // the files registered with DistributedCache.addCacheFile() in run()
      Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
      if (paths != null && paths.length > 0) {
        String line;
        BufferedReader joinReader = new BufferedReader(new FileReader(paths[0].toString()));
        try {
          while ((line = joinReader.readLine()) != null) {
            logger.info("readLine::!!! " + line);
            // one candidate itemset per line, items separated by spaces;
            // a new list is created for every line so the entries of al stay independent
            ArrayList<String> itemset = new ArrayList<String>();
            for (String s : line.split(" ")) {
              itemset.add(s);
            }
            al.add(itemset);
          }
        } catch (Exception e) {
          e.printStackTrace();
        } finally {
          joinReader.close();
        }
      }
    }
	  
	  private Text outkey =new Text();
	  private IntWritable one =new IntWritable(1);
    public void map(Object key, Text value, Context context) {
      // items of the current transaction
      String[] vs = value.toString().split(" ");
      logger.info("value.toString()::!!! " + value.toString());

      // for every candidate 2-itemset, count how many of its items occur in
      // this transaction; if both do, emit (itemset, 1)
      for (int j = 0; j < al.size(); j++) {
        int count = 0;
        for (int k = 0; k < al.get(j).size(); k++) {
          for (String val : vs) {
            if (val.equals(al.get(j).get(k))) {
              count++;
            }
          }
        }
        if (count == 2) {
          outkey.set(al.get(j).get(0) + " " + al.get(j).get(1));
          try {
            context.write(outkey, one);
            logger.info("context.write(outkey, one)::!!! " + outkey.toString() + " " + one.toString());
          } catch (IOException e) {
            e.printStackTrace();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    }
	  
  }
    

  
  public static class Freq_1_Reducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      // keep only the itemsets whose support count reaches the threshold (2 here)
      if (sum >= 2) {
        result.set(sum);
        context.write(key, result);
      }
    }
  }
  
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // register the frequent-2 candidate file on HDFS with the distributed cache
    DistributedCache.addCacheFile(new Path("hdfs://hadoopmaster:8085/user/hadoop/relationrule/freq_2/part-r-00000").toUri(), conf);

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: Rule_Freq_2_Last <in> <out>");
      System.exit(2);
    }

    Job job = new Job(conf, "Rule_Freq_2_Last");
    job.setJarByClass(Rule_Freq_2_Last.class);
    job.setMapperClass(Freq_1_Mapper.class);
    job.setReducerClass(Freq_1_Reducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }


  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Rule_Freq_2_Last(), args);
    System.exit(res);
  }
}

There is another way to use it: you can have DistributedCache create, in the task's current working directory, a symlink to the cached file, either by calling DistributedCache.createSymlink(Configuration) or by setting the configuration property mapred.create.symlink to yes. The distributed cache uses the fragment of the URI (the part after #) as the name of the link.

For example:

Configuration conf = new Configuration();
DistributedCache.createSymlink(conf);
DistributedCache.addCacheFile(new Path("hdfs://hadoopmaster:8085/user/hive/warehouse/t_pd_page_expr/expr.txt#expr").toUri(), conf);

A link named expr (a shortcut, in effect) then appears in the task's current working directory, pointing to expr.txt, which makes reading the file in setup() much simpler:

BufferedReader br = new BufferedReader(new FileReader("expr"));

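The same effect can also be obtained without the API call, through the mapred.create.symlink property mentioned above (a minimal sketch of that variant, mirroring the snippet above):

Configuration conf = new Configuration();
// equivalent to calling DistributedCache.createSymlink(conf)
conf.set("mapred.create.symlink", "yes");
DistributedCache.addCacheFile(new Path("hdfs://hadoopmaster:8085/user/hive/warehouse/t_pd_page_expr/expr.txt#expr").toUri(), conf);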