本文根據10月24日在群裡直播的内容整理,友善習慣看文字的同學。如果更習慣看視訊,可以點選 觀看直播回放
為何選擇Cassandra
作為全球範圍内最流行的寬表資料庫,Apache Cassandra具備諸多優點:海量資料存儲;簡潔易上手的類SQL文法;總是線上;擴容靈活等。除了服務端的各種優點之外,Cassandra對各種語言用戶端(driver)的高性能支援也是其實作易用性和良好性能的重要環節。Cassandra支援幾乎所有流行語言的原生用戶端:Java/Python/C++/C#/NodeJS/PHP/Ruby/Go/Perl/Scala...(詳細清單參見
Cassandra文檔) 。下圖列出了Cassandra支援的開發語言:

同時,Cassandra用戶端良好的架構設計保證了在易于使用的同時能夠從服務端擷取最大的性能。下圖是Cassandra和一些其他引擎的性能對比(出自
https://www.datastax.com/nosql-databases/benchmarks-cassandra-vs-mongodb-vs-hbase)
接下來的内容中,我們會從分布式資料庫用戶端要解決的一般問題出發,分析和比較幾種常見的用戶端實作方案,了解Cassandra高性能用戶端背後的實作原理和架構,并以java driver為例,介紹Cassandra高性能用戶端的異步接口、連接配接池、負載均衡、重試政策等重要特性的原理和配置參數。
分布式資料庫用戶端常見方案
要為分布式資料庫設計一個用戶端,可以簡化為如下模型:
資料分布在不同的伺服器節點上面,要如何設計用戶端和伺服器的互動方式?
一般來說,這包含了如下問題:
- 用戶端如何知道資料在哪個節點?這涉及meta資料如何存儲、互動的問題。
- 如何減少不必要的網絡通信? 一般要用到一些資料本地化的設計。
- 負載均衡:包括proxy之間、多個副本之間的均衡。
- 重用戶端 vs 輕用戶端,需要做一些取舍。
- 如何實作多語言的支援?有些引擎會為每種語言實作一個Native的用戶端,也有一些引擎(例如HBase)會使用IDL的方式來定義跨語言的接口。
- 其他設計上的考慮,如連接配接池/線程排程/異步…
Mongodb方案
Mongodb Sharding叢集的架構參見下圖
這是一個比較典型的proxy方式實作分布式資料庫用戶端的方案。在這種方案裡面,meta資料單獨存儲在config server裡面,前端通過一個proxy(mongos)來将用戶端的請求路由到實際的資料節點(Shard)。
HBase方案
下圖同時畫出了HBase通過Java和其他語言通路的架構:
HBase的Thrift Server也是類似proxy的工作方式。
Cassandra方案
Cassandra與其他引擎不同的地方首先展現在每個服務端節點上都有meta資訊,同時用戶端也會在啟動時拉取meta。這意味着:
(1)Cassandra在把用戶端請求路由到資料節點時不需要從遠端節點查找meta。
(2)Cassandra用戶端的政策可以很靈活,既可以把任意一個節點作為proxy來發送請求,也可以像重用戶端一樣把請求發到資料所在的節點減少網絡轉發的請求,如下圖:
比較
了解了這幾種架構之後,我們可以再回到這一小節前面提到的問題,見下表:
比較内容 | MongoDB | HBase | Cassandra |
---|---|---|---|
meta存儲 | 中心化 | 分布式 | |
資料本地化 | 無 | 有 | |
proxy負載均衡 | |||
副本負載均衡 | |||
重用戶端? | 輕用戶端 | 重用戶端 | 政策靈活,介于二者之間 |
多語言支援 | Native | Thrift |
可見,Cassandra用戶端的設計在每個點上幾乎都做了最有利于性能的選擇。
Cassandra用戶端深入
了解了Cassandra用戶端和服務端互動的方式之後,我們再來深入看一下Cassandra用戶端的内部結構(以DataStax Java Driver為例):
關于這個圖的進一步描述可以參考
https://beyondthelines.net/databases/the-cassandra-java-driver,中文版本是
https://yq.aliyun.com/articles/719645,這裡隻做簡單的解讀:
(1)應用通過session對象和driver互動,session對象中管理着一系列連接配接池(pool)。
(2)用戶端會針對伺服器的每個節點建立一個連接配接池,每個連接配接池中有若幹到節點的網絡連接配接(Connection)。
(3)請求都是異步發送的,這也意味着每個連接配接可以并行發送若幹請求。
各種對象之間的關系如下圖:
Quick Start
一個簡單的Java用戶端示例代碼如下:
public class Demo {
public static void main(String[] args) {
// 此處填寫資料庫連接配接點位址(公網或者内網的),控制台有幾個就填幾個。
// 實際上SDK最終隻會連上第一個可連接配接的連接配接點并建立控制連接配接,填寫多個是為了防止單個節點挂掉導緻無法連接配接資料庫。
// 此處無需關心連接配接點的順序,因為SDK内部會先打亂連接配接點順序避免不同用戶端的控制連接配接總是連一個點。
// 千萬不要把公網和内網的位址一起填入。
String[] contactPoints = new String[]{
"$host1",
"$host2"
};
Cluster cluster = Cluster.builder()
.addContactPoints(contactPoints)
// 填寫賬戶名密碼(如果忘記可以在 帳号管理 處重置)
.withAuthProvider(new PlainTextAuthProvider("$username", "$Password"))
// 如果進行的是公網通路,需要在帳号名後面帶上 @public 以切換至完全的公網鍊路。
// 否則無法在公網連上所有内部節點,會看到異常或者卡頓,影響本地開發調試。
// 後續會支援網絡鍊路自動識别(即無需手動添加@public)具體可以關注官網Changelog。
//.withAuthProvider(new PlainTextAuthProvider("cassandra@public", "123456"))
.build();
// 初始化叢集,此時會建立控制連接配接(這步可忽略,建立Session時候會自動調用)
cluster.init();
// 連接配接叢集,會對每個Cassandra節點建立長連接配接池。
// 是以這個操作非常重,不能每個請求建立一個Session。合理的應該是每個程序預先建立若幹個。
// 通常來說一個夠用了,你也可以根據自己業務測試情況适當調整,比如把讀寫的Session分開管理等。
Session session = cluster.connect();
//查詢
ResultSet res = session.execute("SELECT release_version FROM system.local");
// ResultSet 實作了 Iterable 接口,我們直接将每行資訊列印到控制台
res.forEach(System.out::println);
// 關閉Session
session.close();
// 關閉Cluster
cluster.close();
}
}
更多例子可以參加
阿裡雲幫助文檔、
直播示範的Demo及各種語言Driver的文檔。
主要特性
Statements
這裡主要介紹如下三種statement
SimpleStatement
如下代碼建立了一個SimpleStatement并執行:
session.execute("SELECT value FROM application_params WHERE name = 'greeting_message'");
這個語句執行過程中與服務端的互動過程如下圖:
PreparedStatement
PreparedStatement prepared = session.prepare(
"insert into product (sku, description) values (?, ?)");
BoundStatement bound = prepared.bind("234827", "Mouse");
session.execute(bound);
session.execute(prepared.bind("987274", "Keyboard"));
PreparedStatement的執行過程分為兩個步驟,prepare階段伺服器會解析CQL語句并緩存:
之後每次調用execute,伺服器并不需要重新解析語句,而是從緩存中取出解析的結果來執行,是以減少了解析的時間:
BatchStatement
PreparedStatement preparedInsertExpense =
session.prepare(
"INSERT INTO cyclist_expenses (cyclist_name, expense_id, amount, description, paid) "
+ "VALUES (:name, :id, :amount, :description, :paid)");
SimpleStatement simpleInsertBalance =
new SimpleStatement("INSERT INTO cyclist_expenses (cyclist_name, balance) VALUES (?, 0) IF NOT EXISTS",
"Vera ADRIAN");
BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
.add(simpleInsertBalance)
.add(preparedInsertExpense.bind("Vera ADRIAN", 1, 7.95f, "Breakfast", false));
session.execute(batch);
關于BatchStatement的原理和注意事項參見
簡析Cassandra的BATCH操作一文。
總結
- SimpleStatement适用于隻執行一次(或幾次)的語句
- PreparedStatement适用于經常執行的語句,可以節省parse的時間
- BatchStatement适用于有原子性要求的批量語句;或者對同一個partition key的批量操作。
異步API
異步API的示例如下:
import com.google.common.util.concurrent.*;
ListenableFuture<Session> session = cluster.connectAsync();
// Use transform with an AsyncFunction to chain an async operation after another:
ListenableFuture<ResultSet> resultSet = Futures.transform(session,
new AsyncFunction<Session, ResultSet>() {
public ListenableFuture<ResultSet> apply(Session session) throws Exception {
return session.executeAsync("select release_version from system.local");
}
});
// Use transform with a simple Function to apply a synchronous computation on the result:
ListenableFuture<String> version = Futures.transform(resultSet,
new Function<ResultSet, String>() {
public String apply(ResultSet rs) {
return rs.one().getString("release_version");
}
});
// Use a callback to perform an action once the future is complete:
Futures.addCallback(version, new FutureCallback<String>() {
public void onSuccess(String version) {
System.out.printf("Cassandra version: %s%n", version);
}
public void onFailure(Throwable t) {
System.out.printf("Failed to retrieve the version: %s%n",
t.getMessage());
}
});
由于異步API不會阻塞應用執行,是以可以提高效率。異步API+TokenAwarePolicy配合使用可以實作很好的寫入性能。
連接配接池
如下代碼示例了如何配置連接配接池相關參數:
PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions
.setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
.setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
.setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
.setMaxRequestsPerConnection(HostDistance.REMOTE, 2000);
Cluster cluster = Cluster.builder()
.withContactPoints("127.0.0.1")
.withPoolingOptions(poolingOptions)
.build();
和連接配接池有關的配置内容主要有以下這幾個方面:
pool大小
pool大小的配置決定了和服務端每個host建立多少個連接配接。預設值為LOCAL hosts: core = max = 1,REMOTE hosts: core = max = 1。如果core != max,cassandra會根據負載情況動态調整連接配接數,具體的政策如下:
- 若連接配接數n < max 且 并發請求數 > (n - 1) * maxRequestsPerConnection + NewConnectionThreshold,則建立連接配接
-
每10s執行一次清理。若n > core 且 需要的連接配接數 < n,則清理連接配接。需要的連接配接數由 實際請求數,maxRequestsPerConnection,NewConnectionThreshold三者共同決定。
可根據應用的實際負載情況适當調大CoreConnections/MaxConnections的值。NewConnectionThreshold一般情況下不需要配置。
maxRequestsPerConnection
這個配置項的含義是in flight(已發到伺服器但還沒收到響應)的最大請求數。超出這個配置的請求會排隊或報錯(取決于排隊配置)。預設值為LOCAL hosts:1024,REMOTE hosts:256,還是比較小的,可以根據應用實際情況适當調大。
排隊配置
主要包括MaxQueueSize和PoolTimeoutMillis。如果所有連接配接都在忙(超過maxRequestsPerConnection個請求),則新的請求會排隊一段時間以等待連接配接可用,隊列最大長度為MaxQueueSize,等待的最長時間為PoolTimeoutMillis。
另外,在生産環境上面如果需要監測連接配接池的使用情況,Driver裡面也提供了Session.getState這個方法。
負載均衡政策
如下代碼示例了如何配置負載均衡政策:
Cluster cluster = Cluster.builder()
.addContactPoint("127.0.0.1")
.withLoadBalancingPolicy(new RoundRobinPolicy())
.build();
負載均衡政策決定新的請求發往哪個coordinator,以及failover時選擇哪個coordinator。預設的政策是
new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build());
,表示會把請求盡量發到LOCAL節點,并切會根據token分布把請求發到資料對應的節點上面去。
除預設政策外,其他政策還包括:
- RoundRobinPolicy
- DCAwareRoundRobinPolicy
- TokenAwarePolicy
- LatencyAwarePolicy
- WhiteListPolicy
- HostFilterPolicy
重試政策
如下代碼示例了如何配置重試政策:
Cluster cluster = Cluster.builder()
.addContactPoint("127.0.0.1")
.withRetryPolicy(new MyCustomPolicy())
.build();
如下這些重試行為是hard-code的,應用無法配置(實際上也沒有理由要去配置……):
- 網絡實際寫入發生前,有任何錯誤都會在另一個host上重試
- prepared statement未編譯,在同一個host上編譯并重試
-
若節點處于bootstrapping狀态,在另一個節點重試請求
預設政策:
- OnReadTimeout:如果收到足夠多副本數的響應但沒有讀到資料,則在同一個host重試一次;其他情況直接抛異常。
- onWriteTimeout:隻在寫batchlog失敗時重試一次;其他情況抛異常。
- onUnavailable:在其他host重試一次。
-
onRequestError:WriteFailure/ReadFailure不重試,用戶端逾時等請求錯誤會在其他host重試一次。
如果預設政策不滿足需求,也可以通過實作RetryPolicy接口來自定義重試政策。
可以看到,Cassandra不僅支援多種開發語言通路,而且用戶端的設計兼顧了易用性和高性能,靈活而且高效,還提供了很多有用的特性可以根據應用的實際場景從服務端擷取最大的性能。
入群邀約
為了營造一個開放的 Cassandra 技術交流,我們建立了微信群公衆号和釘釘群,為廣大使用者提供專業的技術分享及問答,定期開展專家技術直播,歡迎大家加入。另外阿裡雲提供免費Cassandra試用:
https://www.aliyun.com/product/cds釘釘群入群連結:
https://c.tb.cn/F3.ZRTY0o微信群公衆号: