Flume接入Hive数仓的搭建流程
发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,这篇文章主要讲解了"Flume接入Hive数仓的搭建流程",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flume接入Hive数仓的搭建流程"吧!实时流
千家信息网最后更新 2025年02月02日Flume接入Hive数仓的搭建流程
这篇文章主要讲解了"Flume接入Hive数仓的搭建流程",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flume接入Hive数仓的搭建流程"吧!
实时流接入数仓,基本在大公司都会有,在Flume1.8以后支持taildir source, 其有以下几个特点,而被广泛使用:
使用正则表达式匹配目录中的文件名
监控的文件中,一旦有数据写入,Flume就会将信息写入到指定的Sink
高可靠,不会丢失数据
不会对跟踪文件有任何处理,不会重命名也不会删除
不支持Windows,不能读二进制文件。支持按行读取文本文件
本文以开源Flume流为例,介绍流接入HDFS ,后面在其上面建立ods层外表。
1.1 taildir source配置
a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 =/opt/hoult/servers/logs/start/.*log
1.2 hdfs sink 配置
a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /user/data/logs/start/logs/start/%Y-%m-%d/ a1.sinks.k1.hdfs.filePrefix = startlog. # 配置文件滚动方式(文件大小32M) a1.sinks.k1.hdfs.rollSize = 33554432 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.idleTimeout = 0 a1.sinks.k1.hdfs.minBlockReplicas = 1 # 向hdfs上刷新的event的个数 a1.sinks.k1.hdfs.batchSize = 100 # 使用本地时间 a1.sinks.k1.hdfs.useLocalTimeStamp = true
1.3 Agent的配置
a1.sources = r1 a1.sinks = k1 a1.channels = c1 # taildir source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /user/data/logs/start/.*log # memorychannel a1.channels.c1.type = memory a1.channels.c1.capacity = 100000 a1.channels.c1.transactionCapacity = 2000 # hdfs sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /opt/hoult/servers/logs/start/%Y-%m-%d/ a1.sinks.k1.hdfs.filePrefix = startlog. # 配置文件滚动方式(文件大小32M) a1.sinks.k1.hdfs.rollSize = 33554432 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.idleTimeout = 0 a1.sinks.k1.hdfs.minBlockReplicas = 1 # 向hdfs上刷新的event的个数 a1.sinks.k1.hdfs.batchSize = 1000 # 使用本地时间 a1.sinks.k1.hdfs.useLocalTimeStamp = true # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
/opt/hoult/servers/conf/flume-log2hdfs.conf
1.4 启动
flume-ng agent --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf -name a1 -Dflume.roog.logger=INFO,console export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote" # 要想使配置文件生效,还要在命令行中指定配置文件目录 flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf -name a1 -Dflume.roog.logger=INFO,console
要$FLUME_HOME/conf/flume-env.sh加下面的参数,否则会报错误如下:
1.5 使用自定义拦截器解决Flume Agent替换本地时间为日志里面的时间戳
使用netcat source → logger sink来测试
# a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # source a1.sources.r1.type = netcat a1.sources.r1.bind = linux121 a1.sources.r1.port = 9999 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder # channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 # sink a1.sinks.k1.type = logger # source、channel、sink之间的关系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
拦截器主要代码如下:
public class CustomerInterceptor implements Interceptor { private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd"); @Override public void initialize() { } @Override public Event intercept(Event event) { // 获得body的内容 String eventBody = new String(event.getBody(), Charsets.UTF_8); // 获取header的内容 MapheaderMap = event.getHeaders(); final String[] bodyArr = eventBody.split("\\s+"); try { String jsonStr = bodyArr[6]; if (Strings.isNullOrEmpty(jsonStr)) { return null; } // 将 string 转成 json 对象 JSONObject jsonObject = JSON.parseObject(jsonStr); String timestampStr = jsonObject.getString("time"); //将timestamp 转为时间日期类型(格式 :yyyyMMdd) long timeStamp = Long.valueOf(timestampStr); String date = formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), ZoneId.systemDefault())); headerMap.put("logtime", date); event.setHeaders(headerMap); } catch (Exception e) { headerMap.put("logtime", "unknown"); event.setHeaders(headerMap); } return event; } @Override public List intercept(List events) { List out = new ArrayList<>(); for (Event event : events) { Event outEvent = intercept(event); if (outEvent != null) { out.add(outEvent); } } return out; } @Override public void close() { } public static class Builder implements Interceptor.Builder { @Override public Interceptor build() { return new CustomerInterceptor(); } @Override public void configure(Context context) { } }
启动
flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-test.conf -name a1 -Dflume.roog.logger=INFO,console ## 测试 telnet linux121 9999
感谢各位的阅读,以上就是"Flume接入Hive数仓的搭建流程"的内容了,经过本文的学习后,相信大家对Flume接入Hive数仓的搭建流程这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
文件
配置
接入
时间
流程
内容
学习
支持
个数
名称
大小
数据
方式
目录
拦截器
测试
中指
之间
二进制
代码
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
sql数据库关系图重要吗
服务器上的中文名文件访问不到
有数据库可以做什么APP
流媒体服务器 存储服务器
数据库多少数据分表
单机版魔兽世界数据库
打印服务器属性设置 安全
软件开发基本工作
南通信息网络技术推荐咨询
c语言项目用哪个软件开发
服务器系统能连打印机吗
健集网络技术有限公司
双色球历史中奖数据库
哪种是主干网络技术
黑龙江省泽谦网络技术有限公司怎么样
数据库原理与应用期末知识点
厦门分布式服务器介绍
网络安全法实施后处罚案例
贵州智能土地gis系统软件开发
海康威视的软件开发岗位工资
数据库 文件类型
学校对公安网络安全的意见
上海公共信息网络安全备案
计算机网络技术专业教学大纲
锦州自习室软件开发
网络安全法规定 给他人
郑州金山云网络技术公司电话
网络安全对无人驾驶的影响
拉萨智能法治文化展馆软件开发
分析几种软件开发方法