天天看點

02_Hadoop序列化_2.2 自定義Bean對象 實作序列化接口(Writable)

 代碼示例

package GroupByPoneNumPk {

  import java.io.{DataInput, DataOutput}
  import java.lang

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.io.{LongWritable, Text, Writable}
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
  import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}

  // Mapper class.
  // One Mapper instance processes one input split; `map` is invoked once per line.
  class GroupByPoneNumMapper extends Mapper[LongWritable, Text, Text, FlowBean] {
    // Reused output key to avoid allocating a new Text for every record.
    val text = new Text

    // Parse one log line, extract phone number and up/down flow, and emit
    // (phone -> FlowBean) into the shuffle buffer.
    override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, Text, FlowBean]#Context): Unit = {
      // Example record:
      // 1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com  24  27  2481  24681  200
      // 1. Split on any whitespace run. The original " +" regex only matched
      //    spaces and silently broke on tab-separated records; "\\s+" accepts both.
      val fields: Array[String] = value.toString.split("\\s+")
      val phone = fields(1)
      // Up/down flow are the 3rd and 2nd fields from the end; indexing from the
      // end (without building a reversed copy) tolerates a variable number of
      // middle columns such as a missing domain name.
      val upflow = fields(fields.length - 3)
      val downflow = fields(fields.length - 2)

      // 2. Build the value bean; sumflow is left 0 here and computed in the reducer.
      val flowBean = new FlowBean(upflow.toInt, downflow.toInt, 0)

      // 3. Write the pair to the circular (shuffle) buffer.
      text.set(phone)
      context.write(text, flowBean)
    }
  }


  // Reducer class.
  // Runs only after every Mapper instance has finished; the Mapper output
  // key/value types must equal the Reducer input types.
  class GroupByPoneNumReducer extends Reducer[Text, FlowBean, Text, FlowBean] {

    // Invoked once per distinct key (phone number) with all of its beans.
    override def reduce(key: Text, values: lang.Iterable[FlowBean], context: Reducer[Text, FlowBean, Text, FlowBean]#Context) = {
      println("reduce into ....")
      // 1. Accumulate the upstream/downstream totals for this phone number.
      var totalUp = 0
      var totalDown = 0
      val it = values.iterator
      while (it.hasNext) {
        val bean = it.next()
        totalUp += bean.upflow
        totalDown += bean.downflow
      }

      // 2. Build the aggregated bean; sumflow = up + down.
      val result = new FlowBean(totalUp, totalDown, totalUp + totalDown)

      // 3. Emit the aggregated record.
      context.write(key, result)
      println("第二行 :" + result)

    }
  }

  // Driver: configures and submits the MapReduce job, then exits with the
  // job's success status.
  object Driver {
    def main(args: Array[String]): Unit = {
      // 1. Build the job from a fresh Hadoop configuration.
      val configuration = new Configuration
      val job: Job = Job.getInstance(configuration)

      // 2. Register the jar that contains this driver class.
      job.setJarByClass(this.getClass)
      job.setJobName("scala mr")

      // 3. Register the Mapper and Reducer implementations.
      job.setMapperClass(classOf[GroupByPoneNumMapper])
      job.setReducerClass(classOf[GroupByPoneNumReducer])

      // 4. Mapper output key/value types.
      job.setMapOutputKeyClass(classOf[Text])
      job.setMapOutputValueClass(classOf[FlowBean])

      // 5. Final (reducer) output key/value types.
      job.setOutputKeyClass(classOf[Text])
      job.setOutputValueClass(classOf[FlowBean])

      // 6. Input and output paths.
      // NOTE(review): Hadoop aborts if the output directory already exists —
      // delete src/main/data/output before re-running.
      FileInputFormat.setInputPaths(job, new Path("src/main/data/input/phone_data.txt"))
      FileOutputFormat.setOutputPath(job, new Path("src/main/data/output"))

      // 7. Submit and block until completion; exit 0 on success, 1 on failure.
      // (Replaces the original match that round-tripped through "0".toInt /
      // "1".toInt just to produce an integer exit code.)
      val succeeded: Boolean = job.waitForCompletion(true)
      System.exit(if (succeeded) 0 else 1)

    }

  }


  // Sample input record layout (whitespace-separated):
  //  1363157985066          record id
  //  13726230503            phone number
  //  00-FD-07-A4-72-B8:CMCC MAC address
  //  120.196.100.82         network IP
  //  i02.c.aliimg.com       domain name
  //  24
  //  27
  //  2481                   upstream traffic
  //  24681                  downstream traffic
  //  200                    HTTP status code

  /**
   * Custom Hadoop value type carrying upstream / downstream / total flow counters.
   *
   * Implements [[Writable]] so instances can cross the shuffle. Hadoop's
   * serialization contract requires a public no-arg constructor plus mutable
   * fields, and `write` / `readFields` must (de)serialize the fields in
   * exactly the same order.
   */
  class FlowBean() extends Writable {
    var upflow = 0    // upstream traffic
    var downflow = 0  // downstream traffic
    var sumflow = 0   // upflow + downflow (filled in by the reducer)

    /** Auxiliary constructor for direct construction in map/reduce code. */
    def this(upflow: Int, downflow: Int, sumflow: Int) {
      this()
      this.upflow = upflow
      this.downflow = downflow
      this.sumflow = sumflow
    }

    /** Serialize fields — order must mirror [[readFields]]. */
    override def write(out: DataOutput): Unit = {
      out.writeInt(upflow)
      out.writeInt(downflow)
      out.writeInt(sumflow)
    }

    /** Deserialize fields — order must mirror [[write]].
      * `()` added: these calls consume bytes from the stream (side effect). */
    override def readFields(in: DataInput): Unit = {
      upflow = in.readInt()
      downflow = in.readInt()
      sumflow = in.readInt()
    }

    /** Tab-separated rendering used as the job's output value format. */
    override def toString: String = {
      s"${upflow} \t ${downflow} \t ${sumflow}"
    }
  }


}