千家信息网

Flume怎么自定义Event Serializer序列化类

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,这篇文章主要介绍"Flume怎么自定义Event Serializer序列化类",在日常操作中,相信很多人在Flume怎么自定义Event Serializer序列化类问题上存在疑惑,小编查阅了各式资
千家信息网最后更新 2025年01月23日Flume怎么自定义Event Serializer序列化类

这篇文章主要介绍"Flume怎么自定义Event Serializer序列化类",在日常操作中,相信很多人在Flume怎么自定义Event Serializer序列化类问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Flume怎么自定义Event Serializer序列化类"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

把日志从flume打到hbase中,但是我们的日志由于前期是存到MongoDb中的,所以都是Json格式的日志,这时候使用flume自带的SimpleHbaseEventSerializer和RegexHbaseEventSerializer这样的就不行了,于是开始痛苦的看源码,自己写序列化的类(这里需要注意,如果是在flume的hbasesink包下编写的代码,License信息一定要加上。就是最上面那段英文,要不然在运行的时候会报错),比较简单,编写好类之后,编译打包,传到flume的lib目录下,然后在配置agent的时候指定Serializer的类为编写的类即可。下面是代码(类注释没贴出来,见谅哈):

public class PRTMSAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {        private byte[] table;//hbase表        private byte[] cf;//列簇        private byte[][] payload;//列集合        private byte[][] payloadColumn;//列值        private byte[] incrementColumn;        private String rowSuffix;//roykey后缀        private String rowPrefix;//rowkey前缀        private byte[] incrementRow;        private KeyType keyType;//rowkey后缀类型         private static final Logger logger = LoggerFactory.getLogger(PRTMSAsyncHbaseEventSerializer.class);        @Override        public void configure(Context context) {                // TODO Auto-generated method stub                //设置主键后缀类型,这里使用时间戳                keyType = KeyType.TS;                if (iCol != null && !iCol.isEmpty()) {                        incrementColumn = iCol.getBytes(Charsets.UTF_8);                }                incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);        }        @Override        public void configure(ComponentConfiguration conf) {                // TODO Auto-generated method stub        }        @Override        public void initialize(byte[] table, byte[] cf) {                // TODO Auto-generated method stub                this.table = table;                this.cf = cf;        }        /**         *          * @Title: setEvent          * @Description: 获取日志信息,并解析出HBase的列以及列的value值          * @param event            * @throws          * @see org.apache.flume.sink.hbase.AsyncHbaseEventSerializer#setEvent(org.apache.flume.Event)         */        @Override        public void setEvent(Event event) {                // TODO Auto-generated method stub                //获取日志信息                String log = new String(event.getBody(), StandardCharsets.UTF_8);                //headers包含日志中项目编号和host信息                Map headers = event.getHeaders();                JsonReader jsonReader = new JsonReader(new StringReader(log));                String name = "";                String value = "";                String path = "";                Map kv = new HashMap();                try {                        //解析日志中的键值对缓存到map中                        jsonReader.beginObject();                        while (jsonReader.hasNext()) {                                name = jsonReader.nextName();                                value = jsonReader.nextString();                                if(name.equals("uri"))                                        path = value.split(" ")[1];                                kv.put(name, value);                        }                        jsonReader.endObject();                } catch (IOException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                }                //解析headers中的项目id和服务host、路径                if(path.contains("?")){                        path = path.substring(0, path.indexOf("?"));                }                String pcode = headers.get("pcode");                String host = headers.get("host");                //将项目编号和服务器host添加到map中                kv.put("pcode",pcode);                kv.put("host", host);                //初始化列和value数组                this.payloadColumn = new byte[kv.keySet().size()][];                this.payload = new byte[kv.keySet().size()][];                int i = 0;                //给hbase的列和value赋值                for (String key : kv.keySet()) {                        this.payloadColumn[i] = key.getBytes();                        this.payload[i] = kv.get(key).getBytes();                        i++;                }                //设置rowkey的前缀 格式是项目编号+路径                                this.rowSuffix = new StringBuilder(pcode).reverse().toString() + ":"+path+":"+kv.get("time");        }                @Override        public List getActions() {                // TODO Auto-generated method stub                List actions = new ArrayList();                if (payloadColumn != null) {                        byte[] rowKey;                        try {                                rowKey = rowSuffix.getBytes();                                // for 循环,提交所有列和对于数据的put请求。                                for (int i = 0; i < this.payload.length; i++) {                                        PutRequest putRequest = new PutRequest(table, rowKey, cf, payloadColumn[i], payload[i]);                                        actions.add(putRequest);                                }                        } catch (Exception e) {                                throw new FlumeException("Could not get row key!", e);                        }                }                return actions;        }        @Override        public List getIncrements() {                // TODO Auto-generated method stub                List actions = new ArrayList();                if (incrementColumn != null) {                        AtomicIncrementRequest inc = new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn);                        actions.add(inc);                }                return actions;        }        @Override        public void cleanUp() {                // TODO Auto-generated method stub        }}

到此,关于"Flume怎么自定义Event Serializer序列化类"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

0