Как создать и сохранить таблицу для каждой строки в кадре данных Spark? - PullRequest
0 голосов
/ 07 января 2020

Цель : у меня есть таблица событий со столбцом отметки времени ts. Для каждого события в таблице событий я хочу выполнить поиск в отдельной таблице данных и найти все журналы, которые находятся в пределах времени от этого события. Я хочу сохранить эти журналы в файле, помеченном этим событием.

Что я пробовал до сих пор : у меня есть запрос Spark SQL для получения базового c количества для точек данных рядом с каждым событием.

counts = spark.sql("""
    SELECT e.ts,
           count(d.*)
    FROM events AS e
    LEFT JOIN data AS d ON d.ts <= u.ts + delta AND d.ts => u.ts - delta
""")

Как изменить счет в запросе на таблицы (или некоторую другую коллекцию) журналов, а затем записать эти таблицы в отдельные файлы? В идеале более дешевыми способами, поскольку набор данных очень большой.

Редактировать: Пример ввода

Event Table
+-------+----------+
|     id|      time|
+-------+----------+
|   2468|         0|
|   1234|        10|
+-------+----------+

Data Table
+-------+----------+
|     id|      time|
+-------+----------+
|      a|        -1|
|      b|         2|
|      c|         5|
|      d|         8|
|      e|        10|
|      f|        11|
|      g|        12|
+-------+----------+

Соответствующий требуемый результат : следующие таблицы по одному на каждую строку таблицы событий, записанную в файлы паркета. Предположим, что delta = 5.

Event 2468
+-------+----------+
|     id|      time|
+-------+----------+
|      a|        -1|
|      b|         2|
|      c|         5|
+-------+----------+

Event 1234
+-------+----------+
|     id|      time|
+-------+----------+
|      c|         5|
|      d|         8|
|      e|        10|
|      f|        11|
|      g|        12|
+-------+----------+

1 Ответ

0 голосов
/ 07 января 2020

Вот настройка для примера (я зарегистрировал таблицы, поскольку кажется, что вы уже работаете с таблицами в первую очередь)

>>> events = sc.parallelize([(2468, 0), (1234, 10)]).toDF(["id", "time"])
>>> data = sc.parallelize([("a", -1),("b",  2),("c",  5),("d",  8),("e", 10),("f", 11),("g", 12)]).toDF(["id", "time"])
>>> events.registerTempTable("events")
>>> data.registerTempTable("data")

, поэтому мы хотим объединить эти две таблицы только тогда, когда time столбцы содержат delta друг от друга, это будет abs(data.time - events.time) <= 5, вы можете отформатировать это для разных дельт

>>> events_range = spark.sql("SELECT events.id as event, data.id, data.time FROM events JOIN data ON abs(data.time - events.time) <= 5")
>>> events_range.show()
+-----+---+----+
|event| id|time|
+-----+---+----+
| 2468|  a|  -1|
| 2468|  b|   2|
| 2468|  c|   5|
| 1234|  c|   5|
| 1234|  d|   8|
| 1234|  e|  10|
| 1234|  f|  11|
| 1234|  g|  12|
+-----+---+----+

, и теперь мы хотим получить каждое уникальное событие результата как собственный фрейм данных

>>> eventNames = events.select("id").collect() #get all events
>>> eventNames
[Row(id=2468), Row(id=1234)]
>>> for evt in eventNames:
...    evt_df = events_range.where(f"event = {evt['id']}").drop("event")
...    evt_df.show()
...
+---+----+
| id|time|
+---+----+
|  a|  -1|
|  b|   2|
|  c|   5|
+---+----+

+---+----+
| id|time|
+---+----+
|  c|   5|
|  d|   8|
|  e|  10|
|  f|  11|
|  g|  12|
+---+----+

и там вы сохраните их вместо печати ... Также, если некоторые события не соответствуют критериям, вы можете получить пустые кадры данных здесь.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...