9. 数据导入清洗

数据导入清洗

抽取(Extract)、转换(Transform)、加载(Load)

DataX 分区导入参考,原始数据

vim mysql2hive.json

{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 2097152
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                // "byte": 10485760
                "byte": 16777216,
                "batchSize": 2048
            },
            "errorLimit": {
                "percentage": 0.2
            }
        },
        "content": [
            {
                "reader": {
                    "parameter": {
                        "username": "root",
                        "column": [
                            "ID_",
                            "PART_ID_",
                            "BIKE_ID_",
                            "HIRE_BIKEPOS_ID_",
                            "HIRE_STATION_ID_",
                            "HIRE_DATE_",
                            "HIRE_STATUS_ID_",
                            "RESTORE_BIKEPOS_ID_",
                            "RESTORE_STATION_ID_",
                            "RESTORE_DATE_",
                            "ERRORHDL_BIKEPOS_ID_",
                            "ERRORHDL_STATION_ID_",
                            "ERRORHDL_DATE_",
                            "ERRORHDL_TYPE_",
                            "TRANS_MONEY_",
                            "USER_ID_",
                            "USER_PHONE_",
                            "CITY_CODE_",
                            "GEN_DATE_",
                            "LOGIN_TYPE_",
                            "PAY_ORDER_ID_",
                            "PAY_ORDER_STATUS_",
                            "APP_ID_",
                            "HIRE_STATION_NAME_",
                            "RESTORE_STATION_NAME_",
                            "ERRORHDL_STATION_NAME_",
                            "H_LOCK_TYPE_",
                            "R_LOCK_TYPE_",
                            "OPER_ID_",
                            "SERVICE_ID_",
                            "COUP_USED_ID_",
                            "BEF_DISCOUNT__MONEY_",
                            "COUP_TYPE_",
                            "RELET_ID_",
                            "CARD_USER_ID_",
                            "COORD_HIRE_",
                            "COORD_RESTORE_",
                            "SUPPLY_STATUS_",
                            "CARD_TYPE_",
                            "FREE_TIME_",
                            "TRIP_TIME_",
                            "TAC_",
                            "MAC2_",
                            "PSAM_NUM_",
                            "FLOW_NUM_",
                            "PASM_TRANS_NUMBER_",
                            "CPU_CARD_TRANS_DATA_",
                            "UPDATE_TIME_",
                            "HIRE_STATION_SERVICE_ID_",
                            "RESTORE_STATION_SERVICE_ID_",
                            "RETURN_BIKE_WAY_"
                        ],
                        "connection": [
                            {   
                                "jdbcUrl": [
                                    "jdbc:mysql://172.16.100.42:4000/bikeca?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
                                ],
                                "table": [
                                    "t_bike_ca_trip_old"
                                ]
                            }
                        ],
                        "password": "84^j1TZ-+2b9A@S7Hs",
                        "splitPk": "ID_"
                    },
                    "name": "mysqlreader"
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "type": "string",
                                "name": "ID_"
                            },
                            {
                                "type": "bigint",
                                "name": "PART_ID_"
                            },
                            {
                                "type": "string",
                                "name": "BIKE_ID_"
                            },
                            {
                                "type": "bigint",
                                "name": "HIRE_BIKEPOS_ID_"
                            },
                            {
                                "type": "bigint",
                                "name": "HIRE_STATION_ID_"
                            },
                            {
                                "type": "string",
                                "name": "HIRE_DATE_"
                            },
                            {
                                "type": "bigint",
                                "name": "HIRE_STATUS_ID_"
                            },
                            {
                                "type": "bigint",
                                "name": "RESTORE_BIKEPOS_ID_"
                            },
                            {
                                "type": "bigint",
                                "name": "RESTORE_STATION_ID_"
                            },
                            {
                                "type": "string",
                                "name": "RESTORE_DATE_"
                            },
                            {
                                "type": "bigint",
                                "name": "ERRORHDL_BIKEPOS_ID_"
                            },
                            {
                                "type": "bigint",
                                "name": "ERRORHDL_STATION_ID_"
                            },
                            {
                                "type": "string",
                                "name": "ERRORHDL_DATE_"
                            },
                            {
                                "type": "string",
                                "name": "ERRORHDL_TYPE_"
                            },
                            {
                                "type": "string",
                                "name": "TRANS_MONEY_"
                            },
                            {
                                "type": "string",
                                "name": "USER_ID_"
                            },
                            {
                                "type": "string",
                                "name": "USER_PHONE_"
                            },
                            {
                                "type": "string",
                                "name": "CITY_CODE_"
                            },
                            {
                                "type": "string",
                                "name": "GEN_DATE_"
                            },
                            {
                                "type": "string",
                                "name": "LOGIN_TYPE_"
                            },
                            {
                                "type": "string",
                                "name": "PAY_ORDER_ID_"
                            },
                            {
                                "type": "string",
                                "name": "PAY_ORDER_STATUS_"
                            },
                            {
                                "type": "string",
                                "name": "APP_ID_"
                            },
                            {
                                "type": "string",
                                "name": "HIRE_STATION_NAME_"
                            },
                            {
                                "type": "string",
                                "name": "RESTORE_STATION_NAME_"
                            },
                            {
                                "type": "string",
                                "name": "ERRORHDL_STATION_NAME_"
                            },
                            {
                                "type": "string",
                                "name": "H_LOCK_TYPE_"
                            },
                            {
                                "type": "string",
                                "name": "R_LOCK_TYPE_"
                            },
                            {
                                "type": "string",
                                "name": "OPER_ID_"
                            },
                            {
                                "type": "string",
                                "name": "SERVICE_ID_"
                            },
                            {
                                "type": "string",
                                "name": "COUP_USED_ID_"
                            },
                            {
                                "type": "bigint",
                                "name": "BEF_DISCOUNT__MONEY_"
                            },
                            {
                                "type": "string",
                                "name": "COUP_TYPE_"
                            },
                            {
                                "type": "string",
                                "name": "RELET_ID_"
                            },
                            {
                                "type": "string",
                                "name": "CARD_USER_ID_"
                            },
                            {
                                "type": "string",
                                "name": "COORD_HIRE_"
                            },
                            {
                                "type": "string",
                                "name": "COORD_RESTORE_"
                            },
                            {
                                "type": "string",
                                "name": "SUPPLY_STATUS_"
                            },
                            {
                                "type": "string",
                                "name": "CARD_TYPE_"
                            },
                            {
                                "type": "string",
                                "name": "FREE_TIME_"
                            },
                            {
                                "type": "string",
                                "name": "TRIP_TIME_"
                            },
                            {
                                "type": "string",
                                "name": "TAC_"
                            },
                            {
                                "type": "string",
                                "name": "MAC2_"
                            },
                            {
                                "type": "string",
                                "name": "PSAM_NUM_"
                            },
                            {
                                "type": "string",
                                "name": "FLOW_NUM_"
                            },
                            {
                                "type": "string",
                                "name": "PASM_TRANS_NUMBER_"
                            },
                            {
                                "type": "string",
                                "name": "CPU_CARD_TRANS_DATA_"
                            },
                            {
                                "type": "string",
                                "name": "UPDATE_TIME_"
                            },
                            {
                                "type": "string",
                                "name": "HIRE_STATION_SERVICE_ID_"
                            },
                            {
                                "type": "string",
                                "name": "RESTORE_STATION_SERVICE_ID_"
                            },
                            {
                                "type": "string",
                                "name": "RETURN_BIKE_WAY_"
                            }
                        ],
                        "fieldDelimiter": "\t",
                        "fileType": "orc",
                        "defaultFS": "hdfs://kube-42:8020",
                        "compress": "SNAPPY",
                        "path": "/user/hive/warehouse/bikeca.db/t_bike_ca_trip_etl",
                        "fileName": "t_bike_ca_trip_etl_${date}",
                        "writeMode": "append",
                        "dateFormat": "yyyy-MM-dd",
                        "encoding": "UTF-8",
                        "nullFormat": "\\N",
                        "header": "false",
                        "footer": "false",
                        "mergeSmallFiles": "false",
                        "partition": [
                            {
                                "name": "dt",
                                "type": "string",
                                "format": "yyyyMMdd",
                                "value": "${date}"
                            }
                        ],
                        "hiveConf": [
                            {
                                "name": "hive.exec.dynamic.partition.mode",
                                "value": "nonstrict"
                            }
                        ],
                        "preSql": "",
                        "postSql": ""
                    }
                },
                "transformer": {
                    "name": "dt",
                    "parameter": {
                        "column": [
                            "ID_",
                            "PART_ID_",
                            "BIKE_ID_",
                            "HIRE_BIKEPOS_ID_",
                            "HIRE_STATION_ID_",
                            "HIRE_DATE_",
                            "HIRE_STATUS_ID_",
                            "RESTORE_BIKEPOS_ID_",
                            "RESTORE_STATION_ID_",
                            "RESTORE_DATE_",
                            "ERRORHDL_BIKEPOS_ID_",
                            "ERRORHDL_STATION_ID_",
                            "ERRORHDL_DATE_",
                            "ERRORHDL_TYPE_",
                            "TRANS_MONEY_",
                            "USER_ID_",
                            "USER_PHONE_",
                            "CITY_CODE_",
                            "GEN_DATE_",
                            "LOGIN_TYPE_",
                            "PAY_ORDER_ID_",
                            "PAY_ORDER_STATUS_",
                            "APP_ID_",
                            "HIRE_STATION_NAME_",
                            "RESTORE_STATION_NAME_",
                            "ERRORHDL_STATION_NAME_",
                            "H_LOCK_TYPE_",
                            "R_LOCK_TYPE_",
                            "OPER_ID_",
                            "SERVICE_ID_",
                            "COUP_USED_ID_",
                            "BEF_DISCOUNT__MONEY_",
                            "COUP_TYPE_",
                            "RELET_ID_",
                            "CARD_USER_ID_",
                            "COORD_HIRE_",
                            "COORD_RESTORE_",
                            "SUPPLY_STATUS_",
                            "CARD_TYPE_",
                            "FREE_TIME_",
                            "TRIP_TIME_",
                            "TAC_",
                            "MAC2_",
                            "PSAM_NUM_",
                            "FLOW_NUM_",
                            "PASM_TRANS_NUMBER_",
                            "CPU_CARD_TRANS_DATA_",
                            "UPDATE_TIME_",
                            "HIRE_STATION_SERVICE_ID_",
                            "RESTORE_STATION_SERVICE_ID_",
                            "RETURN_BIKE_WAY_"
                        ]
                    }
                }
            }
        ]
    }
}

CREATE TABLE `bikeca`.`t_bike_ca_trip_etl` (
    `id_` string  COMMENT 'UUID',
    `part_id_` int  COMMENT '分区标识',
    `bike_id_` string  COMMENT '车辆编号',
    `hire_bikepos_id_` int  COMMENT '租车墩位号',
    `hire_station_id_` int  COMMENT '租车网点编号',
    `hire_date_` string  COMMENT '租车时间',
    `hire_status_id_` int  COMMENT '租车状态编号',
    `restore_bikepos_id_` int  COMMENT '还车墩位号',
    `restore_station_id_` int  COMMENT '还车网点编号',
    `restore_date_` string  COMMENT '还车时间',
    `errorhdl_bikepos_id_` int  COMMENT '异常处理墩位号',
    `errorhdl_station_id_` int  COMMENT '异常处理网点编号',
    `errorhdl_date_` string  COMMENT '异常处理时间',
    `errorhdl_type_` string  COMMENT '异常处理类型',
    `trans_money_` double  COMMENT '交易金额',
    `user_id_` string  COMMENT '用户ID',
    `user_phone_` string  COMMENT '电话号码',
    `city_code_` string  COMMENT '城市代码',
    `gen_date_` string  COMMENT '生成时间',
    `login_type_` string  COMMENT '登录类型: 8-安卓,11-iOS,12-微信,13-支付宝',
    `pay_order_id_` string  COMMENT '支付订单ID',
    `pay_order_status_` string  COMMENT '支付订单状态',
    `app_id_` string  COMMENT 'APPID',
    `hire_station_name_` string  COMMENT '租车网点',
    `restore_station_name_` string  COMMENT '还车网点',
    `errorhdl_station_name_` string  COMMENT '异常处理网点',
    `h_lock_type_` string  COMMENT '租车锁类型  1-有桩,3-C1,4-电动助力车,5-N2,6,永久助力车,7-N2S,8-N3,9-武汉电动车只能锁,10-N5',
    `r_lock_type_` string  COMMENT '租车锁类型  1-有桩,3-C1,4-电动助力车,5-N2,6,永久助力车,7-N2S,8-N3,9-武汉电动车只能锁,10-N5',
    `oper_id_` string  COMMENT '操作ID',
    `service_id_` string  COMMENT '服务ID',
    `coup_used_id_` string  COMMENT '优惠券领用id',
    `bef_discount__money_` int  COMMENT '折扣前支付金额',
    `coup_type_` string  COMMENT '卡劵类型',
    `relet_id_` string  COMMENT '续租记录id',
    `card_user_id_` string  COMMENT '用户月卡id',
    `coord_hire_` string COMMENT '租车坐标',
    `coord_restore_` string COMMENT '还车坐标',
    `supply_status_` string  COMMENT '补录状态标识 1-补录',
    `card_type_` string  COMMENT '实体卡卡类型',
    `free_time_` string  COMMENT '免费时长,单位秒',
    `trip_time_` string  COMMENT '行程时长,单位秒',
    `tac_` string  COMMENT '交易TAC码',
    `mac2_` string  COMMENT 'MAC2码',
    `psam_num_` string  COMMENT 'PSAM终端号',
    `flow_num_` string  COMMENT '卡发行流水号',
    `pasm_trans_number_` string  COMMENT 'PSAM 交易卡号',
    `cpu_card_trans_data_` string  COMMENT '实体卡消费数据',
    `update_time_` string  COMMENT '更新时间',
    `hire_station_service_id_` string  COMMENT '租车站点城市ID',
    `restore_station_service_id_` string  COMMENT '还车站点城市ID',
    `return_bike_way_` string  COMMENT '还车方式')
COMMENT '行程表'
partitioned by (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS '\N'
STORED AS ORC
tblproperties("orc.compress"="SNAPPY");

#! /bin/bash
echo "获取前一天的时间,时间格式为2020-10-21"
yesterday=`date -d yesterday +%Y-%m-%d`
today=`date +%Y-%m-%d`
today=`date "+%Y-%m-%d %H:%M:%S"`
echo "开始啦"
python /opt/module/datax/bin/datax.py -p "-Ddate=${yesterday}"  /opt/module/datax/job/mysql2hive.json

优化清洗数据,只取有价值的数据

CREATE TABLE `bikeca`.`t_bike_ca_trip_etl` (
    `id_` string  COMMENT 'UUID',
    `bike_id_` string  COMMENT '车辆编号',
    `hire_bikepos_id_` int  COMMENT '租车墩位号',
    `hire_station_id_` int  COMMENT '租车网点编号',
    `hire_date_` string  COMMENT '租车时间',
    `hire_status_id_` int  COMMENT '租车状态编号',
    `restore_bikepos_id_` int  COMMENT '还车墩位号',
    `restore_station_id_` int  COMMENT '还车网点编号',
    `restore_date_` string  COMMENT '还车时间',
    `errorhdl_bikepos_id_` int  COMMENT '异常处理墩位号',
    `errorhdl_station_id_` int  COMMENT '异常处理网点编号',
    `errorhdl_date_` string  COMMENT '异常处理时间',
    `errorhdl_type_` string  COMMENT '异常处理类型',
    `trans_money_` double  COMMENT '交易金额',
    `user_id_` string  COMMENT '用户ID',
    `user_phone_` string  COMMENT '电话号码',
    `city_code_` string  COMMENT '城市代码',
    `gen_date_` string  COMMENT '生成时间',
    `login_type_` string  COMMENT '登录类型: 8-安卓,11-iOS,12-微信,13-支付宝',
    `pay_order_id_` string  COMMENT '支付订单ID',
    `pay_order_status_` string  COMMENT '支付订单状态',
    `app_id_` string  COMMENT 'APPID',
    `hire_station_name_` string  COMMENT '租车网点',
    `restore_station_name_` string  COMMENT '还车网点',
    `errorhdl_station_name_` string  COMMENT '异常处理网点',
    `h_lock_type_` string  COMMENT '租车锁类型  1-有桩,3-C1,4-电动助力车,5-N2,6,永久助力车,7-N2S,8-N3,9-武汉电动车只能锁,10-N5',
    `oper_id_` string  COMMENT '操作ID',
    `service_id_` string  COMMENT '服务ID',
    `coup_used_id_` string  COMMENT '优惠券领用id',
    `bef_discount__money_` int  COMMENT '折扣前支付金额',
    `coup_type_` string  COMMENT '卡劵类型',
    `relet_id_` string  COMMENT '续租记录id',
    `card_user_id_` string  COMMENT '用户月卡id',
    `coord_hire_` string COMMENT '租车坐标',
    `coord_restore_` string COMMENT '还车坐标',
    `update_time_` string  COMMENT '更新时间',
    `hire_station_service_id_` string  COMMENT '租车站点城市ID',
    `restore_station_service_id_` string  COMMENT '还车站点城市ID',
    `return_bike_way_` string  COMMENT '还车方式')
COMMENT '行程表'
partitioned by (year string,month string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS '\N'
STORED AS ORC
tblproperties("orc.compress"="SNAPPY");

vim tidb2hive.json

{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 16777216
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "byte": 16777216,
                "batchSize": 2048
            },
            "errorLimit": {
                "percentage": 0.2
            }
        },
        "content": [
            {
                "reader": {
                    "parameter": {
                        "username": "root",
                        "column": [
                            "ID_",
                            "BIKE_ID_",
                            "HIRE_BIKEPOS_ID_",
                            "HIRE_STATION_ID_",
                            "HIRE_DATE_",
                            "HIRE_STATUS_ID_",
                            "RESTORE_BIKEPOS_ID_",
                            "RESTORE_STATION_ID_",
                            "RESTORE_DATE_",
                            "ERRORHDL_BIKEPOS_ID_",
                            "ERRORHDL_STATION_ID_",
                            "ERRORHDL_DATE_",
                            "ERRORHDL_TYPE_",
                            "TRANS_MONEY_",
                            "USER_ID_",
                            "USER_PHONE_",
                            "CITY_CODE_",
                            "GEN_DATE_",
                            "LOGIN_TYPE_",
                            "PAY_ORDER_ID_",
                            "PAY_ORDER_STATUS_",
                            "APP_ID_",
                            "HIRE_STATION_NAME_",
                            "RESTORE_STATION_NAME_",
                            "ERRORHDL_STATION_NAME_",
                            "H_LOCK_TYPE_",
                            "OPER_ID_",
                            "SERVICE_ID_",
                            "COUP_USED_ID_",
                            "BEF_DISCOUNT__MONEY_",
                            "COUP_TYPE_",
                            "RELET_ID_",
                            "CARD_USER_ID_",
                            "COORD_HIRE_",
                            "COORD_RESTORE_",
                            "UPDATE_TIME_",
                            "HIRE_STATION_SERVICE_ID_",
                            "RESTORE_STATION_SERVICE_ID_",
                            "RETURN_BIKE_WAY_"
                        ],
                        "where": "GEN_DATE_ > '${date}' and GEN_DATE_ < '${date_to}'",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://172.16.100.42:4000/bikeca?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
                                ],
                                "table": [
                                    "t_bike_ca_trip_old"
                                ]
                            }
                        ],
                        "password": "84^j1TZ-+2b9A@S7Hs",
                        "splitPk": ""
                    },
                    "name": "mysqlreader"
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "type": "string",
                                "name": "ID_"
                            },
                            {
                                "type": "string",
                                "name": "BIKE_ID_"
                            },
                            {
                                "type": "bigint",
                                "name": "HIRE_BIKEPOS_ID_"
                            },
                            {
                                "type": "bigint",
                                "name": "HIRE_STATION_ID_"
                            },
                            {
                                "type": "string",
                                "name": "HIRE_DATE_"
                            },
                            {
                                "type": "bigint",
                                "name": "HIRE_STATUS_ID_"
                            },
                            {
                                "type": "bigint",
                                "name": "RESTORE_BIKEPOS_ID_"
                            },
                            {
                                "type": "bigint",
                                "name": "RESTORE_STATION_ID_"
                            },
                            {
                                "type": "string",
                                "name": "RESTORE_DATE_"
                            },
                            {
                                "type": "bigint",
                                "name": "ERRORHDL_BIKEPOS_ID_"
                            },
                            {
                                "type": "bigint",
                                "name": "ERRORHDL_STATION_ID_"
                            },
                            {
                                "type": "string",
                                "name": "ERRORHDL_DATE_"
                            },
                            {
                                "type": "string",
                                "name": "ERRORHDL_TYPE_"
                            },
                            {
                                "type": "string",
                                "name": "TRANS_MONEY_"
                            },
                            {
                                "type": "string",
                                "name": "USER_ID_"
                            },
                            {
                                "type": "string",
                                "name": "USER_PHONE_"
                            },
                            {
                                "type": "string",
                                "name": "CITY_CODE_"
                            },
                            {
                                "type": "string",
                                "name": "GEN_DATE_"
                            },
                            {
                                "type": "string",
                                "name": "LOGIN_TYPE_"
                            },
                            {
                                "type": "string",
                                "name": "PAY_ORDER_ID_"
                            },
                            {
                                "type": "string",
                                "name": "PAY_ORDER_STATUS_"
                            },
                            {
                                "type": "string",
                                "name": "APP_ID_"
                            },
                            {
                                "type": "string",
                                "name": "HIRE_STATION_NAME_"
                            },
                            {
                                "type": "string",
                                "name": "RESTORE_STATION_NAME_"
                            },
                            {
                                "type": "string",
                                "name": "ERRORHDL_STATION_NAME_"
                            },
                            {
                                "type": "string",
                                "name": "H_LOCK_TYPE_"
                            },
                            {
                                "type": "string",
                                "name": "OPER_ID_"
                            },
                            {
                                "type": "string",
                                "name": "SERVICE_ID_"
                            },
                            {
                                "type": "string",
                                "name": "COUP_USED_ID_"
                            },
                            {
                                "type": "bigint",
                                "name": "BEF_DISCOUNT__MONEY_"
                            },
                            {
                                "type": "string",
                                "name": "COUP_TYPE_"
                            },
                            {
                                "type": "string",
                                "name": "RELET_ID_"
                            },
                            {
                                "type": "string",
                                "name": "CARD_USER_ID_"
                            },
                            {
                                "type": "string",
                                "name": "COORD_HIRE_"
                            },
                            {
                                "type": "string",
                                "name": "COORD_RESTORE_"
                            },
                            {
                                "type": "string",
                                "name": "UPDATE_TIME_"
                            },
                            {
                                "type": "string",
                                "name": "HIRE_STATION_SERVICE_ID_"
                            },
                            {
                                "type": "string",
                                "name": "RESTORE_STATION_SERVICE_ID_"
                            },
                            {
                                "type": "string",
                                "name": "RETURN_BIKE_WAY_"
                            }
                        ],
                        "fieldDelimiter": "\t",
                        "fileType": "orc",
                        "defaultFS": "hdfs://kube-42:8020",
                        "compress": "SNAPPY",
                        "path": "/user/hive/warehouse/bikeca.db/t_bike_ca_trip_etl/year=${year}/month=${month}",
                        "fileName": "t_bike_ca_trip_etl_${date}",
                        "writeMode": "append",
                        "dateFormat": "yyyy-MM-dd HH:mm:ss",
                        "encoding": "UTF-8",
                        "nullFormat": "\\N",
                        "header": "false",
                        "footer": "false",
                        "mergeSmallFiles": "false",
                        "partition": [
                            {
                                "name": "year",
                                "type": "string",
                                "format": "yyyy",
                                "value": "${year}"
                            },
                            {
                                "name": "month",
                                "type": "string",
                                "format": "MM",
                                "value": "${month}"
                            },
                        ],
                        "hiveConf": [
                            {
                                "name": "hive.exec.dynamic.partition.mode",
                                "value": "nonstrict"
                            },
                            {
                                "name": "hive.exec.dynamic.partition",
                                "value": "true"
                            }
                        ]
                    }
                }
            }
        ]
    }
}

# 导入数据到hive
alter table bikeca.t_bike_ca_trip_etl add partition (year='2017',month='01');
"preSql": "alter table bikeca.t_bike_ca_trip_etl add partition (year=${year},month=${month})"

hive -e "alter table bikeca.t_bike_ca_trip_etl drop partition (year=${year},month=${month});"
hive -e "alter table bikeca.t_bike_ca_trip_etl add partition (year=${year},month=${month});"
hive -e "alter table bikeca.t_bike_ca_trip_etl drop partition (year='2017',month='02');"
hive -e "alter table bikeca.t_bike_ca_trip_etl add partition (year='2017',month='02');"

# 判断分区是否存在
SHOW PARTITIONS bikeca.t_bike_ca_trip_etl;

# shell 脚本全量分区导入数据到hive
#!/bin/bash
year=2017
month=03
b=$((`echo $month | bc` + 1))
#echo $b
# 判断b在是否在1-12之间
if [ $b -le 12 ] && [ $b -ge 1 ]; then
    date_to=$year-$(printf "%02d" $b)-01
    #echo $date_to
    echo  "正在导入 $year-$month 月份数据到hive"
    echo "python /opt/module/datax/bin/datax.py --jvm="-Xms4G -Xmx4G" -p "-Dyear=$year -Dmonth=$month -Ddate=$year-$month-01 -Ddate_to=$date_to"  /opt/module/datax/job/import/tidb2hive.json"
    hive -e "alter table bikeca.t_bike_ca_trip_etl drop partition (year='${year}',month='${month}');
             alter table bikeca.t_bike_ca_trip_etl add partition (year='${year}',month='${month}');"
    [ $? -eq 0 ] || (echo "上个命令执行失败,退出"; exit) && python /opt/module/datax/bin/datax.py --jvm="-Xms4G -Xmx4G" -p "-Dyear=$year -Dmonth=$month -Ddate=$year-$month-01 -Ddate_to=$date_to"  /opt/module/datax/job/import/tidb2hive.json
elif [ $b -eq 13 ]; then
    date_to=$(($year + 1))-01-01
    echo $date_to
    echo  "正在导入 $year-$month 月份数据到hive"
    echo "python /opt/module/datax/bin/datax.py --jvm="-Xms4G -Xmx4G" -p "-Dyear=$year -Dmonth=$month -Ddate=$year-$month-01 -Ddate_to=$date_to"  /opt/module/datax/job/import/tidb2hive.json >> /opt/module/datax/job/import/logs/tidb2hive_$year-$month.log 2>&1"
    hive -e "alter table bikeca.t_bike_ca_trip_etl drop partition (year='${year}',month='${month}');
             alter table bikeca.t_bike_ca_trip_etl add partition (year='${year}',month='${month}');"
    [ $? -eq 0 ] || (echo "上个命令执行失败,退出"; exit) && python /opt/module/datax/bin/datax.py --jvm="-Xms4G -Xmx4G" -p "-Dyear=$year -Dmonth=$month -Ddate=$year-$month-01 -Ddate_to=$date_to"  /opt/module/datax/job/import/tidb2hive.json >> /opt/module/datax/job/import/logs/tidb2hive_$year-$month.log 2>&1
else
    echo "月份错误"
fi

脚本优化 全量分区导入数据到hive

#!/bin/bash
year=$1
month=$2
# year=2017
# month=03

# 判断参数是否为空
if [ -z "$year" ] || [ -z "$month" ]; then
    echo "请输入年份和月份"
    exit 1
fi

# 判断参数是否为数字
if ! [[ "$year" =~ ^[0-9]+$ ]] || ! [[ "$month" =~ ^[0-9]+$ ]]; then
    echo "年份和月份必须是数字"
    exit 1
fi

# 判断年份四位数字月份两位数字
if [ ${#year} -ne 4 ] || [ ${#month} -ne 2 ]; then
    echo "年份必须是四位数字,月份必须是两位数字"
    exit 1
fi

b=$((`echo $month | bc` + 1))
# echo $b
# 判断b在是否在1-12之间
if [ $b -le 12 ] && [ $b -ge 1 ]; then
    date_to=$year-$(printf "%02d" $b)-01
    # echo $date_to
    echo  "正在导入 $year-$month 月份数据到hive"
    echo "python /opt/module/datax/bin/datax.py --jvm="-Xms4G -Xmx4G" -p "-Dyear=$year -Dmonth=$month -Ddate=$year-$month-01 -Ddate_to=$date_to"  /opt/module/datax/job/import/tidb2hive.json"
    hive -e "alter table bikeca.t_bike_ca_trip_etl drop partition (year='${year}',month='${month}');
             alter table bikeca.t_bike_ca_trip_etl add partition (year='${year}',month='${month}');"
    [ $? -eq 0 ] || (echo "上个命令执行失败,退出"; exit) && python /opt/module/datax/bin/datax.py --jvm="-Xms4G -Xmx4G" -p "-Dyear=$year -Dmonth=$month -Ddate=$year-$month-01 -Ddate_to=$date_to"  /opt/module/datax/job/import/tidb2hive.json
elif [ $b -eq 13 ]; then
    date_to=$(($year + 1))-01-01
    # echo $date_to
    echo  "正在导入 $year-$month 月份数据到hive"
    echo "python /opt/module/datax/bin/datax.py --jvm="-Xms4G -Xmx4G" -p "-Dyear=$year -Dmonth=$month -Ddate=$year-$month-01 -Ddate_to=$date_to"  /opt/module/datax/job/import/tidb2hive.json"
    hive -e "alter table bikeca.t_bike_ca_trip_etl drop partition (year='${year}',month='${month}');
             alter table bikeca.t_bike_ca_trip_etl add partition (year='${year}',month='${month}');"
    [ $? -eq 0 ] || (echo "上个命令执行失败,退出"; exit) && python /opt/module/datax/bin/datax.py --jvm="-Xms4G -Xmx4G" -p "-Dyear=$year -Dmonth=$month -Ddate=$year-$month-01 -Ddate_to=$date_to"  /opt/module/datax/job/import/tidb2hive.json
else
    echo "月份错误"
fi

## 批量执行
vim full_to_hive_2019.sh
#!/bin/bash
/opt/module/datax/job/import/tidb2hive.sh 2019 01
/opt/module/datax/job/import/tidb2hive.sh 2019 02
/opt/module/datax/job/import/tidb2hive.sh 2019 03
/opt/module/datax/job/import/tidb2hive.sh 2019 04
/opt/module/datax/job/import/tidb2hive.sh 2019 05
/opt/module/datax/job/import/tidb2hive.sh 2019 06
/opt/module/datax/job/import/tidb2hive.sh 2019 07
/opt/module/datax/job/import/tidb2hive.sh 2019 08
/opt/module/datax/job/import/tidb2hive.sh 2019 09
/opt/module/datax/job/import/tidb2hive.sh 2019 10
/opt/module/datax/job/import/tidb2hive.sh 2019 11
/opt/module/datax/job/import/tidb2hive.sh 2019 12

bash full_to_hive_2019.sh

参考-增量分区导入数据到hive

# sqoop 增量分区参考
# 变量设置,之后应该是传入参数

mdb='bikeca'
hdb='bikeca'
table='t_bike_ca_trip_etl'
check_col='HIRE_DATE_'
ds='2017-01-01'

# 1.判断hive中是否有分区表

hive -e "show columns from ${hdb}.${table}" |grep -v 'WARN:' > tmp1.txt
a=`cat tmp1.txt`

# 2.判断时间戳的位数
tf=`cat ${mdb}.${table}.timestamp`
if [ -n "${tf}" ]; then
    l=${#tf}
    echo "时间戳长度文件存在 长度为:" ${l}
else
    echo "时间戳长度文件不存在,需创建"
    mysql -uroot -p'84^j1TZ-+2b9A@S7Hs' -h172.16.100.42 -P4000 -A ${mdb}  -e "select max(${check_col}) from ${mdb}.${table} where ${check_col} >unix_timestamp(date_sub('${ds}',interval 30 day))" |awk 'NR>1' >${mdb}.${table}.timestamp
    tf=`cat ${mdb}.${table}.timestamp`
    l=${#m1}
    echo "时间戳长度为:" ${l}
fi

# test
# mysql -u root -p'84^j1TZ-+2b9A@S7Hs' -h172.16.100.42 -P4000 -A bikeca  -e "select max(HIRE_DATE_) from bikeca.t_bike_ca_trip where HIRE_DATE_ >unix_timestamp(date_sub('2017-01-01',interval 30 day))" |awk 'NR>1' >bikeca.t_bike_ca_trip.timestamp

# 编写语句

if [[ ! -n "$a" && l -eq 13 ]]; then
  echo "全量导入"
  #录入全量导入的代码
  hive -e "drop table if exists ${hdb}.${table}"
  sqoop import --connect jdbc:mysql://172.16.100.42:4000/${mdb}?tinyInt1isBit=false --username root --password '84^j1TZ-+2b9A@S7Hs' --table ${table}   --compression-codec=snappy     --as-parquetfile  -m 2 --hive-import --hive-overwrite     --hive-database ${hdb}

  hive -e "show columns from ${hdb}.${table}" |grep -v 'WARN:' > tmp1.txt
  b=`cat tmp1.txt| wc -l`

  hive -e "show create table ${hdb}.${table};" >1 
  c=`head -$[${b}+1] 1`
  # 把MySQL查询的固定字段拿出来
  hive -e "show columns from ${hdb}.${table}" |grep -v 'WARN:' > ${hdb}.${table}.columns

  sed -i '2,500s/^/,/' ${hdb}.${table}.columns

  hive -e "alter table ${hdb}.${table} rename to ${hdb}.${table}_tmp;"

  hive -e "${c} partitioned by (ds string) stored as parquet;"
  hive -e "alter table ${hdb}.${table} rename to ${hdb}.${table};"

  hive -e "set hive.exec.dynamic.partition.mode=nonstrict;
           set hive.exec.dynamic.partition=true;
           set hive.exec.max.dynamic.partitions.pernode=10000;
           set hive.exec.max.dynamic.partitions=20000;
           set hive.support.quoted.identifiers=none;
           INSERT OVERWRITE TABLE ${hdb}.${table} partition(ds) 
           select *,to_date(cast(${check_col}/1000 as timestamp)) from ${hdb}.${table}_tmp;"

  hive -e "drop table ${hdb}.${table}_tmp;"

elif [[ -n "$a" && l -eq 13 ]]; then
  echo "增量导入,有表结构,历史有数据,本分区有数据"

  hive -e "alter table ${hdb}.${table} drop partition (ds='${ds}');"
  hive -e "alter table ${hdb}.${table} add partition (ds='${ds}');"

  hive -e "select unix_timestamp(concat('${ds}',' 00:00:00'));" >tmp2.txt
  j1=`head -1 tmp2.txt`
  j=$[$j1*1000]
  f=`cat ${hdb}.${table}.columns`
  # 大于等于这个分区的最小值 小于等于这个分区的最大值
  CONDITIONS=''

  g="select ${f} from ${table}"
  sqoop import --connect jdbc:mysql://172.16.100.42:4000/${mdb}?tinyInt1isBit=false --username root --password '84^j1TZ-+2b9A@S7Hs' --query " ${g} where ${check_col} > unix_timestamp('${ds}')*1000 and ${check_col} <= unix_timestamp(date_add('${ds}',interval 1 day))*1000 and \$CONDITIONS" --compression-codec=snappy --as-parquetfile --target-dir /user/hive/warehouse/${hdb}.db/${table}/ds=${ds} --incremental append --check-column ${check_col} --last-value ${j} -m 1

elif [[ ! -n "$a" && l -eq 10 ]]; then
  echo "全量导入"
  #录入全量导入的代码
  hive -e "drop table if exists ${hdb}.${table}"
  sqoop import --connect jdbc:mysql://172.16.100.42:4000/${mdb}?tinyInt1isBit=false --username root --password '84^j1TZ-+2b9A@S7Hs' --table ${table}   --compression-codec=snappy     --as-parquetfile  -m 2 --hive-import --hive-overwrite     --hive-database ${hdb}

  hive -e "show columns from ${hdb}.${table}" |grep -v 'WARN:' > tmp1.txt
  b=`cat tmp1.txt| wc -l`

  hive -e "show create table ${hdb}.${table};" >1 
  c=`head -$[${b}+1] 1`
  # 把MySQL查询的固定字段拿出来
  hive -e "show columns from ${hdb}.${table}" |grep -v 'WARN:' > ${hdb}.${table}.columns

  sed -i '2,500s/^/,/' ${hdb}.${table}.columns

  hive -e "alter table ${hdb}.${table} rename to ${hdb}.${table}_tmp;"

  hive -e "${c} partitioned by (ds string) stored as parquet;"
  hive -e "alter table ${hdb}.${table} rename to ${hdb}.${table};"

  hive -e "set hive.exec.dynamic.partition.mode=nonstrict;
           set hive.exec.dynamic.partition=true;
           set hive.exec.max.dynamic.partitions.pernode=10000;
           set hive.exec.max.dynamic.partitions=20000;
           set hive.support.quoted.identifiers=none;
           INSERT OVERWRITE TABLE ${hdb}.${table} partition(ds) 
           select *,to_date(cast(${check_col}*1.0 as timestamp)) from ${hdb}.${table}_tmp;"

  hive -e "drop table ${hdb}.${table}_tmp;"

elif [[ -n "$a" && l -eq 10 ]]; then
  echo "增量导入,有表结构,历史有数据,本分区有数据"

  hive -e "alter table ${hdb}.${table} drop partition (ds='${ds}');"
  hive -e "alter table ${hdb}.${table} add partition (ds='${ds}');"

  hive -e "select unix_timestamp(concat('${ds}',' 00:00:00'));" >tmp2.txt
  j=`head -1 tmp2.txt`
  f=`cat ${hdb}.${table}.columns`
  # 大于等于这个分区的最小值 小于等于这个分区的最大值
  CONDITIONS=''

  g="select ${f} from ${table}"
  sqoop import --connect jdbc:mysql://172.16.100.42:4000/${mdb}?tinyInt1isBit=false --username root --password '84^j1TZ-+2b9A@S7Hs' --query " ${g} where ${check_col} > unix_timestamp('${ds}') and ${check_col} <= unix_timestamp(date_add('${ds}',interval 1 day)) and \$CONDITIONS" --compression-codec=snappy --as-parquetfile --target-dir /user/hive/warehouse/${hdb}.db/${table}/ds=${ds} --incremental append --check-column ${check_col} --last-value ${j} -m 1

else
  echo "其他异常"
fi