前段時間在了解分布式,發現firefoxbug在部落格中寫的這篇對這個問題說明得比較清晰易懂,本文主要是自己的了解和實踐。
在後端一般會遇到這樣的場景:随着應用系統的通路量或者DB/檔案存儲系統的資料量增大,系統由于負載增大而出現響應延遲甚至down掉的情況。為了解決這個問題,往往會對系統采用垂直擴充和水準擴充的架構設計,而分布式系統正是水準擴充架構的一種應用實踐。
1 分布式系統要求
分布式設計的初衷就是為了解決單一服務端負載過大的問題,是以在對系統做水準擴充後,資料要盡量均勻地分布在每台伺服器節點的上(即不會出現熱點資料節點)。其次,如果後期需要擴容或者某一節點發生故障需要從叢集中剔除,那麼處理後的分布式系統應該做到對已存儲的資料影響最小,降低資料遷移的
成本
和
風險
。
2 解決方法
由于機器的數量不可能是無限的,是以水準擴充的時候,要考慮把無限的資料通過一定的算法
平衡、有序、易擴充
地分布在這些機器上。
常見的做法是利用把要處理的資料進行編号,然後對機器的資料進行
取模運算
。例如,假設有10個資料(編号為0~9),機器數量為3(編号為0~2),那麼每個資料編号對機器數3取模後,0号機器存放了編号為0,3,6,9的資料;1号機器存了編号為1,4,7的資料;2号機器存放了編号為2,5,8的資料。
取模算法比較簡單,但是當某個伺服器節點出現故障或者新增節點後,需要對已有資料作大量的遷移。在memcached分布式原理中介紹了
Consistent Hashing
算法,它能較好地解決這個問題。
3 一緻性雜湊演算法原理
如上圖所示,memcached分布式提供的雜湊演算法的主要處理流程如下:
1、使用算法求出每個memcached伺服器節點(ip位址)的哈希值x,并将其配置設定到0~2^32的圓上(值域);
2、用同樣的方法求出存儲資料鍵的哈希值y,并映射到圓上。
3、按順時針方向查找第1個比y大的x,那麼y就分布在x前面那個節點上。
4 示例程式
在firefoxbug的原文中提供了python2的示例程式,這裡改成了python3。注意,程式中對這4台機器都使用了虛拟節點(
replicas
),它可以增加資料分布的均勻性。
# -*- coding: UTF-8 -*-
'''
FileName: consistenthashdistributed1.sh
Description: 分布式系統:一緻性hash算法的應用
Simple Usage: python consistenthashdistributed1.py [numbers of replicate]
Reference: http://www.firefoxbug.com/index.php/archives/2791/
(c) 2018.02.17 vfhky https://typecodes.com/python/consistenthashdistributed1.html
'''
import sys
import hashlib
CONTENT = """Consistent hashing is a special kind of hashing such that when a hash table is resized and consistent hashing is used, only K/n keys need to be remapped on average, where K is the number of keys, and n is the number of slots. In contrast, in most traditional hash tables, a change in the number of array slots causes nearly all keys to be remapped."""
# 所有機器清單
SERVERS = [
"192.168.1.1",
"192.168.2.2",
"192.168.3.3",
"192.168.4.4"
]
class HashRing(object):
"""Constructs.
"""
def __init__(self, nodes=None, replicas=3):
"""Manages a hash ring.
`nodes` is a list of objects that have a proper __str__ representation.
`replicas` indicates how many virtual points should be used pr. node,
replicas are required to improve the distribution.
"""
self.replicas = replicas
self.ring = dict()
self._sorted_keys = []
if nodes:
for node in nodes:
self.add_node(node)
def add_node(self, node):
"""Adds a `node` to the hash ring (including a number of replicas).
"""
for i in range(0, self.replicas):
key = self.gen_key('%s:%s' % (node, i))
self.ring[key] = node
# print("key=[%s]=[%s]." %(key, node))
self._sorted_keys.append(key)
self._sorted_keys.sort()
#print("%s" %(self._sorted_keys))
def remove_node(self, node):
"""Removes `node` from the hash ring and its replicas.
"""
for i in range(0, self.replicas):
key = self.gen_key('%s:%s' % (node, i))
del self.ring[key]
self._sorted_keys.remove(key)
def get_node(self, string_key):
"""Given a string key a corresponding node in the hash ring is returned.
If the hash ring is empty, `None` is returned.
"""
return self.get_node_pos(string_key)[0]
def get_node_pos(self, string_key):
"""Given a string key a corresponding node in the hash ring is returned
along with it's position in the ring.
If the hash ring is empty, (`None`, `None`) is returned.
"""
if not self.ring:
return None, None
key = self.gen_key(string_key)
nodes = self._sorted_keys
nodes_num = len(nodes)
for i in range(0, nodes_num):
node = nodes[i]
if key <= node:
return self.ring[node], i
# 對于key>node節點key的,全部落在第1個key對應的節點(192.168.1.4)上,這樣就形成了1個閉環。
print("[%s:%s] string_key=[%s] key=[%s] node=[%s] self.ring[nodes[0]]=[%s].\n" %(__file__, sys._getframe().f_lineno, string_key, key, node, self.ring[nodes[0]]))
return self.ring[nodes[0]], 0
def gen_key(self, key):
"""Given a string key it returns a long value,
this long value represents a place on the hash ring.
md5 is currently used because it mixes well.
"""
m = hashlib.md5()
m.update(key.encode('utf-8'))
return m.hexdigest()
def consistent_hash(replicas):
'''docstring'''
# 模拟初始化每天機器的db
database = {}
for s in SERVERS:
database[s] = []
hr = HashRing(SERVERS,replicas)
for w in CONTENT.split():
database[hr.get_node(w)].append(w)
# 列印所有的節點下面的資料
for node in database:
print("[%s]=[%s].\n" %(node, database[node]))
if __name__ == '__main__':
'''docstring'''
replicas = 3
if len(sys.argv) > 1:
replicas = long(sys.argv[1])
if( replicas < 3 or replicas > 100000 ):
print( "Rreplicas should lower than 100000." )
sys.exit()
consistent_hash(replicas)
上面程式在查找落地節點時,采用的是
周遊
整個hash圈上的值,是以虛拟節點不宜過大,否則會出現查找時間過長的問題。如下圖所示,BZ在自己的單核1G記憶體的虛拟機中測試,發現4個節點如果都有10000個虛拟節點時在速度和均衡性方面都是不錯的。5 測試
6 參考文章
《Memcached 分布式緩存實作原理》。
一緻性hash在分布式系統中的應用
場景
如果要設計一套KV存儲的系統,使用者PUT一個key和value,存儲到系統中,并且提供使用者根據key來GET對應的value。要求随着使用者規模變大,系統是可以水準擴充的,主要要解決以下幾個問題。
- 系統是一個叢集,包含很多節點,如何解決使用者資料的存儲問題?保證使用者的資料盡可能平均分散到各個節點上。
- 如果使用者量增長,需要對叢集進行擴容,擴容完成後如何解決資料重新分布?保證不會出現熱點資料節點。
方案一:取模hash
要設計上面的系統,最簡單的方案就是取模hash。基本的原理就是:假設叢集一共有N台機器提供服務,對于使用者的請求編号,比如編号M,那麼就把這個請求通過取模發送到指定機器。
機器序号 = M % N
舉個例子,比如有下面這些機器
0. 192.168.1.1
1. 192.168.2.2
2. 192.168.3.3
3. 192.168.4.4
使用者PUT 100個請求,此時用戶端(可以設計)帶上一個編号,分别是1-100,那麼
1%4 = 1 <<-->> 192.168.2.2
2%4 = 2 <<-->> 192.168.3.3
3%4 = 3 <<-->> 192.168.4.4
...
100%4 = 0 <<-->> 192.168.1.1
這樣就可以很簡單把使用者的請求負載均衡到4台機器上了,解決了第一個問題。可以看看下面代碼實作
content = """Consistent hashing is a special kind of hashing such that when a hash table is resized and consistent hashing is used, only K/n keys need to be remapped on average, where K is the number of keys, and n is the number of slots. In contrast, in most traditional hash tables, a change in the number of array slots causes nearly all keys to be remapped."""
### 所有機器清單
servers = [
"192.168.1.1",
"192.168.2.2",
"192.168.3.3",
"192.168.4.4"
]
class NormalHash(object):
"""Normal Hash """
def __init__(self, nodes=None):
if nodes:
self.nodes = nodes
self.number = len(nodes)
def get_node(self, index):
"""Return node by index % servers number
"""
if index < 0:
return None
return self.nodes[index%self.number]
def normal_hash():
"""Normal hash usage example"""
nh = NormalHash(servers)
words = content.split()
# 模拟初始化每天機器的db
database = {}
for s in servers:
database[s] = []
for i in xrange(len(words)):
database[nh.get_node(i)].append(words[i])
print database
上面這部分是用戶端的代碼,NormalHash其實可以是在服務端實作,用戶端每次要PUT或者GET一個key,就調用服務端的sdk,擷取對應機器,然後操作。
取模hash情況下擴容機器
取模hash有一個明顯的缺點,就是上面提出的第二個問題,如何解決擴容機器後資料分布的問題?繼續上面的例子,比如這時候要新增一台機器,機器規模變成
0. 192.168.1.1
1. 192.168.2.2
2. 192.168.3.3
3. 192.168.4.4
4. 192.168.5.5
那麼問題就來了,如果現在使用者要通過GET請求資料,同樣還是1-100的請求編号,這時候取模就變成
i % 5
1%5 = 1 <<-->> 192.168.2.2
2%5 = 2 <<-->> 192.168.3.3
3%5 = 3 <<-->> 192.168.4.4
4%5 = 4 <<-->> 192.168.5.5 ->> 這裡開始就變化了
...
很顯然,對于新的PUT操作不會有影響,但是對于使用者老的資料GET請求, 資料就不一緻了,這時候必須要進行移資料,可以推斷出,這裡的資料變更是很大的,在80%左右。
但是,如果擴容的叢集是原來的倍數,之前是N台,現在擴容到 M * N台,那麼資料遷移量是50%。
取模hash總結
取模hash能解決負載均衡問題,而且實作很簡單,維護meta資訊成本也很小,但是擴容叢集的時候,最好是按照整數倍擴容,否則資料遷移成本太高。
我個人覺得,取模hash已經能滿足業務比較小的場景了,在機器隻有幾台或者幾十台的時候,完全能夠應付了。而且這種方案很簡潔,實作起來很容易,很容易了解。
方案二:一緻性hash
一緻性hash基本實作如下圖,這張圖最早出現在是memcached分布式實作裡。如何了解一緻性hash呢?
- 首先我們設計一個環,假設這個環是由2^32 - 1個點組成,也就是說[0, 2^32)上的任意一個點都能在環上找到。
- 現在采用一個算法(md5就可以),把我們叢集中的伺服器以ip位址作為key,然後根據算法得到一個值,這個值映射到環上的一個點,然後還有對應的資料存儲區間
IP位址 hash value(例子) 資料範圍
192.168.1.1 -->> 1000 -->> (60000, 1000](可以看環來了解,和時鐘一樣)
192.168.2.2 -->> 8000 -->> (1000, 8000]
192.168.3.3 -->> 25000 -->> (8000, 25000]
192.168.4.4 -->> 60000 -->> (25000, 60000]
- 使用者的請求過來後,對key進行hash,也映射到環上的一個點,根據ip位址的資料範圍存儲到對應的節點上,圖上粉紅色的點就代表資料映射後的環上位置,然後箭頭就是代表存儲的節點位置
一緻性hash情況下擴容機器
一緻性hash在某種程度上是可以解決資料的負載均衡問題的,再來看看擴容的情況,這時候新增加一個節點,圖
機器情況變成
IP位址 hash value(例子) 資料範圍
192.168.1.1 -->> 1000 -->> (60000, 1000](注意:取模後的邏輯大小)
192.168.2.2 -->> 8000 -->> (1000, 8000]
192.168.5.5 -->> 15000 -->> (8000, 15000] (新增的)
192.168.3.3 -->> 25000 -->> (15000, 25000]
192.168.4.4 -->> 60000 -->> (25000, 60000]
這時候被影響的資料範圍僅僅是(8000, 15000]的資料,這部分需要做遷移。同樣的如果有一台機器當機,那麼受影響的也隻是比這台機器對應環上的點大,比下一個節點值小的點。
一緻性hash總結
一緻性hash能解決熱點分布的問題,對于縮容和擴容也能低成本進行。但是一緻性hash在小規模叢集中,就會有問題,很容易出現資料熱點分布不均勻的現象,因為當機器數量比較少的時候,hash出來很有可能各自幾點管理的“範圍”有大有小。而且一旦規模比較小的情況下,如果資料原本是均勻分布的,這時候新加入一個節點,就會影響資料分布不均勻。
虛拟節點
虛拟節點可以解決一緻性hash在節點比較少的情況下的問題,簡單而言就是在一個節點實際虛拟出多個節點,對應到環上的值,然後按照順時針或者逆時針劃分區間
下面貼上一緻性hash的代碼,replicas實作了虛拟節點,當replicas=1的時候,就退化到上面的圖,一個節點真實對應到一個環上的點。
# -*- coding: UTF-8 -*-
import md5
content = """Consistent hashing is a special kind of hashing such that when a hash table is resized and consistent hashing is used, only K/n keys need to be remapped on average, where K is the number of keys, and n is the number of slots. In contrast, in most traditional hash tables, a change in the number of array slots causes nearly all keys to be remapped."""
# 所有機器清單
servers = [
"192.168.1.1",
"192.168.2.2",
"192.168.3.3",
"192.168.4.4"
]
class HashRing(object):
def __init__(self, nodes=None, replicas=3):
"""Manages a hash ring.
`nodes` is a list of objects that have a proper __str__ representation.
`replicas` indicates how many virtual points should be used pr. node,
replicas are required to improve the distribution.
"""
self.replicas = replicas
self.ring = dict()
self._sorted_keys = []
if nodes:
for node in nodes:
self.add_node(node)
def add_node(self, node):
"""Adds a `node` to the hash ring (including a number of replicas).
"""
for i in xrange(0, self.replicas):
key = self.gen_key('%s:%s' % (node, i))
self.ring[key] = node
self._sorted_keys.append(key)
self._sorted_keys.sort()
def remove_node(self, node):
"""Removes `node` from the hash ring and its replicas.
"""
for i in xrange(0, self.replicas):
key = self.gen_key('%s:%s' % (node, i))
del self.ring[key]
self._sorted_keys.remove(key)
def get_node(self, string_key):
"""Given a string key a corresponding node in the hash ring is returned.
If the hash ring is empty, `None` is returned.
"""
return self.get_node_pos(string_key)[0]
def get_node_pos(self, string_key):
"""Given a string key a corresponding node in the hash ring is returned
along with it's position in the ring.
If the hash ring is empty, (`None`, `None`) is returned.
"""
if not self.ring:
return None, None
key = self.gen_key(string_key)
nodes = self._sorted_keys
for i in xrange(0, len(nodes)):
node = nodes[i]
if key <= node:
return self.ring[node], i
return self.ring[nodes[0]], 0
def get_nodes(self, string_key):
"""Given a string key it returns the nodes as a generator that can hold the key.
The generator is never ending and iterates through the ring
starting at the correct position.
"""
if not self.ring:
yield None, None
node, pos = self.get_node_pos(string_key)
for key in self._sorted_keys[pos:]:
yield self.ring[key]
while True:
for key in self._sorted_keys:
yield self.ring[key]
def gen_key(self, key):
"""Given a string key it returns a long value,
this long value represents a place on the hash ring.
md5 is currently used because it mixes well.
"""
m = md5.new()
m.update(key)
return long(m.hexdigest(), 16)
def consistent_hash():
# 模拟初始化每天機器的db
database = {}
for s in servers:
database[s] = []
hr = HashRing(servers)
for w in words.split():
database[hr.get_node(w)].append(w)
print database
consistent_hash()
标簽:hash, distributed
http://www.firefoxbug.com/index.php/archives/2791/