結論:
- 從最後一個表格來看,SSD只對batch_read和batch_write操作有優勢,而且在多協程的情況下,這個優勢也丟失了。
- 從第二和第三個表格來看,badger的write操作比RocksDB慢了一個數量級,而batch_write操作badger又非常快。所以,如果你用的是Go語言,且write不是你的主要操作,推薦用badger。
參數不同、測試方法不同都會導緻結論不同,以下是我的測試代碼。
storage.go
package storage
import (
"fmt"
)
// storageOpenFunction maps an engine name ("badger", "rocksdb") to the
// constructor that opens that engine at a given path.
var storageOpenFunction = map[string]func(path string) (Storage, error){
	"badger":  OpenBadger,
	"rocksdb": OpenRocksdb,
}
// Storage is the minimal key-value abstraction implemented by both the
// badger and RocksDB backends so they can be benchmarked uniformly.
type Storage interface {
	// Set stores k -> v. expireAt is a unix timestamp; the badger backend
	// turns it into a TTL, the RocksDB backend ignores it.
	Set(k, v []byte, expireAt int64) error
	// BatchSet stores many pairs at once; expireAts parallels keys/values.
	BatchSet(keys, values [][]byte, expireAts []int64) error
	// Get returns the value stored under k.
	Get(k []byte) ([]byte, error)
	// BatchGet returns values in the same order as keys.
	BatchGet(keys [][]byte) ([][]byte, error)
	// Delete removes k.
	Delete(k []byte) error
	// BatchDelete removes all keys.
	BatchDelete(keys [][]byte) error
	// Has reports whether k exists.
	Has(k []byte) bool
	// IterDB applies fn to every key/value pair; returns the count of
	// pairs for which fn returned nil.
	IterDB(fn func(k, v []byte) error) int64
	// IterKey applies fn to every key only.
	IterKey(fn func(k []byte) error) int64
	// Close flushes and releases the underlying database.
	Close() error
}
// OpenStorage opens the storage engine registered under storeName
// ("badger" or "rocksdb") at the given path. It returns an error when
// the engine name is not registered in storageOpenFunction.
func OpenStorage(storeName string, path string) (Storage, error) {
	fc, exists := storageOpenFunction[storeName]
	if !exists {
		return nil, fmt.Errorf("unsupported storage engine: %v", storeName)
	}
	return fc(path)
}
rocks.go
package storage
import (
"github.com/tecbot/gorocksdb"
"os"
"path"
"sync/atomic"
)
// Shared RocksDB native option objects, reused by every call.
// OpenRocksdb mutates rocksOptions before opening the database.
var (
	rocksOptions = gorocksdb.NewDefaultOptions()
	readOptions  = gorocksdb.NewDefaultReadOptions()
	writeOptions = gorocksdb.NewDefaultWriteOptions()
)
// Rocksdb adapts a gorocksdb.DB to the Storage interface.
type Rocksdb struct {
	db *gorocksdb.DB
}
// OpenRocksdb opens (creating if necessary) a RocksDB database at dbPath
// and wraps it as a Storage.
// MkdirAll is a no-op when the parent directory already exists, and
// returns an error if a regular file occupies that path.
func OpenRocksdb(dbPath string) (Storage, error) {
	if err := os.MkdirAll(path.Dir(dbPath), os.ModePerm); err != nil {
		return nil, err
	}
	rocksOptions.SetCreateIfMissing(true)
	rocksOptions.SetCompression(gorocksdb.NoCompression)
	rocksOptions.SetWriteBufferSize(1000000)
	db, err := gorocksdb.OpenDb(rocksOptions, dbPath)
	if err != nil {
		// Return the error instead of panicking — the signature already
		// has an error result, and panicking made it unreachable.
		return nil, err
	}
	return &Rocksdb{db: db}, nil
}
// Set stores k -> v. expireAt is accepted only for Storage-interface
// compatibility; this backend does not implement per-key TTL, so it is
// ignored.
func (s *Rocksdb) Set(k, v []byte, expireAt int64) error {
	return s.db.Put(writeOptions, k, v)
}
// BatchSet writes all key/value pairs in a single WriteBatch.
// expireAts is ignored (no TTL support in this backend).
func (s *Rocksdb) BatchSet(keys, values [][]byte, expireAts []int64) error {
	wb := gorocksdb.NewWriteBatch()
	defer wb.Destroy()
	for i, key := range keys {
		wb.Put(key, values[i])
	}
	// Propagate the write error instead of silently dropping it.
	return s.db.Write(writeOptions, wb)
}
// Get returns the value stored under k. GetBytes returns a Go-owned
// copy, so the result is safe to retain.
func (s *Rocksdb) Get(k []byte) ([]byte, error) {
	return s.db.GetBytes(readOptions, k)
}
// BatchGet returns the values for keys, in order.
// MultiGet returns gorocksdb.Slice objects whose bytes are owned by the
// C side; each must be copied into Go memory and then freed, otherwise
// the C allocation leaks and the returned bytes may dangle.
func (s *Rocksdb) BatchGet(keys [][]byte) ([][]byte, error) {
	slices, err := s.db.MultiGet(readOptions, keys...)
	if err != nil {
		return nil, err
	}
	values := make([][]byte, 0, len(slices))
	for _, slice := range slices {
		data := slice.Data()
		value := make([]byte, len(data))
		copy(value, data)
		slice.Free() // release the C-allocated buffer
		values = append(values, value)
	}
	return values, nil
}
// Delete removes k from the database.
func (s *Rocksdb) Delete(k []byte) error {
	return s.db.Delete(writeOptions, k)
}
// BatchDelete removes all keys in a single WriteBatch.
func (s *Rocksdb) BatchDelete(keys [][]byte) error {
	wb := gorocksdb.NewWriteBatch()
	defer wb.Destroy()
	for _, key := range keys {
		wb.Delete(key)
	}
	// Propagate the write error instead of silently dropping it.
	return s.db.Write(writeOptions, wb)
}
// Has reports whether k currently maps to a non-empty value.
func (s *Rocksdb) Has(k []byte) bool {
	v, err := s.db.GetBytes(readOptions, k)
	return err == nil && len(v) > 0
}
// IterDB applies fn to every key/value pair in the database and returns
// the number of pairs for which fn returned nil.
//
// The iteration runs entirely in the calling goroutine, so a plain
// counter suffices — the previous atomic operations were unnecessary.
// NOTE(review): the bytes handed to fn are backed by the iterator and
// are only valid until the next call to Next; fn must copy them if it
// retains them.
func (s *Rocksdb) IterDB(fn func(k, v []byte) error) int64 {
	var total int64
	iter := s.db.NewIterator(readOptions)
	defer iter.Close()
	for iter.SeekToFirst(); iter.Valid(); iter.Next() {
		if err := fn(iter.Key().Data(), iter.Value().Data()); err == nil {
			total++
		}
	}
	return total
}
// IterKey applies fn to every key and returns the number of keys for
// which fn returned nil.
//
// Runs in the calling goroutine only, so a plain counter replaces the
// previous unnecessary atomic operations. The key bytes are only valid
// until the next call to Next.
func (s *Rocksdb) IterKey(fn func(k []byte) error) int64 {
	var total int64
	iter := s.db.NewIterator(readOptions)
	defer iter.Close()
	for iter.SeekToFirst(); iter.Valid(); iter.Next() {
		if err := fn(iter.Key().Data()); err == nil {
			total++
		}
	}
	return total
}
// Close releases the underlying RocksDB handle. gorocksdb's Close has
// no error return, so this always reports nil.
func (s *Rocksdb) Close() error {
	s.db.Close()
	return nil
}
badger.go
package storage
import (
"github.com/dgraph-io/badger"
"os"
"path"
"time"
"github.com/pkg/errors"
"fmt"
"sync/atomic"
"github.com/dgraph-io/badger/options"
)
// Badger adapts a badger.DB to the Storage interface.
type Badger struct {
	db *badger.DB
}
// badgerOptions tunes badger for this benchmark; the settings trade
// memory for speed (see per-field comments).
var badgerOptions = badger.Options{
	DoNotCompact:        false, // compaction (read files into memory, sort, rewrite) is the main LSM-tree cost
	LevelOneSize:        64 << 20, // size of level one
	LevelSizeMultiplier: 10, // each level is 10x the size of the previous one
	MaxLevels:           7, // maximum number of LSM-tree levels
	// Keys live in the LSM tree (memory); values (really value pointers)
	// live on disk in the value-log ("vlog") files.
	TableLoadingMode:    options.MemoryMap, // load the LSM tree fully into memory
	ValueLogLoadingMode: options.FileIO, // FileIO instead of MemoryMap saves a lot of memory
	MaxTableSize:        4 << 20, // 4 MB
	NumCompactors:       8, // number of compaction workers
	NumLevelZeroTables:  4,
	NumLevelZeroTablesStall: 10,
	NumMemtables: 4, // writes hit the MemTable first; when it reaches a size limit it is flushed to disk as an immutable SSTable
	SyncWrites:   false, // async disk writes: the in-memory LSM tree is updated immediately and data is compacted/flushed when MaxTableSize is reached; Close also flushes in-memory data to disk
	NumVersionsToKeep: 1,
	ValueLogFileSize:  64 << 20, // in bytes; a vlog file is rolled over past this size (64 MB)
	ValueLogMaxEntries: 100000,
	ValueThreshold:     32,
	Truncate:           false,
}
// Alternative: use the library defaults instead of the tuned values above.
//var badgerOptions = badger.DefaultOptions
// OpenBadger opens (creating if necessary) a badger database at dbPath.
// A badger directory can only be used by one process at a time; if a
// process dies without calling Close, the stale LOCK file must be
// removed manually before the directory can be opened again.
// NOTE(review): this mutates the package-level badgerOptions, so opening
// two badger databases concurrently is not safe.
func OpenBadger(dbPath string) (Storage, error) {
	if err := os.MkdirAll(path.Dir(dbPath), os.ModePerm); err != nil {
		return nil, err
	}
	badgerOptions.Dir = dbPath
	badgerOptions.ValueDir = dbPath
	db, err := badger.Open(badgerOptions)
	if err != nil {
		// Return the error instead of panicking — the signature already
		// has an error result, and panicking made the return unreachable.
		return nil, err
	}
	return &Badger{db}, nil
}
// CheckAndGC runs value-log garbage collection until badger reports
// there is nothing left to rewrite, then prints how much space the LSM
// tree and the value log occupied before and after.
func (s *Badger) CheckAndGC() {
	lsmBefore, vlogBefore := s.db.Size()
	for {
		err := s.db.RunValueLogGC(0.5)
		if err == badger.ErrNoRewrite || err == badger.ErrRejected {
			break
		}
	}
	lsmAfter, vlogAfter := s.db.Size()
	if vlogAfter < vlogBefore {
		fmt.Printf("badger before GC, LSM %d, vlog %d. after GC, LSM %d, vlog %d\n", lsmBefore, vlogBefore, lsmAfter, vlogAfter)
	} else {
		fmt.Println("collect zero garbage")
	}
}
// Set stores k -> v inside its own read-write transaction. db.Update is
// equivalent to db.NewTransaction(true) plus an automatic Commit.
// expireAt is a unix timestamp converted to a TTL.
func (s *Badger) Set(k, v []byte, expireAt int64) error {
	return s.db.Update(func(txn *badger.Txn) error {
		ttl := time.Duration(expireAt-time.Now().Unix()) * time.Second
		return txn.SetWithTTL(k, v, ttl)
	})
}
// BatchSet stores many key/value pairs using one shared transaction,
// which is much faster than one transaction per key. When SetWithTTL
// fails (e.g. the transaction grew too large), the current transaction
// is committed and the key is retried in a fresh one.
func (s *Badger) BatchSet(keys, values [][]byte, expireAts []int64) error {
	if len(keys) != len(values) {
		return errors.New("key value not the same length")
	}
	var err error
	txn := s.db.NewTransaction(true)
	for i, key := range keys {
		value := values[i]
		duration := time.Duration(expireAts[i]-time.Now().Unix()) * time.Second
		if err = txn.SetWithTTL(key, value, duration); err != nil {
			// Commit the old transaction, then retry the key in a new one.
			_ = txn.Commit(nil)
			txn = s.db.NewTransaction(true)
			err = txn.SetWithTTL(key, value, duration)
		}
	}
	// Propagate the final commit error instead of discarding it; without
	// this the caller could believe data was durable when the commit failed.
	if cerr := txn.Commit(nil); cerr != nil && err == nil {
		err = cerr
	}
	return err
}
// Get returns the value stored under k. If k does not exist, badger
// returns ErrKeyNotFound.
func (s *Badger) Get(k []byte) ([]byte, error) {
	var ival []byte
	err := s.db.View(func(txn *badger.Txn) error { // read-only transaction; View discards it automatically
		item, err := txn.Get(k)
		if err != nil {
			return err
		}
		// item.Value() bytes are only valid while the transaction is
		// alive; since ival escapes the View callback, it must be copied
		// out with ValueCopy.
		ival, err = item.ValueCopy(nil)
		return err
	})
	return ival, err
}
//BatchGet 傳回的values與傳入的keys順序保持一緻。如果key不存在或讀取失敗則對應的value是空數組
func (s *Badger) BatchGet(keys [][]byte) ([][]byte, error) {
var err error
txn := s.db.NewTransaction(false) //隻讀事務
values := make([][]byte, len(keys))
for i, key := range keys {
var item *badger.Item
item, err = txn.Get(key)
if err == nil {
//buffer := make([]byte, badgerOptions.ValueLogMaxEntries)
var ival []byte
//ival, err = item.ValueCopy(buffer)
ival, err = item.Value()
if err == nil {
values[i] = ival
} else { //拷貝失敗
values[i] = []byte{} //拷貝失敗就把value設為空數組
}
} else { //讀取失敗
values[i] = []byte{} //讀取失敗就把value設為空數組
if err != badger.ErrKeyNotFound { //如果真的發生異常,則開一個新事務繼續讀後面的key
txn.Discard()
txn = s.db.NewTransaction(false)
}
}
}
txn.Discard() //隻讀事務調Discard就可以了,不需要調Commit。Commit内部也會調Discard
return values, err
}
// Delete removes k inside its own read-write transaction.
func (s *Badger) Delete(k []byte) error {
	return s.db.Update(func(txn *badger.Txn) error {
		return txn.Delete(k)
	})
}
// BatchDelete removes all keys using one shared transaction. When a
// Delete fails (e.g. the transaction grew too large), the current
// transaction is committed and the key is retried in a fresh one.
func (s *Badger) BatchDelete(keys [][]byte) error {
	var err error
	txn := s.db.NewTransaction(true)
	for _, key := range keys {
		if err = txn.Delete(key); err != nil {
			// Commit the old transaction, then retry in a new one.
			_ = txn.Commit(nil)
			txn = s.db.NewTransaction(true)
			err = txn.Delete(key)
		}
	}
	// Propagate the final commit error instead of discarding it.
	if cerr := txn.Commit(nil); cerr != nil && err == nil {
		err = cerr
	}
	return err
}
// Has reports whether k exists. txn.Get returns ErrKeyNotFound (or
// another error) when the key is absent, so existence is simply
// "the read-only view completed without error".
func (s *Badger) Has(k []byte) bool {
	err := s.db.View(func(txn *badger.Txn) error {
		_, err := txn.Get(k)
		return err
	})
	return err == nil
}
// IterDB applies fn to every key/value pair in the database and returns
// the number of pairs for which fn returned nil. Entries whose value
// cannot be read are skipped.
//
// The iteration runs entirely in the calling goroutine, so a plain
// counter suffices — the previous atomic operations were unnecessary.
func (s *Badger) IterDB(fn func(k, v []byte) error) int64 {
	var total int64
	s.db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		it := txn.NewIterator(opts)
		defer it.Close()
		for it.Rewind(); it.Valid(); it.Next() {
			item := it.Item()
			val, err := item.Value()
			if err != nil {
				continue
			}
			// fn runs inside the transaction, so the un-copied key/value
			// bytes are still valid here.
			if err := fn(item.Key(), val); err == nil {
				total++
			}
		}
		return nil
	})
	return total
}
// IterKey applies fn to every key and returns the number of keys for
// which fn returned nil. Keys live entirely in the LSM tree (memory),
// so disabling value prefetch makes this fast.
//
// Runs in the calling goroutine only, so a plain counter replaces the
// previous unnecessary atomic operations.
func (s *Badger) IterKey(fn func(k []byte) error) int64 {
	var total int64
	s.db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.PrefetchValues = false // keys only; skip value prefetch
		it := txn.NewIterator(opts)
		defer it.Close()
		for it.Rewind(); it.Valid(); it.Next() {
			if err := fn(it.Item().Key()); err == nil {
				total++
			}
		}
		return nil
	})
	return total
}
// Size returns the on-disk sizes (in bytes) of the LSM tree and the
// value log, as reported by badger.
func (s *Badger) Size() (int64, int64) {
	return s.db.Size()
}
// Close flushes in-memory data to disk and releases the file lock.
func (s *Badger) Close() error {
	return s.db.Close()
}
compare.go
package main
import (
"crypto/md5"
"encoding/hex"
"math/rand"
"pkg/radic/storage"
"time"
"fmt"
"sync"
"sync/atomic"
)
// Benchmark payload sizes, in bytes.
const (
	KEY_LEN   = 30   // length of each random key
	VALUE_LEN = 1000 // length of each random value
)
// checksum returns the hex-encoded MD5 digest of data.
func checksum(data []byte) string {
	digest := md5.Sum(data)
	return hex.EncodeToString(digest[:])
}
// Bytes returns a slice of n pseudo-random bytes.
func Bytes(n int) []byte {
	buf := make([]byte, n)
	rand.Read(buf) // math/rand's Read never returns an error
	return buf
}
// src couples a random payload with its MD5 checksum so reads could be
// validated against what was written.
type src struct {
	Data     []byte // random payload
	Checksum string // hex-encoded MD5 of Data
}
// prepareData builds n random bytes together with their MD5 checksum.
func prepareData(n int) src {
	payload := Bytes(n)
	sum := md5.Sum(payload)
	return src{Data: payload, Checksum: hex.EncodeToString(sum[:])}
}
// TestWriteAndGet benchmarks single-key Set/Get on db with `parallel`
// concurrent goroutines (100000 ops each) and prints the average
// latency per operation in nanoseconds.
func TestWriteAndGet(db storage.Storage, parallel int) {
	var (
		writeNanos int64
		readNanos  int64
		writeOps   int64
		readOps    int64
	)
	var wg sync.WaitGroup
	wg.Add(parallel)
	for g := 0; g < parallel; g++ {
		go func() {
			defer wg.Done()
			expireAt := time.Now().Add(100 * time.Minute).Unix()
			const loop = 100000
			keys := make([][]byte, 0, loop)
			values := make([][]byte, 0, loop)
			validations := make([]string, 0, loop)
			for i := 0; i < loop; i++ {
				keys = append(keys, prepareData(KEY_LEN).Data)
				v := prepareData(VALUE_LEN)
				values = append(values, v.Data)
				validations = append(validations, v.Checksum)
			}
			start := time.Now()
			for i := range keys {
				db.Set(keys[i], values[i], expireAt)
			}
			atomic.AddInt64(&writeNanos, time.Since(start).Nanoseconds())
			atomic.AddInt64(&writeOps, int64(len(keys)))
			start = time.Now()
			for i := range keys {
				db.Get(keys[i])
			}
			atomic.AddInt64(&readNanos, time.Since(start).Nanoseconds())
			atomic.AddInt64(&readOps, int64(len(keys)))
		}()
	}
	wg.Wait()
	fmt.Printf("write %d op/ns, read %d op/ns\n", atomic.LoadInt64(&writeNanos)/atomic.LoadInt64(&writeOps), atomic.LoadInt64(&readNanos)/atomic.LoadInt64(&readOps))
}
// TestBatchWriteAndGet benchmarks BatchSet/BatchGet on db — 1000 pairs
// per batch, 100 batches per goroutine, `parallel` goroutines — and
// prints the average latency per batch in nanoseconds.
func TestBatchWriteAndGet(db storage.Storage, parallel int) {
	var (
		writeNanos int64
		readNanos  int64
		writeOps   int64
		readOps    int64
	)
	const loop = 100
	var wg sync.WaitGroup
	wg.Add(parallel)
	for g := 0; g < parallel; g++ {
		go func() {
			defer wg.Done()
			for i := 0; i < loop; i++ {
				expireAt := time.Now().Add(100 * time.Minute).Unix()
				const batch = 1000
				keys := make([][]byte, 0, batch)
				values := make([][]byte, 0, batch)
				expireAts := make([]int64, 0, batch)
				for j := 0; j < batch; j++ {
					keys = append(keys, prepareData(KEY_LEN).Data)
					values = append(values, prepareData(VALUE_LEN).Data)
					expireAts = append(expireAts, expireAt)
				}
				start := time.Now()
				db.BatchSet(keys, values, expireAts)
				atomic.AddInt64(&writeNanos, time.Since(start).Nanoseconds())
				atomic.AddInt64(&writeOps, 1)
				start = time.Now()
				db.BatchGet(keys)
				atomic.AddInt64(&readNanos, time.Since(start).Nanoseconds())
				atomic.AddInt64(&readOps, 1)
			}
		}()
	}
	wg.Wait()
	fmt.Printf("batch write %d op/ns, batch read %d op/ns\n", atomic.LoadInt64(&writeNanos)/atomic.LoadInt64(&writeOps), atomic.LoadInt64(&readNanos)/atomic.LoadInt64(&readOps))
}
// main runs the badger-vs-rocksdb benchmark: single-key ops, batch ops,
// the same again with 10-way concurrency, and finally a full scan of
// each store so memory usage can be observed externally.
func main() {
	// The original code ignored OpenStorage errors, which would cause a
	// nil-interface dereference on the first Set; fail fast instead.
	badgerDB, err := storage.OpenStorage("badger", "badgerdb")
	if err != nil {
		fmt.Printf("open badger: %v\n", err)
		return
	}
	defer badgerDB.Close() // releases badger's file lock
	rocksDB, err := storage.OpenStorage("rocksdb", "rocksdb")
	if err != nil {
		fmt.Printf("open rocksdb: %v\n", err)
		return
	}
	defer rocksDB.Close()
	TestWriteAndGet(badgerDB, 1)
	TestWriteAndGet(rocksDB, 1)
	TestBatchWriteAndGet(badgerDB, 1)
	TestBatchWriteAndGet(rocksDB, 1)
	fmt.Println("parallel test")
	TestWriteAndGet(badgerDB, 10)
	TestWriteAndGet(rocksDB, 10)
	TestBatchWriteAndGet(badgerDB, 10)
	TestBatchWriteAndGet(rocksDB, 10)
	fmt.Println("please watch the memory")
	fmt.Println("rocksdb......")
	rocksDB.IterDB(func(k, v []byte) error {
		return nil
	})
	fmt.Println("badger......")
	badgerDB.IterDB(func(k, v []byte) error {
		return nil
	})
}