# Source code for generic.admin

from django.contrib import admin
from django.contrib.auth.admin import UserAdmin as _UserAdmin
from django.contrib.auth.models import Permission
from django.contrib.contenttypes.models import ContentType
from django.db.models import QuerySet

from utils.models.query import svlist
from utils.admin_utils import *
from generic.models import *
from generic.models import to_acronym


# Display text for the admin site: overrides the default title and header
# attributes on the shared `admin.site` instance.
admin.site.site_title = '元培智慧书院管理后台'
admin.site.site_header = '元培智慧书院 - 管理后台'


# Django自带模型
@admin.register(ContentType)
class ContentTypeAdmin(admin.ModelAdmin):
    """Admin for Django's ``ContentType`` model.

    Add, change and delete all share one permission rule, implemented by
    ``_check_invalid``.
    """

    list_display = ['app_label', 'model', 'name']
    list_filter = ['app_label']

    def _check_invalid(self, request, obj: ContentType | None = None):
        """Tell whether ``request.user`` may act on the content type ``obj``.

        Returns ``True`` only when the user is a superuser, an object is
        given, and ``obj.model_class()`` is ``None``; ``False`` otherwise.
        ``self`` is not used, so the function may also be called unbound
        with ``None`` in its place.
        """
        # Same test order as before: superuser flag first, then the object.
        if not request.user.is_superuser or obj is None:
            return False
        model_cls = obj.model_class()
        return model_cls is None

    # Every permission hook applies the same rule.
    has_add_permission = _check_invalid
    has_change_permission = _check_invalid
    has_delete_permission = _check_invalid
@admin.register(Permission)
class PermissionAdmin(admin.ModelAdmin):
    """Admin for Django's ``Permission`` model.

    Adding and changing are always refused. Deletion is allowed only under
    the rule implemented by ``ContentTypeAdmin._check_invalid`` applied to
    the permission's content type.
    """

    list_display = ['name', 'codename']
    list_filter = ['content_type__app_label']

    def _check_failed(self, request, obj=None):
        """Permission hook that always denies."""
        return False

    has_add_permission = _check_failed
    has_change_permission = _check_failed

    def has_delete_permission(self, request, obj: Permission | None = None):
        """Allow deletion only when the content-type rule allows it.

        Returns ``False`` unless the user is a superuser and ``obj`` is given;
        in that case the decision is delegated to
        ``ContentTypeAdmin._check_invalid`` on ``obj.content_type``.
        """
        if request.user.is_superuser and obj is not None:
            # _check_invalid never touches ``self``, so None stands in for it.
            return ContentTypeAdmin._check_invalid(None, request, obj.content_type)
        return False

    # Filled by the ``as_action`` decorator below (defined in
    # utils.admin_utils; presumably it appends the decorated method here).
    actions = []

    @as_action('更新权限名称', actions, update=True)
    def update_name(self, request, queryset: QuerySet[Permission]):
        """Rewrite the tail of each selected permission's name.

        Each name is split on whitespace into at most three parts. The first
        two are kept and the third is replaced by the current
        ``content_type.name``. Only the ``name`` column is saved. Permissions
        are skipped when their content type has no model class or when their
        name has fewer than three parts.
        """
        for perm in queryset:
            content_type: ContentType = perm.content_type
            if content_type.model_class() is None:
                continue
            try:
                # The third part is discarded; it is rebuilt just below.
                prefix, perm_name, _ = perm.name.split(maxsplit=2)
            except ValueError:
                # Fewer than three whitespace-separated parts: leave as is.
                continue
            perm.name = ' '.join([prefix, perm_name, content_type.name])
            perm.save(update_fields=['name'])
        return self.message_user(request, '操作成功')
# 通用模型后台
class UserAdmin(_UserAdmin):
    """Admin for the project's ``User`` model, built on Django's ``UserAdmin``.

    Adds project-specific columns, filters and fieldsets, plus a set of bulk
    actions collected in ``actions`` through the ``as_action`` decorator.
    """

    list_display = [
        'id', 'username', 'name',
        'credit', 'YQpoint', 'utype',
        'is_staff', 'is_superuser',
    ]
    # list_editable = ['credit']
    search_fields = ['id', 'username', 'name', 'pinyin', 'acronym']

    @classmethod
    def suggest_search_fields(cls, user_field: str = 'user'):
        """Build search-field lookups for a model that points at a user.

        Every entry of ``search_fields`` except the first is prefixed with
        ``<user_field>__`` and returned as a new list.
        """
        related_names = cls.search_fields[1:]
        return [f'{user_field}__{name}' for name in related_names]

    list_filter = [
        'utype', 'is_superuser', 'is_staff',
        'groups', 'active', 'is_active',
    ]
    fieldsets = [
        (None, {'fields': ('username', 'name', 'acronym', 'pinyin', 'password')}),
        ('自定义信息', {'fields': ['credit', 'YQpoint', 'utype', 'is_newuser', 'active']}),
        # Built-in fields below
        ('权限', {
            'fields': ('is_active', 'is_staff', 'is_superuser', 'groups', 'user_permissions'),
            'classes': ['collapse'],
        }),
        ('内置信息', {
            'fields': ('first_name', 'last_name', 'email'),
            'classes': ['collapse'],
        }),
        ('日期', {
            'fields': ('last_login', 'date_joined'),
            'classes': ['collapse'],
        }),
    ]

    # Must exist before the decorated methods below, which receive this list.
    actions = []

    @as_action('同步用户类型', actions, atomic=True)
    def sync_user_type(self, request, queryset):
        """Set ``utype`` from the linked Organization / NaturalPerson rows.

        The selected ``queryset`` is ignored; the update covers all users
        whose id is listed by ``svlist`` for the respective model.
        """
        from app.models import NaturalPerson, Organization

        # Users owned by an organization become ORG.
        org_ids = svlist(Organization.organization_id)
        org_accounts = User.objects.filter(id__in=org_ids)
        org_accounts.select_for_update().update(utype=User.Type.ORG)

        # Users owned by a person become PERSON unless already a person type.
        person_ids = svlist(NaturalPerson.person_id)
        person_accounts = User.objects.filter(id__in=person_ids)
        person_accounts = person_accounts.exclude(utype__in=User.Type.Persons())
        person_accounts.select_for_update().update(utype=User.Type.PERSON)

        return self.message_user(request, '操作成功')

    @as_action('同步用户名称', actions, atomic=True)
    def sync_user_name(self, request, queryset):
        """Copy display names from Organization / NaturalPerson onto users.

        The selected ``queryset`` is ignored. Organizations are processed
        first, then natural persons; each group is written with one
        ``bulk_update`` on the ``name`` field.
        """
        from app.models import NaturalPerson, Organization

        org_ids = svlist(Organization.organization_id)
        org_users = (
            User.objects.filter(id__in=org_ids)
            .select_for_update()
            .select_related('organization')
        )
        for account in org_users:
            account.name = account.organization.get_display_name()
        User.objects.bulk_update(org_users, ['name'])

        person_ids = svlist(NaturalPerson.person_id)
        person_users = (
            User.objects.filter(id__in=person_ids)
            .select_for_update()
            .select_related('naturalperson')
        )
        for account in person_users:
            account.name = account.naturalperson.get_display_name()
        User.objects.bulk_update(person_users, ['name'])

        return self.message_user(request, '操作成功')

    def _update_acronym(self, queryset: QuerySet[User]):
        """Recompute ``acronym`` from ``name`` for every user in ``queryset``."""
        # NOTE: the same queryset object is looped over and then handed to
        # bulk_update, which relies on it yielding the instances mutated here.
        for account in queryset:
            account.acronym = to_acronym(account.name)
        User.objects.bulk_update(queryset, ['acronym'])

    @as_action('更新全部缩写', actions, atomic=True)
    def renew_acronym(self, request, queryset):
        """Recompute the acronym of every user, ignoring the selection."""
        every_user = User.objects.select_for_update()
        self._update_acronym(every_user)
        return self.message_user(request, '更新全部缩写成功!')

    @as_action('更新名称缩写', actions, update=True)
    def renew_pinyin(self, request, queryset):
        """Recompute the acronym of the selected users only."""
        self._update_acronym(queryset)
        return self.message_user(request, '更新名称缩写成功!')

    @as_action('重置信用分', actions, atomic=True)
    def refresh_credit(self, request, queryset):
        """Call ``bulk_recover_credit`` on the selection with ``User.MAX_CREDIT``."""
        amount = User.MAX_CREDIT
        User.objects.bulk_recover_credit(queryset, amount, '用户:重置')
        return self.message_user(request, '操作成功!')

    @as_action('恢复信用分 1分', actions, atomic=True)
    def recover_credit(self, request, queryset):
        """Call ``bulk_recover_credit`` on the selection with an amount of 1."""
        User.objects.bulk_recover_credit(queryset, 1, '用户:恢复')
        return self.message_user(request, '操作成功!')

    @as_action('全体恢复信用分 1分', actions, atomic=True)
    def recover(self, request, queryset):
        """Call ``bulk_recover_credit`` on all users with an amount of 1."""
        everyone = User.objects.all()
        User.objects.bulk_recover_credit(everyone, 1, '用户:全体恢复')
        return self.message_user(request, '操作成功!')
# Attach UserAdmin as the admin class for User on the default admin site.
admin.site.register(User, admin_class=UserAdmin)
@admin.register(PermissionBlacklist)
class PermissionBlacklistAdmin(admin.ModelAdmin):
    """Admin for ``PermissionBlacklist`` entries (a user / permission pair)."""

    list_display = ['user', 'permission']
    # Search by the related user's fields, then by permission name.
    search_fields = UserAdmin.suggest_search_fields() + ['permission__name']
@admin.register(CreditRecord)
class CreditRecordAdmin(admin.ModelAdmin):
    """Admin for ``CreditRecord`` rows, browsable by date through ``time``."""

    list_display = ['user', 'source', 'delta', 'overflow', 'time']
    list_filter = [
        'time',
        'source',
        'overflow',
        'old_credit',
        'new_credit',
        # Filter built by get_sign_filter (utils.admin_utils) on ``delta``.
        get_sign_filter('delta', '变化类型'),
    ]
    # Search by the related user's fields, then by the record source.
    search_fields = UserAdmin.suggest_search_fields() + ['source']
    date_hierarchy = 'time'
@admin.register(YQPointRecord)
class YQPointRecordAdmin(admin.ModelAdmin):
    """Admin for ``YQPointRecord`` rows, browsable by date through ``time``."""

    list_display = ['user', 'source', 'delta', 'time']
    list_filter = [
        'time',
        'source_type',
        # Filter built by get_sign_filter on ``delta`` with custom labels
        # for the '+' and '-' choices.
        get_sign_filter(
            'delta',
            '变化类型',
            choices=(('+', '收入'), ('-', '支出')),
        ),
    ]
    # Search by the related user's fields, then by the record source.
    search_fields = UserAdmin.suggest_search_fields() + ['source']
    date_hierarchy = 'time'