千家信息网

如何使用Apache Flink实现自定义Sink

发表于:2024-09-22 作者:千家信息网编辑
千家信息网最后更新 2024年09月22日,如何使用Apache Flink实现自定义Sink,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。socket发送过来的数据,把Strin
千家信息网最后更新 2024年09月22日如何使用Apache Flink实现自定义Sink

如何使用Apache Flink实现自定义Sink,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

socket发送过来的数据,把String类型转成对象,然后把Java对象保存到Mysql数据库中。

创建数据库和表

create database imooc_flink;create table student(id int(11) NOT NULL AUTO_INCREMENT,name varchar(25),age int(10),primary key(id))

导入mysql依赖:

                                   mysql                        mysql-connector-java                        8.0.15                

创建POJO Student

package com.vincent.course05;public class Student {    private int id;    private String name;    private int age;    @Override    public String toString() {        return "Student{" +                "id=" + id +                ", name='" + name + '\'' +                ", age=" + age +                '}';    }    public int getId() {        return id;    }    public void setId(int id) {        this.id = id;    }    public String getName() {        return name;    }    public void setName(String name) {        this.name = name;    }    public int getAge() {        return age;    }    public void setAge(int age) {        this.age = age;    }}

然后创建连接,SinkToMySQL.java

package com.vincent.course05;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;public class SinkToMySQL extends RichSinkFunction {    PreparedStatement ps;    private Connection connection;    /**     * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接     *     * @param parameters     * @throws Exception     */    @Override    public void open(Configuration parameters) throws Exception {        super.open(parameters);        connection = getConnection();        String sql = "insert into student(id, name, age) values(?, ?, ?);";        ps = this.connection.prepareStatement(sql);    }    @Override    public void close() throws Exception {        super.close();        //关闭连接和释放资源        if (connection != null) {            connection.close();        }        if (ps != null) {            ps.close();        }    }    /**     * 每条数据的插入都要调用一次 invoke() 方法     *     * @param value     * @param context     * @throws Exception     */    @Override    public void invoke(Student value, Context context) throws Exception {        //组装数据,执行插入操作        ps.setInt(1, value.getId());        ps.setString(2, value.getName());        ps.setInt(3, value.getAge());        ps.executeUpdate();    }    private static Connection getConnection() {        Connection con = null;        try {            Class.forName("com.mysql.cj.jdbc.Driver");            con = DriverManager.getConnection("jdbc:mysql://192.168.152.45:3306/imooc_flink?useUnicode=true&characterEncoding=UTF-8", "root", "123456");        } catch (Exception e) {            e.printStackTrace();            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());        }        return con;    }}

main方法:

public static void main(String[] args) throws Exception {        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();        DataStreamSource source = environment.socketTextStream("192.168.152.45", 9999);        SingleOutputStreamOperator studentStream = source.map(new MapFunction() {            @Override            public Student map(String value) throws Exception {                String[] splits = value.split(",");                Student student = new Student();                student.setId(Integer.parseInt(splits[0]));                student.setName(splits[1]);                student.setAge(Integer.parseInt(splits[2]));                return student;            }        });        studentStream.addSink(new SinkToMySQL());        environment.execute("JavaCustomSinkToMysql");    }

从socket中获取数据,数据格式使用逗号分割,在控制台中输入:

nc -lk 99991,tom,23

检查数据库,数据库中多了一条数据

mysql> select * from student;+----+------+------+| id | name | age  |+----+------+------+|  1 | tom  |   23 |+----+------+------+1 row in set (0.00 sec)

这样就很方便的使用自定义的sink,写入到MySQL中去。

总结:

第一步:继承RichSinkFunction T就是想要写入的对象类型

第二步:重写方法 open/close生命周期方法,invoke每条记录执行一次

默认情况下open方法的并行度不是1,跟具体的电脑有关系。

看完上述内容,你们掌握如何使用Apache Flink实现自定义Sink的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

0