千家信息网

如何进行MaxCompute full outer join改写left anti join的实践分析

发表于:2024-11-24 作者:千家信息网编辑
千家信息网最后更新 2024年11月24日,如何进行MaxCompute full outer join改写left anti join的实践分析,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更
千家信息网最后更新 2024年11月24日如何进行MaxCompute full outer join改写left anti join的实践分析

如何进行MaxCompute full outer join改写left anti join的实践分析,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

简介: ods层数据同步时经常会遇到增全量合并的模型,即T-1天增量表 + T-2全量表 = T-1全量表。可以通过full outer join脚本来完成合并,但是数据量很大时非常消耗资源。下面将为您介绍在做增量数据的增加、更新时如何通过full outer join改写left anti join来实现的最佳实践。

背景

ods层数据同步时经常会遇到增全量合并的模型,即T-1天增量表 + T-2全量表 = T-1全量表。可以通过full outer join脚本来完成合并,但是数据量很大时非常消耗资源。

insert overwrite table tb_test partition(ds='${bizdate}')select case when a.id is not null then a.id esle b.id end as id         ,if(a.name is not null, a.name, b.name) as name  ,coalesce(a.age, b.age) as age       --这3种写法一样,都是优先取delta表的字段from(   select * from tb_test_delta where ds='${bizdate}') afull outer join(   select * from tb_test where ds='${bizdate-1}') bon a.id =b.id;

这种写法可实现新增和更新操作:

  • 新增是指增量表中新出现的数据,而全量表中没有;

  • 更新是指增量表和全量表中都有的数据,但优先取增量表的数据,覆盖历史表的数据。
    如下图所示,R2_1是增量表当天去重后增量数据,M3是全量表前一天的数据,而J4_2_3则是full outer join的执行图。

将J4_2_3展开会发现里面将增量和全量进行了merge join,当数据量很大(1288亿条)时会产生很大的shuffle开销。此时优化方案就是将full outer join改成 union all,从而避免join shuffle

优化模型

结论:full outer join改成hash cluster + left join +union all可以有效地降低计算成本,且有两种应用场景。先将模型进行抽象,假设有a和b两个表,a是增量表,b是全量表:

with  a as ( select * from values  (1,'111')                             ,(2,'two')                             ,(7,'777') as (id,name) ) --增量,b as ( select * from values  (1,'')                             ,(2,'222')                             ,(3,'333')                             ,(4,'444') as (id,name) )  --全量

场景1:只合并新增数据到全量表

left anti join相当于not in,增量not in全量,过滤后只剩下完全新增的id,对全量中已有的id不修改:

--查询完全新增的idselect * from a left anti join b on a.id=b.id ;--结果如下+------------+------+| id         | name |+------------+------+| 7          | 777  |+------------+------+
--完全新增的合并全量表select * from  a --增量表left anti join b on a.id=b.id  union all select * from b  --全量表--结果如下+------------+------+| id         | name |+------------+------+| 1          |      || 2          | 222  || 3          | 333  || 4          | 444  || 7          | 777  |+------------+------+

场景2:合并新增数据到全量表,且更新历史数据

全量not in增量,过滤后只剩下历史的id,然后union all增量,既新增也修改

--查询历史全量数据select * from b left anti join a on a.id=b.id;--结果如下+------------+------+| id         | name |+------------+------+| 3          | 333  || 4          | 444  |+------------+------+
--合并新增数据到全量表,且更新历史数据select * from  b --全量表left anti join a on a.id=b.idunion all select * from a ; --增量表--结果如下+------------+------+| id         | name |+------------+------+| 1          | 111  || 2          | two  || 7          | 777  || 3          | 333  || 4          | 444  |+------------+------+

优化实践

步骤1:表属性修改

表、作业属性修改,对原来的表、作业进行属性优化,可以提升优化效果。

set odps.sql.reducer.instances=3072;  --可选。默认最大1111个reducer,1111哈希桶。alter table table_name clustered by(contact_id) sorted by(contact_id) into 3072 buckets;--必选

步骤2:按照上述模型的场景1 或者 场景2进行代码改造。

这里先给出代码改造后的资源消耗对比:

原来的full outer jionleft anti join初始化原来的full outer jionleft anti join第二天以后
时间消耗8h40min38s1h5min48s7h42min30s32min30s
cpu消耗29666.02 Core * Min65705.30 Core * Min31126.86 Core * Min30589.29 Core * Min
mem消耗109640.80 GB * Min133922.25 GB * Min114764.80 GB * Min65509.28 GB * Min

可以发现hash cluster分桶操作在初始化有额外的开销,主要是按主键进行散列和排序,但是这是值得的,可一劳永逸,后续的读取速度非常快。以前每天跑需要8小时,现在除了分桶初始化需要1小时,以后每天实际只需要30分钟。

初始化执行图

图1:

  • M2是读全量表。

  • M4是读取增量表,在场景2的模型中增量表被读取了两次,其中:

    • R5_4是对主键去重(row_number)后用于后面的union all,里面包含了所有的增量数据;

    • R1_4是对主键去重(row_number)后用于left anti join,里面只包含了主键。

  • J3_1_2是left anti join,将它展开后看到这里还是有mergJoin,但是这只是初始化的操作,后面每天就不会有了。展开后如图2。

  • R6_3_5是将增量和全量进行union all,展开后如图3。

  • R7_6则是将索引信息写入元数据,如图3的MetaCollector1会在R7_6中sink。
    因此:图1中除了R5_4和R1_4是去重必须的,有shuffle。还有J3_1_2和R6_3_5这两个地方有shuffle。

图2:

第二天以后的执行图

R3_2和R1_2是对增量去重必要对操作,有shuffle,这里忽略。

初始化执行图的J3_1_2和R6_3_5已经被合并到了M4_1_3,将其展开后如图2。即left anti join 和 union all这两步操作在一个阶段完成了,且这个阶段是Map 任务(M4_1_3),而不是Join任务或Reduce任务。而且全量表不在单独占用一个Map任务,也被合并到了M4_1_3,因此整个过程下来没有shuffle操作,速度提升非常明显。也就是说只需要一个M4_1_3就能完成所有到操作,直接sink到表。

R5_4则是将索引信息写入元数据,如图2的MetaCollector1会在R5_4中sink。

图2:

关于如何进行MaxCompute full outer join改写left anti join的实践分析问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。

0