1. 在Mac環境搭建Hadoop MapReduce 項目
1. scala項目搭建 https://www.cnblogs.com/bajiaotai/p/15381309.html
2. 添加pom依賴
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
3. 在 src/main/resources 下添加 log4j.properties 檔案
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
2. 代碼案例 (scala)
package onePk {
import java.lang
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
// Mapper class.
// One Mapper instance processes one input split; emits (word, 1) per word.
class WCMapper extends Mapper[LongWritable, Text, Text, IntWritable] {
  // Reusable output key/value objects — standard Hadoop idiom to avoid
  // allocating a new Writable per record. `val`, not `var`: the references
  // never change, only the contents via set().
  private val text = new Text
  private val intWritable = new IntWritable(1)

  // Called once per input record (one line of the split).
  // NOTE: the original printed a debug line here for EVERY record, which is a
  // real performance drag in a map task; removed.
  override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, Text, IntWritable]#Context): Unit = {
    // 1. Get the record as a plain String.
    val line = value.toString
    // 2. Split into words on single spaces.
    val words = line.split(" ")
    // 3. Emit (word, 1) into the shuffle buffer for each word.
    words.foreach { word =>
      text.set(word)
      context.write(text, intWritable)
    }
  }
}
// Reducer class.
// Runs after all Mapper instances have finished; the Mapper's output
// key/value types must equal the Reducer's input types.
class WCReducer extends Reducer[Text, IntWritable, Text, IntWritable] {
  // Reusable output value object (Hadoop object-reuse idiom).
  private val intWritable = new IntWritable

  // Called once per distinct key, e.g. key "張飛" with values <1,1,1,1,1>.
  override def reduce(key: Text, values: lang.Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
    // BUG FIX: `sum` must be local to this call. It was an instance field
    // initialized once, and since Hadoop invokes reduce() once per key on the
    // SAME reducer instance, the count was never reset between keys — every
    // key after the first reported an inflated (cumulative) total.
    var sum = 0
    // 1. Sum the occurrence counts for this key.
    values.forEach(v => sum += v.get)
    // 2. Emit (word, total).
    intWritable.set(sum)
    context.write(key, intWritable)
  }
}
// Driver: configures and submits the WordCount job, then exits with the
// job's success status (0 = success, 1 = failure).
object Driver {
  def main(args: Array[String]): Unit = {
    // 1. Build the Hadoop configuration and the job handle.
    //    `val`, not `var`: these references are never reassigned.
    val configuration = new Configuration
    val job: Job = Job.getInstance(configuration)
    // 2. Register this driver's jar so the cluster can locate the classes.
    job.setJarByClass(this.getClass)
    job.setJobName("scala mr")
    // 3. Register the Mapper and Reducer implementations.
    job.setMapperClass(classOf[WCMapper])
    job.setReducerClass(classOf[WCReducer])
    // 4. Mapper output key/value types (the shuffle's wire types).
    job.setMapOutputKeyClass(classOf[Text])
    job.setMapOutputValueClass(classOf[IntWritable])
    // 5. Final (reducer) output key/value types.
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    // 6. Input/output paths. NOTE(review): Hadoop fails the job if the
    //    output directory already exists — delete it between runs.
    FileInputFormat.setInputPaths(job, new Path("src/main/data/input"))
    FileOutputFormat.setOutputPath(job, new Path("src/main/data/output"))
    // 7. Submit and block until completion (true = print progress).
    val succeeded: Boolean = job.waitForCompletion(true)
    // Original mapped the Boolean through string literals ("0".toInt /
    // "1".toInt) via a match — a plain conditional expresses the exit code
    // directly.
    System.exit(if (succeeded) 0 else 1)
  }
}
}