天天看點

badger和rocksDB性能對比

badger和rocksDB性能對比

結論:

  1. 從最後一個表格來看,ssd只對batch_read和batch_write操作有優勢,而且在多協程的情況下,這個優勢也丟失了。
  2. 從第二和第三個表格來看,badger的write操作比rocksDB慢了一個數量級,而batch_write操作badger又非常快。所以如果你用的是go語言,如果write不是你的主要操作,推薦用badger。

  參數不同、測試方法不同都會導致結論不同,以下是我的測試代碼。

storage.go  

package storage

import (
	"fmt"
)

// storageOpenFunction maps a storage engine name to the constructor that
// opens a database of that kind at the given path. OpenStorage looks engines
// up in this table.
var storageOpenFunction = map[string]func(path string) (Storage, error){
	"badger":  OpenBadger,
	"rocksdb": OpenRocksdb,
}

// Storage is the key/value store abstraction shared by the Badger and
// RocksDB back ends in this package.
type Storage interface {
	// Set stores v under k. expireAt is interpreted by the Badger back end as
	// a Unix timestamp in seconds; the RocksDB back end ignores it.
	Set(k, v []byte, expireAt int64) error
	// BatchSet stores values[i] under keys[i] for every index i, with
	// expireAts[i] as the matching expiry.
	BatchSet(keys, values [][]byte, expireAts []int64) error
	// Get returns the value stored under k.
	Get(k []byte) ([]byte, error)
	// BatchGet returns one value per key, in the same order as keys.
	BatchGet(keys [][]byte) ([][]byte, error)
	// Delete removes k.
	Delete(k []byte) error
	// BatchDelete removes every key in keys.
	BatchDelete(keys [][]byte) error
	// Has reports whether k is present.
	Has(k []byte) bool
	// IterDB calls fn for each key/value pair and returns the number of
	// pairs for which fn returned nil.
	IterDB(fn func(k, v []byte) error) int64
	// IterKey calls fn for each key and returns the number of keys for
	// which fn returned nil.
	IterKey(fn func(k []byte) error) int64
	// Close releases the underlying database.
	Close() error
}

// OpenStorage opens the storage engine registered under storeName at path.
// It returns an error if no engine with that name is registered.
func OpenStorage(storeName string, path string) (Storage, error) {
	open, ok := storageOpenFunction[storeName]
	if !ok {
		return nil, fmt.Errorf("unsupported storage engine: %v", storeName)
	}
	return open(path)
}
      

rocks.go

package storage

import (
	"github.com/tecbot/gorocksdb"
	"os"
	"path"
	"sync/atomic"
)

// Package-level gorocksdb option objects, created once with the library
// defaults and shared by every Rocksdb method in this file.
var (
	rocksOptions = gorocksdb.NewDefaultOptions()      // database options, further tuned in OpenRocksdb
	readOptions  = gorocksdb.NewDefaultReadOptions()  // used by all reads and iterators
	writeOptions = gorocksdb.NewDefaultWriteOptions() // used by all writes and deletes
)

// Rocksdb is the Storage implementation backed by a gorocksdb database.
type Rocksdb struct {
	db *gorocksdb.DB // handle opened by OpenRocksdb
}

// OpenRocksdb opens the RocksDB database at dbPath, creating it if it does
// not exist, with compression disabled and a 1,000,000-byte write buffer.
//
// It returns an error (instead of panicking, as earlier versions did) when
// the parent directory cannot be created or the database cannot be opened.
func OpenRocksdb(dbPath string) (Storage, error) {
	// Make sure the parent directory of dbPath exists. MkdirAll does nothing
	// if the directory is already there and fails if that path is a regular file.
	if err := os.MkdirAll(path.Dir(dbPath), os.ModePerm); err != nil {
		return nil, err
	}

	rocksOptions.SetCreateIfMissing(true)
	rocksOptions.SetCompression(gorocksdb.NoCompression)
	rocksOptions.SetWriteBufferSize(1000000)

	db, err := gorocksdb.OpenDb(rocksOptions, dbPath)
	if err != nil {
		// Report the failure to the caller; a library constructor that
		// already returns an error should not panic.
		return nil, err
	}

	return &Rocksdb{db: db}, nil
}

// Set writes v under k using the shared write options. expireAt is accepted
// to satisfy the Storage interface but is not used by this back end.
func (s *Rocksdb) Set(k, v []byte, expireAt int64) error {
	err := s.db.Put(writeOptions, k, v)
	return err
}

// BatchSet writes every keys[i]/values[i] pair in a single write batch and
// returns the error reported by the batch write, if any. expireAts is
// accepted to satisfy the Storage interface but is not used by this back end.
//
// values must be at least as long as keys; a shorter values slice causes an
// index-out-of-range panic.
func (s *Rocksdb) BatchSet(keys, values [][]byte, expireAts []int64) error {
	wb := gorocksdb.NewWriteBatch()
	defer wb.Destroy()
	for i, key := range keys {
		wb.Put(key, values[i])
	}
	// Propagate the write result; previously it was dropped and nil was
	// returned even when the batch failed.
	return s.db.Write(writeOptions, wb)
}

// Get reads the value stored under k using the shared read options.
func (s *Rocksdb) Get(k []byte) ([]byte, error) {
	value, err := s.db.GetBytes(readOptions, k)
	return value, err
}

// BatchGet looks up all keys with one MultiGet call and returns the values in
// the same order as keys. On failure it returns a nil slice and the error.
//
// Each value is copied into Go-owned memory and the gorocksdb slices are
// destroyed before returning, so the result stays valid and the library's
// buffers are not leaked.
func (s *Rocksdb) BatchGet(keys [][]byte) ([][]byte, error) {
	slices, err := s.db.MultiGet(readOptions, keys...)
	if err != nil {
		return nil, err
	}
	defer slices.Destroy()

	values := make([][]byte, 0, len(slices))
	for _, slice := range slices {
		data := slice.Data()
		buf := make([]byte, len(data))
		copy(buf, data)
		values = append(values, buf)
	}
	return values, nil
}

// Delete removes k using the shared write options.
func (s *Rocksdb) Delete(k []byte) error {
	err := s.db.Delete(writeOptions, k)
	return err
}

// BatchDelete removes every key in keys with a single write batch and
// returns the error reported by the batch write, if any.
func (s *Rocksdb) BatchDelete(keys [][]byte) error {
	wb := gorocksdb.NewWriteBatch()
	defer wb.Destroy()
	for _, key := range keys {
		wb.Delete(key)
	}
	// Propagate the write result; previously it was dropped and nil was
	// returned even when the batch failed.
	return s.db.Write(writeOptions, wb)
}

// Has reports whether a non-empty value is stored under k. A read error, or
// a value of length zero, is reported as false.
func (s *Rocksdb) Has(k []byte) bool {
	value, err := s.db.GetBytes(readOptions, k)
	if err != nil {
		return false
	}
	return len(value) > 0
}

// IterDB walks the whole database from the first key and calls fn with each
// key/value pair. It returns how many pairs fn accepted (returned nil for).
// The byte slices handed to fn come straight from the iterator and are not
// copied here.
func (s *Rocksdb) IterDB(fn func(k, v []byte) error) int64 {
	var accepted int64
	it := s.db.NewIterator(readOptions)
	defer it.Close()

	it.SeekToFirst()
	for it.Valid() {
		if fn(it.Key().Data(), it.Value().Data()) == nil {
			atomic.AddInt64(&accepted, 1)
		}
		it.Next()
	}
	return atomic.LoadInt64(&accepted)
}

// IterKey walks the whole database from the first key and calls fn with each
// key. It returns how many keys fn accepted (returned nil for). The key slice
// handed to fn comes straight from the iterator and is not copied here.
func (s *Rocksdb) IterKey(fn func(k []byte) error) int64 {
	var accepted int64
	it := s.db.NewIterator(readOptions)
	defer it.Close()

	it.SeekToFirst()
	for it.Valid() {
		if fn(it.Key().Data()) == nil {
			atomic.AddInt64(&accepted, 1)
		}
		it.Next()
	}
	return atomic.LoadInt64(&accepted)
}

// Close closes the underlying RocksDB handle. It always returns nil; the
// error result exists only to satisfy the Storage interface.
func (s *Rocksdb) Close() error {
	s.db.Close()
	return nil
}
      

badger.go

package storage

import (
	"github.com/dgraph-io/badger"
	"os"
	"path"
	"time"
	"github.com/pkg/errors"
	"fmt"
	"sync/atomic"
	"github.com/dgraph-io/badger/options"
)

// Badger is the Storage implementation backed by a badger database.
type Badger struct {
	db *badger.DB // handle opened by OpenBadger
}

// badgerOptions is the badger configuration used by OpenBadger, which fills
// in Dir and ValueDir before opening. The per-field notes below are the
// original author's tuning remarks.
var badgerOptions = badger.Options{
	DoNotCompact:        false,    // The main cost of an LSM tree is compaction: several files are read into memory, sorted, and written back to disk.
	LevelOneSize:        64 << 20, // Size of level one.
	LevelSizeMultiplier: 10,       // How many times larger each level is than the one above it.
	MaxLevels:           7,        // Maximum number of LSM tree levels.
	// Keys are kept in memory; values (actually value pointers) are kept on disk in what is called the vlog file.
	TableLoadingMode:        options.MemoryMap, // Load the LSM tree entirely into memory.
	ValueLogLoadingMode:     options.FileIO,    // FileIO instead of MemoryMap saves a lot of memory.
	MaxTableSize:            4 << 20,           // 4M
	NumCompactors:           8,                 // Number of compaction threads.
	NumLevelZeroTables:      4,
	NumLevelZeroTablesStall: 10,
	NumMemtables:            4,     // Writes hit the MemTable immediately; once it reaches a certain size it is flushed to disk as an immutable SSTable.
	SyncWrites:              false, // Asynchronous disk writes: data goes to the in-memory LSM tree right away and is compacted and written to disk only once it reaches MaxTableSize. Close also flushes in-memory data to disk.
	NumVersionsToKeep:       1,
	ValueLogFileSize:        64 << 20, // In bytes. A vlog file is split once it grows beyond this size. 64M
	ValueLogMaxEntries:      100000,
	ValueThreshold:          32,
	Truncate:                false,
}

//var badgerOptions = badger.DefaultOptions

// OpenBadger opens the badger database at dbPath, using dbPath for both the
// key directory and the value-log directory.
//
// It returns an error (instead of panicking, as earlier versions did) when
// the parent directory cannot be created or the database cannot be opened.
// Per the original author's note, the database files can be used by only one
// process at a time: if Close is not called the next Open fails, and the lock
// can be released manually by deleting the LOCK file.
func OpenBadger(dbPath string) (Storage, error) {
	// Make sure the parent directory of dbPath exists. MkdirAll does nothing
	// if the directory is already there and fails if that path is a regular file.
	if err := os.MkdirAll(path.Dir(dbPath), os.ModePerm); err != nil {
		return nil, err
	}

	badgerOptions.Dir = dbPath
	badgerOptions.ValueDir = dbPath
	db, err := badger.Open(badgerOptions)
	if err != nil {
		// Report the failure to the caller; a library constructor that
		// already returns an error should not panic.
		return nil, err
	}

	return &Badger{db}, nil
}

// CheckAndGC runs value-log garbage collection with a discard ratio of 0.5
// repeatedly until a run reports an error, then prints the LSM and value-log
// sizes before and after if the value log shrank.
//
// The loop stops on any error. Previously it stopped only on ErrNoRewrite or
// ErrRejected, so any other error made it spin forever.
func (s *Badger) CheckAndGC() {
	lsmSize1, vlogSize1 := s.db.Size()
	// A nil result means a value-log file was rewritten, so another pass may
	// reclaim more; any error ends the collection.
	for s.db.RunValueLogGC(0.5) == nil {
	}
	lsmSize2, vlogSize2 := s.db.Size()
	if vlogSize2 < vlogSize1 {
		fmt.Printf("badger before GC, LSM %d, vlog %d. after GC, LSM %d, vlog %d\n", lsmSize1, vlogSize1, lsmSize2, vlogSize2)
	} else {
		fmt.Println("collect zero garbage")
	}
}

// Set writes a single key in its own read-write transaction. db.Update
// commits the transaction itself, so no explicit Commit call is needed.
// expireAt is a Unix timestamp in seconds; the entry's time-to-live is the
// distance from now to that instant.
func (s *Badger) Set(k, v []byte, expireAt int64) error {
	return s.db.Update(func(txn *badger.Txn) error {
		remaining := expireAt - time.Now().Unix()
		ttl := time.Duration(remaining) * time.Second
		return txn.SetWithTTL(k, v, ttl)
	})
}

// BatchSet writes all keys[i]/values[i] pairs using as few transactions as
// possible. expireAts[i] is the Unix timestamp in seconds at which entry i
// expires.
//
// keys, values and expireAts must have the same length, otherwise an error is
// returned and nothing is written. When a set is rejected by the current
// transaction, that transaction is committed, a new one is started and the
// set is retried once. Processing continues past failures; the first error
// seen (from a commit or from a failed retry) is returned, and a set that
// succeeds on retry is not reported as an error.
func (s *Badger) BatchSet(keys, values [][]byte, expireAts []int64) error {
	if len(keys) != len(values) {
		return errors.New("key value not the same length")
	}
	if len(keys) != len(expireAts) {
		return errors.New("key expireAt not the same length")
	}

	var firstErr error
	note := func(err error) {
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}

	txn := s.db.NewTransaction(true)
	for i, key := range keys {
		value := values[i]
		duration := time.Duration(expireAts[i]-time.Now().Unix()) * time.Second
		if err := txn.SetWithTTL(key, value, duration); err != nil {
			// Commit what has been staged so far, then retry this entry
			// in a fresh transaction.
			note(txn.Commit(nil))
			txn = s.db.NewTransaction(true)
			note(txn.SetWithTTL(key, value, duration))
		}
	}
	note(txn.Commit(nil))
	return firstErr
}

// Get returns a copy of the value stored under k. If the key does not exist
// an error (Key not found) is returned.
//
// The lookup runs inside db.View, a read-only transaction that is discarded
// automatically. An item's value may only be used inside its transaction, so
// the value is copied with ValueCopy before it is returned to the caller.
func (s *Badger) Get(k []byte) ([]byte, error) {
	var ival []byte
	err := s.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get(k)
		if err != nil {
			return err
		}
		ival, err = item.ValueCopy(nil)
		return err
	})
	if err != nil {
		return nil, err
	}
	return ival, nil
}

// BatchGet returns one value per key, in the same order as keys. If a key
// does not exist or cannot be read, its value is an empty slice.
//
// Values are copied with ValueCopy so they stay valid after the read-only
// transaction is discarded. A missing key is not treated as an error. The
// returned error is the first failure other than ErrKeyNotFound, or nil if
// there was none; the values slice is always fully populated.
func (s *Badger) BatchGet(keys [][]byte) ([][]byte, error) {
	var firstErr error
	txn := s.db.NewTransaction(false) // read-only transaction
	// Closure so that the transaction current at return time is discarded;
	// txn may be replaced inside the loop. Read-only transactions only need
	// Discard, not Commit.
	defer func() { txn.Discard() }()

	values := make([][]byte, len(keys))
	for i, key := range keys {
		values[i] = []byte{} // default for missing or unreadable keys

		item, err := txn.Get(key)
		if err != nil {
			if err != badger.ErrKeyNotFound {
				if firstErr == nil {
					firstErr = err
				}
				// A real failure: continue with the remaining keys in a
				// fresh transaction.
				txn.Discard()
				txn = s.db.NewTransaction(false)
			}
			continue
		}

		ival, err := item.ValueCopy(nil)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}
		values[i] = ival
	}
	return values, firstErr
}

// Delete removes k inside its own read-write transaction and returns the
// transaction's result.
func (s *Badger) Delete(k []byte) error {
	remove := func(txn *badger.Txn) error {
		return txn.Delete(k)
	}
	return s.db.Update(remove)
}

// BatchDelete removes all keys using as few transactions as possible.
//
// When a delete is rejected by the current transaction, that transaction is
// committed, a new one is started and the delete is retried once. Processing
// continues past failures; the first error seen (from a commit or from a
// failed retry) is returned, and a delete that succeeds on retry is not
// reported as an error.
func (s *Badger) BatchDelete(keys [][]byte) error {
	var firstErr error
	note := func(err error) {
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}

	txn := s.db.NewTransaction(true)
	for _, key := range keys {
		if err := txn.Delete(key); err != nil {
			// Commit what has been staged so far, then retry this key in
			// a fresh transaction.
			note(txn.Commit(nil))
			txn = s.db.NewTransaction(true)
			note(txn.Delete(key))
		}
	}
	note(txn.Commit(nil))
	return firstErr
}

// Has reports whether k exists. The lookup runs inside db.View, which
// discards its transaction automatically. Any lookup error, including
// ErrKeyNotFound for a missing key, yields false.
func (s *Badger) Has(k []byte) bool {
	found := false
	s.db.View(func(txn *badger.Txn) error {
		if _, err := txn.Get(k); err != nil {
			return err
		}
		// No error at all means the key is present.
		found = true
		return nil
	})
	return found
}

// IterDB walks the whole database and calls fn with each key/value pair.
// Entries whose value cannot be read are skipped. It returns how many pairs
// fn accepted (returned nil for).
func (s *Badger) IterDB(fn func(k, v []byte) error) int64 {
	var accepted int64
	s.db.View(func(txn *badger.Txn) error {
		it := txn.NewIterator(badger.DefaultIteratorOptions)
		defer it.Close()

		it.Rewind()
		for ; it.Valid(); it.Next() {
			entry := it.Item()
			k := entry.Key()
			v, err := entry.Value()
			if err == nil && fn(k, v) == nil {
				atomic.AddInt64(&accepted, 1)
			}
		}
		return nil
	})
	return atomic.LoadInt64(&accepted)
}

// IterKey walks only the keys and calls fn with each one, returning how many
// keys fn accepted (returned nil for). Value prefetching is switched off
// because only keys are needed; per the original author's note, all keys
// live in the LSM tree, so this only reads memory and is fast.
func (s *Badger) IterKey(fn func(k []byte) error) int64 {
	var accepted int64
	s.db.View(func(txn *badger.Txn) error {
		iterOpts := badger.DefaultIteratorOptions
		iterOpts.PrefetchValues = false
		it := txn.NewIterator(iterOpts)
		defer it.Close()

		it.Rewind()
		for ; it.Valid(); it.Next() {
			if fn(it.Item().Key()) == nil {
				atomic.AddInt64(&accepted, 1)
			}
		}
		return nil
	})
	return atomic.LoadInt64(&accepted)
}

// Size returns the two sizes reported by the underlying badger database;
// CheckAndGC treats them as the LSM size and the value-log size, in that order.
func (s *Badger) Size() (int64, int64) {
	lsm, vlog := s.db.Size()
	return lsm, vlog
}

// Close closes the underlying badger database and returns its result. Per
// the original author's note, this flushes in-memory data to disk and
// releases the file lock.
func (s *Badger) Close() error {
	err := s.db.Close()
	return err
}
      

compare.go

package main

import (
	"crypto/md5"
	"encoding/hex"
	"math/rand"
	"pkg/radic/storage"
	"time"
	"fmt"
	"sync"
	"sync/atomic"
)

// Sizes, in bytes, of the random keys and values generated for the benchmark.
const (
	KEY_LEN   = 30   // length of each generated key
	VALUE_LEN = 1000 // length of each generated value
)

// checksum returns the MD5 digest of data as a lowercase hexadecimal string.
func checksum(data []byte) string {
	digest := md5.Sum(data)
	hexDigest := hex.EncodeToString(digest[:])
	return hexDigest
}

// Bytes returns a new slice of n pseudo-random bytes drawn from math/rand.
func Bytes(n int) []byte {
	buf := make([]byte, n)
	rand.Read(buf)
	return buf
}

// src is a generated test payload together with its checksum.
type src struct {
	Data     []byte // the random payload
	Checksum string // hex-encoded MD5 digest of Data, as filled in by prepareData
}

// prepareData generates n random bytes and returns them together with the
// hex-encoded MD5 digest of those bytes.
func prepareData(n int) src {
	payload := Bytes(n)
	digest := md5.Sum(payload)
	return src{
		Data:     payload,
		Checksum: hex.EncodeToString(digest[:]),
	}
}

// TestWriteAndGet benchmarks single-key Set and Get on db. It starts parallel
// goroutines; each generates 100000 random key/value pairs, writes them one
// by one, then reads them back one by one. Only the Set and Get loops are
// timed; data generation is not. The average latency in nanoseconds per
// operation, across all goroutines, is printed.
//
// If parallel is not positive nothing is run; this avoids the division by
// zero the averages would otherwise hit.
func TestWriteAndGet(db storage.Storage, parallel int) {
	if parallel <= 0 {
		fmt.Println("parallel must be positive, nothing to test")
		return
	}

	var writeTime int64
	var readTime int64
	var writeCount int64
	var readCount int64
	wg := sync.WaitGroup{}
	wg.Add(parallel)
	for r := 0; r < parallel; r++ {
		go func() {
			defer wg.Done()
			const loop = 100000
			expireAt := time.Now().Add(100 * time.Minute).Unix()
			keys := make([][]byte, 0, loop)
			values := make([][]byte, 0, loop)
			for i := 0; i < loop; i++ {
				keys = append(keys, prepareData(KEY_LEN).Data)
				values = append(values, prepareData(VALUE_LEN).Data)
			}

			begin := time.Now()
			for i, key := range keys {
				// Errors are deliberately ignored: only call latency is measured.
				db.Set(key, values[i], expireAt)
			}
			atomic.AddInt64(&writeTime, time.Since(begin).Nanoseconds())
			atomic.AddInt64(&writeCount, int64(len(keys)))

			begin = time.Now()
			for _, key := range keys {
				// Errors are deliberately ignored: only call latency is measured.
				db.Get(key)
			}
			atomic.AddInt64(&readTime, time.Since(begin).Nanoseconds())
			atomic.AddInt64(&readCount, int64(len(keys)))
		}()
	}
	wg.Wait()

	// The quotient is total nanoseconds divided by operations, i.e. ns per op.
	fmt.Printf("write %d ns/op, read %d ns/op\n", atomic.LoadInt64(&writeTime)/atomic.LoadInt64(&writeCount), atomic.LoadInt64(&readTime)/atomic.LoadInt64(&readCount))
}

// TestBatchWriteAndGet benchmarks BatchSet and BatchGet on db. It starts
// parallel goroutines; each runs 100 rounds, and every round generates 1000
// random key/value pairs, writes them with one BatchSet call and reads them
// back with one BatchGet call. Only the batch calls are timed. The average
// latency in nanoseconds per batch call (one op is one batch of 1000
// entries), across all goroutines, is printed.
//
// If parallel is not positive nothing is run; this avoids the division by
// zero the averages would otherwise hit.
func TestBatchWriteAndGet(db storage.Storage, parallel int) {
	if parallel <= 0 {
		fmt.Println("parallel must be positive, nothing to test")
		return
	}

	var writeTime int64
	var readTime int64
	var writeCount int64
	var readCount int64

	const (
		loop      = 100
		batchSize = 1000
	)
	wg := sync.WaitGroup{}
	wg.Add(parallel)
	for r := 0; r < parallel; r++ {
		go func() {
			defer wg.Done()
			for i := 0; i < loop; i++ {
				expireAt := time.Now().Add(100 * time.Minute).Unix()
				keys := make([][]byte, 0, batchSize)
				values := make([][]byte, 0, batchSize)
				expireAts := make([]int64, 0, batchSize)
				for j := 0; j < batchSize; j++ {
					keys = append(keys, prepareData(KEY_LEN).Data)
					values = append(values, prepareData(VALUE_LEN).Data)
					expireAts = append(expireAts, expireAt)
				}

				begin := time.Now()
				// Errors are deliberately ignored: only call latency is measured.
				db.BatchSet(keys, values, expireAts)
				atomic.AddInt64(&writeTime, time.Since(begin).Nanoseconds())
				atomic.AddInt64(&writeCount, 1)

				begin = time.Now()
				// Errors are deliberately ignored: only call latency is measured.
				db.BatchGet(keys)
				atomic.AddInt64(&readTime, time.Since(begin).Nanoseconds())
				atomic.AddInt64(&readCount, 1)
			}
		}()
	}
	wg.Wait()

	// The quotient is total nanoseconds divided by batch calls, i.e. ns per op.
	fmt.Printf("batch write %d ns/op, batch read %d ns/op\n", atomic.LoadInt64(&writeTime)/atomic.LoadInt64(&writeCount), atomic.LoadInt64(&readTime)/atomic.LoadInt64(&readCount))
}

// main opens a badger and a RocksDB store, runs the single-key and batch
// benchmarks against each with 1 and then 10 goroutines, and finally iterates
// over both databases so memory usage can be observed.
//
// Open failures are reported and end the run instead of being ignored, and
// both stores are closed on exit so their locks are released.
func main() {
	badger, err := storage.OpenStorage("badger", "badgerdb")
	if err != nil {
		fmt.Println("open badger failed:", err)
		return
	}
	defer badger.Close()

	rocks, err := storage.OpenStorage("rocksdb", "rocksdb")
	if err != nil {
		fmt.Println("open rocksdb failed:", err)
		return
	}
	defer rocks.Close()

	TestWriteAndGet(badger, 1)
	TestWriteAndGet(rocks, 1)
	TestBatchWriteAndGet(badger, 1)
	TestBatchWriteAndGet(rocks, 1)
	fmt.Println("parallel test")
	TestWriteAndGet(badger, 10)
	TestWriteAndGet(rocks, 10)
	TestBatchWriteAndGet(badger, 10)
	TestBatchWriteAndGet(rocks, 10)
	fmt.Println("please watch the memory")
	fmt.Println("rocksdb......")
	rocks.IterDB(func(k, v []byte) error {
		return nil
	})
	fmt.Println("badger......")
	badger.IterDB(func(k, v []byte) error {
		return nil
	})
}