千家信息网

Flink实现Kafka到Mysql的Exactly-Once

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,一、背景最近项目中使用Flink消费kafka消息,并将消费的消息存储到mysql中,看似一个很简单的需求,在网上也有很多flink消费kafka的例子,但看了一圈也没看到能解决重复消费的问题的文章,
千家信息网最后更新 2025年01月24日Flink实现Kafka到Mysql的Exactly-Once

一、背景

最近项目中使用Flink消费kafka消息,并将消费的消息存储到mysql中,看似一个很简单的需求,在网上也有很多flink消费kafka的例子,但看了一圈也没看到能解决重复消费的问题的文章,于是在flink官网中搜索此类场景的处理方式,发现官网也没有实现flink到mysql的Exactly-Once例子,但是官网却有类似的例子来解决端到端的仅一次消费问题。这个现成的例子就是FlinkKafkaProducer011这个类,它保证了通过FlinkKafkaProducer011发送到kafka的消息是Exactly-Once的,主要的实现方式就是继承了TwoPhaseCommitSinkFunction这个类,关于TwoPhaseCommitSinkFunction这个类的作用可以先看上一篇文章https://blog.51cto.com/simplelife/2401411。


二、实现思想

这里简单说下这个类的作用就是实现这个类的方法:beginTransaction、preCommit、commit、abort,达到事件(preCommit)预提交的逻辑(当事件进行自己的逻辑处理后进行预提交,如果预提交成功之后才进行真正的(commit)提交,如果预提交失败则调用abort方法进行事件的回滚操作),结合flink的checkpoint机制,来保存topic中partition的offset。

达到的效果我举个例子来说明下:比如checkpoint每10s进行一次,此时用FlinkKafkaConsumer011实时消费kafka中的消息,消费并处理完消息后,进行一次预提交数据库的操作,如果预提交没有问题,10s后进行真正的插入数据库操作,如果插入成功,进行一次checkpoint,flink会自动记录消费的offset,可以将checkpoint保存的数据放到hdfs中,如果预提交出错,比如在5s的时候出错了,此时Flink程序就会进入不断的重启中,重启的策略可以在配置中设置,当然下一次的checkpoint也不会做了,checkpoint记录的还是上一次成功消费的offset,本次消费的数据因为在checkpoint期间,消费成功,但是预提交过程中失败了,注意此时数据并没有真正的执行插入操作,因为预提交(preCommit)失败,提交(commit)过程也不会发生了。等你将异常数据处理完成之后,再重新启动这个Flink程序,它会自动从上一次成功的checkpoint中继续消费数据,以此来达到Kafka到Mysql的Exactly-Once。


三、具体实现代码三个类

1、StreamDemoKafka2Mysql.java

package com.fwmagic.flink.streaming;import com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink;import org.apache.flink.runtime.state.filesystem.FsStateBackend;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;import org.apache.kafka.clients.consumer.ConsumerConfig;import java.util.Properties;/** * 消费kafka消息,sink(自定义)到mysql中,保证kafka to mysql的Exactly-Once */@SuppressWarnings("all")public class StreamDemoKafka2Mysql {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //设置并行度,为了方便测试,查看消息的顺序,这里设置为1,可以更改为多并行度        env.setParallelism(1);        //checkpoint设置        //每隔10s进行启动一个检查点【设置checkpoint的周期】        env.enableCheckpointing(10000);        //设置模式为:exactly_one,仅一次语义        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);        //确保检查点之间有1s的时间间隔【checkpoint最小间隔】        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);        //检查点必须在10s之内完成,或者被丢弃【checkpoint超时时间】        env.getCheckpointConfig().setCheckpointTimeout(10000);        //同一时间只允许进行一次检查点        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);        //表示一旦Flink程序被cancel后,会保留checkpoint数据,以便根据实际需要恢复到指定的checkpoint        //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);        //设置statebackend,将检查点保存在hdfs上面,默认保存在内存中。这里先保存到本地        env.setStateBackend(new FsStateBackend("file:///Users/temp/cp/"));        //设置kafka消费参数        Properties props = new Properties();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hd1:9092,hd2:9092,hd3:9092");        props.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group1");        //kafka分区自动发现周期        props.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3000");        /*SimpleStringSchema可以获取到kafka消息,JSONKeyValueDeserializationSchema可以获取都消息的key,value,metadata:topic,partition,offset等信息*/        // FlinkKafkaConsumer011 kafkaConsumer011 = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);        FlinkKafkaConsumer011 kafkaConsumer011 = new FlinkKafkaConsumer011<>("demo123", new JSONKeyValueDeserializationSchema(true), props);        //加入kafka数据源        DataStreamSource streamSource = env.addSource(kafkaConsumer011);        //数据传输到下游        streamSource.addSink(new MySqlTwoPhaseCommitSink()).name("MySqlTwoPhaseCommitSink");        //触发执行        env.execute(StreamDemoKafka2Mysql.class.getName());    }}


2、MySqlTwoPhaseCommitSink.java

package com.fwmagic.flink.sink;import com.fwmagic.flink.util.DBConnectUtil;import org.apache.flink.api.common.ExecutionConfig;import org.apache.flink.api.common.typeutils.base.VoidSerializer;import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.Timestamp;import java.text.SimpleDateFormat;import java.util.Date;/** * 自定义kafka to mysql,继承TwoPhaseCommitSinkFunction,实现两阶段提交。 * 功能:保证kafak to mysql 的Exactly-Once */public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction {    public MySqlTwoPhaseCommitSink() {        super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);    }    /**     * 执行数据入库操作     * @param connection     * @param objectNode     * @param context     * @throws Exception     */    @Override    protected void invoke(Connection connection, ObjectNode objectNode, Context context) throws Exception {        System.err.println("start invoke.......");        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());        System.err.println("===>date:" + date + " " + objectNode);        String value = objectNode.get("value").toString();        String sql = "insert into `t_test` (`value`,`insert_time`) values (?,?)";        PreparedStatement ps = connection.prepareStatement(sql);        ps.setString(1, value);        ps.setTimestamp(2, new Timestamp(System.currentTimeMillis()));        //执行insert语句        ps.execute();        //手动制造异常        if(Integer.parseInt(value) == 15) System.out.println(1/0);    }    /**     * 获取连接,开启手动提交事物(getConnection方法中)     * @return     * @throws Exception     */    @Override    protected Connection beginTransaction() throws Exception {        String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";        Connection connection = DBConnectUtil.getConnection(url, "root", "123456");        System.err.println("start beginTransaction......."+connection);        return connection;    }    /**     * 预提交,这里预提交的逻辑在invoke方法中     * @param connection     * @throws Exception     */    @Override    protected void preCommit(Connection connection) throws Exception {        System.err.println("start preCommit......."+connection);    }    /**     * 如果invoke执行正常则提交事物     * @param connection     */    @Override    protected void commit(Connection connection) {        System.err.println("start commit......."+connection);        DBConnectUtil.commit(connection);    }        @Override    protected void recoverAndCommit(Connection connection) {        System.err.println("start recoverAndCommit......."+connection);    }    @Override    protected void recoverAndAbort(Connection connection) {        System.err.println("start abort recoverAndAbort......."+connection);    }    /**     * 如果invoke执行异常则回滚事物,下一次的checkpoint操作也不会执行     * @param connection     */    @Override    protected void abort(Connection connection) {        System.err.println("start abort rollback......."+connection);        DBConnectUtil.rollback(connection);    }}


3、DBConnectUtil.java

package com.fwmagic.flink.util;import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;public class DBConnectUtil {    /**     * 获取连接     *     * @param url     * @param user     * @param password     * @return     * @throws SQLException     */    public static Connection getConnection(String url, String user, String password) throws SQLException {        Connection conn = null;        try {            Class.forName("com.mysql.jdbc.Driver");        } catch (ClassNotFoundException e) {            e.printStackTrace();        }        conn = DriverManager.getConnection(url, user, password);        //设置手动提交        conn.setAutoCommit(false);        return conn;    }    /**     * 提交事物     */    public static void commit(Connection conn) {        if (conn != null) {            try {                conn.commit();            } catch (SQLException e) {                e.printStackTrace();            } finally {                close(conn);            }        }    }    /**     * 事物回滚     *     * @param conn     */    public static void rollback(Connection conn) {        if (conn != null) {            try {                conn.rollback();            } catch (SQLException e) {                e.printStackTrace();            } finally {                close(conn);            }        }    }    /**     * 关闭连接     *     * @param conn     */    public static void close(Connection conn) {        if (conn != null) {            try {                conn.close();            } catch (SQLException e) {                e.printStackTrace();            }        }    }}


四、代码测试

为了方便发送消息,我用一个定时任务每秒发送一个数字,1~16,在发送到数字15之前,应该是做过一次checkpoint了,并且快要到第二次checkpoint的时间,第一次checkpoint的消费数据成功将插入数据库中,在消费到数字15的时候,手动造一个异常,此时数据库中应该只有第一次checkpoint后commit的数据,第二次checkpoint的数据并不会插入到数据库中(因为预提交已经失败,不会进行真正的提交),我实验的日志信息:

start invoke.......===>date:2019-05-28 18:36:50 {"value":1,"metadata":{"offset":892,"topic":"gaga","partition":0}}start invoke.......===>date:2019-05-28 18:36:51 {"value":2,"metadata":{"offset":887,"topic":"gaga","partition":2}}start invoke.......===>date:2019-05-28 18:36:52 {"value":3,"metadata":{"offset":889,"topic":"gaga","partition":1}}start invoke.......===>date:2019-05-28 18:36:53 {"value":4,"metadata":{"offset":893,"topic":"gaga","partition":0}}start invoke.......===>date:2019-05-28 18:36:54 {"value":5,"metadata":{"offset":888,"topic":"gaga","partition":2}}start invoke.......===>date:2019-05-28 18:36:55 {"value":6,"metadata":{"offset":890,"topic":"gaga","partition":1}}start invoke.......===>date:2019-05-28 18:36:56 {"value":7,"metadata":{"offset":894,"topic":"gaga","partition":0}}start invoke.......===>date:2019-05-28 18:36:57 {"value":8,"metadata":{"offset":889,"topic":"gaga","partition":2}}start preCommit.......start beginTransaction.......start commit.......com.mysql.jdbc.JDBC4Connection@3c5ad420start invoke.......===>date:2019-05-28 18:36:58 {"value":9,"metadata":{"offset":891,"topic":"gaga","partition":1}}start invoke.......===>date:2019-05-28 18:36:59 {"value":10,"metadata":{"offset":895,"topic":"gaga","partition":0}}start invoke.......===>date:2019-05-28 18:37:00 {"value":11,"metadata":{"offset":890,"topic":"gaga","partition":2}}start invoke.......===>date:2019-05-28 18:37:01 {"value":12,"metadata":{"offset":892,"topic":"gaga","partition":1}}start invoke.......===>date:2019-05-28 18:37:02 {"value":13,"metadata":{"offset":896,"topic":"gaga","partition":0}}start invoke.......===>date:2019-05-28 18:37:03 {"value":14,"metadata":{"offset":891,"topic":"gaga","partition":2}}start invoke.......===>date:2019-05-28 18:37:04 {"value":15,"metadata":{"offset":893,"topic":"gaga","partition":1}}start abort rollback.......com.mysql.jdbc.JDBC4Connection@5f2afc1bstart commit.......com.mysql.jdbc.JDBC4Connection@71ed09ajava.lang.ArithmeticException: / by zero        at com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:36)        at com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:16)        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)        at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)        at java.lang.Thread.run(Thread.java:748)

从日志中可以看到第一次commit时间在2019-05-28 18:36:57,成功入库到数据为1-8,第二次消费到数字15的时候,提交失败,日志最后一行发生了回滚,关闭了连接,然后进行conmit的时候也失败了,消费的数据9-15不会插入到数据库中,此时checkpoint也不会做了,checkpoint保存的还是上一次成功消费后的offset数据。


数据库表:t_test

CREATE TABLE `t_test` (  `id` bigint(20) NOT NULL AUTO_INCREMENT,  `value` varchar(255) DEFAULT NULL,  `insert_time` datetime DEFAULT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4


表中的数据:


五、完整代码地址:https://gitee.com/fang_wei/fwmagic-flink


数据 消费 消息 成功 数据库 事物 时间 检查点 检查 例子 手动 数字 方法 时候 上一 处理 事件 代码 就是 日志 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 电子数据库检索图书的意义 数据库插入当前时间的语句 删除网络安全助手 王者荣耀的软件开发模型 江苏智能软件开发技术指导 靠谱的珠宝软件开发项目 ci切换数据库 绿色全光网络技术联盟成立 视频服务器百兆带宽 如何加强网络安全意识相关论文 数据库中角色用什么数据类型 关于网络安全的手抄报绘画边框 重庆双桥区苹果软件开发机构 吉林有名的网络技术咨询口碑推荐 带有数据库的计算机系统构成 国内计算机网络安全事件 青岛市网络安全公益广告 做一个卖房的数据库管理系统 软件开发预付款怎么入账 楼盘网软件开发人 数据库批量更新 靠谱的珠宝软件开发项目 互联网网络技术咨询口碑推荐 征信公司软件开发 查找代理服务器 计算机网络技术课程女生 竹山信息软件开发服务保障 服务器不优化怎么回事 网匠网络技术服务有限公司 我的世界正常服务器
0