Build a Neural Network from Scratch
文章目錄
- Build a neural network from Scratch
-
- 1. Node
- 2. Variable
- 3. Linear
- 4. Relu
- 5. Sigmoid
- 6. MSE
- 7. Session
- 8. main
1. Node
class Node:
    """Base class for a node in the computation graph.

    Attributes:
        name: human-readable identifier, returned by __repr__/__str__.
        inputs: nodes feeding into this node (None or a list of Node).
        outputs: nodes this node feeds into; filled in automatically when
            a downstream node is constructed with this node as an input.
        is_trainable: whether gradient descent should update this node's
            value. Only parameters (weights/biases) are trainable;
            intermediate results and input/output data are not.
        value: result of the forward pass.
        gradients: maps each input node to d(loss)/d(input), filled in
            by the backward pass.
    """
    def __init__(self, name=None, inputs=None, is_trainable=None):
        self.inputs = inputs
        self.outputs = []
        self.name = name
        self.is_trainable = is_trainable
        # Registering this node as an output of each of its inputs builds
        # the graph edges implicitly at construction time, so every node
        # ends up knowing both its inputs and its outputs.
        if self.inputs:
            for input_node in self.inputs:
                input_node.outputs.append(self)
        self.value = None
        self.gradients = {}

    def forward(self):
        """Compute self.value from the input nodes' values.

        E.g. for a linear node: value = weight * x + bias.
        """
        raise NotImplementedError

    def backward(self):
        """Store d(loss)/d(input) for every input node in self.gradients.

        For an edge x -k-> output:
            dloss/dk = dloss/doutput * doutput/dk
        where dloss/doutput has already been computed by the time
        back-propagation reaches this node, and doutput/dk is this
        node's local derivative.
        """
        raise NotImplementedError

    def __repr__(self):
        return self.name

    def __str__(self):
        return self.name
2. Variable
from selfMadeNeutralNetwork.Node import Node
'''
Variable表示k,b,x等變量
在圖中為葉節點
'''
class Variable(Node):
    """Leaf node holding a plain value: an input (x, y) or a parameter (k, b)."""
    def __init__(self, name=None, is_trainable=True):
        super().__init__(name=name, is_trainable=is_trainable)

    def forward(self, value=None):
        # BUGFIX: the original `if value:` silently ignored legitimate
        # falsy values such as 0 or 0.0; only skip when nothing is passed.
        if value is not None:
            self.value = value

    def backward(self):
        # Sum the contributions from every consumer. (The original
        # overwrote the gradient on each iteration, which is wrong
        # whenever a variable feeds more than one node.)
        self.gradients[self] = sum(output.gradients[self] for output in self.outputs)
3. Linear
from selfMadeNeutralNetwork.Node import Node
import numpy as np
class Linear(Node):
    """Affine node: value = weight . x + bias."""
    def __init__(self, x=None, weight=None, bias=None, name=None, is_trainable=False):
        super().__init__(inputs=[x, weight, bias], name=name, is_trainable=is_trainable)

    def forward(self):
        x, weight, bias = self.inputs
        self.value = np.dot(weight.value, x.value) + bias.value

    def backward(self):
        x, weight, bias = self.inputs
        # Accumulate over all consumers. (The original assigned inside the
        # loop, so with fan-out > 1 only the last output's gradient survived.)
        self.gradients[weight] = 0
        self.gradients[x] = 0
        self.gradients[bias] = 0
        for output in self.outputs:
            upstream = output.gradients[self]  # dloss/d(this node's value)
            # Chain rule through value = weight . x + bias.
            self.gradients[weight] += np.dot(upstream, x.value)
            self.gradients[x] += np.dot(upstream, weight.value)
            self.gradients[bias] += upstream
4. Relu
from selfMadeNeutralNetwork.Node import Node
import numpy as np
class Relu(Node):
    """Rectified linear activation: value = max(0, x)."""
    def __init__(self, x=None, name=None, is_trainable=False):
        super().__init__(inputs=[x], name=name, is_trainable=is_trainable)
        self.x = x

    def forward(self):
        # BUGFIX: the original forward was the identity (self.value = x.value)
        # and never applied the rectification, so the node was not a ReLU.
        self.value = self.x.value * (self.x.value > 0)

    def backward(self):
        # Subgradient of max(0, x): 1 where x > 0, else 0. Elementwise
        # product instead of np.dot so vector inputs also work; accumulate
        # over all consumers instead of overwriting.
        self.gradients[self.x] = 0
        for output in self.outputs:
            grad_cost = output.gradients[self]
            self.gradients[self.x] += grad_cost * (self.x.value > 0)
5. Sigmoid
from selfMadeNeutralNetwork.Node import Node
import numpy as np
class Sigmoid(Node):
    """Logistic activation: value = 1 / (1 + exp(-x))."""
    def __init__(self, x=None, name=None, is_trainable=False):
        super().__init__(inputs=[x], name=name, is_trainable=is_trainable)
        self.x = self.inputs[0]

    def _sigmoid(self, x):
        return 1 / (1 + np.exp(-1 * x))

    def forward(self):
        self.value = self._sigmoid(self.x.value)

    def partial(self):
        """Local derivative d(sigmoid)/dx = s * (1 - s)."""
        # BUGFIX: elementwise product, not np.dot — identical for scalars,
        # but np.dot computes an inner product and is wrong for vectors.
        # Also evaluate the sigmoid once instead of twice.
        s = self._sigmoid(self.x.value)
        return s * (1 - s)

    def backward(self):
        # Chain rule, accumulated over all consumers.
        self.gradients[self.x] = 0
        for output in self.outputs:
            upstream = output.gradients[self]
            self.gradients[self.x] += upstream * self.partial()
6. MSE
from selfMadeNeutralNetwork.Node import Node
import numpy as np
class MSE(Node):
    """Mean-squared-error loss node: value = mean((y - yhat) ** 2)."""
    def __init__(self, y=None, yhat=None, name=None, is_trainable=False):
        super().__init__(inputs=[y, yhat], name=name, is_trainable=is_trainable)
        self.y = y
        self.yhat = yhat

    def forward(self):
        diff = np.array(self.y.value) - np.array(self.yhat.value)
        self.value = np.mean(diff ** 2)

    def backward(self):
        diff = np.array(self.y.value) - np.array(self.yhat.value)
        n = diff.size
        # d mean((y-yhat)^2) / dy = 2*(y-yhat)/n, elementwise. The original
        # collapsed this with np.mean, which is only correct for the scalar
        # per-sample case; this form is identical for scalars (n == 1) and
        # correct for vector batches too.
        self.gradients[self.y] = 2 * diff / n
        self.gradients[self.yhat] = -2 * diff / n
7. Session
from collections import defaultdict

import matplotlib.pyplot as plt
import networkx as nx

from selfMadeNeutralNetwork.Variable import Variable
'''
graph 建圖
valid_order 拓撲排序節點順序
X_node 填入資料節點
y_node 結果節點
yhat 預測節點
cost cost節點
cost_history cost曆史記錄
'''
class Session:
    """Drives training over a computation graph.

    Attributes:
        graph: adjacency dict {node: [downstream nodes]}, built by feed().
        valid_order: topologically sorted node list used by forward().
        X_node: node that receives input data.
        y_node: node that receives target values.
        yhat: prediction node.
        cost_history: recorded loss values (kept for callers to fill).
    """
    def __init__(self, session_result=None):
        self.graph = None
        self.valid_order = None
        # Guard: the original indexed a None default and crashed with an
        # opaque TypeError; fail early with a clear message instead.
        if session_result is None or len(session_result) < 3:
            raise ValueError("session_result must be [X_node, y_node, yhat]")
        self.X_node, self.y_node, self.yhat = session_result[:3]
        self.cost_history = []

    def feed(self, feed_dict):
        """Build the graph from feed_dict and compute a topological order."""
        self.graph = self.convert_feed_dict_to_graph(feed_dict)
        nx_graph = nx.DiGraph(self.graph)
        self.valid_order = list(nx.topological_sort(nx_graph))

    def optimize(self, learning_rate=1e-3):
        """One gradient-descent step on every trainable node."""
        for node in self.graph:
            if node.is_trainable:
                node.value = node.value - node.gradients[node] * learning_rate

    def forward(self):
        for node in self.valid_order:
            node.forward()

    def backward(self):
        for node in self.valid_order[::-1]:
            node.backward()

    def run_one_epoch(self):
        """One forward pass followed by one backward pass."""
        self.forward()
        self.backward()

    def convert_feed_dict_to_graph(self, feed_dict):
        """BFS from the fed nodes, returning a {node: [outputs]} adjacency dict.

        feed_dict maps the input/parameter Variable nodes to their initial
        values, e.g. {X_node: X_rm, y_node: y, w1: w1_, ...}. The edges were
        already wired by Node.__init__, so walking each node's .outputs
        discovers the whole graph.
        """
        # Import from the public `collections` module (the file-level import
        # used the private CPython accelerator module `_collections`).
        from collections import defaultdict, deque
        computing_graph = defaultdict(list)
        frontier = deque(feed_dict)  # deque: O(1) pops vs list.pop(0)'s O(n)
        while frontier:
            n = frontier.popleft()
            if isinstance(n, Variable):
                n.value = feed_dict[n]
            if n in computing_graph:
                continue
            for m in n.outputs:
                computing_graph[n].append(m)
                frontier.append(m)
        return computing_graph

    def predict(self, predict_x):
        """Set X to predict_x, run a forward pass, and return yhat's value.

        Only a forward pass is needed for inference; the original also ran
        a (pointless) backward pass.
        """
        self.X_node.value = predict_x
        self.forward()
        return self.yhat.value
8. main
from sklearn.datasets import load_boston
from tqdm import tqdm
import numpy as np
from matplotlib import pyplot as plt
from selfMadeNeutralNetwork.Session import Session
from selfMadeNeutralNetwork.Linear import Linear
from selfMadeNeutralNetwork.MSE import MSE
from selfMadeNeutralNetwork.Sigmoid import Sigmoid
from selfMadeNeutralNetwork.Variable import Variable
def show_loss_history(loss_history):
    """Plot the recorded loss values, one point per epoch (blocks until the window is closed)."""
    plt.plot(loss_history)
    plt.show()
def main():
    """Train a tiny 1-feature MLP (Linear -> Sigmoid -> Linear) on Boston RM data."""
    # NOTE(review): load_boston was deprecated in scikit-learn 1.0 and
    # removed in 1.2; this requires an older scikit-learn (or swapping in
    # another single-feature regression dataset).
    data = load_boston()
    X, y = data["data"], data["target"]
    # Column 5 is RM: average number of rooms per dwelling.
    X_rm = X[:, 5]
    # Random parameter initialization.
    w1_, b1_ = np.random.normal(), np.random.normal()
    w2_, b2_ = np.random.normal(), np.random.normal()
    X_node, y_node = Variable(name="X"), Variable(name="y")
    w1, b1 = Variable(name="w1"), Variable(name="b1")
    w2, b2 = Variable(name="w2"), Variable(name="b2")
    # Constructing the nodes wires the graph edges via Node.__init__.
    output1 = Linear(x=X_node, weight=w1, bias=b1, name="Linear-01")
    output2 = Sigmoid(x=output1, name="activation")
    yhat = Linear(x=output2, weight=w2, bias=b2, name="yhat")
    cost = MSE(y=y_node, yhat=yhat, name="cost")
    feed_dict = {
        X_node: X_rm,
        y_node: y,
        w1: w1_,
        w2: w2_,
        b1: b1_,
        b2: b2_,
    }
    session = Session([X_node, y_node, yhat])
    session.feed(feed_dict)
    # Total number of passes over the data.
    epochs = 1000
    # One SGD step per sample; the original called this "batch_num" but it
    # is really the number of steps per epoch.
    steps_per_epoch = len(X_rm)
    learning_rate = 1e-3
    loss_history = []
    for epoch in tqdm(range(epochs)):
        loss = 0
        for _ in range(steps_per_epoch):
            # Pick one random sample (SGD with replacement).
            index = np.random.choice(len(X_rm))
            X_node.value = X_rm[index]
            y_node.value = y[index]
            session.run_one_epoch()
            session.optimize(learning_rate)
            loss += cost.value
        loss_history.append(loss / steps_per_epoch)
        if epoch % 100 == 0:
            print("Epoch:{},Loss:{:.3f}".format(epoch + 1, loss / steps_per_epoch))
    show_loss_history(loss_history)
    # Predict the target for a dwelling with 6 rooms on average.
    print(session.predict(6))


if __name__ == "__main__":
    main()