天天看點

如何使用Tunnel SDK上傳/下載下傳MaxCompute複雜類型資料複雜資料類型複雜類型構造與操作函數Tunnel SDK 介紹基于Tunnel SDK構造複雜類型資料從MaxCompute下載下傳複雜類型資料運作執行個體

基于Tunnel SDK如何上傳複雜類型資料到MaxCompute?首先介紹一下MaxCompute複雜資料類型:

複雜資料類型

MaxCompute采用基于ODPS2.0的SQL引擎,豐富了對複雜資料類型類型的支援。MaxCompute支援ARRAY, MAP, STRUCT類型,并且可以任意嵌套使用并提供了配套的内建函數。

類型 定義方法 構造方法
ARRAY array;array> array(1, 2, 3); array(array(1, 2); array(3, 4))
MAP map;map> map(“k1”, “v1”, “k2”, “v2”);map(1S, array(‘a’, ‘b’), 2S, array(‘x’, ‘y))
STRUCT struct;struct< field1:bigint, field2:array, field3:map> named_struct(‘x’, 1, ‘y’, 2);named_struct(‘field1’, 100L, ‘field2’, array(1, 2), ‘field3’, map(1, 100, 2, 200)

複雜類型構造與操作函數

傳回類型 簽名 注釋
map(K key1, V value1, K key2, V value2, ...) 使用給定key/value對建立map, 所有key類型一緻,必須是基本類型,所有value類型一緻,可為任意類型
map_keys(Map m) 将參數中的map的所有key作為數組傳回,輸入NULL,傳回NULL
map_values(MAP m) 将參數中的map的所有value作為數組傳回,輸入NULL,傳回NULL
int size(MAP) 取得給定MAP元素數目
TABLE explode(MAP) 表生成函數,将給定MAP展開,每個key/value一行,每行兩列分别對應key和value
array(T value1, T value2, ...) 使用給定value構造ARRAY,所有value類型一緻
size(ARRAY) 取得給定ARRAY元素數目
boolean array_contains(ARRAY a, value v) 檢測給定ARRAY a中是否包含v
sort_array(ARRAY) 對給定數組排序
collect_list(T col) 聚合函數,在給定group内,将col指定的表達式聚合為一個數組
collect_set(T col) 聚合函數,在給定group内,将col指定的表達式聚合為一個無重複元素的集合數組
explode(ARRAY) 表生成函數,将給定ARRAY展開,每個value一行,每行一列對應相應數組元素
TABLE (int, T) posexplode(ARRAY) 表生成函數,将給定ARRAY展開,每個value一行,每行兩列分别對應數組從0開始的下标和數組元素
struct(T1 value1, T2 value2, ...) 使用給定value清單建立struct, 各value可為任意類型,生成struct的field的名稱依次為col1, col2, ...
named_struct(name1, value1, name2, value2, ...) 使用給定name/value清單建立struct, 各value可為任意類型,生成struct的field的名稱依次為name1, name2, ...
TABLE (f1 T1, f2 T2, ...) inline(ARRAY>) 表生成函數,将給定struct數組展開,每個元素對應一行,每行每個struct元素對應一列

Tunnel SDK 介紹

Tunnel 是 ODPS 的資料通道,使用者可以通過 Tunnel 向 ODPS 中上傳或者下載下傳資料。

TableTunnel 是通路 ODPS Tunnel 服務的入口類,僅支援表資料(非視圖)的上傳和下載下傳。

對一張表或 partition 上傳下載下傳的過程,稱為一個session。session 由一或多個到 Tunnel RESTful API 的 HTTP Request 組成。

session 用 session ID 來辨別,session 的逾時時間是24小時,如果大批量資料傳輸導緻超過24小時,需要自行拆分成多個 session。

資料的上傳和下載下傳分别由 

TableTunnel.UploadSession

 和 

TableTunnel.DownloadSession

 這兩個會話來負責。

TableTunnel 提供建立 UploadSession 對象和 DownloadSession 對象的方法.

  • 典型表資料上傳流程: 

    1) 建立 TableTunnel

    2) 建立 UploadSession

    3) 建立 RecordWriter,寫入 Record

    4)送出上傳操作

  • 典型表資料下載下傳流程:

    2) 建立 DownloadSession

    3) 建立 RecordReader,讀取 Record

基于Tunnel SDK構造複雜類型資料

代碼示例:

RecordWriter recordWriter = uploadSession.openRecordWriter(0);
      ArrayRecord record = (ArrayRecord) uploadSession.newRecord();

      // prepare data
      List arrayData = Arrays.asList(1, 2, 3);
      Map<String, Long> mapData = new HashMap<String, Long>();
      mapData.put("a", 1L);
      mapData.put("c", 2L);

      List<Object> structData = new ArrayList<Object>();
      structData.add("Lily");
      structData.add(18);

      // set data to record
      record.setArray(0, arrayData);
      record.setMap(1, mapData);
      record.setStruct(2, new SimpleStruct((StructTypeInfo) schema.getColumn(2).getTypeInfo(),
                                           structData));

      // write the record
      recordWriter.write(record);           

從MaxCompute下載下傳複雜類型資料

RecordReader recordReader = downloadSession.openRecordReader(0, 1);

      // read the record
      ArrayRecord record1 = (ArrayRecord)recordReader.read();

      // get array field data
      List field0 = record1.getArray(0);
      List<Long> longField0 = record1.getArray(Long.class, 0);

      // get map field data
      Map field1 = record1.getMap(1);
      Map<String, Long> typedField1 = record1.getMap(String.class, Long.class, 1);

      // get struct field data
      Struct field2 = record1.getStruct(2);           

運作執行個體

完整代碼如下:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.data.SimpleStruct;
import com.aliyun.odps.data.Struct;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.type.StructTypeInfo;

public class TunnelComplexTypeSample {

  private static String accessId = "<your access id>";
  private static String accessKey = "<your access Key>";
  private static String odpsUrl = "<your odps endpoint>";
  private static String project = "<your project>";

  private static String table = "<your table name>";

  // partitions of a partitioned table, eg: "pt=\'1\',ds=\'2\'"
  // if the table is not a partitioned table, do not need it
  private static String partition = "<your partition spec>";

  public static void main(String args[]) {
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setEndpoint(odpsUrl);
    odps.setDefaultProject(project);

    try {
      TableTunnel tunnel = new TableTunnel(odps);
      PartitionSpec partitionSpec = new PartitionSpec(partition);

      // ---------- Upload Data ---------------
      // create upload session for table
      // the table schema is {"col0": ARRAY<BIGINT>, "col1": MAP<STRING, BIGINT>, "col2": STRUCT<name:STRING,age:BIGINT>}
      UploadSession uploadSession = tunnel.createUploadSession(project, table, partitionSpec);
      // get table schema
      TableSchema schema = uploadSession.getSchema();

      // open record writer
      RecordWriter recordWriter = uploadSession.openRecordWriter(0);
      ArrayRecord record = (ArrayRecord) uploadSession.newRecord();

      // prepare data
      List arrayData = Arrays.asList(1, 2, 3);
      Map<String, Long> mapData = new HashMap<String, Long>();
      mapData.put("a", 1L);
      mapData.put("c", 2L);

      List<Object> structData = new ArrayList<Object>();
      structData.add("Lily");
      structData.add(18);

      // set data to record
      record.setArray(0, arrayData);
      record.setMap(1, mapData);
      record.setStruct(2, new SimpleStruct((StructTypeInfo) schema.getColumn(2).getTypeInfo(),
                                           structData));

      // write the record
      recordWriter.write(record);

      // close writer
      recordWriter.close();

      // commit uploadSession, the upload finish
      uploadSession.commit(new Long[]{0L});
      System.out.println("upload success!");

      // ---------- Download Data ---------------
      // create download session for table
      // the table schema is {"col0": ARRAY<BIGINT>, "col1": MAP<STRING, BIGINT>, "col2": STRUCT<name:STRING,age:BIGINT>}
      DownloadSession downloadSession = tunnel.createDownloadSession(project, table, partitionSpec);
      schema = downloadSession.getSchema();

      // open record reader, read one record here for example
      RecordReader recordReader = downloadSession.openRecordReader(0, 1);

      // read the record
      ArrayRecord record1 = (ArrayRecord)recordReader.read();

      // get array field data
      List field0 = record1.getArray(0);
      List<Long> longField0 = record1.getArray(Long.class, 0);

      // get map field data
      Map field1 = record1.getMap(1);
      Map<String, Long> typedField1 = record1.getMap(String.class, Long.class, 1);

      // get struct field data
      Struct field2 = record1.getStruct(2);

      System.out.println("download success!");
    } catch (TunnelException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    }

  }
}           
如何使用Tunnel SDK上傳/下載下傳MaxCompute複雜類型資料複雜資料類型複雜類型構造與操作函數Tunnel SDK 介紹基于Tunnel SDK構造複雜類型資料從MaxCompute下載下傳複雜類型資料運作執行個體