oracle队列(AQ)---实现orale到mysql的数据同步
发表于:2024-09-22 作者:千家信息网编辑
千家信息网最后更新 2024年09月22日,高级队列(Advanced Queue,简称AQ):高级队列是oracle的一种高级应用,它主要是表和触发器之间的组合而成的一种应用。其主要作用是在各应用系统中进行消息传递。利用高级队列来实现消息在两
千家信息网最后更新 2024年09月22日oracle队列(AQ)---实现orale到mysql的数据同步高级队列(Advanced Queue,简称AQ):
高级队列是oracle的一种高级应用,它主要是表和触发器之间的组合而成的一种应用。其主要作用是在各应用系统中进行消息传递。
利用高级队列来实现消息在两个不同数据库之间的异步传输,满足业务系统的改造需求。
我们利用触发器+高级序列 然后加ruby读取队列的主键,然后再在对应表中查出数据,insert 进mysql,这是idc机房oracle到idc的mysql的过程; 至于idc机房到阿里云的mysql,处于安全考虑,ruby不能直接连接rds,借助了mq, 先放放到mq,然后从mq读取放进rds. 实现oracle到mysql的同步。需要注意oracle高级序列是可以让多个 Oracle 高级队列具体开发步骤如下: (1)首先确定应用的需求,是否适合使用高级队列?使用高级队列预计提高性能的预期值
(2)赋予数据库账户相应aq权限。
(3)确定队列包体结构,即创建type。
(4)创建队列表及队列。
(5)队列管理
一:我们的队列结构(type):
因为我们oracle中信息表有两个:t_publish_info 和t_publish_zbxx,所以会比会员表t_member_info 的type多一个字段:table_name 用来区分是从那个表读取数据,其中opr也是是标识位,1表示insert, 2表示update ,3表示delete CREATE OR REPLACE TYPE INFOSERVICE."INFO_SYNC_TYPE2" as object ( table_name number(3), opr number(2) , record_id number(20) ); 二:队列表:INFOSERVICE.T_INFO_SYNC_MESSAGE begin sys.dbms_aqadm.create_queue_table( queue_table => 'INFOSERVICE.T_INFO_SYNC_MESSAGE', queue_payload_type => 'INFOSERVICE.INFO_SYNC_TYPE2', sort_list => 'ENQ_TIME', compatible => '10.0.0', primary_instance => 0, secondary_instance => 0, storage_clause => 'tablespace INFOSERVICE pctfree 10 initrans 1 maxtrans 255 storage ( initial 16M next 16M minextents 1 maxextents unlimited )'); end; / 三:创建队列: begin sys.dbms_aqadm.create_queue ( queue_name => 'infoservice.q_info_sync_message', queue_table => 'INFOSERVICE.T_INFO_SYNC_MESSAGE', queue_type => sys.dbms_aqadm.normal_queue, max_retries => 3, retry_delay => 1, retention_time => 0 ); end; 启动队列: begin sys.dbms_aqadm.start_queue (queue_name => 'infoservice.q_info_sync_message',enqueue => true ,dequeue => true ); end;
暂停队列: begin sys.dbms_aqadm.stop_queue ( queue_name => '队列名'); end; 删除队列: begin sys.dbms_aqadm.drop_queue ( queue_name => '队列名'); end; 删除队列表: begin sys.dbms_aqadm.drop_queue_table (queue_table => '队列表名'); end; 四:入队存储过程DBMS_AQ.enqueue: create or replace procedure infoservice.info_sync_enqueue( table_name in number, opr in number,record_id in number) as begin DECLARE queue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; message_id RAW(16); my_message info_sync_type2; BEGIN my_message := info_sync_type2( table_name, opr,record_id ); DBMS_AQ.enqueue(queue_name => 'infoservice.q_info_sync_message', enqueue_options => queue_options, message_properties => message_properties, payload => my_message, msgid => message_id); COMMIT; END; end info_sync_enqueue; 出对存储过程DBMS_AQ.DEQUEUE: create or replace procedure infoservice.info_sync_dequeue(table_name out number,opr out number ,record_id out number) as begin DECLARE queue_options DBMS_AQ.DEQUEUE_OPTIONS_T; message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; message_id RAW(200); my_message info_sync_type2; BEGIN DBMS_AQ.DEQUEUE( queue_name => 'infoservice.q_info_sync_message', dequeue_options => queue_options, message_properties => message_properties, payload => my_message, msgid => message_id ); COMMIT; table_name := my_message.table_name; opr :=my_message. opr; record_id := my_message.record_id ; END; end info_sync_dequeue; 五:我们这里用的触发器实现自动入队,当在T_PUBLISH_INFO表上做增删改的时候,触发入队,通过标识字段 opr的值 来区分: 1表示insert, 2表示update ,3表示delete: create or replace trigger INFOSERVICE.TRG_PUBLISH_2016A_Q before insert or delete or update of table_name,table_name2,cust_id, title,publish_date,OK_STATUS,OK_DATE,UP_DATE,IN_DATE,FILE_NAME on T_PUBLISH_INFO for each row declare queue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; message_id RAW(16); my_message info_sync_type2; opr number(2); table_name number(3); begin opr := 0; table_name := 22;###就把标识位置为22, CASE WHEN inserting THEN if (:NEW.OK_STATUS = 'Y') then opr := 1; my_message := info_sync_type2( table_name, opr,:new.record_id ); end if; WHEN updating THEN if ((:NEW.OK_STATUS != :OLD.OK_STATUS)or (:NEW.OK_STATUS = 'Y' )) then opr := 2; my_message := info_sync_type2( table_name, opr,:new.record_id ); end if; WHEN deleting THEN opr := 3; my_message := info_sync_type2( table_name, opr,:old.record_id ); END CASE; if ( opr != 0) then DBMS_AQ.enqueue(queue_name => 'infoservice.q_info_sync_message', enqueue_options => queue_options, message_properties =>message_properties, payload => my_message, msgid => message_id); end if; end; 这里的table_name:=22,就是说这个触发器入队的时候,就把标识位table_name设置成了22,方便接下来ruby在读取的时候,判断从那个表读取数据, call infoservice.info_sync_enqueue(21,1,39097403); call infoservice.info_sync_enqueue(155,1,39097403); ---这样就读t_publish_info_20142015 执行入队的存储过程,就会把type(INFO_SYNC_TYPE2)对应的三个字段的值存进队列表中,如下:需要注意的是队列表的字段是固定的。 SQL> desc INFOSERVICE.T_INFO_SYNC_MESSAGE; Name Null? Type ----------------------------------------- -------- ---------------------------- Q_NAME VARCHAR2(30) MSGID NOT NULL RAW(16) CORRID VARCHAR2(128) PRIORITY NUMBER STATE NUMBER DELAY TIMESTAMP(6) EXPIRATION NUMBER TIME_MANAGER_INFO TIMESTAMP(6) LOCAL_ORDER_NO NUMBER CHAIN_NO NUMBER CSCN NUMBER DSCN NUMBER ENQ_TIME TIMESTAMP(6) ENQ_UID VARCHAR2(30) ENQ_TID VARCHAR2(30) DEQ_TIME TIMESTAMP(6) DEQ_UID VARCHAR2(30) DEQ_TID VARCHAR2(30) RETRY_COUNT NUMBER EXCEPTION_QSCHEMA VARCHAR2(30) EXCEPTION_QUEUE VARCHAR2(30) STEP_NO NUMBER RECIPIENT_KEY NUMBER DEQUEUE_MSGID RAW(16) SENDER_NAME VARCHAR2(30) SENDER_ADDRESS VARCHAR2(1024) SENDER_PROTOCOL NUMBER USER_DATA INFOSERVICE.INFO_SYNC_TYPE2 USER_PROP ANYDATA 查看现在队列表中的数据,USER_DATA.table_name ,USER_DATA.opr ,USER_DATA.record_id 这三个是之前type中定义的字段。 select * from INFOSERVICE.T_INFO_SYNC_MESSAGE;
总结:我们这个案例中高级队列中实际上保存的最重要的信息是那个主键id,也就是record_id,然后ruby连接上oracle和mysql。通过主键id在oracle对应的表中查出数据(通过table_name 的值来判断是从哪个表找数据),通过标识符opr的值来判断是增删改操作。ruby读取oracle数据到内存中,然后insert 到mysql。实现了可靠的异步同步。
高级队列是oracle的一种高级应用,它主要是表和触发器之间的组合而成的一种应用。其主要作用是在各应用系统中进行消息传递。
利用高级队列来实现消息在两个不同数据库之间的异步传输,满足业务系统的改造需求。
我们利用触发器+高级序列 然后加ruby读取队列的主键,然后再在对应表中查出数据,insert 进mysql,这是idc机房oracle到idc的mysql的过程; 至于idc机房到阿里云的mysql,处于安全考虑,ruby不能直接连接rds,借助了mq, 先放放到mq,然后从mq读取放进rds. 实现oracle到mysql的同步。需要注意oracle高级序列是可以让多个 Oracle 高级队列具体开发步骤如下: (1)首先确定应用的需求,是否适合使用高级队列?使用高级队列预计提高性能的预期值
(2)赋予数据库账户相应aq权限。
(3)确定队列包体结构,即创建type。
(4)创建队列表及队列。
(5)队列管理
一:我们的队列结构(type):
因为我们oracle中信息表有两个:t_publish_info 和t_publish_zbxx,所以会比会员表t_member_info 的type多一个字段:table_name 用来区分是从那个表读取数据,其中opr也是是标识位,1表示insert, 2表示update ,3表示delete CREATE OR REPLACE TYPE INFOSERVICE."INFO_SYNC_TYPE2" as object ( table_name number(3), opr number(2) , record_id number(20) ); 二:队列表:INFOSERVICE.T_INFO_SYNC_MESSAGE begin sys.dbms_aqadm.create_queue_table( queue_table => 'INFOSERVICE.T_INFO_SYNC_MESSAGE', queue_payload_type => 'INFOSERVICE.INFO_SYNC_TYPE2', sort_list => 'ENQ_TIME', compatible => '10.0.0', primary_instance => 0, secondary_instance => 0, storage_clause => 'tablespace INFOSERVICE pctfree 10 initrans 1 maxtrans 255 storage ( initial 16M next 16M minextents 1 maxextents unlimited )'); end; / 三:创建队列: begin sys.dbms_aqadm.create_queue ( queue_name => 'infoservice.q_info_sync_message', queue_table => 'INFOSERVICE.T_INFO_SYNC_MESSAGE', queue_type => sys.dbms_aqadm.normal_queue, max_retries => 3, retry_delay => 1, retention_time => 0 ); end; 启动队列: begin sys.dbms_aqadm.start_queue (queue_name => 'infoservice.q_info_sync_message',enqueue => true ,dequeue => true ); end;
暂停队列: begin sys.dbms_aqadm.stop_queue ( queue_name => '队列名'); end; 删除队列: begin sys.dbms_aqadm.drop_queue ( queue_name => '队列名'); end; 删除队列表: begin sys.dbms_aqadm.drop_queue_table (queue_table => '队列表名'); end; 四:入队存储过程DBMS_AQ.enqueue: create or replace procedure infoservice.info_sync_enqueue( table_name in number, opr in number,record_id in number) as begin DECLARE queue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; message_id RAW(16); my_message info_sync_type2; BEGIN my_message := info_sync_type2( table_name, opr,record_id ); DBMS_AQ.enqueue(queue_name => 'infoservice.q_info_sync_message', enqueue_options => queue_options, message_properties => message_properties, payload => my_message, msgid => message_id); COMMIT; END; end info_sync_enqueue; 出对存储过程DBMS_AQ.DEQUEUE: create or replace procedure infoservice.info_sync_dequeue(table_name out number,opr out number ,record_id out number) as begin DECLARE queue_options DBMS_AQ.DEQUEUE_OPTIONS_T; message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; message_id RAW(200); my_message info_sync_type2; BEGIN DBMS_AQ.DEQUEUE( queue_name => 'infoservice.q_info_sync_message', dequeue_options => queue_options, message_properties => message_properties, payload => my_message, msgid => message_id ); COMMIT; table_name := my_message.table_name; opr :=my_message. opr; record_id := my_message.record_id ; END; end info_sync_dequeue; 五:我们这里用的触发器实现自动入队,当在T_PUBLISH_INFO表上做增删改的时候,触发入队,通过标识字段 opr的值 来区分: 1表示insert, 2表示update ,3表示delete: create or replace trigger INFOSERVICE.TRG_PUBLISH_2016A_Q before insert or delete or update of table_name,table_name2,cust_id, title,publish_date,OK_STATUS,OK_DATE,UP_DATE,IN_DATE,FILE_NAME on T_PUBLISH_INFO for each row declare queue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; message_id RAW(16); my_message info_sync_type2; opr number(2); table_name number(3); begin opr := 0; table_name := 22;###就把标识位置为22, CASE WHEN inserting THEN if (:NEW.OK_STATUS = 'Y') then opr := 1; my_message := info_sync_type2( table_name, opr,:new.record_id ); end if; WHEN updating THEN if ((:NEW.OK_STATUS != :OLD.OK_STATUS)or (:NEW.OK_STATUS = 'Y' )) then opr := 2; my_message := info_sync_type2( table_name, opr,:new.record_id ); end if; WHEN deleting THEN opr := 3; my_message := info_sync_type2( table_name, opr,:old.record_id ); END CASE; if ( opr != 0) then DBMS_AQ.enqueue(queue_name => 'infoservice.q_info_sync_message', enqueue_options => queue_options, message_properties =>message_properties, payload => my_message, msgid => message_id); end if; end; 这里的table_name:=22,就是说这个触发器入队的时候,就把标识位table_name设置成了22,方便接下来ruby在读取的时候,判断从那个表读取数据, call infoservice.info_sync_enqueue(21,1,39097403); call infoservice.info_sync_enqueue(155,1,39097403); ---这样就读t_publish_info_20142015 执行入队的存储过程,就会把type(INFO_SYNC_TYPE2)对应的三个字段的值存进队列表中,如下:需要注意的是队列表的字段是固定的。 SQL> desc INFOSERVICE.T_INFO_SYNC_MESSAGE; Name Null? Type ----------------------------------------- -------- ---------------------------- Q_NAME VARCHAR2(30) MSGID NOT NULL RAW(16) CORRID VARCHAR2(128) PRIORITY NUMBER STATE NUMBER DELAY TIMESTAMP(6) EXPIRATION NUMBER TIME_MANAGER_INFO TIMESTAMP(6) LOCAL_ORDER_NO NUMBER CHAIN_NO NUMBER CSCN NUMBER DSCN NUMBER ENQ_TIME TIMESTAMP(6) ENQ_UID VARCHAR2(30) ENQ_TID VARCHAR2(30) DEQ_TIME TIMESTAMP(6) DEQ_UID VARCHAR2(30) DEQ_TID VARCHAR2(30) RETRY_COUNT NUMBER EXCEPTION_QSCHEMA VARCHAR2(30) EXCEPTION_QUEUE VARCHAR2(30) STEP_NO NUMBER RECIPIENT_KEY NUMBER DEQUEUE_MSGID RAW(16) SENDER_NAME VARCHAR2(30) SENDER_ADDRESS VARCHAR2(1024) SENDER_PROTOCOL NUMBER USER_DATA INFOSERVICE.INFO_SYNC_TYPE2 USER_PROP ANYDATA 查看现在队列表中的数据,USER_DATA.table_name ,USER_DATA.opr ,USER_DATA.record_id 这三个是之前type中定义的字段。 select * from INFOSERVICE.T_INFO_SYNC_MESSAGE;
总结:我们这个案例中高级队列中实际上保存的最重要的信息是那个主键id,也就是record_id,然后ruby连接上oracle和mysql。通过主键id在oracle对应的表中查出数据(通过table_name 的值来判断是从哪个表找数据),通过标识符opr的值来判断是增删改操作。ruby读取oracle数据到内存中,然后insert 到mysql。实现了可靠的异步同步。
队列
数据
高级
字段
标识
触发器
过程
应用
时候
存储
同步
三个
两个
之间
信息
序列
数据库
机房
消息
系统
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络技术 国家三级 含金量
亚马逊数据库的建立
兰州app软件开发费用
计算器软件开发需求说明书
上海运维管理软件开发服务
酒店管理服务器配置
武汉商途在线软件开发公司
江西app软件开发一般要多少钱
创临 图数据库
武汉戴尔服务器高质量的选择
我的世界服务器活跃值指令
创业网络安全插画
蚁剑连接数据库
柬埔寨的网络技术
测试数据库被删了
南通朗涯互联网科技有限公司
网络里的数据库是哪种
网络安全督查的总结
做发卡网站需要什么服务器
计算机网络安全的情话
腾讯邮件接收服务器
河北省保密局网络技术处
静安区java软件开发在线咨询
网络安全手抄报幼儿图片
网络安全与网络暴力的看法
湖南会计软件开发中心
创业网络安全插画
非阻塞式服务器
网络安全检查工作胸牌
校园网络安全怎么设计