DataX Batch Processing
Installing DataX
Download: http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
Source code: https://github.com/alibaba/DataX
wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
mkdir /opt/module -p
tar -zxvf datax.tar.gz -C /opt/module/
cd /opt/module/datax/bin/
# Run the bundled sample job to verify the installation
python datax.py /opt/module/datax/job/job.json
# Remove the stray ._* files shipped in the tarball; they break plugin loading
rm -rf /opt/module/datax/plugin/*/._*
# MySQL-python (MySQLdb) is needed by the config-generation script below
yum install MySQL-python -y
Compression ratio: ORC > Parquet > TextFile (TextFile is uncompressed).
Query speed: roughly the same across the three formats.
If the table is stored as TEXTFILE, data files can be LOADed into it directly, with no MapReduce job.
The other formats must be written through a MapReduce job: they are compressed/encoded on disk, so the data has to be produced by the execution engine (typically via INSERT ... SELECT) rather than loaded as-is.
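A quick sanity check on the compression-ratio claim is to compare the on-disk size of the same dataset stored in each format. A minimal sketch, using the test tables created later in this section (the HDFS paths are my assumption; point them at your own warehouse):

# Compare total bytes on HDFS for the same data in two storage formats.
import subprocess

# Hypothetical warehouse paths -- adjust to the actual table locations.
paths = {
    "textfile": "/user/hive/warehouse/bikeca.db/t_biz_loc_service_test",
    "orc_snappy": "/user/hive/warehouse/bikeca.db/t_biz_loc_service_orc",
}

for fmt, path in paths.items():
    # `hadoop fs -du -s` prints the total size in bytes under a directory.
    out = subprocess.check_output(["hadoop", "fs", "-du", "-s", path]).decode()
    print("%-12s %12d bytes" % (fmt, int(out.split()[0])))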
CREATE TABLE `t_user_active`(
`id` string COMMENT 'primary key ID',
`user_id` string COMMENT 'user ID',
`user_name` string COMMENT 'user name',
`channel_id` string COMMENT 'channel ID',
`use_count` int COMMENT 'usage count',
`create_by` string COMMENT 'creator ID',
`create_date` string COMMENT 'creation time',
`update_by` string COMMENT 'updater ID',
`update_date` timestamp COMMENT 'update time'
)
PARTITIONED BY (
`create_data` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
"fileType": "text",
CREATE TABLE `t_user_active`(
`id` string COMMENT 'primary key ID',
`user_id` string COMMENT 'user ID',
`user_name` string COMMENT 'user name',
`channel_id` string COMMENT 'channel ID',
`use_count` int COMMENT 'usage count',
`create_by` string COMMENT 'creator ID',
`create_date` string COMMENT 'creation time',
`update_by` string COMMENT 'updater ID',
`update_date` timestamp COMMENT 'update time'
)
PARTITIONED BY ( `create_data` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS orc
tblproperties("orc.compress"="SNAPPY");
"fileType": "orc",
"compress": "SNAPPY",
DataX config-file generation script
# gen_import_config.py -- generates one DataX job file per MySQL table.
# Python 2 script; requires the MySQL-python (MySQLdb) package installed above.
import json
import getopt
import os
import sys
import MySQLdb

# Source MySQL/TiDB connection settings
mysql_host = "172.16.100.42"
mysql_port = "4000"
mysql_user = "root"
mysql_passwd = "84^j1TZ-+2b9A@S7Hs"
# Target HDFS NameNode
hdfs_nn_host = "kube-42"
hdfs_nn_port = "8020"
# Directory the generated job files are written to
output_path = "/opt/module/datax/job/import"


def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)


def get_mysql_meta(database, table):
    # Read column names and types from information_schema, in table order.
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_mysql_columns(database, table):
    # Column list for mysqlreader; list() keeps the result JSON-serializable
    # if the script is ever run under Python 3.
    return list(map(lambda x: x[0], get_mysql_meta(database, table)))


def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        # MySQL-to-Hive type mapping. decimal, datetime, timestamp and date
        # are mapped to string to avoid precision and format surprises; an
        # unmapped type raises KeyError, which fails fast on purpose.
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table)
    return list(map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta))


def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        # ${targetdir} is substituted at run time via
                        # datax.py -p"-Dtargetdir=..."
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "overwrite",
                        "fieldDelimiter": "\t",
                        "encoding": "utf-8",
                        "fileType": "text"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    # Parse -d/--sourcedb and -t/--sourcetbl
    source_database = ""
    source_table = ""
    options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value
    generate_json(source_database, source_table)


if __name__ == '__main__':
    main(sys.argv[1:])
"""
导入Hive表
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
"path": "${targetdir}",
"fileName": source_table,
"column": get_hive_columns(source_database, source_table),
"writeMode": "append",
"fieldDelimiter": "\t",
"encoding": "utf-8",
"fileType": "orc",
"compress": "SNAPPY" # ORC文件压缩方式,可选SNAPPY、ZLIB、NONE ZLIB 压缩算法是 ORC 的默认压缩策略
}
}
"""
python /opt/module/datax/job/gen_import_config.py -d bikeca -t t_bike_ca_trip
python /opt/module/datax/job/gen_import_config.py -d bikeca -t t_biz_loc_service
python /opt/module/datax/job/gen_import_config.py -d bikeca -t t_bike_ca_trip_hive
python /opt/module/datax/job/gen_import_config.py -d bikeca -t t_biz_loc_service_hive
sh /opt/module/datax/job/gen_import_config.sh
python /opt/module/datax/job/gen_import_config_hive.py -d bikeca -t t_bike_ca_trip
python /opt/module/datax/job/gen_import_config_hive.py -d bikeca -t t_biz_loc_service
python /opt/module/datax/job/gen_import_config_hive.py -d bikeca -t t_bike_ca_trip_hive
python /opt/module/datax/job/gen_import_config_hive.py -d bikeca -t t_biz_loc_service_hive
sh /opt/module/datax/job/gen_import_config_hive.sh
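Neither gen_import_config.sh nor gen_import_config_hive.sh is reproduced here; they simply run the per-table commands above in a loop. A hypothetical Python equivalent, assuming gen_import_config.py is importable from the job directory:

# Hypothetical batch driver: one DataX job file per (database, table) pair,
# reusing generate_json() from the script above.
from gen_import_config import generate_json

tables = [
    ("bikeca", "t_bike_ca_trip"),
    ("bikeca", "t_biz_loc_service"),
    ("bikeca", "t_bike_ca_trip_hive"),
    ("bikeca", "t_biz_loc_service_hive"),
]
for db, tbl in tables:
    generate_json(db, tbl)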
su - admin
python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/user/hive/warehouse/bikeca.db/t_biz_loc_service_test" /opt/module/datax/job/import/bikeca.t_biz_loc_service.json
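hdfswriter expects the target directory to exist before the job runs. If it is missing, pre-create it; a minimal sketch mirroring what handle_targetdir does in the full-sync script further below:

# Recreate the target directory before the import; path taken from the command above.
import subprocess

target = "/user/hive/warehouse/bikeca.db/t_biz_loc_service_test"
subprocess.call(["hadoop", "fs", "-rm", "-r", target])        # fine if it does not exist yet
subprocess.check_call(["hadoop", "fs", "-mkdir", "-p", target])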
TEXTFILE format
-- bikeca.t_biz_loc_service_test definition
CREATE TABLE `bikeca.t_biz_loc_service_test`(
`id_` string,
`serviceid_` string,
`service_name_` string,
`city_code_` string,
`service_type_` string,
`service_left_menu_` string,
`service_lock_` string,
`lon_` double,
`lat_` double,
`map_x_` double,
`map_y_` double,
`range_` int,
`weight_` int,
`biz_detail_` string,
`price_info_` string,
`nopile_price_info_` string,
`moped_price_info_` string,
`oper_time_` string,
`creator_` string,
`version_` int,
`ad_type_` string,
`operation_model_` string,
`operation_subject_` string,
`operation_start_date_` string,
`operation_status_` string,
`description_` string)
COMMENT 'Hive TEXTFILE TEST'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
STORED AS TEXTFILE;
ORC format, default compression (no codec specified, so ORC falls back to its default, ZLIB)
-- bikeca.t_biz_loc_service_test definition
CREATE TABLE `bikeca.t_biz_loc_service_test`(
`id_` string,
`serviceid_` string,
`service_name_` string,
`city_code_` string,
`service_type_` string,
`service_left_menu_` string,
`service_lock_` string,
`lon_` double,
`lat_` double,
`map_x_` double,
`map_y_` double,
`range_` int,
`weight_` int,
`biz_detail_` string,
`price_info_` string,
`nopile_price_info_` string,
`moped_price_info_` string,
`oper_time_` string,
`creator_` string,
`version_` int,
`ad_type_` string,
`operation_model_` string,
`operation_subject_` string,
`operation_start_date_` string,
`operation_status_` string,
`description_` string)
COMMENT 'Hive ORC TEST'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
STORED AS ORC;
-- bikeca.t_biz_loc_service_orc definition
CREATE TABLE `bikeca.t_biz_loc_service_orc`(
`id_` string,
`serviceid_` string,
`service_name_` string,
`city_code_` string,
`service_type_` string,
`service_left_menu_` string,
`service_lock_` string,
`lon_` double,
`lat_` double,
`map_x_` double,
`map_y_` double,
`range_` int,
`weight_` int,
`biz_detail_` string,
`price_info_` string,
`nopile_price_info_` string,
`moped_price_info_` string,
`oper_time_` string,
`creator_` string,
`version_` int,
`ad_type_` string,
`operation_model_` string,
`operation_subject_` string,
`operation_start_date_` string,
`operation_status_` string,
`description_` string)
COMMENT 'Hive ORC SNAPPY TEST'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
STORED AS ORC
tblproperties("orc.compress"="SNAPPY");
python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/user/hive/warehouse/bikeca.db/t_biz_loc_service_orc" /opt/module/datax/job/import/bikeca.t_biz_loc_service.json
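To confirm which codec the written ORC files actually use, `hive --orcfiledump` prints a Compression line for a given file. A small sketch; the part-file name below is a placeholder, so list the table directory first to find the real one:

# Dump ORC metadata for one output file and print its compression codec.
import subprocess

# Placeholder part-file name -- check `hadoop fs -ls <table dir>` for the actual file.
orc_file = "/user/hive/warehouse/bikeca.db/t_biz_loc_service_orc/t_biz_loc_service_orc__xxx"
dump = subprocess.check_output(["hive", "--orcfiledump", orc_file]).decode()
for line in dump.splitlines():
    if line.startswith("Compression"):
        print(line)   # expect: Compression: SNAPPY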
Full-table data sync script (example)
vim mysql_to_hdfs_full.sh
#!/bin/bash
DATAX_HOME=/opt/module/datax
DATAX_DATA=/opt/module/datax/job

# Recreate the target directory: hdfswriter requires it to exist and be clean.
handle_targetdir() {
    hadoop fs -rm -r $1 >/dev/null 2>&1
    hadoop fs -mkdir -p $1
}

import_data() {
    local datax_config=$1
    local target_dir=$2
    handle_targetdir "$target_dir"
    echo "Processing $datax_config"
    python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config >/tmp/datax_run.log 2>&1
    if [ $? -ne 0 ]; then
        echo "Import failed; log output:"
        cat /tmp/datax_run.log
    fi
    rm /tmp/datax_run.log
}

tab=$1
# Second argument is the business date; default to yesterday.
if [ -n "$2" ]; then
    do_date=$2
else
    do_date=$(date -d "-1 day" +%F)
fi

case ${tab} in
base_category_info | base_province | base_source | base_subject_info | cart_info | chapter_info | course_info | knowledge_point | test_paper | test_paper_question | test_point_question | test_question_info | test_question_option | user_chapter_process | video_info)
    import_data $DATAX_DATA/import/edu2077.${tab}.json /origin_data/edu/db/${tab}_full/$do_date
    ;;
"all")
    for tmp in base_category_info base_province base_source base_subject_info cart_info chapter_info course_info knowledge_point test_paper test_paper_question test_point_question test_question_info test_question_option user_chapter_process video_info
    do
        import_data $DATAX_DATA/import/edu2077.${tmp}.json /origin_data/edu/db/${tmp}_full/$do_date
    done
    ;;
esac
mysql_to_hdfs_full.sh all 2022-02-21
vim bikeca.t_biz_loc_service_orc_etl.json
{
"job": {
"content": [
{
"writer": {
"parameter": {
"encoding": "utf-8",
"writeMode": "append",
"fieldDelimiter": " ",
"column": [
{
"type": "string",
"name": "ID_"
},
{
"type": "string",
"name": "SERVICEID_"
},
{
"type": "string",
"name": "SERVICE_NAME_"
},
{
"type": "string",
"name": "CITY_CODE_"
},
{
"type": "string",
"name": "SERVICE_TYPE_"
},
{
"type": "string",
"name": "SERVICE_LEFT_MENU_"
},
{
"type": "string",
"name": "SERVICE_LOCK_"
},
{
"type": "float",
"name": "LON_"
},
{
"type": "float",
"name": "LAT_"
},
{
"type": "float",
"name": "MAP_X_"
},
{
"type": "float",
"name": "MAP_Y_"
},
{
"type": "bigint",
"name": "RANGE_"
},
{
"type": "bigint",
"name": "WEIGHT_"
},
{
"type": "string",
"name": "BIZ_DETAIL_"
},
{
"type": "string",
"name": "PRICE_INFO_"
},
{
"type": "string",
"name": "NOPILE_PRICE_INFO_"
},
{
"type": "string",
"name": "MOPED_PRICE_INFO_"
},
{
"type": "string",
"name": "OPER_TIME_"
},
{
"type": "string",
"name": "CREATOR_"
},
{
"type": "bigint",
"name": "VERSION_"
},
{
"type": "string",
"name": "AD_TYPE_"
},
{
"type": "string",
"name": "OPERATION_MODEL_"
},
{
"type": "string",
"name": "OPERATION_SUBJECT_"
},
{
"type": "string",
"name": "OPERATION_START_DATE_"
},
{
"type": "string",
"name": "OPERATION_STATUS_"
},
{
"type": "string",
"name": "DESCRIPTION_"
}
],
"path": "${targetdir}",
"fileType": "orc",
"defaultFS": "hdfs://kube-42:8020",
"compress": "SNAPPY",
"fileName": "t_biz_loc_service_orc_etl"
},
"name": "hdfswriter"
},
"reader": {
"parameter": {
"username": "root",
"column": [
"ID_",
"SERVICEID_",
"SERVICE_NAME_",
"CITY_CODE_",
"SERVICE_TYPE_",
"SERVICE_LEFT_MENU_",
"SERVICE_LOCK_",
"LON_",
"LAT_",
"MAP_X_",
"MAP_Y_",
"RANGE_",
"WEIGHT_",
"BIZ_DETAIL_",
"PRICE_INFO_",
"NOPILE_PRICE_INFO_",
"MOPED_PRICE_INFO_",
"OPER_TIME_",
"CREATOR_",
"VERSION_",
"AD_TYPE_",
"OPERATION_MODEL_",
"OPERATION_SUBJECT_",
"OPERATION_START_DATE_",
"OPERATION_STATUS_",
"DESCRIPTION_"
],
"connection": [
{
"table": [
"t_biz_loc_service_hive"
],
"jdbcUrl": [
"jdbc:mysql://172.16.100.42:4000/bikeca"
]
}
],
"password": "84^j1TZ-+2b9A@S7Hs",
"splitPk": ""
},
"name": "mysqlreader"
}
}
],
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
}
}
}
-- bikeca.t_biz_loc_service_orc_etl definition
CREATE TABLE `bikeca.t_biz_loc_service_orc_etl`(
`id_` string COMMENT '',
`serviceid_` string COMMENT '',
`service_name_` string COMMENT '',
`city_code_` string COMMENT '',
`service_type_` string COMMENT '',
`service_left_menu_` string COMMENT '',
`service_lock_` string COMMENT '',
`lon_` double,
`lat_` double,
`map_x_` double,
`map_y_` double,
`range_` int,
`weight_` int,
`biz_detail_` string COMMENT '',
`price_info_` string COMMENT '',
`nopile_price_info_` string COMMENT '',
`moped_price_info_` string COMMENT '',
`oper_time_` string COMMENT '',
`creator_` string COMMENT '',
`version_` int,
`ad_type_` string COMMENT '',
`operation_model_` string COMMENT '',
`operation_subject_` string COMMENT '',
`operation_start_date_` string COMMENT '',
`operation_status_` string COMMENT '',
`description_` string COMMENT '')
COMMENT 'Hive ORC SNAPPY TEST'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
STORED AS ORC
tblproperties("orc.compress"="SNAPPY");
python /opt/module/datax/bin/datax.py \
-p"-Dtargetdir=/user/hive/warehouse/bikeca.db/t_biz_loc_service_orc_etl" \
/opt/module/datax/job/import/bikeca.t_biz_loc_service_orc_etl.json
Syncing historical data to TiDB: t_bike_ca_trip_old
vim /opt/module/datax/job/import/bikeca.t_bike_ca_trip_old.json
{
"core": {
"transport": {
"channel": {
"speed": {
"byte": 2097152
}
}
}
},
"job": {
"setting": {
"speed": {
// "byte": 10485760
"byte": 16777216,
"batchSize": 2048
},
"errorLimit": {
"percentage": 0.2
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "ddarchive",
"password": "ddarchive@123",
"splitPk": "ID_",
"column": [
"ID_",
"PART_ID_",
"BIKE_ID_",
"HIRE_BIKEPOS_ID_",
"HIRE_STATION_ID_",
"HIRE_DATE_",
"HIRE_STATUS_ID_",
"RESTORE_BIKEPOS_ID_",
"RESTORE_STATION_ID_",
"RESTORE_DATE_",
"ERRORHDL_BIKEPOS_ID_",
"ERRORHDL_STATION_ID_",
"ERRORHDL_DATE_",
"ERRORHDL_TYPE_",
"TRANS_MONEY_",
"USER_ID_",
"CITY_CODE_",
"GEN_DATE_",
"LOGIN_TYPE_",
"PAY_ORDER_ID_",
"PAY_ORDER_STATUS_",
"APP_ID_",
"HIRE_STATION_NAME_",
"RESTORE_STATION_NAME_",
"ERRORHDL_STATION_NAME_",
"H_LOCK_TYPE_",
"R_LOCK_TYPE_",
"OPER_ID_",
"SERVICE_ID_",
"COUP_USED_ID_",
"BEF_DISCOUNT__MONEY_",
"COUP_TYPE_",
"RELET_ID_",
"CARD_USER_ID_",
"USER_PHONE_",
"SUPPLY_STATUS_"
],
"connection": [
{
"table": [
"t_bike_ca_trip_17_01",
"t_bike_ca_trip_17_02",
"t_bike_ca_trip_17_03",
"t_bike_ca_trip_17_04",
"t_bike_ca_trip_17_05",
"t_bike_ca_trip_17_06",
"t_bike_ca_trip_17_07",
"t_bike_ca_trip_17_08",
"t_bike_ca_trip_17_09",
"t_bike_ca_trip_17_10",
"t_bike_ca_trip_17_11",
"t_bike_ca_trip_17_12",
"t_bike_ca_trip_18_01",
"t_bike_ca_trip_18_02",
"t_bike_ca_trip_18_03",
"t_bike_ca_trip_18_04",
"t_bike_ca_trip_18_05",
"t_bike_ca_trip_18_06",
"t_bike_ca_trip_18_07",
"t_bike_ca_trip_18_08",
"t_bike_ca_trip_18_09",
"t_bike_ca_trip_18_10",
"t_bike_ca_trip_18_11",
"t_bike_ca_trip_18_12",
"t_bike_ca_trip_19_01",
"t_bike_ca_trip_19_02",
"t_bike_ca_trip_19_03",
"t_bike_ca_trip_19_04",
"t_bike_ca_trip_19_05",
"t_bike_ca_trip_19_06",
"t_bike_ca_trip_19_07",
"t_bike_ca_trip_19_08",
"t_bike_ca_trip_19_09",
"t_bike_ca_trip_19_10",
"t_bike_ca_trip_19_11",
"t_bike_ca_trip_19_12",
"t_bike_ca_trip_20_01",
"t_bike_ca_trip_20_02",
"t_bike_ca_trip_20_03"
],
"jdbcUrl": [
"jdbc:mysql://rm-bp1ib4p42qz893l9y1o.mysql.rds.aliyuncs.com:3306/ddarchive?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "84^j1TZ-+2b9A@S7Hs",
"column": [
"ID_",
"PART_ID_",
"BIKE_ID_",
"HIRE_BIKEPOS_ID_",
"HIRE_STATION_ID_",
"HIRE_DATE_",
"HIRE_STATUS_ID_",
"RESTORE_BIKEPOS_ID_",
"RESTORE_STATION_ID_",
"RESTORE_DATE_",
"ERRORHDL_BIKEPOS_ID_",
"ERRORHDL_STATION_ID_",
"ERRORHDL_DATE_",
"ERRORHDL_TYPE_",
"TRANS_MONEY_",
"USER_ID_",
"CITY_CODE_",
"GEN_DATE_",
"LOGIN_TYPE_",
"PAY_ORDER_ID_",
"PAY_ORDER_STATUS_",
"APP_ID_",
"HIRE_STATION_NAME_",
"RESTORE_STATION_NAME_",
"ERRORHDL_STATION_NAME_",
"H_LOCK_TYPE_",
"R_LOCK_TYPE_",
"OPER_ID_",
"SERVICE_ID_",
"COUP_USED_ID_",
"BEF_DISCOUNT__MONEY_",
"COUP_TYPE_",
"RELET_ID_",
"CARD_USER_ID_",
"USER_PHONE_",
"SUPPLY_STATUS_"
],
"writeMode": "replace",
"encoding": "UTF-8",
"connection": [
{
"jdbcUrl": "jdbc:mysql://172.16.100.42:4000/bikeca?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
"table": [
"t_bike_ca_trip_old"
]
}
]
}
}
}
]
}
}
python /opt/module/datax/bin/datax.py --jvm="-Xms4G -Xmx4G" /opt/module/datax/job/import/bikeca.t_bike_ca_trip_old.json
-- Verify: compare a source month shard against the matching date range in the target table.
SELECT COUNT(1) FROM t_bike_ca_trip_18_07;
SELECT COUNT(1) FROM t_bike_ca_trip_old WHERE HIRE_DATE_ BETWEEN '2018-07-01 00:00:00' AND '2018-08-01 00:00:00';
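The same check, scripted; a sketch assuming MySQLdb connectivity to both endpoints, with the credentials reused from the job file above:

# Compare row counts: one source month shard vs. the matching date range in the target.
import MySQLdb

src = MySQLdb.connect(host="rm-bp1ib4p42qz893l9y1o.mysql.rds.aliyuncs.com", port=3306,
                      user="ddarchive", passwd="ddarchive@123", db="ddarchive")
dst = MySQLdb.connect(host="172.16.100.42", port=4000,
                      user="root", passwd="84^j1TZ-+2b9A@S7Hs", db="bikeca")

def one_value(conn, sql):
    cur = conn.cursor()
    cur.execute(sql)
    (n,) = cur.fetchone()
    cur.close()
    return n

src_n = one_value(src, "SELECT COUNT(1) FROM t_bike_ca_trip_18_07")
dst_n = one_value(dst, "SELECT COUNT(1) FROM t_bike_ca_trip_old "
                       "WHERE HIRE_DATE_ BETWEEN '2018-07-01 00:00:00' AND '2018-08-01 00:00:00'")
print("source=%d target=%d match=%s" % (src_n, dst_n, src_n == dst_n))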