天天看點

MongoDB Spark Connector 實戰指南count.pymongo-spark-test.pyCreate Spark SessionRead from MongoDBFilter and WriteUse SQLdf.createOrReplaceTempView("temp")some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")some_fruit.show()

Why Spark with MongoDB?

高性能,官方号稱 100x faster,因為可以全記憶體運作,性能提升肯定是很明顯的

簡單易用,支援 Java、Python、Scala、SQL 等多種語言,使得建構分析應用非常簡單

統一建構 ,支援多種資料源,通過 Spark RDD 屏蔽底層資料差異,同一個分析應用可運作于不同的資料源;

應用場景廣泛,能同時支援批處理以及流式處理

MongoDB Spark Connector 為官方推出,用于适配 Spark 操作 MongoDB 資料;本文以 Python 為例,介紹 MongoDB Spark Connector 的使用,幫助你基于 MongoDB 建構第一個分析應用。

準備 MongoDB 環境

安裝 MongoDB 參考 Install MongoDB Community Edition on Linux

mkdir mongodata

mongod --dbpath mongodata --port 9555

準備 Spark python 環境

參考 PySpark - Quick Guide

下載下傳 Spark

cd /home/mongo-spark

wget

tar zxvf spark-2.4.4-bin-hadoop2.7.tgz

設定 Spark 環境變量

export SPARK_HOME=/home/mongo-spark/spark-2.4.4-bin-hadoop2.7

export PATH=$PATH:/home/mongo-spark/spark-2.4.4-bin-hadoop2.7/bin

export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH

export PATH=$SPARK_HOME/python:$PATH

運作 Spark RDD 示例

count.py

from pyspark import SparkContext

sc = SparkContext("local", "count app")

words = sc.parallelize (

["scala",

"java",

"hadoop",

"spark",

"akka",

"spark vs hadoop",

"pyspark",

"pyspark and spark"]

)

counts = words.count()

$SPARK_HOME/bin/spark-submit count.py

Number of elements in RDD → 8

如果上述程式運作成功,說明 Spark python 環境準備成功,還可以測試 Spark 的其他 RDD 操作,比如 collector、filter、map、reduce、join 等,更多

買QQ

示例參考 PySpark - Quick Guide

Spark 操作 MongoDB 資料

參考 Spark Connector Python Guide

準備測試資料 test.coll01 插入3條測試資料,test.coll02 未空

mongo --port 9555

db.coll01.find()

{ "_id" : 1, "type" : "apple", "qty" : 5 }

{ "_id" : 2, "type" : "orange", "qty" : 10 }

{ "_id" : 3, "type" : "banana", "qty" : 15 }

db.coll02.find()

準備操作腳本,将輸入集合的資料按條件進行過濾,寫到輸出集合

mongo-spark-test.py

from pyspark.sql import SparkSession

Create Spark Session

spark = SparkSession \

.builder \
.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1:9555/test.coll01") \
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1:9555/test.coll") \
.getOrCreate()

           

Read from MongoDB

df = spark.read.format("mongo").load()

df.show()

Filter and Write

df.filter(df['qty'] >= 10).write.format("mongo").mode("append").save()

Use SQL

df.createOrReplaceTempView("temp")

some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")

some_fruit.show()

運作腳本

$SPARK_HOME/bin/spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 mongo-spark-test.py

{ "_id" : 2, "qty" : 10, "type" : "orange" }

{ "_id" : 3, "qty" : 15, "type" : "banana" }