千家信息网

flume中如何自定义Interceptor

发表于:2024-11-26 作者:千家信息网编辑
千家信息网最后更新 2024年11月26日,这期内容当中小编将会给大家带来有关flume中如何自定义Interceptor,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。flume现状:这种比较个性化的转换flume没有相关插件。
千家信息网最后更新 2024年11月26日flume中如何自定义Interceptor

这期内容当中小编将会给大家带来有关flume中如何自定义Interceptor,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

flume现状:

对于这种比较个性化的转换(把逗号分隔的广告 referer 日志改写为制表符分隔,并从 user-agent 中解析出操作系统、浏览器及其版本),flume 没有现成的插件可用。

分析:

当 source 读取的是文本文件时,flume 会把每一行封装成一个 event(默认单行长度上限为 2048 个字符)。

拦截器正是以 event 为单位进行处理的,因此可以在拦截器中逐行完成上述格式转换。

代码:

package com.wy.flume.interceptor;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

/**
 * Flume interceptor that rewrites a comma-separated ad-referer log line into a tab-separated line.
 *
 * <p>The event body is decoded as UTF-8 and split on the first seven commas. The first seven fields
 * are copied through unchanged. The remainder is expected to look like {@code "url" "user-agent"...}.
 * From it the interceptor derives four extra columns:
 * url, operating system, browser, and browser version.
 *
 * <p>Output body (UTF-8): {@code f0\tf1\tf2\tf3\tf4\tf5\tf6\turl\tos\tbrowser\tversion}.
 * When the remainder does not match the expected shape, url and version are empty and os/browser
 * are {@code "others"}. Lines with fewer than eight comma-separated fields are passed through
 * unchanged instead of throwing.
 */
public class AdRefererLogFormatInterceptor implements Interceptor {

    /** Number of comma-separated fields; the last one holds the quoted url and user agent. */
    private static final int FIELD_COUNT = 8;

    /** Matches {@code "<url>" "<agent>"<rest>}: group 1 is the url, group 2 the user agent. */
    private static final Pattern REFERER_AGENT_PATTERN = Pattern.compile("^\"(.*)\"\\s\"(.*)\"(.*)$");

    /** Operating-system rules, tried in insertion order (most specific first); compiled once. */
    private static final List<UaRule> PLATFORM_RULES = compileRules(initPlatforms(), "");

    /**
     * Browser rules, tried in insertion order (most specific first). Each pattern captures the
     * first run of digits/dots after the browser token as the version in group 1.
     */
    private static final List<UaRule> BROWSER_RULES = compileRules(initBrowsers(), ".*?([0-9\\.]+)");

    /** A precompiled user-agent rule and the label reported when it matches. */
    private static final class UaRule {
        final Pattern pattern;
        final String label;

        UaRule(Pattern pattern, String label) {
            this.pattern = pattern;
            this.label = label;
        }
    }

    /**
     * Returns the user-agent substring to OS label table.
     *
     * <p>A {@link LinkedHashMap} is required: generic keys such as "windows" or "ppc" are
     * substrings of more specific ones. Only insertion order guarantees the specific key is tried first.
     */
    private static Map<String, String> initPlatforms() {
        Map<String, String> platforms = new LinkedHashMap<>();
        platforms.put("windows nt 6.2", "Win8");
        platforms.put("windows nt 6.1", "Win7");
        platforms.put("windows nt 6.0", "Win Longhorn");
        platforms.put("windows nt 5.2", "Win2003");
        platforms.put("windows nt 5.0", "Win2000");
        platforms.put("windows nt 5.1", "WinXP");
        platforms.put("windows nt 4.0", "Windows NT 4.0");
        platforms.put("winnt4.0", "Windows NT 4.0");
        platforms.put("winnt 4.0", "Windows NT");
        platforms.put("winnt", "Windows NT");
        platforms.put("windows 98", "Win98");
        platforms.put("win98", "Win98");
        platforms.put("windows 95", "Win95");
        platforms.put("win95", "Win95");
        platforms.put("windows", "Unknown Windows OS");
        platforms.put("os x", "MacOS X");
        platforms.put("ppc mac", "Power PC Mac");
        platforms.put("freebsd", "FreeBSD");
        platforms.put("ppc", "Macintosh");
        platforms.put("linux", "Linux");
        platforms.put("debian", "Debian");
        platforms.put("sunos", "Sun Solaris");
        platforms.put("beos", "BeOS");
        platforms.put("apachebench", "ApacheBench");
        platforms.put("aix", "AIX");
        platforms.put("irix", "Irix");
        platforms.put("osf", "DEC OSF");
        platforms.put("hp-ux", "HP-UX");
        platforms.put("netbsd", "NetBSD");
        platforms.put("bsdi", "BSDi");
        platforms.put("openbsd", "OpenBSD");
        platforms.put("gnu", "GNU/Linux");
        platforms.put("unix", "Unknown Unix OS");
        return platforms;
    }

    /**
     * Returns the user-agent token to browser label table.
     *
     * <p>Insertion order matters: agents such as Chrome's also contain "Safari" and "Mozilla".
     * The more specific tokens must therefore be tried before those generic ones.
     */
    private static Map<String, String> initBrowsers() {
        Map<String, String> browsers = new LinkedHashMap<>();
        browsers.put("Flock", "Flock");
        browsers.put("Chrome", "Chrome");
        browsers.put("Opera", "Opera");
        browsers.put("MSIE", "IE");
        browsers.put("Internet Explorer", "IE");
        browsers.put("Shiira", "Shiira");
        browsers.put("Firefox", "Firefox");
        browsers.put("Chimera", "Chimera");
        browsers.put("Phoenix", "Phoenix");
        browsers.put("Firebird", "Firebird");
        browsers.put("Camino", "Camino");
        browsers.put("Netscape", "Netscape");
        browsers.put("OmniWeb", "OmniWeb");
        browsers.put("Safari", "Safari");
        browsers.put("Mozilla", "Mozilla");
        browsers.put("Konqueror", "Konqueror");
        browsers.put("icab", "iCab");
        browsers.put("Lynx", "Lynx");
        browsers.put("Links", "Links");
        browsers.put("hotjava", "HotJava");
        browsers.put("amaya", "Amaya");
        browsers.put("IBrowse", "IBrowse");
        return browsers;
    }

    /**
     * Compiles each table key into a case-insensitive pattern, keeping the table's order.
     *
     * @param labels ordered mapping from literal user-agent token to reported label
     * @param suffix regex appended after the quoted token (e.g. a version capture group)
     * @return an unmodifiable list of compiled rules in the same order as {@code labels}
     */
    private static List<UaRule> compileRules(Map<String, String> labels, String suffix) {
        List<UaRule> rules = Lists.newArrayListWithCapacity(labels.size());
        for (Map.Entry<String, String> entry : labels.entrySet()) {
            Pattern compiled = Pattern.compile(Pattern.quote(entry.getKey()) + suffix, Pattern.CASE_INSENSITIVE);
            rules.add(new UaRule(compiled, entry.getValue()));
        }
        return Collections.unmodifiableList(rules);
    }

    /** Instances are created through {@link Builder}. */
    private AdRefererLogFormatInterceptor() {
    }

    @Override
    public void initialize() {
        // NO-OP: all lookup tables are built statically.
    }

    @Override
    public void close() {
        // NO-OP: no resources are held.
    }

    /**
     * Rewrites the event body in place as described in the class documentation.
     *
     * @param event the event whose UTF-8 body is reformatted
     * @return the same event, with its body replaced; returned unchanged if the body has fewer
     *     than {@value #FIELD_COUNT} comma-separated fields
     */
    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), Charsets.UTF_8);
        String[] fields = body.split(",", FIELD_COUNT);
        if (fields.length < FIELD_COUNT) {
            // Malformed line: pass it through rather than throwing, which would fail the whole batch.
            return event;
        }

        StringBuilder sb = new StringBuilder(body.length() + 32);
        for (int i = 0; i < FIELD_COUNT - 1; i++) {
            sb.append(fields[i]).append('\t');
        }

        String url = "";
        String os = "others";
        String br = "others";
        String ver = "";
        Matcher lineMatcher = REFERER_AGENT_PATTERN.matcher(fields[FIELD_COUNT - 1].trim());
        if (lineMatcher.matches()) {
            url = lineMatcher.group(1);
            String agent = lineMatcher.group(2);

            // Operating system: first rule whose token occurs anywhere in the agent.
            for (UaRule rule : PLATFORM_RULES) {
                if (rule.pattern.matcher(agent).find()) {
                    os = rule.label;
                    break;
                }
            }

            // Browser and version: the token must be followed (eventually) by a version number.
            for (UaRule rule : BROWSER_RULES) {
                Matcher browserMatcher = rule.pattern.matcher(agent);
                if (browserMatcher.find()) {
                    ver = browserMatcher.group(1);
                    br = rule.label;
                    break;
                }
            }
        }

        sb.append(url).append('\t')
                .append(os).append('\t')
                .append(br).append('\t')
                .append(ver);
        // Re-encode with the same charset used for decoding, not the platform default.
        event.setBody(sb.toString().getBytes(Charsets.UTF_8));
        return event;
    }

    /**
     * Applies {@link #intercept(Event)} to each event, dropping any that come back {@code null}.
     *
     * @param events the batch to process
     * @return a new list with the processed events, in input order
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
    }

    /** Builder referenced from the Flume agent configuration as {@code ...AdRefererLogFormatInterceptor$Builder}. */
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new AdRefererLogFormatInterceptor();
        }

        @Override
        public void configure(Context context) {
            // No configuration properties are read.
        }
    }
}

部署:

1、将程序打包成 AdRefererLogInterceptor.jar

2、将 jar 包上传到 $FLUME_HOME/lib 目录下(本文使用的是 flume 1.5 的二进制安装包)

3、在配置文件中使用Interceptor

hdp2.sources.s1.interceptors = i1
hdp2.sources.s1.interceptors.i1.type = com.wy.flume.interceptor.AdRefererLogFormatInterceptor$Builder

优势:

在数据传输的同时完成数据处理,省去了单独的处理步骤;而且由 flume 帮助管理文件读取进度,配合 file channel 使用时,程序中断后无需手动恢复。

总结:

在Interceptor中可以对event的header 和 body 进行处理,进而达到定制化的目的。

上述就是小编为大家分享的flume中如何自定义Interceptor了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。

0