0. Hive 索引与优化

Hive的索引与优化

Hive的索引

索引的作用: 加快查询的效率

为什么索引可以提升查询效率呢?

hive索引是在 分区 分桶优化基础上, 又提供一种新的优化手段, 如果分区 和分桶受限, 可以尝试使用索引的方式来优化处理

Hive支持索引,但是Hive的索引与关系型数据库中的索引并不相同,比如,Hive不支持主键或者外键。
Hive索引可以建立在表中的某些列上,以提升一些操作的效率,例如减少MapReduce任务中需要读取的数据块的数量。
在可以预见到分区数据非常庞大的情况下,分桶和索引常常是优于分区的。而分桶由于SMB Join对关联键要求严格,所以并不是总能生效。

hive提供了三种索引:
* 原始索引
* row group index(行组索引)
* bloom filter index(布隆过滤索引)

Hive原始索引

Hive的索引目的是提高Hive表指定列的查询速度。
没有索引时,类似'WHERE tab1.col1 = 10' 的查询,Hive会加载整张表或分区,然后处理所有的rows,但是如果在字段col1上面存在索引时,那么只会加载和处理文件的一部分。
。在每次建立、更新数据后,Hive索引不会自动更新,需要手动进行更新(重建索引以构建索引表),会触发一个mr job
Hive索引使用过程繁杂,而且性能一般,在Hive3.0中已被删除,在工作环境中不推荐优先使用,在分区数量过多或查询字段不是分区字段时,索引可以作为补充方案同时使用。推荐使用ORC文件格式的索引类型进行查询。

hive的原始索引, 可以针对表中某一列或者某几列构建索引, 构建之后, 当查询的时候使用到索引字段, 可以帮助提升一定效率
- 弊端: 
  - hive原始索引, 不会自动更新, 每一次对表中数据进行处理后, 都需要进行重建索引, 而这个效率是比较差, 而且整体效率的提升,相对比较有限

row group index(行组索引)

一个ORC文件包含一个或多个stripes(groups of row data),每个stripe中包含了每个column的min/max值的索引数据,当查询中有<,>,=的操作时,会根据min/max值,跳过扫描不包含的stripes。
而其中为每个stripe建立的包含min/max值的索引,就称为Row Group Index行组索引,也叫min-max Index大小对比索引,或者Storage Index。
在建立ORC格式表时,指定表参数’orc.create.index’=’true’之后,便会建立Row Group Index,需要注意的是,为了使Row Group Index有效利用,向表中加载数据时,必须对需要使用索引的字段进行排序,否则,min/max会失去意义。另外,这种索引主要用于数值型字段的查询过滤优化上。
设置hive.optimize.index.filter为true,并重启hive

条件:
  1) 要求表必须是ORC存储格式
  2) 需要对那个字段进行索引, 必须对这个字段进行排序, 否则索引意义不大
  3) 在创建表的时候, 必须开启行组索引: ’orc.create.index’=’true’
  4) 主要应用在 < > = 场景中, 主要针对数值类型数据

建表操作:

CREATE TABLE lxw1234_orc2(
    ....
) stored AS ORC
TBLPROPERTIES
(
    'orc.compress'='SNAPPY',
--     开启行组索引
    'orc.create.index'='true'
)

-- 插入数据:
    SELECT CAST(siteid AS INT) AS id,
    pcid
    FROM lxw1234_text
--     插入的数据保持排序
    DISTRIBUTE BY id sort BY id;

bloom filter index(布隆过滤索引)

在建表时候,通过表参数”orc.bloom.filter.columns”=”pcid”来指定为那些字段建立BloomFilter索引,这样,在生成数据的时候,会在每个stripe中,为该字段建立BloomFilter的数据结构,当查询条件中包含对该字段的=号过滤时候,先从BloomFilter中获取以下是否包含该值,如果不包含,则跳过该stripe。    

条件:
  1) 必须是ORC类型的表
  2) 需要按照哪一个字段进行等值连接, 需要对这个字段构建布隆索引
  3) 仅适合做等值连接, 不局限于数据类型
  4) 在建表的时候, 必须指定为那些字段开启: ”orc.bloom.filter.columns”=”pcid,字段2..”

如何使用:

CREATE TABLE lxw1234_orc2 (
   ....
) stored AS ORC
TBLPROPERTIES
(
    'orc.compress'='SNAPPY',
    'orc.create.index'='true',
--     pcid字段开启BloomFilter索引
    "orc.bloom.filter.columns"="pcid"
)

-- 插入数据: 
SELECT CAST(siteid AS INT) AS id,
pcid
FROM lxw1234_text
DISTRIBUTE BY id sort BY id;

-- 使用索引:
SET hive.optimize.index.filter=true --开启 hive的自动使用索引
SELECT COUNT(1) FROM lxw1234_orc1 WHERE id >= 0 AND id <= 1000   (行组索引)
AND pcid IN ('0005E26F0DCCDB56F9041C','A'); (布隆过滤索引)

SET hive.optimize.index.filter=true --开启 hive的自动使用索引  

-- 当然也可以将这个配置, 直接配置到cm的hive的配置窗口中, 进行统一配置

在生产中这些索引如何选择呢?

1) 对于行组索引, 建议是常开即可
    在插入数据, 如果不明确对那个字段使用行组索引, 可以任意的插入, 在条件合适情况下, 也在使用行组索引

2) 布隆过滤索引:  主要将那些用于 等值连接的字段开启索引即可
    一般主要指的是 join关联字段 以及清洗字段

Hive的相关优化

hive的相关的函数(补充说明)

* **if函数:**
  * 作用: 用于进行逻辑判断操作
  * 语法:  if(条件, true返回信息,false返回信息)
  * 注意: if函数支持嵌套使用
  * 说明:  当条件testCondition为TRUE时,返回valueTrue;否则返回valueFalseOrNull。
* **nvl函数:**
  * 作用: null值替换函数
  * 格式: nvl(T value, T default_value)
  * 说明:如果value值为NULL就返回default_value,否则返回value,主要功能就是实现null值替换。
* **COALESCE函数**
  * 作用: 非空查找函数:
  * 格式: coalesce(值1,值2,值3...)
  * 说明: 从第一个值开始判断, 找到第一个不为null的值, 将其返回, 如果都为null,返回null
* **CASE WHEN THEN 函数:**
  * 格式1: case 字段 when 条件 then  值1 when 条件 then 值2 .. else 值3 end
  * 格式2: case  when  条件 then 值1 when 条件2 then 值2 .. else 值3 end
  * 说明:如果a等于b,那么返回c;如果a等于d,那么返回e;否则返回f。注意这种when的判断条件可以有很多个。如CASE 4 WHEN 5  THEN 5 WHEN 4 THEN 4 ELSE 3 END 将返回4。
* **isnull() | isnotnull() 函数**
  * 作用: 
    * isnull() 判断是否为null, 如果为null返回true, 否则返回false
    * isnotnull() 判断是否不为null, 如果不为null, 返回true, 如果为null 返回false

hive的并行优化

1) 并行编译
    - 说明: hive在同一时刻只能编译一个会话中SQL, 如果有多个会话一起来执行SQL, 此时出现排队的情况, 只有当这一个会话中SQL全部编译后, 才能编译另一个会话的SQL, 导致执行效率变慢
    - 解决方案:
        - hive.driver.parallel.compilation 是否开启并行编译 设置为true   
        - hive.driver.parallel.compilation.global.limit 最大允许同时有多少个SQL一起编译 设置为0表示无限制

    说明:
        这两项可以建议直接在CM的hive配置窗口上进行永久配置 (通用配置)

2) 并行执行: 
    - 说明: 在运行一个SQL的时候, 这个SQL形成的执行计划中, 可能会被拆分为多个阶段, 当各个阶段之间没有依赖关系的时候, 可以尝试让多个阶段同时运行, 从而提升运行的效率, 这就是并行执行
    - 配置方案:
        - set hive.exec.parallel=true;  是否开启并行执行
        - set hive.exec.parallel.thread.number=16;  最大允许并行执行的数量

注意:关于并行优化, 必须要求服务器有资源, 如果没有资源, 及时满足并行的条件, 也不会执行

Hive小文件合并

思考: 小文件有什么影响呢?
- HDFS角度: 
    - 每一个小文件, 都会有一份元数据, 当小文件过多后, 会导致出现大量的元数据存储namenonde的内存中, 从而导致内存使用率增大, 一旦namenode内存存满了, 即使datanode依然有空间, 也是无法存储的

- MR角度: 
    - 在运行MR的时候, 每一个文件至少是一个文件切片, 也就意味至少需要运行一个mapTask. 当小文件过多后, 就会导致产生更多的mapTask, 而每一个mapTask只处理极少的数据, 导致资源被大量占用, 运行的时间都没有申请资源时间长
    - 假设: 150M  10M 8M 300M  12M 6M  请问会运行几个mapTask: 9个

从hive角度如何解决小文件过多的问题呢?  在执行SQL的时候, 输出的文件数量尽量变得少一些
- hive.merge.mapfiles : 是否开启map端小文件合并 (适用于MR只有map没有reduce, map输出结果就是最终结果)
- hive.merge.mapredfiles : 是否开启reduce端小文件合并操作
- hive.merge.size.per.task: 合并后输出文件的最大值 ,默认是128M
- hive.merge.smallfiles.avgsize: 判断输出各个文件平均大小, 当这个大小小于设置值, 认为出现了小文件问题,需要进行合并操作

比如说: 设置合并文件后, 输出最大值128M, 设置平均值为 50M
  - 假设一个MR输出一下几个文件: 
    - 1M,10M,5M,3M,150M,80M,2M  平均值:35.xxx
  - 发现输出的多个文件的平均值比设定的平均值要小, 说明出现小文件的问题, 需要进行合并, 此时会合并结果为: 128M,123M

矢量化查询

- 说明: 
  - 让hive在读取数据的时候, 一批一批的读取, 默认是一条一条的读, 一条条的处理, 开启矢量化查询后, 变成一批一批读取, 一批一批的处理
- 前提条件: 表的文件存储格式必须为ORC
- set hive.vectorized.execution.enabled=true;

#### 读取零拷贝
- 说明: 在hive读取数据的时候, 只需要读取跟SQL相关的列的数据即可, 不使用的列, 不进行读取, 从而减少读取数据, 提升效率
- 提前条件: 表的文件存储格式必须为ORC
- 如何开启: set hive.exec.orc.zerocopy=true;
- 示例: A表有 a,b,c,d,e 五个字段
  - select a,b,b  from A where b=xxx and c between xx and xxx;
  - 发现SQL中没有使用d和e两个字段, 如果开启读取零拷贝, 在读取数据的时候, 就不会将d和e这两个字段读取到内存中

数据倾斜的优化

什么是数据倾斜呢?

  • 在运行过程中,有多个reduce, 每一个reduce拿到的数据不是很均匀, 导致其中某一个或者某几个reduce拿到数据量远远大于其他的reduce拿到数据量, 此时认为出现了数据倾斜问题

数据倾斜会导致问题?
1) 执行效率下降(整个执行时间, 就看最后一个reduce结束时间)
2) 由于其中某几个reduce长时间运行, 资源长期被占用, 一旦超时, YARN强制回收资源, 导致运行失败
3) 导致节点出现宕机问题

在执行什么SQL的时候, 会出现多个reduce的情况呢?
1) 多表join的时候
2) 执行group by的时候
3) 执行分桶操作(跟数据倾斜没太大关系)

发生数据倾斜的情况:
1) 执行多表查询的时候
2) 执行group by的时候

group by 数据倾斜

解决方案:

方案一:  采用combiner的方式来解决 (在map端提前聚合)
    核心: 在每一个mapTask进行提前聚合操作, 将聚合之后结果, 发送给reduce中, 完成最终的聚合, 从而减少从map到reduce的数据量, 减轻数据倾斜压力
    配置: 
        set hive.map.aggr=true;  开启map端提前聚合操作(combiner)

方案二:  负载均衡解决方案  (大combiner)
    核心: 采用两个MR来解决, 第一个MR负责将数据均匀落在不同reduce上, 进行聚合统计操作, 形成一个局部的结果, 在运行第二个MR读取第一个MR的局部结果, 按照相同key发往同一个reduce的方案, 完成最终聚合统计操作
    配置: 
        set hive.groupby.skewindata=true;
    注意:
        一旦使用方案二, hive不支持多列上的采用多次distinct去重操作, 一旦使用, 就会报错
            错误内容: DISTINCT on different columns notsupported with skew in data.
        示例: 
            (1) SELECT count(DISTINCT uid) FROM log
            (2) SELECT ip, count(DISTINCT uid) FROM log GROUP BY ip
            (3) SELECT ip, count(DISTINCT uid, uname) FROMlog GROUP BY ip
            (4) SELECT ip, count(DISTINCT uid), count(DISTINCT uname) FROMlog GROUP BY ip
            其中: 1,2,3 是可以正常执行的, 4会报错

join的数据倾斜

解决方案一 : 
    通过采用 map join,bucket map join, SMB map join
    方案: 将reduce端join的操作, 移植到map端进行join即可, 直接将倾斜排除即可, 因为在map端基本不会有倾斜问题
但是: 不管是map join, 还是 bucket map join以及SMB map join在使用的时候 都必须满足相关的条件, 但是很多时候, 我们的环境无法满足这些条件, 那么也就意味无法使用这些解决方案

解决方案二:
    思路: 将那些容易产生倾斜的key的值, 从这个环境中, 排除掉, 这样自然就没有倾斜问题, 讲这些倾斜的数据单独找一个MR来处理即可
    处理方案:  
        编译期解决方案:
            配置:
                set hive.optimize.skewjoin.compiletime=true;
            建表:
                CREATE TABLE list_bucket_single (key STRING, value STRING)
                -- 倾斜的字段和需要拆分的key值
                SKEWED BY (key) ON (1,5,6)
                --  为倾斜值创建子目录单独存放
                [STORED AS DIRECTORIES];
            说明:
                当明确知道表中那些key的值有倾斜问题, 一般采用编译期解决, 在建表的时候, 提前设置好对应值有倾斜即可, 这样在执行的时候, hive会直接将这些倾斜的key的值从这个MR排除掉, 单独找一个MR来处理即可
        运行期解决方案:
            配置: 
                set hive.optimize.skewjoin=true; 是否开启运行期倾斜解决join
                set hive.skewjoin.key=100000; 当key出现多少个的时候, 认为有倾斜
            说明: 
                在执行的过程中, hive会记录每一个key出现的次数, 当出现次数达到设置的阈值后, 认为这个key有倾斜的问题, 直接将这个key对应数据排除掉, 单独找一个MR来处理即可

建议:
    如果提前知道表中有那些key有倾斜, 直接使用编译期即可
    如果仅知道一部分, 对于其他key无法保证, 建议编译期和运行期同时开启

map join相关优化

Map Join
- MapJoin顾名思义,就是在Map阶段进行表之间的连接。而不需要进入到Reduce阶段才进行连接。这样就节省了在Shuffle阶段时要进行的大量数据传输。
- 开启: set hive.auto.convert.join=true;
- 表文件的大小作为开启和关闭MapJoin的阈值: set hive.auto.convert.join.noconditionaltask.size=512000000
  - 旧版本为hive.mapjoin.smalltable.filesize
- MapJoin的使用场景:
  1. 关联操作中有一张表非常小
  2. 不等值的链接操作
  3. 示例:select f.a,f.b from A t join B f  on ( f.a=t.a and f.ftime=20110802)  
- Hive v0.7之前,需要使用hint提示 /*+ mapjoin(table) */才会执行MapJoin: 
  - select \/\*+ mapjoin(A)\*\/ f.a,f.b from A t join B f  on ( f.a=t.a and f.ftime=20110802) 

bucket map join的生效条件:
- set hive.optimize.bucketmapjoin = true;
- 一个表的bucket数是另一个表bucket数的整数倍;
- bucket 列在join条件中;
- 必须是应用在map join的场景中;

大表对大表-join 实现smb join的条件:
- 使用SMB Join来提高性能。SMB Join基于bucket-mapjoin的有序bucket,可实现在map端完成join操作,可以有效地减少或避免shuffle的数据量。
- hive.enforce.sorting 设置为 true。开启强制排序时,插数据到表中会进行强制排序,默认false。
- 插入数据时通过在sql中用distributed c1 sort by c1 或者 cluster by c1
- 表创建时必须是CLUSTERED且SORTED
- create table test_smb_2(mid string,age_id string) CLUSTERED BY(mid) SORTED BY(mid) INTO 500 BUCKETS;
- 涉及到分桶表操作的齐全配置为:
  - 写入数据强制分桶 set hive.enforce.bucketing=true;
  - 写入数据强制排序 set hive.enforce.sorting=true;
  - 开启bucketmapjoin set hive.optimize.bucketmapjoin.sortedmerge = true;
  - 开启SMB Join 
    - set hive.auto.convert.sortmerge.join=true;
    - set hive.auto.convert.sortmerge.join.noconditionaltask=true;
- 一个表的Bucket数要等于另一个表Bucket数;
- bucket列 == join列 == sort列;
- 必须应用在bucket map join的场景中;

union all相关优化点:

配置项:
    set hive.optimize.union.remove=true;
作用:
    此项配置减少对Union all子查询中间结果的二次读写
说明:
    此项配置一般和join的数据倾斜组合使用

如何感知有数据倾斜

方案一: 通过查看 job history历史日志(19888端口)   适用于MR已经执行完成了

方案二: 在运行过程中如何查看是否有倾斜呢?  借助HUE查看作业预览持续时间

关联优化器(共享shuffle)

配置:
    set hive.optimize.correlation=true;
说明:
    在Hive的一些复杂关联查询中,可能同时还包含有group by等能够触发shuffle的操作,有些时候shuffle操作是可以共享的,通过关联优化器选项,可以尽量减少复杂查询中的shuffle,从而提升性能。
比如: 
    select  id,max(id)  from itcast_ods.web_chat_ems group by id;
    union all
    select  id,min(id)  from itcast_ods.web_chat_ems group by id;

数据倾斜优化总结说明

# 常开项: 
# 是否开启并行执行
set hive.exec.parallel=true;
# 最大允许并行执行的数量
set hive.exec.parallel.thread.number=16;
# 矢量化查询
set hive.vectorized.execution.enabled=true;
# 读取零拷贝
set hive.exec.orc.zerocopy=true;
# 关联优化器
set hive.optimize.correlation=true;

# 针对性开启:
# 开启 group by combiner数据倾斜方案
set hive.map.aggr=true;
# 开启groupby 负载均衡优化
set hive.groupby.skewindata=true;
# join的编译期优化
set hive.optimize.skewjoin.compiletime=true;
# 是否开启运行期倾斜解决join
set hive.optimize.skewjoin=true;
# 当key出现多少个的时候, 认为有倾斜
set hive.skewjoin.key=100000;
# union all优化
set hive.optimize.union.remove=true;