天天看點

Spark運作任務

1. 啟動hadoop

sh start-dfs.sh

sh start-yarn.sh

2. 啟動spark

cd /appl/spark-1.4.0/

sbin/start-all.sh

3. 準備資料

hadoop fs -put /mk/test/kmeans_data.txt /test/

4. 編寫程式

Spark運作任務

Java

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.SparkConf;

/* Test:
 * sh start-dfs.sh
 * sh start-yarn.sh
 * cd /appl/spark-1.4.0/
 * sbin/start-all.sh
 * hadoop fs -put /mk/test/kmeans_data.txt /test/
 * ./bin/spark-submit /mk/test/KMeansSim.jar
 */
public class KMeansSim {
  public static void main(String[] args) {
	// environment initialization
    SparkConf conf = new SparkConf().setAppName("K-means Example");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Load and parse data (${SPARK_HOME}/data/mllib/kmeans_data.txt)
    String path = "/test/kmeans_data.txt";
    JavaRDD<String> data = sc.textFile(path);
    JavaRDD<Vector> parsedData = data.map(
      new Function<String, Vector>() {
        public Vector call(String s) {
          return Vectors.dense(toDoubleArray(s));
        }
      }
    );
    parsedData.cache();

    // Cluster the data into two classes using KMeans
    int numClusters = 2;
    int numIterations = 20;
    KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);

    // Evaluate clustering by computing Within Set Sum of Squared Errors
    double WSSSE = clusters.computeCost(parsedData.rdd());
    System.out.println("Within Set Sum of Squared Errors = " + WSSSE);

    // Save and load model
    clusters.save(sc.sc(), "myModelPath");
    KMeansModel sameModel = KMeansModel.load(sc.sc(), "myModelPath");
    
    // predict test
    System.out.println("~~~predict:" + clusters.predict(Vectors.dense(
    		toDoubleArray("1.0 2.1 3.8"))));
    
    // ending
    sc.stop();
  }
  
  // String to double[]
  public static double[] toDoubleArray(String s) {
	  String[] sarray = s.split(" ");
      double[] values = new double[sarray.length];
      for (int i = 0; i < sarray.length; i++)
        values[i] = Double.parseDouble(sarray[i]);
      return values;
  }
}
           

5. 運作

./bin/spark-submit /mk/test/KMeansSim.jar