Как я могу СОЕДИНИТЬ два массива структур в одной строке? - PullRequest
0 голосов
/ 07 мая 2020

Я работаю с данными «разбивки действий», извлеченными из Facebook Ads Insights API

Facebook не помещает action (количество покупок) и action_value (сумма покупки) в том же столбце, поэтому мне нужно ПРИСОЕДИНЯТЬСЯ к тем, кто на моей стороне, на основе идентификатора действия (id # + тип устройства в моем случае).

Если бы каждое действие было простым его собственная строка, конечно, было бы тривиально присоединиться к ним с помощью SQL. Но в этом случае мне нужно СОЕДИНЯТЬ две структуры в каждой строке. То, что я хочу сделать, составляет LEFT JOIN в двух структурах, сопоставленных по двум столбцам. В идеале я мог бы сделать это только с SQL (не с PySpark / Scala / et c).

До сих пор я пробовал:

  • The Spark SQL inline генератор . Это дает мне каждое действие в отдельной строке, но, поскольку родительская строка в исходном наборе данных не имеет уникального идентификатора, нет возможности ПРИСОЕДИНЯТЬСЯ к этим структурам для каждой строки. Также пробовал использовать inline() в обоих столбцах, но одновременно может использоваться только 1 функция «генератора».
  • Использование Spark SQL arrays_zip функция для их объединения. Но это не работает, потому что порядок не всегда один и тот же, и иногда у них разные ключи.
  • Я подумывал написать функцию map в PySpark. Но кажется, что функции карты идентифицируют столбцы только по индексу, а не по имени, что кажется fr agile, если столбцы должны измениться позже (вероятно, при работе со сторонними API-интерфейсами).
  • Я подумывал написать PySpark UDF , который кажется лучшим вариантом, но требует разрешения, которого у меня нет (SELECT on anonymous function). Если это действительно лучший вариант, я попытаюсь использовать sh для этого разрешения.

Чтобы лучше проиллюстрировать: каждая строка в моем наборе данных имеет столбцы actions и action_values с данными вот так.

actions = [
  {
    "action_device": "desktop",
    "action_type": "offsite_conversion.custom.123",
    "value": "1"
  },
  {
    "action_device": "desktop", /* Same conversion ID; different device. */
    "action_type": "offsite_conversion.custom.321",
    "value": "1"
  },
  {
    "action_device": "iphone", /* Same conversion ID; different device. */
    "action_type": "offsite_conversion.custom.321",
    "value": "2"
  }
  {
    "action_device": "iphone", /* has "actions" but not "actions_values" */
    "action_type": "offsite_conversion.custom.789",
    "value": "1"
  },
]
action_values = [
  {
    "action_device": "desktop",
    "action_type": "offsite_conversion.custom.123",
    "value": "49.99"
  },
  {
    "action_device": "desktop",
    "action_type": "offsite_conversion.custom.321",
    "value": "19.99"
  },
  {
    "action_device": "iphone",
    "action_type": "offsite_conversion.custom.321",
    "value": "99.99"
  }
]

Я бы хотел, чтобы в каждой строке были обе точки данных в одной структуре, например:

my_desired_result = [
  {
    "action_device": "desktop",
    "action_type": "offsite_conversion.custom.123",
    "count": "1", /* This comes from the "action" struct */
    "value": "49.99" /* This comes from the "action_values" struct */
  },
  {
    "action_device": "desktop",
    "action_type": "offsite_conversion.custom.321",
    "count": "1",
    "value": "19.99"
  },
  {
    "action_device": "iphone",
    "action_type": "offsite_conversion.custom.321",
    "count": "2",
    "value": "99.99"
  },
  {
    "action_device": "iphone",
    "action_type": "offsite_conversion.custom.789",
    "count": "1",
    "value": null /* NULL because there is no value for conversion#789 AND iphone */
  }
]

1 Ответ

1 голос
/ 07 мая 2020

IIU C, вы можете попробовать преобразовать , а затем использовать фильтр , чтобы найти первый совпавший элемент из action_values, сопоставив action_device и action_type:

df.printSchema()
root
 |-- action_values: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- action_device: string (nullable = true)
 |    |    |-- action_type: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- action_device: string (nullable = true)
 |    |    |-- action_type: string (nullable = true)
 |    |    |-- value: string (nullable = true)

df.createOrReplaceTempView("df_table")

spark.sql("""

  SELECT       
    transform(actions, x -> named_struct(
      'action_device', x.action_device,
      'action_type', x.action_type,
      'count', x.value,
      'value', filter(action_values, y -> y.action_device = x.action_device AND y.action_type = x.action_type)[0].value
    )) as result
  FROM df_table

""").show(truncate=False)
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|result                                                                                                                                                                                                  |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[[desktop, offsite_conversion.custom.123, 1, 49.99], [desktop, offsite_conversion.custom.321, 1, 19.99], [iphone, offsite_conversion.custom.321, 2, 99.99], [iphone, offsite_conversion.custom.789, 1,]]|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

UPDATE: в случае FULL JOIN вы можете попробовать следующее SQL:

spark.sql("""

  SELECT

  concat(
    /* actions left join action_values with potentially multiple matched values */
    flatten(
      transform(actions, x ->
        transform(
          filter(action_values, y -> y.action_device = x.action_device AND y.action_type = x.action_type),
          z -> named_struct(
            'action_device', x.action_device,
            'action_type', x.action_type,
            'count', x.value,
            'value', z.value
          )
        )
      )
    ),
    /* action_values missing from actions */
    transform(
      filter(action_values, x -> !exists(actions, y -> x.action_device = y.action_device AND x.action_type = y.action_type)),
      z -> named_struct(
        'action_device', z.action_device,
        'action_type', z.action_type,
        'count', NULL,
        'value', z.value
      )
    )
  ) as result

  FROM df_table

""").show(truncate=False)
...