天天看點

模拟MapReduce程式設計的程式案例(用于統計文本中單詞出現頻率)

本案例要實作的目标:

1、模拟修改配置,通過發送指令的方式統計一個檔案中每個單詞出現的次數。

案例代碼結構如下:

[截圖:案例代碼結構]

在整個案例中需要有以下幾類檔案:

a:worker服務端,類似MapReduce中的工作節點,用于接收jar包和配置檔案,並執行業務邏輯

b:程式用戶端,用于組裝配置檔案、發送業務執行的指令(通過socket發送jarfile、jobconf和job2run三種指令)

代碼結構,每個包和代碼作用介紹

cn.toto.bigdata.mymr.task

taskprocessor

核心的主體執行程式

processlogic

定義用戶端調用必須實作的方法,相當于webservice中的接口規範

cn.toto.bigdata.mymr.io

inputformat

封裝讀檔案的元件(接口用途)

defaultinputformat

封裝讀檔案的元件的實作類

outputformat

封裝寫檔案的元件(接口用途)

defaultoutputformat

封裝寫檔案的元件的實作

cn.toto.bigdata.mymr.common

constants

常量定義

context

應用上下文,用于保存作業配置,以及存儲計算過程中單詞出現次數等中間結果

cn.toto.bigdata.mymr.userapp

userlogic

用戶端對processlogic規範的實作

userapp

用戶端主入口程式

cn.toto.bigdata.mymr.scheduler

runner

用戶端userapp提交作業時依賴的runner類:它将配置序列化為job.conf,再借助workerclient通過socket發送指令。

workerclient

用戶端執行時需要用到的client相關的代碼

workerserver

userapp執行時,需要提前啟動的服務端

workerrunnable

服務端執行的相關邏輯

運行條件:

1、将mapreduce-my-demo項目導出成test.jar,放置在e:/test.jar;並提前建立e:/task目錄(worker會把接收到的jar、配置檔案以及運行日志寫到該目錄下)。

2、準備用于統計的文本檔案a.txt,路徑為e:/a.txt。

内容截圖類似:

[截圖:a.txt的内容]

假設a.txt内容為:

the true

nobility is

in being

superior to

your previous

self guess

no great

discovery

was ever

made without

a bold

knowledge will

give you

power but

character respect

the sun

is just

rising in

the morning

of another

day i

i figure

life is

a gift

and i

don’t intend

on wasting

3、首先運行workerserver,即啟動服務端;

4、然後運行userapp,即用戶端;

5、最終的統計結果将寫入e:/out.txt。統計結果如下(注意其中"the"分成了兩行:a.txt中共有3個"the",很可能是檔案首行帶有UTF-8 BOM,使首行的"the"被當成了另一個單詞):

nobility     1

but   1

gift   1

wasting    1

rising        1

don't         1

another    1

i        3

your 1

knowledge       1

sun   1

without    1

life    1

the  2

character 1

and  1

of      1

power       1

just  1

day   1

you   1

on     1

no    1

a       2

give  1

figure        1

previous   1

in      2

will   1

made        1

was  1

is      3

being        1

bold 1

great        1

respect    1

morning   1

the   1

ever 1

superior   1

guess        1

discovery 1

true 1

self   1

to     1

intend       1

6、運行日志将存儲在e:/task/task.log中,worker接收到的配置檔案(job.conf)和jar包(job.jar)也會生成到這個目錄下,效果如下:

[截圖:e:/task目錄下生成的檔案]

其中job.conf的内容為:

[截圖:job.conf的内容]

生成的task.log效果如下:

[截圖:task.log的内容]

接着:具體的代碼實作如下:

taskprocessor代碼如下

package cn.toto.bigdata.mymr.task;

import java.util.hashmap;

import java.util.logging.filehandler;

import java.util.logging.level;

import java.util.logging.logger;

import cn.toto.bigdata.mymr.common.constants;

import cn.toto.bigdata.mymr.common.context;

import cn.toto.bigdata.mymr.io.inputformat;

import cn.toto.bigdata.mymr.io.outputformat;

/**

 * 1、核心的主體執行程式

 * 這裡是任務執行者

 */

public class taskprocessor {

   public static void main(string[] args) throws exception {

      // 加載使用者指定的所有配置參數到上下文對象中,同時讀取配置檔案

      context context = new context();

      //擷取上下文中的配置檔案

      hashmap<string, string>  conf = context.getconfiguration();

      //通過列印日志的方式檢視程式運作的結果

      logger logger = logger.getlogger("taskprocessor");

      //設定日志的輸出級别是info級别

      logger.setlevel(level.info);

      filehandler filehandler = new filehandler("e:/task/task.log");

      filehandler.setlevel(level.info);

      logger.addhandler(filehandler);

      logger.info("context:" + context);

      logger.info("conf:" + conf);

      //初始化檔案讀取元件

      //從配置檔案中擷取用于讀取的元件的class資訊

      class<?> forname = class.forname(conf.get(constants.input_format));

      inputformat inputformat = (inputformat) forname.newinstance();

      inputformat.init(context);

      //用inputformat元件讀資料,并調用使用者邏輯

      class<?> forname2 = class.forname(conf.get(constants.user_logic));

      processlogic userlogic = (processlogic) forname2.newinstance();

      //對每一行調用使用者邏輯,并通過context将使用者調用結果存儲内部緩存

      while(inputformat.hasnext()) {

         integer key = inputformat.nextkey();

         string value = inputformat.nextvalue();

         userlogic.process(key, value, context);

      }

      userlogic.cleanup(context);

      //替使用者輸出結果

      class<?> forname3 = class.forname(conf.get(constants.output_format));

      outputformat outputformat = (outputformat) forname3.newinstance();

      outputformat.write(context);

   }

}

processlogic代碼如下:

package cn.toto.bigdata.mymr.task;

import cn.toto.bigdata.mymr.common.context;

 * 1、規定的業務邏輯編寫規範

 * process() 和  cleanup都沒有寫實作,這裡的實作在用戶端

public abstract class processlogic {

   /**

    * 這裡的context存儲處理後的結果值

    * @param key          :行号

    * @param value        :所在行的一行内容

    * @param context      :應用上下文的内容

    */

   public abstract void process(integer key,string value,context context);

    * 通過cleanup輸出處理後的結果

   public void cleanup(context context){}

package cn.toto.bigdata.mymr.io;

public abstract class inputformat {

    * 擷取下一行要讀的行的位置

   public abstract int nextkey();

    * 擷取從檔案中讀取的到的行的資訊

   public abstract string nextvalue();

    * 從檔案中讀取到一行資訊

   public abstract string readline() throws exception;

    * 判斷是否還可以讀取到下一行的内容

   public abstract boolean hasnext() throws exception;

    * 初始化要讀取的檔案的路徑和檔案流

   public abstract void init(context context) throws exception;

package cn.toto.bigdata.mymr.io;

import java.io.bufferedreader;

import java.io.filereader;

 * 這裡是預設的讀取的實作類

public class defaultinputformat extends inputformat{

   //這裡表示要讀取的檔案的路徑

   private string inputpath;

   private bufferedreader br = null;

   //這裡的key是指文本中類似讀取到的指針的偏移量,是行号的偏移量

   private int key;

   //這裡的value是指一行中的資料

   private string value;

   //預設讀取的行是第0行

   private int linenumber = 0;

   @override

   public void init(context context) throws exception {

      //擷取要讀的檔案的路徑

       this.inputpath = context.getconfiguration().get(constants.input_path);

       //開始初始化輸入流,隻不過,這個流是從檔案中擷取的

       this.br = new bufferedreader(new filereader(inputpath));

   public int nextkey() {

      return this.key;

   public string nextvalue() {

      return this.value;

   public boolean hasnext() throws exception {

      string line = null;

      line = readline();

      //資料讀取完成之後行号加一

      this.key = linenumber++;

      this.value = line;

      return null != line;

    * 讀取一行資料

   public string readline() throws exception {

      string line = br.readline();

      //如果讀取到空了之後,将bufferedreader的值變成空

      if (line == null) {

         br.close();

      return line;

 * 用于輸出結果的類

public abstract class outputformat {

    * 将結果寫入檔案中

   public abstract void write(context context) throws exception;

    * 關閉流

    public abstract void cleanup() throws exception;

import java.io.bufferedwriter;

import java.io.filewriter;

import java.util.hashmap;

import java.util.set;

import java.util.map.entry;

public class defaultoutputformat extends outputformat{

    bufferedwriter bw = null;

         @override

         public void write(context context) throws exception {

             string outputpath = context.getconfiguration().get(constants.output_path);

             hashmap<string, integer> kvbuffer = context.getkvbuffer();

             this.bw = new bufferedwriter(new filewriter(outputpath));

             set<entry<string, integer>> entryset = kvbuffer.entryset();

             for (entry<string, integer> entry : entryset) {

                            bw.write(entry.getkey() + "\t" + entry.getvalue() + "\r");

                   }

             bw.flush();

         }

         public void cleanup() throws exception {

                   bw.close();

package cn.toto.bigdata.mymr.common;

/**
 * Configuration keys and fixed locations shared by the client, the worker and the task.
 * This class only holds constants and cannot be instantiated.
 */
public final class constants {

    /** Configuration key: path of the job jar on the client machine. */
    public static final String jar_path = "jar.path";

    /** File name under which the worker stores the received job jar. */
    public static final String jar_file = "job.jar";

    /** Configuration key: host name of the worker. */
    public static final String worker_host = "worker.host";

    /** Configuration key: port the worker listens on. */
    public static final String worker_port = "worker.port";

    /** File name of the serialized job configuration (on the client and on the worker). */
    public static final String conf_file = "job.conf";

    /** Configuration key: fully-qualified class name of the input component. */
    public static final String input_format = "input.format.class";

    /** Configuration key: fully-qualified class name of the output component. */
    public static final String output_format = "output.format.class";

    /** Configuration key: path of the file to process. */
    public static final String input_path = "input.path";

    /** Configuration key: path of the result file. */
    public static final String output_path = "output.path";

    /** Fully-qualified name of the task entry point class (the program the worker runs). */
    public static final String task_processor = "cn.toto.bigdata.mymr.task.taskprocessor";

    /** Configuration key: fully-qualified class name of the user business logic. */
    public static final String user_logic = "user.logic.class";

    /** Worker working directory: holds the received jar and configuration and the task log. */
    public static final String task_work_dir = "e:/task";

    private constants() {
        // Constants holder; never instantiated.
    }
}

import java.io.file;

import java.io.fileinputstream;

import java.io.objectinputstream;

 * 應用上下文,通過這個内容擷取配置檔案

 * 通過這個上下文最終輸出結果

public class context {

         private hashmap<string, integer> kvbuffer = new hashmap<string, integer>();

         private hashmap<string, string> conf;

         @suppresswarnings("unchecked")

         public context() throws exception {

                   //加載配置參數

                   file file = new file(constants.task_work_dir + "/" + constants.conf_file);

                   if (file.exists()) {

                            @suppresswarnings("resource")

                            objectinputstream oi = new objectinputstream(new fileinputstream(file));

                            this.conf = (hashmap<string, string>) oi.readobject();

                   } else {

                            // throw new runtimeexception("read conf failed ....");

         /**

          * 通過這種變量最後輸出結果

          */

         public void write(string k, integer v) {

                   kvbuffer.put(k, v);

         public hashmap<string, integer> getkvbuffer() {

                   return kvbuffer;

         public void setkvbuffer(hashmap<string, integer> tmpkv) {

                   this.kvbuffer = tmpkv;

          * 擷取配置檔案中的資訊

         public hashmap<string, string> getconfiguration() {

                   return conf;

          * 在context()構造函數裡面已經有了conf的配置,這裡再次傳入說明配置可以讓使用者手動指定

         public void setconfiguration(hashmap<string, string> configuration) {

                   this.conf = configuration;

package cn.toto.bigdata.mymr.userapp;

import cn.toto.bigdata.mymr.task.processlogic;

public class userlogic extends processlogic {

         private hashmap<string, integer> wordcount = new hashmap<string, integer>();

         public void process(integer key, string value, context context) {

                   string [] words = value.split(" ");

                   for(string word : words) {

                            integer count = wordcount.get(word);

                            if (count == null) {

                                     wordcount.put(word, 1);

                            } else {

                                     wordcount.put(word, count + 1);

                            }

         public void cleanup(context context) {

                   set<entry<string, integer>> entryset = wordcount.entryset();

                   for(entry<string, integer> entry : entryset) {

                            context.write(entry.getkey(), entry.getvalue());

import cn.toto.bigdata.mymr.scheduler.runner;

public class userapp {

         public static void main(string[] args) throws exception {

                   hashmap<string, string> conf = new hashmap<string,string>();

                   conf.put(constants.input_path, "e:/a.txt");

                   conf.put(constants.output_path, "e:/out.txt");

                   conf.put(constants.input_format, "cn.toto.bigdata.mymr.io.defaultinputformat");

                   conf.put(constants.output_format, "cn.toto.bigdata.mymr.io.defaultoutputformat");

                   conf.put(constants.jar_path, "e:/test.jar");

                   conf.put(constants.worker_host, "localhost");

                   conf.put(constants.worker_port, "9889");

                   conf.put(constants.user_logic, "cn.toto.bigdata.mymr.userapp.userlogic");

                   runner runner = new runner(conf);

                   runner.submit("localhost", 9889);

package cn.toto.bigdata.mymr.scheduler;

import java.io.fileoutputstream;

import java.io.objectoutputstream;

public class runner {

    private hashmap<string, string> conf;

    public runner(hashmap<string, string> conf) {

             this.conf = conf;

    }

    public void submit(string host,int port) throws exception {

             objectoutputstream jobconfstream = new objectoutputstream(new fileoutputstream(constants.conf_file));

                   jobconfstream.writeobject(conf);

                   workerclient workerclient = new workerclient(conf);

                   workerclient.submit();

import java.io.outputstream;

import java.net.socket;

public class workerclient {

         socket socket = null;

         outputstream so = null;

         public workerclient(hashmap<string, string> conf) {

                   this.conf = conf;

         public void submit() throws exception {

                   socket = new socket(conf.get(constants.worker_host), integer.parseint(conf.get(constants.worker_port)));

                   so = socket.getoutputstream();

                   string jarpath = conf.get(constants.jar_path);

                   // 發送jar包

                   byte[] buff = new byte[4096];

                   fileinputstream jarins = new fileinputstream(jarpath);

                   so.write("jarfile".getbytes());

                   int read = 0;

                   while ((read=jarins.read(buff)) != -1) {

                            so.write(buff,0,read);

                   jarins.close();

                   so.close();

                   socket.close();

                   // 發送job.conf檔案

                   fileinputstream confins = new fileinputstream(constants.conf_file);

                   so.write("jobconf".getbytes());

                   while ((read = confins.read(buff)) != -1) {

                   confins.close();

                   // 發送啟動指令

                   so.write("job2run".getbytes());

                   string shell = "java -cp e:/test.jar cn.toto.bigdata.mymr.task.taskprocessor";

                   so.write(shell.getbytes());

import java.net.serversocket;

public class workerserver {

                   serversocket ssc = new serversocket(9889);

                   system.out.println("worker伺服器啟動-->9889");

                   while (true) {

                            socket accept = ssc.accept();

                            new thread(new workerrunnable(accept)).start();

import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;

import cn.toto.bigdata.mymr.common.constants;

public class workerrunnable implements runnable {

         socket socket;

         inputstream in = null;

         volatile long confsize = 0;

         volatile long jarsize = 0;

         public workerrunnable(socket socket) {

                   this.socket = socket;

         public void run() {

                   try {

                            this.in = socket.getinputstream();

                            byte[] protocal = new byte[7];

                            int read = in.read(protocal, 0, 7);

                            if (read < 7) {

                                     system.out.println("用戶端請求不符合協定規範......");

                                     return;

                            string command = new string(protocal);

                            switch (command) {

                            case "jarfile":

                                     receivejarfile();

                                     break;

                            case "jobconf":

                                     receiveconffile();

                            case "job2run":

                                     runjob();

                            default:

                                     system.out.println("用戶端請求不符合協定規範.....");

                                     socket.close();

                   } catch (exception e) {

                            e.printstacktrace();

         private void receiveconffile() throws exception {

                   system.out.println("開始接收conf檔案");

                   fileoutputstream fo = new fileoutputstream(constants.task_work_dir + "/" + constants.conf_file);

                   while ((read = in.read(buff)) != -1) {

                            confsize += read;

                            fo.write(buff, 0, read);

                   fo.flush();

                   fo.close();

                   in.close();

         private void receivejarfile() throws exception {

                   system.out.println("開始接收jar檔案");

                   fileoutputstream fo = new fileoutputstream(constants.task_work_dir + "/" + constants.jar_file);

                            jarsize += read;

         private void runjob() throws exception {

                   int read = in.read(buff);

                   string shell = new string(buff, 0, read);

                   system.out.println("接收到啟動指令......." + shell);

                   thread.sleep(500);

                   file jarfile = new file(constants.task_work_dir + "/" + constants.jar_file);

                   file conffile = new file(constants.task_work_dir + "/" + constants.conf_file);

                   system.out.println("jarfile 存在?" + jarfile.exists());

                   system.out.println("conffile 存在?" + conffile.exists());

                   system.out.println("jarfile可讀?" + jarfile.canread());

                   system.out.println("jarfile可寫?" + jarfile.canwrite());

                   system.out.println("conffile可讀?" + conffile.canread());

                   system.out.println("conffile可寫?" + conffile.canwrite());

                   system.out.println("jarfile.length():" + jarfile.length());

                   system.out.println("conffile.length():" + conffile.length());

                   /*if (jarfile.length() == jarsize && conffile.length() == confsize) {

                            system.out.println("jar 和 conf 檔案已經準備就緒......");

                   }*/

                   system.out.println("開始啟動資料處理taskprocessor......");

                   process exec = runtime.getruntime().exec(shell);

                   int waitfor = exec.waitfor();

                   inputstream errstream = exec.geterrorstream();

                   bufferedreader errreader = new bufferedreader(new inputstreamreader(errstream));

                   string inline = null;

                   /*

                    * inputstream stdstream = exec.getinputstream(); bufferedreader

                    * stdreader = new bufferedreader(new inputstreamreader(stdstream));

                    * while ((inline = stdreader.readline()) != null) {

                    * system.out.println(inline); }

                    */

                   while ((inline = errreader.readline()) != null) {

                            system.out.println(inline);

                   if (waitfor == 0) {

                            system.out.println("task成功運作完畢.....");

                            system.out.println("task異常退出......");