千家信息网

Flume日志采集框架的使用方法

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,本篇内容主要讲解" Flume日志采集框架的使用方法",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习" Flume日志采集框架的使用方法"吧!Flume日志采
千家信息网最后更新 2025年01月23日Flume日志采集框架的使用方法

本篇内容主要讲解" Flume日志采集框架的使用方法",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习" Flume日志采集框架的使用方法"吧!

Flume日志采集框架 安装和部署 Flume运行机制 采集静态文件到hdfs 采集动态日志文件到hdfs 两个agent级联

Flume日志采集框架

在一个完整的离线大数据处理系统中,除了hdfs+mapreduce+hive组成分析系统的核心之外,还需要数据采集、结果数据导出、任务调度等不可或缺的辅助系统,而这些辅助工具在hadoop生态体系中都有便捷的开源框架,如图所示:

1 Flume介绍

Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。Flume可以采集文件,socket数据包、文件、文件夹、kafka等各种形式源数据,又可以将采集到的数据(下沉sink)输出到HDFShbasehivekafka等众多外部存储系统中。

对于一般的采集需求,通过对flume的简单配置即可实现。

Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景。

2 Flume运行机制

Flume分布式系统中最核心的角色是agentflume采集系统就是由一个个agent所连接起来形成,每一个agent相当于一个数据传递员,内部有三个组件:

  • Source:采集组件,用于跟数据源对接,以获取数据

  • Sink:下沉组件,用于往下一级agent传递数据或者往最终存储系统传递数据

  • Channel:传输通道组件,用于从source将数据传递到sink

单个agent采集数据:

多级agent之间串联:

3 Flume的安装部署

1 下载安装包apache-flume-1.9.0-bin.tar.gz解压

2 在conf文件夹下的flume-env.sh添加JAVA_HOME

export JAVA_HOME=/usr/local/bigdata/java/jdk1.8.0_211

3 根据采集的需求,添加采集方案配置文件,文件名可以任意取

具体可以看后面的示例

4 启动flume

测试环境下:

$ bin/flume/-ng agent -c conf/ -f ./dir-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console

命令说明:

  • -c:指定flume自带的配置文件目录,不用自己修改

  • -f:指定自己的配置文件,这里问当前文件夹下的dir-hdfs.conf

  • -n:指定自己配置文件中使用那个agent,对应的配置文件中定义的名字。

  • -Dflume.root.logger:把日志打印在控制台,类型为INFO,这个只用于测试,后面将打印到日志文件中

生产中,启动flume,应该把flume启动在后台:

nohup bin/flume-ng  agent  -c  ./conf  -f ./dir-hdfs.conf -n  agent1 1>/dev/null 2>&1 &

4 采集静态文件到hdfs

4.1 采集需求

某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去

4.2 添加配置文件

在安装目录下添加文件dir-hdfs.conf,然后添加配置信息。

先获取agent,命名为agent1,后面的配置都跟在agent1后面,也可以改为其他值,如agt1,同一个配置文件中可以有多个配置配置方案,启动agent的时候获取对应的名字就可以。

根据需求,首先定义以下3大要素

数据源组件

source --监控文件目录 : spooldir spooldir有如下特性:

  • 监视一个目录,只要目录中出现新文件,就会采集文件中的内容

  • 采集完成的文件,会被agent自动添加一个后缀:COMPLETED(可修改)

  • 所监视的目录中不允许重复出现相同文件名的文件

下沉组件

sink--HDFS文件系统 : hdfs sink

通道组件

channel--可用file channel 也可以用内存channel

#定义三大组件的名称agent1.sources = source1agent1.sinks = sink1agent1.channels = channel1# 配置source组件agent1.sources.source1.type = spooldiragent1.sources.source1.spoolDir = /root/log/agent1.sources.source1.fileSuffix=.FINISHED#文件每行的长度,注意这里如果事情文件每行超过这个长度会自动切断,会导致数据丢失agent1.sources.source1.deserializer.maxLineLength=5120# 配置sink组件agent1.sinks.sink1.type = hdfsagent1.sinks.sink1.hdfs.path =hdfs://Master:9000/access_log/%y-%m-%d/%H-%Magent1.sinks.sink1.hdfs.filePrefix = app_logagent1.sinks.sink1.hdfs.fileSuffix = .logagent1.sinks.sink1.hdfs.batchSize= 100agent1.sinks.sink1.hdfs.fileType = DataStreamagent1.sinks.sink1.hdfs.writeFormat =Text# roll:滚动切换:控制写文件的切换规则## 按文件体积(字节)来切 agent1.sinks.sink1.hdfs.rollSize = 512000## 按event条数切agent1.sinks.sink1.hdfs.rollCount = 1000000## 按时间间隔切换文件agent1.sinks.sink1.hdfs.rollInterval = 60## 控制生成目录的规则agent1.sinks.sink1.hdfs.round = trueagent1.sinks.sink1.hdfs.roundValue = 10agent1.sinks.sink1.hdfs.roundUnit = minuteagent1.sinks.sink1.hdfs.useLocalTimeStamp = true# channel组件配置agent1.channels.channel1.type = memory## event条数agent1.channels.channel1.capacity = 500000##flume事务控制所需要的缓存容量600条eventagent1.channels.channel1.transactionCapacity = 600# 绑定source、channel和sink之间的连接agent1.sources.source1.channels = channel1agent1.sinks.sink1.channel = channel1

Channel参数解释:

  • capacity:默认该通道中最大的可以存储的event数量

  • trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量

  • keep-aliveevent添加到通道中或者移出的允许时间

4.3启动flume

$ bin/flume/-ng agent -c conf/ -f dir-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console

5 采集动态日志文件到hdfs

5.1 采集需求

比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs

5.2 配置文件

配置文件名称:tail-hdfs.conf 根据需求,首先定义以下3大要素:

  • 采集源,即source--监控文件内容更新 : exec tail -F file

  • 下沉目标,即sink--HDFS文件系统 : hdfs sink

  • Sourcesink之间的传递通道--channel,可用file channel 也可以用 内存channel

配置文件内容:

# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = execa1.sources.r1.command = tail -F /root/app_weichat_login.log# Describe the sinkagent1.sinks.sink1.type = hdfsagent1.sinks.sink1.hdfs.path =hdfs://Master:9000/app_weichat_login_log/%y-%m-%d/%H-%Magent1.sinks.sink1.hdfs.filePrefix = weichat_logagent1.sinks.sink1.hdfs.fileSuffix = .datagent1.sinks.sink1.hdfs.batchSize= 100agent1.sinks.sink1.hdfs.fileType = DataStreamagent1.sinks.sink1.hdfs.writeFormat =Textagent1.sinks.sink1.hdfs.rollSize = 100agent1.sinks.sink1.hdfs.rollCount = 1000000agent1.sinks.sink1.hdfs.rollInterval = 60agent1.sinks.sink1.hdfs.round = trueagent1.sinks.sink1.hdfs.roundValue = 1agent1.sinks.sink1.hdfs.roundUnit = minuteagent1.sinks.sink1.hdfs.useLocalTimeStamp = true# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

5.3 启动flume

启动命令:

bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1

6 两个agent级联

从tail命令获取数据发送到avro端口 另一个节点可配置一个avro源来中继数据,发送外部存储

# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = execa1.sources.r1.command = tail -F /root/log/access.log# Describe the sinka1.sinks.k1.type = avroa1.sinks.k1.hostname = hdp-05a1.sinks.k1.port = 4141a1.sinks.k1.batch-size = 2# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

从avro端口接收数据,下沉到hdfs

采集配置文件,avro-hdfs.conf

# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the source##source中的avro组件是一个接收者服务a1.sources.r1.type = avroa1.sources.r1.bind = hdp-05a1.sources.r1.port = 4141# Describe the sinka1.sinks.k1.type = hdfsa1.sinks.k1.hdfs.path = /flume/taildata/%y-%m-%d/a1.sinks.k1.hdfs.filePrefix = tail-a1.sinks.k1.hdfs.round = truea1.sinks.k1.hdfs.roundValue = 24a1.sinks.k1.hdfs.roundUnit = houra1.sinks.k1.hdfs.rollInterval = 0a1.sinks.k1.hdfs.rollSize = 0a1.sinks.k1.hdfs.rollCount = 50a1.sinks.k1.hdfs.batchSize = 10a1.sinks.k1.hdfs.useLocalTimeStamp = true#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本a1.sinks.k1.hdfs.fileType = DataStream# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

到此,相信大家对" Flume日志采集框架的使用方法"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

文件 数据 配置 日志 组件 系统 目录 框架 内容 需求 通道 方法 存储 控制 使用方法 之间 命令 文件夹 切换 生成 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 网络技术维护与支撑 河南艺术职业学院网络技术专业 数据库的日志怎么截 我的世界手机版宝可梦服务器星辰 流行网络技术 数据库管理如何建表填写数据 网络安全品牌深信服奇安信启明星 长沙中兴通讯软件开发 关于网络安全意识的提问问题 网络安全大事2017 国内外数据库技术发展现状 曲阜软件开发有限公司在线咨询 公安部网络安全保卫局能考吗 农商银行网络安全培训课后测试题 软件开发类职业发展 一场爱国主义网络安全教育课 铜梁区网络软件开发流程常见问题 广州体感软件开发 清华同方软件开发 待遇 外文论文数据库收录 网络安全行业的社会分析 网络安全进社区宣传 华为服务器管理页面 简述最常用的数据库系统 联通通信网络技术是干嘛的 日语专业加软件开发找工作 软件开发成果定义 训练模式为什么连不上服务器 中国石油 工控网络安全 软件开发倒排工期模板
0