SparkCore RDD
SparkCore简介
Spark Core是Spark的核心组件,它提供了分布式数据集的基本操作和计算模型。Spark Core是Spark的核心组件,它提供了分布式数据集的基本操作和计算模型。Spark Core是Spark的核心组件,它提供了分布式数据集的基本操作和计算模型。
SparkCore架构
Spark Core的架构主要包括以下几个部分:
- Driver Program:驱动程序,负责创建SparkContext,并提交作业到集群中执行。
- Cluster Manager:集群管理器,负责管理集群资源,包括节点、CPU、内存等。
- Executor:执行器,负责在集群中的节点上执行任务。
- Task:任务,是Spark中最小的计算单元,由Executor执行。
- RDD:弹性分布式数据集,是Spark的核心数据结构,用于存储和操作大规模数据。
什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。
- Dataset:一个数据集合,用于存放数据的。
- Distributed:RDD中的数据是分布式存储的,可用于分布式计算。
- Resilient:RDD中的数据可以存储在内存中或者磁盘中。
RDD具有以下特点:
- 不可变性:RDD是不可变的,一旦创建就不能修改。
- 分布式:RDD是分布式的,可以在多个节点上并行处理。
- 容错性:RDD具有容错性,如果某个节点上的数据丢失,RDD可以自动从其他节点上恢复。
- 可并行处理:RDD可以并行处理,可以在多个节点上同时执行任务。
RDD 五大特性:
- RDD有分区
- RDD计算方法会作用到每一个分区之上
- RDD之间是有相互依赖的关系
- KV型RDD可以有分区器(可选)
- RDD分区数据的读取会尽量靠近数据的所有地,(在同一台机器读取效率更高)。
RDD的依赖
RDD的依赖:一个RDD的形成可能由一个或者多个RDD得到的,此时这个RDD和之前的RDD之间产生依赖关系
Spark中,RDD之间的依赖关系,只要有两种类型:宽依赖和窄依赖
窄依赖:
- 作用:能够让Spark程序并行计算,也就是一个分区数据计算出现问题的时候,其它分区不受影响
- 特点:父RDD的分区和子RDD的分区是一对一关系,也就是父RDD分区的数据会整个被下游子RDD的分区接收
宽依赖:
- 作用:划分stage的重要依据,宽依赖也叫shuffle依赖
- 特点:父RDD的分区和子RDD的分区关系是一对多的关系,也就是父RDD的分区数据会被划成多份给到下游子RDD的多个分区做接收
- 注意:如果有宽依赖,shuffle下游的其他操作,必须等待shuffle执行完成以后才能够继续执行,为了避免数据的不完整算子中一般以ByKey结尾的会发生shuffle;另外是重分区算子会发生shuffle
特性1:RDD有分区
# RDD中的数据被划分为多个分区,每个分区可以存储在集群中的一个节点上。分区数量可以根据集群的大小和数据的规模进行调整。
# 分区数量越多,可以并行处理的数据量就越大,从而提高计算效率。
pyspark.cmd --master local[*]
>>> sc.parallelize([1,2,3,4,5,6,7,8,9],3).glom().collect()
[[1, 2, 3], [4, 5, 6], [7, 8, 9]]
>>> sc.parallelize([1,2,3,4,5,6,7,8,9],6).glom().collect()
[[1], [2, 3], [4], [5, 6], [7], [8, 9]]
特性2:RDD计算方法会作用到每一个分区之上
# RDD的计算方法会作用到每一个分区之上,从而实现并行计算。例如,map、filter、reduce等操作都会作用到每一个分区之上。
>>> sc.parallelize([1,2,3,4,5,6,7,8,9],3).map(lambda x:x*10).glom().collect()
[[10, 20, 30], [40, 50, 60], [70, 80, 90]]
特性3:RDD之间是有相互依赖的关系
# RDD之间是有相互依赖的关系的,这种依赖关系称为血缘关系。RDD的血缘关系可以用于容错和优化计算。
>>> rdd=sc.parallelize([1,2,3,4,5,6,7,8,9],3)
>>> rdd1=rdd.map(lambda x:x*10)
>>> rdd2=rdd1.filter(lambda x:x>30)
>>> rdd1.collect()
[10, 20, 30, 40, 50, 60, 70, 80, 90]
>>> rdd2.collect()
[40, 50, 60, 70, 80, 90]
# rdd1和rdd2之间有依赖关系,rdd2依赖于rdd1,rdd1依赖于rdd。
特性4:KV型RDD可以有分区器(可选)
# KV型RDD可以有分区器,用于控制数据在分区之间的分布。例如,HashPartitioner和RangePartitioner等。默认HashPartitioner
特性5:RDD的分区规划会尽量规划到存储数据或者最接近数据的服务器上,(在同一台机器读取效率更高)。
RDD的创建
RDD可以通过以下几种方式创建:
- 从集合中创建:通过调用SparkContext的parallelize方法,将一个集合转换为RDD。
- 从外部存储系统中读取:通过调用SparkContext的textFile、sequenceFile等方法,从HDFS、HBase、Cassandra等外部存储系统中读取数据,并转换为RDD。
- 从其他RDD转换:通过调用RDD的map、filter、reduce等方法,将一个RDD转换为另一个RDD。
并行化创建RDD
# 指的是装本地集合转向分布式RDD
# coding: utf-8
# 导入Spark相关包
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("ParallelizeRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 创建一个本地集合
local_list = [1, 2, 3, 4, 5, 6, 7, 8, 9]
# 并行化创建RDD
# rdd = sc.parallelize(local_list) # 默认分区数根据CPU核数决定
rdd = sc.parallelize(local_list, 3) # 初始化数据集,并指定分区数
# 打印RDD分区数
print("RDD分区数:", rdd.getNumPartitions())
# collect()方法将RDD中的数据收集到Driver端,并返回一个列表 print("RDD中的数据:", rdd.collect())
# 打印RDD中的数据
print(rdd.collect())
# 打印每个分区的数据
print(rdd.glom().collect())
# 关闭SparkContext
sc.stop()
读取文件创建RDD textFile API
# coding: utf-8
# 导入Spark相关包
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
sc.stop()
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("TextFileRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 读取文件创建RDD
# sc.textFile(参数1, 参数2)
# 参数1:文件路径,可以是本地路径,也可以是HDFS路径
# 参数2:可选,表示最小分区数量,spark会自己判断,允许范围内有效,超出失效,默认2
rdd = sc.textFile("file:///D:/git-python/大数据-Spark/py/test.py")
# rdd = sc.textFile("hdfs://ns1/user/admin/sh/test.sh")
# 打印RDD分区数
print("RDD分区数:", rdd.getNumPartitions())
# 打印RDD中的数据,前三条
print(rdd.take(3)) # take()方法返回一个列表,包含RDD中的前n个元素
print(rdd.collect()[0:3])
# 打印RDD后三条数据
print(rdd.collect()[-3:])
# 关闭SparkContext
sc.stop()
读取文件创建RDD wholeTextFiles API
# coding: utf-8
# 导入Spark相关包
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
sc.stop()
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("WholeTextFileRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 读取文件创建RDD
# wholeTextFiles API 读取目录下的所有文件,返回一个键值对RDD,键是文件名,值是文件内容
# rdd = sc.wholeTextFiles("hdfs://ns1/user/admin/sh")
rdd = sc.wholeTextFiles("file:///D:/git-python/大数据-Spark/py/*.py")
# # 打印RDD分区数
print("RDD分区数:", rdd.getNumPartitions())
# 打印RDD中的数据
print(rdd.collect()[0:1])
# 关闭SparkContext
sc.stop()
案例
# 读取files目录的文件order.txt,提取北京的数据,组合北京与商品类别并进行出重
from pyspark import SparkContext,SparkConf
import json
if __name__ == '__main__':
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("orderRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
# file_rdd = sc.textFile("file:///D:/git-python/大数据-Spark/files/order.txt")
file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt")
# 进行rdd的转换操作,以|为分隔符,得到一个个的json数据
json_rdd = file_rdd.flatMap(lambda line: line.split("|"))
# 将json数据转换为字典
dict_rdd = json_rdd.map(lambda line: json.loads(line))
# 过滤数据,提取北京的数据
beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == '北京')
# 组合北京与商品类别并进行去重
result_rdd = beijing_rdd.map(lambda d: d['areaName'] + '_' + d['category']).distinct()
print(result_rdd.collect())
sc.stop()
将案例提交到集群上运行
# 终端软件直接运行方式
# 配置好环境变量
import os
# 配置HADOOP配置文件目录
os.environ["HADOOP_CONF_DIR"] = "D:/hadoop/hadoop-3.3.6/etc/hadoop"
# 集群环境读取hdfs上的数据
file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt")
# 如果在代码中运行,如果提交集群环境运行的文件有依赖其他的代码文件
# 使用spark.submit.pyfiles参数设置上传的依赖文件,也可以是多少个依赖文件的zip压缩包
conf.set("spark.submit.pyfiles", "order2.py")
# 通过spark-submit提交到集群上运行
# coding: utf-8
conf = SparkConf().setAppName("orderRDD").setMaster("yarn")
file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt",use_unicode=False) # use_unicode 默认为True,读取文件时,会将文件内容转换为unicode编码,如果文件内容为utf-8编码,则会出现乱码,所以需要设置为False
# spark-submit --master yarn --deploy-mode cluster order.py --num-executors 3
Python环境中文字符串报错
# python2
# coding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
# python3
import sys
import codecs
sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())
python2.x 可用utf-8编码,但是pyspark是用unicode编码的,所以涉及数据交互的时候必须用进行编码和解码;
python2.x 的中文是utf-8编码,需要 x.decode("utf-8") 转换为 unicode;
pyspark 的中文是unicode编码,转换的话可用 x.encode("utf-8")编码成utf-8;
# -*- coding: utf-8 -*-
# coding: utf-8
# coding=utf-8
# import sys
# reload(sys)
# sys.setdefaultencoding('utf-8')
from pyspark import SparkContext,SparkConf
import json
if __name__ == '__main__':
conf = SparkConf().setAppName("orderRDD").setMaster("yarn")
conf.set("spark.executor.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
conf.set("spark.driver.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
conf.set('spark.python.unicode.decode.charset', 'utf-8')
sc = SparkContext(conf=conf)
#file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt")
file_rdd = sc.textFile("hdfs://ns1/user/admin/files/test.txt",use_unicode=False)
print(file_rdd.collect())
json_rdd = file_rdd.flatMap(lambda line: str(line).split("|"))
dict_rdd = json_rdd.map(lambda line: json.loads(line,encoding='utf-8'))
print(dict_rdd.collect())
### 中文匹配筛选时,需要将中文转换为unicode编码
foo_utf = '北京'
foo_unicode = foo_utf.decode("utf-8")
print("-----------------",foo_unicode)
#beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == '北京')
beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == foo_unicode)
#beijing_rdd = dict_rdd.filter(lambda d: d['id'] == 10)
result_rdd = beijing_rdd.map(lambda d: d['areaName'] + '_' + d['category']).distinct()
result_rdd.saveAsTextFile("hdfs://ns1/user/admin/files/test2")
print(result_rdd.collect())
sc.stop()
spark-submit --conf "spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
--conf "spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
--master yarn --deploy-mode client order.py
# --conf spark.pyspark.python=/bin/python3 \
# --conf spark.pyspark.driver.python=/bin/python3 \
spark-submit --conf spark.driver.extraJavaOptions=" -Dfile.encoding=utf-8 " \
--conf spark.executor.extraJavaOptions=" -Dfile.encoding=utf-8 " \
--master yarn --deploy-mode client order.py
# hdfs dfs -cat /user/admin/files/test2/* 北京_家电
# 北京_手机
# 北京_家具
# 北京_书籍
# 北京_服饰
# 北京_电脑
# 北京_食品
# 北京_平板电脑
SparkSession
# -*- coding: utf-8 -*-
# coding: utf-8
# coding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
from pyspark.sql import SparkSession
# 创建 SparkSession
spark = SparkSession.builder \
.appName("Read HDFS File") \
.getOrCreate()
# 读取 HDFS 上的文件
#df = spark.read.format("text").option("header", "true").load("hdfs://ns1/user/admin/files/order.txt")
df = spark.read.option("encoding", "UTF-8").text("hdfs://ns1/user/admin/files/order.txt")
# 显示数据
df.show(100, truncate=False)
wordcount案例
from pyspark import SparkContext, SparkConf
import os
import shutil
import subprocess
# 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
os.environ['SPARK_HOME'] = 'D:\miniforge3\Lib\site-packages\pyspark'
# PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
# os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
# os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
# hdfs操作
def delete_hdfs_directory(directory):
subprocess.call(["hdfs.cmd", "dfs", "-rm", "-r", directory])
def delete_directory(path):
if os.path.exists(path):
try:
shutil.rmtree(path)
print(f"The directory '{path}' has been deleted successfully.")
except OSError as e:
print(f"Error: {e.strerror}")
else:
print(f"The directory '{path}' does not exist.")
if __name__ == '__main__':
print('PySpark First Program')
# TODO: 当应用运行在集群上的时候,MAIN函数就是DriverProgram,必须创建SparkContext对象
# 创建SparkConf对象,设置应用的配置信息,比如应用名称和应用运行模式
conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
# TODO: 构建SparkContext上下文实例对象,读取数据和调度Job执行
sc = SparkContext(conf=conf)
# 设置编码方式
conf.set("spark.sql.repl.eagerEval.enabled", True)
conf.set("spark.sql.repl.eagerEval.truncate", 20)
conf.set("spark.sql.repl.eagerEval.maxNumRows", 100)
conf.set("spark.driver.extraJavaOptions", "-Dfile.encoding=UTF-8")
conf.set("spark.executor.extraJavaOptions", "-Dfile.encoding=UTF-8")
# 第一步、读取本地数据封装到RDD集合,认为列表List
# wordsRDD = sc.textFile("hdfs://ns1/user/admin/sh/test.sh")
wordsRDD = sc.textFile("file:///D:/git-python/大数据-Spark/py/test.py")
# 第二步、处理数据调用RDD中函数,认为调用列表中的函数
# a. 每行数据分割为单词
flatMapRDD = wordsRDD.flatMap(lambda line: line.split(" "))
# b. 转换为二元组,表示每个单词出现一次
mapRDD = flatMapRDD.map(lambda x: (x, 1))
# c. 按照Key分组聚合
resultRDD = mapRDD.reduceByKey(lambda a, b: a + b)
# 第三步、输出数据
res_rdd_col2 = resultRDD.collect()
# 输出到控制台
for line in res_rdd_col2:
print(line)
# 输出到本地文件中
# hdfs目录存在时删除
# delete_hdfs_directory("/user/spark/py/test")
# resultRDD.saveAsTextFile("hdfs://ns1/user/spark/py/test")
# 执行前清空目录
delete_directory("E:\\test\\wordcount")
resultRDD.saveAsTextFile("file:///E:/test/wordcount")
print('停止 PySpark SparkSession 对象')
# 关闭SparkContext
# sc.stop()
spark3.3.4
# coding: utf-8
from pyspark import SparkContext,SparkConf
import json
if __name__ == '__main__':
conf = SparkConf().setAppName("orderRDD").setMaster("yarn")
conf.set("spark.executor.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
conf.set("spark.driver.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
conf.set('spark.python.unicode.decode.charset', 'utf-8')
sc = SparkContext(conf=conf)
file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt")
#file_rdd = sc.textFile("hdfs://ns1/user/admin/files/test.txt",use_unicode=False)
print(file_rdd.collect())
json_rdd = file_rdd.flatMap(lambda line: str(line).split("|"))
dict_rdd = json_rdd.map(lambda line: json.loads(line))
print(dict_rdd.collect())
beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == '北京')
print(beijing_rdd.collect())
result_rdd = beijing_rdd.map(lambda d: d['areaName'] + '_' + d['category']).distinct()
print(result_rdd.collect())
#result_rdd.saveAsTextFile("hdfs://ns1/user/admin/files/test2")
sc.stop()
spark3-submit --master yarn \
--conf "spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
--conf "spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
--conf "spark.default.parallelism=100" \
--executor-memory 8g \
--executor-cores 8 \
--num-executors 1 \
--deploy-mode client order_spark3.py