设计思路:
new 一个线程池
打开文件:nio 或 reader。nio 方式:打开文件后 map 一大块 MappedByteBuffer,从 buffer 中读出一定大小的数据,定位到最后一个 '\n',将最后一个 '\n' 及其之前的数据 put 到一个线程执行类实例的 buffer 中,余下的 put 到一个临时 buffer 里,下一次循环时再处理这部分内容。在线程的执行体中,先 rewind ByteBuffer,再循环处理 buffer,每读到一个完整的 import 语句就 put 到 map 里,buffer 处理完成后把 map 合并到全局 ConcurrentMap 中。bio 方式则是读一定的行数后 submit 线程到线程池,之后用正则表达式处理每一行生成 map,处理完成后合并 map。
上代码:
=========================nio=================================
package com.arvey.files;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Counts occurrences of {@code import} statements in a big source file using
 * NIO: the file is memory-mapped in {@link #MAPPED_SIZE} windows, each window
 * is cut into line-aligned {@link #HANDLE_SIZE} slices, and every slice is
 * handed to a pooled {@link HandleBuffer} worker that scans it byte-by-byte
 * and merges its per-slice counts into the global {@link #result} map.
 */
public class StateCounterNIO {

    /** Bytes mapped from the file per {@code FileChannel.map} call. */
    public static final int MAPPED_SIZE = 5 * 4 * 1024;
    /** Bytes handed to one worker task (also the worker buffer capacity). */
    public static final int HANDLE_SIZE = 4 * 1024;
    /** Fixed thread-pool size. */
    public static final int EXECUTORS_NUM = 200;

    String file = "/users/arvey/wwork/docs.code.clean/docsapp/bigfile.src";
    //String file = "/users/arvey/wwork/docs.full/source.code.201512.clean/bigfile.src";
    //String file = "/users/arvey/wwork/gsafety.code.donotdeleted/cloud-core/bigfile.src";
    //String file = "/users/arvey/wwork/docs.full/source.code.201512.clean/cleanertype.java";

    /** Global import-statement -> count map, merged into by worker threads. */
    public static ConcurrentHashMap<String, Integer> result = new ConcurrentHashMap<String, Integer>();

    private final ExecutorService pool = Executors.newFixedThreadPool(EXECUTORS_NUM);

    /**
     * Merges a worker's partial counts into {@link #result}.
     * Synchronized so concurrent read-modify-write merges do not lose updates.
     *
     * @param partial per-slice import counts produced by one worker
     */
    public static synchronized void updateTopList(Map<String, Integer> partial) {
        for (Map.Entry<String, Integer> e : partial.entrySet()) {
            result.merge(e.getKey(), e.getValue(), Integer::sum);
        }
    }

    /**
     * Streams the file through the thread pool and blocks until every
     * submitted worker has finished, leaving totals in {@link #result}.
     */
    public void getTop10() {
        File aFile = new File(file);
        long fileLength = aFile.length();
        long fOffset = 0L;
        // Holds the tail bytes after the last '\n' of a slice; they are
        // prepended to the next worker so no line is split across workers.
        ByteBuffer tmpByteBuffer = ByteBuffer.allocate(HANDLE_SIZE);
        byte[] tmpByteArray = new byte[HANDLE_SIZE];

        try (FileInputStream fis = new FileInputStream(aFile);
             FileChannel fc = fis.getChannel()) {
            while (fOffset < fileLength) {
                long bufferSize = Math.min(fileLength - fOffset, MAPPED_SIZE);
                MappedByteBuffer buffer = fc.map(FileChannel.MapMode.READ_ONLY, fOffset, bufferSize);
                while (buffer.position() < bufferSize) {
                    HandleBuffer aHandle = new HandleBuffer();
                    // Carry over the partial last line of the previous slice.
                    if (tmpByteBuffer.position() > 0) {
                        byte[] tmpBa = new byte[tmpByteBuffer.position()];
                        tmpByteBuffer.rewind();
                        tmpByteBuffer.get(tmpBa);
                        aHandle.getMBuffer().put(tmpBa);
                        tmpByteBuffer.clear();
                    }
                    // Read at most HANDLE_SIZE bytes, bounded by what is left
                    // in the mapping and by the free space in the worker buffer.
                    int tmpBaCap = Math.min(
                            Math.min(HANDLE_SIZE, (int) (bufferSize - buffer.position())),
                            HANDLE_SIZE - aHandle.getMBuffer().position());
                    buffer.get(tmpByteArray, 0, tmpBaCap);

                    if (buffer.position() == bufferSize && (fOffset + bufferSize == fileLength)) {
                        // End of file: nothing follows, give the worker everything.
                        aHandle.getMBuffer().put(tmpByteArray, 0, tmpBaCap);
                    } else {
                        // Cut at the last '\n' so the worker always sees whole lines.
                        for (int i = tmpBaCap - 1; i >= 0; i--) {
                            if (tmpByteArray[i] == '\n') {
                                // FIX: include the '\n' itself (i + 1 bytes); the
                                // original excluded it, so the worker — which only
                                // commits a count on '\n' — silently dropped the
                                // last import statement of every slice.
                                aHandle.getMBuffer().put(tmpByteArray, 0, i + 1);
                                if (i != tmpBaCap - 1) {
                                    tmpByteBuffer.put(tmpByteArray, i + 1, tmpBaCap - i - 1);
                                }
                                break;
                            }
                            if (i == 0) {
                                // No '\n' in the whole slice (pathologically long
                                // line): defer every byte to the next round.
                                tmpByteBuffer.put(tmpByteArray, 0, tmpBaCap);
                            }
                        }
                    }
                    pool.submit(aHandle);
                }
                fOffset += buffer.position();
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        pool.shutdown();
        try {
            // Block until every submitted worker has merged its counts.
            while (!pool.awaitTermination(2, TimeUnit.SECONDS)) {
                // keep waiting
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Worker task: scans its byte buffer for lines starting with "import ",
     * counts each full {@code import ...\n} line, then merges the counts
     * into the global map via {@link #updateTopList(Map)}.
     */
    static class HandleBuffer implements Runnable {

        private final ByteBuffer mBuffer = ByteBuffer.allocate(HANDLE_SIZE);

        public ByteBuffer getMBuffer() {
            return mBuffer;
        }

        @Override
        public void run() {
            Map<String, Integer> aMap = new HashMap<String, Integer>();
            byte[] bImport = "import ".getBytes();
            int bImportIndex = 0;   // how many bytes of "import " matched so far
            int markedPos = 0;      // buffer position where the current import line starts
            boolean isImportLine = false;
            int availableSize = mBuffer.position();
            mBuffer.rewind();
            while (mBuffer.position() < availableSize) {
                byte aByte = mBuffer.get();
                if (!isImportLine && bImportIndex < bImport.length && aByte == bImport[bImportIndex]) {
                    bImportIndex++;
                    if (bImportIndex == bImport.length) {
                        isImportLine = true;
                        markedPos = mBuffer.position() - bImport.length;
                    }
                } else if (isImportLine && aByte == '\n') {
                    // End of an import line: copy markedPos..position (incl. '\n').
                    byte[] tmp = new byte[mBuffer.position() - markedPos];
                    mBuffer.position(markedPos);
                    mBuffer.get(tmp);
                    String aImport = new String(tmp).trim();
                    aMap.merge(aImport, 1, Integer::sum);
                    isImportLine = false;
                    bImportIndex = 0;
                } else if (!isImportLine && bImportIndex != 0) {
                    // Partial "import " prefix broken off — reset the matcher.
                    // NOTE(review): this naive matcher also fires on "import "
                    // appearing mid-line, mirroring the original behaviour.
                    bImportIndex = 0;
                }
            }
            updateTopList(aMap);
        }
    }

    public static void main(String[] args) {
        long startAt = System.currentTimeMillis();
        StateCounterNIO aNio = new StateCounterNIO();
        aNio.getTop10();
        List<Map.Entry<String, Integer>> sList = new ArrayList<Map.Entry<String, Integer>>(result.entrySet());
        // FIX: the original comparator never returned 0 and was asymmetric,
        // violating the compare contract; sort by count, descending.
        sList.sort(Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder()));
        int index = 0;
        for (Map.Entry<String, Integer> aEntry : sList) {
            System.out.println(aEntry.getKey() + "--" + aEntry.getValue());
            if (index++ >= 100) {
                break;  // top ~100 only
            }
        }
        System.out.println("the cost is " + (System.currentTimeMillis() - startAt));
    }
}
==================================bio======================================
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * BIO variant of the import counter: reads the file line-by-line with a
 * BufferedReader, batches 1024 lines into a {@link HandleRun} task, and lets
 * pooled workers extract {@code import ...;} statements with a regex and
 * merge their counts into the global {@link #result} map.
 */
public class StateCountBIO {

    /** Fixed thread-pool size. */
    public static final int EXECUTORS_NUM = 200;
    /** Compiled once; Pattern is thread-safe and shared by all workers. */
    public static final Pattern PATTERN = Pattern.compile("^(import.+);");

    /** Global import-statement -> count map, merged into by worker threads. */
    public static ConcurrentHashMap<String, Integer> result = new ConcurrentHashMap<String, Integer>();

    String file = "/users/arvey/wwork/docs.code.clean/docsapp/bigfile.src";

    private final ExecutorService pool = Executors.newFixedThreadPool(EXECUTORS_NUM);
    private BufferedReader fReader;
    private int poolCounter = 0;  // number of worker tasks submitted

    public int getPoolCounter() {
        return poolCounter;
    }

    /**
     * Merges a worker's partial counts into {@link #result}.
     * Synchronized so concurrent read-modify-write merges do not lose updates.
     *
     * @param partial per-batch import counts produced by one worker
     */
    public static synchronized void updateTopList(Map<String, Integer> partial) {
        for (Map.Entry<String, Integer> e : partial.entrySet()) {
            result.merge(e.getKey(), e.getValue(), Integer::sum);
        }
    }

    /**
     * Reads the file in 1024-line batches, submitting each batch to the pool,
     * and blocks until all workers finish, leaving totals in {@link #result}.
     */
    public void getTop10() {
        File bigFile = new File(file);
        FileInputStream fio = null;
        try {
            fio = new FileInputStream(bigFile);
            fReader = new BufferedReader(new InputStreamReader(fio));
            boolean notReachedEnd = true;
            while (notReachedEnd) {
                int index = 0;
                HandleRun anInst = new HandleRun();
                StringBuilder aWriter = anInst.getBuffer();
                // Collect up to 1024 lines for one worker task.
                while (index < 1024) {
                    String content = fReader.readLine();
                    if (content == null) {
                        notReachedEnd = false;
                        break;
                    }
                    aWriter.append(content).append('\n');
                    index++;
                }
                poolCounter++;
                pool.submit(anInst);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (fio != null) {
                try {
                    fio.close();
                } catch (IOException ignored) {
                    // best-effort close on shutdown path
                }
            }
        }

        pool.shutdown();
        try {
            // Block until every submitted worker has merged its counts.
            while (!pool.awaitTermination(2, TimeUnit.SECONDS)) {
                // keep waiting
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Worker task: splits its batch into lines, matches each against
     * {@link #PATTERN}, and merges the resulting counts into the global map.
     */
    static class HandleRun implements Runnable {

        private final StringBuilder buffer = new StringBuilder();

        public StringBuilder getBuffer() {
            return buffer;
        }

        @Override
        public void run() {
            String[] allLines = buffer.toString().split("\n");
            Map<String, Integer> res = new HashMap<String, Integer>();
            for (String aLine : allLines) {
                Matcher m = PATTERN.matcher(aLine);
                if (m.find()) {
                    // group(0) is the full "import ...;" match.
                    res.merge(m.group(0), 1, Integer::sum);
                }
            }
            updateTopList(res);
        }
    }

    /**
     * Orders keys by their count in {@link #result}, descending.
     * NOTE(review): kept from the original; it is inconsistent with equals
     * (never returns 0) and is unused by {@link #main}.
     */
    static class ValueComparator implements Comparator<String> {
        Map<String, Integer> map = result;

        @Override
        public int compare(String o1, String o2) {
            return map.get(o1) >= map.get(o2) ? -1 : 1;
        }
    }

    public static void main(String[] args) {
        long startAt = System.currentTimeMillis();
        StateCountBIO aCount = new StateCountBIO();
        aCount.getTop10();
        List<Map.Entry<String, Integer>> sList = new ArrayList<Map.Entry<String, Integer>>(result.entrySet());
        // Sort by count, descending, with a contract-respecting comparator.
        sList.sort(Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder()));
        int index = 0;
        for (Map.Entry<String, Integer> aEntry : sList) {
            System.out.println(aEntry.getKey() + "--" + aEntry.getValue());
            if (index++ >= 100) {
                break;  // top ~100 only
            }
        }
        System.out.println("the thread counter is " + aCount.getPoolCounter());
        System.out.println("the cost is " + (System.currentTimeMillis() - startAt));
    }
}
=====================================================
效率分析
处理大文件 文件size达到8967006720时 线程池 200 100 50 对比如下(五次执行平均结果):
nio bio
200 139843 67376
100 136914 66576
50 140000 67249
为何nio的要慢于bio的呢?
nio 在处理线程中要逐字节遍历 buffer,是不是这个原因造成的呢?当增加每次 buffer 处理的容量时,性能提升明显:如把文件每次 map 的大小和每个线程处理的 buffer 空间扩大 10 倍后,在 50 个线程时,数据降到 82439;但对于 bio,调整一次处理的行数对性能变化很小,程序运行时间反而略有增长(726**)。那么,怎样才能获得最好的性能呢?