7. 共享变量与Spark 内核调度

共享变量与Spark 内核调度

广播变量

# coding:utf8
# 示例代码-引出广播变量
import time

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    stu_info_list = [(1, '张大仙', 11),
                     (2, '王晓晓', 13),
                     (3, '张甜甜', 11),
                     (4, '王大力', 11)]

    score_info_rdd = sc.parallelize([
        (1, '语文', 99),
        (2, '数学', 99),
        (3, '英语', 99),
        (4, '编程', 99),
        (1, '语文', 99),
        (2, '编程', 99),
        (3, '语文', 99),
        (4, '英语', 99),
        (1, '语文', 99),
        (3, '英语', 99),
        (2, '编程', 99)
    ])

    def map_func(data):
        id = data[0]
        name = ""
        # 匹配本地list和分布式rdd中的学生ID  匹配成功后 即可获得当前学生的姓名
        for stu_info in stu_info_list:
            stu_id = stu_info[0]
            if id == stu_id:
                name = stu_info[1]

        return (name, data[1], data[2])

    print(score_info_rdd.map(map_func).collect())
    sc.stop()
- 场景: 本地集合对象 和 分布式集合对象(RDD) 进行关联的时候
- 本地list对象,被发送到每个分区的处理线程上使用,也就是一个executor内,其实存放了多个一样的数据
- executor是进程,进程内资源共享,造成了内存的浪费
- 如果将本地list对象标记为广播变量对象,那么上述场景出现的时候,Spark会给每个executor只发送一份广播变量对象,这样节省了内存。
- 可以节省:
  1. 网络IO的次数
  2. Executor的内存占用

解决方案-广播变量

使用方式:
1. 将本地list 标记成广播变量即可
  - broadcast =sc.broadcast(stu_info_list)
2. 使用广播变量,从broadcast对象中取出本地list对象即可
  - value = broadcast.value

先放进去broadcast内部,然后从broadcast内部在取出来用,中间传输的是broadcast这个对象了,而不是stu_info_list这个对象了

只要中间传输的是broadcast对象,spark就会留意,只会给每个Executor发一份了,而不是傻傻的哪个分区要都给
# coding:utf8
# 示例代码-引出广播变量
import time

from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    stu_info_list = [(1, '张大仙', 11),
                     (2, '王晓晓', 13),
                     (3, '张甜甜', 11),
                     (4, '王大力', 11)]
    # 1. 将本地Python List对象标记为广播变量
    broadcast = sc.broadcast(stu_info_list)

    score_info_rdd = sc.parallelize([
        (1, '语文', 99),
        (2, '数学', 99),
        (3, '英语', 99),
        (4, '编程', 99),
        (1, '语文', 99),
        (2, '编程', 99),
        (3, '语文', 99),
        (4, '英语', 99),
        (1, '语文', 99),
        (3, '英语', 99),
        (2, '编程', 99)
    ])

    def map_func(data):
        id = data[0]
        name = ""
        # 匹配本地list和分布式rdd中的学生ID  匹配成功后 即可获得当前学生的姓名
        # 2. 在使用到本地集合对象的地方, 从广播变量中取出来用即可
        for stu_info in broadcast.value:
            stu_id = stu_info[0]
            if id == stu_id:
                name = stu_info[1]

        return (name, data[1], data[2])

    print(score_info_rdd.map(map_func).collect())
    sc.stop()

累加器

# 对map算子计算中的数据,进行计数累加,得到全部数据计算完后的累加结果
# coding:utf8
# 没有累加器的代码演示
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)

    count = 0

    def map_func(data):
        global count
        count += 1
        print(count)

    rdd2 = rdd.map(map_func).collect()
    print(count)
    sc.stop()

上述代码的问题在于:

  • count来自driver对象,当在分布式的map算子中需要count对象的时候
  • driver会将count对象发送给每一个executor一份(复制发送)
  • 每个executor各自收到一个,在最后执行print(count)的时候,这个被打印的count依旧是driver的那个
  • 所以不管executor中累加到多少,都和driver这个count无关

累加器的注意事项:

  • 使用累加器的时候,要注意,因为rdd是过程数据,如果rdd被多次使用,可能会重新构建此rdd
  • 如果累加器累加代码,存在重新构建的步骤中,累加器累加代码就可能被多次执行.
  • 如何解决: 加缓存 或者CheckPoint即可
# coding:utf8
# 累加器代码演示
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)

    # Spark提供的累加器变量, 参数是初始值
    acmlt = sc.accumulator(0)

    def map_func(data):
        global acmlt
        acmlt += 1
        # print(acmlt)

    rdd2 = rdd.map(map_func)
    rdd2.cache() # 缓存rdd2
    rdd2.collect()

    rdd3 = rdd2.map(lambda x:x)
    rdd3.collect()
    print(acmlt)  # 启用cache 结果为10,不启用cache rdd2会工作两次,结果为20
    sc.stop()

广播变量和累加器的综合案例

# coding:utf8
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
import re

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # 1. 读取数据文件
    file_rdd = sc.textFile("file:///D:/git-python/大数据-Spark/files/accumulator_broadcast_data.txt")

    # 特殊字符的list定义
    abnormal_char = [",", ".", "!", "#", "$", "%"]

    # 2. 将特殊字符list 包装成广播变量
    broadcast = sc.broadcast(abnormal_char)

    # 3. 对特殊字符出现次数做累加, 累加使用累加器最好
    acmlt = sc.accumulator(0)

    # 4. 数据处理, 先处理数据的空行, 在Python中有内容就是True None就是False
    lines_rdd = file_rdd.filter(lambda line: line.strip())

    # 5. 去除前后的空格
    data_rdd = lines_rdd.map(lambda line: line.strip())

    # 6. 对数据进行切分, 按照正则表达式切分, 因为空格分隔符某些单词之间是两个或多个空格
    # 正则表达式 \s+ 表示 不确定多少个空格, 最少一个空格
    words_rdd = data_rdd.flatMap(lambda line: re.split("\s+", line))

    # 7. 当前words_rdd中有正常单词 也有特殊符号.
    # 现在需要过滤数据, 保留正常单词用于做单词计数, 在过滤 的过程中 对特殊符号做计数
    def filter_func(data):
        """过滤数据, 保留正常单词用于做单词计数, 在过滤 的过程中 对特殊符号做计数"""
        global acmlt
        # 取出广播变量中存储的特殊符号list
        abnormal_chars = broadcast.value
        if data in abnormal_chars:
            # 表示这个是 特殊字符
            acmlt += 1
            return False
        else:
            return True

    normal_words_rdd = words_rdd.filter(filter_func)
    # 8. 正常单词的单词计数逻辑
    result_rdd = normal_words_rdd.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)

    print("正常单词计数结果: ", result_rdd.collect())
    print("特殊字符数量: ", acmlt)

Spark 内核调度

DAG和Stage

DAG:有向无环图,只要描述一段执行任务,从开始一直往下走,不允许出现回调操作

Spark应用程序中,遇到一个Action算子,就会触发一个JOB任务的产生

对于每个JOB的任务,都会产生一个DAG执行流程图,流程图的形成的层级关系如下:

层级关系:

  1. 一个spark应用程序→遇到一个Action算子,就会触发形成一个JOB任务
  2. 一个JOB任务只有一个DAG有向无环图
  3. 一个DAG有向无环图→有多个stage
  4. 一个stage→有多个Task线程
  5. 一个RDD→有多个分区
  6. 一个分区会被一个Task线程所处理

RDD的依赖

RDD的依赖:一个RDD的形成可能由一个或者多个RDD得到的,此时这个RDD和之前的RDD之间产生依赖关系

Spark中,RDD之间的依赖关系,只要有两种类型:宽依赖和窄依赖

窄依赖:

  • 作用:能够让Spark程序并行计算,也就是一个分区数据计算出现问题的时候,其它分区不受影响
  • 特点:父RDD的分区和子RDD的分区是一对一关系,也就是父RDD分区的数据会整个被下游子RDD的分区接收

宽依赖:

  • 作用:划分stage的重要依据,宽依赖也叫shuffle依赖
  • 特点:父RDD的分区和子RDD的分区关系是一对多的关系,也就是父RDD的分区数据会被划成多份给到下游子RDD的多个分区做接收
  • 注意:如果有宽依赖,shuffle下游的其他操作,必须等待shuffle执行完成以后才能够继续执行,为了避免数据的不完整算子中一般以ByKey结尾的会发生shuffle;另外是重分区算子会发生shuffle

DAG执行流程图形成和Stage划分

  1. spark应用程序遇到Action算子后,就会触发一个JOB任务的产生,JOB任务就会将它所依赖的算子全部加载进来,形成一个stage
  2. 接着从action算子从后往前回溯,遇到窄依赖就将算子放在同一个stage中,如果遇到宽依赖,就划分形成新的stage,最后一直到回溯完成

Stage内部流程

默认并行度值的确认:

  1. 使用textFile读取HDFS上的文件,因此RDD分区数=max(文件的block块数量,defaultminpartition),继续需要知道defaultminpartition的值是多少
  2. defaultminpartition=min(spark.default.parallelism,2)取最小值,最终确认spark.default.parallelism的参数值就能最终确认RDD的分区数有多少个

spark.default.parallelism参数值的确认:

  1. 如果有父RDD,就取父RDD的最大分区数
  2. 如果没有父RDD,根据集群模式进行取值
    • 本地模式:机器的最大cpu核数
    • Mesos:默认是8
    • 其它模式:所有执行节点上的核总数或2,以较大者为准

Sort shuffle的普通机制

每个上游task线程处理数据,数据处理完以后,先放在内存中,接着对内存中的数据进行分区,排序,将内存中的数据溢写到磁盘,形成一个个小文件,溢写完成后,将多个小文件合并成一个大的磁盘文件,并且针对每个大的磁盘文件,提供一个索引文件,接着是下游Task根据索引文件来读取相应的数据

Job调度流程

  1. Driver进程启动后,底层PY4J创建SparkContext顶级对象,在创建该对象的进程中,还会创建另外两个对象,分别是:DAGScheduler和TaskScheduler
    • DAGScheduler:DAG调度器,将Job任务形成DAG有向无环图和划分Stage的阶段
    • TaskScheduler:Task调度器,将Task线程分配给到具体的Executor执行
  2. 一个saprk程序遇到一个action算子触发产生一个job任务,SparkContext将job任务给到DAG调度器,拿到job任务后,会将job任务形成有向无环图和划分stage阶段,并且确定每个stage有多少个Task线程,会将众多的Task线程放到TaskSet的集合中,DAG调度器将TaskSet集合给到Task调度器
  3. Task调度器拿到TaskSet集合以后,将Task分配给到具体的Executor执行,底层是基于SchedulerBackend调度队列来实现的
  4. Executor开始执行任务,并且Driver会监控各个Executor的执行状态,知道所有的Executor执行完成,就认为任务运行结束
  5. Driver通知Namenode释放资源

Spark程序的调度流程如图:

  1. Driver被构建出来
  2. 构建SparkContext(执行环境入口对象)
  3. 基于DAG Scheduler(DAG调度器) 构建逻辑Task分配
  4. 基于TaskScheduler (Task调度器) 将逻辑Task分配到各个Executor上干活,并监控它们
  5. Worker(Executor),被TaskScheduler管理监控,听从它们的指令干活,并定期汇报进度

层级关系梳理:

  1. 一个Spark环境可以运行多个Application
  2. 一个代码运行起来,会成为一个Application
  3. Application 内部可以有多个Job
  4. 每个Job由一个Action产生,并且每个Job有自己的DAG执行图
  5. 一个Job的DAG图 会基于宽窄依赖划分成不同的阶段
  6. 不同阶段内基于分区数量,形成多个并行的内存迭代管道
  7. 每一个内存迭代管道形成一个Task(DAG 调度器划分将Job内划分出县体的task任务,一个Job被划分出来的task 在逻辑上称之为这个Job的taskset)

Spark RDD并行度

整个Spark应用中,影响并行度的因素有以下两个原因:

  1. 资源的并行度:Executor数量和CPU核数以及内存的大小
  2. 数据的并行度:Task的线程和分区数量
  3. 一般将Task想层数量设置为CPU核数的2-3倍,另外每个线程分配3-5GB的内存资源
  4. 说明: spark.default.parallelism该参数是SparkCore中的参数。该参数只会影响shuffle以后的分区数量。另外该参数对parallelize并行化本地集合创建的RDD不起作用。
  5. 全局并行度配置的参数: spark.default.parallelism
  6. 优先级从高到低:
    • 代码中
    • 客户端提交参数中
    • 配置文件中
    • 默认(1,但是不会全部以1来跑,多数时候基于读取文件的分片数量来作为默认并行度)
# 全局并行度-推荐
# 配置文件中: conf/spark-defaults.conf 中设置
spark.default.parallelism 100
# 在客户端提交参数中:
bin/spark-submit --conf "spark.default.parallelism=100"
# 在代码中设置:
conf = SaprkConf()
conf.set("spark.default.parallelism", "100")

# 针对RDD的并行度设置-不推荐
# 只能在代码中写,算子:
repartition算子
coalesce算子
partitionBy算子

# 集群中如何规划并行度
结论: 设置为CPU总核心的2~10倍
比如集群可用CPU核心是100个,我们建议并行度是200~1000
确保是CPU核心的整数倍即可,最小是2倍,最大一般10倍或更高(适量)均可