3. SparkCore RDD

SparkCore RDD

SparkCore简介

Spark Core是Spark的核心组件,它提供了分布式数据集的基本操作和计算模型。Spark Core是Spark的核心组件,它提供了分布式数据集的基本操作和计算模型。Spark Core是Spark的核心组件,它提供了分布式数据集的基本操作和计算模型。

SparkCore架构

Spark Core的架构主要包括以下几个部分:

  • Driver Program:驱动程序,负责创建SparkContext,并提交作业到集群中执行。
  • Cluster Manager:集群管理器,负责管理集群资源,包括节点、CPU、内存等。
  • Executor:执行器,负责在集群中的节点上执行任务。
  • Task:任务,是Spark中最小的计算单元,由Executor执行。
  • RDD:弹性分布式数据集,是Spark的核心数据结构,用于存储和操作大规模数据。

什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。

  • Dataset:一个数据集合,用于存放数据的。
  • Distributed:RDD中的数据是分布式存储的,可用于分布式计算。
  • Resilient:RDD中的数据可以存储在内存中或者磁盘中。

RDD具有以下特点:

  • 不可变性:RDD是不可变的,一旦创建就不能修改。
  • 分布式:RDD是分布式的,可以在多个节点上并行处理。
  • 容错性:RDD具有容错性,如果某个节点上的数据丢失,RDD可以自动从其他节点上恢复。
  • 可并行处理:RDD可以并行处理,可以在多个节点上同时执行任务。

RDD 五大特性:

  1. RDD有分区
  2. RDD计算方法会作用到每一个分区之上
  3. RDD之间是有相互依赖的关系
  4. KV型RDD可以有分区器(可选)
  5. RDD分区数据的读取会尽量靠近数据的所有地,(在同一台机器读取效率更高)。

RDD的依赖

RDD的依赖:一个RDD的形成可能由一个或者多个RDD得到的,此时这个RDD和之前的RDD之间产生依赖关系

Spark中,RDD之间的依赖关系,只要有两种类型:宽依赖和窄依赖

窄依赖:

  • 作用:能够让Spark程序并行计算,也就是一个分区数据计算出现问题的时候,其它分区不受影响
  • 特点:父RDD的分区和子RDD的分区是一对一关系,也就是父RDD分区的数据会整个被下游子RDD的分区接收

宽依赖:

  • 作用:划分stage的重要依据,宽依赖也叫shuffle依赖
  • 特点:父RDD的分区和子RDD的分区关系是一对多的关系,也就是父RDD的分区数据会被划成多份给到下游子RDD的多个分区做接收
  • 注意:如果有宽依赖,shuffle下游的其他操作,必须等待shuffle执行完成以后才能够继续执行,为了避免数据的不完整算子中一般以ByKey结尾的会发生shuffle;另外是重分区算子会发生shuffle

特性1:RDD有分区

# RDD中的数据被划分为多个分区,每个分区可以存储在集群中的一个节点上。分区数量可以根据集群的大小和数据的规模进行调整。
# 分区数量越多,可以并行处理的数据量就越大,从而提高计算效率。
pyspark.cmd --master local[*]
>>> sc.parallelize([1,2,3,4,5,6,7,8,9],3).glom().collect()
[[1, 2, 3], [4, 5, 6], [7, 8, 9]]
>>> sc.parallelize([1,2,3,4,5,6,7,8,9],6).glom().collect()
[[1], [2, 3], [4], [5, 6], [7], [8, 9]]

特性2:RDD计算方法会作用到每一个分区之上

# RDD的计算方法会作用到每一个分区之上,从而实现并行计算。例如,map、filter、reduce等操作都会作用到每一个分区之上。
>>> sc.parallelize([1,2,3,4,5,6,7,8,9],3).map(lambda x:x*10).glom().collect()
[[10, 20, 30], [40, 50, 60], [70, 80, 90]]

特性3:RDD之间是有相互依赖的关系

# RDD之间是有相互依赖的关系的,这种依赖关系称为血缘关系。RDD的血缘关系可以用于容错和优化计算。
>>> rdd=sc.parallelize([1,2,3,4,5,6,7,8,9],3)
>>> rdd1=rdd.map(lambda x:x*10)
>>> rdd2=rdd1.filter(lambda x:x>30)
>>> rdd1.collect()
[10, 20, 30, 40, 50, 60, 70, 80, 90]
>>> rdd2.collect()
[40, 50, 60, 70, 80, 90]
# rdd1和rdd2之间有依赖关系,rdd2依赖于rdd1,rdd1依赖于rdd。

特性4:KV型RDD可以有分区器(可选)

# KV型RDD可以有分区器,用于控制数据在分区之间的分布。例如,HashPartitioner和RangePartitioner等。默认HashPartitioner

特性5:RDD的分区规划会尽量规划到存储数据或者最接近数据的服务器上,(在同一台机器读取效率更高)。

RDD的创建

RDD可以通过以下几种方式创建:

  • 从集合中创建:通过调用SparkContext的parallelize方法,将一个集合转换为RDD。
  • 从外部存储系统中读取:通过调用SparkContext的textFile、sequenceFile等方法,从HDFS、HBase、Cassandra等外部存储系统中读取数据,并转换为RDD。
  • 从其他RDD转换:通过调用RDD的map、filter、reduce等方法,将一个RDD转换为另一个RDD。

并行化创建RDD

# 指的是装本地集合转向分布式RDD
# coding: utf-8
# 导入Spark相关包
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("ParallelizeRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # 创建一个本地集合
    local_list = [1, 2, 3, 4, 5, 6, 7, 8, 9]

    # 并行化创建RDD
    # rdd = sc.parallelize(local_list)   # 默认分区数根据CPU核数决定
    rdd = sc.parallelize(local_list, 3)   # 初始化数据集,并指定分区数

    # 打印RDD分区数
    print("RDD分区数:", rdd.getNumPartitions())

    # collect()方法将RDD中的数据收集到Driver端,并返回一个列表 print("RDD中的数据:", rdd.collect())
    # 打印RDD中的数据
    print(rdd.collect())

    # 打印每个分区的数据
    print(rdd.glom().collect())

    # 关闭SparkContext
    sc.stop()

读取文件创建RDD textFile API

# coding: utf-8
# 导入Spark相关包
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    sc.stop()
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("TextFileRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # 读取文件创建RDD
    # sc.textFile(参数1, 参数2)
    # 参数1:文件路径,可以是本地路径,也可以是HDFS路径
    # 参数2:可选,表示最小分区数量,spark会自己判断,允许范围内有效,超出失效,默认2
    rdd = sc.textFile("file:///D:/git-python/大数据-Spark/py/test.py")
    # rdd = sc.textFile("hdfs://ns1/user/admin/sh/test.sh")

    # 打印RDD分区数
    print("RDD分区数:", rdd.getNumPartitions())

    # 打印RDD中的数据,前三条
    print(rdd.take(3))  # take()方法返回一个列表,包含RDD中的前n个元素
    print(rdd.collect()[0:3])

    # 打印RDD后三条数据
    print(rdd.collect()[-3:])

    # 关闭SparkContext
    sc.stop()

读取文件创建RDD wholeTextFiles API

# coding: utf-8
# 导入Spark相关包
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    sc.stop()
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("WholeTextFileRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # 读取文件创建RDD
    # wholeTextFiles API 读取目录下的所有文件,返回一个键值对RDD,键是文件名,值是文件内容
    # rdd = sc.wholeTextFiles("hdfs://ns1/user/admin/sh")
    rdd = sc.wholeTextFiles("file:///D:/git-python/大数据-Spark/py/*.py")

    # # 打印RDD分区数
    print("RDD分区数:", rdd.getNumPartitions())

    # 打印RDD中的数据
    print(rdd.collect()[0:1])

    # 关闭SparkContext
    sc.stop()

案例

# 读取files目录的文件order.txt,提取北京的数据,组合北京与商品类别并进行出重
from pyspark import SparkContext,SparkConf
import json

if __name__ == '__main__':
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("orderRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    # file_rdd = sc.textFile("file:///D:/git-python/大数据-Spark/files/order.txt")
    file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt")
    # 进行rdd的转换操作,以|为分隔符,得到一个个的json数据
    json_rdd = file_rdd.flatMap(lambda line: line.split("|"))
    # 将json数据转换为字典
    dict_rdd = json_rdd.map(lambda line: json.loads(line))

    # 过滤数据,提取北京的数据
    beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == '北京')

    # 组合北京与商品类别并进行去重
    result_rdd = beijing_rdd.map(lambda d: d['areaName'] + '_' + d['category']).distinct()

    print(result_rdd.collect())

    sc.stop()

将案例提交到集群上运行

# 终端软件直接运行方式
# 配置好环境变量
import os
# 配置HADOOP配置文件目录
os.environ["HADOOP_CONF_DIR"] = "D:/hadoop/hadoop-3.3.6/etc/hadoop"
# 集群环境读取hdfs上的数据
file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt")

# 如果在代码中运行,如果提交集群环境运行的文件有依赖其他的代码文件
# 使用spark.submit.pyfiles参数设置上传的依赖文件,也可以是多少个依赖文件的zip压缩包
conf.set("spark.submit.pyfiles", "order2.py")

# 通过spark-submit提交到集群上运行
# coding: utf-8
conf = SparkConf().setAppName("orderRDD").setMaster("yarn")
file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt",use_unicode=False)  # use_unicode 默认为True,读取文件时,会将文件内容转换为unicode编码,如果文件内容为utf-8编码,则会出现乱码,所以需要设置为False
# spark-submit --master yarn --deploy-mode cluster order.py  --num-executors 3

Python环境中文字符串报错

# python2
# coding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')

# python3
import sys
import codecs
sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())

python2.x 可用utf-8编码,但是pyspark是用unicode编码的,所以涉及数据交互的时候必须用进行编码和解码;

python2.x 的中文是utf-8编码,需要 x.decode("utf-8") 转换为 unicode;

pyspark 的中文是unicode编码,转换的话可用 x.encode("utf-8")编码成utf-8;
# -*- coding: utf-8 -*-
# coding: utf-8
# coding=utf-8
# import sys
# reload(sys)
# sys.setdefaultencoding('utf-8')

from pyspark import SparkContext,SparkConf
import json

if __name__ == '__main__':
    conf = SparkConf().setAppName("orderRDD").setMaster("yarn")
    conf.set("spark.executor.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
    conf.set("spark.driver.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
    conf.set('spark.python.unicode.decode.charset', 'utf-8')
    sc = SparkContext(conf=conf)
    #file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt")
    file_rdd = sc.textFile("hdfs://ns1/user/admin/files/test.txt",use_unicode=False)
    print(file_rdd.collect())
    json_rdd = file_rdd.flatMap(lambda line: str(line).split("|"))
    dict_rdd = json_rdd.map(lambda line: json.loads(line,encoding='utf-8'))
    print(dict_rdd.collect())
    ### 中文匹配筛选时,需要将中文转换为unicode编码
    foo_utf = '北京'
    foo_unicode = foo_utf.decode("utf-8")
    print("-----------------",foo_unicode)
    #beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == '北京')
    beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == foo_unicode)
    #beijing_rdd = dict_rdd.filter(lambda d: d['id'] == 10)
    result_rdd = beijing_rdd.map(lambda d: d['areaName'] + '_' + d['category']).distinct()
    result_rdd.saveAsTextFile("hdfs://ns1/user/admin/files/test2")
    print(result_rdd.collect())
    sc.stop()
spark-submit --conf "spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8"  \
             --conf "spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8"  \
             --master yarn --deploy-mode client order.py
             #  --conf spark.pyspark.python=/bin/python3 \
             #  --conf spark.pyspark.driver.python=/bin/python3 \

spark-submit --conf spark.driver.extraJavaOptions=" -Dfile.encoding=utf-8 " \
             --conf spark.executor.extraJavaOptions=" -Dfile.encoding=utf-8 " \
             --master yarn --deploy-mode client order.py

# hdfs dfs -cat /user/admin/files/test2/*                                                                                            北京_家电
# 北京_手机
# 北京_家具
# 北京_书籍
# 北京_服饰
# 北京_电脑
# 北京_食品
# 北京_平板电脑

SparkSession

# -*- coding: utf-8 -*-
# coding: utf-8
# coding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')

from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder \
    .appName("Read HDFS File") \
    .getOrCreate()

# 读取 HDFS 上的文件
#df = spark.read.format("text").option("header", "true").load("hdfs://ns1/user/admin/files/order.txt")
df = spark.read.option("encoding", "UTF-8").text("hdfs://ns1/user/admin/files/order.txt")

# 显示数据
df.show(100, truncate=False)

wordcount案例

from pyspark import SparkContext, SparkConf
import os
import shutil
import subprocess

# 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
os.environ['SPARK_HOME'] = 'D:\miniforge3\Lib\site-packages\pyspark'
# PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
# os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
# os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

# hdfs操作
def delete_hdfs_directory(directory):
    subprocess.call(["hdfs.cmd", "dfs", "-rm", "-r", directory])

def delete_directory(path):
    if os.path.exists(path):
        try:
            shutil.rmtree(path)
            print(f"The directory '{path}' has been deleted successfully.")
        except OSError as e:
            print(f"Error: {e.strerror}")
    else:
        print(f"The directory '{path}' does not exist.")

if __name__ == '__main__':
    print('PySpark First Program')
    # TODO: 当应用运行在集群上的时候,MAIN函数就是DriverProgram,必须创建SparkContext对象
    # 创建SparkConf对象,设置应用的配置信息,比如应用名称和应用运行模式
    conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
    # TODO: 构建SparkContext上下文实例对象,读取数据和调度Job执行
    sc = SparkContext(conf=conf)
    # 设置编码方式
    conf.set("spark.sql.repl.eagerEval.enabled", True)
    conf.set("spark.sql.repl.eagerEval.truncate", 20)
    conf.set("spark.sql.repl.eagerEval.maxNumRows", 100)
    conf.set("spark.driver.extraJavaOptions", "-Dfile.encoding=UTF-8")
    conf.set("spark.executor.extraJavaOptions", "-Dfile.encoding=UTF-8")

    # 第一步、读取本地数据封装到RDD集合,认为列表List
    # wordsRDD = sc.textFile("hdfs://ns1/user/admin/sh/test.sh")
    wordsRDD = sc.textFile("file:///D:/git-python/大数据-Spark/py/test.py")
    # 第二步、处理数据调用RDD中函数,认为调用列表中的函数
    # a. 每行数据分割为单词
    flatMapRDD = wordsRDD.flatMap(lambda line: line.split(" "))
    # b. 转换为二元组,表示每个单词出现一次
    mapRDD = flatMapRDD.map(lambda x: (x, 1))
    # c. 按照Key分组聚合
    resultRDD = mapRDD.reduceByKey(lambda a, b: a + b)
    # 第三步、输出数据
    res_rdd_col2 = resultRDD.collect()
    # 输出到控制台
    for line in res_rdd_col2:
        print(line)
    # 输出到本地文件中
    # hdfs目录存在时删除
    # delete_hdfs_directory("/user/spark/py/test")
    # resultRDD.saveAsTextFile("hdfs://ns1/user/spark/py/test")
    # 执行前清空目录
    delete_directory("E:\\test\\wordcount")
    resultRDD.saveAsTextFile("file:///E:/test/wordcount")
    print('停止 PySpark SparkSession 对象')
    # 关闭SparkContext
    # sc.stop()

spark3.3.4

# coding: utf-8
from pyspark import SparkContext,SparkConf
import json

if __name__ == '__main__':
    conf = SparkConf().setAppName("orderRDD").setMaster("yarn")
    conf.set("spark.executor.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
    conf.set("spark.driver.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
    conf.set('spark.python.unicode.decode.charset', 'utf-8')
    sc = SparkContext(conf=conf)
    file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt")
    #file_rdd = sc.textFile("hdfs://ns1/user/admin/files/test.txt",use_unicode=False)
    print(file_rdd.collect())
    json_rdd = file_rdd.flatMap(lambda line: str(line).split("|"))
    dict_rdd = json_rdd.map(lambda line: json.loads(line))
    print(dict_rdd.collect())
    beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == '北京')
    print(beijing_rdd.collect())
    result_rdd = beijing_rdd.map(lambda d: d['areaName'] + '_' + d['category']).distinct()
    print(result_rdd.collect())
    #result_rdd.saveAsTextFile("hdfs://ns1/user/admin/files/test2")
    sc.stop()
spark3-submit --master yarn \
 --conf "spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
 --conf "spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
 --conf "spark.default.parallelism=100" \
 --executor-memory 8g \
 --executor-cores 8 \
 --num-executors 1 \
 --deploy-mode client order_spark3.py