12. Online Education Data Warehouse Example: Intended-Customer Dashboard

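Modeling

  • Metrics and dimensions
    • Metric: intended-customer volume, i.e. the number of new intended customers (online and offline combined) per unit of time, displayed by day.
    • Dimensions:
      • Time: year, month, day, hour
      • Data source: online, offline
      • Customer attribute: new customer, existing customer
      • Region, subject, campus, source channel, consulting center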

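Fact tables and dimension tables

  • customer_relationship (customer intention table) holds the intended-customer records, so it is the base fact table for the intended-customer metric.
  • customer (static customer information table) is joined mainly to obtain static customer attributes such as region. It is dimension data.
  • customer_clue (customer lead table) is used mainly to tell whether a customer is new or existing, so it is also dimension information that must be joined. However, this table also holds the fact data for later metrics, so it is not placed in the DIM (dimension) layer.
  • Similarly, employee (employee table), scrm_department (department table), itcast_school (campus table) and itcast_subject (subject table) hold dimension information, so they are stored as dimension tables in the dimension layer.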

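Bucketing vs. partitioning

  1. Bucketing is finer-grained than partitioning: a partition corresponds to a storage path (directory), while a bucket corresponds to a data file.
  2. Bucketing splits rows by a hash of the column, so the data is spread relatively evenly. Partitioning splits rows by the column's value, which easily causes data skew.
  3. Bucketing and partitioning do not interfere with each other, so a partitioned table can be further bucketed.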
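To make points 1 and 2 concrete, here is a small Python simulation (not Hive code). It assumes the legacy Hive behaviour for INT bucketing columns: the hash is the value itself, and the bucket is (hash & Integer.MAX_VALUE) % numBuckets. Tables created with bucketing_version=2 in Hive 3 hash with Murmur3 instead, so real bucket numbers may differ. The rows are hypothetical.

```python
from collections import Counter


def bucket_of(value: int, num_buckets: int) -> int:
    """0-based bucket for an INT value, legacy-Hive style (an assumption):
    hash(int) == int, the sign bit is masked off, then the remainder is taken."""
    return (value & 0x7FFFFFFF) % num_buckets


def partition_of(origin_type: str) -> str:
    """A partition is simply a directory named after the column value."""
    return f"origin_type={origin_type}"


# Hypothetical rows (id, origin_type): 90% of them share one origin_type value.
rows = [(i, "NETSERVICE" if i % 10 else "PRESIGNUP") for i in range(1, 1001)]

bucket_sizes = Counter(bucket_of(rid, 10) for rid, _ in rows)
partition_sizes = Counter(partition_of(ot) for _, ot in rows)

print(sorted(bucket_sizes.values()))  # ten buckets of 100 rows each
print(partition_sizes)                # 900 rows in one directory, 100 in the other
```

Hashing the id spreads rows evenly regardless of their business values. Partitioning by a skewed column concentrates most rows in one directory.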

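Note: do not insert data into a bucketed table with LOAD DATA. LOAD DATA only moves files into the table directory, so the loaded data is not organized into buckets.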

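How can you avoid accidentally using LOAD DATA on a bucketed table?

  • Forbid LOAD operations on bucketed tables:
    • set hive.strict.checks.bucketing = true;
  • How should plain-text data be handled, then?
    • Create a staging table and load the .txt file into it with LOAD DATA.
    • Use an INSERT ... SELECT statement to copy the data from the staging table into the bucketed table.
  • Note: Sqoop does not support bucketed tables. To import data into a bucketed table with Sqoop, go through an intermediate staging table. Alternatively, leave ODS unbucketed and start bucketing from the DWD (detail) layer.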

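Sampling a bucketed table

  • Syntax: select * from table_name tablesample(bucket x out of y on column_name);
    • column_name is the column the table is bucketed on.
    • Hive uses y to decide the sampling ratio. y must be a multiple or a factor of the table's total number of buckets.
    • For example, if the table has 10 buckets, y=2 samples (10/2 =) 5 buckets of data, and y=10 samples (10/10 =) 1 bucket.
    • x is the bucket to start from. If several buckets are sampled, each subsequent bucket number is the previous bucket number plus y.
    • For example, if the table has 6 buckets, tablesample(bucket 1 out of 2) samples (6/2 =) 3 buckets in total, starting from bucket 1: buckets 1 (x), 3 (x+y) and 5 (x+2y).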
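The bucket-selection arithmetic above can be sketched in Python. This illustrates the rule and is not Hive's implementation. Bucket numbers are 1-based as in TABLESAMPLE, and only the case where y is a factor of the bucket count is modelled. When y is a multiple of the bucket count, Hive reads only a fraction of one bucket, which this sketch does not cover. The sketch also rejects x > y, since x is the starting bucket within each group of y.

```python
def sampled_buckets(x: int, y: int, total_buckets: int) -> list:
    """1-based bucket numbers read by TABLESAMPLE(BUCKET x OUT OF y)."""
    if total_buckets % y != 0:
        raise ValueError("this sketch requires y to be a factor of total_buckets")
    if not 1 <= x <= y:
        raise ValueError("x must satisfy 1 <= x <= y")
    # total_buckets / y buckets are read: x, x + y, x + 2y, ...
    return list(range(x, total_buckets + 1, y))


print(sampled_buckets(1, 2, 6))    # the 6-bucket example: [1, 3, 5]
print(sampled_buckets(1, 2, 10))   # 10 / 2 = 5 buckets
print(sampled_buckets(1, 10, 10))  # 10 / 10 = 1 bucket
```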

Map Join

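  • As the name suggests, MapJoin joins tables in the map phase instead of waiting until the reduce phase. This saves the large amount of data transfer that the shuffle phase would otherwise require.
  • Enable it with: set hive.auto.convert.join=true;
  • The small table's file size is the threshold for converting a join to MapJoin: set hive.auto.convert.join.noconditionaltask.size=512000000;
    • In older versions this threshold was hive.mapjoin.smalltable.filesize
  • When MapJoin is useful:
    1. One of the joined tables is very small
    2. Joins with non-equality conditions
    3. Example: select f.a,f.b from A t join B f on ( f.a=t.a and f.ftime=20110802)
  • Before Hive 0.7, MapJoin was used only when the hint /*+ mapjoin(table) */ was given:
    • select /*+ mapjoin(A) */ f.a,f.b from A t join B f on ( f.a=t.a and f.ftime=20110802)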
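The idea behind MapJoin is a hash join whose build side fits in memory. The Python sketch below is a simplified model under that assumption, not Hive's code. It builds a hash table from the small table once, then probes it with each big-table row, so no rows have to be shuffled by join key. The employee and intention rows are hypothetical.

```python
from collections import defaultdict


def map_join(big_rows, small_rows, big_key, small_key):
    """Inner-join big_rows with small_rows without a shuffle step."""
    # Build phase: load the small table into an in-memory hash table
    # (in Hive, this table is what gets distributed to every mapper).
    lookup = defaultdict(list)
    for s in small_rows:
        lookup[s[small_key]].append(s)
    # Probe phase: each big-table row looks up its matches locally.
    joined = []
    for b in big_rows:
        for s in lookup.get(b[big_key], []):
            joined.append((b, s))
    return joined


# Hypothetical data: intentions (big) joined to employees (small) on creator = id.
employees = [{"id": 1, "real_name": "Alice"}, {"id": 2, "real_name": "Bob"}]
intentions = [
    {"rid": 10, "creator": 1},
    {"rid": 11, "creator": 2},
    {"rid": 12, "creator": 3},  # no matching employee, dropped by the inner join
]
pairs = map_join(intentions, employees, "creator", "id")
print([(b["rid"], s["real_name"]) for b, s in pairs])  # [(10, 'Alice'), (11, 'Bob')]
```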

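Conditions for a bucket map join to take effect:

  • set hive.optimize.bucketmapjoin = true;
  • The bucket count of one table is an integer multiple of the other table's bucket count;
  • The bucket column appears in the join condition;
  • It must be used in a map join scenario;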
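The integer-multiple requirement follows from modular arithmetic. This Python sketch assumes that both tables hash the join column identically and place a key with hash h into bucket h % bucket_count. Under that assumption, a mapper processing one big-table bucket only needs the matching small-table bucket(s), not the whole small table. The function name is hypothetical.

```python
def small_buckets_needed(big_bucket: int, big_count: int, small_count: int) -> list:
    """0-based small-table buckets needed by the mapper of one big-table bucket."""
    if big_count % small_count == 0:
        # small_count divides big_count, so h % small_count == (h % big_count) % small_count
        return [big_bucket % small_count]
    if small_count % big_count == 0:
        # every small bucket congruent to big_bucket modulo big_count can hold a match
        return [j for j in range(small_count) if j % big_count == big_bucket]
    raise ValueError("bucket counts must be integer multiples of each other")


# Sanity check against every hash value in a small range.
for h in range(1000):
    assert h % 5 in small_buckets_needed(h % 10, 10, 5)
    assert h % 6 in small_buckets_needed(h % 2, 2, 6)

print(small_buckets_needed(7, 10, 5))  # [2]
print(small_buckets_needed(1, 2, 6))   # [1, 3, 5]
```

If the counts are not multiples of each other (say 10 and 4), matching keys can be spread across many buckets with no fixed pairing, which is why the optimization does not apply.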

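Big table joined with big table: conditions for an SMB join

  • Use SMB (Sort-Merge-Bucket) join to improve performance. SMB join builds on the sorted buckets of a bucket map join and completes the join on the map side, which effectively reduces or avoids shuffled data.
  • Set hive.enforce.sorting to true (default false). With forced sorting enabled, data is sorted as it is inserted into the table.
  • When inserting data, use distribute by c1 sort by c1, or cluster by c1, in the SQL.
  • The table must be created with both CLUSTERED BY and SORTED BY, for example:
    • create table test_smb_2(mid string,age_id string) CLUSTERED BY(mid) SORTED BY(mid) INTO 500 BUCKETS;
  • The complete configuration for working with bucketed tables:
    • Enforce bucketing on write: set hive.enforce.bucketing=true;
    • Enforce sorting on write: set hive.enforce.sorting=true;
    • Enable bucket map join: set hive.optimize.bucketmapjoin = true;
    • Enable sort-merge bucket map join: set hive.optimize.bucketmapjoin.sortedmerge = true;
    • Enable automatic SMB join conversion:
      • set hive.auto.convert.sortmerge.join=true;
      • set hive.auto.convert.sortmerge.join.noconditionaltask=true;
  • The two tables must have the same number of buckets;
  • bucket column == join column == sort column;
  • It must be applied in a bucket map join scenario;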
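Within a pair of matching buckets, SMB join is a classic sort-merge join. Both sides are already sorted on the join column, so one sequential pass per bucket pair is enough and no hash table has to be held in memory. The following is a minimal Python sketch of that idea. It assumes equal bucket counts, as required above, and uses hypothetical data.

```python
def sort_merge_join(left, right, key):
    """Merge-join two lists that are already sorted by key (one bucket each)."""
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        lk, rk = key(left[i]), key(right[j])
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of right-side rows that share this key.
            j_end = j
            while j_end < len(right) and key(right[j_end]) == lk:
                j_end += 1
            # Pair every left-side row with this key against that run.
            while i < len(left) and key(left[i]) == lk:
                out.extend((left[i], right[k]) for k in range(j, j_end))
                i += 1
            j = j_end
    return out


def smb_join(left_buckets, right_buckets, key):
    """Join bucket i of one table only with bucket i of the other table."""
    if len(left_buckets) != len(right_buckets):
        raise ValueError("this sketch requires equal bucket counts")
    result = []
    for lb, rb in zip(left_buckets, right_buckets):
        result.extend(sort_merge_join(lb, rb, key))
    return result


# Hypothetical tables with 2 buckets each, bucketed by mid % 2 and sorted by mid.
left = [[(2, "a"), (4, "b")], [(1, "c"), (3, "d")]]
right = [[(2, "x"), (2, "y")], [(3, "z")]]
print(smb_join(left, right, key=lambda row: row[0]))
```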

Data preparation

mysql -h 172.16.100.42 -P 4000 -uroot -p'84^j1TZ-+2b9A@S7Hs' nev < 01.employee.sql
mysql -h 172.16.100.42 -P 4000 -uroot -p'84^j1TZ-+2b9A@S7Hs' nev < 02.customer.sql
mysql -h 172.16.100.42 -P 4000 -uroot -p'84^j1TZ-+2b9A@S7Hs' nev < 03.customer_relationship.sql
mysql -h 172.16.100.42 -P 4000 -uroot -p'84^j1TZ-+2b9A@S7Hs' nev < 04.customer_clue.sql
mysql -h 172.16.100.42 -P 4000 -uroot -p'84^j1TZ-+2b9A@S7Hs' nev < itcast_school.sql
mysql -h 172.16.100.42 -P 4000 -uroot -p'84^j1TZ-+2b9A@S7Hs' nev < itcast_subject.sql
mysql -h 172.16.100.42 -P 4000 -uroot -p'84^j1TZ-+2b9A@S7Hs' nev < scrm_department.sql

ODS layer: the source data layer

ORC write strategy (favor compression ratio over speed when writing):
- set hive.exec.orc.compression.strategy=COMPRESSION;

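Zipper table (SCD2):
- The intended-customer dashboard has a new requirement for the intention data. When customer_relationship records are updated, the affected dimensions must be recomputed with the latest values. For example, if data from July 2020 is modified, the July statistics must be recalculated. Historical snapshots must also be kept.
- This calls for a slowly changing dimension. The recommended approach is an SCD2 zipper table, which satisfies both the update requirement and the history-snapshot requirement. In addition to the start_time field, add an end_time field that marks when a record version was closed.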
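Here is a minimal Python sketch of an SCD2 zipper-table update. It makes four assumptions:

- Open versions carry end_time = '9999-12-31', as in the Sqoop extraction later in this chapter.
- The incoming changes contain only new or updated rows, keyed by id.
- A superseded version is closed on the day before the new version's start_time.
- The value column is a hypothetical stand-in for the real business columns.

The snapshot function shows how a historical state is recovered.

```python
from datetime import date, timedelta

OPEN_END = "9999-12-31"  # end_time of the currently valid version


def apply_scd2(history, changes, load_date):
    """Merge one day's changes into a zipper table.

    history:   list of dicts with keys id, value, start_time, end_time
    changes:   {id: new_value} for rows created or updated since the last load
    load_date: 'yyyy-MM-dd', used as start_time of the new versions
    """
    close_date = (date.fromisoformat(load_date) - timedelta(days=1)).isoformat()
    result = []
    for row in history:
        if row["end_time"] == OPEN_END and row["id"] in changes:
            # Seal the currently open version of a changed record.
            row = {**row, "end_time": close_date}
        result.append(row)
    for rid, value in changes.items():
        # Open a new version for every new or updated record.
        result.append({"id": rid, "value": value,
                       "start_time": load_date, "end_time": OPEN_END})
    return result


def snapshot(table, as_of):
    """Rebuild the table as of a date (ISO dates compare correctly as strings)."""
    return {r["id"]: r["value"] for r in table
            if r["start_time"] <= as_of <= r["end_time"]}


history = [
    {"id": 1, "value": "A", "start_time": "2020-07-01", "end_time": OPEN_END},
    {"id": 2, "value": "B", "start_time": "2020-07-01", "end_time": OPEN_END},
]
table = apply_scd2(history, {1: "A2", 3: "C"}, "2020-07-15")
print(sorted(snapshot(table, "2020-07-10").items()))  # [(1, 'A'), (2, 'B')]
print(sorted(snapshot(table, "2020-07-15").items()))  # [(1, 'A2'), (2, 'B'), (3, 'C')]
```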

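Internal vs. external tables:
- The ODS layer holds raw data, which generally must not be modified, so external tables are used to keep the data safe and avoid accidental deletion. The exceptions are customer_relationship (customer intention table) and customer_clue (customer lead table) in ODS. The zipper table requires overwrite operations, so these two are not defined as external tables.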

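Bucketed ingestion:
- Sqoop does not support bucketed tables. To import data into a bucketed table with Sqoop, go through an intermediate staging table. Alternatively, leave ODS unbucketed and start bucketing from the DWD (detail) layer.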

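Bucket joins and sampling:
- In ODS, customer_relationship (customer intention table) and customer_clue (customer lead table) are related: customer_relationship.id joins customer_clue.customer_relationship_id, which yields the new/existing-customer information. These two columns are therefore used as the bucketing columns, which enables data sampling and MapJoin.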

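Partitioning:
- As in the earlier visit-and-consultation dashboard, later T+1 extractions need an easy way to fetch yesterday's data. For this, the ODS model adds a start_time field on top of the original MySQL table. start_time can also serve as the partition column to improve query performance.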
-- customer_relationship (customer intention table)
DROP TABLE itcast_ods.`customer_relationship`;
CREATE TABLE IF NOT EXISTS itcast_ods.`customer_relationship` (
  `id` int COMMENT '客户关系id',
  `create_date_time` STRING COMMENT '创建时间',
  `update_date_time` STRING COMMENT '最后更新时间',
  `deleted` int COMMENT '是否被删除(禁用)',
  `customer_id` int COMMENT '所属客户id',
  `first_id` int COMMENT '第一条客户关系id',
  `belonger` int COMMENT '归属人',
  `belonger_name` STRING COMMENT '归属人姓名',
  `initial_belonger` int COMMENT '初始归属人',
  `distribution_handler` int COMMENT '分配处理人',
  `business_scrm_department_id` int COMMENT '归属部门',
  `last_visit_time` STRING COMMENT '最后回访时间',
  `next_visit_time` STRING COMMENT '下次回访时间',
  `origin_type` STRING COMMENT '数据来源',
  `itcast_school_id` int COMMENT '校区Id',
  `itcast_subject_id` int COMMENT '学科Id',
  `intention_study_type` STRING COMMENT '意向学习方式',
  `anticipat_signup_date` STRING COMMENT '预计报名时间',
  `level` STRING COMMENT '客户级别',
  `creator` int COMMENT '创建人',
  `current_creator` int COMMENT '当前创建人:初始==创建人,当在公海拉回时为 拉回人',
  `creator_name` STRING COMMENT '创建者姓名',
  `origin_channel` STRING COMMENT '来源渠道',
  `comment` STRING COMMENT '备注',
  `first_customer_clue_id` int COMMENT '第一条线索id',
  `last_customer_clue_id` int COMMENT '最后一条线索id',
  `process_state` STRING COMMENT '处理状态',
  `process_time` STRING COMMENT '处理状态变动时间',
  `payment_state` STRING COMMENT '支付状态',
  `payment_time` STRING COMMENT '支付状态变动时间',
  `signup_state` STRING COMMENT '报名状态',
  `signup_time` STRING COMMENT '报名时间',
  `notice_state` STRING COMMENT '通知状态',
  `notice_time` STRING COMMENT '通知状态变动时间',
  `lock_state` STRING COMMENT '锁定状态',
  `lock_time` STRING COMMENT '锁定状态修改时间',
  `itcast_clazz_id` int COMMENT '所属ems班级id',
  `itcast_clazz_time` STRING COMMENT '报班时间',
  `payment_url` STRING COMMENT '付款链接',
  `payment_url_time` STRING COMMENT '支付链接生成时间',
  `ems_student_id` int COMMENT 'ems的学生id',
  `delete_reason` STRING COMMENT '删除原因',
  `deleter` int COMMENT '删除人',
  `deleter_name` STRING COMMENT '删除人姓名',
  `delete_time` STRING COMMENT '删除时间',
  `course_id` int COMMENT '课程ID',
  `course_name` STRING COMMENT '课程名称',
  `delete_comment` STRING COMMENT '删除原因说明',
  `close_state` STRING COMMENT '关闭状态',
  `close_time` STRING COMMENT '关闭状态变动时间',
  `appeal_id` int COMMENT '申诉id',
  `tenant` int COMMENT '租户',
  `total_fee` DECIMAL COMMENT '报名费总金额',
  `belonged` int COMMENT '小周期归属人',
  `belonged_time` STRING COMMENT '归属时间',
  `belonger_time` STRING COMMENT '归属时间',
  `transfer` int COMMENT '转移人',
  `transfer_time` STRING COMMENT '转移时间',
  `follow_type` int COMMENT '分配类型,0-自动分配,1-手动分配,2-自动转移,3-手动单个转移,4-手动批量转移,5-公海领取',
  `transfer_bxg_oa_account` STRING COMMENT '转移到博学谷归属人OA账号',
  `transfer_bxg_belonger_name` STRING COMMENT '转移到博学谷归属人OA姓名',
  `end_time` STRING COMMENT '有效截止时间')
comment '客户关系表'
PARTITIONED BY(start_time STRING)
clustered by(id) sorted by(id) into 10 buckets
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='ZLIB');

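-- customer_clue (customer lead table)
-- Partitioned by starts_time to speed up filtered queries. customer_clue is the fact table of the later valid-lead dashboard. Its requirements likewise call for recomputing the affected dimensions with the latest values when data is updated, and for keeping historical snapshots.
-- Hence an SCD2 zipper table is used, with an extra ends_time field marking when a version was closed (end_time is already taken by the session end time).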
DROP TABLE itcast_ods.customer_clue;
CREATE TABLE IF NOT EXISTS itcast_ods.customer_clue (
  id int COMMENT 'customer_clue_id',
  create_date_time STRING COMMENT '创建时间',
  update_date_time STRING COMMENT '最后更新时间',
  deleted STRING COMMENT '是否被删除(禁用)',
  customer_id int COMMENT '客户id',
  customer_relationship_id int COMMENT '客户关系id',
  session_id STRING COMMENT '七陌会话id',
  sid STRING COMMENT '访客id',
  status STRING COMMENT '状态(undeal待领取 deal 已领取 finish 已关闭 changePeer 已流转)',
  users STRING COMMENT '所属坐席',
  create_time STRING COMMENT '七陌创建时间',
  platform STRING COMMENT '平台来源 (pc-网站咨询|wap-wap咨询|sdk-app咨询|weixin-微信咨询)',
  s_name STRING COMMENT '用户名称',
  seo_source STRING COMMENT '搜索来源',
  seo_keywords STRING COMMENT '关键字',
  ip STRING COMMENT 'IP地址',
  referrer STRING COMMENT '上级来源页面',
  from_url STRING COMMENT '会话来源页面',
  landing_page_url STRING COMMENT '访客着陆页面',
  url_title STRING COMMENT '咨询页面title',
  to_peer STRING COMMENT '所属技能组',
  manual_time STRING COMMENT '人工开始时间',
  begin_time STRING COMMENT '坐席领取时间 ',
  reply_msg_count int COMMENT '客服回复消息数',
  total_msg_count int COMMENT '消息总数',
  msg_count int COMMENT '客户发送消息数',
  comment STRING COMMENT '备注',
  finish_reason STRING COMMENT '结束类型',
  finish_user STRING COMMENT '结束坐席',
  end_time STRING COMMENT '会话结束时间',
  platform_description STRING COMMENT '客户平台信息',
  browser_name STRING COMMENT '浏览器名称',
  os_info STRING COMMENT '系统名称',
  area STRING COMMENT '区域',
  country STRING COMMENT '所在国家',
  province STRING COMMENT '省',
  city STRING COMMENT '城市',
  creator int COMMENT '创建人',
  name STRING COMMENT '客户姓名',
  idcard STRING COMMENT '身份证号',
  phone STRING COMMENT '手机号',
  itcast_school_id int COMMENT '校区Id',
  itcast_school STRING COMMENT '校区',
  itcast_subject_id int COMMENT '学科Id',
  itcast_subject STRING COMMENT '学科',
  wechat STRING COMMENT '微信',
  qq STRING COMMENT 'qq号',
  email STRING COMMENT '邮箱',
  gender STRING COMMENT '性别',
  level STRING COMMENT '客户级别',
  origin_type STRING COMMENT '数据来源渠道',
  information_way STRING COMMENT '资讯方式',
  working_years STRING COMMENT '开始工作时间',
  technical_directions STRING COMMENT '技术方向',
  customer_state STRING COMMENT '当前客户状态',
  valid STRING COMMENT '该线索是否是网资有效线索',
  anticipat_signup_date STRING COMMENT '预计报名时间',
  clue_state STRING COMMENT '线索状态',
  scrm_department_id int COMMENT 'SCRM内部部门id',
  superior_url STRING COMMENT '诸葛获取上级页面URL',
  superior_source STRING COMMENT '诸葛获取上级页面URL标题',
  landing_url STRING COMMENT '诸葛获取着陆页面URL',
  landing_source STRING COMMENT '诸葛获取着陆页面URL来源',
  info_url STRING COMMENT '诸葛获取留咨页URL',
  info_source STRING COMMENT '诸葛获取留咨页URL标题',
  origin_channel STRING COMMENT '投放渠道',
  course_id int COMMENT '课程编号',
  course_name STRING COMMENT '课程名称',
  zhuge_session_id STRING COMMENT 'zhuge会话id',
  is_repeat int COMMENT '是否重复线索(手机号维度) 0:正常 1:重复',
  tenant int COMMENT '租户id',
  activity_id STRING COMMENT '活动id',
  activity_name STRING COMMENT '活动名称',
  follow_type int COMMENT '分配类型,0-自动分配,1-手动分配,2-自动转移,3-手动单个转移,4-手动批量转移,5-公海领取',
  shunt_mode_id int COMMENT '匹配到的技能组id',
  shunt_employee_group_id int COMMENT '所属分流员工组',
  ends_time STRING COMMENT '有效时间')
comment '客户线索表'
PARTITIONED BY(starts_time STRING)
clustered by(customer_relationship_id) sorted by(customer_relationship_id) into 10 buckets
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='ZLIB');

DIM layer (Dimension): the dimension layer

-- External tables are used to keep the data safe.
CREATE DATABASE IF NOT EXISTS itcast_dimen;
-- customer (static customer information table)
CREATE EXTERNAL TABLE IF NOT EXISTS itcast_dimen.`customer` (
  `id` int COMMENT 'key id',
  `customer_relationship_id` int COMMENT '当前意向id',
  `create_date_time` STRING COMMENT '创建时间',
  `update_date_time` STRING COMMENT '最后更新时间',
  `deleted` int  COMMENT '是否被删除(禁用)',
  `name` STRING COMMENT '姓名',
  `idcard` STRING  COMMENT '身份证号',
  `birth_year` int COMMENT '出生年份',
  `gender` STRING COMMENT '性别',
  `phone` STRING COMMENT '手机号',
  `wechat` STRING COMMENT '微信',
  `qq` STRING COMMENT 'qq号',
  `email` STRING COMMENT '邮箱',
  `area` STRING COMMENT '所在区域',
  `leave_school_date` date COMMENT '离校时间',
  `graduation_date` date COMMENT '毕业时间',
  `bxg_student_id` STRING COMMENT '博学谷学员ID,可能未关联到,不存在',
  `creator` int COMMENT '创建人ID',
  `origin_type` STRING COMMENT '数据来源',
  `origin_channel` STRING COMMENT '来源渠道',
  `tenant` int,
  `md_id` int COMMENT '中台id')
comment '客户表'
PARTITIONED BY(start_time STRING)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='SNAPPY');

-- employee (employee table)
CREATE EXTERNAL TABLE IF NOT EXISTS itcast_dimen.employee (
  id int COMMENT '员工id',
  email STRING COMMENT '公司邮箱,OA登录账号',
  real_name STRING COMMENT '员工的真实姓名',
  phone STRING COMMENT '手机号,目前还没有使用;隐私问题OA接口没有提供这个属性,',
  department_id STRING COMMENT 'OA中的部门编号,有负值',
  department_name STRING COMMENT 'OA中的部门名',
  remote_login STRING COMMENT '员工是否可以远程登录',
  job_number STRING COMMENT '员工工号',
  cross_school STRING COMMENT '是否有跨校区权限',
  last_login_date STRING COMMENT '最后登录日期',
  creator int COMMENT '创建人',
  create_date_time STRING COMMENT '创建时间',
  update_date_time STRING COMMENT '最后更新时间',
  deleted STRING COMMENT '是否被删除(禁用)',
  scrm_department_id int COMMENT 'SCRM内部部门id',
  leave_office STRING COMMENT '离职状态',
  leave_office_time STRING COMMENT '离职时间',
  reinstated_time STRING COMMENT '复职时间',
  superior_leaders_id int COMMENT '上级领导ID',
  tdepart_id int COMMENT '直属部门',
  tenant int COMMENT '租户',
  ems_user_name STRING COMMENT 'ems用户名称'
)
comment '员工表'
PARTITIONED BY(start_time STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='SNAPPY');

-- scrm_department (department table)
CREATE EXTERNAL TABLE IF NOT EXISTS itcast_dimen.`scrm_department` (
  `id` int COMMENT '部门id',
  `name` STRING COMMENT '部门名称',
  `parent_id` int COMMENT '父部门id',
  `create_date_time` STRING COMMENT '创建时间',
  `update_date_time` STRING COMMENT '更新时间',
  `deleted` STRING COMMENT '删除标志',
  `id_path` STRING COMMENT '编码全路径',
  `tdepart_code` int COMMENT '直属部门',
  `creator` STRING COMMENT '创建者',
  `depart_level` int COMMENT '部门层级',
  `depart_sign` int COMMENT '部门标志,暂时默认1',
  `depart_line` int COMMENT '业务线,存储业务线编码',
  `depart_sort` int COMMENT '排序字段',
  `disable_flag` int COMMENT '禁用标志',
  `tenant` int COMMENT '租户')
comment 'scrm部门表'
PARTITIONED BY(start_time STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='SNAPPY');

-- itcast_school (campus table)
CREATE EXTERNAL TABLE IF NOT EXISTS itcast_dimen.`itcast_school` (
  `id` int COMMENT '自增主键',
  `create_date_time` timestamp COMMENT '创建时间',
  `update_date_time` timestamp  COMMENT '最后更新时间',
  `deleted` STRING COMMENT '是否被删除(禁用)',
  `name` STRING COMMENT '校区名称',
  `code` STRING COMMENT '校区标识',
  `tenant` int COMMENT '租户')
comment '校区字典表'
PARTITIONED BY(start_time STRING)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='SNAPPY');

-- itcast_subject (subject table)
CREATE EXTERNAL TABLE IF NOT EXISTS itcast_dimen.`itcast_subject` (
  `id` int COMMENT '自增主键',
  `create_date_time` timestamp COMMENT '创建时间',
  `update_date_time` timestamp COMMENT '最后更新时间',
  `deleted` STRING COMMENT '是否被删除(禁用)',
  `name` STRING COMMENT '学科名称',
  `code` STRING COMMENT '学科编码',
  `tenant` int COMMENT '租户')
comment '学科字典表'
PARTITIONED BY(start_time STRING)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='SNAPPY');

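DWD layer

The customer_relationship fact data from ODS is cleaned and transformed, then stored in the DWD (detail) layer.

The DW and APP layers hold statistical data. Internal (managed) tables are recommended there: they make overwrite inserts and similar operations easier, and they improve development and testing efficiency while still meeting business requirements.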
drop table itcast_dwd.`itcast_intention_dwd`;
CREATE TABLE IF NOT EXISTS itcast_dwd.`itcast_intention_dwd` (
  `rid` int COMMENT 'id',
  `customer_id` STRING COMMENT '客户id',
  `create_date_time` STRING COMMENT '创建时间',
  `itcast_school_id` STRING COMMENT '校区id',
  `deleted` STRING COMMENT '是否被删除',
  `origin_type` STRING COMMENT '来源渠道',
  `itcast_subject_id` STRING COMMENT '学科id',
  `creator` int COMMENT '创建人',
  `hourinfo` STRING COMMENT '小时信息',
  `origin_type_stat` STRING COMMENT '数据来源:0.线下;1.线上'
)
comment '客户意向dwd表'
PARTITIONED BY(yearinfo STRING,monthinfo STRING,dayinfo STRING)
clustered by(rid) sorted by(rid) into 10 buckets
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t'
stored as ORC
TBLPROPERTIES ('orc.compress'='SNAPPY');

DWM layer

Join all dimension tables and transform the retrieved fields so they can be used directly in statistics.
create database if not exists itcast_dwm;
drop table itcast_dwm.`itcast_intention_dwm`;
CREATE TABLE IF NOT EXISTS itcast_dwm.`itcast_intention_dwm` (
  `customer_id` STRING COMMENT 'id信息',
  `create_date_time` STRING COMMENT '创建时间',
  `area` STRING COMMENT '区域信息',
  `itcast_school_id` STRING COMMENT '校区id',
  `itcast_school_name` STRING COMMENT '校区名称',
  `deleted` STRING COMMENT '是否被删除',
  `origin_type` STRING COMMENT '来源渠道',
  `itcast_subject_id` STRING COMMENT '学科id',
  `itcast_subject_name` STRING COMMENT '学科名称',
  `hourinfo` STRING COMMENT '小时信息',
  `origin_type_stat` STRING COMMENT '数据来源:0.线下;1.线上',
  `clue_state_stat` STRING COMMENT '新老客户:0.老客户;1.新客户',
  `tdepart_id` STRING COMMENT '创建者部门id',
  `tdepart_name` STRING COMMENT '咨询中心名称'
)
comment '客户意向dwm表'
PARTITIONED BY(yearinfo STRING,monthinfo STRING,dayinfo STRING)
clustered by(customer_id) sorted by(customer_id) into 10 buckets
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as ORC
TBLPROPERTIES ('orc.compress'='SNAPPY');

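DWS layer

Building on the DWM layer, statistics are computed according to the business requirements. Three dimensions are always present, each tagged with its own attribute codes, plus a product-attribute dimension:
- Time dimension: 1. year, 2. month, 3. day, 4. hour
- Data source: 0. offline; 1. online
- Customer attribute: 0. existing customer; 1. new customer
- Product attribute dimension: 1. total intentions; 2. region; 3. campus and subject combination; 4. source channel; 5. consulting center;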
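As a rough illustration of how DWS rows are produced, the Python sketch below groups DWM rows by day plus the data-source and customer-attribute codes. For groupType 2 it also adds the region. In the warehouse this would be a Hive GROUP BY over itcast_dwm.itcast_intention_dwm. Counting distinct customer_id values is an assumption, and the sample rows are hypothetical.

```python
from collections import defaultdict


def aggregate_by_day(dwm_rows, group_type):
    """Count intended customers per day for one DWS groupType code.

    Only groupType "1" (total intentions) and "2" (region) are modelled.
    """
    customers = defaultdict(set)
    for r in dwm_rows:
        day = r["create_date_time"][:10]  # 'yyyy-MM-dd'
        region = r["area"] if group_type == "2" else None
        key = (day, r["origin_type_stat"], r["clue_state_stat"], region)
        customers[key].add(r["customer_id"])
    return {key: len(ids) for key, ids in customers.items()}


# Hypothetical DWM rows; customer "1" appears twice on the same day.
dwm_rows = [
    {"customer_id": "1", "create_date_time": "2020-07-01 09:00:00", "area": "Beijing",
     "origin_type_stat": "1", "clue_state_stat": "1"},
    {"customer_id": "2", "create_date_time": "2020-07-01 10:00:00", "area": "Shanghai",
     "origin_type_stat": "1", "clue_state_stat": "1"},
    {"customer_id": "1", "create_date_time": "2020-07-01 11:00:00", "area": "Beijing",
     "origin_type_stat": "1", "clue_state_stat": "1"},
    {"customer_id": "3", "create_date_time": "2020-07-02 08:00:00", "area": "Beijing",
     "origin_type_stat": "0", "clue_state_stat": "0"},
]
print(aggregate_by_day(dwm_rows, "1"))
print(aggregate_by_day(dwm_rows, "2"))
```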

drop Table itcast_dws.itcast_intention_dws;
CREATE TABLE IF NOT EXISTS itcast_dws.itcast_intention_dws (
   `customer_total` INT COMMENT '聚合意向客户数',
   `area` STRING COMMENT '区域信息',
   `itcast_school_id` STRING COMMENT '校区id',
   `itcast_school_name` STRING COMMENT '校区名称',
   `origin_type` STRING COMMENT '来源渠道',
   `itcast_subject_id` STRING COMMENT '学科id',
   `itcast_subject_name` STRING COMMENT '学科名称',
   `hourinfo` STRING COMMENT '小时信息',
   `origin_type_stat` STRING COMMENT '数据来源:0.线下;1.线上',
   `clue_state_stat` STRING COMMENT '客户属性:0.老客户;1.新客户',
   `tdepart_id` STRING COMMENT '创建者部门id',
   `tdepart_name` STRING COMMENT '咨询中心名称',
   `time_str` STRING COMMENT '时间明细',
   `groupType` STRING COMMENT '产品属性类别:1.总意向量;2.区域信息;3.校区、学科组合分组;4.来源渠道;5.咨询中心;',
   `time_type` STRING COMMENT '时间维度:1、按小时聚合;2、按天聚合;3、按周聚合;4、按月聚合;5、按年聚合;'
)
comment '客户意向dws表'
PARTITIONED BY(yearinfo STRING,monthinfo STRING,dayinfo STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='SNAPPY');

Data ingestion: DIM layer

# customer (customer table)
sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/nev \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--query 'select id, customer_relationship_id, create_date_time, update_date_time, deleted, name, idcard, birth_year, gender, phone,  wechat, qq, email, area, leave_school_date, graduation_date, bxg_student_id, creator, origin_type, origin_channel, tenant, md_id, FROM_UNIXTIME(unix_timestamp(),"%Y-%m-%d")as start_time from customer where $CONDITIONS' \
--hcatalog-database itcast_dimen \
--hcatalog-table customer \
-m 2 \
--split-by id

# employee (employee table)
sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/nev \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--query 'select id,email,real_name,-1 as phone,department_id,department_name,remote_login,job_number,cross_school,last_login_date,creator,create_date_time,update_date_time,deleted,scrm_department_id,leave_office,leave_office_time,reinstated_time,superior_leaders_id,tdepart_id,tenant,ems_user_name,FROM_UNIXTIME(unix_timestamp(),"%Y-%m-%d")as start_time from employee where $CONDITIONS' \
--hcatalog-database itcast_dimen \
--hcatalog-table employee \
-m 2 \
--split-by id

# scrm_department (department table)
sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/nev \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--query 'select *, FROM_UNIXTIME(unix_timestamp(),"%Y-%m-%d")as start_time from scrm_department where $CONDITIONS' \
--hcatalog-database itcast_dimen \
--hcatalog-table scrm_department \
-m 2 \
--split-by id

# itcast_school (campus table)
sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/nev \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--query 'select *, FROM_UNIXTIME(unix_timestamp(),"%Y-%m-%d")as start_time from itcast_school where $CONDITIONS' \
--hcatalog-database itcast_dimen \
--hcatalog-table itcast_school \
-m 2 \
--split-by id

# itcast_subject (subject table)
sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/nev \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--query 'select *, FROM_UNIXTIME(unix_timestamp(),"%Y-%m-%d")as start_time from itcast_subject where $CONDITIONS' \
--hcatalog-database itcast_dimen \
--hcatalog-table itcast_subject \
-m 2 \
--split-by id

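Data ingestion: ODS layer

Sqoop does not support bucketed tables, so the import goes through staging tables instead.

Importing into a bucketed table with Sqoop fails with: Import failed: org.apache.hive.hcatalog.common.HCatException : 2016 : Error operation not supported : Store into a partition with bucket definition from Pig/Mapreduce is not supported
-- customer_relationship_tmp: staging table for the intention table
-- Recreate the ODS staging table; note that it must not be bucketed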
DROP TABLE itcast_ods.`customer_relationship_tmp`;
CREATE TABLE IF NOT EXISTS itcast_ods.`customer_relationship_tmp` (
  `id` int COMMENT '客户关系id',
  `create_date_time` STRING COMMENT '创建时间',
  `update_date_time` STRING COMMENT '最后更新时间',
  `deleted` int COMMENT '是否被删除(禁用)',
  `customer_id` int COMMENT '所属客户id',
  `first_id` int COMMENT '第一条客户关系id',
  `belonger` int COMMENT '归属人',
  `belonger_name` STRING COMMENT '归属人姓名',
  `initial_belonger` int COMMENT '初始归属人',
  `distribution_handler` int COMMENT '分配处理人',
  `business_scrm_department_id` int COMMENT '归属部门',
  `last_visit_time` STRING COMMENT '最后回访时间',
  `next_visit_time` STRING COMMENT '下次回访时间',
  `origin_type` STRING COMMENT '数据来源',
  `itcast_school_id` int COMMENT '校区Id',
  `itcast_subject_id` int COMMENT '学科Id',
  `intention_study_type` STRING COMMENT '意向学习方式',
  `anticipat_signup_date` STRING COMMENT '预计报名时间',
  `level` STRING COMMENT '客户级别',
  `creator` int COMMENT '创建人',
  `current_creator` int COMMENT '当前创建人:初始==创建人,当在公海拉回时为 拉回人',
  `creator_name` STRING COMMENT '创建者姓名',
  `origin_channel` STRING COMMENT '来源渠道',
  `comment` STRING COMMENT '备注',
  `first_customer_clue_id` int COMMENT '第一条线索id',
  `last_customer_clue_id` int COMMENT '最后一条线索id',
  `process_state` STRING COMMENT '处理状态',
  `process_time` STRING COMMENT '处理状态变动时间',
  `payment_state` STRING COMMENT '支付状态',
  `payment_time` STRING COMMENT '支付状态变动时间',
  `signup_state` STRING COMMENT '报名状态',
  `signup_time` STRING COMMENT '报名时间',
  `notice_state` STRING COMMENT '通知状态',
  `notice_time` STRING COMMENT '通知状态变动时间',
  `lock_state` STRING COMMENT '锁定状态',
  `lock_time` STRING COMMENT '锁定状态修改时间',
  `itcast_clazz_id` int COMMENT '所属ems班级id',
  `itcast_clazz_time` STRING COMMENT '报班时间',
  `payment_url` STRING COMMENT '付款链接',
  `payment_url_time` STRING COMMENT '支付链接生成时间',
  `ems_student_id` int COMMENT 'ems的学生id',
  `delete_reason` STRING COMMENT '删除原因',
  `deleter` int COMMENT '删除人',
  `deleter_name` STRING COMMENT '删除人姓名',
  `delete_time` STRING COMMENT '删除时间',
  `course_id` int COMMENT '课程ID',
  `course_name` STRING COMMENT '课程名称',
  `delete_comment` STRING COMMENT '删除原因说明',
  `close_state` STRING COMMENT '关闭状态',
  `close_time` STRING COMMENT '关闭状态变动时间',
  `appeal_id` int COMMENT '申诉id',
  `tenant` int COMMENT '租户',
  `total_fee` DECIMAL COMMENT '报名费总金额',
  `belonged` int COMMENT '小周期归属人',
  `belonged_time` STRING COMMENT '归属时间',
  `belonger_time` STRING COMMENT '归属时间',
  `transfer` int COMMENT '转移人',
  `transfer_time` STRING COMMENT '转移时间',
  `follow_type` int COMMENT '分配类型,0-自动分配,1-手动分配,2-自动转移,3-手动单个转移,4-手动批量转移,5-公海领取',
  `transfer_bxg_oa_account` STRING COMMENT '转移到博学谷归属人OA账号',
  `transfer_bxg_belonger_name` STRING COMMENT '转移到博学谷归属人OA姓名',
  `end_time` STRING COMMENT '有效截止时间')
comment '客户关系表'
PARTITIONED BY(start_time STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='ZLIB');

-- Customer_clue_tmp: staging table for the lead table
DROP TABLE itcast_ods.customer_clue_tmp;
CREATE TABLE IF NOT EXISTS itcast_ods.customer_clue_tmp (
  id int COMMENT 'customer_clue_id',
  create_date_time STRING COMMENT '创建时间',
  update_date_time STRING COMMENT '最后更新时间',
  deleted STRING COMMENT '是否被删除(禁用)',
  customer_id int COMMENT '客户id',
  customer_relationship_id int COMMENT '客户关系id',
  session_id STRING COMMENT '七陌会话id',
  sid STRING COMMENT '访客id',
  status STRING COMMENT '状态(undeal待领取 deal 已领取 finish 已关闭 changePeer 已流转)',
  users STRING COMMENT '所属坐席',
  create_time STRING COMMENT '七陌创建时间',
  platform STRING COMMENT '平台来源 (pc-网站咨询|wap-wap咨询|sdk-app咨询|weixin-微信咨询)',
  s_name STRING COMMENT '用户名称',
  seo_source STRING COMMENT '搜索来源',
  seo_keywords STRING COMMENT '关键字',
  ip STRING COMMENT 'IP地址',
  referrer STRING COMMENT '上级来源页面',
  from_url STRING COMMENT '会话来源页面',
  landing_page_url STRING COMMENT '访客着陆页面',
  url_title STRING COMMENT '咨询页面title',
  to_peer STRING COMMENT '所属技能组',
  manual_time STRING COMMENT '人工开始时间',
  begin_time STRING COMMENT '坐席领取时间 ',
  reply_msg_count int COMMENT '客服回复消息数',
  total_msg_count int COMMENT '消息总数',
  msg_count int COMMENT '客户发送消息数',
  comment STRING COMMENT '备注',
  finish_reason STRING COMMENT '结束类型',
  finish_user STRING COMMENT '结束坐席',
  end_time STRING COMMENT '会话结束时间',
  platform_description STRING COMMENT '客户平台信息',
  browser_name STRING COMMENT '浏览器名称',
  os_info STRING COMMENT '系统名称',
  area STRING COMMENT '区域',
  country STRING COMMENT '所在国家',
  province STRING COMMENT '省',
  city STRING COMMENT '城市',
  creator int COMMENT '创建人',
  name STRING COMMENT '客户姓名',
  idcard STRING COMMENT '身份证号',
  phone STRING COMMENT '手机号',
  itcast_school_id int COMMENT '校区Id',
  itcast_school STRING COMMENT '校区',
  itcast_subject_id int COMMENT '学科Id',
  itcast_subject STRING COMMENT '学科',
  wechat STRING COMMENT '微信',
  qq STRING COMMENT 'qq号',
  email STRING COMMENT '邮箱',
  gender STRING COMMENT '性别',
  level STRING COMMENT '客户级别',
  origin_type STRING COMMENT '数据来源渠道',
  information_way STRING COMMENT '资讯方式',
  working_years STRING COMMENT '开始工作时间',
  technical_directions STRING COMMENT '技术方向',
  customer_state STRING COMMENT '当前客户状态',
  valid STRING COMMENT '该线索是否是网资有效线索',
  anticipat_signup_date STRING COMMENT '预计报名时间',
  clue_state STRING COMMENT '线索状态',
  scrm_department_id int COMMENT 'SCRM内部部门id',
  superior_url STRING COMMENT '诸葛获取上级页面URL',
  superior_source STRING COMMENT '诸葛获取上级页面URL标题',
  landing_url STRING COMMENT '诸葛获取着陆页面URL',
  landing_source STRING COMMENT '诸葛获取着陆页面URL来源',
  info_url STRING COMMENT '诸葛获取留咨页URL',
  info_source STRING COMMENT '诸葛获取留咨页URL标题',
  origin_channel STRING COMMENT '投放渠道',
  course_id int COMMENT '课程编号',
  course_name STRING COMMENT '课程名称',
  zhuge_session_id STRING COMMENT 'zhuge会话id',
  is_repeat int COMMENT '是否重复线索(手机号维度) 0:正常 1:重复',
  tenant int COMMENT '租户id',
  activity_id STRING COMMENT '活动id',
  activity_name STRING COMMENT '活动名称',
  follow_type int COMMENT '分配类型,0-自动分配,1-手动分配,2-自动转移,3-手动单个转移,4-手动批量转移,5-公海领取',
  shunt_mode_id int COMMENT '匹配到的技能组id',
  shunt_employee_group_id int COMMENT '所属分流员工组',
  ends_time STRING COMMENT '有效时间')
comment '客户线索表'
PARTITIONED BY(starts_time STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='ZLIB');

# Extract data into the customer_relationship_tmp staging table (intention table)
sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/nev \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--query 'select id, create_date_time, update_date_time, deleted, customer_id, first_id, belonger, belonger_name, initial_belonger, distribution_handler, business_scrm_department_id, last_visit_time, next_visit_time, origin_type, itcast_school_id, itcast_subject_id, intention_study_type, anticipat_signup_date, level, creator, current_creator, creator_name, origin_channel, comment, first_customer_clue_id, last_customer_clue_id, process_state, process_time, payment_state, payment_time, signup_state, signup_time, notice_state, notice_time, lock_state, lock_time, itcast_clazz_id, itcast_clazz_time, payment_url, payment_url_time, ems_student_id, delete_reason, deleter, deleter_name, delete_time, course_id, course_name, delete_comment, close_state, close_time, appeal_id, tenant, total_fee, belonged, belonged_time, belonger_time, transfer, transfer_time, follow_type, transfer_bxg_oa_account, transfer_bxg_belonger_name, FROM_UNIXTIME(unix_timestamp(),"%Y-%m-%d")as start_time,date_format("9999-12-31","%Y-%m-%d") as end_time from customer_relationship where $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table customer_relationship_tmp \
-m 2 \
--split-by id

# Extract data into the Customer_clue_tmp staging table (lead table)
sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/nev \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--query 'select id,create_date_time,update_date_time,deleted,customer_id,customer_relationship_id,session_id,sid,status,user as users,create_time,platform,s_name,seo_source,seo_keywords,ip,referrer,from_url,landing_page_url,url_title,to_peer,manual_time,begin_time,reply_msg_count,total_msg_count,msg_count,comment,finish_reason,finish_user,end_time,platform_description,browser_name,os_info,area,country,province,city,creator,name,"-1" as idcard,"-1" as phone,itcast_school_id,itcast_school,itcast_subject_id,itcast_subject,"-1" as wechat,"-1" as qq,"-1" as email,gender,level,origin_type,information_way,working_years,technical_directions,customer_state,valid,anticipat_signup_date,clue_state,scrm_department_id,superior_url,superior_source,landing_url,landing_source,info_url,info_source,origin_channel,course_id,course_name,zhuge_session_id,is_repeat,tenant,activity_id,activity_name,follow_type,shunt_mode_id,shunt_employee_group_id,FROM_UNIXTIME(unix_timestamp(),"%Y-%m-%d")as starts_time,date_format("9999-12-31","%Y-%m-%d") as ends_time from customer_clue where $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table customer_clue_tmp \
-m 2 \
--split-by id

# Overwrite-insert the data into the ODS customer_relationship (intention) and Customer_clue (lead) tables
beeline -u jdbc:hive2://kube-40:10000 -n admin
-- Dynamic partitioning
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
-- Hive compression
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
-- ORC write strategy: favor compression ratio over speed
set hive.exec.orc.compression.strategy=COMPRESSION;
-- Bucketing
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
insert overwrite table itcast_ods.customer_relationship partition(start_time) select * from itcast_ods.customer_relationship_tmp;
insert overwrite table itcast_ods.customer_clue partition(starts_time) select * from itcast_ods.customer_clue_tmp;

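Data cleaning and transformation

Hive execution plans:
- Hive execution plan: the plan shows the key steps of a query and helps us verify whether an optimization has taken effect.
- Basic syntax: prefix an ordinary HiveQL statement with EXPLAIN: EXPLAIN [EXTENDED] query

An execution plan has two parts:
- Stage dependencies (STAGE DEPENDENCIES)
  - For a simple query, this part shows that the query is split into two stages: Stage-1 and Stage-0.
  - Stage-0 is usually the final stage that returns data to the user; operations such as LIMIT appear here.
  - Stage-1 is where the MapReduce job runs.
- Detailed stage plans (STAGE PLANS)
  - Contain most of the processing steps of every stage in the query.
  - This is the main place to check whether a particular optimization has taken effect.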

Glossary:
- TableScan: scans the table
- alias: emp: the table being read
- Statistics: Num rows: 2 Data size: 820 Basic stats: COMPLETE Column stats: NONE: basic statistics for the table, such as row count and data size;
- expressions: empno (type: int), ename (type: string), job (type: string), mgr (type: int), hiredate (type: string), sal (type: double), comm (type: double), deptno (type: int): the columns to output and their types
- outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7: the numbered names of the output columns
- compressed: true: whether the output is compressed;
- input format: org.apache.hadoop.mapred.SequenceFileInputFormat: the Java class used to read the input; here the input is read as a SequenceFile;
- output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat: the Java class used to write the output; here the output is written as a SequenceFile;

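In the DWD layer, clean and transform the customer_relationship intended-customer fact table:
- Filter out deleted records;
- Check the campus id and subject id, and convert null (or 0) values to -1;
- Convert the origin_type (source channel) field to online/offline: if origin_type is NETSERVICE or PRESIGNUP, the value is 1 (online); otherwise it is 0 (offline).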

beeline -u jdbc:hive2://kube-40:10000 -n admin
-- Dynamic partitioning
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
-- Hive compression
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
-- ORC write strategy: favor compression ratio over speed
set hive.exec.orc.compression.strategy=COMPRESSION;
-- Bucketing
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
insert into table itcast_dwd.itcast_intention_dwd partition (yearinfo,monthinfo,dayinfo)
select
    rs.id as rid,
    rs.customer_id,
    rs.create_date_time,
    if((rs.itcast_school_id is null) or (rs.itcast_school_id = 0), -1, rs.itcast_school_id) as itcast_school_id,
    rs.deleted,
    rs.origin_type,
    if((rs.itcast_subject_id is null) or (rs.itcast_subject_id = 0), -1, rs.itcast_subject_id) as itcast_subject_id,
    rs.creator,
    substr(rs.create_date_time, 12, 2) hourinfo,
    if(rs.origin_type in ('NETSERVICE', 'PRESIGNUP'), '1', '0') as origin_type_stat,
    substr(rs.create_date_time, 1, 4) yearinfo,
    substr(rs.create_date_time, 6, 2) monthinfo,
    substr(rs.create_date_time, 9, 2) dayinfo
from itcast_ods.customer_relationship rs
where rs.deleted = 0;
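The cleansing rules can be checked row by row outside Hive. This minimal Python sketch mirrors the SELECT above for a single row, assuming rows arrive as dicts keyed by the ODS column names and that `create_date_time` uses the `yyyy-MM-dd HH:mm:ss` layout implied by the `substr` offsets; `clean_intention_row` and `normalize_id` are hypothetical helper names.

```python
from typing import Optional

ONLINE_ORIGIN_TYPES = {"NETSERVICE", "PRESIGNUP"}  # mapped to '1' (online)


def normalize_id(value: Optional[int]) -> int:
    """NULL or 0 becomes -1, like if((x is null) or (x = 0), -1, x)."""
    return -1 if value is None or value == 0 else value


def clean_intention_row(rs: dict) -> Optional[dict]:
    """Apply the DWD rules to one customer_relationship row; None means the row is filtered out."""
    if rs["deleted"] != 0:  # WHERE rs.deleted = 0 (a NULL deleted is filtered out too)
        return None
    ts = rs["create_date_time"]  # assumed layout: yyyy-MM-dd HH:mm:ss
    return {
        "rid": rs["id"],
        "customer_id": rs["customer_id"],
        "create_date_time": ts,
        "itcast_school_id": normalize_id(rs["itcast_school_id"]),
        "deleted": rs["deleted"],
        "origin_type": rs["origin_type"],
        "itcast_subject_id": normalize_id(rs["itcast_subject_id"]),
        "creator": rs["creator"],
        # Hive substr is 1-based, so substr(ts, 12, 2) corresponds to ts[11:13].
        "hourinfo": ts[11:13],
        "origin_type_stat": "1" if rs["origin_type"] in ONLINE_ORIGIN_TYPES else "0",
        "yearinfo": ts[0:4],
        "monthinfo": ts[5:7],
        "dayinfo": ts[8:10],
    }
```

The off-by-one between Hive's 1-based `substr` and Python's 0-based slicing is the detail most worth double-checking when porting these expressions.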
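Generating the DWM layer data
The DWM layer's columns come from the fact table and all of the dimension tables, so all of these tables must be joined together before the DWM data can be generated.

Join conditions between the tables:
  - customer_relationship.creator = employee.id
  - employee.tdepart_id = scrm_department.id
  - customer_clue.customer_relationship_id = customer_relationship.id
  - customer_relationship.itcast_school_id = itcast_school.id
  - customer_relationship.itcast_subject_id = itcast_subject.id
  - customer_relationship.customer_id = customer.id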
-- dynamic partitioning
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
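-- Hive compression
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
-- compression strategy used when writing ORC files (SPEED or COMPRESSION)
set hive.exec.orc.compression.strategy=COMPRESSION;
-- bucketing
-- enforce bucketing on insert (default false before Hive 2.0; Hive 2.0 removed the setting and always enforces bucketing)
set hive.enforce.bucketing=true;
-- enforce sorting on insert (likewise always on since Hive 2.0)
set hive.enforce.sorting=true;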

-- Option 1: join optimizations. They need ample memory; without it, YARN runs the
-- tasks one after another and the job ends up slower than without the optimizations.
-- map join
set hive.auto.convert.join=true;
-- enable bucket map join
set hive.optimize.bucketmapjoin = true;
-- enable SMB (sort-merge-bucket) map join
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
-- enforce sorting when writing data
set hive.enforce.sorting=true;
-- automatically try an SMB join
set hive.optimize.bucketmapjoin.sortedmerge = true;

-- Option 2: when memory is insufficient, turn all of the optimizations off instead.
-- Run only one of the two blocks; running both in sequence leaves every optimization disabled.
set hive.auto.convert.join=false;
set hive.optimize.bucketmapjoin = false;
set hive.auto.convert.sortmerge.join=false;
set hive.auto.convert.sortmerge.join.noconditionaltask=false;
set hive.enforce.sorting=false;
set hive.optimize.bucketmapjoin.sortedmerge = false;

insert into table itcast_dwm.itcast_intention_dwm partition(yearinfo,monthinfo,dayinfo)
select  
    iid.customer_id,
    iid.create_date_time,
    c.area,
    iid.itcast_school_id,
    sch.name as itcast_school_name,
    iid.deleted,
    iid.origin_type,
    iid.itcast_subject_id,
    sub.name as itcast_subject_name,
    iid.hourinfo,
    iid.origin_type_stat,
    -- clue_state -> new/old customer flag; equivalent nested-if form: if(cc.clue_state = 'VALID_NEW_CLUES','1',if(cc.clue_state = 'VALID_PUBLIC_NEW_CLUE','0','-1'))
    case cc.clue_state
        when 'VALID_NEW_CLUES' then '1'
        when 'VALID_PUBLIC_NEW_CLUE' then '0'
        else '-1' 
    end as clue_state_stat,
    emp.tdepart_id,
    dept.name as tdepart_name,
    iid.yearinfo,
    iid.monthinfo,
    iid.dayinfo
from itcast_dwd.itcast_intention_dwd  iid
    left join itcast_ods.customer_clue cc on cc.customer_relationship_id = iid.rid
    left join itcast_dimen.customer c on  iid.customer_id = c.id
    left join itcast_dimen.itcast_subject sub on  iid.itcast_subject_id = sub.id
    left join itcast_dimen.itcast_school sch  on iid.itcast_school_id = sch.id
    left join itcast_dimen.employee emp on iid.creator = emp.id
    left join itcast_dimen.scrm_department dept on emp.tdepart_id = dept.id;
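The join chain above can be pictured as a series of dictionary lookups. A minimal Python sketch, assuming each dimension is loaded into a dict keyed by its join column and that each key matches at most one row (a real LEFT JOIN emits one output row per match); `enrich`, `attr` and `clue_state_stat` are hypothetical names, and only a subset of the DWM columns is produced. It illustrates the key property of LEFT JOIN here: a fact row with no matching dimension row is kept with empty (None) attributes rather than dropped.

```python
from typing import Dict, List, Optional


def clue_state_stat(clue_state: Optional[str]) -> str:
    """Same mapping as the CASE expression: new customer '1', old customer '0', otherwise '-1'."""
    if clue_state == "VALID_NEW_CLUES":
        return "1"
    if clue_state == "VALID_PUBLIC_NEW_CLUE":
        return "0"
    return "-1"


def attr(row: Optional[dict], column: str):
    """Column value of a matched dimension row, or None when the LEFT JOIN found no match."""
    return row[column] if row is not None else None


def enrich(dwd_rows: List[dict], clues: Dict[int, dict], customers: Dict[int, dict],
           subjects: Dict[int, dict], schools: Dict[int, dict],
           employees: Dict[int, dict], departments: Dict[int, dict]) -> List[dict]:
    """LEFT JOIN every DWD row to its dimensions; unmatched lookups give None, rows are never dropped."""
    out = []
    for iid in dwd_rows:
        emp = employees.get(iid["creator"])          # creator = employee.id
        tdepart_id = attr(emp, "tdepart_id")
        dept = departments.get(tdepart_id)           # employee.tdepart_id = scrm_department.id
        clue = clues.get(iid["rid"])                 # customer_clue.customer_relationship_id = rid
        out.append({
            "customer_id": iid["customer_id"],
            "area": attr(customers.get(iid["customer_id"]), "area"),
            "itcast_school_name": attr(schools.get(iid["itcast_school_id"]), "name"),
            "itcast_subject_name": attr(subjects.get(iid["itcast_subject_id"]), "name"),
            "clue_state_stat": clue_state_stat(attr(clue, "clue_state")),
            "tdepart_id": tdepart_id,
            "tdepart_name": attr(dept, "name"),
        })
    return out
```

Note that the department lookup goes through the employee row, which is why a missing employee also leaves tdepart_name empty.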

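Statistical analysis

Goal: generate the DWS layer data from the DWM layer.

Metric: intention count

Dimensions:
- Inherent dimensions: time (year, month, day, hour), online/offline, new/old customer
- Product-attribute dimensions: area, subject, school, consulting center, source channel, total intention count

-- total intention count
-- intention count per year, by online/offline and new/old customer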
insert into table itcast_dws.itcast_intention_dws partition(yearinfo,monthinfo,dayinfo)
select 
    count( distinct customer_id) as customer_total, 
    '-1' as area,
    '-1' as itcast_school_id,
    '-1' as itcast_school_name,
    '-1' as origin_type,
    '-1' as itcast_subject_id,
    '-1' as itcast_subject_name,
    '-1' as hourinfo,
    origin_type_stat,
    clue_state_stat,
    '-1' as tdepart_id,
    '-1' as tdepart_name,
    yearinfo as time_str,
    '1' as grouptype,
    '5' as time_type,
    yearinfo,
    '-1' as monthinfo,
    '-1' as dayinfo
from itcast_dwm.itcast_intention_dwm
group by yearinfo,origin_type_stat,clue_state_stat;

-- intention count per year and month, by online/offline and new/old customer
insert into table itcast_dws.itcast_intention_dws partition(yearinfo,monthinfo,dayinfo)
select 
    count( distinct customer_id) as customer_total, 
    '-1' as area,
    '-1' as itcast_school_id,
    '-1' as itcast_school_name,
    '-1' as origin_type,
    '-1' as itcast_subject_id,
    '-1' as itcast_subject_name,
    '-1' as hourinfo,
    origin_type_stat,
    clue_state_stat,
    '-1' as tdepart_id,
    '-1' as tdepart_name,
    concat(yearinfo,'-',monthinfo) as time_str,
    '1' as grouptype,
    '4' as time_type,
    yearinfo,
    monthinfo,
    '-1' as dayinfo
from itcast_dwm.itcast_intention_dwm
group by yearinfo,monthinfo,origin_type_stat,clue_state_stat;

-- intention count per year, month and day, by online/offline and new/old customer
insert into table itcast_dws.itcast_intention_dws partition(yearinfo,monthinfo,dayinfo)
select 
    count( distinct customer_id) as customer_total, 
    '-1' as area,
    '-1' as itcast_school_id,
    '-1' as itcast_school_name,
    '-1' as origin_type,
    '-1' as itcast_subject_id,
    '-1' as itcast_subject_name,
    '-1' as hourinfo,
    origin_type_stat,
    clue_state_stat,
    '-1' as tdepart_id,
    '-1' as tdepart_name,
    concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
    '1' as grouptype,
    '2' as time_type,
    yearinfo,
    monthinfo,
    dayinfo
from itcast_dwm.itcast_intention_dwm
group by yearinfo,monthinfo,dayinfo,origin_type_stat,clue_state_stat;

-- intention count per year, month, day and hour, by online/offline and new/old customer
insert into table itcast_dws.itcast_intention_dws partition(yearinfo,monthinfo,dayinfo)
select 
    count( distinct customer_id) as customer_total, 
    '-1' as area,
    '-1' as itcast_school_id,
    '-1' as itcast_school_name,
    '-1' as origin_type,
    '-1' as itcast_subject_id,
    '-1' as itcast_subject_name,
    hourinfo,
    origin_type_stat,
    clue_state_stat,
    '-1' as tdepart_id,
    '-1' as tdepart_name,
    concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
    '1' as grouptype,
    '1' as time_type,
    yearinfo,
    monthinfo,
    dayinfo
from itcast_dwm.itcast_intention_dwm
group by yearinfo,monthinfo,dayinfo,hourinfo,origin_type_stat,clue_state_stat;

-- consulting-center dimension
-- intention count per year and consulting center, by online/offline and new/old customer
insert into table itcast_dws.itcast_intention_dws partition(yearinfo,monthinfo,dayinfo)
select 
    count( distinct customer_id) as customer_total, 
    '-1' as area,
    '-1' as itcast_school_id,
    '-1' as itcast_school_name,
    '-1' as origin_type,
    '-1' as itcast_subject_id,
    '-1' as itcast_subject_name,
    '-1' as hourinfo,
    origin_type_stat,
    clue_state_stat,
    tdepart_id,
    tdepart_name,
    yearinfo as time_str,
    '5' as grouptype,
    '5' as time_type,
    yearinfo,
    '-1' as monthinfo,
    '-1' as dayinfo
from itcast_dwm.itcast_intention_dwm
group by yearinfo,origin_type_stat,clue_state_stat,tdepart_id,tdepart_name;
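All of the DWS inserts follow one pattern: COUNT(DISTINCT customer_id) over a time level plus the two inherent flags, optionally with extra dimension columns. A minimal Python sketch of that pattern, assuming DWM rows as dicts; `rollup`, `TIME_LEVELS` and `format_time_str` are hypothetical names, the time_type and grouptype codes are taken from the inserts above, and the '-1' placeholder columns are omitted.

```python
from collections import defaultdict

# time_type code -> grouping columns, as used by the DWS inserts above
# (code 3, by week, appears in the MySQL column comment but no insert here produces it).
TIME_LEVELS = {
    "5": ("yearinfo",),
    "4": ("yearinfo", "monthinfo"),
    "2": ("yearinfo", "monthinfo", "dayinfo"),
    "1": ("yearinfo", "monthinfo", "dayinfo", "hourinfo"),
}


def format_time_str(key: dict, time_type: str) -> str:
    """Build time_str like the concat() calls: 2019, 2019-07, 2019-07-01, 2019-07-01 09."""
    parts = [key[c] for c in TIME_LEVELS[time_type]]
    date_part = "-".join(parts[:3])
    return date_part if len(parts) < 4 else f"{date_part} {parts[3]}"


def rollup(rows, time_type, extra_dims=(), grouptype="1"):
    """COUNT(DISTINCT customer_id) per time level, origin_type_stat, clue_state_stat and extra_dims."""
    group_cols = TIME_LEVELS[time_type] + ("origin_type_stat", "clue_state_stat") + tuple(extra_dims)
    groups = defaultdict(set)
    for r in rows:
        ids = groups[tuple(r[c] for c in group_cols)]  # the group exists even if all ids are NULL
        if r["customer_id"] is not None:  # COUNT(DISTINCT ...) ignores NULLs
            ids.add(r["customer_id"])
    result = []
    for values, ids in groups.items():
        key = dict(zip(group_cols, values))
        result.append({**key,
                       "customer_total": len(ids),
                       "time_str": format_time_str(key, time_type),
                       "grouptype": grouptype,
                       "time_type": time_type})
    return result
```

For example, the consulting-center insert corresponds to `rollup(rows, "5", ("tdepart_id", "tdepart_name"), grouptype="5")`. Because the counts are distinct per group, rows of different levels cannot simply be summed into one another.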

Data export

Export data from the DWS layer to MySQL
# Create the target table in MySQL
CREATE TABLE IF NOT EXISTS scrm_bi.itcast_intention (
   `customer_total` INT COMMENT '聚合意向客户数',
   `area` varchar(100) COMMENT '区域信息',
   `itcast_school_id` varchar(100) COMMENT '校区id',
   `itcast_school_name` varchar(100) COMMENT '校区名称',
   `origin_type` varchar(100) COMMENT '来源渠道',
   `itcast_subject_id` varchar(100) COMMENT '学科id',
   `itcast_subject_name` varchar(100) COMMENT '学科名称',
   `hourinfo` varchar(100) COMMENT '小时信息',
   `origin_type_stat` varchar(100) COMMENT '数据来源:0.线下;1.线上',
   `clue_state_stat` varchar(100) COMMENT '客户属性:0.老客户;1.新客户',
   `tdepart_id` varchar(100) COMMENT '创建者部门id',
   `tdepart_name` varchar(100) COMMENT '咨询中心名称',
   `time_str` varchar(100) COMMENT '时间明细',
   `groupType` varchar(100) COMMENT '产品属性类别:1.总意向量;2.区域信息;3.校区、学科组合分组;4.来源渠道;5.咨询中心;',
   `time_type` varchar(100) COMMENT '时间维度:1、按小时聚合;2、按天聚合;3、按周聚合;4、按月聚合;5、按年聚合;',
    yearinfo varchar(100) COMMENT '年' ,
    monthinfo varchar(100) COMMENT '月',
    dayinfo varchar(100) COMMENT '日'
)
comment '客户意向dws表';

# Run sqoop to export all of the data into MySQL
sqoop export \
--connect "jdbc:mysql://172.16.100.42:4000/scrm_bi?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--table itcast_intention \
--hcatalog-database itcast_dws \
--hcatalog-table itcast_intention_dws \
-m 1
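Because every granularity lands in the same MySQL table, a dashboard reading it has to pin `groupType` and `time_type`; otherwise yearly, monthly and daily rows are mixed in one result. A minimal Python sketch of that consumer-side filter, assuming exported rows as dicts keyed by the MySQL column names; the code tables are copied from the column comments, while `select_series` and `describe` are hypothetical names. Sorting by `time_str` works within one level because the year, month, day and hour parts are zero-padded.

```python
from typing import Dict, List

# Codes from the column comments of scrm_bi.itcast_intention.
GROUP_TYPES: Dict[str, str] = {
    "1": "total intention", "2": "area", "3": "school + subject",
    "4": "source channel", "5": "consulting center",
}
TIME_TYPES: Dict[str, str] = {"1": "hour", "2": "day", "3": "week", "4": "month", "5": "year"}


def select_series(rows: List[dict], group_type: str, time_type: str) -> List[dict]:
    """Keep a single groupType/time_type combination, ordered by time_str."""
    picked = [r for r in rows if r["groupType"] == group_type and r["time_type"] == time_type]
    return sorted(picked, key=lambda r: r["time_str"])


def describe(row: dict) -> str:
    """Human-readable label for one exported row."""
    return (f"{GROUP_TYPES[row['groupType']]} by {TIME_TYPES[row['time_type']]}, "
            f"{row['time_str']}: {row['customer_total']}")
```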

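Incremental load

customer_relationship_update is the incremental table. It must be rebuilt every time it is used, to avoid problems caused by duplicated data.
-- customer_relationship_update incremental table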
DROP TABLE IF EXISTS itcast_ods.customer_relationship_update;
CREATE TABLE IF NOT EXISTS itcast_ods.customer_relationship_update (
  id int COMMENT '客户关系id',
  create_date_time STRING COMMENT '创建时间',
  update_date_time STRING COMMENT '最后更新时间',
  deleted int COMMENT '是否被删除(禁用)',
  customer_id int COMMENT '所属客户id',
  first_id int COMMENT '第一条客户关系id',
  belonger int COMMENT '归属人',
  belonger_name STRING COMMENT '归属人姓名',
  initial_belonger int COMMENT '初始归属人',
  distribution_handler int COMMENT '分配处理人',
  business_scrm_department_id int COMMENT '归属部门',
  last_visit_time STRING COMMENT '最后回访时间',
  next_visit_time STRING COMMENT '下次回访时间',
  origin_type STRING COMMENT '数据来源',
  itcast_school_id int COMMENT '校区Id',
  itcast_subject_id int COMMENT '学科Id',
  intention_study_type STRING COMMENT '意向学习方式',
  anticipat_signup_date STRING COMMENT '预计报名时间',
  level STRING COMMENT '客户级别',
  creator int COMMENT '创建人',
  current_creator int COMMENT '当前创建人:初始==创建人,当在公海拉回时为 拉回人',
  creator_name STRING COMMENT '创建者姓名',
  origin_channel STRING COMMENT '来源渠道',
  comment STRING COMMENT '备注',
  first_customer_clue_id int COMMENT '第一条线索id',
  last_customer_clue_id int COMMENT '最后一条线索id',
  process_state STRING COMMENT '处理状态',
  process_time STRING COMMENT '处理状态变动时间',
  payment_state STRING COMMENT '支付状态',
  payment_time STRING COMMENT '支付状态变动时间',
  signup_state STRING COMMENT '报名状态',
  signup_time STRING COMMENT '报名时间',
  notice_state STRING COMMENT '通知状态',
  notice_time STRING COMMENT '通知状态变动时间',
  lock_state STRING COMMENT '锁定状态',
  lock_time STRING COMMENT '锁定状态修改时间',
  itcast_clazz_id int COMMENT '所属ems班级id',
  itcast_clazz_time STRING COMMENT '报班时间',
  payment_url STRING COMMENT '付款链接',
  payment_url_time STRING COMMENT '支付链接生成时间',
  ems_student_id int COMMENT 'ems的学生id',
  delete_reason STRING COMMENT '删除原因',
  deleter int COMMENT '删除人',
  deleter_name STRING COMMENT '删除人姓名',
  delete_time STRING COMMENT '删除时间',
  course_id int COMMENT '课程ID',
  course_name STRING COMMENT '课程名称',
  delete_comment STRING COMMENT '删除原因说明',
  close_state STRING COMMENT '关闭装填',
  close_time STRING COMMENT '关闭状态变动时间',
  appeal_id int COMMENT '申诉id',
  tenant int COMMENT '租户',
  total_fee DECIMAL COMMENT '报名费总金额',
  belonged int COMMENT '小周期归属人',
  belonged_time STRING COMMENT '归属时间',
  belonger_time STRING COMMENT '归属时间',
  transfer int COMMENT '转移人',
  transfer_time STRING COMMENT '转移时间',
  follow_type int COMMENT '分配类型,0-自动分配,1-手动分配,2-自动转移,3-手动单个转移,4-手动批量转移,5-公海领取',
  transfer_bxg_oa_account STRING COMMENT '转移到博学谷归属人OA账号',
  transfer_bxg_belonger_name STRING COMMENT '转移到博学谷归属人OA姓名',
  end_time STRING COMMENT '有效时间')
comment '客户关系表'
PARTITIONED BY(start_time STRING)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='ZLIB');
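# Extract yesterday's new and updated rows (a logical delete also counts as an update).
# Because the incremental extraction runs T+1, the SQL needs a WHERE clause that selects only yesterday's rows (new and updated), not the whole table.
# New rows have create_date_time = yesterday; updated rows have update_date_time = yesterday.
# Note that an updated row may have been created earlier, so its create_date_time is not necessarily yesterday. The business limits updates to 30 days after creation, so a row changed yesterday has a create_date_time no earlier than 30 days before yesterday, while its update_date_time is yesterday.
# The condition must therefore check both the creation date and the update date, so that every row added or changed yesterday is extracted into the warehouse.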

sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/nev \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--query '
select id,
       create_date_time,
       update_date_time,
       deleted,
       customer_id,
       first_id,
       belonger,
       belonger_name,
       initial_belonger,
       distribution_handler,
       business_scrm_department_id,
       last_visit_time,
       next_visit_time,
       origin_type,
       itcast_school_id,
       itcast_subject_id,
       intention_study_type,
       anticipat_signup_date,
       level,
       creator,
       current_creator,
       creator_name,
       origin_channel,
       comment,
       first_customer_clue_id,
       last_customer_clue_id,
       process_state,
       process_time,
       payment_state,
       payment_time,
       signup_state,
       signup_time,
       notice_state,
       notice_time,
       lock_state,
       lock_time,
       itcast_clazz_id,
       itcast_clazz_time,
       payment_url,
       payment_url_time,
       ems_student_id,
       delete_reason,
       deleter,
       deleter_name,
       delete_time,
       course_id,
       course_name,
       delete_comment,
       close_state,
       close_time,
       appeal_id,
       tenant,
       total_fee,
       belonged,
       belonged_time,
       belonger_time,
       transfer,
       transfer_time,
       follow_type,
       transfer_bxg_oa_account,
       transfer_bxg_belonger_name,
       FROM_UNIXTIME(unix_timestamp(), "%Y-%m-%d") as start_time,
       date_format("9999-12-31", "%Y-%m-%d")       as end_time
from customer_relationship
where
  (
    (
      create_date_time >= "2011-12-04 00:00:00"
      and
      create_date_time < "2011-12-05 00:00:00"
    )
    or
    (
      update_date_time >= "2011-12-04 00:00:00"
      and
      update_date_time < "2011-12-05 00:00:00"
    )
  )
  and $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table customer_relationship_update \
--hive-partition-key start_time \
--hive-partition-value  2020-07-15 \
-m 2 \
--split-by id
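The extraction condition is a half-open one-day window on either timestamp. A minimal Python sketch of that predicate, assuming timestamps are `yyyy-MM-dd HH:mm:ss` strings (which compare correctly as text); the function names are hypothetical. The last function shows why the OR has to be parenthesized before `$CONDITIONS` is ANDed on: Sqoop replaces `$CONDITIONS` with each mapper's `--split-by id` range, and because AND binds tighter than OR, an unparenthesized predicate lets rows matched by the first branch pass in every mapper, so with `-m 2` they are imported twice.

```python
from datetime import date, datetime, timedelta
from typing import Optional, Tuple

FMT = "%Y-%m-%d %H:%M:%S"


def t_plus_1_window(run_date: date) -> Tuple[str, str]:
    """Half-open [start, end) bounds covering the day before run_date."""
    end = datetime.combine(run_date, datetime.min.time())
    return (end - timedelta(days=1)).strftime(FMT), end.strftime(FMT)


def changed_yesterday(create_ts: str, update_ts: Optional[str], run_date: date) -> bool:
    """True if the row was created or updated inside yesterday's window."""
    lo, hi = t_plus_1_window(run_date)
    created = lo <= create_ts < hi
    updated = update_ts is not None and lo <= update_ts < hi
    return created or updated


def imported_by_mapper(created: bool, updated: bool, in_split: bool,
                       parenthesized: bool = True) -> bool:
    """Whether one mapper imports a row; in_split stands for that mapper's $CONDITIONS range."""
    if parenthesized:
        return (created or updated) and in_split
    return created or updated and in_split  # parsed as: created or (updated and in_split)
```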
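-- Rebuild the customer_relationship_tmp temporary table.
-- The tmp table must be rebuilt every time it is used, to avoid problems caused by duplicated data.

DROP TABLE IF EXISTS itcast_ods.`customer_relationship_tmp`;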
CREATE TABLE IF NOT EXISTS itcast_ods.`customer_relationship_tmp` (
  `id` int COMMENT '客户关系id',
  `create_date_time` STRING COMMENT '创建时间',
  `update_date_time` STRING COMMENT '最后更新时间',
  `deleted` int COMMENT '是否被删除(禁用)',
  `customer_id` int COMMENT '所属客户id',
  `first_id` int COMMENT '第一条客户关系id',
  `belonger` int COMMENT '归属人',
  `belonger_name` STRING COMMENT '归属人姓名',
  `initial_belonger` int COMMENT '初始归属人',
  `distribution_handler` int COMMENT '分配处理人',
  `business_scrm_department_id` int COMMENT '归属部门',
  `last_visit_time` STRING COMMENT '最后回访时间',
  `next_visit_time` STRING COMMENT '下次回访时间',
  `origin_type` STRING COMMENT '数据来源',
  `itcast_school_id` int COMMENT '校区Id',
  `itcast_subject_id` int COMMENT '学科Id',
  `intention_study_type` STRING COMMENT '意向学习方式',
  `anticipat_signup_date` STRING COMMENT '预计报名时间',
  `level` STRING COMMENT '客户级别',
  `creator` int COMMENT '创建人',
  `current_creator` int COMMENT '当前创建人:初始==创建人,当在公海拉回时为 拉回人',
  `creator_name` STRING COMMENT '创建者姓名',
  `origin_channel` STRING COMMENT '来源渠道',
  `comment` STRING COMMENT '备注',
  `first_customer_clue_id` int COMMENT '第一条线索id',
  `last_customer_clue_id` int COMMENT '最后一条线索id',
  `process_state` STRING COMMENT '处理状态',
  `process_time` STRING COMMENT '处理状态变动时间',
  `payment_state` STRING COMMENT '支付状态',
  `payment_time` STRING COMMENT '支付状态变动时间',
  `signup_state` STRING COMMENT '报名状态',
  `signup_time` STRING COMMENT '报名时间',
  `notice_state` STRING COMMENT '通知状态',
  `notice_time` STRING COMMENT '通知状态变动时间',
  `lock_state` STRING COMMENT '锁定状态',
  `lock_time` STRING COMMENT '锁定状态修改时间',
  `itcast_clazz_id` int COMMENT '所属ems班级id',
  `itcast_clazz_time` STRING COMMENT '报班时间',
  `payment_url` STRING COMMENT '付款链接',
  `payment_url_time` STRING COMMENT '支付链接生成时间',
  `ems_student_id` int COMMENT 'ems的学生id',
  `delete_reason` STRING COMMENT '删除原因',
  `deleter` int COMMENT '删除人',
  `deleter_name` STRING COMMENT '删除人姓名',
  `delete_time` STRING COMMENT '删除时间',
  `course_id` int COMMENT '课程ID',
  `course_name` STRING COMMENT '课程名称',
  `delete_comment` STRING COMMENT '删除原因说明',
  `close_state` STRING COMMENT '关闭装填',
  `close_time` STRING COMMENT '关闭状态变动时间',
  `appeal_id` int COMMENT '申诉id',
  `tenant` int COMMENT '租户',
  `total_fee` DECIMAL COMMENT '报名费总金额',
  `belonged` int COMMENT '小周期归属人',
  `belonged_time` STRING COMMENT '归属时间',
  `belonger_time` STRING COMMENT '归属时间',
  `transfer` int COMMENT '转移人',
  `transfer_time` STRING COMMENT '转移时间',
  `follow_type` int COMMENT '分配类型,0-自动分配,1-手动分配,2-自动转移,3-手动单个转移,4-手动批量转移,5-公海领取',
  `transfer_bxg_oa_account` STRING COMMENT '转移到博学谷归属人OA账号',
  `transfer_bxg_belonger_name` STRING COMMENT '转移到博学谷归属人OA姓名',
  `end_time` STRING COMMENT '有效截止时间')
comment '客户关系表'
PARTITIONED BY(start_time STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='ZLIB');

-- Merge the incremental data with the history (per the requirements, only data from the last 30 days is updated)
insert overwrite table itcast_ods.customer_relationship_tmp partition (start_time)
select * from
    (
--      1. Rows from the update table (new versions)
        select
          id,
          create_date_time,
          update_date_time,
          deleted,
          customer_id,
          first_id,
          belonger,
          belonger_name,
          initial_belonger,
          distribution_handler,
          business_scrm_department_id,
          last_visit_time,
          next_visit_time,
          origin_type,
          itcast_school_id,
          itcast_subject_id,
          intention_study_type,
          anticipat_signup_date,
          level,
          creator,
          current_creator,
          creator_name,
          origin_channel,
          comment,
          first_customer_clue_id,
          last_customer_clue_id,
          process_state,
          process_time,
          payment_state,
          payment_time,
          signup_state,
          signup_time,
          notice_state,
          notice_time,
          lock_state,
          lock_time,
          itcast_clazz_id,
          itcast_clazz_time,
          payment_url,
          payment_url_time,
          ems_student_id,
          delete_reason,
          deleter,
          deleter_name,
          delete_time,
          course_id,
          course_name,
          delete_comment,
          close_state,
          close_time,
          appeal_id,
          tenant,
          total_fee,
          belonged,
          belonged_time,
          belonger_time,
          transfer,
          transfer_time,
          follow_type,
          transfer_bxg_oa_account,
          transfer_bxg_belonger_name,
          '9999-12-31' end_time,
          '2020-07-15' as start_time
        from itcast_ods.customer_relationship_update where start_time='2020-07-15'

        union all

--      2. Existing zipper-table rows, with end_time adjusted according to the update table
        select
          rs.id,
          rs.create_date_time,
          rs.update_date_time,
          rs.deleted,
          rs.customer_id,
          rs.first_id,
          rs.belonger,
          rs.belonger_name,
          rs.initial_belonger,
          rs.distribution_handler,
          rs.business_scrm_department_id,
          rs.last_visit_time,
          rs.next_visit_time,
          rs.origin_type,
          rs.itcast_school_id,
          rs.itcast_subject_id,
          rs.intention_study_type,
          rs.anticipat_signup_date,
          rs.level,
          rs.creator,
          rs.current_creator,
          rs.creator_name,
          rs.origin_channel,
          rs.comment,
          rs.first_customer_clue_id,
          rs.last_customer_clue_id,
          rs.process_state,
          rs.process_time,
          rs.payment_state,
          rs.payment_time,
          rs.signup_state,
          rs.signup_time,
          rs.notice_state,
          rs.notice_time,
          rs.lock_state,
          rs.lock_time,
          rs.itcast_clazz_id,
          rs.itcast_clazz_time,
          rs.payment_url,
          rs.payment_url_time,
          rs.ems_student_id,
          rs.delete_reason,
          rs.deleter,
          rs.deleter_name,
          rs.delete_time,
          rs.course_id,
          rs.course_name,
          rs.delete_comment,
          rs.close_state,
          rs.close_time,
          rs.appeal_id,
          rs.tenant,
          rs.total_fee,
          rs.belonged,
          rs.belonged_time,
          rs.belonger_time,
          rs.transfer,
          rs.transfer_time,
          rs.follow_type,
          rs.transfer_bxg_oa_account,
          rs.transfer_bxg_belonger_name,
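          -- 3. Update end_time: if no changed row matched, or the row is already an expired history version, keep its original end_time; otherwise set end_time to the day before the new version's start_time (the old version is valid only until then)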
          if(up.id is null or rs.end_time<'9999-12-31', rs.end_time, date_add(up.start_time,-1)) end_time,
          rs.start_time
        from itcast_ods.customer_relationship rs left join
            (
                select
                *
                from itcast_ods.customer_relationship_update
                where start_time='2020-07-15'
            ) up
        on rs.id=up.id
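        -- 4. Time window: only history rows from the last 30 days can change; the result overwrites the partitions those rows belong to.
        --    Filter on the literal load date: after the LEFT JOIN, up.start_time is NULL for unmatched rows, so filtering on it would drop them.
        where rs.start_time >= date_add('2020-07-15', -30)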
    )his
order by his.id, start_time;

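-- Overwrite the zipper table with the temporary table.
-- With partitions, INSERT OVERWRITE only replaces the partitions present in the result, so the previous step does not need to read all of the history: reading the last 30 days is enough, and partitions older than 30 days are left untouched.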
INSERT OVERWRITE TABLE itcast_ods.customer_relationship partition (start_time) 
SELECT * from itcast_ods.customer_relationship_tmp;
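The merge above is a slowly-changing-dimension (type 2) "zipper" update. A minimal in-memory Python sketch of the same rules, assuming rows are dicts with `id`, `start_time` and `end_time` as `yyyy-MM-dd` strings and that `load_date` is the partition value of the update table; `merge_zipper` is a hypothetical name. It returns the new versions plus the history rows inside the 30-day window, which corresponds to the partitions INSERT OVERWRITE rewrites.

```python
from datetime import date, timedelta
from typing import List

OPEN_END = "9999-12-31"  # end_time of the currently valid version


def merge_zipper(history: List[dict], updates: List[dict], load_date: str,
                 window_days: int = 30) -> List[dict]:
    """Merge one day's changed rows into an SCD2 (zipper) table."""
    load = date.fromisoformat(load_date)
    window_start = (load - timedelta(days=window_days)).isoformat()
    close_date = (load - timedelta(days=1)).isoformat()  # date_add(up.start_time, -1)
    changed_ids = {u["id"] for u in updates}

    # Part 1: every changed row becomes a new, currently valid version.
    merged = [dict(u, start_time=load_date, end_time=OPEN_END) for u in updates]

    # Part 2: history rows in the window; close the current version of changed ids.
    for row in history:
        if row["start_time"] < window_start:
            continue  # older partitions are not rewritten, so they are left out here
        row = dict(row)  # copy so the caller's history is not modified
        if row["id"] in changed_ids and row["end_time"] == OPEN_END:
            row["end_time"] = close_date
        merged.append(row)

    # Same ordering as "order by his.id, start_time".
    return sorted(merged, key=lambda r: (r["id"], r["start_time"]))
```

Already-closed versions keep their original end_time, which is what the `rs.end_time<'9999-12-31'` branch of the if() protects.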
-- Test
-- After running the whole flow, check whether the matching rows in the zipper table have changed:
SELECT * from itcast_ods.customer_relationship
WHERE create_date_time BETWEEN "2011-12-04 00:00:00" and "2011-12-05 00:00:00";
# Oozie script
#! /bin/bash
HIVE_HOME=/usr/bin/hive
if [[ $1 == "" ]];
then
    TD_DATE=$(date -d '1 days ago' "+%Y-%m-%d")
else
    TD_DATE=$1
fi

# Check that TD_DATE has the yyyy-MM-dd format
if [[ $TD_DATE =~ ^[0-9]{4}-[0-9]{2}-[0-9]{2}$ ]];then
    echo "TD_DATE is $TD_DATE"
else
    echo "TD_DATE is not correct"
    exit 1
fi

output=$(${HIVE_HOME} -S -e "
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
DROP TABLE IF EXISTS itcast_ods.customer_relationship_update;
CREATE TABLE IF NOT EXISTS itcast_ods.customer_relationship_update (
  id int COMMENT '客户关系id',
  create_date_time STRING COMMENT '创建时间',
  update_date_time STRING COMMENT '最后更新时间',
  deleted int COMMENT '是否被删除(禁用)',
  customer_id int COMMENT '所属客户id',
  first_id int COMMENT '第一条客户关系id',
  belonger int COMMENT '归属人',
  belonger_name STRING COMMENT '归属人姓名',
  initial_belonger int COMMENT '初始归属人',
  distribution_handler int COMMENT '分配处理人',
  business_scrm_department_id int COMMENT '归属部门',
  last_visit_time STRING COMMENT '最后回访时间',
  next_visit_time STRING COMMENT '下次回访时间',
  origin_type STRING COMMENT '数据来源',
  itcast_school_id int COMMENT '校区Id',
  itcast_subject_id int COMMENT '学科Id',
  intention_study_type STRING COMMENT '意向学习方式',
  anticipat_signup_date STRING COMMENT '预计报名时间',
  level STRING COMMENT '客户级别',
  creator int COMMENT '创建人',
  current_creator int COMMENT '当前创建人:初始==创建人,当在公海拉回时为 拉回人',
  creator_name STRING COMMENT '创建者姓名',
  origin_channel STRING COMMENT '来源渠道',
  comment STRING COMMENT '备注',
  first_customer_clue_id int COMMENT '第一条线索id',
  last_customer_clue_id int COMMENT '最后一条线索id',
  process_state STRING COMMENT '处理状态',
  process_time STRING COMMENT '处理状态变动时间',
  payment_state STRING COMMENT '支付状态',
  payment_time STRING COMMENT '支付状态变动时间',
  signup_state STRING COMMENT '报名状态',
  signup_time STRING COMMENT '报名时间',
  notice_state STRING COMMENT '通知状态',
  notice_time STRING COMMENT '通知状态变动时间',
  lock_state STRING COMMENT '锁定状态',
  lock_time STRING COMMENT '锁定状态修改时间',
  itcast_clazz_id int COMMENT '所属ems班级id',
  itcast_clazz_time STRING COMMENT '报班时间',
  payment_url STRING COMMENT '付款链接',
  payment_url_time STRING COMMENT '支付链接生成时间',
  ems_student_id int COMMENT 'ems的学生id',
  delete_reason STRING COMMENT '删除原因',
  deleter int COMMENT '删除人',
  deleter_name STRING COMMENT '删除人姓名',
  delete_time STRING COMMENT '删除时间',
  course_id int COMMENT '课程ID',
  course_name STRING COMMENT '课程名称',
  delete_comment STRING COMMENT '删除原因说明',
  close_state STRING COMMENT '关闭装填',
  close_time STRING COMMENT '关闭状态变动时间',
  appeal_id int COMMENT '申诉id',
  tenant int COMMENT '租户',
  total_fee DECIMAL COMMENT '报名费总金额',
  belonged int COMMENT '小周期归属人',
  belonged_time STRING COMMENT '归属时间',
  belonger_time STRING COMMENT '归属时间',
  transfer int COMMENT '转移人',
  transfer_time STRING COMMENT '转移时间',
  follow_type int COMMENT '分配类型,0-自动分配,1-手动分配,2-自动转移,3-手动单个转移,4-手动批量转移,5-公海领取',
  transfer_bxg_oa_account STRING COMMENT '转移到博学谷归属人OA账号',
  transfer_bxg_belonger_name STRING COMMENT '转移到博学谷归属人OA姓名',
  end_time STRING COMMENT '有效时间')
comment '客户关系表'
PARTITIONED BY(start_time STRING)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='ZLIB');
")

SQOOP_HOME=/usr/bin/sqoop
output=$(${SQOOP_HOME} import \
--connect jdbc:mysql://172.16.100.42:4000/nev \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--query 'select id,
       create_date_time,
       update_date_time,
       deleted,
       customer_id,
       first_id,
       belonger,
       belonger_name,
       initial_belonger,
       distribution_handler,
       business_scrm_department_id,
       last_visit_time,
       next_visit_time,
       origin_type,
       itcast_school_id,
       itcast_subject_id,
       intention_study_type,
       anticipat_signup_date,
       level,
       creator,
       current_creator,
       creator_name,
       origin_channel,
       comment,
       first_customer_clue_id,
       last_customer_clue_id,
       process_state,
       process_time,
       payment_state,
       payment_time,
       signup_state,
       signup_time,
       notice_state,
       notice_time,
       lock_state,
       lock_time,
       itcast_clazz_id,
       itcast_clazz_time,
       payment_url,
       payment_url_time,
       ems_student_id,
       delete_reason,
       deleter,
       deleter_name,
       delete_time,
       course_id,
       course_name,
       delete_comment,
       close_state,
       close_time,
       appeal_id,
       tenant,
       total_fee,
       belonged,
       belonged_time,
       belonger_time,
       transfer,
       transfer_time,
       follow_type,
       transfer_bxg_oa_account,
       transfer_bxg_belonger_name,
       FROM_UNIXTIME(unix_timestamp(), "%Y-%m-%d") as start_time,
       date_format("9999-12-31", "%Y-%m-%d")       as end_time
from customer_relationship
where
  (
    (
      create_date_time >= FROM_UNIXTIME(UNIX_TIMESTAMP(CAST(SYSDATE()AS DATE) - INTERVAL 1 DAY),"%Y-%m-%d %H:%i:%s")
      and
      create_date_time < FROM_UNIXTIME(UNIX_TIMESTAMP(CAST(SYSDATE()AS DATE)),"%Y-%m-%d %H:%i:%s")
    )
    or
    (
      update_date_time >= FROM_UNIXTIME(UNIX_TIMESTAMP(CAST(SYSDATE()AS DATE) - INTERVAL 1 DAY),"%Y-%m-%d %H:%i:%s")
      and
      update_date_time < FROM_UNIXTIME(UNIX_TIMESTAMP(CAST(SYSDATE()AS DATE)),"%Y-%m-%d %H:%i:%s")
    )
  ) and $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table customer_relationship_update \
--hive-partition-key start_time \
--hive-partition-value ${TD_DATE} \
-m 2 \
--split-by id)

output=$(${HIVE_HOME} -S -e "
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

DROP TABLE IF EXISTS itcast_ods.customer_clue_tmp;
CREATE TABLE IF NOT EXISTS itcast_ods.customer_clue_tmp (
  id int COMMENT 'customer_clue_id',
  create_date_time STRING COMMENT '创建时间',
  update_date_time STRING COMMENT '最后更新时间',
  deleted STRING COMMENT '是否被删除(禁用)',
  customer_id int COMMENT '客户id',
  customer_relationship_id int COMMENT '客户关系id',
  session_id STRING COMMENT '七陌会话id',
  sid STRING COMMENT '访客id',
  status STRING COMMENT '状态(undeal待领取 deal 已领取 finish 已关闭 changePeer 已流转)',
  users STRING COMMENT '所属坐席',
  create_time STRING COMMENT '七陌创建时间',
  platform STRING COMMENT '平台来源 (pc-网站咨询|wap-wap咨询|sdk-app咨询|weixin-微信咨询)',
  s_name STRING COMMENT '用户名称',
  seo_source STRING COMMENT '搜索来源',
  seo_keywords STRING COMMENT '关键字',
  ip STRING COMMENT 'IP地址',
  referrer STRING COMMENT '上级来源页面',
  from_url STRING COMMENT '会话来源页面',
  landing_page_url STRING COMMENT '访客着陆页面',
  url_title STRING COMMENT '咨询页面title',
  to_peer STRING COMMENT '所属技能组',
  manual_time STRING COMMENT '人工开始时间',
  begin_time STRING COMMENT '坐席领取时间 ',
  reply_msg_count int COMMENT '客服回复消息数',
  total_msg_count int COMMENT '消息总数',
  msg_count int COMMENT '客户发送消息数',
  comment STRING COMMENT '备注',
  finish_reason STRING COMMENT '结束类型',
  finish_user STRING COMMENT '结束坐席',
  end_time STRING COMMENT '会话结束时间',
  platform_description STRING COMMENT '客户平台信息',
  browser_name STRING COMMENT '浏览器名称',
  os_info STRING COMMENT '系统名称',
  area STRING COMMENT '区域',
  country STRING COMMENT '所在国家',
  province STRING COMMENT '省',
  city STRING COMMENT '城市',
  creator int COMMENT '创建人',
  name STRING COMMENT '客户姓名',
  idcard STRING COMMENT '身份证号',
  phone STRING COMMENT '手机号',
  itcast_school_id int COMMENT '校区Id',
  itcast_school STRING COMMENT '校区',
  itcast_subject_id int COMMENT '学科Id',
  itcast_subject STRING COMMENT '学科',
  wechat STRING COMMENT '微信',
  qq STRING COMMENT 'qq号',
  email STRING COMMENT '邮箱',
  gender STRING COMMENT '性别',
  level STRING COMMENT '客户级别',
  origin_type STRING COMMENT '数据来源渠道',
  information_way STRING COMMENT '资讯方式',
  working_years STRING COMMENT '开始工作时间',
  technical_directions STRING COMMENT '技术方向',
  customer_state STRING COMMENT '当前客户状态',
  valid STRING COMMENT '该线索是否是网资有效线索',
  anticipat_signup_date STRING COMMENT '预计报名时间',
  clue_state STRING COMMENT '线索状态',
  scrm_department_id int COMMENT 'SCRM内部部门id',
  superior_url STRING COMMENT '诸葛获取上级页面URL',
  superior_source STRING COMMENT '诸葛获取上级页面URL标题',
  landing_url STRING COMMENT '诸葛获取着陆页面URL',
  landing_source STRING COMMENT '诸葛获取着陆页面URL来源',
  info_url STRING COMMENT '诸葛获取留咨页URL',
  info_source STRING COMMENT '诸葛获取留咨页URL标题',
  origin_channel STRING COMMENT '投放渠道',
  course_id int COMMENT '课程编号',
  course_name STRING COMMENT '课程名称',
  zhuge_session_id STRING COMMENT 'zhuge会话id',
  is_repeat int COMMENT '是否重复线索(手机号维度) 0:正常 1:重复',
  tenant int COMMENT '租户id',
  activity_id STRING COMMENT '活动id',
  activity_name STRING COMMENT '活动名称',
  follow_type int COMMENT '分配类型,0-自动分配,1-手动分配,2-自动转移,3-手动单个转移,4-手动批量转移,5-公海领取',
  shunt_mode_id int COMMENT '匹配到的技能组id',
  shunt_employee_group_id int COMMENT '所属分流员工组',
  ends_time STRING COMMENT '有效时间')
comment '客户关系表'
PARTITIONED BY(starts_time STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='ZLIB');

insert overwrite table itcast_ods.customer_relationship_tmp partition (start_time)
select * from
    (
        select
          id,
          create_date_time,
          update_date_time,
          deleted,
          customer_id,
          first_id,
          belonger,
          belonger_name,
          initial_belonger,
          distribution_handler,
          business_scrm_department_id,
          last_visit_time,
          next_visit_time,
          origin_type,
          itcast_school_id,
          itcast_subject_id,
          intention_study_type,
          anticipat_signup_date,
          level,
          creator,
          current_creator,
          creator_name,
          origin_channel,
          comment,
          first_customer_clue_id,
          last_customer_clue_id,
          process_state,
          process_time,
          payment_state,
          payment_time,
          signup_state,
          signup_time,
          notice_state,
          notice_time,
          lock_state,
          lock_time,
          itcast_clazz_id,
          itcast_clazz_time,
          payment_url,
          payment_url_time,
          ems_student_id,
          delete_reason,
          deleter,
          deleter_name,
          delete_time,
          course_id,
          course_name,
          delete_comment,
          close_state,
          close_time,
          appeal_id,
          tenant,
          total_fee,
          belonged,
          belonged_time,
          belonger_time,
          transfer,
          transfer_time,
          follow_type,
          transfer_bxg_oa_account,
          transfer_bxg_belonger_name,
          '9999-12-31' end_time,
          FROM_UNIXTIME(unix_timestamp(), "%Y-%m-%d") as start_time
        from itcast_ods.customer_relationship_update where start_time=FROM_UNIXTIME(unix_timestamp(), "%Y-%m-%d")

        union all
        select
          rs.id,
          rs.create_date_time,
          rs.update_date_time,
          rs.deleted,
          rs.customer_id,
          rs.first_id,
          rs.belonger,
          rs.belonger_name,
          rs.initial_belonger,
          rs.distribution_handler,
          rs.business_scrm_department_id,
          rs.last_visit_time,
          rs.next_visit_time,
          rs.origin_type,
          rs.itcast_school_id,
          rs.itcast_subject_id,
          rs.intention_study_type,
          rs.anticipat_signup_date,
          rs.level,
          rs.creator,
          rs.current_creator,
          rs.creator_name,
          rs.origin_channel,
          rs.comment,
          rs.first_customer_clue_id,
          rs.last_customer_clue_id,
          rs.process_state,
          rs.process_time,
          rs.payment_state,
          rs.payment_time,
          rs.signup_state,
          rs.signup_time,
          rs.notice_state,
          rs.notice_time,
          rs.lock_state,
          rs.lock_time,
          rs.itcast_clazz_id,
          rs.itcast_clazz_time,
          rs.payment_url,
          rs.payment_url_time,
          rs.ems_student_id,
          rs.delete_reason,
          rs.deleter,
          rs.deleter_name,
          rs.delete_time,
          rs.course_id,
          rs.course_name,
          rs.delete_comment,
          rs.close_state,
          rs.close_time,
          rs.appeal_id,
          rs.tenant,
          rs.total_fee,
          rs.belonged,
          rs.belonged_time,
          rs.belonger_time,
          rs.transfer,
          rs.transfer_time,
          rs.follow_type,
          rs.transfer_bxg_oa_account,
          rs.transfer_bxg_belonger_name,
          if(up.id is null or rs.end_time < '9999-12-31', rs.end_time, date_add(up.start_time,-1)) end_time,
          rs.start_time
        from itcast_ods.customer_relationship rs left join
            (
                select
                *
                from itcast_ods.customer_relationship_update
                where start_time=from_unixtime(unix_timestamp(), 'yyyy-MM-dd')
            ) up
        -- Keep closed versions too: the final INSERT OVERWRITE into customer_relationship replaces every partition it writes
        on rs.id=up.id where rs.start_time >= date_sub(FROM_UNIXTIME(UNIX_TIMESTAMP()),30)
    )his
order by his.id, start_time;

INSERT OVERWRITE TABLE itcast_ods.customer_relationship partition (start_time) 
SELECT * from itcast_ods.customer_relationship_tmp;
")
-- customer_clue (clue table)
-- Rebuild the customer_clue_update staging table
DROP TABLE itcast_ods.customer_clue_update;
CREATE TABLE IF NOT EXISTS itcast_ods.customer_clue_update (
  id int COMMENT 'customer_clue_id',
  create_date_time STRING COMMENT '创建时间',
  update_date_time STRING COMMENT '最后更新时间',
  deleted STRING COMMENT '是否被删除(禁用)',
  customer_id int COMMENT '客户id',
  customer_relationship_id int COMMENT '客户关系id',
  session_id STRING COMMENT '七陌会话id',
  sid STRING COMMENT '访客id',
  status STRING COMMENT '状态(undeal待领取 deal 已领取 finish 已关闭 changePeer 已流转)',
  users STRING COMMENT '所属坐席',
  create_time STRING COMMENT '七陌创建时间',
  platform STRING COMMENT '平台来源 (pc-网站咨询|wap-wap咨询|sdk-app咨询|weixin-微信咨询)',
  s_name STRING COMMENT '用户名称',
  seo_source STRING COMMENT '搜索来源',
  seo_keywords STRING COMMENT '关键字',
  ip STRING COMMENT 'IP地址',
  referrer STRING COMMENT '上级来源页面',
  from_url STRING COMMENT '会话来源页面',
  landing_page_url STRING COMMENT '访客着陆页面',
  url_title STRING COMMENT '咨询页面title',
  to_peer STRING COMMENT '所属技能组',
  manual_time STRING COMMENT '人工开始时间',
  begin_time STRING COMMENT '坐席领取时间 ',
  reply_msg_count int COMMENT '客服回复消息数',
  total_msg_count int COMMENT '消息总数',
  msg_count int COMMENT '客户发送消息数',
  comment STRING COMMENT '备注',
  finish_reason STRING COMMENT '结束类型',
  finish_user STRING COMMENT '结束坐席',
  end_time STRING COMMENT '会话结束时间',
  platform_description STRING COMMENT '客户平台信息',
  browser_name STRING COMMENT '浏览器名称',
  os_info STRING COMMENT '系统名称',
  area STRING COMMENT '区域',
  country STRING COMMENT '所在国家',
  province STRING COMMENT '省',
  city STRING COMMENT '城市',
  creator int COMMENT '创建人',
  name STRING COMMENT '客户姓名',
  idcard STRING COMMENT '身份证号',
  phone STRING COMMENT '手机号',
  itcast_school_id int COMMENT '校区Id',
  itcast_school STRING COMMENT '校区',
  itcast_subject_id int COMMENT '学科Id',
  itcast_subject STRING COMMENT '学科',
  wechat STRING COMMENT '微信',
  qq STRING COMMENT 'qq号',
  email STRING COMMENT '邮箱',
  gender STRING COMMENT '性别',
  level STRING COMMENT '客户级别',
  origin_type STRING COMMENT '数据来源渠道',
  information_way STRING COMMENT '资讯方式',
  working_years STRING COMMENT '开始工作时间',
  technical_directions STRING COMMENT '技术方向',
  customer_state STRING COMMENT '当前客户状态',
  valid STRING COMMENT '该线索是否是网资有效线索',
  anticipat_signup_date STRING COMMENT '预计报名时间',
  clue_state STRING COMMENT '线索状态',
  scrm_department_id int COMMENT 'SCRM内部部门id',
  superior_url STRING COMMENT '诸葛获取上级页面URL',
  superior_source STRING COMMENT '诸葛获取上级页面URL标题',
  landing_url STRING COMMENT '诸葛获取着陆页面URL',
  landing_source STRING COMMENT '诸葛获取着陆页面URL来源',
  info_url STRING COMMENT '诸葛获取留咨页URL',
  info_source STRING COMMENT '诸葛获取留咨页URL标题',
  origin_channel STRING COMMENT '投放渠道',
  course_id int COMMENT '课程编号',
  course_name STRING COMMENT '课程名称',
  zhuge_session_id STRING COMMENT 'zhuge会话id',
  is_repeat int COMMENT '是否重复线索(手机号维度) 0:正常 1:重复',
  tenant int COMMENT '租户id',
  activity_id STRING COMMENT '活动id',
  activity_name STRING COMMENT '活动名称',
  follow_type int COMMENT '分配类型,0-自动分配,1-手动分配,2-自动转移,3-手动单个转移,4-手动批量转移,5-公海领取',
  shunt_mode_id int COMMENT '匹配到的技能组id',
  shunt_employee_group_id int COMMENT '所属分流员工组',
  ends_time STRING COMMENT '有效时间')
comment '客户线索表'
PARTITIONED BY(starts_time STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='ZLIB');
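# Extract the rows added or updated yesterday (a logical delete also counts as an update).
# Incremental extraction runs T+1, so the SQL needs a WHERE clause that selects only yesterday's rows instead of the whole table.
# The filter must check both the creation time and the update time, so that rows created yesterday and rows modified yesterday both reach the warehouse.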

sqoop import \
--connect jdbc:mysql://172.16.100.42:4000/nev \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--query '
select id,
    create_date_time,
    update_date_time,
    deleted,
    customer_id,
    customer_relationship_id,
    session_id,
    sid,
    status, user as users, create_time, platform, s_name, seo_source, seo_keywords, ip, referrer, from_url, landing_page_url, url_title, to_peer, manual_time, begin_time, reply_msg_count, total_msg_count, msg_count, comment, finish_reason, finish_user, end_time, platform_description, browser_name, os_info, area, country, province, city, creator, name, idcard, phone, itcast_school_id, itcast_school, itcast_subject_id, itcast_subject, wechat, qq, email, gender, level, origin_type, information_way, working_years, technical_directions, customer_state, valid, anticipat_signup_date, clue_state, scrm_department_id, superior_url, superior_source, landing_url, landing_source, info_url, info_source, origin_channel, course_id, course_name, zhuge_session_id, is_repeat, tenant, activity_id, activity_name, follow_type, shunt_mode_id, shunt_employee_group_id,
    FROM_UNIXTIME(unix_timestamp(), "%Y-%m-%d") as starts_time, date_format("9999-12-31", "%Y-%m-%d") as ends_time
from customer_clue
where
  (
    (
      create_date_time >= FROM_UNIXTIME(UNIX_TIMESTAMP("2019-12-04 00:00:00"),"%Y-%m-%d %H:%i:%s")
      and
      create_date_time < FROM_UNIXTIME(UNIX_TIMESTAMP("2019-12-05 00:00:00"),"%Y-%m-%d %H:%i:%s")
    )
    or
    (
      update_date_time >= FROM_UNIXTIME(UNIX_TIMESTAMP("2019-12-04 00:00:00"),"%Y-%m-%d %H:%i:%s")
      and
      update_date_time < FROM_UNIXTIME(UNIX_TIMESTAMP("2019-12-05 00:00:00"),"%Y-%m-%d %H:%i:%s")
    )
  )
and $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table customer_clue_update \
--hive-partition-key starts_time \
--hive-partition-value 2019-12-04 \
-m 2 \
--split-by id
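An incremental filter like this one should use a half-open one-day window (so rows stamped 23:59:59 are not lost) and keep the whole OR inside parentheses: AND binds tighter than OR, and Sqoop's per-mapper split predicate `$CONDITIONS` has to constrain both branches, otherwise with `-m 2` the same rows can be read by both mappers. A minimal sketch that builds such a clause for any business day, assuming GNU `date`; `incr_where` is a hypothetical helper name, not part of the original scripts:

```shell
#!/usr/bin/env bash
# Sketch: build the incremental WHERE clause for one business day.
# Assumptions: GNU date; incr_where is a hypothetical helper name.
set -euo pipefail

incr_where() {
  local day="$1" next lo hi
  # Next calendar day; TZ=UTC keeps the date arithmetic free of DST shifts
  next=$(TZ=UTC date -d "${day} + 1 day" +%Y-%m-%d)
  lo="${day} 00:00:00"   # inclusive lower bound
  hi="${next} 00:00:00"  # exclusive upper bound, so 23:59:59 rows are kept
  # Outer parentheses make the split predicate apply to both branches;
  # \$CONDITIONS is emitted literally so Sqoop can substitute it per mapper.
  printf "((create_date_time >= '%s' and create_date_time < '%s') or (update_date_time >= '%s' and update_date_time < '%s')) and \$CONDITIONS\n" \
    "$lo" "$hi" "$lo" "$hi"
}

incr_where 2019-12-04
```

Used as `--query "select ... from customer_clue where $(incr_where "$TD_DATE")"`, the text produced by command substitution is not expanded a second time, so `$CONDITIONS` reaches Sqoop intact; any double quotes elsewhere in such a query would then need escaping.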
-- Rebuild the customer_clue_tmp temporary table
DROP TABLE itcast_ods.customer_clue_tmp;
CREATE TABLE IF NOT EXISTS itcast_ods.customer_clue_tmp (
  id int COMMENT 'customer_clue_id',
  create_date_time STRING COMMENT '创建时间',
  update_date_time STRING COMMENT '最后更新时间',
  deleted STRING COMMENT '是否被删除(禁用)',
  customer_id int COMMENT '客户id',
  customer_relationship_id int COMMENT '客户关系id',
  session_id STRING COMMENT '七陌会话id',
  sid STRING COMMENT '访客id',
  status STRING COMMENT '状态(undeal待领取 deal 已领取 finish 已关闭 changePeer 已流转)',
  users STRING COMMENT '所属坐席',
  create_time STRING COMMENT '七陌创建时间',
  platform STRING COMMENT '平台来源 (pc-网站咨询|wap-wap咨询|sdk-app咨询|weixin-微信咨询)',
  s_name STRING COMMENT '用户名称',
  seo_source STRING COMMENT '搜索来源',
  seo_keywords STRING COMMENT '关键字',
  ip STRING COMMENT 'IP地址',
  referrer STRING COMMENT '上级来源页面',
  from_url STRING COMMENT '会话来源页面',
  landing_page_url STRING COMMENT '访客着陆页面',
  url_title STRING COMMENT '咨询页面title',
  to_peer STRING COMMENT '所属技能组',
  manual_time STRING COMMENT '人工开始时间',
  begin_time STRING COMMENT '坐席领取时间 ',
  reply_msg_count int COMMENT '客服回复消息数',
  total_msg_count int COMMENT '消息总数',
  msg_count int COMMENT '客户发送消息数',
  comment STRING COMMENT '备注',
  finish_reason STRING COMMENT '结束类型',
  finish_user STRING COMMENT '结束坐席',
  end_time STRING COMMENT '会话结束时间',
  platform_description STRING COMMENT '客户平台信息',
  browser_name STRING COMMENT '浏览器名称',
  os_info STRING COMMENT '系统名称',
  area STRING COMMENT '区域',
  country STRING COMMENT '所在国家',
  province STRING COMMENT '省',
  city STRING COMMENT '城市',
  creator int COMMENT '创建人',
  name STRING COMMENT '客户姓名',
  idcard STRING COMMENT '身份证号',
  phone STRING COMMENT '手机号',
  itcast_school_id int COMMENT '校区Id',
  itcast_school STRING COMMENT '校区',
  itcast_subject_id int COMMENT '学科Id',
  itcast_subject STRING COMMENT '学科',
  wechat STRING COMMENT '微信',
  qq STRING COMMENT 'qq号',
  email STRING COMMENT '邮箱',
  gender STRING COMMENT '性别',
  level STRING COMMENT '客户级别',
  origin_type STRING COMMENT '数据来源渠道',
  information_way STRING COMMENT '资讯方式',
  working_years STRING COMMENT '开始工作时间',
  technical_directions STRING COMMENT '技术方向',
  customer_state STRING COMMENT '当前客户状态',
  valid STRING COMMENT '该线索是否是网资有效线索',
  anticipat_signup_date STRING COMMENT '预计报名时间',
  clue_state STRING COMMENT '线索状态',
  scrm_department_id int COMMENT 'SCRM内部部门id',
  superior_url STRING COMMENT '诸葛获取上级页面URL',
  superior_source STRING COMMENT '诸葛获取上级页面URL标题',
  landing_url STRING COMMENT '诸葛获取着陆页面URL',
  landing_source STRING COMMENT '诸葛获取着陆页面URL来源',
  info_url STRING COMMENT '诸葛获取留咨页URL',
  info_source STRING COMMENT '诸葛获取留咨页URL标题',
  origin_channel STRING COMMENT '投放渠道',
  course_id int COMMENT '课程编号',
  course_name STRING COMMENT '课程名称',
  zhuge_session_id STRING COMMENT 'zhuge会话id',
  is_repeat int COMMENT '是否重复线索(手机号维度) 0:正常 1:重复',
  tenant int COMMENT '租户id',
  activity_id STRING COMMENT '活动id',
  activity_name STRING COMMENT '活动名称',
  follow_type int COMMENT '分配类型,0-自动分配,1-手动分配,2-自动转移,3-手动单个转移,4-手动批量转移,5-公海领取',
  shunt_mode_id int COMMENT '匹配到的技能组id',
  shunt_employee_group_id int COMMENT '所属分流员工组',
  ends_time STRING COMMENT '有效时间')
comment '客户线索表'
PARTITIONED BY(starts_time STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='ZLIB');
-- Merge the incremental data with the history (only rows from the last 30 days are updated, per the business requirement)
insert overwrite table itcast_ods.customer_clue_tmp partition (starts_time)
select * from 
(
    select 
      id,
      create_date_time,
      update_date_time,
      deleted,
      customer_id,
      customer_relationship_id,
      session_id,
      sid,
      status,
      users,
      create_time,
      platform,
      s_name,
      seo_source,
      seo_keywords,
      ip,
      referrer,
      from_url,
      landing_page_url,
      url_title,
      to_peer,
      manual_time,
      begin_time,
      reply_msg_count,
      total_msg_count,
      msg_count,
      comment,
      finish_reason,
      finish_user,
      end_time,
      platform_description,
      browser_name,
      os_info,
      area,
      country,
      province,
      city,
      creator,
      name,
      idcard,
      phone,
      itcast_school_id,
      itcast_school,
      itcast_subject_id,
      itcast_subject,
      wechat,
      qq,
      email,
      gender,
      level,
      origin_type,
      information_way,
      working_years,
      technical_directions,
      customer_state,
      valid,
      anticipat_signup_date,
      clue_state,
      scrm_department_id,
      superior_url,
      superior_source,
      landing_url,
      landing_source,
      info_url,
      info_source,
      origin_channel,
      course_id,
      course_name,
      zhuge_session_id,
      is_repeat,
      tenant,
      activity_id,
      activity_name,
      follow_type,
      shunt_mode_id,
      shunt_employee_group_id,
      '9999-12-31' ends_time,
      from_unixtime(unix_timestamp(), 'yyyy-MM-dd') as starts_time
    from itcast_ods.customer_clue_update where starts_time=from_unixtime(unix_timestamp(), 'yyyy-MM-dd')

    union all 
    select
      rs.id,
      rs.create_date_time,
      rs.update_date_time,
      rs.deleted,
      rs.customer_id,
      rs.customer_relationship_id,
      rs.session_id,
      rs.sid,
      rs.status,
      rs.users,
      rs.create_time,
      rs.platform,
      rs.s_name,
      rs.seo_source,
      rs.seo_keywords,
      rs.ip,
      rs.referrer,
      rs.from_url,
      rs.landing_page_url,
      rs.url_title,
      rs.to_peer,
      rs.manual_time,
      rs.begin_time,
      rs.reply_msg_count,
      rs.total_msg_count,
      rs.msg_count,
      rs.comment,
      rs.finish_reason,
      rs.finish_user,
      rs.end_time,
      rs.platform_description,
      rs.browser_name,
      rs.os_info,
      rs.area,
      rs.country,
      rs.province,
      rs.city,
      rs.creator,
      rs.name,
      rs.idcard,
      rs.phone,
      rs.itcast_school_id,
      rs.itcast_school,
      rs.itcast_subject_id,
      rs.itcast_subject,
      rs.wechat,
      rs.qq,
      rs.email,
      rs.gender,
      rs.level,
      rs.origin_type,
      rs.information_way,
      rs.working_years,
      rs.technical_directions,
      rs.customer_state,
      rs.valid,
      rs.anticipat_signup_date,
      rs.clue_state,
      rs.scrm_department_id,
      rs.superior_url,
      rs.superior_source,
      rs.landing_url,
      rs.landing_source,
      rs.info_url,
      rs.info_source,
      rs.origin_channel,
      rs.course_id,
      rs.course_name,
      rs.zhuge_session_id,
      rs.is_repeat,
      rs.tenant,
      rs.activity_id,
      rs.activity_name,
      rs.follow_type,
      rs.shunt_mode_id,
      rs.shunt_employee_group_id,
      if(up.id is null or rs.ends_time<'9999-12-31', rs.ends_time, date_add(up.starts_time,-1)) ends_time,
      rs.starts_time
    from itcast_ods.customer_clue rs left join 
        (
            select
            *
            from itcast_ods.customer_clue_update
            where starts_time=from_unixtime(unix_timestamp(), 'yyyy-MM-dd')
        ) up
    on rs.id=up.id  where rs.starts_time >= date_add(FROM_UNIXTIME(UNIX_TIMESTAMP()),-30)
)his 
order by his.id, starts_time;
-- Overwrite the zipper table with the temporary table
INSERT OVERWRITE TABLE itcast_ods.customer_clue partition (starts_time) 
SELECT * from itcast_ods.customer_clue_tmp;
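The merge keeps every row from the last 30 days, closes only the still-open version of each changed id (its end date becomes the batch day minus one), leaves already-closed versions untouched, and appends the changed rows as new open versions. A toy model of those rules on plain TSV files, assuming GNU `date` and `awk`; the file layout and the `zip_merge` helper are hypothetical, and the 30-day window and Hive partition overwrite are not modeled:

```shell
#!/usr/bin/env bash
# Toy model of the zipper-table (SCD type 2) merge on hypothetical TSV files:
#   history.tsv: id  value  start  end   (end = 9999-12-31 means "current")
#   update.tsv : id  value               (all changes captured on batch day)
set -euo pipefail

zip_merge() {
  local history="$1" update="$2" day="$3" prev
  # A superseded version ends the day before the new one starts
  prev=$(TZ=UTC date -d "${day} 1 day ago" +%Y-%m-%d)
  awk -F'\t' -v OFS='\t' -v day="$day" -v prev="$prev" '
    FILENAME == ARGV[1] { upd[$1] = $2; next }   # load changed ids
    {
      # Close only the open version; closed versions keep their end date
      if (($1 in upd) && $4 == "9999-12-31") $4 = prev
      print
    }
    END { for (id in upd) print id, upd[id], day, "9999-12-31" }  # new current rows
  ' "$update" "$history" | LC_ALL=C sort -t $'\t' -k1,1n -k3,3
}

tmp=$(mktemp -d)
printf '1\ta\t2019-11-20\t2019-11-30\n1\tb\t2019-12-01\t9999-12-31\n2\tx\t2019-12-01\t9999-12-31\n' > "$tmp/history.tsv"
printf '1\tc\n' > "$tmp/update.tsv"
zip_merge "$tmp/history.tsv" "$tmp/update.tsv" 2019-12-04
```

Here id 1 ends up with three versions (2019-11-20 to 2019-11-30, 2019-12-01 to 2019-12-03, and the open 2019-12-04 version), while id 2, which did not change, keeps its single open row.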
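-- Testing
-- 1. Delete the test data from MySQL and HDFS (external table) to avoid duplicates and make the results easy to verify
-- 2. Insert new rows into MySQL
-- 3. Check that the SQL used by Sqoop returns the test rows when run directly in MySQL
-- 4. Rebuild the update table
-- 5. Run the Sqoop script manually to extract the data
-- 6. Rebuild the tmp temporary table
-- 7. Merge the day's new and updated rows
-- 8. Overwrite the zipper table with the temporary table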
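When verifying the result of these steps, one quick consistency check is that every id has at most one open version (end date 9999-12-31). A sketch of that check, assuming the zipper table has been exported to a TSV file with the id in column 1 and the end date in column 4; the export layout and the `check_open_versions` helper are hypothetical:

```shell
#!/usr/bin/env bash
# Sketch of a post-merge sanity check on an exported zipper table.
# Assumed (hypothetical) layout: TSV with id in column 1 and end date in column 4.
set -euo pipefail

check_open_versions() {
  # Prints every id that has more than one open (9999-12-31) version
  # and returns non-zero if any such id exists.
  awk -F'\t' '
    $4 == "9999-12-31" { cnt[$1]++ }
    END {
      bad = 0
      for (id in cnt) if (cnt[id] > 1) { print id; bad = 1 }
      exit bad
    }
  ' "$1"
}

tmp=$(mktemp)
printf '1\tb\t2019-12-01\t2019-12-03\n1\tc\t2019-12-04\t9999-12-31\n2\tx\t2019-12-01\t9999-12-31\n' > "$tmp"
check_open_versions "$tmp" && echo "OK: one open version per id"
```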
# Oozie script
#! /bin/bash
HIVE_HOME=/usr/bin/hive
if [[ $1 == "" ]];
then
    TD_DATE=$(date -d '1 day ago' +%Y-%m-%d)
else
    TD_DATE=$1
fi

# Check that TD_DATE has the yyyy-MM-dd format
if [[ $TD_DATE =~ ^[0-9]{4}-[0-9]{2}-[0-9]{2}$ ]];then
    echo "TD_DATE is $TD_DATE"
else
    echo "TD_DATE is not correct"
    exit 1
fi

output=$(${HIVE_HOME} -S -e "
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
DROP TABLE itcast_ods.customer_clue_update;
CREATE TABLE IF NOT EXISTS itcast_ods.customer_clue_update (
  id int COMMENT 'customer_clue_id',
  create_date_time STRING COMMENT '创建时间',
  update_date_time STRING COMMENT '最后更新时间',
  deleted STRING COMMENT '是否被删除(禁用)',
  customer_id int COMMENT '客户id',
  customer_relationship_id int COMMENT '客户关系id',
  session_id STRING COMMENT '七陌会话id',
  sid STRING COMMENT '访客id',
  status STRING COMMENT '状态(undeal待领取 deal 已领取 finish 已关闭 changePeer 已流转)',
  users STRING COMMENT '所属坐席',
  create_time STRING COMMENT '七陌创建时间',
  platform STRING COMMENT '平台来源 (pc-网站咨询|wap-wap咨询|sdk-app咨询|weixin-微信咨询)',
  s_name STRING COMMENT '用户名称',
  seo_source STRING COMMENT '搜索来源',
  seo_keywords STRING COMMENT '关键字',
  ip STRING COMMENT 'IP地址',
  referrer STRING COMMENT '上级来源页面',
  from_url STRING COMMENT '会话来源页面',
  landing_page_url STRING COMMENT '访客着陆页面',
  url_title STRING COMMENT '咨询页面title',
  to_peer STRING COMMENT '所属技能组',
  manual_time STRING COMMENT '人工开始时间',
  begin_time STRING COMMENT '坐席领取时间 ',
  reply_msg_count int COMMENT '客服回复消息数',
  total_msg_count int COMMENT '消息总数',
  msg_count int COMMENT '客户发送消息数',
  comment STRING COMMENT '备注',
  finish_reason STRING COMMENT '结束类型',
  finish_user STRING COMMENT '结束坐席',
  end_time STRING COMMENT '会话结束时间',
  platform_description STRING COMMENT '客户平台信息',
  browser_name STRING COMMENT '浏览器名称',
  os_info STRING COMMENT '系统名称',
  area STRING COMMENT '区域',
  country STRING COMMENT '所在国家',
  province STRING COMMENT '省',
  city STRING COMMENT '城市',
  creator int COMMENT '创建人',
  name STRING COMMENT '客户姓名',
  idcard STRING COMMENT '身份证号',
  phone STRING COMMENT '手机号',
  itcast_school_id int COMMENT '校区Id',
  itcast_school STRING COMMENT '校区',
  itcast_subject_id int COMMENT '学科Id',
  itcast_subject STRING COMMENT '学科',
  wechat STRING COMMENT '微信',
  qq STRING COMMENT 'qq号',
  email STRING COMMENT '邮箱',
  gender STRING COMMENT '性别',
  level STRING COMMENT '客户级别',
  origin_type STRING COMMENT '数据来源渠道',
  information_way STRING COMMENT '资讯方式',
  working_years STRING COMMENT '开始工作时间',
  technical_directions STRING COMMENT '技术方向',
  customer_state STRING COMMENT '当前客户状态',
  valid STRING COMMENT '该线索是否是网资有效线索',
  anticipat_signup_date STRING COMMENT '预计报名时间',
  clue_state STRING COMMENT '线索状态',
  scrm_department_id int COMMENT 'SCRM内部部门id',
  superior_url STRING COMMENT '诸葛获取上级页面URL',
  superior_source STRING COMMENT '诸葛获取上级页面URL标题',
  landing_url STRING COMMENT '诸葛获取着陆页面URL',
  landing_source STRING COMMENT '诸葛获取着陆页面URL来源',
  info_url STRING COMMENT '诸葛获取留咨页URL',
  info_source STRING COMMENT '诸葛获取留咨页URL标题',
  origin_channel STRING COMMENT '投放渠道',
  course_id int COMMENT '课程编号',
  course_name STRING COMMENT '课程名称',
  zhuge_session_id STRING COMMENT 'zhuge会话id',
  is_repeat int COMMENT '是否重复线索(手机号维度) 0:正常 1:重复',
  tenant int COMMENT '租户id',
  activity_id STRING COMMENT '活动id',
  activity_name STRING COMMENT '活动名称',
  follow_type int COMMENT '分配类型,0-自动分配,1-手动分配,2-自动转移,3-手动单个转移,4-手动批量转移,5-公海领取',
  shunt_mode_id int COMMENT '匹配到的技能组id',
  shunt_employee_group_id int COMMENT '所属分流员工组',
  ends_time STRING COMMENT '有效时间')
comment '客户线索表'
PARTITIONED BY(starts_time STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='ZLIB');
")

SQOOP_HOME=/usr/bin/sqoop
output=$(${SQOOP_HOME} import \
--connect jdbc:mysql://172.16.100.42:4000/nev \
--username root \
--password 84^j1TZ-+2b9A@S7Hs \
--query '
select id,
    create_date_time,
    update_date_time,
    deleted,
    customer_id,
    customer_relationship_id,
    session_id,
    sid,
    status, user as users, create_time, platform, s_name, seo_source, seo_keywords, ip, referrer, from_url, landing_page_url, url_title, to_peer, manual_time, begin_time, reply_msg_count, total_msg_count, msg_count, comment, finish_reason, finish_user, end_time, platform_description, browser_name, os_info, area, country, province, city, creator, name, idcard, phone, itcast_school_id, itcast_school, itcast_subject_id, itcast_subject, wechat, qq, email, gender, level, origin_type, information_way, working_years, technical_directions, customer_state, valid, anticipat_signup_date, clue_state, scrm_department_id, superior_url, superior_source, landing_url, landing_source, info_url, info_source, origin_channel, course_id, course_name, zhuge_session_id, is_repeat, tenant, activity_id, activity_name, follow_type, shunt_mode_id, shunt_employee_group_id,
    FROM_UNIXTIME(unix_timestamp(), "%Y-%m-%d") as starts_time, date_format("9999-12-31", "%Y-%m-%d") as ends_time
from customer_clue
where
  (
    (
      create_date_time >= FROM_UNIXTIME(UNIX_TIMESTAMP(CAST(SYSDATE() AS DATE) - INTERVAL 1 DAY),"%Y-%m-%d %H:%i:%s")
      and
      create_date_time < FROM_UNIXTIME(UNIX_TIMESTAMP(CAST(SYSDATE() AS DATE)),"%Y-%m-%d %H:%i:%s")
    )
    or
    (
      update_date_time >= FROM_UNIXTIME(UNIX_TIMESTAMP(CAST(SYSDATE() AS DATE) - INTERVAL 1 DAY),"%Y-%m-%d %H:%i:%s")
      and
      update_date_time < FROM_UNIXTIME(UNIX_TIMESTAMP(CAST(SYSDATE() AS DATE)),"%Y-%m-%d %H:%i:%s")
    )
  ) and $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table customer_clue_update \
--hive-partition-key starts_time \
--hive-partition-value ${TD_DATE} \
-m 2 \
--split-by id)

output=$(${HIVE_HOME} -S -e "
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

DROP TABLE itcast_ods.customer_clue_tmp;
CREATE TABLE IF NOT EXISTS itcast_ods.customer_clue_tmp (
  id int COMMENT 'customer_clue_id',
  create_date_time STRING COMMENT '创建时间',
  update_date_time STRING COMMENT '最后更新时间',
  deleted STRING COMMENT '是否被删除(禁用)',
  customer_id int COMMENT '客户id',
  customer_relationship_id int COMMENT '客户关系id',
  session_id STRING COMMENT '七陌会话id',
  sid STRING COMMENT '访客id',
  status STRING COMMENT '状态(undeal待领取 deal 已领取 finish 已关闭 changePeer 已流转)',
  users STRING COMMENT '所属坐席',
  create_time STRING COMMENT '七陌创建时间',
  platform STRING COMMENT '平台来源 (pc-网站咨询|wap-wap咨询|sdk-app咨询|weixin-微信咨询)',
  s_name STRING COMMENT '用户名称',
  seo_source STRING COMMENT '搜索来源',
  seo_keywords STRING COMMENT '关键字',
  ip STRING COMMENT 'IP地址',
  referrer STRING COMMENT '上级来源页面',
  from_url STRING COMMENT '会话来源页面',
  landing_page_url STRING COMMENT '访客着陆页面',
  url_title STRING COMMENT '咨询页面title',
  to_peer STRING COMMENT '所属技能组',
  manual_time STRING COMMENT '人工开始时间',
  begin_time STRING COMMENT '坐席领取时间 ',
  reply_msg_count int COMMENT '客服回复消息数',
  total_msg_count int COMMENT '消息总数',
  msg_count int COMMENT '客户发送消息数',
  comment STRING COMMENT '备注',
  finish_reason STRING COMMENT '结束类型',
  finish_user STRING COMMENT '结束坐席',
  end_time STRING COMMENT '会话结束时间',
  platform_description STRING COMMENT '客户平台信息',
  browser_name STRING COMMENT '浏览器名称',
  os_info STRING COMMENT '系统名称',
  area STRING COMMENT '区域',
  country STRING COMMENT '所在国家',
  province STRING COMMENT '省',
  city STRING COMMENT '城市',
  creator int COMMENT '创建人',
  name STRING COMMENT '客户姓名',
  idcard STRING COMMENT '身份证号',
  phone STRING COMMENT '手机号',
  itcast_school_id int COMMENT '校区Id',
  itcast_school STRING COMMENT '校区',
  itcast_subject_id int COMMENT '学科Id',
  itcast_subject STRING COMMENT '学科',
  wechat STRING COMMENT '微信',
  qq STRING COMMENT 'qq号',
  email STRING COMMENT '邮箱',
  gender STRING COMMENT '性别',
  level STRING COMMENT '客户级别',
  origin_type STRING COMMENT '数据来源渠道',
  information_way STRING COMMENT '资讯方式',
  working_years STRING COMMENT '开始工作时间',
  technical_directions STRING COMMENT '技术方向',
  customer_state STRING COMMENT '当前客户状态',
  valid STRING COMMENT '该线索是否是网资有效线索',
  anticipat_signup_date STRING COMMENT '预计报名时间',
  clue_state STRING COMMENT '线索状态',
  scrm_department_id int COMMENT 'SCRM内部部门id',
  superior_url STRING COMMENT '诸葛获取上级页面URL',
  superior_source STRING COMMENT '诸葛获取上级页面URL标题',
  landing_url STRING COMMENT '诸葛获取着陆页面URL',
  landing_source STRING COMMENT '诸葛获取着陆页面URL来源',
  info_url STRING COMMENT '诸葛获取留咨页URL',
  info_source STRING COMMENT '诸葛获取留咨页URL标题',
  origin_channel STRING COMMENT '投放渠道',
  course_id int COMMENT '课程编号',
  course_name STRING COMMENT '课程名称',
  zhuge_session_id STRING COMMENT 'zhuge会话id',
  is_repeat int COMMENT '是否重复线索(手机号维度) 0:正常 1:重复',
  tenant int COMMENT '租户id',
  activity_id STRING COMMENT '活动id',
  activity_name STRING COMMENT '活动名称',
  follow_type int COMMENT '分配类型,0-自动分配,1-手动分配,2-自动转移,3-手动单个转移,4-手动批量转移,5-公海领取',
  shunt_mode_id int COMMENT '匹配到的技能组id',
  shunt_employee_group_id int COMMENT '所属分流员工组',
  ends_time STRING COMMENT '有效时间')
comment '客户线索表'
PARTITIONED BY(starts_time STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='ZLIB');

insert overwrite table itcast_ods.customer_clue_tmp partition (starts_time)
select * from 
(
    select 
      id,
      create_date_time,
      update_date_time,
      deleted,
      customer_id,
      customer_relationship_id,
      session_id,
      sid,
      status,
      users,
      create_time,
      platform,
      s_name,
      seo_source,
      seo_keywords,
      ip,
      referrer,
      from_url,
      landing_page_url,
      url_title,
      to_peer,
      manual_time,
      begin_time,
      reply_msg_count,
      total_msg_count,
      msg_count,
      comment,
      finish_reason,
      finish_user,
      end_time,
      platform_description,
      browser_name,
      os_info,
      area,
      country,
      province,
      city,
      creator,
      name,
      idcard,
      phone,
      itcast_school_id,
      itcast_school,
      itcast_subject_id,
      itcast_subject,
      wechat,
      qq,
      email,
      gender,
      level,
      origin_type,
      information_way,
      working_years,
      technical_directions,
      customer_state,
      valid,
      anticipat_signup_date,
      clue_state,
      scrm_department_id,
      superior_url,
      superior_source,
      landing_url,
      landing_source,
      info_url,
      info_source,
      origin_channel,
      course_id,
      course_name,
      zhuge_session_id,
      is_repeat,
      tenant,
      activity_id,
      activity_name,
      follow_type,
      shunt_mode_id,
      shunt_employee_group_id,
      '9999-12-31' ends_time,
      '${TD_DATE}' as starts_time
    from itcast_ods.customer_clue_update where starts_time='${TD_DATE}'

    union all 
    select
      rs.id,
      rs.create_date_time,
      rs.update_date_time,
      rs.deleted,
      rs.customer_id,
      rs.customer_relationship_id,
      rs.session_id,
      rs.sid,
      rs.status,
      rs.users,
      rs.create_time,
      rs.platform,
      rs.s_name,
      rs.seo_source,
      rs.seo_keywords,
      rs.ip,
      rs.referrer,
      rs.from_url,
      rs.landing_page_url,
      rs.url_title,
      rs.to_peer,
      rs.manual_time,
      rs.begin_time,
      rs.reply_msg_count,
      rs.total_msg_count,
      rs.msg_count,
      rs.comment,
      rs.finish_reason,
      rs.finish_user,
      rs.end_time,
      rs.platform_description,
      rs.browser_name,
      rs.os_info,
      rs.area,
      rs.country,
      rs.province,
      rs.city,
      rs.creator,
      rs.name,
      rs.idcard,
      rs.phone,
      rs.itcast_school_id,
      rs.itcast_school,
      rs.itcast_subject_id,
      rs.itcast_subject,
      rs.wechat,
      rs.qq,
      rs.email,
      rs.gender,
      rs.level,
      rs.origin_type,
      rs.information_way,
      rs.working_years,
      rs.technical_directions,
      rs.customer_state,
      rs.valid,
      rs.anticipat_signup_date,
      rs.clue_state,
      rs.scrm_department_id,
      rs.superior_url,
      rs.superior_source,
      rs.landing_url,
      rs.landing_source,
      rs.info_url,
      rs.info_source,
      rs.origin_channel,
      rs.course_id,
      rs.course_name,
      rs.zhuge_session_id,
      rs.is_repeat,
      rs.tenant,
      rs.activity_id,
      rs.activity_name,
      rs.follow_type,
      rs.shunt_mode_id,
      rs.shunt_employee_group_id,
      if(up.id is null or rs.ends_time < '9999-12-31', rs.ends_time, date_add(up.starts_time,-1)) ends_time,
      rs.starts_time
    from itcast_ods.customer_clue rs left join 
        (
            select
            *
            from itcast_ods.customer_clue_update
            where starts_time='${TD_DATE}'
        ) up
    -- Keep closed versions too: the final INSERT OVERWRITE into customer_clue replaces every partition it writes
    on rs.id=up.id  where rs.starts_time >= date_sub(FROM_UNIXTIME(UNIX_TIMESTAMP()),30)
)his 
order by his.id, starts_time;

INSERT OVERWRITE TABLE itcast_ods.customer_clue partition (starts_time) 
SELECT * from itcast_ods.customer_clue_tmp;
")
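The date handling at the top of the Oozie script can be wrapped in one function that defaults to yesterday and also rejects impossible calendar dates such as 2019-02-30, which the regex alone accepts. A sketch, assuming GNU `date`; `resolve_td_date` is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# Sketch: resolve and validate the business date (TD_DATE) for a daily run.
# Assumes GNU date; resolve_td_date is a hypothetical helper name.
set -euo pipefail

resolve_td_date() {
  local d="${1:-}"
  if [[ -z "$d" ]]; then
    # Default: yesterday (T+1 processing)
    date -d '1 day ago' +%Y-%m-%d
    return 0
  fi
  # Shape check first, then a round trip through date rejects e.g. 2019-02-30
  if [[ "$d" =~ ^[0-9]{4}-[0-9]{2}-[0-9]{2}$ ]] \
     && [[ "$(date -d "$d" +%Y-%m-%d 2>/dev/null)" == "$d" ]]; then
    echo "$d"
  else
    echo "invalid TD_DATE: $d" >&2
    return 1
  fi
}

TD_DATE=$(resolve_td_date "${1:-}") || exit 1
echo "TD_DATE is $TD_DATE"
```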
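# DWD: data cleaning and transformation
# Because the business side limits updates to a 30-day window, and the DWD layer only cleans and transforms rows without aggregating them, the incremental statistics only need to recompute data from the 1st of the previous month onward.
# start_time selects the range of rows to clean (yesterday: inserted or updated);
# end_time selects the versions that are currently valid.
# Drop deleted rows;
# Replace null (or 0) school and subject IDs with -1;
# Map origin_type to online/offline: NETSERVICE and PRESIGNUP are online (1), everything else is offline (0).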
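The cleaning rules listed above can be tried out locally on a small TSV sample before running them in Hive. A sketch, assuming a hypothetical export with the columns id, customer_id, create_date_time, school_id, deleted, origin_type, subject_id, where an empty field or `\N` (Hive's default text NULL marker) means NULL; the column set is simplified and `clean_intention` is a hypothetical name:

```shell
#!/usr/bin/env bash
# Sketch of the DWD cleaning rules on a hypothetical TSV export with columns:
#   id  customer_id  create_date_time  school_id  deleted  origin_type  subject_id
# Empty fields or \N are treated as NULL (Hive's default text NULL marker).
set -euo pipefail

clean_intention() {
  awk -F'\t' -v OFS='\t' '
    # NULL or 0 ids become -1, like the if(... is null or ... = 0, -1, ...) in the SQL
    function norm_id(v) { return (v == "" || v == "\\N" || v == "0") ? -1 : v }
    $5 != "0" { next }                                  # drop deleted rows
    {
      online = ($6 == "NETSERVICE" || $6 == "PRESIGNUP") ? 1 : 0
      t = $3                                            # yyyy-MM-dd HH:mm:ss
      # awk substr is 1-based, matching the Hive substr() offsets
      print $1, $2, t, norm_id($4), $6, norm_id($7), substr(t, 12, 2), online,
            substr(t, 1, 4), substr(t, 6, 2), substr(t, 9, 2)
    }
  '
}

printf '1\t100\t2019-12-04 09:15:00\t\\N\t0\tNETSERVICE\t3\n2\t101\t2019-12-04 10:00:00\t5\t1\tOTHER\t0\n3\t102\t2019-12-04 23:59:59\t0\t0\tOTHER\t\n' | clean_intention
```

Row 2 is dropped as deleted; rows 1 and 3 get -1 for their missing or zero ids, plus the hour, online flag and year/month/day partition values.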

# Compute the date in the shell script and substitute it into the SQL filter.
#! /bin/bash
HIVE_HOME=/usr/bin/hive

# Yesterday
Last_DATE=$(date -d "-1 day" +%Y-%m-%d)

${HIVE_HOME} -S -e "
-- dynamic partitioning
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
-- Hive compression
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
-- make compression take effect on write
set hive.exec.orc.compression.strategy=COMPRESSION;
-- bucketing
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;

insert into table itcast_dwd.itcast_intention_dwd partition (yearinfo,monthinfo,dayinfo)
select
    rs.id as rid,
    rs.customer_id,
    rs.create_date_time,
    if((rs.itcast_school_id is null) or (rs.itcast_school_id = 0), -1, rs.itcast_school_id) as itcast_school_id,
    rs.deleted,
    rs.origin_type,
    if((rs.itcast_subject_id is null) or (rs.itcast_subject_id = 0), -1, rs.itcast_subject_id) as itcast_subject_id,
    substr(rs.create_date_time, 12, 2) hourinfo,
    if(rs.origin_type='NETSERVICE', '1', if(rs.origin_type='PRESIGNUP', '1', '0')) as origin_type_stat,
    substr(rs.create_date_time, 1, 4) yearinfo,
    substr(rs.create_date_time, 6, 2) monthinfo,
    substr(rs.create_date_time, 9, 2) dayinfo
from itcast_ods.customer_relationship rs
where rs.deleted = 0
  and substr(rs.start_time, 1, 10) = '${Last_DATE}'--2019-11-01
  and rs.end_time = '9999-12-31';
"

# DWM
# Restrict by year/month/day so that only data from the 1st of the previous month onward is joined.
#! /bin/bash
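HIVE_HOME=/usr/bin/hive

# 1st day of the previous month (computed from the 1st of this month, so month ends do not overflow)
Last_Month_DATE=$(date -d "$(date +%Y-%m-01) 1 month ago" +%Y-%m-01)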

${HIVE_HOME} -S -e "
-- dynamic partitioning
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
-- Hive compression
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
-- make compression take effect on write
set hive.exec.orc.compression.strategy=COMPRESSION;
-- bucketing
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
insert into table itcast_dwm.itcast_intention_dwm partition (yearinfo,monthinfo,dayinfo)
select
    dwd.customer_id,
    dwd.create_date_time,
    cus.area,
    dwd.itcast_school_id,
    sch.name as itcast_school_name,
    dwd.deleted,
    dwd.origin_type,
    dwd.itcast_subject_id,
    sub.name as itcast_subject_name,
    dwd.hourinfo,
    dwd.origin_type_stat,
    if(clue.clue_state='VALID_NEW_CLUES', '1', if(clue.clue_state='VALID_PUBLIC_NEW_CLUE', '0', '-1')) as clue_state_stat,
    e.department_id as tdepart_id,
    dept.name as tdepart_name,
    dwd.yearinfo,
    dwd.monthinfo,
    dwd.dayinfo
from itcast_dwd.itcast_intention_dwd dwd
left join itcast_ods.customer_clue clue on clue.customer_relationship_id=dwd.rid
left join itcast_dimen.customer cus on dwd.customer_id = cus.id
left join itcast_dimen.employee e on dwd.creator = e.id
left join itcast_dimen.scrm_department dept on e.department_id = dept.id
left join itcast_dimen.itcast_subject sub on dwd.itcast_subject_id = sub.id
left join itcast_dimen.itcast_school sch on dwd.itcast_school_id = sch.id
where concat_ws('-',dwd.yearinfo,dwd.monthinfo,dwd.dayinfo) >= '${Last_Month_DATE}'; -- e.g. 2019-11-01
"
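Computing the previous month with `date -d "-1 month"` is fragile near month ends: GNU `date` keeps the day of month, so on 2019-12-31 it lands on the nonexistent 2019-11-31 and normalizes it to 2019-12-01, which is the current month. Stepping back from the 1st of the current month avoids this. A sketch, assuming GNU `date`; `first_of_prev_month` is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# Sketch: first day of the previous month, robust at month ends.
# Assumes GNU date; first_of_prev_month is a hypothetical helper name.
set -euo pipefail

first_of_prev_month() {
  local today="${1:-$(date +%Y-%m-%d)}"
  # Step back from the 1st of the given month, never from the 29th-31st,
  # so "1 month ago" cannot overflow into the same month.
  TZ=UTC date -d "${today:0:7}-01 1 month ago" +%Y-%m-01
}

first_of_prev_month 2019-12-31
first_of_prev_month 2020-03-31
first_of_prev_month
```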
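-- Statistical analysis
-- Total new intentions
-- For testing, you can query data before 2016-10-12.
-- Hour and day data: recompute everything from the 1st of the previous month; month granularity: recompute from the previous month onward; year granularity: recompute from the year that contains the 1st of the previous month.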
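The recompute windows above amount to one lower bound per granularity, all derived from the 1st of the previous month. The filters can compare `concat_ws('-', yearinfo, monthinfo, dayinfo)` with a date string only because the partition values are zero-padded, so string order matches date order. A sketch of that rule, with `in_window` as a hypothetical helper name:

```shell
#!/usr/bin/env bash
# Sketch: which partitions fall in the recompute window for each granularity,
# given the first day of the previous month (e.g. 2019-11-01).
# Relies on zero-padded yyyy / MM / dd strings, which sort chronologically.
set -euo pipefail

# in_window GRANULARITY KEY START -> exit status 0 if KEY must be recomputed
#   hour/day: KEY = yyyy-MM-dd, compared with START
#   month   : KEY = yyyy-MM,    compared with START's yyyy-MM
#   year    : KEY = yyyy,       compared with START's yyyy
in_window() {
  local gran="$1" key="$2" start="$3" bound
  case "$gran" in
    hour|day) bound="$start" ;;
    month)    bound="${start:0:7}" ;;
    year)     bound="${start:0:4}" ;;
    *) echo "unknown granularity: $gran" >&2; return 2 ;;
  esac
  # Plain string comparison; correct only because keys are zero-padded
  [[ ! "$key" < "$bound" ]]
}

while read -r gran key; do
  if in_window "$gran" "$key" 2019-11-01; then echo "$gran $key: recompute"; else echo "$gran $key: keep"; fi
done <<'EOF'
day 2019-10-31
day 2019-11-01
month 2019-10
month 2019-11
year 2018
year 2019
EOF
```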
--总意向量分组(按照时间和常驻类型统计)
--小时
insert into itcast_dws.itcast_intention_dws partition (yearinfo, monthinfo, dayinfo)
select
    count(distinct customer_id) as customer_total,
    '-1' as area,
    '-1' itcast_school_id,
    '-1' as itcast_school_name,
    '-1' as origin_type,
    '-1' as itcast_subject_id,
    '-1' as itcast_subject_name,
    hourinfo,
    origin_type_stat,
    clue_state_stat,
    '-1' as tdepart_id,
    '-1' as tdepart_name,
    concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
    '1' as grouptype,
    '1' as time_type,
    yearinfo,
    monthinfo,
    dayinfo
from itcast_dwm.itcast_intention_dwm dwm
where concat_ws('-',dwm.yearinfo,dwm.monthinfo,dwm.dayinfo) >= '${Last_Month_DATE}'--2011-08-01
group by yearinfo, monthinfo, dayinfo, hourinfo, origin_type_stat, clue_state_stat;
-- Day
insert into itcast_dws.itcast_intention_dws partition (yearinfo, monthinfo, dayinfo)
select
    count(distinct customer_id) as customer_total,
    '-1' as area,
    '-1' itcast_school_id,
    '-1' as itcast_school_name,
    '-1' as origin_type,
    '-1' as itcast_subject_id,
    '-1' as itcast_subject_name,
    '-1' as hourinfo,
    origin_type_stat,
    clue_state_stat,
    '-1' as tdepart_id,
    '-1' as tdepart_name,
    concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
    '1' as grouptype,
    '2' as time_type,
    yearinfo,
    monthinfo,
    dayinfo
from itcast_dwm.itcast_intention_dwm dwm
where concat_ws('-',dwm.yearinfo,dwm.monthinfo,dwm.dayinfo) >= '${Last_Month_DATE}'--2011-08-19
group by yearinfo, monthinfo, dayinfo, origin_type_stat, clue_state_stat;
-- Month
insert into itcast_dws.itcast_intention_dws partition (yearinfo, monthinfo, dayinfo)
select
    count(distinct customer_id) as customer_total,
    '-1' as area,
    '-1' itcast_school_id,
    '-1' as itcast_school_name,
    '-1' as origin_type,
    '-1' as itcast_subject_id,
    '-1' as itcast_subject_name,
    '-1' as hourinfo,
    origin_type_stat,
    clue_state_stat,
    '-1' as tdepart_id,
    '-1' as tdepart_name,
    concat(yearinfo,'-',monthinfo) as time_str,
    '1' as grouptype,
    '4' as time_type,  -- month (assumed code; hour = 1, day = 2)
    yearinfo,
    monthinfo,
    '-1' as dayinfo
from itcast_dwm.itcast_intention_dwm dwm
where concat_ws('-',dwm.yearinfo,dwm.monthinfo) >= '${V_Month}'--2011-08
group by yearinfo, monthinfo, origin_type_stat, clue_state_stat;
-- Year
insert into itcast_dws.itcast_intention_dws partition (yearinfo, monthinfo, dayinfo)
select
    count(distinct customer_id) as customer_total,
    '-1' as area,
    '-1' itcast_school_id,
    '-1' as itcast_school_name,
    '-1' as origin_type,
    '-1' as itcast_subject_id,
    '-1' as itcast_subject_name,
    '-1' as hourinfo,
    origin_type_stat,
    clue_state_stat,
    '-1' as tdepart_id,
    '-1' as tdepart_name,
    concat(yearinfo) as time_str,
    '1' as grouptype,
    '5' as time_type,  -- year (assumed code; hour = 1, day = 2)
    yearinfo,
    '-1' as monthinfo,
    '-1' as dayinfo
from itcast_dwm.itcast_intention_dwm dwm
where dwm.yearinfo >= '${V_Year}'--2011
group by yearinfo, origin_type_stat, clue_state_stat;
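The hour, day, month and year statements repeat the same SELECT and differ only in which time columns they keep and group by. Coarser levels write '-1' for the dropped columns. A minimal sketch that derives those pieces per granularity shows the pattern and makes slips in the repeated constants easier to avoid. The helper names are hypothetical:

```bash
#!/bin/bash
# Minimal sketch; helper names are hypothetical.

# Time columns kept (and grouped by) at each granularity.
time_columns() {
    case "$1" in
        hour)  echo "yearinfo, monthinfo, dayinfo, hourinfo" ;;
        day)   echo "yearinfo, monthinfo, dayinfo" ;;
        month) echo "yearinfo, monthinfo" ;;
        year)  echo "yearinfo" ;;
        *)     return 1 ;;
    esac
}

# time_str expression used by the statements for each granularity.
time_str_expr() {
    case "$1" in
        hour)  echo "concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo)" ;;
        day)   echo "concat(yearinfo,'-',monthinfo,'-',dayinfo)" ;;
        month) echo "concat(yearinfo,'-',monthinfo)" ;;
        year)  echo "concat(yearinfo)" ;;
        *)     return 1 ;;
    esac
}

# GROUP BY clause: optional extra dimensions (e.g. "area"), then the time
# columns, then the two stat columns that every statement groups by.
group_by_clause() {
    local cols
    cols=$(time_columns "$1") || return 1
    if [ -n "$2" ]; then
        echo "group by $2, ${cols}, origin_type_stat, clue_state_stat"
    else
        echo "group by ${cols}, origin_type_stat, clue_state_stat"
    fi
}

group_by_clause day        # group by for the total-intention day statement
group_by_clause year area  # group by for the area-group year statement
```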

-- Heat map of prospective students' locations
-- Group 2: by area, time, origin_type_stat and clue_state_stat
-- Hour
insert into itcast_dws.itcast_intention_dws partition (yearinfo, monthinfo, dayinfo)
select
    count(distinct customer_id) as customer_total,
    area,
    '-1' itcast_school_id,
    '-1' as itcast_school_name,
    '-1' as origin_type,
    '-1' as itcast_subject_id,
    '-1' as itcast_subject_name,
    hourinfo,
    origin_type_stat,
    clue_state_stat,
    '-1' as tdepart_id,
    '-1' as tdepart_name,
    concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
    '2' as grouptype,
    '1' as time_type,
    yearinfo,
    monthinfo,
    dayinfo
from itcast_dwm.itcast_intention_dwm dwm
where concat_ws('-',dwm.yearinfo,dwm.monthinfo,dwm.dayinfo) >= '${Last_Month_DATE}'--2011-08-19
group by area, yearinfo, monthinfo, dayinfo, hourinfo, origin_type_stat, clue_state_stat;
-- Day
insert into itcast_dws.itcast_intention_dws partition (yearinfo, monthinfo, dayinfo)
select
    count(distinct customer_id) as customer_total,
    area,
    '-1' itcast_school_id,
    '-1' as itcast_school_name,
    '-1' as origin_type,
    '-1' as itcast_subject_id,
    '-1' as itcast_subject_name,
    '-1' as hourinfo,
    origin_type_stat,
    clue_state_stat,
    '-1' as tdepart_id,
    '-1' as tdepart_name,
    concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
    '2' as grouptype,
    '2' as time_type,
    yearinfo,
    monthinfo,
    dayinfo
from itcast_dwm.itcast_intention_dwm dwm
where concat_ws('-',dwm.yearinfo,dwm.monthinfo,dwm.dayinfo) >= '${Last_Month_DATE}'--2011-08-19
group by area, yearinfo, monthinfo, dayinfo, origin_type_stat, clue_state_stat;
-- Month
insert into itcast_dws.itcast_intention_dws partition (yearinfo, monthinfo, dayinfo)
select
    count(distinct customer_id) as customer_total,
    area,
    '-1' itcast_school_id,
    '-1' as itcast_school_name,
    '-1' as origin_type,
    '-1' as itcast_subject_id,
    '-1' as itcast_subject_name,
    '-1' as hourinfo,
    origin_type_stat,
    clue_state_stat,
    '-1' as tdepart_id,
    '-1' as tdepart_name,
    concat(yearinfo,'-',monthinfo) as time_str,
    '2' as grouptype,
    '4' as time_type,  -- month (assumed code; hour = 1, day = 2)
    yearinfo,
    monthinfo,
    '-1' as dayinfo
from itcast_dwm.itcast_intention_dwm dwm
where concat_ws('-',dwm.yearinfo,dwm.monthinfo) >= '${V_Month}'--2011-08
group by area, yearinfo, monthinfo, origin_type_stat, clue_state_stat;
-- Year
insert into itcast_dws.itcast_intention_dws partition (yearinfo, monthinfo, dayinfo)
select
    count(distinct customer_id) as customer_total,
    area,
    '-1' itcast_school_id,
    '-1' as itcast_school_name,
    '-1' as origin_type,
    '-1' as itcast_subject_id,
    '-1' as itcast_subject_name,
    '-1' as hourinfo,
    origin_type_stat,
    clue_state_stat,
    '-1' as tdepart_id,
    '-1' as tdepart_name,
    concat(yearinfo) as time_str,
    '2' as grouptype,
    '5' as time_type,  -- year (assumed code; hour = 1, day = 2)
    yearinfo,
    '-1' as monthinfo,
    '-1' as dayinfo
from itcast_dwm.itcast_intention_dwm dwm
where dwm.yearinfo >= '${V_Year}'--2011
group by area, yearinfo, origin_type_stat, clue_state_stat;
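The month and year statements filter on ${V_Month} and ${V_Year}, which this section does not define. A minimal sketch of deriving them from Last_Month_DATE in the wrapper script. It assumes GNU date and the formats shown in the example comments ('2011-08' and '2011'). The function name is hypothetical:

```bash
#!/bin/bash
# Minimal sketch, assuming GNU date and the formats from the example
# comments: V_Month = 'YYYY-MM', V_Year = 'YYYY'.
derive_month_year_bounds() {
    local last_month_date="$1"            # e.g. 2011-08-01
    V_Month=$(date -d "$last_month_date" +%Y-%m)
    V_Year=$(date -d "$last_month_date" +%Y)
}

derive_month_year_bounds 2011-08-01
echo "V_Month=${V_Month} V_Year=${V_Year}"
```

The filters compare strings, so `concat_ws('-', yearinfo, monthinfo) >= '2011-08'` only orders correctly if monthinfo is zero-padded the same way %m pads it (assumed here).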
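# Oozie shell example
# Starting from the 1st of last month, derive the matching year and month strings to substitute into the SQL variables.
# Hourly data for the intention-center contribution share: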
#! /bin/bash
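# Path to the hive CLI used below (assumed location; adjust to your installation)
HIVE_HOME=/usr/bin/hive
# First day of last month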
Last_Month_DATE=$(date -d "$(date +%Y%m)01 last month" +%Y-%m-01)

# Derive year, quarter, month and day from Last_Month_DATE
V_PARYEAR=`date --date="$Last_Month_DATE" +%Y`
V_PARMONTH=`date --date="$Last_Month_DATE" +%m`
V_PARDAY=`date --date="$Last_Month_DATE" +%d`

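# Quarter: %-m prints the month without a leading zero (7 rather than 07), because bash arithmetic would read 08 and 09 as invalid octal numbers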
V_month_for_quarter=`date --date="$Last_Month_DATE" +%-m`
V_PARQUARTER=$(((${V_month_for_quarter}-1)/3+1))

${HIVE_HOME} -S -e "
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;

set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;

insert into itcast_dws.itcast_intention_dws partition (yearinfo, monthinfo, dayinfo)
select
    count(distinct customer_id) as customer_total,
    '-1' as area,
    '-1' itcast_school_id,
    '-1' as itcast_school_name,
    '-1' as origin_type,
    '-1' as itcast_subject_id,
    '-1' as itcast_subject_name,
    hourinfo,
    origin_type_stat,
    clue_state_stat,
    '-1' as tdepart_id,
    '-1' as tdepart_name,
    concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
    '1' as grouptype,
    '1' as time_type,
    yearinfo,
    monthinfo,
    dayinfo
from itcast_dwm.itcast_intention_dwm dwm
where concat_ws('-',dwm.yearinfo,dwm.monthinfo,dwm.dayinfo) >= '${Last_Month_DATE}'
group by yearinfo, monthinfo, dayinfo, hourinfo, origin_type_stat, clue_state_stat;
"
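The quarter formula (month - 1) / 3 + 1 relies on integer division, and the %-m format matters because bash arithmetic treats a leading 0 as octal. With +%m, a month such as 08 would make the arithmetic fail. A minimal sketch of the same calculation as a function, assuming GNU date. The name quarter_of is hypothetical:

```bash
#!/bin/bash
# Minimal sketch, assuming GNU date; quarter_of is a hypothetical name.
quarter_of() {
    local m
    m=$(date -d "$1" +%-m)        # month without leading zero: 8, not 08
    echo $(( (m - 1) / 3 + 1 ))   # integer division maps 1-3 -> 1, ..., 10-12 -> 4
}

for d in 2019-01-01 2019-03-31 2019-04-01 2019-08-01 2019-12-31; do
    echo "$d -> Q$(quarter_of "$d")"
done
```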
# Export data
# Per year: first delete that year's rows from MySQL, then export the year's partition.
#! /bin/bash
SQOOP_HOME=/usr/bin/sqoop
HOST=172.16.100.42
USERNAME="root"
PASSWORD='84^j1TZ-+2b9A@S7Hs'
PORT=4000
DBNAME="scrm_bi"
MYSQL=/usr/bin/mysql
# First day of last month, or the date passed as the first argument
if [[ -z "$1" ]]; then
    Last_Month_DATE=$(date -d "$(date +%Y%m)01 last month" +%Y-%m-01)
else
    Last_Month_DATE=$1
fi
TD_YEAR=$(date -d "$Last_Month_DATE" +%Y)

${MYSQL} -h${HOST} -P${PORT} -u${USERNAME} -p"${PASSWORD}" -D${DBNAME} -e "delete from itcast_intention_app where yearinfo = '${TD_YEAR}'"
${SQOOP_HOME} export \
--connect "jdbc:mysql://${HOST}:${PORT}/${DBNAME}?useUnicode=true&characterEncoding=utf-8" \
--username ${USERNAME} \
--password ${PASSWORD} \
--table itcast_intention_app \
--hcatalog-database itcast_dws \
--hcatalog-table itcast_intention_dws \
--hcatalog-partition-keys yearinfo  \
--hcatalog-partition-values ${TD_YEAR} \
-m 2
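The export script deletes one year from MySQL and then re-exports the same yearinfo partition, so both steps must use the same year value. A minimal sketch of resolving that year from the optional date argument and building the delete statement. GNU date is assumed, and both helper names are hypothetical:

```bash
#!/bin/bash
# Minimal sketch; helper names are hypothetical, GNU date assumed.

# Year to re-export: from the date given as $1, or from the 1st of last month.
resolve_export_year() {
    local ref="$1"
    if [[ -z "$ref" ]]; then
        ref=$(date -d "$(date +%Y%m)01 last month" +%Y-%m-01)
    fi
    date -d "$ref" +%Y
}

# DELETE statement run against MySQL before the Sqoop export.
build_delete_sql() {
    echo "delete from itcast_intention_app where yearinfo = '$1'"
}

year=$(resolve_export_year 2019-11-01)
build_delete_sql "$year"
```

The same `$year` would then be passed to `--hcatalog-partition-values`, so the rows removed from MySQL and the partition re-exported from Hive always match.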