常用工具包\Python_Pyjwt_Flask.py
"""
vue3_admin 自定义后端接口服务
"""
import paramiko
from urllib.parse import quote_plus as urlquote
from werkzeug.security import generate_password_hash, check_password_hash
from functools import wraps
from flask import request, jsonify
import jwt
import json
from flask import request, jsonify
from datetime import timedelta, datetime
from flask_sqlalchemy import SQLAlchemy
from flask import Flask, Response
from flask_cors import CORS, cross_origin
import sys
sys.dont_write_bytecode = True
# 模型部分
app = Flask(__name__)
cors_config = {
# 'origins': ['http://127.0.0.1:5000', 'http://168.12.1.50:5000', 'http://localhost:5173'],
'origins': "*",
# 'allow_headers': ['Content-Type', 'Authorization', 'Token'], # 允许的请求头
'allow_headers': "*",
'expose_headers': ['X-My-Custom-Header'] # 设置允许客户端访问的响应头
}
CORS(app, resources={r"/api/*": cors_config}, supports_credentials=True)
# CORS(app, resources={r"/api/*": {"origins": "*"}}, supports_credentials=True)
"""
create user xxxxxx@'%' identified by 'xxxxxxxxxxxxxx';
GRANT all privileges ON flask.* TO `python`@`%` ;
FLUSH PRIVILEGES;
"""
host = "home.vimll.com"
port = xxxxxxxxx
user = "xxxxxx"
password = 'xxxxxxxxx'
db_name = "flask"
con_info = f'mysql+pymysql://{user}:{urlquote(password)}@{host}:{port}/{
db_name}?charset=utf8'
# 设置连接数据库的URL
app.config['SQLALCHEMY_DATABASE_URI'] = con_info
# print(app.config['SQLALCHEMY_DATABASE_URI'])
# 设置每次请求结束后会自动提交数据库中的改动
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'pool_pre_ping': True # Enable pre-ping to check for stale connections
}
app.app_context().push() # Push an application context for thread-local data
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 查询时会显示原始SQL语句
app.config['SQLALCHEMY_ECHO'] = True
# 指定JSON_AS_ASCII的参数设置为False,阻止jsonify将json内容转为ASCII进行返回
# app.config['JSON_AS_ASCII'] = False # 已弃用
app.json.ensure_ascii = False
db = SQLAlchemy(app)
# 创建自增序列
autoincrement_seq = db.Sequence('autoincrement_seq', start=1)
# 角色表
class Groups(db.Model):
# 定义表名
__tablename__ = 'groups'
# 定义列对象
id = db.Column(db.Integer, autoincrement_seq, primary_key=True,)
gid = db.Column(db.Integer, unique=True)
group_name = db.Column(db.String(64), unique=True)
create_time = db.Column(
db.DateTime, default=datetime.now, comment='创建时间')
update_time = db.Column(
db.DateTime, default=datetime.now, comment='更新时间')
remark = db.Column(db.String(128), default='')
def to_dict(self):
return {
'id': self.id,
'gid': self.gid,
'group_name': self.group_name,
'remark': self.remark
}
# repr()方法显示一个可读字符串
def __repr__(self):
return 'Groups: %s' % self.group_name
# 岗位职责表
class Roles(db.Model):
# 定义表名
__tablename__ = 'roles'
# 定义列对象
id = db.Column(db.Integer, autoincrement_seq, primary_key=True,)
role_name = db.Column(db.String(64), unique=True)
acls = db.Column(db.JSON)
create_time = db.Column(
db.DateTime, default=datetime.now, comment='创建时间')
update_time = db.Column(
db.DateTime, default=datetime.now, comment='更新时间')
remark = db.Column(db.String(128), default='')
def to_dict(self):
return {
'id': self.id,
'role_name': self.role_name,
'acls': self.acls,
'remark': self.remark,
'create_time': self.create_time.strftime(
'%Y-%m-%d %H:%M:%S') if self.create_time else None,
'update_time': self.update_time.strftime(
'%Y-%m-%d %H:%M:%S') if self.update_time else None,
}
# repr()方法显示一个可读字符串
def __repr__(self):
return 'Roles: %s' % self.role_name
# 用户表
class XUsers(db.Model):
# 定义表名
__tablename__ = 'xusers'
id = db.Column(db.Integer, autoincrement_seq, primary_key=True)
username = db.Column(db.String(80), comment='登录名')
nickName = db.Column(db.String(180), comment='用户昵称')
password = db.Column(db.String(120), comment='密码')
token = db.Column(db.String(255), default='', comment='Token')
state = db.Column(db.String(120), default='正常', comment='用户状态')
userLevel = db.Column(db.Integer, default=1, nullable=False)
role = db.Column(db.String(180), default='')
group_id = db.Column(db.Integer, db.ForeignKey('groups.gid'))
avatar = db.Column(db.String(200), comment='用户头像')
create_time = db.Column(
db.DateTime, default=datetime.now, comment='创建时间')
update_time = db.Column(
db.DateTime, default=datetime.now, comment='更新时间')
groups_depart = db.relationship('Groups', backref='groups_to_xusers')
# __table_args__的参数必须是元组的形式
__table_args__ = (
db.UniqueConstraint(username, name='uni_name'), # 用户名和密码不能重复!
)
def to_dict(self):
return {
'id': self.id,
'username': self.username,
'nickName': self.nickName,
'state': self.state,
'userLevel': self.userLevel,
'role': self.role,
'group_id': self.group_id,
'group_name': self.groups_depart.group_name,
'avatar': self.avatar,
'create_time': self.create_time.strftime(
'%Y-%m-%d %H:%M:%S') if self.create_time else None,
'update_time': self.update_time.strftime(
'%Y-%m-%d %H:%M:%S') if self.update_time else None,
}
# 生成token 与解密 token 和 拦截器
SECRET_KEY = 'xxxxxxxxxxxxxxxxxxxx'
# 创建token
def generate_token(user_id):
try:
payload = {
'exp': datetime.now() + timedelta(days=30), # 过期时间
'iat': datetime.now().timestamp(), # 开始时间
'sub': user_id
}
# print("创建token", payload['exp'])
# print("创建token", payload['iat'])
return jwt.encode(
payload,
SECRET_KEY, # 替换为你的密钥
algorithm='HS256'
)
except Exception as e:
return e
# 解析token
def decode_token(token):
""" 解码Token并处理异常 """
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
# print('解析token', payload)
return payload['sub'] # 返回成功标志和用户ID
except jwt.ExpiredSignatureError as e:
print(e)
return False
except jwt.InvalidTokenError as e:
print(e)
return False
# 查询用户状态
def check_token_and_user_status(token):
try:
user_id = decode_token(token)
token_in_user = XUsers.query.filter_by(token=token).first()
# 根据是否开启多端登陆判断token是否有效
multi_device_login = True
if not multi_device_login and not token_in_user:
return False, 'token过期'
if user_id:
user = XUsers.query.get(user_id)
if user:
if user.state == '正常':
return True, user
else:
return False, '该用户状态异常!'
else:
return False, '用户不存在'
else:
return False, 'token无效'
except Exception as e:
print('check_token_and_user_status error', e)
return False, 'token已重置,请重新登录!'
# 获取用户信息token验证
def user_token_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
token = request.headers.get('Token')
# print(request.headers)
# print('Token',token)
if not token:
return jsonify({
'code': 401,
'data': {
'message': 'token不存在',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
user_state, user_info = check_token_and_user_status(token)
if not user_state:
return jsonify({
'code': 401,
'data': {
'message': user_info,
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
return f(*args, **kwargs, user_state=user_state, user_info=user_info)
return decorated_function
# 通用token验证
def general_token_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
token = request.headers.get('Token')
# print(request.headers)
# print('Token',token)
if not token:
return jsonify({
'code': 401,
'data': {
'message': 'token不存在',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
user_state, user_info = check_token_and_user_status(token)
if not user_state:
return jsonify({
'code': 401,
'data': {
'message': user_info,
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
return f(*args, **kwargs)
return decorated_function
# 登陆
@app.route('/api/user/login', methods=['POST'])
@cross_origin()
def user_login():
try:
data = request.json
# print(data)
username = data.get('username')
password = data.get('password')
user = XUsers.query.filter_by(username=username).first()
# print('user.username', user.username, user.state)
if not user:
return jsonify({
'code': 403,
'data': {
'message': '用户账号或密码错误',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
else:
if user.password == password and user.state == '正常':
if len(user.token) > 1:
user_id = decode_token(user.token)
# token未过期
if user_id:
token = user.token
else:
token = generate_token(user.id)
else:
token = generate_token(user.id)
user.token = token
db.session.commit()
return jsonify({
'code': 200,
'data': {
'token': token,
'username': user.username,
'avatar': user.avatar,
'message': '登陆成功',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
else:
return jsonify({
'code': 403,
'data': {
'message': '账号或密码错误!',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
except Exception as e:
print('user_login error', e)
return jsonify({
'code': 401,
'data': {
'message': '用户异常,请重新登录!',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
# 退出登录
@app.route('/api/user/logout', methods=['POST'])
@cross_origin()
def user_logout():
# token = request.headers.get('Token')
data = request.json
# print("退出登录", data)
token = data.get('token')
# print("退出登录",token)
user_id = decode_token(token)
# print("退出登录",user_id)
user = XUsers.query.filter_by(id=user_id).first()
if not user:
return jsonify({
'code': 403,
'data': {
'message': '用户异常,请重新登录!',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
else:
#user.token = ''
#db.session.commit()
return jsonify({
'code': 200,
'data': {
'message': '用户已登出',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
# 启动
if __name__ == '__main__':
import logging
logger = logging.getLogger()
logger.setLevel('DEBUG')
# 调试模式格式
DEBUG_LOG_FORMAT = (
'-' * 80 + '\n' +
'%(levelname)s in %(module)s [%(pathname)s:%(lineno)d]:\n' +
'%(message)s\n' +
'-' * 80
)
# 非调试模式格式
PROD_LOG_FORMAT = '[%(asctime)s] %(levelname)s in %(module)s: %(message)s'
# DATE_FORMAT = '%y-%m-%d %H:%M:%S'
DATE_FORMAT = "[%Y-%m-%d %H:%M:%S %z]"
formatter = logging.Formatter(DEBUG_LOG_FORMAT, DATE_FORMAT)
chlr = logging.StreamHandler()
chlr.setFormatter(formatter)
logger.addHandler(chlr)
# 将记录器设置为Flask应用程序的日志记录器
app.logger = logger
gunicorn_logger = logging.getLogger('gunicorn.error')
app.logger.handlers = gunicorn_logger.handlers
app.logger.setLevel(gunicorn_logger.level)
app.run(host="0.0.0.0", port=5000, debug=True)