千家信息网

How to Use OGG to Send Oracle Data Through Flume to Kafka

Published: 2025-02-04 Author: 千家信息网 editor

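This article explains how to use Oracle GoldenGate (OGG) to send Oracle data to Flume, which then flushes it to Kafka. The steps below cover the server environment, the Flume configuration, and the OGG processes on the source and target sides.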

Source-side test server:

Server environment setup:

1. Create the groups and the oracle user:

[root@test ~]# groupadd oinstall

[root@test ~]# groupadd dba

[root@test ~]# useradd -g oinstall -G dba oracle

[root@test ~]#

Change ownership of the /data directory:

[root@test ~]# chown -R oracle:oinstall /data

[root@test ~]#

2. Set the global Java environment variables

[root@test ~]# cat /etc/redhat-release

CentOS release 6.4 (Final)

[root@test ~]#

[oracle@test data]$ tar -zxvf jdk-8u60-linux-x64.tar.gz

As root, set the Java environment variables in /etc/profile:

vi /etc/profile

###jdk

export JAVA_HOME=/data/jdk1.8.0_60

export JAVA_BIN=/data/jdk1.8.0_60/bin

export PATH=$PATH:$JAVA_HOME/bin

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export JAVA_HOME JAVA_BIN PATH CLASSPATH

export LD_LIBRARY_PATH=/data/jdk1.8.0_60/jre/lib/amd64/server:$LD_LIBRARY_PATH

Switch to the oracle user and verify:

[root@test ~]# su - oracle

[oracle@test ~]$ java -version

java version "1.8.0_60"

Java(TM) SE Runtime Environment (build 1.8.0_60-b27)

Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)

[oracle@test ~]$

If the new JDK does not take effect, register it with alternatives and select it:

alternatives --install /usr/bin/java java /data/jdk1.8.0_60/bin/java 100

alternatives --install /usr/bin/jar jar /data/jdk1.8.0_60/bin/jar 100

alternatives --install /usr/bin/javac javac /data/jdk1.8.0_60/bin/javac 100

update-alternatives --install /usr/bin/javac javac /data/jdk1.8.0_60/bin/javac 100

# /usr/sbin/alternatives --config java

[root@test1 data]# /usr/sbin/alternatives --config java

There are 4 programs which provide 'java'.

Selection Command

-----------------------------------------------

1 /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java

*+ 2 /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java

3 /usr/lib/jvm/jre-1.5.0-gcj/bin/java

4 /data/jdk1.8.0_60/bin/java

Enter to keep the current selection[+], or type selection number: 4

[root@test1 data]# /usr/sbin/alternatives --config java

There are 4 programs which provide 'java'.

Selection Command

-----------------------------------------------

1 /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java

* 2 /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java

3 /usr/lib/jvm/jre-1.5.0-gcj/bin/java

+ 4 /data/jdk1.8.0_60/bin/java

Enter to keep the current selection[+], or type selection number:

[root@test1 data]#

[root@test1 data]# java -version

java version "1.8.0_60"

Java(TM) SE Runtime Environment (build 1.8.0_60-b27)

Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)

[root@test1 data]#
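
`java -version` confirms the runtime, but the OGG user exit (`libggjava_ue.so`, configured further down) also has to load the JVM as a shared library, which is why `LD_LIBRARY_PATH` includes `jre/lib/amd64/server`. The following is a minimal sketch, assuming a Linux host, that looks for `libjvm.so` in the directories of an `LD_LIBRARY_PATH`-style string. The function name `find_libjvm` is hypothetical and not part of OGG or the JDK.

```python
import os


def find_libjvm(ld_library_path):
    """Return the first directory in a colon-separated path list that holds libjvm.so."""
    for directory in ld_library_path.split(":"):
        # Empty entries appear when the variable was unset before being extended.
        if directory and os.path.isfile(os.path.join(directory, "libjvm.so")):
            return directory
    return None


if __name__ == "__main__":
    found = find_libjvm(os.environ.get("LD_LIBRARY_PATH", ""))
    if found is None:
        print("libjvm.so not found on LD_LIBRARY_PATH")
    else:
        print("libjvm.so found in", found)
```

Run it as the oracle user, since that is the account whose environment the OGG processes inherit.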

Edit the Flume configuration:

[oracle@test1 conf]$ cat flume-conf.properties

# Licensed to the Apache Software Foundation (ASF) under one

# or more contributor license agreements. See the NOTICE file

# distributed with this work for additional information

# regarding copyright ownership. The ASF licenses this file

# to you under the Apache License, Version 2.0 (the

# "License"); you may not use this file except in compliance

# with the License. You may obtain a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing,

# software distributed under the License is distributed on an

# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

# KIND, either express or implied. See the License for the

# specific language governing permissions and limitations

# under the License.

# The configuration file needs to define the sources,

# the channels and the sinks.

# Sources, channels and sinks are defined per agent,

# in this case called 'agent'

agent.sources = r1

agent.channels = fileChannel

agent.sinks = kafkaSink

# For each one of the sources, the type is defined

agent.sources.seqGenSrc.type = seq

# The channel can be defined as follows.

agent.sources.seqGenSrc.channels = fileChannel

#

agent.sources.r1.type = avro

agent.sources.r1.port = 14141

agent.sources.r1.bind = 192.168.88.66

agent.sources.r1.channels = fileChannel

# Each sink's type must be defined

agent.sinks.loggerSink.type = logger

#Specify the channel the sink should use

agent.sinks.loggerSink.channel = memoryChannel

#kafka sink

agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink

agent.sinks.kafkaSink.topic = my_schema

agent.sinks.kafkaSink.brokerList = 192.168.88.1:9092,192.168.88.2:9092,192.168.88.3:9092,192.168.88.4:9092

agent.sinks.kafkaSink.requiredAcks = 1

agent.sinks.kafkaSink.batchSize = 20

agent.sinks.kafkaSink.channel = fileChannel

# Each channel's type is defined.

agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)

# can be defined as well

# In this case, it specifies the capacity of the memory channel

agent.channels.memoryChannel.capacity = 100

#File Channel

agent.channels.fileChannel.type = file

agent.channels.fileChannel.transactionCapacity = 20000000

agent.channels.fileChannel.capacity = 50000000

agent.channels.fileChannel.maxFileSize = 2147483648

agent.channels.fileChannel.minimumRequiredSpace = 52428800

agent.channels.fileChannel.keep-alive = 3

agent.channels.fileChannel.checkpointInterval = 20000

agent.channels.fileChannel.checkpointDir = /data/apache-flume-1.6.0-bin/CheckpointDir

agent.channels.fileChannel.dataDirs = /data/apache-flume-1.6.0-bin/DataDir

[oracle@test1 conf]$
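
In the listing above, only `r1`, `fileChannel` and `kafkaSink` appear in the active lists (`agent.sources`, `agent.channels`, `agent.sinks`). The `seqGenSrc`, `loggerSink` and `memoryChannel` entries look like leftovers from the template file and are not expected to be started. The sketch below is one way to spot such entries. It is a simplified properties parser written for illustration, not Flume's own validation, and the function names are hypothetical.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def unlisted_components(props, agent="agent"):
    """Return, per component kind, names that have settings but are not in the active list."""
    result = {}
    for kind in ("sources", "channels", "sinks"):
        active = set(props.get(f"{agent}.{kind}", "").split())
        prefix = f"{agent}.{kind}."
        configured = {key[len(prefix):].split(".", 1)[0] for key in props if key.startswith(prefix)}
        result[kind] = sorted(configured - active)
    return result


# Excerpt of the configuration shown in this article.
SAMPLE = """
agent.sources = r1
agent.channels = fileChannel
agent.sinks = kafkaSink
agent.sources.seqGenSrc.type = seq
agent.sources.r1.type = avro
agent.sinks.loggerSink.type = logger
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.channels.memoryChannel.type = memory
agent.channels.fileChannel.type = file
"""

if __name__ == "__main__":
    print(unlisted_components(parse_properties(SAMPLE)))
```

For the excerpt, the check reports `seqGenSrc`, `memoryChannel` and `loggerSink`, so those lines can be removed from the file without changing which components are listed as active.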

############ Configure OGG

On the source (primary) database, create a new extract process:

dblogin userid goldengate, password goldengate

add extract EXTJMS,tranlog, threads 2,begin now

add exttrail /data/goldengate/dirdat/kf, extract EXTJMS, megabytes 200

add schematrandata my_schema

add trandata my_schema.*

Parameter file of the source extract process:

extract EXTJMS

setenv (ORACLE_SID="testdb")

setenv (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")

userid goldengate, password goldengate

TRANLOGOPTIONS DBLOGREADER

exttrail /data/goldengate/dirdat/kf

discardfile /data/goldengate/dirrpt/EXTJMS.dsc,append

THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000

numfiles 3000

CHECKPOINTSECS 20

DISCARDROLLOVER AT 05:30

dynamicresolution

GETUPDATEBEFORES

NOCOMPRESSUPDATES

NOCOMPRESSDELETES

RecoveryOptions OverwriteMode

ddl &

include mapped &

exclude objtype 'TRIGGER' &

exclude objtype 'PROCEDURE' &

exclude objtype 'FUNCTION' &

exclude objtype 'PACKAGE' &

exclude objtype 'PACKAGE BODY' &

exclude objtype 'TYPE' &

exclude objtype 'GRANT' &

exclude instr 'GRANT' &

exclude objtype 'DATABASE LINK' &

exclude objtype 'CONSTRAINT' &

exclude objtype 'JOB' &

exclude instr 'ALTER SESSION' &

exclude INSTR 'AS SELECT' &

exclude INSTR 'REPLACE SYNONYM' &

EXCLUDE OBJNAME "my_schema.DBMS_TABCOMP_TEMP_CMP" &

EXCLUDE OBJNAME "my_schema.DBMS_TABCOMP_TEMP_UNCMP"

FETCHOPTIONS NOUSESNAPSHOT, USELATESTVERSION, MISSINGROW REPORT

TABLEEXCLUDE *.DBMS_TABCOMP_TEMP*;

--extract table user

TABLE my_schema.*;

Enable supplemental logging on the source database and verify it:

SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE INDEX) COLUMNS;

Database altered.

SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (all) COLUMNS;

Database altered.

SQL> select SUPPLEMENTAL_LOG_DATA_MIN,SUPPLEMENTAL_LOG_DATA_PK,SUPPLEMENTAL_LOG_DATA_UI ,FORCE_LOGGING from v$database;

SUPPLEME SUP SUP FOR

-------- --- --- ---

YES YES YES YES

SQL>

Add a new data pump process on the source side (tested against the my_schema source database):

add extract EDPKF,exttrailsource /data/goldengate/dirdat/kf, begin now

edit param EDPKF

EXTRACT EDPKF

setenv (NLS_LANG = AMERICAN_AMERICA.AL32UTF8)

PASSTHRU

GETUPDATEBEFORES

NOCOMPRESSUPDATES

NOCOMPRESSDELETES

RecoveryOptions OverwriteMode

RMTHOST 192.168.88.66, MGRPORT 7839

RMTTRAIL /data/ogg_for_bigdata/dirdat/kp

DISCARDFILE ./dirrpt/EDPKF.dsc,APPEND,MEGABYTES 5

TABLE my_schema.* ;

add rmttrail /data/ogg_for_bigdata/dirdat/kp, extract EDPKF, megabytes 200

edit param defgen

userid goldengate, password goldengate

defsfile dirdef/my_schema.def

TABLE my_schema.*;

Generate the definitions file, then transfer it to dirdef on the target:

./defgen paramfile ./dirprm/defgen.prm

Target side:

mgr:

PORT 7839

DYNAMICPORTLIST 7840-7850

--AUTOSTART replicat *

--AUTORESTART replicat *,RETRIES 5,WAITMINUTES 2

AUTORESTART ER *, RETRIES 3, WAITMINUTES 5, RESETMINUTES 10

PURGEOLDEXTRACTS /data/ogg_for_bigdata/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2

LAGREPORTHOURS 1

LAGINFOMINUTES 30

LAGCRITICALMINUTES 45
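
The source-side pump reaches this manager through `RMTHOST 192.168.88.66, MGRPORT 7839`, and the Flume Avro source in this article listens on port 14141 of the same host. Before starting the processes it can help to confirm that both ports accept TCP connections. The sketch below uses only the Python standard library. The helper name `port_is_open` is hypothetical. The ports in `DYNAMICPORTLIST` are left out on the assumption that nothing listens on them until the manager starts a collector.

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


if __name__ == "__main__":
    # Host and ports are taken from this article; adjust them for your environment.
    for name, port in (("OGG manager", 7839), ("Flume Avro source", 14141)):
        state = "open" if port_is_open("192.168.88.66", port) else "unreachable"
        print(f"{name} port {port}: {state}")
```

Run the manager check from the source server, since that is where the pump connects from.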

Add the user exit (UE) data pump:

Version used:

Version 12.1.2.1.4 20470586 OGGCORE_12.1.2.1.0OGGBP_PLATFORMS_150303.1209

ADD EXTRACT JMSFLM, EXTTRAILSOURCE /data/ogg_for_bigdata/dirdat/kp

edit param JMSFLM

GGSCI (localhost.localdomain) 18> view param JMSFLM

EXTRACT JMSFLM

SETENV (GGS_USEREXIT_CONF ="dirprm/JMSFLM.props")

GetEnv (JAVA_HOME)

GetEnv (PATH)

GetEnv (LD_LIBRARY_PATH)

SourceDefs dirdef/my_schema.def

CUserExit libggjava_ue.so CUSEREXIT PassThru IncludeUpdateBefores

GetUpdateBefores

NoCompressDeletes

NoCompressUpdates

NoTcpSourceTimer

TABLEEXCLUDE my_schema.MV*;

TABLE my_schema.*;

--alter prodjms extseqno 736, extrba 0

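Note: the target side does not need an Oracle database installed. OGG can share a host with Flume, which ultimately flushes the data to the Kafka servers that receive the messages.

This case relays the data through Flume, and it works without problems.

You can also send the data directly to Kafka for message processing; the principle is the same.

The same approach is a reasonable option for wider big-data integration, since it can be combined with MySQL, MongoDB, HDFS and similar systems.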

Properties file:

$ cat JMSFLM.props

gg.handlerlist=flumehandler

gg.handler.flumehandler.type=com.goldengate.delivery.handler.flume.FlumeHandler

gg.handler.flumehandler.host=192.168.88.66

gg.handler.flumehandler.port=14141

gg.handler.flumehandler.rpcType=avro

gg.handler.flumehandler.delimiter=\u0001

gg.handler.flumehandler.mode=op

gg.handler.flumehandler.includeOpType=true

# Indicates if the operation timestamp should be included as part of output in the delimited separated values

# true - Operation timestamp will be included in the output

# false - Operation timestamp will not be included in the output

# Default :- true

#gg.handler.flumehandler.includeOpTimestamp=true

#gg.handler.name.deleteOpKey=D

#gg.handler.name.updateOpKey=U

#gg.handler.name.insertOpKey=I

#gg.handler.name.pKUpdateOpKey=P

#gg.handler.name.includeOpType=true

# Optional properties to use the transaction grouping functionality

#gg.handler.flumehandler.maxGroupSize=1000

#gg.handler.flumehandler.minGroupSize=1000

### native library config ###

goldengate.userexit.nochkpt=TRUE

goldengate.userexit.timestamp=utc

goldengate.log.logname=cuserexit

goldengate.log.level=DEBUG

goldengate.log.tofile=true

goldengate.userexit.writers=javawriter

goldengate.log.level.JAVAUSEREXIT=DEBUG

#gg.brokentrail=true

gg.report.time=30sec

gg.classpath=/data/ogg_for_bigdata/dirprm/flumejar/*:/data/apache-flume-1.6.0-bin/lib/*

javawriter.stats.full=TRUE

javawriter.stats.display=TRUE

javawriter.bootoptions=-Xmx81920m -Xms20480m -Djava.class.path=/data/ogg_for_bigdata/ggjava/ggjava.jar -Dlog4j.configuration=/data/ogg_for_bigdata/cfg/log4j.properties
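
With `delimiter=\u0001` and `mode=op`, each Flume event body is expected to carry one operation as text separated by the U+0001 control character. The exact field order depends on the handler version and is not shown in this article, so the sketch below only splits a body into fields and does not interpret them. The function name is hypothetical, the sample values are invented, and UTF-8 is assumed because the extracts set `NLS_LANG` to `AL32UTF8`.

```python
DELIMITER = "\u0001"  # matches gg.handler.flumehandler.delimiter in JMSFLM.props


def split_event_body(body, encoding="utf-8"):
    """Decode an event body (bytes) and split it on the configured delimiter."""
    return body.decode(encoding).split(DELIMITER)


if __name__ == "__main__":
    # Invented sample: the real field order depends on the handler and is not assumed here.
    sample = "I\u0001MY_SCHEMA.T1\u00011001\u0001hello".encode("utf-8")
    print(split_event_body(sample))
```

A Kafka consumer reading the `my_schema` topic could apply the same split to each message value once the actual field layout has been confirmed from real output.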

That covers how to use OGG to send Oracle data through Flume to Kafka. The best way to consolidate it is to try the setup in a test environment.
