flume中hdfssink如何自定义EventSerializer序列化类
发表于:2025-02-07 作者:千家信息网编辑
千家信息网最后更新 2025年02月07日,这篇文章将为大家详细讲解有关flume中hdfssink如何自定义EventSerializer序列化类,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。因为之前做了h
千家信息网最后更新 2025年02月07日flume中hdfssink如何自定义EventSerializer序列化类
这篇文章将为大家详细讲解有关flume中hdfssink如何自定义EventSerializer序列化类,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
因为之前做了hbasesink的序列化类,觉得写hdfs的应该会很简单,可是没想到竟然不一样。hdfs并没有直接配置序列化类的选项需要根据fileType来选择对相应序列化类,我们使用的datastream的类型,对应的类是HDFSDataStream,这个类默认的序列化类TEXT(这是个枚举类型)
serializerType = context.getString("serializer", "TEXT");
枚举的类如下:
public enum EventSerializerType { TEXT(BodyTextEventSerializer.Builder.class), HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class), AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class), CUSTOM(CUSTOMEventSerializer.Builder.class),//自定义的序列化类 OTHER(null); private final Class extends EventSerializer.Builder> builderClass; EventSerializerType(Class extends EventSerializer.Builder> builderClass) { this.builderClass = builderClass; } public Class extends EventSerializer.Builder> getBuilderClass() { return builderClass; }}
在里面加了自定义的类型和枚举,在配置agent的时候配置好filetype和serializer即可,同样需要编译上传。
自定义的序列化类如下:
public class CUSTOMEventSerializer implements EventSerializer { private final static Logger logger = LoggerFactory.getLogger(CUSTOMEventSerializer.class); private final String SPLITCHAR = "\001";//列分隔符 // for legacy reasons, by default, append a newline to each event written // out private final String APPEND_NEWLINE = "appendNewline"; private final boolean APPEND_NEWLINE_DFLT = true; private final OutputStream out; private final boolean appendNewline; private CUSTOMEventSerializer(OutputStream out, Context ctx) { this.appendNewline = ctx.getBoolean(APPEND_NEWLINE, APPEND_NEWLINE_DFLT); this.out = out; } @Override public boolean supportsReopen() { return true; } @Override public void afterCreate() { // noop } @Override public void afterReopen() { // noop } @Override public void beforeClose() { // noop } @Override public void write(Event e) throws IOException { // 获取日志信息 String log = new String(e.getBody(), StandardCharsets.UTF_8); logger.info("-----------logs-------" + log); // headers包含日志中项目编号和host信息 Mapheaders = e.getHeaders(); String parsedLog = parseJson2Value(log, headers); out.write(parsedLog.getBytes()); logger.info("-----------values-------" + parsedLog); logger.info("-----------valueSSSSSS-------" + parsedLog.getBytes()); out.write('\n'); } /** * * @Title: parseJson2Value * @Description: 解析出json日志中的value。 * @param log json格式日志 * @param headers event头信息 * @return * @return String 解析后的日志 * @throws */ private String parseJson2Value(String log, Map headers) { log.replace("\\", "/"); String time = ""; String path = ""; Object value = ""; StringBuilder values = new StringBuilder(); ObjectMapper objectMapper = new ObjectMapper(); try { Map m = objectMapper.readValue(log, Map.class); for(String key:m.keySet()){ value = m.get(key); if (key.equals("uri")){ //解析访问路径 path = pasreUriToPath(value.toString()); } if(key.equals("time")){ time = value.toString().substring(10); } values.append(value).append(this.SPLITCHAR); } } catch (JsonParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (JsonMappingException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } // 解析headers中的项目编号和服务host String pcode = headers.get("pcode"); String host = headers.get("host"); values.append(path).append(this.SPLITCHAR). append(pcode).append(this.SPLITCHAR). append(host).append(this.SPLITCHAR). append(time).append(this.SPLITCHAR); //value字符串 return values.toString(); } @Override public void flush() throws IOException { // noop } public static class Builder implements EventSerializer.Builder { @Override public EventSerializer build(Context context, OutputStream out) { CUSTOMEventSerializer s = new CUSTOMEventSerializer(out, context); return s; } } /** * 把请求uri转换成具体的访问路径 * * @param uri 请求uri * @return 访问路径 */ protected String pasreUriToPath(String uri){ if(uri == null || "".equals(uri.trim())){ return uri; } int index = uri.indexOf("/"); if(index > -1){ uri = uri.substring(index); } index = uri.indexOf("?"); if(index > -1){ uri = uri.substring(0, index); } index = uri.indexOf(";"); if(index > -1){ uri = uri.substring(0, index); } index = uri.indexOf(" HTTP/1.1"); if(index > -1){ uri = uri.substring(0, index); } index = uri.indexOf("HTTP/1.1"); if(index > -1){ uri = uri.substring(0, index); } return uri; }}
关于"flume中hdfssink如何自定义EventSerializer序列化类"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
序列
日志
信息
篇文章
类型
路径
配置
更多
项目
不错
实用
没想到
内容
分隔符
字符
字符串
文章
时候
格式
知识
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
cam软件开发的相关书籍
中兴长沙软件开发
福州网络安全培训机构实战教学
江宁路街道服务器电脑回收上门
科技互联网类软文
软件开发与数据库的关系
软件开发分支与代码合并
字节跳动网络安全问题
海淀区智能网络技术诚信经营
app服务器配置要求
学校网络安全制度及领导分工
无线网络安全策略
青白江软件开发找哪家
剑桥晶体数据库网址
软件开发费 多少钱 人天
方舟维姆服务器
期刊可以收录到不同数据库中吗
网络安全可信接入
杭州青鸾网络技术有限公司电话
两个串口服务器的配置
关系型数据库未来发展技术
国泰安数字经济研究数据库
网络技术研发和标准
服务器主板没有声卡
织梦栏目这哪个数据库
软件开发项目都有什么工作
计算机国家网络安全
网站网络安全工作责任书
新版mimic数据库
文明重启什么服务器不能抄家