
Code for Chapter 15 of Spark MLlib Machine Learning (《Spark MLlib 機器學習》)


1. Neural network class

package NN

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.spark.mllib.linalg._

import breeze.linalg.{
  Matrix => BM,
  CSCMatrix => BSM,
  DenseMatrix => BDM,
  Vector => BV,
  DenseVector => BDV,
  SparseVector => BSV,
  axpy => brzAxpy,
  svd => brzSvd
}
import breeze.numerics.{
  exp => Bexp,
  tanh => Btanh
}

import scala.collection.mutable.ArrayBuffer
import java.util.Random
import scala.math._

/**
 * label: target matrix
 * nna: output values of each layer's units, a(0), a(1), a(2), ...
 * error: error matrix between the output layer and the target
 */
case class NNLabel(label: BDM[Double], nna: ArrayBuffer[BDM[Double]], error: BDM[Double]) extends Serializable

/**
 * Configuration parameters
 */
case class NNConfig(
  size: Array[Int],
  layer: Int,
  activation_function: String,
  learningRate: Double,
  momentum: Double,
  scaling_learningRate: Double,
  weightPenaltyL2: Double,
  nonSparsityPenalty: Double,
  sparsityTarget: Double,
  inputZeroMaskedFraction: Double,
  dropoutFraction: Double,
  testing: Double,
  output_function: String) extends Serializable
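
// Added illustration (not part of the original listing): the configuration built by the
// default constructor `new NeuralNet()` below corresponds to
//   NNConfig(Array(10, 5, 1), 3, "sigm", 2.0, 0.5, 1.0, 0.0, 0.0, 0.05, 0.0, 0.0, 0.0, "linear")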

/**
 * NN(neural network)
 */

class NeuralNet(
  private var size: Array[Int],
  private var layer: Int,
  private var activation_function: String,
  private var learningRate: Double,
  private var momentum: Double,
  private var scaling_learningRate: Double,
  private var weightPenaltyL2: Double,
  private var nonSparsityPenalty: Double,
  private var sparsityTarget: Double,
  private var inputZeroMaskedFraction: Double,
  private var dropoutFraction: Double,
  private var testing: Double,
  private var output_function: String,
  private var initW: Array[BDM[Double]]) extends Serializable with Logging {
  //            var size=Array(5, 10, 7, 1)
  //            var layer=4
  //            var activation_function="tanh_opt"
  //            var learningRate=2.0
  //            var momentum=0.5
  //            var scaling_learningRate=1.0
  //            var weightPenaltyL2=0.0
  //            var nonSparsityPenalty=0.0
  //            var sparsityTarget=0.05
  //            var inputZeroMaskedFraction=0.0
  //            var dropoutFraction=0.0
  //            var testing=0.0
  //            var output_function="sigm"
  /**
   * size = architecture;
   * n = numel(nn.size);
   * activation_function = sigm   Activation function of the hidden layers: 'sigm' (sigmoid) or 'tanh_opt' (optimal tanh).
   * learningRate = 2;            Learning rate. Note: typically needs to be lower when using 'sigm' activation and non-normalized inputs.
   * momentum = 0.5;              Momentum
   * scaling_learningRate = 1;    Scaling factor for the learning rate (each epoch)
   * weightPenaltyL2  = 0;        L2 regularization penalty
   * nonSparsityPenalty = 0;      Non-sparsity penalty
   * sparsityTarget = 0.05;       Sparsity target
   * inputZeroMaskedFraction = 0; Fraction of inputs set to zero (noise), used for denoising autoencoders
   * dropoutFraction = 0;         Dropout level: fraction of hidden units randomly dropped for each mini-batch (http://www.cs.toronto.edu/~hinton/absps/dropout.pdf)
   * testing = 0;                 Internal variable. nntest sets this to one.
   * output = 'sigm';             Output unit: 'sigm' (=logistic), 'softmax' or 'linear'
   */
  def this() = this(NeuralNet.Architecture, 3, NeuralNet.Activation_Function, 2.0, 0.5, 1.0, 0.0, 0.0, 0.05, 0.0, 0.0, 0.0, NeuralNet.Output, Array(BDM.zeros[Double](1, 1)))

  /** Set the network architecture. Default: [10, 5, 1]. */
  def setSize(size: Array[Int]): this.type = {
    this.size = size
    this
  }

  /** Set the number of network layers. Default: 3. */
  def setLayer(layer: Int): this.type = {
    this.layer = layer
    this
  }

  /** Set the hidden-layer activation function. Default: sigm. */
  def setActivation_function(activation_function: String): this.type = {
    this.activation_function = activation_function
    this
  }

  /** Set the learning rate. Default: 2. */
  def setLearningRate(learningRate: Double): this.type = {
    this.learningRate = learningRate
    this
  }

  /** Set the momentum. Default: 0.5. */
  def setMomentum(momentum: Double): this.type = {
    this.momentum = momentum
    this
  }

  /** Set the learning-rate scaling factor (scaling_learningRate). Default: 1. */
  def setScaling_learningRate(scaling_learningRate: Double): this.type = {
    this.scaling_learningRate = scaling_learningRate
    this
  }

  /** Set the L2 weight penalty. Default: 0. */
  def setWeightPenaltyL2(weightPenaltyL2: Double): this.type = {
    this.weightPenaltyL2 = weightPenaltyL2
    this
  }

  /** Set the non-sparsity penalty. Default: 0. */
  def setNonSparsityPenalty(nonSparsityPenalty: Double): this.type = {
    this.nonSparsityPenalty = nonSparsityPenalty
    this
  }

  /** Set the sparsity target. Default: 0.05. */
  def setSparsityTarget(sparsityTarget: Double): this.type = {
    this.sparsityTarget = sparsityTarget
    this
  }

  /** Set the input zero-masking (noise) fraction. Default: 0. */
  def setInputZeroMaskedFraction(inputZeroMaskedFraction: Double): this.type = {
    this.inputZeroMaskedFraction = inputZeroMaskedFraction
    this
  }

  /** Set the dropout fraction. Default: 0. */
  def setDropoutFraction(dropoutFraction: Double): this.type = {
    this.dropoutFraction = dropoutFraction
    this
  }

  /** Set the testing flag. Default: 0. */
  def setTesting(testing: Double): this.type = {
    this.testing = testing
    this
  }

  /** Set the output function. Default: linear. */
  def setOutput_function(output_function: String): this.type = {
    this.output_function = output_function
    this
  }

  /** Set the initial weights. Default: random initialization. */
  def setInitW(initW: Array[BDM[Double]]): this.type = {
    this.initW = initW
    this
  }

  /**
   * Run the neural network training algorithm.
   */
  def NNtrain(train_d: RDD[(BDM[Double], BDM[Double])], opts: Array[Double]): NeuralNetModel = {
    val sc = train_d.sparkContext
    var initStartTime = System.currentTimeMillis()
    var initEndTime = System.currentTimeMillis()
    // Parameter configuration (broadcast to the executors below)
    var nnconfig = NNConfig(size, layer, activation_function, learningRate, momentum, scaling_learningRate,
      weightPenaltyL2, nonSparsityPenalty, sparsityTarget, inputZeroMaskedFraction, dropoutFraction, testing,
      output_function)
    // Initialize the weights
    var nn_W = NeuralNet.InitialWeight(size)
    if (!((initW.length == 1) && (initW(0) == (BDM.zeros[Double](1, 1))))) {
      for (i <- 0 to initW.length - 1) {
        nn_W(i) = initW(i)
      }
    }
    var nn_vW = NeuralNet.InitialWeightV(size)
    //    val tmpw = nn_W(0)
    //    for (i <- 0 to tmpw.rows - 1) {
    //      for (j <- 0 to tmpw.cols - 1) {
    //        print(tmpw(i, j) + "\t")
    //      }
    //      println()
    //    }

    // Initialize each layer's average activation nn.p
    // average activations (for use with sparsity)
    var nn_p = NeuralNet.InitialActiveP(size)

    // Split the samples into training data and cross-validation data
    val validation = opts(2)
    val splitW1 = Array(1.0 - validation, validation)
    val train_split1 = train_d.randomSplit(splitW1, System.nanoTime())
    val train_t = train_split1(0)
    val train_v = train_split1(1)

    // m: number of training samples
    val m = train_t.count
    // batchsize: mini-batch size used for batch gradient descent
    // compute the number of batches
    val batchsize = opts(0).toInt
    val numepochs = opts(1).toInt
    val numbatches = (m / batchsize).toInt
    var L = Array.fill(numepochs * numbatches.toInt)(0.0)
    var n = 0
    var loss_train_e = Array.fill(numepochs)(0.0)
    var loss_val_e = Array.fill(numepochs)(0.0)
    // numepochs: number of training epochs
    for (i <- 1 to numepochs) {
      initStartTime = System.currentTimeMillis()
      val splitW2 = Array.fill(numbatches)(1.0 / numbatches)
      // Randomly split the samples into batches according to the split weights
      val bc_config = sc.broadcast(nnconfig)
      for (l <- 1 to numbatches) {
        // Broadcast the weights
        val bc_nn_W = sc.broadcast(nn_W)
        val bc_nn_vW = sc.broadcast(nn_vW)

        //        println(i + "\t" + l)
        //        println("W1")
        //        val tmpw0 = bc_nn_W.value(0)
        //        for (i <- 0 to tmpw0.rows - 1) {
        //          for (j <- 0 to tmpw0.cols - 1) {
        //            print(tmpw0(i, j) + "\t")
        //          }
        //          println()
        //        }
        //        println("W2")
        //        val tmpw1 = bc_nn_W.value(1)
        //        for (i <- 0 to tmpw1.rows - 1) {
        //          for (j <- 0 to tmpw1.cols - 1) {
        //            print(tmpw1(i, j) + "\t")
        //          }
        //          println()
        //        }
        //        println("W3")
        //        val tmpw2 = bc_nn_W.value(2)
        //        for (i <- 0 to tmpw2.rows - 1) {
        //          for (j <- 0 to tmpw2.cols - 1) {
        //            print(tmpw2(i, j) + "\t")
        //          }
        //          println()
        //        }

        // Select this batch's samples
        val train_split2 = train_t.randomSplit(splitW2, System.nanoTime())
        val batch_xy1 = train_split2(l - 1)
        //        val train_split3 = train_t.filter { f => (f._1 >= batchsize * (l - 1) + 1) && (f._1 <= batchsize * (l)) }
        //        val batch_xy1 = train_split3.map(f => (f._2, f._3))
        // Add noise to input (for use in denoising autoencoder)
        // Add noise: this is the part needed by the denoising autoencoder.
        // See the paper "Extracting and Composing Robust Features with Denoising Autoencoders".
        // Concretely, some entries of the training samples are set to 0; inputZeroMaskedFraction gives the proportion.
        //val randNoise = NeuralNet.RandMatrix(batch_x.numRows.toInt, batch_x.numCols.toInt, inputZeroMaskedFraction)
        val batch_xy2 = if (bc_config.value.inputZeroMaskedFraction != 0) {
          NeuralNet.AddNoise(batch_xy1, bc_config.value.inputZeroMaskedFraction)
        } else batch_xy1

        //        val tmpxy = batch_xy2.map(f => (f._1.toArray,f._2.toArray)).toArray.map {f => ((new ArrayBuffer() ++ f._1) ++ f._2).toArray}
        //        for (i <- 0 to tmpxy.length - 1) {
        //          for (j <- 0 to tmpxy(i).length - 1) {
        //            print(tmpxy(i)(j) + "\t")
        //          }
        //          println()
        //        }

        // NNff performs the feed-forward pass
        // nn = nnff(nn, batch_x, batch_y);
        val train_nnff = NeuralNet.NNff(batch_xy2, bc_config, bc_nn_W)

        //        val tmpa0 = train_nnff.map(f => f._1.nna(0)).take(20)
        //        println("tmpa0")
        //        for (i <- 0 to 10) {
        //          for (j <- 0 to tmpa0(i).cols - 1) {
        //            print(tmpa0(i)(0, j) + "\t")
        //          }
        //          println()
        //        }
        //        val tmpa1 = train_nnff.map(f => f._1.nna(1)).take(20)
        //        println("tmpa1")
        //        for (i <- 0 to 10) {
        //          for (j <- 0 to tmpa1(i).cols - 1) {
        //            print(tmpa1(i)(0, j) + "\t")
        //          }
        //          println()
        //        }
        //        val tmpa2 = train_nnff.map(f => f._1.nna(2)).take(20)
        //        println("tmpa2")
        //        for (i <- 0 to 10) {
        //          for (j <- 0 to tmpa2(i).cols - 1) {
        //            print(tmpa2(i)(0, j) + "\t")
        //          }
        //          println()
        //        }

        // Sparsity: compute the average activation of each layer's units
        nn_p = NeuralNet.ActiveP(train_nnff, bc_config, nn_p)
        val bc_nn_p = sc.broadcast(nn_p)

        // NNbp performs the back-propagation pass
        // nn = nnbp(nn);
        val train_nnbp = NeuralNet.NNbp(train_nnff, bc_config, bc_nn_W, bc_nn_p)

        //        val tmpd0 = rdd5.map(f => f._2(2)).take(20)
        //        println("tmpd0")
        //        for (i <- 0 to 10) {
        //          for (j <- 0 to tmpd0(i).cols - 1) {
        //            print(tmpd0(i)(0, j) + "\t")
        //          }
        //          println()
        //        }
        //        val tmpd1 = rdd5.map(f => f._2(1)).take(20)
        //        println("tmpd1")
        //        for (i <- 0 to 10) {
        //          for (j <- 0 to tmpd1(i).cols - 1) {
        //            print(tmpd1(i)(0, j) + "\t")
        //          }
        //          println()
        //        }
        //        val tmpdw0 = rdd5.map(f => f._3(0)).take(20)
        //        println("tmpdw0")
        //        for (i <- 0 to 10) {
        //          for (j <- 0 to tmpdw0(i).cols - 1) {
        //            print(tmpdw0(i)(0, j) + "\t")
        //          }
        //          println()
        //        }
        //        val tmpdw1 = rdd5.map(f => f._3(1)).take(20)
        //        println("tmpdw1")
        //        for (i <- 0 to 10) {
        //          for (j <- 0 to tmpdw1(i).cols - 1) {
        //            print(tmpdw1(i)(0, j) + "\t")
        //          }
        //          println()
        //        }

        // nn = NNapplygrads(nn) returns an neural network structure with updated
        // weights and biases
        // Update the weights: w = w - α*[dw + λw]
        val train_nnapplygrads = NeuralNet.NNapplygrads(train_nnbp, bc_config, bc_nn_W, bc_nn_vW)
        nn_W = train_nnapplygrads(0)
        nn_vW = train_nnapplygrads(1)

        //        val tmpw2 = train_nnapplygrads(0)(0)
        //        for (i <- 0 to tmpw2.rows - 1) {
        //          for (j <- 0 to tmpw2.cols - 1) {
        //            print(tmpw2(i, j) + "\t")
        //          }
        //          println()
        //        }
        //        val tmpw3 = train_nnapplygrads(0)(1)
        //        for (i <- 0 to tmpw3.rows - 1) {
        //          for (j <- 0 to tmpw3.cols - 1) {
        //            print(tmpw3(i, j) + "\t")
        //          }
        //          println()
        //        }

        // error and loss
        // Compute the output error
        val loss1 = train_nnff.map(f => f._1.error)
        val (loss2, counte) = loss1.treeAggregate((0.0, 0L))(
          seqOp = (c, v) => {
            // c: (e, count), v: (m)
            val e1 = c._1
            val e2 = (v :* v).sum
            val esum = e1 + e2
            (esum, c._2 + 1)
          },
          combOp = (c1, c2) => {
            // c: (e, count)
            val e1 = c1._1
            val e2 = c2._1
            val esum = e1 + e2
            (esum, c1._2 + c2._2)
          })
        val Loss = loss2 / counte.toDouble
        L(n) = Loss * 0.5
        n = n + 1
      }
      // Compute this epoch's training error and cross-validation error
      // Full-batch train mse
      val evalconfig = NNConfig(size, layer, activation_function, learningRate, momentum, scaling_learningRate,
        weightPenaltyL2, nonSparsityPenalty, sparsityTarget, inputZeroMaskedFraction, dropoutFraction, 1.0,
        output_function)
      loss_train_e(i - 1) = NeuralNet.NNeval(train_t, sc.broadcast(evalconfig), sc.broadcast(nn_W))
      if (validation > 0) loss_val_e(i - 1) = NeuralNet.NNeval(train_v, sc.broadcast(evalconfig), sc.broadcast(nn_W))

      // Update the learning rate
      // nn.learningRate = nn.learningRate * nn.scaling_learningRate;
      nnconfig = NNConfig(size, layer, activation_function, nnconfig.learningRate * nnconfig.scaling_learningRate, momentum, scaling_learningRate,
        weightPenaltyL2, nonSparsityPenalty, sparsityTarget, inputZeroMaskedFraction, dropoutFraction, testing,
        output_function)
      initEndTime = System.currentTimeMillis()

      // Print the results
      printf("epoch: numepochs = %d , Took = %d seconds; Full-batch train mse = %f, val mse = %f.\n", i, scala.math.ceil((initEndTime - initStartTime).toDouble / 1000).toLong, loss_train_e(i - 1), loss_val_e(i - 1))
    }
    val configok = NNConfig(size, layer, activation_function, learningRate, momentum, scaling_learningRate,
      weightPenaltyL2, nonSparsityPenalty, sparsityTarget, inputZeroMaskedFraction, dropoutFraction, 1.0,
      output_function)
    new NeuralNetModel(configok, nn_W)
  }

}

/**
 * NN(neural network)
 */
object NeuralNet extends Serializable {

  // Default parameter values
  val Activation_Function = "sigm"
  val Output = "linear"
  val Architecture = Array(10, 5, 1)

  /**
   * Add random noise to the inputs.
   * If the random value >= Fraction the entry is kept, otherwise it is set to 0.
   */
  def AddNoise(rdd: RDD[(BDM[Double], BDM[Double])], Fraction: Double): RDD[(BDM[Double], BDM[Double])] = {
    val addNoise = rdd.map { f =>
      val features = f._2
      val a = BDM.rand[Double](features.rows, features.cols)
      val a1 = a :>= Fraction
      val d1 = a1.data.map { f => if (f == true) 1.0 else 0.0 }
      val a2 = new BDM(features.rows, features.cols, d1)
      val features2 = features :* a2
      (f._1, features2)
    }
    addNoise
  }
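
  // Added sketch (not in the original listing): with Fraction = 0.3, on average about 30% of the
  // feature entries are zeroed, e.g. (assuming a SparkContext `sc` is in scope)
  //   val sample = (BDM.ones[Double](1, 1), BDM.ones[Double](1, 10))
  //   val noisy = AddNoise(sc.parallelize(Seq(sample)), 0.3).first()._2
  //   // roughly 3 of the 10 entries of `noisy` are now 0.0; the rest remain 1.0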

  /**
   * Initialize the weights
   * to small random values close to zero.
   */
  def InitialWeight(size: Array[Int]): Array[BDM[Double]] = {
    // Initialize the weight parameters
    // weights and weight momentum
    // nn.W{i - 1} = (rand(nn.size(i), nn.size(i - 1)+1) - 0.5) * 2 * 4 * sqrt(6 / (nn.size(i) + nn.size(i - 1)));
    // nn.vW{i - 1} = zeros(size(nn.W{i - 1}));
    val n = size.length
    val nn_W = ArrayBuffer[BDM[Double]]()
    for (i <- 1 to n - 1) {
      val d1 = BDM.rand(size(i), size(i - 1) + 1)
      d1 :-= 0.5
      val f1 = 2 * 4 * sqrt(6.0 / (size(i) + size(i - 1)))
      val d2 = d1 :* f1
      //val d3 = new DenseMatrix(d2.rows, d2.cols, d2.data, d2.isTranspose)
      //val d4 = Matrices.dense(d2.rows, d2.cols, d2.data)
      nn_W += d2
    }
    nn_W.toArray
  }
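
  // Shape note (added, not in the original listing): each W(i) returned here has size(i+1) rows and
  // size(i)+1 columns, the extra column being the bias term. For example,
  //   val w = InitialWeight(Array(5, 7, 1))
  //   // w(0) is 7 x 6 (5 inputs + bias), w(1) is 1 x 8 (7 hidden units + bias)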

  /**
   * Initialize the weight momentum vW
   * to zeros.
   */
  def InitialWeightV(size: Array[Int]): Array[BDM[Double]] = {
    // Initialize the weight momentum parameters
    // weights and weight momentum
    // nn.vW{i - 1} = zeros(size(nn.W{i - 1}));
    val n = size.length
    val nn_vW = ArrayBuffer[BDM[Double]]()
    for (i <- 1 to n - 1) {
      val d1 = BDM.zeros[Double](size(i), size(i - 1) + 1)
      nn_vW += d1
    }
    nn_vW.toArray
  }

  /**
   * Initialize each layer's average activation
   * to zeros.
   */
  def InitialActiveP(size: Array[Int]): Array[BDM[Double]] = {
    // Initialize each layer's average activation
    // average activations (for use with sparsity)
    // nn.p{i}     = zeros(1, nn.size(i));  
    val n = size.length
    val nn_p = ArrayBuffer[BDM[Double]]()
    nn_p += BDM.zeros[Double](1, 1)
    for (i <- 1 to n - 1) {
      val d1 = BDM.zeros[Double](1, size(i))
      nn_p += d1
    }
    nn_p.toArray
  }

  /**
   * Randomly disable some hidden units (dropout).
   * If the random value > Fraction the entry is kept, otherwise it is set to 0.
   */
  def DropoutWeight(matrix: BDM[Double], Fraction: Double): Array[BDM[Double]] = {
    val aa = BDM.rand[Double](matrix.rows, matrix.cols)
    val aa1 = aa :> Fraction
    val d1 = aa1.data.map { f => if (f == true) 1.0 else 0.0 }
    val aa2 = new BDM(matrix.rows: Int, matrix.cols: Int, d1: Array[Double])
    val matrix2 = matrix :* aa2
    Array(aa2, matrix2)
  }

  /**
   * Sigmoid activation function
   * X = 1./(1+exp(-P));
   */
  def sigm(matrix: BDM[Double]): BDM[Double] = {
    val s1 = 1.0 / (Bexp(matrix * (-1.0)) + 1.0)
    s1
  }

  /**
   * tanh activation function
   * f = 1.7159*tanh(2/3.*A);
   */
  def tanh_opt(matrix: BDM[Double]): BDM[Double] = {
    val s1 = Btanh(matrix * (2.0 / 3.0)) * 1.7159
    s1
  }

  /**
   * NNff performs the feed-forward pass,
   * computing the output value of every unit in the network.
   */
  def NNff(
    batch_xy2: RDD[(BDM[Double], BDM[Double])],
    bc_config: org.apache.spark.broadcast.Broadcast[NNConfig],
    bc_nn_W: org.apache.spark.broadcast.Broadcast[Array[BDM[Double]]]): RDD[(NNLabel, Array[BDM[Double]])] = {
    // Layer 1: a(1) = [1 x]
    // Add the bias term b
    val train_data1 = batch_xy2.map { f =>
      val label = f._1
      val features = f._2
      val nna = ArrayBuffer[BDM[Double]]()
      val Bm1 = new BDM(features.rows, 1, Array.fill(features.rows * 1)(1.0))
      val features2 = BDM.horzcat(Bm1, features)
      val error = BDM.zeros[Double](label.rows, label.cols)
      nna += features2
      NNLabel(label, nna, error)
    }

    //    println("bc_size " + bc_config.value.size(0) + bc_config.value.size(1) + bc_config.value.size(2))
    //    println("bc_layer " + bc_config.value.layer)
    //    println("bc_activation_function " + bc_config.value.activation_function)
    //    println("bc_output_function " + bc_config.value.output_function)
    //
    //    println("tmpw0 ")
    //    val tmpw0 = bc_nn_W.value(0)
    //    for (i <- 0 to tmpw0.rows - 1) {
    //      for (j <- 0 to tmpw0.cols - 1) {
    //        print(tmpw0(i, j) + "\t")
    //      }
    //      println()
    //    }

    // feedforward pass
    // Layers 2 to n-1: a(i) = f(a(i-1) * w(i-1)')
    //val tmp1 = train_data1.map(f => f.nna(0).data).take(1)(0)
    //val tmp2 = new BDM(1, tmp1.length, tmp1)
    //val nn_a = ArrayBuffer[BDM[Double]]()
    //nn_a += tmp2
    val train_data2 = train_data1.map { f =>
      val nn_a = f.nna
      val dropOutMask = ArrayBuffer[BDM[Double]]()
      dropOutMask += new BDM[Double](1, 1, Array(0.0))
      for (j <- 1 to bc_config.value.layer - 2) {
        // Compute this layer's output
        // Calculate the unit's outputs (including the bias term)
        // nn.a{i} = sigm(nn.a{i - 1} * nn.W{i - 1}')
        // nn.a{i} = tanh_opt(nn.a{i - 1} * nn.W{i - 1}');            
        val A1 = nn_a(j - 1)
        val W1 = bc_nn_W.value(j - 1)
        val aw1 = A1 * W1.t
        val nnai1 = bc_config.value.activation_function match {
          case "sigm" =>
            val aw2 = NeuralNet.sigm(aw1)
            aw2
          case "tanh_opt" =>
            val aw2 = NeuralNet.tanh_opt(aw1)
            //val aw2 = Btanh(aw1 * (2.0 / 3.0)) * 1.7159
            aw2
        }
        // Dropout
        // Dropout randomly disables some hidden units during training; the disabled units can temporarily be regarded as not being part of the network,
        // but their weights are kept (just not updated for now), because they may become active again for the next samples.
        // See http://www.cnblogs.com/tornadomeet/p/3258122.html
        val dropoutai = if (bc_config.value.dropoutFraction > 0) {
          if (bc_config.value.testing == 1) {
            val nnai2 = nnai1 * (1.0 - bc_config.value.dropoutFraction)
            Array(new BDM[Double](1, 1, Array(0.0)), nnai2)
          } else {
            NeuralNet.DropoutWeight(nnai1, bc_config.value.dropoutFraction)
          }
        } else {
          val nnai2 = nnai1
          Array(new BDM[Double](1, 1, Array(0.0)), nnai2)
        }
        val nnai2 = dropoutai(1)
        dropOutMask += dropoutai(0)
        // Add the bias term
        // Add the bias term b
        // nn.a{i} = [ones(m,1) nn.a{i}];
        val Bm1 = BDM.ones[Double](nnai2.rows, 1)
        val nnai3 = BDM.horzcat(Bm1, nnai2)
        nn_a += nnai3
      }
      (NNLabel(f.label, nn_a, f.error), dropOutMask.toArray)
    }

    // Output layer
    val train_data3 = train_data2.map { f =>
      val nn_a = f._1.nna
      // nn.a{n} = sigm(nn.a{n - 1} * nn.W{n - 1}');
      // nn.a{n} = nn.a{n - 1} * nn.W{n - 1}';          
      val An1 = nn_a(bc_config.value.layer - 2)
      val Wn1 = bc_nn_W.value(bc_config.value.layer - 2)
      val awn1 = An1 * Wn1.t
      val nnan1 = bc_config.value.output_function match {
        case "sigm" =>
          val awn2 = NeuralNet.sigm(awn1)
          //val awn2 = 1.0 / (Bexp(awn1 * (-1.0)) + 1.0)
          awn2
        case "linear" =>
          val awn2 = awn1
          awn2
      }
      nn_a += nnan1
      (NNLabel(f._1.label, nn_a, f._1.error), f._2)
    }

    // error and loss
    // Compute the output error
    // nn.e = y - nn.a{n};
    // val nn_e = batch_y - nnan
    val train_data4 = train_data3.map { f =>
      val batch_y = f._1.label
      val nnan = f._1.nna(bc_config.value.layer - 1)
      val error = (batch_y - nnan)
      (NNLabel(f._1.label, f._1.nna, error), f._2)
    }
    train_data4
  }
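
  // Summary note (added, not in the original listing): after NNff, for each sample
  //   nna(0)             = [1, x]                        (input with a leading bias column)
  //   nna(1 .. layer-2)  = hidden-layer outputs, each with a leading bias column
  //   nna(layer-1)       = network output (no bias column)
  // and error = label - nna(layer-1).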

  /**
   * Sparsity: compute the network's average activation,
   * i.e. the running mean output of each unit.
   */
  def ActiveP(
    train_nnff: RDD[(NNLabel, Array[BDM[Double]])],
    bc_config: org.apache.spark.broadcast.Broadcast[NNConfig],
    nn_p_old: Array[BDM[Double]]): Array[BDM[Double]] = {
    val nn_p = ArrayBuffer[BDM[Double]]()
    nn_p += BDM.zeros[Double](1, 1)
    // calculate running exponential activations for use with sparsity
    // nonSparsityPenalty is the penalty coefficient for units that do not reach the sparsity target
    for (i <- 1 to bc_config.value.layer - 1) {
      val pi1 = train_nnff.map(f => f._1.nna(i))
      val initpi = BDM.zeros[Double](1, bc_config.value.size(i))
      val (piSum, miniBatchSize) = pi1.treeAggregate((initpi, 0L))(
        seqOp = (c, v) => {
          // c: (nnasum, count), v: (nna)
          val nna1 = c._1
          val nna2 = v
          val nnasum = nna1 + nna2
          (nnasum, c._2 + 1)
        },
        combOp = (c1, c2) => {
          // c: (nnasum, count)
          val nna1 = c1._1
          val nna2 = c2._1
          val nnasum = nna1 + nna2
          (nnasum, c1._2 + c2._2)
        })
      val piAvg = piSum / miniBatchSize.toDouble
      val oldpi = nn_p_old(i)
      // Running exponential average of the activations: p = 0.99 * p_old + 0.01 * mean(a)
      val newpi = (piAvg * 0.01) + (oldpi * 0.99)
      nn_p += newpi
    }
    nn_p.toArray
  }

  /**
   * NNbp performs back-propagation,
   * computing the average partial derivatives of the weights.
   */
  def NNbp(
    train_nnff: RDD[(NNLabel, Array[BDM[Double]])],
    bc_config: org.apache.spark.broadcast.Broadcast[NNConfig],
    bc_nn_W: org.apache.spark.broadcast.Broadcast[Array[BDM[Double]]],
    bc_nn_p: org.apache.spark.broadcast.Broadcast[Array[BDM[Double]]]): Array[BDM[Double]] = {
    // Layer n delta: d(n) = -(y - a(n)) .* f'(z); for the sigmoid, f'(z) = f(z)*[1 - f(z)]
    // sigm: d{n} = - nn.e .* (nn.a{n} .* (1 - nn.a{n}));
    // {'softmax','linear'}: d{n} = - nn.e;
    val train_data5 = train_nnff.map { f =>
      val nn_a = f._1.nna
      val error = f._1.error
      val dn = ArrayBuffer[BDM[Double]]()
      val nndn = bc_config.value.output_function match {
        case "sigm" =>
          val fz = nn_a(bc_config.value.layer - 1)
          (error * (-1.0)) :* (fz :* (1.0 - fz))
        case "linear" =>
          error * (-1.0)
      }
      dn += nndn
      (f._1, f._2, dn)
    }
    // Deltas for layers n-1 down to 2: d(i) = (d(i+1) * w(i)) .* f'(z)
    val train_data6 = train_data5.map { f =>
      // If f(z) is the sigmoid f(z) = 1/[1+e^(-z)], then f'(z) = f(z)*[1 - f(z)]
      // If f(z) = 1.7159*tanh(2/3*z), then f'(z) = 1.7159 * 2/3 * (1 - 1/(1.7159)^2 * f(z).^2)
      // train_data5.map(f => f._1.nna).take(1)
      // train_data5.map(f => f._3).take(1)
      // train_data5.map(f => f._2).take(1)
      //      val di = ArrayBuffer(BDM((0.011181628780251586)))
      //      val nn_a = ArrayBuffer[BDM[Double]]()
      //      val a1 = BDM((1.0, 0.312605257000000, 0.848582961000000, 0.999014768000000, 0.278330771000000, 0.462701179000000))
      //      val a2 = BDM((1.0, 0.838091550300577, 0.996782915917104, 0.118033012437165, 0.312605257000000, 0.848582961000000, 0.999014768000000, 0.278330771000000, 0.462701179000000, 0.278330771000000, 0.462701179000000))
      //      val a3 = BDM((1.0, 0.312605257000000, 0.848582961000000, 0.999014768000000, 0.278330771000000, 0.462701179000000, 0.278330771000000, 0.462701179000000))
      //      val a4 = BDM((0.9826605123949446))
      //      nn_a += a1
      //      nn_a += a2
      //      nn_a += a3
      //      nn_a += a4
      //      val dropout = Array(BDM.zeros[Double](1,1), BDM.zeros[Double](1,1), BDM.zeros[Double](1,1)) 
      val nn_a = f._1.nna
      val di = f._3
      val dropout = f._2
      for (i <- (bc_config.value.layer - 2) to 1 by -1) {
        // f'(z)
        val nnd_act = bc_config.value.activation_function match {
          case "sigm" =>
            val d_act = nn_a(i) :* (1.0 - nn_a(i))
            d_act
          case "tanh_opt" =>
            val fz2 = (1.0 - ((nn_a(i) :* nn_a(i)) * (1.0 / (1.7159 * 1.7159))))
            val d_act = fz2 * (1.7159 * (2.0 / 3.0))
            d_act
        }
        // Sparsity penalty term: -(t/p) + (1-t)/(1-p)
        // sparsityError = [zeros(size(nn.a{i},1),1) nn.nonSparsityPenalty * (-nn.sparsityTarget ./ pi + (1 - nn.sparsityTarget) ./ (1 - pi))];
        val sparsityError = if (bc_config.value.nonSparsityPenalty > 0) {
          val nn_pi1 = bc_nn_p.value(i)
          val nn_pi2 = (bc_config.value.sparsityTarget / nn_pi1) * (-1.0) + (1.0 - bc_config.value.sparsityTarget) / (1.0 - nn_pi1)
          val Bm1 = new BDM(nn_pi2.rows, 1, Array.fill(nn_pi2.rows * 1)(1.0))
          val sparsity = BDM.horzcat(Bm1, nn_pi2 * bc_config.value.nonSparsityPenalty)
          sparsity
        } else {
          val nn_pi1 = bc_nn_p.value(i)
          val sparsity = BDM.zeros[Double](nn_pi1.rows, nn_pi1.cols + 1)
          sparsity
        }
        // Delta: d(i) = (d(i+1)*w(i) + sparsityError) .* f'(z)
        // d{i} = (d{i + 1} * nn.W{i} + sparsityError) .* d_act;
        val W1 = bc_nn_W.value(i)
        val nndi1 = if (i + 1 == bc_config.value.layer - 1) {
          //in this case in d{n} there is not the bias term to be removed  
          val di1 = di(bc_config.value.layer - 2 - i)
          val di2 = (di1 * W1 + sparsityError) :* nnd_act
          di2
        } else {
          // in this case in d{i} the bias term has to be removed
          val di1 = di(bc_config.value.layer - 2 - i)(::, 1 to -1)
          val di2 = (di1 * W1 + sparsityError) :* nnd_act
          di2
        }
        // dropoutFraction
        val nndi2 = if (bc_config.value.dropoutFraction > 0) {
          val dropouti1 = dropout(i)
          val Bm1 = new BDM(nndi1.rows: Int, 1: Int, Array.fill(nndi1.rows * 1)(1.0))
          val dropouti2 = BDM.horzcat(Bm1, dropouti1)
          nndi1 :* dropouti2
        } else nndi1
        di += nndi2
      }
      di += BDM.zeros(1, 1)
      // Final partial derivatives: dw(i) = (1/m) * ∑ d(i+1)' * a(i)
      //  nn.dW{i} = (d{i + 1}' * nn.a{i}) / size(d{i + 1}, 1);
      val dw = ArrayBuffer[BDM[Double]]()
      for (i <- 0 to bc_config.value.layer - 2) {
        val nndW = if (i + 1 == bc_config.value.layer - 1) {
          (di(bc_config.value.layer - 2 - i).t) * nn_a(i)
        } else {
          (di(bc_config.value.layer - 2 - i)(::, 1 to -1)).t * nn_a(i)
        }
        dw += nndW
      }
      (f._1, di, dw)
    }
    val train_data7 = train_data6.map(f => f._3)

    // Sample a subset (fraction miniBatchFraction) of the total data
    // compute and sum up the subgradients on this subset (this is one map-reduce)
    val initgrad = ArrayBuffer[BDM[Double]]()
    for (i <- 0 to bc_config.value.layer - 2) {
      val init1 = if (i + 1 == bc_config.value.layer - 1) {
        BDM.zeros[Double](bc_config.value.size(i + 1), bc_config.value.size(i) + 1)
      } else {
        BDM.zeros[Double](bc_config.value.size(i + 1), bc_config.value.size(i) + 1)
      }
      initgrad += init1
    }
    val (gradientSum, miniBatchSize) = train_data7.treeAggregate((initgrad, 0L))(
      seqOp = (c, v) => {
        // c: (grad, count), v: (grad)
        val grad1 = c._1
        val grad2 = v
        val sumgrad = ArrayBuffer[BDM[Double]]()
        for (i <- 0 to bc_config.value.layer - 2) {
          val Bm1 = grad1(i)
          val Bm2 = grad2(i)
          val Bmsum = Bm1 + Bm2
          sumgrad += Bmsum
        }
        (sumgrad, c._2 + 1)
      },
      combOp = (c1, c2) => {
        // c: (grad, count)
        val grad1 = c1._1
        val grad2 = c2._1
        val sumgrad = ArrayBuffer[BDM[Double]]()
        for (i <- 0 to bc_config.value.layer - 2) {
          val Bm1 = grad1(i)
          val Bm2 = grad2(i)
          val Bmsum = Bm1 + Bm2
          sumgrad += Bmsum
        }
        (sumgrad, c1._2 + c2._2)
      })
    // Average the gradients over the mini-batch
    val gradientAvg = ArrayBuffer[BDM[Double]]()
    for (i <- 0 to bc_config.value.layer - 2) {
      val Bm1 = gradientSum(i)
      val Bmavg = Bm1 :/ miniBatchSize.toDouble
      gradientAvg += Bmavg
    }
    gradientAvg.toArray
  }

  /**
   * NNapplygrads updates the weights
   * and the weight momentum.
   */
  def NNapplygrads(
    train_nnbp: Array[BDM[Double]],
    bc_config: org.apache.spark.broadcast.Broadcast[NNConfig],
    bc_nn_W: org.apache.spark.broadcast.Broadcast[Array[BDM[Double]]],
    bc_nn_vW: org.apache.spark.broadcast.Broadcast[Array[BDM[Double]]]): Array[Array[BDM[Double]]] = {
    // nn = nnapplygrads(nn) returns an neural network structure with updated
    // weights and biases
    // Update the weights: w = w - α*[dw + λw]
    val W_a = ArrayBuffer[BDM[Double]]()
    val vW_a = ArrayBuffer[BDM[Double]]()
    for (i <- 0 to bc_config.value.layer - 2) {
      val nndwi = if (bc_config.value.weightPenaltyL2 > 0) {
        val dwi = train_nnbp(i)
        val zeros = BDM.zeros[Double](dwi.rows, 1)
        val l2 = BDM.horzcat(zeros, dwi(::, 1 to -1))
        val dwi2 = dwi + (l2 * bc_config.value.weightPenaltyL2)
        dwi2
      } else {
        val dwi = train_nnbp(i)
        dwi
      }
      val nndwi2 = nndwi :* bc_config.value.learningRate
      val nndwi3 = if (bc_config.value.momentum > 0) {
        val vwi = bc_nn_vW.value(i)
        val dw3 = nndwi2 + (vwi * bc_config.value.momentum)
        dw3
      } else {
        nndwi2
      }
      // nn.W{i} = nn.W{i} - dW;
      W_a += (bc_nn_W.value(i) - nndwi3)
      // nn.vW{i} = nn.momentum*nn.vW{i} + dW;
      val nnvwi1 = if (bc_config.value.momentum > 0) {
        val vwi = bc_nn_vW.value(i)
        val vw3 = nndwi2 + (vwi * bc_config.value.momentum)
        vw3
      } else {
        bc_nn_vW.value(i)
      }
      vW_a += nnvwi1
    }
    Array(W_a.toArray, vW_a.toArray)
  }

  /**
   * NNeval runs a feed-forward pass and computes the output error:
   * the output of every unit in the network and the mean squared error.
   */
  def NNeval(
    batch_xy: RDD[(BDM[Double], BDM[Double])],
    bc_config: org.apache.spark.broadcast.Broadcast[NNConfig],
    bc_nn_W: org.apache.spark.broadcast.Broadcast[Array[BDM[Double]]]): Double = {
    // NNff performs the feed-forward pass
    // nn = nnff(nn, batch_x, batch_y);
    val train_nnff = NeuralNet.NNff(batch_xy, bc_config, bc_nn_W)
    // error and loss
    // Compute the output error
    val loss1 = train_nnff.map(f => f._1.error)
    val (loss2, counte) = loss1.treeAggregate((0.0, 0L))(
      seqOp = (c, v) => {
        // c: (e, count), v: (m)
        val e1 = c._1
        val e2 = (v :* v).sum
        val esum = e1 + e2
        (esum, c._2 + 1)
      },
      combOp = (c1, c2) => {
        // c: (e, count)
        val e1 = c1._1
        val e2 = c2._1
        val esum = e1 + e2
        (esum, c1._2 + c2._2)
      })
    val Loss = loss2 / counte.toDouble
    Loss * 0.5
  }
}
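
For reference, a minimal local usage sketch of the class above (an added illustration, not from the book; the local master, the random sample data and the parameter values are placeholders). NNtrain expects an RDD of (label, features) pairs, each a single-row Breeze matrix, and an opts array of (batch size, number of epochs, validation fraction):

import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.{SparkConf, SparkContext}
import NN.NeuralNet

object MinimalNNTrain {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("MinimalNNTrain").setMaster("local[2]"))
    // Each sample: (1 x 1 label matrix, 1 x 5 feature matrix).
    val train_d = sc.parallelize(Seq.fill(200)((BDM.rand[Double](1, 1), BDM.rand[Double](1, 5))))
    // opts: batch size, number of epochs, validation fraction.
    val opts = Array(50.0, 5.0, 0.0)
    val model = new NeuralNet().
      setSize(Array(5, 7, 1)).
      setLayer(3).
      setActivation_function("sigm").
      setOutput_function("sigm").
      NNtrain(train_d, opts)
    // Print the shape of each trained weight matrix.
    println(model.weights.map(w => w.rows + "x" + w.cols).mkString(", "))
    sc.stop()
  }
}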
           

2. ANN model

package NN

import breeze.linalg.{
  Matrix => BM,
  CSCMatrix => BSM,
  DenseMatrix => BDM,
  Vector => BV,
  DenseVector => BDV,
  SparseVector => BSV
}
import org.apache.spark.rdd.RDD

/**
 * label: target matrix
 * features: feature matrix
 * predict_label: prediction matrix
 * error: error matrix
 */
case class PredictNNLabel(label: BDM[Double], features: BDM[Double], predict_label: BDM[Double], error: BDM[Double]) extends Serializable

/**
 * NN(neural network)
 */

class NeuralNetModel(
  val config: NNConfig,
  val weights: Array[BDM[Double]]) extends Serializable {

  /**
   * Return the prediction results
   * in the format (label, features, predict_label, error).
   */
  def predict(dataMatrix: RDD[(BDM[Double], BDM[Double])]): RDD[PredictNNLabel] = {
    val sc = dataMatrix.sparkContext
    val bc_nn_W = sc.broadcast(weights)
    val bc_config = sc.broadcast(config)
    // NNff performs the feed-forward pass
    // nn = nnff(nn, batch_x, batch_y);
    val train_nnff = NeuralNet.NNff(dataMatrix, bc_config, bc_nn_W)
    val predict = train_nnff.map { f =>
      val label = f._1.label
      val error = f._1.error
      val nnan = f._1.nna(bc_config.value.layer - 1)
      val nna1 = f._1.nna(0)(::, 1 to -1)
      PredictNNLabel(label, nna1, nnan, error)
    }
    predict
  }

  /**
   * Compute the output error
   * (mean squared error).
   */
  def Loss(predict: RDD[PredictNNLabel]): Double = {
    val predict1 = predict.map(f => f.error)
    // error and loss
    // Compute the output error
    val loss1 = predict1
    val (loss2, counte) = loss1.treeAggregate((0.0, 0L))(
      seqOp = (c, v) => {
        // c: (e, count), v: (m)
        val e1 = c._1
        val e2 = (v :* v).sum
        val esum = e1 + e2
        (esum, c._2 + 1)
      },
      combOp = (c1, c2) => {
        // c: (e, count)
        val e1 = c1._1
        val e2 = c2._1
        val esum = e1 + e2
        (esum, c1._2 + c2._2)
      })
    val Loss = loss2 / counte.toDouble
    Loss * 0.5
  }

}
           

3. Test function code

package util

import java.util.Random
import breeze.linalg.{
  Matrix => BM,
  CSCMatrix => BSM,
  DenseMatrix => BDM,
  Vector => BV,
  DenseVector => BDV,
  SparseVector => BSV,
  axpy => brzAxpy,
  svd => brzSvd
}
import breeze.numerics.{
  exp => Bexp,
  cos => Bcos,
  tanh => Btanh
}
import scala.math.Pi

object RandSampleData extends Serializable {
  // Rosenbrock:
  //   ∑(100*(x(i+1) - x(i)^2)^2 + (x(i) - 1)^2)
  // Rastrigin:
  //   ∑(x(i)^2 - 10*cos(2*π*x(i)) + 10)
  // Sphere:
  //   ∑(x(i)^2)
  /**
   * Test functions: Rosenbrock, Rastrigin, Sphere.
   * Randomly generates n2-dimensional data and computes Y from the chosen test function.
   * n1: rows, n2: columns, b1: lower bound, b2: upper bound, function: test function name.
   */
  def RandM(
    n1: Int,
    n2: Int,
    b1: Double,
    b2: Double,
    function: String): BDM[Double] = {
    //    val n1 = 2
    //    val n2 = 3
    //    val b1 = -30
    //    val b2 = 30
    val bdm1 = BDM.rand(n1, n2) * (b2 - b1).toDouble + b1.toDouble
    val bdm_y = function match {
      case "rosenbrock" =>
        val xi0 = bdm1(::, 0 to (bdm1.cols - 2))
        val xi1 = bdm1(::, 1 to (bdm1.cols - 1))
        val xi2 = (xi0 :* xi0)
        val m1 = ((xi1 - xi2) :* (xi1 - xi2)) * 100.0 + ((xi0 - 1.0) :* (xi0 - 1.0))
        val m2 = m1 * BDM.ones[Double](m1.cols, 1)
        m2
      case "rastrigin" =>
        val xi0 = bdm1
        val xi2 = (xi0 :* xi0)
        val sicos = Bcos(xi0 * 2.0 * Pi) * 10.0
        val m1 = xi2 - sicos + 10.0
        val m2 = m1 * BDM.ones[Double](m1.cols, 1)
        m2
      case "sphere" =>
        val xi0 = bdm1
        val xi2 = (xi0 :* xi0)
        val m1 = xi2
        val m2 = m1 * BDM.ones[Double](m1.cols, 1)
        m2
    }
    val randm = BDM.horzcat(bdm_y, bdm1)
    randm
  }
}
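
A quick shape check of the generator above (an added sketch, not from the book): RandM returns the horizontal concatenation [y | x1 ... xn2], so the first column holds the test-function value and the remaining n2 columns hold the random inputs.

import util.RandSampleData

object RandMCheck {
  def main(args: Array[String]) {
    // 4 samples, 3 input dimensions drawn from [-10, 10], "sphere" target.
    val m = RandSampleData.RandM(4, 3, -10, 10, "sphere")
    println(m.rows + " x " + m.cols) // 4 x 4: column 0 is y, columns 1 to 3 are the inputs
  }
}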
           

4. Example code

package tests

import org.apache.log4j.{ Level, Logger }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.linalg.{ Vector, Vectors }
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.regression.LabeledPoint
import breeze.linalg.{
  Matrix => BM,
  CSCMatrix => BSM,
  DenseMatrix => BDM,
  Vector => BV,
  DenseVector => BDV,
  SparseVector => BSV,
  axpy => brzAxpy,
  svd => brzSvd,
  max => Bmax,
  min => Bmin,
  sum => Bsum
}
import scala.collection.mutable.ArrayBuffer
import NN.NeuralNet
import util.RandSampleData

object Test_example_NN {

  def main(args: Array[String]) {
    //1 Create the Spark context
    val conf = new SparkConf().setAppName("NNtest")
    val sc = new SparkContext(conf)

    //***************************** Example 1 (samples randomly generated from classic optimization test functions) *****************************//
    //2 Randomly generate test data
    // Random number generation
    Logger.getRootLogger.setLevel(Level.WARN)
    val sample_n1 = 1000
    val sample_n2 = 5
    val randsamp1 = RandSampleData.RandM(sample_n1, sample_n2, -10, 10, "sphere")
    // Normalize to [0, 1]
    val normmax = Bmax(randsamp1(::, breeze.linalg.*))
    val normmin = Bmin(randsamp1(::, breeze.linalg.*))
    val norm1 = randsamp1 - (BDM.ones[Double](randsamp1.rows, 1)) * normmin
    val norm2 = norm1 :/ ((BDM.ones[Double](norm1.rows, 1)) * (normmax - normmin))
    // Convert the samples into train_d
    val randsamp2 = ArrayBuffer[BDM[Double]]()
    for (i <- 0 to sample_n1 - 1) {
      val mi = norm2(i, ::)
      val mi1 = mi.inner
      val mi2 = mi1.toArray
      val mi3 = new BDM(1, mi2.length, mi2)
      randsamp2 += mi3
    }
    val randsamp3 = sc.parallelize(randsamp2, 10)
    sc.setCheckpointDir("hdfs://192.168.180.79:9000/user/huangmeiling/checkpoint")
    randsamp3.checkpoint()
    val train_d = randsamp3.map(f => (new BDM(1, 1, f(::, 0).data), f(::, 1 to -1)))
    //3 Set the training parameters and build the model
    // opts: batch size, number of epochs, validation fraction
    val opts = Array(100.0, 50.0, 0.0)
    train_d.cache
    val numExamples = train_d.count()
    println(s"numExamples = $numExamples.")
    val NNmodel = new NeuralNet().
      setSize(Array(5, 7, 1)).
      setLayer(3).
      setActivation_function("tanh_opt").
      setLearningRate(2.0).
      setScaling_learningRate(1.0).
      setWeightPenaltyL2(0.0).
      setNonSparsityPenalty(0.0).
      setSparsityTarget(0.05).
      setInputZeroMaskedFraction(0.0).
      setDropoutFraction(0.0).
      setOutput_function("sigm").
      NNtrain(train_d, opts)

    //4 Test the model
    val NNforecast = NNmodel.predict(train_d)
    val NNerror = NNmodel.Loss(NNforecast)
    println(s"NNerror = $NNerror.")
    val printf1 = NNforecast.map(f => (f.label.data(0), f.predict_label.data(0))).take(20)
    println("預測結果——實際值:預測值:誤差")
    for (i <- 0 until printf1.length)
      println(printf1(i)._1 + "\t" + printf1(i)._2 + "\t" + (printf1(i)._2 - printf1(i)._1))
    println("權重W{1}")
    val tmpw0 = NNmodel.weights(0)
    for (i <- 0 to tmpw0.rows - 1) {
      for (j <- 0 to tmpw0.cols - 1) {
        print(tmpw0(i, j) + "\t")
      }
      println()
    }
    println("權重W{2}")
    val tmpw1 = NNmodel.weights(1)
    for (i <- 0 to tmpw1.rows - 1) {
      for (j <- 0 to tmpw1.cols - 1) {
        print(tmpw1(i, j) + "\t")
      }
      println()
    }

    //        val tmpxy = train_d.map(f => (f._1.toArray, f._2.toArray)).toArray.map { f => ((new ArrayBuffer() ++ f._1) ++ f._2).toArray }
    //        for (i <- 0 to tmpxy.length - 1) {
    //          for (j <- 0 to tmpxy(i).length - 1) {
    //            print(tmpxy(i)(j) + "\t")
    //          }
    //          println()
    //        }

    //***************************** Example 2 (fixed samples from the classic Sphere Model test function) *****************************//
    //    //2 Read the sample data
    //    Logger.getRootLogger.setLevel(Level.WARN)
    //    val data_path = "hdfs://192.168.180.79:9000/user/huangmeiling/deeplearn/data1"
    //    val examples = sc.textFile(data_path).cache()
    //    val train_d1 = examples.map { line =>
    //      val f1 = line.split("\t")
    //      val f = f1.map(f => f.toDouble)
    //      val id = f(0)
    //      val y = Array(f(1))
    //      val x = f.slice(2, f.length)
    //      (id, new BDM(1, y.length, y), new BDM(1, x.length, x))
    //    }
    //    val train_d = train_d1
    //    val opts = Array(100.0, 20.0, 0.0)
    //    //3 Set the training parameters and build the model
    //    val NNmodel = new NeuralNet().
    //      setSize(Array(5, 7, 1)).
    //      setLayer(3).
    //      setActivation_function("tanh_opt").
    //      setLearningRate(2.0).
    //      setScaling_learningRate(1.0).
    //      setWeightPenaltyL2(0.0).
    //      setNonSparsityPenalty(0.0).
    //      setSparsityTarget(0.0).
    //      setOutput_function("sigm").
    //      NNtrain(train_d, opts)
    //
    //    //4 Test the model
    //    val NNforecast = NNmodel.predict(train_d.map(f => (f._2, f._3)))
    //    val NNerror = NNmodel.Loss(NNforecast)
    //    println(s"NNerror = $NNerror.")
    //    val printf1 = NNforecast.map(f => (f.label.data(0), f.predict_label.data(0))).take(200)
    //    println("預測結果——實際值:預測值:誤差")
    //    for (i <- 0 until printf1.length)
    //      println(printf1(i)._1 + "\t" + printf1(i)._2 + "\t" + (printf1(i)._2 - printf1(i)._1))
    //    println("權重W{1}")
    //    val tmpw0 = NNmodel.weights(0)
    //    for (i <- 0 to tmpw0.rows - 1) {
    //      for (j <- 0 to tmpw0.cols - 1) {
    //        print(tmpw0(i, j) + "\t")
    //      }
    //      println()
    //    }
    //    println("權重W{2}")
    //    val tmpw1 = NNmodel.weights(1)
    //    for (i <- 0 to tmpw1.rows - 1) {
    //      for (j <- 0 to tmpw1.cols - 1) {
    //        print(tmpw1(i, j) + "\t")
    //      }
    //      println()
    //    }

    //***************************** Example 3 (reading Spark MLlib data) *****************************//
    // Read sample data and convert: [y1, [x1 x2 ... x10]] => ([y1 y2], [x1 x2 ... x10])
    //    val data_path = "file:/home/jb-huangmeiling/data/sample_linear_regression_data.txt"
    //    val examples = MLUtils.loadLibSVMFile(sc, data_path).cache()
    //    val train_d1 = examples.map { f =>
    //      LabeledPoint(f.label, Vectors.dense(f.features.toArray))
    //    }
    //    val opts = Array(100.0, 100.0, 0.0)
    //    val train_d = train_d1.map(f => (BDM((f.label, f.label * 0.5 + 2.0)), BDM(f.features.toArray)))
    //    val numExamples = train_d.count()
    //    println(s"numExamples = $numExamples.")

  }
}
           

Code and data (Baidu netdisk):

http://pan.baidu.com/s/1c1J8ZN6
