千家信息网

Hive 混合函数 UDTF UDF UDAF详解

发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,混合函数可以使用java中的方法java_method(class,method[,arg1[,arg2...]])或者reflect。Hive版本1.2.1。UDTF 用户定义表函数(表函数):一行变成多行。
千家信息网最后更新 2025年02月01日Hive 混合函数 UDTF UDF UDAF详解

混合函数可以使用java中的方法java_method(class,method[,arg1[,arg2...]])或者reflect

Hive版本1.2.1

UDTF 用户定义表函数(表函数)一行变成多行配合lateral view

hive的Lateral view

http://blog.sina.com.cn/s/blog_7e04e0d00101csic.html


UDF 重写evaluate方法 Map端

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

/**
 * Hive UDF that returns true when the first numeric argument is strictly
 * greater than the second. Executed on the map side; Hive calls
 * {@code evaluate} once per row.
 */
public class udftest extends UDF {

    /**
     * Compares two textual numbers.
     *
     * @param t1 first value, parsed as a double; may be null
     * @param t2 second value, parsed as a double; may be null
     * @return true iff both arguments are non-null, parseable numbers
     *         and t1 &gt; t2; false otherwise
     */
    public boolean evaluate(Text t1, Text t2) {
        if (t1 == null || t2 == null) {
            return false;
        }
        try {
            double d1 = Double.parseDouble(t1.toString());
            double d2 = Double.parseDouble(t2.toString());
            return d1 > d2;
        } catch (NumberFormatException e) {
            // Malformed numeric input: treat the row as "not greater"
            // rather than failing the whole query with a runtime exception.
            return false;
        }
    }
}


函数打包成function.jar

hive命令行

add jar /home/jar/function.jar;   -- jar包进入分布式缓存
create temporary function bigthan as 'com.peixun.udf.udftest';   -- 创建临时函数bigthan


UDAF (user defined aggregation function)用户自定义聚合函数





自定义UDAF统计b字段大于30的记录个数 countbigthan(b,30)实现代码

import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;import org.apache.hadoop.hive.ql.metadata.HiveException;import org.apache.hadoop.hive.ql.parse.SemanticException;import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;import org.apache.hadoop.io.LongWritable;//继承类型检查类public class udaftest extends AbstractGenericUDAFResolver {    // 参数个数判断    @Override    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)            throws SemanticException {        if (parameters.length != 2) {            throw new UDFArgumentTypeException(parameters.length - 1,                    "Exactly two argument is expected");        }        return new GenericUDAFCountBigThanEvaluator();// 返回处理逻辑类    }    // 处理逻辑类    public static class GenericUDAFCountBigThanEvaluator extends            GenericUDAFEvaluator {        private LongWritable result;        private PrimitiveObjectInspector inputOI1;        private PrimitiveObjectInspector inputOI2;        // init方法map,reduce阶段都得执行        // map阶段parameters长度与UDAF输入的参数个数有关        // reduce阶段,parameters长度为1        @Override        public ObjectInspector init(Mode m, ObjectInspector[] parameters)                throws HiveException {            result = new LongWritable(0);            inputOI1 = (PrimitiveObjectInspector) parameters[0];            if (parameters.length > 1) {                inputOI2 = (PrimitiveObjectInspector) parameters[1];            }            return PrimitiveObjectInspectorFactory.writableLongObjectInspector;            // 最终结果返回类型   
     }        @Override        public AggregationBuffer getNewAggregationBuffer() throws HiveException {            CountAgg agg = new CountAgg();// 存放部分聚合值            reset(agg);            return agg;        }        // 缓存对象初始化        @Override        public void reset(AggregationBuffer agg) throws HiveException {            CountAgg countagg = (CountAgg) agg;            countagg.count = 0;        }        // 具体逻辑        // iterate只在map端运算        @Override        public void iterate(AggregationBuffer agg, Object[] parameters)                throws HiveException {            assert (parameters.length == 2);            if (parameters == null || parameters[0] == null                    || parameters[1] == null) {                return;            }            double base = PrimitiveObjectInspectorUtils.getDouble(                    parameters[0], inputOI1);            double tmp = PrimitiveObjectInspectorUtils.getDouble(parameters[1],                    inputOI2);            if (base > tmp) {                ((CountAgg) agg).count++;            }        }        // map阶段返回部分结果        @Override        public Object terminatePartial(AggregationBuffer agg)                throws HiveException {            result.set(((CountAgg) agg).count);            return result;        }        // 合并部分结果 map(含有Combiner)和reduce都执行,parial传递terminatePartial得到的部分结果        @Override        public void merge(AggregationBuffer agg, Object partial)                throws HiveException {            if (partial != null) {                long p = PrimitiveObjectInspectorUtils.getLong(partial,                        inputOI1);                ((CountAgg) agg).count += p;            }        }        @Override        public Object terminate(AggregationBuffer agg) throws HiveException {            result.set(((CountAgg) agg).count);            return result;        }        public class CountAgg implements AggregationBuffer {            long count;        }    }}


hive注册永久函数三种方法

hive shell每次启动都会默认执行$HOME/.hiverc文件

0