from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Enum, ForeignKey, UniqueConstraint, ForeignKeyConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from urllib.parse import quote_plus as urlquote
host = "home.vimll.com"
port = xxxx
user = "xxxx"
password = 'xxxxx'
db_name = "flask"
con_info = f'mysql+pymysql://{user}:{urlquote(password)}@{host}:{port}/{
db_name}?charset=utf8'
egine = create_engine(con_info, max_overflow=5)
Base = declarative_base()
# 创建单表:业务线
class Business(Base):
__tablename__ = 'business'
id = Column(Integer, primary_key=True, autoincrement=True)
bname = Column(String(32), nullable=False, index=True)
# 多对一:多个服务可以属于一个业务线,多个业务线不能包含同一个服务
class Service(Base):
__tablename__ = 'service'
id = Column(Integer, primary_key=True, autoincrement=True)
sname = Column(String(32), nullable=False, index=True)
ip = Column(String(15), nullable=False)
port = Column(Integer, nullable=False)
business_id = Column(Integer, ForeignKey('business.id'))
__table_args__ = (
UniqueConstraint(ip, port, name='uix_ip_port'),
Index('ix_id_sname', id, sname)
)
# 一对一:一种角色只能管理一条业务线,一条业务线只能被一种角色管理
class Role(Base):
__tablename__ = 'role'
id = Column(Integer, primary_key=True, autoincrement=True)
rname = Column(String(32), nullable=False, index=True)
priv = Column(String(64), nullable=False)
business_id = Column(Integer, ForeignKey('business.id'), unique=True)
# 多对多:多个用户可以是同一个role,多个role可以包含同一个用户
class Users(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, autoincrement=True)
uname = Column(String(32), nullable=False, index=True)
class Users2Role(Base):
__tablename__ = 'users2role'
id = Column(Integer, primary_key=True, autoincrement=True)
uid = Column(Integer, ForeignKey('users.id'))
rid = Column(Integer, ForeignKey('role.id'))
__table_args__ = (
UniqueConstraint(uid, rid, name='uix_uid_rid'),
)
# 多对一:假设多个员工可以属于一个部门,而多个部门不能有同一个员工(只有创建公司才把员工当骆驼用,一个员工身兼数职)
class Dep(Base):
__tablename__ = 'dep'
id = Column(Integer, primary_key=True, autoincrement=True)
dname = Column(String(64), nullable=False, index=True)
class Emp(Base):
__tablename__ = 'emp'
id = Column(Integer, primary_key=True, autoincrement=True)
ename = Column(String(32), nullable=False, index=True)
dep_id = Column(Integer, ForeignKey('dep.id'))
#在ForeignKey所在的类内添加relationship的字段,注意:
#1:Dep是类名
#2:depart字段不会再数据库表中生成字段
#3:depart用于Emp表查询Dep表(正向查询),而xxoo用于Dep表查询Emp表(反向查询),
depart=relationship('Dep',backref='xxoo')
def init_db():
Base.metadata.create_all(egine)
def drop_db():
Base.metadata.drop_all(egine)
Session = sessionmaker(bind=egine)
session = Session()
# 准备数据
session.add_all([
Dep(dname='技术'),
Dep(dname='销售'),
Dep(dname='运营'),
Dep(dname='人事'),
])
session.add_all([
Emp(ename='林海峰', dep_id=1),
Emp(ename='李杰', dep_id=1),
Emp(ename='武配齐', dep_id=1),
Emp(ename='元昊', dep_id=2),
Emp(ename='李钢弹', dep_id=3),
Emp(ename='张二丫', dep_id=4),
Emp(ename='李坦克', dep_id=2),
Emp(ename='王大炮', dep_id=4),
Emp(ename='牛榴弹', dep_id=3)
])
if __name__ == '__main__':
drop_db()
init_db()
# 增
row_obj = Dep(dname='销售') # 按关键字传参,无需指定id,因其是自增长的
session.add(row_obj)
session.add_all([
Dep(dname='技术'),
Dep(dname='运营'),
Dep(dname='人事'),
])
session.commit()
# 删
session.query(Dep).filter(Dep.id > 3).delete()
session.commit()
# 改
session.query(Dep).filter(Dep.id == 1).update({'dname': '哇哈哈'})
session.query(Dep).filter(Dep.id > 0).update(
{'dname': Dep.dname+'_HAHA'}, synchronize_session=False)
session.query(Dep).filter(Dep.id > 0).update(
{'id': Dep.id*100}, synchronize_session='evaluate')
session.commit()
# 查所有,取所有字段
res = session.query(Dep).all()
for row in res:
print(row.id, row.dname)
# 查所有,取指定字段
res = session.query(Dep.dname).order_by(Dep.id).all()
for row in res:
print(row.dname)
# 查询一条数据
res = session.query(Dep.dname).first()
print(res)
# 过滤查
res = session.query(Dep).filter(Dep.id > 1, Dep.id < 1000) # 逗号分隔,默认为and
print([(row.id, row.dname) for row in res])
print("-----复杂查询----" * 5)
# 一、条件
sql = session.query(Emp).filter_by(ename='林海峰') # filter_by只能传参数:什么等于什么
res = sql.all() # sql语句的执行结果
res = session.query(Emp).filter(Emp.id > 0, Emp.ename ==
'林海峰').all() # filter内传的是表达式,逗号分隔,默认为and,
res = session.query(Emp).filter(
Emp.id.between(1, 3), Emp.ename == '林海峰').all()
res = session.query(Emp).filter(Emp.id.in_(
[1, 3, 99, 101]), Emp.ename == '林海峰').all()
res = session.query(Emp).filter(~Emp.id.in_(
[1, 3, 99, 101]), Emp.ename == '林海峰') # ~代表取反,转换成sql就是关键字not
from sqlalchemy import and_, or_
res = session.query(Emp).filter(and_(Emp.id > 0, Emp.ename == '林海峰')).all()
res = session.query(Emp).filter(or_(Emp.id < 2, Emp.ename == '功夫熊猫')).all()
res = session.query(Emp).filter(
or_(
Emp.dep_id == 3,
and_(Emp.id > 1, Emp.ename == '功夫熊猫'),
Emp.ename != ''
)
).all()
# 二、通配符
res = session.query(Emp).filter(Emp.ename.like('%海_%')).all()
res = session.query(Emp).filter(~Emp.ename.like('%海_%')).all()
# 三、limit
res = session.query(Emp)[0:5:2]
# 四、排序
res = session.query(Emp).order_by(Emp.dep_id.desc()).all()
res = session.query(Emp).order_by(Emp.dep_id.desc(), Emp.id.asc()).all()
# 五、分组
from sqlalchemy.sql import func
res = session.query(Emp.dep_id).group_by(Emp.dep_id).all()
res = session.query(
func.max(Emp.dep_id),
func.min(Emp.dep_id),
func.sum(Emp.dep_id),
func.avg(Emp.dep_id),
func.count(Emp.dep_id),
).group_by(Emp.dep_id).all()
res = session.query(
Emp.dep_id,
func.count(1),
).group_by(Emp.dep_id).having(func.count(1) > 2).all()
# 六、连表
# 笛卡尔积
res = session.query(Emp, Dep).all() # select * from emp,dep;
# where条件
res = session.query(Emp, Dep).filter(Emp.dep_id == Dep.id).all()
# for row in res:
# emp_tb=row[0]
# dep_tb=row[1]
# print(emp_tb.id,emp_tb.ename,dep_tb.id,dep_tb.dname)
# 内连接
res = session.query(Emp).join(Dep)
# join默认为内连接,SQLAlchemy会自动帮我们通过foreign key字段去找关联关系
# 但是上述查询的结果均为Emp表的字段,这样链表还有毛线意义,于是我们修改为
res = session.query(Emp.id, Emp.ename, Emp.dep_id,
Dep.dname).join(Dep).all()
# 左连接:isouter=True
res = session.query(Emp.id, Emp.ename, Emp.dep_id,
Dep.dname).join(Dep, isouter=True).all()
# 右连接:同左连接,只是把两个表的位置换一下
# 七、组合
q1 = session.query(Emp.id, Emp.ename).filter(Emp.id > 0, Emp.id < 5)
q2 = session.query(Emp.id, Emp.ename).filter(
or_(
Emp.ename.like('%海%'),
Emp.ename.like('%昊%'),
)
)
res1 = q1.union(q2) # 组合+去重
res2 = q1.union_all(q2) # 组合,不去重
print([i.ename for i in q1.all()]) # ['林海峰', '李杰', '武配齐', '元昊']
print([i.ename for i in q2.all()]) # ['林海峰', '元昊']
print([i.ename for i in res1.all()]) # ['林海峰', '李杰', '武配齐', '元昊']
# ['林海峰', '李杰', '武配齐', '元昊', '元昊', '林海峰']
print([i.ename for i in res2.all()])
# 子查询
# 有三种形式的子查询,注意:子查询的sql必须用括号包起来,尤其在形式三中需要注意这一点
#示例:查出id大于2的员工,当做子查询的表使用
#原生SQL:
# select * from (select * from emp where id > 2);
#ORM:
res=session.query(
session.query(Emp).filter(Emp.id > 8).subquery()
).all()
#示例:#查出销售部门的员工姓名
#原生SQL:
# select ename from emp where dep_id in (select id from dep where dname='销售');
#ORM:
res=session.query(Emp.ename).filter(Emp.dep_id.in_(
session.query(Dep.id).filter_by(dname='销售'), #传的是参数
# session.query(Dep.id).filter(Dep.dname=='销售') #传的是表达式
)).all()
#示例:查询所有的员工姓名与部门名
#原生SQL:
# select ename as 员工姓名,(select dname from dep where id = emp.dep_id) as 部门名 from emp;
#ORM:
sub_sql=session.query(Dep.dname).filter(Dep.id==Emp.dep_id) #SELECT dep.dname FROM dep, emp WHERE dep.id = emp.dep_id
sub_sql.as_scalar() #as_scalar的功能就是把上面的sub_sql加上了括号
res=session.query(Emp.ename,sub_sql.as_scalar()).all()
# 标准连表查询
# 示例:查询员工名与其部门名
res=session.query(Emp.ename,Dep.dname).join(Dep) #迭代器
for row in res:
print(row[0],row[1]) #等同于print(row.ename,row.dname)
# 基于relationship的正查、反查
#SQLAlchemy的relationship在内部帮我们做好表的链接
#查询员工名与其部门名(正向查)
res=session.query(Emp)
for row in res:
print(row.ename,row.id,row.depart.dname)
#查询部门名以及该部门下的员工(反向查)
res=session.query(Dep)
for row in res:
# print(row.dname,row.xxoo)
print(row.dname,[r.ename for r in row.xxoo])