Introduction to Flume and Basic Operations
What is Flume
Flume is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive volumes of log data, originally developed at Cloudera (it is now an Apache project). Flume supports custom data senders plugged into a logging pipeline for collecting data, and it can apply simple processing to that data and write it out to a variety of (customizable) data receivers.
What Flume can do
- Collect data through customizable data senders plugged into a logging system
- Apply simple processing to the data and write it to a variety of (customizable) data receivers
Flume's components
- Agent: the core component; each agent runs the three parts below (a minimal wiring sketch follows this list)
- source: receives or collects the incoming data
- channel: a transient buffer that holds events between the source and the sink (some channel types also persist events to disk)
- sink: forwards the data on to its destination
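To make the wiring concrete, here is a minimal sketch of how the three components are declared and connected in a Flume properties file (the names a1, s1, c1, k1 are illustrative, and the per-component type properties are omitted; the full working examples below follow the same pattern):

# Declare the agent's source, channel, and sink
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# A source can feed one or more channels (note the plural key)...
a1.sources.s1.channels = c1
# ...while a sink drains exactly one channel
a1.sinks.k1.channel = c1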
Flume flow diagrams
- Data flow model
- Multi-agent model
- Consolidation model
- Multiplexing model
A rough sketch of the first two flows is shown below.
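In place of the original diagrams, the basic shapes of the single-agent and multi-agent flows look like this:

# Single-agent data flow
source -> channel -> sink -> destination (e.g. HDFS)

# Multi-agent flow: agents are chained by pairing an avro sink with an avro source
[agent1] source -> channel -> avro sink  ==>  [agent2] avro source -> channel -> sink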
Installing Flume
Download the release and unpack it
wget http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
tar -zxvf apache-flume-1.8.0-bin.tar.gz
Configure the environment variables
vim ~/.bashrc

export FLUME_HOME=/usr/local/src/apache-flume-1.8.0-bin
export PATH=$PATH:$FLUME_HOME/bin

source ~/.bashrc
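To confirm the installation and PATH change took effect, you can print the version (the exact output depends on your build):

flume-ng version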
Basic Flume operations
- Netcat mode
Go into the conf directory and create a netcat.conf file with the following content:
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = netcat
agent.sources.netcatSource.bind = localhost
agent.sources.netcatSource.port = 11111
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10
Start an instance
(py27) [root@master conf]# pwd
/usr/local/src/apache-flume-1.8.0-bin/conf
(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./netcat.conf --name agent -Dflume.root.logger=INFO,console
Startup succeeds:
18/10/24 11:26:35 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
18/10/24 11:26:35 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:./flume_netcat.conf
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Processing:loggerSink
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Processing:loggerSink
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Added sinks: loggerSink Agent: agent
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Creating channels
18/10/24 11:26:35 INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Created channel memoryChannel
18/10/24 11:26:35 INFO source.DefaultSourceFactory: Creating instance of source netcatSource, type netcat
18/10/24 11:26:35 INFO sink.DefaultSinkFactory: Creating instance of sink: loggerSink, type: logger
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [netcatSource, loggerSink]
18/10/24 11:26:35 INFO node.Application: Starting new configuration:{ sourceRunners:{netcatSource=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:netcatSource,state:IDLE} }} sinkRunners:{loggerSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@262b92ac counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }
18/10/24 11:26:35 INFO node.Application: Starting Channel memoryChannel
18/10/24 11:26:35 INFO node.Application: Waiting for channel: memoryChannel to start. Sleeping for 500 ms
18/10/24 11:26:36 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.
18/10/24 11:26:36 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
18/10/24 11:26:36 INFO node.Application: Starting Sink loggerSink
18/10/24 11:26:36 INFO node.Application: Starting Source netcatSource
18/10/24 11:26:36 INFO source.NetcatSource: Source starting
18/10/24 11:26:36 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.16.155.120:11111]
Then open a new terminal and send some data:
(py27) [root@master apache-flume-1.8.0-bin]# telnet localhost 11111
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1
OK
Check the received data; the body shows the raw bytes (0x31 is the ASCII '1' we typed, 0x0D the carriage return from telnet):
18/10/24 11:30:15 INFO sink.LoggerSink: Event: { headers:{} body: 31 0D 1. }
Note: if you don't have telnet, install it first: yum install telnet
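If you'd rather not use telnet, nc (netcat) can talk to the source just as well; a quick sketch (on CentOS the package is typically installed with yum install nc):

# Type a line and press Enter; the source replies OK
nc localhost 11111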
- Exec mode
Create the configuration file flume_exec.conf (the name just has to match what is passed to --conf-file below):
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = exec
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10
Start the instance
(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./flume_exec.conf --name agent -Dflume.root.logger=INFO,console
After it starts, create the exec.log file referenced in the configuration:
(py27) [root@master test_data]# ls
exec.log
(py27) [root@master test_data]# pwd
/home/master/FlumeTest/test_data
Then use echo to simulate log output:
(py27) [root@master test_data]# echo 'Hello World!!!' >> exec.log
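To simulate a steadier stream of log lines, a simple loop works too (a sketch; the messages are just for illustration):

for i in $(seq 1 5); do
    echo "test line $i" >> /home/master/FlumeTest/test_data/exec.log
    sleep 1
done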
Check the received log:
18/10/25 09:19:52 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 21 21 21 Hello World!!! }
Saving the logs to HDFS
Modify the configuration (saved here as flume_exec_hdfs.conf, matching the command below), swapping the logger sink for an HDFS sink:
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = exec
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

# HDFS sink: the round* settings bucket events into per-minute directories,
# and the roll* settings rotate files every 3 seconds, 20 bytes, or 5 events,
# whichever limit is hit first
agent.sinks.loggerSink.type = hdfs
agent.sinks.loggerSink.hdfs.path = /flume/%y-%m-%d/%H%M/
agent.sinks.loggerSink.hdfs.filePrefix = exec_hdfs_
agent.sinks.loggerSink.hdfs.round = true
agent.sinks.loggerSink.hdfs.roundValue = 1
agent.sinks.loggerSink.hdfs.roundUnit = minute
agent.sinks.loggerSink.hdfs.rollInterval = 3
agent.sinks.loggerSink.hdfs.rollSize = 20
agent.sinks.loggerSink.hdfs.rollCount = 5
agent.sinks.loggerSink.hdfs.useLocalTimeStamp = true
agent.sinks.loggerSink.hdfs.fileType = DataStream
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10
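One thing worth checking before starting: the agent's user needs write access to the HDFS path. If necessary, pre-create the base directory (a sketch, assuming the path from the configuration above):

hadoop fs -mkdir -p /flume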
Then start the instance:
(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./flume_exec_hdfs.conf --name agent -Dflume.root.logger=INFO,console
Once it is running, you can watch it write the contents of exec.log to HDFS:
18/10/25 09:54:26 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/10/25 09:54:26 INFO hdfs.BucketWriter: Creating /flume/18-10-25/0954//exec_hdfs_.1540475666623.tmp
18/10/25 09:54:32 INFO hdfs.BucketWriter: Closing /flume/18-10-25/0954//exec_hdfs_.1540475666623.tmp
18/10/25 09:54:32 INFO hdfs.BucketWriter: Renaming /flume/18-10-25/0954/exec_hdfs_.1540475666623.tmp to /flume/18-10-25/0954/exec_hdfs_.1540475666623
18/10/25 09:54:32 INFO hdfs.HDFSEventSink: Writer callback called.
Browsing HDFS, we can see the file's contents:
(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25/0954
Found 1 items
-rw-r--r--   3 root supergroup         15 2018-10-25 09:54 /flume/18-10-25/0954/exec_hdfs_.1540475666623
(py27) [root@master sbin]# hadoop fs -text /flume/18-10-25/0954/exec_hdfs_.1540475666623
Hello World!!!
Now append some new log lines and check again:
# Write new log lines
(py27) [root@master test_data]# echo 'test001' >> exec.log
(py27) [root@master test_data]# echo 'test002' >> exec.log

# Check the HDFS directory
(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25
Found 2 items
drwxr-xr-x   - root supergroup          0 2018-10-25 09:54 /flume/18-10-25/0954
drwxr-xr-x   - root supergroup          0 2018-10-25 09:56 /flume/18-10-25/0956
(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25/0956
Found 1 items
-rw-r--r--   3 root supergroup         16 2018-10-25 09:56 /flume/18-10-25/0956/exec_hdfs_.1540475766338
(py27) [root@master sbin]# hadoop fs -text /flume/18-10-25/0956/exec_hdfs_.1540475766338
test001
test002
- Failover example
This requires three machines, master, slave1, and slave2, each running its own agent. The agent on master sends the log data; slave1 and slave2 receive it.
master configuration:
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink1 loggerSink2
agent.sinkgroups = group

agent.sources.netcatSource.type = exec
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

# Two avro sinks, one per receiving machine
agent.sinks.loggerSink1.type = avro
agent.sinks.loggerSink1.hostname = slave1
agent.sinks.loggerSink1.port = 52020
agent.sinks.loggerSink1.channel = memoryChannel

agent.sinks.loggerSink2.type = avro
agent.sinks.loggerSink2.hostname = slave2
agent.sinks.loggerSink2.port = 52020
agent.sinks.loggerSink2.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000

# Failover sink group: events go to the sink with the highest priority;
# priorities are set via processor.priority.<sinkName>
agent.sinkgroups.group.sinks = loggerSink1 loggerSink2
agent.sinkgroups.group.processor.type = failover
agent.sinkgroups.group.processor.priority.loggerSink1 = 10
agent.sinkgroups.group.processor.priority.loggerSink2 = 1
agent.sinkgroups.group.processor.maxpenalty = 10000
slave1 configuration:
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = avro
agent.sources.netcatSource.bind = slave1
agent.sources.netcatSource.port = 52020
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000
slave2 configuration (identical except that it binds to slave2):
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = avro
agent.sources.netcatSource.bind = slave2
agent.sources.netcatSource.port = 52020
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000
Start the agents on master, slave1, and slave2, then write a log entry on master and watch which slave receives it.
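The launch commands are the same flume-ng invocation used earlier; a sketch, assuming the three configurations were saved under the file names below (the names are hypothetical):

# On master
flume-ng agent --conf conf --conf-file ./failover_master.conf --name agent -Dflume.root.logger=INFO,console
# On slave1 and slave2, the same command with their own config files
flume-ng agent --conf conf --conf-file ./failover_slave.conf --name agent -Dflume.root.logger=INFO,console

With all three agents up: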
# master
(py27) [root@master test_data]# echo 'hello' >> exec.log

# slave1
18/10/25 10:53:53 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F hello }

# slave2
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
slave1 received the data, as expected given its higher priority (slave2's console only shows the avro connection from master). Now stop slave1's agent and send another entry:
# master
(py27) [root@master test_data]# echo '11111' >> exec.log

# slave2
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
18/10/25 10:56:53 INFO sink.LoggerSink: Event: { headers:{} body: 31 31 31 31 31 11111 }
Then restart slave1's agent and send one more entry; delivery fails back to the higher-priority sink:
# master
(py27) [root@master test_data]# echo '22222' >> exec.log

# slave1
18/10/25 10:58:21 INFO sink.LoggerSink: Event: { headers:{} body: 32 32 32 32 32 22222 }

# slave2 (unchanged from before)
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
18/10/25 10:56:53 INFO sink.LoggerSink: Event: { headers:{} body: 31 31 31 31 31 11111 }