天天看點

Ketama一緻性雜湊演算法的實作

一、修改test程式

在test程式中加入memcached_behavior_set_distribution(memc, MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA_SPY),設定分布式算法。

二、memcached_behavior_set_distribution

memcached_behavior_set_distribution()仍然會調用run_distribution()。

/**
 * Select the key-distribution algorithm used by a memcached_st handle and
 * rebuild the distribution state for it.
 *
 * @param ptr   handle whose distribution is being changed
 * @param type  requested distribution; must be below
 *              MEMCACHED_DISTRIBUTION_CONSISTENT_MAX
 * @return the result of run_distribution(ptr) on success, or the value of
 *         memcached_set_error(..., MEMCACHED_INVALID_ARGUMENTS, ...) when
 *         type is out of range.
 */
memcached_return_t memcached_behavior_set_distribution(memcached_st *ptr, memcached_server_distribution_t type)
{
  if (type < MEMCACHED_DISTRIBUTION_CONSISTENT_MAX)
  {
    // Only the weighted distribution turns on ketama weighting.
    // The earlier form `if (MEMCACHED_DISTRIBUTION_CONSISTENT_WEIGHTED)` tested
    // the enum constant itself instead of `type`; the constant is non-zero, so
    // the else branch was unreachable and every distribution (including the
    // MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA_SPY run traced in the article)
    // was marked weighted.
    ptr->ketama.weighted= (type == MEMCACHED_DISTRIBUTION_CONSISTENT_WEIGHTED);

    ptr->distribution= type;
    // Per the article, for MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA_SPY
    // run_distribution() ends up calling update_continuum(ptr).
    return run_distribution(ptr);
  }

  return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
                             memcached_literal_param("Invalid memcached_server_distribution_t"));
}
           

三、update_continuum

/*
  update_continuum()

  Rebuilds ptr->ketama.continuum: for every live server it generates a set of
  (hash value, server index) points, then sorts the whole array with
  continuum_item_cmp and records the number of points in
  ptr->ketama.continuum_points_counter.

  Returns MEMCACHED_SUCCESS on success (also when there are no live servers),
  the result of memcached_set_errno() if gettimeofday() fails,
  MEMCACHED_MEMORY_ALLOCATION_FAILURE if the continuum cannot be grown, and a
  memcached_set_error() result if a host key does not fit in its buffer.
*/
static memcached_return_t update_continuum(memcached_st *ptr)
{
  uint32_t continuum_index= 0;
  memcached_server_st *list;
  uint32_t pointer_counter= 0;
  uint32_t pointer_per_server= MEMCACHED_POINTS_PER_SERVER; // per the article this is 100, i.e. 100 virtual nodes per server
  uint32_t pointer_per_hash= 1;
  uint32_t live_servers= 0;
  struct timeval now;

  if (gettimeofday(&now, NULL))
  {
    return memcached_set_errno(*ptr, errno, MEMCACHED_AT);
  }

  list= memcached_server_list(ptr);

  /* count live servers (those without a retry delay set) */
  bool is_auto_ejecting= _is_auto_eject_host(ptr);
  if (is_auto_ejecting)             // false in the article's traced run, so this branch is skipped there
  {
    live_servers= 0;
    ptr->ketama.next_distribution_rebuild= 0;
    for (uint32_t host_index= 0; host_index < memcached_server_count(ptr); ++host_index)
    {
      if (list[host_index].next_retry <= now.tv_sec)
      {
        live_servers++;
      }
      else
      {
        if (ptr->ketama.next_distribution_rebuild == 0 or list[host_index].next_retry < ptr->ketama.next_distribution_rebuild)
        {
          ptr->ketama.next_distribution_rebuild= list[host_index].next_retry;  // next rebuild time = the earliest pending next_retry among the servers still waiting
        }
      }
    }
  }
  else
  {
    live_servers= memcached_server_count(ptr);      // 3 servers in the article's example
  }

  uint64_t is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED);    // true in the article's traced run (the weighted flag is set in memcached_behavior_set_distribution())
  uint32_t points_per_server= (uint32_t) (is_ketama_weighted ? MEMCACHED_POINTS_PER_SERVER_KETAMA : MEMCACHED_POINTS_PER_SERVER);// per the article the former is 160 and the latter is 100

  // Nothing to build when no server is live.
  if (not live_servers)
  {
    return MEMCACHED_SUCCESS;
  }

  // Grow the continuum only when there are more live servers than it was sized for.
  if (live_servers > ptr->ketama.continuum_count)
  {
    memcached_continuum_item_st *new_ptr;
    // Per the article MEMCACHED_CONTINUUM_ADDITION defaults to 10, so the example
    // allocates an array of (3+10)*160 memcached_continuum_item_st elements here.
    new_ptr= libmemcached_xrealloc(ptr, ptr->ketama.continuum, (live_servers + MEMCACHED_CONTINUUM_ADDITION) * points_per_server, memcached_continuum_item_st);

    if (new_ptr == 0)
    {
      return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
    }

    ptr->ketama.continuum= new_ptr;
    ptr->ketama.continuum_count= live_servers + MEMCACHED_CONTINUUM_ADDITION;
  }

  // Sum the weights of the servers that will be placed on the continuum.
  uint64_t total_weight= 0;
  if (is_ketama_weighted)
  {
    for (uint32_t host_index = 0; host_index < memcached_server_count(ptr); ++host_index)
    {
      if (is_auto_ejecting == false or list[host_index].next_retry <= now.tv_sec)
      {
        total_weight += list[host_index].weight;   // the example sets no weights; per the article each server then has weight 1, giving total_weight == 3
      }
    }
  }

  for (uint32_t host_index= 0; host_index < memcached_server_count(ptr); ++host_index)
  {
    // Skip servers that are still waiting for their retry time.
    if (is_auto_ejecting and list[host_index].next_retry > now.tv_sec)
    {
      continue;
    }

    if (is_ketama_weighted) // derive this server's number of virtual nodes from its share of the total weight
    {
        float pct= (float)list[host_index].weight / (float)total_weight;
        pointer_per_server= (uint32_t) ((::floor((float) (pct * MEMCACHED_POINTS_PER_SERVER_KETAMA / 4 * (float)live_servers + 0.0000000001))) * 4);
        pointer_per_hash= 4;
    }


    if (ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA_SPY)
    {
      for (uint32_t pointer_index= 0;
           pointer_index < pointer_per_server / pointer_per_hash;
           pointer_index++)   // with pointer_per_hash == 4, pointer_index runs 0-39 in the article's example
      {
        char sort_host[1 +MEMCACHED_NI_MAXHOST +1 +MEMCACHED_NI_MAXSERV +1 + MEMCACHED_NI_MAXSERV ]= "";
        int sort_host_length;

        // Spymemcached ketema key format is: hostname/ip:port-index
        // If hostname is not available then: /ip:port-index
        sort_host_length= snprintf(sort_host, sizeof(sort_host),
                                   "/%s:%u-%u",
                                   list[host_index].hostname,
                                   (uint32_t)list[host_index].port,
                                   pointer_index);
        // Result: sort_host looks like "/localhost:11211-0"
        if (size_t(sort_host_length) >= sizeof(sort_host) or sort_host_length < 0)
        {
          return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, 
                                     memcached_literal_param("snprintf(sizeof(sort_host))"));
        }

        if (is_ketama_weighted)
        {
          for (uint32_t x= 0; x < pointer_per_hash; x++) // pointer_per_hash: one hash of the key yields values for four virtual nodes
          {                                            // (per the article: ketama_server_hash() uses MD5, a 16-byte digest, and each virtual node's value is 4 bytes)
            uint32_t value= ketama_server_hash(sort_host, (size_t)sort_host_length, x);
            ptr->ketama.continuum[continuum_index].index= host_index;
            ptr->ketama.continuum[continuum_index++].value= value;
          }
        }
        else
        {             // otherwise pointer_per_hash is 1: one 4-byte hash value per virtual node
          uint32_t value= hashkit_digest(&ptr->hashkit, sort_host, (size_t)sort_host_length);
          ptr->ketama.continuum[continuum_index].index= host_index;
          ptr->ketama.continuum[continuum_index++].value= value;
        }
      }
    }
    else
    {
      // Non-spy key format: "hostname-index", or "hostname:port-index" when the
      // port is not MEMCACHED_DEFAULT_PORT. The index written is pointer_index - 1.
      for (uint32_t pointer_index= 1;
           pointer_index <= pointer_per_server / pointer_per_hash;
           pointer_index++)
      {
        char sort_host[MEMCACHED_NI_MAXHOST +1 +MEMCACHED_NI_MAXSERV +1 +MEMCACHED_NI_MAXSERV]= "";
        int sort_host_length;

        if (list[host_index].port == MEMCACHED_DEFAULT_PORT)
        {
          sort_host_length= snprintf(sort_host, sizeof(sort_host),
                                     "%s-%u",
                                     list[host_index].hostname,
                                     pointer_index - 1);
        }
        else
        {
          sort_host_length= snprintf(sort_host, sizeof(sort_host),
                                     "%s:%u-%u",
                                     list[host_index].hostname,
                                     (uint32_t)list[host_index].port,
                                     pointer_index - 1);
        }

        if (size_t(sort_host_length) >= sizeof(sort_host) or sort_host_length < 0)
        {
          return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, 
                                     memcached_literal_param("snprintf(sizeof(sort_host)))"));
        }

        if (is_ketama_weighted)
        {
          for (uint32_t x = 0; x < pointer_per_hash; x++)
          {
            uint32_t value= ketama_server_hash(sort_host, (size_t)sort_host_length, x);
            ptr->ketama.continuum[continuum_index].index= host_index;
            ptr->ketama.continuum[continuum_index++].value= value;
          }
        }
        else
        {
          uint32_t value= hashkit_digest(&ptr->hashkit, sort_host, (size_t)sort_host_length);
          ptr->ketama.continuum[continuum_index].index= host_index;
          ptr->ketama.continuum[continuum_index++].value= value;
        }
      }
    }

    // Running total of points generated so far across servers.
    pointer_counter+= pointer_per_server;
  }

  WATCHPOINT_ASSERT(ptr);
  WATCHPOINT_ASSERT(ptr->ketama.continuum);
  WATCHPOINT_ASSERT(memcached_server_count(ptr) * MEMCACHED_POINTS_PER_SERVER <= MEMCACHED_CONTINUUM_SIZE);
  ptr->ketama.continuum_points_counter= pointer_counter;
  qsort(ptr->ketama.continuum, ptr->ketama.continuum_points_counter, sizeof(memcached_continuum_item_st), continuum_item_cmp);
  // Sorted by value (ascending, per the article) so that later lookups can search the array.
  return MEMCACHED_SUCCESS;
}
           

四、memcached_generate_hash_with_redistribution

1、根據前一篇,首先調用_generate_hash_wrapper(),最終調用hashkit_one_at_a_time ()計算key的哈希值;

2、_regen_for_auto_eject(ptr)根據是否啟用了自動剔除失效伺服器,以及是否已經超過了重建時間next_distribution_rebuild,來決定是否重新調用run_distribution():

/*
  _regen_for_auto_eject()

  Re-runs run_distribution() when auto-eject is enabled, a rebuild time has
  been scheduled in ptr->ketama.next_distribution_rebuild, and the current
  time is already past it. Does nothing if the clock cannot be read.
*/
static inline void _regen_for_auto_eject(memcached_st *ptr)
{
  // No auto-eject, or no rebuild scheduled: nothing to regenerate.
  if (not _is_auto_eject_host(ptr) or ptr->ketama.next_distribution_rebuild == 0)
  {
    return;
  }

  struct timeval current;
  if (gettimeofday(&current, NULL) != 0)
  {
    return;
  }

  // The rebuild time is the earliest retry time recorded when the continuum
  // was last built; once it has passed, rebuild the distribution.
  if (current.tv_sec > ptr->ketama.next_distribution_rebuild)
  {
    run_distribution(ptr);
  }
}
           

3、dispatch_host(ptr, hash),根據上一篇,就已經知道采用二分法來找到hash值不小于key的哈希值的第一個虛拟節點(如果不存在這樣的節點,則回繞到第一個節點),這樣就可以确定serverID了。

case MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA_SPY:
    {
      // Binary search over the continuum for the first point whose value is
      // not less than hash; the server index stored at that point is returned.
      uint32_t num= ptr->ketama.continuum_points_counter;
      WATCHPOINT_ASSERT(ptr->ketama.continuum);

      memcached_continuum_item_st *begin, *end, *left, *right, *middle;
      begin= left= ptr->ketama.continuum;
      end= right= ptr->ketama.continuum + num;  // right starts one past the last element, which simplifies the boundary handling
      // Think of the one-past-the-end slot as holding an infinitely large value;
      // the loop invariant is then that right always refers to a value >= hash.
      while (left < right)
      {
        middle= left + (right - left) / 2;
        if (middle->value < hash)
          left= middle + 1;
        else
          right= middle;
      }
      if (right == end) // hash is larger than every point: wrap around to begin
        right= begin;
      return right->index;
    }
           

五、memcached_connect

如果連接成功,則調用memcached_mark_server_as_clean()使得server->server_failure_counter= 0,server->next_retry= 0;之後的運作與之前介紹的幾篇一致。這裡,我們終止掉程式将要連接的伺服器,連接當然會失敗,這樣memcached_connect()會按以下流程執行:

set_last_disconnected_host(server); // per the article, this records server->root->last_disconnected_server
  // Either way the server is marked for timeout; an error is recorded first
  // only when the server does not already carry one.
  if (memcached_has_current_error(*server))
  {
    memcached_mark_server_for_timeout(server);
    assert(memcached_failed(memcached_server_error_return(server)));
  }
  else
  {
    memcached_set_error(*server, rc, MEMCACHED_AT);
    memcached_mark_server_for_timeout(server);
  }
           
/*
  memcached_mark_server_for_timeout()

  Moves a server into MEMCACHED_SERVER_STATE_IN_TIMEOUT unless it is already
  there: schedules next_retry, bumps the failure counter at most once per
  query id, and records the server as the last disconnected host.
*/
static inline void memcached_mark_server_for_timeout(memcached_server_write_instance_st server)
{
  // Already timed out: leave the existing retry schedule and counters alone.
  if (server->state == MEMCACHED_SERVER_STATE_IN_TIMEOUT)
  {
    return;
  }

  struct timeval current;
  const bool have_clock= (gettimeofday(&current, NULL) == 0);
  if (not have_clock)
  {
    // A next_retry of 1 is already in the past, so the timeout elapses immediately.
    server->next_retry= 1;
  }
  else
  {
    // The article notes that server->root->retry_timeout defaults to 2.
    server->next_retry= current.tv_sec +server->root->retry_timeout;
  }

  server->state= MEMCACHED_SERVER_STATE_IN_TIMEOUT;

  // Count a failure only once for any given query id.
  if (server->server_failure_counter_query_id != server->root->query_id)
  {
    server->server_failure_counter++;
    server->server_failure_counter_query_id= server->root->query_id;
  }

  set_last_disconnected_host(server);
}
           

六、backoff_handling

在memcached_connect()中,一開始就調用了此函數,用以檢查準備連接配接的那個server的出錯狀況,并相應處理。這是一個關鍵性的函數。

/*
  backoff_handling()

  Decides, from the server's failure count and retry time, whether a connect
  should be refused without being attempted, so that no time is spent waiting
  on a server that is known to be failing.

  Returns MEMCACHED_SUCCESS when the connect may proceed; in_timeout is set to
  true when the server is leaving the timeout state for a fresh attempt.
  Otherwise returns the result of memcached_set_error() for a dead server, a
  failed redistribution, or a server that is still temporarily disabled.
*/
static memcached_return_t backoff_handling(memcached_server_write_instance_st server, bool& in_timeout)
{
  struct timeval now;
  const bool have_now= (gettimeofday(&now, NULL) == 0);

  /*
    Reaching server_failure_limit means repeated attempts have all failed.

    With auto-eject enabled the server is taken out of the distribution;
    without it the server just goes back into timeout.
  */
  if (server->server_failure_counter >= server->root->server_failure_limit)
  {
    if (_is_auto_eject_host(server->root))
    {
      set_last_disconnected_host(server);

      // A positive dead_timeout schedules one later retry of the dead server;
      // the counter is set one below the limit so a single further failure
      // reaches the limit again. (dead_timeout is 0 in the article's example.)
      if (have_now and server->root->dead_timeout > 0)
      {
        server->next_retry= now.tv_sec +server->root->dead_timeout;
        server->server_failure_counter= server->root->server_failure_limit -1;
      }

      // Redistribute after the failure; report a failed redistribution separately.
      memcached_return_t rc;
      if (memcached_failed(rc= run_distribution((memcached_st *)server->root)))
      {
        return memcached_set_error(*server, rc, MEMCACHED_AT, memcached_literal_param("Backoff handling failed during run_distribution"));
      }

      return memcached_set_error(*server, MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_AT);
    }

    server->state= MEMCACHED_SERVER_STATE_IN_TIMEOUT;

    // Make sure a retry time is set.
    if (server->next_retry == 0)
    {
      server->next_retry= 1;
    }
  }

  // Servers that are not in timeout need no further checks.
  if (server->state != MEMCACHED_SERVER_STATE_IN_TIMEOUT)
  {
    return MEMCACHED_SUCCESS;
  }

  // A server in timeout (see memcached_mark_server_for_timeout()) may be
  // retried only once its next_retry time lies strictly in the past.
  const bool retry_due= have_now and server->next_retry < now.tv_sec;
  if (not retry_due)
  {
    return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
  }

  server->state= MEMCACHED_SERVER_STATE_NEW;
  in_timeout= true;

  return MEMCACHED_SUCCESS;
}
           

七、總結

經過跟蹤,無疑run_distribution()是一緻性哈希最核心的函數。它在哪些地方被調用呢?

1、加入server時,memcached_server_push()或server_add()用于添加server,它會調用run_distribution()理所當然。

2、當server失效時(是否失效取決于連接失敗次數server_failure_counter有沒有達到server_failure_limit;失效後是徹底抛棄server,還是過較長的一段時間再重試,取決于失效節點重試間隔server->root->dead_timeout是否大于0),這可以參考上一節的backoff_handling()。不過是否調用run_distribution(),還取決于_is_auto_eject_host(server->root),即ptr->flags.auto_eject_hosts(預設為false)。

3、memcached_generate_hash_with_redistribution()用于根據key産生serverID,其中調用_regen_for_auto_eject(ptr),該函數檢測是否已經過了失效伺服器的重試時間(此時伺服器可能已經恢復),如果是,則調用run_distribution()。

4、其他地方,如memcached_behavior_set_distribution()設定分布式算法,memcached_clone()也會調用。

下一篇: memcached_get

繼續閱讀