本文改編自Mat Buckland的遊戲開發中的人工智能技術中的Chapter 7 掃雷機的實作,C++代碼重新用python來實作(本文所有遺傳算法/神經網絡相關代碼均改編值Mat的C++代碼,如有雷同,純屬巧合)。
在本章節中Mat Buckland實作了一個利用神經網絡和遺傳算法進化的掃雷機,掃雷機從初始的幾乎隻會原地打轉,最終可以進化到自行尋找地圖上的地雷。廢話不多說,先看示意圖,進化開始的時候,掃雷機的行為如下圖,這些蠢蠢得掃雷機都在不斷的原地打轉:
進化一定時間後,掃雷機就智能多了,開始在地圖上有目的的搜尋地雷了:
是不是看起來比較神奇,下面我們一步一步的來剖析Mat是如何實作這個進化過程的。
神經網絡模型
這個概念現在很火啊,基本的數學原理在這裡不再贅述(我也不懂 >_<),有興趣的可以自行去搜尋。個人對神經網絡的了解就是一個決策系統,在訓練成熟的神經網絡的模型下,神經網絡能夠根據給定的輸入決策出所期望的輸出。基本的神經網絡的模型如下圖,這個神經網絡有一個輸入層(Layer L1),一個隐藏層(Layer L2),還有一個輸出層(Layer L3):
在經過一定階段的訓練之後,我們可以得到這個神經網絡裡每個節點的系數,這個系數就決定了訓練好的神經網絡在給定的輸入得情況下會得到什麼樣的輸出。
掃雷機的神經網絡
掃雷機在工作的時候的狀态如下圖:
控制掃雷機工作的輸入條件有下面兩個:
1. 與掃雷機最靠近的地雷的位置(x, y)
2. 代表掃雷機前進的方向的向量(x, y)
在這個模型裡,我們對掃雷機的左右履帶各設定一個動力值,左右履帶的動力值之差我們定義為掃雷機轉向的力,左右履帶的動力值之和我們定義為掃雷機前進的速度。
綜上,我們設計的神經網絡需要4個輸入,分别對應兩個控制條件中的4個值,需要兩個輸出,對應掃雷機左右履帶的動力值。訓練的目标就是給出一個最合理的模型來判決掃雷機前進的方向和速度。
來看看神經網絡的編碼:
神經元
根據神經網絡的模型不難寫出神經元的python代碼,一個神經元有一定數目的輸入,每個輸入都有一個權重,這裡需要注意的是,我們所有的參數最後都會通過Sigmoid來進行歸一化(請自行百度Sigmoid函數),是以我們的權重選擇的也是在[-1, 1]之間的随機數:
class SNeuron(object):
def __init__(self, NumInputs):
self._NumInputs = NumInputs
self._Weights = []
for i in xrange(self._NumInputs):
self._Weights.append(random.random() - random.random()) #生成一個-1, 1之間的随機Weight
return
神經網絡層
一定數目的神經元組合在一起就成為一個網絡層:
class SNeuronLayer(object):
def __init__(self, NumNeurons, NumInputsPerNeuron):
self._NumNeurons = NumNeurons
self._Neurons = []
for i in xrange(self._NumNeurons):
self._Neurons.append(SNeuron(NumInputsPerNeuron))
return
網絡
再将一定的神經網絡層組合起來,就是最終的神經網絡的模型了。直接看代碼,神經網絡接受四個參數分别是:
NumInputs - 神經網絡的輸入參數的個數
NumOutputs - 神經網絡的輸出參數的個數
NumHidden - 神經網絡隐藏層的層數
NeuronsPerHiddenLayer - 神經網絡每個隐藏層包含多少個神經元
class CNeuralNet(object):
def __init__(self, NumInputs=iNumInputs, NumOutputs=iNumOutputs, NumHidden=iNumHidden, NeuronsPerHiddenLayer=iNeuronsPerHiddenLayer):
self._NumInputs = NumInputs
self._NumOutputs = NumOutputs
self._NumHiddenLayers = NumHidden
self._NeuronsPerHiddenLyr = NeuronsPerHiddenLayer
self._Layers = []
self.CreateNet()
return
def CreateNet(self):
if self._NumHiddenLayers > 0:
# 第一個hidden layer
self._Layers.append(SNeuronLayer(self._NeuronsPerHiddenLyr, self._NumInputs))
# 其餘的hidden layer
for i in xrange(self._NumHiddenLayers - 1):
self._Layers.append(SNeuronLayer(self._NeuronsPerHiddenLyr, self._NeuronsPerHiddenLyr))
# 輸出層
self._Layers.append(SNeuronLayer(self._NumOutputs, self._NeuronsPerHiddenLyr))
else:
self._Layers.append(SNeuronLayer(self._NumOutputs, self._NumInputs))
通過上面的代碼我們就成功建立了一個所有權重在[-1,1]之間的神經網絡。而我們的目标是通過遺傳算法來不停的疊代進化神經網絡,這樣我們就需要接口來擷取和更新神經網絡的權重:
def GetWeights(self):
weights = []
for i in xrange(self._NumHiddenLayers + 1):
for j in xrange(self._Layers[i]._NumNeurons):
for k in xrange(self._Layers[i]._Neurons[j]._NumInputs):
weights.append(self._Layers[i]._Neurons[j]._Weights[k])
return weights
def PutWeights(self, weights):
weightIdx = 0
for i in xrange(self._NumHiddenLayers + 1):
for j in xrange(self._Layers[i]._NumNeurons):
for k in xrange(self._Layers[i]._Neurons[j]._NumInputs):
self._Layers[i]._Neurons[j]._Weights[k] = weights[weightIdx]
weightIdx += 1
當給出一組inputs時,神經網絡需要能夠給出對應的outputs,其實就是根據神經網絡每層的權重依次計算結果,本層的outputs就是下一層網絡層的inputs,直到計算到最後的輸出層:
def Update(self, inputs):
outputs = []
weightIdx = 0
if len(inputs) != self._NumInputs:
return outputs
for i in xrange(self._NumHiddenLayers + 1):
if i > 0:
inputs = copy.deepcopy(outputs)
outputs = []
weightIdx = 0
for j in xrange(self._Layers[i]._NumNeurons):
netinput = 0.0
NumInputs = self._Layers[i]._Neurons[j]._NumInputs
for k in xrange(NumInputs-1):
netinput += (self._Layers[i]._Neurons[j]._Weights[k] * inputs[weightIdx])
weightIdx += 1
netinput += (self._Layers[i]._Neurons[j]._Weights[NumInputs-1] * dBias) #dBias = -1
outputs.append(self.Sigmoid(netinput, dActivationResponse)) # dActivationResponse = 1
weightIdx = 0
return outputs
遺傳算法
遺傳算法就不再贅述,僅介紹幾個算子的實作:
變異
滿足變異條件的染色體,我們對每個gene在原有的基礎上增加一個随機的擾動:
def Mutate(self, chromo):
newChromo = []
for idx, gene in enumerate(chromo):
if random.random() < self._dMutationRate:
gene += (random.random() - random.random()) * dMaxPerturbation # dMaxPerturbation = 0.3
newChromo.append(gene)
return newChromo
選擇
賭輪盤,永遠的賭輪盤:
def GetChromoRoulette(self):
Slice = random.random() * self._dTotalFitness
TheChosenOne = None
FitnessSoFar = 0
for i in xrange(self._iPopSize):
FitnessSoFar += self._vecPop[i]._dFitness
if FitnessSoFar >= Slice:
TheChosenOne = copy.deepcopy(self._vecPop[i])
break
return TheChosenOne
交叉
這裡有兩種交叉方式:
1. 随機在所有的權重裡進行交叉;
2. 将每層神經網絡的的權重看成一個整體,将這個整體統一進行交叉;
def Crossover(self, mum, dad):
baby1 = []
baby2 = []
if random.random() > self._dCrossoverRate or mum == dad:
baby1 = copy.deepcopy(mum)
baby2 = copy.deepcopy(dad)
return baby1, baby2
cp = random.randint(0, self._iChromoLength-1)
baby1 = mum[:cp] + dad[cp:]
baby2 = dad[:cp] + mum[cp:]
return baby1, baby2
def CrossoverAtSplits(self, mum, dad):
baby1 = []
baby2 = []
if random.random() > self._dCrossoverRate or mum == dad:
baby1 = copy.deepcopy(mum)
baby2 = copy.deepcopy(dad)
return baby1, baby2
cpIdx1 = random.randint(0, len(self._vecSplitPoints)-2)
cp1 = self._vecSplitPoints[cpIdx1]
cp2 = self._vecSplitPoints[random.randint(cpIdx1, len(self._vecSplitPoints)-1)]
baby1 = mum[:cp1] + dad[cp1:cp2] + mum[cp2:]
baby2 = dad[:cp1] + mum[cp1:cp2] + dad[cp2:]
return baby1, baby2
第二種交叉方式中的_vecSplitPoints是在神經網絡的類中擷取的:
def CalculateSplitPoints(self):
SplitPoints = []
WeightCounter = 0
for i in xrange(self._NumHiddenLayers + 1):
for j in xrange(self._Layers[i]._NumNeurons):
for k in xrange(self._Layers[i]._Neurons[j]._NumInputs):
WeightCounter += 1
SplitPoints.append(WeightCounter - 1)
print SplitPoints
return SplitPoints
進化
進化也很簡單,保留最優的幾個個體,其餘的個體通過賭輪盤選擇後進行交叉變異。
def Epoch(self, old_pop):
self._vecPop = copy.deepcopy(old_pop)
self.Reset()
self._vecPop.sort() # 我們為SGenome重載了__lt__
self.CalculateBestWorstAvTot()
vecNewPop = []
if (iNumCopiesElite * iNumElite) % 2 == 0:
vecNewPop = vecNewPop + self.GrabNBest(iNumElite, iNumCopiesElite)
while len(vecNewPop) < self._iPopSize:
mum = self.GetChromoRoulette()
dad = self.GetChromoRoulette()
baby1, baby2 = self.CrossoverAtSplits(mum._vecWeights, dad._vecWeights)
baby1 = self.Mutate(baby1)
baby2 = self.Mutate(baby2)
vecNewPop.append(SGenome(baby1, 0.0))
vecNewPop.append(SGenome(baby2, 0.0))
self._vecPop = copy.deepcopy(vecNewPop)
return self._vecPop
掃雷機
再來看看掃雷機的實作:
_dRotation - 掃雷機初始的行進方向
_lTrack - 掃雷機初始的左履帶的動力
_rTrack - 掃雷機初始的右履帶的動力
_dFitness - 掃雷機進化的适應度
_dScale - 掃雷機在windows裡顯示的大小比例
_iClosestMine - 距離掃雷機最近的地雷的索引
_vLookAt - 掃雷機行進過程中的方向
_ItsBrain - 掃雷機的大腦,也就是神經網絡
_dSpeed - 掃雷機的速度
_vPosition - 掃雷機在windows裡的位置
class CMinesweeper(object):
def __init__(self):
self._dRotation = random.random() * dTwoPi
self._lTrack = 0.16
self._rTrack = 0.16
self._dFitness = dStartEnergy
self._dScale = iSweeperScale
self._iClosestMine = 0
self._vLookAt = SVector2D()
self._ItsBrain = CNeuralNet()
self._dSpeed = 0.0
self._vPosition = SVector2D(random.random()*WindowWidth, random.random()*WindowHeight)
return
GetClosestMine用于擷取目前離掃雷機最近的地雷;
CheckForMine用于檢查掃雷機于地雷間的距離是否小于給定值,如果小于給定值,認為該掃雷機成功的掃除了該地雷。
def GetClosestMine(self, mines):
closest_so_far = 99999.9
vClosestObj = SVector2D(0.0, 0.0)
for idx, mine in enumerate(mines):
len_to_obj = Vec2DLength(mine - self._vPosition)
if len_to_obj < closest_so_far:
closest_so_far = len_to_obj
vClosestObj = self._vPosition - mine # 計算目前Position到最近的mine的矢量距離
self._iClosestMine = idx
return vClosestObj
def CheckForMine(self, mines, size):
DistToObj = self._vPosition - mines[self._iClosestMine]
if Vec2DLength(DistToObj) < (size + 5):
return self._iClosestMine
return -1
程式運作過程中,掃雷機每時每刻都在進行更新:
1. 查找最近的地雷,将最近的地雷的位置和目前掃雷機前進的方向作為神經網絡的輸入,得到目前掃雷機應該對左履帶和右履帶施加的動力
2. 根據掃雷機左右履帶的動力,計算出掃雷機前進的方向,速度,以及更新後的位置
def Update(self, mines):
inputs = []
vClosestMine = self.GetClosestMine(mines)
vClosestMine = Vec2DNormalize(vClosestMine)
inputs.append(vClosestMine._x)
inputs.append(vClosestMine._y)
inputs.append(self._vLookAt._x)
inputs.append(self._vLookAt._x)
output = self._ItsBrain.Update(inputs)
if len(output) < iNumOutputs:
print "Output size not correct. Length of output %d, iNumOutputs %d" % (len(output), iNumOutputs)
return False
self._lTrack = output[0]
self._rTrack = output[1]
RotForce = self._lTrack - self._rTrack
# Clamp rotation
RotForce = -dMaxTurnRate if RotForce < -dMaxTurnRate else dMaxTurnRate if RotForce > dMaxTurnRate else RotForce
self._dRotation += RotForce
self._dSpeed = self._lTrack + self._rTrack
self._vLookAt._x = -math.sin(self._dRotation)
self._vLookAt._y = math.cos(self._dRotation)
self._vPosition = self._vPosition + (self._vLookAt * self._dSpeed) # 注意這裡都是重載了Vector的運算
self._vPosition._x = 0 if self._vPosition._x > WindowWidth else WindowWidth if self._vPosition._x < 0 else self._vPosition._x
self._vPosition._y = 0 if self._vPosition._y > WindowHeight else WindowHeight if self._vPosition._y < 0 else self._vPosition._y
return True
總控
最後,将上面的神經網絡、遺傳算法和掃雷機類都整合到一起,通過pygame展示出來,就是我們文初所展現的效果了。
掃雷機和地雷都被歸一化成對應的頂點,然後用pygame将這些頂點按照順序依次連線,就可以畫出效果:
sweeper = [SPoint(-0.5, -0.5), SPoint(-0.5, -1), SPoint(-1, -1), SPoint(-1, 1), SPoint(-0.5, 1), SPoint(-0.5, 0.5), SPoint(-0.25, 0.5), SPoint(-0.25, 1.75),SPoint(0.25, 1.75), SPoint(0.25, 0.5), SPoint(0.5, 0.5), SPoint(0.5, 1), SPoint(1,1), SPoint(1, -1), SPoint(0.5, -1), SPoint(0.5, -0.5)]
mine = [SPoint(-1, -1), SPoint(-1, 1), SPoint(1, 1), SPoint(1, -1)]
generationStr = "Generation: " + str(self._iGenerations)
screen.blit(font.render(generationStr, True, (0, 0, 255)), (20, 20)) # 畫mines
for i in xrange(self._NumMines):
mineVB = copy.deepcopy(mine)
mineVB = self.WorldTransform(mineVB, self._vecMines[i])
points4Paint = []
for item in mineVB:
points4Paint.append((item._x, item._y))
pygame.draw.polygon(screen, green, points4Paint, 1) # 畫sweepers,最優的前iNumElite用紅色畫出來
for i in xrange(self._NumSweepers):
sweeperVB = copy.deepcopy(sweeper)
sweeperVB = self._vecSweepers[i].WorldTransform(sweeperVB)
points4Paint = []
for item in sweeperVB:
points4Paint.append((item._x, item._y))
if i < iNumElite:
pygame.draw.polygon(screen, red, points4Paint, 1)
else:
pygame.draw.polygon(screen, blue, points4Paint, 1)
Pygame在每幀更新的時候,都會調用update函數來計算掃雷機和地雷的狀态,當iTick達到iNumTicks(2000)的時候,進行遺傳算法的新一輪疊代:
def Update(self):
self._iTicks += 1
if self._iTicks < iNumTicks:
for i in xrange(self._NumSweepers):
if self._vecSweepers[i].Update(self._vecMines) == False:
return False
# 檢視是否找到了一個mine
GrabHit = self._vecSweepers[i].CheckForMine(self._vecMines, dMineScale)
if GrabHit > 0:
self._vecSweepers[i].IncrementFitness()
# 該mine被找到了,随機生成另外一個mine
self._vecMines[GrabHit] = SVector2D(random.random() * self._cxClient, random.random() * self._cyClient)
self._vecThePopulation[i]._dFitness = self._vecSweepers[i].Fitness()
else:
self._vecAvFitness.append(self._pGA.AverageFitness())
self._vecBestFitness.append(self._pGA.BestFitness())
self._iGenerations += 1
self._iTicks = 0
self._vecThePopulation = self._pGA.Epoch(self._vecThePopulation)
for i in xrange(self._NumSweepers):
self._vecSweepers[i].PutWeights(self._vecThePopulation[i]._vecWeights)
self._vecSweepers[i].Reset()
return True
後話
這隻是遺傳算法和神經網絡的一個簡單的應用,也很容易入門,如果有興趣,還是強烈建議看看Mat Buckland的書,講得真的很好。神經網絡看起來很繞,自己動手敲一遍代碼能讓人更加深刻的了解它是怎麼實作的。:)
btw: Python功底還是有待加強,python寫出來的APP運作起來還是比C++慢了好多,如果不用numpy,那就更慢啦~~~~~~~