Flume怎么自定义Event Serializer序列化类
发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,这篇文章主要介绍"Flume怎么自定义Event Serializer序列化类",在日常操作中,相信很多人在Flume怎么自定义Event Serializer序列化类问题上存在疑惑,小编查阅了各式资
千家信息网最后更新 2025年01月23日Flume怎么自定义Event Serializer序列化类
这篇文章主要介绍"Flume怎么自定义Event Serializer序列化类",在日常操作中,相信很多人在Flume怎么自定义Event Serializer序列化类问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Flume怎么自定义Event Serializer序列化类"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
把日志从flume打到hbase中,但是我们的日志由于前期是存到MongoDb中的,所以都是Json格式的日志,这时候使用flume自带的SimpleHbaseEventSerializer和RegexHbaseEventSerializer这样的就不行了,于是开始痛苦的看源码,自己写序列化的类(这里需要注意,如果是在flume的hbasesink包下编写的代码,License信息一定要加上。就是最上面那段英文,要不然在运行的时候会报错),比较简单,编写好类之后,编译打包,传到flume的lib目录下,然后在配置agent的时候指定Serializer的类为编写的类即可。下面是代码(类注释没贴出来,见谅哈):
public class PRTMSAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer { private byte[] table;//hbase表 private byte[] cf;//列簇 private byte[][] payload;//列集合 private byte[][] payloadColumn;//列值 private byte[] incrementColumn; private String rowSuffix;//roykey后缀 private String rowPrefix;//rowkey前缀 private byte[] incrementRow; private KeyType keyType;//rowkey后缀类型 private static final Logger logger = LoggerFactory.getLogger(PRTMSAsyncHbaseEventSerializer.class); @Override public void configure(Context context) { // TODO Auto-generated method stub //设置主键后缀类型,这里使用时间戳 keyType = KeyType.TS; if (iCol != null && !iCol.isEmpty()) { incrementColumn = iCol.getBytes(Charsets.UTF_8); } incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); } @Override public void configure(ComponentConfiguration conf) { // TODO Auto-generated method stub } @Override public void initialize(byte[] table, byte[] cf) { // TODO Auto-generated method stub this.table = table; this.cf = cf; } /** * * @Title: setEvent * @Description: 获取日志信息,并解析出HBase的列以及列的value值 * @param event * @throws * @see org.apache.flume.sink.hbase.AsyncHbaseEventSerializer#setEvent(org.apache.flume.Event) */ @Override public void setEvent(Event event) { // TODO Auto-generated method stub //获取日志信息 String log = new String(event.getBody(), StandardCharsets.UTF_8); //headers包含日志中项目编号和host信息 Mapheaders = event.getHeaders(); JsonReader jsonReader = new JsonReader(new StringReader(log)); String name = ""; String value = ""; String path = ""; Map kv = new HashMap (); try { //解析日志中的键值对缓存到map中 jsonReader.beginObject(); while (jsonReader.hasNext()) { name = jsonReader.nextName(); value = jsonReader.nextString(); if(name.equals("uri")) path = value.split(" ")[1]; kv.put(name, value); } jsonReader.endObject(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } //解析headers中的项目id和服务host、路径 if(path.contains("?")){ path = path.substring(0, path.indexOf("?")); } String pcode = headers.get("pcode"); String host = headers.get("host"); //将项目编号和服务器host添加到map中 kv.put("pcode",pcode); kv.put("host", host); //初始化列和value数组 this.payloadColumn = new byte[kv.keySet().size()][]; this.payload = new byte[kv.keySet().size()][]; int i = 0; //给hbase的列和value赋值 for (String key : kv.keySet()) { this.payloadColumn[i] = key.getBytes(); this.payload[i] = kv.get(key).getBytes(); i++; } //设置rowkey的前缀 格式是项目编号+路径 this.rowSuffix = new StringBuilder(pcode).reverse().toString() + ":"+path+":"+kv.get("time"); } @Override public List getActions() { // TODO Auto-generated method stub List actions = new ArrayList (); if (payloadColumn != null) { byte[] rowKey; try { rowKey = rowSuffix.getBytes(); // for 循环,提交所有列和对于数据的put请求。 for (int i = 0; i < this.payload.length; i++) { PutRequest putRequest = new PutRequest(table, rowKey, cf, payloadColumn[i], payload[i]); actions.add(putRequest); } } catch (Exception e) { throw new FlumeException("Could not get row key!", e); } } return actions; } @Override public List getIncrements() { // TODO Auto-generated method stub List actions = new ArrayList (); if (incrementColumn != null) { AtomicIncrementRequest inc = new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn); actions.add(inc); } return actions; } @Override public void cleanUp() { // TODO Auto-generated method stub }}
到此,关于"Flume怎么自定义Event Serializer序列化类"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!
日志
序列
信息
项目
学习
后缀
代码
前缀
时候
更多
格式
类型
路径
帮助
服务
不行
实用
痛苦
接下来
下编
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库00955
网络安全生物安全总体国家安全观
网络安全有多火
网络技术工程师是什么证书
空间数据库的建库规范
搭建一个ftp服务器
eve架设服务器
三级网络技术dhcp参数
公司电脑与服务器怎样联接
山西便民平台软件开发
渔乐服务器
数据库管理软件 知乎
软件开发最基本
数据库 link
几百个数据库
无锡加工软件开发价格实惠
美团数据库技术
服务器密码管理办法
甘肃专业网络技术服务优势
保定快运客软件开发公司
软件开发开票的综合税率
网络地址和服务器地址一样吗
幼儿网络安全拍手歌
工业园区管理软件开发诚信合作
百度移信网络技术公司
数据库 link
架设aaa服务器
广州市互联网科技公司
查数据库配置
全球影视链软件开发