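Getting Started with Flume
1 Flume Overview
1.1 Definition
Flume is a highly available, highly reliable, distributed system from Cloudera for collecting, aggregating, and transporting massive amounts of log data. Flume is based on a streaming architecture and is flexible and simple.
1.2 Features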
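It can be integrated with any storage process.
When data arrives faster than it can be written to the destination store, Flume buffers it, which reduces the pressure on HDFS.
Transactions in Flume are based on the Channel and use two transaction models (sender + receiver) to make sure messages are delivered reliably.
Flume uses two independent transactions: one delivers events from the Source to the Channel, and the other delivers them from the Channel to the Sink. The Source considers data read only after all data in its transaction has been committed to the Channel. Likewise, data is removed from the Channel only after the Sink has successfully written it out.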
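The two-transaction model can be illustrated with a small Java sketch that uses only the standard library. TxChannel, PutTx and TakeTx are hypothetical names that model the behavior described above. They are not Flume's Channel or Transaction API, and events are plain strings for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of a channel with separate put (Source side)
// and take (Sink side) transactions. It is NOT Flume's Channel/Transaction API.
class TxChannel {
    private final Deque<String> queue = new ArrayDeque<>();

    PutTx beginPut() { return new PutTx(); }

    TakeTx beginTake() { return new TakeTx(); }

    synchronized int size() { return queue.size(); }

    // Source -> Channel: staged events become visible only on commit.
    class PutTx {
        private final List<String> staged = new ArrayList<>();

        void put(String event) { staged.add(event); }

        void commit() {
            synchronized (TxChannel.this) { queue.addAll(staged); }
            staged.clear();
        }

        void rollback() { staged.clear(); } // nothing ever reached the channel
    }

    // Channel -> Sink: taken events are removed for good only on commit.
    class TakeTx {
        private final List<String> taken = new ArrayList<>();

        String take() {
            synchronized (TxChannel.this) {
                String event = queue.pollFirst();
                if (event != null) taken.add(event);
                return event;
            }
        }

        void commit() { taken.clear(); } // the write succeeded, forget the events

        void rollback() {
            // The write failed: return the events to the head of the queue, in order
            synchronized (TxChannel.this) {
                for (int i = taken.size() - 1; i >= 0; i--) queue.addFirst(taken.get(i));
            }
            taken.clear();
        }
    }
}
```

A Source stages a batch with beginPut and publishes it with commit. A Sink takes events inside beginTake. It then either commits after a successful write or rolls back, so the events stay in the Channel for a retry.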
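1.3 Architecture
1.3.1 Agent
An Agent is a JVM process that moves data from a source to a destination in the form of events.
An Agent consists mainly of three components: Source, Channel, and Sink.
1.3.2 Source
The Source is the component that receives data into the Agent. It can handle many types of input, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy.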
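1.3.3 Channel
The Channel is a buffer between the Source and the Sink, which allows the two to run at different rates. Channels are thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.
Flume ships with two Channels:
Memory Channel: an in-memory queue. It is fast and suits situations where losing data is acceptable.
File Channel: writes all events to disk, so data is not lost if the process shuts down or the machine goes down.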
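A bounded BlockingQueue can sketch how a Channel decouples rates. The producer blocks when the buffer is full and the consumer blocks when it is empty, so neither side has to match the other's speed. BoundedBufferDemo is a hypothetical illustration under simplifying assumptions. The real Memory Channel has its own capacity and timeout handling that this sketch does not reproduce, and a File Channel additionally persists events to disk.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a bounded, thread-safe buffer between a fast producer
// ("Source") and a slower consumer ("Sink"). Not Flume's MemoryChannel.
class BoundedBufferDemo {

    // Sends `events` events through a buffer of size `capacity` and returns
    // how many the consumer received.
    static int run(int events, int capacity) {
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(capacity);
        AtomicInteger received = new AtomicInteger();

        Thread sink = new Thread(() -> {
            try {
                for (int i = 0; i < events; i++) {
                    channel.take();            // blocks while the buffer is empty
                    received.incrementAndGet();
                    Thread.sleep(1);           // the consumer is deliberately slower
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sink.start();

        try {
            for (int i = 0; i < events; i++) {
                channel.put("event-" + i);     // blocks while the buffer is full
            }
            sink.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.get();
    }
}
```

A smaller capacity does not lose events in this sketch. It only makes the producer wait more often.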
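1.3.4 Sink
The Sink continuously polls the Channel for events and removes them in batches. It writes each batch to a storage or indexing system, or sends it to another Flume Agent.
The Sink is fully transactional. Before removing a batch of data from the Channel, each Sink starts a transaction with the Channel. Once the batch has been written successfully to the storage system or to the next Flume Agent, the Sink commits the transaction with the Channel. As soon as the transaction is committed, the Channel deletes those events from its internal buffer.
Sink destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, and custom sinks.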
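1.3.5 Event
The Event is the unit of transfer in Flume: data travels from source to destination in the form of events.
An Event consists of optional headers and a byte array that carries the data. The headers are a HashMap of key-value string pairs.
Usually one record is one Event. A record longer than 2048 bytes is split across several Events.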
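The Event structure and the size-based split can be sketched in plain Java. SketchEvent and RecordSplitter are hypothetical names, not Flume classes. The 2048-byte limit comes from the text above, and cutting at a byte boundary could split a multi-byte UTF-8 character.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Flume Event: optional string headers plus a byte[] body.
class SketchEvent {
    final Map<String, String> headers = new HashMap<>();
    final byte[] body;

    SketchEvent(byte[] body) {
        this.body = body;
    }
}

class RecordSplitter {
    // Splits one record into Events whose bodies hold at most maxBytes bytes.
    // Cutting at byte boundaries may split a multi-byte UTF-8 character.
    static List<SketchEvent> split(String record, int maxBytes) {
        byte[] data = record.getBytes(StandardCharsets.UTF_8);
        List<SketchEvent> events = new ArrayList<>();
        for (int start = 0; start < data.length; start += maxBytes) {
            int end = Math.min(data.length, start + maxBytes);
            events.add(new SketchEvent(Arrays.copyOfRange(data, start, end)));
        }
        return events;
    }

    public static void main(String[] args) {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < 5000; i++) {
            line.append('x');
        }
        List<SketchEvent> events = split(line.toString(), 2048);
        System.out.println(events.size()); // 3 (2048 + 2048 + 904 bytes)
    }
}
```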
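1.4 Topologies
Serial chain: this mode connects several Flume agents in sequence, from the first Source to the storage system that the final Sink writes to. Chaining too many agents is not recommended. Too many agents slow down transmission, and if any agent in the chain goes down during transfer, the whole pipeline is affected.
Replication: Flume can send an event stream to one or more destinations. In this mode the data source is replicated into several Channels, and each Channel holds the same data. Each Sink can then deliver it to a different destination.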
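The replication mode can be sketched as a selector that copies each event into every channel. ReplicatingSketch is a hypothetical name. In a real agent, this behavior is configured with selector.type = replicating rather than written by hand.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a replicating channel selector: every event is
// written to every channel, and each channel is drained by its own sink.
class ReplicatingSketch {

    static void dispatch(String event, List<List<String>> channels) {
        for (List<String> channel : channels) {
            channel.add(event); // every channel receives the same event
        }
    }

    public static void main(String[] args) {
        List<List<String>> channels = new ArrayList<>();
        channels.add(new ArrayList<>()); // e.g. drained by a sink writing to HDFS
        channels.add(new ArrayList<>()); // e.g. drained by a sink writing to local files
        dispatch("line-1", channels);
        dispatch("line-2", channels);
        System.out.println(channels); // [[line-1, line-2], [line-1, line-2]]
    }
}
```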
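Load balancing and failover: Flume can logically group several Sinks into a Sink group and send data to different Sinks in the group, mainly to provide load balancing and failover.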
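The failover half of this idea can be sketched as follows. SinkRef and send are hypothetical names. The sketch assumes, as in Flume's failover processor, that a larger priority value means a higher priority. It leaves out the penalty period that a real failover processor applies to a sink after it fails.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical failover sketch for a Sink group: try sinks from the highest
// priority value to the lowest and stop at the first one that succeeds.
class FailoverSketch {

    static final class SinkRef {
        final String name;
        final int priority;
        final Predicate<String> deliver; // returns false to simulate a failed write

        SinkRef(String name, int priority, Predicate<String> deliver) {
            this.name = name;
            this.priority = priority;
            this.deliver = deliver;
        }
    }

    // Returns the name of the sink that accepted the event, or null if all failed.
    static String send(String event, List<SinkRef> sinks) {
        List<SinkRef> ordered = new ArrayList<>(sinks);
        ordered.sort(Comparator.comparingInt((SinkRef s) -> s.priority).reversed());
        for (SinkRef sink : ordered) {
            if (sink.deliver.test(event)) {
                return sink.name;
            }
        }
        return null;
    }
}
```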
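Aggregation: this is the most common and most practical mode. Everyday web applications are usually spread across hundreds of servers, and large ones across thousands or even tens of thousands, so processing the logs they produce is very cumbersome. This combination of Flume agents solves the problem well. Each server runs a Flume agent that collects its logs and sends them to a central log-collecting Flume agent. That agent then uploads them to hdfs, hive, hbase, jms, and so on for log analysis.
1.5 How an Agent Works
2 Flume Deployment
1. Extract apache-flume-1.7.0-bin.tar.gz into the /opt/module directory.
2. Rename apache-flume-1.7.0-bin to flume.
3. Rename flume-env.sh.template under flume/conf to flume-env.sh, and set JAVA_HOME in flume-env.sh.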
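3 Enterprise Development Examples
3.1 Monitoring Port Data
Requirements:
The server listens on local port 44444.
A client uses the netcat tool to send messages to port 44444.
The received data is displayed on the console.
Implementation steps:
1. In the job folder, create the Agent configuration file flume-netcat-logger.conf: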
[djm@hadoop102 job]$ vim flume-netcat-logger.conf
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Start the job:
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
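Parameters:
--conf conf/ (short form -c): the configuration files are stored in the conf/ directory.
--name a1 (short form -n): names the Agent a1.
--conf-file job/flume-netcat-logger.conf (short form -f): the configuration file Flume reads for this run is flume-netcat-logger.conf in the job folder.
-Dflume.root.logger=INFO,console: -D overrides the flume.root.logger property at runtime. Here it sets the log level to INFO and prints logs to the console.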
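If nc is not available, a small Java client can play the same role when testing this agent. NetcatClient is a hypothetical helper and is not part of Flume. It assumes the agent from this section is listening on localhost:44444, and it only writes newline-terminated lines without reading any replies.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for `nc localhost 44444`: opens a TCP connection and
// sends each line terminated by '\n', so every line becomes one event.
class NetcatClient {

    static void sendLines(String host, int port, List<String> lines) {
        try (Socket socket = new Socket(host, port);
             Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)) {
            for (String line : lines) {
                out.write(line);
                out.write('\n');
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Assumes the a1 agent from this section is running on this host
        sendLines("localhost", 44444, Arrays.asList("hello", "flume"));
    }
}
```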
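3.2 Reading a Local File into HDFS in Real Time
Requirements:
Monitor the Hive log in real time and upload it to HDFS.
Implementation steps:
1. In the job folder, create the Agent configuration file flume-file-hdfs.conf: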
[djm@hadoop102 job]$ vim flume-file-hdfs.conf
2. Add the following content:
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll directories by time
a2.sinks.k2.hdfs.round = true
# How many time units before a new directory is created
a2.sinks.k2.hdfs.roundValue = 1
# The time unit used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# Number of Events accumulated before flushing to HDFS (must not exceed the channel's transactionCapacity)
a2.sinks.k2.hdfs.batchSize = 100
# File type; compressed types are also supported
a2.sinks.k2.hdfs.fileType = DataStream
# How often (in seconds) to start a new file
a2.sinks.k2.hdfs.rollInterval = 60
# Roll size of each file in bytes, about 128 MB
a2.sinks.k2.hdfs.rollSize = 134217700
# File rolling does not depend on the number of Events
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
3. Start the job:
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/flume-file-hdfs.conf
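Note:
To read a file on a Linux system, Flume has to run a command that follows Linux command conventions. The Hive log lives on the Linux system, so the source type chosen is exec, short for execute. The source executes a Linux command to read the file.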
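The agent in this section can be sketched in plain Java. FileToHdfsSketch and its methods are hypothetical names. runCommand mimics an exec source by running shell -c command and producing one event per output line. hdfsDirectory shows how hdfs.path = .../%Y%m%d/%H, together with round, roundValue and roundUnit = hour, picks a target directory. It assumes the agent's local time zone is passed in, as with useLocalTimeStamp = true.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the two halves of the 3.2 agent; not Flume code.
class FileToHdfsSketch {

    // exec-style source: run `shell -c command` and turn each stdout line into an event.
    // With `tail -F` the command never exits, so a real source keeps reading forever.
    static List<String> runCommand(String shell, String command) {
        List<String> events = new ArrayList<>();
        try {
            Process process = new ProcessBuilder(shell, "-c", command).start();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    events.add(line);
                }
            }
            process.waitFor();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    // HDFS sink path: %Y%m%d/%H with round = true, roundUnit = hour and
    // roundValue = roundHours rounds the timestamp down to a multiple of roundHours.
    static String hdfsDirectory(long epochMillis, ZoneId zone, int roundHours) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(zone).truncatedTo(ChronoUnit.HOURS);
        t = t.withHour(t.getHour() / roundHours * roundHours);
        return "hdfs://hadoop102:9000/flume/" + t.format(DateTimeFormatter.ofPattern("yyyyMMdd/HH"));
    }
}
```

With roundValue = 1, as in the configuration above, every hour gets its own directory. A larger roundValue groups several hours into one directory.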
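3.3 Reading Files from a Directory into HDFS in Real Time
Requirements:
Use Flume to monitor all files in a directory.
Implementation steps:
1. In the job folder, create the Agent configuration file flume-dir-hdfs.conf: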
[djm@hadoop102 job]$ vim flume-dir-hdfs.conf
2. Add the following content:
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# Ignore (do not upload) all files ending in .tmp
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
# Prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll directories by time
a3.sinks.k3.hdfs.round = true
# How many time units before a new directory is created
a3.sinks.k3.hdfs.roundValue = 1
# The time unit used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# Number of Events accumulated before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; compressed types are also supported
a3.sinks.k3.hdfs.fileType = DataStream
# How often (in seconds) to start a new file
a3.sinks.k3.hdfs.rollInterval = 60
# Roll size of each file in bytes, about 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# File rolling does not depend on the number of Events
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
3. Start the job:
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.conf
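Note:
Do not create files in the monitored directory and then keep modifying them. A file should be complete before it is placed there.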
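How fileSuffix and ignorePattern decide which files are still to be ingested can be sketched as follows. SpoolDirSketch is hypothetical and is not Flume's implementation. It assumes the pattern is matched against the whole file name. One way to cooperate with the note above is to write a file under a .tmp name and rename it once it is complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch of how the 3.3 settings select files; not Flume's implementation.
class SpoolDirSketch {
    static final String FILE_SUFFIX = ".COMPLETED";                          // fileSuffix
    static final Pattern IGNORE_PATTERN = Pattern.compile("([^ ]*\\.tmp)");  // ignorePattern

    // Returns the files still waiting to be ingested.
    static List<String> pending(List<String> fileNames) {
        List<String> result = new ArrayList<>();
        for (String name : fileNames) {
            boolean alreadyDone = name.endsWith(FILE_SUFFIX);          // ingested and renamed
            boolean ignored = IGNORE_PATTERN.matcher(name).matches();  // e.g. still being written
            if (!alreadyDone && !ignored) {
                result.add(name);
            }
        }
        return result;
    }

    // After a file has been fully read, it is renamed with the suffix.
    static String markCompleted(String name) {
        return name + FILE_SUFFIX;
    }
}
```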
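3.4 Single Source, Multiple Outputs (Selector)
Requirements:
Flume-1 monitors a file for changes and passes the new content to Flume-2, which stores it in HDFS.
At the same time, Flume-1 passes the new content to Flume-3, which writes it to the local file system.
Implementation steps:
1. In the group1 folder, create the Agent configuration file flume-file-flume.conf: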
[djm@hadoop102 group1]$ vim flume-file-flume.conf
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data flow to all channels
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# An avro sink is a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
3. In the group1 folder, create the Agent configuration file flume-flume-hdfs.conf:
[djm@hadoop102 group1]$ vim flume-flume-hdfs.conf
4. Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# An avro source is a data-receiving service
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# Whether to roll directories by time
a2.sinks.k1.hdfs.round = true
# How many time units before a new directory is created
a2.sinks.k1.hdfs.roundValue = 1
# The time unit used for rounding
a2.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of Events accumulated before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# File type; compressed types are also supported
a2.sinks.k1.hdfs.fileType = DataStream
# How often (in seconds) to start a new file
a2.sinks.k1.hdfs.rollInterval = 600
# Roll size of each file in bytes, about 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# File rolling does not depend on the number of Events
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. In the group1 folder, create the Agent configuration file flume-flume-dir.conf:
[djm@hadoop102 group1]$ vim flume-flume-dir.conf
6. Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
7. Start the jobs:
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf
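Note:
Avro is a language-independent data serialization and RPC framework.
The local output directory must already exist. The sink does not create it if it is missing.
Start the downstream agents (those whose avro source receives data) before the upstream agent whose avro sinks send to them.
3.5 Single Source, Multiple Outputs (Sink Group)
Requirements:
Flume-1 monitors port data and passes it to Flume-2, which displays it on the console.
At the same time, Flume-1 passes data to Flume-3, which also displays it on the console. Because the Sink group uses load balancing, each event goes to only one of the two.
Implementation steps:
1. In the group2 folder, create the Agent configuration file flume-netcat-flume.conf.
2. Add the following content: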
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
3. In the group2 folder, create the Agent configuration file flume-flume-console1.conf.
4. Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. In the group2 folder, create the Agent configuration file flume-flume-console2.conf.
6. Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
7. Start the jobs:
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf
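The Sink group in this example uses processor.type = load_balance, selector = round_robin and backoff = true. The sketch below models that idea: sinks are used in turn, and a sink that fails is skipped for a while. RoundRobinBackoff is hypothetical. The 1000 ms base delay and the doubling after each consecutive failure are assumptions, and only the cap corresponds to selector.maxTimeOut (10000 in the configuration).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of load_balance + round_robin + backoff.
// Base delay and doubling rule are assumptions; maxBackoffMs mirrors selector.maxTimeOut.
class RoundRobinBackoff {
    private final List<String> sinks;
    private final long[] blockedUntil; // per-sink time (ms) until which it is skipped
    private final int[] failures;      // consecutive failures per sink
    private final long baseBackoffMs;
    private final long maxBackoffMs;
    private int next = 0;

    RoundRobinBackoff(List<String> sinks, long baseBackoffMs, long maxBackoffMs) {
        this.sinks = new ArrayList<>(sinks);
        this.blockedUntil = new long[sinks.size()];
        this.failures = new int[sinks.size()];
        this.baseBackoffMs = baseBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    // Next sink in rotation that is not backing off at time nowMs, or null if none.
    String select(long nowMs) {
        for (int i = 0; i < sinks.size(); i++) {
            int idx = (next + i) % sinks.size();
            if (blockedUntil[idx] <= nowMs) {
                next = (idx + 1) % sinks.size();
                return sinks.get(idx);
            }
        }
        return null;
    }

    // A failed sink is skipped for base * 2^(failures - 1) ms, capped at maxBackoffMs.
    void recordFailure(String sink, long nowMs) {
        int idx = sinks.indexOf(sink);
        failures[idx]++;
        long delay = Math.min(maxBackoffMs, baseBackoffMs << Math.min(failures[idx] - 1, 30));
        blockedUntil[idx] = nowMs + delay;
    }

    // A successful delivery resets the failure count.
    void recordSuccess(String sink) {
        failures[sinks.indexOf(sink)] = 0;
    }
}
```

With two sinks, k1 and k2 alternate until k1 fails. After that, only k2 is selected until k1's backoff window expires.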
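3.6 Aggregating Multiple Sources
Requirements:
Flume-1 on hadoop103 monitors the file /opt/module/group.log.
Flume-2 on hadoop102 monitors the data stream on a port.
Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, which prints the final data to the console.
Implementation steps:
1. In the group3 folder, create the Agent configuration file flume1-logger-flume.conf: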
[djm@hadoop102 group3]$ vim flume1-logger-flume.conf
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. In the group3 folder, create the Agent configuration file flume2-netcat-flume.conf:
[djm@hadoop102 group3]$ vim flume2-netcat-flume.conf
4. Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. In the group3 folder, create the Agent configuration file flume3-flume-logger.conf:
[djm@hadoop102 group3]$ vim flume3-flume-logger.conf
6. Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
7. Distribute the configuration files:
[djm@hadoop102 group3]$ xsync /opt/module/flume/job
8. Start the jobs:
[djm@hadoop104 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2-netcat-flume.conf
[djm@hadoop103 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1-logger-flume.conf
4 Ganglia Deployment
1. Install the httpd service and php:
yum -y install httpd php
2. Install the other dependencies:
yum -y install rrdtool perl-rrdtool rrdtool-devel
3. Install ganglia:
rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
yum -y install ganglia-gmetad ganglia-gmond ganglia-web
4. Edit the ganglia configuration file:
vim /etc/httpd/conf.d/ganglia.conf
#
# Ganglia monitoring system php web frontend
#

Alias /ganglia /usr/share/ganglia

<Location /ganglia>
  # Require local
  Require all granted
  # Require ip 10.1.2.3
  # Require host example.org
</Location>
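Note in particular: the following Apache 2.2-style directives do not work here. Use Require all granted as shown above.
Order deny,allow
Allow from all
5. Edit the gmetad configuration file: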
vim /etc/ganglia/gmetad.conf
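data_source "hadoop102" 192.168.1.102
(Use the IP address of the host running gmond. The gmond.conf below uses 192.168.10.102, so make both addresses consistent with your own network.)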
6. Edit the gmond configuration file:
vim /etc/ganglia/gmond.conf
cluster {
  #name = "unspecified"
  name = "hadoop102"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}

udp_send_channel {
  #bind_hostname = yes # Highly recommended, soon to be default.
                       # This option tells gmond to use a source address
                       # that resolves to the machine's hostname. Without
                       # this, the metrics may appear to come from any
                       # interface and the DNS names associated with
                       # those IPs will be used to create the RRDs.
  #mcast_join = 239.2.11.71
  host = 192.168.10.102
  port = 8649
  ttl = 1
}

/* You can specify as many udp_recv_channels as you like as well. */
udp_recv_channel {
  #mcast_join = 239.2.11.71
  port = 8649
  #bind = 239.2.11.71
  bind = 192.168.10.102
  retry_bind = true
  # Size of the UDP buffer. If you are handling lots of metrics you really
  # should bump it up to e.g. 10MB or even higher.
  # buffer = 10485760
}
7. Check the SELinux status:
sestatus
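If it is not disabled, edit the following configuration file and set SELINUX=disabled (this takes effect after a reboot):
vim /etc/selinux/config
Or temporarily switch SELinux to permissive mode (until the next reboot):
setenforce 0
8. Start ganglia:
systemctl start httpd
systemctl start gmetad
systemctl start gmond
9. Open a browser and visit: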
http://hadoop102/ganglia/
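If you still get a permission error after completing the steps above, try changing the permissions of the /var/lib/ganglia directory:
chmod -R 777 /var/lib/ganglia
5 Custom Source
Requirements:
Implementation:
1. Add the dependency:
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>
2. Write the code: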
package com.djm.flume;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    // Fields read from the agent configuration file
    private Long delay;
    private String field;

    /**
     * Receives data, wraps each record in an Event, and writes it to the channel.
     */
    @Override
    public Status process() throws EventDeliveryException {
        try {
            for (int i = 0; i < 5; i++) {
                // Create a new Event per record: reusing one instance would let later
                // iterations overwrite events that are still queued in the channel
                SimpleEvent event = new SimpleEvent();
                Map<String, String> headers = new HashMap<>();
                event.setHeaders(headers);
                event.setBody((field + i).getBytes(StandardCharsets.UTF_8));
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return Status.BACKOFF;
        }
        return Status.READY;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    /**
     * Reads the configuration file.
     */
    @Override
    public void configure(Context context) {
        delay = context.getLong("delay", 1000L); // default avoids a NullPointerException when delay is unset
        field = context.getString("field", "hello");
    }
}
3. Package and test:
Package the project with Maven and upload the jar to the /opt/module/flume/lib directory.
In the job folder, create the Agent configuration file mysource.conf:
[djm@hadoop102 job]$ vim mysource.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.djm.flume.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = djm

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the job:
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
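MySource returns 0 from both getBackOffSleepIncrement and getMaxBackOffSleepInterval. The sketch below shows one plausible way a polling loop could use those two values. It assumes the sleep grows linearly with each consecutive BACKOFF, is capped at the maximum, and resets on READY. PollBackoffSketch is a hypothetical illustration, not a copy of Flume's runner. With both values at 0, as in MySource, a BACKOFF causes no extra sleep.

```java
import java.util.List;

// Hypothetical model of how a polling loop can react to the Status returned
// by process(): sleep longer after each consecutive BACKOFF, reset on READY.
class PollBackoffSketch {
    enum Status { READY, BACKOFF }

    // Sleep after the n-th consecutive BACKOFF: n * increment, capped at maxInterval.
    static long sleepMs(long consecutiveBackoffs, long increment, long maxInterval) {
        return Math.min(consecutiveBackoffs * increment, maxInterval);
    }

    // Total time the loop would sleep for a given sequence of process() results.
    static long totalSleepMs(List<Status> results, long increment, long maxInterval) {
        long consecutive = 0;
        long total = 0;
        for (Status status : results) {
            if (status == Status.BACKOFF) {
                consecutive++;
                total += sleepMs(consecutive, increment, maxInterval);
            } else {
                consecutive = 0; // READY resets the streak
            }
        }
        return total;
    }
}
```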
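6 Custom Sink
Requirements:
Implementation:
1. Add the dependency:
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>
2. Write the code: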
package com.djm.flume;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;

public class MySink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(MySink.class);

    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        Status status;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            transaction.begin();
            Event event = channel.take();
            if (event == null) {
                // Channel is empty: report BACKOFF so the runner waits before polling again,
                // instead of sleeping inside an open transaction
                status = Status.BACKOFF;
            } else {
                LOG.info(prefix + new String(event.getBody(), StandardCharsets.UTF_8) + suffix);
                status = Status.READY;
            }
            transaction.commit();
        } catch (Throwable e) {
            transaction.rollback();
            LOG.error("Failed to process event", e);
            status = Status.BACKOFF;
            if (e instanceof Error) {
                throw (Error) e;
            }
        } finally {
            transaction.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix", "");
        suffix = context.getString("suffix", "");
    }
}
3. Package and test:
Package the project with Maven and upload the jar to the /opt/module/flume/lib directory.
In the job folder, create the Agent configuration file mysink.conf:
[djm@hadoop102 job]$ vim mysink.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.djm.flume.MySink
a1.sinks.k1.prefix = djm:
a1.sinks.k1.suffix = :end

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the job:
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
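7 Flume Parameter Tuning
7.1 Source
Increasing the number of Sources increases the capacity to read data. For example, when one directory produces too many files, split it into several directories and configure one Source per directory, so that the Sources can keep up with newly produced data.
The batchSize parameter determines how many Events a Source moves to the Channel in one batch. Raising it appropriately improves the performance of moving Events from the Source to the Channel.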
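7.2 Channel
With the Memory Channel type, the Channel performs best, but data may be lost if the Flume process dies unexpectedly.
With the File Channel type, the Channel is more fault-tolerant but slower than a Memory Channel. When using a File Channel, setting dataDirs to directories on several different disks can improve performance.
The capacity parameter determines the maximum number of Events the Channel can hold. The transactionCapacity parameter determines the maximum number of Events that a Source writes to the Channel, or a Sink reads from it, in a single transaction. transactionCapacity must be at least as large as the batchSize of the Source and of the Sink.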
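These sizing rules can be checked before an agent is started. ChannelSizingCheck is a hypothetical helper and is not part of Flume. It encodes capacity >= transactionCapacity >= batchSize for both the Source and the Sink, as described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical validation of the channel sizing rules; not a Flume API.
class ChannelSizingCheck {

    // Returns a list of problems; an empty list means the sizes are consistent.
    static List<String> check(int capacity, int transactionCapacity,
                              int sourceBatchSize, int sinkBatchSize) {
        List<String> problems = new ArrayList<>();
        if (transactionCapacity > capacity) {
            problems.add("transactionCapacity must not exceed capacity");
        }
        if (sourceBatchSize > transactionCapacity) {
            problems.add("source batchSize must not exceed transactionCapacity");
        }
        if (sinkBatchSize > transactionCapacity) {
            problems.add("sink batchSize must not exceed transactionCapacity");
        }
        return problems;
    }

    public static void main(String[] args) {
        // capacity = 1000, transactionCapacity = 100, sink batchSize = 1000
        System.out.println(check(1000, 100, 100, 1000));
        // prints [sink batchSize must not exceed transactionCapacity]
    }
}
```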
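7.3 Sink
Increasing the number of Sinks increases the capacity to consume Events, but more Sinks are not always better. Configure only as many as you need, because too many Sinks occupy system resources and waste them.
The batchSize parameter determines how many Events a Sink reads from the Channel in one batch. Raising it appropriately improves the performance of moving Events out of the Channel.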
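The effect of batchSize on the number of transactions can be sketched with a simple drain loop. With 250 events waiting, batchSize = 100 needs three take-and-commit round trips instead of 250 with batchSize = 1. BatchDrainSketch is hypothetical, and real throughput also depends on the destination system.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: a sink that takes up to batchSize events per transaction.
class BatchDrainSketch {

    // Drains the channel and returns the size of each batch (one transaction per batch).
    static List<Integer> drain(Deque<String> channel, int batchSize) {
        List<Integer> batchSizes = new ArrayList<>();
        while (!channel.isEmpty()) {
            List<String> batch = new ArrayList<>();
            while (batch.size() < batchSize && !channel.isEmpty()) {
                batch.add(channel.pollFirst());
            }
            // A real sink would write the whole batch here and then commit once.
            batchSizes.add(batch.size());
        }
        return batchSizes;
    }

    public static void main(String[] args) {
        Deque<String> channel = new ArrayDeque<>();
        for (int i = 0; i < 250; i++) {
            channel.add("event-" + i);
        }
        System.out.println(drain(channel, 100)); // [100, 100, 50]
    }
}
```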