Flink中Connectors如何连接Kafka
发表于:2024-11-19 作者:千家信息网编辑
千家信息网最后更新 2024年11月19日,这篇文章主要介绍Flink中Connectors如何连接Kafka,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!通过使用Flink DataStream Connectors
千家信息网最后更新 2024年11月19日Flink中Connectors如何连接Kafka
这篇文章主要介绍Flink中Connectors如何连接Kafka,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
通过使用Flink DataStream Connectors 数据流连接器连接到ElasticSearch搜索引擎的文档数据库Index,并提供数据流输入与输出操作;
示例环境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
数据流输入
DataStreamSource.java
package com.flink.examples.kafka;import com.flink.examples.TUser;import com.google.gson.Gson;import org.apache.commons.lang3.StringUtils;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.kafka.clients.consumer.ConsumerConfig;import java.util.Properties;/** * @Description 从Kafka中消费数据 */public class DataStreamSource { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度(使用几个CPU核心) env.setParallelism(1); //每隔2000ms进行启动一个检查点 env.enableCheckpointing(2000); //设置模式为exactly-once env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 确保检查点之间有进行500 ms的进度 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); //1.消费者客户端连接到kafka Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5000); props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-45"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); FlinkKafkaConsumerconsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props); //setStartFromEarliest()会从最早的数据开始进行消费,忽略存储的offset信息 //consumer.setStartFromEarliest(); //Flink从topic中指定的时间点开始消费,指定时间点之前的数据忽略 //consumer.setStartFromTimestamp(1559801580000L); //Flink从topic中最新的数据开始消费 //consumer.setStartFromLatest(); //Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数 //consumer.setStartFromGroupOffsets(); //2.在算子中进行处理 DataStream sourceStream = env.addSource(consumer) .filter((FilterFunction ) value -> StringUtils.isNotBlank(value)) .map((MapFunction ) value -> { System.out.println("print:" + value); //注意,因已开启enableCheckpointing容错定期检查状态机制,当算子出现错误时, //会导致数据流恢复到最新checkpoint的状态,并从存储在checkpoint中的offset开始重新消费Kafka中的消息。 //因此会有可能导制数据重复消费,重复错误,陷入死循环。加上try|catch,捕获错误后再正确输出。 Gson gson = new Gson(); try { TUser user = gson.fromJson(value, TUser.class); return user; }catch(Exception e){ System.out.println("error:" + e.getMessage()); } return new TUser(); }) .returns(TUser.class); sourceStream.print(); //3.执行 env.execute("flink kafka source"); }}
数据流输出
DataStreamSink.java
package com.flink.examples.kafka;import com.flink.examples.TUser;import com.google.gson.Gson;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import java.util.Properties;/** * @Description 将生产者数据写入到kafka */public class DataStreamSink { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //必需设置setParallelism并行度,否则不会输出 env.setParallelism(1); //每隔2000ms进行启动一个检查点 env.enableCheckpointing(2000); //设置模式为exactly-once env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 确保检查点之间有进行500 ms的进度 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 检查点必须在一分钟内完成,或者被丢弃 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一时间只允许进行一个检查点 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //1.连接kafka Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092"); FlinkKafkaProducerproducer = new FlinkKafkaProducer ("test", new SimpleStringSchema(), props); //2.创建数据,并写入数据到流中 TUser user = new TUser(); user.setId(8); user.setName("liu3"); user.setAge(22); user.setSex(1); user.setAddress("CN"); user.setCreateTimeSeries(1598889600000L); DataStream sourceStream = env.fromElements(user).map((MapFunction ) value -> new Gson().toJson(value)); //3.将数据流输入到kafka sourceStream.addSink(producer); sourceStream.print(); env.execute("flink kafka sink"); }}
在kafka上创建名称为test的topic
先启动DataStreamSource.java获取输出流,在启动DataStreamSink.java输入流
数据展示
以上是"Flink中Connectors如何连接Kafka"这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注行业资讯频道!
数据
消费
检查
数据流
检查点
输出
输入
文档
时间
错误
中指
之间
内容
官方
模式
状态
算子
篇文章
进度
存储
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
政府对网络安全的监管
网络安全 公益广告儿歌
网络安全预测评估
access数据库怎么查询
网络安全汇报材料
萌站服务器负荷较大需要暂时排队
数据库怎么导入数据库
服务器1tb硬盘价格
软件开发的成果 怎么写
仿安防互联服务器IDC公司模板
上海憬彤网络技术公司
跑步路线记录软件开发
数据库上班可以拿手机吗
java游戏服务器埋点方案
云南回收服务器主板ic 云主机
国产服务器是啥架构
小白怎么学习网络技术
为什么京东服务器那么好
移动摄像头连接服务器失败
用友u8加密服务器
软件软件开发外包的企业
网络安全汇报材料
合肥app软件开发的公司
创造与魔法两种服务器
武汉市软件开发培训学校
数据库与终端检测系统
网络安全专题组织生活会
服务器R5300g4
加强网络安全从自身做起的倡议书
网络安全手抄报最简单的意思