本案例要實作的目标:
1、模拟修改配置,通過發指令的方式統計一個檔案中每個單詞出現的次數。
案例代碼結構如下:
在整個案例中需要有以下幾類檔案:
a:worker服務端,用于類似mapreduce接收jar,接收配置檔案,執行業務邏輯
b:程式用戶端,用于組裝配置檔案、發送業務執行的指令(通過socket發送jarfile、jobconf和job2run的指令)
代碼結構,每個包和代碼作用介紹
cn.toto.bigdata.mymr.task
taskprocessor
核心的主體執行程式
processlogic
定義用戶端調用必須實作的方法,相當于webservice中的接口規範
cn.toto.bigdata.mymr.io
inputformat
封裝讀檔案的元件(接口用途)
defaultinputformat
封裝讀檔案的元件的實作類
outputformat
封裝寫檔案的元件(接口用途)
defaultoutputformat
封裝寫檔案的元件的實作
cn.toto.bigdata.mymr.common
constants
常量定義
context
應用上下文,用于存儲單詞出現次數統計的中間變量
cn.toto.bigdata.mymr.userapp
userlogic
用戶端對processlogic規範的實作
userapp
用戶端主入口程式
cn.toto.bigdata.mymr.scheduler
runner
用戶端userapp執行指令時依賴的runner類,通過這裡面的socket發送指令。
workerclient
用戶端執行時需要用到的client相關的代碼
workerserver
userapp執行時,需要提前啟動的服務端
workerrunnable
服務端執行的相關邏輯
運作條件:
1、将mapreduce-my-demo導出成test.jar放置在e:/test.jar下。
2、需要有用于統計用的文本檔案a.txt,檔案在e:\a.txt
内容截圖類似:
假設a.txt内容為:
the true
nobility is
in being
superior to
your previous
self guess
no great
discovery
was ever
made without
a bold
knowledge will
give you
power but
character respect
the sun
is just
rising in
the morning
of another
day i
i figure
life is
a gift
and i
don’t intend
on wasting
3、首先運作:workerserver,相當于是啟動服務端的代碼
4、接着運作:userapp,相當于是用戶端
5、最終的統計結果将顯示在:e:/out.txt中。統計結果如下:
nobility 1
but 1
gift 1
wasting 1
rising 1
don't 1
another 1
i 3
your 1
knowledge 1
sun 1
without 1
life 1
the 2
character 1
and 1
of 1
power 1
just 1
day 1
you 1
on 1
no 1
a 2
give 1
figure 1
previous 1
in 2
will 1
made 1
was 1
is 3
being 1
bold 1
great 1
respect 1
morning 1
the 1
ever 1
superior 1
guess 1
discovery 1
true 1
self 1
to 1
intend 1
6、最終的日志将存儲在:e:/task/task.log,最終的配置和工作用的jar也将會生成到這個目錄下面,效果如下:
其中job.conf的内容為:
生成的task.log效果如下:
接着:具體的代碼實作如下:
taskprocessor代碼如下
package cn.toto.bigdata.mymr.task;
import java.util.hashmap;
import java.util.logging.filehandler;
import java.util.logging.level;
import java.util.logging.logger;
import cn.toto.bigdata.mymr.common.constants;
import cn.toto.bigdata.mymr.common.context;
import cn.toto.bigdata.mymr.io.inputformat;
import cn.toto.bigdata.mymr.io.outputformat;
/**
 * Core task executor: the entry point that runs one job.
 * It loads the job configuration into a context, reads the input through the
 * configured input format, passes every line to the configured user logic and
 * finally hands the context to the configured output format.
 */
public class taskprocessor {
/**
 * Runs the job described by the configuration held in a freshly created context.
 *
 * @param args command-line arguments; not read by this method
 * @throws exception if opening the log file, the reflective instantiation,
 *         reading the input or writing the output fails
 */
public static void main(string[] args) throws exception {
// Create the context; its constructor is what loads the job configuration file.
context context = new context();
// Fetch the configuration map held by the context.
// NOTE(review): the map is used below without a null check — confirm the
// context always has a configuration at this point.
hashmap<string, string> conf = context.getconfiguration();
// Log to a file so the outcome of the run can be inspected afterwards.
logger logger = logger.getlogger("taskprocessor");
// Emit INFO level and above.
logger.setlevel(level.info);
// NOTE(review): this handler is never closed in this method.
filehandler filehandler = new filehandler("e:/task/task.log");
filehandler.setlevel(level.info);
logger.addhandler(filehandler);
logger.info("context:" + context);
logger.info("conf:" + conf);
// Set up the reading component:
// its class name comes from the configuration and it is instantiated reflectively.
class<?> forname = class.forname(conf.get(constants.input_format));
inputformat inputformat = (inputformat) forname.newinstance();
inputformat.init(context);
// Instantiate the user logic the same way, from the class name in the configuration.
class<?> forname2 = class.forname(conf.get(constants.user_logic));
processlogic userlogic = (processlogic) forname2.newinstance();
// Call the user logic once per input line; hasnext() must be called before
// nextkey()/nextvalue() on every iteration.
while(inputformat.hasnext()) {
integer key = inputformat.nextkey();
string value = inputformat.nextvalue();
userlogic.process(key, value, context);
}
// Give the user logic a chance to emit its results into the context.
userlogic.cleanup(context);
// Write the results on behalf of the user through the configured output format.
class<?> forname3 = class.forname(conf.get(constants.output_format));
outputformat outputformat = (outputformat) forname3.newinstance();
outputformat.write(context);
// NOTE(review): outputformat.cleanup() is never called here, so whatever
// write() opened is not released by this method.
}
}
processlogic代碼如下:
package cn.toto.bigdata.mymr.task;
import cn.toto.bigdata.mymr.common.context;
* 1、規定的業務邏輯編寫規範
* process() 和 cleanup都沒有寫實作,這裡的實作在用戶端
// Contract that user code must extend: process() is called once per input
// line and cleanup() once after the last line (see the loop in taskprocessor).
public abstract class processlogic {
/**
 * Handles one input line.
 *
 * @param key     the line number of the line
 * @param value   the content of the line
 * @param context the application context that receives the processed results
 */
public abstract void process(integer key,string value,context context);
* 通過cleanup輸出處理後的結果
// Hook invoked after all lines were processed; the default implementation does nothing.
public void cleanup(context context){}
// NOTE(review): the closing brace of this class is missing in this listing.
package cn.toto.bigdata.mymr.io;
// Abstraction of the component that reads the job input line by line.
public abstract class inputformat {
* 擷取下一行要讀的行的位置
// Returns the key (line position) belonging to the current line.
public abstract int nextkey();
* 擷取從檔案中讀取的到的行的資訊
// Returns the text of the current line.
public abstract string nextvalue();
* 從檔案中讀取到一行資訊
// Reads one line from the underlying input.
public abstract string readline() throws exception;
* 判斷是否還可以讀取到下一行的内容
// Reports whether another line is available.
public abstract boolean hasnext() throws exception;
* 初始化要讀取的檔案的路徑和檔案流
// Prepares the input (path and stream) from the given context.
public abstract void init(context context) throws exception;
package cn.toto.bigdata.mymr.io;
import java.io.bufferedreader;
import java.io.filereader;
* 這裡是預設的讀取的實作類
// Default inputformat implementation: reads a text file line by line through
// a bufferedreader.
public class defaultinputformat extends inputformat{
// Path of the file to read.
private string inputpath;
private bufferedreader br = null;
// Key of the current line: its line number.
private int key;
// Text of the current line.
private string value;
// Line counter; the first line gets number 0.
private int linenumber = 0;
@override
public void init(context context) throws exception {
// Look up the input path in the job configuration.
this.inputpath = context.getconfiguration().get(constants.input_path);
// Open the input stream on that file.
this.br = new bufferedreader(new filereader(inputpath));
// Returns the key stored by the most recent hasnext() call.
public int nextkey() {
return this.key;
// Returns the line stored by the most recent hasnext() call.
public string nextvalue() {
return this.value;
// Reads the next line as a side effect, stores it together with its line
// number, and reports whether a line was obtained.
public boolean hasnext() throws exception {
string line = null;
line = readline();
// The counter is post-incremented, so key holds the number of the line just read.
this.key = linenumber++;
this.value = line;
return null != line;
* 讀取一行資料
public string readline() throws exception {
string line = br.readline();
// At end of file the reader is closed.
// NOTE(review): calling hasnext()/readline() again after that would read
// from a closed reader — confirm callers stop at the first false.
if (line == null) {
br.close();
return line;
* 用于輸出結果的類
// Abstraction of the component that writes the job results.
public abstract class outputformat {
* 将結果寫入檔案中
// Writes the results held by the given context.
public abstract void write(context context) throws exception;
* 關閉流
// Releases the resources opened by the implementation.
public abstract void cleanup() throws exception;
import java.io.bufferedwriter;
import java.io.filewriter;
import java.util.hashmap;
import java.util.set;
import java.util.map.entry;
// Default outputformat implementation: dumps the context's key/value buffer
// to a text file, one tab-separated record per entry.
public class defaultoutputformat extends outputformat{
// Writer opened by write() and closed by cleanup().
bufferedwriter bw = null;
@override
public void write(context context) throws exception {
// Destination path comes from the job configuration.
string outputpath = context.getconfiguration().get(constants.output_path);
hashmap<string, integer> kvbuffer = context.getkvbuffer();
this.bw = new bufferedwriter(new filewriter(outputpath));
set<entry<string, integer>> entryset = kvbuffer.entryset();
for (entry<string, integer> entry : entryset) {
// NOTE(review): each record ends with a bare "\r" (no "\n") — confirm
// this is the intended line terminator.
bw.write(entry.getkey() + "\t" + entry.getvalue() + "\r");
}
// Flushed but not closed here; closing is left to cleanup().
bw.flush();
}
// NOTE(review): bw is still null if write() was never called, so this would
// fail with a null dereference in that case.
public void cleanup() throws exception {
bw.close();
package cn.toto.bigdata.mymr.common;
// Shared constants: configuration keys plus a few fixed file and directory names.
public class constants {
// Configuration key: path of the job jar on the client side.
public static final string jar_path = "jar.path";
// File name under which the worker stores the received jar.
public static final string jar_file = "job.jar";
// Configuration keys: host and port of the worker.
public static final string worker_host = "worker.host";
public static final string worker_port = "worker.port";
// File name of the serialized job configuration.
public static final string conf_file = "job.conf";
// Configuration keys: class names of the input and output format.
public static final string input_format = "input.format.class";
public static final string output_format = "output.format.class";
// Configuration keys: input and output file paths.
public static final string input_path = "input.path";
public static final string output_path = "output.path";
// Fully qualified name of the task entry class.
public static final string task_processor = "cn.toto.bigdata.mymr.task.taskprocessor";
// Configuration key: class name of the user logic.
public static final string user_logic = "user.logic.class";
// Working directory of the worker.
public static final string task_work_dir = "e:/task";
import java.io.file;
import java.io.fileinputstream;
import java.io.objectinputstream;
* 應用上下文,通過這個内容擷取配置檔案
* 通過這個上下文最終輸出結果
// Application context: holds the job configuration and the key/value buffer
// into which results are collected.
public class context {
// Result buffer filled through write().
private hashmap<string, integer> kvbuffer = new hashmap<string, integer>();
// Job configuration; only assigned by the constructor when the conf file exists.
private hashmap<string, string> conf;
@suppresswarnings("unchecked")
public context() throws exception {
// Load the configuration from <task_work_dir>/<conf_file>.
file file = new file(constants.task_work_dir + "/" + constants.conf_file);
if (file.exists()) {
// NOTE(review): the stream is never closed (the warning is suppressed), and
// the file is read with Java native deserialization — confirm the file is trusted.
@suppresswarnings("resource")
objectinputstream oi = new objectinputstream(new fileinputstream(file));
this.conf = (hashmap<string, string>) oi.readobject();
} else {
// NOTE(review): nothing is assigned in this branch, so conf stays unset
// when the file is missing.
// throw new runtimeexception("read conf failed ....");
/**
 * Stores one result pair in the buffer; an existing value for the same key
 * is replaced.
 */
public void write(string k, integer v) {
kvbuffer.put(k, v);
// Returns the buffer itself, not a copy.
public hashmap<string, integer> getkvbuffer() {
return kvbuffer;
// Replaces the whole buffer.
public void setkvbuffer(hashmap<string, integer> tmpkv) {
this.kvbuffer = tmpkv;
* 擷取配置檔案中的資訊
// Returns the configuration map held by this context.
public hashmap<string, string> getconfiguration() {
return conf;
* 在context()構造函數裡面已經有了conf的配置,這裡再次傳入說明配置可以讓使用者手動指定
// Overrides the configuration with a caller-supplied map.
public void setconfiguration(hashmap<string, string> configuration) {
this.conf = configuration;
package cn.toto.bigdata.mymr.userapp;
import cn.toto.bigdata.mymr.task.processlogic;
// Word-count implementation of processlogic: counts are accumulated in a
// local map and copied into the context in cleanup().
public class userlogic extends processlogic {
// Running count per word.
private hashmap<string, integer> wordcount = new hashmap<string, integer>();
// Splits the line on single spaces and increments the count of every token.
// NOTE(review): with split(" "), repeated spaces or a blank line produce
// empty-string tokens that are counted as words — confirm this is acceptable.
public void process(integer key, string value, context context) {
string [] words = value.split(" ");
for(string word : words) {
integer count = wordcount.get(word);
if (count == null) {
wordcount.put(word, 1);
} else {
wordcount.put(word, count + 1);
}
// Publishes every accumulated (word, count) pair to the context.
public void cleanup(context context) {
set<entry<string, integer>> entryset = wordcount.entryset();
for(entry<string, integer> entry : entryset) {
context.write(entry.getkey(), entry.getvalue());
import cn.toto.bigdata.mymr.scheduler.runner;
// Client entry point: assembles the job configuration and submits the job.
public class userapp {
public static void main(string[] args) throws exception {
hashmap<string, string> conf = new hashmap<string,string>();
// Input and output files of the job.
conf.put(constants.input_path, "e:/a.txt");
conf.put(constants.output_path, "e:/out.txt");
// Classes used to read the input and write the output.
conf.put(constants.input_format, "cn.toto.bigdata.mymr.io.defaultinputformat");
conf.put(constants.output_format, "cn.toto.bigdata.mymr.io.defaultoutputformat");
// Jar to ship to the worker, and the worker's address.
conf.put(constants.jar_path, "e:/test.jar");
conf.put(constants.worker_host, "localhost");
conf.put(constants.worker_port, "9889");
// Class that implements the per-line business logic.
conf.put(constants.user_logic, "cn.toto.bigdata.mymr.userapp.userlogic");
runner runner = new runner(conf);
// The same host and port are also stored in conf above.
runner.submit("localhost", 9889);
package cn.toto.bigdata.mymr.scheduler;
import java.io.fileoutputstream;
import java.io.objectoutputstream;
// Client-side helper: serializes the job configuration to a file and then
// submits the job through a workerclient.
public class runner {
private hashmap<string, string> conf;
public runner(hashmap<string, string> conf) {
this.conf = conf;
}
// NOTE(review): host and port are not used in the visible body; the
// workerclient is built from conf only.
public void submit(string host,int port) throws exception {
// Serialize the configuration map to the file named by constants.conf_file
// (a relative name, so it resolves against the current working directory).
// NOTE(review): no close()/flush() of this stream is visible in this listing.
objectoutputstream jobconfstream = new objectoutputstream(new fileoutputstream(constants.conf_file));
jobconfstream.writeobject(conf);
workerclient workerclient = new workerclient(conf);
workerclient.submit();
import java.io.outputstream;
import java.net.socket;
// Client side of the worker protocol: each message starts with a 7-byte
// command ("jarfile", "jobconf" or "job2run") followed by its payload.
public class workerclient {
socket socket = null;
outputstream so = null;
// NOTE(review): no declaration of the conf field is visible in this listing.
public workerclient(hashmap<string, string> conf) {
this.conf = conf;
public void submit() throws exception {
// Connect to the worker using the host and port from the configuration.
socket = new socket(conf.get(constants.worker_host), integer.parseint(conf.get(constants.worker_port)));
so = socket.getoutputstream();
string jarpath = conf.get(constants.jar_path);
// Send the jar: command header first, then the file content in 4 KB chunks.
byte[] buff = new byte[4096];
fileinputstream jarins = new fileinputstream(jarpath);
so.write("jarfile".getbytes());
int read = 0;
while ((read=jarins.read(buff)) != -1) {
so.write(buff,0,read);
jarins.close();
so.close();
socket.close();
// Send the job.conf file.
// NOTE(review): as listed, so and socket were closed just above and are
// written to again below; lines that reopen the connection appear to be
// missing from this listing — confirm against the original source.
fileinputstream confins = new fileinputstream(constants.conf_file);
so.write("jobconf".getbytes());
while ((read = confins.read(buff)) != -1) {
confins.close();
// Send the start command.
so.write("job2run".getbytes());
// NOTE(review): the jar path in this command is hard-coded rather than
// taken from the configuration or from the worker's work directory.
string shell = "java -cp e:/test.jar cn.toto.bigdata.mymr.task.taskprocessor";
so.write(shell.getbytes());
import java.net.serversocket;
// Worker server: listens on port 9889 and handles every accepted connection
// on its own thread.
// NOTE(review): the method header that should enclose these statements is
// missing in this listing.
public class workerserver {
serversocket ssc = new serversocket(9889);
system.out.println("worker伺服器啟動-->9889");
// Accept connections forever; each one is handed to a new workerrunnable.
while (true) {
socket accept = ssc.accept();
new thread(new workerrunnable(accept)).start();
import java.io.inputstream;
import java.io.inputstreamreader;
// Server-side handler for one client connection: reads the 7-byte command
// header and dispatches to the matching receive/run routine.
public class workerrunnable implements runnable {
socket socket;
inputstream in = null;
// Byte counters for the received conf and jar files.
volatile long confsize = 0;
volatile long jarsize = 0;
public workerrunnable(socket socket) {
this.socket = socket;
public void run() {
try {
this.in = socket.getinputstream();
// The command header is exactly 7 bytes long.
byte[] protocal = new byte[7];
// NOTE(review): a single read() may return fewer than 7 bytes even for a
// valid request; such a request is rejected below.
int read = in.read(protocal, 0, 7);
if (read < 7) {
system.out.println("用戶端請求不符合協定規範......");
return;
string command = new string(protocal);
switch (command) {
case "jarfile":
receivejarfile();
break;
// NOTE(review): as listed, the next two cases have no break and would fall
// through into the following ones — confirm against the original source.
case "jobconf":
receiveconffile();
case "job2run":
runjob();
default:
system.out.println("用戶端請求不符合協定規範.....");
socket.close();
} catch (exception e) {
e.printstacktrace();
// Stores the rest of the stream as <task_work_dir>/<conf_file> and counts the bytes.
private void receiveconffile() throws exception {
system.out.println("開始接收conf檔案");
fileoutputstream fo = new fileoutputstream(constants.task_work_dir + "/" + constants.conf_file);
while ((read = in.read(buff)) != -1) {
confsize += read;
fo.write(buff, 0, read);
fo.flush();
fo.close();
in.close();
// Stores the received jar as <task_work_dir>/<jar_file>.
// NOTE(review): the copy loop is not visible in this listing.
private void receivejarfile() throws exception {
system.out.println("開始接收jar檔案");
fileoutputstream fo = new fileoutputstream(constants.task_work_dir + "/" + constants.jar_file);
jarsize += read;
// Reads the launch command from the socket, prints diagnostics about the
// received files, runs the command as a child process and reports the outcome.
private void runjob() throws exception {
int read = in.read(buff);
string shell = new string(buff, 0, read);
system.out.println("接收到啟動指令......." + shell);
thread.sleep(500);
file jarfile = new file(constants.task_work_dir + "/" + constants.jar_file);
file conffile = new file(constants.task_work_dir + "/" + constants.conf_file);
system.out.println("jarfile 存在?" + jarfile.exists());
system.out.println("conffile 存在?" + conffile.exists());
system.out.println("jarfile可讀?" + jarfile.canread());
system.out.println("jarfile可寫?" + jarfile.canwrite());
system.out.println("conffile可讀?" + conffile.canread());
system.out.println("conffile可寫?" + conffile.canwrite());
system.out.println("jarfile.length():" + jarfile.length());
system.out.println("conffile.length():" + conffile.length());
/*if (jarfile.length() == jarsize && conffile.length() == confsize) {
system.out.println("jar 和 conf 檔案已經準備就緒......");
}*/
system.out.println("開始啟動資料處理taskprocessor......");
// NOTE(review): the command string comes straight from the socket and is
// executed as-is — confirm that only trusted clients can connect.
process exec = runtime.getruntime().exec(shell);
// NOTE(review): the child's stderr is only read after waitfor() returns; a
// child that fills the pipe buffer before exiting could block here.
int waitfor = exec.waitfor();
inputstream errstream = exec.geterrorstream();
bufferedreader errreader = new bufferedreader(new inputstreamreader(errstream));
string inline = null;
/*
* inputstream stdstream = exec.getinputstream(); bufferedreader
* stdreader = new bufferedreader(new inputstreamreader(stdstream));
* while ((inline = stdreader.readline()) != null) {
* system.out.println(inline); }
*/
// Echo everything the child wrote to stderr.
while ((inline = errreader.readline()) != null) {
system.out.println(inline);
// Exit code 0 is reported as success.
if (waitfor == 0) {
system.out.println("task成功運作完畢.....");
system.out.println("task異常退出......");