帶著疑問學源碼,第三篇:Elasticsearch 更新性能
代碼分析基於:https://github.com/jiankunking/elasticsearch
Elasticsearch 7.10.2+
目的
在看源碼之前,先梳理一下自己對於更新操作的疑惑:
為什麼Elasticsearch更新與寫入的性能會有比較大的差異?
源碼分析
建議先看一下:【Elasticsearch源碼】 寫入分析
在【Elasticsearch源碼】 寫入分析中可以看到,bulk 請求最終在 TransportShardBulkAction 的 doRun() 中執行時,仍然是通過一個循環逐個處理每個請求,並沒有什麼神奇之處。
下面看一下 doRun() 中實際執行每個請求的方法 executeBulkItemRequest:
/**
* Executes a single bulk item request on the primary shard and handles request execution exceptions.
* <p>
* UPDATE items are first translated by {@code updateHelper.prepare(...)} into an index request, a delete request
* or a no-op; all other item types are executed as-is. The resulting request is then applied to the primary shard.
* If that requires a mapping update, the update is requested and the item is completed asynchronously.
*
* @param context              execution context holding the current bulk item and the primary shard
* @param updateHelper         translates update requests into index / delete / no-op operations
* @param nowInMillisSupplier  time supplier handed to the update translation
* @param mappingUpdater       used to request a mapping update when the operation needs new mappings
* @param waitForMappingUpdate invoked to wait until the requested mapping update has been applied
* @param itemDoneListener     notified only on the asynchronous mapping-update path (when this method returns
*                             {@code false})
* @return {@code true} if the request completed on this thread, {@code false} if the request triggered
* a mapping update that will finish and invoke {@code itemDoneListener} on a different thread
*/
static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
MappingUpdatePerformer mappingUpdater, Consumer<ActionListener<Void>> waitForMappingUpdate,
ActionListener<Void> itemDoneListener) throws Exception {
final DocWriteRequest.OpType opType = context.getCurrent().opType();
final UpdateHelper.Result updateResult;
// Only UPDATE items need translating; index/create/delete items fall through to the else-branch unchanged.
if (opType == DocWriteRequest.OpType.UPDATE) {
final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
try {
// Fetch the current document (a realtime get via getForUpdate) and translate the update
// into an index, delete or no-op operation. This extra read is what makes updates costlier than plain writes.
updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier);
} catch (Exception failure) {
// we may fail translating a update to index or delete operation
// we use index result to communicate failure while translating update request
final Engine.Result result =
new Engine.IndexResult(failure, updateRequest.version());
context.setRequestToExecute(updateRequest);
context.markOperationAsExecuted(result);
context.markAsCompleted(context.getExecutionResult());
return true;
}
// execute translated update request
switch (updateResult.getResponseResult()) {
case CREATED:
case UPDATED:
// The update became a full index request; process it with the index's creation version and mapping
// before executing it.
IndexRequest indexRequest = updateResult.action();
IndexMetadata metadata = context.getPrimary().indexSettings().getIndexMetadata();
MappingMetadata mappingMd = metadata.mapping();
indexRequest.process(metadata.getCreationVersion(), mappingMd, updateRequest.concreteIndex());
context.setRequestToExecute(indexRequest);
break;
case DELETED:
context.setRequestToExecute(updateResult.action());
break;
case NOOP:
// Nothing to write: complete the item without touching the engine.
context.markOperationAsNoOp(updateResult.action());
context.markAsCompleted(context.getExecutionResult());
return true;
default:
throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult());
}
} else {
context.setRequestToExecute(context.getCurrent());
updateResult = null;
}
assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state
final IndexShard primary = context.getPrimary();
final long version = context.getRequestToExecute().version();
final boolean isDelete = context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE;
final Engine.Result result;
if (isDelete) {
final DeleteRequest request = context.getRequestToExecute();
result = primary.applyDeleteOperationOnPrimary(version, request.id(), request.versionType(),
request.ifSeqNo(), request.ifPrimaryTerm());
} else {
final IndexRequest request = context.getRequestToExecute();
result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse(
request.index(), request.id(), request.source(), request.getContentType(), request.routing()),
request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
}
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
// The operation needs a mapping change: first validate it locally (preflight merge),
// then ask mappingUpdater to apply it and finish the item asynchronously.
try {
primary.mapperService().merge(MapperService.SINGLE_MAPPING_NAME,
new CompressedXContent(result.getRequiredMappingUpdate(), XContentType.JSON, ToXContent.EMPTY_PARAMS),
MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT);
} catch (Exception e) {
logger.info(() -> new ParameterizedMessage("{} mapping update rejected by primary", primary.shardId()), e);
onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
return true;
}
mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(),
new ActionListener<>() {
@Override
public void onResponse(Void v) {
context.markAsRequiringMappingUpdate();
// Wait for the mapping update to be applied; in either outcome itemDoneListener is notified afterwards.
waitForMappingUpdate.accept(
ActionListener.runAfter(new ActionListener<>() {
@Override
public void onResponse(Void v) {
assert context.requiresWaitingForMappingUpdate();
// Mapping is in place: reset the item so it is executed again.
context.resetForExecutionForRetry();
}
@Override
public void onFailure(Exception e) {
context.failOnMappingUpdate(e);
}
}, () -> itemDoneListener.onResponse(null))
);
}
@Override
public void onFailure(Exception e) {
onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
// Requesting mapping update failed, so we don't have to wait for a cluster state update
assert context.isInitial();
itemDoneListener.onResponse(null);
}
});
// Completion is reported later through itemDoneListener.
return false;
} else {
onComplete(result, context, updateResult);
}
return true;
}
/**
 * Prepares an update request by converting it into an index or delete request or an update response (no action).
 * <p>
 * The current version of the document is read first through {@code getForUpdate}, which always performs a realtime
 * get and passes along the request's seqNo / primary-term preconditions. The translation itself is then delegated
 * to the {@code prepare(ShardId, UpdateRequest, GetResult, LongSupplier)} overload.
 */
public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) {
    final String docId = request.id();
    // Realtime read of the current document; inside the engine this ends up in
    // InternalEngine#get(Get, DocumentMapper, Function<Engine.Searcher, Engine.Searcher>).
    final GetResult current = indexShard.getService().getForUpdate(docId, request.ifSeqNo(), request.ifPrimaryTerm());
    return prepare(indexShard.shardId(), request, current, nowInMillis);
}
/**
 * Fetches a document on behalf of an update operation.
 * <p>
 * The read is always realtime (not configurable by the caller). It requests the field named by
 * {@code RoutingFieldMapper.NAME} plus the full source ({@code FetchSourceContext.FETCH_SOURCE}), uses
 * {@code Versions.MATCH_ANY} / {@code VersionType.INTERNAL} as version parameters, and forwards the optional
 * seqNo / primary-term precondition.
 */
public GetResult getForUpdate(String id, long ifSeqNo, long ifPrimaryTerm) {
    final String[] routingField = {RoutingFieldMapper.NAME};
    // Hard-coded: updates always read the latest state of the document.
    final boolean realtime = true;
    return get(id, routingField, realtime, Versions.MATCH_ANY, VersionType.INTERNAL,
        ifSeqNo, ifPrimaryTerm, FetchSourceContext.FETCH_SOURCE);
}
/**
 * Runs {@code innerGet} while maintaining the get metrics.
 * <p>
 * {@code currentMetric} counts in-flight gets (incremented on entry, decremented in {@code finally}). The elapsed
 * time in nanoseconds is recorded in {@code existsMetric} or {@code missingMetric}, depending on whether the
 * document was found.
 */
private GetResult get(String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                      long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
    currentMetric.inc();
    try {
        final long startNanos = System.nanoTime();
        final GetResult result =
            innerGet(id, gFields, realtime, version, versionType, ifSeqNo, ifPrimaryTerm, fetchSourceContext);
        final boolean found = result.isExists();
        final long tookNanos = System.nanoTime() - startNanos;
        if (found) {
            existsMetric.inc(tookNanos);
        } else {
            missingMetric.inc(tookNanos);
        }
        return result;
    } finally {
        currentMetric.dec();
    }
}
/**
 * Loads a document from the shard and converts the engine result into a {@link GetResult}.
 * <p>
 * The engine result is always released: immediately when the document does not exist, otherwise after the
 * stored fields / source have been loaded.
 *
 * @param id                 document id
 * @param gFields            stored fields to load
 * @param realtime           whether to perform a realtime get; the same flag also allows reading from the translog
 * @param version            expected document version
 * @param versionType        how {@code version} is compared
 * @param ifSeqNo            optional sequence-number precondition
 * @param ifPrimaryTerm      optional primary-term precondition
 * @param fetchSourceContext which parts of {@code _source} to fetch (normalized against {@code gFields})
 * @return the loaded document, or a "not found" result with unassigned seqNo/primary term
 */
private GetResult innerGet(String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                           long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
    fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
    // `realtime` is passed twice: presumably (realtime, readFromTranslog) - the assertion below relies on
    // translog reads only happening for realtime gets.
    final Engine.GetResult get = indexShard.get(new Engine.Get(realtime, realtime, id)
        .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
    assert get.isFromTranslog() == false || realtime : "should only read from translog if realtime enabled";
    // The former `get == null` check was dead code: `get` has already been dereferenced at this point.
    if (get.exists() == false) {
        // Nothing to load: release the engine result right away and report "not found".
        get.close();
        return new GetResult(shardId.getIndexName(), id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
    }
    try {
        // break between having loaded it from translog (so we only have _source), and having a document to load
        return innerGetLoadFromStoredFields(id, gFields, fetchSourceContext, get, mapperService);
    } finally {
        get.close();
    }
}
/**
 * Performs an engine-level get on this shard.
 * <p>
 * {@code readAllowed()} is checked first (presumably it throws when the shard cannot serve reads - TODO confirm).
 *
 * @param get the engine get request
 * @return {@code GetResult.NOT_EXISTS} when no document mapper is available, otherwise the engine's result
 */
public Engine.GetResult get(Engine.Get get) {
    readAllowed();
    final DocumentMapper mapper = mapperService.documentMapper();
    // No document mapper yet: report the document as missing instead of asking the engine.
    return mapper == null
        ? GetResult.NOT_EXISTS
        : getEngine().get(get, mapper, this::wrapSearcher);
}
/**
 * Prepares an update request by converting it into an index or delete request or an update response (no action, in the event of a
 * noop).
 * <p>
 * The branches are evaluated in order:
 * <ul>
 *   <li>document missing: handled as an upsert;</li>
 *   <li>document present but without stored source: {@link DocumentSourceMissingException};</li>
 *   <li>partial doc and no script: the doc is merged into the existing one (honouring {@code detectNoop});</li>
 *   <li>otherwise (script, possibly empty): the script is executed to build the new request.</li>
 * </ul>
 */
protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult getResult, LongSupplier nowInMillis) {
    if (getResult.isExists() == false) {
        return prepareUpsert(shardId, request, getResult, nowInMillis);
    }
    if (getResult.internalSourceRef() == null) {
        // Without the original source there is nothing to apply the update to.
        throw new DocumentSourceMissingException(shardId, request.id());
    }
    final boolean mergePartialDoc = request.script() == null && request.doc() != null;
    if (mergePartialDoc) {
        return prepareUpdateIndexRequest(shardId, request, getResult, request.detectNoop());
    }
    return prepareUpdateScriptRequest(shardId, request, getResult, nowInMillis);
}
其中,兩個 prepare 方法在 org/elasticsearch/action/update/UpdateHelper.java 中;getForUpdate、get、innerGet 在 ShardGetService 中;get(Engine.Get) 在 IndexShard 中。
從代碼中可以看到更新邏輯分兩步:
- 擷取待更新文檔的資料
- 執行更新文檔的操作
第1步最終會調用InternalEngine中的get方法。代碼如下:
/**
* Looks up a single document by its {@code _id}.
* <p>
* Realtime gets consult the live version map first, under a per-uid lock:
* <ul>
* <li>a delete entry is reported as {@code GetResult.NOT_EXISTS};</li>
* <li>version and seqNo/primary-term preconditions are checked;</li>
* <li>if the request allows it (only used for updates), the operation is read directly from the translog.</li>
* </ul>
* When the document cannot be served from the translog, {@code refreshIfNeeded} is called for its seqNo before reading
* from an internal searcher. This is where an update can trigger a refresh.
* Non-realtime gets read from the external searcher, i.e. only what has been exposed by an explicit refresh.
*
* @throws VersionConflictEngineException if a version or seqNo/primary-term precondition does not match
* @throws EngineException if reading the operation from the translog fails
*/
@Override
public GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper) {
assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
// Realtime get: check the live version map before falling back to a searcher.
if (get.realtime()) {
final VersionValue versionValue;
try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
// we need to lock here to access the version map to do this truly in RT
versionValue = getVersionFromMap(get.uid().bytes());
}
if (versionValue != null) {
if (versionValue.isDelete()) {
return GetResult.NOT_EXISTS;
}
if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
throw new VersionConflictEngineException(shardId, get.id(),
get.versionType().explainConflictForReads(versionValue.version, get.version()));
}
if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term
)) {
throw new VersionConflictEngineException(shardId, get.id(),
get.getIfSeqNo(), get.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term);
}
// Try to serve the document straight from the translog when this get allows it.
if (get.isReadFromTranslog()) {
// this is only used for updates - API _GET calls will always read from a reader for consistency
// the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
if (versionValue.getLocation() != null) {
try {
final Translog.Operation operation = translog.readOperation(versionValue.getLocation());
if (operation != null) {
return getFromTranslog(get, (Translog.Index) operation, mapper, searcherWrapper);
}
} catch (IOException e) {
maybeFailEngine("realtime_get", e); // let's check if the translog has failed with a tragic event
throw new EngineException(shardId, "failed to read operation from translog", e);
}
} else {
// No translog location recorded for this document. Presumably this switches on location tracking
// so later realtime gets can be served from the translog - TODO confirm.
trackTranslogLocation.set(true);
}
}
assert versionValue.seqNo >= 0 : versionValue;
// Not served from the translog: make sure a searcher can see this seqNo (presumably refreshes only when
// needed). This is the refresh an update may trigger.
refreshIfNeeded("realtime_get", versionValue.seqNo);
}
return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper));
} else {
// we expose what has been externally exposed in a point in time snapshot via an explicit refresh
return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper));
}
}
}
總結
update 操作需要先擷取原始文檔:如果文檔不存在,會按 upsert 邏輯處理(新增);如果存在,會基於原始文檔生成新文檔後再寫入。
雖然更新操作最終調用的也是 InternalEngine 中的 index 方法,但更新時調用的是 Lucene 的 softUpdateDocuments,它包含兩個操作:標記刪除舊文檔、新增新文檔。
- 多了一次完整的查詢(為了保證一致性,update 調用 GET 時將 realtime 選項設定為 true,並且不可配置。當文檔無法直接從 translog 讀取時,會調用 refreshIfNeeded,所以 update 操作可能會導致 refresh,生成新的 Lucene 分段。)
- 多了一次標記刪除