1. 认识Spark

认识Spark

Spark是什么

定义:Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。

统一分析引擎?

Spark是一款分布式内存计算的统一分析引擎。

其特点就是对任意类型的数据进行自定义计算。

Spark可以计算:结构化、半结构化、非结构化等各种类型的数据结构,同时也支持使用Python、Java、Scala、R以及SQL语言去开发应用
程序计算数据。

Spark的适用面非常广泛,所以,被称之为统一的(适用面广)的分析引擎(数据处理)

Spark和前面学习的Hadoop技术栈有何区别呢?

Spark VS Hadoop(MapReduce)

尽管Spark相对于Hadoop而言具有较大优势,但Spark并不能完全替代Hadoop

  • 在计算层面,Spark相比较MR(MapReduce)有巨大的性能优势,但至今仍有许多计算工具基于MR构架,比如非常成熟的Hive
  • Spark仅做计算,而Hadoop生态圈不仅有计算(MR)也有存储(HDFS)和资源管理调度(YARN),HDFS和YARN仍是许多大数据
    体系的核心架构

面试题:Hadoop的基于进程的计算和Spark基于线程方式优缺点?

  • 答案:Hadoop中的MR中每个map/reduce task都是一个java进程方式运行,好处在于进程之间是互相独立的,每个task独享进程资源,没
    有互相干扰,监控方便,但是问题在于task之间不方便共享数据,执行效率比较低。比如多个map task读取不同数据源文件需要将数据源加
    载到每个map task中,造成重复加载和浪费内存。而基于线程的方式计算是为了数据共享和提高执行效率,Spark采用了线程的最小的执行
    单位,但缺点是线程之间会有资源竞争。
【扩展阅读】:线程基本概念
  • 线程是CPU的基本调度单位
  • 一个进程一般包含多个线程, 一个进程下的多个线程共享进程的资源
  • 不同进程之间的线程相互不可见
  • 线程不能独立执行
  • 一个线程可以创建和撤销另外一个线程

Spark 四大特点

速度快

Spark处理数据与MapReduce处理数据相比,有如下两个不同点:

  • 其一、Spark处理数据时,可以将中间处理结果数据存储到内存中;
  • 其二、Spark提供了非常丰富的算子(API), 可以做到复杂任务在一个Spark 程序中完成.
易于使用

Spark 的版本已经更新到Spark 3.4.0,支持了包括 Java、Scala、Python 、R和SQL语言在内的多种语言。为了
兼容Spark2.x企业级应用场景,Spark仍然持续更新Spark2版本。

通用性强

在Spark 的基础上,Spark 还提供了包括Spark SQL、Spark Streaming、MLib 及GraphX在内的多个工具库,我们可以在一个应用中无缝
地使用这些工具库。

运行模式多样

Spark 支持多种运行方式,包括在Hadoop 和Mesos 上,也支持Standalone的独立运行模式,同时也可以运行在云Kubernetes(Spark
2.3开始支持)上。对于数据源而言,Spark 支持从HDFS、HBase、Cassandra 及 Kafka 等多种途径获取数据。

Spark 框架模块

整个Spark 框架模块包含:Spark Core、Spark SQL、Spark Streaming、 Spark GraphX、 Spark MLlib,而后四项的能力都是建立在核心引擎之上

  • Spark Core:Spark的核心,Spark核心功能均由Spark Core模块提供,是Spark运行的基础。Spark Core以RDD为数据抽象,提供Python、Java、
    Scala、R语言的API,可以编程进行海量离线数据批处理计算。
  • SparkSQL:基于SparkCore之上,提供结构化数据的处理模块。SparkSQL支持以SQL语言对数据进行处理,SparkSQL本身针对离线计算场景。同
    时基于SparkSQL,Spark提供了StructuredStreaming模块,可以以SparkSQL为基础,进行数据的流式计算。
  • SparkStreaming:以SparkCore为基础,提供数据的流式计算功能。
  • MLlib:以SparkCore为基础,进行机器学习计算,内置了大量的机器学习库和API算法等。方便用户以分布式计算的模式进行机器学习计算。
  • GraphX:以SparkCore为基础,进行图计算,提供了大量的图计算API,方便用于以分布式计算模式进行图计算。

Spark的运行模式

Spark提供多种运行模式,包括:

  • 本地模式(单机)
    • 本地模式就是以一个独立的进程,通过其内部的多个线程来模拟整个Spark运行时环境
  • Standalone模式(集群)
    • Spark中的各个角色以独立进程的形式存在,并组成Spark集群环境
  • Hadoop YARN模式(集群)
    • Spark中的各个角色运行在YARN的容器内部,并组成Spark集群环境
  • Kubernetes模式(容器集群)
    • Spark中的各个角色运行在Kubernetes的容器内部,并组成Spark集群环境
  • 云服务模式(运行在云平台)

Spark的架构角色

YARN主要有4类角色,从2个层面去看:

  • 资源管理层面
    • 集群资源管理者(Master):ResourceManager
    • 单机资源管理者(Worker):NodeManager
  • 任务计算层面
    • 单任务管理者(Master):ApplicationMaster
    • 单任务执行者(Worker):Task(容器内计算框架的工作角色)
Spark运行角色:
  • Driver(驱动程序):

    • 职责:负责执行用户编写的 Spark 应用程序的 main 函数,并创建 SparkContext。Driver 负责将用户程序转换为任务,调度任务到 Executor 上执行,并监控任务的执行状态。
    • 位置:通常运行在客户端或集群管理器(如 YARN、Mesos)中的一个节点上。
  • Executor(执行器):

    • 职责:在集群的工作节点上运行,负责执行 Driver 分配的任务。每个 Executor 都有自己的 JVM 进程,可以并行执行多个任务。
    • 资源:Executor 会占用一定的内存和 CPU 资源,用于执行任务和缓存数据。
    • 位置:运行在集群的工作节点上。
  • Cluster Manager(集群管理器):

    • 职责:负责管理集群资源,调度和分配资源给 Spark 应用程序。常见的集群管理器包括:
      • Standalone:Spark 自带的简单集群管理器。
      • YARN:Hadoop 的资源管理器。
      • Mesos:通用的集群管理器。
      • Kubernetes:容器编排平台。
    • 位置:通常运行在集群的主节点上。
  • Worker Node(工作节点):

    • 职责:在集群中运行 Executor 的节点,负责执行具体的计算任务。
    • 资源:每个 Worker 节点可以运行多个 Executor,具体数量取决于集群的资源配置。
    • 位置:运行在集群的工作节点上。
  • Master Node(主节点):

    • 职责:在 Standalone 模式下,Master 节点负责管理 Worker 节点,并协调资源的分配。
    • 位置:运行在集群的主节点上。
  • Application Master(应用程序主节点):

    • 职责:在 YARN 或 Mesos 模式下,Application Master 负责与集群管理器通信,申请资源并启动 Executor。
    • 位置:运行在集群的一个节点上。

Spark on YARN 的优势

1) 统一化资源管理

  • Standalone 集群的局限性
    • Standalone 是 Spark 专用的资源管理集群,其设计初衷是为了专门运行 Spark 程序。这意味着如果企业或组织中存在多种分布式计算需求,使用 Standalone 集群就需要额外为其他类型的分布式程序(如 MR、Tez、Flink 等)搭建不同的资源管理平台。从硬件资源利用和管理成本的角度来看,这种方式效率较低。
  • YARN 的通用性
    • YARN 是一个功能强大的分布式资源管理平台,它具有高度的通用性。可以在同一个 YARN 集群上运行各种不同类型的分布式程序。在实际工作场景中,硬件集群通常只有一套,选择 YARN 作为统一的资源管理平台,可以更好地整合资源,实现多种分布式计算任务的协同运行,从整个技术架构层面来说,YARN 是更为合适的选择。

2) YARN 调度机制的优势

  • 动态资源分配
    • YARN 支持动态资源分配,这对于运行 Spark 程序非常有帮助。在 Spark 作业运行过程中,计算资源的需求可能会随着数据处理阶段的不同而发生变化。例如,在数据读取和预处理阶段可能需要较少的资源,而在复杂的计算和聚合阶段可能需要更多的资源。YARN 的动态资源分配机制可以根据 Spark 作业的实际需求,动态地为其分配或回收资源,提高资源的利用率。
  • 多种调度机制
    • YARN 提供了多种调度机制,如容量调度和公平调度。容量调度可以为不同的用户或业务部门分配固定的资源容量,保证每个用户或部门都能获得稳定的计算资源,适用于多用户共享集群的场景。公平调度则确保每个作业都能公平地获取资源,避免某些作业长时间占用大量资源而导致其他作业饥饿的情况,提高了集群资源分配的公平性和整体作业执行效率。

Spark 应用架构

Spark Application运行到集群上时,由两部分组成:Driver Program和Executors

  • Driver Program
    • 相当于AppMaster,整个应用管理者,负责应用中所有Job的调度执行;
    • 运行JVM Process,运行程序的MAIN函数,必须创建SparkContext上下文对象;
    • 一个SparkApplication仅有一个;
  • Executors
    • 相当于一个线程池,运行JVM Process,其中有很多线程,每个线程运行一个Task任务,一个Task任务运行需要1 Core CPU,所
      有可以认为Executor中线程数就等于CPU Core核数;
    • 一个Spark Application可以有多个,可以设置个数和资源信息;
用户程序从最开始的提交到最终的计算执行,需要经历以下几个阶段:

1) 用户程序创建SparkContext 时,新创建的SparkContext 实例会连接到ClusterManager。 Cluster Manager 会根据用户
提交时设置的CPU 和内存等信息为本次提交分配计算资源,启动Executor。
2) Driver会将用户程序划分为不同的执行阶段Stage,每个执行阶段Stage由一组完全相同Task组成,这些Task分别作用于待处
理数据的不同分区。在阶段划分完成和Task创建后,Driver会向Executor发送Task;
3) Executor在接收到Task后,会下载Task的运行时依赖,在准备好Task的执行环境后,会开始执行Task,并且将Task的运行状态
汇报给Driver;
4) Driver会根据收到的Task的运行状态来处理不同的状态更新。Task分为两种:一种是Shuffle Map Task,它实现数据的重新
洗牌,洗牌的结果保存到Executor 所在节点的文件系统中;另外一种是Result Task,它负责生成结果数据;
5) Driver 会不断地调用Task,将Task发送到Executor执行,在所有的Task 都正确执行或者超过执行次数的限制仍然没有执行成
功时停止;

Spark程序运行层次结构

在一个Spark Application中,包含多个Job,每个Job有多个Stage组成,每个Job执行按照DAG图进行的。

Spark Application程序运行时三个核心概念:Job、Stage、Task,说明如下:

  • Job:由多个Task 的并行计算部分,一般Spark 中的action 操作(如 save、collect,后面进一步说明),会生成一个Job。
  • Stage:Job 的组成单位,一个Job 会切分成多个Stage,Stage 彼此之间相互依赖顺序执行,而每个Stage 是多个Task 的集合,类似map 和reduce stage。
  • Task:被分配到各个Executor 的单位工作内容,它是Spark 中的最小执行单位,一般来说有多少个Paritition(物理层面的概念,即分支可以理解为将数据划分成不同部分并行处理),就会有多少个Task,每个Task 只会处理单一分支上的数据。

SparkOnYarn

SparkOnYarn本质

  • Master角色由YARN的ResourceManager担任.
  • Worker角色由YARN的NodeManager担任.
  • Driver角色运行在YARN容器内或提交任务的客户端进程中
  • 真正干活的Executor运行在YARN提供的容器内

Spark On YARN是有两种运行模式的,一种是Cluster模式一种是Client模式.这两种模式的区别就是Driver运行的位置.

  • Cluster模式即:Driver运行在YARN容器内部, 和ApplicationMaster在同一个容器内
  • Client模式即:Driver运行在客户端进程中, 比如Driver运行在spark-submit程序的进程中

Spark On Yarn两种模式总结

Client模式和Cluster模式最最本质的区别是:Driver程序运行在哪里。

  • Client模式:学习测试时使用,生产不推荐(要用也可以,性能略低,稳定性略低)
    1. Driver运行在Client上,和集群的通信成本高
    2. Driver输出结果会在客户端显示
  • Cluster模式:生产环境中使用该模式
    1. Driver程序在YARN集群中,和集群的通信成本低
    2. Driver输出结果不能在客户端显示
    3. 该模式下Driver运行ApplicattionMaster这个节点上,由Yarn管理,如果出现问题,yarn会重启ApplicattionMaster(Driver)

Spark Shuffle

Map和Reduce

  • 在Shuffle过程中.提供数据的称之为Map端(Shuffle Write) 接收数据的称之为Reduce端(Shuffle Read)
  • 在Spark的两个阶段中,总是前一个阶段产生一批Map提供数据,下一阶段产生一批Reduce接收数据

Spark 提供2种Shuffle管理器:

  • HashShuffleManager
  • SortShuffleManager

优化后HashShuffleManager,基本和未优化的一致,不同点在于

  1. 在一个Executor内,不同Task是共享Buffer缓冲区
  2. 这样减少了缓冲区乃至写入磁盘文件的数量,提高性能

SortShuffleManager的运行机制主要分成两种

  • 一种是普通运行机制
  • 另一种是bypass运行机制。

bypass运行机制的触发条件如下:

  • shuffle map task数量小于 spark.shuffle.sort.bypassMergeThreshold=200参数的值。
  • 不是聚合类的shuffle算子(比如reduceByKey)。
  • 和普通模式IDE区别在于: 磁盘写机制不同; 不会进行排序。
  1. SortShuffle对比HashShuffle可以减少很多的磁盘文件,以节省网络IO的开销
  2. SortShuffle主要是对磁盘文件进行合并来进行文件数量的减少,同时两类Shuffle都需要经过内存缓冲区溢写磁盘的场景.所以可以得知,尽管Spark是内存迭代计算框架,但是内存迭代主要在窄依赖中.在宽依赖(Shuffle)中磁盘交互还是一个无可避免的情况.所以,我们要尽量减少Shuffle的出现,不要进行无意义的Shuffle计算.

Spark 3.0 新特性

Adaptive Query Execution 自适应查询(SparkSQL)

  • 动态合并Shuffle Partitions
  • 动态调整Join策略
  • 动态优化倾斜Join(Skew Joins)

开启AQE方式

  • set spark.sql.adaptive.enabled = true;

动态合并Dynamically coalescing shuffle partitions

  • 可以动态调整shuffle分区的数量。用户可以在开始时设置相对较多的shuffle分区数,AQE会在运行时将相邻的小分区合并为较大的分区。

动态调整Join策略Dynamically switching join strategies

  • 此优化可以在一定程度上避免由于缺少统计信息或着错误估计大小(当然也可能两种情况同时存在),而导致执行计划性能不佳的情况。这种自适应优化可以在运行时sort merge join转换成broadcast hash join,从而进一步提升性能

动态优化倾斜Join

  • skew joins可能导致负载的极端不平衡,并严重降低性能。在AQE从shuffle文件统计信息中检测到任何倾斜后,它可以将倾斜的分区分割成更小的分区,并将它们与另一侧的相应分区连接起来。这种优化可以并行化倾斜处理,获得更好的整体性能。

动态优化倾斜Join 触发条件:

  1. 分区大小> spark.sql.adaptive.skewJoin.skewedPartitionFactor (default=10) * "median partition size(中位数分区大小)"
  2. 分区大小> spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default = 256MB)

AQE 总结:

  1. AQE的开启通过: spark.sql.adaptive.enabled 设置为true开启
  2. AQE是自动化优化机制, 无需我们设置复杂的参数调整, 开启AQE符合条件即可自动化应用AQE优化
  3. AQE带来了极大的SparkSQL性能提升

Dynamic Partition Pruning 动态分区裁剪(SparkSQL)

当优化器在编译时无法识别可跳过的分区时,可以使用"动态分区裁剪",即基于运行时推断的信息来进一步进行分区
裁剪。

增强的Python API: PySpark和Koalas

# pip install koalas 安装koalas类库  spark3.2版本以下支持,以上版本已弃用
import pandas as pd
import numpy as np
import databricks.koalas as ks
from pyspark.sql import SparkSession
import pyspark.sql.types as types

try:
    from pyspark.sql.types import to_arrow_type, from_arrow_type
except ImportError:
    from pyspark.sql.pandas.types import to_arrow_type, from_arrow_type

spark = SparkSession.builder.\
    appName("SparkSQL Example").\
    master("local[*]").\
    config("spark.sql.adaptive.enabled", "true").\
    config("spark.sql.shuffle.partitions", "20").\
    config("spark.debug.maxToStringFields","100").\
    config("spark.sql.warehouse.dir", "hdfs://ns1/user/hive/warehouse").\
    config("hive.metastore.uris", "thrift://kube-40:9083").\
    enableHiveSupport().\
    getOrCreate()

# 构建Pandas的DatetimeIndex
dates = pd.date_range('20130101', periods=6)
# 构建Pandas的DataFrame
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
pdf.head()
# 基于PDF构建Koalas DataFrame
kdf = ks.from_pandas(pdf)
# 或者基于SparkSession构建
# sdf = spark.createDataFrame(pdf)
# kdf = sdf.to_koalas()
# 先转换PandasDataFrame成SparkDataFrame
# 转换SparkDataFrame到KoalasDataFrame

# 或者直接创建kdf也可以
# kdf = ks.DataFrame({'A': ['foo', 'bar', 'foo', 'bar',
#  'foo', 'bar', 'foo', 'foo'],
#  'B': ['one', 'one', 'two', 'three',
#  'two', 'two', 'one', 'three'],
#  'C': np.random.randn(8),
#  'D': np.random.randn(8)})

print(type(kdf))

# kdf.head()

# kdf.describe()

# kdf.index

# kdf.columns

# kdf.dtypes

# kdf.shape

# kdf.info()

# kdf.select('A', 'B').show()

# kdf.select('A', 'B').head()

# kdf.select('A', 'B').tail()

# kdf.select('A', 'B').sample(fraction=0.5, withReplacement=False).show()