天天看點

Apache Spark:用于資料工程、資料科學和機器學習的多語言引擎

作者:運維開發木子李
Apache Spark:用于資料工程、資料科學和機器學習的多語言引擎

什麼是 Apache Spark™?

Apache Spark ™是一種多語言引擎,用于在單節點機器或叢集上執行資料工程、資料科學和機器學習。

Apache Spark:用于資料工程、資料科學和機器學習的多語言引擎

使用“pip”安裝

$ pip 安裝 pyspark

$ pyspark

使用官方 Docker 鏡像

$ docker run -it --rm Spark:python3 /opt/spark/bin/pyspark

快速開始

df = spark.read.csv("accounts.csv", header=True)

# Select subset of features and filter for balance > 0
filtered_df = df.select("AccountBalance", "CountOfDependents").filter("AccountBalance > 0")

# Generate summary statistics
filtered_df.summary().show()           

機器學習

# Every record contains a label and feature vector
df = spark.createDataFrame(data, ["label", "features"])

# Split the data into train/test datasets
train_df, test_df = df.randomSplit([.80, .20], seed=42)

# Set hyperparameters for the algorithm
rf = RandomForestRegressor(numTrees=100)

# Fit the model to the training data
model = rf.fit(train_df)

# Generate predictions on the test dataset.
model.transform(test_df).show()           

分析與資料科學

df = spark.read.csv("accounts.csv", header=True)

# Select subset of features and filter for balance > 0
filtered_df = df.select("AccountBalance", "CountOfDependents").filter("AccountBalance > 0")

# Generate summary statistics
filtered_df.summary().show()           

Spark SQL 引擎:底層

Apache Spark ™建構在适用于大規模資料的進階分布式 SQL 引擎之上

自适應查詢執行

Spark SQL會在運作時調整執行計劃,例如自動設定reducer的數量和join算法。

支援 ANSI SQL

使用您已經熟悉的相同 SQL。

結構化和非結構化資料

Spark SQL 适用于結構化表和非結構化資料(例如 JSON 或圖像)。

Apache Spark:用于資料工程、資料科學和機器學習的多語言引擎

案例

文字搜尋

textFile = sc.textFile("hdfs://...")

# Creates a DataFrame having a single column named "line"
df = textFile.map(lambda r: Row(r)).toDF(["line"])
errors = df.filter(col("line").like("%ERROR%"))
# Counts all the errors
errors.count()
# Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
# Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect()           

簡單的資料操作

在此示例中,我們讀取資料庫中存儲的表并計算每個年齡段的人數。最後,我們将計算結果以JSON格式儲存到S3中。示例中使用了一個簡單的 MySQL 表“people”,該表有兩列:“name”和“age”。

# Creates a DataFrame based on a table named "people"
# stored in a MySQL database.
url = \
  "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
df = sqlContext \
  .read \
  .format("jdbc") \
  .option("url", url) \
  .option("dbtable", "people") \
  .load()

# Looks the schema of this DataFrame.
df.printSchema()

# Counts people by age
countsByAge = df.groupBy("age").count()
countsByAge.show()

# Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")           

使用邏輯回歸進行預測(python)

# Every record of this DataFrame contains the label and
# features represented by a vector.
df = sqlContext.createDataFrame(data, ["label", "features"])

# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)

# Fit the model to the data.
model = lr.fit(df)

# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()           

項目位址:

https://github.com/apache/spark

繼續閱讀