from datetime import datetime
from django.contrib import admin
from django.db.models import F, QuerySet
from django.utils.safestring import mark_safe
from utils.http.dependency import HttpRequest
from utils.models.query import sfilter, f
from utils.admin_utils import *
from app.models import *
from scheduler.cancel import remove_job
from app.YQPoint_utils import run_lottery
from app.org_utils import accept_modifyorg_submit
# 通用内联模型
[文档]
@readonly_inline
class PositionInline(admin.TabularInline):
model = Position
classes = ['collapse']
ordering = ['-id']
fields = [
'person', 'org',
'year', 'semester',
'is_admin', 'pos', 'status',
]
show_change_link = True
[文档]
@readonly_inline
class ParticipationInline(admin.TabularInline):
model = Participation
classes = ['collapse']
ordering = ['-' + f(model.activity)]
fields = [f(model.activity), f(model.person), f(model.status)]
show_change_link = True
[文档]
@readonly_inline
class CourseParticipantInline(admin.TabularInline):
model = CourseParticipant
classes = ['collapse']
ordering = ['-id']
fields = ['course', 'person', 'status']
show_change_link = True
# 后台模型
[文档]
@admin.register(NaturalPerson)
class NaturalPersonAdmin(admin.ModelAdmin):
_m = NaturalPerson
list_display = [
f(_m.person_id),
f(_m.name),
f(_m.identity),
]
search_fields = [f(_m.person_id, User.username), f(_m.name)]
readonly_fields = [f(_m.stu_id_dbonly)]
list_filter = [
f(_m.status), f(_m.identity),
f(_m.wechat_receive_level),
f(_m.stu_grade), f(_m.stu_class),
]
inlines = [PositionInline, ParticipationInline, CourseParticipantInline]
def _show_by_option(self, obj: NaturalPerson | None, option: str, detail: str):
if obj is None or getattr(obj, option):
return option, detail
return option
[文档]
def get_normal_fields(self, request, obj: NaturalPerson = None):
_m = NaturalPerson
fields = []
fields.append((f(_m.person_id), f(_m.stu_id_dbonly)))
fields.append(f(_m.name))
fields.append(self._show_by_option(obj, f(_m.show_nickname), f(_m.nickname)))
fields.append(self._show_by_option(obj, f(_m.show_gender), f(_m.gender)))
fields.extend([
f(_m.identity), f(_m.status),
f(_m.wechat_receive_level),
f(_m.accept_promote), f(_m.active_score),
])
return fields
[文档]
def get_student_fields(self, request, obj: NaturalPerson = None):
_m = NaturalPerson
fields = []
fields.append(f(_m.stu_grade))
fields.append(f(_m.stu_class))
fields.append(self._show_by_option(obj, f(_m.show_major), f(_m.stu_major)))
fields.append(self._show_by_option(obj, f(_m.show_email), f(_m.email)))
fields.append(self._show_by_option(obj, f(_m.show_tel), f(_m.telephone)))
fields.append(self._show_by_option(obj, f(_m.show_dorm), f(_m.stu_dorm)))
fields.append(self._show_by_option(obj, f(_m.show_birthday), f(_m.birthday)))
return fields
# 无论如何都不显示的字段
exclude = [
f(_m.avatar), f(_m.wallpaper), f(_m.QRcode), f(_m.biography),
f(_m.unsubscribe_list),
]
[文档]
def get_fieldsets(self, request, obj=None):
fieldsets = [
(None, {'fields': self.get_normal_fields(request, obj)}),
('学生信息', {'classes': ('collapse',),
'fields': self.get_student_fields(request, obj)}),
]
return fieldsets
[文档]
def view_on_site(self, obj: NaturalPerson):
return obj.get_absolute_url()
actions = [
'set_student', 'set_teacher',
'set_graduate', 'set_ungraduate',
'all_subscribe', 'all_unsubscribe',
]
[文档]
@as_action("设为 学生", update=True)
def set_student(self, request, queryset):
queryset.update(identity=NaturalPerson.Identity.STUDENT)
return self.message_user(request=request,
message='修改成功!')
[文档]
@as_action("设为 老师", update=True)
def set_teacher(self, request, queryset):
queryset.update(identity=NaturalPerson.Identity.TEACHER)
return self.message_user(request=request,
message='修改成功!')
[文档]
@as_action("设为 已毕业", update=True)
def set_graduate(self, request, queryset):
queryset.update(status=NaturalPerson.GraduateStatus.GRADUATED)
return self.message_user(request=request,
message='修改成功!')
[文档]
@as_action("设为 未毕业", update=True)
def set_ungraduate(self, request, queryset):
queryset.update(status=NaturalPerson.GraduateStatus.UNDERGRADUATED)
return self.message_user(request=request,
message='修改成功!')
[文档]
@as_action("设置 全部订阅")
def all_subscribe(self, request, queryset):
for org in queryset:
org.unsubscribe_list.clear()
org.save()
return self.message_user(request=request,
message='修改成功!')
[文档]
@as_action("设置 取消订阅")
def all_unsubscribe(self, request, queryset):
orgs = list(Organization.objects.exclude(
otype__otype_id=0).values_list('id', flat=True))
for person in queryset:
person.unsubscribe_list.set(orgs)
person.save()
return self.message_user(request=request,
message='修改成功!已经取消所有非官方组织的订阅!')
[文档]
@admin.register(Freshman)
class FreshmanAdmin(admin.ModelAdmin):
list_display = [
"sid",
"name",
"place",
"grade",
"status",
]
search_fields = ("sid", "name")
list_filter = ("status", "grade", "place")
[文档]
@admin.register(OrganizationType)
class OrganizationTypeAdmin(admin.ModelAdmin):
list_display = ["otype_id", "otype_name", "incharge", "job_name_list", "control_pos_threshold"]
search_fields = ("otype_name", "otype_id", "incharge__name", "job_name_list")
[文档]
@admin.register(Organization)
class OrganizationAdmin(admin.ModelAdmin):
list_display = ["organization_id", "oname", "otype", "Managers"]
search_fields = ("organization_id__username", "oname", "otype__otype_name")
list_filter = ["otype", "status"]
[文档]
def Managers(self, obj):
display = ''
all_pos = sorted(Position.objects.activated().filter(
org=obj, is_admin=True).values_list(
'pos', flat=True).distinct())
for pos in all_pos:
managers = Position.objects.activated().filter(
org=obj, pos=pos, is_admin=True)
if managers:
display += f'{obj.otype.get_name(pos)}:'
names = managers.values_list('person__name', flat=True)
display += f"<li>{'、'.join(names)}</li>"
if not display:
display = '暂无'
return mark_safe(display)
Managers.short_description = "管理者"
inlines = [PositionInline]
[文档]
def view_on_site(self, obj: Organization):
return obj.get_absolute_url()
actions = ['all_subscribe', 'all_unsubscribe']
[文档]
@as_action("设置 全部订阅")
def all_subscribe(self, request, queryset):
for org in queryset:
org.unsubscribers.clear()
org.save()
return self.message_user(request=request,
message='修改成功!')
[文档]
@as_action("设置 全部不订阅")
def all_unsubscribe(self, request, queryset):
persons = list(NaturalPerson.objects.all().values_list('id', flat=True))
for org in queryset:
org.unsubscribers.set(persons)
org.save()
return self.message_user(request=request,
message='修改成功!')
[文档]
@as_action("激活", actions, update=True)
def set_activate(self, request, queryset):
queryset.update(status=True)
return self.message_user(request, '修改成功!')
[文档]
@as_action("失效", actions, update=True)
def set_disabled(self, request, queryset):
queryset.update(status=False)
return self.message_user(request, '修改成功!')
[文档]
@admin.register(Position)
class PositionAdmin(admin.ModelAdmin):
list_display = ["person", "org", "pos", "pos_name", "year", "semester", "is_admin"]
search_fields = ("person__name", "org__oname", 'org__otype__otype_name')
list_filter = ('year', 'semester','is_admin', 'org__otype', 'pos')
autocomplete_fields = ['person', 'org']
[文档]
def pos_name(self, obj):
return obj.org.otype.get_name(obj.pos)
pos_name.short_description = "职务名称"
actions = ['demote', 'promote', 'to_member', 'to_manager', 'set_admin', 'set_not_admin']
[文档]
@as_action("职务等级 增加(降职)", update=True)
def demote(self, request, queryset):
for pos in queryset:
pos.pos += 1
pos.save()
return self.message_user(request=request,
message='修改成功!')
[文档]
@as_action("设为成员", update=True)
def to_member(self, request, queryset):
for pos in queryset:
pos.pos = pos.org.otype.get_length()
pos.is_admin = False
pos.save()
return self.message_user(request=request,
message='修改成功, 并收回了管理权限!')
[文档]
@as_action("设为负责人", update=True)
def to_manager(self, request, queryset):
for pos in queryset:
pos.pos = 0
pos.is_admin = True
pos.save()
return self.message_user(request=request,
message='修改成功, 并赋予了管理权限!')
[文档]
@as_action("赋予 管理权限", update=True)
def set_admin(self, request, queryset):
queryset.update(is_admin=True)
return self.message_user(request=request,
message='修改成功!')
[文档]
@as_action("收回 管理权限", update=True)
def set_not_admin(self, request, queryset):
queryset.update(is_admin=False)
return self.message_user(request=request,
message='修改成功!')
[文档]
@as_action("延长职务年限", actions, atomic=True)
def refresh(self, request, queryset):
from boot.config import GLOBAL_CONFIG
new = []
for position in queryset:
position: Position
if position.year != GLOBAL_CONFIG.acadamic_year and not Position.objects.filter(
person=position.person, org=position.org,
year=GLOBAL_CONFIG.acadamic_year).exists():
position.year = GLOBAL_CONFIG.acadamic_year
position.pk = None
position.save(force_insert=True)
new.append([position.pk, position.person.get_display_name()])
return self.message_user(request, f'修改成功!新增职务:{new}')
[文档]
@admin.register(Activity)
class ActivityAdmin(admin.ModelAdmin):
list_display = ["title", 'id', "organization_id",
"status", "participant_diaplay",
"publish_time", "start", "end",]
search_fields = ('id', "title", "organization_id__oname",
"current_participants",)
[文档]
class ErrorFilter(admin.SimpleListFilter):
title = '错误状态' # 过滤标题显示为"以 错误状态"
parameter_name = 'wrong_status' # 过滤器使用的过滤字段
[文档]
def lookups(self, request, model_admin):
'''针对字段值设置过滤器的显示效果'''
return (
('all', '全部错误状态'),
('not_waiting', '未进入 等待中 状态'),
('not_processing', '未进入 进行中 状态'),
('not_end', '未进入 已结束 状态'),
('review_end', '已结束的未审核'),
('normal', '正常'),
)
[文档]
def queryset(self, request, queryset):
'''定义过滤器的过滤动作'''
now = datetime.now()
error_id_set = set()
activate_queryset = queryset.exclude(
status__in=[
Activity.Status.REVIEWING,
Activity.Status.CANCELED,
Activity.Status.REJECT,
Activity.Status.ABORT,
])
if self.value() in ['not_waiting', 'all', 'normal']:
error_id_set.update(activate_queryset.exclude(
status=Activity.Status.WAITING).filter(
apply_end__lte=now,
start__gt=now,
).values_list('id', flat=True))
if self.value() in ['not_processing', 'all', 'normal']:
error_id_set.update(activate_queryset.exclude(
status=Activity.Status.PROGRESSING).filter(
start__lte=now,
end__gt=now,
).values_list('id', flat=True))
if self.value() in ['not_end', 'all', 'normal']:
error_id_set.update(activate_queryset.exclude(
status=Activity.Status.END).filter(
end__lte=now,
).values_list('id', flat=True))
if self.value() in ['review_end', 'all', 'normal']:
error_id_set.update(queryset.filter(
status=Activity.Status.REVIEWING,
end__lte=now,
).values_list('id', flat=True))
if self.value() == 'normal':
return queryset.exclude(id__in=error_id_set)
elif self.value() is not None:
return queryset.filter(id__in=error_id_set)
return queryset
list_filter = (
"status",
'year', 'semester', 'category',
"organization_id__otype",
"inner", "need_checkin", "valid",
ErrorFilter,
'endbefore',
"publish_time", 'start', 'end',
)
date_hierarchy = 'start'
[文档]
def participant_diaplay(self, obj):
return f'{obj.current_participants}/{"无限" if obj.capacity == 10000 else obj.capacity}'
participant_diaplay.short_description = "报名情况"
inlines = [ParticipationInline]
actions = []
[文档]
@as_action("更新 报名人数", actions, update=True)
def refresh_count(self, request, queryset: QuerySet[Activity]):
for activity in queryset:
activity.current_participants = sfilter(
Participation.activity, activity).filter(
status__in=[
Participation.AttendStatus.ATTENDED,
Participation.AttendStatus.UNATTENDED,
Participation.AttendStatus.APPLYSUCCESS,
]).count()
activity.save()
return self.message_user(request=request, message='修改成功!')
[文档]
@as_action('设为 普通活动', actions, update=True)
def set_normal_category(self, request, queryset):
queryset.update(category=Activity.ActivityCategory.NORMAL)
return self.message_user(request=request, message='修改成功!')
[文档]
@as_action('设为 课程活动', actions, update=True)
def set_course_category(self, request, queryset):
queryset.update(category=Activity.ActivityCategory.COURSE)
return self.message_user(request=request, message='修改成功!')
def _change_status(self, activity, from_status, to_status):
from app.activity_utils import changeActivityStatus
changeActivityStatus(activity.id, from_status, to_status)
if remove_job(f'activity_{activity.id}_{to_status}'):
return '修改成功, 并移除了定时任务!'
else:
return '修改成功!'
[文档]
@as_action("进入 等待中 状态", actions, single=True)
def to_waiting(self, request, queryset):
_from, _to = Activity.Status.APPLYING, Activity.Status.WAITING
msg = self._change_status(queryset[0], _from, _to)
return self.message_user(request, msg)
[文档]
@as_action("进入 进行中 状态", actions, single=True)
def to_processing(self, request, queryset):
_from, _to = Activity.Status.WAITING, Activity.Status.PROGRESSING
msg = self._change_status(queryset[0], _from, _to)
return self.message_user(request, msg)
[文档]
@as_action("进入 已结束 状态", actions, single=True)
def to_end(self, request, queryset):
_from, _to = Activity.Status.PROGRESSING, Activity.Status.END
msg = self._change_status(queryset[0], _from, _to)
return self.message_user(request, msg)
[文档]
@as_action("取消 定时任务", actions)
def cancel_scheduler(self, request, queryset):
success_list = []
failed_list = []
CANCEL_STATUSES = [
'remind',
Activity.Status.END,
Activity.Status.PROGRESSING,
Activity.Status.WAITING,
]
for activity in queryset:
failed_statuses = []
for status in CANCEL_STATUSES:
if not remove_job(f'activity_{activity.id}_{status}'):
failed_statuses.append(status)
if failed_statuses:
if len(failed_statuses) != len(CANCEL_STATUSES):
failed_list.append(f'{activity.id}: {",".join(failed_statuses)}')
else:
failed_list.append(f'{activity.id}')
else:
success_list.append(f'{activity.id}')
msg = f'成功取消{len(success_list)}项活动的定时任务!' if success_list else '未能完全取消任何任务'
if failed_list:
msg += f'\n{len(failed_list)}项活动取消失败:\n{";".join(failed_list)}'
return self.message_user(request=request, message=msg)
[文档]
@admin.register(Participation)
class ParticipationAdmin(admin.ModelAdmin):
_m = Participation
_act = _m.activity
list_display = ['id', f(_act), f(_m.person), f(_m.status)]
search_fields = ['id', f(_act, 'id'), f(_act, Activity.title),
f(_m.person, NaturalPerson.name)]
list_filter = [
f(_m.status), f(_act, Activity.category),
f(_act, Activity.year), f(_act, Activity.semester),
]
[文档]
@admin.register(Notification)
class NotificationAdmin(admin.ModelAdmin):
list_display = ["id", "receiver", "sender", "title", "start_time"]
search_fields = ('id', "receiver__username", "sender__username", 'title')
list_filter = ('start_time', 'status', 'typename', "finish_time")
actions = [
'set_delete',
'republish',
'republish_bulk_at_promote', 'republish_bulk_at_message',
]
[文档]
@as_action("设置状态为 删除", update=True)
def set_delete(self, request, queryset):
queryset.update(status=Notification.Status.DELETE)
return self.message_user(request=request,
message='修改成功!')
[文档]
@as_action("重发 单个通知")
def republish(self, request, queryset):
if len(queryset) != 1:
return self.message_user(request=request,
message='一次只能重发一个通知!',
level='error')
notification = queryset[0]
from app.extern.wechat import publish_notification, WechatApp
if not publish_notification(
notification,
app=WechatApp.NORMAL,
):
return self.message_user(request=request,
message='发送失败!请检查通知内容!',
level='error')
return self.message_user(request=request,
message='已成功定时,将发送至默认窗口!')
[文档]
def republish_bulk(self, request, queryset, app):
if not request.user.is_superuser:
return self.message_user(request=request,
message='操作失败,没有权限,请联系老师!',
level='warning')
if len(queryset) != 1:
return self.message_user(request=request,
message='一次只能选择一个通知!',
level='error')
bulk_identifier = queryset[0].bulk_identifier
if not bulk_identifier:
return self.message_user(request=request,
message='该通知不存在批次标识!',
level='error')
try:
from app.extern.wechat import publish_notifications
except Exception as e:
return self.message_user(request=request,
message=f'导入失败, 原因: {e}',
level='error')
if not publish_notifications(
filter_kws={'bulk_identifier': bulk_identifier},
app=app,
):
return self.message_user(request=request,
message='发送失败!请检查通知内容!',
level='error')
return self.message_user(request=request,
message=f'已成功定时!标识为{bulk_identifier}')
republish_bulk.short_description = "错误的重发操作"
[文档]
@as_action("重发 所在批次 于 消息窗口")
def republish_bulk_at_message(self, request, queryset):
try:
from app.extern.wechat import WechatApp
app = WechatApp._MESSAGE
except Exception as e:
return self.message_user(request=request,
message=f'导入失败, 原因: {e}',
level='error')
return self.republish_bulk(request, queryset, app)
[文档]
@admin.register(Help)
class HelpAdmin(admin.ModelAdmin):
list_display = ["id", "title"]
[文档]
@admin.register(Wishes)
class WishesAdmin(admin.ModelAdmin):
list_display = ["id", "text", 'time', "background_display"]
list_filter = ('time', 'background')
[文档]
def background_display(self, obj):
return mark_safe(f'<span style="color: {obj.background};"><strong>{obj.background}</strong></span>')
background_display.short_description = "背景颜色"
actions = ['change_color']
[文档]
@as_action("随机设置背景颜色", superuser=False, update=True)
def change_color(self, request, queryset):
for wish in queryset:
wish.background = Wishes.rand_color()
wish.save()
return self.message_user(request=request,
message='修改成功!已经随机设置了背景颜色!')
[文档]
@admin.register(ModifyRecord)
class ModifyRecordAdmin(admin.ModelAdmin):
list_display = ["id", "user", "usertype", "name", 'time']
search_fields = ('id', "user__username", "name")
list_filter = ('time', 'usertype')
actions = ['get_rank']
[文档]
@as_action("查询排名", superuser=False)
def get_rank(self, request, queryset):
if len(queryset) != 1:
return self.message_user(
request=request, message='一次只能查询一个用户的排名!', level='error')
try:
record = queryset[0]
usertype = record.usertype
records = ModifyRecord.objects.filter(
user=record.user, usertype=usertype)
first = records.order_by('time')[0]
rank = ModifyRecord.objects.filter(
usertype=usertype,
time__lte=first.time,
).values('user').distinct().count()
return self.message_user(request=request,
message=f'查询成功: {first.name}的排名为{rank}!')
except Exception as e:
return self.message_user(request=request,
message=f'查询失败: {e}!', level='error')
[文档]
@admin.register(ModifyPosition)
class ModifyPositionAdmin(admin.ModelAdmin):
list_display = ["id", "person", "org", "apply_type", "status"]
search_fields = ("org__oname", "person__name")
list_filter = ("apply_type", 'status', "org__otype", 'time', 'modify_time',)
[文档]
@admin.register(ModifyOrganization)
class ModifyOrganizationAdmin(admin.ModelAdmin):
list_display = ["id", "oname", "otype", "pos", "get_poster_name", "status"]
search_fields = ("id", "oname", "otype__otype_name", "pos__username",)
list_filter = ('status', "otype", 'time', 'modify_time',)
actions = []
ModifyOrganization.get_poster_name.short_description = "申请者"
[文档]
@as_action("同意申请", actions, 'change', update = True)
def approve_requests(self, request, queryset: QuerySet['ModifyOrganization']):
for application in queryset:
accept_modifyorg_submit(application)
self.message_user(request, '操作成功完成!')
[文档]
@admin.register(Course)
class CourseAdmin(admin.ModelAdmin):
list_display = [
"name",
"organization",
"type",
"participant_diaplay",
"status",
]
search_fields = (
"name", "organization__oname",
'classroom', 'teacher',
)
list_filter = ("year", "semester", "type", "status",)
autocomplete_fields = ['organization']
[文档]
class CourseTimeInline(admin.TabularInline):
model = CourseTime
classes = ['collapse']
extra = 1
inlines = [CourseTimeInline, CourseParticipantInline]
[文档]
def participant_diaplay(self, obj):
return f'{obj.current_participants}/{"无限" if obj.capacity == 10000 else obj.capacity}'
participant_diaplay.short_description = "报名情况"
actions = []
[文档]
@as_action("更新课程状态", actions)
def refresh_status(self, request, queryset):
from app.course_utils import register_selection
register_selection()
return self.message_user(request=request,
message='已设置定时任务!')
[文档]
@as_action("更新课程状态 延迟2分钟", actions)
def refresh_status_delay2(self, request, queryset):
from app.course_utils import register_selection
from datetime import timedelta
register_selection(wait_for=timedelta(minutes=2))
return self.message_user(request=request,
message='已设置定时任务!')
[文档]
@admin.register(CourseParticipant)
class CourseParticipantAdmin(admin.ModelAdmin):
list_display = ["course", "person", "status",]
search_fields = ("course__name", "person__name",)
autocomplete_fields = ['course', 'person']
[文档]
@admin.register(CourseRecord)
class CourseRecordAdmin(admin.ModelAdmin):
_m = CourseRecord
list_display = [
_m.get_course_name, f(_m.person),
f(_m.year), f(_m.semester),
f(_m.attend_times), f(_m.total_hours),
f(_m.invalid),
]
search_fields = [
f(_m.course, Course.name), f(_m.extra_name),
f(_m.person, NaturalPerson.name),
f(_m.person, NaturalPerson.person_id, User.username),
]
[文档]
class TypeFilter(admin.SimpleListFilter):
title = '学时类别'
parameter_name = 'type' # 过滤器使用的过滤字段
[文档]
def lookups(self, request, model_admin):
'''针对字段值设置过滤器的显示效果'''
# 自带一个None, '全部'
return (
('null', '无'),
('any', '任意'),
) + tuple(Course.CourseType.choices)
[文档]
def queryset(self, request, queryset):
'''定义过滤器的过滤动作'''
if self.value() == 'null':
return queryset.filter(course__isnull=True)
elif self.value() == 'any':
return queryset.exclude(course__isnull=True)
elif self.value() in map(str, Course.CourseType.values):
return queryset.filter(course__type=self.value())
return queryset
list_filter = [TypeFilter, f(_m.year), f(_m.semester), f(_m.invalid)]
autocomplete_fields = [f(_m.person), f(_m.course)]
actions = []
[文档]
@as_action("设置为 无效学时", actions, update=True)
def set_invalid(self, request, queryset):
queryset.update(invalid=True)
return self.message_user(request=request, message='修改成功!')
[文档]
@as_action("设置为 有效学时", actions, update=True)
def set_valid(self, request, queryset):
queryset.update(invalid=False)
return self.message_user(request=request, message='修改成功!')
[文档]
@admin.register(AcademicTag)
class AcademicTagAdmin(admin.ModelAdmin):
list_display = ["atype", "tag_content"]
search_fields = ("atype", "tag_content")
list_filter = ["atype"]
[文档]
class AcademicEntryAdmin(admin.ModelAdmin):
actions = []
[文档]
@as_action("通过审核", actions, 'change', update=True)
def accept(self, request, queryset):
queryset.filter(status=AcademicEntry.EntryStatus.WAIT_AUDIT
).update(status=AcademicEntry.EntryStatus.PUBLIC)
return self.message_user(request, '修改成功!')
[文档]
@as_action("取消公开", actions, 'change', update=True)
def reject(self, request, queryset):
queryset.filter(status=AcademicEntry.EntryStatus.PUBLIC
).update(status=AcademicEntry.EntryStatus.WAIT_AUDIT)
return self.message_user(request, '修改成功!')
[文档]
@admin.register(AcademicTagEntry)
class AcademicTagEntryAdmin(AcademicEntryAdmin):
list_display = ["person", "status", "tag"]
search_fields = ("person__name", "tag__tag_content")
list_filter = ["tag__atype", "status"]
[文档]
@admin.register(AcademicTextEntry)
class AcademicTextEntryAdmin(AcademicEntryAdmin):
list_display = ["person", "status", "atype", "content"]
search_fields = ("person__name", "content")
list_filter = ["atype", "status"]
[文档]
class PoolItemInline(admin.TabularInline):
model = PoolItem
classes = ['collapse']
ordering = ['-id']
fields = ['pool', 'prize', 'origin_num', 'consumed_num', 'exchange_limit', 'exchange_price']
show_change_link = True
PoolItemInline = readonly_inline(PoolItemInline, can_add=True)
[文档]
@admin.register(Prize)
class PrizeAdmin(admin.ModelAdmin):
autocomplete_fields = ['provider']
inlines = [PoolItemInline]
[文档]
@admin.register(Pool)
class PoolAdmin(admin.ModelAdmin):
inlines = [PoolItemInline]
actions = []
def _do_draw_lots(self, request, queryset: QuerySet['Pool']):
'''对queryset中所有未完成抽奖的抽奖奖池进行奖品分配。
这个函数假定queryset已经被select_for_update锁定,所以可以安全地查找“奖池记录”中与该奖池有关的行。
'''
lottery_pool_ids = list(queryset.filter(type = Pool.Type.LOTTERY).values_list('id', flat = True))
for pool_id in lottery_pool_ids:
pool_title = Pool.objects.get(id = pool_id).title
if PoolRecord.objects.filter(
pool__id = pool_id
).exclude(
status = PoolRecord.Status.LOTTERING
).exists():
self.message_user(request, "奖池【" + pool_title + "】在调用前已完成抽奖", 'warning')
continue
run_lottery(pool_id)
self.message_user(request, "奖池【" + pool_title + "】抽奖已完成")
[文档]
@as_action('立即抽奖', actions, 'change', update = True)
def draw_lots(self, request, queryset: QuerySet['Pool']):
self._do_draw_lots(request, queryset)
[文档]
@as_action('立即停止并抽奖', actions, 'change', update = True)
def stop_and_draw(self, request, queryset: QuerySet['Pool']):
queryset.update(end = datetime.now())
self.message_user(request, "已将选中奖池全部停止")
self._do_draw_lots(request, queryset)
[文档]
@admin.register(PoolRecord)
class PoolRecordAdmin(admin.ModelAdmin):
list_display = ['user_display', 'pool', 'status', 'prize', 'time']
search_fields = ['user__name']
list_filter = [
'status', 'prize', 'time',
('prize__provider', admin.RelatedOnlyFieldListFilter),
]
readonly_fields = ['time']
autocomplete_fields = ['user']
actions = []
[文档]
@as_display('用户')
def user_display(self, obj: PoolRecord):
return obj.user.name
[文档]
def has_manage_permission(self, request: HttpRequest, record: PoolRecord = None) -> bool:
if not request.user.is_authenticated:
return False
if record is not None:
return record.prize.provider == request.user
return Prize.objects.filter(provider=request.user).exists()
# return super().get_queryset(request).filter(prize__provider=request.user).exists()
[文档]
def has_module_permission(self, request: HttpRequest) -> bool:
return super().has_module_permission(request) or self.has_manage_permission(request)
[文档]
def has_view_permission(self, request: HttpRequest, obj: PoolRecord = None) -> bool:
return super().has_view_permission(request, obj) or self.has_manage_permission(request, obj)
[文档]
def get_queryset(self, request: HttpRequest):
qs = super().get_queryset(request)
if not self.has_change_permission(request) and self.has_manage_permission(request):
qs = qs.filter(prize__provider=request.user)
return qs
[文档]
@as_action('兑换', actions, ['change', 'manage'], update=True, single=False) # 修改single为False以允许选择多个记录
def redeem_prize(self, request, queryset):
# 检查权限和记录状态,统计可以兑换的记录数
redeemable_records = []
for record in queryset:
if not (self.has_change_permission(request, record) or self.has_manage_permission(request, record)):
return self.message_user(request, '无权负责一部分选定的礼品兑换!', 'warning')
if record.status != PoolRecord.Status.UN_REDEEM:
return self.message_user(request, f'奖品 {record.prize.name} 已被兑换或不可兑换!', 'warning')
redeemable_records.append(record)
# 如果没有可兑换的记录,提前返回
if not redeemable_records:
return self.message_usesr(request, '没有可兑换的奖品!', 'error')
# 对可以兑换的记录进行兑换处理
for record in redeemable_records:
if record.prize.name.startswith('信用分'):
User.objects.modify_credit(record.user, 1, '元气值:兑换')
record.status = PoolRecord.Status.REDEEMED
record.redeem_time = datetime.now()
record.save()
# 返回成功信息
self.message_user(request, f'成功兑换 {len(redeemable_records)} 个奖品!')
[文档]
@admin.register(HomepageImage)
class HomepageImageAdmin(admin.ModelAdmin):
actions = []
list_display = ['id', 'image', 'description', 'redirect_url', 'activated', 'sort_id']
list_filter = ['activated']
ordering = ['sort_id']
search_fields = ['image', 'description']
[文档]
@as_action("显示所选的图片", actions, ['change'], atomic = True, update = True)
def activate_selected(self, request, queryset: QuerySet[HomepageImage]):
queryset.update(activated = True)
[文档]
@as_action("不显示所选的图片", actions, ['change'], atomic = True, update = True)
def deactivate_selected(self, request, queryset: QuerySet[HomepageImage]):
queryset.update(activated = False)
[文档]
@as_action("所选图片展示顺序+1", actions, ['change'], atomic = True, update = True)
def increment_sort_id(self, request, queryset: QuerySet[HomepageImage]):
queryset.update(sort_id = F('sort_id') + 1)
[文档]
@as_action("所选图片展示顺序-1", actions, ['change'], atomic = True, update = True)
def decrement_sort_id(self, request, queryset: QuerySet[HomepageImage]):
queryset.update(sort_id = F('sort_id') - 1)
admin.site.register(OrganizationTag)
admin.site.register(Comment)
admin.site.register(CommentPhoto)
admin.site.register(PoolItem)
admin.site.register(ActivitySummary)