flink 从mysql 读取数据 放入kafka中 用于搜索全量
发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,接着上一篇,将mysql的数据导入kafka中public static void main(String[] arg) throws Exception { TypeInformati
千家信息网最后更新 2025年01月23日flink 从mysql 读取数据 放入kafka中 用于搜索全量
接着上一篇,将mysql的数据导入kafka中
public static void main(String[] arg) throws Exception { TypeInformation[] fieldTypes = new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO }; String[] fieldNames = new String[] { "name", "address" }; RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames); JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://ip:3306/tablespace?characterEncoding=utf8") .setUsername("user").setPassword("root") .setQuery("select LOGIC_CODE, SHARE_LOG_CODE from table").setRowTypeInfo(rowTypeInfo).finish(); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource s = env.createInput(jdbcInputFormat); BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT()); tableEnv.registerDataSet("t2", s); Table tapiResult = tableEnv.scan("t2"); System.out.println("schema is:"); tapiResult.printSchema(); Table query = tableEnv.sqlQuery("select name, address from t2"); DataSet ds= tableEnv.toDataSet(query, Result.class); DataSet temp=ds.map(new MapFunction() { @Override public String map(Result result) throws Exception { String name = result.name; String value = result.address; return name+":->:"+value; } }); logger.info("read db end"); KafkaOutputFormat kafkaOutput = KafkaOutputFormat.buildKafkaOutputFormat() .setBootstrapServers("ip:9092").setTopic("search_test_whk").setAcks("all").setBatchSize("1000") .setBufferMemory("100000").setLingerMs("1").setRetries("2").finish(); temp.output(kafkaOutput); logger.info("write kafka end"); env.execute("Flink add data source"); }
数据
上一
搜索
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全法实施条例第四章解析
hpilo4服务器系统安装
夏普服务器原理图
成都h5游戏软件开发
网络安全手抄报
上海大数据时钟同步服务器
站群服务器来选火星6服务
河北省台式电脑服务器自动生产线
wms仓储软件开发
我的世界怎么创造模组服务器
联通几点更新数据库
软件开发50岁还能做吗
数字资产交易软件开发商
诛仙liunx数据库怎么设置
青岛百灵互联网工业科技
宝塔网站和数据库怎么备份
软件开发流程流程
数据库设计假设学生选修
网络技术安全怎样挣钱
ipv6的服务器多少台
ciw网络安全试题 免费
s8s8网络技术论坛
东莞智能软件开发商家
erp软件开发多久
查征信和大数据库
orcle数据库脚本
如何破坏数据库的完整性规则
网络安全教育活动
全国网络安全工作会议新闻
c c 做软件开发