天天看点

自娱小程序--超大文件topN

 设计思路:

new 一个线程池

打开文件(nio 或 reader 两种方式)。nio 方式:map 一大块 MappedByteBuffer,从 buffer 中读出一定大小的数据,定位到最后一个'\n',把最后一个'\n'及之前的数据 put 到一个线程执行类实例的 buffer 中,余下的 put 到一个临时 buffer 里,下一次循环时再处理这部分内容;在线程的执行体中,先 rewind ByteBuffer,再循环处理 buffer,每读到一个完整的 import 语句就 put 到 map 里,buffer 处理完成后把 map 合并到全局 ConcurrentMap 中。bio 方式则是读一定的行数后 submit 线程到线程池,之后用正则表达式处理每一行生成 map,处理完成后合并 map。

上代码:

=========================nio================================= 

package com.arvey.files;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Counts "import ..." statements in a very large source dump via NIO:
 * the file is memory-mapped window by window, each window is sliced into
 * line-aligned chunks, and a fixed thread pool tallies imports per chunk,
 * merging partial counts into a global map. main() prints the top entries.
 */
public class StateCounterNIO {

    /** Bytes memory-mapped from the file per window (5 * 4 KiB). */
    public static final int MAPPED_SIZE = 5 * 4 * 1024;

    /** Maximum bytes handed to one worker task (4 KiB). */
    public static final int HANDLE_SIZE = 4 * 1024;

    /** Fixed size of the worker pool. */
    public static final int EXECUTORS_NUM = 200;

    String file = "/users/arvey/wwork/docs.code.clean/docsapp/bigfile.src";

    /** Global import-statement -> occurrence-count map; workers merge into it. */
    public static ConcurrentHashMap<String, Integer> result =
            new ConcurrentHashMap<String, Integer>();

    private ExecutorService pool = Executors.newFixedThreadPool(EXECUTORS_NUM);

    /**
     * Merges a worker's partial counts into the global {@link #result}.
     * Synchronized so each per-key read-modify-write is atomic across workers.
     *
     * @param partial the worker-local counts to fold in
     */
    public static synchronized void updateTopList(Map<String, Integer> partial) {
        for (Map.Entry<String, Integer> entry : partial.entrySet()) {
            Integer old = result.get(entry.getKey());
            result.put(entry.getKey(), old == null ? entry.getValue() : old + entry.getValue());
        }
    }

    /**
     * Streams the file through memory-mapped windows, cuts each window into
     * chunks of at most {@link #HANDLE_SIZE} bytes ending on a '\n', and
     * submits each chunk to a pooled {@link HandleBuffer} worker. Bytes after
     * the last '\n' of a chunk are carried over (tmpByteBuffer) into the next
     * chunk so no line is ever split across two workers.
     */
    public void getTop10() {
        File aFile = new File(file);
        long fileLength = aFile.length();
        FileInputStream fis = null;
        FileChannel fc = null;
        long fOffset = 0L;
        MappedByteBuffer buffer = null;
        // Carry-over buffer holding the trailing partial line of the previous chunk.
        ByteBuffer tmpByteBuffer = ByteBuffer.allocate(HANDLE_SIZE);
        byte[] tmpByteArray = new byte[HANDLE_SIZE];
        try {
            fis = new FileInputStream(aFile);
            fc = fis.getChannel();
            while (fOffset < fileLength) {
                long bufferSize = Math.min(fileLength - fOffset, MAPPED_SIZE);
                buffer = fc.map(FileChannel.MapMode.READ_ONLY, fOffset, bufferSize);
                while (buffer.position() < bufferSize) {
                    HandleBuffer aHandle = new HandleBuffer();
                    // Seed the worker with the partial line left over from last round.
                    if (tmpByteBuffer.position() > 0) {
                        byte[] tmpBa = new byte[tmpByteBuffer.position()];
                        tmpByteBuffer.rewind();
                        tmpByteBuffer.get(tmpBa);
                        aHandle.getMBuffer().put(tmpBa);
                        tmpByteBuffer.clear();
                    }
                    // Read at most HANDLE_SIZE bytes, bounded by what remains in the
                    // window and by the free space left in the worker's buffer.
                    int tmpBaCap = Math.min(
                            Math.min(HANDLE_SIZE, (int) (bufferSize - buffer.position())),
                            HANDLE_SIZE - aHandle.getMBuffer().position());
                    buffer.get(tmpByteArray, 0, tmpBaCap);
                    if (buffer.position() == bufferSize && (fOffset + bufferSize == fileLength)) {
                        // End of file: no later chunk exists, keep everything.
                        aHandle.getMBuffer().put(tmpByteArray, 0, tmpBaCap);
                    } else {
                        // Scan backwards for the last '\n'; bytes after it belong to
                        // the next chunk.
                        for (int i = tmpBaCap - 1; i >= 0; i--) {
                            if (i == 0) {
                                // No '\n' anywhere in this chunk: defer it all to the
                                // next round.
                                tmpByteBuffer.put(tmpByteArray, 0, tmpBaCap);
                                break;
                            }
                            if (tmpByteArray[i] == '\n') {
                                // Keep the '\n' with its line (off-by-one fix: the
                                // original put(...,0,i) dropped the terminator, so the
                                // chunk's last import line was never counted).
                                aHandle.getMBuffer().put(tmpByteArray, 0, i + 1);
                                if (i != tmpBaCap - 1) {
                                    tmpByteBuffer.put(tmpByteArray, i + 1, tmpBaCap - i - 1);
                                }
                                break;
                            }
                        }
                    }
                    pool.submit(aHandle);
                }
                fOffset += buffer.position();
                buffer.clear();
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            // Fix: the original swallowed this silently; at least record it.
            e.printStackTrace();
        } finally {
            try {
                if (fis != null) {
                    fis.close(); // also closes the channel obtained from it
                }
                if (fc != null) {
                    fc.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            pool.shutdown();
            try {
                // Poll until all submitted workers have finished.
                while (!pool.isTerminated()) {
                    Thread.sleep(2000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore interrupt status
            }
        }
    }

    /**
     * Worker task: scans its byte buffer for runs matching the "import "
     * prefix, counts each complete import statement (prefix up to the next
     * '\n'), and merges the partial counts into the global map.
     * Static nested class — it uses no enclosing-instance state.
     */
    static class HandleBuffer implements Runnable {

        ByteBuffer mBuffer = ByteBuffer.allocate(4 * 1024);

        public ByteBuffer getMBuffer() {
            return mBuffer;
        }

        @Override
        public void run() {
            Map<String, Integer> aMap = new HashMap<String, Integer>();
            // ASCII prefix to match; platform charset is safe for these bytes.
            byte[] bImport = "import ".getBytes();
            int bImportIndex = 0;
            int markedPos = 0;
            boolean isImportLine = false;
            // position() is where the producer stopped writing; read up to there.
            int availableSize = mBuffer.position();
            mBuffer.rewind();
            while (mBuffer.position() < availableSize) {
                byte aByte = mBuffer.get();
                if (!isImportLine && bImportIndex < bImport.length
                        && aByte == bImport[bImportIndex]) {
                    bImportIndex++;
                    if (bImportIndex == bImport.length) {
                        // Full "import " prefix matched; remember where it began.
                        isImportLine = true;
                        markedPos = mBuffer.position() - bImport.length;
                    }
                } else if (aByte == '\n' && isImportLine) {
                    // End of an import line: copy the whole statement out.
                    byte[] tmp = new byte[mBuffer.position() - markedPos];
                    mBuffer.position(markedPos);
                    mBuffer.get(tmp);
                    String aImport = new String(tmp).trim();
                    Integer old = aMap.get(aImport);
                    aMap.put(aImport, old == null ? 1 : old + 1);
                    isImportLine = false;
                    bImportIndex = 0;
                } else if (!isImportLine && bImportIndex != 0) {
                    // 清除没有读到完整"import "时的index — partial prefix match
                    // broken; restart matching from scratch.
                    bImportIndex = 0;
                }
            }
            StateCounterNIO.updateTopList(aMap);
        }
    }

    public static void main(String[] args) {
        long startAt = System.currentTimeMillis();
        StateCounterNIO aNio = new StateCounterNIO();
        aNio.getTop10();

        List<Map.Entry<String, Integer>> sList =
                new ArrayList<Map.Entry<String, Integer>>(result.entrySet());
        Collections.sort(sList, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
                // Descending by count. Fix: a consistent comparator (0 on ties);
                // the original returned only 1/-1, violating the Comparator
                // contract that Collections.sort relies on.
                return o2.getValue().compareTo(o1.getValue());
            }
        });

        int index = 0;
        for (Map.Entry<String, Integer> aEntry : sList) {
            System.out.println(aEntry.getKey() + "--" + aEntry.getValue());
            if (index++ >= 100) {
                break; // report only the top ~100 entries
            }
        }
        System.out.println("the cost is " + (System.currentTimeMillis() - startAt));
    }
}

==================================bio======================================

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * BIO variant of the import counter: a BufferedReader reads the file line by
 * line, batches of 1024 lines are submitted to a fixed thread pool, and each
 * worker regex-matches import statements and merges counts into a global map.
 *
 * NOTE(review): the original listing showed only the lines that differ from
 * the NIO version and several lines were lost; the shared constants/fields
 * and the missing method signatures below are reconstructed — verify against
 * the original post.
 */
public class StateCountBIO {

    /** Fixed size of the worker pool. */
    public static final int EXECUTORS_NUM = 200;

    String file = "/users/arvey/wwork/docs.code.clean/docsapp/bigfile.src";

    /** Global import-statement -> occurrence-count map; workers merge into it. */
    public static ConcurrentHashMap<String, Integer> result =
            new ConcurrentHashMap<String, Integer>();

    ExecutorService pool = Executors.newFixedThreadPool(EXECUTORS_NUM);

    /** Matches a whole "import ...;" statement anchored at line start. */
    public static Pattern pattern = Pattern.compile("^(import.+);");

    private BufferedReader fReader;

    // Number of worker tasks submitted; read only for the final report.
    private int poolCounter = 0;

    public int getPoolCounter() {
        return poolCounter;
    }

    /**
     * Merges a worker's partial counts into the global {@link #result}.
     * Synchronized so each per-key read-modify-write is atomic across workers.
     */
    public static synchronized void updateTopList(Map<String, Integer> partial) {
        for (Map.Entry<String, Integer> entry : partial.entrySet()) {
            Integer old = result.get(entry.getKey());
            result.put(entry.getKey(), old == null ? entry.getValue() : old + entry.getValue());
        }
    }

    /**
     * Reads the file line by line, packing up to 1024 lines into each
     * {@link HandleRun} task before submitting it to the pool; loops until EOF,
     * then shuts the pool down and waits for all workers to finish.
     */
    public void getTop10() {
        File bigFile = new File(file);
        FileInputStream fio = null;
        try {
            fio = new FileInputStream(bigFile);
            InputStreamReader aReader = new InputStreamReader(fio);
            fReader = new BufferedReader(aReader);
            boolean notReachedEnd = true;
            while (notReachedEnd) {
                String content = null;
                int index = 0;
                HandleRun anInst = new HandleRun();
                StringBuilder aWriter = anInst.getBuffer();
                while (index < 1024) {
                    if ((content = fReader.readLine()) != null) {
                        aWriter.append(content + "\n");
                        index++;
                    } else {
                        notReachedEnd = false;
                        break; // EOF: submit the final (possibly short) batch
                    }
                }
                poolCounter++;
                pool.submit(anInst);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (fio != null) {
                    fio.close(); // closes the reader chain's underlying stream
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            pool.shutdown();
            try {
                // Poll until all submitted workers have finished.
                while (!pool.isTerminated()) {
                    Thread.sleep(2000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore interrupt status
            }
        }
    }

    /**
     * Worker: regex-scans its batch of lines for import statements, counts
     * them locally, then merges the partial counts into the global map.
     * Static nested class — it uses no enclosing-instance state.
     */
    static class HandleRun implements Runnable {

        StringBuilder buffer = new StringBuilder();

        public StringBuilder getBuffer() {
            return buffer;
        }

        @Override
        public void run() {
            String[] allLines = buffer.toString().split("\n");
            Map<String, Integer> res = new HashMap<String, Integer>();
            for (String aLine : allLines) {
                Matcher m = pattern.matcher(aLine);
                if (m.find()) {
                    String key = m.group(0);
                    Integer old = res.get(key);
                    res.put(key, old == null ? 1 : old + 1);
                }
            }
            StateCountBIO.updateTopList(res);
        }
    }

    /**
     * Leftover (unused) comparator ordering keys by their global count,
     * descending. NOTE(review): its return statements were missing from the
     * source and are reconstructed; it would NPE on keys absent from the map.
     */
    static class ValueComparator implements Comparator<String> {

        Map<String, Integer> map = result;

        public ValueComparator() {
        }

        @Override
        public int compare(String o1, String o2) {
            if (map.get(o1) >= map.get(o2)) {
                return -1;
            } else {
                return 1;
            }
        }
    }

    public static void main(String[] args) {
        long startAt = System.currentTimeMillis();
        StateCountBIO aCount = new StateCountBIO();
        aCount.getTop10();

        List<Map.Entry<String, Integer>> sList =
                new ArrayList<Map.Entry<String, Integer>>(result.entrySet());
        Collections.sort(sList, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
                // Descending by count. Fix: a consistent comparator (0 on ties);
                // the original returned only 1/-1, violating the Comparator
                // contract that Collections.sort relies on.
                return o2.getValue().compareTo(o1.getValue());
            }
        });

        // NOTE(review): the print loop was missing from the source; it is
        // reconstructed to mirror the NIO listing's top-100 report.
        int index = 0;
        for (Map.Entry<String, Integer> aEntry : sList) {
            System.out.println(aEntry.getKey() + "--" + aEntry.getValue());
            if (index++ >= 100) {
                break;
            }
        }
        System.out.println("the thread counter is " + aCount.getPoolCounter());
        System.out.println("the cost is " + (System.currentTimeMillis() - startAt));
    }
}

=====================================================

效率分析

处理大文件:文件 size 达到 8967006720 字节时,线程池大小取 200、100、50 的对比如下(五次执行的平均结果,单位毫秒):

                       nio                bio

200                139843          67376 

100                136914          66576

50                  140000          67249  

为何nio的要慢于bio的呢?

nio在处理线程中逐字节遍历buffer,是不是这个原因造成的呢?当增加每次buffer处理的容量时,性能提升明显:如把文件每次map的大小和每个线程处理的buffer空间扩大10倍时,在50个线程下,耗时降到82439;但对于bio,调整一次处理的行数,性能变化很小,程序运行时间反而略有增长(726**,原文数字不全)。那么,怎样才能获得最好的性能呢?