不使用Sqoop流程,利用CacheManager直接完成SparkSQL数据流直接回写Oracle
以前都是使用Sqoop来完成数据从生成的hdfs数据存储上来抽取至oracle的数据库:sqoop抽取语句:
sqoop export --connect "jdbc:oracle:thin:@ip:port:sid" --username 用户名 --password 密码 --table sid.表名 --export-dir hdfs://nameservice1/user/XXX(hdfs地址) --fields-terminated-by "\001" --null-non-string '' --null-string '' -m 10;
由于项目需求我们现在要完成在代码中省城所需字段之后,直接回写到oracle中,因为数据量每天都很大,用实例或者List存有很大的局限性,可能会出现内存异常等不可预料的东西,所以我通过缓存器机制来存储数据,然后进行生成结果的临时表直接回写(后面做的hbase接口封装批量提交也比较类似) 废话不多说直接上代码: 1、建立缓存实体 package usi.java.oracle;
/**
- @author HK
@date 2011-2-15 下午06:45:57
*/
public class Cache {
private String key;
private Object value;
private long timeOut;
private boolean expired;
public Cache() {
super();
}public Cache(String key, String value, long timeOut, boolean expired) {
this.key = key;
this.value = value;
this.timeOut = timeOut;
this.expired = expired;
}public String getKey() {
return key;
}public long getTimeOut() {
return timeOut;
}public Object getValue() {
return value;
}public void setKey(String string) {
key = string;
}public void setTimeOut(long l) {
timeOut = l;
}public void setValue(Object object) {
value = object;
}public boolean isExpired() {
return expired;
}public void setExpired(boolean b) {
expired = b;
}
}
2、建立缓存控制器
package usi.java.oracle;
import java.util.Date;
import java.util.HashMap;
/**
- @author HK
@date 2011-2-15 下午09:40:00
*/
public class CacheManager {private static HashMap cacheMap = new HashMap();
/**
This class is singleton so private constructor is used.
*/
private CacheManager() {
super();
}/**
- returns cache item from hashmap
- @param key
@return Cache
*/
private synchronized static Cache getCache(String key) {
return (Cache)cacheMap.get(key);
}/**
- Looks at the hashmap if a cache item exists or not
- @param key
@return Cache
*/
private synchronized static boolean hasCache(String key) {
return cacheMap.containsKey(key);
}/**
Invalidates all cache
*/
public synchronized static void invalidateAll() {
cacheMap.clear();
}/**
- Invalidates a single cache item
@param key
*/
public synchronized static void invalidate(String key) {
cacheMap.remove(key);
}/**
- Adds new item to cache hashmap
- @param key
@return Cache
*/
private synchronized static void putCache(String key, Cache object) {
cacheMap.put(key, object);
}/**
- Reads a cache item's content
- @param key
@return
*/
public static Cache getContent(String key) {
if (hasCache(key)) {
Cache cache = getCache(key);
if (cacheExpired(cache)) {
cache.setExpired(true);
}
return cache;
} else {
return null;
}
}/**
- @param key
- @param content
@param ttl
*/
public static void putContent(String key, Object content, long ttl) {
Cache cache = new Cache();
cache.setKey(key);
cache.setValue(content);
cache.setTimeOut(ttl + new Date().getTime());
cache.setExpired(false);
putCache(key, cache);
}/* @modelguid {172828D6-3AB2-46C4-96E2-E72B34264031} /
private static boolean cacheExpired(Cache cache) {
if (cache == null) {
return false;
}
long milisNow = new Date().getTime();
long milisExpire = cache.getTimeOut();
if (milisExpire < 0) { // Cache never expires
return false;
} else if (milisNow >= milisExpire) {
return true;
} else {
return false;
}
}
}
3、建立需要导出数据对象
package usi.java.oracle;
public class TaskAll {
private String mme_eid;
private String mme_editor;
private String entitytype_eid;
private String project_eid;
private String resource_eid;
public String getMme_eid() {
return mme_eid;
}
public void setMme_eid(String mme_eid) {
this.mme_eid = mme_eid;
}
public String getMme_editor() {
return mme_editor;
}
public void setMme_editor(String mme_editor) {
this.mme_editor = mme_editor;
}
public String getEntitytype_eid() {
return entitytype_eid;
}
public void setEntitytype_eid(String entitytype_eid) {
this.entitytype_eid = entitytype_eid;
}
public String getProject_eid() {
return project_eid;
}
public void setProject_eid(String project_eid) {
this.project_eid = project_eid;
}
public String getResource_eid() {
return resource_eid;
}
public void setResource_eid(String resource_eid) {
this.resource_eid = resource_eid;
}
}
5、执行逻辑主体,回写数据,批量提交
package usi.java.oracle;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
//import java.sql.ResultSet;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
public class redict_to_171ora {
public static void main(String[] args) {
SparkConf sc = new SparkConf().setAppName("redict_to_171ora");
SparkContext jsc = new SparkContext(sc);
HiveContext hc = new HiveContext(jsc);
String hivesql1="select t.mme_eid,t.mme_editor,t.entitytype_eid,t.project_eid,t.resource_eid from usi_odso.c_taskall t limit 150000";
DataFrame redict_to_171ora= hc.sql(hivesql1); //redict_to_171ora.registerTempTable("hivesql1"); List collect=redict_to_171ora.javaRDD().collect(); int o=0; for (Row lists: collect){ TaskAll task=new TaskAll(); task.setMme_eid(lists.getString(0)); task.setMme_editor(lists.getString(1)); task.setEntitytype_eid(lists.getString(2)); task.setProject_eid(lists.getString(3)); task.setResource_eid(lists.getString(4)); CacheManager.putContent(o+"", task, 30000000); o++; /* System.out.println(lists.size()); System.out.println(lists.getString(0)); System.out.println(lists.getString(1)); System.out.println(lists.getString(2)); System.out.println(lists.getString(3)); System.out.println(lists.getString(4));*/ } System.out.println(o); Connection con = null;// 创建一个数据库连接 PreparedStatement pre = null;// 创建预编译语句对象,一般都是用这个而不用Statement //ResultSet result = null;// 创建一个结果集对象 try { Class.forName("oracle.jdbc.driver.OracleDriver");// 加载Oracle驱动程序 System.out.println("开始尝试连接数据库!"); String url = "jdbc:oracle:" + "thin:@ip:1521:sid";// 127.0.0.1是本机地址,XE是精简版Oracle的默认数据库名 String user = "user";// 用户名,系统默认的账户名 String password = "password";// 你安装时选设置的密码 con = DriverManager.getConnection(url, user, password);// 获取连接 System.out.println("连接成功!"); String sql = "insert into c_taskall_test(mme_eid,mme_editor,entitytype_eid,project_eid,resource_eid) values(?,?,?,?,?)";// 预编译语句,"?"代表参数 pre = con.prepareStatement(sql);// 实例化预编译语句 for(int i=0;i
}
数据
参数
数据库
语句
对象
编译
结果
缓存
很大
代码
代表
地址
实例
密码
用户
用户名
索引
存储
抽取
生成
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
河北保定市网络安全知识答题
个人服务器备案信息如何填写
软件开发与编程科普
100个汉字在数据库
iphone11网络安全过低
校园代理服务器登录
网络安全宣传小卫士
王安网络技术服务公司
嵌入式软件开发中树的使用
行为管理服务器多少钱
漫威超级战争服务器地址
会计专业引文数据库
软件开发是读哪个专业的
媒体服务器 win8
数据库驱动程序是什么
mvc ef更新数据库
亲爱的 网络安全
2020年网络安全周以线上
专业网络技术供应
腾讯会议服务器异常是为什么
纯正ip数据库手机版
云服务器可以干什么
网络安全相关法律政策
毕节服务器云存储费用
怎么学习网络安全法
仓库erp软件开发定制公司
给通讯作者要数据库
计算机网络技术专业薪酬怎么样
vtuber数据库
寒战网络技术