# 使用数据分析综合案例中的两张数据表的数据使用python写入数据库
# CREATE DATABASE py_sql CHARSET utf8;
# CREATE TABLE orders(order_date date,order_id varchar(255),money int,province VARCHAR(10));
# 数据定义的类
class Record:
def __init__(self, date, order_id, money, province):
self.date = date # 订单日期
self.order_id = order_id # 订单ID
self.money = money # 订单金额
self.province = province # 销售省份
def __str__(self):
return f"{self.date}, {self.order_id}, {self.money}, {self.province}"
"""
定义文件相关的类
"""
# 导包之前定义的数据类
import json
# 先定义一个抽象类来做顶层设计,确定有哪些功能需要实现
class FileReader:
def read_data(self) -> list[Record]:
"""读取文件的数据,读取到的每一条数据都转换为Record对象,将它们都封装到list内返回即可"""
pass
# 读取文件,生产数据对象
class TextFileReader(FileReader):
def __init__(self, path):
self.path = path # 定义成员变量记录文件的路径
# 复写(实现抽象方法)父类的方法
def read_data(self) -> list[Record]:
f = open(self.path, "r", encoding="UTF-8")
record_list = []
for line in f.readlines():
line = line.strip() # 消除读取到的每一行数据中的\n
data_list = line.split(",")
record = Record(data_list[0], data_list[1], int(data_list[2]), data_list[3])
record_list.append(record)
f.close()
# print(record_list)
return record_list
class JsonFileReader(FileReader):
def __init__(self, path):
self.path = path # 定义成员变量记录文件的路径
# 复写(实现抽象方法)父类的方法
def read_data(self) -> list[Record]:
f = open(self.path, "r", encoding="UTF-8")
record_list = []
for line in f.readlines():
data_dict = json.loads(line)
record = Record(data_dict["date"], data_dict["order_id"], int(data_dict["money"]), data_dict["province"])
record_list.append(record)
f.close()
return record_list
# # 测试
# if __name__ == '__main__':
# text_file_reader = TextFileReader('./bak/数据分析案例/2011年1月销售数据.txt')
# json_file_reader = JsonFileReader('./bak/数据分析案例/2011年2月销售数据JSON.txt')
# list1 = text_file_reader.read_data()
# list2 = json_file_reader.read_data()
# for l in list1:
# print(l)
# for l2 in list2:
# print(l2)
"""
计算数据
"""
# 进行数据需求的逻辑计算(计算每一天的销售额)
import sys
sys.dont_write_bytecode = True
from pymysql import connect
from cryptography import *
text_file_reader = TextFileReader('./bak/数据分析案例/2011年1月销售数据.txt')
json_file_reader = JsonFileReader('./bak/数据分析案例/2011年2月销售数据JSON.txt')
jan_data: list[Record] = text_file_reader.read_data()
feb_data: list[Record] = json_file_reader.read_data()
# 装2个月份的数据合并为1个list来存储
all_data: list[Record] = jan_data + feb_data
# 获取到MySQL数据库的链接对象
conn = connect(
host='home.vimll.com',
port=xxxx,
user='xxxx',
password='xxxx',
autocommit=True # 使用autocommit参数来自动提交事务
)
# 打印MySQL数据库软件信息
print(conn.get_server_info())
# 获取游标对象
cursor = conn.cursor()
# 选择数据库
conn.select_db("py_sql")
# 组织sql语句
for record in all_data:
sql = f"insert into orders(order_date, order_id, money, province) \
values('{record.date}', '{record.order_id}', '{record.money}', '{record.province}')"
print(sql)
cursor.execute(sql)
# 通过commit提交事务
# conn.commit()
# 关闭到数据库的链接
conn.close()
### 从py_sql库里读取数据存人json文件
# 进行数据需求的逻辑计算(计算每一天的销售额)
import sys
sys.dont_write_bytecode = True
from pymysql import connect
from cryptography import *
import json
f = open("./bak/数据分析案例/new.txt", "a", encoding="utf-8")
# 装2个月份的数据合并为1个list来存储
all_data: list[Record] = jan_data + feb_data
# 获取到MySQL数据库的链接对象
conn = connect(
host='home.vimll.com',
port=xxx,
user='xxxx',
password='xxxx',
autocommit=True # 使用autocommit参数来自动提交事务
)
# 打印MySQL数据库软件信息
print(conn.get_server_info())
# 获取游标对象
cursor = conn.cursor()
# 选择数据库
conn.select_db("py_sql")
# 组织sql语句
cursor.execute("select * from orders")
results = cursor.fetchall()
sale_data_dict={"order_date":"none","order_id":"none","money":0,"province":"none"}
for r in results:
sale_data_dict["order_date"]=r[0].strftime('%Y-%m-%d')
# 这里需要使用datatime.data.strftime函数可将数据库表中的'datatime.data(2011,1,12)'时间日期转换为'2011-1-12'格式
sale_data_dict["order_id"]=r[1]
sale_data_dict["money"]=r[2]
sale_data_dict["province"]=r[3]
# print(sale_data_dict)
f.write(str(sale_data_dict))
# f.write(str(sale_data_dict))也可以使用json.dumps(sale_data_dict)以json格式写入文件中,不过应用json.dumps()方法时需注意utf-8编码会改变中文字符的输出形式 列如:"湖南省"会变成"u6e56\u5357\u7701"这种utf-8对应的字符串的形式输出
f.write("\n")
# 关闭文件
f.close()
# 关闭到数据库的链接
conn.close()