SparkSQL Optimizer, Execution Flow, Hive, and a Case Study
Catalyst Optimizer: Summary
Catalyst performs a great many fine-grained optimizations; the two major ones are:
- Predicate Pushdown: move filter predicates as close to the data source as possible, so that less data enters the shuffle stage
  - In effect, row filtering: WHERE conditions are executed early
- Column Pruning: trim the set of columns that gets loaded, minimizing the width of the data being processed
  - In effect, column filtering: only the fields the SELECT actually needs are read
  - Note: column pruning pairs especially well with a columnar storage format, namely Parquet (see the sketch below)
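A minimal sketch that makes both optimizations visible in the physical plan. It assumes a hypothetical Parquet copy of the dataset (the path is for illustration only):
# coding:utf8
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName("catalyst_demo").master("local[*]").getOrCreate()
    # hypothetical Parquet path, for illustration only
    df = spark.read.parquet("hdfs://ns1/user/admin/files/mini.parquet")
    # Catalyst prunes the scan down to these 2 columns and pushes the filter into the
    # Parquet reader; look for "PushedFilters" and a 2-column "ReadSchema" in the scan node
    df.select("storeProvince", "receivable").filter("receivable < 10000").explain()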
SparkSQL Execution Flow
- Submit the SparkSQL code
- Catalyst optimization (the sketch after this list shows the resulting plan stages)
  - Generate the initial AST (abstract syntax tree)
  - Annotate the AST with metadata
  - Apply predicate pushdown, column pruning, and the other optimizations to the AST
  - From the final AST, generate the execution plan
  - Translate the execution plan into RDD code
- The Driver builds the execution environment entry point (SparkSession)
- The DAG scheduler plans out the logical tasks
- The Task scheduler assigns the logical tasks to concrete Executors and monitors/manages them
- The Workers do the actual work
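A quick way to watch these Catalyst stages is DataFrame.explain(True), which prints the parsed, analyzed, and optimized logical plans followed by the physical plan:
# coding:utf8
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName("plan_demo").master("local[*]").getOrCreate()
    spark.range(100).createOrReplaceTempView("t")
    # Prints: == Parsed Logical Plan ==, == Analyzed Logical Plan ==,
    #         == Optimized Logical Plan ==, == Physical Plan ==
    spark.sql("SELECT id FROM t WHERE id > 50").explain(True)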
Spark On Hive
Spark provides the execution engine, and Hive's MetaStore provides the metadata management. Wire Spark up to the MetaStore and you have Spark On Hive:
- Engine: Spark
- Metadata management: Hive MetaStore
# coding:utf8
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # Build the execution environment entry point: SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        config("spark.sql.warehouse.dir", "hdfs://ns1/user/hive/warehouse").\
        config("hive.metastore.uris", "thrift://kube-40:9083").\
        enableHiveSupport().\
        getOrCreate()
    sc = spark.sparkContext

    spark.sql("show databases").show()
Distributed SQL Execution Engine
# JDBC driver: check the jars directory configured in spark-defaults.conf, then upload the MySQL JDBC driver jar(s) there
hdfs dfs -put -f mysql-connector-j-8.4.0.jar /user/spark/spark3/jars/
hdfs dfs -put -f mysql-connector-java-5.1.49.jar /user/spark/spark3/jars/
hdfs dfs -put -f mysql-connector-java-5.1.49-bin.jar /user/spark/spark3/jars/
# Start it directly under the root account
# Spark's ThriftServer lets users connect to SparkSQL via JDBC/ODBC
cd /opt/cloudera/parcels/CDH/lib/spark3/sbin
/opt/cloudera/parcels/CDH/lib/spark3/sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10000 \
--hiveconf hive.server2.thrift.bind.host=kube-42 \
--master local[20]
# (alternative: --master yarn)
# With --master local, every SQL statement executes in the local process
# With --master yarn, every SQL statement executes on the YARN cluster
# pyhive
yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel libffi-devel gcc make gcc-c++ python-devel cyrus-sasl-devel cyrus-sasl-plain cyrus-sasl-gssapi -y
python -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyhive pymysql sasl thrift thrift_sasl
# coding:utf8
from pyhive import hive

if __name__ == '__main__':
    # Open a connection to Hive (really the Spark ThriftServer)
    conn = hive.Connection(host="kube-42", port=10000, username="admin")
    # Get a cursor object
    cursor = conn.cursor()
    # Execute SQL
    cursor.execute("show databases")
    # Retrieve the result set via the fetchall API
    result = cursor.fetchall()
    print(result)

# python3 pyhive_test.py
# [('bikeca',), ('db_msg',), ('default',), ('itcast_dimen',), ('itcast_dwd',), ('itcast_dwm',), ('itcast_dws',), ('itcast_ods',)]
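The connection and cursor follow the standard Python DB-API, so they should be closed when done. A small sketch (same ThriftServer as above assumed; the table listing is just an example query):
# coding:utf8
from pyhive import hive

if __name__ == '__main__':
    conn = hive.Connection(host="kube-42", port=10000, username="admin")
    cursor = conn.cursor()
    cursor.execute("show tables in default")
    for row in cursor.fetchall():
        print(row)
    # release the DB-API resources explicitly
    cursor.close()
    conn.close()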
Comprehensive Case Study
# Source file: hdfs://ns1/user/admin/files/mini.json
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import StringType

"""
Requirement 1: total sales per province
Requirement 2: among the TOP 3 provinces by sales, how many stores ever hit 1000+ in daily sales
Requirement 3: among the TOP 3 provinces, the average order value per province
Requirement 4: among the TOP 3 provinces, the share of each payment type

Fields:
receivable:    order amount
storeProvince: the store's province
dateTS:        the order's sale date
payType:       payment type
storeID:       store ID

Two outputs per result:
1. write the result to MySQL
2. write the result to the Hive warehouse
"""
if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("SparkSQL Example").\
        master("local[*]").\
        config("spark.sql.adaptive.enabled", "true").\
        config("spark.sql.shuffle.partitions", "20").\
        config("spark.debug.maxToStringFields", "100").\
        config("spark.sql.warehouse.dir", "hdfs://ns1/user/hive/warehouse").\
        config("hive.metastore.uris", "thrift://kube-40:9083").\
        enableHiveSupport().\
        getOrCreate()
    # 1. Read the data
    # Province field: filter out missing values; the data also contains the literal string "null"
    # Order amount: single orders above 10000 are test records, filter them out
    # Column pruning (SparkSQL applies this optimization automatically)
    df = spark.read.format("json").load("hdfs://ns1/user/admin/files/mini.json").\
        dropna(thresh=1, subset=['storeProvince']).\
        filter("storeProvince != 'null'").\
        filter("receivable < 10000").\
        select("storeProvince", "storeID", "receivable", "dateTS", "payType")
    # Requirement 1: total sales per province
    province_sale_df = df.groupBy("storeProvince").sum("receivable").\
        withColumnRenamed("sum(receivable)", "money").\
        withColumn("money", F.round("money", 2)).\
        orderBy("money", ascending=False)
    province_sale_df.show(truncate=False)
    # Write to MySQL
    province_sale_df.write.mode("overwrite").\
        format("jdbc").\
        option("url", "jdbc:mysql://172.16.100.42:4000/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf-8").\
        option("dbtable", "province_sale").\
        option("user", "root").\
        option("password", "84^j1TZ-+2b9A@S7Hs").\
        option("encoding", "utf-8").\
        save()
    # Write to a Hive table: saveAsTable creates and writes the table, provided
    # Spark On Hive is configured; the table lands in the Hive warehouse
    province_sale_df.write.mode("overwrite").saveAsTable("db_msg.province_sale", "parquet")
    # Requirement 2: among the TOP 3 provinces by sales, how many stores hit 1000+ in daily sales
    # 2.1 First find the TOP 3 provinces by sales
    top3_province_df = province_sale_df.limit(3).select("storeProvince").withColumnRenamed("storeProvince", "top3_province")
    # 2.2 Inner-join it with the original DF; the joined data then contains only TOP 3 province sales
    top3_province_df_joined = df.join(top3_province_df, on=df['storeProvince'] == top3_province_df['top3_province'])
    top3_province_df_joined.persist(StorageLevel.MEMORY_AND_DISK)
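    # (the joined DF is reused by requirements 2, 3 and 4 below, hence the cache)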
    # Grouped by province / store / day, e.g.:
    # Guangdong  store 1  2021-01-03  1005
    # Guangdong  store 2  ...
    # Guangdong  store 3  ...
    # Hunan      store 1  ...
    # Hunan      store 2  ...
    # and after counting qualifying stores per province:
    # Guangdong  33
    # Hunan      123
    # from_unixtime works at second precision, but dateTS is in milliseconds, so truncate the value first
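    # e.g. (hypothetical sample value) dateTS = "1546737450000" (13 digits, ms)
    #      .substr(0, 10) -> "1546737450" (s) -> from_unixtime(..., "yyyy-MM-dd") -> "2019-01-06"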
    province_hot_store_count_df = top3_province_df_joined.groupBy("storeProvince", "storeID",
            F.from_unixtime(df['dateTS'].substr(0, 10), "yyyy-MM-dd").alias("day")).\
        sum("receivable").withColumnRenamed("sum(receivable)", "money").\
        filter("money > 1000").\
        dropDuplicates(subset=["storeID"]).\
        groupBy("storeProvince").count()
    province_hot_store_count_df.show(truncate=False)
    # Write to MySQL
    province_hot_store_count_df.write.mode("overwrite").\
        format("jdbc").\
        option("url", "jdbc:mysql://172.16.100.42:4000/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf-8").\
        option("dbtable", "province_hot_store_count").\
        option("user", "root").\
        option("password", "84^j1TZ-+2b9A@S7Hs").\
        option("encoding", "utf-8").\
        save()
    # Write to Hive
    province_hot_store_count_df.write.mode("overwrite").saveAsTable("db_msg.province_hot_store_count", "parquet")
    # Requirement 3: among the TOP 3 provinces, the average order value per province
    top3_province_order_avg_df = top3_province_df_joined.groupBy("storeProvince").\
        avg("receivable").\
        withColumnRenamed("avg(receivable)", "money").\
        withColumn("money", F.round("money", 2)).\
        orderBy("money", ascending=False)
    top3_province_order_avg_df.show(truncate=False)
    top3_province_order_avg_df.write.mode("overwrite").\
        format("jdbc").\
        option("url", "jdbc:mysql://172.16.100.42:4000/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf-8").\
        option("dbtable", "top3_province_order_avg").\
        option("user", "root").\
        option("password", "84^j1TZ-+2b9A@S7Hs").\
        option("encoding", "utf-8").\
        save()
    top3_province_order_avg_df.write.mode("overwrite").saveAsTable("db_msg.top3_province_order_avg", "parquet")
    # Requirement 4: among the TOP 3 provinces, the share of each payment type, e.g.:
    # Hunan      Alipay  33%
    # Hunan      Cash    36%
    # Guangdong  WeChat  33%
    top3_province_df_joined.createTempView("province_pay")

    def udf_func(percent):
        return str(round(percent * 100, 2)) + "%"

    # Register the UDF
    my_udf = F.udf(udf_func, StringType())

    pay_type_df = spark.sql("""
        SELECT storeProvince, payType, (COUNT(payType) / total) AS percent FROM
        (SELECT storeProvince, payType, count(1) OVER(PARTITION BY storeProvince) AS total FROM province_pay) AS sub
        GROUP BY storeProvince, payType, total
    """).withColumn("percent", my_udf("percent"))
    pay_type_df.show()
    pay_type_df.write.mode("overwrite").\
        format("jdbc").\
        option("url", "jdbc:mysql://172.16.100.42:4000/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf-8").\
        option("dbtable", "pay_type").\
        option("user", "root").\
        option("password", "84^j1TZ-+2b9A@S7Hs").\
        option("encoding", "utf-8").\
        save()
    pay_type_df.write.mode("overwrite").saveAsTable("db_msg.pay_type", "parquet")

    top3_province_df_joined.unpersist()
    spark.stop()
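As a side note, the "%" formatting above can also be done without a Python UDF, using only built-in column functions, which avoid the per-row Python serialization a UDF incurs. A sketch, assuming the same spark session and province_pay view are still in scope:
pay_type_df = spark.sql("""
    SELECT storeProvince, payType, (COUNT(payType) / total) AS percent FROM
    (SELECT storeProvince, payType, count(1) OVER(PARTITION BY storeProvince) AS total FROM province_pay) AS sub
    GROUP BY storeProvince, payType, total
""").withColumn("percent", F.concat(F.round(F.col("percent") * 100, 2).cast("string"), F.lit("%")))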
# python3 -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark==3.3.4
# spark3-submit --master local[*] \
# --conf "spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
# --conf "spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
# --conf "spark.default.parallelism=100" \
# --executor-memory 8g \
# --executor-cores 8 \
# --num-executors 1 \
# --deploy-mode client \
# spark_hive.py
## Note: make sure the account has write permissions on the output targets