如何使用Apache Flink实现自定义Sink
发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,如何使用Apache Flink实现自定义Sink,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。socket发送过来的数据,把Strin
千家信息网最后更新 2025年02月02日如何使用Apache Flink实现自定义Sink
如何使用Apache Flink实现自定义Sink,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
socket发送过来的数据,把String类型转成对象,然后把Java对象保存到Mysql数据库中。
创建数据库和表
create database imooc_flink;create table student(id int(11) NOT NULL AUTO_INCREMENT,name varchar(25),age int(10),primary key(id))
导入mysql依赖:
mysql mysql-connector-java 8.0.15
创建POJO Student
package com.vincent.course05;public class Student { private int id; private String name; private int age; @Override public String toString() { return "Student{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}'; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; }}
然后创建连接,SinkToMySQL.java
package com.vincent.course05;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;public class SinkToMySQL extends RichSinkFunction{ PreparedStatement ps; private Connection connection; /** * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接 * * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConnection(); String sql = "insert into student(id, name, age) values(?, ?, ?);"; ps = this.connection.prepareStatement(sql); } @Override public void close() throws Exception { super.close(); //关闭连接和释放资源 if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } /** * 每条数据的插入都要调用一次 invoke() 方法 * * @param value * @param context * @throws Exception */ @Override public void invoke(Student value, Context context) throws Exception { //组装数据,执行插入操作 ps.setInt(1, value.getId()); ps.setString(2, value.getName()); ps.setInt(3, value.getAge()); ps.executeUpdate(); } private static Connection getConnection() { Connection con = null; try { Class.forName("com.mysql.cj.jdbc.Driver"); con = DriverManager.getConnection("jdbc:mysql://192.168.152.45:3306/imooc_flink?useUnicode=true&characterEncoding=UTF-8", "root", "123456"); } catch (Exception e) { e.printStackTrace(); System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage()); } return con; }}
main方法:
public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSourcesource = environment.socketTextStream("192.168.152.45", 9999); SingleOutputStreamOperator studentStream = source.map(new MapFunction () { @Override public Student map(String value) throws Exception { String[] splits = value.split(","); Student student = new Student(); student.setId(Integer.parseInt(splits[0])); student.setName(splits[1]); student.setAge(Integer.parseInt(splits[2])); return student; } }); studentStream.addSink(new SinkToMySQL()); environment.execute("JavaCustomSinkToMysql"); }
从socket中获取数据,数据格式使用逗号分割,在控制台中输入:
nc -lk 99991,tom,23
检查数据库,数据库中多了一条数据
mysql> select * from student;+----+------+------+| id | name | age |+----+------+------+| 1 | tom | 23 |+----+------+------+1 row in set (0.00 sec)
这样就很方便的使用自定义的sink,写入到MySQL中去。
总结:
第一步:继承RichSinkFunction
T就是想要写入的对象类型 第二步:重写方法 open/close生命周期方法,invoke每条记录执行一次
默认情况下open方法的并行度不是1,跟具体的电脑有关系。
看完上述内容,你们掌握如何使用Apache Flink实现自定义Sink的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!
数据
方法
数据库
对象
内容
更多
类型
问题
束手无策
为此
不用
原因
周期
对此
就是
情况
技能
控制台
时候
格式
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
c语言web服务器实验报告
对接港交所数据库需要条件
软件开发就是编程序对吗
对于wps表格数据库
梓安然软件开发
1U1N服务器尺寸
南宁用的打车软件开发
linux管理服务器的命令
唐山网络安全知识答题
编程软件开发方法
数据库单一来源采购说明怎么写
学好数据库英文
oracle镜像数据库
中控智慧考勤机如何导入数据库
mysql 数据库复制
软件开发企业技术部的职责
厦门闽微网络技术
联盟哪个服务器打战场公会好
数据库的安全性检查实验
uml 软件开发 那些图
湖北省超级服务器云主机
阿里大牛数据库优化
软件开发怎么做兼职
支持建设网络安全相关学科专业
如何在服务器附加数据
组态王查询数据库数据慢
数据库软件占用空间
怎么看数据库表里有没有循环
县转数据库
电脑服务器怎样断开