SparkCore RDD算子
算子:分布式集合对象上的API称之为算子
算子分类:
- Transformation(转换):将RDD从一个类型转换成另一个类型,返回一个新的RDD
- Action(动作):对RDD进行计算,返回一个值,或者将数据写入外部存储系统
Transformation算子
- map:对RDD中的每个元素应用函数,返回一个新的RDD
- filter:过滤RDD中的元素,返回一个新的RDD
- flatMap:将RDD中的每个元素应用函数,返回一个新的RDD,然后进行解除嵌套操作,将结果展平
- mapPartitions:对RDD中的每个分区应用函数,返回一个新的RDD
- mapPartitionsWithIndex:对RDD中的每个分区应用函数,并传递分区索引,返回一个新的RDD
- sample:对RDD进行采样,返回一个新的RDD
- union:合并两个RDD,返回一个新的RDD
- intersection:求两个RDD的交集,返回一个新的RDD
- distinct:去重,返回一个新的RDD
- groupByKey:对RDD中的元素按照key进行分组,返回一个新的RDD
- reduceByKey:对RDD中的元素按照key进行聚合,返回一个新的RDD
- sortByKey:对RDD中的元素按照key进行排序,返回一个新的RDD
- join:对两个RDD进行内连接,返回一个新的RDD
- cogroup:对两个RDD进行分组连接,返回一个新的RDD
- cartesian:对两个RDD进行笛卡尔积,返回一个新的RDD
- coalesce:减少RDD的分区数量,返回一个新的RDD
- repartition:重新分区RDD,返回一个新的RDD
- partitionBy:按照给定的分区函数重新分区RDD,返回一个新的RDD
- cache:将RDD缓存到内存中
- persist:将RDD持久化到磁盘或内存中
- checkpoint:将RDD持久化到磁盘,并清除之前的缓存
- count:计算RDD中的元素数量
- first:返回RDD中的第一个元素
- take:返回RDD中的前n个元素
- reduce:对RDD中的元素进行聚合操作
Action算子
- reduce:对RDD中的元素进行聚合操作
- collect:将RDD中的所有元素收集到Driver端,返回一个列表
- count:计算RDD中的元素数量
- take:返回RDD中的前n个元素
- saveAsTextFile:将RDD中的元素保存为文本文件
- saveAsSequenceFile:将RDD中的元素保存为SequenceFile
- saveAsObjectFile:将RDD中的元素保存为对象文件
- countByKey:对RDD中的元素按照key进行计数,返回一个字典
- foreach:对RDD中的每个元素应用函数
- takeOrdered:返回RDD中的前n个元素,按照自然顺序排序
- top:返回RDD中的前n个元素,按照自然顺序排序
- reduceByKeyLocally:对RDD中的元素按照key进行聚合,返回一个字典
- lookup:返回RDD中指定key的所有元素
- fold:对RDD中的元素进行折叠操作
常用Transformation算子
map算子
# 语法:rdd.map(func)
# coding: utf-8
# 导入Spark相关包
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("MapRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 读取文件创建RDD
rdd = sc.textFile("file:///D:/git-python/大数据-Spark/py/*.py")
# 使用map算子将RDD中的每一行转换为大写
rdd = rdd.map(lambda x: x.upper())
print(rdd.collect()[0:1])
sc.stop()
flatMap算子
# 语法:rdd.flatMap(func)
# 功能:对rdd执行map操作,然后进行解嵌套操作
# coding: utf-8
# 导入Spark相关包
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
sc.stop()
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("flatMapRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(["hello world", "hello spark", "hello python"])
# 使用map与flatMap算子将RDD中的元素进行分隔
rdd1 = rdd.map(lambda x: x.split(" "))
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print('map:',rdd1.collect())
print('flapMap:',rdd2.collect())
sc.stop()
# [['hello', 'world'], ['hello', 'spark'], ['hello', 'python']] # map
# ['hello', 'world', 'hello', 'spark', 'hello', 'python'] # flatMap 解除了嵌套
reduceByKey算子
# 语法:rdd.reduceByKey(func)
# 功能:针对KV型RDD中的元素按照key进行分组,然后对每个分组中的元素进行聚合操作
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("reduceByKeyRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1), ('b', 1), ('a', 1)])
# 使用reduceByKey算子对RDD中的元素按照key进行分组,然后对每个分组中的元素进行求和操作
rdd = rdd.reduceByKey(lambda x, y: x + y)
print(rdd.collect())
sc.stop()
## groupByKey和reduceByKey的区别
# groupByKey仅仅有分组功能而已
# reduceByKey除了有 ByKey 的分组功能外,还有reduce 聚合功能,是一个分组+聚合一体化的算子.
mapValues算子
# 语法:rdd.mapValues(func)
# 功能:针对KV型RDD中的value进行map操作
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("mapValuesRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4), ('a', 5)])
# 使用mapValues算子对RDD中的value进行map操作
rdd = rdd.mapValues(lambda x: x * 2)
print(rdd.collect())
sc.stop()
groupBy算子
# 语法:rdd.groupBy(func)
# 功能:针对RDD中的元素按照指定的函数进行分组
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("groupByRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 使用groupBy算子对RDD中的元素进行偶数和奇数分组
rdd2 = rdd.groupBy(lambda x: 'even' if (x % 2 == 0) else 'odd')
print(rdd2.collect())
# [('odd', <pyspark.resultiterable.ResultIterable object at 0x00000265B0A23DC0>), ('even', <pyspark.resultiterable.ResultIterable object at 0x00000265B08BF010>)]
# 将rdd2的元素的value转换成list
print(rdd2.map(lambda x: (x[0], list(x[1]))).collect())
# [('odd', [1, 3, 5, 7, 9]), ('even', [2, 4, 6, 8, 10])]
# 关闭SparkContext
sc.stop()
Filter算子
# 语法:rdd.filter(func)
# 功能:针对RDD中的元素进行过滤,返回一个新的RDD
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("filterRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 使用filter算子对RDD中的元素进行过滤,返回一个新的RDD
rdd2 = rdd.filter(lambda x: x % 2 == 0)
print(rdd2.collect())
sc.stop()
distinct算子
# 语法:rdd.distinct()
# 功能:对RDD中的元素进行去重操作
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("distinctRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10,1,2,3,4,5,6,7,8,9,10])
# 使用distinct算子对RDD中的元素进行去重操作
rdd2 = rdd.distinct()
print(rdd2.collect())
sc.stop()
union算子
# 语法:rdd.union(otherRDD)
# 功能:对两个RDD进行合并操作,返回一个新的RDD
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("unionRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rdd2 = sc.parallelize([11,12,13,14,15,16,17,18,19,20])
# 使用union算子对两个RDD进行合并操作
rdd3 = rdd.union(rdd2)
print(rdd3.collect())
sc.stop()
join算子
# 语法:rdd1.join(rdd2),rdd1.leftOuterJoin(rdd2),rdd1.rightOuterJoin(rdd2)
# 功能:对两个RDD进行join操作(可实现sql的内/外连接)
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("joinRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([(1,2),(3,4),(5,6)])
rdd2 = sc.parallelize([(1,7),(3,8),(5,9)])
# 使用join算子对两个RDD进行join操作
# 对于join算子,如果两个RDD中存在相同的key,则将对应的value进行合并
rdd3 = rdd.join(rdd2)
print(rdd3.collect())
sc.stop()
intersection算子
# 语法:rdd.intersection(otherRDD)
# 功能:对两个RDD进行交集操作,返回一个新的RDD
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("intersectionRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rdd2 = sc.parallelize([5,6,7,8,9])
# 使用intersection算子对两个RDD进行交集操作
rdd3 = rdd.intersection(rdd2)
print(rdd3.collect())
sc.stop()
glom算子
# 语法:rdd.glom()
# 功能:将RDD中的每个分区合并为一个数组
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("glomRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20],5)
# 使用glom算子将RDD中的每个分区合并为一个数组
rdd2 = rdd.glom()
print(rdd2.collect())
sc.stop()
groupByKey算子
# 语法:rdd.groupByKey()
# 功能:将RDD中相同key的元素进行分组,返回一个新的RDD
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("groupByKeyRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([(1,2),(3,4),(5,6),(1,7),(3,8),(5,9)])
# 使用groupByKey算子将RDD中相同key的元素进行分组
rdd2 = rdd.groupByKey()
# print(rdd2.collect())
print(rdd2.mapValues(list).collect()) # 将value转换为列表
sc.stop()
sortBy算子
# 语法:rdd.sortBy(lambda x:x[0],ascending=True, numPartitions=None)
# numPartitions:指定分区数,默认为None,表示不改变分区数,设置后多excutor时,排序结果可能不同
# 功能:对RDD中的元素进行排序,返回一个新的RDD
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("sortByRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([(1,2),(3,4),(5,6),(1,7),(3,8),(5,9)])
# 使用sortBy算子对RDD中的元素进行排序
rdd2 = rdd.sortBy(lambda x:x[0],ascending=True)
print(rdd2.collect())
sc.stop()
sortByKey算子
# 语法:rdd.sortByKey()
# 功能:针对kv型RDD中的元素按时key进行排序,返回一个新的RDD
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("sortByKeyRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('a',1),('b',2),('D',3),('c',4),('R',6),('g',7),('d',8),('K',9)])
# 使用sortByKey算子对RDD中的元素进行排序,默认为升序,keyfunc=lambda key:str(key).lower()将key转换为小写字母
rdd2 = rdd.sortByKey(ascending=True,keyfunc=lambda key:str(key).lower())
print(rdd2.collect())
sc.stop()
案例
# 读取files目录的文件order.txt,提取北京的数据,组合北京与商品类别并进行出重
from pyspark import SparkContext,SparkConf
import json
if __name__ == '__main__':
# 初始化执行环境,构建SparkContext对象
conf = SparkConf().setAppName("orderRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
# file_rdd = sc.textFile("file:///D:/git-python/大数据-Spark/files/order.txt")
file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt")
# 进行rdd的转换操作,以|为分隔符,得到一个个的json数据
json_rdd = file_rdd.flatMap(lambda line: line.split("|"))
# 将json数据转换为字典
dict_rdd = json_rdd.map(lambda line: json.loads(line))
# 过滤数据,提取北京的数据
beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == '北京')
# 组合北京与商品类别并进行去重
result_rdd = beijing_rdd.map(lambda d: d['areaName'] + '_' + d['category']).distinct()
print(result_rdd.collect())
sc.stop()
将案例提交到集群上运行
# 终端软件直接运行方式
# 配置好环境变量
import os
# 配置HADOOP配置文件目录
os.environ["HADOOP_CONF_DIR"] = "D:/hadoop/hadoop-3.3.6/etc/hadoop"
# 集群环境读取hdfs上的数据
file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt")
# 如果在代码中运行,如果提交集群环境运行的文件有依赖其他的代码文件
# 使用spark.submit.pyfiles参数设置上传的依赖文件,也可以是多少个依赖文件的zip压缩包
conf.set("spark.submit.pyfiles", "order2.py")
# 通过spark-submit提交到集群上运行
# coding: utf-8
conf = SparkConf().setAppName("orderRDD").setMaster("yarn")
file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt",use_unicode=False) # use_unicode 默认为True,读取文件时,会将文件内容转换为unicode编码,如果文件内容为utf-8编码,则会出现乱码,所以需要设置为False
# spark-submit --master yarn --deploy-mode cluster order.py --num-executors 3
Python环境中文字符串报错
# python2
# coding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
# python3
import sys
import codecs
sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())
python2.x 可用utf-8编码,但是pyspark是用unicode编码的,所以涉及数据交互的时候必须用进行编码和解码;
python2.x 的中文是utf-8编码,需要 x.decode("utf-8") 转换为 unicode;
pyspark 的中文是unicode编码,转换的话可用 x.encode("utf-8")编码成utf-8;
# -*- coding: utf-8 -*-
# coding: utf-8
# coding=utf-8
# import sys
# reload(sys)
# sys.setdefaultencoding('utf-8')
from pyspark import SparkContext,SparkConf
import json
if __name__ == '__main__':
conf = SparkConf().setAppName("orderRDD").setMaster("yarn")
conf.set("spark.executor.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
conf.set("spark.driver.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
conf.set('spark.python.unicode.decode.charset', 'utf-8')
sc = SparkContext(conf=conf)
#file_rdd = sc.textFile("hdfs://ns1/user/admin/files/order.txt")
file_rdd = sc.textFile("hdfs://ns1/user/admin/files/test.txt",use_unicode=False)
print(file_rdd.collect())
json_rdd = file_rdd.flatMap(lambda line: str(line).split("|"))
dict_rdd = json_rdd.map(lambda line: json.loads(line,encoding='utf-8'))
print(dict_rdd.collect())
### 中文匹配筛选时,需要将中文转换为unicode编码
foo_utf = '北京'
foo_unicode = foo_utf.decode("utf-8")
print("-----------------",foo_unicode)
#beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == '北京')
beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == foo_unicode)
#beijing_rdd = dict_rdd.filter(lambda d: d['id'] == 10)
result_rdd = beijing_rdd.map(lambda d: d['areaName'] + '_' + d['category']).distinct()
result_rdd.saveAsTextFile("hdfs://ns1/user/admin/files/test2")
print(result_rdd.collect())
sc.stop()
spark-submit --conf "spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
--conf "spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
--master yarn --deploy-mode client order.py
# --conf spark.pyspark.python=/bin/python3 \
# --conf spark.pyspark.driver.python=/bin/python3 \
spark-submit --conf spark.driver.extraJavaOptions=" -Dfile.encoding=utf-8 " \
--conf spark.executor.extraJavaOptions=" -Dfile.encoding=utf-8 " \
--master yarn --deploy-mode client order.py
# hdfs dfs -cat /user/admin/files/test2/* 北京_家电
# 北京_手机
# 北京_家具
# 北京_书籍
# 北京_服饰
# 北京_电脑
# 北京_食品
# 北京_平板电脑
SparkSession
# -*- coding: utf-8 -*-
# coding: utf-8
# coding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
from pyspark.sql import SparkSession
# 创建 SparkSession
spark = SparkSession.builder \
.appName("Read HDFS File") \
.getOrCreate()
# 读取 HDFS 上的文件
#df = spark.read.format("text").option("header", "true").load("hdfs://ns1/user/admin/files/order.txt")
df = spark.read.option("encoding", "UTF-8").text("hdfs://ns1/user/admin/files/order.txt")
# 显示数据
df.show(100, truncate=False)
常用Action算子
countByKey算子
# 功能:统计每个key出现的次数,一般用于KV型RDD
# 语法:countByKey()
# 返回值:一个字典,key为RDD中的key,value为key出现的次数
# coding: utf-8
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("countByKeyRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([(1,2),(3,4),(5,6),(1,7),(3,8),(5,9)])
result = rdd.countByKey()
print(result)
sc.stop()
collect算子
# 功能:将RDD中的数据收集到Driver端,形成一个List对象
# 语法:collect()
# 返回值:一个List对象
# 注意:如果RDD的数据量过大,不建议使用collect(),否则会导致Driver内存溢出
# coding: utf-8
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("collectRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([(1,2),(3,4),(5,6),(1,7),(3,8),(5,9)])
result = rdd.collect()
print(result)
sc.stop()
reduce算子
# 功能:对RDD中的所有元素进行聚合操作
# 语法:reduce(func)
# 返回值:一个聚合后的值
# coding: utf-8
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("reduceRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
result = rdd.reduce(lambda x,y:x+y)
print(result)
sc.stop()
fold算子
# 功能:对RDD中的所有元素进行聚合操作,初始值为zeroValue
# 语法:fold(zeroValue, func)
# 返回值:一个聚合后的值
# coding: utf-8
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("foldRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
print(rdd.glom().collect())
result = rdd.reduce(lambda x,y:x+y)
result2 = rdd.fold(10, lambda x,y:x+y)
print(result,result2)
sc.stop()
# 10+(10+1+2+3)+(10+4+5+6)+(10+7+8+9)
print(10+(10+1+2+3)+(10+4+5+6)+(10+7+8+9))
first算子
# 功能:返回RDD中的第一个元素
# 语法:first()
# 返回值:一个元素
# coding: utf-8
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("firstRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
result = rdd.first()
print(result)
sc.stop()
take算子
# 功能:返回RDD中的前n个元素
# 语法:take(n)
# 返回值:一个List对象
# coding: utf-8
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("takeRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
result = rdd.take(3)
print(result)
sc.stop()
top算子
# 功能:返回RDD中按照自然顺序排序的前n个元素,降序排列
# 语法:top(n)
# 返回值:一个List对象
# coding: utf-8
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("topRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
result = rdd.top(3)
print(result)
sc.stop()
count算子
# 功能:返回RDD中的元素个数
# 语法:count()
# 返回值:一个整数
# coding: utf-8
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("countRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
result = rdd.count()
print(result)
sc.stop()
takeSample算子
# 功能:从RDD中随机抽取n个元素
# 语法:takeSample(withReplacement, num, [seed])
# 参数:
# withReplacement:是否放回
# num:抽取的元素个数
# seed:随机数种子
# 返回值:一个List对象
# coding: utf-8
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("takeSampleRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
result = rdd.takeSample(True,3,1)
print(result)
sc.stop()
takeOrdered算子
# 功能:对RDD中的元素按照自然顺序排序,并返回前n个元素,升序排列
# 语法:takeOrdered(n)
# 返回值:一个List对象
# coding: utf-8
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("takeOrderedRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
result = rdd.takeOrdered(3)
# 转换为负数,实现降序排列
result2 = rdd.takeOrdered(3,lambda x:-x)
print(result,result2)
sc.stop()
foreach算子
# 功能:对RDD中的每个元素执行func操作,注意:foreach不能保证顺序,分区Executor的直接执行,跳过Driver
# 语法:foreach(func)
# 返回值:无
# coding: utf-8
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("foreachRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
rdd2 = rdd.foreach(lambda x: print(x * 10))
print(rdd2) # None
sc.stop()
# 40
# 20
# 30
# 90
# 70
# 60
# 80
# 10
# None
saveAsTextFile算子
# 功能:将RDD中的元素保存到文件中,注意:分区Executor的直接执行,跳过Driver,写出的时候,每个分区所在的Executor直接控制数据写出到目标文件系统中。
# 语法:saveAsTextFile(path)
# 参数:
# path:文件路径
# 返回值:无
# coding: utf-8
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("saveAsTextFileRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
# rdd.saveAsTextFile("file:///d:/test")
rdd.saveAsTextFile("hdfs://ns1/user/admin/files/test3")
sc.stop()
分区操作算子
mapPartitions算子-Transformation
# 功能:将每个分区中的元素传递给函数,并将返回的元素合并成一个大的RDD
# 语法:mapPartitions(func)
# 参数:
# func:函数,函数的输入是一个迭代器,返回一个迭代器
# 返回值:一个新的RDD
# coding: utf-8
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("mapPartitionsRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3) # map会对每个元素进行操作,而mapPartitions则对每个分区进行操作,效率更高
def func(iterator):
yield sum(iterator)
result = rdd.mapPartitions(func)
print(result.collect())
sc.stop()
# [6,15,24]
foreachPartition算子-Action
# 功能:对RDD中的每个分区执行func操作
# 语法:foreachPartition(func)
# 参数:
# func:函数,函数的输入是一个迭代器,无返回值
# 返回值:无
# coding: utf-8
from pyspark import SparkContext,SparkConf
# 不生成字节码
import sys
sys.dont_write_bytecode = True
if __name__ == '__main__':
conf = SparkConf().setAppName("foreachPartitionRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
def func(iterator):
print("=="*20)
result = []
# print(sum(iterator))
for i in iterator:
result.append(i * 10)
print(result)
rdd.foreachPartition(func)
sc.stop()
# ========================================
# [10, 20, 30]
# ========================================
# [70, 80, 90]
# ========================================
# [40, 50, 60]
partitionBy算子-Transformation
# 功能:根据给定的Partitioner函数重新分区
# 语法:partitionBy(partitioner)
# 参数:
# partitioner:Partitioner函数,默认为HashPartitioner
# 返回值:一个新的RDD
# coding: utf-8
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("partitionByRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4), ('a', 5), ('c', 2), ('c', 3)],3)
# 分区函数
def partitionFunc(key):
if key == 'a':
return 0
elif key == 'b':
return 1
else:
return 2
result = rdd.partitionBy(3, partitionFunc)
print(result.glom().collect())
sc.stop()
# 注意:分区号不要超标,你设置3个分区,分区号只能是012,设置5个分区分区号只能是01234
repartition算子-Transformation
# 功能:根据给定的分区数重新分区
# 语法:repartition(numPartitions)
# 参数:
# numPartitions:新的分区数
# 返回值:一个新的RDD
# coding: utf-8
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("repartitionRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4), ('a', 5), ('c', 2), ('c', 3)],3)
print(rdd.glom().collect())
print(rdd.repartition(2).glom().collect())
print(rdd.repartition(1).glom().collect())
sc.stop()
# 注意: 对分区的数量进行操作,一定要慎重
# 一般情况下,我们写Spark代码 除了要求全局排序设置为1个分区外
# 多数时候,所有API中关于分区相关的代码我们都不太理会
# 因为,如果你改分区了
# 会影响并行计算(内存迭代的并行管道数量)
# 分区如果增加,极大可能 导致 shuffle
coalesce算子-Transformation
# 功能:根据给定的分区数减少分区
# 语法:rdd.coalesce(参数1,参数2)
# 参数1,分区数
# 参数2,True or False
# True表示允许shuffle,也就是可以加分区
# False表示不允许shuffle,也就是不能加分区,False是默认
# 返回值:一个新的RDD
# coding: utf-8
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("coalesceRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4), ('a', 5), ('c', 2), ('c', 3)],3)
rdd.cache()
print(rdd.glom().collect())
print(rdd.coalesce(2).glom().collect())
print(rdd.coalesce(1).glom().collect())
print(rdd.coalesce(5).glom().collect()) # 不写参数2=True,这个API只能减少分区,增加分区操作将什么事情也不做
rdd.unpersist()
sc.stop()
# 注意: coalesce算子默认不会进行shuffle操作
# 增加分区需要shuffle=True,默认是False
# print(rdd.coalesce(5,shuffle=True).glom().collect())
# 如果不写参数2=True,这个API只能减少分区,增加分区操作将什么事情也不做