flume实际生产场景分析
发表于:2025-01-26 作者:千家信息网编辑
千家信息网最后更新 2025年01月26日,需求:A B两台日志服务器实时生产日志主要类型为access.log、nginx.log、web.log,现在要求:把A、B机器中的access.log、nginx.log、web.log 采集汇总到
千家信息网最后更新 2025年01月26日flume实际生产场景分析
需求:A B两台日志服务器实时生产日志主要类型为access.log、nginx.log、web.log,现在要求:
把A、B机器中的access.log、nginx.log、web.log 采集汇总到 C 机器上然后统一收集到 hdfs中,但是在hdfs中要求的目录为:
/source/logs/access/日期/**
/source/logs/nginx/日期/**
/source/logs/web/日期/**
场景分析:
规划:
hadoop01(web01):
source:access.log 、nginx.log、web.log
channel:memory
sink:avro
hadoop02(web02):
source:access.log 、nginx.log、web.log
channel:memory
sink:avro
hadoop03(数据收集):
source;avro
channel:memory
sink:hdfs
配置文件:
#exec_source_avro_sink.properties#指定各个核心组件a1.sources = r1 r2 r3a1.sinks = k1a1.channels = c1#r1a1.sources.r1.type = execa1.sources.r1.command = tail -F /home/hadoop/flume_data/access.loga1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = statica1.sources.r1.interceptors.i1.key = typea1.sources.r1.interceptors.i1.value = access#r2a1.sources.r2.type = execa1.sources.r2.command = tail -F /home/hadoop/flume_data/nginx.loga1.sources.r2.interceptors = i2a1.sources.r2.interceptors.i2.type = statica1.sources.r2.interceptors.i2.key = typea1.sources.r2.interceptors.i2.value = nginx#r3a1.sources.r3.type = execa1.sources.r3.command = tail -F /home/hadoop/flume_data/web.loga1.sources.r3.interceptors = i3a1.sources.r3.interceptors.i3.type = statica1.sources.r3.interceptors.i3.key = typea1.sources.r3.interceptors.i3.value = web#Describe the sinka1.sinks.k1.type = avroa1.sinks.k1.hostname = hadoop03a1.sinks.k1.port = 41414#Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 20000a1.channels.c1.transactionCapacity = 10000#Bind the source and sink to the channela1.sources.r1.channels = c1a1.sources.r2.channels = c1a1.sources.r3.channels = c1a1.sinks.k1.channel = c1
#avro_source_hdfs_sink.properties#定义 agent 名, source、channel、sink 的名称a1.sources = r1a1.sinks = k1a1.channels = c1#定义 sourcea1.sources.r1.type = avroa1.sources.r1.bind = 0.0.0.0a1.sources.r1.port =41414#添加时间拦截器a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type=org.apache.flume.interceptor.TimestampInterceptor$Builder#定义 channelsa1.channels.c1.type = memorya1.channels.c1.capacity = 20000a1.channels.c1.transactionCapacity = 10000#定义 sinka1.sinks.k1.type = hdfsa1.sinks.k1.hdfs.path=hdfs://myha01/source/logs/%{type}/%Y%m%da1.sinks.k1.hdfs.filePrefix =eventsa1.sinks.k1.hdfs.fileType = DataStreama1.sinks.k1.hdfs.writeFormat = Text#时间类型a1.sinks.k1.hdfs.useLocalTimeStamp = true#生成的文件不按条数生成a1.sinks.k1.hdfs.rollCount = 0#生成的文件按时间生成a1.sinks.k1.hdfs.rollInterval = 30#生成的文件按大小生成a1.sinks.k1.hdfs.rollSize = 10485760#批量写入 hdfs 的个数a1.sinks.k1.hdfs.batchSize = 20#flume 操作 hdfs 的线程数(包括新建,写入等)a1.sinks.k1.hdfs.threadsPoolSize=10#操作 hdfs 超时时间a1.sinks.k1.hdfs.callTimeout=30000#组装 source、channel、sinka1.sources.r1.channels = c1a1.sinks.k1.channel = c1
测试:
#在hadoop01和 hadoop02上的/home/hadoop/data 有数据文件 access.log、nginx.log、 web.log#先启动hadoop03上的flume:(存储)flume-ng agent -c conf -f avro_source_hdfs_sink.properties -name a1 -Dflume.root.logger=DEBUG,console#然后在启动hadoop01和hadoop02上的命令flume(收集)flume-ng agent -c conf -f exec_source_avro_sink.properties -name a1 -Dflume.root.logger=DEBUG,console
生成
文件
时间
日期
数据
日志
机器
类型
场景
分析
生产
个数
名称
命令
大小
实时
是在
服务器
核心
目录
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库下一年度
二级mysql数据库程序设计
阿里云网络安全问题
chat软件开发
4k纸网络安全手抄报初一
技嘉服务器出货量
只复制粘贴未隐藏的数据库
江苏乐知网络技术
软件开发需要购买哪些原材料
源服务器管理
服务器数据多久备份一次好
软件开发i3处理器行不行
什么设备可以管理服务器web
外汇ea软件开发
职业篮球赛信息管理系统数据库
计算机网络技术要考啥证
我的世界服务器给永久管理
数据库通过id删除语句
捷作商贸宝没有登录到数据库
网络安全与执法开设院校大学
网络安全法剖析
数据库access c
丰泽区旗开网络技术服务部
方向软件数据库损坏
东城区正规软件开发推广
学校常见的服务器有哪些
为什么网络安全是管理问题
数据库的高级应用有哪些
江苏网络软件开发一体化
澳门 软件开发 兼职