01_MapReduce Overview_1.8 WordCount Example (Scala Version)

1. Setting up a Hadoop MapReduce project on macOS

  1.  Scala project setup: https://www.cnblogs.com/bajiaotai/p/15381309.html

  2.  Add the pom dependencies (an sbt equivalent is sketched after this list)

<dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>      

  3.  Add a log4j.properties file under src/main/resources

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n      
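
For reference, a minimal sketch of the same two dependencies in sbt form, in case the project from step 1 was created as an sbt build rather than a Maven one (coordinates and versions mirror the pom above):

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-client" % "3.1.3",
  "org.slf4j"         % "slf4j-log4j12" % "1.7.30"
)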

2. Code example (Scala)

package onePk {

  import java.lang
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
  import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}

  // Mapper class
  // Each Mapper instance processes one input split
  class WCMapper extends Mapper[LongWritable, Text, Text, IntWritable] {
    private val text = new Text
    private val intWritable = new IntWritable(1)

    // map() is called once per input record (i.e. once per line)
    override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, Text, IntWritable]#Context): Unit = {
      println("map enter .....")
      //1. Read one line of input
      val line = value.toString

      //2. Split the line into words
      val words = line.split(" ")

      //3. Emit a (word, 1) pair for every word
      words.foreach(
        word => {
          text.set(word)
          context.write(text, intWritable)
        }
      )

    }
  }

  // Reducer class
  // The Reducer runs only after every Mapper instance has finished
  // The Mapper's output types must match the Reducer's input types
  class WCReducer extends Reducer[Text, IntWritable, Text, IntWritable] {
    private val intWritable = new IntWritable

    // reduce() is called once per key, e.g. 張飛 <1,1,1,1,1>
    override def reduce(key: Text, values: lang.Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
      println("reduce enter .....")

      // 1. Sum the counts for this key
      // (sum must be a local variable: the Reducer instance is reused
      //  across keys, so a class-level var would keep accumulating)
      var sum = 0
      values.forEach(v => sum += v.get)

      // 2. Emit the (word, total count) result
      intWritable.set(sum)
      context.write(key, intWritable)

    }
  }

  // Driver
  object Driver {
    def main(args: Array[String]): Unit = {
      //1. Create the configuration and get the Job object
      val configuration = new Configuration
      val job: Job = Job.getInstance(configuration)

      //2. Register this Driver's jar with the job
      job.setJarByClass(this.getClass)

      job.setJobName("scala mr")

      //3. Set the Mapper and Reducer classes
      job.setMapperClass(classOf[WCMapper])
      job.setReducerClass(classOf[WCReducer])

      //4. Set the Mapper's output key-value types
      job.setMapOutputKeyClass(classOf[Text])
      job.setMapOutputValueClass(classOf[IntWritable])

      //5. Set the final output key-value types
      job.setOutputKeyClass(classOf[Text])
      job.setOutputValueClass(classOf[IntWritable])

      //6. Set the input and output paths (the output directory must not already exist)
      FileInputFormat.setInputPaths(job, new Path("src/main/data/input"))
      FileOutputFormat.setOutputPath(job, new Path("src/main/data/output"))

      //7. Submit the job and wait for completion
      val success: Boolean = job.waitForCompletion(true)
      System.exit(if (success) 0 else 1)

    }
  }
}
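
3. Running the example

Running Driver directly from the IDE executes the job in Hadoop's local mode, since no cluster settings are configured. For example, assuming a text file (the name words.txt is just for illustration) under src/main/data/input containing:

hello world
hello hadoop

the job writes a part-r-00000 file under src/main/data/output with one tab-separated (word, count) pair per line, sorted by key:

hadoop	1
hello	2
world	1

Note that src/main/data/output must not already exist: FileOutputFormat fails the job if the output directory is left over from an earlier run, so delete it between runs.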