0. DataX Sync Tool

https://github.com/alibaba/DataX/tree/master

https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz

Install DataX

# Download: http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
# Source: https://github.com/alibaba/DataX

# wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz

mkdir /opt/module -p

tar -zxvf datax.tar.gz -C /opt/module/
# Run the self-check script
cd /opt/module/datax/bin/
python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json

# Common configuration: the default JVM options defined in bin/datax.py
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)

MysqlWriter-writeMode

Description: controls whether writes to the target table use insert into, replace into, or ON DUPLICATE KEY UPDATE statements.
Options: insert/replace/update
Default: insert
insert into... (rows that hit a primary-key/unique-index conflict fail to be written)
replace into... (behaves like insert into when there is no primary-key/unique-index conflict; on conflict, the new row replaces every field of the existing row).
For performance, MysqlWriter uses PreparedStatement + batching, with rewriteBatchedStatements=true set on the connection:
records are buffered in a thread-local Buffer, and a write request is issued only once the buffer reaches a preset threshold.
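For example, a minimal mysqlwriter parameter sketch (host, credentials, and table names are placeholders) selecting the update mode, which generates INSERT INTO ... ON DUPLICATE KEY UPDATE statements:

"writer": {
    "name": "mysqlwriter",
    "parameter": {
        "writeMode": "update",
        "username": "root",
        "password": "******",
        "column": ["id", "name"],
        "connection": [
            {
                "jdbcUrl": "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8",
                "table": ["target_table"]
            }
        ]
    }
}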

HdfsWriter-writeMode

Description: how hdfswriter handles existing data before writing:
append: no pre-processing; DataX hdfswriter writes directly using the given fileName, guaranteeing that file names do not conflict.
nonConflict: if the directory already contains files prefixed with fileName, fail with an error.
truncate: if the directory already contains files prefixed with fileName, delete them first, then write.

"splitPk": "", 分片字段,如果指定该字段,则DataX会启动多个Taask同步数据;
若未指定,则只会有单个Task。该参数只在TableMode模式下有效,QueryMode只会有单个Task。
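For example, a mysqlreader fragment (credentials and names are placeholders) that splits on an integer primary key, so the table is divided into ID ranges and synced by multiple Tasks in parallel:

"reader": {
    "name": "mysqlreader",
    "parameter": {
        "username": "root",
        "password": "******",
        "column": ["id", "name"],
        "splitPk": "id",
        "connection": [
            {
                "table": ["student"],
                "jdbcUrl": ["jdbc:mysql://localhost:3306/mydb"]
            }
        ]
    }
}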

Common JDBC URL parameters to append: ?useUnicode=true&characterEncoding=UTF-8&useSSL=false

Common rate-limiting configuration

    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 2097152
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "byte": 10485760
            },
            "errorLimit": {
                "percentage": 0.2
            }
        },

python datax.py --jvm="-Xms3G -Xmx3G" ../job/test.json
DataX usage tuning
Key parameters
job.setting.speed.channel: channel concurrency
job.setting.speed.record: global record rate limit across channels
job.setting.speed.byte: global byte rate limit across channels
core.transport.channel.speed.record: per-channel record rate limit
core.transport.channel.speed.byte: per-channel byte rate limit

### Optimization 1: increase per-channel speed
Internally, DataX applies strict speed control to each Channel, in two forms: a limit on records synced per second and a limit on bytes synced per second. The default limit is 1 MB/s.
Set the byte or record speed to match your hardware; byte speed is the usual choice. For example, raise the per-Channel limit to 5 MB/s.
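A fragment raising the per-channel byte limit to 5 MB/s (merge it into your job JSON; the rest of the job is elided):

{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 5242880
                }
            }
        }
    },
    "job": {
        ...
    }
}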

### Optimization 2: increase Channel concurrency within a DataX Job
Concurrency = number of taskGroups × number of Tasks each TaskGroup executes concurrently (5 by default).

## Three ways to configure Channel concurrency within a job:
# 1) Configure a global byte limit plus a per-channel byte limit
Number of channels = global byte limit / per-channel byte limit
With core.transport.channel.speed.byte=1048576 and job.setting.speed.byte=5242880: number of channels = global byte limit / per-channel byte limit = 5242880 / 1048576 = 5.
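The same calculation expressed as a config fragment (mirroring the record-limit example below):

{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 1048576
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "byte": 5242880
            }
        },
        ...
    }
}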

# 2) Configure a global record limit plus a per-channel record limit
Number of channels = global record limit / per-channel record limit
{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "record": 100
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "record" : 500
            }
        },
        ...
    }
}
With core.transport.channel.speed.record=100 and job.setting.speed.record=500: number of channels = global record limit / per-channel record limit = 500 / 100 = 5.

# 3) Configure the channel count directly
    This only takes effect when neither of the two limits above is set; if both limits are set, the smaller computed value becomes the final channel count.
{
    "job": {
        "setting": {
            "speed": {
                "channel" : 5
            }
        },
        ...
    }
}
With job.setting.speed.channel=5 configured directly, Channel concurrency within the job is 5.

### Optimization 3: increase the JVM heap
Raising Channel concurrency within a DataX Job significantly increases memory usage, because DataX, as a data exchange channel, buffers a large amount of data in memory.
For example, each Channel holds a Buffer as a temporary data-exchange area, and some Readers and Writers maintain buffers of their own.
To prevent OOM and similar errors, increase the JVM heap.
4 GB or 8 GB is a reasonable starting point; adjust to your actual environment.
There are two ways to set the JVM -Xms/-Xmx parameters: edit the datax.py script directly, or pass them at launch time, as follows:
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" XXX.json

Batch commit size (batchSize) strongly affects performance: once batchSize >= 512, single-thread write speed can reach about 10,000 rows per second.
With batchSize >= 512, throughput grows roughly linearly with the channel count (for channel counts < 32).
Going beyond 32 channels when writing to a database is generally not recommended.
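batchSize is a writer-side parameter (for mysqlwriter it defaults to 1024), set inside the writer's parameter block; a fragment, with the other writer parameters elided:

"writer": {
    "name": "mysqlwriter",
    "parameter": {
        "batchSize": 1024,
        ...
    }
}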

| Parameter | Description |
| --- | --- |
| job.setting.speed.channel | channel concurrency |
| job.setting.speed.record | global record limit (tps: records/s) |
| job.setting.speed.byte | global byte limit (bps: bytes/s) |
| core.transport.channel.speed.record | per-channel record limit, default 10000 (records/s) |
| core.transport.channel.speed.byte | per-channel byte limit, default 1024*1024 (1 MB/s) |

Notes:
1. If a global record limit is configured, a per-channel record limit must also be configured.
2. If a global byte limit is configured, a per-channel byte limit must also be configured.
3. If both a global record limit and a global byte limit are configured, the channel concurrency parameter is ignored, because the actual channel concurrency is then computed as:
min(global byte limit / per-channel byte limit, global record limit / per-channel record limit)
In the example below, that works out to 10485760 / 2097152 = 5 channels.

Example configuration:
{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 2097152  //单个channel byte限速2M/s
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "byte" : 10485760 //总byte限速10M/s
            }
        },
        ...
    }
}

DataX full-load import to HDFS example

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "username",
            "password": "password",
            "connection": [
              {
                "table": [
                  "mytable"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://localhost:3306/mydb"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "hivewriter",
          "parameter": {
            "defaultFS": "hdfs://namenode:8020",
            "fileType": "orc",
            "path": "/user/hive/warehouse/mytable",
            "column": [
              // 列名列表
            ],
            "writeMode": "overwrite"
          }
        }
      }
    ]
  }
}

DataX incremental import to HDFS example

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "username",
            "password": "password",
            "connection": [
              {
                "table": [
                  "mytable"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://localhost:3306/mydb"
                ],
                "querySql": [
                  "SELECT * FROM mytable WHERE update_time > '${startTime}'"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "hivewriter",
          "parameter": {
            "defaultFS": "hdfs://namenode:8020",
            "fileType": "orc",
            "path": "/user/hive/warehouse/mytable_incremental",
            "column": [
              // 列名列表
            ],
            "writeMode": "append"
          }
        }
      }
    ]
  }
}
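${startTime} is a job variable that DataX does not fill in by itself; it is substituted at launch through datax.py's -p option, for example (job file name and timestamp are placeholders):

python datax.py -p "-DstartTime='2024-01-01 00:00:00'" mysql2hdfs_incremental.json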

Read data from a stream (streamreader) and print it to the console

# View the config template
python datax.py -r streamreader -w streamwriter
vim stream2stream.json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 10,
            "column": [
              {
                "type": "long",
                "value": "10"
              },
              {
                "type": "string",
                "value": "hello,DataX"
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
       }
    }
  }
}

python datax.py stream2stream.json

Read data from MySQL and store it in HDFS

# View the config template
python datax.py -r mysqlreader -w hdfswriter
vim mysql2hdfs.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop102:3306/datax"
                                ], 
                                "table": [
                                    "student"
                                ]
                            }
                        ], 
                        "username": "root", 
                        "password": "000000"
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "int"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            }
                        ],  
                        "defaultFS": "hdfs://hadoop102:9000", 
                        "fieldDelimiter": "\t", 
                        "fileName": "student.txt", 
                        "fileType": "text", 
                        "path": "/", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

python datax.py mysql2hdfs.json

HDFS HA support

"hadoopConfig":{
  "dfs.nameservices": "ns",
  "dfs.ha.namenodes.ns": "nn1,nn2",
  "dfs.namenode.rpc-address.ns.nn1": "主机名:端口",
  "dfs.namenode.rpc-address.ns.nn2": "主机名:端口",
  "dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}
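These entries go inside the hdfswriter (or hdfsreader) parameter block, with defaultFS pointing at the nameservice instead of a single NameNode; a sketch with placeholder hostnames, other required parameters elided:

"writer": {
    "name": "hdfswriter",
    "parameter": {
        "defaultFS": "hdfs://ns",
        "hadoopConfig": {
            "dfs.nameservices": "ns",
            "dfs.ha.namenodes.ns": "nn1,nn2",
            "dfs.namenode.rpc-address.ns.nn1": "namenode1:8020",
            "dfs.namenode.rpc-address.ns.nn2": "namenode2:8020",
            "dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
        },
        ...
    }
}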

Read HDFS data and write it to MySQL

# View the official template
python datax.py -r hdfsreader -w mysqlwriter
vim hdfs2mysql.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader", 
                    "parameter": {
                        "column": ["*"], 
                        "defaultFS": "hdfs://hadoop102:9000", 
                        "encoding": "UTF-8", 
                        "fieldDelimiter": "\t", 
                        "fileType": "text", 
                        "path": "/student.txt"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop102:3306/datax", 
                                "table": ["student2"]
                            }
                        ], 
                        "password": "000000", 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

python datax.py hdfs2mysql.json

Read data from Oracle and store it in MySQL

// vim oracle2mysql.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "oraclereader",
                    "parameter": {
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:oracle:thin:@hadoop102:1521:orcl"],
                                "table": ["student"]
                            }
                        ],
                        "password": "000000",
                        "username": "atguigu"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop102:3306/oracle",
                                "table": ["student"]
                            }
                        ],
                        "password": "000000",
                        "username": "root",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

python datax.py oracle2mysql.json

Read Oracle data and store it in HDFS

// vim oracle2hdfs.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "oraclereader", 
                    "parameter": {
                        "column": ["*"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:oracle:thin:@hadoop102:1521:orcl"], 
                                "table": ["student"]
                            }
                        ], 
                        "password": "000000", 
                        "username": "atguigu"
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "int"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            }
                        ], 
                        "defaultFS": "hdfs://hadoop102:9000", 
                        "fieldDelimiter": "\t", 
                        "fileName": "oracle.txt", 
                        "fileType": "text", 
                        "path": "/", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

python datax.py oracle2hdfs.json

Read MongoDB data and import it into HDFS

// vim mongodb2hdfs.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader", 
                    "parameter": {
                        "address": ["127.0.0.1:27017"], 
                        "collectionName": "atguigu", 
                        "column": [
                            {
                                "name":"name",
                                "type":"string"
                            },
                            {
                                "name":"url",
                                "type":"string"
                            }
                        ], 
                        "dbName": "test", 
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name":"name",
                                "type":"string"
                            },
                            {
                                "name":"url",
                                "type":"string"
                            }
                        ], 
                        "defaultFS": "hdfs://hadoop102:9000", 
                        "fieldDelimiter": "\t", 
                        "fileName": "mongo.txt", 
                        "fileType": "text", 
                        "path": "/", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

python datax.py mongodb2hdfs.json

Read MongoDB data and import it into MySQL

// vim mongodb2mysql.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader", 
                    "parameter": {
                        "address": ["127.0.0.1:27017"], 
                        "collectionName": "atguigu", 
                        "column": [
                            {
                                "name":"name",
                                "type":"string"
                            },
                            {
                                "name":"url",
                                "type":"string"
                            }
                        ], 
                        "dbName": "test", 
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": ["*"], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop102:3306/test", 
                                "table": ["atguigu"]
                            }
                        ], 
                        "password": "000000", 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

python datax.py mongodb2mysql.json

Read SQL Server data and import it into HDFS

// vim sqlserver2hdfs.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "sqlserverreader", 
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:sqlserver://hadoop2:1433;DatabaseName=datax"
                                ], 
                                "table": [
                                    "student"
                                ]
                            }
                        ], 
                        "username": "root", 
                        "password": "000000"
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "int"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            }
                        ],  
                        "defaultFS": "hdfs://hadoop102:9000", 
                        "fieldDelimiter": "\t", 
                        "fileName": "sqlserver.txt", 
                        "fileType": "text", 
                        "path": "/", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

python datax.py sqlserver2hdfs.json

Read SQL Server data and import it into MySQL

// vim sqlserver2mysql.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "sqlserverreader", 
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:sqlserver://hadoop2:1433;DatabaseName=datax"
                                ], 
                                "table": [
                                    "student"
                                ]
                            }
                        ], 
                        "username": "root", 
                        "password": "000000"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": ["*"], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop102:3306/datax", 
                                "table": ["student"]
                            }
                        ], 
                        "password": "000000", 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

python datax.py sqlserver2mysql.json

db2

# Register the DB2 driver
# DataX has no dedicated DB2 plugin; use the generic rdbmsreader/rdbmswriter instead.
    1) Register the DB2 driver for the reader
vim /opt/module/datax/plugin/reader/rdbmsreader/plugin.json
# Add the DB2 driver class to the drivers list
"drivers":["dm.jdbc.driver.DmDriver", "com.sybase.jdbc3.jdbc.SybDriver", "com.edb.Driver","com.ibm.db2.jcc.DB2Driver"]
    2) Register the DB2 driver for the writer
vim /opt/module/datax/plugin/writer/rdbmswriter/plugin.json
# Add the DB2 driver class to the drivers list
"drivers":["dm.jdbc.driver.DmDriver", "com.sybase.jdbc3.jdbc.SybDriver", "com.edb.Driver","com.ibm.db2.jcc.DB2Driver"]

Read DB2 data and import it into HDFS

// vim job/db2-2-hdfs.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "rdbmsreader", 
                    "parameter": {
                        "column": [
                            "ID",
                            "NAME"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:db2://hadoop2:50000/sample"
                                ], 
                                "table": [
                                    "STUDENT"
                                ]
                            }
                        ], 
                        "username": "db2inst1", 
                        "password": "atguigu"
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "int"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            }
                        ],  
                        "defaultFS": "hdfs://hadoop102:9000", 
                        "fieldDelimiter": "\t", 
                        "fileName": "db2.txt", 
                        "fileType": "text", 
                        "path": "/", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

Read DB2 data and import it into MySQL

// vim job/db2-2-mysql.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "rdbmsreader", 
                    "parameter": {
                        "column": [
                            "ID",
                            "NAME"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:db2://hadoop2:50000/sample"
                                ], 
                                "table": [
                                    "STUDENT"
                                ]
                            }
                        ], 
                        "username": "db2inst1", 
                        "password": "atguigu"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": ["*"], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop102:3306/datax", 
                                "table": ["student"]
                            }
                        ], 
                        "password": "000000", 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

MySQL-to-MySQL synchronization

https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md

{
    "job": {
        "setting": {
            "speed": {
                "channel": 8,
                // "record": -1,
                // "byte": -1,
                "batchSize": 2048
            },
            "errorLimit": {
                // "record": 10,
                "percentage": 0.2
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age"
                        ],
                        "connection": [
                            {
                                "querySql": [
                                    "select id, name, age from user"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://localhost:3306/test?useSSL=false"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age"
                        ],
                        "session": ["set session sql_mode='ANSI'"],
                        "preSql": ["delete from test_users2 "],
                        "writeMode": "replace",
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://localhost:3306/test2?useSSL=false",
                                "table": [
                                    "user"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

MySQL incremental synchronization

Note: the official DataX distribution ships no binlog-based reader; true binlog capture requires a separate CDC tool (e.g. Canal or Flink CDC). With DataX, incremental sync is done by querying on a watermark column, as in the job below. Enabling binlog on the source is only needed if you pair it with such a CDC tool:

[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # set binlog format to ROW

vim mysql_binlog_sync.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "yourUsername",  // source MySQL user
                        "password": "yourPassword",  // source MySQL password
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://localhost:3306/yourDatabase"],
                                "querySql": ["SELECT * FROM yourTable WHERE ID > ${last_id}"]  // incremental query; ${last_id} is supplied at launch
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "username": "yourUsername",  // target database user
                        "password": "yourPassword",  // target database password
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://localhost:3306/targetDatabase",
                                "table": ["targetTable"]  // target table
                            }
                        ],
                        "preSql": [
                            "SET FOREIGN_KEY_CHECKS = 0",  // disable foreign-key checks
                            "SET UNIQUE_CHECKS = 0"        // disable unique checks
                        ],
                        "postSql": [
                            "SET FOREIGN_KEY_CHECKS = 1",  // restore foreign-key checks
                            "SET UNIQUE_CHECKS = 1"        // restore unique checks
                        ]
                    }
                }
            }
        ],
        "setting": {
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            },
            "speed": {
                "channel": 3
            }
        }
    }
}
Start DataX and run the config file:

python /path/to/datax/bin/datax.py /path/to/your/mysql_binlog_sync.json
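DataX does not persist incremental state itself, so ${last_id} has to be tracked externally. A hypothetical wrapper sketch (the state file, paths, and credentials are assumptions, not part of DataX):

#!/bin/bash
# Read the last synced ID from a state file (0 on the first run)
LAST_ID=$(cat /opt/module/datax/job/last_id.txt 2>/dev/null || echo 0)
# Run the job; -p substitutes ${last_id} inside the job JSON
python /path/to/datax/bin/datax.py -p "-Dlast_id=${LAST_ID}" /path/to/your/mysql_binlog_sync.json
# On success, record the current max ID from the source table for the next run
if [ $? -eq 0 ]; then
    mysql -h localhost -u yourUsername -p'yourPassword' -N -e \
        "SELECT COALESCE(MAX(ID), 0) FROM yourDatabase.yourTable" \
        > /opt/module/datax/job/last_id.txt
fi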