SparkSQL DataFrame
DataFrame的组成
DataFrame是一个二维表结构,那么表格结构就有无法绕开的三个点:
- 行
- 列
- 表结构描述
比如,在MySQL中的一张表:
- 由许多行组成
- 数据也被分成多个列
- 表也有表结构信息(列、列名、列类型、列约束等)
基于这个前提,DataFrame的组成如下:
- 在结构层面:
- StructType对象描述整个DataFrame的表结构
- StructField对象描述一个列的信息
- 在数据层面
- Row对象记录一行数据
- Column对象记录一列数据并包含列的信息
structtype=StructType([
StructField("id",IntegerType(),False),
StructField("name",StringType(),False),
StructField("age",IntegerType(),False)
])
一个StructField记录:列名、列类型、列是否运行为空
多个StructField组成一个StructType对象。
一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空
同时,一行数据描述为Row对象,如Row(1, 张三, 11)
一列数据描述为Column对象,Column对象包含一列数据和列的信息
Row、Column、StructType、StructField的编程我们在后面编码阶段会接触
DataFrame的代码构建
基于RDD方式1
DataFrame对象可以从RDD转换而来,都是分布式数据集,其实就是转换一下内部存储的结构,转换为二维表结构
# coding:utf8
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
sc = spark.sparkContext
# 基于RDD转换成DataFrame
rdd = sc.textFile("file:///D:/git-python/大数据-Spark/files/people.txt").\
map(lambda x: x.split(",")).\
map(lambda x: (x[0], int(x[1])))
# 构建DataFrame对象
# 参数1 被转换的RDD
# 参数2 指定列名, 通过list的形式指定, 按照顺序依次提供字符串名称即可
df = spark.createDataFrame(rdd, schema=['name', 'age'])
# 打印DataFrame的表结构
df.printSchema()
# 打印df中的数据
# 参数1 表示 展示出多少条数据, 默认不传的话是20
# 参数2 表示是否对列进行截断, 如果列的数据长度超过20个字符串长度, 后续的内容不显示以...代替
# 如果给False 表示不阶段全部显示, 默认是True
df.show(20, False)
# 将DF对象转换成临时视图表, 可供sql语句查询
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people WHERE age < 30").show()
spark.stop()
基于RDD方式2
通过StructType对象来定义DataFrame的“表结构”转换RDD
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
if __name__ == '__main__':
# 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
sc = spark.sparkContext
# 基于RDD转换成DataFrame
rdd = sc.textFile("file:///D:/git-python/大数据-Spark/files/people.txt").\
map(lambda x: x.split(",")).\
map(lambda x: (x[0], int(x[1])))
# 构建表结构的描述对象: StructType对象
schema = StructType().\
add("name", StringType(), nullable=True).\
add("age", IntegerType(), nullable=False)
# 基于StructType对象去构建RDD到DF的转换
df = spark.createDataFrame(rdd, schema=schema)
df.printSchema()
df.show()
spark.stop()
基于RDD方式3
使用RDD的toDF方法转换RDD
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
if __name__ == '__main__':
# 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
sc = spark.sparkContext
# 基于RDD转换成DataFrame
rdd = sc.textFile("file:///D:/git-python/大数据-Spark/files/people.txt").\
map(lambda x: x.split(",")).\
map(lambda x: (x[0], int(x[1])))
rdd.cache()
# toDF的方式构建DataFrame
df1 = rdd.toDF(["name", "age"])
df1.printSchema()
df1.show()
# toDF的方式2 通过StructType来构建
schema = StructType()\
.add("name", StringType(), nullable=True)\
.add("age", IntegerType(), nullable=False)
df2 = rdd.toDF(schema=schema)
df2.printSchema()
df2.show()
spark.stop()
基于Pandas的DataFrame
将Pandas的DataFrame对象,转变为分布式的SparkSQL DataFrame对象
# coding:utf8
from pyspark.sql import SparkSession
import pandas as pd
if __name__ == '__main__':
# 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
sc = spark.sparkContext
# pandas==2.0.3 已删除了 iteritems,改用 items
pd.DataFrame.iteritems = pd.DataFrame.items
# 基于Pandas的DataFrame构建SparkSQL的DataFrame对象
pdf = pd.DataFrame(
{
"id": [1, 2, 3],
"name": ["张大仙", "王晓晓", "吕不为"],
"age": [11, 21, 11]
}
)
# 查看pdf的表结构
# pdf.info()
# 查看pdf的数据
# print(pdf)
# pdf 转成 SparkSQL的DataFrame对象
df = spark.createDataFrame(pdf)
# AttributeError: 'DataFrame' object has no attribute 'iteritems' 报错,降低pandas版本或者更改iteritems
df.printSchema()
df.show()
spark.stop()
读取外部数据
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
# 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
#### 读取text数据源
# 使用format("text")读取文本数据,读取到的DataFrame只会有一个列,列名默认称之为:value
schema = StructType().add("data", StringType(), nullable=True)
df = spark.read.format("text")\
.schema(schema)\
.load("file:///D:/git-python/大数据-Spark/files/people.txt")
df.show()
#### 读取json数据源
# 使用format("json")读取json数据
df2 = spark.read.format("json").\
load("file:///D:/git-python/大数据-Spark/files/people.json")
# JSON 类型一般不用写.schema, json自带, json带有列名和列类型(字符串和数字)
df2.show()
#### 读取csv数据源
# 使用format("csv")读取csv数据
# sep 指定分隔符; header 是否有表头; encoding 指定编码格式
df3 = spark.read.format("csv")\
.option("sep", ";")\
.option("header", True)\
.option("encoding", "utf-8")\
.schema("name STRING, age INT, job STRING")\
.load("file:///D:/git-python/大数据-Spark/files/people.csv")
df3.printSchema()
df3.limit(3).show()
#### 读取parquet数据源
# 使用format("parquet")读取parquet数据
# parquet: 是Spark中常用的一种列式存储文件格式
# 和Hive中的ORC差不多, 他俩都是列存储格式
# parquet对比普通的文本文件的区别:
# ● parquet 内置schema (列名\ 列类型\ 是否为空)
# ● 存储是以列作为存储格式
# ● 存储是序列化存储在文件中的(有压缩属性体积小)
# parquet 自带schema, 直接load啥也不需要了
df4 = spark.read.format("parquet").\
load("file:///D:/git-python/大数据-Spark/files/users.parquet")
df4.printSchema()
df4.show()
spark.stop()