Как настроить фильтр запросов Dynami c в pyspark? - PullRequest
1 голос
/ 24 апреля 2020

Мне нужно подготовить решение для создания параметризованного решения для запуска различных фильтров. Например: в настоящее время я использую запрос ниже, чтобы применить фильтр к кадру данных, но

input_df.filter("not is_deleted and status == 'Active' and brand in ('abc', 'def')")

Необходимо изменить этот подход для построения этого запроса из конфигурации:

  filters_to_apply = {'table_name' : 'input_df',
    'rule_1' : 'not is_deleted',
    'rule_2' : 'status == "Active"'
    'rule_3' : 'brand in ("abc", "def")'
                              }

filters_to_apply['table_name'].filter(' and '.join([(filters_to_apply[key]) for key in filters_to_apply.keys() if 'rule' in key]))

Я получаю ошибку as: AttributeError: объект 'str' не имеет атрибута 'filter'

Не могли бы вы посоветовать

Ответы [ 2 ]

1 голос
/ 24 апреля 2020

Прежде всего, важно понять, почему вы получаете эту ошибку.

Способ определения словаря filters_to_apply, это сопоставление строковых ключей со значениями string . Наличие определенной переменной с именем input_df не имеет ничего общего с строкой с символами "input_df". Для Python это две совершенно разные вещи.

Насколько я знаю, у вас есть две альтернативы:

  1. Вы можете передать переменную input_df (при условии, что это определяется где-то) вокруг и применяет фильтры непосредственно к нему (у вас может быть функция, в которой один из ее аргументов является DataFrame, поэтому он может обрабатывать разные DataFrames).
  2. С определенной переменной input_df, вы можете зарегистрировать временное представление и получить тот же самый DataFrame позже.

Мое предложение

Определить функцию, аргументы которой состоят из DataFrame и последовательность правил.

Пример определения функции:

from pyspark.sql import DataFrame
from typing import Iterable

def my_filter(df: DataFrame, conditions: Iterable[str]) -> DataFrame:
    return df.filter(" and ".join(conditions))

Пример использования:

df = sparksession.createDataFrame(
    [(1, True, "Active"), (2, False, "Active"), (3, True, "Disabled")],
    ["id", "bool", "status"]
)
df.show()

my_filter(df, ["not bool", "status = 'Active'"]).show()

Результаты:

+---+-----+--------+
| id| bool|  status|
+---+-----+--------+
|  1| true|  Active|
|  2|false|  Active|
|  3| true|Disabled|
+---+-----+--------+

+---+-----+------+
| id| bool|status|
+---+-----+------+
|  2|false|Active|
+---+-----+------+
0 голосов
/ 24 апреля 2020

Во-первых, конфигурация должна выглядеть так, как показано ниже, то есть input_df не должен быть строкой

  filters_to_apply = {'table_name' : input_df,
    'rule_1' : 'not is_deleted',
    'rule_2' : 'status == "Active"'
    'rule_3' : 'brand in ("abc", "def")'}

Но если вы зарегистрировали input_df в качестве временной таблицы, то, безусловно, вы можете передать ее в виде строки. Я имею в виду, как показано ниже:

input_df.registerTempTable("input_df")

Если вы зарегистрировали ее как временную таблицу, один из способов сделать это может быть следующим:

def prepare_data(config):
   df = spark.table(config['table_name'])
   for key in config.keys(): 
      if key.starts_with("rule_"):
         df = df.filter(config[key])
   return df

Кроме того, если вы хотите Узнайте больше о том, как автоматизировать ваши коды, перейдите по этой ссылке - https://youtu.be/KBRPUDovzmc

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...