01
項目背景
目前,常見的大氣污染預測模型大多是基于實體機理建構的,比如空氣品質預測模型 Calpuff、AERMOD、CMAQ 等。然而,這些模型運算較為複雜,對于輸入資料的要求非常高,運算耗時也比較長,适合用于正常固定區域的預報。當遇到突發污染事件時,就無法有效發揮作用。
針對以上問題,本項目以某城區3km*3km 範圍的固定模拟區域,根據污染物擴散模型,快速計算任意釋放點源和任意風向的污染物擴散動圖,并進行精度評估。僅利用城市局部污染物擴散雲圖作為輸入,使用深度學習模型提取圖像中污染物擴散的特征,純資料驅動,無需建立實體模型,預測耗時短,适合作為突發污染擴散事件時的應急處置決策輔助。
02
項目需求
▎課題名稱
基于資料驅動的污染物擴散深度學習模型案例
▎課題需求
外部機關提供資料集,總資料集較長的描述:120個動圖資料(3個風速*5個釋放源點位 * 8個風向)。選取其中任意1個動圖的資料,基于資料驅動類模型(模型不限制)提取資料特征,得到污染物擴散模型,可對污染物擴散進行預測。
項目位址
03
實作過程
▎資料集
我們選擇了風速15m/s,風向正北,Pos_0作為污染源釋放點的動圖資料,資料來源于某城區3km*3km 範圍的固定區域内污染物擴散 CFD 模拟結果(南京歐帕提亞公司提供),共745秒148張污染物濃度雲圖,兩張圖檔時間間隔5秒。
基于飛槳2.4.0的開發環境,在對動圖解壓之後,我們發現動圖解壓得到的181張靜态圖檔中第148張之後的圖檔存在明顯的圖像抖動。我們采用了基于 Harris 角點檢測的圖像對齊算法進行處理,但是圖像抖動沒有得到完全消除。為了保證模型輸入資料的品質,我們丢棄了第148張之後的靜态圖檔。
圖1 原始資料
▎U-Net 網絡模型
網絡模型如圖2所示,其由3個 Encoder/Decoder、9個卷積 Conv、9個反卷積 Conv-T 組成,約30萬個訓練參數。之是以選擇 U-Net,是因為該網絡在圖像分割和目辨別别中應用廣泛,污染物擴散模式學習可以看作是一種動态的目辨別别任務,隻不過目标的形态比較抽象;另一個原因是 U-Net 的代碼實作較簡單,短時間内可以完成網絡的搭建。
圖2 U-Net 網絡圖
▎核心代碼
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from paddle.nn.utils import weight_norm
# 建立基礎卷積層
def create_layer(in_channels, out_channels, kernel_size, wn=True, bn=True,
activation=nn.ReLU, convolution=nn.Conv2D):
assert kernel_size % 2 == 1
layer = [ ]
conv = convolution(in_channels, out_channels, kernel_size, padding=kernel_size // 2)
if wn:
conv = weight_norm(conv)
layer.append(conv)
if activation is not None:
layer.append(activation())
if bn:
layer.append(nn.BatchNorm2D(out_channels))
return nn.Sequential(*layer)
# 建立Encoder中的單個塊
def create_encoder_block(in_channels, out_channels, kernel_size, wn=True, bn=True,
activation=nn.ReLU, layers=2):
encoder = [ ]
for i in range(layers):
_in = out_channels
_out = out_channels
if i == 0:
_in = in_channels
encoder.append(create_layer(_in, _out, kernel_size, wn, bn, activation, nn.Conv2D))
return nn.Sequential(*encoder)
# 建立Decoder中的單個塊
def create_decoder_block(in_channels, out_channels, kernel_size, wn=True, bn=True,
activation=nn.ReLU, layers=2, final_layer=False):
decoder = [ ]
for i in range(layers):
_in = in_channels
_out = in_channels
_bn = bn
_activation = activation
if i == 0:
_in = in_channels * 2
if i == layers - 1:
_out = out_channels
if final_layer:
_bn = False
_activation = None
decoder.append(create_layer(_in, _out, kernel_size, wn, _bn, _activation, nn.Conv2DTranspose))
return nn.Sequential(*decoder)
# 建立Encoder
def create_encoder(in_channels, filters, kernel_size, wn=True, bn=True, activation=nn.ReLU, layers=2):
encoder = [ ]
for i in range(len(filters)):
if i == 0:
encoder_layer = create_encoder_block(in_channels, filters[i], kernel_size, wn, bn, activation, layers)
else:
encoder_layer = create_encoder_block(filters[i - 1], filters[i], kernel_size, wn, bn, activation, layers)
encoder = encoder + [encoder_layer]
return nn.Sequential(*encoder)
# 建立Decoder
def create_decoder(out_channels, filters, kernel_size, wn=True, bn=True, activation=nn.ReLU, layers=2):
decoder = []
for i in range(len(filters)):
if i == 0:
decoder_layer = create_decoder_block(filters[i], out_channels, kernel_size, wn, bn, activation, layers,
final_layer=True)
else:
decoder_layer = create_decoder_block(filters[i], filters[i - 1], kernel_size, wn, bn, activation, layers,
final_layer=False)
decoder = [decoder_layer] + decoder
return nn.Sequential(*decoder)
# 建立網絡
class UNetEx(nn.Layer):
def __init__(self, in_channels, out_channels, kernel_size=3, filters=[16, 32, 64], layers=3,
weight_norm=True, batch_norm=True, activation=nn.ReLU, final_activation=None):
super().__init__()
assert len(filters) > 0
self.final_activation = final_activation
self.encoder = create_encoder(in_channels, filters, kernel_size, weight_norm, batch_norm, activation, layers)
decoders = [ ]
# for i in range(out_channels):
decoders.append(create_decoder(out_channels, filters, kernel_size, weight_norm, batch_norm, activation, layers))
self.decoders = nn.Sequential(*decoders)
def encode(self, x):
tensors = [ ]
indices = [ ]
sizes = [ ]
for encoder in self.encoder:
x = encoder(x)
sizes.append(x.shape)
tensors.append(x)
x, ind = F.max_pool2d(x, 2, 2, return_mask=True)
indices.append(ind)
return x, tensors, indices, sizes
def decode(self, _x, _tensors, _indices, _sizes):
y = [ ]
for _decoder in self.decoders:
x = _x
tensors = _tensors[:]
indices = _indices[:]
sizes = _sizes[:]
for decoder in _decoder:
tensor = tensors.pop()
size = sizes.pop()
ind = indices.pop()
# 反池化操作,為上采樣
x = F.max_unpool2d(x, ind, 2, 2, output_size=size)
x = paddle.concat([tensor, x], axis=1)
x = decoder(x)
y.append(x)
return paddle.concat(y, axis=1)
def forward(self, x):
x, tensors, indices, sizes = self.encode(x)
x = self.decode(x, tensors, indices, sizes)
if self.final_activation is not None:
x = self.final_activation(x)
return x
▎訓練
訓練時輸入資料為上一時刻的污染物雲圖,輸出為預測的下一時刻的污染物雲圖。目前的訓練 batch-size 為1,即隻預測下一時刻的污染物擴散情況。訓練時,每10個 epoch 儲存一次模型,防止訓練意外中斷時模型參數丢失。
# 訓練方法
def train(model, train_dataset, criterion, optimizer, device, num_epochs):
loss_history = [ ]
epoch_loss = 0
# 周遊批次
for epoch in range(num_epochs):
optimizer.clear_grad()
for batch_id in range(len(train_dataset)-1):
inputs = train_dataset[batch_id]
outputs_true = train_dataset[batch_id+1]
inputs = T.ToTensor()(inputs)
inputs = paddle.unsqueeze(inputs, 0)
outputs_true = T.ToTensor()(outputs_true)
outputs_true = paddle.unsqueeze(outputs_true, 0)
# 訓練
outputs = model(inputs)
# 計算損失值
loss = criterion(outputs, outputs_true)
if batch_id % 10 ==0:
print('epoch:',epoch,'batch_id:',batch_id,'loss:',loss.numpy())
loss.backward()
epoch_loss += loss.item()
optimizer.step()
epoch_loss /= len(train_dataset)
loss_history.append(epoch_loss)
print("Epoch [{}/{}], Loss: {:.8f}".format(epoch + 1, num_epochs, loss.numpy()[0]))
# 儲存模型
if epoch % 10 == 0:
save_model(model, '/home/aistudio/pollution_model.pdparams')
print("Training complete.")
return loss_history
▎預測
預測時,輸入測試資料某時刻的污染物擴散雲圖,預測下一時刻的污染物擴散情況。測試函數中 supervise 這個 flag 為後續連續預測多個時刻的資料預置了接口。目前 supervise 置為 true,當模型預備連續預測多個時刻資料時,測試時将 supervise 置為 false。
def test():
# 初始化結果清單
results = [ ]
# 測試集合起始點
inputs = test_dataset[0]
inputs = T.ToTensor()(inputs)
inputs = paddle.unsqueeze(inputs, 0)
# 是否supervise
flag_supervise = True
device = paddle.set_device('gpu' if paddle.is_compiled_with_cuda() else 'cpu')
# 加載模型
model = UNetEx(3,3,3)
load_model(model,'/home/aistudio/pollution_model.pdparams',device)
for num in range(1,10):
# 進行預測
outputs = model(inputs)
outputs_np = outputs.numpy()
outputs_np = np.squeeze(outputs_np, axis=0) # 去除第一個次元(batch_size)
outputs_np = np.transpose(outputs_np, (1, 2, 0)) # 将通道次元調整為最後一個次元
outputs_np = (255 * np.clip(outputs_np, 0, 1)).astype('uint8')
#outputs_np = outputs_np.transpose([1, 2, 0])
#outputs_np_uint8 = (outputs_np * 255).astype(np.uint8)
# 将預測結果添加到結果清單中
results.append(outputs_np)
if flag_supervise == False:
# 将預測結果作為下一幀的輸入
inputs = outputs
else:
# 使用真實資料預測
inputs = test_dataset[num+1]
inputs = T.ToTensor()(inputs)
inputs = paddle.unsqueeze(inputs, 0)
return results
results = test()
04
項目成果
圖3 計算函數損失值
圖4 對比 CFD 模拟參數對比
圖5 殘內插補點對比
如圖5所示,濃度誤差主要集中在污染源附近(如圖紅色框),主要數值分布在-0.02~0.02之間。不同顔色分别代表不同濃度區間誤差,藍色表示的低濃度相對誤差較小,綠色紅色表示的中高濃度誤差平均誤差較高,綠色區域表征的中等濃度區域,偏大的誤差影響的面積較大。
圖6 數值對比
05
未來發展方向
▎預測能力方面
基于前一時刻的污染物濃度雲圖,預測後十個時刻、二十個時刻,四十個時刻的污染物濃度雲圖;
嘗試用多時刻預測多時刻。
▎網絡方面
嘗試引入更先進的網絡架構,如 transformer;
對于網絡層數和每層網絡的神經元個數,嘗試進行敏感性分析和誤差分析;
嘗試引入更多種類的激活函數如 tanh,silu 等;
嘗試對 learning rate、batch size 等超參數進行調整實驗。
▎實體原理方面
嘗試引入實體先驗知識,對建築、邊界位置施加 loss 軟限制;
嘗試利用流體 NS 方程對模型進行修正。
▎模型方面
嘗試引入更多參數作為輸入:如污染源位置、污染源初始濃度等提高模型的适應能力;
增加模型參數量級,探索大模型對複雜多态問題的處理能力;
嘗試和傳統流體求解方法進行融合。
06
項目意義與心得
本項目嘗試用 U-Net 網絡通過污染物擴散雲圖來學習污染物擴散的模型參數,對污染物擴散進行快速預測,是資料驅動計算場景拓展的一次探索。從項目結果來看,模型計算速度相比 CFD 模拟提升明顯,但是模型預測的效果還有待提升,未來将通過探索以上幾個方向,不斷優化模型預測效果。項目實作過程中,我們花費了大量的時間處理背景存在抖動的圖像,直到後來發現有一部分資料集的品質要遠遠好于另一部分,我們選擇放棄品質不好的資料,進而加快了項目的進展。
資料處理過程中有以下幾個方面的心得。
第一,對項目的資料應該第一時間進行全局探索,了解資料的全貌,對資料品質進行評估;
第二,與其花費大量的時間處理品質不好的資料,不如先使用品質較好的資料,優先做對模型取得進展更加關鍵的事情;
第三,相對于改變模型的結構,提高輸入資料的品質對模型的訓練結果起到更加積極的作用。一些開源模型的效果無法複現的原因在于訓練資料的不公開,即便大家都用到同樣結構的網絡,但是訓練資料不同,模型取得的效果就大不相同。從這個角度看,模型參數是訓練資料在網絡上留下的壓縮資訊,訓練資料存在的瑕疵很難通過優化網絡來解決。
飛槳 AI for Science 共創計劃為本項目提供了強大的技術支援,打造活躍的前瞻性的 AI for Science 開源社群,通過飛槳 AI for Science 共創計劃,學習到了如何在飛槳平台上使用科學計算的 AI 方法去解決 CFD 模拟預測的問題,并且大幅度提高了資料驅動計算的速度。相信未來會有越來越多的項目通過 AI for Science 共創計劃建立産學研閉環,推動科研創新與産業賦能。
■ 相關位址
飛槳 AI for Science 共創計劃
飛槳 PPSIG-Science 小組
飛槳 PaddleScience 工具元件