天天看點

PET-AI解讀 | rs-fMRI的GNN和TCN模組化(圖建構,時間序列歸一化)

  • 相關論文:A deep graph neural network architecture for modelling spatio-temporal dynamics in resting-state functional MRI data
  • 相關repo:​​github.com/tjiagoM/spa…​​
  • 筆記人:陳亦新

這個是代碼解讀的第一部分。主要是資料加載的了解。包含下面三個部分:

  • 時間序列資料如何變成graph
  • graph如何轉化成無向圖和稀疏圖的
  • 時間序列如何進行歸一化

資料加載

我們主要從UKB資料中看看對資料的處理線索:

class UKBDataset(BrainDataset):
    def __init__(self,...):

    @property
    def processed_file_names(self):
        return ['data_ukb_brain.dataset']

    def __create_data_object(self,...):

    def process(self):      

process函數調用了__create_data_object函數,是很規範的書寫方式,贊一個。我們按照process内的部分來盤邏輯。

def process(self):
    ...
    conn_measure = ConnectivityMeasure(
            kind='correlation',
            vectorize=False)
    ...      

這個ConnectivityMeasure是從nilearn.connectome包中引入的方法。

【nilearn包介紹】

這是一個用可以讓機器學習、模式識别和多元統計更容易的應用在神經成像的資料上。可以很容易的應用在任務态和靜息态、VBM資料。此外nilearn具有繪圖功能,可以可視化神經成像等等。

ConnectivityMeasure這個方法就是建構相關性矩陣。

def process(self):
    ...
    conn_measure = ConnectivityMeasure(
            kind='correlation',
            vectorize=False)
    ...
    corr_tmp = conn_measure.fit_transform([ts])
    ...      

corr_tmp就是相關性矩陣,然後通過create_thresholded_graph生成graph。

def threshold_adj_array(adj_array: np.ndarray, threshold: int, num_nodes: int) -> np.ndarray:
    num_to_filter: int = int((threshold / 100.0) * (num_nodes * (num_nodes - 1) / 2))
    # 先将下三角的資料變成0,也包含對角線上的
    adj_array[np.tril_indices(num_nodes)] = 0
    # 擷取非0元素的坐标索引
    indices = np.where(adj_array)
    #從array中提取非0元素,并且排序,然後變成從大到小排序
    sorted_indices = np.argsort(adj_array[indices])[::-1]
    # 然後選擇num_to_filter個最大的數字保留,其他置0
    adj_array[(indices[0][sorted_indices][num_to_filter:], indices[1][sorted_indices][num_to_filter:])] = 0
    # 重新建構成無向圖對稱
    adj_array = adj_array + adj_array.T

    # 将對角線上的1補回來。
    adj_array[np.diag_indices(num_nodes)] = 1.0

    return adj_array      

這個過程比較有趣。我在上面寫了詳細的注釋,總結就是,先去除一般的相關性,因為要建構無向圖。然後去排序,隻保留相關性最大的num_to_filter的邊。

這個graph通過這樣的方法轉換成networkx的格式:

graph = nx.from_numpy_array(adj_array, create_using=nx.DiGraph)      

然後:

edge_index = torch.tensor(np.array(graph.edges()), dtype=torch.long).t().contiguous()      

轉化成下圖的edge索引形式:

PET-AI解讀 | rs-fMRI的GNN和TCN模組化(圖建構,時間序列歸一化)

然後

edge_attr = torch.tensor(list(nx.get_edge_attributes(graph, 'weight').values()),
                                                 dtype=torch.float).unsqueeze(1)      

edge_attr就是一個value的清單,剛好和上面edge_index對應起來:

PET-AI解讀 | rs-fMRI的GNN和TCN模組化(圖建構,時間序列歸一化)

然後,我們需要将所有我們得到的東西,放到這個函數當中:

data = self.__create_data_object(person=person, ts=ts, edge_index=edge_index, edge_attr=edge_attr,
                                                 covars=main_covars)      

其中:

  • person應該就是一個字元串;
  • ts就是對應person的時間序列特征;
  • edge_index和edge_attr就是圖中非0值的索引和數值;
  • main_covars不知道是啥,好像是一個dataframe
PET-AI解讀 | rs-fMRI的GNN和TCN模組化(圖建構,時間序列歸一化)

我們來看__create_data_object函數的邏輯:

def __create_data_object(self, person: int, ts: np.ndarray, covars: pd.DataFrame, edge_attr: torch.Tensor,
                             edge_index: torch.Tensor):
        assert ts.shape[0] > ts.shape[1]  # TS > N

        timeseries = normalise_timeseries(timeseries=ts, normalisation=self.normalisation)

        if self.encoding_strategy == EncodingStrategy.STATS:
            assert timeseries.shape == (self.num_nodes, self.time_length)
            timeseries = calculate_stats_features(timeseries)
            assert timeseries.shape == (self.num_nodes, 16)
            timeseries[np.isnan(timeseries)] = 0
            assert not np.isnan(timeseries).any()

        if self.analysis_type in [AnalysisType.ST_UNIMODAL, AnalysisType.ST_UNIMODAL_AVG]:
            x = torch.tensor(timeseries, dtype=torch.float)

        if self.target_var == 'gender':
            y = torch.tensor([covars.loc[person, 'Sex']], dtype=torch.float)
        elif self.target_var == 'bmi':
            y = torch.tensor([covars.loc[person, 'BMI.at.scan']], dtype=torch.float)
        else:
            y = torch.tensor([covars.loc[person, 'Age.at.scan']], dtype=torch.float)

        data = Data(x=x, edge_index=edge_index, edge_attr=edge_attr, y=y)
        data.ukb_id = torch.tensor([person])

        if self.target_var == 'gender':
            data.age = torch.tensor([covars.loc[person, 'Age.at.scan']])
            data.bmi = torch.tensor([covars.loc[person, 'BMI.at.scan']])
        elif self.target_var == 'bmi':
            data.sex = torch.tensor([covars.loc[person, 'Sex']])
            data.age = torch.tensor([covars.loc[person, 'Age.at.scan']])
        else:
            data.sex = torch.tensor([covars.loc[person, 'Sex']])
            data.bmi = torch.tensor([covars.loc[person, 'BMI.at.scan']])

        return data      

對于時間序列,先繼續歸一化,采用的歸一化方式為:Normalisation('subject_norm')

def normalise_timeseries(timeseries: np.ndarray, normalisation: Normalisation) -> np.ndarray:
    """
    :param normalisation:
    :param timeseries: In  format TS x N
    :return:
    """
    if normalisation == Normalisation.ROI:
        scaler = RobustScaler().fit(timeseries)
        timeseries = scaler.transform(timeseries).T
    elif normalisation == Normalisation.SUBJECT:
        flatten_timeseries = timeseries.flatten().reshape(-1, 1)
        scaler = RobustScaler().fit(flatten_timeseries)
        timeseries = scaler.transform(flatten_timeseries).reshape(timeseries.shape).T
    else:  # No normalisation
        timeseries = timeseries.T

    return timeseries      

不管怎麼說,我們把ROI歸一化和SUBJECT歸一化都了解一下。

【ROI歸一化】

RobustScaler來自sklearn.preprocessing包,是對異常值魯棒的統計資訊來縮放特征。減去中值,除以分位數的範圍(預設是四分位數)對資料進行縮放。IQR四分位數範圍為第1個四分位數和第3個四分位數之間的範圍。資料集的标準化是一個常見的需求,通常使用平均值和機關方差來實作,但是異常值往往會對樣本均值方法産生負面影響,是以這個時候使用中位數和IQR的效果會更好。

計算公式如下:

【SUBJECT歸一化】

仔細看了一下,發現其實也是上賣弄的RobustScaler方法。差別在于,ROI歸一化的資料ts是TSxN的資料。N是ROI的數目,TS是時間序列的範圍。是以ROI歸一化是對每一個ROI區域的時間序列進行歸一化。SUBJECT則是将所有的ROI都flatten,對整體的資料做一個歸一化。好處我猜測是因為,這樣可以保留不同ROI區域時間序列之間的整體偏差,保留ROI的整體差異性。

  • 時間序列
  • 圖的索引
  • 圖的值

繼續閱讀