千家信息网

好程序员大数据学习路线之Logstach与flume对比

发表于:2025-02-08 作者:千家信息网编辑
千家信息网最后更新 2025年02月08日,好程序员大数据学习路线之Logstach与flume对比,没有集群的概念,logstach与flume都称为组logstash是用JRuby语言开发的组件的对比:logstach : input fi
千家信息网最后更新 2025年02月08日好程序员大数据学习路线之Logstach与flume对比

好程序员大数据学习路线之Logstach与flume对比,没有集群的概念,logstach与flume都称为组

logstash是用JRuby语言开发的

组件的对比:

  logstach : input filter output

  flume : source channel sink

优劣对比:

logstach :

安装简单,安装体积小

filter组件,使得该工具具有数据过滤,数据切分的功能

可以与ES无缝结合

具有数据容错功能,在数据采集的时候,如果发生宕机或断开的情况,会断点续传(会记录读取的偏移量)

  综上,该工具主要用途为采集日志数据

flume:

高可用方面要比logstach强大

flume一直在强调数据的安全性,flume在数据传输过程中是由事务控制的

flume可以应用在多类型数据传输领域

数据对接

logstach.gz文件上传解压即可

可以在logstach目录下创建conf文件,用来存储配置文件

命令启动

1.bin/logstash -e 'input { stdin {} } output { stdout{} }'

  stdin/stdout(标准输入输出流)

hello xixi

2018-09-12T21:58:58.649Z hadoop01 hello xixi

hello haha

2018-09-12T21:59:19.487Z hadoop01 hello haha

2.bin/logstash -e 'input { stdin {} } output { stdout{codec => rubydebug} }'

hello xixi

{

"message" => "hello xixi",

"@version" => "1",

"@timestamp" => "2018-09-12T22:00:49.612Z",

"host" => "hadoop01"

}

3.es集群中 ,需要启动es集群

  bin/logstash -e 'input { stdin {} } output { elasticsearch {hosts => ["192.168.88.81:9200"]} stdout{} }'

输入命令后,es自动生成index,自动mapping.

hello haha

2018-09-12T22:13:05.361Z hadoop01 hehello haha

  bin/logstash -e 'input { stdin {} } output { elasticsearch {hosts => ["192.168.88.81:9200", "192.168.88.82:9200"]} stdout{} }'

4.kafka集群中,启动kafka集群

  bin/logstash -e 'input { stdin {} } output { elasticsearch {hosts => ["192.168.88.81:9200", "192.168.88.82:9200"]} stdout{} }'

配置文件启动

需要启动zookeeper集群,kafka集群,es集群

1.与kafka数据对接

vi logstash-kafka.conf

  启动

  bin/logstash -f logstash-kafka.conf (-f:指定文件)

  在另一节点上启动kafka消费命令

input {

file {

path => "/root/data/test.log"

discover_interval => 5

start_position => "beginning"

}

}

output {

kafka {

topic_id => "test1"

codec => plain {

format => "%{message}"

charset => "UTF-8"

}

bootstrap_servers => "node01:9092,node02:9092,node03:9092"

}

}

2.与kafka-es数据对接

vi logstash-es.conf

#启动logstash

bin/logstash -f logstash-es.conf

  在另一节点上启动kafka消费命令

input {

file {

type => "gamelog"

path => "/log/*/*.log"

discover_interval => 10

start_position => "beginning"

}

}

output {

elasticsearch {

index => "gamelog-%{+YYYY.MM.dd}"

hosts => ["node01:9200", "node02:9200", "node03:9200"]

}

}

数据对接过程

logstach节点存放: 哪个节点空闲资源多放入哪个节点 (灵活存放)


1.启动logstach监控logserver目录,把数据采集到kafka

2.启动另外一个logstach,监控kafka某个topic数据,把他采集到elasticsearch

数据对接案例

需要启动两个logstach,调用各个配置文件,进行对接

1.采集数据到kafka

  cd conf

  创建配置文件: vi gs-kafka.conf

input {

file {

codec => plain {

charset => "GB2312"

}

path => "/root/basedir/*/*.txt"

discover_interval => 5

start_position => "beginning"

}

}

output {

kafka {

topic_id => "gamelogs"

codec => plain {

format => "%{message}"

charset => "GB2312"

}

bootstrap_servers => "node01:9092,node02:9092,node03:9092"

}

}

  创建kafka对应的topic

bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic gamelogs

2.在hadoop01上启动logstach

  bin/logstash -f conf/gs-kafka.conf

3.在hadoop02上启动另外一个logstach

  cd logstach/conf

  vi kafka-es.conf

input {

kafka {

type => "accesslogs"

codec => "plain"

auto_offset_reset => "smallest"

group_id => "elas1"

topic_id => "accesslogs"

zk_connect => "node01:2181,node02:2181,node03:2181"

}

kafka {

type => "gamelogs"

auto_offset_reset => "smallest"

codec => "plain"

group_id => "elas2"

topic_id => "gamelogs"

zk_connect => "node01:2181,node02:2181,node03:2181"

}

}

filter {

if [type] == "accesslogs" {

json {

source => "message"

remove_field => [ "message" ]

target => "access"

}

}

if [type] == "gamelogs" {

mutate {

split => { "message" => " " }

add_field => {

"event_type" => "%{message[3]}"

"current_map" => "%{message[4]}"

"current_X" => "%{message[5]}"

"current_y" => "%{message[6]}"

"user" => "%{message[7]}"

"item" => "%{message[8]}"

"item_id" => "%{message[9]}"

"current_time" => "%{message[12]}"

}

remove_field => [ "message" ]

}

}

}

output {

if [type] == "accesslogs" {

elasticsearch {

index => "accesslogs"

codec => "json"

hosts => ["node01:9200", "node02:9200", "node03:9200"]

}

}

if [type] == "gamelogs" {

elasticsearch {

index => "gamelogs1"

codec => plain {

charset => "UTF-16BE"

}

hosts => ["node01:9200", "node02:9200", "node03:9200"]

}

}

}

   bin/logstash -f conf/kafka-es.conf

4.修改basedir文件中任意数据即可产生es的index文件


5.网页数据存储在设置的/data/esdata中

6.在网页中查找指定字段

  默认分词器为term,只能查找单个汉字,query_string可以查找全汉字


数据 文件 集群 节点 命令 配置 功能 工具 数据传输 数据采集 目录 组件 网页 过程 汉字 传输 存储 消费 监控 输入 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 服务器看不到数据库列表怎么打开 可视化工具数据库是什么 教育局中小学生网络安全知识 河南服务器防火墙哪个好 软件开发验收不合格怎么办 东南大学杨明网络安全 怎么提高ftp服务器安全性 网络安全书版报 显示连接数据库失败怎么解决 云服务器开发网络游戏 常州网络安全答题入口 sqlplus删除数据库 mycat数据库集群怎么做 垂直求精的含义软件开发 软件开发创造简单项目 联想服务器导航光盘怎么用 无名鹅作剧哪个服务器最便宜 数据库职业技能培训学校 新斗罗大陆有多少个服务器 监狱网络安全防范建设现状 安卓打开定位服务器 网络安全的手抄报写字写清楚一点 微机计算机使用的数据库 海康服务器大小 怀旧服务器盗贼学附魔怎么样 深圳市创帮网络技术有限公司 app软件开发公司排位 宁波互联网科技有限公司电话 惠普服务器连接地址 数据库设计 嵌套 闭包
0