Как применить несколько фильтров к фрейму данных? - PullRequest
1 голос
/ 20 сентября 2019

У меня есть фрейм данных, который выглядит как

+-------+-------+
| Code1 | Code2 |
+-------+-------+
| A     |     1 |
| B     |     1 |
| A     |     2 |
| B     |     2 |
| C     |     2 |
| D     |     2 |
| D     |     3 |
| F     |     3 |
| G     |     3 |
+-------+-------+

Затем я хочу применить уникальный набор фильтров, например:

  • Сценарий 1 -> фильтр на Code1 IN(A, B)
  • Сценарий 2 -> фильтр для Code1 IN (A, D) и Code2 IN (2,3)
  • Сценарий 3 -> фильтр для Code2 = 2

Результатом применения фильтра должен быть кадр данных, который выглядит следующим образом:

+-------+-------+----------+
| Code1 | Code2 | Scenario |
+-------+-------+----------+
| A     |     1 |        1 |
| B     |     1 |        1 |
| A     |     2 |        1 |
| B     |     2 |        1 |
| A     |     2 |        2 |
| D     |     2 |        2 |
| D     |     3 |        2 |
| A     |     2 |        3 |
| B     |     2 |        3 |
| C     |     2 |        3 |
| D     |     2 |        3 |
+-------+-------+----------+

ВОПРОС : Какой самый эффективный способ сделать это с помощью spark черезпитон?

Я новичок в спарке, поэтому я действительно спрашиваю с концептуального уровня и не нуждаюсь в явном решении.Я стремлюсь достичь как можно большего параллелизма в операции.Мой реальный пример включает использование начального фрейма данных с 38 столбцами размером от 100 МБ до пары ГБ в виде CSV-файла, и я обычно использую не более 100-150 сценариев.

Оригинальный дизайнрешение состояло в том, чтобы последовательно обрабатывать каждый фильтр сценариев и объединять результирующие отфильтрованные кадры данных, но мне кажется, что это сводит на нет весь смысл использования искры.

РЕДАКТИРОВАТЬ : Правда ли?Для каждого сценария я бы отфильтровал, а затем объединил оба преобразования (lazy eval).Будет ли конечный план выполнения достаточно продуманным, чтобы автоматически распараллеливать несколько уникальных фильтров?

Нет ли способа применить фильтры параллельно, например, применить фильтр сценариев 1 одновременно сприменять фильтры 2 и 3?Должны ли мы «взорвать» исходный кадр данных N раз, где N = # фильтров сценариев, добавить столбец Scenario # в новый кадр данных и применить один большой фильтр, который выглядит примерно так:

WHERE (Scenario = 1 AND Code1 IN (A,B)) OR
      (Scenario = 2 AND Code1 IN (A,D) AND Code2 IN (2,3)) OR
      (Scenario = 3 AND Code2 = 2)

И если это окажется самым эффективным способом, не зависит ли это также от того, сколько памяти занимает «взорванный» фрейм данных?Если «взорванный» фрейм данных занимает больше памяти, чем у моего кластера, мне придется обрабатывать столько сценариев, сколько может уместиться в памяти?

Ответы [ 2 ]

0 голосов
/ 23 сентября 2019

В EDIT к моему вопросу я спросил, является ли ленивая оценка ключом к проблеме.Проведя некоторые исследования в пользовательском интерфейсе Spark, я пришел к выводу, что, хотя мое первоначальное решение выглядит как , оно последовательно применяет преобразования (фильтр, затем объединение) для каждого сценария, но фактически применяет все преобразования одновременно, один раз.вызывается действие (например, dataframe.count ()).Снимок экрана здесь представляет временную шкалу события из фазы преобразования задания dataframe.count ().

Задание включает 96 сценариев, каждый с уникальным фильтром в исходном фрейме данных.Вы можете видеть, что на моей локальной машине одновременно выполняется 8 задач, где каждая задача представляет собой фильтр из одного из сценариев.

В заключение, Spark заботится об оптимизации фильтров для параллельной работы после вызова действия в результирующем кадре данных.

0 голосов
/ 20 сентября 2019

вы можете применить все фильтры одновременно:

  data.withColumn("scenario",
    when('code1.isin("A", "B"), 1).otherwise(
      when('code1.isin("A", "D") && 'code2.isin("2","3"), 2).otherwise(
        when('code2==="2",3)
      )
    )
  ).show()

, но у вас есть еще одна проблема, например, значения (A, 2) могут быть во всех ваших сценариях 1,2,3.В этом случае вы можете попробовать что-то вроде этого:

  data.withColumn("s1", when('code1.isin("A", "B"), 1).otherwise(0))
    .withColumn("s2",when('code1.isin("A", "D") && 'code2.isin("2","3"), 1).otherwise(0))
    .withColumn("s3",when('code2==="2",1).otherwise(0))
    .show()

output:

+-----+-----+---+---+---+
|code1|code2| s1| s2| s3|
+-----+-----+---+---+---+
|    A|    1|  1|  0|  0|
|    B|    1|  1|  0|  0|
|    A|    2|  1|  1|  1|
|    B|    2|  1|  0|  1|
|    A|    2|  1|  1|  1|
|    D|    2|  0|  1|  1|
|    D|    3|  0|  1|  0|
|    A|    2|  1|  1|  1|
|    B|    2|  1|  0|  1|
|    C|    2|  0|  0|  1|
|    D|    2|  0|  1|  1|
+-----+-----+---+---+---+
...