
Build a Neural Network from Scratch: Writing a Simple Framework

Table of Contents

  • Build a neural network from Scratch
    • 1. Node
    • 2. Variable
    • 3. Linear
    • 4. Relu
    • 5. Sigmoid
    • 6. MSE
    • 7. Session
    • 8. main

1. Node

class Node:
    '''
        name          identifier for this node
        inputs        nodes that feed into this node
        outputs       nodes that this node feeds into
        is_trainable  whether gradient descent updates this node; only parameters
                      have is_trainable=True -- intermediate results need no update,
                      and the input/output data must never be updated
        value         value computed in the forward pass
        gradients     partial derivatives of the loss w.r.t. this node's inputs
    '''
    def __init__(self, name=None, inputs=None, is_trainable=None):
        self.inputs = inputs
        self.outputs = []
        self.name = name
        self.is_trainable = is_trainable

        # Each node is given its input nodes at construction time and registers
        # itself as an output of each of them, so every node ends up knowing
        # both its inputs and its outputs.
        if self.inputs:
            for input_node in self.inputs:
                input_node.outputs.append(self)

        self.value = None
        self.gradients = {}

    def forward(self):
        '''
        Compute this node's own value in the forward pass,
        e.g. for Linear: self.value = weight * x + bias.
        '''
        raise NotImplementedError

    def backward(self):
        '''
        The partial derivative of the loss w.r.t. a parameter is computed by the
        output node that the parameter points to. For x -k-> output:

            partial loss / partial k
                = (partial loss / partial output) * (partial output / partial k)

        where partial loss / partial output was already computed before
        backpropagation reached this node; when backpropagation reaches the
        output node, that node computes partial output / partial k and stores
        it on itself.
        '''
        raise NotImplementedError

    def __repr__(self):
        return self.name

    # the Python analogue of toString()
    def __str__(self):
        return self.name
           
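The constructor does the graph wiring automatically: a node receives its input nodes and appends itself to each input's outputs list, so the graph structure is known as soon as the nodes are defined. A minimal sketch of that bookkeeping, assuming the Node class above is importable:

a = Node(name="a")
b = Node(name="b", inputs=[a])

print(a.outputs)  # [b] -- b registered itself as an output of a
print(b.inputs)   # [a]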

2. Variable

from selfMadeNeutralNetwork.Node import Node
'''
    A Variable represents quantities such as k, b, and x;
    in the graph these are the leaf nodes.
'''
class Variable(Node):

    def __init__(self, name=None, is_trainable=True):
        super().__init__(name=name, is_trainable=is_trainable)

    def forward(self, value=None):
        # test against None rather than truthiness, so a value of 0 is kept
        if value is not None:
            self.value = value

    def backward(self):
        # sum the loss gradient over every consumer of this variable
        # (with a single consumer this is a straight copy)
        self.gradients[self] = sum(output.gradients[self] for output in self.outputs)
           
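One detail worth calling out: forward only overwrites value when a value is actually passed, and the test has to be value is not None rather than a bare truthiness check, otherwise a legitimate input of 0 would be silently dropped. A tiny check, assuming the module layout above:

x = Variable(name="x", is_trainable=False)
x.forward(0.0)
print(x.value)  # 0.0 -- a plain `if value:` test would have left this as None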

3. Linear

from selfMadeNeutralNetwork.Node import Node
import numpy as np

class Linear(Node):

    def __init__(self, x=None, weight=None, bias=None, name=None, is_trainable=False):
        super().__init__(inputs=[x, weight, bias], name=name, is_trainable=is_trainable)

    def forward(self):
        k, x, b = self.inputs[1].value, self.inputs[0].value, self.inputs[2].value
        self.value = np.dot(k, x) + b

    def backward(self):
        k, x, b = self.inputs[1], self.inputs[0], self.inputs[2]
        # accumulate over every consumer (here there is exactly one)
        self.gradients[k] = self.gradients[x] = self.gradients[b] = 0

        for output in self.outputs:
            # partial loss / partial self, stored on the consumer during its backward pass
            loss_for_linear_gradient = output.gradients[self]

            self.gradients[k] += np.dot(loss_for_linear_gradient, x.value)  # d(kx+b)/dk = x
            self.gradients[x] += np.dot(loss_for_linear_gradient, k.value)  # d(kx+b)/dx = k
            self.gradients[b] += loss_for_linear_gradient * 1               # d(kx+b)/db = 1
           
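The hand-derived local gradients of the linear node (d(kx+b)/dk = x, d(kx+b)/dx = k, d(kx+b)/db = 1) can be sanity-checked against a finite-difference estimate; a standalone sketch:

import numpy as np

k, x, b = 2.0, 3.0, 1.0
f = lambda k: k * x + b
eps = 1e-6
numeric = (f(k + eps) - f(k - eps)) / (2 * eps)
print(numeric, x)  # both ~3.0: the derivative w.r.t. k is x, as used in backward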

4. Relu

from selfMadeNeutralNetwork.Node import Node
import numpy as np

class Relu(Node):

    def __init__(self, x=None, name=None, is_trainable=False):
        super().__init__(inputs=[x], name=name, is_trainable=is_trainable)
        self.x = x

    def forward(self):
        # ReLU: element-wise max(0, x)
        self.value = np.maximum(0, self.x.value)

    def backward(self):
        self.gradients[self.x] = 0
        for output in self.outputs:
            grad_cost = output.gradients[self]
            # the gradient passes through where x > 0 and is blocked elsewhere
            self.gradients[self.x] += grad_cost * (self.x.value > 0)
           
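Forward and backward of ReLU in isolation: the forward pass clips negatives to zero, and the local gradient is the indicator x > 0, which is exactly what backward multiplies the upstream gradient by. A standalone sketch:

import numpy as np

x = np.array([-2.0, 0.0, 3.0])
print(np.maximum(0, x))        # [0. 0. 3.]  -- forward
print((x > 0).astype(float))   # [0. 0. 1.]  -- local gradient used in backward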

5. Sigmoid

from selfMadeNeutralNetwork.Node import Node
import numpy as np

class Sigmoid(Node):

    def __init__(self, x=None, name=None, is_trainable=False):
        super().__init__(inputs=[x], name=name, is_trainable=is_trainable)
        self.x = self.inputs[0]

    def _sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    def forward(self):
        self.value = self._sigmoid(self.x.value)

    def partial(self):
        # derivative of the sigmoid: s(x) * (1 - s(x))
        s = self._sigmoid(self.x.value)
        return s * (1 - s)

    def backward(self):
        self.gradients[self.x] = 0
        for output in self.outputs:
            # partial loss / partial self, stored on the consumer
            loss_for_sigmoid_gradient = output.gradients[self]
            self.gradients[self.x] += loss_for_sigmoid_gradient * self.partial()
           
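partial relies on the identity sigmoid'(x) = sigmoid(x) * (1 - sigmoid(x)); a quick numerical confirmation:

import numpy as np

sigmoid = lambda x: 1 / (1 + np.exp(-x))
x, eps = 0.5, 1e-6
numeric = (sigmoid(x + eps) - sigmoid(x - eps)) / (2 * eps)
analytic = sigmoid(x) * (1 - sigmoid(x))
print(numeric, analytic)  # agree to ~1e-10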

6. MSE

from selfMadeNeutralNetwork.Node import Node
import numpy as np

class MSE(Node):
    
    def __init__(self,y=None,yhat=None,name=None,is_trainable=False):
        super().__init__(inputs=[y,yhat],name=name,is_trainable=is_trainable)
        self.y=y
        self.yhat=yhat
    
    def forward(self):
        y_v = np.array(self.y.value)
        yhat_v = np.array(self.yhat.value)
        # mean squared error between the targets and the predictions
        self.value = np.mean((y_v - yhat_v) ** 2)

    def backward(self):
        y_v = np.array(self.y.value)
        yhat_v = np.array(self.yhat.value)
        # d mean((y - yhat)^2) / dy    =  2 * mean(y - yhat)
        # d mean((y - yhat)^2) / dyhat = -2 * mean(y - yhat)
        self.gradients[self.y] = 2 * np.mean(y_v - yhat_v)
        self.gradients[self.yhat] = -2 * np.mean(y_v - yhat_v)
           
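For the single-sample (scalar) case this network trains on, np.mean is the identity and the gradient w.r.t. the prediction reduces to -2(y - yhat); a finite-difference check:

import numpy as np

y, yhat = 3.0, 2.5
f = lambda yhat: np.mean((y - yhat) ** 2)
eps = 1e-6
numeric = (f(yhat + eps) - f(yhat - eps)) / (2 * eps)
print(numeric, -2 * (y - yhat))  # both -1.0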

7. Session

from collections import defaultdict

import networkx as nx
from selfMadeNeutralNetwork.Variable import Variable

'''
graph         the computation graph (node -> list of output nodes)
valid_order   nodes in topologically sorted order
X_node        node the input data is fed into
y_node        node holding the target values
yhat          prediction node
cost_history  history of recorded cost values
'''
class Session:

    def __init__(self, session_result=None):
        self.graph = None
        self.valid_order = None
        self.X_node = session_result[0]
        self.y_node = session_result[1]
        self.yhat = session_result[2]
        self.cost_history = []

    def feed(self, feed_dict):
        self.graph = self.convert_feed_dict_to_graph(feed_dict)
        nx_graph = nx.DiGraph(self.graph)
        self.valid_order = list(nx.topological_sort(nx_graph))

    def optimize(self, learning_rate=1e-3):
        '''
        One step of gradient descent: move every trainable node
        against the gradient of the loss with respect to it.
        '''
        for node in self.graph:
            if node.is_trainable:
                node.value = node.value + (-1) * node.gradients[node] * learning_rate
    
    def forward(self):
        for node in self.valid_order:
            node.forward()
    
    def backward(self):
        for node in self.valid_order[::-1]:
            node.backward()
    
    def run_one_epoch(self):
        '''
        One forward pass followed by one backward pass.
        '''
        self.forward()
        self.backward()

    def convert_feed_dict_to_graph(self, feed_dict):
        '''
        feed_dict = {
            X_node: X_rm,
            y_node: y,
            w1: w1_,
            w2: w2_,
            b1: b1_,
            b2: b2_
        }
        feed_dict holds only the initialised parameters and the input data.
        Note that every node's outputs were already wired up when the nodes
        were defined: defining the variables is itself the graph-building step.

        Returns a dict of the form
        {
            node1: [outputs],
            node2: [outputs],
            node3: [outputs],
        }
        '''
        computing_graph = defaultdict(list)
        # take the node objects (references) out of the feed_dict
        nodes = [n for n in feed_dict]
        
        while nodes:
            n = nodes.pop(0)
            if isinstance(n, Variable):
                n.value=feed_dict[n]
            
            if n in computing_graph:
                continue
            
            for m in n.outputs:
                computing_graph[n].append(m)
                nodes.append(m)
    
        return computing_graph

    def predict(self,predict_x):
        self.X_node.value=predict_x
        self.run_one_epoch()
        return self.yhat.value
           
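feed delegates the ordering problem to networkx: topological_sort guarantees that every node appears after all of its inputs, so forward can simply evaluate nodes left to right and backward can walk the same order reversed. A minimal sketch with string stand-ins for the nodes:

import networkx as nx

g = nx.DiGraph({"x": ["linear"], "w": ["linear"], "b": ["linear"],
                "linear": ["sigmoid"], "sigmoid": ["cost"]})
print(list(nx.topological_sort(g)))
# e.g. ['x', 'w', 'b', 'linear', 'sigmoid', 'cost'] -- inputs always precede consumers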

8. main

from sklearn.datasets import load_boston
from tqdm import tqdm

import numpy as np
from matplotlib import pyplot as plt
from selfMadeNeutralNetwork.Session import Session
from selfMadeNeutralNetwork.Linear import Linear
from selfMadeNeutralNetwork.MSE import MSE
from selfMadeNeutralNetwork.Sigmoid import Sigmoid
from selfMadeNeutralNetwork.Variable import Variable

def show_loss_history(loss_history):
    plt.plot(loss_history)
    plt.show()

def main():
    data = load_boston()
    X, y = data["data"], data["target"]
    # average number of rooms per dwelling (the RM feature)
    X_rm = X[:, 5]
    # randomly initialised parameters
    w1_, b1_ = np.random.normal(), np.random.normal()
    w2_, b2_ = np.random.normal(), np.random.normal()
    
    X_node,y_node = Variable(name="X"),Variable(name="y")
    w1,b1 = Variable(name="w1"),Variable(name="b1")
    w2,b2 = Variable(name="w2"),Variable(name="b2")
    
    output1 = Linear(x=X_node,weight=w1,bias=b1,name="Linear-01")
    output2 = Sigmoid(x=output1,name="activation")
    yhat = Linear(x=output2,weight=w2,bias=b2,name="yhat")
    cost = MSE(y=y_node,yhat=yhat,name="cost")
    
    feed_dict={
        X_node:X_rm,
        y_node:y,
        w1:w1_,
        w2:w2_,
        b1:b1_,
        b2:b2_
    }
    session_result=[X_node,y_node,yhat]
    session = Session(session_result)
    session.feed(feed_dict)
    
    # total number of epochs
    epoch = 1000
    # number of (single-sample) updates per epoch
    batch_num = len(X_rm)
    # learning rate
    learning_rate = 1e-3
    loss_history = []

    for i in tqdm(range(epoch)):
        loss = 0
        for _ in range(batch_num):
            # draw one random sample from X_rm each step
            index = np.random.choice(range(len(X_rm)))
            X_node.value = X_rm[index]
            y_node.value = y[index]

            session.run_one_epoch()
            session.optimize(learning_rate)
            loss += cost.value
        loss_history.append(loss / batch_num)
        if i % 100 == 0:
            print("Epoch:{},Loss:{:.3f}".format(i + 1, loss / batch_num))
    
    show_loss_history(loss_history)
    print(session.predict(6))

if __name__=="__main__":
    main()
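Note: load_boston was deprecated in scikit-learn 1.0 and removed in 1.2, so on recent versions the import at the top of this file will fail. A drop-in replacement, assuming the original CMU data source is reachable (this mirrors the loader suggested in scikit-learn's deprecation notice):

import numpy as np
import pandas as pd

raw = pd.read_csv("http://lib.stat.cmu.edu/datasets/boston",
                  sep=r"\s+", skiprows=22, header=None)
X = np.hstack([raw.values[::2, :], raw.values[1::2, :2]])
y = raw.values[1::2, 2]
# X[:, 5] is still the RM column used above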