1.調用用戶端對象Distributed FileSystem的create方法
2.Distributed FileSystem會向NameNode發起一個RPC連接配接,請求建立一個檔案,NameNode會通過一系列的檢查,判斷要建立的檔案是否存在以及用戶端是否有建立該檔案的權限。
若要建立的檔案不存在,以及用戶端存在建立該檔案的權限。NameNode會建立該檔案,實質上是向edits log檔案中寫入,若建立失敗,用戶端會抛出IOException。若檔案存在或使用者無權限,抛出異常結束
3.建立成功後,Distributed FileSystem會向用戶端傳回FSDataOutputStream,用于寫資料,FSDataOutputStream中封裝了DFSOutputStream,用于用戶端與NameNode和DataNode進行通信
4.FSDataOutputStream會将資料切分一個個小的資料包(64kb),并寫入到一個内部序列DataStreamer會讀取其中的内容,并向NameNode請求擷取存放目前block副本的DataNode清單。清單中的DataNode會根據與用戶端的距離,形成一個管線。DataStreamer會将讀取到的資料包發送給第一個DataNode,當第一個資料包發送完成後,DataStreamer會接着向第一個DataNode發送第二個資料包,此時第一個DataNode會将接受到的第一個資料包通過管線發送給第二個DataNode,在第一個DataNode将資料包發送完後之後,第二個會将這個資料包發送給第三個資料包。這樣就可以利用時間差加快資料的傳輸。在第一個接收完資料包後,會将該資料包寫入到用戶端的确認隊列中,當管線中所有的datanode寫入完成後,從确認隊列中删除該資料包。
如果在寫入資料期間,DataNode發送故障,則執行以下操作
a)關閉管線,把确認隊列中的所有包添加回資料隊列的最前端,以保證故障節點下遊的節點不會漏掉任意一個包
b)在存儲在另一個節點處的目前資料塊指定一個标志,并将這個标志告知NameNode,以便故障節點恢複後,可以将已經存儲的資料包删除
c)删除管線中的故障節點,并将剩餘資料包寫入其它正常的DataNode,NameNode檢測到副本數不足時,會在另一個DataNode上建立新的副本
d)後續的資料庫正常處理
5.DFSOutputStream中有一個資料包隊列,該隊列中的資料包是需要寫入DataNode的,該隊列被稱為确認隊列ack(用戶端存放,這是第二個隊列),當一個資料包寫入管線中所有DataNode,會将該資料包從隊列中移除
6.如果還有要寫入的block,則重複4,5操作
7.當用戶端完成資料的傳輸(是否是将最後一個的block寫入到确認隊列),調用資料流的close()方法。該方法将資料隊列中的所有資料包寫入到管線中,等待管線的确認
8.用戶端收到管線中所有DataNode的确認消息後,通知NameNode寫入完成
9.namenode已經知道檔案由哪些塊組成,是以它在傳回成功前隻需要等待資料塊進行最小量的複制(複制的是什麼??? 複制的是副本,達到最小副本數)
package com.bjsxt.hadoop.test;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class HadoopDemo1 {
private Configuration conf;
private FileSystem fs;
@Before
public void init() throws IOException, InterruptedException, URISyntaxException {
conf = new Configuration(true);
fs = FileSystem.get(new URI("hdfs://mycluster"), conf, "root");
}
/**
* 檔案上傳
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
@Test
public void testWrite() throws IOException, InterruptedException, URISyntaxException {
//上傳的檔案
FileInputStream fis = new FileInputStream("/Users/zzw/Desktop/hello1.txt");
//上傳到的位置
Path path = new Path("/hello.txt");
FSDataOutputStream dos = fs.create(path);
//通過hadoop工具類直接将檔案輸出流寫入到輸出流上
IOUtils.copyBytes(fis, dos, conf);
//關閉輸入輸出流
fis.close();
dos.flush();
dos.close();
}
@After
public void close() throws IOException {
if(fs != null) {
fs.close();
}
}
}