天天看點

zookeeper源碼分析(6)-資料和存儲

在Zookeeper中,資料存儲分為兩部分:記憶體資料存儲和磁盤資料存儲。本文主要分析

伺服器啟動時記憶體資料庫的初始化過程

主從伺服器資料同步的過程

。在此之前介紹一些資料存儲涉及的基本類。

DataTree

Zookeeper的資料模型是一棵樹,DataTree是記憶體資料存儲的核心,代表了記憶體中一份完整的資料(最新),包括所有的節點路徑,節點資料和ACL資訊,對應watches等。類的主要屬性為:

//節點路徑為key,節點資料内容DataNode為value.實時存儲了所有的zk節點,使用ConcurrentHashMap保證并發性
    private final ConcurrentHashMap<String, DataNode> nodes =new ConcurrentHashMap<String, DataNode>();

//節點資料對應的watch
    private final WatchManager dataWatches = new WatchManager();

//節點路徑對應的watch
    private final WatchManager childWatches = new WatchManager();

//key為sessionId,value為該會話對應的臨時節點路徑,友善實時通路和清理
    private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();

//This set contains the paths of all container nodes
    private final Set<String> containers =Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

//This set contains the paths of all ttl nodes
    private final Set<String> ttls =Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
//記憶體資料庫的最大zxid
public volatile long lastProcessedZxid = 0;           

複制

DataNode

資料存儲的最小單元,包含節點的資料内容,節點狀态,子節點清單,以及對子節點的操作接口等,主要屬性為:

//節點内容
    byte data[];
    Long acl;
    //節點狀态,包括一些節點的中繼資料,如ephemeralOwner,czxid等
    public StatPersisted stat;
//子節點相對父節點路徑集合,不包括父節點路徑
    private Set<String> children = null;           

複制

ZKDatabase

Zookeeper的記憶體資料庫,負責管理Zookeeper的所有會話,DataTree存儲和事務日志。它會定時向磁盤dump快照資料(snapCount主要控制),伺服器啟動時,會通過磁盤上的事務日志和快照資料檔案恢複成完整的記憶體資料庫。主要屬性為:

protected DataTree dataTree;
//key為sessionId,value為會話過期時間
    protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
//用于和磁盤互動事務日志檔案和快照檔案的類
    protected FileTxnSnapLog snapLog;
//主從資料同步時使用
    protected long minCommittedLog, maxCommittedLog;
    public static final int commitLogCount = 500;
    protected static int commitLogBuffer = 700;
//todo
    protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();           

複制

檔案存儲主要包括事務日志檔案的存儲和快照檔案的存儲,分别與FileTxnLog和FileSnap類有關。

FileTxnLog

實作了TxnLog接口,提供了API可以擷取日志和寫入日志,首先先看一下事務日志檔案的格式

LogFile:
//一個日志檔案由以下三個部分組成
 *     FileHeader TxnList ZeroPad
//1.檔案頭
 * FileHeader: {
 *     magic 4bytes (ZKLG)   
 *     version 4bytes
 *     dbid 8bytes
 *   }
 //事務内容
 * TxnList:
 *     Txn || Txn TxnList

 * Txn:
//一條事務日志的組成部分
 *     checksum Txnlen TxnHeader Record 0x42

 * checksum: 8bytes Adler32 is currently used
 *   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
 *
 * Txnlen:
 *     len 4bytes
 *
 * TxnHeader: {
 *     sessionid 8bytes
 *     cxid 4bytes
 *     zxid 8bytes
 *     time 8bytes
 *     type 4bytes
 *   }
 *
 * Record:
 *     See Jute definition file for details on the various record types
 *
 * ZeroPad:
 *     0 padded to EOF (filled during preallocation stage)           

複制

主要分析下

寫入日志

日志截斷

的過程

寫入日志

public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
        if (hdr == null) {
            return false;
        }
//lastZxidSeen:最大(新)的zxid
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn("Current zxid " + hdr.getZxid()
                    + " is <= " + lastZxidSeen + " for "
                    + hdr.getType());
        } else {
            lastZxidSeen = hdr.getZxid();
        }
//如果沒有事務日志可寫,需要關聯一個新的檔案流,寫入日志檔案頭資訊FileHeader,并馬上強制刷盤
        if (logStream==null) {
           if(LOG.isInfoEnabled()){
                LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
           }

           logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
           fos = new FileOutputStream(logFileWrite);
           logStream=new BufferedOutputStream(fos);
           oa = BinaryOutputArchive.getArchive(logStream);
           FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
           fhdr.serialize(oa, "fileheader");
           // Make sure that the magic number is written before padding.
           logStream.flush();
           filePadding.setCurrentSize(fos.getChannel().position());
           streamsToFlush.add(fos);
        }
//确定事務日志檔案是否需要擴容(預配置設定)
        filePadding.padFile(fos.getChannel());
//事務序列化
        byte[] buf = Util.marshallTxnEntry(hdr, txn);
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " +
                    "and txn");
        }
//生成Checksum
        Checksum crc = makeChecksumAlgorithm();
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC");
//寫入事務日志檔案流
        Util.writeTxnBytes(oa, buf);

        return true;
    }           

複制

主要流程為:

1.确定是否有事務日志可寫

當zookeeper伺服器啟動完成時需要進行第一次事務日志的寫入,或是上一個事務日志寫滿時,都會處于與事務日志檔案斷開的狀态。當

logStream==null

時需要關聯一個新的檔案流,寫入日志檔案頭資訊FileHeader,并馬上強制刷盤。

2.确定事務日志檔案是否需要擴容(預配置設定)

long padFile(FileChannel fileChannel) throws IOException {
//currentSize:目前檔案的大小位置
//preAllocSize:預設64MB
        long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
        if (currentSize != newFileSize) {
            fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
            currentSize = newFileSize;
        }
        return currentSize;
    }
//判斷是否需要擴容
public static long calculateFileSizeWithPadding(long position, long fileSize, long preAllocSize) {
        // If preAllocSize is positive and we are within 4KB of the known end of the file calculate a new file size
        if (preAllocSize > 0 && position + 4096 >= fileSize) {
            // If we have written more than we have previously preallocated we need to make sure the new
            // file size is larger than what we already have
            if (position > fileSize) {
                fileSize = position + preAllocSize;
                fileSize -= fileSize % preAllocSize;
            } else {
                fileSize += preAllocSize;
            }
        }

        return fileSize;
    }           

複制

calculateFileSizeWithPadding

中可以看出,當寫入資料量超過4KB的時候便會将檔案大小

currentSize

擴容到

preAllocSize

,預設為64MB,并将未寫入部分填充0,好處是避免開辟新的磁盤塊,減少磁盤Seek

3.事務序列化

分别對事物頭(TxnHeader)和事務體(Record)序列化,參考zookeeper源碼分析(5)-序列化和協定

4.生成Checksum

可校驗事務日志檔案的完整性和資料準确性

5.寫入事務日志檔案流

将事物頭,事務體和Checksum寫入檔案流中,由于使用的輸出流是BufferedOutputStream,會先放到緩沖區中,不會真正寫入

日志截斷

在主從同步時,如果learner伺服器的事務ID大于leader伺服器的事務ID,将會要求learner伺服器丢棄掉比leader伺服器的事務ID大的事務日志。

FileTxnIterator

是可以指定zxid的事務日志疊代器,也就是說如果需要從zxid=11的位置開始建立一個疊代器,那麼該台伺服器上面在zxid=11之後的日志都會儲存在該疊代器中。其主要屬性為:

public static class FileTxnIterator implements TxnLog.TxnIterator {
//事務日志的目錄
        File logDir;
//需要從該事務ID處獲得疊代器
        long zxid;
//zxid所在事務檔案的檔案頭
        TxnHeader hdr;
//目前正在疊代的事務日志
        Record record;
//zxid所在的事務日志檔案
        File logFile;
//輸入流
        InputArchive ia;
        static final String CRC_ERROR="CRC check failed";
//輸入流,可讀取到zxid的位置
        PositionInputStream inputStream=null;
//比zxid所在事務日志檔案大的事務檔案集合
        private ArrayList<File> storedFiles;
··········省略代碼·······
}           

複制

public boolean truncate(long zxid) throws IOException {
        FileTxnIterator itr = null;
        try {
            itr = new FileTxnIterator(this.logDir, zxid);
            PositionInputStream input = itr.inputStream;
            if(input == null) {
                throw new IOException("No log files found to truncate! This could " +
                        "happen if you still have snapshots from an old setup or " +
                        "log files were deleted accidentally or dataLogDir was changed in zoo.cfg.");
            }
            long pos = input.getPosition();
            // now, truncate at the current position
            RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw");
            raf.setLength(pos);
            raf.close();
            while(itr.goToNextLog()) {
                if (!itr.logFile.delete()) {
                    LOG.warn("Unable to truncate {}", itr.logFile);
                }
            }
        } finally {
            close(itr);
        }
        return true;
    }           

複制

從代碼中可以看出,截斷的邏輯就是删掉zxid所在事務檔案中比zxid大的事務日志,以及所有比該事務檔案大的事務檔案。

FileSnap

資料快照是用來記錄zookeeper伺服器在某一時刻的全量記憶體資料,并将其寫入到指定位置磁盤上。存儲内容包括DataTree資訊和會話資訊。FileSnap提供了快照相應的接口,,主要包括存儲、序列化、反序列化、通路相應快照檔案。

FileTxnSnapLog

封裝了TxnLog和SnapShot,提供了從磁盤中恢複記憶體資料庫的

restore

方法和儲存快照的

save

方法,主要屬性

//the directory containing the
    //the transaction logs
    private final File dataDir;
    //the directory containing the snapshot directory
    private final File snapDir;
    private TxnLog txnLog;
    private SnapShot snapLog;
    // 版本号
    public final static int VERSION = 2;
    // 版本
    public final static String version = "version-";           

複制

首先看下儲存快照的

save

方法

//syncSnap: sync the snapshot immediately after write
public void save(DataTree dataTree,
                     ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
                     boolean syncSnap)
        throws IOException {
        long lastZxid = dataTree.lastProcessedZxid;
        File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
        LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
                snapshotFile);
        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);

    }           

複制

主要流程就是根據目前dataTree的最新事務id生成快照檔案名,然後将dataTree的内容和sessionsWithTimeouts(會話資訊)序列化,存到指定磁盤位置。

伺服器啟動期間的資料初始化

就是磁盤中最新快照檔案(全量資料)和它之後的事務日志資料(增量資料)的反序列化到記憶體資料庫中的過程,流程圖為:

zookeeper源碼分析(6)-資料和存儲

回到zookeeper源碼分析(1)-服務端啟動流程,在伺服器啟動時,需要先

初始化FileTxnSnapLog

初始化 ZKDatabase

1.初始化FileTxnSnapLog

public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
        LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir);

        this.dataDir = new File(dataDir, version + VERSION);
        this.snapDir = new File(snapDir, version + VERSION);

        // by default create snap/log dirs, but otherwise complain instead
        // See ZOOKEEPER-1161 for more details
        boolean enableAutocreate = Boolean.valueOf(
                System.getProperty(ZOOKEEPER_DATADIR_AUTOCREATE,
                        ZOOKEEPER_DATADIR_AUTOCREATE_DEFAULT));

        if (!this.dataDir.exists()) {
            if (!enableAutocreate) {
                throw new DatadirException("Missing data directory "
                        + this.dataDir
                        + ", automatic data directory creation is disabled ("
                        + ZOOKEEPER_DATADIR_AUTOCREATE
                        + " is false). Please create this directory manually.");
            }

            if (!this.dataDir.mkdirs()) {
                throw new DatadirException("Unable to create data directory "
                        + this.dataDir);
            }
        }
        if (!this.dataDir.canWrite()) {
            throw new DatadirException("Cannot write to data directory " + this.dataDir);
        }

        if (!this.snapDir.exists()) {
            // by default create this directory, but otherwise complain instead
            // See ZOOKEEPER-1161 for more details
            if (!enableAutocreate) {
                throw new DatadirException("Missing snap directory "
                        + this.snapDir
                        + ", automatic data directory creation is disabled ("
                        + ZOOKEEPER_DATADIR_AUTOCREATE
                        + " is false). Please create this directory manually.");
            }

            if (!this.snapDir.mkdirs()) {
                throw new DatadirException("Unable to create snap directory "
                        + this.snapDir);
            }
        }
        if (!this.snapDir.canWrite()) {
            throw new DatadirException("Cannot write to snap directory " + this.snapDir);
        }

        // check content of transaction log and snapshot dirs if they are two different directories
        // See ZOOKEEPER-2967 for more details
        if(!this.dataDir.getPath().equals(this.snapDir.getPath())){
//用來檢查當dataDir和snapDir不同時,dataDir是否包含了快照檔案,snapDir是否包含了事務日志檔案
            checkLogDir();
            checkSnapDir();
        }

        txnLog = new FileTxnLog(this.dataDir);
        snapLog = new FileSnap(this.snapDir);

        autoCreateDB = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DB_AUTOCREATE,
                ZOOKEEPER_DB_AUTOCREATE_DEFAULT));
    }           

複制

可以看到會在傳入的datadir和snapdir目錄下新生成version-2的目錄,并且會判斷目錄是否建立成功,之後會建立txnLog和snapLog。

2.初始化 ZKDatabase

public ZKDatabase(FileTxnSnapLog snapLog) {
        dataTree = new DataTree();
        sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
        this.snapLog = snapLog;
·······
}           

複制

可以看到主要初始化了DataTree和sessionsWithTimeouts,前者會在zookeeper建立一些配置跟節點,如/,/zookeeper,/zookeeper/quota等節點,與zookeeper自身伺服器相關的節點。

之後調用資料初始化的方法為

ZooKeeperServer.loadData

public void loadData() throws IOException, InterruptedException {
//如果是leader伺服器,會在lead方法中再次調用該方法,此時zkDb.isInitialized()=true,僅做快照存儲的工作
        if(zkDb.isInitialized()){
            setZxid(zkDb.getDataTreeLastProcessedZxid());
        }
        else {
//第一次初始化
            setZxid(zkDb.loadDataBase());
        }
·········會話過期清理的代碼···········
        // Make a clean snapshot
        takeSnapshot();
    }

    public void takeSnapshot() {
        takeSnapshot(false);
    }

    public void takeSnapshot(boolean syncSnap){
       txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
·········省略異常檢查···········
     }           

複制

第一次初始化的時候會調用

zkDb.loadDataBase()

,該方法最終會傳回記憶體資料庫最新的事務id

public long loadDataBase() throws IOException {
        long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
        initialized = true;
        return zxid;
    }           

複制

也就是調用

FileTxnSnapLog.restore

,首先介紹下FileTxnSnapLog的内部類

PlayBackListener

它是用來接收事務應用過程中的回調,在Zookeeper資料恢複後期,會有事務修正過程(增量資料的反序列化過程),此過程會回調PlayBackListener.onTxnLoaded來進行對應的資料修正。這裡傳入的是

commitProposalPlaybackListener

FileTxnSnapLog.restore

//方法參數中DataTree dt, Map<Long, Integer> sessions是要恢複記憶體資料庫的對象,其實就是ZKDatabase中的屬性
//PlayBackListener是用來修正事務日志時回調用的
    public long restore(DataTree dt, Map<Long, Integer> sessions,
                        PlayBackListener listener) throws IOException {
//解析快照資料
        long deserializeResult = snapLog.deserialize(dt, sessions);
        FileTxnLog txnLog = new FileTxnLog(dataDir);
        boolean trustEmptyDB;
        File initFile = new File(dataDir.getParent(), "initialize");
        if (Files.deleteIfExists(initFile.toPath())) {
            LOG.info("Initialize file found, an empty database will not block voting participation");
            trustEmptyDB = true;
        } else {
//
            trustEmptyDB = autoCreateDB;
        }
        if (-1L == deserializeResult) {
            /* this means that we couldn't find any snapshot, so we need to
             * initialize an empty database (reported in ZOOKEEPER-2325) */
            if (txnLog.getLastLoggedZxid() != -1) {
                throw new IOException(
                        "No snapshot found, but there are log entries. " +
                        "Something is broken!");
            }
//預設相信空磁盤資料,因為伺服器第一次啟動的時候資料一般為空
            if (trustEmptyDB) {
                /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
                 *       or use Map on save() */
                save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false);

                /* return a zxid of 0, since we know the database is empty */
                return 0L;
            } else {
                /* return a zxid of -1, since we are possibly missing data */
                LOG.warn("Unexpected empty data tree, setting zxid to -1");
                dt.lastProcessedZxid = -1L;
                return -1L;
            }
        }
        return fastForwardFromEdits(dt, sessions, listener);
    }           

複制

3.解析快照資料

解析快照資料到datatree和sessions中,取出最新的100個快照資料,依次解析判斷快照檔案是否有資料且是可用的

snapLog.deserialize(dt, sessions)

,傳回快照檔案資料的最大ZXID

public long deserialize(DataTree dt, Map<Long, Integer> sessions)
            throws IOException {
        // we run through 100 snapshots (not all of them)
        // if we cannot get it running within 100 snapshots
        // we should  give up
        List<File> snapList = findNValidSnapshots(100);
        if (snapList.size() == 0) {
            return -1L;
        }
        File snap = null;
        boolean foundValid = false;
        for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
            snap = snapList.get(i);
            LOG.info("Reading snapshot " + snap);
            try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
                 CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
                InputArchive ia = BinaryInputArchive.getArchive(crcIn);
                deserialize(dt, sessions, ia);
                long checkSum = crcIn.getChecksum().getValue();
                long val = ia.readLong("val");
                if (val != checkSum) {
                    throw new IOException("CRC corruption in snapshot :  " + snap);
                }
                foundValid = true;
                break;
            } catch (IOException e) {
                LOG.warn("problem reading snap file " + snap, e);
            }
        }
        if (!foundValid) {
            throw new IOException("Not able to find valid snapshots in " + snapDir);
        }
        dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
        return dt.lastProcessedZxid;
    }           

複制

若傳回-1,說明不存在快照檔案:

如果事務日志檔案zxid也為-1,說明磁盤資料為空,則将空資料快照一下,傳回最大事務id,為0。否則,調用

fastForwardFromEdits

4.擷取最新的ZXID

public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
                                     PlayBackListener listener) throws IOException {

        TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
        long highestZxid = dt.lastProcessedZxid;
        TxnHeader hdr;
        try {
            while (true) {
                // iterator points to
                // the first valid txn when initialized
                hdr = itr.getHeader();
                if (hdr == null) {
                    //empty logs
                    return dt.lastProcessedZxid;
                }
                if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                    LOG.error("{}(highestZxid) > {}(next log) for type {}",
                            highestZxid, hdr.getZxid(), hdr.getType());
                } else {
                    highestZxid = hdr.getZxid();
                }
                try {
                    processTransaction(hdr,dt,sessions, itr.getTxn());
                } catch(KeeperException.NoNodeException e) {
                   throw new IOException("Failed to process transaction type: " +
                         hdr.getType() + " error: " + e.getMessage(), e);
                }
                listener.onTxnLoaded(hdr, itr.getTxn());
                if (!itr.next())
                    break;
            }
        } finally {
            if (itr != null) {
                itr.close();
            }
        }
        return highestZxid;
    }           

複制

首先基于目前

dt.lastProcessedZxid

+1擷取一個事務日志疊代器,這些事務日志是需要更新的增量資料。while循環一條條疊代這些事務日志,不斷的更新

highestZxid

,最終将其傳回。

5.應用事務

在循環過程中處理事務日志

processTransaction

,也就是根據事務日志類型不斷的更新sessions 和DataTree中的資料内容

6.回調事務

回調

listener.onTxnLoaded

,就是ZKDatabase中的commitProposalPlaybackListener

private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() {
        public void onTxnLoaded(TxnHeader hdr, Record txn){
            addCommittedProposal(hdr, txn);
        }
    };

private void addCommittedProposal(TxnHeader hdr, Record txn) {
        Request r = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
        addCommittedProposal(r);
    }

    /**
     * maintains a list of last <i>committedLog</i>
     *  or so committed requests. This is used for
     * fast follower synchronization.
     * @param request committed request
     */
    public void addCommittedProposal(Request request) {
        WriteLock wl = logLock.writeLock();
        try {
            wl.lock();
            if (committedLog.size() > commitLogCount) {
                committedLog.removeFirst();
                minCommittedLog = committedLog.getFirst().packet.getZxid();
            }
            if (committedLog.isEmpty()) {
//
                minCommittedLog = request.zxid;
                maxCommittedLog = request.zxid;
            }

            byte[] data = SerializeUtils.serializeRequest(request);
            QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
            Proposal p = new Proposal();
            p.packet = pp;
            p.request = request;
            committedLog.add(p);
            maxCommittedLog = p.packet.getZxid();
        } finally {
            wl.unlock();
        }
    }           

複制

主要邏輯在

addCommittedProposal

方法中,構造了一個

LinkedList<Proposal> committedLog

,用來存儲過來的每一條增量事務日志,

minCommittedLog

儲存的是第一條增量事務日志的zxid,

maxCommittedLog

儲存的是最後以條增量事務日志的zxid。這三個變量是用來主從做

快速同步判斷

用的。

7.epoch校驗

epoch辨別了目前leader的周期,每次選舉産生一個新的Leader伺服器之後,就會生成一個新的epoch。叢集間互相通信的過程中,都會帶上這個epoch以確定彼此在同一個Leader周期内。

對于leader伺服器,完成資料初始化時會将自己的

currentEpoch

和剛解析出來的

最大zxid

放到

leaderStateSummary

中,和主動連接配接的learner伺服器的

epoch

最大zxid

對比,必須保證leader伺服器的

leaderStateSummary

大于learner伺服器的

StateSummary

才能說明leader伺服器的資料是比learner伺服器新的,然後leader伺服器才可以開啟新一輪的epoch,進行資料同步的工作。

主從伺服器間的資料同步

大緻過程如圖:

zookeeper源碼分析(6)-資料和存儲

由zookeeper源碼分析(4)-選舉流程和伺服器啟動處理可知,當

LearnerHandler

接收到Learner伺服器的ACKEPOCH消息後會開始進行主從同步

Leader資料同步發送過程

LearnerHandler.run

public void run() {
           ····省略接收ACKEPOCH消息之前的互動過程···
            //learner zxid
            peerLastZxid = ss.getLastZxid();
           
            // Take any necessary action if we need to send TRUNC or DIFF
            // startForwarding() will be called in all cases
            //确定是否需要進行全量同步
            boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
            
            LOG.debug("Sending NEWLEADER message to " + sid);
            // the version of this quorumVerifier will be set by leader.lead() in case
            // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
            // we got here, so the version was set
//發送NEWLEADER消息
            if (getVersion() < 0x10000) {
                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                        newLeaderZxid, null, null);
                oa.writeRecord(newLeaderQP, "packet");
            } else {
                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                        newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
                                .toString().getBytes(), null);
                queuedPackets.add(newLeaderQP);
            }
//強刷,這裡對應的DIFF/TRUNC/DIFF+TRUNC方式的同步
            bufferedOutput.flush();

            /* if we are not truncating or sending a diff just send a snapshot */
            if (needSnap) {
//全量同步
                boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
                LearnerSnapshot snapshot = 
                        leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
                try {
                    long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
                    oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                    bufferedOutput.flush();
                    // Dump data to peer
                    leader.zk.getZKDatabase().serializeSnapshot(oa);
                    oa.writeString("BenWasHere", "signature");
//強刷,這裡對應的SNAP方式的同步
            bufferedOutput.flush();
                    bufferedOutput.flush();
                } finally {
                    snapshot.close();
                }
            }

            // Start thread that blast packets in the queue to learner
            startSendingPackets();
            
        //等待learner伺服器的同步完成的ACK通知
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            if(qp.getType() != Leader.ACK){
                LOG.error("Next packet was supposed to be an ACK,"
                    + " but received packet: {}", packetToString(qp));
                return;
            }

            if(LOG.isDebugEnabled()){
                LOG.debug("Received NEWLEADER-ACK message from " + sid);   
            }
            leader.waitForNewLeaderAck(getSid(), qp.getZxid());
//同步時間檢測,不能超過tickTime*syncLimit
            syncLimitCheck.start();
            
            // now that the ack has been processed expect the syncLimit
            sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);

            /*
             * Wait until leader starts up
             */
            synchronized(leader.zk){
                while(!leader.zk.isRunning() && !this.isInterrupted()){
                    leader.zk.wait(20);
                }
            }
            // Mutation packets will be queued during the serialize,
            // so we need to mark when the peer can actually start
            // using the data
            //
            LOG.debug("Sending UPTODATE message to " + sid);      
            queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

          ···············同步完成,開始與learner伺服器的正常通信···········
    }           

複制

在伺服器資料初始化時候,我們提到記憶體資料庫

zkDatabase

會儲存最新快照之後的增量資料,

LinkedList<Proposal> committedLog:

用來存儲過來的每一條增量事務日志

minCommittedLog:

第一條增量事務日志的zxid

maxCommittedLog:

最後一條增量事務日志的zxid

Leader伺服器會根據learner伺服器的最大事務ID:

peerLastZxid

minCommittedLog

/

maxCommittedLog

之間的大小關系來最終确定是差異同步還是全量同步,主要邏輯在

syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader)

public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
//learner伺服器zxid是否為0
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
        // Keep track of the latest zxid which already queued
        long currentZxid = peerLastZxid;
        boolean needSnap = true;
//是否設定了快照大小參數,預設設定了,且snapshotSizeFactor=0.33
        boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
        ReentrantReadWriteLock lock = db.getLogLock();
        ReadLock rl = lock.readLock();
        try {
            rl.lock();
            long maxCommittedLog = db.getmaxCommittedLog();
            long minCommittedLog = db.getminCommittedLog();
            long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
            if (db.getCommittedLog().isEmpty()) {
                /*
                 * It is possible that committedLog is empty. In that case
                 * setting these value to the latest txn in leader db
                 * will reduce the case that we need to handle
                 *
                 * Here is how each case handle by the if block below
                 * 1. lastProcessZxid == peerZxid -> Handle by (2)
                 * 2. lastProcessZxid < peerZxid -> Handle by (3)
                 * 3. lastProcessZxid > peerZxid -> Handle by (5)
                 */
                minCommittedLog = lastProcessedZxid;
                maxCommittedLog = lastProcessedZxid;
            }

            /*
             * Here are the cases that we want to handle
             *
             * 1. Force sending snapshot (for testing purpose)
             * 2. Peer and leader is already sync, send empty diff
             * 3. Follower has txn that we haven't seen. This may be old leader
             *    so we need to send TRUNC. However, if peer has newEpochZxid,
             *    we cannot send TRUNC since the follower has no txnlog
             * 4. Follower is within committedLog range or already in-sync.
             *    We may need to send DIFF or TRUNC depending on follower's zxid
             *    We always send empty DIFF if follower is already in-sync
             * 5. Follower missed the committedLog. We will try to use on-disk
             *    txnlog + committedLog to sync with follower. If that fail,
             *    we will send snapshot
             */

            if (forceSnapSync) {
                // Force leader to use snapshot to sync with follower
                LOG.warn("Forcing snapshot sync - should not see this in production");
            } else if (lastProcessedZxid == peerLastZxid) {
                // Follower is already sync with us, send empty diff
             //将packet發送到queuedPackets中,queuedPackets是負責發送消息到learner伺服器的隊列
                queueOpPacket(Leader.DIFF, peerLastZxid);
                needOpPacket = false;
                needSnap = false;
            } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
                // Newer than committedLog, send trunc and done
                queueOpPacket(Leader.TRUNC, maxCommittedLog);
                currentZxid = maxCommittedLog;
                needOpPacket = false;
                needSnap = false;
            } else if ((maxCommittedLog >= peerLastZxid)
                    && (minCommittedLog <= peerLastZxid)) {
                // Follower is within commitLog range
                Iterator<Proposal> itr = db.getCommittedLog().iterator();
//差異化同步,發送(peerLaxtZxid, maxZxid]之間的消息給learner伺服器
                currentZxid = queueCommittedProposals(itr, peerLastZxid,
                                                     null, maxCommittedLog);
                needSnap = false;
            } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
                // Use txnlog and committedLog to sync

                // Calculate sizeLimit that we allow to retrieve txnlog from disk
                long sizeLimit = db.calculateTxnLogSizeLimit();
                // This method can return empty iterator if the requested zxid
                // is older than on-disk txnlog
                Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
                        peerLastZxid, sizeLimit);
                if (txnLogItr.hasNext()) {
                   
                    currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
                                                         minCommittedLog, maxCommittedLog);

                    currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
                                                         null, maxCommittedLog);
                    needSnap = false;
                }
                // closing the resources
                if (txnLogItr instanceof TxnLogProposalIterator) {
                    TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
                    txnProposalItr.close();
                }
            } else {
                LOG.warn("Unhandled scenario for peer sid: " +  getSid());
            }
            LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) +
                      " for peer sid: " +  getSid());
//lets the leader know that a follower is capable of following and is done syncing
//已經通過的提議但是還沒來得及送出的Proposal
            leaderLastZxid = leader.startForwarding(this, currentZxid);
        } finally {
            rl.unlock();
        }
//needOpPacket:用來判斷是否需要發送TRUNC或DIFF消息給發送隊列,預設為true
        if (needOpPacket && !needSnap) {
            // This should never happen, but we should fall back to sending
            // snapshot just in case.
            LOG.error("Unhandled scenario for peer sid: " +  getSid() +
                     " fall back to use snapshot");
            needSnap = true;
        }
        return needSnap;
    }           

複制

可以看出同步方式可大緻分為5種:

1.強制快照同步

可設定forceSnapSync為true,用于測試使用,預設為false

2.不需要同步

此時主從最大zxid一緻,不需要同步,僅需要發送一個DIFF消息即可

3.復原同步

learner伺服器zxid

peerLastZxid

大于leader伺服器zxid

lastProcessedZxid

,并且peerLastZxid>0,此時需要從伺服器丢棄大于lastProcessedZxid的事務日志,會發送TRUNC消息給learner伺服器

queueOpPacket(Leader.TRUNC, maxCommittedLog);

4.差異化同步(TRUNC+DIFF同步)

  • peerLastZxid

    位于

    minCommittedLog

    maxCommittedLog

    之間,但

    peerLastZxid

    找不到這個範圍内的值,則先復原到離

    peerLastZxid

    最近的前一條消息

    prevProposalZxid

    ,然後再進行(prevProposalZxid, maxZxid]之間的zxid同步
  • peerLastZxid

    位于

    minCommittedLog

    maxCommittedLog

    之間,且

    peerLastZxid

    真實存在,則隻需要進行(peerLaxtZxid, maxZxid]之間的zxid同步,與上面一條的差别處理可見

    LearnerHanler.queueCommittedProposals

protected long queueCommittedProposals(Iterator<Proposal> itr,
            long peerLastZxid, Long maxZxid, Long lastCommittedZxid) {
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
        long queuedZxid = peerLastZxid;
        // as we look through proposals, this variable keeps track of previous
        // proposal Id.
        long prevProposalZxid = -1;
        while (itr.hasNext()) {
            Proposal propose = itr.next();

            long packetZxid = propose.packet.getZxid();
            // abort if we hit the limit
            if ((maxZxid != null) && (packetZxid > maxZxid)) {
                break;
            }

            // skip the proposals the peer already has
            if (packetZxid < peerLastZxid) {
                prevProposalZxid = packetZxid;
                continue;
            }

            // If we are sending the first packet, figure out whether to trunc
            // or diff
            if (needOpPacket) {

                // Send diff when we see the follower's zxid in our history,情況5-1
                if (packetZxid == peerLastZxid) {
                    LOG.info("Sending DIFF zxid=0x" +
                             Long.toHexString(lastCommittedZxid) +
                             " for peer sid: " + getSid());
                    queueOpPacket(Leader.DIFF, lastCommittedZxid);
                    needOpPacket = false;
                    continue;
                }

                if (isPeerNewEpochZxid) {
                   // Send diff and fall through if zxid is of a new-epoch
                   LOG.info("Sending DIFF zxid=0x" +
                            Long.toHexString(lastCommittedZxid) +
                            " for peer sid: " + getSid());
                   queueOpPacket(Leader.DIFF, lastCommittedZxid);
                   needOpPacket = false;
                } else if (packetZxid > peerLastZxid  ) {
                    // Peer have some proposals that the leader hasn't seen yet,情況4
                    // it may used to be a leader
                    if (ZxidUtils.getEpochFromZxid(packetZxid) !=
                            ZxidUtils.getEpochFromZxid(peerLastZxid)) {
                        // We cannot send TRUNC that cross epoch boundary.
                        // The learner will crash if it is asked to do so.
                        // We will send snapshot this those cases.
                        LOG.warn("Cannot send TRUNC to peer sid: " + getSid() +
                                 " peer zxid is from different epoch" );
                        return queuedZxid;
                    }

                    LOG.info("Sending TRUNC zxid=0x" +
                            Long.toHexString(prevProposalZxid) +
                            " for peer sid: " + getSid());
                    queueOpPacket(Leader.TRUNC, prevProposalZxid);
                    needOpPacket = false;
                }
            }

            if (packetZxid <= queuedZxid) {
                // We can get here, if we don't have op packet to queue
                // or there is a duplicate txn in a given iterator
                continue;
            }

            // Since this is already a committed proposal, we need to follow
            // it by a commit packet
//發送PROPOSAL消息,包含資料資訊
            queuePacket(propose.packet);
//發送COMMIT消息,僅包含需要送出的zxid資訊
            queueOpPacket(Leader.COMMIT, packetZxid);
            queuedZxid = packetZxid;

        }

        if (needOpPacket && isPeerNewEpochZxid) {
            // We will send DIFF for this kind of zxid in any case. This if-block
            // is the catch when our history older than learner and there is
            // no new txn since then. So we need an empty diff
            LOG.info("Sending DIFF zxid=0x" +
                     Long.toHexString(lastCommittedZxid) +
                     " for peer sid: " + getSid());
            queueOpPacket(Leader.DIFF, lastCommittedZxid);
            needOpPacket = false;
        }

        return queuedZxid;
    }           

複制

  • 如果

    peerLastZxid < minCommittedLog

    ,但是所處事務日志檔案txnLog位置之後的事務大小小于最近快照中後

    snapSize * snapshotSizeFactor

    的大小,則采用txnLog + committedLog的方式同步,分為兩部分:
currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
                                                         minCommittedLog, maxCommittedLog);

                    currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
                                                         null, maxCommittedLog);           

複制

5.全量同步

如果

peerLastZxid

小于以上情況,則進行全量同步,該方法傳回true,回到

LearnerHandler.run

,會發送SNAP消息,并将整個ZKDatabase序列化,發送出去

之後會開啟線程異步發送

queuedPackets

隊列消息,等待learner伺服器的同步完成ACK消息。

Learner資料同步接收過程

當Learner伺服器發送完ACKEPOCH消息後,便會進入同步過程

Learner.syncWithLeader

(Follewer/Observer都會調用此方法)

protected void syncWithLeader(long newLeaderZxid) throws Exception{
        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
        QuorumPacket qp = new QuorumPacket();
        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
        
        QuorumVerifier newLeaderQV = null;
        
        // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
        // For SNAP and TRUNC the snapshot is needed to save that history
        boolean snapshotNeeded = true;
        boolean syncSnapshot = false;
        readPacket(qp);
        LinkedList<Long> packetsCommitted = new LinkedList<Long>();
        LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
        synchronized (zk) {
            if (qp.getType() == Leader.DIFF) {
                LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
                snapshotNeeded = false;
            }
            else if (qp.getType() == Leader.SNAP) {
                LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
                // The leader is going to dump the database
                // db is clear as part of deserializeSnapshot()
                zk.getZKDatabase().deserializeSnapshot(leaderIs);
                // ZOOKEEPER-2819: overwrite config node content extracted
                // from leader snapshot with local config, to avoid potential
                // inconsistency of config node content during rolling restart.
                if (!QuorumPeerConfig.isReconfigEnabled()) {
                    LOG.debug("Reset config node content from local config after deserialization of snapshot.");
                    zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
                }
                String signature = leaderIs.readString("signature");
                if (!signature.equals("BenWasHere")) {
                    LOG.error("Missing signature. Got " + signature);
                    throw new IOException("Missing signature");                   
                }
                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());

                // immediately persist the latest snapshot when there is txn log gap
                syncSnapshot = true;
            } else if (qp.getType() == Leader.TRUNC) {
                //we need to truncate the log to the lastzxid of the leader
                LOG.warn("Truncating log to get in sync with the leader 0x"
                        + Long.toHexString(qp.getZxid()));
                boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
                if (!truncated) {
                    // not able to truncate the log
                    LOG.error("Not able to truncate the log "
                            + Long.toHexString(qp.getZxid()));
                    System.exit(13);
                }
                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());

            }
            else {
                LOG.error("Got unexpected packet from leader: {}, exiting ... ",
                          LearnerHandler.packetToString(qp));
                System.exit(13);

            }
            zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
            zk.createSessionTracker();            
            
            long lastQueued = 0;

            // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
            // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
            // we need to make sure that we don't take the snapshot twice.
            boolean isPreZAB1_0 = true;
            //If we are not going to take the snapshot be sure the transactions are not applied in memory
            // but written out to the transaction log
            boolean writeToTxnLog = !snapshotNeeded;
            // we are now going to start getting transactions to apply followed by an UPTODATE
            outerLoop:
            while (self.isRunning()) {
                readPacket(qp);
                switch(qp.getType()) {
                case Leader.PROPOSAL:
                    PacketInFlight pif = new PacketInFlight();
                    pif.hdr = new TxnHeader();
                    pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
                    if (pif.hdr.getZxid() != lastQueued + 1) {
                    LOG.warn("Got zxid 0x"
                            + Long.toHexString(pif.hdr.getZxid())
                            + " expected 0x"
                            + Long.toHexString(lastQueued + 1));
                    }
                    lastQueued = pif.hdr.getZxid();
                    
                    if (pif.hdr.getType() == OpCode.reconfig){                
                        SetDataTxn setDataTxn = (SetDataTxn) pif.rec;       
                       QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
                       self.setLastSeenQuorumVerifier(qv, true);                               
                    }
                    
                    packetsNotCommitted.add(pif);
                    break;
                case Leader.COMMIT:
                case Leader.COMMITANDACTIVATE:
                    pif = packetsNotCommitted.peekFirst();
                    if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
                        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData()));
                        boolean majorChange = self.processReconfig(qv, ByteBuffer.wrap(qp.getData()).getLong(),
                                qp.getZxid(), true);
                        if (majorChange) {
                            throw new Exception("changes proposed in reconfig");
                        }
                    }
                    if (!writeToTxnLog) {
                        if (pif.hdr.getZxid() != qp.getZxid()) {
                            LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                        } else {
                            zk.processTxn(pif.hdr, pif.rec);
                            packetsNotCommitted.remove();
                        }
                    } else {
                        packetsCommitted.add(qp.getZxid());
                    }
                    break;
                case Leader.INFORM:
                case Leader.INFORMANDACTIVATE:
                    PacketInFlight packet = new PacketInFlight();
                    packet.hdr = new TxnHeader();

                    if (qp.getType() == Leader.INFORMANDACTIVATE) {
                        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
                        long suggestedLeaderId = buffer.getLong();
                        byte[] remainingdata = new byte[buffer.remaining()];
                        buffer.get(remainingdata);
                        packet.rec = SerializeUtils.deserializeTxn(remainingdata, packet.hdr);
                        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn)packet.rec).getData()));
                        boolean majorChange =
                                self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
                        if (majorChange) {
                            throw new Exception("changes proposed in reconfig");
                        }
                    } else {
                        packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
                        // Log warning message if txn comes out-of-order
                        if (packet.hdr.getZxid() != lastQueued + 1) {
                            LOG.warn("Got zxid 0x"
                                    + Long.toHexString(packet.hdr.getZxid())
                                    + " expected 0x"
                                    + Long.toHexString(lastQueued + 1));
                        }
                        lastQueued = packet.hdr.getZxid();
                    }
                    if (!writeToTxnLog) {
                        // Apply to db directly if we haven't taken the snapshot
                        zk.processTxn(packet.hdr, packet.rec);
                    } else {
                        packetsNotCommitted.add(packet);
                        packetsCommitted.add(qp.getZxid());
                    }

                    break;                
                case Leader.UPTODATE:
                    LOG.info("Learner received UPTODATE message");                                      
                    if (newLeaderQV!=null) {
                       boolean majorChange =
                           self.processReconfig(newLeaderQV, null, null, true);
                       if (majorChange) {
                           throw new Exception("changes proposed in reconfig");
                       }
                    }
                    if (isPreZAB1_0) {
                        zk.takeSnapshot(syncSnapshot);
                        self.setCurrentEpoch(newEpoch);
                    }
                    self.setZooKeeperServer(zk);
                    self.adminServer.setZooKeeperServer(zk);
                    break outerLoop;
                case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery 
                    // means this is Zab 1.0
                   LOG.info("Learner received NEWLEADER message");
                   if (qp.getData()!=null && qp.getData().length > 1) {
                       try {                       
                           QuorumVerifier qv = self.configFromString(new String(qp.getData()));
                           self.setLastSeenQuorumVerifier(qv, true);
                           newLeaderQV = qv;
                       } catch (Exception e) {
                           e.printStackTrace();
                       }
                   }

                   if (snapshotNeeded) {
                       zk.takeSnapshot(syncSnapshot);
                   }
                   
                    self.setCurrentEpoch(newEpoch);
                    writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
                    isPreZAB1_0 = false;
                    writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                    break;
                }
            }
        }
        ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
        writePacket(ack, true);
        sock.setSoTimeout(self.tickTime * self.syncLimit);
        zk.startup();
        /*
         * Update the election vote here to ensure that all members of the
         * ensemble report the same vote to new servers that start up and
         * send leader election notifications to the ensemble.
         * 
         * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
         */
        self.updateElectionVote(newEpoch);

        // We need to log the stuff that came in between the snapshot and the uptodate
        if (zk instanceof FollowerZooKeeperServer) {
            FollowerZooKeeperServer fzk = (FollowerZooKeeperServer)zk;
            for(PacketInFlight p: packetsNotCommitted) {
                fzk.logRequest(p.hdr, p.rec);
            }
            for(Long zxid: packetsCommitted) {
                fzk.commit(zxid);
            }
        } else if (zk instanceof ObserverZooKeeperServer) {
            // Similar to follower, we need to log requests between the snapshot
            // and UPTODATE
            ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
            for (PacketInFlight p : packetsNotCommitted) {
                Long zxid = packetsCommitted.peekFirst();
                if (p.hdr.getZxid() != zxid) {
                    // log warning message if there is no matching commit
                    // old leader send outstanding proposal to observer
                    LOG.warn("Committing " + Long.toHexString(zxid)
                            + ", but next proposal is "
                            + Long.toHexString(p.hdr.getZxid()));
                    continue;
                }
                packetsCommitted.remove();
                Request request = new Request(null, p.hdr.getClientId(),
                        p.hdr.getCxid(), p.hdr.getType(), null, null);
                request.setTxn(p.rec);
                request.setHdr(p.hdr);
                ozk.commitRequest(request);
            }
        } else {
            // New server type need to handle in-flight packets
            throw new UnsupportedOperationException("Unknown server type");
        }
    }           

複制

大緻流程為:首先會判斷第一個接收到的消息類型是DIFF,SNAP還是TRUNC,分别進行不同的資料同步準備。然後開始不斷讀取同步消息,直到接收到NEWLEADER消息後,發送ACK給leader伺服器,等待leader伺服器的UPTODATE消息,表示同步完成,然後再發送ACK給leader伺服器,表示learner伺服器也知道了,開始啟動zkServer,對外提供服務。

本文參與 騰訊雲自媒體分享計劃 ,歡迎熱愛寫作的你一起參與!

本文分享自作者個人站點/部落格

https://www.jianshu.com/u/33d6034f5539

複制

如有侵權,請聯系 [email protected] 删除。