千家信息网

从Oracle用goldengate抽取数据到kafka

发表于:2025-01-21 作者:千家信息网编辑
千家信息网最后更新 2025年01月21日,Goldengate到kafka配置详解环境介绍:源端数据库版本源端OGG版本目标端OGG版本Kafka集群目标端数据库GP11.2.0.312.2.0.1.1ggs_Adapters_Linux_x
千家信息网最后更新 2025年01月21日从Oracle用goldengate抽取数据到kafka

Goldengatekafka配置详解

环境介绍:

源端数据库版本

源端OGG版本

目标端OGG版本

Kafka集群

目标端数据库GP

11.2.0.3

12.2.0.1.1

ggs_Adapters_Linux_x64

切记OGG版本是for big data

12.3.0.1.0

源端配置:

1.1安装OGG软件。

OGG软件不做要求12版本即可

配置MGR

PORT 7810

DYNAMICPORTLIST 7811-7914

AUTORESTART REPLICAT dpe*, WAITMINUTES 1, RETRIES 5

AUTORESTART REPLICAT ext*, WAITMINUTES 1, RETRIES 5

PURGEOLDEXTRACTS /home/ogg/kafka_ogg/dirdat/kf*,USECHECKPOINTS, minkeephours 6

配置ext抽取进程参数:

EXTRACT extkaf

--setenv (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")

setenv (ORACLE_HOME=/u01/app/oracle/product/11.2.0/dbhome_1)

userid goldengate@ogg ,password Golden_1230

--getupdatebefores

GETTRUNCATES

REPORTCOUNT EVERY 15 MINUTES, RATE

DISCARDFILE ./dirrpt/extkaf.dsc,APPEND,MEGABYTES 1024

--THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000 IOLATENCY 90000

DBOPTIONS ALLOWUNUSEDCOLUMN

--WARNLONGTRANS 2h,CHECKINTERVAL 3m

EXTTRAIL ./dirdat/kf

-- TRANLOGOPTIONS CONVERTUCS2CLOBS

TRANLOGOPTIONS EXCLUDEUSER goldengate

TRANLOGOPTIONS DBLOGREADER

-- TRANLOGOPTIONS _noReadAhead Any

--DYNAMICRESOLUTION

Table schema1.tablename1;

Table schema1.tablename2;

Table schema1.tablename3 ;

Table4schema1.tablename4;

配置投递进程:

extract dpekaf

rmthost 172.31.31.10,mgrport 7810

passthru

numfiles 500

rmttrail /home/ogg/kafka_ogg/dirdat/kf

------------------------------------------------------------

Table schema1.tablename1;

Table schema1.tablename2;

Table schema1.tablename3 ;

Table4schema1.tablename4;

-----===========================目标端配置==================


Goldengate for big data

目标端配置MGR

PORT 7810

DYNAMICPORTLIST 7811-7914

AUTORESTART REPLICAT rep*, WAITMINUTES 1, RETRIES 5

PURGEOLDEXTRACTS /home/ogg/kafka_ogg/dirdat/kf*,USECHECKPOINTS, minkeephours 6

配置replicat进程:

入库进程1

replicat repykaf1

--setenv (JAVA_HOME=/home/ogg/jdk1.8.0_111)

--setenv (JRE_HOME=/home/ogg/jdk1.8.0_111/jre)

--setenv (PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH)

--setenv (CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib)

--setenv (LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:/home/ogg/kafka_ogg/lib)

--getenv (JAVA_HOME)

--getenv (JRE_HOME)

--getenv (CLASSPATH)

--getenv (LD_LIBRARY_PATH)

--getenv (PATH)

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka1.props

GETTRUNCATES

REPORTCOUNT EVERY 1 MINUTES ,RATE

GROUPTRANSOPS 1000

Table schema1.tablename1;

入库进程2

replicat repykaf2

--setenv (JAVA_HOME=/home/ogg/jdk1.8.0_111)

--setenv (JRE_HOME=/home/ogg/jdk1.8.0_111/jre)

--setenv (PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH)

--setenv (CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib)

--setenv (LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:/home/ogg/kafka_ogg/lib)

--getenv (JAVA_HOME)

--getenv (JRE_HOME)

--getenv (CLASSPATH)

--getenv (LD_LIBRARY_PATH)

--getenv (PATH)

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka2.props

GETTRUNCATES

REPORTCOUNT EVERY 1 MINUTES ,RATE

GROUPTRANSOPS 1000

Table schema1.tablename2;

入库进程3

replicat repykaf3

--setenv (JAVA_HOME=/home/ogg/jdk1.8.0_111)

--setenv (JRE_HOME=/home/ogg/jdk1.8.0_111/jre)

--setenv (PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH)

--setenv (CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib)

--setenv (LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:/home/ogg/kafka_ogg/lib)

--getenv (JAVA_HOME)

--getenv (JRE_HOME)

--getenv (CLASSPATH)

--getenv (LD_LIBRARY_PATH)

--getenv (PATH)

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka3.props

GETTRUNCATES

REPORTCOUNT EVERY 1 MINUTES ,RATE

GROUPTRANSOPS 1000

Table schema1.tablename3;

入库进程4

replicat repykaf4

--setenv (JAVA_HOME=/home/ogg/jdk1.8.0_111)

--setenv (JRE_HOME=/home/ogg/jdk1.8.0_111/jre)

--setenv (PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH)

--setenv (CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib)

--setenv (LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:/home/ogg/kafka_ogg/lib)

--getenv (JAVA_HOME)

--getenv (JRE_HOME)

--getenv (CLASSPATH)

--getenv (LD_LIBRARY_PATH)

--getenv (PATH)

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka4.props

GETTRUNCATES

REPORTCOUNT EVERY 1 MINUTES ,RATE

GROUPTRANSOPS 1000

Table schema1.tablename4;

配置去kafka的参数文件

我的OGG解压目录是:kafka_ogg

OGG的解压目录下有:AdapterExamples 文件夹

cp /home/ogg/kafka_ogg/AdapterExamples/big-data/kafka/* /home/ogg/kafka_ogg/dirprm/

编辑:

Vi custom_kafka_producer.properties

###bootstrap.servers=ip:端口,ip:端口 例子

bootstrap.servers=172.31.31.10:6667,172.31.31.11:6667,172.31.31.12:6667,172.31.31.13:6667

acks=1

reconnect.backoff.ms=1000

compression.type=gzip

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

# 100KB per partition

batch.size=102400

linger.ms=10000

max.request.size=10240000

send.buffer.bytes=10240000

编辑

Kafka1.props

gg.handlerlist = kafkahandler

gg.handler.kafkahandler.type=kafka

gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties

gg.handler.kafkahandler.topicMappingTemplate=topic-name

(这里写你创建的topic name

gg.handler.kafkahandler.format=json

gg.handler.kafkahandler.BlockingSend =false

gg.handler.kafkahandler.includeTokens=false

gg.handler.kafkahandler.mode=tx

goldengate.userexit.timestamp=utc

goldengate.userexit.writers=javawriter

javawriter.stats.display=TRUE

javawriter.stats.full=TRUE

gg.log=log4j

gg.log.level=INFO

gg.report.time=30sec

gg.classpath=dirprm/:/home/ogg/kafka_ogg/ggjava/resources/lib/*:/usr/hdp/2.4.0.0-169/kafka/libs/*

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

goldengate.userexit.utf8mode=true

gg.handler.kafkahandler.keyMappingTemplate=HH

gg.handler.kafkahandler.format.includePrimaryKeys=true

Kafka2.props

Kafka3.props

Kafka4.props

0