
Scala for the Impatient: Implementing WordCount with Actor Concurrency

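Before doing a word count with Scala's concurrency, you should at least know how to do it on a single machine, so we start with a single-machine word count in the Scala REPL. For a detailed explanation, see "Word Count".

Drive D contains two files, words.txt and words.log, both with the following content: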

hello tom
hello jerry
hello tom
hello jerry
hello tom
hello tom
           

Now run a word count over the contents of words.txt:

scala> import scala.io.Source
import scala.io.Source

scala> Source.fromFile("d://words.txt").getLines().toList
res5: List[String] = List(hello tom, hello jerry, hello tom, hello jerry, hello tom, hello tom)

scala> Source.fromFile("d://words.txt").getLines().toList.map(_.split(" "))
res6: List[Array[String]] = List(Array(hello, tom), Array(hello, jerry), Array(hello, tom), Array(hello, jerry), Array(hello, tom), Array(hello, tom))

scala> Source.fromFile("d://words.txt").getLines().toList.flatMap(_.split(" "))
res7: List[String] = List(hello, tom, hello, jerry, hello, tom, hello, jerry, hello, tom, hello, tom)

scala> Source.fromFile("d://words.txt").getLines().toList.flatMap(_.split(" ")).map((_,1))
res8: List[(String, Int)] = List((hello,1), (tom,1), (hello,1), (jerry,1), (hello,1), (tom,1), (hello,1), (jerry,1), (hello,1), (tom,1), (hello,1), (tom,1))

scala> Source.fromFile("d://words.txt").getLines().toList.flatMap(_.split(" ")).map((_,1)).groupBy(_._1)
res9: scala.collection.immutable.Map[String,List[(String, Int)]] = Map(tom -> List((tom,1), (tom,1), (tom,1), (tom,1)), jerry -> List((jerry,1), (jerry,1)), hello -> List((hello,1), (hello,1), (hello,1), (hello,1), (hello,1), (hello,1)))

scala> Source.fromFile("d://words.txt").getLines().toList.flatMap(_.split(" ")).map((_,1)).groupBy(_._1).mapValues(_.size)
res10: scala.collection.immutable.Map[String,Int] = Map(tom -> 4, jerry -> 2, hello -> 6)
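
The REPL steps above can be collected into one small function that works on in-memory lines, which makes the pipeline easy to try without the files on drive D. This is only a minimal sketch: the names WordCountSketch and countWords are hypothetical, and it uses map over the grouped pairs instead of mapValues so that it also compiles cleanly on newer Scala versions.

```scala
object WordCountSketch {
  // Count how often each space-separated word occurs in the given lines.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" ").toList)   // split every line into words
      .filter(_.nonEmpty)             // drop empty tokens from repeated spaces
      .map((_, 1))                    // pair each word with a count of 1
      .groupBy(_._1)                  // group the pairs by word
      .map { case (word, pairs) => (word, pairs.size) }
}
```

Calling WordCountSketch.countWords with the six lines of words.txt should give the same counts as res10 above.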
           

With the single-machine version done, the next step is to do the same thing with Scala Actors:

import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.io.Source

case class WordCountTask(filename : String)
case class ResultTask(map : Map[String,Int])
case object StopTask

class WordCountActor extends Actor{
  override def act(): Unit = {
    loop{
      react {
        case WordCountTask(filename) => {
          // Produces a map such as Map(tom -> 4, jerry -> 2, hello -> 6)
          val wcResultMap = Source.fromFile(filename).getLines().toList.flatMap(_.split(" ")).map((_,1)).groupBy(_._1).mapValues(_.size)
          // Wrap the result in a ResultTask and send it back to the sender
          sender ! ResultTask(wcResultMap)
        }
          // Exit
        case StopTask => {
          exit()
        }
      }
    }
  }
}

object WordCountActor {
  def main(args: Array[String]): Unit = {

    val responseSet = new mutable.HashSet[Future[Any]]()
    val resultList = new ListBuffer[ResultTask]

    // Files to run the word count on
    val files = Array ("d://words.txt","d://words.log")
    // Start one actor per file
    for (file <- files) {
      val actor = new WordCountActor
      // Start the actor and send an asynchronous message; !! returns a Future for the reply
      val response = actor.start() !! WordCountTask(file)
      // Keep the Future in the Set
      responseSet += response
    }
    while (responseSet.size > 0){
      // Collect the Futures that have already received a reply into filterSet.
      // responseSet holds the Future references, but a Future may not have a value yet.
      val filterSet = responseSet.filter(_.isSet)
      for (ele <- filterSet) {
        // Take the message (ResultTask(wcResultMap)) out of the Future; apply() returns the data inside it
        val result = ele.apply().asInstanceOf[ResultTask]
        //ListBuffer(ResultTask(Map(tom -> 4, jerry -> 2, hello -> 6),...))
        resultList += result
        // Remove the finished Future from the Set
        responseSet -= ele
      }
      // Sleep briefly before polling again (500 ms is an assumed interval)
      Thread.sleep(500)
    }
    // What follows is the aggregation step, equivalent to the reduce in MapReduce

    //ListBuffer((tom,4),(jerry,2),(hello,6)...)
    val r1 = resultList.flatMap(_.map)
    //Map((tom,ListBuffer((tom,4),(tom,4))),(..),(...))
    val r2 = r1.groupBy(_._1)
    // Sum the per-file counts of each word, starting from 0
    val r3 = r2.mapValues(_.foldLeft(0)(_+_._2))
    println(r3)

  }
}
Output:
Map(tom -> 8, jerry -> 4, hello -> 12)
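
The aggregation step (r1 through r3 in the program above) can also be tried on its own, without actors or files. The sketch below assumes the per-file results are already available as plain maps; MergeSketch and mergeCounts are hypothetical names used only for illustration.

```scala
object MergeSketch {
  // Merge per-file word counts into one total, like the reduce phase above.
  def mergeCounts(partials: Seq[Map[String, Int]]): Map[String, Int] =
    partials
      .flatMap(_.toList)              // flatten every map into (word, count) pairs
      .groupBy(_._1)                  // group the pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
}
```

Because words.txt and words.log have identical content, merging two copies of Map(tom -> 4, jerry -> 2, hello -> 6) doubles every count.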
           

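Note: the code above comes from study material, and a few parts of it seem incomplete. For example, the sleep does not appear to take into account that a Future's apply blocks until the reply arrives, so the same result can be achieved without filtering. Replacing the while block with the following works fine: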

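while (responseSet.size > 0){
      // Iterate over a snapshot, because elements are removed from the mutable Set inside the loop
      for (ele <- responseSet.toList) {
        // Take the message (ResultTask(wcResultMap)) out of the Future; apply() blocks until the data is there
        val result = ele.apply().asInstanceOf[ResultTask]
        //ListBuffer(ResultTask(Map(tom -> 4, jerry -> 2, hello -> 6),...))
        resultList += result
        // Remove the finished Future from the Set
        responseSet -= ele
      }
    }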
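
The idea of blocking on each pending result instead of polling and sleeping is not specific to scala.actors. As a rough analogy only, the sketch below uses the standard-library scala.concurrent.Future and Await.result, which is a different API from the scala.actors.Future used in this post. The names BlockingCollectSketch, countWords, and collectCounts are hypothetical, the input is in-memory lines rather than files, and the 10-second timeout is an arbitrary choice.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingCollectSketch {
  // Word count over in-memory lines (stands in for reading one file).
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" ").toList)
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

  // Start one asynchronous task per input, then block on each result in turn.
  // No isCompleted check and no sleep are needed: Await.result waits by itself.
  def collectCounts(inputs: Seq[Seq[String]]): Seq[Map[String, Int]] = {
    val futures = inputs.map(lines => Future(countWords(lines)))
    futures.map(f => Await.result(f, 10.seconds))
  }
}
```

All tasks are started before the first wait, so they still run concurrently; only the collection of results is sequential.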
           
