千家信息网

Flume日志采集框架的使用方法

发表于:2024-11-27 作者:千家信息网编辑
千家信息网最后更新 2024年11月27日,本篇内容主要讲解" Flume日志采集框架的使用方法",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习" Flume日志采集框架的使用方法"吧!Flume日志采
千家信息网最后更新 2024年11月27日Flume日志采集框架的使用方法

本篇内容主要讲解" Flume日志采集框架的使用方法",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习" Flume日志采集框架的使用方法"吧!

Flume日志采集框架 安装和部署 Flume运行机制 采集静态文件到hdfs 采集动态日志文件到hdfs 两个agent级联

Flume日志采集框架

在一个完整的离线大数据处理系统中,除了hdfs+mapreduce+hive组成分析系统的核心之外,还需要数据采集、结果数据导出、任务调度等不可或缺的辅助系统,而这些辅助工具在hadoop生态体系中都有便捷的开源框架,如图所示:

1 Flume介绍

Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。Flume可以采集文件,socket数据包、文件、文件夹、kafka等各种形式源数据,又可以将采集到的数据(下沉sink)输出到HDFShbasehivekafka等众多外部存储系统中。

对于一般的采集需求,通过对flume的简单配置即可实现。

Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景。

2 Flume运行机制

Flume分布式系统中最核心的角色是agentflume采集系统就是由一个个agent所连接起来形成,每一个agent相当于一个数据传递员,内部有三个组件:

  • Source:采集组件,用于跟数据源对接,以获取数据

  • Sink:下沉组件,用于往下一级agent传递数据或者往最终存储系统传递数据

  • Channel:传输通道组件,用于从source将数据传递到sink

单个agent采集数据:

多级agent之间串联:

3 Flume的安装部署

1 下载安装包apache-flume-1.9.0-bin.tar.gz解压

2 在conf文件夹下的flume-env.sh添加JAVA_HOME

export JAVA_HOME=/usr/local/bigdata/java/jdk1.8.0_211

3 根据采集的需求,添加采集方案配置文件,文件名可以任意取

具体可以看后面的示例

4 启动flume

测试环境下:

$ bin/flume/-ng agent -c conf/ -f ./dir-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console

命令说明:

  • -c:指定flume自带的配置文件目录,不用自己修改

  • -f:指定自己的配置文件,这里问当前文件夹下的dir-hdfs.conf

  • -n:指定自己配置文件中使用那个agent,对应的配置文件中定义的名字。

  • -Dflume.root.logger:把日志打印在控制台,类型为INFO,这个只用于测试,后面将打印到日志文件中

生产中,启动flume,应该把flume启动在后台:

nohup bin/flume-ng  agent  -c  ./conf  -f ./dir-hdfs.conf -n  agent1 1>/dev/null 2>&1 &

4 采集静态文件到hdfs

4.1 采集需求

某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去

4.2 添加配置文件

在安装目录下添加文件dir-hdfs.conf,然后添加配置信息。

先获取agent,命名为agent1,后面的配置都跟在agent1后面,也可以改为其他值,如agt1,同一个配置文件中可以有多个配置配置方案,启动agent的时候获取对应的名字就可以。

根据需求,首先定义以下3大要素

数据源组件

source --监控文件目录 : spooldir spooldir有如下特性:

  • 监视一个目录,只要目录中出现新文件,就会采集文件中的内容

  • 采集完成的文件,会被agent自动添加一个后缀:COMPLETED(可修改)

  • 所监视的目录中不允许重复出现相同文件名的文件

下沉组件

sink--HDFS文件系统 : hdfs sink

通道组件

channel--可用file channel 也可以用内存channel

#定义三大组件的名称agent1.sources = source1agent1.sinks = sink1agent1.channels = channel1# 配置source组件agent1.sources.source1.type = spooldiragent1.sources.source1.spoolDir = /root/log/agent1.sources.source1.fileSuffix=.FINISHED#文件每行的长度,注意这里如果事情文件每行超过这个长度会自动切断,会导致数据丢失agent1.sources.source1.deserializer.maxLineLength=5120# 配置sink组件agent1.sinks.sink1.type = hdfsagent1.sinks.sink1.hdfs.path =hdfs://Master:9000/access_log/%y-%m-%d/%H-%Magent1.sinks.sink1.hdfs.filePrefix = app_logagent1.sinks.sink1.hdfs.fileSuffix = .logagent1.sinks.sink1.hdfs.batchSize= 100agent1.sinks.sink1.hdfs.fileType = DataStreamagent1.sinks.sink1.hdfs.writeFormat =Text# roll:滚动切换:控制写文件的切换规则## 按文件体积(字节)来切 agent1.sinks.sink1.hdfs.rollSize = 512000## 按event条数切agent1.sinks.sink1.hdfs.rollCount = 1000000## 按时间间隔切换文件agent1.sinks.sink1.hdfs.rollInterval = 60## 控制生成目录的规则agent1.sinks.sink1.hdfs.round = trueagent1.sinks.sink1.hdfs.roundValue = 10agent1.sinks.sink1.hdfs.roundUnit = minuteagent1.sinks.sink1.hdfs.useLocalTimeStamp = true# channel组件配置agent1.channels.channel1.type = memory## event条数agent1.channels.channel1.capacity = 500000##flume事务控制所需要的缓存容量600条eventagent1.channels.channel1.transactionCapacity = 600# 绑定source、channel和sink之间的连接agent1.sources.source1.channels = channel1agent1.sinks.sink1.channel = channel1

Channel参数解释:

  • capacity:默认该通道中最大的可以存储的event数量

  • trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量

  • keep-aliveevent添加到通道中或者移出的允许时间

4.3启动flume

$ bin/flume/-ng agent -c conf/ -f dir-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console

5 采集动态日志文件到hdfs

5.1 采集需求

比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs

5.2 配置文件

配置文件名称:tail-hdfs.conf 根据需求,首先定义以下3大要素:

  • 采集源,即source--监控文件内容更新 : exec tail -F file

  • 下沉目标,即sink--HDFS文件系统 : hdfs sink

  • Sourcesink之间的传递通道--channel,可用file channel 也可以用 内存channel

配置文件内容:

# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = execa1.sources.r1.command = tail -F /root/app_weichat_login.log# Describe the sinkagent1.sinks.sink1.type = hdfsagent1.sinks.sink1.hdfs.path =hdfs://Master:9000/app_weichat_login_log/%y-%m-%d/%H-%Magent1.sinks.sink1.hdfs.filePrefix = weichat_logagent1.sinks.sink1.hdfs.fileSuffix = .datagent1.sinks.sink1.hdfs.batchSize= 100agent1.sinks.sink1.hdfs.fileType = DataStreamagent1.sinks.sink1.hdfs.writeFormat =Textagent1.sinks.sink1.hdfs.rollSize = 100agent1.sinks.sink1.hdfs.rollCount = 1000000agent1.sinks.sink1.hdfs.rollInterval = 60agent1.sinks.sink1.hdfs.round = trueagent1.sinks.sink1.hdfs.roundValue = 1agent1.sinks.sink1.hdfs.roundUnit = minuteagent1.sinks.sink1.hdfs.useLocalTimeStamp = true# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

5.3 启动flume

启动命令:

bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1

6 两个agent级联

从tail命令获取数据发送到avro端口 另一个节点可配置一个avro源来中继数据,发送外部存储

# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = execa1.sources.r1.command = tail -F /root/log/access.log# Describe the sinka1.sinks.k1.type = avroa1.sinks.k1.hostname = hdp-05a1.sinks.k1.port = 4141a1.sinks.k1.batch-size = 2# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

从avro端口接收数据,下沉到hdfs

采集配置文件,avro-hdfs.conf

# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the source##source中的avro组件是一个接收者服务a1.sources.r1.type = avroa1.sources.r1.bind = hdp-05a1.sources.r1.port = 4141# Describe the sinka1.sinks.k1.type = hdfsa1.sinks.k1.hdfs.path = /flume/taildata/%y-%m-%d/a1.sinks.k1.hdfs.filePrefix = tail-a1.sinks.k1.hdfs.round = truea1.sinks.k1.hdfs.roundValue = 24a1.sinks.k1.hdfs.roundUnit = houra1.sinks.k1.hdfs.rollInterval = 0a1.sinks.k1.hdfs.rollSize = 0a1.sinks.k1.hdfs.rollCount = 50a1.sinks.k1.hdfs.batchSize = 10a1.sinks.k1.hdfs.useLocalTimeStamp = true#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本a1.sinks.k1.hdfs.fileType = DataStream# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

到此,相信大家对" Flume日志采集框架的使用方法"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

文件 数据 配置 日志 组件 系统 目录 框架 内容 需求 通道 方法 存储 控制 使用方法 之间 命令 文件夹 切换 生成 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 非专业学软件开发 高德地图地理编码服务器 县财政局网络安全 泰坦陨落pc服务器和主机服务器 sql 更改数据库名 怎样卸载服务器安全狗 网络安全课题研究个人总结 我的世界服务器地狱怎么做 广西大数据时钟同步服务器 加强网站及网络安全管理 蛇蛇争霸服务器 大学学软件开发用什么笔记本 戴尔新一代服务器 国家鼓励引导网络安全 实况国际服务器在哪里 软件开发工作年龄限制 虚拟服务器怎么进 上位机开发必须学会数据库吗 网络技术与应用远程桌面配置 数据库查询关系代数表达式 新手可以做手机软件开发吗 河南犀利牛网络技术有限公司 山丹县网络安全知识竞赛 服务器的服务器管理器在哪 网络安全服务包括哪些方面 数据库cpe用的什么操作系统 软件开发票可以抵扣吗 重生科技互联网投资小说贴吧 金启航网络技术工程 大班网络安全小常识
0