千家信息网

如何使用Tunnel SDK上传和下载MaxCompute复杂类型数据

发表于:2025-02-08 作者:千家信息网编辑
千家信息网最后更新 2025年02月08日,如何使用Tunnel SDK上传和下载MaxCompute复杂类型数据,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。基于Tunnel S
千家信息网最后更新 2025年02月08日如何使用Tunnel SDK上传和下载MaxCompute复杂类型数据

如何使用Tunnel SDK上传和下载MaxCompute复杂类型数据,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

基于Tunnel SDK如何上传复杂类型数据到MaxCompute?首先介绍一下MaxCompute复杂数据类型:

复杂数据类型

MaxCompute采用基于ODPS2.0的SQL引擎,丰富了对复杂数据类型类型的支持。MaxCompute支持ARRAY, MAP, STRUCT类型,并且可以任意嵌套使用并提供了配套的内建函数。

类型定义方法构造方法
ARRAYarray;array>array(1, 2, 3); array(array(1, 2); array(3, 4))
MAPmap;map>map("k1", "v1", "k2", "v2");map(1S, array('a', 'b'), 2S, array('x', 'y))
STRUCTstruct;struct< field1:bigint, field2:array, field3:map>named_struct('x', 1, 'y', 2);named_struct('field1', 100L, 'field2', array(1, 2), 'field3', map(1, 100, 2, 200)

复杂类型构造与操作函数

返回类型签名注释
MAPmap(K key1, V value1, K key2, V value2, ...)使用给定key/value对建立map, 所有key类型一致,必须是基本类型,所有value类型一致,可为任意类型
ARRAYmap_keys(Map m)将参数中的map的所有key作为数组返回,输入NULL,返回NULL
ARRAYmap_values(MAP m)将参数中的map的所有value作为数组返回,输入NULL,返回NULL
intsize(MAP)取得给定MAP元素数目
TABLEexplode(MAP)表生成函数,将给定MAP展开,每个key/value一行,每行两列分别对应key和value
ARRAYarray(T value1, T value2, ...)使用给定value构造ARRAY,所有value类型一致
intsize(ARRAY)取得给定ARRAY元素数目
booleanarray_contains(ARRAY a, value v)检测给定ARRAY a中是否包含v
ARRAYsort_array(ARRAY)对给定数组排序
ARRAYcollect_list(T col)聚合函数,在给定group内,将col指定的表达式聚合为一个数组
ARRAYcollect_set(T col)聚合函数,在给定group内,将col指定的表达式聚合为一个无重复元素的集合数组
TABLEexplode(ARRAY)表生成函数,将给定ARRAY展开,每个value一行,每行一列对应相应数组元素
TABLE (int, T)posexplode(ARRAY)表生成函数,将给定ARRAY展开,每个value一行,每行两列分别对应数组从0开始的下标和数组元素
STRUCTstruct(T1 value1, T2 value2, ...)使用给定value列表建立struct, 各value可为任意类型,生成struct的field的名称依次为col1, col2, ...
STRUCTnamed_struct(name1, value1, name2, value2, ...)使用给定name/value列表建立struct, 各value可为任意类型,生成struct的field的名称依次为name1, name2, ...
TABLE (f1 T1, f2 T2, ...)inline(ARRAY>)表生成函数,将给定struct数组展开,每个元素对应一行,每行每个struct元素对应一列

Tunnel SDK 介绍

Tunnel 是 ODPS 的数据通道,用户可以通过 Tunnel 向 ODPS 中上传或者下载数据。
TableTunnel 是访问 ODPS Tunnel 服务的入口类,仅支持表数据(非视图)的上传和下载。

对一张表或 partition 上传下载的过程,称为一个session。session 由一或多个到 Tunnel RESTful API 的 HTTP Request 组成。
session 用 session ID 来标识,session 的超时时间是24小时,如果大批量数据传输导致超过24小时,需要自行拆分成多个 session。
数据的上传和下载分别由 TableTunnel.UploadSessionTableTunnel.DownloadSession 这两个会话来负责。
TableTunnel 提供创建 UploadSession 对象和 DownloadSession 对象的方法.

  • 典型表数据上传流程:
    1) 创建 TableTunnel
    2) 创建 UploadSession
    3) 创建 RecordWriter,写入 Record
    4)提交上传操作

  • 典型表数据下载流程:
    1) 创建 TableTunnel
    2) 创建 DownloadSession
    3) 创建 RecordReader,读取 Record

基于Tunnel SDK构造复杂类型数据

代码示例:

            RecordWriter recordWriter = uploadSession.openRecordWriter(0);      ArrayRecord record = (ArrayRecord) uploadSession.newRecord();      // prepare data      List arrayData = Arrays.asList(1, 2, 3);      Map mapData = new HashMap();      mapData.put("a", 1L);      mapData.put("c", 2L);      List structData = new ArrayList();      structData.add("Lily");      structData.add(18);      // set data to record      record.setArray(0, arrayData);      record.setMap(1, mapData);      record.setStruct(2, new SimpleStruct((StructTypeInfo) schema.getColumn(2).getTypeInfo(),                                           structData));      // write the record      recordWriter.write(record);

从MaxCompute下载复杂类型数据

代码示例:

            RecordReader recordReader = downloadSession.openRecordReader(0, 1);      // read the record      ArrayRecord record1 = (ArrayRecord)recordReader.read();      // get array field data      List field0 = record1.getArray(0);      List longField0 = record1.getArray(Long.class, 0);      // get map field data      Map field1 = record1.getMap(1);      Map typedField1 = record1.getMap(String.class, Long.class, 1);      // get struct field data      Struct field2 = record1.getStruct(2);

运行实例

完整代码如下:

import java.io.IOException;import java.util.ArrayList;import java.util.Arrays;import java.util.HashMap;import java.util.List;import java.util.Map;import com.aliyun.odps.Odps;import com.aliyun.odps.PartitionSpec;import com.aliyun.odps.TableSchema;import com.aliyun.odps.account.Account;import com.aliyun.odps.account.AliyunAccount;import com.aliyun.odps.data.ArrayRecord;import com.aliyun.odps.data.RecordReader;import com.aliyun.odps.data.RecordWriter;import com.aliyun.odps.data.SimpleStruct;import com.aliyun.odps.data.Struct;import com.aliyun.odps.tunnel.TableTunnel;import com.aliyun.odps.tunnel.TableTunnel.UploadSession;import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;import com.aliyun.odps.tunnel.TunnelException;import com.aliyun.odps.type.StructTypeInfo;public class TunnelComplexTypeSample {  private static String accessId = "";  private static String accessKey = "";  private static String odpsUrl = "";  private static String project = "";  private static String table = "";  // partitions of a partitioned table, eg: "pt=\'1\',ds=\'2\'"  // if the table is not a partitioned table, do not need it  private static String partition = "";  public static void main(String args[]) {    Account account = new AliyunAccount(accessId, accessKey);    Odps odps = new Odps(account);    odps.setEndpoint(odpsUrl);    odps.setDefaultProject(project);    try {      TableTunnel tunnel = new TableTunnel(odps);      PartitionSpec partitionSpec = new PartitionSpec(partition);      // ---------- Upload Data ---------------      // create upload session for table      // the table schema is {"col0": ARRAY, "col1": MAP, "col2": STRUCT}      UploadSession uploadSession = tunnel.createUploadSession(project, table, partitionSpec);      // get table schema      TableSchema schema = uploadSession.getSchema();      // open record writer      RecordWriter recordWriter = uploadSession.openRecordWriter(0);      ArrayRecord record = (ArrayRecord) uploadSession.newRecord();      // prepare data      List arrayData = Arrays.asList(1, 2, 3);      Map mapData = new HashMap();      mapData.put("a", 1L);      mapData.put("c", 2L);      List structData = new ArrayList();      structData.add("Lily");      structData.add(18);      // set data to record      record.setArray(0, arrayData);      record.setMap(1, mapData);      record.setStruct(2, new SimpleStruct((StructTypeInfo) schema.getColumn(2).getTypeInfo(),                                           structData));      // write the record      recordWriter.write(record);      // close writer      recordWriter.close();      // commit uploadSession, the upload finish      uploadSession.commit(new Long[]{0L});      System.out.println("upload success!");      // ---------- Download Data ---------------      // create download session for table      // the table schema is {"col0": ARRAY, "col1": MAP, "col2": STRUCT}      DownloadSession downloadSession = tunnel.createDownloadSession(project, table, partitionSpec);      schema = downloadSession.getSchema();      // open record reader, read one record here for example      RecordReader recordReader = downloadSession.openRecordReader(0, 1);      // read the record      ArrayRecord record1 = (ArrayRecord)recordReader.read();      // get array field data      List field0 = record1.getArray(0);      List longField0 = record1.getArray(Long.class, 0);      // get map field data      Map field1 = record1.getMap(1);      Map typedField1 = record1.getMap(String.class, Long.class, 1);      // get struct field data      Struct field2 = record1.getStruct(2);      System.out.println("download success!");    } catch (TunnelException e) {      e.printStackTrace();    } catch (IOException e) {      e.printStackTrace();    }  }}

看完上述内容,你们掌握如何使用Tunnel SDK上传和下载MaxCompute复杂类型数据的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

0