Ограничение SQL-запроса определенными полями / столбцами в Graphene-SQLAlchemy - PullRequest
0 голосов
/ 24 мая 2018

Этот вопрос был опубликован как выпуск GH под https://github.com/graphql-python/graphene-sqlalchemy/issues/134, но я решил опубликовать его здесь, чтобы подключиться к SO.

Полная рабочая демонстрацияможно найти под https://github.com/somada141/demo-graphql-sqlalchemy-falcon.

Рассмотрим следующий класс SQLAlchemy ORM:

class Author(Base, OrmBaseMixin):
    __tablename__ = "authors"

    author_id = sqlalchemy.Column(
        sqlalchemy.types.Integer(),
        primary_key=True,
    )

    name_first = sqlalchemy.Column(
        sqlalchemy.types.Unicode(length=80),
        nullable=False,
    )

    name_last = sqlalchemy.Column(
        sqlalchemy.types.Unicode(length=80),
        nullable=False,
    )

Просто заключенный в SQLAlchemyObjectType как таковой:

class TypeAuthor(SQLAlchemyObjectType):
    class Meta:
        model = Author

и предоставляется через:

author = graphene.Field(
    TypeAuthor,
    author_id=graphene.Argument(type=graphene.Int, required=False),
    name_first=graphene.Argument(type=graphene.String, required=False),
    name_last=graphene.Argument(type=graphene.String, required=False),
)

@staticmethod
def resolve_author(
    args,
    info,
    author_id: Union[int, None] = None,
    name_first: Union[str, None] = None,
    name_last: Union[str, None] = None,
):
    query = TypeAuthor.get_query(info=info)

    if author_id:
        query = query.filter(Author.author_id == author_id)

    if name_first:
        query = query.filter(Author.name_first == name_first)

    if name_last:
        query = query.filter(Author.name_last == name_last)

    author = query.first()

    return author

Запрос GraphQL, такой как:

query GetAuthor{
  author(authorId: 1) {
    nameFirst
  }
}

, будет вызывать следующий необработанный SQL (взятый из журналов эха механизма SQLA):

SELECT authors.author_id AS authors_author_id, authors.name_first AS authors_name_first, authors.name_last AS authors_name_last
FROM authors
WHERE authors.author_id = ?
 LIMIT ? OFFSET ?
2018-05-24 16:23:03,669 INFO sqlalchemy.engine.base.Engine (1, 1, 0)

Как можно видеть, нам может понадобиться только поле nameFirst, т. Е. Столбец name_first, но извлекается вся строка.Конечно, ответ GraphQL содержит только запрошенные поля, т. Е.

{
  "data": {
    "author": {
      "nameFirst": "Robert"
    }
  }
}

, но мы по-прежнему извлекаем всю строку, что становится основной проблемой при работе с широкими таблицами.

Есть лиспособ автоматически сообщить, какие столбцы необходимы для SQLAlchemy, чтобы исключить эту форму перегрузки?

1 Ответ

0 голосов
/ 09 июня 2018

На мой вопрос был дан ответ по проблеме GitHub (https://github.com/graphql-python/graphene-sqlalchemy/issues/134).

). Идея состоит в том, чтобы идентифицировать запрошенные поля из аргумента info (типа graphql.execution.base.ResolveInfo), который передается функции распознавателя.через функцию get_field_names, такую ​​как приведенная ниже:

def get_field_names(info):
    """
    Parses a query info into a list of composite field names.
    For example the following query:
        {
          carts {
            edges {
              node {
                id
                name
                ...cartInfo
              }
            }
          }
        }
        fragment cartInfo on CartType { whatever }

    Will result in an array:
        [
            'carts',
            'carts.edges',
            'carts.edges.node',
            'carts.edges.node.id',
            'carts.edges.node.name',
            'carts.edges.node.whatever'
        ]
    """

    fragments = info.fragments

    def iterate_field_names(prefix, field):
        name = field.name.value

        if isinstance(field, FragmentSpread):
            _results = []
            new_prefix = prefix
            sub_selection = fragments[field.name.value].selection_set.selections
        else:
            _results = [prefix + name]
            new_prefix = prefix + name + "."
            if field.selection_set:
                sub_selection = field.selection_set.selections
            else:
                sub_selection = []

        for sub_field in sub_selection:
            _results += iterate_field_names(new_prefix, sub_field)

        return _results

    results = iterate_field_names('', info.field_asts[0])

    return results

Вышеуказанная функция была взята из https://github.com/graphql-python/graphene/issues/348#issuecomment-267717809. Эта проблема содержит другие версии этой функции, но я чувствовал, что это быланаиболее полный.

и использование указанных полей для ограничения найденных полей в запросе SQLAlchemy следующим образом:

fields = get_field_names(info=info)
query = TypeAuthor.get_query(info=info).options(load_only(*relation_fields))

Применительно к приведенному выше примеру запроса:

query GetAuthor{
  author(authorId: 1) {
    nameFirst
  }
}

Функция get_field_names вернет ['author', 'author.nameFirst']. Однако, поскольку «оригинальные» поля SQLAlchemy ORM заключены в змеи, необходимо обновить запрос get_field_names, чтобы удалить префикс author и преобразовать имена полей.с помощью функции graphene.utils.str_converters.to_snake_case.

Короче говоря, описанный выше подход дает сырой SQL-запрос, подобный следующему:

INFO:sqlalchemy.engine.base.Engine:SELECT authors.author_id AS authors_author_id, authors.name_first AS authors_name_first
FROM authors
WHERE authors.author_id = ?
 LIMIT ? OFFSET ?
2018-06-09 13:22:16,396 INFO sqlalchemy.engine.base.Engine (1, 1, 0)

Обновление

Если кто-нибудь приземлится здесь, задаваясь вопросом оИз реализации я нашел способ реализовать собственную версию функции get_query_fields следующим образом:

from typing import List, Dict, Union, Type

import graphql
from graphql.language.ast import FragmentSpread
from graphql.language.ast import Field
from graphene.utils.str_converters import to_snake_case
import sqlalchemy.orm

from demo.orm_base import OrmBaseMixin

def extract_requested_fields(
    info: graphql.execution.base.ResolveInfo,
    fields: List[Union[Field, FragmentSpread]],
    do_convert_to_snake_case: bool = True,
) -> Dict:
    """Extracts the fields requested in a GraphQL query by processing the AST
    and returns a nested dictionary representing the requested fields.

    Note:
        This function should support arbitrarily nested field structures
        including fragments.

    Example:
        Consider the following query passed to a resolver and running this
        function with the `ResolveInfo` object passed to the resolver.

        >>> query = "query getAuthor{author(authorId: 1){nameFirst, nameLast}}"
        >>> extract_requested_fields(info, info.field_asts, True)
        {'author': {'name_first': None, 'name_last': None}}

    Args:
        info (graphql.execution.base.ResolveInfo): The GraphQL query info passed
            to the resolver function.
        fields (List[Union[Field, FragmentSpread]]): The list of `Field` or
            `FragmentSpread` objects parsed out of the GraphQL query and stored
            in the AST.
        do_convert_to_snake_case (bool): Whether to convert the fields as they
            appear in the GraphQL query (typically in camel-case) back to
            snake-case (which is how they typically appear in ORM classes).

    Returns:
        Dict: The nested dictionary containing all the requested fields.
    """

    result = {}
    for field in fields:

        # Set the `key` as the field name.
        key = field.name.value

        # Convert the key from camel-case to snake-case (if required).
        if do_convert_to_snake_case:
            key = to_snake_case(name=key)

        # Initialize `val` to `None`. Fields without nested-fields under them
        # will have a dictionary value of `None`.
        val = None

        # If the field is of type `Field` then extract the nested fields under
        # the `selection_set` (if defined). These nested fields will be
        # extracted recursively and placed in a dictionary under the field
        # name in the `result` dictionary.
        if isinstance(field, Field):
            if (
                hasattr(field, "selection_set") and
                field.selection_set is not None
            ):
                # Extract field names out of the field selections.
                val = extract_requested_fields(
                    info=info,
                    fields=field.selection_set.selections,
                )
            result[key] = val
        # If the field is of type `FragmentSpread` then retrieve the fragment
        # from `info.fragments` and recursively extract the nested fields but
        # as we don't want the name of the fragment appearing in the result
        # dictionary (since it does not match anything in the ORM classes) the
        # result will simply be result of the extraction.
        elif isinstance(field, FragmentSpread):
            # Retrieve referened fragment.
            fragment = info.fragments[field.name.value]
            # Extract field names out of the fragment selections.
            val = extract_requested_fields(
                info=info,
                fields=fragment.selection_set.selections,
            )
            result = val

    return result

, которая анализирует AST в dict, сохраняя структуру запроса и (надеюсь) сопоставляя структуруORM.

Выполнение объекта info запроса типа:

query getAuthor{
  author(authorId: 1) {
    nameFirst,
    nameLast
  }
}

создает

{'author': {'name_first': None, 'name_last': None}}

, в то время как более сложный запрос, подобный этому:

query getAuthor{
  author(nameFirst: "Brandon") {
    ...authorFields
    books {
      ...bookFields
    }
  }
}

fragment authorFields on TypeAuthor {
  nameFirst,
  nameLast
}

fragment bookFields on TypeBook {
  title,
  year
}

производит:

{'author': {'books': {'title': None, 'year': None},
  'name_first': None,
  'name_last': None}}

Теперь эти словари можно использовать для определения того, что представляет собой поле в первичной таблице (в данном случае Author), поскольку они будут иметь значениеNone, например name_first или поле в отношении этой первичной таблицы, такое как поле title в books.

Упрощенный подход к автоматическому применению этих полей может занятьформа следующей функции:

def apply_requested_fields(
    info: graphql.execution.base.ResolveInfo,
    query: sqlalchemy.orm.Query,
    orm_class: Type[OrmBaseMixin]
) -> sqlalchemy.orm.Query:
    """Updates the SQLAlchemy Query object by limiting the loaded fields of the
    table and its relationship to the ones explicitly requested in the GraphQL
    query.

    Note:
        This function is fairly simplistic in that it assumes that (1) the
        SQLAlchemy query only selects a single ORM class/table and that (2)
        relationship fields are only one level deep, i.e., that requestd fields
        are either table fields or fields of the table relationship, e.g., it
        does not support fields of relationship relationships.

    Args:
        info (graphql.execution.base.ResolveInfo): The GraphQL query info passed
            to the resolver function.
        query (sqlalchemy.orm.Query): The SQLAlchemy Query object to be updated.
        orm_class (Type[OrmBaseMixin]): The ORM class of the selected table.

    Returns:
        sqlalchemy.orm.Query: The updated SQLAlchemy Query object.
    """

    # Extract the fields requested in the GraphQL query.
    fields = extract_requested_fields(
        info=info,
        fields=info.field_asts,
        do_convert_to_snake_case=True,
    )

    # We assume that the top level of the `fields` dictionary only contains a
    # single key referring to the GraphQL resource being resolved.
    tl_key = list(fields.keys())[0]
    # We assume that any keys that have a value of `None` (as opposed to
    # dictionaries) are fields of the primary table.
    table_fields = [
        key for key, val in fields[tl_key].items()
        if val is None
    ]

    # We assume that any keys that have a value being a dictionary are
    # relationship attributes on the primary table with the keys in the
    # dictionary being fields on that relationship. Thus we create a list of
    # `[relatioship_name, relationship_fields]` lists to be used in the
    # `joinedload` definitions.
    relationship_fieldsets = [
        [key, val.keys()]
        for key, val in fields[tl_key].items()
        if isinstance(val, dict)
    ]

    # Assemble a list of `joinedload` definitions on the defined relationship
    # attribute name and the requested fields on that relationship.
    options_joinedloads = []
    for relationship_fieldset in relationship_fieldsets:
        relationship = relationship_fieldset[0]
        rel_fields = relationship_fieldset[1]
        options_joinedloads.append(
            sqlalchemy.orm.joinedload(
                getattr(orm_class, relationship)
            ).load_only(*rel_fields)
        )

    # Update the SQLAlchemy query by limiting the loaded fields on the primary
    # table as well as by including the `joinedload` definitions.
    query = query.options(
        sqlalchemy.orm.load_only(*table_fields),
        *options_joinedloads
    )

    return query
...