天天看點

CNTK API文檔翻譯(11)——使用LSTM預測時間序列資料(物聯網資料)

在上一期我們開發了一個簡單的LSTM神經網絡來預測時序資料的值。在本期我們要把這模型用在真實世界的物聯網資料上。作為示例,我們會根據之前幾天觀測到的資料預測太陽能電池闆的日産電量。

太陽能發電量預測是一個重要且艱難的問題。太陽能産電量的預測還與天氣預測密切相關。實際上,這個問題分為兩部分,第一部分,我們需要關注太陽能光強度或者其他氣象的變量,另一方面我們需要計算在預測的天氣狀況下太陽能電池闆的産電量。通常來說,這種複雜問題的處理經常會涉及到空間和時間尺度的變化,本教程僅僅簡單的通過之前太陽能電池闆生成的電量資料預測之後的資料。

目标

通過一塊太陽能電池闆以往一天産生的電量,我們需要預測全部太陽能電池闆在預測天内産生的電量。我們需要使用在上期開發的基于時間序列預測的LSTM網絡模型來根據以往的資料預測太陽能電池闆的産電量。

CNTK API文檔翻譯(11)——使用LSTM預測時間序列資料(物聯網資料)

我們使用太陽能電池闆的曆史資料據來訓練模型,在我們的示例中,我們需要使用之前的資料預測整個太陽能電池闆陣列的産電量。我們在最初的兩個資料之後就開始預測,然後與新讀取的資料進行拟合。

在本教程中我們我們會使用上期的LSTM模型,是以與上期類似,有如下幾個部分:

  • 初始化
  • 資料生成
  • LSTM模型建構
  • 模型訓練和評估

初始化

我們先導入一些庫,定義一些以後會用到的常量。

from matplotlib import pyplot as plt
import math
import numpy as np
import os
import pandas as pd
import random
import time

import cntk as C

try:
    from urllib.request import urlretrieve
except ImportError:
    from urllib import urlretrieve
           

在下面的代碼中,我們通過檢查在CNTK内部定義的環境變量來選擇正确的裝置(GPU或者CPU)來運作代碼,如果不檢查的話,會使用CNTK的預設政策來使用最好的裝置(如果GPU可用的話就使用GPU,否則使用CPU)

if 'TEST_DEVICE' in os.environ:
    if os.environ['TEST_DEVICE'] == 'cpu':
        C.device.try_set_default_device(C.device.cpu())
    else:
        C.device.try_set_default_device(C.device.gpu())
           

我們設定了兩種運作模式:

  • 快速模式:isFast變量設定成True。這是我們的預設模式,在這個模式下我們會訓練更少的次數,也會使用更少的資料,這個模式保證功能的正确性,但訓練的結果還遠遠達不到可用的要求。
  • 慢速模式:我們建議學習者在學習的時候試試将isFast變量設定成False,這會讓學習者更加了解本教程的内容。

快速模式我們會訓練100個周期,得到的結果會有比較低的精度,不過對于開發來說已經足夠了。這個模型目前比較好的表現需要訓練1000到2000個周期。

isFast = True

# we need around 2000 epochs to see good accuracy. For testing 100 epochs will do.
EPOCHS =  if isFast else 
           

我們的太陽能電池闆每三十分鐘采集一次資料:

  • solar.current表示目前的産電量,用瓦特表示。
  • solar.total是到當天的目前為止,太陽能電池闆的平均功率,用瓦特/小時表示。

我們的預測采用的資料從最初讀取的兩個資料開始。以這些資料為基礎,我們不斷的預測和把預測結果與新的真實值拟合。我們用到的資料采用csv格式,其形式如下:

time,solar.current,solar.total

7am,6.3,1.7

7:30am,44.3,11.4

我們使用的訓練資料集包含了三年的資料,可以從這裡下載下傳:https://guschmueds.blob.core.windows.net/datasets/solar.csv。資料沒有預處理,有些行的資料有些确實和錯誤。

預處理

在本示例中大部分代碼都是用來資料預處理的。感謝pandas庫讓資料預處理工作變得非常簡單。

generate_solar_data函數執行了如下任務:

  • 以pandas資料幀的方式讀取CSV資料
  • 格式化資料
  • 資料分組
  • 添加solar.current.max列和solar.total.max列。
  • 生成每天的資料序列

序列生成:所有的序列都會被構成一系列的序列,在這裡再也沒有時間戳的概念,隻有序列才是要緊的。

注:如果一天的資料量少于8個,我們會假設當天的資料丢失了,就跳過這天的資料。如果一天的資料多于14條,我們會截取為14條。

訓練/測試/驗證

我們從使用CNTK讀取CSV檔案開始,資料以一行行組織,通過時間排列,我們需要随機将這些資料分割成訓練資料集、驗證資料集和測試資料集,但是這樣的分割會讓資料與真實情況不符。是以,我們我們使用如下方式分割資料:把一天的資料讀取進序列,其中八個資料作為訓練資料,一個是驗證資料,一個是測試資料,這就可以讓訓練資料、驗證資料和測試資料分布在整個時間線上。

def generate_solar_data(input_url, time_steps, normalize=, val_size=, test_size=):
    """
    generate sequences to feed to rnn based on data frame with solar panel data
    the csv has the format: time ,solar.current, solar.total
     (solar.current is the current output in Watt, solar.total is the total production
      for the day so far in Watt hours)
    """
    # try to find the data file local. If it doesn't exists download it.
    cache_path = os.path.join("data", "iot")
    cache_file = os.path.join(cache_path, "solar.csv")
    if not os.path.exists(cache_path):
        os.makedirs(cache_path)
    if not os.path.exists(cache_file):
        urlretrieve(input_url, cache_file)
        print("downloaded data successfully from ", input_url)
    else:
        print("using cache for ", input_url)

    df = pd.read_csv(cache_file, index_col="time", parse_dates=['time'], dtype=np.float32)

    df["date"] = df.index.date

    # normalize data
    df['solar.current'] /= normalize
    df['solar.total'] /= normalize

    # group by day, find the max for a day and add a new column .max
    grouped = df.groupby(df.index.date).max()
    grouped.columns = ["solar.current.max", "solar.total.max", "date"]

    # merge continuous readings and daily max values into a single frame
    df_merged = pd.merge(df, grouped, right_index=True, on="date")
    df_merged = df_merged[["solar.current", "solar.total",
                           "solar.current.max", "solar.total.max"]]
    # we group by day so we can process a day at a time.
    grouped = df_merged.groupby(df_merged.index.date)
    per_day = []
    for _, group in grouped:
        per_day.append(group)

    # split the dataset into train, validatation and test sets on day boundaries
    val_size = int(len(per_day) * val_size)
    test_size = int(len(per_day) * test_size)
    next_val = 
    next_test = 

    result_x = {"train": [], "val": [], "test": []}
    result_y = {"train": [], "val": [], "test": []}    

    # generate sequences a day at a time
    for i, day in enumerate(per_day):
        # if we have less than 8 datapoints for a day we skip over the
        # day assuming something is missing in the raw data
        total = day["solar.total"].values
        if len(total) < :
            continue
        if i >= next_val:
            current_set = "val"
            next_val = i + int(len(per_day) / val_size)
        elif i >= next_test:
            current_set = "test"
            next_test = i + int(len(per_day) / test_size)
        else:
            current_set = "train"
        max_total_for_day = np.array(day["solar.total.max"].values[])
        for j in range(, len(total)):
            result_x[current_set].append(total[:j])
            result_y[current_set].append([max_total_for_day])
            if j >= time_steps:
                break
    # make result_y a numpy array
    for ds in ["train", "val", "test"]:
        result_y[ds] = np.array(result_y[ds])
    return result_x, result_y
           

資料緩存

對于正常測試,我們希望使用本地緩存的資料。如果緩存不能用,我們就需要去下載下傳。

# there are 14 lstm cells, 1 for each possible reading we get per day
TIMESTEPS = 

# 20000 is the maximum total output in our dataset. We normalize all values with 
# this so our inputs are between 0.0 and 1.0 range.
NORMALIZE = 

X, Y = generate_solar_data("https://www.cntk.ai/jup/dat/solar.csv", 
                           TIMESTEPS, normalize=NORMALIZE)
           

擷取實時資料

next_batch産生下一次用于訓練的資料集。我們在CNTK中使用的變長序列和取樣包都是一系列的numpy數組,這些數組都是變長的。

标準的做法是每個訓練周期都随機抽取資料包,但是我們不這麼幹,為了以後資料可視化的時候易于了解。

# process batches of 10 days
BATCH_SIZE = TIMESTEPS * 

def next_batch(x, y, ds):
    """get the next batch for training"""

    def as_batch(data, start, count):
        return data[start:start + count]

    for i in range(, len(x[ds]), BATCH_SIZE):
        yield as_batch(X[ds], i, BATCH_SIZE), as_batch(Y[ds], i, BATCH_SIZE)
           

了解資料格式

現在你可以看到我們輸進LSTM神經網絡的資料了。

X['train'][:]
           

輸出:

[array([        ,  ], dtype=float32),
 array([        ,  ,  ], dtype=float32),
 array([        ,  ,  ,   ], dtype=float32)]
           
Y['train'][:]
           

輸出

array([[ 0.23899999],
       [ 0.23899999],
       [ 0.23899999]], dtype=float32)
           

LSTM神經網絡初始化

與最多14個輸入資料相對應,我們的模型有14個LSTM單元,每個單元代表當天的每個輸入資料點。由于輸入的資料在8到14之間變動,我們就可以利用CNTK可變長序列的優勢,而不用專門去填充空白的輸入單元。

神經網絡的輸出值是某天的輸出電量值,給定的每個序列具有一樣的總輸出電量。打個比方:

, -> 
,, -> 
,,, ... -> 
,,,, -> 
           

LSTM的輸出值作為全連接配接網絡層的輸入值,當然其中會被随機丢掉百分之二十,來避免過度拟合。全連接配接網絡層的輸出值就是我們這個模型的預測值了。

我們的LSTM模型設計如下:

CNTK API文檔翻譯(11)——使用LSTM預測時間序列資料(物聯網資料)

下面的代碼就是對上圖的實作:

def create_model(x):
    """Create the model for time series prediction"""
    with C.layers.default_options(initial_state = ):
        m = C.layers.Recurrence(C.layers.LSTM(TIMESTEPS))(x)
        m = C.sequence.last(m)
        m = C.layers.Dropout()(m)
        m = C.layers.Dense()(m)
        return m
           

訓練

在我們訓練之前我們需要綁定輸入資料,標明訓練器。在本示例中我們使用adam訓練器,squared_error作為成本函數。

# input sequences
x = C.sequence.input_variable()

# create the model
z = create_model(x)

# expected output (label), also the dynamic axes of the model output
# is specified as the model of the label input
l = C.input_variable(, dynamic_axes=z.dynamic_axes, name="y")

# the learning rate
learning_rate = 
lr_schedule = C.learning_rate_schedule(learning_rate, C.UnitType.minibatch)

# loss function
loss = C.squared_error(z, l)

# use squared error to determine error for now
error = C.squared_error(z, l)

# use adam optimizer
momentum_time_constant = C.momentum_as_time_constant_schedule(BATCH_SIZE / -math.log()) 
learner = C.fsadagrad(z.parameters, 
                      lr = lr_schedule, 
                      momentum = momentum_time_constant)
trainer = C.Trainer(z, (loss, error), [learner])


# training
loss_summary = []

start = time.time()
for epoch in range(, EPOCHS):
    for x_batch, l_batch in next_batch(X, Y, "train"):
        trainer.train_minibatch({x: x_batch, l: l_batch})

    if epoch % (EPOCHS / ) == :
        training_loss = trainer.previous_minibatch_loss_average
        loss_summary.append(training_loss)
        print("epoch: {}, loss: {:.4f}".format(epoch, training_loss))

print("Training took {:.1f} sec".format(time.time() - start))
           

結果:

epoch: , loss: 
epoch: , loss: 
epoch: , loss: 
epoch: , loss: 
epoch: , loss: 
epoch: , loss: 
epoch: , loss: 
epoch: , loss: 
epoch: , loss: 
epoch: , loss: 
Training took  sec
           

然後我們測試和驗證。我們使用均方誤差成本函數可能稍微有點簡單,有個辦法,我們可以計算在誤差允許範圍内的比率。

# validate
def get_mse(X,Y,labeltxt):
    result = 
    for x1, y1 in next_batch(X, Y, labeltxt):
        eval_error = trainer.test_minibatch({x : x1, l : y1})
        result += eval_error
    return result/len(X[labeltxt])

# Print the train and validation errors
for labeltxt in ["train", "val"]:
    print("mse for {}: {:.6f}".format(labeltxt, get_mse(X, Y, labeltxt)))

# Print the test error
labeltxt = "test"
print("mse for {}: {:.6f}".format(labeltxt, get_mse(X, Y, labeltxt)))
           

可視化預測結果

我們的模型訓練狀況量好,因為訓練、測試和驗證的誤內插補點都在一個可控的範圍内。預測的時序資料能夠很好的可視化出來,讓我們看看實際和預測的對比。

# predict
f, a = plt.subplots(, , figsize=(, ))
for j, ds in enumerate(["val", "test"]):
    results = []
    for x_batch, _ in next_batch(X, Y, ds):
        pred = z.eval({x: x_batch})
        results.extend(pred[:, ])
    # because we normalized the input data we need to multiply the prediction
    # with SCALER to get the real values.
    a[j].plot((Y[ds] * NORMALIZE).flatten(), label=ds + ' raw');
    a[j].plot(np.array(results) * NORMALIZE, label=ds + ' pred');
    a[j].legend();
           
CNTK API文檔翻譯(11)——使用LSTM預測時間序列資料(物聯網資料)

如果訓練兩千個周期,資料會非常好看。

歡迎掃碼關注我的微信公衆号擷取最新文章

CNTK API文檔翻譯(11)——使用LSTM預測時間序列資料(物聯網資料)

繼續閱讀