天天看點

hadoop寫流程

1.調用用戶端對象Distributed FileSystem的create方法

2.Distributed FileSystem會向NameNode發起一個RPC連接配接,請求建立一個檔案,NameNode會通過一系列的檢查,判斷要建立的檔案是否存在以及用戶端是否有建立該檔案的權限。

若要建立的檔案不存在,以及用戶端存在建立該檔案的權限。NameNode會建立該檔案,實質上是向edits log檔案中寫入,若建立失敗,用戶端會抛出IOException。若檔案存在或使用者無權限,抛出異常結束

3.建立成功後,Distributed FileSystem會向用戶端傳回FSDataOutputStream,用于寫資料,FSDataOutputStream中封裝了DFSOutputStream,用于用戶端與NameNode和DataNode進行通信

4.FSDataOutputStream會将資料切分一個個小的資料包(64kb),并寫入到一個内部序列DataStreamer會讀取其中的内容,并向NameNode請求擷取存放目前block副本的DataNode清單。清單中的DataNode會根據與用戶端的距離,形成一個管線。DataStreamer會将讀取到的資料包發送給第一個DataNode,當第一個資料包發送完成後,DataStreamer會接着向第一個DataNode發送第二個資料包,此時第一個DataNode會将接受到的第一個資料包通過管線發送給第二個DataNode,在第一個DataNode将資料包發送完後之後,第二個會将這個資料包發送給第三個資料包。這樣就可以利用時間差加快資料的傳輸。在第一個接收完資料包後,會将該資料包寫入到用戶端的确認隊列中,當管線中所有的datanode寫入完成後,從确認隊列中删除該資料包。

如果在寫入資料期間,DataNode發送故障,則執行以下操作

a)關閉管線,把确認隊列中的所有包添加回資料隊列的最前端,以保證故障節點下遊的節點不會漏掉任意一個包

    b)在存儲在另一個節點處的目前資料塊指定一個标志,并将這個标志告知NameNode,以便故障節點恢複後,可以将已經存儲的資料包删除

    c)删除管線中的故障節點,并将剩餘資料包寫入其它正常的DataNode,NameNode檢測到副本數不足時,會在另一個DataNode上建立新的副本       

    d)後續的資料庫正常處理
           

5.DFSOutputStream中有一個資料包隊列,該隊列中的資料包是需要寫入DataNode的,該隊列被稱為确認隊列ack(用戶端存放,這是第二個隊列),當一個資料包寫入管線中所有DataNode,會将該資料包從隊列中移除

6.如果還有要寫入的block,則重複4,5操作

7.當用戶端完成資料的傳輸(是否是将最後一個的block寫入到确認隊列),調用資料流的close()方法。該方法将資料隊列中的所有資料包寫入到管線中,等待管線的确認

8.用戶端收到管線中所有DataNode的确認消息後,通知NameNode寫入完成

9.namenode已經知道檔案由哪些塊組成,是以它在傳回成功前隻需要等待資料塊進行最小量的複制(複制的是什麼??? 複制的是副本,達到最小副本數)

package com.bjsxt.hadoop.test;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class HadoopDemo1 {

	private Configuration conf;
	private FileSystem fs;
	
	@Before
	public void init() throws IOException, InterruptedException, URISyntaxException {
		conf = new Configuration(true);
		fs = FileSystem.get(new URI("hdfs://mycluster"), conf, "root");
	}
	
	/**
	 * 檔案上傳
	 * @throws IOException
	 * @throws InterruptedException
	 * @throws URISyntaxException
	 */
	@Test
	public void testWrite() throws IOException, InterruptedException, URISyntaxException {
		//上傳的檔案
		FileInputStream fis = new FileInputStream("/Users/zzw/Desktop/hello1.txt");
		//上傳到的位置
		Path path = new Path("/hello.txt");
		FSDataOutputStream dos = fs.create(path);
		//通過hadoop工具類直接将檔案輸出流寫入到輸出流上
		IOUtils.copyBytes(fis, dos, conf);
		//關閉輸入輸出流
		fis.close();
		dos.flush();
		dos.close();
	}

	@After
	public void close() throws IOException {
		if(fs != null) {
			fs.close();
		}
	}
	
}
           

繼續閱讀