昨晚睡覺前把多線程建立索引demo寫好了,今天早上7點多就起來,趁着勁頭趕緊記錄分享一下,這樣對那些同樣對lucene感興趣的童鞋也有所幫助。
我們都知道lucene的indexwriter在構造初始化的時候會去擷取索引目錄的寫鎖writerlock,加鎖的目的就是保證同時隻能有一個indexwriter執行個體在往索引目錄中寫資料,具體看截圖:

而在多線程環境下,光保證隻有indexwriter執行個體能得到鎖還不行,還必須保證每次隻能有一個線程能擷取到writerlock,lucene内部是如何實作的呢?請看源碼:
indexwriter添加索引文檔是通過adddocument方法實作的,下面是adddocument方法的截圖:
我們發現内部實際調用的是updatedocument方法,繼續跟進updatedocument方法,
updatedocument中ensureopen();首先確定索引目錄已經打開,然後通過docwriter.updatedocument(doc, analyzer, term)真正去更新索引,更新成功後觸發索引merge事件processevents(true, false);docwriter是documentswriter類型,真正執行索引寫操作的類是documentswriter,indexwriter隻是内部維護了一個documentswriter屬性調用它的方法而已,繼續跟進documentswriter類的updatedocument方法,如圖:
final threadstate perthread = flushcontrol.obtainandlock();會視圖去擷取lock,因為索引寫操作不能同時并發執行,沒錯這裡的threadstate就是nio裡的reentrantlock,它跟synchronized作用類似,但它比synchronized控制粒度更小更靈活,能手動在方法内部的任意位置打開和解除鎖,兩者性能且不談,因為随着jvm對代碼的不斷優化,兩者性能上的差異會越來越小。扯遠了,接着前面的繼續說,flushcontrol.obtainandlock()在擷取鎖的時候内部實際是通過perthreadpool.getandlock來擷取鎖的,perthreadpool并不是什麼線程池,準确來說它是一個鎖池,池子裡維護了n把鎖,每個鎖與一個線程id,跟着我繼續看源碼,你就明白了。
perthreadpool是如何擷取lock的呢?繼續看它的getandlock方法:
getandlock需要傳入一個線程,而flushcontrol.obtainandlock()在擷取鎖的時候内部是這樣實作的:
到此,你應該明白了,lucene内部隻是維護了多把鎖而已,并沒有真的去new thread,thread是通過把目前調用線程當作參數傳入的,然後配置設定鎖的時候,每個線程隻配置設定一把鎖,而每把鎖在寫索引的時候都會使用reentrantlock.lock來限制并發寫操作,其實每次對于同一個索引目錄仍然隻能有一個indexwriter在寫索引,那lucene内部維護多把鎖有什麼意義呢?一個索引目錄隻能有一把鎖,那如果有多個索引目錄,每個索引目錄發一把鎖,n個索引目錄同時進行索引寫操作就有意義了。把索引資料全部放一個索引目錄本身就不現實,再說一個檔案夾下能存放的檔案最大數量也不是無窮大的,當一個檔案夾下的檔案數量達到某個數量級會你讀寫性能都會急劇下降的,是以把索引檔案分攤到多個索引目錄是明智之舉,是以,當你需要索引的資料量很龐大的時候,要想提高索引建立的速度,除了要充分利用ramdirectory減少與磁盤io次數外,可以嘗試把索引資料分多索引目錄存儲,個人建議,如果說的不對,請盡情的噴我。下面我貼一個我昨晚寫的多線程建立索引的demo,抛個磚引個玉哈!看代碼:
package com.yida.framework.lucene5.index;
import java.io.bufferedreader;
import java.io.ioexception;
import java.io.inputstream;
import java.io.inputstreamreader;
import java.nio.charset.standardcharsets;
import java.nio.file.filevisitresult;
import java.nio.file.files;
import java.nio.file.linkoption;
import java.nio.file.openoption;
import java.nio.file.path;
import java.nio.file.paths;
import java.nio.file.simplefilevisitor;
import java.nio.file.attribute.basicfileattributes;
import java.util.concurrent.countdownlatch;
import org.apache.lucene.analysis.analyzer;
import org.apache.lucene.document.document;
import org.apache.lucene.document.field;
import org.apache.lucene.document.longfield;
import org.apache.lucene.document.stringfield;
import org.apache.lucene.document.textfield;
import org.apache.lucene.index.indexwriter;
import org.apache.lucene.index.indexwriterconfig;
import org.apache.lucene.index.term;
import org.apache.lucene.index.indexwriterconfig.openmode;
import org.apache.lucene.store.fsdirectory;
import com.yida.framework.lucene5.util.luceneutils;
/**
* 索引建立線程
* @author lanxiaowei
*
*/
public class indexcreator implements runnable {
/**需要讀取的檔案存放目錄*/
private string docpath;
/**索引檔案存放目錄*/
private string lucenedir;
private int threadcount;
private final countdownlatch countdownlatch1;
private final countdownlatch countdownlatch2;
public indexcreator(string docpath, string lucenedir,int threadcount,countdownlatch countdownlatch1,countdownlatch countdownlatch2) {
super();
this.docpath = docpath;
this.lucenedir = lucenedir;
this.threadcount = threadcount;
this.countdownlatch1 = countdownlatch1;
this.countdownlatch2 = countdownlatch2;
}
public void run() {
indexwriter writer = null;
try {
countdownlatch1.await();
analyzer analyzer = luceneutils.analyzer;
fsdirectory directory = luceneutils.openfsdirectory(lucenedir);
indexwriterconfig config = new indexwriterconfig(analyzer);
config.setopenmode(openmode.create_or_append);
writer = luceneutils.getindexwriter(directory, config);
try {
indexdocs(writer, paths.get(docpath));
} catch (ioexception e) {
e.printstacktrace();
}
} catch (interruptedexception e1) {
e1.printstacktrace();
} finally {
luceneutils.closeindexwriter(writer);
countdownlatch2.countdown();
}
/**
*
* @param writer
* 索引寫入器
* @param path
* 檔案路徑
* @throws ioexception
*/
public static void indexdocs(final indexwriter writer, path path)
throws ioexception {
// 如果是目錄,查找目錄下的檔案
if (files.isdirectory(path, new linkoption[0])) {
system.out.println("directory");
files.walkfiletree(path, new simplefilevisitor() {
@override
public filevisitresult visitfile(object file,
basicfileattributes attrs) throws ioexception {
path path = (path)file;
system.out.println(path.getfilename());
indexdoc(writer, path, attrs.lastmodifiedtime().tomillis());
return filevisitresult.continue;
}
});
} else {
indexdoc(writer, path,
files.getlastmodifiedtime(path, new linkoption[0])
.tomillis());
* 讀取檔案建立索引
* @param file
* @param lastmodified
* 檔案最後一次修改時間
public static void indexdoc(indexwriter writer, path file, long lastmodified)
inputstream stream = files.newinputstream(file, new openoption[0]);
document doc = new document();
field pathfield = new stringfield("path", file.tostring(),
field.store.yes);
doc.add(pathfield);
doc.add(new longfield("modified", lastmodified, field.store.yes));
doc.add(new textfield("contents",intputstream2string(stream),field.store.yes));
//doc.add(new textfield("contents", new bufferedreader(new inputstreamreader(stream, standardcharsets.utf_8))));
if (writer.getconfig().getopenmode() == indexwriterconfig.openmode.create) {
system.out.println("adding " + file);
writer.adddocument(doc);
system.out.println("updating " + file);
writer.updatedocument(new term("path", file.tostring()), doc);
writer.commit();
* inputstream轉換成string
* @param is 輸入流對象
* @return
private static string intputstream2string(inputstream is) {
bufferedreader bufferreader = null;
stringbuilder stringbuilder = new stringbuilder();
string line;
bufferreader = new bufferedreader(new inputstreamreader(is, standardcharsets.utf_8));
while ((line = bufferreader.readline()) != null) {
stringbuilder.append(line + "\r\n");
} catch (ioexception e) {
e.printstacktrace();
if (bufferreader != null) {
try {
bufferreader.close();
} catch (ioexception e) {
e.printstacktrace();
return stringbuilder.tostring();
}
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
* 多線程建立索引
public class multithreadindextest {
* 建立了5個線程同時建立索引
* @param args
* @throws interruptedexception
public static void main(string[] args) throws interruptedexception {
int threadcount = 5;
executorservice pool = executors.newfixedthreadpool(threadcount);
countdownlatch countdownlatch1 = new countdownlatch(1);
countdownlatch countdownlatch2 = new countdownlatch(threadcount);
for(int i = 0; i < threadcount; i++) {
runnable runnable = new indexcreator("c:/doc" + (i+1), "c:/lucenedir" + (i+1),threadcount,
countdownlatch1,countdownlatch2);
//子線程交給線程池管理
pool.execute(runnable);
countdownlatch1.countdown();
system.out.println("開始建立索引");
//等待所有線程都完成
countdownlatch2.await();
//線程全部完成工作
system.out.println("所有線程都建立索引完畢");
//釋放線程池資源
pool.shutdown();
package com.yida.framework.lucene5.util;
import java.util.concurrent.locks.lock;
import java.util.concurrent.locks.reentrantlock;
import org.apache.lucene.index.directoryreader;
import org.apache.lucene.index.indexreader;
import org.apache.lucene.search.indexsearcher;
import org.apache.lucene.store.directory;
import org.apache.lucene.store.lockobtainfailedexception;
* lucene索引讀寫器/查詢器單例擷取工具類
public class lucenemanager {
private volatile static lucenemanager singleton;
private volatile static indexwriter writer;
private volatile static indexreader reader;
private volatile static indexsearcher searcher;
private final lock writerlock = new reentrantlock();
//private final lock readerlock = new reentrantlock();
//private final lock searcherlock = new reentrantlock();
private static threadlocal<indexwriter> writerlocal = new threadlocal<indexwriter>();
private lucenemanager() {}
public static lucenemanager getinstance() {
if (null == singleton) {
synchronized (lucenemanager.class) {
if (null == singleton) {
singleton = new lucenemanager();
return singleton;
* 擷取indexwriter單例對象
* @param dir
* @param config
public indexwriter getindexwriter(directory dir, indexwriterconfig config) {
if(null == dir) {
throw new illegalargumentexception("directory can not be null.");
if(null == config) {
throw new illegalargumentexception("indexwriterconfig can not be null.");
writerlock.lock();
writer = writerlocal.get();
if(null != writer) {
return writer;
if(null == writer){
//如果索引目錄被鎖,則直接抛異常
if(indexwriter.islocked(dir)) {
throw new lockobtainfailedexception("directory of index had been locked.");
writer = new indexwriter(dir, config);
writerlocal.set(writer);
} catch (lockobtainfailedexception e) {
writerlock.unlock();
return writer;
* 擷取indexwriter[可能為null]
public indexwriter getindexwriter() {
* 擷取indexreader對象
* @param enablenrtreader 是否開啟nrtreader
public indexreader getindexreader(directory dir,boolean enablenrtreader) {
if(null == reader){
reader = directoryreader.open(dir);
} else {
if(enablenrtreader && reader instanceof directoryreader) {
//開啟近實時reader,能立即看到動态添加/删除的索引變化
reader = directoryreader.openifchanged((directoryreader)reader);
return reader;
* 擷取indexreader對象(預設不啟用netreader)
public indexreader getindexreader(directory dir) {
return getindexreader(dir, false);
* 擷取indexsearcher對象
* @param reader indexreader對象執行個體
* @param executor 如果你需要開啟多線程查詢,請提供executorservice對象參數
public indexsearcher getindexsearcher(indexreader reader,executorservice executor) {
if(null == reader) {
throw new illegalargumentexception("the indexreader can not be null.");
if(null == searcher){
searcher = new indexsearcher(reader);
return searcher;
* 擷取indexsearcher對象(不支援多線程查詢)
public indexsearcher getindexsearcher(indexreader reader) {
return getindexsearcher(reader, null);
* 關閉indexwriter
public void closeindexwriter(indexwriter writer) {
if(null != writer) {
writer.close();
writer = null;
writerlocal.remove();
import java.util.arraylist;
import java.util.collections;
import java.util.list;
import java.util.set;
import org.ansj.lucene5.ansjanalyzer;
import org.apache.lucene.index.indexablefield;
import org.apache.lucene.queryparser.classic.queryparser;
import org.apache.lucene.search.query;
import org.apache.lucene.search.scoredoc;
import org.apache.lucene.search.topdocs;
import org.apache.lucene.search.highlight.formatter;
import org.apache.lucene.search.highlight.fragmenter;
import org.apache.lucene.search.highlight.highlighter;
import org.apache.lucene.search.highlight.invalidtokenoffsetsexception;
import org.apache.lucene.search.highlight.queryscorer;
import org.apache.lucene.search.highlight.scorer;
import org.apache.lucene.search.highlight.simplefragmenter;
import org.apache.lucene.search.highlight.simplehtmlformatter;
* lucene工具類(基于lucene5.0封裝)
public class luceneutils {
private static final lucenemanager lucenemanager = lucenemanager.getinstance();
public static analyzer analyzer = new ansjanalyzer();
* 打開索引目錄
* @param lucenedir
public static fsdirectory openfsdirectory(string lucenedir) {
fsdirectory directory = null;
directory = fsdirectory.open(paths.get(lucenedir));
/**
* 注意:islocked方法内部會試圖去擷取lock,如果擷取到lock,會關閉它,否則return false表示索引目錄沒有被鎖,
* 這也就是為什麼unlock方法被從indexwriter類中移除的原因
*/
indexwriter.islocked(directory);
return directory;
* 關閉索引目錄并銷毀
* @param directory
public static void closedirectory(directory directory) throws ioexception {
if (null != directory) {
directory.close();
directory = null;
* 擷取indexwriter
public static indexwriter getindexwriter(directory dir, indexwriterconfig config) {
return lucenemanager.getindexwriter(dir, config);
public static indexwriter getindexwrtier(string directorypath, indexwriterconfig config) {
fsdirectory directory = openfsdirectory(directorypath);
return lucenemanager.getindexwriter(directory, config);
* 擷取indexreader
public static indexreader getindexreader(directory dir,boolean enablenrtreader) {
return lucenemanager.getindexreader(dir, enablenrtreader);
* 擷取indexreader(預設不啟用nrtreader)
public static indexreader getindexreader(directory dir) {
return lucenemanager.getindexreader(dir);
* 擷取indexsearcher
* @param reader indexreader對象
public static indexsearcher getindexsearcher(indexreader reader,executorservice executor) {
return lucenemanager.getindexsearcher(reader, executor);
* 擷取indexsearcher(不支援多線程查詢)
public static indexsearcher getindexsearcher(indexreader reader) {
return lucenemanager.getindexsearcher(reader);
* 建立queryparser對象
* @param field
* @param analyzer
public static queryparser createqueryparser(string field, analyzer analyzer) {
return new queryparser(field, analyzer);
* 關閉indexreader
* @param reader
public static void closeindexreader(indexreader reader) {
if (null != reader) {
reader.close();
reader = null;
public static void closeindexwriter(indexwriter writer) {
lucenemanager.closeindexwriter(writer);
* 關閉indexreader和indexwriter
public static void closeall(indexreader reader, indexwriter writer) {
closeindexreader(reader);
closeindexwriter(writer);
* 删除索引[注意:請自己關閉indexwriter對象]
* @param value
public static void deleteindex(indexwriter writer, string field, string value) {
writer.deletedocuments(new term[] {new term(field,value)});
* @param query
public static void deleteindex(indexwriter writer, query query) {
writer.deletedocuments(query);
* 批量删除索引[注意:請自己關閉indexwriter對象]
* @param terms
public static void deleteindexs(indexwriter writer,term[] terms) {
writer.deletedocuments(terms);
* @param querys
public static void deleteindexs(indexwriter writer,query[] querys) {
writer.deletedocuments(querys);
* 删除所有索引文檔
public static void deleteallindex(indexwriter writer) {
writer.deleteall();
* 更新索引文檔
* @param term
* @param document
public static void updateindex(indexwriter writer,term term,document document) {
writer.updatedocument(term, document);
public static void updateindex(indexwriter writer,string field,string value,document document) {
updateindex(writer, new term(field, value), document);
* 添加索引文檔
* @param doc
public static void addindex(indexwriter writer, document document) {
updateindex(writer, null, document);
* 索引文檔查詢
* @param searcher
public static list<document> query(indexsearcher searcher,query query) {
topdocs topdocs = null;
topdocs = searcher.search(query, integer.max_value);
scoredoc[] scores = topdocs.scoredocs;
int length = scores.length;
if (length <= 0) {
return collections.emptylist();
list<document> doclist = new arraylist<document>();
for (int i = 0; i < length; i++) {
document doc = searcher.doc(scores[i].doc);
doclist.add(doc);
return doclist;
* 傳回索引文檔的總數[注意:請自己手動關閉indexreader]
public static int getindextotalcount(indexreader reader) {
return reader.numdocs();
* 傳回索引文檔中最大文檔id[注意:請自己手動關閉indexreader]
public static int getmaxdocid(indexreader reader) {
return reader.maxdoc();
* 傳回已經删除尚未送出的文檔總數[注意:請自己手動關閉indexreader]
public static int getdeleteddocnum(indexreader reader) {
return getmaxdocid(reader) - getindextotalcount(reader);
* 根據docid查詢索引文檔
* @param reader indexreader對象
* @param docid documentid
* @param fieldstoload 需要傳回的field
public static document finddocumentbydocid(indexreader reader,int docid, set<string> fieldstoload) {
return reader.document(docid, fieldstoload);
return null;
public static document finddocumentbydocid(indexreader reader,int docid) {
return finddocumentbydocid(reader, docid, null);
* @title: createhighlighter
* @description: 建立高亮器
* @param query 索引查詢對象
* @param prefix 高亮字首字元串
* @param stuffix 高亮字尾字元串
* @param fragmenterlength 摘要最大長度
public static highlighter createhighlighter(query query, string prefix, string stuffix, int fragmenterlength) {
formatter formatter = new simplehtmlformatter((prefix == null || prefix.trim().length() == 0) ?
"<font color=\"red\">" : prefix, (stuffix == null || stuffix.trim().length() == 0)?"</font>" : stuffix);
scorer fragmentscorer = new queryscorer(query);
highlighter highlighter = new highlighter(formatter, fragmentscorer);
fragmenter fragmenter = new simplefragmenter(fragmenterlength <= 0 ? 50 : fragmenterlength);
highlighter.settextfragmenter(fragmenter);
return highlighter;
* @title: highlight
* @description: 生成高亮文本
* @param document 索引文檔對象
* @param highlighter 高亮器
* @param analyzer 索引分詞器
* @param field 高亮字段
* @throws invalidtokenoffsetsexception
public static string highlight(document document,highlighter highlighter,analyzer analyzer,string field) throws ioexception {
list<indexablefield> list = document.getfields();
for (indexablefield fieldable : list) {
string fieldvalue = fieldable.stringvalue();
if(fieldable.name().equals(field)) {
fieldvalue = highlighter.getbestfragment(analyzer, field, fieldvalue);
} catch (invalidtokenoffsetsexception e) {
fieldvalue = fieldable.stringvalue();
return (fieldvalue == null || fieldvalue.trim().length() == 0)? fieldable.stringvalue() : fieldvalue;
return null;
* @title: searchtotalrecord
* @description: 擷取符合條件的總記錄數
public static int searchtotalrecord(indexsearcher search,query query) {
scoredoc[] docs = null;
topdocs topdocs = search.search(query, integer.max_value);
if(topdocs == null || topdocs.scoredocs == null || topdocs.scoredocs.length == 0) {
return 0;
docs = topdocs.scoredocs;
return docs.length;
* @title: pagequery
* @description: lucene分頁查詢
* @param page
public static void pagequery(indexsearcher searcher,directory directory,query query,page<document> page) {
int totalrecord = searchtotalrecord(searcher,query);
//設定總記錄數
page.settotalrecord(totalrecord);
topdocs = searcher.searchafter(page.getafterdoc(),query, page.getpagesize());
scoredoc[] docs = topdocs.scoredocs;
int index = 0;
for (scoredoc scoredoc : docs) {
int docid = scoredoc.doc;
document document = null;
document = searcher.doc(docid);
if(index == docs.length - 1) {
page.setafterdoc(scoredoc);
page.setafterdocid(docid);
doclist.add(document);
index++;
page.setitems(doclist);
closeindexreader(searcher.getindexreader());
* @description: 分頁查詢[如果設定了高亮,則會更新索引文檔]
* @param highlighterparam
* @param writerconfig
public static void pagequery(indexsearcher searcher,directory directory,query query,page<document> page,highlighterparam highlighterparam,indexwriterconfig writerconfig) throws ioexception {
//若未設定高亮
if(null == highlighterparam || !highlighterparam.ishighlight()) {
pagequery(searcher,directory,query, page);
int totalrecord = searchtotalrecord(searcher,query);
system.out.println("totalrecord:" + totalrecord);
//設定總記錄數
page.settotalrecord(totalrecord);
topdocs topdocs = searcher.searchafter(page.getafterdoc(),query, page.getpagesize());
list<document> doclist = new arraylist<document>();
scoredoc[] docs = topdocs.scoredocs;
int index = 0;
writer = getindexwriter(directory, writerconfig);
for (scoredoc scoredoc : docs) {
int docid = scoredoc.doc;
document document = searcher.doc(docid);
string content = document.get(highlighterparam.getfieldname());
if(null != content && content.trim().length() > 0) {
//建立高亮器
highlighter highlighter = luceneutils.createhighlighter(query,
highlighterparam.getprefix(), highlighterparam.getstuffix(),
highlighterparam.getfragmenterlength());
string text = highlight(document, highlighter, analyzer, highlighterparam.getfieldname());
//若高亮後跟原始文本不相同,表示高亮成功
if(!text.equals(content)) {
document tempdocument = new document();
list<indexablefield> indexablefieldlist = document.getfields();
if(null != indexablefieldlist && indexablefieldlist.size() > 0) {
for(indexablefield field : indexablefieldlist) {
if(field.name().equals(highlighterparam.getfieldname())) {
tempdocument.add(new textfield(field.name(), text, field.store.yes));
} else {
tempdocument.add(field);
}
}
}
updateindex(writer, new term(highlighterparam.getfieldname(),content), tempdocument);
document = tempdocument;
}
if(index == docs.length - 1) {
page.setafterdoc(scoredoc);
page.setafterdocid(docid);
doclist.add(document);
index++;
page.setitems(doclist);
demo源碼我會在最底下的附件裡上傳,有需要的請自己下載下傳。demo代碼運作時請先在c盤建5個檔案夾放需要讀取的檔案,建5個檔案夾分别存儲索引檔案,如圖:
ok,為了這篇部落格已經耗時整整1個小時了,打完收工!下一篇準備說說如何多索引目錄多線程查詢,敬請期待吧!
如果你還有什麼問題請加我Q-q:7-3-6-0-3-1-3-0-5,
或者加裙
一起交流學習!
轉載:http://iamyida.iteye.com/blog/2196855