flink 从mysql 读取数据 放入kafka中 用于搜索全量
发表于:2024-10-21 作者:千家信息网编辑
千家信息网最后更新 2024年10月21日,接着上一篇,将mysql的数据导入kafka中public static void main(String[] arg) throws Exception { TypeInformati
千家信息网最后更新 2024年10月21日flink 从mysql 读取数据 放入kafka中 用于搜索全量
接着上一篇,将mysql的数据导入kafka中
public static void main(String[] arg) throws Exception { TypeInformation[] fieldTypes = new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO }; String[] fieldNames = new String[] { "name", "address" }; RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames); JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://ip:3306/tablespace?characterEncoding=utf8") .setUsername("user").setPassword("root") .setQuery("select LOGIC_CODE, SHARE_LOG_CODE from table").setRowTypeInfo(rowTypeInfo).finish(); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource s = env.createInput(jdbcInputFormat); BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT()); tableEnv.registerDataSet("t2", s); Table tapiResult = tableEnv.scan("t2"); System.out.println("schema is:"); tapiResult.printSchema(); Table query = tableEnv.sqlQuery("select name, address from t2"); DataSet ds= tableEnv.toDataSet(query, Result.class); DataSet temp=ds.map(new MapFunction() { @Override public String map(Result result) throws Exception { String name = result.name; String value = result.address; return name+":->:"+value; } }); logger.info("read db end"); KafkaOutputFormat kafkaOutput = KafkaOutputFormat.buildKafkaOutputFormat() .setBootstrapServers("ip:9092").setTopic("search_test_whk").setAcks("all").setBatchSize("1000") .setBufferMemory("100000").setLingerMs("1").setRetries("2").finish(); temp.output(kafkaOutput); logger.info("write kafka end"); env.execute("Flink add data source"); }
数据
上一
搜索
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
录屏软件开发最终版
数据库系统原理习题答案
全球网络安全产业分析
天津电力应急软件开发价格标准
门禁系统数据库未登记
网络舆情网络安全领导组
中国电子网络安全招聘
软件开发还要读啥证
惠普服务器管理口修改
广东炬烽网络技术科技有限公司
华为网络安全安防产品
命令行创建有密码的数据库
贵州网络技术职业学校
2018 网络安全竞赛
我的世界在线生存服务器
江湖软件开发公司
手游ea服务器连不上
联通软件开发岗位
一百个对象存入数据库
虚拟机服务器怎么绑定域名
惠普服务器面板指示灯橙色
uc浏览器总是提示网络安全
邮箱服务器拒收
北京货币天下网络技术有限
数据库怎样把角色的权限授予用户
互联网新科技金句
中药专利文献数据库
网络安全应急办公室设在
政治教育与网络技术
手机传奇服务器多少钱一个月