"""
vue3_admin 自定义后端接口服务
"""
import paramiko
from urllib.parse import quote_plus as urlquote
from werkzeug.security import generate_password_hash, check_password_hash
from functools import wraps
from flask import request, jsonify
import jwt
import json
from flask import request, jsonify
from datetime import timedelta, datetime
from flask_sqlalchemy import SQLAlchemy
from flask import Flask, Response
from flask_cors import CORS, cross_origin
import sys
sys.dont_write_bytecode = True
app = Flask(__name__)
cors_config = {
'origins': "*",
'allow_headers': "*",
'expose_headers': ['X-My-Custom-Header']
}
CORS(app, resources={r"/api/*": cors_config}, supports_credentials=True)
"""
create user xxxxxx@'%' identified by 'xxxxxxxxxxxxxx';
GRANT all privileges ON flask.* TO `python`@`%` ;
FLUSH PRIVILEGES;
"""
host = "home.vimll.com"
port = xxxxxxxxx
user = "xxxxxx"
password = 'xxxxxxxxx'
db_name = "flask"
con_info = f'mysql+pymysql://{user}:{urlquote(password)}@{host}:{port}/{
db_name}?charset=utf8'
app.config['SQLALCHEMY_DATABASE_URI'] = con_info
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'pool_pre_ping': True
}
app.app_context().push()
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True
app.json.ensure_ascii = False
db = SQLAlchemy(app)
autoincrement_seq = db.Sequence('autoincrement_seq', start=1)
class Groups(db.Model):
__tablename__ = 'groups'
id = db.Column(db.Integer, autoincrement_seq, primary_key=True,)
gid = db.Column(db.Integer, unique=True)
group_name = db.Column(db.String(64), unique=True)
create_time = db.Column(
db.DateTime, default=datetime.now, comment='创建时间')
update_time = db.Column(
db.DateTime, default=datetime.now, comment='更新时间')
remark = db.Column(db.String(128), default='')
def to_dict(self):
return {
'id': self.id,
'gid': self.gid,
'group_name': self.group_name,
'remark': self.remark
}
def __repr__(self):
return 'Groups: %s' % self.group_name
class Roles(db.Model):
__tablename__ = 'roles'
id = db.Column(db.Integer, autoincrement_seq, primary_key=True,)
role_name = db.Column(db.String(64), unique=True)
acls = db.Column(db.JSON)
create_time = db.Column(
db.DateTime, default=datetime.now, comment='创建时间')
update_time = db.Column(
db.DateTime, default=datetime.now, comment='更新时间')
remark = db.Column(db.String(128), default='')
def to_dict(self):
return {
'id': self.id,
'role_name': self.role_name,
'acls': self.acls,
'remark': self.remark,
'create_time': self.create_time.strftime(
'%Y-%m-%d %H:%M:%S') if self.create_time else None,
'update_time': self.update_time.strftime(
'%Y-%m-%d %H:%M:%S') if self.update_time else None,
}
def __repr__(self):
return 'Roles: %s' % self.role_name
class XUsers(db.Model):
__tablename__ = 'xusers'
id = db.Column(db.Integer, autoincrement_seq, primary_key=True)
username = db.Column(db.String(80), comment='登录名')
nickName = db.Column(db.String(180), comment='用户昵称')
password = db.Column(db.String(120), comment='密码')
token = db.Column(db.String(255), default='', comment='Token')
state = db.Column(db.String(120), default='正常', comment='用户状态')
userLevel = db.Column(db.Integer, default=1, nullable=False)
role = db.Column(db.String(180), default='')
group_id = db.Column(db.Integer, db.ForeignKey('groups.gid'))
avatar = db.Column(db.String(200), comment='用户头像')
create_time = db.Column(
db.DateTime, default=datetime.now, comment='创建时间')
update_time = db.Column(
db.DateTime, default=datetime.now, comment='更新时间')
groups_depart = db.relationship('Groups', backref='groups_to_xusers')
__table_args__ = (
db.UniqueConstraint(username, name='uni_name'),
)
def to_dict(self):
return {
'id': self.id,
'username': self.username,
'nickName': self.nickName,
'state': self.state,
'userLevel': self.userLevel,
'role': self.role,
'group_id': self.group_id,
'group_name': self.groups_depart.group_name,
'avatar': self.avatar,
'create_time': self.create_time.strftime(
'%Y-%m-%d %H:%M:%S') if self.create_time else None,
'update_time': self.update_time.strftime(
'%Y-%m-%d %H:%M:%S') if self.update_time else None,
}
SECRET_KEY = 'xxxxxxxxxxxxxxxxxxxx'
def generate_token(user_id):
try:
payload = {
'exp': datetime.now() + timedelta(days=30),
'iat': datetime.now().timestamp(),
'sub': user_id
}
return jwt.encode(
payload,
SECRET_KEY,
algorithm='HS256'
)
except Exception as e:
return e
def decode_token(token):
""" 解码Token并处理异常 """
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
return payload['sub']
except jwt.ExpiredSignatureError as e:
print(e)
return False
except jwt.InvalidTokenError as e:
print(e)
return False
def check_token_and_user_status(token):
try:
user_id = decode_token(token)
token_in_user = XUsers.query.filter_by(token=token).first()
multi_device_login = True
if not multi_device_login and not token_in_user:
return False, 'token过期'
if user_id:
user = XUsers.query.get(user_id)
if user:
if user.state == '正常':
return True, user
else:
return False, '该用户状态异常!'
else:
return False, '用户不存在'
else:
return False, 'token无效'
except Exception as e:
print('check_token_and_user_status error', e)
return False, 'token已重置,请重新登录!'
def user_token_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
token = request.headers.get('Token')
if not token:
return jsonify({
'code': 401,
'data': {
'message': 'token不存在',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
user_state, user_info = check_token_and_user_status(token)
if not user_state:
return jsonify({
'code': 401,
'data': {
'message': user_info,
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
return f(*args, **kwargs, user_state=user_state, user_info=user_info)
return decorated_function
def general_token_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
token = request.headers.get('Token')
if not token:
return jsonify({
'code': 401,
'data': {
'message': 'token不存在',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
user_state, user_info = check_token_and_user_status(token)
if not user_state:
return jsonify({
'code': 401,
'data': {
'message': user_info,
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
return f(*args, **kwargs)
return decorated_function
@app.route('/api/user/login', methods=['POST'])
@cross_origin()
def user_login():
try:
data = request.json
username = data.get('username')
password = data.get('password')
user = XUsers.query.filter_by(username=username).first()
if not user:
return jsonify({
'code': 403,
'data': {
'message': '用户账号或密码错误',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
else:
if user.password == password and user.state == '正常':
if len(user.token) > 1:
user_id = decode_token(user.token)
if user_id:
token = user.token
else:
token = generate_token(user.id)
else:
token = generate_token(user.id)
user.token = token
db.session.commit()
return jsonify({
'code': 200,
'data': {
'token': token,
'username': user.username,
'avatar': user.avatar,
'message': '登陆成功',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
else:
return jsonify({
'code': 403,
'data': {
'message': '账号或密码错误!',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
except Exception as e:
print('user_login error', e)
return jsonify({
'code': 401,
'data': {
'message': '用户异常,请重新登录!',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
@app.route('/api/user/logout', methods=['POST'])
@cross_origin()
def user_logout():
data = request.json
token = data.get('token')
user_id = decode_token(token)
user = XUsers.query.filter_by(id=user_id).first()
if not user:
return jsonify({
'code': 403,
'data': {
'message': '用户异常,请重新登录!',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
else:
return jsonify({
'code': 200,
'data': {
'message': '用户已登出',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
},
})
if __name__ == '__main__':
import logging
logger = logging.getLogger()
logger.setLevel('DEBUG')
DEBUG_LOG_FORMAT = (
'-' * 80 + '\n' +
'%(levelname)s in %(module)s [%(pathname)s:%(lineno)d]:\n' +
'%(message)s\n' +
'-' * 80
)
PROD_LOG_FORMAT = '[%(asctime)s] %(levelname)s in %(module)s: %(message)s'
DATE_FORMAT = "[%Y-%m-%d %H:%M:%S %z]"
formatter = logging.Formatter(DEBUG_LOG_FORMAT, DATE_FORMAT)
chlr = logging.StreamHandler()
chlr.setFormatter(formatter)
logger.addHandler(chlr)
app.logger = logger
gunicorn_logger = logging.getLogger('gunicorn.error')
app.logger.handlers = gunicorn_logger.handlers
app.logger.setLevel(gunicorn_logger.level)
app.run(host="0.0.0.0", port=5000, debug=True)