Чтение правил из файла и применение этих правил к строкам кадра данных pyspark. - PullRequest
0 голосов
/ 25 сентября 2019

У меня есть книга правил csv, данные выглядят так:

operator|lastname|operator|firstname|val

equals    | ABC    |contains| XYZ     | 2

equals  | QWE    |contains| rty     | 3

, поэтому, если lastname равно ABC и firstname как XYZ, тогда val будет равно 2, вот так.этот файл может быть изменен или изменен, поэтому условия будут динамическими.В будущем могут быть добавлены даже строки.

Теперь мой фрейм данных pyspark:

lastname| firstname| service

ABC     | XYZNMO   | something

QUE     | rtysdf   | something

Мне нужно применить правило из этого CSV-файла к этому фрейму данных и добавить столбец val.Поэтому мой желаемый выходной фрейм данных будет выглядеть так:

lastname| firstname| service  | val

ABC     | XYZNMO   | something| 2

QUE     | rtysdf   | something| 3

Помните, что книга правил динамическая, правила можно добавлять, удалять или изменять в любое время.Даже операторы в книге правил могут быть изменены.Заранее спасибо

Ответы [ 2 ]

0 голосов
/ 25 сентября 2019

Вы можете добиться этого, используя приведенный ниже процесс, который я считаю

  1. Создание временных таблиц поверх ваших фреймов данных
  2. написать SQL с помощью API Spark SQL и сохранить его в текстовом файле.как отдельная запись
  3. прочитайте инструкцию sql, используя sqlStatement = spark.sparkContext.textFile ("sqllocation"). first (). toString (), подготовленный на шаге №2, и запустите его с помощью spark.sql (sqlStatement))

таким образом вы обновляете инструкцию sql, которая находится в текстовом файле, как при необходимости

0 голосов
/ 25 сентября 2019

Используйте синтаксический анализатор csv для анализа файлов csv и получения данных правил.Затем программно создайте оператор SQL, используя данные правила - что-то вроде:

query = "SELECT
        CASE WHEN lastname = 'ABC' and firstname LIKE 'XYZ%' THEN 2
             ELSE
                 CASE WHEN lastname = 'QUE' and firstname LIKE 'rty% THEN 3
             END
        END AS val
    FROM table"

, затем выполните:

df.createOrReplaceTempView("table")
result_df = spark.sql(query) # above dynamic query
...