设为首页 收藏本站
查看: 3848|回复: 0

[经验分享] OGG同步ORACLE数据到KAFKA

[复制链接]

尚未签到

发表于 2018-9-21 12:30:09 | 显示全部楼层 |阅读模式
  环境:
  源端:oracle12.2   ogg for oracle 12.3
  目标端:KAFKA      ogg for bigdata 12.3
  将oracle中的数据通过OGG同步到KAFKA
  源端配置:
  1、为要同步的表添加附加日志
  dblogin USERID ogg@orclpdb, PASSWORD ogg
  add trandata scott.tab1
  add trandata scott.tab2
  2、 添加抽取进程
  GGSCI>add extract EXT_KAF1,integrated tranlog, begin now
  GGSCI>add EXTTRAIL ./dirdat/k1, extract EXT_KAF1,MEGABYTES 200
  编辑抽取进程参数:
  GGSCI> edit params EXT_KAF1
  extract EXT_KAF1
  userid c##ggadmin,PASSWORD ggadmin
  LOGALLSUPCOLS
  UPDATERECORDFORMAT COMPACT

  exttrail ./dirdat/k1,FORMAT>  SOURCECATALOG orclpdb  --(指定pdb)
  table scott.tab1;
  table scott.tab2;
  注册进程
  GGSCI> DBLOGIN USERID c##ggadmin,PASSWORD ggadmin
  GGSCI> register extract EXT_KAF1 database container (orclpdb)
  3、添加投递进程:
  GGSCI>add extract PMP_KAF1, exttrailsource ./dirdat/k1
  GGSCI>add rmttrail ./dirdat/f1,EXTRACT PMP_KAF1,MEGABYTES 200
  编辑投递进程参数:
  GGSCI>edit param PMP_KAF1
  EXTRACT PMP_KAF1
  USERID c##ggadmin,PASSWORD ggadmin
  PASSTHRU
  RMTHOST 10.1.1.247, MGRPORT 9178

  RMTTRAIL ./dirdat/f1,format>  SOURCECATALOG orclpdb
  TABLE scott.tab1;
  table scott.tab2;
  4、添加数据初始化进程(Oracle initial load) 可以多个表分开初始化也可以一起初始化,此处选择分开初始化
  GGSCI> add extract ek_01, sourceistable
  编辑参数:
  GGSCI> EDIT PARAMS ek_01
  EXTRACT ek_01
  USERID  c##ggadmin,PASSWORD ggadmin
  RMTHOST 10.1.1.247, MGRPORT 9178

  RMTFILE ./dirdat/ka,maxfiles 999, megabytes 500,format>  SOURCECATALOG orclpdb
  table scott.tab1;
  GGSCI> add extract ek_02, sourceistable
  EDIT PARAMS ek_02
  EXTRACT ek_02
  USERID  c##ggadmin,PASSWORD ggadmin
  RMTHOST 10.1.1.247, MGRPORT 9178

  RMTFILE ./dirdat/kb,maxfiles 999, megabytes 500,format>  SOURCECATALOG orclpdb
  table scott.tab2;
  5、生成def文件:
  GGSCI> edit param defgen1
  USERID c##ggadmin,PASSWORD ggadmin

  defsfile /home/oracle/ogg/ggs12/dirdef/defgen1.def,format>  SOURCECATALOG orclpdb
  table scott.tab1;
  table scott.tab2;
  在OGG_HOME下执行如下命令生成def文件
  defgen paramfile dirprm/defgen1.prm
  将生成的def文件传到目标端$OGG_HOME/dirdef下
  目标端配置:
  1、将$OGG_HOME/AdapterExamples/big-data/kafka下的所有文件copy到$OGG_HOME/dirprm下
  cd $OGG_HOME/AdapterExamples/big-data/kafka
  cp * $OGG_HOME/dirprm
  2、将$ORACLE_HOME/AdapterExamples/trail下的文件tr000000000 copy到$OGG_HOME/dirdat下
  cd $ORACLE_HOME/AdapterExamples/trail
  cp tr000000000 $OGG_HOME/dirdat
  3、添加初始化进程:(可以多表一起初始化也可以分开初始化,此处选择单独初始化)
  GGSCI> ADD replicat rp_01, specialrun
  GGSCI> EDIT PARAMS rp_01
  SPECIALRUN
  end runtime
  setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
  targetdb libfile libggjava.so set property=./dirprm/kafka1.props
  SOURCEDEFS ./dirdef/defgen1.def
  EXTFILE ./dirdat/ka
  reportcount every 1 minutes, rate
  grouptransops 10000
  MAP orclpdb.scott.tab1, TARGET scott.tab1;
  GGSCI> ADD replicat rp_02, specialrun
  GGSCI> EDIT PARAMS rp_02
  SPECIALRUN
  end runtime
  setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
  targetdb libfile libggjava.so set property=./dirprm/kafka2.props
  SOURCEDEFS ./dirdef/defgen1.def
  EXTFILE ./dirdat/kb
  reportcount every 1 minutes, rate
  grouptransops 10000
  MAP orclpdb.scott.tab2, TARGET scott.tab2;
  4、添加恢复进程:
  GGSCI>add replicat r_kaf1,exttrail ./dirdat/f1
  GGSCI>edit params r_kaf1
  REPLICAT r_kaf1
  setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
  HANDLECOLLISIONS
  targetdb libfile libggjava.so set property=./dirprm/kafka1.props
  SOURCEDEFS ./dirdef/defgen1.def
  reportcount every 1 minutes, rate
  grouptransops 10000
  MAP orclpdb.scott.tab1, TARGET scott.tab1;
  GGSCI> add replicat r_kaf2,exttrail ./dirdat/f2
  GGSCI> edit params r_kaf2
  REPLICAT r_kaf2
  setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
  HANDLECOLLISIONS
  targetdb libfile libggjava.so set property=./dirprm/kafka2.props
  SOURCEDEFS ./dirdef/defgen1.def
  reportcount every 1 minutes, rate
  grouptransops 10000
  MAP orclpdb.scott.tab2, TARGET scott.tab2;
  5、参数配置:
  custom_kafka_producer.properties文件内容如下:
  bootstrap.servers=10.1.1.246:9200,10.1.1.247:9200      --只需要改动这一行就行,指定kafka的地址和端口号
  acks=1
  reconnect.backoff.ms=1000
  value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
  key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
  batch.size=16384
  linger.ms=10000
  kafka1.props文件内容如下:
  gg.handlerlist = kafkahandler
  gg.handler.kafkahandler.type=kafka
  gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
  #The following resolves the topic name using the short table name
  gg.handler.kafkahandler.topicMappingTemplate= topic1
  #gg.handler.kafkahandler.format=avro_op
  gg.handler.kafkahandler.format =json   --这里做了改动,指定格式为json格式
  gg.handler.kafkahandler.format.insertOpKey=I
  gg.handler.kafkahandler.format.updateOpKey=U
  gg.handler.kafkahandler.format.deleteOpKey=D
  gg.handler.kafkahandler.format.truncateOpKey=T
  gg.handler.kafkahandler.format.prettyPrint=false
  gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
  gg.handler.kafkahandler.format.includePrimaryKeys=true    --包含主键
  gg.handler.kafkahandler.SchemaTopicName= topic1   --此处指定为要同步到的目标topic名字
  gg.handler.kafkahandler.BlockingSend =false
  gg.handler.kafkahandler.includeTokens=false
  gg.handler.kafkahandler.mode=op
  goldengate.userexit.timestamp=utc
  goldengate.userexit.writers=javawriter
  javawriter.stats.display=TRUE
  javawriter.stats.full=TRUE
  gg.log=log4j
  gg.log.level=INFO
  gg.report.time=30sec
  #Sample gg.classpath for Apache Kafka
  gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/    --指定classpath,这里很重要,必须有kafka安装文件的类库。
  #Sample gg.classpath for HDP
  #gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/
  javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
  启动进程进程恢复:
  1、启动源端抓取进程
  GGSCI> start EXT_KAF1
  2、启动源端投递进程
  GGSCI> start PMP_KAF1
  3、启动源端初始化进程
  GGSCI> start ek_01
  4、启动目标端初始化进程
  在$OGG_HOME下执行如下命令:
  ./replicat paramfile ./dirprm/rp_01.prm reportfile ./dirrpt/rp_01.rpt -p INITIALDATALOAD
  5、启动目标端恢复进程
  GGSCI> start R_KAF1
  遇到的错误:
  1、ERROR OGG-15050 Error loading Java VM runtime library(2 no such file or directory)
DSC0000.jpg

  原因:找不到类库(配置好环境变量之后,OGG的mgr进程没有重启,导致的)
  解决:重启MGR进程
  2、ERROR OG-15051  Java or JNI exception
DSC0001.jpg

  原因:没有使用ogg12.3.1.1.1自带的kafka.props,而是copy了ogg12.2的kafka.props,导致出现异常。
  解决:使用ogg12.3.1.1.1自带的kafka.props,并指定相关的属性,解决。


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-599419-1-1.html 上篇帖子: centos 6.5部署oracle 11g记录 下篇帖子: Loading Data From Oracle To Hive By ODI 12c-candon123
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表