天天看點

HADOOP學習筆記----------------------(1)

ubuntu虛拟機,帶有gitlab和jenkins等測試環境:​​http://pan.baidu.com/s/1jIhljyI​​密碼: z39m

Hadoop說明

  Hadoop将幫助解決如下特殊問題:

Ø  使用Hadoop分布式檔案系統(HDFS)來存儲海量資料集,通過MapReduce對這些資料集運作分布式計算。

Ø  熟悉Hadoop的資料和I/O建構,用于壓縮、資料內建、序列化和持久處理

Ø  洞悉編寫MapReduce實際應用時的常見陷阱和進階特性

Ø  設計、建構和管理一個專用的Hadoop叢集或在雲上運作Hadoop

Ø  使用進階查詢語言Pig來處理大規模資料

Ø  利用Hadoop資料庫HBase來儲存和處理結構化/半結構化資料

Ø  學會使用ZooKeeper來建構分布式系統

                 第一章 Hadoop概述

   Hadoop提供了一個穩定的共享存儲和分析系統。存儲由HDFS(Hadoop分布式檔案管理系統)實作,分析有MapReduce實作。這是Hadoop的核心。MapReduce針對每個查詢,每一個資料集(鍵/值  )(至少是很大一部分)都會被處理。MapReduce可以處理一批查詢,并且它針對整個資料集處理。MapReduce很适合處理那些需要分析整個資料集的問題,以批處理的方式,尤其是Ad Hoc(自主或即時)分析。RDBMS适用于點查詢和更新(其中,資料集已經被索引以提供延遲的檢索和短時間的少量資料更新)。MapReduce适合資料被一次寫入和多次讀取的應用,而關系資料庫更适合持續更新的資料集。

HADOOP學習筆記----------------------(1)

    MapReduce對于非結構化或半結構化資料非常有效,它被設計為處理時間内解釋資料。MapReduce輸入的鍵和值并不是資料固有的屬性,它們是由分析資料源的人來選擇的。

    MapReduce是一種線性的課伸縮的程式設計模式。程式員編寫兩個函數---map函數和Reduce函數----每一個都定義一個鍵/值對集映射到另一個。這些函數無視資料的大小或者它們正在使用的叢集的特性,這樣它們就可以原封不動地應用到小規模資料集或者大的資料集上。更重要的是,如果放入兩倍的資料量,運作的時間會少于兩倍。但是如果是兩倍大小的叢集,一個任務任然隻是和原來的一樣快。基于MapReduce的進階查詢語言(如Pig和Hive)使MapReduce的系統更接近傳統的資料庫程式設計語言。

    高性能計算(High Performance Computing,HPC)和網格計算社群多年來一直在做大規模的資料處理,它們使用的是消息傳遞接口(MessagePassing Interface,MPI)這樣的API。從廣義上來講,高性能計算的方法是将作業配置設定給一個機器叢集,這些機器通路共享檔案系統,由于一個存儲區域網絡(Storage Area Network,SAN)進行管理。這個非常适用于以主計算機密集型為主的作業,但當節點需要通路的大資料量時,這也成為一個問題,因為網絡帶寬成為“瓶頸”,是以計算機節點閑置下來了。

    MapReduce嘗試在計算節點本地存儲資料,是以資料通路速度回因為它是本地資料而比較快。這項“資料本地化”功能,成為MapReduce的核心功能并且也是它擁有良好性能的原因之一。MapReduce檢測失敗的map或者reduce任務并且在健康的機器上重制安排任務。MapReduce能夠做到這一點,得益于它是一個無共享的架構,這也意味着各任務之間彼此并不依賴。(mapper的輸出是回報給reducer的,但這通過MapReduce系統控制。在這種情況下,相當于傳回失敗的map,應該對傳回reducer給予更多關注,因為它必須確定它可以檢束到必要的map輸出,如果不行,必須重新運作相關的map進而生成必要的這些輸出。)

    MapReduce限定于鍵/值對的類型,mapper和reducer彼此間的協作有限,一個接一個地運作(mapper傳輸鍵/值對給reducer)。MapReduce作為一個建立搜尋索引産品系統。MapReduce被設計為用來運作那些需要數分鐘或數小時的作業,這些作業在一個聚集帶寬很高的資料中心中可信任的專用硬體裝置上運作。

1.1 Hadoop子項目

         Hadoop最出名的是MapReduce及其分布式檔案系統(HDFS),還有其他子項目提供配套服務,其他子項目提供補充性服務。這些子項目的簡要描述如下,其他技術棧如下圖:

HADOOP學習筆記----------------------(1)

         Core:一系列分布式檔案系統和通用I/O的元件和接口(序列化、JavaRPC和持久化資料結構)。

         Avro:一種提供高效、跨語言RPC的資料序列系統,持久化資料存儲。

         MapReduce:分布式資料處理模式和執行環境,運作與大型商用機叢集。

         HDFS:分布式檔案系統,運作于大型商用機叢集。

         Pig:一種資料流語言和運作環境,用于檢索非常大的資料集。Pig運作在MapReduce和HDFS的叢集上。

         Hbase:一個分布式的、列存儲資料庫。HBase使用HDFS作為底層存儲,同時支援MapReduce的批量式計算和點查詢(随機讀取)。

         ZooKeeper:一個分布式的、高可用性的協調服務。ZooKeeper提供分布式鎖之類的基本服務用于建構分布式應用。

         Hive:分布式資料倉庫。Hive管理HDFS中存儲的資料,并提供基于SQL的查詢語言(由運作時引擎翻譯長MapReduce作業)用于查詢資料。

        Chukwa:分布式資料收集和分析系統。Chukwa運作HDFS中存儲資料的收集器,它使用MapReduce來生成報告。

第2章 MapReduce簡介

         MapReduce是一個用于資料處理的程式設計模型。同一個程式Hadoop可以運作于用各種語言編寫的MapReduce程式上。MapReduce程式本質上是并行的,是以可以将大規模的資料分析交給任何一個擁有足夠多機器的營運商。MapReduce的優勢在與處理大型資料集。

2.1 一個氣象資料集

         在這個例子,要編寫一個挖掘氣象資料的程式。分布在全球各地的氣象傳感器每隔一個小時便收集當地的氣象資料,進而積累了大量的日志資料。這些日志資料适合用MapReduce進行分析的最佳候選,它們是半結構化且面向記錄的資料。

2.1.1 資料格式

         我們将使用NCDC(國家氣候資料中心)提供的資料。資料是以面向行的ASCII格式存儲的,每一行便是一個記錄。改格式支援許多氣象元素,其中許多資料是可選的或長度可變的。為簡單起見,将重點讨論基本元素(如氣溫),這些資料是始終都有且有固定寬帶的。

         如下一些國家氣候資料中心資料記錄的格式,該行已被分成多行以顯示出每個字段,在實際檔案中,字段被整合成一行且沒有任何分隔符。

HADOOP學習筆記----------------------(1)

         資料檔案按照日期和氣象站進行組織。從1901年到2001年,每一年都有一個目錄,每一個目錄都包括一個打封包件,檔案中的每一個氣象站都帶有當年的資料。例如,1990年的前面的資料項如下:

HADOOP學習筆記----------------------(1)

2.2使用Unix Tools來分析資料

         對于面向行的資料,傳統的處理工具是awk。下面有一個小程式腳本,用于計算每年的最高氣溫。

#! /usr/bin/env bash
for year in all/*
do
       echo -ne 'basename $year .gz' "\t"
       gunzip -c $year | \
         awk '{ temp=substr($0,88,5)+0;
                     q=substr($0,93,1);
                     if(temp!=9999 && q  ~ /[01459]/ && temp>max) max=temp}
              END {print max}'
done      

         該腳本循環周遊壓縮檔案,首先顯示年份,然後使用awk處理每個檔案。awk腳本從資料中提取兩個字段:氣溫和品質代碼。氣溫值通過加上一個0變成一個整數。接下來,執行測試,進而判斷氣溫值是否有效,品質代碼顯示的讀數是有疑問還是根本就是錯誤的。如果讀數是正确的,那麼該值将與目前看到的最大值進行比較,如果該值比原先的最大值大,就替換掉目前的最大值。當檔案中所有的行都已處理完畢并列印出最大值後,END塊中的代碼才會被執行。

         為完成跨越一世紀這麼長時間的查找,程式在EC2 High-CPU Extra Large Instance機器上運作了42分鐘。

2.3 使用Hadoop進行資料分析

         為了更好地發揮Hadoop提供的并行處理機制的優勢,必須把查詢表示成MapReduce作業。經過一些本地的小規模測試,将能夠在在機器叢集上運作它。

2.3.1 map和reduce

         MapReduce的工作過程分為兩個階段:map階段和reduce階段。每個階段都有鍵/值對作為輸入和輸出,并且它們的類型可由程式員選擇。程式員還具體定義了兩個函數:map函數和reduce函數。

         map階段輸入的是原始的資料,選擇的是一種文本輸入格式,以便資料集的每一行都會是一個文本值。鍵是在檔案開頭部分文本行起始處的偏移量。map函數很簡單,通過map函數來找出年份和氣溫。在本例中,map函數隻是一個資料準備階段,通過這種方式來建立資料,使得reducer函數能在此基礎上進行工作:找出每年的最高氣溫。map函數也是很适合去除已損記錄的地方:在這裡,我們将篩選掉缺失的、不可靠的或錯誤的氣溫資料。

         為了全面了解map的工作方式,思考下面幾行示例的輸入資料(考慮到頁面篇幅,一些未使用的列已被去除,用省略号表示):

HADOOP學習筆記----------------------(1)

         這些鍵/值對的方式來表示map函數:

HADOOP學習筆記----------------------(1)

         鍵是檔案中的行偏移量,而往往是我們在map函數中所忽視的。map函數的功能僅僅提取年份和氣溫(以粗體顯示),并将其作為輸出被發送。(氣溫值已被解釋為整數)

HADOOP學習筆記----------------------(1)

         map函數的輸出先由MapReduce架構處理,然後再被發送到reduce函數。這一處理過程根據鍵/值對進行排序和分組。是以,reduce函數會看到如下輸入:

HADOOP學習筆記----------------------(1)

         每年的年份後都有一系列氣溫讀數。所有reduce函數現在必須重複這個清單并從中找出最大的讀數:

HADOOP學習筆記----------------------(1)

         這是最後的輸出:全年氣溫記錄中每年的最高氣溫。

         這個資料流下圖所示。在圖的底部是Unix的管道,模拟整個MapReduce的流程:

HADOOP學習筆記----------------------(1)

2.3.2 Java MapReduce

         了解MapReduce程式的工作原理之後,下一步就是要用代碼來實作它。需要有三樣東西:一個map函數、一個reduce函數和一些來運作作業的代碼。map函數是由一個Mapper接口來實作,其中聲明了一個map()方法。map函數的實作:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureMapper extends MapReduceBase
                   implements Mapper<LongWritable,Text,Text,IntWritable>
{
         private static final int MISSING=9999;
         public void map(LongWritable key,Text value,OutputCollector<Text,IntWritable> output,Reporter reporter)
                   throw IOException{
                   String line=value.toString();
                   String year=line.substring(15,19);
                   if(line.charAt(87)=='+'){
                            airTemperture=Integer.parseInt(line.substring(88,92));
                   }else{
                            airTemperture=Integer.parseInt(line.substring(87,92));
                   }
                   String quality=line.substring(92,93);
                   if(airTemperture !=MISSING && quality.matches("[01459]")){
                            output.collect(new Text(year),new IntWritable(airTemperture));
                   }
         }
}<span style="font-family: Arial, Helvetica, sans-serif; background-color: rgb(255, 255, 255);">      </span>      

         Mapper接口是一個泛型類型,它有4個形式參數類型,由它們來指定map函數的輸入鍵、輸入值、輸出鍵和輸出值得類型。就上述代碼而言,輸入鍵是一個長整數偏移量,輸入的值是一行文本,輸出的鍵是年份,輸出的值是氣溫(整數)。Hadoop規定了一套可用于網絡序列優化的基本類型,而不是使用内置的Java類型。這些都可以在importorg.apache.hadoop.io中找到。LongWritable類型相當于Java中的Long類型,Text類型相當于Java的String類型和IntWritable類型相當于Java的Integer類型。

         map()方法需要傳入一個鍵和一個值。将一個包含Java字元串輸入行的Text值轉換成Java的String類型,然後利用其substring()方法提取的相應的列。

         map()方法還提供一個OutputCollector執行個體來寫輸入輸出内容。在這種情況下,寫入年份作為一個Text對象(隻使用一個鍵),用IntWritable類型包裝氣溫值。隻有在氣溫顯示出來後并且它的品質代碼表示的是正确的氣溫讀數時才能寫入輸出記錄。

         reduce函數同樣使用Reducer時被定義,最高氣溫示例的Reducer:  

import java.io.IOException;
 
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
 
public class MaxTemperatureReducer extends MapReduceBase
                   implements Reducer<Text,IntWritable,Text,IntWritable>
{
        
         public void reduce(Text key,Iterator<IntWritable> values,OutputCollector<Text,IntWritable> output,Reporter reporter)
                   throw IOException{
                   int maxValue=Integer.MIN_VALUE;
                   while(values.hasNext()){
                            maxValue=Math.max(maxValue,values.next().get());
                   }
                   output.collect(key,new IntWritable(maxValue));
                  
         }
}<span style="font-family: Arial, Helvetica, sans-serif; background-color: rgb(255, 255, 255);">    </span>      

         四個形式參數類型用于指定reduce函數的輸入和輸出類型。reduce函數的輸入類型必須與map函數的輸出類型相比對:Text類型和IntWritable類型。在這中情況下,reduce函數的輸出類型是Text和IntWritable這兩種類型,前者是年份的類型而後者是最高氣溫的類型,在這些輸入類型之中,周遊所有氣溫,并把每個記錄進行比較知道找到一個最高的為止。

第三部分代碼運作的是MapReduce作業,在氣象資料集中找出最高氣溫的應用程式:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
 
public class MaxTemperture{
       public static void main(String [] args) throws IOException{
              if(args.length !=2){
                     System.err.println("Usage:MaxTemperture{<input path> <output path>");
                     System.exit(-1);
              }
              JobConf conf=new JobConf(MaxTemperture.class);
              conf.setJobName("Max temperture");
              FileInputFormat.addInputPath(conf,new Path(args[0]));
              FileOutputFormat.setOutputPath(conf,new Path(args[1]));
             
              conf.setMapperClass(MaxTempertureMapper.class);
              conf.setReducerClass(MaxTempertureReducer.class);
              conf.setOutputKeyClass(Text.class);
              conf.setOutputValueClass(IntWritable.class);
             
              JobClient.runJob(conf);
       }
}      

        對象指定了作業的各種參數。它授予使用者對整個代碼如何運作的控制權。當使用者在Hadoop叢集上運作這個作業時,使用者把代碼打包成一個JAR檔案(Hadoop會在叢集分發這個包)。使用者沒有明确指定JAR檔案的名稱,而是在JobConf構造函數中傳送一個類,Hadoop會找到這個包含此類的JAR檔案。

         在建立JobConf對象後,使用者将指定輸入好輸出的路徑。通過調用FileInputFormat内的靜态方法addInputPath()來定義輸入的路徑,它可以是單個檔案、目錄或檔案模式的路徑。同時,addInputPath()可以被調用多次進而實作使用多路徑輸入。

         輸出路徑(其中隻有一個)是在FileOutputFormat内的靜态方法setOutputPath()來指定。它指定了reduce函數輸出檔案寫入的目錄。在運作作業前該目錄不應該存在,否則Hadoop會報錯并且拒絕運作任務。這種預防措施是為了防止資料丢失。

         接下類,通過setMapperClass()和setReducerClass()這兩個方法來指定要使用的map和reduce類型。

         setOutputKeyClass()和setOutputValueClass()方法控制map和reduce函數的輸出類型,正如本例所示,這兩個方法往往是相同的。如果它們不同,那麼map的輸出類型可設定成使用setMapOutputKeyClass()和setMapOutputValueClass()方法。輸入的類型通過輸入格式來控制,使用者沒有設定,因為使用者使用的是預設的TextInputFormat(文本輸入格式)。在設定了定義map和reduce函數的類之後,運作作業的準備工作就算完成了。JobClient内的靜态方法runJob()會送出作業并等待它完成,把進展情況寫入控制台。

         寫完MapReduce作業之後,拿一個小型的資料集進行測試以排除與代碼直接有關的問題。首先,以獨立模式安裝Hadoop。在這種模式寫,Hadoop運作中使用本地帶job runner(作業運作程式)的檔案系統。用前面讨論過的五行代碼的例子來測試(指令):

% export HADOOP_CLASSPATH=build/classes
% hadoop MaxTemperature input/ncdc/sample.txt output      

         如果Hadoop指令是以類名作為第一個參數,它會啟動一個JVM來運作這個類。使用指令比直接使用Java更友善,因為前者把類的路徑(及其依賴關系)加入Hadoop的庫中,并獲得Hadoop的配置。要添加應用程式類的路徑,使用者需要定義一個環境變量,Hadoop腳本會來執行相關操作。

注意:以本地(獨立)模式運作時,筆記中所有程式希望都以這種方式來設定HADOOP_CLA-

SSPATH。指令必須在示例代碼所在的檔案夾下被運作。

         運作作業所得到的輸出提供了一些有用的資訊。(無法找到作業JAR檔案相關資訊是意料之中的,因為是在本地模式下沒有JAR的情況下運作的。在叢集上運作時,不會看到此警告。)輸出的最後一部分叫“計數器”(Counter),顯示了在Hadoop上運作的每個作業産生的統計資訊。這些對檢查處理的資料量是否符合預期非常有用。例如,可以遵循整個系統中記錄的數目:5個map輸入産生了5個map的輸出,然後5個reduce輸入産生兩個reduce輸出。

         輸出被寫入到output目錄,其中每個reducer包括一個輸出檔案。作業包含一個reducer,是以隻能找到一個檔案名,名為part-0000:

繼續閱讀