Flask 登录接口案例

常用工具包\Python_Pyjwt_Flask.py

"""
vue3_admin 自定义后端接口服务
"""
import paramiko
from urllib.parse import quote_plus as urlquote
from werkzeug.security import generate_password_hash, check_password_hash
from functools import wraps
from flask import request, jsonify
import jwt
import json
from flask import request, jsonify
from datetime import timedelta, datetime
from flask_sqlalchemy import SQLAlchemy
from flask import Flask, Response
from flask_cors import CORS, cross_origin
import sys
sys.dont_write_bytecode = True

# 模型部分
app = Flask(__name__)

cors_config = {
    # 'origins': ['http://127.0.0.1:5000', 'http://168.12.1.50:5000', 'http://localhost:5173'],
    'origins': "*",
    # 'allow_headers': ['Content-Type', 'Authorization', 'Token'],   # 允许的请求头
    'allow_headers': "*",
    'expose_headers': ['X-My-Custom-Header']  # 设置允许客户端访问的响应头
}
CORS(app, resources={r"/api/*": cors_config}, supports_credentials=True)
# CORS(app, resources={r"/api/*": {"origins": "*"}}, supports_credentials=True)

"""
create user xxxxxx@'%' identified by 'xxxxxxxxxxxxxx';
GRANT  all privileges  ON flask.* TO `python`@`%` ;
FLUSH PRIVILEGES;
"""

host = "home.vimll.com"
port = xxxxxxxxx
user = "xxxxxx"
password = 'xxxxxxxxx'
db_name = "flask"
con_info = f'mysql+pymysql://{user}:{urlquote(password)}@{host}:{port}/{
    db_name}?charset=utf8'

# 设置连接数据库的URL
app.config['SQLALCHEMY_DATABASE_URI'] = con_info
# print(app.config['SQLALCHEMY_DATABASE_URI'])
# 设置每次请求结束后会自动提交数据库中的改动
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_pre_ping': True  # Enable pre-ping to check for stale connections
}
app.app_context().push()  # Push an application context for thread-local data
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 查询时会显示原始SQL语句
app.config['SQLALCHEMY_ECHO'] = True
# 指定JSON_AS_ASCII的参数设置为False,阻止jsonify将json内容转为ASCII进行返回
# app.config['JSON_AS_ASCII'] = False  # 已弃用
app.json.ensure_ascii = False

db = SQLAlchemy(app)

# 创建自增序列
autoincrement_seq = db.Sequence('autoincrement_seq', start=1)

# 角色表
class Groups(db.Model):
    # 定义表名
    __tablename__ = 'groups'
    # 定义列对象
    id = db.Column(db.Integer, autoincrement_seq, primary_key=True,)
    gid = db.Column(db.Integer, unique=True)
    group_name = db.Column(db.String(64), unique=True)
    create_time = db.Column(
        db.DateTime, default=datetime.now, comment='创建时间')
    update_time = db.Column(
        db.DateTime, default=datetime.now, comment='更新时间')
    remark = db.Column(db.String(128), default='')

    def to_dict(self):
        return {
            'id': self.id,
            'gid': self.gid,
            'group_name': self.group_name,
            'remark': self.remark
        }

    # repr()方法显示一个可读字符串
    def __repr__(self):
        return 'Groups: %s' % self.group_name

# 岗位职责表
class Roles(db.Model):
    # 定义表名
    __tablename__ = 'roles'
    # 定义列对象
    id = db.Column(db.Integer, autoincrement_seq, primary_key=True,)
    role_name = db.Column(db.String(64), unique=True)
    acls = db.Column(db.JSON)
    create_time = db.Column(
        db.DateTime, default=datetime.now, comment='创建时间')
    update_time = db.Column(
        db.DateTime, default=datetime.now, comment='更新时间')
    remark = db.Column(db.String(128), default='')

    def to_dict(self):
        return {
            'id': self.id,
            'role_name': self.role_name,
            'acls': self.acls,
            'remark': self.remark,
            'create_time': self.create_time.strftime(
                '%Y-%m-%d %H:%M:%S') if self.create_time else None,
            'update_time': self.update_time.strftime(
                '%Y-%m-%d %H:%M:%S') if self.update_time else None,
        }

    # repr()方法显示一个可读字符串
    def __repr__(self):
        return 'Roles: %s' % self.role_name

# 用户表
class XUsers(db.Model):
    # 定义表名
    __tablename__ = 'xusers'

    id = db.Column(db.Integer, autoincrement_seq, primary_key=True)
    username = db.Column(db.String(80), comment='登录名')
    nickName = db.Column(db.String(180), comment='用户昵称')
    password = db.Column(db.String(120), comment='密码')
    token = db.Column(db.String(255), default='', comment='Token')
    state = db.Column(db.String(120), default='正常', comment='用户状态')
    userLevel = db.Column(db.Integer, default=1, nullable=False)
    role = db.Column(db.String(180), default='')
    group_id = db.Column(db.Integer, db.ForeignKey('groups.gid'))
    avatar = db.Column(db.String(200), comment='用户头像')
    create_time = db.Column(
        db.DateTime, default=datetime.now, comment='创建时间')
    update_time = db.Column(
        db.DateTime, default=datetime.now, comment='更新时间')

    groups_depart = db.relationship('Groups', backref='groups_to_xusers')

    # __table_args__的参数必须是元组的形式
    __table_args__ = (
        db.UniqueConstraint(username, name='uni_name'),  # 用户名和密码不能重复!
    )

    def to_dict(self):
        return {
            'id': self.id,
            'username': self.username,
            'nickName': self.nickName,
            'state': self.state,
            'userLevel': self.userLevel,
            'role': self.role,
            'group_id': self.group_id,
            'group_name': self.groups_depart.group_name,
            'avatar': self.avatar,
            'create_time': self.create_time.strftime(
                '%Y-%m-%d %H:%M:%S') if self.create_time else None,
            'update_time': self.update_time.strftime(
                '%Y-%m-%d %H:%M:%S') if self.update_time else None,
        }

# 生成token 与解密 token 和 拦截器
SECRET_KEY = 'xxxxxxxxxxxxxxxxxxxx'

# 创建token
def generate_token(user_id):
    try:
        payload = {
            'exp': datetime.now() + timedelta(days=30),  # 过期时间
            'iat': datetime.now().timestamp(),  # 开始时间
            'sub': user_id
        }
        # print("创建token", payload['exp'])
        # print("创建token", payload['iat'])
        return jwt.encode(
            payload,
            SECRET_KEY,  # 替换为你的密钥
            algorithm='HS256'
        )
    except Exception as e:
        return e

# 解析token
def decode_token(token):
    """ 解码Token并处理异常 """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
        # print('解析token', payload)
        return payload['sub']  # 返回成功标志和用户ID
    except jwt.ExpiredSignatureError as e:
        print(e)
        return False
    except jwt.InvalidTokenError as e:
        print(e)
        return False

# 查询用户状态
def check_token_and_user_status(token):
    try:
        user_id = decode_token(token)
        token_in_user = XUsers.query.filter_by(token=token).first()

        # 根据是否开启多端登陆判断token是否有效
        multi_device_login = True
        if not multi_device_login and not token_in_user:
            return False, 'token过期'

        if user_id:
            user = XUsers.query.get(user_id)
            if user:
                if user.state == '正常':
                    return True, user
                else:
                    return False, '该用户状态异常!'
            else:
                return False, '用户不存在'
        else:
            return False, 'token无效'
    except Exception as e:
        print('check_token_and_user_status error', e)
        return False, 'token已重置,请重新登录!'

# 获取用户信息token验证
def user_token_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        token = request.headers.get('Token')
        # print(request.headers)
        # print('Token',token)
        if not token:
            return jsonify({
                'code': 401,
                'data': {
                    'message': 'token不存在',
                    'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                },
            })

        user_state, user_info = check_token_and_user_status(token)
        if not user_state:
            return jsonify({
                'code': 401,
                'data': {
                    'message': user_info,
                    'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                },
            })
        return f(*args, **kwargs, user_state=user_state, user_info=user_info)

    return decorated_function

# 通用token验证
def general_token_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        token = request.headers.get('Token')
        # print(request.headers)
        # print('Token',token)
        if not token:
            return jsonify({
                'code': 401,
                'data': {
                    'message': 'token不存在',
                    'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                },
            })

        user_state, user_info = check_token_and_user_status(token)
        if not user_state:
            return jsonify({
                'code': 401,
                'data': {
                    'message': user_info,
                    'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                },
            })
        return f(*args, **kwargs)

    return decorated_function

# 登陆
@app.route('/api/user/login', methods=['POST'])
@cross_origin()
def user_login():
    try:
        data = request.json
        # print(data)
        username = data.get('username')
        password = data.get('password')
        user = XUsers.query.filter_by(username=username).first()
        # print('user.username', user.username, user.state)
        if not user:
            return jsonify({
                'code': 403,
                'data': {
                    'message': '用户账号或密码错误',
                    'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                },
            })
        else:
            if user.password == password and user.state == '正常':
                if len(user.token) > 1:
                    user_id = decode_token(user.token)
                    # token未过期
                    if user_id:
                        token = user.token
                    else:
                        token = generate_token(user.id)
                else:
                    token = generate_token(user.id)
                user.token = token
                db.session.commit()
                return jsonify({
                    'code': 200,
                    'data': {
                        'token': token,
                        'username': user.username,
                        'avatar': user.avatar,
                        'message': '登陆成功',
                        'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    },
                })
            else:
                return jsonify({
                    'code': 403,
                    'data': {
                        'message': '账号或密码错误!',
                        'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    },
                })
    except Exception as e:
        print('user_login error', e)
        return jsonify({
            'code': 401,
            'data': {
                'message': '用户异常,请重新登录!',
                'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            },
        })

# 退出登录
@app.route('/api/user/logout', methods=['POST'])
@cross_origin()
def user_logout():
    # token = request.headers.get('Token')
    data = request.json
    # print("退出登录", data)
    token = data.get('token')

    # print("退出登录",token)
    user_id = decode_token(token)
    # print("退出登录",user_id)
    user = XUsers.query.filter_by(id=user_id).first()
    if not user:
        return jsonify({
            'code': 403,
            'data': {
                'message': '用户异常,请重新登录!',
                'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            },
        })
    else:
        #user.token = ''
        #db.session.commit()
        return jsonify({
            'code': 200,
            'data': {
                'message': '用户已登出',
                'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            },
        })

# 启动
if __name__ == '__main__':
    import logging

    logger = logging.getLogger()
    logger.setLevel('DEBUG')
    # 调试模式格式
    DEBUG_LOG_FORMAT = (
        '-' * 80 + '\n' +
        '%(levelname)s in %(module)s [%(pathname)s:%(lineno)d]:\n' +
        '%(message)s\n' +
        '-' * 80
    )
    # 非调试模式格式
    PROD_LOG_FORMAT = '[%(asctime)s] %(levelname)s in %(module)s: %(message)s'

    # DATE_FORMAT = '%y-%m-%d %H:%M:%S'
    DATE_FORMAT = "[%Y-%m-%d %H:%M:%S %z]"
    formatter = logging.Formatter(DEBUG_LOG_FORMAT, DATE_FORMAT)
    chlr = logging.StreamHandler()
    chlr.setFormatter(formatter)
    logger.addHandler(chlr)
    # 将记录器设置为Flask应用程序的日志记录器
    app.logger = logger

    gunicorn_logger = logging.getLogger('gunicorn.error')
    app.logger.handlers = gunicorn_logger.handlers
    app.logger.setLevel(gunicorn_logger.level)

    app.run(host="0.0.0.0", port=5000, debug=True)