Древовидные / вложенные структуры в Spark из реляционной модели данных - PullRequest
2 голосов
/ 17 марта 2019

Если я правильно понимаю, я мог бы рассматривать набор данных spark как список объектов типа T. Как можно объединить два набора данных так, чтобы родитель содержал список детей? Но и у ребенка будет список своих детей ...

Одним из подходов к этому было бы сделать groupBy дочерних элементов на основе ключа, но collect_list возвращает только один столбец, и я полагаю, что есть лучший способ сделать это.

Требуемый результат - это в основном набор данных (список объектов клиента?) Типа Customer, но с дополнениями:

  • У каждого клиента будет список счетов.
  • Каждый счет-фактура будет иметь свои собственные атрибуты, а также список предметов внутри ...
  • ... и это может продолжаться (дерево)

Конечный результат будет примерно таким:

case class Customer(customer_id: Int, name: String, address: String, age: Int, invoices: List[Invoices])
case class Invoice(invoice_id: Int, customer_id: Int, invoice_num:String, date: Int, invoice_type: String, items: List[Items])

И для этого результата мне понадобятся следующие данные:

case class Customer(customer_id: Int, name: String, address: String, age: Int)
case class Invoice(invoice_id: Int, customer_id: Int, invoice_num:String, date: Int, invoice_type: String)
case class InvoiceItem(item_id: Int, invoice_id: Int, num_of_items: Int, price: Double, total: Double)

    val customers_df = Seq(
       (11,"customer1", "address1", 10, "F")
      ,(12,"customer2", "address2", 20, "M")
      ,(13,"customer3", "address3", 30, "F")
    ).toDF("customer_id", "name", "address", "age", "sex")
    val customers_ds = customers_df.as[Customer].as("c")

    customers_ds.show

    val invoices_df = Seq(
       (21,11, "10101/1", 20181105, "manual")
      ,(22,11, "10101/2", 20181105, "manual")
      ,(23,11, "10101/3", 20181105, "manual")
      ,(24,12, "10101/4", 20181105, "generated")
      ,(25,12, "10101/5", 20181105, "pos")
    ).toDF("invoice_id", "customer_id", "invoice_num", "date", "invoice_type")
    val invoices_ds = invoices_df.as[Invoice].as("i")

    invoices_ds.show

    val invoice_items_df = Seq(
       (31, 21, 5, 10.0, 50.0)
      ,(32, 21, 3, 15.0, 45.0)
      ,(33, 22, 6, 11.0, 66.0)
      ,(34, 22, 7, 2.0, 14.0)
      ,(35, 23, 1, 100.0, 100.0)
      ,(36, 24, 4, 4.0, 16.0)
    ).toDF("item_id", "invoice_id", "num_of_items", "price", "total")
    val invoice_items_ds = invoice_items_df.as[InvoiceItem].as("ii")

    invoice_items_ds.show

В таблицах это выглядит так:

+-----------+---------+--------+---+---+
|customer_id|     name| address|age|sex|
+-----------+---------+--------+---+---+
|         11|customer1|address1| 10|  F|
|         12|customer2|address2| 20|  M|
|         13|customer3|address3| 30|  F|
+-----------+---------+--------+---+---+

+----------+-----------+-----------+--------+------------+
|invoice_id|customer_id|invoice_num|    date|invoice_type|
+----------+-----------+-----------+--------+------------+
|        21|         11|    10101/1|20181105|      manual|
|        22|         11|    10101/2|20181105|      manual|
|        23|         11|    10101/3|20181105|      manual|
|        24|         12|    10101/4|20181105|   generated|
|        25|         12|    10101/5|20181105|         pos|
+----------+-----------+-----------+--------+------------+

+-------+----------+------------+-----+-----+
|item_id|invoice_id|num_of_items|price|total|
+-------+----------+------------+-----+-----+
|     31|        21|           5| 10.0| 50.0|
|     32|        21|           3| 15.0| 45.0|
|     33|        22|           6| 11.0| 66.0|
|     34|        22|           7|  2.0| 14.0|
|     35|        23|           1|100.0|100.0|
|     36|        24|           4|  4.0| 16.0|
+-------+----------+------------+-----+-----+

Ответы [ 2 ]

1 голос
/ 17 марта 2019

Кажется, вы пытаетесь прочитать нормализованные данные в дереве объектов Scala. Конечно, вы можете сделать это с помощью Spark, но Spark, возможно, не является оптимальным инструментом для этого. Если данные достаточно малы, чтобы поместиться в памяти, что, как я полагаю, соответствует вашему вопросу, библиотеки объектно-реляционного отображения (ORM) могут лучше подходить для этой работы.

Если вы все еще хотите использовать Spark, вы на правильном пути с groupBy и collect_list. Чего вам не хватает, так это функции struct().

case class Customer(id: Int)
case class Invoice(id: Int, customer_id: Int)

val customers = spark.createDataset(Seq(Customer(1))).as("customers")
val invoices = spark.createDataset(Seq(Invoice(1, 1), Invoice(2, 1)))

case class CombinedCustomer(id: Int, invoices: Option[Seq[Invoice]])

customers
  .join(
    invoices
      .groupBy('customer_id)
      .agg(collect_list(struct('*)).as("invoices"))
      .withColumnRenamed("customer_id", "id"), 
    Seq("id"), "left_outer")
  .as[CombinedCustomer]
  .show

struct('*) создает столбец StructType из всей строки. Вы также можете выбрать любые столбцы, например, struct('x.as("colA"), 'colB).

Это производит

+---+----------------+
| id|        invoices|
+---+----------------+
|  1|[[1, 1], [2, 1]]|
+---+----------------+

Теперь, когда ожидается, что данные клиента не помещаются в памяти, т. Е. Использование простого collect не вариант, существует ряд различных стратегий, которые вы можете использовать.

Самый простой способ, который вы должны рассмотреть вместо сбора у водителя, требует приемлемой независимой обработки данных каждого клиента. В этом случае попробуйте использовать map и распространить логику обработки для каждого клиента среди рабочих.

Если самостоятельная обработка клиентом неприемлема, общая стратегия выглядит следующим образом:

  1. Агрегируйте данные в структурированные строки по мере необходимости, используя вышеуказанный подход.

  2. Перераспределите данные, чтобы все необходимое для обработки было в одном разделе.

  3. (опционально) sortWithinPartitions, чтобы гарантировать, что данные в разделе упорядочены так, как вам нужно.

  4. Использование mapPartitions.

0 голосов
/ 17 марта 2019

Вы можете использовать Spark-SQL и иметь один набор данных для каждого клиента, счетов и товаров.Затем вы можете просто использовать соединения и агрегатные функции между этими наборами данных, чтобы получить желаемый результат.

Spark SQL имеет очень незначительную разницу в производительности между стилем SQL и программным способом.

...