1、準備資料,2個檔案
words.txt
内容:
lilei hello
zhangsan hello
lisi hello
蘇三 hello
words.log
内容:
lilei hello
zhangsan hello
lisi hello
2、環境Intellj IDEA scala插件
3、代碼
package p1
import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.io.Source
//模式比對類,用于送出任務
case class SubmitTask(fileName:String)
//單例的模式比對類,用于停止任務
case object StopTask
//用于收集分組後結果的
case class ResultTask(result:Map[String,Int])
/**
* scala Actor建構在java的線程基礎之上的,
* 為了避免頻繁的線程建立、銷毀和切換等,scala中提供了react方法
* 方法執行完畢後,仍然被保留
*/
class Task extends Actor{
override def act(){
loop{//重複執行一個代碼塊
react{
case SubmitTask(fileName)=>{
val result=Source.fromFile(fileName,"gb2312").getLines()//擷取檔案,有中文-編碼,每一行生成一個List集合
.flatMap(_.split(" ")).map((_,1)).toList//把上面所有集合壓縮成一個集合,再切分,再生成map-(“單詞”,1)
.groupBy(_._1).mapValues(_.size)//按照key分組,value就是分組後map的數量
sender ! ResultTask(result)//把單個檔案的統計結果輸出,!代表異步執行
}
case StopTask=>{
exit()
}
}
}
}
}
object WorkCount{
def main(args: Array[String]) {
//要讀取的檔案
val files=Array("E://words.txt","E://words.log")
val replaySet=new mutable.HashSet[Future[Any]]
val resultList=new mutable.ListBuffer[ResultTask]
//每個檔案啟動一個線程,異步送出,replaySet接收傳回的值
for(f<-files){
val t=new Task
val replay=t.start() !! SubmitTask(f)
replaySet+=replay
}
while(replaySet.size>0){
//檢查replaySet中是否有執行完Future,過濾出來
val toCompute=replaySet.filter(_.isSet)
for(r<-toCompute){
//r.apply()等價于r()取出r對象
val result=r.apply()
//取出的對象進行強轉,放到resultList中
resultList+=result.asInstanceOf[ResultTask]
//操作完一個移除一個,避免重複
replaySet.remove(r)//replaySet -=r
}
Thread.sleep(100)//睡一會避免死循環,等待所有任務執行完
}
//最終resultList中的資料是每個檔案處理好的esultTask(Map[String,Int])集合
//此步驟類似于hadoop裡的reducer
val finalResult=resultList.map(_.result)//變成List裡裝的很多map格式
.flatten.groupBy(_._1)//壓縮分組
.mapValues(x=>x.foldLeft(0)(_+_._2))//累加
//列印結果
println(finalResult)
}
}
5、結果
Map(lisi -> 2, 蘇三 -> 1, lilei -> 2, hello -> 7, zhangsan -> 2)