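A 10,000-Word Deep Dive: Analyzing the boltdb Source Code from the Bottom Up (Part 2)

  Author: jaydenwen, backend development engineer at Tencent PCG

  This part (Part 2) covers sections 3.3 through 6. For the preceding content, see Part 1: A 10,000-Word Deep Dive: Analyzing the boltdb Source Code from the Bottom Up (Part 1)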

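  3.3 Operations on node

  Before analyzing node, let us look at how the official source describes it:

  node represents an in-memory, deserialized page

  A node may be a leaf node, the root node, or a branch node. It is the in-memory representation of a page read from the physical disk.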

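  3.3.1 Definition of node

  // node represents an in-memory, deserialized page.
  type node struct {
      bucket     *Bucket // the bucket this node belongs to
      isLeaf     bool
      unbalanced bool    // true after a deletion: the node may need merging during rebalance
      spilled    bool    // true once the node has been spilled (split and written to dirty pages)
      key        []byte  // the first (smallest) key held by the node
      pgid       pgid    // id of the page associated with this node
      parent     *node   // parent of this node
      children   nodes   // child nodes of this node
      inodes     inodes  // the index data stored in this node
  }

  // inode represents an internal node inside of a node.
  // It can be used to point to elements in a page or point
  // to an element which hasn't been added to a page yet.
  type inode struct {
      // Distinguishes a sub-bucket leaf element from an ordinary leaf element.
      // A flags value of 1 marks a sub-bucket; anything else is an ordinary leaf element.
      flags uint32
      // pgid is set only when the inode is a branch element; it is empty for a leaf element.
      pgid pgid
      key  []byte
      // value is empty when the inode is a branch element and set only for a leaf element.
      value []byte
  }

  type inodes []inode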

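  3.3.2 Converting between node and page

  A node has two methods, read(page) and write(page). read(page) builds a node from a page, while write(page) writes the current node into a page. The conversion between node and page was already covered in section 2.4, so it is not repeated here.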

  3.3.3 Insert, lookup, update, and delete on node

  put(k,v)

  // put inserts a key/value.
  // When putting an ordinary key/value pair, no pgid is needed.
  // When putting a branch element, pgid is required and value is not.
  func (n *node) put(oldKey, newKey, value []byte, pgid pgid, flags uint32) {
      if pgid >= n.bucket.tx.meta.pgid {
          panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", pgid, n.bucket.tx.meta.pgid))
      } else if len(oldKey) <= 0 {
          panic("put: zero-length old key")
      } else if len(newKey) <= 0 {
          panic("put: zero-length new key")
      }

      // Find insertion index.
      index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, oldKey) != -1 })

      // Add capacity and shift nodes if we don't have an exact match and need to insert.
      exact := (len(n.inodes) > 0 && index < len(n.inodes) && bytes.Equal(n.inodes[index].key, oldKey))
      if !exact {
          n.inodes = append(n.inodes, inode{})
          copy(n.inodes[index+1:], n.inodes[index:])
      }

      inode := &n.inodes[index]
      inode.flags = flags
      inode.key = newKey
      inode.value = value
      inode.pgid = pgid
      _assert(len(inode.key) > 0, "put: zero-length inode key")
  }

  get(k)

  node has no get(k) method. Lookups are answered by the Cursor, which returns the data directly; see the keyValue() method of Cursor.

  del(k)

  // del removes a key from the node.
  func (n *node) del(key []byte) {
      // Find index of key.
      index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, key) != -1 })

      // Exit if the key isn't found.
      if index >= len(n.inodes) || !bytes.Equal(n.inodes[index].key, key) {
          return
      }

      // Delete inode from the node.
      n.inodes = append(n.inodes[:index], n.inodes[index+1:]...)

      // Mark the node as needing rebalancing.
      n.unbalanced = true
  }
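  The sorted-slice technique behind put and del can be tried in isolation. The following is a minimal sketch, not boltdb's code: the entry and leaf types are hypothetical stand-ins for inode and node, and only the search, insert-or-overwrite, and delete logic is modeled.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
	"strings"
)

// entry is a hypothetical, simplified stand-in for boltdb's inode.
type entry struct {
	key, value []byte
}

// leaf is a hypothetical stand-in for a leaf node: entries kept sorted by key.
type leaf struct {
	entries    []entry
	unbalanced bool
}

// search returns the index of the first entry whose key is >= key.
func (l *leaf) search(key []byte) int {
	return sort.Search(len(l.entries), func(i int) bool {
		return bytes.Compare(l.entries[i].key, key) >= 0
	})
}

// put overwrites the value on an exact match, otherwise inserts in key order.
func (l *leaf) put(key, value []byte) {
	i := l.search(key)
	exact := i < len(l.entries) && bytes.Equal(l.entries[i].key, key)
	if !exact {
		// Grow by one slot and shift the tail right to open a gap at i.
		l.entries = append(l.entries, entry{})
		copy(l.entries[i+1:], l.entries[i:])
	}
	l.entries[i] = entry{key: key, value: value}
}

// del removes the key if present and flags the leaf for rebalancing.
func (l *leaf) del(key []byte) {
	i := l.search(key)
	if i >= len(l.entries) || !bytes.Equal(l.entries[i].key, key) {
		return
	}
	l.entries = append(l.entries[:i], l.entries[i+1:]...)
	l.unbalanced = true
}

// keys returns the stored keys in order, joined by commas.
func (l *leaf) keys() string {
	parts := make([]string, 0, len(l.entries))
	for _, e := range l.entries {
		parts = append(parts, string(e.key))
	}
	return strings.Join(parts, ",")
}

func main() {
	l := &leaf{}
	for _, k := range []string{"c", "a", "b"} {
		l.put([]byte(k), []byte("v-"+k))
	}
	l.put([]byte("b"), []byte("v2")) // overwrite, no new entry
	l.del([]byte("a"))
	fmt.Println(l.keys(), l.unbalanced)
}
```

  Keeping the slice sorted is what lets both operations locate their position with a binary search instead of a scan.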

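  3.3.4 Splitting and merging nodes

  We have looked at the put and del operations on a node. After these operations the corresponding page may end up overfilled or underfilled, which is where node splitting and merging come in. A brief description of each follows.

  Split: when a node holds too much data, most simply when it exceeds the page fill percentage, the node has to be split in two, which means one page of data is stored across two pages. This is implemented in the spill() method.

  spill writes the nodes to dirty pages and splits nodes as it goes. Returns an error if dirty pages cannot be allocated.

  Merge: after one entry or a batch of entries is deleted, the fill percentage of a page may become too low and waste a fair amount of space, so data should be merged across pages. This is implemented in the rebalance() method.

  rebalance attempts to combine the node with sibling nodes if the node fill size is below a threshold or if there are not enough keys.

  Because the code is long it is not reproduced here; interested readers can follow the two links given in the original post.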
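  As a rough illustration of the two decisions described above, the sketch below picks a split point from a list of element sizes and decides whether a node is small enough to merge. It is a simplification under stated assumptions: page header overhead is ignored, minKeysPerPage = 2 and the one-quarter-page merge threshold are assumed values, and the function names are made up for this example.

```go
package main

import "fmt"

// minKeysPerPage is the assumed minimum number of keys kept on each page.
const minKeysPerPage = 2

// splitIndex picks where to cut a node whose elements have the given byte
// sizes. The first part is filled up to pageSize*fillPercent while both parts
// keep at least minKeysPerPage elements. It returns len(sizes) when the node
// fits in one page or is too small to split.
func splitIndex(sizes []int, pageSize int, fillPercent float64) int {
	total := 0
	for _, s := range sizes {
		total += s
	}
	if total <= pageSize || len(sizes) <= 2*minKeysPerPage {
		return len(sizes)
	}
	threshold := int(float64(pageSize) * fillPercent)
	used, index := 0, 0
	for i := 0; i < len(sizes)-minKeysPerPage; i++ {
		index = i
		// Stop once the minimum is met and this element would overflow.
		if i >= minKeysPerPage && used+sizes[i] > threshold {
			break
		}
		used += sizes[i]
	}
	return index
}

// needsRebalance reports whether a node should be merged with a sibling:
// its size is at most a quarter of a page, or it has too few keys.
func needsRebalance(size, pageSize, keys, minKeys int) bool {
	return size <= pageSize/4 || keys <= minKeys
}

func main() {
	sizes := []int{1000, 1000, 1000, 1000, 1000, 1000}
	i := splitIndex(sizes, 4096, 0.5)
	fmt.Println("split at", i, "->", sizes[:i], sizes[i:])
	fmt.Println("merge small node:", needsRebalance(500, 4096, 10, 1))
}
```

  A lower fill percentage leaves more free room in each page after a split, which trades space for fewer future splits.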

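  3.4 Bucket operations

  Having covered how to traverse and search a Bucket, we now look at how to create, retrieve, and delete a Bucket.

  3.4.1 Creating a Bucket

  1. CreateBucketIfNotExists() and CreateBucket()

  CreateBucket creates a Bucket for the given key. If a Bucket with that key already exists, it returns ErrBucketExists; if the key already holds an ordinary value, it returns ErrIncompatibleValue. Otherwise it finds the right position in the current Bucket, creates a new Bucket, inserts it there, and returns it to the caller. CreateBucketIfNotExists wraps CreateBucket and returns the existing Bucket instead of ErrBucketExists.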

  // CreateBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it.
  // Returns an error if the bucket name is blank, or if the bucket name is too long.
  // The bucket instance is only valid for the lifetime of the transaction.
  func (b *Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error) {
      child, err := b.CreateBucket(key)
      if err == ErrBucketExists {
          return b.Bucket(key), nil
      } else if err != nil {
          return nil, err
      }
      return child, nil
  }

  // CreateBucket creates a new bucket at the given key and returns the new bucket.
  // Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long.
  func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
      // ...
      // Error checks omitted.

      // Move cursor to correct position.
      // Get a cursor, then seek to the position where the key belongs.
      c := b.Cursor()
      k, _, flags := c.seek(key)

      // Return an error if there is an existing key.
      if bytes.Equal(key, k) {
          // The key is a bucket and already exists.
          if (flags & bucketLeafFlag) != 0 {
              return nil, ErrBucketExists
          }
          // The key exists but is not a bucket.
          return nil, ErrIncompatibleValue
      }

      // Create empty, inline bucket.
      var bucket = Bucket{
          bucket:      &bucket{},
          rootNode:    &node{isLeaf: true},
          FillPercent: DefaultFillPercent,
      }
      // Serialize the bucket into the value stored under the key.
      var value = bucket.write()

      // Insert into node.
      key = cloneBytes(key)
      // c.node() materializes the tree in memory by calling n.read(page).
      c.node().put(key, key, value, 0, bucketLeafFlag)

      // Since subbuckets are not allowed on inline buckets, we need to
      // dereference the inline page, if it exists. This will cause the bucket
      // to be treated as a regular, non-inline bucket for the rest of the tx.
      b.page = nil

      // Look the bucket up by key and return it.
      return b.Bucket(key), nil
  }

  // write allocates and writes a bucket to a byte slice.
  // For an inline bucket, the bytes after bucketHeaderSize in the value hold its page data.
  func (b *Bucket) write() []byte {
      // Allocate the appropriate size.
      var n = b.rootNode
      var value = make([]byte, bucketHeaderSize+n.size())

      // Write a bucket header.
      var bucket = (*bucket)(unsafe.Pointer(&value[0]))
      *bucket = *b.bucket

      // Convert byte slice to a fake page and write the root node.
      var p = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
      // Store the bucket's elements compactly inside value.
      n.write(p)

      return value
  }

  // node returns the node that the cursor is currently positioned on.
  func (c *Cursor) node() *node {
      _assert(len(c.stack) > 0, "accessing a node with a zero-length cursor stack")

      // If the top of the stack is a leaf node then just return it.
      if ref := &c.stack[len(c.stack)-1]; ref.node != nil && ref.isLeaf() {
          return ref.node
      }

      // Start from root and traverse down the hierarchy.
      var n = c.stack[0].node
      if n == nil {
          n = c.bucket.node(c.stack[0].page.id, nil)
      }
      // Walk down through the non-leaf levels.
      for _, ref := range c.stack[:len(c.stack)-1] {
          _assert(!n.isLeaf, "expected branch node")
          n = n.childAt(int(ref.index))
      }
      _assert(n.isLeaf, "expected leaf node")
      return n
  }

  For the implementation of put, see put(k,v) in section 3.3.
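  The value layout produced by write() (a fixed bucket header followed by the inline page data) can be illustrated without unsafe pointers. This sketch assumes a 16-byte header made of a root page id and a sequence number, and it uses explicit little-endian encoding, whereas boltdb overlays the struct on the byte slice in native byte order. encodeBucket and decodeBucket are hypothetical helper names.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// bucketHeaderSize is the assumed header size: root page id (8 bytes)
// followed by a sequence number (8 bytes).
const bucketHeaderSize = 16

// encodeBucket lays out a bucket value: header first, then the serialized
// inline page, if any.
func encodeBucket(root, sequence uint64, inlinePage []byte) []byte {
	value := make([]byte, bucketHeaderSize+len(inlinePage))
	binary.LittleEndian.PutUint64(value[0:8], root)
	binary.LittleEndian.PutUint64(value[8:16], sequence)
	copy(value[bucketHeaderSize:], inlinePage)
	return value
}

// decodeBucket reverses encodeBucket. A root of 0 marks an inline bucket,
// whose page data follows the header inside the same value.
func decodeBucket(value []byte) (root, sequence uint64, inlinePage []byte) {
	root = binary.LittleEndian.Uint64(value[0:8])
	sequence = binary.LittleEndian.Uint64(value[8:16])
	if root == 0 {
		inlinePage = value[bucketHeaderSize:]
	}
	return root, sequence, inlinePage
}

func main() {
	v := encodeBucket(0, 7, []byte("page"))
	root, seq, page := decodeBucket(v)
	fmt.Println(len(v), root, seq, string(page))
}
```

  This mirrors the check in openBucket shown in the next section: a bucket whose root is 0 has no page of its own and reads its contents from the bytes after the header.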

  3.4.2 Retrieving a Bucket

  Bucket() retrieves a Bucket by key and returns nil if it cannot be found.

  // Bucket retrieves a nested bucket by name.
  // Returns nil if the bucket does not exist.
  func (b *Bucket) Bucket(name []byte) *Bucket {
      if b.buckets != nil {
          if child := b.buckets[string(name)]; child != nil {
              return child
          }
      }

      // Move cursor to key.
      // Use a cursor to look the key up.
      c := b.Cursor()
      k, v, flags := c.seek(name)

      // Return nil if the key doesn't exist or it is not a bucket.
      if !bytes.Equal(name, k) || (flags&bucketLeafFlag) == 0 {
          return nil
      }

      // Otherwise create a bucket and cache it.
      // Open the bucket from the value that was found.
      var child = b.openBucket(v)
      // Cache it to speed up later lookups.
      if b.buckets != nil {
          b.buckets[string(name)] = child
      }

      return child
  }

  // Helper method that re-interprets a sub-bucket value
  // from a parent into a Bucket
  func (b *Bucket) openBucket(value []byte) *Bucket {
      var child = newBucket(b.tx)

      // If unaligned load/stores are broken on this arch and value is
      // unaligned simply clone to an aligned byte array.
      unaligned := brokenUnaligned && uintptr(unsafe.Pointer(&value[0]))&3 != 0
      if unaligned {
          value = cloneBytes(value)
      }

      // If this is a writable transaction then we need to copy the bucket entry.
      // Read-only transactions can point directly at the mmap entry.
      if b.tx.writable && !unaligned {
          child.bucket = &bucket{}
          *child.bucket = *(*bucket)(unsafe.Pointer(&value[0]))
      } else {
          child.bucket = (*bucket)(unsafe.Pointer(&value[0]))
      }

      // Save a reference to the inline page if the bucket is inline.
      // A root of 0 marks an inline bucket.
      if child.root == 0 {
          child.page = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
      }

      return &child
  }

  3.4.3 Deleting a Bucket

  DeleteBucket() deletes the Bucket stored under the given key. Internally it first deletes the child buckets recursively, then frees the pages of the Bucket, and finally removes the entry from the leaf node.

  // DeleteBucket deletes a bucket at the given key.
  // Returns an error if the bucket does not exists, or if the key represents a non-bucket value.
  func (b *Bucket) DeleteBucket(key []byte) error {
      // Error checks omitted.

      // Move cursor to correct position.
      c := b.Cursor()
      k, _, flags := c.seek(key)

      // Return an error if bucket doesn't exist or is not a bucket.
      if !bytes.Equal(key, k) {
          return ErrBucketNotFound
      } else if (flags & bucketLeafFlag) == 0 {
          return ErrIncompatibleValue
      }

      // Recursively delete all child buckets.
      child := b.Bucket(key)
      // A nil value marks a nested bucket; delete every one of them.
      err := child.ForEach(func(k, v []byte) error {
          if v == nil {
              if err := child.DeleteBucket(k); err != nil {
                  return fmt.Errorf("delete bucket: %s", err)
              }
          }
          return nil
      })
      if err != nil {
          return err
      }

      // Remove cached copy.
      delete(b.buckets, string(key))

      // Release all bucket pages to freelist.
      child.nodes = nil
      child.rootNode = nil
      child.free()

      // Delete the node if we have a matching key.
      c.node().del(key)

      return nil
  }

  For the implementation of node's del() method, see del(k) in section 3.3.

  3.5 Inserting, retrieving, and deleting key/value pairs

  The previous section covered how to create and retrieve a Bucket. With a Bucket in hand we can operate on what we care about most: key/value pairs. In essence, every key/value operation ends up on the underlying node, because the node is what stores the actual data.

  3.5.1 Inserting a key/value pair

  // Put sets the value for a key in the bucket.
  // If the key exist then its previous value will be overwritten.
  // Supplied value must remain valid for the life of the transaction.
  // Returns an error if the bucket was created from a read-only transaction,
  // if the key is blank, if the key is too large, or if the value is too large.
  func (b *Bucket) Put(key []byte, value []byte) error {
      if b.tx.db == nil {
          return ErrTxClosed
      } else if !b.Writable() {
          return ErrTxNotWritable
      } else if len(key) == 0 {
          return ErrKeyRequired
      } else if len(key) > MaxKeySize {
          return ErrKeyTooLarge
      } else if int64(len(value)) > MaxValueSize {
          return ErrValueTooLarge
      }

      // Move cursor to correct position.
      c := b.Cursor()
      k, _, flags := c.seek(key)

      // Return an error if there is an existing key with a bucket value.
      if bytes.Equal(key, k) && (flags&bucketLeafFlag) != 0 {
          return ErrIncompatibleValue
      }

      // Insert into node.
      key = cloneBytes(key)
      c.node().put(key, key, value, 0, 0)

      return nil
  }

  3.5.2 Retrieving a key/value pair

  // Get retrieves the value for a key in the bucket.
  // Returns a nil value if the key does not exist or if the key is a nested bucket.
  // The returned value is only valid for the life of the transaction.
  func (b *Bucket) Get(key []byte) []byte {
      k, v, flags := b.Cursor().seek(key)

      // Return nil if this is a bucket.
      if (flags & bucketLeafFlag) != 0 {
          return nil
      }

      // If our target node isn't the same key as what's passed in then return nil.
      if !bytes.Equal(key, k) {
          return nil
      }
      return v
  }

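  3.5.3 Deleting a key/value pair

  // Delete removes a key from the bucket.
  // If the key does not exist then nothing is done and a nil error is returned.
  // Returns an error if the bucket was created from a read-only transaction.
  func (b *Bucket) Delete(key []byte) error {
      // Error checks omitted.

      // Move cursor to correct position.
      c := b.Cursor()
      _, _, flags := c.seek(key)

      // Return an error if there is already existing bucket value.
      if (flags & bucketLeafFlag) != 0 {
          return ErrIncompatibleValue
      }

      // Delete the node if we have a matching key.
      c.node().del(key)

      return nil
  }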

  3.5.4 Iterating over all key/value pairs in a Bucket

  // ForEach executes a function for each key/value pair in a bucket.
  // If the provided function returns an error then the iteration is stopped and
  // the error is returned to the caller. The provided function must not modify
  // the bucket; this will result in undefined behavior.
  func (b *Bucket) ForEach(fn func(k, v []byte) error) error {
      // Error checks omitted.
      c := b.Cursor()
      // Iterate over the key/value pairs.
      for k, v := c.First(); k != nil; k, v = c.Next() {
          if err := fn(k, v); err != nil {
              return err
          }
      }
      return nil
  }

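  3.6 Page splitting and page merging in Bucket

  This part is too long to include here; readers can follow the two links given in the original post.

  3.7 Summary

  This chapter introduced two of the core data structures in boltdb: Bucket and node. Why cover them together? Because in boltdb a Bucket corresponds to one B+ tree, and the structure and organization of that B+ tree (root node, leaf nodes, non-leaf nodes) are all built from node.

  The discussion centered on creating, retrieving, deleting, and traversing a Bucket, and on inserting, updating, and deleting key/value pairs. Traversal brought in the Cursor data structure: in boltdb, traversing a Bucket (one B+ tree) is done by keeping the traversal path on a stack, which is the purpose of the stack field in Cursor.

  Because every key/value operation on a Bucket is reflected in the underlying node, we also covered the related node methods, such as put, get, del, spill, and rebalance.

  At this point it should be clear how data is stored and organized, and how in-memory and on-disk data are converted and mapped to each other. The next chapter covers transactions in boltdb. Only with transactions can a database be called complete.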

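  4. Transaction control in boltdb

  Transactions are an indispensable feature of a database, and boltdb is no exception. Transactions immediately bring the four ACID properties to mind, so let us see how boltdb actually implements them.

  4.1 Overview of boltdb transactions

  First, here is how the official boltdb documentation describes transactions: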

  Bolt allows only one read-write transaction at a time but allows as many read-only transactions as you want at a time. Each transaction has a consistent view of the data as it existed when the transaction started.

  Individual transactions and all objects created from them (e.g. buckets, keys) are not thread safe. To work with data in multiple goroutines you must start a transaction for each one or use locking to ensure only one goroutine accesses a transaction at a time. Creating transaction from the DB is thread safe.

  Read-only transactions and read-write transactions should not depend on one another and generally shouldn't be opened simultaneously in the same goroutine. This can cause a deadlock as the read-write transaction needs to periodically re-map the data file but it cannot do so while a read-only transaction is open.

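  To summarize: boltdb supports two kinds of transactions, read-write and read-only. At any moment there can be at most one read-write transaction running, while any number of read-only transactions may run at the same time. Each transaction has its own consistent view of the data.

  Note that when opening a database, boltdb offers two options: read-only mode and read-write mode. Internally it takes a different file lock (flock) depending on the option: a shared lock for read-only mode and an exclusive lock for read-write mode. The locking and unlocking code can be found in bolt_unix.go and bolt_windows.go.

  The ACID properties themselves are not discussed further here; readers can look them up on their own. Let us move on to the main topic.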
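  The "one writer, many readers, each with a consistent view" rule summarized above can be modeled in a few lines. This is a toy sketch and not how boltdb stores data: the store, readTx, and writeTx types are hypothetical, and the writer copies a whole map where boltdb copies only the pages it modifies.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical toy key/value store: one writer at a time, any
// number of readers, each reading a fixed snapshot.
type store struct {
	writer  sync.Mutex        // held for the lifetime of a read-write transaction
	mu      sync.RWMutex      // protects the current pointer
	current map[string]string // never modified after it is published
}

type readTx struct{ snap map[string]string }

type writeTx struct {
	s     *store
	draft map[string]string
}

func newStore() *store { return &store{current: map[string]string{}} }

// beginRead captures the currently published version.
func (s *store) beginRead() *readTx {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return &readTx{snap: s.current}
}

// beginWrite blocks until no other writer is active, then works on a copy.
func (s *store) beginWrite() *writeTx {
	s.writer.Lock()
	s.mu.RLock()
	draft := make(map[string]string, len(s.current))
	for k, v := range s.current {
		draft[k] = v
	}
	s.mu.RUnlock()
	return &writeTx{s: s, draft: draft}
}

func (t *writeTx) put(k, v string) { t.draft[k] = v }

// commit publishes the draft and releases the writer lock.
func (t *writeTx) commit() {
	t.s.mu.Lock()
	t.s.current = t.draft
	t.s.mu.Unlock()
	t.s.writer.Unlock()
}

// rollback discards the draft and releases the writer lock.
func (t *writeTx) rollback() { t.s.writer.Unlock() }

func (r *readTx) get(k string) (string, bool) {
	v, ok := r.snap[k]
	return v, ok
}

func main() {
	s := newStore()
	w := s.beginWrite()
	w.put("a", "1")
	w.commit()

	old := s.beginRead() // sees a=1 for as long as it lives
	w = s.beginWrite()
	w.put("a", "2")
	w.commit()

	v1, _ := old.get("a")
	v2, _ := s.beginRead().get("a")
	fmt.Println(v1, v2)
}
```

  A reader that began before a commit keeps seeing the old version, which is the in-memory analogue of a read-only transaction holding on to old pages.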

  4.2 Definition of the boltdb transaction type Tx

  // txid represents the internal transaction identifier.
  type txid uint64

  // Tx represents a read-only or read/write transaction on the database.
  // Read-only transactions can be used for retrieving values for keys and creating cursors.
  // Read/write transactions can create and remove buckets and create and remove keys.
  //
  // IMPORTANT: You must commit or rollback transactions when you are done with
  // them. Pages can not be reclaimed by the writer until no more transactions
  // are using them. A long running read transaction can cause the database to
  // quickly grow.
  //
  // Tx wraps both read and write transactions; the writable field tells them apart.
  type Tx struct {
      writable bool
      managed  bool
      db       *DB
      meta     *meta
      root     Bucket
      pages    map[pgid]*page
      stats    TxStats
      // Actions to run on commit.
      commitHandlers []func()

      // WriteFlag specifies the flag for write-related methods like WriteTo().
      // Tx opens the database file with the specified flag to copy the data.
      //
      // By default, the flag is unset, which works well for mostly in-memory
      // workloads. For databases that are much larger than available RAM,
      // set the flag to syscall.O_DIRECT to avoid trashing the page cache.
      WriteFlag int
  }

  // init initializes the transaction.
  func (tx *Tx) init(db *DB) {
      tx.db = db
      tx.pages = nil

      // Copy the meta page since it can be changed by the writer.
      tx.meta = &meta{}
      db.meta().copy(tx.meta)

      // Copy over the root bucket.
      tx.root = newBucket(tx)
      tx.root.bucket = &bucket{}
      // For a freshly initialized file, meta.root is bucket{root: 3}.
      *tx.root.bucket = tx.meta.root

      // Increment the transaction id and add a page cache for writable transactions.
      if tx.writable {
          tx.pages = make(map[pgid]*page)
          tx.meta.txid += txid(1)
      }
  }

  4.3 Implementation of Begin()

  Note that in boltdb the method that starts a transaction is bound to the DB object; for completeness, Begin() is covered here. As mentioned earlier, boltdb has two kinds of transactions, and which logic runs depends on the argument passed when the transaction is started. A read-write transaction takes a lock when it starts, db.rwlock.Lock(), and releases it with db.rwlock.Unlock() only when the transaction commits or rolls back. This confirms the earlier statement that only one read-write transaction can run at a time.

  // Begin starts a new transaction.
  // Multiple read-only transactions can be used concurrently but only one
  // write transaction can be used at a time. Starting multiple write transactions
  // will cause the calls to block and be serialized until the current write
  // transaction finishes.
  //
  // Transactions should not be dependent on one another. Opening a read
  // transaction and a write transaction in the same goroutine can cause the
  // writer to deadlock because the database periodically needs to re-mmap itself
  // as it grows and it cannot do that while a read transaction is open.
  //
  // If a long running read transaction (for example, a snapshot transaction) is
  // needed, you might want to set DB.InitialMmapSize to a large enough value
  // to avoid potential blocking of write transaction.
  //
  // IMPORTANT: You must close read-only transactions after you are finished or
  // else the database will not reclaim old pages.
  func (db *DB) Begin(writable bool) (*Tx, error) {
      if writable {
          return db.beginRWTx()
      }
      return db.beginTx()
  }

  func (db *DB) beginTx() (*Tx, error) {
      // Lock the meta pages while we initialize the transaction. We obtain
      // the meta lock before the mmap lock because that's the order that the
      // write transaction will obtain them.
      db.metalock.Lock()

      // Obtain a read-only lock on the mmap. When the mmap is remapped it will
      // obtain a write lock so all transactions must finish before it can be
      // remapped.
      db.mmaplock.RLock()

      // Exit if the database is not open yet.
      if !db.opened {
          db.mmaplock.RUnlock()
          db.metalock.Unlock()
          return nil, ErrDatabaseNotOpen
      }

      // Create a transaction associated with the database.
      t := &Tx{}
      t.init(db)

      // Keep track of transaction until it closes.
      db.txs = append(db.txs, t)
      n := len(db.txs)

      // Unlock the meta pages.
      db.metalock.Unlock()

      // Update the transaction stats.
      db.statlock.Lock()
      db.stats.TxN++
      db.stats.OpenTxN = n
      db.statlock.Unlock()

      return t, nil
  }

  func (db *DB) beginRWTx() (*Tx, error) {
      // If the database was opened with Options.ReadOnly, return an error.
      if db.readOnly {
          return nil, ErrDatabaseReadOnly
      }

      // Obtain writer lock. This is released by the transaction when it closes.
      // This enforces only one writer transaction at a time.
      db.rwlock.Lock()

      // Once we have the writer lock then we can lock the meta pages so that
      // we can set up the transaction.
      db.metalock.Lock()
      defer db.metalock.Unlock()

      // Exit if the database is not open yet.
      if !db.opened {
          db.rwlock.Unlock()
          return nil, ErrDatabaseNotOpen
      }

      // Create a transaction associated with the database.
      t := &Tx{writable: true}
      t.init(db)
      db.rwtx = t

      // Free any pages associated with closed read-only transactions.
      var minid txid = 0xFFFFFFFFFFFFFFFF
      // Find the smallest transaction id among the open read transactions.
      for _, t := range db.txs {
          if t.meta.txid < minid {
              minid = t.meta.txid
          }
      }
      if minid > 0 {
          // Release the pages freed by earlier transactions. They could not be
          // released sooner: even after the transaction that freed them had
          // finished, other read transactions might still have been using them.
          db.freelist.release(minid - 1)
      }

      return t, nil
  }
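  The interplay between the minimum open transaction id and the freelist can be sketched on its own. This is a simplified model under assumptions: the freelist type here only tracks page ids in memory, and free, release, and minOpenTxid are illustrative names whose behavior follows the description above rather than boltdb's exact code.

```go
package main

import (
	"fmt"
	"sort"
)

// freelist is a simplified model: ids are reusable pages, pending maps the id
// of the write transaction that freed a page to the pages it freed.
type freelist struct {
	ids     []uint64
	pending map[uint64][]uint64
}

// free records pages given up by write transaction txid. They are not yet
// reusable because older readers may still reference them.
func (f *freelist) free(txid uint64, pages ...uint64) {
	f.pending[txid] = append(f.pending[txid], pages...)
}

// release makes reusable every page freed by a transaction with id <= txid.
func (f *freelist) release(txid uint64) {
	for tid, pages := range f.pending {
		if tid <= txid {
			f.ids = append(f.ids, pages...)
			delete(f.pending, tid)
		}
	}
	sort.Slice(f.ids, func(i, j int) bool { return f.ids[i] < f.ids[j] })
}

// minOpenTxid returns the smallest id among open read transactions, or the
// maximum uint64 when none are open.
func minOpenTxid(open []uint64) uint64 {
	lowest := ^uint64(0)
	for _, id := range open {
		if id < lowest {
			lowest = id
		}
	}
	return lowest
}

func main() {
	f := &freelist{pending: map[uint64][]uint64{}}
	f.free(5, 10, 11) // writer 5 replaced pages 10 and 11
	f.free(7, 12)     // writer 7 replaced page 12

	// Readers at txid 6 and 9 are open: only pages freed by tx <= 5 are safe.
	f.release(minOpenTxid([]uint64{6, 9}) - 1)
	fmt.Println(f.ids, len(f.pending))

	// No readers left: everything pending can be reused.
	f.release(minOpenTxid(nil) - 1)
	fmt.Println(f.ids, len(f.pending))
}
```

  This is also why a long-running read transaction makes the file grow: it pins the minimum id, so pages freed after it began stay in pending and new pages must be allocated instead.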

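  4.4 Implementation of Commit()

  The overall flow inside Commit() is:

  1. Decide whether nodes need to be merged or split.
  2. Check the freelist for overflow; if it has outgrown its pages, allocate new space for it.
  3. Sort the pages modified by the transaction (to keep I/O as sequential as possible), write them to disk in a loop, and then sync.
  4. Once the data has been written successfully, write the meta page to disk and sync again to guarantee durability.
  5. If any of the steps above fails, the current transaction is rolled back.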

  // Commit writes all changes to disk and updates the meta page.
  // Returns an error if a disk write error occurs, or if Commit is
  // called on a read-only transaction.
  // Data is written first and the meta page afterwards.
  // What if the data is written but the machine crashes before the meta
  // page is updated? How is the data recovered?
  func (tx *Tx) Commit() error {
      // Error checks omitted.

      // Rebalance nodes which have had deletions (page merging).
      var startTime = time.Now()
      tx.root.rebalance()
      if tx.stats.Rebalance > 0 {
          tx.stats.RebalanceTime += time.Since(startTime)
      }

      // spill data onto dirty pages (page splitting).
      // Internally this adds pages to the tx.pages cache.
      startTime = time.Now()
      if err := tx.root.spill(); err != nil {
          tx.rollback()
          return err
      }
      tx.stats.SpillTime += time.Since(startTime)

      // Free the old root bucket.
      tx.meta.root.root = tx.root.root

      opgid := tx.meta.pgid

      // Free the freelist and allocate new pages for it. This will overestimate
      // the size of the freelist but not underestimate the size (which would be bad).
      tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
      // The freelist may have grown, so new pages are allocated to hold it.
      // Starting the write transaction released pages held back for earlier
      // read transactions, so the freelist may no longer fit in its old pages.
      p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
      if err != nil {
          tx.rollback()
          return err
      }
      // Write the freelist into the new contiguous pages.
      if err := tx.db.freelist.write(p); err != nil {
          tx.rollback()
          return err
      }
      // Record the new freelist page id in the meta.
      tx.meta.freelist = p.id

      // If the high water mark has moved up then attempt to grow the database.
      // allocate may have changed meta.pgid.
      if tx.meta.pgid > opgid {
          if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
              tx.rollback()
              return err
          }
      }

      // Write dirty pages to disk.
      startTime = time.Now()
      if err := tx.write(); err != nil {
          tx.rollback()
          return err
      }

      // If strict mode is enabled then perform a consistency check.
      // Only the first consistency error is reported in the panic.
      if tx.db.StrictMode {
          // Consistency check logic omitted.
      }

      // Write meta to disk.
      if err := tx.writeMeta(); err != nil {
          tx.rollback()
          return err
      }
      tx.stats.WriteTime += time.Since(startTime)

      // Finalize the transaction.
      tx.close()

      // Execute commit handlers now that the locks have been removed.
      for _, fn := range tx.commitHandlers {
          fn()
      }

      return nil
  }

  // write writes any dirty pages to disk.
  func (tx *Tx) write() error {
      // Sort pages by id so that they are written in order.
      pages := make(pages, 0, len(tx.pages))
      for _, p := range tx.pages {
          pages = append(pages, p)
      }
      // Clear out page cache early.
      tx.pages = make(map[pgid]*page)
      sort.Sort(pages)

      // Write pages to disk in order.
      for _, p := range pages {
          // Size in bytes (including overflow pages) and file offset.
          size := (int(p.overflow) + 1) * tx.db.pageSize
          offset := int64(p.id) * int64(tx.db.pageSize)

          // Write out page in "max allocation" sized chunks.
          ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p))
          // Keep writing until the whole page has been written.
          for {
              // Limit our write to our max allocation size
              // (roughly 2^31 bytes, i.e. 2 GiB, on amd64).
              sz := size
              if sz > maxAllocSize-1 {
                  sz = maxAllocSize - 1
              }

              // Write chunk to disk.
              buf := ptr[:sz]
              if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
                  return err
              }

              // Update statistics.
              tx.stats.Write++

              // Exit inner for loop if we've written all the chunks.
              size -= sz
              if size == 0 {
                  break
              }

              // Otherwise move offset forward and move pointer to next chunk.
              offset += int64(sz)
              ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz]))
          }
      }

      // Ignore file sync if flag is set on DB.
      if !tx.db.NoSync || IgnoreNoSync {
          if err := fdatasync(tx.db); err != nil {
              return err
          }
      }

      // Put small pages back to page pool.
      for _, p := range pages {
          // Ignore page sizes over 1 page.
          // These are allocated using make() instead of the page pool.
          if int(p.overflow) != 0 {
              continue
          }

          buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:tx.db.pageSize]

          // See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
          // Zero the buffer, then return it to pagePool.
          for i := range buf {
              buf[i] = 0
          }
          tx.db.pagePool.Put(buf)
      }

      return nil
  }

  // writeMeta writes the meta to the disk.
  func (tx *Tx) writeMeta() error {
      // Create a temporary buffer for the meta page.
      buf := make([]byte, tx.db.pageSize)
      p := tx.db.pageInBuffer(buf, 0)
      // Write the transaction's meta information into the page.
      tx.meta.write(p)

      // Write the meta page to file.
      if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
          return err
      }
      if !tx.db.NoSync || IgnoreNoSync {
          if err := fdatasync(tx.db); err != nil {
              return err
          }
      }

      // Update statistics.
      tx.stats.Write++

      return nil
  }

  // allocate returns a contiguous block of memory starting at a given page.
  // It allocates a run of contiguous pages.
  func (tx *Tx) allocate(count int) (*page, error) {
      p, err := tx.db.allocate(count)
      if err != nil {
          return nil, err
      }

      // Save to our page cache.
      tx.pages[p.id] = p

      // Update statistics.
      tx.stats.PageCount++
      tx.stats.PageAlloc += count * tx.db.pageSize

      return p, nil
  }
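  One way to reason about the crash question raised in the Commit() comments is to model the two meta pages directly. The sketch below is an illustration under assumptions, not boltdb's on-disk format: the meta record is reduced to a transaction id and a root page id, the checksum is FNV-1a over those fields, and each transaction writes to slot txid % 2 so the previous meta stays intact until the new one is complete.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/fnv"
)

// metaSize is the size of the reduced record: txid, root page id, checksum.
const metaSize = 24

type meta struct{ txid, root uint64 }

func checksum(b []byte) uint64 {
	h := fnv.New64a()
	h.Write(b)
	return h.Sum64()
}

// encode writes the fields followed by a checksum over them.
func (m meta) encode() []byte {
	buf := make([]byte, metaSize)
	binary.LittleEndian.PutUint64(buf[0:8], m.txid)
	binary.LittleEndian.PutUint64(buf[8:16], m.root)
	binary.LittleEndian.PutUint64(buf[16:24], checksum(buf[:16]))
	return buf
}

// decodeMeta rejects a record whose checksum does not match, for example
// after a torn write.
func decodeMeta(buf []byte) (meta, error) {
	if len(buf) < metaSize {
		return meta{}, errors.New("meta too short")
	}
	if checksum(buf[:16]) != binary.LittleEndian.Uint64(buf[16:24]) {
		return meta{}, errors.New("meta checksum mismatch")
	}
	return meta{
		txid: binary.LittleEndian.Uint64(buf[0:8]),
		root: binary.LittleEndian.Uint64(buf[8:16]),
	}, nil
}

// slotFor returns which of the two meta slots a transaction writes to.
func slotFor(txid uint64) int { return int(txid % 2) }

// latestValid picks the valid meta with the highest transaction id.
func latestValid(slots [2][]byte) (meta, error) {
	var best meta
	found := false
	for _, s := range slots {
		m, err := decodeMeta(s)
		if err != nil {
			continue
		}
		if !found || m.txid > best.txid {
			best, found = m, true
		}
	}
	if !found {
		return meta{}, errors.New("no valid meta page")
	}
	return best, nil
}

func main() {
	var slots [2][]byte
	slots[slotFor(4)] = meta{txid: 4, root: 3}.encode()
	slots[slotFor(5)] = meta{txid: 5, root: 9}.encode()
	m, _ := latestValid(slots)
	fmt.Println("normal:", m.txid, m.root)

	slots[slotFor(5)][0] ^= 0xFF // simulate a crash in the middle of writing meta 5
	m, _ = latestValid(slots)
	fmt.Println("after torn write:", m.txid, m.root)
}
```

  Under this model, data pages written by an unfinished commit are simply unreachable: the surviving meta still points at the old root, so the database opens in its last committed state.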

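  4.5 Implementation of Rollback()

  Rollback() does different things for the two kinds of transactions:

  1. For a read-only transaction, it only needs to find the transaction in db.txs and remove it.
  2. For a read-write transaction, it releases the pages associated with the transaction from the freelist and then reloads the free pages from the freelist page.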

  // Rollback closes the transaction and ignores all previous updates. Read-only
  // transactions must be rolled back and not committed.
  func (tx *Tx) Rollback() error {
      _assert(!tx.managed, "managed tx rollback not allowed")
      if tx.db == nil {
          return ErrTxClosed
      }
      tx.rollback()
      return nil
  }

  func (tx *Tx) rollback() {
      if tx.db == nil {
          return
      }
      if tx.writable {
          // Drop the pages associated with this transaction.
          tx.db.freelist.rollback(tx.meta.txid)
          // Rebuild the freelist from the freelist page.
          tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
      }
      tx.close()
  }

  func (tx *Tx) close() {
      if tx.db == nil {
          return
      }
      if tx.writable {
          // Grab freelist stats.
          var freelistFreeN = tx.db.freelist.free_count()
          var freelistPendingN = tx.db.freelist.pending_count()
          var freelistAlloc = tx.db.freelist.size()

          // Remove transaction ref & writer lock.
          tx.db.rwtx = nil
          tx.db.rwlock.Unlock()

          // Merge statistics.
          tx.db.statlock.Lock()
          tx.db.stats.FreePageN = freelistFreeN
          tx.db.stats.PendingPageN = freelistPendingN
          tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize
          tx.db.stats.FreelistInuse = freelistAlloc
          tx.db.stats.TxStats.add(&tx.stats)
          tx.db.statlock.Unlock()
      } else {
          // Read-only transaction.
          tx.db.removeTx(tx)
      }

      // Clear all references.
      tx.db = nil
      tx.meta = nil
      tx.root = Bucket{tx: tx}
      tx.pages = nil
  }

  // removeTx removes a transaction from the database.
  func (db *DB) removeTx(tx *Tx) {
      // Release the read lock on the mmap.
      db.mmaplock.RUnlock()

      // Use the meta lock to restrict access to the DB object.
      db.metalock.Lock()

      // Remove the transaction.
      for i, t := range db.txs {
          if t == tx {
              last := len(db.txs) - 1
              db.txs[i] = db.txs[last]
              db.txs[last] = nil
              db.txs = db.txs[:last]
              break
          }
      }
      n := len(db.txs)

      // Unlock the meta pages.
      db.metalock.Unlock()

      // Merge statistics.
      db.statlock.Lock()
      db.stats.OpenTxN = n
      db.stats.TxStats.add(&tx.stats)
      db.statlock.Unlock()
  }

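  4.6 Summary

  This chapter analyzed how boltdb implements transactions internally and walked through the code of the core transaction methods. At this point essentially all the core components of a database are in place; what remains is to assemble them into a complete database that can be used from outside. The next chapter analyzes the internals of the DB object.

  5. The boltdb DB object

  We first described how boltdb organizes and stores data on disk (page), then how the on-disk data is represented in memory (node). Next came the Bucket object, which manages a collection of key/value data, and the Cursor object used to traverse a Bucket. Finally we covered in detail how transactions (Tx) are implemented. Now that each of the individual parts is familiar, it is time to put them to work together, which is the role of the top-level DB object. This chapter covers the methods of the DB object and their implementation.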

  5.1 The DB struct

  DB is a struct in boltdb that wraps many fields. Some of them are annotated below; for the rest, the original English comments are clear enough.

  // Some constant definitions omitted.

  // Default values if not set in a DB instance.
  const (
      DefaultMaxBatchSize  int = 1000
      DefaultMaxBatchDelay     = 10 * time.Millisecond
      // 16 MiB
      DefaultAllocSize = 16 * 1024 * 1024
  )

  // default page size for db is set to the OS page size.
  var defaultPageSize = os.Getpagesize()

  // DB represents a collection of buckets persisted to a file on disk.
  // All data access is performed through transactions which can be obtained through the DB.
  // All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called.
  type DB struct {
      // For brevity, the definitions of some configuration fields (MaxBatchSize,
      // MaxBatchDelay, MmapFlags, NoGrowSync, and so on) are omitted here. For the
      // full definition, see the source code or the book linked at the end of the article.

      path     string
      file     *os.File // the on-disk file that actually stores the data
      lockfile *os.File // windows only
      dataref  []byte   // mmap'ed readonly, write throws SEGV
      // Address of the data mapped in through mmap.
      data   *[maxMapSize]byte
      datasz int
      filesz int // current on disk file size
      // Meta pages.
      meta0    *meta
      meta1    *meta
      pageSize int
      opened   bool
      rwtx     *Tx       // the current read-write transaction, if any
      txs      []*Tx     // open read-only transactions
      freelist *freelist // the free page list
      stats    Stats

      pagePool sync.Pool

      batchMu sync.Mutex
      batch   *batch

      rwlock   sync.Mutex   // Allows only one writer at a time.
      metalock sync.Mutex   // Protects meta page access.
      mmaplock sync.RWMutex // Protects mmap access during remapping.
      statlock sync.RWMutex // Protects stats access.

      ops struct {
          writeAt func(b []byte, off int64) (n int, err error)
      }

      // Read only mode.
      // When true, Update() and Begin(true) return ErrDatabaseReadOnly immediately.
      readOnly bool
  }

  5.2 Public interface

  1. Open(): create or open a database

  // Open creates and opens a database at the given path.
  // If the file does not exist then it will be created automatically.
  // Passing in nil options will cause Bolt to open the database with the default options.
  func Open(path string, mode os.FileMode, options *Options) (*DB, error)

  2. View(): query

  // View executes a function within the context of a managed read-only transaction.
  // Any error that is returned from the function is returned from the View() method.
  //
  // Attempting to manually rollback within the function will cause a panic.
  func (db *DB) View(fn func(*Tx) error) error

  3. Update(): update

  // Update executes a function within the context of a read-write managed transaction.
  // If no error is returned from the function then the transaction is committed.
  // If an error is returned then the entire transaction is rolled back.
  // Any error that is returned from the function or returned from the commit is
  // returned from the Update() method.
  //
  // Attempting to manually commit or rollback within the function will cause a panic.
  func (db *DB) Update(fn func(*Tx) error) error

  4. Batch(): batched update

  // Batch calls fn as part of a batch. It behaves similar to Update,
  // except:
  //
  // 1. concurrent Batch calls can be combined into a single Bolt
  // transaction.
  //
  // 2. the function passed to Batch may be called multiple times,
  // regardless of whether it returns error or not.
  //
  // This means that Batch function side effects must be idempotent and
  // take permanent effect only after a successful return is seen in
  // caller.
  //
  // The maximum batch size and delay can be adjusted with DB.MaxBatchSize
  // and DB.MaxBatchDelay, respectively.
  //
  // Batch is only useful when there are multiple goroutines calling it.
  func (db *DB) Batch(fn func(*Tx) error) error

  5. Begin(): start a transaction

  func (db *DB) Begin(writable bool) (*Tx, error)

  Note: the implementation of Begin() was analyzed in section 4.3 and is not analyzed again below.

  We now go through the interfaces above one by one.

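  5.3 Implementation of Open()

  Open() creates a boltdb DB object. Underneath, it creates or opens the file that stores the data: if the given file does not exist, boltdb creates a new data file; otherwise it loads the contents of the existing database file.

  Note that boltdb decides whether to take an exclusive lock or a shared lock based on the options passed to Open.

  On creation: init() is called. It creates the file, writes meta information to pages 0 and 1, writes the freelist to page 2, writes an empty leaf page for the root bucket to page 3, and finally syncs everything to disk.

  On load: page 0, which holds the meta information, is read and validated, including its checksum. If validation passes, pageSize is taken from it; otherwise the operating system's default page size (usually 4 KB) is used.

  After these steps, the data is mapped with mmap. Finally, the freelist field of db is initialized from the freelist data in the on-disk page.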
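  The four-page layout of a freshly created file can be reproduced with a small buffer exercise. This sketch is a simplification: it assumes a 16-byte page header (id, flags, count, overflow) and the flag values shown, writes the fields with explicit little-endian encoding instead of overlaying a struct through unsafe, and leaves the meta contents themselves out. writeHeader, readHeader, and initLayout are hypothetical names.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Assumed page flag values and header layout:
// id (8 bytes) + flags (2) + count (2) + overflow (4) = 16 bytes.
const (
	leafPageFlag     = 0x02
	metaPageFlag     = 0x04
	freelistPageFlag = 0x10
)

// writeHeader stores a page header at the start of page id inside buf.
func writeHeader(buf []byte, pageSize int, id uint64, flags uint16) {
	p := buf[int(id)*pageSize:]
	binary.LittleEndian.PutUint64(p[0:8], id)
	binary.LittleEndian.PutUint16(p[8:10], flags)
	binary.LittleEndian.PutUint16(p[10:12], 0) // count: no elements yet
	binary.LittleEndian.PutUint32(p[12:16], 0) // overflow: a single page
}

// readHeader returns the id and flags stored in page id of buf.
func readHeader(buf []byte, pageSize int, id uint64) (uint64, uint16) {
	p := buf[int(id)*pageSize:]
	return binary.LittleEndian.Uint64(p[0:8]), binary.LittleEndian.Uint16(p[8:10])
}

// initLayout builds the four pages of a brand-new file: two meta pages,
// one freelist page, and one empty leaf page for the root bucket.
func initLayout(pageSize int) []byte {
	buf := make([]byte, pageSize*4)
	writeHeader(buf, pageSize, 0, metaPageFlag)
	writeHeader(buf, pageSize, 1, metaPageFlag)
	writeHeader(buf, pageSize, 2, freelistPageFlag)
	writeHeader(buf, pageSize, 3, leafPageFlag)
	return buf
}

func main() {
	const pageSize = 4096
	buf := initLayout(pageSize)
	for id := uint64(0); id < 4; id++ {
		gotID, flags := readHeader(buf, pageSize, id)
		fmt.Printf("page %d: offset=%d flags=0x%02x\n", gotID, int(id)*pageSize, flags)
	}
}
```

  The page at index id always starts at byte offset id * pageSize, which is the same arithmetic the page() and pageInBuffer() helpers rely on.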

  func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
      var db = &DB{opened: true}

      // Set default options if no options are provided.
      // Initialization of configuration options (including flag and
      // db.readOnly) is omitted here.

      // Open data file and separate sync handler for metadata writes.
      db.path = path
      var err error
      // Open the db file.
      if db.file, err = os.OpenFile(db.path, flag|os.O_CREATE, mode); err != nil {
          _ = db.close()
          return nil, err
      }

      // Lock file so that other processes using Bolt in read-write mode cannot
      // use the database at the same time. This would cause corruption since
      // the two processes would write meta pages and free pages separately.
      // The database file is locked exclusively (only one process can grab the lock)
      // if !options.ReadOnly.
      // The database file is locked using the shared lock (more than one process may
      // hold a lock at the same time) otherwise (options.ReadOnly is set).
      // In short: shared lock for read-only, exclusive lock otherwise.
      if err := flock(db, mode, !db.readOnly, options.Timeout); err != nil {
          _ = db.close()
          return nil, err
      }

      // Default values for test hooks
      db.ops.writeAt = db.file.WriteAt

      // Initialize the database if it doesn't exist.
      if info, err := db.file.Stat(); err != nil {
          return nil, err
      } else if info.Size() == 0 {
          // Initialize new files with meta pages.
          if err := db.init(); err != nil {
              return nil, err
          }
      } else {
          // Not a new file: read the first meta page to determine the page size.
          // 0x1000 is 2^12, exactly 4 KB.
          var buf [0x1000]byte
          if _, err := db.file.ReadAt(buf[:], 0); err == nil {
              // Only pageSize is taken from this read.
              m := db.pageInBuffer(buf[:], 0).meta()
              if err := m.validate(); err != nil {
                  // If we can't read the page size, we can assume it's the same
                  // as the OS -- since that's how the page size was chosen in the
                  // first place.
                  //
                  // If the first page is invalid and this OS uses a different
                  // page size than what the database was created with then we
                  // are out of luck and cannot access the database.
                  db.pageSize = os.Getpagesize()
              } else {
                  db.pageSize = int(m.pageSize)
              }
          }
      }

      // Initialize page pool.
      db.pagePool = sync.Pool{
          New: func() interface{} {
              // One page, typically 4 KB.
              return make([]byte, db.pageSize)
          },
      }

      // Memory map the data file.
      if err := db.mmap(options.InitialMmapSize); err != nil {
          _ = db.close()
          return nil, err
      }

      // Read in the freelist.
      db.freelist = newFreelist()
      // For a new file db.meta().freelist is 2: read that page and build
      // the in-memory freelist from it.
      db.freelist.read(db.page(db.meta().freelist))

      // Mark the database as opened and return.
      return db, nil
  }

  // init creates a new database file and initializes its meta pages.
  func (db *DB) init() error {
      // Set the page size to the OS page size.
      db.pageSize = os.Getpagesize()

      // Create two meta pages on a buffer.
      buf := make([]byte, db.pageSize*4)
      for i := 0; i < 2; i++ {
          p := db.pageInBuffer(buf[:], pgid(i))
          p.id = pgid(i)
          // Pages 0 and 1 hold the meta information.
          p.flags = metaPageFlag

          // Initialize the meta page.
          m := p.meta()
          m.magic = magic
          m.version = version
          m.pageSize = uint32(db.pageSize)
          m.freelist = 2
          m.root = bucket{root: 3}
          m.pgid = 4
          m.txid = txid(i)
          m.checksum = m.sum64()
      }

      // Write an empty freelist at page 3.
      // (This is the third page, pgid 2.)
      p := db.pageInBuffer(buf[:], pgid(2))
      p.id = pgid(2)
      p.flags = freelistPageFlag
      p.count = 0

      // Write an empty leaf page at page 4.
      // (This is the fourth page, pgid 3.)
      p = db.pageInBuffer(buf[:], pgid(3))
      p.id = pgid(3)
      p.flags = leafPageFlag
      p.count = 0

      // Write the buffer (all four pages) to our data file.
      if _, err := db.ops.writeAt(buf, 0); err != nil {
          return err
      }
      // Sync to disk.
      if err := fdatasync(db); err != nil {
          return err
      }

      return nil
  }

  // page retrieves a page reference from the mmap based on the current page size.
  func (db *DB) page(id pgid) *page {
      pos := id * pgid(db.pageSize)
      return (*page)(unsafe.Pointer(&db.data[pos]))
  }

  // pageInBuffer retrieves a page reference from a given byte array based on the current page size.
  func (db *DB) pageInBuffer(b []byte, id pgid) *page {
      return (*page)(unsafe.Pointer(&b[id*pgid(db.pageSize)]))
  }

  // mmap opens the underlying memory-mapped file and initializes the meta references.
  // minsz is the minimum size that the new mmap can be.
  func (db *DB) mmap(minsz int) error {
      // This function performs the mmap. The body is omitted here; interested
      // readers can find the details in the corresponding part of the online book.
  }

  5.4 Implementation of db.View()

  View() runs a read-only transaction. Starting, committing, and rolling back the transaction are all handled internally.

  func (db *DB) View(fn func(*Tx) error) error {
      t, err := db.Begin(false)
      if err != nil {
          return err
      }

      // Make sure the transaction rolls back in the event of a panic.
      defer func() {
          if t.db != nil {
              t.rollback()
          }
      }()

      // Mark as a managed tx so that the inner function cannot manually rollback.
      t.managed = true

      // If an error is returned from the function then pass it through.
      err = fn(t)
      t.managed = false
      if err != nil {
          _ = t.Rollback()
          return err
      }

      if err := t.Rollback(); err != nil {
          return err
      }

      return nil
  }

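  5.5 Implementation of db.Update()

  Update() runs a read-write transaction. Starting, committing, and rolling back the transaction are all handled internally.

  func (db *DB) Update(fn func(*Tx) error) error {
      t, err := db.Begin(true)
      if err != nil {
          return err
      }

      // (The deferred rollback on panic is the same as in View and is omitted.)

      // Mark as a managed tx so that the inner function cannot manually commit.
      t.managed = true

      // If an error is returned from the function then rollback and return error.
      err = fn(t)
      t.managed = false
      if err != nil {
          _ = t.Rollback()
          return err
      }

      return t.Commit()
  }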
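  The managed-transaction pattern shared by View() and Update() can be isolated from the database. The sketch below uses a hypothetical toy tx type that only records how it ended; the update function follows the same shape as the listing above (mark managed, run fn, roll back on error or panic, commit otherwise) but is not boltdb's code.

```go
package main

import (
	"errors"
	"fmt"
)

var errBoom = errors.New("boom")

// tx is a hypothetical toy transaction that records how it ended.
type tx struct {
	managed bool
	state   string // "open", "committed" or "rolled back"
}

func (t *tx) Commit() error {
	if t.managed {
		panic("managed tx commit not allowed")
	}
	t.state = "committed"
	return nil
}

func (t *tx) Rollback() error {
	if t.managed {
		panic("managed tx rollback not allowed")
	}
	t.state = "rolled back"
	return nil
}

// update runs fn inside a managed transaction: commit when fn succeeds,
// roll back when fn returns an error or panics.
func update(t *tx, fn func(*tx) error) error {
	defer func() {
		// Still open here means fn panicked before we could finish.
		if t.state == "open" {
			t.managed = false
			_ = t.Rollback()
		}
	}()

	// While managed, fn cannot commit or roll back on its own.
	t.managed = true
	err := fn(t)
	t.managed = false
	if err != nil {
		_ = t.Rollback()
		return err
	}
	return t.Commit()
}

func main() {
	ok := &tx{state: "open"}
	fmt.Println(update(ok, func(*tx) error { return nil }), ok.state)

	bad := &tx{state: "open"}
	fmt.Println(update(bad, func(*tx) error { return errBoom }), bad.state)
}
```

  Putting the commit-or-rollback decision in one place means callers cannot forget to close a transaction, which matters because an unclosed write transaction would keep the writer lock held.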

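  5.6 Implementation of db.Batch()

  We now look briefly at Batch(). As the DB definition shows, a DB owns a single batch object that is shared by all callers. When Batch() is called, the fn passed in is queued in calls. Internally Batch also goes through Update, except that inside that Update it iterates over the previously queued calls.

  Two conditions trigger the call to Update:

  1. The MaxBatchDelay time has elapsed.
  2. len(db.batch.calls) >= db.MaxBatchSize, that is, the number of queued calls has reached MaxBatchSize.

  The essence of Batch: it turns "one write, one sync" per call into "many writes, one sync", which improves performance.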

  The code analysis of Batch is rather long and is not reproduced here; readers can follow the two links given in the original post.

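  5.7 Summary

  This chapter covered the top-level DB object in boltdb. We first looked at the definition of DB, then at Open(), which creates a DB, and at the interfaces DB exposes, which are the APIs used most often in practice. After introducing these interfaces we analyzed their source code one by one. With the knowledge from the earlier chapters, these implementations are relatively easy to follow, because they are little more than wrappers around Tx, Bucket, and node, and underneath they call the methods described before. This concludes the source code analysis of bolt.

  Chapter 6 lists several source code analysis articles written by other experienced engineers for readers who want to study further.

  6. References

  1. Notes after reading the boltDB source code
  2. A repository that adds annotations to the boltdb source code
  3. The official boltdb repository
  4. A collection of WeChat official account articles analyzing the boltdb source code

  7. Closing remarks

  boltdb also ships with a command-line tool, mainly used to inspect all the pages in a boltdb file and the data on each kind of page, and to run performance tests. When time permits, the features of that tool will be added to this article.

  Before doing this work, source code analysis of a framework or component felt like it rarely went beyond adding comments to code and drawing diagrams. Only after writing everything from start to finish did it become clear how many details in between have to be understood and grasped afresh. Overall, this was a rewarding experience.

  Finally, this article is based on the author's own understanding and reading of the source code, so mistakes and misunderstandings are inevitable. If you spot a problem while reading, please send feedback, and everyone is welcome to discuss and learn together.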