Flink中Connectors如何连接Kafka
发表于:2024-11-19 作者:千家信息网编辑
千家信息网最后更新 2024年11月19日,这篇文章主要介绍Flink中Connectors如何连接Kafka,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!通过使用Flink DataStream Connectors
千家信息网最后更新 2024年11月19日Flink中Connectors如何连接Kafka
这篇文章主要介绍Flink中Connectors如何连接Kafka,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
通过使用Flink DataStream Connectors 数据流连接器连接到ElasticSearch搜索引擎的文档数据库Index,并提供数据流输入与输出操作;
示例环境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
数据流输入
DataStreamSource.java
package com.flink.examples.kafka;import com.flink.examples.TUser;import com.google.gson.Gson;import org.apache.commons.lang3.StringUtils;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.kafka.clients.consumer.ConsumerConfig;import java.util.Properties;/** * @Description 从Kafka中消费数据 */public class DataStreamSource { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度(使用几个CPU核心) env.setParallelism(1); //每隔2000ms进行启动一个检查点 env.enableCheckpointing(2000); //设置模式为exactly-once env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 确保检查点之间有进行500 ms的进度 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); //1.消费者客户端连接到kafka Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5000); props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-45"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); FlinkKafkaConsumerconsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props); //setStartFromEarliest()会从最早的数据开始进行消费,忽略存储的offset信息 //consumer.setStartFromEarliest(); //Flink从topic中指定的时间点开始消费,指定时间点之前的数据忽略 //consumer.setStartFromTimestamp(1559801580000L); //Flink从topic中最新的数据开始消费 //consumer.setStartFromLatest(); //Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数 //consumer.setStartFromGroupOffsets(); //2.在算子中进行处理 DataStream sourceStream = env.addSource(consumer) .filter((FilterFunction ) value -> StringUtils.isNotBlank(value)) .map((MapFunction ) value -> { System.out.println("print:" + value); //注意,因已开启enableCheckpointing容错定期检查状态机制,当算子出现错误时, //会导致数据流恢复到最新checkpoint的状态,并从存储在checkpoint中的offset开始重新消费Kafka中的消息。 //因此会有可能导制数据重复消费,重复错误,陷入死循环。加上try|catch,捕获错误后再正确输出。 Gson gson = new Gson(); try { TUser user = gson.fromJson(value, TUser.class); return user; }catch(Exception e){ System.out.println("error:" + e.getMessage()); } return new TUser(); }) .returns(TUser.class); sourceStream.print(); //3.执行 env.execute("flink kafka source"); }}
数据流输出
DataStreamSink.java
package com.flink.examples.kafka;import com.flink.examples.TUser;import com.google.gson.Gson;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import java.util.Properties;/** * @Description 将生产者数据写入到kafka */public class DataStreamSink { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //必需设置setParallelism并行度,否则不会输出 env.setParallelism(1); //每隔2000ms进行启动一个检查点 env.enableCheckpointing(2000); //设置模式为exactly-once env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 确保检查点之间有进行500 ms的进度 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 检查点必须在一分钟内完成,或者被丢弃 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一时间只允许进行一个检查点 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //1.连接kafka Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092"); FlinkKafkaProducerproducer = new FlinkKafkaProducer ("test", new SimpleStringSchema(), props); //2.创建数据,并写入数据到流中 TUser user = new TUser(); user.setId(8); user.setName("liu3"); user.setAge(22); user.setSex(1); user.setAddress("CN"); user.setCreateTimeSeries(1598889600000L); DataStream sourceStream = env.fromElements(user).map((MapFunction ) value -> new Gson().toJson(value)); //3.将数据流输入到kafka sourceStream.addSink(producer); sourceStream.print(); env.execute("flink kafka sink"); }}
在kafka上创建名称为test的topic
先启动DataStreamSource.java获取输出流,在启动DataStreamSink.java输入流
数据展示
以上是"Flink中Connectors如何连接Kafka"这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注行业资讯频道!
数据
消费
检查
数据流
检查点
输出
输入
文档
时间
错误
中指
之间
内容
官方
模式
状态
算子
篇文章
进度
存储
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
初中生能学计算机网络技术吗
淄博服装库存软件开发公司
国家工信部软件开发工程师
数据库job可以查看日志吗
中国好网民 网络安全宣传
搜客网络技术
网络安全团员心得
au外文数据库
mysql 数据库位置
问道手游客服服务器人数
浪潮服务器进不了启动盘
金蝶软件服务器未运行
服务器关机命令power
速维网络技术有限公司怎么样
服务器cpu增加内存
天工开物网络技术
皇室战争一共几个服务器
有序网络安全吗
医院信息化网络安全会议
怎么搭建服务器远程访问
数据库负数取正
深圳盘古网络技术有限公司
安卓本身带有数据库吗
ui设计是学什么软件开发
坦克世界战车服务器
rup的软件开发规范
北京配天软件开发面试
乡卫生院网络安全处理情况
网络安全相关专业的发展状况
漳州求职数据库