天天看點

Aerospike用例分析

       Aerospike是一個以分布式為核心,T級别大資料高并發的……,參見《Aerospike入坑導讀》 https://yq.aliyun.com/articles/622455?spm=a2c4e.11155435.0.0.36e43312Nq4iTX ,針對常用的一般性操作給出九個用例,代碼采用GO語言實作,JAVA、C\C++以及其他語言的實作都差不多,同樣具有參考意義,完整的代碼請從 https://gitee.com/gonglibin/codes/0hqjzto12argkmbsyv97n32 下載下傳。

func asInformation() {
	url := fmt.Sprintf("%s:%d", AS_HOST, AS_PORT)

	if con, err := as.NewConnection(url, time.Second * 10); err == nil {
		defer con.Close()

		if ifs, err := as.RequestInfo(con, ""); nil == err {
			for k, v := range ifs {
				/**
				 * features: peers;cdt-list;cdt-map;cluster-stable;pipelining;geo ...
				 * build: 4.2.0.5
				 * version: Aerospike Community Edition build 4.2.0.5
				 * build_os: el7
				 * statistics: cluster_size=1;cluster_key=E50AB9AFD60C;cluster_in ...
				 * services-alumni:
				 * services:
				 * partition-generation: 0
				 * node: BB9822D9A270008
				 * edition: Aerospike Community Edition
				 * build_time: Mon Jul 16 19:02:48 UTC 2018
				 */
				fmt.Printf("%s: %s\n", k, v)
			}
		}
	} else {
		panic(err.Error())
	}
}           

       用例一:asInformation擷取aerospike的基礎資訊,第64行調用RequestInfo函數,函數的第一個參數為aerospike的連接配接句柄,第二個參數為資訊名稱,空則傳回全部資訊,結果儲存在map容器中。

func asMapToRecord(cli *as.Client) {
	arr := []byte{0x11, 0x21, 0x31}
	lst := []interface{}{111, "STRING", arr}
	mmp := map[interface{}] interface{} {
		"key1": 1,
		"key2": "A",
		"key3": arr,
		"key4": lst,
	}

	mybin := "mybin1"
	bin := as.NewBin(mybin, mmp)
	cli.PutBins(AS_WPLC, AS_NKEY, bin)

	if rst, err := cli.Get(AS_PLCY, AS_NKEY, bin.Name); nil == err {
		val := rst.Bins[bin.Name].(map[interface{}] interface{})
		fmt.Printf("Intege: %d, String: %s, []Byte: %v, Map: %v\n", val["key1"], val["key2"], val["key3"], val["key4"])
	}
}           

       用例二:asMapToRecord以map資料結構作為bin存儲的資料類型,93、94、95行構造了一個map執行個體,且每個節點的資料類型都不一樣,分别為整形、字元串、字元數組和連結清單(其中連結清單中包含三個元素,分别為整形、字元串和字元數組),可見GO語言的map對資料類型的支援是多麼的肆意妄為。查詢的結果以鍵值對的方式傳回,指定key進而讀取value。

func asListToRecord(cli *as.Client) {
	mybin := "mybin2"
	bin := as.NewBin(mybin, []interface{}{111, "STRING_111", []byte{0x11, 0x21, 0x31}})

	cli.PutBins(AS_WPLC, AS_NKEY, bin)

	if rst, err := cli.Get(AS_PLCY, AS_NKEY, bin.Name); nil == err {
		val := rst.Bins[bin.Name].([]interface{})
		fmt.Printf("Intege: %d, String: %s, []Byte: %v\n", val[0], val[1], val[2])
	}
}           

       用例三:asListToRecord以連結清單資料結構作為bin存儲的資料類型,第119行構造了一個連結清單執行個體,并附加bin名稱定義了一個bin對象,連結清單結構共三個節點,分别存儲整形、字元串和字元數組。查詢的結果集以連結清單方式傳回,可以直接通過下标得到資料項。

func asBatchToRecord(cli *as.Client) {
	bn := "mybin3"
	bk := []string{"b_key1", "b_key2", "b_key3", "b_key4"}
	bv := []string{"b_val1", "b_val2", "b_val3", "b_val4"}

	// 寫入操作
	for i, k := range bk {
		bin := as.NewBin(bn, bv[i])
		key, _ := as.NewKey(AS_NMSP, AS_MYST, k)
		cli.PutBins(AS_WPLC, key, bin)
	}

	// 存在判斷
	ks := make([]*as.Key, len(bk) * 2)
	for i, k := range bk {
		ks[i], _ = as.NewKey(AS_NMSP, AS_MYST, k)
	}
	ks[4], _ = as.NewKey(AS_NMSP, AS_MYST, "b_key5")
	ks[5], _ = as.NewKey(AS_NMSP, AS_MYST, "b_key6")
	ks[6], _ = as.NewKey(AS_NMSP, AS_MYST, "b_key7")
	ks[7], _ = as.NewKey(AS_NMSP, AS_MYST, "b_key8")
	if ext, err := cli.BatchExists(nil, ks); nil == err {
		for i, e := range ext {
			fmt.Printf("ns = %s, set = %s, key = %s, exists = %t\n", ks[i].Namespace(), ks[i].SetName(), ks[i].Value(), e)
		}
	} else {
		panic(err.Error())
	}

	// 批量擷取
	if rst, err := cli.BatchGet(nil, ks, bn); nil == err {
		for i, r := range rst {
			if nil != r {
				fmt.Printf("ns = %s, set = %s, key = %s, bin = %s, val = %s\n", ks[i].Namespace(), ks[i].SetName(), ks[i].Value(), bn, r.Bins[bn])
			} else {
				fmt.Printf("ns = %s, set = %s, key = %s, bin = %s, val = %s\n", ks[i].Namespace(), ks[i].SetName(), ks[i].Value(), bn, "nil")
			}
		}
	}
}           

       用例四:asBatchToRecord批量操作,140 - 144行寫入四條記錄,147行故意定義其兩倍查詢主鍵數組,其中前四個為剛才寫入記錄的主鍵,151 - 154行構造了四個不存在的主鍵,155行提請批量判斷記錄是否存在,156 - 158行列印結果至标準輸出。164 - 172行采用兩倍主鍵數組批量擷取結果集,存在的結果列印,不存在的顯示nil。

func asAppendToRecord(cli *as.Client) {
	mybin := "mybin4"

	bin := as.NewBin(mybin, "aero")
	cli.AppendBins(AS_WPLC, AS_NKEY, bin)
	asGetBinsAndPrint("AppendBins第一次調用", cli, AS_NKEY, bin)

	bin = as.NewBin(mybin, "spike")
	cli.AppendBins(AS_WPLC, AS_NKEY, bin)
	asGetBinsAndPrint("AppendBins第二次調用", cli, AS_NKEY, bin)
}           

       用例五:asAppendToRecord從右側向bin中追加資料,183 - 185行寫入aero,187 - 189行寫入spike,分兩次寫入完整aerospike資料。

func asAddBinsToRecord(cli *as.Client) {
	val := 8
	mybin := "mybin5"

	bm := make(as.BinMap)
	bm[mybin] = val
	cli.Add(AS_WPLC, AS_NKEY, bm)
	asGetBinsAndPrint("BinMap 傳入調用", cli, AS_NKEY, as.NewBin(mybin, val))

	// NewBin 調用累加
	bin := as.NewBin(mybin, val << 1)
	cli.AddBins(AS_WPLC, AS_NKEY, bin)
	asGetBinsAndPrint("NewBin 調用累加", cli, AS_NKEY, bin)

	// Operate調用累加
	bin = as.NewBin(mybin, val << 2)
	cli.Operate(AS_WPLC, AS_NKEY, as.AddOp(bin), as.GetOp())
	asGetBinsAndPrint("Operate調用累加", cli, AS_NKEY, bin)
	//上面一行與下面幾行等價,Operate調用累加的同時傳回結果。
	//if rst, err := cli.Operate(AS_WPLC, AS_NKEY, as.AddOp(bin), as.GetOp()); nil == err {
	//	fmt.Printf(
	//		"Operate調用累加 ---> ns = %s, set = %s, AS_NKEY = %s, bin = %s, val = %d\n",
	//		AS_NKEY.Namespace(), AS_NKEY.SetName(), AS_NKEY.Value(), bin.Name, rst.Bins[bin.Name])
	//}
}           

       用例六:asAddBinsToRecord對bin值進行累加,201 - 204行構造一個map,調用as的Add函數累加,207 - 209行調用as的AddBins函數累加,212 - 214行調用as的Operate函數累加,三種方式導緻的結果一緻,不同的隻是入口參數不同。

func asPrependToRecord(cli *as.Client) {
	mybin := "mybin6"
	wds := []string{"工程師", "研發", "軟體", "是", "我"}

	for i, w := range wds {
		bin := as.NewBin(mybin, w)
		cli.PrependBins(AS_WPLC, AS_NKEY, bin)
		asGetBinsAndPrint(strconv.Itoa(i + 1), cli, AS_NKEY, bin)
	}
}           

       用例七:asPrependToRecord從左側向bin中追加資料,為了示範效果特将一句中文分詞後從右向左逐一寫入bin,每寫一次列印輸出一次。

func asReplaceToRecord(cli *as.Client) {
	bn := []string{"b_name_1", "b_name_2", "b_name_3"}
	bv := []string{"b_value_1", "b_value_2", "b_value_3"}

	for i, b := range bv {
		bin := as.NewBin(bn[i], b)
		cli.PutBins(AS_WPLC, AS_NKEY, bin)
	}

	fmt.Printf("覆寫前 --- > ")
	if rst, err := cli.Get(AS_PLCY, AS_NKEY, bn[0], bn[1], bn[2]); nil == err {
		for k, v := range rst.Bins {
			fmt.Printf("{bin = %s, val = %s}, ", k, v)
		}
		fmt.Println()
	}

	wp := as.NewWritePolicy(0, 0)
	wp.RecordExistsAction = as.REPLACE
	nb := as.NewBin("b_name_replace", "b_value_replace")

	cli.PutBins(wp, AS_NKEY, nb)
	asGetBinsAndPrint("覆寫後", cli, AS_NKEY, nb)
}           

       用例八:asReplaceToRecord對原有記錄進行覆寫操作,需要特别注意的一點是PutBins時務必指定所有的bin,已存在的但未傳入的bin系統将不會自動儲存。248 - 259行寫入三個bin并列印至标準輸出,261 - 266行采用一個新的bin執行覆寫操作成功後,之前寫入的三個bin被删除。

func asGetHeaderFromKey(cli *as.Client) {
	if rst, _ := cli.GetHeader(AS_PLCY, AS_NKEY); nil != rst {
		fmt.Printf("Key.generation = %d, Key.expiration = %d (%s)\n", rst.Generation, rst.Expiration, time.Duration(rst.Expiration) * time.Second)
	} else {
		fmt.Printf("Failed to GetHeader: namespace = %s, set = %s, key = %s\n", AS_NKEY.Namespace(), AS_NKEY.SetName(), AS_NKEY.Value())
	}
}           

       用例九:asGetHeaderFromKey查詢一個已存在主鍵的被修改次數和生存期,一個key在沒有被修改的情況下,系統預設生存期為720個小時(30天)

       Redis和Aerospike經常被拿來比較,至于用哪裡好更多的時候還是要看需求,從總體上看,Aerospike比Redis對結構化資料支援的更好些,資料有效期、叢集化管理、容災容錯方面也是各有所長,一定要找一個彼此不可替代的場景還真不好找。現在做個項目工程哪個不需要存點臨時的中間資料給各個子系統讀讀寫寫的,多個備選方案總不是壞事。