天天看點

深入剖析SolrCloud(四)

 在上一篇中介紹了連接配接Zookeeper叢集的方法,這一篇将圍繞一個有趣的話題---來展開,這就是Replication(索引複制),關于Solr Replication的詳細介紹,可以參考http://wiki.apache.org/solr/SolrReplication。

         在開始這個話題之前,先從我最近在應用中引入solr的master/slave架構時,遇到的一個讓我困擾的實際問題。

應用場景簡單描述如下:

1)首先master節點下載下傳索引分片,然後建立配置檔案,加入master節點的replication配置片段,再對索引分片進行合并(關于mergeIndex,可以參考http://wiki.apache.org/solr/MergingSolrIndexes),然後利用上述配置檔案和索引資料去建立一個solr核。

2)slave節點建立配置檔案,加入slave節點的replication配置片段,建立一個空的solr核,等待從master節點進行索引資料同步

出現的問題:slave節點沒有從master節點同步到資料。

問題分析:

1)首先檢查master節點,擷取最新的可複制索引的版本号,

http://master_host:port/solr/replication?command=indexversion

發現傳回的索引版本号是0,這說明mater節點根本沒有觸發replication動作,

2)為了确認上述判斷,在slave節點上進一步檢視replication的詳細資訊

http://slave_host:port/solr/replication?command=details

發現确實如此,盡管master節點的索引版本号和slave節點的索引版本号不一緻,但索引卻沒有同步過來,再分别檢視master節點和slave節點的日志,發現索引複制動作确實沒有開始。

綜上所述,确實是master節點沒有觸發索引複制動作,那究竟是為何呢?先将原因擺出來,後面會通過源碼的分析來加以說明。

原因:solr合并索引時,不管你是通過mergeindexes的http指令,還是調用底層lucene的IndexWriter,記得最後一定要送出一個commit,否則,不僅索引不僅不會對查詢可見,更是對于master/slave架構的solr叢集來說,master節點的replication動作不會觸發,因為indexversion沒有感覺到變化。

         好了,下面開始對Solr的Replication的分析。

         Solr容器在加載solr核的時候,會對已經注冊的各個實作SolrCoreAware接口的Handler進行回調,調用其inform方法。

         對于ReplicationHandler來說,就是在這裡對自己是屬于master節點還是slave節點進行判斷,若是slave節點,則建立一個SnapPuller對象,定時負責從master節點主動拉索引資料下來;若是master節點,則隻設定相應的參數。

複制代碼

  public void inform(SolrCore core) {

    this.core = core;

    registerFileStreamResponseWriter();

    registerCloseHook();

    NamedList slave = (NamedList) initArgs.get("slave");

    boolean enableSlave = isEnabled( slave );

    if (enableSlave) {

      tempSnapPuller = snapPuller = new SnapPuller(slave, this, core);

      isSlave = true;

    }

    NamedList master = (NamedList) initArgs.get("master");

    boolean enableMaster = isEnabled( master );

    if (!enableSlave && !enableMaster) {

      enableMaster = true;

      master = new NamedList<Object>();

    if (enableMaster) {

      includeConfFiles = (String) master.get(CONF_FILES);

      if (includeConfFiles != null && includeConfFiles.trim().length() > 0) {

        List<String> files = Arrays.asList(includeConfFiles.split(","));

        for (String file : files) {

          if (file.trim().length() == 0) continue;

          String[] strs = file.split(":");

          // if there is an alias add it or it is null

          confFileNameAlias.add(strs[0], strs.length > 1 ? strs[1] : null);

        }

        LOG.info("Replication enabled for following config files: " + includeConfFiles);

      }

      List backup = master.getAll("backupAfter");

      boolean backupOnCommit = backup.contains("commit");

      boolean backupOnOptimize = !backupOnCommit && backup.contains("optimize");

      List replicateAfter = master.getAll(REPLICATE_AFTER);

      replicateOnCommit = replicateAfter.contains("commit");

      replicateOnOptimize = !replicateOnCommit && replicateAfter.contains("optimize");

      if (!replicateOnCommit && ! replicateOnOptimize) {

        replicateOnCommit = true;

      // if we only want to replicate on optimize, we need the deletion policy to

      // save the last optimized commit point.

      if (replicateOnOptimize) {

        IndexDeletionPolicyWrapper wrapper = core.getDeletionPolicy();

        IndexDeletionPolicy policy = wrapper == null ? null : wrapper.getWrappedDeletionPolicy();

        if (policy instanceof SolrDeletionPolicy) {

          SolrDeletionPolicy solrPolicy = (SolrDeletionPolicy)policy;

          if (solrPolicy.getMaxOptimizedCommitsToKeep() < 1) {

            solrPolicy.setMaxOptimizedCommitsToKeep(1);

          }

        } else {

          LOG.warn("Replication can't call setMaxOptimizedCommitsToKeep on " + policy);

      if (replicateOnOptimize || backupOnOptimize) {

        core.getUpdateHandler().registerOptimizeCallback(getEventListener(backupOnOptimize, replicateOnOptimize));

      if (replicateOnCommit || backupOnCommit) {

        core.getUpdateHandler().registerCommitCallback(getEventListener(backupOnCommit, replicateOnCommit));

      if (replicateAfter.contains("startup")) {

        replicateOnStart = true;

        RefCounted<SolrIndexSearcher> s = core.getNewestSearcher(false);

        try {

          DirectoryReader reader = s==null ? null : s.get().getIndexReader();

          if (reader!=null && reader.getIndexCommit() != null && reader.getIndexCommit().getGeneration() != 1L) {

            try {

              if(replicateOnOptimize){

                Collection<IndexCommit> commits = DirectoryReader.listCommits(reader.directory());

                for (IndexCommit ic : commits) {

                  if(ic.getSegmentCount() == 1){

                    if(indexCommitPoint == null || indexCommitPoint.getGeneration() < ic.getGeneration()) indexCommitPoint = ic;

                  }

                }

              } else{

                indexCommitPoint = reader.getIndexCommit();

              }

            } finally {

              // We don't need to save commit points for replication, the SolrDeletionPolicy

              // always saves the last commit point (and the last optimized commit point, if needed)

              /***

              if(indexCommitPoint != null){

                core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getGeneration());

              ***/

            }

          // reboot the writer on the new index

          core.getUpdateHandler().newIndexWriter();

        } catch (IOException e) {

          LOG.warn("Unable to get IndexCommit on startup", e);

        } finally {

          if (s!=null) s.decref();

      String reserve = (String) master.get(RESERVE);

      if (reserve != null && !reserve.trim().equals("")) {

        reserveCommitDuration = SnapPuller.readInterval(reserve);

      LOG.info("Commits will be reserved for  " + reserveCommitDuration);

      isMaster = true;

  ReplicationHandler可以響應多種指令:

1)       indexversion。

這裡需要了解的第一個概念是索引送出點(IndexCommit),這是底層lucene的東西,可以自行查閱資料。首先擷取最新的索引送出點,然後從其中擷取索引版本号和索引所屬代。

      IndexCommit commitPoint = indexCommitPoint;  // make a copy so it won't change

      if (commitPoint != null && replicationEnabled.get()) {

        core.getDeletionPolicy().setReserveDuration(commitPoint.getVersion(), reserveCommitDuration);

        rsp.add(CMD_INDEX_VERSION, commitPoint.getVersion());

  rsp.add(GENERATION, commitPoint.getGeneration());  

    2)backup。這個指令用來對索引做快照。首先擷取最新的索引送出點,然後建立做一個SnapShooter,具體的快照動作由這個對象完成,

   private void doSnapShoot(SolrParams params, SolrQueryResponse rsp, SolrQueryRequest req) { 

    try {

      int numberToKeep = params.getInt(NUMBER_BACKUPS_TO_KEEP, Integer.MAX_VALUE);

      IndexDeletionPolicyWrapper delPolicy = core.getDeletionPolicy();

      IndexCommit indexCommit = delPolicy.getLatestCommit();

      if(indexCommit == null) {

        indexCommit = req.getSearcher().getReader().getIndexCommit();

      // small race here before the commit point is saved

      new SnapShooter(core, params.get("location")).createSnapAsync(indexCommit, numberToKeep, this);

    } catch (Exception e) {

      LOG.warn("Exception during creating a snapshot", e);

      rsp.add("exception", e);

  }

快照對象會啟動一個線程去異步地做一個索引備份。

void createSnapAsync(final IndexCommit indexCommit, final int numberToKeep, final ReplicationHandler replicationHandler) {

    replicationHandler.core.getDeletionPolicy().saveCommitPoint(indexCommit.getVersion());

    new Thread() {

      @Override

      public void run() {

        createSnapshot(indexCommit, numberToKeep, replicationHandler);

    }.start();

 }

 void createSnapshot(final IndexCommit indexCommit, int numberToKeep, ReplicationHandler replicationHandler) {

    NamedList details = new NamedList();

    details.add("startTime", new Date().toString());

    File snapShotDir = null;

    String directoryName = null;

    Lock lock = null;

      if(numberToKeep<Integer.MAX_VALUE) {

        deleteOldBackups(numberToKeep);

      SimpleDateFormat fmt = new SimpleDateFormat(DATE_FMT, Locale.US);

      directoryName = "snapshot." + fmt.format(new Date());

      lock = lockFactory.makeLock(directoryName + ".lock");

      if (lock.isLocked()) return;

      snapShotDir = new File(snapDir, directoryName);

      if (!snapShotDir.mkdir()) {

        LOG.warn("Unable to create snapshot directory: " + snapShotDir.getAbsolutePath());

        return;

      Collection<String> files = indexCommit.getFileNames();

      FileCopier fileCopier = new FileCopier(solrCore.getDeletionPolicy(), indexCommit);

      fileCopier.copyFiles(files, snapShotDir);

      details.add("fileCount", files.size());

      details.add("status", "success");

      details.add("snapshotCompletedAt", new Date().toString());

      SnapPuller.delTree(snapShotDir);

      LOG.error("Exception while creating snapshot", e);

      details.add("snapShootException", e.getMessage());

    } finally {

      replicationHandler.core.getDeletionPolicy().releaseCommitPoint(indexCommit.getVersion());  

      replicationHandler.snapShootDetails = details;

      if (lock != null) {

          lock.release();

          LOG.error("Unable to release snapshoot lock: " + directoryName + ".lock");

3)fetchindex。響應來自slave節點的取索引檔案的請求,會啟動一個線程來實作索引檔案的擷取。

      String masterUrl = solrParams.get(MASTER_URL);

      if (!isSlave && masterUrl == null) {

        rsp.add(STATUS,ERR_STATUS);

        rsp.add("message","No slave configured or no 'masterUrl' Specified");

      final SolrParams paramsCopy = new ModifiableSolrParams(solrParams);

      new Thread() {

        @Override

        public void run() {

          doFetch(paramsCopy);

      }.start();

      rsp.add(STATUS, OK_STATUS);

具體的擷取動作是通過SnapPuller對象來實作的,首先嘗試擷取pull對象鎖,如果請求鎖失敗,則說明還有取索引資料動作未結束,如果請求鎖成功,就調用SnapPuller對象的fetchLatestIndex方法來取最新的索引資料。

 void doFetch(SolrParams solrParams) {

    String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);

    if (!snapPullLock.tryLock())

      return;

      tempSnapPuller = snapPuller;

      if (masterUrl != null) {

        NamedList<Object> nl = solrParams.toNamedList();

        nl.remove(SnapPuller.POLL_INTERVAL);

        tempSnapPuller = new SnapPuller(nl, this, core);

      tempSnapPuller.fetchLatestIndex(core);

      LOG.error("SnapPull failed ", e);

      snapPullLock.unlock();

最後真正的取索引資料過程,首先,若mastet節點的indexversion為0,則說明master節點根本沒有提供可供複制的索引資料,若master節點和slave節點的indexversion相同,則說明slave節點目前與master節點索引資料狀态保持一緻,無需同步。若兩者的indexversion不同,則開始索引複制過程,首先從master節點上下載下傳指定索引版本号的索引檔案清單,然後建立一個索引檔案同步服務線程來完成同并工作。

這裡需要區分的是,如果master節點的年代比slave節點要老,那就說明兩者已經不相容,此時slave節點需要建立一個索引目錄,再從master節點做一次全量索引複制。還需要注意的一點是,索引同步也是可以同步配置檔案的,若配置檔案發生變化,則需要對solr核進行一次reload操作。最對了,還有,和文章開頭一樣, slave節點同步完資料後,别忘了做一次commit操作,以便重新整理自己的索引送出點到最新的狀态。最後,關閉并等待同步服務線程結束。此外,具體的取索引檔案是通過FileFetcher對象來完成。

 boolean fetchLatestIndex(SolrCore core) throws IOException {

    replicationStartTime = System.currentTimeMillis();

      //get the current 'replicateable' index version in the master

      NamedList response = null;

      try {

        response = getLatestVersion();

      } catch (Exception e) {

        LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage());

        return false;

      long latestVersion = (Long) response.get(CMD_INDEX_VERSION);

      long latestGeneration = (Long) response.get(GENERATION);

      if (latestVersion == 0L) {

        //there is nothing to be replicated

      IndexCommit commit;

      RefCounted<SolrIndexSearcher> searcherRefCounted = null;

        searcherRefCounted = core.getNewestSearcher(false);

        commit = searcherRefCounted.get().getReader().getIndexCommit();

      } finally {

        if (searcherRefCounted != null)

          searcherRefCounted.decref();

      if (commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {

        //master and slave are alsready in sync just return

        LOG.info("Slave in sync with master.");

      LOG.info("Master's version: " + latestVersion + ", generation: " + latestGeneration);

      LOG.info("Slave's version: " + commit.getVersion() + ", generation: " + commit.getGeneration());

      LOG.info("Starting replication process");

      // get the list of files first

      fetchFileList(latestVersion);

      // this can happen if the commit point is deleted before we fetch the file list.

      if(filesToDownload.isEmpty()) return false;

      LOG.info("Number of files in latest index in master: " + filesToDownload.size());

      // Create the sync service

      fsyncService = Executors.newSingleThreadExecutor();

      // use a synchronized list because the list is read by other threads (to show details)

      filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());

      // if the generateion of master is older than that of the slave , it means they are not compatible to be copied

      // then a new index direcory to be created and all the files need to be copied

      boolean isFullCopyNeeded = commit.getGeneration() >= latestGeneration;

      File tmpIndexDir = createTempindexDir(core);

      if (isIndexStale())

        isFullCopyNeeded = true;

      successfulInstall = false;

      boolean deleteTmpIdxDir = true;

      File indexDir = null ;

        indexDir = new File(core.getIndexDir());

        downloadIndexFiles(isFullCopyNeeded, tmpIndexDir, latestVersion);

        LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs");

        Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);

        if (!modifiedConfFiles.isEmpty()) {

          downloadConfFiles(confFilesToDownload, latestVersion);

          if (isFullCopyNeeded) {

            successfulInstall = modifyIndexProps(tmpIndexDir.getName());

            deleteTmpIdxDir = false;

          } else {

            successfulInstall = copyIndexFiles(tmpIndexDir, indexDir);

          if (successfulInstall) {

            LOG.info("Configuration files are modified, core will be reloaded");

            logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);//write to a file time of replication and conf files.

            reloadCore();

          terminateAndWaitFsyncService();

            logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);

            doCommit();

        replicationStartTime = 0;

        return successfulInstall;

      } catch (ReplicationHandlerException e) {

        LOG.error("User aborted Replication");

      } catch (SolrException e) {

        throw e;

        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Index fetch failed : ", e);

        if (deleteTmpIdxDir) delTree(tmpIndexDir);

        else delTree(indexDir);

      return successfulInstall;

      if (!successfulInstall) {

        logReplicationTimeAndConfFiles(null, successfulInstall);

      filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;

      replicationStartTime = 0;

      fileFetcher = null;

      if (fsyncService != null && !fsyncService.isShutdown()) fsyncService.shutdownNow();

      fsyncService = null;

      stop = false;

      fsyncException = null;

 } 

本文轉自Phinecos(洞庭散人)部落格園部落格,原文連結:http://www.cnblogs.com/phinecos/archive/2012/02/29/2372682.html,如需轉載請自行聯系原作者,如需轉載請自行聯系原作者

繼續閱讀