千家信息网

flink 读取hive的数据

发表于:2024-11-26 作者:千家信息网编辑
千家信息网最后更新 2024年11月26日,flink1.8 对hive 的支持不够好,造成300W的数据,居然读了2个小时,打算将程序迁移至spark。 先把代码贴上。 后发现sql不应该有where条件,去掉后速度还行。maven
千家信息网最后更新 2024年11月26日flink 读取hive的数据

flink1.8 对hive 的支持不够好,造成300W的数据,居然读了2个小时,打算将程序迁移至spark。 先把代码贴上。 后发现sql不应该有where条件,去掉后速度还行。

maven

            org.apache.hive            hive-jdbc            1.1.0                            org.apache.hadoop            hadoop-common            3.1.2                               jdk.tools               jdk.tools               1.8               system               ${JAVA_HOME}/lib/tools.jar          

java

private final static String driverName = "org.apache.hive.jdbc.HiveDriver";// jdbc驱动路径    private final static String url = ";";// hive库地址+库名    private final static String user = "";// 用户名    private final static String password = "!";// 密码    private final static String table="";    private final static String sql = " ";    public static void main(String[] arg) throws Exception {        long time=System.currentTimeMillis();           HttpClientUtil.sendDingMessage("开始同步hive-"+table+";"+Utils.getTimeString());                /**         * 初始化环境         */        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(4);        try {            TypeInformation[] types = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};            String[] colName = new String[]{"user","name"};                 RowTypeInfo rowTypeInfo = new RowTypeInfo(types, colName);            JDBCInputFormatBuilder builder = JDBCInputFormat.buildJDBCInputFormat().setDrivername(driverName)                    .setDBUrl(url)                    .setUsername(user).setPassword(password);            Calendar calendar = Calendar.getInstance();            calendar.setTime(new Date());            calendar.add(Calendar.DATE, -1); //用昨天产出的数据            SimpleDateFormat sj = new SimpleDateFormat("yyyyMMdd");            String d=sj.format(calendar.getTime());            JDBCInputFormat jdbcInputFormat = builder.setQuery(sql+" and dt='"+d+"' limit 100000000").setRowTypeInfo(rowTypeInfo).finish();            DataSource rowlist = env.createInput(jdbcInputFormat);            DataSet temp= rowlist.filter(new FilterFunction(){                @Override                public boolean filter(Row row) throws Exception {                    String key=row.getField(0).toString();                    String value=row.getField(1).toString();                    if(key.length()<5 || key.startsWith("-") || key.startsWith("$") || value.length()<5 || value.startsWith("-") || value.startsWith("$")) {                        return false;                    }else {                        return true;                    }                }            }).map(new MapFunction(){                @Override                public RedisDataModel map(Row value) throws Exception {                    RedisDataModel m=new RedisDataModel();                    m.setExpire(-1);                     m.setKey(JobConstants.REDIS_FLINK_IMEI_USER+value.getField(0).toString());                          m.setGlobal(true);                    m.setValue(value.getField(1).toString());                    return m;                }             });            HttpClientUtil.sendDingMessage("同步hive-"+table+"完成;开始推送模型,共有"+temp.count()+"条;"+Utils.getTimeString());             RedisOutputFormat redisOutput = RedisOutputFormat.buildRedisOutputFormat()                    .setHostMaster(AppConfig.getProperty(JobConstants.REDIS_HOST_MASTER))                    .setHostSentinel(AppConfig.getProperty(JobConstants.REDIS_HOST_SENTINELS))                    .setMaxIdle(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXIDLE)))                    .setMaxTotal(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXTOTAL)))                     .setMaxWaitMillis(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXWAITMILLIS)))                    .setTestOnBorrow(Boolean.parseBoolean(AppConfig.getProperty(JobConstants.REDIS_TESTONBORROW)))                    .finish();               temp.output(redisOutput);                           env.execute("hive-"+table+" sync");            HttpClientUtil.sendDingMessage("同步hive-"+table+"完成,耗时:"+(System.currentTimeMillis()-time)/1000+"s");         } catch (Exception e) {            logger.error("",e);             HttpClientUtil.sendDingMessage("同步hive-"+table+"失败,时间戳:"+time+",原因:"+e.toString());        } 
0