千家信息网

PyODPS DataFrame 处理笛卡尔积的几种方式分别是什么

发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,今天就跟大家聊聊有关PyODPS DataFrame 处理笛卡尔积的几种方式分别是什么,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。PyODP
千家信息网最后更新 2025年02月04日PyODPS DataFrame 处理笛卡尔积的几种方式分别是什么

今天就跟大家聊聊有关PyODPS DataFrame 处理笛卡尔积的几种方式分别是什么,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

PyODPS 提供了 DataFrame API 来用类似 pandas 的接口进行大规模数据分析以及预处理,本文主要介绍如何使用 PyODPS 执行笛卡尔积的操作。

笛卡尔积最常出现的场景是两两之间需要比较或者运算。以计算地理位置距离为例,假设大表 Coordinates1 存储目标点经纬度坐标,共有 M 行数据,小表 Coordinates2 存储出发点经纬度坐标,共有 N 行数据,现在需要计算所有离目标点最近的出发点坐标。对于一个目标点来说,我们需要计算所有的出发点到目标点的距离,然后找到最小距离,所以整个中间过程需要产生 M * N 条数据,也就是一个笛卡尔积问题。

haversine 公式

首先简单介绍一下背景知识,已知两个地理位置的坐标点的经纬度,求解两点之间的距离可以使用 haversine 公式,使用 Python 的表达如下:

def  haversine(lat1,  lon1,  lat2,  lon2):        #  lat1,  lon1  为位置  1  的经纬度坐标        #  lat2,  lon2  为位置  2  的经纬度坐标        import  numpy  as  np        dlon  =  np.radians(lon2  -  lon1)        dlat  =  np.radians(lat2  -  lat1)        a  =  np.sin(  dlat  /2  )  **2  +  np.cos(np.radians(lat1))  *  np.cos(np.radians(lat2))  *  np.sin(  dlon  /2  )  **2        c  =  2  *  np.arcsin(np.sqrt(a))        r  =  6371  #  地球平均半径,单位为公里        return  c  *  r

MapJoin

目前最推荐的方法就是使用 mapjoin,PyODPS 中使用 mapjoin 的方式十分简单,只需要两个 dataframe join 时指定 mapjoin=True,执行时会对右表做 mapjoin 操作。

In  [3]:  df1  =  o.get_table('coordinates1').to_df()                                                                                                                                                                                        In  [4]:  df2  =  o.get_table('coordinates2').to_df()                                                                                                                                                                                        In  [5]:  df3  =  df1.join(df2,  mapjoin=True)                                                                                                                                                                                                        In  [6]:  df1.schema                                                                                                                                                                                                                                                      Out[6]:  odps.Schema  {    latitude                    float64                  longitude                  float64                  id                                string                }In  [7]:  df2.schema                                                                                                                                                                                                                                                      Out[7]:  odps.Schema  {    latitude                    float64                  longitude                  float64                  id                                string                }In  [8]:  df3.schema                                                                                                                                                                                                                                                      Out[8]:  odps.Schema  {    latitude_x                        float64                  longitude_x                      float64                  id_x                                    string                    latitude_y                        float64                  longitude_y                      float64                  id_y                                    string                }

可以看到在执行 join 时默认会将重名列加上 _x 和 _y 后缀,可通过在 suffixes 参数中传入一个二元 tuple 来自定义后缀,当有了 join 之后的表后,通过 PyODPS 中 DataFrame 的自建函数就可以计算出距离,十分简洁明了,并且效率很高。

In  [9]:  r  =  6371        ...:  dis1  =  (df3.latitude_y  -  df3.latitude_x).radians()        ...:  dis2  =  (df3.longitude_y  -  df3.longitude_x).radians()        ...:  a  =  (dis1  /  2).sin()  **  2  +  df3.latitude_x.radians().cos()  *  df3.latitude_y.radians().cos()  *  (dis2  /  2).sin()  **  2        ...:  df3['dis']  =  2  *  a.sqrt().arcsin()  *  r                                                                                                                                                                                                                                                                                                                                                                                                      In [12]: df3.head(10)                                                                                                                        Out[12]:     latitude_x  longitude_x id_x  latitude_y   longitude_y id_y       dis0   76.252432    59.628253    0   84.045210     6.517522    0  1246.8649811   76.252432    59.628253    0   59.061796     0.794939    1  2925.9531472   76.252432    59.628253    0   42.368304    30.119837    2  4020.6049423   76.252432    59.628253    0   81.290936    51.682749    3   584.7797484   76.252432    59.628253    0   34.665222   147.167070    4  6213.9449425   76.252432    59.628253    0   58.058854   165.471565    5  4205.2191796   76.252432    59.628253    0   79.150677    58.661890    6   323.0707857   76.252432    59.628253    0   72.622352   123.195778    7  1839.3807608   76.252432    59.628253    0   80.063614   138.845193    8  1703.7824219   76.252432    59.628253    0   36.231584    90.774527    9  4717.284949In [13]: df1.count()                                                                                                                         Out[13]: 2000In [14]: df2.count()                                                                                                                         Out[14]: 100In [15]: df3.count()                                                                                                                         Out[15]: 200000

df3 已经是有 M * N 条数据了,接下来如果需要知道最小距离,直接对 df3 调用 groupby 接上 min 聚合函数就可以得到每个目标点的最小距离。

In [16]: df3.groupby('id_x').dis.min().head(10)                                                                                              Out[16]:        dis_min0   323.0707851    64.7554932  1249.2831693   309.8182884  1790.4847485   385.1077396   498.8161577   615.9874678   437.7654329   272.589621

DataFrame 自定义函数

如果我们需要知道对应最小距离的点的城市,也就是表中对应的 id ,可以在 mapjoin 之后调用 MapReduce,不过我们还有另一种方式是使用 DataFrame 的 apply 方法。要对一行数据使用自定义函数,可以使用 apply 方法,axis 参数必须为 1,表示在行上操作。

表资源

要注意 apply 是在服务端执行的 UDF,所以不能在函数内使用类似于df=o.get_table('table_name').to_df() 的表达式去获得表数据,具体原理可以参考PyODPS DataFrame 的代码在哪里跑。以本文中的情况为例,要想将表 1 与表 2 中所有的记录计算,那么需要将表 2 作为一个资源表,然后在自定义中引用该表资源。PyODPS 中使用表资源也十分方便,只需要将一个 collection 传入 resources 参数即可。collection 是个可迭代对象,不是一个 DataFrame 对象,不可以直接调用 DataFrame 的接口,每个迭代值是一个 namedtuple,可以通过字段名或者偏移来取对应的值。

## use dataframe udfdf1 = o.get_table('coordinates1').to_df()df2 = o.get_table('coordinates2').to_df()def func(collections):    import pandas as pd        collection = collections[0]        ids = []    latitudes = []    longitudes = []    for r in collection:        ids.append(r.id)        latitudes.append(r.latitude)        longitudes.append(r.longitude)    df = pd.DataFrame({'id': ids, 'latitude':latitudes, 'longitude':longitudes})    def h(x):                df['dis'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude)        return df.iloc[df['dis'].idxmin()]['id']    return hdf1[df1.id, df1.apply(func, resources=[df2], axis=1, reduce=True, types='string').rename('min_id')].execute(    libraries=['pandas.zip', 'python-dateutil.zip', 'pytz.zip', 'six.tar.gz'])

在自定义函数中,将表资源通过循环读成 pandas DataFrame,利用 pandas 的 loc 可以很方便的找到最小值对应的行,从而得到距离最近的出发点 id。另外,如果在自定义函数中需要使用到三方包(例如本例中的 pandas)可以参考这篇文章。

全局变量

当小表的数据量十分小的时候,我们甚至可以将小表数据作为全局变量在自定义函数中使用。

df1 = o.get_table('coordinates1').to_df()df2 = o.get_table('coordinates2').to_df()df = df2.to_pandas()def func(x):    df['dis'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude)    return df.iloc[df['dis'].idxmin()]['id']df1[df1.id, df1.apply(func, axis=1, reduce=True, types='string').rename('min_id')].execute(    libraries=['pandas.zip', 'python-dateutil.zip', 'pytz.zip', 'six.tar.gz'])

在上传函数的时候,会将函数内使用到的全局变量(上面代码中的 df) pickle 到 UDF 中。但是注意这种方式使用场景很局限,因为 ODPS 的上传的文件资源大小是有限制的,所以数据量太大会导致 UDF 生成的资源太大从而无法上传,而且这种方式最好保证三方包的客户端与服务端的版本一致,否则很有可能出现序列化的问题,所以建议只在数据量非常小的时候使用。

使用 PyODPS 解决笛卡尔积的问题主要分为两种方式,一种是 mapjoin,比较直观,性能好,一般能用 mapjoin 解决的我们都推荐使用 mapjoin,并且最好使用内建函数计算,能到达最高的效率,但是它不够灵活。另一种是使用 DataFrame 自定义函数,比较灵活,性能相对差一点(可以使用 pandas 或者 numpy 获得性能上的提升),通过使用表资源,将小表作为表资源传入 DataFrame 自定义函数中,从而完成笛卡尔积的操作。

看完上述内容,你们对PyODPS DataFrame 处理笛卡尔积的几种方式分别是什么有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

函数 数据 资源 方式 笛卡尔 最小 坐标 目标 经纬 经纬度 位置 全局 内容 出发点 参数 变量 性能 方法 时候 问题 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 无限法则怎么自动连接服务器 软件开发从大公司离职去哪好 华为电视一直显示无法连接服务器 科技互联网发展报告 失落的方舟东部北美服务器 数据库删除为空数据 网络安全防范风险培训 太原市网络安全中心副主任 黑客文化与网络安全重要点 黑魂三为什么不能连接服务器 数据库怎么倒叙查看 1.12数据库 怎样下载方舟手游的服务器 数据库迁移 新旧表 重庆维普数据库的期刊 宣扬网络安全的目的 软件开发一般有哪几个阶段 最先进的网络技术有哪些 象山手机软件开发服务 计算机网络技术后悔了 中国第一台服务器是浪潮生产的吗 眼科医院数据库 我的世界rpg服务器fate 不开展网络安全等级保护等于违法 幼儿园网络安全宣传周ppt 软件开发立项包括哪些内容 深圳天泰网络技术有限公司招聘 图形网络技术 软件开发哪个公司服务 陌陌软件开发的背景
0