天天看點

基于資料驅動 U-Net 模型的大氣污染物擴散快速預測

基于資料驅動 U-Net 模型的大氣污染物擴散快速預測

01

項目背景

目前,常見的大氣污染預測模型大多是基于實體機理建構的,比如空氣品質預測模型 Calpuff、AERMOD、CMAQ 等。然而,這些模型運算較為複雜,對于輸入資料的要求非常高,運算耗時也比較長,适合用于正常固定區域的預報。當遇到突發污染事件時,就無法有效發揮作用。

針對以上問題,本項目以某城區3km*3km 範圍的固定模拟區域,根據污染物擴散模型,快速計算任意釋放點源和任意風向的污染物擴散動圖,并進行精度評估。僅利用城市局部污染物擴散雲圖作為輸入,使用深度學習模型提取圖像中污染物擴散的特征,純資料驅動,無需建立實體模型,預測耗時短,适合作為突發污染擴散事件時的應急處置決策輔助。

02

項目需求

▎課題名稱

基于資料驅動的污染物擴散深度學習模型案例

▎課題需求

外部機關提供資料集,總資料集較長的描述:120個動圖資料(3個風速*5個釋放源點位 * 8個風向)。選取其中任意1個動圖的資料,基于資料驅動類模型(模型不限制)提取資料特征,得到污染物擴散模型,可對污染物擴散進行預測。

項目位址

03

實作過程

▎資料集

我們選擇了風速15m/s,風向正北,Pos_0作為污染源釋放點的動圖資料,資料來源于某城區3km*3km 範圍的固定區域内污染物擴散 CFD 模拟結果(南京歐帕提亞公司提供),共745秒148張污染物濃度雲圖,兩張圖檔時間間隔5秒。

基于飛槳2.4.0的開發環境,在對動圖解壓之後,我們發現動圖解壓得到的181張靜态圖檔中第148張之後的圖檔存在明顯的圖像抖動。我們采用了基于 Harris 角點檢測的圖像對齊算法進行處理,但是圖像抖動沒有得到完全消除。為了保證模型輸入資料的品質,我們丢棄了第148張之後的靜态圖檔。

基于資料驅動 U-Net 模型的大氣污染物擴散快速預測
基于資料驅動 U-Net 模型的大氣污染物擴散快速預測
基于資料驅動 U-Net 模型的大氣污染物擴散快速預測
基于資料驅動 U-Net 模型的大氣污染物擴散快速預測

圖1 原始資料

▎U-Net 網絡模型

網絡模型如圖2所示,其由3個 Encoder/Decoder、9個卷積 Conv、9個反卷積 Conv-T 組成,約30萬個訓練參數。之是以選擇 U-Net,是因為該網絡在圖像分割和目辨別别中應用廣泛,污染物擴散模式學習可以看作是一種動态的目辨別别任務,隻不過目标的形态比較抽象;另一個原因是 U-Net 的代碼實作較簡單,短時間内可以完成網絡的搭建。

基于資料驅動 U-Net 模型的大氣污染物擴散快速預測

圖2 U-Net 網絡圖

▎核心代碼

import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from paddle.nn.utils import weight_norm

# 建立基礎卷積層
def create_layer(in_channels, out_channels, kernel_size, wn=True, bn=True,
                 activation=nn.ReLU, convolution=nn.Conv2D):
    assert kernel_size % 2 == 1
    layer = [ ]
    conv = convolution(in_channels, out_channels, kernel_size, padding=kernel_size // 2)
    if wn:
        conv = weight_norm(conv)
    layer.append(conv)
    if activation is not None:
        layer.append(activation())
    if bn:
        layer.append(nn.BatchNorm2D(out_channels))
    return nn.Sequential(*layer)

# 建立Encoder中的單個塊
def create_encoder_block(in_channels, out_channels, kernel_size, wn=True, bn=True,
                         activation=nn.ReLU, layers=2):
    encoder = [ ]
    for i in range(layers):
        _in = out_channels
        _out = out_channels
        if i == 0:
            _in = in_channels
        encoder.append(create_layer(_in, _out, kernel_size, wn, bn, activation, nn.Conv2D))
    return nn.Sequential(*encoder)

# 建立Decoder中的單個塊
def create_decoder_block(in_channels, out_channels, kernel_size, wn=True, bn=True,
                         activation=nn.ReLU, layers=2, final_layer=False):
    decoder = [ ]
    for i in range(layers):
        _in = in_channels
        _out = in_channels
        _bn = bn
        _activation = activation
        if i == 0:
            _in = in_channels * 2
        if i == layers - 1:
            _out = out_channels
            if final_layer:
                _bn = False
                _activation = None
        decoder.append(create_layer(_in, _out, kernel_size, wn, _bn, _activation, nn.Conv2DTranspose))
    return nn.Sequential(*decoder)

# 建立Encoder
def create_encoder(in_channels, filters, kernel_size, wn=True, bn=True, activation=nn.ReLU, layers=2):
    encoder = [ ]
    for i in range(len(filters)):
        if i == 0:
            encoder_layer = create_encoder_block(in_channels, filters[i], kernel_size, wn, bn, activation, layers)
        else:
            encoder_layer = create_encoder_block(filters[i - 1], filters[i], kernel_size, wn, bn, activation, layers)
        encoder = encoder + [encoder_layer]
    return nn.Sequential(*encoder)

# 建立Decoder
def create_decoder(out_channels, filters, kernel_size, wn=True, bn=True, activation=nn.ReLU, layers=2):
    decoder = []
    for i in range(len(filters)):
        if i == 0:
            decoder_layer = create_decoder_block(filters[i], out_channels, kernel_size, wn, bn, activation, layers,
                                                 final_layer=True)
        else:
            decoder_layer = create_decoder_block(filters[i], filters[i - 1], kernel_size, wn, bn, activation, layers,
                                                 final_layer=False)
        decoder = [decoder_layer] + decoder
    return nn.Sequential(*decoder)

# 建立網絡
class UNetEx(nn.Layer):
    def __init__(self, in_channels, out_channels, kernel_size=3, filters=[16, 32, 64], layers=3,
                 weight_norm=True, batch_norm=True, activation=nn.ReLU, final_activation=None):
        super().__init__()
        assert len(filters) > 0
        self.final_activation = final_activation
        self.encoder = create_encoder(in_channels, filters, kernel_size, weight_norm, batch_norm, activation, layers)
        decoders = [ ]
        # for i in range(out_channels):
        decoders.append(create_decoder(out_channels, filters, kernel_size, weight_norm, batch_norm, activation, layers))
        self.decoders = nn.Sequential(*decoders)

    def encode(self, x):
        tensors = [ ]
        indices = [ ]
        sizes = [ ]
        for encoder in self.encoder:
            x = encoder(x)
            sizes.append(x.shape)
            tensors.append(x)
            x, ind = F.max_pool2d(x, 2, 2, return_mask=True)
            indices.append(ind)
        return x, tensors, indices, sizes

    def decode(self, _x, _tensors, _indices, _sizes):
        y = [ ]
        for _decoder in self.decoders:
            x = _x
            tensors = _tensors[:]
            indices = _indices[:]
            sizes = _sizes[:]
            for decoder in _decoder:
                tensor = tensors.pop()
                size = sizes.pop()
                ind = indices.pop()
                # 反池化操作,為上采樣
                x = F.max_unpool2d(x, ind, 2, 2, output_size=size)
                x = paddle.concat([tensor, x], axis=1)
                x = decoder(x)
            y.append(x)
        return paddle.concat(y, axis=1)

    def forward(self, x):
        x, tensors, indices, sizes = self.encode(x)
        x = self.decode(x, tensors, indices, sizes)
        if self.final_activation is not None:
            x = self.final_activation(x)
        return x           

▎訓練

訓練時輸入資料為上一時刻的污染物雲圖,輸出為預測的下一時刻的污染物雲圖。目前的訓練 batch-size 為1,即隻預測下一時刻的污染物擴散情況。訓練時,每10個 epoch 儲存一次模型,防止訓練意外中斷時模型參數丢失。

# 訓練方法
def train(model, train_dataset, criterion, optimizer, device, num_epochs):
    loss_history = [ ]
    epoch_loss = 0
# 周遊批次
    for epoch in range(num_epochs):
        optimizer.clear_grad()
        for batch_id in range(len(train_dataset)-1):
            inputs = train_dataset[batch_id]
            outputs_true = train_dataset[batch_id+1]

            inputs = T.ToTensor()(inputs)
            inputs = paddle.unsqueeze(inputs, 0)
            outputs_true = T.ToTensor()(outputs_true)
            outputs_true = paddle.unsqueeze(outputs_true, 0)

# 訓練
            outputs = model(inputs)
# 計算損失值
            loss = criterion(outputs, outputs_true)
            if batch_id % 10 ==0:
                print('epoch:',epoch,'batch_id:',batch_id,'loss:',loss.numpy())
            loss.backward()

            epoch_loss += loss.item()
        optimizer.step()
        epoch_loss /= len(train_dataset)


        loss_history.append(epoch_loss)
        print("Epoch [{}/{}], Loss: {:.8f}".format(epoch + 1, num_epochs, loss.numpy()[0]))

    # 儲存模型
        if epoch % 10 == 0:
            save_model(model, '/home/aistudio/pollution_model.pdparams')

    print("Training complete.")
return loss_history           

▎預測

預測時,輸入測試資料某時刻的污染物擴散雲圖,預測下一時刻的污染物擴散情況。測試函數中 supervise 這個 flag 為後續連續預測多個時刻的資料預置了接口。目前 supervise 置為 true,當模型預備連續預測多個時刻資料時,測試時将 supervise 置為 false。

def test():
    # 初始化結果清單
    results = [ ]

    # 測試集合起始點
    inputs = test_dataset[0]
    inputs = T.ToTensor()(inputs)
    inputs = paddle.unsqueeze(inputs, 0)

    # 是否supervise
    flag_supervise = True

    device = paddle.set_device('gpu' if paddle.is_compiled_with_cuda() else 'cpu')
    # 加載模型
    model = UNetEx(3,3,3)
    load_model(model,'/home/aistudio/pollution_model.pdparams',device)

    for num in range(1,10):

        # 進行預測
        outputs = model(inputs)

        outputs_np = outputs.numpy()
        outputs_np = np.squeeze(outputs_np, axis=0)  # 去除第一個次元(batch_size)
        outputs_np = np.transpose(outputs_np, (1, 2, 0))  # 将通道次元調整為最後一個次元
        outputs_np = (255 * np.clip(outputs_np, 0, 1)).astype('uint8')
        #outputs_np = outputs_np.transpose([1, 2, 0])
        #outputs_np_uint8 = (outputs_np * 255).astype(np.uint8)
        # 将預測結果添加到結果清單中
        results.append(outputs_np)

        if flag_supervise == False:
            # 将預測結果作為下一幀的輸入
            inputs = outputs
        else:
            # 使用真實資料預測
            inputs = test_dataset[num+1]
            inputs = T.ToTensor()(inputs)
            inputs = paddle.unsqueeze(inputs, 0)

    return results

results = test()           

04

項目成果

基于資料驅動 U-Net 模型的大氣污染物擴散快速預測

圖3 計算函數損失值

基于資料驅動 U-Net 模型的大氣污染物擴散快速預測

圖4 對比 CFD 模拟參數對比

基于資料驅動 U-Net 模型的大氣污染物擴散快速預測

圖5 殘內插補點對比

如圖5所示,濃度誤差主要集中在污染源附近(如圖紅色框),主要數值分布在-0.02~0.02之間。不同顔色分别代表不同濃度區間誤差,藍色表示的低濃度相對誤差較小,綠色紅色表示的中高濃度誤差平均誤差較高,綠色區域表征的中等濃度區域,偏大的誤差影響的面積較大。

基于資料驅動 U-Net 模型的大氣污染物擴散快速預測

圖6 數值對比

05

未來發展方向

▎預測能力方面

基于前一時刻的污染物濃度雲圖,預測後十個時刻、二十個時刻,四十個時刻的污染物濃度雲圖;

嘗試用多時刻預測多時刻。

▎網絡方面

嘗試引入更先進的網絡架構,如 transformer;

對于網絡層數和每層網絡的神經元個數,嘗試進行敏感性分析和誤差分析;

嘗試引入更多種類的激活函數如 tanh,silu 等;

嘗試對 learning rate、batch size 等超參數進行調整實驗。

▎實體原理方面

嘗試引入實體先驗知識,對建築、邊界位置施加 loss 軟限制;

嘗試利用流體 NS 方程對模型進行修正。

▎模型方面

嘗試引入更多參數作為輸入:如污染源位置、污染源初始濃度等提高模型的适應能力;

增加模型參數量級,探索大模型對複雜多态問題的處理能力;

嘗試和傳統流體求解方法進行融合。

06

項目意義與心得

本項目嘗試用 U-Net 網絡通過污染物擴散雲圖來學習污染物擴散的模型參數,對污染物擴散進行快速預測,是資料驅動計算場景拓展的一次探索。從項目結果來看,模型計算速度相比 CFD 模拟提升明顯,但是模型預測的效果還有待提升,未來将通過探索以上幾個方向,不斷優化模型預測效果。項目實作過程中,我們花費了大量的時間處理背景存在抖動的圖像,直到後來發現有一部分資料集的品質要遠遠好于另一部分,我們選擇放棄品質不好的資料,進而加快了項目的進展。

資料處理過程中有以下幾個方面的心得。

第一,對項目的資料應該第一時間進行全局探索,了解資料的全貌,對資料品質進行評估;

第二,與其花費大量的時間處理品質不好的資料,不如先使用品質較好的資料,優先做對模型取得進展更加關鍵的事情;

第三,相對于改變模型的結構,提高輸入資料的品質對模型的訓練結果起到更加積極的作用。一些開源模型的效果無法複現的原因在于訓練資料的不公開,即便大家都用到同樣結構的網絡,但是訓練資料不同,模型取得的效果就大不相同。從這個角度看,模型參數是訓練資料在網絡上留下的壓縮資訊,訓練資料存在的瑕疵很難通過優化網絡來解決。

飛槳 AI for Science 共創計劃為本項目提供了強大的技術支援,打造活躍的前瞻性的 AI for Science 開源社群,通過飛槳 AI for Science 共創計劃,學習到了如何在飛槳平台上使用科學計算的 AI 方法去解決 CFD 模拟預測的問題,并且大幅度提高了資料驅動計算的速度。相信未來會有越來越多的項目通過 AI for Science 共創計劃建立産學研閉環,推動科研創新與産業賦能。

■ 相關位址

飛槳 AI for Science 共創計劃

飛槳 PPSIG-Science 小組

飛槳 PaddleScience 工具元件

​​​​

繼續閱讀