天天看點

基于Lasagne實作限制玻爾茲曼機(RBM)

RBM理論部分大家看懂這個圖檔就差不多了:

基于Lasagne實作限制玻爾茲曼機(RBM)-- coding: utf-8 --

Lasagne寫代碼首先要确定層與層,RBM 正向反向過程可以分别當作一個層,權值矩陣互為轉置即可。

代碼:

# -*- coding: utf-8 -*-

"""
data format is 'bc01'

written by Ph.D wang
"""

import theano.tensor as T

import numpy as np

from lasagne.layers import Layer

from lasagne import init

from lasagne import nonlinearities

class RBMDenseLayer(Layer):
    """Fully connected layer whose bias parameter is named ``c``.

    The layer flattens all trailing input axes, multiplies by ``W``, adds the
    optional bias ``c`` and applies ``nonlinearity``.

    Parameters
    ----------
    incoming : a Lasagne layer or shape tuple feeding this layer.
    num_units : number of output units.
    W : initializer, array, shared variable or expression for the weight
        matrix of shape ``(num_inputs, num_units)``.
    c : initializer for the bias of shape ``(num_units,)``, or ``None`` to
        omit the bias. It is registered as non-regularizable.
    nonlinearity : callable applied to the activation; ``None`` selects the
        identity.
    num_leading_axes : number of leading axes kept as-is; the remaining
        axes are flattened for the dot product. Negative values count from
        the end of the input shape.

    Raises
    ------
    ValueError
        If ``num_leading_axes`` leaves no trailing axis, requests more axes
        than the input has, or if any trailing axis has unknown size.
    """

    def __init__(self, incoming, num_units, W=init.GlorotUniform(),
                 c=init.Constant(0.), nonlinearity=nonlinearities.rectify,
                 num_leading_axes=1, **kwargs):
        super(RBMDenseLayer, self).__init__(incoming, **kwargs)
        self.nonlinearity = (nonlinearities.identity if nonlinearity is None
                             else nonlinearity)

        self.num_units = num_units

        if num_leading_axes >= len(self.input_shape):
            raise ValueError(
                "Got num_leading_axes=%d for a %d-dimensional input, "
                "leaving no trailing axes for the dot product." %
                (num_leading_axes, len(self.input_shape)))
        elif num_leading_axes < -len(self.input_shape):
            raise ValueError(
                "Got num_leading_axes=%d for a %d-dimensional input, "
                "requesting more trailing axes than there are input "
                "dimensions." % (num_leading_axes, len(self.input_shape)))
        self.num_leading_axes = num_leading_axes

        # The weight matrix needs a concrete size, so every flattened axis
        # must be known at construction time.
        if any(s is None for s in self.input_shape[num_leading_axes:]):
            raise ValueError(
                "A DenseLayer requires a fixed input shape (except for "
                "the leading axes). Got %r for num_leading_axes=%d." %
                (self.input_shape, self.num_leading_axes))
        num_inputs = int(np.prod(self.input_shape[num_leading_axes:]))

        self.W = self.add_param(W, (num_inputs, num_units), name="W")
        if c is None:
            self.c = None
        else:
            self.c = self.add_param(c, (num_units,), name="c",
                                    regularizable=False)

    def get_output_shape_for(self, input_shape):
        """Return the leading axes of ``input_shape`` followed by ``num_units``."""
        return input_shape[:self.num_leading_axes] + (self.num_units,)

    def get_output_for(self, input, **kwargs):
        """Return ``nonlinearity(dot(flattened input, W) + c)``."""
        num_leading_axes = self.num_leading_axes
        if num_leading_axes < 0:
            # Convert a from-the-end axis count into an absolute one.
            num_leading_axes += input.ndim
        if input.ndim > num_leading_axes + 1:
            # flatten trailing axes (into (n+1)-tensor for num_leading_axes=n)
            input = input.flatten(num_leading_axes + 1)

        activation = T.dot(input, self.W)
        if self.c is not None:
            activation = activation + self.c
        return self.nonlinearity(activation)

# -*- coding: utf-8 -*-

"""

data format is 'bc01'

written by Ph.D wang

"""

import lasagne

import theano

import theano.tensor as T

import lasagne

import numpy

import numpy as np

from theano.sandbox.rng_mrg import MRG_RandomStreams

from lasagne.nonlinearities import sigmoid

from lasagne.layers import get_output, get_all_param_values, get_all_params, get_output_shape

import gzip

import pickle

import os

from Layer.RBMDenseLayer import RBMDenseLayer

# Prefer the Pillow/PIL package layout; fall back to a top-level ``Image``
# module when ``PIL.Image`` cannot be imported.
try:
    import PIL.Image as Image
except ImportError:
    import Image

from utils.utils import tile_raster_images

class Data(object):
    """Load the pickled, gzipped MNIST dataset into Theano shared variables.

    Calling the instance (or ``generate()``) fills ``train_set``,
    ``valid_set`` and ``test_set`` with ``(shared_x, int32_y)`` pairs, where
    the images have been reshaped to ``(N, 1, 28, 28)``.

    Parameters
    ----------
    dataset : path to the ``mnist.pkl.gz`` file. If the file does not exist
        and its name is ``mnist.pkl.gz``, it is downloaded to that path.
    """

    def __init__(self, dataset='/media/wang/0000EEC70001E10B/Data_Resource/MNIST/mnist.pkl.gz'):
        self.datapath = dataset
        self.train_set = []
        self.valid_set = []
        self.test_set = []

    def __call__(self):
        """Shorthand for ``generate()``."""
        return self.generate()

    def generate(self):
        """Locate (downloading if necessary) and load the dataset.

        The three splits are stored on the instance; nothing is returned.
        """
        data_dir, data_file = os.path.split(self.datapath)
        if data_dir == "" and not os.path.isfile(self.datapath):
            # Check if dataset is in the data directory.
            new_path = os.path.join(
                os.path.split(__file__)[0],
                "..",
                "data",
                self.datapath
            )
            if os.path.isfile(new_path) or data_file == 'mnist.pkl.gz':
                # Store the resolved path so the download and load steps
                # below actually use it.
                self.datapath = new_path

        if (not os.path.isfile(self.datapath)) and data_file == 'mnist.pkl.gz':
            from six.moves import urllib
            origin = (
                'http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz'
            )
            print('Downloading data from %s' % origin)
            urllib.request.urlretrieve(origin, self.datapath)

        print('... loading data')

        # Load the dataset
        # NOTE(security): unpickling executes arbitrary code embedded in the
        # file; only load archives from a trusted source.
        with gzip.open(self.datapath, 'rb') as f:
            try:
                train_set, valid_set, test_set = pickle.load(f, encoding='latin1')
            except TypeError:
                # Python 2's pickle.load does not accept ``encoding``.
                train_set, valid_set, test_set = pickle.load(f)

        # train_set, valid_set, test_set format: tuple(input, target)
        # input is a numpy.ndarray of 2 dimensions (a matrix)
        # where each row corresponds to an example. target is a
        # numpy.ndarray of 1 dimension (vector) that has the same length as
        # the number of rows in the input. It should give the target
        # to the example with the same index in the input.

        def shared_dataset(data_xy, borrow=True):
            """ Function that loads the dataset into shared variables

            The reason we store our dataset in shared variables is to allow
            Theano to copy it into the GPU memory (when code is run on GPU).
            Since copying data into the GPU is slow, copying a minibatch everytime
            is needed (the default behaviour if the data is not in a shared
            variable) would lead to a large decrease in performance.
            """
            data_x, data_y = data_xy
            # Reshape the flat rows into single-channel 28x28 images ('bc01').
            data_x = np.reshape(data_x, (len(data_x), 1, 28, 28))
            shared_x = theano.shared(numpy.asarray(data_x,
                                                   dtype=theano.config.floatX),
                                     borrow=borrow)
            shared_y = theano.shared(numpy.asarray(data_y,
                                                   dtype=theano.config.floatX),
                                     borrow=borrow)
            # When storing data on the GPU it has to be stored as floats
            # therefore we will store the labels as ``floatX`` as well
            # (``shared_y`` does exactly that). But during our computations
            # we need them as ints (we use labels as index, and if they are
            # floats it doesn't make sense) therefore instead of returning
            # ``shared_y`` we will have to cast it to int. This little hack
            # lets us get around this issue
            return shared_x, T.cast(shared_y, 'int32')

        self.test_set = shared_dataset(test_set)
        self.valid_set = shared_dataset(valid_set)
        self.train_set = shared_dataset(train_set)

class RBM(object):
    """Restricted Boltzmann machine assembled from two Lasagne layers.

    The visible-to-hidden pass is a ``lasagne.layers.DenseLayer`` with a
    sigmoid nonlinearity; the hidden-to-visible pass is an ``RBMDenseLayer``
    whose weight is the transpose of the first layer's ``W``.

    Parameters
    ----------
    input : symbolic 4-D input variable; a fresh ``T.tensor4`` is created
        when omitted.
    input_shape : shape tuple of one minibatch; its first entry is used as
        the batch size. Required.
    K : number of passes of the sampling loop in ``get_sample``; must be at
        least 1, otherwise ``get_sample`` has no negative phase to return.
    obj_type : ``'free'`` selects the free-energy objective, any other value
        selects the energy computed from sampled hidden states.
    n_visible, n_hidden : number of visible and hidden units.
    numpy_rng : ``numpy.random.RandomState`` used to seed the Theano
        generator; defaults to a generator seeded with 1234.
    theano_rng : random stream used for binomial sampling; defaults to an
        ``MRG_RandomStreams`` seeded from ``numpy_rng``.

    Raises
    ------
    ValueError
        If ``input_shape`` is not given.
    """

    def __init__(self, input=None, input_shape=None, K=1, obj_type='free', n_visible=784,
                 n_hidden=500, numpy_rng=None, theano_rng=None):
        if input_shape is None:
            raise ValueError("input_shape is required, e.g. (batch, 1, 28, 28)")

        self.n_visible = n_visible
        self.n_hidden = n_hidden
        self.obj_type = obj_type

        if numpy_rng is None:
            # create a number generator
            numpy_rng = numpy.random.RandomState(1234)
        if theano_rng is None:
            theano_rng = MRG_RandomStreams(numpy_rng.randint(2 ** 30))
        # Always store the stream, including one supplied by the caller.
        self.theano_rng = theano_rng

        if input is None:
            self.input = T.tensor4('input_data')
        else:
            self.input = input

        self.input_shape = input_shape
        self.batch_size = input_shape[0]
        self.K = K

        self.input_layer = lasagne.layers.InputLayer(shape=self.input_shape, input_var=self.input)
        self.layer_h_given_v = lasagne.layers.DenseLayer(self.input_layer, num_units=self.n_hidden,
                                                         nonlinearity=sigmoid)
        # The reverse pass shares the forward weights through their transpose.
        self.layer_v_given_h = RBMDenseLayer(self.layer_h_given_v, num_units=self.n_visible,
                                             W=self.layer_h_given_v.W.T,
                                             nonlinearity=sigmoid)
        self.params = get_all_params(self.layer_v_given_h)

    def pcd_sampler(self):
        """Placeholder; not implemented."""
        pass

    def h_given_v(self):
        """Return ``[mean, sample]`` of the hidden layer output."""
        mean = get_output(self.layer_h_given_v)
        sample = self.theano_rng.binomial(size=get_output_shape(self.layer_h_given_v), p=mean)
        return [mean, sample]

    def v_given_h(self):
        """Return ``[mean, sample]`` of the visible layer, reshaped to ``input_shape``."""
        mean = get_output(self.layer_v_given_h)
        sample = self.theano_rng.binomial(size=get_output_shape(self.layer_v_given_h), p=mean)
        return [mean.reshape(self.input_shape), sample.reshape(self.input_shape)]

    def get_sample(self):
        """Return ``[posh, posh_sample, negv, negv_sample, negh, negh_sample]``.

        NOTE(review): ``h_given_v`` and ``v_given_h`` take no arguments and
        always build their expressions from ``self.input``, so every pass of
        the loop produces expressions of the same form and samples are not fed
        back into the next pass. Confirm whether a chained Gibbs step was intended.
        """
        posh, posh_sample = self.h_given_v()
        for _ in range(self.K):
            negv, negv_sample = self.v_given_h()
            negh, negh_sample = self.h_given_v()
        return [posh, posh_sample, negv, negv_sample, negh, negh_sample]

    def free_energy(self, sample):
        """Return the per-example free energy of a 4-D visible ``sample``."""
        flat = T.reshape(sample, (sample.shape[0],
                                  sample.shape[1] * sample.shape[2] * sample.shape[3]))
        wx_b = T.dot(flat, self.layer_h_given_v.W) + self.layer_h_given_v.b
        vbias_term = T.dot(flat, self.layer_v_given_h.c)
        hidden_term = T.sum(T.log(1 + T.exp(wx_b)), axis=1)
        return -hidden_term - vbias_term

    def proxy_energy(self, visible_sample, hidden_sample):
        """Return the per-example energy of a visible/hidden configuration."""
        flat = T.reshape(visible_sample, (visible_sample.shape[0],
                                          visible_sample.shape[1] * visible_sample.shape[2] *
                                          visible_sample.shape[3]))
        vwh = T.dot(flat, self.layer_h_given_v.W) * hidden_sample
        vc = flat * self.layer_v_given_h.c
        hb = hidden_sample * self.layer_h_given_v.b
        return -vwh.sum(axis=1) - vc.sum(axis=1) - hb.sum(axis=1)

    def get_reconstruction_cost(self, pre_sigmoid_nv):
        """Return the mean cross-entropy term between the input and ``sigmoid(pre_sigmoid_nv)``."""
        cross_entropy = T.mean(
            T.sum(
                self.input * T.log(T.nnet.sigmoid(pre_sigmoid_nv)) +
                (1 - self.input) * T.log(1 - T.nnet.sigmoid(pre_sigmoid_nv)),
                axis=[1, 2, 3]
            )
        )
        return cross_entropy

    def get_loss(self):
        """Return ``(training loss, monitoring cost)`` as scalar expressions."""
        [posh, posh_sample, negv, negv_sample, negh, negh_sample] = self.get_sample()
        if self.obj_type == 'free':
            # Free-energy difference plus a 0.2-weighted squared penalty on
            # the distance of the hidden activations from 0.001.
            loss = (self.free_energy(self.input) - self.free_energy(negv_sample) +
                    0.2 * T.square(0.001 - posh).sum(axis=[1]))
        else:
            loss = self.proxy_energy(self.input, posh_sample) - self.proxy_energy(negv_sample, negh_sample)
        # NOTE(review): ``negv`` is already the sigmoid output of the visible
        # layer, while get_reconstruction_cost applies a sigmoid again; the
        # monitoring value is therefore not a plain cross-entropy.
        monitoring_cost = self.get_reconstruction_cost(negv)
        return loss.mean(), monitoring_cost.mean()

    def optimization(self, train_set, epoch,
                     figure_pattern='/home/wang/SCI1/Figure/filters_at_epoch_%d.png'):
        """Train with Nesterov momentum and save a filter image after each epoch.

        Parameters
        ----------
        train_set : shared variable holding the training images.
        epoch : number of passes over the training set.
        figure_pattern : ``%d``-style path pattern for the per-epoch filter
            image; defaults to the original hard-coded location.
        """
        params = lasagne.layers.get_all_params(self.layer_v_given_h, trainable=True)
        # Build the loss once so the reported value and the gradients come
        # from the same sampled graph.
        loss_expr = self.get_loss()[0]
        updates = lasagne.updates.nesterov_momentum(
            loss_expr, params, learning_rate=0.1, momentum=0.3)
        numsample = train_set.get_value(borrow=True).shape[0]
        numbatch = numsample // self.batch_size
        index = T.iscalar('index')
        train_fn = theano.function(
            [index], loss_expr,
            givens={self.input: train_set[index * self.batch_size:(index + 1) * self.batch_size]},
            updates=updates)
        for j in range(epoch):
            losses = [train_fn(i) for i in range(numbatch)]
            print("epoch %d error is %f" % (j, np.mean(losses)))
            image = self.visilize()
            image.save(figure_pattern % j)

    def save_model(self, path=None):
        """Pickle all parameter values to ``path``.

        When ``path`` is omitted the original hard-coded location is used.
        """
        if path is None:
            path = '/home/wang/桌面/' + 'rbm_hidden_' + str(self.n_hidden) + '.pickle'
        with open(path, 'wb') as file1:
            model = get_all_param_values(self.layer_v_given_h)
            pickle.dump(model, file1)

    def visilize(self, X=None):
        """Tile rows of ``X`` as 28x28 images on a 10x10 grid and return the image.

        When ``X`` is omitted, the transposed hidden-layer weights are used.
        """
        # ``is None`` avoids an element-wise comparison when X is an array.
        if X is None:
            X = self.layer_h_given_v.W.get_value(borrow=True).T
        image = Image.fromarray(
            tile_raster_images(
                X,
                img_shape=(28, 28),
                tile_shape=(10, 10),
                tile_spacing=(1, 1)
            )
        )
        return image

if __name__ == '__main__':
    # Train an RBM on MNIST minibatches of 200 single-channel 28x28 images
    # and save the learned filters.
    x = T.tensor4()
    rbm = RBM(input=x, input_shape=(200, 1, 28, 28))

    data = Data()
    data()

    rbm.optimization(data.train_set[0], epoch=15)
    image = rbm.visilize()
    image.save('learned_filter.png')
    # rbm.save_model()

基于Lasagne實作限制玻爾茲曼機(RBM)-- coding: utf-8 --

繼續閱讀