天天看點

分布式系統:一緻性hash算法 & 在分布式系統中的應用

前段時間在了解分布式,發現firefoxbug在部落格中寫的這篇對這個問題說明得比較清晰易懂,本文主要是自己的了解和實踐。

在後端一般會遇到這樣的場景:随着應用系統的通路量或者DB/檔案存儲系統的資料量增大,系統由于負載增大而出現響應延遲甚至down掉的情況。為了解決這個問題,往往會對系統采用垂直擴充和水準擴充的架構設計,而分布式系統正是水準擴充架構的一種應用實踐。

1 分布式系統要求

分布式設計的初衷就是為了解決單一服務端負載過大的問題,是以在對系統做水準擴充後,資料要盡量均勻地分布在每台伺服器節點的上(即不會出現熱點資料節點)。其次,如果後期需要擴容或者某一節點發生故障需要從叢集中剔除,那麼處理後的分布式系統應該做到對已存儲的資料影響最小,降低資料遷移的​

​成本​

​​和​

​風險​

​。

2 解決方法

由于機器的數量不可能是無限的,是以水準擴充的時候,要考慮把無限的資料通過一定的算法​

​平衡、有序、易擴充​

​地分布在這些機器上。

常見的做法是利用把要處理的資料進行編号,然後對機器的資料進行​

​取模運算​

​。例如,假設有10個資料(編号為0~9),機器數量為3(編号為0~2),那麼每個資料編号對機器數3取模後,0号機器存放了編号為0,3,6,9的資料;1号機器存了編号為1,4,7的資料;2号機器存放了編号為2,5,8的資料。

取模算法比較簡單,但是當某個伺服器節點出現故障或者新增節點後,需要對已有資料作大量的遷移。在memcached分布式原理中介紹了​

​Consistent Hashing​

​算法,它能較好地解決這個問題。

分布式系統:一緻性hash算法 & 在分布式系統中的應用

3 一緻性雜湊演算法原理

如上圖所示,memcached分布式提供的雜湊演算法的主要處理流程如下:

1、使用算法求出每個memcached伺服器節點(ip位址)的哈希值x,并将其配置設定到0~2^32的圓上(值域);
2、用同樣的方法求出存儲資料鍵的哈希值y,并映射到圓上。
3、按順時針方向查找第1個比y大的x,那麼y就分布在x前面那個節點上。      

4 示例程式

在firefoxbug的原文中提供了python2的示例程式,這裡改成了python3。注意,程式中對這4台機器都使用了虛拟節點(​

​replicas​

​),它可以增加資料分布的均勻性。

# -*- coding: UTF-8 -*-

'''
FileName:      consistenthashdistributed1.sh
Description:   分布式系統:一緻性hash算法的應用
Simple Usage:  python consistenthashdistributed1.py [numbers of replicate]
Reference:     http://www.firefoxbug.com/index.php/archives/2791/
(c) 2018.02.17 vfhky https://typecodes.com/python/consistenthashdistributed1.html
'''

import sys
import hashlib

CONTENT = """Consistent hashing is a special kind of hashing such that when a hash table is resized and consistent hashing is used, only K/n keys need to be remapped on average, where K is the number of keys, and n is the number of slots. In contrast, in most traditional hash tables, a change in the number of array slots causes nearly all keys to be remapped."""

# 所有機器清單
SERVERS = [
    "192.168.1.1",
    "192.168.2.2",
    "192.168.3.3",
    "192.168.4.4"
]


class HashRing(object):
    """Constructs.
    """
    def __init__(self, nodes=None, replicas=3):
        """Manages a hash ring.

        `nodes` is a list of objects that have a proper __str__ representation.
        `replicas` indicates how many virtual points should be used pr. node,
        replicas are required to improve the distribution.
        """
        self.replicas = replicas

        self.ring = dict()
        self._sorted_keys = []

        if nodes:
            for node in nodes:
                self.add_node(node)

    def add_node(self, node):
        """Adds a `node` to the hash ring (including a number of replicas).
        """
        for i in range(0, self.replicas):
            key = self.gen_key('%s:%s' % (node, i))
            self.ring[key] = node
            # print("key=[%s]=[%s]." %(key, node))
            self._sorted_keys.append(key)

        self._sorted_keys.sort()
        #print("%s" %(self._sorted_keys))

    def remove_node(self, node):
        """Removes `node` from the hash ring and its replicas.
        """
        for i in range(0, self.replicas):
            key = self.gen_key('%s:%s' % (node, i))
            del self.ring[key]
            self._sorted_keys.remove(key)

    def get_node(self, string_key):
        """Given a string key a corresponding node in the hash ring is returned.

        If the hash ring is empty, `None` is returned.
        """
        return self.get_node_pos(string_key)[0]

    def get_node_pos(self, string_key):
        """Given a string key a corresponding node in the hash ring is returned
        along with it's position in the ring.

        If the hash ring is empty, (`None`, `None`) is returned.
        """
        if not self.ring:
            return None, None

        key = self.gen_key(string_key)

        nodes = self._sorted_keys
        nodes_num = len(nodes)
        for i in range(0, nodes_num):
            node = nodes[i]
            if key <= node:
                return self.ring[node], i

        # 對于key>node節點key的,全部落在第1個key對應的節點(192.168.1.4)上,這樣就形成了1個閉環。
        print("[%s:%s] string_key=[%s] key=[%s] node=[%s] self.ring[nodes[0]]=[%s].\n" %(__file__, sys._getframe().f_lineno, string_key, key, node, self.ring[nodes[0]]))
        return self.ring[nodes[0]], 0

    def gen_key(self, key):
        """Given a string key it returns a long value,
        this long value represents a place on the hash ring.

        md5 is currently used because it mixes well.
        """
        m = hashlib.md5()
        m.update(key.encode('utf-8'))
        return m.hexdigest()


def consistent_hash(replicas):
    '''docstring'''
    # 模拟初始化每天機器的db
    database = {}
    for s in SERVERS:
        database[s] = []

    hr = HashRing(SERVERS,replicas)

    for w in CONTENT.split():
        database[hr.get_node(w)].append(w)

    # 列印所有的節點下面的資料
    for node in database:
        print("[%s]=[%s].\n" %(node, database[node]))


if __name__ == '__main__':
    '''docstring'''
    replicas = 3

    if len(sys.argv) > 1:
        replicas = long(sys.argv[1])

    if( replicas < 3 or replicas > 100000 ):
        print( "Rreplicas should lower than 100000." )
        sys.exit()

    consistent_hash(replicas)      

上面程式在查找落地節點時,采用的是​

​周遊​

​整個hash圈上的值,是以虛拟節點不宜過大,否則會出現查找時間過長的問題。如下圖所示,BZ在自己的單核1G記憶體的虛拟機中測試,發現4個節點如果都有10000個虛拟節點時在速度和均衡性方面都是不錯的。5 測試

分布式系統:一緻性hash算法 &amp; 在分布式系統中的應用

6 參考文章

​​《Memcached 分布式緩存實作原理》​​。

一緻性hash在分布式系統中的應用

場景

如果要設計一套KV存儲的系統,使用者PUT一個key和value,存儲到系統中,并且提供使用者根據key來GET對應的value。要求随着使用者規模變大,系統是可以水準擴充的,主要要解決以下幾個問題。

  1. 系統是一個叢集,包含很多節點,如何解決使用者資料的存儲問題?保證使用者的資料盡可能平均分散到各個節點上。
  2. 如果使用者量增長,需要對叢集進行擴容,擴容完成後如何解決資料重新分布?保證不會出現熱點資料節點。

方案一:取模hash

要設計上面的系統,最簡單的方案就是取模hash。基本的原理就是:假設叢集一共有N台機器提供服務,對于使用者的請求編号,比如編号M,那麼就把這個請求通過取模發送到指定機器。

機器序号 = M % N      

舉個例子,比如有下面這些機器

0. 192.168.1.1
1. 192.168.2.2
2. 192.168.3.3
3. 192.168.4.4      

使用者PUT 100個請求,此時用戶端(可以設計)帶上一個編号,分别是1-100,那麼

1%4 = 1 <<-->> 192.168.2.2
2%4 = 2 <<-->> 192.168.3.3
3%4 = 3 <<-->> 192.168.4.4
...
100%4 = 0 <<-->> 192.168.1.1      

這樣就可以很簡單把使用者的請求負載均衡到4台機器上了,解決了第一個問題。可以看看下面代碼實作

content = """Consistent hashing is a special kind of hashing such that when a hash table is resized and consistent hashing is used, only K/n keys need to be remapped on average, where K is the number of keys, and n is the number of slots. In contrast, in most traditional hash tables, a change in the number of array slots causes nearly all keys to be remapped."""

### 所有機器清單
servers = [
    "192.168.1.1",
    "192.168.2.2",
    "192.168.3.3",
    "192.168.4.4"
]

class NormalHash(object):
    """Normal Hash """
    def __init__(self, nodes=None):
        if nodes:
            self.nodes = nodes
            self.number = len(nodes)

    def get_node(self, index):
        """Return node by index % servers number
        """
        if index < 0:
            return None
        return self.nodes[index%self.number]

def normal_hash():
    """Normal hash usage example"""
    nh = NormalHash(servers)
    words = content.split()

    # 模拟初始化每天機器的db
    database = {}
    for s in servers:
        database[s] = []

    for i in xrange(len(words)):
        database[nh.get_node(i)].append(words[i])

    print database
      

上面這部分是用戶端的代碼,NormalHash其實可以是在服務端實作,用戶端每次要PUT或者GET一個key,就調用服務端的sdk,擷取對應機器,然後操作。

取模hash情況下擴容機器

取模hash有一個明顯的缺點,就是上面提出的第二個問題,如何解決擴容機器後資料分布的問題?繼續上面的例子,比如這時候要新增一台機器,機器規模變成

0. 192.168.1.1
1. 192.168.2.2
2. 192.168.3.3
3. 192.168.4.4
4. 192.168.5.5      

那麼問題就來了,如果現在使用者要通過GET請求資料,同樣還是1-100的請求編号,這時候取模就變成

i % 5

1%5 = 1 <<-->> 192.168.2.2
2%5 = 2 <<-->> 192.168.3.3
3%5 = 3 <<-->> 192.168.4.4
4%5 = 4 <<-->> 192.168.5.5  ->> 這裡開始就變化了
...
      

很顯然,對于新的PUT操作不會有影響,但是對于使用者老的資料GET請求, 資料就不一緻了,這時候必須要進行移資料,可以推斷出,這裡的資料變更是很大的,在80%左右。

但是,如果擴容的叢集是原來的倍數,之前是N台,現在擴容到 M * N台,那麼資料遷移量是50%。

取模hash總結

取模hash能解決負載均衡問題,而且實作很簡單,維護meta資訊成本也很小,但是擴容叢集的時候,最好是按照整數倍擴容,否則資料遷移成本太高。

我個人覺得,取模hash已經能滿足業務比較小的場景了,在機器隻有幾台或者幾十台的時候,完全能夠應付了。而且這種方案很簡潔,實作起來很容易,很容易了解。

方案二:一緻性hash

一緻性hash基本實作如下圖,這張圖最早出現在是memcached分布式實作裡。如何了解一緻性hash呢?

分布式系統:一緻性hash算法 &amp; 在分布式系統中的應用
  • 首先我們設計一個環,假設這個環是由2^32 - 1個點組成,也就是說[0, 2^32)上的任意一個點都能在環上找到。
  • 現在采用一個算法(md5就可以),把我們叢集中的伺服器以ip位址作為key,然後根據算法得到一個值,這個值映射到環上的一個點,然後還有對應的資料存儲區間
IP位址          hash     value(例子)           資料範圍
192.168.1.1     -->>        1000        -->>  (60000, 1000](可以看環來了解,和時鐘一樣)
192.168.2.2     -->>        8000        -->>   (1000, 8000]
192.168.3.3     -->>        25000       -->>   (8000, 25000]
192.168.4.4     -->>        60000       -->>   (25000, 60000]      
  • 使用者的請求過來後,對key進行hash,也映射到環上的一個點,根據ip位址的資料範圍存儲到對應的節點上,圖上粉紅色的點就代表資料映射後的環上位置,然後箭頭就是代表存儲的節點位置

一緻性hash情況下擴容機器

一緻性hash在某種程度上是可以解決資料的負載均衡問題的,再來看看擴容的情況,這時候新增加一個節點,圖

分布式系統:一緻性hash算法 &amp; 在分布式系統中的應用

機器情況變成

IP位址          hash     value(例子)           資料範圍
192.168.1.1     -->>        1000        -->>  (60000, 1000](注意:取模後的邏輯大小)
192.168.2.2     -->>        8000        -->>   (1000, 8000]
192.168.5.5     -->>       15000        -->>  (8000, 15000] (新增的)
192.168.3.3     -->>        25000       -->>   (15000, 25000]
192.168.4.4     -->>        60000       -->>   (25000, 60000]      

這時候被影響的資料範圍僅僅是(8000, 15000]的資料,這部分需要做遷移。同樣的如果有一台機器當機,那麼受影響的也隻是比這台機器對應環上的點大,比下一個節點值小的點。

一緻性hash總結

一緻性hash能解決熱點分布的問題,對于縮容和擴容也能低成本進行。但是一緻性hash在小規模叢集中,就會有問題,很容易出現資料熱點分布不均勻的現象,因為當機器數量比較少的時候,hash出來很有可能各自幾點管理的“範圍”有大有小。而且一旦規模比較小的情況下,如果資料原本是均勻分布的,這時候新加入一個節點,就會影響資料分布不均勻。

虛拟節點

虛拟節點可以解決一緻性hash在節點比較少的情況下的問題,簡單而言就是在一個節點實際虛拟出多個節點,對應到環上的值,然後按照順時針或者逆時針劃分區間

下面貼上一緻性hash的代碼,replicas實作了虛拟節點,當replicas=1的時候,就退化到上面的圖,一個節點真實對應到一個環上的點。

# -*- coding: UTF-8 -*-

import md5

content = """Consistent hashing is a special kind of hashing such that when a hash table is resized and consistent hashing is used, only K/n keys need to be remapped on average, where K is the number of keys, and n is the number of slots. In contrast, in most traditional hash tables, a change in the number of array slots causes nearly all keys to be remapped."""

# 所有機器清單
servers = [
    "192.168.1.1",
    "192.168.2.2",
    "192.168.3.3",
    "192.168.4.4"
]

class HashRing(object):

    def __init__(self, nodes=None, replicas=3):
        """Manages a hash ring.

        `nodes` is a list of objects that have a proper __str__ representation.
        `replicas` indicates how many virtual points should be used pr. node,
        replicas are required to improve the distribution.
        """
        self.replicas = replicas

        self.ring = dict()
        self._sorted_keys = []

        if nodes:
            for node in nodes:
                self.add_node(node)

    def add_node(self, node):
        """Adds a `node` to the hash ring (including a number of replicas).
        """
        for i in xrange(0, self.replicas):
            key = self.gen_key('%s:%s' % (node, i))
            self.ring[key] = node
            self._sorted_keys.append(key)

        self._sorted_keys.sort()

    def remove_node(self, node):
        """Removes `node` from the hash ring and its replicas.
        """
        for i in xrange(0, self.replicas):
            key = self.gen_key('%s:%s' % (node, i))
            del self.ring[key]
            self._sorted_keys.remove(key)

    def get_node(self, string_key):
        """Given a string key a corresponding node in the hash ring is returned.

        If the hash ring is empty, `None` is returned.
        """
        return self.get_node_pos(string_key)[0]

    def get_node_pos(self, string_key):
        """Given a string key a corresponding node in the hash ring is returned
        along with it's position in the ring.

        If the hash ring is empty, (`None`, `None`) is returned.
        """
        if not self.ring:
            return None, None

        key = self.gen_key(string_key)

        nodes = self._sorted_keys
        for i in xrange(0, len(nodes)):
            node = nodes[i]
            if key <= node:
                return self.ring[node], i

        return self.ring[nodes[0]], 0

    def get_nodes(self, string_key):
        """Given a string key it returns the nodes as a generator that can hold the key.

        The generator is never ending and iterates through the ring
        starting at the correct position.
        """
        if not self.ring:
            yield None, None

        node, pos = self.get_node_pos(string_key)
        for key in self._sorted_keys[pos:]:
            yield self.ring[key]

        while True:
            for key in self._sorted_keys:
                yield self.ring[key]

    def gen_key(self, key):
        """Given a string key it returns a long value,
        this long value represents a place on the hash ring.

        md5 is currently used because it mixes well.
        """
        m = md5.new()
        m.update(key)
        return long(m.hexdigest(), 16)

def consistent_hash():

    # 模拟初始化每天機器的db
    database = {}
    for s in servers:
        database[s] = []

    hr = HashRing(servers)

    for w in words.split():
        database[hr.get_node(w)].append(w)

    print database

consistent_hash()
      

标簽:​​hash​​​, ​​distributed​​

​​http://www.firefoxbug.com/index.php/archives/2791/​​

Kotlin 開發者社群

繼續閱讀