Apache Spark: A Multi-Language Engine for Data Engineering, Data Science, and Machine Learning

Author: 运维开发木子李

What is Apache Spark™?

Apache Spark™ is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.

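Outside the interactive shells shown below, a Spark application first has to create a SparkSession itself. Here is a minimal sketch in Python, assuming PySpark is installed; the app name "demo" and the local master URL are illustrative placeholders:

from pyspark.sql import SparkSession

# Start Spark on a single machine; "local[*]" uses all available cores.
# The app name "demo" is a hypothetical placeholder.
spark = (SparkSession.builder
         .appName("demo")
         .master("local[*]")
         .getOrCreate())

spark.range(5).show()  # sanity check: prints a DataFrame with values 0..4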

Install with pip

$ pip install pyspark

$ pyspark

Use the official Docker image

$ docker run -it --rm spark:python3 /opt/spark/bin/pyspark

Quick Start

# "spark" is the SparkSession that the pyspark shell provides
df = spark.read.csv("accounts.csv", header=True)

# Select a subset of features and filter for balance > 0
filtered_df = df.select("AccountBalance", "CountOfDependents").filter("AccountBalance > 0")

# Generate summary statistics
filtered_df.summary().show()

Machine Learning

from pyspark.ml.regression import RandomForestRegressor

# Every record of "data" (assumed to be prepared elsewhere) contains
# a label and a feature vector
df = spark.createDataFrame(data, ["label", "features"])

# Split the data into train/test datasets
train_df, test_df = df.randomSplit([.80, .20], seed=42)

# Set hyperparameters for the algorithm
rf = RandomForestRegressor(numTrees=100)

# Fit the model to the training data
model = rf.fit(train_df)

# Generate predictions on the test dataset
model.transform(test_df).show()
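
A common follow-up, not part of the original snippet, is to score the test predictions with an evaluator. A minimal sketch, assuming the default "label" and "prediction" column names produced above:

from pyspark.ml.evaluation import RegressionEvaluator

# Compute root-mean-square error on the held-out test split
predictions = model.transform(test_df)
evaluator = RegressionEvaluator(metricName="rmse")
print(evaluator.evaluate(predictions))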


Spark SQL Engine: Under the Hood

Apache Spark™ is built on an advanced distributed SQL engine designed for large-scale data.

Adaptive Query Execution

Spark SQL adapts the execution plan at runtime, for example by automatically setting the number of reducers and choosing join algorithms.
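
Adaptive query execution is governed by runtime configuration flags; a short sketch of toggling it from PySpark (these are the real "spark.sql.adaptive.*" settings, and AQE is on by default in recent Spark releases):

# Enable adaptive query execution (the default in recent Spark versions)
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Let Spark coalesce shuffle partitions adaptively at runtime as well
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")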

ANSI SQL Support

Use the same SQL you are already familiar with.
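
For instance, any DataFrame can be registered as a temporary view and queried with plain SQL; a small sketch reusing the quick-start DataFrame (the view name "accounts" is illustrative):

# Register the DataFrame as a temporary view for SQL access
df.createOrReplaceTempView("accounts")

# Query it with ordinary SQL
spark.sql("SELECT AccountBalance FROM accounts WHERE AccountBalance > 0").show()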

Structured and Unstructured Data

Spark SQL works with structured tables and with unstructured data such as JSON or images.
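
As a brief sketch of the semi-structured side, Spark can infer a schema directly from JSON records (the file path here is a hypothetical placeholder):

# Spark infers the schema from the JSON records
json_df = spark.read.json("events.json")  # hypothetical path
json_df.printSchema()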


Examples

Text Search

from pyspark.sql import Row
from pyspark.sql.functions import col

# "sc" is the SparkContext provided by the interactive shell
textFile = sc.textFile("hdfs://...")

# Creates a DataFrame having a single column named "line"
df = textFile.map(lambda r: Row(r)).toDF(["line"])
errors = df.filter(col("line").like("%ERROR%"))
# Counts all the errors
errors.count()
# Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
# Fetches the MySQL errors as a list of rows
errors.filter(col("line").like("%MySQL%")).collect()
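
Because the example runs three separate actions over the same errors DataFrame, each action would otherwise recompute the filter from scratch; caching it is a common optimization (a sketch, not part of the original example):

# Keep the filtered DataFrame in memory across the repeated actions
errors.cache()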

Simple Data Operations

In this example, we read a table stored in a database and count the number of people in each age group. Finally, we save the computed result to S3 in JSON format. The example uses a simple MySQL table "people" with two columns: "name" and "age".

# Creates a DataFrame based on a table named "people"
# stored in a MySQL database ("spark" is the SparkSession).
url = \
  "jdbc:mysql://yourIP:yourPort/test?user=yourUsername&password=yourPassword"
df = spark \
  .read \
  .format("jdbc") \
  .option("url", url) \
  .option("dbtable", "people") \
  .load()

# Prints the schema of this DataFrame.
df.printSchema()

# Counts people by age
countsByAge = df.groupBy("age").count()
countsByAge.show()

# Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")

Prediction with Logistic Regression (Python)

from pyspark.ml.classification import LogisticRegression

# Every record of this DataFrame contains the label and features
# represented by a vector ("data" is assumed to be prepared elsewhere).
df = spark.createDataFrame(data, ["label", "features"])

# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)

# Fit the model to the data.
model = lr.fit(df)

# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()

Project repository:

https://github.com/apache/spark
