认识Spark
Spark是什么
定义:Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。
统一分析引擎?
Spark是一款分布式内存计算的统一分析引擎。
其特点就是对任意类型的数据进行自定义计算。
Spark可以计算:结构化、半结构化、非结构化等各种类型的数据结构,同时也支持使用Python、Java、Scala、R以及SQL语言去开发应用
程序计算数据。
Spark的适用面非常广泛,所以,被称之为统一的(适用面广)的分析引擎(数据处理)
Spark和前面学习的Hadoop技术栈有何区别呢?
Spark VS Hadoop(MapReduce)
尽管Spark相对于Hadoop而言具有较大优势,但Spark并不能完全替代Hadoop
- 在计算层面,Spark相比较MR(MapReduce)有巨大的性能优势,但至今仍有许多计算工具基于MR构架,比如非常成熟的Hive
- Spark仅做计算,而Hadoop生态圈不仅有计算(MR)也有存储(HDFS)和资源管理调度(YARN),HDFS和YARN仍是许多大数据
体系的核心架构
面试题:Hadoop的基于进程的计算和Spark基于线程方式优缺点?
- 答案:Hadoop中的MR中每个map/reduce task都是一个java进程方式运行,好处在于进程之间是互相独立的,每个task独享进程资源,没
有互相干扰,监控方便,但是问题在于task之间不方便共享数据,执行效率比较低。比如多个map task读取不同数据源文件需要将数据源加
载到每个map task中,造成重复加载和浪费内存。而基于线程的方式计算是为了数据共享和提高执行效率,Spark采用了线程的最小的执行
单位,但缺点是线程之间会有资源竞争。
【扩展阅读】:线程基本概念
- 线程是CPU的基本调度单位
- 一个进程一般包含多个线程, 一个进程下的多个线程共享进程的资源
- 不同进程之间的线程相互不可见
- 线程不能独立执行
- 一个线程可以创建和撤销另外一个线程
Spark 四大特点
速度快
Spark处理数据与MapReduce处理数据相比,有如下两个不同点:
- 其一、Spark处理数据时,可以将中间处理结果数据存储到内存中;
- 其二、Spark提供了非常丰富的算子(API), 可以做到复杂任务在一个Spark 程序中完成.
易于使用
Spark 的版本已经更新到Spark 3.4.0,支持了包括 Java、Scala、Python 、R和SQL语言在内的多种语言。为了
兼容Spark2.x企业级应用场景,Spark仍然持续更新Spark2版本。
通用性强
在Spark 的基础上,Spark 还提供了包括Spark SQL、Spark Streaming、MLib 及GraphX在内的多个工具库,我们可以在一个应用中无缝
地使用这些工具库。
运行模式多样
Spark 支持多种运行方式,包括在Hadoop 和Mesos 上,也支持Standalone的独立运行模式,同时也可以运行在云Kubernetes(Spark
2.3开始支持)上。对于数据源而言,Spark 支持从HDFS、HBase、Cassandra 及 Kafka 等多种途径获取数据。
Spark 框架模块
整个Spark 框架模块包含:Spark Core、Spark SQL、Spark Streaming、 Spark GraphX、 Spark MLlib,而后四项的能力都是建立在核心引擎之上
- Spark Core:Spark的核心,Spark核心功能均由Spark Core模块提供,是Spark运行的基础。Spark Core以RDD为数据抽象,提供Python、Java、
Scala、R语言的API,可以编程进行海量离线数据批处理计算。 - SparkSQL:基于SparkCore之上,提供结构化数据的处理模块。SparkSQL支持以SQL语言对数据进行处理,SparkSQL本身针对离线计算场景。同
时基于SparkSQL,Spark提供了StructuredStreaming模块,可以以SparkSQL为基础,进行数据的流式计算。 - SparkStreaming:以SparkCore为基础,提供数据的流式计算功能。
- MLlib:以SparkCore为基础,进行机器学习计算,内置了大量的机器学习库和API算法等。方便用户以分布式计算的模式进行机器学习计算。
- GraphX:以SparkCore为基础,进行图计算,提供了大量的图计算API,方便用于以分布式计算模式进行图计算。
Spark的运行模式
Spark提供多种运行模式,包括:
- 本地模式(单机)
- 本地模式就是以一个独立的进程,通过其内部的多个线程来模拟整个Spark运行时环境
- Standalone模式(集群)
- Spark中的各个角色以独立进程的形式存在,并组成Spark集群环境
- Hadoop YARN模式(集群)
- Spark中的各个角色运行在YARN的容器内部,并组成Spark集群环境
- Kubernetes模式(容器集群)
- Spark中的各个角色运行在Kubernetes的容器内部,并组成Spark集群环境
- 云服务模式(运行在云平台)
Spark的架构角色
YARN主要有4类角色,从2个层面去看:
- 资源管理层面
- 集群资源管理者(Master):ResourceManager
- 单机资源管理者(Worker):NodeManager
- 任务计算层面
- 单任务管理者(Master):ApplicationMaster
- 单任务执行者(Worker):Task(容器内计算框架的工作角色)
Spark运行角色:
-
Driver(驱动程序):
- 职责:负责执行用户编写的 Spark 应用程序的 main 函数,并创建 SparkContext。Driver 负责将用户程序转换为任务,调度任务到 Executor 上执行,并监控任务的执行状态。
- 位置:通常运行在客户端或集群管理器(如 YARN、Mesos)中的一个节点上。
-
Executor(执行器):
- 职责:在集群的工作节点上运行,负责执行 Driver 分配的任务。每个 Executor 都有自己的 JVM 进程,可以并行执行多个任务。
- 资源:Executor 会占用一定的内存和 CPU 资源,用于执行任务和缓存数据。
- 位置:运行在集群的工作节点上。
-
Cluster Manager(集群管理器):
- 职责:负责管理集群资源,调度和分配资源给 Spark 应用程序。常见的集群管理器包括:
- Standalone:Spark 自带的简单集群管理器。
- YARN:Hadoop 的资源管理器。
- Mesos:通用的集群管理器。
- Kubernetes:容器编排平台。
- 位置:通常运行在集群的主节点上。
- 职责:负责管理集群资源,调度和分配资源给 Spark 应用程序。常见的集群管理器包括:
-
Worker Node(工作节点):
- 职责:在集群中运行 Executor 的节点,负责执行具体的计算任务。
- 资源:每个 Worker 节点可以运行多个 Executor,具体数量取决于集群的资源配置。
- 位置:运行在集群的工作节点上。
-
Master Node(主节点):
- 职责:在 Standalone 模式下,Master 节点负责管理 Worker 节点,并协调资源的分配。
- 位置:运行在集群的主节点上。
-
Application Master(应用程序主节点):
- 职责:在 YARN 或 Mesos 模式下,Application Master 负责与集群管理器通信,申请资源并启动 Executor。
- 位置:运行在集群的一个节点上。
Spark on YARN 的优势
1) 统一化资源管理
- Standalone 集群的局限性
- Standalone 是 Spark 专用的资源管理集群,其设计初衷是为了专门运行 Spark 程序。这意味着如果企业或组织中存在多种分布式计算需求,使用 Standalone 集群就需要额外为其他类型的分布式程序(如 MR、Tez、Flink 等)搭建不同的资源管理平台。从硬件资源利用和管理成本的角度来看,这种方式效率较低。
- YARN 的通用性
- YARN 是一个功能强大的分布式资源管理平台,它具有高度的通用性。可以在同一个 YARN 集群上运行各种不同类型的分布式程序。在实际工作场景中,硬件集群通常只有一套,选择 YARN 作为统一的资源管理平台,可以更好地整合资源,实现多种分布式计算任务的协同运行,从整个技术架构层面来说,YARN 是更为合适的选择。
2) YARN 调度机制的优势
- 动态资源分配
- YARN 支持动态资源分配,这对于运行 Spark 程序非常有帮助。在 Spark 作业运行过程中,计算资源的需求可能会随着数据处理阶段的不同而发生变化。例如,在数据读取和预处理阶段可能需要较少的资源,而在复杂的计算和聚合阶段可能需要更多的资源。YARN 的动态资源分配机制可以根据 Spark 作业的实际需求,动态地为其分配或回收资源,提高资源的利用率。
- 多种调度机制
- YARN 提供了多种调度机制,如容量调度和公平调度。容量调度可以为不同的用户或业务部门分配固定的资源容量,保证每个用户或部门都能获得稳定的计算资源,适用于多用户共享集群的场景。公平调度则确保每个作业都能公平地获取资源,避免某些作业长时间占用大量资源而导致其他作业饥饿的情况,提高了集群资源分配的公平性和整体作业执行效率。
Spark 应用架构
Spark Application运行到集群上时,由两部分组成:Driver Program和Executors
- Driver Program
- 相当于AppMaster,整个应用管理者,负责应用中所有Job的调度执行;
- 运行JVM Process,运行程序的MAIN函数,必须创建SparkContext上下文对象;
- 一个SparkApplication仅有一个;
- Executors
- 相当于一个线程池,运行JVM Process,其中有很多线程,每个线程运行一个Task任务,一个Task任务运行需要1 Core CPU,所
有可以认为Executor中线程数就等于CPU Core核数; - 一个Spark Application可以有多个,可以设置个数和资源信息;
- 相当于一个线程池,运行JVM Process,其中有很多线程,每个线程运行一个Task任务,一个Task任务运行需要1 Core CPU,所
用户程序从最开始的提交到最终的计算执行,需要经历以下几个阶段:
1) 用户程序创建SparkContext 时,新创建的SparkContext 实例会连接到ClusterManager。 Cluster Manager 会根据用户
提交时设置的CPU 和内存等信息为本次提交分配计算资源,启动Executor。
2) Driver会将用户程序划分为不同的执行阶段Stage,每个执行阶段Stage由一组完全相同Task组成,这些Task分别作用于待处
理数据的不同分区。在阶段划分完成和Task创建后,Driver会向Executor发送Task;
3) Executor在接收到Task后,会下载Task的运行时依赖,在准备好Task的执行环境后,会开始执行Task,并且将Task的运行状态
汇报给Driver;
4) Driver会根据收到的Task的运行状态来处理不同的状态更新。Task分为两种:一种是Shuffle Map Task,它实现数据的重新
洗牌,洗牌的结果保存到Executor 所在节点的文件系统中;另外一种是Result Task,它负责生成结果数据;
5) Driver 会不断地调用Task,将Task发送到Executor执行,在所有的Task 都正确执行或者超过执行次数的限制仍然没有执行成
功时停止;
Spark程序运行层次结构
在一个Spark Application中,包含多个Job,每个Job有多个Stage组成,每个Job执行按照DAG图进行的。
Spark Application程序运行时三个核心概念:Job、Stage、Task,说明如下:
- Job:由多个Task 的并行计算部分,一般Spark 中的action 操作(如 save、collect,后面进一步说明),会生成一个Job。
- Stage:Job 的组成单位,一个Job 会切分成多个Stage,Stage 彼此之间相互依赖顺序执行,而每个Stage 是多个Task 的集合,类似map 和reduce stage。
- Task:被分配到各个Executor 的单位工作内容,它是Spark 中的最小执行单位,一般来说有多少个Paritition(物理层面的概念,即分支可以理解为将数据划分成不同部分并行处理),就会有多少个Task,每个Task 只会处理单一分支上的数据。
SparkOnYarn
SparkOnYarn本质
- Master角色由YARN的ResourceManager担任.
- Worker角色由YARN的NodeManager担任.
- Driver角色运行在YARN容器内或提交任务的客户端进程中
- 真正干活的Executor运行在YARN提供的容器内
Spark On YARN是有两种运行模式的,一种是Cluster模式一种是Client模式.这两种模式的区别就是Driver运行的位置.
- Cluster模式即:Driver运行在YARN容器内部, 和ApplicationMaster在同一个容器内
- Client模式即:Driver运行在客户端进程中, 比如Driver运行在spark-submit程序的进程中
Spark On Yarn两种模式总结
Client模式和Cluster模式最最本质的区别是:Driver程序运行在哪里。
- Client模式:学习测试时使用,生产不推荐(要用也可以,性能略低,稳定性略低)
- Driver运行在Client上,和集群的通信成本高
- Driver输出结果会在客户端显示
- Cluster模式:生产环境中使用该模式
- Driver程序在YARN集群中,和集群的通信成本低
- Driver输出结果不能在客户端显示
- 该模式下Driver运行ApplicattionMaster这个节点上,由Yarn管理,如果出现问题,yarn会重启ApplicattionMaster(Driver)
Spark Shuffle
Map和Reduce
- 在Shuffle过程中.提供数据的称之为Map端(Shuffle Write) 接收数据的称之为Reduce端(Shuffle Read)
- 在Spark的两个阶段中,总是前一个阶段产生一批Map提供数据,下一阶段产生一批Reduce接收数据
Spark 提供2种Shuffle管理器:
- HashShuffleManager
- SortShuffleManager
优化后HashShuffleManager,基本和未优化的一致,不同点在于
- 在一个Executor内,不同Task是共享Buffer缓冲区
- 这样减少了缓冲区乃至写入磁盘文件的数量,提高性能
SortShuffleManager的运行机制主要分成两种
- 一种是普通运行机制
- 另一种是bypass运行机制。
bypass运行机制的触发条件如下:
- shuffle map task数量小于 spark.shuffle.sort.bypassMergeThreshold=200参数的值。
- 不是聚合类的shuffle算子(比如reduceByKey)。
- 和普通模式IDE区别在于: 磁盘写机制不同; 不会进行排序。
- SortShuffle对比HashShuffle可以减少很多的磁盘文件,以节省网络IO的开销
- SortShuffle主要是对磁盘文件进行合并来进行文件数量的减少,同时两类Shuffle都需要经过内存缓冲区溢写磁盘的场景.所以可以得知,尽管Spark是内存迭代计算框架,但是内存迭代主要在窄依赖中.在宽依赖(Shuffle)中磁盘交互还是一个无可避免的情况.所以,我们要尽量减少Shuffle的出现,不要进行无意义的Shuffle计算.
Spark 3.0 新特性
Adaptive Query Execution 自适应查询(SparkSQL)
- 动态合并Shuffle Partitions
- 动态调整Join策略
- 动态优化倾斜Join(Skew Joins)
开启AQE方式
- set spark.sql.adaptive.enabled = true;
动态合并Dynamically coalescing shuffle partitions
- 可以动态调整shuffle分区的数量。用户可以在开始时设置相对较多的shuffle分区数,AQE会在运行时将相邻的小分区合并为较大的分区。
动态调整Join策略Dynamically switching join strategies
- 此优化可以在一定程度上避免由于缺少统计信息或着错误估计大小(当然也可能两种情况同时存在),而导致执行计划性能不佳的情况。这种自适应优化可以在运行时sort merge join转换成broadcast hash join,从而进一步提升性能
动态优化倾斜Join
- skew joins可能导致负载的极端不平衡,并严重降低性能。在AQE从shuffle文件统计信息中检测到任何倾斜后,它可以将倾斜的分区分割成更小的分区,并将它们与另一侧的相应分区连接起来。这种优化可以并行化倾斜处理,获得更好的整体性能。
动态优化倾斜Join 触发条件:
- 分区大小> spark.sql.adaptive.skewJoin.skewedPartitionFactor (default=10) * "median partition size(中位数分区大小)"
- 分区大小> spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default = 256MB)
AQE 总结:
- AQE的开启通过: spark.sql.adaptive.enabled 设置为true开启
- AQE是自动化优化机制, 无需我们设置复杂的参数调整, 开启AQE符合条件即可自动化应用AQE优化
- AQE带来了极大的SparkSQL性能提升
Dynamic Partition Pruning 动态分区裁剪(SparkSQL)
当优化器在编译时无法识别可跳过的分区时,可以使用"动态分区裁剪",即基于运行时推断的信息来进一步进行分区
裁剪。
增强的Python API: PySpark和Koalas
# pip install koalas 安装koalas类库 spark3.2版本以下支持,以上版本已弃用
import pandas as pd
import numpy as np
import databricks.koalas as ks
from pyspark.sql import SparkSession
import pyspark.sql.types as types
try:
from pyspark.sql.types import to_arrow_type, from_arrow_type
except ImportError:
from pyspark.sql.pandas.types import to_arrow_type, from_arrow_type
spark = SparkSession.builder.\
appName("SparkSQL Example").\
master("local[*]").\
config("spark.sql.adaptive.enabled", "true").\
config("spark.sql.shuffle.partitions", "20").\
config("spark.debug.maxToStringFields","100").\
config("spark.sql.warehouse.dir", "hdfs://ns1/user/hive/warehouse").\
config("hive.metastore.uris", "thrift://kube-40:9083").\
enableHiveSupport().\
getOrCreate()
# 构建Pandas的DatetimeIndex
dates = pd.date_range('20130101', periods=6)
# 构建Pandas的DataFrame
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
pdf.head()
# 基于PDF构建Koalas DataFrame
kdf = ks.from_pandas(pdf)
# 或者基于SparkSession构建
# sdf = spark.createDataFrame(pdf)
# kdf = sdf.to_koalas()
# 先转换PandasDataFrame成SparkDataFrame
# 转换SparkDataFrame到KoalasDataFrame
# 或者直接创建kdf也可以
# kdf = ks.DataFrame({'A': ['foo', 'bar', 'foo', 'bar',
# 'foo', 'bar', 'foo', 'foo'],
# 'B': ['one', 'one', 'two', 'three',
# 'two', 'two', 'one', 'three'],
# 'C': np.random.randn(8),
# 'D': np.random.randn(8)})
print(type(kdf))
# kdf.head()
# kdf.describe()
# kdf.index
# kdf.columns
# kdf.dtypes
# kdf.shape
# kdf.info()
# kdf.select('A', 'B').show()
# kdf.select('A', 'B').head()
# kdf.select('A', 'B').tail()
# kdf.select('A', 'B').sample(fraction=0.5, withReplacement=False).show()