首页  

DolphinScheduler 调度 DataX 实现 MySQL To MySQL 增量数据同步     所属分类 dolphinscheduler 浏览量 42
MySQL库A 到 MySQL库B的增量数据同步

DolphinScheduler中配置DataX MySQL To MySQL工作流

工作流定义 > 创建工作流 > 拖入1个SHELL组件 > 拖入1个DATAX组件

SHELL组件
脚本
echo 'DolphinScheduler 调度 DataX 实现 MySQL To MySQL 增量数据同步 '

DATAX组件 
用到2个插件mysqlreader mysqlwriter 
选 自定义模板:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/数据库A?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"
                                ],
                                "querySql": [
                                    "select a.id,a.title,a.content,a.is_delete,a.delete_date,a.create_date,a.update_date from t_article a.update_date >= '${biz_update_dt}';"
                                ]
                            }
                        ],
                        "password": "${biz_mysql_password}",
                        "username": "${biz_mysql_username}"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                            "`id`",
                            "`title`",
                            "`content`",
                            "`is_delete`",
                            "`delete_date`",
                            "`create_date`",
                            "`update_date`"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/数据库B?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false",
                                "table": [
                                    "t_article"
                                ]
                            }
                        ],
                        "writeMode": "replace",
                        "password": "${biz_mysql_password}",
                        "username": "${biz_mysql_username}"
                    }
                }
            }
        ],
        "setting": {
            "errorLimit": {
                "percentage": 0,
                "record": 0
            },
            "speed": {
                "channel": 1,
                "record": 1000
            }
        }
    }
}
reader和writer的字段配置需保持一致

自定义参数:

biz_update_dt: ${global_bizdate}
biz_mysql_host: xxx
biz_mysql_port: 3306
biz_mysql_username:  mysql账号
biz_mysql_password: mysql密码

本文实验环境A库和B库用的同一个实例,如果MySQL是多个实例,可以新增加参数定义例如 biz_mysql_host_b,
在模板中对应引用即可

配置的自定义参数将会自动替换json模板中的同名变量


reader mysqlreader插件中关键配置: 

a.update_date >= '${biz_update_dt}'  实现增量同步的关键配置

writer mysqlwriter插件中关键配置

"parameter": {
    "writeMode": "replace",
    ......
}
writeMode为replace,相同主键id重复写入数据,就会更新数据。
sql本质上执行的是 replace into

保存工作流
全局变量设置
global_bizdate: $[yyyy-MM-dd 00:00:00-1]

global_bizdate 引用的变量为 DolphinScheduler 内置变量,具体参考官网文档

结合调度时间设计好时间滚动的窗口时长,比如按1天增量,那么这里时间就是减1天


参考 1. DataX MysqlReader 插件文档 https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md 2. DataX MysqlWriter 插件文档 https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md 3. Apache DolphinScheduler 内置参数 https://dolphinscheduler.apache.org/zh-cn/docs/3.2.2

上一篇     下一篇
《腾讯传》笔记

文件权限设置报错 UnsupportedOperationException

《思考快与慢》 笔记

《金融心理学》笔记

Dbsyncer 开源多数据库同步利器

Quartz Cron 表达式测试