千家信息网

Flink中Connectors如何连接Kafka

发表于:2024-11-19 作者:千家信息网编辑
千家信息网最后更新 2024年11月19日,这篇文章主要介绍Flink中Connectors如何连接Kafka,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!通过使用Flink DataStream Connectors
千家信息网最后更新 2024年11月19日Flink中Connectors如何连接Kafka

这篇文章主要介绍Flink中Connectors如何连接Kafka,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

通过使用Flink DataStream Connectors 数据流连接器连接到ElasticSearch搜索引擎的文档数据库Index,并提供数据流输入与输出操作;

示例环境

java.version: 1.8.xflink.version: 1.11.1kafka:2.11

数据流输入

DataStreamSource.java

package com.flink.examples.kafka;import com.flink.examples.TUser;import com.google.gson.Gson;import org.apache.commons.lang3.StringUtils;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.kafka.clients.consumer.ConsumerConfig;import java.util.Properties;/** * @Description 从Kafka中消费数据 */public class DataStreamSource {    /**     * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html     */    public static void main(String[] args) throws Exception {        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //设置并行度(使用几个CPU核心)        env.setParallelism(1);        //每隔2000ms进行启动一个检查点        env.enableCheckpointing(2000);        //设置模式为exactly-once        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);        // 确保检查点之间有进行500 ms的进度        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);        //1.消费者客户端连接到kafka        Properties props = new Properties();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092");        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5000);        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-45");        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props);        //setStartFromEarliest()会从最早的数据开始进行消费,忽略存储的offset信息        //consumer.setStartFromEarliest();        //Flink从topic中指定的时间点开始消费,指定时间点之前的数据忽略        //consumer.setStartFromTimestamp(1559801580000L);        //Flink从topic中最新的数据开始消费        //consumer.setStartFromLatest();        //Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数        //consumer.setStartFromGroupOffsets();        //2.在算子中进行处理        DataStream sourceStream = env.addSource(consumer)                .filter((FilterFunction) value -> StringUtils.isNotBlank(value))                .map((MapFunction) value -> {                    System.out.println("print:" + value);                    //注意,因已开启enableCheckpointing容错定期检查状态机制,当算子出现错误时,                    //会导致数据流恢复到最新checkpoint的状态,并从存储在checkpoint中的offset开始重新消费Kafka中的消息。                    //因此会有可能导制数据重复消费,重复错误,陷入死循环。加上try|catch,捕获错误后再正确输出。                    Gson gson = new Gson();                    try {                        TUser user = gson.fromJson(value, TUser.class);                        return user;                    }catch(Exception e){                        System.out.println("error:" + e.getMessage());                    }                    return new TUser();                })                .returns(TUser.class);        sourceStream.print();        //3.执行        env.execute("flink  kafka source");    }}

数据流输出

DataStreamSink.java

package com.flink.examples.kafka;import com.flink.examples.TUser;import com.google.gson.Gson;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import java.util.Properties;/** * @Description 将生产者数据写入到kafka */public class DataStreamSink {    /**     * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html     */    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //必需设置setParallelism并行度,否则不会输出        env.setParallelism(1);        //每隔2000ms进行启动一个检查点        env.enableCheckpointing(2000);        //设置模式为exactly-once        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);        // 确保检查点之间有进行500 ms的进度        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);        // 检查点必须在一分钟内完成,或者被丢弃        env.getCheckpointConfig().setCheckpointTimeout(60000);        // 同一时间只允许进行一个检查点        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);        //1.连接kafka        Properties props = new Properties();        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092");        FlinkKafkaProducer producer = new FlinkKafkaProducer("test", new SimpleStringSchema(), props);        //2.创建数据,并写入数据到流中        TUser user = new TUser();        user.setId(8);        user.setName("liu3");        user.setAge(22);        user.setSex(1);        user.setAddress("CN");        user.setCreateTimeSeries(1598889600000L);        DataStream sourceStream = env.fromElements(user).map((MapFunction) value -> new Gson().toJson(value));        //3.将数据流输入到kafka        sourceStream.addSink(producer);        sourceStream.print();        env.execute("flink kafka sink");    }}
  1. 在kafka上创建名称为test的topic

  2. 先启动DataStreamSource.java获取输出流,在启动DataStreamSink.java输入流

数据展示

以上是"Flink中Connectors如何连接Kafka"这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注行业资讯频道!

0