天天看點

常見的幾種性能優化方案

作者:騰訊技術工程

作者:jiangtzhang,騰訊WXG背景開發工程師

| 導語 性能優化,長路漫漫啊。

在過去的一段時間做了些性能優化的事情,在這裡記錄一下遇到的一些性能優化case,也請各位大佬多多指教。

1. 使用Class代替ProtoBuf協定

在大量調用的API代碼中盡量的不要用ProtoBuf協定,最好使用C++的Class來代替。因為ProtoBuf采用的是Arena記憶體配置設定器政策,有些場景會比C++的Class記憶體管理複雜,當有大量記憶體配置設定和釋放的時候會比Class的性能差很多。而且Protobuf會不斷的配置設定和回收小記憶體對象,持續的配置設定和删除小記憶體對象導緻産生記憶體碎片,降低程式的記憶體使用率,尤其是當協定中包含string類型的時候,性能差距可能有幾倍。對于包含了很多小對象的protobuf message,析構過程會導緻堆上配置設定了許多對象,進而會變得複雜,導緻析構函數執行速度變慢。

下面給出一個實際開發中使用Class代替Protobuf的例子:

ProtoBuf協定:

message Param {
    optional string name = 1;
    optional string value = 2;
}
message ParamHit {
    enum Type {
        Unknown = 0;
        WhiteList = 1;
        LaunchLayer = 2;
        BaseAB = 3;
        DefaultParam = 4;
    }

    optional Param param = 1;
    optional uint64 group_id = 2;
    optional uint64 expt_id = 3;
    optional uint64 launch_layer_id = 4;
    optional string hash_key_used = 5; 
    optional string hash_key_val_used = 6;
    optional Type type = 7;
    optional bool is_hit_mbox = 8;
}           

改寫的Class

class ParamHitInfo {
public:
class Param {
public:
    Param() = default;
    ~Param() = default;
    
    const std::string & name() const {
        return name_;
    }
    void set_name(const std::string &name) {
        name_ = name;
    }
    void clear_name() {
        name_.clear();
    }
    const std::string & value() const {
        return value_;
    }
    void set_value(const std::string &value) {
        value_ = value;
    }
    void clear_value() {
        value_.clear();
    }
    void Clear() {
        clear_name();
        clear_value();
    }
private:
    std::string name_, value_;
};
    ParamHitInfo() {
        expt_id_ = group_id_ = launch_layer_id_ = 0u;
        is_hit_mbox_ = false;
        type_ = ParamHit::Unknown;
    }
    ~ParamHitInfo() = default;

    void Clear() {
        clear_group_id();
        clear_expt_id();
        clear_launch_layer_id();
        clear_is_hit_mbox();
        clear_hash_key_used();
        clear_hash_key_val_used();
        clear_type();
        param_.Clear();
    }

    const ParamHit ToProtobuf() const { 
        ParamHit ans;
        ans.set_expt_id(expt_id_);
        ans.set_group_id(group_id_);
        ans.set_launch_layer_id(launch_layer_id_);
        ans.set_is_hit_mbox(is_hit_mbox_);
        ans.set_hash_key_used(hash_key_used_);
        ans.set_hash_key_val_used(hash_key_val_used_);
        ans.set_type(type_);
        ans.mutable_param()->set_name(param_.name());
        ans.mutable_param()->set_value(param_.value());
        return ans; 
    }
    uint64_t group_id() const {
        return group_id_;
    }
    void set_group_id(const uint64_t group_id) {
        group_id_ = group_id;
    }
    void clear_group_id() {
        group_id_ = 0u;
    }
    uint64_t expt_id() const {
        return expt_id_;
    }
    void set_expt_id(const uint64_t expt_id) {
        expt_id_ = expt_id;
    }
    void clear_expt_id() {
        expt_id_ = 0u;
    }

    uint64_t launch_layer_id() const {
        return launch_layer_id_;
    }
    void set_launch_layer_id(const uint64_t launch_layer_id) {
        launch_layer_id_ = launch_layer_id;
    }
    void clear_launch_layer_id() {
        launch_layer_id_ = 0u;
    }

    bool is_hit_mbox() const {
        return is_hit_mbox_;
    }
    void set_is_hit_mbox(const bool is_hit_mbox) {
        is_hit_mbox_ = is_hit_mbox;
    }
    void clear_is_hit_mbox() {
        is_hit_mbox_ = false;
    }

    const std::string & hash_key_used() const {
        return hash_key_used_;
    }
    void set_hash_key_used(const std::string &hash_key_used) {
        hash_key_used_ = hash_key_used;
    }
    void clear_hash_key_used() {
        hash_key_used_.clear();
    }

    const std::string & hash_key_val_used() const {
        return hash_key_val_used_;
    }
    void set_hash_key_val_used(const std::string &hash_key_val_used) {
        hash_key_val_used_ = hash_key_val_used;
    }
    void clear_hash_key_val_used() {
        hash_key_val_used_.clear();
    }

    ParamHit_Type type() const {
        return type_;
    }
    void set_type(const ParamHit_Type type) {
        type_ = type;
    }
    void clear_type() {
        type_ = ParamHit::Unknown;
    }

    const Param & param() const {
        return param_;
    }
    Param * mutable_param() {
        return ¶m_;
    }
    std::string ShortDebugString() const {
        std::string ans = "type: " + std::to_string(type_);
        ans.append(", group_id: ").append(std::to_string(group_id_));
        ans.append(", expt_id: ").append(std::to_string(expt_id_));
        ans.append(", launch_layer_id: ").append(std::to_string(launch_layer_id_));
        ans.append(", hash_key_used: ").append(hash_key_used_);
        ans.append(", hash_key_val_used: ").append(hash_key_val_used_);
        ans.append(", param_name: ").append(param_.name());
        ans.append(", param_val: ").append(param_.value());
        ans.append(", is_hit_mbox: ").append(std::to_string(is_hit_mbox_));
        return ans;
    }
    int ByteSize() {
        int ans = 0;
        ans += sizeof(uint64_t) * 3 + sizeof(bool) + sizeof(ParamHit_Type);
        ans += hash_key_used_.size() + hash_key_val_used_.size() + param_.name().size() + param_.value().size();
        return ans;
    }
private:
    ParamHit_Type type_;
    uint64_t group_id_, expt_id_, launch_layer_id_;
    std::string hash_key_used_, hash_key_val_used_;
    bool is_hit_mbox_;
    Param param_;
};           

性能測試代碼:

TEST(ParamHitDestructorPerf, test) {
    vector<ParamHit> hits;
    vector<ParamHitInfo> hit_infos;
    const int hit_cnts = 1000;
    vector<pair<string, string>> params;
    for (int i=0; i<hit_cnts; ++i) {
        string name = "name: " + to_string(i);
        string val; 
        int n = 200;
        val.resize(n);
        for (int i=0; i<n; ++i) val[i] = (i%10 + 'a');
        params.push_back(make_pair(name, val));
    }
    int uin_start = 12345645;
    for (int i=0; i<hit_cnts; ++i) {
        ParamHit hit;
        hit.set_expt_id(i + uin_start);
        hit.set_group_id(i + 1 + uin_start);
        hit.set_type(ParamHit::BaseAB);
        hit.set_is_hit_mbox(false);
        hit.set_hash_key_used("uin_bytes");
        hit.set_hash_key_val_used(BusinessUtil::UInt64ToLittleEndianBytes(i));
        auto p = hit.mutable_param();
        p->set_name(params[i].first);
        p->set_value(params[i].second);
        hits.emplace_back(std::move(hit));
    }
    for (int i=0; i<hit_cnts; ++i) {
        ParamHitInfo hit;
        hit.set_expt_id(i + uin_start);
        hit.set_group_id(i + 1 + uin_start);
        hit.set_type(ParamHit::BaseAB);
        hit.set_is_hit_mbox(false);
        hit.set_hash_key_used("uin_bytes");
        hit.set_hash_key_val_used(BusinessUtil::UInt64ToLittleEndianBytes(i));
        auto p = hit.mutable_param();
        p->set_name(params[i].first);
        p->set_value(params[i].second);
        hit_infos.emplace_back(std::move(hit));
    }   
    int kRuns = 1000;
    chrono::high_resolution_clock::time_point t1 = chrono::high_resolution_clock::now();
    {
        for (int i=0; i<kRuns; ++i) {
            for (auto &&hit: hits) {
                auto tmp = hit;
            }
        }
    }
    chrono::high_resolution_clock::time_point t2 = chrono::high_resolution_clock::now();
    auto time_span = chrono::duration_cast<chrono::milliseconds>(t2 - t1);
    std::cerr << "ParamHit_PB Destructor kRuns: " << kRuns << " hit_cnts: " << hit_cnts << " cost: " << time_span.count() << "ms\n";
    
    t1 = chrono::high_resolution_clock::now();
    {
        for (int i=0; i<kRuns; ++i) {
            for (auto &&hit: hit_infos) {
                auto tmp = hit;
            }
        }
    }
    t2 = chrono::high_resolution_clock::now();
    time_span = chrono::duration_cast<chrono::milliseconds>(t2 - t1);
    std::cerr << "ParamHitInfo_Class Destructor kRuns: " << kRuns << " hit_cnts: " << hit_cnts << " cost: " << time_span.count() << "ms\n";
}           

性能對比結果:

常見的幾種性能優化方案

可以看到使用c++的Class相比于ProtoBuf可以提升3倍的性能 。

2. 使用Cache Friendly的資料結構

這裡想先抛出一個問題:使用哈希表的查找一定比使用數組的查找快嗎?

Q:理論上來說哈希表的查找複雜度是O(1),數組的查找複雜度是O(n),那麼是不是可以得到一個結論就是說哈希表的查找速度一定比數組快呢?

A:其實是不一定的,由于數組具有較高的緩存局部性,可提高CPU緩存的命中率,是以在有些場景下數組的查找效率要遠遠高于哈希表。

這裡給出一個常見操作耗時的資料(2020年):

常見的幾種性能優化方案

下面也給出一個項目中的使用Cache Friendly優化的例子:

優化前的資料結構:

class HitContext {
public:
    inline void update_hash_key(const std::string &key, const std::string &val) {
        hash_keys_[key] = val;
    }

    inline const std::string * search_hash_key(const std::string &key) const {
        auto it = hash_keys_.find(key);
        return it != hash_keys_.end() ? &(it->second) : nullptr;
    }

private:
    Context context_;
    std::unordered_map<std::string, std::string> hash_keys_;
};           

優化後的資料結構:

class HitContext {
public:
    inline void update_hash_key(const std::string &key, const std::string &val) {
        if (Misc::IsSnsHashKey(key)) {
            auto sns_id = Misc::FastAtoi(key.c_str()+Misc::SnsHashKeyPrefix().size());
            sns_hash_keys_.emplace_back(sns_id,  Misc::LittleEndianBytesToUInt32(val));
            return;
        }
        hash_keys_[key] = val;
    }

    inline void update_hash_key(const std::string &key, const uint32_t val) {
        if (Misc::IsSnsHashKey(key)) {
            auto sns_id = Misc::FastAtoi(key.c_str()+Misc::SnsHashKeyPrefix().size());
            sns_hash_keys_.emplace_back(sns_id, val);
            return;
        }
        hash_keys_[key] = Misc::UInt32ToLittleEndianBytes(val);
    }

    inline const std::string search_hash_key(const std::string &key, bool &find) const {
        if (Misc::IsSnsHashKey(key)) {
            auto sns_id = Misc::FastAtoi(key.c_str()+Misc::SnsHashKeyPrefix().size());
            auto it = std::find_if(sns_hash_keys_.rbegin(), sns_hash_keys_.rend(), [sns_id](const std::pair<uint32_t, uint32_t> &v) { return v.first == sns_id; });
            find = it != sns_hash_keys_.rend();
            return find ? Misc::UInt32ToLittleEndianBytes(it->second) : "";
        }
        auto it = hash_keys_.find(key);
        find = it != hash_keys_.end();
        return find ? it->second : "";
    }

private:
    Context context_;
    std::unordered_map<std::string, std::string> hash_keys_;
    std::vector<std::pair<uint32_t, uint32_t>> sns_hash_keys_;
};           

性能測試代碼

TEST(HitContext, test) {
    const int keycnt = 264;
    std::vector<std::string> keys, vals;
    for (int j = 0; j < keycnt; ++j)  {
        auto key = j+21324;
        auto val = j+94512454;
        keys.push_back("sns"+std::to_string(key));
        vals.push_back(std::to_string(val));
    }
    const int kRuns = 1000;
    std::unordered_map<uint32_t, uint64_t> hash_keys;
    chrono::high_resolution_clock::time_point t1 = chrono::high_resolution_clock::now();
    for (int i = 0; i < kRuns; ++i) {
        HitContext1 ctx;
        for (int j = 0; j < keycnt; ++j) {
           ctx.update_hash_key(keys[j], vals[j]);
        }
        for (int j=0; j<keycnt; ++j) {
            auto val = ctx.search_hash_key(keys[j]);
            if (!val) assert(0);
        }
    }
    chrono::high_resolution_clock::time_point t2 = chrono::high_resolution_clock::now();
    auto time_span = chrono::duration_cast<chrono::microseconds>(t2 - t1);
    std::cerr << "HashTable Hitcontext cost: " << time_span.count() << "us" << std::endl;
    hash_keys.clear();
    t1 = chrono::high_resolution_clock::now();
    for (int i = 0; i < kRuns; ++i) {
        HitContext2 ctx;
        for (int j = 0; j < keycnt; ++j) {
           ctx.update_hash_key(keys[j], vals[j]);
        }
        for (int j=0; j<keycnt; ++j) {
            bool find = false;
            auto val = ctx.search_hash_key(keys[j], find);
            if (!find) assert(0);
        }
    }
    t2 = chrono::high_resolution_clock::now();
    time_span = chrono::duration_cast<chrono::microseconds>(t2 - t1);
    std::cerr << "Vector HitContext cost: " << time_span.count() << "us" << std::endl;
}           

性能對比結果:

常見的幾種性能優化方案

3. 使用jemalloc/tcmalloc代替普通的malloc方式

由于代碼中大量的使用了C++的STL,是以會出現以下幾種缺點:

(1). 記憶體碎片:頻繁配置設定和釋放不同大小的對象,可能導緻記憶體碎片,降低記憶體的使用效率。

(2). Cache不友好。而且STL的普通記憶體配置設定器分散了對象的記憶體位址,降低了資料的緩存命中率

(3). 并發差。STL的預設記憶體配置設定器可能使用全局鎖,相當于給加了一把大鎖,在多線程環境下性能表現很差。

目前在我們的代碼中加jemalloc還是很友善的,就是在所編譯的target中加下依賴就好了,比如:

常見的幾種性能優化方案

使用jemalloc與不使用jemalloc前後性能對比(這裡的測試場景是在loadbusiness的時候,具體涉及到了一些業務代碼)

常見的幾種性能優化方案

可以發現使用jemalloc可以提升20%多的性能,還是優化了很大的,很小的開發成本(隻需要加一個編譯依賴)帶來不錯的收益。

4. 使用無鎖資料結構

在過去項目開發的時候使用過一種雙buffer的無鎖資料結構,之是以使用雙buffer是因為API有大約26億/s的調用量,這麼高的調用量對性能的要求是很高的。資料結構的定義:

struct expt_api_new_shm {
  void *p_shm_data;
  // header
  volatile int *p_mem_switch; // 0:uninit. 1:mem 1 on server. 2:mem 2 on server
  uint32_t *p_crc_sum;
  // data
  expt_new_context* p_new_context;
  parameter2business* p_param2business;
  char* p_business_cache;
  HashTableWithCache hash_table; //多級哈希表
};           

常用的幾個函數:

int InitExptNewShmData(expt_api_new_shm *pstShmData, void *pData) {
  int ptr_offset = EXPT_NEW_SHM_HEADER_SIZE;
  pstShmData->p_shm_data = pData;
  pstShmData->p_mem_switch = MAKE_PTR(volatile int *, pData, 0);
  pstShmData->p_crc_sum = MAKE_PTR(uint32_t *, pData, 4);
  pstShmData->p_new_context =
      (expt_new_context *)((uint8_t *)pstShmData->p_shm_data + ptr_offset);
  pstShmData->p_param2business =
      (parameter2business *)((uint8_t *)pstShmData->p_shm_data + ptr_offset +
                             EXPT_NEW_SHM_OFFSET_0);
  pstShmData->p_business_cache =
      (char *)((uint8_t *)pstShmData->p_shm_data + ptr_offset +
                            EXPT_NEW_SHM_OFFSET_1);
  
  size_t node_size = sizeof(APICacheItem),  row_cnt = sizeof(auModsInCache)/sizeof(size_t);
  size_t hash_tbl_size = CalHashTableWithCacheSize(node_size, row_cnt, auModsInCache);
  pstShmData->hash_table.pTable = (void *)((uint8_t *)pstShmData->p_shm_data + EXPT_NEW_SHM_SIZE - hash_tbl_size);
  int ret = HashTableWithCacheInit(&pstShmData->hash_table, hash_tbl_size, node_size, row_cnt, auModsInCache);
  return ret;
}
int ResetExptNewShmData(expt_api_new_shm *pstShmData) {
  int iOffset = 0;

  if (*pstShmData->p_mem_switch <= 1) {
    iOffset = 0;
  } else if (*pstShmData->p_mem_switch > 1) {
    iOffset = EXPT_NEW_SHM_DATA_SIZE;
  }

  void *ptrData = MAKE_PTR(void *, pstShmData->p_shm_data,
                           EXPT_NEW_SHM_HEADER_SIZE + iOffset);
  memset(ptrData, 0, EXPT_NEW_SHM_DATA_SIZE);
  return 0;
}
int ResetExptNewShmHeader(expt_api_new_shm *pstShmData) {
  memset(pstShmData->p_shm_data, 0, EXPT_NEW_SHM_HEADER_SIZE);
  return 0;
}
void SwitchNewShmMemToWrite(expt_api_new_shm *pstShmData) {
  int iSwitchOffset =
      EXPT_NEW_SHM_DATA_SIZE * ((*pstShmData->p_mem_switch <= 1 ? 0 : 1));
  int ptr_offset = EXPT_NEW_SHM_HEADER_SIZE + iSwitchOffset;

  pstShmData->p_new_context =
      (expt_new_context *)((uint8_t *)pstShmData->p_shm_data + ptr_offset);
  pstShmData->p_param2business =
      (parameter2business *)((uint8_t *)pstShmData->p_shm_data + ptr_offset +
                             EXPT_NEW_SHM_OFFSET_0);
  pstShmData->p_business_cache =
      (char *)((uint8_t *)pstShmData->p_shm_data + ptr_offset +
                            EXPT_NEW_SHM_OFFSET_1);
}
void SwitchNewShmMemToWriteDone(expt_api_new_shm *pstShmData) {
  if (*pstShmData->p_mem_switch <= 1)
    *pstShmData->p_mem_switch = 2;
  else
    *pstShmData->p_mem_switch = 1;
}
void SwitchNewShmMemToRead(expt_api_new_shm *pstShmData) {
  int iSwitchOffset =
      EXPT_NEW_SHM_DATA_SIZE * ((*pstShmData->p_mem_switch <= 1 ? 1 : 0));
  int ptr_offset = EXPT_NEW_SHM_HEADER_SIZE + iSwitchOffset;
  pstShmData->p_new_context =
      (expt_new_context *)((uint8_t *)pstShmData->p_shm_data + ptr_offset);
  pstShmData->p_param2business =
      (parameter2business *)((uint8_t *)pstShmData->p_shm_data + ptr_offset +
                             EXPT_NEW_SHM_OFFSET_0);
  pstShmData->p_business_cache =
      (char *)((uint8_t *)pstShmData->p_shm_data + ptr_offset +
                            EXPT_NEW_SHM_OFFSET_1);
}           

雙buffer的工作原理就是:設定兩個buffer,一個用于讀,另一個用于寫。

(a)初始化這兩個buffer為空,調用InitExptNewShmData函數。

(b)對于寫操作,先準備資料,即調用SwitchNewShmMemToWrite函數,等資料準備完(即寫完相應的資料),然後調用SwitchNewShmMemToWriteDone函數,完成指針的切換。

(c)對于讀操作,線程從讀buffer讀取資料,調用SwitchNewShmMemToRead函數。

我們平台的場景主要是讀,而且由于拉取實驗配置采用的都是增量的拉取方式,是以配置的改變也不是很頻繁,也就很少有寫操作的出現。采用雙buffer無鎖資料結構的優勢在于可以提高并發性能,由于讀寫操作在不同的buffer上同時進行,是以不需要額外加鎖,減少了資料競争和鎖沖突的可能性。當然這種資料結構也有相應的缺點,就是會多用了一倍的記憶體,用空間換時間。

5.對于特定的場景采用特定的處理方式

這其實也很容易了解其實有很多場景是需要定制化優化的,是以不能從主體代碼的層面去優化了,那換個思路,是不是可以從傳回的資料格式進行優化呢?舉個我們過去遇到的一個例子:我們平台有一個染色場景,就是需要對當天登入的所有微信使用者計算命中情況,舊的資料格式其實傳回了一堆本身染色場景不需要的字段,是以這裡其實是可以優化的。

優化前的資料格式:

struct expt_param_item {
    int experiment_id;
    int expt_group_id;
    int layer_id;
    int domain_id;
    uint32_t seq;
    uint32_t start_time;
    uint32_t end_time;
    uint8_t  expt_type;
    uint16_t  expt_client_expand;
    int parameter_id;
    uint8_t value[MAX_PARAMETER_VLEN];
    char param_name[MAX_PARAMETER_NLEN];
    int value_len;
    uint8_t is_pkg = 0;
    uint8_t is_white_list = 0;
    uint8_t is_launch = 0;
    uint64_t bucket_src = 0;
    uint8_t is_control = 0;
};            

其實染色場景下不需要參數的資訊,隻保留實驗ID、組ID以及分桶的資訊就好了。優化後的資料格式:

struct DyeHitInfo {
    int expt_id, group_id;
    uint64_t bucket_src;
    DyeHitInfo(){}
	DyeHitInfo(int expt_id_, int group_id_, uint64_t bucket_src_) :expt_id(expt_id_), group_id(group_id_), bucket_src(bucket_src_){}
    bool operator <(const DyeHitInfo &hit) const {
        if (expt_id == hit.expt_id) {
            if (group_id == hit.group_id) return bucket_src < hit.bucket_src;
            return group_id < hit.group_id;
        }
		return expt_id < hit.expt_id;
	}
    bool operator==(const DyeHitInfo &hit) {
        return hit.expt_id == expt_id && hit.group_id == group_id && hit.bucket_src == bucket_src;
    }
    std::string ToString() const {
        char buf[1024];
        sprintf(buf, "expt_id: %u, group_id: %u, bucket_src: %lu", expt_id, group_id, bucket_src);
        return std::string(buf);
    }
};           

優化前後性能對比:

常見的幾種性能優化方案

是以其實針對某些特殊場景做一些定制化的開發成本也沒有很高,但是帶來的收益卻是巨大的。

6.善用性能測試工具

這裡列舉一些常見的性能測試工具:linux提供的perf、GNU編譯器提供的gprof、Valgrind、strace等等。

這裡推薦幾個覺得好用的工具:

perf(linux自帶性能測試工具)

https://godbolt.org/(可以檢視代碼對應的彙編代碼)

微信運維提供的性能測試工具

https://github.com/brendangregg/FlameGraph (生成火焰圖的工具)

7.總結

其實還有一些性能優化的地方,比如使用合适的資料結構和算法、減少大對象的拷貝,減少無效的計算,IO與計算分離,分支預測等等,後續如果有時間的話可以在補充點内容。性能優化不是一錘子買賣,是以需要一直監控,一直優化。需要注意的一點是不要過度優化,在提升程式性能的時候不要丢掉代碼的可維護性,而且還要評估下性能提升帶來的收益是否與花費的時間成正比。總之,性能優化,長路漫漫。

在這裡要感謝下團隊小夥伴的幫助。

最後請各位大佬多多指教。

8.參考

(1). https://colin-scott.github.io/personal_website/research/interactive_latency.html

(2). https://zhuanlan.zhihu.com/p/484951383

(3). https://www.brendangregg.com/perf.html

(4). https://jemalloc.net/

繼續閱讀