天天看點

基于Spark Streaming 進行 MySQL Binlog 日志準實時傳輸

基本架構

RDS -> SLS -> Spark Streaming -> Spark HDFS

上述鍊路主要包含3個過程:

  1. 如何把 RDS 的 binlog 收集到 SLS。
  2. 如何通過 Spark Streaming 将 SLS 中的日志讀取出來,進行分析。
  3. 如何把鍊路 2 中讀取和處理過的日志,儲存到 Spark HDFS中。

環境準備

  1. 安裝一個 MySQL 類型的資料庫(使用 MySQL 協定,例如 RDS、DRDS 等),開啟 log-bin 功能,且配置 binlog 類型為 ROW 模式(RDS預設開啟)。
  2. 開通 SLS 服務。

操作步驟

  1. 檢查 MySQL 資料庫環境。
    1. 檢視是否開啟 log-bin 功能。
    mysql> show variables like "log_bin";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | log_bin       | ON    |
    +---------------+-------+
    1 row in set (0.02 sec)           
    1. 檢視 binlog 類型
    mysql> show variables like "binlog_format";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | binlog_format | ROW   |
    +---------------+-------+
    1 row in set (0.03 sec)             
  2. 添加使用者權限。(也可以直接通過RDS控制台添加)
    CREATE USER canal IDENTIFIED BY ‘canal’;GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;FLUSH PRIVILEGES;           
  3. 為 SLS 服務添加對應的配置檔案,并檢查資料是否正常采集。
    1. 在 SLS 控制台添加對應的 project 和 logstore,例如:建立一個名稱為 canaltest 的 project,然後建立一個名稱為 canal 的 logstore。
    2. 對 SLS 進行配置:在 /etc/ilogtail 目錄下建立檔案user_local_config.json,具體配置如下:
    {
    "metrics": {
     "##1.0##canaltest$plugin-local": {
         "aliuid": "****",
         "enable": true,
         "category": "canal",
         "defaultEndpoint": "*******",
         "project_name": "canaltest",
         "region": "cn-hangzhou",
         "version": 2
         "log_type": "plugin",
         "plugin": {
             "inputs": [
                 {
                     "type": "service_canal",
                     "detail": {
                         "Host": "*****",
                         "Password": "****",
                         "ServerID": ****,
                         "User" : "***",
                         "DataBases": [
                             "yourdb"
                         ],
                         "IgnoreTables": [
                             "\\S+_inner"
                         ],
                          "TextToString" : true
                     }
                 }
             ],
             "flushers": [
                 {
                     "type": "flusher_sls",
                     "detail": {}
                 }
             ]
         }
     }
    }
    }           
    其中 detail 中的 Host 和 Password 等資訊為 MySQL 資料庫資訊,User 資訊為之前授權過的使用者名。aliUid、defaultEndpoint、project_name、category 請根據自己的實際情況填寫對應的使用者和 SLS 資訊。
    1. 等待約 2 分鐘,通過 SLS 控制台檢視日志資料是否上傳成功,具體如圖所示。
      基于Spark Streaming 進行 MySQL Binlog 日志準實時傳輸

如果日志資料沒有采內建功,請根據SLS的提示,檢視SLS的采集日志進行排查。

  1. 準備代碼,将代碼編譯成 jar 包,然後上傳到 OSS。
    1. 将 EMR 的示例代碼通過 git 複制下來,然後進行修改,具體指令為:
    git clone https://github.com/aliyun/aliyun-emapreduce-demo.git。           
    示例代碼中已經有 LoghubSample 類,該類主要用于從 SLS 采集資料并列印。以下是修改後的代碼,供參考:
    package com.aliyun.emr.example
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    object LoghubSample {
    def main(args: Array[String]): Unit = {
    if (args.length < 7) {
     System.err.println(
       """Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar
         |            
         |           
       """.stripMargin)
     System.exit(1)
    }
    val loghubProject = args(0)
    val logStore = args(1)
    val loghubGroupName = args(2)
    val endpoint = args(3)
    val accessKeyId = args(4)
    val accessKeySecret = args(5)
    val batchInterval = Milliseconds(args(6).toInt * 1000)
    val conf = new SparkConf().setAppName("Mysql Sync")
    //    conf.setMaster("local[4]");
    val ssc = new StreamingContext(conf, batchInterval)
    val loghubStream = LoghubUtils.createStream(
     ssc,
     loghubProject,
     logStore,
     loghubGroupName,
     endpoint,
     1,
     accessKeyId,
     accessKeySecret,
     StorageLevel.MEMORY_AND_DISK)
    loghubStream.foreachRDD(rdd =>
       rdd.saveAsTextFile("/mysqlbinlog")
    )
    ssc.start()
    ssc.awaitTermination()
    }
    }           

其中的主要改動是:

loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile(“/mysqlbinlog”) )           

這樣在 EMR 叢集中運作時,就會把Spark Streaming 中流出來的資料,儲存到 EMR 的 HDFS 中。

  1. 說明

    由于如果要在本地運作,請在本地環境提前搭建 Hadoop 叢集。

由于 EMR 的 Spark SDK 做了更新,其示例代碼比較舊,不能直接在參數中傳遞 OSS 的 AccessKeyId、AccessKeySecret, 而是需要通過 SparkConf 進行設定,如下所示。

trait RunLocally {
val conf = new SparkConf().setAppName(getAppName).setMaster("local[4]")
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
conf.set("spark.hadoop.mapreduce.job.run-local", "true")
conf.set("spark.hadoop.fs.oss.endpoint", "YourEndpoint")
conf.set("spark.hadoop.fs.oss.accessKeyId", "YourId")
conf.set("spark.hadoop.fs.oss.accessKeySecret", "YourSecret")
conf.set("spark.hadoop.job.runlocal", "true")
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
conf.set("spark.hadoop.fs.oss.buffer.dirs", "/mnt/disk1")
val sc = new SparkContext(conf)
def getAppName: String
}           

在本地調試時,需要把 loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile(“/mysqlbinlog”) ) 中的 /mysqlbinlog 修改成本地 HDFS的位址。

  1. 代碼編譯。

    在本地調試完成後,我們可以通過如下指令進行打包編譯:

  2. clean install
  3. 上傳 jar 包。

    請先在 OSS 上建立 bucket 為 qiaozhou-EMR/jar的目錄,然後通過OSS 控制台或 OSS 的 SDK 将 /target/shaded目錄下的 examples-1.1-shaded.jar上傳到 OSS 的這個目錄下。上傳後的 jar 包位址為 oss://qiaozhou-EMR/jar/examples-1.1-shaded.jar,這個位址在後面會用上,如下圖所示:

基于Spark Streaming 進行 MySQL Binlog 日志準實時傳輸
  1. 搭建 EMR 叢集,建立任務并運作執行計劃。
    1. 通過 EMR 控制台建立一個 EMR 叢集,大約需要 10 分鐘左右,請耐心等待。
    2. 建立一個類型為 Spark 的作業。

      請根據您具體的配置将

      SLS_endpoint $SLS_access_id $SLS_secret_key

      替換成真實值。請注意參數的順序,否則可能會報錯。
    —master yarn —deploy-mode client —driver-memory 4g —executor-memory 2g —executor-cores 2 —class com.aliyun.EMR.example.LoghubSample ossref://EMR-test/jar/examples-1.1-shaded.jar canaltest canal sparkstreaming $SLS_endpoint $SLS_access_id $SLS_secret_key 1           

運作以上的指令

  1. 查詢 Master 節點的IP
  2. 通過 SSH 登入後,執行以下指令:
  3. fs -ls /
  4. 可以看到 mysqlbinlog 開頭的目錄,再通過以下指令檢視 mysqlbinlog 檔案:
  5. fs -ls /mysqlbinlog

還可以通過 hadoop fs -cat /mysqlbinlog/part-00000 指令檢視檔案内容。

  1. 錯誤排查。

    如果沒有看到正常的結果,可以登陸節點,檢視對應的作業的錯誤情況。

繼續閱讀