flink 读取hive的数据
发表于:2024-11-26 作者:千家信息网编辑
千家信息网最后更新 2024年11月26日,flink1.8 对hive 的支持不够好,造成300W的数据,居然读了2个小时,打算将程序迁移至spark。 先把代码贴上。 后发现sql不应该有where条件,去掉后速度还行。maven
千家信息网最后更新 2024年11月26日flink 读取hive的数据
flink1.8 对hive 的支持不够好,造成300W的数据,居然读了2个小时,打算将程序迁移至spark。 先把代码贴上。 后发现sql不应该有where条件,去掉后速度还行。
maven
org.apache.hive hive-jdbc 1.1.0 org.apache.hadoop hadoop-common 3.1.2 jdk.tools jdk.tools 1.8 system ${JAVA_HOME}/lib/tools.jar
java
private final static String driverName = "org.apache.hive.jdbc.HiveDriver";// jdbc驱动路径 private final static String url = ";";// hive库地址+库名 private final static String user = "";// 用户名 private final static String password = "!";// 密码 private final static String table=""; private final static String sql = " "; public static void main(String[] arg) throws Exception { long time=System.currentTimeMillis(); HttpClientUtil.sendDingMessage("开始同步hive-"+table+";"+Utils.getTimeString()); /** * 初始化环境 */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); try { TypeInformation[] types = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO}; String[] colName = new String[]{"user","name"}; RowTypeInfo rowTypeInfo = new RowTypeInfo(types, colName); JDBCInputFormatBuilder builder = JDBCInputFormat.buildJDBCInputFormat().setDrivername(driverName) .setDBUrl(url) .setUsername(user).setPassword(password); Calendar calendar = Calendar.getInstance(); calendar.setTime(new Date()); calendar.add(Calendar.DATE, -1); //用昨天产出的数据 SimpleDateFormat sj = new SimpleDateFormat("yyyyMMdd"); String d=sj.format(calendar.getTime()); JDBCInputFormat jdbcInputFormat = builder.setQuery(sql+" and dt='"+d+"' limit 100000000").setRowTypeInfo(rowTypeInfo).finish(); DataSource rowlist = env.createInput(jdbcInputFormat); DataSet temp= rowlist.filter(new FilterFunction(){ @Override public boolean filter(Row row) throws Exception { String key=row.getField(0).toString(); String value=row.getField(1).toString(); if(key.length()<5 || key.startsWith("-") || key.startsWith("$") || value.length()<5 || value.startsWith("-") || value.startsWith("$")) { return false; }else { return true; } } }).map(new MapFunction(){ @Override public RedisDataModel map(Row value) throws Exception { RedisDataModel m=new RedisDataModel(); m.setExpire(-1); m.setKey(JobConstants.REDIS_FLINK_IMEI_USER+value.getField(0).toString()); m.setGlobal(true); m.setValue(value.getField(1).toString()); return m; } }); HttpClientUtil.sendDingMessage("同步hive-"+table+"完成;开始推送模型,共有"+temp.count()+"条;"+Utils.getTimeString()); RedisOutputFormat redisOutput = RedisOutputFormat.buildRedisOutputFormat() .setHostMaster(AppConfig.getProperty(JobConstants.REDIS_HOST_MASTER)) .setHostSentinel(AppConfig.getProperty(JobConstants.REDIS_HOST_SENTINELS)) .setMaxIdle(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXIDLE))) .setMaxTotal(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXTOTAL))) .setMaxWaitMillis(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXWAITMILLIS))) .setTestOnBorrow(Boolean.parseBoolean(AppConfig.getProperty(JobConstants.REDIS_TESTONBORROW))) .finish(); temp.output(redisOutput); env.execute("hive-"+table+" sync"); HttpClientUtil.sendDingMessage("同步hive-"+table+"完成,耗时:"+(System.currentTimeMillis()-time)/1000+"s"); } catch (Exception e) { logger.error("",e); HttpClientUtil.sendDingMessage("同步hive-"+table+"失败,时间戳:"+time+",原因:"+e.toString()); }
同步
数据
不够
代码
原因
地址
密码
小时
时间
条件
模型
环境
用户
用户名
程序
路径
速度
产出
推送
支持
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
运维人员服务器管理
网络安全与执法难不难
做软件开发必须要学会什么
山海关网络安全
佛山网络安全工程师入行门槛低
服务器部署java网站
在vfp中什么是数据库
自动注册软件开发工具
新时代网络技术有限公司
单机穿越火线怎么开服务器
御龙网络安全海报
杰奇数据库进不去
航线数据库设计
数据库 容器技术
sql2005数据库可疑
如何安装数据库监控
工商学院网络安全知识竞赛
汽车行业网络安全案例
服务器怎么增加离线下载
web服务器的管理员联系
汽车app服务器什么意思
优质的联想服务器
数据库中继承建表
宁波北仑区联想机架式刀片服务器
天津流水互联网科技有限公司
出国做网络技术
软件开发工资发放数据流程图
aspice软件开发
前端代码执行和服务器有关系吗
国产数据库sm2加密