千家信息网

spark怎么编写udaf函数求中位数

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,本篇内容主要讲解“spark怎么编写udaf函数求中位数”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“spark怎么编写udaf函数求中位数”吧!
千家信息网最后更新 2025年01月24日spark怎么编写udaf函数求中位数

本篇内容主要讲解“spark怎么编写udaf函数求中位数”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“spark怎么编写udaf函数求中位数”吧!

package com.frank.sparktest.java;import org.apache.spark.sql.Row;import org.apache.spark.sql.expressions.MutableAggregationBuffer;import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;import org.apache.spark.sql.types.DataType;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import java.util.ArrayList;import java.util.Arrays;import java.util.Collections;import java.util.List;public class MedianUdaf extends UserDefinedAggregateFunction {    private StructType inputSchema;    private StructType bufferSchema;    public MedianUdaf(){        List inputFields = new ArrayList<>();        inputFields.add(DataTypes.createStructField("nums",DataTypes.IntegerType,true));        inputSchema=DataTypes.createStructType(inputFields);        List bufferFields = new ArrayList<>();        bufferFields.add(DataTypes.createStructField("datas",DataTypes.StringType,true));        bufferSchema=DataTypes.createStructType(bufferFields);    }    @Override    public StructType inputSchema() {        return inputSchema;    }    @Override    public StructType bufferSchema() {        return bufferSchema;    }    @Override    public DataType dataType() {        return DataTypes.DoubleType;    }    @Override    public boolean deterministic() {        return true;    }    @Override    public void initialize(MutableAggregationBuffer buffer) {        buffer.update(0,0);        buffer.update(1,0);    }    @Override    public void update(MutableAggregationBuffer buffer, Row input) {        if (!input.isNullAt(0)){            buffer.update(0,buffer.getString(0)+","+input.getInt(0));        }    }    @Override    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {        buffer1.update(0,buffer1.getString(0)+","+buffer2.getInt(0));    }    @Override    public Object evaluate(Row buffer) {        List list = new ArrayList();        List stringList = 
Arrays.asList(buffer.getString(0).split(","));        for (String s : stringList){            list.add(Integer.valueOf(s));        }        Collections.sort(list);        int size = list.size();        int num=0;        if(size % 2 == 1) {            num = list.get((size / 2)+1);        }        if(size %2  == 0) {            num = (list.get(size / 2)+list.get((size / 2)+1))/2;        }        return num;    }}

上面是求中位数的 UDAF 实现代码,可以直接拿来使用。

下面是测试程序:

package com.frank.sparktest.java;

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

import java.io.IOException;
import java.util.stream.IntStream;

/**
 * Local demo for {@link MedianUdaf}.
 *
 * <p>Registers a helper UDF that builds an inclusive integer range and the median UDAF
 * (under the SQL name {@code media}), then runs both from Spark SQL.
 */
public class DemoUDAF {
    public static void main(String[] args) throws IOException {
        SparkSession spark = SparkSession.builder().master("local").getOrCreate();
        try {
            SQLContext sqlContext = spark.sqlContext();
            // generate(start, end) -> array of the integers start..end, both ends inclusive.
            sqlContext.udf().register("generate",
                    (Integer start, Integer end) -> IntStream.range(start, end + 1).boxed().toArray(),
                    DataTypes.createArrayType(DataTypes.IntegerType));
            sqlContext.udf().register("media", new MedianUdaf());

            sqlContext.sql("select generate(1,10)").show();
            // Explode the array into one row per value so the aggregate actually runs.
            // Expected result: 5.5 (median of 1..10).
            sqlContext.sql("select media(n) from (select explode(generate(1,10)) as n) t").show();
        } finally {
            // Release the local Spark context even if a query fails.
            spark.stop();
        }
    }
}

到此,相信大家对“spark怎么编写udaf函数求中位数”有了更深的了解,不妨来实际操作一番吧!这里是千家信息网,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0