如何使用Apache Flink实现自定义Sink
发表于:2024-09-22 作者:千家信息网编辑
千家信息网最后更新 2024年09月22日,如何使用Apache Flink实现自定义Sink,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。socket发送过来的数据,把Strin
千家信息网最后更新 2024年09月22日如何使用Apache Flink实现自定义Sink
如何使用Apache Flink实现自定义Sink,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
socket发送过来的数据,把String类型转成对象,然后把Java对象保存到Mysql数据库中。
创建数据库和表
create database imooc_flink;create table student(id int(11) NOT NULL AUTO_INCREMENT,name varchar(25),age int(10),primary key(id))
导入mysql依赖:
mysql mysql-connector-java 8.0.15
创建POJO Student
package com.vincent.course05;public class Student { private int id; private String name; private int age; @Override public String toString() { return "Student{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}'; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; }}
然后创建连接,SinkToMySQL.java
package com.vincent.course05;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;public class SinkToMySQL extends RichSinkFunction{ PreparedStatement ps; private Connection connection; /** * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接 * * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConnection(); String sql = "insert into student(id, name, age) values(?, ?, ?);"; ps = this.connection.prepareStatement(sql); } @Override public void close() throws Exception { super.close(); //关闭连接和释放资源 if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } /** * 每条数据的插入都要调用一次 invoke() 方法 * * @param value * @param context * @throws Exception */ @Override public void invoke(Student value, Context context) throws Exception { //组装数据,执行插入操作 ps.setInt(1, value.getId()); ps.setString(2, value.getName()); ps.setInt(3, value.getAge()); ps.executeUpdate(); } private static Connection getConnection() { Connection con = null; try { Class.forName("com.mysql.cj.jdbc.Driver"); con = DriverManager.getConnection("jdbc:mysql://192.168.152.45:3306/imooc_flink?useUnicode=true&characterEncoding=UTF-8", "root", "123456"); } catch (Exception e) { e.printStackTrace(); System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage()); } return con; }}
main方法:
public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSourcesource = environment.socketTextStream("192.168.152.45", 9999); SingleOutputStreamOperator studentStream = source.map(new MapFunction () { @Override public Student map(String value) throws Exception { String[] splits = value.split(","); Student student = new Student(); student.setId(Integer.parseInt(splits[0])); student.setName(splits[1]); student.setAge(Integer.parseInt(splits[2])); return student; } }); studentStream.addSink(new SinkToMySQL()); environment.execute("JavaCustomSinkToMysql"); }
从socket中获取数据,数据格式使用逗号分割,在控制台中输入:
nc -lk 99991,tom,23
检查数据库,数据库中多了一条数据
mysql> select * from student;+----+------+------+| id | name | age |+----+------+------+| 1 | tom | 23 |+----+------+------+1 row in set (0.00 sec)
这样就很方便的使用自定义的sink,写入到MySQL中去。
总结:
第一步:继承RichSinkFunction
T就是想要写入的对象类型 第二步:重写方法 open/close生命周期方法,invoke每条记录执行一次
默认情况下open方法的并行度不是1,跟具体的电脑有关系。
看完上述内容,你们掌握如何使用Apache Flink实现自定义Sink的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!
数据
方法
数据库
对象
内容
更多
类型
问题
束手无策
为此
不用
原因
周期
对此
就是
情况
技能
控制台
时候
格式
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库原理与技术总结感想
网络安全数据库审计
网络监控系统管理服务器
管家婆服务器名字不对
河源网络安全吗
网络安全素质教育pdf版
多选题数据库 表设计
网络安全管理操作规程
远程无线网络技术
互联网文化科技创意
合肥万户网络技术招聘
日本大学网络安全专业
漳州平和dns服务器
校园网访问学校服务器
三级计算机视网络技术频教程
新版vcds数据库导入旧版
服务器上日志项目启动不成功
服务器开发工程师质量管理
不限流量服务器
计算机网络技术应用思维导图
https证书服务器
华为服务器无法识别固态硬盘
网络技术员招聘岗位职责
软件开发工作的求职信
庆祝建党百年网络安全保障
知识产权数据库服务是什么公司
ora数据库连接慢
软件开发潮流
山东省服务器代理商查询
离石区网络安全和信息化工作