4. SparkCore RDD算子

SparkCore RDD算子

算子:分布式集合对象上的API称之为算子

算子分类:

  • Transformation(转换):将RDD从一个类型转换成另一个类型,返回一个新的RDD
  • Action(动作):对RDD进行计算,返回一个值,或者将数据写入外部存储系统

Transformation算子

  • map:对RDD中的每个元素应用函数,返回一个新的RDD
  • filter:过滤RDD中的元素,返回一个新的RDD
  • flatMap:将RDD中的每个元素应用函数,返回一个新的RDD,然后进行解除嵌套操作,将结果展平
  • mapPartitions:对RDD中的每个分区应用函数,返回一个新的RDD
  • mapPartitionsWithIndex:对RDD中的每个分区应用函数,并传递分区索引,返回一个新的RDD
  • sample:对RDD进行采样,返回一个新的RDD
  • union:合并两个RDD,返回一个新的RDD
  • intersection:求两个RDD的交集,返回一个新的RDD
  • distinct:去重,返回一个新的RDD
  • groupByKey:对RDD中的元素按照key进行分组,返回一个新的RDD
  • reduceByKey:对RDD中的元素按照key进行聚合,返回一个新的RDD
  • sortByKey:对RDD中的元素按照key进行排序,返回一个新的RDD
  • join:对两个RDD进行内连接,返回一个新的RDD
  • cogroup:对两个RDD进行分组连接,返回一个新的RDD
  • cartesian:对两个RDD进行笛卡尔积,返回一个新的RDD
  • coalesce:减少RDD的分区数量,返回一个新的RDD
  • repartition:重新分区RDD,返回一个新的RDD
  • partitionBy:按照给定的分区函数重新分区RDD,返回一个新的RDD
  • cache:将RDD缓存到内存中
  • persist:将RDD持久化到磁盘或内存中
  • checkpoint:将RDD持久化到磁盘,并清除之前的缓存
  • count:计算RDD中的元素数量
  • first:返回RDD中的第一个元素
  • take:返回RDD中的前n个元素
  • reduce:对RDD中的元素进行聚合操作

Action算子

  • reduce:对RDD中的元素进行聚合操作
  • collect:将RDD中的所有元素收集到Driver端,返回一个列表
  • count:计算RDD中的元素数量
  • take:返回RDD中的前n个元素
  • saveAsTextFile:将RDD中的元素保存为文本文件
  • saveAsSequenceFile:将RDD中的元素保存为SequenceFile
  • saveAsObjectFile:将RDD中的元素保存为对象文件
  • countByKey:对RDD中的元素按照key进行计数,返回一个字典
  • foreach:对RDD中的每个元素应用函数
  • takeOrdered:返回RDD中的前n个元素,按照自然顺序排序
  • top:返回RDD中的前n个元素,按照自然顺序排序
  • reduceByKeyLocally:对RDD中的元素按照key进行聚合,返回一个字典
  • lookup:返回RDD中指定key的所有元素
  • fold:对RDD中的元素进行折叠操作

常用Transformation算子

map算子

# 语法:rdd.map(func)
# coding: utf-8
# 导入Spark相关包
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("MapRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # 读取文件创建RDD
    rdd = sc.textFile("file:///D:/git-python/大数据-Spark/py/*.py")

    # 使用map算子将RDD中的每一行转换为大写
    rdd = rdd.map(lambda x: x.upper())

    print(rdd.collect()[0:1])

    sc.stop()

flatMap算子

# 语法:rdd.flatMap(func)
# 功能:对rdd执行map操作,然后进行解嵌套操作
# coding: utf-8
# 导入Spark相关包
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    sc.stop()
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("flatMapRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize(["hello world", "hello spark", "hello python"])

    # 使用map与flatMap算子将RDD中的元素进行分隔
    rdd1 = rdd.map(lambda x: x.split(" "))
    rdd2 = rdd.flatMap(lambda x: x.split(" "))

    print('map:',rdd1.collect())
    print('flapMap:',rdd2.collect())

    sc.stop()

    # [['hello', 'world'], ['hello', 'spark'], ['hello', 'python']]  # map
    # ['hello', 'world', 'hello', 'spark', 'hello', 'python']   # flatMap 解除了嵌套

reduceByKey算子

# 语法:rdd.reduceByKey(func)
# 功能:针对KV型RDD中的元素按照key进行分组,然后对每个分组中的元素进行聚合操作
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("reduceByKeyRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1), ('b', 1), ('a', 1)])

    # 使用reduceByKey算子对RDD中的元素按照key进行分组,然后对每个分组中的元素进行求和操作
    rdd = rdd.reduceByKey(lambda x, y: x + y)

    print(rdd.collect())

    sc.stop()

##  groupByKey和reduceByKey的区别
# groupByKey仅仅有分组功能而已
# reduceByKey除了有 ByKey 的分组功能外,还有reduce 聚合功能,是一个分组+聚合一体化的算子.

mapValues算子

# 语法:rdd.mapValues(func)
# 功能:针对KV型RDD中的value进行map操作
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("mapValuesRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4), ('a', 5)])

    # 使用mapValues算子对RDD中的value进行map操作
    rdd = rdd.mapValues(lambda x: x * 2)

    print(rdd.collect())

    sc.stop()

groupBy算子

# 语法:rdd.groupBy(func)
# 功能:针对RDD中的元素按照指定的函数进行分组
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("groupByRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])

    # 使用groupBy算子对RDD中的元素进行偶数和奇数分组
    rdd2 = rdd.groupBy(lambda x: 'even' if (x % 2 == 0) else 'odd')

    print(rdd2.collect())
    # [('odd', <pyspark.resultiterable.ResultIterable object at 0x00000265B0A23DC0>), ('even', <pyspark.resultiterable.ResultIterable object at 0x00000265B08BF010>)]
    # 将rdd2的元素的value转换成list
    print(rdd2.map(lambda x: (x[0], list(x[1]))).collect())
    # [('odd', [1, 3, 5, 7, 9]), ('even', [2, 4, 6, 8, 10])]
    # 关闭SparkContext
    sc.stop()

Filter算子

# 语法:rdd.filter(func)
# 功能:针对RDD中的元素进行过滤,返回一个新的RDD
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("filterRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])

    # 使用filter算子对RDD中的元素进行过滤,返回一个新的RDD
    rdd2 = rdd.filter(lambda x: x % 2 == 0)

    print(rdd2.collect())

    sc.stop()

distinct算子

# 语法:rdd.distinct()
# 功能:对RDD中的元素进行去重操作
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("distinctRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10,1,2,3,4,5,6,7,8,9,10])
    # 使用distinct算子对RDD中的元素进行去重操作
    rdd2 = rdd.distinct()

    print(rdd2.collect())

    sc.stop()

union算子

# 语法:rdd.union(otherRDD)
# 功能:对两个RDD进行合并操作,返回一个新的RDD
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("unionRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
    rdd2 = sc.parallelize([11,12,13,14,15,16,17,18,19,20])
    # 使用union算子对两个RDD进行合并操作
    rdd3 = rdd.union(rdd2)

    print(rdd3.collect())

    sc.stop()

join算子

# 语法:rdd1.join(rdd2),rdd1.leftOuterJoin(rdd2),rdd1.rightOuterJoin(rdd2)
# 功能:对两个RDD进行join操作(可实现sql的内/外连接)
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("joinRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([(1,2),(3,4),(5,6)])
    rdd2 = sc.parallelize([(1,7),(3,8),(5,9)])
    # 使用join算子对两个RDD进行join操作
    # 对于join算子,如果两个RDD中存在相同的key,则将对应的value进行合并
    rdd3 = rdd.join(rdd2)

    print(rdd3.collect())

    sc.stop()

intersection算子

# 语法:rdd.intersection(otherRDD)
# 功能:对两个RDD进行交集操作,返回一个新的RDD
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("intersectionRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
    rdd2 = sc.parallelize([5,6,7,8,9])
    # 使用intersection算子对两个RDD进行交集操作
    rdd3 = rdd.intersection(rdd2)

    print(rdd3.collect())

    sc.stop()

glom算子

# 语法:rdd.glom()
# 功能:将RDD中的每个分区合并为一个数组
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("glomRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20],5)
    # 使用glom算子将RDD中的每个分区合并为一个数组
    rdd2 = rdd.glom()

    print(rdd2.collect())
    sc.stop()

groupByKey算子

# 语法:rdd.groupByKey()
# 功能:将RDD中相同key的元素进行分组,返回一个新的RDD
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("groupByKeyRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([(1,2),(3,4),(5,6),(1,7),(3,8),(5,9)])
    # 使用groupByKey算子将RDD中相同key的元素进行分组
    rdd2 = rdd.groupByKey()

    # print(rdd2.collect())
    print(rdd2.mapValues(list).collect())   # 将value转换为列表
    sc.stop()

sortBy算子

# 语法:rdd.sortBy(lambda x:x[0],ascending=True, numPartitions=None)
# numPartitions:指定分区数,默认为None,表示不改变分区数,设置后多excutor时,排序结果可能不同
# 功能:对RDD中的元素进行排序,返回一个新的RDD
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("sortByRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([(1,2),(3,4),(5,6),(1,7),(3,8),(5,9)])
    # 使用sortBy算子对RDD中的元素进行排序
    rdd2 = rdd.sortBy(lambda x:x[0],ascending=True)

    print(rdd2.collect())
    sc.stop()

sortByKey算子

# 语法:rdd.sortByKey()
# 功能:针对kv型RDD中的元素按时key进行排序,返回一个新的RDD
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("sortByKeyRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a',1),('b',2),('D',3),('c',4),('R',6),('g',7),('d',8),('K',9)])
    # 使用sortByKey算子对RDD中的元素进行排序,默认为升序,keyfunc=lambda key:str(key).lower()将key转换为小写字母
    rdd2 = rdd.sortByKey(ascending=True,keyfunc=lambda key:str(key).lower())

    print(rdd2.collect())
    sc.stop()

案例

# 读取files目录的文件order.txt,提取北京的数据,组合北京与商品类别并进行出重
from pyspark import SparkContext,SparkConf
import json

if __name__ == '__main__':
    # 初始化执行环境,构建SparkContext对象
    conf = SparkConf().setAppName("orderRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    # file_rdd = sc.textFile("file:///D:/git-python/大数据-Spark/files/order.txt")
    file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt")
    # 进行rdd的转换操作,以|为分隔符,得到一个个的json数据
    json_rdd = file_rdd.flatMap(lambda line: line.split("|"))
    # 将json数据转换为字典
    dict_rdd = json_rdd.map(lambda line: json.loads(line))

    # 过滤数据,提取北京的数据
    beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == '北京')

    # 组合北京与商品类别并进行去重
    result_rdd = beijing_rdd.map(lambda d: d['areaName'] + '_' + d['category']).distinct()

    print(result_rdd.collect())

    sc.stop()

将案例提交到集群上运行

# 终端软件直接运行方式
# 配置好环境变量
import os
# 配置HADOOP配置文件目录
os.environ["HADOOP_CONF_DIR"] = "D:/hadoop/hadoop-3.3.6/etc/hadoop"
# 集群环境读取hdfs上的数据
file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt")

# 如果在代码中运行,如果提交集群环境运行的文件有依赖其他的代码文件
# 使用spark.submit.pyfiles参数设置上传的依赖文件,也可以是多少个依赖文件的zip压缩包
conf.set("spark.submit.pyfiles", "order2.py")

# 通过spark-submit提交到集群上运行
# coding: utf-8
conf = SparkConf().setAppName("orderRDD").setMaster("yarn")
file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt",use_unicode=False)  # use_unicode 默认为True,读取文件时,会将文件内容转换为unicode编码,如果文件内容为utf-8编码,则会出现乱码,所以需要设置为False
# spark-submit --master yarn --deploy-mode cluster order.py  --num-executors 3

Python环境中文字符串报错

# python2
# coding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')

# python3
import sys
import codecs
sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())

python2.x 可用utf-8编码,但是pyspark是用unicode编码的,所以涉及数据交互的时候必须用进行编码和解码;

python2.x 的中文是utf-8编码,需要 x.decode("utf-8") 转换为 unicode;

pyspark 的中文是unicode编码,转换的话可用 x.encode("utf-8")编码成utf-8;
# -*- coding: utf-8 -*-
# coding: utf-8
# coding=utf-8
# import sys
# reload(sys)
# sys.setdefaultencoding('utf-8')

from pyspark import SparkContext,SparkConf
import json

if __name__ == '__main__':
    conf = SparkConf().setAppName("orderRDD").setMaster("yarn")
    conf.set("spark.executor.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
    conf.set("spark.driver.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
    conf.set('spark.python.unicode.decode.charset', 'utf-8')
    sc = SparkContext(conf=conf)
    #file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt")
    file_rdd = sc.textFile("hdfs://ns1/user/admin/files/test.txt",use_unicode=False)
    print(file_rdd.collect())
    json_rdd = file_rdd.flatMap(lambda line: str(line).split("|"))
    dict_rdd = json_rdd.map(lambda line: json.loads(line,encoding='utf-8'))
    print(dict_rdd.collect())
    ### 中文匹配筛选时,需要将中文转换为unicode编码
    foo_utf = '北京'
    foo_unicode = foo_utf.decode("utf-8")
    print("-----------------",foo_unicode)
    #beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == '北京')
    beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == foo_unicode)
    #beijing_rdd = dict_rdd.filter(lambda d: d['id'] == 10)
    result_rdd = beijing_rdd.map(lambda d: d['areaName'] + '_' + d['category']).distinct()
    result_rdd.saveAsTextFile("hdfs://ns1/user/admin/files/test2")
    print(result_rdd.collect())
    sc.stop()
spark-submit --conf "spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8"  \
             --conf "spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8"  \
             --master yarn --deploy-mode client order.py
             #  --conf spark.pyspark.python=/bin/python3 \
             #  --conf spark.pyspark.driver.python=/bin/python3 \

spark-submit --conf spark.driver.extraJavaOptions=" -Dfile.encoding=utf-8 " \
             --conf spark.executor.extraJavaOptions=" -Dfile.encoding=utf-8 " \
             --master yarn --deploy-mode client order.py

# hdfs dfs -cat /user/admin/files/test2/*                                                                                            北京_家电
# 北京_手机
# 北京_家具
# 北京_书籍
# 北京_服饰
# 北京_电脑
# 北京_食品
# 北京_平板电脑

SparkSession

# -*- coding: utf-8 -*-
# coding: utf-8
# coding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')

from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder \
    .appName("Read HDFS File") \
    .getOrCreate()

# 读取 HDFS 上的文件
#df = spark.read.format("text").option("header", "true").load("hdfs://ns1/user/admin/files/order.txt")
df = spark.read.option("encoding", "UTF-8").text("hdfs://ns1/user/admin/files/order.txt")

# 显示数据
df.show(100, truncate=False)

常用Action算子

countByKey算子

# 功能:统计每个key出现的次数,一般用于KV型RDD
# 语法:countByKey()
# 返回值:一个字典,key为RDD中的key,value为key出现的次数
# coding: utf-8
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("countByKeyRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([(1,2),(3,4),(5,6),(1,7),(3,8),(5,9)])
    result = rdd.countByKey()
    print(result)
    sc.stop()

collect算子

# 功能:将RDD中的数据收集到Driver端,形成一个List对象
# 语法:collect()
# 返回值:一个List对象
# 注意:如果RDD的数据量过大,不建议使用collect(),否则会导致Driver内存溢出
# coding: utf-8
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("collectRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([(1,2),(3,4),(5,6),(1,7),(3,8),(5,9)])
    result = rdd.collect()
    print(result)
    sc.stop()

reduce算子

# 功能:对RDD中的所有元素进行聚合操作
# 语法:reduce(func)
# 返回值:一个聚合后的值
# coding: utf-8
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("reduceRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
    result = rdd.reduce(lambda x,y:x+y)
    print(result)
    sc.stop()

fold算子

# 功能:对RDD中的所有元素进行聚合操作,初始值为zeroValue
# 语法:fold(zeroValue, func)
# 返回值:一个聚合后的值
# coding: utf-8
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("foldRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
    print(rdd.glom().collect())
    result = rdd.reduce(lambda x,y:x+y)
    result2 = rdd.fold(10, lambda x,y:x+y)
    print(result,result2)
    sc.stop()

    # 10+(10+1+2+3)+(10+4+5+6)+(10+7+8+9)
    print(10+(10+1+2+3)+(10+4+5+6)+(10+7+8+9))

first算子

# 功能:返回RDD中的第一个元素
# 语法:first()
# 返回值:一个元素
# coding: utf-8
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("firstRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
    result = rdd.first()
    print(result)
    sc.stop()

take算子

# 功能:返回RDD中的前n个元素
# 语法:take(n)
# 返回值:一个List对象
# coding: utf-8
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("takeRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
    result = rdd.take(3)
    print(result)
    sc.stop()

top算子

# 功能:返回RDD中按照自然顺序排序的前n个元素,降序排列
# 语法:top(n)
# 返回值:一个List对象
# coding: utf-8
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("topRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
    result = rdd.top(3)
    print(result)
    sc.stop()

count算子

# 功能:返回RDD中的元素个数
# 语法:count()
# 返回值:一个整数
# coding: utf-8
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("countRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
    result = rdd.count()
    print(result)
    sc.stop()

takeSample算子

# 功能:从RDD中随机抽取n个元素
# 语法:takeSample(withReplacement, num, [seed])
# 参数:
# withReplacement:是否放回
# num:抽取的元素个数
# seed:随机数种子
# 返回值:一个List对象
# coding: utf-8
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("takeSampleRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
    result = rdd.takeSample(True,3,1)
    print(result)
    sc.stop()

takeOrdered算子

# 功能:对RDD中的元素按照自然顺序排序,并返回前n个元素,升序排列
# 语法:takeOrdered(n)
# 返回值:一个List对象
# coding: utf-8
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("takeOrderedRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
    result = rdd.takeOrdered(3)
    # 转换为负数,实现降序排列
    result2 = rdd.takeOrdered(3,lambda x:-x)
    print(result,result2)
    sc.stop()

foreach算子

# 功能:对RDD中的每个元素执行func操作,注意:foreach不能保证顺序,分区Executor的直接执行,跳过Driver
# 语法:foreach(func)
# 返回值:无
# coding: utf-8
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("foreachRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
    rdd2 = rdd.foreach(lambda x: print(x * 10))
    print(rdd2)  # None
    sc.stop()

# 40
# 20
# 30
# 90
# 70
# 60
# 80
# 10
# None

saveAsTextFile算子

# 功能:将RDD中的元素保存到文件中,注意:分区Executor的直接执行,跳过Driver,写出的时候,每个分区所在的Executor直接控制数据写出到目标文件系统中。
# 语法:saveAsTextFile(path)
# 参数:
# path:文件路径
# 返回值:无
# coding: utf-8
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("saveAsTextFileRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
    # rdd.saveAsTextFile("file:///d:/test")
    rdd.saveAsTextFile("hdfs://ns1/user/admin/files/test3")
    sc.stop()

分区操作算子

mapPartitions算子-Transformation

# 功能:将每个分区中的元素传递给函数,并将返回的元素合并成一个大的RDD
# 语法:mapPartitions(func)
# 参数:
# func:函数,函数的输入是一个迭代器,返回一个迭代器
# 返回值:一个新的RDD
# coding: utf-8
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("mapPartitionsRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)  # map会对每个元素进行操作,而mapPartitions则对每个分区进行操作,效率更高
    def func(iterator):
        yield sum(iterator)
    result = rdd.mapPartitions(func)
    print(result.collect())
    sc.stop()
# [6,15,24]

foreachPartition算子-Action

# 功能:对RDD中的每个分区执行func操作
# 语法:foreachPartition(func)
# 参数:
# func:函数,函数的输入是一个迭代器,无返回值
# 返回值:无
# coding: utf-8
from pyspark import SparkContext,SparkConf
# 不生成字节码
import sys
sys.dont_write_bytecode = True

if __name__ == '__main__':
    conf = SparkConf().setAppName("foreachPartitionRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
    def func(iterator):
        print("=="*20)
        result = []
        # print(sum(iterator))
        for i in iterator:
            result.append(i * 10)
        print(result)
    rdd.foreachPartition(func)
    sc.stop()

# ========================================
# [10, 20, 30]
# ========================================
# [70, 80, 90]
# ========================================
# [40, 50, 60]

partitionBy算子-Transformation

# 功能:根据给定的Partitioner函数重新分区
# 语法:partitionBy(partitioner)
# 参数:
# partitioner:Partitioner函数,默认为HashPartitioner
# 返回值:一个新的RDD
# coding: utf-8
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("partitionByRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4), ('a', 5), ('c', 2), ('c', 3)],3)

    # 分区函数
    def partitionFunc(key):
        if key == 'a':
            return 0
        elif key == 'b':
            return 1
        else:
            return 2

    result = rdd.partitionBy(3, partitionFunc)
    print(result.glom().collect())
    sc.stop()

# 注意:分区号不要超标,你设置3个分区,分区号只能是012,设置5个分区分区号只能是01234

repartition算子-Transformation

# 功能:根据给定的分区数重新分区
# 语法:repartition(numPartitions)
# 参数:
# numPartitions:新的分区数
# 返回值:一个新的RDD
# coding: utf-8
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("repartitionRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4), ('a', 5), ('c', 2), ('c', 3)],3)
    print(rdd.glom().collect())
    print(rdd.repartition(2).glom().collect())
    print(rdd.repartition(1).glom().collect())
    sc.stop()

# 注意: 对分区的数量进行操作,一定要慎重
# 一般情况下,我们写Spark代码 除了要求全局排序设置为1个分区外
# 多数时候,所有API中关于分区相关的代码我们都不太理会
# 因为,如果你改分区了
# 会影响并行计算(内存迭代的并行管道数量)
# 分区如果增加,极大可能 导致 shuffle

coalesce算子-Transformation

# 功能:根据给定的分区数减少分区
# 语法:rdd.coalesce(参数1,参数2)
# 参数1,分区数
# 参数2,True or False
# True表示允许shuffle,也就是可以加分区
# False表示不允许shuffle,也就是不能加分区,False是默认
# 返回值:一个新的RDD
# coding: utf-8
from pyspark import SparkContext,SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("coalesceRDD").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4), ('a', 5), ('c', 2), ('c', 3)],3)
    rdd.cache()
    print(rdd.glom().collect())
    print(rdd.coalesce(2).glom().collect())
    print(rdd.coalesce(1).glom().collect())
    print(rdd.coalesce(5).glom().collect())  # 不写参数2=True,这个API只能减少分区,增加分区操作将什么事情也不做
    rdd.unpersist()
    sc.stop()

# 注意: coalesce算子默认不会进行shuffle操作
# 增加分区需要shuffle=True,默认是False
# print(rdd.coalesce(5,shuffle=True).glom().collect())
# 如果不写参数2=True,这个API只能减少分区,增加分区操作将什么事情也不做