
Hadoop TDG 2 -- Introduction

The good news is that big data is here. The bad news is that we are struggling to store and analyze it.

Faced with massive volumes of data, we need to store and analyze them efficiently, and Hadoop can do exactly that.

This, in a nutshell, is what Hadoop provides: a reliable shared storage and analysis system. The storage is provided by HDFS, and analysis by MapReduce.

Differences from an RDBMS

MapReduce is a good fit for problems that need to analyze the whole dataset in a batch fashion, particularly for ad hoc analysis. An RDBMS is good for point queries or updates, where the dataset has been indexed to deliver low-latency retrieval and update times of a relatively small amount of data.

MapReduce suits applications where the data is written once and read many times, whereas a relational database is good for datasets that are continually updated.

MapReduce works well on unstructured or semi-structured data, since it is designed to interpret the data at processing time. In other words, the input keys and values for MapReduce are not an intrinsic property of the data; they are chosen by the person analyzing the data.

Over time, the differences between the two systems are likely to blur, both as relational databases start incorporating some of the ideas from MapReduce (such as Aster Data's and Greenplum's databases) and, from the other direction, as higher-level query languages built on MapReduce (such as Pig and Hive) make MapReduce systems more approachable to traditional database programmers.

Differences from grid computing

MapReduce tries to colocate the data with the compute node, so data access is fast because it is local. This feature, known as data locality, is at the heart of MapReduce and is the reason for its good performance.

The grid computing approach is to distribute the work across a cluster of machines that access a shared filesystem hosted by a SAN. This works well for predominantly compute-intensive jobs, but it becomes a problem when nodes need to access larger data volumes (hundreds of gigabytes, the point at which MapReduce really starts to shine), since the network bandwidth is the bottleneck and compute nodes become idle.

Hadoop was created by Doug Cutting, the creator of Apache Lucene, the widely used text search library. Hadoop has its origins in Apache Nutch, an open source web search engine, itself a part of the Lucene project.

Below, a simple example illustrates the MapReduce process.

The example takes a weather station's temperature records and finds the maximum temperature for each year.

As you can see, the raw input data is unstructured.

Example input records (the keys are the line offsets within the file, which the map function ignores):

(0, 0067011990999991950 051507004...9999999n9+0000 1+99999999999...)

(424, 0043012650999991949 032418004...0500001n9+0078 1+99999999999...) 

(106, 0043011990999991950 051512004...9999999n9+0022 1+99999999999...) 

(212, 0043011990999991950 051518004...9999999n9-00 11 1+99999999999...) 

(318, 0043012650999991949 032412004...0500001n9+0111 1+99999999999...)

The map task's job is to extract the small amount of data we need from the large volume of raw input and emit it as (key, value) pairs. Because the map phase runs locally on the data, it is very efficient; only the small amount of extracted data needs to be transferred over the network.

The map function merely extracts the year and the air temperature from each record and emits them as its output.

(1950, 0)

(1949, 78) 

(1950, 22) 

(1950, -11) 

(1949, 111)

Before the map output is passed to reduce, the framework performs a preprocessing step: it sorts and groups the (key, value) pairs by key.

The output from the map function is processed by the MapReduce framework before being sent to the reduce function. This processing sorts and groups the key-value pairs by key.

(1949, [111, 78]) 

(1950, [0, 22, -11])

The above is just an abstract representation; what is actually passed to reduce is the sorted sequence of records, like this:

(1949, 78)

(1950, 0) 

(1950, -11)

Reduce applies the logic you define to derive the final answer from the data the map phase produced; here, that logic is to find the maximum value.

All the reduce function has to do now is iterate through the list and pick the maximum reading.

(1949, 111) 

(1950, 22)

That is a complete MapReduce flow, and it is fairly simple and easy to understand.

Having run through how the MapReduce program works, the next step is to express it in code. We need three things: a map function, a reduce function, and some code to run the job.

Let's look at how to write the above example in Java.

The Mapper interface is a generic type, with four formal type parameters that specify the input key, input value, output key, and output value types of the map function.

For the present example, the input key is a long integer offset, the input value is a line of text, the output key is a year, and the output value is an air temperature (an integer).

Rather than use built-in Java types, Hadoop provides its own set of basic types that are optimized for network serialization. These are found in the org.apache.hadoop.io package.

Here we use LongWritable, which corresponds to a Java Long, Text (like Java String), and IntWritable (like Java Integer).
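
A minimal sketch of such a mapper against the old org.apache.hadoop.mapred API, along the lines of the book's MaxTemperature example (the character offsets assume the NCDC record layout used there, so treat them as illustrative rather than authoritative):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Old-API mapper: (byte offset, line of text) -> (year, air temperature).
public class MaxTemperatureMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;  // sentinel used for "no reading"

  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') {  // parseInt does not accept a leading plus sign
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      output.collect(new Text(year), new IntWritable(airTemperature));
    }
  }
}

The key (the line's byte offset) is simply ignored, and records with missing or poor-quality readings are dropped before anything is emitted.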

The reduce function is similarly defined using a Reducer.

Again, four formal type parameters are used to specify the input and output types, this time for the reduce function.

The input types of the reduce function must match the output types of the map function: Text and IntWritable.

In this case, the output types of the reduce function are Text and IntWritable, for a year and its maximum temperature, which we find by iterating through the temperatures and comparing each with a record of the highest found so far.
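
A matching reducer sketch, again in the old API:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Old-API reducer: (year, [temperatures]) -> (year, maximum temperature).
public class MaxTemperatureReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int maxValue = Integer.MIN_VALUE;
    while (values.hasNext()) {
      // Keep a running record of the highest reading seen so far.
      maxValue = Math.max(maxValue, values.next().get());
    }
    output.collect(key, new IntWritable(maxValue));
  }
}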

A JobConf object forms the specification of the job. It gives you control over how the job is run.

1. Specify the JAR file containing the job code. You do not give the JAR file name directly; instead you pass a class, and Hadoop locates the JAR file that contains that class. Why go to this trouble? Presumably so the JAR file can be renamed or repackaged without touching the job code.

When we run this job on a Hadoop cluster, we will package the code into a JAR file (which Hadoop will distribute around the cluster).

Rather than explicitly specify the name of the JAR file, we can pass a class in the JobConf constructor, which Hadoop will use to locate the relevant JAR file by looking for the JAR file containing this class.

2. Specify the job's input and output paths.

Having constructed a JobConf object, we specify the input and output paths.

An input path is specified by calling the static addInputPath() method on FileInputFormat, and it can be a single file, a directory (in which case the input forms all the files in that directory), or a file pattern. As the name suggests, addInputPath() can be called more than once to use input from multiple paths.

The output path (of which there is only one) is specified by the static setOutputPath() method on FileOutputFormat. It specifies a directory where the output files from the reducer functions are written. The directory shouldn't exist before running the job, as Hadoop will complain and not run the job. This precaution is to prevent data loss (it can be very annoying to accidentally overwrite the output of a long job with another).

3. Specify the mapper and reducer classes.

Next, we specify the map and reduce types to use via the setMapperClass() and setReducerClass() methods.

4. Specify the output types of the map and reduce functions.

The setOutputKeyClass() and setOutputValueClass() methods control the output types for the map and the reduce functions, which are often the same, as they are in our case.

If they are different, the map output types can be set using the setMapOutputKeyClass() and setMapOutputValueClass() methods.

The input types are controlled via the input format, which we have not explicitly set since we are using the default TextInputFormat.

After setting the classes that define the map and reduce functions, we are ready to run the job.

5. Run the job.

The static runJob() method on JobClient submits the job and waits for it to finish, writing information about its progress to the console. A driver sketch tying these five steps together follows.
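
Putting steps 1 through 5 together, a minimal driver sketch in the old API might look like this (the class and path handling are illustrative; input and output paths come from the command line):

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class MaxTemperature {

  public static void main(String[] args) throws IOException {
    // 1. Pass a class so Hadoop can locate the JAR that contains the job code.
    JobConf conf = new JobConf(MaxTemperature.class);
    conf.setJobName("Max temperature");

    // 2. Input and output paths (the output directory must not already exist).
    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    // 3. Mapper and reducer classes.
    conf.setMapperClass(MaxTemperatureMapper.class);
    conf.setReducerClass(MaxTemperatureReducer.class);

    // 4. Output key/value types, shared here by the map and reduce functions.
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    // 5. Submit the job and wait for it to finish, printing progress to the console.
    JobClient.runJob(conf);
  }
}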

Release 0.20.0 of Hadoop included a new Java MapReduce API, sometimes referred to as "context objects," designed to make the API easier to evolve in the future.

The new API is type-incompatible with the old, however, so applications need to be rewritten to take advantage of it.

There are several notable differences between the two APIs (a sketch in the new-API style follows the list):

• The new API favors abstract classes over interfaces, since these are easier to evolve.

For example, you can add a method (with a default implementation) to an abstract class without breaking old implementations of the class. In the new API, the Mapper and Reducer interfaces are now abstract classes.

• The new API is in the org.apache.hadoop.mapreduce package (and subpackages). The old API can still be found in org.apache.hadoop.mapred.

• The new API makes extensive use of context objects that allow the user code to communicate with the MapReduce system. The MapContext, for example, essentially unifies the role of the JobConf, the OutputCollector, and the Reporter.

• The new API supports both a "push" and a "pull" style of iteration.

In both APIs, key-value record pairs are pushed to the mapper, but in addition, the new API allows a mapper to pull records from within the map() method.

The same goes for the reducer. An example of how the "pull" style can be useful is processing records in batches, rather than one by one.

• Configuration has been unified. The old API has a special JobConf object for job configuration, which is an extension of Hadoop's vanilla Configuration object (used for configuring daemons; see "The Configuration API" on page 130). In the new API, this distinction is dropped, so job configuration is done through a Configuration.

• Job control is performed through the Job class, rather than JobClient, which no longer exists in the new API.

• Output files are named slightly differently: part-m-nnnnn for map outputs and part-r-nnnnn for reduce outputs (where nnnnn is an integer designating the part number, starting from zero).
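
A rough sketch of the same job written against the new API (not a drop-in replacement for the old-API classes above; the class names here are made up for illustration, and the Job constructor shown is the 0.20-era one):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewMaxTemperature {

  // In the new API, Mapper is an abstract class, and map() takes a Context
  // object instead of an OutputCollector and a Reporter.
  public static class NewMaxTemperatureMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String line = value.toString();
      String year = line.substring(15, 19);  // same illustrative NCDC offsets as before
      int airTemperature = Integer.parseInt(line.substring(87, 92).replace("+", ""));
      if (airTemperature != 9999 && line.substring(92, 93).matches("[01459]")) {
        context.write(new Text(year), new IntWritable(airTemperature));
      }
    }
  }

  // The new-API reducer receives an Iterable of values rather than an Iterator.
  public static class NewMaxTemperatureReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int maxValue = Integer.MIN_VALUE;
      for (IntWritable value : values) {
        maxValue = Math.max(maxValue, value.get());
      }
      context.write(key, new IntWritable(maxValue));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());  // a plain Configuration replaces JobConf
    job.setJarByClass(NewMaxTemperature.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(NewMaxTemperatureMapper.class);
    job.setReducerClass(NewMaxTemperatureReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Job replaces JobClient: submit the job and wait for it to complete.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}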

you’ve seen how mapreduce works for small inputs; now it’s time to take a bird’s-eye view of the system and look at the data flow for large inputs. 

for simplicity, the examples so far have used files on the local filesystem. 

however, to scale out, we need to store the data in a distributed filesystem, typically hdfs (which you’ll learn about in the next chapter), to allow hadoop to move the mapreduce computation to each machine hosting a part of the data. let’s see how this works.

a mapreduce job is a unit of work that the client wants to be performed: it consists of the input data , the mapreduce program , and configuration information. 

hadoop runs the job by dividing it into tasks , of which there are two types: map tasks and reduce tasks . 

In other words, a MapReduce job consists of the input data, the MapReduce code, and configuration information, and the job itself is divided into two kinds of tasks: map tasks and reduce tasks.

There are two types of nodes that control the job execution process: a jobtracker and a number of tasktrackers. The jobtracker coordinates all the jobs run on the system by scheduling tasks to run on tasktrackers. Tasktrackers run tasks and send progress reports to the jobtracker, which keeps a record of the overall progress of each job. If a task fails, the jobtracker can reschedule it on a different tasktracker.

This is a centralized design: a master node (the jobtracker) and a set of worker nodes (the tasktrackers), with the master responsible for managing and coordinating the workers.

Hadoop divides the input to a MapReduce job into fixed-size pieces called input splits, or just splits.

Hadoop creates one map task for each split, which runs the user-defined map function for each record in the split.

Having many splits means the time taken to process each split is small compared to the time to process the whole input.

For distributed processing with MapReduce, the first step is to partition the data; only then can it be processed in parallel. So how large should a split be?

So if we are processing the splits in parallel, the processing is better load-balanced if the splits are small, since a faster machine will be able to process proportionally more splits over the course of the job than a slower machine. Even if the machines are identical, failed processes or other jobs running concurrently make load balancing desirable, and the quality of the load balancing increases as the splits become more fine-grained.

On the other hand, if splits are too small, then the overhead of managing the splits and of map task creation begins to dominate the total job execution time.

Finer-grained splits thus make for better load balancing and easier recovery from failed processes, but if the splits become too fine, the overhead of managing them and of creating map tasks starts to dominate.

For most jobs, a good split size tends to be the size of an HDFS block, 64 MB by default, although this can be changed for the cluster (for all newly created files) or specified when each file is created.

The compromise, then, is that for most jobs the default split size of one HDFS block (64 MB) is a reasonable choice.
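
To make those two knobs concrete, here is a hedged sketch: the cluster-wide block size property (dfs.block.size in 0.20-era Hadoop, normally set in hdfs-site.xml rather than in application code) and the per-file block size argument to FileSystem.create(). Treat the property name, the path, and the sizes as assumptions to verify against your version.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockSizeSketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Cluster-wide default block size for newly created files (128 MB here);
    // in practice this lives in hdfs-site.xml, not in code.
    conf.setLong("dfs.block.size", 128L * 1024 * 1024);

    // Or override the block size for a single file at creation time.
    FileSystem fs = FileSystem.get(conf);
    FSDataOutputStream out = fs.create(new Path("/data/sample.txt"),  // hypothetical path
        true,                 // overwrite if present
        4096,                 // I/O buffer size
        (short) 3,            // replication factor
        128L * 1024 * 1024);  // block size for this file, in bytes
    out.close();
  }
}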

Hadoop does its best to run the map task on a node where the input data resides in HDFS. This is called the data locality optimization.

It should now be clear why the optimal split size is the same as the block size: it is the largest size of input that can be guaranteed to be stored on a single node. If the split spanned two blocks, it would be unlikely that any HDFS node stored both blocks, so some of the split would have to be transferred across the network to the node running the map task, which is clearly less efficient than running the whole map task using local data.

Why is 64 MB reasonable? Because the block size is also 64 MB and the unit of storage in HDFS is the block. To preserve the data locality optimization, a split should not exceed one block; beyond that, there is no guarantee that the multiple blocks it spans are stored on the same node.

Map tasks write their output to the local disk, not to HDFS. Why is this?

Map output is intermediate output: it's processed by reduce tasks to produce the final output, and once the job is complete the map output can be thrown away. So storing it in HDFS, with replication, would be overkill. If the node running the map task fails before the map output has been consumed by the reduce task, then Hadoop will automatically rerun the map task on another node to re-create the map output.

Reduce tasks don't have the advantage of data locality: the input to a single reduce task is normally the output from all mappers.

In the present example, we have a single reduce task that is fed by all of the map tasks. Therefore, the sorted map outputs have to be transferred across the network to the node where the reduce task is running, where they are merged and then passed to the user-defined reduce function. The output of the reduce is normally stored in HDFS for reliability.

As explained in Chapter 3, for each HDFS block of the reduce output, the first replica is stored on the local node, with other replicas being stored on off-rack nodes. Thus, writing the reduce output does consume network bandwidth, but only as much as a normal HDFS write pipeline consumes.

There are only a few reducers, and all the mappers' output has to be sent to them over the network, whereas mappers enjoy the data locality optimization: the data they process is already local. So the mapper's job is to extract the needed data from the large volume of raw input and pre-process it, keeping what is handed to the reducer as small and simple as possible.

The reducer's output is the final result of the job, so it needs to be stored in HDFS.

The whole data flow with a single reduce task is illustrated in Figure 2-2. The dotted boxes indicate nodes, the light arrows show data transfers on a node, and the heavy arrows show data transfers between nodes.

When there are multiple reducers, the map tasks partition their output, each creating one partition for each reduce task.

There can be many keys (and their associated values) in each partition, but the records for any given key are all in a single partition. A simple hash of the key is enough to achieve this.

The partitioning can be controlled by a user-defined partitioning function, but normally the default partitioner, which buckets keys using a hash function, works very well.
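
For reference, the default behavior amounts to hashing the key and taking it modulo the number of reduce tasks. A sketch in the old-API style (the class name is made up; the real default is the library's HashPartitioner):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

// All records with the same key hash to the same partition, so a single
// reduce task sees every value for that key.
public class SketchHashPartitioner<K, V> implements Partitioner<K, V> {

  public void configure(JobConf job) {
    // No configuration needed for plain hash partitioning.
  }

  public int getPartition(K key, V value, int numReduceTasks) {
    // Mask off the sign bit so the partition index is never negative.
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

A user-defined partitioner like this one would be registered on the JobConf with setPartitionerClass().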

The data flow for the general case of multiple reduce tasks is illustrated in Figure 2-3.

This diagram makes it clear why the data flow between map and reduce tasks is colloquially known as "the shuffle," as each reduce task is fed by many map tasks.

The shuffle is more complicated than this diagram suggests, and tuning it can have a big impact on job execution time, as you will see in "Shuffle and Sort" on page 177.

Finally, it's also possible to have zero reduce tasks.

This can be appropriate when you don't need the shuffle since the processing can be carried out entirely in parallel (a few examples are discussed in "NLineInputFormat" on page 211). In this case, the only off-node data transfer is when the map tasks write to HDFS (see Figure 2-4).
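
In the old API this is a one-line change to the driver sketched earlier (shown as an excerpt rather than a complete program):

// Map-only job: with zero reduce tasks there is no shuffle, and each map
// task writes its output directly to HDFS.
conf.setNumReduceTasks(0);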

Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks.

Hadoop allows the user to specify a combiner function to be run on the map output; the combiner function's output forms the input to the reduce function.

Since the combiner function is an optimization, Hadoop does not provide a guarantee of how many times it will call it for a particular map output record, if at all. In other words, calling the combiner function zero, one, or many times should produce the same output from the reducer.

The purpose is optimization: to further cut the volume of data sent from map to reduce over the network, we can add a combiner after the map, which is roughly equivalent to running the reduce logic locally first. Consider the following example.

The first map produced the output:

(1950, 0)

(1950, 20)

(1950, 10)

and the second produced:

(1950, 25) 

(1950, 15) 

Reduce function input:

(1950, [0, 20, 10, 25, 15]) 

With a combiner, each mapper's output is pre-processed: the maximum reading is picked out on each mapper node first, and only that is sent to the reducer, which greatly reduces the amount of data transferred over the network.

The reduce function's input shrinks from (1950, [0, 20, 10, 25, 15]) to (1950, [20, 25]).

Not all functions possess this property. For example, if we were calculating mean temperatures, we couldn't use the mean as our combiner function: the mean of the per-map means, mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15, is not the overall mean, mean(0, 20, 10, 25, 15) = 14.

The combiner function doesn't replace the reduce function. (How could it? The reduce function is still needed to process records with the same key from different maps.)

But it can help cut down the amount of data shuffled between the maps and the reduces, and for this reason alone it is always worth considering whether you can use a combiner function in your MapReduce job.

Setting one up is also simple, as shown below.
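
In the old API the combiner is registered on the JobConf; for the max-temperature job the reducer class can double as the combiner, since taking a maximum is associative and commutative. As an excerpt from the driver shown earlier:

// The combiner runs on each map task's output before the shuffle.
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setCombinerClass(MaxTemperatureReducer.class);
conf.setReducerClass(MaxTemperatureReducer.class);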

Hadoop provides an API to MapReduce that allows you to write your map and reduce functions in languages other than Java.

Hadoop Streaming uses Unix standard streams as the interface between Hadoop and your program, so you can use any language that can read standard input and write to standard output to write your MapReduce program.

Streaming is naturally suited for text processing (although, as of version 0.21.0, it can handle binary streams, too), and when used in text mode, it has a line-oriented view of data. Map input data is passed over standard input to your map function, which processes it line by line and writes lines to standard output. A map output key-value pair is written as a single tab-delimited line. Input to the reduce function is in the same format (a tab-separated key-value pair) passed over standard input. The reduce function reads lines from standard input, which the framework guarantees are sorted by key, and writes its results to standard output.

I prefer Python, so here is a Python example.

Python

Example 2-10. Map function for maximum temperature in Python

Example 2-11. Reduce function for maximum temperature in Python

You can use this simple approach to test whether the map and reduce functions you wrote are correct.

Then use Hadoop Streaming to run your MapReduce job.

For a more complete example of writing MapReduce in Python, see this blog post:

http://www.michael-noll.com/wiki/writing_an_hadoop_mapreduce_program_in_python

There are now Python Hadoop frameworks built on top of the Streaming API, which are very convenient to use directly.

mrjob is a Python 2.5+ package that helps you write and run Hadoop Streaming jobs.

mrjob fully supports Amazon's Elastic MapReduce (EMR) service, which allows you to buy time on a Hadoop cluster on an hourly basis. It also works with your own Hadoop cluster.

https://github.com/yelp/mrjob

https://github.com/klbostee/dumbo/wiki

This article is excerpted from cnblogs (博客园); original publication date: 2011-07-04.