mysql8.0中关于hash join的文件有以下几个,一个个来分析。
hash_join_chunk
HashJoinChunk是用来存储行并落盘的类型,表分区成小文件时也可以用。
将列写入 HashJoinChunk 时,我们使用 StoreFromTableBuffers 将必要的列转换为适合存储在磁盘上的格式。
方便的是,StoreFromTableBuffers 创建了一个连续的字节范围和相应的长度,可以轻松有效地将其写到文件中。从文件中读回行时,LoadIntoTableBuffers() 用于将行放回表记录缓冲区。
具体用法如下:
HashJoinChunk chunk;
// 初始化chunk来保存给定表的数据,匹配标志设置为false
// 第一个参数用来指定存储的哪张表的数据,第二个参数用来指定每行需不需要有匹配标志
chunk.Init(tables, /*uses_match_flags=*/false);
// 表和chunk之间复制数据用的缓冲区
String buffer;
while (iterator->Read() == 0) {
// 将buffer中记录的数据转到chunk中
// 如果这个chunk被初始化为匹配标志为true,我们会在行前加一个匹配标志
chunk.WriteRowToChunk(&buffer, /*matched=*/false);
};
// 准备读取chunk中的第一行数据
chunk.Rewind();
bool match_flag;
// 将chunk中的数据导到buffer中
// 如果chunk被初始化为使用匹配标志,则行读取的匹配标志将存储在“match_flag”中。
chunk.LoadRowFromChunk(&buffer, &match_flag);
HashJoinChunk拥有几个接口:
// 初始化接口
bool Init(const pack_rows::TableCollection &tables, bool uses_match_flags);
// 返回chunk中存储的行数
ha_rows num_rows() const { return m_num_rows; }
// 将row写入chunk
bool WriteRowToChunk(String *buffer, bool matched);
// 从chunk中读入row
bool LoadRowFromChunk(String *buffer, bool *matched);
//刷新文件缓冲区,并准备读取文件。
bool Rewind();
看了一下WriteRowChunk的源码,思路还是很清楚的:
hash_join_buffer
HashJoinBuffer是用来存储一批行数据的。
行数据会存储在哈希表中,HashJoinBuffer会维护自己的MEM_ROOT,所有数据都会分配在这里。
考虑下面的sql:
SELECT t1.data FROM t1 JOIN t2 ON (t1.key = t2.key);
假设表“t2”存储在 HashJoinBuffer 中。这种情况下,t2.key就将作为哈希表键值,如果on条件中有多个等式,那么这些等式将一起组合为哈希表的键值。
使用函数接口 StoreFromTableBuffers来存储行数据。
HashJoinBuffer能存储的数据量由参数join_buffer_size来限制,当然,为了检查数据量是不是超过join_buffer_size,可能会需要一些更多的内存空间。
哈希键值是由join的on条件来决定的,比如 (t1.col1 = t2.col1 AND t1.col2 = t2.col2),那么哈希键值就是(col1, col2),实际上转成什么类型也是由join条件决定的,比如有字符串和整数,那么整数就会被转为字符串存储。所以应该使用共有的结构来比较,这里就直接一个个字符(byte-by-byte)来哈希和比较,存储的时候也是如此。
对于类似UPPER(t1.col1) = UPPER(t2.col1)这种on条件,会把UPPER(t1.col1)作为哈希键值
这里的Key类型的结构体定义如下,很简单:
class Key {
public:
/// Note that data can be nullptr if size is 0.
Key(const uchar *data, size_t size) : m_data(data), m_size(size) {}
bool operator==(const Key &other) const {
if (other.size() != size()) {
return false;
} else if (size() == 0) {
return true;
}
return memcmp(other.data(), data(), size()) == 0;
}
const uchar *data() const { return m_data; }
size_t size() const { return m_size; }
private:
const uchar *m_data;
const size_t m_size;
};
主体的HashJoinRowBuffer类,主要包含下面的成员变量:
// 存储的join条件
const std::vector<HashJoinCondition> m_join_conditions;
// 一行数据可以由来自不同表的部分组成。 这个结构告诉我们涉及哪些表。
const pack_rows::TableCollection m_tables;
// 所有的哈希键值都会存储在MEM_ROOT,分配在堆上。
MEM_ROOT m_mem_root;
// 这个MEM_ROOT结构仅用于存储插入哈希表的最后一行数据,目的是为了保证不会溢出内存
MEM_ROOT m_overflow_mem_root;
// 存储行值的哈希表
std::unique_ptr<hash_map_type> m_hash_map;
// 通过join条件构建key时用的缓冲区
String m_buffer;
size_t m_row_size_upper_bound;
// The maximum size of the buffer, given in bytes.
const size_t m_max_mem_available;
// 存储在哈希表中的最后一行,如果哈希表为空,则为 nullptr。
LinkedImmutableString m_last_row_stored{nullptr};
// 获取相关列,组成一行数据后,作为LinkedImmutableString类型存储到哈希表中
LinkedImmutableString StoreLinkedImmutableStringFromTableBuffers(
LinkedImmutableString next_ptr, bool *full);
主体的HashJoinRowBuffer类,接口如下:
// 初始化一个HashJoinRowBuffer,代表已经准备好存储行数据
// 这个接口可以调用很多次,每调用一次会清除现有的行数据
bool Init();
// 用来存储行数据
// THD:处理线程
// reject_duplicate_keys:如果为 true,则拒绝具有重复键的行。被拒绝后,仍将返回 ROW_STORED
// store_rows_with_null_in_condition:是否存储NULL行
// 返回值类型:
// ROW_STORED: 行已经被存储了
// BUFFER_FULL:行被存储了,但是缓冲区已经满了
// FATAL_ERROR:错误
StoreRowResult StoreRow(THD *thd, bool reject_duplicate_keys,
bool store_rows_with_null_in_condition);
// 返回哈希表的状态
size_t size() const { return m_hash_map->size(); }
bool empty() const { return m_hash_map->empty(); }
bool inited() const { return m_hash_map != nullptr; }
// 返回查找哈希键值key的迭代器
hash_map_iterator find(const Key &key) const { return m_hash_map->find(key); }
hash_map_iterator begin() const { return m_hash_map->begin(); }
hash_map_iterator end() const { return m_hash_map->end(); }
// 获取最后一行插入到哈希表中的行
LinkedImmutableString LastRowStored() const {
assert(Initialized());
return m_last_row_stored;
}
// 和inited()作用差不多
bool Initialized() const { return m_hash_map.get() != nullptr; }
// 查看是否包含某个key值
bool contains(const Key &key) const { return find(key) != end(); }
hash_join_iterator
这个文件里介绍了一下目前hash join的工作原理。
未超过内存限制的 hash join
join操作可以看做有两个输入,一个是 build input,一个是probe input。从名字可以看出来,build input是需要用来构建哈希表的,而probe input就是用来一行一行读取并和哈希表匹配的。
所以hash join的流程如下:
- 将两个input,指定其中一个为build input,指定另外一个为probe input。通常都会指定表大小比较小的表作为build input(而不是指定行数比较少的表,这里应该是为了内存中能存储下)
-
将build input的表全部读取到内存中构建哈希表,哈希键值是根据on条件来计算的,比如SELECT * FROM lineitem
INNER JOIN orders ON orders.o_orderkey = lineitem.l_orderkey;
其中“orders”表作为build input,那么哈希键值就根据o_orderkey来做,当然优化器也会识别join中的隐式条件,比如,
SELECT * FROM orders, lineitem
WHERE orders.o_orderkey = lineitem.l_orderkey;
这个也会识别出o_orderkey。
- 每行每行的读取probe input中的数据,对于每一行来计算对应的哈希键值,上面的例子是l_orderkey,采用相同的哈希计算函数来计算哈希值,并从哈希表中查找到对应值。关于行数据是如何存储和恢复的细节,可以看pack_rows::StoreFromTableBuffers
当然,内存有限,限制于join_buffer_size。
没有超过内存的部分仍然按照上述步骤来做。
如果超过了限制,那么就按照下面的步骤来做
超过内存限制的 hash join
- build input中,没有被构建成hash table的其余行,会被分成给定数量n的文件,存储在HashJoinChunk中。对于build input 和 probe input,都会开n个chunk文件,这n个chunk文件是为了存储两张表的表数据的,因为内存放不下,同样会用到哈希,不过用的是不同的哈希函数。
- 然后开始从probe input一行一行的读取数据,先和内存中的hashtable做匹配,也就是走一遍上面的hash join步骤。然后使用另一个哈希函数来计算哈希值,确定这行数据要存到那个chunk文件中,因为这行数据之后还要和build input的其他表做匹配。
- 讲过上面的步骤,因为用的是一样的哈希函数分区,可以保证build input和probe input匹配的行肯定在同一个chunk中。所以开始对每个chunk的数据进行上面经典的hash join流程
上面的介绍都是对inner join的介绍,对于semi join来说,也差不多。
semi join 的hash join
- 因为semi join只需要匹配一行数据就行,所以构建哈希表时不需要存储重复值。
- 输出所有能在哈希表中找到匹配的行。如果内存存不下,变成落盘的hash join,那么只需要把probe input未能在哈希表中匹配的行进行落盘
hash join的注意点
- hash join只适用于等值条件,对于有其他条件的join来说,哈希匹配后,还要保证其他条件也能够匹配。
- 如果on-disk的hash join,产生的chunk文件仍然过大,比如有一个很偏的数据集,那么肯定会有这种情况。
- 写磁盘时会设定一个"kMaxChunks",用来表示最大chunk文件数量,这个是为了不消耗完所有文件描述符。
- 还会有一个设定值来避免写磁盘,如果不落盘,那么就一遍遍的构建哈希表,然后一遍遍的读取probe input的数据。有的场景其实是不罗盘更好,但是mysql没有针对这个的成本优化,要不要落盘只能通过参数来控制。
HashJoinIterator类
初始化一个HashJoinIterator需要下面的参数:
// thd:处理线程
// build_input:build input迭代器
// build_input_tables:build input涉及到的表
// estimated_build_rows:build input的预估行数
// probe_input:probe input迭代器
// probe_input_tables:probe input涉及到的表
// store_rowids:bool类型,是否保存rowid,仅在清除操作时使用
// tables_to_get_rowid_for:一个map,存储需要调用position()的表
// max_memory_available:就是join_buffer_size
// join_conditions:join条件列表
// allow_spill_to_disk:是否允许落盘
// join_type:join类型,semi或者inner
// extra_conditions:不能被构建哈希表的额外条件,且由and连接
// probe_input_batch_mode:是否在probe input启用批处理
// hash_table_generation:清除哈希表次数的计数器
HashJoinIterator(THD *thd, unique_ptr_destroy_only<RowIterator> build_input,
const Prealloced_array<TABLE *, 4> &build_input_tables,
double estimated_build_rows,
unique_ptr_destroy_only<RowIterator> probe_input,
const Prealloced_array<TABLE *, 4> &probe_input_tables,
bool store_rowids, table_map tables_to_get_rowid_for,
size_t max_memory_available,
const std::vector<HashJoinCondition> &join_conditions,
bool allow_spill_to_disk, JoinType join_type,
const Mem_root_array<Item *> &extra_conditions,
bool probe_input_batch_mode,
uint64_t *hash_table_generation);
HashJoinIterator类的接口如下:
// 使用build input的数据构建哈希表,内存存不下,剩余数据就落盘
bool BuildHashTable();
// 从下一个chunk文件中读取数据到内存中构建哈希表
bool ReadNextHashJoinChunk();
// 从probe input读取一行数据,如果当前已经开始落盘操作,那么读取到的数据会落盘
// 读取完,迭代器的状态会有如下改变:
// READING_FIRST_ROW_FROM_HASH_TABLE:这行数据已经读取完放在内存里了
// LOADING_NEXT_CHUNK_PAIR:probe input已经没有数据可以读取了
bool ReadRowFromProbeIterator();
// 从当前的probe chunk文件中读取一行数据到内存,迭代器的返回状态和上面相同
bool ReadRowFromProbeChunkFile();
// 从probe saving 文件中读取一行数据,我本来没搞懂这个savingfile和上面的chunk file有什么区别
// 看之前的看介绍可以知道,chunkfile是和build input成对的,落盘用的,结构是HashJoinChunk
// 而savingFile是为了记录没有匹配到的数据的
// 比如semi join,是匹配到就返回的,如果哈希表被构建了多次,那么probe input也会有多次,
// 那么就会产生重复值,所以需要记录没有匹配的数据,匹配过的就不管了
bool ReadRowFromProbeRowSavingFile();
// 在哈希表中查找m_temporary_row_and_join_key_buffer的键值
void LookupProbeRowInHashTable();
// 判断是否启用了disk hash join
bool on_disk_hash_join() const { return !m_chunk_files_on_disk.empty(); }
// 如果允许落盘操作,将probe input的行写到chunkfile中
bool WriteProbeRowToDiskIfApplicable();
// 判断哈希匹配成功的数据能不能满足其他条件,都满足就返回true
bool JoinedRowPassesExtraConditions() const;
// 看能不能去掉哈希表中的重复键值
bool RejectDuplicateKeys() const
// 以下几个初始化函数顾名思义
bool InitRowBuffer();
bool InitProbeIterator();
bool InitWritingToProbeRowSavingFile();
bool InitReadingFromProbeRowSavingFile();
// 设置probe input的迭代器状态
void SetReadingProbeRowState();
// 找哈希表中下一个匹配的数据,并查看满不满足其他join条件,如果需要还会落盘
int ReadNextJoinedRowFromHashTable();
mysql的源码注释写的非常详细,弄懂这些接口,也就能明白mysql对于哈希join的处理了。
看mysql的源码确实非常受益,看这些大神们是怎么管理自己的代码的,看他们对于内存管理怎么做的,像join_buffer_size这种东西,实际上超没超,肯定是插入数据之后才能看出来,最起码我是这么写的,但是mysql的开发者们就很聪明的新建了个临时缓存区,判断无误后才能放入buffer中,这样就可以很便捷的管理内存了,类似这种思路,mysql中还有很多,确实能感受到设计的严谨,像我在实际写内存限制的时候,这些细节就考虑不好,读书有益,读源码也一样的。