Я попробовал предложенное решение, но оно не работает по многим причинам, включая конфликты метаклассов с graphene.ObjectType, поэтому я написал собственное решение, которое работает довольно хорошо:
Вы передаёте подклассу ObjectType список ваших ORM-моделей (в моём случае — SQLAlchemy), и он автоматически создаёт схему. Единственное, что остаётся сделать, — добавить специальную обработку, если вам нужны дополнительные параметры фильтрации для каких-либо полей.
# Base query type that builds its fields from SQLAlchemy models listed in the
# subclass ``Meta`` (``interfaces``, ``models``, ``excluded_models``).
#
# NOTE(review): documented with comments rather than a class docstring on
# purpose -- graphene appears to use class docstrings as schema descriptions,
# so adding one could change the generated schema; confirm before converting.
class SQLAlchemyAutoSchemaFactory(graphene.ObjectType):
    @staticmethod
    def set_fields_and_attrs(klazz, node_model, field_dict):
        """Register the single-node and connection fields for ``node_model``.

        Two entries are produced, keyed by the snake_case form of the node
        class name: ``<name>`` (built with ``node_model.Field()``) and
        ``all_<plural name>`` (a ``FilteredConnectionField``). Each entry is
        stored in ``field_dict`` and also set as an attribute on ``klazz``;
        the mapping and the attribute receive separate field instances.

        Args:
            klazz: Class that receives the generated attributes.
            node_model: Node class exposing ``__name__`` and ``Field()``.
            field_dict: Mutable mapping updated in place.
        """
        _name = camel_to_snake(node_model.__name__)
        connection_name = f'all_{(s2p(_name))}'

        field_dict[connection_name] = FilteredConnectionField(node_model)
        field_dict[_name] = node_model.Field()
        setattr(klazz, _name, node_model.Field())
        setattr(klazz, connection_name, FilteredConnectionField(node_model))

    @classmethod
    def __init_subclass_with_meta__(
        cls,
        interfaces=(),
        models=(),
        excluded_models=(),
        default_resolver=None,
        _meta=None,
        **options
    ):
        """Generate query fields for every interface and model in ``Meta``.

        Args:
            interfaces: Interface classes; those that subclass
                ``SQLAlchemyInterface`` get their own fields through
                ``set_fields_and_attrs``.
            models: SQLAlchemy model classes to expose.
            excluded_models: Models to drop from ``models`` before any type
                is generated (every occurrence is removed).
            default_resolver: Forwarded unchanged to the parent hook.
            _meta: Existing options object; an ``ObjectTypeOptions`` is
                created when it is missing.
            **options: Remaining ``Meta`` options, forwarded to the parent.
        """
        if not _meta:
            _meta = ObjectTypeOptions(cls)

        fields = OrderedDict()

        for interface in interfaces:
            if issubclass(interface, SQLAlchemyInterface):
                SQLAlchemyAutoSchemaFactory.set_fields_and_attrs(cls, interface, fields)

        # Remove *every* occurrence of an excluded model; slicing out only the
        # first match would let a duplicate entry slip through.
        models = tuple(model for model in models if model not in excluded_models)

        possible_types = ()
        for model in models:
            model_name = model.__name__
            _model_name = camel_to_snake(model_name)
            connection_name = "all_{}".format(s2p(_model_name))

            # Skip models whose field names are already present on the class
            # (for example those registered for an interface above).
            if hasattr(cls, _model_name) or hasattr(cls, connection_name):
                continue

            # Use the first interface whose backing model is a base of this
            # model; fall back to CustomNode when none matches.
            for iface in interfaces:
                if issubclass(model, iface._meta.model):
                    model_interface = (iface,)
                    break
            else:
                model_interface = (CustomNode,)

            _node_class = type(
                model_name,
                (SQLAlchemyObjectType,),
                {"Meta": {"model": model, "interfaces": model_interface, "only_fields": []}},
            )

            fields[connection_name] = FilteredConnectionField(_node_class)
            setattr(cls, connection_name, FilteredConnectionField(_node_class))
            fields[_model_name] = CustomNode.Field(_node_class)
            setattr(cls, _model_name, CustomNode.Field(_node_class))
            possible_types += (_node_class,)

        if _meta.fields:
            _meta.fields.update(fields)
        else:
            _meta.fields = fields
        _meta.schema_types = possible_types

        super(SQLAlchemyAutoSchemaFactory, cls).__init_subclass_with_meta__(
            _meta=_meta, default_resolver=default_resolver, **options
        )

    @classmethod
    def resolve_with_filters(cls, info: ResolveInfo, model: Type[SQLAlchemyObjectType], **kwargs):
        """Return ``model``'s query narrowed by the given keyword filters.

        Every keyword whose name matches an attribute of the underlying
        SQLAlchemy model adds an equality filter; other keywords are ignored.

        Args:
            info: Resolve info passed on to ``get_query``.
            model: Object type whose ``get_query`` and ``_meta.model`` are used.
            **kwargs: Candidate filters, keyed by attribute name.

        Returns:
            The filtered query object.
        """
        query = model.get_query(info)
        for filter_name, filter_value in kwargs.items():
            model_filter_column = getattr(model._meta.model, filter_name, None)
            # Compare against None explicitly: SQL expression objects can
            # raise TypeError when evaluated in a boolean context.
            if model_filter_column is None:
                continue
            if isinstance(filter_value, SQLAlchemyInputObjectType):
                # NOTE(review): nested input objects are matched against a
                # query on the input's own model -- presumably a sub-select;
                # confirm against FilteredConnectionField.get_query.
                filter_model = filter_value.sqla_model
                q = FilteredConnectionField.get_query(filter_model, info, sort=None, **kwargs)
                # noinspection PyArgumentList
                query = query.filter(model_filter_column == q.filter_by(**filter_value))
            else:
                query = query.filter(model_filter_column == filter_value)
        return query
Запрос (Query) создаётся следующим образом:
# Example root query: the factory base class generates the fields from the
# Meta options below.
class Query(SQLAlchemyAutoSchemaFactory):
    class Meta:
        # Interfaces to expose.
        interfaces = (Interface1, Interface2)
        # Every model that should receive a generated type.
        models = (
            *entities_for_iface1,
            *entities_for_iface2,
            *other_entities,
        )
        # Models removed from ``models`` before generation.
        excluded_models = (
            base_model_for_iface1,
            base_model_for_iface2,
        )
Интерфейс создаётся так:
# Example interface bound to a SQLAlchemy model through its Meta options.
class Interface1(SQLAlchemyInterface):
    class Meta:
        # SQLAlchemy model backing this interface.
        model = Iface1Model
        # Name option passed along with the other Meta options.
        name = 'Iface1Node'
А вот сам SQLAlchemyInterface:
# Node-based interface whose fields are derived from a SQLAlchemy model given
# in the subclass ``Meta``. Global ids are passed through unchanged (see
# ``to_global_id`` / ``from_global_id``).
#
# NOTE(review): documented with comments rather than a class docstring on
# purpose -- graphene appears to use class docstrings as schema descriptions;
# confirm before converting.
class SQLAlchemyInterface(Node):
    @classmethod
    def __init_subclass_with_meta__(
        cls,
        model=None,
        registry=None,
        only_fields=(),
        exclude_fields=(),
        connection_field_factory=default_connection_field_factory,
        _meta=None,
        **options
    ):
        """Build the interface's fields and connection type from ``model``.

        Args:
            model: Mapped SQLAlchemy class backing the interface (required).
            registry: ``Registry`` to use; the global registry when omitted.
            only_fields: Passed to ``construct_fields`` as ``only_fields``.
            exclude_fields: Passed to ``construct_fields`` as
                ``exclude_fields``, extended with the result of
                ``exclude_autogenerated_sqla_columns``.
            connection_field_factory: Passed to ``construct_fields``.
            _meta: Existing options object; a ``SQLAlchemyInterfaceOptions``
                is created when it is missing.
            **options: Remaining ``Meta`` options, forwarded to the parent.

        Raises:
            AssertionError: If ``model`` is not a mapped class, ``registry``
                is not a ``Registry``, or the created connection type is not
                a ``Connection`` subclass.
        """
        # Validate inputs before anything touches ``model`` so an invalid
        # value fails with the explicit message below.
        assert is_mapped_class(model), (
            "You need to pass a valid SQLAlchemy Model in " '{}.Meta, received "{}".'
        ).format(cls.__name__, model)

        if not registry:
            registry = get_global_registry()
        assert isinstance(registry, Registry), (
            "The attribute registry in {} needs to be an instance of "
            'Registry, received "{}".'
        ).format(cls.__name__, registry)

        # Honour an options object supplied by the caller instead of always
        # replacing it.
        if not _meta:
            _meta = SQLAlchemyInterfaceOptions(cls)
        _meta.name = f'{cls.__name__}Node'

        # Build a fresh tuple: avoids mutating a caller-supplied list and
        # works whether the helper returns a list or a tuple.
        autoexclude_columns = exclude_autogenerated_sqla_columns(model=model)
        exclude_fields = tuple(exclude_fields) + tuple(autoexclude_columns)

        sqla_fields = yank_fields_from_attrs(
            construct_fields(
                model=model,
                registry=registry,
                only_fields=only_fields,
                exclude_fields=exclude_fields,
                connection_field_factory=connection_field_factory,
            ),
            _as=Field,
        )

        _meta.model = model
        _meta.registry = registry

        connection = Connection.create_type(
            "{}Connection".format(cls.__name__), node=cls)
        assert issubclass(connection, Connection), (
            "The connection must be a Connection. Received {}"
        ).format(connection.__name__)
        _meta.connection = connection

        if _meta.fields:
            _meta.fields.update(sqla_fields)
        else:
            _meta.fields = sqla_fields

        super(SQLAlchemyInterface, cls).__init_subclass_with_meta__(_meta=_meta, **options)

    @classmethod
    def Field(cls, *args, **kwargs):  # noqa: N802
        """Return a ``NodeField`` for this interface."""
        return NodeField(cls, *args, **kwargs)

    @classmethod
    def node_resolver(cls, only_type, root, info, id):
        """Resolve a node by delegating to ``get_node_from_global_id``."""
        return cls.get_node_from_global_id(info, id, only_type=only_type)

    @classmethod
    def get_node_from_global_id(cls, info, global_id, only_type=None):
        """Load the row of this interface's model identified by ``global_id``.

        The session is read from ``info.context`` under the ``'session'``
        key. ``only_type`` is accepted but not used here.

        Returns:
            The result of ``one_or_none``, or ``None`` if the lookup raised.
        """
        try:
            node: DeclarativeMeta = one_or_none(
                session=info.context.get('session'),
                model=cls._meta.model,
                id=global_id,
            )
            return node
        except Exception:
            # Best-effort lookup kept from the original: any failure is
            # reported to the caller as "no node".
            return None

    @classmethod
    def from_global_id(cls, global_id):
        """Return ``global_id`` unchanged."""
        return global_id

    @classmethod
    def to_global_id(cls, type, id):
        """Return ``id`` unchanged; ``type`` is ignored."""
        return id

    @classmethod
    def resolve_type(cls, instance, info):
        """Return the graphene type to use for ``instance``.

        Raises:
            ValueError: If ``instance`` is not a ``graphene.ObjectType`` and
                the global registry has no type for its class.
        """
        if isinstance(instance, graphene.ObjectType):
            return type(instance)

        graphene_model = get_global_registry().get_type_for_model(type(instance))
        if not graphene_model:
            raise ValueError(f'{instance} must be a SQLAlchemy model or graphene.ObjectType')
        return graphene_model