天天看點

ignite mysql_apache ignite系列(三):資料處理(資料加載,資料并置,資料查詢)

​使用ignite的一個常見思路就是将現有的關系型資料庫中的資料導入到ignite中,然後直接使用ignite中的資料,相當于将ignite作為一個緩存服務,當然ignite的功能遠不止于此,下面以将ignite內建進java服務的方式進行示範ignite的資料存儲和查詢相關的功能。由于個人習慣,示例示範沒有使用測試代碼,而是使用rest接口示範。

​在講資料加載之前,ignite中存儲的幾種模式(LOCAL, REPLICATED, PARTITIONED):

LOCAL:本地模式,資料都存儲在本地,無資料再平衡,類似常見的存儲服務;

PARTITIONED:分區模式,資料分散到叢集中的各個節點,分區模式适合存儲數量龐大的資料

ignite mysql_apache ignite系列(三):資料處理(資料加載,資料并置,資料查詢)

如圖所示是設定了Backup備份數的,預設備份數是0,如果分區模式下不設定備份數的話則會存在丢失資料的風險。

REPLICATED:複制模式,有資料再平衡過程,主節點(Primary)資料與分區模式的一緻,隻是複制模式預設備份了除主節點資料外的其餘資料。複制模式适合存儲資料量小,增長不快的資料。

ignite mysql_apache ignite系列(三):資料處理(資料加載,資料并置,資料查詢)

分區模式和複制模式各有優點和缺點,具體選擇要根據實際場景的特點去權衡:

模式

優點

缺點

分區模式(PARTITIONED)

能存儲海量資料,頻繁更新對其影響不大

查詢緩存涉及到資料移動,對查詢性能有影響

複制模式(REPLICATED)

适合存儲資料量不大的資料,資料查詢性能穩定

頻繁更新對其影響較大

1,資料加載

這裡使用mybatis查詢MYSQL裡的資料然後存入ignite,完整代碼可以參考:

為了示範,需要先在MYSQL中生成樣本資料,相關sql腳本為ignite-example\src\main\resources\import.sql,執行該SQL腳本即可完成表的建立和測試資料的初始化。

在配置檔案中定義緩存

java.lang.Long

org.cord.ignite.data.domain.Student

添加相關依賴

org.apache.ignite

ignite-core

${ignite.version}

org.apache.ignite

ignite-spring

${ignite.version}

org.apache.ignite

ignite-indexing

${ignite.version}

​一般将資料導入ignite叢集的方式是使用cache.put(...)方法,但是當有大量的資料需要導入的時候,put的效率已經無法滿足了,針對大量資料導入可以使用ignite的流處理器:

DataLoader.java

......

public void loadData(){

//查詢student集合

List result = studentDao.findAllStudents();

//分布式id生成器

IgniteAtomicSequence sequence = ignite.atomicSequence("studentPk", 0, true);

//根據緩存名擷取流處理器,并往流處理器中添加資料

try(IgniteDataStreamer streamer = ignite.dataStreamer(CacheKeyConstant.STUDENT)) {

result.stream().forEach(r -> streamer.addData(sequence.incrementAndGet(), r));

//将流裡面的剩餘資料壓進ignite

streamer.flush();

}

}

......

導入資料之後,可以在監控程式中看到資料存儲情況:

ignite mysql_apache ignite系列(三):資料處理(資料加載,資料并置,資料查詢)

​流之是以能提高加載資料的速度,是因為流本質就是一個批處理。ignite是通過一緻性哈希保證一緻性的,每往叢集中存入一條cache記錄,ignite會先根據一緻性雜湊演算法計算出這條cache映射到哪個節點,然後會将這條記錄存儲在該節點。而在流處理器中,流處理器會将映射到相同節點的資料批量存儲到對應節點,這樣會顯著提升資料加載的效率。

2,資料查詢

​最直接的查詢緩存方式是使用cache.get(...)方法,這種方式隻能應對簡單的key-value緩存,如果是設定了索引類型(indexedTypes),則緩存就會變成SQL table,這個時候就需要使用SQL方式查詢,當使用SQL方式查詢的時候,一般會有各種查詢條件,這些查詢條件對應的字段均需要預先設定索引。ignite裡面有兩種索引,一種是普通索引,一種是組合索引,要用到@QuerySqlField注解。而查詢用到的api主要是SqlFieldsQuery和SqlQuery,前者是域查詢,也就是查詢部分字段結果集,而後者是普通查詢。

​是以,如果想使用SQL查詢,就需要在加載資料之前在緩存定義中設定索引類型(indexedTypes),并對查詢中可能用到的字段在對應實體類中相關屬性添加注解,有必要的情況下還要設定索引。當定義緩存的時候設定了索引類型,則緩存不再是普通的KV形式的緩存了,而是具有資料庫表的特性,這時候ignite就變成了分布式的記憶體資料庫了,其sql相關功能是基于h2的sql引擎實作的。

1) 設定緩存索引類型

JAVA代碼定義緩存時設定索引類型

這裡以long為主鍵,String為實體類作為示例:

使用CacheConfiguration.setIndexedTypes(Long.class, String.class)即可設定索引

XML配置中設定索引類型

同樣也是設定indexedTypes屬性即可

......

java.lang.Long

org.cord.ignite.data.domain.Student

......

2) 注解@QuerySqlField的三種用法

啟用實體類屬性為查詢域

@QuerySqlField

private String test;

加上該注解後,test字段才可以在sql語句中通路,這種形式不對該屬性列建立索引。

啟用查詢域并為該列設定普通索引

@QuerySqlField(index = true)

private String test;

啟用查詢域并設定組合索引

@QuerySqlField(orderedGroups = {@QuerySqlField.Group(

name = "student", order = 0)})

private String name;

@QuerySqlField(orderedGroups = {@QuerySqlField.Group(

name = "student", order = 1)})

private String email;

其中name屬性指定了組合索引的名字,order表示該字段在組合索引中的順序。

該組合索引與普通資料庫類似,同樣遵循最左原則,即組合索引是否會用到會受到最左原則的限制。

3) 使用 SqlFieldsQuery進行域查詢

SQL文法中有兩個預定義字段_key和_val:

_key : 表示緩存中的所有鍵

_val: 表示緩存中的所有值對象

List> res = cache.query(new SqlFieldsQuery("select _VAL,name from \"Test\".student")).getAll();

System.out.format("The name is %s.\n", res.get(0).get(0));

4) 使用SqlQuery進行普通查詢

NormalController.class

@RequestMapping("/sqlQuery")

public @ResponseBody

String sqlQuery(HttpServletRequest request, HttpServletResponse response) {

IgniteCache tempCache = ignite.cache(CacheKeyConstant.STUDENT);

String sql_query = "name = ? and email = ?";

SqlQuery cSqlQuery = new SqlQuery<>(Student.class, sql_query);

cSqlQuery.setReplicatedOnly(true).setArgs("student_44", "student_44gmail.com");

List> tempResult = tempCache.query(cSqlQuery).getAll();

if (CollectionUtils.isEmpty(tempResult)) {

return "result is Empty!";

}

Student student = tempResult.stream().map(t -> t.getValue()).findFirst().get();

System.out.format("the beginning of student[student_44] is %s\n", student.getDob());

String sql_count = "select count(1) from student";

SqlFieldsQuery countQuery = new SqlFieldsQuery(sql_count);

countQuery.setReplicatedOnly(true);

List> countList = tempCache.query(countQuery).getAll();

long count = 0;

if(!CollectionUtils.isEmpty(countList)) {

count = (Long)countList.get(0).get(0);

}

System.out.format("count of cache[student] is %s\n", count);

String sql_sum = "select sum(studId) from student";

SqlFieldsQuery sumQuery = new SqlFieldsQuery(sql_sum);

sumQuery.setReplicatedOnly(true);

List> sumList = tempCache.query(sumQuery).getAll();

long sum = 0;

if(!CollectionUtils.isEmpty(sumList)) {

sum = (Long)sumList.get(0).get(0);

}

System.out.format("sum of cache[student.id] is %s\n", sum);

return "all executed!";

}

運作結果如下:

the beginning of student[student_44] is Thu Sep 28 00:00:00 GMT+08:00 2017

count of cache[student] is 500

sum of cache[student.id] is 125250

3,資料并置與關聯查詢

​資料的并置主要是針對采用分區模式存儲的資料,所謂的資料并置,就是提供一種限制,将相關連的資料存儲在相同的網格節點上,這樣在資料查詢或者分布式計算的時候就不需要資料移動了,這樣會提升整體的性能。

​以下以X,Y,Z三個cache的并置作為示例,完整代碼請檢視示例工程ignite-example。

ignite mysql_apache ignite系列(三):資料處理(資料加載,資料并置,資料查詢)

其中X,Y,Z為三個分區模式的cache,Y與X并置,也就是說,Y的資料在存儲的時候,會根據其XId屬性,将資料存儲在對應的X所在的節點上,同理,Z與Y并置,也就是将Z的資料存儲在其YId屬性對應的Y所在的節點上。以此形成一種限制,使得資料的配置設定可以人為控制。

要使用資料并置,就不得不提到一個API了,也就是AffinityKey,當一個cache與另外一個cache并置的時候,其cache的Key就得是AffinityKey類型了。

首先進行資料初始化:

CollocatedController.java

private String init(){

if(init.get()){

return "already execute init.";

}

//定義三個緩存

CacheConfiguration xcf = new CacheConfiguration("X")

.setCacheMode(CacheMode.PARTITIONED)

.setIndexedTypes(Long.class, X.class);

CacheConfiguration, Y> ycf = new CacheConfiguration, Y>("Y")

.setCacheMode(CacheMode.PARTITIONED)

.setIndexedTypes(Affinity.class, Y.class);

CacheConfiguration, Z> zcf = new CacheConfiguration, Z>("Z")

.setCacheMode(CacheMode.PARTITIONED)

.setIndexedTypes(Affinity.class, Z.class);

ignite.destroyCache("X");

ignite.destroyCache("Y");

ignite.destroyCache("Z");

ignite.getOrCreateCache(xcf);

ignite.getOrCreateCache(ycf);

ignite.getOrCreateCache(zcf);

IgniteCache xc = ignite.cache("X");

IgniteCache, Y> yc = ignite.cache("Y");

IgniteCache, Z> zc = ignite.cache("Z");

//加載資料

Y y;

Z z;

for (long i = 0; i < 100; i++) {

xc.put(i, new X(i, String.valueOf(i)));

y = new Y(i, String.valueOf(i), i);

yc.put(y.key(), y);

z = new Z(i, String.valueOf(i), i);

zc.put(z.key(), z);

}

init.set(true);

return "all executed.";

}

而cache并置之後怎麼校驗并置是否成功呢?這就要用到Affinity.class的mapKeyToNode()方法了,其作用是根據給定的key,找到存儲該key的節點資訊,具體使用方法如下:

@RequestMapping("/verify")

public @ResponseBody

String verifyCollocate(HttpServletRequest request, HttpServletResponse response) throws Exception {

if(!init.get()){

init();

}

Affinity affinityX = ignite.affinity("X");

Affinity affinityY = ignite.affinity("Y");

Affinity affinityZ = ignite.affinity("Z");

for (long i = 0; i < 100; i++) {

ClusterNode nodeX = affinityX.mapKeyToNode(i);

ClusterNode nodeY = affinityY.mapKeyToNode(i);

ClusterNode nodeZ = affinityZ.mapKeyToNode(i);

if(nodeX.id() != nodeY.id() || nodeY.id() != nodeZ.id() || nodeX.id() != nodeZ.id()){

throw new Exception("cache collocated is error!");

}

}

System.out.println("cache collocated is right!");

return "all executed.";

}

執行/verify之後,無異常抛出,在監控程式中檢視一下存儲情況:

ignite mysql_apache ignite系列(三):資料處理(資料加載,資料并置,資料查詢)

會發現三個cache的資料分布完全一緻,這與驗證程式的結果(無異常抛出)保持一緻,說明cache并置成功。

當資料并置成功後,就可以使用關聯查詢了,可以類比資料庫中的多表聯查:

@RequestMapping("/query")

public @ResponseBody

String query(HttpServletRequest request, HttpServletResponse response){

if(!init.get()){

init();

}

IgniteCache xc = ignite.cache("X");

IgniteCache, Y> yc = ignite.cache("Y");

IgniteCache, Z> zc = ignite.cache("Z");

String sql1 = "from Y,\"X\".X " +

"where Y.XId = X.id " +

"and Y.info = ?";

String sql2 = "from Z,\"Y\".Y " +

"where Z.YId = Y.id " +

"and Z.info = ?";

String sql3 = "from Z,\"Y\".Y,\"X\".X " +

"where Z.YId = Y.id and Y.XId = X.id " +

"and Z.info = ?";

int i = IntStream.range(1, 100).skip((int)(100*Math.random())).findFirst().getAsInt();

System.out.println("query X and Y:");

System.out.println(yc.query(new SqlQuery, Y>(Y.class, sql1).setArgs(i)).getAll());

System.out.println("**************************************************************************************");

System.out.println("query Y and Z:");

System.out.println(zc.query(new SqlQuery, Z>(Z.class, sql2).setArgs(i)).getAll());

System.out.println("**************************************************************************************");

System.out.println("query X and Y and Z:");

System.out.println(zc.query(new SqlQuery, Z>(Z.class, sql3).setArgs(i)).getAll());

System.out.println("**************************************************************************************");

return "all executed.";

}

執行結果如下:

query X and Y:

[Entry [key=AffinityKey [key=83, affKey=83], [email protected]]]

**************************************************************************************

query Y and Z:

[Entry [key=AffinityKey [key=83, affKey=83], [email protected]]]

**************************************************************************************

query X and Y and Z:

[Entry [key=AffinityKey [key=83, affKey=83], [email protected]]]

**************************************************************************************

如果是沒有并置的緩存,在關聯查詢的時候就需要啟用非并置的分布式關聯:SqlQuery.setDistributedJoins(true)

資料并置還可以使用注解@AffinityKeyMapped注解,其用法與使用AffinityKey .class類似,完整示例可參看AffinityMappedController.class

至此,ignite的資料處理相關内容結束。