Using OGG to Sync Data to Kafka
To better support the rollout of our company's citizen-data ("natural person") project, I built a test environment on virtual machines that replicates data from selected production tables to a Kafka cluster via Oracle GoldenGate (OGG).
1 Test Environment Overview
1) Goal
When a DML operation hits table t1 under the hr user in the source database, the change data should be written to the Kafka cluster and displayed by a consumer.
2) Environment
192.168.60.88 tdb1
192.168.60.89 reps
192.168.60.91 kafka01
192.168.60.92 kafka02
192.168.60.93 kafka03
tdb1 is the source Oracle database server, version 10.2.0.4, SID: tdb.
reps is the interface server; OGG for Big Data is installed there to receive data from the source-side OGG pump process and write it into the Kafka cluster.
kafka01-03 form a three-node Kafka cluster.
All of these servers run RHEL 6.8.
The JDK on tdb1 and kafka01-03 is 1.7.0_79.
reps runs JDK 1.8.0_121, because OGG for Big Data 12.2.x requires JDK 1.8 or later; JDK 1.7 fails with a version error.
Firewalls and SELinux must be disabled on all five servers.
2 Installing the Kafka Cluster
1) Download the software
ZooKeeper, version 3.4.10, file: zookeeper-3.4.10.tar.gz, from: http://zookeeper.apache.org/releases.html
Kafka, version 2.10-0.10.2.0 (Scala 2.10, Kafka 0.10.2.0), file: kafka_2.10-0.10.2.0.tgz, from: http://kafka.apache.org/downloads
2) Preparation
Adjust /etc/hosts on kafka01-03:
[root@kafka01 ~]# cd /etc
[root@kafka01 etc]# cat hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.60.91 kafka01
192.168.60.92 kafka02
192.168.60.93 kafka03
Create a user named kafka on each of kafka01-03, and set up passwordless SSH trust among the kafka users on the three machines (any of the usual guides will do).
Example of the kafka user on kafka01:
[kafka@kafka01 ~]$ pwd
/home/kafka
[kafka@kafka01 ~]$ id
uid=500(kafka) gid=501(kafka) groups=501(kafka)
[kafka@kafka01 ~]$ cat .bash_profile
# .bash_profile

# Get the aliases and functions
if [ -f ~/.bashrc ]; then
        . ~/.bashrc
fi

# User specific environment and startup programs
PATH=$PATH:$HOME/bin
export PATH
export JAVA_HOME=/usr/java/jdk1.7.0_79
export JRE_HOME=/usr/java/jdk1.7.0_79/jre
export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export ZOOKEEPER_HOME=/home/kafka/zookeeper
export PATH=$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$PATH
3) Install and configure ZooKeeper
Do the following on all of kafka01-03; kafka01 is shown as the example.
a. Unpack
Unpack zookeeper-3.4.10.tar.gz, rename the directory to zookeeper, and move it under /home/kafka/, so that it looks like this:
[kafka@kafka01 zookeeper]$ pwd
/home/kafka/zookeeper
[kafka@kafka01 zookeeper]$ ls
bin        conf     data        docs             ivy.xml  LICENSE.txt  README_packaging.txt  recipes  zookeeper-3.4.10.jar      zookeeper-3.4.10.jar.md5   zookeeper.out
build.xml  contrib  dist-maven  ivysettings.xml  lib      NOTICE.txt   README.txt            src      zookeeper-3.4.10.jar.asc  zookeeper-3.4.10.jar.sha1
b. Configure zoo.cfg
cd /home/kafka/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
Edit zoo.cfg as follows:
[kafka@kafka01 conf]$ pwd
/home/kafka/zookeeper/conf

Set these parameters in zoo.cfg:
dataDir=/home/kafka/zookeeper/data
clientPort=2181
server.1=kafka01:2888:3888
server.2=kafka02:2888:3888
server.3=kafka03:2888:3888
c. Set the node ID
cd /home/kafka/zookeeper
mkdir data
cd data
vi myid            # enter 1 (use 2 on kafka02 and 3 on kafka03)
[kafka@kafka01 data]$ pwd
/home/kafka/zookeeper/data
[kafka@kafka01 data]$ cat myid
1
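Each node's myid must match the index in the corresponding server.N line of zoo.cfg, or the ensemble will not form. The mapping can be scripted; the following is a minimal sketch (the host list and data directory are assumptions matching this setup):

```shell
# write_myid HOST DATADIR - write the myid matching HOST's server.N index.
# The host list must be in the same order as the server.1..3 lines in zoo.cfg.
write_myid() {
  hosts="kafka01 kafka02 kafka03"
  i=1
  for h in $hosts; do
    if [ "$h" = "$1" ]; then
      mkdir -p "$2"
      printf '%s\n' "$i" > "$2/myid"
      return 0
    fi
    i=$((i + 1))
  done
  echo "unknown host: $1" >&2
  return 1
}

# On each node: write_myid "$(hostname -s)" /home/kafka/zookeeper/data
```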
d. Start ZooKeeper
[kafka@kafka01 bin]$ pwd
/home/kafka/zookeeper/bin
./zkServer.sh start

Check the status:
[kafka@kafka01 bin]$ ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/kafka/zookeeper/bin/../conf/zoo.cfg
Mode: follower

One of the three nodes reports Mode: leader; the other two report Mode: follower.
Troubleshooting: if a node does not start, run ./zkServer.sh start-foreground; the log is printed to the screen and shows where the problem is.
4) Install and configure Kafka
Do the following on all of kafka01-03; kafka01 is shown as the example.
a. Unpack
Unpack kafka_2.10-0.10.2.0.tgz, rename the directory to kafka, and move it under /home/kafka/, so that it looks like this:
[kafka@kafka02 kafka]$ pwd
/home/kafka/kafka
[kafka@kafka02 kafka]$ ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
b. Modify the Kafka server configuration file
cd /home/kafka/kafka/config
vi server.properties

[kafka@kafka01 config]$ cat server.properties    # entries that need no change are omitted
broker.id=1                                      # 1 on kafka01, 2 on kafka02, 3 on kafka03
host.name=kafka01                                # adjust per host
listeners=PLAINTEXT://kafka01:9092               # adjust per host
advertised.listeners=PLAINTEXT://kafka01:9092    # adjust per host
log.dirs=/home/kafka/kafka/logs
num.partitions=4
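Only the broker id and hostname differ between the three files, so the changed entries can be generated rather than edited by hand on each node. A minimal sketch (the paths and the host-to-id mapping are assumptions matching this setup):

```shell
# render_broker_conf HOST ID OUTFILE - write the per-host server.properties
# entries shown above; entries that need no change are not included here.
render_broker_conf() {
  cat > "$3" <<EOF
broker.id=$2
host.name=$1
listeners=PLAINTEXT://$1:9092
advertised.listeners=PLAINTEXT://$1:9092
log.dirs=/home/kafka/kafka/logs
num.partitions=4
EOF
}

# Example: render_broker_conf kafka02 2 /home/kafka/kafka/config/server.properties
```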
c. Start Kafka in the background
Start Kafka in the background on each of the three nodes:
cd /home/kafka/kafka/bin
./kafka-server-start.sh /home/kafka/kafka/config/server.properties &
d. Test
Create a topic named oggtest with 3 partitions and a replication factor of 3.
This can be run on any node:
./kafka-topics.sh --create --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 --replication-factor 3 --partitions 3 --topic oggtest
Inspect the topic just created, on any node:
./kafka-topics.sh --describe --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 --topic oggtest

List all topics:
./kafka-topics.sh --describe --zookeeper kafka01:2181,kafka02:2181,kafka03:2181
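For scripted checks, the describe output can be parsed instead of read by eye. A minimal sketch (the sample line in the comment is illustrative of Kafka 0.10 console output, so treat the exact layout as an assumption):

```shell
# check_rf EXPECTED - read kafka-topics.sh describe output on stdin and
# exit 0 only if the reported ReplicationFactor matches EXPECTED.
# A describe summary line looks roughly like:
#   Topic:oggtest  PartitionCount:3  ReplicationFactor:3  Configs:
check_rf() {
  awk -v want="$1" '
    /ReplicationFactor:/ {
      for (i = 1; i <= NF; i++)
        if ($i ~ /^ReplicationFactor:/) {
          split($i, a, ":")
          found = 1
          ok = (a[2] == want)
          exit
        }
    }
    END { exit ((found && ok) ? 0 : 1) }
  '
}

# Example:
# ./kafka-topics.sh --describe --zookeeper kafka01:2181 --topic oggtest | check_rf 3
```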
You can verify publishing and consuming messages with the bundled bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh scripts:
In one terminal, start a producer that writes messages to the oggtest topic created above:
./kafka-console-producer.sh --broker-list kafka01:9092,kafka02:9092,kafka03:9092 --topic oggtest
In another terminal, start a consumer subscribed to that topic:
./kafka-console-consumer.sh --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 --from-beginning --topic oggtest
If the cluster is configured correctly, whatever you type into the producer window appears in the consumer window after you press Enter.
3 OGG Source-Side (tdb1) Installation and Configuration
1) Preparation
a. OGG software
File: Oracle GoldenGate V11.2.1.0.3 for Oracle 11g on Linux x86-64.zip, available from https://edelivery.oracle.com
b. The source database must be in archivelog mode, set to force logging, and have supplemental logging enabled
[oracle@tdb1 ~]$ sqlplus / as sysdba
SQL> archive log list
Database log mode              Archive Mode
Automatic archival             Enabled
Archive destination            /oracle/arc
Oldest online log sequence     9
Next log sequence to archive   11
Current log sequence           11
SQL> Select
  2  SUPPLEMENTAL_LOG_DATA_MIN
  3  ,SUPPLEMENTAL_LOG_DATA_PK
  4  ,SUPPLEMENTAL_LOG_DATA_UI
  5  ,SUPPLEMENTAL_LOG_DATA_ALL
  6  ,FORCE_LOGGING from v$database;

SUPPLEME SUP SUP SUP FOR
-------- --- --- --- ---
YES      NO  NO  NO  YES
c. It is recommended to create a dedicated ogg OS user so the oracle user is not affected; put it in the oracle user's primary group, and give the ogg user a filesystem sized to the volume of data the extract will produce.
The finished ogg user setup looks like this:
[root@tdb1 ~]# su - ogg
[ogg@tdb1 ~]$ id
uid=501(ogg) gid=500(dba) groups=500(dba)
[ogg@tdb1 ~]$ cat .bash_profile
# .bash_profile

# Get the aliases and functions
if [ -f ~/.bashrc ]; then
        . ~/.bashrc
fi

# User specific environment and startup programs
PATH=$PATH:$HOME/bin
export PATH
umask 022

export ORACLE_BASE=/oracle/app/oracle
export ORACLE_HOME=$ORACLE_BASE/product/10.2.0
export ORACLE_SID=tdb
export PATH=$ORACLE_HOME/bin:$PATH:.
export NLS_LANG=AMERICAN_AMERICA.ZHS16GBK
export NLS_DATE_FORMAT=YYYYMMDDHH24MISS
export DISPLAY=192.168.60.1:0.0

#ogg
export GG_HOME=/ogg
#export PATH=$PATH:$GG_HOME
export LD_LIBRARY_PATH=/ogg:$ORACLE_HOME/lib

[ogg@tdb1 ~]$ sqlplus / as sysdba
SQL*Plus: Release 10.2.0.5.0 - Production on Thu Apr 13 11:10:59 2017
Copyright (c) 1982, 2010, Oracle.  All Rights Reserved.
Connected to:
Oracle Database 10g Enterprise Edition Release 10.2.0.5.0 - 64bit Production
With the Partitioning, Data Mining and Real Application Testing options
SQL>
d. Create the ogg user in the database and grant privileges
create tablespace ogg datafile '/oracle/oradata/tdb/ogg.dbf' size 1G;
create user ogg identified by gg_888 default tablespace ogg;
grant connect,resource to ogg;
grant dba to ogg;                      --dba can be skipped if you do not use a DDL trigger
GRANT CREATE SESSION TO ogg;
GRANT ALTER SESSION TO ogg;
GRANT SELECT ANY DICTIONARY TO ogg;
GRANT SELECT ANY TABLE TO ogg;
GRANT ALTER ANY TABLE TO ogg;          --needed to configure table-level supplemental logging
GRANT FLASHBACK ANY TABLE TO ogg;
GRANT EXECUTE on DBMS_FLASHBACK TO ogg;
GRANT EXECUTE ON utl_file TO ogg;
execute DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE('OGG');
grant execute on sys.dbms_lob to ogg;

--The PL/SQL block below is for Oracle 11g and later; it is not needed on 10g
BEGIN
  DBMS_GOLDENGATE_AUTH.GRANT_ADMIN_PRIVILEGE(
    Grantee                 => 'OGG',
    privilege_type          => 'CAPTURE',
    grant_select_privileges => TRUE,
    do_grants               => TRUE);
END;
/
e. For testing, I created an hr user and a table t1 under it:
-- Create table
create table T1
(
  id   NUMBER not null,
  name VARCHAR2(100)
)
tablespace USERS;
-- Create/Recreate primary, unique and foreign key constraints
alter table T1
  add constraint PK_T1_ID primary key (ID)
  using index
  tablespace USERS;
2) Configure OGG
a. Unpack the OGG software into $GG_HOME
The result looks like this:
[ogg@tdb1 ogg]$ ls -l gg*
-rwxr-x--- 1 ogg dba  6577392 Aug 24  2012 ggcmd
-rw-r----- 1 ogg dba     1841 Apr 12 15:58 gglog-defgen.dmp
-rw-r----- 1 ogg dba     1239 Apr 12 16:40 gglog-DPE_TEST-43680.dmp
-rw-r----- 1 ogg dba      962 Apr 12 16:49 gglog-DPE_TEST-43782.dmp
-rw-r----- 1 ogg dba        0 Apr 12 16:40 gglog-DPE_TEST.dmp
-rw-r----- 1 ogg dba  1280336 Aug 24  2012 ggMessage.dat
-rwxr-x--- 1 ogg dba 13899588 Aug 24  2012 ggsci
-rw-rw-rw- 1 ogg dba    21819 Apr 13 08:47 ggserr.log
b. Create the OGG subdirectories
[ogg@tdb1 ogg]$ pwd
/ogg
[ogg@tdb1 ogg]$ ./ggsci
GGSCI> create subdirs
Handling a ggsci startup error:
[ogg@tdb1 ogg]$ ggsci
ggsci: error while loading shared libraries: libnnz11.so: cannot open shared object file: No such file or directory
[ogg@tdb1 ogg]$ ldd ggsci
        linux-vdso.so.1 =>  (0x00007ffd3db73000)
        libdl.so.2 => /lib64/libdl.so.2 (0x00000035bbc00000)
        libgglog.so => /ogg/./libgglog.so (0x00007ff824130000)
        libggrepo.so => /ogg/./libggrepo.so (0x00007ff823fdc000)
        libdb-5.2.so => /ogg/./libdb-5.2.so (0x00007ff823d3b000)
        libicui18n.so.38 => /ogg/./libicui18n.so.38 (0x00007ff8239da000)
        libicuuc.so.38 => /ogg/./libicuuc.so.38 (0x00007ff8236a1000)
        libicudata.so.38 => /ogg/./libicudata.so.38 (0x00007ff8226c5000)
        libpthread.so.0 => /lib64/libpthread.so.0 (0x00000035bc400000)
        libxerces-c.so.28 => /ogg/./libxerces-c.so.28 (0x00007ff8221ad000)
        libantlr3c.so => /ogg/./libantlr3c.so (0x00007ff822097000)
        libnnz11.so => not found
        libclntsh.so.11.1 => not found
        libstdc++.so.6 => /usr/lib64/libstdc++.so.6 (0x00000035c7400000)
        libm.so.6 => /lib64/libm.so.6 (0x00000035bcc00000)
        libgcc_s.so.1 => /lib64/libgcc_s.so.1 (0x00000035c7000000)
        libc.so.6 => /lib64/libc.so.6 (0x00000035bc000000)
        /lib64/ld-linux-x86-64.so.2 (0x00000035bb800000)

This OGG build is linked against 11g client libraries, which do not exist in a 10g Oracle home, so create symlinks to the 10g equivalents:
[oracle@tdb1 ~]$ cd $ORACLE_HOME/lib
[oracle@tdb1 lib]$ ln -s libnnz10.so libnnz11.so
[oracle@tdb1 lib]$ ln -s libclntsh.so libclntsh.so.11.1
[oracle@tdb1 lib]$ ls -l libclntsh.so.11.1
lrwxrwxrwx 1 oracle dba 12 Apr 11 22:33 libclntsh.so.11.1 -> libclntsh.so
[oracle@tdb1 lib]$ ls -l libnnz11.so
lrwxrwxrwx 1 oracle dba 11 Apr 11 22:31 libnnz11.so -> libnnz10.so
c. Enable table-level supplemental logging for hr.t1
[ogg@tdb1 ogg]$ ./ggsci
GGSCI> dblogin userid ogg, password gg_888
GGSCI> add trandata hr.t1
d. Configure the OGG manager
[ogg@tdb1 ogg]$ ./ggsci
GGSCI> edit params mgr

Enter the following and save:
PORT 7809
DYNAMICPORTLIST 7810-7860
AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3, RESETMINUTES 60
PURGEOLDEXTRACTS ./dirdat/*, usecheckpoints, minkeepdays 1
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45

Start the OGG manager:
GGSCI> start mgr

Check the manager status; it should look like this:
GGSCI (tdb1) 1> info mgr
Manager is running (IP port tdb1.7809).
e. Create the extract
GGSCI> edit params ext_test

Enter the following and save:
EXTRACT ext_test
Setenv (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
USERID ogg, PASSWORD gg_888
gettruncates
DISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES 1024
DBOPTIONS ALLOWUNUSEDCOLUMN
REPORTCOUNT EVERY 1 MINUTES, RATE
WARNLONGTRANS 2h, CHECKINTERVAL 3m
FETCHOPTIONS NOUSESNAPSHOT
TRANLOGOPTIONS CONVERTUCS2CLOBS
EXTTRAIL ./dirdat/te
WILDCARDRESOLVE DYNAMIC
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
dynamicresolution
table hr.t1;

Add the extract group:
GGSCI> add extract ext_test, TRANLOG, BEGIN NOW

Define the trail file:
GGSCI> ADD EXTTRAIL ./dirdat/te, EXTRACT ext_test, MEGABYTES 200
f. Create the pump extract
GGSCI> edit param dpe_test

Enter the following and save:
EXTRACT dpe_test
PASSTHRU
RMTHOST 192.168.60.89, MGRPORT 7809
RMTTRAIL ./dirdat/te
DYNAMICRESOLUTION
TABLE hr.t1;

Add the pump group:
GGSCI> ADD EXTRACT dpe_test, EXTTRAILSOURCE ./dirdat/te

Define the pump's remote trail file:
GGSCI> ADD RMTTRAIL ./dirdat/te, EXTRACT dpe_test, MEGABYTES 200
g. Start the capture processes
GGSCI> start extract ext_test
GGSCI> start extract dpe_test

Check the status; output like the following is correct:
GGSCI> info all
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
MANAGER     RUNNING
EXTRACT     RUNNING     DPE_TEST    00:00:00      00:00:03
EXTRACT     RUNNING     EXT_TEST    00:00:00      00:00:01
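Checking info all by eye does not scale to unattended monitoring. As a minimal sketch, the output can be piped through a filter that flags any process whose status is not RUNNING (the column layout assumed here matches the listing above):

```shell
# gg_unhealthy - read "info all" output on stdin and print any MANAGER/
# EXTRACT/REPLICAT line whose status is not RUNNING; exit 1 if one is found.
gg_unhealthy() {
  awk '$1 ~ /^(MANAGER|EXTRACT|REPLICAT)$/ && $2 != "RUNNING" {
         print $1, $2, $3
         bad = 1
       }
       END { exit (bad ? 1 : 0) }'
}

# Example:
# echo "info all" | ./ggsci | gg_unhealthy || echo "an OGG process is down"
```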
4 Interface Server (reps) Installation and Configuration
1) Install OGG for Big Data
a. As on the source side, unpack the OGG for Big Data software into $GG_HOME
b. /etc/hosts file
[root@reps etc]# cd /etc
[root@reps etc]# cat hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.60.89 reps
192.168.60.91 kafka01
192.168.60.92 kafka02
192.168.60.93 kafka03
c. Install JDK 1.8 or later
OGG for Big Data 12.2.x requires JDK 1.8 or later; I used 1.8.0_121:
[ogg@reps ogg]$ java -version
java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)
d. Create the ogg user, configure its environment variables, and install the Kafka software
[root@reps etc]# su - ogg
[ogg@reps ~]$ id
uid=500(ogg) gid=501(ogg) groups=501(ogg)
[ogg@reps ~]$ cat .bash_profile
# .bash_profile

# Get the aliases and functions
if [ -f ~/.bashrc ]; then
        . ~/.bashrc
fi

# User specific environment and startup programs
PATH=$PATH:$HOME/bin
export PATH
export JAVA_HOME=/usr/java/jdk1.8.0_121
export OGG_HOME=/ogg
export PATH=$PATH:$OGG_HOME
export LD_LIBRARY_PATH=$OGG_HOME:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64

[ogg@reps ~]$ ls -l
total 8
drwxrwxr-x 2 ogg ogg 4096 Apr 11 22:56 install
drwxr-xr-x 6 ogg ogg 4096 Feb 15 01:28 kafka    --the kafka tarball is unpacked here; you can also copy this directory from one of the kafka hosts
[ogg@reps ~]$ cd /ogg
[ogg@reps ogg]$ ls -l ggsci
-rwxr-x--- 1 ogg ogg 39120528 Oct 20 07:05 ggsci
2) Configure OGG for Kafka
a. Start ggsci and create the subdirectories
./ggsci
GGSCI> create subdirs
b. Copy the examples
cp $OGG_HOME/AdapterExamples/big-data/kafka/* $OGG_HOME/dirprm/
c. Configure the manager
GGSCI> edit params mgr

Enter the following:
PORT 7809
DYNAMICPORTLIST 7810-7860
AUTORESTART REPLICAT *, RETRIES 5, WAITMINUTES 3, RESETMINUTES 60
PURGEOLDEXTRACTS ./dirdat/*, usecheckpoints, minkeepdays 1
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
d. Configure kafka.props
The contents are as follows:
[ogg@reps dirprm]$ cat kafka.props
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.TopicName=oggtest
#gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.format=delimitedtext
gg.handler.kafkahandler.format.fieldDelimiter=|
gg.handler.kafkahandler.SchemaTopicName=myoggtest
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
#gg.handler.kafkahandler.maxGroupSize=100, 1Mb
#gg.handler.kafkahandler.minGroupSize=50, 500Kb

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec

#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/home/ogg/kafka/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
Notes:
gg.handler.kafkahandler.TopicName must name the topic defined on the Kafka side.
gg.handler.kafkahandler.format is set to delimitedtext with "|" as the field separator; the messages Kafka ends up receiving are in this delimited format.
gg.classpath must point to the Kafka client libraries.
e. Configure custom_kafka_producer.properties
The contents are as follows:
bootstrap.servers=kafka01:9092,kafka02:9092,kafka03:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=102400
linger.ms=10000
3) Passing the table definitions
On the source side:
[ogg@tdb1 ogg]$ ./ggsci
GGSCI> edit param defgen

Contents:
DEFSFILE dirdef/source.def, PURGE
USERID ogg, PASSWORD gg_888
TABLE hr.t1;

Then, from the shell:
[ogg@tdb1 ogg]$ ./defgen paramfile dirprm/defgen.prm
Copy the generated dirdef/source.def to /ogg/dirdef/ on the interface server (reps).
4) Define the replicat
a. Define the parameters
./ggsci
GGSCI> edit params rep_test

Enter the following:
REPLICAT rep_test
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
SOURCEDEFS dirdef/source.def
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP hr.*, TARGET hr.*;
b. Register the trail file
GGSCI> add replicat rep_test, exttrail ./dirdat/te
c. Start the replicat process and check its status
GGSCI> start replicat rep_test

Check the status; output like the following is normal:
GGSCI (reps) 1> info all
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
MANAGER     RUNNING
REPLICAT    RUNNING     REP_TEST    00:00:00      00:00:05
Note: if the replicat process will not start, run it directly from the shell to make diagnosis easier:
cd $OGG_HOME
./replicat paramfile dirprm/rep_test.prm
5 Testing and Verification
1) Start a Kafka console consumer
On any Kafka node:
./kafka-console-consumer.sh --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 --from-beginning --topic oggtest
2) Insert a row into the source test table
sqlplus hr/hr
SQL> insert into t1 values(5,'shihaifeng');
1 row created.
SQL> commit;
Commit complete.
3) Check whether the Kafka consumer console received the row
Mine shows:
I|HR.T1|2017-04-13 03:31:03.835147|2017-04-13T11:31:08.973000|00000000000000001478|5|shihaifeng
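A downstream consumer has to split this record on the "|" delimiter configured in kafka.props. The following is a minimal sketch of that parsing step; the field layout (operation type, schema.table, commit timestamp, capture timestamp, position, then the column values) is inferred from this single sample record, so treat it as an assumption:

```shell
# Split one delimitedtext record into fields on "|".
record='I|HR.T1|2017-04-13 03:31:03.835147|2017-04-13T11:31:08.973000|00000000000000001478|5|shihaifeng'

IFS='|' read -r op table commit_ts capture_ts pos id name <<EOF
$record
EOF

echo "op=$op table=$table id=$id name=$name"
# prints: op=I table=HR.T1 id=5 name=shihaifeng
```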