yp_library.utils 源代码

from typing import List, Dict, Tuple, Optional, Any
from datetime import datetime, timedelta

from django.db import transaction
from django.db.models import Q, QuerySet
from django.http import QueryDict, HttpRequest

from app.notification_utils import bulk_notification_create
from app.models import Notification, Organization, Activity
from app.extern.wechat import WechatMessageLevel
from yp_library.models import (
    User,
    Reader,
    Book,
    LendRecord,
)
from yp_library.config import library_config as CONFIG
from achievement.models import AchievementUnlock, Achievement

__all__ = [
    'get_readers_by_user', 'search_books',
    'get_query_dict', 'get_my_records', 'get_lendinfo_by_readers',
    'get_library_activity', 'get_recommended_or_newest_books', 'to_feedback_url',
]


def days_reminder(days: int, alert_msg: str):
    '''
    根据逾期时间时间向对应用户发送通知,不负责扣分

    :param days: 逾期时间
    :type days: int
    :param alert_msg: 通知内容
    :type alert_msg: str
    '''
    now = datetime.now()
    lendlist = LendRecord.objects.filter(
        returned=False,
        # 书房截止日期只包含日期信息,不包含时间
        due_time__date=now.date() - timedelta(days=days),
        lend_time__hour=now.hour,
    )

    receivers = lendlist.values_list('reader_id__student_id')
    receivers = User.objects.filter(username__in=receivers)
    _send_remind_notification(receivers, alert_msg)


def violate_reminder(days: int, alert_msg: str):
    '''
    扣除逾期超过指定天数的用户信用分一分并发送通知

    :param days: 逾期时间
    :type days: int
    :param alert_msg: 通知内容
    :type alert_msg: str
    '''
    before = datetime.now() - timedelta(hours=1)
    violate_lendlist = LendRecord.objects.filter(
        returned=False,
        due_time__lte=before - timedelta(days=days),
        lend_time__hour=before.hour,
        status=LendRecord.Status.NORMAL)

    # 逾期一周扣除信用分
    receivers = list(violate_lendlist.values_list(
        'reader_id__student_id', flat=True))
    receivers = User.objects.filter(username__in=receivers)
    # 绑定扣分和状态修改
    with transaction.atomic():
        for receiver in receivers:
            User.objects.modify_credit(receiver, -1, '书房:归还逾期')
        violate_lendlist.select_for_update().update(status=LendRecord.Status.OVERTIME)
    _send_remind_notification(receivers, alert_msg)


def _send_remind_notification(receivers: QuerySet[User], content: str):
    if not receivers:
        return
    # 发送通知
    URL = "/lendinfo/"
    typename = Notification.Type.NEEDREAD
    sender = Organization.objects.get(oname="何善衡图书室").get_user()
    bulk_notification_create(
        receivers=receivers,
        sender=sender,
        typename=typename,
        title=Notification.Title.YPLIB_INFORM,
        content=content,
        URL=URL,
        to_wechat=dict(level=WechatMessageLevel.IMPORTANT),
    )


[文档] def get_readers_by_user(user: User) -> QuerySet[Reader]: """ 根据学号寻找与user关联的reader,要求必须为个人账号且账号必须通过学号关联至少一个reader,否则抛出AssertionError :param user: HttpRequest的User :type user: User :raises AssertionError: 只允许个人账户登录 :raises AssertionError: user的学号没有关联任何书房账号 :return: 与user关联的所有reader :rtype: QuerySet[Reader] """ if not user.is_person(): raise AssertionError('您目前使用非个人账号登录,如要查询借阅记录,请使用个人账号。') # 获取与当前user的学号对应的所有readers readers = Reader.objects.filter(student_id=user.username) if len(readers) == 0: raise AssertionError('您的学号没有关联任何书房账号,如有借书需要,请前往书房开通账号。') return readers
[文档] def search_books(**query_dict) -> QuerySet[Book]: """ 根据给定的属性查询书 :param query_dict: key为id/identity_code/title/author/publisher/returned, value为相应的query id和returned是精确查询,剩下四个是string按contains查询 特别地,还支持全关键词查询:同时在identity_code/title/author/publisher中检索word :type query_dict: dict :return: 查询结果,每个记录是Book表的一行 :rtype: QuerySet[Book] """ query = Q() if query_dict.get("id", "") != "": query &= Q(id=int(query_dict["id"])) if query_dict.get("identity_code", "") != "": query &= Q(identity_code__contains=query_dict["identity_code"]) if query_dict.get("title", "") != "": query &= Q(title__contains=query_dict["title"]) if query_dict.get("author", "") != "": query &= Q(author__contains=query_dict["author"]) if query_dict.get("publisher", "") != "": query &= Q(publisher__contains=query_dict["publisher"]) if query_dict.get("returned", "") != "": query &= Q(returned=query_dict["returned"]) if query_dict.get("keywords", "") != "": kw_query = (Q(title__contains=query_dict["keywords"]) | Q(author__contains=query_dict["keywords"]) | Q(publisher__contains=query_dict["keywords"]) | Q(identity_code__contains=query_dict["keywords"])) query &= kw_query search_results = Book.objects.filter(query).values() # TODO: Return type doesn't match return search_results
[文档] def get_query_dict(post_dict: QueryDict) -> Dict[str, Any]: """ 从HttpRequest的POST中提取出用作search_books参数的query_dict :param post_dict: request.POST :type post_dict: QueryDict :return: 一个词典,key为id/identity_code/title/author/publisher/returned/keywords, value为相应的query :rtype: dict """ # 采用五种查询条件,即"identity_code", "title", "author", "publisher"和"returned",可视情况修改 # returned是精确搜索,剩下四个是包含即可(contains) # (暂不提供通过id查询,因为id应该没有实际含义,用到的可能性不大) # search_books函数要求输入为一个词典,其条目对应"id", "identity_code", "title", "author", "publisher"和"returned"的query # 这里没有id的query,故query为空串 # 此外,还提供“全关键词检索”,具体见search_books query_dict = {} for query_type in ["identity_code", "title", "author", "publisher"]: if query_type in post_dict.keys(): query_dict[query_type] = post_dict[query_type] # 上面的"identity_code", "title", "author", "publisher"在post_dict中可以没有(如welcome页面的搜索), # 下面的"returned"和"keywords"必须有 if len(post_dict.getlist("returned")) == 1: # 如果对returned有要求 query_dict["returned"] = True # 全关键词检索 query_dict["keywords"] = post_dict["keywords"] return query_dict
# TODO: Invalid type annotation
[文档] def get_my_records(reader_id: str, returned: Optional[bool] = None, status: 'list | int | LendRecord.Status' = None) -> List[dict]: """ 查询给定读者的借书记录 :param reader_id: reader的id :type reader_id: str :param returned: 如非空,则限定是否已归还, defaults to None :type returned: bool, optional :param status: 如非空,则限定当前状态, defaults to None :type status: Union[list, tuple, int, LendRecord.Status], optional :return: 查询结果,每个记录包括val_list中的属性以及记录类型(key为'type': 对于已归还记录,False表示逾期记录,True表示正常记录;对于未归还记录, 'normal'表示一般记录,'overtime'表示逾期记录,'approaching'表示接近 期限记录即距离应归还时期<=1天) :rtype: List[dict] """ all_records_list = LendRecord.objects.filter(reader_id=reader_id) val_list = ['id', 'book_id__title', 'lend_time', 'due_time', 'return_time'] if returned is not None: results = all_records_list.filter(returned=returned) if returned: val_list.append('status') # 已归还记录,增加申诉状态呈现 else: results = all_records_list if status is not None: if isinstance(status, (int, LendRecord.Status)): results = results.filter(status=status) else: results = results.filter(status__in=status) records = list(results.values(*val_list)) # 标记记录类型 if returned: for record in records: if record['return_time'] > record['due_time']: record['type'] = 'overtime_returned' # 逾期记录 else: record['type'] = 'returned' # 正常记录 else: now_time = datetime.now() for record in records: # 计算距离应归还时间的天数 delta_days = (record['due_time'] - now_time).total_seconds() / float(60 * 60 * 24) if delta_days > 1: record['type'] = 'normal' # 一般记录 elif delta_days < 0: record['type'] = 'overtime' # 逾期记录 else: record['type'] = 'approaching' # 接近期限记录 return records
# TODO: Invalid type annotation
[文档] def get_lendinfo_by_readers(readers: QuerySet[Reader]) -> Tuple[List[dict], List[dict]]: ''' 查询同一user关联的读者的借阅信息 :param readers: 与user关联的所有读者 :type readers: QuerySet[Reader] :return: 两个list,分别表示未归还记录和已归还记录 :rtype: List[dict], List[dict] ''' unreturned_records_list = [] returned_records_list = [] reader_ids = list(readers.values('id')) for reader_id in reader_ids: unreturned_records_list.extend( get_my_records(reader_id['id'], returned=False)) returned_records_list.extend( get_my_records(reader_id['id'], returned=True)) unreturned_records_list.sort( key=lambda r: r['due_time']) # 进行中记录按照应归还时间排序 returned_records_list.sort( key=lambda r: r['return_time'], reverse=True) # 已完成记录按照归还时间逆序排列 return unreturned_records_list, returned_records_list
[文档] def get_library_activity(num: int) -> QuerySet[Activity]: """ 获取书房欢迎页面展示的活动列表 目前筛选活动的逻辑是:书房组织的、状态为报名中/等待中/进行中、活动开始时间越晚越优先 :param num: 最多展示多少活动 :type num: int :return: 展示的活动 :rtype: QuerySet[Activity] """ all_valid_library_activities = Activity.objects.activated().filter( organization_id__oname=CONFIG.organization_name, status__in=[ Activity.Status.APPLYING, Activity.Status.WAITING, Activity.Status.PROGRESSING ] ).order_by('-start') display_activities = all_valid_library_activities[:num].values() # TODO: Return type doesn't match return display_activities
[文档] def to_feedback_url(request: HttpRequest) -> str: """ 检查预约记录是否可以申诉。 如果可以,向session添加传递到反馈填写界面的信息。 最终函数返回跳转到的url。 :param request: http请求 :type request: HttpRequest :return: 即将跳转到的url :rtype: str """ # 首先检查预约记录是否存在 try: id = request.POST['feedback'] record: LendRecord = LendRecord.objects.get(id=id) except: raise AssertionError("借阅记录不存在!") # 然后检查借阅记录是否可申诉 # TODO: May encounter None Value assert record.due_time < record.return_time, "该借阅记录不可申诉!" # 将record的状态改为“申诉中” record.status = LendRecord.Status.APPEALING record.save() book_name = record.book_id.title lend_time = record.lend_time.strftime('%Y-%m-%d %H:%M') due_time = record.due_time.strftime('%Y-%m-%d %H:%M') return_time = record.return_time.strftime('%Y-%m-%d %H:%M') # 向session添加信息 request.session['feedback_type'] = '书房借阅申诉' request.session['feedback_url'] = record.get_admin_url() request.session['feedback_content'] = '\n'.join(( f'借阅书籍:{book_name}', f'借阅时间:{lend_time}', f'应归还时间:{due_time}', f'实际归还时间:{return_time}', '姓名:', '申诉理由:' )) # 最终返回填写feedback的url return '/feedback/?argue'