spark2.x由浅入深深到底系列六之RDD java api用JdbcRDD读取关系型数据库
发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,学习任何的spark技术之前,请先正确理解spark,可以参考:正确理解spark以下是用spark RDD java api实现从关系型数据库中读取数据,这里使用的是derby本地数据库,当然可以是
千家信息网最后更新 2025年02月04日spark2.x由浅入深深到底系列六之RDD java api用JdbcRDD读取关系型数据库
学习任何的spark技术之前,请先正确理解spark,可以参考:正确理解spark
以下是用spark RDD java api实现从关系型数据库中读取数据,这里使用的是derby本地数据库,当然可以是mysql或者oracle等关系型数据库:
package com.twq.javaapi.java7;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.rdd.JdbcRDD;import java.io.Serializable;import java.sql.*;public class JavaJdbcRDDSuite implements Serializable { public static void prepareData() throws ClassNotFoundException, SQLException { //使用本地数据库derby,当然可以使用mysql等关系型数据库 Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); Connection connection = DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true"); try { //创建一张表FOO,ID是一个自增的主键,DATA是一个INTEGER列 Statement create = connection.createStatement(); create.execute( "CREATE TABLE FOO(" + "ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," + "DATA INTEGER)"); create.close(); //插入数据 PreparedStatement insert = connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)"); for (int i = 1; i <= 5; i++) { insert.setInt(1, i * 2); insert.executeUpdate(); } insert.close(); } catch (SQLException e) { // If table doesn't exist... if (e.getSQLState().compareTo("X0Y32") != 0) { throw e; } } finally { connection.close(); } } public static void shutdownDB() throws SQLException { try { DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;shutdown=true"); } catch (SQLException e) { // Throw if not normal single database shutdown // https://db.apache.org/derby/docs/10.2/ref/rrefexcept71493.html if (e.getSQLState().compareTo("08006") != 0) { throw e; } } } public static void main(String[] args) throws Exception { JavaSparkContext sc = new JavaSparkContext("local", "JavaAPISuite"); //准备数据 prepareData(); //构建JdbcRDD JavaRDDrdd = JdbcRDD.create( sc, new JdbcRDD.ConnectionFactory() { @Override public Connection getConnection() throws SQLException { return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb"); } }, "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?", 1, 5, 1, new Function () { @Override public Integer call(ResultSet r) throws Exception { return r.getInt(1); } } ); //结果: [2, 4, 6, 8, 10] System.out.println(rdd.collect()); shutdownDB(); sc.stop(); }}
详细了解RDD的api的话,可以参考: spark core RDD api原理详解
数据
数据库
参考
原理
技术
结果
建一
准备
学习
由浅入深
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
学习数据库最好的书是
网络安全需要人注意什么
网络安全素养周的心得体会
苏州朗钧网络技术服务
贵州服务器电源哪家便宜
沂水软件开发培训班在线学习
软件开发网格
进出口软件开发服务要报关
开展网络安全巡查
大冶正规计算机软件开发
阿里用的是什么数据库
软件开发数据隐私标记
rds数据库服务器
来安自动化软件开发技术怎么样
达梦数据库怎么启服务
合众 网络技术
遥控器无法连接服务器
网络安全和生态安全
网络安全可靠性报告
网络安全职业技能竞赛决赛举行
黑客属于客户端还是服务器端
安卓软件开发技校
网络安全宣传行动
北京储存服务器虚拟化软件云空间
数据库有什么好书
来安自动化软件开发技术怎么样
sql数据库评估期过了怎么办
崇明区营销网络技术包括什么
网络安全雷区
学生网络安全发言稿320