SparkSQL 初识
SparkSQL 是Spark的一个模块, 用于处理海量结构化数据 限定: 结构化数据处理
SparkSQL是非常成熟的 海量结构化数据处理框架.学习SparkSQL主要在2个点:
- SparkSQL本身十分优秀, 支持SQL语言\性能强\可以自动优化\API简单\兼容HIVE等等
- 企业大面积在使用SparkSQL处理业务数据
- 离线开发
- 数仓搭建
- 科学计算
- 数据分析
SparkSQL的特点:
- 融合性
- SQL可以无缝集成在代码中, 随时用SQL处理数据
- 统一数据访问
- 一套标准API可读写不同数据源
- Hive兼容
- 可以使用SparkSQL直接计算并生成Hive数据表
- 标准化连接
- 支持标准化JDBC\ODBC连接, 方便和各种数据库进行数据交互
SparkSQL 其实有3类数据抽象对象
- SchemaRDD对象(已废弃)
- DataSet对象:可用于Java、Scala语言
- DataFrame对象:可用于Java、Scala、Python、R
SparkSession对象
- 在RDD阶段,程序的执行入口对象是:SparkContext
- 在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。
- SparkSession对象可以:- 用于SparkSQL编程作为入口对象- 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
初识SparkSession对象
# coding:utf8
# SparkSQL 中的入口对象是SparkSession对象
from pyspark.sql import SparkSession
from pyspark import SparkConf
if __name__ == '__main__':
# 创建SparkConf对象
conf = SparkConf()
# 设置应用程序名称
conf.setAppName("MySparkApp")
# 设置executor资源 单个Task内存计算方法为(executor.memory/executor.cores=8/4=2g)
conf.set("spark.executor.instances", "1")
conf.set("spark.executor.cores", "4")
conf.set("spark.executor.memory", "8g")
# 指定每个 Executor 进程为堆外内存分配的额外内存量。这通常是 Executor 堆内存的一部分,用于 Java 开销、垃圾收集、JIT 编译等。
conf.set("spark.yarn.executor.memoryOverhead", "3072")
# 设置driver资源
conf.set("spark.driver.cores", "4")
conf.set("spark.driver.memory", "8g")
# 设置partithion配置数
conf.set("spark.default.parallelism", "200") # 对rdd进行操作时, 默认的partition数量,对sparksql无效,默认值是2倍的CPU核心数
conf.set("spark.sql.shuffle.partitions", "200") # 调整shuffle的并行度,默认200
# 构建SparkSession对象, 这个对象是构建器模式通过builder方法来构建
# spark = SparkSession.builder.\
# appName("local[*]").\
# config("spark.sql.shuffle.partitions", "4").\
# config("spark.executor.memory", "2g").\
# config("spark.driver.memory", "1g").\
# getOrCreate()
# appName 设置程序名称, config设置一些常用属性
# 最后通过getOrCreate()方法创建SparkSession对象
# 使用SparkConf对象构建SparkSession
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# 通过SparkSession对象获取SparkContext对象
sc = spark.sparkContext
# 输出当前配置的所有参数
# print("当前Spark配置参数:")
# for item in spark.sparkContext.getConf().getAll():
# print(item) # 打印所有配置参数
# 读取文件数据
# SparkSQL 读取文件数据时,默认以行为单位读取,默认分隔符为\t
df = spark.read.csv('file:///D:/git-python/大数据-Spark/files/stu_score.txt', sep=',', header=False)
# toDF方法可以给DataFrame对象添加列名
df2 = df.toDF('id', 'name', 'score')
# 查看DataFrame对象的结构
df2.printSchema()
df2.limit(5).show()
# 注册临时表
# createOrReplaceTempView:创建临时视图,此视图的生命周期与用于创建此数据集的[SparkSession]相关联,如果已经存在同名的临时视图,则替换之。
# createGlobalTempView:创建全局临时视图,此时图的生命周期与Spark Application绑定。
# createTempView: 创建一个临时视图,如果已经存在同名的视图,则会抛出异常。
df2.createTempView("score")
# SQL 风格
spark.sql("""
SELECT * FROM score WHERE name='语文' LIMIT 5
""").show()
# DSL 风格
df2.where("name='语文'").limit(5).show()
spark.stop()