// Code example
package GroupByPoneNumPk {
import java.io.{DataInput, DataOutput}
import java.lang
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text, Writable}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
// Mapper class.
// Hadoop creates one Mapper instance per input split; map() is invoked once per record.
class GroupByPoneNumMapper extends Mapper[LongWritable, Text, Text, FlowBean] {
  // Output key is reused across map() calls to avoid allocating a Text per record.
  val text = new Text
  // Called once for every line of the input split.
  override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, Text, FlowBean]#Context) = {
    // Sample record:
    // 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
    // 1. Split the record on any run of whitespace ("\\s+" also covers
    //    tab-delimited input, which " +" would not).
    val line: Array[String] = value.toString.split("\\s+")
    println("第一行 : " + line.mkString("-"))
    val phone = line(1)
    // Up/down flow are the 3rd- and 2nd-from-last fields. Index from the end
    // directly instead of line.reverse(...), which copied the whole array per record.
    val upflow = line(line.length - 3)
    val downflow = line(line.length - 2)
    // 2. Build the FlowBean (total flow is computed in the reducer, hence 0 here).
    val flowBean = new FlowBean(upflow.toInt, downflow.toInt, 0)
    // 3. Write the pair into the shuffle buffer, keyed by phone number.
    text.set(phone)
    context.write(text, flowBean)
    println(flowBean)
  }
}
// Reducer class.
// Runs only after every Mapper instance has finished; the Mapper's output
// key/value types must match the Reducer's input types.
class GroupByPoneNumReducer extends Reducer[Text, FlowBean, Text, FlowBean] {
  // Invoked once per distinct key, with all FlowBeans grouped under it.
  override def reduce(key: Text, values: lang.Iterable[FlowBean], context: Reducer[Text, FlowBean, Text, FlowBean]#Context) = {
    println("reduce into ....")
    // 1. Accumulate the upflow/downflow totals across every bean for this key.
    var totalUp = 0
    var totalDown = 0
    val it = values.iterator()
    while (it.hasNext) {
      val bean = it.next()
      totalUp += bean.upflow
      totalDown += bean.downflow
    }
    // 2. Build the result bean carrying both totals plus their sum.
    val result = new FlowBean(totalUp, totalDown, totalUp + totalDown)
    // 3. Emit the aggregated record.
    context.write(key, result)
    println("第二行 :" + result)
  }
}
// Driver: configures and submits the MapReduce job.
object Driver {
  def main(args: Array[String]): Unit = {
    // 1. Build the configuration and obtain the job handle.
    val configuration = new Configuration
    val job: Job = Job.getInstance(configuration)
    // 2. Register the jar containing this driver.
    job.setJarByClass(this.getClass)
    job.setJobName("scala mr")
    // 3. Register the Mapper and Reducer classes.
    job.setMapperClass(classOf[GroupByPoneNumMapper])
    job.setReducerClass(classOf[GroupByPoneNumReducer])
    // 4. Mapper output key/value types.
    job.setMapOutputKeyClass(classOf[Text])
    job.setMapOutputValueClass(classOf[FlowBean])
    // 5. Final (reducer) output key/value types.
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[FlowBean])
    // 6. Input/output paths: taken from the command line when supplied,
    //    falling back to the original hard-coded defaults.
    val inputPath = args.lift(0).getOrElse("src/main/data/input/phone_data.txt")
    val outputPath = args.lift(1).getOrElse("src/main/data/output")
    FileInputFormat.setInputPaths(job, new Path(inputPath))
    FileOutputFormat.setOutputPath(job, new Path(outputPath))
    // 7. Submit and block until completion; exit 0 on success, 1 on failure.
    val succeeded: Boolean = job.waitForCompletion(true)
    System.exit(if (succeeded) 0 else 1)
  }
}
// Record layout of one input line:
// 1363157985066          record id
// 13726230503            phone number
// 00-FD-07-A4-72-B8:CMCC MAC address
// 120.196.100.82         network IP
// i02.c.aliimg.com       domain name
// 24
// 27
// 2481                   upstream traffic
// 24681                  downstream traffic
// 200                    network status code

// Hadoop-serializable bean carrying upstream/downstream/total traffic counters.
// The no-arg primary constructor and mutable fields are required so Hadoop can
// instantiate the bean reflectively and populate it via readFields.
class FlowBean() extends Writable {
  var upflow = 0
  var downflow = 0
  var sumflow = 0

  // Auxiliary constructor for building a fully-initialized bean.
  def this(upflow: Int, downflow: Int, sumflow: Int) {
    this()
    this.upflow = upflow
    this.downflow = downflow
    this.sumflow = sumflow
  }

  // Serialization: the field order here must mirror readFields exactly.
  override def write(out: DataOutput): Unit = {
    out.writeInt(upflow)
    out.writeInt(downflow)
    out.writeInt(sumflow)
  }

  // Deserialization counterpart of write — same field order, same widths.
  override def readFields(in: DataInput): Unit = {
    upflow = in.readInt()
    downflow = in.readInt()
    sumflow = in.readInt()
  }

  // Rendering used by TextOutputFormat: "up \t down \t sum".
  override def toString: String = s"$upflow \t $downflow \t $sumflow"
}
}