spark-sql 自定义函数
发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,(1)自定义UDFobject SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Lo
千家信息网最后更新 2025年01月24日spark-sql 自定义函数
(1)自定义UDF
object SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext /** * 注册定义的UDF: * 这里的泛型[Int,String] * 第一个是返回值类型,后面可以是一个或者多个,是方法参数类型 */ sqlContext.udf.register[Int,String]("strLen",strLen) val sql= """ |select strLen("zhangsan") """.stripMargin spark.sql(sql).show() } //自定义UDF方法 def strLen(str:String):Integer={ str.length }}
(2) 自定义UDAF
这里举的例子是实现一个count:
自定义UDAF类:
class MyCountUDAF extends UserDefinedAggregateFunction{ //该UDAF输入的数据类型 override def inputSchema: StructType = { StructType(List( StructField("age",DataTypes.IntegerType) )) } //在该UDAF中聚合的数据类型 override def bufferSchema: StructType = { StructType(List( StructField("age",DataTypes.IntegerType) )) } //该UDAF输出的数据类型 override def dataType: DataType = DataTypes.IntegerType //确定性判断,通常特定输入和输出的类型一致 override def deterministic: Boolean = true //buffer:计算过程中临时的存储了聚合结果的Buffer override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer.update(0,0) } /** * 分区内的数据聚合合并 * @param buffer:就是我们在initialize方法中声明初始化的临时缓冲区 * @param input:聚合操作新传入的值 */ override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { val oldValue=buffer.getInt(0) buffer.update(0,oldValue+1) } /** * 分区间的聚合 * @param buffer1:分区一聚合的临时结果 * @param buffer2;分区二聚合的临时结果 */ override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { val p1=buffer1.getInt(0) val p2=buffer2.getInt(0) buffer1.update(0,p1+p2) } //该聚合函数最终输出的值 override def evaluate(buffer: Row): Any = { buffer.get(0) }}
调用:
object SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //注册UDAF sqlContext.udf.register("myCount",new MyCountUDAF()) val stuList = List( new Student("委xx", 18), new Student("吴xx", 18), new Student("戚xx", 18), new Student("王xx", 19), new Student("薛xx", 19) ) import spark.implicits._ val stuDS: Dataset[Student] = sqlContext.createDataset(stuList) stuDS.createTempView("student") val sql= """ |select myCount(1) counts |from student |group by age |order by counts """.stripMargin spark.sql(sql).show() }}case class Student(name:String,age:Int)
类型
数据
方法
结果
输出
入口
对象
日志
编程
输入
函数
一致
例子
参数
多个
就是
新传
确定性
缓冲区
过程
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全法明确禁止
软件开发越老越吃香吗
网络安全作文200字五年级
北京网络安全大会视频
无网络连接的网络安全攻防战
万方数据库有关键词字段吗
安徽高速哪个服务器有肯德基
数据库中表锁住了如何解开
内网穿透数据库速度
在数据库中存在一张表
互联网公司怎么防止删掉数据库
java连接数据库实体类
陕西管理平台软件开发
贵州数据库
国家网络安全比赛获奖
服务器的负荷
服务器的环境
商用数据库多少钱一套
怎么应对网络安全人才短缺
北京市海淀区网络安全保卫大队
网络技术应用是一本什么样的书
软件开发过程记录表
互联网科技在什么时候开发
用友u8如何进入数据库
数据库四字歌名的查询
阿里云服务器放多个网站
深圳人工智能软件开发哪家可靠
中华人民共和国宪法网络安全法
数据库怎么找主外键
监控服务器地址错误怎么回事