7. Common Sqoop Operations


# kube-42 data preparation: restore production TiDB data from OSS into the test TiDB cluster
# Tool setup
cd /usr/local/src
wget https://download.pingcap.org/tidb-community-toolkit-v7.1.1-linux-amd64.tar.gz
tar xf tidb-community-toolkit-v7.1.1-linux-amd64.tar.gz
cd tidb-community-toolkit-v7.1.1-linux-amd64
tar xf br-v7.1.1-linux-amd64.tar.gz
cp br /usr/local/bin/
br -V

# Fetch the production data from OSS and restore it into the test TiDB cluster
export AWS_ACCESS_KEY_ID="LTAI5tC67mA-----------------"
export AWS_SECRET_ACCESS_KEY="gluDz02svC4Hy---------------"
# export Bucket=tidb-bak.oss-cn-hangzhou.aliyuncs.com
# export Endpoint=oss-cn-hangzhou.aliyuncs.com

# Restore a single table
br restore table --db bikeca --table t_bike_ca_trip --pd "10.96.53.30:2379" \
  --send-credentials-to-tikv=true \
  --log-file restore.log \
  --s3.provider "alibaba" \
  --storage "s3://tidb-bak/diiing-08-10-2024?access-key=${AWS_ACCESS_KEY_ID}&secret-access-key=${AWS_SECRET_ACCESS_KEY}" \
  --s3.endpoint "http://oss-cn-hangzhou.aliyuncs.com" \
  --s3.region "cn-hangzhou" --ratelimit 64 \
  --check-requirements=false

nohup /bin/sh /root/tidb_restore.sh  >>/dev/null 2>&1 &
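The nohup call above backgrounds a wrapper script whose contents are not shown here. A minimal sketch of what /root/tidb_restore.sh might contain, assuming it simply re-runs the single-table restore and records start/end times (the script body is hypothetical):

#!/bin/bash
# Hypothetical wrapper: restore one table from OSS and log start/end times
export AWS_ACCESS_KEY_ID="<access-key>"
export AWS_SECRET_ACCESS_KEY="<secret-key>"
echo "restore started at $(date)" >> /root/restore.log
br restore table --db bikeca --table t_bike_ca_trip --pd "10.96.53.30:2379" \
  --send-credentials-to-tikv=true \
  --log-file /root/restore.log \
  --s3.provider "alibaba" \
  --storage "s3://tidb-bak/diiing-08-10-2024?access-key=${AWS_ACCESS_KEY_ID}&secret-access-key=${AWS_SECRET_ACCESS_KEY}" \
  --s3.endpoint "http://oss-cn-hangzhou.aliyuncs.com" \
  --s3.region "cn-hangzhou" --ratelimit 64 \
  --check-requirements=false
echo "restore finished at $(date)" >> /root/restore.log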

# Ceph object storage (RGW S3 endpoint)
export AWS_ACCESS_KEY_ID="G4RBIYLMTB1X5S45W11H"
export AWS_SECRET_ACCESS_KEY="en0zxchEmhkIpheXwW5e7D4kjtrsvZc0yymUA8jv"
br restore table --db bikeca --table t_bike_ca_trip --pd "10.96.53.30:2379" \
  --send-credentials-to-tikv=true \
  --log-file ceph_restore.log \
  --storage "s3://tidbbak/diiing-08-10-2024?access-key=${AWS_ACCESS_KEY_ID}&secret-access-key=${AWS_SECRET_ACCESS_KEY}" \
  --s3.endpoint "http://10.96.136.0:7480" \
  --ratelimit 64 \
  --check-requirements=false

# tiup br
export AWS_ACCESS_KEY_ID="G4RBIYLMTB1X5S45W11H"
export AWS_SECRET_ACCESS_KEY="en0zxchEmhkIpheXwW5e7D4kjtrsvZc0yymUA8jv"
tiup br restore table --db bikeca --table t_bike_ca_trip --pd "172.16.100.42:2379" \
  --send-credentials-to-tikv=true \
  --log-file restore.log \
  --storage "s3://tidbbak/diiing-08-10-2024?access-key=${AWS_ACCESS_KEY_ID}&secret-access-key=${AWS_SECRET_ACCESS_KEY}" \
  --s3.endpoint "http://10.96.136.0:7480" \
  --ratelimit 64 \
  --check-requirements=false

tiup br restore table --db bikeca --table t_biz_loc_service --pd "172.16.100.42:2379" \
  --send-credentials-to-tikv=true \
  --log-file restore.log \
  --storage "s3://tidbbak/diiing-08-10-2024?access-key=${AWS_ACCESS_KEY_ID}&secret-access-key=${AWS_SECRET_ACCESS_KEY}" \
  --s3.endpoint "http://10.96.136.0:7480" \
  --ratelimit 64 \
  --check-requirements=false

ANALYZE TABLE t_biz_loc_service;
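After the restore and ANALYZE, a quick row-count check through the TiDB SQL layer confirms the tables landed; a hedged sketch, assuming the mysql client is installed on the host (the count queries are not part of the original runbook):

mysql -h 172.16.100.42 -P 4000 -u root -p'84^j1TZ-+2b9A@S7Hs' \
  -e "SELECT COUNT(*) FROM bikeca.t_bike_ca_trip; SELECT COUNT(*) FROM bikeca.t_biz_loc_service;"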
# kube-42 initial setup
# cat /etc/resolv.conf
nameserver 10.96.0.10
search backend.svc.cluster.local svc.cluster.local cluster.local localdomain
options ndots:5
# TiDB test source database
172.16.100.42:4000    root   84^j1TZ-+2b9A@S7Hs
# PXC database cluster
pxc-db-haproxy-replicas.pxc.svc.cluster.local:3306   develop   DeveLOP@123

# Permission configuration
vim /etc/passwd
hdfs:x:993:990:Hadoop HDFS:/var/lib/hadoop-hdfs:/bin/bash  # administrator account with full HDFS permissions

# The following operations use the Hue login user admin; its admin-privileged HDFS directory is /user/admin
useradd admin
su - admin
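If /user/admin does not exist yet, it must be created by the HDFS superuser before imports as admin will work; a minimal sketch, assuming sudo access to the hdfs account (the ownership setup is an assumption, not from the original notes):

# Run as the hdfs superuser: create /user/admin and hand it over to admin
sudo -u hdfs hdfs dfs -mkdir -p /user/admin
sudo -u hdfs hdfs dfs -chown admin:admin /user/admin
sudo -u hdfs hdfs dfs -ls /user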
# List databases and tables: list-databases, list-tables
sqoop list-databases \
--connect jdbc:mysql://172.16.100.42:4000/ \
--username root \
--password 84^j1TZ-+2b9A@S7Hs

sqoop list-tables \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs
# Import all tables
# Imports every table in the RDBMS into HDFS; each table gets its own HDFS directory
sqoop import-all-tables \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--warehouse-dir /user/admin/all_tables
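import-all-tables also accepts an exclusion list when only a subset of tables is wanted; a hedged variant of the command above (the excluded table names are placeholders), followed by a quick listing of the result:

sqoop import-all-tables \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--warehouse-dir /user/admin/all_tables \
--exclude-tables table_a,table_b \
--num-mappers 1

hdfs dfs -ls /user/admin/all_tables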
# Run SQL against the source database
sqoop eval \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--query "SELECT * FROM t_bike_ca_trip limit 100;"
# Generate a Hive table definition matching the relational table's schema
sqoop create-hive-table \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--table t_bike_ca_trip \
--hive-table t_bike_ca_trip

Import data

Full import

sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--table t_bike_ca_trip \
--target-dir /user/admin/t_bike_ca_trip \
--delete-target-dir \
--num-mappers 3 \
--fields-terminated-by "\t"

Query-based import

sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--target-dir /user/admin/t_bike_ca_trip \
--delete-target-dir \
--num-mappers 3 \
--fields-terminated-by "\t" \
--query 'SELECT * FROM t_bike_ca_trip where $CONDITIONS limit 100' \
--split-by 'ID_'
# --table t_bike_ca_trip \
# --query 'select name,sex from staff where id <=1 and $CONDITIONS;'
# Note: the query must contain '$CONDITIONS' in the WHERE clause.
# --query and --table cannot be used together, because the query may reference multiple tables

# -m 3 sets the number of map tasks to 3. Whenever -m is greater than 1, --split-by must
# name a column (ideally an integer column); if the split column is not an integer type,
# add the parameter -Dorg.apache.sqoop.splitter.allow_text_splitter=true.
# Example skeleton (connection settings and the split column are placeholders):
sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect ${conn_str} \
--username ${db_username} \
--password ${db_password} \
--table ${db_table} \
--split-by ${split_column} \
--target-dir ${target_dir} \
-m 3

# Verify the data
hdfs dfs -cat /user/admin/t_bike_ca_trip/*

Import selected columns

sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--target-dir /user/admin/t_bike_ca_trip \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t" \
--columns ID_,BIKE_ID_ \
--table t_bike_ca_trip \
--where "ID_='b46a4d83d31c4a1e89760640ee2a608e'"   # 测试-限制导入数据量

Filtered import with a WHERE clause

sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--target-dir /user/admin/t_bike_ca_trip \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t" \
--table t_bike_ca_trip \
--where "ID_='3c928eb0e34a4e5fa00289c2b62a017f'"

# Verify the data
hdfs dfs -cat /user/admin/t_bike_ca_trip/*

Import data into Hive

# Create the database in Hive
CREATE DATABASE bikeca
COMMENT 'This is a UTF-8 database'
LOCATION '/user/hive/warehouse/bikeca.db'
WITH DBPROPERTIES ('createdBy' = 'tang', 'creationDate' = '2024-10-09', 'characterSet' = 'UTF-8', 'collate' = 'utf8_general_ci');

# View table details
DESCRIBE EXTENDED t_bike_ca_trip_hive;

# The Hive table is created automatically
# --hive-drop-import-delims  strips Hive delimiter characters (\n, \r and \01) from string fields during import
# --hive-overwrite  overwrites any data already present in the Hive table
# Full import
sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--num-mappers 1 \
--table t_biz_loc_service \
--fields-terminated-by "\t" \
--hive-drop-import-delims \
--hive-import \
--hive-overwrite \
--hive-table bikeca.t_biz_loc_service_hive

# Verify the data
hdfs dfs -ls -R /user/hive/warehouse/bikeca.db
hdfs dfs -cat /user/hive/warehouse/bikeca.db/t_biz_loc_service_hive/*

# Query-based import
sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--target-dir /user/hive/warehouse/bikeca.db/t_bike_ca_trip_hive \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t" \
--hive-drop-import-delims \
--query 'SELECT * FROM t_bike_ca_trip where $CONDITIONS limit 100' \
--split-by 'ID_' \
--hive-import \
--hive-table bikeca.t_bike_ca_trip_hive

# Verify the data
hdfs dfs -ls -R /user/hive/warehouse/bikeca.db
hdfs dfs -cat /user/hive/warehouse/bikeca.db/t_bike_ca_trip_hive/*

SELECT * FROM bikeca.t_bike_ca_trip_hive limit 10;

# Sync Hive metadata to Impala
INVALIDATE METADATA bikeca.t_biz_loc_service_hive;
INVALIDATE METADATA bikeca.t_bike_ca_trip_hive;
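The same metadata refresh can be run non-interactively from the shell; a hedged sketch, assuming impala-shell is installed and <impalad-host> is replaced with a reachable impalad:

impala-shell -i <impalad-host>:21000 \
  -q "INVALIDATE METADATA bikeca.t_biz_loc_service_hive; INVALIDATE METADATA bikeca.t_bike_ca_trip_hive;"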

Incremental import into Hive, mode=append

sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--target-dir /user/hive/warehouse/bikeca.db/t_bike_ca_trip_hive \
--num-mappers 1 \
--fields-terminated-by "\t" \
--hive-drop-import-delims \
--query 'SELECT * FROM t_bike_ca_trip where $CONDITIONS limit 100,300' \
--split-by 'ID_' \
--check-column ID_ \
--incremental append \
--last-value 1000001
# Note: --check-column must reference a monotonically increasing column (e.g. an auto-increment ID)
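Tracking --last-value by hand is error-prone; Sqoop can persist it in a saved job instead, updating the stored value after every run. A minimal sketch, simplified to a --table import (the job name is a placeholder):

sqoop job --create trip_incremental_append -- import \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--target-dir /user/hive/warehouse/bikeca.db/t_bike_ca_trip_hive \
--table t_bike_ca_trip \
--num-mappers 1 \
--fields-terminated-by "\t" \
--check-column ID_ \
--incremental append \
--last-value 0

# Each run resumes from the stored last value; by default the password is prompted for at exec time
sqoop job --exec trip_incremental_append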

Incremental import into HDFS, mode=lastmodified

# This mode requires a timestamp column in the source table: given a timestamp, Sqoop imports every row modified after it into Hadoop (here, HDFS).
# Because an order's status may change later, its timestamp changes too, and Sqoop will import the updated order into HDFS again;
# to fold such updates into the existing records instead, set --merge-key (e.g. to the order ID) so new records are merged with the old ones.
sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--target-dir /user/hive/warehouse/bikeca.db/t_bike_ca_trip_hive \
--num-mappers 1 \
--fields-terminated-by "\t" \
--hive-drop-import-delims \
--query 'SELECT * FROM t_bike_ca_trip where $CONDITIONS limit 100,300' \
--split-by 'ID_' \
--check-column GEN_DATE_ \
--incremental lastmodified \
--last-value "2020-05-02 07:16:46.0" \
--append
# --merge-key ID_ \
# Note: with lastmodified imports you must state whether the incremental data is appended (--append) or merged into existing rows (--merge-key); a merge variant is sketched below
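A hedged sketch of that --merge-key variant, assuming ID_ uniquely identifies a row (this command is not part of the original notes):

sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--target-dir /user/hive/warehouse/bikeca.db/t_bike_ca_trip_hive \
--num-mappers 1 \
--fields-terminated-by "\t" \
--table t_bike_ca_trip \
--check-column GEN_DATE_ \
--incremental lastmodified \
--last-value "2020-05-02 07:16:46.0" \
--merge-key ID_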

Example shell script for daily imports

#!/bin/bash
# Yesterday's date (taken from the first argument; the commented line below computes it instead)
# yesterday=`date -d "1 days ago" "+%Y-%m-%d"`
yesterday=$1
sqoop import --connect jdbc:mysql://bigdata01:3306/sqoop \
--username root --password 123456 \
--query "select * from sales_order where order_date='${yesterday}' and \$CONDITIONS" \
--target-dir /home/dt=${yesterday} \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by '\t'
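A hedged usage example: driving the script from cron once a day and passing in yesterday's date (script path and schedule are placeholders; % has to be escaped inside crontab):

# crontab -e
30 1 * * * /bin/bash /root/sqoop_daily_import.sh $(date -d "1 days ago" +\%Y-\%m-\%d) >> /var/log/sqoop_daily_import.log 2>&1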

HCatalog mode

CREATE TABLE t_biz_loc_service_hcatalog LIKE t_biz_loc_service_hive;

# Full import
sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--split-by id_ \
--num-mappers 1 \
--table t_biz_loc_service \
--hive-drop-import-delims \
--hcatalog-database bikeca \
--create-hcatalog-table \
--hcatalog-table t_biz_loc_service_hcatalog
# Note: in HCatalog mode delimiter handling is on by default, so the delimiters are dropped automatically

# Query-based import
sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--num-mappers 1 \
--query 'SELECT * FROM t_biz_loc_service where $CONDITIONS limit 100' \
--split-by 'ID_' \
--hive-drop-import-delims \
--hcatalog-database bikeca \
--create-hcatalog-table \
--hcatalog-table t_biz_loc_service_hcatalog_100

# Incremental load: import 200 rows starting at offset 100
sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--num-mappers 1 \
--query 'SELECT * FROM t_biz_loc_service where $CONDITIONS limit 100,200' \
--split-by 'ID_' \
--hive-drop-import-delims \
--hcatalog-database bikeca \
--hcatalog-table t_biz_loc_service_hcatalog_100

# ORC format with Snappy compression (options to add to the import; a combined command is sketched below)
--hcatalog-database my_hcat_db
--hcatalog-table fact
--create-hcatalog-table
--hcatalog-storage-stanza 'stored as orc tblproperties ("orc.compress"="SNAPPY")'
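Putting those options together with the full import above, a sketch that writes t_biz_loc_service into an ORC table compressed with Snappy (the target table name t_biz_loc_service_orc is an assumption):

sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--num-mappers 1 \
--table t_biz_loc_service \
--hcatalog-database bikeca \
--create-hcatalog-table \
--hcatalog-table t_biz_loc_service_orc \
--hcatalog-storage-stanza 'stored as orc tblproperties ("orc.compress"="SNAPPY")'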

Export data

# Hive/HDFS to RDBMS
sqoop export \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--table t_bike_ca_trip_hive \
--num-mappers 1 \
--export-dir /user/hive/warehouse/bikeca.db/t_bike_ca_trip_hive \
--input-fields-terminated-by "\t"
# Note: if the table does not exist in MySQL, Sqoop will not create it automatically (a preparation sketch follows below)
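Because the target table is not created automatically, one hedged way to prepare it is to clone the source table's schema on the MySQL/TiDB side first (the mysql client call is an assumption; CREATE TABLE ... LIKE is standard MySQL syntax):

mysql -h 172.16.100.42 -P 4000 -u root -p'84^j1TZ-+2b9A@S7Hs' \
  -e "CREATE TABLE IF NOT EXISTS bikeca.t_bike_ca_trip_hive LIKE bikeca.t_bike_ca_trip;"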

sqoop export \
--connect jdbc:mysql://172.16.100.42:4000/bikeca \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--table t_biz_loc_service_hive \
--num-mappers 1 \
--export-dir /user/hive/warehouse/bikeca.db/t_biz_loc_service_hive \
--input-fields-terminated-by "\t"

# If exported Chinese text comes out garbled, set the character encoding in the JDBC URL, e.g.:
jdbc:mysql://172.16.100.42:4000/bikeca?useUnicode=true&characterEncoding=UTF-8&useSSL=false
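Since the URL now contains &, it must be quoted on the shell command line; a hedged example applying it to the export above:

sqoop export \
--connect "jdbc:mysql://172.16.100.42:4000/bikeca?useUnicode=true&characterEncoding=UTF-8&useSSL=false" \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--table t_bike_ca_trip_hive \
--num-mappers 1 \
--export-dir /user/hive/warehouse/bikeca.db/t_bike_ca_trip_hive \
--input-fields-terminated-by "\t"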

# Packaging the command into a script
# Put the sqoop command options into an .opt file, then run sqoop against that file
# Create the .opt file
touch job_HDFS2RDBMS.opt
# Edit the sqoop options
vi job_HDFS2RDBMS.opt

export
--connect
jdbc:mysql://172.16.100.42:4000/bikeca
--username
root
--password
84^j1TZ-+2b9A@S7Hs
--table
t_bike_ca_trip_hive
--num-mappers
1
--export-dir
/user/hive/warehouse/bikeca.db/t_bike_ca_trip_hive
--input-fields-terminated-by
"\t"

# Run sqoop with the options file
sqoop --options-file job_HDFS2RDBMS.opt