Extracting data from Oracle to Kafka with GoldenGate
GoldenGate-to-Kafka configuration in detail
Environment:
Source database version | Source OGG version | Target OGG version | Kafka cluster | Target database (GP) |
11.2.0.3 | 12.2.0.1.1 | ggs_Adapters_Linux_x64, 12.3.0.1.0 (note: this must be the OGG for Big Data build) | | |
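Source-side configuration:
1.1 Install the OGG software.
There is no special requirement for the OGG software; any 12.x release will do.
Configure MGR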
PORT 7810
DYNAMICPORTLIST 7811-7914
AUTORESTART EXTRACT dpe*, WAITMINUTES 1, RETRIES 5
AUTORESTART EXTRACT ext*, WAITMINUTES 1, RETRIES 5
PURGEOLDEXTRACTS /home/ogg/kafka_ogg/dirdat/kf*,USECHECKPOINTS, minkeephours 6
Configure the ext extract process parameters:
EXTRACT extkaf
--setenv (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
setenv (ORACLE_HOME=/u01/app/oracle/product/11.2.0/dbhome_1)
userid goldengate@ogg ,password Golden_1230
--getupdatebefores
GETTRUNCATES
REPORTCOUNT EVERY 15 MINUTES, RATE
DISCARDFILE ./dirrpt/extkaf.dsc,APPEND,MEGABYTES 1024
--THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000 IOLATENCY 90000
DBOPTIONS ALLOWUNUSEDCOLUMN
--WARNLONGTRANS 2h,CHECKINTERVAL 3m
EXTTRAIL ./dirdat/kf
-- TRANLOGOPTIONS CONVERTUCS2CLOBS
TRANLOGOPTIONS EXCLUDEUSER goldengate
TRANLOGOPTIONS DBLOGREADER
-- TRANLOGOPTIONS _noReadAhead Any
--DYNAMICRESOLUTION
Table schema1.tablename1;
Table schema1.tablename2;
Table schema1.tablename3;
Table schema1.tablename4;
Configure the data pump (delivery) process:
extract dpekaf
rmthost 172.31.31.10,mgrport 7810
passthru
numfiles 500
rmttrail /home/ogg/kafka_ogg/dirdat/kf
------------------------------------------------------------
Table schema1.tablename1;
Table schema1.tablename2;
Table schema1.tablename3;
Table schema1.tablename4;
-----===========================Target-side configuration==================
GoldenGate for Big Data
Configure MGR on the target side:
PORT 7810
DYNAMICPORTLIST 7811-7914
AUTORESTART REPLICAT rep*, WAITMINUTES 1, RETRIES 5
PURGEOLDEXTRACTS /home/ogg/kafka_ogg/dirdat/kf*,USECHECKPOINTS, minkeephours 6
Configure the replicat processes:
Replicat (apply) process 1:
replicat repykaf1
--setenv (JAVA_HOME=/home/ogg/jdk1.8.0_111)
--setenv (JRE_HOME=/home/ogg/jdk1.8.0_111/jre)
--setenv (PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH)
--setenv (CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib)
--setenv (LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:/home/ogg/kafka_ogg/lib)
--getenv (JAVA_HOME)
--getenv (JRE_HOME)
--getenv (CLASSPATH)
--getenv (LD_LIBRARY_PATH)
--getenv (PATH)
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka1.props
GETTRUNCATES
REPORTCOUNT EVERY 1 MINUTES ,RATE
GROUPTRANSOPS 1000
MAP schema1.tablename1, TARGET schema1.tablename1;
Replicat (apply) process 2:
replicat repykaf2
--setenv (JAVA_HOME=/home/ogg/jdk1.8.0_111)
--setenv (JRE_HOME=/home/ogg/jdk1.8.0_111/jre)
--setenv (PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH)
--setenv (CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib)
--setenv (LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:/home/ogg/kafka_ogg/lib)
--getenv (JAVA_HOME)
--getenv (JRE_HOME)
--getenv (CLASSPATH)
--getenv (LD_LIBRARY_PATH)
--getenv (PATH)
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka2.props
GETTRUNCATES
REPORTCOUNT EVERY 1 MINUTES ,RATE
GROUPTRANSOPS 1000
MAP schema1.tablename2, TARGET schema1.tablename2;
Replicat (apply) process 3:
replicat repykaf3
--setenv (JAVA_HOME=/home/ogg/jdk1.8.0_111)
--setenv (JRE_HOME=/home/ogg/jdk1.8.0_111/jre)
--setenv (PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH)
--setenv (CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib)
--setenv (LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:/home/ogg/kafka_ogg/lib)
--getenv (JAVA_HOME)
--getenv (JRE_HOME)
--getenv (CLASSPATH)
--getenv (LD_LIBRARY_PATH)
--getenv (PATH)
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka3.props
GETTRUNCATES
REPORTCOUNT EVERY 1 MINUTES ,RATE
GROUPTRANSOPS 1000
MAP schema1.tablename3, TARGET schema1.tablename3;
Replicat (apply) process 4:
replicat repykaf4
--setenv (JAVA_HOME=/home/ogg/jdk1.8.0_111)
--setenv (JRE_HOME=/home/ogg/jdk1.8.0_111/jre)
--setenv (PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH)
--setenv (CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib)
--setenv (LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:/home/ogg/kafka_ogg/lib)
--getenv (JAVA_HOME)
--getenv (JRE_HOME)
--getenv (CLASSPATH)
--getenv (LD_LIBRARY_PATH)
--getenv (PATH)
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka4.props
GETTRUNCATES
REPORTCOUNT EVERY 1 MINUTES ,RATE
GROUPTRANSOPS 1000
MAP schema1.tablename4, TARGET schema1.tablename4;
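The four replicat parameter files above differ only in the process number, the props file name, and the table. As a minimal sketch, assuming you prefer to generate them rather than copy and paste, the Python below renders them from one template. The helper names (`render_replicat`, `write_all`) and the `TABLES` mapping are hypothetical, the commented-out setenv/getenv lines are left out, and the mapping line is written as MAP ... TARGET ..., the form Replicat normally uses for mapping; adjust it if your parameter files differ.

```python
from pathlib import Path

# Template for one replicat parameter file. {n} is the process number and
# {table} the fully qualified source table (illustrative placeholders).
TEMPLATE = """replicat repykaf{n}
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka{n}.props
GETTRUNCATES
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 1000
MAP {table}, TARGET {table};
"""

# One table per replicat, as in the walkthrough above.
TABLES = {
    1: "schema1.tablename1",
    2: "schema1.tablename2",
    3: "schema1.tablename3",
    4: "schema1.tablename4",
}


def render_replicat(n, table):
    """Return the text of the parameter file for replicat number n."""
    return TEMPLATE.format(n=n, table=table)


def write_all(dirprm):
    """Write repykaf<n>.prm for every entry in TABLES into dirprm."""
    dirprm = Path(dirprm)
    dirprm.mkdir(parents=True, exist_ok=True)
    written = []
    for n, table in TABLES.items():
        path = dirprm / f"repykaf{n}.prm"
        path.write_text(render_replicat(n, table), encoding="utf-8")
        written.append(path)
    return written


if __name__ == "__main__":
    # Preview only; call write_all("dirprm") to actually create the files.
    print(render_replicat(1, TABLES[1]))
```

Calling `write_all("/home/ogg/kafka_ogg/dirprm")` would create the four files in the dirprm directory used in this setup; review the output before starting the processes.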
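Configure the Kafka parameter files
My OGG unzip directory is: kafka_ogg
Under the OGG unzip directory there is an AdapterExamples folder. Copy the Kafka examples from it into dirprm: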
cp /home/ogg/kafka_ogg/AdapterExamples/big-data/kafka/* /home/ogg/kafka_ogg/dirprm/
Edit:
vi custom_kafka_producer.properties
### Format: bootstrap.servers=ip:port,ip:port (example below)
bootstrap.servers=172.31.31.10:6667,172.31.31.11:6667,172.31.31.12:6667,172.31.31.13:6667
acks=1
reconnect.backoff.ms=1000
compression.type=gzip
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=102400
linger.ms=10000
max.request.size=10240000
send.buffer.bytes=10240000
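Before starting the replicats it can help to sanity-check the producer file. The Python below is a rough sketch, not part of OGG or Kafka: `parse_properties` and `check_producer` are hypothetical helpers, the parser handles only simple `key=value` lines and `#` comments, and the `batch.size` versus `max.request.size` comparison is a heuristic based on the Kafka documentation describing `max.request.size` as a cap on batch size.

```python
SAMPLE = """\
### bootstrap.servers=ip:port,ip:port
bootstrap.servers=172.31.31.10:6667,172.31.31.11:6667,172.31.31.12:6667,172.31.31.13:6667
acks=1
compression.type=gzip
# 100KB per partition
batch.size=102400
linger.ms=10000
max.request.size=10240000
"""


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # / ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or line.startswith("!"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_producer(props):
    """Return a list of human-readable problems (empty if none found)."""
    problems = []
    for entry in props.get("bootstrap.servers", "").split(","):
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            problems.append(f"bad bootstrap.servers entry: {entry!r}")
    batch = int(props.get("batch.size", "0"))
    max_request = int(props.get("max.request.size", "0"))
    if batch > max_request:
        problems.append("batch.size exceeds max.request.size")
    return problems


if __name__ == "__main__":
    props = parse_properties(SAMPLE)
    print("batch.size in KiB:", int(props["batch.size"]) / 1024)
    print("linger.ms in seconds:", int(props["linger.ms"]) / 1000)
    print("problems:", check_producer(props))
```

With the values from this file, `batch.size=102400` works out to 100 KiB and `linger.ms=10000` to 10 seconds, so the producer may hold records for up to 10 seconds while a batch fills.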
Edit
kafka1.props
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
# Replace topic-name with the name of the topic you created
gg.handler.kafkahandler.topicMappingTemplate=topic-name
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=tx
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
gg.classpath=dirprm/:/home/ogg/kafka_ogg/ggjava/resources/lib/*:/usr/hdp/2.4.0.0-169/kafka/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
goldengate.userexit.utf8mode=true
gg.handler.kafkahandler.keyMappingTemplate=HH
gg.handler.kafkahandler.format.includePrimaryKeys=true
Edit kafka2.props, kafka3.props and kafka4.props likewise.
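With `format=json` and `format.includePrimaryKeys=true`, each Kafka message should be a JSON document describing one operation. The sketch below shows how a consumer might decode such a message in Python. The sample payload is made up for illustration, and the field names (`table`, `op_type`, `op_ts`, `current_ts`, `pos`, `primary_keys`, `after`) and the I/U/D/T operation codes are my understanding of the JSON formatter defaults, so check them against a real message from your topic before relying on them.

```python
import json

# Hypothetical message value as bytes, shaped like the JSON formatter output.
SAMPLE_MESSAGE = b"""{
  "table": "SCHEMA1.TABLENAME1",
  "op_type": "I",
  "op_ts": "2017-01-01 00:00:00.000000",
  "current_ts": "2017-01-01T00:00:01.000000",
  "pos": "00000000000000001234",
  "primary_keys": ["ID"],
  "after": {"ID": 1, "NAME": "example"}
}"""

# Assumed default operation codes of the JSON formatter.
OP_NAMES = {"I": "insert", "U": "update", "D": "delete", "T": "truncate"}


def describe(message_bytes):
    """Decode one message and pull out table, operation, key and row image."""
    record = json.loads(message_bytes.decode("utf-8"))
    operation = OP_NAMES.get(record.get("op_type"), "unknown")
    # Deletes carry only a before image, inserts only an after image.
    image = record.get("after") or record.get("before") or {}
    key = {name: image.get(name) for name in record.get("primary_keys", [])}
    return {
        "table": record.get("table"),
        "operation": operation,
        "key": key,
        "row": image,
    }


if __name__ == "__main__":
    print(describe(SAMPLE_MESSAGE))
```

In a real consumer, `describe` would be applied to the value of each record read from the topic configured in the props file.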