千家信息网

Implementing Exactly-Once from Kafka to MySQL with Flink

Published: 2024-10-09  Author: 千家信息网 editor
Last updated: 2024-10-09

I. Background

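In a recent project I used Flink to consume Kafka messages and store them in MySQL. The requirement looks simple, and there are plenty of examples online of Flink consuming from Kafka. After looking around, though, I found no article that dealt with duplicate consumption. I then searched the Flink website for how to handle this scenario. The official site has no Exactly-Once example from Flink to MySQL either. It does have a similar example that solves end-to-end exactly-once delivery: the FlinkKafkaProducer011 class. This class guarantees that messages sent to Kafka through it are delivered Exactly-Once, mainly because it extends TwoPhaseCommitSinkFunction. My previous article explains what TwoPhaseCommitSinkFunction does: https://blog.51cto.com/simplelife/2401411.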


II. Implementation Idea

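Briefly, you use this class by implementing its methods beginTransaction, invoke, preCommit, commit and abort. The lifecycle works like this:

- Records are processed by your own logic and written into an open transaction.
- When a checkpoint is taken, that transaction is pre-committed (preCommit).
- The real commit (commit) happens only after the checkpoint has completed successfully.
- If anything fails before that point, abort is called to roll the transaction back.

Flink's checkpoint mechanism also saves the offset of every partition of the topic. The committed data and the saved offsets therefore always move forward together.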
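To make the call order concrete, here is a minimal pure-Java sketch of the lifecycle described above. It is not Flink's TwoPhaseCommitSinkFunction. The class names MiniTwoPhaseSink and TracingSink, and the driver methods open, process, snapshot, checkpointComplete and fail, are hypothetical simplifications. The "table" is just an in-memory list standing in for MySQL.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a two-phase-commit sink; NOT Flink's TwoPhaseCommitSinkFunction.
abstract class MiniTwoPhaseSink<IN, TXN> {
    private TXN current;                                                 // transaction receiving new records
    private final Map<Long, TXN> pendingCommit = new LinkedHashMap<>();  // pre-committed, keyed by checkpoint id

    protected abstract TXN beginTransaction();
    protected abstract void invoke(TXN txn, IN value);
    protected abstract void preCommit(TXN txn);
    protected abstract void commit(TXN txn);
    protected abstract void abort(TXN txn);

    // Sink starts: open the first transaction.
    void open() { current = beginTransaction(); }

    // Every record is written into the currently open transaction.
    void process(IN value) { invoke(current, value); }

    // Checkpoint taken: pre-commit the open transaction, park it, and start a new one.
    void snapshot(long checkpointId) {
        preCommit(current);
        pendingCommit.put(checkpointId, current);
        current = beginTransaction();
    }

    // Checkpoint confirmed: commit every parked transaction up to that checkpoint id.
    void checkpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, TXN>> it = pendingCommit.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TXN> e = it.next();
            if (e.getKey() <= checkpointId) {
                commit(e.getValue());
                it.remove();
            }
        }
    }

    // Failure before the next checkpoint completes: roll back the open transaction.
    void fail() {
        abort(current);
        current = null;
    }
}

// Example implementation that records each hook, like the System.err calls in the post.
class TracingSink extends MiniTwoPhaseSink<Integer, List<Integer>> {
    final List<String> trace = new ArrayList<>();
    final List<Integer> table = new ArrayList<>();   // stands in for the committed MySQL rows

    @Override protected List<Integer> beginTransaction() { trace.add("beginTransaction"); return new ArrayList<>(); }
    @Override protected void invoke(List<Integer> txn, Integer v) { trace.add("invoke " + v); txn.add(v); }
    @Override protected void preCommit(List<Integer> txn) { trace.add("preCommit"); }
    @Override protected void commit(List<Integer> txn) { trace.add("commit"); table.addAll(txn); }
    @Override protected void abort(List<Integer> txn) { trace.add("abort"); txn.clear(); }
}
```

Calling open, process(1), process(2), snapshot(1), process(3), checkpointComplete(1) and fail() leaves [1, 2] in the table. The trace shows preCommit, then beginTransaction, then commit, which is the same order that appears in the test log later in this post.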

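An example shows the effect. Suppose a checkpoint is taken every 10 s, and FlinkKafkaConsumer011 consumes Kafka messages in real time.

Normal case:
- Each consumed and processed message is written into the open database transaction.
- At the checkpoint, that transaction is pre-committed.
- Once the checkpoint completes successfully, the insert is actually committed.
- At the same time, Flink automatically records the consumed offsets in the checkpoint. The checkpoint data can be stored in HDFS.

Failure case: suppose an error occurs before the checkpoint, say at the 5 s mark.
- The Flink job goes into a restart loop. The restart strategy can be configured.
- The next checkpoint is not taken, so the checkpoint still holds the offsets of the last successful consumption.
- The data consumed in this checkpoint period was processed, but its transaction was never committed. The pre-commit (preCommit) failed, so the commit never happens and nothing is actually inserted.

Once you have dealt with the bad data and restarted the Flink job, it automatically resumes consuming from the last successful checkpoint. This is how Kafka-to-MySQL Exactly-Once is achieved.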
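The offset side of this example can be modelled the same way. The sketch below is a hypothetical in-memory simulation. OffsetReplayDemo, Rec, run and the checkpointEvery/failAt parameters are illustrative names, not Flink or Kafka APIs. Two assumptions simplify it:
- Checkpoints are counted in records instead of seconds.
- Without a checkpoint, each partition is read from offset 0.

In the model, a completed checkpoint commits the open transaction and saves the per-partition offsets together. A failure drops the open transaction and restarts from the saved offsets.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory simulation of checkpointed Kafka offsets plus a transactional sink.
final class OffsetReplayDemo {
    // One Kafka record: partition, offset within the partition, payload.
    static final class Rec {
        final int partition;
        final long offset;
        final int value;

        Rec(int partition, long offset, int value) {
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    // Consumes `log`; the first time it meets `failAt` it "crashes", then restarts from the last
    // checkpointed offsets with the bad data fixed. Returns the rows that ended up committed.
    static List<Integer> run(List<Rec> log, int checkpointEvery, int failAt) {
        List<Integer> table = new ArrayList<>();            // committed rows
        Map<Integer, Long> checkpointed = new HashMap<>();  // per-partition next offset in the last completed checkpoint
        boolean fixed = false;
        while (true) {
            Map<Integer, Long> next = new HashMap<>(checkpointed); // restore the source state
            List<Integer> txn = new ArrayList<>();                 // open transaction
            int sinceCheckpoint = 0;
            boolean crashed = false;
            for (Rec r : log) {
                if (r.offset < next.getOrDefault(r.partition, 0L)) continue; // already covered by the checkpoint
                if (r.value == failAt && !fixed) { crashed = true; break; }
                txn.add(r.value);
                next.put(r.partition, r.offset + 1);
                if (++sinceCheckpoint == checkpointEvery) {
                    // Checkpoint completes: the commit and the saved offsets move forward together.
                    table.addAll(txn);
                    txn.clear();
                    checkpointed = new HashMap<>(next);
                    sinceCheckpoint = 0;
                }
            }
            if (!crashed) {
                table.addAll(txn); // simplification: commit the last partial batch at end of input
                return table;
            }
            fixed = true; // txn is discarded (rolled back); the loop restarts from `checkpointed`
        }
    }
}
```

Take 16 values spread over three partitions, with a checkpoint every 8 records and a failure at value 15. The first run commits 1 to 8. The restart then replays from the saved offsets, and the final table holds 1 to 16 with no duplicates.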


III. Implementation: Three Classes

1. StreamDemoKafka2Mysql.java

package com.fwmagic.flink.streaming;

import com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * Consumes Kafka messages and sinks them (custom sink) into MySQL, guaranteeing Exactly-Once from Kafka to MySQL.
 */
@SuppressWarnings("all")
public class StreamDemoKafka2Mysql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 makes the message order easy to follow while testing; it can be raised
        env.setParallelism(1);

        // Checkpoint settings
        // Start a checkpoint every 10 s (checkpoint interval)
        env.enableCheckpointing(10000);
        // Mode: EXACTLY_ONCE semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Keep at least 1 s between checkpoints (minimum pause)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        // A checkpoint must finish within 10 s or it is discarded (checkpoint timeout)
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        // Allow only one checkpoint at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Retain checkpoint data after the job is cancelled, so it can be restored from a chosen checkpoint
        //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // State backend: checkpoints are kept in memory by default and can go to HDFS; here they are stored locally
        env.setStateBackend(new FsStateBackend("file:///Users/temp/cp/"));

        // Kafka consumer properties
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hd1:9092,hd2:9092,hd3:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group1");
        // Kafka partition auto-discovery interval
        props.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3000");

        /* SimpleStringSchema yields only the message; JSONKeyValueDeserializationSchema yields the key, the value
           and the metadata: topic, partition, offset, etc. */
        // FlinkKafkaConsumer011<String> kafkaConsumer011 = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
        FlinkKafkaConsumer011<ObjectNode> kafkaConsumer011 =
                new FlinkKafkaConsumer011<>("demo123", new JSONKeyValueDeserializationSchema(true), props);

        // Add the Kafka source
        DataStreamSource<ObjectNode> streamSource = env.addSource(kafkaConsumer011);
        // Send the data downstream to the two-phase-commit MySQL sink
        streamSource.addSink(new MySqlTwoPhaseCommitSink()).name("MySqlTwoPhaseCommitSink");
        // Trigger execution
        env.execute(StreamDemoKafka2Mysql.class.getName());
    }
}
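The checkpoint settings in main interact with each other. The sketch below is a rough mental model, not Flink's actual scheduler, and rests on two assumptions:
- The first trigger happens at t = 0.
- The minimum pause is counted from the end of the previous attempt, whether or not that attempt succeeded.

Under this model, with one checkpoint at a time, the next trigger is the later of "previous start + interval" and "previous end + minimum pause". An attempt that runs past the timeout is discarded. CheckpointTimeline, Attempt and simulate are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified timing model (milliseconds) of the checkpoint settings in main.
final class CheckpointTimeline {
    static final class Attempt {
        final long start;
        final long end;
        final boolean completed;

        Attempt(long start, long end, boolean completed) {
            this.start = start;
            this.end = end;
            this.completed = completed;
        }
    }

    // durations[i] = how long checkpoint i would take if it were allowed to run to completion.
    static List<Attempt> simulate(long interval, long minPause, long timeout, long[] durations) {
        List<Attempt> attempts = new ArrayList<>();
        long start = 0; // assumption: the first checkpoint is triggered at t = 0
        for (long d : durations) {
            boolean completed = d <= timeout;
            long end = start + Math.min(d, timeout); // an attempt exceeding the timeout is discarded at that point
            attempts.add(new Attempt(start, end, completed));
            // maxConcurrentCheckpoints = 1: the next one waits for this one to end and must respect
            // both the interval (from this start) and the minimum pause (from this end).
            start = Math.max(start + interval, end + minPause);
        }
        return attempts;
    }
}
```

Using the values from main (10000, 1000, 10000) and attempt durations of 500, 9500, 12000 and 300 ms, the attempts start at 0, 10000, 20500 and 31500 ms. The third attempt expires.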


2. MySqlTwoPhaseCommitSink.java

package com.fwmagic.flink.sink;

import com.fwmagic.flink.util.DBConnectUtil;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Custom Kafka-to-MySQL sink that extends TwoPhaseCommitSinkFunction to implement a two-phase commit.
 * Purpose: guarantee Exactly-Once from Kafka to MySQL.
 * Type parameters: input record (ObjectNode), transaction handle (Connection), context (Void).
 */
public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<ObjectNode, Connection, Void> {

    public MySqlTwoPhaseCommitSink() {
        // Serializers for the transaction handle (Connection, via Kryo) and the unused context (Void)
        super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    /**
     * Writes one record into the current, not yet committed, transaction.
     */
    @Override
    protected void invoke(Connection connection, ObjectNode objectNode, Context context) throws Exception {
        System.err.println("start invoke.......");
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        System.err.println("===>date:" + date + " " + objectNode);
        String value = objectNode.get("value").toString();
        String sql = "insert into `t_test` (`value`,`insert_time`) values (?,?)";
        // try-with-resources closes the statement; the insert stays invisible until commit
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            ps.setString(1, value);
            ps.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
            // Execute the insert statement
            ps.execute();
        }
        // Deliberately throw an exception at value 15 to simulate a failure
        if (Integer.parseInt(value) == 15) System.out.println(1 / 0);
    }

    /**
     * Gets a connection with manual commit enabled (done in getConnection).
     */
    @Override
    protected Connection beginTransaction() throws Exception {
        String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";
        Connection connection = DBConnectUtil.getConnection(url, "root", "123456");
        System.err.println("start beginTransaction......." + connection);
        return connection;
    }

    /**
     * Pre-commit. In this demo the actual write already happened in invoke.
     */
    @Override
    protected void preCommit(Connection connection) throws Exception {
        System.err.println("start preCommit......." + connection);
    }

    /**
     * Commits the transaction once invoke succeeded and the checkpoint has completed.
     */
    @Override
    protected void commit(Connection connection) {
        System.err.println("start commit......." + connection);
        DBConnectUtil.commit(connection);
    }

    // Called on restore for transactions that were pre-committed in the restored checkpoint.
    // This demo only logs: a Kryo-restored Connection is not expected to be a usable live connection,
    // so nothing is actually re-committed here.
    @Override
    protected void recoverAndCommit(Connection connection) {
        System.err.println("start recoverAndCommit......." + connection);
    }

    // Called on restore for the transaction that was opened after the restored checkpoint.
    @Override
    protected void recoverAndAbort(Connection connection) {
        System.err.println("start abort recoverAndAbort......." + connection);
    }

    /**
     * Rolls the transaction back if invoke failed; the next checkpoint is not taken either.
     */
    @Override
    protected void abort(Connection connection) {
        System.err.println("start abort rollback......." + connection);
        DBConnectUtil.rollback(connection);
    }
}
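Two of the overridden methods, recoverAndCommit and recoverAndAbort, only matter after a restart. As far as I can tell from the TwoPhaseCommitSinkFunction source, restore does two things:
- It re-commits every transaction that was pre-committed in the restored checkpoint.
- It aborts the transaction that was opened after that checkpoint.

The sketch below is a hypothetical model of just that step; RestoreModel and restore are illustrative names, not Flink code. A recoverAndCommit that only logs, as in the class above, would therefore leave a pre-committed transaction uncommitted if its commit was interrupted by the failure.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the restore step of a two-phase-commit sink (not Flink code).
final class RestoreModel<TXN> {
    final List<TXN> preCommitted;  // pre-committed in the restored checkpoint; commit may not have run
    final TXN openAfterSnapshot;   // started right after that checkpoint, never pre-committed

    RestoreModel(List<TXN> preCommitted, TXN openAfterSnapshot) {
        this.preCommitted = preCommitted;
        this.openAfterSnapshot = openAfterSnapshot;
    }

    // Re-commit everything that was pre-committed, then abort the transaction that was still open.
    void restore(Consumer<TXN> recoverAndCommit, Consumer<TXN> recoverAndAbort) {
        for (TXN txn : preCommitted) {
            recoverAndCommit.accept(txn);
        }
        recoverAndAbort.accept(openAfterSnapshot);
    }
}
```

In the class above, the commit side would have to finish a transaction from its serialized handle alone. That is hard to do with a plain JDBC Connection.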


3. DBConnectUtil.java

package com.fwmagic.flink.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DBConnectUtil {

    /**
     * Gets a connection with auto-commit disabled.
     */
    public static Connection getConnection(String url, String user, String password) throws SQLException {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        Connection conn = DriverManager.getConnection(url, user, password);
        // Manual commit: everything written on this connection forms one transaction
        conn.setAutoCommit(false);
        return conn;
    }

    /**
     * Commits the transaction, then closes the connection.
     */
    public static void commit(Connection conn) {
        if (conn != null) {
            try {
                conn.commit();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                close(conn);
            }
        }
    }

    /**
     * Rolls the transaction back, then closes the connection.
     */
    public static void rollback(Connection conn) {
        if (conn != null) {
            try {
                conn.rollback();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                close(conn);
            }
        }
    }

    /**
     * Closes the connection.
     */
    public static void close(Connection conn) {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
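The whole scheme relies on setAutoCommit(false). Rows inserted in invoke stay invisible until commit. rollback discards them, and so does closing the connection without committing, since MySQL rolls back an open transaction when the session ends.

A hypothetical in-memory stand-in shows these semantics. FakeTable and Session are illustrative names, not JDBC classes. The stand-in also reflects that DBConnectUtil closes the connection after commit or rollback, so a connection cannot be reused afterwards.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory stand-in for a MySQL table and a connection with autoCommit = false.
final class FakeTable {
    private final List<String> rows = new ArrayList<>(); // what other sessions can see

    List<String> visibleRows() {
        return Collections.unmodifiableList(new ArrayList<>(rows));
    }

    Session openSession() {
        return new Session();
    }

    final class Session {
        private final List<String> uncommitted = new ArrayList<>();
        private boolean closed;

        // Like ps.execute() inside a transaction: written, but not visible yet.
        void insert(String value) { ensureOpen(); uncommitted.add(value); }

        // Like conn.commit(): the rows become visible.
        void commit() { ensureOpen(); rows.addAll(uncommitted); uncommitted.clear(); }

        // Like conn.rollback(): the rows are discarded.
        void rollback() { ensureOpen(); uncommitted.clear(); }

        // Closing without committing discards the open work.
        void close() { uncommitted.clear(); closed = true; }

        private void ensureOpen() {
            if (closed) throw new IllegalStateException("session is closed");
        }
    }
}
```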


IV. Testing the Code

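To make sending messages easy, I used a scheduled task that sends one number per second, from 1 to 16. The expected behaviour is:

- Before number 15 is sent, one checkpoint should already have been taken, and the second checkpoint should be nearly due.
- The data consumed up to the first checkpoint is inserted into the database successfully.
- When number 15 is consumed, an exception is thrown deliberately.
- At that point the database should contain only the data committed after the first checkpoint. The data belonging to the second checkpoint is not inserted, because its pre-commit has already failed and the real commit never happens.

Here is the log from my experiment: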

start invoke.......
===>date:2019-05-28 18:36:50 {"value":1,"metadata":{"offset":892,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:36:51 {"value":2,"metadata":{"offset":887,"topic":"gaga","partition":2}}
start invoke.......
===>date:2019-05-28 18:36:52 {"value":3,"metadata":{"offset":889,"topic":"gaga","partition":1}}
start invoke.......
===>date:2019-05-28 18:36:53 {"value":4,"metadata":{"offset":893,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:36:54 {"value":5,"metadata":{"offset":888,"topic":"gaga","partition":2}}
start invoke.......
===>date:2019-05-28 18:36:55 {"value":6,"metadata":{"offset":890,"topic":"gaga","partition":1}}
start invoke.......
===>date:2019-05-28 18:36:56 {"value":7,"metadata":{"offset":894,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:36:57 {"value":8,"metadata":{"offset":889,"topic":"gaga","partition":2}}
start preCommit.......
start beginTransaction.......
start commit.......com.mysql.jdbc.JDBC4Connection@3c5ad420
start invoke.......
===>date:2019-05-28 18:36:58 {"value":9,"metadata":{"offset":891,"topic":"gaga","partition":1}}
start invoke.......
===>date:2019-05-28 18:36:59 {"value":10,"metadata":{"offset":895,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:37:00 {"value":11,"metadata":{"offset":890,"topic":"gaga","partition":2}}
start invoke.......
===>date:2019-05-28 18:37:01 {"value":12,"metadata":{"offset":892,"topic":"gaga","partition":1}}
start invoke.......
===>date:2019-05-28 18:37:02 {"value":13,"metadata":{"offset":896,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:37:03 {"value":14,"metadata":{"offset":891,"topic":"gaga","partition":2}}
start invoke.......
===>date:2019-05-28 18:37:04 {"value":15,"metadata":{"offset":893,"topic":"gaga","partition":1}}
start abort rollback.......com.mysql.jdbc.JDBC4Connection@5f2afc1b
start commit.......com.mysql.jdbc.JDBC4Connection@71ed09a
java.lang.ArithmeticException: / by zero
        at com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:36)
        at com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:16)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)

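The log shows that the first commit happened at 2019-05-28 18:36:57, and values 1-8 were written to the database. In the second round, the exception was thrown when value 15 was consumed, so nothing was committed. Near the end of the log a rollback occurs, which also closes the connection, and the commit attempted afterwards failed as well. As a result:

- The consumed values 9-15 are not inserted into the database.
- No further checkpoint is taken.
- The checkpoint still holds the offsets from the last successful consumption.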
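The reasoning above can be checked mechanically by replaying the log. The sketch below is a hypothetical helper; CommitLogTally and committedValues are illustrative names. It follows the sink's System.err lines:
- Values printed by invoke go into the open transaction.
- "start preCommit" hands them over for commit.
- "start commit" publishes whatever was pre-committed.
- "start abort" discards the open ones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: replays the sink's log lines to work out which values were committed.
final class CommitLogTally {
    private static final Pattern VALUE = Pattern.compile("\"value\":(\\d+)");

    static List<Integer> committedValues(List<String> lines) {
        List<Integer> open = new ArrayList<>();         // written by invoke, transaction still open
        List<Integer> preCommitted = new ArrayList<>(); // handed over by preCommit, waiting for commit
        List<Integer> committed = new ArrayList<>();
        for (String line : lines) {
            Matcher m = VALUE.matcher(line);
            if (m.find()) {
                open.add(Integer.parseInt(m.group(1)));
            } else if (line.startsWith("start preCommit")) {
                preCommitted.addAll(open);
                open.clear();
            } else if (line.startsWith("start commit")) {
                committed.addAll(preCommitted);
                preCommitted.clear();
            } else if (line.startsWith("start abort")) {
                open.clear();
            }
        }
        return committed;
    }
}
```

Fed the lines from the log above, the helper returns 1 to 8. The trailing commit line finds nothing pre-committed, so values 9 to 15 stay out.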


Database table: t_test

CREATE TABLE `t_test` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `value` varchar(255) DEFAULT NULL,
  `insert_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


Data in the table:


V. Full source code: https://gitee.com/fang_wei/fwmagic-flink

