想要掌握 Seata 的配置,必須了解 Seata 的啟動過程,了解啟動時的各項配置,才能在配置時知道該幹什麼。
用到的配置
這裡先列出 Server 啟動過程中實際用到的配置配置,下文會具體分析。
屬性 | 讀取值 |
config.type | file |
config.file.name | file.conf |
metrics.enabled | FALSE |
recovery.asyn-committing-retry-period | 1000 |
recovery.committing-retry-period | 1000 |
recovery.rollbacking-retry-period | 1000 |
recovery.timeout-retry-period | 1000 |
registry.type | file |
service.max.commit.retry.timeout | -1 |
service.max.rollback.retry.timeout | -1 |
store.file.dir | sessionStore |
store.file.file-write-buffer-cache-size | 16384 |
store.file.flush-disk-mode | async |
store.file.session.reload.read_size | 100 |
store.mode | file |
transaction.undo.log.delete.period | 86400000 |
transport.heartbeat | TRUE |
transport.server | NIO |
transport.thread-factory.boss-thread-prefix | NettyBoss |
transport.thread-factory.boss-thread-size | 1 |
transport.thread-factory.share-boss-worker | FALSE |
transport.thread-factory.worker-thread-prefix | NettyServerNIOWorker |
transport.thread-factory.worker-thread-size | 8 |
transport.type | TCP |
Server 入口
io.seata.server.Server
類是整個服務的入口,從這裡的
main
方式入手。
public static void main(String[] args) throws IOException {
//initialize the metrics
MetricsManager.get().init();//1
//initialize the parameter parser
ParameterParser parameterParser = new ParameterParser(args);
System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
RpcServer rpcServer = new RpcServer(WORKING_THREADS);
//server port
rpcServer.setListenPort(parameterParser.getPort());
UUIDGenerator.init(parameterParser.getServerNode());
//log store mode : file、db
SessionHolder.init(parameterParser.getStoreMode());
DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer);
coordinator.init();
rpcServer.setHandler(coordinator);
// register ShutdownHook
ShutdownHook.getInstance().addDisposable(coordinator);
//127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(rpcServer.getListenPort());
rpcServer.init();
System.exit(0);
}
下面逐層對上面代碼進行分析。
1. MetricsManager.get().init()
MetricsManager.get().init()
MetricsManager
是一個單例實作,在
init
方法中,首先調用了
ConfigurationFactory.getInstance()
方法,該方法是最重要的一個配置入口,繼續深入進去來看。
1.1 ConfigurationFactory.getInstance()
ConfigurationFactory.getInstance()
ConfigurationFactory
傳回的
io.seata.config.Configuration<T>
的單例,該單例通過
buildConfiguration()
建立。
在
buildConfiguration()
中,又首先使用了
CURRENT_FILE_INSTANCE.getConfig
方法。
這裡的
CURRENT_FILE_INSTANCE
建立方式如下:
private static final String REGISTRY_CONF_PREFIX = "registry";
private static final String REGISTRY_CONF_SUFFIX = ".conf";
private static final String ENV_SYSTEM_KEY = "SEATA_ENV";
private static final String ENV_PROPERTY_KEY = "seataEnv";
private static final String SYSTEM_PROPERTY_SEATA_CONFIG_NAME = "seata.config.name";
private static final String ENV_SEATA_CONFIG_NAME = "SEATA_CONFIG_NAME";
public static final Configuration CURRENT_FILE_INSTANCE;
static {
//0.8.1 增加這個配置後,配置檔案不用局限在 conf 目錄(類路徑cp)下面了,通過 file: 字首可以配置為任意的路徑
String seataConfigName = System.getProperty(SYSTEM_PROPERTY_SEATA_CONFIG_NAME);
if (null == seataConfigName) {
seataConfigName = System.getenv(ENV_SEATA_CONFIG_NAME);
}
if (null == seataConfigName) {
seataConfigName = REGISTRY_CONF_PREFIX;
}
String envValue = System.getProperty(ENV_PROPERTY_KEY);
if (null == envValue) {
envValue = System.getenv(ENV_SYSTEM_KEY);
}
CURRENT_FILE_INSTANCE = (null == envValue) ? new FileConfiguration(seataConfigName + REGISTRY_CONF_SUFFIX)
: new FileConfiguration(seataConfigName + "-" + envValue + REGISTRY_CONF_SUFFIX);
}
2019-10-12 更新上述代碼為 0.8.1 版本,0.8.1 之後配置的路徑沒有了限制,可以更友善的在容器或者K8S中動态配置。
預設的 registry.conf 配置檔案如下:
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "file"
# 為了簡短,隻保留預設的 file 相關内容
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
file {
name = "file.conf"
}
}
1.2 擷取 seata 配置
通過上述方式得到了
CURRENT_FILE_INSTANCE
,現在回到
buildConfiguration()
方法:
private static Configuration buildConfiguration() {
ConfigType configType = null;
String configTypeName = null;
try {
// 擷取 config.type 名稱,參考上面預設配置,值為 file
configTypeName = CURRENT_FILE_INSTANCE.getConfig(
ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
+ ConfigurationKeys.FILE_ROOT_TYPE);
// 對應枚舉值為 File
configType = ConfigType.getType(configTypeName);
} catch (Exception e) {
throw new NotSupportYetException("not support register type: " + configTypeName, e);
}
// 預設配置這裡是 File
if (ConfigType.File == configType) {
// 擷取 config.file.name 值
String pathDataId = ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
+ FILE_TYPE + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
+ NAME_KEY;
// 上述預設值為 file.conf
String name = CURRENT_FILE_INSTANCE.getConfig(pathDataId);
// 讀取 file.conf 配置
return new FileConfiguration(name);
} else {
return EnhancedServiceLoader.load(ConfigurationProvider.class, Objects.requireNonNull(configType).name())
.provide();
}
}
從上面邏輯看,這裡就是從支援的多種 registry 中擷取具體的配置,預設是 file。
1.3 處理 metrics 配置
回到上一層方法:
// ConfigurationFactory.getInstance() 中是 file.conf 配置
boolean enabled = ConfigurationFactory.getInstance().getBoolean(
// 擷取該配置中的 metrics.enabled,預設值為 false
ConfigurationKeys.METRICS_PREFIX + ConfigurationKeys.METRICS_ENABLED, false);
file.conf
中
metrics
部配置設定置如下:
## metrics settings
metrics {
# 預設不啟用
enabled = false
registry-type = "compact"
# multi exporters use comma divided
exporter-list = "prometheus"
exporter-prometheus-port = 9898
}
啟用後,會讀取
metrics. registry-type
,目前僅支援
compact
,然後通過下面方法:
EnhancedServiceLoader.load(Registry.class, Objects.requireNonNull(registryType).name());
擷取對應的實作傳回,在這裡的
load
方法中也是單例形式,隻會初始化一次。
然後在讀取
metrics.exporter-list
(逗号隔開) 擷取所有支援的
Exporter
(目前也隻支援
prometheus
)。
prometheus
對應的
PrometheusExporter
實作中會讀取上面的
exporter-prometheus-port
擷取 HTTP 服務的端口号。
2. 解析指令行參數
在進行了配置的初始化後,開始處理 main 方法的指令行參數:
// 解析參數使用了 http://www.jcommander.org/
//initialize the parameter parser
ParameterParser parameterParser = new ParameterParser(args);
// 擷取存儲方式("--storeMode", "-m"),預設為 file,可選 db
System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
RpcServer rpcServer = new RpcServer(WORKING_THREADS);
// 擷取端口号("--port", "-p"),預設為 8091
rpcServer.setListenPort(parameterParser.getPort());
// 擷取服務節點ID("--serverNode", "-n"),預設 1
UUIDGenerator.init(parameterParser.getServerNode());
// 初始化存儲
SessionHolder.init(parameterParser.getStoreMode());//3
3. 初始化 RpcServer
RpcServer
代碼如下:
/**
* Instantiates a new Abstract rpc server.
*
* @param messageExecutor the message executor
*/
public RpcServer(ThreadPoolExecutor messageExecutor) {
super(new NettyServerConfig(), messageExecutor);
}
這裡用到了
NettyServerConfig
,這個類中存在大量類似下面的方法:
/**
* Get boss thread prefix string.
*
* @return the string
*/
public String getBossThreadPrefix() {
return CONFIG.getConfig("transport.thread-factory.boss-thread-prefix", DEFAULT_BOSS_THREAD_PREFIX);
}
這裡用到了下面部分的配置:
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
#thread factory for netty
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
boss-thread-size = 1
#auto default pin or 8
worker-thread-size = 8
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
這裡是對 netty 的各種詳細配置。
繼續往下深入看
UUIDGenerator.init
4. UUIDGenerator.init
UUIDGenerator.init
這裡根據節點ID定義了伺服器 UUID 值的範圍,避免了節點間的沖突。
有沒有人覺得這段代碼存在疑問?
public static long generateUUID() {
//假設在超過最大值,進入下面同步前擷取了10個id
//在上面第一個 id 經過 UUID.set(id); 後又擷取了 10 個新id
//這10個新id 會不會和上面的10個- UUID_INTERNAL 後出現部分重複?
long id = UUID.incrementAndGet();
if (id >= UUID_INTERNAL * (serverNodeId + 1)) {
synchronized (UUID) {
if (UUID.get() >= id) {
id -= UUID_INTERNAL;
UUID.set(id);
}
}
}
return id;
}
5. SessionHolder.init
SessionHolder.init
首先參數中的
storeMode
可選,如果沒有設定會按下面方式擷取:
if (StringUtils.isBlank(mode)) {
//use default
mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
}
也就是前面
file.conf
中的
store.mode
,這部分預設配置如下:
## transaction log store
store {
## store mode: file、db
mode = "file"
## file store
file {
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
max-branch-session-size = 16384
# globe session size , if exceeded throws exceptions
max-global-session-size = 512
# file buffer size , if exceeded allocate new buffer
file-write-buffer-cache-size = 16384
# when recover batch read size
session.reload.read_size = 100
# async, sync
flush-disk-mode = async
}
## database store
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
driver-class-name = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata"
user = "mysql"
password = "mysql"
min-conn = 1
max-conn = 3
global.table = "global_table"
branch.table = "branch_table"
lock-table = "lock_table"
query-limit = 100
}
}
上面的預設值也是
file
,如果使用
db
參考上面配置修改即可。
再往下就是針對 db 和 file 的兩種處理政策。
5.1 db 方式
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.name());
這裡額外指定了最後一個參數
activateName
,這會在預設的
META-INF/services/
和
META-INF/seata/
基礎上額外去
META-INF/seata/db
目錄加載資源,并且該資源會在隊列的最後一個位置。然後判斷所有的資源是否有
@LoadLevel
并且和
activateName
比對,如果存在多個比對的值,就會使用最後一個。如果沒有比對的值,會使用所有實作的最後一個。
這裡的 db 類定義如下:
@LoadLevel(name = "db")
public class DataBaseSessionManager extends AbstractSessionManager
implements SessionManager, SessionLifecycleListener, Initialize {
ASYNC_COMMITTING_SESSION_MANAGER
等3個隻是增加了額外的參數,在初始化的時候會使用相應的構造方法進行建立。
5.2 file 方式
這裡首先讀取
store.file.dir
擷取存儲 file 的路徑,然後建立下面的實作:
@LoadLevel(name = "file")
public class FileBasedSessionManager extends DefaultSessionManager implements Reloadable {
在 db 和 file 中還涉及了
TransactionStoreManager
的執行個體化,這裡不再深入。
在
SessionHolder.init
最後還有一個針對 file 方式的
reload
。
回到 main 方法繼續。
6. coordinator.init()
coordinator.init()
// 建立協調者
DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer);
// 初始化
coordinator.init();
rpcServer.setHandler(coordinator);
// register ShutdownHook
ShutdownHook.getInstance().addDisposable(coordinator);
recovery {
#schedule committing retry period in milliseconds
committing-retry-period = 1000
#schedule asyn committing retry period in milliseconds
asyn-committing-retry-period = 1000
#schedule rollbacking retry period in milliseconds
rollbacking-retry-period = 1000
#schedule timeout retry period in milliseconds
timeout-retry-period = 1000
}
transaction {
undo.data.validation = true
undo.log.serialization = "jackson"
undo.log.save.days = 7
#schedule delete expired undo_log in milliseconds
undo.log.delete.period = 86400000
undo.log.table = "undo_log"
}
service {
#vgroup->rgroup
vgroup_mapping.my_test_tx_group = "default"
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}
7. 其他
// 服務綁定 IP("--host", "-h")
// 127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(rpcServer.getListenPort());
// 啟動服務,通過 netty hold 住
rpcServer.init();
// 退出
System.exit(0);