金融市場中我們經常會遇到切片資料,例如A股的L1行情,我們處理這類資料的思路無外乎對每一個Ticker,按照時間先後的順序,在1Min、5Min甚至日線的次元上上彙總特征,加工因子。考慮到執行效率問題,手工維護這個流程略顯冗繁,本文介紹如何使用Clickhouse資料倉庫來自動化的高效完成這類操作,并以K線合成為例完整走一遍流程。
一、基本概念
1.1 視窗
流資料的計算可以把連續不斷的資料按照一定的規則拆分成大量的片段,在片段内進行聚合統計和計算。常見的拆分方法有:
- 以時間為機關的不重疊片段,例如:每1分鐘、每1小時等,在流式計算裡通常稱為Tumble Window。在量化裡比如1分鐘K線、1小時K線,就是此類型。
- 以時間為機關的重疊片段,例如:前10分鐘、前1小時,每隔一個時間間隔向前滾動,前後兩個視窗有一定的重合部分,通常稱為Hop Window,很明顯,Tumble Window是Hop Window的一個特例。在量化裡,計算滑動均值,就是這類任務的一個代表。
- 以會話為機關的不連續片段,例如每個HTTP通路會話,可能被其他會話打斷,在量化交易中較少涉及,本文不再讨論。
1.2 時間
- • 處理時間Processing Time,指的是進行資料處理操作時,當時的系統時間。
- • 事件時間Event Time,指的是業務發生時間,每一條業務記錄上會攜帶一個時間戳。
- • 提取時間Ingestion Time,指的是資料進入處理系統時,當時的系統時間。
很明顯,量化系統中,處理曆史資料一定會用到事件時間,處理實時資料大部分情況也應用事件時間,少部分情況下可以用處理時間近似代替。本文将預設時間模式為事件時間。
1.3 視圖
ClickHouse支援建立普通視圖(normal view)、物化視圖(materialized view)、實時視圖(live view)和視窗視圖(window view),其中實時視圖和視窗視圖目前還是試驗功能,不能保證穩定性。
- • Normal View:視圖本身并不會存儲任何的資料,它們僅僅隻是讀取了所關聯的表格的查詢結果而已。一個視圖其實儲存的是一個 select查詢的語句,而不是它查詢的結果。
- • Materialized View:物化視圖和普通視圖最大的差別是物化視圖實際存儲了一份資料。使用者查詢的時候和表沒有差別,更像是一張時刻在預計算的表。在建立物化視圖的時候也需要定義存儲引擎。
- • Live View: 實時視圖是一種特殊的視圖,類似于ZooKeeper中的注冊監聽和Redis中的釋出訂閱,能夠将一條SQL查詢結果作為監控目标,當 Live view 變化時可以及時感覺到。
- • Window View:視窗可以按時間視窗聚合資料,類似Flink中的Window,并在視窗結束時輸出結果。它将部分聚合結果(預聚合)存儲在一個内部(或指定的)表中,以減少延遲,并可以将處理結果推送到指定的表或使用WATCH語句查詢推送通知。
通過上面的介紹,我們知道通過視窗視圖和時間函數,Clickhouse也擁有了流式資料處理能力。但視窗視圖處于實驗階段,需要我們手動開啟這項功能,開啟的方式有兩種:
- • 在sql語句中添加一條控制開關: set allow_experimental_window_view = 1
- • 在Clickhouse中增加一個使用者配置:• 建立檔案:nano /etc/clickhouse-server/users.d/allow_experimental_window_functions.xml• 寫入如下配置:
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<allow_experimental_window_view>1</allow_experimental_window_view>
</default>
</profiles>
</yandex>
其中增加使用者配置方案是永久性的,寫入後就預設開啟此功能。
二、設計資料表
2.1 原始tick行情資料加工
通常交易所tick行情提供的字段有:
- • open:開盤價
- • last:最新價
- • high:最高價
- • low:最低價
- • prev_close:昨收價
- • volume:累計成交量
- • total_turnover:累計成交額
- • change_rate:漲跌幅
- • ask_price_1-5:賣出價1-5檔
- • ask_volume_1-5: 賣出量1-5檔
- • ask_price_1-5:賣出價1-5檔
- • ask_volume_1-5: 賣出量1-5檔
實時處理時通常要使用一個全局字典,将累計成交量、累計成交額轉換成切片瞬時成交量和成交金額, 離線處理我們可用SQL進行簡單的轉換。
首先建立一張tick資料表(股票代碼、交易時間、tick價格、tick成交量、漲跌幅):
create table tick.sse50_data
(
ticker String,
trade_time DateTime('Asia/Shanghai'),
tick_price_close Float32,
tick_volume Int32,
close_chg_rate Float32
)
ENGINE = AggregatingMergeTree()
ORDER BY (trade_time, ticker)
然後使用如下SQL進行簡單加工,即通過volume - ifNull(any(volume) OVER (PARTITION BY stock_code ORDER BY trade_time ASC ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), 0) 語句獲得瞬時成交量:
select
stock_code as ticker,
trade_time,
last as tick_price_close,
toInt32(volume - ifNull(any(volume) OVER (PARTITION BY stock_code ORDER BY trade_time ASC ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), 0)) AS tick_volume,
round(100 * change_rate, 3) as close_chg_rate
from tick.sse_50
order by trade_time ASC, ticker
這裡我們可以把資料先存儲到data對象中,後面用來做行情回放,動态寫入tick.sse50_data表中
2.2 設計1分鐘視窗視圖
首先建立一張1分鐘特征表用來存儲加工得到的K線特征(包含1分鐘開盤價、收盤價、最高價、最低價、平均價、價格标準差、峰度等統計量):
create table if not exists tick.factor_m1
(
ticker String,
trade_timestamp DateTime('Asia/Shanghai'),
m1_price_open Float32,
m1_price_close Float32,
m1_price_high Float32,
m1_price_low Float32,
m1_price_avg Float32,
m1_volume Int32,
m1_chg_ptp Float32,
m1_chg_avg Float32,
m1_price_std Float32,
m1_price_skew Float32,
m1_price_kurt Float32
)
ENGINE = AggregatingMergeTree()
ORDER BY (trade_timestamp, ticker)
然後建立我們的主角,視窗函數:
CREATE WINDOW VIEW IF NOT EXISTS stock_m1 TO tick.factor_m1 WATERMARK=INTERVAL '2' SECOND AS
SELECT
ticker,
tumbleStart(trade_time_id) as trade_timestamp,
any(tick_price_close) as m1_price_open,
anyLast(tick_price_close) as m1_price_close,
max(tick_price_close) as m1_price_high,
min(tick_price_close) as m1_price_low,
0.5 * (m1_price_open + m1_price_close) as m1_price_avg,
sum(tick_volume) as m1_volume,
max(close_chg_rate) - min(close_chg_rate) as m1_chg_ptp,
avg(close_chg_rate) as m1_chg_avg,
stddevPop(tick_price_close) as m1_price_std,
skewPop(tick_price_close) as m1_price_skew,
kurtPop(tick_price_close) as m1_price_kurt
FROM tick.sse50_data
GROUP BY tumble(trade_time, INTERVAL '1' MINUTE) as trade_time_id, ticker
ORDER BY trade_time_id, ticker
其中 tumble(trade_time, INTERVAL '1' MINUTE) 表示每1分鐘執行一次。
三、效果測試
3.1 用戶端模拟實時插入
for item in tqdm(data):
db_client.execute("insert into tick.sse50_data values", [item])
3.2 查詢
在另一個控制台上查詢tick.factor_m1表,可以發現資料已經實時寫入特征表中了(K線與看盤軟體有1分鐘偏移,因為這裡時間戳表示該分鐘的起始位置):
通過WATCH語句,在控制台中我們能看到K線的實時生成:
結論
雖然仍處于實驗階段,但Clickhouse的視窗視圖顯示出強大的流式處理能力,我們可以利用其輕松搭建一個tick級的高頻交易系統,自動提取特征入庫,省去手工維護之煩惱。