Appointment.utils.identity 源代码

负责人: pht


from typing import Callable, ParamSpec, Concatenate, TypeVar, overload, Literal, TypeGuard
from functools import wraps

from django.db.models import QuerySet
from django.shortcuts import redirect
from django.urls import reverse
from utils.http.rewrite_auth import login_required
from utils.http.dependency import HttpRequest, UserRequest, HttpResponse

from Appointment.models import User, Participant
from Appointment.config import appointment_config as CONFIG
from utils.global_messages import wrong, succeed, message_url
from app import API

__all__ = [

def get_participant(user: User | str, update: bool = ...,
                    raise_except: Literal[False] = ...) -> Participant | None: ...
def get_participant(user: User | str, update: bool = ...,
                    raise_except: Literal[True] = ...) -> Participant: ...

[文档] def get_participant(user: User | str, update: bool = False, raise_except: bool = False) -> Participant | None: '''通过User对象或学号获取对应的参与人对象 Args: - update: 获取带更新锁的对象 - raise_except: 失败时是否抛出异常,默认不抛出 Returns: - participant: 满足participant.Sid=user, 不存在时返回`None` Raises: - DoesNotExist: 当`raise_except`为`True`时,如果不存在对应的参与人对象则抛出异常 ''' try: par_all: QuerySet[Participant] = Participant.objects.all() if update: par_all = par_all.select_for_update() if isinstance(user, str): return par_all.get(Sid_id=user) return par_all.get(Sid=user) except: if raise_except: raise return None
def _arg2user(participant: Participant | User) -> User: '''把范围内的参数转化为User对象''' if isinstance(participant, Participant): user = participant.Sid else: user = participant return user # 获取用户信息
[文档] def get_name(participant: Participant | User): '''返回participant(个人或组织)的名称''' user = _arg2user(participant) return API.get_display_name(user)
[文档] def get_avatar(participant: Participant | User): '''返回participant的头像''' user = _arg2user(participant) return API.get_avatar_url(user)
[文档] def get_member_ids(participant: Participant | User, noncurrent: bool = False): '''返回participant的成员id列表,个人返回空列表''' user = _arg2user(participant) return API.get_members(user, noncurrent=noncurrent)
[文档] def get_auditor_ids(participant: Participant | User): '''返回participant的审核者id列表''' user = _arg2user(participant) return API.get_auditors(user)
# 用户验证、创建和更新 # TODO: Create Account For all Person with Command def _create_account(request: UserRequest, **values) -> Participant | None: ''' 根据请求信息创建账户, 根据创建结果返回生成的对象或者`None`, noexcept ''' from django.db import transaction try: assert request.user.is_authenticated with transaction.atomic(): values.update(Sid=request.user) values.setdefault('hidden', request.user.is_org()) values.setdefault('longterm', request.user.is_org() and len(get_member_ids(request.user)) >= 10) account = Participant.objects.create(**values) return account except: return None P = ParamSpec('P') R = TypeVar('R', bound=HttpRequest) AuthFunction = Callable[[Participant | None], bool] ViewFunction = Callable[Concatenate[R, P], HttpResponse] def _authenticate(participant: Participant | None) -> TypeGuard[Participant]: return participant is not None
[文档] def identity_check( auth_func: AuthFunction = _authenticate, redirect_field_name: str = 'origin', allow_create: bool = True, ) -> Callable[[ViewFunction[UserRequest, P]], ViewFunction[HttpRequest, P]]: def decorator(view_function: ViewFunction[UserRequest, P]) -> ViewFunction[HttpRequest, P]: @login_required(redirect_field_name=redirect_field_name) @wraps(view_function) def _wrapped_view(request: UserRequest, *args: P.args, **kwargs: P.kwargs): _allow_create = allow_create and CONFIG.allow_newstu_appoint context = {} if not request.user.is_valid(): _allow_create = False cur_part = get_participant(request.user) if cur_part is None and _allow_create: cur_part = _create_account(request) if cur_part is not None: succeed('账号不存在,已为您自动创建账号!', context) else: warn_message = ('创建地下室账户失败,请联系管理员为您解决。' '在此之前,您可以查看实时人数。') wrong(warn_message, context) if auth_func is not None and not auth_func(cur_part): # TODO: task 0 lzp, log it and notify admin if cur_part is not None: warn_message = ('您访问了未授权的页面,如需访问请先登录。') wrong(warn_message, context) elif not _allow_create: warn_message = ('本页面暂不支持地下室账户创建,您可以先查看实时人数。') wrong(warn_message, context) if context: return redirect(message_url(context, reverse('Appointment:index'))) return view_function(request, *args, **kwargs) return _wrapped_view return decorator