Hands-on Example: Data Preparation
Oracle Data Import
Oracle Database Deployment
### Deploying Oracle 11g with Docker
docker pull registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g
# start a temporary container so the files to be persisted can be copied out
sudo docker run -d -it -p 1521:1521 --name oracle11g --restart=always registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g
sudo docker ps
sudo mkdir -p /data/oracle/oradata
sudo docker cp oracle11g:/home/oracle/app/oracle/oradata/helowin /data/oracle/oradata/helowin
sudo docker cp oracle11g:/home/oracle/app/oracle/flash_recovery_area/helowin /data/oracle/oradata/flash_recovery_area
sudo docker stop oracle11g
sudo docker rm oracle11g
sudo docker run -d -it -p 1521:1521 --name oracle11g --restart=always -v /data/oracle/oradata/helowin:/home/oracle/app/oracle/oradata/helowin -v /data/oracle/oradata/flash_recovery_area:/home/oracle/app/oracle/flash_recovery_area/helowin registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g
sudo docker exec -it oracle11g bash
su - root # the password is helowin
chown -R oracle /home/oracle/app/oracle/flash_recovery_area/helowin
chown -R oracle /home/oracle/app/oracle/oradata/helowin
exit # leave the root shell and the container shell, then restart the container from the host
sudo docker restart oracle11g
sudo docker logs oracle11g
sudo docker exec -it oracle11g bash
su - root # switch to root; the password is helowin
vi /etc/profile # edit /etc/profile to configure the Oracle environment variables
export ORACLE_HOME=/home/oracle/app/oracle/product/11.2.0/dbhome_2
export ORACLE_SID=helowin
export PATH=$ORACLE_HOME/bin:$PATH
source /etc/profile
ln -s $ORACLE_HOME/bin/sqlplus /usr/bin
su - oracle
sqlplus / as sysdba # log in
CONNECT / AS SYSDBA; # connect; OS authentication is required for this login
alter user system identified by Oracle1232123; # change the system user's password
alter user sys identified by Oracle1232123; # change the sys user's password
ALTER PROFILE DEFAULT LIMIT PASSWORD_LIFE_TIME UNLIMITED; # change the password policy so passwords never expire
-- create an internal administrator account with its password;
create user tang identified by Tang123;
-- grant the DBA role to the internal administrator account;
grant connect,resource,dba to tang;
-- change the password policy so passwords never expire;
ALTER PROFILE DEFAULT LIMIT PASSWORD_LIFE_TIME UNLIMITED;
-- increase the database's maximum number of connections (processes);
alter system set processes=1000 scope=spfile;
-- unlock a locked user account
alter user db_user account unlock;
commit;
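-- A quick check that the account exists and holds the DBA role (a sketch):
select username, account_status from dba_users where username = 'TANG';
select granted_role from dba_role_privs where grantee = 'TANG';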
# check the listener and database status
lsnrctl status
# list tablespaces and their data files
SELECT tablespace_name, file_name, bytes/1024/1024 AS size_mb, autoextensible FROM dba_data_files;
# details for a specific tablespace
select tablespace_name,file_name,autoextensible from dba_data_files where tablespace_name = '<TABLESPACE_NAME>';
# syntax to enable autoextend:
alter database datafile '<datafile path>' autoextend on;
# syntax to disable autoextend:
alter database datafile '<datafile path>' autoextend off;
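-- Example (a sketch; the datafile path is hypothetical, substitute a file_name returned by the query above):
alter database datafile '/home/oracle/app/oracle/oradata/helowin/users01.dbf' autoextend on next 100m maxsize unlimited;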
# Backup and restore
-- the following command lists the parameters supported by expdp; the same works for impdp
expdp help=y
# Exporting (backing up) data:
# prerequisite: create a directory object
-- 1. check whether the directory object already exists
select * from dba_directories t where t.directory_name = 'MYDIR';
-- 2. if it does not exist, create it
create directory mydir as '/home/oracle/app/oracle/oradata/helowin/bak';
# grant privileges (required for non-system users, e.g. scott)
-- grant the create privilege
grant create any directory to scott;
-- grant read/write privileges
grant read, write on directory mydir to scott;
-- list the users that hold directory-related privileges
select * from dba_sys_privs t where t.privilege like '%DIRECTORY%';
-- list the users that hold read/write privileges
select * from dba_tab_privs t where t.privilege in ('READ', 'WRITE');
-- in practice keep everything on one line separated by single spaces; it is broken up here only to show the parts
expdp <user>/<password>@<database> <directory> <dump file (binary)> <export mode> <logging option>
expdp scott/scott@orcl directory=mydir dumpfile=tables.dmp tables=emp,dept logfile=tables.log
expdp scott/scott@orcl directory=mydir dumpfile=schemas.dmp schemas=scott logfile=schemas.log
expdp scott/scott@orcl directory=mydir dumpfile=tablespaces.dmp tablespaces=users logfile=tablespaces.log
expdp system/system@orcl directory=mydir dumpfile=full.dmp full=Y nologfile=Y
-- mydir, emp.dmp and emp.log are user-defined names
expdp scott/scott@orcl directory=mydir dumpfile=emp.dmp tables=emp logfile=emp.log
-- export multiple tables
expdp scott/scott@orcl directory=mydir dumpfile=emp.dmp tables=emp,dept logfile=emp.log
expdp scott/scott@orcl directory=mydir dumpfile=emp.dmp tables=emp query='emp:"WHERE deptno=10 AND sal>2000"' logfile=emp.log
-- backup example used here
expdp system/Oracle1232123@helowin directory=mydir dumpfile=tang.dmp schemas=TANG nologfile=Y
# Restore (import)
-- general form
impdp scott/scott@orcl directory=mydir dumpfile=emp.dmp tables=emp logfile=emp.log
-- import in append mode (keep existing rows)
impdp scott/scott@orcl directory=mydir dumpfile=emp.dmp tables=emp table_exists_action=APPEND
-- restore example used here
impdp tang/Tang123@helowin directory=mydir dumpfile=CISS_2021.dmp logfile=CISS_2021.log
-- imp
imp tang/Tang123@helowin file=/home/oracle/app/oracle/oradata/helowin/bak/CISS_2021.dmp full=y
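-- A quick sanity check after the restore (a sketch; it assumes the dump restored its tables into the CISS4 schema, which the Sqoop examples below read from):
-- connect as tang in sqlplus, then:
select count(*) from all_tables where owner = 'CISS4';
select count(*) from CISS4.CISS_BASE_AREAS;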
Data Import Test
# install the Oracle JDBC driver for Sqoop
cd /opt/cloudera/parcels/CDH/lib/sqoop/lib
wget https://home.vimll.com:9999/download/oracle/ojdbc6-11.2.0.4.jar
# create the HDFS target directory
hdfs dfs -mkdir -p /test/full_imp/
hdfs dfs -chown -R admin:admin /test
su - admin
# run the import
sqoop import \
--connect jdbc:oracle:thin:@172.16.100.41:1521:helowin \
--username tang \
--password Tang123 \
--table CISS4.CISS_BASE_AREAS \
--delete-target-dir \
--target-dir /test/full_imp/ciss4.ciss_base_areas \
--fields-terminated-by "\t" \
-m 1
# verify the data
hdfs dfs -cat /test/full_imp/ciss4.ciss_base_areas/* | head -n 10
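# To spot split records early, compare the HDFS line count with the Oracle row count (a sketch; the two numbers should match when no column value contains a newline):
hdfs dfs -cat /test/full_imp/ciss4.ciss_base_areas/* | wc -l
# and in sqlplus: select count(*) from CISS4.CISS_BASE_AREAS;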
Increasing YARN Cluster Concurrency
# YARN resource settings
yarn.scheduler.capacity.maximum-am-resource-percent=0.8
yarn.scheduler.minimum-allocation-mb=512
# Settings to increase concurrency
- YARN NodeManager resources
yarn.nodemanager.resource.cpu-vcores=8
yarn.nodemanager.resource.memory-mb=8192
- Container resources
yarn.scheduler.minimum-allocation-vcores=1
yarn.scheduler.maximum-allocation-vcores=32
yarn.scheduler.minimum-allocation-mb=1024
yarn.scheduler.maximum-allocation-mb=8192
- MapReduce task resources
mapreduce.map.cpu.vcores=1
mapreduce.map.memory.mb=1024
mapreduce.reduce.cpu.vcores=1
mapreduce.reduce.memory.mb=1024
- Spark executor resources (see the spark-submit sketch after this list)
--driver-memory # memory allocated to the driver; defaults to 1 GB
--driver-cores # CPU cores allocated to the driver; defaults to 1 core
--executor-memory # memory allocated to each executor; defaults to 1 GB, available in all cluster modes
--executor-cores # CPU cores allocated to each executor; available on YARN and standalone clusters
--total-executor-cores NUM # standalone mode: total CPU cores used by all executors
--num-executors NUM # YARN mode: number of executors to launch, 2 by default
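# A minimal spark-submit sketch showing where these resource options go (the class name and jar path are placeholders, not part of this project):
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 2g \
--driver-cores 1 \
--executor-memory 4g \
--executor-cores 2 \
--num-executors 4 \
--class com.example.MyApp \
/path/to/my-app.jar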
# In Uber mode the job requests only a single AM container; all map tasks and reduce tasks run sequentially inside that container
- disabled by default
mapreduce.job.ubertask.enable=true
# Uber mode is used only when all of the following conditions are satisfied
mapreduce.job.ubertask.maxmaps=9
mapreduce.job.ubertask.maxreduces=1
mapreduce.job.ubertask.maxbytes=128M
yarn.app.mapreduce.am.resource.cpu-vcores=1
yarn.app.mapreduce.am.resource.mb=1536M
- Characteristics
- the Uber-mode process is the AM itself, so the resources used by all tasks must stay below the AM container's resources
- if the conditions are not met, the job simply does not run in Uber mode
- Uber mode disables speculative execution
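# To experiment with Uber mode on a single job instead of cluster-wide, the switch can be passed as a Hadoop -D generic option, e.g. on a sqoop command (a sketch reusing the connection settings from the import test above):
sqoop import \
-Dmapreduce.job.ubertask.enable=true \
--connect jdbc:oracle:thin:@172.16.100.41:1521:helowin \
--username tang \
--password Tang123 \
--table CISS4.CISS_BASE_AREAS \
--delete-target-dir \
--target-dir /test/full_imp/ciss4.ciss_base_areas \
-m 1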
Sqoop Data Format Issues
- after a Sqoop import, the data volume in HDFS does not match the data volume in Oracle
- when Sqoop imports data in text format, the record delimiter defaults to the newline character
- if a column value in Oracle contains special characters such as \n, \r or \t, a single record gets split across multiple lines
- Solutions
- Option 1: delete or replace the newline characters in the data
- --hive-drop-import-delims: drop the delimiter characters
- --hive-delims-replacement <char>: replace them with the given character
- not recommended: it alters the original data
- Option 2: use a special file format: Avro
- Common file formats
| Format | Description |
| ------------ | ------------------------------------------------------------ |
| TextFile | Hive's default file format; the simplest format, easy to view and edit, but storage-hungry with relatively poor I/O performance |
| SequenceFile | Binary key-value file; better disk utilization and I/O, supports parallel processing and efficient queries, but uses the most storage |
| AvroFile | Special binary format designed mainly for schema evolution; the schema is stored together with the data |
| OrcFile | Columnar storage with the schema kept in the file footer; no schema evolution, but high compression with built-in indexes and very fast queries |
| ParquetFile | Columnar storage similar to ORC; compression is not as good as ORC but query performance is close, and it is supported by more tools, making it more general-purpose |
- Spark Core limitation: an RDD holds data only, with no schema
- Spark SQL advantage: a DataFrame holds data plus a schema
- schema: column metadata (name, type)
- Avro format characteristics
- Pros
- binary storage, good performance and efficiency
- the schema is described in JSON, covering a wide range of scenarios
- schema and data are stored together, so records are self-describing
- the schema definition can specify a sort order for the data
- Cons
- supports only Avro's own serialization format
- reading a small number of columns is relatively slow, and the compression ratio is low
- Use cases: large-scale row-oriented writes of structured data, reads that touch many columns, or frequent schema changes
- Using the Avro format with Sqoop
- option
--as-avrodatafile # Imports data to Avro datafiles
- Note: if MapReduce Uber mode is enabled, the following parameter must be added to the job to avoid class conflicts
-Dmapreduce.job.user.classpath.first=true
Data Import Test (Avro Format)
sqoop import \
-Dmapreduce.job.user.classpath.first=true \
--connect jdbc:oracle:thin:@172.16.100.41:1521:helowin \
--username tang \
--password Tang123 \
--table CISS4.CISS_SERVICE_WORKORDER \
--delete-target-dir \
--target-dir /test/full_imp/ciss4.ciss_service_workorder \
--as-avrodatafile \
--fields-terminated-by "\001" \
-m 1
# 24/12/02 15:05:34 INFO mapreduce.ImportJobBase: Retrieved 178609 records.
## Test by creating a table in Hue
create external table test_avro(
line string
)
stored as avro
location '/test/full_imp/ciss4.ciss_service_workorder';
select count(*) from default.test_avro;
# 178609
drop table test_avro;
## Import and create the Hive table in one step
CREATE DATABASE ciss4
COMMENT 'This is a ciss4 UTF-8 database'
LOCATION '/user/hive/warehouse/ciss4.db'
WITH DBPROPERTIES ('createdBy' = 'tang', 'creationDate' = '2024-10-09', 'characterSet' = 'UTF-8', 'collate' = 'utf8_general_ci');
sqoop import \
-Dmapreduce.job.user.classpath.first=true \
--connect jdbc:oracle:thin:@172.16.100.41:1521:helowin \
--username tang \
--password Tang123 \
--table CISS4.CISS_SERVICE_WORKORDER \
--fields-terminated-by "\001" \
--hive-drop-import-delims \
--hcatalog-database ciss4 \
--create-hcatalog-table \
--hcatalog-table ciss_service_workorder \
--hcatalog-storage-stanza 'stored as orc tblproperties ("orc.compress"="SNAPPY")' \
-m 1
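# After the job completes, a quick check in Hive (a sketch; 178609 is the row count retrieved in the earlier Avro test, and --hive-drop-import-delims means the counts should line up):
select count(*) from ciss4.ciss_service_workorder;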
Review of Sqoop Incremental Collection Approaches
Three incremental approaches
- Append
- requirement: the table must have an auto-incrementing column; rows are selected by comparing that integer value
- behavior: only newly added rows are imported; updated rows are not captured
- use case: tables that only ever receive inserts and never updates
- code
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test02 \
--fields-terminated-by '\t' \
--check-column id \
--incremental append \
--last-value 0 \
-m 1
- Lastmodified
- requirement: the table must have a column that tracks modification time; rows are selected by that timestamp
- behavior: imports both newly added and updated rows
- use case: source tables generally cannot meet the requirement, so this mode is rarely used
- code
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_lastmode \
--target-dir /sqoop/import/test03 \
--fields-terminated-by '\t' \
--incremental lastmodified \
--check-column lastmode \
--last-value '2021-06-06 16:09:32' \
-m 1
- Custom approach
- requirement: each run must write to a different output directory
- behavior: the incremental filter is implemented by hand, so both new and updated rows can be collected
- use case: typically used to collect each day's partition of data into Hive with a custom query (see the partition sketch after the code)
- code
sqoop import \
--connect jdbc:mysql://node3:3306/db_order \
--username root \
--password-file file:///export/data/sqoop.passwd \
--query "select * from tb_order where substring(create_time,1,10) = '2021-09-14' or substring(update_time,1,10) = '2021-09-14' and \$CONDITIONS " \
--delete-target-dir \
--target-dir /nginx/logs/tb_order/daystr=2021-09-14 \
--fields-terminated-by '\t' \
-m 1
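-- To expose each day's directory in Hive, attach it as a partition (a sketch; the table tb_order and the daystr partition column are hypothetical, matching the target-dir layout above):
alter table tb_order add if not exists partition (daystr='2021-09-14') location '/nginx/logs/tb_order/daystr=2021-09-14';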
Example: Collecting Data Quickly with a Script
cd /home/admin/sparksql_hive
mkdir log java_code
# create a file that lists the table names
vim test_full_table.txt
CISS4.CISS_BASE_CUSTOMER
CISS4.CISS_BASE_DEVICE
CISS4.CISS_BASE_DEVICE_DETAIL
CISS4.CISS_BASE_DEVICE_FACTORY_INFO
CISS4.CISS_BASE_DEVICE_MODEL
CISS4.CISS_BASE_DEVICE_NAME
......
# full-import script
vim test_full_import_table.sh
#!/bin/bash
# export path
source /etc/profile
# export the tbname files
TB_NAME=test_full_table.txt
# export the import opt
IMP_OPT="sqoop import -Dmapreduce.job.user.classpath.first=true"
# export the jdbc opt
JDBC_OPT="--connect jdbc:oracle:thin:@172.16.100.41:1521:helowin --username tang --password Tang123"
# read tbname and exec sqoop
while read tbname
do
${IMP_OPT} ${JDBC_OPT} --table ${tbname^^} --delete-target-dir --target-dir /test/full_imp/${tbname,,} --as-avrodatafile --fields-terminated-by "\001" -m 1
done < ${TB_NAME}
# ${tbname^^} converts lowercase letters to uppercase
# ${tbname,,} converts uppercase letters to lowercase
# test
$ while read tbname
> do echo ${tbname^^}
> done < test_full_table.txt
CISS4.CISS_BASE_AREAS
CISS4.CISS_BASE_BASEINFO
CISS4.CISS_BASE_CSP
CISS4.CISS_BASE_CUSTOMER
CISS4.CISS_BASE_DEVICE
sh -x test_full_import_table.sh
Complete Full-Import Script
vim full_import_tables.sh
#!/usr/bin/env bash
# /bin/bash
biz_date=20210101
biz_fmt_date=2021-01-01
dw_parent_dir=/test/full_imp
workhome=/home/admin/sparksql_hive
full_imp_tables=${workhome}/full_import_tables.txt
# mkdir ${workhome}/log
orcl_srv=172.16.100.41
orcl_port=1521
orcl_sid=helowin
orcl_user=tang
orcl_pwd=Tang123
sqoop_import_params="sqoop import -Dmapreduce.job.user.classpath.first=true --outdir ${workhome}/java_code --as-avrodatafile"
sqoop_jdbc_params="--connect jdbc:oracle:thin:@${orcl_srv}:${orcl_port}:${orcl_sid} --username ${orcl_user} --password ${orcl_pwd}"
# --outdir: directory where Sqoop writes the generated MR Java code and other output files
# load hadoop/sqoop env
source /etc/profile
while read p; do
# parallel execution import
${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} -m 1 &
cur_time=$(date "+%F %T")
echo "${cur_time}: ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p} -m 1 &" >> ${workhome}/log/${biz_fmt_date}_full_imp.log
sleep 30
done < ${full_imp_tables}
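# One way to launch the full-import script and keep its console output (a sketch; the log file name is arbitrary):
cd /home/admin/sparksql_hive
nohup sh full_import_tables.sh > log/full_import_tables_run.out 2>&1 &
tail -f log/full_import_tables_run.out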
### For reference: Python script D:\git-python\大数据-Spark\py\full_import_tables.py
Complete Incremental-Import Script
vim incr_import_tables.txt
ciss4.ciss_csp_m_bnblp_hx_sqd
ciss4.ciss_csp_m_bnblp_hx_sqd_dtl
ciss4.ciss_csp_m_bnlp_hx_sqd
ciss4.ciss_csp_m_bnlp_hx_sqd_dtl
ciss4.ciss_csp_m_wl_db_sqd
ciss4.ciss_csp_m_wl_db_sqd_dtl
......
vim incr_import_tables.sh
#!/usr/bin/env bash
# Be very careful when writing the shell script, especially the SQL condition: an extra space in the wrong place will make the command fail
# /bin/bash
biz_date=20210101
biz_fmt_date=2021-01-01
dw_parent_dir=/test/incr_imp
workhome=/home/admin/sparksql_hive
incr_imp_tables=${workhome}/incr_import_tables.txt
# mkdir ${workhome}/log
orcl_srv=172.16.100.41
orcl_port=1521
orcl_sid=helowin
orcl_user=tang
orcl_pwd=Tang123
sqoop_condition_params="--where \"'${biz_fmt_date}'=to_char(CREATE_TIME,'yyyy-mm-dd')\""
sqoop_import_params="sqoop import -Dmapreduce.job.user.classpath.first=true --outdir ${workhome}/java_code --as-avrodatafile"
sqoop_jdbc_params="--connect jdbc:oracle:thin:@${orcl_srv}:${orcl_port}:${orcl_sid} --username ${orcl_user} --password ${orcl_pwd}"
# load hadoop/sqoop env
source /etc/profile
while read p; do
# clean old directory in HDFS
hdfs dfs -rm -r ${dw_parent_dir}/${p}/${biz_date}
# parallel execution import
${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} ${sqoop_condition_params} -m 1 &
cur_time=$(date "+%F %T")
echo "${cur_time}: ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p} ${sqoop_condition_params} -m 1 &" >> ${workhome}/log/${biz_fmt_date}_incr_imp.log
sleep 30
done < ${incr_imp_tables}
### For reference: Python script D:\git-python\大数据-Spark\py\incr_import_tables.py
Schema Backup and Upload
# upload each table's schema to HDFS, then archive and back it up
# store the Avro schema files on HDFS
hdfs_schema_dir=/test/avsc
hdfs dfs -put ${workhome}/java_code/*.avsc ${hdfs_schema_dir}
# package the Avro schema files locally
biz_date=20210101
local_schema_backup_filename=schema_${biz_date}.tar.gz
tar -czf ${local_schema_backup_filename} ./java_code/*.avsc
hdfs_schema_backup_filename=${hdfs_schema_dir}/avro_schema_${biz_date}.tar.gz
hdfs dfs -put ${local_schema_backup_filename} ${hdfs_schema_backup_filename}
## Implementing this with a script
vim upload_avro_schema.sh
#!/usr/bin/env bash
# upload
# /bin/bash
workhome=/home/admin/sparksql_hive
hdfs_schema_dir=/test/avsc
biz_date=20210101
biz_fmt_date=2021-01-01
local_schema_backup_filename=schema_${biz_date}.tar.gz
hdfs_schema_backup_filename=${hdfs_schema_dir}/avro_schema_${biz_date}.tar.gz
log_file=${workhome}/log/upload_avro_schema_${biz_fmt_date}.log
# logging helper
log() {
cur_time=$(date "+%F %T")
echo "${cur_time} $*" >> ${log_file}
}
source /etc/profile
cd ${workhome}
# hadoop fs [generic options] [-test -[defsz] <path>]
# -test -[defsz] <path> :
#   Answer various questions about <path>, with result via exit status.
#     -d  return 0 if <path> is a directory.
#     -e  return 0 if <path> exists.
#     -f  return 0 if <path> is a file.
#     -s  return 0 if file <path> is greater than zero bytes in size.
#     -z  return 0 if file <path> is zero bytes in size, else return 1.
log "Check if the HDFS Avro schema directory ${hdfs_schema_dir}..."
hdfs dfs -test -e ${hdfs_schema_dir} > /dev/null
if [ $? != 0 ]; then
log "Path: ${hdfs_schema_dir} is not exists. Create a new one."
log "hdfs dfs -mkdir -p ${hdfs_schema_dir}"
hdfs dfs -mkdir -p ${hdfs_schema_dir}
fi
log "Check if the file ${hdfs_schema_dir}/CISS4_CISS_BASE_AREAS.avsc has uploaded to the HFDS..."
hdfs dfs -test -e ${hdfs_schema_dir}/CISS4_CISS_BASE_AREAS.avsc > /dev/null
if [ $? != 0 ]; then
log "Upload all the .avsc schema file."
log "hdfs dfs -put ${workhome}/java_code/*.avsc ${hdfs_schema_dir}"
hdfs dfs -put ${workhome}/java_code/*.avsc ${hdfs_schema_dir}
fi
# backup
log "Check if the backup tar.gz file has generated in the local server..."
if [ ! -e ${local_schema_backup_filename} ]; then
log "package and compress the schema files"
log "tar -czf ${local_schema_backup_filename} ./java_code/*.avsc"
tar -czf ${local_schema_backup_filename} ./java_code/*.avsc
fi
log "Check if the backup tar.gz file has upload to the HDFS..."
hdfs dfs -test -e ${hdfs_schema_backup_filename} > /dev/null
if [ $? != 0 ]; then
log "upload the schema package file to HDFS"
log "hdfs dfs -put ${local_schema_backup_filename} ${hdfs_schema_backup_filename}"
hdfs dfs -put ${local_schema_backup_filename} ${hdfs_schema_backup_filename}
fi
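# Running the upload script and checking the result (a sketch):
sh upload_avro_schema.sh
hdfs dfs -ls /test/avsc
cat /home/admin/sparksql_hive/log/upload_avro_schema_2021-01-01.log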
### For reference: Python script D:\git-python\大数据-Spark\py\upload_avro_schema.py