千家信息网

Flume接入Hive数仓的搭建流程

发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,这篇文章主要讲解了"Flume接入Hive数仓的搭建流程",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flume接入Hive数仓的搭建流程"吧!实时流
千家信息网最后更新 2025年02月02日Flume接入Hive数仓的搭建流程

这篇文章主要讲解了"Flume接入Hive数仓的搭建流程",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flume接入Hive数仓的搭建流程"吧!

实时流接入数仓,基本在大公司都会有,在Flume1.8以后支持taildir source, 其有以下几个特点,而被广泛使用:

  1. 使用正则表达式匹配目录中的文件名

  2. 监控的文件中,一旦有数据写入,Flume就会将信息写入到指定的Sink

  3. 高可靠,不会丢失数据

  4. 不会对跟踪文件有任何处理,不会重命名也不会删除

  5. 不支持Windows,不能读二进制文件。支持按行读取文本文件

本文以开源Flume流为例,介绍流接入HDFS ,后面在其上面建立ods层外表。

1.1 taildir source配置

a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 =/opt/hoult/servers/logs/start/.*log

1.2 hdfs sink 配置

a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /user/data/logs/start/logs/start/%Y-%m-%d/ a1.sinks.k1.hdfs.filePrefix = startlog. # 配置文件滚动方式(文件大小32M) a1.sinks.k1.hdfs.rollSize = 33554432 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.idleTimeout = 0 a1.sinks.k1.hdfs.minBlockReplicas = 1 # 向hdfs上刷新的event的个数 a1.sinks.k1.hdfs.batchSize = 100 # 使用本地时间 a1.sinks.k1.hdfs.useLocalTimeStamp = true

1.3 Agent的配置

a1.sources = r1 a1.sinks = k1 a1.channels = c1 # taildir source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /user/data/logs/start/.*log # memorychannel a1.channels.c1.type = memory a1.channels.c1.capacity = 100000 a1.channels.c1.transactionCapacity = 2000 # hdfs sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /opt/hoult/servers/logs/start/%Y-%m-%d/ a1.sinks.k1.hdfs.filePrefix = startlog. # 配置文件滚动方式(文件大小32M) a1.sinks.k1.hdfs.rollSize = 33554432 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.idleTimeout = 0 a1.sinks.k1.hdfs.minBlockReplicas = 1 # 向hdfs上刷新的event的个数 a1.sinks.k1.hdfs.batchSize = 1000 # 使用本地时间 a1.sinks.k1.hdfs.useLocalTimeStamp = true # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

/opt/hoult/servers/conf/flume-log2hdfs.conf

1.4 启动

flume-ng agent --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf -name a1 -Dflume.roog.logger=INFO,console  export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote" # 要想使配置文件生效,还要在命令行中指定配置文件目录 flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf -name a1 -Dflume.roog.logger=INFO,console

要$FLUME_HOME/conf/flume-env.sh加下面的参数,否则会报错误如下:

1.5 使用自定义拦截器解决Flume Agent替换本地时间为日志里面的时间戳

使用netcat source → logger sink来测试

# a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # source a1.sources.r1.type = netcat a1.sources.r1.bind = linux121 a1.sources.r1.port = 9999 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder # channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 # sink a1.sinks.k1.type = logger # source、channel、sink之间的关系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

拦截器主要代码如下:

public class CustomerInterceptor implements Interceptor {     private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");      @Override     public void initialize() {      }      @Override     public Event intercept(Event event) {         // 获得body的内容         String eventBody = new String(event.getBody(), Charsets.UTF_8);         // 获取header的内容         Map headerMap = event.getHeaders();         final String[] bodyArr = eventBody.split("\\s+");         try {             String jsonStr = bodyArr[6];             if (Strings.isNullOrEmpty(jsonStr)) {                 return null;             }             // 将 string 转成 json 对象             JSONObject jsonObject = JSON.parseObject(jsonStr);             String timestampStr = jsonObject.getString("time");             //将timestamp 转为时间日期类型(格式 :yyyyMMdd)             long timeStamp = Long.valueOf(timestampStr);             String date = formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), ZoneId.systemDefault()));             headerMap.put("logtime", date);             event.setHeaders(headerMap);         } catch (Exception e) {             headerMap.put("logtime", "unknown");             event.setHeaders(headerMap);         }         return event;      }      @Override     public List intercept(List events) {         List out = new ArrayList<>();         for (Event event : events) {             Event outEvent = intercept(event);             if (outEvent != null) {                 out.add(outEvent);             }         }         return out;     }      @Override     public void close() {      }      public static class Builder implements Interceptor.Builder {         @Override         public Interceptor build() {             return new CustomerInterceptor();         }          @Override         public void configure(Context context) {         }     }

启动

flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-test.conf -name a1 -Dflume.roog.logger=INFO,console ## 测试 telnet linux121 9999

感谢各位的阅读,以上就是"Flume接入Hive数仓的搭建流程"的内容了,经过本文的学习后,相信大家对Flume接入Hive数仓的搭建流程这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

文件 配置 接入 时间 流程 内容 学习 支持 个数 名称 大小 数据 方式 目录 拦截器 测试 中指 之间 二进制 代码 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 sql数据库关系图重要吗 服务器上的中文名文件访问不到 有数据库可以做什么APP 流媒体服务器 存储服务器 数据库多少数据分表 单机版魔兽世界数据库 打印服务器属性设置 安全 软件开发基本工作 南通信息网络技术推荐咨询 c语言项目用哪个软件开发 服务器系统能连打印机吗 健集网络技术有限公司 双色球历史中奖数据库 哪种是主干网络技术 黑龙江省泽谦网络技术有限公司怎么样 数据库原理与应用期末知识点 厦门分布式服务器介绍 网络安全法实施后处罚案例 贵州智能土地gis系统软件开发 海康威视的软件开发岗位工资 数据库 文件类型 学校对公安网络安全的意见 上海公共信息网络安全备案 计算机网络技术专业教学大纲 锦州自习室软件开发 网络安全法规定 给他人 郑州金山云网络技术公司电话 网络安全对无人驾驶的影响 拉萨智能法治文化展馆软件开发 分析几种软件开发方法
0