from collections import defaultdict
from typing import cast
from django.http import HttpRequest
from app.utils_dependency import *
from app.models import (
NaturalPerson,
AcademicTag,
AcademicEntry,
AcademicTagEntry,
AcademicTextEntry,
AcademicQA,
User,
Chat,
)
from app.utils import get_person_or_org
from app.comment_utils import showComment
__all__ = [
'get_search_results',
'chats2Display',
'comments2display',
'get_js_tag_list',
'get_text_list',
'get_tag_status',
'get_text_status',
'update_tag_entry',
'update_text_entry',
'update_academic_map',
'get_wait_audit_student',
'audit_academic_map',
'have_entries',
'get_tags_for_search',
]
[文档]
def get_search_results(query: str) -> dict[str, dict[str, list[str]]]:
# TODO: 更新文档
"""
根据提供的关键词获取搜索结果。
"""
# 搜索所有含有关键词的公开的学术地图项目,忽略大小写
academic_tags = AcademicTagEntry.objects.filter(
tag__tag_content__icontains=query,
status=AcademicEntry.EntryStatus.PUBLIC,
).values_list(
SQ.f(AcademicEntry.person, NaturalPerson.person_id, User.username),
SQ.f(AcademicTagEntry.tag, AcademicTag.atype),
SQ.f(AcademicTagEntry.tag, AcademicTag.tag_content),
)
academic_texts = AcademicTextEntry.objects.filter(
content__icontains=query,
status=AcademicEntry.EntryStatus.PUBLIC,
).values_list(
SQ.f(AcademicEntry.person, NaturalPerson.person_id, User.username),
SQ.f(AcademicTextEntry.atype),
SQ.f(AcademicTextEntry.content),
)
# 根据tag/text对应的人,整合学术地图项目
# 直接使用defaultdict会导致前端items不可用,因为Django先尝试以键访问,并得到空列表
academic_map_dict = defaultdict(lambda: defaultdict(list[str]))
for sid, ty, content in academic_tags:
academic_map_dict[sid][AcademicTag.Type(ty).label].append(content)
for sid, ty, content in academic_texts:
academic_map_dict[sid][AcademicTextEntry.Type(ty).label].append(content)
return {k: dict(v) for k, v in academic_map_dict.items()} # 转化为字典
[文档]
def chats2Display(user: User, sent: bool) -> dict[str, list[dict]]:
"""
把我收到/我发出的所有chat转化为供前端展示的两个列表,分别是进行中chat的信息、和其他chat的信息
:param chats: 我收到/我发出的所有chat
:type chats: QuerySet[Chat]
:param sent: 若为True则表示我发出的,否则表示我收到的
:type sent: bool
:return: 一个词典,key为progressing和not_progressing,value分别是进行中chat的列表、和其他chat的列表
:rtype: dict[str, list[dict]]
"""
not_progressing_chats = []
progressing_chats = []
if sent:
chats = Chat.objects.filter(questioner=user).order_by(
"-modify_time", "-time")
else:
chats = Chat.objects.filter(respondent=user).order_by(
"-modify_time", "-time")
for chat in chats:
chat_dict = {}
chat_dict['id'] = chat.id
chat_dict['questioner_anonymous'] = chat.questioner_anonymous
# 目前根据回答者是否匿名,来区分定向和非定向提问
chat_dict['respondent_anonymous'] = chat.respondent_anonymous
is_questioner = user == chat.questioner
chat_dict['is_questioner'] = is_questioner
chat_dict['questioner_name'] = get_person_or_org(
chat.questioner).get_display_name()
chat_dict['respondent_name'] = get_person_or_org(
chat.respondent).get_display_name()
if is_questioner:
chat_dict['academic_url'] = get_person_or_org(
chat.respondent).get_absolute_url()
else:
chat_dict['academic_url'] = get_person_or_org(
chat.questioner).get_absolute_url()
if len(chat.title) >= 12:
chat_dict['title'] = chat.title[:12] + "……"
else:
chat_dict['title'] = chat.title or "无主题"
chat_dict['status'] = chat.get_status_display()
chat_dict['start_time'] = chat.time
chat_dict['last_modification_time'] = chat.modify_time
chat_dict['chat_url'] = f"/viewQA/{chat.id}" # 问答详情的url,超链接放在title上
chat_dict['message_count'] = chat.comments.count()
if chat.status == Chat.Status.PROGRESSING:
progressing_chats.append(chat_dict)
else:
not_progressing_chats.append(chat_dict)
return {
"progressing": progressing_chats,
"not_progressing": not_progressing_chats
}
[文档]
def get_js_tag_list(author: NaturalPerson, type: AcademicTag.Type,
selected: bool,
status_in: list[AcademicEntry.EntryStatus] | None = None) -> list[dict]:
"""
用于前端显示支持搜索的专业/项目列表,返回形如[{id, content}]的列表。
:param author: 作者自然人信息
:type author: NaturalPerson
:param type: 标记所需的tag类型
:type type: AcademicTag.Type
:param selected: 用于标记是否获取本人已有的专业项目,selected代表获取前端默认选中的项目
:type selected: bool
:param status_in: 所要检索的状态的列表,默认为None,表示搜索全部
:type status_in: list[AcademicEntry.EntryStatus]
:return: 所有专业/项目组成的List[dict],key值如上所述
:rtype: list[dict]
"""
if selected:
all_my_tags = AcademicTagEntry.objects.activated().filter(person=author)
if status_in is not None:
all_my_tags = all_my_tags.filter(status__in=status_in)
tags = all_my_tags.filter(tag__atype=type).values(
'tag__id', 'tag__tag_content')
js_list = [{"id": tag['tag__id'], "text": tag['tag__tag_content']}
for tag in tags]
else:
tags = AcademicTag.objects.filter(atype=type)
js_list = [{"id": tag.id, "text": tag.tag_content} for tag in tags]
return js_list
[文档]
def get_text_list(author: NaturalPerson, type: AcademicTextEntry.Type,
status_in: list[AcademicEntry.EntryStatus] | None = None) -> list[str]:
"""
获取自己的所有类型为type的TextEntry的内容列表。
:param author: 作者自然人信息
:type author: NaturalPerson
:param type: TextEntry的类型
:type type: AcademicTextEntry.Type
:param status_in: 所要检索的状态的列表,默认为None,表示搜索全部
:type status_in: list
:return: 含有所有类型为type的TextEntry的content的list
:rtype: list[str]
"""
all_my_text = AcademicTextEntry.objects.activated().filter(person=author, atype=type)
if status_in is not None:
all_my_text = all_my_text.filter(status__in=status_in)
text_list = [text.content for text in all_my_text]
return text_list
[文档]
def get_tag_status(person: NaturalPerson, type: AcademicTag.Type) -> str:
"""
获取person的类型为type的TagEntry的公开状态。
如果person没有类型为type的TagEntry,返回"公开"。
:param person: 需要获取公开状态的人
:type person: NaturalPerson
:param type: TagEntry的类型
:type type: AcademicTag.Type
:return: 公开状态,返回"公开/私密"
:rtype: str
"""
# 首先获取person所有的TagEntry
all_tag_entries = AcademicTagEntry.objects.activated().filter(
person=person, tag__atype=type)
if all_tag_entries.exists():
# 因为所有类型为type的TagEntry的公开状态都一样,所以直接返回第一个entry的公开状态
entry = all_tag_entries[0]
return "私密" if entry.status == AcademicEntry.EntryStatus.PRIVATE else "公开"
else:
return "公开"
[文档]
def get_text_status(person: NaturalPerson, type: AcademicTextEntry.Type) -> str:
"""
获取person的类型为type的TextEntry的公开状态。
如果person没有类型为type的TextEntry,返回"公开"。
:param person: 需要获取公开状态的人
:type person: NaturalPerson
:param type: TextEntry的类型
:type type: AcademicTextEntry.Type
:return: 公开状态,返回"公开/私密"
:rtype: str
"""
# 首先获取person所有的类型为type的TextEntry
all_text_entries = AcademicTextEntry.objects.activated().filter(person=person,
atype=type)
if all_text_entries.exists():
# 因为所有类型为type的TextEntry的公开状态都一样,所以直接返回第一个entry的公开状态
entry = all_text_entries[0]
return "私密" if entry.status == AcademicEntry.EntryStatus.PRIVATE else "公开"
else:
return "公开"
[文档]
def update_tag_entry(person: NaturalPerson,
tag_ids: list[str],
status: bool,
type: AcademicTag.Type) -> None:
"""
更新TagEntry的工具函数。
:param person: 需要更新学术地图的人
:type person: NaturalPerson
:param tag_ids: 含有一系列tag_id(未经类型转换)的list
:type tag_ids: list[str]
:param status: tag_ids对应的所有tags的公开状态
:type status: bool
:param type: tag_ids对应的所有tags的类型
:type type: AcademicTag.Type
"""
# 首先获取person所有的TagEntry
all_tag_entries = AcademicTagEntry.objects.activated().filter(
person=person, tag__atype=type)
# 标签类型无需审核
updated_status = (AcademicEntry.EntryStatus.PUBLIC
if status == "公开" else
AcademicEntry.EntryStatus.PRIVATE)
for entry in all_tag_entries:
if not str(entry.tag.id) in tag_ids:
# 如果用户原有的TagEntry的id在tag_ids中未出现,则将其状态设置为“已弃用”
entry.status = AcademicEntry.EntryStatus.OUTDATE
entry.save()
else:
# 如果出现,直接更新其状态,并将这个id从tag_ids移除
entry.status = updated_status
entry.save()
tag_ids.remove(str(entry.tag.id))
# 接下来遍历的tag_id都是要新建的tag
for tag_id in tag_ids:
AcademicTagEntry.objects.create(
person=person, tag=AcademicTag.objects.get(id=int(tag_id)),
status=updated_status
)
[文档]
def update_text_entry(person: NaturalPerson,
contents: list[str],
status: bool,
type: AcademicTextEntry.Type) -> None:
"""
更新TextEntry的工具函数。
:param person: 需要更新学术地图的人
:type person: NaturalPerson
:param tag_ids: 含有一系列TextEntry的内容的list
:type tag_ids: list[str]
:param status: 该用户所有类型为type的TextEntry的公开状态
:type status: bool
:param type: contents对应的TextEntry的类型
:type type: AcademicTextEntry.Type
"""
# 首先获取person所有的类型为type的TextEntry
all_text_entries = AcademicTextEntry.objects.activated().filter(person=person,
atype=type)
updated_status = (AcademicEntry.EntryStatus.WAIT_AUDIT
if status == "公开" else
AcademicEntry.EntryStatus.PRIVATE)
previous_num = len(all_text_entries)
# 即将修改/创建的entry总数一定不小于原有的,因此先遍历原有的entry,判断是否更改/删除
for i, entry in enumerate(all_text_entries):
if entry.content != contents[i]:
# 内容发生修改,需要先将原有的content设置为“已弃用”
entry.status = AcademicEntry.EntryStatus.OUTDATE
entry.save()
if contents[i] != "": # 只有新的entry的内容不为空才创建
AcademicTextEntry.objects.create(
person=person, atype=type, content=contents[i],
status=updated_status,
)
elif entry.status != updated_status:
# 内容未修改但status修改,只更新entry的状态,不删除
entry.status = updated_status
entry.save()
# 接下来遍历的entry均为需要新建的
for content in contents[previous_num:]:
if content != "":
AcademicTextEntry.objects.create(
person=person, atype=type, content=content,
status=AcademicEntry.EntryStatus.WAIT_AUDIT if status == "公开"
else AcademicEntry.EntryStatus.PRIVATE
)
[文档]
def update_academic_map(request: HttpRequest) -> dict:
"""
从前端获取填写的学术地图信息,并在数据库中进行更新,返回含有成功/错误信息的dict。
:param request: http请求
:type request: HttpRequest
:return: 含成功/错误信息的dict,用于redirect后页面的前端展示
:rtype: dict
"""
# 首先从select栏获取所有选中的TagEntry
majors = request.POST.getlist('majors')
minors = request.POST.getlist('minors')
double_degrees = request.POST.getlist('double_degrees')
projects = request.POST.getlist('projects')
# 然后从其余栏目获取即将更新的TextEntry
scientific_research_num = int(request.POST['scientific_research_num'])
challenge_cup_num = int(request.POST['challenge_cup_num'])
internship_num = int(request.POST['internship_num'])
scientific_direction_num = int(request.POST['scientific_direction_num'])
graduation_num = int(request.POST['graduation_num'])
scientific_research = [request.POST[f'scientific_research_{i}']
for i in range(scientific_research_num+1)]
challenge_cup = [request.POST[f'challenge_cup_{i}']
for i in range(challenge_cup_num+1)]
internship = [request.POST[f'internship_{i}']
for i in range(internship_num+1)]
scientific_direction = [request.POST[f'scientific_direction_{i}']
for i in range(scientific_direction_num+1)]
graduation = [request.POST[f'graduation_{i}']
for i in range(graduation_num+1)]
# 对上述五个列表中的所有填写项目,检查是否超过数据库要求的字数上限
def max_length_of(items): return max(
[len(item) for item in items]) if len(items) > 0 else 0
MAX_LENGTH = 4095
if max_length_of(scientific_research) > MAX_LENGTH:
return wrong("您设置的本科生科研经历太长啦!请修改~")
elif max_length_of(challenge_cup) > MAX_LENGTH:
return wrong("您设置的挑战杯经历太长啦!请修改~")
elif max_length_of(internship) > MAX_LENGTH:
return wrong("您设置的实习经历太长啦!请修改~")
elif max_length_of(scientific_direction) > MAX_LENGTH:
return wrong("您设置的科研方向太长啦!请修改~")
elif max_length_of(graduation) > MAX_LENGTH:
return wrong("您设置的毕业去向太长啦!请修改~")
# 从checkbox获取所有栏目的公开状态
major_status = request.POST['major_status']
minor_status = request.POST['minor_status']
double_degree_status = request.POST['double_degree_status']
project_status = request.POST['project_status']
scientific_research_status = request.POST['scientific_research_status']
challenge_cup_status = request.POST['challenge_cup_status']
internship_status = request.POST['internship_status']
scientific_direction_status = request.POST['scientific_direction_status']
graduation_status = request.POST['graduation_status']
# 获取前端信息后对数据库进行更新
with transaction.atomic():
assert request.user.is_authenticated
user = cast(User, request.user)
me = get_person_or_org(user, User.Type.PERSON)
# 首先更新自己的TagEntry
update_tag_entry(me, majors, major_status, AcademicTag.Type.MAJOR)
update_tag_entry(me, minors, minor_status, AcademicTag.Type.MINOR)
update_tag_entry(me, double_degrees, double_degree_status,
AcademicTag.Type.DOUBLE_DEGREE)
update_tag_entry(me, projects, project_status,
AcademicTag.Type.PROJECT)
# 然后更新自己的TextEntry
update_text_entry(
me, scientific_research, scientific_research_status,
AcademicTextEntry.Type.SCIENTIFIC_RESEARCH
)
update_text_entry(
me, challenge_cup, challenge_cup_status,
AcademicTextEntry.Type.CHALLENGE_CUP
)
update_text_entry(
me, internship, internship_status,
AcademicTextEntry.Type.INTERNSHIP
)
update_text_entry(
me, scientific_direction, scientific_direction_status,
AcademicTextEntry.Type.SCIENTIFIC_DIRECTION
)
update_text_entry(
me, graduation, graduation_status,
AcademicTextEntry.Type.GRADUATION
)
# 最后更新是否允许他人提问
accept_chat = request.POST["accept_chat"]
user.accept_chat = True if accept_chat == "True" else False
user.save()
return succeed("学术地图修改成功!")
[文档]
def get_wait_audit_student() -> set[NaturalPerson]:
"""
获取当前审核中的AcademicEntry对应的学生,因为要去重,所以返回一个集合
:return: 当前审核中的AcademicEntry对应的NaturalPerson组成的集合
:rtype: set[NaturalPerson]
"""
wait_audit_tag_entries = AcademicTagEntry.objects.filter(
status=AcademicEntry.EntryStatus.WAIT_AUDIT)
wait_audit_text_entries = AcademicTextEntry.objects.filter(
status=AcademicEntry.EntryStatus.WAIT_AUDIT)
wait_audit_students = set()
for entry in wait_audit_tag_entries:
wait_audit_students.add(entry.person)
for entry in wait_audit_text_entries:
wait_audit_students.add(entry.person)
return wait_audit_students
[文档]
def audit_academic_map(author: NaturalPerson) -> None:
"""
审核通过某用户的记录。
:param author: 被审核用户
:type author: NaturalPerson
"""
# 筛选所有待审核的记录
AcademicTagEntry.objects.activated().filter(
person=author, status=AcademicEntry.EntryStatus.WAIT_AUDIT).update(
status=AcademicEntry.EntryStatus.PUBLIC)
AcademicTextEntry.objects.activated().filter(
person=author, status=AcademicEntry.EntryStatus.WAIT_AUDIT).update(
status=AcademicEntry.EntryStatus.PUBLIC)
[文档]
def have_entries(author: NaturalPerson,
status_in: list[AcademicEntry.EntryStatus]) -> bool:
"""
判断用户有无status属性为public/wait_audit...的学术地图条目(tag和text)
:param author: 条目作者用户
:type author: NaturalPerson
:param status_in: AcademicEntry.EntryStatus构成的list
:type status_in: list[AcademicEntry.EntryStatus]
:return: 是否有该类别的条目
:rtype: bool
"""
all_tag_entries = AcademicTagEntry.objects.activated().filter(
person=author, status__in=status_in)
all_text_entries = (AcademicTextEntry.objects.activated().filter(
person=author, status__in=status_in))
return bool(all_tag_entries) or bool(all_text_entries)