Я бы хотел автоматически сгенерировать схему графен-питон для списка моделей SQLAlchemy.
Я посмотрел вокруг и нашел похожую проблему, используя модели Django здесь: Графен-Python: автоматическое создание схемы из модели Django
он предлагает метакласс, который выглядит следующим образом:
class AutoSchemaMeta(type):
def __new__(meta, clsname, superclasses, attributedict):
new_class = type(clsname, superclasses, attributedict)
for app_name in new_class.app_models.split(","):
app_models = apps.get_app_config(app_name.strip()).get_models()
for model in app_models:
model_name = model._meta.model_name
_model_name = c2u(model_name)
if hasattr(new_class, _model_name):
continue
_node_class = type("{}Node".format(model_name.title()),
(SQLAlchemyObjectType,),
{"Meta": {"model": model, "interfaces": (Node,), "filter_fields": []}})
# register(_node_class) not needed, done by SQLAlchemyObjectType
setattr(new_class, "all_{}".format(s2p(_model_name)), FilteredConnectionField(_node_class))
setattr(new_class, _model_name, Node.Field(_node_class))
print(new_class.__dict__)
return new_class
class Query(metaclass=AutoSchemaMeta):
app_models = 'app1,app2'
Я собираюсь сделать то же самое, что просто смущен тем, что он делает в этих 4 строках кода:
for app_name in new_class.app_models.split(","):
app_models = apps.get_app_config(app_name.strip()).get_models()
for model in app_models:
model_name = model._meta.model_name
Может ли кто-нибудь помочь подсказать, как использовать этот подход, учитывая список файлов / каталогов, содержащих модели SQLAlchemy?
пример orm dir / models:
main.orm.models:
model1.py:
class Model1(Base):
__tablename__ = 'model_one'
id = Column(UUID)
model2.py:
class Model2(Base):
___tablename__ = 'model_two'
id = Column(UUID)
автоматическая генерация следующих типов:
import graphene
from main.orm.models import Model1 as Model1Model, Model2 as Model2Model
class Model1Node(SQLalchemyObjectType):
class Meta:
model = Model1Model
interfaces = (Node,)
class Model2Node(SQLalchemyObjectType):
class Meta:
model = Model2Model
interfaces = (Node,)
class Query(graphene.ObjectType):
model1_node = Node.Field(Model1Node)
all_model1_nodes = FilteredConnectionField(Model1Node)
model2_node = Node.Field(Model2Node)
all_model2_nodes = FilteredConnectionField(Model2Node)
включая FilteredConnectionField, если это полезно
class FilteredConnectionField(SQLAlchemyConnectionField):
def __init__(self, type_, *args, **kwargs):
model = type_._meta.model
kwargs.setdefault("filter", create_filter_argument(model))
super(FilteredConnectionField, self).__init__(type_, *args, **kwargs)
@classmethod
def get_query(cls, model, info: ResolveInfo, filter=None, sort=None, group_by=None, order_by=None, **kwargs):
query = super().get_query(model, info, sort=None, **kwargs)
# columns = inspect(model).columns.values()
from core.main.graphene.util.sqla_types import SQLAlchemyInputObjectType
for filter_name, filter_value in kwargs.items():
model_filter_column = getattr(model, filter_name, None)
if not model_filter_column:
continue
if isinstance(filter_value, SQLAlchemyInputObjectType):
filter_model = filter_value.sqla_model
q = super().get_query(filter_model, info, sort=None, **kwargs)
# noinspection PyArgumentList
query.filter(model_filter_column == q.filter_by(**filter_value).one())
if filter:
for filter_name, filter_value in filter.items():
query = filter_query(query, model, filter_name, filter_value)
return query
@classmethod
def resolve_connection(cls, connection_type, model, info, args, resolved):
filters = args.get("filter", {})
field = getattr(info.schema._query, camel_to_snake(info.field_name))
if field and hasattr(field, 'required') and field.required:
required_filters = [rf.key for rf in field.required]
if required_filters:
missing_filters = set(required_filters) - set(filters.keys())
if missing_filters:
raise Exception(missing_filters)
return super(FilteredConnectionField, cls).resolve_connection(
connection_type, model, info, args, resolved)