flink 读取hive的数据
发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,flink1.8 对hive 的支持不够好,造成300W的数据,居然读了2个小时,打算将程序迁移至spark。 先把代码贴上。 后发现sql不应该有where条件,去掉后速度还行。maven
千家信息网最后更新 2025年02月02日flink 读取hive的数据
flink1.8 对hive 的支持不够好,造成300W的数据,居然读了2个小时,打算将程序迁移至spark。 先把代码贴上。 后发现sql不应该有where条件,去掉后速度还行。
maven
org.apache.hive hive-jdbc 1.1.0 org.apache.hadoop hadoop-common 3.1.2 jdk.tools jdk.tools 1.8 system ${JAVA_HOME}/lib/tools.jar
java
private final static String driverName = "org.apache.hive.jdbc.HiveDriver";// jdbc驱动路径 private final static String url = ";";// hive库地址+库名 private final static String user = "";// 用户名 private final static String password = "!";// 密码 private final static String table=""; private final static String sql = " "; public static void main(String[] arg) throws Exception { long time=System.currentTimeMillis(); HttpClientUtil.sendDingMessage("开始同步hive-"+table+";"+Utils.getTimeString()); /** * 初始化环境 */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); try { TypeInformation[] types = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO}; String[] colName = new String[]{"user","name"}; RowTypeInfo rowTypeInfo = new RowTypeInfo(types, colName); JDBCInputFormatBuilder builder = JDBCInputFormat.buildJDBCInputFormat().setDrivername(driverName) .setDBUrl(url) .setUsername(user).setPassword(password); Calendar calendar = Calendar.getInstance(); calendar.setTime(new Date()); calendar.add(Calendar.DATE, -1); //用昨天产出的数据 SimpleDateFormat sj = new SimpleDateFormat("yyyyMMdd"); String d=sj.format(calendar.getTime()); JDBCInputFormat jdbcInputFormat = builder.setQuery(sql+" and dt='"+d+"' limit 100000000").setRowTypeInfo(rowTypeInfo).finish(); DataSource rowlist = env.createInput(jdbcInputFormat); DataSet temp= rowlist.filter(new FilterFunction(){ @Override public boolean filter(Row row) throws Exception { String key=row.getField(0).toString(); String value=row.getField(1).toString(); if(key.length()<5 || key.startsWith("-") || key.startsWith("$") || value.length()<5 || value.startsWith("-") || value.startsWith("$")) { return false; }else { return true; } } }).map(new MapFunction(){ @Override public RedisDataModel map(Row value) throws Exception { RedisDataModel m=new RedisDataModel(); m.setExpire(-1); m.setKey(JobConstants.REDIS_FLINK_IMEI_USER+value.getField(0).toString()); m.setGlobal(true); m.setValue(value.getField(1).toString()); return m; } }); HttpClientUtil.sendDingMessage("同步hive-"+table+"完成;开始推送模型,共有"+temp.count()+"条;"+Utils.getTimeString()); RedisOutputFormat redisOutput = RedisOutputFormat.buildRedisOutputFormat() .setHostMaster(AppConfig.getProperty(JobConstants.REDIS_HOST_MASTER)) .setHostSentinel(AppConfig.getProperty(JobConstants.REDIS_HOST_SENTINELS)) .setMaxIdle(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXIDLE))) .setMaxTotal(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXTOTAL))) .setMaxWaitMillis(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXWAITMILLIS))) .setTestOnBorrow(Boolean.parseBoolean(AppConfig.getProperty(JobConstants.REDIS_TESTONBORROW))) .finish(); temp.output(redisOutput); env.execute("hive-"+table+" sync"); HttpClientUtil.sendDingMessage("同步hive-"+table+"完成,耗时:"+(System.currentTimeMillis()-time)/1000+"s"); } catch (Exception e) { logger.error("",e); HttpClientUtil.sendDingMessage("同步hive-"+table+"失败,时间戳:"+time+",原因:"+e.toString()); }
同步
数据
不够
代码
原因
地址
密码
小时
时间
条件
模型
环境
用户
用户名
程序
路径
速度
产出
推送
支持
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
陕西网络技术服务含义
中指数据库免费
网络技术 百度网盘
杭州软件开发增值税
全区网络安全和信息化研讨班
网络安全宣传手抄报小洋老师
天津pdu服务器电源厂商哪家好
信息采集软件连不上数据库
杜比服务器开不了机
嘉定区网络技术服务推荐咨询
杭州云搜网络技术有限公司介绍
君凤煌商城软件开发
通卡网络技术有限公司官网
服务器健康指示灯黄灯闪烁
深圳找大壮互联网科技有限公司
浙江树橙网络技术有限公司
网络安全新形势如何应对
浦东新区智能软件开发零售价格
互联网科技口译
仙络 服务器
长沙鹤岗app软件开发
网络服务器经常忙
想学好数据库技术
开展网络安全知识培训
分布式服务器架构视频
现在还有网络安全证书吗
网络安全法 网站
简答数据库三级模式结构
今元网络技术
软件开发程序员 码农