13. SparkSQL Optimizer, Execution Flow, Hive, and Case Study


Catalyst Optimizer Summary

Catalyst applies a great many fine-grained optimizations; at a high level there are two main ones:

  • Predicate Pushdown: push filter conditions as early as possible in the plan, to reduce the amount of data entering the shuffle stage
    • Row filtering: apply the where condition up front
  • Column Pruning: trim the set of columns that is loaded, so the data being processed is as narrow as possible
    • Column filtering: plan the select field list up front
    • Note: column pruning pairs especially well with a columnar storage format such as Parquet (see the sketch below)
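
A minimal sketch of how both optimizations show up in practice, assuming a hypothetical Parquet dataset (the path and column names are placeholders): select only the needed columns, filter early, and call explain() to inspect the physical plan.

# coding:utf8
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName("catalyst_demo").master("local[*]").getOrCreate()

    # Hypothetical Parquet dataset; the path and column names are placeholders
    df = spark.read.parquet("hdfs://ns1/user/admin/files/orders.parquet")

    # Only two columns are selected and the filter is written early,
    # so Catalyst can prune the remaining columns and push the predicate into the scan
    result = df.select("storeProvince", "receivable").filter("receivable < 10000")

    # With a Parquet source, the physical plan's FileScan node lists the pruned
    # ReadSchema and the PushedFilters
    result.explain()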

SparkSQL Execution Flow

  1. Submit the SparkSQL code
  2. Catalyst optimization (the plan stages can be inspected with explain(True), see the sketch after this list)
    • Generate the initial AST (abstract syntax tree)
    • Annotate the AST with metadata
    • Apply predicate pushdown, column pruning and other optimizations to the AST
    • From the final AST, generate the execution plan
    • Translate the execution plan into RDD code
  3. Build the Driver-side execution entry point (SparkSession)
  4. The DAG scheduler plans the logical tasks
  5. The Task scheduler assigns the logical tasks to concrete Executors and monitors/manages them
  6. The Workers do the actual work
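
A minimal sketch, assuming any DataFrame with a simple aggregation: explain(True) prints the parsed, analyzed, and optimized logical plans plus the physical plan, which map directly onto the Catalyst stages listed above.

# coding:utf8
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName("plan_demo").master("local[*]").getOrCreate()

    # A tiny in-memory DataFrame is enough to see every plan stage
    df = spark.createDataFrame([("gd", 10.0), ("hn", 20.0)], ["storeProvince", "receivable"])

    # explain(True) prints: Parsed Logical Plan -> Analyzed Logical Plan
    # -> Optimized Logical Plan -> Physical Plan
    df.groupBy("storeProvince").sum("receivable").explain(True)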

Spark On Hive

Spark provides the execution engine, while Hive's MetaStore provides metadata management. Connect Spark to the Metastore and you have Spark On Hive:

  1. Engine: Spark
  2. Metadata management: Hive Metastore
# coding:utf8
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # Build the SparkSession execution entry point
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        config("spark.sql.warehouse.dir", "hdfs://ns1/user/hive/warehouse").\
        config("hive.metastore.uris", "thrift://kube-40:9083").\
        enableHiveSupport().\
        getOrCreate()
    sc = spark.sparkContext

    spark.sql("show databases").show()

Distributed SQL Execution Engine

# JDBC driver jars: check spark-defaults.conf for the jars directory, then upload the MySQL JDBC driver there
hdfs dfs -put -f mysql-connector-j-8.4.0.jar  /user/spark/spark3/jars/
hdfs dfs -put -f mysql-connector-java-5.1.49.jar  /user/spark/spark3/jars/
hdfs dfs -put -f mysql-connector-java-5.1.49-bin.jar  /user/spark/spark3/jars/
# Start directly under the root account
# Spark's ThriftServer lets users connect to SparkSQL over JDBC/ODBC
cd /opt/cloudera/parcels/CDH/lib/spark3/sbin
/opt/cloudera/parcels/CDH/lib/spark3/sbin/start-thriftserver.sh \
    --hiveconf hive.server2.thrift.port=10000 \
    --hiveconf hive.server2.thrift.bind.host=kube-42 \
    --master local[20]
    # or: --master yarn
# With master=local, every SQL statement runs in the local process
# With master=yarn, every SQL statement runs on the YARN cluster

# pyhive
yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel libffi-devel gcc make gcc-c++ python-devel cyrus-sasl-devel cyrus-sasl-plain cyrus-sasl-gssapi -y
python -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyhive pymysql sasl thrift thrift_sasl

# coding:utf8
from pyhive import hive

if __name__ == '__main__':
    # Get a connection to Hive (the Spark ThriftServer)
    conn = hive.Connection(host="kube-42", port=10000, username="admin")

    # Get a cursor object
    cursor = conn.cursor()

    # Execute the SQL statement
    cursor.execute("show databases")

    # Fetch the result rows via the fetchall API
    result = cursor.fetchall()

    print(result)

# python3 pyhive_test.py
# [('bikeca',), ('db_msg',), ('default',), ('itcast_dimen',), ('itcast_dwd',), ('itcast_dwm',), ('itcast_dws',), ('itcast_ods',)]

Comprehensive Case Study

# Source data file: hdfs://ns1/user/admin/files/mini.json
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import StringType

"""
需求1: 各省销售额的统计
需求2: TOP3销售省份中, 有多少店铺达到过日销售额1000+
需求3: TOP3省份中, 各省的平均单单价
需求4: TOP3省份中, 各个省份的支付类型比例

receivable: 订单金额
storeProvince: 店铺省份
dateTS: 订单的销售日期
payType: 支付类型
storeID:店铺ID

2个操作
1. 写出结果到MySQL
2. 写出结果到Hive库
"""

if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("SparkSQL Example").\
        master("local[*]").\
        config("spark.sql.adaptive.enabled", "true").\
        config("spark.sql.shuffle.partitions", "20").\
        config("spark.debug.maxToStringFields","100").\
        config("spark.sql.warehouse.dir", "hdfs://ns1/user/hive/warehouse").\
        config("hive.metastore.uris", "thrift://kube-40:9083").\
        enableHiveSupport().\
        getOrCreate()

    # 1. Read the data
    # Province field: drop missing values; the data also contains the literal string "null"
    # Order amount: some orders in the dataset exceed 10000 in a single order; these are test records and are filtered out
    # Column pruning (SparkSQL does this optimization automatically)
    df = spark.read.format("json").load("hdfs://ns1/user/admin/files/mini.json").\
        dropna(thresh=1, subset=['storeProvince']).\
        filter("storeProvince != 'null'").\
        filter("receivable < 10000").\
        select("storeProvince", "storeID", "receivable", "dateTS", "payType")

    # Requirement 1: total sales per province
    province_sale_df = df.groupBy("storeProvince").sum("receivable").\
        withColumnRenamed("sum(receivable)", "money").\
        withColumn("money", F.round("money", 2)).\
        orderBy("money", ascending=False)
    province_sale_df.show(truncate=False)

    # Write to MySQL
    province_sale_df.write.mode("overwrite").\
        format("jdbc").\
        option("url", "jdbc:mysql://172.16.100.42:4000/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf-8").\
        option("dbtable", "province_sale").\
        option("user", "root").\
        option("password", "84^j1TZ-+2b9A@S7Hs").\
        option("encoding", "utf-8").\
        save()

    # Write to a Hive table: saveAsTable can write out a table, provided Spark On Hive is configured;
    # once configured, the table is written into the Hive warehouse
    province_sale_df.write.mode("overwrite").saveAsTable("db_msg.province_sale", "parquet")

    # Requirement 2: among the TOP 3 provinces by sales, how many stores have reached a daily sales total of 1000+
    # 2.1 First find the TOP 3 provinces by sales
    top3_province_df = province_sale_df.limit(3).select("storeProvince").withColumnRenamed("storeProvince", "top3_province")

    # 2.2 Inner-join with the original DF; after the join, only sales data from the TOP 3 provinces remains
    top3_province_df_joined = df.join(top3_province_df, on = df['storeProvince'] == top3_province_df['top3_province'])

    top3_province_df_joined.persist(StorageLevel.MEMORY_AND_DISK)
    # Per store and day, e.g.:
    # Guangdong  store 1  2021-01-03  1005
    # Guangdong  store 2  ....
    # Guangdong  store 3  ....
    # Hunan      store 1  ...
    # Hunan      store 2  ...
    # After counting distinct stores per province, e.g.:
    # Guangdong  33
    # Hunan      123

    # from_unixtime works at second precision while dateTS is in milliseconds, so truncate the value to its first 10 digits
    province_hot_store_count_df = top3_province_df_joined.groupBy("storeProvince", "storeID",
                                    F.from_unixtime(df['dateTS'].substr(0, 10), "yyyy-MM-dd").alias("day")).\
        sum("receivable").withColumnRenamed("sum(receivable)", "money").\
        filter("money > 1000").\
        dropDuplicates(subset=["storeID"]).\
        groupBy("storeProvince").count()
    province_hot_store_count_df.show(truncate=False)
    # Write to MySQL
    province_hot_store_count_df.write.mode("overwrite").\
        format("jdbc").\
        option("url", "jdbc:mysql://172.16.100.42:4000/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf-8").\
        option("dbtable", "province_hot_store_count").\
        option("user", "root").\
        option("password", "84^j1TZ-+2b9A@S7Hs").\
        option("encoding", "utf-8").\
        save()
    # Write to Hive
    province_hot_store_count_df.write.mode("overwrite").saveAsTable("db_msg.province_hot_store_count", "parquet")

    # Requirement 3: among the TOP 3 provinces, the average order value per province
    top3_province_order_avg_df = top3_province_df_joined.groupBy("storeProvince").\
        avg("receivable").\
        withColumnRenamed("avg(receivable)", "money").\
        withColumn("money", F.round("money", 2)).\
        orderBy("money", ascending=False)

    top3_province_order_avg_df.show(truncate=False)

    top3_province_order_avg_df.write.mode("overwrite").\
        format("jdbc").\
        option("url", "jdbc:mysql://172.16.100.42:4000/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf-8").\
        option("dbtable", "top3_province_order_avg").\
        option("user", "root").\
        option("password", "84^j1TZ-+2b9A@S7Hs").\
        option("encoding", "utf-8").\
        save()

    top3_province_order_avg_df.write.mode("overwrite").saveAsTable("db_msg.top3_province_order_avg", "parquet")

    # Requirement 4: among the TOP 3 provinces, the proportion of each payment type, e.g.:
    # Hunan      Alipay  33%
    # Hunan      Cash    36%
    # Guangdong  WeChat  33%

    top3_province_df_joined.createTempView("province_pay")

    def udf_func(percent):
        return str(round(percent * 100, 2)) + "%"
    # Register the UDF
    my_udf = F.udf(udf_func, StringType())

    pay_type_df = spark.sql("""
        SELECT storeProvince, payType, (COUNT(payType) / total) AS percent FROM
        (SELECT storeProvince, payType, count(1) OVER(PARTITION BY storeProvince) AS total FROM province_pay) AS sub
        GROUP BY storeProvince, payType, total
    """).withColumn("percent", my_udf("percent"))

    pay_type_df.show()
    pay_type_df.write.mode("overwrite").\
        format("jdbc").\
        option("url", "jdbc:mysql://172.16.100.42:4000/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf-8").\
        option("dbtable", "pay_type").\
        option("user", "root").\
        option("password", "84^j1TZ-+2b9A@S7Hs").\
        option("encoding", "utf-8").\
        save()

    pay_type_df.write.mode("overwrite").saveAsTable("db_msg.pay_type", "parquet")

    top3_province_df_joined.unpersist()

    spark.stop()

# python3 -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark==3.3.4
# spark3-submit --master local[*]  \
#  --conf "spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
#  --conf "spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
#  --conf "spark.default.parallelism=100" \
#  --executor-memory 8g \
#  --executor-cores 8 \
#  --num-executors 1 \
#  --deploy-mode client \
#  spark_hive.py
## Note: check write permissions for the output locations
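
To sanity-check the two output operations, a minimal read-back sketch (it reuses the JDBC URL, credentials, Metastore URI and the province_sale table name from the example above; adjust them to your environment):

# coding:utf8
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("check_results").\
        master("local[*]").\
        config("spark.sql.warehouse.dir", "hdfs://ns1/user/hive/warehouse").\
        config("hive.metastore.uris", "thrift://kube-40:9083").\
        enableHiveSupport().\
        getOrCreate()

    # Read the MySQL result table back over JDBC (same connection settings as above)
    spark.read.format("jdbc").\
        option("url", "jdbc:mysql://172.16.100.42:4000/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf-8").\
        option("dbtable", "province_sale").\
        option("user", "root").\
        option("password", "84^j1TZ-+2b9A@S7Hs").\
        load().show()

    # Read the Hive result table back through the Metastore
    spark.sql("SELECT * FROM db_msg.province_sale").show()

    spark.stop()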