RBM理論部分大家看懂這個圖檔就差不多了:

Lasagne寫代碼首先要确定層與層,RBM 正向反向過程可以分别當作一個層,權值矩陣互為轉置即可。
代碼:
**# -- coding: utf-8 --
“””
data format is ‘bc01’
written by Ph.D wang重點内容
“””
import theano.tensor as T
import numpy as np
from lasagne.layers import Layer
from lasagne import init
from lasagne import nonlinearities**
class RBMDenseLayer(Layer):
def init(self, incoming, num_units, W=init.GlorotUniform(),
c=init.Constant(0.), nonlinearity=nonlinearities.rectify,
num_leading_axes=1, **kwargs):
super(RBMDenseLayer, self).init(incoming, **kwargs)
self.nonlinearity = (nonlinearities.identity if nonlinearity is None
else nonlinearity)
self.num_units = num_units
if num_leading_axes >= len(self.input_shape):
raise ValueError(
“Got num_leading_axes=%d for a %d-dimensional input, ”
“leaving no trailing axes for the dot product.” %
(num_leading_axes, len(self.input_shape)))
elif num_leading_axes < -len(self.input_shape):
raise ValueError(
“Got num_leading_axes=%d for a %d-dimensional input, ”
“requesting more trailing axes than there are input ”
“dimensions.” % (num_leading_axes, len(self.input_shape)))
self.num_leading_axes = num_leading_axes
if any(s is None for s in self.input_shape[num_leading_axes:]):
raise ValueError(
“A DenseLayer requires a fixed input shape (except for ”
“the leading axes). Got %r for num_leading_axes=%d.” %
(self.input_shape, self.num_leading_axes))
num_inputs = int(np.prod(self.input_shape[num_leading_axes:]))
self.W = self.add_param(W, (num_inputs, num_units), name=”W”)
if c is None:
self.c = None
else:
self.c = self.add_param(c, (num_units,), name=”c”,
regularizable=False)
def get_output_shape_for(self, input_shape):
return input_shape[:self.num_leading_axes] + (self.num_units,)
def get_output_for(self, input, **kwargs):
num_leading_axes = self.num_leading_axes
if num_leading_axes < 0:
num_leading_axes += input.ndim
if input.ndim > num_leading_axes + 1:
# flatten trailing axes (into (n+1)-tensor for num_leading_axes=n)
input = input.flatten(num_leading_axes + 1)
activation = T.dot(input, self.W)
if self.c is not None:
activation = activation + self.c
return self.nonlinearity(activation)
-- coding: utf-8 --
“””
data format is ‘bc01’
written by Ph.D wang
“””
import lasagne
import theano
import theano.tensor as T
import lasagne
import numpy
import numpy as np
from theano.sandbox.rng_mrg import MRG_RandomStreams
from lasagne.nonlinearities import sigmoid
from lasagne.layers import get_output, get_all_param_values, get_all_params, get_output_shape
import gzip
import pickle
import os
from Layer.RBMDenseLayer import RBMDenseLayer
try:
import PIL.Image as Image
except ImportError:
import Image
from utils.utils import tile_raster_images
class Data(object):
def init(self, dataset=’/media/wang/0000EEC70001E10B/Data_Resource/MNIST/mnist.pkl.gz’):
self.datapath= dataset
self.train_set = []
self.valid_set = []
self.test_set = []
def call(self):
return self.generate()
def generate(self):
data_dir, data_file = os.path.split(self.datapath)
if data_dir == “” and not os.path.isfile(self.datapath):
# Check if dataset is in the data directory.
new_path = os.path.join(
os.path.split(file)[0],
“..”,
“data”,
self.datapath
)
if os.path.isfile(new_path) or data_file == ‘mnist.pkl.gz’:
dataset = new_path
if (not os.path.isfile(self.datapath)) and data_file == ‘mnist.pkl.gz’:
from six.moves import urllib
origin = (
‘http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz’
)
print(‘Downloading data from %s’ % origin)
urllib.request.urlretrieve(origin, self.datapath)
print(‘… loading data’)
# Load the dataset
with gzip.open(self.datapath, ‘rb’) as f:
try:
train_set, valid_set, test_set = pickle.load(f, encoding=’latin1’)
except:
train_set, valid_set, test_set = pickle.load(f)
# train_set, valid_set, test_set format: tuple(input, target)
# input is a numpy.ndarray of 2 dimensions (a matrix)
# where each row corresponds to an example. target is a
# numpy.ndarray of 1 dimension (vector) that has the same length as
# the number of rows in the input. It should give the target
# to the example with the same index in the input.
def shared_dataset(data_xy, borrow=True):
“”” Function that loads the dataset into shared variables
The reason we store our dataset in shared variables is to allow
Theano to copy it into the GPU memory (when code is run on GPU).
Since copying data into the GPU is slow, copying a minibatch everytime
is needed (the default behaviour if the data is not in a shared
variable) would lead to a large decrease in performance.
“””
data_x, data_y = data_xy
data_x = np.reshape(data_x, (len(data_x), 1, 28, 28))
shared_x = theano.shared(numpy.asarray(data_x,
dtype=theano.config.floatX),
borrow=borrow)
shared_y = theano.shared(numpy.asarray(data_y,
dtype=theano.config.floatX),
borrow=borrow)
# When storing data on the GPU it has to be stored as floats
# therefore we will store the labels as
floatX
as well
# (
shared_y
does exactly that). But during our computations
# we need them as ints (we use labels as index, and if they are
# floats it doesn’t make sense) therefore instead of returning
#
shared_y
we will have to cast it to int. This little hack
# lets ous get around this issue
return shared_x, T.cast(shared_y, ‘int32’)
self.test_set = shared_dataset(test_set)
self.valid_set = shared_dataset(valid_set)
self.train_set = shared_dataset(train_set)
class RBM(object):
def init(self, input=None, input_shape=None, K=1, obj_type=’free’, n_visible=784,
n_hidden=500, numpy_rng=None, theano_rng=None):
self.n_visible = n_visible
self.n_hidden = n_hidden
self.obj_type = obj_type
if numpy_rng is None:
# create a number generator
numpy_rng = numpy.random.RandomState(1234)
if theano_rng is None:
# theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))
self.theano_rng = MRG_RandomStreams(numpy_rng.randint(2 ** 30))
if input is None:
self.input = T.tensor4(‘input_data’)
else:
self.input = input
self.input_shape = input_shape
self.batch_size = input_shape[0]
self.K = K
self.input_layer = lasagne.layers.InputLayer(shape=self.input_shape, input_var=self.input)
self.layer_h_given_v = lasagne.layers.DenseLayer(self.input_layer, num_units=self.n_hidden, nonlinearity=sigmoid)
self.layer_v_given_h = RBMDenseLayer(self.layer_h_given_v, num_units=self.n_visible, W=self.layer_h_given_v.W.T,
nonlinearity=sigmoid)
# self.params = get_all_param_values(self.v_given_h)
self.params = get_all_params(self.layer_v_given_h)
def pcd_sampler(self):
pass
def h_given_v(self):
return [get_output(self.layer_h_given_v), self.theano_rng.binomial(size=get_output_shape(self.layer_h_given_v),
p=get_output(self.layer_h_given_v))]
def v_given_h(self):
return [get_output(self.layer_v_given_h).reshape(self.input_shape),self.theano_rng.binomial(size=get_output_shape(self.layer_v_given_h),
p=get_output(self.layer_v_given_h)).reshape(self.input_shape)]
def get_sample(self):
posh, posh_sample = self.h_given_v()
for i in xrange(self.K):
negv, negv_sample = self.v_given_h()
negh, negh_sample = self.h_given_v()
# if method == persistent:
return [posh, posh_sample, negv, negv_sample, negh, negh_sample]
def free_energy(self, sample):
wx_b = T.dot(T.reshape(sample, (sample.shape[0], sample.shape[1]*sample.shape[2]*sample.shape[3])),
self.layer_h_given_v.W) + self.layer_h_given_v.b
vbias_term = T.dot(T.reshape(sample, (sample.shape[0], sample.shape[1]*sample.shape[2]*sample.shape[3])),self.layer_v_given_h.c)
hidden_term = T.sum(T.log(1 + T.exp(wx_b)), axis=1)
return -hidden_term - vbias_term
def proxy_energy(self, visible_sample, hidden_sample):
vwh = T.dot(T.reshape(visible_sample, (visible_sample.shape[0],
visible_sample.shape[1] * visible_sample.shape[2] * visible_sample.shape[3])),
self.layer_h_given_v.W)*hidden_sample
vc = T.reshape(visible_sample, (visible_sample.shape[0],
visible_sample.shape[1] * visible_sample.shape[2] * visible_sample.shape[3]))*self.layer_v_given_h.c
hb = hidden_sample * self.layer_h_given_v.b
return -vwh.sum(axis=1)-vc.sum(axis=1)-hb.sum(axis=1)
def get_reconstruction_cost(self, pre_sigmoid_nv):
cross_entropy = T.mean(
T.sum(
self.input * T.log(T.nnet.sigmoid(pre_sigmoid_nv)) +
(1 - self.input) * T.log(1 - T.nnet.sigmoid(pre_sigmoid_nv)),
axis=[1, 2, 3]
)
)
return cross_entropy
def get_loss(self):
[posh, posh_sample, negv, negv_sample, negh, negh_sample] = self.get_sample()
if self.obj_type == ‘free’:
loss = self.free_energy(self.input)-self.free_energy(negv_sample)+0.2*T.square(0.001-posh).sum(axis=[1])
else:
loss = self.proxy_energy(self.input, posh_sample) - self.proxy_energy(negv_sample, negh_sample)
monitoring_cost = self.get_reconstruction_cost(negv)
return loss.mean(), monitoring_cost.mean()
def optimization(self, train_set, epoch):
params = lasagne.layers.get_all_params(self.layer_v_given_h, trainable=True)
updates = lasagne.updates.nesterov_momentum(
self.get_loss()[0], params, learning_rate=0.1, momentum=0.3)
numsample = train_set.get_value().shape[0]
numbatch = int(numsample/self.batch_size)
index = T.iscalar(‘index’)
train_fn = theano.function([index], self.get_loss()[0],
givens={self.input: train_set[index*self.batch_size:(index+1)*self.batch_size]},
updates=updates)
for j in xrange(epoch):
loss = []
for i in xrange(numbatch):
loss = loss+[train_fn(i)]
# print “epoch %d batch %d error is %d” %(j, i, train_fn(i))
print “epoch %d error is %d” % (j, np.mean(loss))
image = self.visilize()
image.save(‘/home/wang/SCI1/Figure/filters_at_epoch_%d.png’ % j)
def save_model(self):
path = ‘/home/wang/桌面/’ + ‘rbm_hidden_’ + str(self.n_hidden) + ‘.pickle’
with open(path, ‘wb’) as file1:
model = get_all_param_values(self.layer_v_given_h)
pickle.dump(model, file1)
def visilize(self, X = None):
if X == None:
X = self.layer_h_given_v.W.get_value(borrow=True).T
image = Image.fromarray(
tile_raster_images(
X,
img_shape=(28, 28),
tile_shape=(10, 10),
tile_spacing=(1, 1)
)
)
return image
if name == ‘main‘:
# x = T.matrix()
x = T.tensor4()
rbm = RBM(input=x, input_shape=(200, 1, 28, 28))
# print type(rbm.input)
data = Data()
data()
# c = data.train_set[0].get_value().reshape(50000,784)
# image=rbm.visilize(c[0:100])
# image.save(‘/home/wang/SCI1/Figure/mnist.png’)
# print data.train_set[0].get_value()[0:20].shape
# print data.train_set[0].get_value()
rbm.optimization(data.train_set[0], epoch=15)
image = rbm.visilize()
image.save(‘learned_filter.png’)
# rbm.save_model()