天天看點

hadoop學習筆記——NO.5_HADOOP分布式檔案系統HDFSHadoop分布式檔案系統HDFS

Hadoop分布式檔案系統HDFS

HDFS前言

設計思想

分而治之:将大檔案、大批量檔案,分布式存放在大量伺服器上,以便于采取分而治之的方式對海量資料進行運算分析
           

在大資料系統中作用

為各類分布式運算架構(如:mapreduce,spark,tez,……)提供資料存儲服務
           

重點概念:檔案切塊,副本存放,中繼資料

HDFS的概念和特性

首先,它是一個檔案系統,用于存儲檔案,通過統一的命名空間——目錄樹來定位檔案。
其次,它是分布式的,由很多伺服器聯合起來實作其功能,叢集中的伺服器有各自的角色。
           

重要特性如下:

  1. HDFS中的檔案在實體上是分塊存儲(block),塊的大小可以通過配置參數( dfs.blocksize)來規定,預設大小在hadoop2.x版本中是128M,老版本中是64M
  2. HDFS檔案系統會給用戶端提供一個統一的抽象目錄樹,用戶端通過路徑來通路檔案,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.data
  3. 目錄結構及檔案分塊資訊(中繼資料)的管理由namenode節點承擔

    ——namenode是HDFS叢集主節點,負責維護整個hdfs檔案系統的目錄樹,以及每一個路徑(檔案)所對應的block塊資訊(block的id,及所在的datanode伺服器)

  4. 檔案的各個block的存儲管理由datanode節點承擔

    —- datanode是HDFS叢集從節點,每一個block都可以在多個datanode上存儲多個副本(副本數量也可以通過參數設定dfs.replication)

  5. HDFS是設計成适應一次寫入,多次讀出的場景,且不支援檔案的修改

    (注:适合用來做資料分析,并不适合用來做網盤應用,因為,不便修改,延遲大,網絡開銷大,成本太高)

HDFS基本操作

HDFS的shell(指令行用戶端)操作

指令行用戶端支援的指令參數
           
$hadoop fs -ls /

        [-appendToFile <localsrc> ... <dst>]
        [-cat [-ignoreCrc] <src> ...]
        [-checksum <src> ...]
        [-chgrp [-R] GROUP PATH...]
        [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
        [-chown [-R] [OWNER][:[GROUP]] PATH...]
        [-copyFromLocal [-f] [-p] <localsrc> ... <dst>]
        [-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
        [-count [-q] <path> ...]
        [-cp [-f] [-p] <src> ... <dst>]
        [-createSnapshot <snapshotDir> [<snapshotName>]]
        [-deleteSnapshot <snapshotDir> <snapshotName>]
        [-df [-h] [<path> ...]]
        [-du [-s] [-h] <path> ...]
        [-expunge]
        [-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
        [-getfacl [-R] <path>]
        [-getmerge [-nl] <src> <localdst>]
        [-help [cmd ...]]
        [-ls [-d] [-h] [-R] [<path> ...]]
        [-mkdir [-p] <path> ...]
        [-moveFromLocal <localsrc> ... <dst>]
        [-moveToLocal <src> <localdst>]
        [-mv <src> ... <dst>]
        [-put [-f] [-p] <localsrc> ... <dst>]
        [-renameSnapshot <snapshotDir> <oldName> <newName>]
        [-rm [-f] [-r|-R] [-skipTrash] <src> ...]
        [-rmdir [--ignore-fail-on-non-empty] <dir> ...]
        [-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
        [-setrep [-R] [-w] <rep> <path> ...]
        [-stat [format] <path> ...]
        [-tail [-f] <file>]
        [-test -[defsz] <path>]
        [-text [-ignoreCrc] <src> ...]
        [-touchz <path> ...]
        [-usage [cmd ...]]
           

常用指令參數介紹

-help             
功能:輸出這個指令參數手冊
-ls                  
功能:顯示目錄資訊
示例: hadoop fs -ls hdfs://hadoop-server01:9000/
備注:這些參數中,所有的hdfs路徑都可以簡寫
hadoop fs -ls /   等同于上一條指令的效果

-mkdir              
功能:在hdfs上建立目錄
示例:hadoop fs  -mkdir  -p  /aaa/bbb/cc/dd

-moveFromLocal            
功能:從本地剪切粘貼到hdfs
示例:hadoop  fs  - moveFromLocal  /home/hadoop/a.txt  /aaa/bbb/cc/dd

-moveToLocal              
功能:從hdfs剪切粘貼到本地
示例:hadoop  fs  - moveToLocal   /aaa/bbb/cc/dd  /home/hadoop/a.txt 

--appendToFile  
功能:追加一個檔案到已經存在的檔案末尾
示例:hadoop  fs  -appendToFile  ./hello.txt 
hdfs://hadoop-server01:9000/hello.txt
可以簡寫為:
hadoop  fs  -appendToFile  ./hello.txt  /hello.txt

-cat  
功能:顯示檔案内容  
示例:hadoop fs -cat  /hello.txt

-tail                 
功能:顯示一個檔案的末尾
示例:hadoop  fs  -tail  /weblog/access_log

-text                  
功能:以字元形式列印一個檔案的内容
示例:hadoop  fs  -text  /weblog/access_log

-chgrp 
-chmod
-chown
功能:linux檔案系統中的用法一樣,對檔案所屬權限
示例:
hadoop  fs  -chmod    /hello.txt
hadoop  fs  -chown  someuser:somegrp   /hello.txt

-copyFromLocal    
功能:從本地檔案系統中拷貝檔案到hdfs路徑去
示例:hadoop  fs  -copyFromLocal  ./jdk.tar.gz  /aaa/

-copyToLocal      
功能:從hdfs拷貝到本地
示例:hadoop fs -copyToLocal /aaa/jdk.tar.gz

-cp              
功能:從hdfs的一個路徑拷貝hdfs的另一個路徑
示例: hadoop  fs  -cp  /aaa/jdk.tar.gz  /bbb/jdk.tar.gz

-mv                     
功能:在hdfs目錄中移動檔案
示例: hadoop  fs  -mv  /aaa/jdk.tar.gz  /

-get              
功能:等同于copyToLocal,就是從hdfs下載下傳檔案到本地
示例:hadoop fs -get  /aaa/jdk.tar.gz

-getmerge             
功能:合并下載下傳多個檔案
示例:比如hdfs的目錄 /aaa/下有多個檔案:log, log,log,...
hadoop fs -getmerge /aaa/log.* ./log.sum

-put                
功能:等同于copyFromLocal
示例:hadoop  fs  -put  /aaa/jdk.tar.gz  /bbb/jdk.tar.gz

-rm                
功能:删除檔案或檔案夾
示例:hadoop fs -rm -r /aaa/bbb/

-rmdir                 
功能:删除空目錄
示例:hadoop  fs  -rmdir   /aaa/bbb/ccc
-df               
功能:統計檔案系統的可用空間資訊
示例:hadoop  fs  -df  -h  /

-count         
功能:統計一個指定目錄下的檔案節點數量
示例:hadoop fs -count /aaa/

-setrep                
功能:設定hdfs中檔案的副本數量
示例:hadoop fs -setrep  /aaa/jdk.tar.gz

-du 
功能:統計檔案夾的大小資訊
示例:hadoop  fs  -du  -s  -h /aaa/*
           

HDFS原理

hdfs的工作機制

工作機制的學習主要是為加深對分布式系統的了解,以及增強遇到各種問題時的分析解決能力,形成一定的叢集運維能力
很多不是真正了解hadoop技術體系的人會常常覺得HDFS可用于網盤類應用,但實際并非如此。要想将技術準确用在恰當的地方,必須對技術有深刻的了解
           

概述

  1. HDFS叢集分為兩大角色:NameNode、DataNode
  2. NameNode負責管理整個檔案系統的中繼資料
  3. DataNode 負責管理使用者的檔案資料塊
  4. 檔案會按照固定的大小(blocksize)切成若幹塊後分布式存儲在若幹台datanode上
  5. 每一個檔案塊可以有多個副本,并存放在不同的datanode上
  6. Datanode會定期向Namenode彙報自身所儲存的檔案block資訊,而namenode則會負責保持檔案的副本數量
  7. HDFS的内部工作機制對用戶端保持透明,用戶端請求通路HDFS都是通過向namenode申請來進行

HDFS寫資料流程

用戶端要向HDFS寫資料,首先要跟namenode通信以确認可以寫檔案并獲得接收檔案block的datanode,然後,用戶端按順序将檔案逐個block傳遞給相應datanode,并由接收到block的datanode負責向其他datanode複制block的副本
           

詳細步驟圖

hadoop學習筆記——NO.5_HADOOP分布式檔案系統HDFSHadoop分布式檔案系統HDFS

詳細步驟解析

  1. 根namenode通信請求上傳檔案,namenode檢查目标檔案是否已存在,父目錄是否存在
  2. namenode傳回是否可以上傳
  3. client請求第一個 block該傳輸到哪些datanode伺服器上
  4. namenode傳回3個datanode伺服器ABC
  5. client請求3台dn中的一台A上傳資料(本質上是一個RPC調用,建立pipeline),A收到請求會繼續調用B,然後B調用C,将真個pipeline建立完成,逐級傳回用戶端
  6. client開始往A上傳第一個block(先從磁盤讀取資料放到一個本地記憶體緩存),以packet為機關,A收到一個packet就會傳給B,B傳給C;A每傳一個packet會放入一個應答隊列等待應答
  7. 當一個block傳輸完成之後,client再次請求namenode上傳第二個block的伺服器。

HDFS讀資料流程

用戶端将要讀取的檔案路徑發送給namenode,namenode擷取檔案的元資訊(主要是block的存放位置資訊)傳回給用戶端,用戶端根據傳回的資訊找到相應datanode逐個擷取檔案的block并在用戶端本地進行資料追加合并進而獲得整個檔案
           

詳細步驟圖

hadoop學習筆記——NO.5_HADOOP分布式檔案系統HDFSHadoop分布式檔案系統HDFS

詳細步驟解析

  1. 跟namenode通信查詢中繼資料,找到檔案塊所在的datanode伺服器
  2. 挑選一台datanode(就近原則,然後随機)伺服器,請求建立socket流
  3. datanode開始發送資料(從磁盤裡面讀取資料放入流,以packet為機關來做校驗)
  4. 用戶端以packet為機關接收,現在本地緩存,然後寫入目标檔案

NAMENODE工作機制

學習目标:

了解namenode的工作機制尤其是中繼資料管理機制,以增強對HDFS工作原理的了解,及培養hadoop叢集營運中“性能調優”、“namenode”故障問題的分析解決能力

問題場景:

1. 叢集啟動後,可以檢視檔案,但是上傳檔案時報錯,打開web頁面可看到namenode正處于safemode狀态,怎麼處理?

2. Namenode伺服器的磁盤故障導緻namenode當機,如何挽救叢集及資料?

3. Namenode是否可以有多個?namenode記憶體要配置多大?namenode跟叢集資料存儲能力有關系嗎?

4. 檔案的blocksize究竟調大好還是調小好?

……

諸如此類問題的回答,都需要基于對namenode自身的工作原理的深刻了解

NameNode職責

負責用戶端請求的響應

中繼資料的管理(查詢,修改)

中繼資料管理

namenode對資料的管理采用了三種存儲形式:

1. 記憶體中繼資料(NameSystem)

2. 磁盤中繼資料鏡像檔案

3. 資料記錄檔檔案(可通過日志運算出中繼資料)

中繼資料存儲機制

  1. 記憶體中有一份完整的中繼資料(記憶體meta data)
  2. 磁盤有一個“準完整”的中繼資料鏡像(fsimage)檔案(在namenode的工作目錄中)
  3. 用于銜接記憶體metadata和持久化中繼資料鏡像fsimage之間的記錄檔(edits檔案)注:當用戶端對hdfs中的檔案進行新增或者修改操作,操作記錄首先被記入edits日志檔案中,當用戶端操作成功後,相應的中繼資料會更新到記憶體meta.data中

中繼資料手動檢視

可以通過hdfs的一個工具來檢視edits中的資訊

bin/hdfs oev -i edits -o edits.xml
bin/hdfs oiv -i fsimage_0000000000000000087 -p XML -o fsimage.xml
           

中繼資料的checkpoint

每隔一段時間,會由secondary namenode将namenode上積累的所有edits和一個最新的fsimage下載下傳到本地,并加載到記憶體進行merge(這個過程稱為checkpoint)
           
checkpoint的詳細過程
hadoop學習筆記——NO.5_HADOOP分布式檔案系統HDFSHadoop分布式檔案系統HDFS
checkpoint操作的觸發條件配置參數
dfs.namenode.checkpoint.check.period=  #檢查觸發條件是否滿足的頻率,60秒
dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary
#以上兩個參數做checkpoint操作時,secondary namenode的本地工作目錄
dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir}

dfs.namenode.checkpoint.max-retries=  #最大重試次數
dfs.namenode.checkpoint.period=  #兩次checkpoint之間的時間間隔3600秒
dfs.namenode.checkpoint.txns= #兩次checkpoint之間最大的操作記錄
checkpoint的附帶作用
namenode和secondary namenode的工作目錄存儲結構完全相同,是以,當namenode故障退出需要重新恢複時,可以從secondary namenode的工作目錄中将fsimage拷貝到namenode的工作目錄,以恢複namenode的中繼資料
           

DataNode的工作機制

問題場景:

1. 叢集容量不夠,怎麼擴容?

2. 如果有一些datanode當機,該怎麼辦?

3. datanode明明已啟動,但是叢集中的可用datanode清單中就是沒有,怎麼辦?

……

以上這類問題的解答,有賴于對datanode工作機制的深刻了解

概述

1. Datanode工作職責:

存儲管理使用者的檔案塊資料

定期向namenode彙報自身所持有的block資訊(通過心跳資訊上報)

(這點很重要,因為,當叢集中發生某些block副本失效時,叢集如何恢複block初始副本數量的問題)

<property>
    <name>dfs.blockreport.intervalMsec</name>
    <value>3600000</value>
    <description>Determines block reporting interval in milliseconds.</description>
</property>
           
2. DataNode掉線判斷時限參數

datanode程序死亡或者網絡故障造成datanode無法與namenode通信,namenode不會立即把該節點判定為死亡,要經過一段時間,這段時間暫稱作逾時時長。HDFS預設的逾時時長為10分鐘+30秒。如果定義逾時時間為timeout,則逾時時長的計算公式為:

timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval。

而預設的heartbeat.recheck.interval 大小為5分鐘,dfs.heartbeat.interval預設為3秒。

需要注意的是hdfs-site.xml 配置檔案中的heartbeat.recheck.interval的機關為毫秒,dfs.heartbeat.interval的機關為秒。是以,舉個例子,如果heartbeat.recheck.interval設定為5000(毫秒),dfs.heartbeat.interval設定為3(秒,預設),則總的逾時時間為40秒。

<property>
        <name>heartbeat.recheck.interval</name>
        <value>2000</value>
</property>
<property>
        <name>dfs.heartbeat.interval</name>
        <value>1</value>
</property>
           

觀察驗證DATANODE功能

上傳一個檔案,觀察檔案的block具體的實體存放情況:
           

在每一台datanode機器上的這個目錄中能找到檔案的切塊:

HDFS應用開發

HDFS的java操作

hdfs在生産應用中主要是用戶端的開發,其核心步驟是從hdfs提供的api中構造一個HDFS的通路用戶端對象,然後通過該用戶端對象操作(增删改查)HDFS上的檔案
           

搭建開發環境

1. 引入依賴

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.1</version>
</dependency>
           
注:如需手動引入jar包,hdfs的jar包—-hadoop的安裝目錄的share下

2. window下開發的說明

建議在linux下進行hadoop應用的開發,不會存在相容性問題。如在window上做用戶端應用開發,需要設定以下環境:

A、在windows的某個目錄下解壓一個hadoop的安裝包

B、将安裝包下的lib和bin目錄用對應windows版本平台編譯的本地庫替換

C、在window系統中配置HADOOP_HOME指向你解壓的安裝包

D、在windows系統的path變量中加入hadoop的bin目錄

擷取api中的用戶端對象

在java中操作hdfs,首先要獲得一個用戶端執行個體
           
Configuration conf = new Configuration()
FileSystem fs = FileSystem.get(conf)
           

而我們的操作目标是HDFS,是以擷取到的fs對象應該是DistributedFileSystem的執行個體。

get方法是從何處判斷具體執行個體化那種用戶端類呢?

從conf中的一個參數 fs.defaultFS的配置值判斷。

如果我們的代碼中沒有指定fs.defaultFS,并且工程classpath下也沒有給定相應的配置,conf中的預設值就來自于hadoop的jar包中的core-default.xml,預設值為: file:///,則擷取的将不是一個DistributedFileSystem的執行個體,而是一個本地檔案系統的用戶端對象

HDFS用戶端操作資料代碼示例:

檔案的增删改查

public class HdfsClient {

    FileSystem fs = null;

    @Before
    public void init() throws Exception {

        // 構造一個配置參數對象,設定一個參數:我們要通路的hdfs的URI
        // 進而FileSystem.get()方法就知道應該是去構造一個通路hdfs檔案系統的用戶端,以及hdfs的通路位址
        // new Configuration();的時候,它就會去加載jar包中的hdfs-default.xml
        // 然後再加載classpath下的hdfs-site.xml
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hdp-node01:9000");
        /**
         * 參數優先級: 1、用戶端代碼中設定的值 2、classpath下的使用者自定義配置檔案 3、然後是伺服器的預設配置
         */
        conf.set("dfs.replication", "3");

        // 擷取一個hdfs的通路用戶端,根據參數,這個執行個體應該是DistributedFileSystem的執行個體
        // fs = FileSystem.get(conf);

        // 如果這樣去擷取,那conf裡面就可以不要配"fs.defaultFS"參數,而且,這個用戶端的身份辨別已經是hadoop使用者
        fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");

    }

    /**
     * 往hdfs上傳檔案
     * 
     * @throws Exception
     */
    @Test
    public void testAddFileToHdfs() throws Exception {

        // 要上傳的檔案所在的本地路徑
        Path src = new Path("g:/redis-recommend.zip");
        // 要上傳到hdfs的目标路徑
        Path dst = new Path("/aaa");
        fs.copyFromLocalFile(src, dst);
        fs.close();
    }

    /**
     * 從hdfs中複制檔案到本地檔案系統
     * 
     * @throws IOException
     * @throws IllegalArgumentException
     */
    @Test
    public void testDownloadFileToLocal() throws IllegalArgumentException, IOException {
        fs.copyToLocalFile(new Path("/jdk-7u65-linux-i586.tar.gz"), new Path("d:/"));
        fs.close();
    }

    @Test
    public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {

        // 建立目錄
        fs.mkdirs(new Path("/a1/b1/c1"));

        // 删除檔案夾 ,如果是非空檔案夾,參數2必須給值true
        fs.delete(new Path("/aaa"), true);

        // 重命名檔案或檔案夾
        fs.rename(new Path("/a1"), new Path("/a2"));

    }

    /**
     * 檢視目錄資訊,隻顯示檔案
     * 
     * @throws IOException
     * @throws IllegalArgumentException
     * @throws FileNotFoundException
     */
    @Test
    public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {

        // 思考:為什麼傳回疊代器,而不是List之類的容器
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);

        while (listFiles.hasNext()) {
            LocatedFileStatus fileStatus = listFiles.next();
            System.out.println(fileStatus.getPath().getName());
            System.out.println(fileStatus.getBlockSize());
            System.out.println(fileStatus.getPermission());
            System.out.println(fileStatus.getLen());
            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            for (BlockLocation bl : blockLocations) {
                System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
                String[] hosts = bl.getHosts();
                for (String host : hosts) {
                    System.out.println(host);
                }
            }
            System.out.println("--------------為angelababy列印的分割線--------------");
        }
    }

    /**
     * 檢視檔案及檔案夾資訊
     * 
     * @throws IOException
     * @throws IllegalArgumentException
     * @throws FileNotFoundException
     */
    @Test
    public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {

        FileStatus[] listStatus = fs.listStatus(new Path("/"));

        String flag = "d--             ";
        for (FileStatus fstatus : listStatus) {
            if (fstatus.isFile())  flag = "f--         ";
            System.out.println(flag + fstatus.getPath().getName());
        }
    }
}
           

通過流的方式通路hdfs

/**
 * 相對那些封裝好的方法而言的更底層一些的操作方式
 * 上層那些mapreduce   spark等運算架構,去hdfs中擷取資料的時候,就是調的這種底層的api
 * @author
 *
 */
public class StreamAccess {

    FileSystem fs = null;

    @Before
    public void init() throws Exception {

        Configuration conf = new Configuration();
        fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");

    }



    @Test
    public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{

        //先擷取一個檔案的輸入流----針對hdfs上的
        FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz"));

        //再構造一個檔案的輸出流----針對本地的
        FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz"));

        //再将輸入流中資料傳輸到輸出流
        IOUtils.copyBytes(in, out, );


    }


    /**
     * hdfs支援随機定位進行檔案讀取,而且可以友善地讀取指定長度
     * 用于上層分布式運算架構并發處理資料
     * @throws IllegalArgumentException
     * @throws IOException
     */
    @Test
    public void testRandomAccess() throws IllegalArgumentException, IOException{
        //先擷取一個檔案的輸入流----針對hdfs上的
        FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));


        //可以将流的起始偏移量進行自定義
        in.seek();

        //再構造一個檔案的輸出流----針對本地的
        FileOutputStream out = new FileOutputStream(new File("c:/iloveyou.line.2.txt"));

        IOUtils.copyBytes(in,out,L,true);

    }



    /**
     * 顯示hdfs上檔案的内容
     * @throws IOException 
     * @throws IllegalArgumentException 
     */
    @Test
    public void testCat() throws IllegalArgumentException, IOException{

        FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));

        IOUtils.copyBytes(in, System.out, );
    }
}
           

場景程式設計

/**在mapreduce 、spark等運算架構中,有一個核心思想就是将運算移往資料,或者說,就是要在并發計算中盡可能讓運算本地化,這就需要擷取資料所在位置的資訊并進行相應範圍讀取
以下模拟實作:擷取一個檔案的所有block位置資訊,然後讀取指定block中的内容*/
    @Test
    public void testCat() throws IllegalArgumentException, IOException{

        FSDataInputStream in = fs.open(new Path("/weblog/input/access.log.10"));
        //拿到檔案資訊
        FileStatus[] listStatus = fs.listStatus(new Path("/weblog/input/access.log.10"));
        //擷取這個檔案的所有block的資訊
        BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(listStatus[], L, listStatus[].getLen());
        //第一個block的長度
        long length = fileBlockLocations[].getLength();
        //第一個block的起始偏移量
        long offset = fileBlockLocations[].getOffset();

        System.out.println(length);
        System.out.println(offset);

        //擷取第一個block寫入輸出流
//      IOUtils.copyBytes(in, System.out, (int)length);
        byte[] b = new byte[];

        FileOutputStream os = new FileOutputStream(new File("d:/block0"));
        while(in.read(offset, b, , )!=-){
            os.write(b);
            offset += ;
            if(offset>=length) return;
        };
        os.flush();
        os.close();
        in.close();
    }
           

繼續閱讀