
Submitting a Spark program

Run the Spark program in a Hadoop environment and submit the jar with spark-submit. The class below bundles four MLlib examples: SVM classification, K-means, PCA, and Gaussian mixture clustering.

package com.spark.classfication;

import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.*;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;

import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;

import java.util.LinkedList;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.mllib.clustering.GaussianMixture;
import org.apache.spark.mllib.clustering.GaussianMixtureModel;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;

public class Classfication {
	public static void main(String[] args) {
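		// Each example below creates its own Spark context and stops it before
		// returning, since only one SparkContext may be active per JVM.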
		svm();
		kmeans();
		pca();
		gaus();
	}

	public static void svm() {
		SparkConf conf = new SparkConf().setAppName("SVM Classifier Example");//.setMaster("local");
		// SparkConf conf = new SparkConf().setAppName("Spark Pi").setMaster("spark://10.10.1.5:7070").setJars(new List("out\\artifacts\\sparkTest_jar\\sparkTest.jar"));
		SparkContext sc = new SparkContext(conf);
		String path = "/tmp/data/mllib/sample_libsvm_data.txt";
		JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();

		// Split initial RDD into two... [60% training data, 40% testing data].
		JavaRDD<LabeledPoint> training = data.sample(false, 0.6, 11L);
		training.cache();
		JavaRDD<LabeledPoint> test = data.subtract(training);

		// Run training algorithm to build the model.
		int numIterations = 100;
		final SVMModel model = SVMWithSGD.train(training.rdd(), numIterations);

		// Clear the default threshold.
		model.clearThreshold();

		// Compute raw scores on the test set.
		JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(new Function<LabeledPoint, Tuple2<Object, Object>>() {
			public Tuple2<Object, Object> call(LabeledPoint p) {
				Double score = model.predict(p.features());
				return new Tuple2<Object, Object>(score, p.label());
			}
		});

		// Get evaluation metrics.
		BinaryClassificationMetrics metrics = new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
		double auROC = metrics.areaUnderROC();

		System.out.println("Area under ROC = " + auROC);
		// Stop the context so the next example can create a fresh one.
		sc.stop();
	}

	public static void kmeans() {
		/* Kmean */
		SparkConf conf = new SparkConf().setAppName("K-means Example");
		JavaSparkContext sc = new JavaSparkContext(conf);
		// Load and parse data
		String path = "/tmp/data/model/123.txt";
		JavaRDD<String> data = sc.textFile(path);
		JavaRDD<Vector> parsedData = data.map(new Function<String, Vector>() {
			public Vector call(String s) {
				String[] sarray = s.split(" ");
				double[] values = new double[sarray.length];
				for (int i = 0; i < sarray.length; i++)
					values[i] = Double.parseDouble(sarray[i]);
				return Vectors.dense(values);
			}
		});
		parsedData.cache();
		// Cluster the data into two classes using KMeans
		int numClusters = 2;
		int numIterations = 20;
		KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
		// Evaluate clustering by computing Within Set Sum of Squared Errors
		double WSSSE = clusters.computeCost(parsedData.rdd());
		System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
		for (Vector v : clusters.clusterCenters()) {
			System.out.println(" " + v);
		}
		System.out.println(
				"Prediction of (1.1, 2.1, 3.1): " + clusters.predict(Vectors.dense(new double[] { 1.1, 2.1, 3.1 })));
		System.out.println("Prediction of (10.1, 9.1, 11.1): "
				+ clusters.predict(Vectors.dense(new double[] { 10.1, 9.1, 11.1 })));
		System.out.println("Prediction of (21.1, 17.1, 16.1): "
				+ clusters.predict(Vectors.dense(new double[] { 21.1, 17.1, 16.1 })));

		// Save and load model
		// clusters.save(sc.sc(), "myModelPath");
		// KMeansModel sameModel = KMeansModel.load(sc.sc(), "myModelPath");
		sc.stop();
	}

	public static void gaus() {
		/** GaussianMixture **/
		SparkConf conf = new SparkConf().setAppName("GaussianMixture Example");
		JavaSparkContext sc = new JavaSparkContext(conf);

		// Load and parse data
		String path = "/tmp/data/mllib/gmm_data.txt";
		JavaRDD<String> data = sc.textFile(path);
		JavaRDD<Vector> parsedData = data.map(new Function<String, Vector>() {
			public Vector call(String s) {
				String[] sarray = s.trim().split(" ");
				double[] values = new double[sarray.length];
				for (int i = 0; i < sarray.length; i++)
					values[i] = Double.parseDouble(sarray[i]);
				return Vectors.dense(values);
			}
		});
		parsedData.cache();

		// Cluster the data into two classes using GaussianMixture
		GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());

		// Save and load GaussianMixtureModel
		// gmm.save(sc.sc(), "myGMMModel");
		// GaussianMixtureModel sameModel = GaussianMixtureModel.load(sc.sc(),
		// "myGMMModel");
		// Output the parameters of the mixture model
		for (int j = 0; j < gmm.k(); j++) {
			System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n", gmm.weights()[j], gmm.gaussians()[j].mu(),
					gmm.gaussians()[j].sigma());
		}
		sc.stop();
	}

	public static void pca() {
		SparkConf conf = new SparkConf().setAppName("PCA Example");// .setMaster("local");
		SparkContext sc = new SparkContext(conf);

		double[][] array = new double[][] { { 40.4, 24.7, 7.2, 6.1, 8.3, 8.7, 2.442, 20.0 },
				{ 25.0, 12.7, 11.2, 11.0, 12.9, 20.2, 3.542, 9.1 }, { 13.2, 3.3, 3.9, 4.3, 4.4, 5.5, 0.578, 3.6 },
				{ 22.3, 6.7, 5.6, 3.7, 6.0, 7.4, 0.176, 7.3 }, { 34.3, 11.8, 7.1, 7.1, 8.0, 8.9, 1.726, 27.5 },
				{ 35.6, 12.5, 16.4, 16.7, 22.8, 29.3, 3.017, 26.6 }, { 22.0, 7.8, 9.9, 10.2, 12.6, 17.6, 0.847, 10.6 },
				{ 48.4, 13.4, 10.9, 9.9, 10.9, 13.9, 1.772, 1.772 },
				{ 40.6, 19.1, 19.8, 19.0, 29.7, 39.6, 2.449, 35.8 }, { 24.8, 8.0, 9.8, 8.9, 11.9, 16.2, 0.789, 13.7 },
				{ 12.5, 9.7, 4.2, 4.2, 4.6, 6.5, 0.874, 3.9 }, { 1.8, 0.6, 0.7, 0.7, 0.8, 1.1, 0.056, 1.0 },
				{ 32.3, 13.9, 9.4, 8.3, 9.8, 13.3, 2.126, 17.1 }, { 38.5, 9.1, 11.3, 9.5, 12.2, 16.4, 1.327, 11.6 },
				{ 26.2, 10.1, 5.6, 15.6, 7.7, 30.1, 0.126, 25.9 } };

		LinkedList<Vector> rowsList = new LinkedList<Vector>();
		for (int i = 0; i < array.length; i++) {
			Vector currentRow = Vectors.dense(array[i]);
			rowsList.add(currentRow);
		}
		JavaRDD<Vector> rows = JavaSparkContext.fromSparkContext(sc).parallelize(rowsList);

		// Create a RowMatrix from JavaRDD<Vector>.
		RowMatrix mat = new RowMatrix(rows.rdd());

		// Compute the top 3 principal components.
		Matrix pc = mat.computePrincipalComponents(3);
		RowMatrix projected = mat.multiply(pc);
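		// 'projected' holds each original row expressed in the space spanned
		// by the top 3 principal components.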
		System.out.println("Hello World!");
	}
}
           

The pom.xml dependencies used:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>AntiSpam</groupId>
  <artifactId>ExtractData</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>ExtractData</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

	<dependencies>
	<dependency>
		<groupId>junit</groupId>
		<artifactId>junit</artifactId>
		<version>3.8.1</version>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-mapreduce-client-core</artifactId>
		<version>2.3.0</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-hdfs</artifactId>
		<version>2.3.0</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-common</artifactId>
		<version>2.3.0</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hive</groupId>
		<artifactId>hive-jdbc</artifactId>
		<version>1.1.1</version>
	</dependency>
	<dependency>
		<groupId>jdk.tools</groupId>
		<artifactId>jdk.tools</artifactId>
		<version>1.6</version>
		<scope>system</scope>
		<systemPath>C:/Program Files/Java/jdk1.8.0_73/lib/tools.jar</systemPath>
	</dependency>	
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-mllib_2.10</artifactId>
		<version>1.6.1</version>
	</dependency>
</dependencies>
<!-- <build>
		<resources>
			<resource>
				<directory>conf/</directory>
				<includes>
					<include>**/*.properties</include>
					<include>**/*.xml</include>
				</includes>
				<filtering>false</filtering>
			</resource>
		</resources>
		<plugins>
			<plugin>
				<artifactId>maven-assembly-plugin</artifactId>
				<configuration>
					<appendAssemblyId>false</appendAssemblyId>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
					<archive>
						<manifest>
							<mainClass>AntiSpam.ExtractData.AntiSpam</mainClass>
						</manifest>
					</archive>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
		</build> -->
</project>
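
Note that the _2.10 suffix of spark-mllib_2.10 must match the Scala version of the Spark build on the cluster. Because spark-submit puts the cluster's Spark (and MLlib) jars on the classpath at runtime, the Spark dependency can also be marked as provided to keep an assembled jar small; a possible variant of the dependency above (not in the original pom):

	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-mllib_2.10</artifactId>
		<version>1.6.1</version>
		<scope>provided</scope>
	</dependency>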
           

Build the jar with Maven:

mvn clean package

spark-submit  Gaus.jar
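
Unless the jar's manifest names a main class (for example via the commented-out maven-assembly-plugin configuration above), spark-submit also needs --class, and usually --master. A sketch using the class from the code above; the YARN master and client deploy mode are assumptions, not taken from the original:

spark-submit --master yarn --deploy-mode client --class com.spark.classfication.Classfication Gaus.jar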
