
How to write a custom EventSerializer class for the HDFS sink in Flume

Published: 2025-02-07 Author: 千家信息网 editor

This article explains how to write a custom EventSerializer class for the HDFS sink in Flume. It turned out to be quite practical, so it is shared here for reference.

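I had already written a serializer class for the HBase sink, so I expected the HDFS version to be simple. It turned out to work differently. The HDFS sink does not expose the serializer class as directly: the serializer is resolved by the writer class that matches the configured fileType. We use the DataStream file type, whose writer class is HDFSDataStream. That class defaults to the TEXT serializer (TEXT is a constant of an enum type):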

serializerType = context.getString("serializer", "TEXT");

The enum looks like this:

public enum EventSerializerType {
    TEXT(BodyTextEventSerializer.Builder.class),
    HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class),
    AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class),
    CUSTOM(CUSTOMEventSerializer.Builder.class), // the custom serializer class
    OTHER(null);

    private final Class<? extends EventSerializer.Builder> builderClass;

    EventSerializerType(Class<? extends EventSerializer.Builder> builderClass) {
        this.builderClass = builderClass;
    }

    public Class<? extends EventSerializer.Builder> getBuilderClass() {
        return builderClass;
    }
}
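To make the role of the enum concrete, here is a minimal JDK-only sketch of a name-to-builder lookup. It is a simplified stand-in and not Flume's source: the class names SerializerLookupDemo, TextBuilder and CustomBuilder are hypothetical, and the fallback to a class name for values outside the enum is modeled on how I understand Flume's serializer factory to treat the OTHER constant.

```java
import java.util.Locale;

// Simplified stand-in for the serializer lookup; NOT Flume's actual code.
final class SerializerLookupDemo {

    // Hypothetical placeholders for the real *.Builder classes.
    static final class TextBuilder {}
    static final class CustomBuilder {}

    enum SerializerType {
        TEXT(TextBuilder.class),
        CUSTOM(CustomBuilder.class),
        OTHER(null);

        private final Class<?> builderClass;

        SerializerType(Class<?> builderClass) {
            this.builderClass = builderClass;
        }

        Class<?> getBuilderClass() {
            return builderClass;
        }
    }

    // Try the configured value as an enum name first (case-insensitive);
    // if it is not an enum name, treat it as a fully-qualified class name.
    static Class<?> resolve(String configured) {
        SerializerType type;
        try {
            type = SerializerType.valueOf(configured.toUpperCase(Locale.ENGLISH));
        } catch (IllegalArgumentException e) {
            type = SerializerType.OTHER;
        }
        Class<?> builder = type.getBuilderClass();
        if (builder != null) {
            return builder;
        }
        try {
            return Class.forName(configured);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown serializer: " + configured, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("custom").getSimpleName());
        System.out.println(resolve("java.lang.StringBuilder").getName());
    }
}
```

The point of the sketch is that adding a constant such as CUSTOM only registers a short alias for a builder class; the lookup itself stays unchanged.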

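I added the custom serializer class and a matching enum constant there. When configuring the agent, set fileType and serializer accordingly. As with the HBase serializer, the modified code has to be compiled and the resulting jar uploaded. Note that the Flume User Guide also documents setting serializer to the fully-qualified class name of an EventSerializer.Builder implementation, which avoids patching the enum.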
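As a rough illustration of that agent configuration, the fragment below shows the relevant sink properties. The agent name a1, the sink name k1 and the HDFS path are placeholders that do not come from the original setup; serializer.appendNewline is assumed to reach the serializer through the serializer. sub-context, the same way it does for the built-in TEXT serializer.

```properties
# Hypothetical agent (a1) and sink (k1) names; the path is a placeholder.
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://namenode/flume/logs
# DataStream selects the HDFSDataStream writer
a1.sinks.k1.hdfs.fileType = DataStream
# Enum constant added above; matched case-insensitively
a1.sinks.k1.serializer = CUSTOM
# Passed to the serializer's Context as "appendNewline"
a1.sinks.k1.serializer.appendNewline = true
```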

The custom serializer class looks like this:

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.EventSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Assumes Jackson 2; with Jackson 1 use org.codehaus.jackson.map.ObjectMapper instead
import com.fasterxml.jackson.databind.ObjectMapper;

public class CUSTOMEventSerializer implements EventSerializer {

    private static final Logger logger = LoggerFactory.getLogger(CUSTOMEventSerializer.class);

    // Column separator (Ctrl-A)
    private static final String SPLITCHAR = "\001";

    // For legacy reasons, by default, append a newline to each event written out
    private static final String APPEND_NEWLINE = "appendNewline";
    private static final boolean APPEND_NEWLINE_DFLT = true;

    // ObjectMapper is thread-safe once configured, so one shared instance is enough
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final OutputStream out;
    private final boolean appendNewline;

    private CUSTOMEventSerializer(OutputStream out, Context ctx) {
        this.appendNewline = ctx.getBoolean(APPEND_NEWLINE, APPEND_NEWLINE_DFLT);
        this.out = out;
    }

    @Override
    public boolean supportsReopen() {
        return true;
    }

    @Override
    public void afterCreate() {
        // noop
    }

    @Override
    public void afterReopen() {
        // noop
    }

    @Override
    public void beforeClose() {
        // noop
    }

    @Override
    public void write(Event e) throws IOException {
        // Raw log line carried in the event body
        String log = new String(e.getBody(), StandardCharsets.UTF_8);
        logger.debug("log: {}", log);
        // The headers carry the project code and the host of the log
        Map<String, String> headers = e.getHeaders();
        String parsedLog = parseJson2Value(log, headers);
        logger.debug("values: {}", parsedLog);
        // Encode explicitly as UTF-8 instead of relying on the platform default
        out.write(parsedLog.getBytes(StandardCharsets.UTF_8));
        if (appendNewline) {
            out.write('\n');
        }
    }

    /**
     * Extracts the values from a JSON-formatted log line.
     *
     * @param log     the log line in JSON format
     * @param headers the event headers
     * @return the values joined with the column separator
     */
    private String parseJson2Value(String log, Map<String, String> headers) {
        // String.replace returns a new string, so the result must be assigned.
        // Note that this also rewrites JSON escape sequences such as \".
        log = log.replace("\\", "/");
        String time = "";
        String path = "";
        StringBuilder values = new StringBuilder();
        try {
            @SuppressWarnings("unchecked")
            Map<String, Object> m = OBJECT_MAPPER.readValue(log, Map.class);
            for (Map.Entry<String, Object> entry : m.entrySet()) {
                String key = entry.getKey();
                Object value = entry.getValue();
                if ("uri".equals(key)) {
                    // Reduce the request URI to the access path
                    path = parseUriToPath(String.valueOf(value));
                }
                if ("time".equals(key)) {
                    // Keep what follows the first 10 characters; guard against short values
                    String t = String.valueOf(value);
                    time = t.length() > 10 ? t.substring(10) : "";
                }
                values.append(value).append(SPLITCHAR);
            }
        } catch (IOException e) {
            // JsonParseException and JsonMappingException are both IOExceptions
            logger.error("Failed to parse log as JSON: " + log, e);
        }
        // Project code and service host come from the headers
        String pcode = headers.get("pcode");
        String host = headers.get("host");
        values.append(path).append(SPLITCHAR)
              .append(pcode).append(SPLITCHAR)
              .append(host).append(SPLITCHAR)
              .append(time).append(SPLITCHAR);
        return values.toString();
    }

    @Override
    public void flush() throws IOException {
        // noop
    }

    public static class Builder implements EventSerializer.Builder {
        @Override
        public EventSerializer build(Context context, OutputStream out) {
            return new CUSTOMEventSerializer(out, context);
        }
    }

    /**
     * Converts a request URI into the concrete access path.
     *
     * @param uri the request URI
     * @return the access path
     */
    protected String parseUriToPath(String uri) {
        if (uri == null || uri.trim().isEmpty()) {
            return uri;
        }
        // Drop everything before the first slash (for example the HTTP method)
        int index = uri.indexOf("/");
        if (index > -1) {
            uri = uri.substring(index);
        }
        // Drop the query string
        index = uri.indexOf("?");
        if (index > -1) {
            uri = uri.substring(0, index);
        }
        // Drop path parameters such as ;jsessionid=...
        index = uri.indexOf(";");
        if (index > -1) {
            uri = uri.substring(0, index);
        }
        // Drop the trailing protocol marker, with or without a leading space
        index = uri.indexOf(" HTTP/1.1");
        if (index > -1) {
            uri = uri.substring(0, index);
        }
        index = uri.indexOf("HTTP/1.1");
        if (index > -1) {
            uri = uri.substring(0, index);
        }
        return uri;
    }
}
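The two string-handling steps of the serializer, reducing a request line to its path and joining values with the \001 separator, can be tried without a Flume installation. The following is a minimal JDK-only sketch; the class name DelimitedLineDemo, the toLine helper and the sample request lines are made up for illustration and are not part of the original code.

```java
import java.util.Arrays;
import java.util.List;

// JDK-only sketch of the serializer's string handling; names are hypothetical.
final class DelimitedLineDemo {

    static final String SPLIT_CHAR = "\001";

    // Same steps as the serializer's URI helper: cut before the first '/',
    // then cut at '?', ';' and the HTTP/1.1 protocol marker.
    static String parseUriToPath(String uri) {
        if (uri == null || uri.trim().isEmpty()) {
            return uri;
        }
        int index = uri.indexOf("/");
        if (index > -1) {
            uri = uri.substring(index);
        }
        for (String marker : new String[] {"?", ";", " HTTP/1.1", "HTTP/1.1"}) {
            index = uri.indexOf(marker);
            if (index > -1) {
                uri = uri.substring(0, index);
            }
        }
        return uri;
    }

    // Join values so that every value, including the last, is followed by the separator.
    static String toLine(List<String> values, boolean appendNewline) {
        StringBuilder sb = new StringBuilder();
        for (String value : values) {
            sb.append(value).append(SPLIT_CHAR);
        }
        if (appendNewline) {
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String path = parseUriToPath("GET /index.html?x=1 HTTP/1.1");
        String line = toLine(Arrays.asList(path, "p001", "web01"), true);
        // Make the separator visible when printing
        System.out.print(line.replace(SPLIT_CHAR, "|"));
    }
}
```

Because each value is followed by the separator, every line ends with a trailing \001 before the newline, which matters when defining the column layout of whatever reads the files.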
