文章目錄
- 一、 一次讀一個檔案,檔案是單通道的windows檔案
-
- (一)目标檔案資訊:
- (二)kuiper處理原理
- (三)實操流程
-
- 1. 解析wav
- 2. sources
- 3. function
- 4. sinks
- 5. kuiper-stream
- 6. kuiper-rule
- (四)結果:
- 二、一個檔案分多次讀, 一次讀1024個資料,檔案是單通道的windews檔案
-
- (一)和上一個實驗的差別
-
- source
- (二)結果
- 三、反思
- 四、讀取demo檔案,一次讀取整個檔案
-
- (一)demo檔案解析
-
- 1. demo檔案包含的資訊
- 2. 使用go程式解析demo
- 解決了的問題
-
- kuiper server報錯:
-
- 解決方法
- sink判斷輸入類型
- 還要解決的問題
一、 一次讀一個檔案,檔案是單通道的windows檔案
(一)目标檔案資訊:
一秒左右的标準wav檔案,資訊如下:
File size : 84744 bytes
Canonical format : true
Audio format : 1
Number of channels: 1
Sampling rate : 44100 Hz
Sample size : 16 bits
Number of samples : 42364
Sound size : 84728 bytes
Sound duration : 0s
(二)kuiper處理原理
source:讀取wav檔案,讀到傳回檔案數組,并開始讀取下一個檔案。沒讀取到就等3秒再讀
function:對接收到的數組fft,每接收一個就fft一次,傳回fft後的數組
sink:得到一個數組就以json格式追加寫入到目标檔案中,這裡是追加,而沒有把每個數組分别存儲到不同的檔案,後面也可以改成這樣
總結:讀取wav,fft,寫入txt
(三)實操流程
1. 解析wav
安裝了一個可以讀取wav檔案的庫,修改以後可以讀取标準wav檔案,傳回檔案本身的資訊和檔案包含的資訊。前者包括通道數,bits等,後者是一系列的數組。
2. sources
source讀取wav檔案,讀到傳回檔案數組,并開始讀取下一個檔案。沒讀取到就等3秒再讀
核心代碼
go func(exeCtx api.StreamContext) {
for{
select{
case s.filepath = <-c:
fmt.Println(s.filepath)
row_arr:=Main1(s.filepath)
println("len(row_arr)=",len(row_arr))
for i:=0;i<len(row_arr);i++{//把讀取到的數組傳回,每次傳回fft_len
ret:=row_arr[i]
s.pattern["value"]=ret
consumer<-api.NewDefaultSourceTuple(s.pattern,nil)
}
case <-exeCtx.Done():
return
}
}
}(exeCtx)
go func() {
//此線程不斷查詢新的檔案,有就把檔案名傳入通道c,并查詢下一個檔案,沒有就等待3s
id:=0
filename := getfilepath(id)
for{
_, err := os.Stat(filename)
if(err==nil){//如果存在目标檔案,即未讀取的檔案
time.Sleep(3000)
c<-filename
id++
filename = getfilepath(id)
fmt.Println(filename)
}else{
time.Sleep(3000)//等待3秒
//fmt.Println("waiting file")
}
}
}()
3. function
function簡單,主要是fft
核心代碼
func (f *row2fft) Exec(args []interface{}) (interface{}, bool) {
m :=args[0].([]float64)
a:=fftabs(m)
fmt.Printf("變換前第3個%v,變換後第3個%v\n",m[2],a[2])
return a,true
}
4. sinks
把傳來的[]byte ,以json格式追加寫入txt檔案。
核心代碼:
func (m *wavSink) Collect(ctx api.StreamContext, item interface{}) error {
logger := ctx.GetLogger()
if v, ok := item.([]byte); ok {
var str_content = string(v)
fd_content:=strings.Join([]string{str_content,"\n"},"")
buf:=[]byte(fd_content)
m.fd.Write(buf)
println("received")
println(len(fd_content))
logger.Debug("wavSink sink receive data")
} else {
logger.Debug("wavSink sink receive non byte data")
println("item isnot []byte")
}
println("collect func finished")
return nil
}
5. kuiper-stream
bin/cli create stream streamReadWav '(value array(float)) with (datasource="topicReadWav",TYPE="readWav")'
//流名稱 (sources中map的鍵:value 類型) with (主題, sources插件名)
6. kuiper-rule
{
"id": "ruleWavSink",
"sql": "SELECT row2fft(value) from streamReadWav",
"actions": [
{
"log":{},
"wavSink": {}
}
]
}
(四)結果:
成功,對比使用go語言直接讀取并fft得到的數組,一樣。
得到的txt數組大小 800k。主要是浮點數比較長,存儲起來比較費空間。
二、一個檔案分多次讀, 一次讀1024個資料,檔案是單通道的windews檔案
(一)和上一個實驗的差別
function相同,sink沒有,直接查詢的,是以可以認為這兩個都相同,隻有source不同
source
隻有下面的地方不同
go func(exeCtx api.StreamContext) {
for{
select{
case s.filepath = <-c:
num :=1
fmt.Println(s.filepath)
row_arr:=Main1(s.filepath)
println("len(row_arr)=",len(row_arr))
ret_arr1:=make([]interface{},s.fft_len)
for i:=0;i<len(row_arr)-s.fft_len;i=i+s.fft_len{//把讀取到的數組傳回,每次傳回fft_len
for j:=0;j<s.fft_len;j++{
ret_arr1[j]=row_arr[i+j]
}
s.pattern["value"]=ret_arr1
fmt.Printf("source第%v次傳回的數組第3個數字%v\n",num,ret_arr1[2])
num++
//等待小數組放進去
consumer<-api.NewDefaultSourceTuple(s.pattern,nil)
}
case <-exeCtx.Done():
return
}
}
}(exeCtx)
想要source讀取一個檔案,得到一個大數組,把大數組切割,每1024位切割出來,傳回到通道;
同時function每接收到一個數組,就fft,在kuiper查詢處理結果。
(二)結果
發生了錯誤如下:
-
source每讀取1024個位元組就傳給consumer通道
source的源碼為:
。。。
fmt.Printf("source第%v次傳回的數組第3個數字%v\n",num,ret_arr1[2])
consumer<-api.NewDefaultSourceTuple(s.pattern,nil)
-
function每接收到一個數組就進行fft:
源碼為
func (f *row2fft) Exec(args []interface{}) (interface{}, bool) {
m :=args[0].([]float64)
a:=fftabs(m)
fmt.Printf("變換前第3個%v,變換後第3個%v\n",m[2],a[2])
return a,true
}
-
結果為:
檔案長度42364,每次1024,舍棄後面的部分,可以得到41個長1024的數組。
souece源碼中
fmt.Printf("source第%v次傳回的數組第3個數字%v\n",num,ret_arr1[2])
語句按順序執行了,依次列印輸出0,-6,29.。。
和預期的相同,說明前面部分沒有錯誤。
function列印出的順序有誤,而且有的數組重複列印了,有的沒列印,多次運作列印的東西還不相同;不過總共列印次數居然和預期一樣是41,說明剛好接收到了41個數組。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL90zdNpXSURmNk1mY2hnMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL5MTO0MzMwcTM2ATNwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
三、反思
雖然source一次讀取整個一個文檔并傳回可以成功,但是source一次傳回1024個就會導緻function亂序。不知道是什麼原因。
我不由得反思,source一次讀取一個文檔并傳回,如果文檔較多,比如幾十個,function處理的順序會變嗎,順序變化還影響不大,假如也出現 有的處理了多次,有的不處理,那就很麻煩了。
四、讀取demo檔案,一次讀取整個檔案
(一)demo檔案解析
1. demo檔案包含的資訊
和普通wav檔案類似,但是demo和普通的wav檔案有些不同:
- 一是通道,demo是3通道,xlsx檔案寫的是2-5通道,不知道實際應用中是固定3通道還是2-5通道
- 二是demo包含一些新的資訊,我找的wav解析包解析不出來這些資訊,如果這些資訊重要,還得自己寫解析函數,如果不重要或者說所有的wav檔案資訊相同,就沒必要解析了,直接用notepad打開wav檔案可以看到這些資訊
這些資訊包括:
44-299 256位元組的 UTF-8 裝置名稱
300(13CH)-307 8位元組 UINT64 時間戳
308-371 64位元組的 UTF-8通道名稱1:VIB1_1
372(174H)-435 64位元組的 UTF-8 單元名稱1: m/s^2
436(1B4)-439 浮點放大1 :@u<== 00 40 75 3C ==1,014,317,056
440-443 浮動偏移1 : 0
2. 使用go程式解析demo
使用go程式,對整個demo檔案解析,選取其中的一個通道,讀取到的資料如下:
這個圖橫坐标太多了,多以全紅。繪制通道1、原信号的前1024位
進行fft變換,得到的結果如下。
感覺fft變換後的圖檔中沒有包含很多資訊,而且比較亂,大概在4 700 000HZ、6 000 000HZ、 500 000HZ頻率下有較多資料。
如果隻選擇1024位進行fft,結果如下:
感覺資訊更少
另外兩個通道的圖類似。
解決了的問題
kuiper server報錯:
因為source的open方法需要傳回一個[]interface {}的接口類型,而我傳回的是[]float64,是以報錯。
解決方法
建立一個[]interface {},把[]float64中的元素一一指派給[]interface {}的元素。
sink判斷輸入類型
雖然function傳回流 []float64,但是sink斷言輸入類型位 []float64 時會報錯,要改為斷言為 []byte。然後用string(bu),轉為字元串。
但是怎麼由[]byte轉為數組?不知道
還要解決的問題
難題
- 為什麼function隻能一次接收一個檔案的數組,而一次接收1024個資料就會亂序,遺漏和重複
- 這樣如果wav檔案很多,function處理起來會不會也亂序,遺漏和重複呢
待解決
- sink可以寫入不同的檔案。
- 嘗試一次讀取多個檔案(意義不大)
- function一次處理1024個資料
- 3通道/2-5通道 時如何處理
- 如果新區資訊是必要的,怎麼獲得
擴充:
- fft是否精度有問題(不會)
- 是否需要相頻,時域虛數等資訊
總之,三個插件,function基本不用改了,另外兩個可以修改。當然,多通道時可能也要改function