天天看點

Hadoop學習(2)-java用戶端操作hdfs及secondarynode作用

Hadoop學習(2)-java用戶端操作hdfs及secondarynode作用

首先要在windows下解壓一個windows版本的hadoop

然後在配置他的環境變量,同時要把hadoop的share目錄下的hadoop下的相關jar包拷貝到esclipe

然後Build Path

下面上代碼

複制代碼

import java.io.BufferedReader;

import java.io.FileInputStream;

import java.io.IOException;

import java.io.InputStreamReader;

import java.net.URI;

import java.net.URISyntaxException;

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.LocatedFileStatus;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.fs.RemoteIterator;

import org.junit.Before;

import org.junit.Test;

public class HdfsClientDemo {

public static void main(String[] args) throws Exception {
    /**
     * Configuration參數對象的機制:
     *    構造時,會加載jar包中的預設配置 xx-default.xml
     *    再加載 使用者配置xx-site.xml  ,覆寫掉預設參數
     *    構造完成之後,還可以conf.set("p","v"),會再次覆寫使用者配置檔案中的參數值
     */
    // new Configuration()會從項目的classpath中加載core-default.xml hdfs-default.xml core-site.xml hdfs-site.xml等檔案
    Configuration conf = new Configuration();
    
    // 指定本用戶端上傳檔案到hdfs時需要儲存的副本數為:2
    conf.set("dfs.replication", "2");
    // 指定本用戶端上傳檔案到hdfs時切塊的規格大小:64M
    conf.set("dfs.blocksize", "64m");
    
    // 構造一個通路指定HDFS系統的用戶端對象: 參數1:——HDFS系統的URI,參數2:——用戶端要特别指定的參數,參數3:用戶端的身份(使用者名)
    FileSystem fs = FileSystem.get(new URI("hdfs://172.31.2.38:9000/"), conf, "root");
    
    // 上傳一個檔案到HDFS中
    fs.copyFromLocalFile(new Path("D:/install-pkgs/hbase-1.2.1-bin.tar.gz"), new Path("/aaa/"));
    
    fs.close();
}

FileSystem fs = null;

@Before
public void init() throws Exception{
    Configuration conf = new Configuration();
    conf.set("dfs.replication", "2");
    conf.set("dfs.blocksize", "64m");
    
    fs = FileSystem.get(new URI("hdfs://172.31.2.38:9000/"), conf, "root");
    
}


/**
 * 從HDFS中下載下傳檔案到用戶端本地磁盤
 * @throws IOException 
 * @throws IllegalArgumentException 
 */
@Test
public void testGet() throws IllegalArgumentException, IOException{
    
    fs.copyToLocalFile(new Path("/test"), new Path("d:/"));
    fs.close();
    
}


/**
 * 在hdfs内部移動檔案\修改名稱
 */
@Test
public void testRename() throws Exception{
    
    fs.rename(new Path("/install.log"), new Path("/aaa/in.log"));
    
    fs.close();
    
}

/**
 * 在hdfs中建立檔案夾
 */
@Test
public void testMkdir() throws Exception{
    
    fs.mkdirs(new Path("/xx/yy/zz"));
    
    fs.close();
}


/**
 * 在hdfs中删除檔案或檔案夾
 */
@Test
public void testRm() throws Exception{
    
    fs.delete(new Path("/aaa"), true);
    
    fs.close();
}



/**
 * 查詢hdfs指定目錄下的檔案資訊
 */
@Test
public void testLs() throws Exception{
    // 隻查詢檔案的資訊,不傳回檔案夾的資訊
    RemoteIterator<LocatedFileStatus> iter = fs.listFiles(new Path("/"), true);
    
    while(iter.hasNext()){
        LocatedFileStatus status = iter.next();
        System.out.println("檔案全路徑:"+status.getPath());
        System.out.println("塊大小:"+status.getBlockSize());
        System.out.println("檔案長度:"+status.getLen());
        System.out.println("副本數量:"+status.getReplication());
        System.out.println("塊資訊:"+Arrays.toString(status.getBlockLocations()));
        
        System.out.println("--------------------------------");
    }
    fs.close();
}
           
/**
 * 讀取hdfs中的檔案的内容
 * 
 * @throws IOException
 * @throws IllegalArgumentException
 */
@Test
public void testReadData() throws IllegalArgumentException, IOException {

    FSDataInputStream in = fs.open(new Path("/test.txt"));

    BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8"));

    String line = null;
    while ((line = br.readLine()) != null) {
        System.out.println(line);
    }

    br.close();
    in.close();
    fs.close();

}

/**
 * 讀取hdfs中檔案的指定偏移量範圍的内容
 * 
 * 
 * 
 * @throws IOException
 * @throws IllegalArgumentException
 */
@Test
public void testRandomReadData() throws IllegalArgumentException, IOException {

    FSDataInputStream in = fs.open(new Path("/xx.dat"));

    // 将讀取的起始位置進行指定
    in.seek(12);

    // 讀16個位元組
    byte[] buf = new byte[16];
    in.read(buf);

    System.out.println(new String(buf));

    in.close();
    fs.close();

}

/**
 * 往hdfs中的檔案寫内容
 * 
 * @throws IOException
 * @throws IllegalArgumentException
 */

@Test
public void testWriteData() throws IllegalArgumentException, IOException {

    FSDataOutputStream out = fs.create(new Path("/zz.jpg"), false);

    // D:\images\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg

    FileInputStream in = new FileInputStream("D:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg");

    byte[] buf = new byte[1024];
    int read = 0;
    while ((read = in.read(buf)) != -1) {
        out.write(buf,0,read);
    }
    
    in.close();
    out.close();
    fs.close();

}
           

}

練習:從一個檔案裡面不斷地采集日志上傳到hdfs裡面

1.流程介紹

---啟動一個定時任務

    --定時探測日志原目錄

    --擷取檔案上傳到一個待上傳的臨時目錄

    --逐一上傳到hdfs目标路徑,同時移動到備份目錄裡

--啟動一個定時任務:

    --探測備份目錄中的備份資料是否已經超出,如果超出就删除

主類為:

import java.util.Timer;

public class DataCollectMain {

public static void main(String[] args) {
    
    Timer timer = new Timer();
    //第一個為task類,第二個開始時間 第三個沒隔多久執行一次
    timer.schedule(new CollectTask(), 0, 60*60*1000L);
    
    timer.schedule(new BackupCleanTask(), 0, 60*60*1000L);
    
}

           

CollectTask類:

這個類要繼承TimerTask,重寫run方法,主要内容就是不斷收集日志檔案

package cn.edu360.hdfs.datacollect;

import java.io.File;

import java.io.FilenameFilter;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.Properties;

import java.util.TimerTask;

import java.util.UUID;

import org.apache.commons.io.FileUtils;

import org.apache.log4j.Logger;

public class CollectTask extends TimerTask {

@Override
public void run() {
    try {
        // 擷取配置參數
        Properties props = PropertyHolderLazy.getProps();

        // 擷取本次采集時的日期
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
        String day = sdf.format(new Date());
        
        File srcDir = new File("d:/logs/accesslog");
        // 列出日志源目錄中需要采集的檔案
        //裡面傳了一個檔案過濾器,重寫accept方法,return true就要
        File[] listFiles = srcDir.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String name) {
                if (name.startsWith("access.log")) {
                    return true;
                }
                return false;
            }
        });
        // 将要采集的檔案移動到待上傳臨時目錄
        File toUploadDir = new File("d:/logs/toupload");
        for (File file : listFiles) {
            
            //這裡如果是 file.renameTo(toUploadDir)是不對的,因為會生成一個toupload的檔案而不是檔案夾
            //要用renameTo的話你要自己加上檔案的新名字比較麻煩
            //用FileUtiles是對file操作的一些工具類
            //第一個目标檔案,第二個路徑,第三個是否存在覆寫
            FileUtils.moveFileToDirectory(file, toUploadDir, true);
        }

        // 構造一個HDFS的用戶端對象
        FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"), new Configuration(), "root");
        
        File[] toUploadFiles = toUploadDir.listFiles();

        // 檢查HDFS中的日期目錄是否存在,如果不存在,則建立
        Path hdfsDestPath = new Path("/logs" + day);
        if (!fs.exists(hdfsDestPath)) {
            fs.mkdirs(hdfsDestPath);
        }

        // 檢查本地的備份目錄是否存在,如果不存在,則建立
        File backupDir = new File("d:/logs/backup" + day + "/");
        if (!backupDir.exists()) {
            backupDir.mkdirs();
        }

        for (File file : toUploadFiles) {
            // 傳輸檔案到HDFS并改名access_log_
            fs.copyFromLocalFile(new Path(file.getAbsolutePath()), new Path("/logs"+day+"/access_log_"+UUID.randomUUID()+".log"));

            // 将傳輸完成的檔案移動到備份目錄
            //注意這裡依然不要用renameTo
            FileUtils.moveFileToDirectory(file, backupDir, true);
        }

    } catch (Exception e) {
        e.printStackTrace();
    }

}           

/**

* 讀取hdfs中的檔案的内容
 * 
 * @throws IOException
 * @throws IllegalArgumentException
 */
@Test
public void testReadData() throws IllegalArgumentException, IOException {

    FSDataInputStream in = fs.open(new Path("/test.txt"));

    BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8"));

    String line = null;
    while ((line = br.readLine()) != null) {
        System.out.println(line);
    }

    br.close();
    in.close();
    fs.close();

}

/**
 * 讀取hdfs中檔案的指定偏移量範圍的内容
 * 
 * 
 * 作業題:用本例中的知識,實作讀取一個文本檔案中的指定BLOCK塊中的所有資料
 * 
 * @throws IOException
 * @throws IllegalArgumentException
 */
@Test
public void testRandomReadData() throws IllegalArgumentException, IOException {

    FSDataInputStream in = fs.open(new Path("/xx.dat"));

    // 将讀取的起始位置進行指定
    in.seek(12);

    // 讀16個位元組
    byte[] buf = new byte[16];
    in.read(buf);

    System.out.println(new String(buf));

    in.close();
    fs.close();

}

/**
 * 往hdfs中的檔案寫内容
 * 
 * @throws IOException
 * @throws IllegalArgumentException
 */

@Test
public void testWriteData() throws IllegalArgumentException, IOException {

    FSDataOutputStream out = fs.create(new Path("/zz.jpg"), false);

    // D:\images\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg

    FileInputStream in = new FileInputStream("D:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg");

    byte[] buf = new byte[1024];
    int read = 0;
    while ((read = in.read(buf)) != -1) {
        out.write(buf,0,read);
    }
    
    in.close();
    out.close();
    fs.close();

}           

BackupCleanTask類

public class BackupCleanTask extends TimerTask {

@Override
public void run() {

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
    long now = new Date().getTime();
    try {
        // 探測本地備份目錄
        File backupBaseDir = new File("d:/logs/backup/");
        File[] dayBackDir = backupBaseDir.listFiles();

        // 判斷備份日期子目錄是否已超24小時
        for (File dir : dayBackDir) {
            long time = sdf.parse(dir.getName()).getTime();
            if(now-time>24*60*60*1000L){
                FileUtils.deleteDirectory(dir);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }

}
           

hdfs中namenode中儲存中繼資料(對資料的描述資訊)是在記憶體中以樹的形式儲存的,并且每隔一段時間都會把這些中繼資料序列化到磁盤中。序列化的東西在磁盤中叫 fsimage檔案。

中繼資料可能會很大很大,是以隻能是定期的序列化

問題1:序列化的時候,發生了中繼資料的修改怎麼辦

答:namenode會把每次使用者的操作都記錄下來,記錄成日志檔案,存在edits日志檔案中

其中edits日志檔案也會像log4j滾動日志檔案一樣,當檔案太大的時候會另起一個檔案并改名字

問題2:當edits檔案太多的時候,一次當機也會花大量的時間從edits裡恢複,怎麼辦

答:會定期吧edits檔案重放fsimage檔案,并記錄edits的編号,把那些重放過的日志檔案給删除。這樣也相當于重新序列化了,

是以namenode并不會做這樣的事情,是由secondary node做的,他會定期吧namenode的fsimage檔案和edits檔案下載下傳下來

并把fsimage檔案反序列化,并且讀日志檔案更新中繼資料,然後序列化到磁盤,然後把他上傳給namenode。

這個機制叫做checkpoint機制

這裡secondarynode 相當一一個小秘書

用戶端寫資料到hdfs的流程

上面是建立響應的過程

然後 是傳遞檔案block塊的過程

用戶端從hdfs讀資料流程

額外知識點

注意,在windows裡面不要寫有些路徑不要寫絕對路徑,因為程式放到linux下面可能會找不到,是以報錯

一般使用class加載器,這樣當這個class加載的時候就會知道這個class在哪

類加載器的一些使用例子

比如我加載一個配置檔案,為了避免出現絕對路徑,我們可以是用類加載器

     Properties props = new Properties();

//加載配置檔案,這樣寫的目的是為了避免在windows裡出現絕對路徑,用類加載器,再把檔案傳化成流
    props.load(HdfsWordcount.class.getClassLoader().getResourceAsStream("job.properties"));
           

而對于一些功能性的類,我們最好在寫邏輯的時候也不要直接去導入這個包,而是使用Class.forName

//這樣不直接導入這個包,直接用類加載器,是面向接口程式設計的一種思想。這裡我并不是在開始import xxxx.Mapper,這裡Mapper是一個接口,這裡我用了多态

Class<?> mapper_class = Class.forName(props.getProperty("MAPPER_CLASS"));
    Mapper mapper = (Mapper) mapper_class.newInstance();
           

單例模式

https://www.cnblogs.com/crazy-wang-android/p/9054771.html

隻有個一執行個體,必須自己建立自己這個執行個體,必須為别人提供這個執行個體

餓漢式單例:就算沒有人調用這個class,他也會加載進去;

如對于一個配置檔案的加載

  • 單例設計模式,方式一: 餓漢式單例

    *

*/

public class PropertyHolderHungery {

private static Properties prop = new Properties();

static {
    try {
        //将一個檔案prop.load(stram)  
        //這裡面如果傳一個IO流不好,因為要用到絕對路徑,使用了類加載器  這種不管有沒有使用這個類都會加載
        prop.load(PropertyHolderHungery.class.getClassLoader().getResourceAsStream("collect.properties"));
    } catch (Exception e) {

    }
}
public static Properties getProps() throws Exception {
    return prop;
}
           

懶漢式:隻有調用的時候才會有,但會有線程安全問題

  • 單例模式:懶漢式——考慮了線程安全

public class PropertyHolderLazy {

private static Properties prop = null;

public static Properties getProps() throws Exception {
    if (prop == null) {
        synchronized (PropertyHolderLazy.class) {
            if (prop == null) {
                prop = new Properties();
                prop.load(PropertyHolderLazy.class.getClassLoader().getResourceAsStream("collect.properties"));
            }
        }
    }
    return prop;
}
           

原文位址

https://www.cnblogs.com/wpbing/archive/2019/07/23/11233399.html