What is Apache Spark™?
Apache Spark™ is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.
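In Python, for example, a single-node session is only a few lines; a minimal sketch (the app name is arbitrary):

from pyspark.sql import SparkSession

# Run on a single node using all local cores; pointing master() at a
# cluster manager (e.g. spark://host:7077 or yarn) scales the same code out
spark = SparkSession.builder \
    .appName("hello-spark") \
    .master("local[*]") \
    .getOrCreate()

spark.range(5).show()
spark.stop()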
Install with 'pip'
$ pip install pyspark
$ pyspark
Use the official Docker image
$ docker run -it --rm spark:python3 /opt/spark/bin/pyspark
Quickstart
Machine Learning
from pyspark.ml.regression import RandomForestRegressor

# Every record contains a label and feature vector
df = spark.createDataFrame(data, ["label", "features"])
# Split the data into train/test datasets
train_df, test_df = df.randomSplit([.80, .20], seed=42)
# Set hyperparameters for the algorithm
rf = RandomForestRegressor(numTrees=100)
# Fit the model to the training data
model = rf.fit(train_df)
# Generate predictions on the test dataset.
model.transform(test_df).show()
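The snippet above leaves `data` undefined; it is assumed to be rows of (label, feature-vector) pairs, e.g. built with pyspark.ml.linalg.Vectors.dense. Once the model is fit, the held-out predictions can be scored with an evaluator; a minimal sketch:

from pyspark.ml.evaluation import RegressionEvaluator

# Score the test predictions with root-mean-squared error
evaluator = RegressionEvaluator(labelCol="label",
                                predictionCol="prediction",
                                metricName="rmse")
print("Test RMSE:", evaluator.evaluate(model.transform(test_df)))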
Analytics & Data Science
df = spark.read.csv("accounts.csv", header=True)
# Select subset of features and filter for balance > 0
filtered_df = df.select("AccountBalance", "CountOfDependents").filter("AccountBalance > 0")
# Generate summary statistics
filtered_df.summary().show()
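Since Spark 3.2, the same analysis can also be written with the pandas API on Spark; a rough equivalent, assuming the same accounts.csv:

import pyspark.pandas as ps

# pandas-style syntax, executed by the distributed Spark engine
psdf = ps.read_csv("accounts.csv")
psdf[psdf.AccountBalance > 0].describe()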
Spark SQL Engine: Under the Hood
Apache Spark™ is built on an advanced distributed SQL engine for large-scale data.
Adaptive Query Execution
Spark SQL adapts the execution plan at runtime, for example by automatically setting the number of reducers and choosing the join algorithm.
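Adaptive query execution is on by default in recent releases (Spark 3.2+); the relevant configuration keys can also be set explicitly:

# Toggle AQE and two of its main optimizations
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # tune reducer count
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")            # split skewed joins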
ANSI SQL Support
Use the same SQL you're already comfortable with.
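For example, the quickstart DataFrame can be registered as a temporary view (the view name here is arbitrary) and queried with ordinary SQL; ANSI-compliant behavior (strict casts, arithmetic overflow errors) is controlled by spark.sql.ansi.enabled:

spark.conf.set("spark.sql.ansi.enabled", "true")

# Expose the DataFrame to SQL and query it with standard syntax
df.createOrReplaceTempView("accounts")
spark.sql("""
    SELECT CountOfDependents, COUNT(*) AS n
    FROM accounts
    WHERE AccountBalance > 0
    GROUP BY CountOfDependents
""").show()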
Structured and Unstructured Data
Spark SQL works on structured tables and on unstructured data such as JSON or images.
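A small sketch of both sides, with hypothetical input paths (events.json, images/):

# Semi-structured JSON: the schema is inferred from the records
json_df = spark.read.json("events.json")
json_df.printSchema()

# Unstructured image files via the built-in "image" data source
img_df = spark.read.format("image").load("images/")
img_df.select("image.origin", "image.width", "image.height").show()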
Examples
Text Search
from pyspark.sql import Row
from pyspark.sql.functions import col

# 'sc' is the SparkContext provided by the pyspark shell
textFile = sc.textFile("hdfs://...")
# Creates a DataFrame having a single column named "line"
df = textFile.map(lambda r: Row(r)).toDF(["line"])
errors = df.filter(col("line").like("%ERROR%"))
# Counts all the errors
errors.count()
# Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
# Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect()
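Because the three actions above each re-evaluate the same filter against HDFS, caching the intermediate DataFrame before running the counts avoids the repeated scans:

# Keep the error lines in memory across the repeated counts
errors.cache()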
Simple Data Operations
In this example, we read a table stored in a database and count the number of people in each age group. Finally, we save the computed result to S3 in JSON format. The example uses a simple MySQL table, "people", with two columns: "name" and "age".
# Creates a DataFrame based on a table named "people"
# stored in a MySQL database.
url = \
"jdbc:mysql://yourIP:yourPort/test?user=yourUsername&password=yourPassword"
df = spark \
.read \
.format("jdbc") \
.option("url", url) \
.option("dbtable", "people") \
.load()
# Print the schema of this DataFrame.
df.printSchema()
# Counts people by age
countsByAge = df.groupBy("age").count()
countsByAge.show()
# Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")
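Writing to an s3a:// path additionally requires the hadoop-aws package and credentials. One way to wire hypothetical keys in at session-build time (the spark.hadoop.* prefix forwards settings to the Hadoop configuration):

from pyspark.sql import SparkSession

# Hypothetical literal credentials; in practice these usually come from
# the environment or an IAM role rather than the code
spark = SparkSession.builder \
    .config("spark.hadoop.fs.s3a.access.key", "YOUR_ACCESS_KEY") \
    .config("spark.hadoop.fs.s3a.secret.key", "YOUR_SECRET_KEY") \
    .getOrCreate()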
Prediction with Logistic Regression (Python)
from pyspark.ml.classification import LogisticRegression

# Every record of this DataFrame contains the label and
# features represented by a vector.
df = spark.createDataFrame(data, ["label", "features"])
# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)
# Fit the model to the data.
model = lr.fit(df)
# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()
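For a binary label, the fitted model also exposes its learned parameters directly:

# Inspect the learned weights of the fitted model
print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)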
Project repository:
https://github.com/apache/spark