什么是 Apache Spark™?
Apache Spark ™是一种多语言引擎,用于在单节点机器或集群上执行数据工程、数据科学和机器学习。
使用“pip”安装
$ pip 安装 pyspark
$ pyspark
使用官方 Docker 镜像
$ docker run -it --rm Spark:python3 /opt/spark/bin/pyspark
快速开始
df = spark.read.csv("accounts.csv", header=True)
# Select subset of features and filter for balance > 0
filtered_df = df.select("AccountBalance", "CountOfDependents").filter("AccountBalance > 0")
# Generate summary statistics
filtered_df.summary().show()
机器学习
# Every record contains a label and feature vector
df = spark.createDataFrame(data, ["label", "features"])
# Split the data into train/test datasets
train_df, test_df = df.randomSplit([.80, .20], seed=42)
# Set hyperparameters for the algorithm
rf = RandomForestRegressor(numTrees=100)
# Fit the model to the training data
model = rf.fit(train_df)
# Generate predictions on the test dataset.
model.transform(test_df).show()
分析与数据科学
df = spark.read.csv("accounts.csv", header=True)
# Select subset of features and filter for balance > 0
filtered_df = df.select("AccountBalance", "CountOfDependents").filter("AccountBalance > 0")
# Generate summary statistics
filtered_df.summary().show()
Spark SQL 引擎:底层
Apache Spark ™构建在适用于大规模数据的高级分布式 SQL 引擎之上
自适应查询执行
Spark SQL会在运行时调整执行计划,例如自动设置reducer的数量和join算法。
支持 ANSI SQL
使用您已经熟悉的相同 SQL。
结构化和非结构化数据
Spark SQL 适用于结构化表和非结构化数据(例如 JSON 或图像)。
案例
文字搜索
textFile = sc.textFile("hdfs://...")
# Creates a DataFrame having a single column named "line"
df = textFile.map(lambda r: Row(r)).toDF(["line"])
errors = df.filter(col("line").like("%ERROR%"))
# Counts all the errors
errors.count()
# Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
# Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect()
简单的数据操作
在此示例中,我们读取数据库中存储的表并计算每个年龄段的人数。最后,我们将计算结果以JSON格式保存到S3中。示例中使用了一个简单的 MySQL 表“people”,该表有两列:“name”和“age”。
# Creates a DataFrame based on a table named "people"
# stored in a MySQL database.
url = \
"jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
df = sqlContext \
.read \
.format("jdbc") \
.option("url", url) \
.option("dbtable", "people") \
.load()
# Looks the schema of this DataFrame.
df.printSchema()
# Counts people by age
countsByAge = df.groupBy("age").count()
countsByAge.show()
# Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")
使用逻辑回归进行预测(python)
# Every record of this DataFrame contains the label and
# features represented by a vector.
df = sqlContext.createDataFrame(data, ["label", "features"])
# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)
# Fit the model to the data.
model = lr.fit(df)
# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()
项目地址:
https://github.com/apache/spark