class Record:
    """One sales order: its date, order id, amount of money and province."""

    def __init__(self, date, order_id, money, province):
        """Store the four order fields on the instance unchanged."""
        self.date, self.order_id = date, order_id
        self.money, self.province = money, province

    def __str__(self):
        """Return the four fields joined by ", " in declaration order."""
        fields = (self.date, self.order_id, self.money, self.province)
        return "{}, {}, {}, {}".format(*fields)
"""
定义文件相关的类
"""
import json
class FileReader:
    """Common interface for the file readers defined in this file."""

    def read_data(self) -> list[Record]:
        """Read the file's data and return it as a list of Record objects.

        Every entry read from the file is meant to be converted into a
        Record and all of them returned together in one list.  This base
        implementation reads nothing and returns None; TextFileReader and
        JsonFileReader override it with real parsing.
        """
        return None
class TextFileReader(FileReader):
    """Reader for text files holding one comma-separated record per line."""

    def __init__(self, path):
        """Remember the path of the file to read.

        Args:
            path: Location of the text file; passed to ``open`` as-is.
        """
        self.path = path

    def read_data(self) -> list[Record]:
        """Parse the file into a list of Record objects.

        Each non-blank line is stripped and split on ",".  The first four
        fields become the record's date, order id, money (converted with
        ``int``) and province.  Blank lines are skipped.

        Returns:
            The records in file order.

        Raises:
            OSError: If the file cannot be opened.
            IndexError: If a non-blank line has fewer than four fields.
            ValueError: If the third field is not an integer literal.
        """
        record_list = []
        # `with` guarantees the handle is closed even when a line fails to parse.
        with open(self.path, "r", encoding="UTF-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    # A blank/trailing empty line carries no record.
                    continue
                data_list = line.split(",")
                record_list.append(
                    Record(data_list[0], data_list[1], int(data_list[2]), data_list[3])
                )
        return record_list
class JsonFileReader(FileReader):
    """Reader for text files holding one JSON object per line."""

    def __init__(self, path):
        """Remember the path of the file to read.

        Args:
            path: Location of the file; passed to ``open`` as-is.
        """
        self.path = path

    def read_data(self) -> list[Record]:
        """Parse the file into a list of Record objects.

        Each non-blank line is decoded with ``json.loads`` and its "date",
        "order_id", "money" (converted with ``int``) and "province" entries
        are used to build a Record.  Blank lines are skipped.

        Returns:
            The records in file order.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If a non-blank line is not valid JSON.
            KeyError: If a decoded object lacks one of the four keys.
            ValueError: If the "money" value cannot be converted by ``int``.
        """
        record_list = []
        # `with` guarantees the handle is closed even when a line fails to parse.
        with open(self.path, "r", encoding="UTF-8") as f:
            for line in f:
                if not line.strip():
                    # json.loads would raise on an empty line; it holds no record.
                    continue
                data_dict = json.loads(line)
                record_list.append(
                    Record(
                        data_dict["date"],
                        data_dict["order_id"],
                        int(data_dict["money"]),
                        data_dict["province"],
                    )
                )
        return record_list
"""
计算数据
"""
import sys
sys.dont_write_bytecode = True
from pymysql import connect
from cryptography import *
# Read both monthly sales files and insert every record into the `orders`
# table of the `py_sql` database.
text_file_reader = TextFileReader('./bak/数据分析案例/2011年1月销售数据.txt')
json_file_reader = JsonFileReader('./bak/数据分析案例/2011年2月销售数据JSON.txt')
jan_data: list[Record] = text_file_reader.read_data()
feb_data: list[Record] = json_file_reader.read_data()
all_data: list[Record] = jan_data + feb_data

# NOTE(review): `xxxx` below is not defined anywhere in this file — presumably
# a redacted port number; it must be replaced by a real integer before running.
conn = connect(
    host='home.vimll.com',
    port=xxxx,
    user='xxxx',
    password='xxxx',
    autocommit=True
)
try:
    print(conn.get_server_info())
    cursor = conn.cursor()
    conn.select_db("py_sql")
    # Placeholders let the driver escape the values, so a quote inside a
    # field can neither break the statement nor inject SQL.
    sql = (
        "insert into orders(order_date, order_id, money, province) "
        "values(%s, %s, %s, %s)"
    )
    for record in all_data:
        params = (record.date, record.order_id, record.money, record.province)
        # Show the statement as it will be sent, with parameters filled in.
        print(cursor.mogrify(sql, params))
        cursor.execute(sql, params)
finally:
    # Release the connection even if an insert fails part-way through.
    conn.close()
import sys
sys.dont_write_bytecode = True
from pymysql import connect
from cryptography import *
import json
# Dump every row of the `orders` table to a text file, one dict per line.
all_data: list[Record] = jan_data + feb_data

# NOTE(review): `xxx` below is not defined anywhere in this file — presumably
# a redacted port number; it must be replaced by a real integer before running.
conn = connect(
    host='home.vimll.com',
    port=xxx,
    user='xxxx',
    password='xxxx',
    autocommit=True
)
try:
    print(conn.get_server_info())
    cursor = conn.cursor()
    conn.select_db("py_sql")
    cursor.execute("select * from orders")
    results = cursor.fetchall()
finally:
    # Release the connection even if the query fails.
    conn.close()

sale_data_dict = {"order_date": "none", "order_id": "none", "money": 0, "province": "none"}
# Append mode: rows are added after whatever the file already contains.
# `with` closes the file even if formatting a row raises.
with open("./bak/数据分析案例/new.txt", "a", encoding="utf-8") as f:
    for r in results:
        # Columns are taken by position: 0=date, 1=order id, 2=money, 3=province.
        sale_data_dict["order_date"] = r[0].strftime('%Y-%m-%d')
        sale_data_dict["order_id"] = r[1]
        sale_data_dict["money"] = r[2]
        sale_data_dict["province"] = r[3]
        # NOTE(review): str(dict) writes a Python repr (single quotes), not
        # JSON, although `json` is imported above — presumably json.dumps was
        # intended; confirm before changing the output format.
        f.write(str(sale_data_dict))
        f.write("\n")