基本架構
RDS -> SLS -> Spark Streaming -> Spark HDFS
上述鍊路主要包含3個過程:
- 如何把 RDS 的 binlog 收集到 SLS。
- 如何通過 Spark Streaming 将 SLS 中的日志讀取出來,進行分析。
- 如何把鍊路 2 中讀取和處理過的日志,儲存到 Spark HDFS中。
環境準備
- 安裝一個 MySQL 類型的資料庫(使用 MySQL 協定,例如 RDS、DRDS 等),開啟 log-bin 功能,且配置 binlog 類型為 ROW 模式(RDS預設開啟)。
- 開通 SLS 服務。
操作步驟
- 檢查 MySQL 資料庫環境。
- 檢視是否開啟 log-bin 功能。
mysql> show variables like "log_bin"; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | log_bin | ON | +---------------+-------+ 1 row in set (0.02 sec)
- 檢視 binlog 類型
mysql> show variables like "binlog_format"; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | ROW | +---------------+-------+ 1 row in set (0.03 sec)
- 添加使用者權限。(也可以直接通過RDS控制台添加)
CREATE USER canal IDENTIFIED BY ‘canal’;GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;FLUSH PRIVILEGES;
- 為 SLS 服務添加對應的配置檔案,并檢查資料是否正常采集。
- 在 SLS 控制台添加對應的 project 和 logstore,例如:建立一個名稱為 canaltest 的 project,然後建立一個名稱為 canal 的 logstore。
- 對 SLS 進行配置:在 /etc/ilogtail 目錄下建立檔案user_local_config.json,具體配置如下:
其中 detail 中的 Host 和 Password 等資訊為 MySQL 資料庫資訊,User 資訊為之前授權過的使用者名。aliUid、defaultEndpoint、project_name、category 請根據自己的實際情況填寫對應的使用者和 SLS 資訊。{ "metrics": { "##1.0##canaltest$plugin-local": { "aliuid": "****", "enable": true, "category": "canal", "defaultEndpoint": "*******", "project_name": "canaltest", "region": "cn-hangzhou", "version": 2 "log_type": "plugin", "plugin": { "inputs": [ { "type": "service_canal", "detail": { "Host": "*****", "Password": "****", "ServerID": ****, "User" : "***", "DataBases": [ "yourdb" ], "IgnoreTables": [ "\\S+_inner" ], "TextToString" : true } } ], "flushers": [ { "type": "flusher_sls", "detail": {} } ] } } } }
- 等待約 2 分鐘,通過 SLS 控制台檢視日志資料是否上傳成功,具體如圖所示。
基于Spark Streaming 進行 MySQL Binlog 日志準實時傳輸
如果日志資料沒有采內建功,請根據SLS的提示,檢視SLS的采集日志進行排查。
- 準備代碼,将代碼編譯成 jar 包,然後上傳到 OSS。
- 将 EMR 的示例代碼通過 git 複制下來,然後進行修改,具體指令為:
示例代碼中已經有 LoghubSample 類,該類主要用于從 SLS 采集資料并列印。以下是修改後的代碼,供參考:git clone https://github.com/aliyun/aliyun-emapreduce-demo.git。
package com.aliyun.emr.example import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.aliyun.logservice.LoghubUtils import org.apache.spark.streaming.{Milliseconds, StreamingContext} object LoghubSample { def main(args: Array[String]): Unit = { if (args.length < 7) { System.err.println( """Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar | | """.stripMargin) System.exit(1) } val loghubProject = args(0) val logStore = args(1) val loghubGroupName = args(2) val endpoint = args(3) val accessKeyId = args(4) val accessKeySecret = args(5) val batchInterval = Milliseconds(args(6).toInt * 1000) val conf = new SparkConf().setAppName("Mysql Sync") // conf.setMaster("local[4]"); val ssc = new StreamingContext(conf, batchInterval) val loghubStream = LoghubUtils.createStream( ssc, loghubProject, logStore, loghubGroupName, endpoint, 1, accessKeyId, accessKeySecret, StorageLevel.MEMORY_AND_DISK) loghubStream.foreachRDD(rdd => rdd.saveAsTextFile("/mysqlbinlog") ) ssc.start() ssc.awaitTermination() } }
其中的主要改動是:
loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile(“/mysqlbinlog”) )
這樣在 EMR 叢集中運作時,就會把Spark Streaming 中流出來的資料,儲存到 EMR 的 HDFS 中。
-
說明
由于如果要在本地運作,請在本地環境提前搭建 Hadoop 叢集。
由于 EMR 的 Spark SDK 做了更新,其示例代碼比較舊,不能直接在參數中傳遞 OSS 的 AccessKeyId、AccessKeySecret, 而是需要通過 SparkConf 進行設定,如下所示。
trait RunLocally {
val conf = new SparkConf().setAppName(getAppName).setMaster("local[4]")
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
conf.set("spark.hadoop.mapreduce.job.run-local", "true")
conf.set("spark.hadoop.fs.oss.endpoint", "YourEndpoint")
conf.set("spark.hadoop.fs.oss.accessKeyId", "YourId")
conf.set("spark.hadoop.fs.oss.accessKeySecret", "YourSecret")
conf.set("spark.hadoop.job.runlocal", "true")
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
conf.set("spark.hadoop.fs.oss.buffer.dirs", "/mnt/disk1")
val sc = new SparkContext(conf)
def getAppName: String
}
在本地調試時,需要把 loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile(“/mysqlbinlog”) ) 中的 /mysqlbinlog 修改成本地 HDFS的位址。
-
代碼編譯。
在本地調試完成後,我們可以通過如下指令進行打包編譯:
- clean install
-
上傳 jar 包。
請先在 OSS 上建立 bucket 為 qiaozhou-EMR/jar的目錄,然後通過OSS 控制台或 OSS 的 SDK 将 /target/shaded目錄下的 examples-1.1-shaded.jar上傳到 OSS 的這個目錄下。上傳後的 jar 包位址為 oss://qiaozhou-EMR/jar/examples-1.1-shaded.jar,這個位址在後面會用上,如下圖所示:
- 搭建 EMR 叢集,建立任務并運作執行計劃。
- 通過 EMR 控制台建立一個 EMR 叢集,大約需要 10 分鐘左右,請耐心等待。
-
建立一個類型為 Spark 的作業。
請根據您具體的配置将
替換成真實值。請注意參數的順序,否則可能會報錯。SLS_endpoint $SLS_access_id $SLS_secret_key
—master yarn —deploy-mode client —driver-memory 4g —executor-memory 2g —executor-cores 2 —class com.aliyun.EMR.example.LoghubSample ossref://EMR-test/jar/examples-1.1-shaded.jar canaltest canal sparkstreaming $SLS_endpoint $SLS_access_id $SLS_secret_key 1
運作以上的指令
- 查詢 Master 節點的IP
- 通過 SSH 登入後,執行以下指令:
- fs -ls /
- 可以看到 mysqlbinlog 開頭的目錄,再通過以下指令檢視 mysqlbinlog 檔案:
- fs -ls /mysqlbinlog
還可以通過 hadoop fs -cat /mysqlbinlog/part-00000 指令檢視檔案内容。
-
錯誤排查。
如果沒有看到正常的結果,可以登陸節點,檢視對應的作業的錯誤情況。