flume-ng怎么自定义拦截器
发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,本篇内容主要讲解"flume-ng怎么自定义拦截器",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"flume-ng怎么自定义拦截器"吧!代码如下:packa
千家信息网最后更新 2025年01月24日flume-ng怎么自定义拦截器
本篇内容主要讲解"flume-ng怎么自定义拦截器",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"flume-ng怎么自定义拦截器"吧!
代码如下:
package com.wy.flume.interceptor;import java.util.List;import java.util.Map;import java.util.regex.Matcher;import java.util.regex.Pattern;import org.apache.commons.lang.StringUtils;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;import org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer;import org.apache.flume.interceptor.RegexExtractorInterceptorSerializer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.google.common.base.Charsets;import com.google.common.base.Preconditions;import com.google.common.base.Throwables;import com.google.common.collect.Lists;public class RegexExtractorHeaderInterceptor implements Interceptor { static final String REGEX = "regex"; static final String SERIALIZERS = "serializers"; static final String EXTRACTOR_HEADER = "extractorHeader"; static final boolean DEFAULT_EXTRACTOR_HEADER = false; static final String EXTRACTOR_HEADER_KEY = "extractorHeaderKey"; private static final Logger logger = LoggerFactory .getLogger(RegexExtractorHeaderInterceptor.class); private final Pattern regex; private final Listserializers; private final boolean extractorHeader; private final String extractorHeaderKey; private RegexExtractorHeaderInterceptor(Pattern regex, List serializers,boolean extractorHeader, String extractorHeaderKey) { this.regex = regex; this.serializers = serializers; this.extractorHeader = extractorHeader; this.extractorHeaderKey = extractorHeaderKey; } @Override public void initialize() { // NO-OP... } @Override public void close() { // NO-OP... } @Override public Event intercept(Event event) { String extractorHeaderVal; if (extractorHeader){ extractorHeaderVal = event.getHeaders().get(extractorHeaderKey); }else{ extractorHeaderVal = new String(event.getBody(),Charsets.UTF_8); } Matcher matcher = regex.matcher(extractorHeaderVal); Map headers = event.getHeaders(); if (matcher.find()) { for (int group = 0, count = matcher.groupCount(); group < count; group++) { int groupIndex = group + 1; if (groupIndex > serializers.size()) { if (logger.isDebugEnabled()) { logger.debug("Skipping group {} to {} due to missing serializer", group, count); } break; } NameAndSerializer serializer = serializers.get(group); if (logger.isDebugEnabled()) { logger.debug("Serializing {} using {}", serializer.headerName, serializer.serializer); } headers.put(serializer.headerName, serializer.serializer.serialize(matcher.group(groupIndex))); } } return event; } @Override public List intercept(List events) { List intercepted = Lists.newArrayListWithCapacity(events.size()); for (Event event : events) { Event interceptedEvent = intercept(event); if (interceptedEvent != null) { intercepted.add(interceptedEvent); } } return intercepted; } public static class Builder implements Interceptor.Builder { private Pattern regex; private List serializerList; private boolean extractorHeader; private String extractorHeaderKey; private final RegexExtractorInterceptorPassThroughSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer(); @Override public void configure(Context context) { String regexString = context.getString(REGEX); Preconditions.checkArgument(!StringUtils.isEmpty(regexString), "Must supply a valid regex string"); regex = Pattern.compile(regexString); regex.pattern(); regex.matcher("").groupCount(); configureSerializers(context); extractorHeader = context.getBoolean(EXTRACTOR_HEADER,DEFAULT_EXTRACTOR_HEADER); if (extractorHeader){ extractorHeaderKey = context.getString(EXTRACTOR_HEADER_KEY); Preconditions.checkArgument(!StringUtils.isEmpty(extractorHeaderKey),"header key must"); } } private void configureSerializers(Context context) { String serializerListStr = context.getString(SERIALIZERS); Preconditions.checkArgument(!StringUtils.isEmpty(serializerListStr), "Must supply at least one name and serializer"); String[] serializerNames = serializerListStr.split("\\s+"); Context serializerContexts = new Context(context.getSubProperties(SERIALIZERS + ".")); serializerList = Lists.newArrayListWithCapacity(serializerNames.length); for(String serializerName : serializerNames) { Context serializerContext = new Context( serializerContexts.getSubProperties(serializerName + ".")); String type = serializerContext.getString("type", "DEFAULT"); String name = serializerContext.getString("name"); Preconditions.checkArgument(!StringUtils.isEmpty(name), "Supplied name cannot be empty."); if("DEFAULT".equals(type)) { serializerList.add(new NameAndSerializer(name, defaultSerializer)); } else { serializerList.add(new NameAndSerializer(name, getCustomSerializer( type, serializerContext))); } } } private RegexExtractorInterceptorSerializer getCustomSerializer( String clazzName, Context context) { try { RegexExtractorInterceptorSerializer serializer = (RegexExtractorInterceptorSerializer) Class .forName(clazzName).newInstance(); serializer.configure(context); return serializer; } catch (Exception e) { logger.error("Could not instantiate event serializer.", e); Throwables.propagate(e); } return defaultSerializer; } @Override public Interceptor build() { Preconditions.checkArgument(regex != null, "Regex pattern was misconfigured"); Preconditions.checkArgument(serializerList.size() > 0, "Must supply a valid group match id list"); return new RegexExtractorHeaderInterceptor(regex, serializerList, extractorHeader, extractorHeaderKey); } } static class NameAndSerializer { private final String headerName; private final RegexExtractorInterceptorSerializer serializer; public NameAndSerializer(String headerName, RegexExtractorInterceptorSerializer serializer) { this.headerName = headerName; this.serializer = serializer; } } }
应用配置:
hdp2.sources.s1.interceptors = i2
hdp2.sources.s1.interceptors.i2.type = com.wy.flume.interceptor.RegexExtractorHeaderInterceptor$Builder
hdp2.sources.s1.interceptors.i2.regex = ([^_]+)_(\\d{8}).*
hdp2.sources.s1.interceptors.i2.extractorHeader = true
hdp2.sources.s1.interceptors.i2.extractorHeaderKey = basename
hdp2.sources.s1.interceptors.i2.serializers = s1 s2
hdp2.sources.s1.interceptors.i2.serializers.s1.name = log_type
hdp2.sources.s1.interceptors.i2.serializers.s2.name = log_day
到此,相信大家对"flume-ng怎么自定义拦截器"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
拦截器
内容
学习
实用
更深
代码
兴趣
实用性
实际
操作简单
方法
更多
朋友
网站
频道
应用
查询
配置
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
各个数据库的值域
没有和帐号服务器连接
网络安全的重要性与措施
网站未连接服务器咋回事
软件开发维护工程师是做什么的
安全监管云数据库
黑龙江运营网络技术服务代理商
电子政务网络安全培训通知
网站网络安全处置预案
金融行业软件开发面试问题
智慧徐州软件开发
抖音服务器繁忙是什么意思
多能服务器
刀剑2 数据库修改
懂点网络技术怎么兼职
网络安全责任层层签
2u服务器风扇
广州餐厅线上点餐软件开发
数据库的字段长度在哪里
2k20服务器为什么关闭
专业网络安全解决方案
服务器如何实现数据冗余
软件开发收入的确认原则
惠普360 g10服务器开箱
克州网络技术厂家报价
华为无法连接服务器501
龙岩会员系统小程序精品软件开发
网络安全与隐私保护实验室
上古王冠有几个服务器
应用技术与网络技术区别