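Parsing arguments in a Spark application

When writing a Spark application, we often need to pass many arguments to the main function, which is declared as def main(args: Array[String]). For example, we might need to pass rank, numIterations, and lambda.

The simplest approach is to fix the order of the arguments and write the values directly after the program name on the command line. This has two problems: with many arguments it is easy to get the order wrong, and arguments that have default values are awkward to handle. Parsing named arguments solves both.

Writing your own parsing module takes time and is unlikely to be complete. The rest of this post shows how to use the parser that the Spark example programs use: OptionParser from the scopt library.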
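To make the ordering problem concrete, here is a minimal sketch of the positional approach using only the Scala standard library. The names AlsArgs and parsePositional are hypothetical and exist only for this illustration; nothing in the block comes from Spark or scopt.

```scala
object PositionalArgsSketch {
  // Hypothetical container for the three values mentioned above.
  final case class AlsArgs(rank: Int, numIterations: Int, lambda: Double)

  // Positional parsing: the meaning of each value depends only on its index.
  def parsePositional(args: Array[String]): AlsArgs =
    AlsArgs(args(0).toInt, args(1).toInt, args(2).toDouble)

  def main(args: Array[String]): Unit = {
    // Intended call: rank = 5, numIterations = 20, lambda = 1.0
    println(parsePositional(Array("5", "20", "1.0")))
    // Swapping the first two values still parses, so the mistake goes unnoticed.
    println(parsePositional(Array("20", "5", "1.0")))
  }
}
```

Both calls succeed, which is exactly the issue: a swapped rank and numIterations is accepted silently, and leaving out a trailing value to fall back on a default is not possible without extra code.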
1. Define the list of parameters your program needs, for example:
case class Params(
    input: String = null,
    kryo: Boolean = false,
    numIterations: Int = 20,
    lambda: Double = 1.0,
    rank: Int = 10,
    numUserBlocks: Int = -1,
    numProductBlocks: Int = -1,
    implicitPrefs: Boolean = false) extends AbstractParams[Params]
The code above is Scala. Params is a sample class; replace its fields with the parameters your own program needs. The AbstractParams class it extends belongs to the following package:
    package org.apache.spark.examples.mllib
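The parameter class works well for this purpose because a case class gives every field a default value and a copy method that changes selected fields while leaving the rest alone. The following is a minimal sketch of that behaviour with a trimmed-down stand-in for Params. AbstractParams is left out on purpose so the sketch compiles without the Spark examples on the classpath, and the object name ParamsCopySketch is hypothetical.

```scala
object ParamsCopySketch {
  // Trimmed-down stand-in for Params (assumption: only four of the fields are kept).
  final case class Params(
      input: String = null,
      numIterations: Int = 20,
      lambda: Double = 1.0,
      rank: Int = 10)

  def main(args: Array[String]): Unit = {
    val defaults = Params()               // every field takes its default value
    val custom = defaults.copy(rank = 5)  // new instance; only rank differs
    println(defaults)
    println(custom)
  }
}
```

Because copy returns a new instance, the defaults object is never modified, which makes it safe to keep using it for help text.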
2. Instantiate the parameter class
    val defaultParams = Params()
3. Instantiate an argument parser
val parser = new OptionParser[Params]("MovieLensALS") {
  head("MovieLensALS: an example app for ALS on MovieLens data.")
  opt[Int]("rank")
    .text(s"rank, default: ${defaultParams.rank}")
    .action((x, c) => c.copy(rank = x))
  opt[Int]("numIterations")
    .text(s"number of iterations, default: ${defaultParams.numIterations}")
    .action((x, c) => c.copy(numIterations = x))
  opt[Double]("lambda")
    .text(s"lambda (smoothing constant), default: ${defaultParams.lambda}")
    .action((x, c) => c.copy(lambda = x))
  opt[Unit]("kryo")
    .text("use Kryo serialization")
    .action((_, c) => c.copy(kryo = true))
  opt[Int]("numUserBlocks")
    .text(s"number of user blocks, default: ${defaultParams.numUserBlocks} (auto)")
    .action((x, c) => c.copy(numUserBlocks = x))
  opt[Int]("numProductBlocks")
    .text(s"number of product blocks, default: ${defaultParams.numProductBlocks} (auto)")
    .action((x, c) => c.copy(numProductBlocks = x))
  opt[Unit]("implicitPrefs")
    .text("use implicit preference")
    .action((_, c) => c.copy(implicitPrefs = true))
  arg[String]("<input>")
    .required()
    .text("input paths to a MovieLens dataset of ratings")
    .action((x, c) => c.copy(input = x))
  note(
    """
      |For example, the following command runs this app on a synthetic dataset:
      |
      | bin/spark-submit --class org.apache.spark.examples.mllib.MovieLensALS \
      |  examples/target/scala-*/spark-examples-*.jar \
      |  --rank 5 --numIterations 20 --lambda 1.0 --kryo \
      |  data/mllib/sample_movielens_data.txt
    """.stripMargin)
}
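As a rough mental model of the action callbacks above, each option can be seen as a function from the current parameter object to an updated copy, applied while walking over the command-line tokens. The sketch below hand-rolls that idea for three options using only the standard library. It is a toy illustration and not scopt's implementation: Config and parseNamed are hypothetical names, and error handling is reduced to returning None.

```scala
import scala.util.Try

object NamedArgsSketch {
  // Hypothetical, reduced parameter class with defaults.
  final case class Config(rank: Int = 10, numIterations: Int = 20, kryo: Boolean = false)

  // Walks the tokens left to right, threading an updated Config through each step.
  // Returns None on an unknown option, a missing value, or a value that is not an Int.
  def parseNamed(args: List[String], c: Config = Config()): Option[Config] = args match {
    case Nil => Some(c)
    case "--kryo" :: rest =>
      parseNamed(rest, c.copy(kryo = true))
    case "--rank" :: v :: rest =>
      Try(v.toInt).toOption.flatMap(x => parseNamed(rest, c.copy(rank = x)))
    case "--numIterations" :: v :: rest =>
      Try(v.toInt).toOption.flatMap(x => parseNamed(rest, c.copy(numIterations = x)))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    // Order no longer matters, and omitted options keep their defaults.
    println(parseNamed(List("--kryo", "--rank", "5")))
    println(parseNamed(List("--rank", "5", "--kryo")))
  }
}
```

A real parser adds what this toy lacks, such as typed value conversion for several types, required positional arguments, and generated usage text, which is the reason to prefer an existing library over writing one by hand.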
4. Parse the arguments with the parser
parser.parse(args, defaultParams).map { params =>
  run(params)
} getOrElse {
  System.exit(1)
}
If parsing succeeds, params holds the values given on the command line, and any option that was not supplied keeps its default. If parsing fails, the program exits with status 1.
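Judging from the map and getOrElse calls, the parse result behaves like an Option: a value when parsing succeeds and nothing when it fails. The sketch below shows the same control flow written as a pattern match, which some readers find easier to follow. The parse function here is a hypothetical stand-in that understands a single option, and returning an exit code instead of calling System.exit is a choice made only to keep the sketch easy to test.

```scala
import scala.util.Try

object ParseResultSketch {
  final case class Params(rank: Int = 10)

  // Hypothetical stand-in for parser.parse: accepts no arguments or "--rank <int>".
  def parse(args: Seq[String], defaults: Params): Option[Params] = args match {
    case Seq() => Some(defaults)
    case Seq("--rank", v) => Try(v.toInt).toOption.map(x => defaults.copy(rank = x))
    case _ => None
  }

  // Returns the exit code the program would use.
  def runOrFail(args: Seq[String]): Int = parse(args, Params()) match {
    case Some(params) =>
      println(s"running with $params")
      0
    case None =>
      1
  }
}
```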
5. A complete example follows
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.examples.mllib

import scala.collection.mutable

import org.apache.log4j.{Level, Logger}
import scopt.OptionParser

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

/**
 * An example app for ALS on MovieLens data (http://grouplens.org/datasets/movielens/).
 * Run with
 * {{{
 * bin/run-example org.apache.spark.examples.mllib.MovieLensALS
 * }}}
 * A synthetic dataset in MovieLens format can be found at `data/mllib/sample_movielens_data.txt`.
 * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
 */
object MovieLensALS {

  case class Params(
      input: String = null,
      kryo: Boolean = false,
      numIterations: Int = 20,
      lambda: Double = 1.0,
      rank: Int = 10,
      numUserBlocks: Int = -1,
      numProductBlocks: Int = -1,
      implicitPrefs: Boolean = false) extends AbstractParams[Params]

  def main(args: Array[String]): Unit = {
    val defaultParams = Params()

    val parser = new OptionParser[Params]("MovieLensALS") {
      head("MovieLensALS: an example app for ALS on MovieLens data.")
      opt[Int]("rank")
        .text(s"rank, default: ${defaultParams.rank}")
        .action((x, c) => c.copy(rank = x))
      opt[Int]("numIterations")
        .text(s"number of iterations, default: ${defaultParams.numIterations}")
        .action((x, c) => c.copy(numIterations = x))
      opt[Double]("lambda")
        .text(s"lambda (smoothing constant), default: ${defaultParams.lambda}")
        .action((x, c) => c.copy(lambda = x))
      opt[Unit]("kryo")
        .text("use Kryo serialization")
        .action((_, c) => c.copy(kryo = true))
      opt[Int]("numUserBlocks")
        .text(s"number of user blocks, default: ${defaultParams.numUserBlocks} (auto)")
        .action((x, c) => c.copy(numUserBlocks = x))
      opt[Int]("numProductBlocks")
        .text(s"number of product blocks, default: ${defaultParams.numProductBlocks} (auto)")
        .action((x, c) => c.copy(numProductBlocks = x))
      opt[Unit]("implicitPrefs")
        .text("use implicit preference")
        .action((_, c) => c.copy(implicitPrefs = true))
      arg[String]("<input>")
        .required()
        .text("input paths to a MovieLens dataset of ratings")
        .action((x, c) => c.copy(input = x))
      note(
        """
          |For example, the following command runs this app on a synthetic dataset:
          |
          | bin/spark-submit --class org.apache.spark.examples.mllib.MovieLensALS \
          |  examples/target/scala-*/spark-examples-*.jar \
          |  --rank 5 --numIterations 20 --lambda 1.0 --kryo \
          |  data/mllib/sample_movielens_data.txt
        """.stripMargin)
    }

    parser.parse(args, defaultParams).map { params =>
      run(params)
    } getOrElse {
      System.exit(1)
    }
  }

  def run(params: Params): Unit = {
    val conf = new SparkConf().setAppName(s"MovieLensALS with $params")
    if (params.kryo) {
      conf.registerKryoClasses(Array(classOf[mutable.BitSet], classOf[Rating]))
        .set("spark.kryoserializer.buffer", "8m")
    }
    val sc = new SparkContext(conf)

    Logger.getRootLogger.setLevel(Level.WARN)

    val implicitPrefs = params.implicitPrefs

    val ratings = sc.textFile(params.input).map { line =>
      val fields = line.split("::")
      if (implicitPrefs) {
        /*
         * MovieLens ratings are on a scale of 1-5:
         * 5: Must see
         * 4: Will enjoy
         * 3: It's okay
         * 2: Fairly bad
         * 1: Awful
         * So we should not recommend a movie if the predicted rating is less than 3.
         * To map ratings to confidence scores, we use
         * 5 -> 2.5, 4 -> 1.5, 3 -> 0.5, 2 -> -0.5, 1 -> -1.5. This mapping means unobserved
         * entries are generally between It's okay and Fairly bad.
         * The semantics of 0 in this expanded world of non-positive weights
         * are "the same as never having interacted at all".
         */
        Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5)
      } else {
        Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
      }
    }.cache()

    val numRatings = ratings.count()
    val numUsers = ratings.map(_.user).distinct().count()
    val numMovies = ratings.map(_.product).distinct().count()

    println(s"Got $numRatings ratings from $numUsers users on $numMovies movies.")

    val splits = ratings.randomSplit(Array(0.8, 0.2))
    val training = splits(0).cache()
    val test = if (params.implicitPrefs) {
      /*
       * 0 means "don't know" and positive values mean "confident that the prediction should be 1".
       * Negative values mean "confident that the prediction should be 0".
       * We have in this case used some kind of weighted RMSE. The weight is the absolute value of
       * the confidence. The error is the difference between prediction and either 1 or 0,
       * depending on whether r is positive or negative.
       */
      splits(1).map(x => Rating(x.user, x.product, if (x.rating > 0) 1.0 else 0.0))
    } else {
      splits(1)
    }.cache()

    val numTraining = training.count()
    val numTest = test.count()
    println(s"Training: $numTraining, test: $numTest.")

    ratings.unpersist(blocking = false)

    val model = new ALS()
      .setRank(params.rank)
      .setIterations(params.numIterations)
      .setLambda(params.lambda)
      .setImplicitPrefs(params.implicitPrefs)
      .setUserBlocks(params.numUserBlocks)
      .setProductBlocks(params.numProductBlocks)
      .run(training)

    val rmse = computeRmse(model, test, params.implicitPrefs)

    println(s"Test RMSE = $rmse.")

    sc.stop()
  }

  /** Compute RMSE (Root Mean Squared Error). */
  def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], implicitPrefs: Boolean)
    : Double = {

    def mapPredictedRating(r: Double): Double = {
      if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
    }

    val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
    val predictionsAndRatings = predictions.map { x =>
      ((x.user, x.product), mapPredictedRating(x.rating))
    }.join(data.map(x => ((x.user, x.product), x.rating))).values
    math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())
  }
}
// scalastyle:on println
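
The computeRmse function in the listing pairs each prediction with the observed rating for the same (user, product) key and takes the square root of the mean squared difference, clamping predictions to the range [0, 1] when implicitPrefs is set. The sketch below reproduces that arithmetic on plain Scala collections so it can be tried without a Spark cluster. LocalRating and rmse are hypothetical names, and the sketch assumes each (user, product) pair appears at most once and that at least one pair matches.

```scala
object RmseSketch {
  // Hypothetical local stand-in for a rating record.
  final case class LocalRating(user: Int, product: Int, rating: Double)

  def rmse(
      predicted: Seq[LocalRating],
      observed: Seq[LocalRating],
      implicitPrefs: Boolean): Double = {
    // Same clamping rule as mapPredictedRating in the listing.
    def clamp(r: Double): Double =
      if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r

    // Index predictions by (user, product) so they can be matched with observations.
    val byKey = predicted.map(p => (p.user, p.product) -> clamp(p.rating)).toMap

    // Keep only observations that have a prediction, then square the differences.
    val squaredErrors = observed.flatMap { o =>
      byKey.get((o.user, o.product)).map(p => (p - o.rating) * (p - o.rating)).toList
    }
    math.sqrt(squaredErrors.sum / squaredErrors.size)
  }
}
```

For example, predictions of 4.0 and 2.0 against observed ratings of 3.0 and 4.0 give squared errors of 1 and 4, so the result is the square root of 2.5.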