天天看点

Interfacing to kdb+ from Java

http://www.sinv.de/slog/?p=23

Retrieving data from kdb+ into Java is extremely simple.

The java driver for kdb+ resides in one file, c.java, and can be downloaded from www.kx.com/q/c. Don’t be put off when reading what is inside c.java - it is actually very straightforward. Most of the code is to serialize java objects into kdb+ objects, and vice versa. You can stick to the public methods, like the constructor, k, ks and close. In fact you need only be aware of 3 functions to query a server!

Essentially to query a server, one needs to open a connection, send a query, and then receive a response. We’ll go through that here:

public void test() {

c connection= null;

try

{

// open a connection using hostname, port, user and password

connection=new c(”localhost”,5001,”cskelton:xyz”);

// send a blocking query

Object result=connection.k(”fn”,new Integer(10),new Integer(20));

if( result instanceof Integer){

System.out.println( “Result is “+((Integer)result).intValue());

}

else {

System.err.println( “Unexpected result type received”);

}

}

catch( c.KException e){

System.err.println( “Server Reported Error: “+e.toString());

}

catch( java.io.IOException e){

System.err.println( “IOException: ” + e.getMessage());

}

finally{

if( connection != null)

{

try{

connection.close();

}

catch(java.io.IOException e){}

}

}

}

The above code opens a connection to the server, and invokes a function called fn, passing two parameters to it (2 integers, of values 10 and 20). The result from that call is stored in an Object result, which we then interrogate to see what type it is - we expect an Integer object, and if that’s what we receive, we print it and close the connection. On the server side we should create a suitable function to call, something like

fn:{x+y}

c.java only throws 2 types of checked exceptions - c.KException and java.io.IOException. KException is to indicate that the server itself has reported an error, and IOException is used to indicate a problem with the underlying network.

In the event of no errors or exceptions we should receive the Integer 30 as a result. If we cannot connect to the server, we print a message as

IOException: Connection refused

and if no function called fn exists on the server we would see

Server Reported Error: c$KException: fn

There are a few other errors that can happen, e.g. if authentication fails.

The methods k and ks inside c.java are for blocking and non-blocking queries. Typically you would use k(…) for your usual queries that you expect an immediate result from. If you are sending data to a kdb+ process, and do not need a confirmation that it has been processed you could use ks(…) instead.

====

http://www.kx.com/q/d/kdb+.htm

Copyright © kx.com

Abridged Kdb+ Database Manual

Arthur Whitney

1 Summary

Kdb+ is an RDBMS and a general purpose programming language: http://kx.com/q/d/q.htm

* OLTP from 100 thousand to 1 million records per second per cpu.

* OLAP from 1 million to 100 million records per second per cpu.

* Solaris, Linux and Windows. 32bit and 64bit.

trade:([]time:`time$();sym:`symbol$();price:`float$();size:`int$()) / create`trade insert(12:34:56.789;`xx;93.5;300)                            / insertselect sum size by sym from trade                                   / select      

The realtime OLTP+OLAP part of the database is generally in memory with an optional update log. There is no limit to the size of the OLAP historical databases on disk. Typical customers run realtime analysis on 100 million transactions per day and historical analysis on the accumulated billions of records.

2 Install

Unzip kdb+ (and

k4.lic

) in QHOME (default:

q

).

SOL64> unzip s64.zip -d q                (q/s64/q)LIN64> unzip l64.zip -d q                (q/l64/q)WIN32> w32.exe            (WINDIR/q.exe)                

Executable is

q

. Put

QHOME/{s64|l64}

on PATH.

3 Server
>q [f] [-p p] [-s s] [-t t] [-T T] [-o o] [-u u] [-w w] [-r r] [-l] f load script (*.q), file or directory                            -p port kdbc(/jdbc/odbc) http(html xml txt csv)                    -s slaves for parallel execution                                   -t timer milliseconds                                              -T timeout seconds                                                 -o offset hours(from GMT: affects .z.Z)                            -u usr:pwd file (-U allows file operations)                        -w workspace MB limit                                              -r replicate from :host:port                                       -l log updates to filesystem (-L sync)                                   
-z [0] "D"$ uses mm/dd/yyyy or dd/mm/yyyy-P [7] printdigits(0:all)                -W [2] week offset(0:sat)                      

The simplest database is just a process which may be initialized with a script. Each process handles any number of concurrent http and kdbc (jdbc/odbc) clients. For example, start a trade.q database listening on port 5001.

>q trade.q -p 5001      

View the database with a browser: http://localhost:5001

4 Client

Kdb+ returns text (html, txt, csv or xml) to http queries.

browser                                                 http://localhost:5001/?select sum size by sym from trade      
browser/excel                                                http://localhost:5001/q.csv?select sum size by sym from trade      
excel/rtd                          http://www.skelton.de/rtdserver.htm      
excel (data/getexternaldata/newwebquery)                     http://localhost:5001/q.txt?select sum size by sym from trade      
console (q/l32/qcon localhost:5001)             localhost:5001>select sum size by sym from trade      
perl (use LWP::Simple;)                                            get("http://localhost:5001/.txt?select sum size by sym from trade")      

Kdb+ returns data to java, .net, c, jdbc/odbc or q clients. The results are atom, list, dict or table.

Result=k(string | {function,args})      
java (q/c/c.java)                                                                                 import java.sql.*;                                                                                try{c c=new c("localhost",5001);                    // connect                                     Object[]x={new Time(System.currentTimeMillis()%86400000),"xx",new Double(93.5),new Integer(300)}; c.k("insert","trade",x);                           // insert                                      Object r=c.k("select sum size by sym from trade"); // select                                     }catch(Exception e){}                                                                                   
c    (q/c/c.c)                                                          #include"k.h"                                                           int main(){K x;int c=khp("localhost",5001);if(c==-1)return c; // connect x=knk(4,kt(1000*(time(0)%86400)),ks("xx"),kf(93.5),ki(300));            k(-c,"insert",ks("trade"),x,0);                              // insert  x=k(c,"select sum size by sym from trade",0);r0(x);          // select  return 0;}                                                                   
q                                                   c:hopen`:localhost:5001                    / connectc("insert";`trade;("t"$.z.Z;`xx;93.5;300)) / insert c"select sum size by sym from trade"       / select       

Data is faster and more flexible than text. You should get 100,000 single record selects, inserts, updates [and deletes] per second. You can insert or update a million records per second in bulk. Kdb+ is generally faster than the network.

5 Datatypes
t| size literal      q        sql       java      .net     xmlschema ---------------------------------------------------------------------b  1    0b           boolean            Boolean   boolean  boolean   x  1    0x0          byte               Byte      byte     byte      h  2    0h           short    smallint  Short     int16    short     i  4    0            int      int       Integer   int32    int       j  8    0j           long     bigint    Long      int64    long      e  4    0e           real     real      Float     single   single    f  8    0.0          float    float     Double    double   double    c  1    " "          char               Character char               s  .    `            symbol   varchar   String    string   string    m  4    2000.01m     month                                           d  4    2000.01.01   date     date      Date               date      z  8    dateTtime    datetime timestamp Timestamp DateTime dateTime  u  4    00:00        minute                                          v  4    00:00:00     second                                          t  4    00:00:00.000 time     time      Time      TimeSpan time      *  4    `s$`         enum                                                  

All but boolean and byte have nulls. The int, float, char and symbol literal nulls are:

0N 0n " " `

. The rest use type extensions, e.g.

0Nd

. Each type also a list notation, e.g. in the following nested list:

(01b;0x00ff;0 1h;0 1;0 1j;0 1e;0 1.0;"ab";`a`b;2000.01 2000.02m;2000.01.01 2000.01.02;..)      

There must be no spaces in symbol lists, e.g.

`a`b`c

. Symbols can have any non-zero characters (e.g. `$"a-b") but identifiers must be alphanumeric.

6 Insert and Upsert

The q

`t insert ..      

is similar to the sql

insert into t ..      

It will check primary key violations. The q

`t upsert ..  (also t,:..)      

is insert for tables and upsert (update existing and insert new) for keyed tables.

7 Select and Update

q is an extension of sql capabilities. Expressions are short and simple.

select vwap:size wavg price by sym from trade     / volume weighted average priceselect from trade where sym=`IBM, 0<deltas price  / price went up                select sum qty by order.customer.nation from item / rollup of 4-way join               

These all run at several million records per second. In general:

select [a] [by b] from t [where c]update [a] [by b] from t [where c]      

These are similar to (but more powerful than) the sql

select [b,] [a] from t [where c] [group by b order by b]update t set [a] [where c]                                    

In sql the

where

and

group

clauses are atomic and the

select

and

update

clauses are atomic or aggregate if grouping. In q the

where

and

by

clauses are uniform and the

select

and

update

clauses are uniform or aggregate if grouping (

by

). All clauses execute on the columns and therefore q can take advantage of order. Sql can't tell the difference. Sql repeats the

group by

expressions in the

select

and the

where

clause is one boolean expression. The q

where

clause is a cascading list of constraints which nicely obviates some complex sql correlated subqueries and also gets rid of some parentheses. q relational queries are generally half the size of the corresponding sql. Ordered and functional queries do things that are difficult in sql. See http://kx.com/q/e for examples.

i

is a special token that indicates record handle.

8 Default Column Names

Create, select and update will create names from expressions if there is no assignment. The name is last token (first token if operation is +, -, *, %, & or |) in the expression or

x

if this token isn't a valid name, e.g.

select count i,sum qty by order.customer.nation ..      

is short for

select x:count i,qty:sum qty by nation:order.customer.nation ..      
9 Parameterized Queries
String[]x={"IBM","MSFT"};                                          r=c.k("{[s]select last price by sym from trade where sym in s}",x);      
10 Table Arithmetic
t:([x:`a`b]y:2 3)        u:([x:`b`c]y:4 5)        t+u / ([x:`a`b`c]y:2 7 5)      

It is common to want to do arithmetic with tables. Extending arithmetic to tables replaces complicated sql coalescing and outer joins.

11 Foreign Keys

Foreign keys are useful in q, e.g. given

s:([s]name;city;..)                                p:([p]name;city;..)                                sp:([]s:`s$();p:`p$();..) / foreign key definitions      

then

select .. from sp where s.city=p.city / supplier city equal part cityupdate .. from sp where s.city=p.city / try that in sql                    

See http://kx.com/q/e/tpcd.q for more examples.

12 Joins

Joins generally run at 10 to 100 million records per second.

See http://kx.com/q/e/tpcd.txt for an 8-way join

13 Q from SQL

Following is a map of how to do sql things in q. There is no reverse map since q is a much richer language than sql. With respect to CJDate "The SQL Standard"

s)create table s(s varchar(*)primary key,name varchar(*),status int,city varchar(*))q)s:([s:`symbol$()]name:`symbol$();status:`int$();city:`symbol$())                        
s)create table sp(s varchar(*)references s,p varchar(*)references p,qty int)q)sp:([]s:`s$();p:`p$();qty:`int$())                                              
s)insert into s values('s1','smith',20,'london')q)`s insert(`s1;`smith;20;`london)                    
s)update s set status=status+1 where s='s1'q)update status:status+1 from`s where s=`s1      
s)delete from s where s='s1'  q)delete from`s where s=`s1                                 s)select * from s where s='s1'q)select from s where s=`s1         
s)select s,count(*)as x from sp group by s order by sq)select count i by s from sp                              
s)select s,sum(qty)as qty from sp,s,p where sp.s=s.s and sp.p=p.p and p.city=s.city group s order sq)select sum qty by s from sp where s.city=p.city                                                                                                                                                     s)select distinct ..                                                                               q)select distinct ..                                                                                                                                                                                  s)count sum min max avg                                                                            q)count sum min max avg prd first last wavg wsum ..                                                      
s)+ - * / < > = <= >= <> like between-and not and or   q)+ - * % < > = <= >= <> like within      not &   |  ..      
14 Stored Procedures

Server functions and procedures are written in q. Why not java? q is faster, smaller and better than java. For example, a server function to subtract a client table p from a server table P is

f:{[p]P-:p}

15 Rename, Rearrange
xcol  [p]rename    `a`b xcol sp xcols [p]rearrange `p`s xcols sp      
16 Correlated Subquery
(aggr;exp)fby exp      

obviates common correlated subqueries, e.g.

select from sp where qty>(avg;qty)fby p      select from sp where s=`s1,qty>(avg;qty)fby p      

see also http://kx.com/q/e/tpcd.q

17 Performance

Choose layout and optimizations to fit the important queries:

`s sorted `u unique `p parted `g grouped      
don't bother with any on fewer than one million rows.don't bother with `p  on fewer than one billion rows.      

think about the queries. think about disk. e.g. if you want fast access on terabytes by time and customer store it both ways.

18 Layout

Tables are:

size| small medium    large     --------------------------------32bit 1GB   32M rows  unlimited 64bit 10GB  512M rows unlimited store file  directory partitionsquery .01ms .3ms      10ms(seek)      tick            taq             

large are usually sorted and parted so that important queries are one seek.

64bit can generally handle realtime tables around 10 times bigger than 32bit.

64bit has an advantage on medium (and large) because queries on 32bit are dominated by mapping.

19 Small

http://kx.com/q/tick

One day's worth of tick(trades, quotes, orders, ..) is a few GB and fits in RAM.

Queries are around 10 microseconds.

20 Medium
t `s#date,`g#mas, ..      

dayend append (and `g#mas) is fast.

once data is cached in memory

\t select i from t where date=..\t select i from t where mas=..       

Queries are around 1 millisecond.

21 Large

http://kx.com/q/taq

trade `p#symquote `p#sym      

Queries run from disk at 10ms per date/sym/field.

The partition field is

date month year or int

and is virtual.

Kdb+taq is more than 2500 partitions of daily trades and quotes. Each new day is more than 1GB. [Queries on partitioned databases can run in parallel.] To set a subview -- handy for test and development:

.Q.view 2#date / view two first partitions.Q.view[]      / reset                          
22 Limits

Each database runs in memory and/or disk map-on-demand -- possibly partitioned. There is no limit on the size of a partitioned database but on 32-bit systems the main memory OLTP portion of a database is limited to about 1GB of raw data, i.e. 1/4 of the address space. The raw data of a main memory 64bit process should be limited to about 1/2 of available RAM.

23 Partition

A kdb+ historical database is a file system directory. Small tables are simply binary files inside this directory. Medium-sized tables (<100 million rows) are splayed (a file for each column) across a subdirectory. Big tables (billions of rows) are horizontally partitioned (often by date) and then splayed across a directory. An example layout for kdb+taq is:

/data/db/                     sym  /enumeration of symbols mas  /master table           2005.03.15/                   trade/time ..                quote/time ..               2005.03.16/                   trade/time ..                quote/time ..               ..                                

Kdb+ databases run 24*7. After a new date is added a simple reset message is sent to the server. Small tables are in memory. The big table columns are mapped in and out on demand. To avoid swapping RAM should be at least 10* size of the largest column, e.g. if one table in one partition can have 100 million rows there should be 8GB (10*8(byte float)*100,000,000) of RAM. [Address usage is 20* size of the largest column so on 32bit systems tables shouldn't have more than 16.7 million rows.] The number of partitions, tables and columns doesn't matter. Some customers have 1000's of partitions. Some have 1000's of tables. Some have 1000's of columns in the tables.

Tables with many columns are good splayed table candidates because the bulk of these tables will always be out of memory (queries rarely require access to more than a few columns at a time). The performance characteristics of splayed tables are independent of the number of columns in the table. They depend only on the number of columns that must be in memory at any one time, which in turn depends on the specific queries being executed. Columns of Kdb+ splayed tables are stored contiguously on disk and laid out contiguously in real memory when accessed. This organization gives optimal disk access performance. The operating system caches accessed columns in real memory. Consequently, if the real memory hit-rate is high, the performance is real memory performance. (This is often the case because many database applications have a few heavily used queries that reference the same few columns). There can be one or many instances of the database. All instances point to the same data. All processes will reside on the server including a gateway should one be required. The system can be implemented with or without a gateway process. In a system without a gateway, users connect directly to a database instance and send queries, with a gateway the users connect and send queries to the gateway process which allocates the query based on which instance is free. This can be handy for running more than one query at a time but overall throughput can of course suffer to the extent that the queries are competing for disk. But this is a way to let an important user get something quick while some bigger analysis is going on in another instance.

24 Parallel

Partitioned databases can run in parallel. But if we run in parallel it is best to have parallel i/o. Each kdb+ process can easily consume 100MB per second. Therefore the fastest multi-terabyte kdb+ databases use parallel i/o with guaranteed no contention. Customers do this with DAS (direct attached storage). U320 scsi controllers are fast and inexpensive. Customers sometimes use SAN (200MB/sec total) or NAS (80MB/sec total) because the IT departments insist on it. That is ok. But this is the limiting factor. If you use NAS be sure to have at least 1Gbit ethernet.

If you use NAS there is no point in going parallel -- i.e. one kdb+ process can easily consume everything the filesystem throws at it. 4 slaves will saturate SAN. With DAS (direct attached storage) we can have one slave per disk array and ideally at least 2 disk arrays per cpu core.

If all queries access just one partition/date then there is no point in going parallel. If you have DAS and have queries that span more than one partition then the best setup is to have one or more disk arrays per slave process (thus no contention) with approximately 2 slaves processes per cpu. (one is crunching while the other is reading). But be sure to have enough RAM per process so that there is no swapping, e.g. a machine with 4cpu/32GBram/4controllers(2channels each)/8diskarray(14*146GB drives each) can have 8 slave processes on 50 million row (per partition) tables and 4 slave processes on 100 million row (per partition) tables. The o/s independent way to allocate partitions to different disk-arrays is to use a file (par.txt) that lists the different directories, e.g.

/data/db/ sym      mas      par.txt       

where par.txt is

/0/db/1/db/2/db/3/db      

and these directories are:

/0/db                   2005.03.15/trade quote 2005.03.19/trade quote/1/db                   2005.03.16/trade quote 2005.03.20/trade quote..                           

Partitions are round-robin allocated mod N where N is the number of partitions. Round-robin is to get maximum parallelization on date ranges.

25 Logs

Log/commit strategies are none, file (-l) or synch (-L). None is good for test, readonly, readmostly, trusted, duplicated or cache db's. File (100,000 transactions per second) is useful if you trust (or duplicate) the machine. Synch is 100 times per second with SCSI disk and 10,000 times per second with SSD(solid state disk). In any case a single drive can commit more than 40MB of updates per second. A logging database is [script/][state/]log, e.g.

&gt;q d -l / loads d.q(func), d.qdb(data), d.log(updates)                \l      / checkpoint d.qdb and empty d.log (d must not have &quot;.&quot; in it)      

i.e. put code in d.q in directory .u(utilities). all update messages are logged. To send a message from the console use 0, e.g.

0"v+:1"

. An error after a partial update will cause rollback. d can start empty. d, d.q and d.log should all be in the same launch directory. In general only log in production. Keep the database as simple as possible.

26 Load
(types;widths)0:file / text load(types;widths)1:file / data load      

The windows version comes with an odbc loader, e.g.

&gt;q w32/odbc.k                                         q).odbc.load`northwind.mdb / will load entire database      

You should get 10MB/sec -- if the source rdbms and network are up to it. Also,

h:.odbc.open`       t:.odbc.tables h    r:.odbc.eval[h]&quot;...&quot;  .odbc.close h           

$bcp t out t.txt -c

27 Archive

It is trivial to incrementally archive the large parallel databases: simply move the old directories away and send the reset message.

28 User

Servers can control access with a file of usr:pwd's, e.g.

&gt;q f -p 1234 -u ../user.txt      

Clients connect with:

q&gt; c:hopen`:host:1234:usr:pwd         c.java&gt; c c=new k.c(&quot;host&quot;,1234,&quot;usr:pwd&quot;);  c.cs&gt; c c=new k.c(&quot;host&quot;,1234,&quot;usr:pwd&quot;);   c.o&gt; int c=khpu(&quot;host&quot;,1234,&quot;usr:pwd&quot;);       

Even when there is no server -u the q client still sends username. All incoming messages reflect:

.z.w / who-socket.z.a / address   .z.u / user            

Clients can only do file i/o in the server directory.

29 Grid

Kdb+ is a parallel/grid programming system.

&gt;q [f] -s n / will start n slaves threads{..}peach x / parallel execute on x, e.g.      

The overhead is conditional threads - 1 microsecond - so use accordingly. Test overhead with:

\t {x}peach til 1000      

Ideal parallel cpu candidate has high cpu%data, e.g.

{sum exp x?1.0}peach 2#1000000 / should be twice as fast with 2 cpu's      

Peach is also good for going after many different drives at once even if you only have one cpu.

=============================c.java

package kx; //jar cf c.jar kx

private static String e="ISO-8859-1";private static PrintStream out=System.out;

public static void setEncoding(String e)throws UnsupportedEncodingException{c.e=e;out=new PrintStream(System.out,true,e);}

public Socket s;DataInputStream i;OutputStream o;byte[]b,B;int j,J;boolean a,v6;

void io(Socket x)throws IOException{s=x;i=new DataInputStream(s.getInputStream());o=s.getOutputStream();}public void close()throws IOException{s.close();i.close();o.close();}

public c(ServerSocket s)throws IOException{io(s.accept());i.read(b=new byte[99]);o.write(b,0,1);} //c c=new c(new ServerSocket(5010));while(true)c.w(2,c.k());

public c(String h,int p,String u)throws KException,IOException{B=new byte[2+ns(u)];io(new Socket(h,p));J=0;w(u+"\1");o.write(B);if(1!=i.read(B,0,1)){close();B=new byte[1+ns(u)];io(new Socket(h,p));J=0;w(u);o.write(B);if(1!=i.read(B,0,1)){close();throw new KException("access");}}v6=B[0]==1;}

public c(String h,int p)throws KException,IOException{this(h,p,System.getProperty("user.name"));}

public static class Month{public int i;public Month(int x){i=x;}public String toString(){int m=i+24000,y=m/12;return i==ni?"":i2(y/100)+i2(y%100)+"-"+i2(1+m%12);}}

public static class Minute{public int i;public Minute(int x){i=x;}public String toString(){return i==ni?"":i2(i/60)+":"+i2(i%60);}}

public static class Second{public int i;public Second(int x){i=x;}public String toString(){return i==ni?"":new Minute(i/60).toString()+':'+i2(i%60);}}

public static class Timespan{public long j;public Timespan(long x){j=x;}public String toString(){return j==nj?"":j+"";}}

public static class Dict{public Object x;public Object y;public Dict(Object X,Object Y){x=X;y=Y;}}

public static class Flip{public String[]x;public Object[]y;public Flip(Dict X){x=(String[])X.x;y=(Object[])X.y;}public Object at(String s){return y[find(x,s)];}}

public static class KException extends Exception{KException(String s){super(s);}}

private void u(){int n=0,r=0,f=0,s=8,p=s;short i=0;j=0;byte[]dst=new byte[ri()];int d=j;int[]aa=new int[256];while(s<dst.length){if(i==0){f=0xff&(int)b[d++];i=1;}if((f&i)!=0){r=aa[0xff&(int)b[d++]];dst[s++]=dst[r++];dst[s++]=dst[r++];n=0xff&(int)b[d++];for(int m=0;m<n;m++)dst[s+m]=dst[r+m];}else dst[s++]=b[d++];while(p<s-1)aa[(0xff&(int)dst[p])^(0xff&(int)dst[p+1])]=p++;if((f&i)!=0)p=s+=n;i*=2;if(i==256)i=0;}b=dst;j=8;}

void w(byte x){B[J++]=x;}static int ni=Integer.MIN_VALUE;static long nj=Long.MIN_VALUE;static double nf=Double.NaN;

boolean rb(){return 1==b[j++];}void w(boolean x){w((byte)(x?1:0));}  char rc(){return(char)(b[j++]&0xff);}void w(char c){w((byte)c);}

short rh(){int x=b[j++],y=b[j++];return(short)(a?x&0xff|y<<8:x<<8|y&0xff);}                               void w(short h){w((byte)(h>>8));w((byte)h);}

int ri(){int x=rh(),y=rh();return a?x&0xffff|y<<16:x<<16|y&0xffff;}                                       void w(int i){w((short)(i>>16));w((short)i);}

long rj(){int x=ri(),y=ri();return a?x&0xffffffffL|(long)y<<32:(long)x<<32|y&0xffffffffL;}                void w(long j){w((int)(j>>32));w((int)j);}

float re(){return Float.intBitsToFloat(ri());}                                                            void w(float e){w(Float.floatToIntBits(e));}

double rf(){return Double.longBitsToDouble(rj());}                                                        void w(double f){w(Double.doubleToLongBits(f));}

Month rm(){return new Month(ri());}   void w(Month m){w(m.i);} Minute ru(){return new Minute(ri());}      void w(Minute u){w(u.i);}

Second rv(){return new Second(ri());} void w(Second v){w(v.i);}Timespan rn(){return new Timespan(rj());}  void w(Timespan n){if(!v6)throw new RuntimeException("Timespan not valid pre kdb+2.6");w(n.j);}

public java.util.TimeZone tz=java.util.TimeZone.getDefault();

static long k=86400000L*10957,n=1000000000L;long o(long x){return tz.getOffset(x);}long lg(long x){return x+o(x);}long gl(long x){return x-o(x-o(x));}

Date rd(){int i=ri();return new Date(i==ni?nj:gl(k+86400000L*i));}                             void w(Date d){long j=d.getTime();w(j==nj?ni:(int)(lg(j)/86400000-10957));}

Time rt(){int i=ri();return new Time(i==ni?nj:gl(i));}                                         void w(Time t){long j=t.getTime();w(j==nj?ni:(int)(lg(j)%86400000));}

//Timestamp

java.util.Date rz(){double f=rf();return new java.util.Date(Double.isNaN(f)?nj:gl(k+Math.round(8.64e7*f)));} void w(java.util.Date z){long j=z.getTime();w(j==nj?nf:(lg(j)-k)/8.64e7);}

Timestamp rp(){long j=rj(),d=j<0?(j+1)/n-1:j/n;Timestamp p=new Timestamp(j==nj?j:gl(k+1000*d));if(j!=nj)p.setNanos((int)(j-n*d));return p;}

void w(Timestamp p){long j=p.getTime();if(!v6)throw new RuntimeException("Timestamp not valid pre kdb+2.6");w(j==nj?j:1000000*(lg(j)-k)+p.getNanos()%1000000);}

String rs()throws UnsupportedEncodingException{int i=j;for(;b[j++]!=0;);return (i==j-1)?"":new String(b,i,j-1-i,e);}void w(String s)throws UnsupportedEncodingException{int i=0,n=ns(s);byte[]b=s.getBytes(e);for(;i<n;)w(b[i++]);B[J++]=0;}

Object r()throws UnsupportedEncodingException{int i=0,n,t=b[j++];if(t<0)switch(t){case-1:return new Boolean(rb());case-4:return new Byte(b[j++]);case-5:return new Short(rh());

  case-6:return new Integer(ri());case-7:return new Long(rj());case-8:return new Float(re());case-9:return new Double(rf());case-10:return new Character(rc());case-11:return rs();

  case-12:return rp();case-13:return rm();case-14:return rd();case-15:return rz();case-16:return rn();case-17:return ru();case-18:return rv();case-19:return rt();}

 if(t>99){if(t==100){rs();return r();}if(t<104)return b[j++]==0&&t==101?null:"func";if(t>105)r();else for(n=ri();i<n;i++)r();return"func";}

 if(t==99)return new Dict(r(),r());j++;if(t==98)return new Flip((Dict)r());n=ri();switch(t){

  case 0:Object[]L=new Object[n];for(;i<n;i++)L[i]=r();return L;        case 1:boolean[]B=new boolean[n];for(;i<n;i++)B[i]=rb();return B;

  case 4:byte[]G=new byte[n];for(;i<n;i++)G[i]=b[j++];return G;         case 5:short[]H=new short[n];for(;i<n;i++)H[i]=rh();return H;

  case 6:int[]I=new int[n];for(;i<n;i++)I[i]=ri();return I;             case 7:long[]J=new long[n];for(;i<n;i++)J[i]=rj();return J;

  case 8:float[]E=new float[n];for(;i<n;i++)E[i]=re();return E;         case 9:double[]F=new double[n];for(;i<n;i++)F[i]=rf();return F;

 case 10:char[]C=new String(b,j,n,e).toCharArray();j+=n;return C;       case 11:String[]S=new String[n];for(;i<n;i++)S[i]=rs();return S;

 case 12:Timestamp[]P=new Timestamp[n];for(;i<n;i++)P[i]=rp();return P; case 13:Month[]M=new Month[n];for(;i<n;i++)M[i]=rm();return M;

 case 14:Date[]D=new Date[n];for(;i<n;i++)D[i]=rd();return D;           case 15:java.util.Date[]Z=new java.util.Date[n];for(;i<n;i++)Z[i]=rz();return Z;

 case 16:Timespan[]N=new Timespan[n];for(;i<n;i++)N[i]=rn();return N;   case 17:Minute[]U=new Minute[n];for(;i<n;i++)U[i]=ru();return U;

 case 18:Second[]V=new Second[n];for(;i<n;i++)V[i]=rv();return V;       case 19:Time[]T=new Time[n];for(;i<n;i++)T[i]=rt();return T;}return null;}

//object.getClass().isArray()   t(int[]) is .5 isarray is .1 lookup .05

public static int t(Object x){return

 x instanceof Boolean?-1:x instanceof Byte?-4:x instanceof Short?-5:x instanceof Integer?-6:x instanceof Long?-7:x instanceof Float?-8:x instanceof Double?-9:x instanceof Character?-10:x instanceof String?-11:

x instanceof Date?-14:x instanceof Time?-19:x instanceof Timestamp?-12:x instanceof java.util.Date?-15:x instanceof Timespan?-16: x instanceof Month?-13:x instanceof Minute?-17:x instanceof Second?-18:

 x instanceof boolean[]?1:x instanceof byte[]?4:x instanceof short[]?5:x instanceof int[]?6:x instanceof long[]?7:x instanceof float[]?8:x instanceof double[]?9:x instanceof char[]?10:x instanceof String[]?11:

x instanceof Date[]?14:x instanceof Time[]?19:x instanceof Timestamp[]?12:x instanceof java.util.Date[]?15:x instanceof Timespan[]?16:x instanceof Month[]?13:x instanceof Minute[]?17:x instanceof Second[]?18:

 x instanceof Flip?98:x instanceof Dict?99:0;}

static int[]nt={0,1,0,0,1,2,4,8,4,8,1,0,8,4,4,8,8,4,4,4};static int ns(String s)throws UnsupportedEncodingException{int i;if(s==null)return 0;if(-1<(i=s.indexOf('\000')))s=s.substring(0,i);return s.getBytes(e).length;}

public static int n(Object x)throws UnsupportedEncodingException{return x instanceof Dict?n(((Dict)x).x):x instanceof Flip?n(((Flip)x).y[0]):x instanceof char[]?new String((char[])x).getBytes(e).length:Array.getLength(x);}

public int nx(Object x)throws UnsupportedEncodingException{int i=0,n,t=t(x),j;if(t==99)return 1+nx(((Dict)x).x)+nx(((Dict)x).y);if(t==98)return 3+nx(((Flip)x).x)+nx(((Flip)x).y);

 if(t<0)return t==-11?2+ns((String)x):1+nt[-t];j=6;n=n(x);if(t==0||t==11)for(;i<n;++i)j+=t==0?nx(((Object[])x)[i]):1+ns(((String[])x)[i]);else j+=n*nt[t];return j;}

void w(Object x)throws UnsupportedEncodingException{int i=0,n,t=t(x);w((byte)t);if(t<0)switch(t){case-1:w(((Boolean)x).booleanValue());return;

  case-4:w(((Byte)x).byteValue());return;       case-5:w(((Short)x).shortValue());return;

  case-6:w(((Integer)x).intValue());return;     case-7:w(((Long)x).longValue());return;

  case-8:w(((Float)x).floatValue());return;     case-9:w(((Double)x).doubleValue());return;

  case-10:w(((Character)x).charValue());return; case-11:w((String)x);return;

  case-12:w((Timestamp)x);return;               case-13:w((Month)x);return;case-14:w((Date)x);return;

  case-15:w((java.util.Date)x);return;          case-16:w((Timespan)x);return;case-17:w((Minute)x);return;

  case-18:w((Second)x);return;case-19:w((Time)x);return;}

 if(t==99){Dict r=(Dict)x;w(r.x);w(r.y);return;}B[J++]=0;if(t==98){Flip r=(Flip)x;B[J++]=99;w(r.x);w(r.y);return;}

 w(n=n(x));if(t==10){byte[]b=new String((char[])x).getBytes(e);for(;i<b.length;)w(b[i++]);}else for(;i<n;++i)if(t==0)w(((Object[])x)[i]);else if(t==1)w(((boolean[])x)[i]);else if(t==4)w(((byte[])x)[i]);

 else if(t==5)w(((short[])x)[i]);else if(t==6)w(((int[])x)[i]);else if(t==7)w(((long[])x)[i]);

 else if(t==8)w(((float[])x)[i]);else if(t==9)w(((double[])x)[i]);

 else if(t==11)w(((String[])x)[i]);else if(t==12)w(((Timestamp[])x)[i]);else if(t==13)w(((Month[])x)[i]);else if(t==14)w(((Date[])x)[i]);

 else if(t==15)w(((java.util.Date[])x)[i]);else if(t==16)w(((Timespan[])x)[i]);else if(t==17)w(((Minute[])x)[i]);else if(t==18)w(((Second[])x)[i]);

 else w(((Time[])x)[i]);}

void w(int i,Object x)throws IOException{int n=nx(x)+8;synchronized(o){B=new byte[n];B[0]=0;B[1]=(byte)i;J=4;w(n);w(x);o.write(B);}}

public void ks(String s)throws IOException{w(0,cs(s));}public void ks(Object x)throws IOException{w(0,x);} char[]cs(String s){return s.toCharArray();}

public void ks(String s,Object x)throws IOException{Object[]a={cs(s),x};w(0,a);}

public void ks(String s,Object x,Object y)throws IOException{Object[]a={cs(s),x,y};w(0,a);}

public void ks(String s,Object x,Object y,Object z)throws IOException{Object[]a={cs(s),x,y,z};w(0,a);}

public Object k()throws KException,IOException,UnsupportedEncodingException{synchronized(i){i.readFully(b=new byte[8]);a=b[0]==1;boolean c=b[2]==1;j=4;i.readFully(b=new byte[ri()-8]);if(c)u();else j=0;if(b[0]==-128){j=1;throw new KException(rs());}return r();}}

public synchronized Object k(Object x)throws KException,IOException{w(1,x);return k();}

public Object k(String s)throws KException,IOException{return k(cs(s));}

public Object k(String s,Object x)throws KException,IOException{Object[]a={cs(s),x};return k(a);}

public Object k(String s,Object x,Object y)throws KException,IOException{Object[]a={cs(s),x,y};return k(a);}

public Object k(String s,Object x,Object y,Object z)throws KException,IOException{Object[]a={cs(s),x,y,z};return k(a);}

public static Object[]NULL={null,new Boolean(false),null,null,new Byte((byte)0),new Short(Short.MIN_VALUE),new Integer(ni),new Long(nj),new Float(nf),new Double(nf),new Character(' '),"",

 new Timestamp(nj),new Month(ni),new Date(nj),new java.util.Date(nj),new Timespan(nj),new Minute(ni),new Second(ni),new Time(nj)};

public static Object NULL(char c){return NULL[" b  xhijefcspmdznuvt".indexOf(c)];}

public static boolean qn(Object x){int t=-t(x);return t>4&&x.equals(NULL[t]);}

public static Object at(Object x,int i){return qn(x=Array.get(x,i))?null:x;}

public static void set(Object x,int i,Object y){Array.set(x,i,null==y?NULL[t(x)]:y);}

static int find(String[]x,String y){int i=0;for(;i<x.length&&!x[i].equals(y);)++i;return i;}

public static Flip td(Object X)throws java.io.UnsupportedEncodingException{if(X instanceof Flip)return(Flip)X;Dict d=(Dict)X;Flip a=(Flip)d.x,b=(Flip)d.y;int m=n(a.x),n=n(b.x);String[]x=new String[m+n];System.arraycopy(a.x,0,x,0,m);System.arraycopy(b.x,0,x,m,n);Object[]y=new Object[m+n];System.arraycopy(a.y,0,y,0,m);System.arraycopy(b.y,0,y,m,n);return new Flip(new Dict(x,y));}

public static Object O(Object x){out.println(x);return x;}public static void O(int x){out.println(x);}public static void O(boolean x){out.println(x);}public static void O(long x){out.println(x);}public static void O(double x){out.println(x);}

public static long t(){return System.currentTimeMillis();}static long t;public static void tm(){long u=t;t=t();if(u>0)O(t-u);}static String i2(int i){return new DecimalFormat("00").format(i);}

}

//2012.02.09 close() if connect fails

//2012.01.06 read datetime, rz(), was truncating mS rather than rounding

//2010.10.06 block sending timestamp/timespan types to versions prior to kdb+2.6

//2010.05.06 optimized rs() for reading null symbols

//2010.03.20 changed datetime to java.util.Date as it was incompatible with timestamp

//2010.02.01 added unicode support for char vectors and symbol

//2010.01.06 fixed 0Np

//2009.12.07 removed v6 dependencies

//2009.12.02 uncommented at, set and qn

//2009.10.29 u - uncompress, connect retry for v<=2.5

//2009.09.23 Timestamp,Timespan,v6 connect

//2008.08.14 String(,,,"ISO-8859-1") to avoid mutex

//2007.10.18 tz

//2007.08.06 kx

//2007.04.20 sql.{Date|Time|Timestamp}

继续阅读