
Hadoop: The Definitive Guide Notes

First of all, why do we need Hadoop?

The good news is that Big Data is here. The bad news is that we are struggling to store and analyze it.

Faced with massive amounts of data, we need to store and analyze it efficiently, and Hadoop can do exactly that.

This, in a nutshell, is what Hadoop provides: a reliable shared storage and analysis system. The storage is provided by HDFS, and analysis by MapReduce.

How does Hadoop differ from existing technologies?

Differences from an RDBMS

  1. MapReduce is a good fit for problems that need to analyze the whole dataset, in a batch fashion, particularly for ad hoc analysis. An RDBMS is good for point queries or updates, where the dataset has been indexed to deliver low-latency retrieval and update times of a relatively small amount of data.
  2. MapReduce suits applications where the data is written once, and read many times, whereas a relational database is good for datasets that are continually updated.
  3. MapReduce works well on unstructured or semistructured data, since it is designed to interpret the data at processing time. In other words, the input keys and values for MapReduce are not an intrinsic property of the data, but they are chosen by the person analyzing the data.

Over time, the differences between relational databases and MapReduce systems are likely to blur, both as relational databases start incorporating some of the ideas from MapReduce (such as Aster Data's and Greenplum's databases), and, from the other direction, as higher-level query languages built on MapReduce (such as Pig and Hive) make MapReduce systems more approachable to traditional database programmers.

Differences from Grid Computing

MapReduce tries to colocate the data with the compute node, so data access is fast since it is local. This feature, known as data locality, is at the heart of MapReduce and is the reason for its good performance.

The grid computing approach is to distribute the work across a cluster of machines that access a shared filesystem, hosted by a SAN. This works well for predominantly compute-intensive jobs, but it becomes a problem when nodes need to access larger data volumes (hundreds of gigabytes, the point at which MapReduce really starts to shine), since the network bandwidth becomes the bottleneck and compute nodes become idle.

The Origins of Hadoop

Hadoop was created by Doug Cutting , the creator of Apache Lucene, the widely used text search library. Hadoop has its origins in Apache Nutch , an open source web search engine, itself a part of the Lucene project.

A MapReduce Example

Let's use a simple example to understand the MapReduce process.

The example: given weather-station temperature records, find the highest temperature for each year.

As you can see, the raw input data is unstructured:

Example

(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)

The map task's job is to extract a small amount of needed data from the large raw input and emit it as (key, value) pairs. Since the map phase runs locally on the node holding the data, it is very efficient; only the small extracted dataset needs to be transferred over the network.

The map function merely extracts the year and the air temperature from each record, and emits them as its output.

(1950, 0)
(1949, 78)
(1950, 22)
(1950, -11)
(1949, 111)

Before the map output is handed to reduce, the framework does some preprocessing: it sorts the (key, value) pairs by key.

The output from the map function is processed by the MapReduce framework before being sent to the reduce function. This processing sorts and groups the key-value pairs by key.

(1949, [111, 78])
(1950, [0, 22, -11])

The above is just an abstract representation; what is actually passed to the reducer is simply the sorted sequence, as follows:

(1949, 111)
(1949, 78)
(1950, 0)
(1950, 22)
(1950, -11)

Reduce then applies whatever logic you define to the map output to produce the final answer; here, the logic is to find the maximum value.

All the reduce function has to do now is iterate through the list and pick up the maximum reading.

(1949, 111)
(1950, 22)

And that is a complete MapReduce run; it is fairly simple and easy to understand.
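
To make the data flow concrete, here is a minimal pure-Python sketch of the same pipeline (map output, sort/group by key, reduce) over the sample pairs above. It only imitates the flow of data; it is not how Hadoop actually executes a job.

from itertools import groupby
from operator import itemgetter

# The (year, temperature) pairs emitted by the map function above.
map_output = [("1950", 0), ("1949", 78), ("1950", 22), ("1950", -11), ("1949", 111)]

# "Shuffle and sort": order the intermediate pairs by key so equal keys are adjacent.
map_output.sort(key=itemgetter(0))

# Reduce: for each year, pick the maximum reading.
for year, group in groupby(map_output, key=itemgetter(0)):
    print("%s\t%s" % (year, max(temp for _, temp in group)))
# 1949    111
# 1950    22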

MapReduce Data Flow

This section describes in more detail how the system carries out the MapReduce process shown above.

  • A MapReduce job is a unit of work that the client wants to be performed: it consists of the input data, the MapReduce program, and configuration information.
  • Hadoop runs the job by dividing it into tasks, of which there are two types: map tasks and reduce tasks.
  • There are two types of nodes that control the job execution process: a jobtracker and a number of tasktrackers. The jobtracker coordinates all the jobs run on the system by scheduling tasks to run on tasktrackers. Tasktrackers run tasks and send progress reports to the jobtracker, which keeps a record of the overall progress of each job. If a task fails, the jobtracker can reschedule it on a different tasktracker.
  • Hadoop divides the input to a MapReduce job into fixed-size pieces called input splits, or just splits. For most jobs, a good split size tends to be the size of an HDFS block, 64 MB by default. Hadoop does its best to run the map task on a node where the input data resides in HDFS. This is called the data locality optimization. It should now be clear why the optimal split size is the same as the block size: it is the largest size of input that can be guaranteed to be stored on a single node.
  • Map tasks write their output to local disk, not to HDFS. Map output is intermediate output: it's processed by reduce tasks to produce the final output, and once the job is complete the map output can be thrown away.
  • Reduce tasks don't have the advantage of data locality: the input to a single reduce task is normally the output from all mappers. Therefore the sorted map outputs have to be transferred across the network to the node where the reduce task is running, where they are merged and then passed to the user-defined reduce function. The output of the reduce is normally stored in HDFS for reliability.

There are only a few reducers, and every mapper's output has to be sent to them over the network, whereas mappers enjoy the data locality optimization: the data they process is already local. So the mapper's job is to extract the needed data from the large unprocessed input and pre-process it, keeping what gets sent to the reducers as small and simple as possible.

Combiner Functions

Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks.

To further reduce the volume of data transferred from map to reduce, we can add a combiner after the map; in effect it runs the reduce logic locally first, shrinking the data that has to be shipped. Consider the following example.

The first map produced the output:

(1950, 0)
(1950, 20)
(1950, 10)

And the second produced:

(1950, 25)
(1950, 15)

reduce function input:

(1950, [0, 20, 10, 25, 15])

With a combiner, each mapper's output is pre-processed: the maximum is selected on each mapper node first and only that value is sent to the reducer, which greatly reduces the network traffic.

reduce function input:

(1950, [20, 25])

This is easy enough to follow, but combiners are optional and cannot be added to every kind of reduce: for some functions, running the logic locally and then globally is not equivalent to running it once over all the data, taking a mean being the classic example.
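
To see why a maximum can safely be computed by a combiner while a mean cannot, check whether applying the function to per-mapper partial results gives the same answer as applying it to all the values at once. A quick illustrative check in Python:

# Per-mapper outputs for 1950, as in the example above.
mapper1 = [0, 20, 10]
mapper2 = [25, 15]

# The max of per-mapper maxima equals the global max, so max is a valid combiner:
print(max(max(mapper1), max(mapper2)))       # 25
print(max(mapper1 + mapper2))                # 25

# But the mean of per-mapper means is not the global mean:
mean = lambda xs: float(sum(xs)) / len(xs)
print(mean([mean(mapper1), mean(mapper2)]))  # 15.0
print(mean(mapper1 + mapper2))               # 14.0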

Hadoop Streaming

Hadoop provides an API to MapReduce that allows you to write your map and reduce functions in languages other than Java. Hadoop Streaming uses Unix standard streams as the interface between Hadoop and your program, so you can use any language that can read standard input and write to standard output to write your MapReduce program.

This part of Hadoop is very interesting. Hadoop itself is written in Java, and you can of course implement MapReduce through the Java API, but Streaming opens another possibility: you can write your map and reduce functions in any language you like, as long as it can read standard input and write to standard output. You never have to call the MapReduce interfaces directly; Hadoop Streaming takes care of everything. So cool!

I prefer Python, so here is a Python example.


Example 2-10. Map function for maximum temperature in Python

#!/usr/bin/env python
import re
import sys

for line in sys.stdin:
    val = line.strip()
    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
    if (temp != "+9999" and re.match("[01459]", q)):
        print "%s\t%s" % (year, temp)

Example 2-11. Reduce function for maximum temperature in Python

#!/usr/bin/env python
import sys

(last_key, max_val) = (None, 0)
for line in sys.stdin:
    (key, val) = line.strip().split("\t")
    if last_key and last_key != key:
        # this only works because the framework sorts the pairs by key before they reach the reducer
        print "%s\t%s" % (last_key, max_val)
        (last_key, max_val) = (key, int(val))
    else:
        (last_key, max_val) = (key, max(max_val, int(val)))

if last_key:
    print "%s\t%s" % (last_key, max_val)

You can test your map and reduce functions with a simple pipeline like this:

% cat input/ncdc/sample.txt | src/main/ch02/python/max_temperature_map.py | \
  sort | src/main/ch02/python/max_temperature_reduce.py
1949 111
1950 22

Then run your MapReduce job with Hadoop Streaming:

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
  -input input/ncdc/sample.txt \
  -output output \
  -mapper src/main/ch02/python/max_temperature_map.py \
  -reducer src/main/ch02/python/max_temperature_reduce.py

For a more detailed example of writing MapReduce in Python, see this blog post:

http://www.michael-noll.com/wiki/Writing_An_Hadoop_MapReduce_Program_In_Python

The Hadoop Distributed Filesystem

First, let's look at what HDFS is designed for, and which applications it is not a good fit for.

The Design of HDFS

Very large files

“Very large” in this context means files that are hundreds of megabytes, gigabytes, or terabytes in size. There are Hadoop clusters running today that store petabytes of data.

Streaming data access

HDFS is built around the idea that the most efficient data processing pattern is a write-once, read-many-times pattern. Each analysis will involve a large proportion, if not all, of the dataset, so the time to read the whole dataset is more important than the latency in reading the first record.

Commodity hardware

Hadoop doesn’t require expensive, highly reliable hardware to run on.

While this may change in the future, these are areas where HDFS is not a good fit today:

Low-latency data access

Applications that require low-latency access to data, in the tens of milliseconds range, will not work well with HDFS. Remember HDFS is optimized for delivering a high throughput of data, and this may be at the expense of latency. HBase is currently a better choice for low-latency access.

Lots of small files

Since the namenode holds filesystem metadata in memory, the limit to the number of files in a filesystem is governed by the amount of memory on the namenode.
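
As a rule of thumb from the book, each file, directory, and block takes roughly 150 bytes of namenode memory, which allows a quick back-of-the-envelope estimate (the numbers below are only illustrative):

BYTES_PER_OBJECT = 150          # rough namenode memory cost per file, directory, or block

files = 1000000                 # one million small files, each fitting in a single block
objects = files * 2             # one file object plus one block object each (directories ignored)
print("%.0f MB" % (objects * BYTES_PER_OBJECT / 2.0**20))   # about 286 MB of namenode memory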

Multiple writers, arbitrary file modifications

Files in HDFS may be written to by a single writer. Writes are always made at the end of the file. There is no support for multiple writers, or for modifications at arbitrary offsets in the file.

Next, a few of the key concepts.

HDFS Concepts

Blocks

Ordinary filesystems also have the notion of a block, the smallest unit of data that can be read or written (a disk block is typically 512 bytes, a filesystem block a few kilobytes). HDFS has blocks too, but they are much larger, 64 MB by default; files stored in HDFS are broken into block-sized chunks, which are stored and processed as independent units.

Having a block abstraction for a distributed filesystem brings several benefits.

  • The first benefit is the most obvious: a file can be larger than any single disk in the network. There’s nothing that requires the blocks from a file to be stored on the same disk, so they can take advantage of any of the disks in the cluster.
  • Second, making the unit of abstraction a block rather than a file simplifies the storage subsystem.
  • Furthermore, blocks fit well with replication for providing fault tolerance and availability.
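
To see how a file stored in HDFS is actually split into blocks, Hadoop's fsck tool can list the blocks that make up each file, for example:

% hadoop fsck / -files -blocks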

Namenodes and Datanodes

An HDFS cluster has two types of node operating in a master-worker pattern: a namenode (the master) and a number of datanodes (workers).

The namenode manages the filesystem namespace. It maintains the filesystem tree and the metadata for all the files and directories in the tree. This information is stored persistently on the local disk in the form of two files: the namespace image and the edit log. The namenode also knows the datanodes on which all the blocks for a given file are located; however, it does not store block locations persistently, since this information is reconstructed from datanodes when the system starts.

Datanodes are the workhorses of the filesystem. They store and retrieve blocks when they are told to (by clients or the namenode), and they report back to the namenode periodically with lists of blocks that they are storing.

Without the namenode, the filesystem cannot be used. For this reason, it is important to make the namenode resilient to failure, and Hadoop provides two mechanisms for this.

  • The first way is to back up the files that make up the persistent state of the filesystem metadata.
  • It is also possible to run a secondary namenode, which despite its name does not act as a namenode.

Hadoop Filesystems

Hadoop has an abstract notion of filesystem, of which HDFS is just one implementation.

Although HDFS is Hadoop's own filesystem, Hadoop supports other filesystems as well; here are some examples.

Filesystem          URI scheme   Java implementation (all under org.apache.hadoop)
Local               file         fs.LocalFileSystem
HDFS                hdfs         hdfs.DistributedFileSystem
KFS (CloudStore)    kfs          fs.kfs.KosmosFileSystem
FTP                 ftp          fs.ftp.FTPFileSystem
S3 (native)         s3n          fs.s3native.NativeS3FileSystem
S3 (block-based)    s3           fs.s3.S3FileSystem

Description

KFS (CloudStore): CloudStore (formerly Kosmos filesystem) is a distributed filesystem like HDFS or Google's GFS, written in C++. Find more information about it at http://kosmosfs.sourceforge.net/.

S3 (native): A filesystem backed by Amazon S3. See http://wiki.apache.org/hadoop/AmazonS3.

S3 (block-based): A filesystem backed by Amazon S3, which stores files in blocks (much like HDFS) to overcome S3's 5 GB file size limit.

Although it is possible (and sometimes very convenient) to run MapReduce programs that access any of these filesystems, when you are processing large volumes of data you should choose a distributed filesystem that has the data locality optimization, such as HDFS or KFS.
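
Because the filesystem implementation is selected by the URI scheme, the same shell commands work against any of them; for example, you can list the root of the local filesystem like this:

% hadoop fs -ls file:///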

HDFS Data Flow

Anatomy of a File Read

  1. The client calls open() on the FileSystem object, which for HDFS is an instance of DistributedFileSystem.
  2. DistributedFileSystem calls the namenode, using RPC, to determine the locations of the blocks for the first few blocks in the file. The DistributedFileSystem returns an FSDataInputStream (an input stream that supports file seeks) to the client for it to read data from.
  3. The client then calls read() on the stream. DFSInputStream, which has stored the datanode addresses for the first few blocks in the file, then connects to the first (closest) datanode for the first block in the file. Data is streamed from the datanode back to the client, which calls read() repeatedly on the stream. When the end of the block is reached, DFSInputStream will close the connection to the datanode, then find the best datanode for the next block. This happens transparently to the client, which from its point of view is just reading a continuous stream.
  4. When the client has finished reading, it calls close() on the FSDataInputStream.

During reading, if the client encounters an error while communicating with a datanode, then it will try the next closest one for that block . It will also remember datanodes that have failed so that it doesn’t needlessly retry them for later blocks.

Anatomy of a File Write

  1. The client creates the file by calling create() on DistributedFileSystem.
  2. DistributedFileSystem makes an RPC call to the namenode to create a new file in the filesystem's namespace, with no blocks associated with it. The namenode performs various checks to make sure the file doesn't already exist, and that the client has the right permissions to create the file. If these checks pass, the namenode makes a record of the new file; otherwise, file creation fails and the client is thrown an IOException. The DistributedFileSystem returns an FSDataOutputStream for the client to start writing data to.
  3. As the client writes data, DFSOutputStream splits it into packets, which it writes to an internal queue, called the data queue. The data queue is consumed by the DataStreamer, whose responsibility it is to ask the namenode to allocate new blocks by picking a list of suitable datanodes to store the replicas. The list of datanodes forms a pipeline.
  4. DFSOutputStream also maintains an internal queue of packets that are waiting to be acknowledged by datanodes, called the ack queue. A packet is removed from the ack queue only when it has been acknowledged by all the datanodes in the pipeline (see the sketch after this list).
  5. When the client has finished writing data it calls close() on the stream. This action flushes all the remaining packets to the datanode pipeline and waits for acknowledgments before contacting the namenode to signal that the file is complete.
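
The data queue / ack queue interplay in steps 3 and 4 can be pictured with a tiny Python model. This is purely illustrative (the class name and structure are made up for the sketch), not Hadoop's DFSOutputStream implementation:

from collections import deque

class OutputStreamSketch:
    """Toy model: packets move from the data queue into the ack queue, and leave once acked."""
    def __init__(self):
        self.data_queue = deque()   # packets written by the client, waiting to be streamed
        self.ack_queue = deque()    # packets streamed to the pipeline, waiting for all acks

    def write(self, packet):
        self.data_queue.append(packet)

    def stream_one(self):
        # DataStreamer: take the next packet and send it down the datanode pipeline.
        packet = self.data_queue.popleft()
        self.ack_queue.append(packet)
        return packet

    def ack_one(self):
        # Only when every datanode in the pipeline has acknowledged the packet is it removed.
        return self.ack_queue.popleft()

out = OutputStreamSketch()
for p in ["pkt1", "pkt2"]:
    out.write(p)
out.stream_one()   # "pkt1" now sits in the ack queue
out.ack_one()      # fully acknowledged, so it is gone; "pkt2" is still in the data queue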

If a datanode fails during this process, there is of course a way to handle it.

First, the pipeline is closed, and any packets sitting in the ack queue are put back at the front of the data queue. The current block on the remaining healthy datanodes in the pipeline is given a new identity, so that the partial block on the failed datanode will be deleted when that datanode recovers later. The failed datanode is then removed from the pipeline, and the rest of the block's data is written to the remaining healthy datanodes. Meanwhile, the namenode notices that the block is under-replicated and arranges for a further replica to be created on another datanode, so the write and the normal replication of the data can still complete. HDFS has a dfs.replication.min parameter (default 1) that sets the tolerance here: even if several datanodes fail while a block is being written, the write still succeeds as long as at least dfs.replication.min replicas are written.

Replica Placement

How does the namenode choose which datanodes to store replicas on?

There’s a tradeoff between reliability and write bandwidth and read bandwidth here.

Hadoop's strategy is to place the first replica on the same node as the client (for clients running outside the cluster, a node is chosen at random, although the system tries not to pick nodes that are too full or too busy). The second replica is placed on a different rack from the first (off-rack), chosen at random. The third replica is placed on the same rack as the second, but on a different node chosen at random. Further replicas are placed on random nodes on the cluster, although the system tries to avoid placing too many replicas on the same rack.
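
The placement rule described above (first replica on the client's node, second on a random different rack, third on another node of that second rack) can be sketched as a small function. This is only an illustration of the policy, not Hadoop's actual block placement code, and it assumes at least two racks with at least two nodes each:

import random

def choose_replica_nodes(client_node, nodes_by_rack):
    """nodes_by_rack maps rack name -> list of node names; returns three replica locations."""
    client_rack = next(r for r, ns in nodes_by_rack.items() if client_node in ns)
    first = client_node                                      # 1st replica: the client's own node
    second_rack = random.choice([r for r in nodes_by_rack if r != client_rack])
    second = random.choice(nodes_by_rack[second_rack])       # 2nd replica: off-rack, random node
    third = random.choice([n for n in nodes_by_rack[second_rack] if n != second])  # 3rd: same rack as the 2nd
    return [first, second, third]

racks = {"rack1": ["n1", "n2", "n3"], "rack2": ["n4", "n5", "n6"]}
print(choose_replica_nodes("n1", racks))   # e.g. ['n1', 'n5', 'n4']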

Hadoop Archives

HDFS stores small files inefficiently , since each file is stored in a block, and block metadata is held in memory by the namenode.

Hadoop Archives, or HAR files , are a file archiving facility that packs files into HDFS blocks more efficiently, thereby reducing namenode memory usage while still allowing transparent access to files. In particular, Hadoop Archives can be used as input to MapReduce .

Anatomy of a MapReduce Job Run

You can run a MapReduce job with a single line of code: JobClient.runJob(conf). It’s very short, but it conceals a great deal of processing behind the scenes. At the highest level, there are four independent entities:

  • The client , which submits the MapReduce job.
  • The jobtracker , which coordinates the job run. The jobtracker is a Java application whose main class is JobTracker.
  • The tasktrackers , which run the tasks that the job has been split into. Tasktrackers are Java applications whose main class is TaskTracker.
  • The distributed filesystem , which is used for sharing job files between the other entities.

Below, each step of the job-run process is described in more detail.

Job Submission

The job submission process implemented by JobClient’s submitJob() method does the following:

  • Asks the jobtracker for a new job ID (by calling getNewJobId() on JobTracker)
  • Checks the output specification of the job. For example, if the output directory has not been specified or it already exists, the job is not submitted and an error is thrown to the MapReduce program.
  • Computes the input splits for the job. If the splits cannot be computed, because the input paths don’t exist, for example, then the job is not submitted and an error is thrown to the MapReduce program.
  • Copies the resources needed to run the job, including the job JAR file, the configuration file and the computed input splits, to the jobtracker's filesystem in a directory named after the job ID.
  • Tells the jobtracker that the job is ready for execution (by calling submitJob() on JobTracker)

Job Initialization

When the JobTracker receives a call to its submitJob() method, it puts it into an internal queue from where the job scheduler will pick it up and initialize it . Initialization involves creating an object to represent the job being run, which encapsulates its tasks, and bookkeeping information to keep track of the tasks’ status and progress

Task Assignment

Tasktrackers run a simple loop that periodically sends heartbeat method calls to the jobtracker . Heartbeats tell the jobtracker that a tasktracker is alive , but they also double as a channel for messages. As a part of the heartbeat, a tasktracker will indicate whether it is ready to run a new task , and if it is, the jobtracker will allocate it a task , which it communicates to the tasktracker using the heartbeat return value

Tasktrackers have a fixed number of slots for map tasks and for reduce tasks. The default scheduler fills empty map task slots before reduce task slots, so if the tasktracker has at least one empty map task slot, the jobtracker will select a map task; otherwise, it will select a reduce task.

Choosing a reduce task is simple for the jobtracker, since there is no data locality to consider; for a map task, however, it takes account of the tasktracker's network location and picks a task whose input split is as close as possible to the tasktracker.

In other words, the scheduler tries to keep map tasks data-local; failing that, rack-local. In short, when picking a map task, the goal is to minimize the network traffic for the map's input data.
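
That preference order (data-local, then rack-local, then off-rack) can be sketched roughly as follows; this is an illustration of the idea, not the real Hadoop scheduler:

def pick_map_task(tracker_node, tracker_rack, pending_tasks):
    """Each pending task records the nodes and racks that hold replicas of its input split."""
    for t in pending_tasks:                      # 1. a split stored on this very node
        if tracker_node in t["split_nodes"]:
            return t, "data-local"
    for t in pending_tasks:                      # 2. a split stored somewhere on the same rack
        if tracker_rack in t["split_racks"]:
            return t, "rack-local"
    if pending_tasks:                            # 3. otherwise the input must cross racks
        return pending_tasks[0], "off-rack"
    return None, None

tasks = [
    {"id": 1, "split_nodes": {"n4"}, "split_racks": {"rack2"}},
    {"id": 2, "split_nodes": {"n2"}, "split_racks": {"rack1"}},
]
print(pick_map_task("n1", "rack1", tasks))   # picks task 2 as rack-local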

Task Execution

First, it localizes the job JAR by copying it from the shared filesystem to the tasktracker’s filesystem .

Second, it creates a local working directory for the task, and un-jars the contents of the JAR into this directory.

Third, it creates an instance of TaskRunner to run the task.

The task's code (for Java, the JAR) is first copied to the tasktracker's machine, and then a child process is launched to run the task: for Java, a new JVM, so that tasks do not interfere with one another. Since repeatedly creating JVMs is expensive, this can be optimized by reusing JVMs (a JVM pool), saving the cost of constant creation and teardown.

Progress and Status Updates

MapReduce jobs are long-running batch jobs, taking anything from minutes to hours to run. It’s important for the user to get feedback on how the job is progressing. A job and each of its tasks have a status, which includes such things as the state of the job or task (e.g., running, successfully completed, failed), the progress of maps and reduces , the values of the job’s counters , and a status message or description (which may be set by user code).

Since jobs run for a long time, it is only considerate to give the user some progress reports. With the jobtracker keeping track of everything, this information is easy to obtain, so I won't go into the details.

Job Completion

When the jobtracker receives a notification that the last task for a job is complete, it changes the status for the job to “successful.” Then, when the JobClient polls for status, it learns that the job has completed successfully, so it prints a message to tell the user, and then returns from the runJob() method.

Last, the jobtracker cleans up its working state for the job, and instructs tasktrackers to do the same (so intermediate output is deleted, for example).

Note that intermediate data, such as the map output, is only deleted once the job has completed; this is done with fault tolerance in mind.

So what happens when something goes wrong in the middle of a MapReduce job? As you can see, no matter what the topic, fault tolerance is essential in distributed computing; without good fault-tolerance techniques, none of this would be practical in the real world.

Task Failure

  • The most common way that this happens is when user code in the map or reduce task throws a runtime exception . If this happens, the child JVM reports the error back to its parent tasktracker, before it exits. The error ultimately makes it into the user logs. The tasktracker marks the task attempt as failed, freeing up a slot to run another task.
  • Another failure mode is the sudden exit of the child JVM —perhaps there is a JVM bug that causes the JVM to exit for a particular set of circumstances exposed by the Map-Reduce user code. In this case, the tasktracker notices that the process has exited, and marks the attempt as failed.
  • Hanging tasks are dealt with differently. The tasktracker notices that it hasn’t received a progress update for a while, and proceeds to mark the task as failed. The child JVM process will be automatically killed after this period.
  • When the jobtracker is notified of a task attempt that has failed (by the tasktracker’s heartbeat call) it will reschedule execution of the task. The jobtracker will try to avoid rescheduling the task on a tasktracker where it has previously failed. Furthermore, if a task fails more than four times, it will not be retried further.

Tasktracker Failure

Failure of a tasktracker is another failure mode. If a tasktracker fails by crashing, or running very slowly, it will stop sending heartbeats to the jobtracker (or send them very infrequently). The jobtracker will notice a tasktracker that has stopped sending heartbeats (if it hasn't received one for 10 minutes, configured via the mapred.tasktracker.expiry.interval property, in milliseconds) and remove it from its pool of tasktrackers to schedule tasks on.

Jobtracker Failure

Failure of the jobtracker is the most serious failure mode. Currently, Hadoop has no mechanism for dealing with failure of the jobtracker—it is a single point of failure—so in this case the job fails.

Within the MapReduce process, the step that carries the map output to the reduce input is also a very important one.

Shuffle and Sort

MapReduce makes the guarantee that the input to every reducer is sorted by key. The process by which the system performs the sort—and transfers the map outputs to the reducers as inputs—is known as the shuffle.

Calling it "shuffle" is arguably a bit of a misnomer: shuffling means putting an ordered deck into random order, whereas this step takes the map output, sorts it by key, and delivers it to the reducers.

The Map Side

Each map task has a circular memory buffer that it writes the output to. The buffer is 100 MB by default, a size which can be tuned by changing the io.sort.mb property. When the contents of the buffer reaches a certain threshold size (io.sort.spill.percent, default 0.80, or 80%) a background thread will start to spill the contents to disk .

Before it writes to disk, the thread first divides the data into partitions corresponding to the reducers that they will ultimately be sent to. Within each partition, the background thread performs an in-memory sort by key , and if there is a combiner function, it is run on the output of the sort.

A note on the partitions: each partition corresponds to one reducer, so every reducer receives exactly the keys it is responsible for, and the in-memory sort happens within each partition. The spill-and-merge cycle described next is essentially an external sort, since the data is usually too large to be sorted in memory all at once.
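
For reference, Hadoop's default partitioner (HashPartitioner) assigns a key to a partition by hashing it and taking the result modulo the number of reducers; the idea in a couple of lines of Python:

def partition(key, num_reducers):
    # Same idea as the default HashPartitioner: hash the key, modulo the reducer count.
    return hash(key) % num_reducers

for year in ["1949", "1950", "1951"]:
    print("%s -> reducer %d" % (year, partition(year, 2)))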

Each time the memory buffer reaches the spill threshold, a new spill file is created, so after the map task has written its last output record there could be several spill files. Before the task is finished, the spill files are merged into a single partitioned and sorted output file. The configuration property io.sort.factor controls the maximum number of streams to merge at once; the default is 10.
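
The merge of sorted spill files is a standard multi-way merge of sorted runs (with io.sort.factor bounding how many runs are merged at once). A small illustration of the idea using Python's heapq.merge, not Hadoop's implementation:

import heapq

# Three sorted spill "files" for one partition, as (key, value) pairs sorted by key.
spill1 = [("1949", 78), ("1950", 0)]
spill2 = [("1949", 111), ("1950", 22)]
spill3 = [("1950", -11)]

# Merge the sorted runs into a single sorted stream, as the map task does before it finishes.
merged = list(heapq.merge(spill1, spill2, spill3, key=lambda kv: kv[0]))
print(merged)
# [('1949', 78), ('1949', 111), ('1950', 0), ('1950', 22), ('1950', -11)]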

The Reduce Side

The map output file is sitting on the local disk of the tasktracker that ran the map task, so to run the reduce, all of the map output data has to be copied to the machine where the reduce task runs. This is known as the copy phase of the reduce task.

When all the map outputs have been copied, the reduce task moves into the sort phase (which should properly be called the merge phase , as the sorting was carried out on the map side), which merges the map outputs, maintaining their sort ordering. This is done in rounds.

For example, if there were 50 map outputs and the merge factor was 10 (the default, controlled by io.sort.factor), there would be 5 rounds.

Rather than have a final round that merges these five files into a single sorted file, the merge saves a trip to disk by directly feeding the reduce function in what is the last phase: the reduce phase .

So the reduce side consists of three phases: copy, merge (sort), and reduce.
