How to install Flume and integrate it with Kafka
Flume
Communication between Flume agents (from a reference book)
Flume has built-in RPC sink-source pairs dedicated to transferring data between agents. A source is the component that receives data into a Flume agent; built-in source types include Avro Source, Thrift Source, HTTP Source, Spooling Directory Source, Syslog Source, Exec Source, and JMS Source. A channel is the buffer between source and sink, and it is the key to preventing data loss. A sink reads events from a channel; each sink can read from only one channel, and every sink must have a channel configured, otherwise the sink is removed from the agent.
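As a rough sketch of such an RPC pair (the agent names, hostname, and port here are hypothetical, not taken from this article), a two-tier setup pairs an Avro sink on the collecting agent with an Avro source on the receiving agent:

```properties
# agent1 (collector tier): tails a local file, forwards events over Avro RPC
agent1.sources = src1
agent1.sinks = sink1
agent1.channels = ch1
agent1.sources.src1.type = exec
agent1.sources.src1.command = tail -F /tmp/app.log
agent1.sources.src1.channels = ch1
agent1.channels.ch1.type = memory
agent1.sinks.sink1.type = avro
agent1.sinks.sink1.hostname = agent2-host
agent1.sinks.sink1.port = 4545
agent1.sinks.sink1.channel = ch1

# agent2 (aggregator tier): receives the Avro RPC stream, logs events
agent2.sources = src2
agent2.sinks = sink2
agent2.channels = ch2
agent2.sources.src2.type = avro
agent2.sources.src2.bind = 0.0.0.0
agent2.sources.src2.port = 4545
agent2.sources.src2.channels = ch2
agent2.channels.ch2.type = memory
agent2.sinks.sink2.type = logger
agent2.sinks.sink2.channel = ch2
```

Each agent is started with its own `flume-ng agent -n <name>` command; the agent hosting the Avro source should be up before the Avro sink tries to connect.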
Install Flume
Download and install
cd /data/
wget http://mirrors.hust.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
tar axf apache-flume-1.8.0-bin.tar.gz
cd apache-flume-1.8.0-bin
Edit the environment variables
vim /etc/profile
#FLUME
export FLUME_HOME=/data/apache-flume-1.8.0-bin
export PATH=$PATH:${FLUME_HOME}/bin
export HADOOP_HOME=/data/hadoop
source /etc/profile
Edit the configuration files
cd ${FLUME_HOME}/conf/
cp flume-env.sh.template flume-env.sh
Edit flume-env.sh
export JAVA_HOME=/usr/local/jdk
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
export HADOOP_HOME=/data/hadoop
Verify the installation
flume-ng version
Using Flume
Sending data through a single-node agent
cd ${FLUME_HOME}/conf/
Add a configuration file
vim avro.conf
#Name the components on this agent
agent.sources = avroSrc
agent.sinks = avroSink
agent.channels = avroChannel
#Describe/configure the source
agent.sources.avroSrc.type = netcat
agent.sources.avroSrc.bind = localhost
agent.sources.avroSrc.port = 62000
#Describe the sink
agent.sinks.avroSink.type = logger
#Use a channel that buffers events in memory
agent.channels.avroChannel.type = memory
agent.channels.avroChannel.capacity = 1000
agent.channels.avroChannel.transactionCapacity = 100
#Bind the source and sink to the channel
agent.sources.avroSrc.channels = avroChannel
agent.sinks.avroSink.channel = avroChannel
# Note: setting agent.sources.avroSrc.type = avro for this test produced:
# org.apache.avro.AvroRuntimeException: Excessively large list allocation request detected: 1863125517 items! Connection closed
# This error typically appears when a non-Avro client (such as telnet, which sends raw text) connects to an Avro source, since the Avro source expects Avro-framed RPC; the netcat source type is used here instead.
Run the Flume agent
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/avro.conf -n agent -Dflume.root.logger=INFO,console
Test the connection with telnet
telnet localhost 62000
View the agent's console output
Monitoring a local file with an exec source
cd ${FLUME_HOME}/conf/
Add a configuration file
vim exec.conf
#example.conf: A single-node Flume configuration
#Name the components on this agent
agentexec.sources = avroexec
agentexec.sinks = sinkexec
agentexec.channels = channelexec
#Describe/configure the source
agentexec.sources.avroexec.type = exec
agentexec.sources.avroexec.command = tail -F /tmp/testexec.log
#Describe the sink
agentexec.sinks.sinkexec.type = logger
#Use a channel which buffers events in memory
agentexec.channels.channelexec.type = memory
agentexec.channels.channelexec.capacity = 100000
agentexec.channels.channelexec.transactionCapacity = 10000
#Bind the source and sink to the channel
agentexec.sources.avroexec.channels = channelexec
agentexec.sinks.sinkexec.channel = channelexec
Run the Flume agent
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/exec.conf --name agentexec -Dflume.root.logger=INFO,console
Test
Unfortunately, only part of the data came through (no solution found yet).
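To reproduce the test, a helper like the following can append lines for `tail -F` to pick up (the log path matches exec.conf; the line contents are illustrative):

```shell
#!/bin/sh
# Append a few timestamped lines to the file the exec source tails.
LOG=/tmp/testexec.log
for i in 1 2 3 4 5; do
    echo "exec-test line $i $(date +%s)" >> "$LOG"
done
# Show what the agent's tail -F should have seen
tail -n 5 "$LOG"
```

Note that the exec source makes no delivery guarantees: if the agent restarts or the channel fills up, lines can be silently dropped, which may explain the partial results above. For reliable file tailing, the Spooling Directory Source (used below) or the Taildir Source in newer Flume releases is the usual recommendation.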
Integrating spooldir with Kafka to monitor logs
Prerequisite: a Kafka cluster is installed
cd ${FLUME_HOME}/conf/
Add a configuration file
vim single_agent.conf
#agent name a1
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1
#set source
#(for this test the data sits under /tmp; adjust the paths as needed)
a1.sources.source1.type = spooldir
a1.sources.source1.spoolDir = /tmp/spooldir
a1.sources.source1.fileHeader = false
#set sink
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.topic = spooldir
#set channel
#(the file channel also stores its data under /tmp here; adjust as needed)
a1.channels.channel1.type = file
a1.channels.channel1.checkpointDir = /tmp/flume_data/checkpoint
a1.channels.channel1.dataDirs = /tmp/flume_data/data
#bind
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1
Create the data directories
mkdir -pv /tmp/spooldir
mkdir -pv /tmp/flume_data/checkpoint
mkdir -pv /tmp/flume_data/data
Start the Kafka cluster (on all nodes)
kafka-server-start.sh /data/kafka_2.11-1.0.0/config/server.properties
Create the Kafka topic
kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic spooldir --replication-factor 1 --partitions 3
List the topics
kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
Start a Kafka console consumer
kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic spooldir --from-beginning
Start the Flume agent (in a new window)
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console
Write a test event
[root@master conf]# echo "hello ,test flume spooldir source" >> /tmp/spooldir/spool.txt
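The spooldir source requires files to be complete and immutable once they land in the spool directory (it renames them to *.COMPLETED after ingestion and raises an error if a file is modified afterwards). A safer pattern than appending in place is to write the file elsewhere and then `mv` it in; the directory below matches single_agent.conf, the filenames are illustrative:

```shell
#!/bin/sh
# Write the payload outside the spool directory first...
mkdir -p /tmp/spooldir
TMP=$(mktemp /tmp/spool_payload.XXXXXX)
echo "hello, test flume spooldir source" > "$TMP"
# ...then move it in: rename is atomic on the same filesystem,
# so the source never sees a half-written file.
mv "$TMP" "/tmp/spooldir/spool_$(date +%s).txt"
ls /tmp/spooldir
```

Each moved-in file is picked up once, sent to the Kafka sink, and renamed with the .COMPLETED suffix.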
flume-ng console output
Kafka consumer output
Writing log data to HBase
Prerequisite: an HBase cluster is installed
cd ${FLUME_HOME}/conf/
mkdir hbase && cd hbase
Add the configuration files; two agents are needed here
hbase-back.conf collects the local data, and hbase-front.conf writes the data to HBase
vim hbase-back.conf
agent.sources = backsrc
agent.channels = memoryChannel
agent.sinks = remotesink
#Describe the sources
agent.sources.backsrc.type = exec
agent.sources.backsrc.command = tail -F /tmp/test/data/data.txt
agent.sources.backsrc.checkperiodic = 1000
agent.sources.backsrc.channels = memoryChannel
#Describe the channels
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.keep-alive = 30
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity = 1000
#Describe the sinks
agent.sinks.remotesink.type = avro
agent.sinks.remotesink.hostname = master
agent.sinks.remotesink.port = 9999
agent.sinks.remotesink.channel = memoryChannel
vim hbase-front.conf
agent.sources = frontsrc
agent.channels = memoryChannel
agent.sinks = fileSink
#Describe the sources
agent.sources.frontsrc.type = avro
agent.sources.frontsrc.bind = master
agent.sources.frontsrc.port = 9999
agent.sources.frontsrc.channels = memoryChannel
#Describe the channels
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.keep-alive = 30
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity = 1000
#Describe the sinks
agent.sinks.fileSink.type = hbase
agent.sinks.fileSink.channel = memoryChannel
agent.sinks.fileSink.table = access_log
agent.sinks.fileSink.columnFamily = t
agent.sinks.fileSink.batchSize = 50
agent.sinks.fileSink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent.sinks.fileSink.zookeeperQuorum = master:2181,slave1:2181,slave2:2181
agent.sinks.fileSink.znodeParent = /hbase
agent.sinks.fileSink.timeout = 90000
Create the local directory and file
mkdir -pv /tmp/test/data && touch /tmp/test/data/data.txt
Create the table in HBase
hbase shell
Create the table
create 'access_log','t'
List the tables
list
Start the back agent
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/hbase/hbase-back.conf --name agent -Dflume.root.logger=INFO,console
An error is reported after startup
18/01/22 22:29:28 WARN sink.AbstractRpcSink: Unable to create Rpc client using hostname: 192.168.3.58, port: 9999
org.apache.flume.FlumeException: NettyAvroRpcClient { host: master, port: 9999 }: RPC connection error
This happens because the Avro connection is not yet established: only the sink (back) side is running, and there is no source side yet. Once the front agent is started, the connection is reported as established.
Start the front agent
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/hbase/hbase-front.conf --name agent -Dflume.root.logger=INFO,console
Append content to the local file, then check it in HBase
echo "hello ,test flush to hbase">>/tmp/test/data/data.txt
Neither agent prints log output during the write.
View the data in HBase
hbase shell
scan "access_log"
Flume writes logs to HBase with a certain delay.
Writing logs to Hadoop
The principle is the same as writing to HBase; once the HBase flow is understood, writing to other services is easy to follow. See the official documentation for detailed configuration.
Prerequisite: a Hadoop cluster is installed
cd ${FLUME_HOME}/conf/
mkdir hdfs && cd hdfs
Add the configuration files; two agents are needed here
hadoop-back.conf collects the local data, and hadoop-front.conf writes the data to Hadoop
vim hadoop-back.conf
#Name the components
hadoop.sources = backsrc
hadoop.sinks = fileSink
hadoop.channels = memoryChannel
#Source
hadoop.sources.backsrc.type = spooldir
hadoop.sources.backsrc.spoolDir = /tmp/data/hadoop
hadoop.sources.backsrc.channels = memoryChannel
hadoop.sources.backsrc.fileHeader = true
#Channel
hadoop.channels.memoryChannel.type = memory
hadoop.channels.memoryChannel.keep-alive = 30
hadoop.channels.memoryChannel.capacity = 1000
hadoop.channels.memoryChannel.transactionCapacity = 1000
#Sink
hadoop.sinks.fileSink.type = avro
hadoop.sinks.fileSink.hostname = master
hadoop.sinks.fileSink.port = 10000
hadoop.sinks.fileSink.channel = memoryChannel
vim hadoop-front.conf
#Name the components
hadoop.sources = frontsrc
hadoop.channels = memoryChannel
hadoop.sinks = remotesink
#Source
hadoop.sources.frontsrc.type = avro
hadoop.sources.frontsrc.bind = master
hadoop.sources.frontsrc.port = 10000
hadoop.sources.frontsrc.channels = memoryChannel
#Channel
hadoop.channels.memoryChannel.type = memory
hadoop.channels.memoryChannel.keep-alive = 30
hadoop.channels.memoryChannel.capacity = 1000
hadoop.channels.memoryChannel.transactionCapacity = 1000
#Sink
hadoop.sinks.remotesink.type = hdfs
hadoop.sinks.remotesink.hdfs.path = hdfs://master/flume
hadoop.sinks.remotesink.hdfs.rollInterval = 0
hadoop.sinks.remotesink.hdfs.idleTimeout = 10000
hadoop.sinks.remotesink.hdfs.fileType = DataStream
hadoop.sinks.remotesink.hdfs.writeFormat = Text
hadoop.sinks.remotesink.hdfs.threadsPoolSize = 20
hadoop.sinks.remotesink.channel = memoryChannel
Create the local directory and set permissions
mkdir -pv /tmp/data/hadoop && chmod -R 777 /tmp/data/
Create the HDFS directory and set permissions
hadoop fs -mkdir /flume
hadoop fs -chmod 777 /flume
hadoop fs -ls /
Write files into the local directory
echo "hello, test hadoop" >> /tmp/data/hadoop/hadoop.log
echo "hello, test flume" >> /tmp/data/hadoop/flume.log
echo "hello, test helloworld" >> /tmp/data/hadoop/helloworld.log
View the files and their contents in HDFS
hadoop fs -ls /flume
hadoop fs -cat /flume/FlumeData.1516634328510.tmp