千家信息网

Spark 3.0的新功能是什么呢

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,今天就跟大家聊聊有关Spark 3.0的新功能是什么呢,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。最近,Apache Spark社区发布了S
千家信息网最后更新 2025年01月23日Spark 3.0的新功能是什么呢

今天就跟大家聊聊有关Spark 3.0的新功能是什么呢,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

最近,Apache Spark社区发布了Spark 3.0的预览版,该预览版包含许多重要的新功能,这些功能将帮助Spark创造强大的影响力,在此大数据和数据科学时代,该产品已拥有广泛的企业用户和开发人员。

在新版本中,Spark社区已将一些功能从Spark SQL移植到了编程的Scala API(

org.apache.spark.sql.functions),这鼓励开发人员直接将此功能用作其DataFrame转换的一部分,而不是直接输入 进入SQL模式或创建视图,并使用此函数以及SQL表达式或callUDF函数。

社区还辛苦地引入了一些新的数据转换功能和partition_transforms函数,这些功能在与Spark的新DataFrameWriterv2一起使用以将数据写到某些外部存储时非常有用。

Spark 3中的一些新功能已经是Databricks Spark以前版本的一部分。 因此,如果您在Databricks云中工作,您可能会发现其中一些熟悉的功能。

下面介绍了Spark SQL和Scala API中用于DataFrame操作访问的Spark新功能,以及从Spark SQL移植到Scala API以进行编程访问的功能。

Spark SQL中的Spark 3.0中引入的功能以及用于DataFrame转换的功能

from_csv

像from_json一样,此函数解析包含CSV字符串的列,并将其转换为Struct类型。 如果CSV字符串不可解析,则将返回null。

例:

  • 该函数需要一个Struct模式和一些选项,这些模式和选项指示如何解析CSV字符串。 选项与CSV数据源相同。

ss="dp-sql">ss="alt">val studentInfo = ss="string">"1,Jerin,CSE"::ss="string">"2,Jerlin,ECE"::ss="string">"3,Arun,CSE"::Nil ss="">val ss="keyword">schema = new StructType()  ss="alt">            .ss="keyword">add(ss="string">"Id",IntegerType) ss="">            .ss="keyword">add(ss="string">"Name",StringType) ss="alt">            .ss="keyword">add(ss="string">"Dept",StringType) ss="">val options = Map(ss="string">"delimiter" ->ss="string">",") ss="alt">val studentDF = studentInfo.toDF(ss="string">"Student_Info") ss="">.withColumn(ss="string">"csv_struct",from_csv('Student_Info, ss="keyword">schema,options)) ss="alt">studentDF.show()

to_csv

要将"结构类型"列转换为CSV字符串。

例:

  • 与Struct type列一起,此函数还接受可选的options参数,该参数指示如何将Struct列转换为CSV字符串。

ss="dp-sql">ss="alt">studentDF ss="">.withColumn(ss="string">"csv_string",to_csv($ss="string">"csv_struct",Map.empty[String, String].asJava)) ss="alt">.show

推断CSV字符串的模式,并以DDL格式返回模式。

例:

  • 该函数需要一个CSV字符串列和一个可选参数,其中包含如何解析CSV字符串的选项。

ss="dp-sql">ss="alt">studentDF ss="">  .withColumn(ss="string">"schema",schema_of_csv(ss="string">"csv_string")) ss="alt">  .show

for_all

将给定谓词应用于数组中的所有元素,并且仅当数组中的所有元素求值为true时返回true,否则返回false。

例:

  • 检查给定Array列中的所有元素是否均是偶数。

ss="dp-sql">ss="alt">val  df = Seq(Seq(2,4,6),Seq(5,10,3)).toDF(ss="string">"int_array") ss="">df.withColumn(ss="string">"flag",forall($ss="string">"int_array",(x:ss="keyword">Column)=>(lit(x%2==0)))) ss="alt">.show

transform

将函数应用于数组中的所有元素后,返回一个新数组。

例:

  • 将" 1"添加到数组中的所有元素。

ss="dp-sql">ss="alt">val df = Seq((Seq(2,4,6)),(Seq(5,10,3))).toDF(ss="string">"num_array") ss="">df.withColumn(ss="string">"num_array",transform($ss="string">"num_array",x=>x+1)).show

overlay

要替换列的内容,请使用从指定字节位置到可选的指定字节长度的实际替换内容。

例:

  • 将特定人员的问候语更改为传统的" Hello World"

这里我们用世界替换人名,因为名字的起始位置是7,并且我们要在替换内容之前删除完整的姓名,需要删除的字节位置的长度应大于或等于最大值 列中名称的长度。

因此,我们将替换词传递为"world",将内容替换为" 7"的特定起始位置,从指定起始位置移除的位置数为" 12"(如果未指定,则该位置是可选的 函数只会从指定的起始位置将源内容替换为替换内容)。

覆盖替换了StringType,TimeStampType,IntegerType等中的内容。但是Column的返回类型将始终为StringType,而与Column输入类型无关。

ss="dp-sql">ss="alt">val greetingMsg = ss="string">"Hello Arun"::ss="string">"Hello Mohit Chawla"::ss="string">"Hello Shaurya"::Nil ss="">val greetingDF = greetingMsg.toDF(ss="string">"greet_msg") ss="alt">greetingDF.withColumn(ss="string">"greet_msg",overlay($ss="string">"greet_msg",lit(ss="string">"World"),lit(ss="string">"7"),lit(ss="string">"12"))) ss="">.show

分裂

根据给定的正则表达式和指定的限制拆分字符串表达式,该限制指示将正则表达式应用于给定的字符串表达式的次数。

如果指定的限制小于或等于零,则正则表达式将在字符串上应用多次,并且结果数组将根据给定的正则表达式包含所有可能的字符串拆分。

如果指定的限制大于零,则将使用不超过该限制的正则表达式

例:

  • 根据正则表达式将给定的字符串表达式拆分为两个。 即 字符串定界符。

ss="dp-sql">ss="alt">val num = ss="string">"one~two~three"::ss="string">"four~five"::Nil ss="">val numDF = num.toDF(ss="string">"numbers") ss="alt">numDF ss="">.withColumn(ss="string">"numbers",split($ss="string">"numbers",ss="string">"~",2)) ss="alt">.show

将同一个字符串表达式分成多个部分,直到出现分隔符

ss="dp-sql">ss="alt">numDF ss="">.withColumn(ss="string">"numbers",split($ss="string">"numbers",ss="string">"~",0)) ss="alt">.show

map_entries

将映射键值转换为无序的条目数组。

例:

  • 获取数组中Map的所有键和值。

ss="dp-sql">ss="alt">val df = Seq(Map(1->ss="string">"x",2->ss="string">"y")).toDF(ss="string">"key_values") ss="">df.withColumn(ss="string">"key_value_array",map_entries($ss="string">"key_values")) ss="alt">.show

map_zip_with

使用功能根据键将两个Map合并为一个。

例:

  • 要计算跨部门员工的总销售额,并通过传递一个函数,该函数将基于键汇总来自两个不同"地图"列的总销售额,从而在单个地图中获取特定员工的总销售额。

ss="dp-sql">ss="alt">val df = Seq((Map(ss="string">"EID_1"->10000,ss="string">"EID_2"->25000), ss="">             Map(ss="string">"EID_1"->1000,ss="string">"EID_2"->2500)))   .toDF(ss="string">"emp_sales_dept1",ss="string">"emp_sales_dept2") ss="alt"> ss="">df. ss="alt">withColumn(ss="string">"total_emp_sales",map_zip_with($ss="string">"emp_sales_dept1",$ss="string">"emp_sales_dept2",(k,v1,v2)=>(v1+v2))) ss="">.show

map_filter

返回仅包含满足给定谓词功能的Map值的新键值对。

例:

  • 仅筛选出销售值高于20000的员工

ss="dp-sql">ss="alt">val df = Seq(Map(ss="string">"EID_1"->10000,ss="string">"EID_2"->25000)) ss="">          .toDF(ss="string">"emp_sales") ss="alt"> ss="">df ss="alt">.withColumn(ss="string">"filtered_sales",map_filter($ss="string">"emp_sales",(k,v)=>(v>20000))) ss="">.show

transform_values

根据给定的函数操作Map列中所有元素的值。

例:

  • 通过给每个雇员加薪5000来计算雇员薪水

ss="dp-sql">ss="alt">val df = Seq(Map(ss="string">"EID_1"->10000,ss="string">"EID_2"->25000)) ss="">         .toDF(ss="string">"emp_salary") ss="alt"> ss="">df ss="alt">.withColumn(ss="string">"emp_salary",transform_values($ss="string">"emp_salary",(k,v)=>(v+5000))) ss="">.show

transform_keys

根据给定的函数操作Map列中所有元素的键。

例:

  • 要将公司名称" XYZ"添加到员工编号。

ss="dp-sql">ss="alt">val df = Seq(Map(ss="string">"EID_1" -> 10000, ss="string">"EID_2" -> 25000)) ss="">        .toDF(ss="string">"employees") ss="alt">df ss="">.withColumn(ss="string">"employees", transform_keys($ss="string">"employees", (k, v) => concat(k,lit(ss="string">"_XYZ")))) ss="alt">.show

xhash74

要计算给定列内容的哈希码,请使用64位xxhash算法并将结果返回为long。

从Spark SQL移植到Spark 3.0中的Scala API进行DataFrame转换的功能

Scala API可使用大多数Spark SQL函数,该函数可将相同的函数用作DataFrame操作的一部分。 但是仍然有一些功能不能作为编程功能使用。 要使用这些功能,必须进入Spark SQL模式并将这些功能用作SQL表达式的一部分,或使用Spark" callUDF"功能使用相同的功能。 随着功能的普及和使用不断发展,这些功能中的某些功能过去曾被移植到新版本的程序化Spark API中。 以下是从以前版本的Spark SQL函数移植到Scala API(

org.spark.apache.sql.functions)的函数

date_sub

从日期,时间戳记和字符串数据类型中减去天数。 如果数据类型为字符串,则其格式应可转换为日期" yyyy-MM-dd"或" yyyy-MM-dd HH:mm:ss.ssss"

例:

  • 从eventDateTime中减去" 1天"。

如果要减去的天数为负,则此功能会将给定的天数添加到实际日期中。

ss="dp-sql">ss="alt">var df = Seq( ss="">        (1, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-01 23:00:01")), ss="alt">        (2, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-02 12:40:32")), ss="">        (3, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-03 09:54:00")), ss="alt">        (4, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-04 10:12:43")) ss="">         ) ss="alt">     .toDF(ss="string">"typeId",ss="string">"eventDateTime") ss=""> ss="alt"> df.withColumn(ss="string">"Adjusted_Date",date_sub($ss="string">"eventDateTime",1)).show()

date_add

与date_sub相同,但是将天数添加到实际天数中。

例:

  • 将" 1天"添加到eventDateTime

ss="dp-sql">ss="alt">var df = Seq( ss="">         (1, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-01 23:00:01")), ss="alt">         (2, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-02 12:40:32")), ss="">         (3, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-03 09:54:00")), ss="alt">         (4, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-04 10:12:43")) ss="">         ) ss="alt">    .toDF(ss="string">"Id",ss="string">"eventDateTime") ss="">df ss="alt">.withColumn(ss="string">"Adjusted Date",date_add($ss="string">"eventDateTime",1)) ss="">.show()

months_add

像date_add和date_sub一样,此功能有助于添加月份。

减去月份,将要减去的月份数设为负数,因为没有单独的减去函数用于减去月份

例:

  • 从eventDateTime添加和减去一个月。

ss="dp-sql">ss="alt">var df = Seq( ss="">    (1, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-01 23:00:01")), ss="alt">    (2, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-02 12:40:32")), ss="">    (3, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-03 09:54:00")), ss="alt">    (4, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-04 10:12:43")) ss="">     ).toDF(ss="string">"typeId",ss="string">"eventDateTime") ss="alt">//ss="keyword">To ss="keyword">add one months ss=""> df ss="alt">.withColumn(ss="string">"Adjusted Date",add_months($ss="string">"eventDateTime",1)) ss="">.show() ss="alt">//ss="keyword">To subtract one months ss="">df ss="alt">.withColumn(ss="string">"Adjusted Date",add_months($ss="string">"eventDateTime",-1)) ss="">.show()

zip_with

通过应用函数合并左右数组。

此函数期望两个数组的长度都相同,如果其中一个数组比另一个数组短,则将添加null以匹配更长的数组长度。

例:

  • 将两个数组的内容相加并合并为一个

ss="dp-sql">ss="alt">val df = Seq((Seq(2,4,6),Seq(5,10,3))) ss="">         .toDF(ss="string">"array_1",ss="string">"array_2") ss="alt">   ss=""> df ss="alt">.withColumn(ss="string">"merged_array",zip_with($ss="string">"array_1",$ss="string">"array_2",(x,y)=>(x+y))) ss=""> .show

将谓词应用于所有元素,并检查数组中的至少一个或多个元素是否对谓词函数成立。

例:

  • 检查数组中至少一个元素是否为偶数。

ss="dp-sql">ss="alt">val df= Seq(Seq(2,4,6),Seq(5,10,3)).toDF(ss="string">"num_array") ss="">df.withColumn(ss="string">"flag",exists($ss="string">"num_array", x =>lit(x%2===0))) ss="alt">.show

过滤

将给定谓词应用于数组中的所有元素,并过滤掉谓词为true的元素。

例:

  • 仅过滤掉数组中的偶数元素。

ss="dp-sql">ss="alt">val df = Seq(Seq(2,4,6),Seq(5,10,3)).toDF(ss="string">"num_array") ss="">df.withColumn(ss="string">"even_array",filter($ss="string">"num_array", x =>lit(x%2===0))) ss="alt">.show

聚合 aggregate

使用给定函数将给定数组和另一个值/状态简化为单个值,并应用可选的finish函数将缩减后的值转换为另一个状态/值。

例:

  • 将10加到数组的总和并将结果乘以2

ss="dp-sql">ss="alt">val df = Seq((Seq(2,4,6),3),(Seq(5,10,3),8)) ss="">  .toDF(ss="string">"num_array",ss="string">"constant") ss="alt">df.withColumn(ss="string">"reduced_array",aggregate($ss="string">"num_array", $ss="string">"constant",(x,y)=>x+y,x => x*2)) ss="">  .show

Spark 3.0中为Spark SQL模式引入的功能

以下是新的SQL函数,您只能在Spark SQL模式下才能使用它们。

acosh

查找给定表达式的双曲余弦的倒数。

asinh

找出给定表达式的双曲正弦的逆。

atanh

查找给定表达式的双曲正切的逆。

bit_and,bit_or和bit_xor

计算按位AND,OR和XOR值

bit_count

返回计数的位数。

bool_and和bool_or

验证表达式的所有值是否为真或验证表达式中的至少一个为真。

count_if

返回一列中的真值数量

例:

  • 找出给定列中的偶数值

ss="dp-sql">ss="alt">var df = Seq((1),(2),(4)).toDF(ss="string">"num") ss=""> ss="alt"> df.createOrReplaceTempView(ss="string">"table") ss="">spark.sql(ss="string">"select count_if(num %2==0) from table").show

date_part

提取日期/时间戳的一部分,例如小时,分钟等…

div

用于将表达式或带有另一个表达式/列的列分开

every 和 sum

如果给定的表达式对每个列的所有列值都求值为true,并且至少一个值对某些值求得true,则此函数返回true。

make_date,make_interval和make_timestamp

构造日期,时间戳和特定间隔。

例:

ss="dp-sql">ss="alt">ss="keyword">SELECT make_timestamp(2020, 01, 7, 30, 45.887)

max_by和min_by

比较两列并返回与右列的最大值/最小值关联的左列的值

例:

ss="dp-sql">ss="alt">var df = Seq((1,1),(2,1),(4,3)).toDF(ss="string">"x",ss="string">"y") ss=""> ss="alt"> df.createOrReplaceTempView(ss="string">"table") ss="">spark.sql(ss="string">"select max_by(x,y) from table").show

类型

返回列值的数据类型

返回Spark版本及其git版本

justify_days,justify_hours和justify_interval

新引入的对齐功能用于调整时间间隔。

例:

  • 表示30天为一个月,

ss="dp-sql">ss="alt">ss="keyword">SELECT justify_days(interval ss="string">'30 day')

分区转换功能

从Spark 3.0及更高版本开始,存在一些新功能,这些功能有助于对数据进行分区,我将在另一篇文章中介绍。

总体而言,我们已经分析了所有数据转换和分析功能,这些功能是3.0版本中产生的火花。 希望本指南有助于您了解这些新功能。 这些功能肯定会加速火花开发工作,并有助于建立坚固有效的火花管道。

看完上述内容,你们对Spark 3.0的新功能是什么呢有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

功能 函数 数组 表达式 字符 字符串 元素 内容 数据 位置 模式 类型 应用 新功能 正则 版本 谓词 相同 两个 天数 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 硬件视频会议软件开发 vac安全服务器启动项验证 爱如生古籍数据库如何使用 网络安全法的意义是什么 江西省网络安全规定 运动员成绩管理系统数据库报告 网络安全中分段攻击的意思 恒捷互联网科技有限公司招聘 北京助友高科网络技术 公安网络安全工作思考 山东华为服务器管理软件 江苏塞普网络技术公司 数据存储服务器和云存储服务器 长沙ipfs存储服务器 电脑怎么设置无线网络安全 猎人导师修服务器 有线网采用什么网络技术 杭州net软件开发报价 江苏计算机软件开发公司 数据库技术在计算机中的地位 网络技术应用 目录 汉阳靠谱的软件开发中心 如何搭建网页服务器 当前网络安全工作存在的主要问题 数据库动态端口改静态 学生网络安全责任谁负责 软件开发中需要注意的问题 杭州联告网络技术有限公司 株洲java编程软件开发 ps5极限国度服务器
0