14. Hands-on Example: Data Preparation

Oracle Data Import

Oracle Database Deployment

### Deploy Oracle 11g with Docker
docker pull registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g
# Start a temporary container first so its data files can be copied out for persistence
sudo docker run -d -it -p 1521:1521 --name oracle11g --restart=always registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g
sudo docker ps
sudo mkdir -p /data/oracle/oradata
sudo docker cp oracle11g:/home/oracle/app/oracle/oradata/helowin /data/oracle/oradata/helowin
sudo docker cp oracle11g:/home/oracle/app/oracle/flash_recovery_area/helowin /data/oracle/oradata/flash_recovery_area
sudo docker stop oracle11g
sudo docker rm oracle11g
sudo docker run -d -it -p 1521:1521 --name oracle11g --restart=always -v /data/oracle/oradata/helowin:/home/oracle/app/oracle/oradata/helowin -v /data/oracle/oradata/flash_recovery_area:/home/oracle/app/oracle/flash_recovery_area/helowin registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g
sudo docker exec -it oracle11g bash
su - root   # the root password is helowin
chown -R oracle /home/oracle/app/oracle/flash_recovery_area/helowin   # run as root inside the container
chown -R oracle /home/oracle/app/oracle/oradata/helowin
sudo docker restart oracle11g
sudo docker logs oracle11g
sudo docker exec -it oracle11g bash
su - root   # switch to root, password helowin
vi /etc/profile  # edit /etc/profile to set the Oracle environment variables
export ORACLE_HOME=/home/oracle/app/oracle/product/11.2.0/dbhome_2
export ORACLE_SID=helowin
export PATH=$ORACLE_HOME/bin:$PATH
source /etc/profile
ln -s $ORACLE_HOME/bin/sqlplus /usr/bin
su - oracle
sqlplus / as sysdba                                            # log in
CONNECT / AS SYSDBA;                                           # connect; requires operating system authentication
alter user system identified by Oracle1232123;                 # change the system user's password
alter user sys identified by Oracle1232123;                    # change the sys user's password
ALTER PROFILE DEFAULT LIMIT PASSWORD_LIFE_TIME UNLIMITED;      # set the password policy so passwords never expire
-- Create an internal administrator account
create user tang identified by Tang123;
-- Grant the DBA role to the internal administrator account
grant connect,resource,dba to tang;
-- Set the password policy so passwords never expire
ALTER PROFILE DEFAULT LIMIT PASSWORD_LIFE_TIME UNLIMITED;
-- Increase the maximum number of database connections (processes)
alter system set processes=1000 scope=spfile;
-- Unlock a locked user account
alter user db_user account unlock;
commit;
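-- (Suggested check, not from the original notes) processes is a static parameter, so the new
-- value only takes effect after the instance is restarted; the limit and its utilization can
-- then be confirmed with the standard v$resource_limit view.
show parameter processes
select resource_name, current_utilization, max_utilization, limit_value
  from v$resource_limit
 where resource_name in ('processes', 'sessions');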
# Check the listener / database status
lsnrctl status
-- List tablespaces and their data files
SELECT tablespace_name, file_name, bytes/1024/1024 AS size_mb, autoextensible FROM dba_data_files;
-- Data file details for a specific tablespace
select tablespace_name,file_name,autoextensible from dba_data_files where tablespace_name = '<TABLESPACE_NAME>';
-- Syntax to enable auto-extend on a data file:
alter database datafile '<data file path>' autoextend on;
-- Syntax to disable auto-extend on a data file:
alter database datafile '<data file path>' autoextend off;
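-- (Hedged example; the file path is illustrative only and should be taken from the
-- dba_data_files query above) enable auto-extend on a USERS data file, growing in
-- 100M increments up to a 10G cap:
alter database datafile '/home/oracle/app/oracle/oradata/helowin/users01.dbf'
  autoextend on next 100M maxsize 10G;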
# Backup and restore (Data Pump)
-- The following command lists the parameters expdp supports; impdp works the same way
expdp help=y
# Back up data:
# Prerequisite: create a directory object
-- 1. Check whether the directory object already exists
select * from dba_directories t where t.directory_name = 'MYDIR';
-- 2. If it does not exist, create it
create directory mydir as '/home/oracle/app/oracle/oradata/helowin/bak';
# Grants (required for non-system users such as scott)
-- grant the create privilege
grant create any directory to scott;
-- grant read and write privileges
grant read, write on directory mydir to scott;
-- list users holding directory-related privileges
select * from dba_sys_privs t where t.privilege like '%DIRECTORY%';
-- list users holding read/write privileges on directories
select * from dba_tab_privs t where t.privilege in ('READ', 'WRITE');
-- In real use keep a single space between options and do not wrap lines; the extra spacing below only makes the columns easier to compare
expdp user/password@db   directory       dump file (binary)       export mode       logging
expdp scott/scott@orcl   directory=mydir dumpfile=tables.dmp      tables=emp,dept   logfile=tables.log
expdp scott/scott@orcl   directory=mydir dumpfile=schemas.dmp     schemas=scott     logfile=schemas.log
expdp scott/scott@orcl   directory=mydir dumpfile=tablespaces.dmp tablespaces=users logfile=tablespaces.log
expdp system/system@orcl directory=mydir dumpfile=full.dmp        full=Y            nologfile=Y
-- mydir, emp.dmp and emp.log are user-defined names
expdp scott/scott@orcl directory=mydir dumpfile=emp.dmp tables=emp logfile=emp.log
-- export multiple tables
expdp scott/scott@orcl directory=mydir dumpfile=emp.dmp tables=emp,dept logfile=emp.log
expdp scott/scott@orcl directory=mydir dumpfile=emp.dmp tables=emp query='emp:"WHERE deptno=10 AND sal>2000"' logfile=emp.log
-- backup example
expdp system/Oracle1232123@helowin directory=mydir dumpfile=tang.dmp  schemas=TANG nologfile=Y
# Restore
-- general form
impdp scott/scott@orcl directory=mydir dumpfile=emp.dmp tables=emp logfile=emp.log
-- import, appending to an existing table
impdp scott/scott@orcl directory=mydir dumpfile=emp.dmp tables=emp table_exists_action=APPEND
-- restore example
impdp tang/Tang123@helowin directory=mydir dumpfile=CISS_2021.dmp logfile=CISS_2021.log
-- classic imp utility
imp  tang/Tang123@helowin file=/home/oracle/app/oracle/oradata/helowin/bak/CISS_2021.dmp full=y
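The Data Pump commands above can be wrapped into a small backup script. Below is a minimal, hypothetical sketch (script structure, file naming, and the 7-day retention are assumptions, not part of the original notes); it performs a schema-level export of TANG into the mydir directory object and is meant to run on the database host:

#!/usr/bin/env bash
# hypothetical daily expdp backup sketch for the TANG schema
source /etc/profile                          # pick up ORACLE_HOME / ORACLE_SID
backup_date=$(date +%Y%m%d)
expdp system/Oracle1232123@helowin \
    directory=mydir \
    dumpfile=tang_${backup_date}.dmp \
    logfile=tang_${backup_date}.log \
    schemas=TANG
# prune dump files older than 7 days in the directory backing mydir
find /home/oracle/app/oracle/oradata/helowin/bak -name 'tang_*.dmp' -mtime +7 -delete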

Data Import Test

# Install the Oracle JDBC driver for Sqoop
cd /opt/cloudera/parcels/CDH/lib/sqoop/lib
wget https://home.vimll.com:9999/download/oracle/ojdbc6-11.2.0.4.jar
# Create the target directory on HDFS
hdfs dfs -mkdir -p /test/full_imp/
hdfs dfs -chown -R admin:admin /test
su - admin
# Run the import
sqoop import \
    --connect jdbc:oracle:thin:@172.16.100.41:1521:helowin \
    --username tang \
    --password Tang123 \
    --table CISS4.CISS_BASE_AREAS \
    --delete-target-dir \
    --target-dir /test/full_imp/ciss4.ciss_base_areas \
    --fields-terminated-by "\t" \
    -m 1
# Verify the data
hdfs dfs -cat /test/full_imp/ciss4.ciss_base_areas/* | head -n 10
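To confirm that the full table arrived, the HDFS line count can be compared with the row count in Oracle. This is a hedged sketch, not part of the original notes: it assumes an Oracle client with a TNS entry for helowin is available, and it relies on the table containing no embedded newlines (see the format discussion below).

# row count on the Oracle side
sqlplus -s tang/Tang123@helowin <<'EOF'
set heading off feedback off
select count(*) from CISS4.CISS_BASE_AREAS;
EOF
# line count on the HDFS side
hdfs dfs -cat /test/full_imp/ciss4.ciss_base_areas/* | wc -l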

Increasing YARN Cluster Concurrency

# YARN resource configuration
yarn.scheduler.capacity.maximum-am-resource-percent=0.8
yarn.scheduler.minimum-allocation-mb=512
# Settings that raise concurrency
- YARN resources (per NodeManager)
    yarn.nodemanager.resource.cpu-vcores=8
    yarn.nodemanager.resource.memory-mb=8192
- Container resources
    yarn.scheduler.minimum-allocation-vcores=1
    yarn.scheduler.maximum-allocation-vcores=32
    yarn.scheduler.minimum-allocation-mb=1024
    yarn.scheduler.maximum-allocation-mb=8192
- MR task resources
    mapreduce.map.cpu.vcores=1
    mapreduce.map.memory.mb=1024
    mapreduce.reduce.cpu.vcores=1
    mapreduce.reduce.memory.mb=1024
- Spark executor resources (a spark-submit example follows this list)
    --driver-memory   # memory allocated to the Driver, 1 GB by default
    --driver-cores    # CPU cores allocated to the Driver, 1 core by default
    --executor-memory # memory allocated to each Executor, 1 GB by default; applies to all cluster modes
    --executor-cores  # cores allocated to each Executor; applies to YARN and Standalone clusters
    --total-executor-cores NUM  # Standalone mode: total CPU cores across all Executors
    --num-executors NUM # YARN mode: number of Executors to launch, 2 by default
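For reference, these options map onto a spark-submit invocation such as the hedged sketch below; the class name, jar path and the specific numbers are placeholders, not values from the original notes.

# class name and jar path are placeholders
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 1g \
    --driver-cores 1 \
    --executor-memory 2g \
    --executor-cores 2 \
    --num-executors 4 \
    --class com.example.MyApp \
    /path/to/my-app.jar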

# In Uber mode a job requests only one AM container: all Map tasks and Reduce tasks run sequentially inside that container (a per-job example follows this list)
- Disabled by default
    mapreduce.job.ubertask.enable=true
    # A job only runs as an uber task when all of the following limits are satisfied
    mapreduce.job.ubertask.maxmaps=9
    mapreduce.job.ubertask.maxreduces=1
    mapreduce.job.ubertask.maxbytes=128M
    yarn.app.mapreduce.am.resource.cpu-vcores=1
    yarn.app.mapreduce.am.resource.mb=1536
- Characteristics
    - An uber job runs inside the AM process, so everything it uses must fit within the AM container's resources
    - If the conditions above are not met, the job does not run in Uber mode
    - Uber mode disables speculative execution
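Since these are ordinary MapReduce properties, Uber mode can also be switched on for a single job with a -D option. A hedged example, reusing the Sqoop import from earlier in this section (whether the job actually runs as an uber task depends on the size limits listed above):

sqoop import \
    -Dmapreduce.job.ubertask.enable=true \
    --connect jdbc:oracle:thin:@172.16.100.41:1521:helowin \
    --username tang \
    --password Tang123 \
    --table CISS4.CISS_BASE_AREAS \
    --delete-target-dir \
    --target-dir /test/full_imp/ciss4.ciss_base_areas \
    --fields-terminated-by "\t" \
    -m 1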

Sqoop Data Format Issues

- After a Sqoop import, the HDFS row count does not match the row count in Oracle
  - When Sqoop imports data as plain text, the record delimiter is the newline character
  - If a column in Oracle contains special characters such as \n, \r or \t, a single row gets split into several rows
- Solutions
    - Option 1: delete or replace the delimiter characters inside the data
      - --hive-drop-import-delims: drop the delimiter characters
      - --hive-delims-replacement <char>: replace the delimiter characters with <char>
      - Not recommended: it modifies the original data
    - Option 2: use a special file format: Avro

- Common file formats
    | Format       | Description                                                  |
    | ------------ | ------------------------------------------------------------ |
    | TextFile     | Hive's default and simplest format; easy to view and edit, but storage-hungry with poor I/O performance |
    | SequenceFile | Binary key-value file; better disk utilization and I/O, supports parallel processing and queries well, but has the largest storage footprint |
    | AvroFile     | Binary format designed primarily for schema evolution; the schema is stored together with the data, so files are self-describing |
    | OrcFile      | Columnar storage with the schema kept in the file footer; no schema evolution, but highly compressed, indexed, and very fast to query |
    | ParquetFile  | Columnar storage similar to ORC; slightly lower compression ratio but comparable query performance, supported by more tools and therefore more general-purpose |

    - SparkCore limitation: an RDD holds data only, with no schema
    - SparkSQL advantage: a DataFrame holds data plus its schema
    - Schema: column metadata (name and type)

- Characteristics of the Avro format
    - Advantages
        - Binary storage: good performance and efficiency
        - The schema is described in JSON, which supports a wide range of scenarios
        - Schema and data are stored together, so messages are self-describing
        - The schema definition can specify the sort order of the data
    - Disadvantages
        - Only Avro's own serialization format is supported
        - Reading a small subset of columns performs poorly, and the compression ratio is low
    - Use cases: large-scale row-oriented writes of structured data, reads that touch most columns, or schemas that change frequently

  - Using the Avro format with Sqoop
    - Option
      --as-avrodatafile      # Imports data to Avro datafiles
    - Note: if MR Uber mode is enabled, the following parameter must be added to avoid class conflicts
      -Dmapreduce.job.user.classpath.first=true

Data Import Test (Avro Format)

sqoop import \
    -Dmapreduce.job.user.classpath.first=true \
    --connect jdbc:oracle:thin:@172.16.100.41:1521:helowin \
    --username tang \
    --password Tang123 \
    --table CISS4.CISS_SERVICE_WORKORDER \
    --delete-target-dir \
    --target-dir /test/full_imp/ciss4.ciss_service_workorder \
    --as-avrodatafile \
    --fields-terminated-by "\001" \
    -m 1
# 24/12/02 15:05:34 INFO mapreduce.ImportJobBase: Retrieved 178609 records.

## Test: create a table in Hue
create external table test_avro(
    line string
    )
    stored as avro
    location '/test/full_imp/ciss4.ciss_service_workorder';

select count(*) from default.test_avro;
# 178609
drop table test_avro;

## Import and create the table
CREATE DATABASE ciss4
COMMENT 'This is a ciss4 UTF-8 database'
LOCATION '/user/hive/warehouse/ciss4.db'
WITH DBPROPERTIES ('createdBy' = 'tang', 'creationDate' = '2024-10-09', 'characterSet' = 'UTF-8', 'collate' = 'utf8_general_ci');

sqoop import \
    -Dmapreduce.job.user.classpath.first=true \
    --connect jdbc:oracle:thin:@172.16.100.41:1521:helowin \
    --username tang \
    --password Tang123 \
    --table CISS4.CISS_SERVICE_WORKORDER \
    --fields-terminated-by "\001" \
    --hive-drop-import-delims \
    --hcatalog-database ciss4 \
    --create-hcatalog-table \
    --hcatalog-table ciss_service_workorder \
    --hcatalog-storage-stanza 'stored as orc tblproperties ("orc.compress"="SNAPPY")' \
    -m 1
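A quick sanity check after the HCatalog import (a hedged suggestion, run from the shell via Hive): the count should match the 178609 records Sqoop reported for the same table above.

hive -e "select count(*) from ciss4.ciss_service_workorder;"
# expected to match the 'Retrieved 178609 records.' line from the earlier Sqoop log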

Review of Sqoop Incremental Ingestion

The three incremental approaches

  • Append

    • Requirement: the table must have a monotonically increasing column (an auto-increment int), which is used for the comparison
    • Characteristics: only newly inserted rows are imported, never updated rows (see the sqoop job sketch after this list for letting Sqoop track last-value automatically)
    • Use case: tables that only ever receive inserts, never updates
    • Code
      sqoop import \
      --connect jdbc:mysql://node3:3306/sqoopTest \
      --username root \
      --password 123456 \
      --table tb_tohdfs \
      --target-dir /sqoop/import/test02 \
      --fields-terminated-by '\t' \
      --check-column id \
      --incremental append \
      --last-value 0 \
      -m 1
  • Lastmodified

    • Requirement: the table must have a column that records when each row was last modified, which is used for the comparison
    • Characteristics: imports both newly inserted and updated rows
    • Use case: rarely matches real requirements, so it is seldom used
    • Code
      sqoop import \
      --connect jdbc:mysql://node3:3306/sqoopTest \
      --username root \
      --password 123456 \
      --table tb_lastmode \
      --target-dir /sqoop/import/test03 \
      --fields-terminated-by '\t' \
      --incremental lastmodified \
      --check-column lastmode \
      --last-value '2021-06-06 16:09:32' \
      -m 1
  • Custom approach

    • Requirement: each run must write to a different output directory
    • Characteristics: the incremental filter is written by hand, so both new and updated rows can be captured
    • Use case: typically used to ingest each day's partition into Hive
    • Code
      sqoop  import \
      --connect jdbc:mysql://node3:3306/db_order \
      --username root \
      --password-file file:///export/data/sqoop.passwd \
      --query "select * from tb_order where (substring(create_time,1,10) = '2021-09-14' or substring(update_time,1,10) = '2021-09-14') and \$CONDITIONS" \
      --delete-target-dir \
      --target-dir /nginx/logs/tb_order/daystr=2021-09-14 \
      --fields-terminated-by '\t' \
      -m 1
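As referenced in the Append item above, Sqoop can also persist and advance --last-value automatically when the import is defined as a saved job in the Sqoop metastore. A hedged sketch reusing the Append example's connection values (the job name is made up):

# create a saved job; the metastore records the new last-value after each run
sqoop job --create tb_tohdfs_incr -- import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test02 \
--fields-terminated-by '\t' \
--check-column id \
--incremental append \
--last-value 0 \
-m 1
# each run imports only rows with id greater than the stored last-value;
# note that --exec prompts for the password unless a password file is configured
sqoop job --exec tb_tohdfs_incr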

Example: Quickly Ingesting Data with a Script

cd /home/admin/sparksql_hive
mkdir log java_code
# Create the file that lists the table names
vim test_full_table.txt

CISS4.CISS_BASE_CUSTOMER
CISS4.CISS_BASE_DEVICE
CISS4.CISS_BASE_DEVICE_DETAIL
CISS4.CISS_BASE_DEVICE_FACTORY_INFO
CISS4.CISS_BASE_DEVICE_MODEL
CISS4.CISS_BASE_DEVICE_NAME
......

# Full-load import script
vim test_full_import_table.sh
#!/bin/bash
# export path
source /etc/profile
# export the tbname files
TB_NAME=test_full_table.txt
# export the import opt
IMP_OPT="sqoop import -Dmapreduce.job.user.classpath.first=true"
# export the jdbc opt
JDBC_OPT="--connect jdbc:oracle:thin:@172.16.100.41:1521:helowin --username tang --password Tang123"

# read tbname and exec sqoop
while read tbname
do
${IMP_OPT} ${JDBC_OPT} --table ${tbname^^} --delete-target-dir --target-dir /test/full_imp/${tbname,,} --as-avrodatafile --fields-terminated-by "\001" -m 1
done < ${TB_NAME}

# ${tbname^^} converts the value to upper case
# ${tbname,,} converts the value to lower case
# Quick test
$ while read tbname
> do echo ${tbname^^}
> done < test_full_table.txt
CISS4.CISS_BASE_AREAS
CISS4.CISS_BASE_BASEINFO
CISS4.CISS_BASE_CSP
CISS4.CISS_BASE_CUSTOMER
CISS4.CISS_BASE_DEVICE

sh -x test_full_import_table.sh

Complete Full-Load Ingestion Script

vim full_import_tables.sh
#!/usr/bin/env bash
# /bin/bash
biz_date=20210101
biz_fmt_date=2021-01-01
dw_parent_dir=/test/full_imp
workhome=/home/admin/sparksql_hive
full_imp_tables=${workhome}/full_import_tables.txt
# mkdir ${workhome}/log

orcl_srv=172.16.100.41
orcl_port=1521
orcl_sid=helowin
orcl_user=tang
orcl_pwd=Tang123

sqoop_import_params="sqoop import -Dmapreduce.job.user.classpath.first=true --outdir ${workhome}/java_code --as-avrodatafile"
sqoop_jdbc_params="--connect jdbc:oracle:thin:@${orcl_srv}:${orcl_port}:${orcl_sid} --username ${orcl_user} --password ${orcl_pwd}"
# --outdir: directory where Sqoop writes its generated Java code and Avro schema files
# load hadoop/sqoop env
source /etc/profile

while read p; do
    # parallel execution import
    ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} -m 1 &
    cur_time=$(date "+%F %T")
    echo "${cur_time}: ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} -m 1 &" >> ${workhome}/log/${biz_fmt_date}_full_imp.log
    sleep 30
done < ${full_imp_tables}
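Because each import is launched in the background with &, the script can exit while Sqoop jobs are still running. A small hedged addition (not in the original script) is to block at the end until all background imports finish:

# append after the while loop: wait for every background sqoop import to complete
wait
echo "$(date "+%F %T"): all full imports finished" >> ${workhome}/log/${biz_fmt_date}_full_imp.log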

### See the Python script D:\git-python\大数据-Spark\py\full_import_tables.py for reference

Complete Incremental Ingestion Script

vim incr_import_tables.txt
ciss4.ciss_csp_m_bnblp_hx_sqd
ciss4.ciss_csp_m_bnblp_hx_sqd_dtl
ciss4.ciss_csp_m_bnlp_hx_sqd
ciss4.ciss_csp_m_bnlp_hx_sqd_dtl
ciss4.ciss_csp_m_wl_db_sqd
ciss4.ciss_csp_m_wl_db_sqd_dtl
......

vim incr_import_tables.sh
#!/usr/bin/env bash
# Be careful when writing this shell script, especially the SQL condition: an extra space inside the condition will make the command fail
# /bin/bash
biz_date=20210101
biz_fmt_date=2021-01-01
dw_parent_dir=/test/incr_imp
workhome=/home/admin/sparksql_hive
incr_imp_tables=${workhome}/incr_import_tables.txt
# mkdir ${workhome}/log

orcl_srv=172.16.100.41
orcl_port=1521
orcl_sid=helowin
orcl_user=tang
orcl_pwd=Tang123

sqoop_condition_params="--where \"'${biz_fmt_date}'=to_char(CREATE_TIME,'yyyy-mm-dd')\""
sqoop_import_params="sqoop import -Dmapreduce.job.user.classpath.first=true --outdir ${workhome}/java_code --as-avrodatafile"
sqoop_jdbc_params="--connect jdbc:oracle:thin:@${orcl_srv}:${orcl_port}:${orcl_sid} --username ${orcl_user} --password ${orcl_pwd}"

# load hadoop/sqoop env
source /etc/profile

while read p; do
    # clean old directory in HDFS
    hdfs dfs -rm -r ${dw_parent_dir}/${p}/${biz_date}

    # parallel execution import
    ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} ${sqoop_condition_params} -m 1 &
    cur_time=$(date "+%F %T")
    echo "${cur_time}: ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} ${sqoop_condition_params} -m 1 &" >> ${workhome}/log/${biz_fmt_date}_incr_imp.log
    sleep 30

done < ${incr_imp_tables}

### See the Python script D:\git-python\大数据-Spark\py\incr_import_tables.py for reference

Schema Backup and Upload

# Upload each table's schema to HDFS for archiving and backup
# Store the Avro schema files on HDFS
hdfs_schema_dir=/test/avsc
hdfs dfs -put ${workhome}/java_code/*.avsc ${hdfs_schema_dir}
# Package the Avro schema files locally
biz_date=20210101
local_schema_backup_filename=schema_${biz_date}.tar.gz
tar -czf ${local_schema_backup_filename} ./java_code/*.avsc
hdfs_schema_backup_filename=${hdfs_schema_dir}/avro_schema_${biz_date}.tar.gz
hdfs dfs -put ${local_schema_backup_filename} ${hdfs_schema_backup_filename}
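Once the .avsc files are on HDFS they can also be referenced directly from Hive via the AvroSerDe's avro.schema.url table property, instead of embedding the schema in the DDL. A hedged sketch (the table name is illustrative; the location and schema file assume the full-load script and the upload above have been run for ciss4.ciss_base_areas):

hive -e "
CREATE EXTERNAL TABLE IF NOT EXISTS test_avro_url
STORED AS AVRO
LOCATION '/test/full_imp/ciss4.ciss_base_areas/20210101'
TBLPROPERTIES ('avro.schema.url'='hdfs:///test/avsc/CISS4_CISS_BASE_AREAS.avsc');
"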

## Implemented as a script
vim upload_avro_schema.sh
#!/usr/bin/env bash
# Upload
# /bin/bash
workhome=/home/admin/sparksql_hive
hdfs_schema_dir=/test/avsc
biz_date=20210101
biz_fmt_date=2021-01-01
local_schema_backup_filename=schema_${biz_date}.tar.gz
hdfs_schema_backup_filename=${hdfs_schema_dir}/avro_schema_${biz_date}.tar.gz
log_file=${workhome}/log/upload_avro_schema_${biz_fmt_date}.log

# Logging helper
log() {
    cur_time=$(date "+%F %T")
    echo "${cur_time} $*" >> ${log_file}
}

source /etc/profile
cd ${workhome}

#  hadoop fs [generic options] [-test -[defsz] <path>]
# -test -[defsz] <path> :
#   Answer various questions about <path>, with result via exit status.
#     -d  return 0 if <path> is a directory.
#     -e  return 0 if <path> exists.
#     -f  return 0 if <path> is a file.
#     -s  return 0 if file <path> is greater than zero bytes in size.
#     -z  return 0 if file <path> is zero bytes in size, else return 1.

log "Check if the HDFS Avro schema directory ${hdfs_schema_dir}..."
hdfs dfs -test -e ${hdfs_schema_dir} > /dev/null

if [ $? != 0 ]; then
    log "Path: ${hdfs_schema_dir} is not exists. Create a new one."
    log "hdfs dfs -mkdir -p ${hdfs_schema_dir}"
    hdfs dfs -mkdir -p ${hdfs_schema_dir}
fi

log "Check if the file ${hdfs_schema_dir}/CISS4_CISS_BASE_AREAS.avsc has uploaded to the HFDS..."
hdfs dfs -test -e ${hdfs_schema_dir}/CISS4_CISS_BASE_AREAS.avsc > /dev/null
if [ $? != 0 ]; then
    log "Upload all the .avsc schema file."
    log "hdfs dfs -put ${workhome}/java_code/*.avsc ${hdfs_schema_dir}"
    hdfs dfs -put ${workhome}/java_code/*.avsc ${hdfs_schema_dir}
fi

# backup
log "Check if the backup tar.gz file has generated in the local server..." 
if [ ! -e ${local_schema_backup_filename} ]; then
    log "package and compress the schema files"
    log "tar -czf ${local_schema_backup_filename} ./java_code/*.avsc"
    tar -czf ${local_schema_backup_filename} ./java_code/*.avsc
fi

log "Check if the backup tar.gz file has upload to the HDFS..."
hdfs dfs -test -e ${hdfs_schema_backup_filename} > /dev/null
if [ $? != 0 ]; then
    log "upload the schema package file to HDFS"
    log "hdfs dfs -put ${local_schema_backup_filename} ${hdfs_schema_backup_filename}"
    hdfs dfs -put ${local_schema_backup_filename} ${hdfs_schema_backup_filename}
fi

### See the Python script D:\git-python\大数据-Spark\py\upload_avro_schema.py for reference
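If the incremental and schema-upload scripts are meant to run every day, they can be scheduled with cron; a hedged sketch (the times and log names are assumptions, and the hard-coded biz_date in the scripts would need to be parameterized first):

# crontab -e (as the admin user)
0 1 * * * cd /home/admin/sparksql_hive && sh incr_import_tables.sh >> log/cron_incr.log 2>&1
0 2 * * * cd /home/admin/sparksql_hive && sh upload_avro_schema.sh >> log/cron_upload.log 2>&1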