千家信息网

Flink example: reading from a database

Published: 2024-11-27  Author: 千家信息网 editor
Last updated: 2024-11-27
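// Package names as in Flink 1.8 (flink-jdbc and flink-table modules).
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GetData {
    private final static Logger logger = LoggerFactory.getLogger(GetData.class);

    public static void main(String[] arg) throws Exception {
        // One STRING column, matching the single "name" column selected below.
        TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
                BasicTypeInfo.STRING_TYPE_INFO
        };
        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

        // Host, database name and credentials are placeholders.
        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://ip:3306/tablename?characterEncoding=utf8")
                .setUsername("*")
                .setPassword("*")
                .setQuery("select name from words")
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Row> s = env.createInput(jdbcInputFormat); // datasource

        // Register the DataSet as table "t2" and query it with SQL.
        BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT());
        tableEnv.registerDataSet("t2", s);
        Table query = tableEnv.sqlQuery("select * from t2");
        query.printSchema();
        DataSet<Row> result = tableEnv.toDataSet(query, Row.class);
        result.print();
        System.out.println(s.count());
    }
}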

Use a Maven plugin to package all the required classes into a single jar:

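<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <appendAssemblyId>false</appendAssemblyId>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <mainClass>*</mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>assembly</goal>
            </goals>
        </execution>
    </executions>
</plugin>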

Then run:

./bin/flink run /flink-1.8.0/collector-api-0.1.jar
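
The run command above names no entry class, so Flink has to find one in the jar's manifest. Before submitting, it can help to confirm that the packaged jar actually carries a Main-Class entry. The following is a minimal sketch using only the JDK; the class name ManifestCheck and its method mainClassOf are hypothetical helpers for illustration and are not part of the original example.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

/** Hypothetical helper: reports the entry class recorded in a jar's manifest. */
public class ManifestCheck {

    /** Returns the Main-Class attribute of the given jar, or null if it is absent. */
    public static String mainClassOf(Path jar) throws IOException {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            Manifest manifest = jarFile.getManifest(); // null when the jar has no manifest
            if (manifest == null) {
                return null;
            }
            return manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java ManifestCheck <path-to-jar>");
            System.exit(1);
        }
        String mainClass = mainClassOf(Paths.get(args[0]));
        System.out.println(mainClass == null
                ? "no Main-Class entry in manifest"
                : "Main-Class: " + mainClass);
    }
}
```

Running it against the packaged jar (for example the collector-api-0.1.jar path used above) should print the class configured in the assembly plugin. If no Main-Class entry is reported, the entry class can be passed explicitly with the -c option of flink run instead.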