天天看点

Ketama一致性哈希算法的实现

一、修改test程序

在test程序中加入memcached_behavior_set_distribution(memc, MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA_SPY),设置分布式算法。

二、memcached_behavior_set_distribution

memcached_behavior_set_distribution()仍然会调用run_distribution()。

/*
  memcached_behavior_set_distribution()

  Stores the requested distribution type on `ptr` and re-runs the distribution.

  Returns the result of run_distribution() when `type` is below
  MEMCACHED_DISTRIBUTION_CONSISTENT_MAX; otherwise returns the result of
  memcached_set_error() with MEMCACHED_INVALID_ARGUMENTS.
*/
memcached_return_t memcached_behavior_set_distribution(memcached_st *ptr, memcached_server_distribution_t type)
{
  if (type < MEMCACHED_DISTRIBUTION_CONSISTENT_MAX)
  {
    // NOTE(review): this tests the enum constant itself, not `type`, so the outcome
    // does not depend on the requested distribution. It looks like
    // `type == MEMCACHED_DISTRIBUTION_CONSISTENT_WEIGHTED` was intended -- confirm
    // against upstream before changing, since the walkthrough below relies on
    // weighted being true.
    if (MEMCACHED_DISTRIBUTION_CONSISTENT_WEIGHTED)  // the constant is non-zero here (per the article), so ptr->ketama.weighted = true is executed
    {
      ptr->ketama.weighted= true;
    }
    else
    {
      ptr->ketama.weighted= false;
    }

    ptr->distribution= type;
    return run_distribution(ptr);  // per the article, for MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA_SPY this ends up calling update_continuum(ptr)
  }

  return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
                             memcached_literal_param("Invalid memcached_server_distribution_t"));
}
           

三、update_continuum

/*
  update_continuum()

  Rebuilds ptr->ketama.continuum: for every server that is considered live it
  generates a number of (server index, hash value) points, then sorts the
  resulting array with qsort() using continuum_item_cmp.

  Returns:
    - the result of memcached_set_errno() if gettimeofday() fails;
    - MEMCACHED_SUCCESS immediately when there are no live servers (the
      continuum is left untouched in that case);
    - MEMCACHED_MEMORY_ALLOCATION_FAILURE if growing the continuum fails, or
      (via memcached_set_error()) if a point key does not fit in sort_host;
    - MEMCACHED_SUCCESS after the continuum has been rebuilt and sorted.
*/
static memcached_return_t update_continuum(memcached_st *ptr)
{
  uint32_t continuum_index= 0;   // next free slot in ptr->ketama.continuum
  memcached_server_st *list;
  uint32_t pointer_counter= 0;   // total number of points generated so far
  uint32_t pointer_per_server= MEMCACHED_POINTS_PER_SERVER; // 100 according to the article, i.e. 100 virtual nodes per server
  uint32_t pointer_per_hash= 1;
  uint32_t live_servers= 0;
  struct timeval now;

  if (gettimeofday(&now, NULL))
  {
    return memcached_set_errno(*ptr, errno, MEMCACHED_AT);
  }

  list= memcached_server_list(ptr);

  /* count live servers (those without a retry delay set) */
  bool is_auto_ejecting= _is_auto_eject_host(ptr);
  if (is_auto_ejecting)             // false in the article's walkthrough, so this branch is skipped there
  {
    live_servers= 0;
    ptr->ketama.next_distribution_rebuild= 0;
    for (uint32_t host_index= 0; host_index < memcached_server_count(ptr); ++host_index)
    {
      if (list[host_index].next_retry <= now.tv_sec)
      {
        live_servers++;
      }
      else
      {
        if (ptr->ketama.next_distribution_rebuild == 0 or list[host_index].next_retry < ptr->ketama.next_distribution_rebuild)
        {
          ptr->ketama.next_distribution_rebuild= list[host_index].next_retry;  // schedule the next rebuild at the earliest pending retry time
        }
      }
    }
  }
  else
  {
    live_servers= memcached_server_count(ptr);      // 3 servers in the article's example
  }

  uint64_t is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED);    // true in the article's example, set by memcached_behavior_set_distribution()
  uint32_t points_per_server= (uint32_t) (is_ketama_weighted ? MEMCACHED_POINTS_PER_SERVER_KETAMA : MEMCACHED_POINTS_PER_SERVER);// 160 and 100 respectively, according to the article

  if (not live_servers)
  {
    return MEMCACHED_SUCCESS;
  }

  // Grow the continuum only when there are more live servers than it was last sized for.
  if (live_servers > ptr->ketama.continuum_count)
  {
    memcached_continuum_item_st *new_ptr;
    // MEMCACHED_CONTINUUM_ADDITION is 10 by default according to the article, so the
    // example allocates an array of (3+10)*160 memcached_continuum_item_st elements.
    new_ptr= libmemcached_xrealloc(ptr, ptr->ketama.continuum, (live_servers + MEMCACHED_CONTINUUM_ADDITION) * points_per_server, memcached_continuum_item_st);

    if (new_ptr == 0)
    {
      return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
    }

    ptr->ketama.continuum= new_ptr;
    ptr->ketama.continuum_count= live_servers + MEMCACHED_CONTINUUM_ADDITION;
  }

  // Sum the weights of the servers that will receive points (same liveness test as the main loop below).
  uint64_t total_weight= 0;
  if (is_ketama_weighted)
  {
    for (uint32_t host_index = 0; host_index < memcached_server_count(ptr); ++host_index)
    {
      if (is_auto_ejecting == false or list[host_index].next_retry <= now.tv_sec)
      {
        total_weight += list[host_index].weight;   // no weights are set in the article's example; it reports a default weight of 1 per server, giving total_weight == 3
      }
    }
  }

  for (uint32_t host_index= 0; host_index < memcached_server_count(ptr); ++host_index)
  {
    // When auto-ejecting, servers still waiting for their retry time get no points.
    if (is_auto_ejecting and list[host_index].next_retry > now.tv_sec)
    {
      continue;
    }

    if (is_ketama_weighted) // the number of virtual nodes per server is derived from its share of the total weight
    {
        float pct= (float)list[host_index].weight / (float)total_weight;
        // Rounded down to a multiple of 4 because each hash below yields 4 points.
        pointer_per_server= (uint32_t) ((::floor((float) (pct * MEMCACHED_POINTS_PER_SERVER_KETAMA / 4 * (float)live_servers + 0.0000000001))) * 4);
        pointer_per_hash= 4;
    }


    if (ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA_SPY)
    {
      for (uint32_t pointer_index= 0;
           pointer_index < pointer_per_server / pointer_per_hash;
           pointer_index++)   // with pointer_per_hash == 4, pointer_index runs over 0-39 in the article's example
      {
        char sort_host[1 +MEMCACHED_NI_MAXHOST +1 +MEMCACHED_NI_MAXSERV +1 + MEMCACHED_NI_MAXSERV ]= "";
        int sort_host_length;

        // Spymemcached ketema key format is: hostname/ip:port-index
        // If hostname is not available then: /ip:port-index
        sort_host_length= snprintf(sort_host, sizeof(sort_host),
                                   "/%s:%u-%u",
                                   list[host_index].hostname,
                                   (uint32_t)list[host_index].port,
                                   pointer_index);
        // Result: sort_host looks like "/localhost:11211-0"
        if (size_t(sort_host_length) >= sizeof(sort_host) or sort_host_length < 0)
        {
          return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, 
                                     memcached_literal_param("snprintf(sizeof(sort_host))"));
        }

        if (is_ketama_weighted)
        {
          for (uint32_t x= 0; x < pointer_per_hash; x++) // pointer_per_hash is the number of virtual-node hash values taken from a single hash of the key
          {                                            // (the article explains: ketama_server_hash() uses MD5, which yields 16 bytes, and each virtual node's hash is 4 bytes)
            uint32_t value= ketama_server_hash(sort_host, (size_t)sort_host_length, x);
            ptr->ketama.continuum[continuum_index].index= host_index;
            ptr->ketama.continuum[continuum_index++].value= value;
          }
        }
        else
        {             // otherwise pointer_per_hash is 1, so one 4-byte hash value is produced per virtual node
          uint32_t value= hashkit_digest(&ptr->hashkit, sort_host, (size_t)sort_host_length);
          ptr->ketama.continuum[continuum_index].index= host_index;
          ptr->ketama.continuum[continuum_index++].value= value;
        }
      }
    }
    else
    {
      // Non-spy key format: "hostname-index" for the default port, "hostname:port-index" otherwise.
      for (uint32_t pointer_index= 1;
           pointer_index <= pointer_per_server / pointer_per_hash;
           pointer_index++)
      {
        char sort_host[MEMCACHED_NI_MAXHOST +1 +MEMCACHED_NI_MAXSERV +1 +MEMCACHED_NI_MAXSERV]= "";
        int sort_host_length;

        if (list[host_index].port == MEMCACHED_DEFAULT_PORT)
        {
          sort_host_length= snprintf(sort_host, sizeof(sort_host),
                                     "%s-%u",
                                     list[host_index].hostname,
                                     pointer_index - 1);
        }
        else
        {
          sort_host_length= snprintf(sort_host, sizeof(sort_host),
                                     "%s:%u-%u",
                                     list[host_index].hostname,
                                     (uint32_t)list[host_index].port,
                                     pointer_index - 1);
        }

        if (size_t(sort_host_length) >= sizeof(sort_host) or sort_host_length < 0)
        {
          return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, 
                                     memcached_literal_param("snprintf(sizeof(sort_host)))"));
        }

        if (is_ketama_weighted)
        {
          for (uint32_t x = 0; x < pointer_per_hash; x++)
          {
            uint32_t value= ketama_server_hash(sort_host, (size_t)sort_host_length, x);
            ptr->ketama.continuum[continuum_index].index= host_index;
            ptr->ketama.continuum[continuum_index++].value= value;
          }
        }
        else
        {
          uint32_t value= hashkit_digest(&ptr->hashkit, sort_host, (size_t)sort_host_length);
          ptr->ketama.continuum[continuum_index].index= host_index;
          ptr->ketama.continuum[continuum_index++].value= value;
        }
      }
    }

    pointer_counter+= pointer_per_server;
  }

  WATCHPOINT_ASSERT(ptr);
  WATCHPOINT_ASSERT(ptr->ketama.continuum);
  WATCHPOINT_ASSERT(memcached_server_count(ptr) * MEMCACHED_POINTS_PER_SERVER <= MEMCACHED_CONTINUUM_SIZE);
  ptr->ketama.continuum_points_counter= pointer_counter;
  qsort(ptr->ketama.continuum, ptr->ketama.continuum_points_counter, sizeof(memcached_continuum_item_st), continuum_item_cmp);
  // Sorted by value (ascending, according to the article; continuum_item_cmp is not shown here) so that lookups can binary-search.
  return MEMCACHED_SUCCESS;
}
           

四、memcached_generate_hash_with_redistribution

1、根据前一篇,首先调用_generate_hash_wrapper(),最终调用hashkit_one_at_a_time ()计算key的哈希值;

2、_regen_for_auto_eject(ptr)根据是否开启了自动剔除失效服务器(auto eject)以及是否已经超过预定的重建时间(next_distribution_rebuild),决定是否调用run_distribution()重建分布:

/*
  _regen_for_auto_eject()

  Re-runs the distribution when auto-eject is enabled, a rebuild time has been
  scheduled (ptr->ketama.next_distribution_rebuild is non-zero) and the current
  time is past that rebuild time. Does nothing otherwise, including when
  gettimeofday() fails.
*/
static inline void _regen_for_auto_eject(memcached_st *ptr)
{
  // Nothing to do unless hosts are auto-ejected and a rebuild has been scheduled.
  if (!_is_auto_eject_host(ptr) || !ptr->ketama.next_distribution_rebuild)
  {
    return;
  }

  struct timeval current;
  if (gettimeofday(&current, NULL) != 0)
  {
    return;
  }

  // The rebuild time is the earliest retry time among the servers in timeout
  // (as set by update_continuum()); rebuild once that moment has passed.
  if (current.tv_sec > ptr->ketama.next_distribution_rebuild)
  {
    run_distribution(ptr);
  }
}
           

3、dispatch_host(ptr, hash),根据上一篇,我们已经知道它采用二分法在已排序的continuum中找到哈希值不小于key哈希值的第一个虚拟节点(如果不存在这样的节点,则回绕到第一个虚拟节点),这样就可以确定serverID了。

case MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA_SPY:
    {
      // Binary search over the continuum for the first point whose value is
      // not less than `hash`; its server index is returned.
      uint32_t num= ptr->ketama.continuum_points_counter;
      WATCHPOINT_ASSERT(ptr->ketama.continuum);

      memcached_continuum_item_st *begin, *end, *left, *right, *middle;
      begin= left= ptr->ketama.continuum;
      end= right= ptr->ketama.continuum + num;  // right is one past the last element, which simplifies the boundary handling
      // Think of the one-past-the-end position as holding +infinity; the loop invariant
      // is then that the value at `right` is always >= the key's hash.
      while (left < right)
      {
        middle= left + (right - left) / 2;
        if (middle->value < hash)
          left= middle + 1;
        else
          right= middle;
      }
      if (right == end) // the key's hash is larger than every point: wrap around to begin
        right= begin;
      return right->index;
    }
           

五、memcached_connect

如果连接成功,则调用memcached_mark_server_as_clean()使得server->server_failure_counter= 0,server->next_retry= 0;之后的运行与之前介绍的几篇一致。这里,我们终止掉程序将要连接的服务器,连接当然会出现失败,这样memcached_connect()会按以下执行:

// Failure path of memcached_connect() (excerpt): remember the failing server,
// make sure an error is recorded on it, and put it into timeout.
set_last_disconnected_host(server); // sets server->root->last_disconnected_server (per the article)
  if (memcached_has_current_error(*server))
  {
    // An error is already recorded on the server: only mark it for timeout.
    memcached_mark_server_for_timeout(server);
    assert(memcached_failed(memcached_server_error_return(server)));
  }
  else
  {
    // No error recorded yet: record rc first, then mark the server for timeout.
    memcached_set_error(*server, rc, MEMCACHED_AT);
    memcached_mark_server_for_timeout(server);
  }
           
/*
  memcached_mark_server_for_timeout()

  Puts `server` into MEMCACHED_SERVER_STATE_IN_TIMEOUT, schedules its next
  retry, counts the failure (at most once per query id) and records it as the
  last disconnected host. A server that is already in timeout is left untouched.
*/
static inline void memcached_mark_server_for_timeout(memcached_server_write_instance_st server)
{
  // Already timing out: keep the existing retry schedule and counters.
  if (server->state == MEMCACHED_SERVER_STATE_IN_TIMEOUT)
  {
    return;
  }

  struct timeval current;
  if (gettimeofday(&current, NULL) != 0)
  {
    // No usable clock: a retry time of 1 causes the timeout to occur immediately.
    server->next_retry= 1;
  }
  else
  {
    // retry_timeout is 2 by default according to the article (not visible in this excerpt).
    server->next_retry= current.tv_sec +server->root->retry_timeout;
  }

  server->state= MEMCACHED_SERVER_STATE_IN_TIMEOUT;  // enter the timeout state

  // Count the failure only once for the current query id.
  if (server->server_failure_counter_query_id != server->root->query_id)
  {
    server->server_failure_counter++;
    server->server_failure_counter_query_id= server->root->query_id;
  }

  set_last_disconnected_host(server);
}
           

六、backoff_handling

在memcached_connect()中,一开始就调用了此函数,用以检查准备连接的那个server的出错状况,并相应处理。这是一个关键性的函数。

/*
  backoff_handling()

  Based on time/failure count fail the connect without trying. This prevents waiting in a state where
  we get caught spending cycles just waiting.

  `in_timeout` is set to true only when the server was in the timeout state and
  its retry time has passed (so a new attempt is allowed); it is not written on
  any other path.

  Returns MEMCACHED_SUCCESS when a connection attempt may proceed; otherwise the
  result of memcached_set_error() for MEMCACHED_SERVER_MARKED_DEAD,
  MEMCACHED_SERVER_TEMPORARILY_DISABLED, or the run_distribution() failure code.
*/
static memcached_return_t backoff_handling(memcached_server_write_instance_st server, bool& in_timeout)
{
  struct timeval curr_time;
  bool _gettime_success= (gettimeofday(&curr_time, NULL) == 0);

  /* 
    If we hit server_failure_limit then something is completely wrong about the server.

    1) If autoeject is enabled we do that.
    2) If not? We go into timeout again, there is much else to do :(
  */
  if (server->server_failure_counter >= server->root->server_failure_limit)
  {  // Repeated attempts have all failed, so the server really has a problem and should be removed.
    /*
      We just auto_eject if we hit this point 
    */
    if (_is_auto_eject_host(server->root))  // the walkthrough assumes auto-ejecting failed servers is enabled, so execution continues here
    {
      set_last_disconnected_host(server);

      // Retry dead servers if requested
      if (_gettime_success and server->root->dead_timeout > 0) // if dead_timeout is set it should be fairly large, so the server is re-checked only after a longer wait; it is 0 in the article's example
      {
        server->next_retry= curr_time.tv_sec +server->root->dead_timeout;

        // We only retry dead servers once before assuming failure again
        server->server_failure_counter= server->root->server_failure_limit -1;
      }

      memcached_return_t rc;
      if (memcached_failed(rc= run_distribution((memcached_st *)server->root)))  // re-run the distribution now that this server is considered dead
      {
        return memcached_set_error(*server, rc, MEMCACHED_AT, memcached_literal_param("Backoff handling failed during run_distribution"));
      }

      return memcached_set_error(*server, MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_AT);
    }

    // Auto-eject is disabled: keep the server in timeout instead.
    server->state= MEMCACHED_SERVER_STATE_IN_TIMEOUT;

    // Sanity check/setting
    if (server->next_retry == 0)
    {
      server->next_retry= 1;
    }
  }

  if (server->state == MEMCACHED_SERVER_STATE_IN_TIMEOUT) // a server enters this state when the previous connection attempt failed (see memcached_mark_server_for_timeout() called from memcached_connect()),
  {                                     // which also sets server->next_retry.
    /*
      If next_retry is less then our current time, then we reset and try everything again.
    */
    if (_gettime_success and server->next_retry < curr_time.tv_sec)// the retry time has passed, so the server may have recovered: switch to MEMCACHED_SERVER_STATE_NEW and allow another connection attempt
    {
      server->state= MEMCACHED_SERVER_STATE_NEW;
    }
    else
    {
      // Still inside the timeout window (or the clock could not be read): fail without trying.
      return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
    }

    in_timeout= true;
  }

  return MEMCACHED_SUCCESS;
}
           

七、总结

经过跟踪,无疑run_distribution()是一致性哈希最核心的函数。它在哪些地方被调用呢?

1、加入server时,memcached_server_push()或server_add()用于添加server,它会调用run_distribution()理所当然。

2、当server失效时(判定失效是根据连接失败次数有没有超过limit来决定的,失效后彻底抛弃server还是过较长的一段时间重试,取决于server->root->dead_timeout失效节点重试间隔),这可以参考上一节的backoff_handling()。不过是否调用run_distribution(),还取决于_is_auto_eject_host(server->root),即ptr->flags.auto_eject_hosts(默认为false)。

3、memcached_generate_hash_with_redistribution()用于根据key产生serverID,其中调用_regen_for_auto_eject(ptr),该函数检测有没有失效的服务器刚刚可能重启了,如果是,则调用run_distribution()。

4、其他地方,如memcached_behavior_set_distribution()设置分布式算法,memcached_clone()也会调用。

下一篇: memcached_get

继续阅读