[Druid] A Simple Configuration Process for Reading Kafka Data into Druid

For the standalone (single-machine) Druid installation, see: https://blog.51cto.com/10120275/2429912

The process of real-time ingestion from Kafka into Druid is as follows.

Download, install, and start Kafka:

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.1/kafka_2.11-2.2.1.tgz
tar -zxvf kafka_2.11-2.2.1.tgz
ln -s kafka_2.11-2.2.1 kafka
$KAFKA_HOME/bin/kafka-server-start.sh ~/kafka/config/server.properties 1>/dev/null 2>&1 &
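
Kafka 2.2.x still depends on ZooKeeper, so an instance must be listening on localhost:2181 before the broker starts; the single-machine Druid setup linked above usually provides one, but Kafka also bundles its own. A minimal optional sanity check (the ZooKeeper start line is only needed if nothing is on 2181 yet):

# Only if no ZooKeeper is running on localhost:2181 yet: start the one bundled with Kafka
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# Confirm the broker is up by listing topics (empty output is normal on a fresh install)
./bin/kafka-topics.sh --list --zookeeper localhost:2181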

Create the topic wikipedia:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia
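
Optionally, you can describe the topic to confirm it was created with the expected partition and replica counts:

./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic wikipedia
# Should report PartitionCount: 1 and ReplicationFactor: 1 for topic wikipedia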

Unzip the wikiticker-2015-09-12-sampled.json.gz file; this step prepares the input data for the Kafka topic:

cd $DRUID_HOME/quickstart/tutorial
gunzip -k wikiticker-2015-09-12-sampled.json.gz

After this step completes, wikiticker-2015-09-12-sampled.json is generated under $DRUID_HOME/quickstart/tutorial.
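
Optionally, a quick look at the file confirms that each line is one JSON event whose fields (time, channel, cityName, added, deleted, delta, ...) match the timestampSpec and dimensionsSpec used in the supervisor spec below:

# Count the events and inspect the first record
wc -l wikiticker-2015-09-12-sampled.json
head -n 1 wikiticker-2015-09-12-sampled.json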

The ingestion configuration file (Kafka supervisor spec) is as follows; bootstrap.servers is set to the Kafka broker address:

{  "type": "kafka",  "dataSchema": {    "dataSource": "wikipedia",    "parser": {      "type": "string",      "parseSpec": {        "format": "json",        "timestampSpec": {          "column": "time",          "format": "auto"        },        "dimensionsSpec": {          "dimensions": [            "channel",            "cityName",            "comment",            "countryIsoCode",            "countryName",            "isAnonymous",            "isMinor",            "isNew",            "isRobot",            "isUnpatrolled",            "metroCode",            "namespace",            "page",            "regionIsoCode",            "regionName",            "user",            { "name": "added", "type": "long" },            { "name": "deleted", "type": "long" },            { "name": "delta", "type": "long" }          ]        }      }    },    "metricsSpec" : [],    "granularitySpec": {      "type": "uniform",      "segmentGranularity": "DAY",      "queryGranularity": "NONE",      "rollup": false    }  },  "tuningConfig": {    "type": "kafka",    "reportParseExceptions": false  },  "ioConfig": {    "topic": "wikipedia",    "replicas": 2,    "taskDuration": "PT10M",    "completionTimeout": "PT20M",    "consumerProperties": {      "bootstrap.servers": "localhost:9092"     }  }}

Next, write the contents of wikiticker-2015-09-12-sampled.json into the wikipedia topic using the Kafka console producer script:

export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json
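
After the producer finishes, the supervisor's indexing tasks pick the records up from Kafka. As a rough end-to-end check, assuming the Broker is listening on localhost:8082 (ports differ between Druid versions and configurations), you can count the ingested rows with Druid SQL; it may take a short while before the data becomes queryable:

# Count rows in the wikipedia datasource via the Broker's SQL endpoint
curl -XPOST -H 'Content-Type: application/json' \
  http://localhost:8082/druid/v2/sql \
  -d '{"query": "SELECT COUNT(*) AS cnt FROM wikipedia"}'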