Flink CDC real-time sync (Oracle to MySQL)

Flink CDC 2.1, released on November 15, 2021, added support for Oracle through its embedded Debezium component.

Flink download page:

https://flink.apache.org/downloads/

Other required jars (the CDC connector, the JDBC connector, and the MySQL and Oracle drivers)

After downloading Flink, simply extract it into the target directory:

tar zxvf flink-1.20.0-bin-scala_2.12.tgz

Put all the required jars in the lib directory; in my setup that is /u01/flink-1.20.0/lib.
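Because a missing jar is the most common cause of errors in this setup, a quick check of the lib directory can save time. The POSIX shell function below is a minimal sketch; the file-name patterns are my assumption about typical artifact names (they are not taken from this setup), so adjust them to the jars and versions you actually downloaded.

```sh
# Sketch: report which of the jars this tutorial needs are missing from Flink's lib directory.
# ASSUMPTION: the patterns are typical artifact file names, not an official list.
check_flink_cdc_jars() {
  lib_dir=$1
  missing=0
  for pattern in 'flink-sql-connector-oracle-cdc-*.jar' \
                 'flink-connector-jdbc*.jar' \
                 'mysql-connector-j*.jar' \
                 'ojdbc*.jar'; do
    found=0
    # $pattern is left unquoted so the shell expands the glob inside lib_dir
    for f in "$lib_dir"/$pattern; do
      # an unmatched glob stays as literal text, so check that the file really exists
      if [ -e "$f" ]; then
        found=1
        break
      fi
    done
    if [ "$found" -eq 0 ]; then
      echo "missing: $pattern"
      missing=1
    fi
  done
  return "$missing"
}
```

For example, `check_flink_cdc_jars /u01/flink-1.20.0/lib` prints one `missing: <pattern>` line per absent jar and returns non-zero if anything is missing. Jars in lib are only picked up when the JobManager, TaskManager and SQL client JVMs start, so restart the cluster after adding one.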

Start Flink:

[root@gcv-b-test-gmes-oracle bin]# /u01/flink-1.20.0/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host gcv-b-test-gmes-oracle.
Starting taskexecutor daemon on host gcv-b-test-gmes-oracle.
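To confirm that both daemons actually came up, you can look for their main classes in the `jps` output (jps ships with the JDK). A minimal sketch, assuming a standalone session cluster, where the JobManager runs as `StandaloneSessionClusterEntrypoint` and the TaskManager as `TaskManagerRunner`:

```sh
# Sketch: read process-list output (jps or ps -ef) on stdin and check for both Flink daemons.
flink_daemons_running() {
  # read the whole listing once so it can be searched for each class name
  jps_out=$(cat)
  status=0
  for cls in StandaloneSessionClusterEntrypoint TaskManagerRunner; do
    # -w matches the class name as a whole word, also inside fully qualified names
    if ! printf '%s\n' "$jps_out" | grep -qw "$cls"; then
      echo "not running: $cls"
      status=1
    fi
  done
  return "$status"
}
```

Run it as `jps | flink_daemons_running`; since `ps -ef` lines contain the same class names, `ps -ef | flink_daemons_running` works too when jps is unavailable. If a daemon is missing, its log under /u01/flink-1.20.0/log usually explains why.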

 

To view Flink through the web UI from another machine, edit the configuration file (/u01/flink-1.20.0/conf/config.yaml):

rest:
  address: 10.240.12.219
  bind-address: 0.0.0.0
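If you prefer not to edit the file by hand, the two keys can be patched with awk. This is a sketch that assumes the nested layout of the default config.yaml shipped with Flink 1.19+, where `address` and `bind-address` are indented under a top-level `rest:` line; it prints the result instead of editing in place, and it leaves flat keys such as `rest.address:` untouched.

```sh
# Sketch: print config.yaml ($1) with rest.address set to $2 and rest.bind-address set to $3.
# ASSUMPTION: both keys are indented under a top-level "rest:" line.
set_rest_address() {
  awk -v addr="$2" -v bind="$3" '
    # any non-indented, non-comment line starts a new top-level section
    /^[^ \t#]/ { section = $0 }
    section ~ /^rest:/ && /^[ \t]+address:/      { sub(/address:.*/, "address: " addr) }
    section ~ /^rest:/ && /^[ \t]+bind-address:/ { sub(/bind-address:.*/, "bind-address: " bind) }
    { print }
  ' "$1"
}
```

For example: `set_rest_address /u01/flink-1.20.0/conf/config.yaml 10.240.12.219 0.0.0.0 > /tmp/config.yaml`, compare the two files with diff, copy the new one back, then restart with `bin/stop-cluster.sh` and `bin/start-cluster.sh`. Keys such as `jobmanager.rpc.address` that also use the name `address` are left alone because they sit in other sections.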

 

Log in to the web UI (it listens on port 8081 by default, e.g. http://10.240.12.219:8081):

 

Configure Oracle:

Archive logging (ARCHIVELOG mode) must be enabled; look up the steps for your Oracle version.
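For reference, the Flink CDC Oracle connector documentation enables archiving with a sequence like the one printed by the function below. This sketch only emits the statements; the recovery-area path and the 10G default size are placeholders I chose, and `shutdown immediate` restarts the database, so do not run it against a busy instance.

```sh
# Sketch: print SQL*Plus statements that switch Oracle to ARCHIVELOG mode.
# $1 = recovery area directory (placeholder), $2 = its size limit (default 10G, placeholder).
# WARNING: "shutdown immediate" restarts the database.
archivelog_sql() {
  dest=$1
  size=${2:-10G}
  cat <<EOF
alter system set db_recovery_file_dest_size = ${size};
alter system set db_recovery_file_dest = '${dest}' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
archive log list;
EOF
}
```

For example, as the oracle OS user: `archivelog_sql /u01/oradata/recovery_area | sqlplus / as sysdba`. The final `archive log list` should report `Archive Mode`.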

-- Test user and table
create user flink identified by "123456";
grant connect, resource to flink;
create table flink.user_info(id number primary key, name varchar2(100), age number);
-- Enable database-level supplemental logging
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- Enable supplemental logging of all columns for this table
ALTER TABLE flink.user_info ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Create a tablespace for CDC (LogMiner) use
CREATE TABLESPACE logminer_tbs DATAFILE '/u01/oradata/sharedb/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
-- Create the flinkuser replication user
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
 GRANT CREATE SESSION TO flinkuser;
 GRANT SET CONTAINER TO flinkuser;
 GRANT SELECT ON V_$DATABASE to flinkuser;
 GRANT FLASHBACK ANY TABLE TO flinkuser;
 GRANT SELECT ANY TABLE TO flinkuser;
 GRANT SELECT_CATALOG_ROLE TO flinkuser;
 GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
 GRANT SELECT ANY TRANSACTION TO flinkuser;
 GRANT LOGMINING TO flinkuser;
 GRANT ANALYZE ANY TO flinkuser;
 GRANT CREATE TABLE TO flinkuser;
 -- not required when scan.incremental.snapshot.enabled=true (the default)
 GRANT LOCK ANY TABLE TO flinkuser;
 GRANT ALTER ANY TABLE TO flinkuser;
 GRANT CREATE SEQUENCE TO flinkuser;
 GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
 GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
 GRANT SELECT ON V_$LOG TO flinkuser;
 GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
 GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
 GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
 GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
 GRANT SELECT ON V_$LOGFILE TO flinkuser;
 GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
 GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
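When more than one table needs to be synced, the per-table supplemental-logging statement has to be repeated for each of them. A minimal sketch that generates these statements, upper-casing the names because Oracle stores unquoted identifiers in upper case, and appends a query on `v$database` to confirm that database-level supplemental logging is on (`supplemental_log_data_min` should be `YES`).

```sh
# Sketch: print ALTER TABLE ... ADD SUPPLEMENTAL LOG DATA statements for tables in one schema.
# Usage: supplemental_log_sql SCHEMA TABLE [TABLE ...]
supplemental_log_sql() {
  # upper-case because Oracle stores unquoted identifiers in upper case
  schema=$(printf '%s' "$1" | tr '[:lower:]' '[:upper:]')
  shift
  for table in "$@"; do
    table=$(printf '%s' "$table" | tr '[:lower:]' '[:upper:]')
    echo "ALTER TABLE ${schema}.${table} ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;"
  done
  # database-level minimal supplemental logging should report YES
  echo "SELECT supplemental_log_data_min FROM v\$database;"
}
```

For example, `supplemental_log_sql flink user_info orders | sqlplus / as sysdba`, where `orders` is a hypothetical second table.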

  

 

 

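Start the SQL client. (The SQL Client aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of Java or Scala code. The SQL Client CLI allows for retrieving and visualizing real-time results from the running distributed application on the command line.)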

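As I understand it, it does the coding for you: the SQL you type is turned into a Flink job, so you don't have to write any Java code yourself.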

[root@gcv-b-test-gmes-oracle conf]# /u01/flink-1.20.0/bin/sql-client.sh
 Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Command history file path: /root/.flink-sql-history
Flink SQL>

  

Configure the Oracle connector:

CREATE TABLE user_info (
 ID INT NOT NULL,
 NAME STRING,
 AGE int,
 PRIMARY KEY(ID) NOT ENFORCED
 ) WITH (
 'connector' = 'oracle-cdc',
 'hostname' = '10.240.12.219',
 'port' = '1521',
 'username' = 'flinkuser',
 'password' = 'flinkpw',
 'database-name' = 'sharedb',
 'schema-name' = 'FLINK',
 'table-name' = 'USER_INFO',
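 'debezium.log.mining.strategy' = 'online_catalog',
 'debezium.log.mining.continuous.mine' = 'true'
 );

Pitfall: the column names here must be uppercase. Oracle stores unquoted identifiers in uppercase and the connector matches column names case-sensitively, so if the case does not match, the columns are not recognized and querying the table fails with:
org.apache.flink.table.api.TableException: Column 'id' is NOT NULL, however, a null value is being written into it.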
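To avoid the case-sensitivity pitfall, it helps to copy the column names straight from Oracle's data dictionary rather than typing them. A small sketch that prints such a query against `all_tab_columns`; it upper-cases the owner and table name, which assumes they were created without double quotes.

```sh
# Sketch: print a query listing a table's columns exactly as Oracle stores them.
# Usage: oracle_columns_sql OWNER TABLE (names are upper-cased, i.e. assumed unquoted)
oracle_columns_sql() {
  owner=$(printf '%s' "$1" | tr '[:lower:]' '[:upper:]')
  table=$(printf '%s' "$2" | tr '[:lower:]' '[:upper:]')
  cat <<EOF
SELECT column_name, data_type, nullable
  FROM all_tab_columns
 WHERE owner = '${owner}' AND table_name = '${table}'
 ORDER BY column_id;
EOF
}
```

For example: `oracle_columns_sql flink user_info | sqlplus -S flinkuser/flinkpw@10.240.12.219:1521/sharedb` (the EZConnect string assumes `sharedb` is the service name). Use the returned `COLUMN_NAME` values verbatim in the Flink DDL.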

  

Query the data:

Flink SQL> select * from user_info;
[INFO] Result retrieval cancelled.

  

 

The MySQL table to sync into:

create database flink;
CREATE TABLE `user_info` (
 `id` bigint(20) NOT NULL AUTO_INCREMENT,
 `name` varchar(100) NOT NULL,
 `age` bigint(20) NOT NULL,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

 

Configure the MySQL sink in sql-client:

CREATE TABLE user_info_mysql (
 id INT NOT NULL,
 name STRING,
 age int,
 PRIMARY KEY(id) NOT ENFORCED
 ) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://10.251.93.3:3306/flink',
 'driver' = 'com.mysql.cj.jdbc.Driver',
 'username' = 'xxxx',
 'password' = 'xxxx',
 'table-name' = 'user_info');
Insert into user_info_mysql select * from user_info;
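Statements typed interactively are lost when the client exits. One way to keep the job reproducible is to save all of them in a file and submit it with `sql-client.sh -f`. The sketch below writes such a file; the `SET 'execution.checkpointing.interval'` line is borrowed from the Flink CDC Oracle tutorial rather than from this setup, and the credentials are the placeholders used above.

```sh
# Sketch: write the statements of this tutorial to the file given as $1.
# The quoted 'EOF' keeps the shell from expanding anything inside the SQL.
write_sync_job() {
  cat > "$1" <<'EOF'
SET 'execution.checkpointing.interval' = '3s';

CREATE TABLE user_info (
  ID INT NOT NULL,
  NAME STRING,
  AGE INT,
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector' = 'oracle-cdc',
  'hostname' = '10.240.12.219',
  'port' = '1521',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'sharedb',
  'schema-name' = 'FLINK',
  'table-name' = 'USER_INFO',
  'debezium.log.mining.strategy' = 'online_catalog',
  'debezium.log.mining.continuous.mine' = 'true'
);

CREATE TABLE user_info_mysql (
  id INT NOT NULL,
  name STRING,
  age INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.251.93.3:3306/flink',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'xxxx',
  'password' = 'xxxx',
  'table-name' = 'user_info'
);

INSERT INTO user_info_mysql SELECT * FROM user_info;
EOF
}
```

For example: `write_sync_job /u01/flink-1.20.0/oracle_to_mysql.sql`, then `/u01/flink-1.20.0/bin/sql-client.sh -f /u01/flink-1.20.0/oracle_to_mysql.sql`. By default the INSERT is submitted asynchronously, so the job keeps running on the cluster after the client exits; you can follow it in the web UI.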

  

 

 

Verification:

Oracle: insert data

MySQL: check the synced data
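Instead of eyeballing the two result sets, you can dump both tables and compare them. A minimal sketch, assuming each dump has one row per line with whitespace-separated columns; it trims lines, drops blank ones, collapses whitespace and sorts before running diff, so values that themselves contain spaces are only compared loosely. Remember to COMMIT the Oracle insert, since CDC only picks up committed changes.

```sh
# Sketch: normalise a row dump so that sqlplus and mysql output can be compared.
normalize_rows() {
  # trim each line, drop blank lines, turn runs of spaces/tabs into one tab, then sort
  sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//' -e '/^$/d' "$1" | tr -s ' \t' '\t' | sort
}

# Sketch: print the differences between two dumps; returns 0 only if they hold the same rows.
same_rows() {
  a_tmp=$(mktemp)
  b_tmp=$(mktemp)
  normalize_rows "$1" > "$a_tmp"
  normalize_rows "$2" > "$b_tmp"
  diff "$a_tmp" "$b_tmp"
  rc=$?
  rm -f "$a_tmp" "$b_tmp"
  return "$rc"
}
```

For the MySQL side, `mysql -h 10.251.93.3 -u xxxx -p -N -B -e 'SELECT id, name, age FROM flink.user_info' > mysql_rows.txt` prints tab-separated rows without a header. For the Oracle side, run `SET HEADING OFF`, `SET FEEDBACK OFF` and `SET PAGESIZE 0` in SQL*Plus before spooling `SELECT id, name, age FROM flink.user_info;` to oracle_rows.txt. Then `same_rows oracle_rows.txt mysql_rows.txt` returns 0 when the two tables match.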

 

Overall it is not hard to get started, but it is easy to miss a jar, which then causes exceptions.

 

References:
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/flink-sources/tutorials/oracle-tutorial/
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/flink-sources/oracle-cdc/

 

Author: 阿西吧li  Original article: https://www.cnblogs.com/muzisanshi/p/18356836
