12. SparkSQL 函数定义

SparkSQL 函数定义

无论Hive还是SparkSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在pyspark.sql.functions中

SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。

回顾Hive中自定义函数有三种类型:

  • 第一种:UDF(User-Defined-Function)函数
    • 一对一的关系,输入一个值经过函数以后输出一个值;
    • 在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;
  • 第二种:UDAF(User-Defined Aggregation Function)聚合函数
    • 多对一的关系,输入多个值输出一个值,通常与groupBy联合使用;
  • 第三种:UDTF(User-Defined Table-Generating Functions)函数
    • 一对多的关系,输入一个值输出多个值(一行变为多行);
    • 用户自定义生成函数,有点像flatMap;

SparkSQL 定义UDF函数

定义方式有2种

  1. sparksession.udf.register()
    • 注册的UDF可以用于DSL和SQL
    • 返回值用于DSL风格,传参内给的名字用于SQL风格
  2. pyspark.sql.functions.udf
    • 仅能用于DSL风格

方式1语法:

  • udf对象 = sparksession.udf.register(参数1,参数2,参数3)
  • 参数1:UDF名称,可用于SQL风格
  • 参数2:被注册成UDF的方法名
  • 参数3:声明UDF的返回值类型
  • udf对象:返回值对象,是一个UDF对象,可用于DSL风格

方式2语法:

  • udf对象 = F.udf(参数1, 参数2)
  • 参数1:被注册成UDF的方法名
  • 参数2:声明UDF的返回值类型
  • udf对象:返回值对象,是一个UDF对象,可用于DSL风格
  • 其中F是:
    • from pyspark.sql import functions as F
    • 其中,被注册成UDF的方法名是指具体的计算方法,如:
    • def add(x, y): x + y
    • add就是将要被注册成UDF的方法名
# coding:utf8
# 构建一个Interger返回值类型的UDF
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F

if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 构建一个RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x:[x])
    df = rdd.toDF(["num"])

    # 1: 方式1 sparksession.udf.register(), DSL和SQL风格均可以使用
    # UDF的处理函数
    def num_ride_10(num):
        return num * 10
    # 参数1: 注册的UDF的名称, 这个udf名称, 仅可以用于 SQL风格
    # 参数2: UDF的处理逻辑, 是一个单独的方法
    # 参数3: 声明UDF的返回值类型, 注意: UDF注册时候, 必须声明返回值类型, 并且UDF的真实返回值一定要和声明的返回值一致
    # 返回值对象: 这是一个UDF对象, 仅可以用于 DSL 语法
    # 当前这种方式定义的UDF, 可以通过参数1的名称用于SQL风格, 通过返回值对象用户DSL风格
    udf2 = spark.udf.register("udf1", num_ride_10, IntegerType())

    # SQL风格中使用
    # selectExpr 以SELECT的表达式执行, 表达式 SQL风格的表达式(字符串)
    # select方法, 接受普通的字符串字段名, 或者返回值是Column对象的计算
    df.selectExpr("udf1(num)").show()

    # DSL 风格中使用
    # 返回值UDF对象 如果作为方法使用, 传入的参数 一定是Column对象
    df.select(udf2(df['num'])).show()

    # 2: 方式2注册, 仅能用于DSL风格
    udf3 = F.udf(num_ride_10, IntegerType())
    df.select(udf3(df['num'])).show()

    # df.selectExpr("udf3(num)").show() # AnalysisException: [UNRESOLVED_ROUTINE] Cannot resolve function `udf3` on search path

    spark.stop()
# coding:utf8
# 注册一个ArrayType(数字\list)类型的返回值UDF

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
from pyspark.sql import functions as F

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 构建一个RDD
    rdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])
    df = rdd.toDF(["line"])

    # 注册UDF, UDF的执行函数定义
    def split_line(data):
        return data.split(" ")  # 返回值是一个Array对象
    # 方式1 构建UDF
    udf2 = spark.udf.register("udf1", split_line, ArrayType(StringType()))

    # DLS风格
    df.select(udf2(df['line'])).show()
    # SQL风格
    df.createTempView("lines")
    spark.sql("SELECT udf1(line) FROM lines").show(truncate=False)

    # 方式2的形式构建UDF
    udf3 = F.udf(split_line, ArrayType(StringType()))
    df.select(udf3(df['line'])).show(truncate=False)

    spark.stop()
# coding:utf8
# 注册一个字典类型的返回值的UDF
import string
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 假设 有三个数字  1 2 3  我们传入数字 ,返回数字所在序号对应的 字母 然后和数字结合形成dict返回
    # 比如传入1 我们返回 {"num":1, "letters": "a"}
    rdd = sc.parallelize([[1], [2], [3]])
    df = rdd.toDF(["num"])

    # 注册UDF
    def process(data):
        return {"num": data, "letters": string.ascii_letters[data]}

    """
    UDF的返回值是字典的话, 需要用StructType来接收
    """
    udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True).\
                              add("letters", StringType(), nullable=True))

    df.selectExpr("udf1(num)").show(truncate=False)
    df.select(udf1(df['num'])).show(truncate=False)

    spark.stop()
# coding:utf8
# 使用RDD代码模拟UDAF效果
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
    df = rdd.map(lambda x: [x]).toDF(['num'])

    # 折中的方式 就是使用RDD的mapPartitions 算子来完成聚合操作
    # 如果用mapPartitions API 完成UDAF聚合, 一定要单分区
    single_partition_rdd = df.rdd.repartition(1)

    def process(iter):
        sum = 0
        for row in iter:
            sum += row['num']

        return [sum]    # 一定要嵌套list, 因为mapPartitions方法要求的返回值是list对象

    print(single_partition_rdd.mapPartitions(process).collect())
    spark.stop()

SparkSQL 使用窗口函数

# coding:utf8
# 演示sparksql 窗口函数(开窗函数)
from pyspark.sql import SparkSession
# 导入StructType对象
from pyspark.sql.types import ArrayType, StringType, StructType, IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("create df").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", "2").\
        getOrCreate()
    sc = spark.sparkContext
    rdd = sc.parallelize([
        ('张三', 'class_1', 99),
        ('王五', 'class_2', 35),
        ('王三', 'class_3', 57),
        ('王久', 'class_4', 12),
        ('王丽', 'class_5', 99),
        ('王娟', 'class_1', 90),
        ('王军', 'class_2', 91),
        ('王俊', 'class_3', 33),
        ('王君', 'class_4', 55),
        ('王珺', 'class_5', 66),
        ('郑颖', 'class_1', 11),
        ('郑辉', 'class_2', 33),
        ('张丽', 'class_3', 36),
        ('张张', 'class_4', 79),
        ('黄凯', 'class_5', 90),
        ('黄开', 'class_1', 90),
        ('黄恺', 'class_2', 90),
        ('王凯', 'class_3', 11),
        ('王凯杰', 'class_1', 11),
        ('王开杰', 'class_2', 3),
        ('王景亮', 'class_3', 99)
        ])

    schema = StructType().add("name", StringType()).\
    add("class", StringType()).\
    add("score", IntegerType())

    df = rdd.toDF(schema)

    # 窗口函数只用于SQL风格, 所以注册表先
    df.createTempView("stu")

    # 聚合窗口
    spark.sql("""
        SELECT *, CAST(AVG(score) OVER() AS DECIMAL(10,2)) AS avg_score FROM stu
        """).show()
    # SELECT *, AVG(score) OVER() AS avg_score FROM stu 等同于
    # SELECT * FROM stu
    # SELECT AVG(score) FROM stu

    # 两个SQL的结果集进行整合而来
    spark.sql("""
        SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu
        """).show()
    # SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu 等同于
    # SELECT * FROM stu
    # SELECT AVG(score) FROM stu GROUP BY class

    # 两个SQL的结果集进行整合而来
    # 排序窗口
    spark.sql("""
        SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank, 
        DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank, 
        RANK() OVER(ORDER BY score) AS rank
        FROM stu
        """).show()
    # NTILE
    spark.sql("""
        SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
        """).show()

    spark.stop()