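Before doing a word count with multiple threads in Scala, you should at least know how to do it on a single machine, so we start with a single-machine word count in the Scala REPL. For a detailed explanation, see the "Word Count" (單詞計數) article.
Drive D contains two files, words.txt and words.log, both with the following content: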
hello tom
hello jerry
hello tom
hello jerry
hello tom
hello tom
Now run a word count on the contents of words.txt:
scala> import scala.io.Source
import scala.io.Source
scala> Source.fromFile("d://words.txt").getLines().toList
res5: List[String] = List(hello tom, hello jerry, hello tom, hello jerry, hello tom, hello tom)
scala> Source.fromFile("d://words.txt").getLines().toList.map(_.split(" "))
res6: List[Array[String]] = List(Array(hello, tom), Array(hello, jerry), Array(hello, tom), Array(hello, jerry), Array(hello, tom), Array(hello, tom))
scala> Source.fromFile("d://words.txt").getLines().toList.flatMap(_.split(" "))
res7: List[String] = List(hello, tom, hello, jerry, hello, tom, hello, jerry, hello, tom, hello, tom)
scala> Source.fromFile("d://words.txt").getLines().toList.flatMap(_.split(" ")).map((_,1))
res8: List[(String, Int)] = List((hello,1), (tom,1), (hello,1), (jerry,1), (hello,1), (tom,1), (hello,1), (jerry,1), (hello,1), (tom,1), (hello,1), (tom,1))
scala> Source.fromFile("d://words.txt").getLines().toList.flatMap(_.split(" ")).map((_,1)).groupBy(_._1)
res9: scala.collection.immutable.Map[String,List[(String, Int)]] = Map(tom -> List((tom,1), (tom,1), (tom,1), (tom,1)), jerry -> List((jerry,1), (jerry,1)), hello -> List((hello,1), (hello,1), (hello,1), (hello,1), (hello,1), (hello,1)))
scala> Source.fromFile("d://words.txt").getLines().toList.flatMap(_.split(" ")).map((_,1)).groupBy(_._1).mapValues(_.size)
res10: scala.collection.immutable.Map[String,Int] = Map(tom -> 4, jerry -> 2, hello -> 6)
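The REPL steps above can be folded into one small function. This is only a minimal sketch: the names WordCountSketch and countWords are made up for illustration, it takes the lines as a parameter instead of reading d://words.txt, and it uses map instead of mapValues so that it also behaves the same on newer Scala versions.

```scala
object WordCountSketch {
  // Hypothetical helper: count how often each space-separated word occurs in the given lines.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))   // split every line into words and flatten into one sequence
      .filter(_.nonEmpty)      // drop empty tokens produced by repeated spaces
      .groupBy(identity)       // word -> all occurrences of that word
      .map { case (word, occurrences) => word -> occurrences.size }
}
```

Called with the six lines of words.txt, WordCountSketch.countWords should give a map equal to Map(tom -> 4, jerry -> 2, hello -> 6), matching res10 above.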
With the single-machine version done, let's do the same thing with Scala's Actor.
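import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.io.Source

// Message asking an actor to count the words in one file
case class WordCountTask(filename: String)
// Message carrying the per-file word counts back to the sender
case class ResultTask(map: Map[String, Int])
// Message telling an actor to stop
case object StopTask

class WordCountActor extends Actor {
  override def act(): Unit = {
    loop {
      react {
        case WordCountTask(filename) => {
          // The result is a map, e.g. Map(tom -> 4, jerry -> 2, hello -> 6)
          val wcResultMap = Source.fromFile(filename).getLines().toList.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.size)
          // Wrap the result in a ResultTask and send it back to the sender
          sender ! ResultTask(wcResultMap)
        }
        // Exit
        case StopTask => {
          exit()
        }
      }
    }
  }
}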
object WordCountActor {
  def main(args: Array[String]): Unit = {
    val responseSet = new mutable.HashSet[Future[Any]]()
    val resultList = new ListBuffer[ResultTask]
    // Files to run the word count on
    val files = Array("d://words.txt", "d://words.log")
    // Start one actor per file
    for (file <- files) {
      val actor = new WordCountActor
      // Start the actor, send the message asynchronously and get a Future for the reply
      val response = actor.start() !! WordCountTask(file)
      // Keep the Future in the set
      responseSet += response
    }
    while (responseSet.size > 0) {
      // Collect the Futures that have already received a reply into filterSet.
      // responseSet holds Future references, but a Future may not contain a value yet.
      val filterSet = responseSet.filter(_.isSet)
      for (ele <- filterSet) {
        // Take the reply (ResultTask(wcResultMap)) out of the Future; apply() returns the data inside it
        val result = ele.apply().asInstanceOf[ResultTask]
        // ListBuffer(ResultTask(Map(tom -> 4, jerry -> 2, hello -> 6)), ...)
        resultList += result
        // Remove the Future from the set
        responseSet -= ele
      }
      // Sleep briefly so pending replies have time to arrive (the 100 ms value is an assumption)
      Thread.sleep(100)
    }
    // The following acts as the aggregation step, like the reduce in MapReduce
    // ListBuffer((tom,4), (jerry,2), (hello,6), ...)
    val r1 = resultList.flatMap(_.map)
    // Map((tom,ListBuffer((tom,4), (tom,4))), (...), (...))
    val r2 = r1.groupBy(_._1)
    val r3 = r2.mapValues(_.foldLeft(0)(_ + _._2))
    println(r3)
  }
}
Output
Map(tom -> 8, jerry -> 4, hello -> 12)
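The aggregation step at the end of main (r1, r2, r3) can be looked at on its own, without actors. The following is a rough sketch of that reduce step only; MergeSketch and mergeCounts are hypothetical names, and the input is assumed to be the per-file maps that the ResultTask messages carry.

```scala
object MergeSketch {
  // Hypothetical helper: merge per-file word counts by summing the counts of equal words.
  def mergeCounts(partials: Seq[Map[String, Int]]): Map[String, Int] =
    partials
      .flatMap(_.toSeq)   // all (word, count) pairs from every file, like r1
      .groupBy(_._1)      // word -> the pairs for that word, like r2
      .map { case (word, pairs) => word -> pairs.foldLeft(0)(_ + _._2) } // sum the counts, like r3
}
```

Since words.txt and words.log have the same content, merging two copies of Map(tom -> 4, jerry -> 2, hello -> 6) doubles every count, which is why the program prints 8, 4 and 12.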
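Note: the code above comes from study material, and a few places seem less than ideal. For example, the sleep does not take into account that a Future's apply() blocks until the reply arrives, so the same result can be obtained without filtering. Replacing the while section with the following works fine: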
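while (responseSet.size > 0) {
  // Iterate over a snapshot so that removing from responseSet inside the loop is safe
  for (ele <- responseSet.toList) {
    // Take the reply (ResultTask(wcResultMap)) out of the Future; apply() blocks until the data is available
    val result = ele.apply().asInstanceOf[ResultTask]
    // ListBuffer(ResultTask(Map(tom -> 4, jerry -> 2, hello -> 6)), ...)
    resultList += result
    // Remove the Future from the set
    responseSet -= ele
  }
}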
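The idea in the note, that a blocking read makes polling and sleeping unnecessary, can also be tried without the actors library. The sketch below is not the original approach: it swaps in scala.concurrent.Future and Await.result as a stand-in for the actor Future and its apply(), and BlockingCollectSketch, countWords, countAll and the 10-second timeout are all assumptions made for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingCollectSketch {
  // Hypothetical stand-in for the per-file work done inside WordCountActor.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }

  // Start one asynchronous task per input, then block on each Future in turn.
  // No isSet filtering and no sleep are needed, because Await.result waits for the value.
  def countAll(inputs: Seq[Seq[String]]): Seq[Map[String, Int]] = {
    val futures = inputs.map(lines => Future(countWords(lines)))
    futures.map(f => Await.result(f, 10.seconds)) // assumed timeout
  }
}
```

All tasks are started before the first blocking call, so they can still run in parallel; blocking only affects the order in which the results are picked up.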