11.8建立PyDev項目
11.8.1依次選擇File->New->Project
11.8.2選擇PyDev->PyDev Project
11.8.3輸入項目名稱
11.8.4已經建立的項目
11.9加入WordCount.py程式
11.9.1加入新程式
11.9.2輸入程式檔案名
11.10輸入WordCount.py程式
11.10.1導入相關連結庫
文法 | 說明 |
---|---|
# -- coding:utf-8 -- | 設定為utf-8編碼 |
from pyspark import SparkContext | 導入SparkContext |
from pyspark import SparkConf | 導入SparkConf |
11.10.2 CreateSparkContext()
SparkContext是開放Spark應用程式的入口。當我們使用IPythonNotebook時,可以直接使用SparkContext。例如,sc.master。可是當我們開放Python Spark應用程式時,則必須自建SparkContext。後面所有程式都需要它。
def CreateSparkContext():
#setAppname為設定app名,此名稱會顯示在spark、yarn的web頁面 sparkConf=SparkConf().setAppName(“WordCounts”).set(“spark.ui.show ConsoleProgress”,“false”)
sc = SparkContext(conf = sparkConf)
print(“master=”+sc.master) # 顯示目前的運作模式
SetLogger(sc) # 設定不顯示過多資訊,函數在下面
SetPath(sc) # 設定檔案讀取路徑,函數在下面
return (sc)
代碼 | 說明 |
---|---|
def CreateSparkContext(): | 定義CreateSparkContext()函數 |
sparkConf = SparkConf() | 建立SparkConf配置設定對象 |
.setAppName(“WordCounts”) | 設定app名,此名稱會顯示在spark、yarn的web頁面 |
.set(“spark.ui.showConsoleProgress”,“false”) | 設定不要顯示Spark執行進度,以免螢幕顯示太亂 |
sc = SparkContext(conf = sparkConf) | 建立SparkContext傳入參數:SparkConf配置設定對象 |
print(“master=”+sc.master) | 顯示目前運作模式:local、YARN client或Spark Stand alone |
SetLogger(sc) | 設定不顯示過多資訊 |
SetPath(sc) | 設定檔案讀取路徑 |
return (sc) | 傳回SparkContext sc |
11.10.3設定不要顯示太多資訊SetLogger(sc)
def SetLogger(sc): # 去除spark預設顯示的雜亂資訊
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger(“org”).setLevel(logger.Level.ERROR) logger.LogManager.getLogger(“akka”).setLevel(logger.Level.ERROR) logger.LogManager.getRootLogger().setLevel(logger.Level.ERROR)
11.10.4配置檔案讀取路徑SetPath(sc)
def SetPath(sc): # 配置讀取檔案的路徑(本地檔案和hdfs檔案)
global Path
if sc.master[0:5] == “local”:
# local模式讀取本地檔案
Path = “file:/home/hduser/pythonwork/PythonProject/”
else: # 其他模式(yarn等)讀取hdfs檔案
Path = “hdfs://master:9000/user/hduser/”
11.10.5編寫主程式
if name == “main”:
print(“開始執行 wordcount”)
sc = CreateSparkContext()
print(“開始讀取文本檔案…”)
textFile = sc.textFile(Path+“data/README.md”)
print(“文本共有”+str(textFile.count())+“行”)
# map-reduce 運算
countsRDD = textFile.flatMap(lambda line:line.split(’ ')).map(lambda x:(x, 1)).reduceByKey(lambda x,y:x+y)
print(“文本統計共有”+str(countsRDD.count())+“項資料”)
# 儲存結果到檔案
print(“開始儲存到文本檔案…”)
try:
countsRDD.saveAsTextFile(Path+“data/output”)
except Exception as e:
print(“輸出目錄已存在,請先删除原有目錄”)
sc.stop()
11.11建立測試檔案并上傳至HDFS目錄
後續我們将介紹在不同的模式下運作WordCount.py,是以我們需要把檔案複制到本地項目檔案,再把測試檔案上傳到HDFS目錄。
11.11.1複制本地測試檔案
mkdir -p ~/pythonwork/PythonProject/data
cp /usr/local/spark/README.md ~/pythonwork/PythonProject/data
11.11.2啟動Hadoop Multi Node Cluster
啟動Hadoop Multi Node Cluster之前,先啟動所有的虛拟機。
start-all.sh
11.11.3複制測試檔案到HDFS
#建立HDFS測試目錄
hadoop fs -mkdir -p /user/hduser/data
#檢視HDFS測試檔案
hadoop fs -ls /user/hduser/data/README.md
11.12使用spark-submit執行WordCount程式
11.12.1spark-submit詳細介紹
Spark-submit常用選項
選項 | 說明 |
---|---|
–master MASTER_URL | 可設定Spark在什麼環境運作 |
–driver-memory MEM | driver程式所使用的記憶體 |
–executor-memory MEM | executor程式所使用的記憶體 |
–name NAME | 要運作的application名稱,此名稱會在Hadoop或Spark Web UI界面中 |
Python程式檔案名 | 要運作的Python程式 |
–master MASTER_URL選項說明
Local | 在本地運作,隻使用1個線程 |
---|---|
Local[K] | 在本地運作,使用K個線程(會使用本地計算機的多核CPU) |
Local[*] | 在本地運作,Spark會自動盡量利用本地計算機的多核CPU |
Spark://HOST:PORT | 在Spark Standalone Cluster上運作,例如:spark://master:7077(預設port是7077) |
Mesos://HOST:PORT | 在Mesos cluser上運作(預設port是5050) |
YARN | 在YARN Client上運作,必須要設定HADOOP_CONF_DIR或YARN_CONF_DIR環境變量 |
11.12.2在local運作WordCount
在“終端”輸入下列指令
cd ~/pythonwork/PythonProject
spark-submit --driver-memory 2g --master local[4] WordCount.py
11.12.3檢視輸出目錄
ll data/output
11.12.4檢視輸入檔案内容
cat data/output/part-00000|more