Flink中Connectors如何连接Kafka
发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,这篇文章主要介绍Flink中Connectors如何连接Kafka,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!通过使用Flink DataStream Connectors
千家信息网最后更新 2025年02月04日Flink中Connectors如何连接Kafka
这篇文章主要介绍Flink中Connectors如何连接Kafka,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
通过使用Flink DataStream Connectors 数据流连接器连接到ElasticSearch搜索引擎的文档数据库Index,并提供数据流输入与输出操作;
示例环境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
数据流输入
DataStreamSource.java
package com.flink.examples.kafka;import com.flink.examples.TUser;import com.google.gson.Gson;import org.apache.commons.lang3.StringUtils;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.kafka.clients.consumer.ConsumerConfig;import java.util.Properties;/** * @Description 从Kafka中消费数据 */public class DataStreamSource { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度(使用几个CPU核心) env.setParallelism(1); //每隔2000ms进行启动一个检查点 env.enableCheckpointing(2000); //设置模式为exactly-once env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 确保检查点之间有进行500 ms的进度 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); //1.消费者客户端连接到kafka Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5000); props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-45"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); FlinkKafkaConsumerconsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props); //setStartFromEarliest()会从最早的数据开始进行消费,忽略存储的offset信息 //consumer.setStartFromEarliest(); //Flink从topic中指定的时间点开始消费,指定时间点之前的数据忽略 //consumer.setStartFromTimestamp(1559801580000L); //Flink从topic中最新的数据开始消费 //consumer.setStartFromLatest(); //Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数 //consumer.setStartFromGroupOffsets(); //2.在算子中进行处理 DataStream sourceStream = env.addSource(consumer) .filter((FilterFunction ) value -> StringUtils.isNotBlank(value)) .map((MapFunction ) value -> { System.out.println("print:" + value); //注意,因已开启enableCheckpointing容错定期检查状态机制,当算子出现错误时, //会导致数据流恢复到最新checkpoint的状态,并从存储在checkpoint中的offset开始重新消费Kafka中的消息。 //因此会有可能导制数据重复消费,重复错误,陷入死循环。加上try|catch,捕获错误后再正确输出。 Gson gson = new Gson(); try { TUser user = gson.fromJson(value, TUser.class); return user; }catch(Exception e){ System.out.println("error:" + e.getMessage()); } return new TUser(); }) .returns(TUser.class); sourceStream.print(); //3.执行 env.execute("flink kafka source"); }}
数据流输出
DataStreamSink.java
package com.flink.examples.kafka;import com.flink.examples.TUser;import com.google.gson.Gson;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import java.util.Properties;/** * @Description 将生产者数据写入到kafka */public class DataStreamSink { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //必需设置setParallelism并行度,否则不会输出 env.setParallelism(1); //每隔2000ms进行启动一个检查点 env.enableCheckpointing(2000); //设置模式为exactly-once env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 确保检查点之间有进行500 ms的进度 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 检查点必须在一分钟内完成,或者被丢弃 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一时间只允许进行一个检查点 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //1.连接kafka Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092"); FlinkKafkaProducerproducer = new FlinkKafkaProducer ("test", new SimpleStringSchema(), props); //2.创建数据,并写入数据到流中 TUser user = new TUser(); user.setId(8); user.setName("liu3"); user.setAge(22); user.setSex(1); user.setAddress("CN"); user.setCreateTimeSeries(1598889600000L); DataStream sourceStream = env.fromElements(user).map((MapFunction ) value -> new Gson().toJson(value)); //3.将数据流输入到kafka sourceStream.addSink(producer); sourceStream.print(); env.execute("flink kafka sink"); }}
在kafka上创建名称为test的topic
先启动DataStreamSource.java获取输出流,在启动DataStreamSink.java输入流
数据展示
以上是"Flink中Connectors如何连接Kafka"这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注行业资讯频道!
数据
消费
检查
数据流
检查点
输出
输入
文档
时间
错误
中指
之间
内容
官方
模式
状态
算子
篇文章
进度
存储
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
山东省互联网科技
数据库系统工程师工程师
html如何把数据库连接
mc精灵宝可梦手机版服务器下载
数据库单表同步
创建数据库在主文件增加
太原市鑫思创软件开发
后台获取服务器控件
用友备份数据库
数据库网上购物系统设计
海康威视校时服务器地址
网络安全法首例
驾考app软件开发
微服务器框架
2018剑侠情缘服务器
甲方和软件开发
jdbc连接数据库所涉及的类
洛阳林辰网络技术
出图数据库
当今网络安全管理现状分析
tls安全连接到服务器
软件开发和开发工程师的区别
历史签到数据库设计
网络数据库技术试题
中兴服务器怎么修改主机名
20人软件开发公司
网络安全自评打分报告
说说你对网络安全信息安全
学习软件开发难度
虚拟专用网络技术的应用场景