千家信息网

一、Flume--数据采集器基本原理和使用

发表于:2024-09-22 作者:千家信息网编辑
千家信息网最后更新 2024年09月22日,一、概述1、flume是什么1) Flume提供一个分布式的,可靠的,对大数据量的日志进行高效收集、聚集、移动的服务,Flume只能在Linux环境下运行。2) Flume基于流式架构,容错性强,也很
千家信息网最后更新 2024年09月22日一、Flume--数据采集器基本原理和使用

一、概述

1、flume是什么

1) Flume提供一个分布式的,可靠的,对大数据量的日志进行高效收集、聚集、移动的服务,Flume只能在Linux环境下运行。
2) Flume基于流式架构,容错性强,也很灵活简单,架构简单。
3) Flume、Kafka用来实时进行数据收集,Spark、Storm用来实时处理数据,impala用来实时查询。

2、flume的基本架构

​ 图1.1 flume架构

说到flume的架构,直接拿官网的图来说就足够了。
首先在每个数据源上都会部署一个 flume agent ,这个agent就是用来采取数据的。
这个agent由3个组件组成:source,channel,sink。而在flume中,数据传输的基本单位是event。下面讲讲这几个概念

(1)source

用于从数据源采集数据,并将数据传输在channel中。source支持多种数据源采集方式。比如监听端口采集数据,从文件中采集,从目录中采集,从http服务中采集等。

(2)channel

位于source和sink之间,是数据的一个暂存区域。一般情况下,从source流出数据的速率和sink流出的数据的速率会有所差异。所以需要一个空间暂存那些还没办法传输到sink进行处理的数据。所以channel类似于一个缓冲区,一个队列。

(3)sink

从channel获取数据,并将数据写到目标源。目标源支持多种,比如本地文件、hdfs、kafka、下一个flume agent的source等均可。

(4)event

传输单元,flume传输的基本单位,包括 headers和body两部分,header可以添加一些头部信息,body则是数据。

3、flume传输过程

基于上面的概念,流程基本很清晰,source监控数据源,如果产生新的数据,则获取数据,并封装成一个event,然后将event传输到channel,接着sink从channel拉取数据写入到目标源中。

二、flume的使用

1、flume部署

flume的程序本身的部署非常简单,
(1)部署jdk1.8
(2)解压flume的程序压缩包到指定目录,然后添加环境变量即可
(3)修改配置文件

cd /opt/modules/apache-flume-1.8.0-bin将模板配置文件复制重命名为正式配置文件cp conf/flume-env.sh.template conf/flume-env.sh添加jdk家目录变量vim conf/flume-env.sh加上这句export JAVA_HOME=/opt/modules/jdk1.8.0_144

这就完成配置了,基本没啥难度。flume的使用重点在于agent的配置文件的编写,根据业务场景不同,配置也不同。简单来说其实就是对source,channel,sink三大组件的工作属性的配置。

2、agent定义流程

agent的配置其实就是对source、channel、sink的配置。主要有5个步骤,下面看看这个流程是怎样的。

# 1、定义的agent名称,指定使用的source sinks channels的名称# 可以有多个source sinks channels。.sources = .sinks = .channels = # 2、定义source工作属性。# 基本格式就是 agent名.sources.source名.参数名=value# 第一个参数都是type,就是指定source类型的.sources..type=xxxx.sources..=xxxx.sources..=xxxx.........# 3、设置channel工作属性.格式都是类似的# 第一个参数都是type,就是指定channel类型的.channels..type=xxxxx.channels..=xxxxx.channels..=xxxxx.........# 4、设置sink工作属性# 第一个参数都是type,就是指定sink类型的.sinks..type=xxxxx.sinks..=xxxxx.sinks..=xxxxx...............# 5、设置source以及sink使用的channel,通过channel将两者连接起来.sources..channels = .sinks..channel = 

这就是agent定义的完整流程,source、channel、sink每个都有不同的类型,每个类型定义的参数会有差异。下面看看source、channel、sink中常用的类型(想看完整的全部的类型就看官网吧)

3、常用source的类型

(1)netcat--从tcp端口获取数据

常用属性:type:需指定为  netcatbind:监听的主机名或者ipport:监听的端口例子:监听在 0.0.0.0:6666端口a1.sources.r1.type = netcata1.sources.r1.bind = 0.0.0.0a1.sources.r1.port = 6666

(2)exec--执行命令输出作为数据源

常用属性:type:需指定为 execcommand:运行的命令shell:运行名为所需的shell,如 /bin/bash -c例子:监控文件的新增内容a1.sources.r1.type = execa1.sources.r1.command = tail -F /var/log/securea1.sourcesr.r1.shell = /bin/bash -c

(3)spooldir--监控目录内容

常用的属性:type:设置为 spooldirspoolDir:监控的目录路径fileSuffix:上传完成的文件加上指定的后缀,默认是 .COMPLETEDfileHeader:是否在event的header添加一个key标明该文件的绝对路径,默认为falseignorePattern:正则匹配,忽略的文件还有其他很多参数,具体到官网上看吧例子:a3.sources.r3.type = spooldira3.sources.r3.spoolDir = /opt/module/flume1.8.0/uploada3.sources.r3.fileSuffix = .COMPLETEDa3.sources.r3.fileHeader = true#忽略所有以.tmp结尾的文件,不上传a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

(4)avro--flume之间串联的中间格式

这个源比较特别,通常用在上一个flume的sink 输出,然后作为下一个flume的输入的格式。

常用的属性:type:需指定为  avrobind:监听的主机名或者ip,只能是agent所在主机的ip或者hostnameport:监听的端口例子:a1.sources.r1.type = avroa1.sources.r1.bind = 0.0.0.0a1.sources.r1.port = 4141

(5)TAILDIR--监控文件或者目录内容变化(1.7以及之后才有)

​ spoolDir有一个bug,就是已经上传完成的文件,不能再追加内容,否则会报错,而且也无法读取到新的文件内容。所以spooldir只能用来监控目录下新文件的变化,没办法监控已有文件的内容变化。以往这种情况只能使用 exec源,然后使用tail -f xxxlog 的方式来监听文件内容变化,但是这种方式有缺陷,就是容易丢失数据。而在flume1.7之后有一个新的source,叫TAILDIR,可以直接监听文件变化的内容。看看用法:

常用属性:type:TAILDIR ,记住,要全部大写filegroups:要监听的文件组的名字,可以有多个文件组filegroups.:指定文件组的包含哪些文件,可以使用扩展正则表达式,这里可以有的小技巧 /path/.*  这样就可以监听目录下的所有文件内容的变化positionFile:这个文件json格式记录了目录下每个文件的inode,以及pos偏移量fileHeader:是否添加header属性过多,可以当官网看:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#spooling-directory-source例子:a1.sources = r1a1.channels = c1a1.sources.r1.type = TAILDIRa1.sources.r1.channels = c1a1.sources.r1.positionFile = /var/log/flume/taildir_position.jsona1.sources.r1.filegroups = f1 f2  有两个文件组# 文件组1内容a1.sources.r1.filegroups.f1 = /var/log/test1/example.loga1.sources.r1.headers.f1.headerKey1 = value1# 使用正则表达式指定文件组a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*a1.sources.r1.headers.f2.headerKey1 = value2a1.sources.r1.headers.f2.headerKey2 = value2-2a1.sources.r1.fileHeader = truea1.sources.ri.maxBatchCount = 1000

下面再说说上面说到的 positionFile 这个东东,看看它的格式:

[{"inode":408241856,"pos":27550,"file":"/opt/modules/apache-flume-1.8.0-bin/logs/flume.log.COMPLETED"},{"inode":406278032,"pos":0,"file":"/opt/modules/apache-flume-1.8.0-bin/logs/words.txt.COMPLETED"},{"inode":406278035,"pos":0,"file":"/opt/modules/apache-flume-1.8.0-bin/logs/words.txt"},{"inode":406278036,"pos":34,"file":"/opt/modules/apache-flume-1.8.0-bin/logs/test.txt"}]分析:1、每个文件都是一个json串,由多个json串组成一个类似于数组的东西。2、每个json包含内容有:    inode:这个什么意思就自己具体看看文件系统的基本知识吧    pos:开始监听文件内容的起始偏移量    file:文件绝对路径名3、小技巧:(1)如果监听目录时,某些文件已存在,那么flume默认是从文件最后作为监听起始点进行监听。当文件内容更新时,flume会获取,然后sink。接着就会更新pos值。所以因为这个特点,就算flume agent突然崩了,下一次启动时,自动从上次崩溃的pos开始监听,而不是从最新的文件末尾开始监听。这样就不会丢失数据了,而且不会重复读取旧数据。(2)从(1)可知,pos就是实时更新的一个文件内容监听点,如果我们想文件从头开始监听,有时候有需求,需要将监听目录下的文件全部传输一边。这时候很简单,将json文件中的pos改为0就好了。4、如果没有指定positionFile路径,默认为/USER_HOME/.flume/taildir_position.json

4、常用channel类型

(1)memory--用内存作为暂存空间

常用的属性:type:需指定为  memorycapacity:存储在channel中event数量的最大值transactionCapacity:一次传输的event的最大数量 例子:a1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100

(2)file--使用磁盘文件作为暂存空间

常用的属性:type:需指定为  filecheckpointDir:存储checkpoint文件的目录dataDirs:存储数据的目录例子:a1.channels.c1.type = filea1.channels.c1.checkpointDir = /mnt/flume/checkpointa1.channels.c1.dataDirs = /mnt/flume/data

(3)SPILLABLEMEMORY--文件+内存作为暂存空间

这个类型是将内存+文件作为channel,当容量空间超过内存时就写到文件中常用的属性:type:指定为 SPILLABLEMEMORYmemoryCapacity:使用内存存储的event的最大数量overflowCapacity:存储到文件event的最大数量byteCapacity:使用内存存储的event的最大容量,单位是 bytescheckpointDir:存储checkpoint文件的目录dataDirs:存储数据的目录例子:a1.channels.c1.type = SPILLABLEMEMORYa1.channels.c1.memoryCapacity = 10000a1.channels.c1.overflowCapacity = 1000000a1.channels.c1.byteCapacity = 800000a1.channels.c1.checkpointDir = /mnt/flume/checkpointa1.channels.c1.dataDirs = /mnt/flume/data

(4)kafka--作为channel

生产环境中,flume+kafka也是常用的技术栈,但是一般是将kafka作为sink目标

常用属性:type:设置为 org.apache.flume.channel.kafka.KafkaChannelbootstrap.servers:kafka集群的服务器, ip:port,ip2:port,....topic:kafka中的topicconsumer.group.id:消费者的groupid例子:a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannela1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092a1.channels.channel1.kafka.topic = channel1a1.channels.channel1.kafka.consumer.group.id = flume-consumer

5、常用sink类型

(1)logger--直接作为log信息输出

常用属性:type:logger例子:a1.sinks.k1.type = logger

这个类型比较简单,一般用于调试时使用

(2)avro--串联flume的中间格式

这个类型主要就是用来给下一个flume作为输入的格式,是字节流的方式,而且是序列化的序列。

常用属性:type:avrohostname:输出目标的主机名或者ip,可以任意主机,不局限于本机ip:输出到的端口例子:a1.sinks.k1.type = avroa1.sinks.k1.hostname = 10.10.10.10a1.sinks.k1.port = 4545

(3)hdfs--直接写入到hdfs

常用属性:type:hdfshdfs.path:存储路径 , hdfs://namenode:port/PATHhdfs.filePrefix:上传的文件的前缀(额外加上的)hdfs.round:是否按时间滚动文件夹hdfs.roundValue:滚动的时间值hdfs.roundUnit:滚动的时间的单位hdfs.userLocalTimeStamp:是否使用本地时间戳,true还是falsehdfs.batchSize:积攒多少个event才flush到hdfs 一次hdfs.fileType:文件类型,DataStream(普通文件),SequenceFile(二进制格式,默认),CompressedStream(压缩格式)hdfs.rollInterval:多久生成一个新的文件,单位是秒hdfs.rollSize:文件滚动大小,单位是 byteshdfs.rollCount:文件滚动是否与event数量有关,true 还是falsehdfs.minBlockReplicas:最小副本数例子:#指定sink的类型为存储在hdfs中a2.sinks.k2.type = hdfs# 路径命名为按小时a2.sinks.k2.hdfs.path = hdfs://bigdata121:9000/flume/%H#上传文件的前缀a2.sinks.k2.hdfs.filePrefix = king-#是否按照时间滚动文件夹a2.sinks.k2.hdfs.round = true#多少时间单位创建一个新的文件夹a2.sinks.k2.hdfs.roundValue = 1#重新定义时间单位a2.sinks.k2.hdfs.roundUnit = hour#是否使用本地时间戳a2.sinks.k2.hdfs.useLocalTimeStamp = true#积攒多少个Event才flush到HDFS一次a2.sinks.k2.hdfs.batchSize = 1000#设置文件类型,可支持压缩a2.sinks.k2.hdfs.fileType = DataStream#多久生成一个新的文件,单位是秒a2.sinks.k2.hdfs.rollInterval = 600#设置每个文件的滚动大小,单位是bytesa2.sinks.k2.hdfs.rollSize = 134217700#文件的滚动与Event数量无关a2.sinks.k2.hdfs.rollCount = 0#最小副本数a2.sinks.k2.hdfs.minBlockReplicas = 1

(4)file_roll--存储到本地文件系统

常用属性:type:file_rollsink.directory:存储路径例子:a1.sinks.k1.type = file_rolla1.sinks.k1.sink.directory = /var/log/flum

(5)kafka--存储到kafka集群中

常用属性:tpye:org.apache.flume.sink.kafka.KafkaSinkkafka.topic:kafka话题名kafka.bootstrap.servers:集群服务器列表,以逗号分隔kafka.flumeBatchSize:刷写到kafka的event数量kafka.producer.acks:接收到时返回ack信息时,写入的最少的副本数kafka.producer.compression.type:压缩类型例子:a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic = mytopica1.sinks.k1.kafka.bootstrap.servers = localhost:9092a1.sinks.k1.kafka.flumeBatchSize = 20a1.sinks.k1.kafka.producer.acks = 1a1.sinks.k1.kafka.producer.compression.type = snappy

6、拦截器interceptors 常用类型

拦截器interceptors并不是必须的,它是工作在source和channel之间的一个组件,用于过滤source来的数据,并输出到channel。
使用格式:

先指定拦截器的名字,然后对每个拦截器进行工作属性配置.sources..interceptors = .sources..interceptors.. = xxxx

(1)timestamp时间戳拦截器

在event 的header中添加一个字段,用于标明时间戳如:headers:{timestamp:111111}。

常用属性:type:timestampheaderName:在header中的key名字,默认是 timestamp例子:a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = timestamp

(2)host主机名拦截器

在event 的header中添加一个字段,用于标明host戳,如:headers:{host:bigdata121}。

常用属性:type:hosthostHeader:在header中的key名字,默认是 hostuseIP:用ip还是主机名例子:a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = host

(3)UUID拦截器

在event 的header中添加一个字段,用于标明uuid如:headers:{id:111111}。

常用属性:type:org.apache.flume.sink.solr.morphline.UUIDInterceptor$BuilderheadName:在header中的key名字,默认是 idprefix:给每个UUID添加前缀

(4)search_replace查询替换

使用正则匹配,然后替换指定字符

常用属性:type:search_replacesearchPattern:匹配的正则replaceString:替换的字符串charset:字符集,默认UTF-8例子:删除特定字符开头的字符串a1.sources.avroSrc.interceptors = search-replacea1.sources.avroSrc.interceptors.search-replace.type = search_replacea1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+a1.sources.avroSrc.interceptors.search-replace.replaceString =

(5)regex_filter正则过滤

正则匹配,匹配到的丢弃或者留下

常用属性:type:regex_filterregex:正则excludeEvents:true为过滤掉匹配的,false为留下匹配的例子:a1.sources.r1.interceptors.i1.type = regex_filtera1.sources.r1.interceptors.i1.regex = ^A.*#如果excludeEvents设为false,表示过滤掉不是以A开头的events。如果excludeEvents设为true,则表示过滤掉以A开头的events。a1.sources.r1.interceptors.i1.excludeEvents = true

(6) regex_extractor正则抽取

这里其实是利用正则的分组匹配来获取多个匹配组,然后将每个组的匹配值存储到header中,key可以自定义。

a1.sources.r1.type = execa1.sources.r1.channels = c1a1.sources.r1.command = tail -F /opt/Andya1.sources.r1.interceptors = i1# 指定类型为 regex_extractora1.sources.r1.interceptors.i1.type = regex_extractor# 分组匹配的正则a1.sources.r1.interceptors.i1.regex = hostname is (.*?) ip is (.*)# 两个分组各自的key别名a1.sources.r1.interceptors.i1.serializers = s1 s2# 分别设置key的名字a1.sources.r1.interceptors.i1.serializers.s1.name = cookieida1.sources.r1.interceptors.i1.serializers.s2.name = ip

(7)自定义拦截器

继承接口 org.apache.flume.interceptor.Interceptor,实现里面的特定方法,如:

import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;import java.util.ArrayList;import java.util.List;public class MyInterceptor implements Interceptor {    @Override    public void initialize() {    }    @Override    public void close() {    }    /**     * 拦截source发送到通道channel中的消息     * 处理单个event     * @param event 接收过滤的event     * @return event    根据业务处理后的event     */    @Override    public Event intercept(Event event) {        // 获取事件对象中的字节数据        byte[] arr = event.getBody();        // 将获取的数据转换成大写        event.setBody(new String(arr).toUpperCase().getBytes());        // 返回到消息中        return event;    }    // 处理event集合    @Override    public List intercept(List events) {        List list = new ArrayList<>();        for (Event event : events) {            list.add(intercept(event));        }        return list;    }    //用来返回拦截器对象    public static class Builder implements Interceptor.Builder {        // 获取配置文件的属性        @Override        public Interceptor build() {            return new MyInterceptor();        }        @Override        public void configure(Context context) {        }    }

pom.xml依赖

                            org.apache.flume            flume-ng-core            1.8.0            

在 agent的配置文件中指定拦截器

a1.sources.r1.interceptors = i1#全类名$Buildera1.sources.r1.interceptors.i1.type = ToUpCase.MyInterceptor$Builder

运行命令:

bin/flume-ng agent -c conf/ -n a1 -f jar/ToUpCase.conf -C jar/Flume_Andy-1.0-SNAPSHOT.jar -Dflume.root.logger=DEBUG,console-C 指定额外的jar包的路径,就是我们自己写的拦截器的jar包

也可以将jar包放到flume程序目录的lib目录下

三、flume案例

1、读取文件到hdfs

# 1.定义agent的名字a2.以及定义这个agent中的source,sink,channel的名字a2.sources = r2a2.sinks = k2a2.channels = c2#2.定义Source,定义数据来源# 定义source类型是exec,执行命令的方式a2.sources.r2.type = exec# 命令a2.sources.r2.command = tail -F /tmp/access.log# 使用的shella2.sources.r2.shell = /bin/bash -c#3.定义sink#指定sink的类型为存储在hdfs中a2.sinks.k2.type = hdfs# 路径命名为按小时a2.sinks.k2.hdfs.path = hdfs://bigdata121:9000/flume/%H#上传文件的前缀a2.sinks.k2.hdfs.filePrefix = king-#是否按照时间滚动文件夹a2.sinks.k2.hdfs.round = true#多少时间单位创建一个新的文件夹a2.sinks.k2.hdfs.roundValue = 1#重新定义时间单位a2.sinks.k2.hdfs.roundUnit = hour#是否使用本地时间戳a2.sinks.k2.hdfs.useLocalTimeStamp = true#积攒多少个Event才flush到HDFS一次a2.sinks.k2.hdfs.batchSize = 1000#设置文件类型,可支持压缩a2.sinks.k2.hdfs.fileType = DataStream#多久生成一个新的文件,单位是秒a2.sinks.k2.hdfs.rollInterval = 600#设置每个文件的滚动大小,单位是bytesa2.sinks.k2.hdfs.rollSize = 134217700#文件的滚动与Event数量无关a2.sinks.k2.hdfs.rollCount = 0#最小副本数a2.sinks.k2.hdfs.minBlockReplicas = 1# 4.定义Channel,类型、容量限制、传输容量限制 a2.channels.c2.type = memorya2.channels.c2.capacity = 1000a2.channels.c2.transactionCapacity = 100# 5.链接,通过channel将source和sink连接起来a2.sources.r2.channels = c2a2.sinks.k2.channel = c2

启动flume-agent:

/opt/module/flume1.8.0/bin/flume-ng agent \--conf /opt/module/flume1.8.0/conf/ \   flume配置目录--name a2 \                             agent名字--conf-file /opt/module/flume1.8.0/jobconf/flume-hdfs.conf  agent配置-Dflume.root.logger=INFO,console          打印日志到终端

2、多flume联合,一对多

flume1:输出到flume2和flume3
flume2:输出到本地文件
flume3:输出到hdfs

flume1.conf

# Name the components on this agenta1.sources = r1a1.sinks = k1 k2a1.channels = c1 c2# 将数据流复制给多个channel。启动复制模式a1.sources.r1.selector.type = replicating# Describe/configure the sourcea1.sources.r1.type = execa1.sources.r1.command = tail -F /opt/testa1.sources.r1.shell = /bin/bash -c# 这是k1 sinka1.sinks.k1.type = avroa1.sinks.k1.hostname = bigdata111a1.sinks.k1.port = 4141# 这是k2 sinka1.sinks.k2.type = avroa1.sinks.k2.hostname = bigdata111a1.sinks.k2.port = 4142# Describe the channela1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100a1.channels.c2.type = memorya1.channels.c2.capacity = 1000a1.channels.c2.transactionCapacity = 100# 给source接入连接两个channel.每个channel对应一个sinka1.sources.r1.channels = c1 c2a1.sinks.k1.channel = c1a1.sinks.k2.channel = c2

flume2.conf

# Name the components on this agenta2.sources = r1a2.sinks = k1a2.channels = c1# Describe/configure the sourcea2.sources.r1.type = avroa2.sources.r1.bind = bigdata111a2.sources.r1.port = 4141# Describe the sinka2.sinks.k1.type = hdfsa2.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume2/%H#上传文件的前缀a2.sinks.k1.hdfs.filePrefix = flume2-#是否按照时间滚动文件夹a2.sinks.k1.hdfs.round = true#多少时间单位创建一个新的文件夹a2.sinks.k1.hdfs.roundValue = 1#重新定义时间单位a2.sinks.k1.hdfs.roundUnit = hour#是否使用本地时间戳a2.sinks.k1.hdfs.useLocalTimeStamp = true#积攒多少个Event才flush到HDFS一次a2.sinks.k1.hdfs.batchSize = 100#设置文件类型,可支持压缩a2.sinks.k1.hdfs.fileType = DataStream#多久生成一个新的文件a2.sinks.k1.hdfs.rollInterval = 600#设置每个文件的滚动大小大概是128Ma2.sinks.k1.hdfs.rollSize = 134217700#文件的滚动与Event数量无关a2.sinks.k1.hdfs.rollCount = 0#最小副本数a2.sinks.k1.hdfs.minBlockReplicas = 1# Describe the channela2.channels.c1.type = memorya2.channels.c1.capacity = 1000a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela2.sources.r1.channels = c1a2.sinks.k1.channel = c1

flume3.conf

# Name the components on this agenta3.sources = r1a3.sinks = k1a3.channels = c1# Describe/configure the sourcea3.sources.r1.type = avroa3.sources.r1.bind = bigdata111a3.sources.r1.port = 4142# Describe the sinka3.sinks.k1.type = file_roll#备注:此处的文件夹需要先创建好a3.sinks.k1.sink.directory = /opt/flume3# Describe the channela3.channels.c1.type = memorya3.channels.c1.capacity = 1000a3.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela3.sources.r1.channels = c1a3.sinks.k1.channel = c1

启动时,先启动flume2和flume3,最后启动flume1。启动命令不重复了。

3、多flume联合,多对一

多台server产生的日志,需要各自监控,然后汇总起来存储,这种场景很多。
flume1(监听文件)和flume2(监听端口)各自收集数据,然后分别sink到flume3,flume3负责汇总写入hdfs
flume1.conf

# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = execa1.sources.r1.command = tail -F /opt/Andya1.sources.r1.shell = /bin/bash -c# Describe the sinka1.sinks.k1.type = avroa1.sinks.k1.hostname = bigdata111a1.sinks.k1.port = 4141# Describe the channela1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

flume2.conf

# Name the components on this agenta2.sources = r1a2.sinks = k1a2.channels = c1# Describe/configure the sourcea2.sources.r1.type = netcata2.sources.r1.bind = bigdata111a2.sources.r1.port = 44444# Describe the sinka2.sinks.k1.type = avroa2.sinks.k1.hostname = bigdata111a2.sinks.k1.port = 4141# Use a channel which buffers events in memorya2.channels.c1.type = memorya2.channels.c1.capacity = 1000a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela2.sources.r1.channels = c1a2.sinks.k1.channel = c1

flume3.conf

# Name the components on this agenta3.sources = r1a3.sinks = k1a3.channels = c1# Describe/configure the sourcea3.sources.r1.type = avroa3.sources.r1.bind = bigdata111a3.sources.r1.port = 4141# Describe the sinka3.sinks.k1.type = hdfsa3.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume3/%H#上传文件的前缀a3.sinks.k1.hdfs.filePrefix = flume3-#是否按照时间滚动文件夹a3.sinks.k1.hdfs.round = true#多少时间单位创建一个新的文件夹a3.sinks.k1.hdfs.roundValue = 1#重新定义时间单位a3.sinks.k1.hdfs.roundUnit = hour#是否使用本地时间戳a3.sinks.k1.hdfs.useLocalTimeStamp = true#积攒多少个Event才flush到HDFS一次a3.sinks.k1.hdfs.batchSize = 100#设置文件类型,可支持压缩a3.sinks.k1.hdfs.fileType = DataStream#多久生成一个新的文件a3.sinks.k1.hdfs.rollInterval = 600#设置每个文件的滚动大小大概是128Ma3.sinks.k1.hdfs.rollSize = 134217700#文件的滚动与Event数量无关a3.sinks.k1.hdfs.rollCount = 0#最小冗余数a3.sinks.k1.hdfs.minBlockReplicas = 1# Describe the channela3.channels.c1.type = memorya3.channels.c1.capacity = 1000a3.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela3.sources.r1.channels = c1a3.sinks.k1.channel = c1

启动时先启动flume3,然后启动flume1和flume2

$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume3.conf$ bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume2.conf$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume1.conf

测试可以通过 telnet bigdata111 44444 端口来发送数据
可以在/opt/Andy文件中追加数据

文件 数据 属性 常用 类型 时间 监听 例子 单位 目录 存储 内容 配置 就是 格式 正则 拦截器 数量 文件夹 传输 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 搜索网络安全手抄报 软件开发面试有几面 湖南商祺网络技术有限公司 数据库登陆的ip地址是什么 代还软件开发图片 江西省青少年网络安全颁奖典礼 山东世界技能大赛网络安全 大数据和网络技术有什么区别 景洪软件开发公司 互联网科技摄影 数据库连接关闭方法 数据库的字符集是啥 湖南省学电脑软件开发月薪 lol游戏环境最好的服务器 如何找到靠谱的软件开发公司 服务器供应链管理方法 北汽eu5酷我音乐服务器错误 每台服务器都需要中间件么 数据库添加不成功要怎么看原因 地下管廊管理服务器 vss数据库 如何破解 海淀区智能网络技术诚信合作 浙江应用软件开发有哪些 数字媒体与网络技术系学什么 公安部网络安全保卫局姚伟 网络安全开场视频 攀枝花展厅互动软件开发公司 软件开发外包 深信和丰软件 北汽eu5酷我音乐服务器错误 ibm服务器槽位有问题
0