Python_sqlalchemy

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Enum, ForeignKey, UniqueConstraint, ForeignKeyConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from urllib.parse import quote_plus as urlquote

host = "home.vimll.com"
port = xxxx
user = "xxxx"
password = 'xxxxx'
db_name = "flask"
con_info = f'mysql+pymysql://{user}:{urlquote(password)}@{host}:{port}/{
    db_name}?charset=utf8'

egine = create_engine(con_info, max_overflow=5)

Base = declarative_base()

# 创建单表:业务线
class Business(Base):
    __tablename__ = 'business'
    id = Column(Integer, primary_key=True, autoincrement=True)
    bname = Column(String(32), nullable=False, index=True)

# 多对一:多个服务可以属于一个业务线,多个业务线不能包含同一个服务
class Service(Base):
    __tablename__ = 'service'
    id = Column(Integer, primary_key=True, autoincrement=True)
    sname = Column(String(32), nullable=False, index=True)
    ip = Column(String(15), nullable=False)
    port = Column(Integer, nullable=False)

    business_id = Column(Integer, ForeignKey('business.id'))

    __table_args__ = (
        UniqueConstraint(ip, port, name='uix_ip_port'),
        Index('ix_id_sname', id, sname)
    )

# 一对一:一种角色只能管理一条业务线,一条业务线只能被一种角色管理
class Role(Base):
    __tablename__ = 'role'
    id = Column(Integer, primary_key=True, autoincrement=True)
    rname = Column(String(32), nullable=False, index=True)
    priv = Column(String(64), nullable=False)

    business_id = Column(Integer, ForeignKey('business.id'), unique=True)

# 多对多:多个用户可以是同一个role,多个role可以包含同一个用户
class Users(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True, autoincrement=True)
    uname = Column(String(32), nullable=False, index=True)

class Users2Role(Base):
    __tablename__ = 'users2role'
    id = Column(Integer, primary_key=True, autoincrement=True)
    uid = Column(Integer, ForeignKey('users.id'))
    rid = Column(Integer, ForeignKey('role.id'))

    __table_args__ = (
        UniqueConstraint(uid, rid, name='uix_uid_rid'),
    )

# 多对一:假设多个员工可以属于一个部门,而多个部门不能有同一个员工(只有创建公司才把员工当骆驼用,一个员工身兼数职)
class Dep(Base):
    __tablename__ = 'dep'
    id = Column(Integer, primary_key=True, autoincrement=True)
    dname = Column(String(64), nullable=False, index=True)

class Emp(Base):
    __tablename__ = 'emp'
    id = Column(Integer, primary_key=True, autoincrement=True)
    ename = Column(String(32), nullable=False, index=True)
    dep_id = Column(Integer, ForeignKey('dep.id'))
    #在ForeignKey所在的类内添加relationship的字段,注意:
    #1:Dep是类名
    #2:depart字段不会再数据库表中生成字段
    #3:depart用于Emp表查询Dep表(正向查询),而xxoo用于Dep表查询Emp表(反向查询),
    depart=relationship('Dep',backref='xxoo')

def init_db():
    Base.metadata.create_all(egine)

def drop_db():
    Base.metadata.drop_all(egine)

Session = sessionmaker(bind=egine)
session = Session()

# 准备数据
session.add_all([
    Dep(dname='技术'),
    Dep(dname='销售'),
    Dep(dname='运营'),
    Dep(dname='人事'),
])

session.add_all([
    Emp(ename='林海峰', dep_id=1),
    Emp(ename='李杰', dep_id=1),
    Emp(ename='武配齐', dep_id=1),
    Emp(ename='元昊', dep_id=2),
    Emp(ename='李钢弹', dep_id=3),
    Emp(ename='张二丫', dep_id=4),
    Emp(ename='李坦克', dep_id=2),
    Emp(ename='王大炮', dep_id=4),
    Emp(ename='牛榴弹', dep_id=3)
])

if __name__ == '__main__':
    drop_db()
    init_db()

    # 增
    row_obj = Dep(dname='销售')  # 按关键字传参,无需指定id,因其是自增长的
    session.add(row_obj)
    session.add_all([
        Dep(dname='技术'),
        Dep(dname='运营'),
        Dep(dname='人事'),
    ])
    session.commit()

    # 删
    session.query(Dep).filter(Dep.id > 3).delete()
    session.commit()

    # 改
    session.query(Dep).filter(Dep.id == 1).update({'dname': '哇哈哈'})
    session.query(Dep).filter(Dep.id > 0).update(
        {'dname': Dep.dname+'_HAHA'}, synchronize_session=False)
    session.query(Dep).filter(Dep.id > 0).update(
        {'id': Dep.id*100}, synchronize_session='evaluate')
    session.commit()

    # 查所有,取所有字段
    res = session.query(Dep).all()
    for row in res:
        print(row.id, row.dname)

    # 查所有,取指定字段
    res = session.query(Dep.dname).order_by(Dep.id).all()
    for row in res:
        print(row.dname)

    # 查询一条数据
    res = session.query(Dep.dname).first()
    print(res)

    # 过滤查
    res = session.query(Dep).filter(Dep.id > 1, Dep.id < 1000)  # 逗号分隔,默认为and
    print([(row.id, row.dname) for row in res])

    print("-----复杂查询----" * 5)
    # 一、条件
    sql = session.query(Emp).filter_by(ename='林海峰')  # filter_by只能传参数:什么等于什么
    res = sql.all()  # sql语句的执行结果

    res = session.query(Emp).filter(Emp.id > 0, Emp.ename ==
                                    '林海峰').all()  # filter内传的是表达式,逗号分隔,默认为and,
    res = session.query(Emp).filter(
        Emp.id.between(1, 3), Emp.ename == '林海峰').all()
    res = session.query(Emp).filter(Emp.id.in_(
        [1, 3, 99, 101]), Emp.ename == '林海峰').all()
    res = session.query(Emp).filter(~Emp.id.in_(
        [1, 3, 99, 101]), Emp.ename == '林海峰')  # ~代表取反,转换成sql就是关键字not

    from sqlalchemy import and_, or_
    res = session.query(Emp).filter(and_(Emp.id > 0, Emp.ename == '林海峰')).all()
    res = session.query(Emp).filter(or_(Emp.id < 2, Emp.ename == '功夫熊猫')).all()
    res = session.query(Emp).filter(
        or_(
            Emp.dep_id == 3,
            and_(Emp.id > 1, Emp.ename == '功夫熊猫'),
            Emp.ename != ''
        )
    ).all()

    # 二、通配符
    res = session.query(Emp).filter(Emp.ename.like('%海_%')).all()
    res = session.query(Emp).filter(~Emp.ename.like('%海_%')).all()

    # 三、limit
    res = session.query(Emp)[0:5:2]

    # 四、排序
    res = session.query(Emp).order_by(Emp.dep_id.desc()).all()
    res = session.query(Emp).order_by(Emp.dep_id.desc(), Emp.id.asc()).all()

    # 五、分组
    from sqlalchemy.sql import func

    res = session.query(Emp.dep_id).group_by(Emp.dep_id).all()
    res = session.query(
        func.max(Emp.dep_id),
        func.min(Emp.dep_id),
        func.sum(Emp.dep_id),
        func.avg(Emp.dep_id),
        func.count(Emp.dep_id),
    ).group_by(Emp.dep_id).all()

    res = session.query(
        Emp.dep_id,
        func.count(1),
    ).group_by(Emp.dep_id).having(func.count(1) > 2).all()

    # 六、连表
    # 笛卡尔积
    res = session.query(Emp, Dep).all()  # select * from emp,dep;

    # where条件
    res = session.query(Emp, Dep).filter(Emp.dep_id == Dep.id).all()
    # for row in res:
    #     emp_tb=row[0]
    #     dep_tb=row[1]
    #     print(emp_tb.id,emp_tb.ename,dep_tb.id,dep_tb.dname)

    # 内连接
    res = session.query(Emp).join(Dep)
    # join默认为内连接,SQLAlchemy会自动帮我们通过foreign key字段去找关联关系
    # 但是上述查询的结果均为Emp表的字段,这样链表还有毛线意义,于是我们修改为
    res = session.query(Emp.id, Emp.ename, Emp.dep_id,
                        Dep.dname).join(Dep).all()

    # 左连接:isouter=True
    res = session.query(Emp.id, Emp.ename, Emp.dep_id,
                        Dep.dname).join(Dep, isouter=True).all()

    # 右连接:同左连接,只是把两个表的位置换一下

    # 七、组合
    q1 = session.query(Emp.id, Emp.ename).filter(Emp.id > 0, Emp.id < 5)
    q2 = session.query(Emp.id, Emp.ename).filter(
        or_(
            Emp.ename.like('%海%'),
            Emp.ename.like('%昊%'),
        )
    )
    res1 = q1.union(q2)  # 组合+去重
    res2 = q1.union_all(q2)  # 组合,不去重

    print([i.ename for i in q1.all()])  # ['林海峰', '李杰', '武配齐', '元昊']
    print([i.ename for i in q2.all()])  # ['林海峰', '元昊']
    print([i.ename for i in res1.all()])  # ['林海峰', '李杰', '武配齐', '元昊']
    # ['林海峰', '李杰', '武配齐', '元昊', '元昊', '林海峰']
    print([i.ename for i in res2.all()])

    # 子查询
    # 有三种形式的子查询,注意:子查询的sql必须用括号包起来,尤其在形式三中需要注意这一点

    #示例:查出id大于2的员工,当做子查询的表使用
    #原生SQL:
    # select * from (select * from emp where id > 2);
    #ORM:
    res=session.query(
        session.query(Emp).filter(Emp.id > 8).subquery()
    ).all()

    #示例:#查出销售部门的员工姓名
    #原生SQL:
    # select ename from emp where dep_id in (select id from dep where dname='销售');
    #ORM:
    res=session.query(Emp.ename).filter(Emp.dep_id.in_(
        session.query(Dep.id).filter_by(dname='销售'), #传的是参数
        # session.query(Dep.id).filter(Dep.dname=='销售') #传的是表达式
    )).all()

    #示例:查询所有的员工姓名与部门名
    #原生SQL:
    # select ename as 员工姓名,(select dname from dep where id = emp.dep_id) as 部门名 from emp;
    #ORM:
    sub_sql=session.query(Dep.dname).filter(Dep.id==Emp.dep_id) #SELECT dep.dname FROM dep, emp WHERE dep.id = emp.dep_id
    sub_sql.as_scalar() #as_scalar的功能就是把上面的sub_sql加上了括号

    res=session.query(Emp.ename,sub_sql.as_scalar()).all()

    # 标准连表查询
    # 示例:查询员工名与其部门名
    res=session.query(Emp.ename,Dep.dname).join(Dep) #迭代器
    for row in res:
        print(row[0],row[1]) #等同于print(row.ename,row.dname)

    # 基于relationship的正查、反查
    #SQLAlchemy的relationship在内部帮我们做好表的链接

    #查询员工名与其部门名(正向查)
    res=session.query(Emp)
    for row in res:
        print(row.ename,row.id,row.depart.dname)

    #查询部门名以及该部门下的员工(反向查)
    res=session.query(Dep)
    for row in res:
        # print(row.dname,row.xxoo)
        print(row.dname,[r.ename for r in row.xxoo])