-
前言
之前已完成了Binary行情的解析,接着便繼續研究FAST(STEP)行情,但花費了将近一個月時間才終于搞定了。前面說過Binary格式的行情不太直覺,是以對于初學者有點難度,接觸FAST後才知道什麼叫“完全看不明白”。還好網際網路是偉大的,大佬們偶爾留下的隻言片語對我們來說就是難得的啟迪了。
一開始我是想人肉解析的,但确實沒看明白,找到的參考資料如下:
GitHub - kuangtu/fixfast: 對于fast協定通過OpenFast進行分析
上述連結的資料非常全,但恕我真的沒看懂,接着推薦另一份資料:
SSE Level 2 Vendor Interface Specification(FAST)-15章-FAST Decoder(FAST解析-第四部分)_wqfhenanxc的部落格-CSDN部落格(這隻是一部分,該部落格還有同系列的其他文章)
兩個結合起來看終于搞明白了“停止位”和“map”,但實際解析還是不對,故放棄轉而開始研究openfast。
事實上關于openfast的資料也非常稀有,而且大都隻是提了幾句完全無法使用,但經過我不懈的努力,還是找到了一些有用的資料。
Openfast下載下傳:
OpenFAST download | SourceForge.net
非常有參考價值的一個示例:
https://blog.csdn.net/qq_33601179/article/details/122407384?spm=1001.2014.3001.5502(此連結也隻是一部分,該部落格還有同系列其他文章)
有了之前binary的經驗以及上述資料的加持,我終于把FAST行情初步給搞出來了,不過截至目前我隻是用openfast解析出了内容,并沒有做深入研究,是以僅供各位看官參考。
另附上官方參考文檔:
《上海證券交易所低延時行情釋出系統(LDDS)接口說明書》
《上海證券交易所LDDS系統Level-1 FAST行情接口說明書》
《IS120_上海證券交易所行情網關STEP資料接口規範》
-
FAST接口
要搞明白FAST就得先搞清楚fix、step,參考資料如下:
國内交易所協定FIX STEP FAST Binary_wqfhenanxc的部落格-CSDN部落格_binary協定
簡單來說,FIX就是KEY=VALUE這樣的一對資料,中間用一個特殊符号分割開,解析起來比較容易而且新增、修改接口也比較簡單。但缺點是過于冗長了,傳輸金融行情資料占用很大的帶寬,不是很科學。
STEP跟FIX看起來差不多,隻是做了一些本地化,沒詳細研究也不知道做了啥修改,不多說。
由于FIX、STEP都比較冗長,是以又整出了FAST。首先FAST也是源于KEY=VALUE這樣的格式,然後在此基礎上做了壓縮,是以所謂的FAST解析就是把它還原成KEY=VALUE這樣。FAST壓縮的大緻方法如下:
- 取消了Key,使用模闆(temple)來約定各字段的含義
- 使用二進制而不是字元串的形式來傳輸
- 在二進制的基礎上使用了非常特别(而且看不懂)的壓縮方法,進一步做了壓縮。
而上交所在發FAST行情時,還給他套了個STEP的殼,而且是雙重殼,見下圖:
STEP的内容直接輸出成字元串即可,可讀性很強,但FAST的内容全都是亂碼了,因為它即是二進制又做了壓縮。雖然FAST協定本身很難看懂,但借助openfast我們可以很容易解析出資料,那麼現在的問題就是如何擷取FAST的内容。
-
登入
與binary類似,隻需要按指定的格式發資訊到VDE即可,這一塊參考官方文檔很容易就接入了。其中“\u0001”就是SOH這個分隔符了。
-
STEP解析
你把從接口擷取到的内容直接字元串輸出,可以看到STEP部分很好處理,而FAST部分全是亂碼,是以要把FAST部分完整的截取出來用openfast來解析。如果你用SOH來分割還不行,因為FAST裡有很多“00000001”這樣的資料,是以你得先解析出FAST的長度,然後根據長度去讀byte數組,讀剛好那麼長的byte數組出來,直接怼到openfast裡解析就行了。
這裡我偷懶直接通過數SOH找到FAST的長度,也就是tag95,注意因為套娃的關系,是第二個tag95的值。
-
FAST解析
前面把FAST的内容讀出來了,解析的話就是用openfast即可,具體openfast的使用好似也比較多樣,我這裡就參考前面博文随便寫了下,沒有深入研究。
輸出結果如下:
由于中文還得特别處理下,是以這裡顯示的是亂碼,另外我為了簡便都是以string格式讀的,按說不太合适,大家自行研究吧,可以看openfast的文檔看都有啥函數。
完整代碼如下:
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.OutputStream;
import java.io.*;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.openfast.template.MessageTemplate;
import org.openfast.template.loader.XMLMessageTemplateLoader;
import org.openfast.Context;
import org.openfast.codec.FastDecoder;
import org.openfast.Message;
public class connSHFAST2{
public static String getCheckSum(String str){
byte[] strBytes = str.getBytes();
int sum = 0;
for(byte b : strBytes){
int bInt = b&0xff;
sum += bInt;
sum = sum&0xff;
}
String sumStr = sum + "";
if(sumStr.length()==1){
sumStr = "00"+sumStr;
}else if(sumStr.length()==2){
sumStr = "0"+sumStr;
}
return sumStr;
}
/**
* 把byte轉化成2進制字元串
* @param b
* @return
*/
public static String getBinaryStrFromByte(byte b){
String result ="";
byte a = b; ;
for (int i = 0; i < 8; i++){
byte c=a;
a=(byte)(a>>1);//每移一位如同将10進制數除以2并去掉餘數。
a=(byte)(a<<1);
if(a==c){
result="0"+result;
}else{
result="1"+result;
}
a=(byte)(a>>1);
}
return result;
}
static class ReadThread extends Thread{
InputStream readStream;
public ReadThread(InputStream readStream){
this.readStream = readStream;
}
@Override
public void run(){
try{
System.out.println("thread run...");
while(true){
byte[] buffer = new byte[102400];
int len = readStream.read(buffer,0,102400);
//讀到的資料太少
if(len<28){
System.out.println("len<28");
Thread.sleep(1000);
continue;
}
//讀到的資料太多
if(len>=102400){
System.out.println("len>=102400");
continue;
}
//截取有效部分
byte[] step = Arrays.copyOf(buffer,len);
//逐一讀取byte,數SOH,數到第27個
int count = 0;
int start = 0;
len = 0;
for(int i=0; i<step.length; i++){
byte b = step[i];
int bInt = b & 0xFF;
if(bInt==1){
count = count + 1;
if(count == 1){
len = i - start;
byte[] BeginString = new byte[len];
System.arraycopy(step,start,BeginString,0,len);
System.out.println("BeginString="+new String(BeginString));
start = i+1;
}
if(count == 2){
len = i - start;
byte[] BodyLength = new byte[len];
System.arraycopy(step,start,BodyLength,0,len);
System.out.println("BodyLength="+new String(BodyLength));
start = i+1;
}
if(count == 3){
len = i - start;
byte[] MsgType = new byte[len];
System.arraycopy(step,start,MsgType,0,len);
System.out.println("MsgType="+new String(MsgType));
start = i+1;
if(!"35=UA5302".equals(new String(MsgType))){
System.out.println("MsgType!=UA5302");
break;
}
}
if(count == 26){
start = i+1;
}
if(count==27){
len = i - start;
byte[] Tag95 = new byte[len];
System.arraycopy(step,start,Tag95,0,len);
System.out.println("Tag95="+new String(Tag95));
byte[] FastLenBG = new byte[len-3];
System.arraycopy(Tag95,3,FastLenBG,0,len-3);
int FastLen = Integer.parseInt(new String(FastLenBG));
System.out.println("FastLen="+FastLen);
start = i+1;
//讀取FAST内容
byte[] FAST = new byte[FastLen];
System.arraycopy(step,start+3,FAST,0,FastLen);
//System.out.println("FAST="+new String(FAST));
//解析FATS内容
InputStream inputStream = new FileInputStream("template.xml");
MessageTemplate template = new XMLMessageTemplateLoader().load(inputStream)[0];
Context context = new Context();
context.registerTemplate(4001, template);
InputStream sbs = new ByteArrayInputStream(FAST);
FastDecoder fastDecoder = new FastDecoder(context, sbs);
Message msg111 = fastDecoder.readMessage();
System.out.println("MDStreamID="+msg111.getString("MDStreamID"));
System.out.println("SecurityID="+msg111.getString("SecurityID"));
System.out.println("Symbol="+msg111.getString("Symbol"));
System.out.println("NumTrades="+msg111.getString("NumTrades"));
System.out.println("TradeVolume="+msg111.getString("TradeVolume"));
System.out.println("TotalValueTraded="+msg111.getString("TotalValueTraded"));
System.out.println("PrevClosePx="+msg111.getString("PrevClosePx"));
System.out.println("PrevSetPx="+msg111.getString("PrevSetPx"));
System.out.println("TotalLongPosition="+msg111.getString("TotalLongPosition"));
}
}
}
}
}catch(Exception e){
System.out.println(e.getMessage());
}
}
}
public static void main(String[] args){
try{
//head
String BeginString = "8=STEP.1.0.0\u0001";
String BodyLength = "9=56\u0001";//除了 8、9、10 域,所 有其他域長度總和
String MsgType = "35=A\u0001";
String SenderCompID = "49=VSS\u0001";
String TargetCompID = "56=VDE\u0001";
String MsgSeqNum = "34=0\u0001";
String SendingTime = "52=20220916-11:00:00\u0001";
//body
String EncryptMethod = "98=0\u0001";
String HeartBtInt = "108=0\u0001";
//check body,用于做校驗和的
String CheckBody = BeginString+BodyLength+MsgType+SenderCompID
+TargetCompID+MsgSeqNum+SendingTime
+EncryptMethod+HeartBtInt;
//checksum 除了10域本身,對所有其他域的每個位元組累加後取256的餘數。餘數不足3位的,前補0
String CheckSum = "10="+getCheckSum(CheckBody)+"\u0001";
String MsgBody = CheckBody + CheckSum;
//connect
Socket socket = new Socket("你的IP",端口);
OutputStream outStream = socket.getOutputStream();
outStream.write(MsgBody.getBytes());
outStream.flush();
//listion thread
ReadThread readThread = new ReadThread(socket.getInputStream());
readThread.start();
while(true){
Thread.sleep(3000);
System.out.println("every 3000...");
}
}catch(Exception e){
System.out.println(e.getMessage());
}
}
}