設計思路:
new 一個線程池
打開檔案,nio或reader。nio打開時,map一大塊mappedbytebuffer,從buffer中讀出一定大小的資料,定位到最後一個'\n',最後一個'\n'及之前的資料put到一個線程執行類執行個體的buffer中,餘下的put到一個臨時buffer裡,下一次循環時處理這部分内容;在線程的執行體中,先rewind bytebuffer,循環處理buffer,讀到一個完整的import語句就put到map裡,buffer處理完成後合併map到全局concurrentmap中。bio的則是讀一定的行數後submit線程到線程池,之後用正規表達式處理每一行生成map,處理完成後合併map。
上代碼:
=========================nio=================================
package com.arvey.files;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Counts the occurrences of {@code import ...} lines in a large source file
 * using NIO memory-mapped reads.
 *
 * <p>The file is mapped in {@link #MAPPED_SIZE} windows; each window is cut
 * into {@link #HANDLE_SIZE} chunks ending on a line boundary, and every chunk
 * is handed to a pool worker ({@link HandleBuffer}) that counts complete
 * import lines and merges its partial counts into the global {@link #result}.
 */
public class StateCounterNIO {

    /** Bytes mapped per {@code FileChannel.map()} call. */
    public static final int MAPPED_SIZE = 5 * 4 * 1024;

    /** Bytes handed to a single worker task (also the worker buffer size). */
    public static final int HANDLE_SIZE = 4 * 1024;

    /** Fixed size of the worker thread pool. */
    public static final int EXECUTORS_NUM = 200;

    // Path of the big file to scan; adjust for the local environment.
    String file = "/users/arvey/wwork/docs.code.clean/docsapp/bigfile.src";

    /** Global import-line -&gt; count map, merged into by worker threads. */
    public static final ConcurrentHashMap<String, Integer> result =
            new ConcurrentHashMap<>();

    private final ExecutorService pool = Executors.newFixedThreadPool(EXECUTORS_NUM);

    /**
     * Merges one worker's partial counts into the global {@link #result}.
     *
     * @param partial import line -&gt; count produced by a single worker
     */
    public static void updateTopList(Map<String, Integer> partial) {
        for (Map.Entry<String, Integer> entry : partial.entrySet()) {
            // merge is atomic on ConcurrentHashMap, so no external lock is needed.
            result.merge(entry.getKey(), entry.getValue(), Integer::sum);
        }
    }

    /**
     * Scans {@link #file}: maps it window by window, slices each window into
     * newline-terminated chunks, and submits one {@link HandleBuffer} task per
     * chunk. Bytes after the last newline of a chunk are carried over to the
     * next chunk so no line is ever split between two workers.
     */
    public void getTop10() {
        File aFile = new File(file);
        long fileLength = aFile.length();
        // Holds the partial (newline-less) tail of the previous chunk.
        ByteBuffer carry = ByteBuffer.allocate(HANDLE_SIZE);
        byte[] chunk = new byte[HANDLE_SIZE];
        try (FileInputStream fis = new FileInputStream(aFile);
             FileChannel fc = fis.getChannel()) {
            long offset = 0L;
            while (offset < fileLength) {
                long windowSize = Math.min(fileLength - offset, (long) MAPPED_SIZE);
                MappedByteBuffer window =
                        fc.map(FileChannel.MapMode.READ_ONLY, offset, windowSize);
                while (window.hasRemaining()) {
                    HandleBuffer task = new HandleBuffer();
                    ByteBuffer dst = task.getBuffer();
                    // Prepend the leftover bytes of the previous chunk.
                    carry.flip();
                    dst.put(carry);
                    carry.clear();
                    int take = Math.min(window.remaining(), dst.remaining());
                    window.get(chunk, 0, take);
                    boolean endOfFile =
                            !window.hasRemaining() && offset + windowSize == fileLength;
                    if (endOfFile) {
                        // Last bytes of the file: no need to wait for a newline.
                        dst.put(chunk, 0, take);
                    } else {
                        // Split at the last '\n'; the tail is carried forward.
                        int split = take - 1;
                        while (split >= 0 && chunk[split] != '\n') {
                            split--;
                        }
                        if (split < 0) {
                            // No newline in the whole chunk: defer everything.
                            carry.put(chunk, 0, take);
                        } else {
                            dst.put(chunk, 0, split + 1);
                            carry.put(chunk, split + 1, take - split - 1);
                        }
                    }
                    pool.submit(task);
                }
                offset += windowSize;
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
            try {
                // Block until all submitted workers have merged their counts.
                pool.awaitTermination(1, TimeUnit.HOURS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        long startAt = System.currentTimeMillis();
        StateCounterNIO aNio = new StateCounterNIO();
        aNio.getTop10();
        // Sort descending by count and print the top ~100 entries.
        List<Map.Entry<String, Integer>> sList = new ArrayList<>(result.entrySet());
        sList.sort(Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder()));
        int index = 0;
        for (Map.Entry<String, Integer> aEntry : sList) {
            System.out.println(aEntry.getKey() + "--" + aEntry.getValue());
            if (index++ >= 100) {
                break;
            }
        }
        System.out.println("the cost is " + (System.currentTimeMillis() - startAt));
    }
}

/**
 * Pool worker: scans its byte buffer for complete lines that start with
 * {@code "import "} and reports the per-line counts to
 * {@link StateCounterNIO#updateTopList(Map)}.
 */
class HandleBuffer implements Runnable {

    // Filled by the producer up to position(); run() reads [0, position).
    private final ByteBuffer mBuffer = ByteBuffer.allocate(StateCounterNIO.HANDLE_SIZE);

    public ByteBuffer getBuffer() {
        return mBuffer;
    }

    @Override
    public void run() {
        Map<String, Integer> aMap = new HashMap<>();
        int limit = mBuffer.position();
        byte[] bytes = new byte[limit];
        mBuffer.rewind();
        mBuffer.get(bytes, 0, limit);
        int lineStart = 0;
        for (int i = 0; i < limit; i++) {
            if (bytes[i] == '\n') {
                String line =
                        new String(bytes, lineStart, i - lineStart, StandardCharsets.UTF_8)
                                .trim();
                // Anchored at line start, matching the BIO regex "^(import.+);".
                if (line.startsWith("import ")) {
                    aMap.merge(line, 1, Integer::sum);
                }
                lineStart = i + 1;
            }
        }
        StateCounterNIO.updateTopList(aMap);
    }
}
==================================bio======================================
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Counts the occurrences of {@code import ...;} lines in a large source file
 * using buffered (BIO) line reads.
 *
 * <p>The file is read {@link #LINES_PER_TASK} lines at a time; each batch is
 * handed to a pool worker ({@link HandleRun}) that applies {@link #PATTERN}
 * to every line and merges its partial counts into the global {@link #result}.
 */
public class StateCountBIO {

    /** Fixed size of the worker thread pool. */
    public static final int EXECUTORS_NUM = 200;

    /** Number of lines handed to a single worker task. */
    public static final int LINES_PER_TASK = 1024;

    // Path of the big file to scan; adjust for the local environment.
    String file = "/users/arvey/wwork/docs.code.clean/docsapp/bigfile.src";

    /** Global import-line -&gt; count map, merged into by worker threads. */
    public static final ConcurrentHashMap<String, Integer> result =
            new ConcurrentHashMap<>();

    /** Matches an import statement anchored at the start of a line. */
    public static final Pattern PATTERN = Pattern.compile("^(import.+);");

    private final ExecutorService pool = Executors.newFixedThreadPool(EXECUTORS_NUM);
    private BufferedReader fReader;
    // Number of worker tasks submitted; printed by main for diagnostics.
    private int poolCounter = 0;

    public int getPoolCounter() {
        return poolCounter;
    }

    /**
     * Merges one worker's partial counts into the global {@link #result}.
     *
     * @param partial import line -&gt; count produced by a single worker
     */
    public static void updateTopList(Map<String, Integer> partial) {
        for (Map.Entry<String, Integer> entry : partial.entrySet()) {
            // merge is atomic on ConcurrentHashMap, so no external lock is needed.
            result.merge(entry.getKey(), entry.getValue(), Integer::sum);
        }
    }

    /**
     * Reads {@link #file} in batches of {@link #LINES_PER_TASK} lines and
     * submits one {@link HandleRun} task per batch, then waits for the pool
     * to drain.
     */
    public void getTop10() {
        File bigFile = new File(file);
        try (FileInputStream fio = new FileInputStream(bigFile);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(fio, StandardCharsets.UTF_8))) {
            fReader = reader;
            boolean notReachedEnd = true;
            while (notReachedEnd) {
                HandleRun anInst = new HandleRun();
                StringBuilder aWriter = anInst.getBuffer();
                int index = 0;
                while (index < LINES_PER_TASK) {
                    String content = fReader.readLine();
                    if (content != null) {
                        aWriter.append(content).append('\n');
                        index++;
                    } else {
                        notReachedEnd = false;
                        break;
                    }
                }
                poolCounter++;
                pool.submit(anInst);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
            try {
                // Block until all submitted workers have merged their counts.
                pool.awaitTermination(1, TimeUnit.HOURS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Orders keys by their count in the global result map, descending.
     * NOTE(review): inconsistent with equals (never returns 0); kept from the
     * original sketch, unused by main.
     */
    static class ValueComparator implements Comparator<String> {
        Map<String, Integer> map = result;

        @Override
        public int compare(String o1, String o2) {
            if (map.get(o1) >= map.get(o2)) {
                return -1;
            } else {
                return 1;
            }
        }
    }

    public static void main(String[] args) {
        long startAt = System.currentTimeMillis();
        StateCountBIO aCount = new StateCountBIO();
        aCount.getTop10();
        // Sort descending by count and print the top ~100 entries.
        List<Map.Entry<String, Integer>> sList = new ArrayList<>(result.entrySet());
        sList.sort(Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder()));
        int index = 0;
        for (Map.Entry<String, Integer> aEntry : sList) {
            System.out.println(aEntry.getKey() + "--" + aEntry.getValue());
            if (index++ >= 100) {
                break;
            }
        }
        System.out.println("the thread counter is " + aCount.getPoolCounter());
        System.out.println("the cost is " + (System.currentTimeMillis() - startAt));
    }
}

/**
 * Pool worker: splits its text buffer into lines, applies
 * {@link StateCountBIO#PATTERN} to each, and reports the per-line counts to
 * {@link StateCountBIO#updateTopList(Map)}.
 */
class HandleRun implements Runnable {

    private final StringBuilder buffer = new StringBuilder();

    public StringBuilder getBuffer() {
        return buffer;
    }

    @Override
    public void run() {
        Map<String, Integer> res = new HashMap<>();
        for (String aLine : buffer.toString().split("\n")) {
            Matcher m = StateCountBIO.PATTERN.matcher(aLine);
            if (m.find()) {
                // group(0) is the whole match, e.g. "import java.util.Map;".
                res.merge(m.group(0), 1, Integer::sum);
            }
        }
        StateCountBIO.updateTopList(res);
    }
}
=====================================================
效率分析
處理大檔案 檔案size達到8967006720時 線程池200 100 50對比如下(五次執行平均結果):
nio bio
200 139843 67376
100 136914 66576
50 140000 67249
為何nio的要慢于bio的呢?
nio在處理線程中遍歷buffer,是不是這個原因造成的呢?當增加每次buffer處理的容量時,性能提升明顯,如檔案每次map的和每一個線程處理的buffer的空間擴容10倍時,在50個線程時,資料降到82439;但是對于bio的,調整一次處理的行數,性能變化很小,程式運作時間略有增長(726**)。那麼,怎樣才能獲得最好的性能呢?