Spark
定义:Apache Spark 是用于大规模数据(large-scala data)处理的统一(unified)分析引擎
Spark 是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的少量数据
Python On Spark
- Spark 对Python语言的支持,重点体现在Python第三方库:PySpark之上
- 应用于大数据开发与人工智能方向
安装:pip install pyspark
构建PySpark执行环境入口对象
想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象
PySpark 的执行环境入口对象是:类SparkContext 的类对象
# 导包
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象
# 返回值是同一个对象时可使用链式调用写法
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 普通写法
# conf = SparkConf()
# conf.setMaster("local[*]")
# conf.setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext 对象的运行(停止PySpark程序)
sc.stop()
3.5.0
PySpark 的编程模型
SparkContext 类对象,是PySpark 编程中一切功能的入口
PySpark 的编程,主要分为如下三大步骤:
- 数据输入:通过SparkContext类对象的成员方法,完成数据的读取操作,读取后得到RDD类对象
- 数据处理计算:通过RDD类对象的成员方法,完成各种数据计算的需求
- 数据输出:将处理完成后的RDD对象,调用各种成员方法完成,写出文件、转换为list等操作
RDD 对象
PySpark 支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象
RDD 全称为:弹性分布式数据集(Resilient Distributed Datasets)
PySpark 针对数据的处理,都是以RDD对象作为载体,即:
- 数据存储在RDD内
- 各类数据的计算方法,也都是RDD的成员方法
- RDD的数据计算方法,返回值依旧是RDD对象
Python 数据容器转RDD对象
PySpark 支持通过SparkContext 对象的parallelize 成员方法,将:list,tuple,set,dict,str转换为PySpark的RDD对象
注意:
- 字符串会被拆分出1个个的字符,存入RDD对象
- 字典仅有key会被存入RDD对象
PySpark 也支持通过SparkContext 对象的textFile成员方法,来读取文件,来构建出RDD对象
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
print(sc.version)
# 通过parallelize方法Python对象加载到Spark内,成为RDD对象
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())
# 用过textFile方法,读取文件数据加载到Spark内,成为RDD对象
rdd6 = sc.textFile("bak/test.txt")
print(rdd6.collect())
sc.stop()
3.5.0
[1, 2, 3]
[1, 2, 3, 4, 5]
['a', 'b', 'c', 'd', 'e', 'f', 'g']
[1, 2, 3, 4, 5]
['key1', 'key2']
['hahaha', 'heiheihei', '666666', '88888888']
PySpark数据计算
PySpark 的数据计算,都是基于RDD对象来进行的,RDD对象内置丰富的:成员方法(算子)
map算子
功能:map算子,是将RDD的数据一条条处理(处理的逻辑 基于map算子中接收的处理函数),返回新的RDD
语法:rdd.map(func)
func : f:(T) -> U
- f: 表示这是一个函数(方法)
- (T) -> U 表示的是方法的定义:() 表示传入参数,(T)表示传入1个参数,()表示没有传入参数
- T 是泛型的代称,在这里表示 任意类型
- U 也是泛型代称,在这里表示 任意类型
- -> U 表示返回值
- (T) -> U 总结起来的意思是:这是一个方法,这个方法接受一个参数传入,传入参数类型不限,返回一个返回值,返回值类型不限
- (A) -> A 总结:这是一个方法,这个方法接受一个参数传入,传入参数类型不限,返回一个返回值,返回值和传入参数类型一致
链式调用:对于返回值是新RDD的算子,可以通过链式调用的方式多次调用算子
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python310/python.exe"
# java8_location = 'D:/Java/jdk-1.8'
# os.environ['JAVA_HOME'] = java8_location
# os.environ['PYSPARK_PYTHON'] = sys.executable
# os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# print(sc.version)
# 通过parallelize方法Python对象加载到Spark内,成为RDD对象
rdd = sc.parallelize([1, 2, 3])
# 通过map方法将全部数据都乘以10
# def func(data):
# return data * 10
# rdd2 = rdd.map(func)
rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)
print(rdd2.collect())
sc.stop()
flatMap算子
功能:对RDD执行map操作,然后进行解除嵌套操作
# 解除嵌套
# 嵌套的list
list = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
# 如果解除了嵌套
list = [1, 2, 3, 4, 5, 6, 7, 8, 9]
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 通过parallelize方法Python对象加载到Spark内,成为RDD对象
rdd = sc.parallelize(["a b c", "b c d", "c d e", "d e f"])
# rdd2 = rdd.map(lambda x: x.split(" "))
# [['a', 'b', 'c'], ['b', 'c', 'd'], ['c', 'd', 'e'], ['d', 'e', 'f']]
rdd2 = rdd.flatMap(lambda x: x.split(" "))
#['a', 'b', 'c', 'b', 'c', 'd', 'c', 'd', 'e', 'd', 'e', 'f']
print(rdd2.collect())
sc.stop()
reduceByKey算子
功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作
reduceByKey中的聚合逻辑是:有[1, 2, 3, 4, 5],然后聚合函数是:lambda a, b: a + b
用法:
rdd.reduceByKey(func)
- func: (V, V) -> V
- 接受2个传入参数(类型要一致),返回一个返回值,类型和传入要求一致
注意:reduceByKey中接收的函数,只负责聚合,不理会分组,分组是自动by key来分组的
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 通过parallelize方法Python对象加载到Spark内,成为RDD对象
rdd = sc.parallelize([("男", 99), ("男", 88), ("女", 90), ("女", 80)])
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd2.collect())
# [('男', 187), ('女', 170)]
sc.stop()
WordCount案例
读取文件,统计文件内,单词的出现次数 D:/git-python/bak/hello.txt
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 读取数据文件
rdd = sc.textFile("D:/git-python/bak/hello.txt")
# 取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
print(word_rdd.collect())
# [('itcast', 4), ('python', 6), ('itheima', 7), ('spark', 4), ('pyspark', 3)]
sc.stop()
Filter
功能:过滤想要的数据进行保留
# func: (T) -> bool 传入1个参数进来随意类型,返回值必须是True or False
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
# 对RDD的数据进行过滤
rdd2 = rdd.filter(lambda num: num % 2 == 0)
print(rdd2.collect())
# [2, 4, 6]
sc.stop()
distinct 算子
功能:对RDD数据进行去重,返回新RDD
语法:
rdd.distinct() # 无需传参
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5 ,3, 4, 5, 6, 2, 7, 8, 8, 9, 7])
# 对RDD进行去重
rdd2 = rdd.distinct()
print(rdd2.collect())
# [1, 2, 3, 4, 5, 6, 7, 8, 9]
sc.stop()
sortBy 算子
功能:对RDD数据进行排序,基于你指定的排序依据
语法:
- rdd.sortBy(func, ascending=False, numPartitions=1)
- func: (T) -> U: 告知按照rdd中的哪个数据进行排序,比如lambda x: x[1] 表示按照rdd中的第二列元素进行排序
- ascending True升序 False 降序
- numPartitions: 用多少分区排序
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 读取数据文件
rdd = sc.textFile("D:/git-python/bak/hello.txt")
# 取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
print(word_rdd.collect())
# [('itcast', 4), ('python', 6), ('itheima', 7), ('spark', 4), ('pyspark', 3)]
# 对结果进行排序
final_rdd = word_rdd.sortBy(lambda x: x[1] ,ascending=False, numPartitions=1)
print(final_rdd.collect())
# [('itheima', 7), ('python', 6), ('itcast', 4), ('spark', 4), ('pyspark', 3)]
sc.stop()
综合案例
D:/git-python/bak/orders.txt
# 需求1:城市销售额排名
# 读取文件得到RDD
from pyspark import SparkConf, SparkContext
import os, json
os.environ['PYSPARK_PYTHON'] = "D:/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
file_rdd = sc.textFile("D:/git-python/bak/orders.txt")
# 取出一个个JSON字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
# print(json_str_rdd.collect())
# 将JSON字符串转换为字典
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
# print(dict_rdd.collect())
# 取出城市和销售额数据
city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
# 城市分组按销售额聚合
city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b)
# 按照销售额聚合结果进行排序
result1_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print(f"需求1的计算结果是{result1_rdd.collect()}")
# 需求2:全部城市有哪些商品类别在售卖
# 取出全部的商品类别
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print(f"需求2的结果:{category_rdd.collect()}")
# 需求3:北京市有哪些商品在进行售卖
beijing_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
# 取出北京的全部商品类别
result3_rdd = beijing_data_rdd.map(lambda x: x['category']).distinct()
print(f"需求3的结果:{result3_rdd.collect()}")
# 需求1的计算结果是[('北京', 91556), ('杭州', 28831), ('天津', 12260), ('上海', 1513), ('郑州', 1120)]
# 需求2的结果:['平板电脑', '家电', '书籍', '手机', '电脑', '家具', '食品', '服饰']
# 需求3的结果:['平板电脑', '家电', '书籍', '手机', '电脑', '家具', '食品', '服饰']
sc.stop()
collect 算子
功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
用法:rdd.collect() 返回值是一个list
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 通过parallelize方法Python对象加载到Spark内,成为RDD对象
rdd = sc.parallelize((1, 2, 3, 4, 5))
rdd_list = rdd.collect()
print(type(rdd_list))
print(rdd_list)
# [1, 2, 3, 4, 5]
sc.stop()
reduce 算子
功能:对RDD数据集按照你传入的逻辑进行聚合
语法:rdd.reduce(func)
- func: (T, T) -> T
- 2参数传入,1个返回值,返回值和参数要求类型一致
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(range(1, 10))
# 将rdd的数据进行累加求和,返回值等同于计算函数的返回值
print(rdd.reduce(lambda x, y: x + y))
# 45
sc.stop()
take 算子
功能:取RDD的前N个元素,组合成list返回给你
用法:
- sc.parallelize([3, 2, 1, 4, 5, 6]).take(5) # [3, 2, 1, 4, 5]
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(range(1, 10))
# 将rdd的数据进行累加求和,返回值等同于计算函数的返回值
print(rdd.take(3))
# [1, 2, 3]
sc.stop()
count 算子
功能:计算RDD有多少条数据,返回值是一个数字
用法:
- sc.parallelize([3, 2, 1, 4, 5, 6]).take(5) # 6
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(range(1, 10))
# 将rdd的数据进行累加求和,返回值等同于计算函数的返回值
print(rdd.count())
# 9
sc.stop()
saveAsTextFile 算子
功能:将RDD的数据写入文本文件中,支持本地写出,hdfs等文件系统
用法:
- rdd = sc.parallelize([1, 2, 3, 4, 5])
- rdd.saveAsTextFile("D:/git-python/bak/saveastextfile.txt")
依赖:
- https://archive.apache.org/dist/hadoop/common/hadoop-3.3.6/hadoop-3.3.6.tar.gz 解压至电脑任意位置
- https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dl 放入c:/windows/system32文件夹内
- https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe 放入hadoop解压目录内的bin目录里面
- Python代码中使用os模块配置:os.environ['HADOOP_HOME']='HADOOP解压文件夹路径'
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python310/python.exe"
os.environ['HADOOP_HOME']='D:/hadoop/hadoop-3.3.6'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(range(1, 10))
rdd.saveAsTextFile("D:/git-python/bak/saveastextfile")
# 默认输出了16份数据,因为CPU默认16逻辑核心
sc.stop()
修改rdd 分区为1个
方式1:SparkConf 对象设置属性全局并行度为1:
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
- conf.set("spark.default.parallelism", "1")
- sc = SparkContext(conf=conf)
方式2:创建RDD的时候设置(parallelize方法传入numSlices参数为1)
- rdd = sc.parallelize(range(1, 10), numSlices=1)
- rdd = sc.parallelize(range(1, 10), 1)
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python310/python.exe"
os.environ['HADOOP_HOME']='D:/hadoop/hadoop-3.3.6'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(range(1, 10))
# 或 创建的时候使用参数numSlices可省略 rdd = sc.parallelize(range(1, 10), 1)
rdd.saveAsTextFile("D:/git-python/bak/saveastextfile")
# 设置了全局并行度为1,只输出一个文件
sc.stop()
综合案例
D:/git-python/bak/search_log.txt
# 读取文件得到RDD
from pyspark import SparkConf, SparkContext
import os, json
os.environ['PYSPARK_PYTHON'] = "D:/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
file_rdd = sc.textFile("D:/git-python/bak/search_log.txt")
# 需求1:热闹搜索时间段top3(小时精度)
result1 = file_rdd.map(lambda x: x.split("\t")).\
map(lambda x: x[0][:2]).\
map(lambda x: (x, 1)).\
reduceByKey(lambda a, b: a + b).\
sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
take(3)
print(result1)
# [('20', 3479), ('23', 3087), ('21', 2989)]
# 需求2:热闹搜索词top3
result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\
reduceByKey(lambda a, b: a + b).\
sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
take(3)
print(result2)
# [('scala', 2310), ('hadoop', 2268), ('博学谷', 2002)]
# 需求3:统计黑马程序员关键字在什么时段被搜索最多
result3 = file_rdd.map(lambda x: x.split("\t")).\
filter(lambda x: x[2] == '黑马程序员').\
map(lambda x: (x[0][:2], 1)).\
reduceByKey(lambda a, b: a + b).\
sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
take(3)
print(result3)
# [('22', 245), ('20', 196), ('21', 147)]
# 需求4:将数据转换为JSON格式,写到文件中
result3 = file_rdd.map(lambda x: x.split("\t")).\
map(lambda x: {'time': x[0], 'user_id': x[1], 'key_word': x[2], 'rank1': x[3], 'rank2': x[4], 'url': x[5]}).\
saveAsTextFile("D:/git-python/bak/search_log")