172 lines
6.1 KiB
Python
172 lines
6.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
@Remark: Admin backend login views
|
|
"""
|
|
import base64
|
|
from datetime import datetime, timedelta
|
|
from captcha.views import CaptchaStore, captcha_image
|
|
from django.utils.translation import gettext_lazy as _
|
|
from drf_yasg import openapi
|
|
from drf_yasg.utils import swagger_auto_schema
|
|
from rest_framework import serializers
|
|
from rest_framework.views import APIView
|
|
from rest_framework_simplejwt.serializers import TokenObtainPairSerializer
|
|
from rest_framework_simplejwt.views import TokenObtainPairView
|
|
|
|
from mysystem.models import Users
|
|
from utils.jsonResponse import SuccessResponse
|
|
from utils.validator import CustomValidationError
|
|
from utils.request_util import save_login_log
|
|
from django_redis import get_redis_connection
|
|
from django.conf import settings
|
|
from config import IS_SINGLE_TOKEN,LOGIN_ERROR_RETRY_TIMES,LOGIN_ERROR_RETRY_TIMEOUT
|
|
from django.core.cache import cache
|
|
|
|
class CaptchaView(APIView):
    """
    Issue a new image captcha.

    No authentication classes are configured for this view.
    """
    authentication_classes = []

    @swagger_auto_schema(
        responses={
            '200': openapi.Response('获取成功')
        },
        security=[],
        operation_id='captcha-get',
        operation_description='验证码获取',
    )
    def get(self, request):
        """
        Generate a captcha and return it to the client.

        The response data carries ``key`` (id of the stored captcha record) and
        ``image_base`` (the captcha image as a base64 PNG data URI).
        """
        captcha_hash = CaptchaStore.generate_key()
        store_id = CaptchaStore.objects.filter(hashkey=captcha_hash).first().id
        image_response = captcha_image(request, captcha_hash)
        # Encode the rendered image bytes as base64 text for the data URI.
        encoded = base64.b64encode(image_response.content).decode('utf-8')
        payload = {
            "key": store_id,
            "image_base": f"data:image/png;base64,{encoded}",
        }
        return SuccessResponse(data=payload)
|
|
|
|
def check_login_error_retry(username):
    """
    Record a failed login attempt for ``username`` and enforce the retry limit.

    The counter is kept in the Django cache under a per-username key that is
    written with ``timeout=LOGIN_ERROR_RETRY_TIMEOUT``. Nothing happens when
    ``LOGIN_ERROR_RETRY_TIMES`` is not positive.

    :param username: login name the failed attempt was made with.
    :raises ValueError: when the stored count has already reached
        ``LOGIN_ERROR_RETRY_TIMES``.
    """
    if LOGIN_ERROR_RETRY_TIMES <= 0:
        return

    key = f'lyadmin_login_error_retry_count_{username}'
    retry_count = cache.get(key)
    if retry_count is None:
        # First failure in this window: start the counter.
        cache.set(key, 1, timeout=LOGIN_ERROR_RETRY_TIMEOUT)
        return

    if retry_count >= LOGIN_ERROR_RETRY_TIMES:
        raise ValueError('登录错误次数超过限制')

    try:
        cache.incr(key)
    except ValueError:
        # incr() raises ValueError for a missing key: the entry expired between
        # get() and incr(). Start a new window instead of leaking an error that
        # callers cannot tell apart from the limit error above.
        cache.set(key, 1, timeout=LOGIN_ERROR_RETRY_TIMEOUT)
|
|
|
|
class LoginSerializer(TokenObtainPairSerializer):
    """
    Login serializer.

    Overrides the djangorestframework-simplejwt token-pair serializer: adds an
    image-captcha field and makes :meth:`validate` return a
    ``{"code", "msg", "data"}`` envelope instead of the bare token pair.
    """
    captcha = serializers.CharField(max_length=6)

    class Meta:
        model = Users
        fields = "__all__"
        read_only_fields = ["id"]

    default_error_messages = {
        'no_active_account': _('该账号已被禁用,请联系管理员')
    }

    @staticmethod
    def _failure(msg):
        """Build the envelope returned for a rejected login."""
        return {"code": 4000, "msg": msg, "data": None}

    # Captcha verification is enabled.
    def validate_captcha(self, captcha):
        """
        Check the submitted captcha text against the stored captcha record.

        The record is looked up by the ``captchaKey`` entry of the raw request
        data and is deleted whenever it is found, whether or not the check
        passes.

        :param captcha: captcha text entered by the user.
        :returns: ``captcha`` unchanged (field validators must return the value).
        :raises CustomValidationError: when the captcha is expired, unknown or wrong.
        """
        # A missing or malformed key is treated as an unknown captcha rather
        # than escaping as KeyError/ValueError.
        captcha_key = self.initial_data.get('captchaKey')
        try:
            stored = CaptchaStore.objects.filter(id=captcha_key).first()
        except (TypeError, ValueError):
            stored = None
        self.image_code = stored
        if stored is None:
            raise CustomValidationError("图片验证码错误")

        # NOTE(review): the record counts as expired only once "now - 5 min" is
        # past its stored ``expiration``; datetime.now() is naive, so this
        # assumes ``expiration`` is naive too - TODO confirm both.
        five_minute_ago = datetime.now() - timedelta(minutes=5)
        expired = five_minute_ago > stored.expiration
        matches = captcha in (stored.response, stored.challenge)
        stored.delete()

        if expired:
            raise CustomValidationError('验证码过期')
        if not matches:
            raise CustomValidationError("图片验证码错误")
        return captcha

    def validate(self, attrs):
        """
        Authenticate the user and build the login response envelope.

        :param attrs: validated field values; ``username`` and ``password`` are read.
        :returns: dict with ``code`` (2000 on success, 4000 on rejection),
            ``msg`` and ``data``. On success ``data`` holds the token pair plus
            ``avatar``, ``role_names``, ``name`` and ``userId``.
        :raises ValueError: propagated from ``check_login_error_retry`` when
            the failed-attempt limit has been reached.
        """
        username = attrs['username']
        password = attrs['password']
        user = Users.objects.filter(username=username).first()

        # NOTE(review): the retry counter is consulted only on failed attempts,
        # so a correct password is still accepted after the limit is reached,
        # and the ValueError it raises is not a DRF validation error - confirm
        # both are intended.
        if not user:
            check_login_error_retry(username)
            return self._failure("账号/密码不正确")

        if not user.is_staff:  # only staff accounts may log in to the admin backend
            return self._failure("您没有权限登录后台")

        if not user.is_active:
            return self._failure("该账号已被禁用,请联系管理员")

        # check_password() hashes the plain-text password and compares it.
        if not user.check_password(password):
            check_login_error_retry(username)
            return self._failure("账号/密码不正确")

        data = super().validate(attrs)
        refresh = self.get_token(self.user)

        if user.identity == 0:
            role_names = "超级管理员"
        else:
            role_names = ','.join(user.role.values_list('name', flat=True))

        data['avatar'] = self.user.avatar
        data['role_names'] = role_names
        data['name'] = self.user.name
        data['userId'] = self.user.id
        data['refresh'] = str(refresh)
        data['access'] = str(refresh.access_token)

        request = self.context.get('request')
        request.user = self.user
        # Record the successful login.
        save_login_log(request=request)

        # Cache the user's JWT access token when single-token mode is on.
        if IS_SINGLE_TOKEN:
            redis_conn = get_redis_connection("singletoken")
            token_key = f"lybbn-single-token{user.id}"
            jwt_config = getattr(settings, 'SIMPLE_JWT', None)
            if jwt_config:
                redis_conn.set(token_key, data['access'], jwt_config['ACCESS_TOKEN_LIFETIME'])

        return {"code": 2000, "msg": "请求成功", "data": data}
|
|
|
|
|
|
class LoginView(TokenObtainPairView):
    """
    Login endpoint.

    Token-pair view that validates requests with ``LoginSerializer``; no
    permission classes are applied.
    """
    permission_classes = []
    serializer_class = LoginSerializer