10. SparkSQL DataFrame 操作

SparkSQL DataFrame 操作

DataFrame支持两种风格进行编程,分别是:

  • DSL风格
    • DSL称之为:领域特定语言。其实就是指DataFrame的特有API
    • DSL风格意思就是以调用API的方式来处理Data
    • df.where().limit()
  • SQL风格
    • SQL语法风格就是使用SQL语句处理DataFrame的数据
    • spark.sql("SELECT * FROM xxx")
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

# 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
    appName("test").\
    master("local[*]").\
    getOrCreate()

#### 读取text数据源
# 使用format("text")读取文本数据,读取到的DataFrame只会有一个列,列名默认称之为:value
df = spark.read.format("csv")\
 .option("sep", ";")\
 .option("header", True)\
 .option("encoding", "utf-8")\
 .schema("name STRING, age INT, job STRING")\
 .load("file:///D:/git-python/大数据-Spark/files/people.csv")

print("========== DSL - show 方法 ==========")
df.show(3,False)
#### DSL - show 方法
# 功能:展示DataFrame中的数据, 默认展示20条
# df.show(参数1, 参数2)
# - 参数1: 默认是20, 控制展示多少条
# - 参数2: 是否阶段列, 默认只输出20个字符的长度, 过长不显示, 要全部显示的话请填入truncate = False

print("========== DSL - printSchema方法 ==========")
#### DSL - printSchema方法
# 功能:打印输出df的schema信息
df.printSchema()

print("========== DSL - select ==========")
#### DSL - select
# 功能:选择DataFrame中的指定列(通过传入参数进行指定)
# 可传递:
# • 可变参数的cols对象,cols对象可以是Column对象来指定列或者字符串
# 列名来指定列
# • List[Column]对象或者List[str]对象, 用来选择多个列
df.select("name", "age").show(1, False)
df.select(df.name, df.age).show(1, False)
df.select(["name", "age"]).show(1, False)
# Column对象获取
name_column = df["name"]
age_column = df["age"]
df.select(name_column, age_column).show(1, False)

print("========== DSL - filter和where ==========")
#### DSL - filter和where
# 功能:过滤DataFrame内的数据,返回一个过滤后的DataFrame
df.filter("age > 20").show(1, False)
df.filter(df.age > 20).show(1, False)
df.where("age > 20").show(1, False)
df.where(df.age > 20).show(1, False)

print("========== DSL - groupBy 分组 ==========")
#### DSL - groupBy 分组
# 功能:按照指定的列进行数据的分组,返回值是GroupedData对象,GroupedData对象有很多API,min、max、avg、sum、等等许多方法都存在
df.groupBy("age").count().show()
df.groupBy(df.job).count().orderBy("count", ascending=False).show()

#### SQL风格语法-注册DataFrame成为表
df.createTempView("score") #注册一个临时视图(表)
df.createOrReplaceTempView("score")#注册一个临时表,如果存在进行替换
# 注册一个全局表
df.createGlobalTempView("score")
# 全局表可以跨SparkSession使用,查询前需要使用:global_temp.表名
# 临时表只能在当前SparkSession中使用

print("========== SQL风格语法-使用SQL查询 ==========")
#### SQL风格语法-使用SQL查询
spark.sql("select * from global_temp.score").show(3, False)
spark.sql("select * from score").show(3, False)

#### pyspark.sql.functions 包
# PySpark提供了一个包: pyspark.sql.functions,这个包里面提供了一系列的计算函数供SparkSQL使用
from pyspark.sql import functions as F
# 使用F调用函数,常用函数如下:
# - split():将字符串分割成多个部分
# - explode():将数据转列
# - col():获取DataFrame中的指定列
# - lit():创建一个Column对象
# - expr():执行一个表达式
# - broadcast():广播一个DataFrame
# - collect_list():收集一列中的所有值到一个列表中
# - collect_set():收集一列中的所有值到一个集合中
# - concat():连接多个字符串
# - count():计算一列中的值的数量
# - first():获取一列中的第一个值
# - last():获取一列中的最后一个值
# - max():获取一列中的最大值
# - min():获取一列中的最小值
# - mean():计算一列中的平均值
# - sum():计算一列中的总和
# - stddev():计算一列中的标准差
# - variance():计算一列中的方差
# - to_date():将字符串转换为日期
# - to_timestamp():将字符串转换为时间戳
# - date_format():将日期格式化为字符串
# - date_add():向日期添加天数
# - date_sub():从日期减去天数
# - datediff():计算两个日期之间的天数
# - date_trunc():截断日期到指定的精度
# - unix_timestamp():将日期转换为Unix时间戳
# - from_unixtime():将Unix时间戳转换为日期
# - year():获取日期中的年份
# - month():获取日期中的月份
# - day():获取日期中的天数
# - hour():获取时间中的小时
# - minute():获取时间中的分钟
# - second():获取时间中的秒
# - window():创建一个窗口对象,用于窗口函数
# - lag():获取前一个值
# 等等...

spark.stop()

词频统计案例

# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 1: SQL 风格进行处理
    rdd = sc.textFile("file:///D:/git-python/大数据-Spark/files/words.txt").\
        flatMap(lambda x: x.split(" ")).\
        map(lambda x: [x])

    print(rdd.collect())
    df = rdd.toDF(["word"])

    # 注册DF为表格
    df.createTempView("words")
    spark.sql("SELECT word, COUNT(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC").show()

    # 2: DSL 风格处理
    df = spark.read.format("text").load("file:///D:/git-python/大数据-Spark/files/words.txt")

    # withColumn方法
    # 方法功能: 对已存在的列进行操作, 返回一个新的列, 如果名字和老列相同, 那么替换, 否则作为新列存在
    df2 = df.withColumn("value", F.explode(F.split(df['value'], " ")))
    df2.groupBy("value").\
        count().\
        withColumnRenamed("value", "word").\
        withColumnRenamed("count", "cnt").\
        orderBy("cnt", ascending=False).\
        show()

    spark.stop()

电影评分数据分析案例

# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F

if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    """
    spark.sql.shuffle.partitions 参数指的是, 在sql计算中, shuffle算子阶段默认的分区数是200个.
    对于集群模式来说, 200个默认也算比较合适
    如果在local下运行, 200个很多, 在调度上会带来额外的损耗
    所以在local下建议修改比较低 比如2\4\10均可
    这个参数和Spark RDD中设置并行度的参数 是相互独立的.
    """

    # 1. 读取数据集
    schema = StructType().add("user_id", StringType(), nullable=True).\
        add("movie_id", IntegerType(), nullable=True).\
        add("rank", IntegerType(), nullable=True).\
        add("ts", StringType(), nullable=True)
    df = spark.read.format("csv").\
        option("sep", "\t").\
        option("header", False).\
        option("encoding", "utf-8").\
        schema(schema=schema).\
        load("file:///D:/git-python/大数据-Spark/files/u.data")

    # 1: 用户平均分
    df.groupBy("user_id").\
        avg("rank").\
        withColumnRenamed("avg(rank)", "avg_rank").\
        withColumn("avg_rank", F.round("avg_rank", 2)).\
        orderBy("avg_rank", ascending=False).\
        show()

    # 2: 电影的平均分查询
    df.createTempView("movie")
    spark.sql("""
        SELECT movie_id, ROUND(AVG(rank), 2) AS avg_rank FROM movie GROUP BY movie_id ORDER BY avg_rank DESC
    """).show()

    # 3: 查询大于平均分的电影的数量 # Row
    print("大于平均分电影的数量: ", df.where(df['rank'] > df.select(F.avg(df['rank'])).first()['avg(rank)']).count())

    # 4: 查询高分电影中(>3)打分次数最多的用户, 此人打分的平均分
    # 先找出这个人
    user_id = df.where("rank > 3").\
        groupBy("user_id").\
        count().\
        withColumnRenamed("count", "cnt").\
        orderBy("cnt", ascending=False).\
        limit(1).\
        first()['user_id']
    # 计算这个人的打分平均分
    df.filter(df['user_id'] == user_id).\
        select(F.round(F.avg("rank"), 2)).show()

    # 5: 查询每个用户的平局打分, 最低打分, 最高打分
    df.groupBy("user_id").\
        agg(
            F.round(F.avg("rank"), 2).alias("avg_rank"),
            F.min("rank").alias("min_rank"),
            F.max("rank").alias("max_rank")
        ).show()

    # 6: 查询评分超过100次的电影, 的平均分 排名 TOP10
    df.groupBy("movie_id").\
        agg(
            F.count("movie_id").alias("cnt"),
            F.round(F.avg("rank"), 2).alias("avg_rank")
        ).where("cnt > 100").\
        orderBy("avg_rank", ascending=False).\
        limit(10).\
        show()

    spark.stop()

# 1. agg: 它是GroupedData对象的API, 作用是 在里面可以写多个聚合
# 2. alias: 它是Column对象的API, 可以针对一个列 进行改名
# 3. withColumnRenamed: 它是DataFrame的API, 可以对DF中的列进行改名, 一次改一个列, 改多个列 可以链式调用
# 4. orderBy: DataFrame的API, 进行排序, 参数1是被排序的列, 参数2是 升序(True) 或 降序 False
# 5. first: DataFrame的API, 取出DF的第一行数据, 返回值结果是Row对象.
# # Row对象 就是一个数组, 你可以通过row['列名'] 来取出当前行中, 某一列的具体数值. 返回值不再是DF 或者GroupedData 或者Column而是具体的值(字符串, 数字等)

SparkSQL 数据清洗API

# coding:utf8
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import functions as F

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    """读取数据"""
    df = spark.read.format("csv").\
        option("sep", ";").\
        option("header", True).\
        load("file:///D:/git-python/大数据-Spark/files/people.csv")

    # # 数据清洗: 数据去重
    # # dropDuplicates 是DataFrame的API, 可以完成数据去重
    # # 无参数使用, 对全部的列 联合起来进行比较, 去除重复值, 只保留一条
    df.dropDuplicates().show()
    #
    # df.dropDuplicates(['age', 'job']).show()
    #
    #
    # # 数据清洗: 缺失值处理
    # # dropna api是可以对缺失值的数据进行删除
    # # 无参数使用, 只要列中有null 就删除这一行数据
    # df.dropna().show()
    # # thresh = 3表示, 最少满足3个有效列,  不满足 就删除当前行数据
    # df.dropna(thresh=3).show()
    #
    # df.dropna(thresh=2, subset=['name', 'age']).show()

    # 缺失值处理也可以完成对缺失值进行填充
    # DataFrame的 fillna 对缺失的列进行填充
    df.fillna("loss").show()

    # 指定列进行填充
    df.fillna("N/A", subset=['job']).show()

    # 设定一个字典, 对所有的列 提供填充规则
    df.fillna({"name": "未知姓名", "age": 1, "job": "worker"}).show()

    spark.stop()