SparkSQL DataFrame 操作
DataFrame支持两种风格进行编程,分别是:
- DSL风格
- DSL称之为:领域特定语言。其实就是指DataFrame的特有API
- DSL风格意思就是以调用API的方式来处理Data
- df.where().limit()
- SQL风格
- SQL语法风格就是使用SQL语句处理DataFrame的数据
- spark.sql("SELECT * FROM xxx")
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
# 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
#### 读取text数据源
# 使用format("text")读取文本数据,读取到的DataFrame只会有一个列,列名默认称之为:value
df = spark.read.format("csv")\
.option("sep", ";")\
.option("header", True)\
.option("encoding", "utf-8")\
.schema("name STRING, age INT, job STRING")\
.load("file:///D:/git-python/大数据-Spark/files/people.csv")
print("========== DSL - show 方法 ==========")
df.show(3,False)
#### DSL - show 方法
# 功能:展示DataFrame中的数据, 默认展示20条
# df.show(参数1, 参数2)
# - 参数1: 默认是20, 控制展示多少条
# - 参数2: 是否阶段列, 默认只输出20个字符的长度, 过长不显示, 要全部显示的话请填入truncate = False
print("========== DSL - printSchema方法 ==========")
#### DSL - printSchema方法
# 功能:打印输出df的schema信息
df.printSchema()
print("========== DSL - select ==========")
#### DSL - select
# 功能:选择DataFrame中的指定列(通过传入参数进行指定)
# 可传递:
# • 可变参数的cols对象,cols对象可以是Column对象来指定列或者字符串
# 列名来指定列
# • List[Column]对象或者List[str]对象, 用来选择多个列
df.select("name", "age").show(1, False)
df.select(df.name, df.age).show(1, False)
df.select(["name", "age"]).show(1, False)
# Column对象获取
name_column = df["name"]
age_column = df["age"]
df.select(name_column, age_column).show(1, False)
print("========== DSL - filter和where ==========")
#### DSL - filter和where
# 功能:过滤DataFrame内的数据,返回一个过滤后的DataFrame
df.filter("age > 20").show(1, False)
df.filter(df.age > 20).show(1, False)
df.where("age > 20").show(1, False)
df.where(df.age > 20).show(1, False)
print("========== DSL - groupBy 分组 ==========")
#### DSL - groupBy 分组
# 功能:按照指定的列进行数据的分组,返回值是GroupedData对象,GroupedData对象有很多API,min、max、avg、sum、等等许多方法都存在
df.groupBy("age").count().show()
df.groupBy(df.job).count().orderBy("count", ascending=False).show()
#### SQL风格语法-注册DataFrame成为表
df.createTempView("score") #注册一个临时视图(表)
df.createOrReplaceTempView("score")#注册一个临时表,如果存在进行替换
# 注册一个全局表
df.createGlobalTempView("score")
# 全局表可以跨SparkSession使用,查询前需要使用:global_temp.表名
# 临时表只能在当前SparkSession中使用
print("========== SQL风格语法-使用SQL查询 ==========")
#### SQL风格语法-使用SQL查询
spark.sql("select * from global_temp.score").show(3, False)
spark.sql("select * from score").show(3, False)
#### pyspark.sql.functions 包
# PySpark提供了一个包: pyspark.sql.functions,这个包里面提供了一系列的计算函数供SparkSQL使用
from pyspark.sql import functions as F
# 使用F调用函数,常用函数如下:
# - split():将字符串分割成多个部分
# - explode():将数据转列
# - col():获取DataFrame中的指定列
# - lit():创建一个Column对象
# - expr():执行一个表达式
# - broadcast():广播一个DataFrame
# - collect_list():收集一列中的所有值到一个列表中
# - collect_set():收集一列中的所有值到一个集合中
# - concat():连接多个字符串
# - count():计算一列中的值的数量
# - first():获取一列中的第一个值
# - last():获取一列中的最后一个值
# - max():获取一列中的最大值
# - min():获取一列中的最小值
# - mean():计算一列中的平均值
# - sum():计算一列中的总和
# - stddev():计算一列中的标准差
# - variance():计算一列中的方差
# - to_date():将字符串转换为日期
# - to_timestamp():将字符串转换为时间戳
# - date_format():将日期格式化为字符串
# - date_add():向日期添加天数
# - date_sub():从日期减去天数
# - datediff():计算两个日期之间的天数
# - date_trunc():截断日期到指定的精度
# - unix_timestamp():将日期转换为Unix时间戳
# - from_unixtime():将Unix时间戳转换为日期
# - year():获取日期中的年份
# - month():获取日期中的月份
# - day():获取日期中的天数
# - hour():获取时间中的小时
# - minute():获取时间中的分钟
# - second():获取时间中的秒
# - window():创建一个窗口对象,用于窗口函数
# - lag():获取前一个值
# 等等...
spark.stop()
词频统计案例
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
sc = spark.sparkContext
# 1: SQL 风格进行处理
rdd = sc.textFile("file:///D:/git-python/大数据-Spark/files/words.txt").\
flatMap(lambda x: x.split(" ")).\
map(lambda x: [x])
print(rdd.collect())
df = rdd.toDF(["word"])
# 注册DF为表格
df.createTempView("words")
spark.sql("SELECT word, COUNT(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC").show()
# 2: DSL 风格处理
df = spark.read.format("text").load("file:///D:/git-python/大数据-Spark/files/words.txt")
# withColumn方法
# 方法功能: 对已存在的列进行操作, 返回一个新的列, 如果名字和老列相同, 那么替换, 否则作为新列存在
df2 = df.withColumn("value", F.explode(F.split(df['value'], " ")))
df2.groupBy("value").\
count().\
withColumnRenamed("value", "word").\
withColumnRenamed("count", "cnt").\
orderBy("cnt", ascending=False).\
show()
spark.stop()
电影评分数据分析案例
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F
if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions", 2).\
getOrCreate()
sc = spark.sparkContext
"""
spark.sql.shuffle.partitions 参数指的是, 在sql计算中, shuffle算子阶段默认的分区数是200个.
对于集群模式来说, 200个默认也算比较合适
如果在local下运行, 200个很多, 在调度上会带来额外的损耗
所以在local下建议修改比较低 比如2\4\10均可
这个参数和Spark RDD中设置并行度的参数 是相互独立的.
"""
# 1. 读取数据集
schema = StructType().add("user_id", StringType(), nullable=True).\
add("movie_id", IntegerType(), nullable=True).\
add("rank", IntegerType(), nullable=True).\
add("ts", StringType(), nullable=True)
df = spark.read.format("csv").\
option("sep", "\t").\
option("header", False).\
option("encoding", "utf-8").\
schema(schema=schema).\
load("file:///D:/git-python/大数据-Spark/files/u.data")
# 1: 用户平均分
df.groupBy("user_id").\
avg("rank").\
withColumnRenamed("avg(rank)", "avg_rank").\
withColumn("avg_rank", F.round("avg_rank", 2)).\
orderBy("avg_rank", ascending=False).\
show()
# 2: 电影的平均分查询
df.createTempView("movie")
spark.sql("""
SELECT movie_id, ROUND(AVG(rank), 2) AS avg_rank FROM movie GROUP BY movie_id ORDER BY avg_rank DESC
""").show()
# 3: 查询大于平均分的电影的数量 # Row
print("大于平均分电影的数量: ", df.where(df['rank'] > df.select(F.avg(df['rank'])).first()['avg(rank)']).count())
# 4: 查询高分电影中(>3)打分次数最多的用户, 此人打分的平均分
# 先找出这个人
user_id = df.where("rank > 3").\
groupBy("user_id").\
count().\
withColumnRenamed("count", "cnt").\
orderBy("cnt", ascending=False).\
limit(1).\
first()['user_id']
# 计算这个人的打分平均分
df.filter(df['user_id'] == user_id).\
select(F.round(F.avg("rank"), 2)).show()
# 5: 查询每个用户的平局打分, 最低打分, 最高打分
df.groupBy("user_id").\
agg(
F.round(F.avg("rank"), 2).alias("avg_rank"),
F.min("rank").alias("min_rank"),
F.max("rank").alias("max_rank")
).show()
# 6: 查询评分超过100次的电影, 的平均分 排名 TOP10
df.groupBy("movie_id").\
agg(
F.count("movie_id").alias("cnt"),
F.round(F.avg("rank"), 2).alias("avg_rank")
).where("cnt > 100").\
orderBy("avg_rank", ascending=False).\
limit(10).\
show()
spark.stop()
# 1. agg: 它是GroupedData对象的API, 作用是 在里面可以写多个聚合
# 2. alias: 它是Column对象的API, 可以针对一个列 进行改名
# 3. withColumnRenamed: 它是DataFrame的API, 可以对DF中的列进行改名, 一次改一个列, 改多个列 可以链式调用
# 4. orderBy: DataFrame的API, 进行排序, 参数1是被排序的列, 参数2是 升序(True) 或 降序 False
# 5. first: DataFrame的API, 取出DF的第一行数据, 返回值结果是Row对象.
# # Row对象 就是一个数组, 你可以通过row['列名'] 来取出当前行中, 某一列的具体数值. 返回值不再是DF 或者GroupedData 或者Column而是具体的值(字符串, 数字等)
SparkSQL 数据清洗API
# coding:utf8
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import functions as F
if __name__ == '__main__':
# 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions", 2).\
getOrCreate()
sc = spark.sparkContext
"""读取数据"""
df = spark.read.format("csv").\
option("sep", ";").\
option("header", True).\
load("file:///D:/git-python/大数据-Spark/files/people.csv")
# # 数据清洗: 数据去重
# # dropDuplicates 是DataFrame的API, 可以完成数据去重
# # 无参数使用, 对全部的列 联合起来进行比较, 去除重复值, 只保留一条
df.dropDuplicates().show()
#
# df.dropDuplicates(['age', 'job']).show()
#
#
# # 数据清洗: 缺失值处理
# # dropna api是可以对缺失值的数据进行删除
# # 无参数使用, 只要列中有null 就删除这一行数据
# df.dropna().show()
# # thresh = 3表示, 最少满足3个有效列, 不满足 就删除当前行数据
# df.dropna(thresh=3).show()
#
# df.dropna(thresh=2, subset=['name', 'age']).show()
# 缺失值处理也可以完成对缺失值进行填充
# DataFrame的 fillna 对缺失的列进行填充
df.fillna("loss").show()
# 指定列进行填充
df.fillna("N/A", subset=['job']).show()
# 设定一个字典, 对所有的列 提供填充规则
df.fillna({"name": "未知姓名", "age": 1, "job": "worker"}).show()
spark.stop()