'''查询引用字段
- 获取查询字段的函数,且同时支持字段描述符、字段实例以及字段名称。
- 通过标记字段为特殊关联字段,可以获取特殊关联字段的查询名称。
- 进行单条件查询的函数,使用首个字段所在模型的默认管理器进行查询。
- 辅助多条件查询的函数,追踪关系并查询字段。
在本模块中,有 s 前缀的函数均为单字段转化函数或单条件查询函数,m 前缀的为多条件查询函数。
在以往的代码中,我们经常会看到这样的代码::
class AModel(Model):
fkey_name = models.ForeignKey(...)
class BModel(Model):
fkey2_name = models.ForeignKey(A_Model)
当跨越关系查询时,我们需要写很长的代码,这样不仅不美观,而且不利于代码的追踪修改,例如::
instance = BModel.objects.filter(fkey2_name__fkey_name__field_name=...)
values = BModel.objects.values('fkey2_name__fkey_name__field_name')
如需修改`fkey2_name`字段的名称,则无法追踪以上代码,因此修改起来非常困难,且容易出错。
这是因为我们使用字符串来引用字段来引用字段。如果我们通过字段本身来引用,就可以避免这个问题。
Example:
使用`f`和`q`类型函数来获取字段的查询名称和条件,并追踪引用::
instance = BModel.objects.filter(sq([BModel.fkey2_name, AModel.fkey_name, 'field_name'], ...))
values = BModel.objects.values(f(BModel.fkey2_name, AModel.fkey_name, 'field_name'))
对于**名称固定**的最终非关系字段,使用关键字参数查询往往更方便::
instance = queryset.get(mq(BModel.fkey2_name, AModel.fkey_name, field_name=...))
在默认管理器上进行查询时,可以直接使用查询函数::
instance = mget(BModel.fkey2_name, AModel.fkey_name, field_name=...)
names = svlist(BModel.fkey2_name, AModel.fkey_name, field_name)
blogs = mfilter(Blog.author, Author.user, score=0, active=True)
Warning:
从非抽象父类继承的字段仍存储于父类中,作为查询函数首个参数时,会使用父类的默认管理器进行查询。
本模块的函数在模型定义时无法使用,因为字段描述符在模型类创建后才会被创建,但你可以在任何方法中使用。
Note:
本模块不提供与更新相关的函数,因为更新不会跨越关系,且应以管理器要求的安全方式进行。
尽管多条件查询支持关键字参数,你仍应避免在条件中包含`__`以阻碍`field=`型字段追踪。
抽象模型的字段在子类被继承时被复制,因此可以在子类中使用本模块的函数。
'''
from typing import cast, Any, TypeAlias, TypeGuard, TypeVar
from django.db.models import Field, Q, QuerySet, Model
from django.db.models.query_utils import DeferredAttribute
from django.db.models.fields.files import FileDescriptor
from django.db.models.fields.related import RelatedField, ForeignObjectRel
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ReverseManyToOneDescriptor,
ReverseOneToOneDescriptor,
ManyToManyDescriptor,
ForeignKeyDeferredAttribute,
)
from django.db.models.constants import LOOKUP_SEP
__all__ = [
'f', 'q', 'sq', 'mq',
'Index', 'Forward', 'Reverse',
'sget', 'sfilter', 'sexclude', 'svalues', 'svlist',
'qsvlist',
'mget', 'mfilter', 'mexclude',
]
NormalFieldDescriptor: TypeAlias = DeferredAttribute | FileDescriptor
NormalFieldLike: TypeAlias = Field | NormalFieldDescriptor
ForwardDescriptor: TypeAlias = ForwardManyToOneDescriptor | ForwardOneToOneDescriptor
ReverseDescriptor: TypeAlias = ReverseManyToOneDescriptor | ReverseOneToOneDescriptor
ForeignIndexDescriptor: TypeAlias = ForeignKeyDeferredAttribute
RelatedDescriptor: TypeAlias = (ForwardDescriptor | ReverseDescriptor
| ForeignIndexDescriptor | ManyToManyDescriptor)
RelatedFieldLike: TypeAlias = RelatedField | RelatedDescriptor
FieldLike: TypeAlias = NormalFieldLike | RelatedFieldLike
class SpecialRelation:
'''特殊关联字段
用于标记字段为特殊关联字段,转化查询时使用`fieldlike`属性。
'''
def __init__(self, fieldlike: RelatedFieldLike) -> None:
self.fieldlike = fieldlike
[文档]
class Index(SpecialRelation):
'''索引字段
标记字段为索引字段,查询时转化为`%field_name%_id`。
'''
pass
[文档]
class Forward(SpecialRelation):
'''正向关系字段
标记字段为正向关系字段,查询时转化为`%field_name%`。
'''
pass
[文档]
class Reverse(SpecialRelation):
'''反向关系字段
标记字段为反向关系字段,查询时转化为`%related_query_name%`。
'''
pass
IndexLike: TypeAlias = ForeignIndexDescriptor | Index
RelationLike: TypeAlias = RelatedFieldLike | SpecialRelation
FieldLikeObject: TypeAlias = FieldLike | SpecialRelation
FieldLikeExpr: TypeAlias = FieldLikeObject | str
def _is_relation(field: FieldLikeObject) -> TypeGuard[RelationLike]:
'''判断字段是否为关系字段相关属性'''
if isinstance(field, SpecialRelation):
field = field.fieldlike
if isinstance(field, Field):
return field.is_relation
if isinstance(field, RelatedDescriptor):
return True
return False
def _is_foreign_index(field: RelationLike) -> TypeGuard[IndexLike]:
'''判断字段是否为外键索引字段'''
if isinstance(field, Index):
return True
return isinstance(field, ForeignIndexDescriptor)
def _is_forward_relation(field: RelationLike) -> bool:
'''判断字段是否为正向关系字段'''
if isinstance(field, ManyToManyDescriptor):
return not field.reverse
if isinstance(field, Forward):
return True
return isinstance(field, ForwardDescriptor | RelatedField)
def _is_reverse_relation(field: RelationLike) -> bool:
'''判断字段是否为反向关系字段'''
if isinstance(field, ManyToManyDescriptor):
return field.reverse
if isinstance(field, Reverse):
return True
return isinstance(field, ReverseDescriptor)
def _get_normal_field(field: NormalFieldLike) -> Field:
'''获取普通字段,同样可用于关联字段,但不适用于关联字段描述符'''
if isinstance(field, NormalFieldDescriptor):
field = field.field
if not isinstance(field, Field):
raise TypeError(f'{type(field)} is not a normal field')
return field
def _get_related_field(field: RelationLike) -> RelatedField:
'''获取关系字段'''
if isinstance(field, SpecialRelation):
field = field.fieldlike
if isinstance(field, RelatedField):
return field
if isinstance(field, ForeignIndexDescriptor):
return cast(RelatedField, field.field)
if isinstance(field, ForwardDescriptor):
return cast(RelatedField, field.field)
if isinstance(field, ManyToManyDescriptor):
return field.field
if isinstance(field, ReverseOneToOneDescriptor):
return field.related.field
if isinstance(field, ReverseManyToOneDescriptor):
return cast(RelatedField, field.field)
raise TypeError(f'{type(field)} is not a related field')
def _normal_name(field: NormalFieldLike) -> str:
'''获取普通字段的查询名称'''
# 普通字段的`name`属性代表字段在查询时使用的名称
# 见`Field.get_filter_kwargs_for_object`
return _get_normal_field(field).name
def _foreign_index_name(field: RelationLike) -> str:
'''获取外键索引字段的查询名称'''
# `attname`属性代表`ForeignKey`对应数据库字段的名称,即`%field_name%_id`
return _get_related_field(field).attname
def _forward_name(field: RelationLike) -> str:
'''获取正向关系字段的查询名称'''
# 关联字段的`name`属性代表模型字段的名称
return _get_related_field(field).name
def _get_reverse_relation(related_field: RelatedField) -> ForeignObjectRel:
'''获取反向关系字段
Raises:
ValueError: 如果反向关系字段不可用
'''
rel = cast(ForeignObjectRel, related_field.remote_field)
if rel.is_hidden():
raise ValueError(f'Cannot reverse a hidden relation: {related_field}')
return rel
def _reverse_name(field: RelationLike) -> str:
'''获取反向关系字段的查询名称'''
# 反向关系字段的`name`属性代表模型字段的名称
field = _get_related_field(field)
_get_reverse_relation(field)
return field.related_query_name()
def _to_field_name(field: FieldLikeExpr) -> str:
'''获取字段的查询名称'''
if isinstance(field, str):
return field
if _is_relation(field):
if _is_foreign_index(field):
return _foreign_index_name(field)
if _is_forward_relation(field):
return _forward_name(field)
if _is_reverse_relation(field):
return _reverse_name(field)
elif isinstance(field, NormalFieldLike):
return _normal_name(field)
raise TypeError(f'Unsupported type: {type(field)} for field')
[文档]
def f(*fields: FieldLikeExpr) -> str:
'''获取连续字段的查询名称'''
return LOOKUP_SEP.join(_to_field_name(field) for field in fields)
[文档]
def q(*fields: FieldLikeExpr, value: Any) -> Q:
'''获取连续字段的查询Q对象'''
return Q(**{f(*fields): value})
def _concat_query_key(prefix: str, query_key: str) -> str:
'''拼接查询字段,转化特殊的关键字查询'''
if not prefix:
return query_key
if query_key.lower() == 'in':
query_key = 'in' # in 是 Python 的关键字,无法作为关键字参数传递
return prefix + LOOKUP_SEP + query_key
[文档]
def mq(*fields: FieldLikeExpr, **querys: Any) -> Q:
'''获取包含某字段多个查询条件的Q对象
Args:
*fields: 代表字段的完整路径,可以包含连续字段
**querys: 查询条件,键为查询类型(见Note部分),值为查询值
Returns:
Q: 将每个查询条件`key: value`转为`q(*fields, key, value=value)`的`Q`条件之交
Example:
>>> mq('user', 'id', lt=1, gt=0, isnull=False)
Q(user__id__lt=1, user__id__gt=0, user__id__isnull=False)
>>> mq('user', 'id', IN=[1, 2, 3])
Q(user__id__in=[1, 2, 3])
Note:
尽量避免在查询条件中包含`__`,这严重妨碍了`field=`型字段追踪。
Python 的关键字`in`无法作为关键字参数传递,可以使用其任何大小写形式代替。
'''
prefix = f(*fields)
return Q(**{_concat_query_key(prefix, key): value for key, value in querys.items()})
T = TypeVar('T')
ListLike: TypeAlias = list[T] | tuple[T, ...]
Extend: TypeAlias = T | ListLike[T]
def _as_seq(value: Extend[T]) -> ListLike[T]:
'''将参数转化成类似列表的序列形式'''
if not isinstance(value, (list, tuple)):
value = [value]
return value
def _first(field: Extend[T]) -> T:
'''获取字段的第一个元素'''
fields = _as_seq(field)
if not fields:
raise TypeError('Empty fields')
return fields[0]
[文档]
def sq(field: Extend[FieldLikeExpr], value: Any) -> Q:
'''获取单个查询条件的Q对象,可以包含连续字段'''
return q(*_as_seq(field), value=value)
def _get_queryset(field: Extend[FieldLikeObject]) -> QuerySet[Model]:
'''获取字段的查询集'''
field = _first(field)
if _is_relation(field):
if _is_reverse_relation(field):
rel = _get_reverse_relation(_get_related_field(field))
return cast(type[Model], rel.model)._default_manager.all()
field = _get_related_field(field)
else:
field = _get_normal_field(field) # type: ignore
return field.model._default_manager.all()
def _ext(fields: Extend[FieldLikeObject]) -> Extend[Any]:
return fields
[文档]
def sget(field: Extend[FieldLikeObject], value: Any) -> Any:
'''单条件获取模型实例,见`QuerySet.get`'''
return _get_queryset(field).get(sq(_ext(field), value))
[文档]
def mget(field: FieldLikeObject, *extras: FieldLikeExpr, **querys: Any) -> Any:
'''多条件获取模型实例,见`QuerySet.get`'''
return _get_queryset(field).get(mq(field, *extras, **querys))
[文档]
def sfilter(field: Extend[FieldLikeObject], value: Any) -> QuerySet:
'''单条件过滤查询集,见`QuerySet.filter`'''
return _get_queryset(field).filter(sq(_ext(field), value))
[文档]
def mfilter(field: FieldLikeObject, *extras: FieldLikeExpr, **querys: Any) -> QuerySet:
'''多条件过滤查询集,见`QuerySet.filter`'''
return _get_queryset(field).filter(mq(field, *extras, **querys))
[文档]
def sexclude(field: Extend[FieldLikeObject], value: Any) -> QuerySet:
'''单条件排除查询集,见`QuerySet.exclude`'''
return _get_queryset(field).exclude(sq(_ext(field), value))
[文档]
def mexclude(field: FieldLikeObject, *extras: FieldLikeExpr, **querys: Any) -> QuerySet:
'''多条件排除查询集,见`QuerySet.exclude`,查询条件为交集'''
return _get_queryset(field).exclude(mq(field, *extras, **querys))
[文档]
def svalues(field: FieldLikeObject, *extras: FieldLikeExpr):
'''单条件查询字段值,见`QuerySet.values`'''
return _get_queryset(field).values(f(field, *extras))
[文档]
def qsvlist(queryset: QuerySet, field: FieldLikeExpr, *extras: FieldLikeExpr) -> list[Any]:
'''单条件查询字段值,立即计算并转为列表,见`QuerySet.values_list`'''
return list(queryset.values_list(f(field, *extras), flat=True))
[文档]
def svlist(field: FieldLikeObject, *extras: FieldLikeExpr) -> list[Any]:
'''单条件查询字段值,立即计算并转为列表,见`QuerySet.values_list`'''
return qsvlist(_get_queryset(field), field, *extras)