天天看點

DolphinDB +機器學習,預測地震波形資料

作者:DolphinDB

1. 地震波形資料預測業務場景說明

在地震波形資料異常檢測場景中,通常需要使用多種工具和方法來提高檢測精度和魯棒性。其中,FilterPicker 是一種常用的基于模闆比對的異常檢測工具,可以實作地震波形資料的實時異常檢測和定位。FilterPicker 需要進行預處理,如噪聲過濾、信号增強等,然後根據預設的模闆和門檻值進行比對和檢測,最後輸出異常檢測的結果和位置資訊。

雖然 FilterPicker 可以快速、準确地檢測地震波形資料中的異常信号,但其精度和穩定性仍然受到多種因素的影響,例如噪聲幹擾、模闆選擇等。為了提高異常檢測的精度和魯棒性,可以使用深度學習算法進行進一步的檢測和預測。相比傳統的基于規則和模闆比對的方法,深度學習算法具有更好的适應性和泛化性能,可以處理更複雜的地震波形資料,并減少人為幹擾和誤判的風險。

綜上所述,業界在地震波形資料異常檢測場景中,通常是先使用 FilterPicker 進行一級異常檢測,若有異常點,則進一步使用深度學習的方法進行更加精确的檢測。

2. 波形資料異常檢測相關算法介紹

2.1 模闆比對算法

模闆比對算法是一種基于信号相似性的比對算法,主要利用信号進行中的相關性來進行模闆比對。

在模闆比對算法中,需要先定義一個待比對信号,即需要被檢測的信号。然後,構造一個模闆,模闆是一種已知特征的信号,其特征可以用來描述所要比對的目标。模闆通常是人為制定的,可以根據需要來選擇和設計。

模闆比對的核心是通過計算信号之間的相關系數來比較待比對信号和模闆之間的相似度。相關系數是一種用于描述兩個信号相似度的名額。在信号進行中,一般使用互相關函數(cross-correlation)來計算相關系數。

對于兩個長度為 N 的信号 X 和 Y,它們的互相關函數可以表示為:

DolphinDB +機器學習,預測地震波形資料

其中 m 為滞後值,R(m) 表示 X 和 Y 之間在滞後值為 m 時的相關性。

通過計算待比對信号與模闆之間的相關系數,可以得到一個相關系數序列,序列中的每個元素對應于待比對信号中的一個位置。相關系數序列中的極值通常被認為是比對信号和模闆之間的最佳位置,是以可以用來檢測待比對信号中是否存在與模闆相似的部分。

然而,模闆比對算法存在一些限制,如噪聲、幹擾等因素的影響,以及模闆比對在多元空間中的複雜性等。是以,在實際應用中,需要針對具體問題選擇合适的算法和參數,并進行優化和改進。

2.2 線性趨勢去除算法

在地震信号進行中,線性趨勢去除的基本思想是将信号的趨勢部分表示成一個線性函數的形式,并通過拟合線性函數來計算趨勢。由于線性函數的參數可以通過最小二乘法求解,是以這種方法具有計算簡單、易于實作等優點。具體而言,線性趨勢去除的過程可以分為以下幾個步驟:

step1:計算信号的時間序列 t 和信号值序列 x 的均值和,即:

DolphinDB +機器學習,預測地震波形資料

其中,N 表示信号的長度。

step2:計算信号值序列相對于均值的偏差 Δxi 和時間序列相對于均值的偏差 Δti。

step3:用最小二乘法拟合一條直線,使得它盡可能地接近信号值序列 Δx,即:

DolphinDB +機器學習,預測地震波形資料

其中,a 和 b 是拟合直線的斜率和截距,εi 是誤差項。

step4:計算拟合直線的值 yi,即:

DolphinDB +機器學習,預測地震波形資料

step5:将信号的趨勢部分去除,得到去趨勢後的信号,即:

DolphinDB +機器學習,預測地震波形資料

2.3 sosBandpass 算法

sosBandpass 是一種數字信号處理算法,用于對地震資料進行帶通濾波。其原理是使用一組二階 IIR 濾波器級聯,實作對地震資料在指定頻率範圍内的濾波,同時保留原始信号的相位資訊。

具體地,sosBandpass 算法使用了一組二階 IIR 濾波器級聯的結構,也稱為“串聯二階段濾波器”(Second-Order Sections Filter,SOS Filter)。每個二階 IIR 濾波器的傳遞函數可以表示為:

DolphinDB +機器學習,預測地震波形資料

其中 z 是機關圓上的複變量,b0 b1, b2, a1, a2 是濾波器的系數。通過調整這些系數的值,可以實作不同類型的濾波器,例如低通濾波器、高通濾波器和帶通濾波器等。

對于一個帶通濾波器,其通帶範圍可以表示為 [flow, fhigh],其中 flow 和 fhigh 分别表示低頻和高頻截止頻率。在 sosBandpass 算法中,通帶範圍被分成若幹個子段,每個子段對應一個二階 IIR 濾波器。假設通帶範圍被分成了 N 個子段,那麼 sosBandpass 算法可以表示為:

DolphinDB +機器學習,預測地震波形資料

其中 x[n] 表示原始地震資料,y[n] 表示濾波後的地震資料。設 Hi(z) 表示第 i 個二階 IIR 濾波器的輸出,可以表示為:

DolphinDB +機器學習,預測地震波形資料

其中,bi,0 ,bi,1,bi,2,ai,1, ai,2 是第 i 個二階 IIR 濾波器的系數。

3. DolphinDB解決方案

DolphinDB +機器學習,預測地震波形資料
  • 實時流接入:流資料表是 DolphinDB 設計專門用于應對實時流資料存儲與計算的記憶體表。具備吞吐量大,低延遲的優點,支援持久化,支援高可用。
  • 流資料釋出、訂閱與消費:DolphinDB 流資料子產品采用“釋出-訂閱-消費”的模式。流資料首先注入流資料表中,通過流資料表來釋出資料,資料節點或者第三方的應用可以通過 DolphinDB 腳本或 API 來訂閱及消費流資料。
  • FilterPicker 插件:FilterPicker 是一種自動地震信号檢測工具,可以從大量地震資料中自動檢測和識别地震信号。它主要應用于地震預警、地震監測、地震研究等領域。DolphinDB 開發了對應的 FilterPicker 插件,根據該插件,可在 DolphinDB 内調用模闆比對算法,實作對地震波資料的處理,輸出其中的突峭點。
  • RTSeis 插件:RTSeis 是一個基于 Python 的實時地震資料處理包,包括地震資料的讀取、處理、濾波、台陣響應的移除和地震事件檢測等。DolphinDB 開發了對應的 RTSeis 插件,根據該插件,可以對波形資料進行 sosBandPass 濾波處理。
  • TensorFlow 插件:使用者可以使用 TensorFlow 架構将訓練好的模型導出成 .pb 檔案,在 DolphinDB 中通過 TensorFlow 插件調用該模型進行預測。

技術架構圖中,DolphinDB 流資料表接收外部地震計産生的實時流資料,調用 FilterPicker 插件中的模闆比對算法對實時資料進行一級異常檢測,對于異常點,先進行去趨勢、濾波處理,再将結果歸一化,最後調用 TensorFlow 插件進行預測,輸出異常檢測結果。

4. 環境準備

本節主要内容為加載插件、建立流資料表等,是後續波形實時資料模拟、異常檢測的前備工作。

4.1 加載插件

為實作波形資料異常檢測,DolphinDB 開發了對應的插件,見附錄(目前隻提供離線版,插件正式釋出後會提供下載下傳連結)。下載下傳插件,在 plugins 目錄下建立 filterpicker、rtseis、tf 三個檔案夾,将插件解壓到對應的檔案夾裡。

注意:添加環境變量前需關閉 DolphinDB Server。

Linux 添加環境變量:

export LD_LIBRARY_PATH=<YOUR DolphinDB Dir>/server/plugins/rtseis/:$LD_LIBRARY_PATH
export LD_LIBRARY_PATH=<YOUR DolphinDB Dir>/server/plugins/tf/:$LD_LIBRARY_PATH           

啟動 DolphinDB Server,通過 loadPlugin() 函數加載插件

try{ loadPlugin("./plugins/filterpicker/PluginFilterPicker.txt") }catch(ex){print(ex) }
try{ loadPlugin("./plugins/rtseis/PluginRTSeis.txt") }catch(ex){print(ex) }
try{ loadPlugin("./plugins/tf/PluginTf.txt") }catch(ex){print(ex) }           

4.2 流資料表環境清理

清理流資料表之前需要取消訂閱,通過 unsubscribeTable() 函數取消訂閱,然後再通過 dropStream() 函數删除流資料表:

unsubscribeTable(tableName = "dataStream", actionName="filterPickerPredict");go
unsubscribeTable(tableName = "pickerStream",actionName="tensorFlowPredict");go
try{dropStreamTable("dataStream")}catch(ex){print(ex)}
try{dropStreamTable("pickerStream")}catch(ex){print(ex)}
try{dropStreamTable("tensorStream")}catch(ex){print(ex)}           

以 enableTableShareAndPersistence() 函數建立流資料共享表并持久化:

//dataStream,接收波形實時流資料
enableTableShareAndPersistence(table=streamTable(1000000:0, `tagid`ts`data, [INT,TIMESTAMP, INT]), tableName=`dataStream, cacheSize=1000000);
  //pickerStream
enableTableShareAndPersistence(table=streamTable(100000:0, `ts`id, [TIMESTAMP,INT]), tableName=`pickerStream, cacheSize=1000000);
  //tensorStream
enableTableShareAndPersistence(table=streamTable(100000:0, `ts`id, [TIMESTAMP, INT]), tableName=`tensorStream, cacheSize=1000000);           

建立分布式資料庫和次元表:

if(existsDatabase("dfs://seis01")) dropDatabase("dfs://seis01");
  //建立分布式資料庫
create database "dfs://seis01" partitioned by VALUE(1..10), engine='TSDB'
  //建立次元表
create table "dfs://seis01"."tagInfo"(
	xfdsn SYMBOL,
	net SYMBOL,
	sta SYMBOL,
	loc SYMBOL,
	chn SYMBOL,
	id INT[compress="delta"]
)
sortColumns=[`id];
  //向次元表插入資料
net = ["ZJ","YN","XZ","XJ","TJ"]
sta = ["A0001","A0002","A0003","A0004","A0005","A0006","B0001","B0002","B0003","C0001"]
tmp = `EIE`EIN`EIZ
netList = stretch(net,150)
staList = take(stretch(sta,30),150)
locList = take(`40,150)
chn = take(tmp,150)
colt =   array(STRING)
for(i in 0..(chn.size()-1)){
	colt.append!( chn[i].split()[0] + "_" + chn[i].split()[1] + "_" +chn[i].split()[2] )
}
xfdsn = "XFDSN:"+netList+"_"+staList+"_"+locList+"_"+colt
t = table(xfdsn,netList as net,staList as sta,locList as loc,chn,1..150 as id)
loadTable( "dfs://seis01","tagInfo").append!(t);           

5. 方案實作

本章包括以下内容

  • 波形實時資料模拟
  • 一級異常檢測
  • 二級異常檢測

5.1 波形實時資料模拟

實際生産環境中每個通道每10毫秒采集一次資料,每個台站有三個通道。以下代碼模拟了三個台站的波形實時資料:

/*
 * 模拟實時資料
 */
def insertIntoDataStream(){
	do{
		tagidList = 1 2 3 4 5 6 7 8 9
		ts = now()+(0..400)*10
		t = table(stretch(tagidList,3600)as tagid,take(ts,3600) as ts,randNormal(-2500, 1000, 3600) as data)
		objByName(`dataStream).append!(t)
		sleep(3500)
	}while(true)
}
jobId = submitJob("simulate","simulate",insertIntoDataStream);
//通過以下方式取消Job
//cancelJob(jobId)           

5.2 一級異常檢測

通過 subscribe() 函數訂閱實時資料,進行一級異常檢測,處理邏輯及代碼如下所示:

step1:将實時資料按照 tagid 分組

step2:對每一 tagid,調用 filterPicker::createFilterPicker() 函數建立 filter handler

step3:調用 filterPicker::filerPicker(pickerPtr, timeCol, dataCol, [fixedSize]) 函數進行模闆比對,得到突峭點。其中,各個參數含義如下所示:

  • pickerPtr:每一 tagid 對應的 filter handler;
  • timeCol:每一 tagid 對應的時間列;
  • dataCol:每一 tagid 對應的采樣值;
  • fixedSize:批計算資料量大小,若資料不足 fixedSize,會累積到下次湊足 fixedSize 再進行計算

其中,突峭點算法參考 ALomax - FilterPicker - a general purpose, broad-band, phase detector and picker

/*
 *一級異常檢測處理函數
 *計算波形實時資料中的突峭時間點
 *傳回值為突峭時間點和通道id
 */
def pickerHandler(mutable pickerSt, mutable res, vz, msg){
	tags=groups(msg[`tagid]) //按tagid分組
	for (tag in tags.keys()){
		re=res[tag]
		if(re==NULL){
			//建立filter handler
			re= filterPicker::createFilterPicker()
			res[tag]=re
		}
		//将資料傳入filterPicker進行計算
		vt=filterPicker::filterPicker(re,msg[tags[tag]][`ts], msg[tags[tag]][`data].float(), 1000)
		if(vt.size()>0){
			vid=take(tag,vt.size())
			pickerSt.append!(table(vt as ts,vid as id))
		}
	}
}

vz=exec id from loadTable("dfs://seis01","tagInfo") where chn ilike("%z") //隻需要計算Z分量的資料
res=dict(INT, ANY)
//訂閱dataStream,進行一級異常檢測,異常檢測結果輸出到filterStream中
subscribeTable(tableName = "dataStream", actionName="filterPickerPredict", offset=-1,handler=pickerHandler{objByName("pickerStream"), res, vz, }, msgAsTable=true, batchSize = 2000, throttle=1,reconnect=true)           

一級異常檢測結果樣例如下所示:

ts id
2023.04.20T14:37:57.838 1

上述異常時間點前後幾秒内的采樣資料如下:

tagid ts data
1 2023.04.20T14:37:57.797 -3120
1 2023.04.20T14:37:57.807 -3005
1 2023.04.20T14:37:57.818 -521
1 2023.04.20T14:37:57.828 -3886
1 2023.04.20T14:37:57.838 1441
1 2023.04.20T14:37:57.848 1220
1 2023.04.20T14:37:57.858 -974
1 2023.04.20T14:37:57.868 -396
1 2023.04.20T14:37:57.878 -943

從采樣資料來看,“2023.04.20T14:37:57.838”時間點前後振幅明顯變大,确實是一個突峭點。

5.3 二級異常檢測

通過 subscribe() 函數訂閱 pickerStream,進行去趨勢、濾波、歸一化以及二級異常檢測,處理邏輯及代碼如下所示:

step1:調用 tf::load(modelFileName, inputName, outputName, [shape]) 函數導入訓練好的 TensorFlow 模型(本文使用的深度學習模型不友善對外提供,讀者需自行訓練模型并調用)。其中各參數含義如下:

  • modelFileName:模型路徑。如果是 keras 生成的 h5 格式的模型檔案,可用 Keras to TensorFlow 将 h5 格式的模型檔案轉成 TensorFlow 的 pb 檔案後再進行加載。
  • inputName:模型中輸入節點名稱,如果不知道,可用附錄的 printTensorName.py 檔案進行列印,輸出的第一個即為輸入節點名稱。
  • outputName:模型中輸出節點名稱,如果不知道,可用附錄的 printTensorName.py 檔案進行列印,輸出的最後一個即為輸出節點名稱。
  • shape:N 維 (N>=3) 數組的 shape,當 N>=3 時才需要提供,類型為 int 類型的 vector,如[3, 5, 8]表示 shape 為3×5×8。如果提供了 shape,後面用于預測的 data 請使用 flatten(x) 函數轉成的一維向量。

step2:提取突峭點前後兩秒内三分量資料

step3:剔除趨勢成分(本文将趨勢視為線性的,采用最小二乘法估計線性函數的參數)

step4:調用 rtseis::sosBandPass(x, nTaps, f_min, f_max, windowType, fsamp, ripple) 函數對三通道資料進行 sosBandPass 變換。其中,各個參數含義如下所示:

  • x:需要處理的資料
  • nTaps:篩選器參數
  • f_min:最低頻率
  • f_max:最高頻率
  • windowType:視窗類型,0-3分别代表 BUTTERWORTH,BESSEL,CHEBYSHEV1,CHEBYSHEV2
  • fsamp:采樣頻率
  • ripple:windowType 為2或3時的波紋大小

step5:先對濾波資料進行歸一化處理,然後調用 tf::predict(model, data) 函數進行預測,當預測機率 prob_P 或者 prob_s 的機率大于0.9時,認為該段信号很可能是地震波 P 波或者 S 波信号,然後輸出對應的 filterpicker 的突峭點。其中,tf::predict(model, data) 各參數含義如下所示:

  • model:tensorflow 的 model,為 tf::load() 函數的傳回值
  • data:需要預測的資料,僅支援同種資料類型的 matrix、vector、table 或者 dictionary。如果在 tf::load() 時提供了 shape 參數,則 data 必須為使用 flatten(x) 函數轉成的一維向量。
def tensorHandler(mutable tensorSt, mutable infoTable, mutable model, mutable cache, msg){
	if(msg.type()!=0) cache.append!(msg)	
	if(cache.size()==0) return
	currTime = now() - 2000
	outed = select * from cache where timestamp  < currTime
	for(s in outed){
		//擷取同個台站id資訊
		info = select net,sta,loc from infoTable where id == s[`id]
		netInfo = info[`net][0]
		staInfo = info[`sta][0]
		bhzInfo = info[`loc][0]
		bheId = exec id from infoTable where net == netInfo and sta == staInfo and loc == bhzInfo and ilike(chn,"%e")
		bhnId = exec id from infoTable where net == netInfo and sta == staInfo and loc == bhzInfo and ilike(chn,"%n")
		bhzId = exec id from infoTable where net == netInfo and sta == staInfo and loc == bhzInfo and ilike(chn,"%z")
		//擷取三個分量資料
		begT = timestamp(s[`timestamp] - 2000)
		endT = timestamp(s[`timestamp] + 1999)
		ve = exec data from dataStream where tagid ==bheId[0] and ts between(begT:endT)
		vn = exec data from dataStream where tagid ==bhnId[0] and ts between(begT:endT)
		vz = exec data from dataStream where tagid ==bhzId[0] and ts between(begT:endT)
		if(ve.size()==  400 && vn.size() ==  400 && vz.size() ==  400) {
			//去趨勢
			tem = ols(vn, 0..399)
			bhn = vn - (tem[0] + tem[1]*0..399)
			tem = ols(ve, 0..399)
			bhe = ve - (tem[0] + tem[1]*0..399)
			tem = ols(vz, 0..399)
			bhz = vz - (tem[0] + tem[1]*0..399)
			//濾波
			bhn = rtseis::sosBandPass(bhn,4,3.0,20.0,0,100.0,0.0);
			bhe = rtseis::sosBandPass(bhe,4,3.0,20.0,0,100.0,0.0);
			bhz = rtseis::sosBandPass(bhz,4,3.0,20.0,0,100.0,0.0);
			m = matrix(bhn,bhe,bhz)
			//矩陣資料歸一化
			m1 = m / max(max(abs(m)))
			//輸入模型進行預測, ts傳回值為預測結果
			ts = tf::predict(model, float(m1))
			//prob_p 是P波的機率,prob_S是S波機率, prob_N是噪聲的機率。
			prob_P = ts[0]
			prob_S = ts[1]
			prob_N = ts[2]
			if(prob_P > 0.90 || prob_S > 0.90){
				tensorSt.append!(table(s[`timestamp] as timestamp, s[`id] as id))
			}	
		 }
	}
	delete from cache where timestamp  < currTime	
}

  //導入模型
model = tf::load("./model_pol.pb", "conv1d_1_input", "activation_7_1/concat")
infoTable = select * from  loadTable("dfs://seis01","tagInfo")
vz=exec id from loadTable("dfs://seis01","tagInfo") where chn ilike("%z") //隻需要計算Z分量的資料
cache =   table(10:0, `timestamp`id, [TIMESTAMP, INT])
  //訂閱pickerStream,進行二級異常檢測
subscribeTable(tableName = "pickerStream",actionName="tensorFlowPredict", offset=-1, handler=tensorHandler{objByName("tensorStream"),infoTable, model,cache,}, msgAsTable=true, batchSize = 1000, throttle=0.4, timeTrigger=true,reconnect=true)           

對一級異常檢測的結果進行二級異常檢測後,結果樣例如下表所示:

ts id
2023.04.20T14:37:57.838 1

6. 附錄

插件及腳本檔案

  • filterpicker
  • rtseis
  • tf
  • printTensorName.py

以上三個插件及 printTensorName.py 可以在 Earthquake_Prediction_with_DolphinDB_and_Machine_Learning 上下載下傳并解壓便可擷取。

表結構

次元表結構:

字段 DolphinDB 資料類型 含義
xfdsn SYMBOL
net SYMBOL 台網
sta SYMBOL 台站
loc SYMBOL 位置
chn SYMBOL 通道
id INT

dataStream 表結構:

字段名 DolphinDB 資料類型 含義
tagid INT 次元表中的 id,代表一個通道
ts TIMESTAMP 采樣時間
data INT 采樣值

pickerStream 表結構:

字段 DolphinDB 資料類型 含義
ts TIMESTAMP 采樣時間
id INT 次元表中的 id,代表一個通道

tensorStream 表結構:

字段 DolphinDB 資料類型 含義
ts TIMESTAMP 采樣時間
id INT 次元表中的 id,代表一個通道

繼續閱讀