8. SparkSQL 初识

SparkSQL 初识

SparkSQL 是Spark的一个模块, 用于处理海量结构化数据 限定: 结构化数据处理

SparkSQL是非常成熟的 海量结构化数据处理框架.学习SparkSQL主要在2个点:

  • SparkSQL本身十分优秀, 支持SQL语言\性能强\可以自动优化\API简单\兼容HIVE等等
  • 企业大面积在使用SparkSQL处理业务数据
    • 离线开发
    • 数仓搭建
    • 科学计算
    • 数据分析

SparkSQL的特点:

  1. 融合性
    • SQL可以无缝集成在代码中, 随时用SQL处理数据
  2. 统一数据访问
    • 一套标准API可读写不同数据源
  3. Hive兼容
    • 可以使用SparkSQL直接计算并生成Hive数据表
  4. 标准化连接
    • 支持标准化JDBC\ODBC连接, 方便和各种数据库进行数据交互

SparkSQL 其实有3类数据抽象对象

  • SchemaRDD对象(已废弃)
  • DataSet对象:可用于Java、Scala语言
  • DataFrame对象:可用于Java、Scala、Python、R

SparkSession对象

  • 在RDD阶段,程序的执行入口对象是:SparkContext
  • 在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。
  • SparkSession对象可以:- 用于SparkSQL编程作为入口对象- 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext

初识SparkSession对象

# coding:utf8
 # SparkSQL 中的入口对象是SparkSession对象
from pyspark.sql import SparkSession
from pyspark import SparkConf

if __name__ == '__main__':
    # 创建SparkConf对象
    conf = SparkConf()
    # 设置应用程序名称
    conf.setAppName("MySparkApp")
    # 设置executor资源 单个Task内存计算方法为(executor.memory/executor.cores=8/4=2g)
    conf.set("spark.executor.instances", "1")
    conf.set("spark.executor.cores", "4")
    conf.set("spark.executor.memory", "8g")
    # 指定每个 Executor 进程为堆外内存分配的额外内存量。这通常是 Executor 堆内存的一部分,用于 Java 开销、垃圾收集、JIT 编译等。
    conf.set("spark.yarn.executor.memoryOverhead", "3072")
    # 设置driver资源
    conf.set("spark.driver.cores", "4")
    conf.set("spark.driver.memory", "8g")
    # 设置partithion配置数
    conf.set("spark.default.parallelism", "200")   # 对rdd进行操作时, 默认的partition数量,对sparksql无效,默认值是2倍的CPU核心数
    conf.set("spark.sql.shuffle.partitions", "200")  # 调整shuffle的并行度,默认200

    # 构建SparkSession对象, 这个对象是构建器模式通过builder方法来构建
    # spark = SparkSession.builder.\
    # appName("local[*]").\
    # config("spark.sql.shuffle.partitions", "4").\
    # config("spark.executor.memory", "2g").\
    # config("spark.driver.memory", "1g").\
    # getOrCreate()
    # appName 设置程序名称, config设置一些常用属性
    # 最后通过getOrCreate()方法创建SparkSession对象

    # 使用SparkConf对象构建SparkSession
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    # 通过SparkSession对象获取SparkContext对象
    sc = spark.sparkContext

    # 输出当前配置的所有参数
    # print("当前Spark配置参数:")
    # for item in spark.sparkContext.getConf().getAll():
    #     print(item)  # 打印所有配置参数

    # 读取文件数据
    # SparkSQL 读取文件数据时,默认以行为单位读取,默认分隔符为\t
    df = spark.read.csv('file:///D:/git-python/大数据-Spark/files/stu_score.txt', sep=',', header=False)
    # toDF方法可以给DataFrame对象添加列名
    df2 = df.toDF('id', 'name', 'score')
    # 查看DataFrame对象的结构
    df2.printSchema()
    df2.limit(5).show()
    # 注册临时表
    # createOrReplaceTempView:创建临时视图,此视图的生命周期与用于创建此数据集的[SparkSession]相关联,如果已经存在同名的临时视图,则替换之。
    # createGlobalTempView:创建全局临时视图,此时图的生命周期与Spark Application绑定。
    # createTempView: 创建一个临时视图,如果已经存在同名的视图,则会抛出异常。
    df2.createTempView("score")
    # SQL 风格
    spark.sql("""
    SELECT * FROM score WHERE name='语文' LIMIT 5
    """).show()
    # DSL 风格
    df2.where("name='语文'").limit(5).show()

    spark.stop()