Flume笔记整理
发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,[TOC]Flume简介Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据
千家信息网最后更新 2025年02月04日Flume笔记整理
[TOC]
Flume简介
Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力。名词介绍:Flume OG:Flume original generation,即Flume0.9x版本Flume NG:Flume next generation,即Flume1.x版本官网:http://flume.apache.org
Flume体系结构
1、Flume有一个简单、灵活的基于流的数据流结构2、Flume具有故障转移机制和负载均衡机制3、Flume使用一个简单可扩展的数据模型(source、channel、sink)目前,flume-ng处理数据有两种方式:avro-client、agentavro-client:一次性将数据传输到指定的avro服务的客户端agent:一个持续传输数据的服务Agent主要的组件包括:Source、Channel、SinkSource:完成对日志数据的手机,分成transtion和event打入到channel之中。Channel:主要提供一个队列的功能,对source提供的数据进行简单的缓存。Sink:取出Channel中的数据,进行相应的存储文件系统,数据库或是提交到远程服务器。数据在组件传输的单位是Event。
Flume基本组件
Source
source意为来源、源头。主要作用:从外界采集各种类型的数据,将数据传递给Channel。 比如:监控某个文件只要增加数据就立即采集新增的数据、监控某个目录一旦有新文件产生就采集新文件的内容、监控某个端口等等。常见采集的数据类型: Exec Source、Avro Source、NetCat Source、Spooling Directory Source等详细查看: http://flume.apache.org/FlumeUserGuide.html#flume-sources 或者自带的文档查看。Source具体作用:AvroSource:监听一个avro服务端口,采集Avro数据序列化后的数据;Thrift Source:监听一个Thrift 服务端口,采集Thrift数据序列化后的数据;Exec Source:基于Unix的command在标准输出上采集数据;tail -F 和tail -f 区别。基于log4j切割文件时的能否读取问题。JMS Source:Java消息服务数据源,Java消息服务是一个与具体平台无关的API,这是支持jms规范的数据源采集;Spooling Directory Source:通过文件夹里的新增的文件作为数据源的采集;Kafka Source:从kafka服务中采集数据。NetCat Source: 绑定的端口(tcp、udp),将流经端口的每一个文本行数据作为Event输入HTTP Source:监听HTTP POST和 GET产生的数据的采集
Channel
Channel 一个数据的存储池,中间通道。主要作用 接受source传出的数据,向sink指定的目的地传输。Channel中的数据直到进入到下一个channel中或者进入终端才会被删除。当sink写入失败后,可以自动重写,不会造成数据丢失,因此很可靠。channel的类型很多比如:内存中、jdbc数据源中、文件形式存储等。常见采集的数据类型: Memory Channel File Channel Spillable Memory Channel等详细查看: http://flume.apache.org/FlumeUserGuide.html#flume-channelsChannel具体作用:Memory Channel:使用内存作为数据的存储。速度快File Channel:使用文件来作为数据的存储。安全可靠Spillable Memory Channel:使用内存和文件作为数据的存储,即:先存在内存中,如果内存中数据达到阀值则flush到文件中。JDBC Channel:使用jdbc数据源来作为数据的存储。Kafka Channel:使用kafka服务来作为数据的存储。
Sink
Sink:数据的最终的目的地。主要作用:接受channel写入的数据以指定的形式表现出来(或存储或展示)。sink的表现形式很多比如:打印到控制台、hdfs上、avro服务中、文件中等。常见采集的数据类型: HDFS Sink Hive Sink Logger Sink Avro Sink Thrift Sink File Roll Sink HBaseSink Kafka Sink等详细查看: http://flume.apache.org/FlumeUserGuide.html#flume-sinksHDFSSink需要有hdfs的配置文件和类库。一般采取多个sink汇聚到一台采集机器负责推送到hdfs。Sink具体作用:Logger Sink:将数据作为日志处理(根据flume中的设置的日志的级别显示)。HDFS Sink:将数据传输到hdfs集群中。Avro Sink:数据被转换成Avro Event,然后发送到指定的服务端口上。Thrift Sink:数据被转换成Thrift Event,然后发送到指定的的服务端口上。File Roll Sink:数据传输到本地文件中。Hive Sink:将数据传输到hive的表中。IRC Sink:数据向指定的IRC服务和端口中发送。Null Sink:取消数据的传输,即不发送到任何目的地。HBaseSink:将数据发往hbase数据库中。MorphlineSolrSink:数据发送到Solr搜索服务器(集群)。ElasticSearchSink:数据发送到Elastic Search搜索服务器(集群)。Kafka Sink:将数据发送到kafka服务中。(注意依赖类库)
Event
event是Flume NG传输的数据的基本单位,也是事务的基本单位。在文本文件,通常是一行记录就是一个event。网络消息传输系统中,一条消息就是一个event。event里有header、bodyEvent里面的header类型:Map我们可以在source中自定义header的key:value,在某些channel和sink中使用header。练习1: 一个需求:怎么实时监听一个文件的数据增加呢?打印到控制台上。如果这个文件增加的量特别大呢?
Avro client
avro客户端:往指定接收方相应的主机名:端口 发送本机要监听发送的源文件或者文件夹。bin/flume-ng avro-client --conf conf/ -H master -p 41414 -F /opt/logs/access.log需要提供 avro-source注意:--headerFile选项:追加header信息,文件以空格隔开。bin/flume-ng avro-client --conf conf/ --host slave01 --port 41414 --filename /opt/logs/access.log --headerFile /opt/logs/kv.log 如果指定了--dirname。则传输后此文件夹里的文件会加上fileSuffix后缀。练习02: 监控文件的新增内容向另一台机器的source发送数据。怎么处理?
Flume安装
系统要求: 1、JRE:JDK1.6+(推荐使用1.7) 2、内存:没有上限和下限,能够配置满足source、channel以及sink即可 3、磁盘空间:同2 4、目录权限:一般的agent操作的目录必须要有读写权限 这里采用的Flume版本为1.8.0,也是目前最新的版本,下载地址为: http://archive.apache.org/dist/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz安装步骤: 解压缩:[uplooking@uplooking01 ~]$ tar -zxvf soft/apache-flume-1.8.0-bin.tar.gz -C app/ 重命名:[uplooking@uplooking01 ~]$ mv app/apache-flume-1.8.0-bin/ app/flume 添加到环境变量中 vim ~/.bash_profile export FLUME_HOME=/home/uplooking/app/flume export PATH=$PATH:$FLUME_HOME/bin 修改配置文件 conf]# cp flume-env.sh.template flume-env.sh 添加JAVA_HOME export JAVA_HOME=/opt/jdk
Flume Agent案例
侦听网络端口数据
定义flume agent配置文件:
####################################################################### this's flume log purpose is listenning a socket port which product## data of stream## this agent is consists of source which is r1 , sinks which is k1,## channel which is c1## ## 这里面的a1 是flume一个实例agent的名字######################################################################定义了当前agent的名字叫做a1a1.sources = r1 ##定了该agent中的sources组件叫做r1a1.sinks = k1 ##定了该agent中的sinks组件叫做k1a1.channels = c1 ##定了该agent中的channels组件叫做c1# 监听数据源的方式,这里采用监听网络端口a1.sources.r1.type = netcat #source的类型为网络字节流a1.sources.r1.bind = uplooking01 #source监听的网络的hostnamea1.sources.r1.port = 52019 #source监听的网络的port# 采集的数据的下沉(落地)方式 通过日志a1.sinks.k1.type = logger #sink的类型为logger日志方式,log4j的级别有INFO、Console、file。。。# 描述channel的部分,使用内存做数据的临时存储a1.channels.c1.type = memory #channel的类型使用内存进行数据缓存,这是最常见的一种channela1.channels.c1.capacity = 1000 #定义了channel对的容量a1.channels.c1.transactionCapacity = 100 #定义channel的最大的事务容量# 使用channel将source和sink连接起来# 需要将source和sink使用channel连接起来,组成一个类似流水管道a1.sources.r1.channels = c1a1.sinks.k1.channel = c1
启动flume agent:
flume-ng agent -c conf -n a1 -f conf/flume-nc.conf -Dflume.root.logger=INFO,console-c conf:使用配置文件的方式-n a1:指定agent的名称为a1-f:指定配置文件因为数据落地是通过日志,所以后面需要指定日志的相关配置选项。
通过telnet或者nc向端口发送数据
安装telnet或nc:
yum isntall -y telentyum install -y nc
向端口发送数据:
# 使用telnet[uplooking@uplooking01 ~]$ telnet uplooking01 52019Trying 192.168.43.101...Connected to uplooking01.Escape character is '^]'.wo ai niOKsai bei de xueOK# 使用nc[uplooking@uplooking01 ~]$ nc uplooking01 52019heiheiOKxpleafOK
此时可以查看flume agent启动终端的输出:
2018-03-24 20:09:34,390 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.43.101:52019]2018-03-24 20:10:13,022 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 6F 20 61 69 20 6E 69 0D wo ai ni. }2018-03-24 20:10:24,139 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 73 61 69 20 62 65 69 20 64 65 20 78 75 65 0D sai bei de xue. }2018-03-24 20:13:26,190 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 69 68 65 69 heihei }2018-03-24 20:13:26,463 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 78 70 6C 65 61 66 xpleaf }2018-03-24 20:17:01,694 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F hello }
侦听目录中的新增文件
配置文件如下:
####################################################################### 监听目录中的新增文件## this agent is consists of source which is r1 , sinks which is k1,## channel which is c1## ## 这里面的a1 是flume一个实例agent的名字#####################################################################a1.sources = r1a1.sinks = k1a1.channels = c1# 监听数据源的方式,这里采用监听目录中的新增文件a1.sources.r1.type = spooldira1.sources.r1.spoolDir = /home/uplooking/data/flumea1.sources.r1.fileSuffix = .ok# a1.sources.r1.deletePolicy = immediatea1.sources.r1.deletePolicy = nevera1.sources.r1.fileHeader = true# 采集的数据的下沉(落地)方式 通过日志a1.sinks.k1.type = logger# 描述channel的部分,使用内存做数据的临时存储a1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# 使用channel将source和sink连接起来a1.sources.r1.channels = c1a1.sinks.k1.channel = c1
启动flume agent:
flume-ng agent -c conf -n a1 -f conf/flume-dir.conf -Dflume.root.logger=INFO,console
在监听目录下新增文件,内容如下:
hello youhello hehello me
可以看到flume agent终端输出:
2018-03-24 21:23:59,182 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{file=/home/uplooking/data/flume/hello.txt} body: 68 65 6C 6C 6F 20 79 6F 75 hello you }2018-03-24 21:23:59,182 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{file=/home/uplooking/data/flume/hello.txt} body: 68 65 6C 6C 6F 20 68 65 hello he }2018-03-24 21:23:59,182 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{file=/home/uplooking/data/flume/hello.txt} body: 68 65 6C 6C 6F 20 6D 65 hello me }2018-03-24 21:23:59,184 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:324)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.2018-03-24 21:23:59,184 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:433)] Preparing to move file /home/uplooking/data/flume/hello.txt to /home/uplooking/data/flume/hello.txt.ok
可以看到提示说,原来的文本文件已经被重命名为.ok,查看数据目录中的文件:
[uplooking@uplooking01 flume]$ lshello.txt.ok
监听文件中的新增数据
tail -f与tail -F的说明:
在生产环境中,为了防止日志文件过大,通常会每天生成一个新的日志文件,这是通过重命名原来的日志文件,然后touch一个原来的日志文件的方式来实现的。 http-xxx.log http-xxx.log.2017-03-15 http-xxx.log.2017-03-16 -f不会监听分割之后的文件,而-F则会继续监听。
配置文件:
####################################################################### 监听文件中的新增数据## ## this agent is consists of source which is r1 , sinks which is k1,## channel which is c1## ## 这里面的a1 是flume一个实例agent的名字#####################################################################a1.sources = r1a1.sinks = k1a1.channels = c1# 监听数据源的方式,这里监听文件中的新增数据a1.sources.r1.type = execa1.sources.r1.command = tail -F /home/uplooking/data/flume/http-flume.log# 采集的数据的下沉(落地)方式 通过日志a1.sinks.k1.type = logger# 描述channel的部分,使用内存做数据的临时存储a1.channels.c1.type = memorya1.channels.c1.capacity = 10000000a1.channels.c1.transactionCapacity = 1000000# 使用channel将source和sink连接起来a1.sources.r1.channels = c1a1.sinks.k1.channel = c1
启动flume agent:
flume-ng agent -c conf -n a1 -f conf/flume-data.conf -Dflume.root.logger=INFO,console
向监听文件中添加数据:
cat hello.txt.ok > http-flume.log
查看flume agent终端的输出:
2018-03-25 01:28:39,359 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 79 6F 75 hello you }2018-03-25 01:28:40,465 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 68 65 hello he }2018-03-25 01:28:40,465 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 6D 65 hello me }
数据过大导致的内存溢出问题与解决方案
使用jps -v命令可以查看启动flume时,分配的内存大小:
20837 Application -Xmx20m -Dflume.root.logger=INFO,console -Djava.library.path=:/home/uplooking/app/hadoop/lib/native:/home/uplooking/app/hadoop/lib/native
可以看到分配的最大内存为20M,因为我们使用的是将channel中的数据保存到内存中,所以一旦监听的文本数据过大,就会造成内存溢出,先使用下面的脚本生成一个比较大的文本数据:
for i in `seq 1 10000000`do echo "${i}.I like bigdata, I would like to do something with bigdata." >> /home/uplooking/data/mr/bigData.logdone
然后向监听的日志中打数据:
cat bigData.log > ../flume/http-flume.log
这时可以在flume agent终端中看到异常:
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOfRange(Arrays.java:3664) at java.lang.String.(String.java:207) at java.lang.StringBuilder.toString(StringBuilder.java:407) at sun.net.www.protocol.jar.Handler.parseContextSpec(Handler.java:207) at sun.net.www.protocol.jar.Handler.parseURL(Handler.java:153) at java.net.URL.(URL.java:622) at java.net.URL.(URL.java:490)Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "SinkRunner-PollingRunner-DefaultSinkProcessor"
解决方案:
通过调整# 描述channel的部分,使用内存做数据的临时存储a1.channels.c1.type = memorya1.channels.c1.capacity = 10000000a1.channels.c1.transactionCapacity = 1000000执行案例监听日志文件中的新增记录,操作一下异常java.lang.OutOfMemoryError: GC overhead limit exceeded,简称OOM/OOME两种方案解决: 第一种方案:给该flume程序加大内存存储容量 默认值为-Xmx20m(最大堆内存大小),--->-Xmx 2000m -Xms10m(初始堆内存大小) flume-ng agent -Xms1000m -Xmx1000m -c conf -n a1 -f conf/flume-data.conf -Dflume.root.logger=INFO,console 第二种方案:第一种搞不定的时候,比如机器可用内存不够的话的,使用其它channel解决 比如磁盘文件,比如jdbc
如果文本数据不是特别大,那么用第一种方案也是可以解决的,但是一旦文本数据过大,第一种方案需要分配很大的内存空间,所以下面演示使用第二种方案。
配置文件如下:
####################################################################### 监听文件中的新增数据## 使用文件做为channel## ## this agent is consists of source which is r1 , sinks which is k1,## channel which is c1## ## 这里面的a1 是flume一个实例agent的名字#####################################################################a1.sources = r1a1.sinks = k1a1.channels = c1# 监听数据源的方式,这里监听文件中的新增数据a1.sources.r1.type = execa1.sources.r1.command = tail -F /home/uplooking/data/flume/http-flume.log# 采集的数据的下沉(落地)方式 通过日志a1.sinks.k1.type = logger# 描述channel的部分,使用内存做数据的临时存储a1.channels.c1.type = filea1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpointa1.channels.c1.transactionCapacity = 1000000a1.channels.c1.dataDirs = /home/uplooking/data/flume/data# 使用channel将source和sink连接起来a1.sources.r1.channels = c1a1.sinks.k1.channel = c1
注意需要创建下面两个目录:
/home/uplooking/data/flume/checkpoint # 存放检查点数据/home/uplooking/data/flume/data # 存放channel的数据
这样再向监听文件中打数据,会在终端中看到不停地刷数据。
flume数据下沉之hdfs目录
可以将channel中的数据最终保存到hdfs中,配置文件如下:
####################################################################### 监听文件中的新增数据## 使用文件做为channel## this agent is consists of source which is r1 , sinks which is k1,## channel which is c1## ## 这里面的a1 是flume一个实例agent的名字#####################################################################a1.sources = r1a1.sinks = k1a1.channels = c1# 监听数据源的方式,这里采用监听网络端口a1.sources.r1.type = netcata1.sources.r1.bind = uplooking01a1.sources.r1.port = 52019# 采集的数据的下沉(落地)方式 存储到hdfs的某一路径a1.sinks.k1.type = hdfsa1.sinks.k1.hdfs.path = hdfs://ns1/input/flume/%Y/%m/%d# 文件生成后的前缀a1.sinks.k1.hdfs.filePrefix = http# 文件生成后的后缀,如http.1521927418991.loga1.sinks.k1.hdfs.fileSuffix = .log# 文件使用时的前缀a1.sinks.k1.hdfs.inUsePrefix = xttzm.# 文件使用时的后缀,如xttzm.http.1521927418992.log.zdhma1.sinks.k1.hdfs.inUseSuffix = .zdhma1.sinks.k1.hdfs.rollInterval = 0a1.sinks.k1.hdfs.rollSize = 0a1.sinks.k1.hdfs.rollCount = 5a1.sinks.k1.hdfs.useLocalTimeStamp = true# 默认为SequenceFile,查看hdfs上的文件时为序列化的a1.sinks.k1.hdfs.fileType = DataStream# 上面的要配置,这个也要配置,写入的数据格式为文本内容a1.sinks.k1.hdfs.writeFormat = Text# 下面这个配置选项不加,那么rollInterval rollSize rollCount是不会生效的a1.sinks.k1.hdfs.minBlockReplicas = 1# 描述channel的部分,使用文件做数据的临时存储a1.channels.c1.type = filea1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpointa1.channels.c1.transactionCapacity = 1000000a1.channels.c1.dataDirs = /home/uplooking/data/flume/data# 使用channel将source和sink连接起来a1.sources.r1.channels = c1a1.sinks.k1.channel = c1
启动flume agent:
flume-ng agent -c conf -n a1 -f conf/flume-hdfs-sink.conf -Dflume.root.logger=INFO,console
通过nc发送数据:
$ nc uplooking01 520191OK2OK3OK......12OK13OK14OK15OK16OK
这样,在hdfs目录下会生成三个正式文件,同时还应该有一个临时文件:
$ hdfs dfs -ls /input/flume/2018/03/25/ Found 4 items-rw-r--r-- 3 uplooking supergroup 10 2018-03-25 06:00 /input/flume/2018/03/25/http.1521928799720.log-rw-r--r-- 3 uplooking supergroup 11 2018-03-25 06:00 /input/flume/2018/03/25/http.1521928799721.log-rw-r--r-- 3 uplooking supergroup 15 2018-03-25 06:00 /input/flume/2018/03/25/http.1521928799722.log-rw-r--r-- 3 uplooking supergroup 3 2018-03-25 06:00 /input/flume/2018/03/25/xttzm.http.1521928799723.log.zdhm
数据
文件
监听
内存
日志
存储
服务
方式
端口
配置
传输
目录
数据源
类型
文本
方案
网络
面的
作用
名字
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库建设合同
东城电脑服务器回收公司
互联网数据库流量分析
德力西电表数据库
网络安全对教育的影响
科技与互联网十周年活动简报
庆阳市公安局开展网络安全
数据库可以进行测试吗
预防网络安全教育
软件开发关键节点是
松子服务器
2022年国家网络安全总体形势
19c数据库安全性
黑魂2服务器怎么登录
云服务器部署静态网页
江苏智能软件开发价钱
工业互联网与金融科技
网络安全生产新纪元
湖州独立显卡服务器
福建通讯软件开发设施价格优惠
美国宾州州立大学网络安全
大学里学会什么软件开发
制作网络安全宣传ppt
sw修改器服务器在哪
学一个软件开发就可以当程序员吗
甘肃gisbim软件开发
工业部网络安全大检查
专业支付软件开发公司有哪些
软件开发者能看到手机ip吗
网络技术及应用实训报告