天天看點

從頭開始進行CUDA程式設計:線程間協作的常見技術

作者:deephub

在前一篇文章中,我們介紹了如何使用 GPU 運作的并行算法。這些并行任務是那些完全互相獨立的任務,這點與我們一般認識的程式設計方式有很大的不同,雖然我們可以從并行中受益,但是這種奇葩的并行運作方式對于我們來說肯定感到非常的複雜。是以在本篇文章的Numba代碼中,我們将介紹一些允許線程在計算中協作的常見技術。

首先還是載入相應的包

from time import perf_counter
import numpy as np
import numba
from numba import cuda
print(np.__version__)
print(numba.__version__)
cuda.detect()
# 1.21.6
# 0.55.2
# Found 1 CUDA devices
# id 0 b'Tesla T4' [SUPPORTED]
# Compute Capability: 7.5
# PCI Device ID: 4
# PCI Bus ID: 0
# UUID: GPU-bcc6196e-f26e-afdc-1db3-6eba6ff160f0
# Watchdog: Disabled
# FP32/FP64 Performance Ratio: 32
# Summary:
# 1/1 devices are supported
# True           

不要忘記,我們這裡是CUDA程式設計,是以NV的GPU是必須的,比如可以去colab或者Kaggle白嫖。

線程間的協作

簡單的并行歸約算法

我們将從一個非常簡單的問題開始本節:對數組的所有元素求和。這個算法非常簡單。如果不使用NumPy,我們可以這樣實作它:

def sum_cpu(array):
s = 0.0
for i in range(array.size):
s += array[i]
return s           

這看起來不是很 Pythonic。 但它能夠讓我們了解它正在跟蹤數組中的所有元素。 如果 s 的結果依賴于數組的每個元素,我們如何并行化這個算法呢? 首先,我們需要重寫算法以允許并行化, 如果有無法并行化的部分則應該允許線程互相通信。

到目前為止,我們還沒有學會如何讓線程互相通信……事實上,我們之前說過不同塊中的線程不通信。 我們可以考慮隻啟動一個塊,但是我們上次也說了,在大多數 GPU 中塊隻能有 1024 個線程!

如何克服這一點? 如果将數組拆分為 1024 個塊(或适當數量的threads_per_block)并分别對每個塊求和呢? 然後最後,我們可以将每個塊的總和的結果相加。 圖 2.1 顯示了一個非常簡單的 2 塊拆分示例。

從頭開始進行CUDA程式設計:線程間協作的常見技術

上圖就是對數組元素求和的“分而治之”方法。

如何在 GPU 上做到這一點呢? 首先需要将數組拆分為塊。 每個數組塊将隻對應一個具有固定數量的線程的CUDA塊。 在每個塊中,每個線程可以對多個數組元素求和。 然後将這些每個線程的值求和,這裡就需要線程進行通信,我們将在下一個示例中讨論如何通信。

由于我們正在對塊進行并行化,是以核心的輸出應該被設定為一個塊。 為了完成Reduce,我們将其複制到 CPU 并在那裡完成工作。

threads_per_block = 1024 # Why not!
blocks_per_grid = 32 * 80 # Use 32 * multiple of streaming multiprocessors
# Example 2.1: Naive reduction
@cuda.jit
def reduce_naive(array, partial_reduction):
i_start = cuda.grid(1)
threads_per_grid = cuda.blockDim.x * cuda.gridDim.x
s_thread = 0.0
for i_arr in range(i_start, array.size, threads_per_grid):
s_thread += array[i_arr]
# We need to create a special *shared* array which will be able to be read
# from and written to by every thread in the block. Each block will have its
# own shared array. See the warning below!
s_block = cuda.shared.array((threads_per_block,), numba.float32)

# We now store the local temporary sum of a single the thread into the
# shared array. Since the shared array is sized
# threads_per_block == blockDim.x
# (1024 in this example), we should index it with `threadIdx.x`.
tid = cuda.threadIdx.x
s_block[tid] = s_thread

# The next line synchronizes the threads in a block. It ensures that after
# that line, all values have been written to `s_block`.
cuda.syncthreads()
# Finally, we need to sum the values from all threads to yield a single
# value per block. We only need one thread for this.
if tid == 0:
# We store the sum of the elements of the shared array in its first
# coordinate
for i in range(1, threads_per_block):
s_block[0] += s_block[i]
# Move this partial sum to the output. Only one thread is writing here.
partial_reduction[cuda.blockIdx.x] = s_block[0]           

這裡需要注意的是必須共享數組,并且讓每個數組塊變得“小”

這裡的“小”: 确切大小取決于 GPU 的計算能力,通常在 48 KB 和 163 KB 之間。 請參閱此表中的“每個線程塊的最大共享記憶體量”項。(https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#features-and-technical-specifications__technical-specifications-per-compute-capability)

在編譯時有一個已知的大小(這就是我們調整共享數組threads_per_block而不是blockDim.x的原因)。 我們總是可以為任何大小的共享數組定義一個工廠函數……但要注意這些核心的編譯時間。

這裡的數組需要為 Numba 類型指定的 dtype,而不是 Numpy 類型(這個沒有為什麼!)。

N = 1_000_000_000
a = np.arange(N, dtype=np.float32)
a /= a.sum() # a will have sum = 1 (to float32 precision)
s_cpu = a.sum()
# Highly-optimized NumPy CPU code
timing_cpu = np.empty(21)
for i in range(timing_cpu.size):
tic = perf_counter()
a.sum()
toc = perf_counter()
timing_cpu[i] = toc - tic
timing_cpu *= 1e3 # convert to ms
print(f"Elapsed time CPU: {timing_cpu.mean():.0f} ± {timing_cpu.std():.0f} ms")
# Elapsed time CPU: 354 ± 24 ms
dev_a = cuda.to_device(a)
dev_partial_reduction = cuda.device_array((blocks_per_grid,), dtype=a.dtype)
reduce_naive[blocks_per_grid, threads_per_block](dev_a, dev_partial_reduction)
s = dev_partial_reduction.copy_to_host().sum() # Final reduction in CPU
np.isclose(s, s_cpu) # Ensure we have the right number
# True
timing_naive = np.empty(21)
for i in range(timing_naive.size):
tic = perf_counter()
reduce_naive[blocks_per_grid, threads_per_block](dev_a, dev_partial_reduction)
s = dev_partial_reduction.copy_to_host().sum()
cuda.synchronize()
toc = perf_counter()
assert np.isclose(s, s_cpu) 
timing_naive[i] = toc - tic
timing_naive *= 1e3 # convert to ms
print(f"Elapsed time naive: {timing_naive.mean():.0f} ± {timing_naive.std():.0f} ms")
# Elapsed time naive: 30 ± 12 ms           

在谷歌Colab上測試,得到了10倍的加速。

題外話:上面這個方法之是以說是簡單的規約算法,是因為這個算法最簡單,也最容易實作。我們在大資料中常見的Map-Reduce算法就是這個算法。雖然實作簡單,但是他容易了解,是以十分常見,當然他慢也是出名的,有興趣的大家可以去研究研究。

一種更好的并行歸約算法

上面的算法最 “樸素”的,是以有很多技巧可以加快這種代碼的速度(請參閱 CUDA 示範文稿中的 Optimizing Parallel Reduction 以獲得基準測試)。

在介紹更好的方法之前,讓我們回顧一下核心函數的的最後一點:

if tid == 0: # Single thread taking care of business
for i in range(1, threads_per_block):
s_block[0] += s_block[i]
partial_reduction[cuda.blockIdx.x] = s_block[0]           

我們并行化了幾乎所有的操作,但是在核心的最後,讓一個線程負責對共享數組 s_block 的所有 threads_per_block 元素求和。為什麼不能把這個總和也并行化呢?

聽起來不錯對吧,下圖顯示了如何在 threads_per_block 大小為 16 的情況下實作這一點。我們從 8 個線程開始工作,第一個将對 s_block[0] 和 s_block[8] 中的值求和。第二個求和s_block[1]和s_block[9]中的值,直到最後一個線程将s_block[7]和s_block[15]的值相加。

在下一步中,隻有前 4 個線程需要工作。第一個線程将對 s_block[0] 和 s_block[4] 求和;第二個,s_block[1] 和 s_block[5];第三個,s_block[2] 和 s_block[6];第四個也是最後一個,s_block[3] 和 s_block[7]。

第三步,隻需要 2 個線程來處理 s_block 的前 4 個元素。

第四步也是最後一步将使用一個線程對 2 個元素求和。

由于工作已線上程之間配置設定,是以它是并行化的。這不是每個線程的平等劃分,但它是一種改進。在計算上,這個算法是 O(log2(threads_per_block)),而上面“樸素”算法是 O(threads_per_block)。比如在我們這個示例中是 1024 次操作,用于 了兩個算法差距有10倍

最後還有一個細節。在每一步,我們都需要確定所有線程都已寫入共享數組。是以我們必須調用 cuda.syncthreads()。

從頭開始進行CUDA程式設計:線程間協作的常見技術
# Example 2.2: Better reduction
@cuda.jit
def reduce_better(array, partial_reduction):
i_start = cuda.grid(1)
threads_per_grid = cuda.blockDim.x * cuda.gridDim.x
s_thread = 0.0
for i_arr in range(i_start, array.size, threads_per_grid):
s_thread += array[i_arr]
# We need to create a special *shared* array which will be able to be read
# from and written to by every thread in the block. Each block will have its
# own shared array. See the warning below!
s_block = cuda.shared.array((threads_per_block,), numba.float32)

# We now store the local temporary sum of the thread into the shared array.
# Since the shared array is sized threads_per_block == blockDim.x,
# we should index it with `threadIdx.x`.
tid = cuda.threadIdx.x
s_block[tid] = s_thread

# The next line synchronizes the threads in a block. It ensures that after
# that line, all values have been written to `s_block`.
cuda.syncthreads()
i = cuda.blockDim.x // 2
while (i > 0):
if (tid < i):
s_block[tid] += s_block[tid + i]
cuda.syncthreads()
i //= 2
if tid == 0:
partial_reduction[cuda.blockIdx.x] = s_block[0]
reduce_better[blocks_per_grid, threads_per_block](dev_a, dev_partial_reduction)
s = dev_partial_reduction.copy_to_host().sum() # Final reduction in CPU
np.isclose(s, s_cpu)
# True
timing_naive = np.empty(21)
for i in range(timing_naive.size):
tic = perf_counter()
reduce_better[blocks_per_grid, threads_per_block](dev_a, dev_partial_reduction)
s = dev_partial_reduction.copy_to_host().sum()
cuda.synchronize()
toc = perf_counter()
assert np.isclose(s, s_cpu) 
timing_naive[i] = toc - tic
timing_naive *= 1e3 # convert to ms
print(f"Elapsed time better: {timing_naive.mean():.0f} ± {timing_naive.std():.0f} ms")
# Elapsed time better: 22 ± 1 ms           

可以看到,這比原始方法快25%。

重要說明:你可能很想将同步線程移動到 if 塊内,因為在每一步之後,超過目前線程數一半的核心将不會被使用。 但是這樣做會使調用同步線程的 CUDA 線程停止并等待所有其他線程,而所有其他線程将繼續運作。 是以停止的線程将永遠等待永遠不會停止同步的線程。如果您同步線程,請確定在所有線程中調用 cuda.syncthreads()。

i = cuda.blockDim.x // 2
while (i > 0):
if (tid < i):
s_block[tid] += s_block[tid + i]
#cuda.syncthreads() 這個是錯的
cuda.syncthreads() # 這個是對的
i //= 2           

Numba 自動歸約

其實歸約算法并不簡單,是以Numba 提供了一個友善的 cuda.reduce 裝飾器,可以将二進制函數轉換為歸約。 是以上面冗長而複雜的算法可以替換為:

@cuda.reduce
def reduce_numba(a, b):
return a + b
# Compile and check
s = reduce_numba(dev_a)
np.isclose(s, s_cpu)
# True
# Time
timing_numba = np.empty(21)
for i in range(timing_numba.size):
tic = perf_counter()
s = reduce_numba(dev_a)
toc = perf_counter()
assert np.isclose(s, s_cpu) 
timing_numba[i] = toc - tic
timing_numba *= 1e3 # convert to ms
print(f"Elapsed time better: {timing_numba.mean():.0f} ± {timing_numba.std():.0f} ms")
# Elapsed time better: 45 ± 0 ms           

上面的運作結果我們可以看到手寫代碼通常要快得多(至少 2 倍),但 Numba 給我們提供的方法卻非常容易使用。 這對我們來說是格好事,因為終于有我們自己實作的Python方法比官方的要快了

這裡還有一點要注意就是預設情況下,要減少複制因為這會強制同步。 為避免這種情況可以使用裝置上數組作為輸出調用歸約:

dev_s = cuda.device_array((1,), dtype=s)

reduce_numba(dev_a, res=dev_s)

s = dev_s.copy_to_host()[0]

np.isclose(s, s_cpu)

# True

二維規約示例

并行約簡技術是非常偉大的,如何将其擴充到更高的次元?雖然我們總是可以使用一個展開的數組(array2 .ravel())調用,但了解如何手動約簡多元數組是很重要的。

在下面這個例子中,将結合剛才所學的知識來計算二維數組。

threads_per_block_2d = (16, 16) # 256 threads total
blocks_per_grid_2d = (64, 64)
# Total number of threads in a 2D block (has to be an int)
shared_array_len = int(np.prod(threads_per_block_2d))
# Example 2.4: 2D reduction with 1D shared array
@cuda.jit
def reduce2d(array2d, partial_reduction2d):
ix, iy = cuda.grid(2)
threads_per_grid_x, threads_per_grid_y = cuda.gridsize(2)
s_thread = 0.0
for i0 in range(iy, array2d.shape[0], threads_per_grid_x):
for i1 in range(ix, array2d.shape[1], threads_per_grid_y):
s_thread += array2d[i0, i1]
# Allocate shared array
s_block = cuda.shared.array(shared_array_len, numba.float32)
# Index the threads linearly: each tid identifies a unique thread in the
# 2D grid.
tid = cuda.threadIdx.x + cuda.blockDim.x * cuda.threadIdx.y
s_block[tid] = s_thread
cuda.syncthreads()
# We can use the same smart reduction algorithm by remembering that
# shared_array_len == blockDim.x * cuda.blockDim.y
# So we just need to start our indexing accordingly.
i = (cuda.blockDim.x * cuda.blockDim.y) // 2
while (i != 0):
if (tid < i):
s_block[tid] += s_block[tid + i]
cuda.syncthreads()
i //= 2

# Store reduction in a 2D array the same size as the 2D blocks
if tid == 0:
partial_reduction2d[cuda.blockIdx.x, cuda.blockIdx.y] = s_block[0]
N_2D = (20_000, 20_000)
a_2d = np.arange(np.prod(N_2D), dtype=np.float32).reshape(N_2D)
a_2d /= a_2d.sum() # a_2d will have sum = 1 (to float32 precision)
s_2d_cpu = a_2d.sum()
dev_a_2d = cuda.to_device(a_2d)
dev_partial_reduction_2d = cuda.device_array(blocks_per_grid_2d, dtype=a.dtype)
reduce2d[blocks_per_grid_2d, threads_per_block_2d](dev_a_2d, dev_partial_reduction_2d)
s_2d = dev_partial_reduction_2d.copy_to_host().sum() # Final reduction in CPU
np.isclose(s_2d, s_2d_cpu) # Ensure we have the right number
# True
timing_2d = np.empty(21)
for i in range(timing_2d.size):
tic = perf_counter()
reduce2d[blocks_per_grid_2d, threads_per_block_2d](dev_a_2d, dev_partial_reduction_2d)
s_2d = dev_partial_reduction_2d.copy_to_host().sum()
cuda.synchronize()
toc = perf_counter()
assert np.isclose(s_2d, s_2d_cpu) 
timing_2d[i] = toc - tic
timing_2d *= 1e3 # convert to ms
print(f"Elapsed time better: {timing_2d.mean():.0f} ± {timing_2d.std():.0f} ms")
# Elapsed time better: 15 ± 4 ms           

裝置函數

到目前為止,我們隻讨論了核心函數,它是啟動線程的特殊GPU函數。核心通常依賴于較小的函數,這些函數在GPU中定義,隻能通路GPU數組。這些被稱為裝置函數(Device functions)。與核心函數不同的是,它們可以傳回值。

我們将展示一個跨不同核心使用裝置函數的示例。該示例還将展示在使用共享數組時同步線程的重要性。

在CUDA的新版本中,核心可以啟動其他核心。這被稱為動态并行,但是Numba 的CUDA API還不支援。

我們将在固定大小的數組中建立波紋圖案。首先需要聲明将使用的線程數,因為這是共享數組所需要的。

threads_16 = 16
import math
@cuda.jit(device=True, inline=True) # inlining can speed up execution
def amplitude(ix, iy):
return (1 + math.sin(2 * math.pi * (ix - 64) / 256)) * (
1 + math.sin(2 * math.pi * (iy - 64) / 256)
)
# Example 2.5a: 2D Shared Array
@cuda.jit
def blobs_2d(array2d):
ix, iy = cuda.grid(2)
tix, tiy = cuda.threadIdx.x, cuda.threadIdx.y
shared = cuda.shared.array((threads_16, threads_16), numba.float32)
shared[tiy, tix] = amplitude(iy, ix)
cuda.syncthreads()
array2d[iy, ix] = shared[15 - tiy, 15 - tix]
# Example 2.5b: 2D Shared Array without synchronize
@cuda.jit
def blobs_2d_wrong(array2d):
ix, iy = cuda.grid(2)
tix, tiy = cuda.threadIdx.x, cuda.threadIdx.y
shared = cuda.shared.array((threads_16, threads_16), numba.float32)
shared[tiy, tix] = amplitude(iy, ix)
# When we don't sync threads, we may have not written to shared
# yet, or even have overwritten it by the time we write to array2d
array2d[iy, ix] = shared[15 - tiy, 15 - tix]
N_img = 1024
blocks = (N_img // threads_16, N_img // threads_16)
threads = (threads_16, threads_16)
dev_image = cuda.device_array((N_img, N_img), dtype=np.float32)
dev_image_wrong = cuda.device_array((N_img, N_img), dtype=np.float32)
blobs_2d[blocks, threads](dev_image)
blobs_2d_wrong[blocks, threads](dev_image_wrong)
image = dev_image.copy_to_host()
image_wrong = dev_image_wrong.copy_to_host()
import matplotlib.pyplot as plt
fig, (ax1, ax2) = plt.subplots(1, 2)
ax1.imshow(image.T, cmap="nipy_spectral")
ax2.imshow(image_wrong.T, cmap="nipy_spectral")
for ax in (ax1, ax2):
ax.set_xticks([])
ax.set_yticks([])
ax.set_xticklabels([])
ax.set_yticklabels([])           
從頭開始進行CUDA程式設計:線程間協作的常見技術

左:同步(正确)核心的結果。正确:來自不同步(不正确)核心的結果。

總結

本文介紹了如何開發需要規約模式來處理1D和2D數組的核心函數。在這個過程中,我們學習了如何利用共享數組和裝置函數。

作者:Carlos Costa, Ph.D.