這裡我選擇Hadoop2.7.0版本,java8版本,spark2.4.5版本,python3.6版本,hbase 1.2.0,MySQL8.0.23。注意以上版本對應關系不然安裝後可能無法使用
1 安裝ssh以及配置無密碼登入
叢集、單節點模式都需要用到 SSH 登陸(類似于遠端登陸,你可以登入某台 Linux 主機,并且在上面運作指令),Ubuntu 預設已安裝了 SSH client,此外還需要安裝 SSH server:
輸入如下指令:
$sudo apt-get update
$sudo apt-get install openssh-server #一般的ubuntu不自帶伺服器的ssh,都有客服端的ssh,需要安裝伺服器端的ssh
$ssh localhost #啟動ssh
#下面配置無密碼ssh登入
$cd ~/.ssh/ # 若沒有該目錄,請先執行一次ssh localhost,~表示使用者的主檔案夾
$ssh-keygen -t rsa # 會有提示,一直按回車就可以
$cat ./id_rsa.pub >> ./authorized_keys # 加入授權
此時再用
ssh localhost
指令,無需輸入密碼就可以直接登陸了
2 安裝java
打開終端
$cd /usr/lib
$sudo mkdir jvm #建立/usr/lib/jvm目錄用來存放JDK檔案
$cd ~ #進入hadoop使用者的主目錄
$cd Downloads #注意區分大小寫字母,剛才已經通過FTP軟體把JDK安裝包jdk-8u162-linux-x64.tar.gz上傳到該目錄(downloads)下
$sudo tar -zxvf ./jdk-8u162-linux-x64.tar.gz -C /usr/lib/jvm #把JDK檔案解壓到/usr/lib/jvm目錄下
$cd ~
$gedit ~/.bashrc
打開了hadoop這個使用者的環境變量配置檔案,請在這個檔案的開頭位置,添加如下幾行内容:
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
儲存.bashrc檔案并退出
$source ~/.bashrc
$java -version #檢視是否安裝成功
3 安裝hadoop
打開hadoop檔案包目錄下的終端,輸入以下代碼顯示安裝成功
我們選擇将 Hadoop 安裝至 /usr/local/ 中:
$sudo tar -zxf ~/下載下傳/hadoop-2.6.0.tar.gz -C /usr/local # 解壓到/usr/local中
$cd /usr/local/
$sudo mv ./hadoop-2.6.0/ ./hadoop # 将檔案夾名改為hadoop
$sudo chown -R yinchen ./hadoop # 修改檔案權限
$cd /usr/local/hadoop
$./bin/hadoop version #出現版本資訊,表示hadoop安裝成功了
Hadoop僞分布式配置
Hadoop僞分布式配置參考:http://dblab.xmu.edu.cn/blog/install-hadoop/
修改配置檔案 core-site.xml (通過 gedit 編輯會比較友善:
gedit ./etc/hadoop/core-site.xml
),将當中最後兩行的内容的
<configuration>
</configuration>
修改為:
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>file:/usr/local/hadoop/tmp</value>
<description>Abase for other temporary directories.</description>
</property>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
hadoop.tmp.dir 是 hadoop檔案系統依賴的基本配置,很多配置路徑都依賴它,它的預設位置是在 /tmp/{$user}下面,注意這是個臨時目錄!!!
是以,它的持久化配置很重要的! 如果選擇預設,一旦因為斷電等外在因素影響,/tmp/{$user}下的所有東西都會丢失,哇咔咔。。。趟過坑的應該知道多麼酸爽。。。
注意:
修改完配置需要重新格式化NameNode!!!
是以,建議:最好在安裝配置HADOOP的時候,就給配置OK!!!
fs.defaultFS配置資料傳輸的位址
同樣的,修改配置檔案 hdfs-site.xml:
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/usr/local/hadoop/tmp/dfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/usr/local/hadoop/tmp/dfs/data</value>
</property>
</configuration>
dfs.replication設定檔案備份數量
dfs.namenode.name.dir設定namenode檔案存儲的檔案夾
dfs.datanode.data.dir設定datanode檔案存儲的檔案夾
$cd /usr/local/hadoop
$./bin/hdfs namenode -format #格式化namenode節點
$cd /usr/local/hadoop
$./sbin/start-dfs.sh #start-dfs.sh是個完整的可執行檔案,中間沒有空格
啟動完成後,可以通過指令
jps
來判斷是否成功啟動,若成功啟動則會列出如下程序:
此外,若是 DataNode 沒有啟動,可嘗試如下的方法(注意這會删除 HDFS 中原有的所有資料,如果原有的資料很重要請不要這樣做):
- # 針對 DataNode 沒法啟動的解決方法
- cd /usr/local/hadoop
- ./sbin/stop-dfs.sh # 關閉
- rm -r ./tmp # 删除 tmp 檔案,注意這會删除 HDFS 中原有的所有資料
- ./bin/hdfs namenode -format # 重新格式化 NameNode
- ./sbin/start-dfs.sh # 重新開機
成功啟動後,可以通路 Web 界面 http://localhost:50070 檢視 NameNode 和 Datanode 資訊,還可以線上檢視 HDFS 中的檔案。
Hadoop的Web界面
運作Hadoop僞分布式執行個體
$./bin/hdfs dfs -mkdir -p /user/hadoop
$./bin/hdfs dfs -mkdir /user/hadoop/input
$./bin/hdfs dfs -put ./etc/hadoop/*.xml /user/hadoop/input
$./bin/hdfs dfs -ls /user/hadoop/input
$./bin/hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar grep /user/hadoop/input output 'dfs[a-z.]+'
$./bin/hdfs dfs -cat output/*
$rm -r ./output # 先删除本地的 output 檔案夾(如果存在)
$./bin/hdfs dfs -get output ./output # 将 HDFS 上的 output 檔案夾拷貝到本機
$cat ./output/*
$./bin/hdfs dfs -rm -r output # 删除 output 檔案夾
$./sbin/stop-dfs.sh #若要關閉 Hadoop,則運作這行代碼
下次啟動 hadoop 時,無需進行 NameNode 的初始化,隻需要運作
./sbin/start-dfs.sh
就可以!
運作截圖
4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題
是以需要額外再安裝python3.6.1版本
當在指令行輸入python時出現python3.6時,說明python安裝成功。可通過軟連結方式設定,一定不要删除ubuntu自帶的python3.8,不然系統可能出現問題。
ubuntu安裝python3.6: https://mp.csdn.net/editor/html/113042751
5 spark的安裝
sudo tar -zxf spark安裝包 -C /usr/local/ #解壓檔案
cd /usr/local
sudo mv ./spark-2.4檔案名 ./spark #将解壓後的檔案重命名為spark
sudo chown -R 使用者名 ./spark #設定使用者權限,-R後的參數表示計算機使用者名
cd /usr/local/spark
cp ./conf/spark-env.sh.template ./conf/spark-env.sh
sudo gedit ./conf/spark-env.sh #編輯spark-env.sh檔案
在檔案最後一行加入:export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath) 來使hadoop和spark建立聯系
之後直接在指令行輸入
$./bin/pyspark 就安裝成功出現如下界面:
學習視訊:https://study.163.com/course/courseLearn.htm?courseId=1209408816#/learn/video?lessonId=1279281390&courseId=1209408816
6 Hbase安裝
hbase安裝分為3種方式,
單機模式:單純的用于hbase文法練習
僞分布式模式:用于學習hadoop,spark和hbase之間的互動,模拟實際應用場景
叢集模式:用于實際應用場景
6.1 hbase安裝:http://dblab.xmu.edu.cn/blog/install-hbase/
說明:HBase的版本一定要和之前已經安裝的Hadoop的版本保持相容,不能随便選擇版本。
安裝hbase的條件,– jdk– Hadoop( 單機模式不需要,僞分布式模式和分布式模式需要)– SSH
6.2 單機模式配置
用gedit指令打開并編輯hbase-env.sh
gedit /usr/local/hbase/conf/hbase-env.sh
添加以下兩行内容:
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
export HBASE_MANAGES_ZK=true
添加完成後儲存退出即可。
第一行表示:配置JAVA環境變量
配置HBASE_MANAGES_ZK為true,表示由hbase自己管理zookeeper,不需要單獨的zookeeper。
配置/usr/local/hbase/conf/hbase-site.xml
<configuration>
<property>
<name>hbase.rootdir</name>
<value>file:///usr/local/hbase/hbase-tmp</value>
</property>
</configuration>
不設定的話,hbase.rootdir預設為/tmp/hbase-${user.name},這意味着每次重新開機系統都會丢失資料。
測試運作:
- cd /usr/local/hbase
- bin/start-hbase.sh
- bin/hbase shell
停止HBase運作:bin/stop-hbase.sh
注意:如果在操作HBase的過程中發生錯誤,可以通過{HBASE_HOME}目錄(/usr/local/hbase)下的logs子目錄中的日志檔案檢視錯誤原因。
6.3 僞分布式模式配置及運作
1.配置/usr/local/hbase/conf/hbase-env.sh
vi /usr/local/hbase/conf/hbase-env.sh
向檔案寫入一下資訊:
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
export HBASE_CLASSPATH=/usr/local/hadoop/conf
export HBASE_MANAGES_ZK=true
2.配置/usr/local/hbase/conf/hbase-site.xm
gedit /usr/local/hbase/conf/hbase-site.xml
配置如下内容:
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://localhost:9000/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
</configuration>
hbase.rootdir指定HBase的存儲目錄;
hbase.cluster.distributed設定叢集處于分布式模式.
3.測試運作HBase。SSH->Hadoop->Hbase->Hbase shell
第一步:首先登陸ssh,之前設定了無密碼登陸,是以這裡不需要密碼;再切換目錄至/usr/local/hadoop ;再啟動hadoop,如果已經啟動hadoop請跳過此步驟。指令如下:
ssh localhost
cd /usr/local/hadoop
./sbin/start-dfs.sh
輸入指令jps,能看到NameNode,DataNode和SecondaryNameNode都已經成功啟動,表示hadoop啟動成功,截圖如下:
第二步:切換目錄至/usr/local/hbase;再啟動HBase.指令如下:
cd /usr/local/hbase
bin/start-hbase.sh
啟動成功,輸入指令jps,看到以下界面說明hbase啟動成功
第三步:進入shell界面:bin/hbase shell
停止HBase運作:bin/stop-hbase.sh
注意:如果在操作HBase的過程中發生錯誤,可以通過{HBASE_HOME}目錄(/usr/local/hbase)下的logs子目錄中的日志檔案檢視錯誤原因。
這裡啟動關閉Hadoop和HBase的順序一定是:登入SSH->啟動Hadoop->啟動HBase->關閉HBase->關閉Hadoo
6.4 spark連接配接僞分布式資料庫Hbase
啟動順序:SSH->Hadoop->Hbase->pyspark
注意hbase和其他版本配套關系
配置視訊:https://study.163.com/course/courseLearn.htm?courseId=1209408816&from=study#/learn/video?lessonId=1279281401&courseId=1209408816
配置文檔:http://dblab.xmu.edu.cn/blog/1715-2/
http://dblab.xmu.edu.cn/blog/install-hbase/
(1)建立一個HBase表
cd /usr/local/hadoop
./sbin/start-all.sh
啟動完成以後,一定要輸入jps指令檢視是否啟動成功:如果少了其中一個程序,說明啟動失敗
2375 SecondaryNameNode
2169 DataNode
2667 NodeManager
2972 Jps
2045 NameNode
2541 ResourceManager
hbase> list #顯示目前HBase資料庫中有哪些已經建立好的表
如果裡面已經有一個名稱為student的表,請使用如下指令删除
hbase> disable 'student'
hbase> drop 'student'
hbase> create 'student','info'
hbase> describe 'student'
//首先錄入student表的第一個學生記錄
hbase> put 'student','1','info:name','Xueqian'
hbase> put 'student','1','info:gender','F'
hbase> put 'student','1','info:age','23'
//然後錄入student表的第二個學生記錄
hbase> put 'student','2','info:name','Weiliang'
hbase> put 'student','2','info:gender','M'
hbase> put 'student','2','info:age','24'
資料錄入結束後,可以用下面指令檢視剛才已經錄入的資料:
//如果每次隻檢視一行,就用下面指令
hbase> get 'student','1'
//如果每次檢視全部資料,就用下面指令
hbase> scan 'student'
可以得到如下結果:
ROW COLUMN+CELL
1 column=info:age, timestamp=1479640712163, value=23
1 column=info:gender, timestamp=1479640704522, value=F
1 column=info:name, timestamp=1479640696132, value=Xueqian
2 column=info:age, timestamp=1479640752474, value=24
2 column=info:gender, timestamp=1479640745276, value=M
2 column=info:name, timestamp=1479640732763, value=Weiliang
2 row(s) in 0.1610 seconds
(2)配置Spark
請建立一個終端,執行下面指令
cd /usr/local/spark/jars
mkdir hbase
cd hbase
cp /usr/local/hbase/lib/hbase*.jar ./
cp /usr/local/hbase/lib/guava-12.0.1.jar ./ #注意自己包的版本
cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./ ##注意自己包的版本
cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./ ##注意自己包的版本
打開spark-example-1.6.0.jar下載下傳jar包
mkdir -p /usr/local/spark/jars/hbase/
mv ~/下載下傳/spark-examples* /usr/local/spark/jars/hbase/
編輯器打開spark-env.sh檔案
cd /usr/local/spark/conf
vim spark-env.sh
在檔案最前面增加下面一行内容:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*
隻有這樣,後面編譯和運作過程才不會出錯。
插曲:由于我把上面export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*配置錯了導緻了如下問題pyspark程式就是不能連接配接hbase。
[email protected]:/usr/local/spark$ ./bin/pyspark --jars ./bin/spark-examples_2.11-1.6.0-typesafe-001.jar
Python 3.6.8 (default, Jan 23 2021, 13:39:30)
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
2021-02-17 23:12:39 WARN Utils:66 - Your hostname, yc resolves to a loopback address: 127.0.1.1; using 192.168.254.129 instead (on interface ens33)
2021-02-17 23:12:39 WARN Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2021-02-17 23:12:39 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 3.6.8 (default, Jan 23 2021 13:39:30)
SparkSession available as 'spark'.
>>> host="localhost"
>>> table="student"
>>> conf={"hbase.zookeeper.quorum":host,"hbase.mapreduce.inputtable":table}
>>> keyConv="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
>>> valueConv="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
>>> hbase_rdd=sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/spark/python/pyspark/context.py", line 751, in newAPIHadoopRDD
jconf, batchSize)
File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/local/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDDFromClassNames(PythonRDD.scala:312)
at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:297)
at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
(3)編寫pyspark程式讀取HBase資料
啟動pyspark:/usr/local/spark/bin/pyspark
輸入如下代碼:
host = 'localhost'
table = 'student'
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)
count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k, v) in output:
print (k, v)
1 {"qualifier" : "age", "timestamp" : "1512549772307", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "23"}
{"qualifier" : "gender", "timestamp" : "1512549765192", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "F"}
{"qualifier" : "name", "timestamp" : "1512549757406", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "Xueqian"}
2 {"qualifier" : "age", "timestamp" : "1512549829145", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "24"}
{"qualifier" : "gender", "timestamp" : "1512549790422", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "M"}
{"qualifier" : "name", "timestamp" : "1512549780044", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "Weiliang"}
(4)編寫pyspark程式向HBase寫入資料
啟動pyspark,然後在pyspark shell中輸入如下代碼
host = 'localhost'
table = 'student' #定義寫入的hbase表格名稱
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
conf = {"hbase.zookeeper.quorum": host,"hbase.mapred.outputtable": table,"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat","mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable","mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"} #conf配置連接配接資料庫的相關資訊
rawData = ['3,info,name,Rongcheng','4,info,name,Guanhua'] #rawData為通過hadoop寫入hbase資料庫的内容
// ( rowkey , [ row key , column family , column name , value ] )
sc.parallelize(rawData).map(lambda x: (x[0],x.split(','))).saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
此處發現執行後,pyspark會報如下錯誤,但是資料仍會插入到hbase中.
pyspark.sql.utils.IllegalArgumentException: 'Can not create a Path from a null string'
執行後,我們可以切換到剛才的HBase終端視窗,在HBase shell中輸入如下指令檢視結果:
hbase> scan 'student'
hbase
就能檢視到新增的資料
7 MySQL安裝
7.1.首先ubuntu正常安裝mysql:
參考部落格:https://blog.csdn.net/sa726663676/article/details/113647373
也可參考http://dblab.xmu.edu.cn/blog/install-mysql/(不推薦)
7.2.找到系統對應版本的java JDBC8.0.23 版本對應于我剛剛安裝的mysql版本8.0.23 。 如果版本不對應可能導緻插曲7.5.1錯誤
java JDBC8.0.23JDBC 下載下傳: https://dev.mysql.com/downloads/connector/j/?os=26
7.3.将下載下傳的tar.gz檔案進行解壓:
ubuntu的解壓指令是tar -zxvf (檔案名)**.tar.gz
7.4 解壓的畫圈檔案放在 /usr/local/spark/jars 目錄下
注意路徑是自己安裝的spark路徑
使用pyspark連接配接mysql資料庫:
連接配接之前我們需要先添加些資訊
首先打開資料庫向資料庫添加些表格和資訊:
7.5 pyspark通過jdbc連接配接MySQL資料庫:
打開pyspark輸入下面兩行代碼:
jdbcDF=spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://localhost:3306/spark").option("dbtable","student").option("user","root").option("password","yinchen").load()
jdbcDF.show()
運作結果圖:正确連接配接到了mysql顯示了剛剛建立表格的資訊
以上代碼解讀
jdbcDF=spark.read.format("jdbc")
.option("driver","com.mysql.jdbc.Driver")
.option("url","jdbc:mysql://localhost:3306/spark") #ip位址localhost和端口号3306以及連接配接的資料庫名spark,mysql預設為3306,如果端口寫錯會導緻插曲7.5.2錯誤
.option("dbtable","student") #表名student
.option("user","root") #使用者名root
.option("password","yinchen") #密碼yinchen
.load()
本節參考視訊:https://study.163.com/course/courseLearn.htm?courseId=1209408816&from=study#/learn/video?lessonId=1279274539&courseId=1209408816
插曲7.5.1:如果JDBC檔案沒有複制到pyspark子目錄的jar目錄中,出現錯誤如下,是以當出現下面錯誤的時候考慮JDBC.jar包是否存在或者版本是否正确:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 3.6.8 (default, Jan 23 2021 13:39:30)
SparkSession available as 'spark'.
>>> jdbcDF=spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://localhost:3306/spark").option("dbtable","student").option("user","root").option("password","yinchen").load()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 172, in load
return self._df(self._jreader.load())
File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/local/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o29.load.
: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:45)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$5.apply(JDBCOptions.scala:99)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$5.apply(JDBCOptions.scala:99)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:99)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:35)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:32)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
插曲7.5.2 使用JDBC連接配接mysql時端口寫錯了
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 3.6.8 (default, Jan 23 2021 13:39:30)
SparkSession available as 'spark'.
>>> jdbcDF=spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://localhost:33060/spark").option("dbtable","student").option("user","root").option("password","yinchen").load()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 172, in load
return self._df(self._jreader.load())
File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/local/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o37.load.
: java.sql.SQLNonTransientConnectionException: Unsupported protocol version: 11. Likely connecting to an X Protocol port.
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:110)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:73)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:79)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:833)
at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:453)
at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:246)
at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198)
at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:210)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.mysql.cj.exceptions.UnableToConnectException: Unsupported protocol version: 11. Likely connecting to an X Protocol port.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
at com.mysql.cj.protocol.a.NativeCapabilities.setInitialHandshakePacket(NativeCapabilities.java:112)
at com.mysql.cj.protocol.a.NativeProtocol.readServerCapabilities(NativeProtocol.java:506)
at com.mysql.cj.protocol.a.NativeProtocol.beforeHandshake(NativeProtocol.java:385)
at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1348)
at com.mysql.cj.NativeSession.connect(NativeSession.java:157)
at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:953)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:823)
... 24 more
8.安裝kafka
8.1 下載下傳安裝kafka
kafka的官方下載下傳位址為:https://kafka.apache.org/downloads
#這裡我安裝的是0.11.0.3:安裝的時候最好注意下版本比對,此安裝包内已經附帶zookeeper,不需要額外安裝zookeeper.按順序執行如下步驟:
打開檔案下載下傳位置的終端,然後運作如下指令
cd ~/下載下傳
sudo tar -zxf kafka_2.11-0.10.1.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.11-0.10.1.0/ ./kafka
sudo chown -R yinchen ./kafka #注意紅色為你自己的電腦使用者名
8.2 測試簡單執行個體
接下來在Ubuntu系統環境下測試簡單的執行個體。Mac系統請自己按照安裝的位置,切換到相應的指令。按順序執行如下指令:
# 進入kafka所在的目錄
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
指令執行後不會傳回Shell指令輸入狀态,zookeeper就會按照預設的配置檔案啟動服務,請千萬不要關閉目前終端.啟動新的終端,輸入如下指令:
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties
kafka服務端就啟動了,請千萬不要關閉目前終端。啟動另外一個終端,輸入如下指令:
cd /usr/local/kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab
topic是釋出消息釋出的category,以單節點的配置建立了一個叫dblab的topic.可以用list列出所有建立的topics,來檢視剛才建立的主題是否存在
bin/kafka-topics.sh --list --zookeeper localhost:2181
可以在結果中檢視到dblab這個topic存在。接下來用producer生産點資料:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab
并嘗試輸入如下資訊:
hello hadoop
hello xmu
hadoop world
然後再次開啟新的終端或者直接按CTRL+C退出。然後使用consumer來接收資料,輸入如下指令:
cd /usr/local/kafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dblab --from-beginning
便可以看到剛才産生的三條資訊。說明kafka安裝成功。
kafka安裝參考:http://dblab.xmu.edu.cn/blog/1096-2/