天天看点

mysql8.0 hash join源码分析hash_join_chunkhash_join_bufferhash_join_iterator

mysql8.0中关于hash join的文件有以下几个,一个个来分析。

mysql8.0 hash join源码分析hash_join_chunkhash_join_bufferhash_join_iterator

hash_join_chunk

HashJoinChunk是用来存储行并落盘的类型,表分区成小文件时也可以用。

将列写入 HashJoinChunk 时,我们使用 StoreFromTableBuffers 将必要的列转换为适合存储在磁盘上的格式。

方便的是,StoreFromTableBuffers 创建了一个连续的字节范围和相应的长度,可以轻松有效地将其写到文件中。从文件中读回行时,LoadIntoTableBuffers() 用于将行放回表记录缓冲区。

具体用法如下:

HashJoinChunk chunk;

// 初始化chunk来保存给定表的数据,匹配标志设置为false
// 第一个参数用来指定存储的哪张表的数据,第二个参数用来指定每行需不需要有匹配标志
chunk.Init(tables, /*uses_match_flags=*/false);

// 表和chunk之间复制数据用的缓冲区
String buffer;

while (iterator->Read() == 0) {
     // 将buffer中记录的数据转到chunk中
     // 如果这个chunk被初始化为匹配标志为true,我们会在行前加一个匹配标志
     chunk.WriteRowToChunk(&buffer, /*matched=*/false);
};

// 准备读取chunk中的第一行数据
chunk.Rewind();

bool match_flag;
// 将chunk中的数据导到buffer中
// 如果chunk被初始化为使用匹配标志,则行读取的匹配标志将存储在“match_flag”中。
chunk.LoadRowFromChunk(&buffer, &match_flag);
           

HashJoinChunk拥有几个接口:

// 初始化接口
bool Init(const pack_rows::TableCollection &tables, bool uses_match_flags);

// 返回chunk中存储的行数
ha_rows num_rows() const { return m_num_rows; }

// 将row写入chunk
bool WriteRowToChunk(String *buffer, bool matched);

// 从chunk中读入row
bool LoadRowFromChunk(String *buffer, bool *matched);

//刷新文件缓冲区,并准备读取文件。
bool Rewind();
           

看了一下WriteRowChunk的源码,思路还是很清楚的:

mysql8.0 hash join源码分析hash_join_chunkhash_join_bufferhash_join_iterator

hash_join_buffer

HashJoinBuffer是用来存储一批行数据的。

行数据会存储在哈希表中,HashJoinBuffer会维护自己的MEM_ROOT,所有数据都会分配在这里。

考虑下面的sql:

SELECT t1.data FROM t1 JOIN t2 ON (t1.key = t2.key);

假设表“t2”存储在 HashJoinBuffer 中。这种情况下,t2.key就将作为哈希表键值,如果on条件中有多个等式,那么这些等式将一起组合为哈希表的键值。

使用函数接口 StoreFromTableBuffers来存储行数据。

HashJoinBuffer能存储的数据量由参数join_buffer_size来限制,当然,为了检查数据量是不是超过join_buffer_size,可能会需要一些更多的内存空间。

哈希键值是由join的on条件来决定的,比如 (t1.col1 = t2.col1 AND t1.col2 = t2.col2),那么哈希键值就是(col1, col2),实际上转成什么类型也是由join条件决定的,比如有字符串和整数,那么整数就会被转为字符串存储。所以应该使用共有的结构来比较,这里就直接一个个字符(byte-by-byte)来哈希和比较,存储的时候也是如此。

对于类似UPPER(t1.col1) = UPPER(t2.col1)这种on条件,会把UPPER(t1.col1)作为哈希键值

这里的Key类型的结构体定义如下,很简单:

class Key {
 public:
  /// Note that data can be nullptr if size is 0.
  Key(const uchar *data, size_t size) : m_data(data), m_size(size) {}

  bool operator==(const Key &other) const {
    if (other.size() != size()) {
      return false;
    } else if (size() == 0) {
      return true;
    }

    return memcmp(other.data(), data(), size()) == 0;
  }

  const uchar *data() const { return m_data; }

  size_t size() const { return m_size; }

 private:
  const uchar *m_data;
  const size_t m_size;
};
           

主体的HashJoinRowBuffer类,主要包含下面的成员变量:

// 存储的join条件 
const std::vector<HashJoinCondition> m_join_conditions;

// 一行数据可以由来自不同表的部分组成。 这个结构告诉我们涉及哪些表。
const pack_rows::TableCollection m_tables;

// 所有的哈希键值都会存储在MEM_ROOT,分配在堆上。
MEM_ROOT m_mem_root;

// 这个MEM_ROOT结构仅用于存储插入哈希表的最后一行数据,目的是为了保证不会溢出内存
MEM_ROOT m_overflow_mem_root;

// 存储行值的哈希表
std::unique_ptr<hash_map_type> m_hash_map;

// 通过join条件构建key时用的缓冲区
String m_buffer;
size_t m_row_size_upper_bound;

// The maximum size of the buffer, given in bytes.
const size_t m_max_mem_available;

// 存储在哈希表中的最后一行,如果哈希表为空,则为 nullptr。
LinkedImmutableString m_last_row_stored{nullptr};

// 获取相关列,组成一行数据后,作为LinkedImmutableString类型存储到哈希表中
LinkedImmutableString StoreLinkedImmutableStringFromTableBuffers(
      LinkedImmutableString next_ptr, bool *full);
           

主体的HashJoinRowBuffer类,接口如下:

// 初始化一个HashJoinRowBuffer,代表已经准备好存储行数据
// 这个接口可以调用很多次,每调用一次会清除现有的行数据
bool Init();

// 用来存储行数据
// THD:处理线程
// reject_duplicate_keys:如果为 true,则拒绝具有重复键的行。被拒绝后,仍将返回 ROW_STORED
// store_rows_with_null_in_condition:是否存储NULL行
// 返回值类型:
// ROW_STORED: 行已经被存储了
// BUFFER_FULL:行被存储了,但是缓冲区已经满了
// FATAL_ERROR:错误
StoreRowResult StoreRow(THD *thd, bool reject_duplicate_keys,
                          bool store_rows_with_null_in_condition);

// 返回哈希表的状态
size_t size() const { return m_hash_map->size(); }

bool empty() const { return m_hash_map->empty(); }

bool inited() const { return m_hash_map != nullptr; }

// 返回查找哈希键值key的迭代器
hash_map_iterator find(const Key &key) const { return m_hash_map->find(key); }

hash_map_iterator begin() const { return m_hash_map->begin(); }

hash_map_iterator end() const { return m_hash_map->end(); }

// 获取最后一行插入到哈希表中的行
LinkedImmutableString LastRowStored() const {
    assert(Initialized());
    return m_last_row_stored;
}

// 和inited()作用差不多
bool Initialized() const { return m_hash_map.get() != nullptr; }

// 查看是否包含某个key值
bool contains(const Key &key) const { return find(key) != end(); }
           

hash_join_iterator

这个文件里介绍了一下目前hash join的工作原理。

未超过内存限制的 hash join

join操作可以看做有两个输入,一个是 build input,一个是probe input。从名字可以看出来,build input是需要用来构建哈希表的,而probe input就是用来一行一行读取并和哈希表匹配的。

所以hash join的流程如下:

  1. 将两个input,指定其中一个为build input,指定另外一个为probe input。通常都会指定表大小比较小的表作为build input(而不是指定行数比较少的表,这里应该是为了内存中能存储下)
  2. 将build input的表全部读取到内存中构建哈希表,哈希键值是根据on条件来计算的,比如SELECT * FROM lineitem

    INNER JOIN orders ON orders.o_orderkey = lineitem.l_orderkey;

    其中“orders”表作为build input,那么哈希键值就根据o_orderkey来做,当然优化器也会识别join中的隐式条件,比如,

    SELECT * FROM orders, lineitem

    WHERE orders.o_orderkey = lineitem.l_orderkey;

    这个也会识别出o_orderkey。

  3. 每行每行的读取probe input中的数据,对于每一行来计算对应的哈希键值,上面的例子是l_orderkey,采用相同的哈希计算函数来计算哈希值,并从哈希表中查找到对应值。关于行数据是如何存储和恢复的细节,可以看pack_rows::StoreFromTableBuffers

当然,内存有限,限制于join_buffer_size。

没有超过内存的部分仍然按照上述步骤来做。

如果超过了限制,那么就按照下面的步骤来做

超过内存限制的 hash join

  1. build input中,没有被构建成hash table的其余行,会被分成给定数量n的文件,存储在HashJoinChunk中。对于build input 和 probe input,都会开n个chunk文件,这n个chunk文件是为了存储两张表的表数据的,因为内存放不下,同样会用到哈希,不过用的是不同的哈希函数。
  2. 然后开始从probe input一行一行的读取数据,先和内存中的hashtable做匹配,也就是走一遍上面的hash join步骤。然后使用另一个哈希函数来计算哈希值,确定这行数据要存到那个chunk文件中,因为这行数据之后还要和build input的其他表做匹配。
  3. 讲过上面的步骤,因为用的是一样的哈希函数分区,可以保证build input和probe input匹配的行肯定在同一个chunk中。所以开始对每个chunk的数据进行上面经典的hash join流程

上面的介绍都是对inner join的介绍,对于semi join来说,也差不多。

semi join 的hash join

  1. 因为semi join只需要匹配一行数据就行,所以构建哈希表时不需要存储重复值。
  2. 输出所有能在哈希表中找到匹配的行。如果内存存不下,变成落盘的hash join,那么只需要把probe input未能在哈希表中匹配的行进行落盘

hash join的注意点

  • hash join只适用于等值条件,对于有其他条件的join来说,哈希匹配后,还要保证其他条件也能够匹配。
  • 如果on-disk的hash join,产生的chunk文件仍然过大,比如有一个很偏的数据集,那么肯定会有这种情况。
  • 写磁盘时会设定一个"kMaxChunks",用来表示最大chunk文件数量,这个是为了不消耗完所有文件描述符。
  • 还会有一个设定值来避免写磁盘,如果不落盘,那么就一遍遍的构建哈希表,然后一遍遍的读取probe input的数据。有的场景其实是不罗盘更好,但是mysql没有针对这个的成本优化,要不要落盘只能通过参数来控制。

HashJoinIterator类

初始化一个HashJoinIterator需要下面的参数:

// thd:处理线程
// build_input:build input迭代器
// build_input_tables:build input涉及到的表
// estimated_build_rows:build input的预估行数
// probe_input:probe input迭代器
// probe_input_tables:probe input涉及到的表
// store_rowids:bool类型,是否保存rowid,仅在清除操作时使用 
// tables_to_get_rowid_for:一个map,存储需要调用position()的表
// max_memory_available:就是join_buffer_size
// join_conditions:join条件列表
// allow_spill_to_disk:是否允许落盘
// join_type:join类型,semi或者inner
// extra_conditions:不能被构建哈希表的额外条件,且由and连接
// probe_input_batch_mode:是否在probe input启用批处理
// hash_table_generation:清除哈希表次数的计数器

HashJoinIterator(THD *thd, unique_ptr_destroy_only<RowIterator> build_input,
                   const Prealloced_array<TABLE *, 4> &build_input_tables,
                   double estimated_build_rows,
                   unique_ptr_destroy_only<RowIterator> probe_input,
                   const Prealloced_array<TABLE *, 4> &probe_input_tables,
                   bool store_rowids, table_map tables_to_get_rowid_for,
                   size_t max_memory_available,
                   const std::vector<HashJoinCondition> &join_conditions,
                   bool allow_spill_to_disk, JoinType join_type,
                   const Mem_root_array<Item *> &extra_conditions,
                   bool probe_input_batch_mode,
                   uint64_t *hash_table_generation);
           

HashJoinIterator类的接口如下:

// 使用build input的数据构建哈希表,内存存不下,剩余数据就落盘
bool BuildHashTable();

// 从下一个chunk文件中读取数据到内存中构建哈希表
bool ReadNextHashJoinChunk();

// 从probe input读取一行数据,如果当前已经开始落盘操作,那么读取到的数据会落盘
// 读取完,迭代器的状态会有如下改变:
// READING_FIRST_ROW_FROM_HASH_TABLE:这行数据已经读取完放在内存里了
// LOADING_NEXT_CHUNK_PAIR:probe input已经没有数据可以读取了
bool ReadRowFromProbeIterator();

// 从当前的probe chunk文件中读取一行数据到内存,迭代器的返回状态和上面相同
bool ReadRowFromProbeChunkFile();

// 从probe saving 文件中读取一行数据,我本来没搞懂这个savingfile和上面的chunk file有什么区别
// 看之前的看介绍可以知道,chunkfile是和build input成对的,落盘用的,结构是HashJoinChunk
// 而savingFile是为了记录没有匹配到的数据的
// 比如semi join,是匹配到就返回的,如果哈希表被构建了多次,那么probe input也会有多次,
// 那么就会产生重复值,所以需要记录没有匹配的数据,匹配过的就不管了
bool ReadRowFromProbeRowSavingFile();

// 在哈希表中查找m_temporary_row_and_join_key_buffer的键值
void LookupProbeRowInHashTable();

// 判断是否启用了disk hash join
bool on_disk_hash_join() const { return !m_chunk_files_on_disk.empty(); }

// 如果允许落盘操作,将probe input的行写到chunkfile中
bool WriteProbeRowToDiskIfApplicable();

// 判断哈希匹配成功的数据能不能满足其他条件,都满足就返回true
bool JoinedRowPassesExtraConditions() const;

// 看能不能去掉哈希表中的重复键值
bool RejectDuplicateKeys() const

// 以下几个初始化函数顾名思义
bool InitRowBuffer();
bool InitProbeIterator();
bool InitWritingToProbeRowSavingFile();
bool InitReadingFromProbeRowSavingFile();

// 设置probe input的迭代器状态
void SetReadingProbeRowState();

// 找哈希表中下一个匹配的数据,并查看满不满足其他join条件,如果需要还会落盘
int ReadNextJoinedRowFromHashTable();
           

mysql的源码注释写的非常详细,弄懂这些接口,也就能明白mysql对于哈希join的处理了。

看mysql的源码确实非常受益,看这些大神们是怎么管理自己的代码的,看他们对于内存管理怎么做的,像join_buffer_size这种东西,实际上超没超,肯定是插入数据之后才能看出来,最起码我是这么写的,但是mysql的开发者们就很聪明的新建了个临时缓存区,判断无误后才能放入buffer中,这样就可以很便捷的管理内存了,类似这种思路,mysql中还有很多,确实能感受到设计的严谨,像我在实际写内存限制的时候,这些细节就考虑不好,读书有益,读源码也一样的。