天天看点

scala用actor并发编程写一个单机版的WorldCount(类似Hadoop的MapReduce思想)

1、准备数据,2个文件

words.txt

内容:

lilei hello

zhangsan hello

lisi hello

苏三 hello

words.log

内容:

lilei hello

zhangsan hello

lisi hello

2、环境Intellj IDEA   scala插件

3、代码

package p1
import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.io.Source

//模式匹配类,用于提交任务
case class SubmitTask(fileName:String)
//单例的模式匹配类,用于停止任务
case object  StopTask
//用于收集分组后结果的
case class ResultTask(result:Map[String,Int])
/**
  * scala Actor构建在java的线程基础之上的,
  * 为了避免频繁的线程创建、销毁和切换等,scala中提供了react方法
  * 方法执行完毕后,仍然被保留
  */
class Task extends Actor{

   override def act(){
      loop{//重复执行一个代码块
         react{
            case SubmitTask(fileName)=>{
               val result=Source.fromFile(fileName,"gb2312").getLines()//获取文件,有中文-编码,每一行生成一个List集合
                     .flatMap(_.split(" ")).map((_,1)).toList//把上面所有集合压缩成一个集合,再切分,再生成map-(“单词”,1)
                     .groupBy(_._1).mapValues(_.size)//按照key分组,value就是分组后map的数量
               sender ! ResultTask(result)//把单个文件的统计结果输出,!代表异步执行
            }
            case StopTask=>{
               exit()
            }
         }
      }
   }
}
object WorkCount{
   def main(args: Array[String]) {
      //要读取的文件
      val files=Array("E://words.txt","E://words.log")
      val replaySet=new mutable.HashSet[Future[Any]]
      val resultList=new mutable.ListBuffer[ResultTask]

      //每个文件启动一个线程,异步提交,replaySet接收返回的值
      for(f<-files){
         val t=new Task
         val replay=t.start() !! SubmitTask(f)
         replaySet+=replay
      }
      while(replaySet.size>0){
         //检查replaySet中是否有执行完Future,过滤出来
         val toCompute=replaySet.filter(_.isSet)
         for(r<-toCompute){
            //r.apply()等价于r()取出r对象
            val result=r.apply()
            //取出的对象进行强转,放到resultList中
            resultList+=result.asInstanceOf[ResultTask]
            //操作完一个移除一个,避免重复
            replaySet.remove(r)//replaySet -=r
         }
         Thread.sleep(100)//睡一会避免死循环,等待所有任务执行完
      }
      //最终resultList中的数据是每个文件处理好的esultTask(Map[String,Int])集合
      //此步骤类似于hadoop里的reducer
      val finalResult=resultList.map(_.result)//变成List里装的很多map格式
                      .flatten.groupBy(_._1)//压缩分组
                      .mapValues(x=>x.foldLeft(0)(_+_._2))//累加
      //打印结果
      println(finalResult)
   }
}
           

5、结果

Map(lisi -> 2, 苏三 -> 1, lilei -> 2, hello -> 7, zhangsan -> 2)

继续阅读