generic.views 源代码
from typing import cast
from django.contrib import auth
from utils.http.dependency import HttpRequest, HttpResponse, UserRequest
from generic.models import User
import utils.models.query as SQ
from utils.global_messages import succeed
from utils.health_check import db_connection_healthy
from utils.views import SecureTemplateView, SecureView
from app.models import Organization
from app.utils import update_related_account_in_session
[文档]
class Index(SecureTemplateView):
login_required = False
template_name = 'index.html'
[文档]
def dispatch_prepare(self, method: str) -> SecureView.HandlerType:
match method:
case 'get':
return (self.user_get
if self.request.user.is_authenticated else
self.visitor_get)
case 'post':
return self.prepare_login()
case _:
return self.default_prepare(method)
[文档]
def visitor_get(self) -> HttpResponse:
# Modify password
# Seems that after modification, log out by default?
if self.request.GET.get('modinfo') is not None:
succeed("修改密码成功!", self.extra_context)
return self.render()
[文档]
def user_get(self) -> HttpResponse:
self.request = cast(UserRequest, self.request)
# Special user
self.valid_user_check(self.request.user)
# Logout
if self.request.GET.get('is_logout') is not None:
return self.redirect('logout')
return self.redirect('welcome')
[文档]
def valid_user_check(self, user: User):
# Special user
if not user.is_valid():
self.permission_denied(
f'“{user.get_full_name()}”不存在成长档案,您可以登录其他账号'
)
[文档]
def ip_check(self) -> None:
# Prevent bug report
x_forwarded_for = self.request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for and x_forwarded_for.split(',')[0] == '127.0.0.1':
self.permission_denied('请使用域名访问')
[文档]
def prepare_login(self) -> SecureView.HandlerType:
self.ip_check()
assert 'username' in self.request.POST
assert 'password' in self.request.POST
_user = self.request.user
assert not _user.is_authenticated or not cast(User, _user).is_valid()
username = self.request.POST['username']
# Check weather username exists
if not SQ.sfilter(User.username, username).exists():
# Allow org to login with orgname
org = SQ.sfilter(Organization.oname, username).first()
if org is None:
return self.wrong('用户名不存在')
username = cast(Organization, org).get_user().username
self.username = username
self.password = self.request.POST['password']
return self.login
[文档]
def login(self) -> HttpResponse:
# Try login
userinfo = auth.authenticate(username=self.username, password=self.password)
if userinfo is None:
return self.wrong('密码错误')
# special user
auth.login(self.request, userinfo)
self.request = cast(UserRequest, self.request)
self.valid_user_check(self.request.user)
# first time login
if self.request.user.is_newuser:
return self.redirect('modpw')
# Related account
# When login as np, related org accout is also available
update_related_account_in_session(self.request, self.username)
# If origin is present and valid, redirect
# Otherwise, redirect to welcome page
origin = self.request.GET.get('origin')
if origin and self._is_origin_safe(self.request, origin):
return self.redirect(origin)
else:
return self.redirect('welcome')
def _is_origin_safe(self, request: HttpRequest,
origin: str | None = None) -> bool:
return origin is None or origin.startswith('/')
[文档]
class Logout(SecureView):
login_required = False
[文档]
def dispatch_prepare(self, method: str) -> SecureView.HandlerType:
return self.get
[文档]
def get(self) -> HttpResponse:
auth.logout(self.request)
return self.redirect('index')
[文档]
def healthcheck(request: HttpRequest) -> HttpResponse:
'''
django健康状态检查
尝试执行数据库操作,若成功返回200,不成功返回500
'''
if db_connection_healthy():
return HttpResponse('healthy', status=200)
else:
return HttpResponse('unhealthy', status=500)