Hadoop學習(2)-java用戶端操作hdfs及secondarynode作用
首先要在windows下解壓一個windows版本的hadoop
然後在配置他的環境變量,同時要把hadoop的share目錄下的hadoop下的相關jar包拷貝到esclipe
然後Build Path
下面上代碼
複制代碼
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Before;
import org.junit.Test;
public class HdfsClientDemo {
public static void main(String[] args) throws Exception {
/**
* Configuration參數對象的機制:
* 構造時,會加載jar包中的預設配置 xx-default.xml
* 再加載 使用者配置xx-site.xml ,覆寫掉預設參數
* 構造完成之後,還可以conf.set("p","v"),會再次覆寫使用者配置檔案中的參數值
*/
// new Configuration()會從項目的classpath中加載core-default.xml hdfs-default.xml core-site.xml hdfs-site.xml等檔案
Configuration conf = new Configuration();
// 指定本用戶端上傳檔案到hdfs時需要儲存的副本數為:2
conf.set("dfs.replication", "2");
// 指定本用戶端上傳檔案到hdfs時切塊的規格大小:64M
conf.set("dfs.blocksize", "64m");
// 構造一個通路指定HDFS系統的用戶端對象: 參數1:——HDFS系統的URI,參數2:——用戶端要特别指定的參數,參數3:用戶端的身份(使用者名)
FileSystem fs = FileSystem.get(new URI("hdfs://172.31.2.38:9000/"), conf, "root");
// 上傳一個檔案到HDFS中
fs.copyFromLocalFile(new Path("D:/install-pkgs/hbase-1.2.1-bin.tar.gz"), new Path("/aaa/"));
fs.close();
}
FileSystem fs = null;
@Before
public void init() throws Exception{
Configuration conf = new Configuration();
conf.set("dfs.replication", "2");
conf.set("dfs.blocksize", "64m");
fs = FileSystem.get(new URI("hdfs://172.31.2.38:9000/"), conf, "root");
}
/**
* 從HDFS中下載下傳檔案到用戶端本地磁盤
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testGet() throws IllegalArgumentException, IOException{
fs.copyToLocalFile(new Path("/test"), new Path("d:/"));
fs.close();
}
/**
* 在hdfs内部移動檔案\修改名稱
*/
@Test
public void testRename() throws Exception{
fs.rename(new Path("/install.log"), new Path("/aaa/in.log"));
fs.close();
}
/**
* 在hdfs中建立檔案夾
*/
@Test
public void testMkdir() throws Exception{
fs.mkdirs(new Path("/xx/yy/zz"));
fs.close();
}
/**
* 在hdfs中删除檔案或檔案夾
*/
@Test
public void testRm() throws Exception{
fs.delete(new Path("/aaa"), true);
fs.close();
}
/**
* 查詢hdfs指定目錄下的檔案資訊
*/
@Test
public void testLs() throws Exception{
// 隻查詢檔案的資訊,不傳回檔案夾的資訊
RemoteIterator<LocatedFileStatus> iter = fs.listFiles(new Path("/"), true);
while(iter.hasNext()){
LocatedFileStatus status = iter.next();
System.out.println("檔案全路徑:"+status.getPath());
System.out.println("塊大小:"+status.getBlockSize());
System.out.println("檔案長度:"+status.getLen());
System.out.println("副本數量:"+status.getReplication());
System.out.println("塊資訊:"+Arrays.toString(status.getBlockLocations()));
System.out.println("--------------------------------");
}
fs.close();
}
/**
* 讀取hdfs中的檔案的内容
*
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testReadData() throws IllegalArgumentException, IOException {
FSDataInputStream in = fs.open(new Path("/test.txt"));
BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8"));
String line = null;
while ((line = br.readLine()) != null) {
System.out.println(line);
}
br.close();
in.close();
fs.close();
}
/**
* 讀取hdfs中檔案的指定偏移量範圍的内容
*
*
*
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testRandomReadData() throws IllegalArgumentException, IOException {
FSDataInputStream in = fs.open(new Path("/xx.dat"));
// 将讀取的起始位置進行指定
in.seek(12);
// 讀16個位元組
byte[] buf = new byte[16];
in.read(buf);
System.out.println(new String(buf));
in.close();
fs.close();
}
/**
* 往hdfs中的檔案寫内容
*
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testWriteData() throws IllegalArgumentException, IOException {
FSDataOutputStream out = fs.create(new Path("/zz.jpg"), false);
// D:\images\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg
FileInputStream in = new FileInputStream("D:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg");
byte[] buf = new byte[1024];
int read = 0;
while ((read = in.read(buf)) != -1) {
out.write(buf,0,read);
}
in.close();
out.close();
fs.close();
}
}
練習:從一個檔案裡面不斷地采集日志上傳到hdfs裡面
1.流程介紹
---啟動一個定時任務
--定時探測日志原目錄
--擷取檔案上傳到一個待上傳的臨時目錄
--逐一上傳到hdfs目标路徑,同時移動到備份目錄裡
--啟動一個定時任務:
--探測備份目錄中的備份資料是否已經超出,如果超出就删除
主類為:
import java.util.Timer;
public class DataCollectMain {
public static void main(String[] args) {
Timer timer = new Timer();
//第一個為task類,第二個開始時間 第三個沒隔多久執行一次
timer.schedule(new CollectTask(), 0, 60*60*1000L);
timer.schedule(new BackupCleanTask(), 0, 60*60*1000L);
}
CollectTask類:
這個類要繼承TimerTask,重寫run方法,主要内容就是不斷收集日志檔案
package cn.edu360.hdfs.datacollect;
import java.io.File;
import java.io.FilenameFilter;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.TimerTask;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
public class CollectTask extends TimerTask {
@Override
public void run() {
try {
// 擷取配置參數
Properties props = PropertyHolderLazy.getProps();
// 擷取本次采集時的日期
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
String day = sdf.format(new Date());
File srcDir = new File("d:/logs/accesslog");
// 列出日志源目錄中需要采集的檔案
//裡面傳了一個檔案過濾器,重寫accept方法,return true就要
File[] listFiles = srcDir.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
if (name.startsWith("access.log")) {
return true;
}
return false;
}
});
// 将要采集的檔案移動到待上傳臨時目錄
File toUploadDir = new File("d:/logs/toupload");
for (File file : listFiles) {
//這裡如果是 file.renameTo(toUploadDir)是不對的,因為會生成一個toupload的檔案而不是檔案夾
//要用renameTo的話你要自己加上檔案的新名字比較麻煩
//用FileUtiles是對file操作的一些工具類
//第一個目标檔案,第二個路徑,第三個是否存在覆寫
FileUtils.moveFileToDirectory(file, toUploadDir, true);
}
// 構造一個HDFS的用戶端對象
FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"), new Configuration(), "root");
File[] toUploadFiles = toUploadDir.listFiles();
// 檢查HDFS中的日期目錄是否存在,如果不存在,則建立
Path hdfsDestPath = new Path("/logs" + day);
if (!fs.exists(hdfsDestPath)) {
fs.mkdirs(hdfsDestPath);
}
// 檢查本地的備份目錄是否存在,如果不存在,則建立
File backupDir = new File("d:/logs/backup" + day + "/");
if (!backupDir.exists()) {
backupDir.mkdirs();
}
for (File file : toUploadFiles) {
// 傳輸檔案到HDFS并改名access_log_
fs.copyFromLocalFile(new Path(file.getAbsolutePath()), new Path("/logs"+day+"/access_log_"+UUID.randomUUID()+".log"));
// 将傳輸完成的檔案移動到備份目錄
//注意這裡依然不要用renameTo
FileUtils.moveFileToDirectory(file, backupDir, true);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 讀取hdfs中的檔案的内容
*
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testReadData() throws IllegalArgumentException, IOException {
FSDataInputStream in = fs.open(new Path("/test.txt"));
BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8"));
String line = null;
while ((line = br.readLine()) != null) {
System.out.println(line);
}
br.close();
in.close();
fs.close();
}
/**
* 讀取hdfs中檔案的指定偏移量範圍的内容
*
*
* 作業題:用本例中的知識,實作讀取一個文本檔案中的指定BLOCK塊中的所有資料
*
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testRandomReadData() throws IllegalArgumentException, IOException {
FSDataInputStream in = fs.open(new Path("/xx.dat"));
// 将讀取的起始位置進行指定
in.seek(12);
// 讀16個位元組
byte[] buf = new byte[16];
in.read(buf);
System.out.println(new String(buf));
in.close();
fs.close();
}
/**
* 往hdfs中的檔案寫内容
*
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testWriteData() throws IllegalArgumentException, IOException {
FSDataOutputStream out = fs.create(new Path("/zz.jpg"), false);
// D:\images\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg
FileInputStream in = new FileInputStream("D:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg");
byte[] buf = new byte[1024];
int read = 0;
while ((read = in.read(buf)) != -1) {
out.write(buf,0,read);
}
in.close();
out.close();
fs.close();
}
BackupCleanTask類
public class BackupCleanTask extends TimerTask {
@Override
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
long now = new Date().getTime();
try {
// 探測本地備份目錄
File backupBaseDir = new File("d:/logs/backup/");
File[] dayBackDir = backupBaseDir.listFiles();
// 判斷備份日期子目錄是否已超24小時
for (File dir : dayBackDir) {
long time = sdf.parse(dir.getName()).getTime();
if(now-time>24*60*60*1000L){
FileUtils.deleteDirectory(dir);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
hdfs中namenode中儲存中繼資料(對資料的描述資訊)是在記憶體中以樹的形式儲存的,并且每隔一段時間都會把這些中繼資料序列化到磁盤中。序列化的東西在磁盤中叫 fsimage檔案。
中繼資料可能會很大很大,是以隻能是定期的序列化
問題1:序列化的時候,發生了中繼資料的修改怎麼辦
答:namenode會把每次使用者的操作都記錄下來,記錄成日志檔案,存在edits日志檔案中
其中edits日志檔案也會像log4j滾動日志檔案一樣,當檔案太大的時候會另起一個檔案并改名字
問題2:當edits檔案太多的時候,一次當機也會花大量的時間從edits裡恢複,怎麼辦
答:會定期吧edits檔案重放fsimage檔案,并記錄edits的編号,把那些重放過的日志檔案給删除。這樣也相當于重新序列化了,
是以namenode并不會做這樣的事情,是由secondary node做的,他會定期吧namenode的fsimage檔案和edits檔案下載下傳下來
并把fsimage檔案反序列化,并且讀日志檔案更新中繼資料,然後序列化到磁盤,然後把他上傳給namenode。
這個機制叫做checkpoint機制
這裡secondarynode 相當一一個小秘書
用戶端寫資料到hdfs的流程
上面是建立響應的過程
然後 是傳遞檔案block塊的過程
用戶端從hdfs讀資料流程
額外知識點
注意,在windows裡面不要寫有些路徑不要寫絕對路徑,因為程式放到linux下面可能會找不到,是以報錯
一般使用class加載器,這樣當這個class加載的時候就會知道這個class在哪
類加載器的一些使用例子
比如我加載一個配置檔案,為了避免出現絕對路徑,我們可以是用類加載器
Properties props = new Properties();
//加載配置檔案,這樣寫的目的是為了避免在windows裡出現絕對路徑,用類加載器,再把檔案傳化成流
props.load(HdfsWordcount.class.getClassLoader().getResourceAsStream("job.properties"));
而對于一些功能性的類,我們最好在寫邏輯的時候也不要直接去導入這個包,而是使用Class.forName
//這樣不直接導入這個包,直接用類加載器,是面向接口程式設計的一種思想。這裡我并不是在開始import xxxx.Mapper,這裡Mapper是一個接口,這裡我用了多态
Class<?> mapper_class = Class.forName(props.getProperty("MAPPER_CLASS"));
Mapper mapper = (Mapper) mapper_class.newInstance();
單例模式
https://www.cnblogs.com/crazy-wang-android/p/9054771.html隻有個一執行個體,必須自己建立自己這個執行個體,必須為别人提供這個執行個體
餓漢式單例:就算沒有人調用這個class,他也會加載進去;
如對于一個配置檔案的加載
-
單例設計模式,方式一: 餓漢式單例
*
*/
public class PropertyHolderHungery {
private static Properties prop = new Properties();
static {
try {
//将一個檔案prop.load(stram)
//這裡面如果傳一個IO流不好,因為要用到絕對路徑,使用了類加載器 這種不管有沒有使用這個類都會加載
prop.load(PropertyHolderHungery.class.getClassLoader().getResourceAsStream("collect.properties"));
} catch (Exception e) {
}
}
public static Properties getProps() throws Exception {
return prop;
}
懶漢式:隻有調用的時候才會有,但會有線程安全問題
- 單例模式:懶漢式——考慮了線程安全
public class PropertyHolderLazy {
private static Properties prop = null;
public static Properties getProps() throws Exception {
if (prop == null) {
synchronized (PropertyHolderLazy.class) {
if (prop == null) {
prop = new Properties();
prop.load(PropertyHolderLazy.class.getClassLoader().getResourceAsStream("collect.properties"));
}
}
}
return prop;
}
原文位址
https://www.cnblogs.com/wpbing/archive/2019/07/23/11233399.html