9. SparkSQL DataFrame

SparkSQL DataFrame

DataFrame的组成

DataFrame是一个二维表结构,那么表格结构就有无法绕开的三个点:

  • 表结构描述

比如,在MySQL中的一张表:

  • 由许多行组成
  • 数据也被分成多个列
  • 表也有表结构信息(列、列名、列类型、列约束等)

基于这个前提,DataFrame的组成如下:

  • 在结构层面:
    • StructType对象描述整个DataFrame的表结构
    • StructField对象描述一个列的信息
  • 在数据层面
    • Row对象记录一行数据
    • Column对象记录一列数据并包含列的信息
structtype=StructType([
    StructField("id",IntegerType(),False),
    StructField("name",StringType(),False),
    StructField("age",IntegerType(),False)
])

一个StructField记录:列名、列类型、列是否运行为空
多个StructField组成一个StructType对象。
一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空
同时,一行数据描述为Row对象,如Row(1, 张三, 11)
一列数据描述为Column对象,Column对象包含一列数据和列的信息
Row、Column、StructType、StructField的编程我们在后面编码阶段会接触

DataFrame的代码构建

基于RDD方式1

DataFrame对象可以从RDD转换而来,都是分布式数据集,其实就是转换一下内部存储的结构,转换为二维表结构

# coding:utf8
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 基于RDD转换成DataFrame
    rdd = sc.textFile("file:///D:/git-python/大数据-Spark/files/people.txt").\
        map(lambda x: x.split(",")).\
        map(lambda x: (x[0], int(x[1])))

    # 构建DataFrame对象
    # 参数1 被转换的RDD
    # 参数2 指定列名, 通过list的形式指定, 按照顺序依次提供字符串名称即可
    df = spark.createDataFrame(rdd, schema=['name', 'age'])

    # 打印DataFrame的表结构
    df.printSchema()

    # 打印df中的数据
    # 参数1 表示 展示出多少条数据, 默认不传的话是20
    # 参数2 表示是否对列进行截断, 如果列的数据长度超过20个字符串长度, 后续的内容不显示以...代替
    # 如果给False 表示不阶段全部显示, 默认是True
    df.show(20, False)

    # 将DF对象转换成临时视图表, 可供sql语句查询
    df.createOrReplaceTempView("people")
    spark.sql("SELECT * FROM people WHERE age < 30").show()

    spark.stop()

基于RDD方式2

通过StructType对象来定义DataFrame的“表结构”转换RDD

# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 基于RDD转换成DataFrame
    rdd = sc.textFile("file:///D:/git-python/大数据-Spark/files/people.txt").\
        map(lambda x: x.split(",")).\
        map(lambda x: (x[0], int(x[1])))

    # 构建表结构的描述对象: StructType对象
    schema = StructType().\
        add("name", StringType(), nullable=True).\
        add("age", IntegerType(), nullable=False)

    # 基于StructType对象去构建RDD到DF的转换
    df = spark.createDataFrame(rdd, schema=schema)

    df.printSchema()
    df.show()
    spark.stop()

基于RDD方式3

使用RDD的toDF方法转换RDD

# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 基于RDD转换成DataFrame
    rdd = sc.textFile("file:///D:/git-python/大数据-Spark/files/people.txt").\
        map(lambda x: x.split(",")).\
        map(lambda x: (x[0], int(x[1])))

    rdd.cache()

    # toDF的方式构建DataFrame
    df1 = rdd.toDF(["name", "age"])
    df1.printSchema()
    df1.show()

    # toDF的方式2 通过StructType来构建
    schema = StructType()\
        .add("name", StringType(), nullable=True)\
        .add("age", IntegerType(), nullable=False)

    df2 = rdd.toDF(schema=schema)
    df2.printSchema()
    df2.show()
    spark.stop()

基于Pandas的DataFrame

将Pandas的DataFrame对象,转变为分布式的SparkSQL DataFrame对象

# coding:utf8
from pyspark.sql import SparkSession
import pandas as pd

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # pandas==2.0.3 已删除了 iteritems,改用 items
    pd.DataFrame.iteritems = pd.DataFrame.items

    # 基于Pandas的DataFrame构建SparkSQL的DataFrame对象
    pdf = pd.DataFrame(
        {
            "id": [1, 2, 3],
            "name": ["张大仙", "王晓晓", "吕不为"],
            "age": [11, 21, 11]
        }
    )
    # 查看pdf的表结构
    # pdf.info()
    # 查看pdf的数据
    # print(pdf)
    # pdf 转成 SparkSQL的DataFrame对象
    df = spark.createDataFrame(pdf)
    # AttributeError: 'DataFrame' object has no attribute 'iteritems' 报错,降低pandas版本或者更改iteritems
    df.printSchema()
    df.show()
    spark.stop()

读取外部数据

# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

# 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
    appName("test").\
    master("local[*]").\
    getOrCreate()

#### 读取text数据源
# 使用format("text")读取文本数据,读取到的DataFrame只会有一个列,列名默认称之为:value
schema = StructType().add("data", StringType(), nullable=True)
df = spark.read.format("text")\
    .schema(schema)\
    .load("file:///D:/git-python/大数据-Spark/files/people.txt")
df.show()

#### 读取json数据源
# 使用format("json")读取json数据
df2 = spark.read.format("json").\
  load("file:///D:/git-python/大数据-Spark/files/people.json")
# JSON 类型一般不用写.schema, json自带, json带有列名和列类型(字符串和数字)
df2.show()

#### 读取csv数据源
# 使用format("csv")读取csv数据
# sep 指定分隔符; header 是否有表头; encoding 指定编码格式
df3 = spark.read.format("csv")\
 .option("sep", ";")\
 .option("header", True)\
 .option("encoding", "utf-8")\
 .schema("name STRING, age INT, job STRING")\
 .load("file:///D:/git-python/大数据-Spark/files/people.csv")
df3.printSchema()
df3.limit(3).show()

#### 读取parquet数据源
# 使用format("parquet")读取parquet数据
# parquet: 是Spark中常用的一种列式存储文件格式
# 和Hive中的ORC差不多, 他俩都是列存储格式
# parquet对比普通的文本文件的区别:
# ● parquet 内置schema (列名\ 列类型\ 是否为空)
# ● 存储是以列作为存储格式
# ● 存储是序列化存储在文件中的(有压缩属性体积小)
# parquet 自带schema, 直接load啥也不需要了
df4 = spark.read.format("parquet").\
  load("file:///D:/git-python/大数据-Spark/files/users.parquet")
df4.printSchema()
df4.show()

spark.stop()