import random
from typing import List, Dict, Optional, Tuple
from datetime import datetime, timedelta, date
from django.db.models import QuerySet, Q, Exists, OuterRef
from django.forms.models import model_to_dict
from generic.models import User, YQPointRecord
from app.config import CONFIG
from app.utils_dependency import *
from app.models import (
Pool,
PoolItem,
PoolRecord,
Notification,
Organization,
Participation,
)
from app.extern.wechat import WechatApp, WechatMessageLevel
from app.notification_utils import bulk_notification_create, notification_create
from achievement.api import unlock_signin_achievements
__all__ = [
'add_signin_point',
'get_pools_and_items',
'buy_exchange_item',
'buy_lottery_pool',
'buy_random_pool',
'run_lottery',
'get_income_expenditure',
]
DAY2POINT = CONFIG.yqpoint.signin_points
MAX_CHECK_DAYS = len(DAY2POINT)
def get_signin_infos(user: User, detailed_days: int = MAX_CHECK_DAYS,
check_days: int = None, today: date = None,
signin_today: bool = True):
'''
获取一定日期内每天的签到信息
:param user: 要查询的用户
:type user: User
:param detailed_days: 显示详细签到信息的天数, defaults to None
:type detailed_days: int, optional
:param check_days: 查询天数(包括今天), defaults to None
:type check_days: int, optional
:param today: 查询的当天, defaults to None
:type today: date, optional
:param signin_today: 计算连续签到天数时认为今天已签到, defaults to True
:type signin_today: bool, optional
:return: 已连续签到天数,和今天起共detailed_days天的签到信息
:rtype: tuple[int, list[bool] | None]
'''
if today is None:
today = datetime.now().date()
day_check_kws = {}
if check_days is not None:
day_check_kws.update(time__date__gt=today - timedelta(days=check_days))
signin_days = set(YQPointRecord.objects.filter(
user=user,
source_type=YQPointRecord.SourceType.CHECK_IN,
**day_check_kws,
).order_by('time').values_list('time__date', flat=True).distinct())
# 获取连续签到天数
last_day = today
if signin_today:
last_day -= timedelta(days=1)
while last_day in signin_days:
last_day -= timedelta(days=1)
continuous_days = (today - last_day).days - 1
if signin_today:
continuous_days += 1
if detailed_days is not None:
# 从今天开始,第前n天是否签到(今天不计入本次签到)
# 可用来提供提示信息
detailed_infos = [
(today - timedelta(days=day)) in signin_days
for day in range(detailed_days)
]
else:
detailed_infos = None
return continuous_days, detailed_infos
def distribution2point(distribution: list, day_type: int) -> int:
'''根据获取积分分布和当日类别,获取应获得的实际元气值'''
result = distribution[day_type]
if isinstance(result, (tuple, list)) and len(result) == 2:
result = random.randint(*result)
return result
[文档]
def add_signin_point(user: User):
'''
用户获得今日签到的积分,并返回用户提示信息
:param user: 签到的用户
:type user: User
:return: 本次签到获得的积分,以及应看到的提示(若为空则显示默认提示)
:rtype: tuple[int, str]
'''
# 获取已连续签到的日期和近几天签到信息
continuous_days, signed_in = get_signin_infos(
user, MAX_CHECK_DAYS, signin_today=True)
day_type = (continuous_days - 1) % MAX_CHECK_DAYS
# 连续签到的基础元气值,可以从文件中读取,此类写法便于分析
add_point = distribution2point(DAY2POINT, day_type)
User.objects.modify_YQPoint(user, add_point, "每日登录",
YQPointRecord.SourceType.CHECK_IN)
# 元气值活动等获得的额外元气值
bonus_point = 0
if bonus_point:
User.objects.modify_YQPoint(user, bonus_point, "登录额外奖励",
YQPointRecord.SourceType.CHECK_IN)
# 顺便进行解锁成就检验
unlock_signin_achievements(user, continuous_days)
# 用户应看到的信息
user_display = [
f'今日首次签到,获得{add_point}元气值!',
f'连续签到{continuous_days}天,获得{add_point}元气值!',
f'连续签到{continuous_days}天,获得{add_point}元气值,连续签到{7}天有惊喜!',
f'连续签到{continuous_days}天,获得{add_point}元气值!',
f'连续签到{continuous_days}天,再签到{2}天即可获得大量元气值!',
f'连续签到{continuous_days}天,获得{add_point}元气值,明日可获得大量元气值!',
f'第7日签到,获得{add_point}元气值!',
][day_type]
# 获取的额外元气值可能需要提示
if bonus_point:
pass
total_point = add_point + bonus_point
return total_point, user_display
[文档]
def get_pools_and_items(pool_type: Pool.Type, user: User, frontend_dict: Dict[str, any]):
"""
获取某一种类的所有当前开放的pool的前端所需信息。如果用户未参加奖池关联的活动,这个奖池的信息不会被返回。
:param pool_type: pool种类
:type pool_type: Pool.Type
:param user: 当前用户
:type user: User
:param frontend_dict: 前端字典
:type frontend_dict: Dict[str, any]
"""
assert hasattr(user, 'naturalperson'), "非个人用户发起了奖池兑换请求"
pools = Pool.objects.filter(
Q(type=pool_type) & Q(start__lte=datetime.now())
& (Q(end__isnull=True) | Q(end__gte=datetime.now() - timedelta(days=1)))
& (Q(activity__isnull=True) | Exists(
Participation.objects.filter(
activity = OuterRef('activity'),
person = user.naturalperson,
status = Participation.AttendStatus.ATTENDED,
)
))
)
pools_info = []
# 此列表中含有若干dict,每个dict对应一个待展示的pool,例如:
# {
# "title": "xxx", "type": "兑换/抽奖/盲盒",
# "entry_time": 1, # 对于盲盒/抽奖奖池,一个用户最多能买几次
# "ticket_price": 1, # 盲盒/抽奖奖池价格
# "start": "2022-9-4", "end": "2022-9-5", # end可能为空
# "redeem_start": "2022-9-10", "redeem_end": "2022-9-20", # 指线下获取奖品实物的时间,均可能为空
#
# "status": 0/1, # 0表示进行中的奖池,1表示结束一天内的抽奖奖池
# "items": [], # 含有若干dict,每个dict代表该奖池中的一个poolitem
# # key包括"id", "origin_num", "consumed_num", "exchange_price",
# # "exchange_limit", "is_big_prize", "is_empty",
# # "prize__name", "prize__more_info", "prize__stock",
# # "prize__reference_price", "prize__image", "prize__id",
# # 以及origin_num-consumed_num得到的remain_num
# # 如果是兑换类奖池,还有my_exchange_time,即当前用户兑换过该item多少次
# "my_entry_time": 0, # 当前用户进过抽奖/盲盒奖池多少次
# "records_num": 0, # 抽奖/盲盒奖池总共被买了多少次
# "capacity": 0, # 盲盒奖池最多能被买多少次(即包括谢谢参与在内的所有poolitem的数量和)
# "results": { # 已结束的抽奖奖池有这一项,表示抽奖结果,
# # 其中包含"big_prize_results"和"normal_prize_results"两个列表
# # 每个列表中又是若干词典,每个词典表示一种奖品的获奖情况(这些词典按奖品参考价格的降序排列),
# # 其key包括prize_name、prize_image和winners,其中winners是NaturalPerson.name的list,即这种奖品的获奖者列表
# "big_prize_results": [
# {"prize_name": "大奖1", "prize_image": "imageurl", "winners": ["张三", "李四"]},
# {"prize_name": "大奖2", "prize_image": "imageurl", "winners": ["Alice"]},
# ],
# "normal_prize_results": [
# {"prize_name": "奖品1", "prize_image": "imageurl", "winners": ["王五"]},
# {"prize_name": "奖品2", "prize_image": "imageurl", "winners": ["Alice", "Bob"]},
# ]
# }
# }
for pool in pools:
this_pool_info = model_to_dict(pool)
if pool.start <= datetime.now() and (pool.end is None or pool.end >= datetime.now()):
this_pool_info["status"] = 0
else:
this_pool_info["status"] = 1
this_pool_info["capacity"] = pool.get_capacity()
this_pool_items = list(pool.items.filter(prize__isnull=False).values(
"id", "origin_num", "consumed_num", "exchange_price",
"exchange_limit", "is_big_prize",
"prize__name", "prize__more_info", "prize__stock",
"prize__reference_price", "prize__image", "prize__id", "exchange_attributes",
))
for item in this_pool_items:
item["remain_num"] = item["origin_num"] - item["consumed_num"]
this_pool_info["items"] = sorted(
this_pool_items, key=lambda x: -x["remain_num"]) # 按剩余数量降序排序,已卖完的在最后
if pool_type != Pool.Type.EXCHANGE:
this_pool_info["my_entry_time"] = PoolRecord.objects.filter(
user=user, pool=pool).count()
this_pool_info["records_num"] = PoolRecord.objects.filter(
pool=pool).count()
if pool_type == Pool.Type.RANDOM:
for item in this_pool_items:
# 此处显示的是抽奖概率,目前使用原始的占比
percent = (100 * item["origin_num"] /
this_pool_info["capacity"])
if percent == int(percent):
percent = int(percent)
elif round(percent, 1) != 0:
# 保留最低精度
percent = round(percent, 1)
item["probability"] = percent
# LOTTERY类的pool不需要capacity
else:
for item in this_pool_items:
item["my_exchange_time"] = PoolRecord.objects.filter(
user=user, pool=pool, prize=item["prize__id"]).count()
# EXCHANGE类的pool不需要capcity和records_num和my_entry_time
if this_pool_info["status"] == 1: # 如果是刚结束的抽奖,需要填充results
big_prize_items = PoolItem.objects.filter(
pool=pool, is_big_prize=True).order_by("-prize__reference_price")
normal_prize_items = PoolItem.objects.filter(
pool=pool, is_big_prize=False).order_by("-prize__reference_price")
big_prizes_and_winners = []
normal_prizes_and_winners = []
for big_prize_item in big_prize_items:
big_prizes_and_winners.append(
{"prize_name": big_prize_item.prize.name, "prize_image": big_prize_item.prize.image})
winner_names = list(PoolRecord.objects.filter(
pool=pool, prize=big_prize_item.prize).values_list(
"user__name", flat=True)) # TODO: 需要distinct()吗?
big_prizes_and_winners[-1]["winners"] = winner_names
for normal_prize_item in normal_prize_items:
if normal_prize_item.is_empty:
continue
normal_prizes_and_winners.append(
{"prize_name": normal_prize_item.prize.name, "prize_image": normal_prize_item.prize.image})
winner_names = list(PoolRecord.objects.filter(
pool=pool, prize=normal_prize_item.prize).values_list(
"user__name", flat=True)) # TODO: 需要distinct()吗?
normal_prizes_and_winners[-1]["winners"] = winner_names
this_pool_info["results"] = {}
this_pool_info["results"]["big_prize_results"] = big_prizes_and_winners
this_pool_info["results"]["normal_prize_results"] = normal_prizes_and_winners
pools_info.append(this_pool_info)
frontend_dict["pools_info"] = pools_info
def check_user_pool(user: User, pool: Pool) -> None | str:
"""
检查用户 user 是否已经毕业,以及是否参加了 pool 所关联的活动;当前是否在奖池的运行时间内。
:param user: 当前用户
:type user: User
:param pool: 待购买的物品所属奖池
:type pool: Pool
:return: 如果检查通过,返回None。否则,返回一个可以作为 wrong() 函数参数的字符串。
:rtype: None | str
"""
# 检查奖池运行时间
if pool.start > datetime.now():
return '当前奖池时间未开始!'
if pool.end is not None and pool.end < datetime.now():
return '当前奖池时间已结束!'
# 检查用户是否已经毕业
if not user.active:
return '您已毕业!'
# 检查用户是否参加了相关的活动
if pool.activity is not None:
assert hasattr(user, 'naturalperson'), "非个人用户发起了奖池兑换请求"
participates = Participation.objects.filter(
activity = pool.activity,
person = user.naturalperson,
status = Participation.AttendStatus.ATTENDED,
)
if not participates.exists():
return '该奖池为"' + str(pool.activity) + '"活动限定奖池,请先参加再来购买!'
return None
[文档]
def buy_exchange_item(
user: User, poolitem_id: str,
attributes: dict[str, str] = {}) -> MESSAGECONTEXT:
"""
购买兑换奖池的某个奖品
:param user: 当前用户
:type user: User
:param poolitem_id: 待购买的奖池奖品id,因为是前端传过来的所以是str
:type poolitem_id: str
:return: 表明购买结果的warn_code和warn_message
:rtype: MESSAGECONTEXT
"""
# 检查奖品是否可以购买
try:
poolitem_id = int(poolitem_id)
poolitem = PoolItem.objects.get(
id=poolitem_id, pool__type=Pool.Type.EXCHANGE)
except:
return wrong('奖品不存在!')
if poolitem.origin_num - poolitem.consumed_num <= 0:
return wrong('奖品已售罄!')
other_errors = check_user_pool(user, poolitem.pool)
if other_errors:
return wrong(other_errors)
my_exchanged_time = PoolRecord.objects.filter(
user=user, pool=poolitem.pool, prize=poolitem.prize).count()
if my_exchanged_time >= poolitem.exchange_limit:
return wrong('您兑换该奖品的次数已达上限!')
for exchange_attribute in poolitem.exchange_attributes:
if exchange_attribute['name'] not in attributes:
return wrong('请填写完整的兑换信息!')
if attributes[exchange_attribute['name']] not in exchange_attribute['range']:
return wrong('兑换信息填写错误!')
try:
with transaction.atomic():
poolitem = PoolItem.objects.select_for_update().get(
id=poolitem_id, pool__type=Pool.Type.EXCHANGE)
assert poolitem.pool.start <= datetime.now(), "兑换时间未开始!"
assert poolitem.pool.end is None or poolitem.pool.end >= datetime.now(), "兑换时间已结束!"
assert poolitem.origin_num - poolitem.consumed_num > 0, "奖品已售罄!"
my_exchanged_time = PoolRecord.objects.filter(
user=user, pool=poolitem.pool, prize=poolitem.prize).count()
assert my_exchanged_time < poolitem.exchange_limit, '您兑换该奖品的次数已达上限!'
assert user.YQpoint >= poolitem.exchange_price, '您的元气值不足,兑换失败!'
# 更新奖品状态
poolitem.consumed_num += 1
poolitem.save()
# 创建兑换记录
PoolRecord.objects.create(
user=user,
pool=poolitem.pool,
prize=poolitem.prize,
attributes=attributes,
status=PoolRecord.Status.UN_REDEEM,
)
# 扣除元气值
User.objects.modify_YQPoint(
user,
-poolitem.exchange_price,
source=f'兑换奖池:{poolitem.pool.title}-{poolitem.prize.name}',
source_type=YQPointRecord.SourceType.CONSUMPTION
)
except AssertionError as e:
return wrong(str(e))
return succeed('兑换成功!')
[文档]
def buy_lottery_pool(user: User, pool_id: str) -> MESSAGECONTEXT:
"""
购买抽奖奖池
:param user: 当前用户
:type user: User
:param pool_id: 待购买的奖池id,因为是前端传过来的所以是str
:type pool_id: str
:return: 表明购买结果的warn_code和warn_message
:rtype: MESSAGECONTEXT
"""
# 检查抽奖奖池状态
try:
pool_id = int(pool_id)
pool = Pool.objects.get(id=pool_id, type=Pool.Type.LOTTERY)
except:
return wrong('抽奖不存在!')
my_entry_time = PoolRecord.objects.filter(pool=pool, user=user).count()
if my_entry_time >= pool.entry_time:
return wrong('您在本奖池中抽奖的次数已达上限!')
other_errors = check_user_pool(user, pool)
if other_errors:
return wrong(other_errors)
try:
with transaction.atomic():
pool = Pool.objects.select_for_update().get(id=pool_id, type=Pool.Type.LOTTERY)
assert pool.start <= datetime.now(), '抽奖未开始!'
assert pool.end is None or pool.end >= datetime.now(), '抽奖已结束!'
my_entry_time = PoolRecord.objects.filter(
pool=pool, user=user).count()
assert my_entry_time < pool.entry_time, '您在本奖池中抽奖的次数已达上限!'
assert user.YQpoint >= pool.ticket_price, '您的元气值不足,兑换失败!'
# 创建抽奖记录
PoolRecord.objects.create(
user=user,
pool=pool,
status=PoolRecord.Status.LOTTERING,
)
# 扣除元气值
User.objects.modify_YQPoint(
user,
-pool.ticket_price,
source=f'抽奖奖池:{pool.title}',
source_type=YQPointRecord.SourceType.CONSUMPTION
)
except AssertionError as e:
return wrong(str(e))
return succeed('成功进行一次抽奖!您可以在抽奖时间结束后查看抽奖结果~')
def select_random_prize(poolitems: QuerySet[PoolItem], select_num: Optional[int] = None) -> List[int]:
"""
实现无放回随机抽取select_num个PoolItem(的id),初始时每种PoolItem有origin_num-consumed_num个
:param poolitems: 待抽取的PoolItem构成的QuerySet(每个元素表示一种PoolItem而非一个)
:type poolitems: QuerySet[PoolItem]
:param select_num: 抽几个,若为None则抽取所有奖品,也即对poolitems做一次shuffle, defaults to None
:type select_num: Optional[int], optional
:return: 抽出的poolitem的id组成的list,长度等于select_num
:rtype: List[int]
"""
assert poolitems.count() > 0
num_all_items = 0 # 奖品的总数
item_dict = {} # int: PoolItem,实现把一个自然数区间映射到一种奖品
for item in poolitems:
if item.origin_num - item.consumed_num <= 0:
continue
item_dict[num_all_items] = item
num_all_items += item.origin_num - item.consumed_num
if select_num is None: # 不给出select_num就默认抽取所有奖品,也即对poolitems做一次shuffle
select_num = num_all_items
assert select_num <= num_all_items
selected_idx = random.sample(
range(num_all_items), select_num) # 选出select_num个序号
selected_items_id = []
for idx in selected_idx:
for key in sorted(item_dict.keys(), reverse=True):
if idx >= key: # 寻找idx落入的区间
selected_items_id.append(
item_dict[key].id) # 把idx映射为PoolItem.id
break
return selected_items_id
[文档]
def buy_random_pool(user: User, pool_id: str) -> Tuple[MESSAGECONTEXT, int, int]:
"""
购买盲盒
:param user: 当前用户
:type user: User
:param pool_id: 待购买的奖池id,因为是前端传过来的所以是str
:type pool_id: str
:return: 表明购买结果的warn_code和warn_message;买到的prize的id(如果购买失败就是-1);
表明盲盒结果的一个int:2表示无反应、1表示开出空盒、0表示开出奖品
:rtype: Tuple[MESSAGECONTEXT, int, int]
"""
# 检查盲盒奖池状态
try:
pool_id = int(pool_id)
pool = Pool.objects.get(id=pool_id, type=Pool.Type.RANDOM)
except:
return wrong('盲盒不存在!'), -1, 2
my_entry_time = PoolRecord.objects.filter(pool=pool, user=user).count()
if my_entry_time >= pool.entry_time:
return wrong('您兑换这款盲盒的次数已达上限!'), -1, 2
total_entry_time = PoolRecord.objects.filter(pool=pool).count()
capacity = pool.get_capacity()
if capacity <= total_entry_time:
return wrong('盲盒已售罄!'), -1, 2
other_errors = check_user_pool(user, pool)
if other_errors:
return wrong(other_errors), -1, 2
try:
with transaction.atomic():
pool = Pool.objects.select_for_update().get(id=pool_id, type=Pool.Type.RANDOM)
assert pool.start <= datetime.now(), '盲盒兑换时间未开始!'
assert pool.end is None or pool.end >= datetime.now(), '盲盒兑换时间已结束!'
my_entry_time = PoolRecord.objects.filter(
pool=pool, user=user).count()
assert my_entry_time < pool.entry_time, '您兑换这款盲盒的次数已达上限!'
assert user.YQpoint >= pool.ticket_price, '您的元气值不足,兑换失败!'
total_entry_time = PoolRecord.objects.filter(pool=pool).count()
capacity = pool.get_capacity()
assert capacity > total_entry_time, '盲盒已售罄!'
# 开盒,修改poolitem记录,创建poolrecord记录
items = pool.items.select_for_update().all()
real_item_id = select_random_prize(items, 1)[0]
modify_item: PoolItem = PoolItem.objects.select_for_update().get(id=real_item_id)
modify_item.consumed_num += 1
modify_item.save()
if modify_item.is_empty: # 如果是空盲盒,没法兑奖,record的状态记为NOT_LUCKY
item_status = PoolRecord.Status.NOT_LUCKY
else:
item_status = PoolRecord.Status.UN_REDEEM
PoolRecord.objects.create(
user=user,
pool=pool,
status=item_status,
prize=modify_item.prize,
)
# 扣除元气值
User.objects.modify_YQPoint(
user,
-pool.ticket_price,
source=f'盲盒奖池:{pool.title}',
source_type=YQPointRecord.SourceType.CONSUMPTION
)
# 如果抽到了空盒子,按照设定值对用户给予元气值补偿并返回相应的提示
if modify_item.is_empty:
compensate_YQPoint = random.randint(
pool.empty_YQPoint_compensation_lowerbound, pool.empty_YQPoint_compensation_upperbound)
if compensate_YQPoint == 0:
return succeed(f'兑换盲盒成功!您抽到了空盒子,但是很遗憾这次没有元气值补偿QAQ'), -1, 1
User.objects.modify_YQPoint(
user,
compensate_YQPoint,
source=f'盲盒奖池:{pool.title}空盒子补偿',
source_type=YQPointRecord.SourceType.COMPENSATION
)
return succeed(f'兑换盲盒成功!您抽到了空盒子,获得{compensate_YQPoint}点元气值补偿!'), -1, 1
if modify_item.prize is None:
return succeed('兑换盲盒成功!'), -1, 1
return succeed('兑换盲盒成功!'), modify_item.prize.id, int(modify_item.is_empty)
except AssertionError as e:
return wrong(str(e)), -1, 2
[文档]
@transaction.atomic
def run_lottery(pool_id: int):
"""
抽奖;更新PoolRecord表和PoolItem表;给所有参与者发送通知
:param pool_id: 待抽取的抽奖奖池id
:type pool_id: int
"""
# 部分参考了course_utils.py的draw_lots函数
pool = Pool.objects.select_for_update().get(id=pool_id, type=Pool.Type.LOTTERY)
assert not PoolRecord.objects.filter( # 此时pool关联的所有records都应该是LOTTERING
pool=pool).exclude(status=PoolRecord.Status.LOTTERING).exists()
related_records = PoolRecord.objects.filter(
pool=pool, status=PoolRecord.Status.LOTTERING)
records_num = related_records.count()
if records_num == 0:
return
# 抽奖
record_ids_and_participant_ids = list(
related_records.values("id", "user__id"))
items = pool.items.all()
user2prize_names = {d["user__id"]: []
for d in record_ids_and_participant_ids} # 便于发通知
winner_record_id2item_id = {} # poolrecord.id: poolitem.id,便于更新poolrecord
loser_record_ids = [] # poolrecord.id,便于更新poolrecord
num_all_items = 0 # 该奖池中奖品总数
for item in items:
num_all_items += item.origin_num - item.consumed_num
if num_all_items >= records_num: # 抽奖记录数少于或等于奖品数,人人有奖,给每个记录分配一个随机奖品
shuffled_items = select_random_prize(
items, records_num) # 随机选出待发放的奖品
for i in range(records_num): # 遍历所有记录,每个记录都有奖品
user2prize_names[record_ids_and_participant_ids[i]["user__id"]].append(
items.get(id=shuffled_items[i]).prize.name
)
winner_record_id2item_id[record_ids_and_participant_ids[i]
["id"]] = shuffled_items[i]
else: # 抽奖记录数多于奖品数,给每个奖品分配一个中奖者
for item in items: # 遍历所有奖品,每个奖品都会送给一个记录
for i in range(item.origin_num - item.consumed_num):
winner_record_index = random.randint(
0, len(record_ids_and_participant_ids) - 1)
user2prize_names[record_ids_and_participant_ids[winner_record_index]["user__id"]].append(
item.prize.name)
winner_record_id2item_id[record_ids_and_participant_ids[winner_record_index]["id"]] = item.id
# 因为记录多,奖品少,这里肯定不会pop成空列表
record_ids_and_participant_ids.pop(winner_record_index)
# pop剩下的就是没中奖的那些记录
loser_record_ids = [d["id"]
for d in record_ids_and_participant_ids]
# 更新数据库
for winner_record_id, poolitem_id in winner_record_id2item_id.items():
record = PoolRecord.objects.select_for_update().get(id=winner_record_id)
item = PoolItem.objects.select_for_update().get(id=poolitem_id)
record.status = PoolRecord.Status.UN_REDEEM
record.prize = item.prize
record.time = datetime.now()
item.consumed_num += 1
record.save()
item.save()
for loser_record_id in loser_record_ids:
record = PoolRecord.objects.select_for_update().get(id=loser_record_id)
record.status = PoolRecord.Status.NOT_LUCKY
record.time = datetime.now()
record.save()
# 给中奖的同学发送通知
sender = Organization.objects.get(
oname=CONFIG.yqpoint.org_name).get_user()
for user_id in user2prize_names.keys():
receiver = User.objects.get(id=user_id)
typename = Notification.Type.NEEDREAD
title = Notification.Title.LOTTERY_INFORM
content = f"恭喜您在奖池【{pool.title}】中抽中奖品"
for prize_name in user2prize_names[user_id]:
content += f"【{prize_name}】" # 可能出现重复,即一种奖品中了好几次,不过感觉问题也不太大
notification_create(
receiver=receiver,
sender=sender,
typename=typename,
title=title,
content=content,
# URL=f'', # TODO: 我的奖品页面?
to_wechat=dict(app=WechatApp.TO_PARTICIPANT,
level=WechatMessageLevel.IMPORTANT),
)
# 给没中奖的同学发送通知
receivers = PoolRecord.objects.filter(
id__in=loser_record_ids,
).values_list("user", flat=True)
receivers = User.objects.filter(id__in=receivers)
content = f"很抱歉通知您,您在奖池【{pool.title}】中没有中奖"
if len(receivers) > 0:
bulk_notification_create(
receivers=receivers,
sender=sender,
typename=typename,
title=title,
content=content,
# URL=f'', # TODO: 我的奖品页面?
to_wechat=dict(app=WechatApp.TO_PARTICIPANT,
level=WechatMessageLevel.IMPORTANT),
)
[文档]
def get_income_expenditure(
user: User, start_time: datetime, end_time: datetime
) -> tuple[int, int]:
'''获取用户一段时间内收支情况
Args:
user(Usesr): 要查询的用户
start_time(datetime): 开始时间
end_time(datetime): 结束时间
Returns:
tuple[int, int]: 收入, 支出
'''
# 根据user选出YQPointRecord
records = YQPointRecord.objects.filter(
user=user, time__gte=start_time, time__lte=end_time)
# 统计时期内收支情况
income = 0
expenditure = 0
for record in records:
if record.delta >= 0:
income += record.delta
else:
expenditure += abs(record.delta)
return income, expenditure