SparkSQL DataFrame Writes and JDBC
Writing DataFrame Data Out
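Every write below goes through the DataFrameWriter returned by df.write and follows the same pattern: pick a save mode with mode() (append, overwrite, ignore, or the default error, which fails if the target already exists), a format with format(), and a destination with save(). A minimal sketch of the pattern (the path is an example):

df.write.mode("append").format("csv").save("file:///D:/some/output/dir")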
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F

if __name__ == '__main__':
    # 0. Build the SparkSession entry point
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext
    # 1. Read the dataset
    schema = StructType().add("user_id", StringType(), nullable=True). \
        add("movie_id", IntegerType(), nullable=True). \
        add("rank", IntegerType(), nullable=True). \
        add("ts", StringType(), nullable=True)
    df = spark.read.format("csv"). \
        option("sep", "\t"). \
        option("header", False). \
        option("encoding", "utf-8"). \
        schema(schema=schema). \
        load("file:///D:/git-python/大数据-Spark/files/u.data")
    # Write text: the text format writes a single string column only,
    # so concatenate the df into one column first
    df.select(F.concat_ws("---", "user_id", "movie_id", "rank", "ts")).\
        write.\
        mode("overwrite").\
        format("text").\
        save("file:///D:/git-python/大数据-Spark/files/text")
    # 186---302---3---891717742
    # 22---377---1---878887116
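    # Note: save() produces a directory of part-* files, one per partition.
    # For small outputs a single file can be forced by coalescing first,
    # e.g. df.coalesce(1).select(...).write..., at the cost of parallelism.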
    # Write csv
    df.write.mode("overwrite").\
        format("csv").\
        option("sep", ";").\
        option("header", True).\
        save("file:///D:/git-python/大数据-Spark/files/csv")
    # user_id;movie_id;rank;ts
    # 196;242;3;881250949
    # 186;302;3;891717742
    # Write json
    df.write.mode("overwrite").\
        format("json").\
        save("file:///D:/git-python/大数据-Spark/files/json")
    # {"user_id":"196","movie_id":242,"rank":3,"ts":"881250949"}
    # {"user_id":"186","movie_id":302,"rank":3,"ts":"891717742"}
    # Write parquet (columnar, schema-preserving; Spark's default format)
    df.write.mode("overwrite").\
        format("parquet").\
        save("file:///D:/git-python/大数据-Spark/files/parquet")
DataFrame: Reading and Writing a Database via JDBC
Place the JDBC driver jar under the Python environment at: <python env dir>/Lib/site-packages/pyspark/jars
- mysql-connector-java-5.1.49-bin.jar (Connector/J 5.x, driver class com.mysql.jdbc.Driver)
- mysql-connector-j-8.4.0.jar (Connector/J 8.x, driver class com.mysql.cj.jdbc.Driver)
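Copying the jar into the pyspark install is not the only option; the jar can also be handed to the session at startup via the spark.jars config. A minimal sketch (the jar path is an example):

spark = SparkSession.builder.\
    appName("jdbc").\
    master("local[*]").\
    config("spark.jars", "D:/drivers/mysql-connector-j-8.4.0.jar").\
    getOrCreate()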
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
# Alternative ways to put the driver jar on the classpath:
# import os
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-class-path D:/miniforge3/Lib/site-packages/pyspark/jars --jars D:/miniforge3/Lib/site-packages/pyspark/jars/mysql-connector-java-5.1.49-bin.jar'
# or, inside the builder below:
# config('spark.driver.extraClassPath', 'D:/miniforge3/Lib/site-packages/pyspark/jars/mysql-connector-java-5.1.49-bin.jar').\

if __name__ == '__main__':
    # 0. Build the SparkSession entry point
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext
    # 1. Read the dataset
    schema = StructType().add("user_id", StringType(), nullable=True). \
        add("movie_id", IntegerType(), nullable=True). \
        add("rank", IntegerType(), nullable=True). \
        add("ts", StringType(), nullable=True)
    df = spark.read.format("csv"). \
        option("sep", "\t"). \
        option("header", False). \
        option("encoding", "utf-8"). \
        schema(schema=schema). \
        load("file:///D:/git-python/大数据-Spark/files/u.data")
    # 2. Write the df out to the MySQL database
    # (with Connector/J 8.x the driver class is com.mysql.cj.jdbc.Driver)
    df.write.mode("overwrite").\
        format("jdbc").\
        option("driver", "com.mysql.jdbc.Driver").\
        option("url", "jdbc:mysql://172.16.100.42:4000/bigdata?useSSL=false&useUnicode=true").\
        option("dbtable", "movie_data").\
        option("user", "root").\
        option("password", "84^j1TZ-+2b9A@S7Hs").\
        save()
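    # Note: with JDBC, mode("overwrite") drops and recreates the table by
    # default; setting option("truncate", True) keeps the existing table
    # definition and deletes only its rows.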
    # 3. Read the table back over JDBC
    df2 = spark.read.format("jdbc"). \
        option("url", "jdbc:mysql://172.16.100.42:4000/bigdata?useSSL=false&useUnicode=true"). \
        option("dbtable", "movie_data"). \
        option("user", "root"). \
        option("password", "84^j1TZ-+2b9A@S7Hs"). \
        load()
    df2.printSchema()
    df2.show()

    spark.stop()
# """
# JDBC写出, 会自动创建表的.
# 因为DataFrame中有表结构信息, StructType记录的 各个字段的 名称 类型 和是否运行为空
# 注意:
# jdbc连接字符串中,建议使用 useSSL=false 确保连接可以正常连接(不使用SSL安全协议进行连接)
# jdbc连接字符串中,建议使用 useUnicode=true 来确保传输中不出现乱码
# save()不要填参数,没有路径,是写出数据库
# dbtable属性: 指定写出的表名
# """