app.comment_utils 源代码

from utils.http import UserRequest

from app.utils_dependency import *
from app.models import (
    User,
    NaturalPerson,
    Notification,
    Comment,
    CommentPhoto,
)
from app.utils import (
    get_person_or_org,
    if_image,
)
from app.notification_utils import notification_create
from app.extern.wechat import (
    WechatApp,
    WechatMessageLevel,
)
from app.log import logger


[文档] @logger.secure_func(raise_exc=True) def addComment(request: UserRequest, comment_base, receiver=None, *, anonymous=False, notification_title=None) -> MESSAGECONTEXT: """添加评论 Args: ---- - request<WSGIRequest>: 传入的 request,其中 POST 参数至少应当包括: - comment_submit - comment - comment_base<Commentbase object>: 以 Commentbase 为基类的对象。 - 目前的 Commentbase 对象只有四种: modifyposition,neworganization,activity,feedback。 - 2022.8.17加入Chat,用于学术地图问答 - 添加 Commentbase 类型需要在 `content` 和 `URL` 中添加键值对。 - 注意:该对象会被调用**`save`保存** - receiver<User object/iterable>: - 为User object时,只向一个user发布通知消息; - 为iterable时,向该可迭代对象中的所有user发布通知消息。 - 注意:**不批量创建**通知,receiver个数应为常量级 Returns: context<dict>: 继承自wrong/succeed, 成功时包含new_comment """ sender = get_person_or_org(request.user) sender_name = "匿名者" if anonymous else sender.get_display_name() typename = comment_base.typename content = { 'modifyposition': f'{sender_name}在成员变动申请留有新的评论', 'neworganization': f'{sender_name}在新建小组中留有新的评论', 'activity': f"{sender_name}在活动申请中留有新的评论", 'feedback': f"{sender_name}在反馈中心留有新的评论", 'Chat': "问答中有新的消息", } URL = { 'modifyposition': f'/modifyPosition/?pos_id={comment_base.id}', 'neworganization': f'/modifyOrganization/?org_id={comment_base.id}', 'activity': f"/examineActivity/{comment_base.id}" # 发送者如果是组织,接收者就是老师 if request.user.is_org() else f"/editActivity/{comment_base.id}", 'feedback': f"/viewFeedback/{comment_base.id}", 'Chat': f"/viewQA/{comment_base.id}" } # 新建评论信息,并保存 if request.POST.get("comment_submit") is not None: text = str(request.POST.get("comment")) # 检查图片合法性 comment_images = request.FILES.getlist('comment_images') if not text and not comment_images: return wrong("内容为空,无法发送!") for comment_image in comment_images: if if_image(comment_image) != 2: return wrong("附件只支持图片格式。") try: with transaction.atomic(): new_comment = Comment.objects.create( commentbase=comment_base, commentator=request.user, text=text ) for comment_image in comment_images: CommentPhoto.objects.create( image=comment_image, comment=new_comment ) comment_base.save() # 每次save都会更新修改时间 except: return wrong("发送失败,请联系管理员。") if len(text) >= 32: text = text[:31] + "……" if len(text) > 0: text = f'{content[typename]}{text}' else: text = f'{content[typename]}。' if notification_title is None: notification_title = Notification.Title.VERIFY_INFORM # 向一个用户或多个用户发布消息 if receiver is not None: receivers = receiver if isinstance(receivers, User): receivers = [receivers] # 向多个用户发布消息 for _receiver in receivers: notification_create( _receiver, request.user, Notification.Type.NEEDREAD, notification_title, text, URL[typename], to_wechat=dict(app=WechatApp.AUDIT, level=WechatMessageLevel.INFO), anonymous_flag=anonymous, ) context = succeed("发送成功。") context["new_comment"] = new_comment return context else: return wrong(f"找不到信息, 请重试!")
[文档] @logger.secure_func(raise_exc=True) def showComment(commentbase, anonymous_users=None) -> list[dict]: ''' 获取可展示的对象相关评论,返回以时间顺序展示的评论列表,应赋值为`comments` ''' if commentbase is None: return None try: comments: QuerySet[Comment] = commentbase.comments.order_by("time") except: return None comments_display = [] anonymous_users = set() if anonymous_users is None else set(anonymous_users) for comment in comments: display = dict( text=comment.text, time=comment.time, ) commentator = get_person_or_org(comment.commentator) if anonymous_users and comment.commentator in anonymous_users: commentator_display = dict( name='匿名用户', avatar=NaturalPerson.get_user_ava(), ) else: commentator_display = dict( name=commentator.get_display_name(), avatar=commentator.get_user_ava(), URL=commentator.get_absolute_url(), ) commentator_display.update(real_name=commentator.get_display_name()) display.update(commentator=commentator_display) photos = list(comment.comment_photos.all()) display.update(photos=photos) comments_display.append(display) return comments_display