不使用Sqoop流程,利用CacheManager直接完成SparkSQL数据流直接回写Oracle
以前都是使用Sqoop来完成数据从生成的hdfs数据存储上来抽取至oracle的数据库:sqoop抽取语句:
sqoop export --connect "jdbc:oracle:thin:@ip:port:sid" --username 用户名 --password 密码 --table sid.表名 --export-dir hdfs://nameservice1/user/XXX(hdfs地址) --fields-terminated-by "\001" --null-non-string '' --null-string '' -m 10;
由于项目需求我们现在要完成在代码中省城所需字段之后,直接回写到oracle中,因为数据量每天都很大,用实例或者List存有很大的局限性,可能会出现内存异常等不可预料的东西,所以我通过缓存器机制来存储数据,然后进行生成结果的临时表直接回写(后面做的hbase接口封装批量提交也比较类似) 废话不多说直接上代码: 1、建立缓存实体 package usi.java.oracle;
/**
- @author HK
@date 2011-2-15 下午06:45:57
*/
public class Cache {
private String key;
private Object value;
private long timeOut;
private boolean expired;
public Cache() {
super();
}public Cache(String key, String value, long timeOut, boolean expired) {
this.key = key;
this.value = value;
this.timeOut = timeOut;
this.expired = expired;
}public String getKey() {
return key;
}public long getTimeOut() {
return timeOut;
}public Object getValue() {
return value;
}public void setKey(String string) {
key = string;
}public void setTimeOut(long l) {
timeOut = l;
}public void setValue(Object object) {
value = object;
}public boolean isExpired() {
return expired;
}public void setExpired(boolean b) {
expired = b;
}
}
2、建立缓存控制器
package usi.java.oracle;
import java.util.Date;
import java.util.HashMap;
/**
- @author HK
@date 2011-2-15 下午09:40:00
*/
public class CacheManager {private static HashMap cacheMap = new HashMap();
/**
This class is singleton so private constructor is used.
*/
private CacheManager() {
super();
}/**
- returns cache item from hashmap
- @param key
@return Cache
*/
private synchronized static Cache getCache(String key) {
return (Cache)cacheMap.get(key);
}/**
- Looks at the hashmap if a cache item exists or not
- @param key
@return Cache
*/
private synchronized static boolean hasCache(String key) {
return cacheMap.containsKey(key);
}/**
Invalidates all cache
*/
public synchronized static void invalidateAll() {
cacheMap.clear();
}/**
- Invalidates a single cache item
@param key
*/
public synchronized static void invalidate(String key) {
cacheMap.remove(key);
}/**
- Adds new item to cache hashmap
- @param key
@return Cache
*/
private synchronized static void putCache(String key, Cache object) {
cacheMap.put(key, object);
}/**
- Reads a cache item's content
- @param key
@return
*/
public static Cache getContent(String key) {
if (hasCache(key)) {
Cache cache = getCache(key);
if (cacheExpired(cache)) {
cache.setExpired(true);
}
return cache;
} else {
return null;
}
}/**
- @param key
- @param content
@param ttl
*/
public static void putContent(String key, Object content, long ttl) {
Cache cache = new Cache();
cache.setKey(key);
cache.setValue(content);
cache.setTimeOut(ttl + new Date().getTime());
cache.setExpired(false);
putCache(key, cache);
}/* @modelguid {172828D6-3AB2-46C4-96E2-E72B34264031} /
private static boolean cacheExpired(Cache cache) {
if (cache == null) {
return false;
}
long milisNow = new Date().getTime();
long milisExpire = cache.getTimeOut();
if (milisExpire < 0) { // Cache never expires
return false;
} else if (milisNow >= milisExpire) {
return true;
} else {
return false;
}
}
}
3、建立需要导出数据对象
package usi.java.oracle;
public class TaskAll {
private String mme_eid;
private String mme_editor;
private String entitytype_eid;
private String project_eid;
private String resource_eid;
public String getMme_eid() {
return mme_eid;
}
public void setMme_eid(String mme_eid) {
this.mme_eid = mme_eid;
}
public String getMme_editor() {
return mme_editor;
}
public void setMme_editor(String mme_editor) {
this.mme_editor = mme_editor;
}
public String getEntitytype_eid() {
return entitytype_eid;
}
public void setEntitytype_eid(String entitytype_eid) {
this.entitytype_eid = entitytype_eid;
}
public String getProject_eid() {
return project_eid;
}
public void setProject_eid(String project_eid) {
this.project_eid = project_eid;
}
public String getResource_eid() {
return resource_eid;
}
public void setResource_eid(String resource_eid) {
this.resource_eid = resource_eid;
}
}
5、执行逻辑主体,回写数据,批量提交
package usi.java.oracle;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
//import java.sql.ResultSet;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
public class redict_to_171ora {
public static void main(String[] args) {
SparkConf sc = new SparkConf().setAppName("redict_to_171ora");
SparkContext jsc = new SparkContext(sc);
HiveContext hc = new HiveContext(jsc);
String hivesql1="select t.mme_eid,t.mme_editor,t.entitytype_eid,t.project_eid,t.resource_eid from usi_odso.c_taskall t limit 150000";
DataFrame redict_to_171ora= hc.sql(hivesql1); //redict_to_171ora.registerTempTable("hivesql1"); List collect=redict_to_171ora.javaRDD().collect(); int o=0; for (Row lists: collect){ TaskAll task=new TaskAll(); task.setMme_eid(lists.getString(0)); task.setMme_editor(lists.getString(1)); task.setEntitytype_eid(lists.getString(2)); task.setProject_eid(lists.getString(3)); task.setResource_eid(lists.getString(4)); CacheManager.putContent(o+"", task, 30000000); o++; /* System.out.println(lists.size()); System.out.println(lists.getString(0)); System.out.println(lists.getString(1)); System.out.println(lists.getString(2)); System.out.println(lists.getString(3)); System.out.println(lists.getString(4));*/ } System.out.println(o); Connection con = null;// 创建一个数据库连接 PreparedStatement pre = null;// 创建预编译语句对象,一般都是用这个而不用Statement //ResultSet result = null;// 创建一个结果集对象 try { Class.forName("oracle.jdbc.driver.OracleDriver");// 加载Oracle驱动程序 System.out.println("开始尝试连接数据库!"); String url = "jdbc:oracle:" + "thin:@ip:1521:sid";// 127.0.0.1是本机地址,XE是精简版Oracle的默认数据库名 String user = "user";// 用户名,系统默认的账户名 String password = "password";// 你安装时选设置的密码 con = DriverManager.getConnection(url, user, password);// 获取连接 System.out.println("连接成功!"); String sql = "insert into c_taskall_test(mme_eid,mme_editor,entitytype_eid,project_eid,resource_eid) values(?,?,?,?,?)";// 预编译语句,"?"代表参数 pre = con.prepareStatement(sql);// 实例化预编译语句 for(int i=0;i
}
数据
参数
数据库
语句
对象
编译
结果
缓存
很大
代码
代表
地址
实例
密码
用户
用户名
索引
存储
抽取
生成
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
web数据库建立用什么技术
软件开发项目税票
数据库怎么判断bug
郑州定制软件开发服务商
兰州网络安全院士
北京飞鲨软件开发
宝山区数据软件开发代理品牌
计算机网络技术不会怎么办
世卫组织网络安全会议
5万人同时访问服务器
加载数据库驱动时抛出异常
电视剧软件开发
第二届江西省网络安全大赛
思科服务器配置ap
国开计算机网络技术课程
诸城市各大软件开发程序员薪资
武装突袭3著名服务器
水务行业网络安全整体解决方案
西安安卓软件开发
金达网络技术有限公司
视频软件开发自学
自建dns服务器访问公网
网络安全从我做起图片
浙江龙芯服务器
网络安全法由 于2016年
华为网络安全和隐私中心组织
易界在线网络技术
服务器内存条插入顺序
餐馆数据库
大兴网络技术