天天看點

ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

這裡我選擇Hadoop2.7.0版本,java8版本,spark2.4.5版本,python3.6版本,hbase 1.2.0,MySQL8.0.23。注意以上版本對應關系不然安裝後可能無法使用

1 安裝ssh以及配置無密碼登入

叢集、單節點模式都需要用到 SSH 登陸(類似于遠端登陸,你可以登入某台 Linux 主機,并且在上面運作指令),Ubuntu 預設已安裝了 SSH client,此外還需要安裝 SSH server:

輸入如下指令:

$sudo apt-get update

$sudo apt-get install openssh-server       #一般的ubuntu不自帶伺服器的ssh,都有客服端的ssh,需要安裝伺服器端的ssh

$ssh localhost      #啟動ssh

#下面配置無密碼ssh登入

$cd ~/.ssh/      # 若沒有該目錄,請先執行一次ssh localhost,~表示使用者的主檔案夾

$ssh-keygen -t rsa     # 會有提示,一直按回車就可以

$cat ./id_rsa.pub >> ./authorized_keys    # 加入授權

此時再用 

ssh localhost

 指令,無需輸入密碼就可以直接登陸了

2 安裝java

打開終端

$cd /usr/lib

$sudo mkdir jvm             #建立/usr/lib/jvm目錄用來存放JDK檔案

$cd ~                     #進入hadoop使用者的主目錄

$cd Downloads                 #注意區分大小寫字母,剛才已經通過FTP軟體把JDK安裝包jdk-8u162-linux-x64.tar.gz上傳到該目錄(downloads)下

$sudo tar -zxvf ./jdk-8u162-linux-x64.tar.gz -C /usr/lib/jvm         #把JDK檔案解壓到/usr/lib/jvm目錄下

$cd ~

$gedit ~/.bashrc

打開了hadoop這個使用者的環境變量配置檔案,請在這個檔案的開頭位置,添加如下幾行内容:

export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
           

儲存.bashrc檔案并退出

$source ~/.bashrc

$java -version                #檢視是否安裝成功

3 安裝hadoop

打開hadoop檔案包目錄下的終端,輸入以下代碼顯示安裝成功

我們選擇将 Hadoop 安裝至 /usr/local/ 中:

$sudo tar -zxf ~/下載下傳/hadoop-2.6.0.tar.gz -C /usr/local            # 解壓到/usr/local中

$cd /usr/local/

$sudo mv ./hadoop-2.6.0/ ./hadoop          # 将檔案夾名改為hadoop

$sudo chown -R yinchen ./hadoop           # 修改檔案權限

$cd /usr/local/hadoop

$./bin/hadoop version                              #出現版本資訊,表示hadoop安裝成功了

Hadoop僞分布式配置

Hadoop僞分布式配置參考:http://dblab.xmu.edu.cn/blog/install-hadoop/

修改配置檔案 core-site.xml (通過 gedit 編輯會比較友善: 

gedit ./etc/hadoop/core-site.xml

),将當中最後兩行的内容的

<configuration>

</configuration>

修改為:

<configuration>

    <property>

        <name>hadoop.tmp.dir</name>

        <value>file:/usr/local/hadoop/tmp</value>

        <description>Abase for other temporary directories.</description>

    </property>

    <property>

        <name>fs.defaultFS</name>

        <value>hdfs://localhost:9000</value>

    </property>

</configuration>

hadoop.tmp.dir 是 hadoop檔案系統依賴的基本配置,很多配置路徑都依賴它,它的預設位置是在 /tmp/{$user}下面,注意這是個臨時目錄!!!

是以,它的持久化配置很重要的! 如果選擇預設,一旦因為斷電等外在因素影響,/tmp/{$user}下的所有東西都會丢失,哇咔咔。。。趟過坑的應該知道多麼酸爽。。。

注意:

修改完配置需要重新格式化NameNode!!!

是以,建議:最好在安裝配置HADOOP的時候,就給配置OK!!!

fs.defaultFS配置資料傳輸的位址

同樣的,修改配置檔案 hdfs-site.xml:

<configuration>

    <property>

        <name>dfs.replication</name>

        <value>1</value>

    </property>

    <property>

        <name>dfs.namenode.name.dir</name>

        <value>file:/usr/local/hadoop/tmp/dfs/name</value>

    </property>

    <property>

        <name>dfs.datanode.data.dir</name>

        <value>file:/usr/local/hadoop/tmp/dfs/data</value>

    </property>

</configuration>

dfs.replication設定檔案備份數量

dfs.namenode.name.dir設定namenode檔案存儲的檔案夾

dfs.datanode.data.dir設定datanode檔案存儲的檔案夾

$cd /usr/local/hadoop

$./bin/hdfs namenode -format       #格式化namenode節點

$cd /usr/local/hadoop

$./sbin/start-dfs.sh                             #start-dfs.sh是個完整的可執行檔案,中間沒有空格

啟動完成後,可以通過指令 

jps

 來判斷是否成功啟動,若成功啟動則會列出如下程序:

ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

此外,若是 DataNode 沒有啟動,可嘗試如下的方法(注意這會删除 HDFS 中原有的所有資料,如果原有的資料很重要請不要這樣做):

  1. # 針對 DataNode 沒法啟動的解決方法
  2. cd /usr/local/hadoop
  3. ./sbin/stop-dfs.sh # 關閉
  4. rm -r ./tmp # 删除 tmp 檔案,注意這會删除 HDFS 中原有的所有資料
  5. ./bin/hdfs namenode -format # 重新格式化 NameNode
  6. ./sbin/start-dfs.sh # 重新開機

成功啟動後,可以通路 Web 界面 http://localhost:50070 檢視 NameNode 和 Datanode 資訊,還可以線上檢視 HDFS 中的檔案。

ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

Hadoop的Web界面

運作Hadoop僞分布式執行個體

$./bin/hdfs dfs -mkdir -p /user/hadoop

$./bin/hdfs dfs -mkdir /user/hadoop/input

$./bin/hdfs dfs -put ./etc/hadoop/*.xml /user/hadoop/input

$./bin/hdfs dfs -ls /user/hadoop/input

$./bin/hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar grep /user/hadoop/input output 'dfs[a-z.]+'

$./bin/hdfs dfs -cat output/*

$rm -r ./output # 先删除本地的 output 檔案夾(如果存在)

$./bin/hdfs dfs -get output ./output # 将 HDFS 上的 output 檔案夾拷貝到本機

$cat ./output/*

$./bin/hdfs dfs -rm -r output # 删除 output 檔案夾

$./sbin/stop-dfs.sh        #若要關閉 Hadoop,則運作這行代碼

下次啟動 hadoop 時,無需進行 NameNode 的初始化,隻需要運作 

./sbin/start-dfs.sh

 就可以!

運作截圖

ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka
ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka
ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題

是以需要額外再安裝python3.6.1版本

當在指令行輸入python時出現python3.6時,說明python安裝成功。可通過軟連結方式設定,一定不要删除ubuntu自帶的python3.8,不然系統可能出現問題。

ubuntu安裝python3.6: https://mp.csdn.net/editor/html/113042751

5 spark的安裝

sudo tar -zxf spark安裝包 -C /usr/local/        #解壓檔案

cd /usr/local

sudo mv ./spark-2.4檔案名 ./spark       #将解壓後的檔案重命名為spark

sudo chown -R 使用者名 ./spark           #設定使用者權限,-R後的參數表示計算機使用者名

cd /usr/local/spark

cp ./conf/spark-env.sh.template ./conf/spark-env.sh

sudo gedit ./conf/spark-env.sh               #編輯spark-env.sh檔案

在檔案最後一行加入:export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)    來使hadoop和spark建立聯系

之後直接在指令行輸入

$./bin/pyspark    就安裝成功出現如下界面:

ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

學習視訊:https://study.163.com/course/courseLearn.htm?courseId=1209408816#/learn/video?lessonId=1279281390&courseId=1209408816

6 Hbase安裝

hbase安裝分為3種方式,

單機模式:單純的用于hbase文法練習

僞分布式模式:用于學習hadoop,spark和hbase之間的互動,模拟實際應用場景

叢集模式:用于實際應用場景

6.1 hbase安裝:http://dblab.xmu.edu.cn/blog/install-hbase/

說明:HBase的版本一定要和之前已經安裝的Hadoop的版本保持相容,不能随便選擇版本。

安裝hbase的條件,– jdk– Hadoop( 單機模式不需要,僞分布式模式和分布式模式需要)– SSH

6.2 單機模式配置

用gedit指令打開并編輯hbase-env.sh

gedit /usr/local/hbase/conf/hbase-env.sh

添加以下兩行内容:

export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64

export HBASE_MANAGES_ZK=true

ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

添加完成後儲存退出即可。

第一行表示:配置JAVA環境變量

配置HBASE_MANAGES_ZK為true,表示由hbase自己管理zookeeper,不需要單獨的zookeeper。

配置/usr/local/hbase/conf/hbase-site.xml

<configuration>

<property>

<name>hbase.rootdir</name>

<value>file:///usr/local/hbase/hbase-tmp</value>

</property>

</configuration>

不設定的話,hbase.rootdir預設為/tmp/hbase-${user.name},這意味着每次重新開機系統都會丢失資料。

測試運作:

  1. cd /usr/local/hbase
  2. bin/start-hbase.sh
  3. bin/hbase shell
ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

停止HBase運作:bin/stop-hbase.sh

注意:如果在操作HBase的過程中發生錯誤,可以通過{HBASE_HOME}目錄(/usr/local/hbase)下的logs子目錄中的日志檔案檢視錯誤原因。

6.3 僞分布式模式配置及運作

1.配置/usr/local/hbase/conf/hbase-env.sh

vi /usr/local/hbase/conf/hbase-env.sh

向檔案寫入一下資訊:

export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64

export HBASE_CLASSPATH=/usr/local/hadoop/conf

export HBASE_MANAGES_ZK=true

ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

2.配置/usr/local/hbase/conf/hbase-site.xm

gedit /usr/local/hbase/conf/hbase-site.xml

配置如下内容:

<configuration>

        <property>

                <name>hbase.rootdir</name>

                <value>hdfs://localhost:9000/hbase</value>

        </property>

        <property>

                <name>hbase.cluster.distributed</name>

                <value>true</value>

        </property>

</configuration>

ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

hbase.rootdir指定HBase的存儲目錄;

hbase.cluster.distributed設定叢集處于分布式模式.

3.測試運作HBase。SSH->Hadoop->Hbase->Hbase shell

第一步:首先登陸ssh,之前設定了無密碼登陸,是以這裡不需要密碼;再切換目錄至/usr/local/hadoop ;再啟動hadoop,如果已經啟動hadoop請跳過此步驟。指令如下:

ssh localhost

cd /usr/local/hadoop

./sbin/start-dfs.sh

輸入指令jps,能看到NameNode,DataNode和SecondaryNameNode都已經成功啟動,表示hadoop啟動成功,截圖如下:

ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

第二步:切換目錄至/usr/local/hbase;再啟動HBase.指令如下:

cd /usr/local/hbase

bin/start-hbase.sh

啟動成功,輸入指令jps,看到以下界面說明hbase啟動成功

ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

第三步:進入shell界面:bin/hbase shell

ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

停止HBase運作:bin/stop-hbase.sh

注意:如果在操作HBase的過程中發生錯誤,可以通過{HBASE_HOME}目錄(/usr/local/hbase)下的logs子目錄中的日志檔案檢視錯誤原因。

這裡啟動關閉Hadoop和HBase的順序一定是:登入SSH->啟動Hadoop->啟動HBase->關閉HBase->關閉Hadoo

6.4 spark連接配接僞分布式資料庫Hbase

啟動順序:SSH->Hadoop->Hbase->pyspark

注意hbase和其他版本配套關系

配置視訊:https://study.163.com/course/courseLearn.htm?courseId=1209408816&from=study#/learn/video?lessonId=1279281401&courseId=1209408816

配置文檔:http://dblab.xmu.edu.cn/blog/1715-2/

http://dblab.xmu.edu.cn/blog/install-hbase/

(1)建立一個HBase表

cd /usr/local/hadoop

./sbin/start-all.sh

啟動完成以後,一定要輸入jps指令檢視是否啟動成功:如果少了其中一個程序,說明啟動失敗

2375 SecondaryNameNode
2169 DataNode
2667 NodeManager
2972 Jps
2045 NameNode
2541 ResourceManager
           

hbase> list   #顯示目前HBase資料庫中有哪些已經建立好的表

如果裡面已經有一個名稱為student的表,請使用如下指令删除

hbase> disable 'student'

hbase> drop 'student'

hbase>  create 'student','info'

hbase> describe 'student'

//首先錄入student表的第一個學生記錄

hbase> put 'student','1','info:name','Xueqian'

hbase> put 'student','1','info:gender','F'

hbase> put 'student','1','info:age','23'

//然後錄入student表的第二個學生記錄

hbase> put 'student','2','info:name','Weiliang'

hbase> put 'student','2','info:gender','M'

hbase> put 'student','2','info:age','24'

資料錄入結束後,可以用下面指令檢視剛才已經錄入的資料:

//如果每次隻檢視一行,就用下面指令

hbase> get 'student','1'

//如果每次檢視全部資料,就用下面指令

hbase> scan 'student'

可以得到如下結果:

ROW                                    COLUMN+CELL                                                                                                   
 1                                     column=info:age, timestamp=1479640712163, value=23                                                            
 1                                     column=info:gender, timestamp=1479640704522, value=F                                                          
 1                                     column=info:name, timestamp=1479640696132, value=Xueqian                                                      
 2                                     column=info:age, timestamp=1479640752474, value=24                                                            
 2                                     column=info:gender, timestamp=1479640745276, value=M                                                          
 2                                     column=info:name, timestamp=1479640732763, value=Weiliang                                                     
2 row(s) in 0.1610 seconds
           

(2)配置Spark

請建立一個終端,執行下面指令

cd  /usr/local/spark/jars

mkdir  hbase

cd  hbase

cp  /usr/local/hbase/lib/hbase*.jar  ./

cp  /usr/local/hbase/lib/guava-12.0.1.jar  ./    #注意自己包的版本

cp  /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar  ./    ##注意自己包的版本

cp  /usr/local/hbase/lib/protobuf-java-2.5.0.jar  ./   ##注意自己包的版本

打開spark-example-1.6.0.jar下載下傳jar包

mkdir -p /usr/local/spark/jars/hbase/

mv ~/下載下傳/spark-examples* /usr/local/spark/jars/hbase/

編輯器打開spark-env.sh檔案

cd /usr/local/spark/conf

vim spark-env.sh

在檔案最前面增加下面一行内容:

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*
           

隻有這樣,後面編譯和運作過程才不會出錯。

插曲:由于我把上面export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*配置錯了導緻了如下問題pyspark程式就是不能連接配接hbase。

[email protected]:/usr/local/spark$ ./bin/pyspark --jars ./bin/spark-examples_2.11-1.6.0-typesafe-001.jar 

Python 3.6.8 (default, Jan 23 2021, 13:39:30) 

[GCC 9.3.0] on linux

Type "help", "copyright", "credits" or "license" for more information.

2021-02-17 23:12:39 WARN  Utils:66 - Your hostname, yc resolves to a loopback address: 127.0.1.1; using 192.168.254.129 instead (on interface ens33)

2021-02-17 23:12:39 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address

2021-02-17 23:12:39 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0

      /_/

Using Python version 3.6.8 (default, Jan 23 2021 13:39:30)

SparkSession available as 'spark'.

>>> host="localhost"

>>> table="student"

>>> conf={"hbase.zookeeper.quorum":host,"hbase.mapreduce.inputtable":table}

>>> keyConv="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"

>>> valueConv="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

>>> hbase_rdd=sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)

Traceback (most recent call last):

  File "<stdin>", line 1, in <module>

  File "/usr/local/spark/python/pyspark/context.py", line 751, in newAPIHadoopRDD

    jconf, batchSize)

  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__

  File "/usr/local/spark/python/pyspark/sql/utils.py", line 63, in deco

    return f(*a, **kw)

  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.

: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.io.ImmutableBytesWritable

    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)

    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)

    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

    at java.lang.Class.forName0(Native Method)

    at java.lang.Class.forName(Class.java:348)

    at org.apache.spark.util.Utils$.classForName(Utils.scala:238)

    at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDDFromClassNames(PythonRDD.scala:312)

    at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:297)

    at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:498)

    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)

    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

    at py4j.Gateway.invoke(Gateway.java:282)

    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

    at py4j.commands.CallCommand.execute(CallCommand.java:79)

    at py4j.GatewayConnection.run(GatewayConnection.java:238)

    at java.lang.Thread.run(Thread.java:748)

(3)編寫pyspark程式讀取HBase資料

啟動pyspark:/usr/local/spark/bin/pyspark

輸入如下代碼:

host = 'localhost'

table = 'student'

conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}

keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"

valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)

count = hbase_rdd.count()

hbase_rdd.cache()

output = hbase_rdd.collect()

for (k, v) in output:

        print (k, v)

1 {"qualifier" : "age", "timestamp" : "1512549772307", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "23"}

{"qualifier" : "gender", "timestamp" : "1512549765192", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "F"}

{"qualifier" : "name", "timestamp" : "1512549757406", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "Xueqian"}

2 {"qualifier" : "age", "timestamp" : "1512549829145", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "24"}

{"qualifier" : "gender", "timestamp" : "1512549790422", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "M"}

{"qualifier" : "name", "timestamp" : "1512549780044", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "Weiliang"}               

(4)編寫pyspark程式向HBase寫入資料

啟動pyspark,然後在pyspark shell中輸入如下代碼

host = 'localhost'

table = 'student'    #定義寫入的hbase表格名稱

keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"

valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

conf = {"hbase.zookeeper.quorum": host,"hbase.mapred.outputtable": table,"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat","mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable","mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}     #conf配置連接配接資料庫的相關資訊

rawData = ['3,info,name,Rongcheng','4,info,name,Guanhua']     #rawData為通過hadoop寫入hbase資料庫的内容

// ( rowkey , [ row key , column family , column name , value ] )

sc.parallelize(rawData).map(lambda x: (x[0],x.split(','))).saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)

此處發現執行後,pyspark會報如下錯誤,但是資料仍會插入到hbase中.

pyspark.sql.utils.IllegalArgumentException: 'Can not create a Path from a null string'
           

執行後,我們可以切換到剛才的HBase終端視窗,在HBase shell中輸入如下指令檢視結果:

hbase> scan 'student'

hbase

就能檢視到新增的資料

7 MySQL安裝

7.1.首先ubuntu正常安裝mysql:

參考部落格:https://blog.csdn.net/sa726663676/article/details/113647373

也可參考http://dblab.xmu.edu.cn/blog/install-mysql/(不推薦)

7.2.找到系統對應版本的java JDBC8.0.23  版本對應于我剛剛安裝的mysql版本8.0.23 。    如果版本不對應可能導緻插曲7.5.1錯誤

java JDBC8.0.23JDBC 下載下傳:    https://dev.mysql.com/downloads/connector/j/?os=26   

ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

7.3.将下載下傳的tar.gz檔案進行解壓:

ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

ubuntu的解壓指令是tar -zxvf (檔案名)**.tar.gz

7.4 解壓的畫圈檔案放在 /usr/local/spark/jars  目錄下

注意路徑是自己安裝的spark路徑

ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

使用pyspark連接配接mysql資料庫:

連接配接之前我們需要先添加些資訊

首先打開資料庫向資料庫添加些表格和資訊:

ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

7.5 pyspark通過jdbc連接配接MySQL資料庫:

打開pyspark輸入下面兩行代碼:

jdbcDF=spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://localhost:3306/spark").option("dbtable","student").option("user","root").option("password","yinchen").load()


jdbcDF.show()
           

運作結果圖:正确連接配接到了mysql顯示了剛剛建立表格的資訊

ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

以上代碼解讀

jdbcDF=spark.read.format("jdbc")            

.option("driver","com.mysql.jdbc.Driver")    

.option("url","jdbc:mysql://localhost:3306/spark")      #ip位址localhost和端口号3306以及連接配接的資料庫名spark,mysql預設為3306,如果端口寫錯會導緻插曲7.5.2錯誤

.option("dbtable","student")                                     #表名student

.option("user","root")                                              #使用者名root

.option("password","yinchen")                               #密碼yinchen

.load()

本節參考視訊:https://study.163.com/course/courseLearn.htm?courseId=1209408816&from=study#/learn/video?lessonId=1279274539&courseId=1209408816

插曲7.5.1:如果JDBC檔案沒有複制到pyspark子目錄的jar目錄中,出現錯誤如下,是以當出現下面錯誤的時候考慮JDBC.jar包是否存在或者版本是否正确:

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0

      /_/

Using Python version 3.6.8 (default, Jan 23 2021 13:39:30)

SparkSession available as 'spark'.

>>> jdbcDF=spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://localhost:3306/spark").option("dbtable","student").option("user","root").option("password","yinchen").load()

Traceback (most recent call last):

  File "<stdin>", line 1, in <module>

  File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 172, in load

    return self._df(self._jreader.load())

  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__

  File "/usr/local/spark/python/pyspark/sql/utils.py", line 63, in deco

    return f(*a, **kw)

  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling o29.load.

: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

    at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:45)

    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$5.apply(JDBCOptions.scala:99)

    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$5.apply(JDBCOptions.scala:99)

    at scala.Option.foreach(Option.scala:257)

    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:99)

    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:35)

    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:32)

    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)

    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)

    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)

    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:498)

    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)

    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

    at py4j.Gateway.invoke(Gateway.java:282)

    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

    at py4j.commands.CallCommand.execute(CallCommand.java:79)

    at py4j.GatewayConnection.run(GatewayConnection.java:238)

    at java.lang.Thread.run(Thread.java:748)

插曲7.5.2  使用JDBC連接配接mysql時端口寫錯了

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0

      /_/

Using Python version 3.6.8 (default, Jan 23 2021 13:39:30)

SparkSession available as 'spark'.

>>> jdbcDF=spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://localhost:33060/spark").option("dbtable","student").option("user","root").option("password","yinchen").load()

Traceback (most recent call last):

  File "<stdin>", line 1, in <module>

  File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 172, in load

    return self._df(self._jreader.load())

  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__

  File "/usr/local/spark/python/pyspark/sql/utils.py", line 63, in deco

    return f(*a, **kw)

  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling o37.load.

: java.sql.SQLNonTransientConnectionException: Unsupported protocol version: 11. Likely connecting to an X Protocol port.

    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:110)

    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)

    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89)

    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63)

    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:73)

    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:79)

    at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:833)

    at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:453)

    at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:246)

    at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198)

    at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)

    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)

    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)

    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)

    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:210)

    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)

    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)

    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)

    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)

    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:498)

    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)

    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

    at py4j.Gateway.invoke(Gateway.java:282)

    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

    at py4j.commands.CallCommand.execute(CallCommand.java:79)

    at py4j.GatewayConnection.run(GatewayConnection.java:238)

    at java.lang.Thread.run(Thread.java:748)

Caused by: com.mysql.cj.exceptions.UnableToConnectException: Unsupported protocol version: 11. Likely connecting to an X Protocol port.

    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

    at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)

    at com.mysql.cj.protocol.a.NativeCapabilities.setInitialHandshakePacket(NativeCapabilities.java:112)

    at com.mysql.cj.protocol.a.NativeProtocol.readServerCapabilities(NativeProtocol.java:506)

    at com.mysql.cj.protocol.a.NativeProtocol.beforeHandshake(NativeProtocol.java:385)

    at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1348)

    at com.mysql.cj.NativeSession.connect(NativeSession.java:157)

    at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:953)

    at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:823)

    ... 24 more

8.安裝kafka

8.1 下載下傳安裝kafka

kafka的官方下載下傳位址為:https://kafka.apache.org/downloads  

#這裡我安裝的是0.11.0.3:安裝的時候最好注意下版本比對,此安裝包内已經附帶zookeeper,不需要額外安裝zookeeper.按順序執行如下步驟:

ubuntu20.04.1安裝java-hadoop-spark-python-hbase-mysql-kafka及僞分布式配置1 安裝ssh以及配置無密碼登入2 安裝java3 安裝hadoop4 安裝python 系統自帶的python3.8好像版本太高,運作會有問題5 spark的安裝6 Hbase安裝7 MySQL安裝8.安裝kafka

打開檔案下載下傳位置的終端,然後運作如下指令

cd ~/下載下傳

sudo tar -zxf kafka_2.11-0.10.1.0.tgz -C /usr/local

cd /usr/local

sudo mv kafka_2.11-0.10.1.0/ ./kafka

sudo chown -R  yinchen ./kafka    #注意紅色為你自己的電腦使用者名

8.2 測試簡單執行個體

接下來在Ubuntu系統環境下測試簡單的執行個體。Mac系統請自己按照安裝的位置,切換到相應的指令。按順序執行如下指令:

# 進入kafka所在的目錄

cd /usr/local/kafka

bin/zookeeper-server-start.sh config/zookeeper.properties

指令執行後不會傳回Shell指令輸入狀态,zookeeper就會按照預設的配置檔案啟動服務,請千萬不要關閉目前終端.啟動新的終端,輸入如下指令:

cd /usr/local/kafka

bin/kafka-server-start.sh config/server.properties

kafka服務端就啟動了,請千萬不要關閉目前終端。啟動另外一個終端,輸入如下指令:

cd /usr/local/kafka

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab

topic是釋出消息釋出的category,以單節點的配置建立了一個叫dblab的topic.可以用list列出所有建立的topics,來檢視剛才建立的主題是否存在

bin/kafka-topics.sh --list --zookeeper localhost:2181  

可以在結果中檢視到dblab這個topic存在。接下來用producer生産點資料:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab

并嘗試輸入如下資訊:

hello hadoop
hello xmu
hadoop world
           

然後再次開啟新的終端或者直接按CTRL+C退出。然後使用consumer來接收資料,輸入如下指令:

cd /usr/local/kafka

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dblab --from-beginning  

便可以看到剛才産生的三條資訊。說明kafka安裝成功。

kafka安裝參考:http://dblab.xmu.edu.cn/blog/1096-2/

繼續閱讀