Data Import and Cleaning
Extract, Transform, Load (ETL)
DataX partitioned import reference: raw data
vim mysql2hive.json
{
"core": {
"transport": {
"channel": {
"speed": {
"byte": 2097152
}
}
}
},
"job": {
"setting": {
"speed": {
// "byte": 10485760
"byte": 16777216,
"batchSize": 2048
},
"errorLimit": {
"percentage": 0.2
}
},
"content": [
{
"reader": {
"parameter": {
"username": "root",
"column": [
"ID_",
"PART_ID_",
"BIKE_ID_",
"HIRE_BIKEPOS_ID_",
"HIRE_STATION_ID_",
"HIRE_DATE_",
"HIRE_STATUS_ID_",
"RESTORE_BIKEPOS_ID_",
"RESTORE_STATION_ID_",
"RESTORE_DATE_",
"ERRORHDL_BIKEPOS_ID_",
"ERRORHDL_STATION_ID_",
"ERRORHDL_DATE_",
"ERRORHDL_TYPE_",
"TRANS_MONEY_",
"USER_ID_",
"USER_PHONE_",
"CITY_CODE_",
"GEN_DATE_",
"LOGIN_TYPE_",
"PAY_ORDER_ID_",
"PAY_ORDER_STATUS_",
"APP_ID_",
"HIRE_STATION_NAME_",
"RESTORE_STATION_NAME_",
"ERRORHDL_STATION_NAME_",
"H_LOCK_TYPE_",
"R_LOCK_TYPE_",
"OPER_ID_",
"SERVICE_ID_",
"COUP_USED_ID_",
"BEF_DISCOUNT__MONEY_",
"COUP_TYPE_",
"RELET_ID_",
"CARD_USER_ID_",
"COORD_HIRE_",
"COORD_RESTORE_",
"SUPPLY_STATUS_",
"CARD_TYPE_",
"FREE_TIME_",
"TRIP_TIME_",
"TAC_",
"MAC2_",
"PSAM_NUM_",
"FLOW_NUM_",
"PASM_TRANS_NUMBER_",
"CPU_CARD_TRANS_DATA_",
"UPDATE_TIME_",
"HIRE_STATION_SERVICE_ID_",
"RESTORE_STATION_SERVICE_ID_",
"RETURN_BIKE_WAY_"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://172.16.100.42:4000/bikeca?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
],
"table": [
"t_bike_ca_trip_old"
]
}
],
"password": "84^j1TZ-+2b9A@S7Hs",
"splitPk": "ID_"
},
"name": "mysqlreader"
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"type": "string",
"name": "ID_"
},
{
"type": "bigint",
"name": "PART_ID_"
},
{
"type": "string",
"name": "BIKE_ID_"
},
{
"type": "bigint",
"name": "HIRE_BIKEPOS_ID_"
},
{
"type": "bigint",
"name": "HIRE_STATION_ID_"
},
{
"type": "string",
"name": "HIRE_DATE_"
},
{
"type": "bigint",
"name": "HIRE_STATUS_ID_"
},
{
"type": "bigint",
"name": "RESTORE_BIKEPOS_ID_"
},
{
"type": "bigint",
"name": "RESTORE_STATION_ID_"
},
{
"type": "string",
"name": "RESTORE_DATE_"
},
{
"type": "bigint",
"name": "ERRORHDL_BIKEPOS_ID_"
},
{
"type": "bigint",
"name": "ERRORHDL_STATION_ID_"
},
{
"type": "string",
"name": "ERRORHDL_DATE_"
},
{
"type": "string",
"name": "ERRORHDL_TYPE_"
},
{
"type": "string",
"name": "TRANS_MONEY_"
},
{
"type": "string",
"name": "USER_ID_"
},
{
"type": "string",
"name": "USER_PHONE_"
},
{
"type": "string",
"name": "CITY_CODE_"
},
{
"type": "string",
"name": "GEN_DATE_"
},
{
"type": "string",
"name": "LOGIN_TYPE_"
},
{
"type": "string",
"name": "PAY_ORDER_ID_"
},
{
"type": "string",
"name": "PAY_ORDER_STATUS_"
},
{
"type": "string",
"name": "APP_ID_"
},
{
"type": "string",
"name": "HIRE_STATION_NAME_"
},
{
"type": "string",
"name": "RESTORE_STATION_NAME_"
},
{
"type": "string",
"name": "ERRORHDL_STATION_NAME_"
},
{
"type": "string",
"name": "H_LOCK_TYPE_"
},
{
"type": "string",
"name": "R_LOCK_TYPE_"
},
{
"type": "string",
"name": "OPER_ID_"
},
{
"type": "string",
"name": "SERVICE_ID_"
},
{
"type": "string",
"name": "COUP_USED_ID_"
},
{
"type": "bigint",
"name": "BEF_DISCOUNT__MONEY_"
},
{
"type": "string",
"name": "COUP_TYPE_"
},
{
"type": "string",
"name": "RELET_ID_"
},
{
"type": "string",
"name": "CARD_USER_ID_"
},
{
"type": "string",
"name": "COORD_HIRE_"
},
{
"type": "string",
"name": "COORD_RESTORE_"
},
{
"type": "string",
"name": "SUPPLY_STATUS_"
},
{
"type": "string",
"name": "CARD_TYPE_"
},
{
"type": "string",
"name": "FREE_TIME_"
},
{
"type": "string",
"name": "TRIP_TIME_"
},
{
"type": "string",
"name": "TAC_"
},
{
"type": "string",
"name": "MAC2_"
},
{
"type": "string",
"name": "PSAM_NUM_"
},
{
"type": "string",
"name": "FLOW_NUM_"
},
{
"type": "string",
"name": "PASM_TRANS_NUMBER_"
},
{
"type": "string",
"name": "CPU_CARD_TRANS_DATA_"
},
{
"type": "string",
"name": "UPDATE_TIME_"
},
{
"type": "string",
"name": "HIRE_STATION_SERVICE_ID_"
},
{
"type": "string",
"name": "RESTORE_STATION_SERVICE_ID_"
},
{
"type": "string",
"name": "RETURN_BIKE_WAY_"
}
],
"fieldDelimiter": "\t",
"fileType": "orc",
"defaultFS": "hdfs://kube-42:8020",
"compress": "SNAPPY",
"path": "/user/hive/warehouse/bikeca.db/t_bike_ca_trip_etl",
"fileName": "t_bike_ca_trip_etl_${date}",
"writeMode": "append",
"dateFormat": "yyyy-MM-dd",
"encoding": "UTF-8",
"nullFormat": "\\N",
"header": "false",
"footer": "false",
"mergeSmallFiles": "false",
"partition": [
{
"name": "dt",
"type": "string",
"format": "yyyyMMdd",
"value": "${date}"
}
],
"hiveConf": [
{
"name": "hive.exec.dynamic.partition.mode",
"value": "nonstrict"
}
],
"preSql": "",
"postSql": ""
}
},
"transformer": {
"name": "dt",
"parameter": {
"column": [
"ID_",
"PART_ID_",
"BIKE_ID_",
"HIRE_BIKEPOS_ID_",
"HIRE_STATION_ID_",
"HIRE_DATE_",
"HIRE_STATUS_ID_",
"RESTORE_BIKEPOS_ID_",
"RESTORE_STATION_ID_",
"RESTORE_DATE_",
"ERRORHDL_BIKEPOS_ID_",
"ERRORHDL_STATION_ID_",
"ERRORHDL_DATE_",
"ERRORHDL_TYPE_",
"TRANS_MONEY_",
"USER_ID_",
"USER_PHONE_",
"CITY_CODE_",
"GEN_DATE_",
"LOGIN_TYPE_",
"PAY_ORDER_ID_",
"PAY_ORDER_STATUS_",
"APP_ID_",
"HIRE_STATION_NAME_",
"RESTORE_STATION_NAME_",
"ERRORHDL_STATION_NAME_",
"H_LOCK_TYPE_",
"R_LOCK_TYPE_",
"OPER_ID_",
"SERVICE_ID_",
"COUP_USED_ID_",
"BEF_DISCOUNT__MONEY_",
"COUP_TYPE_",
"RELET_ID_",
"CARD_USER_ID_",
"COORD_HIRE_",
"COORD_RESTORE_",
"SUPPLY_STATUS_",
"CARD_TYPE_",
"FREE_TIME_",
"TRIP_TIME_",
"TAC_",
"MAC2_",
"PSAM_NUM_",
"FLOW_NUM_",
"PASM_TRANS_NUMBER_",
"CPU_CARD_TRANS_DATA_",
"UPDATE_TIME_",
"HIRE_STATION_SERVICE_ID_",
"RESTORE_STATION_SERVICE_ID_",
"RETURN_BIKE_WAY_"
]
}
}
}
]
}
}
CREATE TABLE `bikeca`.`t_bike_ca_trip_etl` (
`id_` string COMMENT 'UUID',
`part_id_` int COMMENT 'partition ID',
`bike_id_` string COMMENT 'bike ID',
`hire_bikepos_id_` int COMMENT 'hire dock position ID',
`hire_station_id_` int COMMENT 'hire station ID',
`hire_date_` string COMMENT 'hire time',
`hire_status_id_` int COMMENT 'hire status ID',
`restore_bikepos_id_` int COMMENT 'return dock position ID',
`restore_station_id_` int COMMENT 'return station ID',
`restore_date_` string COMMENT 'return time',
`errorhdl_bikepos_id_` int COMMENT 'error-handling dock position ID',
`errorhdl_station_id_` int COMMENT 'error-handling station ID',
`errorhdl_date_` string COMMENT 'error-handling time',
`errorhdl_type_` string COMMENT 'error-handling type',
`trans_money_` double COMMENT 'transaction amount',
`user_id_` string COMMENT 'user ID',
`user_phone_` string COMMENT 'phone number',
`city_code_` string COMMENT 'city code',
`gen_date_` string COMMENT 'generation time',
`login_type_` string COMMENT 'login type: 8-Android, 11-iOS, 12-WeChat, 13-Alipay',
`pay_order_id_` string COMMENT 'payment order ID',
`pay_order_status_` string COMMENT 'payment order status',
`app_id_` string COMMENT 'APP ID',
`hire_station_name_` string COMMENT 'hire station name',
`restore_station_name_` string COMMENT 'return station name',
`errorhdl_station_name_` string COMMENT 'error-handling station name',
`h_lock_type_` string COMMENT 'hire lock type: 1-docked, 3-C1, 4-electric-assist bike, 5-N2, 6-permanent-assist bike, 7-N2S, 8-N3, 9-Wuhan e-bike smart lock, 10-N5',
`r_lock_type_` string COMMENT 'return lock type: 1-docked, 3-C1, 4-electric-assist bike, 5-N2, 6-permanent-assist bike, 7-N2S, 8-N3, 9-Wuhan e-bike smart lock, 10-N5',
`oper_id_` string COMMENT 'operator ID',
`service_id_` string COMMENT 'service ID',
`coup_used_id_` string COMMENT 'coupon redemption ID',
`bef_discount__money_` int COMMENT 'pre-discount payment amount',
`coup_type_` string COMMENT 'coupon type',
`relet_id_` string COMMENT 're-hire record ID',
`card_user_id_` string COMMENT 'user monthly-card ID',
`coord_hire_` string COMMENT 'hire coordinates',
`coord_restore_` string COMMENT 'return coordinates',
`supply_status_` string COMMENT 'supplementary-entry flag: 1-supplementary',
`card_type_` string COMMENT 'physical card type',
`free_time_` string COMMENT 'free duration in seconds',
`trip_time_` string COMMENT 'trip duration in seconds',
`tac_` string COMMENT 'transaction TAC code',
`mac2_` string COMMENT 'MAC2 code',
`psam_num_` string COMMENT 'PSAM terminal number',
`flow_num_` string COMMENT 'card issuance serial number',
`pasm_trans_number_` string COMMENT 'PSAM transaction card number',
`cpu_card_trans_data_` string COMMENT 'physical card transaction data',
`update_time_` string COMMENT 'update time',
`hire_station_service_id_` string COMMENT 'hire station service (city) ID',
`restore_station_service_id_` string COMMENT 'return station service (city) ID',
`return_bike_way_` string COMMENT 'return method')
COMMENT 'trip table'
partitioned by (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS '\N'
STORED AS ORC
tblproperties("orc.compress"="SNAPPY");
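Note that hdfswriter only writes data files into the partition directory; the Hive metastore does not learn about a new dt= directory by itself. A minimal sketch of registering partitions after a DataX run (table name from the DDL above; the dt value is illustrative):
# Register every dt= directory found under the table location:
hive -e "MSCK REPAIR TABLE bikeca.t_bike_ca_trip_etl;"
# Or add one day's partition explicitly:
hive -e "ALTER TABLE bikeca.t_bike_ca_trip_etl ADD IF NOT EXISTS PARTITION (dt='20201021');"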
#!/bin/bash
# Yesterday's date in the format 2020-10-21
yesterday=$(date -d yesterday +%Y-%m-%d)
# Current run timestamp
today=$(date "+%Y-%m-%d %H:%M:%S")
echo "Starting import for ${yesterday} at ${today}"
python /opt/module/datax/bin/datax.py -p "-Ddate=${yesterday}" /opt/module/datax/job/mysql2hive.json
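To run this every day, the script can be scheduled with cron; a hedged sketch assuming it was saved as /opt/module/datax/job/mysql2hive.sh (the script path and log path are assumptions):
# crontab -e: run the import at 01:30 each day and keep a log
30 1 * * * /bin/bash /opt/module/datax/job/mysql2hive.sh >> /opt/module/datax/job/logs/mysql2hive.log 2>&1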
Optimized cleaning: keep only the columns that carry value.
CREATE TABLE `bikeca`.`t_bike_ca_trip_etl` (
`id_` string COMMENT 'UUID',
`bike_id_` string COMMENT 'bike ID',
`hire_bikepos_id_` int COMMENT 'hire dock position ID',
`hire_station_id_` int COMMENT 'hire station ID',
`hire_date_` string COMMENT 'hire time',
`hire_status_id_` int COMMENT 'hire status ID',
`restore_bikepos_id_` int COMMENT 'return dock position ID',
`restore_station_id_` int COMMENT 'return station ID',
`restore_date_` string COMMENT 'return time',
`errorhdl_bikepos_id_` int COMMENT 'error-handling dock position ID',
`errorhdl_station_id_` int COMMENT 'error-handling station ID',
`errorhdl_date_` string COMMENT 'error-handling time',
`errorhdl_type_` string COMMENT 'error-handling type',
`trans_money_` double COMMENT 'transaction amount',
`user_id_` string COMMENT 'user ID',
`user_phone_` string COMMENT 'phone number',
`city_code_` string COMMENT 'city code',
`gen_date_` string COMMENT 'generation time',
`login_type_` string COMMENT 'login type: 8-Android, 11-iOS, 12-WeChat, 13-Alipay',
`pay_order_id_` string COMMENT 'payment order ID',
`pay_order_status_` string COMMENT 'payment order status',
`app_id_` string COMMENT 'APP ID',
`hire_station_name_` string COMMENT 'hire station name',
`restore_station_name_` string COMMENT 'return station name',
`errorhdl_station_name_` string COMMENT 'error-handling station name',
`h_lock_type_` string COMMENT 'hire lock type: 1-docked, 3-C1, 4-electric-assist bike, 5-N2, 6-permanent-assist bike, 7-N2S, 8-N3, 9-Wuhan e-bike smart lock, 10-N5',
`oper_id_` string COMMENT 'operator ID',
`service_id_` string COMMENT 'service ID',
`coup_used_id_` string COMMENT 'coupon redemption ID',
`bef_discount__money_` int COMMENT 'pre-discount payment amount',
`coup_type_` string COMMENT 'coupon type',
`relet_id_` string COMMENT 're-hire record ID',
`card_user_id_` string COMMENT 'user monthly-card ID',
`coord_hire_` string COMMENT 'hire coordinates',
`coord_restore_` string COMMENT 'return coordinates',
`update_time_` string COMMENT 'update time',
`hire_station_service_id_` string COMMENT 'hire station service (city) ID',
`restore_station_service_id_` string COMMENT 'return station service (city) ID',
`return_bike_way_` string COMMENT 'return method')
COMMENT 'trip table'
partitioned by (year string,month string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS '\N'
STORED AS ORC
tblproperties("orc.compress"="SNAPPY");
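Because this slimmed-down table reuses the name t_bike_ca_trip_etl, the earlier dt-partitioned version has to be dropped before this CREATE TABLE can succeed; a minimal sketch (note that for a managed table this also deletes the HDFS data):
hive -e "DROP TABLE IF EXISTS bikeca.t_bike_ca_trip_etl;"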
vim tidb2hive.json
{
"core": {
"transport": {
"channel": {
"speed": {
"byte": 16777216
}
}
}
},
"job": {
"setting": {
"speed": {
"byte": 16777216,
"batchSize": 2048
},
"errorLimit": {
"percentage": 0.2
}
},
"content": [
{
"reader": {
"parameter": {
"username": "root",
"column": [
"ID_",
"BIKE_ID_",
"HIRE_BIKEPOS_ID_",
"HIRE_STATION_ID_",
"HIRE_DATE_",
"HIRE_STATUS_ID_",
"RESTORE_BIKEPOS_ID_",
"RESTORE_STATION_ID_",
"RESTORE_DATE_",
"ERRORHDL_BIKEPOS_ID_",
"ERRORHDL_STATION_ID_",
"ERRORHDL_DATE_",
"ERRORHDL_TYPE_",
"TRANS_MONEY_",
"USER_ID_",
"USER_PHONE_",
"CITY_CODE_",
"GEN_DATE_",
"LOGIN_TYPE_",
"PAY_ORDER_ID_",
"PAY_ORDER_STATUS_",
"APP_ID_",
"HIRE_STATION_NAME_",
"RESTORE_STATION_NAME_",
"ERRORHDL_STATION_NAME_",
"H_LOCK_TYPE_",
"OPER_ID_",
"SERVICE_ID_",
"COUP_USED_ID_",
"BEF_DISCOUNT__MONEY_",
"COUP_TYPE_",
"RELET_ID_",
"CARD_USER_ID_",
"COORD_HIRE_",
"COORD_RESTORE_",
"UPDATE_TIME_",
"HIRE_STATION_SERVICE_ID_",
"RESTORE_STATION_SERVICE_ID_",
"RETURN_BIKE_WAY_"
],
"where": "GEN_DATE_ > '${date}' and GEN_DATE_ < '${date_to}'",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://172.16.100.42:4000/bikeca?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
],
"table": [
"t_bike_ca_trip_old"
]
}
],
"password": "84^j1TZ-+2b9A@S7Hs",
"splitPk": ""
},
"name": "mysqlreader"
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"type": "string",
"name": "ID_"
},
{
"type": "string",
"name": "BIKE_ID_"
},
{
"type": "bigint",
"name": "HIRE_BIKEPOS_ID_"
},
{
"type": "bigint",
"name": "HIRE_STATION_ID_"
},
{
"type": "string",
"name": "HIRE_DATE_"
},
{
"type": "bigint",
"name": "HIRE_STATUS_ID_"
},
{
"type": "bigint",
"name": "RESTORE_BIKEPOS_ID_"
},
{
"type": "bigint",
"name": "RESTORE_STATION_ID_"
},
{
"type": "string",
"name": "RESTORE_DATE_"
},
{
"type": "bigint",
"name": "ERRORHDL_BIKEPOS_ID_"
},
{
"type": "bigint",
"name": "ERRORHDL_STATION_ID_"
},
{
"type": "string",
"name": "ERRORHDL_DATE_"
},
{
"type": "string",
"name": "ERRORHDL_TYPE_"
},
{
"type": "string",
"name": "TRANS_MONEY_"
},
{
"type": "string",
"name": "USER_ID_"
},
{
"type": "string",
"name": "USER_PHONE_"
},
{
"type": "string",
"name": "CITY_CODE_"
},
{
"type": "string",
"name": "GEN_DATE_"
},
{
"type": "string",
"name": "LOGIN_TYPE_"
},
{
"type": "string",
"name": "PAY_ORDER_ID_"
},
{
"type": "string",
"name": "PAY_ORDER_STATUS_"
},
{
"type": "string",
"name": "APP_ID_"
},
{
"type": "string",
"name": "HIRE_STATION_NAME_"
},
{
"type": "string",
"name": "RESTORE_STATION_NAME_"
},
{
"type": "string",
"name": "ERRORHDL_STATION_NAME_"
},
{
"type": "string",
"name": "H_LOCK_TYPE_"
},
{
"type": "string",
"name": "OPER_ID_"
},
{
"type": "string",
"name": "SERVICE_ID_"
},
{
"type": "string",
"name": "COUP_USED_ID_"
},
{
"type": "bigint",
"name": "BEF_DISCOUNT__MONEY_"
},
{
"type": "string",
"name": "COUP_TYPE_"
},
{
"type": "string",
"name": "RELET_ID_"
},
{
"type": "string",
"name": "CARD_USER_ID_"
},
{
"type": "string",
"name": "COORD_HIRE_"
},
{
"type": "string",
"name": "COORD_RESTORE_"
},
{
"type": "string",
"name": "UPDATE_TIME_"
},
{
"type": "string",
"name": "HIRE_STATION_SERVICE_ID_"
},
{
"type": "string",
"name": "RESTORE_STATION_SERVICE_ID_"
},
{
"type": "string",
"name": "RETURN_BIKE_WAY_"
}
],
"fieldDelimiter": "\t",
"fileType": "orc",
"defaultFS": "hdfs://kube-42:8020",
"compress": "SNAPPY",
"path": "/user/hive/warehouse/bikeca.db/t_bike_ca_trip_etl/year=${year}/month=${month}",
"fileName": "t_bike_ca_trip_etl_${date}",
"writeMode": "append",
"dateFormat": "yyyy-MM-dd HH:mm:ss",
"encoding": "UTF-8",
"nullFormat": "\\N",
"header": "false",
"footer": "false",
"mergeSmallFiles": "false",
"partition": [
{
"name": "year",
"type": "string",
"format": "yyyy",
"value": "${year}"
},
{
"name": "month",
"type": "string",
"format": "MM",
"value": "${month}"
}
],
"hiveConf": [
{
"name": "hive.exec.dynamic.partition.mode",
"value": "nonstrict"
},
{
"name": "hive.exec.dynamic.partition",
"value": "true"
}
]
}
}
}
]
}
}
# Import the data into Hive
alter table bikeca.t_bike_ca_trip_etl add partition (year='2017',month='01');
"preSql": "alter table bikeca.t_bike_ca_trip_etl add partition (year=${year},month=${month})"
hive -e "alter table bikeca.t_bike_ca_trip_etl drop partition (year=${year},month=${month});"
hive -e "alter table bikeca.t_bike_ca_trip_etl add partition (year=${year},month=${month});"
hive -e "alter table bikeca.t_bike_ca_trip_etl drop partition (year='2017',month='02');"
hive -e "alter table bikeca.t_bike_ca_trip_etl add partition (year='2017',month='02');"
# Check whether a partition exists
SHOW PARTITIONS bikeca.t_bike_ca_trip_etl;
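From a script, the SHOW PARTITIONS output can be grepped to decide whether the partition still needs to be added; a hedged sketch using the names above:
# Add the partition only if it is not registered yet
if hive -e "SHOW PARTITIONS bikeca.t_bike_ca_trip_etl;" | grep -q "year=2017/month=01"; then
    echo "partition already exists"
else
    hive -e "ALTER TABLE bikeca.t_bike_ca_trip_etl ADD PARTITION (year='2017',month='01');"
fi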
# Shell script: full partitioned import into Hive
#!/bin/bash
year=2017
month=03
# Strip the leading zero (10# forces base-10) and move to the next month
b=$((10#$month + 1))
#echo $b
# b is month+1, so a valid month gives b in 2-13; the elif handles December
if [ $b -ge 2 ] && [ $b -le 12 ]; then
    date_to=$year-$(printf "%02d" $b)-01
    #echo $date_to
    echo "Importing $year-$month data into Hive"
    echo "python /opt/module/datax/bin/datax.py --jvm=\"-Xms4G -Xmx4G\" -p \"-Dyear=$year -Dmonth=$month -Ddate=$year-$month-01 -Ddate_to=$date_to\" /opt/module/datax/job/import/tidb2hive.json"
    hive -e "alter table bikeca.t_bike_ca_trip_etl drop partition (year='${year}',month='${month}');
    alter table bikeca.t_bike_ca_trip_etl add partition (year='${year}',month='${month}');"
    if [ $? -ne 0 ]; then echo "Previous command failed, exiting"; exit 1; fi
    python /opt/module/datax/bin/datax.py --jvm="-Xms4G -Xmx4G" -p "-Dyear=$year -Dmonth=$month -Ddate=$year-$month-01 -Ddate_to=$date_to" /opt/module/datax/job/import/tidb2hive.json
elif [ $b -eq 13 ]; then
    date_to=$(($year + 1))-01-01
    echo $date_to
    echo "Importing $year-$month data into Hive"
    echo "python /opt/module/datax/bin/datax.py --jvm=\"-Xms4G -Xmx4G\" -p \"-Dyear=$year -Dmonth=$month -Ddate=$year-$month-01 -Ddate_to=$date_to\" /opt/module/datax/job/import/tidb2hive.json >> /opt/module/datax/job/import/logs/tidb2hive_$year-$month.log 2>&1"
    hive -e "alter table bikeca.t_bike_ca_trip_etl drop partition (year='${year}',month='${month}');
    alter table bikeca.t_bike_ca_trip_etl add partition (year='${year}',month='${month}');"
    if [ $? -ne 0 ]; then echo "Previous command failed, exiting"; exit 1; fi
    python /opt/module/datax/bin/datax.py --jvm="-Xms4G -Xmx4G" -p "-Dyear=$year -Dmonth=$month -Ddate=$year-$month-01 -Ddate_to=$date_to" /opt/module/datax/job/import/tidb2hive.json >> /opt/module/datax/job/import/logs/tidb2hive_$year-$month.log 2>&1
else
    echo "Invalid month"
fi
Improved script: full partitioned import into Hive, with argument validation
#!/bin/bash
year=$1
month=$2
# year=2017
# month=03
# Check that both arguments were supplied
if [ -z "$year" ] || [ -z "$month" ]; then
    echo "Please provide a year and a month"
    exit 1
fi
# Check that both arguments are numeric
if ! [[ "$year" =~ ^[0-9]+$ ]] || ! [[ "$month" =~ ^[0-9]+$ ]]; then
    echo "Year and month must be numeric"
    exit 1
fi
# Year must be four digits, month two digits
if [ ${#year} -ne 4 ] || [ ${#month} -ne 2 ]; then
    echo "Year must be four digits and month two digits"
    exit 1
fi
# Strip the leading zero (10# forces base-10) and move to the next month
b=$((10#$month + 1))
# echo $b
# b is month+1, so a valid month gives b in 2-13; the elif handles December
if [ $b -ge 2 ] && [ $b -le 12 ]; then
    date_to=$year-$(printf "%02d" $b)-01
    # echo $date_to
    echo "Importing $year-$month data into Hive"
    echo "python /opt/module/datax/bin/datax.py --jvm=\"-Xms4G -Xmx4G\" -p \"-Dyear=$year -Dmonth=$month -Ddate=$year-$month-01 -Ddate_to=$date_to\" /opt/module/datax/job/import/tidb2hive.json"
    hive -e "alter table bikeca.t_bike_ca_trip_etl drop partition (year='${year}',month='${month}');
    alter table bikeca.t_bike_ca_trip_etl add partition (year='${year}',month='${month}');"
    if [ $? -ne 0 ]; then echo "Previous command failed, exiting"; exit 1; fi
    python /opt/module/datax/bin/datax.py --jvm="-Xms4G -Xmx4G" -p "-Dyear=$year -Dmonth=$month -Ddate=$year-$month-01 -Ddate_to=$date_to" /opt/module/datax/job/import/tidb2hive.json
elif [ $b -eq 13 ]; then
    date_to=$(($year + 1))-01-01
    # echo $date_to
    echo "Importing $year-$month data into Hive"
    echo "python /opt/module/datax/bin/datax.py --jvm=\"-Xms4G -Xmx4G\" -p \"-Dyear=$year -Dmonth=$month -Ddate=$year-$month-01 -Ddate_to=$date_to\" /opt/module/datax/job/import/tidb2hive.json"
    hive -e "alter table bikeca.t_bike_ca_trip_etl drop partition (year='${year}',month='${month}');
    alter table bikeca.t_bike_ca_trip_etl add partition (year='${year}',month='${month}');"
    if [ $? -ne 0 ]; then echo "Previous command failed, exiting"; exit 1; fi
    python /opt/module/datax/bin/datax.py --jvm="-Xms4G -Xmx4G" -p "-Dyear=$year -Dmonth=$month -Ddate=$year-$month-01 -Ddate_to=$date_to" /opt/module/datax/job/import/tidb2hive.json
else
    echo "Invalid month"
fi
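Usage, assuming the script was saved as /opt/module/datax/job/import/tidb2hive.sh (the path the batch script below also uses):
# Import March 2017: the reader's where clause then covers
# GEN_DATE_ between 2017-03-01 and 2017-04-01
bash /opt/module/datax/job/import/tidb2hive.sh 2017 03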
## Batch execution
vim full_to_hive_2019.sh
#!/bin/bash
/opt/module/datax/job/import/tidb2hive.sh 2019 01
/opt/module/datax/job/import/tidb2hive.sh 2019 02
/opt/module/datax/job/import/tidb2hive.sh 2019 03
/opt/module/datax/job/import/tidb2hive.sh 2019 04
/opt/module/datax/job/import/tidb2hive.sh 2019 05
/opt/module/datax/job/import/tidb2hive.sh 2019 06
/opt/module/datax/job/import/tidb2hive.sh 2019 07
/opt/module/datax/job/import/tidb2hive.sh 2019 08
/opt/module/datax/job/import/tidb2hive.sh 2019 09
/opt/module/datax/job/import/tidb2hive.sh 2019 10
/opt/module/datax/job/import/tidb2hive.sh 2019 11
/opt/module/datax/job/import/tidb2hive.sh 2019 12
bash full_to_hive_2019.sh
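The twelve calls can also be generated with a loop; a hedged, equivalent sketch:
#!/bin/bash
# Import every month of 2019 in sequence; seq -w zero-pads to 01..12
for m in $(seq -w 1 12); do
    /opt/module/datax/job/import/tidb2hive.sh 2019 "$m"
done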
Reference: incremental partitioned import into Hive
# sqoop incremental-partition reference
# Variable setup; these should eventually be passed in as arguments
mdb='bikeca'
hdb='bikeca'
table='t_bike_ca_trip_etl'
check_col='HIRE_DATE_'
ds='2017-01-01'
# 1. Check whether the partitioned table already exists in Hive
hive -e "show columns from ${hdb}.${table}" |grep -v 'WARN:' > tmp1.txt
a=`cat tmp1.txt`
# 2. Determine the timestamp length (10 digits = seconds, 13 = milliseconds)
tf=$(cat ${mdb}.${table}.timestamp 2>/dev/null)
if [ -n "${tf}" ]; then
l=${#tf}
echo "Timestamp-length file exists; length:" ${l}
else
echo "Timestamp-length file missing; creating it"
mysql -uroot -p'84^j1TZ-+2b9A@S7Hs' -h172.16.100.42 -P4000 -A ${mdb} -e "select max(${check_col}) from ${mdb}.${table} where ${check_col} >unix_timestamp(date_sub('${ds}',interval 30 day))" |awk 'NR>1' >${mdb}.${table}.timestamp
tf=$(cat ${mdb}.${table}.timestamp)
l=${#tf}
echo "Timestamp length:" ${l}
fi
# test
# mysql -u root -p'84^j1TZ-+2b9A@S7Hs' -h172.16.100.42 -P4000 -A bikeca -e "select max(HIRE_DATE_) from bikeca.t_bike_ca_trip where HIRE_DATE_ >unix_timestamp(date_sub('2017-01-01',interval 30 day))" |awk 'NR>1' >bikeca.t_bike_ca_trip.timestamp
# Build and run the import statements
if [[ ! -n "$a" && ${l} -eq 13 ]]; then
echo "Full import (millisecond timestamps)"
# full-import code follows
hive -e "drop table if exists ${hdb}.${table}"
sqoop import --connect jdbc:mysql://172.16.100.42:4000/${mdb}?tinyInt1isBit=false --username root --password '84^j1TZ-+2b9A@S7Hs' --table ${table} --compression-codec=snappy --as-parquetfile -m 2 --hive-import --hive-overwrite --hive-database ${hdb}
hive -e "show columns from ${hdb}.${table}" |grep -v 'WARN:' > tmp1.txt
b=`cat tmp1.txt| wc -l`
hive -e "show create table ${hdb}.${table};" >1
c=`head -$[${b}+1] 1`
# 把MySQL查询的固定字段拿出来
hive -e "show columns from ${hdb}.${table}" |grep -v 'WARN:' > ${hdb}.${table}.columns
sed -i '2,500s/^/,/' ${hdb}.${table}.columns
hive -e "alter table ${hdb}.${table} rename to ${hdb}.${table}_tmp;"
hive -e "${c} partitioned by (ds string) stored as parquet;"
hive -e "alter table ${hdb}.${table} rename to ${hdb}.${table};"
hive -e "set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=20000;
set hive.support.quoted.identifiers=none;
INSERT OVERWRITE TABLE ${hdb}.${table} partition(ds)
select *,to_date(cast(${check_col}/1000 as timestamp)) from ${hdb}.${table}_tmp;"
hive -e "drop table ${hdb}.${table}_tmp;"
elif [[ -n "$a" && ${l} -eq 13 ]]; then
echo "Incremental import: table structure exists, historical data loaded, writing this partition (millisecond timestamps)"
hive -e "alter table ${hdb}.${table} drop partition (ds='${ds}');"
hive -e "alter table ${hdb}.${table} add partition (ds='${ds}');"
hive -e "select unix_timestamp(concat('${ds}',' 00:00:00'));" >tmp2.txt
j1=`head -1 tmp2.txt`
j=$((j1 * 1000))
f=`cat ${hdb}.${table}.columns`
# Rows strictly after this partition's lower bound, up to and including its upper bound
CONDITIONS=''
g="select ${f} from ${table}"
sqoop import --connect jdbc:mysql://172.16.100.42:4000/${mdb}?tinyInt1isBit=false --username root --password '84^j1TZ-+2b9A@S7Hs' --query " ${g} where ${check_col} > unix_timestamp('${ds}')*1000 and ${check_col} <= unix_timestamp(date_add('${ds}',interval 1 day))*1000 and \$CONDITIONS" --compression-codec=snappy --as-parquetfile --target-dir /user/hive/warehouse/${hdb}.db/${table}/ds=${ds} --incremental append --check-column ${check_col} --last-value ${j} -m 1
elif [[ ! -n "$a" && ${l} -eq 10 ]]; then
echo "Full import (second timestamps)"
# full-import code follows
hive -e "drop table if exists ${hdb}.${table}"
sqoop import --connect jdbc:mysql://172.16.100.42:4000/${mdb}?tinyInt1isBit=false --username root --password '84^j1TZ-+2b9A@S7Hs' --table ${table} --compression-codec=snappy --as-parquetfile -m 2 --hive-import --hive-overwrite --hive-database ${hdb}
hive -e "show columns from ${hdb}.${table}" |grep -v 'WARN:' > tmp1.txt
b=`cat tmp1.txt| wc -l`
hive -e "show create table ${hdb}.${table};" >1
c=`head -$[${b}+1] 1`
# 把MySQL查询的固定字段拿出来
hive -e "show columns from ${hdb}.${table}" |grep -v 'WARN:' > ${hdb}.${table}.columns
sed -i '2,500s/^/,/' ${hdb}.${table}.columns
hive -e "alter table ${hdb}.${table} rename to ${hdb}.${table}_tmp;"
hive -e "${c} partitioned by (ds string) stored as parquet;"
hive -e "alter table ${hdb}.${table} rename to ${hdb}.${table};"
hive -e "set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=20000;
set hive.support.quoted.identifiers=none;
INSERT OVERWRITE TABLE ${hdb}.${table} partition(ds)
select *,to_date(cast(${check_col}*1.0 as timestamp)) from ${hdb}.${table}_tmp;"
hive -e "drop table ${hdb}.${table}_tmp;"
elif [[ -n "$a" && ${l} -eq 10 ]]; then
echo "Incremental import: table structure exists, historical data loaded, writing this partition (second timestamps)"
hive -e "alter table ${hdb}.${table} drop partition (ds='${ds}');"
hive -e "alter table ${hdb}.${table} add partition (ds='${ds}');"
hive -e "select unix_timestamp(concat('${ds}',' 00:00:00'));" >tmp2.txt
j=`head -1 tmp2.txt`
f=`cat ${hdb}.${table}.columns`
# Rows strictly after this partition's lower bound, up to and including its upper bound
CONDITIONS=''
g="select ${f} from ${table}"
sqoop import --connect jdbc:mysql://172.16.100.42:4000/${mdb}?tinyInt1isBit=false --username root --password '84^j1TZ-+2b9A@S7Hs' --query " ${g} where ${check_col} > unix_timestamp('${ds}') and ${check_col} <= unix_timestamp(date_add('${ds}',interval 1 day)) and \$CONDITIONS" --compression-codec=snappy --as-parquetfile --target-dir /user/hive/warehouse/${hdb}.db/${table}/ds=${ds} --incremental append --check-column ${check_col} --last-value ${j} -m 1
else
echo "其他异常"
fi
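A hedged sketch of driving this reference script over a date range, assuming ds is turned into a positional argument as the setup comment suggests (the script name incr2hive.sh is an assumption):
#!/bin/bash
# Backfill January 2017 one day at a time
d=2017-01-01
while [ "$d" != "2017-02-01" ]; do
    bash incr2hive.sh "$d"
    d=$(date -d "$d + 1 day" +%Y-%m-%d)
done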