oracle队列(AQ)---实现orale到mysql的数据同步
发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,高级队列(Advanced Queue,简称AQ):高级队列是oracle的一种高级应用,它主要是表和触发器之间的组合而成的一种应用。其主要作用是在各应用系统中进行消息传递。利用高级队列来实现消息在两
千家信息网最后更新 2025年01月23日oracle队列(AQ)---实现orale到mysql的数据同步高级队列(Advanced Queue,简称AQ):
高级队列是oracle的一种高级应用,它主要是表和触发器之间的组合而成的一种应用。其主要作用是在各应用系统中进行消息传递。
利用高级队列来实现消息在两个不同数据库之间的异步传输,满足业务系统的改造需求。
我们利用触发器+高级序列 然后加ruby读取队列的主键,然后再在对应表中查出数据,insert 进mysql,这是idc机房oracle到idc的mysql的过程; 至于idc机房到阿里云的mysql,处于安全考虑,ruby不能直接连接rds,借助了mq, 先放放到mq,然后从mq读取放进rds. 实现oracle到mysql的同步。需要注意oracle高级序列是可以让多个 Oracle 高级队列具体开发步骤如下: (1)首先确定应用的需求,是否适合使用高级队列?使用高级队列预计提高性能的预期值
(2)赋予数据库账户相应aq权限。
(3)确定队列包体结构,即创建type。
(4)创建队列表及队列。
(5)队列管理
一:我们的队列结构(type):
因为我们oracle中信息表有两个:t_publish_info 和t_publish_zbxx,所以会比会员表t_member_info 的type多一个字段:table_name 用来区分是从那个表读取数据,其中opr也是是标识位,1表示insert, 2表示update ,3表示delete CREATE OR REPLACE TYPE INFOSERVICE."INFO_SYNC_TYPE2" as object ( table_name number(3), opr number(2) , record_id number(20) ); 二:队列表:INFOSERVICE.T_INFO_SYNC_MESSAGE begin sys.dbms_aqadm.create_queue_table( queue_table => 'INFOSERVICE.T_INFO_SYNC_MESSAGE', queue_payload_type => 'INFOSERVICE.INFO_SYNC_TYPE2', sort_list => 'ENQ_TIME', compatible => '10.0.0', primary_instance => 0, secondary_instance => 0, storage_clause => 'tablespace INFOSERVICE pctfree 10 initrans 1 maxtrans 255 storage ( initial 16M next 16M minextents 1 maxextents unlimited )'); end; / 三:创建队列: begin sys.dbms_aqadm.create_queue ( queue_name => 'infoservice.q_info_sync_message', queue_table => 'INFOSERVICE.T_INFO_SYNC_MESSAGE', queue_type => sys.dbms_aqadm.normal_queue, max_retries => 3, retry_delay => 1, retention_time => 0 ); end; 启动队列: begin sys.dbms_aqadm.start_queue (queue_name => 'infoservice.q_info_sync_message',enqueue => true ,dequeue => true ); end;
暂停队列: begin sys.dbms_aqadm.stop_queue ( queue_name => '队列名'); end; 删除队列: begin sys.dbms_aqadm.drop_queue ( queue_name => '队列名'); end; 删除队列表: begin sys.dbms_aqadm.drop_queue_table (queue_table => '队列表名'); end; 四:入队存储过程DBMS_AQ.enqueue: create or replace procedure infoservice.info_sync_enqueue( table_name in number, opr in number,record_id in number) as begin DECLARE queue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; message_id RAW(16); my_message info_sync_type2; BEGIN my_message := info_sync_type2( table_name, opr,record_id ); DBMS_AQ.enqueue(queue_name => 'infoservice.q_info_sync_message', enqueue_options => queue_options, message_properties => message_properties, payload => my_message, msgid => message_id); COMMIT; END; end info_sync_enqueue; 出对存储过程DBMS_AQ.DEQUEUE: create or replace procedure infoservice.info_sync_dequeue(table_name out number,opr out number ,record_id out number) as begin DECLARE queue_options DBMS_AQ.DEQUEUE_OPTIONS_T; message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; message_id RAW(200); my_message info_sync_type2; BEGIN DBMS_AQ.DEQUEUE( queue_name => 'infoservice.q_info_sync_message', dequeue_options => queue_options, message_properties => message_properties, payload => my_message, msgid => message_id ); COMMIT; table_name := my_message.table_name; opr :=my_message. opr; record_id := my_message.record_id ; END; end info_sync_dequeue; 五:我们这里用的触发器实现自动入队,当在T_PUBLISH_INFO表上做增删改的时候,触发入队,通过标识字段 opr的值 来区分: 1表示insert, 2表示update ,3表示delete: create or replace trigger INFOSERVICE.TRG_PUBLISH_2016A_Q before insert or delete or update of table_name,table_name2,cust_id, title,publish_date,OK_STATUS,OK_DATE,UP_DATE,IN_DATE,FILE_NAME on T_PUBLISH_INFO for each row declare queue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; message_id RAW(16); my_message info_sync_type2; opr number(2); table_name number(3); begin opr := 0; table_name := 22;###就把标识位置为22, CASE WHEN inserting THEN if (:NEW.OK_STATUS = 'Y') then opr := 1; my_message := info_sync_type2( table_name, opr,:new.record_id ); end if; WHEN updating THEN if ((:NEW.OK_STATUS != :OLD.OK_STATUS)or (:NEW.OK_STATUS = 'Y' )) then opr := 2; my_message := info_sync_type2( table_name, opr,:new.record_id ); end if; WHEN deleting THEN opr := 3; my_message := info_sync_type2( table_name, opr,:old.record_id ); END CASE; if ( opr != 0) then DBMS_AQ.enqueue(queue_name => 'infoservice.q_info_sync_message', enqueue_options => queue_options, message_properties =>message_properties, payload => my_message, msgid => message_id); end if; end; 这里的table_name:=22,就是说这个触发器入队的时候,就把标识位table_name设置成了22,方便接下来ruby在读取的时候,判断从那个表读取数据, call infoservice.info_sync_enqueue(21,1,39097403); call infoservice.info_sync_enqueue(155,1,39097403); ---这样就读t_publish_info_20142015 执行入队的存储过程,就会把type(INFO_SYNC_TYPE2)对应的三个字段的值存进队列表中,如下:需要注意的是队列表的字段是固定的。 SQL> desc INFOSERVICE.T_INFO_SYNC_MESSAGE; Name Null? Type ----------------------------------------- -------- ---------------------------- Q_NAME VARCHAR2(30) MSGID NOT NULL RAW(16) CORRID VARCHAR2(128) PRIORITY NUMBER STATE NUMBER DELAY TIMESTAMP(6) EXPIRATION NUMBER TIME_MANAGER_INFO TIMESTAMP(6) LOCAL_ORDER_NO NUMBER CHAIN_NO NUMBER CSCN NUMBER DSCN NUMBER ENQ_TIME TIMESTAMP(6) ENQ_UID VARCHAR2(30) ENQ_TID VARCHAR2(30) DEQ_TIME TIMESTAMP(6) DEQ_UID VARCHAR2(30) DEQ_TID VARCHAR2(30) RETRY_COUNT NUMBER EXCEPTION_QSCHEMA VARCHAR2(30) EXCEPTION_QUEUE VARCHAR2(30) STEP_NO NUMBER RECIPIENT_KEY NUMBER DEQUEUE_MSGID RAW(16) SENDER_NAME VARCHAR2(30) SENDER_ADDRESS VARCHAR2(1024) SENDER_PROTOCOL NUMBER USER_DATA INFOSERVICE.INFO_SYNC_TYPE2 USER_PROP ANYDATA 查看现在队列表中的数据,USER_DATA.table_name ,USER_DATA.opr ,USER_DATA.record_id 这三个是之前type中定义的字段。 select * from INFOSERVICE.T_INFO_SYNC_MESSAGE;
总结:我们这个案例中高级队列中实际上保存的最重要的信息是那个主键id,也就是record_id,然后ruby连接上oracle和mysql。通过主键id在oracle对应的表中查出数据(通过table_name 的值来判断是从哪个表找数据),通过标识符opr的值来判断是增删改操作。ruby读取oracle数据到内存中,然后insert 到mysql。实现了可靠的异步同步。
高级队列是oracle的一种高级应用,它主要是表和触发器之间的组合而成的一种应用。其主要作用是在各应用系统中进行消息传递。
利用高级队列来实现消息在两个不同数据库之间的异步传输,满足业务系统的改造需求。
我们利用触发器+高级序列 然后加ruby读取队列的主键,然后再在对应表中查出数据,insert 进mysql,这是idc机房oracle到idc的mysql的过程; 至于idc机房到阿里云的mysql,处于安全考虑,ruby不能直接连接rds,借助了mq, 先放放到mq,然后从mq读取放进rds. 实现oracle到mysql的同步。需要注意oracle高级序列是可以让多个 Oracle 高级队列具体开发步骤如下: (1)首先确定应用的需求,是否适合使用高级队列?使用高级队列预计提高性能的预期值
(2)赋予数据库账户相应aq权限。
(3)确定队列包体结构,即创建type。
(4)创建队列表及队列。
(5)队列管理
一:我们的队列结构(type):
因为我们oracle中信息表有两个:t_publish_info 和t_publish_zbxx,所以会比会员表t_member_info 的type多一个字段:table_name 用来区分是从那个表读取数据,其中opr也是是标识位,1表示insert, 2表示update ,3表示delete CREATE OR REPLACE TYPE INFOSERVICE."INFO_SYNC_TYPE2" as object ( table_name number(3), opr number(2) , record_id number(20) ); 二:队列表:INFOSERVICE.T_INFO_SYNC_MESSAGE begin sys.dbms_aqadm.create_queue_table( queue_table => 'INFOSERVICE.T_INFO_SYNC_MESSAGE', queue_payload_type => 'INFOSERVICE.INFO_SYNC_TYPE2', sort_list => 'ENQ_TIME', compatible => '10.0.0', primary_instance => 0, secondary_instance => 0, storage_clause => 'tablespace INFOSERVICE pctfree 10 initrans 1 maxtrans 255 storage ( initial 16M next 16M minextents 1 maxextents unlimited )'); end; / 三:创建队列: begin sys.dbms_aqadm.create_queue ( queue_name => 'infoservice.q_info_sync_message', queue_table => 'INFOSERVICE.T_INFO_SYNC_MESSAGE', queue_type => sys.dbms_aqadm.normal_queue, max_retries => 3, retry_delay => 1, retention_time => 0 ); end; 启动队列: begin sys.dbms_aqadm.start_queue (queue_name => 'infoservice.q_info_sync_message',enqueue => true ,dequeue => true ); end;
暂停队列: begin sys.dbms_aqadm.stop_queue ( queue_name => '队列名'); end; 删除队列: begin sys.dbms_aqadm.drop_queue ( queue_name => '队列名'); end; 删除队列表: begin sys.dbms_aqadm.drop_queue_table (queue_table => '队列表名'); end; 四:入队存储过程DBMS_AQ.enqueue: create or replace procedure infoservice.info_sync_enqueue( table_name in number, opr in number,record_id in number) as begin DECLARE queue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; message_id RAW(16); my_message info_sync_type2; BEGIN my_message := info_sync_type2( table_name, opr,record_id ); DBMS_AQ.enqueue(queue_name => 'infoservice.q_info_sync_message', enqueue_options => queue_options, message_properties => message_properties, payload => my_message, msgid => message_id); COMMIT; END; end info_sync_enqueue; 出对存储过程DBMS_AQ.DEQUEUE: create or replace procedure infoservice.info_sync_dequeue(table_name out number,opr out number ,record_id out number) as begin DECLARE queue_options DBMS_AQ.DEQUEUE_OPTIONS_T; message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; message_id RAW(200); my_message info_sync_type2; BEGIN DBMS_AQ.DEQUEUE( queue_name => 'infoservice.q_info_sync_message', dequeue_options => queue_options, message_properties => message_properties, payload => my_message, msgid => message_id ); COMMIT; table_name := my_message.table_name; opr :=my_message. opr; record_id := my_message.record_id ; END; end info_sync_dequeue; 五:我们这里用的触发器实现自动入队,当在T_PUBLISH_INFO表上做增删改的时候,触发入队,通过标识字段 opr的值 来区分: 1表示insert, 2表示update ,3表示delete: create or replace trigger INFOSERVICE.TRG_PUBLISH_2016A_Q before insert or delete or update of table_name,table_name2,cust_id, title,publish_date,OK_STATUS,OK_DATE,UP_DATE,IN_DATE,FILE_NAME on T_PUBLISH_INFO for each row declare queue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; message_id RAW(16); my_message info_sync_type2; opr number(2); table_name number(3); begin opr := 0; table_name := 22;###就把标识位置为22, CASE WHEN inserting THEN if (:NEW.OK_STATUS = 'Y') then opr := 1; my_message := info_sync_type2( table_name, opr,:new.record_id ); end if; WHEN updating THEN if ((:NEW.OK_STATUS != :OLD.OK_STATUS)or (:NEW.OK_STATUS = 'Y' )) then opr := 2; my_message := info_sync_type2( table_name, opr,:new.record_id ); end if; WHEN deleting THEN opr := 3; my_message := info_sync_type2( table_name, opr,:old.record_id ); END CASE; if ( opr != 0) then DBMS_AQ.enqueue(queue_name => 'infoservice.q_info_sync_message', enqueue_options => queue_options, message_properties =>message_properties, payload => my_message, msgid => message_id); end if; end; 这里的table_name:=22,就是说这个触发器入队的时候,就把标识位table_name设置成了22,方便接下来ruby在读取的时候,判断从那个表读取数据, call infoservice.info_sync_enqueue(21,1,39097403); call infoservice.info_sync_enqueue(155,1,39097403); ---这样就读t_publish_info_20142015 执行入队的存储过程,就会把type(INFO_SYNC_TYPE2)对应的三个字段的值存进队列表中,如下:需要注意的是队列表的字段是固定的。 SQL> desc INFOSERVICE.T_INFO_SYNC_MESSAGE; Name Null? Type ----------------------------------------- -------- ---------------------------- Q_NAME VARCHAR2(30) MSGID NOT NULL RAW(16) CORRID VARCHAR2(128) PRIORITY NUMBER STATE NUMBER DELAY TIMESTAMP(6) EXPIRATION NUMBER TIME_MANAGER_INFO TIMESTAMP(6) LOCAL_ORDER_NO NUMBER CHAIN_NO NUMBER CSCN NUMBER DSCN NUMBER ENQ_TIME TIMESTAMP(6) ENQ_UID VARCHAR2(30) ENQ_TID VARCHAR2(30) DEQ_TIME TIMESTAMP(6) DEQ_UID VARCHAR2(30) DEQ_TID VARCHAR2(30) RETRY_COUNT NUMBER EXCEPTION_QSCHEMA VARCHAR2(30) EXCEPTION_QUEUE VARCHAR2(30) STEP_NO NUMBER RECIPIENT_KEY NUMBER DEQUEUE_MSGID RAW(16) SENDER_NAME VARCHAR2(30) SENDER_ADDRESS VARCHAR2(1024) SENDER_PROTOCOL NUMBER USER_DATA INFOSERVICE.INFO_SYNC_TYPE2 USER_PROP ANYDATA 查看现在队列表中的数据,USER_DATA.table_name ,USER_DATA.opr ,USER_DATA.record_id 这三个是之前type中定义的字段。 select * from INFOSERVICE.T_INFO_SYNC_MESSAGE;
总结:我们这个案例中高级队列中实际上保存的最重要的信息是那个主键id,也就是record_id,然后ruby连接上oracle和mysql。通过主键id在oracle对应的表中查出数据(通过table_name 的值来判断是从哪个表找数据),通过标识符opr的值来判断是增删改操作。ruby读取oracle数据到内存中,然后insert 到mysql。实现了可靠的异步同步。
队列
数据
高级
字段
标识
触发器
过程
应用
时候
存储
同步
三个
两个
之间
信息
序列
数据库
机房
消息
系统
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
3ds服务器正在维护
网络技术员和两委哪个好
C操作系统是数据库系统的子系统
jmatpro 数据库
端游用什么软件开发
如何删除数据库中的某一个数据库
广西泛欧互联网科技有限公司
服务器 温度
网络安全宣传周时间2019
拓程互联网科技怎么样
山东他虎哥网络技术有限公司
软件开发团队成员分工图
软件开发等于软件技术吗
荒野乱斗怎么转服务器
腾讯云 招聘 服务器
数据库查询怎么查
网络安全法手机号加密输出
网络安全最新宣传资料
河南铆钉枪软件开发
网络技术培训学院北京
中资源数据库安全吗
福州辰方网络技术有限公司
服务器输出端
科技创新互联网络
虚谷数据库授权
数据库数据加密的意义
最好服务器可以多少人同时使用
跨境电商网络安全的障碍
众心网络技术
徐州老师发卡通里面的网络安全手抄报怎样画