前提
Snowflake
(雪花)是
Twitter
開源的高性能
ID
生成算法(服務)。

上圖是
Snowflake
的
Github
倉庫,
master
分支中的
REAEMDE
檔案中提示:初始版本于
2010
年釋出,基于
Apache Thrift
,早于
Finagle
(這裡的
Finagle
是
Twitter
上用于
RPC
服務的構模組化塊)釋出,而
Twitter
内部使用的
Snowflake
是一個完全重寫的程式,在很大程度上依靠
Twitter
上的現有基礎架構來運作。
而
2010
年釋出的初版
Snowflake
源碼是使用
Scala
語言編寫的,歸檔于
scala_28
分支。換言之,大家目前使用的
Snowflake
算法原版或者改良版已經是十年前(目前是
2020
年)的産物,不得不說這個算法确實比較厲害。
scala_28
分支中有介紹該算法的動機和要求,這裡簡單摘錄一下:
動機:
-
中沒有生成順序Cassandra
的工具,ID
由使用Twitter
轉向使用MySQL
的時候需要一種新的方式來生成Cassandra
(印證了架構不是設計出來,而是基于業務場景疊代出來)。ID
要求:
- 高性能:每秒每個程序至少産生
個10K
,加上網絡延遲響應速度要在ID
内。2ms
- 順序性:具備按照時間的自增趨勢,可以直接排序。
- 緊湊性:保持生成的
的長度在ID
或更短。64 bit
- 高可用:
生成方案需要和存儲服務一樣高可用。ID
下面就
Snowflake
的源碼分析一下他的實作原理。
Snowflake方案簡述
Snowflake
在初版設計方案是:
- 時間:
長度,使用毫秒級别精度,帶有一個自定義41 bit
,那麼可以使用大概epoch
年。69
- 可配置的機器
:ID
長度,可以滿足10 bit
個機器使用。1024
- 序列号:
長度,可以在12 bit
個數字中随機取值,進而避免單個機器在4096
内生成重複的序列号。1 ms
但是在實際源碼實作中,
Snowflake
把
10 bit
的可配置的機器
ID
拆分為
5 bit
Worker ID
(這個可以了解為原來的機器
ID
)和
5 bit
Data Center ID
(資料中心
ID
),詳情見
IdWorker.scala
也就是說,支援配置最多
32
個機器
ID
和最多
32
個資料中心
ID
由于算法是
Scala
語言編寫,是依賴于
JVM
的語言,傳回的
ID
值為
Long
類型,也就是
64 bit
的整數,原來的算法生成序列中隻使用了
63 bit
的長度,要傳回的是無符号數,是以在高位補一個
(占用
1 bit
),那麼加起來整個
ID
的長度就是
64 bit
其中:
-
毫秒級别時間戳的取值範圍是:41 bit
=>[0, 2^41 - 1]
,一共0 ~ 2199023255551
個數字。2199023255552
-
機器5 bit
的取值範圍是:ID
[0, 2^5 - 1]
0 ~ 31
32
-
資料中心5 bit
ID
[0, 2^5 - 1]
0 ~ 31
32
-
序列号的取值範圍是:12 bit
[0, 2^12 - 1]
0 ~ 4095
4096
那麼理論上可以生成
2199023255552 * 32 * 32 * 4096
個完全不同的
ID
值。
Snowflake
算法還有一個明顯的特征:依賴于系統時鐘。
41 bit
長度毫秒級别的時間來源于系統時間戳,是以必須保證系統時間是向前遞進,不能發生時鐘回撥(通說來說就是不能在同一個時刻産生多個相同的時間戳或者産生了過去的時間戳)。一旦發生時鐘回撥,
Snowflake
會拒絕生成下一個
ID
。
位運算知識補充
Snowflake
算法中使用了大量的位運算。由于整數的補碼才是在計算機中的存儲形式,
Java
或者
Scala
中的整型都使用補碼表示,這裡稍微提一下原碼和補碼的知識。
- 原碼用于閱讀,補碼用于計算。
- 正數的補碼與其原碼相同。
- 負數的補碼是除最高位其他所有位取反,然後加
(反碼加1
),而負數的補碼還原為原碼也是使用這個方式。1
-
的原碼是+0
,而0000 0000
-0
,補碼隻有一個 值,用1000 0000
表示,這一點很重要,補碼的 沒有二義性。0000 0000
簡單來看就是這樣:
* [+ 11] 原碼 = [0000 1011] 補碼 = [0000 1011]
* [- 11] 原碼 = [1000 1011] 補碼 = [1111 0101]
* [- 11]的補碼計算過程:
原碼 1000 1011
除了最高位其他位取反 1111 0100
加1 1111 0101 (補碼)
使用原碼、反碼在計算的時候得到的不一定是準确的值,而使用補碼的時候計算結果才是正确的,記住這個結論即可,這裡不在舉例。由于
Snowflake
ID
生成方案中,除了最高位,其他四個部分都是無符号整數,是以四個部分的整數使用補碼進行位運算的效率會比較高,也隻有這樣才能滿足Snowflake高性能設計的初衷。
Snowflake
算法中使用了幾種位運算:異或(
^
)、按位與(
&
)、按位或(
|
)和帶符号左移(
<<
)。
異或
異或的運算規則是:
0^0=0
0^1=1
1^0=1
1^1=0
,也就是位不同則結果為1,位相同則結果為0。主要作用是:
- 特定位翻轉,也就是一個數和
個位都為N
的數進行異或操作,這對應的1
個位都會翻轉,例如N
,結果就是0100 & 1111
1011
- 與 項異或,則結果和原來的值一緻。
- 兩數的值互動:
a=a^b
b=b^a
,這三個操作完成之後,a=a^b
和a
的值完成交換。b
這裡推演一下最後一條:
* [+ 11] 原碼 = [0000 1011] 補碼 = [0000 1011] a
* [- 11] 原碼 = [1000 1011] 補碼 = [1111 0101] b
a=a^b 0000 1011
1111 0101
---------^
1111 1110
b=b^a 1111 0101
---------^
0000 1011 (十進制數:11) b
a=a^b 1111 1110
---------^
1111 0101 (十進制數:-11) a
按位與
按位與的運算規則是:
0&0=0
0&1=0
1&0=0
1&1=1
,隻有對應的位都為1的時候計算結果才是1,其他情況的計算結果都是0。主要作用是:
- 清零,如果想把一個數清零,那麼和所有位為 的數進行按位與即可。
- 取一個數中的指定位,例如要取
中的低X
位,隻需要和4
進行按位與即可,例如取zzzz...1111
的低1111 0110
位,則4
即可得到11110110 & 00001111
00000110
按位或
0|0=0
0|1=1
1|0=1
1|1=1
,隻要有其中一個位存在1則計算結果是1,隻有兩個位同時為0的情況下計算結果才是0。主要作用是:
- 對一個數的部分位指派為
,隻需要和對應位全為 的數做按位或操作就行,例如1
如果低1011 0000
位想全部指派為4
,那麼1
10110000 | 00001111
1011 1111
帶符号左移
帶符号左移的運算符是
<<
,一般格式是:
M << n
。作用如下:
-
的二進制數(補碼)向左移動M
位。n
- 左邊(高位)移出部分直接舍棄,右邊(低位)移入部分全部補
- 移位結果:相當于
的值乘以M
2
次方,并且0、正、負數通用。n
- 移動的位數超過了該類型的最大位數,那麼編譯器會對移動的位數取模,例如
移位int
位,實際上隻移動了33
33 % 2 = 1
推演過程如下(假設
n = 2
):
* [+ 11] 原碼 = [0000 1011] 補碼 = [0000 1011]
* [- 11] 原碼 = [1000 1011] 補碼 = [1111 0101]
* [+ 11 << 2]的計算過程
補碼 0000 1011
左移2位 0000 1011
舍高補低 0010 1100
十進制數 2^2 + 2^3 + 2^5 = 44
* [- 11 << 2]的計算過程
補碼 1111 0101
左移2位 1111 0101
舍高補低 1101 0100
原碼 1010 1100 (補碼除最高位其他所有位取反再加1)
十進制數 - (2^2 + 2^3 + 2^5) = -44
可以寫個
main
方法驗證一下:
public static void main(String[] args) {
System.out.println(-11 << 2); // -44
System.out.println(11 << 2); // 44
}
組合技巧
利用上面提到的三個位運算符,互相組合可以實作一些高效的計算方案。
計算n個bit能表示的最大數值:
Snowflake
算法中有這樣的代碼:
// 機器ID的位長度
private val workerIdBits = 5L;
// 最大機器ID -> 31
private val maxWorkerId = -1L ^ (-1L << workerIdBits);
這裡的算子是
-1L ^ (-1L << 5L)
,整理運算符的順序,再使用
64 bit
的二進制數推演計算過程如下:
* [-1] 的補碼 11111111 11111111 11111111 11111111 11111111 11111111 11111111 11111111
左移5位 11111111 11111111 11111111 11111111 11111111 11111111 11111111 11100000
[-1] 的補碼 11111111 11111111 11111111 11111111 11111111 11111111 11111111 11111111
異或 ----------------------------------------------------------------------- ^
結果的補碼 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00011111 (十進制數 2^0 + 2^1 + 2^2 + 2^3 + 2^4 = 31)
這樣就能計算出
5 bit
能表示的最大數值
n
,
n
為整數并且
0 <= n <= 31
,即
0、1、2、3...31
Worker ID
Data Center ID
部分的最大值就是使用這種組合運算得出的。
用固定位的最大值作為Mask避免溢出:
Snowflake
var sequence = 0L
......
private val sequenceBits = 12L
// 這裡得到的是sequence的最大值4095
private val sequenceMask = -1L ^ (-1L << sequenceBits)
......
sequence = (sequence + 1) & sequenceMask
最後這個算子其實就是
sequence = (sequence + 1) & 4095
,假設
sequence
目前值為
4095
,推演一下計算過程:
* [4095] 的補碼 00000000 00000000 00000000 00000000 00000000 00000000 00000111 11111111
[sequence + 1] 的補碼 00000000 00000000 00000000 00000000 00000000 00000000 00001000 00000000
按位與 ----------------------------------------------------------------------- &
計算結果 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000000 (十進制數:0)
可以編寫一個
main
public static void main(String[] args) {
int mask = 4095;
System.out.println(0 & mask); // 0
System.out.println(1 & mask); // 1
System.out.println(2 & mask); // 2
System.out.println(4095 & mask); // 4095
System.out.println(4096 & mask); // 0
System.out.println(4097 & mask); // 1
}
也就是
x = (x + 1) & (-1L ^ (-1L << N))
能保證最終得到的
x
值不會超過
N
,這是利用了按位與中的"取指定位"的特性。
Snowflake算法實作源碼分析
Snowflake
雖然用
Scala
語言編寫,文法其實和
Java
差不多,當成
Java
代碼這樣閱讀就行,下面閱讀代碼的時候會跳過一些日志記錄和度量統計的邏輯。先看
IdWorker.scala
的屬性值:
// 定義基準紀元值,這個值是中原標準時間2010-11-04 09:42:54,估計就是2010年初版送出代碼時候定義的一個時間戳
val twepoch = 1288834974657L
// 初始化序列号為0
var sequence = 0L //TODO after 2.8 make this a constructor param with a default of 0
// 機器ID的最大位長度為5
private val workerIdBits = 5L
// 資料中心ID的最大位長度為5
private val datacenterIdBits = 5L
// 最大的機器ID值,十進制數為為31
private val maxWorkerId = -1L ^ (-1L << workerIdBits)
// 最大的資料中心ID值,十進制數為為31
private val maxDatacenterId = -1L ^ (-1L << datacenterIdBits)
// 序列号的最大位長度為12
private val sequenceBits = 12L
// 機器ID需要左移的位數12
private val workerIdShift = sequenceBits
// 資料中心ID需要左移的位數 = 12 + 5
private val datacenterIdShift = sequenceBits + workerIdBits
// 時間戳需要左移的位數 = 12 + 5 + 5
private val timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits
// 序列号的掩碼,十進制數為4095
private val sequenceMask = -1L ^ (-1L << sequenceBits)
// 初始化上一個時間戳快照值為-1
private var lastTimestamp = -1L
// 下面的代碼塊為參數校驗和初始化日志列印,這裡不做分析
if (workerId > maxWorkerId || workerId < 0) {
exceptionCounter.incr(1)
throw new IllegalArgumentException("worker Id can't be greater than %d or less than 0".format(maxWorkerId))
}
if (datacenterId > maxDatacenterId || datacenterId < 0) {
exceptionCounter.incr(1)
throw new IllegalArgumentException("datacenter Id can't be greater than %d or less than 0".format(maxDatacenterId))
}
log.info("worker starting. timestamp left shift %d, datacenter id bits %d, worker id bits %d, sequence bits %d, workerid %d",
timestampLeftShift, datacenterIdBits, workerIdBits, sequenceBits, workerId)
接着看算法的核心代碼邏輯:
// 同步方法,其實就是protected synchronized long nextId(){ ...... }
protected[snowflake] def nextId(): Long = synchronized {
// 擷取系統時間戳(毫秒)
var timestamp = timeGen()
// 高并發場景,同一毫秒内生成多個ID
if (lastTimestamp == timestamp) {
// 確定sequence + 1之後不會溢出,最大值為4095,其實也就是保證1毫秒内最多生成4096個ID值
sequence = (sequence + 1) & sequenceMask
// 如果sequence溢出則變為0,說明1毫秒内并發生成的ID數量超過了4096個,這個時候同1毫秒的第4097個生成的ID必須等待下一毫秒
if (sequence == 0) {
// 死循環等待下一個毫秒值,直到比lastTimestamp大
timestamp = tilNextMillis(lastTimestamp)
}
} else {
// 低并發場景,不同毫秒中生成ID
// 不同毫秒的情況下,由于外層方法保證了timestamp大于或者小于lastTimestamp,而小于的情況是發生了時鐘回撥,下面會抛出異常,是以不用考慮
// 也就是隻需要考慮一種情況:timestamp > lastTimestamp,也就是目前生成的ID所在的毫秒數比上一個ID大
// 是以如果時間戳部分增大,可以确定整數值一定變大,是以序列号其實可以不用計算,這裡直接指派為0
sequence = 0
}
// 擷取到的時間戳比上一個儲存的時間戳小,說明時鐘回撥,這種情況下直接抛出異常,拒絕生成ID
// 個人認為,這個方法應該可以提前到var timestamp = timeGen()這段代碼之後
if (timestamp < lastTimestamp) {
exceptionCounter.incr(1)
log.error("clock is moving backwards. Rejecting requests until %d.", lastTimestamp);
throw new InvalidSystemClock("Clock moved backwards. Refusing to generate id for %d milliseconds".format(lastTimestamp - timestamp));
}
// lastTimestamp儲存目前時間戳,作為方法下次被調用的上一個時間戳的快照
lastTimestamp = timestamp
// 度量統計,生成的ID計數器加1
genCounter.incr()
// X = (系統時間戳 - 自定義的紀元值) 然後左移22位
// Y = (資料中心ID左移17位)
// Z = (機器ID左移12位)
// 最後ID = X | Y | Z | 計算出來的序列号sequence
((timestamp - twepoch) << timestampLeftShift) |
(datacenterId << datacenterIdShift) |
(workerId << workerIdShift) |
sequence
}
// 輔助方法:擷取系統目前的時間戳(毫秒)
protected def timeGen(): Long = System.currentTimeMillis()
// 輔助方法:擷取系統目前的時間戳(毫秒),用死循環保證比傳入的lastTimestamp大,也就是擷取下一個比lastTimestamp大的毫秒數
protected def tilNextMillis(lastTimestamp: Long): Long = {
var timestamp = timeGen()
while (timestamp <= lastTimestamp) {
timestamp = timeGen()
}
timestamp
}
最後一段邏輯的位操作比較多,但是如果熟練使用位運算操作符,其實邏輯并不複雜,這裡可以畫個圖推演一下:
四個部分的整數完成左移之後,由于空缺的低位都會補充了
,基于按位或的特性,所有低位隻要存在
1
,那麼對應的位就會填充為
1
,由于四個部分的位不會越界配置設定,是以這裡的本質就是:四個部分左移完畢後最終的數字進行加法計算。
Snowflake算法改良
Snowflake
算法有幾個比較大的問題:
- 低并發場景會産生連續偶數,原因是低并發場景系統時鐘總是走到下一個毫秒值,導緻序列号重置為
- 依賴系統時鐘,時鐘回撥會拒絕生成新的
(直接抛出異常)。ID
-
Woker ID
的管理比較麻煩,特别是同一個服務的不同叢集節點需要保證每個節點的Data Center ID
Woker ID
組合唯一。Data Center ID
這三個問題美團開源的
Leaf
提供了解決思路,下圖截取自
com.sankuai.inf.leaf.snowflake.SnowflakeIDGenImpl
對應的解決思路是(不進行深入的源碼分析,有興趣可以閱讀以下
Leaf
的源碼):
- 序列号生成添加随機源,會稍微減少同一個毫秒内能産生的最大
數量。ID
- 時鐘回撥則進行一定期限的等待。
- 使用
緩存和管理Zookeeper
Woker ID
Data Center ID
Woker ID
Data Center ID
的配置是極其重要的,對于同一個服務(例如支付服務)叢集的多個節點,必須配置不同的機器
ID
和資料中心
ID
或者同樣的資料中心
ID
和不同的機器
ID
(簡單說就是確定
Woker ID
Data Center ID
的組合全局唯一),否則在高并發的場景下,在系統時鐘一緻的情況下,很容易在多個節點産生相同的
ID
值,是以一般的部署架構如下:
管理這兩個
ID
的方式有很多種,或者像
Leaf
這樣的開源架構引入分布式緩存進行管理,再如筆者所在的創業小團隊生産服務比較少,直接把
Woker ID
Data Center ID
寫死在服務啟動腳本中,然後把所有服務使用的
Woker ID
Data Center ID
統一登記在團隊内部知識庫中。
自實作簡化版Snowflake
如果完全不考慮性能的話,也不考慮時鐘回撥、序列号生成等等問題,其實可以把
Snowflake
的位運算和異常處理部分全部去掉,使用
Long.toBinaryString()
方法結合字元串按照
Snowflake
算法思路拼接出
64 bit
的二進制數,再通過
Long.parseLong()
方法轉化為
Long
類型。編寫一個
main
方法如下:
public class Main {
private static final String HIGH = "0";
/**
* 2020-08-01 00:00:00
*/
private static final long EPOCH = 1596211200000L;
public static void main(String[] args) {
long workerId = 1L;
long dataCenterId = 1L;
long seq = 4095;
String timestampString = leftPadding(Long.toBinaryString(System.currentTimeMillis() - EPOCH), 41);
String workerIdString = leftPadding(Long.toBinaryString(workerId), 5);
String dataCenterIdString = leftPadding(Long.toBinaryString(dataCenterId), 5);
String seqString = leftPadding(Long.toBinaryString(seq), 12);
String value = HIGH + timestampString + workerIdString + dataCenterIdString + seqString;
long num = Long.parseLong(value, 2);
System.out.println(num); // 某個時刻輸出為3125927076831231
}
private static String leftPadding(String value, int maxLength) {
int diff = maxLength - value.length();
StringBuilder builder = new StringBuilder();
for (int i = 0; i < diff; i++) {
builder.append("0");
}
builder.append(value);
return builder.toString();
}
}
然後把代碼規範一下,編寫出一個簡版
Snowflake
算法實作的工程化代碼:
// 主鍵生成器接口
public interface PrimaryKeyGenerator {
long generate();
}
// 簡易Snowflake實作
public class SimpleSnowflake implements PrimaryKeyGenerator {
private static final String HIGH = "0";
private static final long MAX_WORKER_ID = 31;
private static final long MIN_WORKER_ID = 0;
private static final long MAX_DC_ID = 31;
private static final long MIN_DC_ID = 0;
private static final long MAX_SEQUENCE = 4095;
/**
* 機器ID
*/
private final long workerId;
/**
* 資料中心ID
*/
private final long dataCenterId;
/**
* 基準紀元值
*/
private final long epoch;
private long sequence = 0L;
private long lastTimestamp = -1L;
public SimpleSnowflake(long workerId, long dataCenterId, long epoch) {
this.workerId = workerId;
this.dataCenterId = dataCenterId;
this.epoch = epoch;
checkArgs();
}
private void checkArgs() {
if (!(MIN_WORKER_ID <= workerId && workerId <= MAX_WORKER_ID)) {
throw new IllegalArgumentException("Worker id must be in [0,31]");
}
if (!(MIN_DC_ID <= dataCenterId && dataCenterId <= MAX_DC_ID)) {
throw new IllegalArgumentException("Data center id must be in [0,31]");
}
}
@Override
public synchronized long generate() {
long timestamp = System.currentTimeMillis();
// 時鐘回撥
if (timestamp < lastTimestamp) {
throw new IllegalStateException("Clock moved backwards");
}
// 同一毫秒内并發
if (lastTimestamp == timestamp) {
sequence = sequence + 1;
if (sequence == MAX_SEQUENCE) {
timestamp = untilNextMillis(lastTimestamp);
sequence = 0L;
}
} else {
// 下一毫秒重置sequence為0
sequence = 0L;
}
lastTimestamp = timestamp;
// 41位時間戳字元串,不夠位數左邊補"0"
String timestampString = leftPadding(Long.toBinaryString(timestamp - epoch), 41);
// 5位機器ID字元串,不夠位數左邊補"0"
String workerIdString = leftPadding(Long.toBinaryString(workerId), 5);
// 5位資料中心ID字元串,不夠位數左邊補"0"
String dataCenterIdString = leftPadding(Long.toBinaryString(dataCenterId), 5);
// 12位序列号字元串,不夠位數左邊補"0"
String seqString = leftPadding(Long.toBinaryString(sequence), 12);
String value = HIGH + timestampString + workerIdString + dataCenterIdString + seqString;
return Long.parseLong(value, 2);
}
private long untilNextMillis(long lastTimestamp) {
long timestamp;
do {
timestamp = System.currentTimeMillis();
} while (timestamp <= lastTimestamp);
return timestamp;
}
private static String leftPadding(String value, int maxLength) {
int diff = maxLength - value.length();
StringBuilder builder = new StringBuilder();
for (int i = 0; i < diff; i++) {
builder.append("0");
}
builder.append(value);
return builder.toString();
}
public static void main(String[] args) {
long epoch = LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0)
.toInstant(ZoneOffset.of("+8")).toEpochMilli();
PrimaryKeyGenerator generator = new SimpleSnowflake(1L, 1L, epoch);
for (int i = 0; i < 5; i++) {
System.out.println(String.format("第%s個生成的ID: %d", i + 1, generator.generate()));
}
}
}
// 某個時刻輸出如下
第1個生成的ID: 6698247966366502912
第2個生成的ID: 6698248027448152064
第3個生成的ID: 6698248032162549760
第4個生成的ID: 6698248033076908032
第5個生成的ID: 6698248033827688448
通過字元串拼接的寫法雖然運作效率低,但是可讀性會比較高,工程化處理後的代碼可以在執行個體化時候直接指定
Worker ID
Data Center ID
等值,并且這個簡易的
Snowflake
實作沒有第三方庫依賴,拷貝下來可以直接運作。上面的方法使用字元串拼接看起來比較低端,其實最後那部分的按位或,可以完全轉化為加法:
public class Main {
/**
* 2020-08-01 00:00:00
*/
private static final long EPOCH = 1596211200000L;
public static void main(String[] args) {
long workerId = 1L;
long dataCenterId = 1L;
long seq = 4095;
long timestampDiff = System.currentTimeMillis() - EPOCH;
long num = (long) (timestampDiff * Math.pow(2, 22)) + (long) (dataCenterId * Math.pow(2, 17)) + (long) (workerId * Math.pow(2, 12)) + seq;
System.out.println(num); // 某個時刻輸出為3248473482862591
}
}
這樣看起來整個算法都變得簡單,不過這裡涉及到指數運算和加法運算,效率會比較低。
小結
Snowflake
算法是以高性能為核心目标的算法,基于這一點目的巧妙地大量使用位運算,這篇文章已經把
Snowflake
中應用到的位運算和具體源碼實作徹底分析清楚。最後,基于
Twitter
官方的
Snowflake
算法源碼,修訂出了一版
Java
實作版本,并且應用前面提到的改良方式,修複了低并發場景下隻産生偶數的問題,并且已經應用于生産環境一段時間,代碼倉庫如下(代碼沒有任何第三方庫依賴,拷貝出來就直接可用):
-
Github
https://github.com/zjcscut/framework-mesh/tree/master/java-snowflake
參考資料:
(本文完 c-3-d e-a-20200809)
推廣連結