8. DataX 批量处理

DataX 批量处理

安装DataX

下载地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
源码地址:https://github.com/alibaba/DataX

wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

mkdir /opt/module -p

tar -zxvf datax.tar.gz -C /opt/module/
# 运行自检脚本
cd /opt/module/datax/bin/
python datax.py /opt/module/datax/job/job.json

# 若自检失败,删除各插件目录下的隐藏文件后重试
rm -rf /opt/module/datax/plugin/*/._*

# 安装驱动
yum install MySQL-python -y
压缩比:ORC >  Parquet >  textFile(textfile没有进行压缩)
查询速度:三者几乎一致
如果为textfile的文件格式,直接load就OK,不需要走MapReduce;
如果是其他的类型就需要走MapReduce了,因为其他的类型都涉及到了文件的压缩,这需要借助MapReduce的压缩方式来实现
# hive textfile这种格式存储
-- Hive staging table for user-activity data, stored as plain TEXTFILE
-- (no compression; files can be LOADed directly, no MapReduce required).
CREATE TABLE `t_user_active`(
 `id` string COMMENT '主键ID',
 `user_id` string COMMENT '用户ID',
 `user_name` string COMMENT '用户名称',
 `channel_id` string COMMENT '渠道号id',
 `use_count` int COMMENT '使用个数据',
 `create_by` string COMMENT '创建者ID',
 `create_date` string COMMENT '创建时间',
 `update_by` string COMMENT '更新者ID',
 `update_date` timestamp COMMENT '更新时间'
 )
-- NOTE(review): partition column is spelled `create_data`, not `create_date`.
-- Hive forbids a partition column sharing a data column's name, so this
-- spelling may be deliberate -- confirm before renaming.
PARTITIONED BY (
 `create_data` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

# dataX配置
"fileType": "text",

# orc格式存储,开启snappy压缩
-- Same user-activity table, stored as ORC with Snappy compression
-- (pair with DataX hdfswriter "fileType": "orc" / "compress": "SNAPPY").
CREATE TABLE `t_user_active`(
 `id` string COMMENT '主键ID',
 `user_id` string COMMENT '用户ID',
 `user_name` string COMMENT '用户名称',
 `channel_id` string COMMENT '渠道号id',
 `use_count` int COMMENT '使用个数据',
 `create_by` string COMMENT '创建者ID',
 `create_date` string COMMENT '创建时间',
 `update_by` string COMMENT '更新者ID',
 `update_date` timestamp COMMENT '更新时间'
 )
-- NOTE(review): `create_data` spelling kept as-is; see the TEXTFILE variant.
PARTITIONED BY ( `create_data` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS orc
tblproperties("orc.compress"="SNAPPY");

# dataX配置
"fileType": "orc",
"compress": "SNAPPY",

DataX配置文件生成脚本

### vim /opt/module/datax/job/gen_import_config.py
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

# MySQL connection settings -- adjust for the target environment.
mysql_host = "172.16.100.42"
mysql_port = "4000"  # kept as a string: it is concatenated into the JDBC URL
mysql_user = "root"
mysql_passwd = "84^j1TZ-+2b9A@S7Hs"

# HDFS NameNode address -- adjust for the target environment.
hdfs_nn_host = "kube-42"
hdfs_nn_port = "8020"

# Directory where the generated DataX job JSON files are written.
output_path = "/opt/module/datax/job/import"

def get_connection():
    """Open a MySQL connection using the module-level credentials."""
    conn_args = {
        "host": mysql_host,
        "port": int(mysql_port),
        "user": mysql_user,
        "passwd": mysql_passwd,
    }
    return MySQLdb.connect(**conn_args)

def get_mysql_meta(database, table):
    """Fetch (column_name, data_type) tuples for *database*.*table*.

    Reads information_schema.COLUMNS ordered by ordinal position so the
    generated column lists match the physical column order.
    The cursor and connection are closed even if the query raises
    (the original leaked both on any exception).
    """
    connection = get_connection()
    try:
        cursor = connection.cursor()
        try:
            sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
            cursor.execute(sql, [database, table])
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        connection.close()

def get_mysql_columns(database, table):
    """Return the column names of *database*.*table* in ordinal order.

    Materialized as a list so the value stays JSON-serializable under
    Python 3, where map() returns a lazy iterator that json.dump rejects.
    """
    return [row[0] for row in get_mysql_meta(database, table)]

def get_hive_columns(database, table):
    """Return the Hive column spec DataX's hdfswriter expects.

    Produces a list of {"name": ..., "type": ...} dicts, mapping each
    MySQL column type to a compatible Hive/hdfswriter type.
    """
    def type_mapping(mysql_type):
        # MySQL -> Hive type mapping used by the DataX hdfswriter.
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        # Fall back to "string" for unmapped MySQL types instead of raising
        # a bare KeyError -- a Hive string column can hold any value.
        return mappings.get(mysql_type, "string")

    meta = get_mysql_meta(database, table)
    # Materialize as a list so the result stays JSON-serializable under
    # Python 3, where map() returns a lazy iterator.
    return [{"name": name, "type": type_mapping(data_type.lower())}
            for name, data_type in meta]

def generate_json(source_database, source_table):
    """Write a DataX job config (mysqlreader -> hdfswriter) for one table.

    The file is written to <output_path>/<database>.<table>.json.  The HDFS
    target directory is left as the ${targetdir} placeholder and supplied
    at run time via `datax.py -p"-Dtargetdir=..."`.
    """
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "overwrite",
                        "fieldDelimiter": "\t",
                        "encoding": "utf-8",
                        "fileType": "text"
                        # For ORC + Snappy output use instead:
                        # "fileType": "orc",
                        # "compress": "SNAPPY"
                    }
                }
            }]
        }
    }
    # exists()+makedirs() races with concurrent runs; tolerate "already
    # exists" but re-raise any other OSError (permissions, bad path, ...).
    try:
        os.makedirs(output_path)
    except OSError:
        if not os.path.isdir(output_path):
            raise
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)

def main(args):
    """Parse -d/--sourcedb and -t/--sourcetbl and generate the job config.

    Exits with status 1 and a usage message when either option is missing,
    instead of silently writing a config file named "..json" with empty
    database/table names (the original behavior).
    """
    source_database = ""
    source_table = ""

    # Short-option spec: 'd:' and 't:' both require a value
    # (original spec '-d:-t:' worked by accident; this is the standard form).
    options, arguments = getopt.getopt(args, 'd:t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value

    if not source_database or not source_table:
        sys.stderr.write("Usage: gen_import_config.py -d <database> -t <table>\n")
        sys.exit(1)

    generate_json(source_database, source_table)

if __name__ == '__main__':
    main(sys.argv[1:])

# python gen_import_config.py -d database -t table
# python gen_import_config.py -d bikeca -t t_bike_ca_trip_hive

    """
    导入Hive表
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "encoding": "utf-8",
                        "fileType": "orc",
                        "compress": "SNAPPY"  # ORC文件压缩方式,可选SNAPPY、ZLIB、NONE   ZLIB 压缩算法是 ORC 的默认压缩策略
                    }
                }
    """

# vim /opt/module/datax/job/gen_import_config.sh
#!/bin/bash
# Generate the DataX import job JSON (text/HDFS layout) for each source table.
python /opt/module/datax/job/gen_import_config.py -d bikeca -t t_bike_ca_trip
python /opt/module/datax/job/gen_import_config.py -d bikeca -t t_biz_loc_service
python /opt/module/datax/job/gen_import_config.py -d bikeca -t t_bike_ca_trip_hive
python /opt/module/datax/job/gen_import_config.py -d bikeca -t t_biz_loc_service_hive

# 生成hdfs导入配置文件
sh /opt/module/datax/job/gen_import_config.sh

# vim /opt/module/datax/job/gen_import_config_hive.sh
#!/bin/bash
# Generate the Hive-oriented (ORC/Snappy) DataX import job JSON for each table.
python /opt/module/datax/job/gen_import_config_hive.py -d bikeca -t t_bike_ca_trip
python /opt/module/datax/job/gen_import_config_hive.py -d bikeca -t t_biz_loc_service
python /opt/module/datax/job/gen_import_config_hive.py -d bikeca -t t_bike_ca_trip_hive
python /opt/module/datax/job/gen_import_config_hive.py -d bikeca -t t_biz_loc_service_hive

# 生成hive(orc/snappy)导入配置文件
sh /opt/module/datax/job/gen_import_config_hive.sh
# 测试
su - admin
python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/user/hive/warehouse/bikeca.db/t_biz_loc_service_test"  /opt/module/datax/job/import/bikeca.t_biz_loc_service.json

textfile格式

-- bikeca.t_biz_loc_service_test definition (TEXTFILE variant).
-- Target for DataX hdfswriter with "fileType": "text"; the '\t' delimiter
-- below must match the job's fieldDelimiter.
CREATE TABLE `bikeca.t_biz_loc_service_test`(
  `id_` string, 
  `serviceid_` string, 
  `service_name_` string, 
  `city_code_` string, 
  `service_type_` string, 
  `service_left_menu_` string, 
  `service_lock_` string, 
  `lon_` double, 
  `lat_` double, 
  `map_x_` double, 
  `map_y_` double, 
  `range_` int, 
  `weight_` int, 
  `biz_detail_` string, 
  `price_info_` string, 
  `nopile_price_info_` string, 
  `moped_price_info_` string, 
  `oper_time_` string, 
  `creator_` string, 
  `version_` int, 
  `ad_type_` string, 
  `operation_model_` string, 
  `operation_subject_` string, 
  `operation_start_date_` string, 
  `operation_status_` string, 
  `description_` string)
COMMENT 'Hive TEXTFILE TEST'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
STORED AS TEXTFILE;

ORC格式 无压缩

-- bikeca.t_biz_loc_service_test definition (ORC variant, no compression).
CREATE TABLE `bikeca.t_biz_loc_service_test`(
  `id_` string, 
  `serviceid_` string, 
  `service_name_` string, 
  `city_code_` string, 
  `service_type_` string, 
  `service_left_menu_` string, 
  `service_lock_` string, 
  `lon_` double, 
  `lat_` double, 
  `map_x_` double, 
  `map_y_` double, 
  `range_` int, 
  `weight_` int, 
  `biz_detail_` string, 
  `price_info_` string, 
  `nopile_price_info_` string, 
  `moped_price_info_` string, 
  `oper_time_` string, 
  `creator_` string, 
  `version_` int, 
  `ad_type_` string, 
  `operation_model_` string, 
  `operation_subject_` string, 
  `operation_start_date_` string, 
  `operation_status_` string, 
  `description_` string)
COMMENT 'Hive ORC TEST'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
STORED AS ORC;

# 添加压缩
-- bikeca.t_biz_loc_service_orc definition (ORC + Snappy compression).
-- Target for DataX hdfswriter with "fileType": "orc" / "compress": "SNAPPY".
CREATE TABLE `bikeca.t_biz_loc_service_orc`(
  `id_` string, 
  `serviceid_` string, 
  `service_name_` string, 
  `city_code_` string, 
  `service_type_` string, 
  `service_left_menu_` string, 
  `service_lock_` string, 
  `lon_` double, 
  `lat_` double, 
  `map_x_` double, 
  `map_y_` double, 
  `range_` int, 
  `weight_` int, 
  `biz_detail_` string, 
  `price_info_` string, 
  `nopile_price_info_` string, 
  `moped_price_info_` string, 
  `oper_time_` string, 
  `creator_` string, 
  `version_` int, 
  `ad_type_` string, 
  `operation_model_` string, 
  `operation_subject_` string, 
  `operation_start_date_` string, 
  `operation_status_` string, 
  `description_` string)
COMMENT 'Hive ORC SNAPPY TEST'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
STORED AS ORC
tblproperties("orc.compress"="SNAPPY");

python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/user/hive/warehouse/bikeca.db/t_biz_loc_service_orc"  /opt/module/datax/job/import/bikeca.t_biz_loc_service.json

全量表数据同步脚本-示例

vim mysql_to_hdfs_full.sh

#!/bin/bash
# Full-load sync: run the generated DataX jobs that copy MySQL tables to HDFS.
# Usage: mysql_to_hdfs_full.sh <table_name|all> [yyyy-mm-dd]
# If no date is given, yesterday's date is used for the partition path.

DATAX_HOME=/opt/module/datax
DATAX_DATA=/opt/module/datax/job

# Clear stale data: drop the HDFS target dir (ignore "not found"), recreate it.
# All expansions are quoted so paths with unusual characters cannot word-split.
handle_targetdir() {
  hadoop fs -rm -r "$1" >/dev/null 2>&1
  hadoop fs -mkdir -p "$1"
}

# Run one DataX job.
#   $1 = DataX job config file, $2 = HDFS target directory
import_data() {
  local datax_config=$1
  local target_dir=$2

  handle_targetdir "$target_dir"
  echo "正在处理$1"
  python "$DATAX_HOME/bin/datax.py" -p"-Dtargetdir=$target_dir" "$datax_config" >/tmp/datax_run.log 2>&1
  if [ $? -ne 0 ]
  then
    echo "处理失败, 日志如下:"
    cat /tmp/datax_run.log
  fi
  # -f: do not fail the script if the log is already gone.
  rm -f /tmp/datax_run.log
}

# Table name to sync ("all" loops over every known table).
tab=$1
# Use the supplied date, otherwise default to yesterday.
if [ -n "$2" ] ;then
    do_date=$2
else
    do_date=$(date -d "-1 day" +%F)
fi

case ${tab} in
base_category_info | base_province | base_source | base_subject_info | cart_info | chapter_info | course_info | knowledge_point | test_paper | test_paper_question | test_point_question | test_question_info | test_question_option | user_chapter_process | video_info)
  import_data "$DATAX_DATA/import/edu2077.${tab}.json" "/origin_data/edu/db/${tab}_full/$do_date"
  ;;
"all")
  for tmp in base_category_info base_province base_source base_subject_info cart_info chapter_info course_info knowledge_point test_paper test_paper_question test_point_question test_question_info test_question_option user_chapter_process video_info
  do
    import_data "$DATAX_DATA/import/edu2077.${tmp}.json" "/origin_data/edu/db/${tmp}_full/$do_date"
  done
  ;;
*)
  # Previously an unknown table name exited silently with status 0.
  echo "Unknown table name: ${tab}" >&2
  exit 1
  ;;
esac

# 运行
mysql_to_hdfs_full.sh all 2022-02-21
# 自定义字段清洗数据
vim bikeca.t_biz_loc_service_orc_etl.json

{
    "job": {
        "content": [
            {
                "writer": {
                    "parameter": {
                        "encoding": "utf-8", 
                        "writeMode": "append", 
                        "fieldDelimiter": "  ", 
                        "column": [
                            {
                                "type": "string", 
                                "name": "ID_"
                            }, 
                            {
                                "type": "string", 
                                "name": "SERVICEID_"
                            }, 
                            {
                                "type": "string", 
                                "name": "SERVICE_NAME_"
                            }, 
                            {
                                "type": "string", 
                                "name": "CITY_CODE_"
                            }, 
                            {
                                "type": "string", 
                                "name": "SERVICE_TYPE_"
                            }, 
                            {
                                "type": "string", 
                                "name": "SERVICE_LEFT_MENU_"
                            }, 
                            {
                                "type": "string", 
                                "name": "SERVICE_LOCK_"
                            }, 
                            {
                                "type": "float", 
                                "name": "LON_"
                            }, 
                            {
                                "type": "float", 
                                "name": "LAT_"
                            }, 
                            {
                                "type": "float", 
                                "name": "MAP_X_"
                            }, 
                            {
                                "type": "float", 
                                "name": "MAP_Y_"
                            }, 
                            {
                                "type": "bigint", 
                                "name": "RANGE_"
                            }, 
                            {
                                "type": "bigint", 
                                "name": "WEIGHT_"
                            }, 
                            {
                                "type": "string", 
                                "name": "BIZ_DETAIL_"
                            }, 
                            {
                                "type": "string", 
                                "name": "PRICE_INFO_"
                            }, 
                            {
                                "type": "string", 
                                "name": "NOPILE_PRICE_INFO_"
                            }, 
                            {
                                "type": "string", 
                                "name": "MOPED_PRICE_INFO_"
                            }, 
                            {
                                "type": "string", 
                                "name": "OPER_TIME_"
                            }, 
                            {
                                "type": "string", 
                                "name": "CREATOR_"
                            }, 
                            {
                                "type": "bigint", 
                                "name": "VERSION_"
                            }, 
                            {
                                "type": "string", 
                                "name": "AD_TYPE_"
                            }, 
                            {
                                "type": "string", 
                                "name": "OPERATION_MODEL_"
                            }, 
                            {
                                "type": "string", 
                                "name": "OPERATION_SUBJECT_"
                            }, 
                            {
                                "type": "string", 
                                "name": "OPERATION_START_DATE_"
                            }, 
                            {
                                "type": "string", 
                                "name": "OPERATION_STATUS_"
                            }, 
                            {
                                "type": "string", 
                                "name": "DESCRIPTION_"
                            }
                        ], 
                        "path": "${targetdir}", 
                        "fileType": "orc", 
                        "defaultFS": "hdfs://kube-42:8020", 
                        "compress": "SNAPPY", 
                        "fileName": "t_biz_loc_service_orc_etl"
                    }, 
                    "name": "hdfswriter"
                }, 
                "reader": {
                    "parameter": {
                        "username": "root", 
                        "column": [
                            "ID_", 
                            "SERVICEID_", 
                            "SERVICE_NAME_", 
                            "CITY_CODE_", 
                            "SERVICE_TYPE_", 
                            "SERVICE_LEFT_MENU_", 
                            "SERVICE_LOCK_", 
                            "LON_", 
                            "LAT_", 
                            "MAP_X_", 
                            "MAP_Y_", 
                            "RANGE_", 
                            "WEIGHT_", 
                            "BIZ_DETAIL_", 
                            "PRICE_INFO_", 
                            "NOPILE_PRICE_INFO_", 
                            "MOPED_PRICE_INFO_", 
                            "OPER_TIME_", 
                            "CREATOR_", 
                            "VERSION_", 
                            "AD_TYPE_", 
                            "OPERATION_MODEL_", 
                            "OPERATION_SUBJECT_", 
                            "OPERATION_START_DATE_", 
                            "OPERATION_STATUS_", 
                            "DESCRIPTION_"
                        ], 
                        "connection": [
                            {
                                "table": [
                                    "t_biz_loc_service_hive"
                                ], 
                                "jdbcUrl": [
                                    "jdbc:mysql://172.16.100.42:4000/bikeca"
                                ]
                            }
                        ], 
                        "password": "84^j1TZ-+2b9A@S7Hs", 
                        "splitPk": ""
                    }, 
                    "name": "mysqlreader"
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 3
            }, 
            "errorLimit": {
                "record": 0, 
                "percentage": 0.02
            }
        }
    }
}

-- bikeca.t_biz_loc_service_orc_etl definition (ORC + Snappy, ETL target).
-- NOTE(review): the '\t' field delimiter below must match the DataX job's
-- "fieldDelimiter" exactly, or every column loads as NULL -- verify the job.
CREATE TABLE `bikeca.t_biz_loc_service_orc_etl`(
  `id_` string COMMENT '', 
  `serviceid_` string COMMENT '', 
  `service_name_` string COMMENT '', 
  `city_code_` string COMMENT '', 
  `service_type_` string COMMENT '', 
  `service_left_menu_` string COMMENT '', 
  `service_lock_` string COMMENT '', 
  `lon_` double, 
  `lat_` double, 
  `map_x_` double, 
  `map_y_` double, 
  `range_` int, 
  `weight_` int, 
  `biz_detail_` string COMMENT '', 
  `price_info_` string COMMENT '', 
  `nopile_price_info_` string COMMENT '', 
  `moped_price_info_` string COMMENT '', 
  `oper_time_` string COMMENT '', 
  `creator_` string COMMENT '', 
  `version_` int, 
  `ad_type_` string COMMENT '', 
  `operation_model_` string COMMENT '', 
  `operation_subject_` string COMMENT '', 
  `operation_start_date_` string COMMENT '', 
  `operation_status_` string COMMENT '', 
  `description_` string COMMENT '')
COMMENT 'Hive ORC SNAPPY TEST'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
STORED AS ORC
tblproperties("orc.compress"="SNAPPY");

python /opt/module/datax/bin/datax.py \
-p"-Dtargetdir=/user/hive/warehouse/bikeca.db/t_biz_loc_service_orc_etl" \
/opt/module/datax/job/import/bikeca.t_biz_loc_service_orc_etl.json

历史数据同步至tidb t_bike_ca_trip_old

vim /opt/module/datax/job/import/bikeca.t_bike_ca_trip_old.json

{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 2097152
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                // "byte": 10485760
                "byte": 16777216,
                "batchSize": 2048
            },
            "errorLimit": {
                "percentage": 0.2
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "ddarchive",
                        "password": "ddarchive@123",
                        "splitPk": "ID_",
                        "column": [
                            "ID_",
                            "PART_ID_",
                            "BIKE_ID_",
                            "HIRE_BIKEPOS_ID_",
                            "HIRE_STATION_ID_",
                            "HIRE_DATE_",
                            "HIRE_STATUS_ID_",
                            "RESTORE_BIKEPOS_ID_",
                            "RESTORE_STATION_ID_",
                            "RESTORE_DATE_",
                            "ERRORHDL_BIKEPOS_ID_",
                            "ERRORHDL_STATION_ID_",
                            "ERRORHDL_DATE_",
                            "ERRORHDL_TYPE_",
                            "TRANS_MONEY_",
                            "USER_ID_",
                            "CITY_CODE_",
                            "GEN_DATE_",
                            "LOGIN_TYPE_",
                            "PAY_ORDER_ID_",
                            "PAY_ORDER_STATUS_",
                            "APP_ID_",
                            "HIRE_STATION_NAME_",
                            "RESTORE_STATION_NAME_",
                            "ERRORHDL_STATION_NAME_",
                            "H_LOCK_TYPE_",
                            "R_LOCK_TYPE_",
                            "OPER_ID_",
                            "SERVICE_ID_",
                            "COUP_USED_ID_",
                            "BEF_DISCOUNT__MONEY_",
                            "COUP_TYPE_",
                            "RELET_ID_",
                            "CARD_USER_ID_",
                            "USER_PHONE_",
                            "SUPPLY_STATUS_"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "t_bike_ca_trip_17_01",
                                    "t_bike_ca_trip_17_02",
                                    "t_bike_ca_trip_17_03",
                                    "t_bike_ca_trip_17_04",
                                    "t_bike_ca_trip_17_05",
                                    "t_bike_ca_trip_17_06",
                                    "t_bike_ca_trip_17_07",
                                    "t_bike_ca_trip_17_08",
                                    "t_bike_ca_trip_17_09",
                                    "t_bike_ca_trip_17_10",
                                    "t_bike_ca_trip_17_11",
                                    "t_bike_ca_trip_17_12",
                                    "t_bike_ca_trip_18_01",
                                    "t_bike_ca_trip_18_02",
                                    "t_bike_ca_trip_18_03",
                                    "t_bike_ca_trip_18_04",
                                    "t_bike_ca_trip_18_05",
                                    "t_bike_ca_trip_18_06",
                                    "t_bike_ca_trip_18_07",
                                    "t_bike_ca_trip_18_08",
                                    "t_bike_ca_trip_18_09",
                                    "t_bike_ca_trip_18_10",
                                    "t_bike_ca_trip_18_11",
                                    "t_bike_ca_trip_18_12",
                                    "t_bike_ca_trip_19_01",
                                    "t_bike_ca_trip_19_02",
                                    "t_bike_ca_trip_19_03",
                                    "t_bike_ca_trip_19_04",
                                    "t_bike_ca_trip_19_05",
                                    "t_bike_ca_trip_19_06",
                                    "t_bike_ca_trip_19_07",
                                    "t_bike_ca_trip_19_08",
                                    "t_bike_ca_trip_19_09",
                                    "t_bike_ca_trip_19_10",
                                    "t_bike_ca_trip_19_11",
                                    "t_bike_ca_trip_19_12",
                                    "t_bike_ca_trip_20_01",
                                    "t_bike_ca_trip_20_02",
                                    "t_bike_ca_trip_20_03"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://rm-bp1ib4p42qz893l9y1o.mysql.rds.aliyuncs.com:3306/ddarchive?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "username": "root",
                        "password": "84^j1TZ-+2b9A@S7Hs",
                        "column": [
                            "ID_",
                            "PART_ID_",
                            "BIKE_ID_",
                            "HIRE_BIKEPOS_ID_",
                            "HIRE_STATION_ID_",
                            "HIRE_DATE_",
                            "HIRE_STATUS_ID_",
                            "RESTORE_BIKEPOS_ID_",
                            "RESTORE_STATION_ID_",
                            "RESTORE_DATE_",
                            "ERRORHDL_BIKEPOS_ID_",
                            "ERRORHDL_STATION_ID_",
                            "ERRORHDL_DATE_",
                            "ERRORHDL_TYPE_",
                            "TRANS_MONEY_",
                            "USER_ID_",
                            "CITY_CODE_",
                            "GEN_DATE_",
                            "LOGIN_TYPE_",
                            "PAY_ORDER_ID_",
                            "PAY_ORDER_STATUS_",
                            "APP_ID_",
                            "HIRE_STATION_NAME_",
                            "RESTORE_STATION_NAME_",
                            "ERRORHDL_STATION_NAME_",
                            "H_LOCK_TYPE_",
                            "R_LOCK_TYPE_",
                            "OPER_ID_",
                            "SERVICE_ID_",
                            "COUP_USED_ID_",
                            "BEF_DISCOUNT__MONEY_",
                            "COUP_TYPE_",
                            "RELET_ID_",
                            "CARD_USER_ID_",
                            "USER_PHONE_",
                            "SUPPLY_STATUS_" 
                        ],
                        "writeMode": "replace",
                        "encoding": "UTF-8",
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://172.16.100.42:4000/bikeca?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
                                "table": [
                                    "t_bike_ca_trip_old"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

# 运行
python /opt/module/datax/bin/datax.py --jvm="-Xms4G -Xmx4G" /opt/module/datax/job/import/bikeca.t_bike_ca_trip_old.json

# 同步数据量验证
SELECT count(1) FROM t_bike_ca_trip_18_07;
SELECT COUNT(1) FROM t_bike_ca_trip_old WHERE HIRE_DATE_ BETWEEN  '2018-07-01 00:00:00' and '2018-08-01 00:00:00'