DataX synchronization tool
https://github.com/alibaba/DataX/tree/master
https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz
Install DataX
# Download: http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
# Source code: https://github.com/alibaba/DataX
# wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz
mkdir /opt/module -p
tar -zxvf datax.tar.gz -C /opt/module/
# Run the self-check job
cd /opt/module/datax/bin/
python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json
# Common configuration
# Default JVM options defined in bin/datax.py:
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
MysqlWriter-writeMode
Description: controls whether data is written to the target table with insert into, replace into, or ON DUPLICATE KEY UPDATE statements.
Options: insert / replace / update
Default: insert
insert into... (rows that hit a primary-key or unique-index conflict are not written)
replace into... (behaves like insert into when there is no primary-key/unique-index conflict; on conflict the existing row is replaced, all fields, by the new row)
For performance, MysqlWriter uses PreparedStatement + batch writes with rewriteBatchedStatements=true,
buffering rows in a per-thread buffer and only issuing a write request once the buffer reaches a preset threshold.
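For illustration, a minimal mysqlwriter fragment that switches writeMode to "update" (ON DUPLICATE KEY UPDATE semantics); the database, table, columns, and credentials are placeholders:
"writer": {
  "name": "mysqlwriter",
  "parameter": {
    "username": "root",
    "password": "******",
    "column": ["id", "name"],
    "writeMode": "update",
    "connection": [
      {
        "jdbcUrl": "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
        "table": ["mytable"]
      }
    ]
  }
}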
HdfsWriter-writeMode
Description: how hdfswriter cleans up existing data before writing:
append: no pre-processing; DataX hdfswriter writes directly using fileName and guarantees the file names do not conflict.
nonConflict: if files prefixed with fileName already exist in the directory, fail with an error.
truncate: if files prefixed with fileName exist in the directory, delete them first, then write.
"splitPk": "" - the split key; if set, DataX starts multiple tasks to sync the data in parallel;
if not set, only a single task runs. The parameter only takes effect in table mode; querySql mode always runs a single task.
Commonly appended jdbcUrl options: "?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
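A minimal mysqlreader fragment showing splitPk together with the jdbcUrl options above (splitPk is usually an integer primary key; database, table, and credentials are placeholders):
"reader": {
  "name": "mysqlreader",
  "parameter": {
    "username": "root",
    "password": "******",
    "column": ["id", "name"],
    "splitPk": "id",
    "connection": [
      {
        "jdbcUrl": ["jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8&useSSL=false"],
        "table": ["mytable"]
      }
    ]
  }
}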
Common rate-limiting configuration
"core": {
"transport": {
"channel": {
"speed": {
"byte": 2097152
}
}
}
},
"job": {
"setting": {
"speed": {
"byte": 10485760
},
"errorLimit": {
"percentage": 0.2
}
},
python datax.py --jvm="-Xms3G -Xmx3G" ../job/test.json
DataX tuning
Key parameters
job.setting.speed.channel: number of concurrent channels
job.setting.speed.record: global record limit across all channels
job.setting.speed.byte: global byte limit across all channels
core.transport.channel.speed.record: per-channel record limit
core.transport.channel.speed.byte: per-channel byte limit
### Optimization 1: increase the speed of each channel
DataX enforces strict speed control on every channel, in two forms: records synced per second and bytes synced per second. The default limit is 1 MB/s.
You can raise the byte or record limit to match your hardware; usually the byte limit is the one to adjust, e.g. raising the per-channel limit to 5 MB/s, as sketched below.
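A sketch of that setting, raising the per-channel limit to 5 MB/s (5 * 1024 * 1024 = 5242880 bytes):
{
  "core": {
    "transport": {
      "channel": {
        "speed": {
          "byte": 5242880
        }
      }
    }
  }
}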
### Optimization 2: increase channel concurrency within a DataX job
Concurrency = number of taskGroups * number of tasks each TaskGroup runs concurrently (5 by default).
There are three ways to raise channel concurrency within a job:
# Configure a global byte limit together with a per-channel byte limit
Number of channels = global byte limit / per-channel byte limit
With core.transport.channel.speed.byte=1048576 and job.setting.speed.byte=5242880, the number of channels = global byte limit / per-channel byte limit = 5242880 / 1048576 = 5, as in the sketch below.
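A sketch of that byte-based combination (the record-based equivalent follows below):
{
  "core": {
    "transport": {
      "channel": {
        "speed": {
          "byte": 1048576
        }
      }
    }
  },
  "job": {
    "setting": {
      "speed": {
        "byte": 5242880
      }
    },
    ...
  }
}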
# Configure a global record limit together with a per-channel record limit
Number of channels = global record limit / per-channel record limit
{
"core": {
"transport": {
"channel": {
"speed": {
"record": 100
}
}
}
},
"job": {
"setting": {
"speed": {
"record" : 500
}
},
...
}
}
With core.transport.channel.speed.record=100 and job.setting.speed.record=500, the number of channels = global record limit / per-channel record limit = 500 / 100 = 5.
# Configure the channel count directly
This only takes effect when neither of the two limits above is set; if both limits are set, the smaller of the two computed values becomes the final channel count.
{
"job": {
"setting": {
"speed": {
"channel" : 5
}
},
...
}
}
With job.setting.speed.channel=5 configured directly, the job runs 5 channels concurrently.
### Optimization 3: increase the JVM heap
When you raise the channel concurrency of a DataX job, memory usage grows noticeably, because DataX, as a data exchange channel, buffers a fair amount of data in memory.
For example, each channel holds a buffer used as a temporary staging area for the data being exchanged, and some readers and writers keep buffers of their own.
To avoid OOM and similar errors, increase the JVM heap.
A heap of 4 GB or 8 GB is a reasonable starting point; adjust it to the actual workload.
There are two ways to adjust the JVM -Xms/-Xmx settings: edit the datax.py script directly, or pass the options at launch time, as follows:
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" XXX.json
Batch size (batchSize) has a large impact on write performance: once batchSize >= 512, a single thread can reach roughly 10,000 rows written per second.
On top of batchSize >= 512, throughput grows roughly linearly with the number of channels (for channel counts < 32).
Going beyond 32 channels when writing to a database is generally not recommended; see the writer fragment below for where batchSize is set.
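A hedged mysqlwriter fragment showing where batchSize is configured (values are illustrative; the plugin's documented default is 1024):
"writer": {
  "name": "mysqlwriter",
  "parameter": {
    "username": "root",
    "password": "******",
    "column": ["id", "name"],
    "batchSize": 1024,
    "writeMode": "insert",
    "connection": [
      {
        "jdbcUrl": "jdbc:mysql://localhost:3306/mydb",
        "table": ["mytable"]
      }
    ]
  }
}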
Parameter                              Description
job.setting.speed.channel              channel concurrency
job.setting.speed.record               global record limit (tps: records/s)
job.setting.speed.byte                 global byte limit (bps: bytes/s)
core.transport.channel.speed.record    per-channel record limit, default 10000 (10,000 records/s)
core.transport.channel.speed.byte      per-channel byte limit, default 1024*1024 (1 MB/s)
Notes:
1. If a global record limit is configured, the per-channel record limit must also be configured.
2. If a global byte limit is configured, the per-channel byte limit must also be configured.
3. If both a global record limit and a global byte limit are configured, the channel concurrency setting is ignored, because the actual channel concurrency is then computed as:
min(global byte limit / per-channel byte limit, global record limit / per-channel record limit)
Example configuration:
{
"core": {
"transport": {
"channel": {
"speed": {
"byte": 2097152 //单个channel byte限速2M/s
}
}
}
},
"job": {
"setting": {
"speed": {
"byte" : 10485760 //总byte限速10M/s
}
},
...
}
}
DataX full import to HDFS example
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "username",
"password": "password",
"connection": [
{
"table": [
"mytable"
],
"jdbcUrl": [
"jdbc:mysql://localhost:3306/mydb"
]
}
]
}
},
"writer": {
"name": "hivewriter",
"parameter": {
"defaultFS": "hdfs://namenode:8020",
"fileType": "orc",
"path": "/user/hive/warehouse/mytable",
"column": [
// 列名列表
],
"writeMode": "overwrite"
}
}
}
]
}
}
DataX incremental import to HDFS example
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "username",
"password": "password",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://localhost:3306/mydb"
],
"querySql": [
"SELECT * FROM mytable WHERE update_time > '${startTime}'"
]
}
]
}
},
"writer": {
"name": "hivewriter",
"parameter": {
"defaultFS": "hdfs://namenode:8020",
"fileType": "orc",
"path": "/user/hive/warehouse/mytable_incremental",
"column": [
// 列名列表
],
"writeMode": "append"
}
}
}
]
}
}
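The ${startTime} placeholder in the querySql above can be supplied at launch time through datax.py's -p option; the job file name below is just an assumption:
python /opt/module/datax/bin/datax.py -p "-DstartTime=2024-01-01" mysql2hdfs_incremental.json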
Read data from a stream reader and print it to the console
# View the configuration template
python datax.py -r streamreader -w streamwriter
vim stream2stream.json
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
datax.py stream2stream.json
Read MySQL data and store it in HDFS
# View the configuration template
python datax.py -r mysqlreader -w hdfswriter
vim mysql2hdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop102:3306/datax"
],
"table": [
"student"
]
}
],
"username": "root",
"password": "000000"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
],
"defaultFS": "hdfs://hadoop102:9000",
"fieldDelimiter": "\t",
"fileName": "student.txt",
"fileType": "text",
"path": "/",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
datax.py mysql2hdfs.json
HDFS HA support
"hadoopConfig":{
"dfs.nameservices": "ns",
"dfs.ha.namenodes.ns": "nn1,nn2",
"dfs.namenode.rpc-address.ns.nn1": "主机名:端口",
"dfs.namenode.rpc-address.ns.nn2": "主机名:端口",
"dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}
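A sketch of where the hadoopConfig block sits inside an hdfswriter parameter section for an HA cluster; the nameservice "ns", host names, and ports are placeholders:
"writer": {
  "name": "hdfswriter",
  "parameter": {
    "defaultFS": "hdfs://ns",
    "hadoopConfig": {
      "dfs.nameservices": "ns",
      "dfs.ha.namenodes.ns": "nn1,nn2",
      "dfs.namenode.rpc-address.ns.nn1": "hadoop102:8020",
      "dfs.namenode.rpc-address.ns.nn2": "hadoop103:8020",
      "dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    },
    "fieldDelimiter": "\t",
    "fileName": "student.txt",
    "fileType": "text",
    "path": "/",
    "column": [
      {"name": "id", "type": "int"},
      {"name": "name", "type": "string"}
    ],
    "writeMode": "append"
  }
}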
Read HDFS data and write it to MySQL
# View the official template
python datax.py -r hdfsreader -w mysqlwriter
vim hdfs2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"column": ["*"],
"defaultFS": "hdfs://hadoop102:9000",
"encoding": "UTF-8",
"fieldDelimiter": "\t",
"fileType": "text",
"path": "/student.txt"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop102:3306/datax",
"table": ["student2"]
}
],
"password": "000000",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
datax.py hdfs2mysql.json
Read Oracle data and store it in MySQL
// vim oracle2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "oraclereader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": ["jdbc:oracle:thin:@hadoop102:1521:orcl"],
"table": ["student"]
}
],
"password": "000000",
"username": "atguigu"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop102:3306/oracle",
"table": ["student"]
}
],
"password": "000000",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
datax.py oracle2mysql.json
Read Oracle data and store it in HDFS
// vim oracle2hdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "oraclereader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": ["jdbc:oracle:thin:@hadoop102:1521:orcl"],
"table": ["student"]
}
],
"password": "000000",
"username": "atguigu"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
],
"defaultFS": "hdfs://hadoop102:9000",
"fieldDelimiter": "\t",
"fileName": "oracle.txt",
"fileType": "text",
"path": "/",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
datax.py oracle2hdfs.json
Import MongoDB data into HDFS
// vim mongdb2hdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": ["127.0.0.1:27017"],
"collectionName": "atguigu",
"column": [
{
"name":"name",
"type":"string"
},
{
"name":"url",
"type":"string"
}
],
"dbName": "test",
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name":"name",
"type":"string"
},
{
"name":"url",
"type":"string"
}
],
"defaultFS": "hdfs://hadoop102:9000",
"fieldDelimiter": "\t",
"fileName": "mongo.txt",
"fileType": "text",
"path": "/",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
datax.py mongdb2hdfs.json
Import MongoDB data into MySQL
// vim mongodb2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": ["127.0.0.1:27017"],
"collectionName": "atguigu",
"column": [
{
"name":"name",
"type":"string"
},
{
"name":"url",
"type":"string"
}
],
"dbName": "test",
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop102:3306/test",
"table": ["atguigu"]
}
],
"password": "000000",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
datax.py mongodb2mysql.json
Import SQL Server data into HDFS
// vim sqlserver2hdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "sqlserverreader",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": [
"jdbc:sqlserver://hadoop2:1433;DatabaseName=datax"
],
"table": [
"student"
]
}
],
"username": "root",
"password": "000000"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
],
"defaultFS": "hdfs://hadoop102:9000",
"fieldDelimiter": "\t",
"fileName": "sqlserver.txt",
"fileType": "text",
"path": "/",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
datax.py sqlserver2hdfs.json
Import SQL Server data into MySQL
// vim sqlserver2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "sqlserverreader",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": [
"jdbc:sqlserver://hadoop2:1433;DatabaseName=datax"
],
"table": [
"student"
]
}
],
"username": "root",
"password": "000000"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop102:3306/datax",
"table": ["student"]
}
],
"password": "000000",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
datax.py sqlserver2mysql.json
DB2
# Register the DB2 driver
# DataX has no dedicated DB2 plugin for now; use the generic rdbmsreader / rdbmswriter instead.
1) Register the DB2 driver for the reader
vim /opt/module/datax/plugin/reader/rdbmsreader/plugin.json
# Add the DB2 driver class to the drivers list
"drivers":["dm.jdbc.driver.DmDriver", "com.sybase.jdbc3.jdbc.SybDriver", "com.edb.Driver", "com.ibm.db2.jcc.DB2Driver"]
2) Register the DB2 driver for the writer
vim /opt/module/datax/plugin/writer/rdbmswriter/plugin.json
# Add the DB2 driver class to the drivers list
"drivers":["dm.jdbc.driver.DmDriver", "com.sybase.jdbc3.jdbc.SybDriver", "com.edb.Driver", "com.ibm.db2.jcc.DB2Driver"]
Import DB2 data into HDFS
// vim job/db2-2-hdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "rdbmsreader",
"parameter": {
"column": [
"ID",
"NAME"
],
"connection": [
{
"jdbcUrl": [
"jdbc:db2://hadoop2:50000/sample"
],
"table": [
"STUDENT"
]
}
],
"username": "db2inst1",
"password": "atguigu"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
],
"defaultFS": "hdfs://hadoop102:9000",
"fieldDelimiter": "\t",
"fileName": "db2.txt",
"fileType": "text",
"path": "/",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
Import DB2 data into MySQL
// vim job/db2-2-mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "rdbmsreader",
"parameter": {
"column": [
"ID",
"NAME"
],
"connection": [
{
"jdbcUrl": [
"jdbc:db2://hadoop2:50000/sample"
],
"table": [
"STUDENT"
]
}
],
"username": "db2inst1",
"password": "atguigu"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop102:3306/datax",
"table": ["student"]
}
],
"password": "000000",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
MySQL-to-MySQL synchronization
https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md
{
"job": {
"setting": {
"speed": {
"channel": 8,
// "record": -1,
// "byte": -1,
"batchSize": 2048
},
"errorLimit": {
// "record": 10,
"percentage": 0.2
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age"
],
"connection": [
{
"querySql": [
"select id, name, age from user"
],
"jdbcUrl": [
"jdbc:mysql://localhost:3306/test?useSSL=false"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age"
],
"session": ["set session sql_mode='ANSI'"],
"preSql": ["delete from test_users2 "],
"writeMode": "replace",
"connection": [
{
"jdbcUrl": "jdbc:mysql://localhost:3306/test2?useSSL=false",
"table": [
"user"
]
}
]
}
}
}
]
}
}
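Assuming the job above is saved as mysql2mysql.json, run it the same way as the other examples:
python /opt/module/datax/bin/datax.py mysql2mysql.json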
MySQL incremental synchronization
Note: the official DataX distribution does not include a mysqlbinlogreader plugin; binlog-based CDC is usually handled by tools such as Canal or Flink CDC, and incremental sync with plain DataX is normally done via a querySql filter (see the incremental HDFS example above), so treat the job below as a sketch.
Enable binlog in MySQL (my.cnf):
[mysqld]
log-bin=mysql-bin    # enable binlog
binlog-format=ROW    # use ROW binlog format
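After restarting MySQL, confirm that binlog is enabled (the client invocation is only an illustration):
mysql -u root -p -e "SHOW VARIABLES LIKE 'log_bin';"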
vim mysql_binlog_sync.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlbinlogreader",
"parameter": {
"username": "yourUsername", // MySQL 用户名
"password": "yourPassword", // MySQL 密码
"database": "yourDatabase", // 数据库名
"table": ["yourTable"], // 要同步的表
"connection": [
{
"jdbcUrl": "jdbc:mysql://localhost:3306/yourDatabase",
"querySql": ["SELECT * FROM yourTable WHERE ID > {$last_id}"] // 增量查询SQL
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "yourUsername", // 目标数据库用户名
"password": "yourPassword", // 目标数据库密码
"database": "targetDatabase", // 目标数据库名
"table": ["targetTable"], // 目标表名
"preSql": [
"SET FOREIGN_KEY_CHECKS = 0", // 关闭外键检查
"SET UNIQUE_CHECKS = 0" // 关闭唯一检查
],
"postSql": [
"SET FOREIGN_KEY_CHECKS = 1", // 恢复外键检查
"SET UNIQUE_CHECKS = 1" // 恢复唯一检查
]
}
}
}
],
"settings": {
"errorLimit": {
"record": 0,
"percentage": 0.02
},
"speed": {
"channel": 3
}
}
}
}
Start DataX and run the job configuration
python /path/to/datax/bin/datax.py /path/to/your/mysql_binlog_sync.json