app.management.commands.merge_duplicate_positions 源代码
from django.core.management.base import BaseCommand
from django.db import transaction
from app.models import NaturalPerson, Organization, Position
from collections import defaultdict
[文档]
class Command(BaseCommand):
help = 'Merge duplicate Position objects by keeping the one with minimum position value'
[文档]
def add_arguments(self, parser):
parser.add_argument(
'--dry-run',
action='store_true',
help='Show what would be done without actually making changes',
)
[文档]
def handle(self, *args, **options):
dry_run = options['dry_run']
if dry_run:
self.stdout.write(self.style.WARNING(
'DRY RUN MODE - No changes will be made'))
if not dry_run:
# Wrap the entire operation in a single transaction to avoid race conditions
with transaction.atomic():
self._process_duplicates(dry_run)
else:
# For dry run, we don't need transaction protection
self._process_duplicates(dry_run)
def _process_duplicates(self, dry_run):
"""Process duplicate positions within a transaction"""
# Find duplicates based on (person, org, semester, year)
duplicates = self.find_duplicates()
if not duplicates:
self.stdout.write(self.style.SUCCESS(
'No duplicate Position objects found.'))
return
self.stdout.write(
f'Found {len(duplicates)} groups of duplicate Position objects.')
total_merged = 0
total_deleted = 0
for group_key, positions in duplicates.items():
if len(positions) <= 1:
continue
# Format group key for user-friendly display
person_id, org_id, semester, year = group_key
person = NaturalPerson.objects.get(id=person_id)
org = Organization.objects.get(id=org_id)
group_display = f"{person} in {org} ({year} {semester})"
self.stdout.write(f'\nProcessing group: {group_display}')
self.stdout.write(f' Found {len(positions)} duplicate positions')
# Sort by position value to find the minimum
positions_sorted = sorted(positions, key=lambda p: p.pos)
# Check if any of the positions to be deleted are admin positions
admin_positions_to_delete = [
pos for pos in positions_sorted[1:] if pos.is_admin]
if admin_positions_to_delete:
self.stdout.write(
self.style.ERROR(
f' ERROR: Found {len(admin_positions_to_delete)} admin positions to delete!'
)
)
self.stdout.write(
self.style.ERROR(
f' Rolling back entire transaction to avoid deleting admin positions.'
)
)
raise ValueError(
f'Cannot delete admin positions for {group_display}. '
f'Found {len(admin_positions_to_delete)} admin positions that would be deleted.'
)
keep_position = positions_sorted[0]
delete_positions = positions_sorted[1:]
self.stdout.write(
f' Keeping position ID {keep_position.id} with pos={keep_position.pos}'
)
self.stdout.write(
f' Deleting {len(delete_positions)} duplicate positions'
)
if not dry_run:
try:
# Delete the duplicate positions
for pos in delete_positions:
pos.delete()
total_deleted += 1
total_merged += 1
except Exception as e:
self.stdout.write(
self.style.ERROR(
f' Error processing group {group_key}: {e}')
)
# Re-raise to rollback the entire transaction
raise
else:
total_deleted += len(delete_positions)
total_merged += 1
if dry_run:
self.stdout.write(
self.style.SUCCESS(
f'\nDRY RUN SUMMARY: Would merge {total_merged} groups and delete {total_deleted} duplicate positions'
)
)
else:
self.stdout.write(
self.style.SUCCESS(
f'\nSUCCESS: Merged {total_merged} groups and deleted {total_deleted} duplicate positions'
)
)
[文档]
def find_duplicates(self) -> dict[tuple[int, int, str, int], list[Position]]:
"""Find groups of duplicate Position objects based on (person, org, semester, year)
Returns a dictionary that maps (person_id, org_id, semester, year) to a list of Position objects.
"""
duplicates = defaultdict(list)
# Get all positions and group them by the unique constraint fields
positions = Position.objects.all()
for position in positions:
group_key = (
position.person_id,
position.org_id,
position.semester,
position.year
)
duplicates[group_key].append(position)
# Filter to only groups with more than one position
return {k: v for k, v in duplicates.items() if len(v) > 1}